Source code for smarts.core.sensor

# MIT License
#
# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import abc
import importlib.resources as pkg_resources
import logging
import math
import sys
from collections import deque
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, Union

import numpy as np

from smarts.core import glsl
from smarts.core.actor import ActorState
from smarts.core.agent_interface import (
    CustomRenderBufferDependency,
    CustomRenderCameraDependency,
    CustomRenderVariableDependency,
    RenderDependencyBase,
)
from smarts.core.coordinates import Dimensions, Pose, RefLinePoint
from smarts.core.lidar import Lidar
from smarts.core.masks import RenderMasks
from smarts.core.observations import (
    CustomRenderData,
    DrivableAreaGridMap,
    GridMapMetadata,
    Observation,
    OcclusionRender,
    OccupancyGridMap,
    RoadWaypoints,
    SignalObservation,
    TopDownRGB,
    ViaPoint,
)
from smarts.core.renderer_base import (
    RendererBase,
    ShaderStepBufferDependency,
    ShaderStepCameraDependency,
    ShaderStepVariableDependency,
)
from smarts.core.road_map import RoadMap, Waypoint
from smarts.core.shader_buffer import BufferID, CameraSensorID
from smarts.core.signals import SignalState
from smarts.core.simulation_frame import SimulationFrame
from smarts.core.utils.core_math import squared_dist
from smarts.core.vehicle_state import neighborhood_vehicles_around_vehicle

if TYPE_CHECKING:
    from smarts.core.actor import ActorState
    from smarts.core.lidar_sensor_params import SensorParams
    from smarts.core.plan import Plan
    from smarts.core.provider import ProviderState
    from smarts.core.simulation_frame import SimulationFrame
    from smarts.core.vehicle_state import VehicleState


def _gen_sensor_name(base_name: str, vehicle_state: VehicleState):
    return _gen_base_sensor_name(base_name, vehicle_state.actor_id)


def _gen_base_sensor_name(base_name: str, actor_id: str):
    return f"{base_name}_{actor_id}"



class Sensor(metaclass=abc.ABCMeta):
    """The sensor base class."""

    def step(self, sim_frame: SimulationFrame, **kwargs):
        """Update sensor state."""

    @abc.abstractmethod
    def teardown(self, **kwargs):
        """Clean up internal resources"""
        raise NotImplementedError

    @abc.abstractmethod
    def __call__(self, *args: Any, **kwds: Any) -> Any:
        raise NotImplementedError

    @property
    def mutable(self) -> bool:
        """If this sensor mutates on call."""
        return True

    @property
    def serializable(self) -> bool:
        """If this sensor can be serialized."""
        return True
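

# NOTE: Illustrative sketch only (not part of SMARTS): a minimal concrete Sensor
# subclass just implements ``__call__`` and ``teardown``. The class name and its
# behavior below are hypothetical.
class _ExampleConstantSensor(Sensor):
    """Returns a fixed reading and needs no cleanup."""

    def __init__(self, value: float = 0.0):
        self._value = value

    def __call__(self, *args: Any, **kwds: Any) -> float:
        return self._value

    def teardown(self, **kwargs):
        pass

    @property
    def mutable(self) -> bool:
        # Calling this sensor does not change its internal state.
        return False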


class CameraSensor(Sensor):
    """The base for a sensor that renders images."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        renderer: RendererBase,
        name: str,
        mask: int,
        width: int,
        height: int,
        resolution: float,
        build_camera: bool = True,
    ):
        assert renderer
        self._log = logging.getLogger(self.__class__.__name__)
        self._name = name
        self._camera_name = _gen_sensor_name(name, vehicle_state)
        if build_camera:
            renderer.build_offscreen_camera(
                self._camera_name,
                mask,
                width,
                height,
                resolution,
            )
        self._follow_actor(
            vehicle_state, renderer
        )  # ensure we have a correct initial camera position
        self._target_actor = vehicle_state.actor_id
        self._mask = mask
        self._width = width
        self._height = height
        self._resolution = resolution

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, self.__class__)
            and self._target_actor == __value._target_actor
            and self._camera_name == __value._camera_name
            and self._mask == __value._mask
            and self._width == __value._width
            and self._height == __value._height
            and self._resolution == __value._resolution
        )

    def teardown(self, **kwargs):
        renderer: Optional[RendererBase] = kwargs.get("renderer")
        if not renderer:
            return
        camera = renderer.camera_for_id(self._camera_name)
        camera.teardown()

    def step(self, sim_frame: SimulationFrame, **kwargs):
        if self._target_actor not in sim_frame.actor_states_by_id:
            return
        self._follow_actor(
            sim_frame.actor_states_by_id[self._target_actor], kwargs.get("renderer")
        )

    def _follow_actor(self, actor_state: ActorState, renderer: RendererBase):
        if not renderer:
            return
        camera = renderer.camera_for_id(self._camera_name)
        pose = actor_state.get_pose()
        dimensions = actor_state.get_dimensions()
        if pose is None or dimensions is None:
            return
        camera.update(pose, dimensions.height + 10)

    @property
    def camera_name(self) -> str:
        """The name of the camera this sensor is using."""
        return self._camera_name

    @property
    def name(self) -> str:
        """The name of this sensor."""
        return self._name

    @property
    def serializable(self) -> bool:
        return False


class DrivableAreaGridMapSensor(CameraSensor):
    """A sensor that renders drivable area from around its target actor."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        width: int,
        height: int,
        resolution: float,
        renderer: RendererBase,
    ):
        super().__init__(
            vehicle_state,
            renderer,
            CameraSensorID.DRIVABLE_AREA_GRID_MAP.value,
            RenderMasks.DRIVABLE_AREA_HIDE,
            width,
            height,
            resolution,
        )
        self._resolution = resolution

    def __call__(self, renderer: RendererBase) -> DrivableAreaGridMap:
        camera = renderer.camera_for_id(self._camera_name)
        assert camera is not None, "Drivable area grid map has not been initialized"

        ram_image = camera.wait_for_ram_image(img_format="A")
        mem_view = memoryview(ram_image)
        image: np.ndarray = np.frombuffer(mem_view, np.uint8)
        width, height = camera.image_dimensions
        image.shape = (height, width, 1)
        image = np.flipud(image)

        metadata = GridMapMetadata(
            resolution=self._resolution,
            height=image.shape[0],
            width=image.shape[1],
            camera_position=camera.position,
            camera_heading=camera.heading,
        )
        return DrivableAreaGridMap(data=image, metadata=metadata)


class OGMSensor(CameraSensor):
    """A sensor that renders occupancy information from around its target actor."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        width: int,
        height: int,
        resolution: float,
        renderer: RendererBase,
    ):
        super().__init__(
            vehicle_state,
            renderer,
            CameraSensorID.OCCUPANCY_GRID_MAP.value,
            RenderMasks.OCCUPANCY_HIDE,
            width,
            height,
            resolution,
        )

    def __call__(self, renderer: RendererBase) -> OccupancyGridMap:
        base_camera = renderer.camera_for_id(self._camera_name)
        assert base_camera is not None, "OGM has not been initialized"

        ram_image = base_camera.wait_for_ram_image(img_format="A")
        mem_view = memoryview(ram_image)
        grid: np.ndarray = np.frombuffer(mem_view, np.uint8)
        width, height = base_camera.image_dimensions
        grid.shape = (height, width, 1)
        grid = np.flipud(grid)

        metadata = GridMapMetadata(
            resolution=self._resolution,
            height=grid.shape[0],
            width=grid.shape[1],
            camera_position=base_camera.position,
            camera_heading=base_camera.heading,
        )
        return OccupancyGridMap(data=grid, metadata=metadata)


class RGBSensor(CameraSensor):
    """A sensor that renders color values from around its target actor."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        width: int,
        height: int,
        resolution: float,
        renderer: RendererBase,
    ):
        super().__init__(
            vehicle_state,
            renderer,
            CameraSensorID.TOP_DOWN_RGB.value,
            RenderMasks.RGB_HIDE,
            width,
            height,
            resolution,
        )
        self._resolution = resolution

    def __call__(self, renderer: RendererBase) -> TopDownRGB:
        camera = renderer.camera_for_id(self._camera_name)
        assert camera is not None, "RGB has not been initialized"

        ram_image = camera.wait_for_ram_image(img_format="RGB")
        mem_view = memoryview(ram_image)
        image: np.ndarray = np.frombuffer(mem_view, np.uint8)
        width, height = camera.image_dimensions
        image.shape = (height, width, 3)
        image = np.flipud(image)

        metadata = GridMapMetadata(
            resolution=self._resolution,
            height=image.shape[0],
            width=image.shape[1],
            camera_position=camera.position,
            camera_heading=camera.heading,
        )
        return TopDownRGB(data=image, metadata=metadata)


class OcclusionMapSensor(CameraSensor):
    """A sensor that renders only the areas that can be seen by the vehicle."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        width: int,
        height: int,
        resolution: float,
        renderer: RendererBase,
        ogm_sensor: OGMSensor,
        add_surface_noise: bool,
    ):
        self._effect_cameras = []
        super().__init__(
            vehicle_state,
            renderer,
            CameraSensorID.OCCLUSION.value,
            RenderMasks.NONE,
            width,
            height,
            resolution,
            build_camera=False,
        )
        occlusion_camera0 = ogm_sensor.camera_name
        occlusion_camera1 = occlusion_camera0

        if add_surface_noise:
            # generate simplex camera
            with pkg_resources.path(
                glsl, "simplex_shader.frag"
            ) as simplex_shader_path:
                simplex_camera_name = _gen_sensor_name("simplex", vehicle_state)
                renderer.build_shader_step(
                    name=simplex_camera_name,
                    fshader_path=simplex_shader_path,
                    dependencies=(
                        ShaderStepVariableDependency(
                            value=1.0 / resolution,
                            script_variable_name="scale",
                        ),
                        ShaderStepCameraDependency(
                            _gen_sensor_name(
                                CameraSensorID.DRIVABLE_AREA_GRID_MAP.value,
                                vehicle_state,
                            ),
                            "iChannel0",
                        ),
                    ),
                    priority=10,
                    width=width,
                    height=height,
                )
                self._effect_cameras.append(simplex_camera_name)
                occlusion_camera1 = simplex_camera_name

        # feed simplex and ogm to composite
        with pkg_resources.path(
            glsl, "occlusion_shader.frag"
        ) as composite_shader_path:
            composite_camera_name = _gen_sensor_name(
                CameraSensorID.OCCLUSION.value, vehicle_state
            )
            renderer.build_shader_step(
                name=composite_camera_name,
                fshader_path=composite_shader_path,
                dependencies=(
                    ShaderStepCameraDependency(occlusion_camera0, "iChannel0"),
                    ShaderStepCameraDependency(occlusion_camera1, "iChannel1"),
                ),
                priority=30,
                width=width,
                height=height,
            )
            self._effect_cameras.append(composite_camera_name)

    def _follow_actor(self, actor_state: ActorState, renderer: RendererBase):
        if not renderer:
            return
        for effect_name in self._effect_cameras:
            pose = actor_state.get_pose()
            dimensions = actor_state.get_dimensions()
            if pose is None or dimensions is None:
                continue
            camera = renderer.camera_for_id(effect_name)
            camera.update(pose, dimensions.height)

    def teardown(self, **kwargs):
        renderer: Optional[RendererBase] = kwargs.get("renderer")
        if not renderer:
            return
        for effect_name in self._effect_cameras:
            camera = renderer.camera_for_id(effect_name)
            camera.teardown()

    def __call__(self, renderer: RendererBase) -> OcclusionRender:
        effect_camera = renderer.camera_for_id(self._effect_cameras[-1])

        ram_image = effect_camera.wait_for_ram_image("RGB")
        mem_view = memoryview(ram_image)
        grid: np.ndarray = np.frombuffer(mem_view, np.uint8)[::3]
        grid.shape = effect_camera.image_dimensions
        grid = np.flipud(grid)

        metadata = GridMapMetadata(
            resolution=self._resolution,
            height=grid.shape[0],
            width=grid.shape[1],
            camera_position=(math.inf, math.inf, math.inf),
            camera_heading=math.inf,
        )
        return OcclusionRender(data=grid, metadata=metadata)


class CustomRenderSensor(CameraSensor):
    """Defines a configurable image sensor."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        width: int,
        height: int,
        resolution: float,
        renderer: RendererBase,
        fragment_shader_path: str,
        render_dependencies: Tuple[RenderDependencyBase, ...],
        ogm_sensor: Optional[OGMSensor],
        top_down_rgb_sensor: Optional[RGBSensor],
        drivable_area_grid_map_sensor: Optional[DrivableAreaGridMapSensor],
        occlusion_map_sensor: Optional[OcclusionMapSensor],
        name: str,
    ):
        super().__init__(
            vehicle_state,
            renderer,
            name,
            RenderMasks.NONE,
            width,
            height,
            resolution,
            build_camera=False,
        )
        dependencies = []
        named_camera_sensors = (
            (CameraSensorID.OCCUPANCY_GRID_MAP, ogm_sensor),
            (CameraSensorID.TOP_DOWN_RGB, top_down_rgb_sensor),
            (CameraSensorID.DRIVABLE_AREA_GRID_MAP, drivable_area_grid_map_sensor),
            (CameraSensorID.OCCLUSION, occlusion_map_sensor),
        )

        def has_required(dependency_name, required_name, sensor) -> bool:
            if dependency_name == required_name:
                if sensor:
                    return True
                raise UserWarning(
                    f"Custom render dependency requires `{required_name}` but the sensor is not attached in the interface."
                )
            return False

        for d in render_dependencies:
            if isinstance(d, CustomRenderCameraDependency):
                for csn, sensor in named_camera_sensors:
                    if has_required(d.camera_dependency_name, csn.value, sensor):
                        break
                camera_id = (
                    _gen_sensor_name(d.camera_dependency_name, vehicle_state)
                    if d.is_self_targetted()
                    else _gen_base_sensor_name(d.camera_dependency_name, d.target_actor)
                )
                dependency = ShaderStepCameraDependency(
                    camera_id,
                    d.variable_name,
                )
            elif isinstance(d, CustomRenderVariableDependency):
                dependency = ShaderStepVariableDependency(
                    value=d.value,
                    script_variable_name=d.variable_name,
                )
            elif isinstance(d, CustomRenderBufferDependency):
                if isinstance(d.buffer_dependency_name, str):
                    buffer_name = BufferID(d.buffer_dependency_name)
                else:
                    buffer_name = d.buffer_dependency_name
                dependency = ShaderStepBufferDependency(
                    buffer_id=buffer_name,
                    script_uniform_name=d.variable_name,
                )
            else:
                raise TypeError(
                    f"Dependency must be a subtype of `{RenderDependencyBase}`"
                )
            dependencies.append(dependency)

        renderer.build_shader_step(
            name=self._camera_name,
            fshader_path=fragment_shader_path,
            dependencies=dependencies,
            priority=40,
            width=width,
            height=height,
        )

    def step(self, sim_frame: SimulationFrame, **kwargs):
        if self._target_actor not in sim_frame.actor_states_by_id:
            return
        actor_state = sim_frame.actor_states_by_id[self._target_actor]
        renderer = kwargs.get("renderer")
        observations: Optional[Dict[str, Observation]] = kwargs.get("observations")
        target = None
        if isinstance(observations, dict):
            for k, o in observations.items():
                o: Observation
                if o.ego_vehicle_state.id == self._target_actor:
                    target = o
            assert isinstance(target, Observation)
        if not renderer:
            return
        renderer: RendererBase
        camera = renderer.camera_for_id(self._camera_name)
        pose = actor_state.get_pose()
        dimensions = actor_state.get_dimensions()
        if not target:
            camera.update(
                pose=pose, height=(dimensions.height + 10) if dimensions else None
            )
        else:
            camera.update(observation=target)

    def teardown(self, **kwargs):
        renderer = kwargs.get("renderer")
        if not renderer:
            return
        renderer: RendererBase
        camera = renderer.camera_for_id(self._camera_name)
        camera.teardown()

    def __call__(self, renderer: RendererBase) -> CustomRenderData:
        effect_camera = renderer.camera_for_id(self._camera_name)

        ram_image = effect_camera.wait_for_ram_image("RGB")
        mem_view = memoryview(ram_image)
        grid: np.ndarray = np.frombuffer(mem_view, np.uint8)
        grid.shape = effect_camera.image_dimensions + (3,)
        grid = np.flipud(grid)

        metadata = GridMapMetadata(
            resolution=self._resolution,
            height=grid.shape[0],
            width=grid.shape[1],
            camera_position=(math.inf, math.inf, math.inf),  # has no position
            camera_heading=math.inf,  # has no heading
        )
        return CustomRenderData(data=grid, metadata=metadata)


class LidarSensor(Sensor):
    """A lidar sensor."""

    def __init__(
        self,
        vehicle_state: VehicleState,
        sensor_params: Optional[SensorParams] = None,
        lidar_offset=(0, 0, 1),
    ):
        self._lidar_offset = np.array(lidar_offset)

        self._lidar = Lidar(
            vehicle_state.pose.position + self._lidar_offset,
            sensor_params,
        )

    def follow_vehicle(self, vehicle_state: VehicleState):
        """Update the sensor to target the given vehicle."""
        self._lidar.origin = vehicle_state.pose.position + self._lidar_offset

    def __eq__(self, __value: object) -> bool:
        return isinstance(__value, LidarSensor) and (
            (self._lidar_offset == __value._lidar_offset).all()
            and (self._lidar.origin == __value._lidar.origin).all()
        )

    def __call__(self, bullet_client):
        return self._lidar.compute_point_cloud(bullet_client)

    def teardown(self, **kwargs):
        pass

    @property
    def serializable(self) -> bool:
        return False


@dataclass(frozen=True)
class _DrivenPathSensorEntry:
    timestamp: float
    position: Tuple[float, float]


class DrivenPathSensor(Sensor):
    """Tracks the driven path as a series of positions (regardless of whether the
    vehicle is following its route). For performance reasons it only keeps the
    last N=max_path_length path segments.
    """

    def __init__(self, max_path_length: int = 500):
        self._driven_path = deque(maxlen=max_path_length)

    def track_latest_driven_path(self, elapsed_sim_time, vehicle_state):
        """Records the current location of the tracked vehicle."""
        position = vehicle_state.pose.position[:2]
        self._driven_path.append(
            _DrivenPathSensorEntry(timestamp=elapsed_sim_time, position=tuple(position))
        )

    def __call__(self, count=sys.maxsize):
        return [x.position for x in self._driven_path][-count:]

    def teardown(self, **kwargs):
        pass

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, DrivenPathSensor)
            and self._driven_path == __value._driven_path
        )

    def distance_travelled(
        self,
        elapsed_sim_time,
        last_n_seconds: Optional[float] = None,
        last_n_steps: Optional[int] = None,
    ):
        """Find the amount of distance travelled over the last # of seconds XOR steps"""
        if last_n_seconds is None and last_n_steps is None:
            raise ValueError("Either last N seconds or last N steps must be provided")

        if last_n_steps is not None:
            n = last_n_steps + 1  # to factor in the current step we're on
            filtered_pos = [x.position for x in self._driven_path][-n:]
        else:  # last_n_seconds
            threshold = elapsed_sim_time - last_n_seconds
            filtered_pos = [
                x.position for x in self._driven_path if x.timestamp >= threshold
            ]

        xs = np.array([p[0] for p in filtered_pos])
        ys = np.array([p[1] for p in filtered_pos])
        dist_array = (xs[:-1] - xs[1:]) ** 2 + (ys[:-1] - ys[1:]) ** 2
        return np.sum(np.sqrt(dist_array))
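

# NOTE: Illustrative sketch only (not part of SMARTS): the value returned by
# ``DrivenPathSensor.distance_travelled`` is the sum of segment lengths between
# consecutive recorded positions. The sample points below are hypothetical.
def _example_driven_path_distance() -> float:
    positions = np.array([(0.0, 0.0), (3.0, 4.0), (6.0, 8.0)])  # three recorded points
    deltas = np.diff(positions, axis=0)  # per-step displacement vectors
    return float(np.sum(np.sqrt((deltas**2).sum(axis=1))))  # 5.0 + 5.0 == 10.0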


class TripMeterSensor(Sensor):
    """Tracks distance travelled along the route (in meters). Meters driven while
    off-route are not counted as part of the total.
    """

    def __init__(self):
        self._wps_for_distance: List[Waypoint] = []
        self._dist_travelled = 0.0
        self._last_dist_travelled = 0.0
        self._last_actor_position = None

    def __eq__(self, __value: object) -> bool:
        return isinstance(__value, TripMeterSensor) and (
            self._wps_for_distance == __value._wps_for_distance
            and self._dist_travelled == __value._dist_travelled
            and self._last_dist_travelled == __value._last_dist_travelled
        )

    def update_distance_wps_record(
        self,
        waypoint_paths: List[List[Waypoint]],
        vehicle_state: VehicleState,
        plan: Plan,
        road_map: RoadMap,
    ):
        """Append a waypoint to the history if it is not already counted."""
        # Distance calculation. The intention is to track the shortest trip the agent
        # has travelled at the lane level. This prevents lateral movement from
        # increasing the total distance travelled.
        self._last_dist_travelled = self._dist_travelled

        new_wp = waypoint_paths[0][0]
        wp_road = road_map.lane_by_id(new_wp.lane_id).road.road_id

        should_count_wp = (
            # if we do not have a fixed route, we count all waypoints we accumulate
            plan.mission is None
            or not plan.mission.requires_route
            # if we have a route to follow, only count wps on route
            or wp_road in [road.road_id for road in plan.route.roads]
        )

        if not self._wps_for_distance:
            self._last_actor_position = vehicle_state.pose.position
            if should_count_wp:
                self._wps_for_distance.append(new_wp)
            return  # sensor does not have enough history
        most_recent_wp = self._wps_for_distance[-1]

        # TODO: Instead of storing a waypoint every 0.5m just find the next one immediately
        threshold_for_counting_wp = 0.5  # meters from last tracked waypoint
        if (
            np.linalg.norm(new_wp.pos - most_recent_wp.pos) > threshold_for_counting_wp
            and should_count_wp
        ):
            self._wps_for_distance.append(new_wp)
        additional_distance = TripMeterSensor._compute_additional_dist_travelled(
            most_recent_wp,
            new_wp,
            vehicle_state.pose.position,
            self._last_actor_position,
        )
        self._dist_travelled += additional_distance
        self._last_actor_position = vehicle_state.pose.position

    @staticmethod
    def _compute_additional_dist_travelled(
        recent_wp: Waypoint,
        new_waypoint: Waypoint,
        vehicle_position: np.ndarray,
        last_vehicle_pos: np.ndarray,
    ):
        # old waypoint minus current ahead waypoint
        wp_disp_vec = new_waypoint.pos - recent_wp.pos
        # make unit vector
        wp_unit_vec = wp_disp_vec / (np.linalg.norm(wp_disp_vec) or 1)
        # vehicle position minus last vehicle position
        position_disp_vec = vehicle_position[:2] - last_vehicle_pos[:2]
        # distance of vehicle between last and current wp
        distance = np.dot(position_disp_vec, wp_unit_vec)
        return distance

    def __call__(self, increment: bool = False):
        if increment:
            return self._dist_travelled - self._last_dist_travelled

        return self._dist_travelled

    def teardown(self, **kwargs):
        pass
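

# NOTE: Illustrative sketch only (not part of SMARTS): the trip-meter increment is
# the vehicle displacement projected onto the waypoint direction, so lateral
# movement contributes nothing. The numbers below are hypothetical.
def _example_trip_meter_increment() -> float:
    wp_dir = np.array([1.0, 0.0])  # unit vector along the lane
    displacement = np.array([2.0, 0.5])  # 2 m forward, 0.5 m sideways
    return float(np.dot(displacement, wp_dir))  # 2.0 -- the lateral part is ignored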


class NeighborhoodVehiclesSensor(Sensor):
    """Detects other vehicles around the sensor-equipped vehicle."""

    def __init__(self, radius: Optional[float] = None):
        self._radius = radius

    @property
    def radius(self) -> Optional[float]:
        """Radius to check for nearby vehicles."""
        return self._radius

    def __call__(
        self, vehicle_state: VehicleState, vehicle_states: Collection[VehicleState]
    ) -> List[VehicleState]:
        return neighborhood_vehicles_around_vehicle(
            vehicle_state, vehicle_states, radius=self._radius
        )

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, NeighborhoodVehiclesSensor)
            and self._radius == __value._radius
        )

    def teardown(self, **kwargs):
        pass

    @property
    def mutable(self) -> bool:
        return False


class WaypointsSensor(Sensor):
    """Detects waypoints leading forward along the vehicle plan."""

    def __init__(self, lookahead: int = 32):
        self._lookahead = lookahead

    def __call__(self, vehicle_state: VehicleState, plan: Plan, road_map: RoadMap):
        return road_map.waypoint_paths(
            pose=vehicle_state.pose,
            lookahead=self._lookahead,
            route=plan.route,
        )

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, WaypointsSensor)
            and self._lookahead == __value._lookahead
        )

    def teardown(self, **kwargs):
        pass

    @property
    def mutable(self) -> bool:
        return False


class RoadWaypointsSensor(Sensor):
    """Detects waypoints from all paths nearby the vehicle."""

    def __init__(self, horizon: int = 32):
        self._horizon = horizon

    def __call__(
        self, vehicle_state: VehicleState, plan: Plan, road_map: RoadMap
    ) -> RoadWaypoints:
        veh_pt = vehicle_state.pose.point
        lane = road_map.nearest_lane(veh_pt)
        if not lane:
            return RoadWaypoints(lanes={})
        road = lane.road
        lane_paths = {}
        for croad in (
            [road] + road.parallel_roads + road.oncoming_roads_at_point(veh_pt)
        ):
            for lane in croad.lanes:
                lane_paths[lane.lane_id] = self._paths_for_lane(
                    lane, vehicle_state, plan
                )

        return RoadWaypoints(lanes=lane_paths)

    def _paths_for_lane(
        self,
        lane: RoadMap.Lane,
        vehicle_state: VehicleState,
        plan: Plan,
        overflow_offset: Optional[float] = None,
    ):
        """Gets waypoint paths along the given lane."""
        # XXX: the following assumes waypoint spacing is 1m
        if overflow_offset is None:
            offset = lane.offset_along_lane(vehicle_state.pose.point)
            start_offset = offset - self._horizon
        else:
            start_offset = lane.length + overflow_offset

        incoming_lanes = lane.incoming_lanes
        paths = []
        if start_offset < 0 and len(incoming_lanes) > 0:
            for lane in incoming_lanes:
                paths += self._paths_for_lane(lane, vehicle_state, plan, start_offset)

        start_offset = max(0, start_offset)
        wp_start = lane.from_lane_coord(RefLinePoint(start_offset))
        adj_pose = Pose.from_center(wp_start, vehicle_state.pose.heading)
        wps_to_lookahead = self._horizon * 2
        paths += lane.waypoint_paths_for_pose(
            pose=adj_pose,
            lookahead=wps_to_lookahead,
            route=plan.route,
        )
        return paths

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, RoadWaypointsSensor)
            and self._horizon == __value._horizon
        )

    def teardown(self, **kwargs):
        pass

    @property
    def mutable(self) -> bool:
        return False


class AccelerometerSensor(Sensor):
    """Tracks motion changes within the vehicle equipped with this sensor."""

    def __init__(self):
        self.linear_velocities = deque(maxlen=3)
        self.angular_velocities = deque(maxlen=3)

    def __call__(self, linear_velocity, angular_velocity, dt: float):
        if linear_velocity is not None:
            self.linear_velocities.append(linear_velocity)
        if angular_velocity is not None:
            self.angular_velocities.append(angular_velocity)

        linear_acc = np.array((0.0, 0.0, 0.0))
        angular_acc = np.array((0.0, 0.0, 0.0))
        linear_jerk = np.array((0.0, 0.0, 0.0))
        angular_jerk = np.array((0.0, 0.0, 0.0))

        if not dt:
            return (linear_acc, angular_acc, linear_jerk, angular_jerk)

        if len(self.linear_velocities) >= 2:
            linear_acc = (self.linear_velocities[-1] - self.linear_velocities[-2]) / dt
            if len(self.linear_velocities) >= 3:
                last_linear_acc = (
                    self.linear_velocities[-2] - self.linear_velocities[-3]
                ) / dt
                linear_jerk = linear_acc - last_linear_acc
        if len(self.angular_velocities) >= 2:
            angular_acc = (
                self.angular_velocities[-1] - self.angular_velocities[-2]
            ) / dt
            if len(self.angular_velocities) >= 3:
                last_angular_acc = (
                    self.angular_velocities[-2] - self.angular_velocities[-3]
                ) / dt
                angular_jerk = angular_acc - last_angular_acc

        return (linear_acc, angular_acc, linear_jerk, angular_jerk)

    def __eq__(self, __value: object) -> bool:
        return isinstance(__value, AccelerometerSensor) and (
            all(
                (a == b).all()
                for a, b in zip(self.linear_velocities, __value.linear_velocities)
            )
            and all(
                (a == b).all()
                for a, b in zip(self.angular_velocities, __value.angular_velocities)
            )
        )

    def teardown(self, **kwargs):
        pass
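

# NOTE: Illustrative sketch only (not part of SMARTS): the accelerometer keeps the
# last three velocity samples and differentiates them. The velocities and dt below
# are hypothetical.
def _example_accelerometer_math():
    dt = 0.1
    v = [np.zeros(3), np.array([1.0, 0.0, 0.0]), np.array([3.0, 0.0, 0.0])]
    acc = (v[-1] - v[-2]) / dt  # (20, 0, 0) m/s^2
    prev_acc = (v[-2] - v[-3]) / dt  # (10, 0, 0) m/s^2
    jerk = acc - prev_acc  # (10, 0, 0): a difference of accelerations, as in __call__ above
    return acc, jerk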


class LanePositionSensor(Sensor):
    """Tracks lane-relative RefLine (Frenet) coordinates."""

    def __init__(self):
        pass

    def __call__(self, lane: RoadMap.Lane, vehicle_state: VehicleState):
        return lane.to_lane_coord(vehicle_state.pose.point)

    def __eq__(self, __value: object) -> bool:
        return True

    def teardown(self, **kwargs):
        pass

    @property
    def mutable(self) -> bool:
        return False


class ViaSensor(Sensor):
    """Tracks a collection of ViaPoint collectibles."""

    def __init__(self, lane_acquisition_range, speed_accuracy):
        self._consumed_via_points = set()
        self._acquisition_range = lane_acquisition_range
        self._speed_accuracy = speed_accuracy

    def __eq__(self, __value: object) -> bool:
        return isinstance(__value, ViaSensor) and (
            self._consumed_via_points == __value._consumed_via_points
            and self._acquisition_range == __value._acquisition_range
            and self._speed_accuracy == __value._speed_accuracy
        )

    def __call__(self, vehicle_state: VehicleState, plan: Plan, road_map: RoadMap):
        if plan.mission is None:
            return ()
        near_points: List[Tuple[float, ViaPoint]] = []
        vehicle_position = vehicle_state.pose.position[:2]

        @lru_cache()
        def closest_point_on_lane(position, lane_id, road_map):
            lane = road_map.lane_by_id(lane_id)
            return lane.center_at_point(position)

        for via in plan.mission.via:
            closest_position_on_lane = closest_point_on_lane(
                tuple(vehicle_position), via.lane_id, road_map
            )
            closest_position_on_lane = closest_position_on_lane[:2]

            dist_from_lane_sq = squared_dist(vehicle_position, closest_position_on_lane)
            if dist_from_lane_sq > self._acquisition_range**2:
                continue

            dist_from_point_sq = squared_dist(vehicle_position, via.position)
            hit = (
                dist_from_point_sq <= via.hit_distance**2
                and via not in self._consumed_via_points
                and np.isclose(
                    vehicle_state.speed, via.required_speed, atol=self._speed_accuracy
                )
            )

            point = ViaPoint(
                tuple(via.position),
                lane_index=via.lane_index,
                road_id=via.road_id,
                required_speed=via.required_speed,
                hit=hit,
            )
            near_points.append((dist_from_point_sq, point))

            if hit:
                self._consumed_via_points.add(via)

        near_points.sort(key=lambda item: item[0])
        return tuple(p for _, p in near_points)

    def teardown(self, **kwargs):
        pass
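

# NOTE: Illustrative sketch only (not part of SMARTS): a via point counts as "hit"
# when the vehicle is within the point's hit distance and close to its required
# speed. All numbers below are hypothetical.
def _example_via_hit_test() -> bool:
    vehicle_pos = np.array([10.0, 0.0])
    via_pos = np.array([11.0, 0.5])
    hit_distance = 2.0
    dist_sq = float(np.sum((vehicle_pos - via_pos) ** 2))  # 1.25
    speed_ok = np.isclose(13.5, 13.9, atol=0.5)  # current vs. required speed
    return dist_sq <= hit_distance**2 and bool(speed_ok)  # True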


class SignalsSensor(Sensor):
    """Reports state of traffic signals (lights) in the lanes ahead of the vehicle."""

    def __init__(self, lookahead: float):
        self._lookahead = lookahead

    def __eq__(self, __value: object) -> bool:
        return (
            isinstance(__value, SignalsSensor) and self._lookahead == __value._lookahead
        )

    @staticmethod
    def _is_signal_type(feature: RoadMap.Feature) -> bool:
        # XXX: eventually if we add other types of dynamic features, we'll need to update this.
        return (
            feature.type == RoadMap.FeatureType.FIXED_LOC_SIGNAL or feature.is_dynamic
        )

    def __call__(
        self,
        lane: Optional[RoadMap.Lane],
        lane_pos: RefLinePoint,
        state: VehicleState,
        plan: Plan,
        provider_state: ProviderState,
    ) -> Tuple[SignalObservation, ...]:
        if not lane:
            return ()
        result = []
        upcoming_signals = []
        for feat in lane.features:
            if not self._is_signal_type(feat):
                continue
            for pt in feat.geometry:
                # TAI: we might want to use the position of our back bumper
                # instead of centroid to allow agents to have some (even more)
                # imprecision in their handling of stopping at signals.
                if lane.offset_along_lane(pt) >= lane_pos.s:
                    upcoming_signals.append(feat)
                    break
        lookahead = self._lookahead - lane.length + lane_pos.s
        self._find_signals_ahead(lane, lookahead, plan.route, upcoming_signals)

        for signal in upcoming_signals:
            for actor_state in provider_state.actors:
                if actor_state.actor_id == signal.feature_id:
                    signal_state = actor_state
                    break
            else:
                logger = logging.getLogger(self.__class__.__name__)
                logger.warning(
                    "could not find signal state corresponding with feature_id=%s",
                    signal.feature_id,
                )
                continue
            assert isinstance(signal_state, SignalState)
            controlled_lanes = None
            if signal_state.controlled_lanes:
                controlled_lanes = tuple(signal_state.controlled_lanes)
            result.append(
                SignalObservation(
                    state=signal_state.state,
                    stop_point=signal_state.stopping_pos,
                    controlled_lanes=controlled_lanes,
                    last_changed=signal_state.last_changed,
                )
            )

        return tuple(result)

    def _find_signals_ahead(
        self,
        lane: RoadMap.Lane,
        lookahead: float,
        route: Optional[RoadMap.Route],
        upcoming_signals: List[RoadMap.Feature],
    ):
        if lookahead <= 0:
            return
        for ogl in lane.outgoing_lanes:
            if route and route.road_length > 0 and ogl.road not in route.roads:
                continue
            upcoming_signals += [
                feat for feat in ogl.features if self._is_signal_type(feat)
            ]
            self._find_signals_ahead(
                ogl, lookahead - lane.length, route, upcoming_signals
            )

    def teardown(self, **kwargs):
        pass