# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import argparse
import csv
import logging
import math
import os
import sqlite3
import sys
from collections import deque
from pathlib import Path
from typing import Any, Callable, Deque, Dict, Generator, Iterable, Optional
import numpy as np
from smarts.core.coordinates import BoundingBox, Point
from smarts.core.signal_provider import SignalLightState
from smarts.core.utils.core_math import (
circular_mean,
constrain_angle,
min_angles_difference_signed,
vec_to_radians,
)
from smarts.core.utils.file import read_tfrecord_file
from smarts.sstudio import sstypes
from smarts.waymo.exceptions import WaymoDatasetError
METERS_PER_FOOT = 0.3048
DEFAULT_LANE_WIDTH = 3.7 # a typical US highway lane is 12ft ~= 3.7m wide
class _TrajectoryDataset:
def __init__(self, dataset_spec: Dict[str, Any], output: str):
    """Validate the spec and cache the settings shared by every converter.

    Args:
        dataset_spec: Conversion options; must contain "input_path".
        output: Path of the SQLite database file that will be written.
    """
    self._log = logging.getLogger(self.__class__.__name__)
    # Validation must come first: nothing else is set up for a bad spec.
    self.check_dataset_spec(dataset_spec)
    self._output = output
    self._path = os.path.expanduser(dataset_spec["input_path"])

    # Scale is the map's lane width relative to the real-world lane width;
    # without an explicit map lane width the two are equal (scale of 1).
    real_width = dataset_spec.get("real_lane_width_m", DEFAULT_LANE_WIDTH)
    map_width = dataset_spec.get("map_lane_width", real_width)
    self._scale = map_width / real_width

    self._flip_y = dataset_spec.get("flip_y", False)
    self._swap_xy = dataset_spec.get("swap_xy", False)

    # most trajectory datasets have .1s time delta (i.e., were collected at 10 Hz)
    self._dt_sec = 0.1
class _WindowedReader:
    """Iterate over rows while exposing a sliding window around each one.

    For every row the reader keeps up to `window_before` already-visited rows
    (newest first) and up to `window_after` upcoming rows (oldest first), and
    hands both windows to `transform_fn` right before yielding the row.

    For example, if window_before = 4 and window_after = 3, for a 9-row
    file, the windows associated with each row are:
        row 1, before = [],           after = [2, 3, 4]
        row 2, before = [1],          after = [3, 4, 5]
        row 3, before = [2, 1],       after = [4, 5, 6]
        row 4, before = [3, 2, 1],    after = [5, 6, 7]
        row 5, before = [4, 3, 2, 1], after = [6, 7, 8]
        row 6, before = [5, 4, 3, 2], after = [7, 8, 9]
        row 7, before = [6, 5, 4, 3], after = [8, 9]
        row 8, before = [7, 6, 5, 4], after = [9]
        row 9, before = [8, 7, 6, 5], after = []

    Both windows are reset whenever the value in the (optional) `group_col`
    column changes. Readers can be nested: `row_gen` may itself be another
    _WindowedReader.
    """

    Row = Dict[str, Any]

    def __init__(
        self,
        row_gen: Iterable[Row],
        transform_fn: Callable[[Row, Deque[Row], Deque[Row]], None],
        window_before: int = 0,
        window_after: int = 0,
        group_col: Optional[str] = None,
    ):
        self._row_gen = row_gen
        self._transform_fn = transform_fn
        self._before_width = window_before
        self._after_width = window_after
        self._group_col = group_col

    def _drain(self, cur_row, before_win, after_win):
        """Yield every row still buffered in `after_win`, shrinking the look-ahead."""
        while after_win:
            if cur_row:
                before_win.appendleft(cur_row)
            cur_row = after_win.popleft()
            self._transform_fn(cur_row, before_win, after_win)
            yield cur_row

    def __iter__(self) -> Generator[Row, None, None]:
        lookahead = deque(maxlen=self._after_width)
        history = deque(maxlen=self._before_width)
        current = None
        last_group = None
        for row in self._row_gen:
            if self._group_col:
                if row[self._group_col] != last_group:
                    # A new group starts: emit what is left of the old group,
                    # then begin again with empty windows.
                    yield from self._drain(current, history, lookahead)
                    history.clear()
                    current = None
                last_group = row[self._group_col]

            # Keep buffering until the look-ahead window is full.
            if len(lookahead) < self._after_width:
                lookahead.append(row)
                continue

            if current:
                history.appendleft(current)
            # With a zero-width look-ahead the incoming row is the current row.
            current = lookahead.popleft() if lookahead else row
            lookahead.append(row)
            self._transform_fn(current, history, lookahead)
            yield current

        # Input exhausted: emit the rows still waiting in the look-ahead.
        yield from self._drain(current, history, lookahead)
@property
def scale(self) -> float:
    """Ratio of the map's lane width to the real-world lane width."""
    return self._scale
@property
def traffic_light_rows(self) -> Iterable:
    """Traffic light state rows of the dataset, for datasets that provide them.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this property.
    """
    raise NotImplementedError
@property
def rows(self) -> Iterable:
    """The trajectory rows of the dataset; must be provided by a subclass.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this property.
    """
    raise NotImplementedError
def column_val_in_row(self, row, col_name: str) -> Any:
    """Return the value stored under `col_name` in a dataset row.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this method.
    """
    # XXX: this public method is improper because it requires a dataset row, and the
    # shape of a row is specific to each implementation.
    raise NotImplementedError
def check_dataset_spec(self, dataset_spec: Dict[str, Any]):
    """Validate the form of the dataset specification.

    On success the spec is stored on the instance as `_dataset_spec`.

    Args:
        dataset_spec: The dataset options to check.

    Raises:
        ValueError: If 'input_path' is missing, or if 'flip_y' is requested
            for a source type other than "NGSIM" or without a '_map_bbox'.
    """
    errmsg = None
    if "input_path" not in dataset_spec:
        errmsg = "'input_path' field is required in dataset_spec."
    elif dataset_spec.get("flip_y"):
        # Use .get() so that a spec lacking 'source_type' is reported as an
        # invalid spec (ValueError) instead of escaping as a KeyError.
        if dataset_spec.get("source_type") != "NGSIM":
            errmsg = "'flip_y' option only supported for NGSIM datasets."
        elif not dataset_spec.get("_map_bbox"):
            errmsg = "'_map_bbox' is required if 'flip_y' option used; need to pass in a map_spec."
    if errmsg:
        self._log.error(errmsg)
        raise ValueError(errmsg)
    self._dataset_spec = dataset_spec
