Source code for lories.settings

# -*- coding: utf-8 -*-
"""
lories.settings
~~~~~~~~~~~~~~~


"""

from __future__ import annotations

import logging.config
import os
import shutil
import sys
from argparse import ArgumentParser
from typing import Any, Dict

from lories.core import Configurations, Directories, Directory
from lories.util import to_bool


[docs] class Settings(Configurations): app: str # noinspection PyProtectedMember, SpellCheckingInspection def __init__(self, app_name: str, app_file: str = "settings.conf", parser: ArgumentParser = None) -> None: app_args = _parse_kwargs(parser) app_args["name"] = app_name app_args["systems"] = {k: to_bool(app_args.pop(f"system_{k}", False)) for k in ["scan", "flat", "copy"]} app_dirs = Directories(**{d: app_args.pop(d, None) for d in Directories.KEYS}) super().__init__(app_file, app_dirs, app_args) self._load(require=False) self._load_logging() override_path = os.path.join(self.dirs.data, self.name) if os.path.isfile(override_path): self._load_toml(override_path) self.dirs.update(self.get_member(Directories.TYPE, defaults={})) if self.dirs.conf._dir is None: self.dirs._conf = Directory(os.path.dirname(self.path), default="conf") def __getattr__(self, attr): # __getattr__ gets called when the item is not found via __getattribute__ # To avoid recursion, call __getattribute__ directly to get components dict configs = Configurations.__getattribute__(self, f"_{Configurations.__name__}__configs") if attr in configs.keys(): return configs[attr] raise AttributeError(f"'{type(self).__name__}' object has no configuration '{attr}'") @property def _members_dir(self) -> Directory: return self.dirs.conf def _load_logging(self) -> None: logging_file = os.path.join(self.dirs.conf, "logging.conf") if not os.path.isfile(logging_file) and not self.dirs.conf.is_default(): logging_default = logging_file.replace("logging.conf", "logging.default.conf") if os.path.isfile(logging_default): shutil.copy(logging_default, logging_file) if os.path.isfile(logging_file): logging.config.fileConfig(logging_file) for handler in logging.getLoggerClass().root.handlers: if isinstance(handler, logging.FileHandler): self.dirs.log = os.path.dirname(handler.baseFilename) break if not os.path.isdir(self.dirs.log): os.makedirs(self.dirs.log, exist_ok=True) else: handler_console = logging.StreamHandler(sys.stdout) handler_console.setLevel(logging.INFO) handler_console.setFormatter( logging.Formatter("%(asctime)s.%(msecs)03d - %(name)s - %(message)s", "%Y-%m-%d %H:%M:%S") ) # TODO: Think about fallback logging configuration necessity to use default logfile location # log_file = app_name.lower() # if not log_file.endswith('.log'): # log_file += '.log' # handler_file = logging.FileHandler(os.path.join(self.dirs._log or 'log', log_file)) # handler_file.setLevel(logging.WARN) # handler_file.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(message)s", # "%Y-%m-%d %H:%M:%S")) logging.basicConfig( force=True, level=logging.INFO, handlers=[ handler_console, # handler_file, ], )
# noinspection PyProtectedMember def _parse_kwargs(parser: ArgumentParser) -> Dict[str, Any]: if parser is not None: action_parsers = [a for a in parser._actions if a.dest == "action"] if len(action_parsers) > 0: action_parser = next(iter(action_parsers)) else: action_parser = parser.add_subparsers(dest="action") if not action_parser.required and action_parser.default is None: action_parser.required = True if "run" not in action_parser.choices.keys(): action_parser.add_parser("run", help="run local resources, connectors and systems") if "start" not in action_parser.choices.keys(): action_parser.add_parser("start", help="start the local resource system") if "rotate" not in action_parser.choices.keys(): replicate_parser = action_parser.add_parser( name="rotate", help="rotate data of a local database", ) replicate_parser.add_argument( "--full", dest="full", action="store_true", help="flags if the rotation should be executed for all data", ) if "replicate" not in action_parser.choices.keys(): replicate_parser = action_parser.add_parser( name="replicate", help="replicate data from a remote database", ) replicate_parser.add_argument( "--force", dest="force", action="store_true", help="flags if the replication should force rewrite data for checksum mismatches", ) replicate_parser.add_argument( "--full", dest="full", action="store_true", help="flags if the replication should be executed for all data", ) if "simulate" not in action_parser.choices.keys(): simulate_parser = action_parser.add_parser( name="simulate", help="simulate local resources and systems for a specified time range", ) simulate_parser.add_argument( "--start", dest="start", metavar="datetime", help="the start datetime for the simulation to start", ) simulate_parser.add_argument( "--end", dest="end", metavar="datetime", help="the end datetime for the simulation to end", ) parser.add_argument( "-c", "--conf-dir", dest="conf_dir", metavar="dir", help="directory to expect root configuration files", ) parser.add_argument( "-d", "--data-dir", dest="data_dir", metavar="dir", help="directory to expect and write data files to", ) parser.add_argument( "--system-scan", dest="system_scan", action="store_true", help="flags whether several systems will be expected, instead of a single one", ) parser.add_argument( "--system-flat", dest="system_flat", action="store_true", help="flags if the configuration files will be expected directly in the data directory, " + "instead of a corresponding 'conf' directory", ) parser.add_argument( "--system-copy", dest="system_copy", action="store_true", help="flags if the configured system files should be copied to the specified data directory if it is empty", ) args = parser.parse_args() kwargs = vars(args) else: kwargs = {} return kwargs