"""Python implementations of Scenic language constructs.
This module is automatically imported by all Scenic programs. In addition to
defining the built-in functions, operators, specifiers, etc., it also stores
global state such as the list of all created Scenic objects.
.. highlight:: scenic-grammar
"""
__all__ = (
# Primitive statements and functions
"ego",
"workspace",
"new",
"require",
"resample",
"param",
"globalParameters",
"mutate",
"verbosePrint",
"localPath",
"model",
"simulator",
"simulation",
"require_monitor",
"terminate_when",
"terminate_simulation_when",
"terminate_after",
"in_initial_scenario",
"override",
"record",
"record_initial",
"record_final",
"sin",
"cos",
"hypot",
"max",
"min",
"filter",
"str",
"float",
"int",
"round",
"len",
"range",
# Prefix operators
"Visible",
"NotVisible",
"Front",
"Back",
"Left",
"Right",
"FrontLeft",
"FrontRight",
"BackLeft",
"BackRight",
"Top",
"Bottom",
"TopFrontLeft",
"TopFrontRight",
"TopBackLeft",
"TopBackRight",
"BottomFrontLeft",
"BottomFrontRight",
"BottomBackLeft",
"BottomBackRight",
"RelativeHeading",
"ApparentHeading",
"RelativePosition",
"DistanceFrom",
"DistancePast",
"Follow",
"AngleTo",
"AngleFrom",
"AltitudeTo",
"AltitudeFrom",
# Infix operators
"FieldAt",
"RelativeTo",
"OffsetAlong",
"CanSee",
"Intersects",
"Until",
"Implies",
"VisibleFromOp",
"NotVisibleFromOp",
# Primitive types
"Vector",
"Orientation",
"VectorField",
"PolygonalVectorField",
"Shape",
"MeshShape",
"BoxShape",
"CylinderShape",
"ConeShape",
"SpheroidShape",
"MeshVolumeRegion",
"MeshSurfaceRegion",
"BoxRegion",
"SpheroidRegion",
"PathRegion",
"Region",
"PointSetRegion",
"RectangularRegion",
"CircularRegion",
"SectorRegion",
"PolygonalRegion",
"PolylineRegion",
"Workspace",
"Mutator",
"Range",
"DiscreteRange",
"Options",
"Uniform",
"Discrete",
"Normal",
"TruncatedNormal",
"VerifaiParameter",
"VerifaiRange",
"VerifaiDiscreteRange",
"VerifaiOptions",
# Constructible types
"Point",
"OrientedPoint",
"Object",
# Specifiers
"With",
"At",
"In",
"ContainedIn",
"On",
"Beyond",
"VisibleFrom",
"NotVisibleFrom",
"VisibleSpec",
"NotVisibleSpec",
"OffsetBy",
"OffsetAlongSpec",
"Facing",
"ApparentlyFacing",
"FacingToward",
"FacingDirectlyToward",
"FacingAwayFrom",
"FacingDirectlyAwayFrom",
"LeftSpec",
"RightSpec",
"Ahead",
"Behind",
"Above",
"Below",
"Following",
# Constants
"everywhere",
"nowhere",
# Exceptions
"GuardViolation",
"PreconditionViolation",
"InvariantViolation",
"RejectionException",
# Internal APIs # TODO remove?
"_scenic_default",
"Behavior",
"Monitor",
"_makeTerminationAction",
"_makeSimulationTerminationAction",
"BlockConclusion",
"runTryInterrupt",
"wrapStarredValue",
"callWithStarArgs",
"Modifier",
"DynamicScenario",
# Proposition Factories
"AtomicProposition",
"PropositionAnd",
"PropositionOr",
"PropositionNot",
"Always",
"Eventually",
"Next",
)
# various Python types and functions used in the language but defined elsewhere
from scenic.core.distributions import (
DiscreteRange,
Normal,
Options,
RandomControlFlowError,
Range,
TruncatedNormal,
Uniform,
)
from scenic.core.dynamics.behaviors import Behavior, Monitor
from scenic.core.dynamics.guards import (
GuardViolation,
InvariantViolation,
PreconditionViolation,
)
from scenic.core.dynamics.invocables import BlockConclusion, runTryInterrupt
from scenic.core.dynamics.scenarios import DynamicScenario
from scenic.core.external_params import (
VerifaiDiscreteRange,
VerifaiOptions,
VerifaiParameter,
VerifaiRange,
)
from scenic.core.geometry import cos, hypot, max, min, sin
from scenic.core.object_types import Mutator, Object, OrientedPoint, Point
from scenic.core.regions import (
BoxRegion,
CircularRegion,
MeshSurfaceRegion,
MeshVolumeRegion,
PathRegion,
PointSetRegion,
PolygonalRegion,
PolylineRegion,
RectangularRegion,
Region,
SectorRegion,
SpheroidRegion,
everywhere,
nowhere,
)
from scenic.core.shapes import (
BoxShape,
ConeShape,
CylinderShape,
MeshShape,
Shape,
SpheroidShape,
)
from scenic.core.specifiers import PropertyDefault as _scenic_default
from scenic.core.vectors import PolygonalVectorField, Vector, VectorField
from scenic.core.workspaces import Workspace
Discrete = Options
# isort: split
# everything that should not be directly accessible from the language is imported here:
import builtins
import collections.abc
from contextlib import contextmanager
import functools
import importlib
import numbers
from pathlib import Path
import sys
import traceback
import typing
from scenic.core.distributions import (
Distribution,
MultiplexerDistribution,
RejectionException,
StarredDistribution,
TupleDistribution,
canUnpackDistributions,
distributionFunction,
needsSampling,
toDistribution,
)
from scenic.core.dynamics.actions import _EndScenarioAction, _EndSimulationAction
import scenic.core.errors as errors
from scenic.core.errors import InvalidScenarioError, ScenicSyntaxError
from scenic.core.external_params import ExternalParameter
from scenic.core.geometry import apparentHeadingAtPoint, normalizeAngle
from scenic.core.lazy_eval import (
DelayedArgument,
isLazy,
needsLazyEvaluation,
requiredProperties,
valueInContext,
)
import scenic.core.object_types
from scenic.core.object_types import Constructible, Object2D, OrientedPoint2D, Point2D
import scenic.core.propositions as propositions
from scenic.core.regions import convertToFootprint
import scenic.core.requirements as requirements
from scenic.core.simulators import RejectSimulationException
from scenic.core.specifiers import ModifyingSpecifier, Specifier
from scenic.core.type_support import (
Heading,
canCoerce,
coerce,
evaluateRequiringEqualTypes,
isA,
toHeading,
toOrientation,
toScalar,
toType,
toTypes,
toVector,
underlyingType,
)
from scenic.core.vectors import Orientation, alwaysGlobalOrientation
### Internals
activity = 0
currentScenario = None
scenarioStack = []
scenarios = []
evaluatingRequirement = False
_globalParameters = {}
lockedParameters = set()
lockedModel = None
loadingModel = False
currentSimulation = None
inInitialScenario = True
runningScenarios = [] # in order, oldest first
currentBehavior = None
simulatorFactory = None
evaluatingGuard = False
mode2D = False
_originalConstructibles = (Point, OrientedPoint, Object)
BUFFERING_PITCH = 0.1
## APIs used internally by the rest of Scenic
# Scenic compilation
def isActive():
"""Are we in the middle of compiling a Scenic module?
The 'activity' global can be >1 when Scenic modules in turn import other
Scenic modules.
"""
return activity > 0
def activate(options, namespace=None):
"""Activate the veneer when beginning to compile a Scenic module."""
global activity, _globalParameters, lockedParameters, lockedModel, currentScenario
if options.paramOverrides or options.modelOverride:
assert activity == 0
_globalParameters.update(options.paramOverrides)
lockedParameters = set(options.paramOverrides)
lockedModel = options.modelOverride
# If we are in 2D mode, set the global flag and replace all classes
# with their 2D compatibility counterparts.
if options.mode2D:
global mode2D, Point, OrientedPoint, Object
assert mode2D or activity == 0
mode2D = True
Point = Point2D
OrientedPoint = OrientedPoint2D
Object = Object2D
scenic.core.object_types.Point = Point
scenic.core.object_types.OrientedPoint = OrientedPoint
scenic.core.object_types.Object = Object
activity += 1
assert not evaluatingRequirement
assert not evaluatingGuard
assert currentSimulation is None
# placeholder scenario for top-level code
newScenario = DynamicScenario._dummy(namespace)
scenarioStack.append(newScenario)
currentScenario = newScenario
def deactivate():
"""Deactivate the veneer after compiling a Scenic module."""
global activity, _globalParameters, lockedParameters, lockedModel, mode2D
global currentScenario, scenarios, scenarioStack, simulatorFactory
activity -= 1
assert activity >= 0
assert not evaluatingRequirement
assert not evaluatingGuard
assert currentSimulation is None
scenarioStack.pop()
assert len(scenarioStack) == activity
scenarios = []
if activity == 0:
lockedParameters = set()
lockedModel = None
currentScenario = None
simulatorFactory = None
_globalParameters = {}
if mode2D:
global Point, OrientedPoint, Object
mode2D = False
Point, OrientedPoint, Object = _originalConstructibles
scenic.core.object_types.Point = Point
scenic.core.object_types.OrientedPoint = OrientedPoint
scenic.core.object_types.Object = Object
else:
currentScenario = scenarioStack[-1]
# Instance/Object creation
def registerInstance(inst):
"""Add a Scenic instance to the global list of created objects.
This is called by the Point/OrientedPoint constructor.
"""
if currentScenario:
assert isinstance(inst, Constructible)
currentScenario._registerInstance(inst)
def registerObject(obj):
"""Add a Scenic object to the global list of created objects.
This is called by the Object constructor.
"""
if evaluatingRequirement:
raise InvalidScenarioError("tried to create an object inside a requirement")
elif currentBehavior is not None:
raise InvalidScenarioError("tried to create an object inside a behavior")
elif activity > 0 or currentScenario:
assert not evaluatingRequirement
assert isinstance(obj, Object)
currentScenario._registerObject(obj)
if currentSimulation:
currentSimulation._createObject(obj)
# External parameter creation
def registerExternalParameter(value):
"""Register a parameter whose value is given by an external sampler."""
if activity > 0:
assert isinstance(value, ExternalParameter)
currentScenario._externalParameters.append(value)
# Function call support
def wrapStarredValue(value, lineno):
if isinstance(value, TupleDistribution) or not needsSampling(value):
return value
elif isinstance(value, Distribution):
return [StarredDistribution(value, lineno)]
else:
raise TypeError(f"iterable unpacking cannot be applied to {value}")
def callWithStarArgs(_func_to_call, *args, **kwargs):
if not canUnpackDistributions(_func_to_call):
# wrap function to delay evaluation until starred distributions are sampled
_func_to_call = distributionFunction(_func_to_call)
return _func_to_call(*args, **kwargs)
# Simulations
def instantiateSimulator(factory, params):
global _globalParameters
assert not _globalParameters # TODO improve hack?
_globalParameters = dict(params)
try:
return factory()
finally:
_globalParameters = {}
def beginSimulation(sim):
global currentSimulation, currentScenario, inInitialScenario, runningScenarios
global _globalParameters
if isActive():
raise RuntimeError("tried to start simulation during Scenic compilation!")
assert currentSimulation is None
assert currentScenario is None
assert not scenarioStack
currentSimulation = sim
currentScenario = sim.scene.dynamicScenario
runningScenarios = [] # will be updated by DynamicScenario._start
inInitialScenario = currentScenario._setup is None
currentScenario._bindTo(sim.scene)
_globalParameters = dict(sim.scene.params)
# rebind globals that could be referenced by behaviors to their sampled values
for modName, (
namespace,
sampledNS,
originalNS,
) in sim.scene.behaviorNamespaces.items():
namespace.clear()
namespace.update(sampledNS)
def endSimulation(sim):
global currentSimulation, currentScenario, currentBehavior, runningScenarios
global _globalParameters
currentSimulation = None
currentScenario = None
runningScenarios = []
currentBehavior = None
_globalParameters = {}
for modName, (
namespace,
sampledNS,
originalNS,
) in sim.scene.behaviorNamespaces.items():
namespace.clear()
namespace.update(originalNS)
def simulationInProgress():
return currentSimulation is not None
# Requirements
@contextmanager
def executeInRequirement(scenario, boundEgo, values):
global evaluatingRequirement, currentScenario
assert activity == 0
assert not evaluatingRequirement
evaluatingRequirement = True
if currentScenario is None:
currentScenario = scenario
clearScenario = True
else:
assert currentScenario is scenario
clearScenario = False
oldEgo = currentScenario._ego
oldObjects = currentScenario._objects
currentScenario._objects = tuple(values[obj] for obj in currentScenario.objects)
if boundEgo:
currentScenario._ego = boundEgo
try:
yield
except RandomControlFlowError as e:
# Such errors should not be possible inside a requirement, since all values
# should have already been sampled: something's gone wrong with our rebinding.
raise RuntimeError("internal error: requirement dependency not sampled") from e
finally:
evaluatingRequirement = False
currentScenario._ego = oldEgo
currentScenario._objects = oldObjects
if clearScenario:
currentScenario = None
# Dynamic scenarios
def registerDynamicScenarioClass(cls):
scenarios.append(cls)
@contextmanager
def executeInScenario(scenario, inheritEgo=False):
global currentScenario
oldScenario = currentScenario
if inheritEgo and oldScenario is not None:
scenario._ego = oldScenario._ego # inherit ego from parent
currentScenario = scenario
try:
yield
except AttributeError as e:
# Convert confusing AttributeErrors from trying to access nonexistent scenario
# variables into NameErrors, which is what the user would expect. The information
# needed to do this was made available in Python 3.10, but unfortunately could be
# wrong until 3.10.3: see bpo-46940.
if sys.version_info >= (3, 10, 3) and isinstance(e.obj, DynamicScenario):
newExc = NameError(f"name '{e.name}' is not defined", name=e.name)
raise newExc.with_traceback(e.__traceback__)
else:
raise
finally:
currentScenario = oldScenario
def prepareScenario(scenario):
if currentSimulation:
verbosePrint(f"Starting scenario {scenario}", level=3)
def finishScenarioSetup(scenario):
global inInitialScenario
inInitialScenario = False
def startScenario(scenario):
assert scenario not in runningScenarios
runningScenarios.append(scenario)
def endScenario(scenario, reason, quiet=False):
runningScenarios.remove(scenario)
if not quiet:
verbosePrint(f"Stopping scenario {scenario} because: {reason}", level=3)
# Dynamic behaviors
@contextmanager
def executeInBehavior(behavior):
global currentBehavior
oldBehavior = currentBehavior
currentBehavior = behavior
try:
yield
except AttributeError as e:
# See comment for corresponding code in executeInScenario
if sys.version_info >= (3, 10, 3) and isinstance(e.obj, Behavior):
newExc = NameError(f"name '{e.name}' is not defined", name=e.name)
raise newExc.with_traceback(e.__traceback__)
else:
raise
finally:
currentBehavior = oldBehavior
@contextmanager
def executeInGuard():
global evaluatingGuard
assert not evaluatingGuard
evaluatingGuard = True
try:
yield
finally:
evaluatingGuard = False
def _makeTerminationAction(agent, line):
assert activity == 0
if agent:
scenario = agent._parentScenario()
assert scenario is not None
else:
scenario = None
return _EndScenarioAction(scenario, line)
def _makeSimulationTerminationAction(line):
assert activity == 0
return _EndSimulationAction(line)
### Parsing support
[docs]class Modifier(typing.NamedTuple):
name: str
value: typing.Any
terminator: typing.Optional[str] = None
### Primitive statements and functions
def new(cls, specifiers):
if not (isinstance(cls, type) and issubclass(cls, Constructible)):
raise TypeError(f'"{cls.__name__}" is not a Scenic class')
return cls._withSpecifiers(specifiers)
[docs]def ego(obj=None):
"""Function implementing loads and stores to the 'ego' pseudo-variable.
The translator calls this with no arguments for loads, and with the source
value for stores.
"""
egoObject = currentScenario._ego
if obj is None:
if egoObject is None:
raise InvalidScenarioError("referred to ego object not yet assigned")
elif not isinstance(obj, Object):
if isinstance(obj, type) and issubclass(obj, Object):
suffix = " (perhaps you forgot 'new'?)"
else:
suffix = ""
ty = type(obj).__name__
raise TypeError(f"tried to make non-object (of type {ty}) the ego object{suffix}")
else:
currentScenario._ego = obj
for scenario in runningScenarios:
if scenario._ego is None:
scenario._ego = obj
return egoObject
[docs]def workspace(workspace=None):
"""Function implementing loads and stores to the 'workspace' pseudo-variable.
See `ego`.
"""
if workspace is None:
if currentScenario._workspace is None:
raise InvalidScenarioError("referred to workspace not yet assigned")
elif not isinstance(workspace, Workspace):
raise TypeError(f"workspace {workspace} is not a Workspace")
elif needsSampling(workspace):
raise InvalidScenarioError("workspace must be a fixed region")
elif needsLazyEvaluation(workspace):
raise InvalidScenarioError(
"workspace uses value undefined " "outside of object definition"
)
else:
currentScenario._workspace = workspace
return currentScenario._workspace
[docs]def require(reqID, req, line, name, prob=1):
"""Function implementing the require statement."""
if not name:
name = f"requirement on line {line}"
if evaluatingRequirement:
raise InvalidScenarioError("tried to create a requirement inside a requirement")
if req.has_temporal_operator and prob != 1:
raise InvalidScenarioError(
"requirements with temporal operators must have probability of 1"
)
if currentSimulation is not None: # requirement being evaluated at runtime
if req.has_temporal_operator:
# support monitors on dynamic requirements and create dynamic requirements
currentScenario._addDynamicRequirement(
requirements.RequirementType.require, req, line, name
)
else:
if prob >= 1 or Range(0, 1) <= prob: # use Range so value can be recorded
result = req.evaluate()
assert not needsSampling(result)
if needsLazyEvaluation(result):
raise InvalidScenarioError(
f"requirement on line {line} uses value"
" undefined outside of object definition"
)
if not result:
raise RejectSimulationException(name)
else: # requirement being defined at compile time
currentScenario._addRequirement(
requirements.RequirementType.require, reqID, req, line, name, prob
)
def require_monitor(reqID, value, line, name):
if not name:
name = f"requirement on line {line}"
if currentSimulation is not None:
monitor = value.evaluate()
assert not needsSampling(monitor)
if needsLazyEvaluation(monitor):
raise InvalidScenarioError(
f"requirement on line {line} uses value"
" undefined outside of object definition"
)
if not isinstance(monitor, Monitor):
raise TypeError(f'"require monitor X" with X not a monitor on line {line}')
currentScenario._addMonitor(monitor)
else:
currentScenario._addRequirement(
requirements.RequirementType.monitor, reqID, value, line, name, 1
)
def record(reqID, value, line, name):
if not name:
name = f"record{line}"
makeRequirement(requirements.RequirementType.record, reqID, value, line, name)
def record_initial(reqID, value, line, name):
if not name:
name = f"record{line}"
makeRequirement(requirements.RequirementType.recordInitial, reqID, value, line, name)
def record_final(reqID, value, line, name):
if not name:
name = f"record{line}"
makeRequirement(requirements.RequirementType.recordFinal, reqID, value, line, name)
def require_always(reqID, req, line, name):
"""Function implementing the 'require always' statement."""
if not name:
name = f"requirement on line {line}"
makeRequirement(requirements.RequirementType.requireAlways, reqID, req, line, name)
def require_eventually(reqID, req, line, name):
"""Function implementing the 'require eventually' statement."""
if not name:
name = f"requirement on line {line}"
makeRequirement(
requirements.RequirementType.requireEventually, reqID, req, line, name
)
[docs]def terminate_when(reqID, req, line, name):
"""Function implementing the 'terminate when' statement."""
if not name:
name = f"termination condition on line {line}"
makeRequirement(requirements.RequirementType.terminateWhen, reqID, req, line, name)
[docs]def terminate_simulation_when(reqID, req, line, name):
"""Function implementing the 'terminate simulation when' statement."""
if not name:
name = f"termination condition on line {line}"
makeRequirement(
requirements.RequirementType.terminateSimulationWhen, reqID, req, line, name
)
def makeRequirement(ty, reqID, req, line, name):
if evaluatingRequirement:
raise InvalidScenarioError(f'tried to use "{ty.value}" inside a requirement')
elif currentBehavior is not None:
raise InvalidScenarioError(f'"{ty.value}" inside a behavior on line {line}')
elif currentSimulation is not None:
currentScenario._addDynamicRequirement(ty, req, line, name)
else: # requirement being defined at compile time
currentScenario._addRequirement(ty, reqID, req, line, name, 1)
def terminate_after(timeLimit, terminator=None):
if not isinstance(timeLimit, (builtins.float, builtins.int)):
raise TypeError('"terminate after N" with N not a number')
assert terminator in (None, "seconds", "steps")
inSeconds = terminator != "steps"
currentScenario._setTimeLimit(timeLimit, inSeconds=inSeconds)
[docs]def resample(dist):
"""The built-in resample function."""
if not isinstance(dist, Distribution):
return dist
try:
return dist.clone()
except NotImplementedError:
raise TypeError("cannot resample non-primitive distribution") from None
[docs]def verbosePrint(
*objects, level=1, indent=True, sep=" ", end="\n", file=sys.stdout, flush=False
):
"""Built-in function printing a message only in verbose mode.
Scenic's verbosity may be set using the :option:`-v` command-line option.
The simplest way to use this function is with code like
:scenic:`verbosePrint('hello world!')` or :scenic:`verbosePrint('details here', level=3)`;
the other keyword arguments are probably only useful when replacing more complex uses
of the Python `print` function.
Args:
objects: Object(s) to print (`str` will be called to make them strings).
level (int): Minimum verbosity level at which to print. Default is 1.
indent (bool): Whether to indent the message to align with messages generated by
Scenic (default true).
sep, end, file, flush: As in `print`.
"""
if errors.verbosityLevel >= level:
if indent:
if currentSimulation:
indent = " " if errors.verbosityLevel >= 3 else " "
else:
indent = " " * activity if errors.verbosityLevel >= 2 else " "
print(indent, end="", file=file)
print(*objects, sep=sep, end=end, file=file, flush=flush)
[docs]def localPath(relpath):
"""Convert a path relative to the calling Scenic file into an absolute path.
For example, :scenic:`localPath('resource.dat')` evaluates to the absolute path
of a file called ``resource.dat`` located in the same directory as the
Scenic file where this expression appears. Note that the path is returned as a
`pathlib.Path` object.
"""
filename = traceback.extract_stack(limit=2)[0].filename
base = Path(filename).parent
return base.joinpath(relpath).resolve()
[docs]def simulation():
"""Get the currently-running `Simulation`.
May only be called from code that runs at simulation time, e.g. inside
:term:`dynamic behaviors` and :keyword:`compose` blocks of scenarios.
"""
if isActive():
raise InvalidScenarioError("used simulation() outside a behavior")
assert currentSimulation is not None
return currentSimulation
def simulator(sim):
global simulatorFactory
simulatorFactory = sim
def in_initial_scenario():
return inInitialScenario
def override(*args):
if len(args) < 1:
raise TypeError('"override" missing an object')
elif len(args) < 2:
raise TypeError('"override" missing a list of specifiers')
obj = args[0]
if not isinstance(obj, Object):
raise TypeError(f'"override" passed non-Object {obj}')
specs = args[1:]
for spec in specs:
assert isinstance(spec, Specifier), spec
currentScenario._override(obj, specs)
def model(namespace, modelName):
global loadingModel
if loadingModel:
raise InvalidScenarioError('Scenic world model itself uses the "model" statement')
if lockedModel is not None:
modelName = lockedModel
try:
loadingModel = True
module = importlib.import_module(modelName)
except ModuleNotFoundError as e:
if e.name == modelName:
raise InvalidScenarioError(
f"could not import world model {modelName}"
) from None
else:
raise
finally:
loadingModel = False
names = module.__dict__.get("__all__", None)
if names is not None:
for name in names:
namespace[name] = getattr(module, name)
else:
for name, value in module.__dict__.items():
if not name.startswith("_"):
namespace[name] = value
[docs]def param(params):
"""Function implementing the param statement."""
global loadingModel
if evaluatingRequirement:
raise InvalidScenarioError(
"tried to create a global parameter inside a requirement"
)
elif currentSimulation is not None:
raise InvalidScenarioError(
"tried to create a global parameter during a simulation"
)
for name, value in params.items():
if name not in lockedParameters and (
not loadingModel or name not in _globalParameters
):
_globalParameters[name] = toDistribution(value)
class ParameterTableProxy(collections.abc.Mapping):
def __init__(self, map):
object.__setattr__(self, "_internal_map", map)
def __getitem__(self, name):
return self._internal_map[name]
def __iter__(self):
return iter(self._internal_map)
def __len__(self):
return len(self._internal_map)
def __getattr__(self, name):
return self.__getitem__(name) # allow namedtuple-like access
def __setattr__(self, name, value):
raise InvalidScenarioError(
'cannot modify globalParameters (use "param" statement)'
)
def _clone_table(self):
return ParameterTableProxy(self._internal_map.copy())
def globalParameters():
return ParameterTableProxy(_globalParameters)
[docs]def mutate(*objects, scale=1):
"""Function implementing the mutate statement."""
if evaluatingRequirement:
raise InvalidScenarioError("used mutate statement inside a requirement")
if len(objects) == 0:
objects = currentScenario._objects
if not isinstance(scale, (builtins.int, builtins.float)):
raise TypeError('"mutate X by Y" with Y not a number')
for obj in objects:
if not isinstance(obj, Object):
raise TypeError('"mutate X" with X not an object')
obj.mutationScale = scale
# Object will now require sampling even if it has no explicit dependencies.
obj._needsSampling = True
obj._isLazy = True
### Prefix operators
[docs]def Visible(region):
"""The :grammar:`visible <region>` operator."""
region = toType(region, Region, '"visible X" with X not a Region')
return region.intersect(ego().visibleRegion)
[docs]def NotVisible(region):
"""The :grammar:`not visible <region>` operator."""
region = toType(region, Region, '"not visible X" with X not a Region')
return region.difference(ego().visibleRegion)
# front of <object>, etc.
ops = (
"front",
"back",
"left",
"right",
"front left",
"front right",
"back left",
"back right",
"top",
"bottom",
"top front left",
"top front right",
"top back left",
"top back right",
"bottom front left",
"bottom front right",
"bottom back left",
"bottom back right",
)
template = '''\
def {function}(X):
"""The :grammar:`{syntax} of <object>` operator."""
if not isinstance(X, Object):
raise TypeError('"{syntax} of X" with X not an Object')
return X.{property}
'''
for op in ops:
func = "".join(word.capitalize() for word in op.split(" "))
prop = func[0].lower() + func[1:]
definition = template.format(function=func, syntax=op, property=prop)
exec(definition)
### Infix operators
[docs]def FieldAt(X, Y):
"""The :grammar:`<vector field> at <vector>` operator."""
if isinstance(X, type) and issubclass(X, Constructible):
raise TypeError('"X at Y" with X not a vector field. (Perhaps you forgot "new"?)')
if not isA(X, VectorField):
raise TypeError('"X at Y" with X not a vector field')
Y = toVector(Y, '"X at Y" with Y not a vector')
return X[Y]
[docs]def RelativeTo(X, Y) -> typing.Union[Vector, builtins.float, Orientation]:
"""The :scenic:`{X} relative to {Y}` polymorphic operator.
Allowed forms::
<value> relative to <value> # with at least one a field, the other a field or heading
<vector> relative to <oriented point> # and vice versa
<vector> relative to <vector>
<heading> relative to <heading>
<orientation> relative to <orientation>
"""
# Define lazy RelativeTo helper
@distributionFunction
def lazyRelativeTo(X, Y) -> typing.Union[Vector, builtins.float, Orientation]:
return RelativeTo(X, Y)
# Define type helpers
def knownOrientation(thing):
return isA(thing, Orientation) or (
(not isLazy(thing))
and canCoerce(thing, Orientation)
and (not canCoerce(thing, Vector))
)
def knownHeading(thing):
return isA(thing, numbers.Real) or (
(not isLazy(thing)) and canCoerce(thing, Heading)
)
def knownVector(thing):
return isA(thing, Vector) or ((not isLazy(thing)) and canCoerce(thing, Vector))
xf, yf = isA(X, VectorField), isA(Y, VectorField)
if xf or yf:
if xf and yf and X.valueType != Y.valueType:
raise TypeError('"X relative to Y" with X, Y fields of different types')
fieldType = X.valueType if xf else Y.valueType
error = '"X relative to Y" with field and value of different types'
def helper(context):
pos = context.position.toVector()
xp = X[pos] if xf else toType(X, fieldType, error)
yp = Y[pos] if yf else toType(Y, fieldType, error)
return yp + xp
return DelayedArgument({"position"}, helper)
elif isA(X, OrientedPoint) or isA(Y, OrientedPoint):
# Ensure X and Y aren't both oriented points
if isA(X, OrientedPoint) and isA(Y, OrientedPoint):
raise TypeError('"X relative to Y" with X, Y both oriented points')
# Extract the single oriented point and the other value
if isA(X, OrientedPoint):
op = X
other = Y
else:
op = Y
other = X
# Check the other value's type
if isA(other, numbers.Real):
return op.heading + toHeading(other)
elif isA(other, Orientation):
return toOrientation(Y) * toOrientation(X)
elif knownVector(other):
other = toVector(other)
return op.relativize(other)
# This case doesn't match (for now at least). Fall through.
pass
elif knownOrientation(X) and knownOrientation(Y):
xf = toOrientation(X)
yf = toOrientation(Y)
return yf * xf
elif knownHeading(X) and knownHeading(Y):
xf = toHeading(X, f'"X relative to Y" with Y a heading but X a {type(X)}')
yf = toHeading(Y, f'"X relative to Y" with X a heading but Y a {type(Y)}')
return xf + yf
elif knownVector(X) or knownVector(Y):
xf = toVector(X, f'"X relative to Y" with Y a vector but X a {type(X)}')
yf = toVector(Y, f'"X relative to Y" with X a vector but Y a {type(Y)}')
return xf + yf
if isLazy(X) or isLazy(Y):
# We can't determine what case to use at this point. Try again when things are sampled.
return lazyRelativeTo(X, Y)
raise TypeError(
f'"X relative to Y" with X and Y incompatible types (X a {type(X)}, Y a {type(Y)})'
)
[docs]def OffsetAlong(X, H, Y):
"""The :scenic:`{X} offset along {H} by {Y}` polymorphic operator.
Allowed forms::
<vector> offset along <heading> by <vector>
<vector> offset along <field> by <vector>
"""
X = toVector(X, '"X offset along H by Y" with X not a vector')
Y = toVector(Y, '"X offset along H by Y" with Y not a vector')
if isA(H, VectorField):
H = H[X]
H = toOrientation(
H, '"X offset along H by Y" with H not an orientation or vector field'
)
return X.offsetLocally(H, Y)
[docs]def RelativePosition(X, Y=None):
"""The :grammar:`relative position of <vector> [from <vector>]` operator.
If the :grammar:`from <vector>` is omitted, the position of ego is used.
"""
X = toVector(X, '"relative position of X from Y" with X not a vector')
if Y is None:
Y = ego()
Y = toVector(Y, '"relative position of X from Y" with Y not a vector')
return X - Y
[docs]def RelativeHeading(X, Y=None):
"""The :grammar:`relative heading of <heading> [from <heading>]` operator.
If the :grammar:`from <heading>` is omitted, the heading of ego is used.
"""
X = toOrientation(
X, '"relative heading of X from Y" with X not a heading or orientation'
)
if Y is None:
Y = ego().orientation
else:
Y = toOrientation(Y, '"relative heading of X from Y" with Y not a heading')
return normalizeAngle(X.yaw - Y.yaw)
[docs]def ApparentHeading(X, Y=None):
"""The :grammar:`apparent heading of <oriented point> [from <vector>]` operator.
If the :grammar:`from <vector>` is omitted, the position of ego is used.
"""
if not isA(X, OrientedPoint):
raise TypeError('"apparent heading of X from Y" with X not an OrientedPoint')
if Y is None:
Y = ego()
Y = toVector(Y, '"relative heading of X from Y" with Y not a vector')
return apparentHeadingAtPoint(X.position, X.heading, Y)
[docs]def DistanceFrom(X, Y=None):
"""The :scenic:`distance from {X} to {Y}` polymorphic operator.
Allowed forms::
distance from <vector> [to <vector>]
distance from <region> [to <vector>]
distance from <vector> to <region>
If the :grammar:`to <vector>` is omitted, the position of ego is used.
"""
X = toTypes(
X, (Vector, Region), '"distance from X to Y" with X neither a vector nor region'
)
if Y is None:
Y = ego()
Y = toTypes(
Y, (Vector, Region), '"distance from X to Y" with Y neither a vector nor region'
)
return X.distanceTo(Y)
[docs]def DistancePast(X, Y=None):
"""The :grammar:`distance past <vector> of <oriented point>` operator.
If the :grammar:`of {oriented point}` is omitted, the ego object is used.
"""
X = toVector(X, '"distance past X" with X not a vector')
if Y is None:
Y = ego()
Y = toType(Y, OrientedPoint, '"distance past X of Y" with Y not an OrientedPoint')
return Y.distancePast(X)
# TODO(shun): Migrate to `AngleFrom`
[docs]def AngleTo(X):
"""The :grammar:`angle to <vector>` operator (using the position of ego as the reference)."""
X = toVector(X, '"angle to X" with X not a vector')
return ego().angleTo(X)
[docs]def AngleFrom(X=None, Y=None):
"""The :grammar:`angle from <vector> to <vector>` operator."""
assert X is not None or Y is not None
if X is None:
X = ego()
X = toVector(X, '"angle from X to Y" with X not a vector')
if Y is None:
Y = ego()
Y = toVector(Y, '"angle from X to Y" with Y not a vector')
return X.angleTo(Y)
[docs]def AltitudeTo(X):
"""The :grammar:`angle to <vector>` operator (using the position of ego as the reference)."""
X = toVector(X, '"altitude to X" with X not a vector')
return ego().altitudeTo(X)
[docs]def AltitudeFrom(X=None, Y=None):
"""The :grammar:`altitude from <vector> to <vector>` operator."""
assert X is not None or Y is not None
if X is None:
X = ego()
X = toVector(X, '"altitude from X to Y" with X not a vector')
if Y is None:
Y = ego()
Y = toVector(Y, '"altitude from X to Y" with Y not a vector')
return X.altitudeTo(Y)
[docs]def Follow(F, X, D):
"""The :grammar:`follow <field> from <vector> for <number>` operator."""
if not isA(F, VectorField):
raise TypeError('"follow F from X for D" with F not a vector field')
X = toVector(X, '"follow F from X for D" with X not a vector')
D = toScalar(D, '"follow F from X for D" with D not a number')
pos = F.followFrom(X, D)
orientation = F[pos]
return OrientedPoint._with(position=pos, parentOrientation=orientation)
[docs]def VisibleFromOp(region, base):
"""The :grammar:`<region> visible from <point>` operator."""
region = toType(region, Region, '"X visible from Y" with X not a Region')
if not isA(base, Point):
raise TypeError('"X visible from Y" with Y not a Point')
return region.intersect(base.visibleRegion)
[docs]def NotVisibleFromOp(region, base):
"""The :grammar:`<region> not visible from <point>` operator."""
region = toType(region, Region, '"X visible from Y" with X not a Region')
if not isA(base, Point):
raise TypeError('"X not visible from Y" with Y not a Point')
return region.difference(base.visibleRegion)
[docs]def CanSee(X, Y):
"""The :scenic:`{X} can see {Y}` polymorphic operator.
Allowed forms::
<point> can see <vector>
<point> can see <point>
"""
if isActive():
raise InvalidScenarioError(
'"can see" operator prohibited at top level of Scenic programs'
)
if not isA(X, Point):
raise TypeError('"X can see Y" with X not a Point, OrientedPoint, or Object')
if not canCoerce(Y, Vector):
raise TypeError('"X can see Y" with Y not a Vector, Point, or Object')
objects = toDistribution(currentScenario._objects)
@distributionFunction
def canSeeHelper(X, Y, objects):
if not isA(Y, Point):
Y = toVector(
Y, '"X can see Y" with X not a Vector, Point, OrientedPoint, or Object'
)
occludingObjects = tuple(
obj for obj in objects if obj.occluding and X is not obj and Y is not obj
)
return X.canSee(Y, occludingObjects=occludingObjects)
return canSeeHelper(X, Y, objects)
[docs]@distributionFunction
def Intersects(X, Y):
"""The :scenic:`{X} intersects {Y}` operator."""
if isA(X, Object):
return X.intersects(Y)
else:
return Y.intersects(X)
### Specifiers
[docs]def With(prop, val):
"""The :grammar:`with <property> <value>` specifier.
Specifies the given property, with no dependencies.
"""
return Specifier(f"With({prop})", {prop: 1}, {prop: val})
[docs]def At(pos):
"""The :grammar:`at <vector>` specifier.
Specifies :prop:`position`, with no dependencies.
"""
pos = toVector(pos, 'specifier "at X" with X not a vector')
return Specifier("At", {"position": 1}, {"position": pos})
[docs]def In(region):
"""The :grammar:`in <region>` specifier.
Specifies :prop:`position`, and optionally, :prop:`parentOrientation` if the given region
has a preferred orientation, with no dependencies.
"""
region = toType(region, Region, 'specifier "in R" with R not a Region')
pos = Region.uniformPointIn(region)
props = {"position": 1}
values = {"position": pos}
if alwaysProvidesOrientation(region):
props["parentOrientation"] = 3
values["parentOrientation"] = region.orientation[pos]
return Specifier("In", props, values)
[docs]def ContainedIn(region):
"""The :grammar:`contained in <region>` specifier.
Specifies :prop:`position`, :prop:`regionContainedIn`, and optionally, :prop:`parentOrientation`
if the given region has a preferred orientation, with no dependencies.
"""
region = toType(region, Region, 'specifier "contained in R" with R not a Region')
pos = Region.uniformPointIn(region)
props = {"position": 1, "regionContainedIn": 1}
values = {"position": pos, "regionContainedIn": region}
if alwaysProvidesOrientation(region):
props["parentOrientation"] = 3
values["parentOrientation"] = region.orientation[pos]
return Specifier("ContainedIn", props, values)
[docs]def On(thing):
"""The :specifier:`on {X}` specifier.
Specifies :prop:`position`, and optionally, :prop:`parentOrientation` if the given region
has a preferred orientation. Depends on :prop:`onDirection`, :prop:`baseOffset`,
and :prop:`contactTolerance`.
Note that while :specifier:`on` can be used with `Region`, `Object` and `Vector`,
it cannot be used with a distribution containing anything other than `Region`.
May be used to modify an already-specified :prop:`position` property.
Allowed forms:
on <region>
on <object>
on <vector>
"""
if isA(thing, Object):
# Target is an Object: use its onSurface.
target = thing.onSurface
elif canCoerce(thing, Vector, exact=True):
# Target is a vector
target = toVector(thing)
elif canCoerce(thing, Region):
# Target is a region (or could theoretically be coerced to one),
# so we can use it as a target.
target = toType(thing, Region)
else:
raise TypeError('specifier "on R" with R not a Region, Object, or Vector')
props = {"position": 1}
if isA(target, Region) and alwaysProvidesOrientation(target):
props["parentOrientation"] = 2
def helper(context):
# Pick position based on whether we are specifying or modifying
if hasattr(context, "position"):
if isA(target, Vector):
raise TypeError('Cannot use modifying "on V" with V a vector.')
pos = projectVectorHelper(target, context.position, context.onDirection)
elif isA(target, Vector):
pos = target
else:
pos = Region.uniformPointIn(target)
values = {}
contactOffset = Vector(0, 0, context.contactTolerance / 2) - context.baseOffset
if "parentOrientation" in props:
values["parentOrientation"] = target.orientation[pos]
contactOffset = contactOffset.rotatedBy(values["parentOrientation"])
values["position"] = pos + contactOffset
return values
return ModifyingSpecifier(
"On",
props,
DelayedArgument({"onDirection", "baseOffset", "contactTolerance"}, helper),
modifiable_props={"position"},
)
@distributionFunction
def projectVectorHelper(region, pos, onDirection):
on_pos = region.projectVector(pos, onDirection=onDirection)
if on_pos is None:
raise RejectionException("Unable to place object on surface.")
else:
return on_pos
def alwaysProvidesOrientation(region):
"""Whether a Region or distribution over Regions always provides an orientation."""
if isinstance(region, Region):
return region.orientation is not None
elif isinstance(region, MultiplexerDistribution) and all(
alwaysProvidesOrientation(opt) for opt in region.options
):
return True
else: # TODO improve somehow!
try:
sample = region.sample()
return sample.orientation is not None or sample is nowhere
except RejectionException:
return False
[docs]def OffsetBy(offset):
"""The :grammar:`offset by <vector>` specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, with no dependencies.
"""
offset = toVector(offset, 'specifier "offset by X" with X not a vector')
value = {
"position": RelativeTo(offset, ego()).toVector(),
"parentOrientation": ego().orientation,
}
return Specifier("OffsetBy", {"position": 1, "parentOrientation": 3}, value)
[docs]def OffsetAlongSpec(direction, offset):
"""The :specifier:`offset along {X} by {Y}` polymorphic specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, with no dependencies.
Allowed forms::
offset along <heading> by <vector>
offset along <field> by <vector>
"""
pos = OffsetAlong(ego(), direction, offset)
parentOrientation = ego().orientation
return Specifier(
"OffsetAlong",
{"position": 1, "parentOrientation": 3},
{"position": pos, "parentOrientation": parentOrientation},
)
[docs]def Beyond(pos, offset, fromPt=None):
"""The :specifier:`beyond {X} by {Y} from {Z}` polymorphic specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, with no dependencies.
Allowed forms::
beyond <vector> by <number> [from <vector>]
beyond <vector> by <vector> [from <vector>]
If the :grammar:`from <vector>` is omitted, the position of ego is used.
"""
# Ensure X can be coerced into vector form
pos = toVector(pos, 'specifier "beyond X by Y" with X not a vector')
# If no from vector is specified, assume ego
if fromPt is None:
fromPt = ego()
fromPt = toVector(fromPt, 'specifier "beyond X by Y from Z" with Z not a vector')
dType = underlyingType(offset)
if dType is builtins.float or dType is builtins.int:
offset = Vector(0, offset, 0)
else:
# offset is not float or int, so try to coerce it into vector form.
offset = toVector(
offset, 'specifier "beyond X by Y" with X not a number or vector'
)
# If the from vector is oriented, set that to orientation. Else assume global coords.
if isA(fromPt, OrientedPoint):
orientation = fromPt.orientation
else:
orientation = Orientation.fromEuler(0, 0, 0)
direction = pos - fromPt
sphericalCoords = direction.sphericalCoordinates()
offsetRotation = Orientation.fromEuler(sphericalCoords[1], sphericalCoords[2], 0)
new_direction = pos + offset.applyRotation(offsetRotation)
return Specifier(
"Beyond",
{"position": 1, "parentOrientation": 3},
{"position": new_direction, "parentOrientation": orientation},
)
[docs]def VisibleFrom(base):
"""The :grammar:`visible from <point>` specifier.
Specifies :prop:`_observingEntity` and :prop:`position`, with no dependencies.
"""
if not isA(base, Point):
raise TypeError('specifier "visible from O" with O not a Point')
def helper(self):
if mode2D:
position = Region.uniformPointIn(base.visibleRegion)
else:
containing_region = (
currentScenario._workspace.region
if self.regionContainedIn is None
and currentScenario._workspace is not None
else self.regionContainedIn
)
position = (
Region.uniformPointIn(everywhere, tag="visible")
if containing_region is None
else Region.uniformPointIn(containing_region)
)
return {"position": position, "_observingEntity": base}
return Specifier(
"Visible/VisibleFrom",
{"position": 3, "_observingEntity": 1},
DelayedArgument({"regionContainedIn"}, helper),
)
[docs]def VisibleSpec():
"""The :specifier:`visible` specifier (equivalent to :specifier:`visible from ego`).
Specifies :prop:`_observingEntity` and :prop:`position`, with no dependencies.
"""
return VisibleFrom(ego())
[docs]def NotVisibleFrom(base):
"""The :grammar:`not visible from <point>` specifier.
Specifies :prop:`_nonObservingEntity` and :prop:`position`, depending on :prop:`regionContainedIn`.
See `VisibleFrom`.
"""
if not isA(base, Point):
raise TypeError('specifier "not visible from O" with O not a Point')
def helper(self):
region = self.regionContainedIn
if region is None:
if currentScenario._workspace is None:
raise InvalidScenarioError(
'"not visible" specifier with no workspace or containing region defined'
)
region = currentScenario._workspace.region
if mode2D:
position = Region.uniformPointIn(region.difference(base.visibleRegion))
else:
# We can't limit the available region since any spot could potentially be occluded.
position = Region.uniformPointIn(convertToFootprint(region))
return {"position": position, "_nonObservingEntity": base}
return Specifier(
"NotVisible/NotVisibleFrom",
{"position": 3, "_nonObservingEntity": 1},
DelayedArgument({"regionContainedIn"}, helper),
)
[docs]def NotVisibleSpec():
"""The :specifier:`not visible` specifier (equivalent to :specifier:`not visible from ego`).
Specifies :prop:`_nonObservingEntity` and :prop:`position`, depending on :prop:`regionContainedIn`.
"""
return NotVisibleFrom(ego())
[docs]def LeftSpec(pos, dist=None):
"""The :specifier:`left of {X} by {Y}` polymorphic specifier.
Specifies :prop:`position`, and optionally, :prop:`parentOrientation`, depending on :prop:`width`.
Allowed forms::
left of <oriented point> [by <scalar/vector>]
left of <vector> [by <scalar/vector>]
If the :grammar:`by <scalar/vector>` is omitted, the object's contact tolerance is used.
"""
return directionalSpecHelper(
"Left of",
pos,
dist,
"width",
lambda dist: (dist, 0, 0),
lambda self, dims, tol, dx, dy, dz: Vector(
-self.width / 2 - dx - dims[0] / 2 - tol, dy, dz
),
)
[docs]def RightSpec(pos, dist=None):
"""The :specifier:`right of {X} by {Y}` polymorphic specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, depending on :prop:`width`.
Allowed forms::
right of <oriented point> [by <scalar/vector>]
right of <vector> [by <scalar/vector>]
If the :grammar:`by <scalar/vector>` is omitted, zero is used.
"""
return directionalSpecHelper(
"Right of",
pos,
dist,
"width",
lambda dist: (dist, 0, 0),
lambda self, dims, tol, dx, dy, dz: Vector(
self.width / 2 + dx + dims[0] / 2 + tol, dy, dz
),
)
[docs]def Ahead(pos, dist=None):
"""The :specifier:`ahead of {X} by {Y}` polymorphic specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, depending on :prop:`length`.
Allowed forms::
ahead of <oriented point> [by <scalar/vector>]
ahead of <vector> [by <scalar/vector>]
If the :grammar:`by <scalar/vector>` is omitted, the object's contact tolerance is used.
"""
return directionalSpecHelper(
"Ahead of",
pos,
dist,
"length",
lambda dist: (0, dist, 0),
lambda self, dims, tol, dx, dy, dz: Vector(
dx, self.length / 2 + dy + dims[1] / 2 + tol, dz
),
)
[docs]def Behind(pos, dist=None):
"""The :specifier:`behind {X} by {Y}` polymorphic specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, depending on :prop:`length`.
Allowed forms::
behind <oriented point> [by <scalar/vector>]
behind <vector> [by <scalar/vector>]
If the :grammar:`by <scalar/vector>` is omitted, the object's contact tolerance is used.
"""
return directionalSpecHelper(
"Behind",
pos,
dist,
"length",
lambda dist: (0, dist, 0),
lambda self, dims, tol, dx, dy, dz: Vector(
dx, -self.length / 2 - dy - dims[1] / 2 - tol, dz
),
)
[docs]def Above(pos, dist=None):
"""The :specifier:`above {X} by {Y}` polymorphic specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, depending on :prop:`height`.
Allowed forms::
above <oriented point> [by <scalar/vector>]
above <vector> [by <scalar/vector>]
If the :grammar:`by <scalar/vector>` is omitted, the object's contact tolerance is used.
"""
return directionalSpecHelper(
"Above",
pos,
dist,
"height",
lambda dist: (0, 0, dist),
lambda self, dims, tol, dx, dy, dz: Vector(
dx, dy, self.height / 2 + dz + dims[2] / 2 + tol
),
)
[docs]def Below(pos, dist=None):
"""The :specifier:`below {X} by {Y}` polymorphic specifier.
Specifies :prop`position`, and optionally :prop:`parentOrientation`, depending on :prop:`height`.
Allowed forms::
below <oriented point> [by <scalar/vector>]
below <vector> [by <scalar/vector>]
If the :grammar:`by <scalar/vector>` is omitted, the object's contact tolerance is used.
"""
return directionalSpecHelper(
"Below",
pos,
dist,
"height",
lambda dist: (0, 0, dist),
lambda self, dims, tol, dx, dy, dz: Vector(
dx, dy, -self.height / 2 - dz - dims[2] / 2 - tol
),
)
def directionalSpecHelper(syntax, pos, dist, axis, toComponents, makeOffset):
prop = {"position": 1}
if dist is None:
dx = dy = dz = 0
elif canCoerce(dist, builtins.float):
dx, dy, dz = toComponents(coerce(dist, builtins.float))
elif canCoerce(dist, Vector):
dx, dy, dz = coerce(dist, Vector)
else:
raise TypeError(f'"{syntax} X by D" with D not a number or vector')
@distributionFunction
def makeContactOffset(dist, ct):
if dist is None:
return ct / 2
else:
return 0
if isA(pos, Object):
prop["parentOrientation"] = 3
obj_dims = (pos.width, pos.length, pos.height)
val = lambda self: {
"position": pos.relativePosition(
makeOffset(
self,
obj_dims,
makeContactOffset(dist, self.contactTolerance),
dx,
dy,
dz,
)
),
"parentOrientation": pos.orientation,
}
new = DelayedArgument({axis, "contactTolerance"}, val)
elif isA(pos, OrientedPoint):
prop["parentOrientation"] = 3
val = lambda self: {
"position": pos.relativePosition(makeOffset(self, (0, 0, 0), 0, dx, dy, dz)),
"parentOrientation": pos.orientation,
}
new = DelayedArgument({axis}, val)
else:
pos = toVector(pos, f'specifier "{syntax} X" with X not a vector')
val = lambda self: {
"position": pos.offsetLocally(
self.orientation, makeOffset(self, (0, 0, 0), 0, dx, dy, dz)
)
}
new = DelayedArgument({axis, "orientation"}, val)
return Specifier(syntax, prop, new)
[docs]def Following(field, dist, fromPt=None):
"""The :specifier:`following {F} from {X} for {D}` specifier.
Specifies :prop:`position`, and optionally :prop:`parentOrientation`, with no dependencies.
Allowed forms::
following <field> [from <vector>] for <number>
If the :grammar:`from <vector>` is omitted, the position of ego is used.
"""
if fromPt is None:
fromPt = ego()
field = toType(field, VectorField)
fromPt = toVector(fromPt, '"following F from X for D" with X not a vector')
dist = toScalar(dist, '"following F for D" with D not a number')
pos = field.followFrom(fromPt, dist)
orientation = field[pos]
return Specifier(
"Following",
{"position": 1, "parentOrientation": 3},
{"position": pos, "parentOrientation": orientation},
)
[docs]def Facing(heading):
"""The :specifier:`facing {X}` polymorphic specifier.
Specifies :prop:`yaw`, :prop:`pitch`, and :prop:`roll`, depending on :prop:`parentOrientation`,
and depending on the form::
facing <number> # no further dependencies;
facing <field> # depends on 'position'
"""
if isA(heading, VectorField):
def helper(context):
headingAtPos = heading[context.position]
if alwaysGlobalOrientation(context.parentOrientation):
orientation = headingAtPos # simplify expr tree in common case
else:
orientation = context.parentOrientation.inverse * headingAtPos
return {
"yaw": orientation.yaw,
"pitch": orientation.pitch,
"roll": orientation.roll,
}
return Specifier(
"Facing",
{"yaw": 1, "pitch": 1, "roll": 1},
DelayedArgument({"position", "parentOrientation"}, helper),
)
else:
orientation = toOrientation(
heading, "facing x with x not a heading or orientation"
)
orientationDeps = requiredProperties(orientation)
def helper(context):
target_orientation = valueInContext(orientation, context)
euler = context.parentOrientation.localAnglesFor(target_orientation)
return {"yaw": euler[0], "pitch": euler[1], "roll": euler[2]}
return Specifier(
"Facing",
{"yaw": 1, "pitch": 1, "roll": 1},
DelayedArgument({"parentOrientation"} | orientationDeps, helper),
)
[docs]def FacingToward(pos):
"""The :grammar:`facing toward <vector>` specifier.
Specifies :prop:`yaw`, depending on :prop:`position` and :prop:`parentOrientation`.
"""
pos = toVector(pos, 'specifier "facing toward X" with X not a vector')
def helper(context):
direction = pos - context.position
rotated = direction.applyRotation(context.parentOrientation.inverse)
sphericalCoords = (
rotated.sphericalCoordinates()
) # Ignore the rho, sphericalCoords[0]
return {"yaw": sphericalCoords[1]}
return Specifier(
"FacingToward",
{"yaw": 1},
DelayedArgument({"position", "parentOrientation"}, helper),
)
[docs]def FacingDirectlyToward(pos):
"""The :grammar:`facing directly toward <vector>` specifier.
Specifies :prop:`yaw` and :prop:`pitch`, depends on :prop:`position` and :prop:`parentOrientation`.
"""
pos = toVector(pos, 'specifier "facing directly toward X" with X not a vector')
def helper(context):
"""
Same process as above, except by default also specify the pitch euler angle
"""
direction = pos - context.position
rotated = direction.applyRotation(context.parentOrientation.inverse)
sphericalCoords = rotated.sphericalCoordinates()
return {"yaw": sphericalCoords[1], "pitch": sphericalCoords[2]}
return Specifier(
"FacingDirectlyToward",
{"yaw": 1, "pitch": 1},
DelayedArgument({"position", "parentOrientation"}, helper),
)
[docs]def FacingAwayFrom(pos):
"""The :grammar:`facing away from <vector>` specifier.
Specifies :prop:`yaw`, depending on :prop:`position` and :prop:`parentOrientation`.
"""
pos = toVector(pos, 'specifier "facing away from X" with X not a vector')
def helper(context):
"""
As in FacingToward, except invert the resulting rotation axis
"""
direction = context.position - pos
rotated = direction.applyRotation(context.parentOrientation.inverse)
sphericalCoords = rotated.sphericalCoordinates()
return {"yaw": sphericalCoords[1]}
return Specifier(
"FacingAwayFrom",
{"yaw": 1},
DelayedArgument({"position", "parentOrientation"}, helper),
)
[docs]def FacingDirectlyAwayFrom(pos):
"""The :grammar:`facing directly away from <vector>` specifier.
Specifies :prop:`yaw` and :prop:`pitch`, depending on :prop:`position` and :prop:`parentOrientation`.
"""
pos = toVector(pos, 'specifier "facing away from X" with X not a vector')
def helper(context):
direction = context.position - pos
rotated = direction.applyRotation(context.parentOrientation.inverse)
sphericalCoords = rotated.sphericalCoordinates()
return {"yaw": sphericalCoords[1], "pitch": sphericalCoords[2]}
return Specifier(
"FacingDirectlyToward",
{"yaw": 1, "pitch": 1},
DelayedArgument({"position", "parentOrientation"}, helper),
)
[docs]def ApparentlyFacing(heading, fromPt=None):
"""The :grammar:`apparently facing <heading> [from <vector>]` specifier.
Specifies :prop:`yaw`, depending on :prop:`position` and :prop:`parentOrientation`.
If the :grammar:`from <vector>` is omitted, the position of ego is used.
"""
heading = toHeading(heading, 'specifier "apparently facing X" with X not a heading')
if fromPt is None:
fromPt = ego()
fromPt = toVector(
fromPt, 'specifier "apparently facing X from Y" with Y not a vector'
)
def helper(context):
return {"yaw": fromPt.angleTo(context.position) + heading}
return Specifier(
"ApparentlyFacing",
{"yaw": 1},
DelayedArgument({"position", "parentOrientation"}, helper),
)
### Primitive functions overriding Python builtins
# N.B. applying functools.wraps to preserve the metadata of the original
# functions seems to break pickling/unpickling
@distributionFunction
def filter(function, iterable):
return list(builtins.filter(function, iterable))
@distributionFunction
def str(*args, **kwargs):
return builtins.str(*args, **kwargs)
@distributionFunction
def float(*args, **kwargs):
return builtins.float(*args, **kwargs)
@distributionFunction
def int(*args, **kwargs):
return builtins.int(*args, **kwargs)
@distributionFunction
def round(*args, **kwargs):
return builtins.round(*args, **kwargs)
def len(obj):
return obj.__len__()
def range(*args):
if any(needsSampling(arg) for arg in args):
raise RandomControlFlowError("cannot construct a range with random parameters")
return builtins.range(*args)
### Temporal Operators Factories
def AtomicProposition(closure, syntaxId):
return propositions.Atomic(closure, syntaxId)
def PropositionAnd(reqs):
return propositions.And(reqs)
def PropositionOr(reqs):
return propositions.Or(reqs)
def PropositionNot(req):
return propositions.Not(req)
def Always(req):
return propositions.Always(req)
def Eventually(req):
return propositions.Eventually(req)
def Next(req):
return propositions.Next(req)
def Until(lhs, rhs):
return propositions.Until(lhs, rhs)
def Implies(lhs, rhs):
return propositions.Implies(lhs, rhs)