# -*- coding: utf-8 -*-
"""
lories.application.main
~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
import sys
import traceback
from typing import Optional, Type
import pandas as pd
from lories import ConfigurationUnavailableError, Settings, System
from lories.application import Interface
from lories.connectors import Database, DatabaseException
from lories.data.manager import DataManager
from lories.simulation import Results
from lories.typing import Timestamp
from lories.util import slice_range, to_timedelta
[docs]
class Application(DataManager):
_interface: Optional[Interface] = None
@classmethod
def load(cls, name: str, factory: Type[System] = System, **kwargs) -> Application:
settings = Settings(name, **kwargs)
app = cls(settings)
app.configure(settings, factory)
return app
# noinspection PyProtectedMember
def __init__(self, settings: Settings, **kwargs) -> None:
super().__init__(settings, name=settings["name"], **kwargs)
if not settings.has_member(Interface.TYPE):
settings._add_member(Interface.TYPE, {"enabled": False})
# Check if the tasked action may be headless
if settings.get("action").lower() == "start":
self._interface = Interface(self, settings.get_member(Interface.TYPE))
# noinspection PyProtectedMember, PyTypeChecker, PyMethodOverriding
def configure(self, settings: Settings, factory: Type[System] = System) -> None:
super().configure(settings)
self._logger.debug(f"Setting up {type(self).__name__}: {self.name}")
components = []
try:
system_dirs = settings.dirs.to_dict()
system_dirs["conf_dir"] = None
systems_configs = settings.get_member("systems")
systems_flat = systems_configs.get_bool("flat")
if systems_configs.get_bool("scan"):
if systems_configs.get_bool("copy"):
factory.copy(self.settings)
system_dirs["scan_dir"] = str(settings.dirs.data)
components.extend(factory.scan(self._components, **system_dirs, flat=systems_flat))
else:
components.append(factory.load(self._components, **system_dirs, flat=systems_flat))
except ConfigurationUnavailableError as e:
self._logger.warning(str(e))
if not self._components.has_type(System) and settings.dirs.data.is_default():
components += self._components.load(configs_dir=settings.dirs.conf, configure=False, sort=False)
if len(components) > 0:
self._components.configure(components)
if self._has_interface():
self._interface.configure(settings.get_member(Interface.TYPE))
# noinspection PyTypeChecker
@property
def settings(self) -> Settings:
return self.configs
@property
def interface(self) -> Interface:
return self._interface
def _has_interface(self) -> bool:
return self._interface is not None and self._interface.is_enabled()
def main(self) -> None:
action = self.settings["action"]
try:
if action == "run":
with self:
self.run(
start=self.settings.get_date("start", default=None),
end=self.settings.get_date("end", default=None),
)
elif action == "start":
with self:
self.start()
elif action == "simulate":
with self:
self.simulate(
start=self.settings.get_date("start", default=None),
end=self.settings.get_date("end", default=None),
)
elif action == "rotate":
self.rotate(full=self.settings.get_bool("full"))
elif action == "replicate":
self.replicate(full=self.settings.get_bool("full"), force=self.settings.get_bool("force"))
except Exception as e:
self._logger.warning(f"Error during '{action}': {str(e)}")
self._logger.exception(e)
exit(1)
def start(self, wait: bool = True) -> None:
_has_interface = self._has_interface()
if _has_interface:
wait = False
super().start(wait)
if _has_interface:
self._interface.start()
# noinspection PyUnresolvedReferences, PyProtectedMember, PyShadowingBuiltins
def simulate(
self,
start: Optional[Timestamp] = None,
end: Optional[Timestamp] = None,
**kwargs,
) -> None:
simulation = self.settings.get_member("simulation", defaults={"data": {"include": True}})
timezone = simulation.get("timezone", None)
if start is None:
start = simulation.get_date("start", default=None, timezone=timezone)
if end is None:
end = simulation.get_date("end", default=None, timezone=timezone)
if start is None or end is None or end < start:
self._logger.error("Invalid settings missing specified simulation period")
sys.exit(1)
slice = simulation.get_bool("slice", default=True)
freq = simulation.get("freq", default=None)
database_id = simulation.get("database", default="results")
database_configs = simulation.get_member("databases", defaults={})
if database_id == "results" and "results" not in database_configs:
database_configs["results"] = {
"type": "tables",
"file": ".results.h5",
"compression_level": 9,
"compression_lib": "zlib",
}
self.connectors.load(database_configs)
database = self.connectors.get(database_id)
if not isinstance(database, Database):
raise DatabaseException(database, f"Invalid results cache type '{type(database)}'")
error = False
summary = []
systems = self.components.get_all(System)
for system in systems:
self._logger.info(f"Starting simulation of system '{system.name}': {system.id}")
if slice and freq is not None and start + to_timedelta(freq) < end:
slices = slice_range(start, end, timezone=system.location.timezone, freq=freq)
else:
slices = [(start, end)]
with Results(system, database, simulation.get_member("data"), total=len(slices)) as results:
results.durations.start("Simulation")
try:
for slice_start, slice_end in slices:
slice_prior = results.data.tail(1) if not results.data.empty else None
results.submit(
system.simulate,
slice_start,
slice_end,
slice_prior,
**kwargs,
)
results.progress.update()
results.durations.stop("Simulation")
self._logger.debug(
f"Finished simulation of system '{system.name}' in {results.durations['Simulation']} minutes"
)
self._logger.debug(f"Starting evaluation of system '{system.name}': {system.id}")
results.durations.start("Evaluation")
results_data = system.evaluate(results)
results.report()
# TODO: Call evaluations from configs
if "errors" in results_data.columns.get_level_values(0):
self._logger.debug(f"Starting evaluation of {len(results_data['errors'].columns)} errors")
results.durations.stop("Evaluation")
self._logger.debug(
f"Finished evaluation of system '{system.name}' in {results.durations['Evaluation']} minutes"
)
summary.append(results.to_frame())
except Exception as e:
error = True
self._logger.error(f"Error simulating system {system.name}: {str(e)}")
self._logger.debug("%s: %s", type(e).__name__, traceback.format_exc())
results.durations.complete()
results.progress.complete(
status="error",
message=str(e),
error=type(e).__name__,
trace=traceback.format_exc(),
)
if not error and len(summary) > 1:
try:
from lories.io import excel
excel_file = str(self.configs.dirs.data.joinpath("summary.xlsx"))
excel.write(excel_file, "Summary", pd.concat(summary, axis="index"))
except ImportError:
pass