Source code for scenic.core.vectors

"""Scenic vectors and vector fields."""

import math
from math import sin, cos
import random
import collections
import itertools
import functools

import shapely.geometry

from scenic.core.distributions import (Samplable, Distribution, MethodDistribution,
                                       needsSampling, makeOperatorHandler, distributionMethod)
from scenic.core.lazy_eval import valueInContext, needsLazyEvaluation, makeDelayedFunctionCall
import scenic.core.utils as utils
from scenic.core.geometry import normalizeAngle

[docs]class VectorDistribution(Distribution): """A distribution over Vectors.""" defaultValueType = None # will be set after Vector is defined def toVector(self): return self
[docs]class CustomVectorDistribution(VectorDistribution): """Distribution with a custom sampler given by an arbitrary function.""" def __init__(self, sampler, *dependencies, name='CustomVectorDistribution', evaluator=None): super().__init__(*dependencies) self.sampler = sampler self.name = name self.evaluator = evaluator def sampleGiven(self, value): return self.sampler(value) def evaluateInner(self, context): if self.evaluator is None: raise NotImplementedError('evaluateIn() not supported by this distribution') return self.evaluator(self, context) def __str__(self): deps = utils.argsToString(self.dependencies) return f'{self.name}{deps}'
[docs]class VectorOperatorDistribution(VectorDistribution): """Vector version of OperatorDistribution.""" def __init__(self, operator, obj, operands): super().__init__(obj, *operands) self.operator = operator self.object = obj self.operands = operands def sampleGiven(self, value): first = value[self.object] rest = (value[child] for child in self.operands) op = getattr(first, self.operator) return op(*rest) def evaluateInner(self, context): obj = valueInContext(self.object, context) operands = tuple(valueInContext(arg, context) for arg in self.operands) return VectorOperatorDistribution(self.operator, obj, operands) def __str__(self): ops = utils.argsToString(self.operands) return f'{self.object}.{self.operator}{ops}'
[docs]class VectorMethodDistribution(VectorDistribution): """Vector version of MethodDistribution.""" def __init__(self, method, obj, args, kwargs): super().__init__(*args, *kwargs.values()) self.method = method self.object = obj self.arguments = args self.kwargs = kwargs def sampleGiven(self, value): args = (value[arg] for arg in self.arguments) kwargs = { name: value[arg] for name, arg in self.kwargs.items() } return self.method(self.object, *args, **kwargs) def evaluateInner(self, context): obj = valueInContext(self.object, context) arguments = tuple(valueInContext(arg, context) for arg in self.arguments) kwargs = { name: valueInContext(arg, context) for name, arg in self.kwargs.items() } return VectorMethodDistribution(self.method, obj, arguments, kwargs) def __str__(self): args = utils.argsToString(itertools.chain(self.arguments, self.kwargs.values())) return f'{self.object}.{self.method.__name__}{args}'
[docs]def scalarOperator(method): """Decorator for vector operators that yield scalars.""" op = method.__name__ setattr(VectorDistribution, op, makeOperatorHandler(op)) @functools.wraps(method) def handler2(self, *args, **kwargs): if any(needsSampling(arg) for arg in itertools.chain(args, kwargs.values())): return MethodDistribution(method, self, args, kwargs) else: return method(self, *args, **kwargs) return handler2
def makeVectorOperatorHandler(op): def handler(self, *args): return VectorOperatorDistribution(op, self, args) return handler
[docs]def vectorOperator(method): """Decorator for vector operators that yield vectors.""" op = method.__name__ setattr(VectorDistribution, op, makeVectorOperatorHandler(op)) @functools.wraps(method) def handler2(self, *args): if needsSampling(self): return VectorOperatorDistribution(op, self, args) elif any(needsSampling(arg) for arg in args): return VectorMethodDistribution(method, self, args, {}) elif any(needsLazyEvaluation(arg) for arg in args): # see analogous comment in distributionFunction return makeDelayedFunctionCall(handler2, args, {}) else: return method(self, *args) return handler2
[docs]def vectorDistributionMethod(method): """Decorator for methods that produce vectors. See distributionMethod.""" @functools.wraps(method) def helper(self, *args, **kwargs): if any(needsSampling(arg) for arg in itertools.chain(args, kwargs.values())): return VectorMethodDistribution(method, self, args, kwargs) elif any(needsLazyEvaluation(arg) for arg in itertools.chain(args, kwargs.values())): # see analogous comment in distributionFunction return makeDelayedFunctionCall(helper, (self,) + args, kwargs) else: return method(self, *args, **kwargs) return helper
[docs]class Vector(Samplable, collections.abc.Sequence): """A 2D vector, whose coordinates can be distributions.""" def __init__(self, x, y): self.coordinates = (x, y) super().__init__(self.coordinates) @property def x(self): return self.coordinates[0] @property def y(self): return self.coordinates[1] def toVector(self): return self def sampleGiven(self, value): return Vector(*(value[coord] for coord in self.coordinates)) def evaluateInner(self, context): return Vector(*(valueInContext(coord, context) for coord in self.coordinates))
[docs] @vectorOperator def rotatedBy(self, angle): """Return a vector equal to this one rotated counterclockwise by the given angle.""" x, y = self.x, self.y c, s = cos(angle), sin(angle) return Vector((c * x) - (s * y), (s * x) + (c * y))
@vectorOperator def offsetRotated(self, heading, offset): ro = offset.rotatedBy(heading) return self + ro @vectorOperator def offsetRadially(self, radius, heading): return self.offsetRotated(heading, Vector(0, radius)) @scalarOperator def distanceTo(self, other): dx, dy = other.toVector() - self return math.hypot(dx, dy) @scalarOperator def angleTo(self, other): dx, dy = other.toVector() - self return normalizeAngle(math.atan2(dy, dx) - (math.pi / 2)) @vectorOperator def __add__(self, other): return Vector(self[0] + other[0], self[1] + other[1]) @vectorOperator def __radd__(self, other): return Vector(self[0] + other[0], self[1] + other[1]) @vectorOperator def __sub__(self, other): return Vector(self[0] - other[0], self[1] - other[1]) @vectorOperator def __rsub__(self, other): return Vector(other[0] - self[0], other[1] - self[1]) def __len__(self): return len(self.coordinates) def __getitem__(self, index): return self.coordinates[index] def __repr__(self): return f'({self.x} @ {self.y})' def __eq__(self, other): if type(other) is not Vector: return NotImplemented return other.coordinates == self.coordinates def __hash__(self): return hash(self.coordinates)
VectorDistribution.defaultValueType = Vector class OrientedVector(Vector): def __init__(self, x, y, heading): super().__init__(x, y) self.heading = heading def toHeading(self): return self.heading def __eq__(self, other): if type(other) is not OrientedVector: return NotImplemented return (other.coordinates == self.coordinates and other.heading == self.heading) def __hash__(self): return hash((self.coordinates, self.heading)) class VectorField: def __init__(self, name, value): self.name = name self.value = value self.valueType = float @distributionMethod def __getitem__(self, pos): return self.value(pos) @vectorDistributionMethod def followFrom(self, pos, dist, steps=4): step = dist / steps for i in range(steps): pos = pos.offsetRadially(step, self[pos]) return pos def __str__(self): return f'<{type(self).__name__} {self.name}>' class PolygonalVectorField(VectorField): def __init__(self, name, cells, headingFunction=None, defaultHeading=None): self.cells = tuple(cells) if headingFunction is None and defaultHeading is not None: headingFunction = lambda pos: defaultHeading self.headingFunction = headingFunction for cell, heading in self.cells: if heading is None and headingFunction is None and defaultHeading is None: raise RuntimeError(f'missing heading for cell of PolygonalVectorField') self.defaultHeading = defaultHeading super().__init__(name, self.valueAt) def valueAt(self, pos): point = shapely.geometry.Point(pos) for cell, heading in self.cells: if cell.intersects(point): return self.headingFunction(pos) if heading is None else heading if self.defaultHeading is not None: return self.defaultHeading raise RuntimeError(f'evaluated PolygonalVectorField at undefined point {pos}')