diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 22e2436d21..4bf8084960 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -169,6 +169,9 @@ static inline int defaultClientPort(void) { #define isSlotUnclaimed(slot) \ (server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot)) +/* Treat slot bitmaps as 8-byte (64-bit) words to speed up iteration: CLUSTER_SLOT_WORDS is the number of such words per bitmap and SLOT_WORD_OFFSET(w) is the byte offset of word w. NOTE(review): the integer division drops any remainder, so this presumably relies on CLUSTER_SLOTS being a multiple of 64 - confirm. */ +#define CLUSTER_SLOT_WORDS (CLUSTER_SLOTS / 64) +#define SLOT_WORD_OFFSET(w) ((w) << 3) #define RCVBUF_INIT_LEN 1024 #define RCVBUF_MIN_READ_LEN 14 @@ -3596,6 +3599,16 @@ int clusterIsValidPacket(clusterLink *link) { return 1; } +/* Return the slot number of the least-significant set bit of *slot_word and clear that bit, so repeated calls walk every set bit of the word. When iterating through the slot bitmap, every 64 bits are grouped as + * a word to speed up iteration; slot_word_index is the index of this word in the bitmap, so the result is slot_word_index * 64 + bit. The caller must pass a non-zero *slot_word, since __builtin_ctzll(0) is undefined (both call sites loop on while (word)). NOTE(review): the callers fill the word with memcpy from the byte bitmap, so this mapping presumably assumes a little-endian host - confirm. */ +static inline int clusterExtractSlotFromWord(uint64_t *slot_word, size_t slot_word_index) { + /* Index (0-63) of the least-significant set bit in this 64-bit word; undefined if the word is zero */ + const unsigned bit = (unsigned)__builtin_ctzll(*slot_word); + const int slot = (int)((slot_word_index << 6) | bit); + *slot_word &= *slot_word - 1; /* x & (x - 1) clears the lowest set bit */ + return slot; +} + /* When this function is called, there is a packet to process starting * at link->rcvbuf. Releasing the buffer is up to the caller, so this * function should just handle the higher level stuff of processing the @@ -4105,20 +4118,27 @@ int clusterProcessPacket(clusterLink *link) { * new configuration, so other nodes that have an updated table must * do it. In this way A will stop to act as a primary (or can try to * failover if there are the conditions to win the election). 
*/ - for (int j = 0; j < CLUSTER_SLOTS; j++) { - if (bitmapTestBit(hdr->myslots, j)) { - if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue; - if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) { + bool found_new_owner = false; + for (size_t w = 0; w < CLUSTER_SLOT_WORDS && !found_new_owner; w++) { + uint64_t word; + memcpy(&word, hdr->myslots + SLOT_WORD_OFFSET(w), sizeof(word)); + while (word) { + const int slot = clusterExtractSlotFromWord(&word, w); + + clusterNode *slot_owner = server.cluster->slots[slot]; + if (slot_owner == sender || isSlotUnclaimed(slot)) continue; + if (slot_owner->configEpoch > sender_claimed_config_epoch) { serverLog(LL_VERBOSE, "Node %.40s (%s) has old slots configuration, sending " "an UPDATE message about %.40s (%s)", sender->name, sender->human_nodename, - server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename); - clusterSendUpdate(sender->link, server.cluster->slots[j]); + slot_owner->name, slot_owner->human_nodename); + clusterSendUpdate(sender->link, slot_owner); /* TODO: instead of exiting the loop send every other * UPDATE packet for other nodes that are the new owner * of sender's slots. */ + found_new_owner = true; break; } } @@ -5003,7 +5023,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { uint64_t requestConfigEpoch = ntohu64(request->configEpoch); unsigned char *claimed_slots = request->myslots; int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK; - int j; + int slot; + clusterNode *slot_owner; /* IF we are not a primary serving at least 1 slot, we don't have the * right to vote, as the cluster size is the number @@ -5049,30 +5070,35 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { /* The replica requesting the vote must have a configEpoch for the claimed * slots that is >= the one of the primaries currently serving the same * slots in the current configuration. 
*/ - for (j = 0; j < CLUSTER_SLOTS; j++) { - if (bitmapTestBit(claimed_slots, j) == 0) continue; - if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) { - continue; + for (size_t w = 0; w < CLUSTER_SLOT_WORDS; w++) { + uint64_t word; + memcpy(&word, claimed_slots + SLOT_WORD_OFFSET(w), sizeof(word)); + while (word) { + slot = clusterExtractSlotFromWord(&word, w); + + if (isSlotUnclaimed(slot) || server.cluster->slots[slot]->configEpoch <= requestConfigEpoch) { + continue; + } + slot_owner = server.cluster->slots[slot]; + /* If we reached this point we found a slot that in our current slots + * is served by a primary with a greater configEpoch than the one claimed + * by the replica requesting our vote. Refuse to vote for this replica. */ + serverLog(LL_WARNING, + "Failover auth denied to %.40s (%s): " + "slot %d epoch (%llu) > reqConfigEpoch (%llu)", + node->name, node->human_nodename, slot, (unsigned long long)slot_owner->configEpoch, + (unsigned long long)requestConfigEpoch); + + /* Send an UPDATE message to the replica. After receiving the UPDATE message, + * the replica will update the slots config so that it can initiate a failover + * again later. Otherwise the replica will never get votes if the primary is down. */ + serverLog(LL_VERBOSE, + "Node %.40s (%s) has old slots configuration, sending " + "an UPDATE message about %.40s (%s)", + node->name, node->human_nodename, slot_owner->name, slot_owner->human_nodename); + clusterSendUpdate(node->link, slot_owner); + return; } - /* If we reached this point we found a slot that in our current slots - * is served by a primary with a greater configEpoch than the one claimed - * by the replica requesting our vote. Refuse to vote for this replica. 
*/ - serverLog(LL_WARNING, - "Failover auth denied to %.40s (%s): " - "slot %d epoch (%llu) > reqConfigEpoch (%llu)", - node->name, node->human_nodename, j, (unsigned long long)server.cluster->slots[j]->configEpoch, - (unsigned long long)requestConfigEpoch); - - /* Send an UPDATE message to the replica. After receiving the UPDATE message, - * the replica will update the slots config so that it can initiate a failover - * again later. Otherwise the replica will never get votes if the primary is down. */ - serverLog(LL_VERBOSE, - "Node %.40s (%s) has old slots configuration, sending " - "an UPDATE message about %.40s (%s)", - node->name, node->human_nodename, - server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename); - clusterSendUpdate(node->link, server.cluster->slots[j]); - return; } /* We can vote for this replica. */