Skip to content

Commit

Permalink
Update pre-commit configuration and fix newly found issues (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvervloesem authored Aug 19, 2023
1 parent 71b16ee commit d38b4f1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 67 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
---
repos:
- repo: https://github.com/psf/black
rev: 22.10.0
rev: 23.7.0
hooks:
- id: black
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.0.261'
rev: 'v0.0.285'
hooks:
- id: ruff
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.991
rev: v1.5.1
hooks:
- id: mypy
entry: env MYPYPATH=src mypy
entry: env MYPYPATH=TheengsGateway mypy
args: []
additional_dependencies:
- bleak>=0.19.0
Expand Down
66 changes: 34 additions & 32 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""

# mypy: disable-error-code="name-defined,attr-defined"

from __future__ import annotations

import asyncio
import json
Expand All @@ -29,22 +29,25 @@
import struct
from contextlib import suppress
from datetime import datetime
from pathlib import Path
from random import randrange
from threading import Thread
from time import time
from typing import Dict, Optional, Union
from typing import TYPE_CHECKING, Union

from bleak import BleakError, BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bluetooth_clocks.exceptions import UnsupportedDeviceError
from bluetooth_clocks.scanners import find_clock
from bluetooth_numbers import company
from bluetooth_numbers.exceptions import No16BitIntegerError, UnknownCICError
from paho.mqtt import client as mqtt_client
from TheengsDecoder import decodeBLE

if TYPE_CHECKING:
from pathlib import Path

from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData

from .decryption import UnsupportedEncryptionError, create_decryptor
from .diagnose import diagnostics

Expand Down Expand Up @@ -78,7 +81,7 @@

logger = logging.getLogger("BLEGateway")

DataJSONType = Dict[str, Union[str, int, float, bool]]
DataJSONType = dict[str, Union[str, int, float, bool]]


def get_address(data: DataJSONType) -> str:
Expand All @@ -91,7 +94,7 @@ def get_address(data: DataJSONType) -> str:

def add_manufacturer(
data: DataJSONType,
company_id: Optional[int],
company_id: int | None,
) -> None:
"""Add the name of the manufacturer based on the company ID."""
if company_id is not None:
Expand All @@ -104,11 +107,11 @@ class Gateway:

def __init__(
self,
configuration: Dict,
configuration: dict,
) -> None:
self.configuration = configuration
self.stopped = False
self.clock_updates: Dict[str, float] = {}
self.clock_updates: dict[str, float] = {}
self.published_messages = 0

def connect_mqtt(self) -> None:
Expand Down Expand Up @@ -364,26 +367,23 @@ async def ble_scan_loop(self) -> None:
logger.info("Starting BLE scan")
self.running = True
while not self.stopped:
try:
if self.client.is_connected():
self.published_messages = 0
await scanner.start()
await asyncio.sleep(self.configuration["ble_scan_time"])
await scanner.stop()
logger.info(
"Sent %s messages to MQTT",
self.published_messages,
)
await asyncio.sleep(
self.configuration["ble_time_between_scans"],
)
if self.client.is_connected():
self.published_messages = 0
await scanner.start()
await asyncio.sleep(self.configuration["ble_scan_time"])
await scanner.stop()
logger.info(
"Sent %s messages to MQTT",
self.published_messages,
)
await asyncio.sleep(
self.configuration["ble_time_between_scans"],
)

# Update time for all clocks once a day
await self.update_clock_times()
else:
await asyncio.sleep(5.0)
except Exception:
raise
# Update time for all clocks once a day
await self.update_clock_times()
else:
await asyncio.sleep(5.0)

logger.error("BLE scan loop stopped")
self.running = False
Expand All @@ -404,8 +404,10 @@ def detection_callback(

if advertisement_data.manufacturer_data:
# Only look at the first manufacturer data in the advertisement
company_id = list(advertisement_data.manufacturer_data.keys())[0]
manufacturer_data = list(
company_id = list( # noqa: RUF015
advertisement_data.manufacturer_data.keys(),
)[0]
manufacturer_data = list( # noqa: RUF015
advertisement_data.manufacturer_data.values(),
)[0]
dstr = str(struct.pack("<H", company_id).hex())
Expand Down Expand Up @@ -433,7 +435,7 @@ def detection_callback(
def decode_advertisement(
self,
data_json: DataJSONType,
company_id: Optional[int],
company_id: int | None,
) -> None:
"""Decode device from data JSON."""
decoded_json = decodeBLE(json.dumps(data_json))
Expand Down Expand Up @@ -551,7 +553,7 @@ def handle_encrypted_advertisement(
logger.exception("Decryption failed")


def run(configuration: Dict, config_path: Path) -> None:
def run(configuration: dict, config_path: Path) -> None:
"""Run BLE gateway."""
if configuration["discovery"]:
from .discovery import DiscoveryGateway
Expand Down
19 changes: 11 additions & 8 deletions TheengsGateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
This module handles everything that has to do with configuration files and
command-line parameters for Theengs Gateway.
"""
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Dict

# Each configuration option is added to:
# - the DEFAULT_CONFIG dict with its default value
Expand Down Expand Up @@ -218,7 +219,7 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()


def read_configuration(config_path: Path) -> Dict:
def read_configuration(config_path: Path) -> dict:
"""Read a Theengs Gateway configuration from a file.
If the path doesn't exist, the function returns the default configuration.
Expand All @@ -229,13 +230,13 @@ def read_configuration(config_path: Path) -> Dict:
except OSError:
configuration = DEFAULT_CONFIG
except json.JSONDecodeError as exception:
msg = f"Malformed configuration file {config_path.resolve()}: {str(exception)}"
msg = f"Malformed configuration file {config_path.resolve()}: {exception}"
raise SystemExit(msg) from exception

return configuration


def write_configuration(configuration: Dict, config_path: Path) -> None:
def write_configuration(configuration: dict, config_path: Path) -> None:
"""Write a Theengs Gateway configuration to a file."""
try:
with config_path.open(encoding="utf-8", mode="w") as config_file:
Expand All @@ -247,7 +248,7 @@ def write_configuration(configuration: Dict, config_path: Path) -> None:
raise SystemExit(msg) from exception


def merge_args_with_config(config: Dict, args: argparse.Namespace) -> None:
def merge_args_with_config(config: dict, args: argparse.Namespace) -> None:
"""Merge command-line arguments into configuration.
Command-line arguments override the corresponding configuration if they are set.
Expand All @@ -261,8 +262,10 @@ def merge_args_with_config(config: Dict, args: argparse.Namespace) -> None:
dict(zip(value[::2], value[1::2])),
)
else:
for element in value:
if element not in config[key]:
config[key].append(element)
config[key].extend(
element
for element in value
if element not in config[key]
)
elif key != "config":
config[key] = value
27 changes: 14 additions & 13 deletions TheengsGateway/decryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
This module adds decryptors for encrypted advertisements of a selection of
devices.
"""
from __future__ import annotations

import binascii
from abc import ABC, abstractmethod
from typing import Dict

from Cryptodome.Cipher import AES

Expand All @@ -14,32 +15,32 @@ class AdvertisementDecryptor(ABC):
"""Abstract class that represents a decryptor for advertisements."""

@abstractmethod
def compute_nonce(self, address: str, decoded_json: Dict) -> bytes:
def compute_nonce(self, address: str, decoded_json: dict) -> bytes:
"""Compute the nonce for a specific address and JSON input."""

@abstractmethod
def decrypt(
self,
bindkey: bytes,
address: str,
decoded_json: Dict,
decoded_json: dict,
) -> bytes:
"""Decrypt ciphertext from JSON input."""

@abstractmethod
def replace_encrypted_data(
self,
decrypted_data: bytes,
data_json: Dict,
decoded_json: Dict,
data_json: dict,
decoded_json: dict,
) -> None:
"""Replace the encrypted data by decrypted payload."""


class LYWSD03MMC_PVVXDecryptor(AdvertisementDecryptor): # noqa: N801
"""Class for decryption of LYWSD03MMX PVVX encrypted advertisements."""

def compute_nonce(self, address: str, decoded_json: Dict) -> bytes:
def compute_nonce(self, address: str, decoded_json: dict) -> bytes:
"""Compute the nonce for a specific address and JSON input."""
# The nonce consists of:
# 6 bytes: device address in reverse
Expand All @@ -62,7 +63,7 @@ def decrypt(
self,
bindkey: bytes,
address: str,
decoded_json: Dict,
decoded_json: dict,
) -> bytes:
"""Decrypt ciphertext from JSON input with AES CCM."""
nonce = self.compute_nonce(address, decoded_json)
Expand All @@ -75,8 +76,8 @@ def decrypt(
def replace_encrypted_data(
self,
decrypted_data: bytes,
data_json: Dict,
decoded_json: Dict, # noqa: ARG002
data_json: dict,
decoded_json: dict, # noqa: ARG002
) -> None:
"""Replace the encrypted data by decrypted payload."""
data_json["servicedata"] = decrypted_data.hex()
Expand All @@ -85,7 +86,7 @@ def replace_encrypted_data(
class BTHomeV2Decryptor(AdvertisementDecryptor):
"""Class for decryption of BTHome v2 encrypted advertisements."""

def compute_nonce(self, address: str, decoded_json: Dict) -> bytes:
def compute_nonce(self, address: str, decoded_json: dict) -> bytes:
"""Compute the nonce for a specific address and JSON input."""
# The nonce consists of:
# 6 bytes: device address
Expand All @@ -107,7 +108,7 @@ def decrypt(
self,
bindkey: bytes,
address: str,
decoded_json: Dict,
decoded_json: dict,
) -> bytes:
"""Decrypt ciphertext from JSON input with AES CCM."""
nonce = self.compute_nonce(address, decoded_json)
Expand All @@ -119,8 +120,8 @@ def decrypt(
def replace_encrypted_data(
self,
decrypted_data: bytes,
data_json: Dict,
decoded_json: Dict,
data_json: dict,
decoded_json: dict,
) -> None:
"""Replace the encrypted data by decrypted payload."""
# Clear encryption and MAC included bits in device info
Expand Down
14 changes: 8 additions & 6 deletions TheengsGateway/diagnose.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
This module can be run on the command line with python -m TheengsGateway.diagnose
to show diagnostic information for debugging purposes.
"""
from __future__ import annotations

import argparse
import asyncio
import json
Expand All @@ -18,7 +20,7 @@
_ADDR_RE = re.compile(r"^(([0-9A-F]{2}:){3})([0-9A-F]{2}:){2}[0-9A-F]{2}$")


def _anonymize_strings(fields: List[str], config: ConfigType) -> None:
def _anonymize_strings(fields: list[str], config: ConfigType) -> None:
for field in fields:
if field in config:
config[field] = "***"
Expand All @@ -31,17 +33,17 @@ def _anonymize_address(address: str) -> str:
return "INVALID ADDRESS"


def _anonymize_addresses(addresses: List[str]) -> List[str]:
def _anonymize_addresses(addresses: list[str]) -> list[str]:
return [_anonymize_address(address) for address in addresses]


def _anonymize_bindkeys(bindkeys: Dict[str, str]) -> Dict[str, str]:
def _anonymize_bindkeys(bindkeys: dict[str, str]) -> dict[str, str]:
"""Anonymize the addresses and bindkeys in a dictionary."""
return {_anonymize_address(address): "***" for address in bindkeys}


# This function is taken from Textual
def _section(title: str, values: Dict[str, str]) -> None:
def _section(title: str, values: dict[str, str]) -> None:
"""Print a collection of named values within a titled section."""
max_name = max(map(len, values.keys()))
max_value = max(map(len, [str(value) for value in values.values()]))
Expand All @@ -50,7 +52,7 @@ def _section(title: str, values: Dict[str, str]) -> None:
print(f"| {'Name':{max_name}} | {'Value':{max_value}} |")
print(f"|-{'-' * max_name}-|-{'-'*max_value}-|")
for name, value in values.items():
print(f"| {name:{max_name}} | {str(value):{max_value}} |")
print(f"| {name:{max_name}} | {value:{max_value}} |")
print()


Expand Down Expand Up @@ -132,7 +134,7 @@ def _config(config_path: Path) -> None:
print("Configuration file not found")
print()
except json.JSONDecodeError as exception:
print(f"Malformed JSON configuration file: {str(exception)}")
print(f"Malformed JSON configuration file: {exception}")
print()


Expand Down
Loading

0 comments on commit d38b4f1

Please sign in to comment.