Skip to content

Commit ce5d695

Browse files
authored
Merge pull request #1209 from percona/PBM-1093-azure-improvements
PBM-1093: Agents stop processing requests in case PBM runs with Azure
2 parents e11316d + fd2ca00 commit ce5d695

File tree

5 files changed

+262
-159
lines changed

5 files changed

+262
-159
lines changed

packaging/conf/pbm-conf-reference.yml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@
6969
## Retry upload configuration options.
7070
# retryer:
7171
# numMaxRetries: 3
72-
# minRetryDelay: 30
73-
# maxRetryDelay: 5
72+
# minRetryDelay: 30ms
73+
# maxRetryDelay: 300s
7474
#
7575
## The maximum object size that will be stored on the storage
7676
# maxObjSizeGB: 5018
@@ -183,6 +183,12 @@
183183
# credentials:
184184
# key:
185185
#
186+
## Retry upload configuration options.
187+
# retryer:
188+
# numMaxRetries: 3
189+
# minRetryDelay: 800ms
190+
# maxRetryDelay: 60s
191+
#
186192
## The maximum object size that will be stored on the storage
187193
# maxObjSizeGB: 194560
188194

pbm/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,10 @@ func (s *StorageConf) Cast() error {
343343
return s.Minio.Cast()
344344
case storage.OSS:
345345
return s.OSS.Cast()
346+
case storage.Azure:
347+
return s.Azure.Cast()
346348
case storage.GCS:
347349
return nil
348-
case storage.Azure: // noop
349-
return nil
350350
case storage.Blackhole: // noop
351351
return nil
352352
}

pbm/storage/azure/azure.go

Lines changed: 40 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@ package azure
22

