Source code for smarts.core.default_map_builder

# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import os
import sys
import warnings
from enum import IntEnum
from typing import TYPE_CHECKING, NamedTuple, Optional, Tuple

from smarts.core.utils.file import file_md5_hash, path2hash

if TYPE_CHECKING:
    from smarts.core.road_map import RoadMap
    from smarts.sstudio.sstypes import MapSpec


# Module-level cache slot: holds the `_RoadMapInfo` stored by `_cache_result`,
# or None when nothing is cached (initially, and after `_clear_cache`).
_existing_map = None


class _RoadMapInfo(NamedTuple):
    """Record stored in the module-level cache for one built road map."""

    # The map spec that was supplied when the road map was cached.
    map_spec: MapSpec  # pytype: disable=invalid-annotation
    # The cached road map object itself.
    obj: RoadMap
    # The hash that was stored together with the road map.
    map_hash: str


def _cache_result(map_spec, road_map: RoadMap, road_map_hash: str):
    """Store a road map, its spec and its hash in the module-level cache.

    Any previously cached entry is overwritten; only one entry is kept.

    Args:
        map_spec: The map spec the road map was built from.
        road_map: The road map object to cache.
        road_map_hash: The hash to store alongside the road map.
    """
    global _existing_map
    entry = _RoadMapInfo(
        map_spec=map_spec,
        obj=road_map,
        map_hash=road_map_hash,
    )
    _existing_map = entry


def _clear_cache():
    """Empty the module-level road map cache and run a garbage collection.

    Does nothing when the cache is already empty.
    """
    global _existing_map
    if not _existing_map:
        return

    import gc

    # Try to keep only one map alive at a time: drop the cached reference,
    # rebind the global so the name still exists, then collect.
    del _existing_map
    _existing_map = None
    gc.collect()


class MapType(IntEnum):
    """The format of a map."""

    Unknown = 0
    Sumo = 1
    Opendrive = 2
    Waymo = 3
    Argoverse = 4


def find_mapfile_in_dir(map_dir: str) -> Tuple[MapType, str]:
    """Look in a given directory for a supported map file.

    A file whose name is one of the well-known map file names is returned as
    soon as it is encountered. Otherwise the entries are classified by name
    pattern (``.net.xml``, ``.xodr``, ``.tfrecord``, ``log_map_archive``) and
    the last matching entry wins. ``os.listdir`` yields entries in arbitrary
    order, so with several pattern matches the choice is not deterministic.

    Args:
        map_dir: Path of the directory to search.

    Returns:
        A ``(map_type, map_path)`` pair. When nothing matches, the result is
        ``(MapType.Unknown, map_dir)``.
    """
    # Exact file names that take priority over the pattern-based fallbacks.
    map_filename_type = {
        "map.net.xml": MapType.Sumo,
        "shifted_map-AUTOGEN.net.xml": MapType.Sumo,
        "map.xodr": MapType.Opendrive,
    }
    map_type = MapType.Unknown
    map_path = map_dir
    for f in os.listdir(map_dir):
        cand_map_type = map_filename_type.get(f)
        if cand_map_type is not None:
            return cand_map_type, os.path.join(map_dir, f)

        if f.endswith(".net.xml"):
            fallback_type = MapType.Sumo
        elif f.endswith(".xodr"):
            fallback_type = MapType.Opendrive
        elif ".tfrecord" in f:
            fallback_type = MapType.Waymo
        elif "log_map_archive" in f:
            fallback_type = MapType.Argoverse
        else:
            continue

        # Always update type and path together so the returned path belongs
        # to the returned type (the `.xodr` case used to leave the path stale).
        map_type = fallback_type
        map_path = os.path.join(map_dir, f)
    return map_type, map_path
