Source code for smarts.core.sumo_traffic_simulation

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import logging
import random
import weakref
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Final, Iterable, List, Optional, Sequence, Tuple

import numpy as np
from shapely.affinity import rotate as shapely_rotate
from shapely.geometry import Polygon
from shapely.geometry import box as shapely_box

from smarts.core import config, gen_id
from smarts.core.actor import ActorRole, ActorState
from smarts.core.colors import SceneColors
from smarts.core.coordinates import Dimensions, Heading, Pose, RefLinePoint
from smarts.core.provider import (
    Provider,
    ProviderManager,
    ProviderRecoveryFlags,
    ProviderState,
)
from smarts.core.road_map import RoadMap
from smarts.core.route_cache import RouteWithCache
from smarts.core.signals import SignalLightState, SignalState
from smarts.core.sumo_road_network import SumoRoadNetwork
from smarts.core.traffic_provider import TrafficProvider
from smarts.core.utils.centralized_traci_server import spawn_if_not
from smarts.core.utils.core_logging import suppress_output
from smarts.core.vehicle import VEHICLE_CONFIGS, VehicleState

NO_CHECKS: Final = 0b00000

# isort:skip
from smarts.core.utils.sumo_utils import (
    LocalSumoProcess,
    RemoteSumoProcess,
    TraciConn,
    traci,
)

import traci.constants as tc  # isort:skip

if TYPE_CHECKING:
    import smarts.core.scenario


