@@ -4202,6 +4202,165 @@ TEST_CASE("flx: compensating write errors get re-sent across sessions", "[sync][
42024202 REQUIRE (top_level_table->is_empty ());
42034203}
42044204
4205+ TEST_CASE (" flx: compensating write errors are not duplicated" , " [sync][flx][compensating write][baas]" ) {
4206+ FLXSyncTestHarness harness (" flx_compensating_writes" );
4207+ auto config = harness.make_test_file ();
4208+
4209+ auto test_obj_id_1 = ObjectId::gen ();
4210+ auto test_obj_id_2 = ObjectId::gen ();
4211+
4212+ enum class TestState { Start, FirstError, SecondError, Resume, ThirdError, FourthError };
4213+ TestingStateMachine<TestState> state (TestState::Start);
4214+
4215+ std::mutex errors_mutex;
4216+ std::vector<std::pair<ObjectId, sync::version_type>> error_to_download_version;
4217+ std::vector<sync::CompensatingWriteErrorInfo> compensating_writes;
4218+ sync::version_type download_version;
4219+
4220+ config.sync_config ->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
4221+ const SyncClientHookData& data) {
4222+ if (auto session = weak_session.lock (); !session) {
4223+ return SyncClientHookAction::NoAction;
4224+ }
4225+ SyncClientHookAction action = SyncClientHookAction::NoAction;
4226+ state.transition_with ([&](TestState cur_state) -> std::optional<TestState> {
4227+ if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
4228+ // Before the session is resumed, ignore the download messages received
4229+ // to undo the out-of-view writes.
4230+ if (data.event == SyncClientHookEvent::DownloadMessageReceived &&
4231+ (cur_state == TestState::FirstError || cur_state == TestState::SecondError)) {
4232+ action = SyncClientHookAction::EarlyReturn;
4233+ }
4234+ else if (data.event == SyncClientHookEvent::DownloadMessageReceived &&
4235+ (cur_state == TestState::Resume || cur_state == TestState::ThirdError)) {
4236+ download_version = data.progress .download .server_version ;
4237+ }
4238+ else if (data.event == SyncClientHookEvent::BindMessageSent && cur_state == TestState::SecondError) {
4239+ return TestState::Resume;
4240+ }
4241+ return std::nullopt ;
4242+ }
4243+
4244+ auto error_code = sync::ProtocolError (data.error_info ->raw_error_code );
4245+ if (error_code == sync::ProtocolError::initial_sync_not_completed) {
4246+ return std::nullopt ;
4247+ }
4248+
4249+ REQUIRE (error_code == sync::ProtocolError::compensating_write);
4250+ REQUIRE_FALSE (data.error_info ->compensating_writes .empty ());
4251+
4252+ if (cur_state == TestState::Start) {
4253+ return TestState::FirstError;
4254+ }
4255+ else if (cur_state == TestState::FirstError) {
4256+ // Return early so the second compensating write error is not saved
4257+ // by the sync client.
4258+ // This is so server versions received to undo the out-of-view writes are
4259+ // [x, x, y] instead of [x, y, x, y] (server versions don't increase
4260+ // monotonically as the client expects).
4261+ action = SyncClientHookAction::EarlyReturn;
4262+ return TestState::SecondError;
4263+ }
4264+ // Save third and fourth compensating write errors after resume.
4265+ std::lock_guard<std::mutex> lk (errors_mutex);
4266+ for (const auto & compensating_write : data.error_info ->compensating_writes ) {
4267+ error_to_download_version.emplace_back (compensating_write.primary_key .get_object_id (),
4268+ *data.error_info ->compensating_write_server_version );
4269+ }
4270+ return std::nullopt ;
4271+ });
4272+ return action;
4273+ };
4274+
4275+ config.sync_config ->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) {
4276+ std::unique_lock<std::mutex> lk (errors_mutex);
4277+ REQUIRE (error.status == ErrorCodes::SyncCompensatingWrite);
4278+ for (const auto & compensating_write : error.compensating_writes_info ) {
4279+ auto tracked_error = std::find_if (error_to_download_version.begin (), error_to_download_version.end (),
4280+ [&](const auto & pair) {
4281+ return pair.first == compensating_write.primary_key .get_object_id ();
4282+ });
4283+ REQUIRE (tracked_error != error_to_download_version.end ());
4284+ CHECK (tracked_error->second <= download_version);
4285+ compensating_writes.push_back (compensating_write);
4286+ }
4287+ lk.unlock ();
4288+
4289+ state.transition_with ([&](TestState cur_state) -> std::optional<TestState> {
4290+ if (cur_state == TestState::Resume) {
4291+ return TestState::ThirdError;
4292+ }
4293+ else if (cur_state == TestState::ThirdError) {
4294+ return TestState::FourthError;
4295+ }
4296+ return std::nullopt ;
4297+ });
4298+ };
4299+
4300+ auto realm = Realm::get_shared_realm (config);
4301+ auto table = realm->read_group ().get_table (" class_TopLevel" );
4302+ auto queryable_str_field = table->get_column_key (" queryable_str_field" );
4303+ auto new_query = realm->get_latest_subscription_set ().make_mutable_copy ();
4304+ new_query.insert_or_assign (Query (table).equal (queryable_str_field, " bizz" ));
4305+ std::move (new_query).commit ();
4306+
4307+ wait_for_upload (*realm);
4308+ wait_for_download (*realm);
4309+
4310+ CppContext c (realm);
4311+ realm->begin_transaction ();
4312+ Object::create (c, realm, " TopLevel" ,
4313+ util::Any (AnyDict{
4314+ {" _id" , test_obj_id_1},
4315+ {" queryable_str_field" , std::string{" foo" }},
4316+ }));
4317+ realm->commit_transaction ();
4318+
4319+ realm->begin_transaction ();
4320+ Object::create (c, realm, " TopLevel" ,
4321+ util::Any (AnyDict{
4322+ {" _id" , test_obj_id_2},
4323+ {" queryable_str_field" , std::string{" baz" }},
4324+ }));
4325+ realm->commit_transaction ();
4326+ state.wait_for (TestState::SecondError);
4327+
4328+ nlohmann::json error_body = {
4329+ {" tryAgain" , true }, {" message" , " fake error" },
4330+ {" shouldClientReset" , false }, {" isRecoveryModeDisabled" , false },
4331+ {" action" , " Transient" },
4332+ };
4333+ nlohmann::json test_command = {{" command" , " ECHO_ERROR" },
4334+ {" args" , nlohmann::json{{" errorCode" , 229 }, {" errorBody" , error_body}}}};
4335+
4336+ // Trigger a retryable transient error to resume the session.
4337+ auto test_cmd_res =
4338+ wait_for_future (SyncSession::OnlyForTesting::send_test_command (*realm->sync_session (), test_command.dump ()))
4339+ .get ();
4340+ CHECK (test_cmd_res == " {}" );
4341+ state.wait_for (TestState::FourthError);
4342+
4343+ REQUIRE (compensating_writes.size () == 2 );
4344+ auto & write_info = compensating_writes[0 ];
4345+ CHECK (write_info.primary_key .is_type (type_ObjectId));
4346+ CHECK (write_info.primary_key .get_object_id () == test_obj_id_1);
4347+ CHECK (write_info.object_name == " TopLevel" );
4348+ CHECK (write_info.reason == util::format (" write to ObjectID(\" %1\" ) in table \" TopLevel\" not allowed; object is "
4349+ " outside of the current query view" ,
4350+ test_obj_id_1));
4351+
4352+ write_info = compensating_writes[1 ];
4353+ CHECK (write_info.primary_key .is_type (type_ObjectId));
4354+ CHECK (write_info.primary_key .get_object_id () == test_obj_id_2);
4355+ CHECK (write_info.object_name == " TopLevel" );
4356+ CHECK (write_info.reason == util::format (" write to ObjectID(\" %1\" ) in table \" TopLevel\" not allowed; object is "
4357+ " outside of the current query view" ,
4358+ test_obj_id_2));
4359+ realm->refresh ();
4360+ auto top_level_table = realm->read_group ().get_table (" class_TopLevel" );
4361+ CHECK (top_level_table->is_empty ());
4362+ }
4363+
42054364TEST_CASE (" flx: bootstrap changesets are applied continuously" , " [sync][flx][bootstrap][baas]" ) {
42064365 FLXSyncTestHarness harness (" flx_bootstrap_ordering" , {g_large_array_schema, {" queryable_int_field" }});
42074366 fill_large_array_schema (harness);
0 commit comments