Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eagerly deserialize responses consisting of list of data [API-1964] #618

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions docs/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,33 +278,6 @@ All of these APIs will work with the Compact serialization format, once it is
promoted to the stable status.

- Reading OBJECT columns of the SQL results
- Listening for :class:`hazelcast.proxy.reliable_topic.ReliableTopic` messages
- :func:`hazelcast.proxy.list.List.iterator`
- :func:`hazelcast.proxy.list.List.list_iterator`
- :func:`hazelcast.proxy.list.List.get_all`
- :func:`hazelcast.proxy.list.List.sub_list`
- :func:`hazelcast.proxy.map.Map.values`
- :func:`hazelcast.proxy.map.Map.entry_set`
- :func:`hazelcast.proxy.map.Map.execute_on_keys`
- :func:`hazelcast.proxy.map.Map.key_set`
- :func:`hazelcast.proxy.map.Map.project`
- :func:`hazelcast.proxy.map.Map.execute_on_entries`
- :func:`hazelcast.proxy.map.Map.get_all`
- :func:`hazelcast.proxy.multi_map.MultiMap.remove_all`
- :func:`hazelcast.proxy.multi_map.MultiMap.key_set`
- :func:`hazelcast.proxy.multi_map.MultiMap.values`
- :func:`hazelcast.proxy.multi_map.MultiMap.entry_set`
- :func:`hazelcast.proxy.multi_map.MultiMap.get`
- :func:`hazelcast.proxy.queue.Queue.iterator`
- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.values`
- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.entry_set`
- :func:`hazelcast.proxy.replicated_map.ReplicatedMap.key_set`
- :func:`hazelcast.proxy.set.Set.get_all`
- :func:`hazelcast.proxy.ringbuffer.Ringbuffer.read_many`
- :func:`hazelcast.proxy.transactional_map.TransactionalMap.values`
- :func:`hazelcast.proxy.transactional_map.TransactionalMap.key_set`
- :func:`hazelcast.proxy.transactional_multi_map.TransactionalMultiMap.get`
- :func:`hazelcast.proxy.transactional_multi_map.TransactionalMultiMap.remove_all`

IdentifiedDataSerializable Serialization
----------------------------------------
Expand Down
20 changes: 9 additions & 11 deletions hazelcast/proxy/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType
from hazelcast.types import ItemType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, ImmutableLazyDataList
from hazelcast.util import check_not_none, deserialize_list_in_place


