Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.

Commit 807918c

Browse files
committed
Batch requests
Per https://cloud.google.com/monitoring/quotas we can include up to 200 TimeSeries per RPC. Batch them to limit the number of RPCs.
1 parent e5aea7c commit 807918c

File tree

2 files changed

+94
-84
lines changed

2 files changed

+94
-84
lines changed

stackdriver.go

Lines changed: 76 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,6 @@ func (s *Sink) deep() (time.Time, map[string]*gauge, map[string]*counter, map[st
234234
func (s *Sink) report(ctx context.Context) {
235235
end, rGauges, rCounters, rHistograms := s.deep()
236236

237-
if s.client == nil {
238-
return
239-
}
240-
241237
// https://cloud.google.com/monitoring/api/resources
242238
resource := &monitoredrespb.MonitoredResource{
243239
Type: "generic_task",
@@ -250,72 +246,56 @@ func (s *Sink) report(ctx context.Context) {
250246
},
251247
}
252248

249+
ts := []*monitoringpb.TimeSeries{}
250+
253251
for _, v := range rCounters {
254-
err := s.client.CreateTimeSeries(ctx, &monitoringpb.CreateTimeSeriesRequest{
255-
Name: fmt.Sprintf("projects/%s", s.taskInfo.ProjectID),
256-
TimeSeries: []*monitoringpb.TimeSeries{
257-
{
258-
Metric: &metricpb.Metric{
259-
Type: path.Join("custom.googleapis.com", "go-metrics", v.name.name),
260-
Labels: v.name.labelMap(),
252+
ts = append(ts, &monitoringpb.TimeSeries{
253+
Metric: &metricpb.Metric{
254+
Type: path.Join("custom.googleapis.com", "go-metrics", v.name.name),
255+
Labels: v.name.labelMap(),
256+
},
257+
MetricKind: metric.MetricDescriptor_GAUGE,
258+
Resource: resource,
259+
Points: []*monitoringpb.Point{
260+
&monitoringpb.Point{
261+
Interval: &monitoringpb.TimeInterval{
262+
EndTime: &googlepb.Timestamp{
263+
Seconds: end.Unix(),
264+
},
261265
},
262-
MetricKind: metric.MetricDescriptor_GAUGE,
263-
Resource: resource,
264-
Points: []*monitoringpb.Point{
265-
&monitoringpb.Point{
266-
Interval: &monitoringpb.TimeInterval{
267-
EndTime: &googlepb.Timestamp{
268-
Seconds: end.Unix(),
269-
},
270-
},
271-
Value: &monitoringpb.TypedValue{
272-
Value: &monitoringpb.TypedValue_DoubleValue{
273-
DoubleValue: v.value,
274-
},
275-
},
266+
Value: &monitoringpb.TypedValue{
267+
Value: &monitoringpb.TypedValue_DoubleValue{
268+
DoubleValue: v.value,
276269
},
277270
},
278271
},
279272
},
280273
})
281-
282-
if err != nil {
283-
log.Printf("Failed to write time series data: %v", err)
284-
}
285274
}
286275

287276
for _, v := range rGauges {
288-
err := s.client.CreateTimeSeries(ctx, &monitoringpb.CreateTimeSeriesRequest{
289-
Name: fmt.Sprintf("projects/%s", s.taskInfo.ProjectID),
290-
TimeSeries: []*monitoringpb.TimeSeries{
291-
{
292-
Metric: &metricpb.Metric{
293-
Type: path.Join("custom.googleapis.com", "go-metrics", v.name.name),
294-
Labels: v.name.labelMap(),
277+
ts = append(ts, &monitoringpb.TimeSeries{
278+
Metric: &metricpb.Metric{
279+
Type: path.Join("custom.googleapis.com", "go-metrics", v.name.name),
280+
Labels: v.name.labelMap(),
281+
},
282+
MetricKind: metric.MetricDescriptor_GAUGE,
283+
Resource: resource,
284+
Points: []*monitoringpb.Point{
285+
&monitoringpb.Point{
286+
Interval: &monitoringpb.TimeInterval{
287+
EndTime: &googlepb.Timestamp{
288+
Seconds: end.Unix(),
289+
},
295290
},
296-
MetricKind: metric.MetricDescriptor_GAUGE,
297-
Resource: resource,
298-
Points: []*monitoringpb.Point{
299-
&monitoringpb.Point{
300-
Interval: &monitoringpb.TimeInterval{
301-
EndTime: &googlepb.Timestamp{
302-
Seconds: end.Unix(),
303-
},
304-
},
305-
Value: &monitoringpb.TypedValue{
306-
Value: &monitoringpb.TypedValue_DoubleValue{
307-
DoubleValue: float64(v.value),
308-
},
309-
},
291+
Value: &monitoringpb.TypedValue{
292+
Value: &monitoringpb.TypedValue_DoubleValue{
293+
DoubleValue: float64(v.value),
310294
},
311295
},
312296
},
313297
},
314298
})
315-
316-
if err != nil {
317-
log.Printf("Failed to write time series data: %v", err)
318-
}
319299
}
320300

321301
for _, v := range rHistograms {
@@ -325,46 +305,58 @@ func (s *Sink) report(ctx context.Context) {
325305
count += int64(i)
326306
}
327307

328-
err := s.client.CreateTimeSeries(ctx, &monitoringpb.CreateTimeSeriesRequest{
329-
Name: fmt.Sprintf("projects/%s", s.taskInfo.ProjectID),
330-
TimeSeries: []*monitoringpb.TimeSeries{
331-
{
332-
Metric: &metricpb.Metric{
333-
Type: path.Join("custom.googleapis.com", "go-metrics", v.name.name),
334-
Labels: v.name.labelMap(),
308+
ts = append(ts, &monitoringpb.TimeSeries{
309+
Metric: &metricpb.Metric{
310+
Type: path.Join("custom.googleapis.com", "go-metrics", v.name.name),
311+
Labels: v.name.labelMap(),
312+
},
313+
MetricKind: metric.MetricDescriptor_CUMULATIVE,
314+
Resource: resource,
315+
Points: []*monitoringpb.Point{
316+
&monitoringpb.Point{
317+
Interval: &monitoringpb.TimeInterval{
318+
StartTime: &googlepb.Timestamp{
319+
Seconds: s.firstTime.Unix(),
320+
},
321+
EndTime: &googlepb.Timestamp{
322+
Seconds: end.Unix(),
323+
},
335324
},
336-
MetricKind: metric.MetricDescriptor_CUMULATIVE,
337-
Resource: resource,
338-
Points: []*monitoringpb.Point{
339-
&monitoringpb.Point{
340-
Interval: &monitoringpb.TimeInterval{
341-
StartTime: &googlepb.Timestamp{
342-
Seconds: s.firstTime.Unix(),
343-
},
344-
EndTime: &googlepb.Timestamp{
345-
Seconds: end.Unix(),
346-
},
347-
},
348-
Value: &monitoringpb.TypedValue{
349-
Value: &monitoringpb.TypedValue_DistributionValue{
350-
DistributionValue: &distributionpb.Distribution{
351-
BucketOptions: &distributionpb.Distribution_BucketOptions{
352-
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
353-
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
354-
Bounds: v.buckets,
355-
},
356-
},
325+
Value: &monitoringpb.TypedValue{
326+
Value: &monitoringpb.TypedValue_DistributionValue{
327+
DistributionValue: &distributionpb.Distribution{
328+
BucketOptions: &distributionpb.Distribution_BucketOptions{
329+
Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
330+
ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
331+
Bounds: v.buckets,
357332
},
358-
BucketCounts: v.counts,
359-
Count: count,
360333
},
361334
},
335+
BucketCounts: v.counts,
336+
Count: count,
362337
},
363338
},
364339
},
365340
},
366341
},
367342
})
343+
}
344+
345+
if s.client == nil {
346+
return
347+
}
348+
349+
for i := 0; i < len(ts); i += 200 {
350+
end := i + 200
351+
352+
if end > len(ts) {
353+
end = len(ts)
354+
}
355+
356+
err := s.client.CreateTimeSeries(ctx, &monitoringpb.CreateTimeSeriesRequest{
357+
Name: fmt.Sprintf("projects/%s", s.taskInfo.ProjectID),
358+
TimeSeries: ts[i:end],
359+
})
368360

369361
if err != nil {
370362
log.Printf("Failed to write time series data: %v", err)

stackdriver_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,24 @@ func TestSample(t *testing.T) {
224224
}
225225
},
226226
},
227+
{
228+
name: "batching",
229+
collect: func() {
230+
for i := 0; i < 300; i++ {
231+
ss.SetGauge([]string{"foo", fmt.Sprintf("%d", i)}, 50.0)
232+
}
233+
},
234+
createFn: func(t *testing.T) func(context.Context, *monitoringpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) {
235+
return func(_ context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) {
236+
// 300 TimeSeries were created, we expect 2 RPCs (first with 200 and second with 100 TS)
237+
if len(req.TimeSeries) == 200 || len(req.TimeSeries) == 100 {
238+
return &emptypb.Empty{}, nil
239+
}
240+
t.Errorf("unexpected CreateTimeSeriesRequest\ngot(# of TimeSeries): %v", len(req.TimeSeries))
241+
return nil, errors.New("unexpected CreateTimeSeriesRequest")
242+
}
243+
},
244+
},
227245
}
228246

229247
for _, tc := range tests {

0 commit comments

Comments
 (0)