[docs]class SumoTrafficSimulation(TrafficProvider): """ Args: headless: False to run with `sumo-gui`. True to run with `sumo` time_resolution: SUMO simulation occurs in discrete `time_resolution`-second steps WARNING: Since our interface(TRACI) to SUMO is delayed by one simulation step, setting a higher time resolution may lead to unexpected artifacts num_external_sumo_clients: Block and wait on the specified number of other clients to connect to SUMO. sumo_port: The port that sumo will attempt to run on. auto_start: False to pause simulation when SMARTS runs, and wait for user to click start on `sumo-gui`. allow_reload: Reset SUMO instead of restarting SUMO when the current map is the same as the previous. remove_agents_only_mode: Remove only agent vehicles used by SMARTS and not delete other SUMO vehicles when the traffic simulation calls to tear-down traci_retries: The number of times to retry acquisition of a TraCI server before throwing an exception. """ _HAS_DYNAMIC_ATTRIBUTES = True def __init__( self, headless: bool = True, time_resolution: Optional[float] = 0.1, num_external_sumo_clients: int = 0, sumo_port: Optional[int] = None, auto_start: bool = True, allow_reload: bool = True, debug: bool = False, remove_agents_only_mode: bool = False, traci_retries: Optional[int] = None, ): self._remove_agents_only_mode = remove_agents_only_mode self._log = logging.getLogger(self.__class__.__name__) self._debug = debug self._scenario = None self._log_file = None assert ( time_resolution ), "cannot use SUMO traffic simulation with variable time deltas" self._time_resolution = time_resolution self._headless = headless self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() self._sumo_vehicle_ids = set() self._hijacked = set() self._is_setup = False self._last_trigger_time = -1000000 self._num_dynamic_ids_used = 0 self._traci_conn: Optional[TraciConn] = None self._num_clients = 1 + num_external_sumo_clients self._sumo_port = sumo_port self._last_traci_state = None self._auto_start = auto_start self._to_be_teleported = dict() self._reserved_areas = dict() self._allow_reload = allow_reload self._traffic_lights = dict() self._tls_cache = dict() self._last_provider_state = ProviderState() self._last_vehicle_subscriptions = dict() self._sim = None self._handling_error = False self._traci_retries = traci_retries # XXX: This is used to try to avoid interrupting other instances in race condition (see GH #2139) self._foreign_traci_servers: List[TraciConn] = [] if ( self._sumo_port is not None or (traci_serve_mode := config()("sumo", "traci_serve_mode")) == "local" ): self._process_factory = partial(LocalSumoProcess, self._sumo_port) elif traci_serve_mode == "central": remote_host = config()("sumo", "central_host") remote_port = config()("sumo", "central_port", cast=int) spawn_if_not(remote_host, remote_port) self._process_factory = partial( RemoteSumoProcess, remote_host=remote_host, remote_port=remote_port, ) # start with the default recovery flags... self._recovery_flags = super().recovery_flags # TODO: remove when SUMO fixes SUMO reset memory growth bug. # `sumo-gui` memory growth is faster. self._reload_count = 50 self._current_reload_count = 0 # /TODO self._traci_exceptions = ( traci.exceptions.TraCIException, traci.exceptions.FatalTraCIError, ) def __repr__(self): return f"""SumoTrafficSim( _scenario={repr(self._scenario)}, _time_resolution={self._time_resolution}, _headless={self._headless}, _cumulative_sim_seconds={self._cumulative_sim_seconds}, _non_sumo_vehicle_ids={self._non_sumo_vehicle_ids}, _sumo_vehicle_ids={self._sumo_vehicle_ids}, _is_setup={self._is_setup}, _last_trigger_time={self._last_trigger_time}, _num_dynamic_ids_used={self._num_dynamic_ids_used}, _traci_conn={repr(self._traci_conn)} )""" def __str__(self): return repr(self)
[docs] def destroy(self): """Clean up TraCI related connections.""" if self._traci_conn: self._traci_conn.close_traci_and_pipes() self._is_setup = False
@property def recovery_flags(self) -> ProviderRecoveryFlags: return self._recovery_flags @recovery_flags.setter def recovery_flags(self, flags: ProviderRecoveryFlags): self._recovery_flags = flags
[docs] def set_manager(self, manager: ProviderManager): self._sim = weakref.ref(manager)
@property def headless(self): """Does not show TraCI visualization.""" return self._headless @property def actor_ids(self) -> Iterable[str]: """The vehicles that sumo manages. Returns: Iterable[str]: Sumo vehicle ids. """ return self._sumo_vehicle_ids def _initialize_traci_conn(self, num_retries=5): # TODO: inline sumo or process pool # the retries are to deal with port collisions # since the way we start sumo here has a race condition on # each spawned process claiming a port current_retries = 0 while current_retries < num_retries: if self._traci_conn is not None: self._traci_conn.close_traci_and_pipes() self._traci_conn = None sumo_binary = "sumo" if self._headless else "sumo-gui" sumo_process = self._process_factory() sumo_process.generate( base_params=self._base_sumo_load_params(), sumo_binary=sumo_binary ) self._traci_conn = TraciConn( sumo_process=sumo_process, ) try: self._traci_conn.connect( timeout=5, minimum_traci_version=20, minimum_sumo_version=(1, 10, 0), debug=self._debug, ) if not self._traci_conn.connected: # Save the connection to try to avoid closing it for the other client. self._foreign_traci_servers.append(self._traci_conn) self._traci_conn = None raise traci.exceptions.TraCIException( "TraCI server was likely taken by other client." ) except traci.exceptions.FatalTraCIError: # Could not connect in time just retry connection current_retries += 1 continue except traci.exceptions.TraCIException: # SUMO process died... unsure why this is not a fatal traci error current_retries += 1 continue except ConnectionRefusedError: # Some other process somehow owns the port... sumo needs to be restarted. continue except KeyboardInterrupt: self._log.debug("Keyboard interrupted TraCI connection.") self._traci_conn.close_traci_and_pipes(wait=False) raise break else: exception = traci.exceptions.FatalTraCIError( f"Unable to connect to TraCI server after `{num_retries=}`." ) self._handle_traci_exception(exception, actors_relinquishable=False) raise exception try: # It is mandatory to set order when using multiple clients. self._traci_conn.setOrder(0) self._traci_conn.getVersion() except (traci.exceptions.FatalTraCIError, TypeError) as err: self._handle_traci_exception(err, actors_relinquishable=False) self.teardown() raise self._log.debug("Finished starting sumo process") def _base_sumo_load_params(self): load_params = [ "--num-clients=%d" % self._num_clients, "--net-file=%s" % self._scenario.road_map.source, "--quit-on-end", "--log=%s" % self._log_file, "--error-log=%s.err" % self._log_file, "--no-step-log", "--no-warnings=1", "--seed=%s" % random.randint(0, 2147483648), "--time-to-teleport=%s" % -1, "--collision.check-junctions=true", "--collision.action=none", "--lanechange.duration=3.0", # TODO: 1--lanechange.duration1 or 1--lateral-resolution`, in combination with `route_id`, # causes lane change crashes as of SUMO 1.6.0. # Controlling vehicles that have been added to the simulation with a route causes # lane change related crashes. # "--lateral-resolution=100", # smooth lane changes "--step-length=%f" % self._time_resolution, "--default.action-step-length=%f" % self._time_resolution, "--begin=0", # start simulation at time=0 "--end=31536000", # keep the simulation running for a year ] rerouter_file = Path(self._scenario.road_map.source).parent / "rerouter.add.xml" if rerouter_file.exists(): load_params.append(f"--additional-files={rerouter_file}") if self._auto_start: load_params.append("--start") ## See for more information about --route-files # https://sumo.dlr.de/docs/Simulation/Basic_Definition.html#traffic_demand_routes # https://sumo.dlr.de/docs/sumo.html#loading_order_of_input_files sumo_route_files = [ ts for ts in self._scenario.traffic_specs if ts.endswith(".rou.xml") ] if sumo_route_files: load_params.append("--route-files={}".format(",".join(sumo_route_files))) return load_params def _restart_sumo(self): engine_config = config() traci_retries = self._traci_retries or engine_config( "sumo", "traci_retries", default=5, cast=int ) self._initialize_traci_conn(num_retries=traci_retries)
[docs] def setup(self, scenario) -> ProviderState: """Initialize the simulation with a new scenario.""" self._log.debug("Setting up SumoTrafficSim %s", self) assert ( not self._is_setup ), f"Can't setup twice, {self._is_setup}, see teardown()" # restart sumo process only when map file changes restart_sumo = ( not self._scenario or not self.connected or self._scenario.road_map_hash != scenario.road_map_hash or self._current_reload_count >= self._reload_count or self._traci_conn.must_reset() # Some versions of sumo crash when reloading ) self._current_reload_count = self._current_reload_count % self._reload_count + 1 self._scenario = scenario assert isinstance( scenario.road_map, SumoRoadNetwork ), "SumoTrafficSimulation requires a SumoRoadNetwork" self._log_file = scenario.unique_sumo_log_file() try: if restart_sumo: self._restart_sumo() elif self._allow_reload: assert ( self._traci_conn is not None ), "TraCI should be connected at this point." try: self._traci_conn.load(self._base_sumo_load_params()) except traci.exceptions.FatalTraCIError: self._restart_sumo() except traci.exceptions.FatalTraCIError: return ProviderState() assert self._traci_conn is not None, "No active TraCI connection" self._traci_conn.simulation.subscribe( [tc.VAR_DEPARTED_VEHICLES_IDS, tc.VAR_ARRIVED_VEHICLES_IDS] ) self._traffic_lights = dict() for tls_id in self._traci_conn.trafficlight.getIDList(): self._traffic_lights[ tls_id ] = self._traci_conn.trafficlight.getControlledLinks(tls_id) self._traci_conn.trafficlight.subscribe( tls_id, [tc.TL_RED_YELLOW_GREEN_STATE] ) # XXX: SUMO caches the previous subscription results. Calling `simulationStep` # effectively flushes the results. We need to use epsilon instead of zero # as zero will step according to a default (non-zero) step-size. self.step({}, 1e-6, 0) if not self.connected: self._is_setup = False return ProviderState() self._is_setup = True return self._compute_provider_state()
def _handle_traci_exception( self, error, actors_relinquishable: bool = True, removed_actor_id: Optional[str] = None, ): if self._handling_error: return self._handling_error = True if isinstance(error, traci.exceptions.TraCIException): # XXX: Needs further investigation whenever this happens. self._log.debug("TraCI has provided a warning %s", error) return if isinstance(error, traci.exceptions.FatalTraCIError): self._log.error( "TraCI has disconnected with: `%s`. Please check the logging file `%s`.", error, self._log_file, ) sim = self._sim() if ( sim and actors_relinquishable and self.recovery_flags & ProviderRecoveryFlags.RELINQUISH_ACTORS ): self._log.warning( "attempting to transfer SUMO vehicles to other providers..." ) for actor in self._last_provider_state.actors: if actor.actor_id != removed_actor_id: sim.provider_releases_actor(self, actor) self._traci_conn.close_traci_and_pipes() self._handling_error = False def _remove_vehicles(self): vehicles_to_remove = None if self._remove_agents_only_mode: vehicles_to_remove = self._non_sumo_vehicle_ids else: vehicles_to_remove = self._non_sumo_vehicle_ids.union( self._sumo_vehicle_ids ) sim = self._sim() for vehicle_id in vehicles_to_remove: if sim: # Call for immediate removal of the vehicle sim.provider_removing_actor(self, vehicle_id) try: self._traci_conn.vehicle.remove(vehicle_id) except traci.exceptions.FatalTraCIError as err: self._handle_traci_exception(err, actors_relinquishable=False) raise except traci.exceptions.TraCIException as err: self._handle_traci_exception(err, actors_relinquishable=False)
[docs] def teardown(self): self._log.debug("Tearing down SUMO traffic sim %s", self) if not self._is_setup: self._log.debug("Nothing to teardown") return assert self._is_setup if self.connected: try: self._remove_vehicles() except traci.exceptions.FatalTraCIError: pass if not self._allow_reload and self._traci_conn is not None: self._traci_conn.close_traci_and_pipes() for i, trc in reversed( [ (j, trc) for j, trc in enumerate(self._foreign_traci_servers) if not trc.viable ] ): self._foreign_traci_servers.pop(i) trc.close_traci_and_pipes(wait=False) self._cumulative_sim_seconds = 0 self._non_sumo_vehicle_ids = set() self._sumo_vehicle_ids = set() self._hijacked = set() self._is_setup = False self._num_dynamic_ids_used = 0 self._to_be_teleported = dict() self._reserved_areas = dict() self._last_vehicle_subscriptions = dict()
@property def connected(self): return self._traci_conn is not None and self._traci_conn.connected @property def actions(self): # Unify interfaces with other providers return {}
[docs] def reset(self): # Unify interfaces with other providers pass
[docs] def recover( self, scenario: smarts.core.scenario.Scenario, elapsed_sim_time: float, error: Optional[Exception] = None, ) -> Tuple[ProviderState, bool]: if isinstance(error, self._traci_exceptions): self._handle_traci_exception(error) elif isinstance(error, Exception): raise error return self._last_provider_state, False
[docs] def step(self, actions, dt: float, elapsed_sim_time: float) -> ProviderState: assert not actions if not self.connected: self._last_provider_state = ProviderState() else: self._last_provider_state = self._step(dt) return self._last_provider_state
def _step(self, dt): # we tell SUMO to step through dt more seconds of the simulation self._cumulative_sim_seconds += dt try: # Suppress errors here, to avoid a known (and likely benign) # error related to removing vehicles. # See: https://github.com/huawei-noah/SMARTS/issues/1155 with suppress_output(stderr=False): self._traci_conn.simulationStep(self._cumulative_sim_seconds) except traci.exceptions.FatalTraCIError as err: self._handle_traci_exception(err) return ProviderState() except traci.exceptions.TraCIException as err: self._handle_traci_exception(err) return self._compute_provider_state()
[docs] def sync(self, provider_state: ProviderState): if not self.connected: return None return self._sync(provider_state)
def _sync(self, provider_state: ProviderState): provider_vehicles = { v.actor_id: v for v in provider_state.actors if isinstance(v, VehicleState) } external_vehicle_ids = { v.actor_id for v in provider_vehicles.values() if v.source != self.source_str } internal_vehicle_ids = { v.actor_id for v in provider_vehicles.values() if v.source == self.source_str } # Represents current state traffic_vehicle_states = self._traci_conn.vehicle.getAllSubscriptionResults() traffic_vehicle_ids = set(traffic_vehicle_states) self._last_vehicle_subscriptions = traffic_vehicle_states # State / ownership changes external_vehicles_that_have_joined = ( external_vehicle_ids - self._non_sumo_vehicle_ids - traffic_vehicle_ids - self._hijacked ) vehicles_that_have_become_external = ( traffic_vehicle_ids & external_vehicle_ids - self._non_sumo_vehicle_ids ) # XXX: They may have become internal because they've been relinquished or # because they've been destroyed from a collision. Presently we're not # differentiating and will take over as social vehicles regardless. vehicles_that_have_become_internal = ( internal_vehicle_ids & self._non_sumo_vehicle_ids & traffic_vehicle_ids ) external_vehicles_that_have_left = ( self._non_sumo_vehicle_ids - external_vehicle_ids - vehicles_that_have_become_internal ) log = "" if external_vehicles_that_have_left: log += ( f"external_vehicles_that_have_left={external_vehicles_that_have_left}\n" ) if external_vehicles_that_have_joined: log += f"external_vehicles_that_have_joined={external_vehicles_that_have_joined}\n" if vehicles_that_have_become_external: log += f"vehicles_that_have_become_external={vehicles_that_have_become_external}\n" if vehicles_that_have_become_internal: log += f"vehicles_that_have_become_internal={vehicles_that_have_become_internal}\n" if log: self._log.debug(log) for vehicle_id in external_vehicles_that_have_left: self._log.debug("Non SUMO vehicle %s left simulation", vehicle_id) self._non_sumo_vehicle_ids.remove(vehicle_id) self._traci_conn.vehicle.remove(vehicle_id) for vehicle_id in external_vehicles_that_have_joined: vehicle_state = provider_vehicles[vehicle_id] dimensions = Dimensions.copy_with_defaults( vehicle_state.dimensions, VEHICLE_CONFIGS[vehicle_state.vehicle_config_type].dimensions, ) self._create_vehicle(vehicle_id, dimensions, vehicle_state.role) no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) # update the state of all current managed vehicles for vehicle_id in self._non_sumo_vehicle_ids: provider_vehicle = provider_vehicles[vehicle_id] pos, sumo_heading = provider_vehicle.pose.as_sumo( provider_vehicle.dimensions.length, Heading(0) ) # See https://sumo.dlr.de/docs/TraCI/Change_Vehicle_State.html#move_to_xy_0xb4 # for flag values try: self._move_vehicle( provider_vehicle.actor_id, pos, sumo_heading, provider_vehicle.speed, ) # since the vehicle may have switched roles (e.g., been trapped), recolor it self._traci_conn.vehicle.setColor( vehicle_id, SumoTrafficSimulation._color_for_role(provider_vehicle.role), ) except traci.exceptions.TraCIException: # Likely as a result of https://github.com/eclipse/sumo/issues/3993 # the vehicle got removed because we skipped a moveToXY call between # internal stepSimulations, so we add the vehicle back here. self._log.warning( "Attempted to (TraCI) SUMO.moveToXY(...) on missing " "vehicle(id=%s)", vehicle_id, ) self._create_vehicle( vehicle_id, provider_vehicle.dimensions, provider_vehicle.role ) self._move_vehicle( provider_vehicle.actor_id, pos, sumo_heading, provider_vehicle.speed, ) for vehicle_id in vehicles_that_have_become_external: no_checks = NO_CHECKS self._traci_conn.vehicle.setSpeedMode(vehicle_id, no_checks) self._traci_conn.vehicle.setColor( vehicle_id, SumoTrafficSimulation._color_for_role(ActorRole.SocialAgent) ) self._non_sumo_vehicle_ids.add(vehicle_id) for vehicle_id in vehicles_that_have_become_internal: self._traci_conn.vehicle.setColor( vehicle_id, SumoTrafficSimulation._color_for_role(ActorRole.Social) ) self._non_sumo_vehicle_ids.remove(vehicle_id) # Let sumo take over speed again # For setSpeedMode look at: https://sumo.dlr.de/docs/TraCI/Change_Vehicle_State.html#speed_mode_0xb3 all_checks = 0b11111 self._traci_conn.vehicle.setSpeedMode(vehicle_id, all_checks) self._traci_conn.vehicle.setSpeed(vehicle_id, -1) self._reroute_vehicles(traffic_vehicle_states) self._teleport_exited_vehicles() @staticmethod def _color_for_role(role: ActorRole) -> np.ndarray: if role == ActorRole.EgoAgent: return np.array(SceneColors.Agent.value[:3]) * 255 elif role == ActorRole.SocialAgent: return np.array(SceneColors.SocialAgent.value[:3]) * 255 elif role == ActorRole.Social: return np.array(SceneColors.SocialVehicle.value[:3]) * 255 return np.array(SceneColors.SocialVehicle.value[:3]) * 255 def _move_vehicle(self, vehicle_id, position, heading, speed): x, y, _ = position self._traci_conn.vehicle.moveToXY( vehID=vehicle_id, edgeID="", # let sumo choose the edge lane=-1, # let sumo choose the lane x=x, y=y, angle=heading, # only used for visualizing in sumo-gui keepRoute=0b010, ) self._traci_conn.vehicle.setSpeed(vehicle_id, speed)
[docs] def update_route_for_vehicle(self, vehicle_id: str, new_route: RoadMap.Route): """Sets a new route for vehicle_id, but only if it is different from the previously-set route (otherwise, avoids the TraCI call). Any sumo-special roads (e.g., junction) are removed from the new route before setting it because Sumo doesn't allow specifying these in the call to its `vehicle.setRoute()` and will raise an exception otherwise.""" if not self.connected: return old_route = self._route_for_vehicle(vehicle_id) if old_route: new_route_ids = [rr for rr in new_route.road_ids if rr[0] != ":"] if new_route_ids == list(old_route): return try: # Note: the first edge of the route must be the edge we're currently on... self._traci_conn.vehicle.setRoute(vehicle_id, new_route.road_ids) except self._traci_exceptions as err: self._handle_traci_exception(err)
def _create_vehicle(self, vehicle_id, dimensions, role: ActorRole): assert isinstance( vehicle_id, str ), f"SUMO expects string ids: {vehicle_id} is a {type(vehicle_id)}" self._log.debug("Non SUMO vehicle %s joined simulation", vehicle_id) self._non_sumo_vehicle_ids.add(vehicle_id) self._traci_conn.vehicle.add( vehID=vehicle_id, routeID="", # we don't care which route this vehicle is on ) vehicle_color = SumoTrafficSimulation._color_for_role(role) self._traci_conn.vehicle.setColor(vehicle_id, vehicle_color) # Directly below are two of the main factors that affect vehicle secure gap for # purposes of determining the safety gaps that SUMO vehicles will abide by. The # remaining large factor is vehicle speed. # See: # http://sumo-user-mailing-list.90755.n8.nabble.com/sumo-user-Questions-on-SUMO-Built-In-Functions-getSecureGap-amp-brakeGap-td3254.html # Set the controlled vehicle's time headway in seconds self._traci_conn.vehicle.setTau(vehicle_id, 4) # Set the controlled vehicle's maximum natural deceleration in m/s self._traci_conn.vehicle.setDecel(vehicle_id, 6) # setup the vehicle size self._traci_conn.vehicle.setLength(vehicle_id, dimensions.length) self._traci_conn.vehicle.setWidth(vehicle_id, dimensions.width) self._traci_conn.vehicle.setHeight(vehicle_id, dimensions.height) def _decode_tls_state(self, tls_state: str) -> SignalLightState: assert len(tls_state) == 1 if tls_state in "gG": return SignalLightState.GO if tls_state in "rRs": return SignalLightState.STOP if tls_state in "yY": return SignalLightState.CAUTION if tls_state in "oO": return SignalLightState.OFF return SignalLightState.UNKNOWN def _create_signal_state( self, sig_id: str, controlled_links: Sequence[Tuple[str, str, str]] ) -> SignalState: incoming_lane_id = None controlled_lanes = [] for link in controlled_links: in_lane_id, out_lane_id, via_id = link via_lane = self._scenario.road_map.lane_by_id(via_id) assert via_lane controlled_lanes.append(via_id) assert not incoming_lane_id or incoming_lane_id == in_lane_id incoming_lane_id = in_lane_id incoming_lane = self._scenario.road_map.lane_by_id(incoming_lane_id) loc = incoming_lane.from_lane_coord(RefLinePoint(s=incoming_lane.length)) return SignalState( actor_id=sig_id, actor_type="signal", source=self.source_str, role=ActorRole.Signal, state=SignalLightState.UNKNOWN, stopping_pos=loc, controlled_lanes=controlled_lanes, last_changed=None, ) def _traffic_light_states(self) -> List[SignalState]: signal_states = [] traffic_light_states = self._traci_conn.trafficlight.getAllSubscriptionResults() for tls_id, tls_state in traffic_light_states.items(): tls_state = tls_state[tc.TL_RED_YELLOW_GREEN_STATE] tls_control = self._traffic_lights.get(tls_id) assert tls_control for s, controlled_links in enumerate(tls_control): sig_id = f"tls_{tls_id}-{s}" if not controlled_links: continue sig_state = self._tls_cache.setdefault( sig_id, self._create_signal_state(sig_id, controlled_links) ) prev_state = sig_state.state sig_state.state = self._decode_tls_state(tls_state[s]) if ( sig_state.state != prev_state and prev_state != SignalLightState.UNKNOWN ): sig_state.last_changed = self._cumulative_sim_seconds signal_states.append(sig_state) return signal_states def _compute_provider_state(self) -> ProviderState: return ProviderState( actors=self._compute_traffic_vehicles() + self._traffic_light_states() )
[docs] def manages_actor(self, actor_id: str) -> bool: if not self.connected: return False return actor_id in self._sumo_vehicle_ids
def _compute_traffic_vehicles(self) -> List[VehicleState]: self._last_traci_state = self._traci_conn.simulation.getSubscriptionResults() if not self._last_traci_state: return [] # New social vehicles that have entered the map newly_departed_sumo_traffic = [ vehicle_id for vehicle_id in self._last_traci_state[tc.VAR_DEPARTED_VEHICLES_IDS] if vehicle_id not in self._non_sumo_vehicle_ids ] reserved_areas = [position for position in self._reserved_areas.values()] # Subscribe to all vehicles to reduce repeated traci calls for vehicle_id in newly_departed_sumo_traffic: self._traci_conn.vehicle.subscribe( vehicle_id, [ tc.VAR_POSITION, # Decimal=66, Hex=0x42 tc.VAR_ANGLE, # Decimal=67, Hex=0x43 tc.VAR_SPEED, # Decimal=64, Hex=0x40 tc.VAR_VEHICLECLASS, # Decimal=73, Hex=0x49 tc.VAR_ROUTE_INDEX, # Decimal=105, Hex=0x69 tc.VAR_EDGES, # Decimal=84, Hex=0x54 tc.VAR_TYPE, # Decimal=79, Hex=0x4F tc.VAR_LENGTH, # Decimal=68, Hex=0x44 tc.VAR_WIDTH, # Decimal=77, Hex=0x4d ], ) sumo_vehicle_state = self._traci_conn.vehicle.getAllSubscriptionResults() for vehicle_id in newly_departed_sumo_traffic: other_vehicle_shape = self._shape_of_vehicle(sumo_vehicle_state, vehicle_id) violates_reserved_area = False for reserved_area in reserved_areas: if reserved_area.intersects(other_vehicle_shape): violates_reserved_area = True break if violates_reserved_area: self._traci_conn.vehicle.remove(vehicle_id) sumo_vehicle_state.pop(vehicle_id) continue self._log.debug("SUMO vehicle %s entered simulation", vehicle_id) # Non-sumo vehicles will show up the step after the sync where the non-sumo vehicle is # added. newly_departed_non_sumo_vehicles = [ vehicle_id for vehicle_id in self._last_traci_state[tc.VAR_DEPARTED_VEHICLES_IDS] if vehicle_id not in newly_departed_sumo_traffic ] for vehicle_id in newly_departed_non_sumo_vehicles: if vehicle_id in self._reserved_areas: del self._reserved_areas[vehicle_id] # Note: we cannot just pop() the self._hijacked from sumo_vehicle_state here, # as this (bizarrely) affects the self._traci_conn.vehicle.getAllSubscriptionResults() call # in _sync() below! self._sumo_vehicle_ids = ( set(sumo_vehicle_state.keys()) - self._non_sumo_vehicle_ids - self._hijacked ) provider_vehicles = [] # batched conversion of positions to numpy arrays front_bumper_positions = np.array( [ sumo_vehicle[tc.VAR_POSITION] for sumo_vehicle in sumo_vehicle_state.values() ] ).reshape(-1, 2) for i, (sumo_id, sumo_vehicle) in enumerate(sumo_vehicle_state.items()): if sumo_id in self._hijacked: continue # XXX: We can safely rely on iteration order over dictionaries being # stable on py3.7. # See: https://www.python.org/downloads/release/python-370/ # "The insertion-order preservation nature of dict objects is now an # official part of the Python language spec." front_bumper_pos = front_bumper_positions[i] heading = Heading.from_sumo(sumo_vehicle[tc.VAR_ANGLE]) speed = sumo_vehicle[tc.VAR_SPEED] vehicle_config_type = sumo_vehicle[tc.VAR_VEHICLECLASS] dimensions = VEHICLE_CONFIGS[vehicle_config_type].dimensions provider_vehicles.append( VehicleState( # XXX: In the case of the SUMO traffic provider, the vehicle ID is # the sumo ID is the actor ID. actor_id=sumo_id, source=self.source_str, role=ActorRole.Social, vehicle_config_type=vehicle_config_type, pose=Pose.from_front_bumper( front_bumper_pos, heading, dimensions.length ), dimensions=dimensions, speed=speed, ) ) return provider_vehicles def _teleport_exited_vehicles(self): if not self._last_traci_state: self._last_traci_state = ( self._traci_conn.simulation.getSubscriptionResults() ) if not self._last_traci_state: return exited_sumo_traffic = [ vehicle_id for vehicle_id in self._last_traci_state[tc.VAR_ARRIVED_VEHICLES_IDS] if vehicle_id not in self._non_sumo_vehicle_ids ] for v_id in exited_sumo_traffic: if v_id in self._to_be_teleported: route = self._to_be_teleported[v_id]["route"] type_id = self._to_be_teleported[v_id]["type_id"] self._teleport_vehicle(v_id, route, 0, type_id) # XXX: del self._to_be_teleported[v_id] def _teleport_vehicle(self, vehicle_id, route, lane_offset, type_id): self._log.debug( "Teleporting %s to lane_offset=%s route=%s", vehicle_id, lane_offset, route ) spawn_road = self._scenario.road_map.road_by_id(route[0]) lane_index = random.randint(0, len(spawn_road.lanes) - 1) self._emit_vehicle_by_route(vehicle_id, route, lane_index, lane_offset, type_id) def _reroute_vehicles(self, vehicle_states): for vehicle_id, state in vehicle_states.items(): if vehicle_id not in self._sumo_vehicle_ids: continue if "endless" not in vehicle_id: continue route_index = state[tc.VAR_ROUTE_INDEX] route_edges = state[tc.VAR_EDGES] type_id = state[tc.VAR_TYPE] if route_index != len(route_edges) - 1: # The vehicle is not in the last route edge. continue # Check if these edges forms a loop. from_road = self._scenario.road_map.road_by_id(route_edges[-1]) to_road = self._scenario.road_map.road_by_id(route_edges[0]) next_roads = [road.road_id for road in from_road.outgoing_roads] if to_road not in next_roads: # Reroute only if it's loop, otherwise, teleport the vehicle. self._to_be_teleported[vehicle_id] = { "route": route_edges, "type_id": type_id, } continue # The first edge in the list has to be the one that the vehicle # is in at the moment, which is the last edge in current route_edges. new_route_edges = route_edges[-1:] + route_edges self._traci_conn.vehicle.setRoute(vehicle_id, new_route_edges) def _route_for_vehicle(self, vehicle_id: str) -> Optional[List[str]]: state = self._last_vehicle_subscriptions.get(vehicle_id) if state is None: return None return state[tc.VAR_EDGES]
[docs] def vehicle_dest_road(self, vehicle_id: str) -> Optional[str]: route = self._route_for_vehicle(vehicle_id) return route[-1] if route else None
[docs] def route_for_vehicle(self, vehicle_id: str) -> Optional[RoadMap.Route]: sim = self._sim() if sim is None or sim.road_map is None: return None route_ids = self._route_for_vehicle(vehicle_id) return ( sim.road_map.route_from_road_ids(route_ids, resolve_intermediaries=True) if route_ids else None )
[docs] def reserve_traffic_location_for_vehicle( self, vehicle_id: str, reserved_location: Polygon, ): self._reserved_areas[vehicle_id] = reserved_location
[docs] def vehicle_collided(self, vehicle_id): # Sumo should already know about this and deal with it appropriately. pass
[docs] def stop_managing(self, actor_id: str): self._hijacked.add(actor_id)
[docs] def remove_actor(self, actor_id: str): if not self.connected: return try: self._traci_conn.vehicle.remove(actor_id) except self._traci_exceptions as err: self._handle_traci_exception(err, removed_actor_id=actor_id) self._sumo_vehicle_ids.discard(actor_id) self._hijacked.discard(actor_id) self._non_sumo_vehicle_ids.discard(actor_id)
def _shape_of_vehicle(self, sumo_vehicle_state, vehicle_id): p = sumo_vehicle_state[vehicle_id][tc.VAR_POSITION] length = sumo_vehicle_state[vehicle_id][tc.VAR_LENGTH] width = sumo_vehicle_state[vehicle_id][tc.VAR_WIDTH] heading = Heading.from_sumo(sumo_vehicle_state[vehicle_id][tc.VAR_ANGLE]) poly = shapely_box( p[0] - width * 0.5, p[1] - length, p[0] + width * 0.5, p[1], ) return shapely_rotate(poly, heading, use_radians=True) def _emit_vehicle_by_route( self, vehicle_id, route, lane_index, lane_offset, type_id="DEFAULT_VEHTYPE" ): route_id = f"route-{gen_id()}" self._traci_conn.route.add(route_id, route) self._traci_conn.vehicle.add( vehicle_id, route_id, typeID=type_id, departPos=lane_offset, departLane=lane_index, ) return vehicle_id
[docs] def can_accept_actor(self, state: ActorState) -> bool: # We only accept transferred vehicles we previously used to own that # have since been relinquished to us by the agent that hijacked them. # (This is a conservative policy to avoid "glitches"; we may relax it # in the future.) return ( self.connected and isinstance(state, VehicleState) and state.role == ActorRole.Social and state.actor_id in self._hijacked )
[docs] def add_actor( self, provider_actor: ActorState, from_provider: Optional[Provider] = None ): assert isinstance(provider_actor, VehicleState) if provider_actor.actor_id not in self._hijacked: route = None if from_provider and isinstance(from_provider, TrafficProvider): route = from_provider.route_for_vehicle(provider_actor.actor_id) assert not route or isinstance(route, RouteWithCache) self._traci_conn.vehicle.setRoute( provider_actor.actor_id, route.road_ids ) # elif hijacked there is no need to get the route from from_provider because this vehicle # is one that we used to manage, and Sumo/Traci should remember it. self._non_sumo_vehicle_ids.discard(provider_actor.actor_id) self._hijacked.discard(provider_actor.actor_id) provider_actor.source = self.source_str provider_actor.role = ActorRole.Social self._log.info( "traffic actor %s transferred to %s.", provider_actor.actor_id, self.source_str, )