Skip to content

Commit 15eb433

Browse files
committed
bulkutil: add utiliy to manage and reuse ExternalStorage instances
Adds a new ExternalStorageMux utility that manages and caches cloud.ExternalStorage instances keyed by their base URI (e.g. nodelocal://1/, s3://bucket/). The mux provides a lightweight layer that allows the processors for distribute merge to reuse existing storage connections instead of creating a new ExternalStorage for every file operation. Each ExternalStorageMux maintains a per-processor cache of open storage handles. If a request is made for a URI with a previously seen base prefix, the mux returns the cached handle; otherwise, it lazily creates and tracks a new one. Closes #156587 Epic: CRDB-48845 Release note: none
1 parent 00ea203 commit 15eb433

File tree

5 files changed

+382
-0
lines changed

5 files changed

+382
-0
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ ALL_TESTS = [
396396
"//pkg/sql/appstatspb:appstatspb_test",
397397
"//pkg/sql/auditlogging:auditlogging_test",
398398
"//pkg/sql/backfill:backfill_test",
399+
"//pkg/sql/bulkutil:bulkutil_test",
399400
"//pkg/sql/cacheutil:cacheutil_test",
400401
"//pkg/sql/catalog/bootstrap:bootstrap_test",
401402
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
@@ -1878,6 +1879,8 @@ GO_TARGETS = [
18781879
"//pkg/sql/auditlogging:auditlogging_test",
18791880
"//pkg/sql/backfill:backfill",
18801881
"//pkg/sql/backfill:backfill_test",
1882+
"//pkg/sql/bulkutil:bulkutil",
1883+
"//pkg/sql/bulkutil:bulkutil_test",
18811884
"//pkg/sql/cacheutil:cacheutil",
18821885
"//pkg/sql/cacheutil:cacheutil_test",
18831886
"//pkg/sql/catalog/bootstrap:bootstrap",

pkg/sql/bulkutil/BUILD.bazel

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "bulkutil",
5+
srcs = ["external_storage_mux.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulkutil",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/ccl/storageccl",
10+
"//pkg/cloud",
11+
"//pkg/security/username",
12+
"@com_github_cockroachdb_errors//:errors",
13+
],
14+
)
15+
16+
go_test(
17+
name = "bulkutil_test",
18+
srcs = [
19+
"external_storage_mux_test.go",
20+
"main_test.go",
21+
],
22+
embed = [":bulkutil"],
23+
deps = [
24+
"//pkg/cloud",
25+
"//pkg/security/username",
26+
"//pkg/util/leaktest",
27+
"//pkg/util/randutil",
28+
"@com_github_stretchr_testify//require",
29+
],
30+
)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkutil
7+
8+
import (
9+
"context"
10+
"net/url"
11+
12+
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
13+
"github.com/cockroachdb/cockroach/pkg/cloud"
14+
"github.com/cockroachdb/cockroach/pkg/security/username"
15+
"github.com/cockroachdb/errors"
16+
)
17+
18+
// ExternalStorageMux is a utility for managing multiple cloud storage instances.
19+
// The main motivator for this is each node has its own nodelocal://<node-id>
20+
// instance.
21+
type ExternalStorageMux struct {
22+
factory cloud.ExternalStorageFromURIFactory
23+
storeInstances map[string]cloud.ExternalStorage
24+
user username.SQLUsername
25+
}
26+
27+
// NewExternalStorageMux creates a new ExternalStorageMux that caches external storage
28+
// instances. This is particularly useful for nodelocal storage where each node
29+
// has its own storage instance (nodelocal://1/, nodelocal://2/, etc.).
30+
func NewExternalStorageMux(
31+
factory cloud.ExternalStorageFromURIFactory, user username.SQLUsername,
32+
) *ExternalStorageMux {
33+
return &ExternalStorageMux{
34+
factory: factory,
35+
storeInstances: make(map[string]cloud.ExternalStorage),
36+
user: user,
37+
}
38+
}
39+
40+
// Close closes all cached storage instances.
41+
func (c *ExternalStorageMux) Close() error {
42+
var err error
43+
for _, store := range c.storeInstances {
44+
err = errors.CombineErrors(err, store.Close())
45+
}
46+
return err
47+
}
48+
49+
// StoreFile splits a URI into its storage prefix and file path, caching the
50+
// storage instance for reuse. For example, "nodelocal://1/import/123/file.sst"
51+
// is split into storage "nodelocal://1" and path "/import/123/file.sst".
52+
func (c *ExternalStorageMux) StoreFile(
53+
ctx context.Context, uri string,
54+
) (storageccl.StoreFile, error) {
55+
prefix, filepath, err := c.splitURI(uri)
56+
if err != nil {
57+
return storageccl.StoreFile{}, err
58+
}
59+
prefixKey := prefix.String()
60+
store, ok := c.storeInstances[prefixKey]
61+
if !ok {
62+
storage, err := c.factory(ctx, prefix.String(), c.user)
63+
if err != nil {
64+
return storageccl.StoreFile{}, err
65+
}
66+
c.storeInstances[prefixKey] = storage
67+
store = storage
68+
}
69+
return storageccl.StoreFile{
70+
Store: store,
71+
FilePath: filepath,
72+
}, nil
73+
}
74+
75+
// splitURI splits a URI into its prefix (scheme + host) and path components.
76+
// For example, "nodelocal://1/import/123/file.sst" becomes:
77+
// - prefix: url.URL{Scheme: "nodelocal", Host: "1"}
78+
// - path: "/import/123/file.sst"
79+
func (c *ExternalStorageMux) splitURI(uri string) (url.URL, string, error) {
80+
parsed, err := url.Parse(uri)
81+
if err != nil {
82+
return url.URL{}, "", errors.Wrap(err, "failed to parse external storage uri")
83+
}
84+
85+
path := parsed.Path
86+
parsed.Path = ""
87+
88+
return *parsed, path, nil
89+
}
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkutil
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/cloud"
13+
"github.com/cockroachdb/cockroach/pkg/security/username"
14+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestExternalStorageMux_splitURI(t *testing.T) {
19+
defer leaktest.AfterTest(t)()
20+
21+
mux := &ExternalStorageMux{
22+
storeInstances: make(map[string]cloud.ExternalStorage),
23+
}
24+
25+
t.Run("splits nodelocal URI correctly", func(t *testing.T) {
26+
prefix, path, err := mux.splitURI("nodelocal://1/import/123/file.sst")
27+
require.NoError(t, err)
28+
require.Equal(t, "nodelocal", prefix.Scheme)
29+
require.Equal(t, "1", prefix.Host)
30+
require.Equal(t, "", prefix.Path)
31+
require.Equal(t, "/import/123/file.sst", path)
32+
})
33+
34+
t.Run("splits s3 URI correctly", func(t *testing.T) {
35+
prefix, path, err := mux.splitURI("s3://bucket/path/to/file.sst")
36+
require.NoError(t, err)
37+
require.Equal(t, "s3", prefix.Scheme)
38+
require.Equal(t, "bucket", prefix.Host)
39+
require.Equal(t, "", prefix.Path)
40+
require.Equal(t, "/path/to/file.sst", path)
41+
})
42+
43+
t.Run("handles URI with query parameters", func(t *testing.T) {
44+
prefix, path, err := mux.splitURI("nodelocal://1/import/file.sst?param=value")
45+
require.NoError(t, err)
46+
require.Equal(t, "nodelocal", prefix.Scheme)
47+
require.Equal(t, "1", prefix.Host)
48+
require.Equal(t, "param=value", prefix.RawQuery)
49+
require.Equal(t, "/import/file.sst", path)
50+
})
51+
52+
t.Run("handles empty path", func(t *testing.T) {
53+
prefix, path, err := mux.splitURI("nodelocal://1")
54+
require.NoError(t, err)
55+
require.Equal(t, "nodelocal", prefix.Scheme)
56+
require.Equal(t, "1", prefix.Host)
57+
require.Equal(t, "", path)
58+
})
59+
60+
t.Run("handles root path", func(t *testing.T) {
61+
prefix, path, err := mux.splitURI("nodelocal://1/")
62+
require.NoError(t, err)
63+
require.Equal(t, "nodelocal", prefix.Scheme)
64+
require.Equal(t, "1", prefix.Host)
65+
require.Equal(t, "/", path)
66+
})
67+
68+
t.Run("returns error for invalid URI", func(t *testing.T) {
69+
_, _, err := mux.splitURI("://invalid")
70+
require.Error(t, err)
71+
require.ErrorContains(t, err, "failed to parse external storage uri")
72+
})
73+
}
74+
75+
// mockExternalStorage is a minimal mock that implements just Close for testing.
76+
// We embed cloud.ExternalStorage to satisfy the interface, but only implement Close.
77+
type mockExternalStorage struct {
78+
cloud.ExternalStorage // embed to get default nil implementations
79+
uri string
80+
closed bool
81+
}
82+
83+
func (m *mockExternalStorage) Close() error {
84+
m.closed = true
85+
return nil
86+
}
87+
88+
func TestExternalStorageMux_StoreFile(t *testing.T) {
89+
defer leaktest.AfterTest(t)()
90+
91+
ctx := context.Background()
92+
93+
t.Run("returns correct StoreFile components", func(t *testing.T) {
94+
callCount := 0
95+
factory := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
96+
callCount++
97+
return &mockExternalStorage{uri: uri}, nil
98+
}
99+
100+
mux := NewExternalStorageMux(factory, username.RootUserName())
101+
defer func() {
102+
require.NoError(t, mux.Close())
103+
}()
104+
105+
storeFile, err := mux.StoreFile(ctx, "nodelocal://1/import/123/file.sst")
106+
require.NoError(t, err)
107+
require.NotNil(t, storeFile.Store)
108+
require.Equal(t, "/import/123/file.sst", storeFile.FilePath)
109+
require.Equal(t, 1, callCount, "factory should be called once")
110+
})
111+
112+
t.Run("caches storage instances by prefix", func(t *testing.T) {
113+
callCount := 0
114+
var createdURIs []string
115+
factory := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
116+
callCount++
117+
createdURIs = append(createdURIs, uri)
118+
return &mockExternalStorage{uri: uri}, nil
119+
}
120+
121+
mux := NewExternalStorageMux(factory, username.RootUserName())
122+
defer func() {
123+
require.NoError(t, mux.Close())
124+
}()
125+
126+
// First file on node 1
127+
storeFile1, err := mux.StoreFile(ctx, "nodelocal://1/import/123/file1.sst")
128+
require.NoError(t, err)
129+
require.Equal(t, "/import/123/file1.sst", storeFile1.FilePath)
130+
131+
// Second file on node 1 - should reuse cached instance
132+
storeFile2, err := mux.StoreFile(ctx, "nodelocal://1/import/456/file2.sst")
133+
require.NoError(t, err)
134+
require.Equal(t, "/import/456/file2.sst", storeFile2.FilePath)
135+
136+
// Verify same storage instance is reused
137+
require.Same(t, storeFile1.Store, storeFile2.Store, "should reuse cached storage instance")
138+
require.Equal(t, 1, callCount, "factory should only be called once for same prefix")
139+
require.Equal(t, []string{"nodelocal://1"}, createdURIs)
140+
})
141+
142+
t.Run("authenticated URIs should reuse cached instances", func(t *testing.T) {
143+
callCount := 0
144+
factory := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
145+
callCount++
146+
return &mockExternalStorage{uri: uri}, nil
147+
}
148+
149+
mux := NewExternalStorageMux(factory, username.RootUserName())
150+
defer func() {
151+
require.NoError(t, mux.Close())
152+
}()
153+
154+
first, err := mux.StoreFile(ctx, "s3://user:password@bucket/path/to/file1.sst")
155+
require.NoError(t, err)
156+
157+
second, err := mux.StoreFile(ctx, "s3://user:password@bucket/path/to/file2.sst")
158+
require.NoError(t, err)
159+
160+
require.Equal(t, 1, callCount, "factory should only be called once for authenticated URIs")
161+
require.Same(t, first.Store, second.Store, "authenticated URIs should reuse the same storage instance")
162+
})
163+
164+
t.Run("creates separate instances for different prefixes", func(t *testing.T) {
165+
var createdURIs []string
166+
factory := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
167+
createdURIs = append(createdURIs, uri)
168+
return &mockExternalStorage{uri: uri}, nil
169+
}
170+
171+
mux := NewExternalStorageMux(factory, username.RootUserName())
172+
defer func() {
173+
require.NoError(t, mux.Close())
174+
}()
175+
176+
// Files on different nodes
177+
storeFile1, err := mux.StoreFile(ctx, "nodelocal://1/import/file.sst")
178+
require.NoError(t, err)
179+
180+
storeFile2, err := mux.StoreFile(ctx, "nodelocal://2/import/file.sst")
181+
require.NoError(t, err)
182+
183+
storeFile3, err := mux.StoreFile(ctx, "s3://bucket/file.sst")
184+
require.NoError(t, err)
185+
186+
// Verify different instances
187+
require.NotSame(t, storeFile1.Store, storeFile2.Store, "different nodes should have different instances")
188+
require.NotSame(t, storeFile1.Store, storeFile3.Store, "different schemes should have different instances")
189+
require.NotSame(t, storeFile2.Store, storeFile3.Store, "different schemes should have different instances")
190+
191+
// Verify all three prefixes were created
192+
require.ElementsMatch(t, []string{"nodelocal://1", "nodelocal://2", "s3://bucket"}, createdURIs)
193+
require.Len(t, mux.storeInstances, 3, "should have three cached instances")
194+
})
195+
196+
t.Run("propagates factory errors", func(t *testing.T) {
197+
expectedErr := cloud.ErrListingUnsupported
198+
factory := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
199+
return nil, expectedErr
200+
}
201+
202+
mux := NewExternalStorageMux(factory, username.RootUserName())
203+
defer func() {
204+
_ = mux.Close()
205+
}()
206+
207+
_, err := mux.StoreFile(ctx, "nodelocal://1/file.sst")
208+
require.Error(t, err)
209+
require.ErrorIs(t, err, expectedErr)
210+
})
211+
212+
t.Run("Close closes all cached instances", func(t *testing.T) {
213+
mocks := make(map[string]*mockExternalStorage)
214+
factory := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
215+
mock := &mockExternalStorage{uri: uri}
216+
mocks[uri] = mock
217+
return mock, nil
218+
}
219+
220+
mux := NewExternalStorageMux(factory, username.RootUserName())
221+
222+
// Create multiple cached instances
223+
_, err := mux.StoreFile(ctx, "nodelocal://1/file1.sst")
224+
require.NoError(t, err)
225+
_, err = mux.StoreFile(ctx, "nodelocal://2/file2.sst")
226+
require.NoError(t, err)
227+
_, err = mux.StoreFile(ctx, "s3://bucket/file3.sst")
228+
require.NoError(t, err)
229+
230+
// Close should close all instances
231+
err = mux.Close()
232+
require.NoError(t, err)
233+
234+
// Verify all mocks were closed
235+
require.Len(t, mocks, 3)
236+
for uri, mock := range mocks {
237+
require.True(t, mock.closed, "storage instance %s should be closed", uri)
238+
}
239+
})
240+
}

pkg/sql/bulkutil/main_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkutil
7+
8+
import (
9+
"os"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
13+
)
14+
15+
func TestMain(m *testing.M) {
16+
randutil.SeedForTests()
17+
os.Exit(m.Run())
18+
}
19+
20+
//go:generate ../../util/leaktest/add-leaktest.sh *_test.go

0 commit comments

Comments
 (0)