From 98d06f8e16189a5ea175d833976fd954c2829cf7 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 20 Mar 2023 10:37:01 +0300 Subject: [PATCH 1/3] Eagerly deserialize responses consisting of list of data We were performing lazy deserialization on APIs that return a list of data, such as map#values(). This was not going to work with Compact, as after returning the list to the user, there might be Compact serialized data on the list for which we don't have the schema on the client yet. If it was a normal response, the client would have fetched the schema, but it is not possible with these lazy APIs. We perform the deserialization while getting the list items and this is a sync API. We can't perform a blocking call there, because there is a chance that this will be executed in the reactor thread, which would result in a deadlock. So, the only possible way of making these APIs work with Compact is to convert them to eager deserialization. This PR removes lazy deserialization in everything apart from SQL, which will be handled in another PR. --- docs/serialization.rst | 27 --- hazelcast/proxy/list.py | 20 +- hazelcast/proxy/map.py | 72 +++--- hazelcast/proxy/multi_map.py | 34 +-- hazelcast/proxy/queue.py | 7 +- hazelcast/proxy/reliable_topic.py | 4 +- hazelcast/proxy/replicated_map.py | 23 +- hazelcast/proxy/ringbuffer.py | 31 ++- hazelcast/proxy/set.py | 7 +- hazelcast/proxy/transactional_map.py | 24 +- hazelcast/proxy/transactional_multi_map.py | 12 +- hazelcast/serialization/objects.py | 12 +- hazelcast/serialization/service.py | 7 +- hazelcast/util.py | 55 ++--- .../compact_compatibility_test.py | 227 ++++++++++++------ 15 files changed, 304 insertions(+), 258 deletions(-) diff --git a/docs/serialization.rst b/docs/serialization.rst index ab65caac0e..c4a640e6b2 100644 --- a/docs/serialization.rst +++ b/docs/serialization.rst @@ -278,33 +278,6 @@ All of these APIs will work with the Compact serialization format, once it is promoted to the stable status. 
- Reading OBJECT columns of the SQL results -- Listening for :class:`hazelcast.proxy.reliable_topic.ReliableTopic` messages -- :func:`hazelcast.proxy.list.List.iterator` -- :func:`hazelcast.proxy.list.List.list_iterator` -- :func:`hazelcast.proxy.list.List.get_all` -- :func:`hazelcast.proxy.list.List.sub_list` -- :func:`hazelcast.proxy.map.Map.values` -- :func:`hazelcast.proxy.map.Map.entry_set` -- :func:`hazelcast.proxy.map.Map.execute_on_keys` -- :func:`hazelcast.proxy.map.Map.key_set` -- :func:`hazelcast.proxy.map.Map.project` -- :func:`hazelcast.proxy.map.Map.execute_on_entries` -- :func:`hazelcast.proxy.map.Map.get_all` -- :func:`hazelcast.proxy.multi_map.MultiMap.remove_all` -- :func:`hazelcast.proxy.multi_map.MultiMap.key_set` -- :func:`hazelcast.proxy.multi_map.MultiMap.values` -- :func:`hazelcast.proxy.multi_map.MultiMap.entry_set` -- :func:`hazelcast.proxy.multi_map.MultiMap.get` -- :func:`hazelcast.proxy.queue.Queue.iterator` -- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.values` -- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.entry_set` -- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.key_set` -- :func:`hazelcast.proxy.set.Set.get_all` -- :func:`hazelcast.proxy.ringbuffer.Ringbuffer.read_many` -- :func:`hazelcast.proxy.transactional_map.TransactionalMap.values` -- :func:`hazelcast.proxy.transactional_map.TransactionalMap.key_set` -- :func:`hazelcast.proxy.transactional_multi_map.TransactionalMultiMap.get` -- :func:`hazelcast.proxy.transactional_multi_map.TransactionalMultiMap.remove_all` IdentifiedDataSerializable Serialization ---------------------------------------- diff --git a/hazelcast/proxy/list.py b/hazelcast/proxy/list.py index 827a861bcc..376cbf763a 100644 --- a/hazelcast/proxy/list.py +++ b/hazelcast/proxy/list.py @@ -29,7 +29,7 @@ from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType from hazelcast.types import ItemType from hazelcast.serialization.compact import SchemaNotReplicatedError -from 
hazelcast.util import check_not_none, ImmutableLazyDataList +from hazelcast.util import check_not_none, deserialize_list_in_place class List(PartitionSpecificProxy["BlockingList"], typing.Generic[ItemType]): @@ -247,9 +247,8 @@ def get_all(self) -> Future[typing.List[ItemType]]: """ def handler(message): - return ImmutableLazyDataList( - list_get_all_codec.decode_response(message), self._to_object - ) + data_list = list_get_all_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = list_get_all_codec.encode_request(self.name) return self._invoke(request, handler) @@ -263,9 +262,8 @@ def iterator(self) -> Future[typing.List[ItemType]]: """ def handler(message): - return ImmutableLazyDataList( - list_iterator_codec.decode_response(message), self._to_object - ) + data_list = list_iterator_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = list_iterator_codec.encode_request(self.name) return self._invoke(request, handler) @@ -337,9 +335,8 @@ def list_iterator(self, index: int = 0) -> Future[typing.List[ItemType]]: """ def handler(message): - return ImmutableLazyDataList( - list_list_iterator_codec.decode_response(message), self._to_object - ) + data_list = list_list_iterator_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = list_list_iterator_codec.encode_request(self.name, index) return self._invoke(request, handler) @@ -494,7 +491,8 @@ def sub_list(self, from_index: int, to_index: int) -> Future[typing.List[ItemTyp """ def handler(message): - return ImmutableLazyDataList(list_sub_codec.decode_response(message), self._to_object) + data_list = list_sub_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = list_sub_codec.encode_request(self.name, from_index, to_index) return self._invoke(request, handler) diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py 
index 5c3c0f5e55..5c90fe0ffe 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -87,8 +87,9 @@ check_not_none, thread_id, to_millis, - ImmutableLazyDataList, IterationType, + deserialize_entry_list_in_place, + deserialize_list_in_place, ) @@ -563,7 +564,8 @@ def handler(message): predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) - return ImmutableLazyDataList(response["response"], self._to_object) + entry_data_list = response["response"] + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entries_with_paging_predicate_codec.encode_request(self.name, holder) else: @@ -573,17 +575,15 @@ def handler(message): return self._send_schema_and_retry(e, self.entry_set, predicate) def handler(message): - return ImmutableLazyDataList( - map_entries_with_predicate_codec.decode_response(message), self._to_object - ) + entry_data_list = map_entries_with_predicate_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entries_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): - return ImmutableLazyDataList( - map_entry_set_codec.decode_response(message), self._to_object - ) + entry_data_list = map_entry_set_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_entry_set_codec.encode_request(self.name) @@ -648,9 +648,8 @@ def execute_on_entries( ) def handler(message): - return ImmutableLazyDataList( - map_execute_with_predicate_codec.decode_response(message), self._to_object - ) + entry_data_list = map_execute_with_predicate_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_with_predicate_codec.encode_request( self.name, entry_processor_data, predicate_data @@ -664,9 +663,8 @@ def handler(message): ) def handler(message): - return 
ImmutableLazyDataList( - map_execute_on_all_keys_codec.decode_response(message), self._to_object - ) + entry_data_list = map_execute_on_all_keys_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_on_all_keys_codec.encode_request(self.name, entry_processor_data) @@ -730,9 +728,8 @@ def execute_on_keys( return self._send_schema_and_retry(e, self.execute_on_keys, keys, entry_processor) def handler(message): - return ImmutableLazyDataList( - map_execute_on_keys_codec.decode_response(message), self._to_object - ) + entry_data_list = map_execute_on_keys_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = map_execute_on_keys_codec.encode_request( self.name, entry_processor_data, key_list @@ -943,7 +940,8 @@ def handler(message): predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) - return ImmutableLazyDataList(response["response"], self._to_object) + data_list = response["response"] + return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_with_paging_predicate_codec.encode_request(self.name, holder) else: @@ -953,17 +951,15 @@ def handler(message): return self._send_schema_and_retry(e, self.key_set, predicate) def handler(message): - return ImmutableLazyDataList( - map_key_set_with_predicate_codec.decode_response(message), self._to_object - ) + data_list = map_key_set_with_predicate_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): - return ImmutableLazyDataList( - map_key_set_codec.decode_response(message), self._to_object - ) + data_list = map_key_set_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = map_key_set_codec.encode_request(self.name) @@ -1067,9 +1063,8 
@@ def project( return self._send_schema_and_retry(e, self.project, projection, predicate) def handler(message): - return ImmutableLazyDataList( - map_project_with_predicate_codec.decode_response(message), self._to_object - ) + data_list = map_project_with_predicate_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = map_project_with_predicate_codec.encode_request( self.name, projection_data, predicate_data @@ -1081,9 +1076,8 @@ def handler(message): return self._send_schema_and_retry(e, self.project, projection, predicate) def handler(message): - return ImmutableLazyDataList( - map_project_codec.decode_response(message), self._to_object - ) + data_list = map_project_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = map_project_codec.encode_request(self.name, projection_data) @@ -1652,7 +1646,8 @@ def handler(message): predicate.anchor_list = response["anchor_data_list"].as_anchor_list( self._to_object ) - return ImmutableLazyDataList(response["response"], self._to_object) + data_list = response["response"] + return deserialize_list_in_place(data_list, self._to_object) request = map_values_with_paging_predicate_codec.encode_request(self.name, holder) else: @@ -1662,17 +1657,15 @@ def handler(message): return self._send_schema_and_retry(e, self.values, predicate) def handler(message): - return ImmutableLazyDataList( - map_values_with_predicate_codec.decode_response(message), self._to_object - ) + data_list = map_values_with_predicate_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = map_values_with_predicate_codec.encode_request(self.name, predicate_data) else: def handler(message): - return ImmutableLazyDataList( - map_values_codec.decode_response(message), self._to_object - ) + data_list = map_values_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = 
map_values_codec.encode_request(self.name) @@ -1698,9 +1691,8 @@ def _get_all_internal(self, partition_to_keys, futures=None): futures = [] def handler(message): - return ImmutableLazyDataList( - map_get_all_codec.decode_response(message), self._to_object - ) + entry_data_list = map_get_all_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) for partition_id, key_dict in partition_to_keys.items(): request = map_get_all_codec.encode_request(self.name, key_dict.values()) diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index de36b15ea6..703c8e196f 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -31,8 +31,13 @@ from hazelcast.types import ValueType, KeyType from hazelcast.serialization.data import Data from hazelcast.serialization.compact import SchemaNotReplicatedError -from hazelcast.util import check_not_none, thread_id, to_millis, ImmutableLazyDataList - +from hazelcast.util import ( + check_not_none, + thread_id, + to_millis, + deserialize_list_in_place, + deserialize_entry_list_in_place, +) EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None] @@ -213,9 +218,8 @@ def entry_set(self) -> Future[typing.List[typing.Tuple[KeyType, ValueType]]]: """ def handler(message): - return ImmutableLazyDataList( - multi_map_entry_set_codec.decode_response(message), self._to_object - ) + entry_data_list = multi_map_entry_set_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = multi_map_entry_set_codec.encode_request(self.name) return self._invoke(request, handler) @@ -246,9 +250,8 @@ def get(self, key: KeyType) -> Future[typing.Optional[typing.List[ValueType]]]: return self._send_schema_and_retry(e, self.get, key) def handler(message): - return ImmutableLazyDataList( - multi_map_get_codec.decode_response(message), self._to_object - ) + data_list = 
multi_map_get_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = multi_map_get_codec.encode_request(self.name, key_data, thread_id()) return self._invoke_on_key(request, key_data, handler) @@ -314,9 +317,8 @@ def key_set(self) -> Future[typing.List[KeyType]]: """ def handler(message): - return ImmutableLazyDataList( - multi_map_key_set_codec.decode_response(message), self._to_object - ) + data_list = multi_map_key_set_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = multi_map_key_set_codec.encode_request(self.name) return self._invoke(request, handler) @@ -410,9 +412,8 @@ def remove_all(self, key: KeyType) -> Future[typing.List[ValueType]]: check_not_none(key, "key can't be None") def handler(message): - return ImmutableLazyDataList( - multi_map_remove_codec.decode_response(message), self._to_object - ) + data_list = multi_map_remove_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) try: key_data = self._to_data(key) @@ -551,9 +552,8 @@ def values(self) -> Future[typing.List[ValueType]]: """ def handler(message): - return ImmutableLazyDataList( - multi_map_values_codec.decode_response(message), self._to_object - ) + data_list = multi_map_values_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = multi_map_values_codec.encode_request(self.name) return self._invoke(request, handler) diff --git a/hazelcast/proxy/queue.py b/hazelcast/proxy/queue.py index efc0d64719..c9df42dab2 100644 --- a/hazelcast/proxy/queue.py +++ b/hazelcast/proxy/queue.py @@ -26,7 +26,7 @@ from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType from hazelcast.types import ItemType from hazelcast.serialization.compact import SchemaNotReplicatedError -from hazelcast.util import check_not_none, to_millis, ImmutableLazyDataList +from hazelcast.util import check_not_none, 
to_millis, deserialize_list_in_place class Queue(PartitionSpecificProxy["BlockingQueue"], typing.Generic[ItemType]): @@ -202,9 +202,8 @@ def iterator(self) -> Future[typing.List[ItemType]]: """ def handler(message): - return ImmutableLazyDataList( - queue_iterator_codec.decode_response(message), self._to_object - ) + data_list = queue_iterator_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = queue_iterator_codec.encode_request(self.name) return self._invoke(request, handler) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index cde0ca62aa..60188a239e 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -226,7 +226,7 @@ def _handle_next_batch(self, future): result = future.result() # Check if there are any messages lost since the last read - # and whether or not the listener can tolerate that. + # and whether the listener can tolerate that. lost_count = (result.next_sequence_to_read_from - result.read_count) - self._sequence if lost_count != 0 and not self._is_loss_tolerable(lost_count): self.cancel() @@ -254,7 +254,7 @@ def _handle_next_batch(self, future): topic_message = TopicMessage( self._topic_name, - self._to_object(message.payload), + message.payload, message.publish_time, member, ) diff --git a/hazelcast/proxy/replicated_map.py b/hazelcast/proxy/replicated_map.py index 657a5d9dc5..70fe58d771 100644 --- a/hazelcast/proxy/replicated_map.py +++ b/hazelcast/proxy/replicated_map.py @@ -25,8 +25,12 @@ from hazelcast.proxy.base import Proxy, EntryEvent, EntryEventType from hazelcast.types import KeyType, ValueType from hazelcast.serialization.compact import SchemaNotReplicatedError -from hazelcast.util import to_millis, check_not_none, ImmutableLazyDataList - +from hazelcast.util import ( + to_millis, + check_not_none, + deserialize_list_in_place, + deserialize_entry_list_in_place, +) EntryEventCallable = 
typing.Callable[[EntryEvent[KeyType, ValueType]], None] @@ -252,9 +256,8 @@ def entry_set(self) -> Future[typing.List[typing.Tuple[KeyType, ValueType]]]: """ def handler(message): - return ImmutableLazyDataList( - replicated_map_entry_set_codec.decode_response(message), self._to_object - ) + entry_data_list = replicated_map_entry_set_codec.decode_response(message) + return deserialize_entry_list_in_place(entry_data_list, self._to_object) request = replicated_map_entry_set_codec.encode_request(self.name) return self._invoke_on_partition(request, self._partition_id, handler) @@ -309,9 +312,8 @@ def key_set(self) -> Future[typing.List[KeyType]]: """ def handler(message): - return ImmutableLazyDataList( - replicated_map_key_set_codec.decode_response(message), self._to_object - ) + data_list = replicated_map_key_set_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = replicated_map_key_set_codec.encode_request(self.name) return self._invoke_on_partition(request, self._partition_id, handler) @@ -439,9 +441,8 @@ def values(self) -> Future[typing.List[ValueType]]: """ def handler(message): - return ImmutableLazyDataList( - replicated_map_values_codec.decode_response(message), self._to_object - ) + data_list = replicated_map_values_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = replicated_map_values_codec.encode_request(self.name) return self._invoke_on_partition(request, self._partition_id, handler) diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index cd4df96efa..09de90238f 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -1,4 +1,5 @@ import typing +from collections.abc import Sequence, Iterable from hazelcast.future import ImmediateFuture, Future from hazelcast.protocol.codec import ( @@ -20,7 +21,7 @@ check_not_none, check_not_empty, check_true, - ImmutableLazyDataList, + deserialize_list_in_place, ) 
OVERFLOW_POLICY_OVERWRITE = 0 @@ -52,7 +53,7 @@ """ -class ReadResult(ImmutableLazyDataList): +class ReadResult(Sequence): """Defines the result of a :func:`Ringbuffer.read_many` operation.""" SEQUENCE_UNAVAILABLE = -1 @@ -61,11 +62,12 @@ class ReadResult(ImmutableLazyDataList): members not returning the sequence). """ - def __init__(self, read_count, next_seq, items, item_seqs, to_object): - super(ReadResult, self).__init__(items, to_object) + def __init__(self, read_count, next_seq, item_seqs, items): + super(ReadResult, self).__init__() self._read_count = read_count self._next_seq = next_seq self._item_seqs = item_seqs + self._items = items @property def read_count(self) -> int: @@ -90,7 +92,7 @@ def size(self) -> int: See Also: :attr:`read_count` """ - return len(self._list_data) + return len(self._items) @property def next_sequence_to_read_from(self) -> int: @@ -129,6 +131,21 @@ def get_sequence(self, index: int) -> int: """ return self._item_seqs[index] + def __getitem__(self, index: int): + return self._items[index] + + def __len__(self) -> int: + return len(self._items) + + def __eq__(self, other): + # This implementation is copied from the + # now removed super class. It is implemented + # to maintain backward compatibility. 
+ if not isinstance(other, Iterable): + return False + + return self._items == other + class Ringbuffer(PartitionSpecificProxy["BlockingRingbuffer"], typing.Generic[ItemType]): """A Ringbuffer is an append-only data-structure where the content is @@ -379,12 +396,12 @@ def read_many( def handler(message): response = ringbuffer_read_many_codec.decode_response(message) + items = deserialize_list_in_place(response["items"], self._to_object) read_count = response["read_count"] next_seq = response["next_seq"] - items = response["items"] item_seqs = response["item_seqs"] - return ReadResult(read_count, next_seq, items, item_seqs, self._to_object) + return ReadResult(read_count, next_seq, item_seqs, items) def continuation(future): # Since the first call to capacity diff --git a/hazelcast/proxy/set.py b/hazelcast/proxy/set.py index 32ba5c021d..6fd1e83acd 100644 --- a/hazelcast/proxy/set.py +++ b/hazelcast/proxy/set.py @@ -20,7 +20,7 @@ from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType from hazelcast.types import ItemType from hazelcast.serialization.compact import SchemaNotReplicatedError -from hazelcast.util import check_not_none, ImmutableLazyDataList +from hazelcast.util import check_not_none, deserialize_list_in_place class Set(PartitionSpecificProxy, typing.Generic[ItemType]): @@ -164,9 +164,8 @@ def get_all(self) -> Future[typing.List[ItemType]]: """ def handler(message): - return ImmutableLazyDataList( - set_get_all_codec.decode_response(message), self._to_object - ) + data_list = set_get_all_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = set_get_all_codec.encode_request(self.name) return self._invoke(request, handler) diff --git a/hazelcast/proxy/transactional_map.py b/hazelcast/proxy/transactional_map.py index e3aca43ac2..64d8ad367d 100644 --- a/hazelcast/proxy/transactional_map.py +++ b/hazelcast/proxy/transactional_map.py @@ -23,7 +23,7 @@ from hazelcast.proxy.base import 
TransactionalProxy from hazelcast.types import ValueType, KeyType from hazelcast.serialization.compact import SchemaNotReplicatedError -from hazelcast.util import check_not_none, to_millis, thread_id, ImmutableLazyDataList +from hazelcast.util import check_not_none, to_millis, thread_id, deserialize_list_in_place class TransactionalMap(TransactionalProxy, typing.Generic[KeyType, ValueType]): @@ -383,10 +383,8 @@ def key_set(self, predicate: Predicate = None) -> typing.List[KeyType]: return self.key_set(predicate) def handler(message): - return ImmutableLazyDataList( - transactional_map_key_set_with_predicate_codec.decode_response(message), - self._to_object, - ) + data_list = transactional_map_key_set_with_predicate_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = transactional_map_key_set_with_predicate_codec.encode_request( self.name, self.transaction.id, thread_id(), predicate_data @@ -394,9 +392,8 @@ def handler(message): else: def handler(message): - return ImmutableLazyDataList( - transactional_map_key_set_codec.decode_response(message), self._to_object - ) + data_list = transactional_map_key_set_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = transactional_map_key_set_codec.encode_request( self.name, self.transaction.id, thread_id() @@ -422,10 +419,8 @@ def values(self, predicate: Predicate = None) -> typing.List[ValueType]: return self.values(predicate) def handler(message): - return ImmutableLazyDataList( - transactional_map_values_with_predicate_codec.decode_response(message), - self._to_object, - ) + data_list = transactional_map_values_with_predicate_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = transactional_map_values_with_predicate_codec.encode_request( self.name, self.transaction.id, thread_id(), predicate_data @@ -433,9 +428,8 @@ def handler(message): else: def handler(message): - 
return ImmutableLazyDataList( - transactional_map_values_codec.decode_response(message), self._to_object - ) + data_list = transactional_map_values_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = transactional_map_values_codec.encode_request( self.name, self.transaction.id, thread_id() diff --git a/hazelcast/proxy/transactional_multi_map.py b/hazelcast/proxy/transactional_multi_map.py index 84881ab5c5..7ab424f52a 100644 --- a/hazelcast/proxy/transactional_multi_map.py +++ b/hazelcast/proxy/transactional_multi_map.py @@ -11,7 +11,7 @@ from hazelcast.proxy.base import TransactionalProxy from hazelcast.types import KeyType, ValueType from hazelcast.serialization.compact import SchemaNotReplicatedError -from hazelcast.util import check_not_none, thread_id, ImmutableLazyDataList +from hazelcast.util import check_not_none, thread_id, deserialize_list_in_place class TransactionalMultiMap(TransactionalProxy, typing.Generic[KeyType, ValueType]): @@ -64,9 +64,8 @@ def get(self, key: KeyType) -> typing.Optional[typing.List[ValueType]]: return self.get(key) def handler(message): - return ImmutableLazyDataList( - transactional_multi_map_get_codec.decode_response(message), self._to_object - ) + data_list = transactional_multi_map_get_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = transactional_multi_map_get_codec.encode_request( self.name, self.transaction.id, thread_id(), key_data @@ -116,9 +115,8 @@ def remove_all(self, key: KeyType) -> typing.List[ValueType]: return self.remove_all(key) def handler(message): - return ImmutableLazyDataList( - transactional_multi_map_remove_codec.decode_response(message), self._to_object - ) + data_list = transactional_multi_map_remove_codec.decode_response(message) + return deserialize_list_in_place(data_list, self._to_object) request = transactional_multi_map_remove_codec.encode_request( self.name, self.transaction.id, thread_id(), 
key_data diff --git a/hazelcast/serialization/objects.py b/hazelcast/serialization/objects.py index f47330c659..65485c7730 100644 --- a/hazelcast/serialization/objects.py +++ b/hazelcast/serialization/objects.py @@ -20,17 +20,25 @@ class ReliableTopicMessage(IdentifiedDataSerializable): FACTORY_ID = -9 CLASS_ID = 2 - def __init__(self, publish_time=None, publisher_address=None, payload=None): + def __init__( + self, + publish_time=None, + publisher_address=None, + payload=None, + serialization_service=None, + ): # publish_time is in seconds but server sends/expects to receive # it in milliseconds. self.publish_time = publish_time self.publisher_address = publisher_address self.payload = payload + self._serialization_service = serialization_service def read_data(self, object_data_input): self.publish_time = object_data_input.read_long() / 1000.0 self.publisher_address = object_data_input.read_object() - self.payload = _read_data_from(object_data_input) + # Eagerly deserialize the payload on reads + self.payload = self._serialization_service.to_object(_read_data_from(object_data_input)) def write_data(self, object_data_output): object_data_output.write_long(to_millis(self.publish_time)) diff --git a/hazelcast/serialization/service.py b/hazelcast/serialization/service.py index 4dc4b48683..9b62cfcfe1 100644 --- a/hazelcast/serialization/service.py +++ b/hazelcast/serialization/service.py @@ -220,11 +220,12 @@ def destroy(self): def compact_stream_serializer(self) -> CompactStreamSerializer: return self._compact_stream_serializer - @staticmethod - def _get_builtin_identified_factories(): + def _get_builtin_identified_factories(self): return { ReliableTopicMessage.FACTORY_ID: { - ReliableTopicMessage.CLASS_ID: ReliableTopicMessage, + ReliableTopicMessage.CLASS_ID: lambda: ReliableTopicMessage( + serialization_service=self + ), }, CanonicalizingHashSet.FACTORY_ID: { CanonicalizingHashSet.CLASS_ID: CanonicalizingHashSet, diff --git a/hazelcast/util.py b/hazelcast/util.py 
index 2f5e4ffb16..b4eca9fbb2 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -124,46 +124,6 @@ def add(self, count: int): self._counter += count -class ImmutableLazyDataList(Sequence): - def __init__(self, list_data, to_object): - super(ImmutableLazyDataList, self).__init__() - self._list_data = list_data - self._list_obj = [None] * len(self._list_data) - self.to_object = to_object - - def __contains__(self, value): - return super(ImmutableLazyDataList, self).__contains__(value) - - def __len__(self): - return self._list_data.__len__() - - def __getitem__(self, index): - val = self._list_obj[index] - if not val: - data = self._list_data[index] - if isinstance(data, tuple): - (key, value) = data - self._list_obj[index] = (self.to_object(key), self.to_object(value)) - else: - self._list_obj[index] = self.to_object(data) - return self._list_obj[index] - - def __eq__(self, other): - if not isinstance(other, Iterable): - return False - self._populate() - return self._list_obj == other - - def _populate(self): - for index, data in enumerate(self._list_data): - if not self._list_obj[index]: - self.__getitem__(index) - - def __repr__(self): - self._populate() - return str(self._list_obj) - - # Serialization Utilities @@ -175,6 +135,21 @@ def get_portable_version(portable, default_version): return version +def deserialize_list_in_place(data_list, to_object_fn): + for i in range(len(data_list)): + data_list[i] = to_object_fn(data_list[i]) + + return data_list + + +def deserialize_entry_list_in_place(entry_data_list, to_object_fn): + for i in range(len(entry_data_list)): + item = entry_data_list[i] + entry_data_list[i] = (to_object_fn(item[0]), to_object_fn(item[1])) + + return entry_data_list + + # Version utilities UNKNOWN_VERSION = -1 MAJOR_VERSION_MULTIPLIER = 10000 diff --git a/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py 
b/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py index b5f5d8ba0c..8d76fecfc8 100644 --- a/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py +++ b/tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py @@ -509,16 +509,32 @@ def test_get(self): self._add_from_another_client(OUTER_COMPACT_INSTANCE) self.assertEqual(OUTER_COMPACT_INSTANCE, self.list.get(0)) + def test_get_all(self): + self._add_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], self.list.get_all()) + def test_index_of(self): self.assertEqual(-1, self.list.index_of(OUTER_COMPACT_INSTANCE)) self.list.add(OUTER_COMPACT_INSTANCE) self.assertEqual(0, self.list.index_of(OUTER_COMPACT_INSTANCE)) + def test_iterator(self): + self._add_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], self.list.iterator()) + def test_last_index_of(self): self.assertEqual(-1, self.list.last_index_of(OUTER_COMPACT_INSTANCE)) self.list.add(OUTER_COMPACT_INSTANCE) self.assertEqual(0, self.list.last_index_of(OUTER_COMPACT_INSTANCE)) + def test_list_iterator(self): + self._add_from_another_client( + INNER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE + ) + self.assertEqual( + [INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], self.list.list_iterator(1) + ) + def test_remove(self): self.assertFalse(self.list.remove(OUTER_COMPACT_INSTANCE)) self.list.add(OUTER_COMPACT_INSTANCE) @@ -545,10 +561,19 @@ def test_set_at(self): self.assertEqual(42, self.list.set_at(0, OUTER_COMPACT_INSTANCE)) self.assertEqual(OUTER_COMPACT_INSTANCE, self.list.set_at(0, INNER_COMPACT_INSTANCE)) - def _add_from_another_client(self, value): + def test_sublist(self): + self._add_from_another_client( + 
INNER_COMPACT_INSTANCE, + INNER_COMPACT_INSTANCE, + OUTER_COMPACT_INSTANCE, + OUTER_COMPACT_INSTANCE, + ) + self.assertEqual([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], self.list.sub_list(1, 3)) + + def _add_from_another_client(self, *values): other_client = self.create_client(self.client_config) other_client_list = other_client.get_list(self.list.name).blocking() - other_client_list.add(value) + other_client_list.add_all(values) class MapCompatibilityTest(CompactCompatibilityBase): @@ -635,21 +660,21 @@ def test_delete(self): self.map.delete(OUTER_COMPACT_INSTANCE) self.assertIsNone(self.map.get(OUTER_COMPACT_INSTANCE)) + def test_entry_set(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.map.entry_set()) + def test_entry_set_with_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - [(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE)], + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.map.entry_set(CompactPredicate()), ) def test_entry_set_with_paging_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. 
- self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - [(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE)], + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.map.entry_set(paging(CompactPredicate(), 1)), ) @@ -659,16 +684,16 @@ def test_evict(self): self.assertTrue(self.map.evict(OUTER_COMPACT_INSTANCE)) def test_execute_on_entries(self): - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - [(OUTER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.map.execute_on_entries(CompactReturningEntryProcessor()), ) def test_execute_on_entries_predicate(self): - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - [(OUTER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.map.execute_on_entries(CompactReturningEntryProcessor(), CompactPredicate()), ) @@ -680,10 +705,10 @@ def test_execute_on_key(self): ) def test_execute_on_keys(self): - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - [(OUTER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], - self.map.execute_on_keys([OUTER_COMPACT_INSTANCE], CompactReturningEntryProcessor()), + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], + self.map.execute_on_keys([INNER_COMPACT_INSTANCE], CompactReturningEntryProcessor()), ) def test_force_unlock(self): @@ -697,12 +722,10 @@ def test_get(self): self.assertEqual(INNER_COMPACT_INSTANCE, self.map.get(OUTER_COMPACT_INSTANCE)) def test_get_all(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that 
the lazy-deserialization works. - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - {OUTER_COMPACT_INSTANCE: INNER_COMPACT_INSTANCE}, - self.map.get_all([OUTER_COMPACT_INSTANCE]), + {INNER_COMPACT_INSTANCE: OUTER_COMPACT_INSTANCE}, + self.map.get_all([INNER_COMPACT_INSTANCE]), ) def test_get_entry_view(self): @@ -716,19 +739,19 @@ def test_is_locked(self): self.map.lock(OUTER_COMPACT_INSTANCE) self.assertTrue(self.map.is_locked(OUTER_COMPACT_INSTANCE)) + def test_key_set(self): + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.map.key_set()) + def test_key_set_with_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) self.assertEqual( [OUTER_COMPACT_INSTANCE], self.map.key_set(CompactPredicate()), ) def test_key_set_with_paging_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) self.assertEqual( [OUTER_COMPACT_INSTANCE], self.map.key_set(paging(CompactPredicate(), 1)), @@ -752,18 +775,14 @@ def test_lock(self): self.assertTrue(self.map.is_locked(OUTER_COMPACT_INSTANCE)) def test_project(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. 
- self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( [OUTER_COMPACT_INSTANCE], self.map.project(CompactReturningProjection()), ) def test_project_with_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( [OUTER_COMPACT_INSTANCE], self.map.project(CompactReturningProjection(), CompactPredicate()), @@ -854,16 +873,16 @@ def test_unlock(self): # of key to the server. pass + def test_values(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.map.values()) + def test_values_with_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual([OUTER_COMPACT_INSTANCE], self.map.values(CompactPredicate())) def test_values_with_paging_predicate(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. 
- self.map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual([OUTER_COMPACT_INSTANCE], self.map.values(paging(CompactPredicate(), 1))) def _put_from_another_client(self, key, value): @@ -958,10 +977,15 @@ def test_contains_entry(self): self.multi_map.contains_entry(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) ) + def test_entry_set(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual( + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.multi_map.entry_set() + ) + def test_get(self): - self.assertEqual([], self.multi_map.get(OUTER_COMPACT_INSTANCE)) - self.multi_map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) - self.assertEqual([INNER_COMPACT_INSTANCE], self.multi_map.get(OUTER_COMPACT_INSTANCE)) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.multi_map.get(INNER_COMPACT_INSTANCE)) def test_is_locked(self): self.assertFalse(self.multi_map.is_locked(OUTER_COMPACT_INSTANCE)) @@ -978,16 +1002,19 @@ def test_lock(self): self.multi_map.lock(OUTER_COMPACT_INSTANCE) self.assertTrue(self.multi_map.is_locked(OUTER_COMPACT_INSTANCE)) + def test_key_set(self): + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.multi_map.key_set()) + def test_remove(self): self.assertFalse(self.multi_map.remove(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)) self.multi_map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertTrue(self.multi_map.remove(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)) def test_remove_all(self): - self.assertEqual([], self.multi_map.remove_all(OUTER_COMPACT_INSTANCE)) - self.multi_map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) 
self.assertEqual( - [INNER_COMPACT_INSTANCE], self.multi_map.remove_all(OUTER_COMPACT_INSTANCE) + [OUTER_COMPACT_INSTANCE], self.multi_map.remove_all(INNER_COMPACT_INSTANCE) ) def test_put(self): @@ -1018,6 +1045,10 @@ def test_unlock(self): # of key to the server. pass + def test_values(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.multi_map.values()) + def _assert_entry_event(self, events): self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) @@ -1085,11 +1116,14 @@ def test_contains_all(self): def test_drain_to(self): target = [] - self._add_from_another_client(INNER_COMPACT_INSTANCE) - self._add_from_another_client(OUTER_COMPACT_INSTANCE) + self._add_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual(2, self.queue.drain_to(target)) self.assertEqual([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], target) + def test_iterator(self): + self._add_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], self.queue.iterator()) + def test_offer(self): self.assertTrue(self.queue.offer(OUTER_COMPACT_INSTANCE)) @@ -1126,10 +1160,10 @@ def test_take(self): self._add_from_another_client(OUTER_COMPACT_INSTANCE) self.assertEqual(OUTER_COMPACT_INSTANCE, self.queue.take()) - def _add_from_another_client(self, item): + def _add_from_another_client(self, *items): other_client = self.create_client(self.client_config) other_client_queue = other_client.get_queue(self.queue.name).blocking() - other_client_queue.add(item) + other_client_queue.add_all(items) class ReliableTopicCompactCompatibilityTest(CompactCompatibilityBase): @@ -1141,6 +1175,23 @@ def tearDown(self) -> None: self.reliable_topic.destroy() super().tearDown() + def test_add_listener(self): + messages = [] + + def listener(message): + messages.append(message) + + 
self.reliable_topic.add_listener(listener) + + self._publish_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + + def assertion(): + self.assertEqual(2, len(messages)) + self.assertEqual(INNER_COMPACT_INSTANCE, messages[0].message) + self.assertEqual(OUTER_COMPACT_INSTANCE, messages[1].message) + + self.assertTrueEventually(assertion) + def test_publish(self): messages = [] @@ -1175,6 +1226,13 @@ def assertion(): self.assertTrueEventually(assertion) + def _publish_from_another_client(self, *messages): + other_client = self.create_client(self.client_config) + other_client_reliable_topic = other_client.get_reliable_topic( + self.reliable_topic.name + ).blocking() + other_client_reliable_topic.publish_all(messages) + class ReplicatedMapCompactCompatibilityTest(CompactCompatibilityBase): def setUp(self) -> None: @@ -1234,11 +1292,21 @@ def test_contains_value(self): self.replicated_map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertTrue(self.replicated_map.contains_value(OUTER_COMPACT_INSTANCE)) + def test_entry_set(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual( + [(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)], self.replicated_map.entry_set() + ) + def test_get(self): self.assertIsNone(self.replicated_map.get(OUTER_COMPACT_INSTANCE)) self.replicated_map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) self.assertEqual(INNER_COMPACT_INSTANCE, self.replicated_map.get(OUTER_COMPACT_INSTANCE)) + def test_key_set(self): + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.replicated_map.key_set()) + def test_put(self): self.assertIsNone(self.replicated_map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE)) self.assertEqual(OUTER_COMPACT_INSTANCE, self.replicated_map.get(INNER_COMPACT_INSTANCE)) @@ -1262,6 +1330,10 @@ def test_remove(self): self.replicated_map.put(OUTER_COMPACT_INSTANCE, 
INNER_COMPACT_INSTANCE) self.assertEqual(INNER_COMPACT_INSTANCE, self.replicated_map.remove(OUTER_COMPACT_INSTANCE)) + def test_values(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual([OUTER_COMPACT_INSTANCE], self.replicated_map.values()) + def _assert_entry_event(self, events): self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) @@ -1307,19 +1379,24 @@ def test_read_one(self): self._add_from_another_client(OUTER_COMPACT_INSTANCE) self.assertEqual(OUTER_COMPACT_INSTANCE, self.ringbuffer.read_one(0)) + def test_read_many(self): + self._add_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self.assertEqual( + [INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], + self.ringbuffer.read_many(0, 0, 2), + ) + def test_read_many_with_filter(self): - # Add an item from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.ringbuffer.add(OUTER_COMPACT_INSTANCE) + self._add_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) self.assertEqual( - [OUTER_COMPACT_INSTANCE], - self.ringbuffer.read_many(0, 0, 1, filter=CompactFilter()), + [INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], + self.ringbuffer.read_many(0, 0, 2, filter=CompactFilter()), ) - def _add_from_another_client(self, item): + def _add_from_another_client(self, *items): other_client = self.create_client(self.client_config) other_client_ringbuffer = other_client.get_ringbuffer(self.ringbuffer.name).blocking() - other_client_ringbuffer.add(item) + other_client_ringbuffer.add_all(items) class SetCompactCompatibilityTest(CompactCompatibilityBase): @@ -1367,6 +1444,10 @@ def test_contains_all(self): self.set.add_all([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE]) self.assertTrue(self.set.contains_all([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE])) + def test_get_all(self): + self._add_from_another_client(INNER_COMPACT_INSTANCE, 
OUTER_COMPACT_INSTANCE) + self.assertCountEqual([INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE], self.set.get_all()) + def test_remove(self): self.assertFalse(self.set.remove(OUTER_COMPACT_INSTANCE)) self.set.add(OUTER_COMPACT_INSTANCE) @@ -1383,10 +1464,10 @@ def test_retain_all(self): self.set.add_all(items + ["x"]) self.assertTrue(self.set.retain_all(items)) - def _add_from_another_client(self, item): + def _add_from_another_client(self, *items): other_client = self.create_client(self.client_config) other_client_set = other_client.get_set(self.set.name).blocking() - other_client_set.add(item) + other_client_set.add_all(items) class TopicCompactCompatibilityTest(CompactCompatibilityBase): @@ -1574,16 +1655,28 @@ def test_delete(self): self.assertTrue(self.map.is_empty()) + def test_key_set(self): + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + with self.transaction: + transactional_map = self._get_transactional_map() + self.assertEqual([OUTER_COMPACT_INSTANCE], transactional_map.key_set()) + def test_key_set_with_predicate(self): - self.map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) with self.transaction: transactional_map = self._get_transactional_map() self.assertEqual( - [INNER_COMPACT_INSTANCE], transactional_map.key_set(CompactPredicate()) + [OUTER_COMPACT_INSTANCE], transactional_map.key_set(CompactPredicate()) ) + def test_values(self): + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + with self.transaction: + transactional_map = self._get_transactional_map() + self.assertEqual([OUTER_COMPACT_INSTANCE], transactional_map.values()) + def test_values_with_predicate(self): - self.map.put(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) with self.transaction: transactional_map = self._get_transactional_map() 
self.assertEqual([OUTER_COMPACT_INSTANCE], transactional_map.values(CompactPredicate())) @@ -1616,13 +1709,11 @@ def test_put(self): ) def test_get(self): - # Put an entry from the same client to register these schemas - # to its local registry, so that the lazy-deserialization works. - self.multi_map.put(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) with self.transaction: transactional_multi_map = self._get_transactional_multi_map() self.assertEqual( - [INNER_COMPACT_INSTANCE], transactional_multi_map.get(OUTER_COMPACT_INSTANCE) + [OUTER_COMPACT_INSTANCE], transactional_multi_map.get(INNER_COMPACT_INSTANCE) ) def test_remove(self): @@ -1634,11 +1725,11 @@ def test_remove(self): ) def test_remove_all(self): - self._put_from_another_client(OUTER_COMPACT_INSTANCE, INNER_COMPACT_INSTANCE) + self._put_from_another_client(INNER_COMPACT_INSTANCE, OUTER_COMPACT_INSTANCE) with self.transaction: transactional_multi_map = self._get_transactional_multi_map() self.assertEqual( - [INNER_COMPACT_INSTANCE], transactional_multi_map.remove_all(OUTER_COMPACT_INSTANCE) + [OUTER_COMPACT_INSTANCE], transactional_multi_map.remove_all(INNER_COMPACT_INSTANCE) ) def test_value_count(self): From 68b6fc279ca6cb98d93dac077c66848ad1cd3658 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 20 Mar 2023 11:23:20 +0300 Subject: [PATCH 2/3] fix mypy error --- hazelcast/proxy/ringbuffer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index 09de90238f..73669b04e6 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -131,13 +131,13 @@ def get_sequence(self, index: int) -> int: """ return self._item_seqs[index] - def __getitem__(self, index: int): + def __getitem__(self, index: typing.Union[int, slice]) -> typing.Any: return self._items[index] def __len__(self) -> int: return len(self._items) - def 
__eq__(self, other): + def __eq__(self, other) -> bool: # This implementation is copied from the # now removed super class. It is implemented # to maintain backward compatibility. From c8ff71bc35d74b6655c8125ac7d814a00c79814c Mon Sep 17 00:00:00 2001 From: mdumandag Date: Thu, 23 Mar 2023 17:17:14 +0300 Subject: [PATCH 3/3] address review comments --- hazelcast/util.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/hazelcast/util.py b/hazelcast/util.py index b4eca9fbb2..49fb4f2a0c 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -1,12 +1,13 @@ import random import threading import time +import typing import uuid -from collections.abc import Sequence, Iterable - from hazelcast.serialization import UUID_MSB_SHIFT, UUID_LSB_MASK, UUID_MSB_MASK +if typing.TYPE_CHECKING: + from hazelcast.serialization.data import Data DEFAULT_ADDRESS = "127.0.0.1" DEFAULT_PORT = 5701 @@ -135,14 +136,19 @@ def get_portable_version(portable, default_version): return version -def deserialize_list_in_place(data_list, to_object_fn): +def deserialize_list_in_place( + data_list: typing.List["Data"], to_object_fn: typing.Callable[["Data"], typing.Any] +) -> typing.List: for i in range(len(data_list)): data_list[i] = to_object_fn(data_list[i]) return data_list -def deserialize_entry_list_in_place(entry_data_list, to_object_fn): +def deserialize_entry_list_in_place( + entry_data_list: typing.List[typing.Tuple["Data", "Data"]], + to_object_fn: typing.Callable[["Data"], typing.Any], +) -> typing.List[typing.Tuple[typing.Any, typing.Any]]: for i in range(len(entry_data_list)): item = entry_data_list[i] entry_data_list[i] = (to_object_fn(item[0]), to_object_fn(item[1]))