def _write_dict(self, curdict: Dict, insert_sql: str, cursor, curkey: str = ""):
    """Insert a (possibly nested) dict as flat key/value rows.

    Nested dicts are flattened by joining the keys along the path with ".".
    Leaf values are stored as their `str()` form.

    Args:
        curdict: The dict to write.
        insert_sql: A two-placeholder INSERT statement (key, value).
        cursor: Database cursor used to execute the inserts.
        curkey: Dotted key prefix of `curdict` ("" at the top level).
    """
    for key, value in curdict.items():
        if curkey:
            dotted_key = f"{curkey}.{key}"
        else:
            dotted_key = key
        if not isinstance(value, dict):
            cursor.execute(insert_sql, (dotted_key, str(value)))
            continue
        self._write_dict(value, insert_sql, cursor, dotted_key)
def _create_tables(self, dbconxn):
    """Create the empty Spec, Vehicle, Trajectory and TrafficLightState tables.

    Args:
        dbconxn: An open database connection; the new tables are committed to it.
    """
    # One CREATE statement per table, executed in this order.
    table_statements = (
        """CREATE TABLE Spec (
key TEXT PRIMARY KEY,
value TEXT
) WITHOUT ROWID""",
        """CREATE TABLE Vehicle (
id INTEGER PRIMARY KEY,
type INTEGER NOT NULL,
length REAL,
width REAL,
height REAL,
is_ego_vehicle INTEGER DEFAULT 0
) WITHOUT ROWID""",
        """CREATE TABLE Trajectory (
vehicle_id INTEGER NOT NULL,
sim_time REAL NOT NULL,
position_x REAL NOT NULL,
position_y REAL NOT NULL,
heading_rad REAL NOT NULL,
speed REAL DEFAULT 0.0,
lane_id INTEGER DEFAULT 0,
PRIMARY KEY (vehicle_id, sim_time),
FOREIGN KEY (vehicle_id) REFERENCES Vehicle(id)
) WITHOUT ROWID""",
        """CREATE TABLE TrafficLightState (
sim_time REAL NOT NULL,
state INTEGER NOT NULL,
stop_point_x REAL NOT NULL,
stop_point_y REAL NOT NULL,
lane INTEGER NOT NULL
)""",
    )
    ccur = dbconxn.cursor()
    for statement in table_statements:
        ccur.execute(statement)
    dbconxn.commit()
    ccur.close()
def create_output(self, time_precision: int = 3):
    """Convert the dataset into the output database file.

    The spec, the vehicles, their trajectories and (when the dataset provides
    them) the traffic light states are written to the SQLite file given at
    construction. Trajectory times are then shifted so the earliest is 0.

    Args:
        time_precision: A limit for digits after decimal for each processed sim_time.
            (3 is millisecond precision)
    """
    dbconxn = sqlite3.connect(self._output)
    try:
        self._log.debug("creating tables...")
        self._create_tables(dbconxn)

        self._log.debug("inserting data...")
        iscur = dbconxn.cursor()
        insert_kv_sql = "INSERT INTO Spec VALUES (?, ?)"
        self._write_dict(self._dataset_spec, insert_kv_sql, iscur)
        dbconxn.commit()
        iscur.close()

        # TAI: can use executemany() and batch insert rows together if this turns out to be too slow...
        insert_vehicle_sql = "INSERT INTO Vehicle VALUES (?, ?, ?, ?, ?, ?)"
        insert_traj_sql = "INSERT INTO Trajectory VALUES (?, ?, ?, ?, ?, ?, ?)"
        insert_traffic_light_sql = (
            "INSERT INTO TrafficLightState VALUES (?, ?, ?, ?, ?)"
        )
        vehicle_ids = set()
        itcur = dbconxn.cursor()

        x_offset = self._dataset_spec.get("x_offset", 0.0)
        y_offset = self._dataset_spec.get("y_offset", 0.0)
        for row in self.rows:
            vid = int(self.column_val_in_row(row, "vehicle_id"))
            if vid not in vehicle_ids:
                # First row seen for this vehicle: record its static attributes.
                ivcur = dbconxn.cursor()

                # These are not available in all datasets
                height = self.column_val_in_row(row, "height")
                is_ego = self.column_val_in_row(row, "is_ego_vehicle")

                veh_args = (
                    vid,
                    int(self.column_val_in_row(row, "type")),
                    float(self.column_val_in_row(row, "length")) * self.scale,
                    float(self.column_val_in_row(row, "width")) * self.scale,
                    float(height) * self.scale if height else None,
                    int(is_ego) if is_ego else 0,
                )
                ivcur.execute(insert_vehicle_sql, veh_args)
                ivcur.close()
                dbconxn.commit()
                vehicle_ids.add(vid)
            traj_args = (
                vid,
                # time units are in milliseconds for both NGSIM and Interaction datasets, convert to secs
                round(
                    float(self.column_val_in_row(row, "sim_time")) / 1000,
                    time_precision,
                ),
                (float(self.column_val_in_row(row, "position_x")) + x_offset)
                * self.scale,
                (float(self.column_val_in_row(row, "position_y")) + y_offset)
                * self.scale,
                float(self.column_val_in_row(row, "heading_rad")),
                float(self.column_val_in_row(row, "speed")) * self.scale,
                self.column_val_in_row(row, "lane_id"),
            )
            # Ignore datapoints with NaNs
            if not any(a is not None and np.isnan(a) for a in traj_args):
                itcur.execute(insert_traj_sql, traj_args)

        # Insert traffic light states if available
        try:
            for row in self.traffic_light_rows:
                tls_args = (
                    round(
                        float(self.column_val_in_row(row, "sim_time")) / 1000,
                        time_precision,
                    ),
                    int(self.column_val_in_row(row, "state")),
                    float(self.column_val_in_row(row, "stop_point_x") + x_offset)
                    * self.scale,
                    float(self.column_val_in_row(row, "stop_point_y") + y_offset)
                    * self.scale,
                    float(self.column_val_in_row(row, "lane")),
                )
                itcur.execute(insert_traffic_light_sql, tls_args)
        except NotImplementedError:
            # The dataset type does not provide traffic light states.
            pass

        itcur.close()
        dbconxn.commit()

        # ensure that sim_time always starts at 0:
        self._log.debug("shifting sim_times..")
        mcur = dbconxn.cursor()
        mcur.execute(
            f"UPDATE Trajectory SET sim_time = round(sim_time - (SELECT min(sim_time) FROM Trajectory), {time_precision})"
        )
        mcur.close()
        dbconxn.commit()

        self._log.debug("creating indices..")
        icur = dbconxn.cursor()
        icur.execute("CREATE INDEX Trajectory_Time ON Trajectory (sim_time)")
        icur.execute("CREATE INDEX Trajectory_Vehicle ON Trajectory (vehicle_id)")
        icur.execute("CREATE INDEX Vehicle_Type ON Vehicle (type)")
        icur.execute(
            "CREATE INDEX TrafficLightState_SimTime ON TrafficLightState (sim_time)"
        )
        dbconxn.commit()
        icur.close()
    finally:
        # Release the database handle even when a row fails to convert or an
        # insert is rejected; otherwise the connection (and file lock) leaks.
        dbconxn.close()

    self._log.debug("output done")
