From 81fde538c4e5f677cc7c054e864c937a15312a6b Mon Sep 17 00:00:00 2001 From: Julian Querido Date: Wed, 14 Feb 2024 16:06:53 +0100 Subject: [PATCH 1/6] Allow a user to select from default project thumbnails --- .../dashboard/StudyThumbnailExplorer.js | 4 +- .../class/osparc/editor/ThumbnailEditor.js | 17 +++++-- .../osparc/editor/ThumbnailSuggestions.js | 47 ++++++++++++++----- .../source/class/osparc/theme/Appearance.js | 32 +++++++++++++ .../source/class/osparc/theme/Decoration.js | 21 +++++++++ .../client/source/class/osparc/utils/Utils.js | 12 +++++ 6 files changed, 115 insertions(+), 18 deletions(-) diff --git a/services/static-webserver/client/source/class/osparc/dashboard/StudyThumbnailExplorer.js b/services/static-webserver/client/source/class/osparc/dashboard/StudyThumbnailExplorer.js index be2371d3dd4..fe0565b2342 100644 --- a/services/static-webserver/client/source/class/osparc/dashboard/StudyThumbnailExplorer.js +++ b/services/static-webserver/client/source/class/osparc/dashboard/StudyThumbnailExplorer.js @@ -79,9 +79,9 @@ qx.Class.define("osparc.dashboard.StudyThumbnailExplorer", { __getThumbnailSuggestions: function() { const thumbnailSuggestions = new osparc.editor.ThumbnailSuggestions().set({ minHeight: this.self().THUMBNAIL_SLIDER_HEIGHT, - maxHeight: this.self().THUMBNAIL_SLIDER_HEIGHT, + maxHeight: this.self().THUMBNAIL_SLIDER_HEIGHT + 2, backgroundColor: "transparent", - padding: [3, 0] + padding: 3 }); return thumbnailSuggestions; }, diff --git a/services/static-webserver/client/source/class/osparc/editor/ThumbnailEditor.js b/services/static-webserver/client/source/class/osparc/editor/ThumbnailEditor.js index ec3cfa9ca86..6f040f72efc 100644 --- a/services/static-webserver/client/source/class/osparc/editor/ThumbnailEditor.js +++ b/services/static-webserver/client/source/class/osparc/editor/ThumbnailEditor.js @@ -78,8 +78,7 @@ qx.Class.define("osparc.editor.ThumbnailEditor", { switch (id) { case "url-field": control = new qx.ui.form.TextField().set({ - font: "text-14", - backgroundColor: "background-main", + appearance: "form-input", placeholder: this.tr("url") }); this.bind("url", control, "value"); @@ -87,7 +86,7 @@ qx.Class.define("osparc.editor.ThumbnailEditor", { break; case "thumbnails-layout": { control = new qx.ui.container.Composite(new qx.ui.layout.VBox(5)); - const label = new qx.ui.basic.Label(this.tr("or pick one from the list of services:")); + const label = new qx.ui.basic.Label(this.tr("or pick one from the list below:")); control.add(label); this._add(control, { flex: 1 @@ -96,6 +95,10 @@ qx.Class.define("osparc.editor.ThumbnailEditor", { } case "scroll-thumbnails": { control = new osparc.editor.ThumbnailSuggestions(); + control.set({ + padding: 2, + backgroundColor: "transparent" + }) const thumbnailsLayout = this.getChildControl("thumbnails-layout"); thumbnailsLayout.add(control); break; @@ -109,6 +112,9 @@ qx.Class.define("osparc.editor.ThumbnailEditor", { case "cancel-btn": { const buttons = this.getChildControl("buttons-layout"); control = new qx.ui.form.Button(this.tr("Cancel")); + control.set({ + appearance: "form-button-text" + }); control.addListener("execute", () => this.fireEvent("cancel"), this); buttons.add(control); break; @@ -116,6 +122,9 @@ qx.Class.define("osparc.editor.ThumbnailEditor", { case "save-btn": { const buttons = this.getChildControl("buttons-layout"); control = new qx.ui.form.Button(this.tr("Save")); + control.set({ + appearance: "form-button" + }); control.addListener("execute", () => { const urlField = 
this.getChildControl("url-field"); const validUrl = this.self().sanitizeUrl(urlField.getValue()); @@ -136,7 +145,7 @@ qx.Class.define("osparc.editor.ThumbnailEditor", { thumbnailSuggestions.setSuggestions(suggestions); thumbnailSuggestions.addListener("thumbnailTapped", e => { const thumbnailData = e.getData(); - this.setUrl(thumbnailData["source"]); + this.setUrl(thumbnailData["source"] || thumbnailData.getSource()); }); this.getChildControl("thumbnails-layout").setVisibility(suggestions.length ? "visible" : "excluded"); } diff --git a/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js b/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js index 4e832579c55..03afc330ec6 100644 --- a/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js +++ b/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js @@ -49,17 +49,31 @@ qx.Class.define("osparc.editor.ThumbnailSuggestions", { }, statics: { + defaultTemplates: [ + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/Thumbnail.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/bright_coulomb.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/dynamic_hertz.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/electric_heaviside.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/energetic_ampere.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/glowing_tesla.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/illuminated_volta.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/luminous_ohm.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/magnetic_lorentz.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/radiant_maxwell.png", + "https://raw.githubusercontent.com/ZurichMedTech/s4l-assets/main/app/full/project_thumbnails/vibrant_faraday.png" + ], extractThumbnailSuggestions: function(study) { const suggestions = new Set([]); const wb = study.getWorkbench(); const nodes = wb.getWorkbenchInitData() ? 
wb.getWorkbenchInitData() : wb.getNodes(); Object.values(nodes).forEach(node => { - const srvMetadata = osparc.service.Utils.getMetaData(node["key"], node["version"]); + const srvMetadata = osparc.service.Utils.getMetaData(node.getKey(), node.getVersion()); if (srvMetadata && srvMetadata["thumbnail"] && !osparc.data.model.Node.isFrontend(node)) { suggestions.add(srvMetadata["thumbnail"]); } }); - return Array.from(suggestions); + const amendedArray = [...suggestions, ...this.defaultTemplates] + return Array.from(amendedArray); } }, @@ -122,12 +136,17 @@ qx.Class.define("osparc.editor.ThumbnailSuggestions", { }, thumbnailTapped: function(thumbnail) { - this.getChildren().forEach(thumbnailImg => osparc.utils.Utils.hideBorder(thumbnailImg)); + this.getChildren().forEach(thumbnailImg => { + osparc.utils.Utils.updateBorderColor(thumbnailImg, qx.theme.manager.Color.getInstance().resolve("box-shadow")); + osparc.utils.Utils.addBackground(thumbnailImg, qx.theme.manager.Color.getInstance().resolve("fab-background")); + }); const color = qx.theme.manager.Color.getInstance().resolve("background-selected-dark"); - osparc.utils.Utils.addBorder(thumbnail, 1, color); + const bgColor = qx.theme.manager.Color.getInstance().resolve("background-selected"); + osparc.utils.Utils.updateBorderColor(thumbnail, color); + osparc.utils.Utils.addBackground(thumbnail, bgColor); this.fireDataEvent("thumbnailTapped", { - type: thumbnail.thumbnailType, - source: thumbnail.thumbnailFileUrl + type: thumbnail.thumbnailType || "templateThumbnail", + source: thumbnail.thumbnailFileUrl || thumbnail.getSource() }); }, @@ -135,12 +154,16 @@ qx.Class.define("osparc.editor.ThumbnailSuggestions", { this.removeAll(); suggestions.forEach(suggestion => { const maxHeight = this.getMaxHeight(); - const thumbnail = new osparc.ui.basic.Thumbnail(suggestion["thumbnailUrl"], maxHeight, parseInt(maxHeight*2/3)); - thumbnail.thumbnailType = suggestion["type"]; - thumbnail.thumbnailFileUrl = suggestion["fileUrl"]; - thumbnail.setMarginLeft(1); // give some extra space to the selection border - thumbnail.addListener("mouseover", () => thumbnail.set({decorator: "selected-light"}), this); - thumbnail.addListener("mouseout", () => thumbnail.set({decorator: "fab-button"}), this); + const thumbnail = new osparc.ui.basic.Thumbnail(suggestion["thumbnailUrl"] || suggestion, maxHeight, parseInt(maxHeight*2/3)); + thumbnail.set({ + minWidth: 97, + margin: 0, + decorator: "thumbnail" + }); + thumbnail.thumbnailType = suggestion["type"] || "templateThumbnail"; + thumbnail.thumbnailFileUrl = suggestion["fileUrl"] || suggestion; + thumbnail.addListener("mouseover", () => thumbnail.set({decorator: "thumbnail-selected"}), this); + thumbnail.addListener("mouseout", () => thumbnail.set({decorator: "thumbnail"}), this); thumbnail.addListener("tap", () => { this.thumbnailTapped(thumbnail); }, this); diff --git a/services/static-webserver/client/source/class/osparc/theme/Appearance.js b/services/static-webserver/client/source/class/osparc/theme/Appearance.js index bb2b1e84f51..832061fc9c9 100644 --- a/services/static-webserver/client/source/class/osparc/theme/Appearance.js +++ b/services/static-webserver/client/source/class/osparc/theme/Appearance.js @@ -19,6 +19,7 @@ qx.Theme.define("osparc.theme.Appearance", { extend: osparc.theme.common.Appearance, appearances: { + "material-button-invalid": {}, "pb-list": { include: "list", alias: "list", @@ -616,6 +617,37 @@ qx.Theme.define("osparc.theme.Appearance", { } }, + "thumbnail": { + include: "form-button", + style: 
function(states) { + const style = { + decorator: "thumbnail", + cursor: "pointer", + padding: 5, + textColor: "fab_text", + backgroundColor: "fab-background", + center: true + }; + if (states.hovered) { + style.decorator = "form-button-hovered"; + } + if (states.focused) { + style.decorator = "form-button-focused"; + } + if (states.active) { + style.decorator = "form-button-active"; + } + if (states.disabled) { + style.cursor = "not-allowed"; + style.decorator = "form-button-disabled"; + } + if (states.checked || states.selected) { + style.decorator = "form-button-checked"; + } + return style; + } + }, + "form-button-text": { style: function(states) { const style = { diff --git a/services/static-webserver/client/source/class/osparc/theme/Decoration.js b/services/static-webserver/client/source/class/osparc/theme/Decoration.js index 3848cb7f2e0..41bfc971cd5 100644 --- a/services/static-webserver/client/source/class/osparc/theme/Decoration.js +++ b/services/static-webserver/client/source/class/osparc/theme/Decoration.js @@ -20,6 +20,7 @@ qx.Theme.define("osparc.theme.Decoration", { extend: osparc.theme.common.Decoration, decorations: { + "material-button-invalid": {}, "material-button": { style: { radius: 4, @@ -555,6 +556,26 @@ qx.Theme.define("osparc.theme.Decoration", { } }, + "thumbnail": { + include: "material-button", + style: { + style: "solid", + width: 1, + color: "box-shadow", + backgroundColor: "fab-background", + } + }, + + "thumbnail-selected": { + include: "thumbnail", + style: { + style: "solid", + width: 1, + color: "background-selected-dark", + backgroundColor: "background-selected" + } + }, + /* --------------------------------------------------------------------------- Appmotion diff --git a/services/static-webserver/client/source/class/osparc/utils/Utils.js b/services/static-webserver/client/source/class/osparc/utils/Utils.js index 85c786f5445..7f494be501a 100644 --- a/services/static-webserver/client/source/class/osparc/utils/Utils.js +++ b/services/static-webserver/client/source/class/osparc/utils/Utils.js @@ -428,6 +428,18 @@ qx.Class.define("osparc.utils.Utils", { widget.getContentElement().setStyle("border", width+"px solid " + color); }, + updateBorderColor: function(widget, color = "inherit") { + widget.getContentElement().setStyle("border-color", color); + }, + + addBackground: function(widget, color = "transparent") { + widget.getContentElement().setStyle("background-color", color); + }, + + removeBackground: function(widget) { + widget.getContentElement().setStyle("background-color", "transparent"); + }, + removeBorder: function(widget) { widget.getContentElement().setStyle("border", "0px solid"); }, From 923df199883f9d7a565bac52427b23bdc6f22846 Mon Sep 17 00:00:00 2001 From: Julian Querido Date: Wed, 14 Feb 2024 16:23:44 +0100 Subject: [PATCH 2/6] Small button alignment and reduction in opacity for study list background colors --- .../osparc/desktop/preferences/pages/TagsPage.js | 2 +- .../source/class/osparc/form/tag/TagManager.js | 2 +- .../client/source/class/osparc/theme/mixin/Color.js | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/services/static-webserver/client/source/class/osparc/desktop/preferences/pages/TagsPage.js b/services/static-webserver/client/source/class/osparc/desktop/preferences/pages/TagsPage.js index 131ffaae76c..d9409726d25 100644 --- a/services/static-webserver/client/source/class/osparc/desktop/preferences/pages/TagsPage.js +++ 
b/services/static-webserver/client/source/class/osparc/desktop/preferences/pages/TagsPage.js @@ -43,7 +43,7 @@ qx.Class.define("osparc.desktop.preferences.pages.TagsPage", { __createComponents: function() { this.__addTagButton = new qx.ui.form.Button().set({ - appearance: "strong-button", + appearance: "form-button-outlined", label: this.tr("New Tag"), icon: "@FontAwesome5Solid/plus/14" }); diff --git a/services/static-webserver/client/source/class/osparc/form/tag/TagManager.js b/services/static-webserver/client/source/class/osparc/form/tag/TagManager.js index 3baebcebd1a..ae3ef918adb 100644 --- a/services/static-webserver/client/source/class/osparc/form/tag/TagManager.js +++ b/services/static-webserver/client/source/class/osparc/form/tag/TagManager.js @@ -75,7 +75,7 @@ qx.Class.define("osparc.form.tag.TagManager", { }); const addTagButton = this.__addTagButton = new qx.ui.form.Button().set({ - appearance: "strong-button", + appearance: "form-button-outlined", label: this.tr("New Tag"), icon: "@FontAwesome5Solid/plus/14", alignX: "center", diff --git a/services/static-webserver/client/source/class/osparc/theme/mixin/Color.js b/services/static-webserver/client/source/class/osparc/theme/mixin/Color.js index b3ca5e09427..65429eedb18 100644 --- a/services/static-webserver/client/source/class/osparc/theme/mixin/Color.js +++ b/services/static-webserver/client/source/class/osparc/theme/mixin/Color.js @@ -19,12 +19,12 @@ qx.Theme.define("osparc.theme.mixin.Color", { "error_bg": "rgba(255, 108, 108, 0.1)", // #FF6C6C1A "pb-new": "rgba(10, 182, 255, 0.25)", - "pb-study": "rgba(10, 182, 255, 0.5)", - "pb-template": "rgba(9, 89, 122, 0.5)", - "pb-service": "rgba(248, 219, 31, 0.5)", - "pb-computational": "rgba(255, 165, 0, 0.5)", - "pb-dynamic": "rgba(248, 219, 31, 0.5)", - "pb-locked": "rgba(113, 157, 181, 0.5)", + "pb-study": "rgba(10, 182, 255, 0.4)", + "pb-template": "rgba(9, 89, 122, 0.4)", + "pb-service": "rgba(248, 219, 31, 0.4)", + "pb-computational": "rgba(255, 165, 0, 0.4)", + "pb-dynamic": "rgba(248, 219, 31, 0.4)", + "pb-locked": "rgba(113, 157, 181, 0.4)", // button "default-button-text": "rgba(255, 255, 255, 1)", From a6ebef7d8f435b084e1d39ecc5bce785ea6ce651 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Fri, 16 Feb 2024 18:46:36 +0100 Subject: [PATCH 3/6] =?UTF-8?q?=F0=9F=8E=A8=20Enhances=20setup=20to=20reme?= =?UTF-8?q?diate=20accumulation=20of=20messages=20in=20socketio=20exchange?= =?UTF-8?q?=20(#5341)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../_rabbitmq_exclusive_queue_consumers.py | 81 ++++---- .../payments/_socketio.py | 34 ++-- .../projects/projects_api.py | 57 +++--- .../socketio/_handlers.py | 16 +- .../socketio/messages.py | 92 ++++++--- .../socketio/server.py | 7 +- .../server/tests/integration/02/conftest.py | 2 +- .../notifications/test_rabbitmq_consumers.py | 190 ++++++++++-------- .../notifications/test_rabbitmq_consumers.py | 65 +++--- .../03/wallets/payments/test_payments.py | 6 +- .../wallets/payments/test_payments_methods.py | 4 +- 11 files changed, 311 insertions(+), 243 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index ebd61aec857..48327c74344 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ 
b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -28,8 +28,8 @@ SOCKET_IO_NODE_UPDATED_EVENT, SOCKET_IO_PROJECT_PROGRESS_EVENT, SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT, - send_messages_to_group, - send_messages_to_user, + send_message_to_standard_group, + send_message_to_user, ) from ..wallets import api as wallets_api from ._constants import APP_RABBITMQ_CONSUMERS_KEY @@ -97,59 +97,59 @@ async def _progress_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = ( parse_raw_as(ProgressRabbitMessageNode | ProgressRabbitMessageProject, data) ) - socket_message: SocketMessageDict | None = None + message: SocketMessageDict | None = None if isinstance(rabbit_message, ProgressRabbitMessageProject): - socket_message = _convert_to_project_progress_event(rabbit_message) + message = _convert_to_project_progress_event(rabbit_message) + elif rabbit_message.progress_type is ProgressType.COMPUTATION_RUNNING: - socket_message = await _convert_to_node_update_event(app, rabbit_message) - else: - socket_message = _convert_to_node_progress_event(rabbit_message) - if socket_message: - await send_messages_to_user(app, rabbit_message.user_id, [socket_message]) + message = await _convert_to_node_update_event(app, rabbit_message) + else: + message = _convert_to_node_progress_event(rabbit_message) + + if message: + await send_message_to_user( + app, + rabbit_message.user_id, + message=message, + ignore_queue=True, + ) return True async def _log_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = LoggerRabbitMessage.parse_raw(data) - socket_messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_LOG_EVENT, - "data": rabbit_message.dict(exclude={"user_id", "channel_name"}), - } - ] - await send_messages_to_user(app, rabbit_message.user_id, socket_messages) + await send_message_to_user( + app, + rabbit_message.user_id, + message=SocketMessageDict( + event_type=SOCKET_IO_LOG_EVENT, + data=rabbit_message.dict(exclude={"user_id", "channel_name"}), + ), + ignore_queue=True, + ) return True async def _events_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = EventRabbitMessage.parse_raw(data) - - socket_messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_EVENT, - "data": { + await send_message_to_user( + app, + rabbit_message.user_id, + message=SocketMessageDict( + event_type=SOCKET_IO_EVENT, + data={ "action": rabbit_message.action, "node_id": f"{rabbit_message.node_id}", }, - } - ] - await send_messages_to_user(app, rabbit_message.user_id, socket_messages) + ), + ignore_queue=True, + ) return True async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = parse_raw_as(WalletCreditsMessage, data) - socket_messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT, - "data": { - "wallet_id": rabbit_message.wallet_id, - "osparc_credits": rabbit_message.credits, - "created_at": rabbit_message.created_at, - }, - } - ] wallet_groups = await wallets_api.list_wallet_groups_with_read_access_by_wallet( app, wallet_id=rabbit_message.wallet_id ) @@ -157,7 +157,18 @@ async def _osparc_credits_message_parser(app: web.Application, data: bytes) -> b item.gid for item in wallet_groups ) for room in rooms_to_notify: - await send_messages_to_group(app, room, socket_messages) + await send_message_to_standard_group( + app, + room, + 
message=SocketMessageDict( + event_type=SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT, + data={ + "wallet_id": rabbit_message.wallet_id, + "osparc_credits": rabbit_message.credits, + "created_at": rabbit_message.created_at, + }, + ), + ) return True diff --git a/services/web/server/src/simcore_service_webserver/payments/_socketio.py b/services/web/server/src/simcore_service_webserver/payments/_socketio.py index 8e4334636a4..01bf0ec3268 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_socketio.py +++ b/services/web/server/src/simcore_service_webserver/payments/_socketio.py @@ -11,7 +11,7 @@ from models_library.users import UserID from models_library.utils.fastapi_encoders import jsonable_encoder -from ..socketio.messages import send_messages_to_user +from ..socketio.messages import send_message_to_user async def notify_payment_completed( @@ -22,13 +22,15 @@ async def notify_payment_completed( ): assert payment.completed_at is not None # nosec - messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_PAYMENT_COMPLETED_EVENT, - "data": jsonable_encoder(payment, by_alias=True), - } - ] - await send_messages_to_user(app, user_id, messages) + await send_message_to_user( + app, + user_id, + message=SocketMessageDict( + event_type=SOCKET_IO_PAYMENT_COMPLETED_EVENT, + data=jsonable_encoder(payment, by_alias=True), + ), + ignore_queue=True, + ) async def notify_payment_method_acked( @@ -37,10 +39,12 @@ async def notify_payment_method_acked( user_id: UserID, payment_method_transaction: PaymentMethodTransaction, ): - messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT, - "data": jsonable_encoder(payment_method_transaction, by_alias=True), - } - ] - await send_messages_to_user(app, user_id, messages) + await send_message_to_user( + app, + user_id, + message=SocketMessageDict( + event_type=SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT, + data=jsonable_encoder(payment_method_transaction, by_alias=True), + ), + ignore_queue=True, + ) diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index db1a604e810..061fcbd0b3c 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -96,8 +96,8 @@ from ..socketio.messages import ( SOCKET_IO_NODE_UPDATED_EVENT, SOCKET_IO_PROJECT_UPDATED_EVENT, - send_messages_to_group, - send_messages_to_user, + send_message_to_standard_group, + send_message_to_user, ) from ..storage import api as storage_api from ..users.api import FullNameDict, get_user_fullname, get_user_role @@ -1441,30 +1441,31 @@ async def remove_project_dynamic_services( async def notify_project_state_update( app: web.Application, project: ProjectDict, - notify_only_user: int | None = None, + notify_only_user: UserID | None = None, ) -> None: if await is_project_hidden(app, ProjectID(project["uuid"])): return - messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_PROJECT_UPDATED_EVENT, - "data": { - "project_uuid": project["uuid"], - "data": project["state"], - }, - } - ] + message = SocketMessageDict( + event_type=SOCKET_IO_PROJECT_UPDATED_EVENT, + data={ + "project_uuid": project["uuid"], + "data": project["state"], + }, + ) if notify_only_user: - await send_messages_to_user( - app, user_id=f"{notify_only_user}", messages=messages + await send_message_to_user( + app, + user_id=notify_only_user, + 
message=message, + ignore_queue=True, ) else: rooms_to_notify: Generator[GroupID, None, None] = ( gid for gid, rights in project["accessRights"].items() if rights["read"] ) for room in rooms_to_notify: - await send_messages_to_group(app, room, messages) + await send_message_to_standard_group(app, group_id=room, message=message) async def notify_project_node_update( @@ -1480,22 +1481,20 @@ async def notify_project_node_update( gid for gid, rights in project["accessRights"].items() if rights["read"] ] - messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_NODE_UPDATED_EVENT, - "data": { - "project_id": project["uuid"], - "node_id": f"{node_id}", - # as GET projects/{project_id}/nodes/{node_id} - "data": project["workbench"][f"{node_id}"], - # as GET projects/{project_id}/nodes/{node_id}/errors - "errors": errors, - }, - } - ] + message = SocketMessageDict( + event_type=SOCKET_IO_NODE_UPDATED_EVENT, + data={ + "project_id": project["uuid"], + "node_id": f"{node_id}", + # as GET projects/{project_id}/nodes/{node_id} + "data": project["workbench"][f"{node_id}"], + # as GET projects/{project_id}/nodes/{node_id}/errors + "errors": errors, + }, + ) for room in rooms_to_notify: - await send_messages_to_group(app, room, messages) + await send_message_to_standard_group(app, room, message) async def retrieve_and_notify_project_locked_state( diff --git a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py index d31f46e9ac3..18386c92175 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py +++ b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py @@ -22,7 +22,7 @@ from ..products.api import Product, get_current_product from ..resource_manager.user_sessions import managed_resource from ._utils import EnvironDict, SocketID, get_socket_server, register_socketio_handler -from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_messages_to_user +from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_message_to_user _logger = logging.getLogger(__name__) @@ -138,16 +138,14 @@ async def connect( product_name, ) - heart_beat_messages: list[SocketMessageDict] = [ - { - "event_type": SOCKET_IO_HEARTBEAT_EVENT, - "data": {"interval": _EMIT_INTERVAL_S}, - } - ] - await send_messages_to_user( + await send_message_to_user( app, user_id, - heart_beat_messages, + message=SocketMessageDict( + event_type=SOCKET_IO_HEARTBEAT_EVENT, + data={"interval": _EMIT_INTERVAL_S}, + ), + ignore_queue=True, ) except web.HTTPUnauthorized as exc: diff --git a/services/web/server/src/simcore_service_webserver/socketio/messages.py b/services/web/server/src/simcore_service_webserver/socketio/messages.py index 9b3c416be43..75c8c25611a 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/messages.py +++ b/services/web/server/src/simcore_service_webserver/socketio/messages.py @@ -3,7 +3,6 @@ """ import logging -from collections.abc import Sequence from typing import Final from aiohttp.web import Application @@ -11,7 +10,6 @@ from models_library.socketio import SocketMessageDict from models_library.users import GroupID, UserID from servicelib.json_serialization import json_dumps -from servicelib.utils import logged_gather from socketio import AsyncServer from ._utils import get_socket_server @@ -32,37 +30,77 @@ SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT: Final[str] = "walletOsparcCreditsUpdated" -async def send_messages_to_user( - app: Application, user_id: UserID, 
messages: Sequence[SocketMessageDict]
+async def _safe_emit(
+    sio: AsyncServer,
+    *,
+    room: SocketIORoomStr,
+    message: SocketMessageDict,
+    ignore_queue: bool,
+):
+    # NOTE 1: we configured message queue (i.e. socketio servers are backed with rabbitMQ)
+    # so if `ignore_queue=True` then the server can directly communicate with the
+    # client without having to send its message first to rabbitMQ and then back to itself.
+    #
+    # NOTE 2: `emit` method is not designed to be used concurrently
+    try:
+        event = message["event_type"]
+        data = json_dumps(message["data"])
+        await sio.emit(
+            event=event,
+            data=data,
+            room=room,
+            ignore_queue=ignore_queue,
+        )
+    except Exception:  # pylint: disable=broad-exception-caught
+        _logger.warning(
+            "Failed to deliver %s message to %s size=%d",
+            f"{event=}",
+            f"{room=}",
+            len(data),
+            exc_info=True,
+        )
+
+
+async def send_message_to_user(
+    app: Application,
+    user_id: UserID,
+    message: SocketMessageDict,
+    *,
+    ignore_queue: bool,
 ) -> None:
+    """
+    Keyword Arguments:
+        ignore_queue -- set to False when this message is delivered from a server that has no direct connection to the client (default: {True})
+        An example where this value is False is when sending messages to a user in the GC
+    """
     sio: AsyncServer = get_socket_server(app)
-    await logged_gather(
-        *(
-            sio.emit(
-                message["event_type"],
-                json_dumps(message["data"]),
-                room=SocketIORoomStr.from_user_id(user_id=user_id),
-            )
-            for message in messages
-        ),
-        reraise=False,
-        log=_logger,
-        max_concurrency=100,
+    await _safe_emit(
+        sio,
+        room=SocketIORoomStr.from_user_id(user_id),
+        message=message,
+        ignore_queue=ignore_queue,
     )
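
# Example (sketch): how a caller is expected to use the single-message API above.
# `app` and `user_id` are assumed to be in scope; the event name and payload are
# illustrative only (see payments/_socketio.py in this patch for a real call site).
#
#     await send_message_to_user(
#         app,
#         user_id,
#         message=SocketMessageDict(
#             event_type=SOCKET_IO_LOG_EVENT,
#             data={"messages": ["example log line"]},
#         ),
#         ignore_queue=True,  # this replica holds the client's socket itself
#     )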
-async def send_messages_to_group(
-    app: Application, group_id: GroupID, messages: Sequence[SocketMessageDict]
+async def send_message_to_standard_group(
+    app: Application,
+    group_id: GroupID,
+    message: SocketMessageDict,
 ) -> None:
+    """
+    WARNING: please do not use primary groups here. To transmit to a single user,
+    use send_message_to_user instead.
+
+    NOTE: despite the name, it can also be used for EVERYONE
+    """
     sio: AsyncServer = get_socket_server(app)
-    send_tasks = [
-        sio.emit(
-            message["event_type"],
-            json_dumps(message["data"]),
-            room=SocketIORoomStr.from_group_id(group_id),
-        )
-        for message in messages
-    ]
-    await logged_gather(*send_tasks, reraise=False, log=_logger, max_concurrency=10)
+    await _safe_emit(
+        sio,
+        room=SocketIORoomStr.from_group_id(group_id),
+        message=message,
+        # NOTE: A standard group refers to different users
+        # that might be connected to different replicas
+        ignore_queue=False,
+    )
diff --git a/services/web/server/src/simcore_service_webserver/socketio/server.py b/services/web/server/src/simcore_service_webserver/socketio/server.py
index 175604e30ad..35aabc9aff8 100644
--- a/services/web/server/src/simcore_service_webserver/socketio/server.py
+++ b/services/web/server/src/simcore_service_webserver/socketio/server.py
@@ -17,13 +17,16 @@

 async def _socketio_server_cleanup_ctx(app: web.Application) -> AsyncIterator[None]:
+    use_logger: bool | logging.Logger = _logger
+
     # SEE https://github.com/miguelgrinberg/python-socketio/blob/v4.6.1/docs/server.rst#aiohttp
     server_manager = AsyncAioPikaManager(
-        url=get_rabbitmq_settings(app).dsn, logger=_logger
+        url=get_rabbitmq_settings(app).dsn,
+        logger=use_logger,
     )

     sio_server = AsyncServer(
         async_mode="aiohttp",
-        logger=_logger,
+        logger=use_logger,
         engineio_logger=False,
         client_manager=server_manager,
     )
diff --git a/services/web/server/tests/integration/02/conftest.py b/services/web/server/tests/integration/02/conftest.py
index 4f64de393d8..920a7c7abbc 100644
--- a/services/web/server/tests/integration/02/conftest.py
+++ b/services/web/server/tests/integration/02/conftest.py
@@ -3,8 +3,8 @@
 # pylint:disable=redefined-outer-name

 import json
+from collections.abc import AsyncIterator
 from pathlib import Path
-from typing import AsyncIterator
 from uuid import uuid4

 import pytest
diff --git a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py
index 0f44f71c42a..706d66936ea 100644
--- a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py
+++ b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py
@@ -5,8 +5,9 @@

 import asyncio
 import json
+from collections.abc import Awaitable, Callable
 from random import choice
-from typing import Any, Awaitable, Callable
+from typing import Any
 from unittest import mock

 import aiopg
@@ -79,7 +80,7 @@

 async def _assert_handler_not_called(handler: mock.Mock) -> None:
-    with pytest.raises(RetryError):
+    with pytest.raises(RetryError):  # noqa: PT012
         async for attempt in AsyncRetrying(
             retry=retry_always,
             stop=stop_after_delay(_STABLE_DELAY_S),
@@ -175,25 +176,52 @@ async def rabbitmq_publisher(
     return create_rabbitmq_client("pytest_publisher")

+@pytest.fixture
+def random_node_id_in_user_project(user_project: ProjectDict) -> NodeID:
+    workbench = list(user_project["workbench"])
+    return NodeID(choice(workbench))  # noqa: S311
+
+
+@pytest.fixture
+def user_project_id(user_project: ProjectDict) -> ProjectID:
+    return ProjectID(user_project["uuid"])
+
+
+@pytest.fixture
+def user_id(logged_user: UserInfoDict) -> UserID:
+    return UserID(logged_user["id"])
+
+
+@pytest.fixture
+def sender_user_id(user_id: UserID, sender_same_user_id: bool, faker: Faker) -> UserID:
+    if sender_same_user_id is False:
return UserID(faker.pyint(min_value=user_id + 1)) + return user_id + + @pytest.mark.parametrize("user_role", [UserRole.GUEST], ids=str) @pytest.mark.parametrize( - "sender_same_user_id", [True, False], ids=lambda id: f"same_sender_id={id}" + "sender_same_user_id", [True, False], ids=lambda id_: f"same_sender_id={id_}" ) @pytest.mark.parametrize( - "subscribe_to_logs", [True, False], ids=lambda id: f"subscribed={id}" + "subscribe_to_logs", [True, False], ids=lambda id_: f"subscribed={id_}" ) async def test_log_workflow( client: TestClient, rabbitmq_publisher: RabbitMQClient, - logged_user: UserInfoDict, - user_project: ProjectDict, - faker: Faker, + subscribe_to_logs: bool, socketio_client_factory: Callable[ [str | None, TestClient | None], Awaitable[socketio.AsyncClient] ], - mocker: MockerFixture, + # user sender_same_user_id: bool, - subscribe_to_logs: bool, + sender_user_id: UserID, + # project + random_node_id_in_user_project: NodeID, + user_project_id: ProjectID, + # + faker: Faker, + mocker: MockerFixture, ): """ RabbitMQ (TOPIC) --> Webserver --> Redis --> webclient (socketio) @@ -204,20 +232,14 @@ async def test_log_workflow( mock_log_handler = mocker.MagicMock() socket_io_conn.on(SOCKET_IO_LOG_EVENT, handler=mock_log_handler) - project_id = ProjectID(user_project["uuid"]) - random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) - sender_user_id = UserID(logged_user["id"]) - if sender_same_user_id is False: - sender_user_id = UserID(faker.pyint(min_value=logged_user["id"] + 1)) - if subscribe_to_logs: assert client.app - await project_logs.subscribe(client.app, project_id) + await project_logs.subscribe(client.app, user_project_id) log_message = LoggerRabbitMessage( user_id=sender_user_id, - project_id=project_id, - node_id=random_node_id_in_project, + project_id=user_project_id, + node_id=random_node_id_in_user_project, messages=[faker.text() for _ in range(10)], ) await rabbitmq_publisher.publish(log_message.channel_name, log_message) @@ -236,8 +258,12 @@ async def test_log_workflow( async def test_log_workflow_only_receives_messages_if_subscribed( client: TestClient, rabbitmq_publisher: RabbitMQClient, - logged_user: UserInfoDict, - user_project: ProjectDict, + # user + user_id: UserID, + # project + random_node_id_in_user_project: NodeID, + user_project_id: ProjectID, + # faker: Faker, mocker: MockerFixture, ): @@ -246,21 +272,17 @@ async def test_log_workflow_only_receives_messages_if_subscribed( """ mocked_send_messages = mocker.patch( - "simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_messages_to_user", + "simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_message_to_user", autospec=True, ) - project_id = ProjectID(user_project["uuid"]) - random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) - sender_user_id = UserID(logged_user["id"]) - assert client.app - await project_logs.subscribe(client.app, project_id) + await project_logs.subscribe(client.app, user_project_id) log_message = LoggerRabbitMessage( - user_id=sender_user_id, - project_id=project_id, - node_id=random_node_id_in_project, + user_id=user_id, + project_id=user_project_id, + node_id=random_node_id_in_user_project, messages=[faker.text() for _ in range(10)], ) await rabbitmq_publisher.publish(log_message.channel_name, log_message) @@ -269,18 +291,17 @@ async def test_log_workflow_only_receives_messages_if_subscribed( mock.call( client.app, log_message.user_id, - [ - { - "event_type": SOCKET_IO_LOG_EVENT, - 
"data": log_message.dict(exclude={"user_id", "channel_name"}), - } - ], + message={ + "event_type": SOCKET_IO_LOG_EVENT, + "data": log_message.dict(exclude={"user_id", "channel_name"}), + }, + ignore_queue=True, ), ) mocked_send_messages.reset_mock() # when unsubscribed, we do not receive the messages anymore - await project_logs.unsubscribe(client.app, project_id) + await project_logs.unsubscribe(client.app, user_project_id) await rabbitmq_publisher.publish(log_message.channel_name, log_message) await _assert_handler_not_called(mocked_send_messages) @@ -292,24 +313,27 @@ async def test_log_workflow_only_receives_messages_if_subscribed( ids=str, ) @pytest.mark.parametrize( - "sender_same_user_id", [True, False], ids=lambda id: f"same_sender_id={id}" + "sender_same_user_id", [True, False], ids=lambda id_: f"same_sender_id={id_}" ) @pytest.mark.parametrize( - "subscribe_to_logs", [True, False], ids=lambda id: f"subscribed={id}" + "subscribe_to_logs", [True, False], ids=lambda id_: f"subscribed={id_}" ) async def test_progress_non_computational_workflow( client: TestClient, rabbitmq_publisher: RabbitMQClient, - logged_user: UserInfoDict, - user_project: ProjectDict, - faker: Faker, socketio_client_factory: Callable[ [str | None, TestClient | None], Awaitable[socketio.AsyncClient] ], - mocker: MockerFixture, + subscribe_to_logs: bool, progress_type: ProgressType, + # user sender_same_user_id: bool, - subscribe_to_logs: bool, + sender_user_id: UserID, + # project + random_node_id_in_user_project: NodeID, + user_project_id: ProjectID, + # + mocker: MockerFixture, ): """ RabbitMQ (TOPIC) --> Webserver --> Redis --> webclient (socketio) @@ -320,18 +344,14 @@ async def test_progress_non_computational_workflow( mock_progress_handler = mocker.MagicMock() socket_io_conn.on(SOCKET_IO_NODE_PROGRESS_EVENT, handler=mock_progress_handler) - random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) - sender_user_id = UserID(logged_user["id"]) - project_id = ProjectID(user_project["uuid"]) - if sender_same_user_id is False: - sender_user_id = UserID(faker.pyint(min_value=logged_user["id"] + 1)) if subscribe_to_logs: assert client.app - await project_logs.subscribe(client.app, project_id) + await project_logs.subscribe(client.app, user_project_id) + progress_message = ProgressRabbitMessageNode( user_id=sender_user_id, - project_id=ProjectID(user_project["uuid"]), - node_id=random_node_id_in_project, + project_id=user_project_id, + node_id=random_node_id_in_user_project, progress=0.3, progress_type=progress_type, ) @@ -347,24 +367,27 @@ async def test_progress_non_computational_workflow( @pytest.mark.parametrize("user_role", [UserRole.GUEST], ids=str) @pytest.mark.parametrize( - "sender_same_user_id", [True, False], ids=lambda id: f"same_sender_id={id}" + "sender_same_user_id", [True, False], ids=lambda id_: f"same_sender_id={id_}" ) @pytest.mark.parametrize( - "subscribe_to_logs", [True, False], ids=lambda id: f"subscribed={id}" + "subscribe_to_logs", [True, False], ids=lambda id_: f"subscribed={id_}" ) async def test_progress_computational_workflow( client: TestClient, rabbitmq_publisher: RabbitMQClient, - logged_user: UserInfoDict, user_project: ProjectDict, socketio_client_factory: Callable[ [str | None, TestClient | None], Awaitable[socketio.AsyncClient] ], mocker: MockerFixture, aiopg_engine: aiopg.sa.Engine, - sender_same_user_id: bool, - faker: Faker, subscribe_to_logs: bool, + # user + sender_same_user_id: bool, + sender_user_id: UserID, + # project + random_node_id_in_user_project: 
NodeID, + user_project_id: ProjectID, ): """ RabbitMQ (TOPIC) --> Webserver --> DB (get project) @@ -375,18 +398,14 @@ async def test_progress_computational_workflow( mock_progress_handler = mocker.MagicMock() socket_io_conn.on(SOCKET_IO_NODE_UPDATED_EVENT, handler=mock_progress_handler) - random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) - sender_user_id = UserID(logged_user["id"]) - project_id = ProjectID(user_project["uuid"]) - if sender_same_user_id is False: - sender_user_id = UserID(faker.pyint(min_value=logged_user["id"] + 1)) + if subscribe_to_logs: assert client.app - await project_logs.subscribe(client.app, project_id) + await project_logs.subscribe(client.app, user_project_id) progress_message = ProgressRabbitMessageNode( user_id=sender_user_id, - project_id=ProjectID(user_project["uuid"]), - node_id=random_node_id_in_project, + project_id=user_project_id, + node_id=random_node_id_in_user_project, progress=0.3, progress_type=ProgressType.COMPUTATION_RUNNING, ) @@ -398,7 +417,7 @@ async def test_progress_computational_workflow( progress_message, include={"node_id", "project_id"} ) expected_call |= { - "data": user_project["workbench"][f"{random_node_id_in_project}"] + "data": user_project["workbench"][f"{random_node_id_in_user_project}"] } expected_call["data"]["progress"] = int(progress_message.progress * 100) await _assert_handler_called_with_json(mock_progress_handler, expected_call) @@ -409,14 +428,17 @@ async def test_progress_computational_workflow( async with aiopg_engine.acquire() as conn: result = await conn.execute( sa.select(projects.c.workbench).where( - projects.c.uuid == user_project["uuid"] + projects.c.uuid == str(user_project_id) ) ) row = await result.fetchone() assert row project_workbench = dict(row[projects.c.workbench]) # NOTE: the progress might still be present but is not used anymore - assert project_workbench[f"{random_node_id_in_project}"].get("progress", 0) == 0 + assert ( + project_workbench[f"{random_node_id_in_user_project}"].get("progress", 0) + == 0 + ) @pytest.mark.parametrize("user_role", [UserRole.GUEST], ids=str) @@ -424,11 +446,14 @@ async def test_progress_computational_workflow( async def test_instrumentation_workflow( client: TestClient, rabbitmq_publisher: RabbitMQClient, - logged_user: UserInfoDict, - user_project: ProjectDict, mocker: MockerFixture, faker: Faker, metrics_name: str, + # user + user_id: UserID, + # project + random_node_id_in_user_project: NodeID, + user_project_id: ProjectID, ): """ RabbitMQ --> Webserver --> Prometheus metrics @@ -439,11 +464,10 @@ async def test_instrumentation_workflow( f"simcore_service_webserver.notifications._rabbitmq_nonexclusive_queue_consumers.{metrics_name}" ) - random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) rabbit_message = InstrumentationRabbitMessage( - user_id=UserID(logged_user["id"]), - project_id=ProjectID(user_project["uuid"]), - node_id=random_node_id_in_project, + user_id=user_id, + project_id=user_project_id, + node_id=random_node_id_in_user_project, metrics=metrics_name, service_uuid=faker.uuid4(), service_key=faker.pystr(), @@ -468,19 +492,21 @@ async def test_instrumentation_workflow( @pytest.mark.parametrize("user_role", [UserRole.GUEST], ids=str) @pytest.mark.parametrize( - "sender_same_user_id", [True, False], ids=lambda id: f"same_sender_id={id}" + "sender_same_user_id", [True, False], ids=lambda id_: f"same_sender_id={id_}" ) async def test_event_workflow( + mocker: MockerFixture, client: TestClient, rabbitmq_publisher: 
RabbitMQClient, - logged_user: UserInfoDict, - user_project: ProjectDict, socketio_client_factory: Callable[ [str | None, TestClient | None], Awaitable[socketio.AsyncClient] ], - mocker: MockerFixture, + # user sender_same_user_id: bool, - faker: Faker, + sender_user_id: UserID, + # project + random_node_id_in_user_project: NodeID, + user_project_id: ProjectID, ): """ RabbitMQ --> Webserver --> Redis --> webclient (socketio) @@ -490,14 +516,10 @@ async def test_event_workflow( mock_event_handler = mocker.MagicMock() socket_io_conn.on(SOCKET_IO_EVENT, handler=mock_event_handler) - sender_user_id = UserID(logged_user["id"]) - if sender_same_user_id is False: - sender_user_id = UserID(faker.pyint(min_value=logged_user["id"] + 1)) - random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) rabbit_message = EventRabbitMessage( user_id=sender_user_id, - project_id=ProjectID(user_project["uuid"]), - node_id=random_node_id_in_project, + project_id=user_project_id, + node_id=random_node_id_in_user_project, action=RabbitEventMessageType.RELOAD_IFRAME, ) diff --git a/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py index 8da2ec1c903..e18534f8d86 100644 --- a/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py +++ b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py @@ -1,5 +1,8 @@ -# pylint: disable=redefined-outer-name # pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable from unittest.mock import AsyncMock @@ -12,51 +15,33 @@ ) from pydantic import BaseModel from pytest_mock import MockerFixture -from simcore_service_webserver.notifications import _rabbitmq_exclusive_queue_consumers +from simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers import ( + _progress_message_parser, +) _faker = Faker() -@pytest.fixture -def mock_send_messages(mocker: MockerFixture) -> dict: - reference = {} - - async def mock_send_message(*args) -> None: - reference["args"] = args - - mocker.patch.object( - _rabbitmq_exclusive_queue_consumers, - "send_messages_to_user", - side_effect=mock_send_message, - ) - - return reference - - @pytest.mark.parametrize( "raw_data, class_type", [ pytest.param( ProgressRabbitMessageNode( - **{ - "project_id": _faker.uuid4(cast_to=None), - "user_id": _faker.uuid4(cast_to=None), - "node_id": _faker.uuid4(cast_to=None), - "progress_type": ProgressType.SERVICE_OUTPUTS_PULLING, - "progress": 0.4, - } + project_id=_faker.uuid4(cast_to=None), + user_id=_faker.uuid4(cast_to=None), + node_id=_faker.uuid4(cast_to=None), + progress_type=ProgressType.SERVICE_OUTPUTS_PULLING, + progress=0.4, ).json(), ProgressRabbitMessageNode, id="node_progress", ), pytest.param( ProgressRabbitMessageProject( - **{ - "project_id": _faker.uuid4(cast_to=None), - "user_id": _faker.uuid4(cast_to=None), - "progress_type": ProgressType.PROJECT_CLOSING, - "progress": 0.4, - } + project_id=_faker.uuid4(cast_to=None), + user_id=_faker.uuid4(cast_to=None), + progress_type=ProgressType.PROJECT_CLOSING, + progress=0.4, ).json(), ProgressRabbitMessageProject, id="project_progress", @@ -64,11 +49,19 @@ async def mock_send_message(*args) -> None: ], ) async def test_regression_progress_message_parser( - mock_send_messages: dict, raw_data: bytes, class_type: type[BaseModel] + mocker: 
MockerFixture, raw_data: bytes, class_type: type[BaseModel] ): - await _rabbitmq_exclusive_queue_consumers._progress_message_parser( - AsyncMock(), raw_data + send_messages_to_user_mock = mocker.patch( + "simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_message_to_user", + autospec=True, ) - serialized_sent_data = mock_send_messages["args"][2][0]["data"] + + app = AsyncMock() + assert await _progress_message_parser(app, raw_data) + + # tests how send_message_to_user is called + assert send_messages_to_user_mock.call_count == 1 + message = send_messages_to_user_mock.call_args.kwargs["message"] + # check that all fields are sent as expected - assert class_type.parse_obj(serialized_sent_data) + assert class_type.parse_obj(message["data"]) diff --git a/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments.py b/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments.py index 4bb54c71570..ec1aad242ff 100644 --- a/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments.py +++ b/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments.py @@ -75,7 +75,7 @@ async def test_one_time_payment_worfklow( assert settings.PAYMENTS_FAKE_COMPLETION is False send_message = mocker.patch( - "simcore_service_webserver.payments._socketio.send_messages_to_user", + "simcore_service_webserver.payments._socketio.send_message_to_user", autospec=True, ) mock_rut_add_credits_to_wallet = mocker.patch( @@ -152,7 +152,7 @@ async def test_multiple_payments( assert settings.PAYMENTS_FAKE_COMPLETION is False send_message = mocker.patch( - "simcore_service_webserver.payments._socketio.send_messages_to_user", + "simcore_service_webserver.payments._socketio.send_message_to_user", autospec=True, ) mocker.patch( @@ -249,7 +249,7 @@ async def test_complete_payment_errors( ): assert client.app send_message = mocker.patch( - "simcore_service_webserver.payments._socketio.send_messages_to_user", + "simcore_service_webserver.payments._socketio.send_message_to_user", autospec=True, ) diff --git a/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py b/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py index a8444574702..87a5189636f 100644 --- a/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py +++ b/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py @@ -52,7 +52,7 @@ async def test_payment_method_worfklow( assert settings.PAYMENTS_FAKE_COMPLETION is False send_message = mocker.patch( - "simcore_service_webserver.payments._socketio.send_messages_to_user", + "simcore_service_webserver.payments._socketio.send_message_to_user", autospec=True, ) @@ -342,7 +342,7 @@ async def test_one_time_payment_with_payment_method( assert client.app send_message = mocker.patch( - "simcore_service_webserver.payments._socketio.send_messages_to_user", + "simcore_service_webserver.payments._socketio.send_message_to_user", autospec=True, ) mock_rut_add_credits_to_wallet = mocker.patch( From 38a32332f1e90c11366981149cf04cb886bd4793 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Mon, 19 Feb 2024 09:46:33 +0100 Subject: [PATCH 4/6] =?UTF-8?q?=E2=9C=A8Autoscaling:=20use=20label=20inste?= =?UTF-8?q?ad=20of=20draining=20machine=20(#5340)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env-devel | 2 + .../core/settings.py | 7 + 
.../modules/auto_scaling_core.py | 28 ++- .../modules/auto_scaling_mode_base.py | 5 +- .../auto_scaling_mode_computational.py | 6 +- .../modules/auto_scaling_mode_dynamic.py | 8 +- .../utils/utils_docker.py | 59 ++++- services/autoscaling/tests/unit/conftest.py | 104 ++++++-- ...test_modules_auto_scaling_computational.py | 100 +++++--- .../unit/test_modules_auto_scaling_dynamic.py | 222 +++++++++++------- .../tests/unit/test_utils_docker.py | 189 ++++++++++++++- services/docker-compose.yml | 14 +- 12 files changed, 577 insertions(+), 167 deletions(-) diff --git a/.env-devel b/.env-devel index 4870cdd6d78..9f954701e87 100644 --- a/.env-devel +++ b/.env-devel @@ -20,11 +20,13 @@ AGENT_VOLUMES_CLEANUP_S3_PROVIDER=MINIO API_SERVER_DEV_FEATURES_ENABLED=0 AUTOSCALING_DASK=null +AUTOSCALING_DRAIN_NODES_WITH_LABELS=False AUTOSCALING_EC2_ACCESS=null AUTOSCALING_EC2_INSTANCES=null AUTOSCALING_NODES_MONITORING=null AUTOSCALING_POLL_INTERVAL=10 + BF_API_KEY=none BF_API_SECRET=none diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index 691ca472751..5fae3e43cca 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -230,6 +230,13 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): AUTOSCALING_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True + AUTOSCALING_DRAIN_NODES_WITH_LABELS: bool = Field( + default=False, + description="If true, drained nodes" + " are maintained as active (in the docker terminology) " + "but a docker node label named osparc-services-ready is attached", + ) + @cached_property def LOG_LEVEL(self): # noqa: N802 return self.AUTOSCALING_LOGLEVEL diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 34bd5e916db..d9b0d5ed5a8 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -50,6 +50,11 @@ _logger = logging.getLogger(__name__) +def _node_not_ready(node: Node) -> bool: + assert node.Status # nosec + return bool(node.Status.State != NodeState.ready) + + async def _analyze_current_cluster( app: FastAPI, auto_scaling_mode: BaseAutoscaling ) -> Cluster: @@ -75,10 +80,7 @@ async def _analyze_current_cluster( docker_nodes, existing_ec2_instances ) - def _node_not_ready(node: Node) -> bool: - assert node.Status # nosec - return bool(node.Status.State != NodeState.ready) - + # analyse attached ec2s active_nodes, pending_nodes, all_drained_nodes = [], [], [] for instance in attached_ec2s: if await auto_scaling_mode.is_instance_active(app, instance): @@ -152,14 +154,14 @@ async def _try_attach_pending_ec2s( if new_node := await utils_docker.find_node_with_name( get_docker_client(app), node_host_name ): - # it is attached, let's label it, but keep it as drained - new_node = await utils_docker.tag_node( + # it is attached, let's label it + new_node = await utils_docker.attach_node( + app_settings, get_docker_client(app), new_node, tags=auto_scaling_mode.get_new_node_docker_tags( app, instance_data.ec2_instance ), - available=False, ) new_found_instances.append( AssociatedInstance( @@ -221,9 +223,11 @@ async def _activate_and_notify( auto_scaling_mode: BaseAutoscaling, drained_node: AssociatedInstance, ) -> None: 
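    # (activation now goes through set_node_osparc_ready, which flips the
    # "osparc-services-ready" node label and, depending on
    # AUTOSCALING_DRAIN_NODES_WITH_LABELS, the docker availability;
    # see utils_docker.py further below in this patch)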
+ app_settings = get_application_settings(app) + docker_client = get_docker_client(app) await asyncio.gather( - utils_docker.set_node_availability( - get_docker_client(app), drained_node.node, available=True + utils_docker.set_node_osparc_ready( + app_settings, docker_client, drained_node.node, ready=True ), auto_scaling_mode.log_message_from_tasks( app, @@ -682,6 +686,7 @@ async def _scale_up_cluster( async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: + app_settings = get_application_settings(app) docker_client = get_docker_client(app) active_empty_instances: list[AssociatedInstance] = [] active_non_empty_instances: list[AssociatedInstance] = [] @@ -700,10 +705,11 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: # drain this empty nodes updated_nodes: list[Node] = await asyncio.gather( *( - utils_docker.set_node_availability( + utils_docker.set_node_osparc_ready( + app_settings, docker_client, node.node, - available=False, + ready=False, ) for node in active_empty_instances ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index 070dad548f9..4de40db4a1d 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -4,7 +4,6 @@ from aws_library.ec2.models import EC2InstanceData, EC2Tags, Resources from fastapi import FastAPI from models_library.docker import DockerLabelKey -from models_library.generated_models.docker_rest_api import Availability from models_library.generated_models.docker_rest_api import Node as DockerNode from servicelib.logging_utils import LogLevelInt from types_aiobotocore_ec2.literals import InstanceTypeType @@ -89,9 +88,7 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool @staticmethod def is_instance_drained(instance: AssociatedInstance) -> bool: - return utils_docker.is_node_ready_and_available( - instance.node, Availability.drain - ) + return not utils_docker.is_node_osparc_ready(instance.node) @staticmethod @abstractmethod diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 1e332b1d890..1c1d7c6b970 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -9,7 +9,7 @@ DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, DockerLabelKey, ) -from models_library.generated_models.docker_rest_api import Availability, Node +from models_library.generated_models.docker_rest_api import Node from pydantic import AnyUrl, ByteSize from servicelib.logging_utils import LogLevelInt from servicelib.utils import logged_gather @@ -155,9 +155,7 @@ async def compute_cluster_total_resources( @staticmethod async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: - if not utils_docker.is_node_ready_and_available( - instance.node, Availability.active - ): + if not utils_docker.is_node_osparc_ready(instance.node): return False # now check if dask-scheduler is available diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py 
b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py index 76e808b1b60..e1d356d9b06 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py @@ -1,7 +1,7 @@ from aws_library.ec2.models import EC2InstanceData, EC2Tags, Resources from fastapi import FastAPI from models_library.docker import DockerLabelKey -from models_library.generated_models.docker_rest_api import Availability, Node, Task +from models_library.generated_models.docker_rest_api import Node, Task from servicelib.logging_utils import LogLevelInt from types_aiobotocore_ec2.literals import InstanceTypeType @@ -33,7 +33,7 @@ def get_new_node_docker_tags( app: FastAPI, ec2_instance_data: EC2InstanceData ) -> dict[DockerLabelKey, str]: app_settings = get_application_settings(app) - return utils_docker.get__new_node_docker_tags(app_settings, ec2_instance_data) + return utils_docker.get_new_node_docker_tags(app_settings, ec2_instance_data) @staticmethod async def list_unrunnable_tasks(app: FastAPI) -> list[Task]: @@ -100,9 +100,7 @@ async def compute_cluster_total_resources( @staticmethod async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: assert app # nosec - return utils_docker.is_node_ready_and_available( - instance.node, Availability.active - ) + return utils_docker.is_node_osparc_ready(instance.node) @staticmethod async def try_retire_nodes(app: FastAPI) -> None: diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index d5d8d4f2cfe..ae3e810be7b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -9,6 +9,7 @@ import logging import re from contextlib import suppress +from copy import deepcopy from pathlib import Path from typing import Final, cast @@ -523,7 +524,7 @@ async def set_node_availability( ) -def get__new_node_docker_tags( +def get_new_node_docker_tags( app_settings: ApplicationSettings, ec2_instance: EC2InstanceData ) -> dict[DockerLabelKey, str]: assert app_settings.AUTOSCALING_NODES_MONITORING # nosec @@ -540,9 +541,63 @@ def get__new_node_docker_tags( ) -def is_node_ready_and_available(node: Node, availability: Availability) -> bool: +def is_node_ready_and_available(node: Node, *, availability: Availability) -> bool: assert node.Status # nosec assert node.Spec # nosec return bool( node.Status.State == NodeState.ready and node.Spec.Availability == availability ) + + +_OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( + DockerLabelKey, "osparc-services-ready" +) + + +def is_node_osparc_ready(node: Node) -> bool: + if not is_node_ready_and_available(node, availability=Availability.active): + return False + assert node.Spec # nosec + return bool( + node.Spec.Labels + and _OSPARC_SERVICE_READY_LABEL_KEY in node.Spec.Labels + and node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] == "true" + ) + + +async def set_node_osparc_ready( + app_settings: ApplicationSettings, + docker_client: AutoscalingDocker, + node: Node, + *, + ready: bool, +) -> Node: + assert node.Spec # nosec + new_tags = deepcopy(cast(dict[DockerLabelKey, str], node.Spec.Labels)) + new_tags[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" if ready else "false" + # NOTE: docker drain sometimes impeed on 
performance when undraining see https://github.com/ITISFoundation/osparc-simcore/issues/5339 + available = app_settings.AUTOSCALING_DRAIN_NODES_WITH_LABELS or ready + return await tag_node( + docker_client, + node, + tags=new_tags, + available=available, + ) + + +async def attach_node( + app_settings: ApplicationSettings, + docker_client: AutoscalingDocker, + node: Node, + *, + tags: dict[DockerLabelKey, str], +) -> Node: + assert node.Spec # nosec + current_tags = cast(dict[DockerLabelKey, str], node.Spec.Labels or {}) + new_tags = current_tags | tags | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"} + return await tag_node( + docker_client, + node, + tags=new_tags, + available=app_settings.AUTOSCALING_DRAIN_NODES_WITH_LABELS, # NOTE: full drain sometimes impede on performance + ) diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 8c6df4c75e2..2dab933bbdd 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -26,11 +26,13 @@ from fakeredis.aioredis import FakeRedis from fastapi import FastAPI from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels +from models_library.generated_models.docker_rest_api import Availability +from models_library.generated_models.docker_rest_api import Node as DockerNode from models_library.generated_models.docker_rest_api import ( - Availability, - Node, NodeDescription, NodeSpec, + NodeState, + NodeStatus, ObjectVersion, ResourceObject, Service, @@ -100,7 +102,31 @@ def mocked_ec2_server_envs( f"{AUTOSCALING_ENV_PREFIX}{k}": v for k, v in mocked_ec2_server_settings.dict().items() } - return setenvs_from_dict(monkeypatch, changed_envs) + return setenvs_from_dict(monkeypatch, changed_envs) # type: ignore + + +@pytest.fixture( + params=[ + "with_AUTOSCALING_DRAIN_NODES_WITH_LABELS", + "without_AUTOSCALING_DRAIN_NODES_WITH_LABELS", + ] +) +def with_drain_nodes_labelled(request: pytest.FixtureRequest) -> bool: + return bool(request.param == "with_AUTOSCALING_DRAIN_NODES_WITH_LABELS") + + +@pytest.fixture +def with_labelize_drain_nodes( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, + with_drain_nodes_labelled: bool, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + { + "AUTOSCALING_DRAIN_NODES_WITH_LABELS": f"{with_drain_nodes_labelled}", + }, + ) @pytest.fixture @@ -292,15 +318,36 @@ async def async_docker_client() -> AsyncIterator[aiodocker.Docker]: async def host_node( docker_swarm: None, async_docker_client: aiodocker.Docker, -) -> Node: - nodes = parse_obj_as(list[Node], await async_docker_client.nodes.list()) +) -> AsyncIterator[DockerNode]: + nodes = parse_obj_as(list[DockerNode], await async_docker_client.nodes.list()) assert len(nodes) == 1 - return nodes[0] + old_node = deepcopy(nodes[0]) + assert old_node.Spec + assert old_node.Spec.Role + assert old_node.Spec.Availability + yield nodes[0] + # revert state + assert old_node.ID + current_node = parse_obj_as( + DockerNode, await async_docker_client.nodes.inspect(node_id=old_node.ID) + ) + assert current_node.ID + assert current_node.Version + assert current_node.Version.Index + await async_docker_client.nodes.update( + node_id=current_node.ID, + version=current_node.Version.Index, + spec={ + "Availability": old_node.Spec.Availability.value, + "Labels": old_node.Spec.Labels, + "Role": old_node.Spec.Role.value, + }, + ) @pytest.fixture -def create_fake_node(faker: Faker) -> Callable[..., Node]: - def 
_creator(**node_overrides) -> Node: +def create_fake_node(faker: Faker) -> Callable[..., DockerNode]: + def _creator(**node_overrides) -> DockerNode: default_config = { "ID": faker.uuid4(), "Version": ObjectVersion(Index=faker.pyint()), @@ -314,19 +361,20 @@ def _creator(**node_overrides) -> Node: ), "Spec": NodeSpec( Name=None, - Labels=None, + Labels=faker.pydict(allowed_types=(str,)), Role=None, Availability=Availability.drain, ), + "Status": NodeStatus(State=NodeState.unknown, Message=None, Addr=None), } default_config.update(**node_overrides) - return Node(**default_config) + return DockerNode(**default_config) return _creator @pytest.fixture -def fake_node(create_fake_node: Callable[..., Node]) -> Node: +def fake_node(create_fake_node: Callable[..., DockerNode]) -> DockerNode: return create_fake_node() @@ -390,7 +438,7 @@ async def _creator( placement_constraints: list[str] | None = None, ) -> Service: service_name = f"pytest_{faker.pystr()}" - base_labels = {} + base_labels: dict[DockerLabelKey, Any] = {} task_labels = task_template.setdefault("ContainerSpec", {}).setdefault( "Labels", base_labels ) @@ -404,7 +452,7 @@ async def _creator( service = await async_docker_client.services.create( task_template=task_template, name=service_name, - labels=base_labels, + labels=base_labels, # type: ignore ) assert service service = parse_obj_as( @@ -541,7 +589,7 @@ def aws_allowed_ec2_instance_type_names_env( monkeypatch: pytest.MonkeyPatch, aws_allowed_ec2_instance_type_names: list[InstanceTypeType], ) -> EnvVarsDict: - changed_envs = { + changed_envs: dict[str, str | bool] = { "EC2_INSTANCES_ALLOWED_TYPES": json.dumps(aws_allowed_ec2_instance_type_names), } return app_environment | setenvs_from_dict(monkeypatch, changed_envs) @@ -630,8 +678,8 @@ def _creator(required_resources: DaskTaskResources) -> distributed.Future: @pytest.fixture def mock_docker_set_node_availability(mocker: MockerFixture) -> mock.Mock: async def _fake_set_node_availability( - docker_client: AutoscalingDocker, node: Node, *, available: bool - ) -> Node: + docker_client: AutoscalingDocker, node: DockerNode, *, available: bool + ) -> DockerNode: returned_node = deepcopy(node) assert returned_node.Spec returned_node.Spec.Availability = ( @@ -647,3 +695,27 @@ async def _fake_set_node_availability( autospec=True, side_effect=_fake_set_node_availability, ) + + +@pytest.fixture +def mock_docker_tag_node(mocker: MockerFixture) -> mock.Mock: + async def fake_tag_node( + docker_client: AutoscalingDocker, + node: DockerNode, + *, + tags: dict[DockerLabelKey, str], + available: bool, + ) -> DockerNode: + updated_node = deepcopy(node) + assert updated_node.Spec + updated_node.Spec.Labels = deepcopy(cast(dict[str, str], tags)) + updated_node.Spec.Availability = ( + Availability.active if available else Availability.drain + ) + return updated_node + + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.tag_node", + autospec=True, + side_effect=fake_tag_node, + ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index 641d2792321..2e09740829d 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -12,6 +12,7 @@ import logging from collections import defaultdict from collections.abc import Callable, Iterator +from copy import deepcopy from dataclasses import dataclass 
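The mock_docker_tag_node fixture added to conftest.py above returns a deep copy rather than mutating its input node. A rough sketch of why that matters for the assertions in these tests, using hypothetical minimal stand-ins for the docker node model:

from copy import deepcopy

class _Spec:  # hypothetical stand-in, not the docker_rest_api model
    def __init__(self, labels: dict[str, str]) -> None:
        self.Labels = labels

class _Node:
    def __init__(self, labels: dict[str, str]) -> None:
        self.Spec = _Spec(labels)

def fake_tag_node(node: _Node, *, tags: dict[str, str]) -> _Node:
    updated = deepcopy(node)  # leave the caller's object untouched
    updated.Spec.Labels = dict(tags)
    return updated

node = _Node({"io.simcore.foo": "true"})
updated = fake_tag_node(node, tags={"osparc-services-ready": "true"})
assert node.Spec.Labels == {"io.simcore.foo": "true"}  # original is unchanged
assert updated.Spec.Labels["osparc-services-ready"] == "true"

Without the copy, the expected-tags dictionaries that the tests build from the node as it looked before the call would silently compare against already-mutated labels.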
from typing import Any from unittest import mock @@ -40,6 +41,9 @@ ) from simcore_service_autoscaling.modules.dask import DaskTaskResources from simcore_service_autoscaling.modules.docker import get_docker_client +from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_SERVICE_READY_LABEL_KEY, +) from types_aiobotocore_ec2.client import EC2Client from types_aiobotocore_ec2.literals import InstanceTypeType @@ -60,6 +64,7 @@ def local_dask_scheduler_server_envs( @pytest.fixture def minimal_configuration( + with_labelize_drain_nodes: EnvVarsDict, docker_swarm: None, mocked_ec2_server_envs: EnvVarsDict, enabled_computational_mode: EnvVarsDict, @@ -160,18 +165,6 @@ def _assert_rabbit_autoscaling_message_sent( ) -@pytest.fixture -def mock_docker_tag_node(mocker: MockerFixture) -> mock.Mock: - async def fake_tag_node(*args, **kwargs) -> DockerNode: - return args[1] - - return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.tag_node", - autospec=True, - side_effect=fake_tag_node, - ) - - @pytest.fixture def mock_docker_find_node_with_name( mocker: MockerFixture, fake_node: DockerNode @@ -392,6 +385,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 dask_task_imposed_ec2_type: InstanceTypeType | None, dask_ram: ByteSize | None, expected_ec2_type: InstanceTypeType, + with_drain_nodes_labelled: bool, ): # we have nothing running now all_instances = await ec2_client.describe_instances() @@ -458,39 +452,66 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 assert len(internal_dns_names) == 1 internal_dns_name = internal_dns_names[0].removesuffix(".ec2.internal") - # the node is tagged and made active right away since we still have the pending task + # the node is attached first and then tagged and made active right away since we still have the pending task mock_docker_find_node_with_name.assert_called_once() mock_docker_find_node_with_name.reset_mock() expected_docker_node_tags = { DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type } - mock_docker_tag_node.assert_called_once_with( - get_docker_client(initialized_app), - fake_node, - tags=expected_docker_node_tags, - available=False, + assert mock_docker_tag_node.call_count == 2 + assert fake_node.Spec + assert fake_node.Spec.Labels + fake_attached_node = deepcopy(fake_node) + assert fake_attached_node.Spec + fake_attached_node.Spec.Availability = ( + Availability.active if with_drain_nodes_labelled else Availability.drain + ) + assert fake_attached_node.Spec.Labels + fake_attached_node.Spec.Labels |= expected_docker_node_tags | { + _OSPARC_SERVICE_READY_LABEL_KEY: "false" + } + mock_docker_tag_node.assert_has_calls( + ( + # attach node call + mock.call( + get_docker_client(initialized_app), + fake_node, + tags=fake_node.Spec.Labels + | expected_docker_node_tags + | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, + ), + mock.call( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_node.Spec.Labels + | expected_docker_node_tags + | {_OSPARC_SERVICE_READY_LABEL_KEY: "true"}, + available=True, + ), + ) ) mock_docker_tag_node.reset_mock() - mock_docker_set_node_availability.assert_called_once_with( - get_docker_client(initialized_app), fake_node, available=True - ) - mock_docker_set_node_availability.reset_mock() + mock_docker_set_node_availability.assert_not_called() mock_rabbitmq_post_message.assert_called_once() mock_rabbitmq_post_message.reset_mock() # now we have 1 monitored node that needs to be 
mocked + fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" + fake_attached_node.Status = NodeStatus( + State=NodeState.ready, Message=None, Addr=None + ) + fake_attached_node.Spec.Availability = Availability.active + assert fake_attached_node.Description + fake_attached_node.Description.Hostname = internal_dns_name + auto_scaling_mode = ComputationalAutoscaling() mocker.patch.object( auto_scaling_mode, "get_monitored_nodes", autospec=True, - return_value=[fake_node], + return_value=[fake_attached_node], ) - fake_node.Status = NodeStatus(State=NodeState.ready, Message=None, Addr=None) - assert fake_node.Spec - fake_node.Spec.Availability = Availability.active - assert fake_node.Description - fake_node.Description.Hostname = internal_dns_name # 3. calling this multiple times should do nothing num_useless_calls = 10 @@ -541,10 +562,16 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 assert mock_dask_get_worker_used_resources.call_count == 2 mock_dask_get_worker_used_resources.reset_mock() # the node shall be set to drain, but not yet terminated - mock_docker_set_node_availability.assert_called_once_with( - get_docker_client(initialized_app), fake_node, available=False + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_attached_node.Spec.Labels + | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, ) - mock_docker_set_node_availability.reset_mock() + mock_docker_tag_node.reset_mock() + await _assert_ec2_instances( ec2_client, num_reservations=1, @@ -554,8 +581,11 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) # we artifically set the node to drain - fake_node.Spec.Availability = Availability.drain - fake_node.UpdatedAt = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + fake_attached_node.Spec.Availability = Availability.drain + fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false" + fake_attached_node.UpdatedAt = datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat() # the node will be not be terminated beforet the timeout triggers assert app_settings.AUTOSCALING_EC2_INSTANCES assert ( @@ -578,14 +608,14 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) # now changing the last update timepoint will trigger the node removal and shutdown the ec2 instance - fake_node.UpdatedAt = ( + fake_attached_node.UpdatedAt = ( datetime.datetime.now(tz=datetime.timezone.utc) - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - datetime.timedelta(seconds=1) ).isoformat() await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) mocked_docker_remove_node.assert_called_once_with( - mock.ANY, nodes=[fake_node], force=True + mock.ANY, nodes=[fake_attached_node], force=True ) await _assert_ec2_instances( ec2_client, diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index 6b11152ab09..5c6ce8c03ed 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -5,11 +5,11 @@ # pylint: disable=too-many-arguments # pylint: disable=too-many-statements - import asyncio import base64 import datetime from collections.abc import AsyncIterator, Awaitable, Callable, Iterator +from copy import deepcopy from 
dataclasses import dataclass from typing import Any from unittest import mock @@ -52,6 +52,9 @@ AutoscalingDocker, get_docker_client, ) +from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_SERVICE_READY_LABEL_KEY, +) from types_aiobotocore_ec2.client import EC2Client from types_aiobotocore_ec2.literals import InstanceTypeType @@ -95,18 +98,6 @@ def mock_find_node_with_name( ) -@pytest.fixture -def mock_tag_node(mocker: MockerFixture) -> mock.Mock: - async def fake_tag_node(*args, **kwargs) -> Node: - return args[1] - - return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.tag_node", - autospec=True, - side_effect=fake_tag_node, - ) - - @pytest.fixture def mock_remove_nodes(mocker: MockerFixture) -> mock.Mock: return mocker.patch( @@ -188,6 +179,7 @@ async def drained_host_node( @pytest.fixture def minimal_configuration( + with_labelize_drain_nodes: EnvVarsDict, docker_swarm: None, mocked_ec2_server_envs: EnvVarsDict, enabled_dynamic_mode: EnvVarsDict, @@ -247,7 +239,7 @@ async def test_cluster_scaling_from_labelled_services_with_no_services_and_machi mock_rabbitmq_post_message: mock.Mock, mock_compute_node_used_resources: mock.Mock, mock_find_node_with_name: mock.Mock, - mock_tag_node: mock.Mock, + mock_docker_tag_node: mock.Mock, fake_node: Node, ec2_client: EC2Client, ): @@ -446,7 +438,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 task_template: dict[str, Any], create_task_reservations: Callable[[int, int], dict[str, Any]], ec2_client: EC2Client, - mock_tag_node: mock.Mock, + mock_docker_tag_node: mock.Mock, fake_node: Node, mock_rabbitmq_post_message: mock.Mock, mock_find_node_with_name: mock.Mock, @@ -457,6 +449,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 docker_service_ram: ByteSize, expected_ec2_type: InstanceTypeType, async_docker_client: aiodocker.Docker, + with_drain_nodes_labelled: bool, ): # we have nothing running now all_instances = await ec2_client.describe_instances() @@ -467,11 +460,13 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 task_template | create_task_reservations(4, docker_service_ram), service_monitored_labels, "pending", - [ - f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={ docker_service_imposed_ec2_type}" - ] - if docker_service_imposed_ec2_type - else [], + ( + [ + f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={ docker_service_imposed_ec2_type}" + ] + if docker_service_imposed_ec2_type + else [] + ), ) # this should trigger a scaling up as we have no nodes @@ -490,7 +485,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # as the new node is already running, but is not yet connected, hence not tagged and drained mock_find_node_with_name.assert_not_called() - mock_tag_node.assert_not_called() + mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() mock_compute_node_used_resources.assert_not_called() # check rabbit messages were sent @@ -507,9 +502,28 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscaling() ) + + fake_attached_node = deepcopy(fake_node) + assert fake_attached_node.Spec + fake_attached_node.Spec.Availability = ( + Availability.active if with_drain_nodes_labelled else Availability.drain + ) + assert fake_attached_node.Spec.Labels + assert app_settings.AUTOSCALING_NODES_MONITORING + expected_docker_node_tags = { + tag_key: "true" + for tag_key 
in ( + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS + + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS + ) + } | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type} + fake_attached_node.Spec.Labels |= expected_docker_node_tags | { + _OSPARC_SERVICE_READY_LABEL_KEY: "false" + } + mock_compute_node_used_resources.assert_called_once_with( get_docker_client(initialized_app), - fake_node, + fake_attached_node, ) mock_compute_node_used_resources.reset_mock() # check the number of instances did not change and is still running @@ -526,30 +540,39 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # the node is tagged and made active right away since we still have the pending task mock_find_node_with_name.assert_called_once() mock_find_node_with_name.reset_mock() - assert app_settings.AUTOSCALING_NODES_MONITORING - expected_docker_node_tags = { - tag_key: "true" - for tag_key in ( - app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS - + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS + + assert mock_docker_tag_node.call_count == 2 + assert fake_node.Spec + assert fake_node.Spec.Labels + + mock_docker_tag_node.assert_has_calls( + ( + # attach node call + mock.call( + get_docker_client(initialized_app), + fake_node, + tags=fake_node.Spec.Labels + | expected_docker_node_tags + | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, + ), + mock.call( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_node.Spec.Labels + | expected_docker_node_tags + | {_OSPARC_SERVICE_READY_LABEL_KEY: "true"}, + available=True, + ), ) - } | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type} - mock_tag_node.assert_called_once_with( - get_docker_client(initialized_app), - fake_node, - tags=expected_docker_node_tags, - available=False, ) - mock_tag_node.reset_mock() - mock_docker_set_node_availability.assert_called_once_with( - get_docker_client(initialized_app), fake_node, available=True - ) - mock_docker_set_node_availability.reset_mock() + mock_docker_tag_node.reset_mock() + mock_docker_set_node_availability.assert_not_called() # check rabbit messages were sent, we do have worker - assert fake_node.Description - assert fake_node.Description.Resources - assert fake_node.Description.Resources.NanoCPUs + assert fake_attached_node.Description + assert fake_attached_node.Description.Resources + assert fake_attached_node.Description.Resources.NanoCPUs _assert_rabbit_autoscaling_message_sent( mock_rabbitmq_post_message, app_settings, @@ -557,8 +580,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 nodes_total=1, nodes_active=1, cluster_total_resources={ - "cpus": fake_node.Description.Resources.NanoCPUs / 1e9, - "ram": fake_node.Description.Resources.MemoryBytes, + "cpus": fake_attached_node.Description.Resources.NanoCPUs / 1e9, + "ram": fake_attached_node.Description.Resources.MemoryBytes, }, cluster_used_resources={ "cpus": float(0), @@ -569,27 +592,32 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_rabbitmq_post_message.reset_mock() # now we have 1 monitored node that needs to be mocked + fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" + fake_attached_node.Status = NodeStatus( + State=NodeState.ready, Message=None, Addr=None + ) + fake_attached_node.Spec.Availability = Availability.active + fake_attached_node.Description.Hostname = internal_dns_name + 
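The expected-tag assertions above lean on PEP 584 dict union, where the right-most operand wins on key collisions and no operand is mutated; a tiny sketch with hypothetical values:

node_labels = {"io.simcore.foo": "true", "osparc-services-ready": "true"}
new_tags = {"ec2-instance-type": "t2.micro"}  # hypothetical values
merged = node_labels | new_tags | {"osparc-services-ready": "false"}
assert merged["osparc-services-ready"] == "false"  # right-most mapping wins
assert merged["io.simcore.foo"] == "true"  # untouched keys survive
assert node_labels["osparc-services-ready"] == "true"  # operands not mutated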
auto_scaling_mode = DynamicAutoscaling() mocker.patch.object( auto_scaling_mode, "get_monitored_nodes", autospec=True, - return_value=[fake_node], + return_value=[fake_attached_node], ) - fake_node.Status = NodeStatus(State=NodeState.ready, Message=None, Addr=None) - assert fake_node.Spec - fake_node.Spec.Availability = Availability.active - assert fake_node.Description - fake_node.Description.Hostname = internal_dns_name # 3. calling this multiple times should do nothing - for _ in range(10): + num_useless_calls = 10 + for _ in range(num_useless_calls): await auto_scale_cluster( app=initialized_app, auto_scaling_mode=auto_scaling_mode ) mock_compute_node_used_resources.assert_called() + assert mock_compute_node_used_resources.call_count == num_useless_calls * 2 + mock_compute_node_used_resources.reset_mock() mock_find_node_with_name.assert_not_called() - mock_tag_node.assert_not_called() + mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() # check the number of instances did not change and is still running await _assert_ec2_instances( @@ -602,6 +630,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # check rabbit messages were sent mock_rabbitmq_post_message.assert_called() + assert mock_rabbitmq_post_message.call_count == num_useless_calls mock_rabbitmq_post_message.reset_mock() # @@ -619,17 +648,28 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 instance_type=expected_ec2_type, instance_state="running", ) - mock_docker_set_node_availability.assert_called_once_with( - get_docker_client(initialized_app), fake_node, available=False + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_attached_node.Spec.Labels + | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, ) - mock_docker_set_node_availability.reset_mock() + mock_docker_tag_node.reset_mock() # calling again does the exact same await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) - mock_docker_set_node_availability.assert_called_once_with( - get_docker_client(initialized_app), fake_node, available=False + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_attached_node.Spec.Labels + | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, ) - mock_docker_set_node_availability.reset_mock() + mock_docker_tag_node.reset_mock() + await _assert_ec2_instances( ec2_client, num_reservations=1, @@ -639,8 +679,11 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) # we artifically set the node to drain - fake_node.Spec.Availability = Availability.drain - fake_node.UpdatedAt = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + fake_attached_node.Spec.Availability = Availability.drain + fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false" + fake_attached_node.UpdatedAt = datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat() # the node will be not be terminated before the timeout triggers assert app_settings.AUTOSCALING_EC2_INSTANCES assert ( @@ -663,14 +706,14 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) # now changing the last update timepoint will trigger the node removal and shutdown the ec2 instance - fake_node.UpdatedAt = ( + fake_attached_node.UpdatedAt = ( 
datetime.datetime.now(tz=datetime.timezone.utc) - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - datetime.timedelta(seconds=1) ).isoformat() await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) mocked_docker_remove_node.assert_called_once_with( - mock.ANY, nodes=[fake_node], force=True + mock.ANY, nodes=[fake_attached_node], force=True ) await _assert_ec2_instances( ec2_client, @@ -731,7 +774,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( task_template: dict[str, Any], create_task_reservations: Callable[[int, int], dict[str, Any]], ec2_client: EC2Client, - mock_tag_node: mock.Mock, + mock_docker_tag_node: mock.Mock, scale_up_params: _ScaleUpParams, mock_rabbitmq_post_message: mock.Mock, mock_find_node_with_name: mock.Mock, @@ -753,11 +796,13 @@ async def test_cluster_scaling_up_starts_multiple_instances( service_monitored_labels | osparc_docker_label_keys.to_simcore_runtime_docker_labels(), "pending", - [ - f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" - ] - if scale_up_params.imposed_instance_type - else [], + ( + [ + f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" + ] + if scale_up_params.imposed_instance_type + else [] + ), ) for _ in range(scale_up_params.num_services) ) @@ -779,7 +824,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( # as the new node is already running, but is not yet connected, hence not tagged and drained mock_find_node_with_name.assert_not_called() - mock_tag_node.assert_not_called() + mock_docker_tag_node.assert_not_called() mock_docker_set_node_availability.assert_not_called() # check rabbit messages were sent _assert_rabbit_autoscaling_message_sent( @@ -798,6 +843,8 @@ async def test__deactivate_empty_nodes( host_node: Node, fake_ec2_instance_data: Callable[..., EC2InstanceData], mock_docker_set_node_availability: mock.Mock, + mock_docker_tag_node: mock.Mock, + with_drain_nodes_labelled: bool, ): # since we have no service running, we expect the passed node to be set to drain active_cluster = cluster( @@ -808,8 +855,12 @@ async def test__deactivate_empty_nodes( updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) assert not updated_cluster.active_nodes assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) - mock_docker_set_node_availability.assert_called_once_with( - mock.ANY, host_node, available=False + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + mock.ANY, + host_node, + tags={_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, ) @@ -820,12 +871,14 @@ async def test__deactivate_empty_nodes_to_drain_when_services_running_are_missin host_node: Node, fake_ec2_instance_data: Callable[..., EC2InstanceData], mock_docker_set_node_availability: mock.Mock, + mock_docker_tag_node: mock.Mock, create_service: Callable[ [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] ], task_template: dict[str, Any], create_task_reservations: Callable[[int, int], dict[str, Any]], host_cpu_count: int, + with_drain_nodes_labelled: bool, ): # create a service that runs without task labels task_template_that_runs = task_template | create_task_reservations( @@ -844,8 +897,12 @@ async def test__deactivate_empty_nodes_to_drain_when_services_running_are_missin updated_cluster = await 
_deactivate_empty_nodes(initialized_app, active_cluster) assert not updated_cluster.active_nodes assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) - mock_docker_set_node_availability.assert_called_once_with( - mock.ANY, host_node, available=False + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + mock.ANY, + host_node, + tags={_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + available=with_drain_nodes_labelled, ) @@ -857,6 +914,7 @@ async def test__deactivate_empty_nodes_does_not_drain_if_service_is_running_with host_node: Node, fake_ec2_instance_data: Callable[..., EC2InstanceData], mock_docker_set_node_availability: mock.Mock, + mock_docker_tag_node: mock.Mock, create_service: Callable[ [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] ], @@ -907,6 +965,7 @@ async def test__deactivate_empty_nodes_does_not_drain_if_service_is_running_with updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) assert updated_cluster == active_cluster mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_not_called() async def test__find_terminateable_nodes_with_no_hosts( @@ -996,7 +1055,7 @@ async def test__activate_drained_nodes_with_no_tasks( initialized_app: FastAPI, host_node: Node, drained_host_node: Node, - mock_tag_node: mock.Mock, + mock_docker_tag_node: mock.Mock, cluster: Callable[..., Cluster], create_associated_instance: Callable[[Node, bool], AssociatedInstance], ): @@ -1020,7 +1079,7 @@ async def test__activate_drained_nodes_with_no_tasks( initialized_app, active_cluster, DynamicAutoscaling() ) assert updated_cluster == active_cluster - mock_tag_node.assert_not_called() + mock_docker_tag_node.assert_not_called() async def test__activate_drained_nodes_with_no_drained_nodes( @@ -1029,7 +1088,7 @@ async def test__activate_drained_nodes_with_no_drained_nodes( autoscaling_docker: AutoscalingDocker, initialized_app: FastAPI, host_node: Node, - mock_tag_node: mock.Mock, + mock_docker_tag_node: mock.Mock, create_service: Callable[ [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] ], @@ -1062,7 +1121,7 @@ async def test__activate_drained_nodes_with_no_drained_nodes( initialized_app, cluster_without_drained_nodes, DynamicAutoscaling() ) assert updated_cluster == cluster_without_drained_nodes - mock_tag_node.assert_not_called() + mock_docker_tag_node.assert_not_called() async def test__activate_drained_nodes_with_drained_node( @@ -1071,7 +1130,7 @@ async def test__activate_drained_nodes_with_drained_node( autoscaling_docker: AutoscalingDocker, initialized_app: FastAPI, drained_host_node: Node, - mock_tag_node: mock.Mock, + mock_docker_tag_node: mock.Mock, create_service: Callable[ [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] ], @@ -1112,6 +1171,9 @@ async def test__activate_drained_nodes_with_drained_node( ) assert updated_cluster.active_nodes == cluster_with_drained_nodes.drained_nodes assert drained_host_node.Spec - mock_tag_node.assert_called_once_with( - mock.ANY, drained_host_node, tags=drained_host_node.Spec.Labels, available=True + mock_docker_tag_node.assert_called_once_with( + mock.ANY, + drained_host_node, + tags={_OSPARC_SERVICE_READY_LABEL_KEY: "true"}, + available=True, ) diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index fc0f3d90e8f..d088388af31 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ 
b/services/autoscaling/tests/unit/test_utils_docker.py @@ -12,24 +12,34 @@ import aiodocker import pytest -from aws_library.ec2.models import Resources +from aws_library.ec2.models import EC2InstanceData, Resources from deepdiff import DeepDiff from faker import Faker -from models_library.docker import DockerGenericTag, DockerLabelKey +from models_library.docker import ( + DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, + DockerGenericTag, + DockerLabelKey, +) from models_library.generated_models.docker_rest_api import ( Availability, NodeDescription, + NodeSpec, NodeState, + NodeStatus, Service, Task, ) from pydantic import ByteSize, parse_obj_as from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict from servicelib.docker_utils import to_datetime +from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_SERVICE_READY_LABEL_KEY, Node, _by_created_dt, + attach_node, compute_cluster_total_resources, compute_cluster_used_resources, compute_node_used_resources, @@ -40,10 +50,15 @@ get_docker_swarm_join_bash_command, get_max_resources_from_docker_task, get_monitored_nodes, + get_new_node_docker_tags, get_node_total_resources, get_worker_nodes, + is_node_osparc_ready, + is_node_ready_and_available, pending_service_tasks_with_insufficient_resources, remove_nodes, + set_node_availability, + set_node_osparc_ready, tag_node, ) @@ -334,7 +349,7 @@ async def test_pending_service_task_with_insufficient_resources_with_labelled_se # start a service with a part of the labels, we should not find it partial_service_labels = dict(itertools.islice(service_labels.items(), 2)) - _service_with_partial_labels = await create_service( + await create_service( task_template_with_too_many_resource, partial_service_labels, "pending" ) @@ -890,6 +905,52 @@ async def test_tag_node_out_of_sequence_error( assert updated_node2.Version.Index > updated_node.Version.Index +async def test_set_node_availability( + autoscaling_docker: AutoscalingDocker, host_node: Node, faker: Faker +): + assert is_node_ready_and_available(host_node, availability=Availability.active) + updated_node = await set_node_availability( + autoscaling_docker, host_node, available=False + ) + assert is_node_ready_and_available(updated_node, availability=Availability.drain) + updated_node = await set_node_availability( + autoscaling_docker, host_node, available=True + ) + assert is_node_ready_and_available(updated_node, availability=Availability.active) + + +def test_get_new_node_docker_tags( + disabled_rabbitmq: None, + disabled_ec2: None, + mocked_redis_server: None, + enabled_dynamic_mode: EnvVarsDict, + disable_dynamic_service_background_task: None, + app_settings: ApplicationSettings, + fake_ec2_instance_data: Callable[..., EC2InstanceData], +): + ec2_instance_data = fake_ec2_instance_data() + node_docker_tags = get_new_node_docker_tags(app_settings, ec2_instance_data) + assert node_docker_tags + assert DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY in node_docker_tags + assert app_settings.AUTOSCALING_NODES_MONITORING + for ( + tag_key + ) in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS: + assert tag_key in node_docker_tags + for ( + tag_key + ) in app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS: + assert tag_key in node_docker_tags + + all_keys = [ + 
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, + *app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS, + *app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS, + ] + for tag_key in node_docker_tags: + assert tag_key in all_keys + + @pytest.mark.parametrize( "images, expected_cmd", [ @@ -943,3 +1004,125 @@ def test_get_docker_pull_images_crontab( interval: datetime.timedelta, expected_cmd: str ): assert get_docker_pull_images_crontab(interval) == expected_cmd + + +def test_is_node_ready_and_available(create_fake_node: Callable[..., Node]): + # check not ready state return false + for node_status in [ + NodeStatus(State=s, Message=None, Addr=None) + for s in NodeState + if s is not NodeState.ready + ]: + fake_node = create_fake_node(Status=node_status) + assert not is_node_ready_and_available( + fake_node, availability=Availability.drain + ) + + node_ready_status = NodeStatus(State=NodeState.ready, Message=None, Addr=None) + fake_drained_node = create_fake_node( + Status=node_ready_status, + Spec=NodeSpec( + Name=None, + Labels=None, + Role=None, + Availability=Availability.drain, + ), + ) + assert is_node_ready_and_available( + fake_drained_node, availability=Availability.drain + ) + assert not is_node_ready_and_available( + fake_drained_node, availability=Availability.active + ) + assert not is_node_ready_and_available( + fake_drained_node, availability=Availability.pause + ) + + +def test_is_node_osparc_ready(create_fake_node: Callable[..., Node], faker: Faker): + fake_node = create_fake_node() + assert fake_node.Spec + assert fake_node.Spec.Availability is Availability.drain + # no labels, not ready and drained + assert not is_node_osparc_ready(fake_node) + # no labels, not ready, but active + fake_node.Spec.Availability = Availability.active + assert not is_node_osparc_ready(fake_node) + # no labels, ready and active + fake_node.Status = NodeStatus(State=NodeState.ready, Message=None, Addr=None) + assert not is_node_osparc_ready(fake_node) + # add some random labels + assert fake_node.Spec + fake_node.Spec.Labels = faker.pydict(allowed_types=(str,)) + assert not is_node_osparc_ready(fake_node) + # add the expected label + fake_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false" + assert not is_node_osparc_ready(fake_node) + # make it ready + fake_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" + assert is_node_osparc_ready(fake_node) + + +async def test_set_node_osparc_ready( + disabled_rabbitmq: None, + disabled_ec2: None, + mocked_redis_server: None, + enabled_dynamic_mode: EnvVarsDict, + disable_dynamic_service_background_task: None, + app_settings: ApplicationSettings, + autoscaling_docker: AutoscalingDocker, + host_node: Node, +): + # initial state + assert is_node_ready_and_available(host_node, availability=Availability.active) + # set the node to drain + updated_node = await set_node_availability( + autoscaling_docker, host_node, available=False + ) + assert is_node_ready_and_available(updated_node, availability=Availability.drain) + # the node is also not osparc ready + assert not is_node_osparc_ready(updated_node) + + # this implicitely make the node active as well + updated_node = await set_node_osparc_ready( + app_settings, autoscaling_docker, host_node, ready=True + ) + assert is_node_ready_and_available(updated_node, availability=Availability.active) + assert is_node_osparc_ready(updated_node) + # make it not osparc ready + updated_node = await set_node_osparc_ready( + app_settings, autoscaling_docker, 
host_node, ready=False + ) + assert not is_node_osparc_ready(updated_node) + assert is_node_ready_and_available(updated_node, availability=Availability.drain) + + +async def test_attach_node( + disabled_rabbitmq: None, + disabled_ec2: None, + mocked_redis_server: None, + enabled_dynamic_mode: EnvVarsDict, + disable_dynamic_service_background_task: None, + app_settings: ApplicationSettings, + autoscaling_docker: AutoscalingDocker, + host_node: Node, + faker: Faker, +): + # initial state + assert is_node_ready_and_available(host_node, availability=Availability.active) + # set the node to drain + updated_node = await set_node_availability( + autoscaling_docker, host_node, available=False + ) + assert is_node_ready_and_available(updated_node, availability=Availability.drain) + # now attach the node + updated_node = await attach_node( + app_settings, + autoscaling_docker, + updated_node, + tags=faker.pydict(allowed_types=(str,)), + ) + # expected the node to be active + assert is_node_ready_and_available(host_node, availability=Availability.active) + # but not osparc ready + assert not is_node_osparc_ready(updated_node) diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 10a23cb7cef..ffc1295696f 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -2,11 +2,10 @@ version: "3.8" x-dask-tls-secrets: &dask_tls_secrets - source: dask_tls_key target: ${DASK_TLS_KEY} - mode: 0444 + mode: 444 - source: dask_tls_cert target: ${DASK_TLS_CERT} - mode: 0444 - + mode: 444 services: api-server: @@ -72,6 +71,7 @@ services: environment: - AUTOSCALING_LOGLEVEL=${AUTOSCALING_LOGLEVEL:-${LOG_LEVEL:-WARNING}} - AUTOSCALING_POLL_INTERVAL=${AUTOSCALING_POLL_INTERVAL} + - AUTOSCALING_DRAIN_NODES_WITH_LABELS=${AUTOSCALING_DRAIN_NODES_WITH_LABELS} - AUTOSCALING_DASK=${AUTOSCALING_DASK} # comp autoscaling - DASK_MONITORING_URL=${DASK_MONITORING_URL} @@ -193,7 +193,7 @@ services: - WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS=${WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS} - WORKERS_EC2_INSTANCES_SUBNET_ID=${WORKERS_EC2_INSTANCES_SUBNET_ID} - WORKERS_EC2_INSTANCES_CUSTOM_TAGS=${WORKERS_EC2_INSTANCES_CUSTOM_TAGS} - secrets: *dask_tls_secrets + secrets: *dask_tls_secrets director: image: ${DOCKER_REGISTRY:-itisfoundation}/director:${DOCKER_IMAGE_TAG:-latest} @@ -329,7 +329,7 @@ services: - default - interactive_services_subnet - computational_services_subnet - secrets: *dask_tls_secrets + secrets: *dask_tls_secrets invitations: image: ${DOCKER_REGISTRY:-itisfoundation}/invitations:${DOCKER_IMAGE_TAG:-latest} @@ -912,7 +912,7 @@ services: SIDECAR_COMP_SERVICES_SHARED_FOLDER: ${SIDECAR_COMP_SERVICES_SHARED_FOLDER:-/home/scu/computational_shared_data} networks: - computational_services_subnet - secrets: *dask_tls_secrets + secrets: *dask_tls_secrets dask-scheduler: image: ${DOCKER_REGISTRY:-itisfoundation}/dask-sidecar:${DOCKER_IMAGE_TAG:-latest} @@ -924,7 +924,7 @@ services: networks: - computational_services_subnet - secrets: *dask_tls_secrets + secrets: *dask_tls_secrets datcore-adapter: image: ${DOCKER_REGISTRY:-itisfoundation}/datcore-adapter:${DOCKER_IMAGE_TAG:-latest} From c3c1d6be39bfa7aa2e533d4d30f2abe251888e17 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Tue, 20 Feb 2024 08:17:07 +0100 Subject: [PATCH 5/6] =?UTF-8?q?=F0=9F=90=9B=20Autoscaling:=20issues=20with?= =?UTF-8?q?=20labelled=20drained=20nodes=20(#5348)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 2 +- 
services/autoscaling/requirements/_base.txt | 10 ++ .../core/application.py | 15 ++ .../modules/auto_scaling_core.py | 18 ++- .../utils/utils_docker.py | 19 ++- services/autoscaling/tests/unit/conftest.py | 30 +++- ...test_modules_auto_scaling_computational.py | 94 ++++++++---- .../unit/test_modules_auto_scaling_dynamic.py | 138 ++++++++++++------ 8 files changed, 249 insertions(+), 77 deletions(-) diff --git a/Makefile b/Makefile index 08354f9a610..b823bdf818a 100644 --- a/Makefile +++ b/Makefile @@ -397,7 +397,7 @@ leave: ## Forces to stop all services, networks, etc by the node leaving the swa .PHONY: .init-swarm .init-swarm: # Ensures swarm is initialized - $(if $(SWARM_HOSTS),,docker swarm init --advertise-addr=$(get_my_ip)) + $(if $(SWARM_HOSTS),,docker swarm init --advertise-addr=$(get_my_ip) --default-addr-pool 192.168.0.1/16) ## DOCKER TAGS ------------------------------- diff --git a/services/autoscaling/requirements/_base.txt b/services/autoscaling/requirements/_base.txt index 621da3d81fc..2ceac3a0145 100644 --- a/services/autoscaling/requirements/_base.txt +++ b/services/autoscaling/requirements/_base.txt @@ -159,6 +159,7 @@ fastapi==0.99.1 # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_fastapi.in # -r requirements/_base.in + # prometheus-fastapi-instrumentator frozenlist==1.4.0 # via # aiohttp @@ -273,6 +274,12 @@ partd==1.4.0 # via # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # dask +prometheus-client==0.20.0 + # via + # -r requirements/../../../packages/service-library/requirements/_fastapi.in + # prometheus-fastapi-instrumentator +prometheus-fastapi-instrumentator==6.1.0 + # via -r requirements/../../../packages/service-library/requirements/_fastapi.in psutil==5.9.5 # via # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt @@ -452,6 +459,8 @@ types-aiobotocore==2.7.0 # via -r requirements/../../../packages/aws-library/requirements/_base.in types-aiobotocore-ec2==2.7.0 # via types-aiobotocore +types-aiobotocore-s3==2.7.0 + # via types-aiobotocore types-awscrt==0.19.8 # via botocore-stubs types-python-dateutil==2.8.19.14 @@ -465,6 +474,7 @@ typing-extensions==4.8.0 # typer # types-aiobotocore # types-aiobotocore-ec2 + # types-aiobotocore-s3 # uvicorn urllib3==1.26.16 # via diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/application.py b/services/autoscaling/src/simcore_service_autoscaling/core/application.py index 6bd496f0798..7ceb418de18 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/application.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/application.py @@ -25,10 +25,25 @@ from ..modules.remote_debug import setup_remote_debugging from .settings import ApplicationSettings +_LOG_LEVEL_STEP = logging.CRITICAL - logging.ERROR +_NOISY_LOGGERS = ( + "aiobotocore", + "aio_pika", + "aiormq", + "botocore", +) + logger = logging.getLogger(__name__) def create_app(settings: ApplicationSettings) -> FastAPI: + # keep mostly quiet noisy loggers + quiet_level: int = max( + min(logging.root.level + _LOG_LEVEL_STEP, logging.CRITICAL), logging.WARNING + ) + for name in _NOISY_LOGGERS: + logging.getLogger(name).setLevel(quiet_level) + logger.info("app settings: %s", settings.json(indent=1)) app = FastAPI( diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py 
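The quiet_level computed in create_app above nudges the noisy third-party loggers one standard step above the root level, clamped to the [WARNING, CRITICAL] range. A worked sketch with the stdlib constants (the function wrapper is illustrative only):

import logging

_STEP = logging.CRITICAL - logging.ERROR  # 10, i.e. one standard level step

def quiet_level(root_level: int) -> int:
    return max(min(root_level + _STEP, logging.CRITICAL), logging.WARNING)

assert quiet_level(logging.DEBUG) == logging.WARNING  # clamped up to WARNING
assert quiet_level(logging.WARNING) == logging.ERROR  # one step quieter than root
assert quiet_level(logging.CRITICAL) == logging.CRITICAL  # clamped at CRITICAL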
index d9b0d5ed5a8..ca9bf5e19a0 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -112,9 +112,19 @@ async def _analyze_current_cluster( terminated_instances=terminated_ec2_instances, disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)], ) - _logger.debug( + cluster_state = jsonable_encoder( + cluster, + include={ + "active_nodes": True, + "pending_nodes": True, + "drained_nodes": "available_resources", + "reserve_drained_nodes": True, + "pending_ec2s": "ec2_instance", + }, + ) + _logger.warning( "current state: %s", - f"{json.dumps(jsonable_encoder(cluster, include={'active_nodes', 'pending_nodes', 'drained_nodes', 'reserve_drained_nodes', 'pending_ec2s'}), indent=2)}", + f"{json.dumps(cluster_state, indent=2)}", ) return cluster @@ -744,11 +754,11 @@ async def _find_terminateable_instances( terminateable_nodes: list[AssociatedInstance] = [] for instance in cluster.drained_nodes: - assert instance.node.UpdatedAt # nosec - node_last_updated = arrow.get(instance.node.UpdatedAt).datetime + node_last_updated = utils_docker.get_node_last_readyness_update(instance.node) elapsed_time_since_drained = ( datetime.datetime.now(datetime.timezone.utc) - node_last_updated ) + _logger.warning("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}") if ( elapsed_time_since_drained > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index ae3e810be7b..f5639eac8ce 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -13,6 +13,7 @@ from pathlib import Path from typing import Final, cast +import arrow import yaml from aws_library.ec2.models import EC2InstanceData, Resources from models_library.docker import ( @@ -110,7 +111,7 @@ def _check_if_node_is_removable(node: Node) -> bool: def _is_task_waiting_for_resources(task: Task) -> bool: # NOTE: https://docs.docker.com/engine/swarm/how-swarm-mode-works/swarm-task-states/ with log_context( - logger, level=logging.DEBUG, msg=f"_is_task_waiting_for_resources: {task}" + logger, level=logging.DEBUG, msg=f"_is_task_waiting_for_resources: {task.ID}" ): if ( not task.Status @@ -550,7 +551,10 @@ def is_node_ready_and_available(node: Node, *, availability: Availability) -> bo _OSPARC_SERVICE_READY_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( - DockerLabelKey, "osparc-services-ready" + DockerLabelKey, "io.simcore.osparc-services-ready" +) +_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( + DockerLabelKey, f"{_OSPARC_SERVICE_READY_LABEL_KEY}-last-changed" ) @@ -575,6 +579,7 @@ async def set_node_osparc_ready( assert node.Spec # nosec new_tags = deepcopy(cast(dict[DockerLabelKey, str], node.Spec.Labels)) new_tags[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" if ready else "false" + new_tags[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat() # NOTE: docker drain sometimes impeed on performance when undraining see https://github.com/ITISFoundation/osparc-simcore/issues/5339 available = app_settings.AUTOSCALING_DRAIN_NODES_WITH_LABELS or ready return await tag_node( @@ -585,6 +590,15 @@ async def set_node_osparc_ready( ) +def get_node_last_readyness_update(node: 
Node) -> datetime.datetime: + assert node.Spec # nosec + assert node.Spec.Labels # nosec + return cast( + datetime.datetime, + arrow.get(node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY]).datetime, + ) # mypy + + async def attach_node( app_settings: ApplicationSettings, docker_client: AutoscalingDocker, @@ -595,6 +609,7 @@ async def attach_node( assert node.Spec # nosec current_tags = cast(dict[DockerLabelKey, str], node.Spec.Labels or {}) new_tags = current_tags | tags | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"} + new_tags[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat() return await tag_node( docker_client, node, diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 2dab933bbdd..d7347376fd7 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -14,6 +14,7 @@ from unittest import mock import aiodocker +import arrow import distributed import httpx import psutil @@ -50,6 +51,10 @@ ) from simcore_service_autoscaling.models import Cluster, DaskTaskResources from simcore_service_autoscaling.modules.docker import AutoscalingDocker +from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_SERVICE_READY_LABEL_KEY, + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, +) from tenacity import retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type @@ -321,13 +326,34 @@ async def host_node( ) -> AsyncIterator[DockerNode]: nodes = parse_obj_as(list[DockerNode], await async_docker_client.nodes.list()) assert len(nodes) == 1 + # keep state of node for later revert old_node = deepcopy(nodes[0]) + assert old_node.ID assert old_node.Spec assert old_node.Spec.Role assert old_node.Spec.Availability - yield nodes[0] + assert old_node.Version + assert old_node.Version.Index + labels = old_node.Spec.Labels or {} + # ensure we have the necessary labels + await async_docker_client.nodes.update( + node_id=old_node.ID, + version=old_node.Version.Index, + spec={ + "Availability": old_node.Spec.Availability.value, + "Labels": labels + | { + _OSPARC_SERVICE_READY_LABEL_KEY: "true", + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: arrow.utcnow().isoformat(), + }, + "Role": old_node.Spec.Role.value, + }, + ) + modified_host_node = parse_obj_as( + DockerNode, await async_docker_client.nodes.inspect(node_id=old_node.ID) + ) + yield modified_host_node # revert state - assert old_node.ID current_node = parse_obj_as( DockerNode, await async_docker_client.nodes.inspect(node_id=old_node.ID) ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index 2e09740829d..d048e06124a 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -17,6 +17,7 @@ from typing import Any from unittest import mock +import arrow import distributed import pytest from aws_library.ec2.models import Resources @@ -43,6 +44,7 @@ from simcore_service_autoscaling.modules.docker import get_docker_client from simcore_service_autoscaling.utils.utils_docker import ( _OSPARC_SERVICE_READY_LABEL_KEY, + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, ) from types_aiobotocore_ec2.client import EC2Client from types_aiobotocore_ec2.literals import InstanceTypeType @@ -468,29 +470,55 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) assert 
fake_attached_node.Spec.Labels fake_attached_node.Spec.Labels |= expected_docker_node_tags | { - _OSPARC_SERVICE_READY_LABEL_KEY: "false" + _OSPARC_SERVICE_READY_LABEL_KEY: "false", } - mock_docker_tag_node.assert_has_calls( - ( - # attach node call - mock.call( - get_docker_client(initialized_app), - fake_node, - tags=fake_node.Spec.Labels - | expected_docker_node_tags - | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, - available=with_drain_nodes_labelled, - ), - mock.call( - get_docker_client(initialized_app), - fake_attached_node, - tags=fake_node.Spec.Labels - | expected_docker_node_tags - | {_OSPARC_SERVICE_READY_LABEL_KEY: "true"}, - available=True, - ), - ) + # check attach call + assert mock_docker_tag_node.call_args_list[0] == mock.call( + get_docker_client(initialized_app), + fake_node, + tags=fake_node.Spec.Labels + | expected_docker_node_tags + | { + _OSPARC_SERVICE_READY_LABEL_KEY: "false", + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY, + }, + available=with_drain_nodes_labelled, + ) + # update our fake node + fake_attached_node.Spec.Labels[ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] = mock_docker_tag_node.call_args_list[0][1]["tags"][ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] + # check the activate time is later than attach time + assert arrow.get( + mock_docker_tag_node.call_args_list[1][1]["tags"][ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] + ) > arrow.get( + mock_docker_tag_node.call_args_list[0][1]["tags"][ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] + ) + + # check activate call + assert mock_docker_tag_node.call_args_list[1] == mock.call( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_node.Spec.Labels + | expected_docker_node_tags + | { + _OSPARC_SERVICE_READY_LABEL_KEY: "true", + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY, + }, + available=True, ) + # update our fake node + fake_attached_node.Spec.Labels[ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] = mock_docker_tag_node.call_args_list[1][1]["tags"][ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] mock_docker_tag_node.reset_mock() mock_docker_set_node_availability.assert_not_called() mock_rabbitmq_post_message.assert_called_once() @@ -567,9 +595,20 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 get_docker_client(initialized_app), fake_attached_node, tags=fake_attached_node.Spec.Labels - | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"}, + | { + _OSPARC_SERVICE_READY_LABEL_KEY: "false", + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY, + }, available=with_drain_nodes_labelled, ) + # check the datetime was updated + assert arrow.get( + mock_docker_tag_node.call_args_list[0][1]["tags"][ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] + ) > arrow.get( + fake_attached_node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] + ) mock_docker_tag_node.reset_mock() await _assert_ec2_instances( @@ -583,10 +622,11 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # we artifically set the node to drain fake_attached_node.Spec.Availability = Availability.drain fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false" - fake_attached_node.UpdatedAt = datetime.datetime.now( - tz=datetime.timezone.utc - ).isoformat() - # the node will be not be terminated beforet the timeout triggers + fake_attached_node.Spec.Labels[ + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + + # the node will be not be terminated before the timeout triggers assert 
@@ -608,7 +648,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     )
 
     # now changing the last update timepoint will trigger the node removal and shutdown the ec2 instance
-    fake_attached_node.UpdatedAt = (
+    fake_attached_node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = (
         datetime.datetime.now(tz=datetime.timezone.utc)
         - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
         - datetime.timedelta(seconds=1)
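
The tests in this series pull keyword arguments straight out of mock.call_args_list: each entry is an (args, kwargs) pair, so call_args_list[i][1]["tags"] is the tags= argument of the i-th call. A minimal self-contained sketch of that indexing pattern (the mock and its arguments are illustrative):

    from unittest import mock

    m = mock.Mock()
    m("first", tags={"a": "1"}, available=True)
    m("second", tags={"b": "2"}, available=False)

    # each call_args_list entry is an (args, kwargs) pair,
    # so [i][1] is the kwargs dict of the i-th call
    assert m.call_args_list[0][1]["tags"] == {"a": "1"}
    assert m.call_args_list[1][1]["available"] is False
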
diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
index 5c6ce8c03ed..3cdc0943472 100644
--- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
+++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py
@@ -15,6 +15,7 @@ from unittest import mock
 
 import aiodocker
+import arrow
 import pytest
 from aws_library.ec2.models import EC2InstanceData, Resources
 from faker import Faker
@@ -54,6 +55,7 @@
 )
 from simcore_service_autoscaling.utils.utils_docker import (
     _OSPARC_SERVICE_READY_LABEL_KEY,
+    _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
 )
 from types_aiobotocore_ec2.client import EC2Client
 from types_aiobotocore_ec2.literals import InstanceTypeType
@@ -521,11 +523,67 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
         _OSPARC_SERVICE_READY_LABEL_KEY: "false"
     }
 
+    # the node is tagged and made active right away since we still have the pending task
+    mock_find_node_with_name.assert_called_once()
+    mock_find_node_with_name.reset_mock()
+
+    assert mock_docker_tag_node.call_count == 2
+    assert fake_node.Spec
+    assert fake_node.Spec.Labels
+    # check attach call
+    assert mock_docker_tag_node.call_args_list[0] == mock.call(
+        get_docker_client(initialized_app),
+        fake_node,
+        tags=fake_node.Spec.Labels
+        | expected_docker_node_tags
+        | {
+            _OSPARC_SERVICE_READY_LABEL_KEY: "false",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
+        available=with_drain_nodes_labelled,
+    )
+    # update our fake node
+    fake_attached_node.Spec.Labels[
+        _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+    ] = mock_docker_tag_node.call_args_list[0][1]["tags"][
+        _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+    ]
+    # check the activate time is later than the attach time
+    assert arrow.get(
+        mock_docker_tag_node.call_args_list[1][1]["tags"][
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+        ]
+    ) > arrow.get(
+        mock_docker_tag_node.call_args_list[0][1]["tags"][
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+        ]
+    )
     mock_compute_node_used_resources.assert_called_once_with(
         get_docker_client(initialized_app),
         fake_attached_node,
     )
     mock_compute_node_used_resources.reset_mock()
+    # check activate call
+    assert mock_docker_tag_node.call_args_list[1] == mock.call(
+        get_docker_client(initialized_app),
+        fake_attached_node,
+        tags=fake_node.Spec.Labels
+        | expected_docker_node_tags
+        | {
+            _OSPARC_SERVICE_READY_LABEL_KEY: "true",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
+        available=True,
+    )
+    # update our fake node
+    fake_attached_node.Spec.Labels[
+        _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+    ] = mock_docker_tag_node.call_args_list[1][1]["tags"][
+        _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+    ]
+    mock_docker_tag_node.reset_mock()
+    mock_docker_set_node_availability.assert_not_called()
+
     # check the number of instances did not change and is still running
     internal_dns_names = await _assert_ec2_instances(
         ec2_client,
@@ -537,38 +595,6 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     assert len(internal_dns_names) == 1
     internal_dns_name = internal_dns_names[0].removesuffix(".ec2.internal")
 
-    # the node is tagged and made active right away since we still have the pending task
-    mock_find_node_with_name.assert_called_once()
-    mock_find_node_with_name.reset_mock()
-
-    assert mock_docker_tag_node.call_count == 2
-    assert fake_node.Spec
-    assert fake_node.Spec.Labels
-
-    mock_docker_tag_node.assert_has_calls(
-        (
-            # attach node call
-            mock.call(
-                get_docker_client(initialized_app),
-                fake_node,
-                tags=fake_node.Spec.Labels
-                | expected_docker_node_tags
-                | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
-                available=with_drain_nodes_labelled,
-            ),
-            mock.call(
-                get_docker_client(initialized_app),
-                fake_attached_node,
-                tags=fake_node.Spec.Labels
-                | expected_docker_node_tags
-                | {_OSPARC_SERVICE_READY_LABEL_KEY: "true"},
-                available=True,
-            ),
-        )
-    )
-    mock_docker_tag_node.reset_mock()
-    mock_docker_set_node_availability.assert_not_called()
-
     # check rabbit messages were sent, we do have worker
     assert fake_attached_node.Description
     assert fake_attached_node.Description.Resources
@@ -653,9 +679,20 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
         get_docker_client(initialized_app),
         fake_attached_node,
         tags=fake_attached_node.Spec.Labels
-        | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
+        | {
+            _OSPARC_SERVICE_READY_LABEL_KEY: "false",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
         available=with_drain_nodes_labelled,
     )
+    # check the datetime was updated
+    assert arrow.get(
+        mock_docker_tag_node.call_args_list[0][1]["tags"][
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+        ]
+    ) > arrow.get(
+        fake_attached_node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY]
+    )
     mock_docker_tag_node.reset_mock()
 
     # calling again does the exact same
@@ -665,7 +702,10 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
         get_docker_client(initialized_app),
         fake_attached_node,
         tags=fake_attached_node.Spec.Labels
-        | {_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
+        | {
+            _OSPARC_SERVICE_READY_LABEL_KEY: "false",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
         available=with_drain_nodes_labelled,
     )
     mock_docker_tag_node.reset_mock()
@@ -681,9 +721,10 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     # we artifically set the node to drain
     fake_attached_node.Spec.Availability = Availability.drain
     fake_attached_node.Spec.Labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false"
-    fake_attached_node.UpdatedAt = datetime.datetime.now(
-        tz=datetime.timezone.utc
-    ).isoformat()
+    fake_attached_node.Spec.Labels[
+        _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
+    ] = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
+
     # the node will be not be terminated before the timeout triggers
     assert app_settings.AUTOSCALING_EC2_INSTANCES
     assert (
@@ -706,7 +747,7 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     )
 
     # now changing the last update timepoint will trigger the node removal and shutdown the ec2 instance
-    fake_attached_node.UpdatedAt = (
+    fake_attached_node.Spec.Labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = (
         datetime.datetime.now(tz=datetime.timezone.utc)
         - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
         - datetime.timedelta(seconds=1)
@@ -856,10 +897,16 @@ async def test__deactivate_empty_nodes(
     assert not updated_cluster.active_nodes
     assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes)
     mock_docker_set_node_availability.assert_not_called()
+    assert host_node.Spec
+    assert host_node.Spec.Labels
     mock_docker_tag_node.assert_called_once_with(
         mock.ANY,
         host_node,
-        tags={_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
+        tags=host_node.Spec.Labels
+        | {
+            _OSPARC_SERVICE_READY_LABEL_KEY: "false",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
         available=with_drain_nodes_labelled,
     )
 
@@ -898,10 +945,16 @@ async def test__deactivate_empty_nodes_to_drain_when_services_running_are_missin
     assert not updated_cluster.active_nodes
     assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes)
     mock_docker_set_node_availability.assert_not_called()
+    assert host_node.Spec
+    assert host_node.Spec.Labels
     mock_docker_tag_node.assert_called_once_with(
         mock.ANY,
         host_node,
-        tags={_OSPARC_SERVICE_READY_LABEL_KEY: "false"},
+        tags=host_node.Spec.Labels
+        | {
+            _OSPARC_SERVICE_READY_LABEL_KEY: "false",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
         available=with_drain_nodes_labelled,
     )
 
@@ -1174,6 +1227,9 @@ async def test__activate_drained_nodes_with_drained_node(
     mock_docker_tag_node.assert_called_once_with(
         mock.ANY,
         drained_host_node,
-        tags={_OSPARC_SERVICE_READY_LABEL_KEY: "true"},
+        tags={
+            _OSPARC_SERVICE_READY_LABEL_KEY: "true",
+            _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
+        },
         available=True,
     )
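
The expected tags in these assertions pin the ready flag while leaving the timestamp unconstrained through unittest.mock's ANY sentinel, which compares equal to any value. A minimal sketch of the pattern; the "ready"/"ready-since" keys are illustrative stand-ins, not the patch's real label keys:

    from unittest import mock

    m = mock.Mock()
    m.tag_node(tags={"ready": "true", "ready-since": "2024-02-20T12:00:00+00:00"})

    # mock.ANY compares equal to anything, so the assertion does not need
    # to know the exact timestamp that was written
    m.tag_node.assert_called_once_with(tags={"ready": "true", "ready-since": mock.ANY})
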
"visible" : "excluded"); } diff --git a/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js b/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js index 03afc330ec6..e58882ddbb9 100644 --- a/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js +++ b/services/static-webserver/client/source/class/osparc/editor/ThumbnailSuggestions.js @@ -23,7 +23,8 @@ qx.Class.define("osparc.editor.ThumbnailSuggestions", { this.set({ alignX: "center", - maxHeight: 170 + height: 118, + maxHeight: 118 }); this.setButtonsWidth(30); @@ -154,14 +155,14 @@ qx.Class.define("osparc.editor.ThumbnailSuggestions", { this.removeAll(); suggestions.forEach(suggestion => { const maxHeight = this.getMaxHeight(); - const thumbnail = new osparc.ui.basic.Thumbnail(suggestion["thumbnailUrl"] || suggestion, maxHeight, parseInt(maxHeight*2/3)); + const thumbnail = new osparc.ui.basic.Thumbnail(suggestion.thumbnailUrl || suggestion, maxHeight, parseInt(maxHeight*2/3)); thumbnail.set({ minWidth: 97, margin: 0, decorator: "thumbnail" }); - thumbnail.thumbnailType = suggestion["type"] || "templateThumbnail"; - thumbnail.thumbnailFileUrl = suggestion["fileUrl"] || suggestion; + thumbnail.thumbnailType = suggestion.type || "templateThumbnail"; + thumbnail.thumbnailFileUrl = suggestion.fileUrl || suggestion; thumbnail.addListener("mouseover", () => thumbnail.set({decorator: "thumbnail-selected"}), this); thumbnail.addListener("mouseout", () => thumbnail.set({decorator: "thumbnail"}), this); thumbnail.addListener("tap", () => {