Source code for scenic.core.vectors

"""Scenic vectors and vector fields."""

from __future__ import annotations

import collections
import functools
import itertools
import math
from math import cos, sin
import numbers
import random
import struct
import typing
import warnings

import numpy
from scipy.spatial.transform import Rotation
import shapely.geometry

from scenic.core.distributions import (
    Distribution,
    MethodDistribution,
    RejectionException,
    Samplable,
    TupleDistribution,
    distributionFunction,
    distributionMethod,
    makeOperatorHandler,
    needsSampling,
)
from scenic.core.geometry import hypot, makeShapelyPoint, normalizeAngle
from scenic.core.lazy_eval import (
    isLazy,
    makeDelayedFunctionCall,
    needsLazyEvaluation,
    valueInContext,
)
from scenic.core.type_support import (
    CoercionFailure,
    canCoerceType,
    coerceToFloat,
    toOrientation,
)
from scenic.core.utils import argsToString, cached_property

# Suppress gimbal lock warning from scipy.spatial.transform.Rotation.
# N.B. due to the way the pytest works, the warning will still appear when
# running the test suite.
warnings.filterwarnings(
    "ignore",
    message="Gimbal lock detected. Setting third angle to zero",
    module="scenic.core.vectors",
)


[docs]class VectorDistribution(Distribution): """A distribution over Vectors.""" _defaultValueType = None # will be set after Vector is defined def toVector(self): return self
[docs]class VectorOperatorDistribution(VectorDistribution): """Vector version of OperatorDistribution.""" def __init__(self, operator, obj, operands): super().__init__(obj, *operands) self.operator = operator self.object = obj self.operands = operands def sampleGiven(self, value): first = value[self.object] rest = (value[child] for child in self.operands) op = getattr(first, self.operator) return op(*rest) def evaluateInner(self, context): obj = valueInContext(self.object, context) operands = tuple(valueInContext(arg, context) for arg in self.operands) return VectorOperatorDistribution(self.operator, obj, operands) def __repr__(self): return f"{self.object!r}.{self.operator}({argsToString(self.operands)})"
[docs]class VectorMethodDistribution(VectorDistribution): """Vector version of MethodDistribution.""" def __init__(self, method, obj, args, kwargs): super().__init__(*args, *kwargs.values()) self.method = method self.object = obj self.arguments = args self.kwargs = kwargs def sampleGiven(self, value): args = (value[arg] for arg in self.arguments) kwargs = {name: value[arg] for name, arg in self.kwargs.items()} return self.method(self.object, *args, **kwargs) def evaluateInner(self, context): obj = valueInContext(self.object, context) arguments = tuple(valueInContext(arg, context) for arg in self.arguments) kwargs = {name: valueInContext(arg, context) for name, arg in self.kwargs.items()} return VectorMethodDistribution(self.method, obj, arguments, kwargs) def __repr__(self): args = argsToString(self.arguments, self.kwargs) return f"{self.object!r}.{self.method.__name__}({args})"
[docs]def scalarOperator(method): """Decorator for vector operators that yield scalars.""" op = method.__name__ setattr(VectorDistribution, op, makeOperatorHandler(op, float)) @functools.wraps(method) def helper(self, *args, **kwargs): if any(needsSampling(arg) for arg in itertools.chain(args, kwargs.values())): return MethodDistribution(method, self, args, kwargs) else: return method(self, *args, **kwargs) return helper
def makeVectorOperatorHandler(op, zeroIdentity): def handler(self, *args): # If the zero vector is the identity for this operator, and we can tell # that the operand is zero, simplify the expression forest. if ( zeroIdentity and not isLazy(args[0]) and all(coord == 0 for coord in args[0].coordinates) ): return self # The general case. return VectorOperatorDistribution(op, self, args) return handler
[docs]def vectorOperator(method, preservesZero=False, zeroIdentity=False): """Decorator for vector operators that yield vectors.""" op = method.__name__ setattr(VectorDistribution, op, makeVectorOperatorHandler(op, zeroIdentity)) @functools.wraps(method) def helper(self, *args): # If this operator preserves the zero vector, and we can tell that self # is zero, simplify the expression forest. if ( preservesZero and not needsSampling(self) and all(coord == 0 for coord in self.coordinates) ): return self # General cases when the arguments are lazy. if any(needsSampling(arg) for arg in args): if needsSampling(self): return VectorOperatorDistribution(op, self, args) return VectorMethodDistribution(method, self, args, {}) elif any(needsLazyEvaluation(arg) for arg in args): # see analogous comment in distributionFunction return makeDelayedFunctionCall(helper, args, {}) # If zero is the identity, simplify when possible (see above). if ( zeroIdentity and isinstance(args[0], (Vector, tuple, list, numpy.ndarray)) and all(coord == 0 for coord in args[0]) ): return self # General case when the arguments are not lazy. if needsSampling(self): return VectorOperatorDistribution(op, self, args) else: return method(self, *args) return helper
def zeroPreservingVectorOperator(method): return vectorOperator(method, preservesZero=True) def zeroIdentityVectorOperator(method): return vectorOperator(method, zeroIdentity=True)
[docs]def vectorDistributionMethod(method): """Decorator for methods that produce vectors. See distributionMethod.""" @functools.wraps(method) def helper(self, *args, **kwargs): if any(needsSampling(arg) for arg in itertools.chain(args, kwargs.values())): return VectorMethodDistribution(method, self, args, kwargs) elif any( needsLazyEvaluation(arg) for arg in itertools.chain(args, kwargs.values()) ): # see analogous comment in distributionFunction return makeDelayedFunctionCall(helper, (self,) + args, kwargs) else: return method(self, *args, **kwargs) return helper
[docs]class Orientation: """An orientation in 3D space.""" def __init__(self, rotation): if not isinstance(rotation, Rotation): raise TypeError( "Orientation's 'rotation' parameter must be a SciPy rotation." " Perhaps you want to use a factory method?" ) self.r = rotation self.q = rotation.as_quat()
[docs] @classmethod def fromQuaternion(cls, quaternion) -> Orientation: """Create an `Orientation` from a quaternion (of the form (x,y,z,w))""" r = Rotation.from_quat(quaternion) return cls(r)
[docs] @classmethod @distributionFunction def fromEuler(cls, yaw, pitch, roll) -> Orientation: """Create an `Orientation` from yaw, pitch, and roll angles (in radians).""" return cls._fromEuler(yaw, pitch, roll)
@classmethod def _fromEuler(cls, yaw, pitch, roll) -> Orientation: # Inner version of `fromEuler` which doesn't accept distributions. r = Rotation.from_euler("ZXY", [yaw, pitch, roll], degrees=False) return cls(r) @classmethod def _fromHeading(cls, heading) -> Orientation: # This method is faster than `from_euler` if we only have 1 angle. r = Rotation.from_rotvec([0, 0, heading], degrees=False) return cls(r) @property def w(self) -> float: return self.q[3] @property def x(self) -> float: return self.q[0] @property def y(self) -> float: return self.q[1] @property def z(self) -> float: return self.q[2] @property def yaw(self) -> float: """Yaw in the global coordinate system.""" return self.eulerAngles[0] @property def pitch(self) -> float: """Pitch in the global coordinate system.""" return self.eulerAngles[1] @property def roll(self) -> float: """Roll in the global coordinate system.""" return self.eulerAngles[2] @staticmethod def _coerce(thing) -> Orientation: if isinstance(thing, (float, int)): # fast path return Orientation._fromHeading(thing) elif isinstance(thing, Orientation): return thing elif hasattr(thing, "toOrientation"): return thing.toOrientation() elif isinstance(thing, Vector): return Orientation._fromEuler(*thing) elif isinstance(thing, (tuple, list)): if len(thing) != 3: raise CoercionFailure( "Cannot coerce a tuple/list of length not 3 to an orientation" ) return Orientation._fromEuler(*thing) elif canCoerceType(type(thing), float): return Orientation._fromHeading(coerceToFloat(thing)) else: raise CoercionFailure @staticmethod def _canCoerceType(ty): if issubclass(ty, (tuple, list, Vector, Orientation)): return True return canCoerceType(ty, float) or hasattr(ty, "toOrientation") @cached_property def eulerAngles(self) -> typing.Tuple[float, float, float]: """Global intrinsic Euler angles yaw, pitch, roll.""" return self.r.as_euler("ZXY", degrees=False) def getRotation(self): return self.r @cached_property def inverse(self) -> Orientation: return Orientation(self.r.inv()) @cached_property def _inverseRotation(self): return self.r.inv() # will be converted to a distributionMethod after the class definition def __mul__(self, other) -> Orientation: if type(other) is not Orientation: return NotImplemented # Preserve existing orientation objects when possible to help pruning. if self == globalOrientation: return other if other == globalOrientation: return self return Orientation(self.r * other.r) @distributionMethod def __add__(self, other) -> Orientation: if isinstance(other, (float, int)): other = Orientation._fromHeading(other) elif type(other) is not Orientation: return NotImplemented return other * self @distributionMethod def __radd__(self, other) -> Orientation: if isinstance(other, (float, int)): other = Orientation._fromHeading(other) elif type(other) is not Orientation: return NotImplemented return self * other def __repr__(self): return f"Orientation.fromEuler{tuple(self.eulerAngles)!r}" def __hash__(self): return hash(tuple(self.q)) + hash(tuple(-self.q))
[docs] @distributionFunction def localAnglesFor(self, orientation) -> typing.Tuple[float, float, float]: """Get local Euler angles for an orientation w.r.t. this orientation. That is, considering ``self`` as the parent orientation, find the Euler angles expressing the given orientation. """ local = orientation * self.inverse return local.eulerAngles
[docs] @distributionFunction def globalToLocalAngles(self, yaw, pitch, roll) -> typing.Tuple[float, float, float]: """Convert global Euler angles to local angles w.r.t. this orientation. Equivalent to `localAnglesFor` but takes Euler angles as input. """ orientation = Orientation.fromEuler(yaw, pitch, roll) return self.localAnglesFor(orientation)
def __eq__(self, other): if not isinstance(other, Orientation): return NotImplemented return numpy.array_equal(self.q, other.q) or numpy.array_equal(self.q, -other.q) def approxEq(self, other, tol=1e-10): if not isinstance(other, Orientation): return NotImplemented return abs(numpy.dot(self.q, other.q)) > 1 - tol @classmethod def encodeTo(cls, orientation, stream): stream.write(struct.pack("<dddd", *orientation.q)) @classmethod def decodeFrom(cls, stream): # Quaternion constructor does not roundtrip so we manually # construct a rotation and pass that in. quaternion = struct.unpack("<dddd", stream.read(32)) rotation = Rotation(quaternion, normalize=False) return cls(rotation)
globalOrientation = Orientation.fromEuler(0, 0, 0) Orientation.__mul__ = distributionMethod(Orientation.__mul__, identity=globalOrientation)
[docs]def alwaysGlobalOrientation(orientation): """Whether this orientation is always aligned with the global coordinate system. Returns False if the orientation is a distribution or delayed argument, since then the value cannot be known at this time. """ return isinstance(orientation, Orientation) and orientation == globalOrientation
[docs]class Vector(Samplable, collections.abc.Sequence): """A 3D vector, whose coordinates can be distributions.""" def __init__(self, x, y, z=0): self.coordinates = (x, y, z) super().__init__(self.coordinates) @property def x(self) -> float: return self.coordinates[0] @property def y(self) -> float: return self.coordinates[1] @property def z(self) -> float: return self.coordinates[2] def toVector(self) -> Vector: return self @staticmethod def _canCoerceType(ty): return issubclass(ty, (tuple, list, numpy.ndarray)) or hasattr(ty, "toVector") @staticmethod def _coerce(thing) -> Vector: if isinstance(thing, (tuple, list, numpy.ndarray)): l = len(thing) if not 2 <= l <= 3: raise CoercionFailure( "expected 2D/3D vector, got " f"{type(thing).__name__} of length {l}" ) return Vector(*thing) else: return thing.toVector() def sampleGiven(self, value): return Vector(*(value[coord] for coord in self.coordinates)) def evaluateInner(self, context): return Vector(*(valueInContext(coord, context) for coord in self.coordinates)) @vectorOperator def applyRotation(self, rotation): if not isinstance(rotation, Orientation): return TypeError("rotation must be an Orientation") r = rotation.getRotation() return Vector(*r.apply(self.coordinates))
[docs] @vectorOperator def sphericalCoordinates(self): """Returns this vector in spherical coordinates (rho, theta, phi)""" rho = math.hypot(self.x, self.y, self.z) theta = math.atan2(self.y, self.x) - math.pi / 2 phi = math.atan2(self.z, math.hypot(self.x, self.y)) return Vector(rho, theta, phi)
[docs] @zeroPreservingVectorOperator def rotatedBy(self, angleOrOrientation) -> Vector: """Return a vector equal to this one rotated counterclockwise by angle/orientation.""" if isinstance(angleOrOrientation, Orientation): return self.applyRotation(angleOrOrientation) x, y, z = self.x, self.y, self.z c, s = cos(angleOrOrientation), sin(angleOrOrientation) return Vector((c * x) - (s * y), (s * x) + (c * y), z)
@vectorOperator def offsetRotated(self, angleOrOrientation, offset) -> Vector: ro = offset.rotatedBy(angleOrOrientation) return self + ro @vectorOperator def offsetLocally(self, orientation, offset) -> Vector: # Faster version of `offsetRotated` that only accepts Orientations. r = orientation.getRotation() ro = r.apply(offset) x, y, z = self ox, oy, oz = ro return Vector(x + ox, y + oy, z + oz) @vectorOperator def offsetRadially(self, radius, heading) -> Vector: return self.offsetRotated(heading, Vector(0, radius)) @scalarOperator def distanceTo(self, other) -> float: if not isinstance(other, Vector): return other.distanceTo(self) dx, dy, dz = other.toVector() - self return math.hypot(dx, dy, dz) @scalarOperator def angleTo(self, other) -> float: return self.azimuthTo(other) @scalarOperator def azimuthTo(self, other) -> float: dx, dy, dz = other.toVector() - self return normalizeAngle(math.atan2(dy, dx) - (math.pi / 2)) @scalarOperator def altitudeTo(self, other) -> float: dx, dy, dz = other.toVector() - self return normalizeAngle(math.atan2(dz, math.hypot(dx, dy)))
[docs] @scalarOperator def angleWith(self, other) -> float: """Compute the signed angle between self and other. The angle is positive if other is counterclockwise of self (considering the smallest possible rotation to align them). """ x, y = self.x, self.y ox, oy = other.x, other.y return normalizeAngle(math.atan2(oy, ox) - math.atan2(y, x))
@scalarOperator def norm(self) -> float: return hypot(*self.coordinates) @scalarOperator def dot(self, other) -> float: x, y, z = self.x, self.y, self.z ox, oy, oz = other.x, other.y, other.z return (x * ox) + (y * oy) + (z * oz) @vectorOperator def cross(self, other) -> Vector: ax, ay, az = self.x, self.y, self.z bx, by, ba = other.x, other.y, other.z cx = ay * bz - az * by cy = az * bx - ax * bz cz = ax * by - ay * bx return Vector(cx, cy, cz) @vectorOperator def normalized(self) -> Vector: l = math.hypot(*self.coordinates) if l == 0: return Vector(0, 0, 0) return Vector(*(coord / l for coord in self.coordinates)) @zeroIdentityVectorOperator def __add__(self, other) -> Vector: return Vector(self[0] + other[0], self[1] + other[1], self[2] + other[2]) @zeroIdentityVectorOperator def __radd__(self, other) -> Vector: return Vector(self[0] + other[0], self[1] + other[1], self[2] + other[2]) @zeroIdentityVectorOperator def __sub__(self, other) -> Vector: return Vector(self[0] - other[0], self[1] - other[1], self[2] - other[2]) @vectorOperator def __rsub__(self, other) -> Vector: return Vector(other[0] - self[0], other[1] - self[1], other[2] - self[2]) @vectorOperator def __mul__(self, other) -> Vector: return Vector(*(coord * other for coord in self.coordinates)) def __rmul__(self, other) -> Vector: return self.__mul__(other) @vectorOperator def __truediv__(self, other) -> Vector: return Vector(*(coord / other for coord in self.coordinates)) def __len__(self): return len(self.coordinates) def __getitem__(self, index): return self.coordinates[index] def __repr__(self): return f"Vector({self.x}, {self.y}, {self.z})" def __eq__(self, other): """Check Vector equality. A Vector is equal to another if their coordinates are equal, or if the other is a tuple/list that contains the coordinates of the Vector. For backwards compatibility a Vector is also equal to a tuple/list of length 2 that has a 0 z component if the Vector also has a 0 z component and the x and y components are equal. """ if isinstance(other, Vector): return other.coordinates == self.coordinates elif isinstance(other, (tuple, list)): if len(other) == 2: return self.x == other[0] and self.y == other[1] and self.z == 0 return tuple(other) == self.coordinates else: return NotImplemented def __hash__(self): return hash(self.coordinates) @classmethod def encodeTo(cls, vec, stream): stream.write(struct.pack("<ddd", *vec.coordinates)) @classmethod def decodeFrom(cls, stream): return cls(*struct.unpack("<ddd", stream.read(24)))
VectorDistribution._defaultValueType = Vector class OrientedVector(Vector): def __init__(self, x, y, z, heading): super().__init__(x, y, z) self.heading = heading @staticmethod @distributionFunction def make(position, heading) -> OrientedVector: return OrientedVector(*position, heading) def toHeading(self): return self.heading def evaluateInner(self, context): hdg = valueInContext(self.heading, context) return OrientedVector( *(valueInContext(coord, context) for coord in self.coordinates), hdg ) def __eq__(self, other): if type(other) is not OrientedVector: return NotImplemented return other.coordinates == self.coordinates and other.heading == self.heading def __hash__(self): return hash((self.coordinates, self.heading))
[docs]class VectorField: """A vector field, providing an orientation at every point. Arguments: name (str): name for debugging. value: function computing the heading at the given `Vector`. minSteps (int): Minimum number of steps for `followFrom`; default 4. defaultStepSize (float): Default step size for `followFrom`; default 5. This is an upper bound: more steps will be taken as needed to ensure that no single step is longer than this value, but if the distance to travel is small then the steps may be smaller. """ def __init__(self, name, value, minSteps=4, defaultStepSize=5): self.name = name self.value = value self.valueType = Orientation self.minSteps = minSteps self.defaultStepSize = defaultStepSize @distributionMethod def __getitem__(self, pos) -> Orientation: val = self.value(pos) if isinstance(val, numbers.Real): # fast path return Orientation._fromHeading(val) if isLazy(val): raise ValueError(f"value function of {self.name} returned lazy value") return toOrientation( val, f"value function of {self.name} returned non-orientation" )
[docs] @vectorDistributionMethod def followFrom(self, pos, dist, steps=None, stepSize=None): """Follow the field from a point for a given distance. Uses the forward Euler approximation, covering the given distance with equal-size steps. The number of steps can be given manually, or computed automatically from a desired step size. Arguments: pos (`Vector`): point to start from. dist (float): distance to travel. steps (int): number of steps to take, or :obj:`None` to compute the number of steps based on the distance (default :obj:`None`). stepSize (float): length used to compute how many steps to take, or :obj:`None` to use the field's default step size. """ if steps is None: steps = self.minSteps stepSize = self.defaultStepSize if stepSize is None else stepSize if stepSize is not None: steps = max(steps, math.ceil(dist / stepSize)) stepSize = dist / steps step = numpy.array([0, stepSize, 0]) for i in range(steps): rot = self[pos].getRotation() pos += rot.apply(step) return Vector(*pos)
[docs] @staticmethod def forUnionOf(regions, tolerance=0): """Creates a `PiecewiseVectorField` from the union of the given regions. If none of the regions have an orientation, returns :obj:`None` instead. """ if any(reg.orientation for reg in regions): return PiecewiseVectorField("Union", regions, tolerance=tolerance) else: return None
def __str__(self): return f"<{type(self).__name__} {self.name}>"
[docs]class PolygonalVectorField(VectorField): """A piecewise-constant vector field defined over polygonal cells. Arguments: name (str): name for debugging. cells: a sequence of cells, with each cell being a pair consisting of a Shapely geometry and a heading. If the heading is :obj:`None`, we call the given **headingFunction** for points in the cell instead. headingFunction: function computing the heading for points in cells without specified headings, if any (default :obj:`None`). defaultHeading: heading for points not contained in any cell (default :obj:`None`, meaning reject such points). """ def __init__(self, name, cells, headingFunction=None, defaultHeading=None): self.cells = tuple(cells) if headingFunction is None and defaultHeading is not None: headingFunction = lambda pos: defaultHeading self.headingFunction = headingFunction for cell, heading in self.cells: if heading is None and headingFunction is None and defaultHeading is None: raise RuntimeError(f"missing heading for cell of PolygonalVectorField") self.defaultHeading = defaultHeading self.rtree = shapely.STRtree([cell[0] for cell in self.cells]) super().__init__(name, self.valueAt) def valueAt(self, pos): point = makeShapelyPoint(pos) candidates = self.rtree.query(point, predicate="intersects") if len(candidates) > 0: first = candidates.min() cell, heading = self.cells[first] return self.headingFunction(pos) if heading is None else heading if self.defaultHeading is not None: return self.defaultHeading raise RejectionException(f"evaluated PolygonalVectorField at undefined point")
[docs]class PiecewiseVectorField(VectorField): """A vector field defined by patching together several regions. The heading at a point is determined by checking each region in turn to see if it has an orientation and contains the point, returning the corresponding heading if so. If we get through all the regions, and **tolerance** is nonzero, we try again, this time allowing the point to be up to **tolerance** away from each region. If we still fail to find a region "containing" the point, then we return the **defaultHeading**, if any, and otherwise reject the scene. Arguments: name (str): name for debugging. regions (sequence of `Region` objects): the regions making up the field. tolerance (float): maximum distance at which to consider a point as being in one of the regions, if it is not otherwise contained (default 0). defaultHeading (float): the heading for points not in any region with an orientation (default :obj:`None`, meaning reject such points). """ def __init__(self, name, regions, tolerance=0, defaultHeading=None): self.regions = tuple(region for region in regions if region.orientation) self.tolerance = tolerance self.defaultHeading = defaultHeading super().__init__(name, self.valueAt) def valueAt(self, point): for region in self.regions: if region.containsPoint(point): return region.orientation[point] if self.tolerance > 0: for region in self.regions: if region.distanceTo(point) <= self.tolerance: return region.orientation[point] if self.defaultHeading is not None: return self.defaultHeading raise RejectionException(f"evaluated PiecewiseVectorField at undefined point")
class PolyhedronVectorField(VectorField): pass