Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 7565547

Browse files
committed
Add more meta information and rewrite stacktrace IDs
1 parent 2e3aa7d commit 7565547

File tree

4 files changed

+223
-15
lines changed

4 files changed

+223
-15
lines changed

pkg/phlaredb/block_querier.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,13 @@ func (b *singleBlockQuerier) Index() IndexReader {
435435
return b.index
436436
}
437437

438+
func (b *singleBlockQuerier) Meta() block.Meta {
439+
if b.meta == nil {
440+
return block.Meta{}
441+
}
442+
return *b.meta
443+
}
444+
438445
func (b *singleBlockQuerier) Close() error {
439446
b.openLock.Lock()
440447
defer func() {
@@ -941,9 +948,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
941948
}
942949
}
943950

944-
var (
945-
buf [][]parquet.Value
946-
)
951+
var buf [][]parquet.Value
947952

948953
pIt := query.NewBinaryJoinIterator(
949954
0,

pkg/phlaredb/compact.go

Lines changed: 127 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path/filepath"
88

9+
"github.com/oklog/ulid"
910
"github.com/pkg/errors"
1011
"github.com/prometheus/common/model"
1112
"github.com/prometheus/prometheus/storage"
@@ -23,20 +24,17 @@ import (
2324
)
2425

2526
type BlockReader interface {
27+
Meta() block.Meta
2628
Profiles() []parquet.RowGroup
2729
Index() IndexReader
28-
// Symbols() SymbolReader
29-
}
30-
31-
type SymbolReader interface {
32-
// todo
30+
// todo symbdb
3331
}
3432

3533
func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) {
3634
if len(src) <= 1 {
3735
return block.Meta{}, errors.New("not enough blocks to compact")
3836
}
39-
meta := block.NewMeta()
37+
meta := compactedMeta(src)
4038
blockPath := filepath.Join(dst, meta.ULID.String())
4139
if err := os.MkdirAll(blockPath, 0o777); err != nil {
4240
return block.Meta{}, err
@@ -59,9 +57,9 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er
5957
if err != nil {
6058
return block.Meta{}, err
6159
}
62-
rowsIt = newSeriesRewriter(rowsIt, indexw)
63-
rowsIt = newSymbolsRewriter(rowsIt)
64-
reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt))
60+
seriesRewriter := newSeriesRewriter(rowsIt, indexw)
61+
symbolsRewriter := newSymbolsRewriter(seriesRewriter)
62+
reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symbolsRewriter))
6563

6664
total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount)
6765
if err != nil {
@@ -78,12 +76,61 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er
7876
}
7977
// todo: block meta
8078
meta.Stats.NumProfiles = total
79+
meta.Stats.NumSeries = seriesRewriter.NumSeries()
80+
meta.Stats.NumSamples = symbolsRewriter.NumSamples()
81+
8182
if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil {
8283
return block.Meta{}, err
8384
}
8485
return *meta, nil
8586
}
8687

88+
func compactedMeta(src []BlockReader) *block.Meta {
89+
meta := block.NewMeta()
90+
highestCompactionLevel := 0
91+
ulids := make([]ulid.ULID, len(src))
92+
parents := make([]tsdb.BlockDesc, len(src))
93+
minTime, maxTime := model.Latest, model.Earliest
94+
labels := make(map[string]string)
95+
for _, b := range src {
96+
if b.Meta().Compaction.Level > highestCompactionLevel {
97+
highestCompactionLevel = b.Meta().Compaction.Level
98+
}
99+
ulids = append(ulids, b.Meta().ULID)
100+
parents = append(parents, tsdb.BlockDesc{
101+
ULID: b.Meta().ULID,
102+
MinTime: int64(b.Meta().MinTime),
103+
MaxTime: int64(b.Meta().MaxTime),
104+
})
105+
if b.Meta().MinTime < minTime {
106+
minTime = b.Meta().MinTime
107+
}
108+
if b.Meta().MaxTime > maxTime {
109+
maxTime = b.Meta().MaxTime
110+
}
111+
for k, v := range b.Meta().Labels {
112+
if k == block.HostnameLabel {
113+
continue
114+
}
115+
labels[k] = v
116+
}
117+
}
118+
if hostname, err := os.Hostname(); err == nil {
119+
labels[block.HostnameLabel] = hostname
120+
}
121+
meta.Source = block.CompactorSource
122+
meta.Compaction = tsdb.BlockMetaCompaction{
123+
Deletable: meta.Stats.NumSamples == 0,
124+
Level: highestCompactionLevel + 1,
125+
Sources: ulids,
126+
Parents: parents,
127+
}
128+
meta.MaxTime = maxTime
129+
meta.MinTime = minTime
130+
meta.Labels = labels
131+
return meta
132+
}
133+
87134
type profileRow struct {
88135
timeNanos int64
89136

@@ -205,14 +252,77 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e
205252
}, nil
206253
}
207254

