Source code for smarts.core.signal_provider

# Copyright (C) 2022. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from typing import Dict, Iterable, Optional, Set, Tuple

from .actor import ActorRole, ActorState
from .controllers import ActionSpaceType
from .provider import Provider, ProviderManager, ProviderRecoveryFlags, ProviderState
from .road_map import RoadMap
from .scenario import Scenario
from .signals import SignalLightState, SignalState


[docs]class SignalProvider(Provider): """A SignalProvider manages traffic light signals.""" def __init__(self): self._my_signals: Dict[str, SignalState] = dict() # start with the default recovery flags... self._recovery_flags = super().recovery_flags self._road_map = None @property def recovery_flags(self) -> ProviderRecoveryFlags: return self._recovery_flags @recovery_flags.setter def recovery_flags(self, flags: ProviderRecoveryFlags): self._recovery_flags = flags @property def actions(self) -> Set[ActionSpaceType]: # TAI: could allow for agents to hijack things like signals as well. # (Then the action space could just be the desired SignalLightState.) return set()
[docs] def set_manager(self, manager: ProviderManager): pass
@property def _provider_state(self) -> ProviderState: return ProviderState(actors=list(self._my_signals.values()))
[docs] def setup(self, scenario: Scenario) -> ProviderState: self._road_map = scenario.road_map # we only need to control map signals that won't be controlled by other providers . # XXX: we assume that if there is a history provider, it will provide the signal state, # XXX: and if sumo traffic is supported then SumoTrafficSimulation is being used and it will provide state for Sumo signals. if scenario.traffic_history is None and not scenario.supports_sumo_traffic: for feature in self._road_map.dynamic_features: if feature.type == RoadMap.FeatureType.FIXED_LOC_SIGNAL: feature_lane = feature.type_specific_info controlled_lanes = [feature_lane.lane_id] self._my_signals[feature.feature_id] = SignalState( actor_id=feature.feature_id, actor_type="signal", source=self.source_str, role=ActorRole.Signal, state=SignalLightState.UNKNOWN, stopping_pos=feature.geometry[0], controlled_lanes=controlled_lanes, last_changed=None, ) return self._provider_state
[docs] def step(self, actions, dt: float, elapsed_sim_time: float) -> ProviderState: # TODO: update state of signals we control here based on some reasonable/simple default program return self._provider_state
[docs] def sync(self, provider_state: ProviderState): for actor_state in provider_state.actors: if actor_state.actor_id in self._my_signals: assert isinstance(actor_state, SignalState) self._my_signals[actor_state.actor_id] = actor_state
[docs] def can_accept_actor(self, state: ActorState) -> bool: return isinstance(state, SignalState)
[docs] def add_actor( self, provider_actor: ActorState, from_provider: Optional[Provider] = None ): assert isinstance(provider_actor, SignalState) self._my_signals[provider_actor.actor_id] = provider_actor
[docs] def reset(self): pass
[docs] def teardown(self): self._my_signals = dict()
[docs] def manages_actor(self, actor_id: str) -> bool: return actor_id in self._my_signals
[docs] def stop_managing(self, actor_id: str): if actor_id in self._my_signals: del self._my_signals[actor_id]
@property def actor_ids(self) -> Iterable[str]: """A set of actors that this provider manages. Returns: Iterable[str]: The actors this provider manages. """ return set(vs.actor_id for vs in self._my_signals.values())