diff options
author | Quentin Cha <quentincha@google.com> | 2021-01-22 10:45:13 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-22 10:45:13 -0500 |
commit | 13369a4547045c729dd938a203157a208afc244a (patch) | |
tree | b931af802fa414e8bdd635c77b3388912ebde6dc | |
parent | e736602bcaf5ed1e2191a730813ac2bc7caed9eb (diff) | |
download | opencensus-go-13369a4547045c729dd938a203157a208afc244a.tar.gz |
Adds an exported function to flush internal reader (#1248)
-rw-r--r-- | metric/metricexport/reader.go | 15 | ||||
-rw-r--r-- | metric/metricexport/reader_test.go | 68 |
2 files changed, 81 insertions, 2 deletions
diff --git a/metric/metricexport/reader.go b/metric/metricexport/reader.go index b920bac..8a09d0f 100644 --- a/metric/metricexport/reader.go +++ b/metric/metricexport/reader.go @@ -130,7 +130,7 @@ func (ir *IntervalReader) Start() error { reportingInterval = ir.ReportingInterval } - if ir.done != nil { + if ir.quit != nil { return errAlreadyStarted } ir.timer = time.NewTicker(reportingInterval) @@ -172,6 +172,19 @@ func (ir *IntervalReader) Stop() { ir.quit = nil } +// Flush flushes the metrics if IntervalReader is stopped, otherwise no-op. +func (ir *IntervalReader) Flush() { + ir.mu.Lock() + defer ir.mu.Unlock() + + // No-op if IntervalReader is not stopped + if ir.quit != nil { + return + } + + ir.reader.ReadAndExport(ir.exporter) +} + // ReadAndExport reads metrics from all producer registered with // producer manager and then exports them using provided exporter. func (r *Reader) ReadAndExport(exporter Exporter) { diff --git a/metric/metricexport/reader_test.go b/metric/metricexport/reader_test.go index 33313dc..61c915b 100644 --- a/metric/metricexport/reader_test.go +++ b/metric/metricexport/reader_test.go @@ -117,6 +117,69 @@ func TestManualReadForIntervalReader(t *testing.T) { resetExporter(exporter1) } +func TestFlushNoOpForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + + // since IR is not stopped, flush does nothing + ir1.Flush() + + // expect no data points + checkExportedCount(exporter1, 0, t) + checkExportedMetricDesc(exporter1, "active_request", t) + ir1.Stop() + resetExporter(exporter1) +} + +func TestFlushAllowMultipleForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + + ir1.Stop() + ir1.Flush() + + // metric is still coming in + gaugeEntry.Add(1) + + // one more flush after IR stopped + ir1.Flush() + + // expect 2 data point, one from each flush + checkExportedCount(exporter1, 2, t) + checkExportedValues(exporter1, []int64{1, 2}, t) + checkExportedMetricDesc(exporter1, "active_request", t) + + resetExporter(exporter1) +} + +func TestFlushRestartForIntervalReader(t *testing.T) { + ir1 = createAndStart(exporter1, duration1, t) + + gaugeEntry.Set(1) + ir1.Stop() + ir1.Flush() + + // restart the IR + err := ir1.Start() + if err != nil { + t.Fatalf("error starting reader %v\n", err) + } + + gaugeEntry.Add(1) + + ir1.Stop() + ir1.Flush() + + // expect 2 data point, one from each flush + checkExportedCount(exporter1, 2, t) + checkExportedValues(exporter1, []int64{1, 2}, t) + checkExportedMetricDesc(exporter1, "active_request", t) + + resetExporter(exporter1) +} + func TestProducerWithIntervalReaderStop(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) ir1.Stop() @@ -166,7 +229,10 @@ func TestIntervalReaderMultipleStop(t *testing.T) { func TestIntervalReaderMultipleStart(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) - ir1.Start() + err := ir1.Start() + if err == nil { + t.Fatalf("expected error but got nil\n") + } gaugeEntry.Add(1) |