Source code for smarts.core.local_traffic_provider

# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations

import logging
import math
import random
import re
import weakref
import xml.etree.ElementTree as XET
from bisect import bisect_left, bisect_right, insort
from collections import defaultdict, deque
from dataclasses import dataclass
from functools import cached_property, lru_cache
from typing import TYPE_CHECKING, Any, Deque, Dict, List, Optional, Set, Tuple

import numpy as np
from shapely.affinity import rotate as shapely_rotate
from shapely.geometry import box as shapely_box

from smarts.core.utils.core_logging import timeit

from .actor import ActorRole, ActorState
from .coordinates import Dimensions, Heading, Point, Pose, RefLinePoint
from .provider import Provider, ProviderManager, ProviderRecoveryFlags, ProviderState
from .road_map import RoadMap
from .route_cache import RouteWithCache
from .signals import SignalLightState, SignalState
from .traffic_provider import TrafficProvider
from .utils.core_math import (
    min_angles_difference_signed,
    radians_to_vec,
    safe_division,
    vec_to_radians,
)
from .utils.kinematics import (
    distance_covered,
    stopping_distance,
    stopping_time,
    time_to_cover,
)
from .vehicle import VEHICLE_CONFIGS, VehicleState

if TYPE_CHECKING:
    from shapely.geometry import Polygon

    import smarts.core.scenario
    from smarts.core.controllers import ActionSpaceType


MAX_IMPATIENCE = 3.0


