# Source code for scenic.formats.opendrive.xodr_parser

```"""Parser for OpenDRIVE (.xodr) files."""

import abc
from collections import defaultdict
import itertools
import math
import warnings
import xml.etree.ElementTree as ET

import numpy as np
from scipy.optimize import brentq
from shapely.geometry import GeometryCollection, MultiPoint, MultiPolygon, Point, Polygon
from shapely.ops import snap, unary_union

from scenic.core.geometry import (
averageVectors,
cleanChain,
cleanPolygon,
plotPolygon,
polygonUnion,
removeHoles,
)
from scenic.core.regions import PolygonalRegion, PolylineRegion
from scenic.core.vectors import Vector

class OpenDriveWarning(UserWarning):
pass

def warn(message):
warnings.warn(message, OpenDriveWarning, stacklevel=2)

def buffer_union(polys, tolerance=0.01):
return polygonUnion(polys, buf=tolerance, tolerance=tolerance)

[docs]class Poly3:
"""Cubic polynomial."""

def __init__(self, a, b, c, d):
self.a = a
self.b = b
self.c = c
self.d = d

def eval_at(self, x):
return self.a + self.b * x + self.c * x**2 + self.d * x**3

return self.b + 2 * self.c * x + 3 * self.d * x**2

[docs]class Curve:
"""Geometric elements which compose road reference lines.
See the OpenDRIVE Format Specification for coordinate system details."""

def __init__(self, x0, y0, hdg, length):
self.x0 = x0
self.y0 = y0
self.hdg = hdg  # In radians counterclockwise, 0 at positive x-axis.
self.cos_hdg, self.sin_hdg = math.cos(hdg), math.sin(hdg)
self.length = length

[docs]    def to_points(self, num, extra_points=[]):
"""Sample NUM evenly-spaced points from curve.

Points are tuples of (x, y, s) with (x, y) absolute coordinates
and s the arc length along the curve. Additional points at s values in
extra_points are included if they are contained in the curve (unless
they are extremely close to one of the equally-spaced points).
"""
s_vals = []
extras = itertools.chain(extra_points, itertools.repeat(float("inf")))
next_extra = next(extras)
last_s = 0
for s in np.linspace(0, self.length, num=num):
while next_extra <= s:
if last_s + 1e-6 < next_extra < s - 1e-6:
s_vals.append(next_extra)
next_extra = next(extras)
s_vals.append(s)
last_s = s
return [self.point_at(s) for s in s_vals]

[docs]    @abc.abstractmethod
def point_at(self, s):
"""Get an (x, y, s) point along the curve at the given s coordinate."""
return

[docs]    def rel_to_abs(self, point):
"""Convert from relative coordinates of curve to absolute coordinates.
I.e. rotate counterclockwise by self.hdg and translate by (x0, x1)."""
x, y, s = point
return (
self.x0 + self.cos_hdg * x - self.sin_hdg * y,
self.y0 + self.sin_hdg * x + self.cos_hdg * y,
s,
)

[docs]class Cubic(Curve):
"""A curve defined by the cubic polynomial a + bu + cu^2 + du^3.
The curve starts at (X0, Y0) in direction HDG, with length LENGTH."""

def __init__(self, x0, y0, hdg, length, a, b, c, d):
super().__init__(x0, y0, hdg, length)
self.poly = Poly3(a, b, c, d)
# Crude upper bound for u (used to bracket u for a given s in point_at)
if d != 0:
self.ubound = max(
2 * abs(c / d), abs(b / d) ** 0.5, (3 * length / abs(d)) ** (1 / 3)
)
elif c != 0:
self.ubound = max(abs(b / c), (2 * length / abs(c)) ** 0.5)
else:
self.ubound = length / abs(b)

def arclength(self, u):
d_arc = lambda x: np.sqrt(1 + self.poly.grad_at(x) ** 2)

def point_at(self, s):
# Use Brent's method to find parameter u corresponding to arclength s;
# (N.B. Brent's method proved to be faster than Newton's and has no potential
# convergence issues.)
root_func = lambda x: self.arclength(x) - s
u = float(brentq(root_func, 0, self.ubound))
pt = (s, self.poly.eval_at(u), s)
return self.rel_to_abs(pt)

[docs]class ParamCubic(Curve):
"""A curve defined by the parametric equations
u = a_u + b_up + c_up^2 + d_up^3,
v = a_v + b_vp + c_vp^2 + d_up^3,
with p in [0, p_range].
The curve starts at (X0, Y0) in direction HDG, with length LENGTH."""

def __init__(self, x0, y0, hdg, length, au, bu, cu, du, av, bv, cv, dv, p_range=1):
super().__init__(x0, y0, hdg, length)
self.u_poly = Poly3(au, bu, cu, du)
self.v_poly = Poly3(av, bv, cv, dv)
self.p_range = p_range if p_range else 1

def arclength(self, p):

def point_at(self, s):
root_func = lambda x: self.arclength(x) - s
p = float(brentq(root_func, 0, self.p_range))
pt = (self.u_poly.eval_at(p), self.v_poly.eval_at(p), s)
return self.rel_to_abs(pt)

[docs]class Clothoid(Curve):
"""An Euler spiral with curvature varying linearly between CURV0 and CURV1.
The spiral starts at (X0, Y0) in direction HDG, with length LENGTH."""

def __init__(self, x0, y0, hdg, length, curv0, curv1):
super().__init__(x0, y0, hdg, length)
# Initial and final curvature.
self.curv0 = curv0
self.curv1 = curv1
self.curve_rate = (curv1 - curv0) / length
self.a = abs(curv0)
self.r = 1 / self.a if curv0 != 0 else 1  # value not used if curv0 == 0
self.ode_init = np.array([x0, y0, hdg])

def point_at(self, s):
# Generate a origin-centered clothoid with zero curvature at origin,
# then translate/rotate the relevant segment.
# Arcs are just a degenerate clothoid:
if self.curv0 == self.curv1:
if self.curv0 == 0:
pt = (s, 0, s)
else:
r = self.r
th = s * self.a
if self.curv0 > 0:
pt = (r * math.sin(th), r - r * math.cos(th), s)
else:
pt = (r * math.sin(th), -r + r * math.cos(th), s)
return self.rel_to_abs(pt)
else:

def clothoid_ode(s, state):
x, y, theta = state
return np.array(
[math.cos(theta), math.sin(theta), self.curv0 + (self.curve_rate * s)]
)

sol = solve_ivp(clothoid_ode, (0, s), self.ode_init)
x, y, hdg = sol.y[:, -1]
return (x, y, s)

[docs]class Line(Curve):
"""A line segment between (x0, y0) and (x1, y1)."""

def __init__(self, x0, y0, hdg, length):
super().__init__(x0, y0, hdg, length)
# Endpoints of line.
self.x1 = x0 + length * math.cos(hdg)
self.y1 = y0 + length * math.sin(hdg)

def point_at(self, s):
return self.rel_to_abs((s, 0, s))

class Lane:
def __init__(self, id_, type_, pred=None, succ=None):
self.id_ = id_
self.width = []  # List of tuples (Poly3, int) for width and s-offset.
self.type_ = type_
self.pred = pred
self.succ = succ
self.left_bounds = []  # to be filled in later
self.right_bounds = []
self.centerline = []
self.parent_lane_poly = None

def width_at(self, s):
# S here is relative to start of LaneSection this lane is in.
ind = 0
while ind + 1 < len(self.width) and self.width[ind + 1][1] <= s:
ind += 1
assert self.width[ind][1] <= s, "No matching width entry found."
w_poly, s_off = self.width[ind]
w = w_poly.eval_at(s - s_off)
if w < -1e-6:  # allow for numerical error
raise RuntimeError("OpenDRIVE lane has negative width")
return max(w, 0)

class LaneSection:
def __init__(self, s0, left_lanes={}, right_lanes={}):
self.s0 = s0
self.left_lanes = left_lanes
self.right_lanes = right_lanes
self.left_lane_ids = sorted(self.left_lanes.keys())
self.right_lane_ids = sorted(self.right_lanes.keys(), reverse=True)
self.lanes = dict(list(left_lanes.items()) + list(right_lanes.items()))

def get_lane(self, id_):
if id_ in self.left_lanes:
return self.left_lanes[id_]
elif id_ in self.right_lanes:
return self.right_lanes[id_]
elif id_ == 0:
return Lane(0, "none")
else:

def get_offsets(self, s):
"""Returns dict of lane id and offset from
reference line of lane boundary at coordinate S along line.
By convention, left lanes have positive width offset and right lanes
have negative."""
assert s >= self.s0, "Input s is before lane start position."
offsets = {}
for lane_id in self.left_lane_ids:
if lane_id - 1 in self.left_lane_ids:
offsets[lane_id] = offsets[lane_id - 1] + self.left_lanes[
lane_id
].width_at(s - self.s0)
else:
offsets[lane_id] = self.left_lanes[lane_id].width_at(s - self.s0)
for lane_id in self.right_lane_ids:
if lane_id + 1 in self.right_lane_ids:
offsets[lane_id] = offsets[lane_id + 1] - self.right_lanes[
lane_id
].width_at(s - self.s0)
else:
offsets[lane_id] = -self.right_lanes[lane_id].width_at(s - self.s0)
return offsets

"""Indicates Roads a and b, with ids id_a and id_b respectively, are connected."""

def __init__(self, id_a, id_b, contact_a, contact_b):
self.id_a = id_a
self.id_b = id_b
# contact_a and contact_b should be of value "start" or "end"
# and indicate which end of each road is connected to the other.
self.contact_a = contact_a
self.contact_b = contact_b

class Junction:
class Connection:
def __init__(self, incoming_id, connecting_id, connecting_contact, lane_links):
self.incoming_id = incoming_id
self.connecting_id = connecting_id
# contact point ('start' or 'end') on connecting road
self.connecting_contact = connecting_contact
# dict mapping incoming to connecting lane ids (empty = identity mapping)

def __init__(self, id_, name):
self.id_ = id_
self.name = name
self.connections = []
# Ids of roads that are paths within junction:
self.paths = []
self.poly = None

conn = Junction.Connection(
)
self.connections.append(conn)

def __init__(self, name, id_, length, junction, drive_on_right=True):
self.name = name
self.id_ = id_
self.length = length
self.junction = junction if junction != "-1" else None
self.predecessor = None
self.successor = None
self.signals = []  # List of Signal objects.
self.lane_secs = []  # List of LaneSection objects.
self.ref_line = []  # List of Curve objects defining reference line.
# NOTE: sec_points, sec_polys, sec_lane_polys should be ordered according to lane_secs.
self.sec_points = []  # List of lists of points, one for each LaneSection.
self.sec_polys = []  # List of Polygons, one for each LaneSections.
# List of dict of lane id to Polygon for each LaneSection.
self.sec_lane_polys = []
# List of lane polygons. Not a dict b/c lane id is not unique along road.
self.lane_polys = []
# Each polygon in lane_polys is the union of connected lane section polygons.
# lane_polys is currently not used.
# Reference line offset:
self.offset = []  # List of tuple (Poly3, s-coordinate).
self.drive_on_right = drive_on_right
# Used to fill in gaps between roads:
self.start_bounds_left = {}
self.start_bounds_right = {}
self.end_bounds_left = {}
self.end_bounds_right = {}

self.remappedStartLanes = None  # hack for handling spurious initial lane sections

def get_ref_line_offset(self, s):
if not self.offset:
return 0
ind = 0
while ind + 1 < len(self.offset) and self.offset[ind + 1][1] <= s:
ind += 1
poly, s0 = self.offset[ind]
assert s >= s0
return poly.eval_at(s - s0)

def get_ref_points(self, num):
"""Returns list of list of points for each piece of ref_line.
List of list structure necessary because each piece needs to be
constructed into Polygon separately then unioned afterwards to avoid
self-intersecting lines."""
ref_points = []
transition_points = [sec.s0 for sec in self.lane_secs[1:]]
last_s = 0
for piece in self.ref_line:
piece_points = piece.to_points(num, extra_points=transition_points)
assert piece_points, "Failed to get piece points"
if ref_points:
last_s = ref_points[-1][-1][2]
piece_points = [(p[0], p[1], p[2] + last_s) for p in piece_points]
ref_points.append(piece_points)
transition_points = [s - last_s for s in transition_points if s > last_s]
return ref_points

# Convert point to shapely Point.
point = Point(point.x, point.y)
for i in range(len(self.lane_secs)):
ref_points = self.sec_points[i]
poly = self.sec_polys[i]
if point.within(poly.buffer(1)):
lane_id = None
for id_ in self.sec_lane_polys[i].keys():
if point.within(self.sec_lane_polys[i][id_].buffer(1)):
lane_id = id_
break
min_dist = float("inf")
for i in range(len(ref_points)):
cur_point = Point(ref_points[i][0], ref_points[i][1])
if point.distance(cur_point) < min_dist:
closest_idx = i
if closest_idx >= len(ref_points) - 1:
closest_idx = len(ref_points) - 2
dy = ref_points[closest_idx + 1][1] - ref_points[closest_idx][1]
dx = ref_points[closest_idx + 1][0] - ref_points[closest_idx][0]
# Right lanes have negative lane_id.
# Flip heading if drive_on_right XOR right lane.
if self.drive_on_right != (lane_id < 0):
# Heading 0 is defined differently between OpenDrive and Scenic(?)
return (heading + math.pi) % (2 * math.pi) - math.pi

def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False):
"""Given a list of lane types, returns a tuple of:
- List of lists of points along the reference line, with same indexing as self.lane_secs
- List of region polygons, with same indexing as self.lane_secs
- List of dictionary of lane id to polygon, with same indexing as self.lane_secs
- List of polygons for each lane (not necessarily by id, but respecting lane successor/predecessor)
- Polygon for entire region.
If calc_gap=True, fills in gaps between connected roads. This is fairly expensive.
"""
ref_points = self.get_ref_points(num)
self.ref_line_points = list(itertools.chain.from_iterable(ref_points))
cur_lane_polys = {}
sec_points = []
sec_polys = []
sec_lane_polys = []
lane_polys = []
last_lefts = None
last_rights = None
cur_p = None

for i in range(len(self.lane_secs)):
cur_sec = self.lane_secs[i]
cur_sec_points = []
if i < len(self.lane_secs) - 1:
next_sec = self.lane_secs[i + 1]
s_stop = next_sec.s0
else:
s_stop = float("inf")
left_bounds = defaultdict(list)
right_bounds = defaultdict(list)
cur_sec_lane_polys = defaultdict(list)
cur_sec_polys = []
end_of_sec = False

while ref_points and not end_of_sec:
if not ref_points[0]:
ref_points.pop(0)
if not ref_points or (cur_p and cur_p[2] >= s_stop):
# Case 1: We have processed the entire reference line.
# Case 2: The s-coordinate has exceeded s_stop, so we should move
# onto the next LaneSection.
# Either way, we collect all the bound points so far into polygons.
end_of_sec = True
cur_last_lefts = {}
cur_last_rights = {}
for id_ in left_bounds:
# Polygon for piece of lane:
left = left_bounds[id_]
right = right_bounds[id_][::-1]
bounds = left + right

if len(bounds) < 3:
continue
poly = cleanPolygon(Polygon(bounds), tolerance)
if not poly.is_empty:
if poly.geom_type == "MultiPolygon":
poly = MultiPolygon(
[
p
for p in poly.geoms
if not p.is_empty and p.exterior
]
)
cur_sec_polys.extend(poly.geoms)
else:
cur_sec_polys.append(poly)
cur_sec_lane_polys[id_].append(poly)
cur_last_lefts[id_] = left_bounds[id_][-1]
cur_last_rights[id_] = right_bounds[id_][-1]
if i == 0 or not self.start_bounds_left:
self.start_bounds_left[id_] = left_bounds[id_][0]
self.start_bounds_right[id_] = right_bounds[id_][0]

left_bounds = defaultdict(list)
right_bounds = defaultdict(list)
if cur_last_lefts and cur_last_rights:
last_lefts = cur_last_lefts
last_rights = cur_last_rights
else:
cur_p = ref_points[0][0]
cur_sec_points.append(cur_p)
s = min(max(cur_p[2], cur_sec.s0), s_stop - 1e-6)
offsets = cur_sec.get_offsets(s)
offsets[0] = 0
for id_ in offsets:
offsets[id_] += self.get_ref_line_offset(s)
if len(ref_points[0]) > 1:
next_p = ref_points[0][1]
tan_vec = (next_p[0] - cur_p[0], next_p[1] - cur_p[1])
else:
if len(cur_sec_points) >= 2:
prev_p = cur_sec_points[-2]
else:
assert len(sec_points) > 0
if sec_points[-1]:
assert sec_points[-1][-1] == cur_p
prev_p = sec_points[-1][-2]
else:
prev_p = sec_points[-2][-2]

tan_vec = (cur_p[0] - prev_p[0], cur_p[1] - prev_p[1])
tan_norm = math.hypot(tan_vec[0], tan_vec[1])
assert tan_norm > 1e-10
normal_vec = (-tan_vec[1] / tan_norm, tan_vec[0] / tan_norm)
if cur_p[2] < s_stop:
# if at end of section, keep current point to be included in
# the next section as well; otherwise remove it
ref_points[0].pop(0)
elif len(ref_points[0]) == 1 and len(ref_points) > 1:
# also get rid of point if this is the last point of the current geometry and
# and there is another geometry following
ref_points[0].pop(0)
for id_ in offsets:
lane = cur_sec.get_lane(id_)
if lane.type_ in lane_types:
if id_ > 0:
prev_id = id_ - 1
else:
prev_id = id_ + 1
left_bound = [
cur_p[0] + normal_vec[0] * offsets[id_],
cur_p[1] + normal_vec[1] * offsets[id_],
]
right_bound = [
cur_p[0] + normal_vec[0] * offsets[prev_id],
cur_p[1] + normal_vec[1] * offsets[prev_id],
]
if id_ < 0:
left_bound, right_bound = right_bound, left_bound
halfway = (offsets[id_] + offsets[prev_id]) / 2
centerline = [
cur_p[0] + normal_vec[0] * halfway,
cur_p[1] + normal_vec[1] * halfway,
]
left_bounds[id_].append(left_bound)
right_bounds[id_].append(right_bound)
lane.left_bounds.append(left_bound)
lane.right_bounds.append(right_bound)
lane.centerline.append(centerline)
assert len(cur_sec_points) >= 2, i
sec_points.append(cur_sec_points)
sec_polys.append(buffer_union(cur_sec_polys, tolerance=tolerance))
for id_ in cur_sec_lane_polys:
poly = buffer_union(cur_sec_lane_polys[id_], tolerance=tolerance)
cur_sec_lane_polys[id_] = poly
cur_sec.get_lane(id_).poly = poly
sec_lane_polys.append(dict(cur_sec_lane_polys))
next_lane_polys = {}
for id_ in cur_sec_lane_polys:
pred_id = cur_sec.get_lane(id_).pred
if pred_id and pred_id in cur_lane_polys:
next_lane_polys[id_] = cur_lane_polys.pop(pred_id) + [
cur_sec_lane_polys[id_]
]
else:
next_lane_polys[id_] = [cur_sec_lane_polys[id_]]
for id_ in cur_lane_polys:
poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance)
self.lane_secs[i - 1].get_lane(id_).parent_lane_poly = len(lane_polys)
lane_polys.append(poly)
cur_lane_polys = next_lane_polys
for id_ in cur_lane_polys:
poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance)
cur_sec.get_lane(id_).parent_lane_poly = len(lane_polys)
lane_polys.append(poly)
union_poly = buffer_union(sec_polys, tolerance=tolerance)
if last_lefts and last_rights:
self.end_bounds_left.update(last_lefts)
self.end_bounds_right.update(last_rights)

# Difference and slightly erode all overlapping polygons
for i in range(len(sec_polys) - 1):
if sec_polys[i].overlaps(sec_polys[i + 1]):
sec_polys[i] = sec_polys[i].difference(sec_polys[i + 1]).buffer(-1e-6)
assert not sec_polys[i].overlaps(sec_polys[i + 1])

for polys in sec_lane_polys:
ids = sorted(polys)  # order adjacent lanes consecutively
for i in range(len(ids) - 1):
polyA, polyB = polys[ids[i]], polys[ids[i + 1]]
if polyA.overlaps(polyB):
polys[ids[i]] = polyA.difference(polyB).buffer(-1e-6)
assert not polys[ids[i]].overlaps(polyB)

for i in range(len(lane_polys) - 1):
if lane_polys[i].overlaps(lane_polys[i + 1]):
lane_polys[i] = lane_polys[i].difference(lane_polys[i + 1]).buffer(-1e-6)
assert not lane_polys[i].overlaps(lane_polys[i + 1])

# Set parent lane polygon references to corrected polygons
for sec in self.lane_secs:
for lane in sec.lanes.values():
parentIndex = lane.parent_lane_poly
if isinstance(parentIndex, int):
lane.parent_lane_poly = lane_polys[parentIndex]

return (sec_points, sec_polys, sec_lane_polys, lane_polys, union_poly)

def calculate_geometry(
self,
num,
tolerance,
calc_gap,
drivable_lane_types,
sidewalk_lane_types,
shoulder_lane_types,
):
# Note: this also calculates self.start_bounds_left, self.start_bounds_right,
# self.end_bounds_left, self.end_bounds_right
(
self.sec_points,
self.sec_polys,
self.sec_lane_polys,
self.lane_polys,
self.drivable_region,
) = self.calc_geometry_for_type(
drivable_lane_types, num, tolerance, calc_gap=calc_gap
)

for i, sec in enumerate(self.lane_secs):
sec.drivable_lanes = {}
sec.sidewalk_lanes = {}
sec.shoulder_lanes = {}
for id_, lane in sec.lanes.items():
ty = lane.type_
if ty in drivable_lane_types:
sec.drivable_lanes[id_] = lane
elif ty in sidewalk_lane_types:
sec.sidewalk_lanes[id_] = lane
elif ty in shoulder_lane_types:
sec.shoulder_lanes[id_] = lane
if not sec.drivable_lanes:
continue

rightmost = None
for id_ in itertools.chain(reversed(sec.right_lane_ids), sec.left_lane_ids):
if id_ in sec.drivable_lanes:
rightmost = sec.lanes[id_]
break
assert rightmost is not None, i
leftmost = None
for id_ in itertools.chain(reversed(sec.left_lane_ids), sec.right_lane_ids):
if id_ in sec.drivable_lanes:
leftmost = sec.lanes[id_]
break
assert leftmost is not None, i
sec.left_edge = leftmost.left_bounds
assert len(sec.left_edge) >= 2
sec.right_edge = rightmost.right_bounds
assert len(sec.right_edge) >= 2

_, _, _, _, self.sidewalk_region = self.calc_geometry_for_type(
sidewalk_lane_types, num, tolerance, calc_gap=calc_gap
)

_, _, _, _, self.shoulder_region = self.calc_geometry_for_type(
shoulder_lane_types, num, tolerance, calc_gap=calc_gap
)

assert self.sec_points
allElements = []
# Create lane and road sections
last_section = None
sidewalkSections = defaultdict(list)
shoulderSections = defaultdict(list)
for sec, pts, sec_poly, lane_polys in zip(
self.lane_secs, self.sec_points, self.sec_polys, self.sec_lane_polys
):
pts = [pt[:2] for pt in pts]  # drop s coordinate
assert sec.drivable_lanes
laneSections = {}
for id_, lane in sec.drivable_lanes.items():
succ = None  # will set this later
if last_section and lane.pred:
if lane.pred in last_section.lanesByOpenDriveID:
pred = last_section.lanesByOpenDriveID[lane.pred]
else:
warn(
f"lane {id_} has a non-drivable predecessor"
)
pred = None
else:
left, center, right = lane.left_bounds, lane.centerline, lane.right_bounds
if id_ > 0:  # backward lane
left, center, right = right[::-1], center[::-1], left[::-1]
succ, pred = pred, succ
polygon=lane_polys[id_],
centerline=PolylineRegion(cleanChain(center)),
leftEdge=PolylineRegion(cleanChain(left)),
rightEdge=PolylineRegion(cleanChain(right)),
successor=succ,
predecessor=pred,
lane=None,  # will set these later
group=None,
openDriveID=id_,
isForward=id_ < 0,
)
section._original_lane = lane
laneSections[id_] = section
allElements.append(section)
polygon=sec_poly,
centerline=PolylineRegion(cleanChain(pts)),
leftEdge=PolylineRegion(cleanChain(sec.left_edge)),
rightEdge=PolylineRegion(cleanChain(sec.right_edge)),
successor=None,
predecessor=last_section,
lanesByOpenDriveID=laneSections,
)
allElements.append(section)
last_section = section

for id_, lane in sec.sidewalk_lanes.items():
sidewalkSections[id_].append(lane)
for id_, lane in sec.shoulder_lanes.items():
shoulderSections[id_].append(lane)

# Build sidewalks and shoulders
# TODO improve this!
forwardSidewalks, backwardSidewalks = [], []
forwardShoulders, backwardShoulders = [], []
for id_ in sidewalkSections:
(forwardSidewalks if id_ < 0 else backwardSidewalks).append(id_)
for id_ in shoulderSections:
(forwardShoulders if id_ < 0 else backwardShoulders).append(id_)

def combineSections(laneIDs, sections, name):
leftmost, rightmost = max(laneIDs), min(laneIDs)
if len(laneIDs) != leftmost - rightmost + 1:
warn(f"ignoring {name} in the middle of road {self.id_}")
leftPoints, rightPoints = [], []
if leftmost < 0:
leftmost = rightmost
while leftmost + 1 in laneIDs:
leftmost = leftmost + 1
leftSecs, rightSecs = sections[leftmost], sections[rightmost]
for leftSec, rightSec in zip(leftSecs, rightSecs):
leftPoints.extend(leftSec.left_bounds)
rightPoints.extend(rightSec.right_bounds)
else:
rightmost = leftmost
while rightmost - 1 in laneIDs:
rightmost = rightmost - 1
leftSecs = reversed(sections[leftmost])
rightSecs = reversed(sections[rightmost])
for leftSec, rightSec in zip(leftSecs, rightSecs):
leftPoints.extend(reversed(rightSec.right_bounds))
rightPoints.extend(reversed(leftSec.left_bounds))
leftEdge = PolylineRegion(cleanChain(leftPoints))
rightEdge = PolylineRegion(cleanChain(rightPoints))

# Heuristically create some kind of reasonable centerline
if len(leftPoints) == len(rightPoints):
centerPoints = list(
averageVectors(l, r) for l, r in zip(leftPoints, rightPoints)
)
else:
num = max(len(leftPoints), len(rightPoints))
centerPoints = []
for d in np.linspace(0, 1, num):
l = leftEdge.lineString.interpolate(d, normalized=True)
r = rightEdge.lineString.interpolate(d, normalized=True)
centerPoints.append(averageVectors(l.coords[0], r.coords[0]))
centerline = PolylineRegion(cleanChain(centerPoints))
allPolys = (
sec.poly
for id_ in range(rightmost, leftmost + 1)
for sec in sections[id_]
)
union = buffer_union(allPolys, tolerance=tolerance)
return id_, union, centerline, leftEdge, rightEdge

def makeSidewalk(laneIDs):
if not laneIDs:
return None
id_, union, centerline, leftEdge, rightEdge = combineSections(
laneIDs, sidewalkSections, "sidewalk"
)
id=id_,
polygon=union,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
)
allElements.append(sidewalk)
return sidewalk

forwardSidewalk = makeSidewalk(forwardSidewalks)
backwardSidewalk = makeSidewalk(backwardSidewalks)

def makeShoulder(laneIDs):
if not laneIDs:
return None
id_, union, centerline, leftEdge, rightEdge = combineSections(
laneIDs, shoulderSections, "shoulder"
)
id=id_,
polygon=union,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
)
allElements.append(shoulder)
return shoulder

forwardShoulder = makeShoulder(forwardShoulders)
backwardShoulder = makeShoulder(backwardShoulders)

# Connect sections to their successors
next_section = None
for sec, section in reversed(list(zip(self.lane_secs, roadSections))):
if next_section is None:
next_section = section
for id_, lane in sec.drivable_lanes.items():
newLane = section.lanesByOpenDriveID[id_]
if newLane.isForward:
newLane._successor = lane.succ
else:
newLane._predecessor = lane.succ
continue
section._successor = next_section
for id_, lane in sec.drivable_lanes.items():
newLane = section.lanesByOpenDriveID[id_]
if newLane.isForward:
newLane._successor = next_section.lanesByOpenDriveID.get(lane.succ)
else:
newLane._predecessor = next_section.lanesByOpenDriveID.get(lane.succ)
next_section = section

# Connect lane sections to adjacent lane sections
lanes = section.lanesByOpenDriveID
for id_, lane in lanes.items():
if id_ < -1:
leftID = id_ + 1
elif id_ == -1:
leftID = 1
elif id_ == 1:
leftID = -1
else:
leftID = id_ - 1
rightID = id_ - 1 if id_ < 0 else id_ + 1
lane._laneToLeft = lanes.get(leftID)
lane._laneToRight = lanes.get(rightID)
if self.drive_on_right:
lane._fasterLane = lane._laneToLeft
lane._slowerLane = lane._laneToRight
else:
lane._slowerLane = lane._laneToLeft
lane._fasterLane = lane._laneToRight
if lane._fasterLane and lane._fasterLane.isForward != lane.isForward:
lane._fasterLane = None
if lane._slowerLane and lane._slowerLane.isForward != lane.isForward:
lane._slowerLane = None
if lane._laneToLeft:
if lane._laneToRight:

# Gather lane sections into lanes
nextID = 0
forwardLanes, backwardLanes = [], []
laneSection._visited = False
if not laneSection._visited:  # start of new lane
forward = laneSection.isForward
sections = []
successorLane = None  # lane this one will merge into
while True:
sections.append(laneSection)
laneSection._visited = True
assert laneSection.isForward == forward
if forward:
nextSection = laneSection._successor
else:
nextSection = laneSection._predecessor
if not nextSection or not isinstance(
):
break
elif nextSection._visited:
successorLane = nextSection.lane
break
laneSection = nextSection
ls = laneSection._original_lane
assert ls.parent_lane_poly

if not forward:
sections = tuple(reversed(sections))
leftPoints, rightPoints, centerPoints = [], [], []
for section in sections:
leftPoints.extend(section.leftEdge.points)
rightPoints.extend(section.rightEdge.points)
centerPoints.extend(section.centerline.points)
leftEdge = PolylineRegion(cleanChain(leftPoints))
rightEdge = PolylineRegion(cleanChain(rightPoints))
centerline = PolylineRegion(cleanChain(centerPoints))
polygon=ls.parent_lane_poly,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
group=None,
sections=tuple(sections),
)
nextID += 1
for section in sections:
section.lane = lane
(forwardLanes if forward else backwardLanes).append(lane)
allElements.append(lane)
lanes = forwardLanes + backwardLanes
assert lanes

for lane in lanes:
for section in lane.sections:

# Create lane groups
def getEdges(forward):
if forward:
startLanes = sec.forwardLanes
else:
startLanes = sec.backwardLanes
leftPoints = []
current = startLanes[-1]  # get leftmost lane of the first section
if current._laneToLeft and current._laneToLeft.isForward == forward:
current = current._laneToLeft
leftPoints.extend(current.leftEdge.points)
current = current._successor
leftEdge = PolylineRegion(cleanChain(leftPoints))
rightPoints = []
current = startLanes[0]  # get rightmost lane of the first section
if current._laneToRight and current._laneToRight.isForward == forward:
current = current._laneToRight
rightPoints.extend(current.rightEdge.points)
current = current._successor
rightEdge = PolylineRegion(cleanChain(rightPoints))
middleLane = startLanes[len(startLanes) // 2].lane  # rather arbitrary
return leftEdge, middleLane.centerline, rightEdge

if forwardLanes:
leftEdge, centerline, rightEdge = getEdges(forward=True)
polygon=buffer_union(
(lane.polygon for lane in forwardLanes), tolerance=tolerance
),
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
lanes=tuple(forwardLanes),
curb=(forwardShoulder.rightEdge if forwardShoulder else rightEdge),
sidewalk=forwardSidewalk,
bikeLane=None,
shoulder=forwardShoulder,
opposite=None,
)
allElements.append(forwardGroup)
else:
forwardGroup = None
if backwardLanes:
leftEdge, centerline, rightEdge = getEdges(forward=False)
polygon=buffer_union(
(lane.polygon for lane in backwardLanes), tolerance=tolerance
),
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
lanes=tuple(backwardLanes),
curb=(backwardShoulder.rightEdge if backwardShoulder else rightEdge),
sidewalk=backwardSidewalk,
bikeLane=None,
shoulder=backwardShoulder,
opposite=forwardGroup,
)
allElements.append(backwardGroup)
if forwardGroup:
forwardGroup._opposite = backwardGroup
else:
backwardGroup = None

# Create signal
for i, signal_ in enumerate(self.signals):
uid=f"signal{signal_.id_}_{self.id_}_{i}",
openDriveID=signal_.id_,
country=signal_.country,
type=signal_.type_,
)

assert forwardGroup or backwardGroup
if forwardGroup:
rightEdge = forwardGroup.rightEdge
else:
rightEdge = backwardGroup.leftEdge
if backwardGroup:
leftEdge = backwardGroup.rightEdge
else:
leftEdge = forwardGroup.leftEdge
centerline = PolylineRegion(tuple(pt[:2] for pt in self.ref_line_points))
name=self.name,
uid=f"road{self.id_}",  # need prefix to prevent collisions with intersections
id=self.id_,
polygon=self.drivable_region,
centerline=centerline,
leftEdge=leftEdge,
rightEdge=rightEdge,
lanes=lanes,
forwardLanes=forwardGroup,
backwardLanes=backwardGroup,
)

# Set up parent references
if forwardGroup:
if forwardGroup._sidewalk:
if forwardGroup._shoulder:
forwardGroup._shoulder.group = forwardGroup
if backwardGroup:
if backwardGroup._sidewalk:
if backwardGroup._shoulder:
backwardGroup._shoulder.group = backwardGroup
for lane in forwardLanes:
lane.group = forwardGroup
for sec in lane.sections:
sec.group = forwardGroup
del sec._original_lane
for lane in backwardLanes:
lane.group = backwardGroup
for sec in lane.sections:
sec.group = backwardGroup
del sec._original_lane

[docs]class Signal:
"""Traffic lights, stop signs, etc."""

def __init__(self, id_, country, type_, subtype, orientation, validity=None):
self.id_ = id_
self.country = country
self.type_ = type_
self.subtype = subtype
self.orientation = orientation
self.validity = validity

def is_valid(self):
return self.validity is None or self.validity != [0, 0]

class SignalReference:
def __init__(self, id_, orientation, validity=None):
self.id_ = id_
self.validity = validity
self.orientation = orientation

def is_valid(self):
return self.validity is None or self.validity != [0, 0]

defaultTolerance = 0.05

def __init__(
self,
tolerance=None,
fill_intersections=True,
drivable_lane_types=(
"driving",
"entry",
"exit",
"offRamp",
"onRamp",
"connectingRamp",
),
sidewalk_lane_types=("sidewalk",),
shoulder_lane_types=("shoulder", "parking", "stop", "border"),
):
self.tolerance = self.defaultTolerance if tolerance is None else tolerance
self.junctions = {}
self.sec_lane_polys = []
self.lane_polys = []
self.intersection_region = None
self.fill_intersections = fill_intersections
self.drivable_lane_types = drivable_lane_types
self.sidewalk_lane_types = sidewalk_lane_types
self.shoulder_lane_types = shoulder_lane_types

def calculate_geometry(self, num, calc_gap=False, calc_intersect=True):
# If calc_gap=True, fills in gaps between connected roads.
# If calc_intersect=True, calculates intersection regions.
# These are fairly expensive.
num,
calc_gap=calc_gap,
tolerance=self.tolerance,
drivable_lane_types=self.drivable_lane_types,
sidewalk_lane_types=self.sidewalk_lane_types,
shoulder_lane_types=self.shoulder_lane_types,
)

if calc_gap:
drivable_polys = []
sidewalk_polys = []
shoulder_polys = []
if not (drivable_poly is None or drivable_poly.is_empty):
drivable_polys.append(drivable_poly)
if not (sidewalk_poly is None or sidewalk_poly.is_empty):
sidewalk_polys.append(sidewalk_poly)
if not (shoulder_poly is None or shoulder_poly.is_empty):
shoulder_polys.append(shoulder_poly)

else:
else:

for id_, lane in a_sec.lanes.items():
other_id = lane.pred
else:
other_id = lane.succ
if other_id not in b_bounds_left or other_id not in b_bounds_right:
continue
if id_ not in a_bounds_left or id_ not in a_bounds_right:
continue

gap_poly = MultiPoint(
[
a_bounds_left[id_],
a_bounds_right[id_],
b_bounds_left[other_id],
b_bounds_right[other_id],
]
).convex_hull
if not gap_poly.is_valid:
continue
if gap_poly.geom_type == "Polygon" and not gap_poly.is_empty:
if lane.type_ in self.drivable_lane_types:
drivable_polys.append(gap_poly)
elif lane.type_ in self.sidewalk_lane_types:
sidewalk_polys.append(gap_poly)
elif lane.type_ in self.shoulder_lane_types:
shoulder_polys.append(gap_poly)
else:

self.drivable_region = buffer_union(drivable_polys, tolerance=self.tolerance)
self.sidewalk_region = buffer_union(sidewalk_polys, tolerance=self.tolerance)
self.shoulder_region = buffer_union(shoulder_polys, tolerance=self.tolerance)

if calc_intersect:
self.calculate_intersections()

def calculate_intersections(self):
intersect_polys = []
for junc in self.junctions.values():
junc_polys = [self.roads[i].drivable_region for i in junc.paths]
assert junc_polys, junc
union = buffer_union(junc_polys, tolerance=self.tolerance)
if self.fill_intersections:
union = removeHoles(union)
assert union.is_valid
junc.poly = union
intersect_polys.append(union)
self.intersection_region = buffer_union(intersect_polys, tolerance=self.tolerance)

# Convert point to shapely Point.
point = Point(point.x, point.y)
# raise RuntimeError('Point not in RoadMap: ', point)
return 0

def __parse_lanes(self, lanes_elem):
"""Lanes_elem should be <left> or <right> element.
Returns dict of lane ids and Lane objects."""
lanes = {}
for l in lanes_elem.iter("lane"):
id_ = int(l.get("id"))
type_ = l.get("type")
pred = None
succ = None
if pred_elem is not None:
pred = int(pred_elem.get("id"))
if succ_elem is not None:
succ = int(succ_elem.get("id"))
lane = Lane(id_, type_, pred, succ)
for w in l.iter("width"):
w_poly = Poly3(
float(w.get("a")),
float(w.get("b")),
float(w.get("c")),
float(w.get("d")),
)
lane.width.append((w_poly, float(w.get("sOffset"))))
lanes[id_] = lane
return lanes

return
else:
if junction not in self.junctions:
if contact == "start":
else:
connections = self.junctions[junction].connections
for c in connections:
)

def __parse_signal_validity(self, validity_elem):
if validity_elem is None:
return None
return [int(validity_elem.get("fromLane")), int(validity_elem.get("toLane"))]

def __parse_signal(self, signal_elem):
return Signal(
signal_elem.get("id"),
signal_elem.get("country"),
signal_elem.get("type"),
signal_elem.get("subtype"),
signal_elem.get("orientation"),
self.__parse_signal_validity(signal_elem.find("validity")),
)

def __parse_signal_reference(self, signal_reference_elem):
return SignalReference(
signal_reference_elem.get("id"),
signal_reference_elem.get("orientation"),
self.__parse_signal_validity(signal_reference_elem.find("validity")),
)

def parse(self, path):
tree = ET.parse(path)
root = tree.getroot()
if root.tag != "OpenDRIVE":
raise ValueError(f"{path} does not appear to be an OpenDRIVE file")

# parse junctions
for j in root.iter("junction"):
junction = Junction(int(j.get("id")), j.get("name"))
for c in j.iter("connection"):
ty = c.get("type", "default")
if ty != "default":
raise NotImplementedError(
f'unhandled "{ty}" type of junction connection'
)
c.get("contactPoint"),
)
if not junction.paths:
warn(f"junction {junction.id_} has no connecting roads; skipping it")
continue
self.junctions[junction.id_] = junction

# Creating temporal signals container to resolve referenced signals.
_temp_signals = {}
signals = r.find("signals")
if signals is not None:
for s in signals.iter("signal"):
signal = self.__parse_signal(s)
_temp_signals[signal.id_] = signal

r.get("name"), int(r.get("id")), float(r.get("length")), r.get("junction")
)
else:

warn(
" geometry may contain artifacts"
)
else:
else:
continue

# Parse planView:
plan_view = r.find("planView")
curves = []
for geom in plan_view.iter("geometry"):
x0 = float(geom.get("x"))
y0 = float(geom.get("y"))
s0 = float(geom.get("s"))
hdg = float(geom.get("hdg"))
length = float(geom.get("length"))
curve_elem = geom[0]
curve = None
if curve_elem.tag == "line":
curve = Line(x0, y0, hdg, length)
elif curve_elem.tag == "arc":
# Arc is clothoid of constant curvature.
curv = float(curve_elem.get("curvature"))
curve = Clothoid(x0, y0, hdg, length, curv, curv)
elif curve_elem.tag == "spiral":
curv0 = float(curve_elem.get("curvStart"))
curv1 = float(curve_elem.get("curvEnd"))
curve = Clothoid(x0, y0, hdg, length, curv0, curv1)
elif curve_elem.tag == "poly3":
a, b, c, d = (
cubic_elem.get("a"),
float(curve_elem.get("b")),
float(curve_elem.get("c")),
float(curve_elem.get("d")),
)
curve = Cubic(x0, y0, hdg, length, a, b, c, d)
elif curve_elem.tag == "paramPoly3":
au, bu, cu, du, av, bv, cv, dv = (
float(curve_elem.get("aU")),
float(curve_elem.get("bU")),
float(curve_elem.get("cU")),
float(curve_elem.get("dU")),
float(curve_elem.get("aV")),
float(curve_elem.get("bV")),
float(curve_elem.get("cV")),
float(curve_elem.get("dV")),
)
p_range = curve_elem.get("pRange")
if p_range and p_range != "normalized":
# TODO support arcLength
raise NotImplementedError("unsupported pRange for paramPoly3")
else:
p_range = 1
curve = ParamCubic(
x0, y0, hdg, length, au, bu, cu, du, av, bv, cv, dv, p_range
)
curves.append((s0, curve))
if not curves:
if not curves[0][0] == 0:
raise ValueError(
)
lastS = 0
lastCurve = curves[0][1]
refLine = []
for s0, curve in curves[1:]:
l = s0 - lastS
if abs(lastCurve.length - l) > 1e-4:
raise ValueError(
)
if l < 0:
elif l < 1e-6:
warn(
f"length {l}; skipping it"
)
else:
refLine.append(lastCurve)
lastS = s0
lastCurve = curve
if refLine and lastCurve.length < 1e-6:
warn(
f"length {lastCurve.length}; skipping it"
)
else:
# even if the last curve is shorter than the threshold, we'll keep it if
# it is the only curve; getting rid of the road entirely is handled by
refLine.append(lastCurve)
assert refLine

# Parse lanes:
lanes = r.find("lanes")
for offset in lanes.iter("laneOffset"):
(
Poly3(
float(offset.get("a")),
float(offset.get("b")),
float(offset.get("c")),
float(offset.get("d")),
),
float(offset.get("s")),
)
)

def popLastSectionIfShort(l):
if l < 1e-6:

# delete the length-0 section and re-link lanes appropriately
for id_, lane in prev.lanes.items():
if lane.succ is not None:
else:
else:
return None

last_s = float("-inf")
for ls_elem in lanes.iter("laneSection"):
s = float(ls_elem.get("s"))
l = s - last_s
assert l >= 0

last_s = s
left = ls_elem.find("left")
right = ls_elem.find("right")
left_lanes = {}
right_lanes = {}

if left is not None:
left_lanes = self.__parse_lanes(left)

if right is not None:
right_lanes = self.__parse_lanes(right)

lane_sec = LaneSection(s, left_lanes, right_lanes)

if badSec is not None:  # finish re-linking lanes across deleted section
for id_, lane in lane_sec.lanes.items():
if lane.pred is not None:

# parse signals
signals = r.find("signals")
if signals is not None:
for signal_elem in signals.iter("signal"):
signal = self.__parse_signal(signal_elem)
if signal.is_valid():

for signal_ref_elem in signals.iter("signalReference"):
signalReference = self.__parse_signal_reference(signal_ref_elem)
if signalReference.is_valid():
referencedSignal = _temp_signals[signalReference.id_]
signal = Signal(
referencedSignal.id_,
referencedSignal.country,
referencedSignal.type_,
referencedSignal.subtype,
signalReference.orientation,
signalReference.validity,
)

continue
else:

def toScenicNetwork(self):
assert self.intersection_region is not None

# Prepare registry of network elements
allElements = {}

def register(element):
assert element.uid is not None
assert element.uid not in allElements, element.uid
allElements[element.uid] = element

def registerAll(elements):
for elt in elements:
register(elt)

continue  # not actually a road you can drive on
registerAll(elts)

forwardA = True
else:
forwardA = False
else:

# Connect corresponding lanes
lanesB = secB.lanesByOpenDriveID
for laneA in secA.lanes:
if laneA.isForward == forwardA:
pred = laneA._predecessor
if pred is None:
continue
assert pred in lanesB
laneB = lanesB[pred]
laneA._predecessor = laneB
laneA.lane._predecessor = laneB.lane
laneA.lane.group._predecessor = laneB.lane.group
else:
succ = laneA._successor
if succ is None:
continue
assert succ in lanesB
laneB = lanesB[succ]
laneA._successor = laneB
laneA.lane._successor = laneB.lane
laneA.lane.group._successor = laneB.lane.group

intersections = {}
for jid, junction in self.junctions.items():
if not junction.connections:
continue
assert junction.poly is not None
if junction.poly.is_empty:
warn(f"skipping empty junction {jid}")
continue

# Gather all lanes involved in the junction's connections
allIncomingLanes, allOutgoingLanes = [], []
allSignals, seenSignals = [], set()
maneuversForLane = defaultdict(list)
for connection in junction.connections:
incomingID = connection.incoming_id
continue  # incoming road has no drivable lanes; skip it

connectingID = connection.connecting_id
continue  # connecting road has no drivable lanes; skip it

if signal.openDriveID not in seenSignals:
allSignals.append(signal)

# Find possible incoming lanes for this connection
incomingSection = None
remapping = self.roads[incomingID].remappedStartLanes  # could be None
assert incomingSection is None
remapping = None
assert incomingSection is not None
if remapping is None:
incomingLaneIDs = incomingSection.lanesByOpenDriveID
else:
incomingLaneIDs = {}
newIDs = incomingSection.lanesByOpenDriveID
for start, remapped in remapping.items():
if remapped in newIDs:
incomingLaneIDs[start] = newIDs[remapped]
assert len(incomingLaneIDs) == len(newIDs)

# Connect incoming lanes to connecting road
if connection.connecting_contact == "start":
connectingID
].remappedStartLanes  # could be None
else:
remapping = None
if remapping is None:
connectingLaneIDs = connectingSection.lanesByOpenDriveID
else:
connectingLaneIDs = {}
newIDs = connectingSection.lanesByOpenDriveID
for start, remapped in remapping.items():
if remapped in newIDs:
connectingLaneIDs[start] = newIDs[remapped]
assert len(connectingLaneIDs) == len(newIDs)
if not lane_links:  # all lanes connect to that with the same id
lane_links = {l: l for l in incomingLaneIDs}
for fromID, fromLane in incomingLaneIDs.items():
# (we only handle lanes in incomingLaneIDs, thus skipping non-drivable lanes)
continue  # lane not linked by this connection
toLane = connectingLaneIDs[toID]
if fromLane.lane not in allIncomingLanes:
allIncomingLanes.append(fromLane.lane)
fromLane._successor = toLane
fromLane.lane._successor = toLane.lane
toLane._predecessor = fromLane
toLane.lane._predecessor = fromLane.lane

# Collect outgoing lane and road
# TODO why is it allowed for this not to exist?
outgoingLane = toLane.lane._successor
if outgoingLane is None:
warn(
f"connecting road {connectingID} lane {toID} has no successor lane"
)
else:
if outgoingLane not in allOutgoingLanes:
allOutgoingLanes.append(outgoingLane)

# TODO future OpenDRIVE extension annotating left/right turns?
startLane=fromLane.lane,
connectingLane=toLane.lane,
endLane=outgoingLane,
intersection=None,  # will be patched once the Intersection is created
)
maneuversForLane[fromLane.lane].append(maneuver)

# Gather maneuvers
allManeuvers = []
for lane, maneuvers in maneuversForLane.items():
assert lane.maneuvers == ()
lane.maneuvers = tuple(maneuvers)
allManeuvers.extend(maneuvers)

def cyclicOrder(elements, contactStart=None):
points = []
for element in elements:
if contactStart is None:
assert old.predecessor == jid or old.successor == jid
contactStart = old.predecessor == jid
point = element.centerline[0 if contactStart else -1]
points.append(point)
centroid = sum(points, Vector(0, 0)) / len(points)
pairs = sorted(
zip(elements, points), key=lambda pair: centroid.angleTo(pair[1])
)
return tuple(elem for elem, pt in pairs)

# Create intersection
polygon=junction.poly,
name=junction.name,
uid=f"intersection{jid}",  # need prefix to prevent collisions with roads
id=jid,
incomingLanes=cyclicOrder(allIncomingLanes, contactStart=False),
outgoingLanes=cyclicOrder(allOutgoingLanes, contactStart=True),
maneuvers=tuple(allManeuvers),
signals=tuple(allSignals),
)
register(intersection)
intersections[jid] = intersection
for maneuver in allManeuvers:
object.__setattr__(maneuver, "intersection", intersection)

continue  # road does not have any drivable lanes, so we skipped it

# Gather all network elements
groups = []
intersections = tuple(intersections.values())
crossings = ()  # TODO add these
sidewalks, shoulders = [], []
for group in groups:
sidewalk = group._sidewalk
if sidewalk:
sidewalks.append(sidewalk)
shoulder = group._shoulder
if shoulder:
shoulders.append(shoulder)

# Add dummy maneuvers for lanes which merge/turn into another lane
for lane in lanes:
if not lane.maneuvers and lane._successor:
startLane=lane,
endLane=lane._successor,
)
lane.maneuvers = (maneuver,)

def combine(regions):
return PolygonalRegion.unionAll(regions, buf=self.tolerance)

elements=allElements,
laneGroups=tuple(groups),
lanes=lanes,
intersections=intersections,
crossings=crossings,
sidewalks=tuple(sidewalks),
shoulders=tuple(shoulders),
tolerance=self.tolerance,