aboutsummaryrefslogtreecommitdiff
path: root/src/trace_processor/span_join_operator_table.h
blob: eb961612c0fd91d6f08dd92a50926a327dcfe1a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SPAN_JOIN_OPERATOR_TABLE_H_
#define SRC_TRACE_PROCESSOR_SPAN_JOIN_OPERATOR_TABLE_H_

#include <sqlite3.h>

#include <array>
#include <deque>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/status.h"
#include "src/trace_processor/sqlite/scoped_db.h"
#include "src/trace_processor/sqlite/sqlite_table.h"

namespace perfetto {
namespace trace_processor {

// Implements the SPAN JOIN operation between two tables on a particular column.
//
// Span:
// A span is a row with a timestamp and a duration. It is used to model
// operations which run for a particular *span* of time.
//
// We draw spans like so (time on the x-axis):
// start of span->[ time where operation is running ]<- end of span
//
// Multiple spans can happen in parallel:
// [      ]
//    [        ]
//   [                    ]
//  [ ]
//
// The above for example, models scheduling activity on a 4-core computer for a
// short period of time.
//
// Span join:
// The span join operation can be thought of as the intersection of span tables.
// That is, the join table has a span for each pair of spans in the child tables
// where the spans overlap. Because many spans are possible in parallel, an
// extra metadata column (labelled the "join column") is used to distinguish
// between the spanned tables.
//
// For a given join key suppose these were the two span tables:
// Table 1:   [        ]              [      ]         [ ]
// Table 2:          [      ]            [  ]           [      ]
// Output :          [ ]                 [  ]           []
//
// All other columns apart from timestamp (ts), duration (dur) and the join key
// are passed through unchanged.
class SpanJoinOperatorTable : public SqliteTable {
 public:
  // Columns of the span operator table.
  // These three columns have fixed indices; everything after them comes from
  // the joined tables.
  enum Column {
    kTimestamp = 0,
    kDuration = 1,
    kPartition = 2,
    // All other columns are dynamic depending on the joined tables.
  };

  // How the two child tables are partitioned relative to one another.
  // NOTE(review): going by the names, presumably "neither table partitioned",
  // "both partitioned on the same column" and "exactly one table partitioned"
  // respectively - confirm against the implementation file.
  enum class PartitioningType {
    kNoPartitioning = 0,
    kSamePartitioning = 1,
    kMixedPartitioning = 2
  };

  // Parsed version of a table descriptor.
  struct TableDescriptor {
    // Parses |raw_descriptor| and writes the outcome into |descriptor|.
    // Returns a status describing whether parsing succeeded.
    static util::Status Parse(const std::string& raw_descriptor,
                              TableDescriptor* descriptor);

    // A descriptor is partitioned iff it names a partition column.
    bool IsPartitioned() const { return !partition_col.empty(); }

    // Name of the child table.
    std::string name;
    // Name of the partition column; empty when the table is unpartitioned.
    std::string partition_col;
  };

  // Contains the definition of the child tables.
  class TableDefinition {
   public:
    // Creates an empty definition: no name, no partition column, no columns,
    // shadow slices disabled and all column indices set to the uint32_t max
    // sentinel.
    TableDefinition() = default;

    // |name|               name of the child table.
    // |partition_col|      partition column name (empty if unpartitioned).
    // |cols|               columns of the child table.
    // |emit_shadow_slices| value later returned by emit_shadow_slices().
    // |ts_idx|, |dur_idx|, |partition_idx|
    //                      indices of the timestamp, duration and partition
    //                      columns; they are used as statement column indices
    //                      by Query.
    TableDefinition(std::string name,
                    std::string partition_col,
                    std::vector<SqliteTable::Column> cols,
                    bool emit_shadow_slices,
                    uint32_t ts_idx,
                    uint32_t dur_idx,
                    uint32_t partition_idx);

    const std::string& name() const { return name_; }
    const std::string& partition_col() const { return partition_col_; }
    const std::vector<SqliteTable::Column>& columns() const { return cols_; }

    bool emit_shadow_slices() const { return emit_shadow_slices_; }
    uint32_t ts_idx() const { return ts_idx_; }
    uint32_t dur_idx() const { return dur_idx_; }
    uint32_t partition_idx() const { return partition_idx_; }

    // A definition is partitioned iff it names a partition column.
    bool IsPartitioned() const { return !partition_col_.empty(); }

   private:
    std::string name_;
    std::string partition_col_;
    std::vector<SqliteTable::Column> cols_;
    // Explicitly initialised so that a default-constructed definition never
    // carries an indeterminate value (which would be UB to read or copy).
    bool emit_shadow_slices_ = false;
    uint32_t ts_idx_ = std::numeric_limits<uint32_t>::max();
    uint32_t dur_idx_ = std::numeric_limits<uint32_t>::max();
    uint32_t partition_idx_ = std::numeric_limits<uint32_t>::max();
  };

  // State for iterating over the rows of one child table through a SQLite
  // statement. Tracks the current slice (start, end, partition) and whether
  // that slice is a "real" or a "shadow" one.
  // NOTE(review): shadow slices are presumably synthetic slices covering the
  // gaps between real rows - confirm against the implementation file.
  class Query {
   public:
    // Result of a stepping operation: a status code plus, for errors, the
    // associated SQLite error code.
    struct StepRet {
      enum Code {
        kRow,
        kEof,
        kError,
      };

      // Deliberately left implicit so the interface stays unchanged for
      // callers that convert a bare Code into a StepRet.
      StepRet(Code c, int e = SQLITE_OK) : code(c), err_code(e) {}

      bool is_row() const { return code == Code::kRow; }
      bool is_eof() const { return code == Code::kEof; }
      bool is_err() const { return code == Code::kError; }

      Code code = Code::kEof;
      int err_code = SQLITE_OK;
    };

    Query(SpanJoinOperatorTable*, const TableDefinition*, sqlite3* db);
    virtual ~Query();

    // Move-only: copying is disabled, moving is defaulted.
    Query(const Query&) = delete;
    Query& operator=(const Query&) = delete;

    Query(Query&&) noexcept = default;
    Query& operator=(Query&&) = default;

    // Prepares the query for the given constraints and their bound values.
    // Returns an int status code (presumably a SQLite result code - confirm).
    int Initialize(const QueryConstraints& qc, sqlite3_value** argv);

    // Stepping primitives; each reports row / eof / error through StepRet.
    StepRet Step();
    StepRet StepToNextPartition();
    StepRet StepToPartition(int64_t target_partition);
    StepRet StepUntil(int64_t timestamp);

    // Reports the value of column |index| to |context|.
    void ReportSqliteResult(sqlite3_context* context, size_t index);

    // Bounds and partition of the current slice.
    int64_t ts_start() const { return ts_start_; }
    int64_t ts_end() const { return ts_end_; }
    int64_t partition() const { return partition_; }

    const TableDefinition* definition() const { return defn_; }

    // The query is exhausted only once the underlying cursor hit EOF *and* the
    // current slice is a real one; a pending shadow slice still counts as data.
    bool Eof() const { return cursor_eof_ && mode_ == Mode::kRealSlice; }
    bool IsPartitioned() const { return defn_->IsPartitioned(); }
    bool IsRealSlice() const { return mode_ == Mode::kRealSlice; }

    // True when the current slice is a shadow slice spanning the full
    // [0, int64 max] time range.
    bool IsFullPartitionShadowSlice() const {
      return mode_ == Mode::kShadowSlice && ts_start_ == 0 &&
             ts_end_ == std::numeric_limits<int64_t>::max();
    }

    // Partition value of the row the statement is currently positioned on.
    // Only valid for partitioned tables (checked in debug builds).
    int64_t CursorPartition() const {
      PERFETTO_DCHECK(defn_->IsPartitioned());
      auto partition_idx = static_cast<int>(defn_->partition_idx());
      return sqlite3_column_int64(stmt_.get(), partition_idx);
    }

   private:
    // Kind of slice currently exposed by this query.
    enum Mode {
      kRealSlice,
      kShadowSlice,
    };

    int PrepareRawStmt();
    // Builds the SQL text for this query from the constraint strings |cs|.
    std::string CreateSqlQuery(const std::vector<std::string>& cs) const;

    // Timestamp of the row the statement is currently positioned on.
    int64_t CursorTs() const {
      auto ts_idx = static_cast<int>(defn_->ts_idx());
      return sqlite3_column_int64(stmt_.get(), ts_idx);
    }

    // Duration of the row the statement is currently positioned on.
    int64_t CursorDur() const {
      auto dur_idx = static_cast<int>(defn_->dur_idx());
      return sqlite3_column_int64(stmt_.get(), dur_idx);
    }

    std::string sql_query_;
    ScopedStmt stmt_;

    int64_t ts_start_ = 0;
    int64_t ts_end_ = 0;
    int64_t partition_ = std::numeric_limits<int64_t>::lowest();

    bool cursor_eof_ = false;
    Mode mode_ = Mode::kRealSlice;

    const TableDefinition* defn_ = nullptr;
    sqlite3* db_ = nullptr;
    SpanJoinOperatorTable* table_ = nullptr;
  };

  // Base class for a cursor on the span table.
  // Holds one Query per child table (t1_ and t2_).
  class Cursor : public SqliteTable::Cursor {
   public:
    Cursor(SpanJoinOperatorTable*, sqlite3* db);
    ~Cursor() override = default;

    // SqliteTable::Cursor implementation.
    int Filter(const QueryConstraints& qc, sqlite3_value** argv) override;
    int Column(sqlite3_context* context, int N) override;
    int Next() override;
    int Eof() override;

   private:
    // Copying is disabled; moving is defaulted but kept private.
    Cursor(const Cursor&) = delete;
    Cursor& operator=(const Cursor&) = delete;

    Cursor(Cursor&&) noexcept = default;
    Cursor& operator=(Cursor&&) = default;

    bool IsOverlappingSpan();
    Query::StepRet StepUntilRealSlice();

    Query t1_;
    Query t2_;
    // NOTE(review): presumably points at whichever of t1_/t2_ should be
    // stepped on the next call to Next() - confirm in the implementation.
    Query* next_stepped_ = nullptr;

    SpanJoinOperatorTable* table_;
  };

  SpanJoinOperatorTable(sqlite3*, const TraceStorage*);

  // Registers this table with the given database.
  static void RegisterTable(sqlite3* db, const TraceStorage* storage);

  // Table implementation.
  util::Status Init(int, const char* const*, SqliteTable::Schema*) override;
  std::unique_ptr<SqliteTable::Cursor> CreateCursor() override;
  int BestIndex(const QueryConstraints& qc, BestIndexInfo* info) override;

 private:
  // Identifier for a column by index in a given table.
  struct ColumnLocator {
    const TableDefinition* defn;
    size_t col_index;
  };

  // The join flavour is derived from the name the table was registered under.
  bool IsLeftJoin() const { return name() == "span_left_join"; }
  bool IsOuterJoin() const { return name() == "span_outer_join"; }

  // Partition column of the join: taken from the first table when it is
  // partitioned, otherwise from the second (which may be empty).
  const std::string& partition_col() const {
    return t1_defn_.IsPartitioned() ? t1_defn_.partition_col()
                                    : t2_defn_.partition_col();
  }

  // Fills |defn| for the table described by |desc|.
  util::Status CreateTableDefinition(
      const TableDescriptor& desc,
      bool emit_shadow_slices,
      SpanJoinOperatorTable::TableDefinition* defn);

  // Returns the constraint strings to apply to the child table |defn| for the
  // given query constraints and bound values.
  std::vector<std::string> ComputeSqlConstraintsForDefinition(
      const TableDefinition& defn,
      const QueryConstraints& qc,
      sqlite3_value** argv);

  // Returns the column name for |global_column| (an index into this table's
  // schema) with respect to |defn|.
  std::string GetNameForGlobalColumnIndex(const TableDefinition& defn,
                                          int global_column);

  // Adds schema columns for |defn| to |cols|.
  void CreateSchemaColsForDefn(const TableDefinition& defn,
                               std::vector<SqliteTable::Column>* cols);

  TableDefinition t1_defn_;
  TableDefinition t2_defn_;
  // Explicitly initialised so the member never holds an indeterminate value.
  PartitioningType partitioning_ = PartitioningType::kNoPartitioning;
  // Maps an index in this table's schema to the child table column backing it.
  std::unordered_map<size_t, ColumnLocator> global_index_to_column_locator_;

  sqlite3* const db_;
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_SPAN_JOIN_OPERATOR_TABLE_H_