Source code for smarts.core.vehicle

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import importlib.resources as pkg_resources
import logging
import warnings
from functools import lru_cache, partial
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union

import numpy as np

import smarts.assets.vehicles.visual_model as smarts_vehicle_visuals
from smarts.core.sensor import CustomRenderSensor, OcclusionMapSensor

from . import config
from .actor import ActorRole
from .chassis import AckermannChassis, BoxChassis, Chassis
from .colors import SceneColors
from .coordinates import Dimensions, Heading, Pose
from .sensor import (
    AccelerometerSensor,
    DrivableAreaGridMapSensor,
    DrivenPathSensor,
    LanePositionSensor,
    LidarSensor,
    NeighborhoodVehiclesSensor,
    OGMSensor,
    RGBSensor,
    RoadWaypointsSensor,
    Sensor,
    SignalsSensor,
    TripMeterSensor,
    ViaSensor,
    WaypointsSensor,
)
from .utils.core_math import rotate_cw_around_point
from .utils.custom_exceptions import RendererException
from .vehicle_state import VEHICLE_CONFIGS, VehicleState

if TYPE_CHECKING:
    from smarts.core import plan
    from smarts.core.agent_interface import AgentInterface
    from smarts.core.renderer_base import RendererBase
    from smarts.core.sensor_manager import SensorManager
    from smarts.core.smarts import SMARTS


