Skip to content

Commit 819bb98

Browse files
authored
Add support for multi-process subscription state change notifications (#7862)
As with the other multi-process notifications, the core idea here is to eliminate the in-memory state and produce notifications based entirely on the current state of the Realm file. SubscriptionStore::update_state() has been replaced with separate functions for the specific legal state transitions, which also take a write transaction as a parameter. These functions are called by PendingBootstrapStore inside the same write transaction as the bootstrap updates which changed the subscription state. This is both a minor performance optimization (due to fewer writes) and eliminates a brief window between the two writes where the Realm file was in an inconsistent state. There's a minor functional change here: previously old subscription sets were superseded when the new one reached the Completed state, and now they are superseded on AwaitingMark. This aligns it with when the new subscription set becomes the one which is returned by get_active().
1 parent f93e758 commit 819bb98

20 files changed

+630
-505
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
### Internals
1919
* Fix emscripten build and add emscripten debug/release compile tasks to evergreen. ([PR #7916](https://github.com/realm/realm-core/pull/7916))
20+
* Subscription set state change notifications now work in a multiprocess-compatible manner ([PR #7862](https://github.com/realm/realm-core/pull/7862)).
2021

2122
----------------------------------------------
2223

src/realm/sync/client.cpp

Lines changed: 37 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
104104

105105
void handle_pending_client_reset_acknowledgement();
106106

107-
void update_subscription_version_info();
108-
109107
// Can be called from any thread.
110108
std::string get_appservices_connection_id();
111109

@@ -149,7 +147,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
149147
uint64_t uploadable;
150148
uint64_t downloaded;
151149
uint64_t downloadable;
152-
int64_t query_version;
150+
int64_t query_version = 0;
153151
double download_estimate;
154152

155153
// Does not check snapshot
@@ -179,11 +177,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
179177
const uint64_t m_schema_version;
180178

181179
std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
182-
int64_t m_flx_active_version = 0;
183-
int64_t m_flx_last_seen_version = 0;
184-
int64_t m_flx_pending_mark_version = 0;
185180
std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
186-
187181
std::shared_ptr<MigrationStore> m_migration_store;
188182

189183
// Set to true when this session wrapper is actualized (i.e. the wrapped
@@ -242,8 +236,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
242236
void on_resumed();
243237
void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
244238
void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
245-
void on_flx_sync_error(int64_t version, std::string_view err_msg);
246-
void on_flx_sync_version_complete(int64_t version);
247239

248240
void init_progress_handler();
249241
void check_progress();
@@ -795,14 +787,6 @@ void SessionImpl::handle_pending_client_reset_acknowledgement()
795787
}
796788
}
797789

798-
void SessionImpl::update_subscription_version_info()
799-
{
800-
// Ignore the call if the session is not active
801-
if (m_state == State::Active) {
802-
m_wrapper.update_subscription_version_info();
803-
}
804-
}
805-
806790
bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
807791
{
808792
// Ignore the message if the session is not active or a steady state message
@@ -818,10 +802,8 @@ bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
818802
maybe_progress = message.progress;
819803
}
820804

821-
bool new_batch = false;
822805
try {
823-
bootstrap_store->add_batch(*message.query_version, std::move(maybe_progress), message.downloadable,
824-
message.changesets, &new_batch);
806+
bootstrap_store->add_batch(*message.query_version, maybe_progress, message.downloadable, message.changesets);
825807
}
826808
catch (const LogicError& ex) {
827809
if (ex.code() == ErrorCodes::LimitExceeded) {
@@ -834,12 +816,6 @@ bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
834816
throw;
835817
}
836818

837-
// If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
838-
// bootstrapping.
839-
if (new_batch && message.batch_state == DownloadBatchState::MoreToCome) {
840-
on_flx_sync_progress(*message.query_version, DownloadBatchState::MoreToCome);
841-
}
842-
843819
auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
844820
*message.query_version, message.batch_state, message.changesets.size());
845821
if (hook_action == SyncClientHookAction::EarlyReturn) {
@@ -891,13 +867,11 @@ void SessionImpl::process_pending_flx_bootstrap()
891867
TransactionRef transact = get_db()->start_write();
892868
while (bootstrap_store->has_pending()) {
893869
auto start_time = std::chrono::steady_clock::now();
894-
auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
870+
auto pending_batch = bootstrap_store->peek_pending(*transact, m_wrapper.m_flx_bootstrap_batch_size_bytes);
895871
if (!pending_batch.progress) {
896872
logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
897-
// Close the write transaction before clearing the bootstrap store to avoid a deadlock because the
898-
// bootstrap store requires a write transaction itself.
899-
transact->close();
900-
bootstrap_store->clear();
873+
bootstrap_store->clear(*transact, pending_batch.query_version);
874+
transact->commit();
901875
return;
902876
}
903877

@@ -915,7 +889,7 @@ void SessionImpl::process_pending_flx_bootstrap()
915889

916890
history.integrate_server_changesets(
917891
*pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
918-
[&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
892+
[&](const Transaction& tr, util::Span<Changeset> changesets_applied) {
919893
REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
920894
bootstrap_store->pop_front_pending(tr, changesets_applied.size());
921895
});
@@ -935,7 +909,6 @@ void SessionImpl::process_pending_flx_bootstrap()
935909
}
936910

937911
REALM_ASSERT_3(query_version, !=, -1);
938-
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
939912

940913
on_changesets_integrated(new_version.realm_version, progress);
941914
auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
@@ -948,15 +921,7 @@ void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
948921
{
949922
// Ignore the call if the session is not active
950923
if (m_state == State::Active) {
951-
m_wrapper.on_flx_sync_error(version, err_msg);
952-
}
953-
}
954-
955-
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
956-
{
957-
// Ignore the call if the session is not active
958-
if (m_state == State::Active) {
959-
m_wrapper.on_flx_sync_progress(version, batch_state);
924+
get_flx_subscription_store()->set_error(version, err_msg);
960925
}
961926
}
962927

@@ -974,14 +939,6 @@ MigrationStore* SessionImpl::get_migration_store()
974939
return m_wrapper.get_migration_store();
975940
}
976941

977-
void SessionImpl::on_flx_sync_version_complete(int64_t version)
978-
{
979-
// Ignore the call if the session is not active
980-
if (m_state == State::Active) {
981-
m_wrapper.on_flx_sync_version_complete(version);
982-
}
983-
}
984-
985942
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
986943
{
987944
// Should never be called if session is not active
@@ -1182,75 +1139,6 @@ bool SessionWrapper::has_flx_subscription_store() const
11821139
return static_cast<bool>(m_flx_subscription_store);
11831140
}
11841141

1185-
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1186-
{
1187-
REALM_ASSERT(!m_finalized);
1188-
get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
1189-
}
1190-
1191-
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1192-
{
1193-
REALM_ASSERT(!m_finalized);
1194-
m_flx_last_seen_version = version;
1195-
m_flx_active_version = version;
1196-
}
1197-
1198-
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1199-
{
1200-
if (!has_flx_subscription_store()) {
1201-
return;
1202-
}
1203-
1204-
REALM_ASSERT(!m_finalized);
1205-
if (batch_state == DownloadBatchState::SteadyState) {
1206-
throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
1207-
"Unexpected batch state of SteadyState while downloading bootstrap");
1208-
}
1209-
// Is this a server-initiated bootstrap? Skip notifying the subscription store
1210-
if (new_version == m_flx_active_version) {
1211-
return;
1212-
}
1213-
if (new_version < m_flx_active_version) {
1214-
throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
1215-
util::format("Bootstrap query version %1 is less than active version %2",
1216-
new_version, m_flx_active_version));
1217-
}
1218-
if (new_version < m_flx_last_seen_version) {
1219-
throw IntegrationException(
1220-
ErrorCodes::SyncProtocolInvariantFailed,
1221-
util::format("Download message query version %1 is less than current bootstrap version %2", new_version,
1222-
m_flx_last_seen_version));
1223-
}
1224-
1225-
SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1226-
1227-
switch (batch_state) {
1228-
case DownloadBatchState::SteadyState:
1229-
// Cannot be called with this value.
1230-
REALM_UNREACHABLE();
1231-
case DownloadBatchState::LastInBatch:
1232-
on_flx_sync_version_complete(new_version);
1233-
if (new_version == 0) {
1234-
new_state = SubscriptionSet::State::Complete;
1235-
}
1236-
else {
1237-
new_state = SubscriptionSet::State::AwaitingMark;
1238-
m_flx_pending_mark_version = new_version;
1239-
}
1240-
break;
1241-
case DownloadBatchState::MoreToCome:
1242-
if (m_flx_last_seen_version == new_version) {
1243-
return;
1244-
}
1245-
1246-
m_flx_last_seen_version = new_version;
1247-
new_state = SubscriptionSet::State::Bootstrapping;
1248-
break;
1249-
}
1250-
1251-
get_flx_subscription_store()->update_state(new_version, new_state);
1252-
}
1253-
12541142
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
12551143
{
12561144
REALM_ASSERT(!m_finalized);
@@ -1443,7 +1331,8 @@ void SessionWrapper::actualize()
14431331
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
14441332
std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
14451333
if (m_sync_mode == SyncServerMode::FLX) {
1446-
m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1334+
m_flx_pending_bootstrap_store =
1335+
std::make_unique<PendingBootstrapStore>(m_db, sess->logger, m_flx_subscription_store);
14471336
}
14481337

14491338
sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
@@ -1453,10 +1342,6 @@ void SessionWrapper::actualize()
14531342
});
14541343
conn.activate_session(std::move(sess)); // Throws
14551344

1456-
// Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1457-
// session cannot change the state of the bootstrap store at the same time.
1458-
update_subscription_version_info();
1459-
14601345
if (was_created)
14611346
conn.activate(); // Throws
14621347

@@ -1563,6 +1448,10 @@ void SessionWrapper::on_download_completion()
15631448
// completion without this.
15641449
check_progress();
15651450

1451+
if (m_flx_subscription_store) {
1452+
m_flx_subscription_store->download_complete();
1453+
}
1454+
15661455
while (!m_download_completion_handlers.empty()) {
15671456
auto handler = std::move(m_download_completion_handlers.back());
15681457
m_download_completion_handlers.pop_back();
@@ -1573,13 +1462,6 @@ void SessionWrapper::on_download_completion()
15731462
m_upload_completion_handlers.push_back(std::move(handler)); // Throws
15741463
m_sync_completion_handlers.pop_back();
15751464
}
1576-
1577-
if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
1578-
m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
1579-
m_flx_pending_mark_version);
1580-
m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
1581-
m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
1582-
}
15831465
}
15841466