[docs]class Interaction(_TrajectoryDataset):
"""A tool to convert a dataset to a database for use in SMARTS."""
def __init__(self, dataset_spec: Dict[str, Any], output: str):
    """Set up heading-inference state on top of the common converter settings.

    Args:
        dataset_spec: Conversion options. The optional keys read here are
            "max_angular_velocity", "heading_inference_min_speed" and
            "default_heading".
        output: Path of the SQLite database file that will be written.
    """
    super().__init__(dataset_spec, output)
    assert not self._flip_y
    self._max_angular_velocity = dataset_spec.get("max_angular_velocity", None)
    self._heading_min_speed = dataset_spec.get("heading_inference_min_speed", 2.2)
    # _infer_heading() falls back to this value when no heading can be derived
    # for a row; without it that fallback raised AttributeError. The default
    # mirrors the one used by the NGSIM converter.
    self._default_heading = dataset_spec.get("default_heading", 3.0 * math.pi / 2.0)
    self._prev_heading = None
    self._next_row = None
def check_dataset_spec(self, dataset_spec: Dict[str, Any]):
    """Run the common spec checks, then bound 'heading_inference_window'.

    Raises:
        ValueError: If the spec fails the common checks or the heading
            inference window is outside the range 2..11.
    """
    super().check_dataset_spec(dataset_spec)
    window = dataset_spec.get("heading_inference_window", 2)
    # 11 is a semi-arbitrary max just to keep things "sane".
    window_ok = 2 <= window <= 11
    if not window_ok:
        raise ValueError("heading_inference_window must be between 2 and 11")
def _lookup_agent_type(self, agent_type: str) -> int:
    """Translate an Interaction agent type name into a numeric type code.

    Unknown names are logged as a warning and mapped to 0.
    """
    # Try to match the NGSIM types...
    known_types = {
        "motorcycle": 1,
        "car": 2,
        "truck": 3,
        "pedestrian/bicycle": 4,
    }
    type_code = known_types.get(agent_type)
    if type_code is not None:
        return type_code
    self._log.warning(f"unknown agent_type: {agent_type}.")
    return 0
def _row_gen(self) -> Generator[Dict[str, Any], None, None]:
    """Yield one normalized row dict per line of the Interaction CSV file.

    Each row keeps the original CSV columns and gains the common keys
    "vehicle_id", "sim_time", "position_x", "position_y", "length",
    "width" and "type"; "vx" and "vy" are converted to floats. When
    `swap_xy` is set, both the position and the velocity components are
    exchanged.
    """
    x_margin = self._dataset_spec.get("x_margin_px", 0) / self.scale
    y_margin = self._dataset_spec.get("y_margin_px", 0) / self.scale
    with open(self._path, newline="") as csvfile:
        for row in csv.DictReader(csvfile):
            # See: https://interaction-dataset.com/details-and-format
            # position and length/width are in meters.
            # Note: track_id will be like "P12" for pedestrian tracks. (TODO)
            row["vehicle_id"] = int(row["track_id"])
            row["sim_time"] = row["timestamp_ms"]

            pos_x, pos_y = float(row["x"]), float(row["y"])
            vel_x, vel_y = float(row["vx"]), float(row["vy"])
            if self._swap_xy:
                # Swap through temporaries: assigning "vx" first and then
                # reading it back for "vy" would leave both equal to vy.
                pos_x, pos_y = pos_y, pos_x
                vel_x, vel_y = vel_y, vel_x
            row["position_x"] = pos_x
            row["position_y"] = pos_y
            row["vx"] = vel_x
            row["vy"] = vel_y

            row["length"] = float(row.get("length", 0.0))
            row["width"] = float(row.get("width", 0.0))
            row["type"] = self._lookup_agent_type(row["agent_type"])

            # offset of the map from the data...
            if x_margin:
                row["position_x"] -= x_margin
            if y_margin:
                row["position_y"] -= y_margin

            if self._flip_y:
                map_bb = self._dataset_spec["_map_bbox"]
                row["position_y"] = map_bb.max_pt.y / self.scale - row["position_y"]

            yield row
def _cal_speed(
    self,
    row: _TrajectoryDataset._WindowedReader.Row,
    before_win: Deque[_TrajectoryDataset._WindowedReader.Row],
    after_win: Deque[_TrajectoryDataset._WindowedReader.Row],
):
    """Set row["speed"], preferring displacement over the velocity columns.

    The speed starts as the norm of (vx, vy). When a following row exists
    and neither position contains NaN, it is replaced by the distance to
    that row divided by the fixed timestep.
    """
    row["speed"] = np.linalg.norm((row["vx"], row["vy"]))
    if not after_win:
        return
    following = after_win[0]
    here = np.array((row["position_x"], row["position_y"]))
    there = np.array((following["position_x"], following["position_y"]))
    if np.isnan(here).any() or np.isnan(there).any():
        return
    # XXX: could try to divide by sim_time delta here instead of assuming it's fixed
    row["speed"] = np.linalg.norm(there - here) / self._dt_sec
