Source code for smarts.core.plan

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

# to allow for typing to refer to class being defined (Mission)...
from __future__ import annotations

import math
import random
import sys
import warnings
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Type

import numpy as np

from smarts.core.coordinates import Dimensions, Heading, Point, Pose, RefLinePoint
from smarts.core.road_map import RoadMap
from smarts.core.utils.core_math import min_angles_difference_signed, vec_to_radians
from smarts.sstudio.sstypes import EntryTactic, TrapEntryTactic

MISSING = sys.maxsize


[docs]class PlanningError(Exception): """Raised in cases when map related planning fails.""" pass
# XXX: consider using smarts.core.coordinates.Pose for this
[docs]@dataclass(frozen=True) class Start: """A starting state for a route or mission.""" position: np.ndarray heading: Heading from_front_bumper: Optional[bool] = True @property def point(self) -> Point: """The coordinate of this starting location.""" return Point.from_np_array(self.position)
[docs] @classmethod def from_pose(cls, pose: Pose): """Convert to a starting location from a pose.""" return cls( position=pose.as_position2d(), heading=pose.heading, from_front_bumper=False, )
def __hash__(self): hash_ = getattr(self, "hash", None) if not hash_: hash_ = hash( ( tuple(self.position), self.heading, self.from_front_bumper, ) ) object.__setattr__(self, "hash", hash_) return hash_
[docs]@dataclass(frozen=True, unsafe_hash=True) class Goal: """Describes an expected end state for a route or mission."""
[docs] def is_specific(self) -> bool: """If the goal is reachable at a specific position.""" return False
[docs] def is_reached(self, vehicle_state) -> bool: """If the goal has been completed.""" return False
[docs]@dataclass(frozen=True, unsafe_hash=True) class EndlessGoal(Goal): """A goal that can never be completed.""" pass
[docs]@dataclass(frozen=True, unsafe_hash=True) class PositionalGoal(Goal): """A goal that can be completed by reaching an end area.""" position: Point # target_heading: Heading radius: float
[docs] @classmethod def from_road( cls, road_id: str, road_map: RoadMap, lane_index: int = 0, lane_offset: Optional[float] = None, radius: float = 1, ): """Generate the goal ending at the specified road lane.""" road = road_map.road_by_id(road_id) lane = road.lane_at_index(lane_index) if lane_offset is None: # Default to the midpoint safely ensuring we are on the lane and not # bordering another lane_offset = lane.length * 0.5 position = lane.from_lane_coord(RefLinePoint(lane_offset)) return cls(position=position, radius=radius)
[docs] def is_specific(self) -> bool: return True
[docs] def is_reached(self, vehicle_state) -> bool: a = vehicle_state.pose.position b = self.position sqr_dist = (a[0] - b.x) ** 2 + (a[1] - b.y) ** 2 return sqr_dist <= self.radius**2
[docs]@dataclass(frozen=True, unsafe_hash=True) class TraverseGoal(Goal): """A TraverseGoal is satisfied whenever an Agent-driven vehicle successfully finishes traversing a non-closed (acyclic) map It's a way for the vehicle to exit the simulation successfully, for example, driving across from one side to the other on a straight road and then continuing off the map. This goal is non-specific about *where* the map is exited, save for that the vehicle must be going the correct direction in its lane just prior to doing so.""" road_map: RoadMap
[docs] def is_specific(self) -> bool: return False
[docs] def is_reached(self, vehicle_state) -> bool: pose = vehicle_state.pose return self._drove_off_map(pose.point, pose.heading)
def _drove_off_map(self, veh_pos: Point, veh_heading: float) -> bool: # try to determine if the vehicle "exited" the map by driving beyond the end of a dead-end lane. nearest_lanes = self.road_map.nearest_lanes(veh_pos) if not nearest_lanes: return False # we can't tell anything here nl, dist = nearest_lanes[0] offset = nl.to_lane_coord(veh_pos).s nl_width, conf = nl.width_at_offset(offset) if conf > 0.5: if nl.outgoing_lanes or dist < 0.5 * nl_width + 1e-1: return False # the last lane it was in was not a dead-end, or it's still in a lane if offset < nl.length - 2 * nl_width: return False # it's no where near the end of the lane # now check its heading to ensure it was going in roughly the right direction for this lane end_vec = nl.vector_at_offset(nl.length - 0.1) end_heading = vec_to_radians(end_vec[:2]) heading_err = min_angles_difference_signed(end_heading, veh_heading) return abs(heading_err) < math.pi / 6
[docs]def default_entry_tactic(default_entry_speed: Optional[float] = None) -> EntryTactic: """The default tactic the simulation will use to acquire an actor for an agent.""" return TrapEntryTactic( start_time=MISSING, wait_to_hijack_limit_s=0, exclusion_prefixes=tuple(), zone=None, default_entry_speed=default_entry_speed, )
[docs]@dataclass(frozen=True) class Via: """Describes a collectible item that can be used to shape rewards.""" lane_id: str road_id: str lane_index: int position: Tuple[float, float] hit_distance: float required_speed: float
[docs]@dataclass(frozen=True) class VehicleSpec: """Vehicle specifications""" veh_id: str veh_config_type: str dimensions: Dimensions
[docs]@dataclass(frozen=True) class LapMission(NavigationMission): """A mission requiring a number of laps through the goal.""" num_laps: Optional[int] = None # None means infinite # of laps # If a route was specified in a sstudio.sstypes.LapMission object, # then this should be set to its road length route_length: Optional[float] = None def __post_init__(self): # TAI: consider allowing LapMissions for TraverseGoal goals (num_laps ~ num_traversals) assert self.goal.is_specific if self.route_length is None: # TAI: could just assert here, but may want to be more clever... self.route_length = 1
[docs] def is_complete(self, vehicle_state, distance_travelled: float) -> bool: """If the mission has been completed.""" return ( self.goal.is_reached(vehicle_state) and distance_travelled > self.route_length * self.num_laps )
[docs]@dataclass class PlanFrame: """Describes a plan that is serializable.""" road_ids: List[str] mission: Optional[NavigationMission]
[docs] @classmethod def empty(cls: Type[PlanFrame]): """Creates an empty plan frame.""" return cls(road_ids=[], mission=None)
[docs]class Plan: """Describes a navigation plan (route) to fulfill a mission.""" def __init__( self, road_map: RoadMap, mission: Optional[NavigationMission] = None, find_route: bool = True, ): self._road_map = road_map self._mission = mission self._route = None if find_route: self.create_route(mission) @property def route(self) -> Optional[RoadMap.Route]: """The route that this plan calls for.""" return self._route @route.setter def route(self, route: RoadMap.Route): # XXX: traffic simulator may also track route self._route = route @property def mission(self) -> Optional[NavigationMission]: """The mission this plan is meant to fulfill.""" # XXX: This currently can be `None` return self._mission @property def road_map(self) -> RoadMap: """The road map this plan is relative to.""" return self._road_map
[docs] def create_route( self, mission: NavigationMission, start_lane_radius: Optional[float] = None, end_lane_radius: Optional[float] = None, ): """Generates a route that conforms to a mission. Args: mission (Mission): A mission the agent should follow. Defaults to endless if `None`. start_lane_radius (Optional[float]): Radius (meter) to find the nearest starting lane for the given mission. Defaults to a function of `_default_lane_width` of the underlying road_map. end_lane_radius (Optional[float]): Radius (meter) to find the nearest ending lane for the given mission. Defaults to a function of `_default_lane_width` of the underlying road_map. """ assert not self._route or not len( self._route.road_ids ), "Already called create_route()." self._mission = mission or NavigationMission.random_endless_mission( self._road_map ) if not self._mission.requires_route: self._route = self._road_map.empty_route() return assert isinstance(self._mission.goal, PositionalGoal) start_lanes = self._road_map.nearest_lanes( self._mission.start.point, include_junctions=True, radius=start_lane_radius, ) if not start_lanes: self._mission = NavigationMission.endless_mission(Pose.origin()) raise PlanningError("Starting lane not found. Route must start in a lane.") via_roads = [self._road_map.road_by_id(via) for via in self._mission.route_vias] end_lanes = self._road_map.nearest_lanes( self._mission.goal.position, include_junctions=False, radius=end_lane_radius, ) assert end_lanes is not None, "No end lane found. Route must end in a lane." # When an agent is in an intersection, the `nearest_lanes` method might # not return the correct road as the first choice. Hence, nearest # starting lanes are tried in sequence until a route is found or until # all nearby starting lane options are exhausted. for end_lane, _ in end_lanes: for start_lane, _ in start_lanes: self._route = self._road_map.generate_routes( start_lane, end_lane, via_roads, 1 )[0] if self._route.road_length > 0: break if self._route.road_length > 0: break if len(self._route.roads) == 0: self._mission = NavigationMission.endless_mission(Pose.origin()) start_road_ids = [start_lane.road.road_id for start_lane, _ in start_lanes] raise PlanningError( "Unable to find a route between start={} and end={}. If either of " "these are junctions (not well supported today) please switch to " "roads and ensure there is a > 0 offset into the road if it is " "after a junction.".format(start_road_ids, end_lane.road.road_id) ) return self._mission
[docs] def frame(self) -> PlanFrame: """Get the state of this plan.""" assert self._mission return PlanFrame( road_ids=self._route.road_ids if self._route else [], mission=self._mission )
[docs] @classmethod def from_frame(cls, plan_frame: PlanFrame, road_map: RoadMap) -> "Plan": """Generate the plan from a frame.""" new_plan = cls(road_map=road_map, mission=plan_frame.mission, find_route=False) new_plan.route = road_map.route_from_road_ids(plan_frame.road_ids) return new_plan