# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# to allow for typing to refer to class being defined (LinkedLanePoint)
from __future__ import annotations
import queue
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, List, NamedTuple, Sequence, Tuple
import numpy as np
from smarts.core.coordinates import Heading, Point, Pose
from smarts.core.utils.core_math import fast_quaternion_from_angle, vec_to_radians
if TYPE_CHECKING:
from smarts.core.road_map import RoadMap
[docs]@dataclass(frozen=True)
class LanePoint:
"""A point on a lane."""
lane: RoadMap.Lane
"""The lane this point is on."""
pose: Pose
"""The pose of this lane."""
lane_width: float
"""The width of the lane at this point."""
[docs]class LinkedLanePoint(NamedTuple):
"""A lane point that is linked to the next points in the road network."""
lp: LanePoint = None
is_inferred: bool = True
nexts: List[LinkedLanePoint] = [] # list of next immediate LanePoint(s)
# it's a list of LanePoints because a path may branch at junctions
def __hash__(self):
## distinguish between different continuations here too
## so the lru_cache on _lanepoint_paths_starting_at_lanepoint() below
## doesn't return the wrong set of LanePoints.
return hash((self.lp, tuple(nlp.lp for nlp in self.nexts)))
[docs]class LanePoints:
"""A LanePoint utility class."""
def __init__(self, shape_lps: List[LinkedLanePoint], spacing: float):
# XXX: for a big map, may not want to cache ALL of the potential LanePoints
# nor waste time here finding all of them.
# Lanepoints might be generated on demand based upon edges and look-ahead.
self._linked_lanepoints = LanePoints._interpolate_shape_lanepoints(
shape_lps, spacing
)
self._lp_points = np.array(
[l_lp.lp.pose.point.as_np_array[:2] for l_lp in self._linked_lanepoints]
)
self._lanepoints_by_lane_id = defaultdict(list)
self._lanepoints_by_edge_id = defaultdict(list)
for linked_lp in self._linked_lanepoints:
lp_edge_id = linked_lp.lp.lane.road.road_id
self._lanepoints_by_lane_id[linked_lp.lp.lane.lane_id].append(linked_lp)
self._lanepoints_by_edge_id[lp_edge_id].append(linked_lp)
self._lp_points_by_lane_id = {
lane_id: np.array([l_lp.lp.pose.point.as_np_array[:2] for l_lp in l_lps])
for lane_id, l_lps in self._lanepoints_by_lane_id.items()
}
self._lp_points_by_edge_id = {
edge_id: np.array([l_lp.lp.pose.point.as_np_array[:2] for l_lp in l_lps])
for edge_id, l_lps in self._lanepoints_by_edge_id.items()
}
[docs] @classmethod
def from_sumo(
cls,
sumo_road_network,
spacing,
):
"""Computes the lane shape (start/shape/end) lane-points for all lanes in
the network, the result of this function can be used to interpolate
lane-points along lanes to the desired granularity.
"""
from smarts.core.utils.sumo_utils import sumolib # isort:skip
from sumolib.net.edge import Edge # isort:skip
from sumolib.net.lane import Lane # isort:skip
from .sumo_road_network import SumoRoadNetwork
assert type(sumo_road_network) == SumoRoadNetwork
def _shape_lanepoints_along_lane(
road_map: SumoRoadNetwork, lane: RoadMap.Lane, lanepoint_by_lane_memo: dict
) -> Tuple[LinkedLanePoint, List[LinkedLanePoint]]:
lane_queue = queue.Queue()
lane_queue.put((lane, None))
shape_lanepoints = []
initial_lanepoint = None
while not lane_queue.empty():
lane, previous_lp = lane_queue.get()
first_lanepoint = lanepoint_by_lane_memo.get(lane.getID())
if first_lanepoint:
if previous_lp:
previous_lp.nexts.append(first_lanepoint)
continue
lane_shape = [np.array(p) for p in lane.getShape(False)]
assert len(lane_shape) >= 2, repr(lane_shape)
heading = vec_to_radians(lane_shape[1] - lane_shape[0])
heading = Heading(heading)
orientation = fast_quaternion_from_angle(heading)
lane_width, _ = road_map.lane_by_id(lane.getID()).width_at_offset(0)
first_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=road_map.lane_by_id(lane.getID()),
pose=Pose(position=lane_shape[0], orientation=orientation),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
if previous_lp is not None:
previous_lp.nexts.append(first_lanepoint)
if initial_lanepoint is None:
initial_lanepoint = first_lanepoint
lanepoint_by_lane_memo[lane.getID()] = first_lanepoint
shape_lanepoints.append(first_lanepoint)
curr_lanepoint = first_lanepoint
for p1, p2 in zip(lane_shape[1:], lane_shape[2:]):
heading_ = vec_to_radians(p2 - p1)
heading_ = Heading(heading_)
orientation_ = fast_quaternion_from_angle(heading_)
linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=road_map.lane_by_id(lane.getID()),
pose=Pose(position=p1, orientation=orientation_),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(linked_lanepoint)
curr_lanepoint.nexts.append(linked_lanepoint)
curr_lanepoint = linked_lanepoint
# Add a lane-point for the last point of the current lane
lane_width, _ = curr_lanepoint.lp.lane.width_at_offset(0)
last_linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lanepoint.lp.lane,
pose=Pose(
position=lane_shape[-1][:2],
orientation=curr_lanepoint.lp.pose.orientation,
),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(last_linked_lanepoint)
curr_lanepoint.nexts.append(last_linked_lanepoint)
curr_lanepoint = last_linked_lanepoint
for out_connection in lane.getOutgoing():
out_lane = out_connection.getToLane()
# Use internal lanes of junctions (if we're at a junction)
via_lane_id = out_connection.getViaLaneID()
if via_lane_id:
out_lane = road_map._graph.getLane(via_lane_id)
lane_queue.put((out_lane, curr_lanepoint))
return initial_lanepoint, shape_lanepoints
# Don't request internal lanes since we get them by calling
# `lane.getViaLaneID()`
edges = sumo_road_network._graph.getEdges(False)
lanepoint_by_lane_memo = {}
shape_lps = []
for edge in edges:
for lane in edge.getLanes():
_, new_lps = _shape_lanepoints_along_lane(
sumo_road_network, lane, lanepoint_by_lane_memo
)
shape_lps += new_lps
return cls(shape_lps, spacing)
[docs] @classmethod
def from_opendrive(
cls,
od_road_network,
spacing,
):
"""Computes the lane shape (start/shape/end) lane-points for all lanes in
the network, the result of this function can be used to interpolate
lane-points along lanes to the desired granularity.
"""
from .opendrive_road_network import OpenDriveRoadNetwork
assert type(od_road_network) == OpenDriveRoadNetwork
def _shape_lanepoints_along_lane(
lane: RoadMap.Lane,
lanepoint_by_lane_memo: dict,
) -> Tuple[LinkedLanePoint, List[LinkedLanePoint]]:
lane_queue = queue.Queue()
lane_queue.put((lane, None))
shape_lanepoints = []
initial_lanepoint = None
while not lane_queue.empty():
curr_lane, previous_lp = lane_queue.get()
first_lanepoint = lanepoint_by_lane_memo.get(curr_lane.lane_id)
if first_lanepoint:
if previous_lp:
previous_lp.nexts.append(first_lanepoint)
continue
lane_shape = [p.as_np_array[:2] for p in curr_lane.center_polyline]
assert len(lane_shape) >= 2, repr(lane_shape)
heading = vec_to_radians(lane_shape[1] - lane_shape[0])
heading = Heading(heading)
orientation = fast_quaternion_from_angle(heading)
first_lane_coord = curr_lane.to_lane_coord(
Point(x=lane_shape[0][0], y=lane_shape[0][1], z=0.0)
)
lane_width, _ = curr_lane.width_at_offset(first_lane_coord.s)
first_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lane,
pose=Pose(position=lane_shape[0], orientation=orientation),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
if previous_lp is not None:
previous_lp.nexts.append(first_lanepoint)
if initial_lanepoint is None:
initial_lanepoint = first_lanepoint
lanepoint_by_lane_memo[curr_lane.lane_id] = first_lanepoint
shape_lanepoints.append(first_lanepoint)
curr_lanepoint = first_lanepoint
for p1, p2 in zip(lane_shape[1:], lane_shape[2:]):
heading_ = vec_to_radians(p2 - p1)
heading_ = Heading(heading_)
orientation_ = fast_quaternion_from_angle(heading_)
lp_lane_coord = curr_lane.to_lane_coord(
Point(x=p1[0], y=p1[1], z=0.0)
)
lane_width, _ = curr_lane.width_at_offset(lp_lane_coord.s)
linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lane,
pose=Pose(position=p1, orientation=orientation_),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(linked_lanepoint)
curr_lanepoint.nexts.append(linked_lanepoint)
curr_lanepoint = linked_lanepoint
# Add a lane-point for the last point of the current lane
last_lane_coord = curr_lanepoint.lp.lane.to_lane_coord(
Point(x=lane_shape[-1][0], y=lane_shape[-1][1], z=0.0)
)
lane_width, _ = curr_lanepoint.lp.lane.width_at_offset(
last_lane_coord.s
)
last_linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lanepoint.lp.lane,
pose=Pose(
position=lane_shape[-1],
orientation=curr_lanepoint.lp.pose.orientation,
),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(last_linked_lanepoint)
curr_lanepoint.nexts.append(last_linked_lanepoint)
curr_lanepoint = last_linked_lanepoint
outgoing_roads_added = []
for out_lane in curr_lane.outgoing_lanes:
if out_lane.is_drivable:
lane_queue.put((out_lane, curr_lanepoint))
outgoing_road = out_lane.road
if out_lane.road not in outgoing_roads_added:
outgoing_roads_added.append(outgoing_road)
for outgoing_road in outgoing_roads_added:
for next_lane in outgoing_road.lanes:
if (
next_lane.is_drivable
and next_lane not in curr_lane.outgoing_lanes
):
lane_queue.put((next_lane, curr_lanepoint))
return initial_lanepoint, shape_lanepoints
roads = od_road_network._roads
lanepoint_by_lane_memo = {}
shape_lps = []
for road_id in roads:
road = roads[road_id]
# go ahead and add lane-points for composite lanes,
# even though we don't on other map formats,
# and then filter these out on lane-point queries.
# (not an issue right now for OpenDrive since we don't
# find composite lanes, but it may be in the future.)
for lane in road.lanes:
# Ignore non drivable lanes in OpenDRIVE
if lane.is_drivable:
_, new_lps = _shape_lanepoints_along_lane(
lane, lanepoint_by_lane_memo
)
shape_lps += new_lps
return cls(shape_lps, spacing)
[docs] @classmethod
def from_waymo(
cls,
waymo_road_network,
spacing,
):
"""Computes the lane shape (start/shape/end) lane-points for all lanes in
the network, the result of this function can be used to interpolate
lane-points along lanes to the desired granularity.
"""
from .waymo_map import WaymoMap
assert type(waymo_road_network) == WaymoMap
def _shape_lanepoints_along_lane(
lane: RoadMap.Lane,
lanepoint_by_lane_memo: dict,
) -> Tuple[LinkedLanePoint, List[LinkedLanePoint]]:
lane_queue = queue.Queue()
lane_queue.put((lane, None))
shape_lanepoints = []
initial_lanepoint = None
while not lane_queue.empty():
curr_lane, previous_lp = lane_queue.get()
first_lanepoint = lanepoint_by_lane_memo.get(curr_lane.lane_id)
if first_lanepoint:
if previous_lp:
previous_lp.nexts.append(first_lanepoint)
continue
lane_shape = curr_lane._lane_pts
assert (
len(lane_shape) >= 2
), f"{repr(lane_shape)} for lane_id={curr_lane.lane_id}"
vd = lane_shape[1] - lane_shape[0]
heading = Heading(vec_to_radians(vd[:2]))
orientation = fast_quaternion_from_angle(heading)
lane_width, _ = curr_lane.width_at_offset(0)
first_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lane,
pose=Pose(position=lane_shape[0], orientation=orientation),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
if previous_lp is not None:
previous_lp.nexts.append(first_lanepoint)
if initial_lanepoint is None:
initial_lanepoint = first_lanepoint
lanepoint_by_lane_memo[curr_lane.lane_id] = first_lanepoint
shape_lanepoints.append(first_lanepoint)
curr_lanepoint = first_lanepoint
for p1, p2 in zip(lane_shape[1:], lane_shape[2:]):
vd = p2 - p1
heading_ = Heading(vec_to_radians(vd[:2]))
orientation_ = fast_quaternion_from_angle(heading_)
lane_width, _ = curr_lane.width_at_offset(0)
linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lane,
pose=Pose(position=p1, orientation=orientation_),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(linked_lanepoint)
curr_lanepoint.nexts.append(linked_lanepoint)
curr_lanepoint = linked_lanepoint
# Add a lanepoint for the last point of the current lane
curr_lanepoint_lane = curr_lanepoint.lp.lane
lane_width, _ = curr_lanepoint_lane.width_at_offset(0)
last_linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lanepoint.lp.lane,
pose=Pose(
position=lane_shape[-1],
orientation=curr_lanepoint.lp.pose.orientation,
),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(last_linked_lanepoint)
curr_lanepoint.nexts.append(last_linked_lanepoint)
curr_lanepoint = last_linked_lanepoint
for out_lane in curr_lane.outgoing_lanes:
if out_lane and out_lane.is_drivable:
lane_queue.put((out_lane, curr_lanepoint))
return initial_lanepoint, shape_lanepoints
roads = waymo_road_network._roads
lanepoint_by_lane_memo = {}
shape_lps = []
for road_id in roads:
road = roads[road_id]
# go ahead and add lane-points for composite lanes,
# even though we don't on other map formats,
# and then filter these out on lane-point queries.
for lane in road.lanes:
# Ignore non drivable lanes in Waymo
if lane.is_drivable:
_, new_lps = _shape_lanepoints_along_lane(
lane, lanepoint_by_lane_memo
)
shape_lps += new_lps
return cls(shape_lps, spacing)
[docs] @classmethod
def from_argoverse(
cls,
argoverse_map,
spacing,
):
"""Computes the lane shape (start/shape/end) lane-points for all lanes in
the network, the result of this function can be used to interpolate
lane-points along lanes to the desired granularity.
"""
from .argoverse_map import ArgoverseMap
assert type(argoverse_map) == ArgoverseMap
def _shape_lanepoints_along_lane(
lane: RoadMap.Lane,
lanepoint_by_lane_memo: dict,
) -> Tuple[LinkedLanePoint, List[LinkedLanePoint]]:
lane_queue = queue.Queue()
lane_queue.put((lane, None))
shape_lanepoints = []
initial_lanepoint = None
while not lane_queue.empty():
curr_lane, previous_lp = lane_queue.get()
first_lanepoint = lanepoint_by_lane_memo.get(curr_lane.lane_id)
if first_lanepoint:
if previous_lp:
previous_lp.nexts.append(first_lanepoint)
continue
lane_shape = curr_lane._centerline
assert (
len(lane_shape) >= 2
), f"{repr(lane_shape)} for lane_id={curr_lane.lane_id}"
vd = lane_shape[1] - lane_shape[0]
heading = Heading(vec_to_radians(vd[:2]))
orientation = fast_quaternion_from_angle(heading)
lane_width, _ = curr_lane.width_at_offset(0)
first_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lane,
pose=Pose(position=lane_shape[0], orientation=orientation),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
if previous_lp is not None:
previous_lp.nexts.append(first_lanepoint)
if initial_lanepoint is None:
initial_lanepoint = first_lanepoint
lanepoint_by_lane_memo[curr_lane.lane_id] = first_lanepoint
shape_lanepoints.append(first_lanepoint)
curr_lanepoint = first_lanepoint
for p1, p2 in zip(lane_shape[1:], lane_shape[2:]):
vd = p2 - p1
heading_ = Heading(vec_to_radians(vd[:2]))
orientation_ = fast_quaternion_from_angle(heading_)
lane_width, _ = curr_lane.width_at_offset(0)
linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lane,
pose=Pose(position=p1, orientation=orientation_),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(linked_lanepoint)
curr_lanepoint.nexts.append(linked_lanepoint)
curr_lanepoint = linked_lanepoint
# Add a lanepoint for the last point of the current lane
curr_lanepoint_lane = curr_lanepoint.lp.lane
lane_width, _ = curr_lanepoint_lane.width_at_offset(0)
last_linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=curr_lanepoint.lp.lane,
pose=Pose(
position=lane_shape[-1],
orientation=curr_lanepoint.lp.pose.orientation,
),
lane_width=lane_width,
),
nexts=[],
is_inferred=False,
)
shape_lanepoints.append(last_linked_lanepoint)
curr_lanepoint.nexts.append(last_linked_lanepoint)
curr_lanepoint = last_linked_lanepoint
for out_lane in curr_lane.outgoing_lanes:
if out_lane and out_lane.is_drivable:
lane_queue.put((out_lane, curr_lanepoint))
return initial_lanepoint, shape_lanepoints
roads = argoverse_map._roads
lanepoint_by_lane_memo = {}
shape_lps = []
for road in roads.values():
for lane in road.lanes:
if lane.is_drivable:
_, new_lps = _shape_lanepoints_along_lane(
lane, lanepoint_by_lane_memo
)
shape_lps += new_lps
return cls(shape_lps, spacing)
@staticmethod
def _interpolate_shape_lanepoints(
shape_lanepoints: Sequence[LinkedLanePoint], spacing: float
) -> List[LinkedLanePoint]:
# memoize interpolated lane-points on the shape lane-point at start of
# the line we are interpolating
interp_memo = {}
linked_lanepoints = []
for shape_lp in shape_lanepoints:
_, new_lanepoints = LanePoints._interpolate_from_shape_lp(
shape_lp, spacing, interp_memo
)
linked_lanepoints += new_lanepoints
return linked_lanepoints
@staticmethod
def _interpolate_from_shape_lp(
shape_lp: LinkedLanePoint, spacing: float, interp_memo: dict
) -> Tuple[LinkedLanePoint, List[LinkedLanePoint]]:
shape_queue = queue.Queue()
shape_queue.put((shape_lp, None))
newly_created_lanepoints = []
initial_lanepoint = None
while not shape_queue.empty():
shape_lp, previous_lp = shape_queue.get()
first_linked_lanepoint = interp_memo.get(shape_lp.lp)
if first_linked_lanepoint:
if previous_lp:
previous_lp.nexts.append(first_linked_lanepoint)
continue
first_linked_lanepoint = LinkedLanePoint(
lp=shape_lp.lp, # lane-points are frozen, so no need to copy lp here
nexts=[],
is_inferred=False,
)
if previous_lp is not None:
previous_lp.nexts.append(first_linked_lanepoint)
if initial_lanepoint is None:
initial_lanepoint = first_linked_lanepoint
interp_memo[shape_lp.lp] = first_linked_lanepoint
newly_created_lanepoints.append(first_linked_lanepoint)
for current_shape_lp in shape_lp.nexts:
if (
current_shape_lp.lp.lane.lane_id == shape_lp.lp.lane.lane_id
or current_shape_lp.lp.lane in shape_lp.lp.lane.outgoing_lanes
):
next_shape_lp = LanePoints._process_interp_for_lane_lp(
shape_lp,
first_linked_lanepoint,
current_shape_lp,
spacing,
newly_created_lanepoints,
)
shape_queue.put((current_shape_lp, next_shape_lp))
else:
shape_queue.put((current_shape_lp, first_linked_lanepoint))
return initial_lanepoint, newly_created_lanepoints
@staticmethod
def _process_interp_for_lane_lp(
shape_lp: LinkedLanePoint,
first_linked_lanepoint: LinkedLanePoint,
next_shape_lp: LinkedLanePoint,
spacing: float,
newly_created_lanepoints: List[LinkedLanePoint],
) -> LinkedLanePoint:
rmlane = shape_lp.lp.lane
curr_lanepoint = first_linked_lanepoint
lane_seg_vec = (
next_shape_lp.lp.pose.as_position2d() - shape_lp.lp.pose.as_position2d()
)
lane_seg_len = np.linalg.norm(lane_seg_vec)
# We set the initial distance into the lane at `spacing` because
# we already have a lane-point along this segment (curr_lanepoint)
dist_into_lane_seg = spacing
while dist_into_lane_seg < lane_seg_len:
p = dist_into_lane_seg / lane_seg_len
pos = shape_lp.lp.pose.as_position2d() + lane_seg_vec * p
# The thresholds for calculating last lane-point. If the
# midpoint between the current lane-point and the next shape
# lane-point is less than the minimum distance then the last
# lane-point position will be that midpoint. If the midpoint
# is closer than last spacing threshold to the next shape
# lane-point, then the last lane-point will be the current
# lane-point.
# XXX: the map scale should be taken into account here.
last_spacing_threshold_dist = 0.8 * spacing
minimum_dist_next_shape_lp = 1.4
half_distant_current_next_shape_lp = np.linalg.norm(
0.5
* (
curr_lanepoint.lp.pose.as_position2d()
- next_shape_lp.lp.pose.as_position2d()
)
)
mid_point_current_next_shape_lp = 0.5 * (
next_shape_lp.lp.pose.as_position2d()
+ curr_lanepoint.lp.pose.as_position2d()
)
if half_distant_current_next_shape_lp < minimum_dist_next_shape_lp:
pos = mid_point_current_next_shape_lp
dist_pos_next_shape_lp = np.linalg.norm(
next_shape_lp.lp.pose.as_position2d() - pos
)
if dist_pos_next_shape_lp < last_spacing_threshold_dist:
break
heading = vec_to_radians(lane_seg_vec)
orientation = fast_quaternion_from_angle(heading)
rmlane_coord = rmlane.to_lane_coord(Point(x=pos[0], y=pos[1], z=0.0))
lane_width, _ = rmlane.width_at_offset(rmlane_coord.s)
linked_lanepoint = LinkedLanePoint(
lp=LanePoint(
lane=rmlane,
pose=Pose(position=pos, orientation=orientation),
lane_width=lane_width,
),
nexts=[],
is_inferred=True,
)
curr_lanepoint.nexts.append(linked_lanepoint)
curr_lanepoint = linked_lanepoint
newly_created_lanepoints.append(linked_lanepoint)
dist_into_lane_seg += spacing
return curr_lanepoint
@staticmethod
def _closest_linked_lp_to_point(
point: Point,
linked_lps: List[LinkedLanePoint],
points: np.ndarray,
k: int = 1,
filter_composites: bool = False,
) -> List[LinkedLanePoint]:
x = point.as_np_array[:2]
dists = np.sqrt(np.sum((points - x) ** 2, axis=1))
closest_indices = np.argsort(dists)[:k]
if filter_composites:
result = [
linked_lps[idx]
for idx in closest_indices
if not linked_lps[idx].lp.lane.is_composite
]
if result:
return result
# if filtering, only return lane-points in composite lanes if we didn't hit any in simple lanes...
return [linked_lps[idx] for idx in closest_indices]
[docs] def closest_lanepoints(
self,
pose: Pose,
maximum_count: int = 10,
) -> List[LanePoint]:
"""Get the lane-points closest to the given pose.
Args:
pose:
The pose to look around for lane-points.
maximum_count:
The maximum number of lane-points that should be found.
"""
linked_lanepoints = LanePoints._closest_linked_lp_to_point(
pose.point,
self._linked_lanepoints,
self._lp_points,
k=maximum_count,
filter_composites=True,
)
return [llp.lp for llp in linked_lanepoints]
[docs] def closest_linked_lanepoint_on_lane_to_point(
self, point: Point, lane_id: str
) -> LinkedLanePoint:
"""Returns the closest linked lane-point on the given lane."""
return LanePoints._closest_linked_lp_to_point(
point,
self._lanepoints_by_lane_id[lane_id],
self._lp_points_by_lane_id[lane_id],
k=1,
)[0]
[docs] def closest_linked_lanepoint_on_road(
self, point: Point, road_id: str
) -> LinkedLanePoint:
"""Returns the closest linked lane-point on the given road."""
return LanePoints._closest_linked_lp_to_point(
point,
self._lanepoints_by_edge_id[road_id],
self._lp_points_by_edge_id[road_id],
)[0]
[docs] @lru_cache(maxsize=32)
def paths_starting_at_lanepoint(
self, lanepoint: LinkedLanePoint, lookahead: int, route_edge_ids: tuple
) -> List[List[LinkedLanePoint]]:
"""Returns all full branches from the given lane-point up to the length of the look-ahead.
Branches will be filtered at the lane level if they or their outgoing lanes do not belong
to a road in the route edge list.
Args:
lanepoint (LinkedLanePoint):
The starting lane-point.
lookahead (int):
The maximum lane-points in a branch.
route_edge_ids (Tuple[str]):
White-listed edge ids for a route.
Returns:
All branches (as lists) stemming from the input lane-point.
"""
# Early exit if there are no valid paths ahead in the route for this lane
lp_lane = lanepoint.lp.lane
if (
route_edge_ids
and lp_lane.road.road_id != route_edge_ids[-1]
and all(
out_lane.road.road_id not in route_edge_ids
for out_lane in lp_lane.outgoing_lanes
)
):
return []
lanepoint_paths = [[lanepoint]]
for _ in range(lookahead):
next_lanepoint_paths = []
for path in lanepoint_paths:
branching_paths = []
for next_lp in path[-1].nexts:
# TODO: This could be a problem for SUMO. What about internal lanes?
# Filter only the edges we're interested in
next_lane = next_lp.lp.lane
edge_id = next_lane.road.road_id
if route_edge_ids and edge_id not in route_edge_ids:
continue
if (
route_edge_ids
and edge_id != route_edge_ids[-1]
and all(
out_lane.road.road_id not in route_edge_ids
for out_lane in next_lane.outgoing_lanes
)
):
continue
new_path = path + [next_lp]
branching_paths.append(new_path)
if not branching_paths:
branching_paths = [path]
next_lanepoint_paths += branching_paths
lanepoint_paths = next_lanepoint_paths
return lanepoint_paths