Source code for scenic.core.dynamics.scenarios

"""Dynamic scenarios."""

import ast
from collections import defaultdict
import dataclasses
import functools
import inspect
import weakref

import rv_ltl

import scenic
import scenic.core.dynamics as dynamics
from scenic.core.errors import InvalidScenarioError, ScenicSyntaxError
from scenic.core.lazy_eval import DelayedArgument, needsLazyEvaluation
from scenic.core.requirements import (
    DynamicRequirement,
    PendingRequirement,
    RequirementType,
)
from scenic.core.utils import alarm, argsToString
from scenic.core.workspaces import Workspace

from .actions import _EndScenarioAction, _EndSimulationAction
from .behaviors import Behavior, Monitor
from .invocables import Invocable
from .utils import RejectSimulationException, StuckBehaviorWarning


[docs]class DynamicScenario(Invocable): """Internal class for scenarios which can execute during dynamic simulations. Provides additional information complementing `Scenario`, which originally only supported static scenarios. The two classes should probably eventually be merged. """ def __init_subclass__(cls, *args, **kwargs): import scenic.syntax.veneer as veneer veneer.registerDynamicScenarioClass(cls) target = cls._setup or cls._compose or (lambda self, agent: 0) target = functools.partial(target, 0, 0) # account for Scenic-inserted args cls.__signature__ = inspect.signature(target) _requirementSyntax = None # overridden by subclasses _simulatorFactory = None _globalParameters = None _locals = () def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._ego = None self._workspace = None self._instances = [] # ordered for reproducibility # _objects should contain a reference to the most complete version of # the objects in this scene (sampled > unsampled) self._objects = [] # ordered for reproducibility self._sampledObjects = self._objects self._externalParameters = [] self._pendingRequirements = defaultdict(list) self._requirements = [] # things needing to be sampled to evaluate the requirements self._requirementDeps = set() self._agents = [] self._monitors = [] self._behaviors = [] self._monitorRequirements = [] self._temporalRequirements = [] self._terminationConditions = [] self._terminateSimulationConditions = [] self._recordedExprs = [] self._recordedInitialExprs = [] self._recordedFinalExprs = [] self._subScenarios = [] self._endWithBehaviors = False self._timeLimit = None self._timeLimitIsInSeconds = False self._prepared = False self._delayingPreconditionCheck = False self._dummyNamespace = None self._timeLimitInSteps = None # computed at simulation time self._elapsedTime = 0 self._eventuallySatisfied = None self._overrides = {} self._requirementMonitors = None @classmethod def _dummy(cls, namespace): scenario = cls() scenario._setup = None scenario._compose = None scenario._prepared = True scenario._dummyNamespace = namespace return scenario
[docs] @classmethod def _requiresArguments(cls): """Whether this scenario cannot be instantiated without arguments.""" if cls._setup: func = cls._setup elif cls._compose: func = cls._compose else: return True sig = inspect.signature(func) try: sig.bind(None, None) # first two arguments are added internally by Scenic return False except TypeError: return True
@property def ego(self): if self._ego is None: return DelayedArgument((), lambda context: self._ego, _internal=True) return self._ego @property def objects(self): return tuple(self._objects)
[docs] def _bindTo(self, scene): """Bind this scenario to a sampled scene when starting a new simulation.""" self._ego = scene.egoObject self._workspace = scene.workspace self._objects = list(scene.objects) self._agents = [obj for obj in scene.objects if obj.behavior is not None] self._monitors = list(scene.monitors) self._temporalRequirements = scene.temporalRequirements self._terminationConditions = scene.terminationConditions self._terminateSimulationConditions = scene.terminateSimulationConditions self._recordedExprs = scene.recordedExprs self._recordedInitialExprs = scene.recordedInitialExprs self._recordedFinalExprs = scene.recordedFinalExprs
[docs] def _prepare(self, delayPreconditionCheck=False): """Prepare the scenario for execution, executing its setup block.""" import scenic.syntax.veneer as veneer assert not self._prepared self._prepared = True self._finalizeArguments() # TODO generalize _prepare for Invocable? veneer.prepareScenario(self) with veneer.executeInScenario(self, inheritEgo=True): # Check preconditions and invariants if delayPreconditionCheck: self._delayingPreconditionCheck = True else: self._checkAllPreconditions() # Execute setup block if self._setup is not None: assert not any(needsLazyEvaluation(arg) for arg in self._args) assert not any(needsLazyEvaluation(arg) for arg in self._kwargs.values()) self._setup(None, *self._args, **self._kwargs) veneer.finishScenarioSetup(self) # Extract requirements, scan for relations used for pruning, and create closures self._compileRequirements()
@classmethod def _bindGlobals(cls, globs): cls._globalParameters = globs
[docs] def _start(self): """Start the scenario, starting its compose block, behaviors, and monitors.""" import scenic.syntax.veneer as veneer super()._start() assert self._prepared # Check preconditions if they could not be checked earlier if self._delayingPreconditionCheck: self._checkAllPreconditions() # Compute time limit now that we know the simulation timestep self._elapsedTime = 0 self._timeLimitInSteps = self._timeLimit if self._timeLimitIsInSeconds: self._timeLimitInSteps /= veneer.currentSimulation.timestep # create monitors for each requirement used for this simulation self._requirementMonitors = [r.toMonitor() for r in self._temporalRequirements] veneer.startScenario(self) with veneer.executeInScenario(self): # Start compose block if self._compose is not None: if not inspect.isgeneratorfunction(self._compose): from scenic.syntax.translator import composeBlock raise InvalidScenarioError( f'"{composeBlock}" does not invoke any scenarios' ) self._runningIterator = self._compose(None, *self._args, **self._kwargs) # Initialize behavior coroutines of agents for agent in self._agents: behavior = agent.behavior assert isinstance(behavior, Behavior), behavior behavior._assignTo(agent) # Initialize monitor coroutines for monitor in self._monitors: monitor._start()
[docs] def _step(self): """Execute the (already-started) scenario for one time step. Returns: `None` if the scenario will continue executing; otherwise a string describing why it has terminated. """ import scenic.syntax.veneer as veneer super()._step() # Check temporal requirements for m in self._requirementMonitors: result = m.value() if result == rv_ltl.B4.FALSE: raise RejectSimulationException(str(m)) # Check if we have reached the time limit, if any if ( self._timeLimitInSteps is not None and self._elapsedTime >= self._timeLimitInSteps ): return self._stop("reached time limit") self._elapsedTime += 1 # Execute compose block, if any composeDone = False if self._runningIterator is None: composeDone = True # compose block ended in an earlier step else: def alarmHandler(signum, frame): if sys.gettrace(): return # skip the warning if we're in the debugger warnings.warn( f"the compose block of scenario {self} is taking a long time; " 'maybe you have an infinite loop with no "wait" statement?', StuckBehaviorWarning, ) timeout = dynamics.stuckBehaviorWarningTimeout with veneer.executeInScenario(self), alarm(timeout, alarmHandler): try: result = self._runningIterator.send(None) if isinstance(result, (_EndSimulationAction, _EndScenarioAction)): return self._stop(result) except StopIteration: self._runningIterator = None composeDone = True # If there is a compose block and it has finished, we're done if self._compose is not None and composeDone: return self._stop("finished compose block") # Optionally end when all our agents' behaviors have ended if self._endWithBehaviors: if all(agent.behavior._isFinished for agent in self._agents): return self._stop("all behaviors finished") # Check if any termination conditions apply for req in self._terminationConditions: if req.evaluate(): return self._stop(req) # Scenario will not terminate yet return None
[docs] def _stop(self, reason, quiet=False): """Stop the scenario's execution, for the given reason.""" import scenic.syntax.veneer as veneer assert self._isRunning # Stop monitors and subscenarios. for monitor in self._monitors: if monitor._isRunning: monitor._stop() self._monitors = [] for sub in self._subScenarios: if sub._isRunning: sub._stop("parent scenario ending", quiet=quiet) self._runningIterator = None # Revert overrides. for obj, oldVals in self._overrides.items(): obj._revert(oldVals) # Inform the veneer we have stopped, and mark ourselves finished. veneer.endScenario(self, reason, quiet=quiet) super()._stop(reason) # Reject if a temporal requirement was not satisfied. if not quiet: for req in self._requirementMonitors: if req.lastValue.is_falsy: raise RejectSimulationException(str(req)) self._requirementMonitors = None return reason
def _invokeInner(self, agent, subs): for sub in subs: if not isinstance(sub, DynamicScenario): raise TypeError(f"expected a scenario, got {sub}") sub._prepare() sub._start() self._subScenarios = list(subs) while True: newSubs = [] for sub in self._subScenarios: terminationReason = sub._step() if isinstance(terminationReason, _EndSimulationAction): yield terminationReason assert False, self # should never get here since simulation ends elif terminationReason is None: newSubs.append(sub) self._subScenarios = newSubs if not newSubs: return yield None # Check if any sub-scenarios stopped during action execution self._subScenarios = [sub for sub in self._subScenarios if sub._isRunning] def _evaluateRecordedExprs(self, ty): if ty is RequirementType.record: place = "_recordedExprs" elif ty is RequirementType.recordInitial: place = "_recordedInitialExprs" elif ty is RequirementType.recordFinal: place = "_recordedFinalExprs" else: assert False, "invalid record type requested" return self._evaluateRecordedExprsAt(place) def _evaluateRecordedExprsAt(self, place): values = {} for rec in getattr(self, place): values[rec.name] = rec.evaluate() for sub in self._subScenarios: subvals = sub._evaluateRecordedExprsAt(place) values.update(subvals) return values def _runMonitors(self): terminationReason = None endScenario = None for monitor in self._monitors: action = monitor._step() # do not exit early, since subsequent monitors could reject the simulation if isinstance(action, _EndSimulationAction): terminationReason = action elif isinstance(action, _EndScenarioAction): assert action.scenario is None endScenario = action for sub in self._subScenarios: subreason = sub._runMonitors() if subreason is not None: terminationReason = subreason if endScenario: self._stop(endScenario) return terminationReason or endScenario def _checkSimulationTerminationConditions(self): for req in self._terminateSimulationConditions: if req.isTrue().is_truthy: return req return None @property def _allAgents(self): agents = list(self._agents) for sub in self._subScenarios: agents.extend(sub._allAgents) return agents def _inherit(self, other): if not self._workspace: self._workspace = other._workspace self._instances.extend(other._instances) self._objects.extend(other._objects) self._agents.extend(other._agents) self._globalParameters.update(other._globalParameters) self._externalParameters.extend(other._externalParameters) self._requirements.extend(other._requirements) self._behaviors.extend(other._behaviors) def _registerInstance(self, inst): self._instances.append(inst) def _registerObject(self, obj): self._registerInstance(obj) self._objects.append(obj) if getattr(obj, "behavior", None) is not None: self._agents.append(obj) obj._parentScenario = weakref.ref(self)
[docs] def _addRequirement(self, ty, reqID, req, line, name, prob): """Save a requirement defined at compile-time for later processing.""" assert reqID not in self._pendingRequirements preq = PendingRequirement(ty, req, line, prob, name, self._ego) self._pendingRequirements[reqID] = preq
[docs] def _addDynamicRequirement(self, ty, req, line, name): """Add a requirement defined during a dynamic simulation.""" dreq = DynamicRequirement(ty, req, line, name) self._temporalRequirements.append(dreq)
[docs] def _addMonitor(self, monitor): """Add a monitor during a dynamic simulation.""" assert isinstance(monitor, Monitor) self._monitors.append(monitor) if self._isRunning: monitor._start()
def _compileRequirements(self): namespace = self._dummyNamespace if self._dummyNamespace else self.__dict__ requirementSyntax = self._requirementSyntax assert requirementSyntax is not None for reqID, requirement in self._pendingRequirements.items(): syntax = requirementSyntax[reqID] if requirementSyntax else None # Catch the simple case where someone has most likely forgotten the "monitor" # keyword. if ( (not requirement.ty == RequirementType.monitor) and isinstance(syntax, ast.Call) and isinstance(syntax.func, ast.Name) and syntax.func.id in namespace and isinstance(namespace[syntax.func.id], type) and issubclass( namespace[syntax.func.id], scenic.core.dynamics.behaviors.Monitor ) ): raise ScenicSyntaxError( f"Missing 'monitor' keyword after 'require' when instantiating '{syntax.func.id}'" ) compiledReq = requirement.compile(namespace, self, syntax) self._registerCompiledRequirement(compiledReq) self._requirementDeps.update(compiledReq.dependencies) def _registerCompiledRequirement(self, req): if req.ty is RequirementType.require: place = self._requirements elif req.ty is RequirementType.monitor: place = self._monitorRequirements elif req.ty is RequirementType.terminateWhen: place = self._terminationConditions elif req.ty is RequirementType.terminateSimulationWhen: place = self._terminateSimulationConditions elif req.ty is RequirementType.record: place = self._recordedExprs elif req.ty is RequirementType.recordInitial: place = self._recordedInitialExprs elif req.ty is RequirementType.recordFinal: place = self._recordedFinalExprs else: raise RuntimeError(f"internal error: requirement {req} has unknown type!") place.append(req) def _setTimeLimit(self, timeLimit, inSeconds=True): self._timeLimit = timeLimit self._timeLimitIsInSeconds = inSeconds def _override(self, obj, specifiers): oldVals = obj._override(specifiers) if obj not in self._overrides: self._overrides[obj] = oldVals def _toScenario(self, namespace): assert self._prepared if not self._workspace: self._workspace = Workspace() # default empty workspace astHash = namespace["_astHash"] name = None if self._dummyNamespace else self.__class__.__name__ options = dataclasses.replace(namespace["_compileOptions"], scenario=name) from scenic.core.scenarios import Scenario scenario = Scenario( self._workspace, self._simulatorFactory, self._instances, self._objects, self._ego, self._globalParameters, self._externalParameters, self._requirements, self._requirementDeps, self._monitorRequirements, self._behaviorNamespaces, self, astHash, options, ) # TODO unify these! return scenario def __getattr__(self, name): if name in self._locals: return DelayedArgument( (), lambda context: getattr(self, name), _internal=True ) return object.__getattribute__(self, name) def __str__(self): if self._dummyNamespace: return "top-level scenario" else: args = argsToString(self._args, self._kwargs) return f"{self.__class__.__name__}({args})"