[docs]class LocalTrafficProvider(TrafficProvider): """A LocalTrafficProvider simulates multiple traffic actors on a generic RoadMap.""" def __init__(self): self._logger = logging.getLogger(self.__class__.__name__) self._sim = None self._scenario = None self.road_map: RoadMap = None self._flows: Dict[str, Dict[str, Any]] = dict() self._my_actors: Dict[str, _TrafficActor] = dict() self._other_actors: Dict[ str, Tuple[ActorState, Optional[RoadMap.Route]] ] = dict() self._reserved_areas: Dict[str, Polygon] = dict() self._actors_created: int = 0 self._lane_bumpers_cache: Dict[ RoadMap.Lane, List[Tuple[float, VehicleState, int]] ] = dict() self._offsets_cache: Dict[str, Dict[str, float]] = dict() # start with the default recovery flags... self._recovery_flags = super().recovery_flags @property def recovery_flags(self) -> ProviderRecoveryFlags: return self._recovery_flags @recovery_flags.setter def recovery_flags(self, flags: ProviderRecoveryFlags): self._recovery_flags = flags
[docs] def set_manager(self, manager: ProviderManager): self._sim = weakref.ref(manager)
@property def actions(self) -> Set[ActionSpaceType]: return set()
[docs] def manages_actor(self, actor_id: str) -> bool: return actor_id in self._my_actors
def _load_traffic_flows(self, traffic_spec: str): vtypes = {} routes = {} root = XET.parse(traffic_spec).getroot() assert root.tag == "routes" for child in root: if child.tag == "vType": vid = child.attrib["id"] vtypes[vid] = child.attrib elif child.tag == "route": rid = child.attrib["id"] routes[rid] = child.attrib["edges"] elif child.tag == "flow": flow = child.attrib vtype = vtypes.get(flow["type"]) assert vtype, f"undefined vehicle type {flow['type']} used in flow" flow["vtype"] = vtype route = routes.get(flow["route"]) assert isinstance( route, str ), f"undefined route {flow['route']} used in flow" route = self.road_map.route_from_road_ids(route.split()) route.add_to_cache() flow["route"] = route flow["begin"] = float(flow["begin"]) flow["end"] = float(flow["end"]) if "vehsPerHour" in flow: freq = float(flow["vehsPerHour"]) if freq <= 0: logging.warning( f"vehPerHour value is {freq}<=0 vehicles may not be emitted!!!" ) freq = 0 flow["emit_period"] = safe_division(3600.0, freq) elif "period" in flow: period = float(flow["period"]) assert period > 0.0 flow["emit_period"] = period elif "probability" in flow: emit_prob = float(flow["probability"]) assert 0.0 <= emit_prob <= 1.0 flow["emit_prob"] = emit_prob else: assert ( False ), "either 'vehsPerHour' or 'probability' must be specified for Flow emission" flow_id = str(flow["id"]) flow["endless"] = "endless" in flow_id self._flows[flow_id] = flow def _check_actor_bbox(self, actor: "_TrafficActor") -> bool: actor_bbox = actor.bbox(True) for reserved_area in self._reserved_areas.values(): if reserved_area.intersects(actor_bbox): return False for my_actor in self._my_actors.values(): if actor != my_actor and my_actor.bbox().intersects(actor_bbox): return False for other, _ in self._other_actors.values(): if isinstance(other, VehicleState) and other.bbox.intersects(actor_bbox): return False return True def _add_actor_in_flow(self, flow: Dict[str, Any]) -> bool: new_actor = _TrafficActor.from_flow(flow, self) if not self._check_actor_bbox(new_actor): return False self._my_actors[new_actor.actor_id] = new_actor self._logger.info(f"traffic actor {new_actor.actor_id} entered simulation") return True def _add_actors_for_time(self, sim_time: float, dt: float = 1.0): for flow in self._flows.values(): if not flow["begin"] <= sim_time < flow["end"]: continue try_add = False last_added = flow.get("last_added") emit_prob = flow.get("emit_prob") emit_period = flow.get("emit_period") if emit_period is not None: try_add = last_added is None or sim_time - last_added >= emit_period elif emit_prob is not None: try_add = random.random() <= emit_prob * dt if try_add and self._add_actor_in_flow(flow): flow["last_added"] = sim_time @property def _my_actor_states(self) -> List[VehicleState]: return [actor.state for actor in self._my_actors.values()] @property def _other_vehicle_states(self) -> List[VehicleState]: return [ other for other, _ in self._other_actors.values() if isinstance(other, VehicleState) ] @property def _all_states(self) -> List[VehicleState]: return self._my_actor_states + self._other_vehicle_states @property def _provider_state(self) -> ProviderState: return ProviderState(actors=self._my_actor_states)
[docs] def setup(self, scenario: smarts.core.scenario.Scenario) -> ProviderState: assert self._sim() is not None self._scenario = scenario self.road_map = scenario.road_map traffic_specs = [ ts for ts in self._scenario.traffic_specs if ts.endswith(".smarts.xml") ] assert len(traffic_specs) <= 1 if traffic_specs: self._load_traffic_flows(traffic_specs[0]) # TAI: is there any point if not? self._add_actors_for_time(0.0) return self._provider_state
def _create_actor_caches(self): self._offsets_cache = dict() self._lane_bumpers_cache = dict() for ovs in self._all_states: center = ovs.pose.point length = ovs.dimensions.length hhx, hhy = radians_to_vec(ovs.pose.heading) * (0.5 * length) back = Point(center.x - hhx, center.y - hhy) front = Point(center.x + hhx, center.y + hhy) back_lane = self.road_map.nearest_lane(back, radius=length * 0.5) front_lane = self.road_map.nearest_lane(front, radius=length * 0.5) if back_lane: back_offset = back_lane.offset_along_lane(back) lbc = self._lane_bumpers_cache.setdefault(back_lane, []) lbc.append((back_offset, ovs, 1)) if front_lane: front_offset = front_lane.offset_along_lane(front) lbc = self._lane_bumpers_cache.setdefault(front_lane, []) lbc.append((front_offset, ovs, 2)) if front_lane and back_lane != front_lane: # it's changing lanes, don't misjudge the target lane... fake_back_offset = front_lane.offset_along_lane(back) self._lane_bumpers_cache[front_lane].append((fake_back_offset, ovs, 0)) for cache in self._lane_bumpers_cache.values(): cache.sort() def _cached_lane_offset(self, vs: VehicleState, lane: RoadMap.Lane): lane_offsets = self._offsets_cache.setdefault(vs.actor_id, dict()) return lane_offsets.setdefault( lane.lane_id, lane.offset_along_lane(vs.pose.point) ) def _relinquish_actor(self, actor_state: ActorState): sim = self._sim() assert sim sim.provider_releases_actor(self, actor_state) self._logger.debug( f"{actor_state} is no longer managed by local traffic provider" ) if actor_state.actor_id in self._my_actors: del self._my_actors[actor_state.actor_id]
[docs] def step(self, actions, dt: float, elapsed_sim_time: float) -> ProviderState: sim = self._sim() assert sim with timeit("Adding actors", self._logger.debug): self._add_actors_for_time(elapsed_sim_time, dt) for other in self._other_vehicle_states: if other.actor_id in self._reserved_areas: del self._reserved_areas[other.actor_id] # precompute nearest lanes and offsets for all vehicles and cache # (this prevents having to do it O(ovs^2) times) with timeit("Generating caches", self._logger.debug): self._create_actor_caches() # Do state update in two passes so that we don't use next states in the # computations for actors encountered later in the iterator. with timeit("Computing states", self._logger.debug): for actor in self._my_actors.values(): actor.compute_next_state(dt) dones = set() losts = set() removed = set() remap_ids: Dict[str, str] = dict() with timeit("Stepping actors", self._logger.debug): for actor_id, actor in self._my_actors.items(): actor.step(dt) if actor.finished_route: dones.add(actor.actor_id) elif actor.off_route: losts.add(actor) elif actor.teleporting: # pybullet doesn't like it when a vehicle jumps from one side of the map to another, # so we need to give teleporting vehicles a new id and thus a new chassis. actor.bump_id() remap_ids[actor_id] = actor.actor_id for actor in losts - removed: removed.add(actor.actor_id) self._relinquish_actor(actor.state) for actor_id in dones - removed: actor = self._my_actors.get(actor_id) if actor: sim.provider_removing_actor(self, actor_id) # The following is not really necessary due to the above calling teardown(), # but it doesn't hurt... if actor_id in self._my_actors: del self._my_actors[actor_id] for orig_id, new_id in remap_ids.items(): self._my_actors[new_id] = self._my_actors[orig_id] del self._my_actors[orig_id] return self._provider_state
[docs] def sync(self, provider_state: ProviderState): missing = self._my_actors.keys() - { psv.actor_id for psv in provider_state.actors } for left in missing: self._logger.warning( f"locally provided actor '{left}' disappeared from simulation" ) del self._my_actors[left] hijacked = self._my_actors.keys() & { psv.actor_id for psv in provider_state.actors if psv.source != self.source_str } for jack in hijacked: self.stop_managing(jack) self._other_actors = dict() for os in provider_state.actors: my_actor = self._my_actors.get(os.actor_id) if my_actor: assert os.source == self.source_str # here we override our state with the "consensus" state... # (Note: this is different from what Sumo does; # we're allowing for true "harmonization" if necessary. # "You may say that I'm a dreamer, but I'm not the only one, # I hope that one day you'll join us, and the world will # be as one." ;) my_actor.state = os else: assert os.source != self.source_str self._other_actors[os.actor_id] = (os, None)
[docs] def reset(self): # Unify interfaces with other providers pass
[docs] def teardown(self): self._my_actors = dict() self._other_actors = dict() self._reserved_areas = dict() self._flows = dict()
[docs] def destroy(self): pass
[docs] def stop_managing(self, actor_id: str): # called when agent hijacks this vehicle self._logger.debug(f"{actor_id} is removed from local traffic management") assert ( actor_id in self._my_actors ), f"stop_managing() called for non-tracked vehicle id '{actor_id}'" del self._my_actors[actor_id]
[docs] def reserve_traffic_location_for_vehicle( self, vehicle_id: str, reserved_location, ): self._reserved_areas[vehicle_id] = reserved_location
[docs] def vehicle_collided(self, vehicle_id: str): traffic_actor = self._my_actors.get(vehicle_id) if not traffic_actor: # guess we already removed it for some other reason (off route?) return
# TAI: consider relinquishing / removing the vehicle? Like: # self._relinquish_actor(traffic_actor.state) # If collidee(s) include(s) an EgoAgent, it will likely be # marked "done" and things will end anyway. # (But this is not guaranteed depending on the done criteria that were set.) # Probably the most realistic thing we can do is leave the vehicle sitting in the road, blocking traffic! # (... and then add a "rubber-neck mode" for all nearby vehicles?! ;) # Let's do that for now, but we should also consider just removing the vehicle. # traffic_actor.stay_put()
[docs] def update_route_for_vehicle(self, vehicle_id: str, new_route: RoadMap.Route): traffic_actor = self._my_actors.get(vehicle_id) if traffic_actor: traffic_actor.update_route(new_route) return other = self._other_actors.get(vehicle_id) if other: self._other_actors[vehicle_id] = (other[0], new_route) return assert False, f"unknown vehicle_id: {vehicle_id}"
[docs] def route_for_vehicle(self, vehicle_id: str) -> Optional[RoadMap.Route]: traffic_actor = self._my_actors.get(vehicle_id) if traffic_actor: return traffic_actor.route other = self._other_actors.get(vehicle_id) if other: _, oroute = other return oroute if oroute else None assert False, f"unknown vehicle_id: {vehicle_id}"
[docs] def vehicle_dest_road(self, vehicle_id: str) -> Optional[str]: route = self.route_for_vehicle(vehicle_id) return route.road_ids[-1]
[docs] def can_accept_actor(self, state: ActorState) -> bool: # We don't accept vehicles that aren't on the road # (those should currently be removed from the simultation). return ( isinstance(state, VehicleState) and (state.role == ActorRole.Social or state.role == ActorRole.Unknown) and self.road_map.nearest_lane(state.pose.point) is not None )
[docs] def add_actor( self, provider_actor: ActorState, from_provider: Optional[Provider] = None ): assert isinstance(provider_actor, VehicleState) route = None if from_provider and isinstance(from_provider, TrafficProvider): route = from_provider.route_for_vehicle(provider_actor.actor_id) assert not route or isinstance(route, RouteWithCache) provider_actor.source = self.source_str provider_actor.role = ActorRole.Social xfrd_actor = _TrafficActor.from_state(provider_actor, self, route) self._my_actors[xfrd_actor.actor_id] = xfrd_actor if xfrd_actor.actor_id in self._other_actors: del self._other_actors[xfrd_actor.actor_id] self._logger.info( f"traffic actor {xfrd_actor.actor_id} transferred to {self.source_str}." )
def _signal_state_means_stop( self, lane: RoadMap.Lane, signal_feat: RoadMap.Feature ) -> bool: feat_state = self._other_actors.get(signal_feat.feature_id) if not feat_state: return False feat_state = feat_state[0] assert isinstance(feat_state, SignalState) return not (feat_state.state & SignalLightState.GO) def _stopped_at_features(self, actor_id: str) -> List[str]: actor = self._my_actors.get(actor_id) if actor: return actor.stopped_at_features return []
class _TrafficActor: """Simulates a vehicle managed by the LocalTrafficProvider.""" def __init__(self, flow: Dict[str, Any], owner: LocalTrafficProvider): self._logger = logging.getLogger(self.__class__.__name__) self._owner = weakref.ref(owner) self._state = None self._flow: Dict[str, Any] = flow self._vtype: Dict[str, Any] = flow["vtype"] self._route_ind: int = 0 self._done_with_route: bool = False self._off_route: bool = False self._route: RoadMap.Route = flow["route"] self._stranded: bool = False self._teleporting: bool = False self._lane = None self._offset = 0 self._lane_speed: Dict[int, Tuple[float, float]] = dict() self._lane_windows: Dict[int, _TrafficActor._LaneWindow] = dict() self._lane_win: _TrafficActor._LaneWindow = None self._target_lane_win: _TrafficActor._LaneWindow = None self._target_speed: float = 15.0 # arbitrary reasonable default self._dest_lane = None self._dest_offset = None self._wrong_way: bool = False # The default values here all attempt to match those in sstudio.sstypes, # which in turn attempt to match Sumo's defaults. self._min_space_cush = float(self._vtype.get("minGap", 2.5)) speed_factor = float(self._vtype.get("speedFactor", 1.0)) speed_dev = float(self._vtype.get("speedDev", 0.1)) self._speed_factor = random.gauss(speed_factor, speed_dev) if self._speed_factor <= 0: self._speed_factor = 0.1 # arbitrary minimum speed is 10% of speed limit self._imperfection = float(self._vtype.get("sigma", 0.5)) self._max_accel = float(self._vtype.get("accel", 2.6)) assert self._max_accel >= 0.0 self._max_decel = float(self._vtype.get("decel", 4.5)) assert self._max_decel >= 0.0 self._cutting_into = None self._cutting_in = False self._in_front_after_cutin_secs = 0 self._cutin_hold_secs = float(self._vtype.get("lcHoldPeriod", 10.0)) self._forward_after_added = 0 self._after_added_hold_secs = self._cutin_hold_secs self._target_cutin_gap = 2.5 * self._min_space_cush self._aggressiveness = float(self._vtype.get("lcAssertive", 1.0)) if self._aggressiveness <= 0: self._logger.warning( "non-positive value {self._aggressiveness} for 'assertive' lane-changing parameter will be ignored" ) self._aggressiveness = 1.0 self._cutin_prob = float(self._vtype.get("lcCutinProb", 0.0)) if not 0.0 <= self._cutin_prob <= 1.0: self._logger.warning( "illegal probability {self._cutin_prob} for 'cutin_prob' lane-changing parameter will be ignored" ) self._cutin_prob = 0.0 self._dogmatic = self._vtype.get("lcDogmatic", "False") == "True" self._cutin_slow_down = float(self._vtype.get("lcSlowDownAfter", 1.0)) if self._cutin_slow_down < 0: self._cutin_slow_down = 0 self._multi_lane_cutin = self._vtype.get("lcMultiLaneCutin", "False") == "True" self._yield_to_agents = self._vtype.get("jmYieldToAgents", "normal") self._wait_to_restart = float(self._vtype.get("jmWaitToRestart", 0.0)) self._stopped_at_feat = dict() self._waiting_at_feat = defaultdict(float) self._time_to_impatience = 3 self._current_impatience = 0 self._max_angular_velocity = 26 # arbitrary, based on limited testing self._prev_angular_err = None self._bumper_wins_front = _TrafficActor._RelWindow(5) self._bumper_wins_back = _TrafficActor._RelWindow(5) owner._actors_created += 1 @property def _impatience(self): return min(1, max(0, self._current_impatience)) @classmethod def from_flow(cls, flow: Dict[str, Any], owner: LocalTrafficProvider): """Factory to construct a _TrafficActor object from a flow dictionary.""" vclass = flow["vtype"]["vClass"] dimensions = VEHICLE_CONFIGS[vclass].dimensions vehicle_type = vclass if vclass != "passenger" else "car" new_actor = cls(flow, owner) new_actor._lane, new_actor._offset = new_actor._resolve_flow_pos( flow, "depart", dimensions ) position = new_actor._lane.from_lane_coord(RefLinePoint(s=new_actor._offset)) heading = vec_to_radians( new_actor._lane.vector_at_offset(new_actor._offset)[:2] ) init_speed = new_actor._resolve_flow_speed(flow) endless = "-endless" if flow.get("endless", False) else "" vehicle_id = f"{new_actor._vtype['id']}{endless}-{owner._actors_created}" new_actor._state = VehicleState( actor_id=vehicle_id, actor_type=vehicle_type, source=owner.source_str, role=ActorRole.Social, pose=Pose.from_center(position, Heading(heading)), dimensions=dimensions, vehicle_config_type=vclass, speed=init_speed, linear_acceleration=np.array((0.0, 0.0, 0.0)), ) new_actor._dest_lane, new_actor._dest_offset = new_actor._resolve_flow_pos( flow, "arrival", dimensions ) return new_actor @classmethod def from_state( cls, state: VehicleState, owner: LocalTrafficProvider, route: Optional[RouteWithCache], ): """Factory to construct a _TrafficActor object from an existing VehiclState object.""" cur_lane = owner.road_map.nearest_lane(state.pose.point) assert ( cur_lane ), f"LocalTrafficProvider accepted a vehicle that's off road? pos={state.pose.point}" if not route or not route.roads: # initialize with a random route, which will probably be replaced route = owner.road_map.random_route(starting_road=cur_lane.road) route.add_to_cache() flow = dict() flow["vtype"] = dict() flow["route"] = route flow["arrivalLane"] = f"{cur_lane.index}" # XXX: assumption! flow["arrivalPos"] = "max" flow["departLane"] = f"{cur_lane.index}" # XXX: assumption! flow["departPos"] = "0" flow["endless"] = "endless" in state.actor_id # use default values for everything else in flow dict(s)... new_actor = _TrafficActor(flow, owner) new_actor.state = state new_actor._lane = cur_lane new_actor._offset = cur_lane.offset_along_lane(state.pose.point) new_actor._dest_lane, new_actor._dest_offset = new_actor._resolve_flow_pos( flow, "arrival", state.dimensions ) return new_actor def _resolve_flow_pos( self, flow: Dict[str, Any], depart_arrival: str, dimensions: Dimensions ) -> Tuple[RoadMap.Lane, float]: base_err = ( f"scenario traffic specifies flow with invalid route {depart_arrival} point" ) road = ( self._route.roads[0] if depart_arrival == "depart" else self._route.roads[-1] ) assert road, "invalid route" lane_ind = max(0, min(int(flow[f"{depart_arrival}Lane"]), len(road.lanes) - 1)) lane = road.lanes[lane_ind] offset = flow[f"{depart_arrival}Pos"] if offset == "max": offset = max(lane.length - 0.5 * dimensions.length, 0) elif offset == "random": offset = random.random() * lane.length elif 0 <= float(offset) <= lane.length: offset = float(offset) else: raise Exception( f"{base_err}: starting offset {offset} invalid for road_id '{road.road_id}'." ) target_pt = lane.from_lane_coord(RefLinePoint(s=offset)) offset = lane.offset_along_lane(target_pt) return (lane, offset) def _resolve_flow_speed(self, flow: Dict[str, Any]) -> float: depart_speed = flow.get("departSpeed", 0.0) # maxSpeed's default attempts to match the one in sstudio.sstypes, # which in turn attempt to match Sumo's default. max_speed = float(self._vtype.get("maxSpeed", 55.5)) if depart_speed == "random": return random.random() * max_speed elif depart_speed == "max": return min(max_speed, self._lane.speed_limit) elif depart_speed == "speedLimit": if self._lane.speed_limit is not None: return self._lane.speed_limit else: raise Exception( f"scenario specifies departSpeed='speed_limit' but no speed limit defined for lane '{self._lane.lane_id}'." ) departSpeed = float(depart_speed) assert departSpeed >= 0 return departSpeed @property def state(self) -> VehicleState: """Returns the current VehicleState for this actor.""" return self._state @state.setter def state(self, state: VehicleState): """Sets the current VehicleState for this actor.""" self._state = state self.bbox.cache_clear() @property def actor_id(self) -> str: """A unique id identifying this actor.""" return self._state.actor_id def bump_id(self): """Changes the id of a teleporting vehicle.""" mm = re.match(r"([^_]+)(_(\d+))?$", self._state.actor_id) if mm: ver = int(mm.group(3)) if mm.group(3) else 0 self._state.actor_id = f"{mm.group(1)}_{ver + 1}" @property def route(self) -> RoadMap.Route: """The route this actor will attempt to take.""" return self._route def update_route(self, route: RoadMap.Route): """Update the route (sequence of road_ids) this actor will attempt to take. A unique route_key is provided for referencing the route cache in the owner provider. """ self._route = route if isinstance(self._route, RouteWithCache): self._route.add_to_cache() self._dest_lane, self._dest_offset = self._resolve_flow_pos( self._flow, "arrival", self._state.dimensions ) self._route_ind = 0 def stay_put(self): """Tells this actor to stop acting and remain where it is indefinitely.""" if not self._stranded: self._logger.info(f"{self.actor_id} stranded") self._stranded = True self._state.speed = 0 self._state.linear_acceleration = None @property def finished_route(self) -> bool: """Returns True if this vehicle has reached the end of its route.""" return self._done_with_route @property def off_route(self) -> bool: """Returns True if this vehicle has left its route before it got to the end.""" return self._off_route @property def wrong_way(self) -> bool: """Returns True if this vehicle is currently going the wrong way in its lane.""" return self._wrong_way @property def teleporting(self) -> bool: """Returns True if this vehicle is teleporting back to the beginning of its route on this step.""" return self._teleporting @property def lane(self) -> RoadMap.Lane: """Returns the current Lane object.""" return self._lane @property def road(self) -> RoadMap.Road: """Returns the current Road object.""" return self._lane.road @property def offset_along_lane(self) -> float: """Returns the current offset along the current Lane object.""" return self._offset @property def speed(self) -> float: """Returns the current speed.""" return self._state.speed @property def acceleration(self) -> float: """Returns the current (linear) acceleration.""" if self._state.linear_acceleration is None: return 0.0 return np.linalg.norm(self._state.linear_acceleration) @lru_cache(maxsize=2) def bbox(self, cushion_length: bool = False) -> Polygon: """Returns a bounding box around the vehicle.""" # note: lru_cache must be cleared whenever pose changes pos = self._state.pose.point dims = self._state.dimensions length_buffer = self._min_space_cush if cushion_length else 0 half_len = 0.5 * dims.length + length_buffer poly = shapely_box( pos.x - 0.5 * dims.width, pos.y - half_len, pos.x + 0.5 * dims.width, pos.y + half_len, ) return shapely_rotate(poly, self._state.pose.heading, use_radians=True) @property def stopped_at_features(self) -> List[str]: """If this vehicle is currently stopped at any features, returns their feature_ids, otherwise an empty list.""" return list(self._stopped_at_feat.keys()) class _LaneWindow: def __init__( self, lane: RoadMap.Lane, time_left: float, ttc: float, ttre: float, gap: float, lane_coord: RefLinePoint, agent_gap: Optional[float], ahead_id: Optional[str], ): self.lane = lane self.time_left = time_left self.adj_time_left = time_left # could eventually be negative self.ttc = ttc self.ttre = ttre # time until we'd get rear-ended self.gap = gap # just the gap ahead (in meters) self.lane_coord = lane_coord self.agent_gap = agent_gap self.ahead_id = ahead_id @property def drive_time(self) -> float: """The amount of time this vehicle might drive in the target lane under present conditions.""" return min(self.ttc, self.adj_time_left) @cached_property def width(self) -> float: """The width of this lane at its lane_coord.""" return self.lane.width_at_offset(self.lane_coord.s)[0] @cached_property def radius(self) -> float: """The radius of curvature of this lane at its lane_coord.""" # we round the offset in an attempt to reduce the unique hits on the LRU caches... rounded_offset = round(self.lane_coord.s) return self.lane.curvature_radius_at_offset( rounded_offset, lookahead=max(math.ceil(2 * self.width), 2) ) @lru_cache(maxsize=4) def _angle_scale(self, to_index: int, theta: float = math.pi / 6) -> float: # we need to correct for not going straight across. # other things being equal, we target ~30 degrees (sin(30)=.5) on average. if abs(self.radius) > 1e5 or self.radius == 0: return safe_division(1.0, math.sin(theta), 1e6) # here we correct for the local road curvature (which affects how far we must travel)... T = safe_division(self.radius, self.width, 1e6) # XXX: This cannot be an assertion because it could happen for map related reasons. if abs(T) <= 1.0: logging.debug( "abnormally high curvature? radius=%s, width=%s at offset %s of lane %s", self.radius, self.width, self.lane_coord.s, self.lane.lane_id, ) if to_index > self.lane.index: se = T * (T - 1) return math.sqrt( 2 * ( se + 0.5 - se * math.cos( safe_division(1, (math.tan(theta) * (T - 1)), default=0) ) ) ) se = T * (T + 1) return math.sqrt( 2 * ( se + 0.5 - se * math.cos(safe_division(1, (math.tan(theta) * (T + 1)), default=0)) ) ) def crossing_time_at_speed( self, to_index: int, speed: float, acc: float = 0.0 ) -> float: """Returns how long it would take to cross from this lane to the lane indexed by to_index given our current speed and acceleration.""" angle_scale = self._angle_scale(to_index) return time_to_cover(angle_scale * self.width, speed, acc) @lru_cache(maxsize=8) def exit_time(self, speed: float, to_index: int, acc: float = 0.0) -> float: """Returns how long it would take to drive into the to_index lane from this lane given our current speed and acceleration.""" ct = self.crossing_time_at_speed(to_index, speed, acc) t = self.lane_coord.t pm = (-1 if to_index >= self.lane.index else 1) * np.sign(t) angle_scale = self._angle_scale(to_index) return 0.5 * ct + pm * time_to_cover(angle_scale * abs(t), speed, acc) def _find_vehicle_ahead_on_route( self, lane: RoadMap.Lane, dte: float, rind: int ) -> Tuple[float, Optional[VehicleState]]: owner = self._owner() assert owner nv_ahead_dist = math.inf nv_ahead_vs = None rind += 1 for ogl in lane.outgoing_lanes: rt_oln = RoadMap.Route.RouteLane(ogl, rind) len_to_end = self._route.distance_from(rt_oln) if len_to_end is None: continue lbc = owner._lane_bumpers_cache.get(ogl) if lbc: fi = 0 while fi < len(lbc): ogl_offset, ogl_vs, _ = lbc[fi] if ogl_vs.actor_id != self.actor_id: break fi += 1 if fi == len(lbc): continue ogl_dist = dte - (len_to_end - ogl_offset) if ogl_dist < nv_ahead_dist: nv_ahead_dist = ogl_dist nv_ahead_vs = ogl_vs continue ogl_dist, ogl_vs = self._find_vehicle_ahead_on_route(ogl, dte, rind) if ogl_dist < nv_ahead_dist: nv_ahead_dist = ogl_dist nv_ahead_vs = ogl_vs return nv_ahead_dist, nv_ahead_vs def _find_vehicle_ahead( self, lane: RoadMap.Lane, my_offset: float, search_start: float ) -> Tuple[float, Optional[VehicleState]]: owner = self._owner() assert owner lbc = owner._lane_bumpers_cache.get(lane) if lbc: lane_spot = bisect_right(lbc, (search_start, self._state, 3)) # if we're at an angle to the lane, it's possible for the # first thing we hit to be our own entries in the bumpers cache, # which we need to skip. while lane_spot < len(lbc) and self.actor_id == lbc[lane_spot][1].actor_id: lane_spot += 1 if lane_spot < len(lbc): lane_offset, nvs, _ = lbc[lane_spot] assert lane_offset >= search_start if lane_offset > my_offset: return lane_offset - my_offset, nvs return 0, nvs rt_ln = RoadMap.Route.RouteLane(lane, self._route_ind) route_len = self._route.distance_from(rt_ln) or lane.length my_dist_to_end = route_len - my_offset return self._find_vehicle_ahead_on_route(lane, my_dist_to_end, self._route_ind) def _find_vehicle_behind( self, lane: RoadMap.Lane, my_offset: float, search_start: float ) -> Tuple[float, Optional[VehicleState]]: owner = self._owner() assert owner lbc = owner._lane_bumpers_cache.get(lane) if lbc: lane_spot = bisect_left(lbc, (search_start, self._state, -1)) # if we're at an angle to the lane, it's possible for the # first thing we hit to be our own entries in the bumpers cache, # which we need to skip. while lane_spot > 0 and self.actor_id == lbc[lane_spot - 1][1].actor_id: lane_spot -= 1 if lane_spot > 0: lane_offset, bv_vs, _ = lbc[lane_spot - 1] assert lane_offset <= search_start if lane_offset < my_offset: return my_offset - lane_offset, bv_vs return 0, bv_vs def find_last(ll: RoadMap.Lane) -> Tuple[float, Optional[VehicleState]]: owner = self._owner() assert owner plbc = owner._lane_bumpers_cache.get(ll) if not plbc: return math.inf, None for bv_offset, bv_vs, _ in reversed(plbc): if bv_vs.actor_id != self.actor_id: return bv_offset, bv_vs return math.inf, None # look back until split entry or vehicle found min_to_look = 100 rind = self._route_ind - 1 dist_looked = my_offset while len(lane.incoming_lanes) == 1 and rind >= 0: lane = lane.incoming_lanes[0] rind -= 1 dist_looked += lane.length bv_offset, bv_vs = find_last(lane) if bv_vs: return dist_looked - bv_offset, bv_vs if rind == 0 or dist_looked > min_to_look: return math.inf, None # we hit a split without looking very far... # so go down each split branch until looked 100m back def _backtrack_to_len(ll, looked, rind) -> Tuple[float, Optional[VehicleState]]: if rind < 0 or looked >= min_to_look: return math.inf, None best_offset = math.inf best_vs = None for inl in ll.incoming_lanes: il_offset, il_vs = find_last(inl) if il_vs: return looked + inl.length - il_offset, il_vs il_offset, il_vs = _backtrack_to_len(inl, looked + inl.length, rind - 1) if il_offset < best_offset: best_offset = il_offset best_vs = il_vs return best_offset, best_vs return _backtrack_to_len(lane, dist_looked, rind) def _compute_lane_window(self, lane: RoadMap.Lane): lane_coord = lane.to_lane_coord(self._state.pose.point) my_offset = lane_coord.s my_speed, my_acc = self._lane_speed[lane.index] rt_ln = RoadMap.Route.RouteLane(lane, self._route_ind) path_len = self._route.distance_from(rt_ln) or lane.length path_len -= my_offset lane_time_left = safe_division(path_len, self.speed) half_len = 0.5 * self._state.dimensions.length front_bumper = my_offset + half_len back_bumper = my_offset - half_len ahead_dist, nv_vs = self._find_vehicle_ahead(lane, front_bumper, back_bumper) if nv_vs: ahead_dist -= self._min_space_cush ahead_dist = max(0, ahead_dist) speed_delta = my_speed - nv_vs.speed acc_delta = my_acc if nv_vs.linear_acceleration is not None: acc_delta -= np.linalg.norm(nv_vs.linear_acceleration) lane_ttc = max(time_to_cover(ahead_dist, speed_delta, acc_delta), 0) else: lane_ttc = math.inf behind_dist, bv_vs = self._find_vehicle_behind(lane, back_bumper, front_bumper) if bv_vs: behind_dist -= self._min_space_cush behind_dist = max(0, behind_dist) speed_delta = my_speed - bv_vs.speed acc_delta = my_acc if bv_vs.linear_acceleration is not None: acc_delta -= np.linalg.norm(bv_vs.linear_acceleration) lane_ttre = max(time_to_cover(behind_dist, -speed_delta, -acc_delta), 0) else: lane_ttre = math.inf self._lane_windows[lane.index] = _TrafficActor._LaneWindow( lane, lane_time_left, lane_ttc, lane_ttre, ahead_dist, lane_coord, behind_dist if bv_vs and bv_vs.role == ActorRole.EgoAgent else None, nv_vs.actor_id if nv_vs else None, ) def _compute_lane_windows(self): self._lane_windows = dict() for lane in self.road.lanes: self._compute_lane_window(lane) for index, lw in self._lane_windows.items(): lw.adj_time_left -= self._crossing_time_into(index)[0] def _crossing_time_into(self, target_idx: int) -> Tuple[float, bool]: my_idx = self._lane.index if my_idx == target_idx: return 0.0, True accel = self.acceleration max_speed = ( self._lane_windows[target_idx].lane.speed_limit * self._speed_factor ) or 1e-13 if self.speed < max_speed: # using my current acceleration is too conservative b/c I could # accelerate to a new speed as I change lanes (for example, # if moving from a stopped lane of traffic to a new one). bumped_accel = self._max_accel * (1.0 - self.speed / max_speed) accel = max(bumped_accel, accel, self._max_accel) min_idx = min(target_idx, my_idx + 1) max_idx = max(target_idx + 1, my_idx) cross_time = self._lane_windows[my_idx].exit_time(self.speed, target_idx, accel) for i in range(min_idx, max_idx): lw = self._lane_windows[i] lct = lw.crossing_time_at_speed(target_idx, self.speed, accel) if i == target_idx: lct *= 0.75 cross_time += lct # note: we *could* be more clever and use cross_time for each lane separately # to try to thread our way through the gaps independently... nah. for i in range(min_idx, max_idx): lw = self._lane_windows[i] if min(lw.ttc, lw.time_left, lw.ttre) <= cross_time: return cross_time, False return cross_time, True def _should_cutin(self, lw: _LaneWindow) -> bool: target_ind = lw.lane.index if target_ind == self._lane.index: return False if not self._multi_lane_cutin and abs(target_ind - self._lane.index) > 1: return False if not self._dogmatic and lw.time_left < stopping_time( self.speed, self._max_decel ): return False min_gap = safe_division( self._target_cutin_gap, self._aggressiveness, default=1e5 ) max_gap = self._target_cutin_gap + 2 if min_gap < lw.agent_gap < max_gap and self._crossing_time_into(target_ind)[1]: return random.random() < self._cutin_prob return False def _pick_lane(self, dt: float): # first, survey the state of the road around me self._compute_lane_windows() my_idx = self._lane.index self._lane_win = self._lane_windows[my_idx] # Try to find the best among available lanes... best_lw = self._lane_windows[my_idx] # Default current lane then check right lanes then left lanes. lanes_to_right = list(range(0, my_idx))[::-1] lanes_to_left = list( range(min(my_idx + 1, len(self._lane_windows)), len(self._lane_windows)) ) checks = lanes_to_right + lanes_to_left # hold lane for some time if added recently if self._forward_after_added < self._after_added_hold_secs: self._forward_after_added += dt # skip checks checks = [] ## TODO: Determine how blocked lane changes should be addressed ## Idea is to keep lane if blocked on right, slow down if blocked on left # if doing_cut_in and blocked_from_cut_in: # # blocked on the right so pick a closer lane until cutin lane is available # if blocked_on_right: # # exclude blocked lane from lane checks # # if blocked_on_left: # # do nothing for now and wait for idx in checks: lw = self._lane_windows[idx] # skip lanes I can't drive in (e.g., bike lanes on waymo maps) if not lw.lane.is_drivable: continue # if I can't safely reach the lane, don't consider it change_time = 0 if abs(idx - my_idx) > 1: change_time, can_cross = self._crossing_time_into(idx) if not can_cross: continue min_time_cush = float(self._vtype.get("tau", 1.0)) neighbor_lane_bias = ( 0.1 * change_time * (1 if abs(self._lane.index - idx) == 1 else 0) ) will_rearend = lw.ttc + neighbor_lane_bias < min_time_cush # if my route destination is available, prefer that if ( lw.lane == self._dest_lane and lw.lane_coord.s + lw.gap >= self._dest_offset ): # TAI: speed up or slow down as appropriate if _crossing_time_into() was False best_lw = lw if not will_rearend and not self._dogmatic: break cut_in_is_real_lane = self._cutting_into and self._cutting_into.index < len( self._lane_windows ) # if I'm in the process of changing lanes, continue (unless it's no longer safe) if ( cut_in_is_real_lane and self._crossing_time_into(self._cutting_into.index)[1] and not will_rearend ): best_lw = self._lane_windows[self._cutting_into.index] if self._cutting_into != self._lane: break # so I'm finally in the target cut-in lane, but now I gotta # stay there a bit to not appear indecisive self._in_front_after_cutin_secs += dt if self._in_front_after_cutin_secs < self._cutin_hold_secs: break self._cutting_into = None self._cutting_in = False self._in_front_secs = 0 # don't change lanes in junctions, except for the above reasons # (since it makes collision avoidance harder for everyone!) if lw.lane.in_junction: continue if change_time < lw.time_left: # also don't change lanes if we can't *finish* before entering a junction # (unless our lane is ending) rl = RoadMap.Route.RouteLane(lw.lane, self._route_ind) owner = self._owner() assert owner l_offset = owner._cached_lane_offset(self._state, lw.lane) _, nj_dist = self._route.next_junction(rl, l_offset) if change_time > time_to_cover(nj_dist, self.speed, self.acceleration): continue # if there's an agent behind me, possibly cut in on it if lw.agent_gap and self._should_cutin(lw): best_lw = lw self._cutting_into = lw.lane self._cutting_in = True continue longer_drive_time = lw.drive_time > best_lw.drive_time equal_drive_time = lw.drive_time == best_lw.drive_time is_destination_lane = lw.lane == self._dest_lane highest_ttre = lw.ttre >= best_lw.ttre right_of_current_lw = idx < self._lane_win.lane.index # otherwise, keep track of the remaining options and eventually # pick the lane with the longest available driving time on my route # or, in the case of ties, the right-most lane (assuming I'm not # cutting anyone off to get there). if equal_drive_time and not will_rearend: if is_destination_lane and self._offset < self._dest_offset: best_lw = lw if highest_ttre and right_of_current_lw: best_lw = lw if longer_drive_time: best_lw = lw if will_rearend and lw.ttc > best_lw.ttc: best_lw = lw # keep track of the fact I'm changing lanes for next step # so I don't swerve back and forth indecesively if best_lw.lane != self._lane and not self._cutting_into: self._cutting_into = best_lw.lane self._target_lane_win = best_lw def _compute_lane_speeds(self): owner = self._owner() assert owner def _get_radius(lane: RoadMap.Lane) -> float: l_offset = owner._cached_lane_offset(self._state, lane) # we round the offset in an attempt to reduce the unique hits on the LRU caches... l_offset = round(l_offset) l_width, _ = lane.width_at_offset(l_offset) return lane.curvature_radius_at_offset( l_offset, lookahead=max(math.ceil(2 * l_width), 2) ) self._lane_speed = dict() my_radius = _get_radius(self._lane) for l in self._lane.road.lanes: ratio = 1.0 if l != self._lane and abs(my_radius) < 1e5: l_radius = _get_radius(l) if abs(l_radius) < 1e5: ratio = safe_division(l_radius, my_radius, default=0) if ratio < 0: ratio = 1.0 self._lane_speed[l.index] = (ratio * self.speed, ratio * self.acceleration) def _slow_for_curves(self): # XXX: this may be too expensive. if so, we'll need to precompute curvy spots for routes lookahead = math.ceil(1 + math.log(self._target_speed)) lookahead = max(1, lookahead) # we round the offset in an attempt to reduce the unique hits on the LRU caches... rounded_offset = round(self._offset) radius = self._lane.curvature_radius_at_offset(rounded_offset, lookahead) # pi/2 radian right turn == 6 m/s with radius 10.5 m # TODO: also depends on vehicle type (traction, length, etc.) self._target_speed = min(abs(radius) * 0.5714, self._target_speed) class _RelWindow: @dataclass class _RelativeVehInfo: dist: float bearing: float heading: float dt: float def __init__(self, width: int = 5): self._width = width self._junction_foes: Dict[ Tuple[str, int], Deque[_TrafficActor._RelWindow._RelativeVehInfo] ] = dict() def add_to_win( self, veh_id: str, bumper: int, fv_pos: np.ndarray, my_pos: np.ndarray, my_heading: float, dt: float, ) -> Tuple[float, float]: """Add a relative observation of veh_id over the last dt secs.""" bvec = fv_pos - my_pos fv_range = np.linalg.norm(bvec) rel_bearing = min_angles_difference_signed(vec_to_radians(bvec), my_heading) fv_win = self._junction_foes.setdefault( (veh_id, bumper), deque(maxlen=self._width) ) fv_win.append(self._RelativeVehInfo(fv_range, rel_bearing, my_heading, dt)) return rel_bearing, fv_range def predict_crash_in(self, veh_id: str, bumper: int) -> float: """Estimate if and when I might collide with veh_id using the constant bearing, decreasing range (CBDR) techique, but attempting to correct for non-linearities.""" # CBDR is a good approximation at distance over time with # both vehicles behaving "steadily" (ideally, driving # straight at a constant speed). It breaks down, thankfully # in preictable ways, if these assumptions are violated. # # Range Dependency: # As range gets small, bearing can fluctuate more, but will also # tend to start to change steadily when on a collision course. # To overcome this here, we just make our "CB" computation # depend on range. # # When range is on the order of car length(s), the following might eventually # help with evasive maneuvers: # - for side impacts to me, the absolute value of the rel_bearing from my front bumper # will increase monotonically (and rapidly) just before impact, meanwhile the value # from my back bumper will decrease analogously. # - for side impacts to them, the rel_bearing to their front bumper from mine will change # signs just before impact, while abs(rel_bearing) to their back bumper will dramatically # reduce. # # Heading Changes: # - If we turn, we need to consider how this will change rel_bearing from our perspective. # Adding to the heading will increase apparent rel_bearing, but this instaneous change # should not count against CBDR, so we correct for it here. # - If they turn, we don't need to do anything (CBDR as implemented here still works # to predict collisions). # # (Relative) Acceleration: # - if there is a difference in our accelerations, rel_bearing for a collision course # will change quadratically in a complicated way. We don't try to mitigate this. window = self._junction_foes[(veh_id, bumper)] if len(window) <= 1: return math.inf prev_range, prev_bearing, prev_heading = None, None, None range_del = 0.0 bearing_del = 0.0 for rvi in window: if prev_range is not None: range_del += (rvi.dist - prev_range) / rvi.dt if prev_bearing is not None: bearing_del += ( min_angles_difference_signed(rvi.bearing, prev_bearing) + min_angles_difference_signed(rvi.heading, prev_heading) ) / rvi.dt prev_range = rvi.dist prev_bearing = rvi.bearing prev_heading = rvi.heading range_del /= len(window) - 1 bearing_del /= len(window) - 1 final_range = window[-1].dist # the exponent here was determined by trial and error if range_del < 0 and abs(bearing_del) < safe_division( math.pi, final_range**1.4 ): return safe_division(-final_range, range_del) return math.inf def purge_unseen(self, seen: Set[str]): """Remove unseen vehicle IDs from this cache.""" self._junction_foes = { (sv, bumper): self._junction_foes[(sv, bumper)] for sv in seen for bumper in (False, True) if (sv, bumper) in self._junction_foes } @staticmethod @lru_cache(maxsize=32) def _turn_angle(junction: RoadMap.Lane, approach_index: int) -> float: # TAI: consider moving this into RoadMap.Lane if junction.outgoing_lanes: next_lane = junction.outgoing_lanes[0] nlv = next_lane.vector_at_offset(0.5 * next_lane.length) else: nlv = junction.vector_at_offset(junction.length) nla = vec_to_radians(nlv[:2]) mli = min(approach_index, len(junction.incoming_lanes) - 1) if mli >= 0: prev_lane = junction.incoming_lanes[mli] plv = prev_lane.vector_at_offset(prev_lane.length - 1) else: plv = junction.vector_at_offset(0) pla = vec_to_radians(plv[:2]) return min_angles_difference_signed(nla, pla) def _higher_priority( self, junction: RoadMap.Lane, dist_to_junction: float, traffic_lane: RoadMap.Lane, traffic_veh: VehicleState, bearing: float, foe: RoadMap.Lane, ) -> bool: owner = self._owner() assert owner # take into account TLS (don't yield to TL-stopped vehicles) # XXX: we currently only determine this for actors we're controlling owner = self._owner() assert owner if traffic_veh.source == owner.source_str and owner._stopped_at_features( traffic_veh.actor_id ): return True # Smith vs. Neo if traffic_veh.role in (ActorRole.EgoAgent, ActorRole.SocialAgent): if self._yield_to_agents == "never": return True elif self._yield_to_agents == "always": return False assert ( self._yield_to_agents == "normal" ), f"unknown yield_to_agents value: {self._yield_to_agents}" # if already blocking the foes's path then don't yield if dist_to_junction <= 0 and self._lane == junction: def _in_lane(lane): for _, vs, _ in owner._lane_bumpers_cache.get(lane, []): if vs.actor_id == self.actor_id: return True return False if _in_lane(traffic_lane): return True if traffic_lane != junction: # XXX: might need to go deeper (but don't know their route)... for ogtl in traffic_lane.outgoing_lanes: if _in_lane(ogtl): return True # Straight > Right > Left priority turn_thresh = 0.166 * math.pi my_ta = self._turn_angle(junction, self._lane.index) their_ta = self._turn_angle(foe, traffic_lane.index) if my_ta >= turn_thresh and their_ta < turn_thresh: # me left, them not left return False if abs(my_ta) < turn_thresh and abs(their_ta) >= turn_thresh: # me straight, them turning return True if my_ta <= -turn_thresh: # me right if their_ta >= turn_thresh: # them left return True if abs(their_ta) < turn_thresh: # them straight return False # Major over minor roads my_lanes = len(self._lane.road.lanes) their_lanes = len(traffic_lane.road.lanes) if my_lanes > their_lanes: return True # Vehicle to the right (couter clockwise) elif my_lanes == their_lanes and bearing > 0: return True return False def _backtrack_until_long_enough( self, result: Set[RoadMap.Lane], lane: RoadMap.Lane, cur_length: float, min_length: float, ): result.add(lane) cur_length += lane.length if cur_length < min_length: for il in lane.incoming_lanes: if not il.is_drivable: continue self._backtrack_until_long_enough(result, il, cur_length, min_length) def _handle_junctions(self, dt: float, window: int = 5, max_range: float = 100.0): assert max_range > 0, "Max range is used as denominator" rl = RoadMap.Route.RouteLane(self._target_lane_win.lane, self._route_ind) owner = self._owner() assert owner l_offset = owner._cached_lane_offset(self._state, self._target_lane_win.lane) njl, nj_dist = self._route.next_junction(rl, l_offset) if njl is None or nj_dist > max_range: return updated = set() my_pos = self._state.pose.point.as_np_array[:2] my_heading = self._state.pose.heading half_len = 0.5 * self._state.dimensions.length hv = radians_to_vec(my_heading) my_front = my_pos + half_len * hv my_back = my_pos - half_len * hv min_range = max_range for foe in njl.foes: check_lanes = set() self._backtrack_until_long_enough(check_lanes, foe, 0, max_range) assert check_lanes for check_lane in check_lanes: if check_lane == self._lane: continue handled = set() lbc = owner._lane_bumpers_cache.get(check_lane, []) for offset, fv, bumper in lbc: if fv.actor_id == self.actor_id: continue # vehicles can be in the bumpers_cache more than once # (for their front and back bumpers, and b/c they may straddle lanes). # We need to check both bumpers for collisions here as (especially # for longer vehicles) the answer can come out differently. if bumper == 0: # don't worry about the "fake back bumper" continue fv_pos = check_lane.from_lane_coord( RefLinePoint(offset) ).as_np_array[:2] f_rb, f_rng = self._bumper_wins_front.add_to_win( fv.actor_id, bumper, fv_pos, my_front, my_heading, dt ) b_rb, b_rng = self._bumper_wins_back.add_to_win( fv.actor_id, bumper, fv_pos, my_back, my_heading, dt ) updated.add(fv.actor_id) # we will only do something if the potential collider is "ahead" of us... if min(abs(f_rb), abs(b_rb)) >= 0.45 * math.pi: continue est_front_crash = ( self._bumper_wins_front.predict_crash_in(fv.actor_id, bumper) if f_rng < max_range else math.inf ) est_back_crash = ( self._bumper_wins_back.predict_crash_in(fv.actor_id, bumper) if b_rng < max_range else math.inf ) if est_front_crash <= est_back_crash: est_crash = est_front_crash rel_bearing = f_rb fv_range = f_rng else: est_crash = est_back_crash rel_bearing = b_rb fv_range = b_rng if est_crash > 60.0: # ignore future crash if after arbitrarily-high threshold of 1 min... continue if fv.actor_id not in handled and not self._higher_priority( njl, nj_dist, check_lane, fv, rel_bearing, foe ): self._logger.debug( f"{self.actor_id} may slow down to avoid collision @ {njl.lane_id} with {fv.actor_id} currently in {check_lane.lane_id}" ) rng = nj_dist if nj_dist > 0 else fv_range if rng < min_range: min_range = rng handled.add(fv.actor_id) if check_lane == foe: # we've already picked our lane, but we still update our gaps and ttc # because these are also used by the acceleration PID controller. self._target_lane_win.ttc = min( est_crash, self._target_lane_win.ttc ) crash_dist = distance_covered( est_crash, self.speed, self.acceleration ) self._target_lane_win.gap = min( crash_dist, self._target_lane_win.gap ) self._bumper_wins_front.purge_unseen(updated) self._bumper_wins_back.purge_unseen(updated) self._target_speed *= math.pow(min_range / max_range, 0.75) def _find_features_ahead( self, rl: RoadMap.Route.RouteLane, lookahead: float, upcoming_feats: List[RoadMap.Feature], ): lane = rl.lane lookahead -= lane.length if lane != self._lane: upcoming_feats += lane.features else: lookahead += self._offset # make sure the feature is not behind me... half_len = 0.5 * self._state.dimensions.length my_bb = self._offset - half_len for feat in lane.features: for pt in feat.geometry: if lane.offset_along_lane(pt) >= my_bb: upcoming_feats.append(feat) break if lookahead <= 0: return nri = rl.road_index + 1 if nri >= len(self._route.roads): return for ogl in lane.outgoing_lanes: if ogl.road == self._route.roads[nri]: nrl = RoadMap.Route.RouteLane(ogl, nri) self._find_features_ahead(nrl, lookahead, upcoming_feats) def _handle_features_and_signals(self, dt: float): my_stopping_dist = stopping_distance(self.speed, self._max_decel) lookahead = 2 * my_stopping_dist upcoming_feats = [] rl = RoadMap.Route.RouteLane(self._lane, self._route_ind) self._find_features_ahead(rl, lookahead, upcoming_feats) # TODO: check non-fixed-location signals here too if not upcoming_feats: return for feat in upcoming_feats: if feat.type == RoadMap.FeatureType.SPEED_BUMP: self._target_speed *= 0.5 continue if feat.type == RoadMap.FeatureType.STOP_SIGN: def dist_to_stop(): dist_to_sign = feat.min_dist_from(self._state.pose.point) lw, _ = self._lane.width_at_offset(self._offset) hdist_to_sign = 0.5 * lw + 2 if hdist_to_sign >= dist_to_sign: return 0.0 dts = math.sqrt(dist_to_sign**2 - hdist_to_sign**2) dts -= 0.5 * self._state.dimensions.length return max(dts, 0.0) if feat.feature_id not in self._stopped_at_feat: dts = dist_to_stop() if self.speed < 0.1 and dts <= self._min_space_cush: self._stopped_at_feat[feat.feature_id] = True self._lane_win.gap = min(dts, self._lane_win.gap) elif self._stopped_at_feat[feat.feature_id]: self._waiting_at_feat[feat.feature_id] += dt if self._waiting_at_feat[feat.feature_id] > self._wait_to_restart: del self._waiting_at_feat[feat.feature_id] self._stopped_at_feat[feat.feature_id] = False else: self._lane_win.gap = min(dist_to_stop(), self._lane_win.gap) continue if feat.is_dynamic: owner = self._owner() assert owner should_stop = owner._signal_state_means_stop(self._lane, feat) if should_stop and self.speed == 0.0: self._stopped_at_feat[feat.feature_id] = True elif not should_stop and self._stopped_at_feat.get( feat.feature_id, False ): del self._stopped_at_feat[feat.feature_id] self._waiting_at_feat[feat.feature_id] = 0.0 if feat.feature_id in self._waiting_at_feat: if self._waiting_at_feat[feat.feature_id] > self._wait_to_restart: del self._waiting_at_feat[feat.feature_id] continue self._waiting_at_feat[feat.feature_id] += dt if should_stop or feat.feature_id in self._waiting_at_feat: # TAI: could decline to do this if my_stopping_dist >> dist_to_stop dist_to_stop = feat.min_dist_from(self._state.pose.point) self._lane_win.gap = min(dist_to_stop, self._lane_win.gap) def _check_speed(self, dt: float): target_lane = self._target_lane_win.lane if target_lane.speed_limit is None: self._target_speed = self.speed # XXX: on a road like this, we can only slow down # (due to curves and traffic) but we never speed back up! # TAI: setting the target_speed to some fixed default instead? else: self._target_speed = target_lane.speed_limit self._target_speed *= self._speed_factor if self._cutting_in: self._target_speed *= self._cutin_slow_down self._slow_for_curves() if self._target_speed > 0: self._handle_features_and_signals(dt) if self._target_speed > 0: self._handle_junctions(dt) if self._target_speed > 0: max_speed = float(self._vtype.get("maxSpeed", 55.55)) if self._target_speed >= max_speed: self._target_speed = max_speed def _angle_to_lane(self, dt: float) -> float: my_heading = self._state.pose.heading look_ahead = max(dt * self.speed, 2) proj_pt = self._state.pose.position[:2] + look_ahead * radians_to_vec( my_heading ) proj_pt = Point(*proj_pt) target_lane = self._target_lane_win.lane lane_coord = target_lane.to_lane_coord(proj_pt) lat_err = lane_coord.t target_vec = target_lane.vector_at_offset(lane_coord.s) target_heading = vec_to_radians(target_vec[:2]) heading_delta = min_angles_difference_signed(target_heading, my_heading) self._wrong_way = abs(heading_delta) > 0.5 * math.pi if self._wrong_way: self._logger.warning( f"{self.actor_id} going the wrong way for {target_lane.lane_id}. heading_delta={heading_delta}." ) # slow down so it's easier to turn around self._target_speed *= 0.67 # Here we may also want to take into account speed, accel, inertia, etc. # and maybe even self._aggressiveness... # magic numbers here were just what looked reasonable in limited testing angular_velocity = 3.75 * heading_delta - 1.25 * lat_err if not self._wrong_way: # add some damping... # but only if we're not going the wrong way already! (if we are, try to correct quickly.) damping = heading_delta * lat_err if damping < 0: angular_velocity += 2.2 * np.sign(angular_velocity) * damping if self._prev_angular_err is not None: angular_velocity -= 0.2 * (heading_delta - self._prev_angular_err[0]) angular_velocity += 0.2 * (lat_err - self._prev_angular_err[1]) if abs(angular_velocity) > dt * self._max_angular_velocity: angular_velocity = ( np.sign(angular_velocity) * dt * self._max_angular_velocity ) # also slow down... self._target_speed *= 0.8 angular_velocity += 0.02 * self._imperfection * (random.random() - 0.5) self._prev_angular_err = (heading_delta, lat_err) return angular_velocity def _near_dest(self, within: float = 0) -> bool: if self._lane.road != self._dest_lane.road: return False dest_lane_offset = self._dest_lane.offset_along_lane(self._state.pose.point) dist_left = self._dest_offset - dest_lane_offset return dist_left <= within def _compute_acceleration(self, dt: float) -> float: emergency_decl = float(self._vtype.get("emergencyDecel", 4.5)) assert emergency_decl >= 0.0 speed_denom = self.speed # usually target_lane == lane, in which case the later terms here are redundant. # when they are different, we still care about lane because we are exiting it. # Rather than recompute the exit time from our current t-coord, we just # use twice the current lane time left as a short-cut. time_cush = max( min( self._target_lane_win.ttc, safe_division(self._target_lane_win.gap, speed_denom), self._target_lane_win.time_left, self._lane_win.ttc, safe_division(self._lane_win.gap, speed_denom), 2 * self._lane_win.time_left, ), 1e-13, ) tau = float(self._vtype.get("tau", 1.0)) min_time_cush = tau - tau * self._impatience if ( not self._near_dest(min_time_cush * speed_denom) and time_cush < min_time_cush ): if self.speed > 0: severity = 4 * safe_division((min_time_cush - time_cush), min_time_cush) return -emergency_decl * np.clip(severity, 0, 1.0) return 0 space_cush = max(min(self._target_lane_win.gap, self._lane_win.gap), 1e-13) if space_cush < self._min_space_cush - self._min_space_cush * self._impatience: if self.speed > 0: severity = 4 * safe_division( (self._min_space_cush - space_cush), self._min_space_cush ) return -emergency_decl * np.clip(severity, 0, 1.0) return 0 my_speed, my_acc = self._lane_speed[self._target_lane_win.lane.index] # magic numbers / weights here were just what looked reasonable in limited testing P = 0.0060 * (self._target_speed - my_speed) I = (-0.0150 / space_cush + -0.0333 / time_cush) * (1 - self._impatience) D = -0.0010 * my_acc PID = (P + I + D) / dt PID += 0.02 * self._imperfection * (random.random() - 0.5) PID = np.clip(PID, -1.0, 1.0) if PID > 0: return PID * self._max_accel return PID * self._max_decel def compute_next_state(self, dt: float): """Pre-computes the next state for this traffic actor.""" self._compute_lane_speeds() if math.isclose(self.speed, 0, abs_tol=1.5): self._current_impatience = min( MAX_IMPATIENCE, self._current_impatience + dt / self._time_to_impatience ) else: self._current_impatience = max( 0, self._current_impatience - dt / self._time_to_impatience ) self._pick_lane(dt) self._check_speed(dt) angular_velocity = self._angle_to_lane(dt) acceleration = self._compute_acceleration(dt) target_heading = self._state.pose.heading + angular_velocity * dt target_heading %= 2 * math.pi heading_vec = radians_to_vec(target_heading) self._next_linear_acceleration = dt * acceleration * heading_vec self._next_speed = self._state.speed + acceleration * dt if self._next_speed < 0: # don't go backwards self._next_speed = 0 dpos = heading_vec * self.speed * dt target_pos = self._state.pose.position + np.append(dpos, 0.0) self._next_pose = Pose.from_center(target_pos, Heading(target_heading)) def step(self, dt: float): """Updates to the pre-computed next state for this traffic actor.""" if self._stranded: return self._teleporting = False self._state.pose = self._next_pose self._state.speed = self._next_speed self._state.linear_acceleration = self._next_linear_acceleration self._state.updated = False prev_road_id = self._lane.road.road_id self.bbox.cache_clear() # if there's more than one lane near us (like in a junction) pick closest one that's in our route owner = self._owner() assert owner nls = owner.road_map.nearest_lanes( self._next_pose.point, radius=self._state.dimensions.length, include_junctions=True, ) if not nls: self._logger.warning( f"actor {self.actor_id} out-of-lane: {self._next_pose}" ) self._off_route = True if self._flow["endless"]: self._reroute() return self._lane = None best_ri_delta = None next_route_ind = None for nl, d in nls: try: next_route_ind = self._route.road_ids.index( nl.road.road_id, self._route_ind ) self._off_route = False ri_delta = next_route_ind - self._route_ind if ri_delta <= 1: self._lane = nl best_ri_delta = ri_delta break if best_ri_delta is None or ri_delta < best_ri_delta: self._lane = nl best_ri_delta = ri_delta except ValueError: pass if not self._lane: self._off_route = True self._lane = nls[0][0] self._logger.info( f"actor {self.actor_id} is off of its route. now in lane {self._lane.lane_id}." ) road_id = self._lane.road.road_id if road_id != prev_road_id: self._route_ind += 1 if best_ri_delta is None else best_ri_delta self._offset = self._lane.offset_along_lane(self._next_pose.point) if self._near_dest(): if self._lane == self._dest_lane: if self._flow["endless"]: self._reroute() else: self._done_with_route = True else: self._off_route = True self._logger.info( f"actor {self.actor_id} is beyond end of route in wrong lane ({self._lane.lane_id} instead of {self._dest_lane.lane_id})." ) def _reroute(self): if not self._off_route and self._route.road_ids[0] in { oid.road_id for oid in self._lane.road.outgoing_roads }: self._route_ind = -1 self._logger.debug( f"{self.actor_id} will loop around to beginning of its route" ) return self._logger.info(f"{self.actor_id} teleporting back to beginning of its route") self._teleporting = True self._lane, self._offset = self._resolve_flow_pos( self._flow, "depart", self._state.dimensions ) position = self._lane.from_lane_coord(RefLinePoint(s=self._offset)) heading = vec_to_radians(self._lane.vector_at_offset(self._offset)[:2]) self._state.pose = Pose.from_center(position, Heading(heading)) start_speed = self._resolve_flow_speed(self._flow) if start_speed > 0: self._state.speed = start_speed self._state.linear_acceleration = np.array((0.0, 0.0, 0.0)) self._state.updated = False self._route_ind = 0 owner = self._owner() assert owner if not owner._check_actor_bbox(self): self._logger.info( f"{self.actor_id} could not teleport back to beginning of its route b/c it was already occupied; just removing it." ) self._done_with_route = True elif self._off_route: self._off_route = False