aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLalit Maganti <lalitm@google.com>2024-05-07 18:37:46 +0100
committerLalit Maganti <lalitm@google.com>2024-05-07 18:37:46 +0100
commit5813a113008dd8ffb3972c845a1eaae547e2adc7 (patch)
treed9f3b7182508f3b9f38f113312c2bc5461bc5fca
parent844f8a84b5f18be38c25c8d13f8af3a50ae3d066 (diff)
downloadperfetto-5813a113008dd8ffb3972c845a1eaae547e2adc7.tar.gz
tp: switch away from having a global connection in RPC code
This allows multiple connection to refer to multiple codepaths Change-Id: I895d64027e87dd0428c057da514e7b219398d54d
-rw-r--r--src/trace_processor/rpc/httpd.cc38
-rw-r--r--src/trace_processor/rpc/rpc.cc3
-rw-r--r--src/trace_processor/rpc/rpc.h8
-rw-r--r--src/trace_processor/rpc/wasm_bridge.cc8
-rw-r--r--ui/src/frontend/rpc_http_dialog.ts2
5 files changed, 32 insertions, 27 deletions
diff --git a/src/trace_processor/rpc/httpd.cc b/src/trace_processor/rpc/httpd.cc
index e63bdb499..21ca077ae 100644
--- a/src/trace_processor/rpc/httpd.cc
+++ b/src/trace_processor/rpc/httpd.cc
@@ -68,28 +68,28 @@ class Httpd : public base::HttpRequestHandler {
base::HttpServer http_srv_;
};
-base::HttpServerConnection* g_cur_conn;
-
base::StringView Vec2Sv(const std::vector<uint8_t>& v) {
return {reinterpret_cast<const char*>(v.data()), v.size()};
}
// Used both by websockets and /rpc chunked HTTP endpoints.
-void SendRpcChunk(const void* data, uint32_t len) {
+void SendRpcChunk(base::HttpServerConnection* conn,
+ const void* data,
+ uint32_t len) {
if (data == nullptr) {
// Unrecoverable RPC error case.
- if (!g_cur_conn->is_websocket())
- g_cur_conn->SendResponseBody("0\r\n\r\n", 5);
- g_cur_conn->Close();
+ if (!conn->is_websocket())
+ conn->SendResponseBody("0\r\n\r\n", 5);
+ conn->Close();
return;
}
- if (g_cur_conn->is_websocket()) {
- g_cur_conn->SendWebsocketMessage(data, len);
+ if (conn->is_websocket()) {
+ conn->SendWebsocketMessage(data, len);
} else {
base::StackString<32> chunk_hdr("%x\r\n", len);
- g_cur_conn->SendResponseBody(chunk_hdr.c_str(), chunk_hdr.len());
- g_cur_conn->SendResponseBody(data, len);
- g_cur_conn->SendResponseBody("\r\n", 2);
+ conn->SendResponseBody(chunk_hdr.c_str(), chunk_hdr.len());
+ conn->SendResponseBody(data, len);
+ conn->SendResponseBody("\r\n", 2);
}
}
@@ -165,13 +165,13 @@ void Httpd::OnHttpRequest(const base::HttpRequest& req) {
// Start the chunked reply.
conn.SendResponseHeaders("200 OK", chunked_headers,
base::HttpServerConnection::kOmitContentLength);
- PERFETTO_CHECK(g_cur_conn == nullptr);
- g_cur_conn = req.conn;
- global_trace_processor_rpc_.SetRpcResponseFunction(SendRpcChunk);
+ global_trace_processor_rpc_.SetRpcResponseFunction(
+ [&](const void* data, uint32_t len) {
+ SendRpcChunk(&conn, data, len);
+ });
// OnRpcRequest() will call SendRpcChunk() one or more times.
global_trace_processor_rpc_.OnRpcRequest(req.body.data(), req.body.size());
global_trace_processor_rpc_.SetRpcResponseFunction(nullptr);
- g_cur_conn = nullptr;
// Terminate chunked stream.
conn.SendResponseBody("0\r\n\r\n", 5);
@@ -249,13 +249,13 @@ void Httpd::OnHttpRequest(const base::HttpRequest& req) {
}
void Httpd::OnWebsocketMessage(const base::WebsocketMessage& msg) {
- PERFETTO_CHECK(g_cur_conn == nullptr);
- g_cur_conn = msg.conn;
- global_trace_processor_rpc_.SetRpcResponseFunction(SendRpcChunk);
+ global_trace_processor_rpc_.SetRpcResponseFunction(
+ [&](const void* data, uint32_t len) {
+ SendRpcChunk(msg.conn, data, len);
+ });
// OnRpcRequest() will call SendRpcChunk() one or more times.
global_trace_processor_rpc_.OnRpcRequest(msg.data.data(), msg.data.size());
global_trace_processor_rpc_.SetRpcResponseFunction(nullptr);
- g_cur_conn = nullptr;
}
} // namespace
diff --git a/src/trace_processor/rpc/rpc.cc b/src/trace_processor/rpc/rpc.cc
index c715784f5..b676e455d 100644
--- a/src/trace_processor/rpc/rpc.cc
+++ b/src/trace_processor/rpc/rpc.cc
@@ -504,8 +504,7 @@ std::vector<uint8_t> Rpc::GetStatus() {
protozero::HeapBuffered<protos::pbzero::StatusResult> status;
status->set_loaded_trace_name(trace_processor_->GetCurrentTraceName());
status->set_human_readable_version(base::GetVersionString());
- const char* version_code = base::GetVersionCode();
- if (version_code) {
+ if (const char* version_code = base::GetVersionCode(); version_code) {
status->set_version_code(version_code);
}
status->set_api_version(protos::pbzero::TRACE_PROCESSOR_CURRENT_API_VERSION);
diff --git a/src/trace_processor/rpc/rpc.h b/src/trace_processor/rpc/rpc.h
index 61cf2e636..d3b2d41ff 100644
--- a/src/trace_processor/rpc/rpc.h
+++ b/src/trace_processor/rpc/rpc.h
@@ -22,6 +22,7 @@
#include <functional>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "perfetto/base/status.h"
@@ -81,8 +82,11 @@ class Rpc {
// with Wasm, where size_t = uint32_t.
// (nullptr, 0) has the semantic of "close the channel" and is issued when an
// unrecoverable wire-protocol framing error is detected.
- using RpcResponseFunction = void (*)(const void* /*data*/, uint32_t /*len*/);
- void SetRpcResponseFunction(RpcResponseFunction f) { rpc_response_fn_ = f; }
+ using RpcResponseFunction =
+ std::function<void(const void* /*data*/, uint32_t /*len*/)>;
+ void SetRpcResponseFunction(RpcResponseFunction f) {
+ rpc_response_fn_ = std::move(f);
+ }
// 2. TraceProcessor legacy RPC endpoints.
// The methods below are exposed for the old RPC interfaces, where each RPC
diff --git a/src/trace_processor/rpc/wasm_bridge.cc b/src/trace_processor/rpc/wasm_bridge.cc
index a9ba607c5..ca27abc0b 100644
--- a/src/trace_processor/rpc/wasm_bridge.cc
+++ b/src/trace_processor/rpc/wasm_bridge.cc
@@ -22,6 +22,8 @@
namespace perfetto::trace_processor {
namespace {
+using RpcResponseFn = void(const void*, uint32_t);
+
Rpc* g_trace_processor_rpc;
// The buffer used to pass the request arguments. The caller (JS) decides how
@@ -35,9 +37,9 @@ uint8_t* g_req_buf;
extern "C" {
// Returns the address of the allocated request buffer.
-uint8_t* EMSCRIPTEN_KEEPALIVE trace_processor_rpc_init(Rpc::RpcResponseFunction,
- uint32_t);
-uint8_t* trace_processor_rpc_init(Rpc::RpcResponseFunction resp_function,
+uint8_t* EMSCRIPTEN_KEEPALIVE
+trace_processor_rpc_init(RpcResponseFn* RpcResponseFn, uint32_t);
+uint8_t* trace_processor_rpc_init(RpcResponseFn* resp_function,
uint32_t req_buffer_size) {
g_trace_processor_rpc = new Rpc();
diff --git a/ui/src/frontend/rpc_http_dialog.ts b/ui/src/frontend/rpc_http_dialog.ts
index 909502331..13db6b6ca 100644
--- a/ui/src/frontend/rpc_http_dialog.ts
+++ b/ui/src/frontend/rpc_http_dialog.ts
@@ -130,7 +130,7 @@ Trace processor RPC API: ${tpStatus.apiVersion}
// | | |No |Yes |
// | | | +---------------------+ |
// | | | | Dialog: Preloaded? | |
-// | | +--+ YES, use loaded trace |
+// | | | + YES, use loaded trace |
// | | +--------| YES, but reset state| |
// | | +---------------------------------------| NO, Use builtin Wasm| |
// | | | | | +---------------------+ |