Source code for smarts.core.opendrive_road_network

# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import heapq
import logging
import math
import os
import random
import time
from bisect import bisect
from dataclasses import dataclass
from functools import cached_property, lru_cache
from pathlib import Path
from typing import Dict, Generator, List, Optional, Sequence, Set, Tuple

import numpy as np

from smarts.core.utils.glb import make_map_glb, make_road_line_glb

# pytype: disable=import-error

try:
    import rtree
    from lxml import etree
    from opendrive2lanelet.opendriveparser.elements.geometry import Line as LineGeometry
    from opendrive2lanelet.opendriveparser.elements.junction import (
        Connection as ConnectionElement,
    )
    from opendrive2lanelet.opendriveparser.elements.junction import (
        Junction as JunctionElement,
    )
    from opendrive2lanelet.opendriveparser.elements.opendrive import (
        OpenDrive as OpenDriveElement,
    )
    from opendrive2lanelet.opendriveparser.elements.road import Road as RoadElement
    from opendrive2lanelet.opendriveparser.elements.roadLanes import Lane as LaneElement
    from opendrive2lanelet.opendriveparser.elements.roadLanes import (
        LaneOffset as LaneOffsetElement,
    )
    from opendrive2lanelet.opendriveparser.elements.roadLanes import (
        LaneSection as LaneSectionElement,
    )
    from opendrive2lanelet.opendriveparser.elements.roadLanes import (
        LaneWidth as LaneWidthElement,
    )
    from opendrive2lanelet.opendriveparser.elements.roadPlanView import (
        PlanView as PlanViewElement,
    )
    from opendrive2lanelet.opendriveparser.parser import parse_opendrive
except ImportError:
    raise ImportError(
        "You may not have installed the [opendrive] dependencies required for using the OpenDRIVE maps with SMARTS. Install it first using the command `pip install -e .[opendrive]` at the source directory."
    )
# pytype: enable=import-error

from shapely.geometry import Point as SPoint
from shapely.geometry import Polygon

from smarts.core.road_map import RoadMap, RoadMapWithCaches, Waypoint
from smarts.core.route_cache import RouteWithCache
from smarts.core.utils.core_math import (
    CubicPolynomial,
    constrain_angle,
    get_linear_segments_for_range,
    inplace_unwrap,
    line_intersect_vectorized,
    radians_to_vec,
    vec_2d,
)
from smarts.core.utils.key_wrapper import KeyWrapper
from smarts.sstudio.sstypes import MapSpec

from .coordinates import BoundingBox, Heading, Point, Pose, RefLinePoint
from .lanepoints import LanePoints, LinkedLanePoint


