# Source code for smarts.core.configuration

# MIT License
#
# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import ast
import configparser
import functools
import logging
import os
import pathlib
import re
import warnings
from typing import Any, Callable, Final, List, Optional, Union

import smarts

# Unique sentinel meaning "no default was supplied". It is checked by identity
# in `Config.get_setting`, which leaves `None` usable as an explicit default.
_UNSET = object()

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def _passthrough_cast(val):
    return val


def _convert_truthy(t: str) -> bool:
    """Convert value to a boolean. This should only allow ([Tt]rue)|([Ff]alse)|[\\d].

    This is necessary because bool("false") == True.
    Args:
        t (str): The value to convert.

    Returns:
        bool: The truth value.
    """
    # ast literal_eval will parse python literals int, str, e.t.c.
    out = ast.literal_eval(t.strip().title())
    assert isinstance(out, (bool, int))
    return bool(out)


# The `assets` directory under the first entry of the `smarts` package path.
_assets_path = os.path.join(list(smarts.__path__)[0], "assets")
# Built-in fallback values keyed by `(section, option)`.
# `Config.get_setting` consults this table only when the option is found
# neither in the environment nor in the configuration file and the caller
# supplied no default. Values are returned exactly as stored here; the `cast`
# function is not applied to them.
_config_defaults: Final = {
    ("assets", "path"): _assets_path,
    ("assets", "default_agent_vehicle"): "sedan",
    ("assets", "default_vehicle_definitions_list"): os.path.join(
        _assets_path, "vehicles/vehicle_definitions_list.yaml"
    ),
    ("core", "observation_workers"): 0,
    ("core", "max_custom_image_sensors"): 32,
    ("core", "sensor_parallelization"): "mp",
    ("core", "debug"): False,
    ("core", "reset_retries"): 0,
    ("physics", "max_pybullet_freq"): 240,
    ("ray", "num_gpus"): 0,
    ("ray", "num_cpus"): None,
    ("ray", "log_to_driver"): False,
    ("sumo", "central_port"): 8619,
    ("sumo", "central_host"): "localhost",
    ("sumo", "traci_serve_mode"): "local",  # local|central
    ("traffic", "traci_retries"): 5,
    ("visdom", "enabled"): False,
    ("visdom", "hostname"): "http://localhost",
    ("visdom", "port"): 8097,
}


