Skip to content

Commit

Permalink
standalone REDIRECT: Fix scripting and further MULTI/EXEC scenarios
Browse files Browse the repository at this point in the history
In cluster mode, the necessity of a "MOVED" response is mainly determined based
on the keys of a command whereas in standalone redirect mode, the necessity of a
"REDIRECT" response is determined based on the command's flags.

It turns out that looking at keys is necessary. Moreover, the currently
used command flags do not encompass all situations in which a REDIRECT is
necessary:

- WATCH command (redirect in READWRITE mode is necessary)
- transactions with no keys must not be redirected
- scripting scenarios: scripts (with keys) must be redirected in some situations
  (issue valkey-io#868).

The new logic is a heavily "distilled" version of `getNodeByQuery` used in
cluster mode. As we don't need to look at slots and their consistency, the
information necessary is the flags of the command and whether it accesses at
least one key.

Signed-off-by: Simon Baatz <[email protected]>
  • Loading branch information
gmbnomis committed Feb 25, 2025
1 parent 221ff44 commit 727f400
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 34 deletions.
110 changes: 79 additions & 31 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4013,6 +4013,52 @@ uint64_t getCommandFlags(client *c) {
return cmd_flags;
}

/* Check if the command will access at least one key (or has a "key-like"
argument like a sharded pub/sub command) */
int
commandHasKey(client *c, struct serverCommand *cmd, robj **argv, int argc) {
multiState *ms, _ms;
multiCmd mc;
int i;

if (cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error later. */
if (!c->flag.multi) return 0;
ms = c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}

/* Check if the command accesses a key */
for (i = 0; i < ms->count; i++) {
struct serverCommand *mcmd;
robj **margv;
int margc, numkeys;

mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;

getKeysResult result;
initGetKeysResult(&result);
numkeys = getKeysFromCommand(mcmd, margv, margc, &result);
getKeysFreeResult(&result);
if (numkeys > 0) {
return 1;
}
}
return 0;
}

/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
Expand Down Expand Up @@ -4103,6 +4149,7 @@ int processCommand(client *c) {
int is_denyloading_command = (combined_inv_flags & CMD_LOADING);
int is_may_replicate_command = (combined_flags & (CMD_WRITE | CMD_MAY_REPLICATE));
int is_deny_async_loading_command = (combined_flags & CMD_NO_ASYNC_LOADING);
int is_pubsub_command = (combined_flags & CMD_PUBSUB);

const int obey_client = mustObeyClient(c);

Expand Down Expand Up @@ -4154,40 +4201,41 @@ int processCommand(client *c) {
}
}

if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !obey_client &&
(is_write_command || (is_read_command && !c->flag.readonly))) {
if (server.failover_state == FAILOVER_IN_PROGRESS) {
/* During the FAILOVER process, when conditions are met (such as
* when the force time is reached or the primary and replica offsets
* are consistent), the primary actively becomes the replica and
* transitions to the FAILOVER_IN_PROGRESS state.
*
* After the primary becomes the replica, and after handshaking
* and other operations, it will eventually send the PSYNC FAILOVER
* command to the replica, then the replica will become the primary.
* This means that the upgrade of the replica to the primary is an
* asynchronous operation, which implies that during the
* FAILOVER_IN_PROGRESS state, there may be a period of time where
* both nodes are replicas.
*
* In this scenario, if a -REDIRECT is returned, the request will be
* redirected to the replica and then redirected back, causing back
* and forth redirection. To avoid this situation, during the
* FAILOVER_IN_PROGRESS state, we temporarily suspend the clients
* that need to be redirected until the replica truly becomes the primary,
* and then resume the execution. */
blockPostponeClient(c);
} else {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !obey_client) {
if (commandHasKey(c, c->cmd, c->argv, c->argc) && (is_write_command || (!c->flag.readonly && !is_pubsub_command))) {
if (server.failover_state == FAILOVER_IN_PROGRESS) {
/* During the FAILOVER process, when conditions are met (such as
* when the force time is reached or the primary and replica offsets
* are consistent), the primary actively becomes the replica and
* transitions to the FAILOVER_IN_PROGRESS state.
*
* After the primary becomes the replica, and after handshaking
* and other operations, it will eventually send the PSYNC FAILOVER
* command to the replica, then the replica will become the primary.
* This means that the upgrade of the replica to the primary is an
* asynchronous operation, which implies that during the
* FAILOVER_IN_PROGRESS state, there may be a period of time where
* both nodes are replicas.
*
* In this scenario, if a -REDIRECT is returned, the request will be
* redirected to the replica and then redirected back, causing back
* and forth redirection. To avoid this situation, during the
* FAILOVER_IN_PROGRESS state, we temporarily suspend the clients
* that need to be redirected until the replica truly becomes the primary,
* and then resume the execution. */
blockPostponeClient(c);
} else {
flagTransaction(c);
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
c->duration = 0;
c->cmd->rejected_calls++;
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
}
c->duration = 0;
c->cmd->rejected_calls++;
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
return C_OK;
}
return C_OK;
}

/* Disconnect some clients if total clients memory is too high. We do this
Expand Down
89 changes: 86 additions & 3 deletions tests/integration/replica-redirect.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,25 @@ start_server {tags {needs:repl external:skip}} {
$rr close
}

test {replica allow read command by default} {
test {replica allows read command by default} {
r get foo
} {}

test {replica reply READONLY error for write command by default} {
assert_error {READONLY*} {r set foo bar}
}

test {replica redirect read and write command after CLIENT CAPA REDIRECT} {
test {replica redirects read and write command after CLIENT CAPA REDIRECT} {
r client capa redirect
assert_error "REDIRECT $primary_host:$primary_port" {r set foo bar}
assert_error "REDIRECT $primary_host:$primary_port" {r get foo}
}

test {replica redirects (read-only) script with keys} {
assert_error "REDIRECT $primary_host:$primary_port" {r eval {return redis.error_reply("ERR script must not be executed")} 1 foo}
assert_error "REDIRECT $primary_host:$primary_port" {r eval_ro {return redis.error_reply("ERR script must not be executed")} 1 foo}
}

test {CLIENT INFO} {
r client info
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N capa=r db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*}
Expand All @@ -54,11 +59,89 @@ start_server {tags {needs:repl external:skip}} {
r ping
} {PONG}

test {replica allow read command in READONLY mode} {

# TODO: This corresponds to the behavior in cluster mode. But this may not be correct
# as pub/sub works differently in standalone mode (messages are _not_ forwarded
# from replica to primary), see issue #1780.
test {replica allows pubsub commands} {
r publish foo bar
r spublish foo bar
}

test {replica allows script accessing no key} {
assert_equal "PONG" [r eval {return redis.call("ping")} 0]
}

test {replica allows read-only script accessing no key} {
assert_equal "PONG" [r eval_ro {return redis.call("ping")} 0]
}

test {replica allows MULTI that accesses no key} {
r multi
r ping
r eval {return redis.call("ping")} 0
r eval_ro {return redis.call("ping")} 0
r exec
}

test {replica redirects WATCH} {
assert_error "REDIRECT*" {r watch foo}
}

test {replica allows read command in READONLY mode} {
r readonly
r get foo
} {}


test {replica allows read command in MULTI in READONLY mode} {
r readonly
r multi
r get foo
r exec
}

test {replica allows WATCH in READONLY mode} {
r readonly
r watch foo
r unwatch
}

test {replica allows read-only script in READONLY mode} {
r readonly
r eval_ro {redis.call("get", KEYS[1])} 1 foo
}

test {replica allows read-only script in MULTI in READONLY mode} {
r readonly
r multi
r eval_ro {redis.call("get", KEYS[1])} 1 foo
r exec
}

test {replica allows script that potentially writes but may only read in READONLY mode} {
r readonly
r eval {redis.call("get", KEYS[1])} 1 foo
}

test {replica allows script that potentially writes but may only read in MULTI in READONLY mode} {
r readonly
r multi
r eval {redis.call("get", KEYS[1])} 1 foo
r exec
}

test {replica allows script that potentially writes in READONLY mode and fails if it actually writes} {
r readonly
assert_error "READONLY*" {r eval {redis.call("set", KEYS[1], 'bar')} 1 foo}
}

test {replica allows script accessing no key in READONLY mode} {
r readonly
assert_equal "PONG" [r eval {return redis.call("ping")} 0]
assert_equal "PONG" [r eval_ro {return redis.call("ping")} 0]
}

test {client paused during failover-in-progress} {
pause_process $replica_pid
# replica will never acknowledge this write
Expand Down

0 comments on commit 727f400

Please sign in to comment.