Source code for smarts.core.vehicle_index

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
.. spelling:word-list::

    shadower
    shadowers
"""
from __future__ import annotations

import logging
from copy import copy, deepcopy
from functools import lru_cache
from io import StringIO
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    FrozenSet,
    Iterable,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import numpy as np
import tableprint as tp

from smarts.core import gen_id
from smarts.core.colors import SceneColors
from smarts.core.coordinates import Dimensions, Pose
from smarts.core.utils import resources
from smarts.core.utils.cache import cache, clear_cache
from smarts.core.utils.strings import truncate
from smarts.core.vehicle_state import VEHICLE_CONFIGS

from .actor import ActorRole
from .chassis import AckermannChassis, BoxChassis
from .controllers import ControllerState
from .road_map import RoadMap
from .sensors import SensorState
from .vehicle import Vehicle

if TYPE_CHECKING:
    from smarts.core import plan
    from smarts.core.agent_interface import AgentInterface
    from smarts.core.controllers.action_space_type import ActionSpaceType
    from smarts.core.renderer_base import RendererBase
    from smarts.core.smarts import SMARTS

    from .vehicle import VehicleState

VEHICLE_INDEX_ID_LENGTH = 128


def _2id(id_: str) -> bytes:
    separator = b"$"
    assert len(id_) <= VEHICLE_INDEX_ID_LENGTH - len(separator), id_

    if not isinstance(id_, bytes):
        id_ = bytes(id_.encode())

    assert isinstance(id_, bytes)
    return (separator + id_).zfill(VEHICLE_INDEX_ID_LENGTH - len(separator))


class _ControlEntity(NamedTuple):
    vehicle_id: bytes
    owner_id: bytes
    role: ActorRole
    shadower_id: bytes
    # Applies to shadower and owner
    # TODO: Consider moving this to an ActorRole field
    is_boid: bool
    is_hijacked: bool
    position: np.ndarray


# TODO: Consider wrapping the controlled_by recarry into a sep state object
#       VehicleIndex can perform operations on. Then we can do diffs of that
#       recarray with subset queries.
[docs]class VehicleIndex: """A vehicle management system that associates owners with vehicles.""" def __init__(self): self._log = logging.getLogger(self.__class__.__name__) self._controlled_by = VehicleIndex._build_empty_controlled_by() # Fixed-length ID to original ID # TODO: This quitely breaks if owner and vehicle IDs are the same. It assumes # global uniqueness. self._2id_to_id: Dict[bytes, str] = {} # {vehicle_id (fixed-length): <Vehicle>} self._vehicles: Dict[bytes, Vehicle] = {} # {vehicle_id (fixed-length): <ControllerState>} self._controller_states: Dict[bytes, ControllerState] = {} # Loaded from yaml file on scenario reset self._vehicle_definitions: resources.VehicleDefinitions = ( resources.VehicleDefinitions({}, "") )
[docs] @classmethod def identity(cls): """Returns an empty identity index.""" return cls()
def __sub__(self, other: VehicleIndex) -> VehicleIndex: vehicle_ids = set(self._controlled_by["vehicle_id"]) - set( other._controlled_by["vehicle_id"] ) vehicle_ids = [self._2id_to_id[id_] for id_ in vehicle_ids] return self._subset(vehicle_ids) def __and__(self, other: VehicleIndex) -> VehicleIndex: vehicle_ids = set(self._controlled_by["vehicle_id"]) & set( other._controlled_by["vehicle_id"] ) vehicle_ids = [self._2id_to_id[id_] for id_ in vehicle_ids] return self._subset(vehicle_ids) def _subset(self, vehicle_ids: Iterable[str]): assert self.vehicle_ids().issuperset( vehicle_ids ), f"{', '.join(list(self.vehicle_ids())[:3])}{', '.join(list(vehicle_ids)[:3])}" b_vehicle_ids = [_2id(id_) for id_ in vehicle_ids] index = VehicleIndex() indices = np.isin( self._controlled_by["vehicle_id"], b_vehicle_ids, assume_unique=True ) index._controlled_by = self._controlled_by[indices] index._2id_to_id = {id_: self._2id_to_id[id_] for id_ in b_vehicle_ids} index._vehicles = {id_: self._vehicles[id_] for id_ in b_vehicle_ids} index._controller_states = { id_: self._controller_states[id_] for id_ in b_vehicle_ids if id_ in self._controller_states } return index def __deepcopy__(self, memo) -> VehicleIndex: result = self.__class__.__new__(self.__class__) memo[id(self)] = result dict_ = copy(self.__dict__) shallow = ["_2id_to_id", "_vehicles", "_controller_states"] shallow += [c for c in dict_ if c.startswith("_cache_decorator_")] for k in shallow: v = dict_.pop(k) setattr(result, k, copy(v)) for k, v in dict_.items(): setattr(result, k, deepcopy(v, memo)) return result @cache def vehicle_ids(self) -> Set[str]: """A set of all unique vehicles ids in the index.""" vehicle_ids = self._controlled_by["vehicle_id"] return {self._2id_to_id[id_] for id_ in vehicle_ids} @cache def agent_vehicle_ids(self) -> Set[str]: """A set of vehicle ids associated with an agent.""" vehicle_ids = self._controlled_by[ (self._controlled_by["role"] == ActorRole.EgoAgent) | (self._controlled_by["role"] == ActorRole.SocialAgent) ]["vehicle_id"] return {self._2id_to_id[id_] for id_ in vehicle_ids} @cache def social_vehicle_ids( self, vehicle_types: Optional[FrozenSet[str]] = None ) -> Set[str]: """A set of vehicle ids associated with traffic vehicles.""" vehicle_ids = self._controlled_by[ self._controlled_by["role"] == ActorRole.Social ]["vehicle_id"] return { self._2id_to_id[id_] for id_ in vehicle_ids if not vehicle_types or self._vehicles[id_].vehicle_type in vehicle_types } @cache def vehicle_is_hijacked_or_shadowed(self, vehicle_id: str) -> Tuple[bool, bool]: """Determine if a vehicle is either taken over by an owner or watched.""" b_vehicle_id = _2id(vehicle_id) v_index = self._controlled_by["vehicle_id"] == b_vehicle_id if not np.any(v_index): return False, False vehicle = self._controlled_by[v_index] assert len(vehicle) == 1 vehicle = vehicle[0] return bool(vehicle["is_hijacked"]), bool(vehicle["shadower_id"]) @cache def vehicle_ids_by_owner_id( self, owner_id: str, include_shadowers: bool = False ) -> List[str]: """Returns all vehicles for the given owner ID as a list. This is most applicable when an agent is controlling multiple vehicles (e.g. with boids). """ b_owner_id = _2id(owner_id) v_index = self._controlled_by["owner_id"] == b_owner_id if include_shadowers: v_index = v_index | (self._controlled_by["shadower_id"] == b_owner_id) b_vehicle_ids = self._controlled_by[v_index]["vehicle_id"] return [self._2id_to_id[id_] for id_ in b_vehicle_ids] @cache def owner_id_from_vehicle_id(self, vehicle_id: str) -> Optional[str]: """Find the owner id associated with the given vehicle.""" b_vehicle_id = _2id(vehicle_id) owner_ids = self._controlled_by[ self._controlled_by["vehicle_id"] == b_vehicle_id ]["owner_id"] if owner_ids: return self._2id_to_id[owner_ids[0]] return None @cache def shadower_id_from_vehicle_id(self, vehicle_id: str) -> Optional[str]: """Find the first shadowing entity watching a vehicle.""" b_vehicle_id = _2id(vehicle_id) shadower_ids = self._controlled_by[ self._controlled_by["vehicle_id"] == b_vehicle_id ]["shadower_id"] if shadower_ids: return self._2id_to_id[shadower_ids[0]] return None @cache def shadower_ids(self) -> Set[str]: """Get all current shadowing entity IDs.""" return set( self._2id_to_id[sa_id] for sa_id in self._controlled_by["shadower_id"] if sa_id not in (b"", None) ) @cache def vehicle_position(self, vehicle_id: str): """Find the position of the given vehicle.""" b_vehicle_id = _2id(vehicle_id) positions = self._controlled_by[ self._controlled_by["vehicle_id"] == b_vehicle_id ]["position"] return positions[0] if len(positions) > 0 else None
[docs] def vehicles_by_owner_id(self, owner_id: str, include_shadowers: bool = False): """Find vehicles associated with the given owner id. Args: owner_id: The owner id to find all associated vehicle ids. include_shadowers: If to include vehicles that the owner is only watching. Returns: A list of associated vehicles. """ vehicle_ids = self.vehicle_ids_by_owner_id(owner_id, include_shadowers) return [self._vehicles[_2id(id_)] for id_ in vehicle_ids]
[docs] def vehicle_is_hijacked(self, vehicle_id: str) -> bool: """Determine if a vehicle is controlled by an owner.""" is_hijacked, _ = self.vehicle_is_hijacked_or_shadowed(vehicle_id) return is_hijacked
[docs] def vehicle_is_shadowed(self, vehicle_id: str) -> bool: """Determine if a vehicle is watched by an owner.""" _, is_shadowed = self.vehicle_is_hijacked_or_shadowed(vehicle_id) return is_shadowed
@property def vehicles(self) -> List[Vehicle]: """A list of all existing vehicles.""" return list(self._vehicles.values()) @cache def vehicleitems(self) -> Iterator[Tuple[str, Vehicle]]: """A list of all vehicle IDs paired with their vehicle.""" return map(lambda x: (self._2id_to_id[x[0]], x[1]), self._vehicles.items()) @cache def vehicle_by_id( self, vehicle_id: str, default: Optional[Union[Vehicle, Ellipsis.__class__]] = ..., ): """Get a vehicle by its id.""" b_vehicle_id = _2id(vehicle_id) if default is ...: return self._vehicles[b_vehicle_id] return self._vehicles.get(b_vehicle_id, default)
[docs] @clear_cache def teardown_vehicles_by_vehicle_ids( self, vehicle_ids: Sequence[str], renderer: Optional[RendererBase] ): """Terminate and remove a vehicle from the index using its id.""" self._log.debug("Tearing down vehicle ids: %s", vehicle_ids) b_vehicle_ids = [_2id(id_) for id_ in vehicle_ids] if len(b_vehicle_ids) == 0: return for b_vehicle_id in b_vehicle_ids: vehicle = self._vehicles.pop(b_vehicle_id, None) if vehicle is not None: vehicle.teardown(renderer=renderer) # popping since sensor_states/controller_states may not include the # vehicle if it's not being controlled by an agent self._controller_states.pop(b_vehicle_id, None) # TODO: This stores agents as well; those aren't being cleaned-up self._2id_to_id.pop(b_vehicle_id, None) remove_vehicle_indices = np.isin( self._controlled_by["vehicle_id"], b_vehicle_ids, assume_unique=True ) self._controlled_by = self._controlled_by[~remove_vehicle_indices]
[docs] def teardown_vehicles_by_owner_ids( self, owner_ids: Iterable[str], renderer: RendererBase, include_shadowing: bool = True, ) -> List[str]: """Terminate and remove all vehicles associated with an owner id.""" vehicle_ids: List[str] = [] for owner_id in owner_ids: vehicle_ids.extend( [v.id for v in self.vehicles_by_owner_id(owner_id, include_shadowing)] ) self.teardown_vehicles_by_vehicle_ids(vehicle_ids, renderer) return vehicle_ids
[docs] @clear_cache def sync(self): """Update the state of the index.""" for vehicle_id, vehicle in self._vehicles.items(): v_index = self._controlled_by["vehicle_id"] == vehicle_id entity = _ControlEntity(*self._controlled_by[v_index][0]) self._controlled_by[v_index] = tuple( entity._replace(position=vehicle.position) )
[docs] @clear_cache def teardown(self, renderer: RendererBase): """Clean up resources, resetting the index.""" self._controlled_by = VehicleIndex._build_empty_controlled_by() for vehicle in self._vehicles.values(): vehicle.teardown(renderer=renderer, exclude_chassis=True) self._vehicles = {} self._controller_states = {} self._2id_to_id = {}
[docs] @clear_cache def start_agent_observation( self, sim: SMARTS, vehicle_id: str, agent_id: str, agent_interface: AgentInterface, plan: "plan.Plan", boid: bool = False, initialize_sensors: bool = True, ): """Associate an agent to a vehicle. Set up any needed sensor requirements.""" b_vehicle_id, b_agent_id = _2id(vehicle_id), _2id(agent_id) self._2id_to_id[b_agent_id] = agent_id vehicle = self._vehicles[b_vehicle_id] sim.sensor_manager.add_sensor_state( vehicle.id, SensorState( agent_interface.max_episode_steps, plan_frame=plan.frame(), ), ) if initialize_sensors: Vehicle.attach_sensors_to_vehicle( sim.sensor_manager, sim, vehicle, agent_interface ) self._controller_states[b_vehicle_id] = ControllerState.from_action_space( agent_interface.action, vehicle.pose, sim ) v_index = self._controlled_by["vehicle_id"] == b_vehicle_id entity = _ControlEntity(*self._controlled_by[v_index][0]) self._controlled_by[v_index] = tuple( entity._replace(shadower_id=b_agent_id, is_boid=boid) ) # XXX: We are not giving the vehicle a chassis here but rather later # when we switch_to_agent_control. This means when control that requires # a chassis comes online, it needs to appropriately initialize # chassis-specific states, such as tire rotation. return vehicle
[docs] @clear_cache def switch_control_to_agent( self, sim: SMARTS, vehicle_id: str, agent_id: str, boid: bool = False, hijacking: bool = False, recreate: bool = False, agent_interface: Optional[AgentInterface] = None, ): """Give control of the specified vehicle to the specified agent. Args: sim: An instance of a SMARTS simulation. vehicle_id: The id of the vehicle to associate. agent_id: The id of the agent to associate. boid: If the agent is acting as a boid agent controlling multiple vehicles. hijacking: If the vehicle has been taken over from another controlling owner. recreate: If the vehicle should be destroyed and regenerated. agent_interface: The agent interface for sensor requirements. """ self._log.debug("Switching control of '%s' to '%s'", vehicle_id, agent_id) b_vehicle_id, b_agent_id = _2id(vehicle_id), _2id(agent_id) if recreate: # XXX: Recreate is presently broken for bubbles because it impacts the # sumo traffic sim sync(...) logic in how it detects a vehicle as # being hijacked vs joining. Presently it's still used for trapping. return self._switch_control_to_agent_recreate( sim, b_vehicle_id, b_agent_id, boid, hijacking ) vehicle = self._vehicles[b_vehicle_id] chassis = None if agent_interface and agent_interface.action in sim.dynamic_action_spaces: vehicle_definition = self._vehicle_definitions.load_vehicle_definition( agent_interface.vehicle_class ) chassis = AckermannChassis( pose=vehicle.pose, bullet_client=sim.bc, vehicle_dynamics_filepath=vehicle_definition.get("dynamics_model"), tire_parameters_filepath=vehicle_definition.get("tire_params"), friction_map=sim.scenario.surface_patches, controller_parameters=self._vehicle_definitions.controller_params_for_vehicle_class( agent_interface.vehicle_class ), chassis_parameters=self._vehicle_definitions.chassis_params_for_vehicle_class( agent_interface.vehicle_class ), initial_speed=vehicle.speed, ) else: chassis = BoxChassis( pose=vehicle.pose, speed=vehicle.speed, dimensions=vehicle.state.dimensions, bullet_client=sim.bc, ) vehicle.swap_chassis(chassis) v_index = self._controlled_by["vehicle_id"] == b_vehicle_id entity = _ControlEntity(*self._controlled_by[v_index][0]) role = ActorRole.SocialAgent if hijacking else ActorRole.EgoAgent self._controlled_by[v_index] = tuple( entity._replace( role=role, owner_id=b_agent_id, shadower_id=b"", is_boid=boid, is_hijacked=hijacking, ) ) return vehicle
[docs] @clear_cache def stop_shadowing(self, shadower_id: str, vehicle_id: Optional[str] = None): """Ends the shadowing by an a shadowing observer. Args: shadower_id (str): Removes this shadowing observer from all vehicles. vehicle_id (str, optional): If given this method removes shadowing from a specific vehicle. Defaults to None. """ shadower_id = _2id(shadower_id) v_index = self._controlled_by["shadower_id"] == shadower_id if vehicle_id: b_vehicle_id = _2id(vehicle_id) # This multiplication finds overlap of "shadower_id" and "vehicle_id" v_index = (self._controlled_by["vehicle_id"] == b_vehicle_id) * v_index for entity in self._controlled_by[v_index]: entity = _ControlEntity(*entity) self._controlled_by[v_index] = tuple(entity._replace(shadower_id=b""))
[docs] @clear_cache def stop_agent_observation(self, vehicle_id: str) -> Vehicle: """Strip all sensors from a vehicle and stop all owners from watching the vehicle.""" b_vehicle_id = _2id(vehicle_id) vehicle = self._vehicles[b_vehicle_id] v_index = self._controlled_by["vehicle_id"] == b_vehicle_id entity = self._controlled_by[v_index][0] entity = _ControlEntity(*entity) self._controlled_by[v_index] = tuple(entity._replace(shadower_id=b"")) return vehicle
[docs] @clear_cache def relinquish_agent_control( self, sim: SMARTS, vehicle_id: str, road_map: RoadMap ) -> Tuple[VehicleState, Optional[RoadMap.Route]]: """Give control of the vehicle back to its original controller.""" self._log.debug(f"Relinquishing agent control v_id={vehicle_id}") v_id = _2id(vehicle_id) ss = sim.sensor_manager.sensor_state_for_actor_id(vehicle_id) route = ss.get_plan(road_map).route if ss else None vehicle = self.stop_agent_observation(vehicle_id) # pytype: disable=attribute-error Vehicle.detach_all_sensors_from_vehicle(vehicle) # pytype: enable=attribute-error sim.sensor_manager.remove_actor_sensors_by_actor_id(vehicle_id) sim.sensor_manager.remove_sensor_state_by_actor_id(vehicle_id) vehicle = self._vehicles[v_id] box_chassis = BoxChassis( pose=vehicle.chassis.pose, speed=vehicle.chassis.speed, dimensions=vehicle.chassis.dimensions, bullet_client=sim.bc, ) vehicle.swap_chassis(box_chassis) v_index = self._controlled_by["vehicle_id"] == v_id entity = self._controlled_by[v_index][0] entity = _ControlEntity(*entity) self._controlled_by[v_index] = tuple( entity._replace( role=ActorRole.Social, owner_id=b"", shadower_id=b"", is_boid=False, is_hijacked=False, ) ) return vehicle.state, route
[docs] @clear_cache def attach_sensors_to_vehicle( self, sim: SMARTS, vehicle_id: str, agent_interface: AgentInterface, plan: "plan.Plan", ): """Attach sensors as per the agent interface requirements to the specified vehicle.""" b_vehicle_id = _2id(vehicle_id) vehicle = self._vehicles[b_vehicle_id] Vehicle.attach_sensors_to_vehicle( sim.sensor_manager, sim, vehicle, agent_interface, ) sim.sensor_manager.add_sensor_state( vehicle.id, SensorState( agent_interface.max_episode_steps, plan_frame=plan.frame(), ), ) self._controller_states[b_vehicle_id] = ControllerState.from_action_space( agent_interface.action, vehicle.pose, sim )
def _switch_control_to_agent_recreate( self, sim: SMARTS, b_vehicle_id: bytes, b_agent_id: bytes, boid: bool, hijacking: bool, ): # XXX: vehicle_id and agent_id are already fixed-length as this is an internal # method. agent_id = self._2id_to_id[b_agent_id] # TODO: There existed a SUMO connection error bug # (https://gitlab.smartsai.xyz/smarts/SMARTS/-/issues/671) that occurred # during lane changing when we hijacked/trapped a SUMO vehicle. Forcing # vehicle recreation seems to address the problem. Ideally we discover # the underlaying problem and can go back to the preferred implementation # of simply swapping control of a persistent vehicle. # Get the old state values from the shadowed vehicle agent_interface = sim.agent_manager.agent_interface_for_agent_id(agent_id) assert ( agent_interface is not None ), f"Missing agent_interface for agent_id={agent_id}" vehicle = self._vehicles[b_vehicle_id] sensor_state = sim.sensor_manager.sensor_state_for_actor_id(vehicle.id) assert sensor_state is not None controller_state = self._controller_states[b_vehicle_id] plan = sensor_state.get_plan(sim.road_map) vehicle_definition = self._vehicle_definitions.load_vehicle_definition( agent_interface.vehicle_class ) # Create a new vehicle to replace the old one new_vehicle = VehicleIndex._build_agent_vehicle( sim, vehicle.id, agent_interface.action, vehicle_definition.get("type"), agent_interface.vehicle_class, plan.mission, vehicle_definition.get("dynamics_model"), vehicle_definition.get("tire_params"), vehicle_definition.get("visual_model"), not hijacking, sim.scenario.surface_patches, ) # Apply the physical values from the old vehicle chassis to the new one new_vehicle.chassis.inherit_physical_values(vehicle.chassis) # Reserve space inside the traffic sim for traffic_sim in sim.traffic_sims: if traffic_sim.manages_actor(vehicle.id): traffic_sim.reserve_traffic_location_for_vehicle( vehicle.id, vehicle.chassis.to_polygon ) # Remove the old vehicle self.teardown_vehicles_by_vehicle_ids([vehicle.id], sim.renderer_ref) sim.sensor_manager.remove_actor_sensors_by_actor_id(vehicle.id) # HACK: Directly remove the vehicle from the traffic provider (should do this via the sim instead) for traffic_sim in sim.traffic_sims: if traffic_sim.manages_actor(vehicle.id): # TAI: we probably should call "remove_vehicle(vehicle.id)" here instead, # and then call "add_vehicle(new_vehicle.state)", but since # the old and new vehicle-id and state are supposed to be the same # we take this short-cut. traffic_sim.stop_managing(vehicle.id) # Take control of the new vehicle self._enfranchise_agent( sim, agent_id, agent_interface, new_vehicle, controller_state, sensor_state, boid, hijacking, ) return new_vehicle @classmethod def _build_agent_vehicle( cls, sim: SMARTS, vehicle_id: str, action: Optional[ActionSpaceType], vehicle_type: str, vehicle_class: str, mission: plan.NavigationMission, vehicle_dynamics_filepath: Optional[str], tire_filepath: str, visual_model_filepath: str, trainable: bool, surface_patches: Sequence[Dict[str, Any]], initial_speed: Optional[float] = None, ) -> Vehicle: """Create a new vehicle and set up sensors and planning information as required by the ego agent. """ chassis_dims = Vehicle.agent_vehicle_dims(mission, default=vehicle_type) start = mission.start if start.from_front_bumper: start_pose = Pose.from_front_bumper( front_bumper_position=np.array(start.position[:2]), heading=start.heading, length=chassis_dims.length, ) else: start_pose = Pose.from_center(start.position, start.heading) vehicle_color = SceneColors.Agent if trainable else SceneColors.SocialAgent controller_parameters = ( sim.vehicle_index._vehicle_definitions.controller_params_for_vehicle_class( vehicle_class ) ) chassis_parameters = ( sim.vehicle_index._vehicle_definitions.chassis_params_for_vehicle_class( vehicle_class ) ) chassis = None if action in sim.dynamic_action_spaces: if mission.vehicle_spec: logger = logging.getLogger(cls.__name__) logger.warning( "setting vehicle dimensions on a AckermannChassis not yet supported" ) chassis = AckermannChassis( pose=start_pose, bullet_client=sim.bc, vehicle_dynamics_filepath=vehicle_dynamics_filepath, tire_parameters_filepath=tire_filepath, friction_map=surface_patches, controller_parameters=controller_parameters, chassis_parameters=chassis_parameters, initial_speed=initial_speed, ) else: chassis = BoxChassis( pose=start_pose, speed=initial_speed, dimensions=chassis_dims, bullet_client=sim.bc, ) vehicle = Vehicle( id=vehicle_id, chassis=chassis, color=vehicle_color, vehicle_config_type=vehicle_type, vehicle_class=vehicle_class, visual_model_filepath=visual_model_filepath, ) return vehicle
[docs] def build_agent_vehicle( self, sim: SMARTS, agent_id: str, agent_interface: AgentInterface, plan: "plan.Plan", trainable: bool, initial_speed: Optional[float] = None, boid: bool = False, *, vehicle_id: Optional[str] = None, ) -> Vehicle: """Build an entirely new vehicle for an agent.""" vehicle_definition = self._vehicle_definitions.load_vehicle_definition( agent_interface.vehicle_class ) vehicle = VehicleIndex._build_agent_vehicle( sim=sim, vehicle_id=vehicle_id or agent_id, action=agent_interface.action, vehicle_type=vehicle_definition.get("type"), vehicle_class=agent_interface.vehicle_class, mission=plan.mission, vehicle_dynamics_filepath=vehicle_definition.get("dynamics_model"), tire_filepath=vehicle_definition.get("tire_params"), visual_model_filepath=vehicle_definition.get("visual_model"), trainable=trainable, surface_patches=sim.scenario.surface_patches, initial_speed=initial_speed, ) sensor_state = SensorState(agent_interface.max_episode_steps, plan.frame()) controller_state = ControllerState.from_action_space( agent_interface.action, vehicle.pose, sim ) self._enfranchise_agent( sim, agent_id, agent_interface, vehicle, controller_state, sensor_state, boid, hijacking=False, ) return vehicle
@clear_cache def _enfranchise_agent( self, sim: SMARTS, agent_id: str, agent_interface: AgentInterface, vehicle: Vehicle, controller_state: ControllerState, sensor_state: SensorState, boid: bool = False, hijacking: bool = False, ): # XXX: agent_id must be the original agent_id (not the fixed _2id(...)) Vehicle.attach_sensors_to_vehicle( sim.sensor_manager, sim, vehicle, agent_interface ) if sim.is_rendering: vehicle.create_renderer_node(sim.renderer_ref) sim.renderer.begin_rendering_vehicle(vehicle.id, is_agent=True) b_vehicle_id = _2id(vehicle.id) b_agent_id = _2id(agent_id) sim.sensor_manager.add_sensor_state(vehicle.id, sensor_state) self._controller_states[b_vehicle_id] = controller_state self._vehicles[b_vehicle_id] = vehicle self._2id_to_id[b_vehicle_id] = vehicle.id self._2id_to_id[b_agent_id] = agent_id role = ActorRole.SocialAgent if hijacking else ActorRole.EgoAgent entity = _ControlEntity( vehicle_id=b_vehicle_id, owner_id=b_agent_id, role=role, shadower_id=b"", is_boid=boid, is_hijacked=hijacking, position=vehicle.position, ) self._controlled_by = np.insert(self._controlled_by, 0, tuple(entity)) @staticmethod def _build_social_vehicle( sim: SMARTS, vehicle_id: str, vehicle_state: VehicleState ) -> Vehicle: """Create a new unassociated vehicle.""" dims = Dimensions.copy_with_defaults( vehicle_state.dimensions, VEHICLE_CONFIGS[vehicle_state.vehicle_config_type].dimensions, ) chassis = BoxChassis( pose=vehicle_state.pose, speed=vehicle_state.speed, dimensions=dims, bullet_client=sim.bc, ) return Vehicle( id=vehicle_id, chassis=chassis, vehicle_config_type=vehicle_state.vehicle_config_type, visual_model_filepath=None, )
[docs] @clear_cache def build_social_vehicle( self, sim: SMARTS, vehicle_state: VehicleState, owner_id: str, vehicle_id: Optional[str] = None, ) -> Vehicle: """Build an entirely new vehicle for a social agent.""" if vehicle_id is None: vehicle_id = gen_id() vehicle = VehicleIndex._build_social_vehicle( sim, vehicle_id, vehicle_state, ) b_vehicle_id, b_owner_id = _2id(vehicle_id), _2id(owner_id) if owner_id else b"" if sim.is_rendering: vehicle.create_renderer_node(sim.renderer_ref) sim.renderer.begin_rendering_vehicle(vehicle.id, is_agent=False) self._vehicles[b_vehicle_id] = vehicle self._2id_to_id[b_vehicle_id] = vehicle.id role = vehicle_state.role assert role not in ( ActorRole.EgoAgent, ActorRole.SocialAgent, ), f"role={role} from {vehicle_state.source}" entity = _ControlEntity( vehicle_id=b_vehicle_id, owner_id=b_owner_id, role=role, shadower_id=b"", is_boid=False, is_hijacked=False, position=np.asarray(vehicle.position), ) self._controlled_by = np.insert(self._controlled_by, 0, tuple(entity)) return vehicle
[docs] def begin_rendering_vehicles(self, renderer: RendererBase): """Render vehicles using the specified renderer.""" agent_vehicle_ids = self.agent_vehicle_ids() for vehicle in self._vehicles.values(): if vehicle.create_renderer_node(renderer): is_agent = vehicle.id in agent_vehicle_ids renderer.begin_rendering_vehicle(vehicle.id, is_agent)
@cache def controller_state_for_vehicle_id(self, vehicle_id: str) -> ControllerState: """Retrieve the controller state of the given vehicle.""" return self._controller_states[_2id(vehicle_id)]
[docs] def load_vehicle_definitions_list(self, vehicle_definitions_filepath: str): """Loads in a list of vehicle definitions.""" self._vehicle_definitions = resources.load_vehicle_definitions_list( vehicle_definitions_filepath )
@staticmethod def _build_empty_controlled_by(): return np.array( [], dtype=[ # E.g. [(<vehicle ID>, <owner ID>, <owner type>), ...] ("vehicle_id", f"|S{VEHICLE_INDEX_ID_LENGTH}"), ("owner_id", f"|S{VEHICLE_INDEX_ID_LENGTH}"), ("role", "B"), # XXX: Keeping things simple, this is currently assumed to be an agent. # We can add a shadower_role when needed ("shadower_id", f"|S{VEHICLE_INDEX_ID_LENGTH}"), ("is_boid", "B"), ("is_hijacked", "B"), ("position", np.float64, (3,)), ], ) def __repr__(self) -> str: io = StringIO("") n_columns = len(self._controlled_by.dtype.names) by = self._controlled_by.copy().astype( list(zip(self._controlled_by.dtype.names, ["O"] * n_columns)) ) by["position"] = [", ".join([f"{x:.2f}" for x in p]) for p in by["position"]] by["owner_id"] = [str(truncate(p, 20)) for p in by["owner_id"]] by["vehicle_id"] = [str(truncate(p, 20)) for p in by["vehicle_id"]] by["shadower_id"] = [str(truncate(p, 20)) for p in by["shadower_id"]] by["is_boid"] = [str(bool(x)) for x in by["is_boid"]] by["is_hijacked"] = [str(bool(x)) for x in by["is_hijacked"]] by["role"] = [str(ActorRole(x)).split(".")[-1] for x in by["role"]] # XXX: tableprint crashes when there's no data if by.size == 0: by = [[""] * n_columns] tp.table(by, self._controlled_by.dtype.names, style="round", out=io) return io.getvalue()