# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import annotations
import itertools
import random
from typing import TYPE_CHECKING, List, Sequence, Tuple
import numpy as np
import psutil
from .utils import pybullet
from .utils.core_math import batches, rotate_quat
if TYPE_CHECKING:
from .lidar_sensor_params import SensorParams
[docs]class Lidar:
"""Lidar utilities."""
def __init__(self, origin, sensor_params: SensorParams):
self._origin = origin
self._sensor_params = sensor_params
self._n_threads = psutil.cpu_count(logical=False)
# As an optimization we compute a set of "base rays" once and shift translate
# them to follow the user, and then trace for collisions.
self._base_rays = None
self._static_lidar_noise = self._compute_static_lidar_noise()
def __eq__(self, __value: object) -> bool:
return isinstance(__value, Lidar) and ((self._origin == __value._origin).all())
@property
def origin(self):
"""The center of the emission point of the lidar lasers."""
return self._origin
@origin.setter
def origin(self, value):
self._origin = value
def _compute_static_lidar_noise(self):
n_rays = int(
(self._sensor_params.end_angle - self._sensor_params.start_angle)
/ self._sensor_params.angle_resolution
)
n_points = n_rays * len(self._sensor_params.laser_angles)
static_lidar_noise = []
for _ in range(n_points):
static_lidar_noise.append(
random.gauss(
self._sensor_params.noise_mu, self._sensor_params.noise_sigma
)
)
return np.array(static_lidar_noise, dtype=np.float64)
[docs] def compute_point_cloud(
self, bullet_client
) -> Tuple[List[np.ndarray], List[int], List[Tuple[np.ndarray, np.ndarray]]]:
"""Generate a point cloud.
Returns:
Point cloud of 3D points, a list of hit objects, a list of rays fired.
"""
rays = self._compute_rays()
point_cloud, hits = self._trace_rays(rays, bullet_client)
# point_cloud = self._apply_noise(point_cloud)
assert (
len(point_cloud) == len(hits) == len(rays) == len(self._static_lidar_noise)
)
return point_cloud, hits, rays
def _compute_rays(self):
if self._base_rays is None:
self._base_rays = []
n_rays = int(
(self._sensor_params.end_angle - self._sensor_params.start_angle)
/ self._sensor_params.angle_resolution
)
yaws = -1 * self._sensor_params.laser_angles
rolls = np.arange(n_rays) * self._sensor_params.angle_resolution
origin = np.array([0, 0, 0])
for yaw, roll in itertools.product(yaws, rolls):
rot = pybullet.getQuaternionFromEuler((roll, 0, yaw))
direction = rotate_quat(
np.asarray(rot, dtype=float),
np.asarray((0, self._sensor_params.max_distance, 0), dtype=float),
)
self._base_rays.append((origin, direction))
rays = [
(origin + self._origin, direction + self._origin)
for origin, direction in self._base_rays
]
return rays
def _trace_rays(self, rays, bullet_client):
results = []
for batched_rays in batches(
rays, int(pybullet.MAX_RAY_INTERSECTION_BATCH_SIZE - 1)
):
origins, directions = zip(*batched_rays)
results.extend(
bullet_client.rayTestBatch(origins, directions, self._n_threads)
)
hit_ids, _, _, positions, _ = zip(*results)
positions = list(positions)
hits = []
for i, position in enumerate(positions):
hit = hit_ids[i] != -1
hits.append(hit)
positions[i] = (
np.array(position) if hit else np.array([np.inf, np.inf, np.inf])
)
return positions, hits
def _apply_noise(self, point_cloud):
dynamic_noise = np.random.normal(
self._sensor_params.noise_mu,
self._sensor_params.noise_sigma,
size=len(point_cloud),
)
local_pc = point_cloud - self._origin
noise = self._static_lidar_noise + dynamic_noise
return point_cloud + (
local_pc
/ np.linalg.norm(local_pc, axis=1)[:, np.newaxis]
* noise[:, np.newaxis]
)