"""Scenario and scene objects."""
import random
import time
from scenic.core.distributions import Samplable, RejectionException, needsSampling
from scenic.core.lazy_eval import needsLazyEvaluation
from scenic.core.external_params import ExternalSampler
from scenic.core.workspaces import Workspace
from scenic.core.vectors import Vector
from scenic.core.utils import areEquivalent, InvalidScenarioError
[docs]class Scene:
"""A scene generated from a Scenic scenario.
Attributes:
objects (tuple(:obj:`~scenic.core.object_types.Object`)): All objects in the
scene. The ``ego`` object is first.
egoObject (:obj:`~scenic.core.object_types.Object`): The ``ego`` object.
params (dict): Dictionary mapping the name of each global parameter to its value.
workspace (:obj:`~scenic.core.workspaces.Workspace`): Workspace for the scenario.
"""
def __init__(self, workspace, objects, egoObject, params):
self.workspace = workspace
self.objects = tuple(objects)
self.egoObject = egoObject
self.params = params
[docs] def show(self, zoom=None, block=True):
"""Render a schematic of the scene for debugging."""
import matplotlib.pyplot as plt
# display map
self.workspace.show(plt)
# draw objects
for obj in self.objects:
obj.show(self.workspace, plt, highlight=(obj is self.egoObject))
# zoom in if requested
if zoom != None:
self.workspace.zoomAround(plt, self.objects, expansion=zoom)
plt.show(block=block)
[docs]class Scenario:
"""A compiled Scenic scenario, from which scenes can be sampled."""
def __init__(self, workspace,
objects, egoObject,
params, externalParams,
requirements, requirementDeps):
if workspace is None:
workspace = Workspace() # default empty workspace
self.workspace = workspace
ordered = []
for obj in objects:
ordered.append(obj)
if obj is egoObject: # make ego the first object
ordered[0], ordered[-1] = ordered[-1], ordered[0]
assert ordered[0] is egoObject
self.objects = tuple(ordered)
self.egoObject = egoObject
self.params = dict(params)
self.externalParams = tuple(externalParams)
self.requirements = tuple(requirements)
self.externalSampler = ExternalSampler.forParameters(self.externalParams, self.params)
# dependencies must use fixed order for reproducibility
paramDeps = tuple(p for p in self.params.values() if isinstance(p, Samplable))
self.dependencies = self.objects + paramDeps + tuple(requirementDeps)
self.validate()
def isEquivalentTo(self, other):
if type(other) is not Scenario:
return False
return (areEquivalent(other.workspace, self.workspace)
and areEquivalent(other.objects, self.objects)
and areEquivalent(other.params, self.params)
and areEquivalent(other.externalParams, self.externalParams)
and areEquivalent(other.requirements, self.requirements)
and other.externalSampler == self.externalSampler)
def containerOfObject(self, obj):
if hasattr(obj, 'regionContainedIn') and obj.regionContainedIn is not None:
return obj.regionContainedIn
else:
return self.workspace.region
[docs] def validate(self):
"""Make some simple static checks for inconsistent built-in requirements."""
objects = self.objects
staticVisibility = not needsSampling(self.egoObject.visibleRegion)
staticBounds = [self.hasStaticBounds(obj) for obj in objects]
for i in range(len(objects)):
oi = objects[i]
# skip objects with unknown positions or bounding boxes
if not staticBounds[i]:
continue
# Require object to be contained in the workspace/valid region
container = self.containerOfObject(oi)
if not needsSampling(container) and not container.containsObject(oi):
raise InvalidScenarioError(f'Object at {oi.position} does not fit in container')
# Require object to be visible from the ego object
if staticVisibility and oi.requireVisible is True and oi is not self.egoObject:
if not self.egoObject.canSee(oi):
raise InvalidScenarioError(f'Object at {oi.position} is not visible from ego')
# Require object to not intersect another object
for j in range(i):
oj = objects[j]
if not staticBounds[j]:
continue
if oi.intersects(oj):
raise InvalidScenarioError(f'Object at {oi.position} intersects'
f' object at {oj.position}')
def hasStaticBounds(self, obj):
if needsSampling(obj.position):
return False
if any(needsSampling(corner) for corner in obj.corners):
return False
return True
[docs] def generate(self, maxIterations=2000, verbosity=0, feedback=None):
"""Sample a `Scene` from this scenario.
Args:
maxIterations (int): Maximum number of rejection sampling iterations.
verbosity (int): Verbosity level.
feedback (float): Feedback to pass to external samplers doing active sampling.
See :mod:`scenic.core.external_params`.
Returns:
A pair with the sampled `Scene` and the number of iterations used.
Raises:
`RejectionException`: if no valid sample is found in **maxIterations** iterations.
"""
objects = self.objects
# choose which custom requirements will be enforced for this sample
activeReqs = [req for req, prob in self.requirements if random.random() <= prob]
# do rejection sampling until requirements are satisfied
rejection = True
iterations = 0
while rejection is not None:
if iterations > 0: # rejected the last sample
if verbosity >= 2:
print(f' Rejected sample {iterations} because of: {rejection}')
if self.externalSampler is not None:
feedback = self.externalSampler.rejectionFeedback
if iterations >= maxIterations:
raise RejectionException(f'failed to generate scenario in {iterations} iterations')
iterations += 1
try:
if self.externalSampler is not None:
self.externalSampler.sample(feedback)
sample = Samplable.sampleAll(self.dependencies)
except RejectionException as e:
rejection = e
continue
rejection = None
ego = sample[self.egoObject]
# Normalize types of some built-in properties
for obj in objects:
sampledObj = sample[obj]
assert not needsSampling(sampledObj)
assert isinstance(sampledObj.position, Vector)
sampledObj.heading = float(sampledObj.heading)
# Check built-in requirements
for i in range(len(objects)):
vi = sample[objects[i]]
# Require object to be contained in the workspace/valid region
container = self.containerOfObject(vi)
if not container.containsObject(vi):
rejection = 'object containment'
break
# Require object to be visible from the ego object
if vi.requireVisible and vi is not ego and not ego.canSee(vi):
rejection = 'object visibility'
break
# Require object to not intersect another object
for j in range(i):
vj = sample[objects[j]]
if vi.intersects(vj):
rejection = 'object intersection'
break
if rejection is not None:
break
if rejection is not None:
continue
# Check user-specified requirements
for req in activeReqs:
if not req(sample):
rejection = 'user-specified requirement'
break
# obtained a valid sample; assemble a scene from it
sampledObjects = tuple(sample[obj] for obj in objects)
sampledParams = {}
for param, value in self.params.items():
sampledValue = sample[value] if isinstance(value, Samplable) else value
assert not needsLazyEvaluation(sampledValue)
sampledParams[param] = sampledValue
scene = Scene(self.workspace, sampledObjects, ego, sampledParams)
return scene, iterations
[docs] def resetExternalSampler(self):
"""Reset the scenario's external sampler, if any.
If the Python random seed is reset before calling this function, this
should cause the sequence of generated scenes to be deterministic."""
self.externalSampler = ExternalSampler.forParameters(self.externalParams, self.params)