"""Parser for OpenDRIVE (.xodr) files."""
import abc
from collections import defaultdict
import itertools
import math
import warnings
import xml.etree.ElementTree as ET
import numpy as np
from scipy.integrate import quad, solve_ivp
from scipy.optimize import brentq
from shapely.geometry import GeometryCollection, MultiPoint, MultiPolygon, Point, Polygon
from shapely.ops import snap, unary_union
from scenic.core.geometry import (
averageVectors,
cleanChain,
cleanPolygon,
plotPolygon,
polygonUnion,
removeHoles,
)
from scenic.core.regions import PolygonalRegion, PolylineRegion
from scenic.core.vectors import Vector
from scenic.domains.driving import roads as roadDomain
class OpenDriveWarning(UserWarning):
pass
def warn(message):
warnings.warn(message, OpenDriveWarning, stacklevel=2)
def buffer_union(polys, tolerance=0.01):
return polygonUnion(polys, buf=tolerance, tolerance=tolerance)
[docs]class Poly3:
"""Cubic polynomial."""
def __init__(self, a, b, c, d):
self.a = a
self.b = b
self.c = c
self.d = d
def eval_at(self, x):
return self.a + self.b * x + self.c * x**2 + self.d * x**3
def grad_at(self, x):
return self.b + 2 * self.c * x + 3 * self.d * x**2
[docs]class Curve:
"""Geometric elements which compose road reference lines.
See the OpenDRIVE Format Specification for coordinate system details."""
def __init__(self, x0, y0, hdg, length):
self.x0 = x0
self.y0 = y0
self.hdg = hdg # In radians counterclockwise, 0 at positive x-axis.
self.cos_hdg, self.sin_hdg = math.cos(hdg), math.sin(hdg)
self.length = length
[docs] def to_points(self, num, extra_points=[]):
"""Sample NUM evenly-spaced points from curve.
Points are tuples of (x, y, s) with (x, y) absolute coordinates
and s the arc length along the curve. Additional points at s values in
extra_points are included if they are contained in the curve (unless
they are extremely close to one of the equally-spaced points).
"""
s_vals = []
extras = itertools.chain(extra_points, itertools.repeat(float("inf")))
next_extra = next(extras)
last_s = 0
for s in np.linspace(0, self.length, num=num):
while next_extra <= s:
if last_s + 1e-6 < next_extra < s - 1e-6:
s_vals.append(next_extra)
next_extra = next(extras)
s_vals.append(s)
last_s = s
return [self.point_at(s) for s in s_vals]
[docs] @abc.abstractmethod
def point_at(self, s):
"""Get an (x, y, s) point along the curve at the given s coordinate."""
return
[docs] def rel_to_abs(self, point):
"""Convert from relative coordinates of curve to absolute coordinates.
I.e. rotate counterclockwise by self.hdg and translate by (x0, x1)."""
x, y, s = point
return (
self.x0 + self.cos_hdg * x - self.sin_hdg * y,
self.y0 + self.sin_hdg * x + self.cos_hdg * y,
s,
)
[docs]class Cubic(Curve):
"""A curve defined by the cubic polynomial a + bu + cu^2 + du^3.
The curve starts at (X0, Y0) in direction HDG, with length LENGTH."""
def __init__(self, x0, y0, hdg, length, a, b, c, d):
super().__init__(x0, y0, hdg, length)
self.poly = Poly3(a, b, c, d)
# Crude upper bound for u (used to bracket u for a given s in point_at)
if d != 0:
self.ubound = max(
2 * abs(c / d), abs(b / d) ** 0.5, (3 * length / abs(d)) ** (1 / 3)
)
elif c != 0:
self.ubound = max(abs(b / c), (2 * length / abs(c)) ** 0.5)
else:
self.ubound = length / abs(b)
def arclength(self, u):
d_arc = lambda x: np.sqrt(1 + self.poly.grad_at(x) ** 2)
return quad(d_arc, 0, u)[0]
def point_at(self, s):
# Use Brent's method to find parameter u corresponding to arclength s;
# (N.B. Brent's method proved to be faster than Newton's and has no potential
# convergence issues.)
root_func = lambda x: self.arclength(x) - s
u = float(brentq(root_func, 0, self.ubound))
pt = (s, self.poly.eval_at(u), s)
return self.rel_to_abs(pt)
[docs]class ParamCubic(Curve):
"""A curve defined by the parametric equations
u = a_u + b_up + c_up^2 + d_up^3,
v = a_v + b_vp + c_vp^2 + d_up^3,
with p in [0, p_range].
The curve starts at (X0, Y0) in direction HDG, with length LENGTH."""
def __init__(self, x0, y0, hdg, length, au, bu, cu, du, av, bv, cv, dv, p_range=1):
super().__init__(x0, y0, hdg, length)
self.u_poly = Poly3(au, bu, cu, du)
self.v_poly = Poly3(av, bv, cv, dv)
self.p_range = p_range if p_range else 1
def arclength(self, p):
d_arc = lambda x: math.hypot(self.u_poly.grad_at(x), self.v_poly.grad_at(x))
return quad(d_arc, 0, p)[0]
def point_at(self, s):
root_func = lambda x: self.arclength(x) - s
p = float(brentq(root_func, 0, self.p_range))
pt = (self.u_poly.eval_at(p), self.v_poly.eval_at(p), s)
return self.rel_to_abs(pt)
[docs]class Clothoid(Curve):
"""An Euler spiral with curvature varying linearly between CURV0 and CURV1.
The spiral starts at (X0, Y0) in direction HDG, with length LENGTH."""
def __init__(self, x0, y0, hdg, length, curv0, curv1):
super().__init__(x0, y0, hdg, length)
# Initial and final curvature.
self.curv0 = curv0
self.curv1 = curv1
self.curve_rate = (curv1 - curv0) / length
self.a = abs(curv0)
self.r = 1 / self.a if curv0 != 0 else 1 # value not used if curv0 == 0
self.ode_init = np.array([x0, y0, hdg])
def point_at(self, s):
# Generate a origin-centered clothoid with zero curvature at origin,
# then translate/rotate the relevant segment.
# Arcs are just a degenerate clothoid:
if self.curv0 == self.curv1:
if self.curv0 == 0:
pt = (s, 0, s)
else:
r = self.r
th = s * self.a
if self.curv0 > 0:
pt = (r * math.sin(th), r - r * math.cos(th), s)
else:
pt = (r * math.sin(th), -r + r * math.cos(th), s)
return self.rel_to_abs(pt)
else:
def clothoid_ode(s, state):
x, y, theta = state
return np.array(
[math.cos(theta), math.sin(theta), self.curv0 + (self.curve_rate * s)]
)
sol = solve_ivp(clothoid_ode, (0, s), self.ode_init)
x, y, hdg = sol.y[:, -1]
return (x, y, s)
[docs]class Line(Curve):
"""A line segment between (x0, y0) and (x1, y1)."""
def __init__(self, x0, y0, hdg, length):
super().__init__(x0, y0, hdg, length)
# Endpoints of line.
self.x1 = x0 + length * math.cos(hdg)
self.y1 = y0 + length * math.sin(hdg)
def point_at(self, s):
return self.rel_to_abs((s, 0, s))
class Lane:
def __init__(self, id_, type_, pred=None, succ=None):
self.id_ = id_
self.width = [] # List of tuples (Poly3, int) for width and s-offset.
self.type_ = type_
self.pred = pred
self.succ = succ
self.left_bounds = [] # to be filled in later
self.right_bounds = []
self.centerline = []
self.parent_lane_poly = None
def width_at(self, s):
# S here is relative to start of LaneSection this lane is in.
ind = 0
while ind + 1 < len(self.width) and self.width[ind + 1][1] <= s:
ind += 1
assert self.width[ind][1] <= s, "No matching width entry found."
w_poly, s_off = self.width[ind]
w = w_poly.eval_at(s - s_off)
if w < -1e-6: # allow for numerical error
raise RuntimeError("OpenDRIVE lane has negative width")
return max(w, 0)
class LaneSection:
def __init__(self, s0, left_lanes={}, right_lanes={}):
self.s0 = s0
self.left_lanes = left_lanes
self.right_lanes = right_lanes
self.left_lane_ids = sorted(self.left_lanes.keys())
self.right_lane_ids = sorted(self.right_lanes.keys(), reverse=True)
self.lanes = dict(list(left_lanes.items()) + list(right_lanes.items()))
def get_lane(self, id_):
if id_ in self.left_lanes:
return self.left_lanes[id_]
elif id_ in self.right_lanes:
return self.right_lanes[id_]
elif id_ == 0:
return Lane(0, "none")
else:
raise RuntimeError("Lane with id", id_, "not found")
def get_offsets(self, s):
"""Returns dict of lane id and offset from
reference line of lane boundary at coordinate S along line.
By convention, left lanes have positive width offset and right lanes
have negative."""
assert s >= self.s0, "Input s is before lane start position."
offsets = {}
for lane_id in self.left_lane_ids:
if lane_id - 1 in self.left_lane_ids:
offsets[lane_id] = offsets[lane_id - 1] + self.left_lanes[
lane_id
].width_at(s - self.s0)
else:
offsets[lane_id] = self.left_lanes[lane_id].width_at(s - self.s0)
for lane_id in self.right_lane_ids:
if lane_id + 1 in self.right_lane_ids:
offsets[lane_id] = offsets[lane_id + 1] - self.right_lanes[
lane_id
].width_at(s - self.s0)
else:
offsets[lane_id] = -self.right_lanes[lane_id].width_at(s - self.s0)
return offsets
[docs]class RoadLink:
"""Indicates Roads a and b, with ids id_a and id_b respectively, are connected."""
def __init__(self, id_a, id_b, contact_a, contact_b):
self.id_a = id_a
self.id_b = id_b
# contact_a and contact_b should be of value "start" or "end"
# and indicate which end of each road is connected to the other.
self.contact_a = contact_a
self.contact_b = contact_b
class Junction:
class Connection:
def __init__(self, incoming_id, connecting_id, connecting_contact, lane_links):
self.incoming_id = incoming_id
# id of connecting road
self.connecting_id = connecting_id
# contact point ('start' or 'end') on connecting road
self.connecting_contact = connecting_contact
# dict mapping incoming to connecting lane ids (empty = identity mapping)
self.lane_links = lane_links
def __init__(self, id_, name):
self.id_ = id_
self.name = name
self.connections = []
# Ids of roads that are paths within junction:
self.paths = []
self.poly = None
def add_connection(self, incoming_id, connecting_id, connecting_contact, lane_links):
conn = Junction.Connection(
incoming_id, connecting_id, connecting_contact, lane_links
)
self.connections.append(conn)
class Road:
def __init__(self, name, id_, length, junction, drive_on_right=True):
self.name = name
self.id_ = id_
self.length = length
self.junction = junction if junction != "-1" else None
self.predecessor = None
self.successor = None
self.signals = [] # List of Signal objects.
self.lane_secs = [] # List of LaneSection objects.
self.ref_line = [] # List of Curve objects defining reference line.
# NOTE: sec_points, sec_polys, sec_lane_polys should be ordered according to lane_secs.
self.sec_points = [] # List of lists of points, one for each LaneSection.
self.sec_polys = [] # List of Polygons, one for each LaneSections.
# List of dict of lane id to Polygon for each LaneSection.
self.sec_lane_polys = []
# List of lane polygons. Not a dict b/c lane id is not unique along road.
self.lane_polys = []
# Each polygon in lane_polys is the union of connected lane section polygons.
# lane_polys is currently not used.
# Reference line offset:
self.offset = [] # List of tuple (Poly3, s-coordinate).
self.drive_on_right = drive_on_right
# Used to fill in gaps between roads:
self.start_bounds_left = {}
self.start_bounds_right = {}
self.end_bounds_left = {}
self.end_bounds_right = {}
self.remappedStartLanes = None # hack for handling spurious initial lane sections
def get_ref_line_offset(self, s):
if not self.offset:
return 0
ind = 0
while ind + 1 < len(self.offset) and self.offset[ind + 1][1] <= s:
ind += 1
poly, s0 = self.offset[ind]
assert s >= s0
return poly.eval_at(s - s0)
def get_ref_points(self, num):
"""Returns list of list of points for each piece of ref_line.
List of list structure necessary because each piece needs to be
constructed into Polygon separately then unioned afterwards to avoid
self-intersecting lines."""
ref_points = []
transition_points = [sec.s0 for sec in self.lane_secs[1:]]
last_s = 0
for piece in self.ref_line:
piece_points = piece.to_points(num, extra_points=transition_points)
assert piece_points, "Failed to get piece points"
if ref_points:
last_s = ref_points[-1][-1][2]
piece_points = [(p[0], p[1], p[2] + last_s) for p in piece_points]
ref_points.append(piece_points)
transition_points = [s - last_s for s in transition_points if s > last_s]
return ref_points
def heading_at(self, point):
# Convert point to shapely Point.
point = Point(point.x, point.y)
for i in range(len(self.lane_secs)):
ref_points = self.sec_points[i]
poly = self.sec_polys[i]
if point.within(poly.buffer(1)):
lane_id = None
for id_ in self.sec_lane_polys[i].keys():
if point.within(self.sec_lane_polys[i][id_].buffer(1)):
lane_id = id_
break
assert lane_id is not None, "Point not found in sec_lane_polys."
min_dist = float("inf")
for i in range(len(ref_points)):
cur_point = Point(ref_points[i][0], ref_points[i][1])
if point.distance(cur_point) < min_dist:
closest_idx = i
if closest_idx >= len(ref_points) - 1:
closest_idx = len(ref_points) - 2
dy = ref_points[closest_idx + 1][1] - ref_points[closest_idx][1]
dx = ref_points[closest_idx + 1][0] - ref_points[closest_idx][0]
heading = math.atan2(dy, dx)
# Right lanes have negative lane_id.
# Flip heading if drive_on_right XOR right lane.
if self.drive_on_right != (lane_id < 0):
heading += math.pi
# Heading 0 is defined differently between OpenDrive and Scenic(?)
heading -= math.pi / 2
return (heading + math.pi) % (2 * math.pi) - math.pi
raise RuntimeError("Point not found in piece_polys")
def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False):
"""Given a list of lane types, returns a tuple of:
- List of lists of points along the reference line, with same indexing as self.lane_secs
- List of region polygons, with same indexing as self.lane_secs
- List of dictionary of lane id to polygon, with same indexing as self.lane_secs
- List of polygons for each lane (not necessarily by id, but respecting lane successor/predecessor)
- Polygon for entire region.
If calc_gap=True, fills in gaps between connected roads. This is fairly expensive.
"""
road_polygons = []
ref_points = self.get_ref_points(num)
self.ref_line_points = list(itertools.chain.from_iterable(ref_points))
cur_lane_polys = {}
sec_points = []
sec_polys = []
sec_lane_polys = []
lane_polys = []
last_lefts = None
last_rights = None
cur_p = None
for i in range(len(self.lane_secs)):
cur_sec = self.lane_secs[i]
cur_sec_points = []
if i < len(self.lane_secs) - 1:
next_sec = self.lane_secs[i + 1]
s_stop = next_sec.s0
else:
s_stop = float("inf")
left_bounds = defaultdict(list)
right_bounds = defaultdict(list)
cur_sec_lane_polys = defaultdict(list)
cur_sec_polys = []
end_of_sec = False
while ref_points and not end_of_sec:
if not ref_points[0]:
ref_points.pop(0)
if not ref_points or (cur_p and cur_p[2] >= s_stop):
# Case 1: We have processed the entire reference line.
# Case 2: The s-coordinate has exceeded s_stop, so we should move
# onto the next LaneSection.
# Either way, we collect all the bound points so far into polygons.
end_of_sec = True
cur_last_lefts = {}
cur_last_rights = {}
for id_ in left_bounds:
# Polygon for piece of lane:
left = left_bounds[id_]
right = right_bounds[id_][::-1]
bounds = left + right
if len(bounds) < 3:
continue
poly = cleanPolygon(Polygon(bounds), tolerance)
if not poly.is_empty:
if poly.geom_type == "MultiPolygon":
poly = MultiPolygon(
[
p
for p in poly.geoms
if not p.is_empty and p.exterior
]
)
cur_sec_polys.extend(poly.geoms)
else:
cur_sec_polys.append(poly)
cur_sec_lane_polys[id_].append(poly)
cur_last_lefts[id_] = left_bounds[id_][-1]
cur_last_rights[id_] = right_bounds[id_][-1]
if i == 0 or not self.start_bounds_left:
self.start_bounds_left[id_] = left_bounds[id_][0]
self.start_bounds_right[id_] = right_bounds[id_][0]
left_bounds = defaultdict(list)
right_bounds = defaultdict(list)
if cur_last_lefts and cur_last_rights:
last_lefts = cur_last_lefts
last_rights = cur_last_rights
else:
cur_p = ref_points[0][0]
cur_sec_points.append(cur_p)
s = min(max(cur_p[2], cur_sec.s0), s_stop - 1e-6)
offsets = cur_sec.get_offsets(s)
offsets[0] = 0
for id_ in offsets:
offsets[id_] += self.get_ref_line_offset(s)
if len(ref_points[0]) > 1:
next_p = ref_points[0][1]
tan_vec = (next_p[0] - cur_p[0], next_p[1] - cur_p[1])
else:
if len(cur_sec_points) >= 2:
prev_p = cur_sec_points[-2]
else:
assert len(sec_points) > 0
if sec_points[-1]:
assert sec_points[-1][-1] == cur_p
prev_p = sec_points[-1][-2]
else:
prev_p = sec_points[-2][-2]
tan_vec = (cur_p[0] - prev_p[0], cur_p[1] - prev_p[1])
tan_norm = math.hypot(tan_vec[0], tan_vec[1])
assert tan_norm > 1e-10
normal_vec = (-tan_vec[1] / tan_norm, tan_vec[0] / tan_norm)
if cur_p[2] < s_stop:
# if at end of section, keep current point to be included in
# the next section as well; otherwise remove it
ref_points[0].pop(0)
elif len(ref_points[0]) == 1 and len(ref_points) > 1:
# also get rid of point if this is the last point of the current geometry and
# and there is another geometry following
ref_points[0].pop(0)
for id_ in offsets:
lane = cur_sec.get_lane(id_)
if lane.type_ in lane_types:
if id_ > 0:
prev_id = id_ - 1
else:
prev_id = id_ + 1
left_bound = [
cur_p[0] + normal_vec[0] * offsets[id_],
cur_p[1] + normal_vec[1] * offsets[id_],
]
right_bound = [
cur_p[0] + normal_vec[0] * offsets[prev_id],
cur_p[1] + normal_vec[1] * offsets[prev_id],
]
if id_ < 0:
left_bound, right_bound = right_bound, left_bound
halfway = (offsets[id_] + offsets[prev_id]) / 2
centerline = [
cur_p[0] + normal_vec[0] * halfway,
cur_p[1] + normal_vec[1] * halfway,
]
left_bounds[id_].append(left_bound)
right_bounds[id_].append(right_bound)
lane.left_bounds.append(left_bound)
lane.right_bounds.append(right_bound)
lane.centerline.append(centerline)
assert len(cur_sec_points) >= 2, i
sec_points.append(cur_sec_points)
sec_polys.append(buffer_union(cur_sec_polys, tolerance=tolerance))
for id_ in cur_sec_lane_polys:
poly = buffer_union(cur_sec_lane_polys[id_], tolerance=tolerance)
cur_sec_lane_polys[id_] = poly
cur_sec.get_lane(id_).poly = poly
sec_lane_polys.append(dict(cur_sec_lane_polys))
next_lane_polys = {}
for id_ in cur_sec_lane_polys:
pred_id = cur_sec.get_lane(id_).pred
if pred_id and pred_id in cur_lane_polys:
next_lane_polys[id_] = cur_lane_polys.pop(pred_id) + [
cur_sec_lane_polys[id_]
]
else:
next_lane_polys[id_] = [cur_sec_lane_polys[id_]]
for id_ in cur_lane_polys:
poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance)
self.lane_secs[i - 1].get_lane(id_).parent_lane_poly = len(lane_polys)
lane_polys.append(poly)
cur_lane_polys = next_lane_polys
for id_ in cur_lane_polys:
poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance)
cur_sec.get_lane(id_).parent_lane_poly = len(lane_polys)
lane_polys.append(poly)
union_poly = buffer_union(sec_polys, tolerance=tolerance)
if last_lefts and last_rights:
self.end_bounds_left.update(last_lefts)
self.end_bounds_right.update(last_rights)
# Difference and slightly erode all overlapping polygons
for i in range(len(sec_polys) - 1):
if sec_polys[i].overlaps(sec_polys[i + 1]):
sec_polys[i] = sec_polys[i].difference(sec_polys[i + 1]).buffer(-1e-6)
assert not sec_polys[i].overlaps(sec_polys[i + 1])
for polys in sec_lane_polys:
ids = sorted(polys) # order adjacent lanes consecutively
for i in range(len(ids) - 1):
polyA, polyB = polys[ids[i]], polys[ids[i + 1]]
if polyA.overlaps(polyB):
polys[ids[i]] = polyA.difference(polyB).buffer(-1e-6)
assert not polys[ids[i]].overlaps(polyB)
for i in range(len(lane_polys) - 1):
if lane_polys[i].overlaps(lane_polys[i + 1]):
lane_polys[i] = lane_polys[i].difference(lane_polys[i + 1]).buffer(-1e-6)
assert not lane_polys[i].overlaps(lane_polys[i + 1])
# Set parent lane polygon references to corrected polygons
for sec in self.lane_secs:
for lane in sec.lanes.values():
parentIndex = lane.parent_lane_poly
if isinstance(parentIndex, int):
lane.parent_lane_poly = lane_polys[parentIndex]
return (sec_points, sec_polys, sec_lane_polys, lane_polys, union_poly)
def calculate_geometry(
self,
num,
tolerance,
calc_gap,
drivable_lane_types,
sidewalk_lane_types,
shoulder_lane_types,
):
# Note: this also calculates self.start_bounds_left, self.start_bounds_right,
# self.end_bounds_left, self.end_bounds_right
(
self.sec_points,
self.sec_polys,
self.sec_lane_polys,
self.lane_polys,
self.drivable_region,
) = self.calc_geometry_for_type(
drivable_lane_types, num, tolerance, calc_gap=calc_gap
)
for i, sec in enumerate(self.lane_secs):
sec.drivable_lanes = {}
sec.sidewalk_lanes = {}
sec.shoulder_lanes = {}
for id_, lane in sec.lanes.items():
ty = lane.type_
if ty in drivable_lane_types:
sec.drivable_lanes[id_] = lane
elif ty in sidewalk_lane_types:
sec.sidewalk_lanes[id_] = lane
elif ty in shoulder_lane_types:
sec.shoulder_lanes[id_] = lane
if not sec.drivable_lanes:
continue
rightmost = None
for id_ in itertools.chain(reversed(sec.right_lane_ids), sec.left_lane_ids):
if id_ in sec.drivable_lanes:
rightmost = sec.lanes[id_]
break
assert rightmost is not None, i
leftmost = None
for id_ in itertools.chain(reversed(sec.left_lane_ids), sec.right_lane_ids):
if id_ in sec.drivable_lanes:
leftmost = sec.lanes[id_]
break
assert leftmost is not None, i
sec.left_edge = leftmost.left_bounds
assert len(sec.left_edge) >= 2
sec.right_edge = rightmost.right_bounds
assert len(sec.right_edge) >= 2
_, _, _, _, self.sidewalk_region = self.calc_geometry_for_type(
sidewalk_lane_types, num, tolerance, calc_gap=calc_gap
)
_, _, _, _, self.shoulder_region = self.calc_geometry_for_type(
shoulder_lane_types, num, tolerance, calc_gap=calc_gap
)
def toScenicRoad(self, tolerance):
assert self.sec_points
allElements = []
# Create lane and road sections
roadSections = []
last_section = None
sidewalkSections = defaultdict(list)
shoulderSections = defaultdict(list)
for sec, pts, sec_poly, lane_polys in zip(
self.lane_secs, self.sec_points, self.sec_polys, self.sec_lane_polys
):
pts = [pt[:2] for pt in pts] # drop s coordinate
assert sec.drivable_lanes
laneSections = {}
for id_, lane in sec.drivable_lanes.items():
succ = None # will set this later
if last_section and lane.pred:
if lane.pred in last_section.lanesByOpenDriveID:
pred = last_section.lanesByOpenDriveID[lane.pred]
else:
warn(
f"road {self.id_} section {len(roadSections)} "
f"lane {id_} has a non-drivable predecessor"
)
pred = None
else:
pred = lane.pred # will correct inter-road links later
left, center, right = lane.left_bounds, lane.centerline, lane.right_bounds
if id_ > 0: # backward lane
left, center, right = right[::-1], center[::-1], left[::-1]
succ, pred = pred, succ
section = roadDomain.LaneSection(
id=f"road{self.id_}_sec{len(roadSections)}_lane{id_}",
polygon=lane_polys[id_],
centerline=PolylineRegion(cleanChain(center)),
leftEdge=PolylineRegion(cleanChain(left)),
rightEdge=PolylineRegion(cleanChain(right)),
successor=succ,
predecessor=pred,
lane=None, # will set these later
group=None,
road=None,
openDriveID=id_,
isForward=id_ < 0,
)
section._original_lane = lane
laneSections[id_] = section
allElements.append(section)
section = roadDomain.RoadSection(
id=f"road{self.id_}_sec{len(roadSections)}",
polygon=sec_poly,
centerline=PolylineRegion(cleanChain(pts)),
leftEdge=PolylineRegion(cleanChain(sec.left_edge)),
rightEdge=PolylineRegion(cleanChain(sec.right_edge)),
successor=None,
predecessor=last_section,
road=None, # will set later
lanesByOpenDriveID=laneSections,
)
roadSections.append(section)
allElements.append(section)
last_section = section
for id_, lane in sec.sidewalk_lanes.items():
sidewalkSections[id_].append(lane)
for id_, lane in sec.shoulder_lanes.items():
shoulderSections[id_].append(lane)
# Build sidewalks and shoulders
# TODO improve this!
forwardSidewalks, backwardSidewalks = [], []
forwardShoulders, backwardShoulders = [], []
for id_ in sidewalkSections:
(forwardSidewalks if id_ < 0 else backwardSidewalks).append(id_)
for id_ in shoulderSections:
(forwardShoulders if id_ < 0 else backwardShoulders).append(id_)
def combineSections(laneIDs, sections, name):
leftmost, rightmost = max(laneIDs), min(laneIDs)
if len(laneIDs) != leftmost - rightmost + 1:
warn(f"ignoring {name} in the middle of road {self.id_}")
leftPoints, rightPoints = [], []
if leftmost < 0:
leftmost = rightmost
while leftmost + 1 in laneIDs:
leftmost = leftmost + 1
leftSecs, rightSecs = sections[leftmost], sections[rightmost]
for leftSec, rightSec in zip(leftSecs, rightSecs):
leftPoints.extend(leftSec.left_bounds)
rightPoints.extend(rightSec.right_bounds)
else:
rightmost = leftmost
while rightmost - 1 in laneIDs:
rightmost = rightmost - 1
leftSecs = reversed(sections[leftmost])
rightSecs = reversed(sections[rightmost])
for leftSec, rightSec in zip(leftSecs, rightSecs):
leftPoints.extend(reversed(rightSec.right_bounds))
rightPoints.extend(reversed(leftSec.left_bounds))
leftEdge = PolylineRegion(cleanChain(leftPoints))
rightEdge = PolylineRegion(cleanChain(rightPoints))
# Heuristically create some kind of reasonable centerline
if len(leftPoints) == len(rightPoints):
centerPoints = list(
averageVectors(l, r) for l, r in zip(leftPoints, rightPoints)
)
else:
num = max(len(leftPoints), len(rightPoints))
centerPoints = []
for d in np.linspace(0, 1, num):
l = leftEdge.lineString.interpolate(d, normalized=True)
r = rightEdge.lineString.interpolate(d, normalized=True)
centerPoints.append(averageVectors(l.coords[0], r.coords[0]))
centerline = PolylineRegion(cleanChain(centerPoints))
allPolys = (
sec.poly
for id_ in range(rightmost, leftmost + 1)
for sec in sections[id_]
)
union = buffer_union(allPolys, tolerance=tolerance)
id_ = f"road{self.id_}_{name}({leftmost},{rightmost})"
return id_, union, centerline, leftEdge, rightEdge
def makeSidewalk(laneIDs):
if not laneIDs:
return None
id_, union, centerline, leftEdge, rightEdge = combineSections(
laneIDs, sidewalkSections, "sidewalk"
)
sidewalk = roadDomain.Sidewalk(
id=id_,
polygon=union,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
road=None,
crossings=(), # TODO add crosswalks
)
allElements.append(sidewalk)
return sidewalk
forwardSidewalk = makeSidewalk(forwardSidewalks)
backwardSidewalk = makeSidewalk(backwardSidewalks)
def makeShoulder(laneIDs):
if not laneIDs:
return None
id_, union, centerline, leftEdge, rightEdge = combineSections(
laneIDs, shoulderSections, "shoulder"
)
shoulder = roadDomain.Shoulder(
id=id_,
polygon=union,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
road=None,
)
allElements.append(shoulder)
return shoulder
forwardShoulder = makeShoulder(forwardShoulders)
backwardShoulder = makeShoulder(backwardShoulders)
# Connect sections to their successors
next_section = None
for sec, section in reversed(list(zip(self.lane_secs, roadSections))):
if next_section is None:
next_section = section
for id_, lane in sec.drivable_lanes.items():
newLane = section.lanesByOpenDriveID[id_]
if newLane.isForward:
# will correct inter-road links later
newLane._successor = lane.succ
else:
newLane._predecessor = lane.succ
continue
section._successor = next_section
for id_, lane in sec.drivable_lanes.items():
newLane = section.lanesByOpenDriveID[id_]
if newLane.isForward:
newLane._successor = next_section.lanesByOpenDriveID.get(lane.succ)
else:
newLane._predecessor = next_section.lanesByOpenDriveID.get(lane.succ)
next_section = section
# Connect lane sections to adjacent lane sections
for section in roadSections:
lanes = section.lanesByOpenDriveID
for id_, lane in lanes.items():
if id_ < -1:
leftID = id_ + 1
elif id_ == -1:
leftID = 1
elif id_ == 1:
leftID = -1
else:
leftID = id_ - 1
rightID = id_ - 1 if id_ < 0 else id_ + 1
lane._laneToLeft = lanes.get(leftID)
lane._laneToRight = lanes.get(rightID)
if self.drive_on_right:
lane._fasterLane = lane._laneToLeft
lane._slowerLane = lane._laneToRight
else:
lane._slowerLane = lane._laneToLeft
lane._fasterLane = lane._laneToRight
if lane._fasterLane and lane._fasterLane.isForward != lane.isForward:
lane._fasterLane = None
if lane._slowerLane and lane._slowerLane.isForward != lane.isForward:
lane._slowerLane = None
adj = []
if lane._laneToLeft:
adj.append(lane._laneToLeft)
if lane._laneToRight:
adj.append(lane._laneToRight)
lane.adjacentLanes = tuple(adj)
# Gather lane sections into lanes
nextID = 0
forwardLanes, backwardLanes = [], []
for roadSection in roadSections:
for laneSection in roadSection.lanes:
laneSection._visited = False
for roadSection, sec in zip(roadSections, self.lane_secs):
for laneSection in roadSection.lanes:
if not laneSection._visited: # start of new lane
forward = laneSection.isForward
sections = []
successorLane = None # lane this one will merge into
while True:
sections.append(laneSection)
laneSection._visited = True
assert laneSection.isForward == forward
if forward:
nextSection = laneSection._successor
else:
nextSection = laneSection._predecessor
if not nextSection or not isinstance(
nextSection, roadDomain.LaneSection
):
break
elif nextSection._visited:
successorLane = nextSection.lane
break
laneSection = nextSection
ls = laneSection._original_lane
assert ls.parent_lane_poly
if not forward:
sections = tuple(reversed(sections))
leftPoints, rightPoints, centerPoints = [], [], []
for section in sections:
leftPoints.extend(section.leftEdge.points)
rightPoints.extend(section.rightEdge.points)
centerPoints.extend(section.centerline.points)
leftEdge = PolylineRegion(cleanChain(leftPoints))
rightEdge = PolylineRegion(cleanChain(rightPoints))
centerline = PolylineRegion(cleanChain(centerPoints))
lane = roadDomain.Lane(
id=f"road{self.id_}_lane{nextID}",
polygon=ls.parent_lane_poly,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
group=None,
road=None,
sections=tuple(sections),
successor=successorLane, # will correct inter-road links later
)
nextID += 1
for section in sections:
section.lane = lane
(forwardLanes if forward else backwardLanes).append(lane)
allElements.append(lane)
lanes = forwardLanes + backwardLanes
assert lanes
# Compute lane adjacencies
for lane in lanes:
adj = []
for section in lane.sections:
adj.extend(sec.lane for sec in section.adjacentLanes)
lane.adjacentLanes = tuple(adj)
# Create lane groups
def getEdges(forward):
if forward:
sec = roadSections[0]
startLanes = sec.forwardLanes
else:
sec = roadSections[-1]
startLanes = sec.backwardLanes
leftPoints = []
current = startLanes[-1] # get leftmost lane of the first section
while current and isinstance(current, roadDomain.LaneSection):
if current._laneToLeft and current._laneToLeft.isForward == forward:
current = current._laneToLeft
leftPoints.extend(current.leftEdge.points)
current = current._successor
leftEdge = PolylineRegion(cleanChain(leftPoints))
rightPoints = []
current = startLanes[0] # get rightmost lane of the first section
while current and isinstance(current, roadDomain.LaneSection):
if current._laneToRight and current._laneToRight.isForward == forward:
current = current._laneToRight
rightPoints.extend(current.rightEdge.points)
current = current._successor
rightEdge = PolylineRegion(cleanChain(rightPoints))
middleLane = startLanes[len(startLanes) // 2].lane # rather arbitrary
return leftEdge, middleLane.centerline, rightEdge
if forwardLanes:
leftEdge, centerline, rightEdge = getEdges(forward=True)
forwardGroup = roadDomain.LaneGroup(
id=f"road{self.id_}_forward",
polygon=buffer_union(
(lane.polygon for lane in forwardLanes), tolerance=tolerance
),
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
road=None,
lanes=tuple(forwardLanes),
curb=(forwardShoulder.rightEdge if forwardShoulder else rightEdge),
sidewalk=forwardSidewalk,
bikeLane=None,
shoulder=forwardShoulder,
opposite=None,
)
allElements.append(forwardGroup)
else:
forwardGroup = None
if backwardLanes:
leftEdge, centerline, rightEdge = getEdges(forward=False)
backwardGroup = roadDomain.LaneGroup(
id=f"road{self.id_}_backward",
polygon=buffer_union(
(lane.polygon for lane in backwardLanes), tolerance=tolerance
),
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
road=None,
lanes=tuple(backwardLanes),
curb=(backwardShoulder.rightEdge if backwardShoulder else rightEdge),
sidewalk=backwardSidewalk,
bikeLane=None,
shoulder=backwardShoulder,
opposite=forwardGroup,
)
allElements.append(backwardGroup)
if forwardGroup:
forwardGroup._opposite = backwardGroup
else:
backwardGroup = None
# Create signal
roadSignals = []
for i, signal_ in enumerate(self.signals):
signal = roadDomain.Signal(
uid=f"signal{signal_.id_}_{self.id_}_{i}",
openDriveID=signal_.id_,
country=signal_.country,
type=signal_.type_,
)
roadSignals.append(signal)
# Create road
assert forwardGroup or backwardGroup
if forwardGroup:
rightEdge = forwardGroup.rightEdge
else:
rightEdge = backwardGroup.leftEdge
if backwardGroup:
leftEdge = backwardGroup.rightEdge
else:
leftEdge = forwardGroup.leftEdge
centerline = PolylineRegion(tuple(pt[:2] for pt in self.ref_line_points))
road = roadDomain.Road(
name=self.name,
uid=f"road{self.id_}", # need prefix to prevent collisions with intersections
id=self.id_,
polygon=self.drivable_region,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
lanes=lanes,
forwardLanes=forwardGroup,
backwardLanes=backwardGroup,
sections=roadSections,
signals=tuple(roadSignals),
crossings=(), # TODO add these!
)
allElements.append(road)
# Set up parent references
if forwardGroup:
forwardGroup.road = road
if forwardGroup._sidewalk:
forwardGroup._sidewalk.road = road
if forwardGroup._shoulder:
forwardGroup._shoulder.road = road
forwardGroup._shoulder.group = forwardGroup
if backwardGroup:
backwardGroup.road = road
if backwardGroup._sidewalk:
backwardGroup._sidewalk.road = road
if backwardGroup._shoulder:
backwardGroup._shoulder.road = road
backwardGroup._shoulder.group = backwardGroup
for section in roadSections:
section.road = road
for lane in forwardLanes:
lane.group = forwardGroup
lane.road = road
for sec in lane.sections:
sec.group = forwardGroup
sec.road = road
del sec._original_lane
for lane in backwardLanes:
lane.group = backwardGroup
lane.road = road
for sec in lane.sections:
sec.group = backwardGroup
sec.road = road
del sec._original_lane
return road, allElements
[docs]class Signal:
"""Traffic lights, stop signs, etc."""
def __init__(self, id_, country, type_, subtype, orientation, validity=None):
self.id_ = id_
self.country = country
self.type_ = type_
self.subtype = subtype
self.orientation = orientation
self.validity = validity
def is_valid(self):
return self.validity is None or self.validity != [0, 0]
class SignalReference:
def __init__(self, id_, orientation, validity=None):
self.id_ = id_
self.validity = validity
self.orientation = orientation
def is_valid(self):
return self.validity is None or self.validity != [0, 0]
class RoadMap:
defaultTolerance = 0.05
def __init__(
self,
tolerance=None,
fill_intersections=True,
drivable_lane_types=(
"driving",
"entry",
"exit",
"offRamp",
"onRamp",
"connectingRamp",
),
sidewalk_lane_types=("sidewalk",),
shoulder_lane_types=("shoulder", "parking", "stop", "border"),
elide_short_roads=False,
):
self.tolerance = self.defaultTolerance if tolerance is None else tolerance
self.roads = {}
self.road_links = []
self.junctions = {}
self.sec_lane_polys = []
self.lane_polys = []
self.intersection_region = None
self.fill_intersections = fill_intersections
self.drivable_lane_types = drivable_lane_types
self.sidewalk_lane_types = sidewalk_lane_types
self.shoulder_lane_types = shoulder_lane_types
self.elide_short_roads = elide_short_roads
def calculate_geometry(self, num, calc_gap=False, calc_intersect=True):
# If calc_gap=True, fills in gaps between connected roads.
# If calc_intersect=True, calculates intersection regions.
# These are fairly expensive.
for road in self.roads.values():
road.calculate_geometry(
num,
calc_gap=calc_gap,
tolerance=self.tolerance,
drivable_lane_types=self.drivable_lane_types,
sidewalk_lane_types=self.sidewalk_lane_types,
shoulder_lane_types=self.shoulder_lane_types,
)
self.sec_lane_polys.extend(road.sec_lane_polys)
self.lane_polys.extend(road.lane_polys)
if calc_gap:
drivable_polys = []
sidewalk_polys = []
shoulder_polys = []
for road in self.roads.values():
drivable_poly = road.drivable_region
sidewalk_poly = road.sidewalk_region
shoulder_poly = road.shoulder_region
if not (drivable_poly is None or drivable_poly.is_empty):
drivable_polys.append(drivable_poly)
if not (sidewalk_poly is None or sidewalk_poly.is_empty):
sidewalk_polys.append(sidewalk_poly)
if not (shoulder_poly is None or shoulder_poly.is_empty):
shoulder_polys.append(shoulder_poly)
for link in self.road_links:
road_a = self.roads[link.id_a]
road_b = self.roads[link.id_b]
assert link.contact_a in ["start", "end"], "Invalid link record."
assert link.contact_b in ["start", "end"], "Invalid link record."
if link.contact_a == "start":
a_sec = road_a.lane_secs[0]
a_bounds_left = road_a.start_bounds_left
a_bounds_right = road_a.start_bounds_right
else:
a_sec = road_a.lane_secs[-1]
a_bounds_left = road_a.end_bounds_left
a_bounds_right = road_a.end_bounds_right
if link.contact_b == "start":
b_bounds_left = road_b.start_bounds_left
b_bounds_right = road_b.start_bounds_right
else:
b_bounds_left = road_b.end_bounds_left
b_bounds_right = road_b.end_bounds_right
for id_, lane in a_sec.lanes.items():
if link.contact_a == "start":
other_id = lane.pred
else:
other_id = lane.succ
if other_id not in b_bounds_left or other_id not in b_bounds_right:
continue
if id_ not in a_bounds_left or id_ not in a_bounds_right:
continue
gap_poly = MultiPoint(
[
a_bounds_left[id_],
a_bounds_right[id_],
b_bounds_left[other_id],
b_bounds_right[other_id],
]
).convex_hull
if not gap_poly.is_valid:
continue
if gap_poly.geom_type == "Polygon" and not gap_poly.is_empty:
if lane.type_ in self.drivable_lane_types:
drivable_polys.append(gap_poly)
elif lane.type_ in self.sidewalk_lane_types:
sidewalk_polys.append(gap_poly)
elif lane.type_ in self.shoulder_lane_types:
shoulder_polys.append(gap_poly)
else:
drivable_polys = [road.drivable_region for road in self.roads.values()]
sidewalk_polys = [road.sidewalk_region for road in self.roads.values()]
shoulder_polys = [road.shoulder_region for road in self.roads.values()]
self.drivable_region = buffer_union(drivable_polys, tolerance=self.tolerance)
self.sidewalk_region = buffer_union(sidewalk_polys, tolerance=self.tolerance)
self.shoulder_region = buffer_union(shoulder_polys, tolerance=self.tolerance)
if calc_intersect:
self.calculate_intersections()
def calculate_intersections(self):
intersect_polys = []
for junc in self.junctions.values():
junc_polys = [self.roads[i].drivable_region for i in junc.paths]
assert junc_polys, junc
union = buffer_union(junc_polys, tolerance=self.tolerance)
if self.fill_intersections:
union = removeHoles(union)
assert union.is_valid
junc.poly = union
intersect_polys.append(union)
self.intersection_region = buffer_union(intersect_polys, tolerance=self.tolerance)
def heading_at(self, point):
"""Return the road heading at point."""
# Convert point to shapely Point.
point = Point(point.x, point.y)
for road in self.roads.values():
if point.within(road.drivable_region.buffer(1)):
return road.heading_at(point)
# raise RuntimeError('Point not in RoadMap: ', point)
return 0
def __parse_lanes(self, lanes_elem):
"""Lanes_elem should be <left> or <right> element.
Returns dict of lane ids and Lane objects."""
lanes = {}
for l in lanes_elem.iter("lane"):
id_ = int(l.get("id"))
type_ = l.get("type")
link = l.find("link")
pred = None
succ = None
if link is not None:
pred_elem = link.find("predecessor")
succ_elem = link.find("successor")
if pred_elem is not None:
pred = int(pred_elem.get("id"))
if succ_elem is not None:
succ = int(succ_elem.get("id"))
lane = Lane(id_, type_, pred, succ)
for w in l.iter("width"):
w_poly = Poly3(
float(w.get("a")),
float(w.get("b")),
float(w.get("c")),
float(w.get("d")),
)
lane.width.append((w_poly, float(w.get("sOffset"))))
lanes[id_] = lane
return lanes
def __parse_link(self, link_elem, road, contact):
if link_elem is None:
return
road_id = road.id_
if link_elem.get("elementType") == "road":
id_b = int(link_elem.get("elementId"))
contact_b = link_elem.get("contactPoint")
link = RoadLink(road_id, id_b, contact, contact_b)
self.road_links.append(link)
return link
else:
assert link_elem.get("elementType") == "junction", "Unknown link type"
junction = int(link_elem.get("elementId"))
if junction not in self.junctions:
return # junction had no connecting roads, so we skipped it
if contact == "start":
road.predecessor = junction
else:
road.successor = junction
connections = self.junctions[junction].connections
for c in connections:
if c.incoming_id == road_id:
self.road_links.append(
RoadLink(road_id, c.connecting_id, contact, c.connecting_contact)
)
def __parse_signal_validity(self, validity_elem):
if validity_elem is None:
return None
return [int(validity_elem.get("fromLane")), int(validity_elem.get("toLane"))]
def __parse_signal(self, signal_elem):
return Signal(
signal_elem.get("id"),
signal_elem.get("country"),
signal_elem.get("type"),
signal_elem.get("subtype"),
signal_elem.get("orientation"),
self.__parse_signal_validity(signal_elem.find("validity")),
)
def __parse_signal_reference(self, signal_reference_elem):
return SignalReference(
signal_reference_elem.get("id"),
signal_reference_elem.get("orientation"),
self.__parse_signal_validity(signal_reference_elem.find("validity")),
)
def parse(self, path):
tree = ET.parse(path)
root = tree.getroot()
if root.tag != "OpenDRIVE":
raise ValueError(f"{path} does not appear to be an OpenDRIVE file")
# parse junctions
for j in root.iter("junction"):
junction = Junction(int(j.get("id")), j.get("name"))
for c in j.iter("connection"):
ty = c.get("type", "default")
if ty != "default":
raise NotImplementedError(
f'unhandled "{ty}" type of junction connection'
)
lane_links = {}
for l in c.iter("laneLink"):
lane_links[int(l.get("from"))] = int(l.get("to"))
junction.add_connection(
int(c.get("incomingRoad")),
int(c.get("connectingRoad")),
c.get("contactPoint"),
lane_links,
)
junction.paths.append(int(c.get("connectingRoad")))
if not junction.paths:
warn(f"junction {junction.id_} has no connecting roads; skipping it")
continue
self.junctions[junction.id_] = junction
# Creating temporal signals container to resolve referenced signals.
_temp_signals = {}
for r in root.iter("road"):
signals = r.find("signals")
if signals is not None:
for s in signals.iter("signal"):
signal = self.__parse_signal(s)
_temp_signals[signal.id_] = signal
# parse roads
self.elidedRoads = {}
for r in root.iter("road"):
road = Road(
r.get("name"), int(r.get("id")), float(r.get("length")), r.get("junction")
)
link = r.find("link")
if link is not None:
pred_elem = link.find("predecessor")
succ_elem = link.find("successor")
pred_link = self.__parse_link(pred_elem, road, "start")
succ_link = self.__parse_link(succ_elem, road, "end")
else:
pred_link = succ_link = None
if road.length < self.tolerance:
warn(
f"road {road.id_} has length shorter than tolerance;"
" geometry may contain artifacts"
)
if self.elide_short_roads:
warn(f"attempting to elide road {road.id_} of length {road.length}")
assert road.junction is None
self.elidedRoads[road.id_] = road
if pred_link:
road.predecessor = pred_link.id_b
road.predecessorContact = pred_link.contact_b
else:
road.predecessorContact = None
if succ_link:
road.successor = succ_link.id_b
road.successorContact = succ_link.contact_b
else:
road.successorContact = None
continue
# Parse planView:
plan_view = r.find("planView")
curves = []
for geom in plan_view.iter("geometry"):
x0 = float(geom.get("x"))
y0 = float(geom.get("y"))
s0 = float(geom.get("s"))
hdg = float(geom.get("hdg"))
length = float(geom.get("length"))
curve_elem = geom[0]
curve = None
if curve_elem.tag == "line":
curve = Line(x0, y0, hdg, length)
elif curve_elem.tag == "arc":
# Arc is clothoid of constant curvature.
curv = float(curve_elem.get("curvature"))
curve = Clothoid(x0, y0, hdg, length, curv, curv)
elif curve_elem.tag == "spiral":
curv0 = float(curve_elem.get("curvStart"))
curv1 = float(curve_elem.get("curvEnd"))
curve = Clothoid(x0, y0, hdg, length, curv0, curv1)
elif curve_elem.tag == "poly3":
a, b, c, d = (
cubic_elem.get("a"),
float(curve_elem.get("b")),
float(curve_elem.get("c")),
float(curve_elem.get("d")),
)
curve = Cubic(x0, y0, hdg, length, a, b, c, d)
elif curve_elem.tag == "paramPoly3":
au, bu, cu, du, av, bv, cv, dv = (
float(curve_elem.get("aU")),
float(curve_elem.get("bU")),
float(curve_elem.get("cU")),
float(curve_elem.get("dU")),
float(curve_elem.get("aV")),
float(curve_elem.get("bV")),
float(curve_elem.get("cV")),
float(curve_elem.get("dV")),
)
p_range = curve_elem.get("pRange")
if p_range and p_range != "normalized":
# TODO support arcLength
raise NotImplementedError("unsupported pRange for paramPoly3")
else:
p_range = 1
curve = ParamCubic(
x0, y0, hdg, length, au, bu, cu, du, av, bv, cv, dv, p_range
)
curves.append((s0, curve))
if not curves:
raise ValueError(f"road {road.id_} has an empty planView")
if not curves[0][0] == 0:
raise ValueError(
f"reference line of road {road.id_} does not start at s=0"
)
lastS = 0
lastCurve = curves[0][1]
refLine = []
for s0, curve in curves[1:]:
l = s0 - lastS
if abs(lastCurve.length - l) > 1e-4:
raise ValueError(
f"planView of road {road.id_} has inconsistent length"
)
if l < 0:
raise ValueError(f"planView of road {road.id_} is not in order")
elif l < 1e-6:
warn(
f"road {road.id_} reference line has a geometry of "
f"length {l}; skipping it"
)
else:
refLine.append(lastCurve)
lastS = s0
lastCurve = curve
if refLine and lastCurve.length < 1e-6:
warn(
f"road {road.id_} reference line has a geometry of "
f"length {lastCurve.length}; skipping it"
)
else:
# even if the last curve is shorter than the threshold, we'll keep it if
# it is the only curve; getting rid of the road entirely is handled by
# road elision above
refLine.append(lastCurve)
assert refLine
road.ref_line = refLine
# Parse lanes:
lanes = r.find("lanes")
for offset in lanes.iter("laneOffset"):
road.offset.append(
(
Poly3(
float(offset.get("a")),
float(offset.get("b")),
float(offset.get("c")),
float(offset.get("d")),
),
float(offset.get("s")),
)
)
def popLastSectionIfShort(l):
if l < 1e-6:
warn(f"road {road.id_} has a lane section of length {l}; skipping it")
# delete the length-0 section and re-link lanes appropriately
badSec = road.lane_secs.pop()
if road.lane_secs:
prev = road.lane_secs[-1]
for id_, lane in prev.lanes.items():
if lane.succ is not None:
lane.succ = badSec.lanes[lane.succ].succ
else:
if road.remappedStartLanes is None:
road.remappedStartLanes = {l: l for l in badSec.lanes}
for start, current in road.remappedStartLanes.items():
road.remappedStartLanes[start] = badSec.lanes[current].succ
return badSec
else:
return None
last_s = float("-inf")
for ls_elem in lanes.iter("laneSection"):
s = float(ls_elem.get("s"))
l = s - last_s
assert l >= 0
badSec = popLastSectionIfShort(l)
last_s = s
left = ls_elem.find("left")
right = ls_elem.find("right")
left_lanes = {}
right_lanes = {}
if left is not None:
left_lanes = self.__parse_lanes(left)
if right is not None:
right_lanes = self.__parse_lanes(right)
lane_sec = LaneSection(s, left_lanes, right_lanes)
if badSec is not None: # finish re-linking lanes across deleted section
for id_, lane in lane_sec.lanes.items():
if lane.pred is not None:
lane.pred = badSec.lanes[lane.pred].pred
road.lane_secs.append(lane_sec)
# parse signals
signals = r.find("signals")
if signals is not None:
for signal_elem in signals.iter("signal"):
signal = self.__parse_signal(signal_elem)
if signal.is_valid():
road.signals.append(signal)
for signal_ref_elem in signals.iter("signalReference"):
signalReference = self.__parse_signal_reference(signal_ref_elem)
if signalReference.is_valid():
referencedSignal = _temp_signals[signalReference.id_]
signal = Signal(
referencedSignal.id_,
referencedSignal.country,
referencedSignal.type_,
referencedSignal.subtype,
signalReference.orientation,
signalReference.validity,
)
road.signals.append(signal)
if len(road.lane_secs) > 1:
popLastSectionIfShort(road.length - s)
assert road.lane_secs
self.roads[road.id_] = road
# Handle links to/from elided roads
new_links = []
for link in self.road_links:
if link.id_a in self.elidedRoads:
continue
if link.id_b in self.elidedRoads:
elided = self.elidedRoads[link.id_b]
if link.contact_b == "start":
link.id_b = elided.successor
link.contact_b = elided.successorContact
else:
link.id_b = elided.predecessor
link.contact_b = elided.predecessorContact
if link.contact_b is None:
continue # link to intersection
new_links.append(link)
self.road_links = new_links
def toScenicNetwork(self):
assert self.intersection_region is not None
# Prepare registry of network elements
allElements = {}
def register(element):
assert element.uid is not None
assert element.uid not in allElements, element.uid
allElements[element.uid] = element
def registerAll(elements):
for elt in elements:
register(elt)
# Convert roads
mainRoads, connectingRoads, roads = {}, {}, {}
for id_, road in self.roads.items():
if road.drivable_region.is_empty:
continue # not actually a road you can drive on
newRoad, elts = road.toScenicRoad(tolerance=self.tolerance)
registerAll(elts)
(connectingRoads if road.junction else mainRoads)[id_] = newRoad
roads[id_] = newRoad
# Hook up inter-road links
for link in self.road_links:
if link.id_b in connectingRoads:
continue # actually a road-to-junction link; handled later
if link.id_a not in roads or link.id_b not in roads:
continue # may link non-drivable roads we haven't parsed; ignore it
# Work out connectivity of roads and adjacent sections
roadA, roadB = roads[link.id_a], roads[link.id_b]
if link.contact_a == "start":
secA = roadA.sections[0]
roadA._predecessor = roadB
forwardA = True
else:
secA = roadA.sections[-1]
roadA._successor = roadB
forwardA = False
if link.contact_b == "start":
secB = roadB.sections[0]
else:
secB = roadB.sections[-1]
# Connect corresponding lanes
lanesB = secB.lanesByOpenDriveID
for laneA in secA.lanes:
if laneA.isForward == forwardA:
pred = laneA._predecessor
if pred is None:
continue
assert pred in lanesB
laneB = lanesB[pred]
laneA._predecessor = laneB
laneA.lane._predecessor = laneB.lane
laneA.lane.group._predecessor = laneB.lane.group
else:
succ = laneA._successor
if succ is None:
continue
assert succ in lanesB
laneB = lanesB[succ]
laneA._successor = laneB
laneA.lane._successor = laneB.lane
laneA.lane.group._successor = laneB.lane.group
# Hook up connecting road links and create intersections
intersections = {}
for jid, junction in self.junctions.items():
if not junction.connections:
continue
assert junction.poly is not None
if junction.poly.is_empty:
warn(f"skipping empty junction {jid}")
continue
# Gather all lanes involved in the junction's connections
allIncomingLanes, allOutgoingLanes = [], []
allRoads, seenRoads = [], set()
allSignals, seenSignals = [], set()
maneuversForLane = defaultdict(list)
for connection in junction.connections:
incomingID = connection.incoming_id
incomingRoad = mainRoads.get(incomingID)
if not incomingRoad:
continue # incoming road has no drivable lanes; skip it
connectingID = connection.connecting_id
connectingRoad = connectingRoads.get(connectingID)
if not connectingRoad:
continue # connecting road has no drivable lanes; skip it
for signal in connectingRoad.signals:
if signal.openDriveID not in seenSignals:
allSignals.append(signal)
seenSignals.add(signal.openDriveID)
# Find possible incoming lanes for this connection
if incomingID not in seenRoads:
allRoads.append(incomingRoad)
seenRoads.add(incomingID)
oldRoad = self.roads[incomingID]
incomingSection = None
if oldRoad.predecessor == jid:
incomingSection = incomingRoad.sections[0]
remapping = self.roads[incomingID].remappedStartLanes # could be None
if oldRoad.successor == jid:
assert incomingSection is None
incomingSection = incomingRoad.sections[-1]
remapping = None
assert incomingSection is not None
if remapping is None:
incomingLaneIDs = incomingSection.lanesByOpenDriveID
else:
incomingLaneIDs = {}
newIDs = incomingSection.lanesByOpenDriveID
for start, remapped in remapping.items():
if remapped in newIDs:
incomingLaneIDs[start] = newIDs[remapped]
assert len(incomingLaneIDs) == len(newIDs)
# Connect incoming lanes to connecting road
if connection.connecting_contact == "start":
connectingSection = connectingRoad.sections[0]
remapping = self.roads[
connectingID
].remappedStartLanes # could be None
else:
connectingSection = connectingRoad.sections[-1]
remapping = None
if remapping is None:
connectingLaneIDs = connectingSection.lanesByOpenDriveID
else:
connectingLaneIDs = {}
newIDs = connectingSection.lanesByOpenDriveID
for start, remapped in remapping.items():
if remapped in newIDs:
connectingLaneIDs[start] = newIDs[remapped]
assert len(connectingLaneIDs) == len(newIDs)
lane_links = connection.lane_links
if not lane_links: # all lanes connect to that with the same id
lane_links = {l: l for l in incomingLaneIDs}
for fromID, fromLane in incomingLaneIDs.items():
# Link incoming lane to connecting road
# (we only handle lanes in incomingLaneIDs, thus skipping non-drivable lanes)
if fromID not in lane_links:
continue # lane not linked by this connection
toID = lane_links[fromID]
toLane = connectingLaneIDs[toID]
if fromLane.lane not in allIncomingLanes:
allIncomingLanes.append(fromLane.lane)
fromLane._successor = toLane
fromLane.lane._successor = toLane.lane
toLane._predecessor = fromLane
toLane.lane._predecessor = fromLane.lane
# Collect outgoing lane and road
# TODO why is it allowed for this not to exist?
outgoingLane = toLane.lane._successor
if outgoingLane is None:
warn(
f"connecting road {connectingID} lane {toID} has no successor lane"
)
else:
if outgoingLane not in allOutgoingLanes:
allOutgoingLanes.append(outgoingLane)
outgoingRoad = outgoingLane.road
if outgoingRoad.id not in seenRoads:
allRoads.append(outgoingRoad)
seenRoads.add(outgoingRoad.id)
# TODO future OpenDRIVE extension annotating left/right turns?
maneuver = roadDomain.Maneuver(
startLane=fromLane.lane,
connectingLane=toLane.lane,
endLane=outgoingLane,
intersection=None, # will be patched once the Intersection is created
)
maneuversForLane[fromLane.lane].append(maneuver)
# Gather maneuvers
allManeuvers = []
for lane, maneuvers in maneuversForLane.items():
assert lane.maneuvers == ()
lane.maneuvers = tuple(maneuvers)
allManeuvers.extend(maneuvers)
# Order connected roads and lanes by adjacency
def cyclicOrder(elements, contactStart=None):
points = []
for element in elements:
if contactStart is None:
old = self.roads[element.id]
assert old.predecessor == jid or old.successor == jid
contactStart = old.predecessor == jid
point = element.centerline[0 if contactStart else -1]
points.append(point)
centroid = sum(points, Vector(0, 0)) / len(points)
pairs = sorted(
zip(elements, points), key=lambda pair: centroid.angleTo(pair[1])
)
return tuple(elem for elem, pt in pairs)
# Create intersection
intersection = roadDomain.Intersection(
polygon=junction.poly,
name=junction.name,
uid=f"intersection{jid}", # need prefix to prevent collisions with roads
id=jid,
roads=cyclicOrder(allRoads),
incomingLanes=cyclicOrder(allIncomingLanes, contactStart=False),
outgoingLanes=cyclicOrder(allOutgoingLanes, contactStart=True),
maneuvers=tuple(allManeuvers),
signals=tuple(allSignals),
crossings=(), # TODO add these
)
register(intersection)
intersections[jid] = intersection
for maneuver in allManeuvers:
object.__setattr__(maneuver, "intersection", intersection)
# Hook up road-intersection links
for rid, oldRoad in self.roads.items():
if rid not in roads:
continue # road does not have any drivable lanes, so we skipped it
newRoad = roads[rid]
if oldRoad.predecessor:
intersection = intersections[oldRoad.predecessor]
newRoad._predecessor = intersection
newRoad.sections[0]._predecessor = intersection
if newRoad.backwardLanes:
newRoad.backwardLanes._successor = intersection
if oldRoad.successor:
intersection = intersections[oldRoad.successor]
newRoad._successor = intersection
newRoad.sections[-1]._successor = intersection
if newRoad.forwardLanes:
newRoad.forwardLanes._successor = intersection
# Gather all network elements
roads = tuple(mainRoads.values())
connectingRoads = tuple(connectingRoads.values())
allRoads = roads + connectingRoads
groups = []
for road in allRoads:
if road.forwardLanes:
groups.append(road.forwardLanes)
if road.backwardLanes:
groups.append(road.backwardLanes)
lanes = [lane for road in allRoads for lane in road.lanes]
intersections = tuple(intersections.values())
crossings = () # TODO add these
sidewalks, shoulders = [], []
for group in groups:
sidewalk = group._sidewalk
if sidewalk:
sidewalks.append(sidewalk)
shoulder = group._shoulder
if shoulder:
shoulders.append(shoulder)
# Add dummy maneuvers for lanes which merge/turn into another lane
for lane in lanes:
if not lane.maneuvers and lane._successor:
maneuver = roadDomain.Maneuver(
type=roadDomain.ManeuverType.STRAIGHT,
startLane=lane,
endLane=lane._successor,
)
lane.maneuvers = (maneuver,)
def combine(regions):
return PolygonalRegion.unionAll(regions, buf=self.tolerance)
return roadDomain.Network(
elements=allElements,
roads=roads,
connectingRoads=connectingRoads,
laneGroups=tuple(groups),
lanes=lanes,
intersections=intersections,
crossings=crossings,
sidewalks=tuple(sidewalks),
shoulders=tuple(shoulders),
tolerance=self.tolerance,
roadRegion=combine(roads),
laneRegion=combine(lanes),
intersectionRegion=combine(intersections),
crossingRegion=combine(crossings),
sidewalkRegion=combine(sidewalks),
)