Source code for smarts.p3d.renderer

# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# to allow for typing to refer to class being defined (Renderer)
from __future__ import annotations

import importlib.resources as pkg_resources
import itertools
import logging
import math
import os
import re
import warnings
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from threading import Lock
from typing import TYPE_CHECKING, Collection, Dict, Literal, Optional, Tuple, Union

import gltf
import numpy as np
from direct.showbase.ShowBase import ShowBase

# pytype: disable=import-error
from panda3d.core import (

from smarts.core import glsl
from smarts.core.colors import Colors, SceneColors
from smarts.core.coordinates import Point, Pose
from smarts.core.masks import RenderMasks
from smarts.core.renderer_base import (
from smarts.core.scenario import Scenario as StaticScenario
from smarts.core.shader_buffer import BufferID
from smarts.core.signals import SignalState, signal_state_to_color
from smarts.core.simulation_frame import SimulationFrame
from smarts.core.utils.core_logging import suppress_output
from smarts.core.vehicle_state import VehicleState

    from smarts.core.agent_interface import AgentInterface
    from smarts.core.observations import Observation

# pytype: enable=import-error


class _ShowBaseInstance(ShowBase):
    """Wraps a singleton instance of ShowBase from Panda3D."""

    _rendering_backend: BACKEND_LITERALS = "pandagl"

    def __new__(cls):
        # Singleton pattern:  ensure only 1 ShowBase instance
        if "__it__" not in cls.__dict__:
            if cls._debug_mode <= DEBUG_MODE.INFO:
                loadPrcFileData("", "gl-debug #t")
                f"load-display {cls._rendering_backend}",
            loadPrcFileData("", "aux-display pandagl")
            loadPrcFileData("", "aux-display pandadx9")
            loadPrcFileData("", "aux-display pandadx8")
            loadPrcFileData("", "aux-display pandagles")
            loadPrcFileData("", "aux-display pandagles2")
            loadPrcFileData("", "aux-display p3headlessgl")
            loadPrcFileData("", "aux-display p3tinydisplay")

            # disable vsync otherwise we are limited to refresh-rate of screen
            loadPrcFileData("", "sync-video false")
            loadPrcFileData("", "model-path %s" % os.getcwd())
            # TODO: the following speeds up rendering a bit... might consider it.
            # loadPrcFileData("", "model-cache-dir %s/.panda3d_cache" % os.getcwd())
            loadPrcFileData("", "audio-library-name null")
            loadPrcFileData("", "gl-version 3 3")
            loadPrcFileData("", f"notify-level {}")
                "", f"default-directnotify-level {}"
            loadPrcFileData("", "print-pipe-types false")
            # loadPrcFileData("", "basic-shaders-only #t")
            # loadPrcFileData('', 'threading-model Cull/Draw')
            # have makeTextureBuffer create a visible window
            # loadPrcFileData('', 'show-buffers true')
        it = cls.__dict__.get("__it__")
        if it is None:
            cls.__it__ = it = object.__new__(cls)
        return it

    def __init__(self):
        pass  # singleton pattern, uses init() instead (don't call super().__init__() here!)

    def init(self):
        """Initializer for the purposes of maintaining a singleton of this class."""
        self._render_lock = Lock()
            # There can be only 1 ShowBase instance at a time.

            self.setBackgroundColor(0, 0, 0, 1)

            # Displayed framerate is misleading since we are not using a realtime clock

        except Exception as e:
            raise e

    def set_rendering_verbosity(cls, debug_mode: DEBUG_MODE):
        """Set rendering debug information verbosity."""
        cls._debug_mode = debug_mode
        loadPrcFileData("", f"notify-level {}")
            "", f"default-directnotify-level {}"

    def set_rendering_backend(
        rendering_backend: BACKEND_LITERALS,
        """Sets the rendering backend."""
        if "__it__" not in cls.__dict__:
            cls._rendering_backend = rendering_backend
            if cls._rendering_backend != rendering_backend:
                warnings.warn("Cannot apply rendering backend after setup.")

    def destroy(self):
        """Destroy this renderer and clean up all remaining resources."""
        self.__class__.__it__ = None

    def __del__(self):
        except (AttributeError, TypeError):

    def setup_sim_root(self, simid: str):
        """Creates the simulation root node in the scene graph."""
        root_np = NodePath(simid)
        with self._render_lock:
        with pkg_resources.path(
            glsl, "unlit_shader.vert"
        ) as vshader_path, pkg_resources.path(
            glsl, "unlit_shader.frag"
        ) as fshader_path:
            unlit_shader = Shader.load(
            root_np.setShader(unlit_shader, priority=10)
        return root_np

    def render_node(self, sim_root: NodePath):
        """Render a panda3D scene graph from the given node."""
        # Hack to prevent other SMARTS instances from also rendering
        # when we call poll() here.
        hidden = []
        with self._render_lock:
            for node_path in self.render.children:
                if node_path != sim_root and not node_path.isHidden():
            for node_path in hidden:

class _P3DCameraMixin:

    camera_np: NodePath
    buffer: GraphicsOutput
    tex: Texture

    def wait_for_ram_image(self, img_format: str, retries=100):
        """Attempt to acquire a graphics buffer."""
        # Rarely, we see dropped frames where an image is not available
        # for our observation calculations.
        # We've seen this happen fairly reliable when we are initializing
        # a multi-agent + multi-instance simulation.
        # To deal with this, we can try to force a render and block until
        # we are fairly certain we have an image in ram to return to the user
        for i in range(retries):
            if self.tex.mightHaveRamImage():
            getattr(self, "renderer").log.debug(
                f"No image available (attempt {i}/{retries}), forcing a render"
            region = self.buffer.getDisplayRegion(0)

        assert self.tex.mightHaveRamImage()
        ram_image = self.tex.getRamImageAs(img_format)
        assert ram_image is not None
        return ram_image

    def image_dimensions(self):
        """The dimensions of the output camera image."""
        return (self.tex.getXSize(), self.tex.getYSize())

    def position(self) -> Tuple[float, float, float]:
        """The position of the camera."""
        raise NotImplementedError()

    def padding(self) -> Tuple[int, int, int, int]:
        """The padding on the image. This follows the "css" convention: (top, left, bottom, right)."""
        return self.tex.getPadYSize(), self.tex.getPadXSize(), 0, 0

    def heading(self) -> float:
        """The heading of this camera."""
        return np.radians(self.camera_np.getH())

    def teardown(self):
        """Clean up internal resources."""
        region = self.buffer.getDisplayRegion(0)
        getattr(self, "renderer").remove_buffer(self.buffer)

[docs]@dataclass class P3DOffscreenCamera(_P3DCameraMixin, OffscreenCamera): """A camera used for rendering images to a graphics buffer."""
[docs] def update(self, pose: Pose, height: float, *args, **kwargs): """Update the location of the camera. Args: pose: The pose of the camera target. height: The height of the camera above the camera target. """ pos, heading = pose.as_panda3d() self.camera_np.setPos(pos[0], pos[1], height) self.camera_np.lookAt(*pos) self.camera_np.setH(heading)
@property def position(self) -> Tuple[float, float, float]: return self.camera_np.getPos()
class _BufferAccessor: _static_methods = {} _acceleration_set = { BufferID.EGO_VEHICLE_STATE_LINEAR_ACCELERATION, BufferID.EGO_VEHICLE_STATE_ANGULAR_ACCELERATION, } _jerk_set = { BufferID.EGO_VEHICLE_STATE_LINEAR_JERK, BufferID.EGO_VEHICLE_STATE_ANGULAR_JERK, } _waypoints_set = { BufferID.WAYPOINT_PATHS_POSITION, BufferID.WAYPOINT_PATHS_HEADING, BufferID.WAYPOINT_PATHS_LANE_ID, BufferID.WAYPOINT_PATHS_LANE_INDEX, BufferID.WAYPOINT_PATHS_LANE_OFFSET, BufferID.WAYPOINT_PATHS_LANE_WIDTH, BufferID.WAYPOINT_PATHS_SPEED_LIMIT, } _road_waypoints_set = { BufferID.ROAD_WAYPOINTS_POSITION, BufferID.ROAD_WAYPOINTS_HEADING, BufferID.ROAD_WAYPOINTS_LANE_ID, BufferID.ROAD_WAYPOINTS_LANE_INDEX, BufferID.ROAD_WAYPOINTS_LANE_OFFSET, BufferID.ROAD_WAYPOINTS_LANE_WIDTH, BufferID.ROAD_WAYPOINTS_LANE_INDEX, BufferID.ROAD_WAYPOINTS_SPEED_LIMIT, } _via_near_set = { BufferID.VIA_DATA_NEAR_VIA_POINTS_HIT, BufferID.VIA_DATA_NEAR_VIA_POINTS_LANE_INDEX, BufferID.VIA_DATA_NEAR_VIA_POINTS_POSITION, BufferID.VIA_DATA_NEAR_VIA_POINTS_REQUIRED_SPEED, BufferID.VIA_DATA_NEAR_VIA_POINTS_ROAD_ID, } _lidar_set = { BufferID.LIDAR_POINT_CLOUD_DIRECTION, BufferID.LIDAR_POINT_CLOUD_HITS, BufferID.LIDAR_POINT_CLOUD_ORIGIN, BufferID.LIDAR_POINT_CLOUD_POINTS, } _signals_set = { # BufferID.SIGNALS_CONTROLLED_LANES, BufferID.SIGNALS_LAST_CHANGED, BufferID.SIGNALS_LIGHT_STATE, BufferID.SIGNALS_STOP_POINT, } def __init__(self) -> None: self._memos = {} def should_get_data(self, buffer_id: BufferID, observation: Observation): """If the buffer can and should get data from the observation.""" if ( buffer_id in self._acceleration_set and observation.ego_vehicle_state.linear_acceleration is None ): return False elif ( buffer_id in self._jerk_set and observation.ego_vehicle_state.linear_jerk is None ): return False elif buffer_id in self._waypoints_set and ( observation.waypoint_paths is None or len(observation.waypoint_paths) == 0 ): return False elif buffer_id in self._road_waypoints_set and ( observation.road_waypoints is None or len(observation.road_waypoints.lanes) == 0 ): return False elif ( buffer_id in self._via_near_set and len(observation.via_data.near_via_points) == 0 ): return False elif buffer_id in self._signals_set and ( observation.signals is None or len(observation.signals) > 0 ): return False elif buffer_id in self._lidar_set and (observation.lidar_point_cloud is None): return False return True def get_data_for_buffer(self, buffer_id: BufferID, observation: Observation): """Retrieve the data buffer from the observation.""" if len(self._static_methods) == 0: self._gen_methods_for_buffer() return self._static_methods.get(buffer_id, lambda o, m: None)( observation, self._get_memo_for_buffer(buffer_id) ) def _get_memo_for_buffer(self, buffer_id: BufferID): if buffer_id in self._waypoints_set: return self._waypoint_paths_flattened elif buffer_id in self._road_waypoints_set: return self._road_waypoints_flattened return None @lru_cache(maxsize=1) def _road_waypoints_flattened(self, o: Observation): return [ wp for wp in itertools.chain( *(wpl for wpl in itertools.chain(*o.road_waypoints.lanes.values())) ) ] @lru_cache(maxsize=1) def _waypoint_paths_flattened(self, o: Observation): return [wp for wp in itertools.chain(*o.waypoint_paths)] @classmethod def _gen_methods_for_buffer(cls): cls._static_methods[BufferID.DELTA_TIME] = lambda o, m: o.dt cls._static_methods[BufferID.ELAPSED_SIM_TIME] = lambda o, m: o.elapsed_sim_time cls._static_methods[BufferID.EGO_VEHICLE_STATE_HEADING] = lambda o, m: float( o.ego_vehicle_state.heading ) cls._static_methods[ BufferID.EGO_VEHICLE_STATE_SPEED ] = lambda o, m: o.ego_vehicle_state.speed cls._static_methods[ BufferID.EGO_VEHICLE_STATE_YAW_RATE ] = lambda o, m: o.ego_vehicle_state.yaw_rate cls._static_methods[ BufferID.EGO_VEHICLE_STATE_STEERING ] = lambda o, m: o.ego_vehicle_state.steering cls._static_methods[ BufferID.EGO_VEHICLE_STATE_LANE_INDEX ] = lambda o, m: o.ego_vehicle_state.yaw_rate cls._static_methods[ BufferID.DISTANCE_TRAVELLED ] = lambda o, m: o.distance_travelled cls._static_methods[BufferID.STEP_COUNT] = lambda o, m: o.step_count cls._static_methods[BufferID.STEPS_COMPLETED] = lambda o, m: o.steps_completed cls._static_methods[BufferID.VEHICLE_TYPE] = ( lambda o, m: hash(o.ego_vehicle_state.mission.vehicle_spec.veh_config_type) if o.ego_vehicle_state.mission.vehicle_spec else -1 ) cls._static_methods[BufferID.EVENTS_COLLISIONS] = lambda o, m: len( ) cls._static_methods[BufferID.EVENTS_OFF_ROAD] = lambda o, m: int( ) cls._static_methods[BufferID.EVENTS_OFF_ROUTE] = lambda o, m: int( ) cls._static_methods[BufferID.EVENTS_ON_SHOULDER] = lambda o, m: int( ) cls._static_methods[BufferID.EVENTS_WRONG_WAY] = lambda o, m: int( ) cls._static_methods[BufferID.EVENTS_NOT_MOVING] = lambda o, m: int( ) cls._static_methods[BufferID.EVENTS_REACHED_GOAL] = lambda o, m: int( ) cls._static_methods[ BufferID.EVENTS_REACHED_MAX_EPISODE_STEPS ] = lambda o, m: int( cls._static_methods[BufferID.EVENTS_AGENTS_ALIVE_DONE] = lambda o, m: int( ) cls._static_methods[BufferID.EVENTS_INTEREST_DONE] = lambda o, m: int( ) cls._static_methods[BufferID.UNDER_THIS_VEHICLE_CONTROL] = lambda o, m: int( o.under_this_agent_control ) cls._static_methods[ BufferID.EGO_VEHICLE_STATE_POSITION ] = lambda o, m: o.ego_vehicle_state.position cls._static_methods[ BufferID.EGO_VEHICLE_STATE_BOUNDING_BOX ] = lambda o, m: o.ego_vehicle_state.bounding_box.as_lwh cls._static_methods[ BufferID.EGO_VEHICLE_STATE_LANE_POSITION ] = lambda o, m: tuple(o.ego_vehicle_state.lane_position) cls._static_methods[ BufferID.EGO_VEHICLE_STATE_LINEAR_VELOCITY ] = lambda o, m: o.ego_vehicle_state.linear_velocity cls._static_methods[ BufferID.EGO_VEHICLE_STATE_ANGULAR_VELOCITY ] = lambda o, m: o.ego_vehicle_state.angular_velocity cls._static_methods[ BufferID.EGO_VEHICLE_STATE_LINEAR_ACCELERATION ] = lambda o, m: o.ego_vehicle_state.linear_acceleration cls._static_methods[ BufferID.EGO_VEHICLE_STATE_ANGULAR_ACCELERATION ] = lambda o, m: o.ego_vehicle_state.angular_acceleration cls._static_methods[ BufferID.EGO_VEHICLE_STATE_LINEAR_JERK ] = lambda o, m: o.ego_vehicle_state.linear_jerk cls._static_methods[ BufferID.EGO_VEHICLE_STATE_ANGULAR_JERK ] = lambda o, m: o.ego_vehicle_state.angular_jerk cls._static_methods[BufferID.EGO_VEHICLE_STATE_ROAD_ID] = lambda o, m: hash( o.ego_vehicle_state.road_id ) cls._static_methods[BufferID.EGO_VEHICLE_STATE_LANE_ID] = lambda o, m: hash( o.ego_vehicle_state.lane_id ) # XXX: Float cast is sometimes needed because Panda3D reacts badly to non-standard types like numpy float64. cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_POSITION ] = lambda o, m: [vs.position for vs in o.neighborhood_vehicle_states] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_BOUNDING_BOX ] = lambda o, m: [ vs.bounding_box.as_lwh for vs in o.neighborhood_vehicle_states ] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_LANE_POSITION ] = lambda o, m: [ tuple(float(v) for v in vs.lane_position) if vs.lane_position is not None else (-1.0, -1.0, -1.0) for vs in o.neighborhood_vehicle_states ] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_HEADING ] = lambda o, m: [float(vs.heading) for vs in o.neighborhood_vehicle_states] cls._static_methods[BufferID.NEIGHBORHOOD_VEHICLE_STATES_SPEED] = lambda o, m: [ vs.speed for vs in o.neighborhood_vehicle_states ] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_ROAD_ID ] = lambda o, m: [hash(vs.road_id) for vs in o.neighborhood_vehicle_states] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_LANE_ID ] = lambda o, m: [hash(vs.lane_id) for vs in o.neighborhood_vehicle_states] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_LANE_INDEX ] = lambda o, m: [vs.lane_index for vs in o.neighborhood_vehicle_states] cls._static_methods[ BufferID.NEIGHBORHOOD_VEHICLE_STATES_INTEREST ] = lambda o, m: [int(vs.interest) for vs in o.neighborhood_vehicle_states] cls._static_methods[BufferID.WAYPOINT_PATHS_POSITION] = lambda o, m: [ wp.position.tolist() for wp in m(o) ] cls._static_methods[BufferID.WAYPOINT_PATHS_HEADING] = lambda o, m: [ float(wp.heading) for wp in m(o) ] cls._static_methods[BufferID.WAYPOINT_PATHS_LANE_WIDTH] = lambda o, m: [ wp.lane_width for wp in m(o) ] cls._static_methods[BufferID.WAYPOINT_PATHS_SPEED_LIMIT] = lambda o, m: [ wp.speed_limit for wp in m(o) ] cls._static_methods[BufferID.WAYPOINT_PATHS_LANE_OFFSET] = lambda o, m: [ wp.lane_offset for wp in m(o) ] cls._static_methods[BufferID.WAYPOINT_PATHS_LANE_ID] = lambda o, m: [ hash(wp.lane_id) for wp in m(o) ] cls._static_methods[BufferID.WAYPOINT_PATHS_LANE_INDEX] = lambda o, m: [ wp.lane_index for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_POSITION] = lambda o, m: [ wp.position.tolist() for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_HEADING] = lambda o, m: [ float(wp.heading) for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_LANE_WIDTH] = lambda o, m: [ wp.lane_width for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_SPEED_LIMIT] = lambda o, m: [ wp.speed_limit for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_LANE_OFFSET] = lambda o, m: [ wp.lane_offset for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_LANE_ID] = lambda o, m: [ hash(wp.lane_id) for wp in m(o) ] cls._static_methods[BufferID.ROAD_WAYPOINTS_LANE_INDEX] = lambda o, m: [ wp.lane_index for wp in m(o) ] cls._static_methods[BufferID.VIA_DATA_NEAR_VIA_POINTS_POSITION] = lambda o, m: [ via.position for via in o.via_data.near_via_points ] cls._static_methods[ BufferID.VIA_DATA_NEAR_VIA_POINTS_LANE_INDEX ] = lambda o, m: [via.lane_index for via in o.via_data.near_via_points] cls._static_methods[BufferID.VIA_DATA_NEAR_VIA_POINTS_ROAD_ID] = lambda o, m: [ hash(via.road_id) for via in o.via_data.near_via_points ] cls._static_methods[BufferID.VIA_DATA_NEAR_VIA_POINTS_HIT] = lambda o, m: [ int(via.hit) for via in o.via_data.near_via_points ] cls._static_methods[ BufferID.VIA_DATA_NEAR_VIA_POINTS_REQUIRED_SPEED ] = lambda o, m: [via.required_speed for via in o.via_data.near_via_points] cls._static_methods[BufferID.LIDAR_POINT_CLOUD_POINTS] = lambda o, m: [ l.tolist() for l in o.lidar_point_cloud[0] ] cls._static_methods[BufferID.LIDAR_POINT_CLOUD_HITS] = lambda o, m: [ int(h) for h in o.lidar_point_cloud[1] ] cls._static_methods[BufferID.LIDAR_POINT_CLOUD_ORIGIN] = lambda o, m: [ o.tolist() for o, _ in o.lidar_point_cloud[2] ] cls._static_methods[BufferID.LIDAR_POINT_CLOUD_DIRECTION] = lambda o, m: [ d.tolist() for _, d in o.lidar_point_cloud[2] ] cls._static_methods[BufferID.SIGNALS_LIGHT_STATE] = lambda o, m: [ int(l.state) for l in o.signals ] cls._static_methods[BufferID.SIGNALS_STOP_POINT] = lambda o, m: [ tuple(l.stop_point) for l in o.signals ] cls._static_methods[BufferID.SIGNALS_LAST_CHANGED] = lambda o, m: [ l.last_changed for l in o.signals ]
[docs]@dataclass class P3DShaderStep(_P3DCameraMixin, ShaderStep): """A camera used for rendering images using a shader and a full-screen quad.""" fullscreen_quad_node: NodePath
[docs] def update( self, pose: Optional[Pose] = None, height: Optional[float] = None, observation: Optional[Observation] = None, **kwargs, ): """Update the location of the shader directional values. Args: pose: The pose of the camera target. height: The height of the camera above the camera target. """ inputs = {} if pose is not None: self.fullscreen_quad_node.setShaderInputs( iHeading=pose.heading, iTranslation=(pose.point.x, pose.point.y), ) inputs["iHeading"] = pose.heading inputs["iTranslation"] = (pose.point.x, pose.point.y) if height is not None: inputs["iElevation"] = height if len(self.buffer_dependencies) == 0: return buffers = set(self.buffer_dependencies) if observation is not None: ba = _BufferAccessor() for b in buffers: if ba.should_get_data(b.buffer_id, observation): inputs[b.script_uniform_name] = ba.get_data_for_buffer( b.buffer_id, observation ) if inputs: self.fullscreen_quad_node.setShaderInputs(**inputs)
@property def position(self) -> Tuple[float, float, float]: raise ValueError("Shader step does not have a position.")
[docs]class Renderer(RendererBase): """The utility used to render simulation geometry.""" def __init__( self, simid: str, debug_mode: DEBUG_MODE = DEBUG_MODE.ERROR, rendering_backend: BACKEND_LITERALS = "pandagl", ): self._log: logging.Logger = logging.getLogger(self.__class__.__name__) self._is_setup = False self._simid = simid self._root_np = None self._vehicles_np = None self._signals_np = None self._road_map_np = None self._dashed_lines_np = None self._vehicle_nodes = {} self._signal_nodes = {} self._camera_nodes: Dict[str, Union[P3DOffscreenCamera, P3DShaderStep]] = {} _ShowBaseInstance.set_rendering_verbosity(debug_mode=debug_mode) _ShowBaseInstance.set_rendering_backend(rendering_backend=rendering_backend) # Note: Each instance of the SMARTS simulation will have its own Renderer, # but all Renderer objects share the same ShowBaseInstance. self._showbase_instance: _ShowBaseInstance = _ShowBaseInstance() self._interest_filter: Optional[re.Pattern] = None self._interest_color: Optional[Union[Colors, SceneColors]] = None @property def id(self): """The id of the simulation rendered.""" return self._simid @property def is_setup(self) -> bool: """If the renderer has been fully initialized.""" return self._is_setup @property def log(self) -> logging.Logger: """The rendering logger.""" return self._log
[docs] def remove_buffer(self, buffer): """Remove the rendering buffer.""" self._showbase_instance.graphicsEngine.removeWindow(buffer)
def _load_line_data(self, path: Path, name: str) -> GeomNode: # Extract geometry from model lines = [] road_lines_np = self._showbase_instance.loader.loadModel(path, noCache=True) geomNodeCollection = road_lines_np.findAllMatches("**/+GeomNode") for nodePath in geomNodeCollection: geomNode = nodePath.node() geom = geomNode.getGeom(0) vdata = geom.getVertexData() vreader = GeomVertexReader(vdata, "vertex") pts = [] while not vreader.isAtEnd(): v = vreader.getData3() pts.append((v.x, v.y, v.z)) lines.append(pts) # Create geometry node geo_format = GeomVertexFormat.getV3() vdata = GeomVertexData(name, geo_format, Geom.UHStatic) vertex = GeomVertexWriter(vdata, "vertex") prim = GeomLinestrips(Geom.UHStatic) for pts in lines: for x, y, z in pts: vertex.addData3(x, y, z) prim.add_next_vertices(len(pts)) assert prim.closePrimitive() geom = Geom(vdata) geom.addPrimitive(prim) node_path = GeomNode(name) node_path.addGeom(geom) return node_path def _ensure_root(self): if self._root_np is None: self._root_np = self._showbase_instance.setup_sim_root(self._simid) # if self._log.getEffectiveLevel() <= logging.DEBUG: # self._log.debug( print( "Renderer started with backend %s", self._showbase_instance.pipe.get_type(), )
[docs] def load_road_map(self, map_path: Union[str, Path]): """Load the road map from its path.""" # Load map self._ensure_root() if self._road_map_np: self._log.debug( "road_map=%s already exists. Removing and adding a new " "one from glb_path=%s", self._road_map_np, map_path, ) with suppress_output(): map_np = self._showbase_instance.loader.loadModel(map_path, noCache=True) node_path = self._root_np.attachNewNode("road_map") map_np.reparent_to(node_path) node_path.hide(RenderMasks.OCCUPANCY_HIDE) node_path.setColor(SceneColors.Road.value) self._road_map_np = node_path self._is_setup = True return map_np.getBounds()
[docs] def setup(self, scenario: StaticScenario): """Initialize this renderer.""" self._ensure_root() self._vehicles_np = self._root_np.attachNewNode("vehicles") self._signals_np = self._root_np.attachNewNode("signals") map_path = scenario.map_glb_filepath map_dir = Path(map_path).parent # Load map self.load_road_map(map_path) # Road lines (solid, yellow) road_lines_path = map_dir / "road_lines.glb" if road_lines_path.exists(): road_lines_np = self._load_line_data(road_lines_path, "road_lines") solid_lines_np = self._root_np.attachNewNode(road_lines_np) solid_lines_np.setColor(SceneColors.EdgeDivider.value) solid_lines_np.hide(RenderMasks.OCCUPANCY_HIDE) solid_lines_np.setRenderModeThickness(2) # Lane lines (dashed, white) lane_lines_path = map_dir / "lane_lines.glb" if lane_lines_path.exists(): lane_lines_np = self._load_line_data(lane_lines_path, "lane_lines") dashed_lines_np = self._root_np.attachNewNode(lane_lines_np) dashed_lines_np.setColor(SceneColors.LaneDivider.value) dashed_lines_np.hide(RenderMasks.OCCUPANCY_HIDE) dashed_lines_np.setRenderModeThickness(2) with pkg_resources.path( glsl, "dashed_line_shader.vert" ) as vshader_path, pkg_resources.path( glsl, "dashed_line_shader.frag" ) as fshader_path: dashed_line_shader = Shader.load( Shader.SL_GLSL, vertex=str(vshader_path.absolute()), fragment=str(fshader_path.absolute()), ) dashed_lines_np.setShader(dashed_line_shader, priority=20) dashed_lines_np.setShaderInput( "iResolution", self._showbase_instance.getSize() ) self._dashed_lines_np = dashed_lines_np if scenario_metadata := scenario.metadata: if interest_pattern := scenario_metadata.get( "actor_of_interest_re_filter", None ): self._interest_filter = re.compile(interest_pattern) self._interest_color = scenario_metadata.get( "actor_of_interest_color", SceneColors.SocialAgent ) self._is_setup = True
[docs] def render(self): """Render the scene graph of the simulation.""" if not self._is_setup: self._ensure_root() warnings.warn( "Renderer is not setup. Rendering before scene setup may be unintentional.", RendererNotSetUpWarning, ) self._showbase_instance.render_node(self._root_np)
[docs] def reset(self): """Reset the render back to initialized state.""" if self._vehicles_np is not None: self._vehicles_np.removeNode() self._vehicles_np = self._root_np.attachNewNode("vehicles") if self._signals_np is not None: self._signals_np.removeNode() self._signals_np = self._root_np.attachNewNode("signals") self._vehicle_nodes = {} self._signal_nodes = {}
[docs] def step(self): """provided for non-SMARTS uses; normally not used by SMARTS.""" self._showbase_instance.taskMgr.step()
[docs] def sync(self, sim_frame: SimulationFrame): """Update the current state of the vehicles and signals within the renderer.""" signal_ids = set() for actor_id, actor_state in sim_frame.actor_states_by_id.items(): if isinstance(actor_state, VehicleState): self.update_vehicle_node(actor_id, actor_state.pose) elif isinstance(actor_state, SignalState): signal_ids.add(actor_id) color = signal_state_to_color(actor_state.state) if actor_id not in self._signal_nodes: self.create_signal_node(actor_id, actor_state.stopping_pos, color) self.begin_rendering_signal(actor_id) else: self.update_signal_node(actor_id, actor_state.stopping_pos, color) missing_vehicle_ids = set(self._vehicle_nodes) - set(sim_frame.vehicle_ids) missing_signal_ids = set(self._signal_nodes) - signal_ids for vid in missing_vehicle_ids: self.remove_vehicle_node(vid) for sig_id in missing_signal_ids: self.remove_signal_node(sig_id)
[docs] def teardown(self): """Clean up internal resources.""" if self._root_np is not None: self._root_np.clearLight() self._root_np.removeNode() self._root_np = None self._vehicles_np = None for sig_id in list(self._signal_nodes): self.remove_signal_node(sig_id) self._signals_np = None self._road_map_np = None self._dashed_lines_np = None self._is_setup = False
[docs] def destroy(self): """Destroy the renderer. Cleans up all remaining renderer resources.""" self.teardown() self._showbase_instance = None
def __del__(self): self.destroy()
[docs] def set_interest(self, interest_filter: re.Pattern, interest_color: Colors): """Sets the color of all vehicles that have ids that match the given pattern. Args: interest_filter (re.Pattern): The regular expression pattern to match. interest_color (Colors): The color that the vehicle should show as. """ assert isinstance(interest_filter, re.Pattern) self._interest_filter = interest_filter self._interest_color = interest_color
[docs] def create_vehicle_node( self, glb_model: Union[str, Path], vid: str, color: Union[Colors, SceneColors], pose: Pose, ): """Create a vehicle node.""" if vid in self._vehicle_nodes: return False node_path = self._showbase_instance.loader.loadModel(glb_model) node_path.setName("vehicle-%s" % vid) if ( self._interest_filter is not None and self._interest_color is not None and self._interest_filter.match(vid) ): node_path.setColor(self._interest_color.value) else: node_path.setColor(color.value) pos, heading = pose.as_panda3d() node_path.setPosHpr(*pos, heading, 0, 0) node_path.hide(RenderMasks.DRIVABLE_AREA_HIDE) if color in (SceneColors.Agent,): node_path.hide(RenderMasks.OCCUPANCY_HIDE) self._vehicle_nodes[vid] = node_path return True
[docs] def begin_rendering_vehicle(self, vid: str, is_agent: bool): """Add the vehicle node to the scene graph""" vehicle_path = self._vehicle_nodes.get(vid, None) if not vehicle_path: self._log.warning("Renderer ignoring invalid vehicle id: %s", vid) return vehicle_path.reparentTo(self._vehicles_np)
[docs] def update_vehicle_node(self, vid: str, pose: Pose): """Move the specified vehicle node.""" vehicle_path = self._vehicle_nodes.get(vid, None) if not vehicle_path: self._log.warning("Renderer ignoring invalid vehicle id: %s", vid) return pos, heading = pose.as_panda3d() vehicle_path.setPosHpr(*pos, heading, 0, 0)
[docs] def remove_vehicle_node(self, vid: str): """Remove a vehicle node""" vehicle_path = self._vehicle_nodes.get(vid, None) if not vehicle_path: self._log.warning("Renderer ignoring invalid vehicle id: %s", vid) return vehicle_path.removeNode() del self._vehicle_nodes[vid]
[docs] def create_signal_node( self, sig_id: str, position: Point, color: Union[Colors, SceneColors] ): """Create a signal node.""" if sig_id in self._signal_nodes: return False # Create geometry node name = f"signal-{sig_id}" geo_format = GeomVertexFormat.getV3() vdata = GeomVertexData(name, geo_format, Geom.UHStatic) vertex = GeomVertexWriter(vdata, "vertex") num_pts = 10 # number of points around the circumference seg_radians = 2 * math.pi / num_pts vertex.addData3(0, 0, 0) for i in range(num_pts): angle = i * seg_radians x = math.cos(angle) y = math.sin(angle) vertex.addData3(x, y, 0) prim = GeomTrifans(Geom.UHStatic) prim.addVertex(0) # add center point prim.add_next_vertices(num_pts) # add outer points prim.addVertex(1) # add first outer point again to complete the circle assert prim.closePrimitive() geom = Geom(vdata) geom.addPrimitive(prim) geom_node = GeomNode(name) geom_node.addGeom(geom) node_path = self._root_np.attachNewNode(geom_node) node_path.setName(name) node_path.setColor(color.value) node_path.setPos(position.x, position.y, 0.01) node_path.setScale(0.9, 0.9, 1) node_path.hide(RenderMasks.DRIVABLE_AREA_HIDE) self._signal_nodes[sig_id] = node_path return True
[docs] def begin_rendering_signal(self, sig_id: str): """Add the signal node to the scene graph""" signal_np = self._signal_nodes.get(sig_id, None) if not signal_np: self._log.warning("Renderer ignoring invalid signal id: %s", sig_id) return signal_np.reparentTo(self._signals_np)
[docs] def update_signal_node( self, sig_id: str, position: Point, color: Union[Colors, SceneColors] ): """Move the specified signal node.""" signal_np = self._signal_nodes.get(sig_id, None) if not signal_np: self._log.warning("Renderer ignoring invalid signal id: %s", sig_id) return signal_np.setPos(position.x, position.y, 0.01) signal_np.setColor(color.value)
[docs] def remove_signal_node(self, sig_id: str): """Remove a signal node""" signal_np = self._signal_nodes.get(sig_id, None) if not signal_np: self._log.warning("Renderer ignoring invalid signal id: %s", sig_id) return signal_np.removeNode() del self._signal_nodes[sig_id]
[docs] def camera_for_id(self, camera_id: str) -> Union[P3DOffscreenCamera, P3DShaderStep]: """Get a camera by its id.""" camera = self._camera_nodes.get(camera_id) assert ( camera is not None ), f"Camera {camera_id} does not exist, have you created this camera?" return camera
[docs] def build_offscreen_camera( self, name: str, mask: int, width: int, height: int, resolution: float, ) -> None: """Generates a new off-screen camera.""" # setup buffer win_props = WindowProperties.size(width, height) fb_props = FrameBufferProperties() fb_props.setRgbColor(True) fb_props.setRgbaBits(8, 8, 8, 1) # XXX: Though we don't need the depth buffer returned, setting this to 0 # causes undefined behavior where the ordering of meshes is random. fb_props.setDepthBits(8) buffer = self._showbase_instance.pipe, "{}-buffer".format(name), -100, fb_props, win_props, GraphicsPipe.BFRefuseWindow,,, ) buffer.setClearColor((0, 0, 0, 0)) # Set background color to black # Necessary for the lane lines to be in the proper proportions if self._dashed_lines_np is not None: self._dashed_lines_np.setShaderInput( "iResolution", (buffer.size.x, buffer.size.y) ) # setup texture tex = Texture() region = buffer.getDisplayRegion(0) region.window.addRenderTexture( tex, GraphicsOutput.RTM_copy_ram, GraphicsOutput.RTP_color ) # setup camera lens = OrthographicLens() lens.setFilmSize(width * resolution, height * resolution) camera_np = self._showbase_instance.makeCamera( buffer, camName=name, scene=self._root_np, lens=lens ) camera_np.reparentTo(self._root_np) # mask is set to make undesirable objects invisible to this camera camera_np.node().setCameraMask(camera_np.node().getCameraMask() & mask) camera = P3DOffscreenCamera(self, camera_np, buffer, tex) self._camera_nodes[name] = camera
[docs] def build_shader_step( self, name: str, fshader_path: Union[str, Path], dependencies: Collection[ ShaderStepDependencyBase # Union[ShaderStepCameraDependency, ShaderStepVariableDependency, ShaderStepBufferDependency] ], priority: int, height: int, width: int, ) -> None: # setup buffer win_props = WindowProperties.size(width, height) fb_props = FrameBufferProperties() fb_props.setRgbColor(True) fb_props.setRgbaBits(8, 8, 8, 0) # XXX: Though we don't need the depth buffer returned, setting this to 0 # causes undefined behavior where the ordering of meshes is random. fb_props.setDepthBits(0) buffer = self._showbase_instance.pipe, "{}-buffer".format(name), priority, fb_props, win_props, GraphicsPipe.BFRefuseWindow,,, ) cm = CardMaker("filter-stage-quad") cm.setFrameFullscreenQuad() quad = NodePath(cm.generate()) quad.setDepthTest(0) quad.setDepthWrite(0) quad.setColor(1, 0.5, 0.5, 1) # setup texture tex = Texture() # tex.setup_2d_texture(width, height, Texture.T_unsigned_byte, Texture.F_r8i) region = buffer.getDisplayRegion(0) region.window.addRenderTexture( tex, GraphicsOutput.RTM_copy_ram, GraphicsOutput.RTP_color ) # setup camera lens = OrthographicLens() lens.setFilmSize(width, height) lens.setFilmSize(2, 2) lens.setFilmOffset(0, 0) lens.setNearFar(-1000, 1000) quadcamnode = Camera(name) quadcamnode.setLens(lens) quadcam: NodePath = quad.attachNewNode(quadcamnode) dr = buffer.makeDisplayRegion((0, 1, 0, 1)) dr.disableClears() dr.setCamera(quadcam) dr.setActive(True) dr.setScissorEnabled(False) # buffer clearing buffer.setClearColor((0, 0, 0, 0)) # Set background color to black buffer.setClearColorActive(True) assert tex.getExpectedRamImageSize() == tex.getXSize() * tex.getYSize() * 3 with pkg_resources.path(glsl, "unlit_shader.vert") as vshader_path: quad.setShader( Shader.load(Shader.SL_GLSL, vertex=vshader_path, fragment=fshader_path) ) camera_dependencies = tuple( c for c in dependencies if isinstance(c, ShaderStepCameraDependency) ) cameras = tuple( self.camera_for_id(c.camera_id) for c in camera_dependencies if isinstance(c, ShaderStepCameraDependency) ) for dep, dep_cam in zip(camera_dependencies, cameras): quad.setShaderInput(dep.script_variable_name, dep_cam.tex) size_x, size_y = dep_cam.image_dimensions quad.setShaderInput( f"{dep.script_variable_name}Resolution", n1=size_x, n2=size_y ) buffer_dependencies = tuple( v for v in dependencies if isinstance(v, ShaderStepBufferDependency) ) Renderer._set_shader_inputs( quad, width, height, buffers=buffer_dependencies ) variable_dependencies = tuple( v for v in dependencies if isinstance(v, ShaderStepVariableDependency) ) for dep in variable_dependencies: shader_input = ShaderInput(dep.script_variable_name, dep.value) quad.setShaderInput(shader_input) camera = P3DShaderStep( self, shader_file=fshader_path, camera_dependencies=cameras, buffer_dependencies=buffer_dependencies, camera_np=quadcam, buffer=buffer, tex=tex, fullscreen_quad_node=quad, ) self._camera_nodes[name] = camera
@staticmethod def _set_shader_inputs( surface, width, height, buffers: Tuple[ShaderStepBufferDependency] ): inputs = { "iResolution": (width, height), "iTranslation": (0.0, 0.0), "iHeading": 0.0, "iElevation": 0.0, } for svn, bn in ((b.script_uniform_name, b.buffer_id) for b in buffers): initial_value = None # SINGLE VALUES if bn in ( BufferID.DELTA_TIME, BufferID.ELAPSED_SIM_TIME, BufferID.EGO_VEHICLE_STATE_HEADING, BufferID.EGO_VEHICLE_STATE_SPEED, BufferID.EGO_VEHICLE_STATE_STEERING, BufferID.EGO_VEHICLE_STATE_YAW_RATE, BufferID.DISTANCE_TRAVELLED, ): initial_value = float() elif bn in ( BufferID.STEP_COUNT, BufferID.STEPS_COMPLETED, BufferID.VEHICLE_TYPE, ): initial_value = int() elif bn in ( BufferID.EVENTS_COLLISIONS, BufferID.EVENTS_OFF_ROAD, BufferID.EVENTS_OFF_ROUTE, BufferID.EVENTS_ON_SHOULDER, BufferID.EVENTS_WRONG_WAY, BufferID.EVENTS_NOT_MOVING, BufferID.EVENTS_REACHED_GOAL, BufferID.EVENTS_REACHED_MAX_EPISODE_STEPS, BufferID.EVENTS_AGENTS_ALIVE_DONE, BufferID.EVENTS_INTEREST_DONE, BufferID.UNDER_THIS_VEHICLE_CONTROL, ): initial_value = bool() elif bn in ( BufferID.EGO_VEHICLE_STATE_POSITION, BufferID.EGO_VEHICLE_STATE_BOUNDING_BOX, BufferID.EGO_VEHICLE_STATE_LANE_POSITION, ): initial_value = (float(), float(), float()) elif bn in ( BufferID.EGO_VEHICLE_STATE_LINEAR_VELOCITY, BufferID.EGO_VEHICLE_STATE_ANGULAR_VELOCITY, BufferID.EGO_VEHICLE_STATE_LINEAR_ACCELERATION, BufferID.EGO_VEHICLE_STATE_ANGULAR_ACCELERATION, BufferID.EGO_VEHICLE_STATE_LINEAR_JERK, BufferID.EGO_VEHICLE_STATE_ANGULAR_JERK, ): initial_value = (float(), float()) elif bn in ( BufferID.EGO_VEHICLE_STATE_ROAD_ID, BufferID.EGO_VEHICLE_STATE_LANE_ID, BufferID.EGO_VEHICLE_STATE_LANE_INDEX, ): initial_value = int() # Vector of NEIGHBORHOOD_VEHICLE_STATES elif bn in ( BufferID.NEIGHBORHOOD_VEHICLE_STATES_POSITION, BufferID.NEIGHBORHOOD_VEHICLE_STATES_BOUNDING_BOX, BufferID.NEIGHBORHOOD_VEHICLE_STATES_LANE_POSITION, ): initial_value = [ (float(), float(), float()), ] * 20 elif bn in ( BufferID.NEIGHBORHOOD_VEHICLE_STATES_HEADING, BufferID.NEIGHBORHOOD_VEHICLE_STATES_SPEED, ): initial_value = [ float(), ] elif bn in ( BufferID.NEIGHBORHOOD_VEHICLE_STATES_ROAD_ID, BufferID.NEIGHBORHOOD_VEHICLE_STATES_LANE_ID, BufferID.NEIGHBORHOOD_VEHICLE_STATES_LANE_INDEX, BufferID.NEIGHBORHOOD_VEHICLE_STATES_INTEREST, ): initial_value = [ int(), ] # Vector of waypoints from WAYPOINT_PATHS elif bn in (BufferID.WAYPOINT_PATHS_POSITION,): initial_value = [(float(), float())] elif bn in ( BufferID.WAYPOINT_PATHS_HEADING, BufferID.WAYPOINT_PATHS_LANE_WIDTH, BufferID.WAYPOINT_PATHS_SPEED_LIMIT, BufferID.WAYPOINT_PATHS_LANE_OFFSET, ): initial_value = [ float(), ] elif bn in ( BufferID.WAYPOINT_PATHS_LANE_ID, BufferID.WAYPOINT_PATHS_LANE_INDEX, ): initial_value = [ int(), ] # Vector of waypoints from ROAD_WAYPOINTS elif bn in (BufferID.ROAD_WAYPOINTS_POSITION,): initial_value = [ (float(), float()), ] elif bn in ( BufferID.ROAD_WAYPOINTS_HEADING, BufferID.ROAD_WAYPOINTS_LANE_WIDTH, BufferID.ROAD_WAYPOINTS_SPEED_LIMIT, BufferID.ROAD_WAYPOINTS_LANE_OFFSET, ): initial_value = [ float(), ] elif bn in ( BufferID.ROAD_WAYPOINTS_LANE_ID, BufferID.ROAD_WAYPOINTS_LANE_INDEX, ): initial_value = [ int(), ] # Vector of via data from VIA_DATA elif bn in (BufferID.VIA_DATA_NEAR_VIA_POINTS_POSITION,): initial_value = [ (float(), float()), ] elif bn in ( BufferID.VIA_DATA_NEAR_VIA_POINTS_LANE_INDEX, BufferID.VIA_DATA_NEAR_VIA_POINTS_ROAD_ID, BufferID.VIA_DATA_NEAR_VIA_POINTS_HIT, ): initial_value = [ int(), ] elif bn in (BufferID.VIA_DATA_NEAR_VIA_POINTS_REQUIRED_SPEED,): initial_value = [ float(), ] # Vector of lidar point information from LIDAR_POINT_CLOUD elif bn in ( BufferID.LIDAR_POINT_CLOUD_POINTS, BufferID.LIDAR_POINT_CLOUD_ORIGIN, BufferID.LIDAR_POINT_CLOUD_DIRECTION, ): initial_value = [ (float(), float(), float()), ] elif bn in (BufferID.LIDAR_POINT_CLOUD_HITS,): initial_value = [ int(), ] # SIGNALS elif bn in ( BufferID.SIGNALS_LIGHT_STATE, # BufferName.SIGNALS_CONTROLLED_LANES, ): initial_value = [ int(), ] elif bn in (BufferID.SIGNALS_STOP_POINT,): initial_value = [float(), float()] elif bn in (BufferID.SIGNALS_LAST_CHANGED,): initial_value = [ float(), ] else: raise ValueError(f"Buffer `{bn}` is not yet implemented.") inputs[svn] = initial_value surface.setShaderInputs(**inputs)