Skip to content

Commit e19fc3e

Browse files
committed
refrw buckets in batch operations
1 parent 59c8964 commit e19fc3e

File tree

4 files changed

+151
-3
lines changed

4 files changed

+151
-3
lines changed

crud/common/sharding/bucket_ref_unref.lua

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ function M.bucket_refro(bucket_id)
8181
end
8282

8383
--- bucket_unrefro calls vshard.storage.bucket_unrefro if safe mode enabled
84+
--- must be called in one transaction with bucket_refrw_many
8485
--- otherwise returns true
8586
function M.bucket_unrefro(bucket_id)
8687
if not M.safe_mode_status() then
@@ -89,4 +90,61 @@ function M.bucket_unrefro(bucket_id)
8990
return vshard.storage.bucket_unrefro(bucket_id)
9091
end
9192

93+
--- bucket_refrw_many calls bucket_refrw for every bucket and aggregates errors
94+
--- @param bucket_ids table<number, boolean>
95+
function M.bucket_refrw_many(bucket_ids)
96+
if not M.safe_mode_status() then
97+
return true
98+
end
99+
100+
local bucket_ref_errs = {}
101+
local reffed_bucket_ids = {}
102+
for bucket_id in pairs(bucket_ids) do
103+
local ref_ok, bucket_refrw_err = M.bucket_refrw(bucket_id)
104+
if not ref_ok then
105+
106+
table.insert(bucket_ref_errs, bucket_refrw_err.bucket_ref_errs[1])
107+
goto continue
108+
end
109+
110+
reffed_bucket_ids[bucket_id] = true
111+
::continue::
112+
end
113+
114+
if next(bucket_ref_errs) ~= nil then
115+
local err = M.BucketRefError:new(M.BucketRefError:new("failed bucket_ref"))
116+
err.bucket_ref_errs = bucket_ref_errs
117+
M.bucket_unrefrw_many(reffed_bucket_ids)
118+
return nil, err
119+
end
120+
121+
return true
122+
end
123+
124+
--- bucket_unrefrw_many calls vshard.storage.bucket_unrefrw for every bucket if safe mode enabled
125+
--- otherwise returns true
126+
--- must be called in one transaction with bucket_refrw_many
127+
--- will never happen in called in one transaction with bucket_refrw_many
128+
--- @param bucket_ids table<number, true>
129+
function M.bucket_unrefrw_many(bucket_ids)
130+
if not M.safe_mode_status() then
131+
return true
132+
end
133+
134+
local unref_all_ok = true
135+
local unref_last_err
136+
for reffed_bucket_id in pairs(bucket_ids) do
137+
local unref_ok, unref_err = M.bucket_unrefrw(reffed_bucket_id)
138+
if not unref_ok then
139+
unref_all_ok = nil
140+
unref_last_err = unref_err
141+
end
142+
end
143+
144+
if not unref_all_ok then
145+
return nil, unref_last_err
146+
end
147+
return true
148+
end
149+
92150
return M

crud/insert_many.lua

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils')
88
local sharding = require('crud.common.sharding')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
11+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1112

1213
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
1314
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
@@ -48,6 +49,16 @@ local function insert_many_on_storage(space_name, tuples, opts)
4849
return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
4950
end
5051

52+
local bucket_ids = {}
53+
for _, tuple in ipairs(tuples) do
54+
bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
55+
end
56+
57+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids)
58+
if not ref_ok then
59+
return nil, bucket_ref_err
60+
end
61+
5162
local inserted_tuples = {}
5263
local errs = {}
5364
local replica_schema_version = nil
@@ -84,7 +95,11 @@ local function insert_many_on_storage(space_name, tuples, opts)
8495
end
8596

8697
if opts.rollback_on_error == true then
98+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
8799
box.rollback()
100+
if not unref_ok then
101+
return nil, bucket_unref_err
102+
end
88103
if next(inserted_tuples) then
89104
errs = batching_utils.complement_batching_errors(errs,
90105
batching_utils.rollback_on_error_msg, inserted_tuples)
@@ -93,7 +108,11 @@ local function insert_many_on_storage(space_name, tuples, opts)
93108
return nil, errs, replica_schema_version
94109
end
95110

111+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
96112
box.commit()
113+
if not unref_ok then
114+
return nil, bucket_unref_err
115+
end
97116

98117
return inserted_tuples, errs, replica_schema_version
99118
end
@@ -104,7 +123,11 @@ local function insert_many_on_storage(space_name, tuples, opts)
104123

105124
if next(errs) ~= nil then
106125
if opts.rollback_on_error == true then
126+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
107127
box.rollback()
128+
if not unref_ok then
129+
return nil, bucket_unref_err
130+
end
108131
if next(inserted_tuples) then
109132
errs = batching_utils.complement_batching_errors(errs,
110133
batching_utils.rollback_on_error_msg, inserted_tuples)
@@ -113,12 +136,20 @@ local function insert_many_on_storage(space_name, tuples, opts)
113136
return nil, errs, replica_schema_version
114137
end
115138

139+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
116140
box.commit()
141+
if not unref_ok then
142+
return nil, bucket_unref_err
143+
end
117144

118145
return inserted_tuples, errs, replica_schema_version
119146
end
120147

148+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
121149
box.commit()
150+
if not unref_ok then
151+
return nil, bucket_unref_err
152+
end
122153

