Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit b6d364b

Browse files
authored
Fix concurrent map access in ingester (#763)
* Fix concurrent map access in ingester * Reduce the duration of the lock by only sweeping
1 parent 877b118 commit b6d364b

File tree

1 file changed

+58
-40
lines changed

1 file changed

+58
-40
lines changed

pkg/phlaredb/profile_store.go

Lines changed: 58 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ type profileStore struct {
5454
rowGroups []*rowGroupOnDisk
5555
index *profilesIndex
5656

57-
flushing *atomic.Bool
58-
flushQueue chan int // channel to signal that a flush is needed for slice[:n]
59-
closeOnce sync.Once
60-
flushWg sync.WaitGroup
61-
flushBuffer []*schemav1.Profile
57+
flushing *atomic.Bool
58+
flushQueue chan int // channel to signal that a flush is needed for slice[:n]
59+
closeOnce sync.Once
60+
flushWg sync.WaitGroup
61+
flushBuffer []*schemav1.Profile
62+
flushBufferLbs []phlaremodel.Labels
6263
}
6364

6465
func newProfileStore(phlarectx context.Context) *profileStore {
@@ -141,31 +142,6 @@ func (s *profileStore) RowGroups() (rowGroups []parquet.RowGroup) {
141142
return rowGroups
142143
}
143144

144-
func (s *profileStore) sortProfile(slice []*schemav1.Profile) {
145-
sort.Slice(slice, func(i, j int) bool {
146-
// first compare the labels, if they don't match return
147-
var (
148-
pI = slice[i]
149-
pJ = slice[j]
150-
lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs
151-
lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs
152-
)
153-
if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 {
154-
return cmp < 0
155-
}
156-
157-
// then compare timenanos, if they don't match return
158-
if pI.TimeNanos < pJ.TimeNanos {
159-
return true
160-
} else if pI.TimeNanos > pJ.TimeNanos {
161-
return false
162-
}
163-
164-
// finally use ID as tie breaker
165-
return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0
166-
})
167-
}
168-
169145
// Flush writes row groups and the index to files on disk.
170146
// The call is thread-safe for reading but adding new profiles
171147
// should not be allowed during and after the call.
@@ -264,10 +240,6 @@ func (s *profileStore) cutRowGroup(count int) (err error) {
264240
return err
265241
}
266242

267-
// order profiles properly
268-
// The slice is never accessed at reads, therefore we can sort it in-place.
269-
s.sortProfile(s.flushBuffer)
270-
271243
n, err := s.writer.Write(s.flushBuffer)
272244
if err != nil {
273245
return errors.Wrap(err, "write row group segments to disk")
@@ -323,18 +295,64 @@ func (s *profileStore) cutRowGroup(count int) (err error) {
323295
return nil
324296
}
325297

326-
// loadProfilesToFlush loads profiles to flush into flushBuffer and returns the size of the profiles.
298+
type byLabels struct {
299+
p []*schemav1.Profile
300+
lbs []phlaremodel.Labels
301+
}
302+
303+
func (b byLabels) Len() int { return len(b.p) }
304+
func (b byLabels) Swap(i, j int) {
305+
b.p[i], b.p[j] = b.p[j], b.p[i]
306+
b.lbs[i], b.lbs[j] = b.lbs[j], b.lbs[i]
307+
}
308+
309+
func (by byLabels) Less(i, j int) bool {
310+
// first compare the labels, if they don't match return
311+
var (
312+
pI = by.p[i]
313+
pJ = by.p[j]
314+
lbsI = by.lbs[i]
315+
lbsJ = by.lbs[j]
316+
)
317+
if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 {
318+
return cmp < 0
319+
}
320+
321+
// then compare timenanos, if they don't match return
322+
if pI.TimeNanos < pJ.TimeNanos {
323+
return true
324+
} else if pI.TimeNanos > pJ.TimeNanos {
325+
return false
326+
}
327+
328+
// finally use ID as tie breaker
329+
return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0
330+
}
331+
332+
// loadProfilesToFlush loads and sort profiles to flush into flushBuffer and returns the size of the profiles.
327333
func (s *profileStore) loadProfilesToFlush(count int) uint64 {
328-
var size uint64
329-
s.profilesLock.Lock()
330-
defer s.profilesLock.Unlock()
331334
if cap(s.flushBuffer) < count {
332335
s.flushBuffer = make([]*schemav1.Profile, 0, count)
333336
}
337+
if cap(s.flushBufferLbs) < count {
338+
s.flushBufferLbs = make([]phlaremodel.Labels, 0, count)
339+
}
340+
s.flushBufferLbs = s.flushBufferLbs[:0]
334341
s.flushBuffer = s.flushBuffer[:0]
342+
s.profilesLock.Lock()
343+
s.index.mutex.RLock()
335344
for i := 0; i < count; i++ {
336-
size += s.helper.size(s.slice[i])
337-
s.flushBuffer = append(s.flushBuffer, s.slice[i])
345+
profile := s.slice[i]
346+
s.flushBuffer = append(s.flushBuffer, profile)
347+
s.flushBufferLbs = append(s.flushBufferLbs, s.index.profilesPerFP[profile.SeriesFingerprint].lbs)
348+
}
349+
s.profilesLock.Unlock()
350+
s.index.mutex.RUnlock()
351+
// order profiles properly
352+
sort.Sort(byLabels{p: s.flushBuffer, lbs: s.flushBufferLbs})
353+
var size uint64
354+
for _, p := range s.flushBuffer {
355+
size += s.helper.size(p)
338356
}
339357
return size
340358
}

0 commit comments

Comments
 (0)