diff --git a/src/emonhub.py b/src/emonhub.py index e9a5e1f..bb8bb8a 100755 --- a/src/emonhub.py +++ b/src/emonhub.py @@ -91,8 +91,8 @@ def __init__(self, setup): eha.auto_conf_enabled = self.autoconf.enabled except eha.EmonHubAutoConfError as e: logger.error(e) - sys.exit("Unable to load available.conf") - + self._exit = True # Exit process if main thread cannot start + def run(self): """Launch the hub. diff --git a/src/emonhub_auto_conf.py b/src/emonhub_auto_conf.py index 08b4dc0..02c415e 100644 --- a/src/emonhub_auto_conf.py +++ b/src/emonhub_auto_conf.py @@ -61,15 +61,18 @@ def __init__(self,settings): if self.enabled: self._log.debug("Automatic configuration of nodes enabled") + + + # Initialize attribute settings as a ConfigObj instance + try: + result = ConfigObj(filename, file_error=True) + self.available = self.prepare_available(result['available']) + except Exception as e: + raise EmonHubAutoConfError(e) + else: - self._log.debug("Automatic configuration of nodes disabled") - - # Initialize attribute settings as a ConfigObj instance - try: - result = ConfigObj(filename, file_error=True) - self.available = self.prepare_available(result['available']) - except Exception as e: - raise EmonHubAutoConfError(e) + self._log.debug("Automatic configuration of nodes disabled") + self.available = None def prepare_available(self,nodes): for n in nodes: diff --git a/src/emonhub_interfacer.py b/src/emonhub_interfacer.py index 42f5080..8dc873b 100644 --- a/src/emonhub_interfacer.py +++ b/src/emonhub_interfacer.py @@ -52,9 +52,7 @@ def __init__(self, name): 'nodeoffset': '0', 'pubchannels': [], 'subchannels': [], - 'batchsize': '1', - 'nodelistonly': False - } + 'batchsize': '1'} self.init_settings = {} self._settings = {} @@ -90,6 +88,22 @@ def __init__(self, name): self.missed = {} self.rx_msg = {} + def processRxc(self, rxc): + if rxc: + rxc = self._process_rx(rxc) + if rxc: + for channel in self._settings["pubchannels"]: + self._log.debug("%d Sent to channel(start)' : %s", rxc.uri, channel) + + # Initialise channel if needed + if channel not in self._pub_channels: + self._pub_channels[channel] = [] + + # Add cargo item to channel + self._pub_channels[channel].append(rxc) + + self._log.debug("%d Sent to channel(end)' : %s", rxc.uri, channel) + @log_exceptions_from_class_method def run(self): """ @@ -97,27 +111,31 @@ def run(self): Any regularly performed tasks actioned here along with passing received values """ + self._log.info("self: {}".format(self.name)) + + self._log.info("subs:") + for item in self._settings["subchannels"]: + self._log.info("sub: {}".format(item)) + + self._log.info("pubs:") + for item in self._settings["pubchannels"]: + self._log.info("pub: {}".format(item)) + while not self.stop: # Only read if there is a pub channel defined for the interfacer if len(self._settings["pubchannels"]): # Read the input and process data if available - rxc = self.read() - if rxc: - rxc = self._process_rx(rxc) - if rxc: - for channel in self._settings["pubchannels"]: - self._log.debug("%d Sent to channel(start)' : %s", rxc.uri, channel) - - # Initialise channel if needed - if channel not in self._pub_channels: - self._pub_channels[channel] = [] - - # Add cargo item to channel - self._pub_channels[channel].append(rxc) - - self._log.debug("%d Sent to channel(end)' : %s", rxc.uri, channel) - + result = self.read() + if isinstance(result, list): + for rxc in result: + self.processRxc(rxc) + elif isinstance(result, dict): + for rxc in result: + self.processRxc(result[rxc]) + else: + self.processRxc(result) + # Subscriber channels for channel in self._settings["subchannels"]: if channel in self._sub_channels: @@ -289,13 +307,7 @@ def _process_rx(self, cargo): if 'nodeids' in ehc.nodelist[node]: del ehc.nodelist[node]['nodeids'] if 'datalength' in ehc.nodelist[node]: - del ehc.nodelist[node]['datalength'] - - - # If not in nodelist and pass through disabled return false - if node not in ehc.nodelist and self._settings['nodelistonly']: - self._log.warning("%d Discarded RX frame not in nodelist, node:%s, length:%s bytes", cargo.uri, node, len(rxc.realdata)) - return False + del ehc.nodelist[node]['datalength'] # Data whitening uses for ensuring rfm sync if node in ehc.nodelist and 'rx' in ehc.nodelist[node] and 'whitening' in ehc.nodelist[node]['rx']: @@ -636,8 +648,6 @@ def set(self, **kwargs): setting = str(setting).lower() == "true" elif key == 'targeted' and str(setting).lower() in ['true', 'false']: setting = str(setting).lower() == "true" - elif key == 'nodelistonly' and str(setting).lower() in ['true', 'false','1','0']: - setting = str(setting).lower() == "true" or str(setting).lower() == "1" elif key == 'pubchannels': pass elif key == 'subchannels': diff --git a/src/interfacers/EmonHubMBUSInterfacer.py b/src/interfacers/EmonHubMBUSInterfacer.py index e7bd151..e5b27e0 100644 --- a/src/interfacers/EmonHubMBUSInterfacer.py +++ b/src/interfacers/EmonHubMBUSInterfacer.py @@ -35,7 +35,7 @@ class EmonHubMBUSInterfacer(EmonHubInterfacer): - def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=False, baud=2400, use_meterbus_lib=False): + def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=False, baud=2400, use_meterbus_lib=True): """Initialize Interfacer """ @@ -63,21 +63,41 @@ def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=Fal self.invalid_count = 0 # Only load module if it is installed - self.connect() - - # If use_meterbus_lib is true, try to load module - # pip3 install pyMeterBus - self.use_meterbus_lib = False - if use_meterbus_lib: - try: - from pyMeterBus import meterbus - self.meterbus = meterbus + + try: + # If we need a socket connection, use meterbus_lib + # pip3 install pyMeterBus + if (device.index("socket://")>=0): + self._log.info("Connecting using meterbus_lib:" + device) + self.ser=serial.serial_for_url(device, str(baud), 8, 'E', 1, timeout=1) self.use_meterbus_lib = True - except ModuleNotFoundError as err: - self._log.error(err) - self.use_meterbus_lib = False + if use_meterbus_lib: + try: + self._log.info("importing mertbus_lib") + import meterbus + self.meterbus = meterbus + self.use_meterbus_lib = True + except ModuleNotFoundError as err: + self._log.error(err) + self.use_meterbus_lib = False + else: + self.connect() + + + + if self.ping_address(self.ser, 1, 3): + self._log.info("ok ping") + else: + print("no reply") + self._log.info("set") + self._log.info(self.ser) + except ModuleNotFoundError as err: + self._log.error("================================"); + self._log.error(err) + self.ser = False + def connect(self): """Connect to MBUS @@ -117,6 +137,7 @@ def connect(self): # if device is still False, log error and return False if not device: + self._log.error("================================"); self._log.error("Could not find MBUS device") self.ser = False return False @@ -125,13 +146,30 @@ def connect(self): self._log.debug("Connecting to MBUS serial: " + device + " " + str(self.baud)) self.ser = serial.Serial(device, self.baud, 8, 'E', 1, 0.5) except Exception: + self._log.error("================================"); self._log.error("Could not connect to MBUS serial") self.ser = False + + + def ping_address(self, ser, address, retries=5, read_echo=False): + for i in range(0, retries + 1): + self.meterbus.send_ping_frame(ser, address, read_echo) + try: + frame = self.meterbus.load(self.meterbus.recv_frame(ser, 1)) + if isinstance(frame, self.meterbus.TelegramACK): + return True + except self.meterbus.MBusFrameDecodeError as e: + pass + + time.sleep(0.5) + + return False def mbus_serial_write(self,data): try: self.ser.write(data) except Exception: + self._log.error("================================"); self.ser = False self._log.error("Could not write to MBUS serial port") @@ -199,6 +237,7 @@ def set_page(self, address, page): else: time.sleep(0.2) except Exception: + self._log.error("================================"); self.ser = False self._log.error("set_page could not read from serial port") @@ -228,7 +267,7 @@ def decodeInt(self,bytes): return struct.unpack("i", bytearray(bytes))[0] return False - def parse_frame(self,data,records): + def parse_frame(self,meter, data,records): data_types = ['null','int','int','int','int','float','int','int','null','bcd','bcd','bcd','bcd','var','bcd','null'] data_lengths = [0,1,2,3,4,4,6,8,0,1,2,3,4,6,6,0] vif = { @@ -442,42 +481,50 @@ def parse_frame(self,data,records): return result - def parse_frame_meterbus_lib(self,data,records): + def parse_frame_meterbus_lib(self,meter, data,records): + self._log.debug("parse_frame_meterbus_lib"); telegram = self.meterbus.load(data) meterbus_obj = json.loads(telegram.to_JSON()) result = {} + idx = 0; for record in meterbus_obj['body']['records']: if type(record['value'])==int or type(record['value'])==float: name = record['type'].replace('VIFUnit.','').replace('VIFUnitExt.','').lower() + if name in result: + name = name + str(idx) + value = record['value'] unit = record['unit'].replace('MeasureUnit.','') result[name] = [value, unit] - + idx = idx+1 return result - def request_data(self, address, records): + def request_data(self, meter, address, records): for i in range(0,2): - self.mbus_short_frame(address, 0x5b) + if self.use_meterbus_lib: + self.meterbus.send_request_frame(self.ser, address) + else: + self.mbus_short_frame(address, 0x5b) # time.sleep(1.0) - result = self.read_data_frame(records) + result = self.read_data_frame(meter, records) if result!=None: return result else: time.sleep(0.2) - def request_data_sdm120(self, address, records): + def request_data_sdm120(self, meter, address, records): for i in range(0,2): self.mbus_request_sdm120(address) # time.sleep(1.0) - result = self.read_data_frame(records) + result = self.read_data_frame(meter, records) if result!=None: return result else: time.sleep(0.2) - def read_data_frame(self,records): + def read_data_frame(self,meter, records): data = [] bid = 0 bid_end = 255 @@ -529,13 +576,14 @@ def read_data_frame(self,records): if valid: # Parse frame if still valid if self.use_meterbus_lib: - return self.parse_frame_meterbus_lib(data,records) + return self.parse_frame_meterbus_lib(meter, data,records) else: - return self.parse_frame(data,records) + return self.parse_frame(meter, data,records) bid += 1 time.sleep(0.1) except Exception: + self._log.error("================================"); self.ser = False self._log.error("read_data_frame could not read from serial port") # If we are here data response is corrupt @@ -547,15 +595,33 @@ def read_data_frame(self,records): if self.invalid_count>=10: # Reset invalid count self.invalid_count = 0 + self._log.error("================================"); self._log.debug("Invalid count = 10. Restarting MBUS serial connection on next read") self.ser = False - def add_result_to_cargo(self,meter,c,result): + def add_result_to_cargo(self,meter,nodesName,c,result): if result != None: - self._log.debug("Decoded MBUS data: " + json.dumps(result)) - + self._log.info("Decoded MBUS data: " + json.dumps(result)) + + self._log.error("nodesName:" + ",".join(nodesName)); + nodesNameHash = {} + for nameTranslator in nodesName: + self._log.error("nameTranslator:" + nameTranslator); + nameTranslatorPart = nameTranslator.split(':') + nodesNameHash[nameTranslatorPart[0]]=nameTranslatorPart[1] + self._log.error(nameTranslatorPart[0] + " <> " + nameTranslatorPart[1]); + + + for key in result: - c.names.append(meter+"_"+key) + self._log.error("key1:" + key); + key1=key + if key in nodesNameHash: + key1 = nodesNameHash[key] + + self._log.error("key2:" + key); + + c.names.append(key1+"_"+meter) c.realdata.append(result[key][0]) c.units.append(result[key][1]) else: @@ -576,59 +642,73 @@ def read(self): self.next_interval = False if not self.ser: - self.connect() - - c = Cargo.new_cargo() - c.names = [] - c.realdata = [] - c.units = [] - c.nodeid = self._settings['nodename'] + try: + if use_meterbus_lib: + self._log.info("Connecting using meterbus_lib:" + device) + self.ser=serial.serial_for_url(device, str(baud), 8, 'E', 1, timeout=1) + else: + self.connect() + except Exception: + self._log.error("================================"); + self._log.error("Could not connect to MBUS serial") + self.ser = False + res = [] + # Support for multiple MBUS meters on a single bus for meter in self._settings['meters']: + c = Cargo.new_cargo() + c.names = [] + c.realdata = [] + c.units = [] address = self._settings['meters'][meter]['address'] meter_type = self._settings['meters'][meter]['type'] - + if not self._settings['nodename']: + c.nodeid = meter + else: + c.nodeid = self._settings['nodename'] + + meterPrefix = self._settings['meters'][meter]['name']; + nodesName = self._settings['meters'][meter]['nodesName']; + res.append(c) + # Most mbus meters use standard request, page 0 or default, all records if meter_type=="standard": - result = self.request_data(address,[]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[]) + self.add_result_to_cargo(meterPrefix, nodesName, c,result) # Qalcosonic E3 if meter_type=="qalcosonic_e3": - result = self.request_data(address,[4,5,6,7,8,9,10,11,12,13,14,15]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[4,5,6,7,8,9,10,11,12,13,14,15]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # ------------------------------------------------------ # Sontex Multical 531 if meter_type=="sontex531": # p1 self.set_page(address, 1) - result = self.request_data(address,[4,5]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[4,5]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # p3 self.set_page(address, 3) - result = self.request_data(address,[1,2,3,4]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[1,2,3,4]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # SDM120 special request command elif meter_type=="sdm120": # 1. Get energy data - result = self.request_data(address,[1]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[1]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # 2. Get instantaneous data - result = self.request_data_sdm120(address,[1,7,11,23]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data_sdm120(meter, address,[1,7,11,23]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) elif meter_type=="kamstrup403": - result = self.request_data(address,[1,4,7,8,9,10,11,12,14]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[1,4,7,8,9,10,11,12,14]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # ------------------------------------------------------ - - - if len(c.realdata) > 0: - return c + return res else: self.next_interval = True @@ -666,12 +746,17 @@ def set(self, **kwargs): for meter in setting: # default address = 1 + name="" meter_type = "standard" records = [] # address if 'address' in setting[meter]: address = int(setting[meter]['address']) + if 'name' in setting[meter]: + name = setting[meter]['name'] + if 'nodesName' in setting[meter]: + nodesName = setting[meter]['nodesName'] # type e.g sdm if 'type' in setting[meter]: meter_type = str(setting[meter]['type']) @@ -679,6 +764,8 @@ def set(self, **kwargs): self._settings['meters'][meter] = { 'address':address, 'type':meter_type, + 'name':name, + 'nodesName':nodesName } continue else: @@ -686,5 +773,3 @@ def set(self, **kwargs): # include kwargs from parent super().set(**kwargs) - -