diff --git a/crud.lua b/crud.lua index 60301163..2a5dcf95 100644 --- a/crud.lua +++ b/crud.lua @@ -23,6 +23,7 @@ local readview = require('crud.readview') local schema = require('crud.schema') local storage_info = require('crud.storage_info') local storage = require('crud.storage') +local rebalance = require('crud.common.rebalance') local crud = {} @@ -158,8 +159,23 @@ crud.readview = readview.new -- @function schema crud.schema = schema.call +crud.rebalance = {} + +-- @refer rebalance.router_cache_clear +-- @function router_cache_clear +crud.rebalance.router_cache_clear = rebalance.router.cache_clear + +-- @refer rebalance.router_cache_length +-- @function router_cache_length +crud.rebalance.router_cache_length = rebalance.router.cache_length + +-- @refer rebalance.router_cache_last_clear_ts +-- @function router_cache_last_clear_ts +crud.rebalance.router_cache_last_clear_ts = rebalance.router.cache_last_clear_ts + function crud.init_router() - rawset(_G, 'crud', crud) + rawset(_G, 'crud', crud) + rebalance.metrics.enable_router_metrics() end function crud.stop_router() diff --git a/crud/common/call.lua b/crud/common/call.lua index 5887923f..66616bb9 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -1,11 +1,13 @@ local errors = require('errors') +local vshard = require('vshard') local call_cache = require('crud.common.call_cache') local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') -local fiber_clock = require('fiber').clock +local fiber = require('fiber') local const = require('crud.common.const') +local rebalance = require('crud.common.rebalance') local BaseIterator = require('crud.common.map_call_cases.base_iter') local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') @@ -14,14 +16,79 @@ local CallError = errors.new_class('CallError') local CALL_FUNC_NAME = 'call_on_storage' local CRUD_CALL_FUNC_NAME = 
utils.get_storage_call(CALL_FUNC_NAME) - +local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/' local call = {} -local function call_on_storage(run_as_user, func_name, ...) +local function bucket_unref_many(bucket_ids, mode) + local all_ok = true + local last_err = nil + for _, bucket_id in pairs(bucket_ids) do + local ok, err = vshard.storage.bucket_unref(bucket_id, mode) + if not ok then + all_ok = nil + last_err = err + end + end + return all_ok, last_err +end + +local function bucket_ref_many(bucket_ids, mode) + local reffed = {} + for _, bucket_id in pairs(bucket_ids) do + local ok, err = vshard.storage.bucket_ref(bucket_id, mode) + if not ok then + bucket_unref_many(reffed, mode) + return nil, err + end + table.insert(reffed, bucket_id) + end + return true, nil +end + +local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) + fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name) + + local ok, ref_err = bucket_ref_many(bucket_ids, mode) + if not ok then + return nil, ref_err + end + + local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)} + + ok, ref_err = bucket_unref_many(bucket_ids, mode) + if not ok then + return nil, ref_err + end + + return unpack(res, 1, table.maxn(res)) +end + +local function call_on_storage_fast(run_as_user, _, _, func_name, ...) + fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name) + return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) 
end +local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast + +local function safe_mode_enable() + call_on_storage = call_on_storage_safe + + for fb_id, fb in pairs(fiber.info()) do + if string.find(fb.name, CRUD_CALL_FIBER_NAME) then + fiber.kill(fb_id) + end + end +end + +local function safe_mode_disable() + call_on_storage = call_on_storage_fast +end + +rebalance.register_safe_mode_enable_hook(safe_mode_enable) +rebalance.register_safe_mode_disable_hook(safe_mode_disable) + call.storage_api = {[CALL_FUNC_NAME] = call_on_storage} function call.get_vshard_call_name(mode, prefer_replica, balance) @@ -82,8 +149,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc )) end -local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts) - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) +local function retry_call_with_master_discovery(vshard_router, replicaset, + method, func_name, func_args, + call_opts, mode, bucket_ids) + local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) -- In case cluster was just bootstrapped with auto master discovery, -- replicaset may miss master. @@ -93,7 +162,20 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f return resp, err end - if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then + -- This is a partial copy of error handling from vshard.router.router_call_impl() + -- It is much simpler mostly because bucket_set() can't be accessed from outside vshard. + if err.name == 'WRONG_BUCKET' or + err.name == 'BUCKET_IS_LOCKED' or + err.name == 'TRANSFER_IS_IN_PROGRESS' then + vshard_router:_bucket_reset(err.bucket_id) + + -- Substitute replicaset only for single bucket_id calls. 
+ if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then + replicaset = vshard_router.replicasets[err.destination] + else + return nil, err + end + elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then replicaset:locate_master() end @@ -145,10 +227,10 @@ function call.map(vshard_router, func_name, func_args, opts) request_timeout = opts.mode == 'read' and opts.request_timeout or nil, } while iter:has_next() do - local args, replicaset, replicaset_id = iter:get() + local args, replicaset, replicaset_id, bucket_ids = iter:get() - local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, args, call_opts) + local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, + func_name, args, call_opts, opts.mode, bucket_ids) if err ~= nil then local result_info = { @@ -170,9 +252,9 @@ function call.map(vshard_router, func_name, func_args, opts) futures_by_replicasets[replicaset_id] = future end - local deadline = fiber_clock() + timeout + local deadline = fiber.clock() + timeout for replicaset_id, future in pairs(futures_by_replicasets) do - local wait_timeout = deadline - fiber_clock() + local wait_timeout = deadline - fiber.clock() if wait_timeout < 0 then wait_timeout = 0 end @@ -221,9 +303,9 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts) local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local request_timeout = opts.mode == 'read' and opts.request_timeout or nil - local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, func_args, {timeout = timeout, - request_timeout = request_timeout}) + local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, + func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, + opts.mode, {bucket_id}) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, 
func_name, nil, bucket_id) end @@ -248,8 +330,9 @@ function call.any(vshard_router, func_name, func_args, opts) end local replicaset_id, replicaset = next(replicasets) - local res, err = retry_call_with_master_discovery(replicaset, 'call', - func_name, func_args, {timeout = timeout}) + local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call', + func_name, func_args, {timeout = timeout}, + 'read', {}) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) end diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index 452e1599..e52c8eb4 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ b/crud/common/map_call_cases/base_iter.lua @@ -67,12 +67,13 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id +-- @return[4] table bucket_ids function BaseIterator:get() local replicaset_id = self.next_index local replicaset = self.next_replicaset self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) - return self.func_args, replicaset, replicaset_id + return self.func_args, replicaset, replicaset_id, {} end return BaseIterator diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 37867f1b..2c22a798 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -68,6 +68,7 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id +-- @return[4] table bucket_ids function BatchInsertIterator:get() local replicaset_id = self.next_index local replicaset = self.next_batch.replicaset @@ -76,10 +77,11 @@ function BatchInsertIterator:get() self.next_batch.tuples, self.opts, } + local bucket_ids = self.next_batch.bucket_ids self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, 
replicaset, replicaset_id + return func_args, replicaset, replicaset_id, bucket_ids end return BatchInsertIterator diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index d25e3ee8..658db9d4 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -76,6 +76,7 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id +-- @return[4] table bucket_ids function BatchUpsertIterator:get() local replicaset_id = self.next_index local replicaset = self.next_batch.replicaset @@ -85,10 +86,11 @@ function BatchUpsertIterator:get() self.next_batch.operations, self.opts, } + local bucket_ids = self.next_batch.bucket_ids self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset, replicaset_id + return func_args, replicaset, replicaset_id, bucket_ids end return BatchUpsertIterator diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua new file mode 100644 index 00000000..2e6e0ccb --- /dev/null +++ b/crud/common/rebalance.lua @@ -0,0 +1,211 @@ +local fiber = require('fiber') +local log = require('log') +local vshard_consts = require('vshard.consts') +local utils = require('crud.common.utils') + +local has_metrics_module, metrics = pcall(require, 'metrics') + +local SETTINGS_SPACE_NAME = '_crud_settings' +local SAFE_MOD_ENABLE_EVENT = '_crud.safe_mode_enable' + +local M = { + safe_mode = false, + safe_mode_enable_hooks = {}, + safe_mode_disable_hooks = {}, + _router_cache_last_clear_ts = fiber.time() +} + +local function create_space() + local settings_space = box.schema.space.create(SETTINGS_SPACE_NAME, { + engine = 'memtx', + format = { + { name = 'key', type = 'string' }, + { name = 'value', type = 'any' }, + }, + if_not_exists = true, + }) + settings_space:create_index('primary', { parts = { 'key' }, if_not_exists = true }) +end + +local 
function safe_mode_trigger(_, new, space, op) + if space ~= '_bucket' then + return + end + if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or + (op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then + box.broadcast(SAFE_MOD_ENABLE_EVENT, true) + end +end + +local function register_enable_hook(func) + M.safe_mode_enable_hooks[func] = true +end + +local function remove_enable_hook(func) + M.safe_mode_enable_hooks[func] = nil +end + +local function register_disable_hook(func) + M.safe_mode_disable_hooks[func] = true +end + +local function remove_disable_hook(func) + M.safe_mode_disable_hooks[func] = nil +end + +local function safe_mode_status() + return M.safe_mode +end + +local function safe_mode_enable() + if not box.info.ro then + box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', true } + for _, trig in pairs(box.space._bucket:on_replace()) do + if trig == safe_mode_trigger then + box.space._bucket:on_replace(nil, safe_mode_trigger) + end + end + end + M.safe_mode = true + + for hook, _ in pairs(M.safe_mode_enable_hooks) do + hook() + end + + log.info('Rebalance safe mode enabled') +end + +local function safe_mode_disable() + if not box.info.ro then + box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', false } + box.space._bucket:on_replace(safe_mode_trigger) + end + M.safe_mode = false + + for hook, _ in pairs(M.safe_mode_disable_hooks) do + hook() + end + + log.info('Rebalance safe mode disabled') +end + +local function rebalance_init() + M.metrics.enable_storage_metrics() + + -- box.watch was introduced in tarantool 2.10.0 + if not utils.tarantool_supports_box_watch() then + log.warn('This version of tarantool does not support autoswitch to safe mode during rebalance. ' + .. 
'Update to newer version or use `_crud.rebalance_safe_mode_enable()` to enable safe mode manually.') + return + end + + box.watch('box.status', function() + if box.info.ro then + return + end + + local stored_safe_mode + if box.space[SETTINGS_SPACE_NAME] == nil then + create_space() + box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false } + else + stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } + end + M.safe_mode = stored_safe_mode and stored_safe_mode.value or false + + if M.safe_mode then + for hook, _ in pairs(M.safe_mode_enable_hooks) do + hook() + end + else + box.space._bucket:on_replace(safe_mode_trigger) + for hook, _ in pairs(M.safe_mode_disable_hooks) do + hook() + end + end + end) + + box.watch(SAFE_MOD_ENABLE_EVENT, function(_, do_enable) + if box.info.ro or not do_enable then + return + end + safe_mode_enable() + end) +end + +local function router_cache_clear() + M._router_cache_last_clear_ts = fiber.time() + return utils.get_vshard_router_instance():_route_map_clear() +end + +local function router_cache_length() + return utils.get_vshard_router_instance().known_bucket_count +end + +local function router_cache_last_clear_ts() + return M._router_cache_last_clear_ts +end + +-- Rebalance related metrics +local function enable_storage_metrics() + if not has_metrics_module then + return + end + + local safe_mode_enabled_gauge = metrics.gauge( + 'tnt_crud_storage_safe_mode_enabled', + "is safe mode enabled on this storage instance" + ) + + metrics.register_callback(function() + safe_mode_enabled_gauge:set(safe_mode_status() and 1 or 0) + end) +end + +local function enable_router_metrics() + if not has_metrics_module then + return + end + + local router_cache_length_gauge = metrics.gauge( + 'tnt_crud_router_cache_length', + "number of bucket routes in vshard router cache" + ) + local router_cache_last_clear_ts_gauge = metrics.gauge( + 'tnt_crud_router_cache_last_clear_ts', + "when vshard router cache was cleared last time" + ) + + 
metrics.register_callback(function() + router_cache_length_gauge:set(router_cache_length()) + router_cache_last_clear_ts_gauge:set(router_cache_last_clear_ts()) + end) +end + +M.init = rebalance_init +M.safe_mode_status = safe_mode_status +M.safe_mode_enable = safe_mode_enable +M.safe_mode_disable = safe_mode_disable +M.register_safe_mode_enable_hook = register_enable_hook +M.remove_safe_mode_enable_hook = remove_enable_hook +M.register_safe_mode_disable_hook = register_disable_hook +M.remove_safe_mode_disable_hook = remove_disable_hook + +M.router = { + cache_clear = router_cache_clear, + cache_length = router_cache_length, + cache_last_clear_ts = router_cache_last_clear_ts, +} + +M.storage_api = { + rebalance_safe_mode_status = safe_mode_status, + rebalance_safe_mode_enable = safe_mode_enable, + rebalance_safe_mode_disable = safe_mode_disable, +} + +M.metrics = { + enable_storage_metrics = enable_storage_metrics, + enable_router_metrics = enable_router_metrics, +} + +return M diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 02bb6beb..83314391 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -324,8 +324,10 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) local record_by_replicaset = batches[replicaset_id] or { replicaset = replicaset, tuples = {}, + bucket_ids = {}, } table.insert(record_by_replicaset.tuples, tuple) + record_by_replicaset.bucket_ids[sharding_data.bucket_id] = true if opts.operations ~= nil then record_by_replicaset.operations = record_by_replicaset.operations or {} @@ -335,6 +337,14 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) batches[replicaset_id] = record_by_replicaset end + for _, rbr in pairs(batches) do + local bucket_ids = {} + for bid, _ in pairs(rbr.bucket_ids) do + table.insert(bucket_ids, bid) + end + rbr.bucket_ids = bucket_ids + end + return { batches = batches, sharding_func_hash = 
sharding_func_hash, diff --git a/crud/schema.lua b/crud/schema.lua index 57743ba9..0a6d0473 100644 --- a/crud/schema.lua +++ b/crud/schema.lua @@ -46,6 +46,8 @@ schema.system_spaces = { ['_tt_migrations'] = true, -- https://github.com/tarantool/cluster-federation/blob/01738cafa0dc7a3138e64f93c4e84cb323653257/src/internal/utils/utils.go#L17 ['_cdc_state'] = true, + -- crud/common/rebalance.lua + ['_crud_settings'] = true, } local function get_crud_schema(space) diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 1fd75320..48dfe1e4 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -171,7 +171,11 @@ local function fetch_chunk(context, state) -- change context.func_args too, but it does not matter next_func_args[4].after_tuple = cursor.after_tuple - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, next_func_args) + local mode = "read" + local bucket_ids = {} + local func_args_ext = utils.append_array( + { box.session.effective_user(), bucket_ids, mode, func_name }, + next_func_args) if context.readview then next_state = {future = context.future_replica.conn:call("_crud.call_on_storage", @@ -203,7 +207,10 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ local buf = buffer.ibuf() local net_box_opts = {is_async = true, buffer = buf, skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) + local bucket_ids = {} + local func_args_ext = utils.append_array( + { box.session.effective_user(), bucket_ids, mode, func_name }, + func_args) local future = replicaset[vshard_call_name](replicaset, "_crud.call_on_storage", func_args_ext, net_box_opts) @@ -279,8 +286,13 @@ local function new_readview(vshard_router, replicasets, readview_info, space, in local net_box_opts = {is_async = true, buffer = buf, skip_header = 
utils.tarantool_supports_netbox_skip_header_option() or nil} func_args[4].readview_id = replicaset_info.id - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) - local future = replica.conn:call("_crud.call_on_storage", func_args_ext, net_box_opts) + local mode = "read" + local bucket_ids = {} + local func_args_ext = utils.append_array( + { box.session.effective_user(), bucket_ids, mode, func_name }, + func_args) + local future = replica.conn:call("_crud.call_on_storage", + func_args_ext, net_box_opts) -- Create a source. local context = { diff --git a/crud/storage.lua b/crud/storage.lua index 0b8ef770..c6356008 100644 --- a/crud/storage.lua +++ b/crud/storage.lua @@ -4,6 +4,7 @@ local dev_checks = require('crud.common.dev_checks') local stash = require('crud.common.stash') local utils = require('crud.common.utils') +local rebalance = require('crud.common.rebalance') local call = require('crud.common.call') local sharding_metadata = require('crud.common.sharding.sharding_metadata') local insert = require('crud.insert') @@ -62,6 +63,7 @@ local function init_storage_call(user, storage_api) end local modules_with_storage_api = { + rebalance, call, sharding_metadata, insert, @@ -103,6 +105,8 @@ local function init_impl() user = utils.get_this_replica_user() or 'guest' end + rebalance.init() + for _, module in ipairs(modules_with_storage_api) do init_storage_call(user, module.storage_api) end @@ -141,7 +145,6 @@ function storage.stop() internal_stash.watcher:unregister() internal_stash.watcher = nil end - rawset(_G, utils.STORAGE_NAMESPACE, nil) end diff --git a/test/helper.lua b/test/helper.lua index 9aa127d1..eae17e8c 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -981,6 +981,16 @@ function helpers.start_cluster(g, cartridge_cfg, vshard_cfg, tarantool3_cluster_ error(err) end end + + if g.params and g.params.safe_mode ~= nil then + local safe_mod_func = '_crud.rebalance_safe_mode_disable' + if g.params.safe_mode 
then + safe_mod_func = '_crud.rebalance_safe_mode_enable' + end + helpers.call_on_storages(g.cluster, function(server) + server:call(safe_mod_func) + end) + end end local function count_storages_in_topology(g, backend, vshard_group, storage_roles) @@ -1178,8 +1188,34 @@ function helpers.is_cartridge_suite_supported() return is_module_provided and is_tarantool_supports end -function helpers.backend_matrix(base_matrix) +function helpers.safe_mode_matrix(base_matrix) + base_matrix = base_matrix or {{}} + + local safe_mode_params = { + { safe_mode = true }, + { safe_mode = false }, + } + + local matrix = {} + for _, params in ipairs(safe_mode_params) do + for _, base in ipairs(base_matrix) do + base = table.deepcopy(base) + base.safe_mode = params.safe_mode + table.insert(matrix, base) + end + end + + return matrix +end + +function helpers.backend_matrix(base_matrix, opts) base_matrix = base_matrix or {{}} + opts = opts or {} + + if not opts.skip_safe_mode then + base_matrix = helpers.safe_mode_matrix(base_matrix) + end + local backend_params = { { backend = helpers.backend.VSHARD, diff --git a/test/integration/double_buckets_test.lua b/test/integration/double_buckets_test.lua new file mode 100644 index 00000000..2c3922cf --- /dev/null +++ b/test/integration/double_buckets_test.lua @@ -0,0 +1,316 @@ +local t = require('luatest') +local json = require('json') +local fiber = require('fiber') + +local utils = require('crud.common.utils') + +local helpers = require('test.helper') + +local function wait_balance(g) + t.helpers.retrying({timeout=30}, function() + local buckets_count_s1 = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()") + local buckets_count_s2 = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()") + t.assert_equals(buckets_count_s1, 1500) + t.assert_equals(buckets_count_s2, 1500) + end) +end + +local function balance_cluster(g) + if g.params.backend == "config" then + local cfg = g.cluster:cfg() + 
cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 1, + } + cfg.groups.storages.replicasets["s-2"].sharding = { + weight = 1, + } + g.cluster:cfg(cfg) + wait_balance(g) + end +end + +local pgroup_duplicates = t.group('double_buckets_duplicates', helpers.backend_matrix({ + {engine = 'memtx', operation = 'replace'}, + {engine = 'memtx', operation = 'insert'}, + {engine = 'memtx', operation = 'upsert'}, + {engine = 'memtx', operation = 'insert_many'}, + {engine = 'memtx', operation = 'replace_many'}, + {engine = 'memtx', operation = 'upsert_many'}, +})) + +pgroup_duplicates.before_all(function(g) + helpers.start_default_cluster(g, 'srv_simple_operations') +end) + +pgroup_duplicates.after_all(function(g) + helpers.stop_cluster(g.cluster, g.params.backend) +end) + +pgroup_duplicates.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') +end) + +pgroup_duplicates.after_each(function(g) + balance_cluster(g) +end) + +--- Rebalance stalls if we move all buckets at once; use a small subset. 
+local test_tuples = { + {22, box.NULL, 'Alex', 34}, + -- {92, box.NULL, 'Artur', 29}, + -- {3, box.NULL, 'Anastasia', 22}, + -- {5, box.NULL, 'Sergey', 25}, + -- {9, box.NULL, 'Anna', 30}, + -- {71, box.NULL, 'Oksana', 29}, +} + +local last_call = fiber.time() +local duplicate_operations = { + insert = function(g) + return g.router:call('crud.insert', {'customers', {45, box.NULL, 'John Fedor', 42}}) + end, + replace = function(g) + return g.router:call('crud.replace', {'customers', {45, box.NULL, 'John Fedor', 42}}) + end, + upsert = function (g) + return g.router:call('crud.upsert', {'customers', {45, box.NULL, 'John Fedor', 42}, {{'+', 'age', 1}}}) + end, + insert_many = function(g) + if fiber.time() - last_call < 1 then + return + end + last_call = fiber.time() + return g.router:call('crud.insert_many', {'customers', test_tuples}) + end, + replace_many = function(g) + if fiber.time() - last_call < 1 then + return + end + last_call = fiber.time() + return g.router:call('crud.replace_many', {'customers', test_tuples}) + end, + upsert_many = function(g) + if fiber.time() - last_call < 1 then + return + end + last_call = fiber.time() + local tuples = {} + for i = 1, 2 do + tuples[i] = {{i, box.NULL, 'John Fedor', 42}, {{'+', 'age', 1}}} + end + return g.router:call('crud.upsert_many', {'customers', tuples}) + end +} + +local function check_duplicates(tuples) + local ids = {} + for _, tuple in pairs(tuples) do + t.assert_equals(ids[tuple[1]], nil, ('duplicate to tuple: %s'):format(json.encode(tuple))) + ids[tuple[1]] = true + end +end + + +--- write requests cause duplicates by primary key in cluster +pgroup_duplicates.test_duplicates = function(g) + t.skip_if( + not ( + utils.tarantool_version_at_least(3, 1) and (g.params.backend == "config") + ), + 'test implemented only for 3.1 and greater' + ) + if g.params.backend == "config" then + t.xfail('not implemented yet') + duplicate_operations[g.params.operation](g) + + local cfg = g.cluster:cfg() + 
cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 0, + } + g.cluster:cfg(cfg) + t.helpers.retrying({timeout=30}, function() + local buckets_count = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()") + duplicate_operations[g.params.operation](g) + t.assert_equals(buckets_count, 0) + end) + + cfg.groups.storages.replicasets["s-2"].sharding = { + weight = 0, + } + cfg.groups.storages.replicasets["s-1"].sharding = { + weight = 1, + } + g.cluster:cfg(cfg) + t.helpers.retrying({timeout=30}, function() + local buckets_count = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()") + duplicate_operations[g.params.operation](g) + t.assert_equals(buckets_count, 0) + end) + + local res = g.router:call('crud.select', {'customers'}) + check_duplicates(res.rows) + end +end + +local pgroup_not_applied = t.group('double_buckets_not_applied', helpers.backend_matrix({ + {engine = 'memtx', operation = 'delete'}, + {engine = 'memtx', operation = 'update'}, + {engine = 'memtx', operation = 'get'}, +})) + +pgroup_not_applied.before_all(function(g) + helpers.start_default_cluster(g, 'srv_simple_operations') +end) + +pgroup_not_applied.after_all(function(g) + helpers.stop_cluster(g.cluster, g.params.backend) +end) + +pgroup_not_applied.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers') +end) + +pgroup_not_applied.after_each(function(g) + balance_cluster(g) +end) + +local not_applied_operations = { + delete = { + call = function(g, key) + last_call = fiber.time() + return g.router:call('crud.delete', { 'customers', {key} }) + end, + check_applied = function(rows, applied_ids) + for _, tuple in pairs(rows) do + t.assert_equals( + applied_ids[tuple[1]], + nil, + ('tuples %s was marked as deleted, but exists'):format(json.encode(tuple)) + ) + end + end, + check_not_applied = function(not_applied_ids) + t.assert_equals( + next(not_applied_ids), + nil, + 'tuples were inserted, but crud.delete returned 0 
rows, as if there were no such tuples' + ) + end + }, + update = { + call = function(g, key) + return g.router:call('crud.update', { 'customers', key, {{'=', 'name', 'applied'}} }) + end, + check_applied = function(rows, applied_ids) + for _, tuple in pairs(rows) do + if applied_ids[tuple[1]] then + t.assert_equals( + tuple[3], + 'applied', + ('tuples %s was marked as updated, but was not updated'):format(json.encode(tuple)) + ) + end + end + end, + check_not_applied = function(not_applied_ids) + t.assert_equals( + next(not_applied_ids), + nil, + 'tuples were created, but crud.update returned 0 rows, as if there were no such tuples' + ) + end + }, + get = { + call = function (g, key) + return g.router:call('crud.get', { 'customers', key }) + end, + check_applied = function() end, + check_not_applied = function(not_applied_ids) + t.assert_equals( + next(not_applied_ids), + nil, + 'tuples were created, but crud.get returned 0 rows, as if there were no such tuples' + ) + end + } +} + +--- Some requests do not create duplicates but return 0 rows as if there is no tuple +--- with this key. The tuple can still exist in cluster but be unavailable during +--- rebalance. CRUD should return an error in this case, not 0 rows as if there were +--- no tuples. 
+pgroup_not_applied.test_not_applied = function(g)
+    t.skip_if(
+        not (
+            utils.tarantool_version_at_least(3, 1) and (g.params.backend == "config")
+        ),
+        'test implemented only for 3.1 and greater'
+    )
+    if g.params.backend == "config" then
+        t.xfail('not implemented yet')
+        local tuples, tuples_count = {}, 1000
+        for i = 1, tuples_count do
+            tuples[i] = {i, box.NULL, 'John Fedor', 42}
+        end
+
+        local _, err = g.router:call('crud.replace_many', {'customers', tuples})
+        t.assert_equals(err, nil)
+        local cfg = g.cluster:cfg()
+        cfg.groups.storages.replicasets["s-1"].sharding = {
+            weight = 0,
+        }
+        g.cluster:cfg(cfg)
+        local tuple_id = 1
+        local not_applied_ids = {}
+        local applied_ids = {}
+        t.helpers.retrying({timeout=30}, function()
+            if tuple_id > tuples_count then
+                return
+            end
+
+            local buckets_count = g.cluster:server('s1-master').net_box:eval("return box.space._bucket:len()")
+            local res, err = not_applied_operations[g.params.operation].call(g, tuple_id)
+            if err == nil then
+                if #res.rows == 0 then
+                    not_applied_ids[tuple_id] = true
+                else
+                    applied_ids[tuple_id] = true
+                end
+                tuple_id = tuple_id + 1
+            end
+
+            t.assert_equals(buckets_count, 0)
+        end)
+
+        cfg.groups.storages.replicasets["s-2"].sharding = {
+            weight = 0,
+        }
+        cfg.groups.storages.replicasets["s-1"].sharding = {
+            weight = 1,
+        }
+        g.cluster:cfg(cfg)
+        t.helpers.retrying({timeout=30}, function()
+            if tuple_id > tuples_count then
+                return
+            end
+
+            local buckets_count = g.cluster:server('s2-master').net_box:eval("return box.space._bucket:len()")
+            local res, err = not_applied_operations[g.params.operation].call(g, tuple_id)
+
+            if err == nil then
+                if #res.rows == 0 then
+                    not_applied_ids[tuple_id] = true
+                else
+                    applied_ids[tuple_id] = true
+                end
+                tuple_id = tuple_id + 1
+            end
+
+            t.assert_equals(buckets_count, 0)
+        end)
+
+        local res = g.router:call('crud.select', {'customers'})
+        not_applied_operations[g.params.operation].check_applied(res.rows, applied_ids)
+
+        not_applied_operations[g.params.operation].check_not_applied(not_applied_ids)
+    end
+end
diff --git a/test/integration/metrics_test.lua b/test/integration/metrics_test.lua
new file mode 100644
index 00000000..e1dd3778
--- /dev/null
+++ b/test/integration/metrics_test.lua
@@ -0,0 +1,94 @@
+local helpers = require('test.helper')
+local t = require('luatest')
+
+local pgroup = t.group('metrics_integration', helpers.backend_matrix({
+    {engine = 'memtx'},
+}))
+
+local function before_all(g)
+    helpers.start_default_cluster(g, 'srv_stats')
+end
+
+local function after_all(g)
+    helpers.stop_cluster(g.cluster, g.params.backend)
+end
+
+local function before_each(g)
+    g.router:eval("crud = require('crud')")
+    helpers.call_on_storages(g.cluster, function(server)
+        server:call('_crud.rebalance_safe_mode_disable')
+    end)
+end
+
+pgroup.before_all(before_all)
+
+pgroup.after_all(after_all)
+
+pgroup.before_each(before_each)
+
+pgroup.test_safe_mode_metrics = function(g)
+    local has_metrics_module = require('metrics')
+    t.skip_if(not has_metrics_module, 'No metrics module in current version')
+
+    -- Check safe mode metric on storage
+    helpers.call_on_storages(g.cluster, function(server)
+        local observed = server:eval("return require('metrics').collect({ invoke_callbacks = true })")
+        local has_metric = false
+        for _, m in pairs(observed) do
+            if m.metric_name == 'tnt_crud_storage_safe_mode_enabled' then
+                t.assert_equals(m.value, 0, 'Metric shows safe mode disabled')
+                has_metric = true
+                break
+            end
+        end
+        if not has_metric then
+            t.fail('No tnt_crud_storage_safe_mode_enabled metric found')
+        end
+    end)
+
+    -- Enable safe mode
+    helpers.call_on_storages(g.cluster, function(server)
+        server:call('_crud.rebalance_safe_mode_enable')
+    end)
+
+    -- Check that metric value has changed
+    helpers.call_on_storages(g.cluster, function(server)
+        local observed = server:eval("return require('metrics').collect({ invoke_callbacks = true })")
+        local has_metric = false
+        for _, m in pairs(observed) do
+            if m.metric_name == 'tnt_crud_storage_safe_mode_enabled' then
+                t.assert_equals(m.value, 1, 'Metric shows safe mode enabled')
+                has_metric = true
+                break
+            end
+        end
+        if not has_metric then
+            t.fail('No tnt_crud_storage_safe_mode_enabled metric found')
+        end
+    end)
+
+    -- Check router cache metric
+    local observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })")
+    local first_ts = 0
+    for _, m in pairs(observed) do
+        if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then
+            first_ts = m.value
+            break
+        end
+    end
+    t.assert_gt(first_ts, 0, 'Last cache clear TS is greater than zero')
+
+    -- Clear router cache
+    g.router:eval("crud.rebalance.router_cache_clear()")
+
+    -- Check that last_clear_ts has changed
+    observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })")
+    local new_ts = 0
+    for _, m in pairs(observed) do
+        if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then
+            new_ts = m.value
+            break
+        end
+    end
+    t.assert_gt(new_ts, first_ts, 'Last cache clear TS is greater than the first one')
+end
diff --git a/test/tarantool3_helpers/cluster.lua b/test/tarantool3_helpers/cluster.lua
index 21574d2b..df8caf81 100644
--- a/test/tarantool3_helpers/cluster.lua
+++ b/test/tarantool3_helpers/cluster.lua
@@ -297,34 +297,7 @@ function Cluster:cfg(new_config)
     return table.deepcopy(self.config)
 end
 