15851467

@@ -1626,15 +1508,34 @@ void SessionWrapper::check_progress()
16261508
REALM_ASSERT(!m_finalized);
16271509
REALM_ASSERT(m_sess);
16281510

1629-
if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
1511+
// Check if there's anything which even wants progress or completion information
1512+
bool has_progress_handler = m_progress_handler && m_reliable_download_progress;
1513+
bool has_completion_handler = !m_upload_completion_handlers.empty() || !m_sync_completion_handlers.empty();
1514+
if (!m_flx_subscription_store && !has_progress_handler && !has_completion_handler)
16301515
return;
16311516

1632-
version_type uploaded_version;
1517+
// The order in which we report each type of completion or progress is important,
1518+
// and changing it needs to be avoided as it'd be a breaking change to the APIs
1519+
1520+
TransactionRef tr;
16331521
ReportedProgress p;
1522+
if (m_flx_subscription_store) {
1523+
m_flx_subscription_store->report_progress(tr);
1524+
}
1525+
1526+
if (!has_progress_handler && !has_completion_handler)
1527+
return;
1528+
// The subscription store may have started a read transaction that we'll
1529+
// reuse, but it may not have needed to or may not exist
1530+
if (!tr)
1531+
tr = m_db->start_read();
1532+
1533+
version_type uploaded_version;
16341534
DownloadableProgress downloadable;
1635-
ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
1636-
uploaded_version);
1637-
p.query_version = m_flx_last_seen_version;
1535+
ClientHistory::get_upload_download_state(*tr, m_db->get_alloc(), p.downloaded, downloadable, p.uploaded,
1536+
p.uploadable, p.snapshot, uploaded_version);
1537+
if (m_flx_subscription_store && has_progress_handler)
1538+
p.query_version = m_flx_subscription_store->get_downloading_query_version(*tr);
16381539

