Source code for smarts.env.gymnasium.wrappers.limit_relative_target_pose

# Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import logging
from typing import Any, Dict, Tuple

import gymnasium as gym
import numpy as np

logger = logging.getLogger(__file__)
logger.setLevel(logging.WARNING)


[docs]class LimitRelativeTargetPose(gym.Wrapper): """Limits the delta-x and delta-y in the RelativeTargetPose action space.""" def __init__(self, env: gym.Env): """ Args: env (gym.Env): Environment to be wrapped. """ super().__init__(env) self._time_delta = 0.1 self._speed_max = 22.22 # Units: m/s. Equivalent to 80 km/h. self._dist_max = self._speed_max * self._time_delta
[docs] def step( self, action: Dict[str, np.ndarray] ) -> Tuple[ Dict[str, Any], Dict[str, float], Dict[str, bool], Dict[str, bool], Dict[str, Dict[str, Any]], ]: """Steps the environment. Args: action (Dict[str, np.ndarray]): Action for each agent. Returns: Tuple[ Dict[str, Any], Dict[str, float], Dict[str, bool], Dict[str, bool], Dict[str, Dict[str, Any]] ]: Observation, reward, terminated, truncated, and info, for each agent is returned. """ limited_actions: Dict[str, np.ndarray] = {} for agent_name, agent_action in action.items(): limited_actions[agent_name] = self._limit( name=agent_name, action=agent_action, ) out = self.env.step(limited_actions) return out
def _limit( self, name: str, action: np.ndarray, ) -> np.ndarray: """Limit Euclidean distance travelled in RelativeTargetPose action space. Args: name (str): Agent's name. action (np.ndarray): Agent's action. Returns: np.ndarray: Agent's RelativeTargetPose action with constrained delta-x and delta-y coordinates. """ limited_action = np.array([action[0], action[1], action[2]], dtype=np.float32) vector = action[:2] dist = np.linalg.norm(vector) if dist > self._dist_max: unit_vector = vector / dist limited_action[0], limited_action[1] = self._dist_max * unit_vector logger.warning( "Action out of bounds. `%s`: Allowed max speed=%sm/s, but got speed=%sm/s. " "Action changed from %s to %s.", name, self._speed_max, dist / self._time_delta, action, limited_action, ) return limited_action