Source code for pyebus.msgdef

"""Message Definitions."""
import collections
import copy

from anytree import NodeMixin

from .const import AUTO
from .util import repr_

# Plain tuple record holding the message attributes; MsgDef builds on top of it.
_MsgDef = collections.namedtuple(
    "_MsgDef",
    ["circuit", "name", "read", "prio", "write", "update", "setprio"],
)


class MsgDef(_MsgDef, NodeMixin):
    """
    Message Definition.

    A message is defined by `circuit` and `name` and has fields.

    Args:
        circuit (str): Circuit Name
        name (str): Message Name
        children (tuple): Field definitions. :any:`FieldDef` and :any:`VirtFieldDef`.

    Keyword Args:
        read (bool): Message intend to be read
        prio (int): Message Polling Priority on bus. Dropped (`None`) if `read` is not set.
        write (bool): Message intend to be written
        update (bool): Message intent to be seen automatically on every value change
        setprio: Message polling priority to be set. Integer `1-9` or `A` for automatic.

    >>> from pyebus import MsgDef, types
    >>> m = MsgDef('circuit', 'name', [
    ...     FieldDef(0, 'temp', types.IntType(-127, 128)),
    ...     FieldDef(1, 'name', types.StrType(10)),
    ...     VirtFieldDef('virt', types.StrType(), lambda msg: f'{msg.name}: {msg.temp}'),
    ... ], read=True)
    >>> m  # doctest: +ELLIPSIS
    MsgDef('circuit', 'name', (FieldDef(0, 'temp', IntType(... VirtFieldDef('virt', StrType())), read=True)
    >>> m.ident
    'circuit/name'
    >>> m.children
    (FieldDef(0, 'temp', IntType(-127, 128)), FieldDef(1, 'name', StrType(length=10)), VirtFieldDef('virt', StrType()))
    >>> m.fields
    (FieldDef(0, 'temp', IntType(-127, 128)), FieldDef(1, 'name', StrType(length=10)))
    >>> m.virtfields
    (VirtFieldDef('virt', StrType()),)
    >>> m.access
    'r----'

    :any:`MsgDef`, :any:`FieldDef` and :any:`VirtFieldDef` form a tree structure

    >>> from anytree import RenderTree
    >>> print(RenderTree(m))  # doctest: +ELLIPSIS
    MsgDef('circuit', 'name', (FieldDef(0, 'temp', IntType(-127, 128)),..., StrType())), read=True)
    ├── FieldDef(0, 'temp', IntType(-127, 128))
    ├── FieldDef(1, 'name', StrType(length=10))
    └── VirtFieldDef('virt', StrType())

    Similar object can be easily created by:

    >>> m.replace(children=m.fields[1:2])
    MsgDef('circuit', 'name', (FieldDef(1, 'name', StrType(length=10)),), read=True)

    Equality, inequality and hashing take the children into account:

    >>> m == m.replace()
    True
    >>> m != m.replace(children=m.fields[1:2])
    True
    """

    __slots__ = tuple()

    def __new__(cls, circuit, name, children, read=False, prio=None, write=False, update=False, setprio=None):
        if not read:
            # A polling priority is only kept for readable messages.
            prio = None
        msgdef = _MsgDef.__new__(cls, circuit, name, read, prio, write, update, setprio)
        if children:
            # `children` is the NodeMixin tree attribute, not a namedtuple field.
            msgdef.children = children
        return msgdef

    def __repr__(self):
        args = (self.circuit, self.name, self.children)
        # (name, value, default) triples handed to `repr_` together with the positional args.
        kwargs = [
            ("read", self.read, False),
            ("prio", self.prio, None),
            ("write", self.write, False),
            ("update", self.update, False),
            ("setprio", self.setprio, None),
        ]
        return repr_(self, args, kwargs)

    def _cmp_key(self):
        """Return every attribute relevant for hashing and comparison, children included."""
        return (
            self.circuit,
            self.name,
            self.children,
            self.read,
            self.prio,
            self.write,
            self.update,
            self.setprio,
        )

    def __hash__(self):
        return hash(self._cmp_key())

    def __eq__(self, other):
        if self.__class__ is other.__class__:
            return self._cmp_key() == other._cmp_key()
        return NotImplemented

    def __ne__(self, other):
        # `tuple.__ne__` would be inherited otherwise; it compares the namedtuple
        # fields only and ignores `children`, which contradicts `__eq__`.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return NotImplemented
        return not equal

    @property
    def fields(self):
        """Fields."""
        return tuple(child for child in self.children if isinstance(child, FieldDef))

    @property
    def virtfields(self):
        """Calculated Fields."""
        return tuple(child for child in self.children if isinstance(child, VirtFieldDef))

    @property
    def ident(self):
        """Identifier."""
        return f"{self.circuit}/{self.name}"

    @property
    def access(self):
        """Message Access: five characters for read, write, update, setprio and prio (`-` if unset)."""
        read = "r" if self.read else "-"
        write = "w" if self.write else "-"
        update = "u" if self.update else "-"
        setprio = str(self.setprio) if self.setprio else "-"
        prio = str(self.prio) if self.prio else "-"
        return "".join((read, write, update, setprio, prio))

    def join(self, msgdef):
        """
        Return Joined Message Definition.

        The access attributes of both definitions are combined with `or`.
        `None` is returned if `circuit`, `name` or `children` differ.
        """
        if (self.circuit, self.name, self.children) == (msgdef.circuit, msgdef.name, msgdef.children):
            return self.replace(
                read=self.read or msgdef.read,
                prio=self.prio or msgdef.prio,
                write=self.write or msgdef.write,
                update=self.update or msgdef.update,
                setprio=self.setprio or msgdef.setprio,
            )
        return None

    def replace(self, **kwargs):
        """Create copy with updated attributes."""
        attrs = self._asdict()
        attrs["children"] = self.children
        attrs.update(kwargs)
        # Take a copy of all children, to ensure proper tree relation
        attrs["children"] = tuple(copy.copy(fielddef) for fielddef in attrs["children"])
        return MsgDef(**attrs)