16391540
report_progress(p, downloadable);
16401541
report_upload_completion(uploaded_version);
@@ -1793,15 +1694,6 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()
17931694
});
17941695
}
17951696

1796-
void SessionWrapper::update_subscription_version_info()
1797-
{
1798-
if (!m_flx_subscription_store)
1799-
return;
1800-
auto versions_info = m_flx_subscription_store->get_version_info();
1801-
m_flx_active_version = versions_info.active;
1802-
m_flx_pending_mark_version = versions_info.pending_mark;
1803-
}
1804-
18051697
std::string SessionWrapper::get_appservices_connection_id()
18061698
{
18071699
auto pf = util::make_promise_future<std::string>();

src/realm/sync/noinst/client_history_impl.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ void ClientHistory::integrate_server_changesets(
423423
const SyncProgress& progress, DownloadableProgress downloadable_bytes,
424424
util::Span<const RemoteChangeset> incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state,
425425
util::Logger& logger, const TransactionRef& transact,
426-
util::UniqueFunction<void(const TransactionRef&, util::Span<Changeset>)> run_in_write_tr)
426+
util::UniqueFunction<void(const Transaction&, util::Span<Changeset>)> run_in_write_tr)
427427
{
428428
REALM_ASSERT(incoming_changesets.size() != 0);
429429
REALM_ASSERT(
@@ -502,7 +502,7 @@ void ClientHistory::integrate_server_changesets(
502502
update_sync_progress(progress, downloadable_bytes); // Throws
503503
}
504504
if (run_in_write_tr) {
505-
run_in_write_tr(transact, changesets_for_cb);
505+
run_in_write_tr(*transact, changesets_for_cb);
506506
}
507507

508508
// The reason we can use the `origin_timestamp`, and the `origin_file_ident`
@@ -610,14 +610,13 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset
610610
}
611611

612612

613-
void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downloaded_bytes,
613+
void ClientHistory::get_upload_download_state(Transaction& rt, Allocator& alloc, std::uint_fast64_t& downloaded_bytes,
614614
DownloadableProgress& downloadable_bytes,
615615
std::uint_fast64_t& uploaded_bytes,
616616
std::uint_fast64_t& uploadable_bytes,
617617
std::uint_fast64_t& snapshot_version, version_type& uploaded_version)
618618
{
619-
TransactionRef rt = db.start_read(); // Throws
620-
version_type current_client_version = rt->get_version();
619+
version_type current_client_version = rt.get_version();
621620

622621
downloaded_bytes = 0;
623622
downloadable_bytes = uint64_t(0);
@@ -627,11 +626,11 @@ void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downlo
627626
uploaded_version = 0;
628627

629628
using gf = _impl::GroupFriend;
630-
ref_type ref = gf::get_history_ref(*rt);
629+
ref_type ref = gf::get_history_ref(rt);
631630
if (!ref)
632631
return;
633632

634-
Array root(db.get_alloc());
633+
Array root(alloc);
635634
root.init_from_ref(ref);
636635
downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
637636
downloadable_bytes = root.get_as_ref_or_tagged(s_progress_downloadable_bytes_iip).get_as_int();
@@ -642,9 +641,9 @@ void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downlo
642641
if (uploaded_version == current_client_version)
643642
return;
644643

645-
BinaryColumn changesets(db.get_alloc());
644+
BinaryColumn changesets(alloc);
646645
changesets.init_from_ref(root.get_as_ref(s_changesets_iip));
647-
IntegerBpTree origin_file_idents(db.get_alloc());
646+
IntegerBpTree origin_file_idents(alloc);
648647
origin_file_idents.init_from_ref(root.get_as_ref(s_origin_file_idents_iip));
649648

650649
// `base_version` is the oldest version we have history for. If this is

0 commit comments

Comments
 (0)