# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import importlib.resources as pkg_resources
import os
import re
import tempfile
from dataclasses import dataclass
from functools import lru_cache
from importlib.util import find_spec
from pathlib import Path
from typing import Any, Dict, Optional, Union
import yaml
import smarts.assets.vehicles
from smarts.core import config
def load_vehicle_definitions_list(vehicle_list_filepath: Optional[str]):
    """Build a ``VehicleDefinitions`` from a vehicle definitions list file.

    Args:
        vehicle_list_filepath: Path to the list file. When it is ``None`` or
            does not exist on disk, the ``default_vehicle_definitions_list``
            entry of the ``assets`` section of the SMARTS config is used.

    Returns:
        A ``VehicleDefinitions`` holding the loaded contents and the absolute
        path of the file they were read from.
    """
    needs_default = vehicle_list_filepath is None or not os.path.exists(
        vehicle_list_filepath
    )
    if needs_default:
        vehicle_list_filepath = config()("assets", "default_vehicle_definitions_list")

    absolute_path = Path(vehicle_list_filepath).absolute()
    return VehicleDefinitions(
        data=load_yaml_config_with_substitution(absolute_path),
        filepath=absolute_path,
    )
def load_yaml_config(path: Path) -> Optional[Dict[str, Any]]:
    """Parse a YAML file into a dictionary.

    Args:
        path: Location of the YAML file; must end in ``.yaml`` or ``.yml``
            when the file exists.

    Returns:
        The parsed contents, or ``None`` when ``path`` does not exist.

    Raises:
        AssertionError: If the file exists but does not have a YAML suffix.
    """
    if not path.exists():
        return None

    assert path.suffix in (".yaml", ".yml"), f"`{str(path)}` is not a YAML file."
    with open(path, "r", encoding="utf-8") as yaml_file:
        return yaml.safe_load(yaml_file)
def _replace_with_module_path(base: str, module_str: str):
    """Replace every ``${{module_str}}`` placeholder in ``base`` with the module's location.

    Args:
        base: The configuration text containing the placeholder(s).
        module_str: The importable module name found inside ``${{...}}``.

    Returns:
        ``base`` with the placeholder replaced by the module's origin path.
        For a package (origin ending in ``__init__.py``) the file name is
        stripped, leaving the package directory including its trailing
        separator.

    Raises:
        RuntimeError: If no import spec can be found for the module, or the
            spec has no file origin (e.g. a namespace package) and therefore
            cannot be turned into a path.
    """
    placeholder = f"${{{{{module_str}}}}}"
    spec = find_spec(module_str)
    if not spec:
        raise RuntimeError(
            f"Spec cannot be found for `{placeholder}` in config:\n{base}"
        )
    origin = spec.origin
    if origin is None:
        # Namespace packages have no single origin file; fail with a clear
        # message instead of an AttributeError on `None.endswith`.
        raise RuntimeError(
            f"Module for `{placeholder}` has no file origin in config:\n{base}"
        )
    if origin.endswith("__init__.py"):
        origin = origin[: -len("__init__.py")]
    return base.replace(placeholder, origin)
def load_yaml_config_with_substitution(
    path: Union[str, Path]
) -> Optional[Dict[str, Any]]:
    """Read a YAML configuration into a dictionary after expanding placeholders.

    Each ``${{module}}`` placeholder is replaced with that module's file
    path, and the text is then passed through the SMARTS config's
    ``substitute_settings`` (which, per this module's documentation, resolves
    ``${}`` placeholders from the SMARTS environment settings).

    Args:
        path: Location of the YAML file, as a string or ``Path``; must end in
            ``.yaml`` or ``.yml`` when the file exists.

    Returns:
        The parsed configuration, or ``None`` when ``path`` does not exist.

    Raises:
        AssertionError: If the file exists but does not have a YAML suffix.
        RuntimeError: If a ``${{module}}`` placeholder names a module that
            cannot be located.
    """
    smarts_config = config()
    if isinstance(path, str):
        path = Path(path)
    if not path.exists():
        return None

    assert path.suffix in (".yaml", ".yml"), f"`{str(path)}` is not a YAML file."
    with open(path, "r", encoding="utf-8") as source:
        conf = source.read()

    # Non-greedy so that two placeholders on one line are matched separately
    # instead of being merged into a single bogus module name.
    module_names = re.findall(r"\$\{\{(.+?)\}\}", conf)
    # A single replacement call handles every occurrence, so visit each name once.
    for module_name in dict.fromkeys(module_names):
        conf = _replace_with_module_path(conf, module_name)
    conf = smarts_config.substitute_settings(conf, str(path))

    # Parse the substituted text directly; no temporary file is needed, which
    # also makes this work in read-only directories and on Windows.
    return yaml.safe_load(conf)
@dataclass(frozen=True)
class VehicleDefinitions:
    """A loaded vehicle definitions list plus helpers to load the files it points at."""

    data: Dict[str, Any]
    """The contents of the vehicle definitions list. This is generally keyed by vehicle type."""
    filepath: Union[str, Path]
    """The path to the vehicle definitions file (kept as a string after initialisation)."""

    def __post_init__(self):
        # The dataclass is frozen, so normalising a `Path` to `str` has to go
        # through `object.__setattr__` rather than plain assignment.
        if not isinstance(self.filepath, Path):
            return
        object.__setattr__(self, "filepath", str(self.filepath))

    @lru_cache(maxsize=20)
    def load_vehicle_definition(self, vehicle_class: str):
        """Load the definition file registered for the given vehicle class.

        Raises:
            OSError: If ``vehicle_class`` has no (non-empty) entry in ``data``.
        """
        definition_path = self.data.get(vehicle_class)
        if not definition_path:
            raise OSError(
                f"Vehicle '{vehicle_class}' is not defined in {list(self.data.keys())}"
            )
        return load_yaml_config_with_substitution(Path(definition_path))

    @lru_cache(maxsize=20)
    def controller_params_for_vehicle_class(self, vehicle_class: str):
        """Load the file named by ``controller_params`` in the vehicle's definition."""
        definition = self.load_vehicle_definition(vehicle_class)
        return load_yaml_config_with_substitution(
            Path(definition["controller_params"])
        )

    @lru_cache(maxsize=20)
    def chassis_params_for_vehicle_class(self, vehicle_class: str):
        """Load the file named by ``chassis_params`` in the vehicle's definition."""
        definition = self.load_vehicle_definition(vehicle_class)
        return load_yaml_config_with_substitution(Path(definition["chassis_params"]))

    def __hash__(self) -> int:
        # Hash on the file path only; the cached methods above use the
        # instance as part of their cache key.
        return hash(self.filepath)