def _infer_heading(
    self,
    row: _TrajectoryDataset._WindowedReader.Row,
    before_win: Deque[_TrajectoryDataset._WindowedReader.Row],
    after_win: Deque[_TrajectoryDataset._WindowedReader.Row],
):
    """Set row["heading_rad"] from the positions in the surrounding window.

    A unit direction vector is derived for each pair of consecutive positions
    in the window (before rows, this row, after rows) and the results are
    combined with `circular_mean`. Pairs that contain NaN, have zero
    displacement, or are slower than `_heading_min_speed` reuse the previous
    pair's direction when there is one. If no direction is found at all the
    heading falls back, in order, to the previously inferred heading, the
    row's "psi_rad" column, and `_default_heading`.

    Args:
        row: The current row; updated in place.
        before_win: Rows preceding `row`.
        after_win: Rows following `row`.
    """
    # before_win is reversed so that the window runs from oldest to newest
    # (the windowed reader stores the before-window newest first).
    window = [np.array((r["position_x"], r["position_y"])) for r in before_win]
    window.reverse()
    window += [np.array((row["position_x"], row["position_y"]))]
    window += [np.array((r["position_x"], r["position_y"])) for r in after_win]
    # NOTE(review): before_win is not reversed here, so speeds[w] only lines up
    # with window[w] when the before-window has at most one row -- confirm.
    speeds = (
        [r["speed"] for r in before_win]
        + [row["speed"]]
        + [r["speed"] for r in after_win]
    )
    vecs = []
    prev_vhat = None
    prev_inst_heading = None
    for w in range(len(window) - 1):
        c = window[w]
        n = window[w + 1]
        # Unusable position: repeat the last good direction, if any.
        if any(np.isnan(c)) or any(np.isnan(n)):
            if prev_vhat is not None:
                vecs.append(prev_vhat)
            continue
        s = np.linalg.norm(n - c)
        # No movement, or moving too slowly for the direction to be trusted.
        if s == 0.0 or (
            self._heading_min_speed is not None
            and (
                (s / self._dt_sec) < self._heading_min_speed
                or speeds[w] < self._heading_min_speed
            )
        ):
            if prev_vhat is not None:
                vecs.append(prev_vhat)
            continue
        vhat = (n - c) / s
        inst_heading = vec_to_radians(vhat)
        if prev_inst_heading is not None:
            if self._max_angular_velocity:
                # XXX: could try to divide by sim_time delta here instead of assuming it's fixed
                angular_velocity = (
                    min_angles_difference_signed(inst_heading, prev_inst_heading)
                    / self._dt_sec
                )
                # Turning faster than allowed: clamp the change in heading and
                # rebuild the direction vector from the clamped heading.
                if abs(angular_velocity) > self._max_angular_velocity:
                    inst_heading = (
                        prev_inst_heading
                        + np.sign(angular_velocity)
                        * self._max_angular_velocity
                        * self._dt_sec
                    )
                    # presumably converts the heading to the angle convention
                    # expected by cos/sin below -- TODO confirm
                    inst_heading += 0.5 * math.pi
                    vhat = np.array(
                        (math.cos(inst_heading), math.sin(inst_heading))
                    )
        vecs.append(vhat)
        prev_vhat = vhat
        prev_inst_heading = inst_heading
    if vecs:
        new_heading = circular_mean(vecs)
    elif self._prev_heading is not None:
        new_heading = self._prev_heading
    elif "psi_rad" in row:
        # presumably the dataset-provided yaw, shifted by a quarter turn to
        # match the heading convention used here -- TODO confirm
        new_heading = float(row["psi_rad"]) - 0.5 * math.pi
    else:
        new_heading = self._default_heading
    # Remembered across calls (it is not reset here when the vehicle changes).
    self._prev_heading = new_heading
    row["heading_rad"] = new_heading % (2 * math.pi)
@property
def rows(self) -> Generator[Dict, None, None]:
    """Yield the Interaction rows with speed and heading filled in.

    Rows whose scaled position lies outside the map bounding box (when the
    spec has a "_map_bbox") are logged and skipped.
    """
    self._log.debug("transforming Interaction data...")
    reader_cls = _TrajectoryDataset._WindowedReader

    # first calculate speeds based on positions (instead of vx, vy), since dataset
    # speeds are "instantaneous" and so don't match with dPos/dt, which can affect some models.
    with_speeds = reader_cls(self._row_gen(), self._cal_speed, 0, 1, "vehicle_id")

    # now infer heading with rolling window...
    heading_window = self._dataset_spec.get("heading_inference_window", 2)
    rows_before = int((heading_window / 2) + (heading_window % 2) - 1)
    rows_after = int(heading_window / 2)
    with_headings = reader_cls(
        with_speeds,
        self._infer_heading,
        rows_before,
        rows_after,
        "vehicle_id",
    )

    map_bbox = self._dataset_spec.get("_map_bbox")
    # note: iterating over outer generator iterates over all nested generators too...
    # XXX: assumes all timesteps for a vehicle are grouped together in the file and are in sorted temporal order
    for row in with_headings:
        if not map_bbox or map_bbox.contains(
            Point(self.scale * row["position_x"], self.scale * row["position_y"])
        ):
            yield row
            continue
        self._log.info(
            f"skipping row for vehicle {row['vehicle_id']} with position off of map"
        )
def column_val_in_row(self, row, col_name: str) -> Any:
    """Return the row's value for `col_name`, or None when the row lacks it."""
    value = row.get(col_name)
    return value
[docs]class NGSIM(_TrajectoryDataset):
"""A tool for conversion of a NGSIM dataset for use within SMARTS."""
def __init__(self, dataset_spec: Dict[str, Any], output: str):
    """Set up heading-inference state and work out the input file's columns.

    Args:
        dataset_spec: Conversion options. The optional keys read here are
            "default_heading", "max_angular_velocity" and
            "heading_inference_min_speed".
        output: Path of the SQLite database file that will be written.
    """
    super().__init__(dataset_spec, output)
    option = dataset_spec.get
    self._prev_heading = None
    self._default_heading = option("default_heading", 3.0 * math.pi / 2.0)
    self._max_angular_velocity = option("max_angular_velocity", None)
    # 2.2 corresponds to roughly 5mph.
    self._heading_min_speed = option("heading_inference_min_speed", 2.2)
    self._determine_columns()
def check_dataset_spec(self, dataset_spec: Dict[str, Any]):
    """Run the common spec checks, then bound 'heading_inference_window'.

    Raises:
        ValueError: If the spec fails the common checks or the heading
            inference window is outside the range 2..11.
    """
    super().check_dataset_spec(dataset_spec)
    window = dataset_spec.get("heading_inference_window", 2)
    # 11 is a semi-arbitrary max just to keep things "sane".
    window_ok = 2 <= window <= 11
    if not window_ok:
        raise ValueError("heading_inference_window must be between 2 and 11")
def _determine_columns(self):
    """Work out the column names of the input file and store them in `_columns`.

    The basic layout has 18 columns. If the first line of the file has more
    fields than that, six extra columns are inserted before "spacing". With
    `swap_xy` set, the x and y names of both coordinate pairs are exchanged.
    """
    if self._swap_xy:
        local_first, local_second = "position_y", "position_x"
        global_first, global_second = "global_y", "global_x"
    else:
        local_first, local_second = "position_x", "position_y"
        global_first, global_second = "global_x", "global_y"

    self._columns = (
        "vehicle_id",
        "frame_id",  # 1 frame per .1s
        "total_frames",
        "sim_time",  # msecs
        local_first,  # front center in feet from left lane edge
        local_second,  # front center in feet from entry edge
        global_first,  # front center in feet
        global_second,  # front center in feet
        "length",  # feet
        "width",  # feet
        "type",  # 1 = motorcycle, 2 = auto, 3 = truck
        "speed",  # feet / sec
        "acceleration",  # feet / sec^2
        "lane_id",  # lower is further left
        "preceding_vehicle_id",
        "following_vehicle_id",
        "spacing",  # feet
        "headway",  # secs
    )

    # The number of whitespace-separated fields on the first line decides
    # which layout the file uses.
    with open(self._path, newline="") as infile:
        first_line = infile.readline()
    num_cols = len(first_line.strip().split())

    if num_cols > len(self._columns):
        extra_cols = (
            "origin_zone",
            "destination_zone",
            "intersection",
            "section",
            "direction",
            "movement",
        )
        leading, trailing = self._columns[:16], self._columns[16:]
        self._columns = leading + extra_cols + trailing
    assert num_cols == len(
        self._columns
    ), f"unexpected number of columns/fields ({num_cols}) in {self._path}"
