summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuentin Cha <quentincha@google.com>2021-01-22 10:45:13 -0500
committerGitHub <noreply@github.com>2021-01-22 10:45:13 -0500
commit13369a4547045c729dd938a203157a208afc244a (patch)
treeb931af802fa414e8bdd635c77b3388912ebde6dc
parente736602bcaf5ed1e2191a730813ac2bc7caed9eb (diff)
downloadopencensus-go-13369a4547045c729dd938a203157a208afc244a.tar.gz
Adds an exported function to flush internal reader (#1248)
-rw-r--r--metric/metricexport/reader.go15
-rw-r--r--metric/metricexport/reader_test.go68
2 files changed, 81 insertions, 2 deletions
diff --git a/metric/metricexport/reader.go b/metric/metricexport/reader.go
index b920bac..8a09d0f 100644
--- a/metric/metricexport/reader.go
+++ b/metric/metricexport/reader.go
@@ -130,7 +130,7 @@ func (ir *IntervalReader) Start() error {
reportingInterval = ir.ReportingInterval
}
- if ir.done != nil {
+ if ir.quit != nil {
return errAlreadyStarted
}
ir.timer = time.NewTicker(reportingInterval)
@@ -172,6 +172,19 @@ func (ir *IntervalReader) Stop() {
ir.quit = nil
}
+// Flush flushes the metrics if IntervalReader is stopped, otherwise no-op.
+func (ir *IntervalReader) Flush() {
+ ir.mu.Lock()
+ defer ir.mu.Unlock()
+
+ // No-op if IntervalReader is not stopped
+ if ir.quit != nil {
+ return
+ }
+
+ ir.reader.ReadAndExport(ir.exporter)
+}
+
// ReadAndExport reads metrics from all producer registered with
// producer manager and then exports them using provided exporter.
func (r *Reader) ReadAndExport(exporter Exporter) {
diff --git a/metric/metricexport/reader_test.go b/metric/metricexport/reader_test.go
index 33313dc..61c915b 100644
--- a/metric/metricexport/reader_test.go
+++ b/metric/metricexport/reader_test.go
@@ -117,6 +117,69 @@ func TestManualReadForIntervalReader(t *testing.T) {
resetExporter(exporter1)
}
+func TestFlushNoOpForIntervalReader(t *testing.T) {
+ ir1 = createAndStart(exporter1, duration1, t)
+
+ gaugeEntry.Set(1)
+
+ // since IR is not stopped, flush does nothing
+ ir1.Flush()
+
+ // expect no data points
+ checkExportedCount(exporter1, 0, t)
+ checkExportedMetricDesc(exporter1, "active_request", t)
+ ir1.Stop()
+ resetExporter(exporter1)
+}
+
+func TestFlushAllowMultipleForIntervalReader(t *testing.T) {
+ ir1 = createAndStart(exporter1, duration1, t)
+
+ gaugeEntry.Set(1)
+
+ ir1.Stop()
+ ir1.Flush()
+
+ // metric is still coming in
+ gaugeEntry.Add(1)
+
+ // one more flush after IR stopped
+ ir1.Flush()
+
+ // expect 2 data point, one from each flush
+ checkExportedCount(exporter1, 2, t)
+ checkExportedValues(exporter1, []int64{1, 2}, t)
+ checkExportedMetricDesc(exporter1, "active_request", t)
+
+ resetExporter(exporter1)
+}
+
+func TestFlushRestartForIntervalReader(t *testing.T) {
+ ir1 = createAndStart(exporter1, duration1, t)
+
+ gaugeEntry.Set(1)
+ ir1.Stop()
+ ir1.Flush()
+
+ // restart the IR
+ err := ir1.Start()
+ if err != nil {
+ t.Fatalf("error starting reader %v\n", err)
+ }
+
+ gaugeEntry.Add(1)
+
+ ir1.Stop()
+ ir1.Flush()
+
+ // expect 2 data point, one from each flush
+ checkExportedCount(exporter1, 2, t)
+ checkExportedValues(exporter1, []int64{1, 2}, t)
+ checkExportedMetricDesc(exporter1, "active_request", t)
+
+ resetExporter(exporter1)
+}
+
func TestProducerWithIntervalReaderStop(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
ir1.Stop()
@@ -166,7 +229,10 @@ func TestIntervalReaderMultipleStop(t *testing.T) {
func TestIntervalReaderMultipleStart(t *testing.T) {
ir1 = createAndStart(exporter1, duration1, t)
- ir1.Start()
+ err := ir1.Start()
+ if err == nil {
+ t.Fatalf("expected error but got nil\n")
+ }
gaugeEntry.Add(1)