Source code for smarts.env.utils.observation_conversion

# MIT License
#
# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import math
import warnings
from enum import IntEnum
from functools import cached_property
from typing import Any, Callable, Dict, List, Optional, Tuple

import gymnasium as gym
import numpy as np

from smarts.core.agent_interface import AgentInterface
from smarts.core.observations import Observation, SignalObservation, VehicleObservation
from smarts.core.plan import NavigationMission
from smarts.core.road_map import Waypoint

_LIDAR_SHP = 300
_NEIGHBOR_SHP = 50
_WAYPOINT_SHP = (12, 80)
_SIGNALS_SHP = (3,)
_POSITION_SHP = (3,)
_WAYPOINT_NAME_LIMIT = 50
_ID_NAME_LIMIT = _WAYPOINT_NAME_LIMIT
_WAYPOINT_CHAR_SET = frozenset(
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_=+.,;\"' "
)
_ID_CHAR_SET = _WAYPOINT_CHAR_SET

_VEC3_SIGNED_FLOAT32_SPACE = gym.spaces.Box(
    low=-1e10, high=1e10, shape=(3,), dtype=np.float32
)
_VEC3_UNSIGNED_FLOAT32_SPACE = gym.spaces.Box(
    low=0, high=1e10, shape=(3,), dtype=np.float32
)
_VEC3_SIGNED_FLOAT64_SPACE = gym.spaces.Box(
    low=-1e10, high=1e10, shape=(3,), dtype=np.float64
)
_SIGNED_FLOAT32_SPACE = gym.spaces.Box(low=-1e10, high=1e10, shape=(), dtype=np.float32)
_UNSIGNED_FLOAT32_SPACE = gym.spaces.Box(low=0, high=1e10, shape=(), dtype=np.float32)
_SIGNED_RADIANS_FLOAT32_SPACE = gym.spaces.Box(
    low=-math.pi, high=math.pi, shape=(), dtype=np.float32
)
_UNSIGNED_RADIANS_FLOAT32_SPACE = gym.spaces.Box(
    low=0, high=2 * math.pi, shape=(), dtype=np.float32
)
_UNSIGNED_INT8_SPACE = gym.spaces.Box(low=0, high=127, shape=(), dtype=np.int8)
_UNSIGNED_INT64_SPACE = gym.spaces.Box(low=0, high=1e10, shape=(), dtype=np.int64)
_DISCRETE2_SPACE = gym.spaces.Discrete(n=2)
_LANE_ID_SPACE = gym.spaces.Text(_WAYPOINT_NAME_LIMIT, charset=_WAYPOINT_CHAR_SET)


def _format_id(lane_id: str, max_length, type_):
    lane_name = lane_id
    if len(lane_name) > max_length:
        warnings.warn(
            f"`{type_}` named `{lane_name}` is more than "
            f"`{max_length}` characters long. It will be truncated "
            "and may cause unintended issues with navigation and lane identification.",
            category=UserWarning,
        )
    return lane_name[:max_length]


def _format_mission(mission: NavigationMission):
    if hasattr(mission.goal, "position"):
        goal_pos = np.array(mission.goal.position, dtype=np.float64)
    else:
        goal_pos = np.zeros((3,), dtype=np.float64)

    return {"goal_position": goal_pos}


def _format_waypoint_paths(waypoint_paths: List[List[Waypoint]]):
    # Truncate all paths to be of the same length
    min_len = min(map(len, waypoint_paths))
    trunc_paths = list(map(lambda x: x[:min_len], waypoint_paths))

    des_shp = _WAYPOINT_SHP
    rcv_shp = (len(trunc_paths), len(trunc_paths[0]))
    pad_shp = [0 if des - rcv < 0 else des - rcv for des, rcv in zip(des_shp, rcv_shp)]

    def extract_elem(waypoint: Waypoint):
        return (
            waypoint.heading,
            waypoint.lane_id,
            waypoint.lane_index,
            waypoint.lane_offset,
            waypoint.lane_width,
            waypoint.pos,
            waypoint.speed_limit,
        )

    paths = [
        map(extract_elem, path[: des_shp[1]]) for path in trunc_paths[: des_shp[0]]
    ]
    heading, lane_id, lane_index, lane_offset, lane_width, pos, speed_limit = zip(
        *[zip(*path) for path in paths]
    )

    heading = np.array(heading, dtype=np.float32)
    lane_id = tuple(
        tuple(
            _format_id(l_id, _WAYPOINT_NAME_LIMIT, "waypoint lane id")
            for l_id in s_lane_id
        )
        for s_lane_id in lane_id
    )
    lane_index = np.array(lane_index, dtype=np.int8)
    lane_offset = np.array(lane_offset, dtype=np.float32)
    lane_width = np.array(lane_width, dtype=np.float32)
    pos = np.array(pos, dtype=np.float64)
    speed_limit = np.array(speed_limit, dtype=np.float32)

    # fmt: off
    heading = np.pad(heading, ((0,pad_shp[0]),(0,pad_shp[1])), mode='constant', constant_values=0)
    lane_id = tuple(l + ("",) * pad_shp[1] for l in lane_id) + tuple(("", ) * des_shp[1] for _ in range(pad_shp[0]))
    lane_index = np.pad(lane_index, ((0,pad_shp[0]),(0,pad_shp[1])), mode='constant', constant_values=0)
    lane_offset = np.pad(lane_offset, ((0,pad_shp[0]),(0,pad_shp[1])), mode='constant', constant_values=0)
    lane_width = np.pad(lane_width, ((0,pad_shp[0]),(0,pad_shp[1])), mode='constant', constant_values=0)
    pos = np.pad(pos, ((0,pad_shp[0]),(0,pad_shp[1]),(0,1)), mode='constant', constant_values=0)
    speed_limit = np.pad(speed_limit, ((0,pad_shp[0]),(0,pad_shp[1])), mode='constant', constant_values=0)
    # fmt: on

    return {
        "heading": heading,
        "lane_id": lane_id,
        "lane_index": lane_index,
        "lane_offset": lane_offset,
        "lane_width": lane_width,
        "position": pos,
        "speed_limit": speed_limit,
    }