[docs]@dataclass class LaneBoundary: """Describes a lane boundary.""" refline: PlanViewElement inner: Optional["LaneBoundary"] lane_widths: List[LaneWidthElement] lane_offsets: List[LaneOffsetElement] segment_size: float = 0.5
[docs] def refline_to_linear_segments(self, s_start: float, s_end: float) -> List[float]: """Get segment offsets between the given offsets.""" s_vals = [] geom_start = 0 for geom in self.refline._geometries: geom_end = geom_start + geom.length if type(geom) == LineGeometry: s_vals.extend([geom_start, geom_end]) else: s_vals.extend( get_linear_segments_for_range( geom_start, geom_end, self.segment_size ) ) geom_start += geom.length return [s for s in s_vals if s_start <= s <= s_end]
[docs] def get_lane_offset(self, s: float) -> float: """Get the lane offset for this boundary at a given s value.""" if len(self.lane_offsets) == 0: return 0 if s < self.lane_offsets[0].start_pos: return 0 i = bisect((KeyWrapper(self.lane_offsets, key=lambda x: x.start_pos)), s) - 1 poly = CubicPolynomial.from_list(self.lane_offsets[i].polynomial_coefficients) ds = s - self.lane_offsets[i].start_pos offset = poly.eval(ds) return offset
[docs] def lane_width_at_offset(self, offset: float) -> LaneWidthElement: """Get the lane width at the given offset.""" i = ( bisect((KeyWrapper(self.lane_widths, key=lambda x: x.start_offset)), offset) - 1 ) return self.lane_widths[i]
[docs] def calc_t(self, s: float, section_s_start: float, lane_idx: int) -> float: """Used to evaluate lane boundary shape.""" # Find the lateral shift of lane reference line with road reference line (known as laneOffset in OpenDRIVE) lane_offset = self.get_lane_offset(s) if not self.inner: return np.sign(lane_idx) * lane_offset width = self.lane_width_at_offset(s - section_s_start) poly = CubicPolynomial.from_list(width.polynomial_coefficients) return poly.eval(s - section_s_start - width.start_offset) + self.inner.calc_t( s, section_s_start, lane_idx )
[docs] def to_linear_segments(self, s_start: float, s_end: float) -> List[float]: """Convert from lane boundary shape to linear segments.""" if self.inner: inner_s_vals = self.inner.to_linear_segments(s_start, s_end) else: if self.lane_offsets: return get_linear_segments_for_range(s_start, s_end, self.segment_size) return self.refline_to_linear_segments(s_start, s_end) outer_s_vals: List[float] = [] curr_s_start = s_start for width in self.lane_widths: poly = CubicPolynomial.from_list(width.polynomial_coefficients) if poly.c == 0 and poly.d == 0: # Special case - only 2 vertices required outer_s_vals.extend([curr_s_start, curr_s_start + width.length]) else: outer_s_vals.extend( get_linear_segments_for_range( curr_s_start, curr_s_start + width.length, self.segment_size ) ) curr_s_start += width.length return sorted(set(inner_s_vals + outer_s_vals))
[docs]class OpenDriveRoadNetwork(RoadMapWithCaches): """A road map for an OpenDRIVE source.""" # The ASAM OpenDRIVE v1.6.1 spec is here: # https://www.asam.net/index.php?eID=dumpFile&t=f&f=4089&token=deea5d707e2d0edeeb4fccd544a973de4bc46a09 DEFAULT_LANE_WIDTH = 3.7 DEFAULT_LANE_SPEED = 16.67 # in m/s # Values to convert to m/s for each allowable unit type (see OpenDRIVE spec, section 2.3.2) SPEED_CONVERSION = { "m/s": 1.0, "km/h": 0.27778, "mph": 0.44704, } def __init__( self, xodr_file: str, map_spec: MapSpec, default_lane_speed=None, ): super().__init__() self._log = logging.getLogger(self.__class__.__name__) self._xodr_file = xodr_file self._default_lane_speed = ( default_lane_speed if default_lane_speed is not None else OpenDriveRoadNetwork.DEFAULT_LANE_SPEED ) self._map_spec = map_spec self._default_lane_width = OpenDriveRoadNetwork._spec_lane_width(map_spec) self._surfaces: Dict[str, OpenDriveRoadNetwork.Surface] = {} self._roads: Dict[str, OpenDriveRoadNetwork.Road] = {} self._lanes: Dict[str, OpenDriveRoadNetwork.Lane] = {} # Reference to lanes' R tree self._lane_rtree = None self._load() self._waypoints_cache = OpenDriveRoadNetwork._WaypointsCache()
[docs] @classmethod def from_spec( cls, map_spec: MapSpec, ): """Generate a road network from the given specification.""" if map_spec.shift_to_origin: logger = logging.getLogger(cls.__name__) logger.warning( "OpenDrive road networks do not yet support the 'shift_to_origin' option." ) xodr_file = OpenDriveRoadNetwork._map_path(map_spec) od_map = cls(xodr_file, map_spec) return od_map
@staticmethod def _spec_lane_width(map_spec: MapSpec) -> float: return ( map_spec.default_lane_width if map_spec.default_lane_width is not None else OpenDriveRoadNetwork.DEFAULT_LANE_WIDTH ) @staticmethod def _map_path(map_spec: MapSpec) -> str: if os.path.isdir(map_spec.source): # map.xodr is the default OpenDRIVE map name; try that: return os.path.join(map_spec.source, "map.xodr") return map_spec.source @staticmethod def _elem_id(elem, suffix): if type(elem) == LaneSectionElement: return f"{elem.parentRoad.id}_{elem.idx}_{suffix}" else: assert type(elem) == LaneElement return f"{elem.parentRoad.id}_{elem.lane_section.idx}_{suffix}_{elem.id}" def _load(self): # Parse the xml definition into an initial representation start = time.time() with open(self._xodr_file, "r") as f: od: OpenDriveElement = parse_opendrive(etree.parse(f).getroot()) end = time.time() elapsed = round((end - start) * 1000.0, 3) self._log.info(f"Parsing .xodr file: {elapsed} ms") # First pass: create all Road and Lane objects start = time.time() for road_elem in od.roads: road_elem: RoadElement = road_elem # Set road speed if ( road_elem.types and road_elem.types[0].speed and road_elem.types[0].speed.max ): road_elem_speed = float(road_elem.types[0].speed.max) unit = road_elem.types[0].speed.unit if unit: multiplier = OpenDriveRoadNetwork.SPEED_CONVERSION.get(unit, None) assert ( multiplier is not None ), f"invalid unit type for lane speed: {unit}" road_elem_speed *= multiplier else: road_elem_speed = self._default_lane_speed # Create new road for each lane section for section_elem in road_elem.lanes.lane_sections: section_elem: LaneSectionElement = section_elem # Create new roads so that all lanes for each road are in same direction for sub_road, suffix in [ (section_elem.leftLanes, "L"), (section_elem.rightLanes, "R"), ]: # Skip if there are no lanes if not sub_road: continue road_id = OpenDriveRoadNetwork._elem_id(section_elem, suffix) total_lanes = len(sub_road) road = OpenDriveRoadNetwork.Road( self, road_id, section_elem.parentRoad.junction is not None, section_elem.length, section_elem.sPos, total_lanes, ) self._roads[road_id] = road assert road_id not in self._surfaces self._surfaces[road_id] = road for lane_elem in sub_road: lane_id = OpenDriveRoadNetwork._elem_id(lane_elem, suffix) lane = OpenDriveRoadNetwork.Lane( self, lane_id, road, lane_elem.id, section_elem.length, lane_elem.type.lower() in [ "driving", "exit", "entry", "offramp", "onramp", "connectingramp", ], road_elem_speed, road_elem.planView, ) # Set road as drivable if it has at least one lane drivable road._is_drivable = road._is_drivable or lane.is_drivable self._lanes[lane_id] = lane assert lane_id not in self._surfaces self._surfaces[lane_id] = lane road._lanes.append(lane) end = time.time() elapsed = round((end - start) * 1000.0, 3) self._log.info(f"First pass: {elapsed} ms") # TODO: get signals and objects (crosswalks) here and add as Features to Road/Lane objects # Second pass: compute road and lane connections, compute lane boundaries and polygon start = time.time() for road_elem in od.roads: for section_elem in road_elem.lanes.lane_sections: for sub_road, suffix in [ (section_elem.leftLanes, "L"), (section_elem.rightLanes, "R"), ]: # Skip if there are no lanes if not sub_road: continue road_id = OpenDriveRoadNetwork._elem_id(section_elem, suffix) road: RoadMap.Road = self._roads[road_id] road_bounding_box = [ (float("inf"), float("inf")), (float("-inf"), float("-inf")), ] inner_boundary = LaneBoundary( road_elem.planView, None, [], road_elem.lanes.laneOffsets ) for lane_elem in sub_road: lane_id = OpenDriveRoadNetwork._elem_id(lane_elem, suffix) lane = self._lanes[lane_id] # Compute lanes in same direction lane.lanes_in_same_direction = [ l for l in road.lanes if l.lane_id != lane.lane_id ] # Compute lane to the left result = None direction = True if lane.index == len(road.lanes) - 1: if "R" in road.road_id: left_road_id = road.road_id.replace("R", "L") else: left_road_id = road.road_id.replace("L", "R") if left_road_id in self._roads: road_to_left = self._roads[left_road_id] result = road_to_left.lane_at_index( len(road_to_left.lanes) - 1 ) direction = False else: result = road.lane_at_index(lane.index + 1) lane.lane_to_left = result, direction # Compute lane to right result = None assert lane.index < len(road.lanes) if lane.index != 0: result = road.lane_at_index(lane.index - 1) lane.lane_to_right = result, True # Compute Lane connections self._compute_lane_connections(od, lane, lane_elem, road_elem) # Set lane's outer and inner boundary outer_boundary = LaneBoundary( None, inner_boundary, lane_elem.widths, road_elem.lanes.laneOffsets, ) lane._cache_geometry(inner_boundary, outer_boundary) inner_boundary = outer_boundary road_bounding_box = [ ( min( road_bounding_box[0][0], lane.bounding_box.min_pt.x, ), min( road_bounding_box[0][1], lane.bounding_box.min_pt.y, ), ), ( max( road_bounding_box[1][0], lane.bounding_box.max_pt.x, ), max( road_bounding_box[1][1], lane.bounding_box.max_pt.y, ), ), ] road.bounding_box = BoundingBox( min_pt=Point( x=road_bounding_box[0][0], y=road_bounding_box[0][1] ), max_pt=Point( x=road_bounding_box[1][0], y=road_bounding_box[1][1] ), ) end = time.time() elapsed = round((end - start) * 1000.0, 3) self._log.info(f"Second pass: {elapsed} ms") start = time.time() self._compute_lane_intersections(od) end = time.time() self._log.info(f"Intersections pass: {elapsed} ms") # Third pass: everything that depends on lane connections start = time.time() for road in list(self._roads.values()): # Compute incoming/outgoing roads based on lane connections in_roads = set() out_roads = set() for lane in road.lanes: for out_lane in lane.outgoing_lanes: if out_lane.road.road_id != road.road_id: out_roads.add(out_lane.road) # due to junctions, sometimes incoming lanes from junction connections # are not found by _compute_lane_connections() (because the elementType of the predecessor # is "junction" and not "road", so we repair that here. if lane not in out_lane.incoming_lanes: out_lane.incoming_lanes.append(lane) if lane.road not in out_lane.road.incoming_roads: out_lane.road.incoming_roads.append(lane.road) for lane in road.lanes: for in_lane in lane.incoming_lanes: if in_lane.road.road_id != road.road_id: in_roads.add(in_lane.road) # due to junctions, sometimes outgoing lanes from junction connections # are not found by _compute_lane_connections (because the elementType of the successor # is "junction" and not "road", so we repair that here. if lane not in in_lane.outgoing_lanes: in_lane.outgoing_lanes.append(lane) if lane.road not in in_lane.road.outgoing_roads: in_lane.road.outgoing_roads.append(lane.road) road.incoming_roads.extend(list(in_roads)) road.outgoing_roads.extend(list(out_roads)) for lane in road.lanes: # Compute lane foes foes = set(lane._intersections) foes |= { incoming for outgoing in lane.outgoing_lanes for incoming in outgoing.incoming_lanes if incoming != lane } lane._foes = list(foes) if lane.foes or len(lane.incoming_lanes) > 1: road._is_junction = True # recompute lane to left using road geometry if the map was converted from SUMO to OpenDRIVE curr_leftmost_lane = road.lane_at_index(len(road.lanes) - 1) if curr_leftmost_lane and curr_leftmost_lane.lane_to_left[0] is None: for other_road_id in self._roads: if other_road_id == road.road_id: continue other_road = self._roads[other_road_id] other_leftmost_lane = other_road.lane_at_index( len(other_road.lanes) - 1 ) if other_leftmost_lane.lane_to_left[0] is not None: continue curr_leftmost_edge_shape, _ = road._edge_shape(0) other_leftmost_edge_shape, _ = other_road._edge_shape(0) edge_border_i = np.array( [curr_leftmost_edge_shape[0], curr_leftmost_edge_shape[-1]] ) # start and end position edge_border_j = np.array( [ other_leftmost_edge_shape[-1], other_leftmost_edge_shape[0], ] ) # start and end position with reverse traffic direction # The edge borders of two lanes do not always overlap perfectly, # thus relax the tolerance threshold to 1 if np.linalg.norm(edge_border_i - edge_border_j) < 1: curr_leftmost_lane._lane_to_left = other_leftmost_lane, False other_leftmost_lane._lane_to_left = curr_leftmost_lane, False end = time.time() elapsed = round((end - start) * 1000.0, 3) self._log.info(f"Third pass: {elapsed} ms") def _compute_lane_connections( self, od: OpenDriveElement, lane: RoadMapWithCaches.Lane, lane_elem: LaneElement, road_elem: RoadElement, ): lane_link = lane_elem.link ls_index = lane_elem.lane_section.idx if lane_link.predecessorId: road_id, section_id = None, None if ls_index == 0: # This is the first lane section, so get the first/last lane section of the predecessor road road_predecessor = road_elem.link.predecessor if road_predecessor and road_predecessor.elementType == "road": road_id = road_predecessor.element_id pred_road_elem = od.getRoad(road_id) if not pred_road_elem: self._log.warning( f"Predecessor road {road_id} does not exist for road {road_elem.id}" ) return section_id = ( pred_road_elem.lanes.getLastLaneSectionIdx() if road_predecessor.contactPoint == "end" else 0 ) else: # Otherwise, get the previous lane section of the current road road_id = road_elem.id section_id = ls_index - 1 if road_id is not None and section_id is not None: pred_suffix = "L" if lane_link.predecessorId > 0 else "R" pred_lane_id = ( f"{road_id}_{section_id}_{pred_suffix}_{lane_link.predecessorId}" ) pred_lane = self.lane_by_id(pred_lane_id) if lane_elem.id < 0: # Direction of lane is the same as the reference line if pred_lane not in lane.incoming_lanes: lane.incoming_lanes.append(pred_lane) if lane not in pred_lane.outgoing_lanes: pred_lane.outgoing_lanes.append(lane) else: # Direction of lane is opposite the refline, so this is actually an outgoing lane if pred_lane not in lane.outgoing_lanes: lane.outgoing_lanes.append(pred_lane) if lane not in pred_lane.incoming_lanes: pred_lane.incoming_lanes.append(lane) if lane_link.successorId: road_id, section_id = None, None if ls_index == len(road_elem.lanes.lane_sections) - 1: # This is the last lane section, so get the first/last lane section of the successor road road_successor = road_elem.link.successor if road_successor and road_successor.elementType == "road": road_id = road_successor.element_id succ_road_elem = od.getRoad(road_id) if not succ_road_elem: self._log.warning( f"Successor road {road_id} does not exist for road {road_elem.id}" ) return section_id = ( succ_road_elem.lanes.getLastLaneSectionIdx() if road_successor.contactPoint == "end" else 0 ) else: # Otherwise, get the next lane section in the current road road_id = road_elem.id section_id = ls_index + 1 if road_id is not None and section_id is not None: succ_suffix = "L" if lane_link.successorId > 0 else "R" succ_lane_id = ( f"{road_id}_{section_id}_{succ_suffix}_{lane_link.successorId}" ) succ_lane = self.lane_by_id(succ_lane_id) if lane_elem.id < 0: # Direction of lane is the same as the reference line if succ_lane not in lane.outgoing_lanes: lane.outgoing_lanes.append(succ_lane) if lane not in succ_lane.incoming_lanes: succ_lane.incoming_lanes.append(lane) else: # Direction of lane is opposite the refline, so this is actually an incoming lane if succ_lane not in lane.incoming_lanes: lane.incoming_lanes.append(succ_lane) if lane not in succ_lane.outgoing_lanes: succ_lane.outgoing_lanes.append(lane) def _junction_road_lanes( self, jx_elem: JunctionElement, od: OpenDriveElement ) -> Generator[List["OpenDriveRoadNetwork.Lane"], None, None]: for cnxn_elem in jx_elem.connections: cnxn_elem: ConnectionElement = cnxn_elem cnxn_road_elem = od.getRoad(cnxn_elem.connectingRoad) croad_lanes = [] for section_elem in cnxn_road_elem.lanes.lane_sections: section_elem: LaneSectionElement = section_elem for sub_road, suffix in [ (section_elem.leftLanes, "L"), (section_elem.rightLanes, "R"), ]: if not sub_road: continue road_id = OpenDriveRoadNetwork._elem_id(section_elem, suffix) croad_lanes += self._roads[road_id].lanes yield croad_lanes @staticmethod def _check_intersection(line1: np.ndarray, line2: np.ndarray) -> bool: C = np.roll(line2, 0, axis=0)[:-1] D = np.roll(line2, -1, axis=0)[:-1] for i in range(len(line1) - 1): a = line1[i] b = line1[i + 1] ignore_start_pt = i == 0 if line_intersect_vectorized(a, b, C, D, ignore_start_pt): return True return False def _compute_lane_intersections(self, od_element: OpenDriveElement): intersections: Dict[ OpenDriveRoadNetwork.Lane, Set[OpenDriveRoadNetwork.Lane] ] = dict() for jx_elem in od_element.junctions: for jx_road_lanes1 in self._junction_road_lanes(jx_elem, od_element): for jx_road_lanes2 in self._junction_road_lanes(jx_elem, od_element): if jx_road_lanes2 == jx_road_lanes1: continue for jl1 in jx_road_lanes1: line1 = jl1._center_polyline_arr for jl2 in jx_road_lanes2: line2 = jl2._center_polyline_arr if self._check_intersection(line1, line2): jl1._intersections.add(jl2) jl2._intersections.add(jl1) @property def source(self) -> str: """This is the `.xodr` file of the OpenDRIVE map.""" return self._xodr_file
[docs] def is_same_map(self, map_spec: MapSpec) -> bool: return ( ( map_spec.source == self._map_spec.source or OpenDriveRoadNetwork._map_path(map_spec) == OpenDriveRoadNetwork._map_path(self._map_spec) ) and map_spec.lanepoint_spacing == self._map_spec.lanepoint_spacing and ( map_spec.default_lane_width == self._map_spec.default_lane_width or OpenDriveRoadNetwork._spec_lane_width(map_spec) == OpenDriveRoadNetwork._spec_lane_width(self._map_spec) ) and map_spec.shift_to_origin == self._map_spec.shift_to_origin )
[docs] def surface_by_id(self, surface_id: str) -> Optional[RoadMap.Surface]: return self._surfaces.get(surface_id)
@cached_property def bounding_box(self) -> BoundingBox: x_mins, y_mins, x_maxs, y_maxs = [], [], [], [] for road in self._roads.values(): bounding_box = road.bounding_box if bounding_box is None: continue x_mins.append(bounding_box.min_pt.x) y_mins.append(bounding_box.min_pt.y) x_maxs.append(bounding_box.max_pt.x) y_maxs.append(bounding_box.max_pt.y) return BoundingBox( min_pt=Point(x=min(x_mins), y=min(y_mins)), max_pt=Point(x=max(x_maxs), y=max(y_maxs)), )
[docs] def to_glb(self, glb_dir): polygons = [] for lane_id, lane in self._lanes.items(): metadata = { "road_id": lane.road.road_id, "lane_id": lane_id, "lane_index": lane.index, } polygons.append((lane.shape(), metadata)) lane_dividers, edge_dividers = self._compute_traffic_dividers() map_glb = make_map_glb( polygons, self.bounding_box, lane_dividers, edge_dividers ) map_glb.write_glb(Path(glb_dir) / "map.glb") road_lines_glb = make_road_line_glb(edge_dividers) road_lines_glb.write_glb(Path(glb_dir) / "road_lines.glb") lane_lines_glb = make_road_line_glb(lane_dividers) lane_lines_glb.write_glb(Path(glb_dir) / "lane_lines.glb")
def _compute_traffic_dividers(self): lane_dividers = [] # divider between lanes with same traffic direction road_dividers = [] # divider between roads with opposite traffic direction road_borders = [] dividers_checked = [] for road_id, road in self._roads.items(): road_left_border = None if not road.is_junction: leftmost_edge_shape, rightmost_edge_shape = road._edge_shape(0) road_borders.extend([leftmost_edge_shape, rightmost_edge_shape]) for lane in road.lanes: left_border_vertices_len = int((len(lane.lane_polygon) - 1) / 2) left_side = lane.lane_polygon[:left_border_vertices_len] if lane.index != len(road.lanes) - 1: lane_to_left, _ = lane.lane_to_left assert lane_to_left if lane.is_drivable and lane_to_left.is_drivable: lane_dividers.append(left_side) else: road_dividers.append(left_side) else: road_left_border = left_side assert road_left_border # The road borders that overlapped in positions form a road divider id_split = road_id.split("_") parent_road_id = f"{id_split[0]}_{id_split[1]}" if parent_road_id not in dividers_checked: dividers_checked.append(parent_road_id) if "R" in road.road_id: adjacent_road_id = road.road_id.replace("R", "L") else: adjacent_road_id = road.road_id.replace("L", "R") if adjacent_road_id in self._roads: road_dividers.append(road_left_border) for i in range(len(road_borders) - 1): for j in range(i + 1, len(road_borders)): edge_border_i = np.array( [road_borders[i][0], road_borders[i][-1]] ) # start and end position edge_border_j = np.array( [road_borders[j][-1], road_borders[j][0]] ) # start and end position with reverse traffic direction # The edge borders of two lanes do not always overlap perfectly, thus relax the tolerance threshold to 1 if np.linalg.norm(edge_border_i - edge_border_j) < 1: road_dividers.append(road_borders[i]) return lane_dividers, road_dividers
[docs] class Surface(RoadMapWithCaches.Surface): """Describes an OpenDRIVE surface.""" def __init__(self, surface_id: str, road_map): self._surface_id = surface_id self._map = road_map @property def surface_id(self) -> str: return self._surface_id @property def is_drivable(self) -> bool: # Not all lanes on OpenDRIVE roads are drivable raise NotImplementedError
[docs] class Lane(RoadMapWithCaches.Lane, Surface): """Describes an OpenDRIVE lane.""" def __init__( self, road_map, lane_id: str, road: "OpenDriveRoadNetwork.Road", index: int, length: float, is_drivable: bool, speed_limit: float, road_plan_view: PlanViewElement, ): super().__init__(lane_id, road_map) self._road = road # for internal OpenDRIVE convention # Lanes with positive lane_elem ID run on the left side of the center lane, while lanes with # lane_elem negative ID run on the right side of the center lane. # OpenDRIVE's assumption is that the direction of reference line is same as direction of lanes with # lane_elem negative ID, hence for a given road -1 will be the left most lane in one direction # and 1 will be the left most lane in other direction if it exists. # If there is only one lane in a road, its index will be -1. self._lane_elem_index = index # for Road Map convention, outermost/rightmost lane being 0 self._index = road._total_lanes - abs(index) self._length = length self._speed_limit = speed_limit self._plan_view = road_plan_view self._is_drivable = is_drivable self._incoming_lanes = [] self._outgoing_lanes = [] self._lanes_in_same_dir = [] self._foes = [] self._ref_coords = {} self._lane_boundaries = tuple() self._lane_polygon = [] self._centerline_points = [] self._bounding_box = None self._lane_to_left = None, True self._lane_to_right = None, True self._in_junction = None self._intersections = set() def __hash__(self) -> int: return hash(self.lane_id) + hash(self._map) @property def is_drivable(self) -> bool: return self._is_drivable @property def lane_id(self) -> str: return self._lane_id @property def road(self) -> "OpenDriveRoadNetwork.Road": return self._road @property def length(self) -> float: return self._length @property def speed_limit(self) -> Optional[float]: return self._speed_limit @property def in_junction(self) -> bool: return self.road.is_junction @property def index(self) -> int: return self._index @property def center_polyline(self) -> List[Point]: return self._centerline_points @cached_property def _center_polyline_arr(self) -> np.ndarray: return np.array([p.as_np_array[:2] for p in self._centerline_points]) @property def incoming_lanes(self) -> List[RoadMapWithCaches.Lane]: return self._incoming_lanes @property def outgoing_lanes(self) -> List[RoadMapWithCaches.Lane]: return self._outgoing_lanes @property def entry_surfaces(self) -> List[RoadMap.Surface]: return self.incoming_lanes @property def exit_surfaces(self) -> List[RoadMap.Surface]: return self.outgoing_lanes @property def lanes_in_same_direction(self) -> List[RoadMapWithCaches.Lane]: return self._lanes_in_same_dir @lanes_in_same_direction.setter def lanes_in_same_direction(self, lanes): self._lanes_in_same_dir = lanes @property def lane_to_left(self) -> Tuple[Optional[RoadMapWithCaches.Lane], bool]: return self._lane_to_left @lane_to_left.setter def lane_to_left(self, value): self._lane_to_left = value @property def lane_to_right(self) -> Tuple[Optional[RoadMapWithCaches.Lane], bool]: return self._lane_to_right @lane_to_right.setter def lane_to_right(self, value): self._lane_to_right = value @property def foes(self) -> List[RoadMapWithCaches.Lane]: return self._foes @property def lane_polygon(self) -> List[Tuple[float, float]]: """A list of polygons that describe the shape of the lane.""" return self._lane_polygon @cached_property def bounding_box(self) -> BoundingBox: """Get the minimal axis aligned bounding box that contains all geometry in this lane.""" # XXX: This shoudn't be public. x_coordinates, y_coordinates = zip(*self.lane_polygon) self._bounding_box = BoundingBox( min_pt=Point(x=min(x_coordinates), y=min(y_coordinates)), max_pt=Point(x=max(x_coordinates), y=max(y_coordinates)), ) return self._bounding_box def _t_angle(self, s_heading: float) -> float: lane_elem_id = self._lane_elem_index angle = ( (s_heading - math.pi / 2) if lane_elem_id < 0 else (s_heading + math.pi / 2) ) return constrain_angle(angle) def _cache_geometry( self, inner_boundary: LaneBoundary, outer_boundary: LaneBoundary ): # Set inner/outer boundaries self._lane_boundaries = (inner_boundary, outer_boundary) # Compute ref coords (s values to sample for polygon & centerline) section_len = self._length section_s_start = self.road._start_pos section_s_end = section_s_start + section_len inner_s_vals = inner_boundary.to_linear_segments( section_s_start, section_s_end ) outer_s_vals = outer_boundary.to_linear_segments( section_s_start, section_s_end ) # Cache centerline & ref coords center_xs, center_ys = [], [] s_vals = sorted(set(inner_s_vals + outer_s_vals)) for s in s_vals: t_inner = inner_boundary.calc_t( s, section_s_start, self._lane_elem_index ) t_outer = outer_boundary.calc_t( s, section_s_start, self._lane_elem_index ) (x_ref, y_ref), heading = self._plan_view.calc(s) angle = self._t_angle(heading) width_at_offset = t_outer - t_inner center_xs.append( x_ref + (t_inner + (width_at_offset / 2)) * math.cos(angle) ) center_ys.append( y_ref + (t_inner + (width_at_offset / 2)) * math.sin(angle) ) self._ref_coords[s] = (t_inner, t_outer) # For lanes left of the refline, reverse the order of centerline points to be in order of increasing s if self._lane_elem_index > 0: center_xs = center_xs[::-1] center_ys = center_ys[::-1] self._centerline_points = [ Point(x, y) for x, y in zip(center_xs, center_ys) ] # Cache lane polygon (normal size, with no buffer) self._lane_polygon = self._compute_lane_polygon() def _compute_lane_polygon( self, width_offset: float = 0.0, ) -> List[Tuple[float, float]]: xs, ys = [], [] xs_inner, ys_inner = [], [] xs_outer, ys_outer = [], [] s_vals = sorted(self._ref_coords.keys()) for s in s_vals: t_inner, t_outer = self._ref_coords[s] (x_ref, y_ref), heading = self._plan_view.calc(s) angle = self._t_angle(heading) xs_inner.append(x_ref + (t_inner - width_offset) * math.cos(angle)) ys_inner.append(y_ref + (t_inner - width_offset) * math.sin(angle)) xs_outer.append(x_ref + (t_outer + width_offset) * math.cos(angle)) ys_outer.append(y_ref + (t_outer + width_offset) * math.sin(angle)) xs.extend(xs_inner + xs_outer[::-1] + [xs_inner[0]]) ys.extend(ys_inner + ys_outer[::-1] + [ys_inner[0]]) return list(zip(xs, ys))
[docs] def waypoint_paths_for_pose( self, pose: Pose, lookahead: int, route: Optional[RoadMap.Route] = None ) -> List[List[Waypoint]]: if not self.is_drivable: return [] road_ids = [road.road_id for road in route.roads] if route else None return self._waypoint_paths_at(pose.point, lookahead, road_ids)
[docs] def waypoint_paths_at_offset( self, offset: float, lookahead: int = 30, route: Optional[RoadMap.Route] = None, ) -> List[List[Waypoint]]: if not self.is_drivable: return [] wp_start = self.from_lane_coord(RefLinePoint(offset)) road_ids = [road.road_id for road in route.roads] if route else None return self._waypoint_paths_at(wp_start, lookahead, road_ids)
def _waypoint_paths_at( self, point: Point, lookahead: int, filter_road_ids: Optional[Sequence[str]] = None, ) -> List[List[Waypoint]]: if not self.is_drivable: return [] closest_linked_lp = ( self._map._lanepoints.closest_linked_lanepoint_on_lane_to_point( point, self._lane_id ) ) return self._map._waypoints_starting_at_lanepoint( closest_linked_lp, lookahead, tuple(filter_road_ids) if filter_road_ids else (), point, )
[docs] @lru_cache(maxsize=8) def project_along( self, start_offset: float, distance: float ) -> Set[Tuple[RoadMapWithCaches.Lane, float]]: return super().project_along(start_offset, distance)
[docs] @lru_cache(maxsize=8) def contains_point(self, point: Point) -> bool: if ( self._bounding_box.min_pt.x <= point.x <= self._bounding_box.max_pt.x and self._bounding_box.min_pt.y <= point.y <= self._bounding_box.max_pt.y ): lane_point = self.to_lane_coord(point) width_at_offset, _ = self.width_at_offset(lane_point.s) # t-direction is negative for right side and positive for left side of the central reference # line of lane w.r.t its heading, absolute value of lane_point.t should be less than half of width at # that point return ( abs(lane_point.t) <= (width_at_offset / 2) and 0 <= lane_point.s < self.length ) return False
[docs] @lru_cache(maxsize=16) def oncoming_lanes_at_offset( self, offset: float ) -> List[RoadMapWithCaches.Lane]: result = [] radius = 1.1 * self.width_at_offset(offset)[0] pt = self.from_lane_coord(RefLinePoint(offset)) nearby_lanes = self._map.nearest_lanes(pt, radius=radius) if not nearby_lanes: return result my_vect = self.vector_at_offset(offset) my_norm = np.linalg.norm(my_vect) if my_norm == 0: return result threshold = -0.995562 # cos(175*pi/180) for lane, _ in nearby_lanes: if lane == self: continue lane_refline_pt = lane.to_lane_coord(pt) lv = lane.vector_at_offset(lane_refline_pt.s) lv_norm = np.linalg.norm(lv) if lv_norm == 0: continue lane_angle = np.dot(my_vect, lv) / (my_norm * lv_norm) if lane_angle < threshold: result.append(lane) return result
[docs] @lru_cache(maxsize=8) def center_at_point(self, point: Point) -> Point: return super().center_at_point(point)
@lru_cache(8) def _edges_at_point( self, point: Point ) -> Tuple[Optional[Point], Optional[Point]]: """Get the boundary points perpendicular to the center of the lane closest to the given world coordinate. Args: point: A world coordinate point. Returns: A pair of points indicating the left boundary and right boundary of the lane. """ from .shape import offset_along_shape, position_at_shape_offset reference_line_vertices_len = int((len(self._lane_polygon) - 1) / 2) # left_edge left_edge_shape = [ Point(x, y) for x, y in self._lane_polygon[:reference_line_vertices_len] ] left_offset = offset_along_shape(point, left_edge_shape) left_edge = position_at_shape_offset(left_edge_shape, left_offset) # right_edge right_edge_shape = [ Point(x, y) for x, y in self._lane_polygon[ reference_line_vertices_len : len(self._lane_polygon) - 1 ] ] right_offset = offset_along_shape(point, right_edge_shape) right_edge = position_at_shape_offset(right_edge_shape, right_offset) return left_edge, right_edge
[docs] @lru_cache(maxsize=8) def center_pose_at_point(self, point: Point) -> Pose: return super().center_pose_at_point(point)
[docs] @lru_cache(maxsize=1024) def curvature_radius_at_offset( self, offset: float, lookahead: int = 5 ) -> float: return super().curvature_radius_at_offset(offset, lookahead)
[docs] @lru_cache(maxsize=1024) def width_at_offset(self, lane_point_s: float) -> Tuple[float, float]: start_pos = self.road._start_pos if self._lane_elem_index < 0: road_offset = lane_point_s + start_pos else: road_offset = (self._length - lane_point_s) + start_pos inner_boundary, outer_boundary = self._lane_boundaries t_outer = outer_boundary.calc_t( road_offset, start_pos, self._lane_elem_index ) t_inner = inner_boundary.calc_t( road_offset, start_pos, self._lane_elem_index ) return abs(t_outer - t_inner), 1.0
[docs] @lru_cache(maxsize=4) def shape( self, buffer_width: float = 0.0, default_width: Optional[float] = None ) -> Polygon: if buffer_width == 0.0: return Polygon(self._lane_polygon) buffered_polygon = self._compute_lane_polygon(buffer_width / 2) return Polygon(buffered_polygon)
[docs] def lane_by_id(self, lane_id: str) -> RoadMapWithCaches.Lane: lane = self._lanes.get(lane_id) assert ( lane ), f"OpenDriveRoadNetwork got request for unknown lane_id: '{lane_id}'" return lane
[docs] class Road(RoadMapWithCaches.Road, Surface): """This is akin to a 'road segment' in real life. Many of these might correspond to a single named road in reality.""" def __init__( self, road_map, road_id: str, is_junction: bool, length: float, start_pos: float, total_lanes: int, ): super().__init__(road_id, road_map) self._log = logging.getLogger(self.__class__.__name__) self._road_id = road_id self._is_junction = is_junction self._length = length self._start_pos = start_pos self._is_drivable = False self._lanes = [] self._bounding_box = None self._incoming_roads = [] self._outgoing_roads = [] self._parallel_roads = [] self._total_lanes = total_lanes def __hash__(self) -> int: return hash(self.road_id) ^ hash(self._map) @property def road_id(self) -> str: return self._road_id @property def is_junction(self) -> bool: return self._is_junction @property def length(self) -> float: return self._length @property def is_drivable(self) -> bool: return self._is_drivable @property def incoming_roads(self) -> List[RoadMap.Road]: return self._incoming_roads @property def outgoing_roads(self) -> List[RoadMap.Road]: return self._outgoing_roads @property def entry_surfaces(self) -> List[RoadMap.Surface]: # TAI: also include lanes here? return self.incoming_roads @property def exit_surfaces(self) -> List[RoadMap.Surface]: # TAI: also include lanes here? return self.outgoing_roads @property def parallel_roads(self) -> List[RoadMap.Road]: return self._parallel_roads @property def lanes(self) -> List["OpenDriveRoadNetwork.Lane"]: return self._lanes @property def bounding_box(self) -> Optional[BoundingBox]: """Get the minimal axis aligned bounding box that contains all road geometry.""" return self._bounding_box @bounding_box.setter def bounding_box(self, value): # XXX: This shouldn't be public. self._bounding_box = value
[docs] @lru_cache(maxsize=8) def contains_point(self, point: Point) -> bool: if ( self._bounding_box.min_pt.x <= point.x <= self._bounding_box.max_pt.x and self._bounding_box.min_pt.y <= point.y <= self._bounding_box.max_pt.y ): for lane in self.lanes: if lane.contains_point(point): return True return False
@lru_cache(maxsize=8) def _edges_at_point(self, point: Point) -> Tuple[Point, Point]: """Get the boundary points perpendicular to the center of the road closest to the given world coordinate. Args: point: A world coordinate point. Returns: A pair of points indicating the left boundary and right boundary of the road. """ # left and right edge follow the lane reference line system or direction of that road leftmost_lane = self.lane_at_index(self._total_lanes - 1) rightmost_lane = self.lane_at_index(0) _, right_edge = rightmost_lane._edges_at_point(point) left_edge, _ = leftmost_lane._edges_at_point(point) return left_edge, right_edge
[docs] @lru_cache(maxsize=16) def oncoming_roads_at_point(self, point: Point) -> List[RoadMap.Road]: result = [] for lane in self.lanes: offset = lane.to_lane_coord(point).s result += [ ol.road for ol in lane.oncoming_lanes_at_offset(offset) if ol.road != self ] return result
def _edge_shape(self, buffer_width: float = 0.0): leftmost_lane = self.lane_at_index(self._total_lanes - 1) rightmost_lane = self.lane_at_index(0) if buffer_width == 0.0: rightmost_lane_buffered_polygon = rightmost_lane.lane_polygon leftmost_lane_buffered_polygon = leftmost_lane.lane_polygon else: rightmost_lane_buffered_polygon = rightmost_lane._compute_lane_polygon( buffer_width / 2 ) leftmost_lane_buffered_polygon = leftmost_lane._compute_lane_polygon( buffer_width / 2 ) # Right edge rightmost_edge_vertices_len = int( (len(rightmost_lane_buffered_polygon) - 1) / 2 ) rightmost_edge_shape = rightmost_lane_buffered_polygon[ rightmost_edge_vertices_len : len(rightmost_lane_buffered_polygon) - 1 ] # Left edge leftmost_edge_vertices_len = int( (len(leftmost_lane_buffered_polygon) - 1) / 2 ) leftmost_edge_shape = leftmost_lane_buffered_polygon[ :leftmost_edge_vertices_len ] return leftmost_edge_shape, rightmost_edge_shape
[docs] @lru_cache(maxsize=4) def shape( self, buffer_width: float = 0.0, default_width: Optional[float] = None ) -> Polygon: leftmost_edge_shape, rightmost_edge_shape = self._edge_shape(buffer_width) road_polygon = ( leftmost_edge_shape + rightmost_edge_shape + [leftmost_edge_shape[0]] ) return Polygon(road_polygon)
[docs] def lane_at_index(self, index: int) -> "OpenDriveRoadNetwork.Lane": lanes_with_index = [lane for lane in self.lanes if lane.index == index] if len(lanes_with_index) == 0: self._log.warning( f"Road with id {self.road_id} has no lane at index {index}" ) return None assert len(lanes_with_index) == 1 return lanes_with_index[0]
[docs] def road_by_id(self, road_id: str) -> "OpenDriveRoadNetwork.Road": road = self._roads.get(road_id) assert ( road ), f"OpenDriveRoadNetwork got request for unknown road_id: '{road_id}'" return road
@cached_property def _simple_lanes(self) -> List[RoadMapWithCaches.Lane]: return [lane for lane in self._lanes.values() if not lane.is_composite] def _build_lane_r_tree(self): result = rtree.index.Index() result.interleaved = True # only index simple lanes, as composite lanes can # always be gotten from a simple lane, and we don't # want more ambiguity in our spatial queries. for idx, lane in enumerate(self._simple_lanes): bounding_box = ( lane.bounding_box.min_pt.x, lane.bounding_box.min_pt.y, lane.bounding_box.max_pt.x, lane.bounding_box.max_pt.y, ) result.add(idx, bounding_box) return result def _get_neighboring_lanes( self, x: float, y: float, r: float = 0.1 ) -> List[Tuple[RoadMapWithCaches.Lane, float]]: neighboring_lanes = [] if self._lane_rtree is None: self._lane_rtree = self._build_lane_r_tree() simple_lanes = self._simple_lanes spt = SPoint(x, y) for i in self._lane_rtree.intersection((x - r, y - r, x + r, y + r)): lane = simple_lanes[i] d = lane.shape().distance(spt) if d < r: neighboring_lanes.append((lane, d)) return neighboring_lanes
[docs] @lru_cache(maxsize=1024) def nearest_lanes( self, point: Point, radius: Optional[float] = None, include_junctions=False ) -> List[Tuple[RoadMapWithCaches.Lane, float]]: if radius is None: radius = max(10, 2 * self._default_lane_width) candidate_lanes = self._get_neighboring_lanes(point[0], point[1], r=radius) candidate_lanes.sort(key=lambda lane_dist_tup: lane_dist_tup[1]) return candidate_lanes
[docs] def nearest_lane( self, point: Point, radius: Optional[float] = None, include_junctions: bool = False, ) -> Optional[RoadMapWithCaches.Lane]: nearest_lanes = self.nearest_lanes(point, radius, include_junctions) for lane, dist in nearest_lanes: if lane.contains_point(point): # Since OpenDRIVE has lanes of varying width, a point can be closer to a lane it does not lie in # when compared to the lane it does if it is closer to the outer lane's central line, # than the lane it lies in. return lane return nearest_lanes[0][0] if nearest_lanes else None
[docs] @lru_cache(maxsize=16) def road_with_point( self, point: Point, *, lanes_to_search: Optional[Sequence["RoadMap.Lane"]] = None, ) -> Optional[RoadMap.Road]: # Lookup nearest lanes if no search lanes were provided if not lanes_to_search: radius = max(5, 2 * self._default_lane_width) lanes = [nl for (nl, _) in self.nearest_lanes(point, radius)] else: lanes = lanes_to_search for lane in lanes: if lane.contains_point(point): return lane.road return None
[docs] class Route(RouteWithCache): """Describes a route between two OpenDRIVE roads.""" def __init__( self, road_map: RoadMap, roads: List[RoadMap.Road] = [], start_lane: Optional[RoadMap.Lane] = None, end_lane: Optional[RoadMap.Lane] = None, ): super().__init__(road_map, start_lane, end_lane) self._roads = roads self._length = sum([road.length for road in roads]) @property def roads(self) -> List[RoadMap.Road]: return self._roads @property def road_length(self) -> float: return self._length @cached_property def geometry(self) -> Sequence[Sequence[Tuple[float, float]]]: return [list(road.shape(1.0).exterior.coords) for road in self.roads]
@staticmethod def _shortest_route(start: RoadMap.Road, end: RoadMap.Road) -> List[RoadMap.Road]: queue = [(start.length, start.road_id, start)] came_from = dict() came_from[start] = None cost_so_far = dict() cost_so_far[start] = start.length current: Optional[RoadMap.Road] = None # Dijkstra’s Algorithm while queue: (_, _, current) = heapq.heappop(queue) if current == end: break for out_road in current.outgoing_roads: new_cost = cost_so_far[current] + out_road.length if out_road not in cost_so_far or new_cost < cost_so_far[out_road]: cost_so_far[out_road] = new_cost came_from[out_road] = current heapq.heappush(queue, (new_cost, out_road.road_id, out_road)) # This means we couldn't find a valid route since the queue is empty if current != end: return [] # Reconstruct path current = end path = [] while current != start: path.append(current) current = came_from[current] path.append(start) path.reverse() return path def _generate_routes( self, start_road: RoadMap.Road, start_lane: RoadMap.Lane, end_road: RoadMap.Road, end_lane: RoadMap.Lane, via: Optional[Sequence[RoadMap.Road]], max_to_gen: int, ) -> List[RoadMap.Route]: assert ( max_to_gen == 1 ), "multiple route generation not yet supported for OpenDRIVE" roads = [start_road] if via: roads += via if end_road != start_road: roads.append(end_road) route_roads = [] for cur_road, next_road in zip(roads, roads[1:] + [None]): if not next_road: route_roads.append(cur_road) break sub_route = OpenDriveRoadNetwork._shortest_route(cur_road, next_road) or [] if len(sub_route) < 2: self._log.warning( f"Unable to find valid path between {(cur_road.road_id, next_road.road_id)}." ) return [OpenDriveRoadNetwork.Route(road_map=self)] # The sub route includes the boundary roads (cur_road, next_road). # We clip the latter to prevent duplicates route_roads.extend(sub_route[:-1]) return [ OpenDriveRoadNetwork.Route( road_map=self, roads=route_roads, start_lane=start_lane, end_lane=end_lane, ) ]
[docs] def random_route( self, max_route_len: int = 10, starting_road: Optional[RoadMap.Road] = None, only_drivable: bool = True, ) -> RoadMap.Route: """ """ assert not starting_road or not only_drivable or starting_road.is_drivable next_roads = [starting_road] if starting_road else list(self._roads.values()) if only_drivable: next_roads = [r for r in next_roads if r.is_drivable] route_roads = [] while next_roads and len(route_roads) < max_route_len: cur_road = random.choice(next_roads) route_roads.append(cur_road) next_roads = list(cur_road.outgoing_roads) return OpenDriveRoadNetwork.Route(road_map=self, roads=route_roads)
[docs] def empty_route(self) -> RoadMap.Route: return OpenDriveRoadNetwork.Route(self)
[docs] def route_from_road_ids( self, road_ids: Sequence[str], resolve_intermediaries: bool = False ) -> RoadMap.Route: return OpenDriveRoadNetwork.Route.from_road_ids( self, road_ids, resolve_intermediaries )
class _WaypointsCache: def __init__(self): self.lookahead = 0 self.point = Point(0, 0) self.filter_road_ids = () self._starts = {} # XXX: all vehicles share this cache now (as opposed to before # when it was in Plan.py and each vehicle had its own cache). # TODO: probably need to add vehicle_id to the key somehow (or just make it bigger) def _match(self, lookahead, point, filter_road_ids) -> bool: return ( lookahead <= self.lookahead and point[0] == self.point[0] and point[1] == self.point[1] and filter_road_ids == self.filter_road_ids ) def update( self, lookahead: int, point: Point, filter_road_ids: tuple, llp, paths: List[List[Waypoint]], ): """Update the current cache if not already cached.""" if not self._match(lookahead, point, filter_road_ids): self.lookahead = lookahead self.point = point self.filter_road_ids = filter_road_ids self._starts = {} self._starts[llp.lp.lane.index] = paths def query( self, lookahead: int, point: Point, filter_road_ids: tuple, llp, ) -> Optional[List[List[Waypoint]]]: """Attempt to find previously cached waypoints""" if self._match(lookahead, point, filter_road_ids): hit = self._starts.get(llp.lp.lane.index, None) if hit: # consider just returning all of them (not slicing)? return [path[: (lookahead + 1)] for path in hit] return None @cached_property def _lanepoints(self): assert self._map_spec.lanepoint_spacing > 0 return LanePoints.from_opendrive(self, spacing=self._map_spec.lanepoint_spacing)
[docs] def waypoint_paths( self, pose: Pose, lookahead: int, within_radius: float = 5, route: Optional[RoadMap.Route] = None, ) -> List[List[Waypoint]]: road_ids = [] if route and route.roads: road_ids = [road.road_id for road in route.roads] if road_ids: return self._waypoint_paths_along_route(pose.point, lookahead, road_ids) closest_lps = self._lanepoints.closest_lanepoints(pose) closest_lane = closest_lps[0].lane waypoint_paths = [] for lane in closest_lane.road.lanes: waypoint_paths += lane._waypoint_paths_at(pose.point, lookahead) return sorted(waypoint_paths, key=lambda p: p[0].lane_index)
def _waypoint_paths_along_route( self, point: Point, lookahead: int, route: Sequence[str] ) -> List[List[Waypoint]]: """finds the closest lane to vehicle's position that is on its route, then gets waypoint paths from all lanes in its road there.""" assert len(route) > 0, f"Expected at least 1 road in the route, got: {route}" closest_llp_on_each_route_road = [ self._lanepoints.closest_linked_lanepoint_on_road(point, road) for road in route ] closest_linked_lp = min( closest_llp_on_each_route_road, key=lambda l_lp: np.linalg.norm( vec_2d(l_lp.lp.pose.position) - vec_2d(point) ), ) closest_lane = closest_linked_lp.lp.lane waypoint_paths = [] for lane in closest_lane.road.lanes: if closest_lane.road.road_id != route[-1] and all( out_lane.road.road_id not in route for out_lane in lane.outgoing_lanes ): continue waypoint_paths += lane._waypoint_paths_at(point, lookahead, route) return sorted(waypoint_paths, key=len, reverse=True) def _equally_spaced_path( self, path: Sequence[LinkedLanePoint], point: Point, lp_spacing: float, width_threshold=None, ) -> List[Waypoint]: """given a list of LanePoints starting near point, return corresponding Waypoints that may not be evenly spaced (due to lane change) but start at point. """ continuous_variables = [ "positions_x", "positions_y", "headings", "lane_width", "speed_limit", "lane_offset", ] discrete_variables = ["lane_id", "lane_index"] ref_lanepoints_coordinates = { parameter: [] for parameter in (continuous_variables + discrete_variables) } curr_lane_id = None skip_lanepoints = False index_skipped = [] for idx, lanepoint in enumerate(path): if lanepoint.is_inferred and 0 < idx < len(path) - 1: continue if curr_lane_id is None: curr_lane_id = lanepoint.lp.lane.lane_id # Compute the lane offset for the lanepoint position position = Point( x=lanepoint.lp.pose.position[0], y=lanepoint.lp.pose.position[1], z=0.0 ) lane_coord = lanepoint.lp.lane.to_lane_coord(position) # Skip one-third of lanepoints for next lanes not outgoing to previous lane if skip_lanepoints: if lane_coord.s > (lanepoint.lp.lane.length / 3): skip_lanepoints = False else: index_skipped.append(idx) continue if lanepoint.lp.lane.lane_id != curr_lane_id: previous_lane = self._lanes[curr_lane_id] curr_lane_id = lanepoint.lp.lane.lane_id # if the current lane is not outgoing to previous lane, start skipping one third of its lanepoints if lanepoint.lp.lane not in previous_lane.outgoing_lanes: skip_lanepoints = True index_skipped.append(idx) continue # Compute the lane's width at lanepoint's position width_at_offset = lanepoint.lp.lane_width if idx != 0 and width_threshold and width_at_offset < width_threshold: index_skipped.append(idx) continue ref_lanepoints_coordinates["positions_x"].append( lanepoint.lp.pose.position[0] ) ref_lanepoints_coordinates["positions_y"].append( lanepoint.lp.pose.position[1] ) ref_lanepoints_coordinates["headings"].append( lanepoint.lp.pose.heading.as_bullet ) ref_lanepoints_coordinates["lane_id"].append(lanepoint.lp.lane.lane_id) ref_lanepoints_coordinates["lane_index"].append(lanepoint.lp.lane.index) ref_lanepoints_coordinates["lane_width"].append(width_at_offset) ref_lanepoints_coordinates["lane_offset"].append( lanepoint.lp.lane.offset_along_lane(lanepoint.lp.pose.point) ) ref_lanepoints_coordinates["speed_limit"].append( lanepoint.lp.lane.speed_limit ) ref_lanepoints_coordinates["headings"] = inplace_unwrap( ref_lanepoints_coordinates["headings"] ) first_lp_heading = ref_lanepoints_coordinates["headings"][0] lp_position = path[0].lp.pose.point.as_np_array[:2] vehicle_pos = point.as_np_array[:2] heading_vec = radians_to_vec(first_lp_heading) projected_distant_lp_vehicle = np.inner( (vehicle_pos - lp_position), heading_vec ) ref_lanepoints_coordinates["positions_x"][0] = ( lp_position[0] + projected_distant_lp_vehicle * heading_vec[0] ) ref_lanepoints_coordinates["positions_y"][0] = ( lp_position[1] + projected_distant_lp_vehicle * heading_vec[1] ) cumulative_path_dist = np.cumsum( np.sqrt( np.ediff1d(ref_lanepoints_coordinates["positions_x"], to_begin=0) ** 2 + np.ediff1d(ref_lanepoints_coordinates["positions_y"], to_begin=0) ** 2 ) ) if len(cumulative_path_dist) <= lp_spacing: lp = path[0].lp lp_position = Point(x=lp.pose.position[0], y=lp.pose.position[1], z=0.0) lp_lane_coord = lp.lane.to_lane_coord(lp_position) return [ Waypoint( pos=lp.pose.as_position2d(), heading=lp.pose.heading, lane_width=lp.lane.width_at_offset(lp_lane_coord.s)[0], speed_limit=lp.lane.speed_limit, lane_id=lp.lane.lane_id, lane_index=lp.lane.index, lane_offset=lp.lane.offset_along_lane(lp.pose.point), ) ] evenly_spaced_cumulative_path_dist = np.linspace( 0, cumulative_path_dist[-1], len(path) ) evenly_spaced_coordinates = {} for variable in continuous_variables: evenly_spaced_coordinates[variable] = np.interp( evenly_spaced_cumulative_path_dist, cumulative_path_dist, ref_lanepoints_coordinates[variable], ) for variable in discrete_variables: ref_coordinates = ref_lanepoints_coordinates[variable] evenly_spaced_coordinates[variable] = [] jdx = 0 for idx in range(len(path)): if idx in index_skipped: position = Point( x=evenly_spaced_coordinates["positions_x"][idx], y=evenly_spaced_coordinates["positions_y"][idx], z=0.0, ) nearest_lane = self.nearest_lane(position) if nearest_lane: if variable == "lane_id": evenly_spaced_coordinates[variable].append( nearest_lane.lane_id ) else: evenly_spaced_coordinates[variable].append( nearest_lane.index ) else: while ( jdx + 1 < len(cumulative_path_dist) and evenly_spaced_cumulative_path_dist[idx] > cumulative_path_dist[jdx + 1] ): jdx += 1 evenly_spaced_coordinates[variable].append(ref_coordinates[jdx]) evenly_spaced_coordinates[variable].append(ref_coordinates[-1]) waypoint_path = [] for idx in range(len(path)): waypoint_path.append( Waypoint( pos=np.array( [ evenly_spaced_coordinates["positions_x"][idx], evenly_spaced_coordinates["positions_y"][idx], ] ), heading=Heading(evenly_spaced_coordinates["headings"][idx]), lane_width=evenly_spaced_coordinates["lane_width"][idx], speed_limit=evenly_spaced_coordinates["speed_limit"][idx], lane_id=evenly_spaced_coordinates["lane_id"][idx], lane_index=evenly_spaced_coordinates["lane_index"][idx], lane_offset=evenly_spaced_coordinates["lane_offset"][idx], ) ) return waypoint_path def _waypoints_starting_at_lanepoint( self, lanepoint: LinkedLanePoint, lookahead: int, filter_road_ids: tuple, point: Point, ) -> List[List[Waypoint]]: """computes equally-spaced Waypoints for all lane paths starting at lanepoint up to lookahead waypoints ahead, constrained to filter_road_ids if specified.""" # The following acts sort of like lru_cache(1), but it allows # for lookahead to be <= to the cached value... cache_paths = self._waypoints_cache.query( lookahead, point, filter_road_ids, lanepoint ) if cache_paths: return cache_paths lanepoint_paths = self._lanepoints.paths_starting_at_lanepoint( lanepoint, lookahead, filter_road_ids ) result = [ self._equally_spaced_path( path, point, self._map_spec.lanepoint_spacing, self._default_lane_width / 2, ) for path in lanepoint_paths ] self._waypoints_cache.update( lookahead, point, filter_road_ids, lanepoint, result ) return result