-local function strip_all_entries(t, name)
-    if type(t) ~= 'table' then
-        return t
-    end
-
-    t[name] = nil
-
-    for k, v in pairs(t) do
-        t[k] = strip_all_entries(v, name)
-    end
-
-    return t
-end
-
-local function check_only_roles_cfg_changed_in_groups(old_groups, new_groups)
-    old_groups = table.deepcopy(old_groups)
-    new_groups = table.deepcopy(new_groups)
-
-    local old_groups_no_roles_cfg = strip_all_entries(old_groups, 'roles_cfg')
-    local new_groups_no_roles_cfg = strip_all_entries(new_groups, 'roles_cfg')
-
-    t.assert_equals(new_groups_no_roles_cfg, old_groups_no_roles_cfg,
-        'groups reload supports only roles_cfg reload')
-end
-
 function Cluster:reload_config(new_config)
-    check_only_roles_cfg_changed_in_groups(self.config.groups, new_config.groups)
-
     for _, server in ipairs(self.servers) do
         write_config(self.dirs[server], new_config)
     end
diff --git a/test/unit/not_initialized_test.lua b/test/unit/not_initialized_test.lua
index 82f9b4ee..e2f6793a 100644
--- a/test/unit/not_initialized_test.lua
+++ b/test/unit/not_initialized_test.lua
@@ -5,7 +5,7 @@ local server = require('luatest.server')
 
 local pgroup = t.group('not-initialized', helpers.backend_matrix({
     {},
-}))
+}, { skip_safe_mode = true }))
 
 local vshard_cfg_template = {
     sharding = {
diff --git a/test/unit/privileges_test.lua b/test/unit/privileges_test.lua
index 51cec2c5..7a3b6036 100644
--- a/test/unit/privileges_test.lua
+++ b/test/unit/privileges_test.lua
@@ -17,13 +17,14 @@ g.before_all(function()
 end)
 
 g.test_prepend_current_user_smoke = function()
-    local res = call.storage_api.call_on_storage(box.session.effective_user(), "unittestfunc", {"too", "foo"})
+    local res = call.storage_api.call_on_storage(
+        box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"})
     t.assert_equals(res, {"too", "foo"})
 end
 
 g.test_non_existent_user = function()
     t.assert_error_msg_contains("User 'non_existent_user' is not found",
-        call.storage_api.call_on_storage, "non_existent_user", "unittestfunc")
+        call.storage_api.call_on_storage, "non_existent_user", {}, "read", "unittestfunc")
 end
 
 g.test_that_the_session_switches_back = function()
@@ -34,7 +35,7 @@ g.test_that_the_session_switches_back = function()
     local reference_user = box.session.effective_user()
     t.assert_not_equals(reference_user, "unittestuser")
 
-    local res = call.storage_api.call_on_storage("unittestuser", "unittestfunc2")
+    local res = call.storage_api.call_on_storage("unittestuser", {}, "read", "unittestfunc2")
     t.assert_equals(res, "unittestuser")
     t.assert_equals(box.session.effective_user(), reference_user)
 end
diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua
index fdaf8c01..9ae6847d 100644
--- a/test/unit/stats_test.lua
+++ b/test/unit/stats_test.lua
@@ -45,6 +45,7 @@ local function enable_stats(g, params)
         params = table.deepcopy(params)
         params.backend = nil
         params.backend_cfg = nil
+        params.safe_mode = nil
     end
     g.router:eval("stats_module.enable(...)", { params })
 end