diff --git a/mautrix_telegram/commands/handler.py b/mautrix_telegram/commands/handler.py index ba071700..43b45968 100644 --- a/mautrix_telegram/commands/handler.py +++ b/mautrix_telegram/commands/handler.py @@ -14,9 +14,21 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Awaitable, Callable, Dict, List, NamedTuple, Optional -import traceback +"""This module contains classes handling commands issued by Matrix users.""" +from typing import ( + Any, + Awaitable, + Callable, + Coroutine, + Dict, + List, + NamedTuple, + Optional, + Union, + NewType, +) import logging +import traceback import commonmark @@ -59,7 +71,28 @@ def image(self, node, entering): md_renderer = HtmlEscapingRenderer() +def ensure_trailing_newline(s: str) -> str: + """Returns the passed string, but with a guaranteed trailing newline.""" + return s + ("" if s[-1] == "\n" else "\n") + + class CommandEvent: + """Holds information about a command issued in a Matrix room. + + When a Matrix command was issued to the bot, CommandEvent will hold + information regarding the event. + + Attributes: + room_id: The id of the Matrix room in which the command was issued. + event_id: The id of the matrix event which contained the command. + sender: The user who issued the command. + command: The issued command. + args: Arguments given with the issued command. + is_management: Determines whether the room in which the command wa + issued is a management room. + is_portal: Determines whether the room in which the command was issued + is a portal. + """ def __init__(self, processor: 'CommandProcessor', room: MatrixRoomID, event: MatrixEventID, sender: u.User, command: str, args: List[str], is_management: bool, is_portal: bool) -> None: @@ -78,28 +111,109 @@ def __init__(self, processor: 'CommandProcessor', room: MatrixRoomID, event: Mat self.is_management = is_management self.is_portal = is_portal - def reply(self, message: str, allow_html: bool = False, render_markdown: bool = True - ) -> Awaitable[Dict]: - message = message.replace("$cmdprefix+sp ", - "" if self.is_management else f"{self.command_prefix} ") - message = message.replace("$cmdprefix", self.command_prefix) - html = None + def reply( + self, + message: str, + allow_html: bool = False, + render_markdown: bool = True, + ) -> Awaitable[Dict]: + """Write a reply to the room in which the command was issued. + + Replaces occurences of "$cmdprefix" in the message with the command + prefix and replaces occurences of "$cmdprefix+sp " with the command + prefix if the command was not issued in a management room. + If allow_html and render_markdown are both False, the message will not + be rendered to html and sending of html is disabled. + + Args: + message: The message to post in the room. + allow_html: Escape html in the message or don't render html at all + if markdown is disabled. + render_markdown: Use markdown formatting to render the passed + message to html. + + Returns: + Handler for the message sending function. + """ + message_cmd = self._replace_command_prefix(message) + html = self._render_message( + message_cmd, allow_html=allow_html, render_markdown=render_markdown + ) + + return self.az.intent.send_notice(self.room_id, message_cmd, html=html) + + def mark_read(self) -> Awaitable[Dict]: + """Marks the command as read by the bot.""" + return self.az.intent.mark_read(self.room_id, self.event_id) + + def _replace_command_prefix(self, message: str) -> str: + """Returns the string with the proper command prefix entered.""" + message = message.replace( + "$cmdprefix+sp ", "" if self.is_management else f"{self.command_prefix} " + ) + return message.replace("$cmdprefix", self.command_prefix) + + def _render_message( + self, message: str, allow_html: bool, render_markdown: bool + ) -> Optional[str]: + """Renders the message as HTML. + + Args: + allow_html: Flag to allow custom HTML in the message. + render_markdown: If true, markdown styling is applied to the message. + + Returns: + The message rendered as HTML. + None is returned if no styled output is required. + """ + html = "" if render_markdown: md_renderer.allow_html = allow_html html = md_renderer.render(md_parser.parse(message)) elif allow_html: html = message - return self.az.intent.send_notice(self.room_id, message, html=html) - - def mark_read(self) -> Awaitable[Dict]: - return self.az.intent.mark_read(self.room_id, self.event_id) + return ensure_trailing_newline(html) if html else None class CommandHandler: + """A command which can be executed from a Matrix room. + + The command manages its permission and help texts. + When called, it will check the permission of the command event and execute + the command or, in case of error, report back to the user. + + Attributes: + needs_auth: Flag indicating if the sender is required to be logged in. + needs_puppeting: Flag indicating if the sender is required to use + Telegram puppeteering for this command. + needs_matrix_puppeting: Flag indicating if the sender is required to use + Matrix pupeteering. + needs_admin: Flag for whether only admin users can issue this command. + management_only: Whether the command can exclusively be issued in a + management room. + name: The name of this command. + help_section: Section of the help in which this command will appear. + """ def __init__(self, handler: Callable[[CommandEvent], Awaitable[Dict]], needs_auth: bool, needs_puppeting: bool, needs_matrix_puppeting: bool, needs_admin: bool, management_only: bool, name: str, help_text: str, help_args: str, help_section: HelpSection) -> None: + """ + Args: + handler: The function handling the execution of this command. + needs_auth: Flag indicating if the sender is required to be logged in. + needs_puppeting: Flag indicating if the sender is required to use + Telegram puppeteering for this command. + needs_matrix_puppeting: Flag indicating if the sender is required to + use Matrix pupeteering. + needs_admin: Flag for whether only admin users can issue this command. + management_only: Whether the command can exclusively be issued + in a management room. + name: The name of this command. + help_text: The text displayed in the help for this command. + help_args: Help text for the arguments of this command. + help_section: Section of the help in which this command will appear. + """ self._handler = handler self.needs_auth = needs_auth self.needs_puppeting = needs_puppeting @@ -112,6 +226,14 @@ def __init__(self, handler: Callable[[CommandEvent], Awaitable[Dict]], needs_aut self.help_section = help_section async def get_permission_error(self, evt: CommandEvent) -> Optional[str]: + """Returns the reason why the command could not be issued. + + Args: + evt: The event for which to get the error information. + + Returns: + A string describing the error or None if there was no error. + """ if self.management_only and not evt.is_management: return (f"`{evt.command}` is a restricted command: " "you may only run it in management rooms.") @@ -127,6 +249,22 @@ async def get_permission_error(self, evt: CommandEvent) -> Optional[str]: def has_permission(self, is_management: bool, puppet_whitelisted: bool, matrix_puppet_whitelisted: bool, is_admin: bool, is_logged_in: bool) -> bool: + """Checks the permission for this command with the given status. + + Args: + is_management: If the room in which the command will be issued is a + management room. + puppet_whitelited: If the connected Telegram account puppet is + allowed to issue the command. + matrix_puppet_whitelisted: If the connected Matrix account puppet is + allowed to issue the command. + is_admin: If the issuing user is an admin. + is_logged_in: If the issuing user is logged in. + + Returns: + True if a user with the given state is allowed to issue the + command. + """ return ((not self.management_only or is_management) and (not self.needs_puppeting or puppet_whitelisted) and (not self.needs_matrix_puppeting or matrix_puppet_whitelisted) and @@ -134,6 +272,17 @@ def has_permission(self, is_management: bool, puppet_whitelisted: bool, (not self.needs_auth or is_logged_in)) async def __call__(self, evt: CommandEvent) -> Dict: + """Executes the command if evt was issued with proper rights. + + Args: + evt: The CommandEvent for which to check permissions. + + Returns: + The result of the command or the error message function. + + Raises: + FloodWaitError + """ error = await self.get_permission_error(evt) if error is not None: return await evt.reply(error) @@ -141,10 +290,12 @@ async def __call__(self, evt: CommandEvent) -> Dict: @property def has_help(self) -> bool: + """Returns true if this command has a help text.""" return bool(self.help_section) and bool(self._help_text) @property def help(self) -> str: + """Returns the help text to this command.""" return f"**{self.name}** {self._help_args} - {self._help_text}" @@ -173,6 +324,7 @@ def decorator(func: Callable[[CommandEvent], Awaitable[Optional[Dict]]]) -> Comm class CommandProcessor: + """Handles the raw commands issued by a user to the Matrix bot.""" log = logging.getLogger("mau.commands") def __init__(self, context: c.Context) -> None: @@ -183,6 +335,28 @@ def __init__(self, context: c.Context) -> None: async def handle(self, room: MatrixRoomID, event_id: MatrixEventID, sender: u.User, command: str, args: List[str], is_management: bool, is_portal: bool ) -> Optional[Dict]: + """Handles the raw commands issued by a user to the Matrix bot. + + If the command is not known, it might be a followup command and is + delegated to a command handler registered for that purpose in the + senders command_status as "next". + + Args: + room: ID of the Matrix room in which the command was issued. + event_id: ID of the event by which the command was issued. + sender: The sender who issued the command. + command: The issued command, case insensitive. + args: Arguments given with the command. + is_management: Whether the room is a management room. + is_portal: Whether the room is a portal. + + Returns: + The result of the error message function or None if no error + occured. Unknown and delegated commands do not count as errors. + """ + if not command_handlers or "unknown-command" not in command_handlers: + raise ValueError("command_handlers are not properly initialized.") + evt = CommandEvent(self, room, event_id, sender, command, args, is_management, is_portal) orig_command = command command = command.lower() diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..b7e47898 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[aliases] +test=pytest diff --git a/setup.py b/setup.py index 2a5c55f7..68078c58 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,9 @@ ], extras_require=extras, + setup_requires=["pytest-runner"], + tests_require=["pytest", "pytest-asyncio", "pytest-mock"], + classifiers=[ "Development Status :: 4 - Beta", "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/commands/__init__.py b/tests/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/commands/test_handler.py b/tests/commands/test_handler.py new file mode 100644 index 00000000..a88e8661 --- /dev/null +++ b/tests/commands/test_handler.py @@ -0,0 +1,394 @@ +from typing import Tuple +from unittest.mock import Mock + +import pytest +from _pytest.fixtures import FixtureRequest +from pytest_mock import MockFixture + +import mautrix_telegram.commands.handler +from mautrix_telegram.commands.handler import ( + CommandEvent, CommandHandler, CommandProcessor, HelpSection +) +from mautrix_telegram.config import Config +from mautrix_telegram.context import Context +from mautrix_telegram.types import MatrixEventID, MatrixRoomID, MatrixUserID +import mautrix_telegram.user as u + +from tests.utils.helpers import AsyncMock, list_true_once_each + + +@pytest.fixture +def context(request: FixtureRequest) -> Context: + """Returns a Context with mocked Attributes. + + Uses the attribute cls.config as Config. + """ + # Config(path, registration_path, base_path) + config = getattr(request.cls, 'config', Config("", "", "")) + return Context( + Mock(), # az + Mock(), # db + config, # config + Mock(), # loop + Mock() # session_container + ) + + +@pytest.fixture +def command_processor(context: Context) -> CommandProcessor: + """Returns a mocked CommandProcessor.""" + return CommandProcessor(context) + + +class TestCommandEvent: + config = Config("", "", "") + config["bridge.command_prefix"] = "tg" + config["bridge.permissions"] = {"*": "noperm"} + + def test_reply( + self, command_processor: CommandProcessor, mocker: MockFixture + ) -> None: + mocker.patch("mautrix_telegram.user.config", self.config) + + evt = CommandEvent( + processor=command_processor, + room=MatrixRoomID("#mock_room:example.org"), + event=MatrixEventID("$H45H:example.org"), + sender=u.User(MatrixUserID("@sender:example.org")), + command="help", + args=[], + is_management=True, + is_portal=False, + ) + + mock_az = command_processor.az + + message = "**This** was
allfun*!" + + # html, no markdown + evt.reply(message, allow_html=True, render_markdown=False) + mock_az.intent.send_notice.assert_called_with( + MatrixRoomID("#mock_room:example.org"), + "**This** was
allfun*!", + html="**This** was
allfun*!\n", + ) + + # html, markdown (default) + evt.reply(message, allow_html=True, render_markdown=True) + mock_az.intent.send_notice.assert_called_with( + MatrixRoomID("#mock_room:example.org"), + "**This** was
allfun*!", + html=( + "

This was
" + "allfun*!

\n" + ), + ) + + # no html, no markdown + evt.reply(message, allow_html=False, render_markdown=False) + mock_az.intent.send_notice.assert_called_with( + MatrixRoomID("#mock_room:example.org"), + "**This** was
allfun*!", + html=None, + ) + + # no html, markdown + evt.reply(message, allow_html=False, render_markdown=True) + mock_az.intent.send_notice.assert_called_with( + MatrixRoomID("#mock_room:example.org"), + "**This** was
allfun*!", + html=( + "

This <i>was</i><br/>" + "<strong>all</strong>fun*!

\n" + ), + ) + + def test_reply_with_cmdprefix( + self, command_processor: CommandProcessor, mocker: MockFixture + ) -> None: + mocker.patch("mautrix_telegram.user.config", self.config) + + evt = CommandEvent( + processor=command_processor, + room=MatrixRoomID("#mock_room:example.org"), + event=MatrixEventID("$H45H:example.org"), + sender=u.User(MatrixUserID("@sender:example.org")), + command="help", + args=[], + is_management=False, + is_portal=False, + ) + + mock_az = command_processor.az + + evt.reply( + "$cmdprefix+sp ....$cmdprefix+sp...$cmdprefix $cmdprefix", + allow_html=False, + render_markdown=False, + ) + + mock_az.intent.send_notice.assert_called_with( + MatrixRoomID("#mock_room:example.org"), + "tg ....tg+sp...tg tg", + html=None, + ) + + def test_reply_with_cmdprefix_in_management_room( + self, command_processor: CommandProcessor, mocker: MockFixture + ) -> None: + mocker.patch("mautrix_telegram.user.config", self.config) + + evt = CommandEvent( + processor=command_processor, + room=MatrixRoomID("#mock_room:example.org"), + event=MatrixEventID("$H45H:example.org"), + sender=u.User(MatrixUserID("@sender:example.org")), + command="help", + args=[], + is_management=True, + is_portal=False, + ) + + mock_az = command_processor.az + + evt.reply( + "$cmdprefix+sp ....$cmdprefix+sp...$cmdprefix $cmdprefix", + allow_html=True, + render_markdown=True, + ) + + mock_az.intent.send_notice.assert_called_with( + MatrixRoomID("#mock_room:example.org"), + "....tg+sp...tg tg", + html="

....tg+sp...tg tg

\n", + ) + +class TestCommandHandler: + config = Config("", "", "") + config["bridge.permissions"] = {"*": "noperm"} + + @pytest.mark.parametrize( + ( + "needs_auth," + "needs_puppeting," + "needs_matrix_puppeting," + "needs_admin," + "management_only," + ), + [l for l in list_true_once_each(length=5)] + ) + @pytest.mark.asyncio + async def test_permissions_denied( + self, + needs_auth: bool, + needs_puppeting: bool, + needs_matrix_puppeting: bool, + needs_admin: bool, + management_only: bool, + command_processor: CommandProcessor, + boolean: bool, + mocker: MockFixture, + ) -> None: + mocker.patch("mautrix_telegram.user.config", self.config) + + command = "testcmd" + + mock_handler = Mock() + + command_handler = CommandHandler( + handler=mock_handler, + needs_auth=needs_auth, + needs_puppeting=needs_puppeting, + needs_matrix_puppeting=needs_matrix_puppeting, + needs_admin=needs_admin, + management_only=management_only, + name=command, + help_text="No real command", + help_args="mock mockmock", + help_section=HelpSection("Mock Section", 42, ""), + ) + + sender = u.User(MatrixUserID("@sender:example.org")) + sender.puppet_whitelisted = False + sender.matrix_puppet_whitelisted = False + sender.is_admin = False + + event = CommandEvent( + processor=command_processor, + room=MatrixRoomID("#mock_room:example.org"), + event=MatrixEventID("$H45H:example.org"), + sender=sender, + command=command, + args=[], + is_management=False, + is_portal=boolean, + ) + + assert await command_handler.get_permission_error(event) + assert not command_handler.has_permission(False, False, False, False, False) + + @pytest.mark.parametrize( + ( + "is_management," + "puppet_whitelisted," + "matrix_puppet_whitelisted," + "is_admin," + "is_logged_in," + ), + [l for l in list_true_once_each(length=5)] + ) + @pytest.mark.asyncio + async def test_permission_granted( + self, + is_management: bool, + puppet_whitelisted: bool, + matrix_puppet_whitelisted: bool, + is_admin: bool, + is_logged_in: bool, + command_processor: CommandProcessor, + boolean: bool, + mocker: MockFixture, + ) -> None: + mocker.patch("mautrix_telegram.user.config", self.config) + + command = "testcmd" + + mock_handler = Mock() + + command_handler = CommandHandler( + handler=mock_handler, + needs_auth=False, + needs_puppeting=False, + needs_matrix_puppeting=False, + needs_admin=False, + management_only=False, + name=command, + help_text="No real command", + help_args="mock mockmock", + help_section=HelpSection("Mock Section", 42, ""), + ) + + sender = u.User(MatrixUserID("@sender:example.org")) + sender.puppet_whitelisted = puppet_whitelisted + sender.matrix_puppet_whitelisted = matrix_puppet_whitelisted + sender.is_admin = is_admin + mocker.patch.object(u.User, 'is_logged_in', return_value=is_logged_in) + + event = CommandEvent( + processor=command_processor, + room=MatrixRoomID("#mock_room:example.org"), + event=MatrixEventID("$H45H:example.org"), + sender=sender, + command=command, + args=[], + is_management=is_management, + is_portal=boolean, + ) + + assert not await command_handler.get_permission_error(event) + assert command_handler.has_permission( + is_management=is_management, + puppet_whitelisted=puppet_whitelisted, + matrix_puppet_whitelisted=matrix_puppet_whitelisted, + is_admin=is_admin, + is_logged_in=is_logged_in, + ) + + +class TestCommandProcessor: + config = Config("", "", "") + config["bridge.command_prefix"] = "tg" + config["bridge.permissions"] = {"*": "relaybot"} + + @pytest.mark.asyncio + async def test_handle( + self, + command_processor: CommandProcessor, + boolean2: Tuple[bool, bool], + mocker: MockFixture, + ) -> None: + mocker.patch('mautrix_telegram.user.config', self.config) + mocker.patch( + 'mautrix_telegram.commands.handler.command_handlers', + {"help": AsyncMock(), "unknown-command": AsyncMock()} + ) + + sender = u.User(MatrixUserID("@sender:example.org")) + + result = await command_processor.handle( + room=MatrixRoomID("#mock_room:example.org"), + event_id=MatrixEventID("$H45H:example.org"), + sender=sender, + command="hElp", + args=[], + is_management=boolean2[0], + is_portal=boolean2[1], + ) + + assert result is None + command_handlers = mautrix_telegram.commands.handler.command_handlers + command_handlers["help"].mock.assert_called_once() # type: ignore + + @pytest.mark.asyncio + async def test_handle_unknown_command( + self, + command_processor: CommandProcessor, + boolean2: Tuple[bool, bool], + mocker: MockFixture, + ) -> None: + mocker.patch('mautrix_telegram.user.config', self.config) + mocker.patch( + 'mautrix_telegram.commands.handler.command_handlers', + {"help": AsyncMock(), "unknown-command": AsyncMock()} + ) + + sender = u.User(MatrixUserID("@sender:example.org")) + sender.command_status = {} + + result = await command_processor.handle( + room=MatrixRoomID("#mock_room:example.org"), + event_id=MatrixEventID("$H45H:example.org"), + sender=sender, + command="foo", + args=[], + is_management=boolean2[0], + is_portal=boolean2[1], + ) + + assert result is None + command_handlers = mautrix_telegram.commands.handler.command_handlers + command_handlers["help"].mock.assert_not_called() # type: ignore + command_handlers["unknown-command"].mock.assert_called_once() # type: ignore + + @pytest.mark.asyncio + async def test_handle_delegated_handler( + self, + command_processor: CommandProcessor, + boolean2: Tuple[bool, bool], + mocker: MockFixture, + ) -> None: + mocker.patch('mautrix_telegram.user.config', self.config) + mocker.patch( + 'mautrix_telegram.commands.handler.command_handlers', + {"help": AsyncMock(), "unknown-command": AsyncMock()} + ) + + sender = u.User(MatrixUserID("@sender:example.org")) + sender.command_status = {"foo": AsyncMock(), "next": AsyncMock()} + + result = await command_processor.handle( + room=MatrixRoomID("#mock_room:example.org"), + event_id=MatrixEventID("$H45H:example.org"), + sender=sender, # u.User + command="foo", + args=[], + is_management=boolean2[0], + is_portal=boolean2[1] + ) + + assert result is None + command_handlers = mautrix_telegram.commands.handler.command_handlers + command_handlers["help"].mock.assert_not_called() # type: ignore + command_handlers["unknown-command"].mock.assert_not_called() # type: ignore + sender.command_status["foo"].mock.assert_not_called() # type: ignore + sender.command_status["next"].mock.assert_called_once() # type: ignore diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..86f72a5c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,3 @@ +pytest_plugins = [ + "tests.utils.fixtures", +] diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/utils/fixtures.py b/tests/utils/fixtures.py new file mode 100644 index 00000000..3c272750 --- /dev/null +++ b/tests/utils/fixtures.py @@ -0,0 +1,30 @@ +"""This module provides utility fixtures for testing.""" +from typing import Tuple + +from _pytest.fixtures import FixtureRequest +import pytest + + +@pytest.fixture(params=[True, False]) +def boolean(request: FixtureRequest) -> bool: + return request.param + + +@pytest.fixture +def boolean1(boolean: bool) -> Tuple[bool]: + return (boolean,) + + +@pytest.fixture(params=[True, False]) +def boolean2(request: FixtureRequest, boolean: bool) -> Tuple[bool, bool]: + return (boolean, request.param) + + +@pytest.fixture(params=[True, False]) +def boolean3( + request: FixtureRequest, boolean2: Tuple[bool, bool] +) -> Tuple[bool, bool, bool]: + return (boolean2[0], boolean2[1], request.param) + + +# … diff --git a/tests/utils/helpers.py b/tests/utils/helpers.py new file mode 100644 index 00000000..e2283b9b --- /dev/null +++ b/tests/utils/helpers.py @@ -0,0 +1,24 @@ +"""This module provides utility functions for testing.""" +from typing import Generator, Tuple +from unittest.mock import Mock + + +def AsyncMock(*args, **kwargs): + """Mocks a asyncronous coroutine which can be called with 'await'.""" + m = Mock(*args, **kwargs) + + async def mock_coro(*args, **kwargs): + return m(*args, **kwargs) + + mock_coro.mock = m + return mock_coro + + +def list_true_once_each(length: int) -> Generator[Tuple[bool, ...], None, None]: + """Yields tuples of bools with exactly one entry being True, starting left. + + Args: + length: Length of the resulting tuples + """ + for i in range(length): + yield tuple(i == j for j in range(length))