Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify configuration and arguments merging #158

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 33 additions & 119 deletions TheengsGateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import sys
from pathlib import Path
from typing import Dict

from .ble_gateway import run

Expand Down Expand Up @@ -55,173 +56,148 @@
}


def main() -> None:
"""Main entry point of the TheengsGateway program."""
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments and return them."""
parser = argparse.ArgumentParser()
parser.add_argument(
"-H",
"--host",
dest="host",
type=str,
help="MQTT host address",
)
parser.add_argument(
"-P",
"--port",
dest="port",
type=int,
help="MQTT host port",
)
parser.add_argument(
"-u",
"--user",
dest="user",
type=str,
help="MQTT username",
)
parser.add_argument(
"-p",
"--pass",
dest="pwd",
type=str,
help="MQTT password",
)
parser.add_argument(
"-pt",
"--pub_topic",
dest="pub_topic",
type=str,
help="MQTT publish topic",
)
parser.add_argument(
"-st",
"--sub_topic",
dest="sub_topic",
type=str,
help="MQTT subscribe topic",
)
parser.add_argument(
"-Lt",
"--lwt_topic",
dest="lwt_topic",
type=str,
help="MQTT LWT topic",
)
parser.add_argument(
"-prt",
"--presence_topic",
dest="presence_topic",
type=str,
help="MQTT presence topic",
)
parser.add_argument(
"-pr",
"--presence",
dest="presence",
type=int,
help="Enable (1) or disable (0) presence publication (default: 1)",
)
parser.add_argument(
"-pa",
"--publish_all",
dest="publish_all",
type=int,
help="Publish all (1) or only decoded (0) advertisements (default: 1)",
)
parser.add_argument(
"-sd",
"--scan_duration",
dest="scan_dur",
type=int,
help="BLE scan duration (seconds)",
)
parser.add_argument(
"-tb",
"--time_between",
dest="time_between",
type=int,
help="Seconds to wait between scans",
)
parser.add_argument(
"-ll",
"--log_level",
dest="log_level",
type=str,
help="TheengsGateway log level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
parser.add_argument(
"-Dt",
"--discovery-topic",
dest="discovery_topic",
type=str,
help="MQTT Discovery topic",
)
parser.add_argument(
"-D",
"--discovery",
dest="discovery",
type=int,
help="Enable(1) or disable(0) MQTT discovery",
)
parser.add_argument(
"-Dh",
"--hass_discovery",
dest="hass_discovery",
type=int,
help="Enable(1) or disable(0) Home Assistant MQTT discovery "
"(default: 1)",
)
parser.add_argument(
"-Dn",
"--discovery_name",
dest="discovery_device_name",
type=str,
help="Device name for Home Assistant",
)
parser.add_argument(
"-Df",
"--discovery_filter",
dest="discovery_filter",
nargs="+",
default=[],
help="Device discovery filter list for Home Assistant",
)
parser.add_argument(
"-a",
"--adapter",
dest="adapter",
type=str,
help="Bluetooth adapter (e.g. hci1 on Linux)",
)
parser.add_argument(
"-s",
"--scanning_mode",
dest="scanning_mode",
type=str,
choices=("active", "passive"),
help="Scanning mode (default: active)",
)
parser.add_argument(
"-ts",
"--time_sync",
dest="time_sync",
nargs="+",
default=[],
help="Addresses of Bluetooth devices to synchronize the time",
)
parser.add_argument(
"-tf",
"--time_format",
dest="time_format",
type=int,
help="Use 12-hour (1) or 24-hour (0) time format for clocks "
"(default: 0)",
)
parser.add_argument(
"-padv",
"--publish_advdata",
dest="publish_advdata",
type=int,
help="Publish advertising and advanced data (1) or not (0) (default: 0)",
)
Expand All @@ -230,24 +206,45 @@ def main() -> None:
"--config",
dest="conf_path",
type=str,
default=str(Path("~/theengsgw.conf").expanduser()),
help="Path to the configuration file (default: ~/theengsgw.conf)",
)
parser.add_argument(
"-bk",
"--bindkeys",
nargs="+",
metavar=("ADDRESS", "BINDKEY"),
dest="bindkeys",
default={},
help="Device addresses and their bindkeys: ADDR1 KEY1 ADDR2 KEY2",
)

args = parser.parse_args()
return parser.parse_args()


def merge_args_with_config(config: Dict, args: argparse.Namespace) -> None:
"""Merge command-line arguments into configuration.