def _format_signals(signals: List[SignalObservation]):
    des_shp = _SIGNALS_SHP
    rcv_shp = len(signals)
    pad_shp = max(0, des_shp[0] - rcv_shp)

    if rcv_shp == 0:
        return {
            "state": np.zeros(des_shp, dtype=np.int8),
            "stop_point": np.zeros(des_shp + (2,), dtype=np.float64),
            "last_changed": np.zeros(des_shp, dtype=np.float32),
        }

    signals = [
        (signal.state, signal.stop_point, signal.last_changed)
        for signal in signals[: des_shp[0]]
    ]
    state, stop_point, last_changed = zip(*signals)

    # fmt: off
    state = np.pad(state, ((0, pad_shp)), mode='constant', constant_values=0)
    stop_point = np.pad(stop_point, ((0, pad_shp), (0, 0)), mode='constant', constant_values=0)
    last_changed = np.pad(last_changed, ((0, pad_shp)), mode='constant', constant_values=0)
    # fmt: on

    return {
        "state": state,
        "stop_point": stop_point,
        "last_changed": last_changed,
    }


def _format_neighborhood_vehicle_states_option_lane_position(
    neighborhood_vehicle_states: Tuple[VehicleObservation],
):
    des_shp = _NEIGHBOR_SHP
    rcv_shp = len(neighborhood_vehicle_states)
    pad_shp = 0 if des_shp - rcv_shp < 0 else des_shp - rcv_shp

    if rcv_shp == 0:
        return np.zeros((des_shp, 3), dtype=np.float32)

    lane_positions = [
        nghb.lane_position for nghb in neighborhood_vehicle_states[:des_shp]
    ]
    lane_positions = np.array(lane_positions, dtype=np.float32)
    lane_positions = np.pad(
        lane_positions, ((0, pad_shp), (0, 0)), mode="constant", constant_values=0
    )
    return lane_positions


def _format_neighborhood_vehicle_states(
    neighborhood_vehicle_states: Tuple[VehicleObservation],
    agent_interface: Optional[AgentInterface],
) -> Dict[str, Any]:
    des_shp = _NEIGHBOR_SHP
    rcv_shp = len(neighborhood_vehicle_states)
    pad_shp = 0 if des_shp - rcv_shp < 0 else des_shp - rcv_shp

    if rcv_shp == 0:
        return {
            "box": np.zeros((des_shp, 3), dtype=np.float32),
            "heading": np.zeros((des_shp,), dtype=np.float32),
            "id": [""] * des_shp,
            "interest": np.zeros((des_shp,), dtype=np.int8),
            "lane_id": [""] * des_shp,
            "lane_index": np.zeros((des_shp,), dtype=np.int8),
            "position": np.zeros((des_shp, 3), dtype=np.float64),
            "speed": np.zeros((des_shp,), dtype=np.float32),
        }

    out_nvs = [
        (
            nghb.bounding_box.as_lwh,
            nghb.heading,
            _format_id(nghb.id, _ID_NAME_LIMIT, "vehicle id"),
            nghb.lane_index,
            nghb.position,
            nghb.speed,
            _format_id(nghb.lane_id, _WAYPOINT_NAME_LIMIT, "lane id"),
            nghb.interest,
        )
        for nghb in neighborhood_vehicle_states[:des_shp]
    ]
    box, heading, vehicle_id, lane_index, pos, speed, lane_id, interest = zip(*out_nvs)

    box = np.array(box, dtype=np.float32)
    heading = np.array(heading, dtype=np.float32)
    lane_index = np.array(lane_index, dtype=np.int8)
    pos = np.array(pos, dtype=np.float64)
    speed = np.array(speed, dtype=np.float32)
    interest = np.array(interest, dtype=np.int8)

    # fmt: off
    box = np.pad(box, ((0,pad_shp),(0,0)), mode='constant', constant_values=0)
    heading = np.pad(heading, ((0,pad_shp)), mode='constant', constant_values=0)
    vehicle_id = tuple(vehicle_id + ("",) * pad_shp)
    lane_index = np.pad(lane_index, ((0,pad_shp)), mode='constant', constant_values=0)
    pos = np.pad(pos, ((0,pad_shp),(0,0)), mode='constant', constant_values=0)
    speed = np.pad(speed, ((0,pad_shp)), mode='constant', constant_values=0)
    lane_id = tuple(lane_id + ("",) * pad_shp)
    interest = np.pad(interest, ((0,pad_shp)), mode="constant", constant_values=False)
    # fmt: on

    output = {
        "box": box,
        "heading": heading,
        "id": vehicle_id,
        "interest": interest,
        "lane_id": lane_id,
        "lane_index": lane_index,
        "position": pos,
        "speed": speed,
    }
    if agent_interface is not None and agent_interface.lane_positions:
        output[
            "lane_position"
        ] = _format_neighborhood_vehicle_states_option_lane_position(
            neighborhood_vehicle_states
        )

    return output


