Skip to content

Commit d595884

Browse files
committed
Add Retryer for Azure storage
Following config parameters are available: - numMaxRetries - default: 3 - minRetryDelay - default: 800ms - maxRetryDelay - default: 60s
1 parent 18eb1ae commit d595884

File tree

3 files changed

+59
-21
lines changed

3 files changed

+59
-21
lines changed

pbm/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,10 @@ func (s *StorageConf) Cast() error {
343343
return s.Minio.Cast()
344344
case storage.OSS:
345345
return s.OSS.Cast()
346+
case storage.Azure:
347+
return s.Azure.Cast()
346348
case storage.GCS:
347349
return nil
348-
case storage.Azure: // noop
349-
return nil
350350
case storage.Blackhole: // noop
351351
return nil
352352
}

pbm/storage/azure/azure.go

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,50 +24,56 @@ const (
2424

2525
defaultUploadBuff = 10 << 20 // 10Mb
2626

27-
defaultRetries = 10
27+
defaultMaxRetries = 3
28+
defaultMaxRetryDelay = 60 * time.Second
2829

2930
maxBlocks = 50_000
3031

3132
defaultMaxObjSizeGB = 194560 // 190 TB
3233
)
3334

3435
type Blob struct {
35-
opts *Config
36+
cfg *Config
3637
node string
3738
log log.LogEvent
3839
c *azblob.Client
3940
}
4041

41-
func New(opts *Config, node string, l log.LogEvent) (storage.Storage, error) {
42+
func New(cfg *Config, node string, l log.LogEvent) (storage.Storage, error) {
43+
err := cfg.Cast()
44+
if err != nil {
45+
return nil, errors.Wrap(err, "set defaults")
46+
}
4247
if l == nil {
4348
l = log.DiscardEvent
4449
}
4550
b := &Blob{
46-
opts: opts,
51+
cfg: cfg,
4752
node: node,
4853
log: l,
4954
}
5055

51-
var err error
5256
b.c, err = b.client()
5357
if err != nil {
5458
return nil, errors.Wrap(err, "init container")
5559
}
5660

57-
return storage.NewSplitMergeMW(b, opts.GetMaxObjSizeGB()), nil
61+
return storage.NewSplitMergeMW(b, cfg.GetMaxObjSizeGB()), nil
5862
}
5963

6064
func (b *Blob) client() (*azblob.Client, error) {
61-
cred, err := azblob.NewSharedKeyCredential(b.opts.Account, b.opts.Credentials.Key)
65+
cred, err := azblob.NewSharedKeyCredential(b.cfg.Account, b.cfg.Credentials.Key)
6266
if err != nil {
6367
return nil, errors.Wrap(err, "create credentials")
6468
}
6569

6670
opts := &azblob.ClientOptions{}
6771
opts.Retry = policy.RetryOptions{
68-
MaxRetries: defaultRetries,
72+
MaxRetries: b.cfg.Retryer.NumMaxRetries,
73+
RetryDelay: b.cfg.Retryer.MinRetryDelay,
74+
MaxRetryDelay: b.cfg.Retryer.MaxRetryDelay,
6975
}
70-
epURL := b.opts.resolveEndpointURL(b.node)
76+
epURL := b.cfg.resolveEndpointURL(b.node)
7177
return azblob.NewClientWithSharedKeyCredential(epURL, cred, opts)
7278
}
7379

@@ -98,8 +104,8 @@ func (b *Blob) Save(name string, data io.Reader, options ...storage.Option) erro
98104
}
99105

