Skip to content

Commit

Permalink
Fix frame reader (#436)
Browse files Browse the repository at this point in the history
In the frame reader, we were storing the frame size including the
header (frame size + flags which is 6 bytes). Also, after reading
the frame size from the header, we were incrementing the bytes_read.

That approach works for small frames, that we can read with one go,
but was not working for frames that are large. For the same frame,
the next time we call read_frame, the length of the reader (the data
we haven't read from the buffer) was equal to frame size, but the
variable frame_size was equal to `frame_size + HEADER_SIZE`. So,
we were not reading the data that was actually there due to some
faulty length check.

We were able to read the data that is in the buffer after some long
time, when the response for the client ping request comes as it
was incrementing the length of the reader enough to pass the length
check.

To fix that, now, the frame_size field holds the frame size excluding
the header size.
  • Loading branch information
mdumandag authored Jul 27, 2021
1 parent 6e860c2 commit 711f4a6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
19 changes: 10 additions & 9 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,8 @@ def __init__(self, builder):
self._builder = builder
self._bytes_read = 0
self._bytes_written = 0
self._frame_size = 0
# Size of the frame excluding the header (SIZE_OF_FRAME_LENGTH_AND_FLAGS bytes)
self._frame_size = -1
self._frame_flags = 0
self._message = None

Expand All @@ -719,22 +720,21 @@ def _read_message(self):
return None

def _read_frame(self):
n = self.length
if n < SIZE_OF_FRAME_LENGTH_AND_FLAGS:
# we don't have even the frame length and flags ready
return False
if self._frame_size == -1:
if self.length < SIZE_OF_FRAME_LENGTH_AND_FLAGS:
# we don't have even the frame length and flags ready
return False

if self._frame_size == 0:
self._read_frame_size_and_flags()

if n < self._frame_size:
if self.length < self._frame_size:
return False

self._buf.seek(self._bytes_read)
size = self._frame_size - SIZE_OF_FRAME_LENGTH_AND_FLAGS
size = self._frame_size
data = self._buf.read(size)
self._bytes_read += size
self._frame_size = 0
self._frame_size = -1
# No need to reset flags since it will be overwritten on the next read_frame_size_and_flags call
frame = Frame(data, self._frame_flags)
if not self._message:
Expand All @@ -747,6 +747,7 @@ def _read_frame_size_and_flags(self):
self._buf.seek(self._bytes_read)
header_data = self._buf.read(SIZE_OF_FRAME_LENGTH_AND_FLAGS)
self._frame_size, self._frame_flags = _frame_header.unpack_from(header_data, 0)
self._frame_size -= SIZE_OF_FRAME_LENGTH_AND_FLAGS
self._bytes_read += SIZE_OF_FRAME_LENGTH_AND_FLAGS

def _reset(self):
Expand Down
15 changes: 14 additions & 1 deletion tests/integration/backward_compatible/proxy/map_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
write_string_to_output,
read_string_from_input,
)
from tests.util import random_string, event_collector, fill_map, is_server_version_older_than
from tests.util import (
random_string,
event_collector,
fill_map,
is_server_version_older_than,
get_current_timestamp,
)
from hazelcast import six
from hazelcast.six.moves import range

Expand Down Expand Up @@ -396,6 +402,13 @@ def test_put_get(self):
self.assertIsNone(self.map.put("key", "value"))
self.assertEqual(self.map.get("key"), "value")

def test_put_get_large_payload(self):
payload = bytearray(os.urandom(16 * 1024 * 1024))
start = get_current_timestamp()
self.assertIsNone(self.map.put("key", payload))
self.assertEqual(self.map.get("key"), payload)
self.assertLessEqual(get_current_timestamp() - start, 5)

def test_put_get2(self):
val = "x" * 5000

Expand Down

0 comments on commit 711f4a6

Please sign in to comment.