Source code for smarts.core.smarts

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import importlib.resources as pkg_resources
import logging
import os
import re
import warnings
from functools import cached_property
from typing import (
    Any,
    Dict,
    Final,
    Iterable,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import numpy as np

from envision import etypes as envision_types
from envision.client import Client as EnvisionClient
from smarts import VERSION, assets
from smarts.core.actor_capture_manager import ActorCaptureManager
from smarts.core.id_actor_capture_manager import IdActorCaptureManager
from smarts.core.plan import Plan
from smarts.core.renderer_base import RendererBase
from smarts.core.sensors import SensorState
from smarts.core.simulation_local_constants import SimulationLocalConstants
from smarts.core.utils.core_logging import timeit
from smarts.core.utils.type_operations import TypeSuite

from . import config
from .actor import ActorRole, ActorState
from .agent_interface import AgentInterface
from .agent_manager import AgentManager
from .agents_provider import (
    AgentPhysicsProvider,
    AgentsProvider,
    DirectControlProvider,
    MotionPlannerProvider,
    TrajectoryInterpolationProvider,
)
from .bubble_manager import BubbleManager
from .controllers import ActionSpaceType
from .coordinates import BoundingBox, Point
from .external_provider import ExternalProvider
from .observations import Observation
from .provider import (
    ActorProviderTransition,
    Provider,
    ProviderManager,
    ProviderRecoveryFlags,
    ProviderState,
)
from .road_map import RoadMap
from .scenario import NavigationMission, Scenario
from .sensor_manager import SensorManager
from .signal_provider import SignalProvider
from .signals import SignalLightState, SignalState
from .simulation_frame import SimulationFrame
from .traffic_history_provider import TrafficHistoryProvider
from .traffic_provider import TrafficProvider
from .trap_manager import TrapManager
from .utils import pybullet
from .utils.core_math import rounder_for_dt
from .utils.custom_exceptions import RendererException
from .utils.id import Id
from .utils.pybullet import bullet_client as bc
from .vehicle import Vehicle
from .vehicle_index import VehicleIndex
from .vehicle_state import Collision, VehicleState, neighborhood_vehicles_around_vehicle

logging.basicConfig(
    format="%(asctime)s.%(msecs)03d %(levelname)s: {%(module)s} %(message)s",
    datefmt="%Y-%m-%d,%H:%M:%S",
    level=logging.ERROR,
)

MAX_PYBULLET_FREQ: Final[int] = 240


[docs]class SMARTSNotSetupError(Exception): """Represents a case where SMARTS cannot operate because it is not set up yet.""" pass
[docs]class SMARTSDestroyedError(Exception): """Represents a case where SMARTS cannot operate because it is destroyed.""" pass
[docs]class SMARTS(ProviderManager): """The core SMARTS simulator. This is the direct interface to all parts of the simulation. Args: agent_interfaces (Dict[str, AgentInterface]): The interfaces providing SMARTS with the understanding of what features each agent needs. traffic_sims (Optional[List[TrafficProvider]], optional): An optional list of traffic simulators for providing non-agent traffic. Defaults to None. envision (Optional[envision.client.Client], optional): An envision client for connecting to an envision visualization server. Defaults to None. visdom (Union[bool, Any], optional): Deprecated. Use SMARTS_VISDOM_ENABLED. A visdom client for connecting to a visdom visualization server. fixed_timestep_sec (Optional[float], optional): The fixed timestep that will be default if time is not otherwise specified at step. Defaults to 0.1. reset_agents_only (bool, optional): When specified the simulation will continue use of the current scenario. Defaults to False. external_provider (bool, optional): Creates a special provider `SMARTS.external_provider` that allows for inserting state. Defaults to False. """ def __init__( self, agent_interfaces: Dict[str, AgentInterface], # traffic_sim is deprecated: use traffic_sims instead traffic_sim: Optional[TrafficProvider] = None, traffic_sims: Optional[List[TrafficProvider]] = None, envision: Optional[EnvisionClient] = None, visdom: Optional[Union[bool, Any]] = False, fixed_timestep_sec: Optional[float] = 0.1, reset_agents_only: bool = False, external_provider: bool = False, ): conf = config() self._log = logging.getLogger(self.__class__.__name__) self._sim_id = Id.new("smarts") self._is_setup = False self._is_destroyed = False self._scenario: Optional[Scenario] = None self._renderer: RendererBase = None self._envision: Optional[EnvisionClient] = envision if visdom is not False and visdom is not None: warnings.warn( "Using visdom through the arguments is deprecated. Please use engine configuration or SMARTS_VISDOM_ENABLED." ) else: visdom = None self._visdom: Any = None if conf("visdom", "enabled", cast=bool) or visdom is True: from smarts.visdom.visdom_client import VisdomClient self._visdom = VisdomClient( hostname=conf("visdom", "hostname"), port=conf("visdom", "port"), ) elif not isinstance(self._visdom, bool): self._visdom = visdom self._external_provider: ExternalProvider = None self._resetting = False self._reset_required = False assert fixed_timestep_sec is None or fixed_timestep_sec > 0 self.fixed_timestep_sec: Optional[float] = fixed_timestep_sec self._last_dt = fixed_timestep_sec self._elapsed_sim_time = 0.0 self._total_sim_time = 0.0 self._step_count = 0 self._signal_provider = SignalProvider() self._agent_physics_provider = AgentPhysicsProvider(self) self._direct_control_provider = DirectControlProvider(self) self._motion_planner_provider = MotionPlannerProvider(self) self._traffic_history_provider = TrafficHistoryProvider() self._trajectory_interpolation_provider = TrajectoryInterpolationProvider(self) traffic_sims = traffic_sims or [] traffic_sims.append(self._traffic_history_provider) if traffic_sim: warnings.warn( "SMARTS traffic_sim property has been deprecated in favor of traffic_sims. Please update your code.", category=DeprecationWarning, ) traffic_sims += [traffic_sim] self._provider_suite = TypeSuite(Provider) if external_provider: self._external_provider = ExternalProvider(self) self.add_provider(self._external_provider) self.add_provider(self._agent_physics_provider) self.add_provider(self._direct_control_provider) self.add_provider(self._motion_planner_provider) self.add_provider(self._trajectory_interpolation_provider) for traffic_sim in traffic_sims: recovery_flags = ( ProviderRecoveryFlags.EPISODE_REQUIRED | ProviderRecoveryFlags.ATTEMPT_RECOVERY | ProviderRecoveryFlags.RELINQUISH_ACTORS ) self.add_provider(traffic_sim, recovery_flags) self.add_provider(self._signal_provider) # We buffer provider state between steps to compensate for TRACI's timestep delay self._last_provider_state = None self._reset_agents_only = reset_agents_only # a.k.a "teleportation" # For macOS GUI. See our `BulletClientMACOS` docstring for details. # from .utils.pybullet import BulletClientMACOS # self._bullet_client = BulletClientMACOS(pybullet.GUI) self._bullet_client = pybullet.SafeBulletClient( pybullet.DIRECT ) # pylint: disable=no-member # Set up indices self._sensor_manager = SensorManager() self._vehicle_index = VehicleIndex() self._agent_manager = AgentManager(self, agent_interfaces) # TODO: Should not be stored in SMARTS self._vehicle_collisions: Dict[str, List[Collision]] = dict() self._vehicle_states = [] self._bubble_manager = None self._trap_manager: TrapManager = TrapManager() self._actor_capture_managers: List[ActorCaptureManager] = [ self._trap_manager, IdActorCaptureManager(), ] self._ground_bullet_id = None self._map_bb = None
[docs] def step( self, agent_actions: Dict[str, Any], time_delta_since_last_step: Optional[float] = None, ) -> Tuple[ Dict[str, Observation], Dict[str, float], Dict[str, bool], Dict[str, Dict[str, float]], ]: """Progress the simulation by a fixed or specified time. Args: agent_actions: Actions that the agents want to perform on their actors. time_delta_since_last_step: Overrides the simulation step length. Progress simulation time by the given amount. Note the time_delta_since_last_step param is in (nominal) seconds. Returns: The simulation step return as (observations, rewards, dones, infos). """ if not self._is_setup: raise SMARTSNotSetupError("Must call reset() or setup() before stepping.") self._check_valid() assert not ( self._fixed_timestep_sec and time_delta_since_last_step ), "cannot switch from fixed- to variable-time steps mid-simulation" try: with timeit("Last SMARTS Simulation Step", self._log.info): return self._step(agent_actions, time_delta_since_last_step) except (KeyboardInterrupt, SystemExit): # ensure we clean-up if the user exits the simulation self._log.info("Simulation was interrupted by the user.") self.destroy() raise # re-raise the KeyboardInterrupt except RendererException: self.destroy() raise # re-raise renderer not found exceptions without retrying. except Exception as e: self._log.error( "Simulation crashed with exception. Attempting to cleanly shutdown." ) self._log.exception(e) if not config().get_setting("core", "debug", cast=bool): self.destroy() raise # re-raise
def _check_if_acting_on_active_agents(self, agent_actions): for agent_id in agent_actions.keys(): if agent_id not in self._agent_manager.ego_agent_ids: self._log.warning( f"Attempted to perform actions on non-existing agent, {agent_id} " ) def _step(self, agent_actions, time_delta_since_last_step: Optional[float] = None): """Steps through the simulation while applying the given agent actions. Returns the observations, rewards, done, and infos signals. """ # Due to a limitation of our traffic simulator(SUMO) interface(TRACI), we can # only observe traffic state of the previous simulation step. # # To compensate for this, we: # # 0. Advance the simulation clock # 1. Fetch social agent actions # 2. Step all providers and harmonize state # 3. Step bubble manager # 4. Calculate observation and reward # 5. Send observations to social agents # 6. Clear done agents # 7. Perform visualization # # In this way, observations and reward are computed with data that is # consistently with one step of latency and the agent will observe consistent # data. # 0. Advance the simulation clock. # It's been this long since our last step. self._last_dt = time_delta_since_last_step or self._fixed_timestep_sec or 0.1 self._elapsed_sim_time = self._rounder(self._elapsed_sim_time + self._last_dt) # 1. Fetch agent actions with timeit("Fetching agent actions", self._log.debug): all_agent_actions = self._agent_manager.fetch_agent_actions(agent_actions) # 2. Step all providers and harmonize state with timeit("Stepping all providers and harmonizing state", self._log.debug): provider_state = self._step_providers(all_agent_actions) with timeit("Checking if all agents are active", self._log.debug): self._check_if_acting_on_active_agents(agent_actions) # 3. Step bubble manager and trap manager with timeit("Syncing vehicle index", self._log.debug): self._vehicle_index.sync() with timeit("Stepping through bubble manager", self._log.debug): self._bubble_manager.step(self) with timeit("Stepping through actor capture managers", self._log.debug): for actor_capture_manager in self._actor_capture_managers: actor_capture_manager.step(self) # 4. Calculate observation and reward # We pre-compute vehicle_states here because we *think* the users will # want these during their observation/reward computations. # This is a hack to give us some short term perf wins. Longer term we # need to expose better support for batched computations with timeit("Syncing provider state", self._log.debug): self._sync_smarts_and_provider_actor_states(provider_state) self._sensor_manager.clean_up_sensors_for_actors( set(v.actor_id for v in self._vehicle_states), renderer=self.renderer_ref, ) # Reset frame state try: del self.cached_frame except AttributeError: pass self.cached_frame # Agents with timeit("Stepping through sensors", self._log.debug): self._sensor_manager.step(self.cached_frame, self.renderer_ref) if self.renderer_ref: # runs through the render pipeline (for camera-based sensors) # MUST perform this after _sensor_manager.step() above, and before observe() below, # so that all updates are ready before rendering happens per with timeit("Syncing the renderer", self._log.debug): self.renderer_ref.sync(self.cached_frame) # TODO OBSERVATIONS: this needs to happen between the base and rendered observations # with timeit("Running through the render pipeline", self._log.debug): # self.renderer_ref.render() # TODO OBSERVATIONS: Need to split the observation generation with timeit("Calculating observations and rewards", self._log.debug): observations, rewards, scores, dones = self._agent_manager.observe() with timeit("Filtering response for ego", self._log.debug): response_for_ego = self._agent_manager.filter_response_for_ego( (observations, rewards, scores, dones) ) # 5. Send observations to social agents with timeit("Sending observations to social agents", self._log.debug): self._agent_manager.send_observations_to_social_agents(observations) # 6. Clear done agents with timeit("Clearing done agents", self._log.debug): self._teardown_done_agents_and_vehicles(dones) # 7. Perform visualization with timeit("Trying to emit the envision state", self._log.debug): self._try_emit_envision_state(provider_state, observations, scores) with timeit("Trying to emit the visdom observations", self._log.debug): self._try_emit_visdom_obs(observations) observations, rewards, scores, dones = response_for_ego extras = dict(scores=scores) self._step_count += 1 return observations, rewards, dones, extras def _teardown_done_agents_and_vehicles(self, dones): def done_vehicle_ids(dones): vehicle_ids = set() for agent_id, done in dones.items(): if self._agent_manager.is_boid_agent(agent_id): vehicle_ids.update(id_ for id_ in done if done[id_]) elif done: ids = self._vehicle_index.vehicle_ids_by_owner_id(agent_id) # 0 if shadowing, 1 if active assert len(ids) <= 1, f"{len(ids)} <= 1" vehicle_ids.update(ids) return vehicle_ids def done_agent_ids(dones): agent_ids = set() for agent_id, done in dones.items(): if self._agent_manager.is_boid_agent(agent_id): if not self.agent_manager.is_boid_keep_alive_agent( agent_id ) and all(dones[agent_id].values()): agent_ids.add(agent_id) elif done: agent_ids.add(agent_id) return agent_ids # XXX: These can not be put inline because we do queries that must proceed # the actual teardown. agents_to_teardown = done_agent_ids(dones) self._agent_manager.teardown_ego_agents(agents_to_teardown) self._agent_manager.teardown_social_agents(agents_to_teardown) vehicles_to_teardown = done_vehicle_ids(dones) self._teardown_vehicles(vehicles_to_teardown)
[docs] def reset( self, scenario: Scenario, start_time: float = 0.0 ) -> Dict[str, Observation]: """Reset the simulation, reinitialize with the specified scenario. Then progress the simulation up to the first time an agent returns an observation, or `start_time` if there are no agents in the simulation. Args: scenario (smarts.core.scenario.Scenario): The scenario to reset the simulation with. start_time (float): The initial amount of simulation time to skip. This has implications on all time dependent systems. .. note:: SMARTS simulates a step and then updates vehicle control. If you want a vehicle to enter at exactly ``0.3`` with a step of ``0.1`` it means the simulation should start at ``start_time==0.2``. Returns: Dict[str, Observation]. This observation is as follows... - If no agents: the initial simulation observation at `start_time` - If agents: the first step of the simulation with an agent observation """ tries = config()("core", "reset_retries", cast=int) + 1 first_exception = None for _ in range(tries): try: self._resetting = True return self._reset(scenario, start_time) except Exception as err: if not first_exception: first_exception = err finally: self._resetting = False self._log.error("Failed to successfully reset after %i tries.", tries) raise first_exception
def _reset(self, scenario: Scenario, start_time: float): self._check_valid() self._total_sim_time += self._elapsed_sim_time self._elapsed_sim_time = max(0, start_time) # The past is not allowed self._step_count = 0 if ( scenario == self._scenario and self._reset_agents_only and not self._reset_required ): vehicle_ids_to_teardown = set() agent_ids = self._agent_manager.teardown_ego_agents() agent_ids |= self.agent_manager.teardown_social_agents() for agent_id in agent_ids: ids = self._vehicle_index.vehicle_ids_by_owner_id(agent_id) vehicle_ids_to_teardown |= set(ids) self._teardown_vehicles(set(vehicle_ids_to_teardown)) self._reset_providers() for actor_capture_manager in self._actor_capture_managers: actor_capture_manager.reset(scenario, self) self._agent_manager.init_ego_agents() if self._renderer: self._renderer.reset() else: self.teardown() self._reset_providers() self.setup(scenario) if not self.local_constants.road_map.is_same_map(scenario.map_spec): del self.local_constants self.local_constants # Tell history provide to ignore vehicles if we have assigned mission to them self._traffic_history_provider.set_replaced_ids( m.vehicle_spec.veh_id for m in scenario.missions.values() if m and m.vehicle_spec ) self._reset_required = False self._vehicle_states = [v.state for v in self._vehicle_index.vehicles] try: del self.cached_frame except AttributeError: pass observations, _, _, _ = self._agent_manager.observe() observations_for_ego = self._agent_manager.reset_agents(observations) # Visualization self._try_emit_visdom_obs(observations) while len(self._agent_manager.ego_agent_ids) and len(observations_for_ego) < 1: observations_for_ego, _, _, _ = self.step({}) return observations_for_ego
[docs] def setup(self, scenario: Scenario): """Setup the next scenario.""" self._check_valid() self._scenario = scenario self._setup_bullet_client(self._bullet_client) provider_state = self._setup_providers(self._scenario) self._vehicle_index.load_vehicle_definitions_list( scenario.vehicle_definitions_filepath ) self._agent_manager.setup_agents() self._bubble_manager = BubbleManager(scenario.bubbles, scenario.road_map) for actor_capture_manager in self._actor_capture_managers: actor_capture_manager.reset(scenario, self) if self._renderer or any( a.requires_rendering for a in self._agent_manager.agent_interfaces.values() ): self.renderer.setup(scenario) self._harmonize_providers(provider_state) self._last_provider_state = provider_state self._is_setup = True
[docs] def add_provider( self, provider: Provider, recovery_flags: ProviderRecoveryFlags = ProviderRecoveryFlags.EXPERIMENT_REQUIRED, ): """Add a provider to the simulation. A provider is a co-simulator conformed to a common interface. """ self._check_valid() assert isinstance(provider, Provider) provider.recovery_flags = recovery_flags self._provider_suite.insert(provider) if isinstance(provider, TrafficProvider): provider.set_manager(self)
[docs] def remove_provider(self, requested_type_or_provider: Union[type, Provider]): """Remove a provider from the simulation. Args: requested_type (type | Provider): The type of the provider to remove or provider to remove. Returns: Optional[Provider]: The provider that was removed. """ self._check_valid() out_provider = None if isinstance(requested_type_or_provider, type): out_provider = self._provider_suite.remove_by_type( requested_type_or_provider ) elif isinstance(requested_type_or_provider, Provider): out_provider = self._provider_suite.remove(requested_type_or_provider) return out_provider
[docs] def switch_ego_agents(self, agent_interfaces: Dict[str, AgentInterface]): """Change the ego agents in the simulation. Effective on the next reset.""" self._check_valid() self._agent_manager.switch_initial_agents(agent_interfaces) self._is_setup = False
[docs] def add_agent_with_mission( self, agent_id: str, agent_interface: AgentInterface, mission: NavigationMission ): """Add an agent to the simulation. The simulation will attempt to provide a vehicle for the agent. """ self._check_valid() # TODO: check that agent_id isn't already used... if self._trap_manager.add_trap_for_agent( agent_id, mission, self.road_map, self.elapsed_sim_time ): self._agent_manager.add_ego_agent(agent_id, agent_interface) else: self._log.warning( f"Unable to add entry trap for new agent '{agent_id}' with mission." )
[docs] def add_agent_and_switch_control( self, vehicle_id: str, agent_id: str, agent_interface: AgentInterface, mission: NavigationMission, ) -> Vehicle: """Add the new specified ego agent and then take over control of the specified vehicle.""" self._check_valid() self.agent_manager.add_ego_agent(agent_id, agent_interface, for_trap=False) vehicle = self.switch_control_to_agent( vehicle_id, agent_id, mission, recreate=False, is_hijacked=True ) self.create_vehicle_in_providers(vehicle, agent_id, True) return vehicle
[docs] def switch_control_to_agent( self, vehicle_id: str, agent_id: str, mission: NavigationMission, recreate: bool, is_hijacked: bool, ) -> Vehicle: """Give control of the specified vehicle to the given agent. It is not possible to take over a vehicle already controlled by another agent. """ self._check_valid() assert not self.vehicle_index.vehicle_is_hijacked( vehicle_id ), f"Vehicle has already been hijacked: {vehicle_id}" assert not vehicle_id in self.vehicle_index.agent_vehicle_ids(), ( f"`{agent_id}` can't hijack vehicle that is already controlled by an agent" f" `{self.agent_manager.agent_for_vehicle(vehicle_id)}`: {vehicle_id}" ) # Switch control to agent plan = Plan(self.road_map, mission) interface = self.agent_manager.agent_interface_for_agent_id(agent_id) self.vehicle_index.start_agent_observation( self, vehicle_id, agent_id, interface, plan, initialize_sensors=not recreate, ) vehicle = self.vehicle_index.switch_control_to_agent( self, vehicle_id, agent_id, boid=False, recreate=recreate, hijacking=is_hijacked, agent_interface=interface, ) return vehicle
[docs] def provider_for_actor(self, actor_id: str) -> Optional[Provider]: for provider in self.providers: if provider.manages_actor(actor_id): return provider return None
def _remove_vehicle_from_providers(self, vehicle_id: str): for provider in self.providers: provider.remove_actor(vehicle_id)
[docs] def create_vehicle_in_providers( self, vehicle: Vehicle, agent_id: str, is_ego: bool = False, ): """Notify providers of the existence of an agent-controlled vehicle, one of which should assume management of it.""" self._check_valid() prev_provider: Optional[Provider] = self.provider_for_actor(vehicle.id) self._stop_managing_with_providers(vehicle.id) role = ActorRole.EgoAgent if is_ego else ActorRole.SocialAgent interface = self.agent_manager.agent_interface_for_agent_id(agent_id) for provider in self.providers: if interface.action in provider.actions: state = VehicleState( actor_id=vehicle.id, source=provider.source_str, role=role, vehicle_config_type="passenger", pose=vehicle.pose, dimensions=vehicle.chassis.dimensions, speed=vehicle.speed, ) if provider.can_accept_actor(state): # just use the first provider we find that accepts it # (note that the vehicle will already have a mission plan # registered for it in its sensor state in the vehicle_index.) provider.add_actor(state, prev_provider) return # there should always be an AgentsProvider present, so we just assert here assert ( False ), f"could not find a provider to accept vehicle {vehicle.id} for agent {agent_id} with role={role.name}"
[docs] def vehicle_exited_bubble( self, vehicle_id: str, agent_id: str, teardown_agent: bool ): """Bubbles call this when a vehicle is exiting the bubble. Will try to find a new provider for the vehicle if necessary.""" original_agent_id = agent_id agent_id = None # FIXME: This only gets the first shadow agent and this shadow agent is not specific to a bubble!!!!!! shadow_agent_id = self._vehicle_index.shadower_id_from_vehicle_id(vehicle_id) if shadow_agent_id is not None: assert original_agent_id == shadow_agent_id if self._vehicle_index.vehicle_is_hijacked(vehicle_id): agent_id = self._vehicle_index.owner_id_from_vehicle_id(vehicle_id) assert agent_id == original_agent_id self._log.debug( "agent=%s relinquishing vehicle=%s (shadow_agent=%s)", agent_id, vehicle_id, shadow_agent_id, ) state, route = self._vehicle_index.relinquish_agent_control( self, vehicle_id, self.road_map ) new_prov = self._agent_releases_actor(agent_id, state, teardown_agent) if ( route is not None and route.road_length > 0 and isinstance(new_prov, TrafficProvider) ): new_prov.update_route_for_vehicle(vehicle_id, route) if shadow_agent_id: self._log.debug( "shadow_agent=%s will stop shadowing vehicle=%s", shadow_agent_id, vehicle_id, ) self._vehicle_index.stop_shadowing(shadow_agent_id, vehicle_id) if teardown_agent: self.teardown_social_agents([shadow_agent_id]) if self._vehicle_index.shadower_id_from_vehicle_id(vehicle_id) is None: self._sensor_manager.remove_actor_sensors_by_actor_id(vehicle_id)
def _agent_releases_actor( self, agent_id: str, state: ActorState, teardown_agent: bool, ) -> Optional[Provider]: """Find a new provider for an actor previously managed by an agent. Returns the new provider or None if a suitable one could not be found.""" current_provider = self.provider_for_actor(state.actor_id) new_provider = self.provider_releases_actor(current_provider, state) if teardown_agent: self.teardown_social_agents([agent_id]) return new_provider
[docs] def provider_relinquishing_actor( self, current_provider: Optional[Provider], state: ActorState ) -> Tuple[Optional[Provider], ActorProviderTransition]: # now try to find one who will take it... if isinstance(state, VehicleState): state.role = ActorRole.Social # XXX ASSUMPTION: might use Unknown instead? new_provider = None for provider in self.providers: if provider is current_provider: continue if provider.can_accept_actor(state): # Here we just use the first provider we find that accepts it. # If we want to give preference to, say, Sumo over SMARTS traffic, # then we should ensure that Sumo comes first in the traffic_sims # list we pass to SMARTS __init__(). new_provider = provider break actor_transition = ActorProviderTransition() object.__setattr__(actor_transition, "current_provider", current_provider) object.__setattr__(actor_transition, "actor_state", state) return new_provider, actor_transition
[docs] def provider_removing_actor(self, provider: Optional[Provider], actor_id: str): # Note: for vehicles, pybullet_provider_sync() will also call teardown # when it notices a social vehicle has exited the simulation. self._teardown_vehicles([actor_id])
def _setup_bullet_client(self, client: bc.BulletClient): client.resetSimulation() client.configureDebugVisualizer( pybullet.COV_ENABLE_GUI, 0 ) # pylint: disable=no-member max_pybullet_freq = config()( "physics", "max_pybullet_freq", default=MAX_PYBULLET_FREQ, cast=int ) # PyBullet defaults the timestep to 240Hz. Several parameters are tuned with # this value in mind. For example the number of solver iterations and the error # reduction parameters (erp) for contact, friction and non-contact joints. # Attempting to get around this we set the number of substeps so that # timestep * substeps = 240Hz. Bullet (C++) does something to this effect as # well (https://git.io/Jvf0M), but PyBullet does not expose it. # But if our timestep is variable (due to being externally driven) # then we will step pybullet multiple times ourselves as necessary # to account for the time delta on each SMARTS step. self._pybullet_period = ( self._fixed_timestep_sec if self._fixed_timestep_sec else 1 / max_pybullet_freq ) client.setPhysicsEngineParameter( fixedTimeStep=self._pybullet_period, numSubSteps=int(self._pybullet_period * max_pybullet_freq), numSolverIterations=10, solverResidualThreshold=0.001, # warmStartingFactor=0.99 ) client.setGravity(0, 0, -9.8) self._map_bb = None self._setup_pybullet_ground_plane(client) def _setup_pybullet_ground_plane(self, client: bc.BulletClient): plane_path = self._scenario.plane_filepath if not os.path.exists(plane_path): with pkg_resources.path(assets, "plane.urdf") as path: plane_path = str(path.absolute()) if not self._map_bb: self._map_bb = self.road_map.bounding_box if self._map_bb: # 1e6 is the default value for plane length and width in smarts/assets/plane.urdf. DEFAULT_PLANE_DIM = 1e6 ground_plane_scale = ( 2.2 * max(self._map_bb.length, self._map_bb.width) / DEFAULT_PLANE_DIM ) ground_plane_center = self._map_bb.center else: # first step on undefined map, just use a big scale (1e6). # it should get updated as soon as vehicles are added... ground_plane_scale = 1.0 ground_plane_center = (0, 0, 0) if self._ground_bullet_id is not None: client.removeBody(self._ground_bullet_id) self._ground_bullet_id = None self._ground_bullet_id = client.loadURDF( plane_path, useFixedBase=True, basePosition=ground_plane_center, globalScaling=ground_plane_scale, )
[docs] def teardown(self): """Clean up episode resources.""" if self._agent_manager is not None: self._agent_manager.teardown() if self._vehicle_index is not None: self._vehicle_index.teardown(self.renderer_ref) if self._sensor_manager is not None: self._sensor_manager.teardown(self.renderer_ref) if self._bullet_client is not None: self._bullet_client.resetSimulation() if self._renderer is not None: self._renderer.teardown() self._teardown_providers() if self._bubble_manager is not None: self._bubble_manager.teardown() self._bubble_manager = None for actor_capture_manager in self._actor_capture_managers: actor_capture_manager.teardown() if self._trap_manager is not None: self._trap_manager = None self._ground_bullet_id = None self._is_setup = False
[docs] def destroy(self): """Destroy the simulation. Cleans up all remaining simulation resources.""" if self._is_destroyed: return self.teardown() if self._envision: self._envision.teardown() if self._visdom: self._visdom.teardown() if self._agent_manager is not None: self._agent_manager.destroy() self._agent_manager = None if self._vehicle_index is not None: self._vehicle_index = None for traffic_sim in self._provider_suite.get_all_by_type(TrafficProvider): if traffic_sim: traffic_sim.destroy() self._provider_suite.clear_type(TrafficProvider) if self._renderer is not None: self._renderer.destroy() self._renderer = None if self._bullet_client is not None: self._bullet_client.disconnect() self._bullet_client = None self._is_destroyed = True
def _check_valid(self): if self._is_destroyed: raise SMARTSDestroyedError( "The current SMARTS instance has already been destroyed." ) def __del__(self): try: self.destroy() except (TypeError, AttributeError) as e: # This is a print statement because the logging module may be deleted at program exit. try: exception = SMARTSDestroyedError( "ERROR: A SMARTS instance may have been deleted by gc before a call to destroy." " Please explicitly call `del obj` or `SMARTS.destroy()` to make this error" " go away.", e, ) except (TypeError, KeyboardInterrupt): return raise exception def _teardown_vehicles(self, vehicle_ids): self._vehicle_index.teardown_vehicles_by_vehicle_ids( vehicle_ids, self.renderer_ref ) self._clear_collisions(vehicle_ids) for v_id in vehicle_ids: self._remove_vehicle_from_providers(v_id)
[docs] def attach_sensors_to_vehicles( self, agent_interface: AgentInterface, vehicle_ids, overwrite_sensors=False, reset_sensors=False, ): """Set the specified vehicles with the sensors needed to satisfy the specified agent interface. See :attr:`smarts.core.smarts.SMARTS.prepare_observe_from`. Args: agent_interface (AgentInterface): The minimum interface generating the observations. vehicle_ids (Sequence[str]): The vehicles to target. overwrite_sensors (bool, optional): If to replace existing sensors (USE CAREFULLY). Defaults to False. reset_sensors (bool, optional): If to remove all existing sensors before adding sensors (USE **VERY** CAREFULLY). Defaults to False. """ self.prepare_observe_from( vehicle_ids, agent_interface, overwrite_sensors, reset_sensors )
[docs] def prepare_observe_from( self, vehicle_ids: Sequence[str], interface: AgentInterface, overwrite_sensors, reset_sensors, ): """Assigns the given vehicle sensors as is described by the agent interface. Args: vehicle_ids (Sequence[str]): The vehicles to target. interface (AgentInterface): The minimum interface generating the observations. overwrite_sensors (bool, optional): If to replace existing sensors (USE CAREFULLY). reset_sensors (bool, optional): If to remove all existing sensors (USE **VERY** CAREFULLY). """ self._check_valid() for v_id in vehicle_ids: v = self._vehicle_index.vehicle_by_id(v_id) Vehicle.attach_sensors_to_vehicle( self.sensor_manager, self, v, interface, replace=overwrite_sensors, reset_sensors=reset_sensors, )
[docs] def observe_from( self, vehicle_ids: Sequence[str], interface: AgentInterface, ) -> Tuple[Dict[str, Observation], Dict[str, bool]]: """Generate observations from the specified vehicles. Args: vehicle_ids (Sequence[str]): The vehicles to target. interface (AgentInterface): The intended interface generating the observations (this may be ignored.) Returns: Tuple[Dict[str, Observation], Dict[str, bool]]: A dictionary of observations and the hypothetical dones. """ self._check_valid() vehicles: Dict[str, Vehicle] = { v_id: self.vehicle_index.vehicle_by_id(v_id) for v_id in vehicle_ids } sensor_states = { vehicle.id: self._sensor_manager.sensor_state_for_actor_id(vehicle.id) or SensorState.invalid() for vehicle in vehicles.values() } return self.sensor_manager.observe_batch( self.cached_frame, self.local_constants, interface, sensor_states, vehicles, self.renderer_ref, self.bc, )
@property def renderer(self) -> RendererBase: """The renderer singleton. On call, the sim will attempt to create it if it does not exist.""" if not self._renderer: try: from smarts.core.renderer_base import DEBUG_MODE from smarts.p3d.renderer import Renderer self._renderer = Renderer(self._sim_id, debug_mode=DEBUG_MODE.ERROR) except ImportError as exc: raise RendererException.required_to("use camera observations") from exc except Exception as exc: self._renderer = None raise RendererException("Unable to create renderer.") from exc if not self._renderer.is_setup: if self._scenario: self._renderer.setup(self._scenario) self._vehicle_index.begin_rendering_vehicles(self._renderer) assert self._renderer is not None return self._renderer @property def renderer_ref(self) -> Optional[RendererBase]: """Get the reference of the renderer. This can be `None`.""" return self._renderer @property def is_rendering(self) -> bool: """If the simulation has image rendering active.""" return self._renderer is not None @property def road_stiffness(self) -> Any: """The stiffness of the road.""" return self._bullet_client.getDynamicsInfo(self._ground_bullet_id, -1)[9] @property def dynamic_action_spaces(self) -> Set[ActionSpaceType]: """The set of vehicle action spaces that use dynamics (physics).""" return self._agent_physics_provider.actions @property def traffic_sims(self) -> List[TrafficProvider]: """The underlying traffic simulations.""" return self._provider_suite.get_all_by_type(TrafficProvider) @property def traffic_history_provider(self) -> TrafficHistoryProvider: """The source of any traffic history data.""" return self._traffic_history_provider @property def road_map(self) -> RoadMap: """The road map api which allows lookup of road features.""" return self.scenario.road_map @property def external_provider(self) -> ExternalProvider: """The external provider that can be used to inject vehicle states directly.""" return self._external_provider @property def bc(self): """The bullet physics client instance.""" return self._bullet_client @property def envision(self) -> Optional[EnvisionClient]: """The envision instance""" return self._envision @property def step_count(self) -> int: """The number of steps since the last reset.""" return self._step_count @property def elapsed_sim_time(self) -> float: """Elapsed time since simulation start.""" return self._elapsed_sim_time @property def version(self) -> str: """SMARTS version.""" return VERSION
[docs] def teardown_social_agents(self, agent_ids: Iterable[str]): """Tear-down agents in the given sequence. Args: agent_ids: A sequence of agent ids to terminate and release. """ agents_to_teardown = { id_ for id_ in agent_ids if not self.agent_manager.is_boid_agent(id_) or self.agent_manager.is_boid_done(id_) } self.agent_manager.teardown_social_agents(filter_ids=agents_to_teardown)
[docs] def teardown_social_agents_without_actors(self, agent_ids: Iterable[str]): """Tear-down agents in the given list that have no actors registered as controlled-by or shadowed-by (for each given agent.) Args: agent_ids: Sequence of agent ids """ self._check_valid() original_agents = set(agent_ids) agents_to_teardown = { agent_id for agent_id in original_agents # Only clean-up when there is no actor association left if len( self._vehicle_index.vehicles_by_owner_id( agent_id, include_shadowers=True ) ) == 0 } if self._log.isEnabledFor(logging.WARNING): skipped_agents = original_agents - agents_to_teardown if len(skipped_agents) > 0: self._log.warning( "Some agents were skipped because they still had vehicles: %s", skipped_agents, ) self.teardown_social_agents(agent_ids=agents_to_teardown)
def _teardown_vehicles_and_agents(self, vehicle_ids): shadow_and_controlling_agents = set() for vehicle_id in vehicle_ids: agent_id = self._vehicle_index.owner_id_from_vehicle_id(vehicle_id) if agent_id: shadow_and_controlling_agents.add(agent_id) shadow_agent_id = self._vehicle_index.shadower_id_from_vehicle_id( vehicle_id ) if shadow_agent_id: shadow_and_controlling_agents.add(shadow_agent_id) self._vehicle_index.teardown_vehicles_by_vehicle_ids( vehicle_ids, self.renderer_ref ) self.teardown_social_agents_without_actors(shadow_and_controlling_agents) # XXX: don't remove vehicle from its (traffic) Provider here, as it may be being teleported # (and needs to remain registered in Traci during this step). def _pybullet_provider_sync(self, provider_state: ProviderState): current_actor_ids = {v.actor_id for v in provider_state.actors} previous_sv_ids = self._vehicle_index.social_vehicle_ids() exited_actors = previous_sv_ids - current_actor_ids self._teardown_vehicles_and_agents(exited_actors) # Update our pybullet world given this provider state dt = provider_state.dt or self._last_dt for vehicle in provider_state.actors: if not isinstance(vehicle, VehicleState): continue vehicle_id = vehicle.actor_id # Either this is an agent vehicle (ego or social), or it is a social vehicle (traffic). # If it's controlled by an agent, then its state will have been updated # via perform_agent_actions() already (through an appropriate controller). # So here, we just deal with social (traffic) vehicles... if vehicle_id not in self._vehicle_index.agent_vehicle_ids(): if vehicle_id in self._vehicle_index.social_vehicle_ids(): social_vehicle = self._vehicle_index.vehicle_by_id(vehicle_id) else: # It is a new social vehicle we have not seen yet. # Create it's avatar. social_vehicle = self._vehicle_index.build_social_vehicle( sim=self, vehicle_state=vehicle, owner_id="", vehicle_id=vehicle_id, ) if not vehicle.updated: # Note: update_state() happens *after* pybullet has been stepped. social_vehicle.update_state(vehicle, dt=dt) def _step_pybullet(self): self._bullet_client.stepSimulation() pybullet_substeps = max(1, round(self._last_dt / self._pybullet_period)) - 1 for _ in range(pybullet_substeps): for vehicle in self._vehicle_index.vehicles: vehicle.chassis.reapply_last_control() self._bullet_client.stepSimulation() for vehicle in self._vehicle_index.vehicles: vehicle.step(self._elapsed_sim_time) @property def vehicle_index(self) -> VehicleIndex: """The vehicle index for direct vehicle manipulation.""" return self._vehicle_index @property def agent_manager(self) -> AgentManager: """The agent manager for direct agent manipulation.""" return self._agent_manager @property def sensor_manager(self) -> SensorManager: """The sensor manager for direct manipulation.""" return self._sensor_manager @property def providers(self) -> List[Provider]: """The current providers controlling actors within the simulation.""" return self._provider_suite.instances
[docs] def get_provider_by_type(self, requested_type) -> Optional[Provider]: """Get The first provider that matches the requested type.""" self._check_valid() for provider in self.providers: if isinstance(provider, requested_type): return provider return None
[docs] def get_provider_by_id(self, requested_id) -> Optional[Provider]: """Find the current provider by its identifier. Args: requested_id (str): The provider identifier. Returns: Optional[Provider]: The provider if it exists, otherwise `None`. """ self._check_valid() for provider in self.providers: if provider.provider_id() == requested_id: return provider return None
def _setup_providers(self, scenario) -> ProviderState: provider_state = ProviderState() for provider in self.providers: try: new_provider_state = provider.setup(scenario) except Exception as provider_error: new_provider_state = self._handle_provider(provider, provider_error) provider_state.merge(new_provider_state) return provider_state def _teardown_providers(self): for provider in self.providers: provider.teardown() self._last_provider_state = None def _harmonize_providers(self, provider_state: ProviderState): for provider in self.providers: try: provider.sync(provider_state) except Exception as provider_error: self._handle_provider(provider, provider_error) self._pybullet_provider_sync(provider_state) def _reset_providers(self): for provider in self.providers: try: provider.reset() except Exception as provider_error: self._handle_provider(provider, provider_error) def _handle_provider( self, provider: Provider, provider_error ) -> Optional[ProviderState]: provider_problem = bool(provider_error or not provider.connected) if not provider_problem: return None recovery_flags = provider.recovery_flags recovered = False provider_state = None if recovery_flags & ProviderRecoveryFlags.ATTEMPT_RECOVERY: provider_state, recovered = provider.recover( self._scenario, self.elapsed_sim_time, provider_error ) provider_state = provider_state or ProviderState() if recovered: return provider_state if recovery_flags & ProviderRecoveryFlags.RELINQUISH_ACTORS: # see if any other provider can take control of its actors... self._log.warning( "attempting to transfer actors from {provider.source_str} to other providers..." ) for actor in provider_state.actors: self.provider_releases_actor(provider, actor) if recovery_flags & ProviderRecoveryFlags.EPISODE_REQUIRED: self._reset_required = True if self._resetting: self._log.error( f"`Provider {provider.source_str} has crashed during reset`" ) raise provider_error return provider_state elif recovery_flags & ProviderRecoveryFlags.EXPERIMENT_REQUIRED: raise provider_error # default to re-raise error raise provider_error def _provider_actions( self, provider: Provider, actions: Dict[str, Any] ) -> Tuple[Dict[str, Any], Dict[str, Any]]: agent_actions = dict() vehicle_actions = dict() for agent_id, action in actions.items(): # TAI: reconsider include_shadowers = True vehicles = self._vehicle_index.vehicles_by_owner_id( agent_id, include_shadowers=True ) if not vehicles: continue interface = self._agent_manager.agent_interface_for_agent_id(agent_id) assert interface, f"agent {agent_id} has no interface" if interface.action not in provider.actions: continue assert isinstance(provider, AgentsProvider) agent_actions[agent_id] = action if self._agent_manager.is_boid_agent(agent_id): vehicle_ids = [v.id for v in vehicles] for vehicle_id, vehicle_action in action.items(): assert vehicle_id in vehicle_ids vehicle_actions[vehicle_id] = vehicle_action else: assert len(vehicles) == 1 vehicle_actions[vehicles[0].id] = action return agent_actions, vehicle_actions def _step_providers(self, actions) -> ProviderState: provider_vehicle_actions = dict() for provider in self.providers: with timeit( f"Performing actions on {provider.__class__.__name__}", self._log.debug ): agent_actions, vehicle_actions = self._provider_actions( provider, actions ) provider_vehicle_actions[provider] = vehicle_actions if isinstance(provider, AgentsProvider): provider.perform_agent_actions(agent_actions) with timeit("Stepping physics", self._log.debug): self._check_ground_plane() self._step_pybullet() self._process_collisions() accumulated_provider_state = ProviderState() agent_vehicle_ids = self._vehicle_index.agent_vehicle_ids() for provider in self.providers: with timeit(f"Stepping {provider.__class__.__name__}", self._log.debug): try: provider_state = provider.step( provider_vehicle_actions[provider], self._last_dt, self._elapsed_sim_time, ) except Exception as provider_error: provider_state = self._handle_provider(provider, provider_error) raise # by this point, "stop_managing()" should have been called for the hijacked vehicle on all TrafficProviders assert not isinstance( provider, TrafficProvider ) or not provider_state.intersects( agent_vehicle_ids ), f"{agent_vehicle_ids} in {provider_state.actors}" accumulated_provider_state.merge(provider_state) with timeit("Harmonizing provider state", self._log.debug): self._harmonize_providers(accumulated_provider_state) return accumulated_provider_state def _sync_smarts_and_provider_actor_states( self, external_provider_state: ProviderState ): self._last_provider_state = external_provider_state self._vehicle_states = [v.state for v in self._vehicle_index.vehicles] self._last_provider_state.replace_actor_type(self._vehicle_states, VehicleState) @property def should_reset(self): """If the simulation requires a reset.""" return self._reset_required @property def resetting(self) -> bool: """If the simulation is currently resetting""" return self._resetting @property def scenario(self) -> Scenario: """The current simulation scenario.""" return self._scenario @property def fixed_timestep_sec(self) -> float: """The simulation fixed time-step.""" # May be None if time deltas are externally driven return self._fixed_timestep_sec @fixed_timestep_sec.setter def fixed_timestep_sec(self, fixed_timestep_sec: float): if not fixed_timestep_sec: max_pybullet_freq = config()( "physics", "max_pybullet_freq", default=MAX_PYBULLET_FREQ, cast=int ) self._rounder = rounder_for_dt(round(1 / max_pybullet_freq, 6)) else: self._rounder = rounder_for_dt(fixed_timestep_sec) self._fixed_timestep_sec = fixed_timestep_sec self._is_setup = False # need to re-setup pybullet @property def last_dt(self) -> float: """The last delta time.""" assert not self._last_dt or self._last_dt > 0 return self._last_dt
[docs] def neighborhood_vehicles_around_vehicle( self, vehicle_id: str, radius: Optional[float] = None ) -> List[VehicleState]: """Find vehicles in the vicinity of the target vehicle.""" self._check_valid() vehicle = self._vehicle_index.vehicle_by_id(vehicle_id) return neighborhood_vehicles_around_vehicle( vehicle.state, self._vehicle_states, radius )
def _clear_collisions(self, vehicle_ids): for vehicle_id in vehicle_ids: self._vehicle_collisions.pop(vehicle_id, None) def _get_pybullet_collisions(self, vehicle_id: str) -> Set[str]: vehicle = self._vehicle_index.vehicle_by_id(vehicle_id) # We are only concerned with vehicle-vehicle collisions return { p.bullet_id for p in vehicle.chassis.contact_points if p.bullet_id != self._ground_bullet_id } def _process_collisions(self): self._vehicle_collisions = dict() for vehicle_id in self._vehicle_index.agent_vehicle_ids(): collidee_bullet_ids = self._get_pybullet_collisions(vehicle_id) if not collidee_bullet_ids: continue vehicle_collisions = self._vehicle_collisions.setdefault(vehicle_id, []) for bullet_id in collidee_bullet_ids: collidee = self._bullet_id_to_vehicle(bullet_id) owner_id = self._vehicle_index.owner_id_from_vehicle_id(collidee.id) collision = Collision( collidee_id=collidee.id, collidee_owner_id=owner_id ) vehicle_collisions.append(collision) traffic_providers = self._provider_suite.get_all_by_type(TrafficProvider)[:] for vehicle_id in self._vehicle_index.social_vehicle_ids(): for provider in traffic_providers: if provider.manages_actor(vehicle_id) and self._get_pybullet_collisions( vehicle_id ): provider.vehicle_collided(vehicle_id) def _bullet_id_to_vehicle(self, bullet_id): for vehicle in self._vehicle_index.vehicles: if bullet_id == vehicle.chassis.bullet_id: return vehicle assert ( False ), f"Only collisions with agent or social vehicles is supported, hit {bullet_id}" def _check_ground_plane(self): rescale_plane = False map_min = np.array(self._map_bb.min_pt)[:2] if self._map_bb else np.array([]) map_max = np.array(self._map_bb.max_pt)[:2] if self._map_bb else np.array([]) for vehicle_id in self._vehicle_index.agent_vehicle_ids(): vehicle = self._vehicle_index.vehicle_by_id(vehicle_id) map_spot = vehicle.pose.point.as_np_array[:2] if len(map_min) == 0: map_min = map_spot rescale_plane = True elif any(map_spot < map_min): map_min = np.minimum(map_spot, map_min) rescale_plane = True if len(map_max) == 0: map_max = map_spot rescale_plane = True elif any(map_spot > map_max): map_max = np.maximum(map_spot, map_max) rescale_plane = True if rescale_plane: MIN_DIM = 500.0 if map_max[0] - map_min[0] < MIN_DIM: map_min[0] -= MIN_DIM map_max[0] += MIN_DIM if map_max[1] - map_min[1] < MIN_DIM: map_min[1] -= MIN_DIM map_max[1] += MIN_DIM self._map_bb = BoundingBox(Point(*map_min), Point(*map_max)) self._log.info( f"rescaling pybullet ground plane to at least {map_min} and {map_max}" ) self._setup_pybullet_ground_plane(self._bullet_client) def _try_emit_envision_state(self, provider_state: ProviderState, obs, scores): if not self._envision: return filter = self._envision.envision_state_filter traffic = {} signals = dict() lane_ids = {} agent_vehicle_ids = self._vehicle_index.agent_vehicle_ids() vt_mapping = { "passenger": envision_types.VehicleType.Car, "bus": envision_types.VehicleType.Bus, "coach": envision_types.VehicleType.Coach, "truck": envision_types.VehicleType.Truck, "trailer": envision_types.VehicleType.Trailer, "motorcycle": envision_types.VehicleType.Motorcycle, "pedestrian": envision_types.VehicleType.Pedestrian, } for v in provider_state.actors: if isinstance(v, SignalState): env_ss = envision_types.SignalLightState.Unknown if v.state == SignalLightState.OFF: env_ss = envision_types.SignalLightState.Off elif v.state & SignalLightState.STOP: env_ss = envision_types.SignalLightState.Stop elif v.state & SignalLightState.CAUTION: env_ss = envision_types.SignalLightState.Caution elif v.state & SignalLightState.GO: env_ss = envision_types.SignalLightState.Go # TODO: eventually do flashing and arrow states too signals[v.actor_id] = envision_types.SignalState( v.actor_id, env_ss, tuple(v.stopping_pos.as_np_array) ) continue if not isinstance(v, VehicleState): continue if v.actor_id in agent_vehicle_ids: # this is an agent controlled vehicle agent_id = self._vehicle_index.owner_id_from_vehicle_id(v.actor_id) is_boid_agent = self._agent_manager.is_boid_agent(agent_id) agent_obs = obs[agent_id] vehicle_obs = agent_obs[v.actor_id] if is_boid_agent else agent_obs if ( filter.simulation_data_filter["lane_ids"].enabled and vehicle_obs.waypoint_paths and len(vehicle_obs.waypoint_paths[0]) > 0 ): lane_ids[agent_id] = vehicle_obs.waypoint_paths[0][0].lane_id if not filter.simulation_data_filter["traffic"].enabled: continue waypoint_paths = [] if ( filter.actor_data_filter["waypoint_paths"].enabled and vehicle_obs.waypoint_paths ): waypoint_paths = vehicle_obs.waypoint_paths road_waypoints = [] if ( filter.actor_data_filter["road_waypoints"].enabled and vehicle_obs.road_waypoints ): road_waypoints = [ path for paths in vehicle_obs.road_waypoints.lanes.values() for path in paths ] # (points, hits, rays), just want points point_cloud = ([], [], []) if filter.actor_data_filter["point_cloud"].enabled: point_cloud = (vehicle_obs.lidar_point_cloud or point_cloud)[0] # TODO: driven path should be read from vehicle_obs driven_path = [] if filter.actor_data_filter["driven_path"].enabled: driven_path = self._vehicle_index.vehicle_by_id( v.actor_id ).driven_path_sensor( filter.actor_data_filter["driven_path"].max_count ) mission_route_geometry = None if self._agent_manager.is_ego(agent_id): actor_type = envision_types.TrafficActorType.Agent if filter.actor_data_filter["mission_route_geometry"].enabled: mission_route_geometry = ( self._sensor_manager.sensor_state_for_actor_id(v.actor_id) .get_plan(self.road_map) .route.geometry ) else: actor_type = envision_types.TrafficActorType.SocialAgent traffic[v.actor_id] = envision_types.TrafficActorState( name=self._agent_manager.agent_name(agent_id), actor_type=actor_type, vehicle_type=envision_types.VehicleType.Car, position=tuple(v.pose.position), heading=float(v.pose.heading), speed=v.speed, actor_id=envision_types.format_actor_id( agent_id, v.actor_id, is_multi=is_boid_agent, ), events=vehicle_obs.events, waypoint_paths=waypoint_paths + road_waypoints, point_cloud=point_cloud, driven_path=driven_path, mission_route_geometry=mission_route_geometry, lane_id=lane_ids.get(agent_id), interest=self.cached_frame.actor_is_interest(v.actor_id), ) elif v.actor_id in self._vehicle_index.social_vehicle_ids(): # this is a social vehicle if filter.simulation_data_filter["traffic"].enabled: veh_type = vt_mapping.get( v.vehicle_config_type if v.vehicle_config_type else v.vehicle_type, envision_types.VehicleType.Car, ) traffic[v.actor_id] = envision_types.TrafficActorState( actor_id=v.actor_id, actor_type=envision_types.TrafficActorType.SocialVehicle, vehicle_type=veh_type, position=tuple(v.pose.position), heading=float(v.pose.heading), speed=v.speed, interest=self.cached_frame.actor_is_interest(v.actor_id), ) bubble_geometry = [] if filter.simulation_data_filter["bubble_geometry"].enabled: bubble_geometry = [ list(bubble.geometry.exterior.coords) for bubble in self._bubble_manager.active_bubbles ] scenario_folder_path = self.scenario._root scenario_name = os.path.split((scenario_folder_path).rstrip("/"))[1] assert ( scenario_name != "" ), f"Scenario name was not properly extracted from the scenario folder path: {scenario_folder_path}" state = envision_types.State( traffic=traffic, signals=signals, scenario_id=self.scenario.scenario_hash, scenario_name=scenario_name, bubbles=bubble_geometry, scores=scores, ego_agent_ids=list(self._agent_manager.ego_agent_ids), frame_time=self._rounder(self._elapsed_sim_time + self._total_sim_time), ) self._envision.send(state) def _try_emit_visdom_obs(self, obs): if not self._visdom: return self._visdom.send(obs) @cached_property def local_constants(self): """Generate the frozen state that should not change until next reset.""" self._check_valid() road_map, road_map_hash = self.scenario.map_spec.builder_fn( self.scenario.map_spec ) return SimulationLocalConstants( road_map=road_map, road_map_hash=road_map_hash, ) @cached_property def cached_frame(self) -> SimulationFrame: """Generate a frozen frame state of the simulation.""" self._check_valid() agent_actor_ids = self.vehicle_index.agent_vehicle_ids() actor_states = self._last_provider_state vehicles = dict(self.vehicle_index.vehicleitems()) vehicle_ids = set(vid for vid in vehicles) return SimulationFrame( actor_states=getattr(actor_states, "actors", {}), agent_interfaces=self.agent_manager.agent_interfaces.copy(), agent_vehicle_controls={ a_id: self.vehicle_index.owner_id_from_vehicle_id(a_id) for a_id in agent_actor_ids }, ego_ids=self.agent_manager.ego_agent_ids, pending_agent_ids=self.agent_manager.pending_agent_ids, elapsed_sim_time=self.elapsed_sim_time, fixed_timestep=self.fixed_timestep_sec, resetting=self.resetting, # road_map = self.road_map, map_spec=self.scenario.map_spec, last_dt=self.last_dt, last_provider_state=self._last_provider_state, step_count=self.step_count, vehicle_collisions=self._vehicle_collisions, vehicle_states={ vehicle_state.actor_id: vehicle_state for vehicle_state in self._vehicle_states }, vehicles_for_agents={ agent_id: self.vehicle_index.vehicle_ids_by_owner_id( agent_id, include_shadowers=True ) for agent_id in self.agent_manager.active_agents }, vehicle_ids=vehicle_ids, vehicle_sensors=self.sensor_manager.sensors_for_actor_ids(vehicle_ids), sensor_states=dict(self.sensor_manager.sensor_states_items()), _ground_bullet_id=self._ground_bullet_id, interest_filter=re.compile( self.scenario.metadata.get("actor_of_interest_re_filter", "") ), )