# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import logging
import warnings
from pathlib import Path
from typing import Dict
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import smarts
from envision import etypes as envision_types
from envision.client import Client as Envision
from smarts.core.local_traffic_provider import LocalTrafficProvider
from smarts.core.scenario import Scenario
from smarts.core.smarts import SMARTS
from smarts.env.utils.action_conversion import ActionOptions, ActionSpacesFormatter
from smarts.env.utils.observation_conversion import (
ObservationOptions,
ObservationSpacesFormatter,
)
from smarts.zoo.agent_spec import AgentSpec
class RLlibHiWayEnv(MultiAgentEnv):
    """This environment serves as a format to run multiple environments in
    parallel. This environment requires a specific configuration.

    Args:
        config (Dict[str,Any]): An environment configuration dictionary containing the following key value pairs.
            agent_specs: Dictionary mapping agent_ids to agent specs. Required.
            scenarios: List of scenario directories that will be run. Required.
            sim_name: A string to name this simulation. Defaults to None.
            envision_record_data_replay_path: Specifies Envision's data replay output directory. Defaults to None.
            envision_endpoint: Specifies Envision's uri. Defaults to None.
            headless: True|False envision disabled|enabled. Defaults to True.
            :spelling:ignore:`num_external_sumo_clients`: Number of SUMO clients beyond SMARTS. Defaults to 0.
            seed: Random number generation seed. Defaults to 42.
            sumo_auto_start: True|False sumo will start automatically. Defaults to True.
            sumo_headless: True|False for `sumo`|`sumo-gui`. Defaults to True.
            sumo_port: Specifies sumo port. Defaults to None.
            fixed_timestep_sec: Step length for all components of the simulation. Defaults to 0.1 .
    """

    def __init__(self, config):
        super().__init__()
        self._agent_specs: Dict[str, AgentSpec] = config["agent_specs"]
        agent_interfaces = {
            a_id: spec.interface for a_id, spec in self._agent_specs.items()
        }

        ## ---- Required environment attributes ----
        ## See ray/rllib/env/multi_agent_env.py
        self._agent_ids.update(id_ for id_ in self._agent_specs)

        action_options = ActionOptions.multi_agent
        self._action_formatter = ActionSpacesFormatter(
            agent_interfaces=agent_interfaces, action_options=action_options
        )
        self.action_space = self._action_formatter.space
        assert self.action_space is not None

        observation_options = ObservationOptions.multi_agent
        self._observations_formatter = ObservationSpacesFormatter(
            agent_interfaces=agent_interfaces, observation_options=observation_options
        )
        self.observation_space = self._observations_formatter.space
        assert self.observation_space is not None

        self._action_space_in_preferred_format = (
            self._check_if_action_space_maps_agent_id_to_sub_space()
        )
        self._obs_space_in_preferred_format = (
            self._check_if_obs_space_maps_agent_id_to_sub_space()
        )
        assert self._action_space_in_preferred_format is True
        ## ---- /Required environment attributes ----

        self._log = logging.getLogger(name=self.__class__.__name__)
        seed = int(config.get("seed", 42))

        # See https://docs.ray.io/en/latest/rllib-env.html#configuring-environments
        # for context. We combine worker_index and vector_index through the Cantor pairing
        # function (https://en.wikipedia.org/wiki/Pairing_function) into a unique integer
        # and then add that to seed to both differentiate environment instances and
        # preserve determinism.
        a = config.worker_index
        b = config.vector_index
        c = (a + b) * (a + b + 1) // 2 + b
        self._seed = seed + c
        # Seed with the already-offset value. (Seeding with `self._seed + c`
        # would apply the Cantor offset twice and make the seed actually used
        # disagree with the `self._seed` that `reset(seed=...)` re-derives from.)
        smarts.core.seed(self._seed)

        self._scenarios = [
            str(Path(scenario).resolve()) for scenario in config["scenarios"]
        ]
        self._scenarios_iterator = Scenario.scenario_variations(
            self._scenarios,
            list(self._agent_specs.keys()),
        )

        self._sim_name = config.get("sim_name", None)
        self._headless = config.get("headless", True)
        self._num_external_sumo_clients = config.get("num_external_sumo_clients", 0)
        self._sumo_headless = config.get("sumo_headless", True)
        self._sumo_port = config.get("sumo_port")
        self._sumo_auto_start = config.get("sumo_auto_start", True)
        if "endless_traffic" in config:
            self._log.warning(
                "The endless_traffic option has been moved into Scenario Studio. Please update your scenario code.",
            )

        self._envision_endpoint = config.get("envision_endpoint", None)
        self._envision_record_data_replay_path = config.get(
            "envision_record_data_replay_path", None
        )
        timestep_sec = config.get("timestep_sec")
        if timestep_sec:
            warnings.warn(
                "timestep_sec has been deprecated in favor of fixed_timestep_sec. Please update your code.",
                category=DeprecationWarning,
            )
        self._fixed_timestep_sec = (
            config.get("fixed_timestep_sec") or timestep_sec or 0.1
        )
        self._smarts = None  # Created on env.setup()
        self._dones_registered = 0

    def step(self, agent_actions):
        """Environment step"""
        agent_actions = {
            agent_id: self._agent_specs[agent_id].action_adapter(action)
            for agent_id, action in agent_actions.items()
        }
        assert isinstance(agent_actions, dict) and all(
            isinstance(key, str) for key in agent_actions.keys()
        ), "Expected Dict[str, any]"

        formatted_actions = self._action_formatter.format(agent_actions)
        env_observations, rewards, dones, extras = self._smarts.step(formatted_actions)
        env_observations = self._observations_formatter.format(env_observations)

        # Agent termination: RLlib expects that we return a "last observation"
        # on the step that an agent transitions to "done". All subsequent calls
        # to env.step(..) will no longer contain actions from the "done" agent.
        #
        # The way we implement this behavior here is to rely on the presence of
        # agent actions to filter out all environment observations/rewards/infos
        # to only agents who are actively sending in actions.
        observations = {
            agent_id: obs
            for agent_id, obs in env_observations.items()
            if agent_id in formatted_actions
        }
        rewards = {
            agent_id: reward
            for agent_id, reward in rewards.items()
            if agent_id in formatted_actions
        }
        scores = {
            agent_id: score
            for agent_id, score in extras["scores"].items()
            if agent_id in formatted_actions
        }
        infos = {
            agent_id: {
                "score": value,
                "reward": rewards[agent_id],
                "speed": observations[agent_id]["ego_vehicle_state"]["speed"],
            }
            for agent_id, value in scores.items()
        }

        # Ensure all contain the same agent_ids as keys
        assert (
            agent_actions.keys()
            == observations.keys()
            == rewards.keys()
            == infos.keys()
        )
        # Run each agent's adapters over the raw values. NOTE: the info_adapter
        # deliberately receives the pre-adapter reward (fetched before
        # reward_adapter overwrites the entry).
        for agent_id in agent_actions:
            agent_spec = self._agent_specs[agent_id]
            observation = env_observations[agent_id]
            reward = rewards[agent_id]
            info = infos[agent_id]
            observations[agent_id] = agent_spec.observation_adapter(observation)
            rewards[agent_id] = agent_spec.reward_adapter(observation, reward)
            infos[agent_id] = agent_spec.info_adapter(observation, reward, info)

        # The episode ends for everyone only once every agent has been
        # registered as done at least once.
        for done in dones.values():
            self._dones_registered += 1 if done else 0
        dones["__all__"] = self._dones_registered >= len(self._agent_specs)

        # Gymnasium 5-tuple; the same dict serves as both terminateds and
        # truncateds.
        return (
            observations,
            rewards,
            dones,
            dones,
            infos,
        )

    def reset(self, *, seed=None, options=None):
        """Environment reset."""
        if seed not in (None, 0):
            smarts.core.seed(self._seed + (seed or 0))
        scenario = next(self._scenarios_iterator)
        self._dones_registered = 0
        if self._smarts is None:
            # Lazily build the underlying simulation on first reset.
            self._smarts = self._build_smarts()
            self._smarts.setup(scenario=scenario)
        env_observations = self._smarts.reset(scenario=scenario)
        env_observations = self._observations_formatter.format(
            observations=env_observations
        )
        observations = {
            agent_id: self._agent_specs[agent_id].observation_adapter(obs)
            for agent_id, obs in env_observations.items()
        }
        info = {
            agent_id: {
                "score": 0,
                "reward": 0,
                "env_obs": agent_obs,
                "done": False,
                "map_source": self._smarts.scenario.road_map.source,
            }
            for agent_id, agent_obs in observations.items()
        }
        return observations, info

    def close(self):
        """Environment close."""
        if self._smarts is not None:
            self._smarts.destroy()

    def _build_smarts(self):
        """Construct the SMARTS simulation (with optional Envision client and
        SUMO traffic provider) from the stored configuration."""
        agent_interfaces = {
            agent_id: spec.interface for agent_id, spec in self._agent_specs.items()
        }

        envision = None
        # Envision is needed either for live visualization or for data-replay
        # recording.
        if not self._headless or self._envision_record_data_replay_path:
            envision = Envision(
                endpoint=self._envision_endpoint,
                sim_name=self._sim_name,
                output_dir=self._envision_record_data_replay_path,
                headless=self._headless,
            )
            preamble = envision_types.Preamble(scenarios=self._scenarios)
            envision.send(preamble)

        traffic_sims = []
        if Scenario.any_support_sumo_traffic(self._scenarios):
            # Imported lazily so SUMO is only required when a scenario uses it.
            from smarts.core.sumo_traffic_simulation import SumoTrafficSimulation

            sumo_traffic = SumoTrafficSimulation(
                headless=self._sumo_headless,
                time_resolution=self._fixed_timestep_sec,
                num_external_sumo_clients=self._num_external_sumo_clients,
                sumo_port=self._sumo_port,
                auto_start=self._sumo_auto_start,
            )
            traffic_sims += [sumo_traffic]
        smarts_traffic = LocalTrafficProvider()
        traffic_sims += [smarts_traffic]

        sim = SMARTS(
            agent_interfaces=agent_interfaces,
            traffic_sims=traffic_sims,
            envision=envision,
            fixed_timestep_sec=self._fixed_timestep_sec,
        )
        return sim