Merged
86 changes: 56 additions & 30 deletions src/cluster_legacy.c
@@ -169,6 +169,9 @@ static inline int defaultClientPort(void) {

#define isSlotUnclaimed(slot) \
(server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
/* Treat slot bitmaps as arrays of 8-byte words to speed up iteration. */
#define CLUSTER_SLOT_WORDS (CLUSTER_SLOTS / 64)
#define SLOT_WORD_OFFSET(w) ((w) << 3)

#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MIN_READ_LEN 14
@@ -3596,6 +3599,16 @@ int clusterIsValidPacket(clusterLink *link) {
return 1;
}

/* When iterating through the slot bitmap, process 64 bits at a time as
* one word to speed up the scan. */
static inline int clusterExtractSlotFromWord(uint64_t *slot_word, size_t slot_word_index) {
/* Get the index of the least-significant set bit, in this 64-bit word */
const unsigned bit = (unsigned)__builtin_ctzll(*slot_word);
Member

Can we simplify the types a bit?

  • Why cast to unsigned here when the result is int and we mask onto an int below? Just leave it int.
  • Why is slot_word_index a size_t? Can't we just use int for that as well?

Contributor Author

re 1: If I write int bit = __builtin_ctzll(word);, then in the later (slot_word_index << 6) | bit the compiler implicitly converts bit to an unsigned type, because (slot_word_index << 6) has type size_t (unsigned, 64 bits wide on 64-bit targets). I generally prefer to avoid implicit type conversions, but either way is fine here.

re 2: slot_word_index is meant to be used as an array index. Indexing into an array is pointer arithmetic under the hood, and size_t is the conventional type for sizes and offsets into memory, so I declared slot_word_index as size_t.

The "array" here is not explicit though. Conceptually, we should have

uint64_t words[CLUSTER_SLOT_WORDS]; // CLUSTER_SLOT_WORDS = 256
// in each loop:
word = words[slot_word_index];

but in Valkey what we actually have is

unsigned char myslots[CLUSTER_SLOTS / 8];  // CLUSTER_SLOTS = 16384
// in each loop:
memcpy(&word, myslots + (slot_word_index << 3), sizeof(word));

Here slot_word_index is the implicit index of the array myslots.

Overall I use int for slot number (logical ID) and size_t for things that interact with memory layout (buffer sizes, indexes into buffers, offsets for pointer arithmetic). And slot_word_index lives in the “memory-ish” world, not just abstract logic. That’s why I suggested size_t for it.
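
To make the "implicit array" point concrete, here is a minimal sketch of the two views of the same bytes. It is not Valkey code: every `demo_` name is hypothetical, and the per-bit layout (slot s in byte s/8, bit s%8) is an assumption of the sketch. It also shows one thing the thread does not spell out: the word view only lines up with the per-bit view when the host is little-endian.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_SLOTS 16384                  /* mirrors CLUSTER_SLOTS in the thread */
#define DEMO_SLOT_WORDS (DEMO_SLOTS / 64) /* 256 words of 64 bits each */

/* Byte-array view (assumed layout): slot s is bit s%8 of byte s/8. */
static void demo_set_bit(unsigned char *bitmap, int slot) {
    bitmap[slot >> 3] |= (unsigned char)(1u << (slot & 7));
}

static int demo_test_bit(const unsigned char *bitmap, int slot) {
    return (bitmap[slot >> 3] >> (slot & 7)) & 1;
}

/* Word view: copy 8 bytes starting at byte offset w * 8. memcpy keeps the
 * load free of alignment and aliasing problems. */
static uint64_t demo_load_word(const unsigned char *bitmap, size_t w) {
    uint64_t word;
    assert(w < DEMO_SLOT_WORDS);
    memcpy(&word, bitmap + (w << 3), sizeof(word));
    return word;
}

/* Returns 1 on hosts that store the least-significant byte first. */
static int demo_is_little_endian(void) {
    const uint16_t probe = 1;
    unsigned char first;
    memcpy(&first, &probe, 1);
    return first == 1;
}

/* Returns 1 if bit slot%64 of word slot/64 agrees with the per-bit test.
 * This is expected to hold for every slot only on little-endian hosts. */
static int demo_word_view_matches(const unsigned char *bitmap, int slot) {
    const uint64_t word = demo_load_word(bitmap, (size_t)slot >> 6);
    const int from_word = (int)((word >> (slot & 63)) & 1);
    return from_word == demo_test_bit(bitmap, slot);
}
```

In this sketch the word index is a `size_t` because it only ever feeds a byte offset, while slot numbers stay `int`, which matches the split described in the comment above.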

Contributor

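size_t is only 32 bits wide on 32-bit machines, but that is still fine here: slot_word_index is below CLUSTER_SLOT_WORDS (256), so (slot_word_index << 6) | bit never exceeds 16383.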

const int slot = (int)((slot_word_index << 6) | bit);
*slot_word &= *slot_word - 1; /* clear that bit */
return slot;
}
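
For readers following the type discussion, the sketch below is a standalone variant of the helper that keeps `bit` as a plain `int`, as the first review comment suggests. It is an illustration only: the `demo_` names are hypothetical, `__builtin_ctzll` is a GCC/Clang builtin, and the `assert` documents the precondition that the word is non-zero (the builtin's result is undefined for 0).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Removes the lowest set bit from *word and returns the slot number it
 * encodes: word_index * 64 + bit position. */
static int demo_extract_slot(uint64_t *word, size_t word_index) {
    assert(*word != 0); /* __builtin_ctzll(0) is undefined */
    const int bit = __builtin_ctzll(*word); /* position of lowest set bit, 0..63 */
    *word &= *word - 1;                     /* clear that bit */
    return (int)(word_index << 6) | bit;
}

/* Writes every slot encoded in `word` to out[] in ascending order and
 * returns how many were written (at most 64). */
static int demo_collect_slots(uint64_t word, size_t word_index, int out[64]) {
    int n = 0;
    while (word) out[n++] = demo_extract_slot(&word, word_index);
    return n;
}
```

Casting the shifted index to `int` before the OR keeps the whole expression in `int`, so no implicit conversion to an unsigned type takes place. The loop body runs once per set bit, not once per bit position, which is where the speedup over testing all 64 positions comes from.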

/* When this function is called, there is a packet to process starting
* at link->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
@@ -4105,20 +4118,27 @@ int clusterProcessPacket(clusterLink *link) {
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop to act as a primary (or can try to
* failover if there are the conditions to win the election). */
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
bool found_new_owner = false;
for (size_t w = 0; w < CLUSTER_SLOT_WORDS && !found_new_owner; w++) {
uint64_t word;
memcpy(&word, hdr->myslots + SLOT_WORD_OFFSET(w), sizeof(word));
while (word) {
const int slot = clusterExtractSlotFromWord(&word, w);

clusterNode *slot_owner = server.cluster->slots[slot];
if (slot_owner == sender || isSlotUnclaimed(slot)) continue;
if (slot_owner->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
sender->name, sender->human_nodename,
server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename);
clusterSendUpdate(sender->link, server.cluster->slots[j]);
slot_owner->name, slot_owner->human_nodename);
clusterSendUpdate(sender->link, slot_owner);

/* TODO: instead of exiting the loop send every other
* UPDATE packet for other nodes that are the new owner
* of sender's slots. */
found_new_owner = true;
break;
}
}
@@ -5003,7 +5023,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
unsigned char *claimed_slots = request->myslots;
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
int slot;
clusterNode *slot_owner;

/* IF we are not a primary serving at least 1 slot, we don't have the
* right to vote, as the cluster size is the number
@@ -5049,30 +5070,35 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
/* The replica requesting the vote must have a configEpoch for the claimed
* slots that is >= the one of the primaries currently serving the same
* slots in the current configuration. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(claimed_slots, j) == 0) continue;
if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) {
continue;
for (size_t w = 0; w < CLUSTER_SLOT_WORDS; w++) {
uint64_t word;
memcpy(&word, claimed_slots + SLOT_WORD_OFFSET(w), sizeof(word));
while (word) {
slot = clusterExtractSlotFromWord(&word, w);

if (isSlotUnclaimed(slot) || server.cluster->slots[slot]->configEpoch <= requestConfigEpoch) {
continue;
}
slot_owner = server.cluster->slots[slot];
/* If we reached this point we found a slot that in our current slots
* is served by a primary with a greater configEpoch than the one claimed
* by the replica requesting our vote. Refuse to vote for this replica. */
serverLog(LL_WARNING,
"Failover auth denied to %.40s (%s): "
"slot %d epoch (%llu) > reqConfigEpoch (%llu)",
node->name, node->human_nodename, slot, (unsigned long long)slot_owner->configEpoch,
(unsigned long long)requestConfigEpoch);

/* Send an UPDATE message to the replica. After receiving the UPDATE message,
* the replica will update the slots config so that it can initiate a failover
* again later. Otherwise the replica will never get votes if the primary is down. */
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
node->name, node->human_nodename, slot_owner->name, slot_owner->human_nodename);
clusterSendUpdate(node->link, slot_owner);
return;
}
/* If we reached this point we found a slot that in our current slots
* is served by a primary with a greater configEpoch than the one claimed
* by the replica requesting our vote. Refuse to vote for this replica. */
serverLog(LL_WARNING,
"Failover auth denied to %.40s (%s): "
"slot %d epoch (%llu) > reqConfigEpoch (%llu)",
node->name, node->human_nodename, j, (unsigned long long)server.cluster->slots[j]->configEpoch,
(unsigned long long)requestConfigEpoch);

/* Send an UPDATE message to the replica. After receiving the UPDATE message,
* the replica will update the slots config so that it can initiate a failover
* again later. Otherwise the replica will never get votes if the primary is down. */
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
node->name, node->human_nodename,
server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename);
clusterSendUpdate(node->link, server.cluster->slots[j]);
return;
}

/* We can vote for this replica. */
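
The scan-and-exit pattern in this hunk can be sketched outside of Valkey as follows. This is a simplified stand-in, not the merged code: the `demo_` names, the flat `owner_epoch` array, and the convention that epoch 0 means "unclaimed" are all assumptions made for the example. It also swaps the `memcpy` load for a byte-by-byte little-endian load, so that the slot numbering stays correct on big-endian hosts as well.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_SLOTS 16384
#define DEMO_SLOT_WORDS (DEMO_SLOTS / 64)

/* Builds a word with p[0] as the least-significant byte, whatever the
 * host byte order is. */
static uint64_t demo_load_le64(const unsigned char *p) {
    uint64_t word = 0;
    for (int i = 0; i < 8; i++) word |= (uint64_t)p[i] << (8 * i);
    return word;
}

/* Returns the lowest claimed slot whose recorded epoch is greater than
 * request_epoch, or -1 if there is none. owner_epoch[slot] == 0 stands
 * in for "slot unclaimed" in this sketch. */
static int demo_find_newer_epoch_slot(const unsigned char *claimed,
                                      const uint64_t *owner_epoch,
                                      uint64_t request_epoch) {
    assert(claimed != NULL && owner_epoch != NULL);
    for (size_t w = 0; w < DEMO_SLOT_WORDS; w++) {
        uint64_t word = demo_load_le64(claimed + (w << 3));
        while (word) {
            const int slot = (int)(w << 6) | __builtin_ctzll(word);
            word &= word - 1; /* clear the bit before any `continue` */
            if (owner_epoch[slot] == 0 || owner_epoch[slot] <= request_epoch) continue;
            return slot; /* first conflicting slot ends the scan */
        }
    }
    return -1;
}
```

Because the bit is cleared before the `continue`, skipping a slot cannot loop forever; the same property is what makes the `continue` statements inside `while (word)` in the hunk safe.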