Skip to content

Commit a6423d9

Browse files
committed
Add ref/unref for inser/update/upsert
1 parent d82469c commit a6423d9

File tree

3 files changed

+86
-41
lines changed

3 files changed

+86
-41
lines changed

crud/insert.lua

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
88
local dev_checks = require('crud.common.dev_checks')
99
local schema = require('crud.common.schema')
10+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1011

1112
local InsertError = errors.new_class('InsertError', {capture_stack = false})
1213

@@ -42,15 +43,32 @@ local function insert_on_storage(space_name, tuple, opts)
4243
return nil, err
4344
end
4445

45-
-- add_space_schema_hash is true only in case of insert_object
46-
-- the only one case when reloading schema can avoid insert error
47-
-- is flattening object on router
48-
return schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
49-
add_space_schema_hash = opts.add_space_schema_hash,
50-
field_names = opts.fields,
51-
noreturn = opts.noreturn,
52-
fetch_latest_metadata = opts.fetch_latest_metadata,
53-
})
46+
local function make_insert()
47+
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
48+
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(bucket_id)
49+
if not ref_ok then
50+
return nil, err_ref
51+
end
52+
53+
-- add_space_schema_hash is true only in case of insert_object
54+
-- the only one case when reloading schema can avoid insert error
55+
-- is flattening object on router
56+
local result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
57+
add_space_schema_hash = opts.add_space_schema_hash,
58+
field_names = opts.fields,
59+
noreturn = opts.noreturn,
60+
fetch_latest_metadata = opts.fetch_latest_metadata,
61+
})
62+
63+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
64+
if not unref_ok then
65+
return nil, err_unref
66+
end
67+
68+
return result
69+
end
70+
71+
return box.atomic(make_insert)
5472
end
5573

5674
insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage}

crud/update.lua

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ local sharding_key_module = require('crud.common.sharding.sharding_key')
99
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1010
local dev_checks = require('crud.common.dev_checks')
1111
local schema = require('crud.common.schema')
12+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1213

1314
local UpdateError = errors.new_class('UpdateError', {capture_stack = false})
1415

@@ -19,6 +20,7 @@ local CRUD_UPDATE_FUNC_NAME = utils.get_storage_call(UPDATE_FUNC_NAME)
1920

2021
local function update_on_storage(space_name, key, operations, field_names, opts)
2122
dev_checks('string', '?', 'table', '?table', {
23+
bucket_id = 'number|cdata',
2224
sharding_key_hash = '?number',
2325
sharding_func_hash = '?number',
2426
skip_sharding_hash_check = '?boolean',
@@ -42,39 +44,45 @@ local function update_on_storage(space_name, key, operations, field_names, opts)
4244
return nil, err
4345
end
4446

45-
-- add_space_schema_hash is false because
46-
-- reloading space format on router can't avoid update error on storage
47-
local res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, {
48-
add_space_schema_hash = false,
49-
field_names = field_names,
50-
noreturn = opts.noreturn,
51-
fetch_latest_metadata = opts.fetch_latest_metadata,
52-
})
53-
54-
if err ~= nil then
55-
return nil, err
56-
end
57-
58-
if res.err == nil then
59-
return res, nil
60-
end
47+
local function make_update()
48+
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
49+
if not ref_ok then
50+
return nil, err_ref
51+
end
6152

62-
-- Relevant for Tarantool older than 2.8.1.
63-
-- We can only add fields to end of the tuple.
64-
-- If schema is updated and nullable fields are added, then we will get error.
65-
-- Therefore, we need to add filling of intermediate nullable fields.
66-
-- More details: https://github.com/tarantool/tarantool/issues/3378
67-
if utils.is_field_not_found(res.err.code) then
68-
operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key))
69-
res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, {
53+
-- add_space_schema_hash is false because
54+
-- reloading space format on router can't avoid update error on storage
55+
local res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, {
7056
add_space_schema_hash = false,
7157
field_names = field_names,
7258
noreturn = opts.noreturn,
7359
fetch_latest_metadata = opts.fetch_latest_metadata,
7460
})
61+
62+
if err == nil and res.err ~= nil and utils.is_field_not_found(res.err.code) then
63+
-- Relevant for Tarantool older than 2.8.1.
64+
-- We can only add fields to end of the tuple.
65+
-- If schema is updated and nullable fields are added, then we will get error.
66+
-- Therefore, we need to add filling of intermediate nullable fields.
67+
-- More details: https://github.com/tarantool/tarantool/issues/3378
68+
operations = utils.add_intermediate_nullable_fields(operations, space:format(), space:get(key))
69+
res, err = schema.wrap_box_space_func_result(space, 'update', {key, operations}, {
70+
add_space_schema_hash = false,
71+
field_names = field_names,
72+
noreturn = opts.noreturn,
73+
fetch_latest_metadata = opts.fetch_latest_metadata,
74+
})
75+
end
76+
77+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(opts.bucket_id)
78+
if not unref_ok then
79+
return nil, err_unref
80+
end
81+
82+
return res, err
7583
end
7684

77-
return res, err
85+
return box.atomic(make_update)
7886
end
7987

8088
update.storage_api = {[UPDATE_FUNC_NAME] = update_on_storage}
@@ -148,6 +156,7 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat
148156
sharding.fill_bucket_id_pk(space, key, bucket_id_data.bucket_id)
149157

150158
local update_on_storage_opts = {
159+
bucket_id = bucket_id_data.bucket_id,
151160
sharding_func_hash = bucket_id_data.sharding_func_hash,
152161
sharding_key_hash = sharding_key_hash,
153162
skip_sharding_hash_check = skip_sharding_hash_check,

crud/upsert.lua

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding')
88
local dev_checks = require('crud.common.dev_checks')
99
local schema = require('crud.common.schema')
10+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1011

1112
local UpsertError = errors.new_class('UpsertError', { capture_stack = false})
1213

@@ -41,13 +42,30 @@ local function upsert_on_storage(space_name, tuple, operations, opts)
4142
return nil, err
4243
end
4344

44-
-- add_space_schema_hash is true only in case of upsert_object
45-
-- the only one case when reloading schema can avoid insert error
46-
-- is flattening object on router
47-
return schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, {
48-
add_space_schema_hash = opts.add_space_schema_hash,
49-
fetch_latest_metadata = opts.fetch_latest_metadata,
50-
})
45+
local function make_upsert()
46+
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
47+
local ref_ok, err_ref = bucket_ref_unref.bucket_refrw(bucket_id)
48+
if not ref_ok then
49+
return nil, err_ref
50+
end
51+
52+
-- add_space_schema_hash is true only in case of upsert_object
53+
-- the only one case when reloading schema can avoid insert error
54+
-- is flattening object on router
55+
local result = schema.wrap_box_space_func_result(space, 'upsert', {tuple, operations}, {
56+
add_space_schema_hash = opts.add_space_schema_hash,
57+
fetch_latest_metadata = opts.fetch_latest_metadata,
58+
})
59+
60+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
61+
if not unref_ok then
62+
return nil, err_unref
63+
end
64+
65+
return result
66+
end
67+
68+
return box.atomic(make_upsert)
5169
end
5270

5371
upsert.storage_api = {[UPSERT_FUNC_NAME] = upsert_on_storage}

0 commit comments

Comments
 (0)