[docs]class Vehicle: """Represents a single vehicle.""" _HAS_DYNAMIC_ATTRIBUTES = True # dynamic pytype attribute _sensor_names = ( "ogm_sensor", "rgb_sensor", "lidar_sensor", "driven_path_sensor", "trip_meter_sensor", "drivable_area_grid_map_sensor", "occlusion_map_sensor", "neighborhood_vehicle_states_sensor", "waypoints_sensor", "road_waypoints_sensor", "accelerometer_sensor", "lane_position_sensor", "via_sensor", "signals_sensor", ) + tuple( f"custom_render{i}_sensor" for i in range(config()("core", "max_custom_image_sensors", cast=int)) ) def __init__( self, id: str, chassis: Chassis, visual_model_filepath: Optional[str], vehicle_config_type: str = "sedan", vehicle_class: str = "generic_sedan", color: Optional[SceneColors] = None, action_space=None, ): self._log = logging.getLogger(self.__class__.__name__) self._id = id self._chassis: Chassis = chassis if vehicle_config_type == "sedan": vehicle_config_type = "passenger" self._vehicle_config_type = vehicle_config_type self._vehicle_class = vehicle_class self._action_space = action_space self._meta_create_sensor_functions() self._sensors = {} self._visual_model_path = visual_model_filepath if self._visual_model_path in {None, ""}: with pkg_resources.path( smarts_vehicle_visuals, VEHICLE_CONFIGS[self._vehicle_config_type].glb_model, ) as path: self._visual_model_path = path # Color override self._color: Optional[SceneColors] = color if self._color is None: self._color = SceneColors.SocialVehicle self._initialized = True self._has_stepped = False def _assert_initialized(self): assert self._initialized, f"Vehicle({self.id}) is not initialized" def __eq__(self, __o: object) -> bool: if self is __o: return True if isinstance(__o, self.__class__) and self.state == __o.state: return True return False def __repr__(self): return f"""Vehicle({self.id}, pose={self.pose}, speed={self.speed}, type={self.vehicle_type}, class={self.vehicle_class}, w={self.width}, l={self.length}, h={self.height} )""" @property def id(self): """The id of this vehicle.""" return self._id @property def length(self) -> float: """The length of this vehicle.""" self._assert_initialized() return self._chassis.dimensions.length @property def max_steering_wheel(self) -> Optional[float]: """The max steering value the chassis steering wheel can turn to. Some chassis types do not support this. """ self._assert_initialized() return getattr(self._chassis, "max_steering_wheel", None) @property def width(self) -> float: """The width of this vehicle.""" self._assert_initialized() return self._chassis.dimensions.width @property def height(self) -> float: """The height of this vehicle.""" self._assert_initialized() return self._chassis.dimensions.height @property def speed(self) -> float: """The current speed of this vehicle.""" self._assert_initialized() return self._chassis.speed @property def sensors(self) -> Dict[str, Sensor]: """The sensors attached to this vehicle.""" self._assert_initialized() return self._sensors # # TODO: See issue #898 This is a currently a no-op # @speed.setter # def speed(self, speed): # self._chassis.speed = speed @property def vehicle_color(self) -> Union[SceneColors, None]: """The color of this vehicle (generally used for rendering purposes.)""" self._assert_initialized() return self._color @property def state(self) -> VehicleState: """The current state of this vehicle.""" self._assert_initialized() return VehicleState( actor_id=self.id, actor_type=self.vehicle_type, source="SMARTS", # this is the "ground truth" state vehicle_config_type=self._vehicle_config_type, pose=self.pose, dimensions=self._chassis.dimensions, speed=self.speed, # pytype: disable=attribute-error steering=self._chassis.steering, # pytype: enable=attribute-error yaw_rate=self._chassis.yaw_rate, linear_velocity=self._chassis.velocity_vectors[0], angular_velocity=self._chassis.velocity_vectors[1], ) @property def action_space(self): """The action space this vehicle uses.""" self._assert_initialized() return self._action_space @property def pose(self) -> Pose: """The pose of this vehicle. Pose is defined as position and orientation.""" self._assert_initialized() return self._chassis.pose @property def chassis(self) -> Chassis: """The underlying chassis of this vehicle.""" self._assert_initialized() return self._chassis @property def heading(self) -> Heading: """The heading of this vehicle. Note: Heading rotates counterclockwise with north as 0. """ self._assert_initialized() return self._chassis.pose.heading @property def position(self) -> np.ndarray: """The position of this vehicle.""" self._assert_initialized() return self._chassis.pose.position @property def bounding_box(self) -> List[np.ndarray]: """The minimum fitting heading aligned bounding box. Four 2D points representing the minimum fitting box.""" # XXX: this doesn't return a smarts.core.coordinates.BoundingBox! self._assert_initialized() # Assuming the position is the center, # calculate the corner coordinates of the bounding_box origin = self.position[:2] dimensions = np.array([self.width, self.length]) corners = np.array([(-1, 1), (1, 1), (1, -1), (-1, -1)]) / 2 heading = self.heading return [ rotate_cw_around_point( point=origin + corner * dimensions, radians=Heading.flip_clockwise(heading), origin=origin, ) for corner in corners ] @property def vehicle_type(self) -> str: """Get the vehicle type name as recognized by SMARTS. (e.g. 'car')""" return VEHICLE_CONFIGS[self._vehicle_config_type].vehicle_type @property def vehicle_config_type(self) -> str: """Get the vehicle type identifier. (e.g. 'sedan')""" return self._vehicle_config_type @property def vehicle_class(self) -> str: """Get the custom class of vehicle this is. (e.g. 'ford_f150')""" return self._vehicle_class @property def valid(self) -> bool: """Check if the vehicle still `exists` and is still operable.""" return self._initialized @property def sensor_names(self) -> Tuple[str]: """The names of the sensors that are potentially available to this vehicle.""" return self._sensor_names
[docs] @staticmethod def agent_vehicle_dims( mission: "plan.NavigationMission", default: Optional[str] = None ) -> Dimensions: """Get the vehicle dimensions from the mission requirements. Args: A mission for the agent. Returns: The mission vehicle spec dimensions XOR the default "passenger" vehicle dimensions. """ if not default: default = config().get_setting("assets", "default_agent_vehicle") if default == "sedan": default = "passenger" default_type = default if mission.vehicle_spec: # mission.vehicle_spec.veh_config_type will always be "passenger" for now, # but we use that value here in case we ever expand our history functionality. vehicle_config_type = mission.vehicle_spec.veh_config_type return Dimensions.copy_with_defaults( mission.vehicle_spec.dimensions, VEHICLE_CONFIGS[vehicle_config_type or default_type].dimensions, ) return VEHICLE_CONFIGS[default_type].dimensions
[docs] @classmethod def attach_sensors_to_vehicle( cls, sensor_manager: SensorManager, sim: SMARTS, vehicle: Vehicle, agent_interface: AgentInterface, replace=True, reset_sensors=False, ): """Attach sensors as required to satisfy the agent interface's requirements""" # The distance travelled sensor is not optional b/c it is used for the score # and reward calculation vehicle_state = vehicle.state has_no_sensors = len(vehicle.sensors) == 0 added_sensors: List[Tuple[str, Sensor]] = [] if reset_sensors: sensor_manager.remove_actor_sensors_by_actor_id(vehicle.id) # pytype: disable=attribute-error Vehicle.detach_all_sensors_from_vehicle(vehicle) # pytype: enable=attribute-error def add_sensor_if_needed( sensor_type, sensor_name: str, condition: bool = True, **kwargs, ): assert ( sensor_name in cls._sensor_names ), f"{sensor_name}:{cls._sensor_names}" if ( replace or has_no_sensors or (condition and not vehicle.subscribed_to(sensor_name)) ): sensor = sensor_type(**kwargs) vehicle.attach_sensor(sensor, sensor_name) added_sensors.append((sensor_name, sensor)) # pytype: disable=attribute-error add_sensor_if_needed(TripMeterSensor, sensor_name="trip_meter_sensor") add_sensor_if_needed(DrivenPathSensor, sensor_name="driven_path_sensor") if agent_interface.neighborhood_vehicle_states: add_sensor_if_needed( NeighborhoodVehiclesSensor, sensor_name="neighborhood_vehicle_states_sensor", radius=agent_interface.neighborhood_vehicle_states.radius, ) add_sensor_if_needed( AccelerometerSensor, sensor_name="accelerometer_sensor", condition=agent_interface.accelerometer, ) add_sensor_if_needed( WaypointsSensor, sensor_name="waypoints_sensor", condition=agent_interface.waypoint_paths, ) if agent_interface.road_waypoints: add_sensor_if_needed( RoadWaypointsSensor, "road_waypoints_sensor", horizon=agent_interface.road_waypoints.horizon, ) add_sensor_if_needed( LanePositionSensor, "lane_position_sensor", condition=agent_interface.lane_positions, ) # DrivableAreaGridMapSensor if agent_interface.drivable_area_grid_map: if not sim.renderer_ref: raise RendererException.required_to("add a drivable_area_grid_map") add_sensor_if_needed( DrivableAreaGridMapSensor, "drivable_area_grid_map_sensor", True, # Always add this sensor vehicle_state=vehicle_state, width=agent_interface.drivable_area_grid_map.width, height=agent_interface.drivable_area_grid_map.height, resolution=agent_interface.drivable_area_grid_map.resolution, renderer=sim.renderer_ref, ) # OGMSensor if agent_interface.occupancy_grid_map: if not sim.renderer_ref: raise RendererException.required_to("add an OGM") add_sensor_if_needed( OGMSensor, "ogm_sensor", True, # Always add this sensor vehicle_state=vehicle_state, width=agent_interface.occupancy_grid_map.width, height=agent_interface.occupancy_grid_map.height, resolution=agent_interface.occupancy_grid_map.resolution, renderer=sim.renderer_ref, ) if agent_interface.occlusion_map: if not vehicle.subscribed_to("ogm_sensor"): warnings.warn( "Occupancy grid map sensor must be attached to use occlusion sensor.", category=UserWarning, ) else: add_sensor_if_needed( OcclusionMapSensor, "occlusion_map_sensor", vehicle_state=vehicle_state, width=agent_interface.occlusion_map.width, height=agent_interface.occlusion_map.height, resolution=agent_interface.occlusion_map.resolution, renderer=sim.renderer_ref, ogm_sensor=vehicle.sensor_property("ogm_sensor"), add_surface_noise=agent_interface.occlusion_map.surface_noise, ) # RGBSensor if agent_interface.top_down_rgb: if not sim.renderer_ref: raise RendererException.required_to("add an RGB camera") add_sensor_if_needed( RGBSensor, "rgb_sensor", True, # Always add this sensor vehicle_state=vehicle_state, width=agent_interface.top_down_rgb.width, height=agent_interface.top_down_rgb.height, resolution=agent_interface.top_down_rgb.resolution, renderer=sim.renderer_ref, ) if len(agent_interface.custom_renders): if not sim.renderer_ref: raise RendererException.required_to("add a fragment program.") for i, program in enumerate(agent_interface.custom_renders): add_sensor_if_needed( CustomRenderSensor, f"custom_render{i}_sensor", vehicle_state=vehicle_state, width=program.width, height=program.height, resolution=program.resolution, renderer=sim.renderer_ref, fragment_shader_path=program.fragment_shader_path, render_dependencies=program.dependencies, ogm_sensor=vehicle.sensor_property("ogm_sensor", None), top_down_rgb_sensor=vehicle.sensor_property("rgb_sensor", None), drivable_area_grid_map_sensor=vehicle.sensor_property( "drivable_area_grid_map_sensor", default=None ), occlusion_map_sensor=vehicle.sensor_property( "occlusion_map_sensor", default=None ), name=program.name, ) if agent_interface.lidar_point_cloud: add_sensor_if_needed( LidarSensor, "lidar_sensor", vehicle_state=vehicle_state, sensor_params=agent_interface.lidar_point_cloud.sensor_params, ) add_sensor_if_needed( ViaSensor, "via_sensor", True, lane_acquisition_range=80, speed_accuracy=1.5 ) if agent_interface.signals: add_sensor_if_needed( SignalsSensor, "signals_sensor", lookahead=agent_interface.signals.lookahead, ) # pytype: enable=attribute-error for sensor_name, sensor in added_sensors: if not sensor: continue sensor_manager.add_sensor_for_actor(vehicle.id, sensor_name, sensor)
[docs] def step(self, current_simulation_time: float): """Update internal state.""" self._has_stepped = True self._chassis.step(current_simulation_time)
[docs] def control(self, *args, **kwargs): """Apply control values to this vehicle. Forwards control to the chassis. """ self._chassis.control(*args, **kwargs)
[docs] def update_state(self, state: VehicleState, dt: float): """Update the vehicle's state""" state.updated = True if state.role != ActorRole.External: assert isinstance(self._chassis, BoxChassis) self.control(pose=state.pose, speed=state.speed, dt=dt) return # External actors are "privileged", which means they work directly (bypass force application). # Conceptually, this is playing 'god' with physics and should only be used # to defer to a co-simulator's states. linear_velocity, angular_velocity = None, None if not np.allclose( self._chassis.velocity_vectors[0], state.linear_velocity ) or not np.allclose(self._chassis.velocity_vectors[1], state.angular_velocity): linear_velocity = state.linear_velocity angular_velocity = state.angular_velocity if not state.dimensions.equal_if_defined(self.length, self.width, self.height): self._log.warning( "Unable to change a vehicle's dimensions via external_state_update()." ) # XXX: any way to update acceleration in pybullet? self._chassis.state_override(dt, state.pose, linear_velocity, angular_velocity)
[docs] def create_renderer_node(self, renderer: RendererBase): """Create the vehicle's rendering node in the renderer.""" return renderer.create_vehicle_node( self._visual_model_path, self._id, self.vehicle_color, self.pose )
# @lru_cache(maxsize=1) def _warn_AckermannChassis_set_pose(self): if self._has_stepped and isinstance(self._chassis, AckermannChassis): logging.warning( f"Agent `{self._id}` has called set pose after step." "This may cause collision problems" ) # TODO: Merge this w/ speed setter as a set GCD call?
[docs] def set_pose(self, pose: Pose): """Use with caution. This will directly set the pose of the chassis. This may disrupt physics simulation of the chassis physics body for a few steps after use. """ self._warn_AckermannChassis_set_pose() self._chassis.set_pose(pose)
[docs] def swap_chassis(self, chassis: Chassis): """Swap the current chassis with the given chassis. Apply the GCD of the previous chassis to the new chassis ("greatest common denominator state" from front-end to back-end) """ chassis.inherit_physical_values(self._chassis) self._chassis.teardown() self._chassis = chassis
[docs] def teardown(self, renderer: RendererBase, exclude_chassis: bool = False): """Clean up internal resources""" if not exclude_chassis: self._chassis.teardown() if renderer: renderer.remove_vehicle_node(self._id) self._initialized = False
[docs] def attach_sensor(self, sensor: Sensor, sensor_name: str): """replace previously-attached sensor with this one (to allow updating its parameters). Sensors might have been attached to a non-agent vehicle (for example, for observation collection from history vehicles), but if that vehicle gets hijacked, we want to use the sensors specified by the hijacking agent's interface.""" detach = getattr(self, f"detach_{sensor_name}") if detach: self.detach_sensor(sensor_name) self._log.debug("Replaced existing %s on vehicle %s", sensor_name, self.id) setattr(self, f"_{sensor_name}", sensor) self._sensors[sensor_name] = sensor
[docs] def detach_sensor(self, sensor_name: str): """Detach a sensor by name.""" self._log.debug("Removed existing %s on vehicle %s", sensor_name, self.id) sensor = getattr(self, f"_{sensor_name}", None) if sensor is not None: setattr(self, f"_{sensor_name}", None) del self._sensors[sensor_name] return sensor
[docs] def subscribed_to(self, sensor_name: str): """Confirm if the sensor is subscribed.""" sensor = getattr(self, f"_{sensor_name}", None) return sensor is not None
[docs] def sensor_property( self, sensor_name: str, default: Optional[Union[Sensor, Ellipsis.__class__]] = ..., ): """Call a sensor by name.""" sensor = getattr(self, f"_{sensor_name}", None if default is ... else default) assert ( sensor is not None or default is not ... ), f"'{sensor_name}' is not attached to '{self.id}'" return sensor
def _meta_create_instance_sensor_functions(self): for sensor_name in Vehicle._sensor_names: setattr(self, f"_{sensor_name}", None) setattr( self, f"attach_{sensor_name}", partial(self.attach_sensor, sensor_name=sensor_name), ) setattr( self, f"detach_{sensor_name}", partial(self.detach_sensor, sensor_name=sensor_name), ) @classmethod @lru_cache(1) def _meta_create_class_sensor_functions(cls): for sensor_name in cls._sensor_names: setattr( cls, f"subscribed_to_{sensor_name}", property(partial(cls.subscribed_to, sensor_name=sensor_name)), ) setattr( Vehicle, f"{sensor_name}", property(partial(cls.sensor_property, sensor_name=sensor_name)), ) def detach_all_sensors_from_vehicle(vehicle): sensors = [] for sensor_name in cls._sensor_names: detach_sensor_func = getattr(vehicle, f"detach_{sensor_name}") sensors.append(detach_sensor_func()) return sensors setattr( cls, "detach_all_sensors_from_vehicle", staticmethod(detach_all_sensors_from_vehicle), ) def _meta_create_sensor_functions(self): # Bit of metaprogramming to make sensor creation more DRY self._meta_create_instance_sensor_functions() self._meta_create_class_sensor_functions()