def _smooth_positions(
    self,
    row: _TrajectoryDataset._WindowedReader.Row,
    before_win: Deque[_TrajectoryDataset._WindowedReader.Row],
    after_win: Deque[_TrajectoryDataset._WindowedReader.Row],
):
    """Replace the row's position with a moving average over its window.

    The average always spans the full window width (both maximum window
    lengths plus the row itself). A partially filled window is padded with
    its last row; an empty window is padded with the current row's value.
    """
    pos_width = 1 + before_win.maxlen + after_win.maxlen

    def window_total(win, key):
        # An empty window contributes the current row's value for every slot.
        if not win:
            return row[key] * win.maxlen
        # Slots beyond the rows actually present repeat the window's last row.
        return sum(
            win[slot][key] if slot < len(win) else win[-1][key]
            for slot in range(win.maxlen)
        )

    for key in ("position_x", "position_y"):
        row[key] += window_total(before_win, key) + window_total(after_win, key)
        row[key] /= pos_width
def _infer_heading(
    self,
    row: _TrajectoryDataset._WindowedReader.Row,
    before_win: Deque[_TrajectoryDataset._WindowedReader.Row],
    after_win: Deque[_TrajectoryDataset._WindowedReader.Row],
):
    """Set the row's heading and its heading-adjusted center position.

    A unit direction vector is derived for each pair of consecutive positions
    in the window (before rows, this row, after rows) and the results are
    combined with `circular_mean`. Pairs that contain NaN, have zero
    displacement, or are slower than `_heading_min_speed` reuse the previous
    pair's direction when there is one. If no direction is found at all, the
    previously inferred heading is used, or `_default_heading` when there is
    none yet. The result is written to row["heading_rad"], and
    row["adj_position_x"] / row["adj_position_y"] are set to the position moved
    back by half the vehicle length along that heading.

    Args:
        row: The current row; updated in place.
        before_win: Rows preceding `row`.
        after_win: Rows following `row`.
    """
    # before_win is reversed so that the window runs from oldest to newest
    # (the windowed reader stores the before-window newest first).
    window = [np.array((r["position_x"], r["position_y"])) for r in before_win]
    window.reverse()
    window += [np.array((row["position_x"], row["position_y"]))]
    window += [np.array((r["position_x"], r["position_y"])) for r in after_win]
    # NOTE(review): before_win is not reversed here, so speeds[w] only lines up
    # with window[w] when the before-window has at most one row -- confirm.
    speeds = (
        [r["speed"] for r in before_win]
        + [row["speed"]]
        + [r["speed"] for r in after_win]
    )
    vecs = []
    prev_vhat = None
    prev_inst_heading = None
    for w in range(len(window) - 1):
        c = window[w]
        n = window[w + 1]
        # Unusable position: repeat the last good direction, if any.
        if any(np.isnan(c)) or any(np.isnan(n)):
            if prev_vhat is not None:
                vecs.append(prev_vhat)
            continue
        s = np.linalg.norm(n - c)
        # No movement, or moving too slowly for the direction to be trusted.
        if s == 0.0 or (
            self._heading_min_speed is not None
            and (
                (s / self._dt_sec) < self._heading_min_speed
                or speeds[w] < self._heading_min_speed
            )
        ):
            if prev_vhat is not None:
                vecs.append(prev_vhat)
            continue
        vhat = (n - c) / s
        inst_heading = vec_to_radians(vhat)
        if prev_inst_heading is not None:
            if self._max_angular_velocity:
                # XXX: could try to divide by sim_time delta here instead of assuming it's fixed
                angular_velocity = (
                    min_angles_difference_signed(inst_heading, prev_inst_heading)
                    / self._dt_sec
                )
                # Turning faster than allowed: clamp the change in heading and
                # rebuild the direction vector from the clamped heading.
                if abs(angular_velocity) > self._max_angular_velocity:
                    inst_heading = (
                        prev_inst_heading
                        + np.sign(angular_velocity)
                        * self._max_angular_velocity
                        * self._dt_sec
                    )
                    # presumably converts the heading to the angle convention
                    # expected by cos/sin below -- TODO confirm
                    inst_heading += 0.5 * math.pi
                    vhat = np.array(
                        (math.cos(inst_heading), math.sin(inst_heading))
                    )
        vecs.append(vhat)
        prev_vhat = vhat
        prev_inst_heading = inst_heading
    if vecs:
        new_heading = circular_mean(vecs)
    elif self._prev_heading is None:
        # TAI: backfill from the first "real" heading (second pass)
        new_heading = self._default_heading
    else:
        new_heading = self._prev_heading
    # Remembered across calls (it is not reset here when the vehicle changes).
    self._prev_heading = new_heading
    row["heading_rad"] = new_heading % (2 * math.pi)
    # now since SMARTS' positions are the vehicle centerpoints, but NGSIM's are at the front
    # we must adjust the vehicle position to its centerpoint based on its inferred heading angle (+y = 0 rad)
    adj_heading = row["heading_rad"] + 0.5 * math.pi
    half_len = 0.5 * row["length"]
    # XXX: need to use a different key here since changing position_x or position_y would probably
    # XXX: affect a row that's still in the before window of a nested generator (smooth_positions).
    row["adj_position_x"] = row["position_x"] - half_len * np.cos(adj_heading)
    row["adj_position_y"] = row["position_y"] - half_len * np.sin(adj_heading)
def _cal_speed(
    self,
    row: _TrajectoryDataset._WindowedReader.Row,
    before_win: Deque[_TrajectoryDataset._WindowedReader.Row],
    after_win: Deque[_TrajectoryDataset._WindowedReader.Row],
):
    """Set row["speed_discrete"] from the distance to the following row.

    The value is the distance between the adjusted positions of this row and
    the next one, divided by the fixed timestep. It stays None when there is
    no following row or when either position contains NaN.

    Returns:
        The row itself when there is no following row, otherwise None.
    """
    row["speed_discrete"] = None
    if after_win:
        following = after_win[0]
        here = np.array((row["adj_position_x"], row["adj_position_y"]))
        there = np.array((following["adj_position_x"], following["adj_position_y"]))
        if not (np.isnan(here).any() or np.isnan(there).any()):
            # XXX: could try to divide by sim_time delta here instead of assuming it's fixed
            row["speed_discrete"] = np.linalg.norm(there - here) / self._dt_sec
        return None
    return row
