Source code for pyebus.dummyconnection

"""EBUS Connection Handling."""
import collections
import logging

from .connection import CommandError, Connection, Shutdown
from .const import DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT
from .dummy import Dummy

_LOGGER = logging.getLogger(__name__)


class DummyConnection(Connection, Dummy):
    """
    Dummy EBUS Connection.

    Instead of talking to a socket, every written message is answered from
    the :class:`Dummy` side of this class and the answer lines are queued in
    an in-memory buffer, from which they are read back one at a time.

    Keyword Args:
        host (str): Hostname or IP
        port (int): Port
        autoconnect (bool): Automatically connect and re-connect
        timeout (int): Connection Timeout
        dummydata (DummyData): storage for responses
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self, host=DEFAULT_HOST, port=DEFAULT_PORT, autoconnect=False, timeout=DEFAULT_TIMEOUT, dummydata=None
    ):
        # Both bases are initialised explicitly, each with its own arguments.
        Connection.__init__(self, host=host, port=port, autoconnect=autoconnect, timeout=timeout)
        Dummy.__init__(self, dummydata=dummydata)
        # Simulated connection state; toggled by async_connect/async_disconnect.
        self.__connected = False
        # FIFO of response lines produced by the last write and not yet read.
        self.__respbuffer = collections.deque()

    async def async_connect(self):
        """
        Establish connection (required before first communication).

        The dummy implementation only logs the call and marks the
        connection as established.
        """
        _LOGGER.debug("connect()")
        self.__connected = True

    async def async_disconnect(self):
        """Disconnect if not already done."""
        _LOGGER.debug("disconnect()")
        self.__connected = False

    def is_connected(self):
        """
        Return `True` if connection is established.

        This does not check if the connection is still usable.

        Returns:
            bool
        """
        return self.__connected

    async def _async_write(self, message):
        """
        Queue the response lines for `message`.

        The previous response must have been read completely before the
        next message is written.
        """
        assert not self.__respbuffer, "Response Buffer not empty"
        self.__respbuffer.extend(self.respond(message))

    async def _async_readline(self, check=False):
        """
        Return the next buffered response line.

        Keyword Args:
            check (bool): Raise :class:`CommandError` on lines starting
                with ``"ERR: "`` instead of returning them.

        Raises:
            Shutdown: If the line is exactly ``"ERR: shutdown"``
                (independent of `check`).
            CommandError: If `check` is set and the line is an error line.
                The exception carries the text following the prefix.
            IndexError: If no response line is buffered.
        """
        prefix = "ERR: "
        line = self.__respbuffer.popleft()
        _LOGGER.debug("_readline() = %r", line)
        if line == "ERR: shutdown":
            raise Shutdown()
        if check and line.startswith(prefix):
            # Slice the prefix off. `str.lstrip(prefix)` would treat the
            # argument as a character set and also eat leading 'E', 'R',
            # ':' and ' ' characters of the actual message.
            raise CommandError(line[len(prefix) :])
        return line

    async def _async_ensure_connection(self):
        """
        Connect on demand when autoconnect is enabled.

        Raises:
            ConnectionError: If not connected and autoconnect is disabled.
        """
        if not self.__connected:
            if self._autoconnect:
                await self.async_connect()
            else:
                raise ConnectionError("Not connected")