Skip to content

Commit d82469c

Browse files
committed
call bucket_ref/bucket_unref on crud operations
added module bucket_ref_unref.lua with functions bucket_refrw, bucket_unrefrw, bucket_refro, etc ... module looks at rebalancer.safe_mode flag. if it's disabled module does nothing, otherwise module call ref fucnctions bucket_ref fail indicated bucket movement. it will help with problem with double buckets.
1 parent 5f3f46f commit d82469c

File tree

5 files changed

+155
-18
lines changed

5 files changed

+155
-18
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
--- module to call vshard.storage.bucket_ref / vshard.storage.bucket_unref
2+
--- on write requests
3+
--- there are two modes: safe and fast. on safe mode module
4+
--- calls vshard.storage.bucket_ref / vshard.storage.bucket_unref
5+
--- on fast mode it does nothing.
6+
--- default is fast mode.
7+
8+
--- bucket_refw and bucket_unrefrw must be called in one transaction in order to prevent
9+
--- safe_mode change during execution.
10+
11+
local vshard = require('vshard')
12+
13+
local M = {}
14+
15+
--- on module initialization safe_mode_status func must be set
16+
--- it's rebalance.safe_mode_status
17+
function M.safe_mode_status()
18+
error('safe_mode_status not set')
19+
end
20+
21+
--- set safe mode func
22+
--- from rabalance.safe_mode_status
23+
function M.set_safe_mode_status(safe_mode_status)
24+
M.safe_mode_status = safe_mode_status
25+
end
26+
27+
--- bucket_refrw calls vshard.storage.bucket_refrw if safe mode enabled
28+
--- otherwise returns true
29+
--- must be called with bucket_unrefrw in transaction
30+
function M.bucket_refrw(bucket_id)
31+
if not M.safe_mode_status() then
32+
return true
33+
end
34+
return vshard.storage.bucket_refrw(bucket_id)
35+
end
36+
37+
--- bucket_unrefrw calls vshard.storage.bucket_unrefrw if safe mode enabled
38+
--- otherwise returns true
39+
--- must be called with bucket_refrw in transaction
40+
function M.bucket_unrefrw(bucket_id)
41+
if not M.safe_mode_status() then
42+
return true
43+
end
44+
return vshard.storage.bucket_unrefrw(bucket_id)
45+
end
46+
47+
--- bucket_refro calls vshard.storage.bucket_refro if safe mode enabled
48+
--- otherwise returns true
49+
function M.bucket_refro(bucket_id)
50+
if not M.safe_mode_status() then
51+
return true
52+
end
53+
return vshard.storage.bucket_refro(bucket_id)
54+
end
55+
56+
--- bucket_unrefro calls vshard.storage.bucket_unrefro if safe mode enabled
57+
--- otherwise returns true
58+
function M.bucket_unrefro(bucket_id)
59+
if not M.safe_mode_status() then
60+
return true
61+
end
62+
return vshard.storage.bucket_unrefro(bucket_id)
63+
end
64+
65+
--- bucket_unrefro calls vshard.storage.bucket_unrefrw for every bucket if safe mode enabled
66+
--- otherwise returns true
67+
--- @param bucket_ids table<number, true>
68+
function M.bucket_unrefrw_many(bucket_ids)
69+
if not M.safe_mode_status() then
70+
return true
71+
end
72+
73+
local unref_all_ok = true
74+
local unref_last_err
75+
for reffed_bucket_id in pairs(bucket_ids) do
76+
local unref_ok, unref_err = M.bucket_unrefrw(reffed_bucket_id)
77+
if not unref_ok then
78+
unref_all_ok = nil
79+
unref_last_err = unref_err
80+
end
81+
end
82+
83+
if not unref_all_ok then
84+
return nil, unref_last_err
85+
end
86+
return true
87+
end
88+
89+
return M

crud/delete.lua

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key')
99
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1010
local dev_checks = require('crud.common.dev_checks')
1111
local schema = require('crud.common.schema')
12+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1213

1314
local DeleteError = errors.new_class('DeleteError', {capture_stack = false})
1415

@@ -19,6 +20,7 @@ local CRUD_DELETE_FUNC_NAME = utils.get_storage_call(DELETE_FUNC_NAME)
1920

