Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 06093f8

Browse files
authored
Fix queries and storage block operations synchronisation (#699)
* Fix ingest/flush race
* Allow concurrent reads in cutRowGroup
* profileStore synchronization
* alloc dedup slice buffer
* Fix block eviction synchronisation
1 parent d303348 commit 06093f8

File tree

11 files changed

+264
-131
lines changed

11 files changed

+264
-131
lines changed

pkg/ingester/ingester.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (i *Ingester) forInstance(ctx context.Context, f func(*instance) error) err
193193
return f(instance)
194194
}
195195

196-
func (i *Ingester) evictBlock(tenantID string, b ulid.ULID, fn func() error) error {
196+
func (i *Ingester) evictBlock(tenantID string, b ulid.ULID, fn func() error) (err error) {
197197
// We lock instances map for writes to ensure that no new instances are
198198
// created during the procedure. Otherwise, during initialization, the
199199
// new PhlareDB instance may try to load a block that has already been
@@ -204,12 +204,18 @@ func (i *Ingester) evictBlock(tenantID string, b ulid.ULID, fn func() error) err
204204
// the process start, therefore there is no guarantee that we will find the
205205
// discovered candidate block there. If it is the case, we have to ensure that
206206
// the block won't be accessed, before and during deleting it from the disk.
207+
var evicted bool
207208
if tenantInstance, ok := i.instances[tenantID]; ok {
208-
if _, err := tenantInstance.Evict(b); err != nil {
209+
if evicted, err = tenantInstance.Evict(b, fn); err != nil {
209210
return fmt.Errorf("failed to evict block %s/%s: %w", tenantID, b, err)
210211
}
211212
}
212-
return fn()
213+
// If the instance is not found, or the querier is not aware of the block,
214+
// and thus the callback has not been invoked, do it now.
215+
if !evicted {
216+
return fn()
217+
}
218+
return nil
213219
}
214220

215221
func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
@@ -222,7 +228,7 @@ func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushReq
222228
if err != nil {
223229
return err
224230
}
225-
if err = instance.Head().Ingest(ctx, p, id, series.Labels...); err != nil {
231+
if err = instance.Ingest(ctx, p, id, series.Labels...); err != nil {
226232
reason := validation.ReasonOf(err)
227233
if reason != validation.Unknown {
228234
validation.DiscardedProfiles.WithLabelValues(string(reason), instance.tenantID).Add(float64(1))

pkg/ingester/query.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,28 @@ import (
1212
// LabelValues returns the possible label values for a given label name.
1313
func (i *Ingester) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
1414
return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[typesv1.LabelValuesResponse], error) {
15-
return instance.Head().LabelValues(ctx, req)
15+
return instance.LabelValues(ctx, req)
1616
})
1717
}
1818

1919
// LabelNames returns the possible label names.
2020
func (i *Ingester) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
2121
return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[typesv1.LabelNamesResponse], error) {
22-
return instance.Head().LabelNames(ctx, req)
22+
return instance.LabelNames(ctx, req)
2323
})
2424
}
2525

2626
// ProfileTypes returns the possible profile types.
2727
func (i *Ingester) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error) {
2828
return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[ingestv1.ProfileTypesResponse], error) {
29-
return instance.Head().ProfileTypes(ctx, req)
29+
return instance.ProfileTypes(ctx, req)
3030
})
3131
}
3232

3333
// Series returns labels series for the given set of matchers.
3434
func (i *Ingester) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error) {
3535
return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[ingestv1.SeriesResponse], error) {
36-
return instance.Head().Series(ctx, req)
36+
return instance.Series(ctx, req)
3737
})
3838
}
3939

