Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAX_CYCLE_LIFETIME_IN_SECONDS only for cycle logic #523

Merged
merged 11 commits into from
Jan 10, 2025
144 changes: 88 additions & 56 deletions src/modules/submodules/oracle_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from src import variables
from src.types import SlotNumber, BlockStamp, BlockRoot


logger = logging.getLogger(__name__)


Expand All @@ -35,6 +34,39 @@ class ModuleExecuteDelay(Enum):
NEXT_FINALIZED_EPOCH = 1


def _handle_error(error):
"""Handle exceptions and log messages based on exception type."""
error_mapping = {
IsNotMemberException: 'Provided account is not part of Oracle`s committee.',
IncompatibleOracleVersion: 'Incompatible Contract version. Please update Oracle Daemon.',
DecoratorTimeoutError: 'Oracle module do not respond.',
NoActiveProviderError: 'No active provider available.',
RequestsConnectionError: 'Connection error.',
NotOkResponse: 'Invalid response from server.',
(NoSlotsAvailable, SlotNotFinalized, InconsistentData): 'Inconsistent response from consensus layer node.',
KeysOutdatedException: 'Keys API service returns outdated data.',
CountOfKeysDiffersException: 'Keys API service returned incorrect number of keys.',
Web3Exception: 'Web3py exception.',
IPFSError: 'IPFS provider error.',
ValueError: 'Unexpected error.',
}

for exception_type, message in error_mapping.items():
if isinstance(error, exception_type):
if isinstance(error, NotOkResponse):
logger.error({'msg': ''.join(traceback.format_exception(error))})
return
# Reraise specific exceptions
if isinstance(error, (IsNotMemberException, IncompatibleOracleVersion)):
logger.error({'msg': message})
raise error
logger.error({'msg': message, 'error': str(error)})
return # Handled exception; no further action needed

# Reraise unhandled exceptions
raise error


