Source code for smarts.core.bubble_manager

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import logging
import math
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from sys import maxsize
from typing import Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, Union

from shapely.affinity import rotate, translate
from shapely.geometry import CAP_STYLE, JOIN_STYLE, Point, Polygon

from smarts.core.actor_capture_manager import ActorCaptureManager
from smarts.core.condition_state import ConditionState
from smarts.core.data_model import SocialAgent
from smarts.core.plan import (
    EndlessGoal,
    NavigationMission,
    Plan,
    PlanningError,
    PositionalGoal,
    Start,
)
from smarts.core.road_map import RoadMap
from smarts.core.utils.cache import cache, clear_cache
from smarts.core.utils.id import SocialAgentId
from smarts.core.utils.strings import truncate
from smarts.core.vehicle import Vehicle
from smarts.core.vehicle_index import VehicleIndex
from smarts.sstudio.sstypes import BoidAgentActor
from smarts.sstudio.sstypes import Bubble as SSBubble
from smarts.sstudio.sstypes import BubbleLimits, SocialAgentActor
from smarts.sstudio.sstypes.actor.traffic_engine_actor import TrafficEngineActor
from smarts.sstudio.sstypes.condition import Condition
from smarts.zoo.registry import make as make_social_agent


class BubbleTransition(Enum):
    """Describes a bubble transition state.

    The members are numbered in the order a vehicle passes through them when
    driving straight through a bubble:

        --> [ AirlockEntered [ Entered ] Exited ] AirlockExited -->
    """

    # Outside --> (Airlock)
    AirlockEntered = 0
    # Airlock --> (Bubble)
    Entered = 1
    # Bubble --> (Airlock)
    Exited = 2
    # Airlock --> outside
    AirlockExited = 3
class BubbleRelationState(Enum):
    """Describes the actor's spatial occupancy in a bubble."""

    # The actor is inside the inner bubble geometry.
    InBubble = 0
    # The actor is inside the airlock margin but not the inner bubble.
    InAirlock = 1
    # The actor was previously tracked by a bubble that is no longer active.
    WasInBubble = 2
class BubbleCaptureState(Enum):
    """Describes the actor's capture state in the bubble."""

    # Set on the `Entered` transition (the bubble's actor takes over).
    Controlled = 0
    # Set on the `AirlockEntered` transition.
    Captured = 1
    # Default state, and the state after the `AirlockExited` transition.
    Uncaptured = 2