pkg/ingester/retention.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ func (*realFileSystem) RemoveAll(path string) error { return os.R
7676

7777
// blockEvicter unloads blocks from tenant instance.
7878
type blockEvicter interface {
79-
// evictBlock evicts the block by its ID for the given tenant and invokes
80-
// fn callback, if the tenant is found. The call is thread-safe: tenant
81-
// can't be added or removed during the execution.
79+
// evictBlock evicts the block by its ID from the memory and
80+
// invokes the fn callback, regardless of whether the tenant is found.
81+
// The call is thread-safe: tenant can't be added or removed
82+
// during the execution.
8283
evictBlock(tenant string, b ulid.ULID, fn func() error) error
8384
}
8485

pkg/phlaredb/block_querier.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,25 @@ func (b *BlockQuerier) Sync(ctx context.Context) error {
225225
return nil
226226
}
227227

228+
func (b *BlockQuerier) AddBlockQuerierByMeta(m *block.Meta) {
229+
q := newSingleBlockQuerierFromMeta(b.phlarectx, b.bucketReader, m)
230+
b.queriersLock.Lock()
231+
defer b.queriersLock.Unlock()
232+
i := sort.Search(len(b.queriers), func(i int) bool {
233+
return b.queriers[i].meta.MinTime >= m.MinTime
234+
})
235+
if i < len(b.queriers) && b.queriers[i].meta.ULID == m.ULID {
236+
// Block with this meta is already present, skipping.
237+
return
238+
}
239+
b.queriers = append(b.queriers, q) // Ensure we have enough capacity.
240+
copy(b.queriers[i+1:], b.queriers[i:])
241+
b.queriers[i] = q
242+
}
243+
228244
// evict removes the block with the given ULID from the querier.
229245
func (b *BlockQuerier) evict(blockID ulid.ULID) (bool, error) {
230246
b.queriersLock.Lock()
231-
defer b.queriersLock.Unlock()
232247
// N.B: queriers are sorted by meta.MinTime.
233248
j := -1
234249
for i, q := range b.queriers {
@@ -238,17 +253,16 @@ func (b *BlockQuerier) evict(blockID ulid.ULID) (bool, error) {
238253
}
239254
}
240255
if j < 0 {
256+
b.queriersLock.Unlock()
241257
return false, nil
242258
}
243259
blockQuerier := b.queriers[j]
244-
if err := blockQuerier.Close(); err != nil {
245-
return true, err
246-
}
247260
// Delete the querier from the slice and make it eligible for GC.
248261
copy(b.queriers[j:], b.queriers[j+1:])
249262
b.queriers[len(b.queriers)-1] = nil
250263
b.queriers = b.queriers[:len(b.queriers)-1]
251-
return true, nil
264+
b.queriersLock.Unlock()
265+
return true, blockQuerier.Close()
252266
}
253267

254268
func (b *BlockQuerier) Close() error {

pkg/phlaredb/deduplicating_slice.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type deduplicatingSlice[M Models, K comparable, H Helper[M, K], P schemav1.Persi
4242
metrics *headMetrics
4343
writer *parquet.GenericWriter[P]
4444

45-
buffer *parquet.Buffer
4645
rowsFlushed int
4746
}
4847

@@ -110,17 +109,14 @@ func (s *deduplicatingSlice[M, K, H, P]) maxRowsPerRowGroup() int {
110109
}
111110

112111
func (s *deduplicatingSlice[M, K, H, P]) Flush(ctx context.Context) (numRows uint64, numRowGroups uint64, err error) {
113-
s.lock.Lock()
114-
defer s.lock.Unlock()
115-
116-
// intialise buffer if not existing
117-
if s.buffer == nil {
118-
s.buffer = parquet.NewBuffer(
119-
s.persister.Schema(),
120-
parquet.SortingRowGroupConfig(s.persister.SortingColumns()),
121-
parquet.ColumnBufferCapacity(s.cfg.MaxBufferRowCount),
122-
)
123-
}
112+
s.lock.RLock()
113+
defer s.lock.RUnlock()
114+
115+
buffer := parquet.NewBuffer(
116+
s.persister.Schema(),
117+
parquet.SortingRowGroupConfig(s.persister.SortingColumns()),
118+
parquet.ColumnBufferCapacity(s.cfg.MaxBufferRowCount),
119+
)
124120

125121
var (
126122
maxRows = s.maxRowsPerRowGroup()
@@ -153,14 +149,14 @@ func (s *deduplicatingSlice[M, K, H, P]) Flush(ctx context.Context) (numRows uin
153149
rows[pos] = s.persister.Deconstruct(rows[pos], uint64(slicePos), s.slice[slicePos])
154150
}
155151

156-
s.buffer.Reset()
157-
if _, err := s.buffer.WriteRows(rows); err != nil {
152+
buffer.Reset()
153+
if _, err = buffer.WriteRows(rows); err != nil {
158154
return 0, 0, err
159155
}
160156

161-
sort.Sort(s.buffer)
157+
sort.Sort(buffer)
162158

163-
if _, err := s.writer.WriteRowGroup(s.buffer); err != nil {
159+
if _, err = s.writer.WriteRowGroup(buffer); err != nil {
164160
return 0, 0, err
165161
}
166162

pkg/phlaredb/head.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ type Head struct {
127127
headPath string // path while block is actively appended to
128128
localPath string // path once block has been cut
129129

130-
flushCh chan struct{} // this channel is closed once the Head should be flushed, should be used externally
131-
132-
flushForcedTimer *time.Timer // this timer will phlare after the maximum
130+
inFlightProfiles sync.WaitGroup // ongoing ingestion requests.
131+
flushCh chan struct{} // this channel is closed once the Head should be flushed, should be used externally
132+
flushForcedTimer *time.Timer // this timer will fire after the maximum
133133

134134
metaLock sync.RWMutex
135135
meta *block.Meta
@@ -569,8 +569,8 @@ func (h *Head) InRange(start, end model.Time) bool {
569569

570570
// Queriers returns the underlying queriers; they should be roughly ordered by increasing timestamp.
571571
func (h *Head) Queriers() Queriers {
572-
h.profiles.lock.RLock()
573-
defer h.profiles.lock.RUnlock()
572+
h.profiles.rowsLock.RLock()
573+
defer h.profiles.rowsLock.RUnlock()
574574

575575
queriers := make([]Querier, 0, len(h.profiles.rowGroups)+1)
576576
for idx := range h.profiles.rowGroups {
@@ -865,7 +865,9 @@ func (h *Head) Close() error {
865865
return merr.Err()
866866
}
867867

868-
// Flush closes the head and writes data to disk
868+
// Flush closes the head and writes data to disk. No ingestion requests should
869+
// be made concurrently with the call, or after it returns.
870+
// The call is thread-safe for reads.
869871
func (h *Head) Flush(ctx context.Context) error {
870872
start := time.Now()
871873
defer func() {
@@ -880,7 +882,11 @@ func (h *Head) Flush(ctx context.Context) error {
880882
}
881883

882884
func (h *Head) flush(ctx context.Context) error {
883-
if h.profiles.empty() {
885+
// Ensure all the in-flight ingestion requests have finished.
886+
// It must be guaranteed that no new inserts will happen
887+
// after the call starts.
888+
h.inFlightProfiles.Wait()
889+
if len(h.profiles.slice) == 0 {
884890
level.Info(h.logger).Log("msg", "head empty - no block written")
885891
return os.RemoveAll(h.headPath)
886892
}
@@ -941,6 +947,20 @@ func (h *Head) flush(ctx context.Context) error {
941947
return err
942948
}
943949
h.metrics.blockDurationSeconds.Observe(h.meta.MaxTime.Sub(h.meta.MinTime).Seconds())
950+
return nil
951+
}
952+
953+
// Move moves the head directory to local blocks. The call is not thread-safe:
954+
// no concurrent reads and writes are allowed.
955+
//
956+
// After the call, head in-memory representation is not valid and should not
957+
// be accessed for querying.
958+
func (h *Head) Move() error {
959+
// Remove intermediate row groups before the move as they are still
960+
// referencing files on the disk.
961+
if err := h.profiles.DeleteRowGroups(); err != nil {
962+
return err
963+
}
944964

945965
// move block to the local directory
946966
if err := os.MkdirAll(filepath.Dir(h.localPath), defaultFolderMode); err != nil {

pkg/phlaredb/head_queriers.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ type headOnDiskQuerier struct {
2424
}
2525

2626
func (q *headOnDiskQuerier) rowGroup() *rowGroupOnDisk {
27-
q.head.profiles.lock.RLock()
28-
defer q.head.profiles.lock.RUnlock()
27+
q.head.profiles.rowsLock.RLock()
28+
defer q.head.profiles.rowsLock.RUnlock()
2929
return q.head.profiles.rowGroups[q.rowGroupIdx]
3030
}
3131

@@ -119,7 +119,7 @@ func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[P
119119

120120
stacktraceSamples := profileSampleMap{}
121121

122-
if err := mergeByStacktraces(ctx, q.head.profiles.rowGroups[q.rowGroupIdx], rows, stacktraceSamples); err != nil {
122+
if err := mergeByStacktraces(ctx, q.rowGroup(), rows, stacktraceSamples); err != nil {
123123
return nil, err
124124
}
125125

@@ -132,7 +132,7 @@ func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterato
132132

133133
seriesByLabels := make(seriesByLabels)
134134

135-
if err := mergeByLabels(ctx, q.head.profiles.rowGroups[q.rowGroupIdx], rows, seriesByLabels, by...); err != nil {
135+
if err := mergeByLabels(ctx, q.rowGroup(), rows, seriesByLabels, by...); err != nil {
136136
return nil, err
137137
}
138138

0 commit comments

Comments
 (0)