Skip to content

Commit

Permalink
[API-879] Wait for listener deregistration requests (#475)
Browse files Browse the repository at this point in the history
* Wait for listener deregistration requests

With this PR, we will wait for all the listener deregistration
requests to finish before returning `True` to the user.

It is mainly a precaution to the possible problems that might result
in services that expect sync removal of listeners.

While doing that, I have also dived into one of our technical debts
and corrected the implementation of the `combine_futures`.

It now waits for all futures to complete in all scenarios, as it should be,
and sets the result to the first exceptional completion, if some of
the input futures complete as such.

* add exception info to log message and remove unused import
  • Loading branch information
mdumandag authored Sep 16, 2021
1 parent f614580 commit e9dfb0a
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 28 deletions.
49 changes: 37 additions & 12 deletions hazelcast/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,31 +254,56 @@ def add_done_callback(self, callback):
def combine_futures(futures):
"""Combines set of Futures.
It waits for the completion of the all input Futures regardless
of their output.
The returned Future completes with the list of the results of the input
Futures, respecting the input order.
If one of the input Futures completes exceptionally, the returned
Future also completes exceptionally. In case of multiple exceptional
completions, the returned Future will be completed with the first
exceptional result.
Args:
futures (list[Future]): List of Futures to be combined.
Returns:
Future: Result of the combination.
"""
expected = len(futures)
results = []
if expected == 0:
count = len(futures)
results = [None] * count
if count == 0:
return ImmediateFuture(results)

completed = AtomicInteger()
combined = Future()
errors = []

def done(f):
if not combined.done():
if f.is_success(): # TODO: ensure ordering of results as original list
results.append(f.result())
if completed.get_and_increment() + 1 == expected:
combined.set_result(results)
def done(future, index):
if future.is_success():
results[index] = future.result()
else:
if not errors:
# We are fine with this check-then-act.
# At most, we will end up with couple of
# errors stored in case of the concurrent calls.
# The idea behind this check is to try to minimize
# the number of errors we store without
# synchronization, as we only need the first error.
errors.append((future.exception(), future.traceback()))

if count == completed.increment_and_get():
if errors:
first_exception, first_traceback = errors[0]
combined.set_exception(first_exception, first_traceback)
else:
combined.set_exception(f.exception(), f.traceback())
combined.set_result(results)

for future in futures:
future.add_done_callback(done)
for index, future in enumerate(futures):
# Capture the index in the closure or else we
# will only update the last element.
future.add_done_callback(lambda f, captured_index=index: done(f, captured_index))

return combined

Expand Down
31 changes: 18 additions & 13 deletions hazelcast/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def deregister_listener(self, user_registration_id):
if not listener_registration:
return ImmediateFuture(False)

futures = []
for connection, event_registration in six.iteritems(
listener_registration.connection_registrations
):
Expand All @@ -105,24 +106,28 @@ def deregister_listener(self, user_registration_id):
)
self._invocation_service.invoke(invocation)

def handler(f, connection=connection):
def handler(f, captured_connection=connection):
e = f.exception()
if e:
if isinstance(
e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)
):
return

_logger.warning(
"Deregistration of listener with ID %s has failed for address %s",
user_registration_id,
connection.remote_address,
)
if not e:
return

if isinstance(
e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)
):
return

_logger.warning(
"Deregistration of listener with ID %s has failed for address %s: %s",
user_registration_id,
captured_connection.remote_address,
e,
)

invocation.future.add_done_callback(handler)
futures.append(invocation.future)

listener_registration.connection_registrations.clear()
return ImmediateFuture(True)
return combine_futures(futures).continue_with(lambda _: True)

def handle_client_message(self, message, correlation_id):
handler = self._event_handlers.get(correlation_id, None)
Expand Down
10 changes: 10 additions & 0 deletions hazelcast/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ def get_and_increment(self):
self._counter += 1
return res

def increment_and_get(self):
"""Increments the current value and returns it.
Returns:
int: Incremented value of AtomicInteger.
"""
with self._mux:
self._counter += 1
return self._counter

def get(self):
"""Returns the current value.
Expand Down
54 changes: 51 additions & 3 deletions tests/unit/future_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def callback(f):


class CombineFutureTest(unittest.TestCase):
def test_combine_futures(self):
def test_combining(self):
f1, f2, f3 = Future(), Future(), Future()

combined = combine_futures([f1, f2, f3])
Expand All @@ -408,7 +408,7 @@ def test_combine_futures(self):
f3.set_result("done3")
self.assertEqual(combined.result(), ["done1", "done2", "done3"])

def test_combine_futures_exception(self):
def test_exceptional_result(self):
f1, f2, f3 = Future(), Future(), Future()

combined = combine_futures([f1, f2, f3])
Expand All @@ -420,11 +420,59 @@ def test_combine_futures_exception(self):

self.assertEqual(e, combined.exception())

def test_combine_futures_with_empty_list(self):
def test_empty_input(self):
combined = combine_futures([])
self.assertTrue(combined.done())
self.assertEqual([], combined.result())

def test_completion_order(self):
# Verifies that the returned list order is
# consistent with the order of input futures
# regardless of the completion order of futures.
f1, f2, f3 = Future(), Future(), Future()
combined = combine_futures([f1, f2, f3])

f3.set_result(3)
f1.set_result(1)
f2.set_result(2)

self.assertEqual([1, 2, 3], combined.result())

def test_waiting_for_all_inputs(self):
# Verifies that we wait for all futures, even if some
# of them fails.
f1, f2, f3 = Future(), Future(), Future()
combined = combine_futures([f1, f2, f3])

e = RuntimeError("expected")
f1.set_exception(e)
self.assertFalse(combined.done())
f2.set_result(1)
self.assertFalse(combined.done())
f3.set_result(2)
self.assertTrue(combined.done())
self.assertFalse(combined.is_success())
self.assertEqual(e, combined.exception())

def test_returning_first_error(self):
# Verifies that we return the first error occurred, even
# if more than one of the input futures completed
# exceptionally.
f1, f2, f3 = Future(), Future(), Future()
combined = combine_futures([f1, f2, f3])

e1 = RuntimeError("expected")
f1.set_exception(e1)
self.assertFalse(combined.done())
e2 = RuntimeError("expected2")
f2.set_exception(e2)
self.assertFalse(combined.done())
e3 = RuntimeError("expected3")
f3.set_exception(e3)
self.assertTrue(combined.done())
self.assertFalse(combined.is_success())
self.assertEqual(e1, combined.exception())


class MakeBlockingTest(unittest.TestCase):
class Calculator(object):
Expand Down

0 comments on commit e9dfb0a

Please sign in to comment.