Skip to content

Commit

Permalink
Cherry-pick for 4.3.5 (#2441)
Browse files Browse the repository at this point in the history
* fix is_connected (#2278)

* fix: workaround asyncio bug on connection reset by peer (#2259)

Fixes #2237

* Fix crash: key expire while search (#2270)

* fix expire while search

* sleep

* async_cluster: fix concurrent pipeline (#2280)

- each pipeline should create separate stacks for each node

* async_cluster: fix max_connections/ssl & improve args (#2217)

* async_cluster: fix max_connections/ssl & improve args

- set proper connection_class if ssl = True
- pass max_connections/connection_class to ClusterNode
- recreate startup_nodes to properly initialize
- pass parser_class to Connection instead of changing it in on_connect
- only pass redis_connect_func if read_from_replicas = True
- add connection_error_retry_attempts parameter
- skip is_connected check in acquire_connection as it is already checked in send_packed_command

BREAKING:
- RedisCluster args except host & port are kw-only now
- RedisCluster will no longer accept unknown arguments
- RedisCluster will no longer accept url as an argument. Use RedisCluster.from_url
- RedisCluster.require_full_coverage defaults to True
- ClusterNode args except host, port, & server_type are kw-only now

* async_cluster: remove kw-only requirement from client

Co-authored-by: dvora-h <[email protected]>

* More cherry picks to support latest module release

* Fix KeyError in async cluster - initialize before execute multi key commands (#2439)

* Fix KeyError in async cluster

* link to issue

* typo

* fixing lint

* fix json test

Co-authored-by: Mehdi ABAAKOUK <[email protected]>
Co-authored-by: Utkarsh Gupta <[email protected]>
Co-authored-by: Chayim I. Kirshen <[email protected]>
  • Loading branch information
4 people authored Nov 21, 2022
1 parent e6cd4fd commit 7424e65
Show file tree
Hide file tree
Showing 32 changed files with 2,821 additions and 1,133 deletions.
395 changes: 205 additions & 190 deletions redis/asyncio/cluster.py

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ def __del__(self):

@property
def is_connected(self):
return self._reader and self._writer
return self._reader is not None and self._writer is not None

def register_connect_callback(self, callback):
self._connect_callbacks.append(weakref.WeakMethod(callback))
Expand Down Expand Up @@ -767,7 +767,16 @@ async def _connect(self):
def _error_message(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message"
if len(exception.args) == 1:
if not exception.args:
# asyncio has a bug where on Connection reset by peer, the
# exception is not instanciated, so args is empty. This is the
# workaround.
# See: https://github.com/redis/redis-py/issues/2237
# See: https://github.com/python/cpython/issues/94061
return (
f"Error connecting to {self.host}:{self.port}. Connection reset by peer"
)
elif len(exception.args) == 1:
return f"Error connecting to {self.host}:{self.port}. {exception.args[0]}."
else:
return (
Expand Down
4 changes: 2 additions & 2 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, Tuple
from typing import Any, Callable, Dict, Tuple, Union

from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
Expand Down Expand Up @@ -42,7 +42,7 @@
log = logging.getLogger(__name__)


def get_node_name(host: str, port: int) -> str:
def get_node_name(host: str, port: Union[str, int]) -> str:
return f"{host}:{port}"


Expand Down
9 changes: 7 additions & 2 deletions redis/commands/bf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,16 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
TDIGEST_CDF: float,
TDIGEST_QUANTILE: float,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}

self.client = client
Expand Down
103 changes: 81 additions & 22 deletions redis/commands/bf/commands.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from redis.client import NEVER_DECODE
from redis.exceptions import ModuleError
from redis.utils import HIREDIS_AVAILABLE
from redis.utils import HIREDIS_AVAILABLE, deprecated_function

BF_RESERVE = "BF.RESERVE"
BF_ADD = "BF.ADD"
Expand Down Expand Up @@ -49,6 +49,11 @@
TDIGEST_MIN = "TDIGEST.MIN"
TDIGEST_MAX = "TDIGEST.MAX"
TDIGEST_INFO = "TDIGEST.INFO"
TDIGEST_TRIMMED_MEAN = "TDIGEST.TRIMMED_MEAN"
TDIGEST_RANK = "TDIGEST.RANK"
TDIGEST_REVRANK = "TDIGEST.REVRANK"
TDIGEST_BYRANK = "TDIGEST.BYRANK"
TDIGEST_BYREVRANK = "TDIGEST.BYREVRANK"


class BFCommands:
Expand All @@ -67,6 +72,8 @@ def create(self, key, errorRate, capacity, expansion=None, noScale=None):
self.append_no_scale(params, noScale)
return self.execute_command(BF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add to a Bloom Filter `key` an `item`.
Expand Down Expand Up @@ -176,6 +183,8 @@ def create(
self.append_max_iterations(params, max_iterations)
return self.execute_command(CF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add an `item` to a Cuckoo Filter `key`.
Expand Down Expand Up @@ -316,6 +325,7 @@ def query(self, key, *items):
""" # noqa
return self.execute_command(TOPK_QUERY, key, *items)

@deprecated_function(version="4.4.0", reason="deprecated since redisbloom 2.4.0")
def count(self, key, *items):
"""
Return count for one `item` or more from `key`.
Expand Down Expand Up @@ -344,12 +354,12 @@ def info(self, key):


class TDigestCommands:
def create(self, key, compression):
def create(self, key, compression=100):
"""
Allocate the memory and initialize the t-digest.
For more information see `TDIGEST.CREATE <https://redis.io/commands/tdigest.create>`_.
""" # noqa
return self.execute_command(TDIGEST_CREATE, key, compression)
return self.execute_command(TDIGEST_CREATE, key, "COMPRESSION", compression)

def reset(self, key):
"""
Expand All @@ -358,26 +368,30 @@ def reset(self, key):
""" # noqa
return self.execute_command(TDIGEST_RESET, key)

def add(self, key, values, weights):
def add(self, key, values):
"""
Add one or more samples (value with weight) to a sketch `key`.
Both `values` and `weights` are lists.
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
Adds one or more observations to a t-digest sketch `key`.
Example:
>>> tdigestadd('A', [1500.0], [1.0])
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
""" # noqa
params = [key]
self.append_values_and_weights(params, values, weights)
return self.execute_command(TDIGEST_ADD, *params)
return self.execute_command(TDIGEST_ADD, key, *values)

def merge(self, toKey, fromKey):
def merge(self, destination_key, num_keys, *keys, compression=None, override=False):
"""
Merge all of the values from 'fromKey' to 'toKey' sketch.
Merges all of the values from `keys` to 'destination-key' sketch.
It is mandatory to provide the `num_keys` before passing the input keys and
the other (optional) arguments.
If `destination_key` already exists its values are merged with the input keys.
If you wish to override the destination key contents use the `OVERRIDE` parameter.
For more information see `TDIGEST.MERGE <https://redis.io/commands/tdigest.merge>`_.
""" # noqa
return self.execute_command(TDIGEST_MERGE, toKey, fromKey)
params = [destination_key, num_keys, *keys]
if compression is not None:
params.extend(["COMPRESSION", compression])
if override:
params.append("OVERRIDE")
return self.execute_command(TDIGEST_MERGE, *params)

def min(self, key):
"""
Expand All @@ -393,20 +407,21 @@ def max(self, key):
""" # noqa
return self.execute_command(TDIGEST_MAX, key)

def quantile(self, key, quantile):
def quantile(self, key, quantile, *quantiles):
"""
Return double value estimate of the cutoff such that a specified fraction of the data
added to this TDigest would be less than or equal to the cutoff.
Returns estimates of one or more cutoffs such that a specified fraction of the
observations added to this t-digest would be less than or equal to each of the
specified cutoffs. (Multiple quantiles can be returned with one call)
For more information see `TDIGEST.QUANTILE <https://redis.io/commands/tdigest.quantile>`_.
""" # noqa
return self.execute_command(TDIGEST_QUANTILE, key, quantile)
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)

def cdf(self, key, value):
def cdf(self, key, value, *values):
"""
Return double fraction of all points added which are <= value.
For more information see `TDIGEST.CDF <https://redis.io/commands/tdigest.cdf>`_.
""" # noqa
return self.execute_command(TDIGEST_CDF, key, value)
return self.execute_command(TDIGEST_CDF, key, value, *values)

def info(self, key):
"""
Expand All @@ -416,6 +431,50 @@ def info(self, key):
""" # noqa
return self.execute_command(TDIGEST_INFO, key)

def trimmed_mean(self, key, low_cut_quantile, high_cut_quantile):
"""
Return mean value from the sketch, excluding observation values outside
the low and high cutoff quantiles.
For more information see `TDIGEST.TRIMMED_MEAN <https://redis.io/commands/tdigest.trimmed_mean>`_.
""" # noqa
return self.execute_command(
TDIGEST_TRIMMED_MEAN, key, low_cut_quantile, high_cut_quantile
)

def rank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are smaller than value + half the number of observations that are equal to value).
For more information see `TDIGEST.RANK <https://redis.io/commands/tdigest.rank>`_.
""" # noqa
return self.execute_command(TDIGEST_RANK, key, value, *values)

def revrank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are larger than value + half the number of observations that are equal to value).
For more information see `TDIGEST.REVRANK <https://redis.io/commands/tdigest.revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_REVRANK, key, value, *values)

def byrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given rank.
For more information see `TDIGEST.BY_RANK <https://redis.io/commands/tdigest.by_rank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYRANK, key, rank, *ranks)

def byrevrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given reverse rank.
For more information see `TDIGEST.BY_REVRANK <https://redis.io/commands/tdigest.by_revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYREVRANK, key, rank, *ranks)


class CMSCommands:
"""Count-Min Sketch Commands"""
Expand Down
22 changes: 12 additions & 10 deletions redis/commands/bf/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ def __init__(self, args):
class TDigestInfo(object):
compression = None
capacity = None
mergedNodes = None
unmergedNodes = None
mergedWeight = None
unmergedWeight = None
totalCompressions = None
merged_nodes = None
unmerged_nodes = None
merged_weight = None
unmerged_weight = None
total_compressions = None
memory_usage = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.compression = response["Compression"]
self.capacity = response["Capacity"]
self.mergedNodes = response["Merged nodes"]
self.unmergedNodes = response["Unmerged nodes"]
self.mergedWeight = response["Merged weight"]
self.unmergedWeight = response["Unmerged weight"]
self.totalCompressions = response["Total compressions"]
self.merged_nodes = response["Merged nodes"]
self.unmerged_nodes = response["Unmerged nodes"]
self.merged_weight = response["Merged weight"]
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.memory_usage = response["Memory usage"]
19 changes: 19 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,25 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
# Sum up the reply from each command
return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

async def _execute_pipeline_by_slot(
self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
) -> List[Any]:
if self._initialize:
await self.initialize()
read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
pipe = self.pipeline()
[
pipe.execute_command(
command,
*slot_args,
target_nodes=[
self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
],
)
for slot, slot_args in slots_to_args.items()
]
return await pipe.execute()


class ClusterManagementCommands(ManagementCommands):
"""
Expand Down
Loading

0 comments on commit 7424e65

Please sign in to comment.