# MIT License
#
# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations
import logging
from collections import Counter
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
)
from smarts.core import config
from smarts.core.observations import Observation
from smarts.core.sensors import SensorResolver, Sensors, SensorState
from smarts.core.sensors.local_sensor_resolver import LocalSensorResolver
from smarts.core.sensors.parallel_sensor_resolver import ParallelSensorResolver
from smarts.core.simulation_frame import SimulationFrame
from smarts.core.simulation_local_constants import SimulationLocalConstants
if TYPE_CHECKING:
from smarts.core.agent_interface import AgentInterface
from smarts.core.renderer_base import RendererBase
from smarts.core.sensor import Sensor
from smarts.core.utils.pybullet import bullet_client as bc
from smarts.core.vehicle import Vehicle
[docs]class SensorManager:
"""A sensor management system that associates actors with sensors."""
def __init__(self):
self._logger = logging.getLogger(self.__class__.__name__)
self._sensors: Dict[str, Sensor] = {}
# {actor_id: <SensorState>}
self._sensor_states: Dict[str, SensorState] = {}
# {actor_id: {sensor_id, ...}}
self._sensors_by_actor_id: Dict[str, Set[str]] = {}
self._actors_by_sensor_id: Dict[str, Set[str]] = {}
self._sensor_references = Counter()
# {sensor_id, ...}
self._scheduled_sensors: List[Tuple[str, Sensor]] = []
observation_workers = config()(
"core", "observation_workers", default=0, cast=int
)
parallel_resolver = ParallelSensorResolver
if (backing := config()("core", "sensor_parallelization")) == "ray":
try:
import ray
from smarts.ray.sensors.ray_sensor_resolver import RaySensorResolver
parallel_resolver = RaySensorResolver
except ImportError:
pass
elif backing == "mp":
pass
else:
raise LookupError(
f"SMARTS_CORE_SENSOR_PARALLELIZATION={backing} is not a valid option."
)
self._sensor_resolver: SensorResolver = (
parallel_resolver() if observation_workers > 0 else LocalSensorResolver()
)
[docs] def step(self, sim_frame: SimulationFrame, renderer: RendererBase):
"""Update sensor values based on the new simulation state."""
self._sensor_resolver.step(sim_frame, self._sensor_states.values())
for sensor in self._sensors.values():
sensor.step(sim_frame=sim_frame, renderer=renderer)
[docs] def observe(
self,
sim_frame: SimulationFrame,
sim_local_constants: SimulationLocalConstants,
agent_ids: Set[str],
renderer_ref: RendererBase,
physics_ref: bc.BulletClient,
) -> Tuple[Dict[str, Observation], Dict[str, bool]]:
"""Runs observations and updates the sensor states.
Args:
sim_frame (SimulationFrame):
The current state from the simulation.
sim_local_constants (SimulationLocalConstants):
The values that should stay the same for a simulation over a reset.
agent_ids (Set[str]):
The agent ids to process.
renderer_ref (RendererBase):
The renderer (if any) that should be used.
physics_ref:
The physics client.
"""
observations, dones, updated_sensors = self._sensor_resolver.observe(
sim_frame,
sim_local_constants,
agent_ids,
renderer_ref,
physics_ref,
)
for actor_id, sensors in updated_sensors.items():
for sensor_name, sensor in sensors.items():
self._sensors[
SensorManager._actor_and_sensor_name_to_sensor_id(
sensor_name, actor_id
)
] = sensor
return observations, dones
[docs] def observe_batch(
self,
sim_frame: SimulationFrame,
sim_local_constants: SimulationLocalConstants,
interface: AgentInterface,
sensor_states: Dict[str, SensorState],
vehicles: Dict[str, Vehicle],
renderer: RendererBase,
bullet_client: bc.BulletClient,
) -> Tuple[Dict[str, Observation], Dict[str, bool]]:
"""Operates all sensors on a batch of vehicles for a single agent."""
# TODO: Replace this with a more efficient implementation that _actually_
# does batching
assert sensor_states.keys() == vehicles.keys()
observations, dones = {}, {}
for vehicle_id, vehicle in vehicles.items():
sensor_state = sensor_states[vehicle_id]
(
observations[vehicle_id],
dones[vehicle_id],
updated_sensors,
) = Sensors.observe_vehicle(
sim_frame,
sim_local_constants,
interface,
sensor_state,
vehicle,
renderer,
bullet_client,
)
for sensor_name, sensor in updated_sensors.items():
self._sensors[
SensorManager._actor_and_sensor_name_to_sensor_id(
sensor_name, vehicle_id
)
] = sensor
return observations, dones
[docs] def teardown(self, renderer: RendererBase):
"""Tear down the current sensors and clean up any internal resources."""
self._logger.info("++ Sensors and sensor states reset. ++")
for sensor in self._sensors.values():
sensor.teardown(renderer=renderer)
self._sensors = {}
self._sensor_states = {}
self._sensors_by_actor_id = {}
self._sensor_references.clear()
self._scheduled_sensors.clear()
[docs] def add_sensor_state(self, actor_id: str, sensor_state: SensorState):
"""Add a sensor state associated with a given actor."""
self._logger.debug("Sensor state added for actor '%s'.", actor_id)
self._sensor_states[actor_id] = sensor_state
[docs] def remove_sensor_state_by_actor_id(self, actor_id: str):
"""Add a sensor state associated with a given actor."""
self._logger.debug("Sensor state removed for actor '%s'.", actor_id)
return self._sensor_states.pop(actor_id, None)
[docs] def remove_actor_sensors_by_actor_id(
self, actor_id: str, schedule_teardown: bool = True
) -> Iterable[Tuple[Sensor, int]]:
"""Remove association of an actor to sensors. If the sensor is no longer associated an actor, the
sensor is scheduled to be removed."""
sensor_states = self._sensor_states.get(actor_id)
if not sensor_states:
self._logger.warning(
"Attempted to remove sensors from actor with no sensors: '%s'.",
actor_id,
)
return []
self.remove_sensor_state_by_actor_id(actor_id)
sensors_by_actor = self._sensors_by_actor_id.get(actor_id)
if not sensors_by_actor:
return []
self._logger.debug("Target sensor removal for actor '%s'.", actor_id)
discarded_sensors = []
for sensor_id in sensors_by_actor:
self._actors_by_sensor_id[sensor_id].remove(actor_id)
self._sensor_references.subtract([sensor_id])
references = self._sensor_references[sensor_id]
discarded_sensors.append((self._sensors[sensor_id], references))
if references < 1:
self._disassociate_sensor(sensor_id, schedule_teardown)
del self._sensors_by_actor_id[actor_id]
return discarded_sensors
[docs] def remove_sensor(
self, sensor_id: str, schedule_teardown: bool = False
) -> Optional[Sensor]:
"""Remove a sensor by its id. Removes any associations it has with actors."""
self._logger.debug("Target removal of sensor '%s'.", sensor_id)
sensor = self._sensors.get(sensor_id)
if not sensor:
return None
self._disassociate_sensor(sensor_id, schedule_teardown)
return sensor
def _disassociate_sensor(self, sensor_id: str, schedule_teardown: bool):
if schedule_teardown:
self._scheduled_sensors.append((sensor_id, self._sensors[sensor_id]))
self._logger.info("Sensor '%s' removed from manager.", sensor_id)
del self._sensors[sensor_id]
del self._sensor_references[sensor_id]
## clean up any remaining references by actors
if sensor_id in self._actors_by_sensor_id:
for actor_id in self._actors_by_sensor_id[sensor_id]:
if sensors_ids := self._sensors_by_actor_id[actor_id]:
sensors_ids.remove(sensor_id)
del self._actors_by_sensor_id[sensor_id]
[docs] def sensor_state_exists(self, actor_id: str) -> bool:
"""Determines if a actor has a sensor state associated with it."""
return actor_id in self._sensor_states
[docs] def sensor_states_items(self) -> Iterator[Tuple[str, SensorState]]:
"""Gets all actor to sensor state associations."""
return map(lambda x: x, self._sensor_states.items())
[docs] def sensors_for_actor_id(self, actor_id: str) -> List[Sensor]:
"""Gets all sensors associated with the given actor."""
return [
self._sensors[s_id]
for s_id in self._sensors_by_actor_id.get(actor_id, set())
]
[docs] def sensors_for_actor_ids(
self, actor_ids: Set[str]
) -> Dict[str, Dict[str, Sensor]]:
"""Gets all sensors for the given actors."""
return {
actor_id: {
SensorManager._actor_sid_to_sname(s_id): self._sensors[s_id]
for s_id in self._sensors_by_actor_id.get(actor_id, set())
}
for actor_id in actor_ids
}
[docs] def sensor_state_for_actor_id(self, actor_id: str) -> Optional[SensorState]:
"""Gets the sensor state for the given actor."""
return self._sensor_states.get(actor_id)
@staticmethod
def _actor_sid_to_sname(sensor_id: str) -> str:
return sensor_id.partition("-")[0]
@staticmethod
def _actor_and_sensor_name_to_sensor_id(sensor_name: str, actor_id: str) -> str:
return f"{sensor_name}-{actor_id}"
[docs] def add_sensor_for_actor(self, actor_id: str, name: str, sensor: Sensor) -> str:
"""Adds a sensor association for a specific actor."""
# TAI: Allow multiple sensors of the same type on the same actor
s_id = SensorManager._actor_and_sensor_name_to_sensor_id(name, actor_id)
actor_sensors = self._sensors_by_actor_id.setdefault(actor_id, set())
if s_id in actor_sensors:
self._logger.warning(
"Duplicate sensor attempted to add to actor `%s`: `%s`", actor_id, s_id
)
return s_id
actor_sensors.add(s_id)
actors = self._actors_by_sensor_id.setdefault(s_id, set())
actors.add(actor_id)
return self.add_sensor(s_id, sensor)
[docs] def add_sensor(self, sensor_id, sensor: Sensor) -> str:
"""Adds a sensor to the sensor manager."""
self._logger.info("Added sensor '%s' to sensor manager.", sensor_id)
assert sensor_id not in self._sensors
self._sensors[sensor_id] = sensor
self._sensor_references.update([sensor_id])
return sensor_id
[docs] def clean_up_sensors_for_actors(
self, current_actor_ids: Set[str], renderer: RendererBase
):
"""Cleans up sensors that are attached to non-existing actors."""
# This is not good enough by itself since actors can keep alive sensors that are not in use by an agent
old_actor_ids = set(self._sensor_states)
missing_actors = old_actor_ids - current_actor_ids
for aid in missing_actors:
self.remove_actor_sensors_by_actor_id(aid)
for sensor_id, sensor in self._scheduled_sensors:
self._logger.info("Sensor '%s' destroyed.", sensor_id)
sensor.teardown(renderer=renderer)
self._scheduled_sensors.clear()