def _row_gen(self) -> Generator[_TrajectoryDataset._WindowedReader.Row, None, None]:
    """Yield one row dict per non-blank line of the NGSIM input file.

    Fields are split on whitespace and named after `_columns`. Lengths,
    speeds, accelerations, spacing and positions are converted from feet to
    meters; "lane_id" becomes an int. Other fields stay as strings.
    """
    x_margin = self._dataset_spec.get("x_margin_px", 0) / self.scale
    y_margin = self._dataset_spec.get("y_margin_px", 0) / self.scale
    with open(self._path, newline="") as infile:
        for line in infile:
            fields = line.split()
            if not fields:
                # A blank line (e.g. a trailing newline at the end of the
                # file) carries no data; indexing into it would raise.
                continue
            row = {col: fields[f] for f, col in enumerate(self._columns)}
            row["lane_id"] = int(row["lane_id"])
            row["length"] = float(row["length"]) * METERS_PER_FOOT
            row["width"] = float(row["width"]) * METERS_PER_FOOT
            row["speed"] = float(row["speed"]) * METERS_PER_FOOT
            row["acceleration"] = float(row["acceleration"]) * METERS_PER_FOOT
            row["spacing"] = float(row["spacing"]) * METERS_PER_FOOT
            row["position_x"] = float(row["position_x"]) * METERS_PER_FOOT
            row["position_y"] = float(row["position_y"]) * METERS_PER_FOOT
            # offset of the map from the data...
            if x_margin:
                row["position_x"] -= x_margin
            if y_margin:
                row["position_y"] -= y_margin
            if self._flip_y:
                map_bb = self._dataset_spec["_map_bbox"]
                row["position_y"] = map_bb.max_pt.y / self.scale - row["position_y"]
            yield row
@property
def rows(self) -> Generator[Dict, None, None]:
    """Yield the NGSIM rows after smoothing, heading inference and speed calculation.

    Rows whose scaled, adjusted position lies outside the map bounding box
    (when the spec has a "_map_bbox") are logged and skipped.
    """
    self._log.debug("transforming NGSIM data...")
    reader_cls = _TrajectoryDataset._WindowedReader

    # smooth positions using a moving average...
    # TAI: make this window size a parameter too?
    smoothed = reader_cls(
        self._row_gen(), self._smooth_positions, 7, 7, "vehicle_id"
    )

    # infer heading with rolling window on previously-smoothed positions...
    heading_window = self._dataset_spec.get("heading_inference_window", 2)
    rows_before = int((heading_window / 2) + (heading_window % 2) - 1)
    rows_after = int(heading_window / 2)
    with_headings = reader_cls(
        smoothed,
        self._infer_heading,
        rows_before,
        rows_after,
        "vehicle_id",
    )

    # finally calculate speeds based on these smoothed and centered positions...
    # (This also overcomes problem that NGSIM speeds are "instantaneous"
    # and so don't match with dPos/dt, which can affect some models.)
    with_speeds = reader_cls(with_headings, self._cal_speed, 0, 1, "vehicle_id")

    map_bbox = self._dataset_spec.get("_map_bbox")
    # note: iterating over outer generator iterates over all nested generators too...
    # XXX: assumes all timesteps for a vehicle are grouped together in the file and are in sorted temporal order
    for row in with_speeds:
        if not map_bbox or map_bbox.contains(
            Point(
                self.scale * row["adj_position_x"],
                self.scale * row["adj_position_y"],
            )
        ):
            yield row
            continue
        self._log.info(
            f"skipping row for vehicle {row['vehicle_id']} with position off of map"
        )
def column_val_in_row(self, row, col_name: str) -> Any:
    """Return the row's value for `col_name`, preferring derived values.

    "speed" returns the position-derived "speed_discrete" when one was
    computed and the dataset's own "speed" otherwise. "position_x" and
    "position_y" return the adjusted positions. Any other column is looked
    up directly and yields None when absent.
    """
    if col_name == "speed":
        discrete = row["speed_discrete"]
        # None marks "not computed"; a computed speed of 0.0 (a stopped
        # vehicle) is a valid value and must not fall back to the raw speed.
        return discrete if discrete is not None else row["speed"]
    if col_name == "position_x":
        return row["adj_position_x"]
    if col_name == "position_y":
        return row["adj_position_y"]
    return row.get(col_name)
[docs]class Waymo(_TrajectoryDataset):
"""A tool for conversion of a Waymo dataset for use within SMARTS."""
def __init__(self, dataset_spec: Dict[str, Any], output: str):
    """Create a Waymo converter; all setup is done by the base class.

    Args:
        dataset_spec: Conversion options.
        output: Path of the SQLite database file that will be written.
    """
    super().__init__(dataset_spec, output)
def _get_scenario(self):
    """Load the scenario named by the spec's "scenario_id" from the input file.

    Returns:
        The first parsed scenario whose id matches "scenario_id".

    Raises:
        ValueError: If the spec has no "scenario_id", or the input file
            contains no scenario with that id.
    """
    if "scenario_id" not in self._dataset_spec:
        errmsg = "Dataset spec requires scenario_id to be set"
        self._log.error(errmsg)
        raise ValueError(errmsg)
    scenario_id = self._dataset_spec["scenario_id"]

    # Read through the user-expanded path prepared by the constructor, so a
    # "~" in 'input_path' resolves the same way as for the other datasets.
    dataset = read_tfrecord_file(self._path)
    from smarts.waymo.waymo_open_dataset.protos import scenario_pb2

    # Loop over the scenarios in the TFRecord and check its ID for a match
    for record in dataset:
        parsed_scenario = scenario_pb2.Scenario()
        parsed_scenario.ParseFromString(bytes(record))
        if parsed_scenario.scenario_id == scenario_id:
            return parsed_scenario
    raise ValueError(
        f"Dataset file does not contain scenario with id: {scenario_id}"
    )
