Skip to content

Commit dfa5f33

Browse files
committed
Extract some duplicated code for sync triggers and timers
1 parent bf62783 commit dfa5f33

File tree

7 files changed

+101
-162
lines changed

7 files changed

+101
-162
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
-----------
1818

1919
### Internals
20-
* None.
20+
* Refactor the implementation of sync triggers and timers to eliminate some duplicated code. ([PR #7912](https://github.com/realm/realm-core/pull/7912))
2121

2222
----------------------------------------------
2323

src/realm/sync/client.cpp

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -494,10 +494,9 @@ void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
494494
return;
495495
}
496496

497-
REALM_ASSERT(m_actualize_and_finalize);
498497
m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
499498
}
500-
m_actualize_and_finalize->trigger();
499+
m_actualize_and_finalize.trigger();
501500
}
502501

503502

@@ -506,7 +505,6 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
506505
// Thread safety required.
507506
{
508507
util::CheckedLockGuard lock{m_mutex};
509-
REALM_ASSERT(m_actualize_and_finalize);
510508
// The wrapper may have already been finalized before being abandoned
511509
// if we were stopped when it was created.
512510
if (wrapper->mark_abandoned())
@@ -522,7 +520,7 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
522520
}
523521
m_abandoned_session_wrappers.push(std::move(wrapper));
524522
}
525-
m_actualize_and_finalize->trigger();
523+
m_actualize_and_finalize.trigger();
526524
}
527525

528526

@@ -1731,23 +1729,12 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide
17311729
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED
17321730
, m_proxy_config{std::move(proxy_config)} // DEPRECATED
17331731
, m_reconnect_info{reconnect_info}
1732+
, m_on_idle{m_client, &Connection::on_idle, this}
17341733
, m_ident{ident}
17351734
, m_server_endpoint{std::move(endpoint)}
17361735
, m_authorization_header_name{authorization_header_name} // DEPRECATED
17371736
, m_custom_http_headers{custom_http_headers} // DEPRECATED
17381737
{
1739-
m_on_idle = m_client.create_trigger([this](Status status) {
1740-
if (status == ErrorCodes::OperationAborted)
1741-
return;
1742-
else if (!status.is_ok())
1743-
throw Exception(status);
1744-
1745-
REALM_ASSERT(m_activated);
1746-
if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
1747-
on_idle(); // Throws
1748-
// Connection object may be destroyed now.
1749-
}
1750-
});
17511738
}
17521739

17531740
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
@@ -1779,6 +1766,10 @@ void ClientImpl::Connection::resume_active_sessions()
17791766

17801767
void ClientImpl::Connection::on_idle()
17811768
{
1769+
REALM_ASSERT(m_activated);
1770+
if (m_state != ConnectionState::disconnected || m_num_active_sessions != 0)
1771+
return;
1772+
17821773
logger.debug(util::LogCategory::session, "Destroying connection object");
17831774
ClientImpl& client = get_client();
17841775
client.remove_connection(*this);

src/realm/sync/noinst/client_impl_base.cpp

Lines changed: 26 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,8 @@ ClientImpl::ClientImpl(ClientConfig config)
149149
, m_fix_up_object_ids{config.fix_up_object_ids}
150150
, m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
151151
, m_socket_provider{std::move(config.socket_provider)}
152-
, m_client_protocol{} // Throws
153152
, m_one_connection_per_session{config.one_connection_per_session}
154-
, m_random{}
153+
, m_actualize_and_finalize{*this, &ClientImpl::actualize_and_finalize_session_wrappers, this}
155154
{
156155
// FIXME: Would be better if seeding was up to the application.
157156
util::seed_prng_nondeterministically(m_random); // Throws
@@ -213,14 +212,6 @@ ClientImpl::ClientImpl(ClientConfig config)
213212
logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
214213
"never do this in production");
215214
}
216-
217-
m_actualize_and_finalize = create_trigger([this](Status status) {
218-
if (status == ErrorCodes::OperationAborted)
219-
return;
220-
else if (!status.is_ok())
221-
throw Exception(status);
222-
actualize_and_finalize_session_wrappers(); // Throws
223-
});
224215
}
225216

226217
void ClientImpl::incr_outstanding_posts()
@@ -290,25 +281,23 @@ void ClientImpl::drain_connections()
290281

