# Source code for smarts.sstudio.genscenario

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""This script provides a Python interface to generate scenario artifacts. This includes
route files (sumo ``*.rou.xml``), missions, and bubbles.
"""

import collections
import inspect
import itertools
import logging
import os
import pickle
import sqlite3
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import cloudpickle
import yaml

import smarts.core
from smarts.core.default_map_builder import find_mapfile_in_dir
from smarts.core.utils.core_logging import timeit
from smarts.core.utils.file import file_md5_hash, path2hash, pickle_hash

from . import sstypes
from .generators import TrafficGenerator

# Module-level logging configuration.
# NOTE(review): `getLogger(__file__)` keys the logger by file path rather than
# the conventional `__name__`; left unchanged because downstream log filtering
# may depend on the existing logger name — confirm before switching.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__file__)
logger.setLevel(logging.WARNING)


@dataclass(frozen=True)
class ActorAndMission:
    """Holds an Actor object and its associated Mission."""

    actor: sstypes.Actor
    """Specification for traffic actor.
    """
    mission: Union[sstypes.Mission, sstypes.EndlessMission, sstypes.LapMission]
    """Mission for traffic actor.
    """
def _check_if_called_externally():
    """Emit a deprecation warning when a gen_* helper is invoked from outside
    this module; scenarios are expected to go through :func:`gen_scenario`."""
    caller_frame = inspect.stack()[2]
    caller_module = inspect.getmodule(caller_frame[0])
    called_internally = (
        caller_module is not None
        and caller_module.__name__ == "smarts.sstudio.genscenario"
    )
    if not called_internally:
        logger.warning(
            "",
            exc_info=DeprecationWarning(
                "Calling gen_* methods directly is now deprecated. All scenarios must call gen_scenario()."
            ),
        )


def _build_graph(scenario: sstypes.Scenario, base_dir: str) -> Dict[str, Any]:
    """Map each declared scenario component to the build artifact path(s) it
    produces under ``base_dir``.

    Returns a ``defaultdict(list)`` so callers can safely index components
    that were not declared (they resolve to an empty list).
    """
    join = os.path.join
    graph = collections.defaultdict(list)
    graph["map"] = [join(base_dir, "map", "map.glb")]
    if scenario.map_spec:
        graph["map_spec"] = [join(base_dir, "map", "map_spec.pkl")]
    if scenario.traffic:
        for traffic_name, traffic in scenario.traffic.items():
            # SMARTS-native traffic gets a distinct extension from SUMO routes.
            suffix = "smarts" if traffic.engine == "SMARTS" else "rou"
            graph["traffic"].append(
                join(base_dir, "traffic", f"{traffic_name}.{suffix}.xml")
            )
    if scenario.ego_missions:
        graph["ego_missions"] = [join(base_dir, "missions.pkl")]
    if scenario.social_agent_missions:
        graph["social_agent_missions"] = [
            join(base_dir, "social_agents", f"{group_name}.pkl")
            for group_name in scenario.social_agent_missions
        ]
    if scenario.bubbles:
        graph["bubbles"] = [join(base_dir, "bubbles.pkl")]
    if scenario.friction_maps:
        graph["friction_maps"] = [join(base_dir, "friction_map.pkl")]
    if scenario.traffic_histories:
        for dataset in scenario.traffic_histories:
            assert isinstance(dataset, sstypes.TrafficHistoryDataset)
            graph["traffic_histories"].append(join(base_dir, f"{dataset.name}.shf"))
    if scenario.scenario_metadata is not None:
        graph["scenario_metadata"] = [join(base_dir, "scenario_metadata.yaml")]
    return graph


def _needs_build(
    db_conn: sqlite3.Connection,
    scenario_obj: Any,
    artifact_paths: List[str],
    obj_hash: str,
    map_needs_build: Optional[bool] = None,
) -> bool:
    """Decide whether the artifacts for ``scenario_obj`` must be (re)generated.

    A rebuild is needed when an artifact file is missing, unknown to the build
    cache, or its recorded hash no longer matches ``obj_hash``.
    """
    if scenario_obj is None:
        # Nothing declared in the DSL, so there is nothing to build.
        return False
    if map_needs_build:
        # Override for artifacts that depend on the map: follow a map rebuild.
        return True
    if any(not os.path.exists(path) for path in artifact_paths):
        # At least one expected output file is missing.
        return True

    query = """SELECT scenario_obj_hash FROM Artifacts WHERE artifact_path=?;"""
    for path in artifact_paths:
        cursor = db_conn.cursor()
        cursor.execute(query, (path,))
        record = cursor.fetchone()
        cursor.close()
        if record is None or record[0] != obj_hash:
            # Artifact not in the cache, or its hash is stale.
            return True
    return False


def _update_artifacts(
    db_conn: sqlite3.Connection, artifact_paths: List[str], hash_val: str
):
    """Record (or refresh) the cached hash for each artifact path."""
    query = """REPLACE INTO Artifacts (artifact_path, scenario_obj_hash) VALUES(?, ?);"""
    cursor = db_conn.cursor()
    for path in artifact_paths:
        cursor.execute(query, (path, hash_val))
    db_conn.commit()
    cursor.close()
def gen_scenario(
    scenario: sstypes.Scenario,
    output_dir: Union[str, Path],
    seed: int = 42,
):
    """This is now the preferred way to generate a scenario. Instead of calling the
    gen_* methods directly, we provide this higher-level abstraction that takes care
    of the sub-calls.

    Args:
        scenario: The scenario description to build artifacts for.
        output_dir: The scenario directory; build output goes under ``<output_dir>/build``.
        seed: Base random seed; per-traffic-spec seeds are derived from it.
    """
    # XXX: For now this simply coalesces the sub-calls but in the future this allows
    # us to simplify our serialization between SStudio and SMARTS.

    smarts.core.seed(seed)
    scenario_dir = os.path.abspath(str(output_dir))
    build_dir = os.path.join(scenario_dir, "build")
    build_graph = _build_graph(scenario, build_dir)
    os.makedirs(build_dir, exist_ok=True)

    # Create DB for build caching
    db_path = os.path.join(build_dir, "build.db")
    db_conn = sqlite3.connect(db_path)
    try:
        cur = db_conn.cursor()
        cur.execute(
            """CREATE TABLE IF NOT EXISTS Artifacts (
                artifact_path TEXT PRIMARY KEY,
                scenario_obj_hash TEXT
            ) WITHOUT ROWID"""
        )
        db_conn.commit()
        cur.close()

        # Map spec
        artifact_paths = build_graph["map_spec"]
        obj_hash = pickle_hash(scenario.map_spec, True)
        if _needs_build(db_conn, scenario.map_spec, artifact_paths, obj_hash):
            with timeit("map_spec", logger.info):
                gen_map_spec_artifact(scenario_dir, scenario.map_spec)
                _update_artifacts(db_conn, artifact_paths, obj_hash)
            map_spec = scenario.map_spec
        else:
            if scenario.map_spec is not None:
                map_spec = scenario.map_spec
            else:
                map_spec = sstypes.MapSpec(source=scenario_dir)

        if map_spec.shift_to_origin and scenario.traffic_histories:
            # Fixed: the two literals previously concatenated without a space
            # ("...histories.The data...").
            logger.warning(
                "Attempting to shift map with traffic histories. "
                "The data may no longer line up with the map."
            )

        # Track changes to map file itself, if we can find it
        if os.path.isdir(map_spec.source):
            _, map_source = find_mapfile_in_dir(map_spec.source)
        else:
            map_source = map_spec.source
        if os.path.isfile(map_source):
            map_hash = file_md5_hash(map_source)
        else:
            map_hash = path2hash(map_source)

        # Map glb file
        artifact_paths = build_graph["map"]
        map_needs_rebuild = _needs_build(db_conn, map_spec, artifact_paths, map_hash)
        if map_needs_rebuild:
            with timeit("map_glb", logger.info):
                road_map, _ = map_spec.builder_fn(map_spec)
                if not road_map:
                    logger.warning(
                        "No reference to a RoadNetwork file was found in {}, or one could not be created. "
                        "Please make sure the path passed is a valid Scenario with RoadNetwork file required "
                        "(or a way to create one) for scenario building.".format(
                            scenario_dir
                        )
                    )
                    return

                map_dir = os.path.join(build_dir, "map")
                os.makedirs(map_dir, exist_ok=True)
                road_map.to_glb(map_dir)
            _update_artifacts(db_conn, artifact_paths, map_hash)

        # Traffic
        artifact_paths = build_graph["traffic"]
        obj_hash = pickle_hash(scenario.traffic, True)
        if _needs_build(
            db_conn, scenario.traffic, artifact_paths, obj_hash, map_needs_rebuild
        ):
            with timeit("traffic", logger.info):
                for iteration, (name, traffic) in enumerate(scenario.traffic.items()):
                    # Each traffic spec gets its own deterministic seed.
                    derived_seed = seed + iteration
                    gen_traffic(
                        scenario=scenario_dir,
                        traffic=traffic,
                        name=name,
                        seed=derived_seed,
                        map_spec=map_spec,
                    )
            _update_artifacts(db_conn, artifact_paths, obj_hash)

        # Ego missions
        artifact_paths = build_graph["ego_missions"]
        obj_hash = pickle_hash(scenario.ego_missions, True)
        if _needs_build(
            db_conn, scenario.ego_missions, artifact_paths, obj_hash, map_needs_rebuild
        ):
            with timeit("ego_missions", logger.info):
                missions = []
                for mission in scenario.ego_missions:
                    if isinstance(mission, sstypes.GroupedLapMission):
                        # Grouped lap missions expand into several lap missions.
                        gen_group_laps(
                            scenario=output_dir,
                            begin=mission.route.begin,
                            end=mission.route.end,
                            grid_offset=mission.offset,
                            used_lanes=mission.lanes,
                            vehicle_count=mission.actor_count,
                            entry_tactic=mission.entry_tactic,
                            num_laps=mission.num_laps,
                            map_spec=map_spec,
                        )
                    else:
                        missions.append(mission)

                if missions:
                    gen_agent_missions(
                        scenario=output_dir,
                        missions=missions,
                        map_spec=map_spec,
                    )
            _update_artifacts(db_conn, artifact_paths, obj_hash)

        # Social agent missions
        artifact_paths = build_graph["social_agent_missions"]
        obj_hash = pickle_hash(scenario.social_agent_missions, True)
        if _needs_build(
            db_conn,
            scenario.social_agent_missions,
            artifact_paths,
            obj_hash,
            map_needs_rebuild,
        ):
            with timeit("social_agent_missions", logger.info):
                for name, (actors, missions) in scenario.social_agent_missions.items():
                    if not (
                        isinstance(actors, collections.abc.Sequence)
                        and isinstance(missions, collections.abc.Sequence)
                    ):
                        raise ValueError("Actors and missions must be sequences")

                    gen_social_agent_missions(
                        name=name,
                        scenario=output_dir,
                        social_agent_actor=actors,
                        missions=missions,
                        map_spec=map_spec,
                    )
            _update_artifacts(db_conn, artifact_paths, obj_hash)

        # Bubbles
        artifact_paths = build_graph["bubbles"]
        obj_hash = pickle_hash(scenario.bubbles, True)
        if _needs_build(db_conn, scenario.bubbles, artifact_paths, obj_hash):
            with timeit("bubbles", logger.info):
                gen_bubbles(scenario=output_dir, bubbles=scenario.bubbles)
            _update_artifacts(db_conn, artifact_paths, obj_hash)

        # Friction maps
        artifact_paths = build_graph["friction_maps"]
        obj_hash = pickle_hash(scenario.friction_maps, True)
        if _needs_build(db_conn, scenario.friction_maps, artifact_paths, obj_hash):
            with timeit("friction_maps", logger.info):
                gen_friction_map(
                    scenario=output_dir, surface_patches=scenario.friction_maps
                )
            _update_artifacts(db_conn, artifact_paths, obj_hash)

        # Traffic histories
        artifact_paths = build_graph["traffic_histories"]
        obj_hash = pickle_hash(scenario.traffic_histories, True)
        if _needs_build(
            db_conn,
            scenario.traffic_histories,
            artifact_paths,
            obj_hash,
            map_needs_rebuild,
        ):
            with timeit("traffic_histories", logger.info):
                gen_traffic_histories(
                    scenario=output_dir,
                    histories_datasets=scenario.traffic_histories,
                    map_spec=map_spec,
                )
            _update_artifacts(db_conn, artifact_paths, obj_hash)

        # Scenario metadata
        artifact_paths = build_graph["scenario_metadata"]
        obj_hash = pickle_hash(scenario.scenario_metadata, True)
        if _needs_build(
            db_conn,
            scenario.scenario_metadata,
            artifact_paths,
            obj_hash,
            map_needs_rebuild,
        ):
            with timeit("scenario_metadata", logger.info):
                gen_metadata(
                    scenario=output_dir,
                    scenario_metadata=scenario.scenario_metadata,
                )
            _update_artifacts(db_conn, artifact_paths, obj_hash)
    finally:
        # Fixed: the build-cache connection was previously never closed,
        # leaking a file handle on every build (including the early return).
        db_conn.close()
def gen_map_spec_artifact(
    scenario: str, map_spec: sstypes.MapSpec, output_dir: Optional[str] = None
):
    """Saves a map spec to file."""
    _check_if_called_externally()
    base_dir = output_dir or os.path.join(scenario, "build")
    target_dir = os.path.join(base_dir, "map")
    os.makedirs(target_dir, exist_ok=True)
    artifact_path = os.path.join(target_dir, "map_spec.pkl")
    # cloudpickle rather than pickle: the map_spec object may contain a
    # reference to a map_builder callable, which plain pickle cannot handle.
    with open(artifact_path, "wb") as f:
        cloudpickle.dump(map_spec, f)
def gen_traffic(
    scenario: str,
    traffic: sstypes.Traffic,
    name: str,
    output_dir: Optional[str] = None,
    seed: int = 42,
    map_spec: Optional[sstypes.MapSpec] = None,
):
    """Generates the traffic routes for the given scenario. If the output directory is
    not provided, the scenario directory is used."""
    _check_if_called_externally()
    # "missions" would collide with the agent-mission artifact name.
    assert name != "missions", "The name 'missions' is reserved for missions!"

    base_dir = output_dir or os.path.join(scenario, "build")
    target_dir = os.path.join(base_dir, "traffic")
    os.makedirs(target_dir, exist_ok=True)

    generator = TrafficGenerator(scenario, map_spec, overwrite=True)
    saved_path = generator.plan_and_save(traffic, name, target_dir, seed=seed)

    if saved_path:
        logger.debug(f"Generated traffic for scenario={scenario}")
def gen_social_agent_missions(
    scenario: str,
    missions: Sequence[sstypes.Mission],
    social_agent_actor: Union[
        sstypes.SocialAgentActor, Sequence[sstypes.SocialAgentActor]
    ],
    name: str,
    map_spec: Optional[sstypes.MapSpec] = None,
):
    """Generates the social agent missions for the given scenario.

    Args:
        scenario: The scenario directory
        missions: A sequence of missions for social agents to perform
        social_agent_actor: The social agent actor(s) this scenario will use.
        name: A short name for this grouping of social agents. Is also used as
            the name of the social agent traffic file
        map_spec: An optional map specification that takes precedence over
            scenario directory information.
    """
    _check_if_called_externally()

    # Backwards compatibility: accept a single actor and normalize to a list.
    if isinstance(social_agent_actor, collections.abc.Sequence):
        actors = social_agent_actor
    else:
        actors = [social_agent_actor]

    # BoidAgentActor is explicitly unsupported here; fail loudly.
    if any(isinstance(actor, sstypes.BoidAgentActor) for actor in actors):
        raise ValueError(
            f"gen_social_agent_missions(...) can't be called with BoidAgentActor, got:{actors}"
        )

    actor_names = [actor.name for actor in actors]
    if len(set(actor_names)) != len(actor_names):
        raise ValueError(f"Actor names={actor_names} must not contain duplicates")

    output_dir = os.path.join(scenario, "build", "social_agents")
    saved = gen_missions(
        scenario=scenario,
        missions=missions,
        actors=actors,
        name=name,
        output_dir=output_dir,
        map_spec=map_spec,
    )
    if saved:
        logger.debug(f"Generated social agent missions for scenario={scenario}")
def gen_agent_missions(
    scenario: str,
    missions: Sequence,
    map_spec: Optional[sstypes.MapSpec] = None,
):
    """Generates a route file to represent missions (a route per mission). Will
    create the output_dir if it doesn't exist already. The output file will be
    named `missions`.

    Args:
        scenario: The scenario directory
        missions: A sequence of missions for social agents to perform
        map_spec: An optional map specification that takes precedence over
            scenario directory information.

    Returns:
        Whether the missions were saved (the result of ``gen_missions``).
    """
    _check_if_called_externally()
    output_dir = os.path.join(scenario, "build")
    saved = gen_missions(
        scenario=scenario,
        missions=missions,
        actors=[sstypes.TrafficActor(name="car")],
        name="missions",
        output_dir=output_dir,
        map_spec=map_spec,
    )
    if saved:
        logger.debug(f"Generated missions for scenario={scenario}")
    # Fixed: previously returned None implicitly, so callers such as
    # gen_group_laps that check this function's result (`if saved:`) could
    # never observe success. Returning the gen_missions result is
    # backward-compatible (None -> truthy/falsy value).
    return saved
def gen_group_laps(
    scenario: str,
    begin: Tuple[str, int, Any],
    end: Tuple[str, int, Any],
    grid_offset: int,
    used_lanes: int,
    vehicle_count: int,
    entry_tactic: Optional[sstypes.EntryTactic],
    num_laps: int = 3,
    map_spec: Optional[sstypes.MapSpec] = None,
):
    """Generates missions that start with a grid offset at the start-line and do a
    number of laps until finishing.

    Args:
        scenario: The scenario directory
        begin: The edge and offset of the first vehicle
        end: The edge and offset of the finish-line
        grid_offset: The F1 starting line staggered with offset disadvantage
            imposed per vehicle
        used_lanes: The number of lanes used for the starting-line from the
            innermost lane
        vehicle_count: The number of vehicles to use
        entry_tactic: The entry tactic applied to each generated lap mission
        num_laps: The amount of laps before finishing
        map_spec: An optional map specification that takes precedence over
            scenario directory information.
    """
    _check_if_called_externally()
    start_road_id, start_lane, start_offset = begin
    end_road_id, end_lane, end_offset = end

    # Each vehicle starts one lane over (wrapping at used_lanes) and is pushed
    # back by grid_offset per grid position, F1-style.
    missions = [
        sstypes.LapMission(
            sstypes.Route(
                begin=(
                    start_road_id,
                    (start_lane + position) % used_lanes,
                    start_offset - grid_offset * position,
                ),
                end=(
                    end_road_id,
                    (end_lane + position) % used_lanes,
                    end_offset,
                ),
            ),
            num_laps=num_laps,
            entry_tactic=entry_tactic,
        )
        for position in range(vehicle_count)
    ]

    saved = gen_agent_missions(
        scenario=scenario,
        missions=missions,
        map_spec=map_spec,
    )
    if saved:
        logger.debug(f"Generated grouped lap missions for scenario={scenario}")
def gen_bubbles(scenario: str, bubbles: Sequence[sstypes.Bubble]):
    """Generates 'bubbles' in the scenario that capture vehicles for actors.

    Args:
        scenario: The scenario directory
        bubbles: The bubbles to add to the scenario.
    """
    _check_if_called_externally()
    bubbles_path = os.path.join(scenario, "build", "bubbles.pkl")
    with open(bubbles_path, "wb") as bubble_file:
        pickle.dump(bubbles, bubble_file)
def gen_friction_map(
    scenario: str, surface_patches: Sequence[sstypes.RoadSurfacePatch]
):
    """Generates friction map file according to the surface patches defined in
    scenario file.
    """
    _check_if_called_externally()
    friction_path = os.path.join(scenario, "build", "friction_map.pkl")
    with open(friction_path, "wb") as friction_file:
        pickle.dump(surface_patches, friction_file)
def gen_missions(
    scenario: str,
    missions: Sequence[sstypes.Mission],
    actors: Sequence[sstypes.Actor],
    name: str,
    output_dir: str,
    map_spec: Optional[sstypes.MapSpec] = None,
):
    """Generates a route file to represent missions (a route per mission). Will
    create the output_dir if it doesn't exist already.
    """
    _check_if_called_externally()

    generator = TrafficGenerator(scenario, map_spec)

    def _resolved(mission):
        # Resolve the mission's route and via points against the road network.
        overrides = {}
        route = getattr(mission, "route", None)
        if route:
            overrides["route"] = generator.resolve_route(route, False)
        via = getattr(mission, "via", ())
        if via != ():
            overrides["via"] = _resolve_vias(via, generator=generator)
        return replace(mission, **overrides)

    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, name + ".pkl")

    _validate_missions(missions)

    # Pair every actor with every (resolved) mission.
    actor_missions = [
        ActorAndMission(actor=actor, mission=_resolved(mission))
        for actor, mission in itertools.product(actors, missions)
    ]
    with open(output_path, "wb") as output_file:
        pickle.dump(actor_missions, output_file)

    return True
def _resolve_vias(via: Tuple[sstypes.Via], generator):
    """Replace any JunctionEdgeIDResolver road ids with concrete edge ids
    resolved against the generator's road network."""
    return tuple(
        replace(v, road_id=v.road_id.to_edge(generator.road_network))
        if isinstance(v.road_id, sstypes.JunctionEdgeIDResolver)
        else v
        for v in via
    )