class Bubble:
    """Wrapper around `sstudio.sstypes.Bubble` to cache bubble geometry (and does
    some internal Bubble-related business logic).
    """

    def __init__(self, bubble: SSBubble, road_map: RoadMap):
        """Cache the geometry and capture limits of a scenario-studio bubble.

        Args:
            bubble (SSBubble): The scenario-studio bubble definition to wrap.
            road_map (RoadMap): The map used to resolve the bubble zone into geometry.
        """
        geometry = bubble.zone.to_geometry(road_map)
        self.centroid = [geometry.centroid.x, geometry.centroid.y]

        # Broadphase radius: distance from the centroid to the bounding-box corner
        # at (bounds[2], bounds[1]).
        # NOTE(review): only one corner is measured; for a zone whose centroid is
        # not the centre of its bounding box another corner may be further away.
        bounding = geometry.bounds
        self.radius = math.sqrt(
            pow(bounding[2] - self.centroid[0], 2)
            + pow(bounding[1] - self.centroid[1], 2)
        )

        bubble_limit = (
            bubble.limit or BubbleLimits()
            if bubble.limit is None or isinstance(bubble.limit, BubbleLimits)
            else BubbleLimits(bubble.limit.hijack_limit, bubble.limit.shadow_limit + 1)
        )

        if isinstance(bubble.actor, BoidAgentActor):

            def safe_min(a, b):
                # A falsy limit (None or 0) is treated as "unlimited".
                return min(a or maxsize, b or maxsize)

            if bubble.limit is None:
                bubble_limit = bubble.actor.capacity
            elif bubble.actor.capacity is not None:
                # Both limits are clamped by the matching field of the boid
                # capacity. (Previously the hijack limit was compared against the
                # whole capacity object rather than its `hijack_limit`.)
                hijack_limit = safe_min(
                    bubble.limit.hijack_limit, bubble.actor.capacity.hijack_limit
                )
                shadow_limit = safe_min(
                    bubble.limit.shadow_limit, bubble.actor.capacity.shadow_limit
                )
                bubble_limit = BubbleLimits(hijack_limit, shadow_limit)

        # Heading last applied by `move_to_follow_vehicle` (radians).
        self._bubble_heading = 0.0
        self._bubble = bubble
        self._limit = bubble_limit
        self._cached_inner_geometry = geometry
        self._exclusion_prefixes = bubble.exclusion_prefixes

        self._airlock_condition = bubble.airlock_condition
        self._active_condition = bubble.active_condition

        # The airlock is the inner geometry grown outwards by the bubble margin.
        self._cached_airlock_geometry = self._cached_inner_geometry.buffer(
            bubble.margin,
            cap_style=CAP_STYLE.square,
            join_style=JOIN_STYLE.mitre,
        )

    @property
    def exclusion_prefixes(self):
        """The blacklist of actor prefixes, used to ignore specific actors."""
        return self._exclusion_prefixes

    @property
    def id(self):
        """The id of the underlying bubble."""
        return self._bubble.id

    @property
    def actor(self) -> Union[SocialAgentActor, TrafficEngineActor]:
        """The actor that should replace the captured actor."""
        return self._bubble.actor

    @property
    def follow_actor_id(self) -> str:
        """A target actor that the bubble should remain at a fixed offset from."""
        return self._bubble.follow_actor_id

    @property
    def follow_vehicle_id(self) -> str:
        """A target vehicle that the bubble should remain at a fixed offset from."""
        return self._bubble.follow_vehicle_id

    @property
    def limit(self):
        """The maximum number of actors that the bubble can have captured at the same time."""
        return self._limit

    @property
    def is_boid(self):
        """If the actors captured by the bubble should be controlled by a boid agent."""
        return self._bubble.is_boid

    @property
    def keep_alive(self):
        """If enabled, the social agent actor will be spawned upon first vehicle airlock
        and be reused for every subsequent vehicle entering the bubble until the episode
        is over.
        """
        return self._bubble.keep_alive

    @property
    def airlock_condition(self) -> Condition:
        """Conditions under which this bubble will accept an agent."""
        return self._airlock_condition

    @property
    def active_condition(self) -> Condition:
        """Fast inclusions for the bubble."""
        return self._active_condition

    # XXX: In the case of traveling bubbles, the geometry and zone are moving
    #      according to the follow vehicle.
    @property
    def geometry(self) -> Polygon:
        """The geometry of the managed bubble."""
        return self._cached_inner_geometry

    @property
    def airlock_geometry(self) -> Polygon:
        """The airlock geometry of the managed bubble."""
        return self._cached_airlock_geometry

    def condition_passes(
        self,
        active_condition_requirements,
    ):
        """If the broadphase condition allows for this bubble to be active.

        Args:
            active_condition_requirements: Keyword arguments forwarded to the
                evaluation of the bubble's active condition.

        Returns:
            bool: True if `ConditionState.TRUE` is in the evaluation result.
        """
        return ConditionState.TRUE in self.active_condition.evaluate(
            **active_condition_requirements
        )

    def admissibility(
        self,
        vehicle_id: str,
        index: VehicleIndex,
        vehicle_ids_in_bubbles: Dict["Bubble", Set[str]],
        running_cursors: Set["Cursor"],
    ):
        """Determine if the given vehicle may still be hijacked or shadowed by this
        bubble, given the bubble's exclusion prefixes and capture limits.

        Args:
            vehicle_id (str): The vehicle being queried.
            index (VehicleIndex): Used to test which vehicles are already hijacked
                or shadowed.
            vehicle_ids_in_bubbles (Dict[Bubble, Set[str]]): The vehicle ids
                presently associated with each bubble.
            running_cursors (Set[Cursor]): Cursors already generated this step.

        Returns:
            Tuple[bool, bool]: `(hijackable, shadowable)`.
        """
        # `str.startswith` accepts a tuple and is True if any prefix matches.
        if vehicle_id.startswith(tuple(self.exclusion_prefixes)):
            return False, False

        hijackable, shadowable = True, True
        if self._limit is not None:
            # Already hijacked (according to VehicleIndex) + to be hijacked (running cursors)
            current_hijacked_vehicle_ids = {
                v_id
                for v_id in vehicle_ids_in_bubbles[self]
                if index.vehicle_is_hijacked(v_id)
            }
            current_shadowed_vehicle_ids = {
                v_id
                for v_id in vehicle_ids_in_bubbles[self]
                if index.vehicle_is_shadowed(v_id)
            }
            vehicle_ids_by_bubble_state = (
                BubbleManager._vehicle_ids_divided_by_bubble_state(
                    frozenset(running_cursors)
                )
            )

            # NOTE(review): the airlock set is counted towards hijacks and the
            # in-bubble set towards shadows; confirm this pairing is intended.
            # The queried vehicle never counts against its own admission.
            all_hijacked_vehicle_ids = (
                current_hijacked_vehicle_ids
                | vehicle_ids_by_bubble_state[BubbleRelationState.InAirlock][self]
            ) - {vehicle_id}

            all_shadowed_vehicle_ids = (
                current_shadowed_vehicle_ids
                | vehicle_ids_by_bubble_state[BubbleRelationState.InBubble][self]
            ) - {vehicle_id}

            # A falsy limit (None or 0) means "unlimited".
            hijackable = len(all_hijacked_vehicle_ids) < (
                self._limit.hijack_limit or maxsize
            )
            shadowable = len(all_shadowed_vehicle_ids) + len(
                all_hijacked_vehicle_ids
            ) < (self._limit.shadow_limit or maxsize)

        return hijackable, shadowable

    def in_bubble_or_airlock_zone(self, position: Point):
        """Test if the position is within the bubble or airlock around the bubble.

        Args:
            position (Point): The position to test. Anything that is not already a
                shapely `Point` is passed to the `Point` constructor.

        Returns:
            Tuple[bool, bool]: `(in_bubble, in_airlock_only)`. The two values are
            never both True.
        """
        if not isinstance(position, Point):
            position = Point(position)

        # The airlock geometry is the inner geometry buffered outwards, so it is
        # tested first and the inner test is skipped when it fails.
        in_airlock = position.within(self._cached_airlock_geometry)
        if not in_airlock:
            return False, False

        in_bubble = position.within(self._cached_inner_geometry)
        return in_bubble, in_airlock and not in_bubble

    @property
    def is_traveling(self):
        """If the bubble is following an actor."""
        return (
            self._bubble.follow_actor_id is not None
            or self._bubble.follow_vehicle_id is not None
        )

    def move_to_follow_vehicle(self, vehicle: Vehicle):
        """Move the bubble to a pose relative to the given vehicle.

        Does nothing if the vehicle is not valid.

        Args:
            vehicle (Vehicle): The vehicle the bubble is pinned to.
        """
        if not vehicle.valid:
            return

        x, y, _ = vehicle.position

        def _transform(geom):
            centroid = geom.centroid

            # Bring back to origin
            geom = translate(geom, xoff=-centroid.x, yoff=-centroid.y)
            geom = rotate(geom, -self._bubble_heading, "centroid", use_radians=True)

            # Now apply new transformation in "vehicle coordinate space"
            geom = translate(
                geom,
                xoff=self._bubble.follow_offset[0],
                yoff=self._bubble.follow_offset[1],
            )
            geom = rotate(geom, vehicle.heading, (0, 0), use_radians=True)
            geom = translate(geom, xoff=x, yoff=y)
            return geom

        self._cached_inner_geometry = _transform(self._cached_inner_geometry)
        self._cached_airlock_geometry = _transform(self._cached_airlock_geometry)
        self.centroid = [
            self._cached_inner_geometry.centroid.x,
            self._cached_inner_geometry.centroid.y,
        ]
        # Remembered so the next call can undo this rotation first.
        self._bubble_heading = vehicle.heading

    def __repr__(self):
        return f"""Bubble( id={self.id}, traveling={self.is_traveling}, actor={self.actor}, follow_actor_id={self.follow_actor_id}, limit={self.limit}, geometry={self.geometry}, airlock_geometry={self.airlock_geometry}, follow_vehicle_id={self.follow_vehicle_id}, )"""

    def __hash__(self) -> int:
        # Bubbles are hashed by the id of the underlying bubble definition.
        return hash(self.id)
