Skip to content

Commit 1fa88c7

Browse files
committed
TNTP-2109: Multiple minor fixes
1 parent 90e1fa2 commit 1fa88c7

File tree

5 files changed

+26
-44
lines changed

5 files changed

+26
-44
lines changed

crud/common/call.lua

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ local function bucket_ref_many(bucket_ids, mode)
4747
end
4848

4949
local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
50-
fiber.name(CRUD_CALL_FIBER_NAME .. func_name)
50+
fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name)
5151

5252
local ok, ref_err = bucket_ref_many(bucket_ids, mode)
5353
if not ok then
@@ -65,7 +65,7 @@ local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ..
6565
end
6666

6767
local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
68-
fiber.name(CRUD_CALL_FIBER_NAME .. func_name)
68+
fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name)
6969

7070
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
7171
end
@@ -162,9 +162,11 @@ local function retry_call_with_master_discovery(vshard_router, replicaset,
162162
return resp, err
163163
end
164164

165+
-- This is a partial copy of error handling from vshard.router.router_call_impl()
166+
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
165167
if err.name == 'WRONG_BUCKET' or
166-
err.name == 'BUCKET_IS_LOCKED' or
167-
err.name == 'TRANSFER_IS_IN_PROGRESS' then
168+
err.name == 'BUCKET_IS_LOCKED' or
169+
err.name == 'TRANSFER_IS_IN_PROGRESS' then
168170
vshard_router:_bucket_reset(err.bucket_id)
169171

170172
-- Substitute replicaset only for single bucket_id calls.

crud/common/map_call_cases/base_iter.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ end
6767
-- @return[1] table func_args
6868
-- @return[2] table replicaset
6969
-- @return[3] string replicaset_id
70+
-- @return[4] table bucket_ids
7071
function BaseIterator:get()
7172
local replicaset_id = self.next_index
7273
local replicaset = self.next_replicaset

crud/common/map_call_cases/batch_insert_iter.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ end
6868
-- @return[1] table func_args
6969
-- @return[2] table replicaset
7070
-- @return[3] string replicaset_id
71+
-- @return[4] table bucket_ids
7172
function BatchInsertIterator:get()
7273
local replicaset_id = self.next_index
7374
local replicaset = self.next_batch.replicaset

crud/common/map_call_cases/batch_upsert_iter.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ end
7676
-- @return[1] table func_args
7777
-- @return[2] table replicaset
7878
-- @return[3] string replicaset_id
79+
-- @return[4] table bucket_ids
7980
function BatchUpsertIterator:get()
8081
local replicaset_id = self.next_index
8182
local replicaset = self.next_batch.replicaset

crud/common/rebalance.lua

Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,16 @@ local fiber = require('fiber')
22
local vshard_consts = require('vshard.consts')
33
local utils = require('crud.common.utils')
44

5-
local MODULE_INTERNALS = '__module_crud_rebalance'
65
local SETTINGS_SPACE_NAME = '_crud_settings'
6+
local SAFE_MOD_ENABLE_EVENT = '_crud.safe_mode_enable'
77

88

9-
local M = rawget(_G, MODULE_INTERNALS)
10-
if not M then
11-
M = {
12-
safe_mode = false,
13-
safe_mode_enable_hooks = {},
14-
safe_mode_disable_hooks = {},
15-
_router_cache_last_clear_ts = fiber.time()
16-
}
17-
else
18-
return M
19-
end
9+
local M = {
10+
safe_mode = false,
11+
safe_mode_enable_hooks = {},
12+
safe_mode_disable_hooks = {},
13+
_router_cache_last_clear_ts = fiber.time()
14+
}
2015

2116
local function create_space()
2217
local settings_space = box.schema.space.create(SETTINGS_SPACE_NAME, {
@@ -36,7 +31,7 @@ local function safe_mode_trigger(_, new, space, op)
3631
end
3732
if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or
3833
(op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then
39-
box.broadcast('_crud.safe_mode_enable', true)
34+
box.broadcast(SAFE_MOD_ENABLE_EVENT, true)
4035
end
4136
end
4237

@@ -86,11 +81,17 @@ end
8681

8782
local function rebalance_init()
8883
box.watch('box.status', function()
89-
if box.info.ro or box.space[SETTINGS_SPACE_NAME] == nil then
84+
if box.info.ro then
9085
return
9186
end
9287

93-
local stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' }
88+
local stored_safe_mode
89+
if box.space[SETTINGS_SPACE_NAME] == nil then
90+
create_space()
91+
box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false }
92+
else
93+
stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' }
94+
end
9495
M.safe_mode = stored_safe_mode.value
9596

9697
if M.safe_mode then
@@ -105,36 +106,12 @@ local function rebalance_init()
105106
end
106107
end)
107108

108-
box.watch('_crud.safe_mode_enable', function(_, do_enable)
109+
box.watch(SAFE_MOD_ENABLE_EVENT, function(_, do_enable)
109110
if box.info.ro or not do_enable then
110111
return
111112
end
112113
safe_mode_enable()
113114
end)
114-
115-
if box.info.ro then
116-
return
117-
end
118-
119-
local stored_safe_mode
120-
if box.space[SETTINGS_SPACE_NAME] == nil then
121-
create_space()
122-
box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false }
123-
else
124-
stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' }
125-
end
126-
M.safe_mode = stored_safe_mode and stored_safe_mode.value or false
127-
128-
if M.safe_mode then
129-
for hook, _ in pairs(M.safe_mode_enable_hooks) do
130-
hook()
131-
end
132-
else
133-
box.space._bucket:on_replace(safe_mode_trigger)
134-
for hook, _ in pairs(M.safe_mode_disable_hooks) do
135-
hook()
136-
end
137-
end
138115
end
139116

140117
local function rebalance_stop()

0 commit comments

Comments
 (0)