def _validate_missions(missions):
    """Run per-mission validation checks."""
    for mission in missions:
        _validate_entry_tactic(mission)


def _validate_entry_tactic(mission):
    """For TrapEntryTactic missions with a zone, check that the zone starts on
    the same edge as the mission's start/begin edge."""
    tactic = mission.entry_tactic
    if not tactic:
        return
    if isinstance(tactic, sstypes.TrapEntryTactic):
        # No usable zone to cross-check against.
        if not tactic.zone and not isinstance(tactic.zone, sstypes.MapZone):
            return

        z_edge, _, _ = tactic.zone.start
        if isinstance(mission, sstypes.EndlessMission):
            edge, _, _ = mission.start
            assert (
                edge == z_edge
            ), f"Zone edge `{z_edge}` is not the same edge as `types.EndlessMission` start edge `{edge}`"

        elif isinstance(mission, (sstypes.Mission, sstypes.LapMission)):
            edge, _, _ = mission.route.begin
            assert (
                edge == z_edge
            ), f"Zone edge `{z_edge}` is not the same edge as `types.Mission` route begin edge `{edge}`"
def gen_traffic_histories(
    scenario: str,
    histories_datasets: Sequence[Union[sstypes.TrafficHistoryDataset, str]],
    map_spec: Optional[sstypes.MapSpec] = None,
):
    """Converts traffic history to a format that SMARTS can use.

    Args:
        scenario: The scenario directory
        histories_datasets: A sequence of traffic history descriptors.
        map_spec: An optional map specification that takes precedence over
            scenario directory information.
    """
    _check_if_called_externally()
    road_map = None  # shared across all history_datasets in scenario
    # Hoisted out of the loop: the output directory is loop-invariant.
    output_dir = os.path.join(scenario, "build")
    for hdsr in histories_datasets:
        assert isinstance(hdsr, sstypes.TrafficHistoryDataset)
        if not hdsr.input_path:
            print(f"skipping placeholder dataset spec '{hdsr.name}'.")
            continue

        # Deferred import, kept at its original point so it only happens when
        # there is at least one real dataset to convert.
        from smarts.sstudio import genhistories

        map_bbox = None
        if hdsr.filter_off_map or hdsr.flip_y:
            if map_spec:
                # Build the road map lazily, once, and reuse across datasets.
                if not road_map:
                    road_map, _ = map_spec.builder_fn(map_spec)
                assert road_map
                map_bbox = road_map.bounding_box
            else:
                # Fixed: `logger.warn` is a deprecated alias of `logger.warning`.
                logger.warning(
                    f"no map_spec supplied, so unable to filter off-map coordinates and/or flip_y for {hdsr.name}"
                )
        genhistories.import_dataset(hdsr, output_dir, map_bbox)
def gen_metadata(scenario: str, scenario_metadata: sstypes.StandardMetadata):
    """Generate the metadata for the scenario

    Args:
        scenario (str):The scenario directory
        scenario_metadata (smarts.sstudio.sstypes.standard_metadata.StandardMetadata): Scenario metadata information.
    """
    _check_if_called_externally()
    metadata_path = os.path.join(scenario, "build", "scenario_metadata.yaml")
    # NOTE(review): reaches into the private `_dict_metadata` attribute of
    # StandardMetadata — confirm there is no public accessor for this.
    with open(metadata_path, "w") as metadata_file:
        yaml.dump(scenario_metadata._dict_metadata, metadata_file)