aboutsummaryrefslogtreecommitdiff
path: root/src/profiling/memory/heapprofd_producer.h
blob: b8e9c53af1ecf423c1c9b9d008dd52e1d038d99e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
#define SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_

#include <array>
#include <functional>
#include <map>
#include <vector>

#include <inttypes.h>

#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/unix_task_runner.h"

#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/core/data_source_config.h"

#include "perfetto/tracing/core/forward_decls.h"
#include "src/profiling/common/interning_output.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/profiler_guardrails.h"
#include "src/profiling/memory/bookkeeping.h"
#include "src/profiling/memory/bookkeeping_dump.h"
#include "src/profiling/memory/log_histogram.h"
#include "src/profiling/memory/shared_ring_buffer.h"
#include "src/profiling/memory/system_property.h"
#include "src/profiling/memory/unwinding.h"
#include "src/profiling/memory/unwound_messages.h"

#include "protos/perfetto/config/profiling/heapprofd_config.gen.h"

namespace perfetto {
namespace profiling {

using HeapprofdConfig = protos::gen::HeapprofdConfig;

struct Process {
  pid_t pid;
  std::string cmdline;
};

// TODO(rsavitski): central daemon can do less work if it knows that the global
// operating mode is fork-based, as it then will not be interacting with the
// clients. This can be implemented as an additional mode here.
enum class HeapprofdMode { kCentral, kChild };

bool HeapprofdConfigToClientConfiguration(
    const HeapprofdConfig& heapprofd_config,
    ClientConfiguration* cli_config);

// Heap profiling producer. Can be instantiated in two modes, central and
// child (also referred to as fork mode).
//
// The central mode producer is instantiated by the system heapprofd daemon. Its
// primary responsibility is activating profiling (via system properties and
// signals) in targets identified by profiling configs. On debug platform
// builds, the central producer can also handle the out-of-process unwinding &
// writing of the profiles for all client processes.
//
// An alternative model is where the central heapprofd triggers the profiling in
// the target process, but the latter fork-execs a private heapprofd binary to
// handle unwinding only for that process. The forked heapprofd instantiates
// this producer in the "child" mode. In this scenario, the profiled process
// never talks to the system daemon.
//
// TODO(fmayer||rsavitski): cover interesting invariants/structure of the
// implementation (e.g. number of data sources in child mode), including
// threading structure.
class HeapprofdProducer : public Producer, public UnwindingWorker::Delegate {
 public:
  friend class SocketDelegate;

  // TODO(fmayer): Split into two delegates for the listening socket in kCentral
  // and for the per-client sockets to make this easier to understand?
  // Alternatively, find a better name for this.
  class SocketDelegate : public base::UnixSocket::EventListener {
   public:
    explicit SocketDelegate(HeapprofdProducer* producer)
        : producer_(producer) {}

    void OnDisconnect(base::UnixSocket* self) override;
    void OnNewIncomingConnection(
        base::UnixSocket* self,
        std::unique_ptr<base::UnixSocket> new_connection) override;
    void OnDataAvailable(base::UnixSocket* self) override;

   private:
    HeapprofdProducer* producer_;
  };

  HeapprofdProducer(HeapprofdMode mode,
                    base::TaskRunner* task_runner,
                    bool exit_when_done);
  ~HeapprofdProducer() override;

  // Producer Impl:
  void OnConnect() override;
  void OnDisconnect() override;
  void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
  void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
  void StopDataSource(DataSourceInstanceID) override;
  void OnTracingSetup() override;
  void Flush(FlushRequestID,
             const DataSourceInstanceID* data_source_ids,
             size_t num_data_sources) override;
  void ClearIncrementalState(const DataSourceInstanceID* /*data_source_ids*/,
                             size_t /*num_data_sources*/) override {}

  // TODO(fmayer): Refactor once/if we have generic reconnect logic.
  void ConnectWithRetries(const char* socket_name);
  void DumpAll();

  // UnwindingWorker::Delegate impl:
  void PostAllocRecord(UnwindingWorker*, std::unique_ptr<AllocRecord>) override;
  void PostFreeRecord(UnwindingWorker*, std::vector<FreeRecord>) override;
  void PostHeapNameRecord(UnwindingWorker*, HeapNameRecord) override;
  void PostSocketDisconnected(UnwindingWorker*,
                              DataSourceInstanceID,
                              pid_t,
                              SharedRingBuffer::Stats) override;

  void HandleAllocRecord(AllocRecord*);
  void HandleFreeRecord(FreeRecord);
  void HandleHeapNameRecord(HeapNameRecord);
  void HandleSocketDisconnected(DataSourceInstanceID,
                                pid_t,
                                SharedRingBuffer::Stats);

  // Valid only if mode_ == kChild.
  void SetTargetProcess(pid_t target_pid, std::string target_cmdline);
  void SetDataSourceCallback(std::function<void()> fn);

  // Exposed for testing.
  void SetProducerEndpoint(
      std::unique_ptr<TracingService::ProducerEndpoint> endpoint);

  base::UnixSocket::EventListener& socket_delegate() {
    return socket_delegate_;
  }

  // Adopts the (connected) sockets inherited from the target process, invoking
  // the on-connection callback.
  // Specific to mode_ == kChild
  void AdoptSocket(base::ScopedFile fd);

  void TerminateWhenDone();

 private:
  // State of the connection to tracing service (traced).
  enum State {
    kNotStarted = 0,
    kNotConnected,
    kConnecting,
    kConnected,
  };

