# Source code for smarts.core.traffic_provider

# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.


from typing import Optional

from shapely.geometry import Polygon

from .provider import Provider
from .road_map import RoadMap


class TrafficProvider(Provider):
    """A Provider that controls/owns a (sub)set of vehicles.

    All vehicles handled by a TrafficProvider share the same action space.

    Every method defined here except ``route_for_vehicle`` raises
    ``NotImplementedError``; ``route_for_vehicle`` returns ``None``.
    """

    def reserve_traffic_location_for_vehicle(
        self,
        vehicle_id: str,
        reserved_location: Polygon,
    ):
        """Reserve an area where vehicles cannot spawn until a vehicle is added.

        Args:
            vehicle_id: The vehicle to wait for.
            reserved_location: The space the vehicle takes up.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def vehicle_collided(self, vehicle_id: str):
        """Handle a collision involving a vehicle this provider manages.

        Called when the given vehicle is detected to have collided with any
        other vehicles in the scenario.

        Args:
            vehicle_id: The managed vehicle that collided.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def update_route_for_vehicle(self, vehicle_id: str, new_route: RoadMap.Route):
        """Set a new route for the given vehicle.

        Args:
            vehicle_id: The vehicle whose route is replaced.
            new_route: The route to assign to the vehicle.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def vehicle_dest_road(self, vehicle_id: str) -> Optional[str]:
        """Get the final road_id in the route of the given vehicle.

        Args:
            vehicle_id: The vehicle to look up.

        Returns:
            The final road_id of the vehicle's route, or None.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def route_for_vehicle(self, vehicle_id: str) -> Optional[RoadMap.Route]:
        """Get the current Route for the specified vehicle, if known.

        Args:
            vehicle_id: The vehicle to look up.

        Returns:
            The vehicle's current route if known. This base implementation
            always returns None.
        """
        # Unlike its sibling methods, this one has a default result rather
        # than raising NotImplementedError.
        return None

    def destroy(self):
        """Clean up any connections/resources.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError