Skip to content

Commit 59c8964

Browse files
committed
handle BucketRefError for single operations
1 parent a6423d9 commit 59c8964

File tree

8 files changed

+60
-50
lines changed

8 files changed

+60
-50
lines changed

crud/common/call.lua

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ local sharding_utils = require('crud.common.sharding.utils')
88
local fiber = require('fiber')
99
local const = require('crud.common.const')
1010
local rebalance = require('crud.common.rebalance')
11+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1112

1213
local BaseIterator = require('crud.common.map_call_cases.base_iter')
1314
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
@@ -149,6 +150,7 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
149150
))
150151
end
151152

153+
-- todo: rename it
152154
local function retry_call_with_master_discovery(vshard_router, replicaset,
153155
method, func_name, func_args,
154156
call_opts, mode, bucket_ids)
@@ -161,19 +163,23 @@ local function retry_call_with_master_discovery(vshard_router, replicaset,
161163
if err == nil then
162164
return resp, err
163165
end
164-
166+
165167
-- This is a partial copy of error handling from vshard.router.router_call_impl()
166168
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
167-
if err.name == 'WRONG_BUCKET' or
168-
err.name == 'BUCKET_IS_LOCKED' or
169-
err.name == 'TRANSFER_IS_IN_PROGRESS' then
170-
vshard_router:_bucket_reset(err.bucket_id)
171-
172-
-- Substitute replicaset only for single bucket_id calls.
173-
if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then
174-
replicaset = vshard_router.replicasets[err.destination]
175-
else
176-
return nil, err
169+
-- may be we cant get this -- todo: check
170+
if err.class_name == bucket_ref_unref.BucketRefError.name then
171+
for _, bucket_ref_err in pairs(err.bucket_ref_errs) do
172+
local bucket_id = bucket_ref_err.bucket_id
173+
local vshard_err = bucket_ref_err.vshard_err
174+
if vshard_err.name == 'WRONG_BUCKET' or
175+
vshard_err.name == 'BUCKET_IS_LOCKED' or
176+
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
177+
vshard_router:_bucket_reset(bucket_id)
178+
179+
if vshard_err.destination and vshard_router.replicasets[vshard_err.destination] then
180+
replicaset = vshard_router.replicasets[vshard_err.destination]
181+
end
182+
end
177183
end
178184
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
179185
replicaset:locate_master()

crud/common/sharding/bucket_ref_unref.lua

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,22 @@
99
--- safe_mode change during execution.
1010

1111
local vshard = require('vshard')
12+
local errors = require('errors')
1213

13-
local M = {}
14+
local M = {
15+
BucketRefError = errors.new_class('bucket_ref_error', {capture_stack = false})
16+
}
17+
18+
local function make_bucket_ref_err(bucket_id, vshard_ref_err)
19+
local err = M.BucketRefError:new(M.BucketRefError:new("failed bucket_ref: %s, bucket_id: %s", vshard_ref_err.name, bucket_id))
20+
err.bucket_ref_errs = {
21+
{
22+
bucket_id = bucket_id,
23+
vshard_err = vshard_ref_err,
24+
}
25+
}
26+
return err
27+
end
1428

1529
--- on module initialization safe_mode_status func must be set
1630
--- it's rebalance.safe_mode_status
@@ -31,7 +45,13 @@ function M.bucket_refrw(bucket_id)
3145
if not M.safe_mode_status() then
3246
return true
3347
end
34-
return vshard.storage.bucket_refrw(bucket_id)
48+
49+
local ref_ok, vshard_ref_err = vshard.storage.bucket_refrw(bucket_id)
50+
if not ref_ok then
51+
return false, make_bucket_ref_err(bucket_id, vshard_ref_err)
52+
end
53+
54+
return true
3555
end
3656

3757
--- bucket_unrefrw calls vshard.storage.bucket_unrefrw if safe mode enabled
@@ -41,6 +61,7 @@ function M.bucket_unrefrw(bucket_id)
4161
if not M.safe_mode_status() then
4262
return true
4363
end
64+
4465
return vshard.storage.bucket_unrefrw(bucket_id)
4566
end
4667

@@ -50,7 +71,13 @@ function M.bucket_refro(bucket_id)
5071
if not M.safe_mode_status() then
5172
return true
5273
end
53-
return vshard.storage.bucket_refro(bucket_id)
74+
75+
local ref_ok, vshard_ref_err = vshard.storage.bucket_refro(bucket_id)
76+
if not ref_ok then
77+
return false, make_bucket_ref_err(bucket_id, vshard_ref_err)
78+
end
79+
80+
return true
5481
end
5582

5683
--- bucket_unrefro calls vshard.storage.bucket_unrefro if safe mode enabled
@@ -62,28 +89,4 @@ function M.bucket_unrefro(bucket_id)
6289
return vshard.storage.bucket_unrefro(bucket_id)
6390
end
6491

65-
--- bucket_unrefro calls vshard.storage.bucket_unrefrw for every bucket if safe mode enabled
66-
--- otherwise returns true
67-
--- @param bucket_ids table<number, true>
68-
function M.bucket_unrefrw_many(bucket_ids)
69-
if not M.safe_mode_status() then
70-
return true
71-
end
72-
73-
local unref_all_ok = true
74-
local unref_last_err
75-
for reffed_bucket_id in pairs(bucket_ids) do
76-
local unref_ok, unref_err = M.bucket_unrefrw(reffed_bucket_id)
77-
if not unref_ok then
78-
unref_all_ok = nil
79-
unref_last_err = unref_err
80-
end
81-
end
82-
83-
if not unref_all_ok then
84-
return nil, unref_last_err
85-
end
86-
return true
87-
end
88-
8992
return M

crud/delete.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ local function delete_on_storage(space_name, key, field_names, opts)
4545
end
4646

4747
local function make_delete()
48-
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
48+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(opts.bucket_id)
4949
if not ref_ok then
50-
return nil, err_ref
50+
return nil, bucket_ref_err
5151
end
5252

5353
-- add_space_schema_hash is false because

crud/get.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ local function get_on_storage(space_name, key, field_names, opts)
4343
return nil, err
4444
end
4545

46-
local ref_ok, err_ref = bucket_ref_unref.bucket_refro(opts.bucket_id)
46+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refro(opts.bucket_id)
4747
if not ref_ok then
48-
return nil, err_ref
48+
return nil, bucket_ref_err
4949
end
5050
-- add_space_schema_hash is false because
5151
-- reloading space format on router can't avoid get error on storage

crud/insert.lua

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ local function insert_on_storage(space_name, tuple, opts)
4545

4646
local function make_insert()
4747
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
48-
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(bucket_id)
48+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
49+
4950
if not ref_ok then
50-
return nil, err_ref
51+
return nil, bucket_ref_err
5152
end
5253

5354
-- add_space_schema_hash is true only in case of insert_object

crud/replace.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ local function replace_on_storage(space_name, tuple, opts)
4545

4646
local function make_replace()
4747
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
48-
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(bucket_id)
48+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
4949
if not ref_ok then
50-
return nil, err_ref
50+
return nil, bucket_ref_err
5151
end
5252
-- add_space_schema_hash is true only in case of replace_object
5353
-- the only one case when reloading schema can avoid insert error

crud/update.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ local function update_on_storage(space_name, key, operations, field_names, opts)
4545
end
4646

4747
local function make_update()
48-
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
48+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(opts.bucket_id)
4949
if not ref_ok then
50-
return nil, err_ref
50+
return nil, bucket_ref_err
5151
end
5252

5353
-- add_space_schema_hash is false because

crud/upsert.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ local function upsert_on_storage(space_name, tuple, operations, opts)
4444

4545
local function make_upsert()
4646
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
47-
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(bucket_id)
47+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
4848
if not ref_ok then
49-
return nil, err_ref
49+
return nil, bucket_ref_err
5050
end
5151

5252
-- add_space_schema_hash is true only in case of upsert_object

0 commit comments

Comments
 (0)