Command-line arguments override the corresponding configuration if they are set.
Lists and dicts are merged.
"""
for key, value in args.__dict__.items():
if value is not None:
if isinstance(value, list):
if key == "bindkeys":
config[key].update(
dict(zip(value[::2], value[1::2])),
)
else:
for element in value:
if element not in config[key]:
config[key].append(element)
else:
config[key] = value

if args.conf_path:
conf_path = Path(args.conf_path)
else:
conf_path = Path("~/theengsgw.conf").expanduser()

def main() -> None:
"""Main entry point of the TheengsGateway program."""
args = parse_args()
conf_path = Path(args.conf_path)

try:
with conf_path.open(encoding="utf-8") as config_file:
Expand All @@ -260,93 +257,10 @@ def main() -> None:
# This guarantees that all keys we refer to are in the dictionary.
config = {**default_config, **config}

if args.host:
config["host"] = args.host
if args.port:
config["port"] = args.port
if args.user:
config["user"] = args.user
if args.pwd:
config["pass"] = args.pwd
if args.pub_topic:
config["publish_topic"] = args.pub_topic
if args.sub_topic:
config["subscribe_topic"] = args.sub_topic
if args.lwt_topic:
config["lwt_topic"] = args.lwt_topic
if args.presence_topic:
config["presence_topic"] = args.presence_topic
if args.presence is not None:
config["presence"] = args.presence
if args.publish_all is not None:
config["publish_all"] = args.publish_all
if args.scan_dur:
config["ble_scan_time"] = args.scan_dur
if args.time_between:
config["ble_time_between_scans"] = args.time_between
if args.log_level:
config["log_level"] = args.log_level

if args.discovery is not None:
config["discovery"] = args.discovery
elif "discovery" not in config.keys():
config["discovery"] = default_config["discovery"]
config["discovery_topic"] = default_config["discovery_topic"]
config["discovery_device_name"] = default_config[
"discovery_device_name"
]
config["discovery_filter"] = default_config["discovery_filter"]

if args.hass_discovery is not None:
config["hass_discovery"] = args.hass_discovery

if args.discovery_topic:
config["discovery_topic"] = args.discovery_topic
elif "discovery_topic" not in config.keys():
config["discovery_topic"] = default_config["discovery_topic"]

if args.discovery_device_name:
config["discovery_device_name"] = args.discovery_device_name
elif "discovery_device_name" not in config.keys():
config["discovery_device_name"] = default_config[
"discovery_device_name"
]

if args.discovery_filter:
config["discovery_filter"] = default_config["discovery_filter"]
if args.discovery_filter[0] != "reset":
for item in args.discovery_filter:
config["discovery_filter"].append(item)
elif "discovery_filter" not in config.keys():
config["discovery_filter"] = default_config["discovery_filter"]

if args.adapter:
config["adapter"] = args.adapter

if args.scanning_mode:
config["scanning_mode"] = args.scanning_mode

if args.time_sync:
config["time_sync"] = default_config["time_sync"]
if args.time_sync[0] != "reset":
for item in args.time_sync:
config["time_sync"].append(item)
elif "time_sync" not in config.keys():
config["time_sync"] = default_config["time_sync"]

if args.time_format is not None:
config["time_format"] = args.time_format

if args.publish_advdata is not None:
config["publish_advdata"] = args.publish_advdata

if args.bindkeys:
config["bindkeys"].update(
dict(zip(args.bindkeys[::2], args.bindkeys[1::2])),
)
merge_args_with_config(config, args)

if not config["host"]:
sys.exit("Invalid MQTT host")
sys.exit("MQTT host is not specified")

try:
with conf_path.open(mode="w", encoding="utf-8") as config_file:
Expand Down