def _configure_neighborhood_vehicle_states_space(agent_interface: AgentInterface):
    sub_spaces = {
        "box": gym.spaces.Box(
            low=0, high=1e10, shape=(_NEIGHBOR_SHP, 3), dtype=np.float32
        ),
        "heading": gym.spaces.Box(
            low=-math.pi, high=math.pi, shape=(_NEIGHBOR_SHP,), dtype=np.float32
        ),
        "id": gym.spaces.Tuple(
            (gym.spaces.Text(_ID_NAME_LIMIT, charset=_WAYPOINT_CHAR_SET),)
            * _NEIGHBOR_SHP
        ),
        "lane_index": gym.spaces.Box(
            low=0, high=127, shape=(_NEIGHBOR_SHP,), dtype=np.int8
        ),
        "position": gym.spaces.Box(
            low=-1e10, high=1e10, shape=(_NEIGHBOR_SHP, 3), dtype=np.float64
        ),
        "speed": gym.spaces.Box(
            low=0, high=1e10, shape=(_NEIGHBOR_SHP,), dtype=np.float32
        ),
        "interest": gym.spaces.MultiBinary(_NEIGHBOR_SHP),
    }
    if agent_interface.lane_positions:
        sub_spaces["lane_position"] = gym.spaces.Box(
            low=-1e10, high=1e10, shape=(_NEIGHBOR_SHP, 3), dtype=np.float32
        )
    return gym.spaces.Dict(sub_spaces)


def _format_lidar(
    lidar_point_cloud: Optional[
        Tuple[List[np.ndarray], List[bool], List[Tuple[np.ndarray, np.ndarray]]]
    ]
):
    # # MTA TODO: add lidar configuration like following:
    # sensor_params = self._agent_interface.lidar_point_cloud.sensor_params
    # n_rays = int(
    #     (sensor_params.end_angle - sensor_params.start_angle)
    #     / sensor_params.angle_resolution
    # )
    des_shp = _LIDAR_SHP
    hit = np.array(lidar_point_cloud[1], dtype=np.int8)
    point_cloud = np.array(lidar_point_cloud[0], dtype=np.float64)
    point_cloud = np.nan_to_num(
        point_cloud,
        copy=False,
        nan=np.float64(0),
        posinf=np.float64(0),
        neginf=np.float64(0),
    )
    ray_origin, ray_vector = zip(*(lidar_point_cloud[2]))
    ray_origin = np.array(ray_origin, np.float64)
    ray_vector = np.array(ray_vector, np.float64)

    try:
        assert hit.shape == (des_shp,)
        assert point_cloud.shape == (des_shp, 3)
        assert ray_origin.shape == (des_shp, 3)
        assert ray_vector.shape == (des_shp, 3)
    except Exception as exc:
        raise Exception("Internal Error: Mismatched lidar point cloud shape.") from exc

    return {
        "hit": hit,
        "point_cloud": point_cloud,
        "ray_origin": ray_origin,
        "ray_vector": ray_vector,
    }


