Source code for pyebus.connection

"""EBUS Connection Handling."""
import asyncio
import logging

from .const import DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT
from .exceptions import CommandError, Shutdown
from .util import repr_

_LOGGER = logging.getLogger(__name__)


[docs]class Connection: """ EBUS Connection. Keyword Args: host (str): Hostname or IP port (int): Port autoconnect (bool): Automatically connect and re-connect timeout (int): Connection Timeout """ def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, autoconnect=False, timeout=DEFAULT_TIMEOUT): self._host = host self._port = port self._autoconnect = autoconnect self._timeout = timeout self._reader, self._writer = None, None def __repr__(self): return repr_( self, kwargs=( ("host", self.host, DEFAULT_HOST), ("port", self.port, DEFAULT_PORT), ("autoconnect", self.autoconnect, False), ("timeout", self.timeout, DEFAULT_TIMEOUT), ), ) @property def host(self): """Host.""" return self._host @property def port(self): """Port.""" return self._port @property def autoconnect(self): """Automatically connect and re-connect.""" return self._autoconnect @property def timeout(self): """Connection Timeout.""" return self._timeout
[docs] async def async_connect(self): """ Establish connection (required before first communication). Raises: ConnectionRefusedError: If connection cannot be established """ _LOGGER.debug("connect()") self._reader, self._writer = await self._async_timedout(asyncio.open_connection(self._host, self._port))
[docs] async def async_disconnect(self): """Disconnect if not already done.""" _LOGGER.debug("disconnect()") if self._writer: try: await self._async_write("quit") self._writer.close() await self._writer.wait_closed() except BrokenPipeError: # pragma: no cover pass finally: self._reader, self._writer = None, None
[docs] def is_connected(self): """ Return `True` if connection is established. This does not check if the connection is still usable. Returns: bool """ return self._writer is not None and not self._writer.is_closing()
[docs] async def async_write(self, message): """ Send TCP `message` to EBUSD. Raises: ConnectionRefusedError: If connection cannot be established ConnectionError: If not connected (`autoconnect==False`) """ _LOGGER.debug("write(%r)", message) await self._async_ensure_connection() await self._async_write(message)
[docs] async def async_request(self, cmd, *args, **kwargs): """ Assemble request starting with `cmd` and position `args` and keywords `kwargs'. Raises: ConnectionRefusedError: If connection cannot be established ConnectionError: If not connected (`autoconnect==False`) """ parts = [cmd] parts += [f"-{option} {value}" for option, value in kwargs.items() if value is not None] parts += [str(arg) for arg in args] message = " ".join(parts) _LOGGER.debug("request(%r)", message) await self._async_ensure_connection() await self._async_write(message)
[docs] async def async_read(self, infinite=False, check=True): """ Receive lines until an empty one (`infinite==False`) or infinitly (`infinite==True`). Yields: str: line read Raises: ConnectionRefusedError: If connection cannot be established ConnectionError: If not connected (`autoconnect==False`) CommandError: If command failed (`check==True`) Shutdown: On EBUSD shutdown. """ _LOGGER.debug("read(infinite=%r, check=%r)", infinite, check) await self._async_ensure_connection() while True: line = await self._async_readline(check=check) if not line and not infinite: break yield line
[docs] async def async_readresp(self, check=True): """ Receive command response. Read one line as command response and one empty line. Returns: str: response Raises: ConnectionRefusedError: If connection cannot be established ConnectionError: If not connected (`autoconnect==False`) CommandError: If command failed (`check==True`) or trailing data. Shutdown: On EBUSD shutdown. """ _LOGGER.debug("readresp(check=%r)", check) await self._async_ensure_connection() line = await self._async_readline(check=check) empty = await self._async_readline() if empty: raise CommandError(f"Trailing data {empty}") return line
async def _async_write(self, message): self._writer.write(f"{message}\n".encode()) await self._async_timedout(self._writer.drain()) async def _async_readline(self, check=False): line = await self._reader.readline() line = line.decode("utf-8").rstrip() _LOGGER.debug("_readline() = %r", line) if line == "ERR: shutdown": raise Shutdown() if check and line.startswith("ERR: "): raise CommandError(line.lstrip("ERR: ")) return line async def _async_ensure_connection(self): if not self._writer or self._writer.is_closing(): if self._autoconnect: await self.async_connect() else: raise ConnectionError("Not connected") async def _async_timedout(self, task): if self._timeout: try: result = await asyncio.wait_for(task, timeout=self._timeout) except asyncio.TimeoutError as timeout: raise ConnectionError(f"{self.host}:{self.port} timeout") from timeout else: result = await task return result