33
import (
44
"context"
5-
"fmt"
65
"io"
7-
"maps"
86
"net/http"
97
"path"
10-
"reflect"
118
"runtime"
129
"strings"
1310
"time"
@@ -25,139 +22,60 @@ import (
2522
const (
2623
BlobURL = "https://%s.blob.core.windows.net"
2724

28-
defaultUploadBuff = 10 << 20 // 10Mb
29-
defaultUploadMaxBuff = 5
25+
defaultUploadBuff = 10 << 20 // 10Mb
3026

31-
defaultRetries = 10
27+
defaultMaxRetries = 3
28+
defaultMinRetryDelay = 800 * time.Millisecond
29+
defaultMaxRetryDelay = 60 * time.Second
3230

3331
maxBlocks = 50_000
3432

3533
defaultMaxObjSizeGB = 194560 // 190 TB
3634
)
3735

38-
//nolint:lll
39-
type Config struct {
40-
Account string `bson:"account" json:"account,omitempty" yaml:"account,omitempty"`
41-
Container string `bson:"container" json:"container,omitempty" yaml:"container,omitempty"`
42-
EndpointURL string `bson:"endpointUrl" json:"endpointUrl,omitempty" yaml:"endpointUrl,omitempty"`
43-
EndpointURLMap map[string]string `bson:"endpointUrlMap,omitempty" json:"endpointUrlMap,omitempty" yaml:"endpointUrlMap,omitempty"`
44-
Prefix string `bson:"prefix" json:"prefix,omitempty" yaml:"prefix,omitempty"`
45-
Credentials Credentials `bson:"credentials" json:"-" yaml:"credentials"`
46-
MaxObjSizeGB *float64 `bson:"maxObjSizeGB,omitempty" json:"maxObjSizeGB,omitempty" yaml:"maxObjSizeGB,omitempty"`
47-
}
48-
49-
func (cfg *Config) Clone() *Config {
50-
if cfg == nil {
51-
return nil
52-
}
53-
54-
rv := *cfg
55-
rv.EndpointURLMap = maps.Clone(cfg.EndpointURLMap)
56-
if cfg.MaxObjSizeGB != nil {
57-
v := *cfg.MaxObjSizeGB
58-
rv.MaxObjSizeGB = &v
59-
}
60-
return &rv
61-
}
62-
63-
func (cfg *Config) Equal(other *Config) bool {
64-
if cfg == nil || other == nil {
65-
return cfg == other
66-
}
67-
68-
if cfg.Account != other.Account {
69-
return false
70-
}
71-
if cfg.Container != other.Container {
72-
return false
73-
}
74-
if cfg.EndpointURL != other.EndpointURL {
75-
return false
76-
}
77-
if !maps.Equal(cfg.EndpointURLMap, other.EndpointURLMap) {
78-
return false
79-
}
80-
if cfg.Prefix != other.Prefix {
81-
return false
82-
}
83-
if cfg.Credentials.Key != other.Credentials.Key {
84-
return false
85-
}
86-
if !reflect.DeepEqual(cfg.MaxObjSizeGB, other.MaxObjSizeGB) {
87-
return false
88-
}
89-
90-
return true
91-
}
92-
93-
// IsSameStorage identifies the same instance of the Azure storage.
94-
func (cfg *Config) IsSameStorage(other *Config) bool {
95-
if cfg == nil || other == nil {
96-
return cfg == other
97-
}
98-
99-
if cfg.Account != other.Account {
100-
return false
101-
}
102-
if cfg.Container != other.Container {
103-
return false
104-
}
105-
if cfg.Prefix != other.Prefix {
106-
return false
107-
}
108-
return true
109-
}
110-
111-
// resolveEndpointURL returns endpoint url based on provided
112-
// EndpointURL or associated EndpointURLMap configuration fields.
113-
// If specified EndpointURLMap overrides EndpointURL field.
114-
func (cfg *Config) resolveEndpointURL(node string) string {
115-
ep := cfg.EndpointURL
116-
if epm, ok := cfg.EndpointURLMap[node]; ok {
117-
ep = epm
118-
}
119-
if ep == "" {
120-
ep = fmt.Sprintf(BlobURL, cfg.Account)
121-
}
122-
return ep
123-
}
124-
125-
func (cfg *Config) GetMaxObjSizeGB() float64 {
126-
if cfg.MaxObjSizeGB != nil && *cfg.MaxObjSizeGB > 0 {
127-
return *cfg.MaxObjSizeGB
128-
}
129-
return defaultMaxObjSizeGB
130-
}
131-
132-
type Credentials struct {
133-
Key string `bson:"key" json:"key,omitempty" yaml:"key,omitempty"`
134-
}
135-
13636
type Blob struct {
137-
opts *Config
37+
cfg *Config
13838
node string
13939
log log.LogEvent
140-
// url *url.URL
141-
c *azblob.Client
40+
c *azblob.Client
14241
}
14342

144-
func New(opts *Config, node string, l log.LogEvent) (storage.Storage, error) {
43+
func New(cfg *Config, node string, l log.LogEvent) (storage.Storage, error) {
44+
err := cfg.Cast()
45+
if err != nil {
46+
return nil, errors.Wrap(err, "set defaults")
47+
}
14548
if l == nil {
14649
l = log.DiscardEvent
14750
}
14851
b := &Blob{
149-
opts: opts,
52+
cfg: cfg,
15053
node: node,
15154
log: l,
15255
}
15356

154-
var err error
15557
b.c, err = b.client()
15658
if err != nil {
15759
return nil, errors.Wrap(err, "init container")
15860
}
15961

160-
return storage.NewSplitMergeMW(b, opts.GetMaxObjSizeGB()), b.ensureContainer()
62+
return storage.NewSplitMergeMW(b, cfg.GetMaxObjSizeGB()), nil
63+
}
64+
65+
func (b *Blob) client() (*azblob.Client, error) {
66+
cred, err := azblob.NewSharedKeyCredential(b.cfg.Account, b.cfg.Credentials.Key)
67+
if err != nil {
68+
return nil, errors.Wrap(err, "create credentials")
69+
}
70+
71+
opts := &azblob.ClientOptions{}
72+
opts.Retry = policy.RetryOptions{
73+
MaxRetries: b.cfg.Retryer.NumMaxRetries,
74+
RetryDelay: b.cfg.Retryer.MinRetryDelay,
75+
MaxRetryDelay: b.cfg.Retryer.MaxRetryDelay,
76+
}
77+
epURL := b.cfg.resolveEndpointURL(b.node)
78+
return azblob.NewClientWithSharedKeyCredential(epURL, cred, opts)
16179
}
16280

16381
func (*Blob) Type() storage.Type {
@@ -180,18 +98,15 @@ func (b *Blob) Save(name string, data io.Reader, options ...storage.Option) erro
18098
}
18199
}
182100

183-
cc := runtime.NumCPU() / 2
184-
if cc == 0 {
185-
cc = 1
186-
}
101+
cc := max(runtime.NumCPU()/2, 1)
187102

188103
if b.log != nil && opts.UseLogger {
189104
b.log.Debug("BufferSize is set to %d (~%dMb) | %d", bufsz, bufsz>>20, opts.Size)
190105
}
191106

192107
_, err := b.c.UploadStream(context.TODO(),
193-
b.opts.Container,
194-
path.Join(b.opts.Prefix, name),
108+
b.cfg.Container,
109+
path.Join(b.cfg.Prefix, name),
195110
data,
196111
&azblob.UploadStreamOptions{
197112
BlockSize: int64(bufsz),
@@ -202,13 +117,13 @@ func (b *Blob) Save(name string, data io.Reader, options ...storage.Option) erro
202117
}
203118

204119
func (b *Blob) List(prefix, suffix string) ([]storage.FileInfo, error) {
205-
prfx := path.Join(b.opts.Prefix, prefix)
120+
prfx := path.Join(b.cfg.Prefix, prefix)
206121

207122
if prfx != "" && !strings.HasSuffix(prfx, "/") {
208123
prfx += "/"
209124
}
210125

211-
pager := b.c.NewListBlobsFlatPager(b.opts.Container, &azblob.ListBlobsFlatOptions{
126+
pager := b.c.NewListBlobsFlatPager(b.cfg.Container, &azblob.ListBlobsFlatOptions{
212127
Prefix: &prfx,
213128
})
214129

@@ -252,8 +167,8 @@ func (b *Blob) FileStat(name string) (storage.FileInfo, error) {
252167
inf := storage.FileInfo{}
253168

254169
p, err := b.c.ServiceClient().
255-
NewContainerClient(b.opts.Container).
256-
NewBlockBlobClient(path.Join(b.opts.Prefix, name)).
170+
NewContainerClient(b.cfg.Container).
171+
NewBlockBlobClient(path.Join(b.cfg.Prefix, name)).
257172
GetProperties(context.TODO(), nil)
258173
if err != nil {
259174
if isNotFound(err) {
@@ -275,8 +190,8 @@ func (b *Blob) FileStat(name string) (storage.FileInfo, error) {
275190
}
276191

277192
func (b *Blob) Copy(src, dst string) error {
278-
to := b.c.ServiceClient().NewContainerClient(b.opts.Container).NewBlockBlobClient(path.Join(b.opts.Prefix, dst))
279-
from := b.c.ServiceClient().NewContainerClient(b.opts.Container).NewBlockBlobClient(path.Join(b.opts.Prefix, src))
193+
to := b.c.ServiceClient().NewContainerClient(b.cfg.Container).NewBlockBlobClient(path.Join(b.cfg.Prefix, dst))
194+
from := b.c.ServiceClient().NewContainerClient(b.cfg.Container).NewBlockBlobClient(path.Join(b.cfg.Prefix, src))
280195
r, err := to.StartCopyFromURL(context.TODO(), from.BlobClient().URL(), nil)
281196
if err != nil {
282197
return errors.Wrap(err, "start copy")
@@ -316,7 +231,7 @@ func (b *Blob) DownloadStat() storage.DownloadStat {
316231
}
317232

318233
func (b *Blob) SourceReader(name string) (io.ReadCloser, error) {
319-
o, err := b.c.DownloadStream(context.TODO(), b.opts.Container, path.Join(b.opts.Prefix, name), nil)
234+
o, err := b.c.DownloadStream(context.TODO(), b.cfg.Container, path.Join(b.cfg.Prefix, name), nil)
320235
if err != nil {
321236
if isNotFound(err) {
322237
return nil, storage.ErrNotExist
@@ -335,7 +250,7 @@ func (b *Blob) SourceReader(name string) (io.ReadCloser, error) {
335250
}
336251

337252
func (b *Blob) Delete(name string) error {
338-
_, err := b.c.DeleteBlob(context.TODO(), b.opts.Container, path.Join(b.opts.Prefix, name), nil)
253+
_, err := b.c.DeleteBlob(context.TODO(), b.cfg.Container, path.Join(b.cfg.Prefix, name), nil)
339254
if err != nil {
340255
if isNotFound(err) {
341256
return storage.ErrNotExist
@@ -346,36 +261,6 @@ func (b *Blob) Delete(name string) error {
346261
return nil
347262
}
348263

349-
func (b *Blob) ensureContainer() error {
350-
_, err := b.c.ServiceClient().NewContainerClient(b.opts.Container).GetProperties(context.TODO(), nil)
351-
// container already exists
352-
if err == nil {
353-
return nil
354-
}
355-
356-
var stgErr *azcore.ResponseError
357-
if errors.As(err, &stgErr) && stgErr.StatusCode != http.StatusNotFound {
358-
return errors.Wrap(err, "check container")
359-
}
360-
361-
_, err = b.c.CreateContainer(context.TODO(), b.opts.Container, nil)
362-
return err
363-
}
364-
365-
func (b *Blob) client() (*azblob.Client, error) {
366-
cred, err := azblob.NewSharedKeyCredential(b.opts.Account, b.opts.Credentials.Key)
367-
if err != nil {
368-
return nil, errors.Wrap(err, "create credentials")
369-
}
370-
371-
opts := &azblob.ClientOptions{}
372-
opts.Retry = policy.RetryOptions{
373-
MaxRetries: defaultRetries,
374-
}
375-
epURL := b.opts.resolveEndpointURL(b.node)
376-
return azblob.NewClientWithSharedKeyCredential(epURL, cred, opts)
377-
}
378-
379264
func isNotFound(err error) bool {
380265
var stgErr *azcore.ResponseError
381266
if errors.As(err, &stgErr) {

0 commit comments

Comments
 (0)