Source code for pyisy.connection

"""Connection to the ISY."""

from __future__ import annotations

import asyncio
import ssl
import warnings
from typing import Literal
from urllib.parse import quote, urlencode

import aiohttp

from .constants import (
    METHOD_GET,
    URL_CLOCK,
    URL_CONFIG,
    URL_DEFINITIONS,
    URL_MEMBERS,
    URL_NETWORK,
    URL_NODES,
    URL_PING,
    URL_PROGRAMS,
    URL_RESOURCES,
    URL_STATUS,
    URL_SUBFOLDERS,
    URL_VARIABLES,
    VAR_INTEGER,
    VAR_STATE,
    XML_FALSE,
    XML_TRUE,
)
from .exceptions import ISYConnectionError, ISYInvalidAuthError
from .logging import _LOGGER, enable_logging

TLSVer = float | Literal["auto"]

MAX_HTTPS_CONNECTIONS_ISY = 2
MAX_HTTP_CONNECTIONS_ISY = 5
MAX_HTTPS_CONNECTIONS_IOX = 20
MAX_HTTP_CONNECTIONS_IOX = 50

MAX_RETRIES = 5
RETRY_BACKOFF = [0.01, 0.10, 0.25, 1, 2]  # Seconds

HTTP_OK = 200  # Valid request received, will run it
HTTP_UNAUTHORIZED = 401  # User authentication failed
HTTP_NOT_FOUND = 404  # Unrecognized request received and ignored
HTTP_SERVICE_UNAVAILABLE = 503  # Valid request received, system too busy to run it

HTTP_TIMEOUT = 30

HTTP_HEADERS = {
    "Connection": "keep-alive",
    "Keep-Alive": "5000",
    "Accept-Encoding": "gzip, deflate",
}

EMPTY_XML_RESPONSE = '<?xml version="1.0" encoding="UTF-8"?>'

# ``ssl.OP_LEGACY_SERVER_CONNECT`` was added to the stdlib ``ssl``
# module in Python 3.12; CI still runs on 3.11. The underlying OpenSSL
# flag ``SSL_OP_LEGACY_SERVER_CONNECT`` has had the stable value
# ``0x4`` for years, so fall back to the literal when the attribute is
# missing.
OP_LEGACY_SERVER_CONNECT = getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4)