# This function should be re-callable (although caching is up to the implementation).
# The idea here is that anything in SMARTS that needs to use a RoadMap
# can call this builder to get or create one of the default type.
#
# Downstream developers who want to extend SMARTS to support other
# map formats (by extending the RoadMap base class) can create their
# own version of this to reference from a MapSpec within their
# scenario folder(s) and shouldn't have to change much else.
def get_road_map(map_spec) -> Tuple[Optional[RoadMap], Optional[str]]:
    """Build the road map for a map spec, or reuse the cached one.

    The map type is resolved from ``map_spec.source``: a directory is searched
    with ``find_mapfile_in_dir``; any other source is classified by its name.
    The matching road map class is imported lazily, so optional dependencies
    are only needed for the map formats actually used.

    Args:
        map_spec: The map specification; it and its ``source`` must be truthy.

    Returns:
        A ``(road_map, road_map_hash)`` pair. A change in the hash should
        signify that the map is different enough that map-related caches
        should be reloaded. The pair is ``(None, None)`` when the map type
        cannot be resolved (a ``UserWarning`` is issued), when the Argoverse
        dependencies are missing, or when the map class's ``from_spec``
        returns ``None``.

    Raises:
        AssertionError: If ``map_spec`` or ``map_spec.source`` is falsy.
        OpenDriveException: If an OpenDRIVE map is requested but the
            OpenDRIVE road network module cannot be imported.
    """
    assert map_spec, "A road map spec must be specified"
    assert map_spec.source, "A road map source must be specified"

    if os.path.isdir(map_spec.source):
        # Only the type is needed here; the map class loads from the spec itself.
        map_type, _ = find_mapfile_in_dir(map_spec.source)
    else:
        map_type = MapType.Unknown
        map_source = map_spec.source
        if map_source.endswith(".net.xml"):
            map_type = MapType.Sumo
        elif map_source.endswith(".xodr"):
            map_type = MapType.Opendrive
        elif ".tfrecord" in map_source:
            map_type = MapType.Waymo

    if map_type == MapType.Sumo:
        from smarts.core.sumo_road_network import SumoRoadNetwork

        map_class = SumoRoadNetwork
    elif map_type == MapType.Opendrive:
        from smarts.core.utils.custom_exceptions import OpenDriveException

        try:
            from smarts.core.opendrive_road_network import OpenDriveRoadNetwork
        except ImportError as err:
            # Chain explicitly so the underlying import failure stays visible.
            raise OpenDriveException.required_to("use OpenDRIVE maps") from err

        map_class = OpenDriveRoadNetwork
    elif map_type == MapType.Waymo:
        from smarts.core.waymo_map import WaymoMap

        map_class = WaymoMap
    elif map_type == MapType.Argoverse:
        try:
            from smarts.core.argoverse_map import (
                ArgoverseMap,  # pytype: disable=import-error
            )
        except (ImportError, ModuleNotFoundError):
            # Best effort: report the problem and return no map instead of raising.
            print(sys.exc_info())
            print(
                "Missing dependencies for Argoverse. Install them using the command `pip install -e .[argoverse]` at the source directory."
            )
            return None, None

        map_class = ArgoverseMap
    else:
        warnings.warn(
            f"A map source for road surface generation can not be resolved from the given reference: `{map_spec.source}`.",
            category=UserWarning,
        )
        return None, None

    if _existing_map:
        # Reuse the cached map only if it has the right class and reports
        # that it represents the same map as the requested spec.
        if isinstance(_existing_map.obj, map_class) and _existing_map.obj.is_same_map(
            map_spec
        ):
            return _existing_map.obj, _existing_map.map_hash
        # Otherwise release the old map before building the new one.
        _clear_cache()

    road_map = map_class.from_spec(map_spec)
    if road_map is None:
        return None, None

    # Hash the file contents when the source is a file; otherwise hash the path.
    if os.path.isfile(road_map.source):
        road_map_hash = file_md5_hash(road_map.source)
    else:
        road_map_hash = path2hash(road_map.source)

    _cache_result(map_spec, road_map, road_map_hash)

    return road_map, road_map_hash