@property
def rows(self) -> Generator[Dict, None, None]:
    """Yield one row per valid state of every track in the scenario.

    Each track is handled in three passes: the raw states are extracted, rows
    whose timestamp is off the fixed-timestep grid are interpolated onto it
    using a valid neighbouring row, and finally the valid rows are emitted
    with "sim_time" converted to milliseconds.

    Raises:
        WaymoDatasetError: If a timestamp deviates from its expected grid
            time by a full timestep or more.
    """

    def lerp(a: float, b: float, t: float) -> float:
        return t * (b - a) + a

    def lerp_heading(h1: float, h2: float, t: float) -> float:
        # Interpolate along the shorter arc: when the two headings sit on
        # opposite sides of the 0 / 2*pi seam, lift the smaller one by a full
        # turn first, then fold the result back into [0, 2*pi).
        if h2 - h1 > math.pi:
            h1 += 2 * math.pi
        elif h1 - h2 > math.pi:
            h2 += 2 * math.pi
        return lerp(h1, h2, t) % (2 * math.pi)

    def interpolate(start_row: Dict, end_row: Dict, at_time: float) -> Dict:
        # Values at `at_time`, linearly interpolated between two rows.
        start_time = start_row["sim_time"]
        t = (at_time - start_time) / (end_row["sim_time"] - start_time)
        return {
            "sim_time": at_time,
            "speed": lerp(start_row["speed"], end_row["speed"], t),
            "position_x": lerp(start_row["position_x"], end_row["position_x"], t),
            "position_y": lerp(start_row["position_y"], end_row["position_y"], t),
            "heading_rad": lerp_heading(
                start_row["heading_rad"], end_row["heading_rad"], t
            ),
        }

    scenario = self._get_scenario()
    num_steps = len(scenario.timestamps_seconds)
    for i, track in enumerate(scenario.tracks):
        vehicle_id = track.id
        vehicle_type = self._lookup_agent_type(track.object_type)
        is_ego = 1 if i == scenario.sdc_track_index else 0

        # First pass -- extract data
        rows = []
        for j in range(num_steps):
            obj_state = track.states[j]
            vel = np.array([obj_state.velocity_x, obj_state.velocity_y])

            row = dict()
            row["valid"] = obj_state.valid
            row["vehicle_id"] = vehicle_id
            row["type"] = vehicle_type
            row["length"] = obj_state.length
            row["height"] = obj_state.height
            row["width"] = obj_state.width
            row["sim_time"] = scenario.timestamps_seconds[j]
            row["position_x"] = obj_state.center_x
            row["position_y"] = obj_state.center_y
            row["heading_rad"] = (obj_state.heading - math.pi / 2) % (2 * math.pi)
            row["speed"] = np.linalg.norm(vel)
            row["lane_id"] = 0
            row["is_ego_vehicle"] = is_ego
            rows.append(row)

        # Second pass -- align timesteps to 10 Hz and interpolate trajectory data if needed.
        # Results are kept apart from `rows` so that every interpolation
        # reads its neighbours' original (not yet aligned) values.
        interp_rows = [None] * num_steps
        for j, row in enumerate(rows):
            time_current = row["sim_time"]
            time_expected = round(j * self._dt_sec, 3)
            time_error = time_current - time_expected

            if round(abs(time_error), 1) >= self._dt_sec:
                raise WaymoDatasetError(
                    f"[{scenario.scenario_id}] Waymo data deviates by more than the size of 1 timestep. This likely indicates a gap in the dataset."
                )

            if not row["valid"] or time_error == 0:
                continue

            if time_error > 0:
                # We can't interpolate if the previous element doesn't exist or is invalid
                if j == 0 or not rows[j - 1]["valid"]:
                    continue
                # Interpolate backwards using previous timestep
                interp_rows[j] = interpolate(rows[j - 1], row, time_expected)
            else:
                # We can't interpolate if the next element doesn't exist or is invalid
                if j == num_steps - 1 or not rows[j + 1]["valid"]:
                    continue
                # Interpolate forwards using next timestep
                interp_rows[j] = interpolate(row, rows[j + 1], time_expected)

        # Third pass -- filter invalid states, replace interpolated values, convert to ms, constrain angles
        for row, interp_row in zip(rows, interp_rows):
            if not row["valid"]:
                continue
            if interp_row is not None:
                row.update(interp_row)
            row["sim_time"] *= 1000.0
            row["heading_rad"] = constrain_angle(row["heading_rad"])
            yield row
def _encode_tl_state(self, waymo_state) -> SignalLightState:
    """Translate a Waymo lane signal state into a `SignalLightState`.

    Arrow and flashing variants are expressed by OR-ing the base state with
    the ARROW or FLASHING flag. Anything unrecognized maps to UNKNOWN.
    """
    from smarts.waymo.waymo_open_dataset.protos.map_pb2 import (
        TrafficSignalLaneState,
    )

    lane = TrafficSignalLaneState
    light = SignalLightState
    # Checked in order; the first equal Waymo state wins.
    translations = (
        (lane.LANE_STATE_STOP, light.STOP),
        (lane.LANE_STATE_CAUTION, light.CAUTION),
        (lane.LANE_STATE_GO, light.GO),
        (lane.LANE_STATE_ARROW_STOP, light.STOP | light.ARROW),
        (lane.LANE_STATE_ARROW_CAUTION, light.CAUTION | light.ARROW),
        (lane.LANE_STATE_ARROW_GO, light.GO | light.ARROW),
        (lane.LANE_STATE_FLASHING_STOP, light.STOP | light.FLASHING),
        (lane.LANE_STATE_FLASHING_CAUTION, light.CAUTION | light.FLASHING),
    )
    for candidate, encoded in translations:
        if waymo_state == candidate:
            return encoded
    return light.UNKNOWN
@property
def traffic_light_rows(self) -> Generator[Dict, None, None]:
    """Yield one row per lane signal state at every timestep of the scenario.

    Each row has the keys "sim_time" (the timestamp multiplied by 1000),
    "state", "stop_point_x", "stop_point_y" and "lane".
    """
    scenario = self._get_scenario()
    for step, timestamp in enumerate(scenario.timestamps_seconds):
        dynamic_states = scenario.dynamic_map_states[step]
        sim_time = timestamp * 1000
        for lane_state in dynamic_states.lane_states:
            yield {
                "sim_time": sim_time,
                "state": self._encode_tl_state(lane_state.state).value,
                "stop_point_x": lane_state.stop_point.x,
                "stop_point_y": lane_state.stop_point.y,
                "lane": lane_state.lane,
            }
@staticmethod
def _lookup_agent_type(agent_type: int) -> int:
    """Translate a Waymo object type number into the numeric type code used here."""
    type_codes = {
        1: 2,  # car
        2: 4,  # pedestrian
        3: 4,  # cyclist
    }
    return type_codes.get(agent_type, 0)  # anything else: other
def column_val_in_row(self, row, col_name: str) -> Any:
    """Return the row's value for `col_name`.

    Raises:
        KeyError: If the row has no such column.
    """
    value = row[col_name]
    return value
[docs]class Argoverse(_TrajectoryDataset):
"""A tool for conversion of an Argoverse 2 dataset for use within SMARTS."""
def __init__(self, dataset_spec: Dict[str, Any], output: str):
    """Create an Argoverse converter; all setup is done by the base class.

    Args:
        dataset_spec: Conversion options.
        output: Path of the SQLite database file that will be written.
    """
    super().__init__(dataset_spec, output)
