This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit f125450

Ingester limits (#523)
This brings ingester limits:
- Global & local maximum of active series
- Out-of-order rejection

These are intentionally tracked outside of the storage package so that the accounting is not reset when data is flushed.

Co-authored-by: Christian Simon <simon@swine.de>
1 parent e425b2a · commit f125450

18 files changed (+399, -61 lines)

docs/sources/operators-guide/configure/reference-configuration-parameters/index.md

Lines changed: 3 additions & 3 deletions
@@ -193,16 +193,16 @@ limits:
 
 # Maximum number of active series of profiles per tenant, per ingester. 0 to
 # disable.
-# CLI flag: -ingester.max-series-per-tenant
-[max_series_per_tenant: <int> | default = 0]
+# CLI flag: -ingester.max-local-series-per-tenant
+[max_local_series_per_tenant: <int> | default = 0]
 
 # Maximum number of active series of profiles per tenant, across the cluster.
 # 0 to disable. When the global limit is enabled, each ingester is configured
 # with a dynamic local limit based on the replication factor and the current
 # number of healthy ingesters, and is kept updated whenever the number of
 # ingesters change.
 # CLI flag: -ingester.max-global-series-per-tenant
-[max_global_series_per_user: <int> | default = 5000]
+[max_global_series_per_tenant: <int> | default = 5000]
 
 # Limit how far back in profiling data can be queried, up until lookback
 # duration ago. This limit is enforced in the query frontend. If the requested
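The two renamed options above feed the new Limits interface that the ingester consumes (defined in pkg/ingester/limiter.go further down). As a quick illustration of that mapping, a fixed-value implementation might look like the following sketch; the staticLimits type and its fields are assumptions for illustration, not the repository's actual overrides plumbing.

package main

import "fmt"

// staticLimits is a hypothetical, fixed-value implementation of the ingester
// Limits interface (MaxLocalSeriesPerTenant / MaxGlobalSeriesPerTenant).
type staticLimits struct {
	maxLocalSeriesPerTenant  int // max_local_series_per_tenant, 0 = disabled
	maxGlobalSeriesPerTenant int // max_global_series_per_tenant, 0 = disabled
}

func (s staticLimits) MaxLocalSeriesPerTenant(tenantID string) int {
	return s.maxLocalSeriesPerTenant
}

func (s staticLimits) MaxGlobalSeriesPerTenant(tenantID string) int {
	return s.maxGlobalSeriesPerTenant
}

func main() {
	l := staticLimits{maxGlobalSeriesPerTenant: 5000} // the documented default
	fmt.Println(l.MaxLocalSeriesPerTenant("tenant-a"), l.MaxGlobalSeriesPerTenant("tenant-a"))
}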

pkg/ingester/ingester.go

Lines changed: 19 additions & 5 deletions
@@ -24,6 +24,7 @@ import (
 	"github.com/grafana/phlare/pkg/tenant"
 	"github.com/grafana/phlare/pkg/usagestats"
 	"github.com/grafana/phlare/pkg/util"
+	"github.com/grafana/phlare/pkg/validation"
 )
 
 var activeTenantsStats = usagestats.NewInt("ingester_active_tenants")
@@ -57,7 +58,8 @@ type Ingester struct {
 	instances    map[string]*instance
 	instancesMtx sync.RWMutex
 
-	reg prometheus.Registerer
+	limits Limits
+	reg    prometheus.Registerer
 }
 
 type ingesterFlusherCompat struct {
@@ -71,7 +73,7 @@ func (i *ingesterFlusherCompat) Flush() {
 	}
 }
 
-func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storageBucket phlareobjstore.Bucket) (*Ingester, error) {
+func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storageBucket phlareobjstore.Bucket, limits Limits) (*Ingester, error) {
 	i := &Ingester{
 		cfg:       cfg,
 		phlarectx: phlarectx,
@@ -80,6 +82,7 @@ func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storag
 		instances:     map[string]*instance{},
 		dbConfig:      dbConfig,
 		storageBucket: storageBucket,
+		limits:        limits,
 	}
 
 	var err error
@@ -135,7 +138,8 @@ func (i *Ingester) GetOrCreateInstance(tenantID string) (*instance, error) { //n
 	inst, ok = i.instances[tenantID]
 	if !ok {
 		var err error
-		inst, err = newInstance(i.phlarectx, i.dbConfig, tenantID, i.storageBucket)
+
+		inst, err = newInstance(i.phlarectx, i.dbConfig, tenantID, i.storageBucket, NewLimiter(tenantID, i.limits, i.lifecycler, i.cfg.LifecyclerConfig.RingConfig.ReplicationFactor))
 		if err != nil {
 			return nil, err
 		}
@@ -184,16 +188,26 @@ func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushReq
 	level.Debug(instance.logger).Log("msg", "message received by ingester push")
 	for _, series := range req.Msg.Series {
 		for _, sample := range series.Samples {
-			p, err := pprof.FromBytes(sample.RawProfile)
+			p, size, err := pprof.FromBytes(sample.RawProfile)
 			if err != nil {
 				return nil, err
 			}
-
 			id, err := uuid.Parse(sample.ID)
 			if err != nil {
 				return nil, err
 			}
 			if err := instance.Head().Ingest(ctx, p, id, series.Labels...); err != nil {
+				reason := validation.ReasonOf(err)
+				if reason != validation.Unknown {
+					validation.DiscardedProfiles.WithLabelValues(string(reason), instance.tenantID).Add(float64(1))
+					validation.DiscardedBytes.WithLabelValues(string(reason), instance.tenantID).Add(float64(size))
+					switch validation.ReasonOf(err) {
+					case validation.OutOfOrder:
+						return nil, connect.NewError(connect.CodeInvalidArgument, err)
+					case validation.SeriesLimit:
+						return nil, connect.NewError(connect.CodeResourceExhausted, err)
+					}
+				}
 				return nil, err
 			}
 			p.ReturnToVTPool()
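With this change Push maps the two validation reasons onto distinct connect error codes: out-of-order profiles become CodeInvalidArgument (retrying the same payload can never succeed), while hitting the series limit becomes CodeResourceExhausted (worth retrying after backoff). A minimal sketch of how a caller could branch on those codes, assuming the same connect-go module the ingester imports (github.com/bufbuild/connect-go); handlePushError is an illustrative helper, not part of the repository:

package main

import (
	"errors"
	"fmt"

	"github.com/bufbuild/connect-go"
)

// handlePushError decides whether a failed push is worth retrying,
// based on the connect code set by the ingester.
func handlePushError(err error) {
	if err == nil {
		return
	}
	switch connect.CodeOf(err) {
	case connect.CodeInvalidArgument:
		fmt.Println("drop profile, it is out of order:", err)
	case connect.CodeResourceExhausted:
		fmt.Println("series limit hit, back off and retry later:", err)
	default:
		fmt.Println("unexpected push failure:", err)
	}
}

func main() {
	handlePushError(connect.NewError(connect.CodeResourceExhausted, errors.New("max series reached")))
}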

pkg/ingester/ingester_test.go

Lines changed: 7 additions & 6 deletions
@@ -57,12 +57,13 @@ func Test_MultitenantReadWrite(t *testing.T) {
 	reg := prometheus.NewRegistry()
 	ctx := phlarecontext.WithLogger(context.Background(), logger)
 	ctx = phlarecontext.WithRegistry(ctx, reg)
-	cfg := client.Config{StorageBackendConfig: client.StorageBackendConfig{
-		Backend: client.Filesystem,
-		Filesystem: filesystem.Config{
-			Directory: dbPath,
+	cfg := client.Config{
+		StorageBackendConfig: client.StorageBackendConfig{
+			Backend: client.Filesystem,
+			Filesystem: filesystem.Config{
+				Directory: dbPath,
+			},
 		},
-	},
 	}
 
 	fs, err := client.NewBucket(ctx, cfg, "storage")
@@ -71,7 +72,7 @@ func Test_MultitenantReadWrite(t *testing.T) {
 	ing, err := New(ctx, defaultIngesterTestConfig(t), phlaredb.Config{
 		DataPath:         dbPath,
 		MaxBlockDuration: 30 * time.Hour,
-	}, fs)
+	}, fs, &fakeLimits{})
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
 
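The updated test passes &fakeLimits{} as the new Limits argument to New. The fakeLimits definition is not part of this excerpt; a minimal stand-in satisfying the interface could look like this sketch (the field names are assumptions):

package ingester

// fakeLimits is a hypothetical test double for the Limits interface;
// the real one in the repository may carry different fields.
type fakeLimits struct {
	maxLocalSeries  int
	maxGlobalSeries int
}

func (f *fakeLimits) MaxLocalSeriesPerTenant(tenantID string) int  { return f.maxLocalSeries }
func (f *fakeLimits) MaxGlobalSeriesPerTenant(tenantID string) int { return f.maxGlobalSeries }

With the zero values, both limits are disabled (0), so the existing test behaviour is unchanged.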
pkg/ingester/instance.go

Lines changed: 6 additions & 4 deletions
@@ -24,15 +24,16 @@ type instance struct {
 	logger log.Logger
 	reg    prometheus.Registerer
 
-	cancel context.CancelFunc
-	wg     sync.WaitGroup
+	cancel   context.CancelFunc
+	wg       sync.WaitGroup
+	tenantID string
 }
 
-func newInstance(phlarectx context.Context, cfg phlaredb.Config, tenantID string, storageBucket phlareobjstore.Bucket) (*instance, error) {
+func newInstance(phlarectx context.Context, cfg phlaredb.Config, tenantID string, storageBucket phlareobjstore.Bucket, limiter Limiter) (*instance, error) {
 	cfg.DataPath = path.Join(cfg.DataPath, tenantID)
 
 	phlarectx = phlarecontext.WrapTenant(phlarectx, tenantID)
-	db, err := phlaredb.New(phlarectx, cfg)
+	db, err := phlaredb.New(phlarectx, cfg, limiter)
 	if err != nil {
 		return nil, err
 	}
@@ -42,6 +43,7 @@ func newInstance(phlarectx context.Context, cfg phlaredb.Config, tenantID string
 		logger:   phlarecontext.Logger(phlarectx),
 		reg:      phlarecontext.Registry(phlarectx),
 		cancel:   cancel,
+		tenantID: tenantID,
 	}
 	if storageBucket != nil {
 		inst.shipper = shipper.New(
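newInstance now threads the per-tenant Limiter into phlaredb.New, so the storage layer can reject a profile before anything is written, while the accounting itself lives outside the storage package and survives flushes. How phlaredb wires this in is not shown in this excerpt; a rough sketch of the idea, with Head and ingest as stand-ins rather than the actual phlaredb types:

package sketch

import (
	"github.com/prometheus/common/model"

	phlaremodel "github.com/grafana/phlare/pkg/model"
)

// Limiter mirrors the interface from pkg/ingester/limiter.go.
type Limiter interface {
	AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error
	Stop()
}

// Head is a stand-in for the phlaredb head block; the real type differs.
type Head struct {
	limiter Limiter
}

// ingest asks the limiter first, so limit accounting is kept outside the
// storage package and is not reset when the head is flushed to a block.
func (h *Head) ingest(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error {
	if err := h.limiter.AllowProfile(fp, lbs, tsNano); err != nil {
		return err
	}
	// ... append the profile to the head ...
	return nil
}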

pkg/ingester/limiter.go

Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@
+package ingester
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/prometheus/common/model"
+
+	phlaremodel "github.com/grafana/phlare/pkg/model"
+	"github.com/grafana/phlare/pkg/validation"
+)
+
+var (
+	activeSeriesTimeout = 10 * time.Minute
+	activeSeriesCleanup = time.Minute
+)
+
+type RingCount interface {
+	HealthyInstancesCount() int
+}
+
+type Limits interface {
+	MaxLocalSeriesPerTenant(tenantID string) int
+	MaxGlobalSeriesPerTenant(tenantID string) int
+}
+
+type Limiter interface {
+	// AllowProfile returns an error if the profile is not allowed to be ingested.
+	// The error is a validation error and can be out of order or max series limit reached.
+	AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error
+	Stop()
+}
+
+type limiter struct {
+	limits            Limits
+	ring              RingCount
+	replicationFactor int
+	tenantID          string
+
+	activeSeries  map[model.Fingerprint]int64
+	lastTimestamp map[model.Fingerprint]int64
+
+	mtx sync.Mutex // todo: may be shard the lock to avoid latency spikes.
+
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+}
+
+func NewLimiter(tenantID string, limits Limits, ring RingCount, replicationFactor int) Limiter {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	l := &limiter{
+		tenantID:          tenantID,
+		limits:            limits,
+		ring:              ring,
+		replicationFactor: replicationFactor,
+		activeSeries:      map[model.Fingerprint]int64{},
+		lastTimestamp:     map[model.Fingerprint]int64{},
+		cancel:            cancel,
+		ctx:               ctx,
+	}
+
+	l.wg.Add(1)
+	go l.loop()
+
+	return l
+}
+
+func (l *limiter) Stop() {
+	l.cancel()
+	l.wg.Wait()
+}
+
+func (l *limiter) loop() {
+	defer l.wg.Done()
+
+	ticker := time.NewTicker(activeSeriesCleanup)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			l.cleanup()
+		case <-l.ctx.Done():
+			return
+		}
+	}
+}
+
+// cleanup removes the series that have not been used for a while.
+func (l *limiter) cleanup() {
+	now := time.Now().UnixNano()
+	l.mtx.Lock()
+	defer l.mtx.Unlock()
+
+	for fp, lastUsed := range l.activeSeries {
+		if now-lastUsed > int64(activeSeriesTimeout) {
+			delete(l.activeSeries, fp)
+		}
+	}
+}
+
+func (l *limiter) AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error {
+	l.mtx.Lock()
+	defer l.mtx.Unlock()
+	if err := l.allowNewProfile(fp, lbs, tsNano); err != nil {
+		return err
+	}
+	return l.allowNewSeries(fp)
+}
+
+func (l *limiter) allowNewProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error {
+	max, ok := l.lastTimestamp[fp]
+	if ok {
+		// profile is before the last timestamp
+		if tsNano < max {
+			return validation.NewErrorf(validation.OutOfOrder, "profile for series %s out of order (received %s last %s)", phlaremodel.LabelPairsString(lbs), time.Unix(0, tsNano), time.Unix(0, max))
+		}
+	}
+
+	// set the last timestamp
+	l.lastTimestamp[fp] = tsNano
+	return nil
+}
+
+func (l *limiter) allowNewSeries(fp model.Fingerprint) error {
+	_, ok := l.activeSeries[fp]
+	series := len(l.activeSeries)
+	if !ok {
+		// can this series be added?
+		if err := l.assertMaxSeriesPerUser(l.tenantID, series); err != nil {
+			return err
+		}
+	}
+
+	// update time or add it
+	l.activeSeries[fp] = time.Now().UnixNano()
+	return nil
+}
+
+func (l *limiter) assertMaxSeriesPerUser(tenantID string, series int) error {
+	// Start by setting the local limit either from override or default
+	localLimit := l.limits.MaxLocalSeriesPerTenant(tenantID)
+
+	// We can assume that series are evenly distributed across ingesters
+	// so we do convert the global limit into a local limit
+	globalLimit := l.limits.MaxGlobalSeriesPerTenant(tenantID)
+	adjustedGlobalLimit := convertGlobalToLocalLimit(globalLimit, l.ring, l.replicationFactor)
+
+	// Set the calculated limit to the lesser of the local limit or the new calculated global limit
+	calculatedLimit := minNonZero(localLimit, adjustedGlobalLimit)
+
+	// If both the local and global limits are disabled, we just
+	// use the largest int value
+	if calculatedLimit == 0 {
+		return nil
+	}
+
+	if series < calculatedLimit {
+		return nil
+	}
+	return validation.NewErrorf(validation.SeriesLimit, validation.SeriesLimitErrorMsg, series, calculatedLimit)
+}
+
+func convertGlobalToLocalLimit(globalLimit int, ringCount RingCount, replicationFactor int) int {
+	if globalLimit == 0 {
+		return 0
+	}
+
+	// Given we don't need a super accurate count (ie. when the ingesters
+	// topology changes) and we prefer to always be in favor of the tenant,
+	// we can use a per-ingester limit equal to:
+	// (global limit / number of ingesters) * replication factor
+	numIngesters := ringCount.HealthyInstancesCount()
+
+	// May happen because the number of ingesters is asynchronously updated.
+	// If happens, we just temporarily ignore the global limit.
+	if numIngesters > 0 {
+		return int((float64(globalLimit) / float64(numIngesters)) * float64(replicationFactor))
+	}
+
+	return 0
+}
+
+func minNonZero(first, second int) int {
+	if first == 0 || (second != 0 && first > second) {
+		return second
+	}
+
+	return first
+}
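To make the comment in convertGlobalToLocalLimit concrete, here is a small worked example in test form; it is a sketch in package ingester so it can reach the unexported helpers, and the staticRing stub is an assumption standing in for the ring's RingCount dependency:

package ingester

import "testing"

// staticRing reports a fixed number of healthy ingesters.
type staticRing int

func (s staticRing) HealthyInstancesCount() int { return int(s) }

func TestConvertGlobalToLocalLimitSketch(t *testing.T) {
	// Global limit 6000, 4 healthy ingesters, replication factor 3:
	// (6000 / 4) * 3 = 4500 active series allowed on each ingester.
	if got := convertGlobalToLocalLimit(6000, staticRing(4), 3); got != 4500 {
		t.Fatalf("expected 4500, got %d", got)
	}
	// With no healthy ingesters reported yet, the global limit is temporarily ignored.
	if got := convertGlobalToLocalLimit(6000, staticRing(0), 3); got != 0 {
		t.Fatalf("expected 0, got %d", got)
	}
	// The effective limit is the lesser non-zero of the local and adjusted global limits.
	if got := minNonZero(2000, 4500); got != 2000 {
		t.Fatalf("expected 2000, got %d", got)
	}
	if got := minNonZero(0, 4500); got != 4500 {
		t.Fatalf("expected 4500, got %d", got)
	}
}

In other words, with the documented default max_global_series_per_tenant of 5000 and three-way replication, each of ten healthy ingesters would locally allow 1500 active series for that tenant.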
