Skip to content

Commit 7ae9f65

Browse files
Connection Error Tracking & Auto-Reconnect (#320)
* Connection Error Tracking & Auto-Reconnect * Fixes per PR review: refactored retry logic to eliminate code duplication.
1 parent aa9edb8 commit 7ae9f65

15 files changed

+732
-18
lines changed

.gitignore

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ config.h.in~
66
config.h
77
config.sub
88
configure
9+
configure~
910
compile
1011
depcomp
1112
install-sh
@@ -33,9 +34,18 @@ tests/tls/*
3334
*.txt
3435
!/tests/test_requirements.txt
3536
__pycache__
37+
*.csv
38+
*.json
3639

3740
# Code coverage with lcov/gcov
3841
*.gcno
3942
*.gcov
4043
*.gcda
4144
*.info
45+
46+
# redis related
47+
*.rdb
48+
*.aof
49+
appendonlydir/
50+
*.conf
51+

client.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,18 @@ int client_group::create_clients(int num)
622622
}
623623

624624
m_clients.push_back(c);
625+
626+
// Add jitter between connection creation (except for the last connection)
627+
if (i < num - 1 && m_config->thread_conn_start_max_jitter_micros > 0) {
628+
unsigned int jitter_range = m_config->thread_conn_start_max_jitter_micros - m_config->thread_conn_start_min_jitter_micros;
629+
unsigned int jitter_micros = m_config->thread_conn_start_min_jitter_micros;
630+
631+
if (jitter_range > 0) {
632+
jitter_micros += rand() % (jitter_range + 1);
633+
}
634+
635+
usleep(jitter_micros);
636+
}
625637
}
626638

627639
return num;
@@ -714,6 +726,16 @@ unsigned long int client_group::get_duration_usec(void)
714726
return duration;
715727
}
716728

729+
unsigned long int client_group::get_total_connection_errors(void)
730+
{
731+
unsigned long int total_errors = 0;
732+
for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
733+
total_errors += (*i)->get_stats()->get_total_connection_errors();
734+
}
735+
736+
return total_errors;
737+
}
738+
717739
void client_group::merge_run_stats(run_stats* target)
718740
{
719741
assert(target != NULL);

client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,13 @@ class client_group {
219219
struct event_base *get_event_base(void) { return m_base; }
220220
benchmark_config *get_config(void) { return m_config; }
221221
abstract_protocol* get_protocol(void) { return m_protocol; }
222-
object_generator* get_obj_gen(void) { return m_obj_gen; }
222+
object_generator* get_obj_gen(void) { return m_obj_gen; }
223223

224224
unsigned long int get_total_bytes(void);
225225
unsigned long int get_total_ops(void);
226226
unsigned long int get_total_latency(void);
227227
unsigned long int get_duration_usec(void);
228+
unsigned long int get_total_connection_errors(void);
228229

229230
void merge_run_stats(run_stats* target);
230231
};

cluster_client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ bool cluster_client::connect_shard_connection(shard_connection* sc, char* addres
209209
memcpy(ci.addr_buf, addr_info->ai_addr, addr_info->ai_addrlen);
210210
ci.ci_addr = (struct sockaddr *) ci.addr_buf;
211211
ci.ci_addrlen = addr_info->ai_addrlen;
212+
212213
freeaddrinfo(addr_info);
213214

214215
// call connect
@@ -497,4 +498,3 @@ void cluster_client::handle_response(unsigned int conn_id, struct timeval timest
497498
// continue with base class
498499
client::handle_response(conn_id, timestamp, request, response);
499500
}
500-

memtier_benchmark.1

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,24 @@ Number of concurrent pipelined requests (default: 1)
128128
\fB\-\-reconnect\-interval\fR=\fI\,NUM\/\fR
129129
Number of requests after which re\-connection is performed
130130
.TP
131+
\fB\-\-reconnect\-on\-error\fR
132+
Enable automatic reconnection on connection errors (default: disabled)
133+
.TP
134+
\fB\-\-max\-reconnect\-attempts\fR=\fI\,NUM\/\fR
135+
Maximum number of reconnection attempts, 0 for unlimited (default: 0)
136+
.TP
137+
\fB\-\-reconnect\-backoff\-factor\fR=\fI\,NUM\/\fR
138+
Backoff factor for reconnection delays, 0 for no backoff (default: 0)
139+
.TP
140+
\fB\-\-connection\-timeout\fR=\fI\,SECS\/\fR
141+
Connection timeout in seconds, 0 to disable (default: 0)
142+
.TP
143+
\fB\-\-thread\-conn\-start\-min\-jitter\-micros\fR=\fI\,NUM\/\fR
144+
Minimum jitter in microseconds between connection creation (default: 0)
145+
.TP
146+
\fB\-\-thread\-conn\-start\-max\-jitter\-micros\fR=\fI\,NUM\/\fR
147+
Maximum jitter in microseconds between connection creation (default: 0)
148+
.TP
131149
\fB\-\-multi\-key\-get\fR=\fI\,NUM\/\fR
132150
Enable multi\-key get commands, up to NUM keys (default: 0)
133151
.TP

memtier_benchmark.cpp

Lines changed: 159 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ static void sigint_handler(int signum)
7676
(void)signum; // unused parameter
7777
g_interrupted = 1;
7878
}
79+
7980
void benchmark_log_file_line(int level, const char *filename, unsigned int line, const char *fmt, ...)
8081
{
8182
if (level > log_level)
@@ -165,6 +166,9 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
165166
"key_stddev = %f\n"
166167
"key_median = %f\n"
167168
"reconnect_interval = %u\n"
169+
"connection_timeout = %u\n"
170+
"thread_conn_start_min_jitter_micros = %u\n"
171+
"thread_conn_start_max_jitter_micros = %u\n"
168172
"multi_key_get = %u\n"
169173
"authenticate = %s\n"
170174
"select-db = %d\n"
@@ -217,6 +221,9 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
217221
cfg->key_stddev,
218222
cfg->key_median,
219223
cfg->reconnect_interval,
224+
cfg->connection_timeout,
225+
cfg->thread_conn_start_min_jitter_micros,
226+
cfg->thread_conn_start_max_jitter_micros,
220227
cfg->multi_key_get,
221228
cfg->authenticate ? cfg->authenticate : "",
222229
cfg->select_db,
@@ -278,6 +285,9 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co
278285
jsonhandler->write_obj("key_median" ,"%f", cfg->key_median);
279286
jsonhandler->write_obj("key_zipf_exp" ,"%f", cfg->key_zipf_exp);
280287
jsonhandler->write_obj("reconnect_interval","%u", cfg->reconnect_interval);
288+
jsonhandler->write_obj("connection_timeout","%u", cfg->connection_timeout);
289+
jsonhandler->write_obj("thread_conn_start_min_jitter_micros","%u", cfg->thread_conn_start_min_jitter_micros);
290+
jsonhandler->write_obj("thread_conn_start_max_jitter_micros","%u", cfg->thread_conn_start_max_jitter_micros);
281291
jsonhandler->write_obj("multi_key_get" ,"%u", cfg->multi_key_get);
282292
jsonhandler->write_obj("authenticate" ,"\"%s\"", cfg->authenticate ? cfg->authenticate : "");
283293
jsonhandler->write_obj("select-db" ,"%d", cfg->select_db);
@@ -449,6 +459,7 @@ static void config_init_defaults(struct benchmark_config *cfg)
449459
cfg->hdr_prefix = "";
450460
if (!cfg->print_percentiles.is_defined())
451461
cfg->print_percentiles = config_quantiles("50,99,99.9");
462+
452463
#ifdef USE_TLS
453464
if (!cfg->tls_protocols)
454465
cfg->tls_protocols = REDIS_TLS_PROTO_DEFAULT;
@@ -545,6 +556,12 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
545556
o_randomize,
546557
o_client_stats,
547558
o_reconnect_interval,
559+
o_reconnect_on_error,
560+
o_max_reconnect_attempts,
561+
o_reconnect_backoff_factor,
562+
o_connection_timeout,
563+
o_thread_conn_start_min_jitter_micros,
564+
o_thread_conn_start_max_jitter_micros,
548565
o_generate_keys,
549566
o_multi_key_get,
550567
o_select_db,
@@ -623,6 +640,12 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
623640
{ "key-median", 1, 0, o_key_median },
624641
{ "key-zipf-exp", 1, 0, o_key_zipf_exp},
625642
{ "reconnect-interval", 1, 0, o_reconnect_interval },
643+
{ "reconnect-on-error", 0, 0, o_reconnect_on_error },
644+
{ "max-reconnect-attempts", 1, 0, o_max_reconnect_attempts },
645+
{ "reconnect-backoff-factor", 1, 0, o_reconnect_backoff_factor },
646+
{ "connection-timeout", 1, 0, o_connection_timeout },
647+
{ "thread-conn-start-min-jitter-micros", 1, 0, o_thread_conn_start_min_jitter_micros },
648+
{ "thread-conn-start-max-jitter-micros", 1, 0, o_thread_conn_start_max_jitter_micros },
626649
{ "multi-key-get", 1, 0, o_multi_key_get },
627650
{ "authenticate", 1, 0, 'a' },
628651
{ "select-db", 1, 0, o_select_db },
@@ -933,6 +956,49 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
933956
return -1;
934957
}
935958
break;
959+
case o_reconnect_on_error:
960+
cfg->reconnect_on_error = true;
961+
break;
962+
case o_max_reconnect_attempts:
963+
endptr = NULL;
964+
cfg->max_reconnect_attempts = (unsigned int) strtoul(optarg, &endptr, 10);
965+
if (!endptr || *endptr != '\0') {
966+
fprintf(stderr, "error: max-reconnect-attempts must be a valid number.\n");
967+
return -1;
968+
}
969+
break;
970+
case o_reconnect_backoff_factor:
971+
endptr = NULL;
972+
cfg->reconnect_backoff_factor = strtod(optarg, &endptr);
973+
if (cfg->reconnect_backoff_factor <= 0.0 || !endptr || *endptr != '\0') {
974+
fprintf(stderr, "error: reconnect-backoff-factor must be greater than zero.\n");
975+
return -1;
976+
}
977+
break;
978+
case o_connection_timeout:
979+
endptr = NULL;
980+
cfg->connection_timeout = (unsigned int) strtoul(optarg, &endptr, 10);
981+
if (!endptr || *endptr != '\0') {
982+
fprintf(stderr, "error: connection-timeout must be a valid number.\n");
983+
return -1;
984+
}
985+
break;
986+
case o_thread_conn_start_min_jitter_micros:
987+
endptr = NULL;
988+
cfg->thread_conn_start_min_jitter_micros = (unsigned int) strtoul(optarg, &endptr, 10);
989+
if (!endptr || *endptr != '\0') {
990+
fprintf(stderr, "error: thread-conn-start-min-jitter-micros must be a valid number.\n");
991+
return -1;
992+
}
993+
break;
994+
case o_thread_conn_start_max_jitter_micros:
995+
endptr = NULL;
996+
cfg->thread_conn_start_max_jitter_micros = (unsigned int) strtoul(optarg, &endptr, 10);
997+
if (!endptr || *endptr != '\0') {
998+
fprintf(stderr, "error: thread-conn-start-max-jitter-micros must be a valid number.\n");
999+
return -1;
1000+
}
1001+
break;
9361002
case o_generate_keys:
9371003
cfg->generate_keys = 1;
9381004
break;
@@ -1156,6 +1222,12 @@ void usage() {
11561222
" --ratio=RATIO Set:Get ratio (default: 1:10)\n"
11571223
" --pipeline=NUMBER Number of concurrent pipelined requests (default: 1)\n"
11581224
" --reconnect-interval=NUM Number of requests after which re-connection is performed\n"
1225+
" --reconnect-on-error Enable automatic reconnection on connection errors (default: disabled)\n"
1226+
" --max-reconnect-attempts=NUM Maximum number of reconnection attempts (default: 0, unlimited)\n"
1227+
" --reconnect-backoff-factor=NUM Backoff factor for reconnection delays (default: 0, no backoff)\n"
1228+
" --connection-timeout=SECS Connection timeout in seconds, 0 to disable (default: 0)\n"
1229+
" --thread-conn-start-min-jitter-micros=NUM Minimum jitter in microseconds between connection creation (default: 0)\n"
1230+
" --thread-conn-start-max-jitter-micros=NUM Maximum jitter in microseconds between connection creation (default: 0)\n"
11591231
" --multi-key-get=NUM Enable multi-key get commands, up to NUM keys (default: 0)\n"
11601232
" --select-db=DB DB number to select, when testing a redis server\n"
11611233
" --distinct-client-seed Use a different random seed for each client\n"
@@ -1235,9 +1307,12 @@ struct cg_thread {
12351307
abstract_protocol* m_protocol;
12361308
pthread_t m_thread;
12371309
std::atomic<bool> m_finished; // Atomic to prevent data race between worker thread write and main thread read
1310+
bool m_restart_requested;
1311+
unsigned int m_restart_count;
12381312

12391313
cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) :
1240-
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false)
1314+
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL),
1315+
m_finished(false), m_restart_requested(false), m_restart_count(0)
12411316
{
12421317
m_protocol = protocol_factory(m_config->protocol);
12431318
assert(m_protocol != NULL);
@@ -1276,13 +1351,57 @@ struct cg_thread {
12761351
assert(ret == 0);
12771352
}
12781353

1354+
int restart(void)
1355+
{
1356+
// Clean up existing client group
1357+
if (m_cg != NULL) {
1358+
delete m_cg;
1359+
}
1360+
1361+
// Create new client group
1362+
m_cg = new client_group(m_config, m_protocol, m_obj_gen);
1363+
1364+
// Prepare new clients
1365+
if (m_cg->create_clients(m_config->clients) < (int) m_config->clients)
1366+
return -1;
1367+
if (m_cg->prepare() < 0)
1368+
return -1;
1369+
1370+
// Reset state
1371+
m_finished = false;
1372+
m_restart_requested = false;
1373+
m_restart_count++;
1374+
1375+
// Start new thread
1376+
return pthread_create(&m_thread, NULL, cg_thread_start, (void *)this);
1377+
}
1378+
12791379
};
12801380

12811381
static void* cg_thread_start(void *t)
12821382
{
12831383
cg_thread* thread = (cg_thread*) t;
1284-
thread->m_cg->run();
1285-
thread->m_finished = true;
1384+
1385+
try {
1386+
thread->m_cg->run();
1387+
1388+
// Check if we should restart due to connection failures
1389+
// If the thread finished but still has time left and connection errors, request restart
1390+
if (thread->m_cg->get_total_connection_errors() > 0) {
1391+
benchmark_error_log("Thread %u finished due to connection failures, requesting restart.\n", thread->m_thread_id);
1392+
thread->m_restart_requested = true;
1393+
}
1394+
1395+
thread->m_finished = true;
1396+
} catch (const std::exception& e) {
1397+
benchmark_error_log("Thread %u caught exception: %s\n", thread->m_thread_id, e.what());
1398+
thread->m_finished = true;
1399+
thread->m_restart_requested = true;
1400+
} catch (...) {
1401+
benchmark_error_log("Thread %u caught unknown exception\n", thread->m_thread_id);
1402+
thread->m_finished = true;
1403+
thread->m_restart_requested = true;
1404+
}
12861405

12871406
return t;
12881407
}
@@ -1364,14 +1483,32 @@ run_stats run_benchmark(int run_id, benchmark_config* cfg, object_generator* obj
13641483
unsigned long int duration = 0;
13651484
unsigned int thread_counter = 0;
13661485
unsigned long int total_latency = 0;
1486+
unsigned long int total_connection_errors = 0;
13671487

13681488
for (std::vector<cg_thread*>::iterator i = threads.begin(); i != threads.end(); i++) {
1489+
// Check if thread needs restart
1490+
if ((*i)->m_finished && (*i)->m_restart_requested && (*i)->m_restart_count < 5) {
1491+
benchmark_error_log("Restarting thread %u (restart #%u)...\n",
1492+
(*i)->m_thread_id, (*i)->m_restart_count + 1);
1493+
1494+
// Join the failed thread first
1495+
(*i)->join();
1496+
1497+
// Attempt to restart
1498+
if ((*i)->restart() == 0) {
1499+
benchmark_error_log("Thread %u restarted successfully.\n", (*i)->m_thread_id);
1500+
} else {
1501+
benchmark_error_log("Failed to restart thread %u.\n", (*i)->m_thread_id);
1502+
}
1503+
}
1504+
13691505
if (!(*i)->m_finished)
13701506
active_threads++;
13711507

13721508
total_ops += (*i)->m_cg->get_total_ops();
13731509
total_bytes += (*i)->m_cg->get_total_bytes();
13741510
total_latency += (*i)->m_cg->get_total_latency();
1511+
total_connection_errors += (*i)->m_cg->get_total_connection_errors();
13751512
thread_counter++;
13761513
float factor = ((float)(thread_counter - 1) / thread_counter);
13771514
duration = factor * duration + (float)(*i)->m_cg->get_duration_usec() / thread_counter ;
@@ -1410,8 +1547,14 @@ run_stats run_benchmark(int run_id, benchmark_config* cfg, object_generator* obj
14101547
else
14111548
progress = 100.0 * (duration / 1000000.0)/cfg->test_time;
14121549

1413-
fprintf(stderr, "[RUN #%u %.0f%%, %3u secs] %2u threads: %11lu ops, %7lu (avg: %7lu) ops/sec, %s/sec (avg: %s/sec), %5.2f (avg: %5.2f) msec latency\r",
1414-
run_id, progress, (unsigned int) (duration / 1000000), active_threads, total_ops, cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
1550+
// Include the connection-error count only when at least one error occurred; both variants also print cfg->clients as "conns"
1551+
if (total_connection_errors > 0) {
1552+
fprintf(stderr, "[RUN #%u %.0f%%, %3u secs] %2u threads %2u conns %lu conn errors: %11lu ops, %7lu (avg: %7lu) ops/sec, %s/sec (avg: %s/sec), %5.2f (avg: %5.2f) msec latency\r",
1553+
run_id, progress, (unsigned int) (duration / 1000000), active_threads, cfg->clients, total_connection_errors, total_ops, cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
1554+
} else {
1555+
fprintf(stderr, "[RUN #%u %.0f%%, %3u secs] %2u threads %2u conns: %11lu ops, %7lu (avg: %7lu) ops/sec, %s/sec (avg: %s/sec), %5.2f (avg: %5.2f) msec latency\r",
1556+
run_id, progress, (unsigned int) (duration / 1000000), active_threads, cfg->clients, total_ops, cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
1557+
}
14151558
} while (active_threads > 0);
14161559

14171560
fprintf(stderr, "\n\n");
@@ -1569,6 +1712,14 @@ int main(int argc, char *argv[])
15691712
}
15701713

15711714
config_init_defaults(&cfg);
1715+
1716+
// Validate jitter parameters
1717+
if (cfg.thread_conn_start_min_jitter_micros > cfg.thread_conn_start_max_jitter_micros) {
1718+
fprintf(stderr, "error: thread-conn-start-min-jitter-micros (%u) cannot be greater than thread-conn-start-max-jitter-micros (%u).\n",
1719+
cfg.thread_conn_start_min_jitter_micros, cfg.thread_conn_start_max_jitter_micros);
1720+
exit(1);
1721+
}
1722+
15721723
log_level = cfg.debug;
15731724
if (cfg.show_config) {
15741725
fprintf(stderr, "============== Configuration values: ==============\n");
@@ -1981,6 +2132,9 @@ int main(int argc, char *argv[])
19812132
}
19822133

19832134
if (jsonhandler != NULL) {
2135+
// Log message for saving JSON file
2136+
fprintf(stderr, "Saving JSON output file: %s\n", cfg.json_out_file);
2137+
19842138
// closing the JSON
19852139
delete jsonhandler;
19862140
}

0 commit comments

Comments
 (0)