  struct ProcessState {
    struct HeapInfo {
      HeapInfo(GlobalCallstackTrie* cs, bool dam) : heap_tracker(cs, dam) {}

      HeapTracker heap_tracker;
      std::string heap_name;
      uint64_t sampling_interval = 0u;
      uint64_t orig_sampling_interval = 0u;
    };
    ProcessState(GlobalCallstackTrie* c, bool d)
        : callsites(c), dump_at_max_mode(d) {}
    bool disconnected = false;
    SharedRingBuffer::ErrorState error_state =
        SharedRingBuffer::ErrorState::kNoError;
    bool buffer_corrupted = false;

    uint64_t heap_samples = 0;
    uint64_t map_reparses = 0;
    uint64_t unwinding_errors = 0;

    uint64_t total_unwinding_time_us = 0;
    uint64_t client_spinlock_blocked_us = 0;
    GlobalCallstackTrie* callsites;
    bool dump_at_max_mode;
    LogHistogram unwinding_time_us;
    std::map<uint32_t, HeapInfo> heap_infos;

    HeapInfo& GetHeapInfo(uint32_t heap_id) {
      auto it = heap_infos.find(heap_id);
      if (it == heap_infos.end()) {
        std::tie(it, std::ignore) = heap_infos.emplace(
            std::piecewise_construct, std::forward_as_tuple(heap_id),
            std::forward_as_tuple(callsites, dump_at_max_mode));
      }
      return it->second;
    }

    HeapTracker& GetHeapTracker(uint32_t heap_id) {
      return GetHeapInfo(heap_id).heap_tracker;
    }
  };

  struct DataSource {
    explicit DataSource(std::unique_ptr<TraceWriter> tw)
        : trace_writer(std::move(tw)) {
      // Make MSAN happy.
      memset(&client_configuration, 0, sizeof(client_configuration));
    }

    DataSourceInstanceID id;
    std::unique_ptr<TraceWriter> trace_writer;
    DataSourceConfig ds_config;
    HeapprofdConfig config;
    ClientConfiguration client_configuration;
    std::vector<SystemProperties::Handle> properties;
    std::set<pid_t> signaled_pids;
    std::set<pid_t> rejected_pids;
    std::map<pid_t, ProcessState> process_states;
    std::vector<std::string> normalized_cmdlines;
    InterningOutputTracker intern_state;
    bool shutting_down = false;
    bool started = false;
    bool hit_guardrail = false;
    bool was_stopped = false;
    uint32_t stop_timeout_ms;
    GuardrailConfig guardrail_config;
  };

  struct PendingProcess {
    std::unique_ptr<base::UnixSocket> sock;
    DataSourceInstanceID data_source_instance_id;
    SharedRingBuffer shmem;
  };

  void HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,
                              Process process);

  void ConnectService();
  void Restart();
  void ResetConnectionBackoff();
  void IncreaseConnectionBackoff();

  void CheckDataSourceMemoryTask();
  void CheckDataSourceCpuTask();

  void FinishDataSourceFlush(FlushRequestID flush_id);
  void DumpProcessesInDataSource(DataSource* ds);
  void DumpProcessState(DataSource* ds, pid_t pid, ProcessState* process);
  static void SetStats(protos::pbzero::ProfilePacket::ProcessStats* stats,
                       const ProcessState& process_state);

  void DoContinuousDump(DataSourceInstanceID id, uint32_t dump_interval);

  UnwindingWorker& UnwinderForPID(pid_t);
  bool IsPidProfiled(pid_t);
  DataSource* GetDataSourceForProcess(const Process& proc);
  void RecordOtherSourcesAsRejected(DataSource* active_ds, const Process& proc);

  void SetStartupProperties(DataSource* data_source);
  void SignalRunningProcesses(DataSource* data_source);

  // Specific to mode_ == kChild
  void TerminateProcess(int exit_status);
  // Specific to mode_ == kChild
  void ActiveDataSourceWatchdogCheck();

  void ShutdownDataSource(DataSource* ds);
  bool MaybeFinishDataSource(DataSource* ds);

  void WriteRejectedConcurrentSession(BufferID buffer_id, pid_t pid);

  // Class state:

  // Task runner is owned by the main thread.
  base::TaskRunner* const task_runner_;
  const HeapprofdMode mode_;
  // TODO(fmayer): Refactor to make this boolean unnecessary.
  // Whether to terminate this producer after the first data-source has
  // finished.
  bool exit_when_done_;

  // State of connection to the tracing service.
  State state_ = kNotStarted;
  uint32_t connection_backoff_ms_ = 0;
  const char* producer_sock_name_ = nullptr;

  // Client processes that have connected, but with which we have not yet
  // finished the handshake.
  std::map<pid_t, PendingProcess> pending_processes_;

  // Must outlive data_sources_ - owns at least the shared memory referenced by
  // TraceWriters.
  std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;

  // Must outlive data_sources_ - HeapTracker references the trie.
  GlobalCallstackTrie callsites_;

  // Must outlive data_sources_ - DataSource can hold
  // SystemProperties::Handle-s.
  // Specific to mode_ == kCentral
  SystemProperties properties_;

  std::map<FlushRequestID, size_t> flushes_in_progress_;
  std::map<DataSourceInstanceID, DataSource> data_sources_;
  std::vector<UnwindingWorker> unwinding_workers_;

  // Specific to mode_ == kChild
  Process target_process_{base::kInvalidPid, ""};
  base::Optional<std::function<void()>> data_source_callback_;

  SocketDelegate socket_delegate_;

  base::WeakPtrFactory<HeapprofdProducer> weak_factory_;  // Keep last.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_