2021
local function delete_on_storage(space_name, key, field_names, opts)
2122
dev_checks('string', '?', '?table', {
23+
bucket_id = 'number|cdata',
2224
sharding_key_hash = '?number',
2325
sharding_func_hash = '?number',
2426
skip_sharding_hash_check = '?boolean',
@@ -42,14 +44,29 @@ local function delete_on_storage(space_name, key, field_names, opts)
4244
return nil, err
4345
end
4446

45-
-- add_space_schema_hash is false because
46-
-- reloading space format on router can't avoid delete error on storage
47-
return schema.wrap_box_space_func_result(space, 'delete', {key}, {
48-
add_space_schema_hash = false,
49-
field_names = field_names,
50-
noreturn = opts.noreturn,
51-
fetch_latest_metadata = opts.fetch_latest_metadata,
52-
})
47+
local function make_delete()
48+
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
49+
if not ref_ok then
50+
return nil, err_ref
51+
end
52+
53+
-- add_space_schema_hash is false because
54+
-- reloading space format on router can't avoid delete error on storage
55+
local result = schema.wrap_box_space_func_result(space, 'delete', {key}, {
56+
add_space_schema_hash = false,
57+
field_names = field_names,
58+
noreturn = opts.noreturn,
59+
fetch_latest_metadata = opts.fetch_latest_metadata,
60+
})
61+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(opts.bucket_id)
62+
if not unref_ok then
63+
return nil, err_unref
64+
end
65+
66+
return result
67+
end
68+
69+
return box.atomic(make_delete)
5370
end
5471

5572
delete.storage_api = {[DELETE_FUNC_NAME] = delete_on_storage}
@@ -116,6 +133,7 @@ local function call_delete_on_router(vshard_router, space_name, key, opts)
116133
sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id)
117134

118135
local delete_on_storage_opts = {
136+
bucket_id = bucket_id_data.bucket_id,
119137
sharding_func_hash = bucket_id_data.sharding_func_hash,
120138
sharding_key_hash = sharding_key_hash,
121139
skip_sharding_hash_check = skip_sharding_hash_check,

crud/get.lua

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key')
99
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1010
local dev_checks = require('crud.common.dev_checks')
1111
local schema = require('crud.common.schema')
12+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1213

1314
local GetError = errors.new_class('GetError', {capture_stack = false})
1415

@@ -19,6 +20,7 @@ local CRUD_GET_FUNC_NAME = utils.get_storage_call(GET_FUNC_NAME)
1920

2021
local function get_on_storage(space_name, key, field_names, opts)
2122
dev_checks('string', '?', '?table', {
23+
bucket_id = 'number|cdata',
2224
sharding_key_hash = '?number',
2325
sharding_func_hash = '?number',
2426
skip_sharding_hash_check = '?boolean',
@@ -41,13 +43,23 @@ local function get_on_storage(space_name, key, field_names, opts)
4143
return nil, err
4244
end
4345

46+
local ref_ok, err_ref = bucket_ref_unref.bucket_refro(opts.bucket_id)
47+
if not ref_ok then
48+
return nil, err_ref
49+
end
4450
-- add_space_schema_hash is false because
4551
-- reloading space format on router can't avoid get error on storage
46-
return schema.wrap_box_space_func_result(space, 'get', {key}, {
52+
local result = schema.wrap_box_space_func_result(space, 'get', {key}, {
4753
add_space_schema_hash = false,
4854
field_names = field_names,
4955
fetch_latest_metadata = opts.fetch_latest_metadata,
5056
})
57+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefro(opts.bucket_id)
58+
if not unref_ok then
59+
return nil, err_unref
60+
end
61+
62+
return result
5163
end
5264

5365
get.storage_api = {[GET_FUNC_NAME] = get_on_storage}
@@ -114,6 +126,7 @@ local function call_get_on_router(vshard_router, space_name, key, opts)
114126
sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id)
115127

116128
local get_on_storage_opts = {
129+
bucket_id = bucket_id_data.bucket_id,
117130
sharding_func_hash = bucket_id_data.sharding_func_hash,
118131
sharding_key_hash = sharding_key_hash,
119132
skip_sharding_hash_check = skip_sharding_hash_check,

crud/replace.lua

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
88
local dev_checks = require('crud.common.dev_checks')
99
local schema = require('crud.common.schema')
10+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1011

1112
local ReplaceError = errors.new_class('ReplaceError', { capture_stack = false })
1213

@@ -42,15 +43,29 @@ local function replace_on_storage(space_name, tuple, opts)
4243
return nil, err
4344
end
4445

45-
-- add_space_schema_hash is true only in case of replace_object
46-
-- the only one case when reloading schema can avoid insert error
47-
-- is flattening object on router
48-
return schema.wrap_box_space_func_result(space, 'replace', {tuple}, {
49-
add_space_schema_hash = opts.add_space_schema_hash,
50-
field_names = opts.fields,
51-
noreturn = opts.noreturn,
52-
fetch_latest_metadata = opts.fetch_latest_metadata,
53-
})
46+
local function make_replace()
47+
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
48+
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(bucket_id)
49+
if not ref_ok then
50+
return nil, err_ref
51+
end
52+
-- add_space_schema_hash is true only in case of replace_object
53+
-- the only one case when reloading schema can avoid insert error
54+
-- is flattening object on router
55+
local result = schema.wrap_box_space_func_result(space, 'replace', {tuple}, {
56+
add_space_schema_hash = opts.add_space_schema_hash,
57+
field_names = opts.fields,
58+
noreturn = opts.noreturn,
59+
fetch_latest_metadata = opts.fetch_latest_metadata,
60+
})
61+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
62+
if not unref_ok then
63+
return nil, err_unref
64+
end
65+
return result
66+
end
67+
68+
return box.atomic(make_replace)
5469
end
5570

5671
replace.storage_api = {[REPLACE_FUNC_NAME] = replace_on_storage}

crud/storage.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ local count = require('crud.count')
2323
local borders = require('crud.borders')
2424
local readview = require('crud.readview')
2525
local storage_info = require('crud.storage_info')
26+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
2627

2728
local storage = {}
2829

@@ -106,6 +107,7 @@ local function init_impl()
106107
end
107108

108109
rebalance.init()
110+
bucket_ref_unref.set_safe_mode_status(rebalance.safe_mode_status)
109111

110112
for _, module in ipairs(modules_with_storage_api) do
111113
init_storage_call(user, module.storage_api)

0 commit comments

Comments
 (0)