@dataclass(frozen=True)
class Cursor:
    """Tracks an actor through an airlock or a bubble."""

    # We would always want to have the vehicle go through the airlock zone. This may
    # not be the case if we spawn a vehicle in a bubble, but that wouldn't be ideal.
    vehicle_id: str
    state: Optional[BubbleRelationState] = None
    capture: Optional[BubbleCaptureState] = None
    transition: Optional[BubbleTransition] = None
    bubble: Optional[Bubble] = None

    @staticmethod
    def for_removed(
        vehicle_id: str,
        bubble: Bubble,
        index: VehicleIndex,
        vehicle_ids_per_bubble: Dict[Bubble, Set[str]],
    ) -> "Cursor":
        """Generate a cursor for an inactive bubble.

        Args:
            vehicle_id (str): The id of the vehicle that is to be tracked.
            bubble (Bubble): The bubble that the vehicle is interacting with.
            index (VehicleIndex): The vehicle index the vehicle is in.
            vehicle_ids_per_bubble (Dict[Bubble, Set[str]]): Bubbles associated with
                vehicle ids.

        Returns:
            Cursor: An uncaptured `WasInBubble` cursor. Its transition is
            `AirlockExited` when the vehicle was in this bubble and is currently
            hijacked or shadowed, otherwise `None`.
        """
        was_in_this_bubble = vehicle_id in vehicle_ids_per_bubble[bubble]
        is_hijacked, is_shadowed = index.vehicle_is_hijacked_or_shadowed(vehicle_id)

        transition = None
        if was_in_this_bubble and (is_shadowed or is_hijacked):
            transition = BubbleTransition.AirlockExited

        return Cursor(
            vehicle_id=vehicle_id,
            transition=transition,
            state=BubbleRelationState.WasInBubble,
            capture=BubbleCaptureState.Uncaptured,
            bubble=bubble,
        )

    @staticmethod
    def from_pos(
        position: Point,
        vehicle_id: str,
        bubble: Bubble,
        index: VehicleIndex,
        vehicle_ids_per_bubble: Dict[Bubble, Set[str]],
        running_cursors: Set["Cursor"],
        previous_cursor: Optional["Cursor"],
        is_hijack_admissible,
        is_airlock_admissible,
    ) -> "Cursor":
        """Generate a cursor.

        Args:
            position (Point): The shapely position of the vehicle.
            vehicle_id (str): The id of the vehicle that is to be tracked.
            bubble (Bubble): The bubble that the vehicle is interacting with.
            index (VehicleIndex): The vehicle index the vehicle is in.
            vehicle_ids_per_bubble (Dict[Bubble, Set[str]]): Bubbles associated with
                vehicle ids.
            running_cursors (Set["Cursor"]): A set of existing cursors. Not read by
                this method.
            previous_cursor (Optional["Cursor"]): The cursor from the previous step
                for this vehicle; its capture state is carried forward. `None` is
                treated as uncaptured.
            is_hijack_admissible: If the bubble can still take control of this vehicle.
            is_airlock_admissible: If the bubble can still airlock this vehicle.

        Returns:
            Cursor: The cursor describing the vehicle's state relative to the
            bubble and any transition that occurred.
        """
        in_bubble_zone, in_airlock_zone = bubble.in_bubble_or_airlock_zone(position)
        is_social = vehicle_id in index.social_vehicle_ids()
        was_in_this_bubble = vehicle_id in vehicle_ids_per_bubble[bubble]
        previous_capture = (
            previous_cursor.capture
            if previous_cursor is not None
            else BubbleCaptureState.Uncaptured
        )

        # XXX: When a traveling bubble disappears and an agent is airlocked or
        #      hijacked. It remains in that state.
        # TODO: Depending on step size, we could potentially skip transitions (e.g.
        #       go straight to relinquish w/o hijacking first). This may be solved by
        #       time-based airlocking. For robust code we'll want to handle these
        #       scenarios (e.g. hijacking if didn't airlock first)
        transition = None
        capture = previous_capture
        if (
            is_social
            and previous_capture is BubbleCaptureState.Uncaptured
            and is_airlock_admissible
            and (in_airlock_zone or in_bubble_zone)
        ):
            # In this case a vehicle has just entered the airlock
            transition = BubbleTransition.AirlockEntered
            capture = BubbleCaptureState.Captured
        elif (
            previous_capture is BubbleCaptureState.Captured
            and is_hijack_admissible
            and in_bubble_zone
        ):
            # In this case a vehicle has just entered the bubble
            transition = BubbleTransition.Entered
            capture = BubbleCaptureState.Controlled
        elif (
            was_in_this_bubble
            and previous_capture is BubbleCaptureState.Controlled
            and in_airlock_zone
        ):
            # XXX: This may get called repeatedly because we don't actually change
            #      any state when this happens.
            # In this case a vehicle has just exited the bubble
            transition = BubbleTransition.Exited
        elif (
            was_in_this_bubble
            and previous_capture
            in {BubbleCaptureState.Controlled, BubbleCaptureState.Captured}
            and not (in_airlock_zone or in_bubble_zone)
        ):
            # In this case a vehicle has just exited the airlock around the bubble
            transition = BubbleTransition.AirlockExited
            capture = BubbleCaptureState.Uncaptured

        # The state is left as None when the vehicle is outside both zones.
        state = None
        if in_bubble_zone:
            state = BubbleRelationState.InBubble
        elif in_airlock_zone:
            state = BubbleRelationState.InAirlock

        return Cursor(
            vehicle_id=vehicle_id,
            transition=transition,
            capture=capture,
            state=state,
            bubble=bubble,
        )

    def __repr__(self):
        return f"Cursor(state={self.state}, transition={self.transition}, vehicle_id={self.vehicle_id})"

    def __hash__(self) -> int:
        # `bubble` defaults to None, so guard the attribute access; a cursor
        # without a bubble must still be hashable to be stored in a set.
        bubble_id = self.bubble.id if self.bubble is not None else None
        return hash((self.vehicle_id, self.state, self.transition, bubble_id))
