Source code for scenic.core.dynamics.invocables

"""General code for invocables, i.e. behaviors, monitors, and modular scenarios."""

import enum
import types

from scenic.core.distributions import Options
from scenic.core.errors import InvalidScenarioError
from scenic.core.lazy_eval import DelayedArgument, needsLazyEvaluation

from .guards import GuardViolation
from .utils import RejectSimulationException


[docs]class Invocable: """Abstract class with common code for behaviors and modular scenarios. Both of these types of objects can be called like functions, can have guards, and can suspend their own execution to invoke sub-behaviors/scenarios. """ def __init__(self, *args, **kwargs): import scenic.syntax.veneer as veneer if veneer.evaluatingGuard: raise InvalidScenarioError( "tried to invoke behavior/scenario from inside guard or interrupt condition" ) self._args = args self._kwargs = kwargs self._agent = None self._runningIterator = None self._isRunning = False def _start(self): assert not self._isRunning self._isRunning = True self._finalizeArguments() def _step(self): assert self._isRunning def _stop(self, reason=None): assert self._isRunning self._isRunning = False def _finalizeArguments(self): # Evaluate any lazy arguments whose evaluation was deferred until just before # this invocable starts running. args = [] for arg in self._args: if needsLazyEvaluation(arg): assert isinstance(arg, DelayedArgument) args.append(arg.evaluateInner(None)) else: args.append(arg) self._args = tuple(args) for name, arg in self._kwargs.items(): if needsLazyEvaluation(arg): assert isinstance(arg, DelayedArgument) self._kwargs[name] = arg.evaluateInner(None) def _invokeSubBehavior(self, agent, subs, modifier=None, schedule=None): def pickEnabledInvocable(opts): enabled = {} if isinstance(opts, dict): for sub, weight in opts.items(): if sub._isEnabledForAgent(agent): enabled[sub] = weight else: for sub in opts: if sub._isEnabledForAgent(agent): enabled[sub] = 1 if not enabled: raise RejectSimulationException('deadlock in "do choose/shuffle"') if len(enabled) == 1: choice = list(enabled)[0] else: choice = Options(enabled) return choice scheduler = None if schedule == "choose": if len(subs) == 1 and isinstance(subs[0], dict): subs = subs[0] subs = (pickEnabledInvocable(subs),) elif schedule == "shuffle": if len(subs) == 1 and isinstance(subs[0], dict): subs = subs[0] else: subs = {item: 1 for item in subs} def scheduler(): while subs: choice = pickEnabledInvocable(subs) subs.pop(choice) yield from self._invokeInner(agent, (choice,)) else: assert schedule is None if not scheduler: def scheduler(): yield from self._invokeInner(agent, subs) if modifier: if modifier.name == "for": # do X for Y [seconds | steps] import scenic.syntax.veneer as veneer timeLimit = modifier.value if not isinstance(timeLimit, (float, int)): raise TypeError('"do X for Y" with Y not a number') assert modifier.terminator in (None, "seconds", "steps") if modifier.terminator != "steps": timeLimit /= veneer.currentSimulation.timestep startTime = veneer.currentSimulation.currentTime condition = ( lambda: veneer.currentSimulation.currentTime - startTime >= timeLimit ) elif modifier.name == "until": # do X until Y condition = modifier.value else: raise RuntimeError( f"internal parsing error: impossible modifier {modifier}" ) def body(behavior, agent): yield from scheduler() def handler(behavior, agent): for sub in subs: if sub._isRunning: sub._stop(f'"{modifier.name}" condition met') return BlockConclusion.ABORT yield from runTryInterrupt(self, agent, body, [condition], [handler]) else: yield from scheduler()
[docs] def _invokeInner(self, agent, subs): """Run the given sub-behavior/scenario(s) in parallel. Implemented by subclasses. """ raise NotImplementedError
def _checkAllPreconditions(self): self.checkPreconditions(self._agent, *self._args, **self._kwargs) self.checkInvariants(self._agent, *self._args, **self._kwargs) def _isEnabledForAgent(self, agent): assert not self._isRunning assert self._agent is None try: self._agent = agent # in case `self` is used in a precondition self._checkAllPreconditions() return True except GuardViolation: return False finally: self._agent = None
# Try-interrupt blocks def runTryInterrupt(behavior, agent, body, conditions, handlers): body = InterruptBlock(None, body) interrupts = [InterruptBlock(c, h) for c, h in zip(conditions, handlers)] while True: # find next block to run, if any block = body for interrupt in interrupts: if interrupt.isEnabled or interrupt.isRunning: block = interrupt break result, concluded = block.step(behavior, agent) if concluded: if result is BlockConclusion.FINISHED and block is not body: continue # interrupt handler finished else: return result # entire try-interrupt statement will terminate else: yield result behavior.checkInvariants(None, *behavior._args, **behavior._kwargs)
[docs]@enum.unique class BlockConclusion(enum.Enum): FINISHED = enum.auto() ABORT = enum.auto() RETURN = enum.auto() BREAK = enum.auto() CONTINUE = enum.auto() def __call__(self, value): self.return_value = value return self
class InterruptBlock: def __init__(self, condition, body): self.condition = condition self.body = body self.runningIterator = None @property def isEnabled(self): import scenic.syntax.veneer as veneer with veneer.executeInGuard(): result = self.condition() if isinstance(result, DelayedArgument): # Condition cannot yet be evaluated because it depends on a scenario # local not yet initialized; we consider it to be false. return False return bool(result) @property def isRunning(self): return self.runningIterator is not None def step(self, behavior, agent): if not self.runningIterator: it = self.body(behavior, agent) if not isinstance(it, types.GeneratorType): return (it, True) self.runningIterator = it try: result = self.runningIterator.send(None) return (result, False) except StopIteration as e: self.runningIterator = None return (e.value, True)