# Plain tuple record holding the field attributes; AbstractFieldDef builds on top of it.
_FieldDef = collections.namedtuple(
    "_FieldDef",
    ["idx", "name", "type_", "unit", "comment"],
)
class AbstractFieldDef(_FieldDef, NodeMixin):
    """
    Abstract Field Definition.

    Args:
        idx: Index within Message (`None` for fields without an index)
        name (str): Unique name (as `name` may be used multiple times by ebus)
        type_ (Type): Type

    Keyword Args:
        unit (str): Unit of the field value
        comment (str): Comment.
    """

    __slots__ = tuple()

    def __new__(cls, idx, name, type_, unit=None, comment=None):
        # Falsy values (e.g. empty strings) for `unit` and `comment` are stored as None.
        return _FieldDef.__new__(cls, idx, name, type_, unit or None, comment or None)

    def __repr__(self):
        args = (self.idx, self.name, self.type_)
        # (name, value, default) triples handed to `repr_` together with the positional args.
        kwargs = [
            ("unit", self.unit, None),
            ("comment", self.comment, None),
        ]
        return repr_(self, args, kwargs)

    def _pre_detach(self, parent):
        # it is forbidden to remove fields from their message - create new one.
        # Raised explicitly rather than via `assert`, so the guard is not
        # stripped when Python runs with `-O`.
        raise AssertionError(f"{self!r} is already used by {parent!r}")  # pragma: no cover

    @property
    def msgdef(self):
        """Message Definition."""
        return self.parent

    @property
    def ident(self):
        """Identifier, or `None` as long as the field does not belong to a message."""
        return f"{self.msgdef.ident}/{self.name}" if self.msgdef else None

    def __copy__(self):
        # The copy is a fresh, parent-less FieldDef, so it can be attached to another message.
        return FieldDef(idx=self.idx, name=self.name, type_=self.type_, unit=self.unit, comment=self.comment)
class FieldDef(AbstractFieldDef):
    """
    Field Definition.

    Concrete field definition; all behaviour is inherited from :any:`AbstractFieldDef`.

    Args:
        idx: Index within Message
        name (str): Unique name (as `name` may be used multiple times by ebus)
        type_ (Type): Type

    Keyword Args:
        unit (str): Unit of the field value
        comment (str): Comment.
    """
class VirtFieldDef(AbstractFieldDef):
    """
    Virtual Field Definition.

    A field definition without an index (`idx` is always `None`) which carries
    a function to create its value.

    Args:
        name (str): Unique name (as `name` may be used multiple times by ebus)
        type_ (Type): Type
        func: Function to create value.

    Keyword Args:
        unit (str): Unit of the field value
        comment (str): Comment.
    """

    def __new__(cls, name, type_, func, unit=None, comment=None):
        unit = unit or None
        comment = comment or None
        # Virtual fields have no position within the message, hence `idx` is None.
        virtfield = AbstractFieldDef.__new__(cls, None, name, type_, unit, comment)
        # `func` is kept as a plain instance attribute, it is not a tuple field.
        virtfield.func = func
        return virtfield

    def __repr__(self):
        positional = (self.name, self.type_)
        return repr_(self, positional)

    def __copy__(self):
        # pylint: disable=E1101
        return VirtFieldDef(self.name, self.type_, self.func, unit=self.unit, comment=self.comment)
def resolve_prio(msgdef, setprio=AUTO):
    """
    Resolve priority specification.

    Integer values are just passed through.

    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True), 1)
    1
    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True), 9)
    9

    The AUTO option, sets read-only messages to prio 1.

    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True), AUTO)
    1

    All other readable messages get prio 2

    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True, write=True), AUTO)
    2
    >>> resolve_prio(MsgDef('circuit', 'name', [], read=True, update=True), AUTO)
    2

    Non-readable messages have no prio.

    >>> resolve_prio(MsgDef('circuit', 'name', [], write=True), AUTO)
    """
    # Prio is only available on read messages.
    if not msgdef.read:
        return None
    if setprio == AUTO:
        # Writable or updating messages get prio 2:
        # we just want to ensure that we do not miss any value.
        return 2 if (msgdef.update or msgdef.write) else 1
    return setprio