♻️ volumes are removed via agent (⚠️ devops) #3941

Closed
Changes from 90 commits
165 commits
ed19ad0
first version of the RPC client via rabbitmq
Feb 22, 2023
ec3b363
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 22, 2023
46e84d3
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 23, 2023
7c37064
added a server namespace
Feb 23, 2023
7b24567
moved robust rpc to rabbitmq
Feb 23, 2023
eeb30de
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 23, 2023
a91bd89
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 23, 2023
8b3a0a3
rabbitmq rpc refactor
Feb 24, 2023
b7a7742
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 27, 2023
b4890bd
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Feb 28, 2023
bebdbdb
added extra test
Feb 28, 2023
6a5805d
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 1, 2023
d1cfab7
fix error when closing channel
Mar 1, 2023
dce584f
add missing type
Mar 1, 2023
6d7ad01
some more progress
Mar 1, 2023
8616e3b
remove extension module
Mar 2, 2023
26f4708
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 2, 2023
1241850
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 2, 2023
c0a1d8a
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 3, 2023
b929d93
added get_namespace
Mar 3, 2023
fe3239b
Merge remote-tracking branch 'origin/pr-osparc-aiopika-solidrpc' into…
Mar 3, 2023
839b84e
refactor
Mar 7, 2023
4ab92e0
added registration helper
Mar 7, 2023
9eeb9bf
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 7, 2023
0ebee22
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 7, 2023
fbd46da
added new module for removing volumes
Mar 7, 2023
9de8990
moved docker volume
Mar 7, 2023
72e1d94
rename test
Mar 7, 2023
22dd56e
injecting a unique nodeid
Mar 7, 2023
1ba1939
adding missing default node id
Mar 7, 2023
2109c67
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 8, 2023
4f1d38b
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 8, 2023
d5f0eb5
refactor import
Mar 8, 2023
fddef59
refactor
Mar 8, 2023
993cef8
refactor
Mar 8, 2023
ca3bdc2
refactor
Mar 8, 2023
d7cf000
moved to helpers
Mar 8, 2023
13fef45
refactor names
Mar 8, 2023
d8f7bcb
refactor
Mar 8, 2023
50a9c17
fix utility
Mar 8, 2023
58b292e
replaced helpers with pydantic models
Mar 8, 2023
0603de9
using hostname
Mar 9, 2023
ede5a09
Merge branch 'master' into pr-osparc-aiopika-solidrpc
GitHK Mar 9, 2023
f6c12cd
refactor task_monitor
Mar 9, 2023
716762d
rename
Mar 9, 2023
2a7b2bb
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 9, 2023
05ebcad
Merge remote-tracking branch 'origin/pr-osparc-aiopika-solidrpc' into…
Mar 9, 2023
5a79662
Merge remote-tracking branch 'upstream/master' into pr-osparc-aiopika…
Mar 9, 2023
c9b2d0c
Merge remote-tracking branch 'origin/pr-osparc-aiopika-solidrpc' into…
Mar 9, 2023
47db156
updated libraries
Mar 9, 2023
372dde0
agent exposes rpc method to remove volume
Mar 9, 2023
82de021
remove defaults
Mar 9, 2023
b70e0ef
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 9, 2023
e31ab56
not compatible with current
Mar 9, 2023
bbbcfd2
making more clear what is happening
Mar 10, 2023
722f7e2
making rabbitmq required
Mar 10, 2023
fcb27d0
rabbitmq is required not optional
Mar 10, 2023
c34f164
rabbitmq required for agent
Mar 10, 2023
c9fd018
refactor removal via agent
Mar 10, 2023
829538b
revert todo
Mar 10, 2023
9aeb52c
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 10, 2023
5d5ba45
fix bug
Mar 10, 2023
8ef3dff
moved test to integration
Mar 10, 2023
bc08176
refactor upgrade deprecation
Mar 10, 2023
13a0431
reverting
Mar 10, 2023
0cf893d
update requirements
Mar 10, 2023
43d3a6b
replacing error with warning
Mar 10, 2023
1cb5547
refactor
Mar 10, 2023
1086b1f
refactor
Mar 10, 2023
5f7a8c7
revert rabbitmq mandatory
Mar 10, 2023
c9d2682
refactor volume removal to support concurrent requests
Mar 10, 2023
05d5bd1
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 10, 2023
3af40f1
fix docstring
Mar 10, 2023
73d0d5c
added more tests
Mar 10, 2023
5902de6
fix broken tests
Mar 10, 2023
bdd5696
fix broken test
Mar 10, 2023
a1f2166
gathering all errors and raising them
Mar 10, 2023
7bd5347
fixed issue with double try to remove volumes
Mar 13, 2023
b972bf3
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 13, 2023
04296fd
fixed errors
Mar 13, 2023
dd872b1
rename fixture
Mar 13, 2023
4ccdcc3
attempt to fix failing test
Mar 13, 2023
d5de4cc
trying to make test more reliable
Mar 13, 2023
b941db0
not an error
Mar 13, 2023
c9ecaa1
fix test
Mar 13, 2023
c2e3336
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 13, 2023
9e9da7e
adding timeout for parallel operation
Mar 13, 2023
4ca56fe
making call resilient
Mar 13, 2023
ccb86fa
fix import order
Mar 13, 2023
8abadde
rabbitmq data is persisted to volume
Mar 13, 2023
b845efc
fixing log message
Mar 14, 2023
dc89358
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 14, 2023
ea12a8c
added missing test
Mar 14, 2023
34206d5
renamed timeouts and updated docs
Mar 14, 2023
c09d2e1
fixed wrong error matching
Mar 14, 2023
6567b74
refactor to use individual timeouts
Mar 14, 2023
d3af6b2
refactor agent rabbit test
Mar 14, 2023
7694bca
refactor using explicit timeouts
Mar 14, 2023
9e922c4
added policy
Mar 14, 2023
61e77aa
using correct wait policy
Mar 14, 2023
460868f
renamed to RPCExceptionGroup
Mar 14, 2023
d7f0cac
refactor volume removal request
Mar 14, 2023
e08dbfa
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Mar 14, 2023
91dadda
using swarm_stack_name
Apr 19, 2023
d9d0a32
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Apr 19, 2023
401a8e3
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Jun 1, 2023
ee02e63
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Jun 5, 2023
a32a84c
refactor after merge
Jun 5, 2023
6fbacb5
refactor
Jun 5, 2023
4b4f9f0
fixed agent
Jun 5, 2023
a2f5727
fixed issue in servicelib
Jun 5, 2023
0869bae
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Jun 5, 2023
32d9f47
refactor tests
Jun 5, 2023
1974ee0
making test portable
Jun 5, 2023
42611a2
refactor
Jun 5, 2023
40e242b
refactor
Jun 5, 2023
c12a46e
skipping failing test
Jun 5, 2023
22fc0d7
reverting
Jun 5, 2023
7534986
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Jun 6, 2023
b2295f7
added base serial executor
Jun 6, 2023
4ace2b4
added VolumeUtils
Jun 6, 2023
cfc0b53
add is_volume_present
Jun 7, 2023
5fc9348
upgraded serial executor
Jun 7, 2023
fb06df7
added parallel different key
Jun 7, 2023
0b32885
refactor dy_sidecar shared store
Jun 7, 2023
941221f
removed
Jun 7, 2023
537eec9
renamed
Jun 7, 2023
98970eb
refactored tests
Jun 7, 2023
3ea3bd4
fixed dy-sidecar
Jun 8, 2023
14e8ecc
added legacy format
Jun 8, 2023
40a433e
refactor _core
Jun 14, 2023
f9df804
remove comments
Jun 14, 2023
c132d9b
refactor
Jun 14, 2023
ad73c4e
refactor
Jun 14, 2023
a6880be
removed
Jun 14, 2023
516a0e9
refactor
Jun 14, 2023
f594433
refactor volume removal
Jun 14, 2023
157d46d
refactor to accept timeout
Jun 14, 2023
72709bf
no longer required
Jun 14, 2023
be43fbf
refactor to use new internals
Jun 14, 2023
a112e08
removed cast
Jun 14, 2023
7ccc90f
refactored
Jun 14, 2023
354e627
using correct import
Jun 14, 2023
ed47a82
fixed imports
Jun 14, 2023
52d9cb5
refactor test
Jun 14, 2023
25d066e
refactored task_monitor again
Jun 14, 2023
724a017
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Jun 14, 2023
1dab67a
refactor
Jun 14, 2023
1d99c0a
moved task registration to separate module
Jun 14, 2023
4094ed4
refactor
Jun 14, 2023
3a9ffac
refactor test
Jun 15, 2023
74deb28
extract to utils
Jun 15, 2023
0d32514
refactor
Jun 15, 2023
586c8d5
more refactoring
Jun 15, 2023
423ba11
refactored rabbitmq to work with volumes
Jun 15, 2023
acecb5a
refactor tests and removal
Jun 15, 2023
4f58a3f
refactor to pull image
Jun 16, 2023
ecc33f6
drop comment
Jun 16, 2023
5cc16ad
restructure import paths
Jun 16, 2023
ddd74e7
refactor
Jun 16, 2023
571db81
refactor
Jun 16, 2023
e49bd7e
update message
Jun 16, 2023
422071f
refactor
Jun 16, 2023
169672c
refactored more tests
Jun 16, 2023
de6916d
Merge remote-tracking branch 'upstream/master' into pr-osparc-remove-…
Jun 16, 2023
66 changes: 46 additions & 20 deletions packages/service-library/src/servicelib/rabbitmq.py
@@ -6,11 +6,17 @@
from typing import Any, Awaitable, Callable, Final, Optional

import aio_pika
from aio_pika.exceptions import ChannelClosed
from aio_pika.patterns import RPC
from pydantic import PositiveInt
from aiormq import AMQPConnectionError
from packaging.version import Version
from pydantic import PositiveFloat
from servicelib.logging_utils import log_context
from settings_library.rabbit import RabbitSettings
from tenacity._asyncio import AsyncRetrying
from tenacity.before import before_log
from tenacity.retry import retry_if_exception
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random

from .rabbitmq_errors import RemoteMethodNotRegisteredError, RPCNotInitializedError
from .rabbitmq_utils import RPCMethodName, RPCNamespace, RPCNamespacedMethodName
@@ -30,18 +36,12 @@ def _connection_close_callback(sender: Any, exc: Optional[BaseException]) -> Non
)


def _channel_close_callback(sender: Any, exc: Optional[BaseException]) -> None:
def _channel_close_callback(_: Any, exc: Optional[BaseException]) -> None:
if exc:
if isinstance(exc, asyncio.CancelledError):
log.info("Rabbit channel was cancelled")
elif isinstance(exc, ChannelClosed):
log.info("%s", exc)
else:
log.error(
"Rabbit channel closed with exception from %s:%s",
sender,
exc,
)
log.error("Rabbit channel closed with exception from %s", exc)


async def _get_connection(
@@ -93,7 +93,13 @@ async def rpc_initialize(self) -> None:
)
self._rpc_channel = await self._rpc_connection.channel()

self._rpc = RPC(self._rpc_channel, host_exceptions=True)
if Version(aio_pika.__version__) >= Version("9.0.4"):
log.warning(
"When all libraries have `aio_pika>=9.0.4` use: "
"`self._rpc = RPC(self._rpc_channel, host_exceptions=True)` "
"on the line below"
)
self._rpc = RPC(self._rpc_channel)
await self._rpc.initialize()

async def close(self) -> None:
@@ -190,13 +196,19 @@ async def rpc_request(
namespace: RPCNamespace,
method_name: RPCMethodName,
*,
timeout_s: Optional[PositiveInt] = 5,
timeout_s: PositiveFloat = 5,
connection_error_timeout_s: PositiveFloat = 60,
**kwargs: dict[str, Any],
) -> Any:
"""
Call a remote registered `handler` by providing its `namespace`, `method_name`
and `kwargs` containing the key value arguments expected by the remote `handler`.

param: `timeout_s` number of seconds to wait for a reply once the message
was accepted by the remote replier
param: `connection_error_timeout_s` number of seconds to wait for rabbit to
be available again in case there was a connection error

:raises asyncio.TimeoutError: when message expired
:raises CancelledError: when called :func:`RPC.cancel`
:raises RuntimeError: internal error
@@ -211,13 +223,21 @@ async def rpc_request(
namespace, method_name
)
try:
queue_expiration_timeout = timeout_s
awaitable = self._rpc.call(
namespaced_method_name,
expiration=queue_expiration_timeout,
kwargs=kwargs,
)
return await asyncio.wait_for(awaitable, timeout=timeout_s)
async for attempt in AsyncRetrying(
wait=wait_random(2),
stop=stop_after_delay(connection_error_timeout_s),
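# retry_if_exception expects a predicate; passing the class itself would retry on any exception
retry=retry_if_exception(lambda e: isinstance(e, AMQPConnectionError)),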
before=before_log(log, logging.DEBUG),
reraise=True,
):
with attempt:
queue_expiration_timeout = timeout_s
awaitable = self._rpc.call(
namespaced_method_name,
expiration=queue_expiration_timeout,
kwargs=kwargs,
)
return await asyncio.wait_for(awaitable, timeout=timeout_s)
except aio_pika.MessageProcessError as e:
if e.args[0] == "Message has been returned":
raise RemoteMethodNotRegisteredError(
@@ -239,8 +259,14 @@ async def rpc_register_handler(
if self._rpc is None:
raise RPCNotInitializedError()

namespaced_method_name = RPCNamespacedMethodName.from_namespace_and_method(
namespace, method_name
)
log.info(
"RPC registered handler '%s' to queue '%s'", handler, namespaced_method_name
)
await self._rpc.register(
RPCNamespacedMethodName.from_namespace_and_method(namespace, method_name),
namespaced_method_name,
handler,
auto_delete=True,
)
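
The retry loop added to `rpc_request` retries on connection errors, waits a random interval between attempts, and gives up once a total delay has elapsed. The following is a minimal stdlib-only sketch of that pattern, not the tenacity-based code from this PR. `FakeConnectionError`, `retry_on_connection_error`, and `flaky_call` are hypothetical names introduced only for illustration.

```python
import asyncio
import random
import time
from typing import Any, Awaitable, Callable


class FakeConnectionError(Exception):
    """Hypothetical stand-in for a broker connection error."""


async def retry_on_connection_error(
    make_call: Callable[[], Awaitable[Any]],
    *,
    timeout_s: float,
    connection_error_timeout_s: float,
    max_wait_s: float = 2.0,
) -> Any:
    """Retry `make_call` on FakeConnectionError until the deadline passes."""
    deadline = time.monotonic() + connection_error_timeout_s
    while True:
        try:
            # each attempt gets its own reply timeout
            return await asyncio.wait_for(make_call(), timeout=timeout_s)
        except FakeConnectionError:
            if time.monotonic() >= deadline:
                raise  # give up and re-raise the last connection error
            # random wait between attempts, at most max_wait_s seconds
            await asyncio.sleep(random.uniform(0, max_wait_s))


async def _demo() -> tuple:
    attempts = 0

    async def flaky_call() -> str:
        # fails twice, then succeeds, imitating a broker that comes back
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise FakeConnectionError("broker unavailable")
        return "ok"

    result = await retry_on_connection_error(
        flaky_call, timeout_s=1, connection_error_timeout_s=5, max_wait_s=0.01
    )
    return result, attempts


demo_result, demo_attempts = asyncio.run(_demo())
```

Errors other than the connection error, including a reply timeout, are not retried in this sketch and propagate to the caller on the first occurrence.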
10 changes: 10 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq_errors.py
@@ -20,3 +20,13 @@ class RemoteMethodNotRegisteredError(BaseRPCError):
"Could not find a remote method named: '{method_name}'. "
"Message from remote server was returned: {incoming_message}. "
)


class GatheredRuntimeErrors(BaseRPCError):
Member

This is a pretty weird name. What is it for? From your code it looks like you collect all the errors together, but what is the idea?

Contributor Author

`ExceptionGroup` is coming in Python 3.11, and the backport library does not support the syntax for it, so I tried to group the errors and raise them together.
Let me rename it a bit.
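
The idea described in this reply, letting every concurrent task finish and then raising all failures as a single error, can be sketched as follows. This is a hedged illustration and not the PR's implementation. `GroupedErrors`, `run_all`, and `remove` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Iterable, List


class GroupedErrors(Exception):
    """Hypothetical container carrying several errors at once."""

    def __init__(self, errors: List[BaseException]) -> None:
        super().__init__(f"The following errors occurred: {errors}")
        self.errors = errors


async def run_all(awaitables: Iterable[Awaitable[None]]) -> None:
    # return_exceptions=True lets every task finish before errors are inspected
    results = await asyncio.gather(*awaitables, return_exceptions=True)
    errors = [r for r in results if isinstance(r, BaseException)]
    if errors:
        raise GroupedErrors(errors)


async def remove(name: str, fail: bool) -> None:
    # stand-in for a single removal that may fail
    if fail:
        raise RuntimeError(f"could not remove {name}")


async def _demo() -> List[str]:
    try:
        await run_all([remove("a", True), remove("b", False), remove("c", True)])
    except GroupedErrors as e:
        return [str(err) for err in e.errors]
    return []


collected = asyncio.run(_demo())
```

The caller sees one exception but can still inspect each underlying failure through the `errors` attribute.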

"""
Has to be defined in both `requester` and `replier`
source codes to work as expected.
"""

code = f"{_ERROR_PREFIX}.executing_raised_errors"
msg_template = "The following errors occurred: {errors}"
57 changes: 56 additions & 1 deletion packages/service-library/tests/test_rabbitmq_rpc.py
@@ -2,9 +2,11 @@
# pylint:disable=unused-argument

import asyncio
from typing import Any, Awaitable, Final
from typing import Any, Awaitable, Callable, Final

import pytest
from docker.client import DockerClient
from docker.models.containers import Container
from pydantic import NonNegativeInt, ValidationError
from pytest import LogCaptureFixture
from servicelib.rabbitmq import RabbitMQClient
@@ -21,6 +23,8 @@

MULTIPLE_REQUESTS_COUNT: Final[NonNegativeInt] = 100

# FIXTURES


@pytest.fixture
def namespace() -> RPCNamespace:
@@ -43,6 +47,32 @@ async def rabbit_replier(rabbit_service: RabbitSettings) -> RabbitMQClient:
await client.close()


@pytest.fixture
def restart_rabbit(
docker_stack: dict,
testing_environ_vars: dict,
docker_client: DockerClient,
) -> Callable:
prefix = testing_environ_vars["SWARM_STACK_NAME"]
service_name = f"{prefix}_rabbit"
assert service_name in docker_stack["services"]

async def _reboot() -> None:
containers = docker_client.containers.list(
filters={"label": f"com.docker.swarm.service.name={service_name}"}
)
assert len(containers) == 1
container: Container = containers[0]
# killing the container will cause the service to be unavailable
# and swarm to restart it. Exactly what we are trying to test
container.kill()

return _reboot


# UTILS


async def add_me(*, x: Any, y: Any) -> Any:
result = x + y
# NOTE: types are not enforced
@@ -65,6 +95,9 @@ def __add__(self, other: "CustomClass") -> "CustomClass":
return CustomClass(x=self.x + other.x, y=self.y + other.y)


# TESTS


@pytest.mark.parametrize(
"x,y,expected_result,expected_type",
[
@@ -369,3 +402,25 @@ async def _a_handler() -> None:
assert "ensure this value has at most 255 characters" in f"{exec_info.value}"
else:
await rabbit_replier.rpc_register_handler("a", handler_name, _a_handler)


async def test_rabbit_unavailable_just_before_request(
rabbit_requester: RabbitMQClient,
rabbit_replier: RabbitMQClient,
namespace: RPCNamespace,
restart_rabbit: Callable,
):
times_called = 0

async def _func() -> None:
nonlocal times_called
times_called += 1

await rabbit_replier.rpc_register_handler(namespace, _func.__name__, _func)

await restart_rabbit()

# this function will be retried because rabbitmq is restarting
await rabbit_requester.rpc_request(namespace, _func.__name__)

assert times_called == 1
1 change: 1 addition & 0 deletions services/agent/requirements/_base.in
@@ -10,6 +10,7 @@
--requirement ../../../packages/settings-library/requirements/_base.in
--requirement ../../../packages/service-library/requirements/_base.in

aio-pika
aiodocker
fastapi
packaging
8 changes: 5 additions & 3 deletions services/agent/requirements/_base.txt
@@ -1,11 +1,13 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
aio-pika==8.2.5
# via -r requirements/../../../packages/service-library/requirements/_base.in
# via
# -r requirements/../../../packages/service-library/requirements/_base.in
# -r requirements/_base.in
aiodebug==2.3.0
# via -r requirements/../../../packages/service-library/requirements/_base.in
aiodocker==0.21.0
1 change: 1 addition & 0 deletions services/agent/requirements/_test.in
@@ -21,3 +21,4 @@ pytest-asyncio
pytest-cov
pytest-mock
pytest-runner
python-dotenv
2 changes: 2 additions & 0 deletions services/agent/requirements/_test.txt
@@ -207,6 +207,8 @@ python-dateutil==2.8.2
# botocore
# faker
# moto
python-dotenv==1.0.0
# via -r requirements/_test.in
python-jose==3.3.0
# via moto
pyyaml==5.4.1
@@ -15,7 +15,7 @@
SUMMARY,
VERSION,
)
from ..modules import task_monitor
from ..modules import low_priority_managers, rabbitmq, task_monitor
from ._routes import router
from .settings import ApplicationSettings

@@ -50,8 +50,10 @@ def create_app() -> FastAPI:
# ROUTERS
app.include_router(router)

# EVENTS
# SUBMODULES
low_priority_managers.setup(app)
task_monitor.setup(app)
rabbitmq.setup(app)

async def _on_startup() -> None:
print(APP_STARTED_BANNER_MSG, flush=True)
10 changes: 10 additions & 0 deletions services/agent/src/simcore_service_agent/core/errors.py
@@ -0,0 +1,10 @@
from pydantic.errors import PydanticErrorMixin


class AgentRuntimeError(PydanticErrorMixin, RuntimeError):
msg_template: str = "Autoscaling unexpected error"


class ConfigurationError(AgentRuntimeError):
code: str = "agent.application_configuration"
msg_template: str = "Application misconfiguration: {msg}"
4 changes: 4 additions & 0 deletions services/agent/src/simcore_service_agent/core/settings.py
@@ -4,6 +4,7 @@
from pydantic import Field, NonNegativeInt, validator
from settings_library.base import BaseCustomSettings
from settings_library.r_clone import S3Provider
from settings_library.rabbit import RabbitSettings
from settings_library.utils_logging import MixinLoggingSettings

_MINUTE: Final[NonNegativeInt] = 60
@@ -39,6 +40,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
60 * _MINUTE, description="interval at which to repeat volumes cleanup"
)

AGENT_DOCKER_NODE_ID: str = Field(..., description="used by the rabbitmq module")
AGENT_RABBITMQ: Optional[RabbitSettings] = Field(auto_default_from_env=True)
Member

Should it be optional? I think you pretty much rely on this now, no?
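
One way to reconcile an `Optional` setting with code that depends on it is to fail fast at setup time rather than on the first RPC call. The sketch below uses plain dataclasses under stated assumptions: `RabbitConfig`, `Settings`, `require_rabbit`, and this simplified `ConfigurationError` are hypothetical stand-ins, not the pydantic classes from this PR.

```python
from dataclasses import dataclass
from typing import Optional


class ConfigurationError(RuntimeError):
    """Simplified stand-in for the agent's configuration error."""


@dataclass
class RabbitConfig:
    host: str
    port: int = 5672


@dataclass
class Settings:
    # None means the broker was not configured in the environment
    rabbitmq: Optional[RabbitConfig] = None


def require_rabbit(settings: Settings) -> RabbitConfig:
    # raise early instead of failing later on the first RPC call
    if settings.rabbitmq is None:
        raise ConfigurationError(
            "Application misconfiguration: rabbitmq is not configured"
        )
    return settings.rabbitmq
```

If the setting were made mandatory instead, this check would be unnecessary and the misconfiguration would surface when the settings object is built.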


@validator("AGENT_VOLUMES_CLEANUP_S3_ENDPOINT", pre=True)
@classmethod
def ensure_scheme(cls, v: str, values) -> str: