# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations
import glob
import logging
import os
import pickle
import random
import uuid
import warnings
from functools import lru_cache
from itertools import cycle, product
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Optional,
Sequence,
Tuple,
Union,
)
import cloudpickle
import numpy as np
import yaml
from smarts.core.coordinates import Heading, Point, RefLinePoint
from smarts.core.data_model import SocialAgent
from smarts.core.plan import (
EndlessGoal,
LapMission,
NavigationMission,
PositionalGoal,
Start,
TraverseGoal,
VehicleSpec,
Via,
default_entry_tactic,
)
from smarts.core.traffic_history import TrafficHistory
from smarts.core.utils.core_math import (
combination_pairs_with_unique_indices,
radians_to_vec,
vec_to_radians,
)
from smarts.core.utils.file import make_dir_in_smarts_log_dir, path2hash
from smarts.core.utils.id import SocialAgentId
from smarts.sstudio import sstypes as sstudio_types
from smarts.sstudio.sstypes import EntryTactic, MapSpec
from smarts.sstudio.sstypes import Via as SSVia
VehicleWindow = TrafficHistory.TrafficHistoryVehicleWindow
# Suppress trimesh deprecation warning
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="Please use `coo_matrix` from the `scipy.sparse` namespace, the `scipy.sparse.coo` namespace is deprecated.",
category=DeprecationWarning,
)
import trimesh # only suppress the warnings caused by trimesh
if TYPE_CHECKING:
from smarts.core.road_map import RoadMap
[docs]class Scenario:
"""The purpose of the Scenario is to provide an aggregate of all
code/configuration/assets that is specialized to a scenario.
Args:
scenario_root:
The scenario asset folder (i.e. './scenarios/trigger'.)
traffic_specs:
The social vehicle traffic specs.
missions:
agent_id to mission mapping.
map_spec:
If specified, allows specifying a :class:`~smarts.sstudio.sstypes.map_spec.MapSpec` at run-time
to override any spec that may have been pre-specified
in the scenario folder (or the default if none were).
Also see comments around the :class:`~smarts.sstudio.sstypes.map_spec.MapSpec` definition.
"""
def __init__(
self,
scenario_root: str,
traffic_specs: Sequence[str] = [],
missions: Optional[Dict[str, NavigationMission]] = None,
social_agents: Optional[Dict[str, Tuple[Any, SocialAgent]]] = None,
log_dir: Optional[str] = None,
surface_patches: Optional[Sequence[Dict[str, Any]]] = None,
traffic_history: Optional[str] = None,
map_spec: Optional[MapSpec] = None,
route: Optional[str] = None, # deprecated: use traffic_specs instead
):
self._logger = logging.getLogger(self.__class__.__name__)
self._root = scenario_root
self._traffic_specs = traffic_specs
if route:
warnings.warn(
"Scenario route property has been deprecated in favor of traffic_specs. Please update your code.",
category=DeprecationWarning,
)
traffic_path = os.path.join(scenario_root, "build", "traffic")
self._traffic_specs = [os.path.join(traffic_path, route)]
self._missions = missions or {}
self._bubbles = Scenario._discover_bubbles(scenario_root)
self._metadata = Scenario._discover_metadata(scenario_root)
self._social_agents = social_agents or {}
self._surface_patches = surface_patches
self._log_dir = self._resolve_log_dir(log_dir)
if traffic_history:
self._traffic_history = TrafficHistory(traffic_history)
default_lane_width = self._traffic_history.lane_width
else:
self._traffic_history = None
default_lane_width = None
# XXX: using a map builder_fn supplied by users is a security risk
# as SMARTS will be executing the code "as is". We are currently
# trusting our users to not try to sabotage their own simulations.
# In the future, this may need to be revisited if SMARTS is ever
# shared in a multi-user mode.
if not map_spec:
map_spec = Scenario.discover_map(self._root, 1.0, default_lane_width)
self._road_map, self._road_map_hash = map_spec.builder_fn(map_spec)
self._scenario_hash = path2hash(str(Path(self.root_filepath).resolve()))
os.makedirs(self._log_dir, exist_ok=True)
def __repr__(self):
return f"""Scenario(
_root={self._root},
_traffic_specs={self._traffic_specs},
_missions={self._missions},
)"""
[docs] @staticmethod
def get_scenario_list(scenarios_or_scenarios_dirs: Sequence[str]) -> Sequence[str]:
"""Find all specific scenario directories in the directory trees of the initial scenario
directory.
"""
scenario_roots = []
for root in scenarios_or_scenarios_dirs:
if Scenario.is_valid_scenario(root):
# This is the single scenario mode, only training against a single scenario
scenario_roots.append(root)
else:
scenario_roots.extend(Scenario.discover_scenarios(root))
return scenario_roots
[docs] @staticmethod
def scenario_variations(
scenarios_or_scenarios_dirs: Sequence[str],
agents_to_be_briefed: Sequence[str],
shuffle_scenarios: bool = True,
circular: bool = True,
) -> Generator["Scenario", None, None]:
"""Generate a cycle of scenario configurations.
Args:
scenarios_or_scenarios_dirs (Sequence[str]):
A sequence of either the scenario to run (see scenarios/ for some samples you
can use) OR a directory of scenarios to sample from.
agents_to_be_briefed:
Agent IDs that will be assigned a mission ("briefed" on a mission).
Returns:
A generator that serves up Scenarios.
"""
scenario_roots = Scenario.get_scenario_list(scenarios_or_scenarios_dirs)
if shuffle_scenarios:
np.random.shuffle(scenario_roots)
if circular:
scenario_roots = cycle(scenario_roots)
return Scenario.variations_for_all_scenario_roots(
scenario_roots, agents_to_be_briefed, shuffle_scenarios
)
[docs] @staticmethod
def variations_for_all_scenario_roots(
scenario_roots, agents_to_be_briefed, shuffle_scenarios=True
) -> Generator["Scenario", None, None]:
"""Convert scenario roots to concrete scenarios.
Args:
scenario_roots:
Scenario directories containing scenario resource files.
agents_to_be_briefed:
Agent IDs that will be assigned a mission ("briefed" on a mission).
shuffle_scenarios:
Return scenarios in a pseudo-random order.
Returns:
A generator that serves up Scenarios.
"""
for scenario_root in scenario_roots:
surface_patches = Scenario.discover_friction_map(scenario_root)
agent_missions = Scenario.discover_agent_missions(
scenario_root, agents_to_be_briefed
)
social_agent_infos = Scenario._discover_social_agents_info(scenario_root)
social_agents = [
{
agent_id: (agent.to_agent_spec(), (agent, mission))
for agent_id, (
agent,
mission,
) in per_episode_social_agent_infos.items()
}
for per_episode_social_agent_infos in social_agent_infos
]
# `or [None]` so that product(...) will not return an empty result
# but insted a [(..., `None`), ...].
agent_missions = agent_missions or [None]
if len(agents_to_be_briefed) > len(agent_missions):
warnings.warn(
f"Scenario `{scenario_root}` has {len(agent_missions)} missions and"
f" but there are {len(agents_to_be_briefed)} agents to assign"
" missions to. The missions will be padded with random missions."
)
mission_agent_groups = combination_pairs_with_unique_indices(
agents_to_be_briefed, agent_missions
)
social_agents = social_agents or [None]
traffic_histories = Scenario.discover_traffic_histories(scenario_root) or [
None
]
traffic = Scenario.discover_traffic(scenario_root) or [[]]
roll_traffic = 0
roll_social_agents = 0
roll_traffic_histories = 0
if shuffle_scenarios:
roll_traffic = random.randint(0, len(traffic))
roll_social_agents = random.randint(0, len(social_agents))
roll_traffic_histories = 0 # random.randint(0, len(traffic_histories))
for (
concrete_traffic,
concrete_agent_missions,
concrete_social_agents,
concrete_traffic_history,
) in product(
np.roll(traffic, roll_traffic, 0),
mission_agent_groups,
np.roll(social_agents, roll_social_agents, 0),
np.roll(traffic_histories, roll_traffic_histories, 0),
):
concrete_social_agent_missions = {
agent_id: mission
for agent_id, (_, (_, mission)) in (
concrete_social_agents or {}
).items()
}
# Filter out mission
concrete_social_agents = {
agent_id: (_agent_spec, social_agent)
for agent_id, (_agent_spec, (social_agent, _)) in (
concrete_social_agents or {}
).items()
}
yield Scenario(
scenario_root,
traffic_specs=concrete_traffic,
missions={
**{a_id: mission for a_id, mission in concrete_agent_missions},
**concrete_social_agent_missions,
},
social_agents=concrete_social_agents,
surface_patches=surface_patches,
traffic_history=concrete_traffic_history,
)
[docs] @staticmethod
def discover_agent_missions_count(scenario_root):
"""Retrieve the agent missions from the given scenario directory."""
missions_file = os.path.join(scenario_root, "build", "missions.pkl")
if os.path.exists(missions_file):
with open(missions_file, "rb") as f:
return len(pickle.load(f))
return 0
[docs] @staticmethod
def discover_agent_missions(scenario_root, agents_to_be_briefed):
"""Returns a sequence of {agent_id: mission} mappings.
If no missions are discovered we generate random ones. If there is only one
agent to be briefed we return a list of `{agent_id: mission}` cycling through
each mission. If there are multiple agents to be briefed we assume that each
one is intended to get its own mission and that `len(agents_to_be_briefed) ==
len(missions)`. In this case a list of one dictionary is returned.
"""
road_map, _ = Scenario.build_map(scenario_root)
missions = []
missions_file = os.path.join(scenario_root, "build", "missions.pkl")
if os.path.exists(missions_file):
with open(missions_file, "rb") as f:
missions = pickle.load(f)
missions = [
Scenario._extract_mission(actor_and_mission.mission, road_map)
for actor_and_mission in missions
]
if not missions:
missions = [None for _ in range(len(agents_to_be_briefed))]
return missions
[docs] @staticmethod
def discover_friction_map(scenario_root) -> List[Dict[str, Any]]:
"""Returns the list of surface patches parameters defined in
scenario file. Each element of the list contains the
parameters of the specified surface patch.
"""
surface_patches = []
friction_map_file = os.path.join(scenario_root, "build", "friction_map.pkl")
if os.path.exists(friction_map_file):
with open(friction_map_file, "rb") as f:
map_surface_patches = pickle.load(f)
for surface_patch in map_surface_patches:
surface_patches.append(
{
"zone": surface_patch.zone,
"begin_time": surface_patch.begin_time,
"end_time": surface_patch.end_time,
"friction coefficient": surface_patch.friction_coefficient,
}
)
return surface_patches
@staticmethod
@lru_cache(maxsize=16)
def _discover_social_agents_info(
scenario: Union[str, Scenario],
) -> Sequence[Dict[str, Tuple[SocialAgent, NavigationMission]]]:
"""Loops through the social agent mission pickles, instantiating corresponding
implementations for the given types. The output is a list of
{agent_id: (mission, locator)}, where each dictionary corresponds to the
social agents to run for a given concrete Scenario (which translates to
"per episode" when swapping).
"""
scenario_root = (
scenario.root_filepath if isinstance(scenario, Scenario) else scenario
)
road_map, _ = Scenario.build_map(scenario_root)
social_agents_path = os.path.join(scenario_root, "build", "social_agents")
if not os.path.exists(social_agents_path):
return []
# [ ( missions_file, agent_actor, Mission ) ]
agent_bucketer = []
# like dict.setdefault
def setdefault(l: list, index: int, default):
while len(l) < index + 1:
l.append([])
return l[index]
file_match = os.path.join(social_agents_path, "*.pkl")
for missions_file_path in glob.glob(file_match):
with open(missions_file_path, "rb") as missions_file:
count = 0
missions = pickle.load(missions_file)
for mission_and_actor in missions:
# Each pickle file will contain a list of actor/mission pairs. The pairs
# will most likely be generated in an M:N fashion
# (i.e. A1: M1, A1: M2, A2: M1, A2: M2). The desired behavior is to have
# a single pair per concrete Scenario (which would translate to
# "per episode" when swapping)
assert isinstance(
mission_and_actor.actor, sstudio_types.SocialAgentActor
)
actor = mission_and_actor.actor
extracted_mission = Scenario._extract_mission(
mission_and_actor.mission, road_map
)
namespace = os.path.basename(missions_file_path)
namespace = os.path.splitext(namespace)[0]
setdefault(agent_bucketer, count, []).append(
(
SocialAgent(
id=SocialAgentId.new(actor.name, group=namespace),
actor_name=actor.name,
is_boid=False,
is_boid_keep_alive=False,
agent_locator=actor.agent_locator,
policy_kwargs=actor.policy_kwargs,
initial_speed=actor.initial_speed,
),
extracted_mission,
)
)
count += 1
social_agents_info = []
for l in agent_bucketer:
social_agents_info.append(
{agent.id: (agent, mission) for agent, mission in l}
)
return social_agents_info
[docs] @staticmethod
def discover_scenarios(scenario_or_scenarios_dir):
"""Retrieve all specific scenarios in the directory tree of the given scenario directory.
Args:
scenario_or_scenario_dir:
A directory that either immediately contains a scenario or the root of a directory tree that contains multiple scenarios.
Returns:
All specific scenarios.
"""
if Scenario.is_valid_scenario(scenario_or_scenarios_dir):
# This is the single scenario mode, only training against a single scenario
scenario = scenario_or_scenarios_dir
discovered_scenarios = [scenario]
else:
# Find all valid scenarios in the given scenarios directory
discovered_scenarios = []
for scenario_file in os.listdir(scenario_or_scenarios_dir):
scenario_root = os.path.join(scenario_or_scenarios_dir, scenario_file)
if Scenario.is_valid_scenario(scenario_root):
discovered_scenarios.append(scenario_root)
assert (
len(discovered_scenarios) > 0
), f"No valid scenarios found in {scenario_or_scenarios_dir}"
return discovered_scenarios
[docs] @staticmethod
def build_map(scenario_root: str) -> Tuple[Optional[RoadMap], Optional[str]]:
"""Builds a road map from the given scenario's resources."""
# XXX: using a map builder_fn supplied by users is a security risk
# as SMARTS will be executing the code "as is". We are currently
# trusting our users to not try to sabotage their own simulations.
# In the future, this may need to be revisited if SMARTS is ever
# shared in a multi-user mode.
map_spec = Scenario.discover_map(scenario_root)
return map_spec.builder_fn(map_spec)
[docs] @staticmethod
def discover_map(
scenario_root: str,
lanepoint_spacing: Optional[float] = None,
default_lane_width: Optional[float] = None,
shift_to_origin: bool = False,
) -> MapSpec:
"""Generates the map specification from the given scenario's file resources.
Args:
scenarios_root:
A specific scenario to run (e.g. scenarios/sumo/loop)
lanepoint_spacing:
The distance between lane-points that represent a lane's geometry.
default_lane_width:
The default width of a lane from its center if it does not have a specific width.
shift_to_origin:
Shifts the map location to near the simulation origin so that the map contains (0, 0).
Returns:
A new map spec.
"""
path = os.path.join(scenario_root, "build", "map", "map_spec.pkl")
if not os.path.exists(path):
# Use our default map builder if none specified by scenario...
return MapSpec(
scenario_root,
lanepoint_spacing,
default_lane_width,
shift_to_origin,
)
with open(path, "rb") as f:
road_map = cloudpickle.load(f)
return road_map
[docs] @staticmethod
def discover_routes(scenario_root):
"""Discover the route files in the given scenario.
>>> Scenario.discover_routes("scenarios/sumo/intersections/2lane")
['all.rou.xml', 'horizontal.rou.xml', 'turns.rou.xml', 'unprotected_left.rou.xml', 'vertical.rou.xml']
>>> Scenario.discover_routes("scenarios/sumo/loop") # loop does not have any routes
['basic.rou.xml']
"""
warnings.warn(
"Scenario.discover_routes() has been deprecated in favor of Scenario.discover_traffic(). Please update your code.",
category=DeprecationWarning,
)
return sorted(
[
os.path.basename(r)
for r in glob.glob(
os.path.join(scenario_root, "build", "traffic", "*.rou.xml")
)
]
)
[docs] @staticmethod
def discover_traffic(scenario_root: str) -> List[Optional[List[str]]]:
"""Discover the traffic spec files in the given scenario."""
traffic_path = os.path.join(scenario_root, "build", "traffic")
# combine any SMARTS and SUMO traffic together...
sumo_traffic = glob.glob(os.path.join(traffic_path, "*.rou.xml"))
smarts_traffic = glob.glob(os.path.join(traffic_path, "*.smarts.xml"))
if sumo_traffic and not smarts_traffic:
return [[ts] for ts in sumo_traffic]
elif not sumo_traffic and smarts_traffic:
return [[ts] for ts in smarts_traffic]
return [list(ts) for ts in product(sumo_traffic, smarts_traffic)]
@staticmethod
def _discover_bubbles(scenario_root):
path = os.path.join(scenario_root, "build", "bubbles.pkl")
if not os.path.exists(path):
return []
with open(path, "rb") as f:
bubbles = pickle.load(f)
return bubbles
@staticmethod
def _discover_metadata(scenario_root):
path = os.path.join(scenario_root, "build", "scenario_metadata.yaml")
if not os.path.exists(path):
return dict()
with open(path, "r") as f:
metadata = yaml.load(f, yaml.Loader)
return metadata
[docs] def set_ego_missions(self, ego_missions: Dict[str, NavigationMission]):
"""Replaces the ego missions within the scenario.
Args:
ego_missions: Ego agent ids mapped to missions.
"""
self._missions = ego_missions
[docs] def get_vehicle_start_at_time(
self, vehicle_id: str, start_time: float
) -> Tuple[Start, float]:
"""Returns a Start object that can be used to create a Mission for
a vehicle from a traffic history dataset starting at its location
at start_time. Also returns its speed at that time."""
pphs = self._traffic_history.vehicle_pose_at_time(vehicle_id, start_time)
assert pphs
pos_x, pos_y, heading, speed = pphs
# missions start from front bumper, but pos is center of vehicle
veh_dims = self._traffic_history.vehicle_dims(vehicle_id)
hhx, hhy = radians_to_vec(heading) * (0.5 * veh_dims.length)
return (
Start(
np.array([pos_x + hhx, pos_y + hhy]),
Heading(heading),
from_front_bumper=True,
),
speed,
)
[docs] def get_vehicle_goal(self, vehicle_id: str) -> Point:
"""Get the final position for a history vehicle."""
final_exit_time = self._traffic_history.vehicle_final_exit_time(vehicle_id)
final_pose = self._traffic_history.vehicle_pose_at_time(
vehicle_id, final_exit_time
)
assert final_pose
final_pos_x, final_pos_y, _, _ = final_pose
return Point(final_pos_x, final_pos_y)
[docs] def discover_missions_of_traffic_histories(self) -> Dict[str, NavigationMission]:
"""Retrieves the missions of traffic history vehicles."""
vehicle_missions = {}
for row in self._traffic_history.first_seen_times():
v_id = str(row[0])
start_time = float(row[1])
start, speed = self.get_vehicle_start_at_time(v_id, start_time)
entry_tactic = default_entry_tactic(speed)
veh_config_type = self._traffic_history.vehicle_config_type(v_id)
veh_dims = self._traffic_history.vehicle_dims(v_id)
vehicle_missions[v_id] = NavigationMission(
start=start,
entry_tactic=entry_tactic,
goal=TraverseGoal(self.road_map),
start_time=start_time,
vehicle_spec=VehicleSpec(
veh_id=v_id,
veh_config_type=veh_config_type,
dimensions=veh_dims,
),
)
return vehicle_missions
[docs] def create_dynamic_traffic_history_mission(
self, vehicle_id: str, trigger_time: float, positional_radius: int
) -> Tuple[NavigationMission, NavigationMission]:
"""Builds missions out of the given vehicle information.
Args:
vehicle_id:
The id of a vehicle in the traffic history dataset.
trigger_time:
The time that this mission should become active.
positional_radius:
The goal radius for the positional goal.
Returns:
(smarts.core.plan.NavigationMission, smarts.core.plan.NavigationMission): A positional mission that follows the initial
original vehicle's travel as well as a traverse style mission which is done when the
vehicle leaves the map.
"""
start, speed = self.get_vehicle_start_at_time(vehicle_id, trigger_time)
veh_goal = self.get_vehicle_goal(vehicle_id)
entry_tactic = default_entry_tactic(speed)
# create a positional mission and a traverse mission
positional_mission = NavigationMission(
start=start,
entry_tactic=entry_tactic,
goal=PositionalGoal(veh_goal, radius=positional_radius),
)
traverse_mission = NavigationMission(
start=start,
entry_tactic=entry_tactic,
goal=TraverseGoal(self._road_map),
)
return positional_mission, traverse_mission
[docs] def history_missions_for_window(
self,
exists_at_or_after: float,
ends_before: float,
minimum_vehicle_window: float,
filter: Optional[
Callable[
[Iterable[VehicleWindow]],
Iterable[VehicleWindow],
]
] = None,
) -> Sequence[NavigationMission]:
"""Discovers vehicle missions for the given window of time.
:param exists_at_or_after: The starting time of any vehicles to query for.
:type exists_at_or_after: float
:param ends_before: The last point in time a vehicle should be in the simulation.
Vehicles ending after that time are not considered.
:type ends_before: float
:param minimum_vehicle_window: The minimum time that a vehicle must be in the simulation
to be considered for a mission.
:type minimum_vehicle_window: float
:param filter: A filter in the form of ``(func(Sequence[TrafficHistoryVehicleWindow]) -> Sequence[TrafficHistoryVehicleWindow])``,
which passes in traffic vehicle information and then should be used purely to filter the sequence down.
:return: A set of missions derived from the traffic history.
:rtype: List[smarts.core.plan.NavigationMission]
"""
vehicle_windows = self._traffic_history.vehicle_windows_in_range(
exists_at_or_after, ends_before, minimum_vehicle_window
)
def _gen_mission(vw: TrafficHistory.TrafficHistoryVehicleWindow):
assert isinstance(
vw, TrafficHistory.TrafficHistoryVehicleWindow
), "`filter(..)` likely returns malformed data."
v_id = str(vw.vehicle_id)
start_time = float(vw.start_time)
start = Start(
np.array(vw.axle_start_position),
Heading(vw.start_heading),
)
entry_tactic = default_entry_tactic(vw.start_speed)
veh_config_type = vw.vehicle_type
veh_dims = vw.dimensions
vehicle_mission = NavigationMission(
start=start,
entry_tactic=entry_tactic,
goal=TraverseGoal(self.road_map),
start_time=start_time,
vehicle_spec=VehicleSpec(
veh_id=v_id,
veh_config_type=veh_config_type,
dimensions=veh_dims,
),
)
return vehicle_mission
if filter is not None:
vehicle_windows = filter(vehicle_windows)
return [_gen_mission(vw) for vw in vehicle_windows]
[docs] @staticmethod
def discover_traffic_histories(scenario_root: str):
"""Finds all existing traffic history files in the specific scenario."""
build_dir = Path(scenario_root) / "build"
return [
entry
for entry in os.scandir(str(build_dir))
if entry.is_file() and entry.path.endswith(".shf")
]
@staticmethod
def _extract_mission(mission, road_map):
"""Takes a sstudio.sstypes.(Mission, EndlessMission, etc.) and converts it to
the corresponding SMARTS mission types.
"""
def resolve_offset(offset: Union[str, float], lane_length: float):
# epsilon to ensure we are within this edge and not the subsequent one
epsilon = 1e-6
lane_length -= epsilon
if offset == "base":
return epsilon
elif offset == "max":
return lane_length
elif offset == "random":
return random.uniform(epsilon, lane_length)
else:
return float(offset)
def to_position_and_heading(
road_id: str, lane_index: int, offset: Union[str, float], road_map: RoadMap
):
road = road_map.road_by_id(road_id)
lane = road.lane_at_index(lane_index)
offset = resolve_offset(offset, lane.length)
position = lane.from_lane_coord(RefLinePoint(s=offset))
lane_vector = lane.vector_at_offset(offset)
heading = vec_to_radians(lane_vector[:2])
return position, Heading(heading)
def to_scenario_via(
vias: Tuple[SSVia, ...], road_map: RoadMap
) -> Tuple[Via, ...]:
s_vias = []
for via in vias:
road = road_map.road_by_id(via.road_id)
lane = road.lane_at_index(via.lane_index)
lane_width, _ = lane.width_at_offset(via.lane_offset)
hit_distance = (
via.hit_distance if via.hit_distance > 0 else lane_width / 2
)
via_position = lane.from_lane_coord(RefLinePoint(via.lane_offset))
s_vias.append(
Via(
lane_id=lane.lane_id,
lane_index=via.lane_index,
road_id=via.road_id,
position=tuple(via_position[:2]),
hit_distance=hit_distance,
required_speed=via.required_speed,
)
)
return tuple(s_vias)
# XXX: For now we discard the route and just take the start and end to form our missions.
# In the future, we could create a Plan object here too when there's a route specified.
if isinstance(mission, sstudio_types.Mission):
position, heading = to_position_and_heading(
*mission.route.begin,
road_map,
)
start = Start(position, heading)
position, _ = to_position_and_heading(
*mission.route.end,
road_map,
)
goal = PositionalGoal(position, radius=2)
entry_tactic = mission.entry_tactic
start_time = Scenario._extract_mission_start_time(mission, entry_tactic)
return NavigationMission(
start=start,
route_vias=mission.route.via,
goal=goal,
start_time=start_time,
entry_tactic=entry_tactic,
via=to_scenario_via(mission.via, road_map),
)
elif isinstance(mission, sstudio_types.EndlessMission):
position, heading = to_position_and_heading(
*mission.begin,
road_map,
)
start = Start(position, heading)
entry_tactic = mission.entry_tactic
start_time = Scenario._extract_mission_start_time(mission, entry_tactic)
return NavigationMission(
start=start,
goal=EndlessGoal(),
start_time=start_time,
entry_tactic=entry_tactic,
via=to_scenario_via(mission.via, road_map),
)
elif isinstance(mission, sstudio_types.LapMission):
start_road_id, start_lane, start_road_offset = mission.route.begin
end_road_id, end_lane, end_road_offset = mission.route.end
travel_road = road_map.road_by_id(start_road_id)
if start_road_id == end_road_id:
travel_road = travel_road.outgoing_roads[0]
end_road = road_map.road_by_id(end_road_id)
via_roads = [road_map.road_by_id(r) for r in mission.route.via]
route = road_map.generate_routes(travel_road, end_road, via_roads, 1)[0]
start_position, start_heading = to_position_and_heading(
*mission.route.begin,
road_map,
)
end_position, _ = to_position_and_heading(
*mission.route.end,
road_map,
)
entry_tactic = mission.entry_tactic
start_time = Scenario._extract_mission_start_time(mission, entry_tactic)
return LapMission(
start=Start(start_position, start_heading),
goal=PositionalGoal(end_position, radius=2),
route_vias=mission.route.via,
start_time=start_time,
entry_tactic=entry_tactic,
via=to_scenario_via(mission.via, road_map),
num_laps=mission.num_laps,
route_length=route.road_length,
)
raise RuntimeError(
f"sstudio mission={mission} is an invalid type={type(mission)}"
)
@staticmethod
def _extract_mission_start_time(mission, entry_tactic: Optional[EntryTactic]):
return (
mission.start_time
if mission.start_time != sstudio_types.MISSING
else entry_tactic.start_time
if entry_tactic
else 0
)
[docs] @staticmethod
def is_valid_scenario(scenario_root) -> bool:
"""Checks if the scenario_root directory matches our expected scenario structure
>>> Scenario.is_valid_scenario("scenarios/sumo/loop")
True
>>> Scenario.is_valid_scenario("scenarios/non_existant")
False
"""
# just make sure we can load the map
try:
road_map, _ = Scenario.build_map(scenario_root)
except FileNotFoundError:
return False
return road_map is not None
[docs] @staticmethod
def next(scenario_iterator, log_id="") -> "Scenario":
"""Utility to override specific attributes from a scenario iterator"""
scenario = next(scenario_iterator)
scenario._log_id = log_id
return scenario
@property
def name(self) -> str:
"""The name of the scenario."""
return os.path.normpath(self._root)
@property
def root_filepath(self) -> str:
"""The root directory of the scenario."""
return self._root
@property
def surface_patches(self):
"""A list of surface areas with dynamics implications (e.g. icy road.)"""
return self._surface_patches
@property
def road_map_hash(self):
"""A hash value of this road map."""
return self._road_map_hash
@property
def plane_filepath(self) -> str:
"""The ground plane."""
return os.path.join(self._root, "plane.urdf")
@property
def vehicle_filepath(self) -> Optional[str]:
"""The file-path of the vehicle's physics model."""
warnings.warn(
"tire_parameters_filepath is no longer in use. Please update your code.",
category=DeprecationWarning,
)
if not os.path.isdir(self._root):
return None
for fname in os.listdir(self._root):
if fname.endswith("vehicle.urdf"):
return os.path.join(self._root, fname)
return None
@property
def tire_parameters_filepath(self) -> str:
"""The path of the tire model's parameters."""
warnings.warn(
"tire_parameters_filepath is no longer in use. Please update your code.",
category=DeprecationWarning,
)
return os.path.join(self._root, "tire_parameters.yaml")
@property
def controller_parameters_filepath(self) -> str:
"""The path of the vehicle controller parameters."""
warnings.warn(
"controller_parameters_filepath is no longer in use. Please update your code.",
category=DeprecationWarning,
)
return os.path.join(self._root, "controller_parameters.yaml")
@property
def vehicle_definitions_filepath(self) -> str:
"""The path to the default list of vehicle definitions."""
return os.path.join(self._root, "vehicle_definitions_list.yaml")
@property
def traffic_specs(self) -> Sequence[str]:
"""The traffic spec file names to use for this scenario."""
return self._traffic_specs
@property
def route(self) -> Optional[str]:
"""The traffic route file name."""
warnings.warn(
"Scenario route property has been deprecated in favor of traffic_specs. Please update your code.",
category=DeprecationWarning,
)
assert len(self._traffic_specs) <= 1
return (
os.path.basename(self._traffic_specs[0])
if len(self._traffic_specs) == 1
else None
)
@property
def route_files_enabled(self):
"""If there is a traffic route file."""
warnings.warn(
"Scenario route_file_enabled property has been deprecated in favor of traffic_specs. Please update your code.",
category=DeprecationWarning,
)
return bool(self._traffic_specs)
@property
def route_filepath(self):
"""The file-path to the traffic route file."""
warnings.warn(
"Scenario route_filepath property has been deprecated in favor of traffic_specs. Please update your code.",
category=DeprecationWarning,
)
assert len(self._traffic_specs) == 1
return self._traffic_specs[0]
@property
def map_glb_filepath(self):
"""The map geometry file-path."""
return os.path.join(self._root, "build", "map", "map.glb")
@property
def map_glb_metadata(self):
"""The metadata for the current map `.glb` file."""
metadata = self.map_glb_meta_for_file(self.map_glb_filepath)
return metadata
[docs] def unique_sumo_log_file(self):
"""A unique logging file for SUMO logging."""
return os.path.join(self._log_dir, f"sumo-{str(uuid.uuid4())[:8]}")
@property
def road_map(self) -> RoadMap:
"""The road map of the scenario."""
return self._road_map
@property
def map_spec(self) -> Optional[MapSpec]:
"""The map spec for the road map used in this scenario."""
return self.road_map.map_spec
@property
def supports_sumo_traffic(self) -> bool:
"""Returns True if this scenario uses a Sumo road network."""
from smarts.core.sumo_road_network import SumoRoadNetwork
return isinstance(self._road_map, SumoRoadNetwork)
[docs] @staticmethod
def any_support_sumo_traffic(scenarios: Sequence[str]) -> bool:
"""Determines if any of the given scenarios support Sumo traffic simulation."""
from smarts.core.default_map_builder import MapType, find_mapfile_in_dir
for scenario_root in Scenario.get_scenario_list(scenarios):
map_type, _ = find_mapfile_in_dir(scenario_root)
if map_type == MapType.Sumo:
return True
return False
[docs] @staticmethod
def all_support_sumo_traffic(scenarios: Sequence[str]) -> bool:
"""Determines if all given scenarios support Sumo traffic simulation."""
from smarts.core.sumo_road_network import SumoRoadNetwork
for scenario_root in Scenario.get_scenario_list(scenarios):
try:
road_map, _ = Scenario.build_map(scenario_root)
except FileNotFoundError:
raise FileNotFoundError(
f"Unable to find network file in map_source={scenario_root}."
)
if not isinstance(road_map, SumoRoadNetwork):
return False
return True
@property
def missions(self) -> Dict[str, NavigationMission]:
"""Agent missions contained within this scenario."""
return self._missions
@property
def social_agents(self) -> Dict[str, Tuple[Any, SocialAgent]]:
"""Managed social agents within this scenario."""
return self._social_agents
@property
def bubbles(self):
"""Bubbles within this scenario."""
return self._bubbles
[docs] def mission(self, agent_id) -> Optional[NavigationMission]:
"""Get the mission assigned to the given agent."""
return self._missions.get(agent_id, None)
def _resolve_log_dir(self, log_dir):
if log_dir is None:
log_dir = make_dir_in_smarts_log_dir("_sumo_run_logs")
return os.path.abspath(log_dir)
@property
def traffic_history(self) -> Optional[TrafficHistory]:
"""Traffic history contained within this scenario."""
return self._traffic_history
@property
def scenario_hash(self) -> str:
"""A hash of the scenario."""
return self._scenario_hash
@property
def metadata(self) -> Dict:
"""Scenario metadata values.
Returns:
Dict: The values.
"""
return self._metadata or {}