@@ -6,12 +6,16 @@ import (
66 "flag"
77 "fmt"
88 "hash/fnv"
9+ "net/http"
910 "strconv"
1011 "time"
1112
1213 "github.com/bufbuild/connect-go"
14+ "github.com/dustin/go-humanize"
1315 "github.com/go-kit/log"
1416 "github.com/google/uuid"
17+ "github.com/grafana/dskit/kv"
18+ "github.com/grafana/dskit/limiter"
1519 "github.com/grafana/dskit/ring"
1620 ring_client "github.com/grafana/dskit/ring/client"
1721 "github.com/grafana/dskit/services"
@@ -29,12 +33,23 @@ import (
2933 "github.com/grafana/phlare/pkg/pprof"
3034 "github.com/grafana/phlare/pkg/tenant"
3135 "github.com/grafana/phlare/pkg/usagestats"
36+ "github.com/grafana/phlare/pkg/util"
37+ "github.com/grafana/phlare/pkg/validation"
3238)
3339
3440type PushClient interface {
3541 Push (context.Context , * connect.Request [pushv1.PushRequest ]) (* connect.Response [pushv1.PushResponse ], error )
3642}
3743
+const (
+	// distributorRingKey is the key under which we store the distributors ring in the KVStore.
+	distributorRingKey = "distributor"
+
+	// ringAutoForgetUnhealthyPeriods is the number of consecutive heartbeat-timeout periods
+	// after which an unhealthy instance is automatically forgotten (removed) from the ring.
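+	// For example, with a 1m heartbeat timeout, an unhealthy instance is removed after 10m.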
+	ringAutoForgetUnhealthyPeriods = 10
+)
+
 // todo: move to non global metrics.
 var (
 	clients = promauto.NewGauge(prometheus.GaugeOpts{
@@ -52,12 +67,16 @@ var (
 type Config struct {
 	PushTimeout time.Duration
 	PoolConfig  clientpool.PoolConfig `yaml:"pool_config,omitempty"`
+
+	// Distributors ring
+	DistributorRing RingConfig `yaml:"ring" doc:"hidden"`
 }
 
 // RegisterFlags registers distributor-related flags.
 func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	cfg.PoolConfig.RegisterFlagsWithPrefix("distributor", fs)
 	fs.DurationVar(&cfg.PushTimeout, "distributor.push.timeout", 5*time.Second, "Timeout when pushing data to ingester.")
+	cfg.DistributorRing.RegisterFlags(fs)
 }
 
 // Distributor coordinates replication and distribution of received profiles.
@@ -66,30 +85,64 @@ type Distributor struct {
 	logger log.Logger
 
 	cfg           Config
+	limits        Limits
 	ingestersRing ring.ReadRing
 	pool          *ring_client.Pool
 
+	// The global rate limiter requires a distributors ring to count
+	// the number of healthy instances.
+	distributorsLifecycler *ring.BasicLifecycler
+	distributorsRing       *ring.Ring
+	healthyInstancesCount  *atomic.Uint32
+	ingestionRateLimiter   *limiter.RateLimiter
+
 	subservices        *services.Manager
 	subservicesWatcher *services.FailureWatcher
 
 	metrics *metrics
 }
 
-func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) {
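+// Limits is the subset of per-tenant limits the distributor needs in order to
+// validate and rate-limit incoming profiles.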
+type Limits interface {
+	IngestionRateBytes(tenantID string) float64
+	IngestionBurstSizeBytes(tenantID string) int
+	MaxLabelNameLength(userID string) int
+	MaxLabelValueLength(userID string) int
+	MaxLabelNamesPerSeries(userID string) int
+}
+
+func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, limits Limits, reg prometheus.Registerer, logger log.Logger, clientsOptions ...connect.ClientOption) (*Distributor, error) {
 	d := &Distributor{
-		cfg:           cfg,
-		logger:        logger,
-		ingestersRing: ingestersRing,
-		pool:          clientpool.NewPool(cfg.PoolConfig, ingestersRing, factory, clients, logger, clientsOptions...),
-		metrics:       newMetrics(reg),
+		cfg:                   cfg,
+		logger:                logger,
+		ingestersRing:         ingestersRing,
+		pool:                  clientpool.NewPool(cfg.PoolConfig, ingestersRing, factory, clients, logger, clientsOptions...),
+		metrics:               newMetrics(reg),
+		healthyInstancesCount: atomic.NewUint32(0),
+		limits:                limits,
 	}
 	var err error
-	d.subservices, err = services.NewManager(d.pool)
+
+	subservices := []services.Service(nil)
+	subservices = append(subservices, d.pool)
+
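+	// The distributors ring does not shard incoming data; it only tracks distributor
+	// instances so the global rate limit can be split between them.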
+	distributorsRing, distributorsLifecycler, err := newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, logger, reg)
+	if err != nil {
+		return nil, err
+	}
+
+	subservices = append(subservices, distributorsLifecycler, distributorsRing)
+
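+	// Each tenant's effective limit is the global limit divided by the number of
+	// healthy distributors, re-evaluated every 10 seconds (see HealthyInstancesCount).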
+	d.ingestionRateLimiter = limiter.NewRateLimiter(newGlobalRateStrategy(newIngestionRateStrategy(limits), d), 10*time.Second)
+	d.distributorsLifecycler = distributorsLifecycler
+	d.distributorsRing = distributorsRing
+
+	d.subservices, err = services.NewManager(subservices...)
 	if err != nil {
 		return nil, errors.Wrap(err, "services manager")
 	}
 	d.subservicesWatcher = services.NewFailureWatcher()
 	d.subservicesWatcher.WatchManager(d.subservices)
+
 	d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
 	rfStats.Set(int64(ingestersRing.ReplicationFactor()))
 	return d, nil
@@ -115,29 +168,37 @@ func (d *Distributor) stopping(_ error) error {
 func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
 	tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
 	if err != nil {
-		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+		return nil, connect.NewError(connect.CodeUnauthenticated, err)
 	}
 	var (
-		keys     = make([]uint32, 0, len(req.Msg.Series))
-		profiles = make([]*profileTracker, 0, len(req.Msg.Series))
+		keys                       = make([]uint32, 0, len(req.Msg.Series))
+		profiles                   = make([]*profileTracker, 0, len(req.Msg.Series))
+		totalPushUncompressedBytes int64
+		totalProfiles              int64
 	)
 
 	for _, series := range req.Msg.Series {
+		// include the label names and values in the uncompressed size calculation
+		for _, lbs := range series.Labels {
+			totalPushUncompressedBytes += int64(len(lbs.Name))
+			totalPushUncompressedBytes += int64(len(lbs.Value))
+		}
 		keys = append(keys, TokenFor(tenantID, labelsString(series.Labels)))
 		profName := phlaremodel.Labels(series.Labels).Get(scrape.ProfileName)
 		for _, raw := range series.Samples {
 			usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1)
 			profileReceivedStats.Inc(1)
 			bytesReceivedTotalStats.Inc(int64(len(raw.RawProfile)))
 			bytesReceivedStats.Record(float64(len(raw.RawProfile)))
-			d.metrics.receivedCompressedBytes.WithLabelValues(profName).Observe(float64(len(raw.RawProfile)))
+			totalProfiles++
+			d.metrics.receivedCompressedBytes.WithLabelValues(profName, tenantID).Observe(float64(len(raw.RawProfile)))
 			p, err := pprof.RawFromBytes(raw.RawProfile)
 			if err != nil {
-				return nil, err
+				return nil, connect.NewError(connect.CodeInvalidArgument, err)
 			}
-			d.metrics.receivedDecompressedBytes.WithLabelValues(profName).Observe(float64(p.SizeBytes()))
-			d.metrics.receivedSamples.WithLabelValues(profName).Observe(float64(len(p.Sample)))
-
+			d.metrics.receivedDecompressedBytes.WithLabelValues(profName, tenantID).Observe(float64(p.SizeBytes()))
+			d.metrics.receivedSamples.WithLabelValues(profName, tenantID).Observe(float64(len(p.Sample)))
+			totalPushUncompressedBytes += int64(p.SizeBytes())
 			p.Normalize()
 
 			// zip the data back into the buffer
@@ -153,6 +214,24 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push
 		profiles = append(profiles, &profileTracker{profile: series})
 	}
 
+	// validate series labels; the whole request is rejected on the first invalid series
+	for _, series := range req.Msg.Series {
+		if err := validation.ValidateLabels(d.limits, tenantID, series.Labels); err != nil {
+			validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalProfiles))
+			validation.DiscardedBytes.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(totalPushUncompressedBytes))
+			return nil, connect.NewError(connect.CodeInvalidArgument, err)
+		}
+	}
+
+	// rate limit the whole request by its total uncompressed size
+	if !d.ingestionRateLimiter.AllowN(time.Now(), tenantID, int(totalPushUncompressedBytes)) {
+		validation.DiscardedProfiles.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(totalProfiles))
+		validation.DiscardedBytes.WithLabelValues(string(validation.RateLimited), tenantID).Add(float64(totalPushUncompressedBytes))
+		return nil, connect.NewError(connect.CodeResourceExhausted,
+			fmt.Errorf("push rate limit (%s) exceeded while adding %s", humanize.Bytes(uint64(d.limits.IngestionRateBytes(tenantID))), humanize.Bytes(uint64(totalPushUncompressedBytes))),
+		)
+	}
+
 	const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
 	var descs [maxExpectedReplicationSet]ring.InstanceDesc
158237
@@ -245,6 +324,35 @@ func (d *Distributor) sendProfilesErr(ctx context.Context, ingester ring.Instanc
245324 return err
246325}
247326
327+ func (d * Distributor ) ServeHTTP (w http.ResponseWriter , req * http.Request ) {
328+ if d .distributorsRing != nil {
329+ d .distributorsRing .ServeHTTP (w , req )
330+ } else {
331+ ringNotEnabledPage := `
332+ <!DOCTYPE html>
333+ <html>
334+ <head>
335+ <meta charset="UTF-8">
336+ <title>Distributor Status</title>
337+ </head>
338+ <body>
339+ <h1>Distributor Status</h1>
340+ <p>Distributor is not running with global limits enabled</p>
341+ </body>
342+ </html>`
343+ util .WriteHTMLResponse (w , ringNotEnabledPage )
344+ }
345+ }
346+
+// HealthyInstancesCount implements the ReadLifecycler interface.
+//
+// We use a ring lifecycler delegate to count the number of members of the
+// ring. The count is then used to enforce rate limiting correctly for each
+// distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES
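+// For example, a 10 MB/s global limit shared by 5 healthy distributors lets
+// each distributor locally admit up to 2 MB/s for that tenant.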
+func (d *Distributor) HealthyInstancesCount() int {
+	return int(d.healthyInstancesCount.Load())
+}
+
 type profileTracker struct {
 	profile    *pushv1.RawProfileSeries
 	minSuccess int
@@ -283,3 +391,35 @@ func TokenFor(tenantID, labels string) uint32 {
 	_, _ = h.Write([]byte(labels))
 	return h.Sum32()
 }
+
+// newRingAndLifecycler creates a new distributors ring and lifecycler with all required lifecycler delegates.
+func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
+	reg = prometheus.WrapRegistererWithPrefix("phlare_", reg)
+	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
+	}
+
+	lifecyclerCfg, err := cfg.ToBasicLifecyclerConfig(logger)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to build distributors' lifecycler config")
+	}
+
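+	// Delegates compose inside-out: each constructor wraps the previous one, so ring
+	// events pass through auto-forget, leave-on-stopping, and the healthy-instance
+	// counter before reaching the register delegate.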
+	var delegate ring.BasicLifecyclerDelegate
+	delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, lifecyclerCfg.NumTokens)
+	delegate = newHealthyInstanceDelegate(instanceCount, cfg.HeartbeatTimeout, delegate)
+	delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
+	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)
+
+	distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", distributorRingKey, kvStore, delegate, logger, reg)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
+	}
+
+	distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", distributorRingKey, logger, reg)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
+	}
+
+	return distributorsRing, distributorsLifecycler, nil
+}