255+
type noopStacktraceRewriter struct{}
256+
257+
func (noopStacktraceRewriter) RewriteStacktraces(src, dst []uint32) error {
258+
copy(dst, src)
259+
return nil
260+
}
261+
262+
type StacktraceRewriter interface {
263+
RewriteStacktraces(src, dst []uint32) error
264+
}
265+
208266
type symbolsRewriter struct {
209267
iter.Iterator[profileRow]
268+
err error
269+
270+
rewriter StacktraceRewriter
271+
src, dst []uint32
272+
numSamples uint64
210273
}
211274

212275
// todo remap symbols & ingest symbols
213276
func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter {
214277
return &symbolsRewriter{
215278
Iterator: it,
279+
rewriter: noopStacktraceRewriter{},
280+
}
281+
}
282+
283+
func (s *symbolsRewriter) NumSamples() uint64 {
284+
return s.numSamples
285+
}
286+
287+
func (s *symbolsRewriter) Next() bool {
288+
if !s.Iterator.Next() {
289+
return false
290+
}
291+
var err error
292+
s.Iterator.At().row.ForStacktraceIDsValues(func(values []parquet.Value) {
293+
s.numSamples += uint64(len(values))
294+
s.loadStacktracesID(values)
295+
err = s.rewriter.RewriteStacktraces(s.src, s.dst)
296+
if err != nil {
297+
return
298+
}
299+
for i, v := range values {
300+
values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column())
301+
}
302+
})
303+
if err != nil {
304+
s.err = err
305+
return false
306+
}
307+
return true
308+
}
309+
310+
func (s *symbolsRewriter) Err() error {
311+
if s.err != nil {
312+
return s.err
313+
}
314+
return s.Iterator.Err()
315+
}
316+
317+
func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) {
318+
if cap(s.src) < len(values) {
319+
s.src = make([]uint32, len(values)*2)
320+
s.dst = make([]uint32, len(values)*2)
321+
}
322+
s.src = s.src[:len(values)]
323+
s.dst = s.dst[:len(values)]
324+
for i := range values {
325+
s.src[i] = values[i].Uint32()
216326
}
217327
}
218328

@@ -226,6 +336,8 @@ type seriesRewriter struct {
226336
previousFp model.Fingerprint
227337
currentChunkMeta index.ChunkMeta
228338
err error
339+
340+
numSeries uint64
229341
}
230342

231343
func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter {
@@ -235,13 +347,18 @@ func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seri
235347
}
236348
}
237349

350+
func (s *seriesRewriter) NumSeries() uint64 {
351+
return s.numSeries
352+
}
353+
238354
func (s *seriesRewriter) Next() bool {
239355
if !s.Iterator.Next() {
240356
if s.previousFp != 0 {
241357
if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil {
242358
s.err = err
243359
return false
244360
}
361+
s.numSeries++
245362
}
246363
return false
247364
}
@@ -253,6 +370,7 @@ func (s *seriesRewriter) Next() bool {
253370
s.err = err
254371
return false
255372
}
373+
s.numSeries++
256374
}
257375
s.seriesRef++
258376
s.labels = currentProfile.labels.Clone()

pkg/phlaredb/schemas/v1/profiles.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ var (
4242
phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))),
4343
})
4444

45-
maxProfileRow parquet.Row
46-
seriesIndexColIndex int
47-
timeNanoColIndex int
45+
maxProfileRow parquet.Row
46+
seriesIndexColIndex int
47+
stacktraceIDColIndex int
48+
timeNanoColIndex int
4849
)
4950