100106
_, err := b.c.UploadStream(context.TODO(),
101-
b.opts.Container,
102-
path.Join(b.opts.Prefix, name),
107+
b.cfg.Container,
108+
path.Join(b.cfg.Prefix, name),
103109
data,
104110
&azblob.UploadStreamOptions{
105111
BlockSize: int64(bufsz),
@@ -110,13 +116,13 @@ func (b *Blob) Save(name string, data io.Reader, options ...storage.Option) erro
110116
}
111117

112118
func (b *Blob) List(prefix, suffix string) ([]storage.FileInfo, error) {
113-
prfx := path.Join(b.opts.Prefix, prefix)
119+
prfx := path.Join(b.cfg.Prefix, prefix)
114120

115121
if prfx != "" && !strings.HasSuffix(prfx, "/") {
116122
prfx += "/"
117123
}
118124

119-
pager := b.c.NewListBlobsFlatPager(b.opts.Container, &azblob.ListBlobsFlatOptions{
125+
pager := b.c.NewListBlobsFlatPager(b.cfg.Container, &azblob.ListBlobsFlatOptions{
120126
Prefix: &prfx,
121127
})
122128

@@ -160,8 +166,8 @@ func (b *Blob) FileStat(name string) (storage.FileInfo, error) {
160166
inf := storage.FileInfo{}
161167

162168
p, err := b.c.ServiceClient().
163-
NewContainerClient(b.opts.Container).
164-
NewBlockBlobClient(path.Join(b.opts.Prefix, name)).
169+
NewContainerClient(b.cfg.Container).
170+
NewBlockBlobClient(path.Join(b.cfg.Prefix, name)).
165171
GetProperties(context.TODO(), nil)
166172
if err != nil {
167173
if isNotFound(err) {
@@ -183,8 +189,8 @@ func (b *Blob) FileStat(name string) (storage.FileInfo, error) {
183189
}
184190

185191
func (b *Blob) Copy(src, dst string) error {
186-
to := b.c.ServiceClient().NewContainerClient(b.opts.Container).NewBlockBlobClient(path.Join(b.opts.Prefix, dst))
187-
from := b.c.ServiceClient().NewContainerClient(b.opts.Container).NewBlockBlobClient(path.Join(b.opts.Prefix, src))
192+
to := b.c.ServiceClient().NewContainerClient(b.cfg.Container).NewBlockBlobClient(path.Join(b.cfg.Prefix, dst))
193+
from := b.c.ServiceClient().NewContainerClient(b.cfg.Container).NewBlockBlobClient(path.Join(b.cfg.Prefix, src))
188194
r, err := to.StartCopyFromURL(context.TODO(), from.BlobClient().URL(), nil)
189195
if err != nil {
190196
return errors.Wrap(err, "start copy")
@@ -224,7 +230,7 @@ func (b *Blob) DownloadStat() storage.DownloadStat {
224230
}
225231

226232
func (b *Blob) SourceReader(name string) (io.ReadCloser, error) {
227-
o, err := b.c.DownloadStream(context.TODO(), b.opts.Container, path.Join(b.opts.Prefix, name), nil)
233+
o, err := b.c.DownloadStream(context.TODO(), b.cfg.Container, path.Join(b.cfg.Prefix, name), nil)
228234
if err != nil {
229235
if isNotFound(err) {
230236
return nil, storage.ErrNotExist
@@ -243,7 +249,7 @@ func (b *Blob) SourceReader(name string) (io.ReadCloser, error) {
243249
}
244250

245251
func (b *Blob) Delete(name string) error {
246-
_, err := b.c.DeleteBlob(context.TODO(), b.opts.Container, path.Join(b.opts.Prefix, name), nil)
252+
_, err := b.c.DeleteBlob(context.TODO(), b.cfg.Container, path.Join(b.cfg.Prefix, name), nil)
247253
if err != nil {
248254
if isNotFound(err) {
249255
return storage.ErrNotExist

pbm/storage/azure/config.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"fmt"
55
"maps"
66
"reflect"
7+
"time"
8+
9+
"github.com/percona/percona-backup-mongodb/pbm/errors"
710
)
811

912
//nolint:lll
@@ -14,13 +17,20 @@ type Config struct {
1417
EndpointURLMap map[string]string `bson:"endpointUrlMap,omitempty" json:"endpointUrlMap,omitempty" yaml:"endpointUrlMap,omitempty"`
1518
Prefix string `bson:"prefix" json:"prefix,omitempty" yaml:"prefix,omitempty"`
1619
Credentials Credentials `bson:"credentials" json:"-" yaml:"credentials"`
20+
Retryer *Retryer `bson:"retryer,omitempty" json:"retryer,omitempty" yaml:"retryer,omitempty"`
1721
MaxObjSizeGB *float64 `bson:"maxObjSizeGB,omitempty" json:"maxObjSizeGB,omitempty" yaml:"maxObjSizeGB,omitempty"`
1822
}
1923

2024
type Credentials struct {
2125
Key string `bson:"key" json:"key,omitempty" yaml:"key,omitempty"`
2226
}
2327

28+
type Retryer struct {
29+
NumMaxRetries int32 `bson:"numMaxRetries" json:"numMaxRetries" yaml:"numMaxRetries"`
30+
MinRetryDelay time.Duration `bson:"minRetryDelay" json:"minRetryDelay" yaml:"minRetryDelay"`
31+
MaxRetryDelay time.Duration `bson:"maxRetryDelay" json:"maxRetryDelay" yaml:"maxRetryDelay"`
32+
}
33+
2434
func (cfg *Config) Clone() *Config {
2535
if cfg == nil {
2636
return nil
@@ -83,6 +93,28 @@ func (cfg *Config) IsSameStorage(other *Config) bool {
8393
return true
8494
}
8595

96+
func (cfg *Config) Cast() error {
97+
if cfg == nil {
98+
return errors.New("missing azure configuration with azure storage type")
99+
}
100+
101+
if cfg.Retryer == nil {
102+
cfg.Retryer = &Retryer{
103+
NumMaxRetries: defaultMaxRetries,
104+
MaxRetryDelay: defaultMaxRetryDelay,
105+
}
106+
} else {
107+
if cfg.Retryer.NumMaxRetries == 0 {
108+
cfg.Retryer.NumMaxRetries = defaultMaxRetries
109+
}
110+
if cfg.Retryer.MaxRetryDelay == 0 {
111+
cfg.Retryer.MaxRetryDelay = defaultMaxRetryDelay
112+
}
113+
}
114+
115+
return nil
116+
}
117+
86118
// resolveEndpointURL returns endpoint url based on provided
87119
// EndpointURL or associated EndpointURLMap configuration fields.
88120
// If specified EndpointURLMap overrides EndpointURL field.

0 commit comments

Comments
 (0)