"""Pruning parts of the sample space which violate requirements."""
import math
import time
import shapely.geometry
import shapely.geos
from scenic.core.distributions import (Samplable, MethodDistribution, OperatorDistribution,
needsSampling, supportInterval, underlyingFunction)
from scenic.core.object_types import Point, Object
from scenic.core.geometry import normalizeAngle, polygonUnion, plotPolygon
from scenic.core.vectors import VectorField, PolygonalVectorField, VectorMethodDistribution
from scenic.core.workspaces import Workspace
from scenic.syntax.relations import RelativeHeadingRelation, DistanceRelation
from scenic.core.utils import InvalidScenarioError
import scenic.core.regions as regions
### Utilities
[docs]def currentPropValue(obj, prop):
"""Get the current value of an object's property, taking into account prior pruning."""
value = getattr(obj, prop)
return value._conditioned if isinstance(value, Samplable) else value
[docs]def isMethodCall(thing, method):
"""Match calls to a given method, taking into account distribution decorators."""
if not isinstance(thing, (MethodDistribution, VectorMethodDistribution)):
return False
return thing.method is underlyingFunction(method)
[docs]def matchInRegion(position):
"""Match uniform samples from a Region, returning the Region if any."""
position = position.toVector()
if isinstance(position, regions.PointInRegionDistribution):
reg = position.region
if isinstance(reg, Workspace):
reg = reg.region
return reg
return None
[docs]def matchPolygonalField(heading, position):
"""Match headings defined by a PolygonalVectorField at the given position.
Matches headings exactly equal to a PolygonalVectorField, or offset by a
bounded disturbance. Returns a triplet consisting of the matched field if
any, together with lower/upper bounds on the disturbance.
"""
if (isMethodCall(heading, VectorField.__getitem__)
and isinstance(heading.object, PolygonalVectorField)
and heading.arguments == (position.toVector(),)):
return heading.object, 0, 0
elif isinstance(heading, OperatorDistribution) and heading.operator in ('__add__', '__radd__'):
field, lower, upper = matchPolygonalField(heading.object, position)
if field is not None:
assert len(heading.operands) == 1
offset = heading.operands[0]
ol, oh = supportInterval(offset)
if ol is not None and oh is not None:
return field, lower + ol, upper + oh
return None, 0, 0
### Pruning procedures
[docs]def prune(scenario, verbosity=1):
"""Prune a Scenario, removing infeasible parts of the space.
This function directly modifies the Distributions used in the Scenario,
but leaves the conditional distribution under the scenario's requirements
unchanged.
"""
if verbosity >= 1:
print(' Pruning scenario...')
startTime = time.time()
pruneContainment(scenario, verbosity)
pruneRelativeHeading(scenario, verbosity)
if verbosity >= 1:
totalTime = time.time() - startTime
print(f' Pruned scenario in {totalTime:.4g} seconds.')
## Pruning based on containment
[docs]def pruneContainment(scenario, verbosity):
"""Prune based on the requirement that individual Objects fit within their container.
Specifically, if O is positioned uniformly in region B and has container C, then we
can instead pick a position uniformly in their intersection. If we can also lower
bound the radius of O, then we can first erode C by that distance.
"""
for obj in scenario.objects:
base = matchInRegion(obj.position)
if base is None: # match objects positioned uniformly in a Region
continue
if isinstance(base, regions.EmptyRegion):
raise InvalidScenarioError(f'Object {obj} placed in empty region')
basePoly = regions.toPolygon(base)
if basePoly is None: # to prune, the Region must be polygonal
continue
if basePoly.is_empty:
raise InvalidScenarioError(f'Object {obj} placed in empty region')
container = scenario.containerOfObject(obj)
containerPoly = regions.toPolygon(container)
if containerPoly is None: # the object's container must also be polygonal
return None
minRadius, _ = supportInterval(obj.inradius)
if minRadius is not None: # if we can lower bound the radius, erode the container
containerPoly = containerPoly.buffer(-minRadius)
elif base is container:
continue
newBasePoly = basePoly & containerPoly # restrict the base Region to the container
if newBasePoly.is_empty:
raise InvalidScenarioError(f'Object {obj} does not fit in container')
if verbosity >= 1:
if basePoly.area > 0:
ratio = newBasePoly.area / basePoly.area
else:
ratio = newBasePoly.length / basePoly.length
percent = 100 * (1.0 - ratio)
print(f' Region containment constraint pruned {percent:.1f}% of space.')
newBase = regions.regionFromShapelyObject(newBasePoly, orientation=base.orientation)
newPos = regions.Region.uniformPointIn(newBase)
obj.position.conditionTo(newPos)
## Pruning based on orientation
[docs]def pruneRelativeHeading(scenario, verbosity):
"""Prune based on requirements bounding the relative heading of an Object.
Specifically, if an object O is:
* positioned uniformly within a polygonal region B;
* aligned to a polygonal vector field F (up to a bounded offset);
and another object O' is:
* aligned to a polygonal vector field F' (up to a bounded offset);
* at most some finite maximum distance from O;
* required to have relative heading within a bounded offset of that of O;
then we can instead position O uniformly in the subset of B intersecting the cells
of F which satisfy the relative heading requirements w.r.t. some cell of F' which
is within the distance bound.
"""
# Check which objects are (approximately) aligned to polygonal vector fields
fields = {}
for obj in scenario.objects:
field, offsetL, offsetR = matchPolygonalField(obj.heading, obj.position)
if field is not None:
fields[obj] = (field, offsetL, offsetR)
# Check for relative heading relations among such objects
for obj, (field, offsetL, offsetR) in fields.items():
position = currentPropValue(obj, 'position')
base = matchInRegion(position)
if base is None: # obj must be positioned uniformly in a Region
continue
basePoly = regions.toPolygon(base)
if basePoly is None: # the Region must be polygonal
continue
newBasePoly = basePoly
for rel in obj._relations:
if isinstance(rel, RelativeHeadingRelation) and rel.target in fields:
tField, tOffsetL, tOffsetR = fields[rel.target]
maxDist = maxDistanceBetween(scenario, obj, rel.target)
if maxDist == float('inf'): # the distance between the objects must be bounded
continue
feasible = feasibleRHPolygon(field, offsetL, offsetR,
tField, tOffsetL, tOffsetR,
rel.lower, rel.upper, maxDist)
if feasible is None: # the RH bounds may be too weak to restrict the space
continue
try:
pruned = newBasePoly & feasible
except shapely.geos.TopologicalError: # TODO how can we prevent these??
pruned = newBasePoly & feasible.buffer(0.1, cap_style=2)
if verbosity >= 1:
percent = 100 * (1.0 - (pruned.area / newBasePoly.area))
print(f' Relative heading constraint pruned {percent:.1f}% of space.')
newBasePoly = pruned
if newBasePoly is not basePoly:
newBase = regions.PolygonalRegion(polygon=newBasePoly,
orientation=base.orientation)
newPos = regions.Region.uniformPointIn(newBase)
obj.position.conditionTo(newPos)
[docs]def maxDistanceBetween(scenario, obj, target):
"""Upper bound the distance between the given Objects."""
# If one of the objects is the ego, use visibility requirements
ego = scenario.egoObject
if obj is ego and target.requireVisible:
visDist = visibilityBound(ego, target)
elif target is ego and obj.requireVisible:
visDist = visibilityBound(ego, obj)
else:
visDist = float('inf')
# Check for any distance bounds implied by user-specified requirements
reqDist = float('inf')
for rel in obj._relations:
if isinstance(rel, DistanceRelation) and rel.target is target:
if rel.upper < reqDist:
reqDist = rel.upper
return min(visDist, reqDist)
[docs]def visibilityBound(obj, target):
"""Upper bound the distance from an Object to another it can see."""
# Upper bound on visible distance is a sum of several terms:
# 1. obj.visibleDistance
_, maxVisibleDistance = supportInterval(obj.visibleDistance)
if maxVisibleDistance is None:
return None
# 2. distance from obj's center to its camera
_, maxCameraX = supportInterval(obj.cameraOffset.x)
_, maxCameraY = supportInterval(obj.cameraOffset.y)
if maxCameraX is None or maxCameraY is None:
return None
maxVisibleDistance += math.hypot(maxCameraX, maxCameraY)
# 3. radius of target
_, maxRadius = supportInterval(target.radius)
if maxRadius is None:
return None
maxVisibleDistance += maxRadius
return maxVisibleDistance
[docs]def feasibleRHPolygon(field, offsetL, offsetR,
tField, tOffsetL, tOffsetR,
lowerBound, upperBound, maxDist):
"""Find where objects aligned to the given fields can satisfy the given RH bounds."""
if (offsetR - offsetL >= math.tau
or tOffsetR - tOffsetL >= math.tau
or upperBound - lowerBound >= math.tau):
return None
polygons = []
expanded = [(poly.buffer(maxDist), heading) for poly, heading in tField.cells]
for baseCell, baseHeading in field.cells: # TODO skip cells not contained in base region?
for expandedTargetCell, targetHeading in expanded:
lower, upper = relativeHeadingRange(baseHeading, offsetL, offsetR,
targetHeading, tOffsetL, tOffsetR)
if (upper >= lowerBound and lower <= upperBound): # RH intervals overlap
intersection = baseCell & expandedTargetCell
if not intersection.is_empty:
assert isinstance(intersection, shapely.geometry.Polygon), intersection
polygons.append(intersection)
return polygonUnion(polygons)
[docs]def relativeHeadingRange(baseHeading, offsetL, offsetR,
targetHeading, tOffsetL, tOffsetR):
"""Lower/upper bound the possible RH between two headings with bounded disturbances."""
if baseHeading is None or targetHeading is None: # heading may not be constant within cell
return -math.pi, math.pi
lower = normalizeAngle(baseHeading + offsetL)
upper = normalizeAngle(baseHeading + offsetR)
points = [lower, upper]
if upper < lower:
points.extend((math.pi, -math.pi))
tLower = normalizeAngle(targetHeading + tOffsetL)
tUpper = normalizeAngle(targetHeading + tOffsetR)
tPoints = [tLower, tUpper]
if tUpper < tLower:
tPoints.extend((math.pi, -math.pi))
rhs = [tp - p for tp in tPoints for p in points] # TODO improve
return min(rhs), max(rhs)