From dfc343dcdfd763cd7dee42943ff8a14f72ac94ad Mon Sep 17 00:00:00 2001
From: Bar Shaul
Date: Thu, 16 Jun 2022 10:18:46 +0300
Subject: [PATCH 1/5] Reuse the old nodes' connections when a cluster topology
 refresh is performed

---
 redis/cluster.py      | 34 ++++++++++++++++++++++++----------
 tests/test_cluster.py | 40 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/redis/cluster.py b/redis/cluster.py
index 8e4c654c6b..d09ed60fcc 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1134,6 +1134,7 @@ def _execute_command(self, target_node, *args, **kwargs):
                 else:
                     # Hard force of reinitialize of the node/slots setup
                     # and try again with the new setup
+                    target_node.redis_connection = None
                     self.nodes_manager.initialize()
                     raise
             except MovedError as e:
@@ -1443,6 +1444,21 @@ def create_redis_node(self, host, port, **kwargs):
             r = Redis(host=host, port=port, **kwargs)
         return r
 
+    def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
+        node_name = get_node_name(host, port)
+        # check if we already have this node in the tmp_nodes_cache
+        target_node = tmp_nodes_cache.get(node_name)
+        if target_node is None:
+            # before creating a new cluster node, check if the cluster node already
+            # exists in the current nodes cache and has a valid connection so we can
+            # reuse it
+            target_node = self.nodes_cache.get(node_name)
+            if target_node is None or target_node.redis_connection is None:
+                # create a new cluster node for this cluster
+                target_node = ClusterNode(host, port, role)
+
+        return target_node
+
     def initialize(self):
         """
         Initializes the nodes cache, slots cache and redis connections.
@@ -1521,14 +1537,14 @@
 
         for slot in cluster_slots:
             primary_node = slot[2]
-            host = primary_node[0]
+            host = str_if_bytes(primary_node[0])
             if host == "":
                 host = startup_node.host
             port = int(primary_node[1])
 
-            target_node = tmp_nodes_cache.get(get_node_name(host, port))
-            if target_node is None:
-                target_node = ClusterNode(host, port, PRIMARY)
+            target_node = self._get_or_create_cluster_node(
+                host, port, PRIMARY, tmp_nodes_cache
+            )
             # add this node to the nodes cache
             tmp_nodes_cache[target_node.name] = target_node
 
@@ -1539,14 +1555,12 @@
                     replica_nodes = [slot[j] for j in range(3, len(slot))]
 
                     for replica_node in replica_nodes:
-                        host = replica_node[0]
+                        host = str_if_bytes(replica_node[0])
                         port = replica_node[1]
 
-                        target_replica_node = tmp_nodes_cache.get(
-                            get_node_name(host, port)
+                        target_replica_node = self._get_or_create_cluster_node(
+                            host, port, REPLICA, tmp_nodes_cache
                         )
-                        if target_replica_node is None:
-                            target_replica_node = ClusterNode(host, port, REPLICA)
                         tmp_slots[i].append(target_replica_node)
                         # add this node to the nodes cache
                         tmp_nodes_cache[
@@ -1598,7 +1612,7 @@ def initialize(self):
         # Set the default node
         self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
         # Populate the startup nodes with all discovered nodes
-        self.populate_startup_nodes(self.nodes_cache.values())
+        self.startup_nodes = tmp_nodes_cache
         # If initialize was called after a MovedError, clear it
         self._moved_exception = None
 
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index d1568ef4de..438ef73a0e 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -29,6 +29,7 @@
     RedisClusterException,
     RedisError,
     ResponseError,
+    TimeoutError,
 )
 from redis.utils import str_if_bytes
 from tests.test_pubsub import wait_for_message
@@ -651,6 +652,45 @@ def test_not_require_full_coverage_cluster_down_error(self, r):
             else:
                 raise e
 
+    def test_timeout_error_topology_refresh_reuse_connections(self, r):
+        """
+        By mocking TIMEOUT errors, we'll force the cluster topology to be
+        reinitialized, and then ensure that only the impacted connection is replaced.
+        """
+        node = r.get_node_from_key("key")
+        r.set("key", "value")
+        node_conn_origin = {}
+        for n in r.get_nodes():
+            node_conn_origin[n.name] = n.redis_connection
+        real_func = r.get_redis_connection(node).parse_response
+
+        class counter:
+            def __init__(self, val=0):
+                self.val = int(val)
+
+        count = counter(0)
+        with patch.object(Redis, "parse_response") as parse_response:
+
+            def moved_redirect_effect(connection, *args, **options):
+                # raise a timeout 5 times so we'll need to reinitialize the topology
+                if count.val >= 5:
+                    parse_response.side_effect = real_func
+                count.val += 1
+                raise TimeoutError()
+
+            parse_response.side_effect = moved_redirect_effect
+            assert r.get("key") == b"value"
+            for node_name, conn in node_conn_origin.items():
+                if node_name == node.name:
+                    # The old redis connection of the timed out node should have been
+                    # deleted and replaced
+                    assert conn != r.get_redis_connection(node)
+                else:
+                    # other nodes' redis connections should have been reused during
+                    # the topology refresh
+                    cur_node = r.get_node(node_name=node_name)
+                    assert conn == r.get_redis_connection(cur_node)
+
 
 @pytest.mark.onlycluster
 class TestClusterRedisCommands:

From 2026f66a6a2e58732c82a9624cc3c3b54d6623d1 Mon Sep 17 00:00:00 2001
From: Bar Shaul
Date: Thu, 16 Jun 2022 10:58:20 +0300
Subject: [PATCH 2/5] Fixed RedisCluster to immediately raise AuthenticationError

---
 redis/cluster.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/redis/cluster.py b/redis/cluster.py
index d09ed60fcc..1737ec75b1 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -14,6 +14,7 @@
 from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
 from redis.exceptions import (
     AskError,
+    AuthenticationError,
     BusyLoadingError,
     ClusterCrossSlotError,
     ClusterDownError,
@@ -1113,7 +1114,7 @@ def _execute_command(self, target_node, *args, **kwargs):
                     )
                 return response
 
-            except (RedisClusterException, BusyLoadingError) as e:
+            except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
                 log.exception(type(e))
                 raise
             except (ConnectionError, TimeoutError) as e:

From e7e06139c8edae90bc5a05a81eebf5820fba954f Mon Sep 17 00:00:00 2001
From: Bar Shaul
Date: Thu, 16 Jun 2022 11:02:50 +0300
Subject: [PATCH 3/5] Updated CHANGES

---
 CHANGES | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGES b/CHANGES
index b7e238ebb3..367fd9d29d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -10,6 +10,8 @@
     * Fix broken connection writer lock-up for asyncio (#2065)
     * Fix auth bug when provided with no username (#2086)
     * Fix missing ClusterPipeline._lock (#2189)
+    * Fix reusing the old nodes' connections when a cluster topology refresh is performed
+    * Fix RedisCluster to immediately raise AuthenticationError without a retry
 
 * 4.1.3 (Feb 8, 2022)
     * Fix flushdb and flushall (#1926)

From 70d249755db6d7c9d7d5f92a000bd2c312fc9167 Mon Sep 17 00:00:00 2001
From: Bar Shaul
Date: Thu, 16 Jun 2022 11:24:56 +0300
Subject: [PATCH 4/5] Fixed cluster async bgsave test to ignore "bgsave already
 in progress" error

---
 tests/test_asyncio/test_cluster.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index 0c676cb2d7..6493bd6785 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -1057,9 +1057,14 @@ async def test_readwrite(self) -> None:
 
     @skip_if_redis_enterprise()
     async def test_bgsave(self, r: RedisCluster) -> None:
-        assert await r.bgsave()
-        await asyncio.sleep(0.3)
-        assert await r.bgsave(True)
+        try:
+            assert await r.bgsave()
+            await asyncio.sleep(0.3)
+            assert await r.bgsave(True)
+        except ResponseError as e:
+            if "Background save already in progress" not in e.__str__():
+                raise
+
 
     async def test_info(self, r: RedisCluster) -> None:
         # Map keys to same slot

From 1bac8433f565c3a6c5980fbe56d7e422592e3f17 Mon Sep 17 00:00:00 2001
From: Bar Shaul
Date: Thu, 16 Jun 2022 11:31:24 +0300
Subject: [PATCH 5/5] Fixed linters

---
 tests/test_asyncio/test_cluster.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index 6493bd6785..a7fe23c6a8 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -1065,7 +1065,6 @@ async def test_bgsave(self, r: RedisCluster) -> None:
             if "Background save already in progress" not in e.__str__():
                 raise
 
-
     async def test_info(self, r: RedisCluster) -> None:
         # Map keys to same slot
         await r.set("x{1}", 1)
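
For reference, below is a minimal, standalone sketch of the connection-reuse rule that PATCH 1/5 introduces in NodesManager._get_or_create_cluster_node: prefer a node already discovered during the current refresh, otherwise reuse the node from the previous topology only if it still holds a live redis_connection, otherwise build a new one. The Node class, get_or_create_node function, and the cache dictionaries are illustrative stand-ins rather than redis-py APIs; the authoritative change is the patched cluster.py above.

# Standalone sketch of the connection-reuse rule from PATCH 1/5.
# Node, get_or_create_node, and the cache dicts are illustrative stand-ins,
# not redis-py APIs.
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Node:
    host: str
    port: int
    role: str
    connection: Optional[object] = None  # stands in for ClusterNode.redis_connection


def get_or_create_node(
    host: str,
    port: int,
    role: str,
    tmp_cache: Dict[str, Node],
    old_cache: Dict[str, Node],
) -> Node:
    name = f"{host}:{port}"
    node = tmp_cache.get(name)
    if node is None:
        # Reuse the node from the previous topology only if it still holds a
        # connection; otherwise create a fresh one, mirroring the patched
        # initialize() path.
        node = old_cache.get(name)
        if node is None or node.connection is None:
            node = Node(host, port, role)
    tmp_cache[name] = node
    return node


if __name__ == "__main__":
    old_cache = {"127.0.0.1:7000": Node("127.0.0.1", 7000, "primary", object())}
    tmp_cache: Dict[str, Node] = {}
    reused = get_or_create_node("127.0.0.1", 7000, "primary", tmp_cache, old_cache)
    created = get_or_create_node("127.0.0.1", 7001, "replica", tmp_cache, old_cache)
    assert reused is old_cache["127.0.0.1:7000"]  # live connection is reused
    assert created.connection is None  # unknown node gets a fresh object

The real helper keys nodes with get_node_name(host, port) and returns ClusterNode instances, but the reuse condition, an existing cache entry whose redis_connection is not None, is the same.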