"""Translator turning Scenic programs into Scenario objects.
The top-level interface to Scenic is provided by two functions:
* `scenarioFromString` -- compile a string of Scenic code;
* `scenarioFromFile` -- compile a Scenic file.
These output a `Scenario` object, from which scenes can be generated.
See the documentation for `Scenario` for details.
When imported, this module hooks the Python import system so that Scenic
modules can be imported using the ``import`` statement. This is primarily for the
translator's own use, but you could import Scenic modules from Python to
inspect them. [#import]_ Because Scenic uses Python's import system, the latter's
rules for finding modules apply, including the handling of packages.
Scenic is compiled in two main steps: translating the code into Python, and
executing the resulting Python module to generate a Scenario object encoding
the objects, distributions, etc. in the scenario. For details, see the function
`compileStream` below.
.. rubric:: Footnotes
.. [#import] Note however that care must be taken when importing Scenic modules
which will later be used when compiling multiple Scenic scenarios. Because
Python caches modules, there is the possibility of one version of a Scenic
module persisting even when it should be recompiled during the compilation
of another module that imports it.
Scenic handles the most common case, that of Scenic modules which refer to
other Scenic modules at the top level; but it is not practical to catch all
possible cases. In particular, importing a Python package which contains
Scenic modules as submodules and then later compiling those modules more
than once within the same Python process may lead to errors or unexpected
behavior. See the **cacheImports** argument of `scenarioFromFile`.
"""
import sys
import os
import io
import builtins
import time
import inspect
import types
import typing
import importlib
import importlib.abc
import importlib.util
import itertools
from collections import namedtuple, defaultdict, deque
from contextlib import contextmanager
import tokenize
from tokenize import NAME, NL, NEWLINE, ENDMARKER, OP, NUMBER, COLON, COMMENT, ENCODING
from tokenize import LPAR, RPAR, LSQB, RSQB, RBRACE, COMMA, DOUBLESLASH, DOUBLESLASHEQUAL
from tokenize import AT, LEFTSHIFT, RIGHTSHIFT, VBAR, AMPER, TILDE, CIRCUMFLEX, STAR
from tokenize import LEFTSHIFTEQUAL, RIGHTSHIFTEQUAL, VBAREQUAL, AMPEREQUAL, CIRCUMFLEXEQUAL
from tokenize import INDENT, DEDENT, STRING, SEMI, DOT
import ast
from ast import parse, dump, NodeVisitor, NodeTransformer, copy_location, fix_missing_locations
from ast import Load, Store, Del, Name, Call, Tuple, BinOp, MatMult, BitAnd, BitOr, BitXor, LShift
from ast import RShift, Starred, Lambda, AnnAssign, Set, Str, Subscript, Index, IfExp
from ast import Num, Yield, YieldFrom, FunctionDef, Attribute, Constant, Assign, Expr
from ast import Return, Raise, If, UnaryOp, Not, ClassDef, Nonlocal, Global, Compare, Is, Try
from ast import Break, Continue, AsyncFunctionDef, Pass, While, List
from scenic.core.distributions import Samplable, RejectionException, needsSampling, toDistribution
from scenic.core.lazy_eval import needsLazyEvaluation
from scenic.core.object_types import Constructible
import scenic.core.errors as errors
from scenic.core.errors import (TokenParseError, PythonParseError, ASTParseError,
InvalidScenarioError)
import scenic.core.dynamics as dynamics
import scenic.core.pruning as pruning
import scenic.syntax.veneer as veneer
### THE TOP LEVEL: compiling a Scenic program
def scenarioFromString(string, params={}, model=None, scenario=None,
[docs] filename='<string>', cacheImports=False):
"""Compile a string of Scenic code into a `Scenario`.
The optional **filename** is used for error messages.
Other arguments are as in `scenarioFromFile`.
"""
stream = io.BytesIO(string.encode())
return scenarioFromStream(stream, params=params, model=model, scenario=scenario,
filename=filename, cacheImports=cacheImports)
def scenarioFromFile(path, params={}, model=None, scenario=None, cacheImports=False):
[docs] """Compile a Scenic file into a `Scenario`.
Args:
path (str): Path to a Scenic file.
params (dict): Global parameters to override, as a dictionary mapping
parameter names to their desired values.
model (str): Scenic module to use as :term:`world model`.
scenario (str): If there are multiple :term:`modular scenarios` in the
file, which one to compile; if not specified, a scenario called 'Main'
is used if it exists.
cacheImports (bool): Whether to cache any imported Scenic modules.
The default behavior is to not do this, so that subsequent attempts
to import such modules will cause them to be recompiled. If it is
safe to cache Scenic modules across multiple compilations, set this
argument to True. Then importing a Scenic module will have the same
behavior as importing a Python module. See `purgeModulesUnsafeToCache`
for a more detailed discussion of the internals behind this.
Returns:
A `Scenario` object representing the Scenic scenario.
"""
if not os.path.exists(path):
raise FileNotFoundError(path)
fullpath = os.path.realpath(path)
head, extension = os.path.splitext(fullpath)
if not extension or extension not in scenicExtensions:
ok = ', '.join(scenicExtensions)
err = f'Scenic scenario does not have valid extension ({ok})'
raise RuntimeError(err)
directory, name = os.path.split(head)
with open(path, 'rb') as stream:
return scenarioFromStream(stream, params=params, model=model, scenario=scenario,
filename=fullpath, path=path, cacheImports=cacheImports)
def scenarioFromStream(stream, params={}, model=None, scenario=None,
[docs] filename='<stream>', path=None, cacheImports=False):
"""Compile a stream of Scenic code into a `Scenario`."""
# Compile the code as if it were a top-level module
namespace = compileTopLevelStream(stream, path, params, model, filename, cacheImports)
# Construct a Scenario from the resulting namespace
return constructScenarioFrom(namespace, scenario)
def compileTopLevelStream(stream, path, params, model, filename,
cacheImports=False, dumpScenic3=False):
oldModules = list(sys.modules.keys())
try:
with topLevelNamespace(path) as namespace:
compileStream(stream, namespace, params=params, model=model,
filename=filename, dumpScenic3=dumpScenic3)
finally:
if not cacheImports:
purgeModulesUnsafeToCache(oldModules)
return namespace
@contextmanager
[docs]def topLevelNamespace(path=None):
"""Creates an environment like that of a Python script being run directly.
Specifically, __name__ is '__main__', __file__ is the path used to invoke
the script (not necessarily its absolute path), and the parent directory is
added to the path so that 'import blobbo' will import blobbo from that
directory if it exists there.
"""
directory = os.getcwd() if path is None else os.path.dirname(path)
namespace = { '__name__': '__main__' }
if path is not None:
namespace['__file__'] = path
sys.path.insert(0, directory)
try:
yield namespace
finally:
# Remove directory from sys.path, being a little careful in case the
# Scenic program modified it (unlikely but possible).
try:
sys.path.remove(directory)
except ValueError:
pass
def purgeModulesUnsafeToCache(oldModules):
[docs] """Uncache loaded modules which should not be kept after compilation.
Keeping Scenic modules in `sys.modules` after compilation will cause
subsequent attempts at compiling the same module to reuse the compiled
scenario: this is usually not what is desired, since compilation can depend
on external state (in particular overridden global parameters, used e.g. to
specify the map for driving domain scenarios).
Args:
oldModules: List of names of modules loaded before compilation. These
will be skipped.
"""
toRemove = []
# copy sys.modules in case it mutates during iteration (actually happens!)
for name, module in sys.modules.copy().items():
if isinstance(module, ScenicModule) and name not in oldModules:
toRemove.append(name)
for name in toRemove:
parent, _, child = name.rpartition('.')
parent = sys.modules.get(parent)
if parent:
# Remove reference to purged module from parent module. This is necessary
# so that future imports of the purged module will properly refer to the
# newly-loaded version of it. See below for a long disquisition on this.
del parent.__dict__[child]
# Here are details on why the above line is necessary and the sorry history
# of my attempts to fix this type of bug (hopefully this note will prevent
# further self-sabotage). Suppose we have a Python package 'package'
# with a Scenic submodule 'submodule'. A Scenic program with the line
# from package import submodule
# will import 2 packages, namely package and package.submodule, when first
# compiled. We will then purge package.submodule from sys.modules, but not
# package, since it is an ordinary module. So if the program is compiled a
# second time, the line above will NOT import package.submodule, but simply
# access the attribute 'submodule' of the existing package 'package'. So the
# reference to the old version of package.submodule will leak out.
# (An alternative approach, which I used to use, would be to purge all
# modules containing even indirect references to Scenic modules, but this
# opens a can of worms: the implementation of
# import parent.child
# does not set the 'child' attribute of 'parent' if 'parent.child' is already
# in sys.modules, violating an invariant that Python expects [see
# https://docs.python.org/3/reference/import.html#submodules] and leading to
# confusing errors. So if parent is purged because it has some child which is
# a Scenic module, *all* of its children must then be purged. Since the
# scenic module itself can contain indirect references to Scenic modules (the
# world models), this means we have to purge the entire scenic package. But
# then whoever did 'import scenic' at the top level will be left with a
# reference to the old version of the Scenic module.)
del sys.modules[name]
def compileStream(stream, namespace, params={}, model=None, filename='<stream>',
[docs] dumpScenic3=False):
"""Compile a stream of Scenic code and execute it in a namespace.
The compilation procedure consists of the following main steps:
1. Tokenize the input using the Python tokenizer.
2. Partition the tokens into blocks separated by import statements.
This is done by the `partitionByImports` function.
3. Translate Scenic constructions into valid Python syntax.
This is done by the `TokenTranslator`.
4. Parse the resulting Python code into an AST using the Python parser.
5. Modify the AST to achieve the desired semantics for Scenic.
This is done by the `translateParseTree` function.
6. Compile and execute the modified AST.
7. After executing all blocks, extract the global state (e.g. objects).
This is done by the `storeScenarioStateIn` function.
"""
if errors.verbosityLevel >= 2:
veneer.verbosePrint(f' Compiling Scenic module from {filename}...')
startTime = time.time()
# Tokenize input stream
try:
tokens = list(tokenize.tokenize(stream.readline))
except tokenize.TokenError as e:
line = e.args[1][0] if isinstance(e.args[1], tuple) else e.args[1]
raise TokenParseError(line, filename, 'file ended during multiline string or expression')
# Partition into blocks with all imports at the end (since imports could
# pull in new constructor (Scenic class) definitions, which change the way
# subsequent tokens are transformed)
blocks = partitionByImports(tokens)
veneer.activate(params, model, filename, namespace)
newSourceBlocks = []
try:
# Execute preamble
exec(compile(preamble, '<veneer>', 'exec'), namespace)
namespace[namespaceReference] = namespace
# Execute each block
for blockNum, block in enumerate(blocks):
# Find all custom constructors defined so far (possibly imported)
constructors = findConstructorsIn(namespace)
# Translate tokens to valid Python syntax
startLine = max(1, block[0][2][0])
translator = TokenTranslator(constructors, filename)
newSource, allConstructors = translator.translate(block, dumpScenic3=dumpScenic3)
trimmed = newSource[2*(startLine-1):] # fix up blank lines used to align errors
newSource = '\n'*(startLine-1) + trimmed
newSourceBlocks.append(trimmed)
if dumpTranslatedPython:
print(f'### Begin translated Python from block {blockNum} of {filename}')
print(newSource)
print('### End translated Python')
# Parse the translated source
tree = parseTranslatedSource(newSource, filename)
# Modify the parse tree to produce the correct semantics
newTree, requirements = translateParseTree(tree, allConstructors, filename)
if dumpFinalAST:
print(f'### Begin final AST from block {blockNum} of {filename}')
print(ast.dump(newTree, include_attributes=True))
print('### End final AST')
if dumpASTPython:
try:
import astor
except ModuleNotFoundError as e:
raise RuntimeError('dumping the Python equivalent of the AST'
'requires the astor package')
print(f'### Begin Python equivalent of final AST from block {blockNum} of {filename}')
print(astor.to_source(newTree, add_line_information=True))
print('### End Python equivalent of final AST')
# Compile the modified tree
code = compileTranslatedTree(newTree, filename)
# Execute it
executeCodeIn(code, namespace)
# Extract scenario state from veneer and store it
storeScenarioStateIn(namespace, requirements)
finally:
veneer.deactivate()
if errors.verbosityLevel >= 2:
totalTime = time.time() - startTime
veneer.verbosePrint(f' Compiled Scenic module in {totalTime:.4g} seconds.')
allNewSource = ''.join(newSourceBlocks)
return code, allNewSource
### TRANSLATION PHASE ZERO: definitions of language elements not already in Python
## Options
dumpTranslatedPython = False
dumpFinalAST = False
dumpASTPython = False
usePruning = True
## Preamble
# (included at the beginning of every module to be translated;
# imports the implementations of the public language features)
preamble = """\
from scenic.syntax.veneer import *
"""
## Get Python names of various elements
## (for checking consistency between the translator and the veneer)
api = set(veneer.__all__)
## Functions used internally
createDefault = 'PropertyDefault'
scenarioClass = 'DynamicScenario'
behaviorClass = 'Behavior'
monitorClass = 'Monitor'
createTerminationAction = 'makeTerminationAction'
internalFunctions = {
createDefault, scenarioClass,
behaviorClass, monitorClass, createTerminationAction,
}
# sanity check: these functions actually exist
for imp in internalFunctions:
assert imp in api, imp
## Built-in functions
builtinFunctions = { 'resample', 'verbosePrint', 'simulation', 'localPath' }
# sanity check: implementations of built-in functions actually exist
for imp in builtinFunctions:
assert imp in api, imp
## Built-in names (values which cannot be overwritten)
globalParametersName = 'globalParameters'
builtinNames = { globalParametersName }
# sanity check: built-in names actually exist
for name in builtinNames:
assert name in api, name
## Specially-assigned names (accessing one of which calls a function)
trackedNames = { 'ego', 'workspace' }
# sanity check: getter/setter for each name exists
for name in trackedNames:
assert name in api, name
## Simple statements
paramStatement = 'param'
mutateStatement = 'mutate'
requireStatement = 'require'
softRequirement = 'require_soft' # not actually a statement, but a marker for the parser
requireAlwaysStatement = ('require', 'always')
requireEventuallyStatement = ('require', 'eventually')
terminateWhenStatement = ('terminate', 'when')
terminateSimulationWhenStatement = ('terminate', 'simulation', 'when')
terminateAfterStatement = ('terminate', 'after')
recordStatement = 'record'
recordInitialStatement = ('record', 'initial')
recordFinalStatement = ('record', 'final')
actionStatement = 'take' # statement invoking a primitive action
waitStatement = 'wait' # statement invoking a no-op action
terminateStatement = 'terminate' # statement ending the simulation
abortStatement = 'abort' # statement ending a try-interrupt statement
invokeStatement = 'do' # statement invoking a behavior or scenario
invocationSchedules = ('choose', 'shuffle')
invokeVariants = { (invokeStatement, sched) for sched in invocationSchedules }
overrideStatement = 'override' # statement overriding an object in a sub-scenario
modelStatement = 'model'
namespaceReference = '_Scenic_module_namespace' # used in the implementation of 'model'
simulatorStatement = 'simulator'
oneWordStatements = { # TODO clean up
paramStatement, mutateStatement, requireStatement,
actionStatement, waitStatement, terminateStatement,
abortStatement, invokeStatement, simulatorStatement,
recordStatement,
}
twoWordStatements = {
requireAlwaysStatement, requireEventuallyStatement,
terminateWhenStatement, terminateAfterStatement,
recordInitialStatement, recordFinalStatement,
*invokeVariants
}
threeWordStatements = { terminateSimulationWhenStatement }
threeWordIncipits = { tokens[:2]: tokens[2] for tokens in threeWordStatements } # TODO improve
for incipit, last in threeWordIncipits.items():
for tok2 in twoWordStatements:
assert tok2 != incipit, (tok2, last)
# statements implemented by functions
functionStatements = {
requireStatement, paramStatement, mutateStatement,
modelStatement, simulatorStatement, overrideStatement,
recordStatement,
}
twoWordFunctionStatements = {
requireAlwaysStatement, requireEventuallyStatement,
terminateWhenStatement, terminateAfterStatement,
recordInitialStatement, recordFinalStatement,
}
threeWordFunctionStatements = { terminateSimulationWhenStatement }
def functionForStatement(tokens):
return '_'.join(tokens) if isinstance(tokens, tuple) else tokens
def nameForStatement(tokens):
return ' '.join(tokens) if isinstance(tokens, tuple) else tokens
statementForImp = {
functionForStatement(s): nameForStatement(s)
for s in functionStatements | twoWordStatements
}
# sanity check: implementations actually exist
for imp in functionStatements:
assert imp in api, imp
for tokens in twoWordFunctionStatements:
assert len(tokens) == 2
imp = functionForStatement(tokens)
assert imp in api, imp
for tokens in threeWordFunctionStatements:
assert len(tokens) == 3
imp = functionForStatement(tokens)
assert imp in api, imp
# statements allowed inside behaviors
behavioralStatements = {
requireStatement, softRequirement, actionStatement, waitStatement,
terminateStatement, abortStatement, invokeStatement, *invokeVariants,
}
behavioralImps = { functionForStatement(s) for s in behavioralStatements }
# statements allowed inside scenario composition blocks
compositionalStatements = {
requireStatement, softRequirement, waitStatement, terminateStatement, abortStatement,
invokeStatement, *invokeVariants, overrideStatement,
}
compositionalImps = { functionForStatement(s) for s in compositionalStatements }
recordStatements = (
recordStatement, functionForStatement(recordInitialStatement),
functionForStatement(recordFinalStatement),
)
# statements encoding requirements, etc. which need special handling
# to wrap their argument in a closure
requirementStatements = recordStatements + (
requireStatement, softRequirement,
functionForStatement(requireAlwaysStatement),
functionForStatement(requireEventuallyStatement),
functionForStatement(terminateWhenStatement),
functionForStatement(terminateSimulationWhenStatement),
)
statementRaiseMarker = '_Scenic_statement_'
## Try-interrupt blocks
interruptWhenStatement = ('interrupt', 'when')
interruptExceptMarker = '_Scenic_interrupt_'
## Constructors and specifiers
# statement defining a new constructor (Scenic class);
# we still recognize 'constructor' for backwards-compatibility # TODO drop this keyword?
constructorStatements = ('class', 'constructor')
Constructor = namedtuple('Constructor', ('name', 'bases'))
builtinSpecifiers = {
# position
('visible', 'from'): 'VisibleFrom',
('not', 'visible'): 'NotVisibleSpec',
('offset', 'by'): 'OffsetBy',
('offset', 'along'): 'OffsetAlongSpec',
('at',): 'At',
('in',): 'In',
('on',): 'In',
('beyond',): 'Beyond',
('visible',): 'VisibleSpec',
('left', 'of'): 'LeftSpec',
('right', 'of'): 'RightSpec',
('ahead', 'of'): 'Ahead',
('behind',): 'Behind',
('following',): 'Following',
# heading
('apparently', 'facing'): 'ApparentlyFacing',
('facing', 'toward'): 'FacingToward',
('facing',): 'Facing'
}
# sanity check: implementations of specifiers actually exist
for imp in builtinSpecifiers.values():
assert imp in api, imp
builtinConstructors = {
'Point': Constructor('Point', None),
'OrientedPoint': Constructor('OrientedPoint', 'Point'),
'Object': Constructor('Object', 'OrientedPoint')
}
# sanity check: built-in constructors actually exist
for const in builtinConstructors:
assert const in api, const
## Other compound statements
behaviorStatement = 'behavior' # statement defining a new behavior
monitorStatement = 'monitor' # statement defining a new monitor
scenarioMarker = '_Scenic_scenario_' # not a statement, but a marker for the parser
# Scenario blocks
setupBlock = 'setup'
composeBlock = 'compose'
scenarioBlocks = { setupBlock, composeBlock, 'precondition', 'invariant' }
## Prefix operators
prefixOperators = {
('relative', 'position'): 'RelativePosition',
('relative', 'heading'): 'RelativeHeading',
('apparent', 'heading'): 'ApparentHeading',
('distance', 'from'): 'DistanceFrom',
('distance', 'to'): 'DistanceFrom',
('distance', 'past'): 'DistancePast',
('angle', 'from'): 'AngleFrom',
('angle', 'to'): 'AngleTo',
('front', 'left'): 'FrontLeft',
('front', 'right'): 'FrontRight',
('back', 'left'): 'BackLeft',
('back', 'right'): 'BackRight',
('front', 'of'): 'Front',
('back', 'of'): 'Back',
('left', 'of'): 'Left',
('right', 'of'): 'Right',
('follow',): 'Follow',
('visible',): 'Visible',
('not', 'visible'): 'NotVisible',
}
assert all(1 <= len(op) <= 2 for op in prefixOperators)
prefixIncipits = { op[0] for op in prefixOperators }
assert not any(op in oneWordStatements for op in prefixIncipits)
assert not any(op in twoWordStatements for op in prefixOperators)
# sanity check: implementations of prefix operators actually exist
for imp in prefixOperators.values():
assert imp in api, imp
## Modifiers and terminators
class ModifierInfo(typing.NamedTuple):
[docs] name: str
terminators: typing.Tuple[str]
contexts: typing.Optional[typing.Tuple[str]] = ()
modifiers = (
ModifierInfo('for', ('seconds', 'steps'), (invokeStatement,)),
ModifierInfo('until', (), (invokeStatement,)),
)
modifierNames = {}
for mod in modifiers:
assert mod.name not in modifierNames, mod
modifierNames[mod.name] = mod
terminatorsForStatements = {
functionForStatement(terminateAfterStatement): ('seconds', 'steps'),
}
## Infix operators
# pseudo-operator for encoding argument packages for (3+)-ary ops
packageToken = (RIGHTSHIFT, '>>')
packageNode = RShift
class InfixOp(typing.NamedTuple):
[docs] syntax: str
implementation: typing.Optional[str]
arity: int
token: typing.Tuple[int, str]
node: ast.AST
contexts: typing.Optional[typing.Tuple[str]] = ()
infixOperators = (
# existing Python operators with new semantics
InfixOp('@', 'Vector', 2, None, MatMult),
# operators not in Python (in decreasing precedence order)
InfixOp('at', 'FieldAt', 2, (LEFTSHIFT, '<<'), LShift),
InfixOp('relative to', 'RelativeTo', 2, (AMPER, '&'), BitAnd),
InfixOp('offset by', 'RelativeTo', 2, (AMPER, '&'), BitAnd),
InfixOp('offset along', 'OffsetAlong', 3, (CIRCUMFLEX, '^'), BitXor),
InfixOp('can see', 'CanSee', 2, (VBAR, '|'), BitOr),
# just syntactic conveniences, not really operators
InfixOp('from', None, 2, (COMMA, ','), None),
InfixOp('for', None, 2, (COMMA, ','), None, ('Follow', 'Following')),
InfixOp('to', None, 2, (COMMA, ','), None),
InfixOp('as', None, 2, (COMMA, ','), None, requirementStatements),
InfixOp('of', None, 2, (COMMA, ','), None, ('DistancePast')),
InfixOp('by', None, 2, packageToken, None)
)
infixTokens = {}
infixImplementations = {}
infixIncipits = set()
for op in infixOperators:
# if necessary, set up map from Scenic to Python syntax
if op.token is not None:
tokens = tuple(op.syntax.split(' '))
assert 1 <= len(tokens) <= 2, op
assert tokens not in infixTokens, op
assert tokens not in twoWordStatements, op
infixTokens[tokens] = op
incipit = tokens[0]
assert incipit not in oneWordStatements, op
infixIncipits.add(incipit)
# if necessary, set up map from Python to Scenic semantics
imp = op.implementation
if imp is not None:
assert imp in api, op
node = op.node
if node in infixImplementations: # two operators may have the same implementation
oldArity, oldName = infixImplementations[node]
assert op.arity == oldArity, (op, oldName)
assert imp == oldName, (op, oldName)
else:
infixImplementations[node] = (op.arity, imp)
# infix operators which do not need to occur inside function calls
generalInfixOps = {
tokens: op.token for tokens, op in infixTokens.items()
if (op.implementation or op.token == packageToken) and not op.contexts
}
## Direct syntax replacements
replacements = { # TODO police the usage of these? could yield bizarre error messages
'of': tuple(),
'deg': ((STAR, '*'), (NUMBER, '0.01745329252')),
'globalParameters': ((NAME, 'globalParameters'), (LPAR, '('), (RPAR, ')')),
}
twoWordReplacements = {
('initial', 'scenario'): ((NAME, 'in_initial_scenario'), (LPAR, '('), (RPAR, ')')),
}
## Illegal and reserved syntax
illegalTokens = {
LEFTSHIFT, RIGHTSHIFT, VBAR, AMPER, TILDE, CIRCUMFLEX,
LEFTSHIFTEQUAL, RIGHTSHIFTEQUAL, VBAREQUAL, AMPEREQUAL, CIRCUMFLEXEQUAL,
DOUBLESLASH, DOUBLESLASHEQUAL
}
# sanity check: stand-in tokens for infix operators must be illegal
for op in infixTokens.values():
ttype = op.token[0]
assert (ttype is COMMA or ttype in illegalTokens), op
illegalConstructs = {
('async', 'def'), # used to parse behaviors, so disallowed otherwise
}
keywords = (
set(constructorStatements)
| internalFunctions
| replacements.keys()
)
### TRANSLATION PHASE ONE: handling imports
## Loader for Scenic files, producing ScenicModules
class ScenicLoader(importlib.abc.InspectLoader):
def __init__(self, name, filepath):
self.filepath = filepath
def create_module(self, spec):
return ScenicModule(spec.name)
def exec_module(self, module):
# Read source file and compile it
with open(self.filepath, 'r') as stream:
source = stream.read()
with open(self.filepath, 'rb') as stream:
code, pythonSource = compileStream(stream, module.__dict__, filename=self.filepath)
# Save code, source, and translated source for later inspection
module._code = code
module._source = source
module._pythonSource = pythonSource
# If we're in the process of compiling another Scenic module, inherit
# objects, parameters, etc. from this one
if veneer.isActive():
veneer.currentScenario._inherit(module._scenario)
def is_package(self, fullname):
return False
def get_code(self, fullname):
module = importlib.import_module(fullname)
assert isinstance(module, ScenicModule), module
return module._code
def get_source(self, fullname):
module = importlib.import_module(fullname)
assert isinstance(module, ScenicModule), module
return module._pythonSource
# Hack to give instances of ScenicModule a falsy __module__ to prevent
# Sphinx from getting confused. (Autodoc doesn't expect modules to have
# that attribute, and we can't del it.) We only do this during Sphinx
# runs since it breaks pickling of the modules.
oldname = __name__
if getattr(veneer, '_buildingSphinx', False):
__name__ = None
class ScenicModule(types.ModuleType):
def __getstate__(self):
state = self.__dict__.copy()
del state['__builtins__']
return (self.__name__, state)
def __setstate__(self, state):
name, state = state
self.__init__(name) # needed to create __dict__
self.__dict__.update(state)
self.__builtins__ = builtins.__dict__
__name__ = oldname
## Finder for Scenic (and Python) files
scenicExtensions = ('.scenic', '.sc')
import importlib.machinery as machinery
loaders = [
(machinery.ExtensionFileLoader, machinery.EXTENSION_SUFFIXES),
(machinery.SourceFileLoader, machinery.SOURCE_SUFFIXES),
(machinery.SourcelessFileLoader, machinery.BYTECODE_SUFFIXES),
(ScenicLoader, scenicExtensions),
]
class ScenicFileFinder(importlib.abc.PathEntryFinder):
def __init__(self, path):
self._inner = machinery.FileFinder(path, *loaders)
def find_spec(self, fullname, target=None):
return self._inner.find_spec(fullname, target=target)
def invalidate_caches(self):
self._inner.invalidate_caches()
# Support pkgutil.iter_modules() (used by Sphinx autosummary's recursive mode);
# we need to use a subclass of FileFinder since pkgutil's implementation for
# vanilla FileFinder uses inspect.getmodulename, which doesn't recognize the
# .scenic file extension.
def iter_modules(self, prefix):
# This is mostly copied from pkgutil._iter_file_finder_modules
yielded = {}
try:
filenames = os.listdir(self._inner.path)
except OSError:
return
filenames.sort()
for fn in filenames:
modname = inspect.getmodulename(fn)
if not modname:
# Check for Scenic modules
base = os.path.basename(fn)
for ext in scenicExtensions:
if base.endswith(ext):
modname = base[:-len(ext)]
break
if modname == '__init__' or modname in yielded:
continue
path = os.path.join(self._inner.path, fn)
ispkg = False
if not modname and os.path.isdir(path) and '.' not in fn:
modname = fn
try:
dircontents = os.listdir(path)
except OSError:
# ignore unreadable directories like import does
dircontents = []
for fn in dircontents:
subname = inspect.getmodulename(fn)
if subname == '__init__':
ispkg = True
break
else:
continue # not a package
if modname and '.' not in modname:
yielded[modname] = 1
yield prefix + modname, ispkg
# Install path hook using our finder
def scenic_path_hook(path):
if not path:
path = os.getcwd()
if not os.path.isdir(path):
raise ImportError('only directories are supported', path=path)
return ScenicFileFinder(path)
sys.path_hooks.insert(0, scenic_path_hook)
sys.path_importer_cache.clear()
## Miscellaneous utilities
def partitionByImports(tokens):
[docs] """Partition the tokens into blocks ending with import statements.
We avoid splitting top-level try-except statements, to allow the pattern of trying
to import an optional module and catching an ImportError. If someone tries to define
objects inside such a statement, woe unto them.
"""
blocks = []
currentBlock = []
duringImport = False
seenTry = False
haveImported = False
finishLine = False
parenLevel = 0
tokens = Peekable(tokens)
for token in tokens:
startNewBlock = False
if token.exact_type == LPAR:
parenLevel += 1
elif token.exact_type == RPAR:
parenLevel -= 1
if finishLine:
if token.type in (NEWLINE, NL) and parenLevel == 0:
finishLine = False
if duringImport:
duringImport = False
haveImported = True
else:
assert not duringImport
finishLine = True
if token.type in (DEDENT, NEWLINE, NL, COMMENT, ENCODING):
finishLine = False
if (seenTry and token.type == DEDENT and token.start[1] == 0
and peek(tokens).string not in ('except', 'else', 'finally')):
seenTry = False
haveImported = True # just in case the try contained imports
elif token.start[1] == 0:
if token.string in ('import', 'from', modelStatement):
duringImport = True
else:
if haveImported:
# could use new constructors; needs to be in a new block
startNewBlock = True
if token.string == 'try':
seenTry = True
if startNewBlock:
blocks.append(currentBlock)
currentBlock = [token]
haveImported = False
else:
currentBlock.append(token)
blocks.append(currentBlock) # add last block
return blocks
def findConstructorsIn(namespace):
[docs] """Find all constructors (Scenic classes) defined in a namespace."""
constructors = []
for name, value in namespace.items():
if inspect.isclass(value) and issubclass(value, Constructible):
if name in builtinConstructors:
continue
parents = []
for base in value.__bases__:
if issubclass(base, Constructible):
parents.append(base.__name__)
constructors.append(Constructor(name, parents))
return constructors
### TRANSLATION PHASE TWO: translation at the level of tokens
class Peekable:
[docs] """Utility class to allow iterator lookahead."""
def __init__(self, gen):
self._gen = iter(gen)
self._cache = deque()
def __iter__(self):
return self
def __next__(self):
self._lookahead(1)
cur = self._cache.popleft()
if cur is None:
raise StopIteration
return cur
def _lookahead(self, n):
needed = n - len(self._cache)
while needed > 0:
self._cache.append(next(self._gen, None))
needed -= 1
def peek(self, n=1):
self._lookahead(n)
return self._cache[n-1]
def peek(thing, n=1):
return thing.peek(n)
class TokenTranslator:
[docs] """Translates a Scenic token stream into valid Python syntax.
This is a stateful process because constructor (Scenic class) definitions
change the way subsequent code is parsed.
"""
def __init__(self, constructors=(), filename='<unknown>'):
self.constructors = dict(builtinConstructors)
for constructor in constructors:
name = constructor.name
assert name not in self.constructors
self.constructors[name] = constructor
self.filename = filename
def parseError(self, tokenOrLine, message):
raise TokenParseError(tokenOrLine, self.filename, message)
def isConstructorContext(self, name):
return name in self.constructors or name == overrideStatement
def createConstructor(self, name, parents):
parents = tuple(parents)
assert parents
self.constructors[name] = Constructor(name, parents)
def specifiersForConstructor(self, const):
# Currently all specifiers can be used with any constructor;
# I'm leaving this here in case we later allow custom specifiers to be inherited
return builtinSpecifiers
#name, parents = self.constructors[const]
def translate(self, tokens, dumpScenic3=False):
[docs] """Do the actual translation of the token stream."""
tokens = Peekable(tokens)
newTokens = []
functionStack = []
context, startLevel = None, None
specifiersIndented = False
parenLevel = 0
row, col = 0, 0 # position of next token to write out
orow, ocol = 0, 0 # end of last token in the original source
startOfLine = True
startOfStatement = True # position where a (simple) statement could begin
constructors = self.constructors
for token in tokens:
ttype = token.exact_type
tstring = token.string
skip = False
endToken = token # token to advance past in column count
movedUpTo = False
def injectToken(tok, spaceAfter=0):
"""Add a token to the output stream, trying to preserve spacing."""
nonlocal row, col, movedUpTo
if not movedUpTo:
moveUpTo(token)
moveBeyond(token)
movedUpTo = True
ty, string = tok[:2]
if len(tok) >= 3:
moveBeyond(tok)
srow, scol = tok[2]
erow, ecol = tok[3]
width = ecol - scol
height = erow - srow
else:
width = len(string)
height = 0
ncol = ecol if height > 0 else col + width
newToken = (ty, string, (row, col), (row+height, ncol), '')
newTokens.append(newToken)
if ty in (NEWLINE, NL):
row += 1
col = 0
elif height > 0:
row += height
col = ncol
else:
col += width + spaceAfter
def moveUpTo(tok):
nonlocal row, col, orow, ocol
nrow, ncol = tok[2]
if nrow > orow:
row = nrow
col = ncol
else:
gap = ncol - ocol
assert gap >= 0, (tok, row, col, ocol)
col += gap
def moveBeyond(tok):
nonlocal orow, ocol
nrow, ncol = tok[3]
if nrow > orow or (nrow == orow and ncol > ocol):
orow = nrow
ocol = ncol
def advance(skip=True):
nextToken = next(tokens)
if skip:
moveBeyond(nextToken)
else:
injectToken(nextToken)
return nextToken
def callFunction(function, argument=None, implementation=None):
nonlocal skip, matched, functionStack
functionStack.append((function, parenLevel))
implementation = function if implementation is None else implementation
injectToken((NAME, implementation))
injectToken((LPAR, '('))
if argument is not None:
injectToken(argument)
injectToken((COMMA, ','))
skip = True
matched = True
def popFunction():
nonlocal context, startLevel
functionStack.pop()
injectToken((RPAR, ')'))
context, startLevel = (None, 0) if len(functionStack) == 0 else functionStack[-1]
def wrapStatementCall():
injectToken((NAME, 'raise'), spaceAfter=1)
injectToken((NAME, statementRaiseMarker), spaceAfter=1)
injectToken((NAME, 'from'), spaceAfter=1)
# Catch Python operators that can't be used in Scenic
if ttype in illegalTokens:
self.parseError(token, f'illegal operator "{tstring}"')
# Determine which operators are allowed in current context
allowedPrefixOps = prefixOperators
allowedInfixOps = dict()
allowedModifiers = dict()
allowedTerminators = set()
inConstructorContext = False
context, startLevel = functionStack[-1] if functionStack else (None, None)
if parenLevel == startLevel:
if self.isConstructorContext(context):
inConstructorContext = True
allowedPrefixOps = self.specifiersForConstructor(context)
else:
for opTokens, op in infixTokens.items():
if not op.contexts or context in op.contexts:
allowedInfixOps[opTokens] = op.token
for name, mod in modifierNames.items():
if not mod.contexts or context in mod.contexts:
allowedModifiers[name] = mod.name
if context in modifierNames:
allowedTerminators = modifierNames[context].terminators
elif context in terminatorsForStatements:
allowedTerminators = terminatorsForStatements[context]
else:
allowedInfixOps = generalInfixOps
# Parse next token
if ttype == LPAR or ttype == LSQB: # keep track of nesting level
parenLevel += 1
elif ttype == RPAR or ttype == RSQB: # ditto
parenLevel -= 1
elif dumpScenic3 and ttype == NEWLINE:
dumpScenic3.recordLogicalLine(token)
elif dumpScenic3 and ttype in (INDENT, DEDENT):
dumpScenic3.recordIndentation(token)
elif ttype == STRING:
# special case for global parameters with quoted names:
# transform "name"=value into "name", value
if (len(functionStack) > 0 and functionStack[-1][0] == paramStatement
and peek(tokens).string == '='):
next(tokens) # consume '='
injectToken(token)
injectToken((COMMA, ','))
skip = True
elif ttype == NAME: # the interesting case: almost all new syntax falls in here
# try to match 2-word language constructs
matched = False
nextToken = peek(tokens) # lookahead so we can give 2-word ops precedence
if nextToken is not None:
endToken = nextToken # tentatively; will be overridden if no match
nextString = nextToken.string
twoWords = (tstring, nextString)
if startOfLine and tstring == 'for': # TODO improve hack?
matched = True
endToken = token
elif startOfLine and tstring in constructorStatements: # class definition
if nextToken.type != NAME or nextString in keywords:
self.parseError(nextToken, f'invalid class name "{nextString}"')
nextToken = next(tokens) # consume name
bases, scenicParents = [], []
if peek(tokens).exact_type == LPAR: # superclass specification
next(tokens)
nextToken = next(tokens)
while nextToken.exact_type != RPAR:
base = nextToken.string
if nextToken.exact_type != NAME:
self.parseError(nextToken, f'invalid superclass "{base}"')
bases.append(base)
if base in self.constructors:
scenicParents.append(base)
if peek(tokens).exact_type == COMMA:
next(tokens)
nextToken = next(tokens)
if not scenicParents and tstring != 'class':
self.parseError(nextToken,
f'Scenic class definition with no Scenic superclasses')
if peek(tokens).exact_type != COLON:
self.parseError(peek(tokens), 'malformed class definition')
if not bases:
bases = scenicParents = ('Object',) # default superclass
if scenicParents:
self.createConstructor(nextString, scenicParents)
injectToken((NAME, 'class'), spaceAfter=1)
injectToken((NAME, nextString))
injectToken((LPAR, '('))
injectToken((NAME, bases[0]))
for base in bases[1:]:
injectToken((COMMA, ','), spaceAfter=1)
injectToken((NAME, base))
injectToken((RPAR, ')'))
skip = True
matched = True
endToken = nextToken
elif startOfLine and tstring == 'scenario': # scenario definition
if nextToken.type != NAME:
self.parseError(nextToken, f'invalid scenario name "{nextString}"')
className = scenarioMarker + nextString
injectToken((NAME, 'async'), spaceAfter=1)
injectToken((NAME, 'def'), spaceAfter=1)
injectToken((NAME, className))
advance() # consume name
skip = True
matched = True
elif startOfLine and tstring == behaviorStatement: # behavior definition
if nextToken.type != NAME:
self.parseError(nextToken, f'invalid behavior name "{nextString}"')
injectToken((NAME, 'async'), spaceAfter=1)
injectToken((NAME, 'def'), spaceAfter=1)
skip = True
matched = True
endToken = token
elif startOfLine and tstring == monitorStatement: # monitor definition
if nextToken.type != NAME:
self.parseError(nextToken, f'invalid monitor name "{nextString}"')
injectToken((NAME, 'async'), spaceAfter=1)
injectToken((NAME, 'def'), spaceAfter=1)
injectToken((NAME, dynamics.functionForMonitor(nextString)))
injectToken((LPAR, '('))
injectToken((RPAR, ')'))
advance() # consume name
if peek(tokens).exact_type != COLON:
self.parseError(nextToken, 'malformed monitor definition')
skip = True
matched = True
if dumpScenic3:
dumpScenic3.recordMonitor(nextString, peek(tokens))
elif twoWords in allowedPrefixOps: # 2-word prefix operator
callFunction(allowedPrefixOps[twoWords])
advance() # consume second word
if dumpScenic3 and inConstructorContext:
dumpScenic3.recordSpecifier(token, nextToken)
elif not startOfStatement and twoWords in allowedInfixOps: # 2-word infix operator
injectToken(allowedInfixOps[twoWords])
advance() # consume second word
skip = True
matched = True
elif startOfLine and twoWords == interruptWhenStatement:
# special case for interrupt when
injectToken((NAME, 'except'), spaceAfter=1)
callFunction(interruptExceptMarker)
advance() # consume second word
matched = True
elif startOfStatement and twoWords in threeWordIncipits: # 3-word statement
endToken = peek(tokens, 2)
thirdWord = endToken.string
expected = threeWordIncipits[twoWords]
if thirdWord != expected: # TODO do proper 3-word lookahead?
self.parseError(endToken,
f'expected "{expected}", got "{thirdWord}"')
wrapStatementCall()
function = functionForStatement(twoWords + (thirdWord,))
callFunction(function)
advance() # consume second word
advance() # consume third word
matched = True
elif startOfStatement and twoWords in twoWordStatements: # 2-word statement
wrapStatementCall()
function = functionForStatement(twoWords)
callFunction(function)
advance() # consume second word
matched = True
elif inConstructorContext and tstring == 'with': # special case for 'with' specifier
callFunction('With', argument=(STRING, f'"{nextString}"'))
advance() # consume property name
if dumpScenic3:
dumpScenic3.recordSpecifier(token, nextToken)
elif startOfStatement and tstring == requireStatement and nextString == '[':
# special case for require[p]
next(tokens) # consume '['
nextToken = next(tokens)
if nextToken.exact_type != NUMBER:
self.parseError(nextToken,
'soft requirement must have constant probability')
prob = nextToken.string
if not 0 <= float(prob) <= 1:
self.parseError(nextToken, 'probability must be between 0 and 1')
nextToken = next(tokens)
if nextToken.exact_type != RSQB:
self.parseError(nextToken, 'malformed soft requirement')
wrapStatementCall()
callFunction(softRequirement, argument=(NUMBER, prob))
endToken = nextToken
elif twoWords in twoWordReplacements: # 2-word direct replacement
for tok in twoWordReplacements[twoWords]:
injectToken(tok, spaceAfter=1)
advance() # consume second word
skip = True
elif twoWords in illegalConstructs:
construct = ' '.join(twoWords)
self.parseError(token,
f'Python construct "{construct}" not allowed in Scenic')
if not matched:
# 2-word constructs don't match; try 1-word
endToken = token
oneWord = (tstring,)
if oneWord in allowedPrefixOps: # 1-word prefix operator
callFunction(allowedPrefixOps[oneWord])
if dumpScenic3 and inConstructorContext:
dumpScenic3.recordSpecifier(token)
elif not startOfStatement and oneWord in allowedInfixOps: # 1-word infix operator
injectToken(allowedInfixOps[oneWord])
skip = True
elif inConstructorContext: # couldn't match any 1- or 2-word specifier
self.parseError(token, f'unknown specifier "{tstring}"')
elif not startOfStatement and tstring in allowedModifiers:
injectToken((COMMA, ','))
callFunction(tstring, argument=(STRING, f'"{tstring}"'),
implementation='Modifier')
elif not startOfStatement and tstring in allowedTerminators:
injectToken((COMMA, ','))
injectToken((STRING, f'"{tstring}"'))
popFunction()
skip = True
elif startOfStatement and tstring in oneWordStatements: # 1-word statement
wrapStatementCall()
callFunction(tstring)
elif token.start[1] == 0 and tstring == modelStatement: # model statement
components = []
while peek(tokens).exact_type not in (COMMENT, NEWLINE):
nextToken = next(tokens)
if nextToken.exact_type != NAME and nextToken.string != '.':
self.parseError(nextToken, 'invalid module name')
components.append(nextToken.string)
if not components:
self.parseError(token, 'model statement is missing module name')
components.append("'")
literal = "'" + ''.join(components)
wrapStatementCall()
callFunction(modelStatement, argument=(NAME, namespaceReference))
injectToken((STRING, literal))
elif startOfStatement and tstring == overrideStatement: # override statement
nextToken = next(tokens)
if nextToken.exact_type != NAME:
self.parseError(nextToken, 'object to override must be an identifier')
wrapStatementCall()
callFunction(tstring, argument=nextToken)
elif startOfLine and tstring in scenarioBlocks: # named block of scenario
if peek(tokens).exact_type != COLON:
self.parseError(peek(tokens), f'malformed "{tstring}" block')
injectToken((NAME, 'async'), spaceAfter=1)
injectToken((NAME, 'def'), spaceAfter=1)
injectToken((NAME, tstring))
injectToken((LPAR, '('))
injectToken((RPAR, ')'))
skip = True
elif (tstring in self.constructors
and peek(tokens).exact_type not in (RPAR, RSQB, RBRACE, COMMA, DOT, COLON)):
# instance definition
callFunction(tstring)
if dumpScenic3:
dumpScenic3.recordInstanceCreation(token)
elif tstring in replacements: # direct replacement
for tok in replacements[tstring]:
injectToken(tok, spaceAfter=1)
skip = True
elif startOfLine and tstring == 'from': # special case to allow 'from X import Y'
pass
elif tstring in keywords: # some malformed usage
self.parseError(token, f'unexpected keyword "{tstring}"')
elif tstring in illegalConstructs:
self.parseError(token,
f'Python construct "{tstring}" not allowed in Scenic')
else:
pass # nothing matched; pass through unchanged to Python
# Detect the end of function argument lists
if len(functionStack) > 0:
context, startLevel = functionStack[-1]
while parenLevel < startLevel: # we've closed all parens for the current function
popFunction()
inConstructor = any(self.isConstructorContext(context) for context, sl in functionStack)
if inConstructor and parenLevel == startLevel and ttype == COMMA: # starting a new specifier
while functionStack and not self.isConstructorContext(context):
popFunction()
# allow the next specifier to be on the next line, if indented
injectToken(token) # emit comma immediately
skip = True
nextToken = peek(tokens)
specOnNewLine = False
while nextToken.exact_type in (NEWLINE, NL, COMMENT, ENDMARKER):
specOnNewLine = True
if nextToken.exact_type == COMMENT:
advance(skip=False) # preserve comment
nextToken = peek(tokens)
if nextToken.exact_type not in (NEWLINE, NL):
self.parseError(nextToken, 'comma with no specifier following')
advance(skip=False) # preserve newline
nextToken = peek(tokens)
if specOnNewLine and not specifiersIndented:
nextToken = next(tokens) # consume indent
if nextToken.exact_type != INDENT:
self.parseError(nextToken,
'expected indented specifier (extra comma on previous line?)')
injectToken(nextToken)
specifiersIndented = True
elif ttype in (NEWLINE, ENDMARKER, COMMENT, SEMI): # end of line or statement
if parenLevel != 0:
self.parseError(token, 'unmatched parens/brackets')
interrupt = False
if functionStack and functionStack[0][0] == interruptExceptMarker:
lastToken = newTokens[-1]
if lastToken[1] != ':':
self.parseError(nextToken, 'expected colon for interrupt')
newTokens.pop() # remove colon for now
interrupt = True
while len(functionStack) > 0:
functionStack.pop()
injectToken((RPAR, ')'))
if interrupt:
injectToken((COLON, ':'))
# Output token unchanged, unless handled above
if not skip:
injectToken(token)
else:
moveBeyond(endToken)
startOfLine = (ttype in (ENCODING, NEWLINE, NL, INDENT, DEDENT))
startOfStatement = (startOfLine or (ttype == SEMI)) and not functionStack
rewrittenSource = tokenize.untokenize(newTokens)
if not isinstance(rewrittenSource, str): # TODO improve?
rewrittenSource = str(rewrittenSource, encoding='utf-8')
return rewrittenSource, self.constructors
### TRANSLATION PHASE THREE: parsing of Python resulting from token translation
def parseTranslatedSource(source, filename):
try:
tree = parse(source, filename=filename)
return tree
except SyntaxError as e:
raise PythonParseError(e) from None
### TRANSLATION PHASE FOUR: modifying the parse tree
temporaryName = '_Scenic_temporary_name'
behaviorArgName = '_Scenic_current_behavior'
checkPreconditionsName = 'checkPreconditions'
checkInvariantsName = 'checkInvariants'
interruptPrefix = '_Scenic_interrupt'
abortFlag = Attribute(Name('BlockConclusion', Load()), 'ABORT', Load())
breakFlag = Attribute(Name('BlockConclusion', Load()), 'BREAK', Load())
continueFlag = Attribute(Name('BlockConclusion', Load()), 'CONTINUE', Load())
returnFlag = Attribute(Name('BlockConclusion', Load()), 'RETURN', Load())
finishedFlag = Attribute(Name('BlockConclusion', Load()), 'FINISHED', Load())
noArgs = ast.arguments(
posonlyargs=[],
args=[], vararg=None,
kwonlyargs=[], kw_defaults=[],
kwarg=None, defaults=[])
selfArg = ast.arguments(
posonlyargs=[],
args=[ast.arg(arg='self', annotation=None)], vararg=None,
kwonlyargs=[], kw_defaults=[],
kwarg=None, defaults=[])
tempArg = ast.arguments(
posonlyargs=[],
args=[ast.arg(arg=temporaryName, annotation=None)], vararg=None,
kwonlyargs=[], kw_defaults=[],
kwarg=None, defaults=[Constant(None)])
initialBehaviorArgs = [
ast.arg(arg=behaviorArgName, annotation=None),
ast.arg(arg='self', annotation=None)
]
onlyBehaviorArgs = ast.arguments(
posonlyargs=[],
args=initialBehaviorArgs, vararg=None,
kwonlyargs=[], kw_defaults=[],
kwarg=None, defaults=[])
class AttributeFinder(NodeVisitor):
[docs] """Utility class for finding all referenced attributes of a given name."""
@staticmethod
def find(target, node):
af = AttributeFinder(target)
af.visit(node)
return af.attributes
def __init__(self, target):
super().__init__()
self.target = target
self.attributes = set()
def visit_Attribute(self, node):
val = node.value
if isinstance(val, Name) and val.id == self.target:
self.attributes.add(node.attr)
self.visit(val)
class LocalFinder(NodeVisitor):
[docs] """Utility class for finding all local variables of a code block."""
@staticmethod
def findIn(block, ignoreTemporaries=True):
lf = LocalFinder()
for statement in block:
lf.visit(statement)
if ignoreTemporaries:
names = set(name for name in lf.names if not name.startswith(temporaryName))
else:
names = lf.names
return names - lf.globals - lf.nonlocals
def __init__(self):
self.names = set()
self.globals = set()
self.nonlocals = set()
def visit_Global(self, node):
self.globals.update(node.names)
def visit_Nonlocal(self, node):
self.nonlocals.update(node.names)
def visit_FunctionDef(self, node):
self.names.add(node.name)
self.visit(node.args)
for decorator in node.decorator_list:
self.visit(decorator)
if node.returns is not None:
self.visit(node.returns)
# do not visit body; it's another block
def visit_Lambda(self, node):
self.visit(node.args)
# do not visit body; it's another block
def visit_ClassDef(self, node):
self.names.add(node.name)
for child in itertools.chain(node.bases, node.keywords, node.decorator_list):
self.visit(child)
# do not visit body; it's another block
def visit_Import(self, node):
for alias in node.names:
bound = alias.asname
if bound is None:
bound = alias.name
self.names.add(bound)
def visit_ImportFrom(self, node):
self.visit_Import(node)
def visit_Name(self, node):
if isinstance(node.ctx, (Store, Del)):
self.names.add(node.id)
def visit_ExceptHandler(self, node):
if node.name is not None:
self.names.add(node.name)
self.generic_visit(node)
class ASTSurgeon(NodeTransformer):
def __init__(self, constructors, filename):
super().__init__()
self.constructors = set(constructors.keys())
self.filename = filename
self.requirements = []
self.inRequire = False
self.inCompose = False
self.inBehavior = False
self.inGuard = False
self.inTryInterrupt = False
self.inInterruptBlock = False
self.inLoop = False
self.usedBreak = False
self.usedContinue = False
self.callDepth = 0
self.behaviorLocals = set()
def parseError(self, node, message):
raise ASTParseError(node, message, self.filename)
def unpack(self, arg, expected, node):
"""Unpack arguments to ternary (and up) infix operators."""
assert expected > 0
if isinstance(arg, BinOp) and isinstance(arg.op, packageNode):
if expected == 1:
self.parseError(node, 'gave too many arguments to infix operator')
else:
return self.unpack(arg.left, expected - 1, node) + [self.visit(arg.right)]
elif expected > 1:
self.parseError(node, 'gave too few arguments to infix operator')
else:
return [self.visit(arg)]
def visit(self, node):
if isinstance(node, ast.AST):
return super().visit(node)
elif isinstance(node, list):
newStatements = []
for statement in node:
newStatement = self.visit(statement)
if isinstance(newStatement, ast.AST):
newStatements.append(newStatement)
else:
newStatements.extend(newStatement)
return newStatements
else:
raise RuntimeError(f'unknown object {node} encountered during AST surgery')
def visit_Name(self, node):
if node.id in builtinNames:
if not isinstance(node.ctx, Load):
self.parseError(node, f'unexpected keyword "{node.id}"')
elif node.id in trackedNames:
assert isinstance(node.ctx, Load)
return copy_location(Call(Name(node.id, Load()), [], []), node)
elif node.id in self.behaviorLocals:
lookup = Attribute(Name(behaviorArgName, Load()), node.id, node.ctx)
return copy_location(lookup, node)
return node
def visit_Assign(self, node):
def assignsTrackable(targets):
for target in targets:
if isinstance(target, Name):
if target.id in trackedNames:
return target.id
elif isinstance(target, (Tuple, List)):
trackable = assignsTrackable(target.elts)
if trackable:
return trackable
return False
trackable = assignsTrackable(node.targets)
if trackable:
if len(node.targets) > 1 or not isinstance(node.targets[0], Name):
self.parseError(node, f'only simple assignments to "{trackable}" are allowed')
call = Call(Name(trackable, Load()), [self.visit(node.value)], [])
return copy_location(Expr(call), node)
return self.generic_visit(node)
def visit_BinOp(self, node):
"""Convert infix operators to calls to the corresponding Scenic internal functions."""
left = node.left
right = node.right
op = node.op
if isinstance(op, packageNode): # unexpected argument package
self.parseError(node, 'unexpected keyword "by"')
elif type(op) in infixImplementations: # an operator with non-Python semantics
arity, impName = infixImplementations[type(op)]
implementation = Name(impName, Load())
copy_location(implementation, node)
assert arity >= 2
args = [self.visit(left)] + self.unpack(right, arity-1, node)
newNode = Call(implementation, args, [])
else: # all other operators have the Python semantics
newNode = BinOp(self.visit(left), op, self.visit(right))
return copy_location(newNode, node)
def visit_Raise(self, node):
"""Handle Scenic statements encoded as raise statements.
In particular:
* wrap require statements with lambdas;
* handle primitive action invocations inside behaviors;
* call the veneer implementations of other statements.
"""
if not isinstance(node.exc, Name) or node.exc.id != statementRaiseMarker:
return self.generic_visit(node) # an ordinary raise statement
assert isinstance(node.cause, Call)
assert isinstance(node.cause.func, Name)
node = node.cause # move to inner call
func = node.func
assert isinstance(func, Name)
if self.inCompose and func.id not in compositionalImps:
statement = statementForImp.get(func.id, func.id)
self.parseError(node, f'"{statement}" cannot be used in a {composeBlock} block')
if self.inBehavior and func.id not in behavioralImps:
statement = statementForImp.get(func.id, func.id)
self.parseError(node, f'"{statement}" cannot be used in a behavior')
if func.id in requirementStatements: # require, terminate when, etc.
recording = func.id in recordStatements
checkedArgs = node.args
if func.id == softRequirement: # extract probability as first arg for soft reqs
func.id = requireStatement
prob = node.args[0]
assert isinstance(prob, (Constant, Num))
if isinstance(prob, Constant):
assert isinstance(prob.value, (float, int))
checkedArgs = node.args[1:]
else:
prob = None
self.validateSimpleCall(node, (1, 2), args=checkedArgs)
value = checkedArgs[0]
if len(checkedArgs) > 1:
name = checkedArgs[1]
if isinstance(name, Name):
name = Constant(name.id)
elif isinstance(name, Str):
pass
elif isinstance(name, Num):
name = Constant(str(name.n))
elif isinstance(name, Constant):
name = Constant(str(name.value))
else:
self.parseError(name, f'malformed name for "{func.id}" statement')
else:
name = Constant(None)
assert not self.inRequire
self.inRequire = True
req = self.visit(value)
self.inRequire = False
reqID = Constant(len(self.requirements)) # save ID number
self.requirements.append(req) # save condition for later inspection when pruning
closure = Lambda(noArgs, req) # enclose requirement in a lambda
lineNum = Constant(node.lineno) # save line number for error messages
copy_location(closure, req)
copy_location(lineNum, req)
newArgs = [reqID, closure, lineNum, name]
if prob:
newArgs.append(prob)
return copy_location(Expr(Call(func, newArgs, [])), node)
elif func.id == simulatorStatement:
self.validateSimpleCall(node, 1)
sim = self.visit(node.args[0])
closure = copy_location(Lambda(noArgs, sim), sim)
return copy_location(Expr(Call(func, [closure], [])), node)
elif (func.id == invokeStatement
or func.id.startswith(invokeStatement + '_')):
# Sub-behavior or sub-scenario statement
if func.id.startswith(invokeStatement + '_'):
stmt = statementForImp[func.id]
schedule = func.id[len(invokeStatement)+1:]
assert '_' not in schedule
keywords = [ast.keyword('schedule', Constant(schedule))]
else:
stmt = func.id
keywords = []
schedule = None
seenModifier = False
invoked = []
args = []
for arg in node.args:
if (isinstance(arg, Call) and isinstance(arg.func, Name)
and arg.func.id == 'Modifier'):
if seenModifier:
self.parseError(arg,
f'incompatible qualifiers for "{stmt}" statement')
seenModifier = True
assert len(arg.args) >= 2
mod = arg.args[0]
assert isinstance(mod, (Constant, Str))
mod = mod.value if isinstance(mod, Constant) else mod.s
if mod == 'until':
arg.args[1] = Lambda(noArgs, arg.args[1])
args.append(self.visit(arg))
elif seenModifier:
self.parseError(arg, f'malformed "{stmt}" statement')
else:
invoked.append(self.visit(arg))
maxInvoked = 1 if self.inBehavior and not self.inCompose and not schedule else None
self.validateSimpleCall(node, (1, maxInvoked), onlyInBehaviors=True, args=invoked)
subHandler = Attribute(Name(behaviorArgName, Load()), '_invokeSubBehavior', Load())
subArgs = [Name('self', Load()), Tuple(invoked, Load())] + args
subRunner = Call(subHandler, subArgs, keywords)
return self.generateInvocation(node, subRunner, invoker=YieldFrom)
elif func.id == actionStatement: # Action statement
assert not self.inCompose
self.validateSimpleCall(node, (1, None), onlyInBehaviors=True)
action = Tuple(self.visit(node.args), Load())
return self.generateInvocation(node, action)
elif func.id == waitStatement: # Wait statement
self.validateSimpleCall(node, 0, onlyInBehaviors=True)
return self.generateInvocation(node, Constant(()))
elif func.id == terminateStatement: # Terminate statement
self.validateSimpleCall(node, 0, onlyInBehaviors=True)
termination = Call(Name(createTerminationAction, Load()),
[Constant(node.lineno)], [])
return self.generateInvocation(node, termination)
elif func.id == abortStatement: # abort statement for try-interrupt statements
if not self.inTryInterrupt:
self.parseError(node, '"abort" outside of try-interrupt statement')
self.validateSimpleCall(node, 0, onlyInBehaviors=True)
return copy_location(Return(abortFlag), node)
else:
# statement implemented directly by a function; leave call intact
newCall = self.visit_Call(node)
return copy_location(Expr(newCall), node)
def validateSimpleCall(self, node, numArgs, onlyInBehaviors=False, args=None):
func = node.func
name = func.id
if onlyInBehaviors and not self.inBehavior:
self.parseError(node, f'"{name}" can only be used in a behavior')
args = node.args if args is None else args
if isinstance(numArgs, tuple):
assert len(numArgs) == 2
low, high = numArgs
if high is not None and len(args) > high:
self.parseError(node, f'"{name}" takes at most {high} argument(s)')
if len(args) < low:
self.parseError(node, f'"{name}" takes at least {low} argument(s)')
elif len(args) != numArgs:
self.parseError(node, f'"{name}" takes exactly {numArgs} argument(s)')
if len(node.keywords) != 0:
self.parseError(node, f'"{name}" takes no keyword arguments')
for arg in args:
if isinstance(arg, Starred):
self.parseError(node, f'argument unpacking cannot be used with "{name}"')
def generateInvocation(self, node, actionlike, invoker=Yield):
"""Generate an invocation of an action, behavior, or scenario."""
invokeAction = Expr(invoker(actionlike))
checker = Attribute(Name(behaviorArgName, Load()), checkInvariantsName, Load())
args = Starred(Attribute(Name(behaviorArgName, Load()), '_args', Load()), Load())
kwargs = ast.keyword(None, Attribute(Name(behaviorArgName, Load()), '_kwargs', Load()))
checkInvariants = Expr(Call(checker, [Name('self', Load()), args], [kwargs]))
return [copy_location(invokeAction, node), copy_location(checkInvariants, node)]
def visit_Try(self, node):
"""Handle try-interrupt blocks."""
interrupts = []
exceptionHandlers = []
for handler in node.handlers:
ty = handler.type
if (isinstance(ty, Call) and isinstance(ty.func, Name)
and ty.func.id == interruptExceptMarker):
assert handler.name is None
if len(ty.args) != 1:
self.parseError(handler, '"interrupt when" takes a single expression')
interrupts.append((ty.args[0], handler.body))
else:
exceptionHandlers.append(handler)
if not interrupts: # an ordinary try-except block
return self.generic_visit(node)
statements = []
oldInTryInterrupt = self.inTryInterrupt
self.inTryInterrupt = True
# Construct body
oldInInterruptBlock, oldInLoop = self.inInterruptBlock, self.inLoop
self.inInterruptBlock = True
self.inLoop = False
self.usedBreak = False
self.usedContinue = False
def makeInterruptBlock(name, body):
newBody = self.visit(body)
allLocals = LocalFinder.findIn(newBody)
if allLocals:
newBody.insert(0, Nonlocal(list(allLocals)))
newBody.append(Return(finishedFlag))
return FunctionDef(name, onlyBehaviorArgs, newBody, [], None)
bodyName = f'{interruptPrefix}_body'
statements.append(makeInterruptBlock(bodyName, node.body))
# Construct interrupt handlers and condition checkers
handlerNames, conditionNames = [], []
for i, (condition, block) in enumerate(interrupts):
handlerName = f'{interruptPrefix}_handler_{i}'
handlerNames.append(handlerName)
conditionName = f'{interruptPrefix}_condition_{i}'
conditionNames.append(conditionName)
statements.append(makeInterruptBlock(handlerName, block))
self.inGuard = True
checker = Lambda(noArgs, self.visit(condition))
self.inGuard = False
defChecker = Assign([Name(conditionName, Store())], checker)
statements.append(defChecker)
self.inInterruptBlock, self.inLoop = oldInInterruptBlock, oldInLoop
self.inTryInterrupt = oldInTryInterrupt
# Prepare tuples of interrupt conditions and handlers
# (in order from high priority to low, so reversed relative to the syntax)
conditions = Tuple([Name(n, Load()) for n in reversed(conditionNames)], Load())
handlers = Tuple([Name(n, Load()) for n in reversed(handlerNames)], Load())
# Construct code to execute the try-interrupt statement
args = [
Name(behaviorArgName, Load()),
Name('self', Load()),
Name(bodyName, Load()),
conditions,
handlers
]
callRuntime = Call(Name('runTryInterrupt', Load()), args, [])
runTI = Assign([Name(temporaryName, Store())], YieldFrom(callRuntime))
statements.append(runTI)
result = Name(temporaryName, Load())
if self.usedBreak:
test = Compare(result, [Is()], [breakFlag])
brk = copy_location(Break(), self.usedBreak)
statements.append(If(test, [brk], []))
if self.usedContinue:
test = Compare(result, [Is()], [continueFlag])
cnt = copy_location(Continue(), self.usedContinue)
statements.append(If(test, [cnt], []))
test = Compare(result, [Is()], [returnFlag])
retCheck = If(test, [Return(Attribute(result, 'return_value', Load()))], [])
statements.append(retCheck)
# Construct overall try-except statement
if exceptionHandlers or node.finalbody:
newTry = Try(statements,
[self.visit(handler) for handler in exceptionHandlers],
self.visit(node.orelse),
self.visit(node.finalbody))
return copy_location(newTry, node)
else:
return statements
def visit_For(self, node):
old = self.inLoop
self.inLoop = True
newNode = self.generic_visit(node)
self.inLoop = old
return newNode
def visit_While(self, node):
old = self.inLoop
self.inLoop = True
newNode = self.generic_visit(node)
self.inLoop = old
return newNode
def visit_FunctionDef(self, node):
oldInLoop, oldInInterruptBlock = self.inLoop, self.inInterruptBlock
self.inLoop, self.inInterruptBlock = False, False
newNode = self.generic_visit(node)
self.inLoop, self.inInterruptBlock = oldInLoop, oldInInterruptBlock
return newNode
def visit_Break(self, node):
if self.inInterruptBlock and not self.inLoop:
if not self.usedBreak:
self.usedBreak = node
newNode = Return(breakFlag)
return copy_location(newNode, node)
else:
return self.generic_visit(node)
def visit_Continue(self, node):
if self.inInterruptBlock and not self.inLoop:
if not self.usedContinue:
self.usedContinue = node
newNode = Return(continueFlag)
return copy_location(newNode, node)
else:
return self.generic_visit(node)
def visit_Return(self, node):
if self.inInterruptBlock:
value = Constant(None) if node.value is None else node.value
ret = Return(Call(returnFlag, [value], []))
return copy_location(ret, node)
else:
return self.generic_visit(node)
def visit_Call(self, node):
"""Handle Scenic syntax and semantics of function calls.
In particular:
* unpack argument packages for operators;
* check for iterable unpacking applied to distributions.
"""
func = node.func
newArgs = []
# Translate arguments, unpacking any argument packages
self.callDepth += 1
wrappedStar = False
for arg in node.args:
if isinstance(arg, BinOp) and isinstance(arg.op, packageNode):
newArgs.extend(self.unpack(arg, 2, node))
elif isinstance(arg, Starred) and not self.inBehavior:
wrappedStar = True
checkedVal = Call(Name('wrapStarredValue', Load()),
[self.visit(arg.value), Constant(arg.value.lineno)],
[])
newArgs.append(Starred(checkedVal, Load()))
else:
newArgs.append(self.visit(arg))
newKeywords = [self.visit(kwarg) for kwarg in node.keywords]
newFunc = self.visit(func)
self.callDepth -= 1
if wrappedStar:
newNode = Call(Name('callWithStarArgs', Load()), [newFunc] + newArgs, newKeywords)
else:
newNode = Call(newFunc, newArgs, newKeywords)
newNode = copy_location(newNode, node)
ast.fix_missing_locations(newNode)
return newNode
def visit_AsyncFunctionDef(self, node):
"""Process Scenic constructs parsed as async function definitions.
These include:
* scenario definitions;
* behavior (and monitor) definitions.
"""
if node.name.startswith(scenarioMarker): # scenario definition
return self.transformScenarioDefinition(node)
else:
return self.transformBehaviorDefinition(node)
def transformScenarioDefinition(self, node):
if self.inCompose:
self.parseError(node, f'cannot define a scenario inside a {composeBlock} block')
if self.inBehavior:
self.parseError(node, 'cannot define a scenario inside a behavior')
# Set up arguments for setup and compose blocks
args = node.args
args.args = initialBehaviorArgs + args.args
args = self.visit(args)
# Extract named blocks from scenario body, if any
simple = False # simple scenario with no blocks
setup, compose = None, None
preconditions, invariants = [], []
for statement in node.body:
if isinstance(statement, AsyncFunctionDef):
if statement.name == setupBlock:
if setup:
self.parseError(statement,
f'scenario contains multiple "{setupBlock}" blocks')
setup = statement
elif statement.name == composeBlock:
if compose:
self.parseError(statement,
f'scenario contains multiple "{composeBlock}" blocks')
compose = statement
elif statement.name in ('precondition', 'invariant'):
if len(statement.body) != 1 or not isinstance(statement.body[0], Expr):
self.parseError(statement.body[0], f'malformed precondition/invariant')
self.inGuard = True
test = self.visit(statement.body[0].value)
self.inGuard = False
assert isinstance(test, ast.AST)
if statement.name == 'precondition':
preconditions.append(test)
else:
invariants.append(test)
else:
simple = True
else:
# scenario contains actual code; assume it is simple
simple = True
if simple: # simple scenario: entire body is implicitly the setup block
if setup:
self.parseError(setup, f'simple scenario cannot have a "{setupBlock}" block')
if compose:
self.parseError(compose, f'simple scenario cannot have a "{composeBlock}" block')
if preconditions or invariants:
self.parseError(node, f'simple scenario cannot have preconditions/invariants')
# Find all locals of the scenario, which will be shared amongst the various blocks
allLocals = set()
if compose:
allLocals.update(LocalFinder.findIn(compose.body))
if setup:
allLocals.update(LocalFinder.findIn(setup.body))
oldBL = self.behaviorLocals
self.behaviorLocals = allLocals
# Construct compose block
self.inCompose = self.inBehavior = True
guardCheckers = self.makeGuardCheckers(args, preconditions, invariants)
if compose or preconditions or invariants:
if compose:
body = self.visit(compose.body)
else:
# generate no-op compose block to ensure invariants are checked
wait = self.generateInvocation(node, Constant(()))
body = [While(Constant(True), wait, [])]
compose = node # for copy_location below
newDef = FunctionDef('_compose', args, body, [], None)
compose = copy_location(newDef, compose)
else:
compose = Assign([Name('_compose', Store())], Constant(None))
self.inCompose = self.inBehavior = False
# Construct setup block
if setup or simple:
if setup:
oldBody = setup.body
oldLoc = setup
else:
oldBody = node.body
oldLoc = node
newBody = self.visit(oldBody)
newDef = FunctionDef('_setup', args, newBody, [], None)
setup = copy_location(newDef, oldLoc)
else:
setup = Assign([Name('_setup', Store())], Constant(None))
self.behaviorLocals = oldBL
# Assemble scenario definition
name = node.name[len(scenarioMarker):]
saveLocals = Assign([Name('_locals', Store())], Constant(frozenset(allLocals)))
body = guardCheckers + [saveLocals, setup, compose]
newDef = ClassDef(name, [Name(scenarioClass, Load())], [], body, [])
return copy_location(newDef, node)
def transformBehaviorDefinition(self, node):
if self.inCompose:
self.parseError(node, f'cannot define a behavior inside a {composeBlock} block')
if self.inBehavior:
self.parseError(node, 'cannot define a behavior inside a behavior')
# copy arguments to the behavior object's namespace
args = node.args
copyArgs = []
allArgs = itertools.chain(args.args, args.kwonlyargs)
if sys.version_info >= (3, 8):
allArgs = itertools.chain(args.posonlyargs, allArgs)
for arg in allArgs:
dest = Attribute(Name(behaviorArgName, Load()), arg.arg, Store())
copyArgs.append(copy_location(Assign([dest], Name(arg.arg, Load())), arg))
# add private current behavior argument and implicit 'self' argument
newArgs = self.visit(node.args)
newArgs.args = initialBehaviorArgs + newArgs.args
# process body
self.inBehavior = True
oldBL = self.behaviorLocals
self.behaviorLocals = allLocals = LocalFinder.findIn(node.body)
oldInLoop = self.inLoop
self.inLoop = False
preconditions = []
invariants = []
newStatements = []
# preserve docstring, if any
if (isinstance(node.body[0], Expr) and isinstance(node.body[0].value, Constant)
and isinstance(node.body[0].value.value, str)):
docstring = [node.body[0]]
oldStatements = node.body[1:]
else:
docstring = []
oldStatements = node.body
# find precondition and invariant definitions
for statement in oldStatements:
if isinstance(statement, AsyncFunctionDef):
group = None
if statement.name == 'precondition':
group = preconditions
elif statement.name == 'invariant':
group = invariants
else:
self.parseError(statement, 'unknown type of behavior attribute')
if len(statement.body) != 1 or not isinstance(statement.body[0], Expr):
self.parseError(statement.body, f'malformed behavior {statement.name}')
self.inGuard = True
test = self.visit(statement.body[0].value)
self.inGuard = False
assert isinstance(test, ast.AST)
group.append(test)
else:
newStatement = self.visit(statement)
if isinstance(newStatement, ast.AST):
newStatements.append(newStatement)
else:
newStatements.extend(newStatement)
guardCheckers = self.makeGuardCheckers(newArgs, preconditions, invariants)
newBody = copyArgs + newStatements
self.inBehavior = False
self.behaviorLocals = oldBL
self.inLoop = oldInLoop
# convert to class definition
saveLocals = Assign([Name('_locals', Store())], Constant(frozenset(allLocals)))
decorators = [self.visit(decorator) for decorator in node.decorator_list]
genDefn = FunctionDef('makeGenerator', newArgs, newBody, decorators, node.returns)
classBody = docstring + guardCheckers + [saveLocals, genDefn]
name = node.name
if dynamics.isAMonitorName(name):
superclass = monitorClass
name = dynamics.monitorName(name)
else:
superclass = behaviorClass
newDefn = ClassDef(name, [Name(superclass, Load())], [], classBody, [])
return copy_location(newDefn, node)
def makeGuardCheckers(self, args, preconditions, invariants):
# generate precondition checker
precondChecks = []
for precondition in preconditions:
call = Call(Name('PreconditionViolation', Load()),
[Name(behaviorArgName, Load()), Constant(precondition.lineno)],
[])
throw = Raise(exc=call, cause=None)
check = If(test=UnaryOp(Not(), precondition), body=[throw], orelse=[])
precondChecks.append(copy_location(check, precondition))
definePChecker = FunctionDef(checkPreconditionsName, args,
precondChecks + [ast.Pass()], [], None)
# generate invariant checker
invChecks = []
for invariant in invariants:
call = Call(Name('InvariantViolation', Load()),
[Name(behaviorArgName, Load()), Constant(invariant.lineno)],
[])
throw = Raise(exc=call, cause=None)
check = If(test=UnaryOp(Not(), invariant), body=[throw], orelse=[])
invChecks.append(copy_location(check, invariant))
defineIChecker = FunctionDef(checkInvariantsName, args,
invChecks + [ast.Pass()], [], None)
# assemble function body preamble
preamble = [
definePChecker,
defineIChecker,
]
return preamble
def visit_Yield(self, node):
if self.inCompose:
self.parseError(node, f'"yield" is not allowed inside a {composeBlock} block')
if self.inBehavior:
self.parseError(node, '"yield" is not allowed inside a behavior')
return self.generic_visit(node)
def visit_YieldFrom(self, node):
if self.inCompose:
self.parseError(node, f'"yield from" is not allowed inside a {composeBlock} block')
if self.inBehavior:
self.parseError(node, '"yield from" is not allowed inside a behavior')
return self.generic_visit(node)
def visit_ClassDef(self, node):
"""Handle Scenic constructs parsed as class definitions.
In particular:
* transform Scenic class definitions into ordinary Python ones;
* leave ordinary Python class definitions alone.
"""
if node.name in self.constructors: # Scenic class definition
return self.transformScenicClass(node)
else: # ordinary Python class
for base in node.bases:
name = None
if isinstance(base, Call):
name = base.func.id
elif isinstance(base, Name):
name = base.id
if name is not None and name in self.constructors:
self.parseError(node,
f'Python class {node.name} derives from Scenic class {name}')
return self.generic_visit(node)
def transformScenicClass(self, node):
"""Process property defaults for Scenic classes."""
newBody = []
for child in node.body:
child = self.visit(child)
if isinstance(child, AnnAssign): # default value for property
origValue = child.annotation
target = child.target
# extract any attributes for this property
metaAttrs = []
if isinstance(target, Subscript):
sl = target.slice
if isinstance(sl, Index): # needed for compatibility with Python 3.8 and earlier
sl = sl.value
if isinstance(sl, Name):
metaAttrs.append(sl.id)
elif isinstance(sl, Tuple):
for elt in sl.elts:
if not isinstance(elt, Name):
self.parseError(elt,
'malformed attributes for property default')
metaAttrs.append(elt.id)
else:
self.parseError(sl, 'malformed attributes for property default')
newTarget = Name(target.value.id, Store())
copy_location(newTarget, target)
target = newTarget
# find dependencies of the default value
properties = AttributeFinder.find('self', origValue)
# create default value object
args = [
Set([Str(prop) for prop in properties]),
Set([Str(attr) for attr in metaAttrs]),
Lambda(selfArg, origValue)
]
value = Call(Name(createDefault, Load()), args, [])
copy_location(value, origValue)
newChild = AnnAssign(
target=target, annotation=value,
value=None, simple=True)
child = copy_location(newChild, child)
newBody.append(child)
node.body = newBody
return node
def translateParseTree(tree, constructors, filename):
[docs] """Modify the Python AST to produce the desired Scenic semantics."""
surgeon = ASTSurgeon(constructors, filename)
tree = fix_missing_locations(surgeon.visit(tree))
return tree, surgeon.requirements
### TRANSLATION PHASE FIVE: AST compilation
def compileTranslatedTree(tree, filename):
try:
return compile(tree, filename, 'exec')
except SyntaxError as e:
raise PythonParseError(e) from None
### TRANSLATION PHASE SIX: Python execution
def executeCodeIn(code, namespace):
[docs] """Execute the final translated Python code in the given namespace."""
try:
exec(code, namespace)
except RejectionException as e:
# Determined statically that the scenario has probability zero.
errors.optionallyDebugRejection(e)
if errors.showInternalBacktrace:
raise InvalidScenarioError(e.args[0]) from e
else:
raise InvalidScenarioError(e.args[0]).with_traceback(e.__traceback__) from None
### TRANSLATION PHASE SEVEN: scenario construction
def storeScenarioStateIn(namespace, requirementSyntax):
[docs] """Post-process an executed Scenic module, extracting state from the veneer."""
# Save requirement syntax and other module-level information
moduleScenario = veneer.currentScenario
factory = veneer.simulatorFactory
bns = gatherBehaviorNamespacesFrom(moduleScenario._behaviors)
def handle(scenario):
scenario._requirementSyntax = requirementSyntax
if isinstance(scenario, type):
scenario._simulatorFactory = staticmethod(factory)
else:
scenario._simulatorFactory = factory
scenario._behaviorNamespaces = bns
handle(moduleScenario)
namespace['_scenarios'] = tuple(veneer.scenarios)
for scenarioClass in veneer.scenarios:
handle(scenarioClass)
# Extract requirements, scan for relations used for pruning, and create closures
# (only for top-level scenario; modular scenarios will be handled when instantiated)
moduleScenario._compileRequirements()
# Save global parameters
for name, value in veneer._globalParameters.items():
if needsLazyEvaluation(value):
raise InvalidScenarioError(f'parameter {name} uses value {value}'
' undefined outside of object definition')
for scenario in veneer.scenarios:
scenario._bindGlobals(veneer._globalParameters)
moduleScenario._bindGlobals(veneer._globalParameters)
namespace['_scenario'] = moduleScenario
def gatherBehaviorNamespacesFrom(behaviors):
[docs] """Gather any global namespaces which could be referred to by behaviors.
We'll need to rebind any sampled values in them at runtime.
"""
behaviorNamespaces = {}
def registerNamespace(modName, ns):
oldNS = behaviorNamespaces.get(modName)
if oldNS:
# Already registered; just do a consistency check to avoid bizarre
# bugs from having multiple versions of the same module around.
if oldNS is not ns:
raise RuntimeError(
f'scenario refers to multiple versions of module {modName}; '
'perhaps you imported it before you started compilation?')
return
behaviorNamespaces[modName] = ns
for name, value in ns.items():
if isinstance(value, ScenicModule):
registerNamespace(value.__name__, value.__dict__)
else:
# Convert values requiring sampling to Distributions
dval = toDistribution(value)
if dval is not value:
ns[name] = dval
for behavior in behaviors:
modName = behavior.__module__
globalNamespace = behavior.makeGenerator.__globals__
registerNamespace(modName, globalNamespace)
return behaviorNamespaces
def constructScenarioFrom(namespace, scenarioName=None):
[docs] """Build a Scenario object from an executed Scenic module."""
modularScenarios = namespace['_scenarios']
def isModularScenario(thing):
return isinstance(thing, type) and issubclass(thing, dynamics.DynamicScenario)
if not scenarioName and isModularScenario(namespace.get('Main', None)):
scenarioName = 'Main'
if scenarioName:
ty = namespace.get(scenarioName, None)
if not isModularScenario(ty):
raise RuntimeError(f'no scenario "{scenarioName}" found')
if ty._requiresArguments():
raise RuntimeError(f'cannot instantiate scenario "{scenarioName}"'
' with no arguments') from None
dynScenario = ty()
elif len(modularScenarios) > 1:
raise RuntimeError('multiple choices for scenario to run '
'(specify using the --scenario option)')
elif modularScenarios and not modularScenarios[0]._requiresArguments():
dynScenario = modularScenarios[0]()
else:
dynScenario = namespace['_scenario']
if not dynScenario._prepared: # true for all except top-level scenarios
# Execute setup block (if any) to create objects and requirements;
# extract any requirements and scan for relations used for pruning
dynScenario._prepare(delayPreconditionCheck=True)
scenario = dynScenario._toScenario(namespace)
# Prune infeasible parts of the space
if usePruning:
pruning.prune(scenario, verbosity=errors.verbosityLevel)
return scenario