# Source code for lories.connectors.tasks.task
# -*- coding: utf-8 -*-
"""
lories.connectors.tasks.task
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from threading import Thread
from typing import Any
from lories._core._channel import ChannelState # noqa
from lories._core._channels import Channels # noqa
from lories._core._connector import Connector # noqa
from lories.connectors.errors import ConnectionError, ConnectorError
# [docs]
class ConnectorTask(ABC, Thread):
    """
    Abstract base for a unit of work executed against a connector.

    Concrete subclasses implement :meth:`run`. Calling the task instance
    (``task(**kwargs)``) forwards to :meth:`run` and funnels every failure
    into the connector error hierarchy, see :meth:`__call__`.

    Attributes:
        connector: The connector this task operates on.
        channels: The channels this task was created for.
    """

    connector: Connector
    channels: Channels

    def __init__(self, connector: Connector, channels: Channels, name: str | None = None, **kwargs):
        """
        Create the task and initialise the underlying thread.

        Args:
            connector: The connector this task operates on.
            channels: The channels this task was created for.
            name: Optional thread name, passed on to ``Thread.__init__``.
            **kwargs: Any further keyword arguments for ``Thread.__init__``.
        """
        # NOTE(review): subclasses override run(), and the stock Thread.run() is
        # the only place that invokes ``target``. Thread.start() therefore appears
        # to execute run() directly, bypassing the error handling of __call__ --
        # confirm whether that is intended.
        super().__init__(name=name, target=self.__call__, **kwargs)
        self._logger = logging.getLogger(self.__module__)
        self.connector = connector
        self.channels = channels

    # noinspection PyUnresolvedReferences
    def __call__(self, **kwargs) -> Any:
        """
        Execute :meth:`run` and normalise the errors it may raise.

        Args:
            **kwargs: Passed on to :meth:`run` unchanged.

        Returns:
            Whatever :meth:`run` returns.

        Raises:
            ConnectionError: Re-raised from :meth:`run`, after the connector was
                disconnected and its channels were flagged as disconnected.
            ConnectorError: Either re-raised unchanged from :meth:`run`, or wrapping
                any other exception, which is then available as ``__cause__``.
        """
        try:
            return self.run(**kwargs)

        # This is the ConnectionError imported from lories.connectors.errors at the
        # top of the file, not the builtin of the same name.
        except ConnectionError:
            try:
                self.connector.set_channels(ChannelState.DISCONNECTING)
                self.connector.disconnect()
            finally:
                # Flag the channels as disconnected even if disconnecting failed.
                self.connector.set_channels(ChannelState.DISCONNECTED)
            raise

        except ConnectorError:
            # Already part of the connector error hierarchy; pass through untouched.
            raise

        except Exception as e:
            # Chain explicitly, to keep the original failure attached as the cause.
            raise ConnectorError(self.connector, str(e)) from e

    @abstractmethod
    def run(self, **kwargs) -> Any:
        """
        Perform the actual work of the task.

        Args:
            **kwargs: Keyword arguments forwarded by :meth:`__call__`.

        Returns:
            The task result; its type is defined by the concrete subclass.
        """