Skip to content

Commit 5b5000c

Browse files
craig[bot]spilchen
andcommitted
Merge #156775
156775: bulkio: add SST writer and file allocator r=spilchen a=spilchen Implements bulksst.Writer and FileAllocator to flush sorted KVs into SSTs on local or external storage and track per-file metadata (key range, size, sample key). This is to be used for distributed merge. Much of the code here is taken from: https://github.com/jeffswenson/cockroach/tree/feature-distributed-merge. Resolves: #156571 Epic: CRDB-48845 Release note (none): none Co-authored by: `@jeffswenson,` `@fqazi` Co-authored-by: Matt Spilchen <matt.spilchen@cockroachlabs.com>
2 parents 8f19f41 + bb93844 commit 5b5000c

File tree

9 files changed

+960
-0
lines changed

9 files changed

+960
-0
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ ALL_TESTS = [
396396
"//pkg/sql/appstatspb:appstatspb_test",
397397
"//pkg/sql/auditlogging:auditlogging_test",
398398
"//pkg/sql/backfill:backfill_test",
399+
"//pkg/sql/bulksst:bulksst_test",
399400
"//pkg/sql/bulkutil:bulkutil_test",
400401
"//pkg/sql/cacheutil:cacheutil_test",
401402
"//pkg/sql/catalog/bootstrap:bootstrap_test",
@@ -1880,6 +1881,8 @@ GO_TARGETS = [
18801881
"//pkg/sql/auditlogging:auditlogging_test",
18811882
"//pkg/sql/backfill:backfill",
18821883
"//pkg/sql/backfill:backfill_test",
1884+
"//pkg/sql/bulksst:bulksst",
1885+
"//pkg/sql/bulksst:bulksst_test",
18831886
"//pkg/sql/bulkutil:bulkutil",
18841887
"//pkg/sql/bulkutil:bulkutil_test",
18851888
"//pkg/sql/cacheutil:cacheutil",

pkg/gen/protobuf.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ PROTOBUF_SRCS = [
5555
"//pkg/server/status/statuspb:statuspb_go_proto",
5656
"//pkg/settings:settings_go_proto",
5757
"//pkg/sql/appstatspb:appstatspb_go_proto",
58+
"//pkg/sql/bulksst:bulksst_go_proto",
5859
"//pkg/sql/catalog/catenumpb:catenumpb_go_proto",
5960
"//pkg/sql/catalog/catpb:catpb_go_proto",
6061
"//pkg/sql/catalog/descpb:descpb_go_proto",

pkg/sql/bulksst/BUILD.bazel

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
load("@rules_proto//proto:defs.bzl", "proto_library")
3+
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
4+
5+
proto_library(
6+
name = "bulksst_proto",
7+
srcs = ["sst_info.proto"],
8+
strip_import_prefix = "/pkg",
9+
visibility = ["//visibility:public"],
10+
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
11+
)
12+
13+
go_proto_library(
14+
name = "bulksst_go_proto",
15+
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"],
16+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulksst",
17+
proto = ":bulksst_proto",
18+
visibility = ["//visibility:public"],
19+
deps = ["@com_github_gogo_protobuf//gogoproto"],
20+
)
21+
22+
go_library(
23+
name = "bulksst",
24+
srcs = [
25+
"sst_file_allocator.go",
26+
"sst_writer.go",
27+
],
28+
embed = [":bulksst_go_proto"],
29+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulksst",
30+
visibility = ["//visibility:public"],
31+
deps = [
32+
"//pkg/cloud",
33+
"//pkg/kv/kvpb",
34+
"//pkg/kv/kvserver/kvserverbase",
35+
"//pkg/roachpb",
36+
"//pkg/settings",
37+
"//pkg/settings/cluster",
38+
"//pkg/storage",
39+
"//pkg/util/hlc",
40+
"//pkg/util/log",
41+
"//pkg/util/randutil",
42+
"//pkg/util/timeutil",
43+
"@com_github_cockroachdb_pebble//objstorage",
44+
"@com_github_cockroachdb_pebble//objstorage/objstorageprovider",
45+
],
46+
)
47+
48+
go_test(
49+
name = "bulksst_test",
50+
srcs = [
51+
"main_test.go",
52+
"sst_file_allocator_test.go",
53+
"sst_writer_test.go",
54+
],
55+
embed = [":bulksst"],
56+
deps = [
57+
"//pkg/base",
58+
"//pkg/cloud/cloudpb",
59+
"//pkg/cloud/impl:cloudimpl",
60+
"//pkg/cloud/nodelocal",
61+
"//pkg/kv/kvpb",
62+
"//pkg/roachpb",
63+
"//pkg/security/securityassets",
64+
"//pkg/security/securitytest",
65+
"//pkg/server",
66+
"//pkg/settings/cluster",
67+
"//pkg/storage",
68+
"//pkg/testutils/serverutils",
69+
"//pkg/testutils/testcluster",
70+
"//pkg/util/hlc",
71+
"//pkg/util/intsets",
72+
"//pkg/util/leaktest",
73+
"//pkg/util/log",
74+
"//pkg/util/randutil",
75+
"@com_github_cockroachdb_pebble//objstorage",
76+
"@com_github_cockroachdb_pebble//sstable",
77+
"@com_github_cockroachdb_pebble//vfs",
78+
"@com_github_stretchr_testify//require",
79+
],
80+
)

pkg/sql/bulksst/main_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulksst_test
7+
8+
import (
9+
"os"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
13+
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
14+
"github.com/cockroachdb/cockroach/pkg/server"
15+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
17+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
18+
)
19+
20+
//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
21+
22+
func init() {
23+
securityassets.SetLoader(securitytest.EmbeddedAssets)
24+
}
25+
func TestMain(m *testing.M) {
26+
randutil.SeedForTests()
27+
serverutils.InitTestServerFactory(server.TestServerFactory)
28+
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
29+
30+
code := m.Run()
31+
32+
os.Exit(code)
33+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulksst
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"math/rand"
12+
13+
"github.com/cockroachdb/cockroach/pkg/cloud"
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
17+
"github.com/cockroachdb/pebble/objstorage"
18+
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19+
)
20+
21+
const (
22+
maxRowSamples = 1024
23+
)
24+
25+
// FileAllocator is used to allocate new files for SSTs ingested via the Writer.
26+
type FileAllocator interface {
27+
// AddFile creates a new file and returns the objstorage.Writable and URI for
28+
// tracking. The allocator generates a unique filename internally. The caller
29+
// is responsible for closing the returned objstorage.Writable.
30+
AddFile(ctx context.Context) (objstorage.Writable, string, error)
31+
32+
// CommitFile records metadata for a successfully written SST file.
33+
CommitFile(uri string, span roachpb.Span, rowSample roachpb.Key, fileSize uint64)
34+
35+
// GetFileList gets all the files created by this file allocator.
36+
GetFileList() *SSTFiles
37+
}
38+
39+
// fileAllocatorBase helps track metadata for created SST files.
40+
type fileAllocatorBase struct {
41+
fileInfo SSTFiles
42+
rowSampleRand *rand.Rand
43+
totalRowSamples int
44+
}
45+
46+
// GetFileList gets all the files created by this file allocator.
47+
func (f *fileAllocatorBase) GetFileList() *SSTFiles {
48+
return &f.fileInfo
49+
}
50+
51+
// addFile helps track metadata for created SST files.
52+
func (f *fileAllocatorBase) addFile(
53+
uri string, span roachpb.Span, rowSample roachpb.Key, fileSize uint64,
54+
) {
55+
f.fileInfo.SST = append(f.fileInfo.SST, &SSTFileInfo{
56+
URI: uri,
57+
StartKey: span.Key,
58+
EndKey: span.EndKey,
59+
FileSize: fileSize,
60+
})
61+
f.fileInfo.TotalSize += fileSize
62+
f.recordRowSample(rowSample)
63+
}
64+
65+
func (f *fileAllocatorBase) recordRowSample(rowSample roachpb.Key) {
66+
if len(rowSample) == 0 {
67+
return
68+
}
69+
f.totalRowSamples++
70+
if len(f.fileInfo.RowSamples) < maxRowSamples {
71+
f.fileInfo.RowSamples = append(f.fileInfo.RowSamples, string(rowSample))
72+
return
73+
}
74+
if f.rowSampleRand == nil {
75+
rng, _ := randutil.NewLockedPseudoRand()
76+
f.rowSampleRand = rng
77+
}
78+
// Reservoir sampling: replace an existing sample with probability maxRowSamples / totalRowSamples.
79+
idx := f.rowSampleRand.Intn(f.totalRowSamples)
80+
if idx < maxRowSamples {
81+
f.fileInfo.RowSamples[idx] = string(rowSample)
82+
}
83+
}
84+
85+
// ExternalFileAllocator allocates external files for SSTs.
86+
type ExternalFileAllocator struct {
87+
es cloud.ExternalStorage
88+
baseURI string
89+
clock *hlc.Clock
90+
fileAllocatorBase
91+
}
92+
93+
func NewExternalFileAllocator(
94+
es cloud.ExternalStorage, baseURI string, clock *hlc.Clock,
95+
) FileAllocator {
96+
return &ExternalFileAllocator{
97+
es: es,
98+
baseURI: baseURI,
99+
clock: clock,
100+
fileAllocatorBase: fileAllocatorBase{},
101+
}
102+
}
103+
104+
// AddFile creates a new file with an HLC timestamp-based unique name.
105+
func (e *ExternalFileAllocator) AddFile(ctx context.Context) (objstorage.Writable, string, error) {
106+
// Use HLC timestamp for unique filename generation.
107+
ts := e.clock.Now()
108+
fileName := fmt.Sprintf("%d-%d.sst", ts.WallTime, ts.Logical)
109+
writer, err := e.es.Writer(ctx, fileName)
110+
if err != nil {
111+
return nil, "", err
112+
}
113+
remoteWritable := objstorageprovider.NewRemoteWritable(writer)
114+
return remoteWritable, e.baseURI + fileName, nil
115+
}
116+
117+
// CommitFile records metadata for a successfully written SST file.
118+
func (e *ExternalFileAllocator) CommitFile(
119+
uri string, span roachpb.Span, rowSample roachpb.Key, fileSize uint64,
120+
) {
121+
e.fileAllocatorBase.addFile(uri, span, rowSample, fileSize)
122+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulksst
7+
8+
import (
9+
"context"
10+
"errors"
11+
"fmt"
12+
"testing"
13+
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
16+
"github.com/cockroachdb/cockroach/pkg/storage"
17+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
18+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
19+
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/pebble/objstorage"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestRowSampleReservoirCapsSamples(t *testing.T) {
25+
defer leaktest.AfterTest(t)()
26+
defer log.Scope(t).Close(t)
27+
28+
var base fileAllocatorBase
29+
expected := make(map[string]struct{})
30+
total := maxRowSamples*2 + 17
31+
for i := 0; i < total; i++ {
32+
key := roachpb.Key(fmt.Sprintf("key-%04d", i))
33+
expected[string(key)] = struct{}{}
34+
span := roachpb.Span{Key: key, EndKey: key.Next()}
35+
base.addFile(fmt.Sprintf("uri-%d", i), span, key, 1)
36+
}
37+
38+
require.Equal(t, total, base.totalRowSamples)
39+
require.Len(t, base.fileInfo.RowSamples, maxRowSamples)
40+
41+
for _, sample := range base.fileInfo.RowSamples {
42+
_, ok := expected[sample]
43+
require.True(t, ok, "sample %q must correspond to ingested keys", sample)
44+
}
45+
}
46+
47+
func TestRowSampleSkipsEmptyKeys(t *testing.T) {
48+
defer leaktest.AfterTest(t)()
49+
defer log.Scope(t).Close(t)
50+
51+
var base fileAllocatorBase
52+
key := roachpb.Key("some-key")
53+
span := roachpb.Span{Key: key, EndKey: key.Next()}
54+
55+
base.addFile("uri-1", span, key, 1)
56+
base.addFile("uri-2", span, nil, 1)
57+
58+
require.Len(t, base.fileInfo.RowSamples, 1)
59+
require.Equal(t, string(key), base.fileInfo.RowSamples[0])
60+
}
61+
62+
func TestWriterFlushFailureDoesNotPersistMetadata(t *testing.T) {
63+
defer leaktest.AfterTest(t)()
64+
defer log.Scope(t).Close(t)
65+
66+
ctx := context.Background()
67+
st := cluster.MakeTestingClusterSettings()
68+
failErr := errors.New("finish failed")
69+
allocator := &mockFailingAllocator{finishErr: failErr}
70+
71+
writer := NewUnsortedSSTBatcher(st, allocator)
72+
ts := hlc.Timestamp{WallTime: 1}
73+
require.NoError(t, writer.AddMVCCKey(ctx, storage.MVCCKey{Key: roachpb.Key("k"), Timestamp: ts}, []byte("v")))
74+
75+
err := writer.Flush(ctx)
76+
require.ErrorIs(t, err, failErr)
77+
78+
fileList := allocator.GetFileList()
79+
require.Empty(t, fileList.SST, "no files should be tracked after a failed flush")
80+
require.Zero(t, fileList.TotalSize)
81+
require.Empty(t, fileList.RowSamples)
82+
}
83+
84+
type mockFailingAllocator struct {
85+
fileAllocatorBase
86+
finishErr error
87+
nextID int
88+
}
89+
90+
func (m *mockFailingAllocator) AddFile(ctx context.Context) (objstorage.Writable, string, error) {
91+
uri := fmt.Sprintf("mock://%d", m.nextID)
92+
m.nextID++
93+
w := &failingWritable{finishErr: m.finishErr}
94+
return w, uri, nil
95+
}
96+
97+
func (m *mockFailingAllocator) CommitFile(
98+
uri string, span roachpb.Span, rowSample roachpb.Key, fileSize uint64,
99+
) {
100+
m.fileAllocatorBase.addFile(uri, span, rowSample, fileSize)
101+
}
102+
103+
type failingWritable struct {
104+
finishErr error
105+
}
106+
107+
func (f *failingWritable) Write(p []byte) error {
108+
return nil
109+
}
110+
111+
func (f *failingWritable) Finish() error {
112+
return f.finishErr
113+
}
114+
115+
func (f *failingWritable) Abort() {}

0 commit comments

Comments
 (0)