Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

director-v2's dynamic scheduler monitoring task runs exclusively #5401

Merged
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
notifier,
osparc_variables_substitutions,
rabbitmq,
redis,
resource_usage_tracker_client,
socketio,
storage,
Expand Down Expand Up @@ -165,6 +166,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
rabbitmq.setup(app)

if dynamic_scheduler_enabled:
redis.setup(app)
dynamic_sidecar.setup(app)
api_keys_manager.setup(app)
socketio.setup(app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import Final

from models_library.projects_networks import DockerNetworkName
Expand All @@ -10,8 +11,9 @@
class DynamicServicesSchedulerSettings(BaseCustomSettings):
DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED: bool = True

DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS: PositiveFloat = Field(
5.0, description="interval at which the scheduler cycle is repeated"
DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL: timedelta = Field(
timedelta(seconds=5),
description="interval at which the scheduler cycle is repeated",
)

DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S: PositiveFloat = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
from models_library.users import UserID
from pydantic import BaseModel, StrBytes, parse_obj_as
from servicelib.rabbitmq import RabbitMQRPCClient
from servicelib.redis import RedisClientSDK
from servicelib.redis import RedisClientSDK, RedisClientsManager
from settings_library.redis import RedisDatabase

from ..core.settings import AppSettings
from ..utils.base_distributed_identifier import BaseDistributedIdentifierManager
from .rabbitmq import get_rabbitmq_rpc_client

Expand Down Expand Up @@ -146,13 +145,11 @@ def _get_api_keys_manager(app: FastAPI) -> APIKeysManager:

def setup(app: FastAPI) -> None:
async def on_startup() -> None:
settings: AppSettings = app.state.settings
redis_clients_manager: RedisClientsManager = app.state.redis_clients_manager

redis_dsn = settings.REDIS.build_redis_dsn(
RedisDatabase.DISTRIBUTED_IDENTIFIERS
app.state.api_keys_manager = manager = APIKeysManager(
app, redis_clients_manager.client(RedisDatabase.DISTRIBUTED_IDENTIFIERS)
)
redis_client_sdk = RedisClientSDK(redis_dsn)
app.state.api_keys_manager = manager = APIKeysManager(app, redis_client_sdk)

await manager.setup()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def observing_single_service(
# docker swarm engine API.
_trigger_every_30_seconds(
scheduler._observation_counter, # pylint:disable=protected-access # noqa: SLF001
dynamic_scheduler.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS,
dynamic_scheduler.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL.total_seconds(),
)
and await is_dynamic_sidecar_stack_missing(
scheduler_data.node_uuid, dynamic_scheduler.SWARM_STACK_NAME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import contextlib
import functools
import logging
from asyncio import Lock, Queue, Task, sleep
from asyncio import Lock, Queue, Task
from dataclasses import dataclass, field
from typing import Final

Expand All @@ -36,9 +36,16 @@
from models_library.users import UserID
from models_library.wallets import WalletID
from pydantic import AnyHttpUrl, NonNegativeFloat
from servicelib.background_task import cancel_task
from servicelib.background_task import (
cancel_task,
start_periodic_task,
stop_periodic_task,
)
from servicelib.fastapi.long_running_tasks.client import ProgressCallback
from servicelib.fastapi.long_running_tasks.server import TaskProgress
from servicelib.redis import RedisClientsManager
from servicelib.redis_utils import exclusive
from settings_library.redis import RedisDatabase

from .....core.dynamic_services_settings.scheduler import (
DynamicServicesSchedulerSettings,
Expand Down Expand Up @@ -74,7 +81,6 @@ class Scheduler( # pylint: disable=too-many-instance-attributes, too-many-publi
_service_observation_task: dict[ServiceName, asyncio.Task | object | None] = field(
default_factory=dict
)
_keep_running: bool = False
_inverse_search_mapping: dict[NodeID, ServiceName] = field(default_factory=dict)
_scheduler_task: Task | None = None
_cleanup_volume_removal_services_task: Task | None = None
Expand All @@ -85,10 +91,23 @@ class Scheduler( # pylint: disable=too-many-instance-attributes, too-many-publi
async def start(self) -> None:
# run as a background task
logger.info("Starting dynamic-sidecar scheduler")
self._keep_running = True
self._scheduler_task = asyncio.create_task(
self._run_scheduler_task(), name="dynamic-scheduler"

redis_clients_manager: RedisClientsManager = (
self.app.state.redis_clients_manager
)

settings: DynamicServicesSchedulerSettings = (
self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER
)
self._scheduler_task = start_periodic_task(
exclusive(
redis_clients_manager.client(RedisDatabase.LOCKS),
lock_key="director-v2_dynamic-scheduler_task",
)(self._run_scheduler_task),
interval=settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL,
task_name="dynamic-scheduler",
)

self._trigger_observation_queue_task = asyncio.create_task(
self._run_trigger_observation_queue_task(),
name="dynamic-scheduler-trigger-obs-queue",
Expand All @@ -102,7 +121,6 @@ async def start(self) -> None:

async def shutdown(self) -> None:
logger.info("Shutting down dynamic-sidecar scheduler")
self._keep_running = False
self._inverse_search_mapping = {}
self._to_observe = {}

Expand All @@ -113,9 +131,7 @@ async def shutdown(self) -> None:
self._cleanup_volume_removal_services_task = None

if self._scheduler_task is not None:
self._scheduler_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._scheduler_task
await stop_periodic_task(self._scheduler_task, timeout=5)
self._scheduler_task = None

if self._trigger_observation_queue_task is not None:
Expand Down Expand Up @@ -553,32 +569,20 @@ async def _run_trigger_observation_queue_task(self) -> None:
logger.info("Scheduler 'trigger observation queue task' was shut down")

async def _run_scheduler_task(self) -> None:
settings: DynamicServicesSchedulerSettings = (
self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER
)
logger.debug(
"dynamic-sidecars observation interval %s",
settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS,
)

while self._keep_running:
logger.debug("Observing dynamic-sidecars %s", list(self._to_observe.keys()))
logger.debug("Observing dynamic-sidecars %s", list(self._to_observe.keys()))

try:
# prevent access to self._to_observe
async with self._lock:
for service_name in self._to_observe:
self._enqueue_observation_from_service_name(service_name)
except asyncio.CancelledError: # pragma: no cover
logger.info("Stopped dynamic scheduler")
raise
except Exception: # pylint: disable=broad-except
logger.exception(
"Unexpected error while scheduling sidecars observation"
)

await sleep(settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS)
self._observation_counter += 1
try:
# prevent access to self._to_observe
async with self._lock:
for service_name in self._to_observe:
self._enqueue_observation_from_service_name(service_name)
except asyncio.CancelledError: # pragma: no cover
logger.info("Stopped dynamic scheduler")
raise
except Exception: # pylint: disable=broad-except
logger.exception("Unexpected error while scheduling sidecars observation")

self._observation_counter += 1

async def free_reserved_disk_space(self, node_id: NodeID) -> None:
sidecars_client: SidecarsClient = await get_sidecars_client(self.app, node_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from fastapi import FastAPI
from servicelib.redis import RedisClientsManager
from settings_library.redis import RedisDatabase

from ..core.settings import AppSettings


def setup(app: FastAPI) -> None:
    """Wire Redis client lifecycle into the FastAPI application.

    On application startup a ``RedisClientsManager`` covering the LOCKS and
    DISTRIBUTED_IDENTIFIERS databases is created from ``app.state.settings.REDIS``,
    stored in ``app.state.redis_clients_manager`` and initialized. On shutdown
    the same manager is torn down.
    """

    async def _create_redis_clients() -> None:
        app_settings: AppSettings = app.state.settings
        manager = RedisClientsManager(
            databases={RedisDatabase.LOCKS, RedisDatabase.DISTRIBUTED_IDENTIFIERS},
            settings=app_settings.REDIS,
        )
        # keep a reference on app.state so other modules can fetch clients
        app.state.redis_clients_manager = manager
        await manager.setup()

    async def _close_redis_clients() -> None:
        manager: RedisClientsManager = app.state.redis_clients_manager
        await manager.shutdown()

    app.add_event_handler("startup", _create_redis_clients)
    app.add_event_handler("shutdown", _close_redis_clients)
40 changes: 40 additions & 0 deletions services/director-v2/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=unused-argument
# pylint: disable=unused-variable

import functools
import json
import logging
import os
Expand Down Expand Up @@ -291,3 +292,42 @@ def mocked_service_awaits_manual_interventions(mocker: MockerFixture) -> None:
autospec=True,
return_value=False,
)


@pytest.fixture
def mock_redis(mocker: MockerFixture) -> None:
    """Replace the redis module's setup with a stub.

    The stub registers a startup handler that installs an ``AsyncMock`` as
    ``app.state.redis_clients_manager``; its ``client(...)`` method hands out
    a fresh ``AsyncMock`` per call, so no real Redis connection is made.
    """

    def _fake_setup(app: FastAPI) -> None:
        manager_mock = AsyncMock()

        def _fake_client(*args, **kwargs) -> AsyncMock:
            # each requested database client is an independent AsyncMock
            return AsyncMock()

        manager_mock.client = _fake_client

        async def _install_manager() -> None:
            app.state.redis_clients_manager = manager_mock

        app.add_event_handler("startup", _install_manager)

    mocker.patch(
        "simcore_service_director_v2.modules.redis.setup", side_effect=_fake_setup
    )


@pytest.fixture
def mock_exclusive(mock_redis: None, mocker: MockerFixture) -> None:
    """Patch the scheduler's ``exclusive`` decorator with a pass-through.

    Tests then run the scheduler task without acquiring a distributed Redis
    lock: the replacement accepts the same arguments but merely awaits the
    wrapped coroutine.
    """

    def _passthrough_exclusive(
        _: Any, *, lock_key: str, lock_value: bytes | str | None = None
    ):
        def _decorate(coro_fn):
            @functools.wraps(coro_fn)
            async def _unlocked(*args, **kwargs):
                # no locking — just delegate to the wrapped coroutine
                return await coro_fn(*args, **kwargs)

            return _unlocked

        return _decorate

    mocker.patch(
        "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._scheduler"
        ".exclusive",
        side_effect=_passthrough_exclusive,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ async def ensure_services_stopped(
assert delete_result is True

scheduler_interval = (
director_v2_client.application.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS
director_v2_client.application.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL
)
# sleep enough to ensure the observation cycle properly stopped the service
await asyncio.sleep(2 * scheduler_interval)
await asyncio.sleep(2 * scheduler_interval.total_seconds())

await ensure_network_cleanup(docker_client, project_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ async def ensure_services_stopped(

# pylint: disable=protected-access
scheduler_interval = (
minimal_app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS
minimal_app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL
)
# sleep enough to ensure the observation cycle properly stopped the service
await asyncio.sleep(2 * scheduler_interval)
await asyncio.sleep(2 * scheduler_interval.total_seconds())
await ensure_network_cleanup(docker_client, project_id)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

@pytest.fixture
def mock_env(
mock_exclusive: None,
disable_rabbitmq: None,
disable_postgres: None,
mock_env: EnvVarsDict,
Expand Down Expand Up @@ -73,10 +74,7 @@ def mock_free_reserved_disk_space(mocker: MockerFixture) -> None:
async def mock_sidecar_api(
scheduler_data: SchedulerData,
) -> AsyncIterator[None]:
with respx.mock(
assert_all_called=False,
assert_all_mocked=True,
) as respx_mock:
with respx.mock(assert_all_called=False, assert_all_mocked=True) as respx_mock:
respx_mock.get(f"{scheduler_data.endpoint}/health", name="is_healthy").respond(
json={"is_healthy": True}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import urllib.parse
from collections.abc import AsyncGenerator, Awaitable, Callable, Iterator
from contextlib import asynccontextmanager, contextmanager
from typing import Final
from unittest.mock import AsyncMock

import pytest
Expand All @@ -20,6 +21,7 @@
from models_library.service_settings_labels import SimcoreServiceLabels
from models_library.services_enums import ServiceState
from models_library.wallets import WalletID
from pydantic import NonNegativeFloat
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from respx.router import MockRouter
Expand Down Expand Up @@ -52,7 +54,7 @@

# running scheduler at a hight rate to stress out the system
# and ensure faster tests
TEST_SCHEDULER_INTERVAL_SECONDS = 0.1
_TEST_SCHEDULER_INTERVAL_SECONDS: Final[NonNegativeFloat] = 0.1

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -125,6 +127,7 @@ async def _assert_get_dynamic_services_mocked(

@pytest.fixture
def mock_env(
mock_exclusive: None,
disable_postgres: None,
disable_rabbitmq: None,
disable_api_keys_manager: None,
Expand All @@ -136,8 +139,7 @@ def mock_env(
monkeypatch.setenv("SIMCORE_SERVICES_NETWORK_NAME", simcore_services_network_name)
monkeypatch.setenv("DIRECTOR_HOST", "mocked_out")
monkeypatch.setenv(
"DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS",
str(TEST_SCHEDULER_INTERVAL_SECONDS),
"DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL", f"{_TEST_SCHEDULER_INTERVAL_SECONDS}"
)
monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "true")
monkeypatch.setenv("S3_ENDPOINT", "endpoint")
Expand Down Expand Up @@ -346,9 +348,11 @@ async def test_collition_at_global_level_raises(
mocked_dynamic_scheduler_events: None,
mock_docker_api: None,
):
scheduler.scheduler._inverse_search_mapping[ # noqa: SLF001
scheduler.scheduler._inverse_search_mapping[
scheduler_data.node_uuid
] = ServiceName("mock_service_name")
] = ServiceName( # noqa: SLF001
"mock_service_name"
)
with pytest.raises(DynamicSidecarError) as execinfo:
await scheduler.scheduler.add_service_from_scheduler_data(scheduler_data)
assert "collide" in str(execinfo.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def mock_env(

monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "true")
monkeypatch.setenv(
"DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS",
"DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL",
f"{SCHEDULER_INTERVAL_SECONDS}",
)

Expand Down Expand Up @@ -229,6 +229,7 @@ def mock_projects_repository(mocker: MockerFixture, node_present_in_db: bool) ->


async def test_skip_observation_cycle_after_error(
mock_exclusive: None,
docker_swarm: None,
minimal_app: FastAPI,
mock_projects_repository: None,
Expand Down
Loading
Loading