291282

292283
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
293-
SyncSocketProvider::FunctionHandler&& handler)
284+
util::UniqueFunction<void()>&& handler)
294285
{
295286
REALM_ASSERT(m_socket_provider);
296287
incr_outstanding_posts();
297288
return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
298-
auto decr_guard = util::make_scope_exit([&]() noexcept {
289+
ScopeExit decr_guard([&]() noexcept {
299290
decr_outstanding_posts();
300291
});
301-
handler(status);
292+
if (status == ErrorCodes::OperationAborted)
293+
return;
294+
if (!status.is_ok())
295+
throw Exception(status);
296+
handler();
302297
});
303298
}
304299

305300

306-
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
307-
{
308-
REALM_ASSERT(m_socket_provider);
309-
return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
310-
}
311-
312301
Connection::~Connection()
313302
{
314303
if (m_websocket_sentinel) {
@@ -319,10 +308,9 @@ Connection::~Connection()
319308

320309
void Connection::activate()
321310
{
322-
REALM_ASSERT(m_on_idle);
323311
m_activated = true;
324312
if (m_num_active_sessions == 0)
325-
m_on_idle->trigger();
313+
m_on_idle.trigger();
326314
// We cannot in general connect immediately, because a prior failure to
327315
// connect may require a delay before reconnecting (see `m_reconnect_info`).
328316
initiate_reconnect_wait(); // Throws
@@ -364,7 +352,7 @@ void Connection::initiate_session_deactivation(Session* sess)
364352
}
365353
if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
366354
if (m_activated && m_state == ConnectionState::disconnected)
367-
m_on_idle->trigger();
355+
m_on_idle.trigger();
368356
}
369357
}
370358

@@ -679,22 +667,14 @@ void Connection::initiate_reconnect_wait()
679667
// We create a timer for the reconnect_disconnect timer even if the delay is zero because
680668
// we need it to be cancelable in case the connection is terminated before the timer
681669
// callback is run.
682-
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
683-
// If the operation is aborted, the connection object may have been
684-
// destroyed.
685-
if (status != ErrorCodes::OperationAborted)
686-
handle_reconnect_wait(status); // Throws
687-
}); // Throws
670+
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this] {
671+
handle_reconnect_wait(); // Throws
672+
}); // Throws
688673
}
689674

690675

691-
void Connection::handle_reconnect_wait(Status status)
676+
void Connection::handle_reconnect_wait()
692677
{
693-
if (!status.is_ok()) {
694-
REALM_ASSERT(status != ErrorCodes::OperationAborted);
695-
throw Exception(status);
696-
}
697-
698678
REALM_ASSERT(m_reconnect_delay_in_progress);
699679
m_reconnect_delay_in_progress = false;
700680

@@ -806,24 +786,15 @@ void Connection::initiate_connect_wait()
806786
// fully establish the connection (including SSL and WebSocket
807787
// handshakes). Without such a watchdog, connect operations could take very
808788
// long, or even indefinite time.
809-
milliseconds_type time = m_client.m_connect_timeout;
810-
811-
m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
812-
// If the operation is aborted, the connection object may have been
813-
// destroyed.
814-
if (status != ErrorCodes::OperationAborted)
815-
handle_connect_wait(status); // Throws
816-
}); // Throws
789+
std::chrono::milliseconds time(m_client.m_connect_timeout);
790+
m_connect_timer = m_client.create_timer(time, [this] {
791+
handle_connect_wait(); // Throws
792+
}); // Throws
817793
}
818794

819795

820-
void Connection::handle_connect_wait(Status status)
796+
void Connection::handle_connect_wait()
821797
{
822-
if (!status.is_ok()) {
823-
REALM_ASSERT(status != ErrorCodes::OperationAborted);
824-
throw Exception(status);
825-
}
826-
827798
REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
828799
logger.info("Connect timeout"); // Throws
829800
SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
@@ -917,12 +888,7 @@ void Connection::initiate_ping_delay(milliseconds_type now)
917888

918889
m_ping_delay_in_progress = true;
919890

920-
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
921-
if (status == ErrorCodes::OperationAborted)
922-
return;
923-
else if (!status.is_ok())
924-
throw Exception(status);
925-
891+
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this] {
926892
handle_ping_delay(); // Throws
927893
}); // Throws
928894
logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
@@ -952,12 +918,7 @@ void Connection::initiate_pong_timeout()
952918
m_pong_wait_started_at = monotonic_clock_now();
953919

954920
milliseconds_type time = m_client.m_pong_keepalive_timeout;
955-
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
956-
if (status == ErrorCodes::OperationAborted)
957-
return;
958-
else if (!status.is_ok())
959-
throw Exception(status);
960-
921+
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
961922
handle_pong_timeout(); // Throws
962923
}); // Throws
963924
}
@@ -1108,23 +1069,15 @@ void Connection::initiate_disconnect_wait()
11081069

11091070
milliseconds_type time = m_client.m_connection_linger_time;
11101071

1111-
m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
1112-
// If the operation is aborted, the connection object may have been
1113-
// destroyed.
1114-
if (status != ErrorCodes::OperationAborted)
1115-
handle_disconnect_wait(status); // Throws
1116-
}); // Throws
1072+
m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
1073+
handle_disconnect_wait(); // Throws
1074+
}); // Throws
11171075
m_disconnect_delay_in_progress = true;
11181076
}
11191077

11201078

1121-
void Connection::handle_disconnect_wait(Status status)
1079+
void Connection::handle_disconnect_wait()
11221080
{
1123-
if (!status.is_ok()) {
1124-
REALM_ASSERT(status != ErrorCodes::OperationAborted);
1125-
throw Exception(status);
1126-
}
1127-
11281081
m_disconnect_delay_in_progress = false;
11291082

11301083
REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
@@ -2649,12 +2602,7 @@ void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
26492602
try_again_interval = std::chrono::milliseconds{1000};
26502603
}
26512604
logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
2652-
m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
2653-
if (status == ErrorCodes::OperationAborted)
2654-
return;
2655-
else if (!status.is_ok())
2656-
throw Exception(status);
2657-
2605+
m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this] {
26582606
m_try_again_activation_timer.reset();
26592607
cancel_resumption_delay();
26602608
});

src/realm/sync/noinst/client_impl_base.hpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,8 @@ class ClientImpl {
231231
void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
232232
void post(util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
233233
SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
234-
SyncSocketProvider::FunctionHandler&& handler)
235-
REQUIRES(!m_drain_mutex);
236-
using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
237-
SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
234+
util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
235+
using SyncTrigger = Trigger<ClientImpl>;
238236

239237
RandomEngine& get_random() noexcept;
240238

@@ -534,10 +532,10 @@ class ClientImpl::Connection {
534532
std::string get_http_request_path() const;
535533

536534
void initiate_reconnect_wait();
537-
void handle_reconnect_wait(Status status);
535+
void handle_reconnect_wait();
538536
void initiate_reconnect();
539537
void initiate_connect_wait();
540-
void handle_connect_wait(Status status);
538+
void handle_connect_wait();
541539

542540
void handle_connection_established();
543541
void schedule_urgent_ping();
@@ -553,7 +551,7 @@ class ClientImpl::Connection {
553551
void handle_write_ping();
554552
void handle_message_received(util::Span<const char> data);
555553
void initiate_disconnect_wait();
556-
void handle_disconnect_wait(Status status);
554+
void handle_disconnect_wait();
557555
void close_due_to_protocol_error(Status status);
558556
void close_due_to_client_side_error(Status, IsFatal is_fatal, ConnectionTerminationReason reason);
559557
void close_due_to_transient_error(Status status, ConnectionTerminationReason reason);
@@ -1227,12 +1225,11 @@ inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInf
12271225

12281226
inline void ClientImpl::Connection::change_state_to_disconnected() noexcept
12291227
{
1230-
REALM_ASSERT(m_on_idle);
12311228
REALM_ASSERT(m_state != ConnectionState::disconnected);
12321229
m_state = ConnectionState::disconnected;
12331230

12341231
if (m_num_active_sessions == 0)
1235-
m_on_idle->trigger();
1232+
m_on_idle.trigger();
12361233

12371234
REALM_ASSERT(!m_reconnect_delay_in_progress);
12381235
if (m_disconnect_delay_in_progress) {

0 commit comments

Comments
 (0)