Source code for smarts.sstudio.generators

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
.. spelling:word-list::

    idx

"""
import logging
import os
import random
import subprocess
import tempfile
from typing import Optional

import numpy as np
from yattag import Doc, indent

from smarts.core.road_map import RoadMap
from smarts.core.utils.core_math import wrap_value
from smarts.core.utils.file import make_dir_in_smarts_log_dir, replace

from . import sstypes

SECONDS_PER_HOUR_INV = 1 / 60 / 60


[docs]class InvalidRoute(Exception): """An exception given if a route cannot be successfully plotted.""" pass
[docs]class RandomRouteGenerator: """Generates a random route out of the routes available in the road map. Args: road_map: A network of routes defined for vehicles of different kinds to travel on. """ def __init__(self, road_map: RoadMap): self._log = logging.getLogger(self.__class__.__name__) self._road_map = road_map def __iter__(self): return self def __next__(self): """Provides the next random route.""" def random_lane_index(road_id: str) -> int: lanes = self._road_map.road_by_id(road_id).lanes return random.randint(0, len(lanes) - 1) def random_lane_offset(road_id: str, lane_idx: int) -> float: lane = self._road_map.road_by_id(road_id).lanes[lane_idx] return random.uniform(0, lane.length) # HACK: loop + continue is a temporary solution so we more likely return a valid # route. In future we need to be able to handle random routes that are just # a single road long. for _ in range(100): route = self._road_map.random_route(max_route_len=10) if len(route.roads) < 2: continue start_road_id = route.roads[0].road_id start_lane_index = random_lane_index(start_road_id) start_lane_offset = random_lane_offset(start_road_id, start_lane_index) end_road_id = route.roads[-1].road_id end_lane_index = random_lane_index(end_road_id) end_lane_offset = random_lane_offset(end_road_id, end_lane_index) return sstypes.Route( begin=(start_road_id, start_lane_index, start_lane_offset), via=tuple(road.road_id for road in route.roads[1:-1]), end=(end_road_id, end_lane_index, end_lane_offset), ) raise InvalidRoute( "Unable to generate a valid random route that contains \ at least two roads." )
[docs]class TrafficGenerator: """Generates traffic from scenario information.""" def __init__( self, scenario_dir: str, scenario_map_spec: Optional[sstypes.MapSpec], log_dir: Optional[str] = None, overwrite: bool = False, ): """ Args: scenario: The path to the scenario directory. scenario_map_spec: The map spec information. log_dir: Where logging information about traffic planning should be written to. overwrite: Whether to overwrite existing traffic information. """ self._log = logging.getLogger(self.__class__.__name__) self._scenario = scenario_dir self._overwrite = overwrite self._scenario_map_spec = scenario_map_spec self._road_network_path = os.path.join(self._scenario, "map.net.xml") if scenario_map_spec and scenario_map_spec.source: if os.path.isfile(scenario_map_spec.source): self._road_network_path = scenario_map_spec.source elif os.path.exists(scenario_map_spec.source): self._road_network_path = os.path.join( scenario_map_spec.source, "map.net.xml" ) self._road_network = None self._random_route_generator = None self._log_dir = self._resolve_log_dir(log_dir)
[docs] def plan_and_save( self, traffic: sstypes.Traffic, name: str, output_dir: Optional[str] = None, seed: int = 42, ): """Writes a traffic spec to a route file in the given output_dir. traffic: The traffic to write out name: The name of the traffic resource file output_dir: The output directory of the traffic file seed: The seed used for deterministic random calls """ random.seed(seed) ext = "smarts" if traffic.engine == "SMARTS" else "rou" route_path = os.path.join(output_dir, "{}.{}.xml".format(name, ext)) if os.path.exists(route_path): if self._overwrite: self._log.info( f"Routes at routes={route_path} already exist, overwriting" ) else: self._log.info(f"Routes at routes={route_path} already exist, skipping") return None if traffic.engine == "SMARTS": self._writexml(traffic, True, route_path) return route_path assert ( traffic.engine == "SUMO" ), f"Unsupported traffic engine specified: {traffic.engine}" with tempfile.TemporaryDirectory() as temp_dir: trips_path = os.path.join(temp_dir, "trips.trips.xml") self._writexml(traffic, False, trips_path) route_alt_path = os.path.join(output_dir, "{}.rou.alt.xml".format(name)) scenario_name = os.path.basename(os.path.normpath(output_dir)) log_path = f"{self._log_dir}/{scenario_name}" os.makedirs(log_path, exist_ok=True) from smarts.core.utils.sumo_utils import sumolib int32_limits = np.iinfo(np.int32) duarouter_path = sumolib.checkBinary("duarouter") subprocess.check_call( [ duarouter_path, "--unsorted-input", "true", "--net-file", self.road_network.source, "--route-files", trips_path, "--output-file", route_path, "--seed", str(wrap_value(seed, int32_limits.min, int32_limits.max)), "--ignore-errors", "false", "--no-step-log", "true", "--repair", "true", "--error-log", f"{log_path}/{name}.log", ], stderr=subprocess.DEVNULL, # suppress warnings from duarouter stdout=subprocess.DEVNULL, # suppress success messages from duarouter ) # Remove the rou.alt.xml file if os.path.exists(route_alt_path): os.remove(route_alt_path) return route_path
def _writexml( self, traffic: sstypes.Traffic, fill_in_route_gaps: bool, route_path: str ): """Writes a traffic spec into a route file. Typically this would be the source data to Sumo's DUAROUTER. """ doc = Doc() doc.asis('<?xml version="1.0" encoding="UTF-8"?>') with doc.tag( "routes", ("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance"), ("xsi:noNamespaceSchemaLocation", "http://sumo.sf.net/xsd/routes_file.xsd"), ): # Actors and routes may be declared once then reused. To prevent creating # duplicates we unique them here. actors_for_vtypes = { actor for flow in traffic.flows for actor in flow.actors.keys() } if traffic.trips: actors_for_vtypes |= {trip.actor for trip in traffic.trips} vehicle_id_set = {trip.vehicle_name for trip in traffic.trips} vehicle_ids_list = [trip.vehicle_name for trip in traffic.trips] if len(vehicle_id_set) != len(vehicle_ids_list): raise ValueError("Repeated single vehicle names is not allowed.") for actor in actors_for_vtypes: sigma = min(1, max(0, actor.imperfection.sample())) # range [0,1] min_gap = max(0, actor.min_gap.sample()) # range >= 0 doc.stag( "vType", id=actor.id, accel=actor.accel, decel=actor.decel, vClass=actor.vehicle_type, speedFactor=actor.speed.mean, speedDev=actor.speed.sigma, sigma=sigma, minGap=min_gap, maxSpeed=actor.max_speed, **actor.lane_changing_model, **actor.junction_model, ) # Make sure all routes are "resolved" (e.g. `RandomRoute` are converted to # `Route`) so that we can write them all to file. resolved_routes = {} for route in {flow.route for flow in traffic.flows}: resolved_routes[route] = self.resolve_route(route, fill_in_route_gaps) for route in set(resolved_routes.values()): doc.stag("route", id=route.id, edges=" ".join(route.roads)) # We don't de-dup flows since defining the same flow multiple times should # create multiple traffic flows. Since IDs can't be reused, we also unique # them here. for flow_idx, flow in enumerate(traffic.flows): total_weight = sum(flow.actors.values()) route = resolved_routes[flow.route] for actor_idx, (actor, weight) in enumerate(flow.actors.items()): vehs_per_hour = flow.rate * (weight / total_weight) rate_option = {} if flow.randomly_spaced: vehs_per_sec = vehs_per_hour * SECONDS_PER_HOUR_INV rate_option = dict(probability=vehs_per_sec) else: rate_option = dict(vehsPerHour=vehs_per_hour) doc.stag( "flow", # have to encode the flow.repeat_route within the vehcile id b/c # duarouter complains about any additional xml tags or attributes. id="{}-{}{}-{}-{}".format( actor.name, flow.id, "-endless" if flow.repeat_route else "", flow_idx, actor_idx, ), type=actor.id, route=route.id, departLane=route.begin[1], departPos=route.begin[2], departSpeed=actor.depart_speed, arrivalLane=route.end[1], arrivalPos=route.end[2], begin=flow.begin, end=flow.end, **rate_option, ) # write trip into xml format if traffic.trips: self.write_trip_xml(traffic, doc, fill_in_route_gaps) with open(route_path, "w") as f: f.write( indent( doc.getvalue(), indentation=" ", newline="\r\n", indent_text=True ) )
[docs] def write_trip_xml(self, traffic, doc, fill_in_gaps): """Writes a trip spec into a route file. Typically this would be the source data to SUMO's DUAROUTER. """ # Make sure all routes are "resolved" (e.g. `RandomRoute` are converted to # `Route`) so that we can write them all to file. resolved_routes = {} for route in {trip.route for trip in traffic.trips}: resolved_routes[route] = self.resolve_route(route, fill_in_gaps) for route in set(resolved_routes.values()): doc.stag("route", id=route.id + "trip", edges=" ".join(route.roads)) # We don't de-dup flows since defining the same flow multiple times should # create multiple traffic flows. Since IDs can't be reused, we also unique # them here. for trip_idx, trip in enumerate(traffic.trips): route = resolved_routes[trip.route] actor = trip.actor doc.stag( "vehicle", id="{}".format(trip.vehicle_name), type=actor.id, route=route.id + "trip", depart=trip.depart, departLane=route.begin[1], departPos=route.begin[2], departSpeed=actor.depart_speed, arrivalLane=route.end[1], arrivalPos=route.end[2], )
def _cache_road_network(self): if not self._road_network: from smarts.core.sumo_road_network import SumoRoadNetwork map_spec = sstypes.MapSpec(self._road_network_path) self._road_network = SumoRoadNetwork.from_spec(map_spec)
[docs] def resolve_edge_length(self, edge_id, lane_idx): """Determine the length of the given lane on an edge. Args: edge_id: The edge id of the road segment. lane_idx: The index of a lane. Returns: The length of the lane (same as the length of the edge.) """ self._cache_road_network() lane = self._road_network.road_by_id(edge_id).lanes[lane_idx] return lane.length
def _map_for_route(self, route) -> RoadMap: map_spec = route.map_spec if not map_spec: # XXX: Spacing is crudely "large enough" so we're less likely overlap vehicles lp_spacing = 2.0 if self._scenario_map_spec: map_spec = replace( self._scenario_map_spec, lanepoint_spacing=lp_spacing ) else: map_spec = sstypes.MapSpec( self._road_network_path, lanepoint_spacing=lp_spacing ) road_map, _ = map_spec.builder_fn(map_spec) return road_map def _fill_in_gaps(self, route: sstypes.Route) -> sstypes.Route: # TODO: do this at runtime so each vehicle on the flow can take a different variation of the route ? # TODO: or do it like SUMO and generate a huge *.rou.xml file instead ? road_map = self._map_for_route(route) start_road = road_map.road_by_id(route.begin[0]) end_road = road_map.road_by_id(route.end[0]) vias = [road_map.road_by_id(via) for via in route.via] routes = road_map.generate_routes(start_road, end_road, vias) if not routes: raise InvalidRoute( "Could not find route that starts at {route.begin[0]}, ends at {route.end[0]} and includes {route.vias}." ) return replace( route, via=tuple((road.road_id for road in routes[0].roads[1:-1])) )
[docs] def resolve_route(self, route, fill_in_gaps: bool) -> sstypes.Route: """Attempts to fill in the route between the beginning and end specified in the initial route. Args: route: An incomplete route. Returns: smarts.sstudio.sstypes.route.Route: A complete route listing all road segments it passes through. """ if not isinstance(route, sstypes.RandomRoute): return self._fill_in_gaps(route) if fill_in_gaps else route if not self._random_route_generator: road_map = self._map_for_route(route) # Lazy-load to improve performance when not using random route generation. self._random_route_generator = RandomRouteGenerator(road_map) return next(self._random_route_generator)
@property def road_network(self): """Retrieves the road network this generator is associated with.""" self._cache_road_network() return self._road_network def _resolve_log_dir(self, log_dir): if log_dir is None: log_dir = make_dir_in_smarts_log_dir("_duarouter_routing") return os.path.abspath(log_dir)