This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit faf62ed: Flush profiles rowgroup asynchronously (#760)
1 parent 782a320

3 files changed: 128 additions, 61 deletions
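The change moves the row-group cut off the ingest path: instead of calling cutRowGroup inline, ingest signals a background goroutine over an unbuffered channel, and an atomic flag keeps more than one flush from being queued at a time. A minimal, self-contained sketch of the pattern follows (names are illustrative, and it uses the stdlib sync/atomic Bool rather than the go.uber.org/atomic one the store holds):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// asyncFlusher mirrors the shape added to profileStore: an unbuffered
// queue to request a flush of the first n buffered rows, a WaitGroup to
// join the worker on shutdown, and a flag so only one flush is in flight.
type asyncFlusher struct {
	queue    chan int
	wg       sync.WaitGroup
	once     sync.Once
	flushing atomic.Bool
}

func newAsyncFlusher() *asyncFlusher {
	f := &asyncFlusher{queue: make(chan int)}
	f.wg.Add(1)
	go f.loop()
	return f
}

// loop plays the role of cutRowGroupLoop: it drains flush requests
// until the queue is closed.
func (f *asyncFlusher) loop() {
	defer f.wg.Done()
	for n := range f.queue {
		fmt.Println("cutting row group of rows:", n) // stand-in for cutRowGroup(n)
		f.flushing.Store(false)
	}
}

// maybeFlush is the ingest-side trigger. In the real store this runs
// under profilesLock, which is what makes the Load/Store pair race-free.
func (f *asyncFlusher) maybeFlush(buffered int) {
	if !f.flushing.Load() {
		f.flushing.Store(true)
		f.queue <- buffered // blocks only until the worker picks it up
	}
}

// close shuts the queue exactly once and waits for the worker to exit.
func (f *asyncFlusher) close() {
	f.once.Do(func() { close(f.queue) })
	f.wg.Wait()
}

func main() {
	f := newAsyncFlusher()
	f.maybeFlush(128)
	f.close()
}

Because the queue is unbuffered, ingest blocks only for the handoff itself; profiles arriving while a cut is running keep accumulating in memory and are covered by the next trigger.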

pkg/phlaredb/profile_store.go (98 additions, 44 deletions)
@@ -53,16 +53,25 @@ type profileStore struct {
 	rowsFlushed uint64
 	rowGroups   []*rowGroupOnDisk
 	index       *profilesIndex
+
+	flushing    *atomic.Bool
+	flushQueue  chan int // channel to signal that a flush is needed for slice[:n]
+	closeOnce   sync.Once
+	flushWg     sync.WaitGroup
+	flushBuffer []*schemav1.Profile
 }
 
 func newProfileStore(phlarectx context.Context) *profileStore {
 	s := &profileStore{
-		logger:    phlarecontext.Logger(phlarectx),
-		metrics:   contextHeadMetrics(phlarectx),
-		persister: &schemav1.ProfilePersister{},
-		helper:    &profilesHelper{},
-	}
-
+		logger:     phlarecontext.Logger(phlarectx),
+		metrics:    contextHeadMetrics(phlarectx),
+		persister:  &schemav1.ProfilePersister{},
+		helper:     &profilesHelper{},
+		flushing:   atomic.NewBool(false),
+		flushQueue: make(chan int),
+	}
+	s.flushWg.Add(1)
+	go s.cutRowGroupLoop()
 	// Initialize writer on /dev/null
 	// TODO: Reuse parquet.Writer beyond life time of the head.
 	s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(),
@@ -91,6 +100,10 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetric
 	if err := s.Close(); err != nil {
 		return err
 	}
+	s.flushQueue = make(chan int)
+	s.closeOnce = sync.Once{}
+	s.flushWg.Add(1)
+	go s.cutRowGroupLoop()
 
 	// create index
 	s.index, err = newProfileIndex(32, s.metrics)
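Init re-creates flushQueue and replaces closeOnce before starting a new worker: a sync.Once that has fired can never fire again, so re-arming it requires swapping in a fresh value. A tiny demonstration of that semantics (hypothetical snippet, not part of the commit):

	var once sync.Once
	once.Do(func() { fmt.Println("runs") })
	once.Do(func() { fmt.Println("skipped: once has already fired") })
	once = sync.Once{} // re-arm by replacement, as Init does with s.closeOnce
	once.Do(func() { fmt.Println("runs again") })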
@@ -110,6 +123,13 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetric
 }
 
 func (s *profileStore) Close() error {
+	if s.flushQueue != nil {
+		s.closeOnce.Do(func() {
+			close(s.flushQueue)
+		})
+
+		s.flushWg.Wait()
+	}
 	return nil
 }
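Flush also calls Close, so shutdown must be idempotent. Closing an already-closed Go channel panics, which is exactly what the sync.Once guard prevents; the nil check additionally keeps Close safe on a store that was never initialized. For illustration:

	ch := make(chan int)
	close(ch)
	// close(ch) // would panic: "close of closed channel"; hence s.closeOnce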

@@ -121,34 +141,39 @@ func (s *profileStore) RowGroups() (rowGroups []parquet.RowGroup) {
 	return rowGroups
 }
 
-func (s *profileStore) profileSort(i, j int) bool {
-	// first compare the labels, if they don't match return
-	var (
-		pI   = s.slice[i]
-		pJ   = s.slice[j]
-		lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs
-		lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs
-	)
-	if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 {
-		return cmp < 0
-	}
+func (s *profileStore) sortProfile(slice []*schemav1.Profile) {
+	sort.Slice(slice, func(i, j int) bool {
+		// first compare the labels, if they don't match return
+		var (
+			pI   = slice[i]
+			pJ   = slice[j]
+			lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs
+			lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs
+		)
+		if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 {
+			return cmp < 0
+		}
 
-	// then compare timenanos, if they don't match return
-	if pI.TimeNanos < pJ.TimeNanos {
-		return true
-	} else if pI.TimeNanos > pJ.TimeNanos {
-		return false
-	}
+		// then compare timenanos, if they don't match return
+		if pI.TimeNanos < pJ.TimeNanos {
+			return true
+		} else if pI.TimeNanos > pJ.TimeNanos {
+			return false
+		}
 
-	// finally use ID as tie breaker
-	return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0
+		// finally use ID as tie breaker
+		return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0
+	})
 }
 
 // Flush writes row groups and the index to files on disk.
 // The call is thread-safe for reading but adding new profiles
 // should not be allowed during and after the call.
 func (s *profileStore) Flush(ctx context.Context) (numRows uint64, numRowGroups uint64, err error) {
-	if err = s.cutRowGroup(); err != nil {
+	if err := s.Close(); err != nil {
+		return 0, 0, err
+	}
+	if err = s.cutRowGroup(len(s.slice)); err != nil {
 		return 0, 0, err
 	}
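Note the ordering in the new Flush: Close stops cutRowGroupLoop and waits for it, and only then is the remainder of s.slice cut synchronously; otherwise the final cut could race a flush already queued by ingest. In terms of the hypothetical asyncFlusher sketch above:

	// finalFlush: stop the async worker first, then run the last cut inline.
	func (f *asyncFlusher) finalFlush(remaining int) {
		f.close()                                    // worker has exited; nothing can race the cut
		fmt.Println("final cut of rows:", remaining) // stand-in for s.cutRowGroup(len(s.slice))
	}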

@@ -216,17 +241,16 @@ func (s *profileStore) prepareFile(path string) (f *os.File, err error) {
 // See index.cutRowGroup: we could find a way to not flush all the in-memory
 // profiles, including ones added since the start of the call, but only those
 // that were added before certain point (this call). The same for s.slice.
-func (s *profileStore) cutRowGroup() (err error) {
+func (s *profileStore) cutRowGroup(count int) (err error) {
 	// if cutRowGroup fails record it as failed segment
 	defer func() {
 		if err != nil {
 			s.metrics.writtenProfileSegments.WithLabelValues("failed").Inc()
 		}
 	}()
 
-	// do nothing with empty buffer
-	bufferRowNums := len(s.slice)
-	if bufferRowNums == 0 {
+	size := s.loadProfilesToFlush(count)
+	if len(s.flushBuffer) == 0 {
 		return nil
 	}
@@ -242,9 +266,9 @@
 
 	// order profiles properly
 	// The slice is never accessed at reads, therefore we can sort it in-place.
-	sort.Slice(s.slice, s.profileSort)
+	s.sortProfile(s.flushBuffer)
 
-	n, err := s.writer.Write(s.slice)
+	n, err := s.writer.Write(s.flushBuffer)
 	if err != nil {
 		return errors.Wrap(err, "write row group segments to disk")
 	}
@@ -277,25 +301,44 @@
 	s.rowsFlushed += uint64(n)
 	s.rowGroups = append(s.rowGroups, rowGroup)
 	// Cutting the index is relatively quick op (no I/O).
-	err = s.index.cutRowGroup(s.slice)
-	// After the lock is released, rows/profiles should be read from the disk.
-	s.rowsLock.Unlock()
-	for i := range s.slice {
+	err = s.index.cutRowGroup(s.flushBuffer)
+
+	s.profilesLock.Lock()
+	defer s.profilesLock.Unlock()
+	for i := range s.slice[:count] {
 		// don't retain profiles and samples in memory as re-slice.
 		s.slice[i] = nil
 	}
 	// reset slice and metrics
-	s.slice = s.slice[:0]
-	s.size.Store(0)
+	s.slice = s.slice[count:]
+	currentSize := s.size.Sub(size)
 	if err != nil {
 		return err
 	}
 
 	level.Debug(s.logger).Log("msg", "cut row group segment", "path", path, "numProfiles", n)
-	s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(0)
+	s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(currentSize))
+	// After the lock is released, rows/profiles should be read from the disk.
+	s.rowsLock.Unlock()
 	return nil
 }
 
+// loadProfilesToFlush loads profiles to flush into flushBuffer and returns the size of the profiles.
+func (s *profileStore) loadProfilesToFlush(count int) uint64 {
+	var size uint64
+	s.profilesLock.Lock()
+	defer s.profilesLock.Unlock()
+	if cap(s.flushBuffer) < count {
+		s.flushBuffer = make([]*schemav1.Profile, 0, count)
+	}
+	s.flushBuffer = s.flushBuffer[:0]
+	for i := 0; i < count; i++ {
+		size += s.helper.size(s.slice[i])
+		s.flushBuffer = append(s.flushBuffer, s.slice[i])
+	}
+	return size
+}
+
 func (s *profileStore) writeRowGroups(path string, rowGroups []parquet.RowGroup) (n uint64, numRowGroups uint64, err error) {
 	fileCloser, err := s.prepareFile(path)
 	if err != nil {
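loadProfilesToFlush is what lets the expensive parquet write run without the ingest lock: the first count profiles are copied into a reused scratch buffer under profilesLock, and the writer then operates on the copy while new profiles keep landing in s.slice. Summing the size during the copy and subtracting it afterwards (s.size.Sub(size) instead of Store(0)) keeps the accounting of rows ingested mid-flush. A sketch of the copy-under-lock shape, with illustrative types:

import "sync"

type profile struct{ bytes uint64 }

type store struct {
	mu      sync.Mutex
	slice   []*profile // in-memory rows (s.slice in the real code)
	scratch []*profile // reused flush buffer (s.flushBuffer)
}

// copyForFlush holds the lock only for the copy and the size accounting;
// the caller writes scratch to disk after the lock is released.
func (s *store) copyForFlush(count int) (out []*profile, size uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cap(s.scratch) < count {
		s.scratch = make([]*profile, 0, count)
	}
	s.scratch = s.scratch[:0]
	for i := 0; i < count; i++ {
		size += s.slice[i].bytes
		s.scratch = append(s.scratch, s.slice[i])
	}
	return s.scratch, size
}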
@@ -340,11 +383,12 @@ func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.Profile, l
 	defer s.profilesLock.Unlock()
 
 	for pos, p := range profiles {
-		// check if row group is full
-		if s.cfg.MaxBufferRowCount > 0 && len(s.slice) >= s.cfg.MaxBufferRowCount ||
-			s.cfg.MaxRowGroupBytes > 0 && s.size.Load() >= s.cfg.MaxRowGroupBytes {
-			if err := s.cutRowGroup(); err != nil {
-				return err
+		if !s.flushing.Load() {
+			// check if row group is full
+			if s.cfg.MaxBufferRowCount > 0 && len(s.slice) >= s.cfg.MaxBufferRowCount ||
+				s.cfg.MaxRowGroupBytes > 0 && s.size.Load() >= s.cfg.MaxRowGroupBytes {
+				s.flushing.Store(true)
+				s.flushQueue <- len(s.slice)
 			}
 		}
@@ -364,6 +408,16 @@
 	return nil
 }
 
+func (s *profileStore) cutRowGroupLoop() {
+	defer s.flushWg.Done()
+	for n := range s.flushQueue {
+		if err := s.cutRowGroup(n); err != nil {
+			level.Error(s.logger).Log("msg", "cutting row group", "err", err)
+		}
+		s.flushing.Store(false)
+	}
+}
+
 type rowGroupOnDisk struct {
 	parquet.RowGroup
 	file *os.File

pkg/phlaredb/profile_store_test.go (18 additions, 9 deletions)
@@ -124,7 +124,6 @@ func nProfileStreams(n int) func(int) *testProfile {
 
 		tp.populateFingerprint()
 		return tp
-
 	}
 }
@@ -216,6 +215,9 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		p := tc.values(i)
 		require.NoError(t, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, emptyRewriter()))
+		for store.flushing.Load() {
+			time.Sleep(time.Millisecond)
+		}
 	}
 
 	// ensure the correct number of files are created
@@ -323,7 +325,7 @@ func BenchmarkFlush(b *testing.B) {
 			p.p.Samples = samples
 			require.NoError(b, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, rw))
 		}
-		require.NoError(b, store.cutRowGroup())
+		require.NoError(b, store.cutRowGroup(len(store.slice)))
 	}
 	b.StartTimer()
 	_, _, err := store.Flush(context.Background())
@@ -361,7 +363,16 @@ func TestProfileStore_Querying(t *testing.T) {
 	head.profiles.cfg = &ParquetConfig{MaxRowGroupBytes: 128000, MaxBufferRowCount: 3}
 
 	for i := 0; i < 9; i++ {
-		require.NoError(t, ingestThreeProfileStreams(ctx, i, head.Ingest))
+		require.NoError(t, ingestThreeProfileStreams(ctx, i, func(ctx context.Context, p *profilev1.Profile, u uuid.UUID, lp ...*typesv1.LabelPair) error {
+			defer func() {
+				// wait for the profile to be flushed
+				// todo(cyriltovena): We shouldn't need this, but when calling head.Queriers(), flushing row group and then querying using the queriers previously returned we will miss the new headDiskQuerier.
+				for head.profiles.flushing.Load() {
+					time.Sleep(time.Millisecond)
+				}
+			}()
+			return head.Ingest(ctx, p, u, lp...)
+		}))
 	}
 
 	// now query the store
@@ -372,10 +383,8 @@
 		Type: mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"),
 	}
 
-	queriers := head.Queriers()
-
 	t.Run("select matching profiles", func(t *testing.T) {
-		pIt, err := queriers.SelectMatchingProfiles(ctx, params)
+		pIt, err := head.Queriers().SelectMatchingProfiles(ctx, params)
 		require.NoError(t, err)
 
 		// ensure we see the profiles we expect
@@ -387,7 +396,7 @@
 	})
 
 	t.Run("merge by labels", func(t *testing.T) {
-		client, cleanup := queriers.ingesterClient()
+		client, cleanup := head.Queriers().ingesterClient()
 		defer cleanup()
 
 		bidi := client.MergeProfilesLabels(ctx)
@@ -453,7 +462,7 @@
 	})
 
 	t.Run("merge by stacktraces", func(t *testing.T) {
-		client, cleanup := queriers.ingesterClient()
+		client, cleanup := head.Queriers().ingesterClient()
 		defer cleanup()
 
 		bidi := client.MergeProfilesStacktraces(ctx)
@@ -501,7 +510,7 @@
 	})
 
 	t.Run("merge by pprof", func(t *testing.T) {
-		client, cleanup := queriers.ingesterClient()
+		client, cleanup := head.Queriers().ingesterClient()
 		defer cleanup()
 
 		bidi := client.MergeProfilesPprof(ctx)
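These tests synchronize with the background flush by polling the store's flushing flag: since the cut is now asynchronous, an ingest call that crosses the row-group limit no longer guarantees the segment exists on disk by the time it returns. The inlined wait loops could be factored into a helper of this shape (hypothetical; the store's field is a go.uber.org/atomic Bool, the stdlib sync/atomic one is shown here, and both expose the same Load method):

import (
	"sync/atomic"
	"testing"
	"time"
)

// waitForFlush blocks the test until the asynchronous row-group cut settles.
func waitForFlush(t *testing.T, flushing *atomic.Bool) {
	t.Helper()
	for flushing.Load() {
		time.Sleep(time.Millisecond)
	}
}

Polling with a millisecond sleep is crude but keeps the tests independent of the store's internals; the todo in TestProfileStore_Querying acknowledges the wait should eventually be unnecessary.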

pkg/phlaredb/profiles.go (12 additions, 8 deletions)
@@ -433,10 +433,12 @@ func (pi *profilesIndex) writeTo(ctx context.Context, path string) ([][]rowRange
 	return rangesPerRG, writer.Close()
 }
 
-func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
+func (pi *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
 	// adding rowGroup and rowNum information per fingerprint
-	rowRangePerFP := make(map[model.Fingerprint]*rowRange, len(pl.profilesPerFP))
+	rowRangePerFP := make(map[model.Fingerprint]*rowRange, len(pi.profilesPerFP))
+	countPerFP := make(map[model.Fingerprint]int, len(pi.profilesPerFP))
 	for rowNum, p := range rgProfiles {
+		countPerFP[p.SeriesFingerprint]++
 		if _, ok := rowRangePerFP[p.SeriesFingerprint]; !ok {
 			rowRangePerFP[p.SeriesFingerprint] = &rowRange{
 				rowNum: int64(rowNum),
@@ -452,18 +454,19 @@ func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
 		}
 	}
 
-	pl.mutex.Lock()
-	defer pl.mutex.Unlock()
+	pi.mutex.Lock()
+	defer pi.mutex.Unlock()
 
-	pl.rowGroupsOnDisk += 1
+	pi.rowGroupsOnDisk += 1
 
-	for _, ps := range pl.profilesPerFP {
+	for fp, ps := range pi.profilesPerFP {
+		count := countPerFP[fp]
 		// empty all in memory profiles
-		for i := range ps.profiles {
+		for i := range ps.profiles[:count] {
 			// Allow GC to evict the object.
 			ps.profiles[i] = nil
 		}
-		ps.profiles = ps.profiles[:0]
+		ps.profiles = ps.profiles[count:]
 
 		// attach rowGroup and rowNum information
 		rowRange := rowRangePerFP[ps.fp]
@@ -472,6 +475,7 @@
 			ps.profilesOnDisk,
 			rowRange,
 		)
+
 	}
 
 	return nil
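Because only a prefix of the in-memory profiles is flushed now, the index can no longer simply empty every per-series slice; it first counts how many flushed profiles belong to each fingerprint, then nils out and re-slices exactly that many, leaving profiles ingested during the flush in place. The pattern in isolation, with illustrative types:

package main

import "fmt"

type profile struct{ fp uint64 }

// trimFlushed drops exactly the flushed prefix of each series, keeping the rest.
func trimFlushed(perFP map[uint64][]*profile, flushed []*profile) {
	// Count flushed profiles per fingerprint.
	counts := make(map[uint64]int, len(perFP))
	for _, p := range flushed {
		counts[p.fp]++
	}
	for fp, series := range perFP {
		n := counts[fp]
		for i := range series[:n] {
			series[i] = nil // let GC reclaim the flushed profile
		}
		perFP[fp] = series[n:]
	}
}

func main() {
	a, b := &profile{fp: 1}, &profile{fp: 1}
	perFP := map[uint64][]*profile{1: {a, b}}
	trimFlushed(perFP, []*profile{a})
	fmt.Println(len(perFP[1])) // 1: the profile ingested mid-flush remains
}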
