Source code for pyebus.msgdefs

"""Message Defintions."""
import collections
import re
from fnmatch import fnmatch

from .const import AUTO
from .msgdef import resolve_prio

_RE_RESOLVE = re.compile(r"\A([^/#+]+)/([^/#+]+)(#(\d|A))?(/([^/#]*))?\Z")


[docs]class Pattern(collections.namedtuple("Pattern", "circuit name setprio fieldname")): """Message Definition and Field Definition Search Pattern."""
[docs] @staticmethod def from_str(pattern): """Create :any:`Pattern` from `pattern` string.""" mat = _RE_RESOLVE.fullmatch(pattern) if mat: circuit, name, _, setprio, _, fieldname = mat.groups() return Pattern(circuit, name, setprio, fieldname) raise ValueError(f"Invalid pattern {pattern!r}")
[docs]class MsgDefs: """ Message Definitions Container. >>> from .msgdef import MsgDef, FieldDef >>> from .types import Type >>> msgdefs = MsgDefs() >>> msgdefs.add(MsgDef('mc', 'Status0a', ( ... FieldDef(0, 'temp', Type(), '°C'), ... FieldDef(1, 'mixer', Type(), None), ... FieldDef(2, 'onoff-0', Type(), None), ... FieldDef(3, 'onoff-1', Type(), None), ... FieldDef(4, 'temp0', Type(), '°C'), ... ), read=True)) >>> msgdefs.add(MsgDef('hc', 'Status0', ( ... FieldDef(0, 'temp', Type(), '°C'), ... FieldDef(1, 'temp0', Type(), '°C'), ... ), read=True)) >>> msgdefs.get('mc', 'Status0a') MsgDef('mc', 'Status0a', (FieldDef(0, 'temp', ...'°C'), FieldDef(1, 'mixer', ..., unit='°C')), read=True) >>> list(msgdefs) [MsgDef('mc', 'Status0a', (FieldDef(0, 'temp', ...'°C'), FieldDef(1, 'mixer', ..., unit='°C')), read=True)] >>> msgdefs.summary() '2 messages (2 read, 0 update, 0 write) with 7 fields' """ def __init__(self): self.clear()
[docs] def clear(self): """Remove All Stored Message Definitions.""" self._msgdefs = collections.defaultdict(lambda: collections.defaultdict(list))
[docs] def add(self, msgdef): """Add Message Definition.""" msgdefs = self._msgdefs[msgdef.circuit][msgdef.name] for idx, msgdef0 in enumerate(msgdefs): joined = msgdef0.join(msgdef) if joined is not None: msgdefs[idx] = joined break else: msgdefs.append(msgdef)
[docs] def get(self, circuit, name): """ Get message with `circuit` and `name`. Returns MsgDef: Message Definition """ msgdefs = self._msgdefs if circuit in msgdefs: circuitmsgdefs = msgdefs[circuit] if name in circuitmsgdefs: return circuitmsgdefs[name][0] return None
[docs] def get_ident(self, ident): """ Get message with `ident`. Returns MsgDef: Message Definition """ pat = Pattern.from_str(ident) return self.get(pat.circuit, pat.name)
[docs] def find(self, circuit, name="*"): """ Find all message definitions matching `circuit` and `name`, wildcards and placeholder are accepted. Returns: MsgDefs: Message Definitions """ msgdefs = MsgDefs() circuit = circuit.lower() name = name.lower() for msgdef in self: if fnmatch(msgdef.circuit.lower(), circuit) and fnmatch(msgdef.name.lower(), name): msgdefs.add(msgdef) return msgdefs
[docs] def resolve(self, patterns, filter_=None): """ Resolve patterns and filter message definitions. Returns: MsgDefs: Message Definitions """ if isinstance(patterns, str): patterns = patterns.split(";") msgdefs = MsgDefs() for pattern in patterns: for msgdef in self._resolve(pattern.strip()): if msgdef not in msgdefs and (filter_ is None or filter_(msgdef)): msgdefs.add(msgdef) return msgdefs
def _resolve(self, pattern): pat = Pattern.from_str(pattern) circuit, name, setprio, fieldname = pat for msgdef in self.find(circuit, name): if fieldname is None: fields = msgdef.children else: fields = tuple( fielddef for fielddef in msgdef.children if fnmatch(fielddef.name.lower(), fieldname.lower()) ) if not fields: continue if fields == msgdef.children and (setprio is None or not msgdef.read): yield msgdef else: setprio = setprio if setprio in (AUTO, None) else int(setprio) yield msgdef.replace(children=fields, setprio=setprio)
[docs] def set_defaultprio(self, defaultprio): """Set Priorities of all messages without a priority value.""" msgdefs = self._msgdefs for circuitmsgdefs in msgdefs.values(): for msgdefs in circuitmsgdefs.values(): for idx, msgdef in enumerate(msgdefs): if msgdef.setprio is None: msgdefs[idx] = msgdef.replace(setprio=resolve_prio(msgdef, msgdef.setprio or defaultprio))
[docs] def summary(self): """ Summary. Returns: str: Summary. """ total = len(self) fields = sum(len(msgdef.children) for msgdef in self) read = sum(1 for msgdef in self if msgdef.read) update = sum(1 for msgdef in self if msgdef.update) write = sum(1 for msgdef in self if msgdef.write) return f"{total} messages ({read} read, {update} update, {write} write) with {fields} fields"
def __iter__(self): for circuitmsgdefs in self._msgdefs.values(): for msgdefs in circuitmsgdefs.values(): yield from msgdefs def __len__(self): return sum( sum(len(msgdefs) for msgdefs in circuitmsgdefs.values()) for circuitmsgdefs in self._msgdefs.values() ) def __add__(self, other): if self.__class__ is other.__class__: msgdefs = MsgDefs() for msgdef in self: msgdefs.add(msgdef) for msgdef in other: msgdefs.add(msgdef) return msgdefs return NotImplemented def __sub__(self, other): if self.__class__ is other.__class__: msgdefs = MsgDefs() othermsgs = tuple(other) for msgdef in self: if msgdef not in othermsgs: msgdefs.add(msgdef) return msgdefs return NotImplemented