Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick for 4.3.5 #2441

Merged
merged 9 commits into from
Nov 21, 2022
395 changes: 205 additions & 190 deletions redis/asyncio/cluster.py

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ def __del__(self):

@property
def is_connected(self):
return self._reader and self._writer
return self._reader is not None and self._writer is not None

def register_connect_callback(self, callback):
self._connect_callbacks.append(weakref.WeakMethod(callback))
Expand Down Expand Up @@ -767,7 +767,16 @@ async def _connect(self):
def _error_message(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message"
if len(exception.args) == 1:
if not exception.args:
# asyncio has a bug where on Connection reset by peer, the
# exception is not instanciated, so args is empty. This is the
# workaround.
# See: https://github.com/redis/redis-py/issues/2237
# See: https://github.com/python/cpython/issues/94061
return (
f"Error connecting to {self.host}:{self.port}. Connection reset by peer"
)
elif len(exception.args) == 1:
return f"Error connecting to {self.host}:{self.port}. {exception.args[0]}."
else:
return (
Expand Down
4 changes: 2 additions & 2 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, Tuple
from typing import Any, Callable, Dict, Tuple, Union

from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
Expand Down Expand Up @@ -42,7 +42,7 @@
log = logging.getLogger(__name__)


def get_node_name(host: str, port: int) -> str:
def get_node_name(host: str, port: Union[str, int]) -> str:
return f"{host}:{port}"


Expand Down
9 changes: 7 additions & 2 deletions redis/commands/bf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,16 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
TDIGEST_CDF: float,
TDIGEST_QUANTILE: float,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}

self.client = client
Expand Down
103 changes: 81 additions & 22 deletions redis/commands/bf/commands.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from redis.client import NEVER_DECODE
from redis.exceptions import ModuleError
from redis.utils import HIREDIS_AVAILABLE
from redis.utils import HIREDIS_AVAILABLE, deprecated_function

BF_RESERVE = "BF.RESERVE"
BF_ADD = "BF.ADD"
Expand Down Expand Up @@ -49,6 +49,11 @@
TDIGEST_MIN = "TDIGEST.MIN"
TDIGEST_MAX = "TDIGEST.MAX"
TDIGEST_INFO = "TDIGEST.INFO"
TDIGEST_TRIMMED_MEAN = "TDIGEST.TRIMMED_MEAN"
TDIGEST_RANK = "TDIGEST.RANK"
TDIGEST_REVRANK = "TDIGEST.REVRANK"
TDIGEST_BYRANK = "TDIGEST.BYRANK"
TDIGEST_BYREVRANK = "TDIGEST.BYREVRANK"


class BFCommands:
Expand All @@ -67,6 +72,8 @@ def create(self, key, errorRate, capacity, expansion=None, noScale=None):
self.append_no_scale(params, noScale)
return self.execute_command(BF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add to a Bloom Filter `key` an `item`.
Expand Down Expand Up @@ -176,6 +183,8 @@ def create(
self.append_max_iterations(params, max_iterations)
return self.execute_command(CF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add an `item` to a Cuckoo Filter `key`.
Expand Down Expand Up @@ -316,6 +325,7 @@ def query(self, key, *items):
""" # noqa
return self.execute_command(TOPK_QUERY, key, *items)

@deprecated_function(version="4.4.0", reason="deprecated since redisbloom 2.4.0")
def count(self, key, *items):
"""
Return count for one `item` or more from `key`.
Expand Down Expand Up @@ -344,12 +354,12 @@ def info(self, key):


class TDigestCommands:
def create(self, key, compression):
def create(self, key, compression=100):
"""
Allocate the memory and initialize the t-digest.
For more information see `TDIGEST.CREATE <https://redis.io/commands/tdigest.create>`_.
""" # noqa
return self.execute_command(TDIGEST_CREATE, key, compression)
return self.execute_command(TDIGEST_CREATE, key, "COMPRESSION", compression)

def reset(self, key):
"""
Expand All @@ -358,26 +368,30 @@ def reset(self, key):
""" # noqa
return self.execute_command(TDIGEST_RESET, key)

def add(self, key, values, weights):
def add(self, key, values):
"""
Add one or more samples (value with weight) to a sketch `key`.
Both `values` and `weights` are lists.
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
Adds one or more observations to a t-digest sketch `key`.
Example:
>>> tdigestadd('A', [1500.0], [1.0])
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
""" # noqa
params = [key]
self.append_values_and_weights(params, values, weights)
return self.execute_command(TDIGEST_ADD, *params)
return self.execute_command(TDIGEST_ADD, key, *values)

def merge(self, toKey, fromKey):
def merge(self, destination_key, num_keys, *keys, compression=None, override=False):
"""
Merge all of the values from 'fromKey' to 'toKey' sketch.
Merges all of the values from `keys` to 'destination-key' sketch.
It is mandatory to provide the `num_keys` before passing the input keys and
the other (optional) arguments.
If `destination_key` already exists its values are merged with the input keys.
If you wish to override the destination key contents use the `OVERRIDE` parameter.
For more information see `TDIGEST.MERGE <https://redis.io/commands/tdigest.merge>`_.
""" # noqa
return self.execute_command(TDIGEST_MERGE, toKey, fromKey)
params = [destination_key, num_keys, *keys]
if compression is not None:
params.extend(["COMPRESSION", compression])
if override:
params.append("OVERRIDE")
return self.execute_command(TDIGEST_MERGE, *params)

def min(self, key):
"""
Expand All @@ -393,20 +407,21 @@ def max(self, key):
""" # noqa
return self.execute_command(TDIGEST_MAX, key)

def quantile(self, key, quantile):
def quantile(self, key, quantile, *quantiles):
"""
Return double value estimate of the cutoff such that a specified fraction of the data
added to this TDigest would be less than or equal to the cutoff.
Returns estimates of one or more cutoffs such that a specified fraction of the
observations added to this t-digest would be less than or equal to each of the
specified cutoffs. (Multiple quantiles can be returned with one call)
For more information see `TDIGEST.QUANTILE <https://redis.io/commands/tdigest.quantile>`_.
""" # noqa
return self.execute_command(TDIGEST_QUANTILE, key, quantile)
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)

def cdf(self, key, value):
def cdf(self, key, value, *values):
"""
Return double fraction of all points added which are <= value.
For more information see `TDIGEST.CDF <https://redis.io/commands/tdigest.cdf>`_.
""" # noqa
return self.execute_command(TDIGEST_CDF, key, value)
return self.execute_command(TDIGEST_CDF, key, value, *values)

def info(self, key):
"""
Expand All @@ -416,6 +431,50 @@ def info(self, key):
""" # noqa
return self.execute_command(TDIGEST_INFO, key)

def trimmed_mean(self, key, low_cut_quantile, high_cut_quantile):
"""
Return mean value from the sketch, excluding observation values outside
the low and high cutoff quantiles.
For more information see `TDIGEST.TRIMMED_MEAN <https://redis.io/commands/tdigest.trimmed_mean>`_.
""" # noqa
return self.execute_command(
TDIGEST_TRIMMED_MEAN, key, low_cut_quantile, high_cut_quantile
)

def rank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are smaller than value + half the number of observations that are equal to value).
For more information see `TDIGEST.RANK <https://redis.io/commands/tdigest.rank>`_.
""" # noqa
return self.execute_command(TDIGEST_RANK, key, value, *values)

def revrank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are larger than value + half the number of observations that are equal to value).
For more information see `TDIGEST.REVRANK <https://redis.io/commands/tdigest.revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_REVRANK, key, value, *values)

def byrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given rank.
For more information see `TDIGEST.BY_RANK <https://redis.io/commands/tdigest.by_rank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYRANK, key, rank, *ranks)

def byrevrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given reverse rank.
For more information see `TDIGEST.BY_REVRANK <https://redis.io/commands/tdigest.by_revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYREVRANK, key, rank, *ranks)


class CMSCommands:
"""Count-Min Sketch Commands"""
Expand Down
22 changes: 12 additions & 10 deletions redis/commands/bf/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ def __init__(self, args):
class TDigestInfo(object):
compression = None
capacity = None
mergedNodes = None
unmergedNodes = None
mergedWeight = None
unmergedWeight = None
totalCompressions = None
merged_nodes = None
unmerged_nodes = None
merged_weight = None
unmerged_weight = None
total_compressions = None
memory_usage = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.compression = response["Compression"]
self.capacity = response["Capacity"]
self.mergedNodes = response["Merged nodes"]
self.unmergedNodes = response["Unmerged nodes"]
self.mergedWeight = response["Merged weight"]
self.unmergedWeight = response["Unmerged weight"]
self.totalCompressions = response["Total compressions"]
self.merged_nodes = response["Merged nodes"]
self.unmerged_nodes = response["Unmerged nodes"]
self.merged_weight = response["Merged weight"]
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.memory_usage = response["Memory usage"]
19 changes: 19 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,25 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
# Sum up the reply from each command
return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

async def _execute_pipeline_by_slot(
self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
) -> List[Any]:
if self._initialize:
await self.initialize()
read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
pipe = self.pipeline()
[
pipe.execute_command(
command,
*slot_args,
target_nodes=[
self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
],
)
for slot, slot_args in slots_to_args.items()
]
return await pipe.execute()


class ClusterManagementCommands(ManagementCommands):
"""
Expand Down
Loading