[docs] class Connection: """Connection object to manage connection to and interaction with ISY."""
[docs] def __init__( self, address: str, port: int, username: str, password: str, use_https: bool = False, tls_ver: TLSVer = "auto", webroot: str = "", websession: aiohttp.ClientSession | None = None, verify_ssl: bool = False, ) -> None: """Initialize the Connection object.""" if len(_LOGGER.handlers) == 0: enable_logging(add_null_handler=True) self._address = address self._port = port self._username = username self._password = password self._auth = aiohttp.BasicAuth(self._username, self._password) self._webroot = webroot.rstrip("/") self.req_session = websession self._tls_ver = tls_ver self.use_https = use_https self._url = f"http{'s' if self.use_https else ''}://{self._address}:{self._port}{self._webroot}" self.semaphore = asyncio.Semaphore( MAX_HTTPS_CONNECTIONS_ISY if use_https else MAX_HTTP_CONNECTIONS_ISY ) if websession is None: websession = get_new_client_session(use_https, tls_ver) self.req_session = websession self.sslcontext = get_sslcontext(use_https, tls_ver, verify_ssl)
[docs] async def test_connection(self) -> str | None: """Test the connection and get the config for the ISY.""" config = await self.get_config(retries=None) if not config: raise ISYConnectionError("Could not connect to the ISY with the parameters provided.") return config
[docs] def increase_available_connections(self) -> None: """Increase the number of allowed connections for newer hardware.""" _LOGGER.debug("Increasing available simultaneous connections") self.semaphore = asyncio.Semaphore( MAX_HTTPS_CONNECTIONS_IOX if self.use_https else MAX_HTTP_CONNECTIONS_IOX )
[docs] async def close(self) -> None: """Cleanup connections and prepare for exit.""" await self.req_session.close()
@property def connection_info(self) -> dict[str, str | int | bytes | None]: """Return the connection info required to connect to the ISY.""" connection_info = {} connection_info["auth"] = self._auth.encode() connection_info["addr"] = self._address connection_info["port"] = int(self._port) connection_info["passwd"] = self._password connection_info["webroot"] = self._webroot if self.use_https and self._tls_ver: connection_info["tls"] = self._tls_ver return connection_info @property def url(self) -> str: """Return the full connection url.""" return self._url # COMMON UTILITIES
[docs] def compile_url(self, path: list[str], query: str | None = None) -> str: """Compile the URL to fetch from the ISY.""" url = self.url if path is not None: url += "/rest/" + "/".join([quote(item) for item in path]) if query is not None: url += "?" + urlencode(query) return url
[docs] async def request( self, url: str, retries: int = 0, ok404: bool = False, delay: int = 0, retry404: bool = False, ) -> str | None: """Execute request to ISY REST interface. retry404: ISY-994 returns spurious 404s on `/rest/nodes/.../cmd/...` when the Insteon network is overwhelmed. Pass True from command-issuing callers so a 404 falls into the existing retry/backoff loop instead of being treated as a permanent failure. """ _LOGGER.debug("ISY Request: %s", url) if delay: await asyncio.sleep(delay) try: async with ( self.semaphore, self.req_session.get( url, auth=self._auth, headers=HTTP_HEADERS, timeout=HTTP_TIMEOUT, ssl=self.sslcontext, ) as res, ): # /desc and other non-/rest URLs lack the "rest" substring. _, _, endpoint = url.partition("rest") if not endpoint: endpoint = url if res.status == HTTP_OK: _LOGGER.debug("ISY Response Received: %s", endpoint) results = await res.text(encoding="utf-8", errors="ignore") if results != EMPTY_XML_RESPONSE: return results _LOGGER.debug("Invalid empty XML returned: %s", endpoint) res.release() if res.status == HTTP_NOT_FOUND: if ok404: _LOGGER.debug("ISY Response Received %s", endpoint) res.release() return "" if retry404: # ISY-994 emits spurious 404s when the Insteon network # is busy; fall through to the retry/backoff loop. _LOGGER.debug( "ISY returned 404 for %s; controller may be busy, will retry", endpoint, ) res.release() else: _LOGGER.error("ISY Reported an Invalid Command Received %s", endpoint) res.release() return None if res.status == HTTP_UNAUTHORIZED: res.release() raise ISYInvalidAuthError("Invalid credentials provided for ISY connection.") if res.status == HTTP_SERVICE_UNAVAILABLE: _LOGGER.warning("ISY too busy to process request %s", endpoint) res.release() except TimeoutError: _LOGGER.warning("Timeout while trying to connect to the ISY.") except aiohttp.ClientSSLError as err: # SSL/TLS handshake failure. Subclass of ``ClientOSError``, so # the broader branch below would otherwise eat it silently as # a generic "ISY not ready or closed connection." debug. # Almost always one of: # * controller pinned below the ``tls_ver='auto'`` floor of # TLS 1.2 (e.g. an ISY-994 manually downgraded to 1.1, or # a modern OpenSSL distro with ``MinProtocol=TLSv1.2``). # * ``verify_ssl=True`` against the controller's self-signed # cert (``ClientConnectorCertificateError``). # * ISY-994 firmware (pre-RFC-5746) rejected by OpenSSL 3.x # with ``UNSAFE_LEGACY_RENEGOTIATION_DISABLED``. We # identify ISY-994 by the failure itself (the only peer # class that fails this way) and degrade the SSL context # once for the lifetime of the ``Connection`` — modern # peers (eisy/Polisy IoX, ISY-994 firmware that does # RFC 5746) stay strict. if ( self.sslcontext is not None and not (self.sslcontext.options & OP_LEGACY_SERVER_CONNECT) and "UNSAFE_LEGACY_RENEGOTIATION_DISABLED" in str(err) ): _LOGGER.warning( "Enabling ISY-994 legacy-renegotiation TLS compatibility for " "this controller; eisy/Polisy IoX peers do not need this. " "Original error: %s", err, ) self.sslcontext.options |= OP_LEGACY_SERVER_CONNECT return await self.request(url, retries=retries, ok404=ok404, delay=delay, retry404=retry404) # Always raise — retrying a real version/cert mismatch won't # recover, and callers (HA Core) need a definitive failure # to translate into ``ConfigEntryNotReady`` rather than a # silent ``None`` that looks like a transient miss. The SSL # detail rides along in the exception chain. raise ISYConnectionError(f"SSL/TLS error: {err}") from err except ( aiohttp.ClientOSError, aiohttp.ServerDisconnectedError, ): _LOGGER.debug("ISY not ready or closed connection.") except aiohttp.ClientResponseError as err: # Malformed framing/protocol error — retrying won't recover. # When the caller already opted into ``ok404=True`` we treat it # as another flavor of "feature not present": ISY-994 firmware # on a factory-reset / un-configured controller responds to # missing optional resources (``/CONF/STATE.VAR``, # ``/CONF/NET/RES.CFG``) with a real 404 whose framing trips # aiohttp's parser when the connection is reused — the next # request on the kept-alive socket reads the prior 404's HTML # body where an HTTP status line should be. Demote that to a # debug log and return ``""`` so the optional manager's # "no resource configured" path handles it cleanly. if ok404: _LOGGER.debug( "ISY response framing error on optional endpoint %s: %s", url, err.message, ) return "" _LOGGER.error( "Client Response Error from ISY: %s %s.", err.status, err.message, ) if retries is None: raise ISYConnectionError from err return None except aiohttp.ClientError as err: _LOGGER.error( "ISY Could not receive response from device because of a network issue: %s", type(err), ) if retries is None: raise ISYConnectionError if retries < MAX_RETRIES: _LOGGER.debug( "Retrying ISY Request in %ss, retry %s.", RETRY_BACKOFF[retries], retries + 1, ) # sleep to allow the ISY to catch up await asyncio.sleep(RETRY_BACKOFF[retries]) # recurse to try again return await self.request(url, retries + 1, ok404=ok404, retry404=retry404) # fail for good _LOGGER.error( "Bad ISY Request: (%s) Failed after %s retries.", url, retries, ) return None
[docs] async def ping(self) -> bool: """Test connection to the ISY and return True if alive.""" req_url = self.compile_url([URL_PING]) result = await self.request(req_url, ok404=True) return result is not None
[docs] async def get_description(self) -> str | None: """Fetch the services description from the ISY.""" url = "https://" if self.use_https else "http://" url += f"{self._address}:{self._port}{self._webroot}/desc" return await self.request(url)
[docs] async def get_config(self, retries: int = 0) -> str | None: """Fetch the configuration from the ISY.""" req_url = self.compile_url([URL_CONFIG]) return await self.request(req_url, retries=retries)
[docs] async def get_programs(self, address: int | str | None = None) -> str | None: """Fetch the list of programs from the ISY.""" addr = [URL_PROGRAMS] if address is not None: addr.append(str(address)) req_url = self.compile_url(addr, {URL_SUBFOLDERS: XML_TRUE}) return await self.request(req_url)
[docs] async def get_nodes(self) -> str | None: """Fetch the list of nodes/groups/scenes from the ISY.""" req_url = self.compile_url([URL_NODES], {URL_MEMBERS: XML_FALSE}) return await self.request(req_url)
[docs] async def get_status(self) -> str | None: """Fetch the status of nodes/groups/scenes from the ISY.""" req_url = self.compile_url([URL_STATUS]) return await self.request(req_url)
[docs] async def get_variable_defs(self) -> list[str | BaseException] | None: """Fetch the list of variables from the ISY. ``ok404=True`` because both endpoints legitimately 404 on a factory-reset / un-configured ISY-994 (``/CONF/INTEGER.VAR not found`` / ``/CONF/STATE.VAR not found``); the ``Variables`` parser already handles those bodies and ``None`` as "no variables defined" (see ``EMPTY_VARIABLE_RESPONSES``). Without ``ok404`` the request path emits ERROR-level log spam for what is really an empty-config success. """ req_list = [ [URL_VARIABLES, URL_DEFINITIONS, VAR_INTEGER], [URL_VARIABLES, URL_DEFINITIONS, VAR_STATE], ] req_urls = [self.compile_url(req) for req in req_list] return await asyncio.gather( *[self.request(req_url, ok404=True) for req_url in req_urls], return_exceptions=True, )
[docs] async def get_variables(self) -> str | None: """Fetch the variable details from the ISY to update local copy.""" req_list = [ [URL_VARIABLES, METHOD_GET, VAR_INTEGER], [URL_VARIABLES, METHOD_GET, VAR_STATE], ] req_urls = [self.compile_url(req) for req in req_list] async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(self.request(req_url)) for req_url in req_urls] results = [r for r in (t.result() for t in tasks) if r is not None] result = "".join(results) return result.replace('</vars><?xml version="1.0" encoding="UTF-8"?><vars>', "")
[docs] async def get_network(self) -> str | None: """Fetch the list of network resources from the ISY.""" req_url = self.compile_url([URL_NETWORK, URL_RESOURCES]) result = await self.request(req_url, ok404=True) return result or None
[docs] async def get_time(self) -> str | None: """Fetch the system time info from the ISY.""" req_url = self.compile_url([URL_CLOCK]) return await self.request(req_url)
def get_new_client_session(use_https: bool, tls_ver: TLSVer = "auto") -> aiohttp.ClientSession: """Create a new Client Session for Connecting.""" if use_https: if not can_https(tls_ver): raise (ValueError("PyISY could not connect to the ISY. Check log for SSL/TLS error.")) return aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) return aiohttp.ClientSession() _TLS_VERSION_MAP: dict[float, ssl.TLSVersion] = { 1.1: ssl.TLSVersion.TLSv1_1, 1.2: ssl.TLSVersion.TLSv1_2, 1.3: ssl.TLSVersion.TLSv1_3, } # Floor for "auto" negotiation. Current eisy/Polisy IoX firmware rejects # TLS <=1.1 (RFC 8996). Stock ISY-994 firmware (4.5.4+) defaults to TLS 1.2 # and is user-configurable down to 1.0/1.1; users who have manually # downgraded their ISY-994's HTTPS Server Settings can still pin tls_ver=1.1. _TLS_AUTO_MIN = ssl.TLSVersion.TLSv1_2 def _warn_deprecated_pin(tls_ver: float) -> None: """Warn callers that pinning tls_ver is deprecated.""" warnings.warn( f"Passing tls_ver={tls_ver!r} is deprecated. The default 'auto' lets " "OpenSSL negotiate the highest TLS version both peers support " "(floor: TLS 1.2). Only pin tls_ver=1.1 if you have manually " "downgraded an ISY-994's HTTPS Server Settings below TLS 1.2.", DeprecationWarning, stacklevel=3, ) def get_sslcontext( use_https: bool, tls_ver: TLSVer = "auto", verify_ssl: bool = False, ) -> ssl.SSLContext | None: """Create an SSLContext object to use for the connections. eisy/Polisy and stock ISY-994 ship a self-signed cert, so verify_ssl defaults to False. Set verify_ssl=True for users who have installed a properly-signed certificate (CA-signed or imported via PKCS12) and have a CA bundle the OS trusts. """ if not use_https: return None context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.check_hostname = verify_ssl context.verify_mode = ssl.CERT_REQUIRED if verify_ssl else ssl.CERT_NONE if tls_ver == "auto": context.minimum_version = _TLS_AUTO_MIN elif tls_ver in _TLS_VERSION_MAP: _warn_deprecated_pin(tls_ver) context.minimum_version = _TLS_VERSION_MAP[tls_ver] context.maximum_version = _TLS_VERSION_MAP[tls_ver] else: raise ValueError(f"Unsupported TLS version: {tls_ver!r}") # Allow older ciphers for original ISY-994 hardware (TLS 1.1/1.2 only; # set_ciphers does not affect TLS 1.3 ciphersuites). context.set_ciphers("DEFAULT:!aNULL:!eNULL:!MD5:!3DES:!DES:!RC4:!IDEA:!SEED:!aDSS:!SRP:!PSK") # Note: ``OP_LEGACY_SERVER_CONNECT`` (ISY-994 RFC-5746 compat) is # NOT set here. ``Connection.request()`` enables it on demand the # first time the peer rejects the handshake with # ``UNSAFE_LEGACY_RENEGOTIATION_DISABLED`` — that way modern peers # (eisy/Polisy IoX, ISY-994 firmware that honors RFC 5746) keep # strict TLS, and only the controllers that actually need it # degrade. return context def can_https(tls_ver: TLSVer) -> bool: """ Verify minimum requirements to use an HTTPS connection. Returns boolean indicating whether HTTPS is available. """ output = True # check that Python was compiled against correct OpenSSL lib if "PROTOCOL_TLS_CLIENT" not in dir(ssl): _LOGGER.error("PyISY cannot use HTTPS: Compiled against old OpenSSL library. See docs.") output = False # check the requested TLS version if tls_ver != "auto" and tls_ver not in _TLS_VERSION_MAP: _LOGGER.error( "PyISY cannot use HTTPS: tls_ver must be 'auto' or one of " "1.1, 1.2, 1.3 (only ISY/IoX-supported versions)." ) output = False return output