Skip to content

Commit

Permalink
Improve delay logic for the same epoch election
Browse files Browse the repository at this point in the history
Signed-off-by: Seungmin Lee <[email protected]>
  • Loading branch information
Seungmin Lee committed Feb 7, 2025
1 parent fc55142 commit 0f5f0be
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3260,6 +3260,7 @@ int clusterProcessPacket(clusterLink *link) {
sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
if (sender_claimed_current_epoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = sender_claimed_current_epoch;

/* Update the sender configEpoch if it is a primary publishing a newer one. */
if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
sender->configEpoch = sender_claimed_config_epoch;
Expand Down Expand Up @@ -4675,9 +4676,12 @@ int clusterGetFailedPrimaryRank(void) {
if (!nodeFailed(node) || !clusterNodeIsVotingPrimary(node) || node->num_replicas == 0) continue;

/* If cluster-replica-validity-factor is enabled, skip the invalid nodes. */
if (server.cluster_replica_validity_factor) {
if ((now - node->fail_time) > (server.cluster_node_timeout * server.cluster_replica_validity_factor))
if (nodeFailed(node) && server.cluster_replica_validity_factor) {
if ((now - node->fail_time) > (server.cluster_node_timeout * server.cluster_replica_validity_factor)) {
serverLog(LL_DEBUG, "Skip failed primary rank since validity factor is enabled. node failed time: %llu",
(unsigned long long)node->fail_time);
continue;
}
}

if (memcmp(node->shard_id, myself->shard_id, CLUSTER_NAMELEN) < 0) rank++;
Expand Down Expand Up @@ -4860,7 +4864,7 @@ void clusterHandleReplicaFailover(void) {
if (auth_age > auth_retry_time) {
server.cluster->failover_auth_time = now +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
random() % 100; /* Random delay between 0 and 100 milliseconds. */
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetReplicaRank();
Expand All @@ -4869,10 +4873,10 @@ void clusterHandleReplicaFailover(void) {
* less updated replication offset, are penalized. */
server.cluster->failover_auth_time += server.cluster->failover_auth_rank * 1000;
/* We add another delay that is proportional to the failed primary rank.
* Specifically 0.5 second * rank. This way those failed primaries will be
* Specifically (1 second + random % 100) * rank. This way those failed primaries will be
* elected in rank to avoid the vote conflicts. */
server.cluster->failover_failed_primary_rank = clusterGetFailedPrimaryRank();
server.cluster->failover_auth_time += server.cluster->failover_failed_primary_rank * 500;
server.cluster->failover_auth_time += server.cluster->failover_failed_primary_rank * (1000 + random() % 100);
/* However if this is a manual failover, no delay is needed. */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = now;
Expand Down Expand Up @@ -4917,7 +4921,7 @@ void clusterHandleReplicaFailover(void) {

int new_failed_primary_rank = clusterGetFailedPrimaryRank();
if (new_failed_primary_rank != server.cluster->failover_failed_primary_rank) {
long long added_delay = (new_failed_primary_rank - server.cluster->failover_failed_primary_rank) * 500;
long long added_delay = (new_failed_primary_rank - server.cluster->failover_failed_primary_rank) * (1000 + random() % 100);
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_failed_primary_rank = new_failed_primary_rank;
serverLog(LL_NOTICE, "Failed primary rank updated to #%d, added %lld milliseconds of delay.",
Expand All @@ -4941,8 +4945,8 @@ void clusterHandleReplicaFailover(void) {
if (server.cluster->failover_auth_sent == 0) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_NOTICE, "Starting a failover election for epoch %llu, node config epoch is %llu",
(unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself));
serverLog(LL_NOTICE, "Starting a failover election for epoch %llu, node config epoch is %llu, failover primary rank is %llu",
(unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself), (unsigned long long)server.cluster->failover_failed_primary_rank);
clusterRequestFailoverAuth();
server.cluster->failover_auth_sent = 1;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
Expand Down

0 comments on commit 0f5f0be

Please sign in to comment.