Source code for lories.connectors.context

# -*- coding: utf-8 -*-
"""
lories.connectors.context
~~~~~~~~~~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

import logging
from concurrent import futures
from concurrent.futures import Future
from typing import Callable, Dict, Optional, Type

from lories._core._channels import Channels  # noqa
from lories._core._connector import Connector, _Connector, _ConnectorContext  # noqa
from lories.connectors.errors import ConnectorError
from lories.connectors.tasks import ConnectTask
from lories.core.configs import Configurations
from lories.core.register import RegistratorContext, Registry
from lories.data.channels import ChannelState

registry = Registry[_Connector]()


# noinspection PyShadowingBuiltins
def register_connector_type(
    type: str,
    *alias: str,
    factory: Callable[[ConnectorContext | Connector, Optional[Configurations]], Connector] = None,
    replace: bool = False,
) -> Callable[[Type[Connector]], Type[Connector]]:
    # noinspection PyShadowingNames
    def _register(cls: Type[Connector]) -> Type[Connector]:
        registry.register(cls, type, *alias, factory=factory, replace=replace)
        return cls

    return _register


# noinspection PyProtectedMember
[docs] class ConnectorContext(_ConnectorContext, RegistratorContext[Connector]): @property def _registry(self) -> Registry[Connector]: return registry # noinspection PyShadowingBuiltins def connect( self, filter: Optional[Callable[[Connector], bool]] = None, channels: Optional[Channels] = None, timeout: Optional[float] = None, ) -> None: self._connect(*self.filter(filter), channels=channels, timeout=timeout) def _connect( self, *connectors: Connector, channels: Optional[Channels] = None, timeout: Optional[float] = None, ) -> None: connect_futures = {} for connector in connectors: if not connector.is_enabled(): self._logger.debug( f"Skipping to connect disabled {type(connector).__name__} '{connector.name}': {connector.id}" ) continue if connector._is_connected(): self._logger.debug( f"Skipping already connected {type(connector).__name__} '{connector.name}': {connector.id}" ) continue if not connector._is_connectable(): self._logger.debug( f"Skipping not connectable {type(connector).__name__} '{connector.name}': {connector.id}" ) continue connect_task = self.__connect(connector, channels) connect_future = self._executor.submit(connect_task) connect_futures[connect_future] = connect_task self.__connect_futures(connect_futures, timeout) # noinspection PyUnresolvedReferences def __connect(self, connector: Connector, channels: Optional[Channels] = None) -> ConnectTask: self._logger.debug(f"Connecting {type(connector).__name__} '{connector.name}': {connector.id}") if channels is None: channels = self.context.filter(lambda c: c.has_connector(connector.id)) channels.update(self.context.filter(lambda c: c.has_logger(connector.id)).apply(lambda c: c.from_logger())) return ConnectTask(connector, channels) def __connect_futures( self, tasks: Dict[Future, ConnectTask], timeout: Optional[float] = None, ) -> None: try: for future in futures.as_completed(tasks, timeout=timeout): tasks.pop(future) self.__connect_callback(future) except TimeoutError: for future, task in tasks.items(): self._logger.warning(f"Timed out opening connector '{task.connector.id}' after {timeout} seconds") future.cancel() def __connect_callback(self, future: Future) -> None: try: connector = future.result() self._logger.info(f"Connected {type(connector).__name__} '{connector.name}': {connector.id}") except ConnectorError as e: self._logger.warning(f"Failed opening connector '{e.connector.id}': {str(e)}") if self._logger.getEffectiveLevel() <= logging.DEBUG: self._logger.exception(e) # noinspection PyShadowingBuiltins def reconnect( self, filter: Optional[Callable[[Connector], bool]] = None, ) -> None: self._reconnect(*self.filter(filter)) def _reconnect(self, *connectors: Connector) -> None: for connector in connectors: if not connector.is_enabled(): self._logger.debug( f"Skipping reconnecting disabled {type(connector).__name__} '{connector.name}': {connector.id}" ) continue if not connector._is_connected() and connector._connected: # Connection aborted and not yet disconnected properly self._disconnect(connector) continue connect_task = self.__connect(connector) connect_future = self._executor.submit(connect_task) connect_future.add_done_callback(self.__connect_callback) # noinspection PyShadowingBuiltins def disconnect( self, filter: Optional[Callable[[Connector], bool]] = None, ) -> None: self._disconnect(*self.filter(filter)) def _disconnect(self, *connectors: Connector) -> None: for connector in reversed(connectors): if not connector._is_connected(): self._logger.debug( f"Skipping to disconnect unconnected {type(connector).__name__} '{connector.name}': {connector.id}" ) continue self.__disconnect(connector) def __disconnect(self, connector: Connector) -> None: try: self._logger.debug(f"Disconnecting {type(connector).__name__} '{connector.name}': {connector.id}") connector.set_channels(ChannelState.DISCONNECTING) connector.disconnect() self._logger.info(f"Disconnected {type(connector).__name__} '{connector.name}': {connector.id}") except Exception as e: self._logger.warning(f"Failed closing connector '{connector.id}': {str(e)}") if self._logger.getEffectiveLevel() <= logging.DEBUG: self._logger.exception(e) finally: connector.set_channels(ChannelState.DISCONNECTED)