[docs]class BaseSpaceFormat: """Defines the base interface for an observation formatter."""
[docs] def format(self, obs: Observation): """Selects and formats the given observation to get a value that matches the space attribute.""" raise NotImplementedError()
[docs] def active(self, agent_interface: AgentInterface) -> bool: """If this formatting is active and should be included in the output.""" raise NotImplementedError()
@property def name(self): """The name that should represent this observation space in hierarchy.""" raise NotImplementedError() @property def space(self): """The observation space this should format the smarts observation to match.""" raise NotImplementedError() def __call__(self, agent_interface: AgentInterface) -> "BaseSpaceFormat": """The observation space this should format the smarts observation to match.""" raise NotImplementedError()
[docs]class StandardSpaceFormat(BaseSpaceFormat): """A formatter that is generated by configuration. This is immutable.""" def __init__( self, formatting_func: Callable[[Observation], Dict[str, Any]], active_func: Callable[[AgentInterface], bool], name: str, space: gym.Space, ) -> None: self._formatting_func = formatting_func self._active_func = active_func self._name = name self._space = space
[docs] def format(self, obs: Observation): """Selects and formats the given observation to get a value that matches the :attr:`space`.""" return self._formatting_func(obs)
[docs] def active(self, agent_interface: AgentInterface) -> bool: """If this formatting is active and should be included in the output.""" return self._active_func(agent_interface)
@property def name(self): """The name that should represent this observation space in hierarchy.""" return self._name @property def space(self): """The observation space this should format the smarts observation to match.""" return self._space def __call__(self, agent_interface: AgentInterface) -> BaseSpaceFormat: return self
[docs]class StandardConfigurableSpaceFormat(BaseSpaceFormat): """A formatter that defers agent interface configuration.""" def __init__( self, formatting_func: Callable[ [Observation, Optional[AgentInterface]], Dict[str, Any] ], active_func: Callable[[AgentInterface], bool], name: str, space_func: Callable[[AgentInterface], gym.Space], *, _agent_interface: Optional[AgentInterface] = None, ) -> None: self._formatting_func = formatting_func self._active_func = active_func self._name = name self._space_func = space_func self._agent_interface = _agent_interface
[docs] def format(self, obs: Observation): """Selects and formats the given observation to get a value that matches the :attr:`space`.""" return self._formatting_func(obs, self._agent_interface)
[docs] def active(self, agent_interface: AgentInterface) -> bool: """If this formatting is active and should be included in the output.""" return self._active_func(agent_interface)
@property def name(self): """The name that should represent this observation space in hierarchy.""" return self._name @property def space(self): """The observation space this should format the smarts observation to match.""" assert ( self._agent_interface is not None ), "Agent interface must be applied to call this method." return self._space_func(self._agent_interface) def __call__(self, agent_interface: AgentInterface) -> BaseSpaceFormat: return type(self)( self._formatting_func, self._active_func, self._name, self._space_func, _agent_interface=agent_interface, )
[docs]class StandardCompoundSpaceFormat(BaseSpaceFormat): """A compound formatter that defers agent interface configuration.""" def __init__( self, space_generators: List[Callable[[AgentInterface], BaseSpaceFormat]], active_func: Callable[[AgentInterface], bool], name: str, *, _spaces: Optional[List[BaseSpaceFormat]] = None, ) -> None: self._space_generators = space_generators self._spaces = _spaces or [] self._active_func = active_func self._name = name
[docs] def format(self, obs: Observation): return {s.name: s.format(obs) for s in self._spaces}
[docs] def active(self, agent_interface: AgentInterface) -> bool: """If this formatting is active and should be included in the output.""" return self._active_func(agent_interface)
@property def name(self): """The name that should represent this observation space in hierarchy.""" return self._name @property def space(self): return gym.spaces.Dict({s.name: s.space for s in self._spaces}) def __call__(self, agent_interface: AgentInterface) -> BaseSpaceFormat: _spaces = [space(agent_interface) for space in self._space_generators] return type(self)( [], self._active_func, self._name, _spaces=[space for space in _spaces if space.active(agent_interface)], )
[docs]class Image8BSpaceFormat(BaseSpaceFormat): """Defines a observation formatter which is an 8-bit image.""" def __init__(self, dimensions, layers: int) -> None: self._dimensions = dimensions self._colors = layers @property def space(self): return gym.spaces.Box( low=0, high=255, shape=(self._dimensions.height, self._dimensions.width, self._colors), dtype=np.uint8, ) def __call__(self, agent_interface: AgentInterface) -> BaseSpaceFormat: return self
ego_box_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.bounding_box.as_lwh, dtype=np.float32), lambda _: True, "box", _VEC3_UNSIGNED_FLOAT32_SPACE, ) ego_heading_space_format = StandardSpaceFormat( lambda obs: np.float32(obs.ego_vehicle_state.heading), lambda _: True, "heading", _SIGNED_RADIANS_FLOAT32_SPACE, ) ego_lane_index_space_format = StandardSpaceFormat( lambda obs: np.int8(obs.ego_vehicle_state.lane_index), lambda _: True, "lane_index", _UNSIGNED_INT8_SPACE, ) ego_linear_velocity_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.linear_velocity, dtype=np.float32), lambda _: True, "linear_velocity", _VEC3_SIGNED_FLOAT32_SPACE, ) ego_angular_velocity_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.angular_velocity, dtype=np.float32), lambda _: True, "angular_velocity", _VEC3_SIGNED_FLOAT32_SPACE, ) ego_position_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.position, dtype=np.float64), lambda _: True, "position", _VEC3_SIGNED_FLOAT64_SPACE, ) ego_speed_space_format = StandardSpaceFormat( lambda obs: np.float32(obs.ego_vehicle_state.speed), lambda _: True, "speed", _UNSIGNED_FLOAT32_SPACE, ) ego_lane_id_space_format = StandardSpaceFormat( lambda obs: _format_id(obs.ego_vehicle_state.lane_id, _WAYPOINT_NAME_LIMIT, "lane"), lambda _: True, "lane_id", _LANE_ID_SPACE, ) ego_steering_space_format = StandardSpaceFormat( lambda obs: np.float32(obs.ego_vehicle_state.steering), lambda _: True, "steering", _SIGNED_RADIANS_FLOAT32_SPACE, ) ego_yaw_rate_space_format = StandardSpaceFormat( lambda obs: np.float32(obs.ego_vehicle_state.yaw_rate), lambda _: True, "yaw_rate", _UNSIGNED_RADIANS_FLOAT32_SPACE, ) ego_angular_acceleration_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.angular_acceleration, dtype=np.float32), lambda agent_interface: bool(agent_interface.accelerometer), "angular_acceleration", _VEC3_SIGNED_FLOAT32_SPACE, ) ego_angular_jerk_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.angular_jerk, dtype=np.float32), lambda agent_interface: bool(agent_interface.accelerometer), "angular_jerk", _VEC3_SIGNED_FLOAT32_SPACE, ) ego_linear_acceleration_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.linear_acceleration, dtype=np.float32), lambda agent_interface: bool(agent_interface.accelerometer), "linear_acceleration", _VEC3_SIGNED_FLOAT32_SPACE, ) ego_linear_jerk_space_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.linear_jerk, dtype=np.float32), lambda agent_interface: bool(agent_interface.accelerometer), "linear_jerk", _VEC3_SIGNED_FLOAT32_SPACE, ) ego_lane_position_format = StandardSpaceFormat( lambda obs: np.array(obs.ego_vehicle_state.lane_position, dtype=np.float32), lambda agent_interface: bool(agent_interface.lane_positions), "lane_position", _VEC3_SIGNED_FLOAT32_SPACE, ) mission_space_format = StandardSpaceFormat( lambda obs: _format_mission(obs.ego_vehicle_state.mission), lambda _: True, "mission", gym.spaces.Dict( { "goal_position": gym.spaces.Box( low=-1e10, high=1e10, shape=_POSITION_SHP, dtype=np.float64 ) } ), ) distance_travelled_space_format = StandardSpaceFormat( lambda obs: np.float32(obs.distance_travelled), lambda _: True, "distance_travelled", _SIGNED_FLOAT32_SPACE, ) events_interest_done_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.interest_done), lambda _: True, "interest_done", _DISCRETE2_SPACE, ) events_agents_alive_done_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.agents_alive_done), lambda _: True, "agents_alive_done", _DISCRETE2_SPACE, ) events_collisions_space_format = StandardSpaceFormat( lambda obs: np.int64(len(obs.events.collisions) > 0), lambda _: True, "collisions", _DISCRETE2_SPACE, ) events_not_moving_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.not_moving), lambda _: True, "not_moving", _DISCRETE2_SPACE, ) events_off_road_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.off_road), lambda _: True, "off_road", _DISCRETE2_SPACE, ) events_off_route_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.off_route), lambda _: True, "off_route", _DISCRETE2_SPACE, ) events_on_shoulder_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.on_shoulder), lambda _: True, "on_shoulder", _DISCRETE2_SPACE, ) events_reached_goal_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.reached_goal), lambda _: True, "reached_goal", _DISCRETE2_SPACE, ) events_reached_max_episode_steps_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.reached_max_episode_steps), lambda _: True, "reached_max_episode_steps", _DISCRETE2_SPACE, ) events_wrong_way_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.events.wrong_way), lambda _: True, "wrong_way", _DISCRETE2_SPACE, )
[docs]class DrivableAreaGridMapSpaceFormat(Image8BSpaceFormat): """Formats for `obs.drivable_area_grid_map`.""" def __init__(self, agent_interface: AgentInterface) -> None: super().__init__(dimensions=agent_interface.drivable_area_grid_map, layers=1)
[docs] def format(self, obs: Observation): return obs.drivable_area_grid_map.data.astype(np.uint8)
[docs] def active(self, agent_interface: AgentInterface) -> bool: return bool(agent_interface.drivable_area_grid_map)
@property def name(self): return "drivable_area_grid_map"
lidar_point_cloud_space_format = StandardConfigurableSpaceFormat( lambda obs, agent_interface: _format_lidar(obs.lidar_point_cloud), lambda agent_interface: bool(agent_interface.lidar_point_cloud), "lidar_point_cloud", # MTA TODO: add lidar configuration lambda _: gym.spaces.Dict( { "hit": gym.spaces.MultiBinary(_LIDAR_SHP), "point_cloud": gym.spaces.Box( low=-1e10, high=1e10, shape=(_LIDAR_SHP, 3), dtype=np.float64 ), "ray_origin": gym.spaces.Box( low=-1e10, high=1e10, shape=(_LIDAR_SHP, 3), dtype=np.float64 ), "ray_vector": gym.spaces.Box( low=-1e10, high=1e10, shape=(_LIDAR_SHP, 3), dtype=np.float64 ), } ), ) neighborhood_vehicle_states_space_format = StandardConfigurableSpaceFormat( lambda obs, agent_interface: _format_neighborhood_vehicle_states( obs.neighborhood_vehicle_states, agent_interface ), lambda agent_interface: bool(agent_interface.neighborhood_vehicle_states), "neighborhood_vehicle_states", _configure_neighborhood_vehicle_states_space, )
[docs]class OccupancyGridMapSpaceFormat(Image8BSpaceFormat): """Formats for `obs.occupancy_grid_map`.""" def __init__(self, agent_interface: AgentInterface) -> None: super().__init__(dimensions=agent_interface.occupancy_grid_map, layers=1)
[docs] def format(self, obs: Observation): return obs.occupancy_grid_map.data.astype(np.uint8)
[docs] def active(self, agent_interface: AgentInterface) -> bool: return bool(agent_interface.occupancy_grid_map)
@property def name(self): return "occupancy_grid_map"
[docs]class TopDownRGBSpaceFormat(Image8BSpaceFormat): """Formats for `obs.top_down_rgb`.""" def __init__(self, agent_interface: AgentInterface) -> None: super().__init__(dimensions=agent_interface.top_down_rgb, layers=3)
[docs] def format(self, obs: Observation): return obs.top_down_rgb.data.astype(np.uint8)
[docs] def active(self, agent_interface: AgentInterface) -> bool: return bool(agent_interface.top_down_rgb)
@property def name(self): return "top_down_rgb"
waypoint_paths_space_format = StandardSpaceFormat( lambda obs: _format_waypoint_paths(obs.waypoint_paths), lambda agent_interface: bool(agent_interface.waypoint_paths), "waypoint_paths", gym.spaces.Dict( { "heading": gym.spaces.Box( low=-math.pi, high=math.pi, shape=_WAYPOINT_SHP, dtype=np.float32 ), "lane_id": gym.spaces.Tuple( ( gym.spaces.Tuple( gym.spaces.Text( _WAYPOINT_NAME_LIMIT, charset=_WAYPOINT_CHAR_SET, ) for _ in range(_WAYPOINT_SHP[1]) ) for _ in range(_WAYPOINT_SHP[0]) ) ), "lane_index": gym.spaces.Box( low=0, high=127, shape=_WAYPOINT_SHP, dtype=np.int8 ), "lane_offset": gym.spaces.Box( low=0, high=1e10, shape=_WAYPOINT_SHP, dtype=np.float32 ), "lane_width": gym.spaces.Box( low=0, high=1e10, shape=_WAYPOINT_SHP, dtype=np.float32 ), "position": gym.spaces.Box( low=-1e10, high=1e10, shape=_WAYPOINT_SHP + (3,), dtype=np.float64 ), "speed_limit": gym.spaces.Box( low=0, high=1e10, shape=_WAYPOINT_SHP, dtype=np.float32 ), } ), ) signals_space_format = StandardSpaceFormat( lambda obs: _format_signals(obs.signals), lambda agent_interface: bool(agent_interface.signals), "signals", gym.spaces.Dict( { "state": gym.spaces.Box(low=0, high=32, shape=_SIGNALS_SHP, dtype=np.int8), "stop_point": gym.spaces.Box( low=-1e10, high=1e10, shape=_SIGNALS_SHP + (2,), dtype=np.float64 ), "last_changed": gym.spaces.Box( low=0, high=1e10, shape=_SIGNALS_SHP, dtype=np.float32 ), } ), ) enabled_space_format = StandardSpaceFormat( lambda _: np.int64(True), lambda _: True, "active", _DISCRETE2_SPACE, ) steps_completed_space_format = StandardSpaceFormat( lambda obs: np.int64(obs.steps_completed), lambda _: True, "steps_completed", _UNSIGNED_INT64_SPACE, ) ego_vehicle_state_space_format = StandardCompoundSpaceFormat( space_generators=[ # required ego_angular_velocity_space_format, ego_box_space_format, ego_heading_space_format, ego_lane_id_space_format, ego_lane_index_space_format, ego_linear_velocity_space_format, ego_position_space_format, ego_speed_space_format, ego_steering_space_format, ego_yaw_rate_space_format, mission_space_format, # optional ego_angular_acceleration_space_format, ego_angular_jerk_space_format, ego_lane_position_format, ego_linear_acceleration_space_format, ego_linear_jerk_space_format, ], active_func=lambda _: True, name="ego_vehicle_state", ) events_space_format = StandardCompoundSpaceFormat( space_generators=[ events_interest_done_space_format, events_agents_alive_done_space_format, events_collisions_space_format, events_not_moving_space_format, events_off_road_space_format, events_off_route_space_format, events_on_shoulder_space_format, events_reached_goal_space_format, events_reached_max_episode_steps_space_format, events_wrong_way_space_format, ], active_func=lambda _: True, name="events", ) observation_space_format = StandardCompoundSpaceFormat( space_generators=[ enabled_space_format, steps_completed_space_format, distance_travelled_space_format, ego_vehicle_state_space_format, events_space_format, DrivableAreaGridMapSpaceFormat, lidar_point_cloud_space_format, neighborhood_vehicle_states_space_format, OccupancyGridMapSpaceFormat, TopDownRGBSpaceFormat, waypoint_paths_space_format, signals_space_format, ], active_func=lambda _: True, name="observation", )
[docs]class ObservationOptions(IntEnum): """Defines the options for how the formatting matches the observation space.""" multi_agent = 0 """Observation partially matches observation space. Only active agents are included.""" full = 1 """Observation fully matches observation space. Inactive and active agents are included.""" unformatted = 2 """Observation is the original unformatted observation. The observation will not match the observation space.""" default = multi_agent """Defaults to :attr:`multi_agent`."""
[docs]class ObservationSpacesFormatter: """Formats a smarts observation to fixed sized object. Observations in numpy array format, suitable for vectorized processing. For each agent id:: obs = dict({ If the agent is active. "active": 1 if agent is active in smarts, else 0 Total distance travelled in meters. "distance_travelled": np.float32 The number of steps taken by the agent "steps_completed": np.float32 Ego vehicle state, with the following attributes. "ego_vehicle_state": dict({ "angular_acceleration": Angular acceleration vector. Requires `accelerometer` attribute enabled in AgentInterface, else absent. shape=(3,). dtype=np.float32. "angular_jerk": Angular jerk vector. Requires `accelerometer` attribute enabled in AgentInterface, else absent. shape=(3,). dtype=np.float32. "angular_velocity": Angular velocity vector. shape=(3,). dtype=np.float32). "box": Length, width, and height of the vehicle bounding box. shape=(3,). dtype=np.float32. "heading": Vehicle heading in radians [-pi, pi]. dtype=np.float32. "lane_id": The ID of the lane that the vehicle is on. "lane_index": Vehicle's lane number. Rightmost lane has index 0 and increases towards left. dtype=np.int8. "linear_acceleration": Vehicle acceleration in x, y, and z axes. Requires `accelerometer` attribute enabled in AgentInterface, else absent. shape=(3,). dtype=np.float32. "linear_jerk": Linear jerk vector. Requires `accelerometer` attribute enabled in AgentInterface, else absent. shape=(3,). dtype=np.float32. "linear_velocity": Vehicle velocity in x, y, and z axes. shape=(3,). dtype=np.float32. "position": Coordinate of the center of the vehicle bounding box's bottom plane. shape=(3,). dtype=np.float64. "speed": Vehicle speed in m/s. dtype=np.float32. "steering": Angle of front wheels in radians [-pi, pi]. dtype=np.float32. "yaw_rate": Rotation speed around vertical axis in rad/s [0, 2pi]. dtype=np.float32. "lane_position": A reference line coordinate. Coordinates are s, t, and h relating to lane offset along lane, horizontal displacement and surface displacement. shape=(3,). dtype=np.float64 )} A dictionary of event markers. "events": dict({ "interest_done": 1 if `DoneCriteria.interest` is triggered, else 0. "agents_alive_done": 1 if `DoneCriteria.agents_alive` is triggered, else 0. "collisions": 1 if any collisions occurred with ego vehicle, else 0. "not_moving": 1 if `DoneCriteria.not_moving` is triggered, else 0. "off_road": 1 if ego vehicle drives off road, else 0. "off_route": 1 if ego vehicle drives off mission route, else 0. "on_shoulder": 1 if ego vehicle drives on road shoulder, else 0. "reached_goal": 1 if ego vehicle reaches its goal, else 0. "reached_max_episode_steps": 1 if maximum episode steps reached, else 0. "wrong_way": 1 if ego vehicle drives in the wrong traffic direction, else 0. }) Drivable area grid map. Map is binary, with 255 if a cell contains a road, else 0. dtype=np.uint8. "drivable_area_grid_map": np.ndarray Lidar point cloud, with the following attributes. "lidar_point_cloud": dict({ "hit": Binary array. 1 if an object is hit, else 0. shape(300,). "point_cloud": Coordinates of lidar point cloud. shape=(300,3). dtype=np.float64. "ray_origin": Ray origin coordinates. shape=(300,3). dtype=np.float64. "ray_vector": Ray vectors. shape=(300,3). dtype=np.float64. }) Mission details for the ego agent. "mission": dict({ "goal_position": Achieve goal by reaching the end position. Defaults to np.array([0,0,0]) for no mission. shape=(3,). dtype=np.float64. }) Feature array of 10 nearest neighborhood vehicles. If nearest neighbor vehicles are insufficient, default feature values are padded. "neighborhood_vehicle_states": dict({ "box": Bounding box of neighbor vehicles. Defaults to np.array([.0,.0,.0]) per vehicle. shape=(10,3). dtype=np.float32. "heading": Heading of neighbor vehicles in radians [-pi, pi]. Defaults to np.array([0]) per vehicle. shape=(10,). dtype=np.float32. "id": The vehicle ids of neighbor vehicles. Defaults to str("") per vehicle. shape=(10,Text(50)) "interest": If the vehicles are of interest. Defaults to np.array([False]) per vehicle. shape=(10,). dtype=np.int8 "lane_id": The ID of the lane that the vehicle is on. Defaults to str("") per vehicle. shape=(10,Text(50)) "lane_index": Lane number of neighbor vehicles. Defaults to np.array([0]) per vehicle. shape=(10,). dtype=np.int8. "position": Coordinate of the center of neighbor vehicles' bounding box's bottom plane. Defaults to np.array([.0,.0,.0]) per vehicle. shape=(10,3). dtype=np.float64. "speed": Speed of neighbor vehicles in m/s. Defaults to np.array([0]) per vehicle. shape=(10,). dtype=np.float32. "lane_position": A reference line coordinates. Coordinates are s, t, and h relating to offset along lane, horizontal displacement, and surface displacement. Defaults to np.array([.0,.0,.0]) per vehicle. shape=(10,3). dtype=np.float64 }) Occupancy grid map. Map is binary, with 255 if a cell is occupied, else 0. dtype=np.uint8. "occupancy_grid_map": np.ndarray RGB image, from the top view, with ego vehicle at the center. shape=(height, width, 3). dtype=np.uint8. "top_down_rgb": np.ndarray Feature array of 20 waypoints ahead or in the mission route, from the nearest 4 lanes. If lanes or waypoints ahead are insufficient, default values are padded. "waypoint_paths": dict({ "heading": Lane heading angle at a waypoint in radians [-pi, pi]. Defaults to np.array([0]) per waypoint. shape=(4,20). dtype=np.float32. "lane_id": The closest lane id for the waypoint. Defaults to str("") per waypoint. shape=(4,20,Text(50)) "lane_index": Lane number at a waypoint. Defaults to np.array([0]) per waypoint. shape=(4,20). dtype=np.int8. "lane_width": Lane width at a waypoint in meters. Defaults to np.array([0]) per waypoint. shape=(4,20). dtype=np.float32. "position": Coordinate of a waypoint. Defaults to np.array([0,0,0]). shape=(4,20,3). dtype=np.float64. "speed_limit": Lane speed limit at a waypoint in m/s. shape=(4,20). dtype=np.float32. }) Feature array of 3 upcoming signals. If there aren't this many signals ahead, default values are padded. "signals": dict({ "state": The state of the traffic signal. See smarts.core.signal_provider.SignalLightState for interpretation. Defaults to np.array([0]) per signal. shape=(3,), dtype=np.int8. "stop_point": The stopping point for traffic controlled by the signal, i.e., the point where actors should stop when the signal is in a stop state. Defaults to np.array([0, 0]) per signal. shape=(3,2), dtype=np.float64. "last_changed": If known, the simulation time this signal last changed its state. Defaults to np.array([0]) per signal. shape=(3,), dtype=np.float32. }) }) """ def __init__( self, agent_interfaces: Dict[str, AgentInterface], observation_options: ObservationOptions, ) -> None: self._space_formats = { agent_id: observation_space_format(agent_interface) for agent_id, agent_interface in agent_interfaces.items() } self.observation_options = observation_options super().__init__()
[docs] def format(self, observations: Dict[str, Observation]): """Formats smarts observations fixed sized containers.""" if self.observation_options == ObservationOptions.unformatted: return observations # TODO MTA: Parallelize the conversion if possible active_obs = { agent_id: self._space_formats[agent_id].format(obs) for agent_id, obs in observations.items() } out_obs = active_obs if self.observation_options == ObservationOptions.full: missing_ids = set(self._space_formats.keys()) - set(active_obs.keys()) padded_obs = { agent_id: space_format.space.sample() for agent_id, space_format in self._space_formats.items() if agent_id in missing_ids } for obs in padded_obs.values(): obs["active"] = np.int64(False) out_obs.update(padded_obs) return out_obs
@cached_property def space(self): """The observation space this should format the smarts observations to match.""" if self.observation_options is ObservationOptions.unformatted: return None return gym.spaces.Dict( { agent_id: space_format.space for agent_id, space_format in self._space_formats.items() } )