5051
func init() {
@@ -62,6 +63,11 @@ func init() {
6263
panic(fmt.Errorf("TimeNanos column not found"))
6364
}
6465
timeNanoColIndex = timeCol.ColumnIndex
66+
stacktraceIDCol, ok := profilesSchema.Lookup("Samples", "list", "element", "StacktraceID")
67+
if !ok {
68+
panic(fmt.Errorf("StacktraceID column not found"))
69+
}
70+
stacktraceIDColIndex = stacktraceIDCol.ColumnIndex
6571
}
6672

6773
type Sample struct {
@@ -479,3 +485,22 @@ func (p ProfileRow) TimeNanos() int64 {
479485
func (p ProfileRow) SetSeriesIndex(v uint32) {
480486
p[seriesIndexColIndex] = parquet.Int32Value(int32(v)).Level(0, 0, seriesIndexColIndex)
481487
}
488+
489+
func (p ProfileRow) ForStacktraceIDsValues(fn func([]parquet.Value)) {
490+
start := -1
491+
var i int
492+
for i = 0; i < len(p); i++ {
493+
col := p[i].Column()
494+
if col == stacktraceIDColIndex && p[i].DefinitionLevel() == 1 {
495+
if start == -1 {
496+
start = i
497+
}
498+
}
499+
if col > stacktraceIDColIndex {
500+
break
501+
}
502+
}
503+
if start != -1 {
504+
fn(p[start:i])
505+
}
506+
}

pkg/phlaredb/schemas/v1/profiles_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,66 @@ func TestLessProfileRows(t *testing.T) {
207207
}
208208
}
209209

210+
func TestProfileRowStacktraceIDs(t *testing.T) {
211+
for _, tc := range []struct {
212+
name string
213+
expected []uint32
214+
profile InMemoryProfile
215+
}{
216+
{"empty", nil, InMemoryProfile{}},
217+
{"one sample", []uint32{1}, InMemoryProfile{
218+
SeriesIndex: 1,
219+
StacktracePartition: 2,
220+
TotalValue: 3,
221+
Samples: Samples{StacktraceIDs: []uint32{1}, Values: []uint64{1}},
222+
}},
223+
{"many", []uint32{1, 1, 2, 3, 4}, InMemoryProfile{
224+
SeriesIndex: 1,
225+
StacktracePartition: 2,
226+
TotalValue: 3,
227+
Samples: Samples{
228+
StacktraceIDs: []uint32{1, 1, 2, 3, 4},
229+
Values: []uint64{4, 2, 4, 5, 2},
230+
},
231+
}},
232+
} {
233+
tc := tc
234+
t.Run(tc.name, func(t *testing.T) {
235+
rows := generateProfileRow([]InMemoryProfile{tc.profile})
236+
var ids []uint32
237+
ProfileRow(rows[0]).ForStacktraceIDsValues(func(values []parquet.Value) {
238+
for _, v := range values {
239+
ids = append(ids, v.Uint32())
240+
}
241+
})
242+
require.Equal(t, tc.expected, ids)
243+
})
244+
}
245+
}
246+
247+
func TestProfileRowMutateValues(t *testing.T) {
248+
row := ProfileRow(generateProfileRow([]InMemoryProfile{
249+
{
250+
Samples: Samples{
251+
StacktraceIDs: []uint32{1, 1, 2, 3, 4},
252+
Values: []uint64{4, 2, 4, 5, 2},
253+
},
254+
},
255+
})[0])
256+
row.ForStacktraceIDsValues(func(values []parquet.Value) {
257+
for i := range values {
258+
values[i] = parquet.Int32Value(1).Level(0, 1, values[i].Column())
259+
}
260+
})
261+
var ids []uint32
262+
row.ForStacktraceIDsValues(func(values []parquet.Value) {
263+
for _, v := range values {
264+
ids = append(ids, v.Uint32())
265+
}
266+
})
267+
require.Equal(t, []uint32{1, 1, 1, 1, 1}, ids)
268+
}
269+
210270
func BenchmarkProfileRows(b *testing.B) {
211271
a := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 1}})[0]
212272
a1 := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 2}})[0]

0 commit comments

Comments
 (0)