# Source code for scenic.core.pruning

```
"""Pruning parts of the sample space which violate requirements.
The top-level function here, `prune`, is called as the very last step of scenario
compilation (from `translator.constructScenarioFrom`).
"""
import builtins
import collections
import math
import time
import numpy
import shapely.geometry
import shapely.geos
from trimesh.transformations import translation_matrix
from scenic.core.distributions import (
AttributeDistribution,
FunctionDistribution,
MethodDistribution,
OperatorDistribution,
Samplable,
dependencies,
needsSampling,
supportInterval,
underlyingFunction,
)
from scenic.core.errors import InvalidScenarioError
from scenic.core.geometry import hypot, normalizeAngle, plotPolygon, polygonUnion
from scenic.core.object_types import Object, Point
import scenic.core.regions as regions
from scenic.core.regions import (
EmptyRegion,
MeshSurfaceRegion,
MeshVolumeRegion,
PolygonalRegion,
VoxelRegion,
)
from scenic.core.type_support import TypecheckedDistribution
from scenic.core.vectors import (
PolygonalVectorField,
Vector,
VectorField,
VectorMethodDistribution,
VectorOperatorDistribution,
)
from scenic.core.workspaces import Workspace
from scenic.syntax.relations import DistanceRelation, RelativeHeadingRelation
### Constants
PRUNING_PITCH = 0.01
### Utilities
[docs]def currentPropValue(obj, prop):
"""Get the current value of an object's property, taking into account prior pruning."""
value = getattr(obj, prop)
return value._conditioned if isinstance(value, Samplable) else value
[docs]def isMethodCall(thing, method):
"""Match calls to a given method, taking into account distribution decorators."""
if not isinstance(thing, (MethodDistribution, VectorMethodDistribution)):
return False
return thing.method is underlyingFunction(method)
[docs]def isFunctionCall(thing, function):
"""Match calls to a given function, taking into account distribution decorators."""
if not isinstance(thing, FunctionDistribution):
return False
return thing.function is underlyingFunction(function)
[docs]def matchInRegion(position):
"""Match uniform samples from a `Region`
Returns the Region, if any, along with any
offset that should be added to the base.
"""
# Case 1: Position is simply a point in a region
if isinstance(position, regions.PointInRegionDistribution):
reg = position.region
if isinstance(reg, Workspace):
reg = reg.region
return reg, None
# Case 2: Position is a point in a region with a vector offset.
if isinstance(position, VectorOperatorDistribution) and position.operator in (
"__add__",
"__radd__",
):
if isinstance(position.object, regions.PointInRegionDistribution):
reg = position.object.region
if isinstance(reg, Workspace):
reg = reg.region
assert len(position.operands) == 1
offset = position.operands[0]
return reg, offset
return None, None
[docs]def matchPolygonalField(heading, position):
"""Match orientation yaw defined by a `PolygonalVectorField` at the given position.
Matches the yaw attribute of orientations exactly equal to a `PolygonalVectorField`,
or offset by a bounded disturbance. Returns a triple consisting of the matched field
if any, together with lower/upper bounds on the disturbance.
"""
if isFunctionCall(heading, normalizeAngle):
assert len(heading.arguments) == 1
return matchPolygonalField(heading.arguments[0], position)
if (
isinstance(heading, TypecheckedDistribution)
and heading._valueType is builtins.float
):
return matchPolygonalField(heading._dist, position)
if (
isinstance(heading, AttributeDistribution) and heading.attribute == "yaw"
): # TODO generalize to other 3D angles?
orientation = heading.object
if (
isMethodCall(orientation, VectorField.__getitem__)
and isinstance(orientation.object, PolygonalVectorField)
and orientation.arguments == (position,)
):
return orientation.object, 0, 0
if isinstance(heading, OperatorDistribution) and heading.operator in (
"__add__",
"__radd__",
):
field, lower, upper = matchPolygonalField(heading.object, position)
if field is not None:
assert len(heading.operands) == 1
offset = heading.operands[0]
ol, oh = supportInterval(offset)
if ol is not None and oh is not None:
return field, lower + ol, upper + oh
return None, 0, 0
### Pruning procedures
[docs]def prune(scenario, verbosity=1):
"""Prune a `Scenario`, removing infeasible parts of the space.
This function directly modifies the Distributions used in the Scenario,
but leaves the conditional distribution under the scenario's requirements
unchanged. See `Samplable.conditionTo`.
Currently, the following pruning techniques are applied in order:
* Pruning based on containment (`pruneContainment`)
* Pruning based on relative heading bounds (`pruneRelativeHeading`)
"""
if verbosity >= 1:
print(" Pruning scenario...")
startTime = time.time()
pruneContainment(scenario, verbosity)
pruneRelativeHeading(scenario, verbosity)
pruneVisibility(scenario, verbosity)
if verbosity >= 1:
totalTime = time.time() - startTime
print(f" Pruned scenario in {totalTime:.4g} seconds.")
## Pruning based on containment
[docs]def pruneContainment(scenario, verbosity):
"""Prune based on the requirement that individual Objects fit within their container.
Specifically, if O is positioned uniformly (with a possible offset) in region B and
has container C, then we can instead pick a position uniformly in their intersection.
If we can also lower bound the radius of O, then we can first erode C by that distance
minus that maximum offset distance.
"""
for obj in scenario.objects:
# Extract the base region and container region, while doing minor checks.
base, offset = matchInRegion(obj.position)
if base is None or needsSampling(base):
continue
if isinstance(base, regions.EmptyRegion):
raise InvalidScenarioError(f"Object {obj} placed in empty region")
container = scenario.containerOfObject(obj)
if container is None or needsSampling(container):
continue
if isinstance(container, regions.EmptyRegion):
raise InvalidScenarioError(f"Object {obj} contained in empty region")
# Compute the maximum distance the object can be from the sampled point
if offset is not None:
# TODO: Support interval doesn't really work here for random values.
if isinstance(base, PolygonalRegion):
# Special handling for 2D regions that ignores vertical component of offset
offset_2d = Vector(offset.x, offset.y, 0)
_, maxDistance = supportInterval(offset_2d.norm())
else:
_, maxDistance = supportInterval(offset.norm())
else:
maxDistance = 0
# Compute the minimum radius of the object, with respect to the
# bounded dimensions of the container.
if (
isinstance(base, PolygonalRegion)
and supportInterval(obj.pitch) == (0, 0)
and supportInterval(obj.roll) == (0, 0)
):
# Special handling for 2D regions with no pitch or roll,
# using planar inradius instead.
minRadius, _ = supportInterval(obj.planarInradius)
else:
# For most regions, use full object inradius.
minRadius, _ = supportInterval(obj.inradius)
# Erode the container if possible and productive
if (
maxDistance is not None
and minRadius is not None
and (maxErosion := minRadius - maxDistance) > 0
):
if hasattr(container, "buffer"):
# We can do an exact erosion
container = container.buffer(-maxErosion)
elif isinstance(container, MeshVolumeRegion):
# We can attempt to erode a voxel approximation of the MeshVolumeRegion.
# Compute a voxel overapproximation of the mesh. Technically this is not
# an overapproximation, but one dilation with a rank 3 structuring unit
# with connectivity 3 is. To simplify, we just erode one less time than
# needed.
target_pitch = PRUNING_PITCH * max(container.mesh.extents)
voxelized_container = container.voxelized(target_pitch, lazy=True)
# Erode the voxel region. Erosion is done with a rank 3 structuring unit with
# connectivity 3 (a 3x3x3 cube of voxels). Each erosion pass can erode by at
# most math.hypot([pitch]*3). Therefore we can safely make at most
# floor(maxErosion/math.hypot([pitch]*3)) passes without eroding more
# than maxErosion. We also subtract 1 iteration for the reasons above.
iterations = (
math.floor(maxErosion / math.hypot(*([target_pitch] * 3))) - 1
)
if iterations > 0:
eroded_container = voxelized_container.dilation(
iterations=-iterations
)
# Now check if this erosion is useful, i.e. do we have less volume to sample from.
# If so, replace the original container.
if eroded_container.size < container.size:
container = eroded_container
# Restrict the base region to the possibly eroded container, unless
# they're the same in which case we're done
if base is container:
continue
newBase = base.intersect(container)
newBase.orientation = base.orientation
# Check if base was a volume and newBase is a surface,
# in which case the mesh operation might be undefined and we abort.
if isinstance(base, MeshVolumeRegion) and isinstance(newBase, MeshSurfaceRegion):
continue
# Check newBase properties
if isinstance(newBase, EmptyRegion):
raise InvalidScenarioError(f"Object {obj} does not fit in container")
percentage_pruned = percentagePruned(base, newBase)
if percentage_pruned is None:
if verbosity >= 1:
print(
f" Region containment constraint pruning attempted but could not compute percentage for {base} and {newBase}."
)
else:
if percentage_pruned <= 0.001:
# We didn't really prune anything, don't bother setting new position
continue
if verbosity >= 1:
print(
f" Region containment constraint pruned {percentage_pruned:.1f}% of space."
)
# Condition object to pruned position
newPos = regions.Region.uniformPointIn(newBase)
if offset is not None:
newPos += offset
obj.position.conditionTo(newPos)
## Pruning based on orientation
[docs]def pruneRelativeHeading(scenario, verbosity):
"""Prune based on requirements bounding the relative heading of an Object.
Specifically, if an object O is:
* positioned uniformly within a polygonal region B;
* aligned to a polygonal vector field F (up to a bounded offset);
and another object O' is:
* aligned to a polygonal vector field F' (up to a bounded offset);
* at most some finite maximum distance from O;
* required to have relative heading within a bounded offset of that of O;
then we can instead position O uniformly in the subset of B intersecting the cells
of F which satisfy the relative heading requirements w.r.t. some cell of F' which
is within the distance bound.
"""
# TODO Add test for empty pruned polygon (Might cause crash?)
# Check which objects are (approximately) aligned to polygonal vector fields
fields = {}
for obj in scenario.objects:
field, offsetL, offsetR = matchPolygonalField(obj.heading, obj.position)
if field is not None:
fields[obj] = (field, offsetL, offsetR)
# Check for relative heading relations among such objects
for obj, (field, offsetL, offsetR) in fields.items():
position = currentPropValue(obj, "position")
base, offset = matchInRegion(position)
# obj must be positioned uniformly in a Region
if base is None or needsSampling(base):
continue
if offset is not None:
continue
basePoly = regions.toPolygon(base)
if basePoly is None: # the Region must be polygonal
continue
newBasePoly = basePoly
for rel in obj._relations:
if isinstance(rel, RelativeHeadingRelation) and rel.target in fields:
tField, tOffsetL, tOffsetR = fields[rel.target]
maxDist = maxDistanceBetween(scenario, obj, rel.target)
if maxDist == float("inf"):
# the distance between the objects must be bounded
continue
feasible = feasibleRHPolygon(
field,
offsetL,
offsetR,
tField,
tOffsetL,
tOffsetR,
rel.lower,
rel.upper,
maxDist,
)
if feasible is None:
# the RH bounds may be too weak to restrict the space
continue
try:
pruned = newBasePoly & feasible
except shapely.geos.TopologicalError: # TODO how can we prevent these??
pruned = newBasePoly & feasible.buffer(0.1, cap_style=2)
if verbosity >= 1:
percent = 100 * (1.0 - (pruned.area / newBasePoly.area))
print(
f" Relative heading constraint pruned {percent:.1f}% of space."
)
newBasePoly = pruned
if newBasePoly is not basePoly:
newBase = regions.PolygonalRegion(
polygon=newBasePoly, orientation=base.orientation
)
newPos = regions.Region.uniformPointIn(newBase)
obj.position.conditionTo(newPos)
# Pruning based on visibility
def pruneVisibility(scenario, verbosity):
ego = scenario.egoObject
for obj in scenario.objects:
# Extract the base region if it exists
position = currentPropValue(obj, "position")
base, offset = matchInRegion(position)
if base is None or needsSampling(base):
continue
newBase = base
# Define a helper function to buffer an oberver's visibleRegion, resulting
# in a region that contains all points that could feasibly be the position
# of obj, if it is visible from the observer.
def bufferHelper(viewRegion):
# Compute a voxel overapproximation of the mesh. Technically this is not
# an overapproximation, but one dilation with a rank 3 structuring unit
# with connectivity 3 is. To simplify, we just dilate one additional time.
target_pitch = PRUNING_PITCH * max(viewRegion.mesh.extents)
voxelized_vr = viewRegion.voxelized(target_pitch, lazy=True)
# Dilate the voxel region. Dilation is done with a rank 3 structuring unit with
# connectivity 3 (a 3x3x3 cube of voxels). Each dilation pass must dilate by at
# least pitch. Therefore we must make at least ceiling((radius/2)/pitch) passes
# to ensure we have dilated by the half the circumradius of the object. We also
# add 1 iteration for the reasons above.
iterations = math.ceil((obj.radius / 2) / target_pitch) + 1
dilated_vr = voxelized_vr.dilation(iterations=iterations)
return dilated_vr
# Prune based off visibility/non-visibility requirements
if obj.requireVisible:
# We can restrict the base region to the buffered visible region
# of the ego.
if (
base is not ego.visibleRegion
and not needsSampling(ego.visibleRegion)
and not checkCyclical(base, ego.visibleRegion)
):
if verbosity >= 1:
print(
f" Pruning restricted base region of {obj} to visible region of ego."
)
newBase = newBase.intersect(bufferHelper(ego.visibleRegion))
if obj._observingEntity:
# We can restrict the base region to the buffered visible region
# of the observing entity. Only do this if the visible
# region is fixed, to avoid creating it at every timestep.
if (
base is not obj._observingEntity.visibleRegion
and not needsSampling(obj._observingEntity.visibleRegion)
and not checkCyclical(base, obj._observingEntity.visibleRegion)
):
if verbosity >= 1:
print(
f" Pruning restricted base region of {obj} to visible region of {obj._observingEntity}."
)
newBase = newBase.intersect(
bufferHelper(obj._observingEntity.visibleRegion)
)
# Check newBase properties
if isinstance(newBase, EmptyRegion):
raise InvalidScenarioError(
f"Object {obj} can not satisfy visibility/non-visibility constraints."
)
percentage_pruned = percentagePruned(base, newBase)
if percentage_pruned is None:
if verbosity >= 1:
print(
f" Visibility pruning attempted but could not compute percentage for {base} and {newBase}."
)
else:
if percentage_pruned <= 0.001:
# We didn't really prune anything, don't bother setting new position
continue
if verbosity >= 1:
print(f" Visibility pruning pruned {percentage_pruned:.1f}% of space.")
# Condition object to pruned position
newPos = regions.Region.uniformPointIn(newBase)
if offset is not None:
newPos += offset
obj.position.conditionTo(newPos)
[docs]def maxDistanceBetween(scenario, obj, target):
"""Upper bound the distance between the given Objects."""
# visDist is initialized to infinity. Then we can use
# various visibility constraints to upper bound it,
# keeping the tightest bound.
ego = scenario.egoObject
visDist = float("inf")
if obj is ego and target.requireVisible:
visDist = min(visDist, visibilityBound(ego, target))
if target is ego and obj.requireVisible:
visDist = min(visDist, visibilityBound(ego, obj))
if obj._observingEntity is target:
visDist = min(visDist, visibilityBound(target, obj))
if target._observingEntity is obj:
visDist = min(visDist, visibilityBound(obj, target))
# Check for any distance bounds implied by user-specified requirements
reqDist = float("inf")
for rel in obj._relations:
if isinstance(rel, DistanceRelation) and rel.target is target:
if rel.upper < reqDist:
reqDist = rel.upper
return min(visDist, reqDist)
[docs]def visibilityBound(obj, target):
"""Upper bound the distance from an Object to another it can see."""
# Upper bound on visible distance is a sum of several terms:
# 1. obj.visibleDistance
_, maxVisibleDistance = supportInterval(obj.visibleDistance)
if maxVisibleDistance is None:
return None
# 2. distance from obj's center to its camera
_, maxCameraX = supportInterval(obj.cameraOffset.x)
_, maxCameraY = supportInterval(obj.cameraOffset.y)
if maxCameraX is None or maxCameraY is None:
return None
maxVisibleDistance += math.hypot(maxCameraX, maxCameraY)
# 3. radius of target
_, maxRadius = supportInterval(target.radius)
if maxRadius is None:
return None
maxVisibleDistance += maxRadius
return maxVisibleDistance
[docs]def feasibleRHPolygon(
field, offsetL, offsetR, tField, tOffsetL, tOffsetR, lowerBound, upperBound, maxDist
):
"""Find where objects aligned to the given fields can satisfy the given RH bounds."""
if (
offsetR - offsetL >= math.tau
or tOffsetR - tOffsetL >= math.tau
or upperBound - lowerBound >= math.tau
):
return None
polygons = []
expanded = [(poly.buffer(maxDist), heading) for poly, heading in tField.cells]
for baseCell, baseHeading in field.cells:
# TODO skip cells not contained in base region?
for expandedTargetCell, targetHeading in expanded:
lower, upper = relativeHeadingRange(
baseHeading, offsetL, offsetR, targetHeading, tOffsetL, tOffsetR
)
if upper >= lowerBound and lower <= upperBound: # RH intervals overlap
intersection = baseCell & expandedTargetCell
if not intersection.is_empty:
assert isinstance(
intersection, shapely.geometry.Polygon
), intersection
polygons.append(intersection)
return polygonUnion(polygons)
[docs]def relativeHeadingRange(
baseHeading, offsetL, offsetR, targetHeading, tOffsetL, tOffsetR
):
"""Lower/upper bound the possible RH between two headings with bounded disturbances."""
if baseHeading is None or targetHeading is None:
# heading may not be constant within cell
return -math.pi, math.pi
lower = normalizeAngle(baseHeading + offsetL)
upper = normalizeAngle(baseHeading + offsetR)
points = [lower, upper]
if upper < lower:
points.extend((math.pi, -math.pi))
tLower = normalizeAngle(targetHeading + tOffsetL)
tUpper = normalizeAngle(targetHeading + tOffsetR)
tPoints = [tLower, tUpper]
if tUpper < tLower:
tPoints.extend((math.pi, -math.pi))
rhs = [tp - p for tp in tPoints for p in points] # TODO improve
return min(rhs), max(rhs)
def percentagePruned(base, newBase):
if (
base.dimensionality
and newBase.dimensionality
and base.dimensionality == newBase.dimensionality
):
ratio = newBase.size / base.size
percent = max(0, 100 * (1.0 - ratio))
return percent
return None
[docs]def checkCyclical(A, B):
"""Check for a potential circular dependency
Returns True if the scenario would have a circular dependency
if A depended on B.
"""
state = collections.defaultdict(lambda: 0)
def dfs(target):
# Check if the target is already completed/in-process
if state[target] == 2:
return False
elif state[target] == 1:
return True
# Set to in-process
state[target] = 1
# Recurse on children
deps = conditionedDeps(target)
for child in deps:
if child is A:
return True
if dfs(child):
return True
# Set to completed
state[target] = 2
return False
return dfs(B)
def conditionedDeps(samp):
return list(dependencies(samp._conditioned if isinstance(samp, Samplable) else samp))
```