Source code for lories.simulation.results

# -*- coding: utf-8 -*-
"""
lories.simulation.results
~~~~~~~~~~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

import os
import re
from collections.abc import Callable
from functools import reduce
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence

import pandas as pd
from lories.components import Component
from lories.connectors import Database
from lories.connectors.tables import HDFDatabase
from lories.core import CONSTANTS, Configurator, Constant, Directories, ResourceError, Resources
from lories.core.typing import Configurations, Timestamp
from lories.data.util import resample, scale_energy, scale_power
from lories.simulation import Durations, Progress, Result
from lories.util import parse_freq


class Results(Configurator, Sequence[Result]):
    """
    Sequence of :class:`Result` entries gathered for a simulated component.

    Besides the result entries themselves, the object holds the simulated
    timeseries ``data``, the ``progress`` and ``durations`` trackers and the
    database the timeseries are cached in. It can be used as a context manager,
    which configures the object and connects the database on entry and closes
    everything on exit.
    """

    __list: List[Result]
    __resources: Resources
    __component: Component
    __database: Database

    # Resampling frequency of the reported timeseries. Defaults to None, so that
    # report() does not depend on configure() having been called beforehand.
    _freq: Optional[str] = None

    data: pd.DataFrame
    dirs: Directories

    durations: Durations
    progress: Progress

    def __init__(
        self,
        component: Component,
        database: Database,
        configs: Configurations,
        desc: Optional[str] = None,
        **kwargs,
    ) -> None:
        """
        Create the results container for a component.

        :param component: Component whose channels (and those of all nested components) are tracked.
        :param database: Database used to cache timeseries. If ``None``, an :class:`HDFDatabase`
            writing to ``.results.h5`` in the component's data directory is created and
            registered with the component's connectors.
        :param configs: Configurations passed on to the :class:`Configurator` base.
        :param desc: Description of the progress; defaults to the component name.
        :param kwargs: Additional keyword arguments passed on to :class:`Progress`.
        :raises ResourceError: If the component or database is of an invalid type.
        """
        super().__init__(configs)
        if database is None:
            database_configs = component.configs.get_member(
                key="connectors",
                ensure_exists=True,
            ).get_member(
                key="results",
                defaults={},
            )
            database_configs.update(
                {
                    "key": "results",
                    "type": "tables",
                    "file": str(component.configs.dirs.data.joinpath(".results.h5")),
                    "compression_level": 9,
                    "compression_lib": "zlib",
                }
            )
            database = HDFDatabase(component, configs=database_configs)
            database.configure(database_configs)
            component.connectors.add(database)

        self.__list = []
        self.__database = self._assert_database(database)
        self.__component = self._assert_component(component)
        self.__resources = self._extract_resources(component)

        self.dirs = component.configs.dirs
        if not self.dirs.data.exists():
            os.makedirs(self.dirs.data, exist_ok=True)

        self.dirs.tmp = self.dirs.data.joinpath(".results")
        if not self.dirs.tmp.exists():
            os.makedirs(self.dirs.tmp, exist_ok=True)

        if desc is None:
            desc = component.name
        self.progress = Progress(
            desc=desc,
            file=str(self.dirs.data.joinpath("results.json")),
            **kwargs,
        )
        self.durations = Durations(self.dirs.tmp)
        self.data = pd.DataFrame()

    @classmethod
    def _assert_database(cls, database: Database) -> Database:
        """Return the database unchanged, or raise a ResourceError if it is no Database instance."""
        if database is None or not isinstance(database, Database):
            raise ResourceError(f"Invalid '{cls.__name__}' database: {type(database)}")
        return database

    @classmethod
    def _assert_component(cls, component: Component) -> Component:
        """Return the component unchanged, or raise a ResourceError if it is no Component instance."""
        if component is None or not isinstance(component, Component):
            raise ResourceError(f"Invalid '{cls.__name__}' component: {type(component)}")
        return component

    @staticmethod
    def _extract_resources(component: Component) -> Resources:
        """Collect the data channels of the component and, recursively, of all its nested components."""
        resources = []

        def extend_resources(parent: Component) -> None:
            resources.extend(parent.data.channels)
            for child in parent.components.values():
                extend_resources(child)

        extend_resources(component)
        return Resources(resources)

    def configure(self, configs: Configurations) -> None:
        """Configure the base class and parse the optional ``freq`` entry used to resample reports."""
        super().configure(configs)
        self._freq = parse_freq(configs.get("freq", default=None))

    def __repr__(self) -> str:
        return f"{type(self).__name__}({', '.join(str(r.key) for r in self.__list)})"

    def __str__(self) -> str:
        return f"{type(self).__name__}:\n\t" + "\n\t".join(f"{r.key} = {repr(r)}" for r in self.__list)

    def __contains__(self, result: str | Result) -> bool:
        """Test membership either by result key (for strings) or by result object."""
        if isinstance(result, str):
            return any(result == r.key for r in self.__list)
        return result in self.__list

    def __getitem__(self, index: Iterable[str] | str | int | slice):
        """
        Look up results by position, key or a collection of keys.

        :param index: An ``int`` or ``slice`` for positional access, a ``str`` key for a
            single result, or an iterable of keys for a list of all matching results.
        :returns: A single result for ``int`` and ``str``, a list of results otherwise.
        :raises KeyError: If a string key is unknown or the index type is unsupported.
        :raises IndexError: If a positional index is out of range.
        """
        # Positional access is required by the Sequence mixins (e.g. reversed(), index()).
        if isinstance(index, (int, slice)):
            return self.__list[index]
        if isinstance(index, str):
            for result in self.__list:
                if result.key == index:
                    return result
            # A string is itself iterable; raise here instead of falling through to the
            # iterable branch, which would silently return an empty list.
            raise KeyError(index)
        if isinstance(index, Iterable):
            # Materialize once, so generators are supported and not exhausted by the first test.
            keys = list(index)
            return [r for r in self.__list if r.key in keys]
        raise KeyError(index)

    def __iter__(self) -> Iterator[Result]:
        return iter(self.__list)

    def __len__(self) -> int:
        return len(self.__list)

    def __add__(self, other):
        """Return a plain list holding these results followed by the items of ``other``."""
        return [*self, *other]

    def add(
        self,
        key: str | Constant,
        name: str,
        summary: Any,
        header: str = "Summary",
        **kwargs: Any,
    ) -> None:
        """Create a new :class:`Result` from the passed arguments and append it."""
        self.__list.append(Result(key, name, summary, header=header, **kwargs))

    def append(self, result: Result) -> None:
        """Append an existing result."""
        self.__list.append(result)

    def extend(self, results: Iterable[Result]) -> None:
        """Append all results of the passed iterable."""
        self.__list.extend(results)

    def __enter__(self, **kwargs) -> Results:
        configs = self.configs
        self.configure(configs)
        self.open()
        return self

    # noinspection PyShadowingBuiltins
    def __exit__(self, type, value, traceback):
        self.close()

    def open(self) -> None:
        """Connect the database for the tracked resources."""
        self.__database.connect(self.__resources)

    def close(self) -> None:
        """Disconnect the database, stop the duration tracking and close the progress."""
        try:
            self.__database.disconnect()
        finally:
            # Always release the trackers, even if disconnecting the database failed.
            self.durations.stop()
            self.progress.close()

    @property
    def key(self) -> str:
        """Key of the underlying component."""
        return self.__component.key

    @property
    def name(self) -> str:
        """Name of the underlying component."""
        return self.__component.name

    @property
    def start(self) -> Optional[pd.Timestamp]:
        """First index value of the collected data, or ``None`` if there is no data."""
        if len(self.data.index) == 0:
            return None
        return self.data.index[0]

    @property
    def end(self) -> Optional[pd.Timestamp]:
        """Last index value of the collected data, or ``None`` if it has fewer than two rows."""
        if len(self.data.index) < 2:
            return None
        return self.data.index[-1]

    # noinspection PyShadowingBuiltins
    def filter(self, filter: Callable[[Result], bool]) -> Sequence[Result]:
        """Return the list of results for which the passed predicate is true."""
        return [result for result in self.__list if filter(result)]

    def report(self) -> None:
        """
        Complete the duration and progress tracking and write the Excel report.

        The summary frame is written to the sheet ``Results`` of ``results.xlsx`` in the
        data directory. Unless the ``include`` configuration is false, the timeseries is
        written to the sheet ``Timeseries``, resampled per aggregate method if a frequency
        is configured. The Excel report is skipped if an ImportError occurs.
        """
        self.durations.complete()
        self.progress.complete(**self.to_dict())

        try:
            # TODO: Make reports configurable
            from lories.io import excel

            excel_file = str(self.dirs.data.joinpath("results.xlsx"))
            excel.write(excel_file, "Results", self.to_frame())

            if self.configs.get_bool("include", default=True):
                columns = {c.key: c.full_name(unit=True) for c in CONSTANTS}
                columns.update({r.get("column", default=r.key): r.full_name(unit=True) for r in self.__resources})

                if self._freq is not None:
                    resampled = []
                    for method, resources in self.__resources.groupby("aggregate"):
                        resample_columns = [r.get("column", default=r.key) for r in resources]
                        resample_columns = [c for c in resample_columns if c in self.data.columns]
                        if len(resample_columns) == 0:
                            continue
                        if method is None:
                            self._logger.warning(
                                "Skipping resources for missing aggregate function: "
                                + ", ".join(f"'{r.id}'" for r in resources)
                            )
                            continue
                        resampled.append(resample(self.data[resample_columns], self._freq, method))

                    if len(resampled) == 0:
                        data = pd.DataFrame()
                    else:
                        data = pd.concat(resampled, axis="columns")
                        # Restore the original column order, restricted to the columns that were
                        # actually resampled; skipped columns would otherwise raise a KeyError.
                        data = data[[c for c in self.data.columns if c in data.columns]]
                    data = data.rename(columns=columns)
                else:
                    data = self.data.rename(columns=columns)
                excel.write(excel_file, "Timeseries", data)

        except ImportError:
            # Best effort: the report is skipped if the Excel dependencies are unavailable.
            pass

    # noinspection PyTypeChecker
    def submit(
        self,
        function: Callable[..., pd.DataFrame],
        start: Optional[Timestamp] = None,
        end: Optional[Timestamp] = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Append the data of a time range, either read from the database or freshly computed.

        If the database reports existing values for the resources and range, they are read
        and their columns renamed from resource ids back to the configured column names.
        Otherwise, ``function(start, end, *args, **kwargs)`` is called and its result is
        written to the database with the columns renamed to resource ids.

        :param function: Callable producing the data frame for the given range.
        :param start: Start of the range, passed to the database and to ``function``.
        :param end: End of the range, passed to the database and to ``function``.
        """
        columns = {r.get("column", default=r.key): r.id for r in self.__resources}
        if self.__database.exists(self.__resources, start, end):
            data = self.__database.read(self.__resources, start, end)
            data.rename(columns={v: k for k, v in columns.items()}, inplace=True)
        else:
            data = function(start, end, *args, **kwargs)
            self.__database.write(data.rename(columns=columns))

        self.data = pd.concat([self.data, data], axis="index")

    def is_complete(self) -> bool:
        """Return whether at least one result exists and the durations are complete."""
        return len(self.__list) > 0 and self.durations.is_complete()

    def is_success(self) -> bool:
        """Return whether the results are complete and the progress status is ``"success"``."""
        return self.is_complete() and self.progress.status == "success"

    def to_dict(self) -> Dict[str, Any]:
        """Return a mapping of result keys to their summary values."""
        return {r.key: r.summary for r in self.__list}

    def to_frame(self) -> pd.DataFrame:
        """
        Build a single-row summary frame with ``(header, name)`` columns, indexed by the component name.

        Results are written in ascending ``order``; results sharing the same order keep
        their insertion order. Names containing a ``kWh`` or ``W`` unit in brackets are
        passed through ``scale_energy`` or ``scale_power`` respectively. An empty frame is
        returned if there are no results.
        """
        summary = pd.DataFrame(columns=pd.MultiIndex.from_tuples((), names=["System", ""]))

        # sorted() is stable, so results of equal order remain in insertion order. Unlike a
        # reduce() without initializer, this also works for an empty list of results.
        for result in sorted(self.__list, key=lambda r: r.order):
            name = result.name
            value = result.summary
            if re.search(r".*\[.*kWh.*]", name):
                name, value = scale_energy(name, value)
            elif re.search(r".*\[.*W.*]", name):
                name, value = scale_power(name, value)
            summary.loc[self.name, (result.header, name)] = value
        return summary