Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_WORDS (CLUSTER_SLOTS / 64) /* Number of 64-bit (8-byte) words in a slot bitmap holding one bit per slot, i.e. 256. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
79 changes: 49 additions & 30 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -4105,20 +4105,30 @@ int clusterProcessPacket(clusterLink *link) {
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop acting as a primary (or can try to
* fail over if the conditions to win the election are met). */
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
int found_new_owner = 0;
for (size_t w = 0; w < CLUSTER_SLOT_WORDS && !found_new_owner; w++) {
uint64_t word;
memcpy(&word, hdr->myslots + (w << 3), sizeof(word));
while (word) {
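/* Get the index of the least-significant set bit in this 64-bit word.
* NOTE(review): mapping this index to a slot number assumes the bitmap
* bytes were loaded on a little-endian host -- confirm for big-endian builds. */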
const unsigned bit = (unsigned)__builtin_ctzll(word);
const int slot = (int)((w << 6) | bit);
word &= word - 1; /* clear that bit */

clusterNode *slot_owner = server.cluster->slots[slot];
if (slot_owner == sender || isSlotUnclaimed(slot)) continue;
if (slot_owner->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
sender->name, sender->human_nodename,
server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename);
clusterSendUpdate(sender->link, server.cluster->slots[j]);
slot_owner->name, slot_owner->human_nodename);
clusterSendUpdate(sender->link, slot_owner);

/* TODO: instead of exiting the loop, send an UPDATE packet for every
* other node that is a new owner of the sender's slots. */
found_new_owner = 1;
break;
}
}
Expand Down Expand Up @@ -5003,7 +5013,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
unsigned char *claimed_slots = request->myslots;
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
int slot;
clusterNode *slot_owner;

/* If we are not a primary serving at least 1 slot, we don't have the
* right to vote, as the cluster size is the number
Expand Down Expand Up @@ -5049,30 +5060,38 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
/* The replica requesting the vote must have a configEpoch for the claimed
* slots that is >= the one of the primaries currently serving the same
* slots in the current configuration. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(claimed_slots, j) == 0) continue;
if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) {
continue;
for (size_t w = 0; w < CLUSTER_SLOT_WORDS; w++) {
uint64_t word;
memcpy(&word, claimed_slots + (w << 3), sizeof(word));
while (word) {
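/* Get the index of the least-significant set bit in this 64-bit word.
* NOTE(review): mapping this index to a slot number assumes the bitmap
* bytes were loaded on a little-endian host -- confirm for big-endian builds. */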
const unsigned bit = (unsigned)__builtin_ctzll(word);
slot = (int)((w << 6) | bit);
word &= word - 1; /* clear that bit */

if (isSlotUnclaimed(slot) || server.cluster->slots[slot]->configEpoch <= requestConfigEpoch) {
continue;
}
slot_owner = server.cluster->slots[slot];
/* If we reached this point we found a slot that in our current slots
* is served by a primary with a greater configEpoch than the one claimed
* by the replica requesting our vote. Refuse to vote for this replica. */
serverLog(LL_WARNING,
"Failover auth denied to %.40s (%s): "
"slot %d epoch (%llu) > reqConfigEpoch (%llu)",
node->name, node->human_nodename, slot, (unsigned long long)slot_owner->configEpoch,
(unsigned long long)requestConfigEpoch);

/* Send an UPDATE message to the replica. After receiving the UPDATE message,
* the replica will update the slots config so that it can initiate a failover
* again later. Otherwise the replica will never get votes if the primary is down. */
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
node->name, node->human_nodename, slot_owner->name, slot_owner->human_nodename);
clusterSendUpdate(node->link, slot_owner);
return;
}
/* If we reached this point we found a slot that in our current slots
* is served by a primary with a greater configEpoch than the one claimed
* by the replica requesting our vote. Refuse to vote for this replica. */
serverLog(LL_WARNING,
"Failover auth denied to %.40s (%s): "
"slot %d epoch (%llu) > reqConfigEpoch (%llu)",
node->name, node->human_nodename, j, (unsigned long long)server.cluster->slots[j]->configEpoch,
(unsigned long long)requestConfigEpoch);

/* Send an UPDATE message to the replica. After receiving the UPDATE message,
* the replica will update the slots config so that it can initiate a failover
* again later. Otherwise the replica will never get votes if the primary is down. */
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
node->name, node->human_nodename,
server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename);
clusterSendUpdate(node->link, server.cluster->slots[j]);
return;
}

/* We can vote for this replica. */
Expand Down
Loading