Source code for smarts.core.route_cache

# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import logging
import math
from dataclasses import dataclass, field
from functools import cached_property, lru_cache
from typing import Dict, List, Optional, Sequence, Set, Tuple

from .road_map import RoadMap

# cache_keys shouldn't be exposed/used outside of this Route
_RouteKey = int


@dataclass
class _LaneContinuation:
    """Struct containing information about the future of a Lane along a Route."""

    dist_to_end: float = 0.0
    dist_to_junction: float = math.inf
    next_junction: Optional[RoadMap.Lane] = None
    dist_to_road: Dict[RoadMap.Road, float] = field(default_factory=dict)


_route_sub_lengths: Dict[
    _RouteKey, Dict[RoadMap.Route.RouteLane, _LaneContinuation]
] = dict()


[docs]class RouteWithCache(RoadMap.Route): """A cache for commonly-needed but expensive-to-compute information about RoadMap.Routes.""" def __init__( self, road_map: RoadMap, start_lane: Optional[RoadMap.Lane] = None, end_lane: Optional[RoadMap.Lane] = None, ): self._map = road_map self._logger = logging.getLogger(self.__class__.__name__) self._start_lane: Optional[RoadMap.Lane] = start_lane self._end_lane: Optional[RoadMap.Lane] = end_lane def __hash__(self) -> int: key: int = self._cache_key # pytype: disable=annotation-type-mismatch return key def __eq__(self, other) -> bool: return self.__class__ == other.__class__ and hash(self) == hash(other) @property def start_lane(self) -> Optional[RoadMap.Lane]: "Route's start lane." return self._start_lane @property def end_lane(self) -> Optional[RoadMap.Lane]: "Route's end lane." return self._end_lane @cached_property def road_ids(self) -> List[str]: """Get the road IDs for this route. Returns: (List[str]): A list of the road IDs for the Roads in this Route. """ return [road.road_id for road in self.roads]
[docs] @staticmethod def from_road_ids( road_map, road_ids: Sequence[str], resolve_intermediaries: bool = False, ) -> RoadMap.Route: """Factory to generate a new RouteWithCache from a sequence of road ids.""" if len(road_ids) > 0 and resolve_intermediaries: via_roads = [road_map.road_by_id(r) for r in road_ids[1:-1]] routes = road_map.generate_routes( start=road_map.road_by_id(road_ids[0]), end=road_map.road_by_id(road_ids[-1]), via=via_roads, max_to_gen=1, ) if len(routes) > 0: return routes[0] route_roads = [] for road_id in road_ids: road = road_map.road_by_id(road_id) assert road, f"cannot add unknown road {road_id} to route" route_roads.append(road) return road_map.Route(road_map=road_map, roads=route_roads)
@cached_property def _cache_key(self) -> _RouteKey: return ( hash(tuple(road.road_id for road in self.roads)) ^ hash(self._map) ^ hash((self.start_lane, self.end_lane)) ) @property def is_cached(self) -> bool: """Returns True if information about this Route has been cached.""" return self._cache_key in _route_sub_lengths
[docs] def remove_from_cache(self): """Remove information about this Route from the cache.""" if self.is_cached: del _route_sub_lengths[self._cache_key]
# TAI: could pre-cache curvatures here too (like waypoints) ?
[docs] def add_to_cache(self): """Add information about this Route to the cache if not already there.""" if self.is_cached: return cache_key = self._cache_key _route_sub_lengths[ cache_key ] = dict() # pytype: disable=container-type-mismatch def _backprop_length( bplane: RoadMap.Lane, length: float, rind: int, junction: bool, final_lane: RoadMap.Lane, ): assert rind >= 0 rind -= 1 for il in bplane.incoming_lanes: rl = RoadMap.Route.RouteLane(il, rind) il_cont = _route_sub_lengths[cache_key].get(rl) if il_cont is not None: if junction: if il.in_junction: junction = False else: il_cont.dist_to_junction = il_cont.dist_to_end il_cont.next_junction = final_lane il_cont.dist_to_road[final_lane.road] = il_cont.dist_to_end il_cont.dist_to_end += length _backprop_length(il, length, rind, junction, final_lane) road = None for r_ind, road in enumerate(self.roads): for lane in road.lanes: # r_ind is required to correctly handle routes with sub-cycles rl = RoadMap.Route.RouteLane(lane, r_ind) assert rl not in _route_sub_lengths[cache_key] _backprop_length(lane, lane.length, r_ind, lane.in_junction, lane) lc = _LaneContinuation(lane.length) if lane.in_junction: lc.next_junction = lane lc.dist_to_junction = 0.0 _route_sub_lengths[cache_key][rl] = lc if not road: return # give lanes that would form a loop an advantage... first_road = self.roads[0] for lane in road.lanes: rl = RoadMap.Route.RouteLane(lane, r_ind) for og in lane.outgoing_lanes: if og.road == first_road: _route_sub_lengths[cache_key][rl].dist_to_end += 1
def _find_along( self, rpt: RoadMap.Route.RoutePoint, radius: float = 30.0 ) -> Optional[RoadMap.Route.RouteLane]: for cand_lane, _ in self._map.nearest_lanes( rpt.pt, radius, include_junctions=True ): try: rind = self.roads.index(cand_lane.road) if rind >= 0 and (rpt.road_index is None or rpt.road_index == rind): return RoadMap.Route.RouteLane(cand_lane, rind) except ValueError: pass self._logger.warning("Unable to find road on route near point %s", rpt) return None
[docs] @lru_cache(maxsize=8) def distance_between( self, start: RoadMap.Route.RoutePoint, end: RoadMap.Route.RoutePoint ) -> Optional[float]: rt_ln = self._find_along(start) if not rt_ln: return None start_lane = rt_ln.lane sind = rt_ln.road_index start_road = start_lane.road rt_ln = self._find_along(end) if not rt_ln: return None end_lane = rt_ln.lane eind = rt_ln.road_index end_road = end_lane.road d = 0.0 start_offset = start_lane.offset_along_lane(start.pt) end_offset = end_lane.offset_along_lane(end.pt) if start_road == end_road and sind == eind: return end_offset - start_offset negate = False if sind > eind: start_lane = end_lane start_road, end_road = end_road, start_road start_offset, end_offset = end_offset, start_offset negate = True d = end_offset + start_lane.length - start_offset for rind, road in enumerate(self.roads): if rind >= eind: break if rind <= sind: continue d += road.length return -d if negate else d
[docs] @lru_cache(maxsize=8) def project_along( self, start: RoadMap.Route.RoutePoint, distance: float ) -> Optional[Set[Tuple[RoadMap.Lane, float]]]: rt_ln = self._find_along(start) if not rt_ln: return None start_lane = rt_ln.lane sind = rt_ln.road_index orig_offset = start_lane.offset_along_lane(start.pt) for rind, road in enumerate(self.roads): if rind < sind: continue start_offset = 0 if rind != sind else orig_offset if distance > road.length - start_offset: distance -= road.length - start_offset continue return {(lane, distance) for lane in road.lanes} return set()
[docs] def distance_from( self, cur_lane: RoadMap.Route.RouteLane, route_road: Optional[RoadMap.Road] = None, ) -> Optional[float]: self.add_to_cache() lc = _route_sub_lengths[self._cache_key].get(cur_lane) if not lc: return None if route_road: return lc.dist_to_road.get(route_road) return lc.dist_to_end
[docs] def next_junction( self, cur_lane: RoadMap.Route.RouteLane, offset: float ) -> Tuple[Optional[RoadMap.Lane], float]: self.add_to_cache() lc = _route_sub_lengths[self._cache_key].get(cur_lane) if lc: dist = lc.dist_to_junction if lc.dist_to_junction > 0: dist -= offset assert dist >= 0 return lc.next_junction, dist return None, math.inf