class BaseModule(ABC):
"""
Base skeleton for Oracle modules.
Expand All @@ -57,25 +89,51 @@ def run_as_daemon(self):
logger.debug({'msg': 'Startup new cycle.'})
self.cycle_handler()

@timeout(variables.MAX_CYCLE_LIFETIME_IN_SECONDS)
def cycle_handler(self):
blockstamp = self._receive_last_finalized_slot()

if blockstamp.slot_number > self._slot_threshold:
if self.w3.lido_contracts.has_contract_address_changed():
clear_global_cache()
self.refresh_contracts()
result = self.run_cycle(blockstamp)

if result is ModuleExecuteDelay.NEXT_FINALIZED_EPOCH:
self._slot_threshold = blockstamp.slot_number
else:
logger.info({
'msg': 'Skipping the report. Wait for new finalized slot.',
'slot_threshold': self._slot_threshold,
})

logger.info({'msg': f'Cycle end. Sleep for {variables.CYCLE_SLEEP_IN_SECONDS} seconds.'})
self._cycle()
self._sleep_cycle()

@timeout(variables.MAX_CYCLE_LIFETIME_IN_SECONDS)
def _cycle(self):
"""
Main cycle logic: fetch the last finalized slot, refresh contracts if necessary,
and execute the module's business logic.
"""
try:
blockstamp = self._receive_last_finalized_slot()

# Check if the blockstamp is below the threshold and exit early
if blockstamp.slot_number <= self._slot_threshold:
logger.info({
'msg': 'Skipping the report. Waiting for new finalized slot.',
'slot_threshold': self._slot_threshold,
})
return

self.refresh_contracts_if_address_change()
self.run_cycle(blockstamp)
except (
IsNotMemberException,
IncompatibleOracleVersion,
DecoratorTimeoutError,
NoActiveProviderError,
RequestsConnectionError,
NotOkResponse,
NoSlotsAvailable,
SlotNotFinalized,
InconsistentData,
KeysOutdatedException,
CountOfKeysDiffersException,
Web3Exception,
IPFSError,
ValueError
) as exception:
_handle_error(exception)

@staticmethod
def _sleep_cycle():
"""Handles sleeping between cycles based on the configured cycle sleep time."""
logger.info({'msg': f'Cycle end. Sleeping for {variables.CYCLE_SLEEP_IN_SECONDS} seconds.'})
time.sleep(variables.CYCLE_SLEEP_IN_SECONDS)

def _receive_last_finalized_slot(self) -> BlockStamp:
Expand All @@ -87,44 +145,12 @@ def _receive_last_finalized_slot(self) -> BlockStamp:
ORACLE_BLOCK_NUMBER.labels('finalized').set(bs.block_number)
return bs

def run_cycle(self, blockstamp: BlockStamp) -> ModuleExecuteDelay:
# pylint: disable=too-many-branches
def run_cycle(self, blockstamp: BlockStamp):
logger.info({'msg': 'Execute module.', 'value': blockstamp})

try:
result = self.execute_module(blockstamp)
except IsNotMemberException as exception:
logger.error({'msg': 'Provided account is not part of Oracle`s committee.'})
raise exception
except IncompatibleOracleVersion as exception:
logger.error({'msg': 'Incompatible Contract version. Please update Oracle Daemon.'})
raise exception
except DecoratorTimeoutError as exception:
logger.error({'msg': 'Oracle module do not respond.', 'error': str(exception)})
except NoActiveProviderError as error:
logger.error({'msg': ''.join(traceback.format_exception(error))})
except RequestsConnectionError as error:
logger.error({'msg': 'Connection error.', 'error': str(error)})
except NotOkResponse as error:
logger.error({'msg': ''.join(traceback.format_exception(error))})
except (NoSlotsAvailable, SlotNotFinalized, InconsistentData) as error:
logger.error({'msg': 'Inconsistent response from consensus layer node.', 'error': str(error)})
except KeysOutdatedException as error:
logger.error({'msg': 'Keys API service returns outdated data.', 'error': str(error)})
except CountOfKeysDiffersException as error:
logger.error({'msg': 'Keys API service returned incorrect number of keys.', 'error': str(error)})
except Web3Exception as error:
logger.error({'msg': 'Web3py exception.', 'error': str(error)})
except IPFSError as error:
logger.error({'msg': 'IPFS provider error.', 'error': str(error)})
except ValueError as error:
logger.error({'msg': 'Unexpected error.', 'error': str(error)})
else:
# if there are no exceptions, then pulse
pulse()
return result

return ModuleExecuteDelay.NEXT_SLOT
result = self.execute_module(blockstamp)
pulse()
if result is ModuleExecuteDelay.NEXT_FINALIZED_EPOCH:
self._slot_threshold = blockstamp.slot_number

@abstractmethod
def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecuteDelay:
Expand All @@ -140,3 +166,9 @@ def execute_module(self, last_finalized_blockstamp: BlockStamp) -> ModuleExecute
def refresh_contracts(self):
"""This method called if contracts addresses were changed"""
raise NotImplementedError('Module should implement this method.') # pragma: no cover

def refresh_contracts_if_address_change(self):
# Refresh contracts if the address has changed
if self.w3.lido_contracts.has_contract_address_changed():
clear_global_cache()
self.refresh_contracts()
68 changes: 39 additions & 29 deletions tests/modules/submodules/test_oracle_module.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import Mock
from unittest.mock import Mock, patch, MagicMock
from typing import Type

import pytest
Expand Down Expand Up @@ -92,41 +92,51 @@ def _throw_on_third_call():
@pytest.mark.parametrize(
"ex",
[
DecoratorTimeoutError,
NoActiveProviderError,
RequestsConnectionError,
NotOkResponse,
NoSlotsAvailable,
SlotNotFinalized,
InconsistentData,
KeysOutdatedException,
DecoratorTimeoutError("Fake exception"),
NoActiveProviderError("Fake exception"),
RequestsConnectionError("Fake exception"),
NotOkResponse(status=500, text="Fake exception"),
NoSlotsAvailable("Fake exception"),
SlotNotFinalized("Fake exception"),
InconsistentData("Fake exception"),
KeysOutdatedException("Fake exception"),
],
ids=lambda param: f"{type(param).__name__}",
)
def test_run_cycle_no_fail_on_retryable_error(oracle: BaseModule, ex: Type[Exception]):
def _throw_with(*args):
if ex is NotOkResponse:
raise ex(status=500, text="Fake exception") # type: ignore
raise ex("Fake exception")

oracle.execute_module = Mock(side_effect=_throw_with)

ret = oracle.run_cycle(ReferenceBlockStampFactory.build())
assert ret is ModuleExecuteDelay.NEXT_SLOT
def test_cycle_no_fail_on_retryable_error(oracle: BaseModule, ex: Exception):
oracle.w3.lido_contracts = MagicMock()
with patch.object(
oracle, "_receive_last_finalized_slot", return_value=MagicMock(slot_number=1111111)
), patch.object(oracle.w3.lido_contracts, "has_contract_address_changed", return_value=False), patch.object(
oracle, "execute_module", side_effect=ex
):
oracle._cycle()
# test node availability
with patch.object(oracle, "_receive_last_finalized_slot", side_effect=ex):
oracle._cycle()


@pytest.mark.unit
@pytest.mark.parametrize(
"ex",
[
IsNotMemberException,
IncompatibleOracleVersion,
IsNotMemberException("Fake exception"),
IncompatibleOracleVersion("Fake exception"),
],
ids=lambda param: f"{type(param).__name__}",
)
def test_run_cycle_fails_on_critical_exceptions(oracle: BaseModule, ex: Type[Exception]):
def _throw_with(*args):
raise ex("Fake exception")

oracle.execute_module = Mock(side_effect=_throw_with)

with pytest.raises(ex, match="Fake exception"):
oracle.run_cycle(ReferenceBlockStampFactory.build())
def test_run_cycle_fails_on_critical_exceptions(oracle: BaseModule, ex: Exception):
oracle.w3.lido_contracts = MagicMock()
with patch.object(
oracle, "_receive_last_finalized_slot", return_value=MagicMock(slot_number=1111111)
), patch.object(oracle.w3.lido_contracts, "has_contract_address_changed", return_value=False), patch.object(
oracle, "execute_module", side_effect=ex
), pytest.raises(
type(ex), match="Fake exception"
):
oracle._cycle()
# test node availability
with patch.object(oracle, "_receive_last_finalized_slot", side_effect=ex), pytest.raises(
type(ex), match="Fake exception"
):
oracle._cycle()
Loading