123154
return inserted_tuples, nil, replica_schema_version
124155
end

crud/replace_many.lua

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils')
88
local sharding = require('crud.common.sharding')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
11+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1112

1213
local BatchInsertIterator = require('crud.common.map_call_cases.batch_insert_iter')
1314
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
@@ -52,6 +53,16 @@ local function replace_many_on_storage(space_name, tuples, opts)
5253
local errs = {}
5354
local replica_schema_version = nil
5455

56+
local bucket_ids = {}
57+
for _, tuple in ipairs(tuples) do
58+
bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
59+
end
60+
61+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids)
62+
if not ref_ok then
63+
return nil, bucket_ref_err
64+
end
65+
5566
box.begin()
5667
for i, tuple in ipairs(tuples) do
5768
-- add_space_schema_hash is true only in case of replace_object_many
@@ -87,17 +98,23 @@ local function replace_many_on_storage(space_name, tuples, opts)
8798
end
8899

89100
if opts.rollback_on_error == true then
101+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
90102
box.rollback()
103+
if not unref_ok then
104+
return nil, bucket_unref_err
105+
end
91106
if next(inserted_tuples) then
92107
errs = batching_utils.complement_batching_errors(errs,
93108
batching_utils.rollback_on_error_msg, inserted_tuples)
94109
end
95-
96110
return nil, errs, replica_schema_version
97111
end
98112

113+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
99114
box.commit()
100-
115+
if not unref_ok then
116+
return nil, bucket_unref_err
117+
end
101118
return inserted_tuples, errs, replica_schema_version
102119
end
103120
end
@@ -107,7 +124,11 @@ local function replace_many_on_storage(space_name, tuples, opts)
107124

108125
if next(errs) ~= nil then
109126
if opts.rollback_on_error == true then
127+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
110128
box.rollback()
129+
if not unref_ok then
130+
return nil, bucket_unref_err
131+
end
111132
if next(inserted_tuples) then
112133
errs = batching_utils.complement_batching_errors(errs,
113134
batching_utils.rollback_on_error_msg, inserted_tuples)
@@ -116,13 +137,20 @@ local function replace_many_on_storage(space_name, tuples, opts)
116137
return nil, errs, replica_schema_version
117138
end
118139

140+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
119141
box.commit()
142+
if not unref_ok then
143+
return nil, bucket_unref_err
144+
end
120145

121146
return inserted_tuples, errs, replica_schema_version
122147
end
123148

149+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
124150
box.commit()
125-
151+
if not unref_ok then
152+
return nil, bucket_unref_err
153+
end
126154
return inserted_tuples, nil, replica_schema_version
127155
end
128156

crud/upsert_many.lua

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ local batching_utils = require('crud.common.batching_utils')
88
local sharding = require('crud.common.sharding')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
11+
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')
1112

1213
local BatchUpsertIterator = require('crud.common.map_call_cases.batch_upsert_iter')
1314
local BatchPostprocessor = require('crud.common.map_call_cases.batch_postprocessor')
@@ -47,6 +48,16 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts)
4748
return nil, batching_utils.construct_sharding_hash_mismatch_errors(err.err, tuples)
4849
end
4950

51+
local bucket_ids = {}
52+
for _, tuple in ipairs(tuples) do
53+
bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
54+
end
55+
56+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw_many(bucket_ids)
57+
if not ref_ok then
58+
return nil, bucket_ref_err
59+
end
60+
5061
local processed_tuples = {}
5162
local errs = {}
5263
local replica_schema_version = nil
@@ -81,7 +92,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts)
8192
end
8293

8394
if opts.rollback_on_error == true then
95+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
8496
box.rollback()
97+
if not unref_ok then
98+
return nil, bucket_unref_err
99+
end
85100
if next(processed_tuples) then
86101
errs = batching_utils.complement_batching_errors(errs,
87102
batching_utils.rollback_on_error_msg, processed_tuples)
@@ -90,7 +105,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts)
90105
return nil, errs, replica_schema_version
91106
end
92107

108+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
93109
box.commit()
110+
if not unref_ok then
111+
return nil, bucket_unref_err
112+
end
94113

95114
return nil, errs, replica_schema_version
96115
end
@@ -101,7 +120,11 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts)
101120

102121
if next(errs) ~= nil then
103122
if opts.rollback_on_error == true then
123+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
104124
box.rollback()
125+
if not unref_ok then
126+
return nil, bucket_unref_err
127+
end
105128
if next(processed_tuples) then
106129
errs = batching_utils.complement_batching_errors(errs,
107130
batching_utils.rollback_on_error_msg, processed_tuples)
@@ -110,12 +133,20 @@ local function upsert_many_on_storage(space_name, tuples, operations, opts)
110133
return nil, errs, replica_schema_version
111134
end
112135

136+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
113137
box.commit()
138+
if not unref_ok then
139+
return nil, bucket_unref_err
140+
end
114141

115142
return nil, errs, replica_schema_version
116143
end
117144

145+
local unref_ok, bucket_unref_err = bucket_ref_unref.bucket_unrefrw_many(bucket_ids)
118146
box.commit()
147+
if not unref_ok then
148+
return nil, bucket_unref_err
149+
end
119150

120151
return nil, nil, replica_schema_version
121152
end

0 commit comments

Comments
 (0)