@property
def rows(self) -> Generator[Dict, None, None]:
try:
# pytype: disable=import-error
from av2.datasets.motion_forecasting.data_schema import (
ObjectType as AvObjectType,
)
from av2.datasets.motion_forecasting.scenario_serialization import (
load_argoverse_scenario_parquet,
)
# pytype: enable=import-error
except ImportError:
print(
"Missing dependencies for Argoverse. Install them using the command `pip install -e .[argoverse]` at the source directory."
)
ALLOWED_TYPES = frozenset(
{
AvObjectType.VEHICLE,
AvObjectType.PEDESTRIAN,
AvObjectType.MOTORCYCLIST,
AvObjectType.CYCLIST,
AvObjectType.BUS,
}
)
def _lookup_agent_type(agent_type: AvObjectType) -> int:
# See decode_vehicle_type in traffic_history.py
if agent_type == AvObjectType.MOTORCYCLIST:
return 1 # motorcycle
elif agent_type == AvObjectType.VEHICLE:
return 2 # passenger
elif agent_type == AvObjectType.BUS:
return 3 # truck
elif agent_type in {AvObjectType.PEDESTRIAN, AvObjectType.CYCLIST}:
return 4 # pedestrian/bicycle
else:
return 0 # other
input_dir = Path(self._dataset_spec["input_path"])
scenario_id = input_dir.stem
parquet_file = input_dir / f"scenario_{scenario_id}.parquet"
scenario = load_argoverse_scenario_parquet(parquet_file)
# Normalize to start at 0, and convert to milliseconds
timestamps = (scenario.timestamps_ns - scenario.timestamps_ns[0]) * 1e-6
# The ego vehicle has a string ID, so we need to give it a unique int ID
all_ids = [int(t.track_id) for t in scenario.tracks if t.track_id != "AV"]
ego_id = max(all_ids) + 1
for track in scenario.tracks:
# Only use dynamic objects
if track.object_type not in ALLOWED_TYPES:
continue
if track.track_id == "AV":
is_ego = 1
vehicle_id = ego_id
else:
is_ego = 0
vehicle_id = int(track.track_id)
vehicle_type = _lookup_agent_type(track.object_type)
for obj_state in track.object_states:
row = dict()
row["vehicle_id"] = vehicle_id
row["type"] = vehicle_type
row["sim_time"] = timestamps[obj_state.timestep]
row["position_x"] = obj_state.position[0]
row["position_y"] = obj_state.position[1]
row["heading_rad"] = constrain_angle(
(obj_state.heading - math.pi / 2) % (2 * math.pi)
)
row["speed"] = np.linalg.norm(np.array(obj_state.velocity))
row["lane_id"] = 0
row["is_ego_vehicle"] = is_ego
# Dimensions are not present in the Argoverse data. Setting these to 0
# means default values for each vehicle type will be used.
# See TrafficHistory.decode_vehicle_type().
row["length"] = 0
row["height"] = 0
row["width"] = 0
yield row
[docs] def column_val_in_row(self, row, col_name: str) -> Any:
return row[col_name]
def import_dataset(
    dataset_spec: sstypes.TrafficHistoryDataset,
    output_path: str,
    map_bbox: Optional[BoundingBox] = None,
):
    """Pre-process (import) a TrafficHistoryDataset for use by SMARTS.

    The result is written to ``<output_path>/<dataset_spec.name>.shf``; any
    file already at that location is replaced.

    Args:
        dataset_spec: Description of the dataset to import. A spec with no
            ``input_path`` is treated as a placeholder and skipped.
        output_path: Directory in which the output file is created.
        map_bbox: Optional map bounding box. When given, it is handed to the
            importer under the ``_map_bbox`` key, and ``filter_off_map`` must
            be set on the spec.

    Raises:
        ValueError: If ``dataset_spec.source_type`` is not a supported source.
    """
    if not dataset_spec.input_path:
        print(f"skipping placeholder dataset spec '{dataset_spec.name}'.")
        return

    # Resolve the importer before touching the filesystem, so an unsupported
    # source type cannot delete a previously generated output file.
    source = dataset_spec.source_type
    if source == "NGSIM":
        dataset_cls = NGSIM
    elif source == "INTERACTION":
        dataset_cls = Interaction
    elif source == "Waymo":
        dataset_cls = Waymo
    elif source == "Argoverse":
        dataset_cls = Argoverse
    else:
        raise ValueError(
            f"unsupported TrafficHistoryDataset type: {dataset_spec.source_type}"
        )

    # Work on a copy: writing into the spec's own __dict__ would attach
    # `_map_bbox` to the caller's object and leak it into later imports.
    dataset_dict = dict(dataset_spec.__dict__)
    if map_bbox:
        assert (
            dataset_spec.filter_off_map
        ), "map_bbox is only meaningful when filter_off_map is set"
        dataset_dict["_map_bbox"] = map_bbox

    output = os.path.join(output_path, f"{dataset_spec.name}.shf")
    if os.path.exists(output):
        os.remove(output)

    dataset = dataset_cls(dataset_dict, output)
    dataset.create_output()
def _check_args(args) -> bool:
    """Report whether the requested output path may be written.

    Writing is refused only when the output file already exists and the
    ``force`` flag is not set; in that case a notice is printed.
    """
    blocked = not args.force and os.path.exists(args.output)
    if blocked:
        print("output file already exists\n")
    return not blocked
# Command-line entry point: import the dataset described by a YAML file into
# a SMARTS traffic history file.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--x_offset", help="X offset of map", type=float)
    parser.add_argument("--y_offset", help="Y offset of map", type=float)
    parser.add_argument(
        "--force",
        "-f",
        help="Force overwriting output file if it already exists",
        action="store_true",
    )
    parser.add_argument(
        "dataset",
        type=str,
        help="""Path to YAML file describing trajectories dataset. YAML file should correspond with types.TrafficHistoryDataset fields.""",
    )
    parser.add_argument(
        "output", type=str, help="SMARTS traffic history file to create"
    )
    args = parser.parse_args()

    if not _check_args(args):
        parser.print_usage()
        sys.exit(-1)

    if args.force and os.path.exists(args.output):
        os.remove(args.output)

    # Imported here so that yaml is only needed for command-line usage.
    import yaml

    with open(args.dataset, "r") as yf:
        dataset_spec = yaml.safe_load(yf)["trajectory_dataset"]

    if not dataset_spec.get("input_path"):
        print(f"skipping placeholder dataset spec at {args.dataset}.")
        sys.exit(0)

    if dataset_spec.get("filter_off_map", False) or dataset_spec.get("flip_y", False):
        print(
            f"cannot use 'filter_off_map' or 'flip_y' as specified in {args.dataset} in command-line usage"
        )
        sys.exit(-1)

    # Compare against None rather than truthiness so that an explicit
    # `--x_offset 0` still overrides an offset given in the YAML file.
    if args.x_offset is not None:
        dataset_spec["x_offset"] = args.x_offset
    if args.y_offset is not None:
        dataset_spec["y_offset"] = args.y_offset

    source = dataset_spec.get("source_type", "NGSIM")
    if source == "NGSIM":
        dataset = NGSIM(dataset_spec, args.output)
    elif source == "Waymo":
        dataset = Waymo(dataset_spec, args.output)
    elif source == "Argoverse":
        # Previously fell through to the Interaction importer by mistake.
        dataset = Argoverse(dataset_spec, args.output)
    else:
        # Any other source type keeps using the Interaction importer.
        dataset = Interaction(dataset_spec, args.output)

    dataset.create_output()