class List(PartitionSpecificProxy["BlockingList"], typing.Generic[ItemType]):
Expand Down Expand Up @@ -247,9 +247,8 @@ def get_all(self) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
list_get_all_codec.decode_response(message), self._to_object
)
data_list = list_get_all_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_get_all_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand All @@ -263,9 +262,8 @@ def iterator(self) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
list_iterator_codec.decode_response(message), self._to_object
)
data_list = list_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_iterator_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down Expand Up @@ -337,9 +335,8 @@ def list_iterator(self, index: int = 0) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
list_list_iterator_codec.decode_response(message), self._to_object
)
data_list = list_list_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_list_iterator_codec.encode_request(self.name, index)
return self._invoke(request, handler)
Expand Down Expand Up @@ -494,7 +491,8 @@ def sub_list(self, from_index: int, to_index: int) -> Future[typing.List[ItemTyp
"""

def handler(message):
return ImmutableLazyDataList(list_sub_codec.decode_response(message), self._to_object)
data_list = list_sub_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = list_sub_codec.encode_request(self.name, from_index, to_index)
return self._invoke(request, handler)
Expand Down
72 changes: 32 additions & 40 deletions hazelcast/proxy/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@
check_not_none,
thread_id,
to_millis,
ImmutableLazyDataList,
IterationType,
deserialize_entry_list_in_place,
deserialize_list_in_place,
)


Expand Down Expand Up @@ -563,7 +564,8 @@ def handler(message):
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
return ImmutableLazyDataList(response["response"], self._to_object)
entry_data_list = response["response"]
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_entries_with_paging_predicate_codec.encode_request(self.name, holder)
else:
Expand All @@ -573,17 +575,15 @@ def handler(message):
return self._send_schema_and_retry(e, self.entry_set, predicate)

def handler(message):
return ImmutableLazyDataList(
map_entries_with_predicate_codec.decode_response(message), self._to_object
)
entry_data_list = map_entries_with_predicate_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_entries_with_predicate_codec.encode_request(self.name, predicate_data)
else:

def handler(message):
return ImmutableLazyDataList(
map_entry_set_codec.decode_response(message), self._to_object
)
entry_data_list = map_entry_set_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_entry_set_codec.encode_request(self.name)

Expand Down Expand Up @@ -648,9 +648,8 @@ def execute_on_entries(
)

def handler(message):
return ImmutableLazyDataList(
map_execute_with_predicate_codec.decode_response(message), self._to_object
)
entry_data_list = map_execute_with_predicate_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_execute_with_predicate_codec.encode_request(
self.name, entry_processor_data, predicate_data
Expand All @@ -664,9 +663,8 @@ def handler(message):
)

def handler(message):
return ImmutableLazyDataList(
map_execute_on_all_keys_codec.decode_response(message), self._to_object
)
entry_data_list = map_execute_on_all_keys_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_execute_on_all_keys_codec.encode_request(self.name, entry_processor_data)

Expand Down Expand Up @@ -730,9 +728,8 @@ def execute_on_keys(
return self._send_schema_and_retry(e, self.execute_on_keys, keys, entry_processor)

def handler(message):
return ImmutableLazyDataList(
map_execute_on_keys_codec.decode_response(message), self._to_object
)
entry_data_list = map_execute_on_keys_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = map_execute_on_keys_codec.encode_request(
self.name, entry_processor_data, key_list
Expand Down Expand Up @@ -943,7 +940,8 @@ def handler(message):
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
return ImmutableLazyDataList(response["response"], self._to_object)
data_list = response["response"]
return deserialize_list_in_place(data_list, self._to_object)

request = map_key_set_with_paging_predicate_codec.encode_request(self.name, holder)
else:
Expand All @@ -953,17 +951,15 @@ def handler(message):
return self._send_schema_and_retry(e, self.key_set, predicate)

def handler(message):
return ImmutableLazyDataList(
map_key_set_with_predicate_codec.decode_response(message), self._to_object
)
data_list = map_key_set_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_key_set_with_predicate_codec.encode_request(self.name, predicate_data)
else:

def handler(message):
return ImmutableLazyDataList(
map_key_set_codec.decode_response(message), self._to_object
)
data_list = map_key_set_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_key_set_codec.encode_request(self.name)

Expand Down Expand Up @@ -1067,9 +1063,8 @@ def project(
return self._send_schema_and_retry(e, self.project, projection, predicate)

def handler(message):
return ImmutableLazyDataList(
map_project_with_predicate_codec.decode_response(message), self._to_object
)
data_list = map_project_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_project_with_predicate_codec.encode_request(
self.name, projection_data, predicate_data
Expand All @@ -1081,9 +1076,8 @@ def handler(message):
return self._send_schema_and_retry(e, self.project, projection, predicate)

def handler(message):
return ImmutableLazyDataList(
map_project_codec.decode_response(message), self._to_object
)
data_list = map_project_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_project_codec.encode_request(self.name, projection_data)

Expand Down Expand Up @@ -1652,7 +1646,8 @@ def handler(message):
predicate.anchor_list = response["anchor_data_list"].as_anchor_list(
self._to_object
)
return ImmutableLazyDataList(response["response"], self._to_object)
data_list = response["response"]
return deserialize_list_in_place(data_list, self._to_object)

request = map_values_with_paging_predicate_codec.encode_request(self.name, holder)
else:
Expand All @@ -1662,17 +1657,15 @@ def handler(message):
return self._send_schema_and_retry(e, self.values, predicate)

def handler(message):
return ImmutableLazyDataList(
map_values_with_predicate_codec.decode_response(message), self._to_object
)
data_list = map_values_with_predicate_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_values_with_predicate_codec.encode_request(self.name, predicate_data)
else:

def handler(message):
return ImmutableLazyDataList(
map_values_codec.decode_response(message), self._to_object
)
data_list = map_values_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = map_values_codec.encode_request(self.name)

Expand All @@ -1698,9 +1691,8 @@ def _get_all_internal(self, partition_to_keys, futures=None):
futures = []

def handler(message):
return ImmutableLazyDataList(
map_get_all_codec.decode_response(message), self._to_object
)
entry_data_list = map_get_all_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

for partition_id, key_dict in partition_to_keys.items():
request = map_get_all_codec.encode_request(self.name, key_dict.values())
Expand Down
34 changes: 17 additions & 17 deletions hazelcast/proxy/multi_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@
from hazelcast.types import ValueType, KeyType
from hazelcast.serialization.data import Data
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, thread_id, to_millis, ImmutableLazyDataList

from hazelcast.util import (
check_not_none,
thread_id,
to_millis,
deserialize_list_in_place,
deserialize_entry_list_in_place,
)

EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None]

Expand Down Expand Up @@ -213,9 +218,8 @@ def entry_set(self) -> Future[typing.List[typing.Tuple[KeyType, ValueType]]]:
"""

def handler(message):
return ImmutableLazyDataList(
multi_map_entry_set_codec.decode_response(message), self._to_object
)
entry_data_list = multi_map_entry_set_codec.decode_response(message)
return deserialize_entry_list_in_place(entry_data_list, self._to_object)

request = multi_map_entry_set_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down Expand Up @@ -246,9 +250,8 @@ def get(self, key: KeyType) -> Future[typing.Optional[typing.List[ValueType]]]:
return self._send_schema_and_retry(e, self.get, key)

def handler(message):
return ImmutableLazyDataList(
multi_map_get_codec.decode_response(message), self._to_object
)
data_list = multi_map_get_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_get_codec.encode_request(self.name, key_data, thread_id())
return self._invoke_on_key(request, key_data, handler)
Expand Down Expand Up @@ -314,9 +317,8 @@ def key_set(self) -> Future[typing.List[KeyType]]:
"""

def handler(message):
return ImmutableLazyDataList(
multi_map_key_set_codec.decode_response(message), self._to_object
)
data_list = multi_map_key_set_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_key_set_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down Expand Up @@ -410,9 +412,8 @@ def remove_all(self, key: KeyType) -> Future[typing.List[ValueType]]:
check_not_none(key, "key can't be None")

def handler(message):
return ImmutableLazyDataList(
multi_map_remove_codec.decode_response(message), self._to_object
)
data_list = multi_map_remove_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

try:
key_data = self._to_data(key)
Expand Down Expand Up @@ -551,9 +552,8 @@ def values(self) -> Future[typing.List[ValueType]]:
"""

def handler(message):
return ImmutableLazyDataList(
multi_map_values_codec.decode_response(message), self._to_object
)
data_list = multi_map_values_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_values_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down
7 changes: 3 additions & 4 deletions hazelcast/proxy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from hazelcast.proxy.base import PartitionSpecificProxy, ItemEvent, ItemEventType
from hazelcast.types import ItemType
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_not_none, to_millis, ImmutableLazyDataList
from hazelcast.util import check_not_none, to_millis, deserialize_list_in_place


class Queue(PartitionSpecificProxy["BlockingQueue"], typing.Generic[ItemType]):
Expand Down Expand Up @@ -202,9 +202,8 @@ def iterator(self) -> Future[typing.List[ItemType]]:
"""

def handler(message):
return ImmutableLazyDataList(
queue_iterator_codec.decode_response(message), self._to_object
)
data_list = queue_iterator_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = queue_iterator_codec.encode_request(self.name)
return self._invoke(request, handler)
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/proxy/reliable_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _handle_next_batch(self, future):
result = future.result()

# Check if there are any messages lost since the last read
# and whether or not the listener can tolerate that.
# and whether the listener can tolerate that.
lost_count = (result.next_sequence_to_read_from - result.read_count) - self._sequence
if lost_count != 0 and not self._is_loss_tolerable(lost_count):
self.cancel()
Expand Down Expand Up @@ -254,7 +254,7 @@ def _handle_next_batch(self, future):

topic_message = TopicMessage(
self._topic_name,
self._to_object(message.payload),
message.payload,
message.publish_time,
member,
)
Expand Down
Loading