class BubbleManager(ActorCaptureManager):
    """Manages bubble interactions."""

    def __init__(self, bubbles: Sequence[SSBubble], road_map: RoadMap):
        """Wrap the scenario bubbles and initialize an empty tracking state.

        Args:
            bubbles (Sequence[SSBubble]): The scenario-studio bubbles to manage.
            road_map (RoadMap): The map used to resolve bubble zones into geometry.
        """
        self._log = logging.getLogger(self.__class__.__name__)
        self._cursors: Set[Cursor] = set()
        self._last_vehicle_index = VehicleIndex.identity()
        self._bubbles = [Bubble(b, road_map) for b in bubbles]
        self._active_bubbles: Sequence[Bubble] = []

    @property
    def active_bubbles(self) -> Sequence[Bubble]:
        """A sequence of currently active bubbles."""
        return self._active_bubbles

    # NOTE(review): `cache` / `clear_cache` come from `smarts.core.utils.cache`;
    # presumably this result is memoized until `step` (decorated with
    # `clear_cache`) runs again -- confirm against that module.
    @cache
    def _bubble_groups(self, sim) -> Tuple[List[Bubble], List[Bubble]]:
        """Split the managed bubbles into `(active, inactive)` lists."""

        # Filter out traveling bubbles that are missing their follow vehicle
        def is_active(bubble: Bubble) -> bool:
            active_condition_requirements = {
                **self._gen_simulation_condition_kwargs(
                    sim, bubble.active_condition.requires
                ),
                **self._gen_mission_condition_kwargs(
                    bubble.actor.name, None, bubble.active_condition.requires
                ),
            }
            if not bubble.condition_passes(active_condition_requirements):
                return False

            if not bubble.is_traveling:
                return True

            vehicle = None
            if bubble.follow_actor_id is not None:
                vehicles = self._last_vehicle_index.vehicles_by_owner_id(
                    bubble.follow_actor_id
                )
                vehicle = vehicles[0] if len(vehicles) else None
            if bubble.follow_vehicle_id is not None:
                vehicle = self._last_vehicle_index.vehicle_by_id(
                    bubble.follow_vehicle_id, None
                )
            return vehicle is not None

        active_bubbles = []
        inactive_bubbles = []
        for bubble in self._bubbles:
            if is_active(bubble):
                active_bubbles.append(bubble)
            else:
                inactive_bubbles.append(bubble)

        return active_bubbles, inactive_bubbles

    @staticmethod
    @lru_cache(maxsize=2)
    def _vehicle_ids_divided_by_bubble_state(
        cursors: FrozenSet[Cursor],
    ) -> Dict[BubbleRelationState, Dict[Bubble, Set[str]]]:
        """Group cursor vehicle ids by relation state and then by bubble.

        The returned nested default-dict is cached and shared between callers;
        callers should not modify the sets it contains.
        """
        vehicle_ids_grouped_by_cursor = defaultdict(lambda: defaultdict(set))
        for cursor in cursors:
            vehicle_ids_grouped_by_cursor[cursor.state][cursor.bubble].add(
                cursor.vehicle_id
            )
        return vehicle_ids_grouped_by_cursor

    def vehicle_ids_per_bubble(
        self,
    ) -> Dict[Bubble, Set[str]]:
        """Bubbles associated with the vehicles they contain.

        Returns:
            Dict[Bubble, Set[str]]: For each bubble, the ids of the vehicles that
            are in the bubble or in its airlock. Missing bubbles map to an empty
            set.
        """
        vid = self._vehicle_ids_divided_by_bubble_state(frozenset(self._cursors))

        # Both per-state maps are keyed by bubble, so they have to be unioned per
        # bubble. A plain dict merge would let the airlock entry replace the
        # in-bubble entry of the same bubble and drop those vehicles. Fresh sets
        # are built so the cached grouping is never aliased by the result.
        merged: Dict[Bubble, Set[str]] = defaultdict(set)
        for state in (BubbleRelationState.InBubble, BubbleRelationState.InAirlock):
            for bubble, vehicle_ids in vid[state].items():
                merged[bubble] |= vehicle_ids
        return merged

    def agent_ids_for_bubble(self, bubble: Bubble, sim) -> Set[str]:
        """Agents generated by this bubble.

        Only cursors that are in the `InBubble` state for the given bubble are
        considered; vehicles without an owner are skipped.
        """
        bubble_cursors = set(filter(lambda c: c.bubble == bubble, self._cursors))
        agent_ids = set()
        for bc in bubble_cursors:
            if bc.state != BubbleRelationState.InBubble:
                continue
            agent_id = sim.vehicle_index.owner_id_from_vehicle_id(bc.vehicle_id)
            if agent_id is not None:
                agent_ids.add(agent_id)
        return agent_ids

    @clear_cache
    def step(self, sim):
        """Update the associations between bubbles, actors, and agents"""
        self._active_bubbles, _ = self._bubble_groups(sim)
        self._move_traveling_bubbles(sim)
        self._cursors = self._sync_cursors(
            self._last_vehicle_index, sim.vehicle_index, sim
        )
        self._handle_transitions(sim, self._cursors)
        # A copy is kept so the next step can compare against this step's vehicles.
        self._last_vehicle_index = deepcopy(sim.vehicle_index)

    def _sync_cursors(self, last_vehicle_index, vehicle_index, sim):
        """Generate this step's cursors for all vehicles present in both indices."""
        # TODO: Not handling newly added vehicles means we require an additional step
        #       before we trigger hijacking.
        # Newly added vehicles
        # add_index = vehicle_index - last_vehicle_index

        # TODO: Not handling deleted vehicles at this point should be fine because we're
        #       stateless.
        # Recently terminated vehicles
        # del_index = last_vehicle_index - vehicle_index

        # Vehicles that stuck around
        persisted_vehicle_index = vehicle_index & last_vehicle_index

        # Calculate latest cursors
        vehicle_ids_per_bubble = self.vehicle_ids_per_bubble()
        cursors = set()
        active_bubbles, inactive_bubbles = self._bubble_groups(sim)
        active_bubbles: Sequence[Bubble]
        inactive_bubbles: Sequence[Bubble]

        # Inactive bubbles only need attention while they still hold vehicles;
        # those vehicles are released through an `AirlockExited` cursor.
        inactive_bubbles_to_run = [
            b for b in inactive_bubbles if len(vehicle_ids_per_bubble[b])
        ]
        if inactive_bubbles_to_run:
            for bubble in inactive_bubbles_to_run:
                for _, vehicle in persisted_vehicle_index.vehicleitems():
                    cursor = Cursor.for_removed(
                        vehicle_id=vehicle.id,
                        bubble=bubble,
                        index=persisted_vehicle_index,
                        vehicle_ids_per_bubble=vehicle_ids_per_bubble,
                    )
                    if cursor.transition not in (BubbleTransition.AirlockExited,):
                        continue
                    cursors.add(cursor)

        if not active_bubbles:
            return cursors

        # Cut down on duplicate generation of values
        vehicle_data = [
            (
                vehicle,
                vehicle.pose.point,
                math.sqrt(
                    vehicle.width * vehicle.width + vehicle.length * vehicle.length
                ),
            )
            for _, vehicle in persisted_vehicle_index.vehicleitems()
        ]

        # NOTE(review): keyed by vehicle id only, so a vehicle with cursors for
        # several bubbles keeps just one of them here -- confirm this is intended.
        old_cursors = {c.vehicle_id: c for c in self._cursors}
        for bubble in active_bubbles:
            sim_condition_kwargs = self._gen_simulation_condition_kwargs(
                sim, condition_requires=bubble.airlock_condition.requires
            )
            mission_condition_kwargs = self._gen_mission_condition_kwargs(
                bubble.actor.name,
                None,
                condition_requires=bubble.airlock_condition.requires,
            )
            was_in_this_bubble = vehicle_ids_per_bubble[bubble]
            for vehicle, point, v_radius in vehicle_data:
                sq_distance = (point.x - bubble.centroid[0]) * (
                    point.x - bubble.centroid[0]
                ) + (point.y - bubble.centroid[1]) * (point.y - bubble.centroid[1])

                # Broadphase: only vehicles already tracked by this bubble or
                # within the combined radius get the exact geometry test.
                if vehicle.id in was_in_this_bubble or sq_distance <= pow(
                    v_radius + bubble.radius + bubble._bubble.margin, 2
                ):
                    actor_condition_kwargs = self._gen_actor_state_condition_args(
                        sim.road_map,
                        vehicle.state,
                        bubble.airlock_condition.requires,
                    )
                    is_hijack_admissible, is_airlock_admissible = bubble.admissibility(
                        vehicle.id,
                        persisted_vehicle_index,
                        vehicle_ids_per_bubble,
                        cursors,
                    )
                    is_airlock_admissible = is_airlock_admissible and (
                        ConditionState.TRUE
                        in bubble.airlock_condition.evaluate(
                            **sim_condition_kwargs,
                            **mission_condition_kwargs,
                            **actor_condition_kwargs,
                        )
                    )
                    cursor = Cursor.from_pos(
                        position=point.as_shapely,
                        vehicle_id=vehicle.id,
                        bubble=bubble,
                        index=persisted_vehicle_index,
                        vehicle_ids_per_bubble=vehicle_ids_per_bubble,
                        running_cursors=cursors,
                        previous_cursor=old_cursors.get(vehicle.id),
                        is_hijack_admissible=is_hijack_admissible,
                        is_airlock_admissible=is_airlock_admissible,
                    )
                    cursors.add(cursor)

        return cursors

    def _handle_transitions(self, sim, cursors: Set[Cursor]):
        """Apply the side effects of every cursor that carries a transition."""
        transitioned = [c for c in cursors if c.transition is not None]
        for cursor in transitioned:
            bubble = cursor.bubble
            actor = bubble.actor
            if cursor.transition == BubbleTransition.AirlockEntered:
                if isinstance(actor, SocialAgentActor):
                    self._airlock_social_vehicle_with_social_agent(
                        sim,
                        cursor.vehicle_id,
                        social_agent_actor=actor,
                        is_boid=bubble.is_boid,
                        keep_alive=bubble.keep_alive,
                    )
                elif isinstance(actor, TrafficEngineActor):
                    # Nothing is done for traffic engine actors on airlock entry.
                    pass
                else:
                    self._log.warning(
                        f"Unknown actor base used and will be skipped:\n {actor}"
                    )
            elif cursor.transition == BubbleTransition.Entered:
                if isinstance(actor, SocialAgentActor):
                    self._hijack_social_vehicle_with_social_agent(
                        sim, cursor.vehicle_id, actor, bubble.is_boid
                    )
                elif isinstance(actor, TrafficEngineActor):
                    self._transfer_to_traffic_engine(sim, cursor.vehicle_id, actor)
                else:
                    self._log.warning(
                        f"Unknown actor base used and will be skipped:\n {actor}"
                    )
            elif cursor.transition == BubbleTransition.Exited:
                continue
            elif cursor.transition == BubbleTransition.AirlockExited:
                # The agent is kept only for boid bubbles flagged keep-alive.
                teardown = not bubble.is_boid or not bubble.keep_alive
                if isinstance(actor, SocialAgentActor):
                    agent_id = BubbleManager._get_agent_id_from_cursor(cursor)
                    sim.vehicle_exited_bubble(
                        cursor.vehicle_id, agent_id, teardown_agent=teardown
                    )

    def _move_traveling_bubbles(self, sim):
        """Re-pose every traveling bubble relative to the vehicle it follows."""
        active_bubbles, inactive_bubbles = self._bubble_groups(sim)
        for bubble in [*active_bubbles, *inactive_bubbles]:
            if not bubble.is_traveling:
                continue

            vehicles = []
            # XXX: Using a vehicle reference through the `_last_vehicle_index` is a
            # XXX clear error since it can reference out of date vehicles.
            if bubble.follow_actor_id is not None:
                vehicles += self._last_vehicle_index.vehicles_by_owner_id(
                    bubble.follow_actor_id
                )
            if bubble.follow_vehicle_id is not None:
                vehicle = self._last_vehicle_index.vehicle_by_id(
                    bubble.follow_vehicle_id, None
                )
                if vehicle is not None:
                    vehicles += [vehicle]
            assert (
                len(vehicles) <= 1
            ), "Traveling bubbles only support pinning to a single vehicle"

            if len(vehicles) == 1:
                bubble.move_to_follow_vehicle(vehicles[0])

    def _airlock_social_vehicle_with_social_agent(
        self,
        sim,
        vehicle_id: str,
        social_agent_actor: SocialAgentActor,
        is_boid: bool,
        keep_alive: bool,
    ):
        """When airlocked. The social agent will receive observations and execute
        its policy, however it won't actually operate the vehicle's controller.
        """
        assert isinstance(
            social_agent_actor, SocialAgentActor
        ), "This must be a social agent actor type."
        self._log.debug(
            f"Airlocked vehicle={vehicle_id} with actor={social_agent_actor}"
        )

        if is_boid:
            agent_id = BubbleManager._make_boid_social_agent_id(social_agent_actor)
        else:
            agent_id = BubbleManager._make_social_agent_id(vehicle_id)

        social_agent = None
        if is_boid and keep_alive or agent_id in sim.agent_manager.social_agent_ids:
            # E.g. if agent is a boid and was being re-used
            interface = sim.agent_manager.agent_interface_for_agent_id(agent_id)
        else:
            social_agent = make_social_agent(
                locator=social_agent_actor.agent_locator,
                **social_agent_actor.policy_kwargs,
            )
            interface = social_agent.interface

        self._prepare_sensors_for_agent_control(
            sim, vehicle_id, agent_id, interface, is_boid=is_boid
        )

        # A re-used agent is already running; only a new agent is started.
        if social_agent is None:
            return

        self._start_social_agent(
            sim,
            agent_id,
            social_agent,
            social_agent_actor,
            is_boid=is_boid,
            keep_alive=keep_alive,
        )

    def _hijack_social_vehicle_with_social_agent(
        self, sim, vehicle_id: str, social_agent_actor: SocialAgentActor, is_boid: bool
    ):
        """Upon hijacking the social agent is now in control of the vehicle. It will
        initialize the vehicle chassis (and by extension the controller) with a
        "greatest common denominator" state; that is: what's available via the vehicle
        front-end common to both source and destination policies during airlock.
        """
        assert isinstance(
            social_agent_actor, SocialAgentActor
        ), "This must be a social agent actor type."
        self._log.debug(f"Hijack vehicle={vehicle_id} with actor={social_agent_actor}")
        if is_boid:
            agent_id = BubbleManager._make_boid_social_agent_id(social_agent_actor)
        else:
            agent_id = BubbleManager._make_social_agent_id(vehicle_id)

        agent_interface = sim.agent_manager.agent_interface_for_agent_id(agent_id)
        vehicle = sim.vehicle_index.switch_control_to_agent(
            sim,
            vehicle_id,
            agent_id,
            boid=is_boid,
            hijacking=True,
            recreate=False,
            agent_interface=agent_interface,
        )
        if vehicle:
            sim.create_vehicle_in_providers(vehicle, agent_id)

    def _prepare_sensors_for_agent_control(
        self, sim, vehicle_id, agent_id, agent_interface, is_boid: bool
    ):
        """Start agent observation on the vehicle and give it a plan."""
        plan = Plan(sim.road_map, None)
        vehicle = sim.vehicle_index.start_agent_observation(
            sim,
            vehicle_id,
            agent_id,
            agent_interface,
            plan,
            boid=is_boid,
        )

        # Setup mission (also used for observations)
        # XXX: here we try to find where the vehicle was originally going, although
        # the agent may or may not want to go there too.  But we preserve it
        # in the plan so when the agent relinquishes control, the next Provider
        # can resume going there (potentially via a different route at that point).
        dest_road_id = None
        for traffic_sim in sim.traffic_sims:
            if traffic_sim.manages_actor(vehicle.id):
                dest_road_id = traffic_sim.vehicle_dest_road(vehicle.id)
                if dest_road_id is not None:
                    break
        if dest_road_id:
            goal = PositionalGoal.from_road(dest_road_id, sim.scenario.road_map)
        else:
            goal = EndlessGoal()
        mission = NavigationMission(
            start=Start(vehicle.position[:2], vehicle.heading), goal=goal
        )
        try:
            plan.create_route(mission)
        except PlanningError:
            # Fall back to an empty route rather than failing the airlock.
            plan.route = sim.road_map.empty_route()

    def _start_social_agent(
        self,
        sim,
        agent_id,
        social_agent,
        social_agent_actor,
        is_boid: bool,
        keep_alive: bool,
    ):
        """Register a newly created social agent with the agent manager."""
        id_ = SocialAgentId.new(social_agent_actor.name)
        social_agent_data_model = SocialAgent(
            id=id_,
            actor_name=id_,
            is_boid=is_boid,
            is_boid_keep_alive=keep_alive,
            agent_locator=social_agent_actor.agent_locator,
            policy_kwargs=social_agent_actor.policy_kwargs,
            initial_speed=social_agent_actor.initial_speed,
        )
        sim.agent_manager.start_social_agent(
            agent_id, social_agent, social_agent_data_model
        )

    def _transfer_to_traffic_engine(
        self, sim, vehicle_id: str, traffic_engine_actor: TrafficEngineActor
    ):
        """Have the vehicle's current provider relinquish it to another provider."""
        traffic_provider = traffic_engine_actor.traffic_provider

        # Imported here rather than at module level.
        from smarts.core.smarts import SMARTS

        assert isinstance(sim, SMARTS)
        new_provider = sim.get_provider_by_id(traffic_provider)
        current_provider = sim.provider_for_actor(vehicle_id)
        assert (
            current_provider is not None
        ), f"Actor {vehicle_id} should have a provider."
        assert (
            new_provider is not None
        ), f"Actor `{traffic_engine_actor.name}` requires a specific provider. Ensure that `{traffic_provider}` is added to the simulation."

        # These guards still apply when assertions are disabled.
        if not current_provider:
            return
        if not new_provider:
            return

        vehicle = sim.vehicle_index.vehicle_by_id(vehicle_id)
        sim.transition_to_provider(
            *sim.provider_relinquishing_actor(
                current_provider=current_provider, state=vehicle.state
            )
        )

    @staticmethod
    def _make_social_agent_id(vehicle_id):
        return f"BUBBLE-ACTOR-{truncate(vehicle_id, 48)}"

    @staticmethod
    def _make_boid_social_agent_id(social_agent_actor):
        return f"BUBBLE-ACTOR-{truncate(social_agent_actor.name, 48)}"

    @staticmethod
    def _get_agent_id_from_cursor(cursor: Cursor):
        # Boid bubbles share one agent id per actor; others get one per vehicle.
        if cursor.bubble.is_boid:
            return BubbleManager._make_boid_social_agent_id(cursor.bubble.actor)
        return BubbleManager._make_social_agent_id(cursor.vehicle_id)

    def teardown(self):
        """Clean up internal state."""
        self._cursors = set()
        self._bubbles = []
        # Do not keep exposing bubbles that were just discarded.
        self._active_bubbles = []