class Config:
    """A configuration utility that handles configuration from file and environment variable.

    Args:
        config_file (Union[str, pathlib.Path]): The path to the configuration file.
        environment_prefix (str, optional): The prefix given to the environment variables.
            It is upper-cased before use. Defaults to "".

    Raises:
        FileNotFoundError: If the configuration file cannot be found at the given file location.
    """

    def __init__(
        self, config_file: Union[str, pathlib.Path], environment_prefix: str = ""
    ) -> None:
        self._config = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation()
        )
        self._environment_prefix = environment_prefix.upper()
        # Environment variables are named `<PREFIX>_<SECTION>_<OPTION>`, or
        # `<SECTION>_<OPTION>` when no prefix is configured.
        self._environment_variable_format_string = (
            self._environment_prefix + "_{}_{}"
            if self._environment_prefix
            else "{}_{}"
        )
        # Non-greedy so that each `${...}` placeholder is captured separately;
        # a greedy `.+` would swallow everything between the first `${` and
        # the last `}` when a string holds more than one placeholder.
        self.env_variable_substitution_pattern = re.compile(r"\$\{(.+?)\}")

        if isinstance(config_file, str):
            config_file = pathlib.Path(config_file)
        config_file = config_file.resolve()
        if not config_file.is_file():
            raise FileNotFoundError(f"Configuration file not found at {config_file}")

        self._config.read(str(config_file.absolute()))
        logger.info(msg=f"Using engine configuration from: {config_file.absolute()}")

    @property
    def environment_prefix(self):
        """The prefix that environment variables configuration is provided with."""
        return self._environment_prefix

    # NOTE: `lru_cache` memoizes on all arguments (including `self`), so every
    # argument must be hashable and a repeated call with the same arguments
    # returns the first result even if the environment has changed since.
    @functools.lru_cache(maxsize=100)
    def get_setting(
        self,
        section: str,
        option: str,
        default: Any = _UNSET,
        cast: Callable[[Any], Any] = _passthrough_cast,
    ) -> Optional[Any]:
        """Finds the given configuration checking the following in order: environment variable,
        configuration file, the given default, and finally the built-in defaults.

        Args:
            section (str): The grouping that the configuration option is under.
            option (str): The specific configuration option.
            default (Any, optional): The value returned as-is (not cast) if the requested
                configuration option is not found. Defaults to _UNSET.
            cast (Callable, optional): A function applied to the raw value read from the
                environment or the configuration file. Passing `bool` selects a
                string-aware boolean conversion. Defaults to returning the value unchanged.

        Returns:
            Optional[Any]: The value of the configuration.

        Raises:
            EnvironmentError: If the option is found neither in the environment nor in the
                configuration file, no default is provided, and there is no built-in default.
        """
        env_variable = self._environment_variable_format_string.format(
            section.upper(), option.upper()
        )
        setting = os.getenv(env_variable)
        if cast is bool:
            # This is necessary because bool("false") == True.
            cast = _convert_truthy
        if setting is not None:
            return cast(setting)

        try:
            value = self._config[section][option]
        except (configparser.NoSectionError, KeyError) as exc:
            if default is not _UNSET:
                return default
            # Identity comparison against the sentinel: `!=` would defer to
            # the stored value's own comparison operators.
            builtin_default = _config_defaults.get((section, option), _UNSET)
            if builtin_default is not _UNSET:
                return builtin_default
            raise EnvironmentError(
                f"Setting `${env_variable}` cannot be found in environment or configuration."
            ) from exc
        return cast(value)

    def substitute_settings(self, input: str, source: Optional[str] = "") -> str:
        """Given a string, substitutes in configuration settings if they exist.

        Every `${NAME}` placeholder is resolved by splitting `NAME` (after removing the
        environment prefix, when one is configured) at its first underscore into a
        section and an option, both lower-cased, and looking that setting up.

        Args:
            input (str): The text that may contain `${NAME}` placeholders.
            source (Optional[str], optional): Where the text came from; only used in the
                warning message. Defaults to "".

        Returns:
            str: The text with each resolvable placeholder replaced by its setting.

        Raises:
            EnvironmentError: If a placeholder names a setting that cannot be found.
        """
        m: List[str] = self.env_variable_substitution_pattern.findall(input)

        if not m:
            return input

        output = input
        # De-duplicate: `str.replace` already rewrites every occurrence.
        for val in set(m):
            if self.environment_prefix:
                environment_prefix, _, setting = val.partition("_")
                if environment_prefix != self.environment_prefix:
                    warnings.warn(
                        f"Unable to substitute environment variable `{val}` from `{source}`"
                    )
                    continue
            else:
                setting = val

            section, _, option_name = setting.lower().partition("_")
            env_value = self(section, option_name)
            # Built-in defaults are returned uncast (e.g. `False`, `0`, `None`),
            # and `str.replace` only accepts strings.
            output = output.replace(f"${{{val}}}", str(env_value))

        return output

    def __call__(
        self,
        section: str,
        option: str,
        /,
        default: Any = _UNSET,
        cast: Callable[[str], Any] = str,
    ) -> Optional[Any]:
        """Shorthand for `get_setting` whose `cast` defaults to `str`."""
        return self.get_setting(section, option, default, cast)

    def __repr__(self) -> str:
        sections = {k: dict(v.items()) for k, v in self._config.items(raw=True)}
        return f"Config(config_file={sections}, environment_prefix={self._environment_prefix})"