"""Translator turning Scenic programs into Scenario objects.

The top-level interface to Scenic is provided by two functions:

* `scenarioFromString` -- compile a string of Scenic code;
* `scenarioFromFile` -- compile a Scenic file.

These output a `Scenario` object, from which scenes can be generated.
See the documentation for `Scenario` for details.

When imported, this module hooks the Python import system so that Scenic
modules can be imported using the ``import`` statement. This is primarily for the
translator's own use, but you could import Scenic modules from Python to
inspect them. Because Scenic uses Python's import system, the latter's rules
for finding modules apply, including the handling of packages.

Scenic is compiled in two main steps: translating the code into Python, and
executing the resulting Python module to generate a Scenario object encoding
the objects, distributions, etc. in the scenario. For details, see the function
`compileStream` below.
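
A minimal usage sketch (``myScenario.sc`` is a hypothetical file name; see
`Scenario.generate` for the exact form of its return value)::

    import scenic.syntax.translator as translator
    scenario = translator.scenarioFromFile('myScenario.sc')
    scene, iterations = scenario.generate()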
"""

import sys
import os
import io
import builtins
import traceback
import time
import inspect
import types
import importlib
import importlib.abc
import importlib.util
import itertools
from collections import namedtuple
from contextlib import contextmanager

import tokenize
from tokenize import NAME, NL, NEWLINE, ENDMARKER, OP, NUMBER, COLON, COMMENT, ENCODING
from tokenize import LPAR, RPAR, LSQB, RSQB, COMMA, DOUBLESLASH, DOUBLESLASHEQUAL
from tokenize import AT, LEFTSHIFT, RIGHTSHIFT, VBAR, AMPER, TILDE, CIRCUMFLEX, STAR
from tokenize import LEFTSHIFTEQUAL, RIGHTSHIFTEQUAL, VBAREQUAL, AMPEREQUAL, CIRCUMFLEXEQUAL
from tokenize import INDENT, DEDENT, STRING

import ast
from ast import parse, dump, NodeVisitor, NodeTransformer, copy_location, fix_missing_locations
from ast import Load, Store, Name, Call, Tuple, BinOp, MatMult, BitAnd, BitOr, BitXor, LShift
from ast import RShift, Starred, Lambda, AnnAssign, Set, Str, Num, Subscript, Index

from scenic.core.distributions import Samplable, needsSampling
from scenic.core.lazy_eval import needsLazyEvaluation
from scenic.core.workspaces import Workspace
from scenic.core.scenarios import Scenario
from scenic.core.object_types import Constructible
from scenic.core.utils import ParseError, RuntimeParseError, InvalidScenarioError
import scenic.core.pruning as pruning
import scenic.syntax.veneer as veneer
import scenic.syntax.relations as relations

### THE TOP LEVEL: compiling a Scenic program

def scenarioFromString(string, filename='<string>', cacheImports=False):
    """Compile a string of Scenic code into a `Scenario`.

    The optional **filename** is used for error messages."""
    stream = io.BytesIO(string.encode())
    return scenarioFromStream(stream, filename=filename, cacheImports=cacheImports)

def scenarioFromFile(path, cacheImports=False):
    """Compile a Scenic file into a `Scenario`.

    Args:
        path (str): path to a Scenic file
        cacheImports (bool): Whether to cache any imported Scenic modules.
            The default behavior is to not do this, so that subsequent attempts
            to import such modules will cause them to be recompiled. If it is
            safe to cache Scenic modules across multiple compilations, set this
            argument to True. Then importing a Scenic module will have the same
            behavior as importing a Python module.

    Returns:
        A `Scenario` object representing the Scenic scenario.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    fullpath = os.path.realpath(path)
    head, extension = os.path.splitext(fullpath)
    if not extension or extension[1:] not in scenicExtensions:
        ok = ', '.join(scenicExtensions)
        err = f'Scenic scenario does not have valid extension ({ok})'
        raise RuntimeError(err)
    directory, name = os.path.split(head)

    with open(path, 'rb') as stream:
        return scenarioFromStream(stream, filename=fullpath, path=path,
                                  cacheImports=cacheImports)

def scenarioFromStream(stream, filename='<stream>', path=None, cacheImports=False):
    """Compile a stream of Scenic code into a `Scenario`."""
    # Compile the code as if it were a top-level module
    oldModules = list(sys.modules.keys())
    try:
        with topLevelNamespace(path) as namespace:
            compileStream(stream, namespace, filename=filename)
    finally:
        if not cacheImports:
            toRemove = []
            for name, module in sys.modules.items():
                if name not in oldModules and getattr(module, '_isScenicModule', False):
                    toRemove.append(name)
            for name in toRemove:
                del sys.modules[name]
    # Construct a Scenario from the resulting namespace
    return constructScenarioFrom(namespace)

@contextmanager
def topLevelNamespace(path=None):
    """Creates an environment like that of a Python script being run directly.

    Specifically, __name__ is '__main__', __file__ is the path used to invoke
    the script (not necessarily its absolute path), and the parent directory is
    added to the path so that 'import blobbo' will import blobbo from that
    directory if it exists there.
    """
    directory = os.getcwd() if path is None else os.path.dirname(path)
    namespace = {'__name__': '__main__'}
    if path is not None:
        namespace['__file__'] = path
    sys.path.insert(0, directory)
    try:
        yield namespace
    finally:
        del sys.path[0]

def compileStream(stream, namespace, filename='<stream>'):
    """Compile a stream of Scenic code and execute it in a namespace.

    The compilation procedure consists of the following main steps:

    1. Tokenize the input using the Python tokenizer.
    2. Partition the tokens into blocks separated by import statements.
       This is done by the `partitionByImports` function.
    3. Translate Scenic constructions into valid Python syntax.
       This is done by the `TokenTranslator`.
    4. Parse the resulting Python code into an AST using the Python parser.
    5. Modify the AST to achieve the desired semantics for Scenic.
       This is done by the `translateParseTree` function.
    6. Compile and execute the modified AST.
    7. After executing all blocks, extract the global state (e.g. objects).
       This is done by the `storeScenarioStateIn` function.
    """
    if verbosity >= 2:
        veneer.verbosePrint(f'  Compiling Scenic module from {filename}...')
        startTime = time.time()
    # Tokenize input stream
    try:
        tokens = list(tokenize.tokenize(stream.readline))
    except tokenize.TokenError as e:
        line = e.args[1][0] if isinstance(e.args[1], tuple) else e.args[1]
        raise TokenParseError(line, 'file ended during multiline string or expression')
    # Partition into blocks with all imports at the end (since imports could
    # pull in new constructor (Scenic class) definitions, which change the way
    # subsequent tokens are transformed)
    blocks = partitionByImports(tokens)
    veneer.activate()
    newSourceBlocks = []
    try:
        # Execute preamble
        exec(compile(preamble, '<veneer>', 'exec'), namespace)
        # Execute each block
        for blockNum, block in enumerate(blocks):
            # Find all custom constructors defined so far (possibly imported)
            constructors = findConstructorsIn(namespace)
            # Translate tokens to valid Python syntax
            startLine = max(1, block[0][2][0])
            translator = TokenTranslator(constructors)
            newSource, allConstructors = translator.translate(block)
            trimmed = newSource[2*(startLine-1):]    # remove blank lines used to align errors
            newSourceBlocks.append(trimmed)
            if dumpTranslatedPython:
                print(f'### Begin translated Python from block {blockNum} of {filename}')
                print(newSource)
                print('### End translated Python')
            # Parse the translated source
            tree = parseTranslatedSource(newSource, filename)
            # Modify the parse tree to produce the correct semantics
            newTree, requirements = translateParseTree(tree, allConstructors)
            if dumpFinalAST:
                print(f'### Begin final AST from block {blockNum} of {filename}')
                print(ast.dump(newTree, include_attributes=True))
                print('### End final AST')
            # Compile the modified tree
            code = compileTranslatedTree(newTree, filename)
            # Execute it
            executeCodeIn(code, namespace, filename)
        # Extract scenario state from veneer and store it
        storeScenarioStateIn(namespace, requirements, filename)
    finally:
        veneer.deactivate()
    if verbosity >= 2:
        totalTime = time.time() - startTime
        veneer.verbosePrint(f'  Compiled Scenic module in {totalTime:.4g} seconds.')
    allNewSource = ''.join(newSourceBlocks)
    return code, allNewSource

### TRANSLATION PHASE ZERO: definitions of language elements not already in Python

## Options

showInternalBacktrace = False
dumpTranslatedPython = False
dumpFinalAST = False
verbosity = 0
usePruning = True

## Preamble
# (included at the beginning of every module to be translated;
# imports the implementations of the public language features)
preamble = """\
from scenic.syntax.veneer import *
"""

## Get Python names of various elements
## (for checking consistency between the translator and the veneer)

api = set(veneer.__all__)

## Functions used internally

rangeConstructor = 'Range'
createDefault = 'PropertyDefault'
internalFunctions = {rangeConstructor, createDefault}

# sanity check: these functions actually exist
for imp in internalFunctions:
    assert imp in api, imp

## Statements implemented by functions

requireStatement = 'require'
paramStatement = 'param'
functionStatements = {requireStatement, paramStatement, 'mutate'}

# sanity check: implementations of statements actually exist
for imp in functionStatements:
    assert imp in api, imp

## Built-in functions

builtinFunctions = {'resample', 'verbosePrint'}

# sanity check: implementations of built-in functions actually exist
for imp in builtinFunctions:
    assert imp in api, imp

## Constructors and specifiers

# statement defining a new constructor (Scenic class);
# we still recognize 'constructor' for backwards-compatibility
constructorStatements = ('class', 'constructor')

Constructor = namedtuple('Constructor', ('name', 'parent', 'specifiers'))

pointSpecifiers = {
    ('visible', 'from'): 'VisibleFrom',
    ('offset', 'by'): 'OffsetBy',
    ('offset', 'along'): 'OffsetAlongSpec',
    ('at',): 'At',
    ('in',): 'In',
    ('on',): 'In',
    ('beyond',): 'Beyond',
    ('visible',): 'VisibleSpec',
    ('left', 'of'): 'LeftSpec',
    ('right', 'of'): 'RightSpec',
    ('ahead', 'of'): 'Ahead',
    ('behind',): 'Behind',
    ('following',): 'Following',
}
orientedPointSpecifiers = {
    ('apparently', 'facing'): 'ApparentlyFacing',
    ('facing', 'toward'): 'FacingToward',
    ('facing',): 'Facing'
}
objectSpecifiers = {
}

# sanity check: implementations of specifiers actually exist
for imp in pointSpecifiers.values():
    assert imp in api, imp
for imp in orientedPointSpecifiers.values():
    assert imp in api, imp
for imp in objectSpecifiers.values():
    assert imp in api, imp

builtinConstructors = {
    'Point': Constructor('Point', None, pointSpecifiers),
    'OrientedPoint': Constructor('OrientedPoint', 'Point', orientedPointSpecifiers),
    'Object': Constructor('Object', 'OrientedPoint', objectSpecifiers)
}
functionStatements.update(builtinConstructors)

# sanity check: built-in constructors actually exist
for const in builtinConstructors:
    assert const in api, const

## Prefix operators

prefixOperators = {
    ('relative', 'position'): 'RelativePosition',
    ('relative', 'heading'): 'RelativeHeading',
    ('apparent', 'heading'): 'ApparentHeading',
    ('distance', 'from'): 'DistanceFrom',
    ('distance', 'to'): 'DistanceFrom',
    ('angle', 'from'): 'AngleFrom',
    ('angle', 'to'): 'AngleTo',
    ('ego', '='): 'ego',
    ('front', 'left'): 'FrontLeft',
    ('front', 'right'): 'FrontRight',
    ('back', 'left'): 'BackLeft',
    ('back', 'right'): 'BackRight',
    ('front',): 'Front',
    ('back',): 'Back',
    ('left',): 'Left',
    ('right',): 'Right',
    ('follow',): 'Follow',
    ('visible',): 'Visible'
}
assert all(1 <= len(op) <= 2 for op in prefixOperators)
prefixIncipits = {op[0] for op in prefixOperators}
assert not any(op in functionStatements for op in prefixIncipits)

# sanity check: implementations of prefix operators actually exist
for imp in prefixOperators.values():
    assert imp in api, imp

## Infix operators

# pseudo-operator for encoding argument packages for (3+)-ary ops
packageToken = (RIGHTSHIFT, '>>')
packageNode = RShift

InfixOp = namedtuple('InfixOp', ('syntax', 'implementation', 'arity', 'token', 'node'))
infixOperators = (
    # existing Python operators with new semantics
    InfixOp('@', 'Vector', 2, None, MatMult),
    # operators not in Python (in decreasing precedence order)
    InfixOp('at', 'FieldAt', 2, (LEFTSHIFT, '<<'), LShift),
    InfixOp('relative to', 'RelativeTo', 2, (AMPER, '&'), BitAnd),
    InfixOp('offset by', 'RelativeTo', 2, (AMPER, '&'), BitAnd),
    InfixOp('offset along', 'OffsetAlong', 3, (CIRCUMFLEX, '^'), BitXor),
    InfixOp('can see', 'CanSee', 2, (VBAR, '|'), BitOr),
    # just syntactic conveniences, not really operators
    InfixOp('from', None, 2, (COMMA, ','), None),
    InfixOp('for', None, 2, (COMMA, ','), None),
    InfixOp('to', None, 2, (COMMA, ','), None),
    InfixOp('by', None, 2, packageToken, None)
)

infixTokens = {}
infixImplementations = {}
infixIncipits = set()
for op in infixOperators:
    # if necessary, set up map from Scenic to Python syntax
    if op.token is not None:
        tokens = tuple(op.syntax.split(' '))
        assert 1 <= len(tokens) <= 2, op
        assert tokens not in infixTokens, op
        infixTokens[tokens] = op.token
        incipit = tokens[0]
        assert incipit not in functionStatements, op
        infixIncipits.add(incipit)
    # if necessary, set up map from Python to Scenic semantics
    imp = op.implementation
    if imp is not None:
        assert imp in api, op
        node = op.node
        if node in infixImplementations:    # two operators may have the same implementation
            oldArity, oldName = infixImplementations[node]
            assert op.arity == oldArity, (op, oldName)
            assert imp == oldName, (op, oldName)
        else:
            infixImplementations[node] = (op.arity, imp)

allIncipits = prefixIncipits | infixIncipits

## Direct syntax replacements

replacements = {    # TODO police the usage of these? could yield bizarre error messages
    'of': tuple(),
    'deg': ((STAR, '*'), (NUMBER, '0.01745329252')),
    'ego': ((NAME, 'ego'), (LPAR, '('), (RPAR, ')'))
}

## Illegal and reserved syntax

illegalTokens = {
    LEFTSHIFT, RIGHTSHIFT, VBAR, AMPER, TILDE, CIRCUMFLEX,
    LEFTSHIFTEQUAL, RIGHTSHIFTEQUAL, VBAREQUAL, AMPEREQUAL, CIRCUMFLEXEQUAL,
    DOUBLESLASH, DOUBLESLASHEQUAL
}

# sanity check: stand-in tokens for infix operators must be illegal
for token in infixTokens.values():
    ttype = token[0]
    assert (ttype is COMMA or ttype in illegalTokens), token

keywords = (set(constructorStatements)
            | internalFunctions | functionStatements
            | replacements.keys())

### TRANSLATION PHASE ONE: handling imports

## Meta path finder and loader for Scenic files

scenicExtensions = ('sc', 'scenic')

class ScenicMetaFinder(importlib.abc.MetaPathFinder):
    def find_spec(self, name, paths, target):
        if paths is None:
            paths = sys.path
            modname = name
        else:
            modname = name.rpartition('.')[2]
        for path in paths:
            for extension in scenicExtensions:
                filename = modname + '.' + extension
                filepath = os.path.join(path, filename)
                if os.path.exists(filepath):
                    filepath = os.path.abspath(filepath)
                    spec = importlib.util.spec_from_file_location(
                        name, filepath, loader=ScenicLoader(filepath, filename))
                    return spec
        return None

class ScenicLoader(importlib.abc.InspectLoader):
    def __init__(self, filepath, filename):
        self.filepath = filepath
        self.filename = filename

    def create_module(self, spec):
        return None

    def exec_module(self, module):
        # Read source file and compile it
        with open(self.filepath, 'r') as stream:
            source = stream.read()
        with open(self.filepath, 'rb') as stream:
            code, pythonSource = compileStream(stream, module.__dict__,
                                               filename=self.filepath)
        # Mark as a Scenic module
        module._isScenicModule = True
        # Save code, source, and translated source for later inspection
        module._code = code
        module._source = source
        module._pythonSource = pythonSource

    def is_package(self, fullname):
        return False

    def get_code(self, fullname):
        module = importlib.import_module(fullname)
        assert module._isScenicModule, module
        return module._code

    def get_source(self, fullname):
        module = importlib.import_module(fullname)
        assert module._isScenicModule, module
        return module._pythonSource

# register the meta path finder
sys.meta_path.insert(0, ScenicMetaFinder())

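# A sketch of what the machinery above enables (the module name here is
# hypothetical): once this translator module has been imported, a file
# 'myScenicLib.sc' on sys.path can be imported like any Python module.
#
#     import scenic.syntax.translator    # installs ScenicMetaFinder
#     import myScenicLib                 # compiled from myScenicLib.sc by ScenicLoader
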
## Post-import hook to inherit objects, etc. from imported Scenic modules

def hooked_import(*args, **kwargs):
    """Version of __import__ hooked by Scenic to capture Scenic modules."""
    module = original_import(*args, **kwargs)
    if getattr(module, '_isScenicModule', False):
        if veneer.isActive():
            veneer.allObjects.extend(module._objects)
            veneer.globalParameters.update(module._params)
            veneer.externalParameters.extend(module._externalParams)
            veneer.inheritedReqs.extend(module._requirements)
    return module

original_import = builtins.__import__
builtins.__import__ = hooked_import

## Miscellaneous utilities

def partitionByImports(tokens):
    """Partition the tokens into blocks ending with import statements."""
    blocks = []
    currentBlock = []
    duringImport = False
    haveImported = False
    finishLine = False
    parenLevel = 0
    for token in tokens:
        startNewBlock = False
        if token.exact_type == LPAR:
            parenLevel += 1
        elif token.exact_type == RPAR:
            parenLevel -= 1
        if finishLine:
            if token.type in (NEWLINE, NL) and parenLevel == 0:
                finishLine = False
                if duringImport:
                    duringImport = False
                    haveImported = True
        else:
            assert not duringImport
            finishLine = True
            if token.type == NAME and token.string in ('import', 'from'):
                duringImport = True
            elif token.type in (NEWLINE, NL, COMMENT, ENCODING):
                finishLine = False
            elif haveImported:
                # could use new constructors; needs to be in a new block
                startNewBlock = True
        if startNewBlock:
            blocks.append(currentBlock)
            currentBlock = [token]
            haveImported = False
        else:
            currentBlock.append(token)
    blocks.append(currentBlock)    # add last block
    return blocks

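# A sketch of the resulting block structure (the module name is hypothetical):
# for a Scenic program
#
#     ego = Object
#     import helpers
#     x = 5
#
# the first block contains the first two lines (ending with the import) and a
# second block contains 'x = 5', so that any constructors defined in 'helpers'
# are known when the second block is translated.
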
def findConstructorsIn(namespace):
    """Find all constructors (Scenic classes) defined in a namespace."""
    constructors = []
    for name, value in namespace.items():
        if inspect.isclass(value) and issubclass(value, Constructible):
            if name in builtinConstructors:
                continue
            parent = None
            for base in value.__bases__:
                if issubclass(base, Constructible):
                    assert parent is None
                    parent = base
            constructors.append(Constructor(name, parent.__name__, {}))
    return constructors

### TRANSLATION PHASE TWO: translation at the level of tokens

class TokenParseError(ParseError):
    """Parse error occurring during token translation."""
    def __init__(self, tokenOrLine, message):
        line = tokenOrLine.start[0] if hasattr(tokenOrLine, 'start') else tokenOrLine
        self.lineno = line
        super().__init__('Parse error in line ' + str(line) + ': ' + message)

class Peekable:
    """Utility class to allow iterator lookahead."""
    def __init__(self, gen):
        self.gen = iter(gen)
        self.current = next(self.gen, None)

    def __iter__(self):
        return self

    def __next__(self):
        cur = self.current
        if cur is None:
            raise StopIteration
        self.current = next(self.gen, None)
        return cur

    def peek(self):
        return self.current

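# A small sketch of Peekable's behavior:
#
#     stream = Peekable(iter('ab'))
#     stream.peek()    # 'a' (not consumed)
#     next(stream)     # 'a'
#     stream.peek()    # 'b'
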
def peek(thing):
    return thing.peek()

class TokenTranslator:
    """Translates a Scenic token stream into valid Python syntax.

    This is a stateful process because constructor (Scenic class) definitions
    change the way subsequent code is parsed.
    """
    def __init__(self, constructors=()):
        self.functions = set(functionStatements)
        self.constructors = dict(builtinConstructors)
        for constructor in constructors:
            name = constructor.name
            assert name not in self.constructors
            self.constructors[name] = constructor
            self.functions.add(name)

    def createConstructor(self, name, parent, specs={}):
        if parent is None:
            parent = 'Object'    # default superclass
        self.constructors[name] = Constructor(name, parent, specs)
        self.functions.add(name)
        return parent

    def specifiersForConstructor(self, const):
        name, parent, specs = self.constructors[const]
        if parent is None:
            return specs
        else:
            ps = dict(self.specifiersForConstructor(parent))
            ps.update(specs)
            return ps

    def translate(self, tokens):
        """Do the actual translation of the token stream."""
        tokens = Peekable(tokens)
        newTokens = []
        functionStack = []
        inConstructor = False    # inside a constructor or one of its specifiers
        specifiersIndented = False
        parenLevel = 0
        row, col = 0, 0      # position of next token to write out
        orow, ocol = 0, 0    # end of last token in the original source
        startOfLine = True   # TODO improve hack?
        functions = self.functions
        constructors = self.constructors

        for token in tokens:
            ttype = token.exact_type
            tstring = token.string
            skip = False
            endToken = token    # token to advance past in column count
            movedUpTo = False

            def injectToken(tok, spaceAfter=0):
                """Add a token to the output stream, trying to preserve spacing."""
                nonlocal row, col, movedUpTo
                if not movedUpTo:
                    moveUpTo(token)
                    moveBeyond(token)
                    movedUpTo = True
                ty, string = tok[:2]
                if len(tok) >= 3:
                    moveBeyond(tok)
                    srow, scol = tok[2]
                    erow, ecol = tok[3]
                    width = ecol - scol
                    height = erow - srow
                else:
                    width = len(string)
                    height = 0
                ncol = ecol if height > 0 else col + width
                newToken = (ty, string, (row, col), (row+height, ncol), '')
                newTokens.append(newToken)
                if ty in (NEWLINE, NL):
                    row += 1
                    col = 0
                elif height > 0:
                    row += height
                    col = ncol
                else:
                    col += width + spaceAfter

            def moveUpTo(tok):
                nonlocal row, col, orow, ocol
                nrow, ncol = tok[2]
                if nrow > orow:
                    row = nrow
                    col = ncol
                else:
                    gap = ncol - ocol
                    assert gap >= 0, (tok, row, col, ocol)
                    col += gap

            def moveBeyond(tok):
                nonlocal orow, ocol
                nrow, ncol = tok[3]
                if nrow > orow or (nrow == orow and ncol > ocol):
                    orow = nrow
                    ocol = ncol

            def advance(skip=True):
                nextToken = next(tokens)
                if skip:
                    moveBeyond(nextToken)
                else:
                    injectToken(nextToken)
                return nextToken

            def callFunction(function, argument=None):
                nonlocal skip, matched, functionStack
                functionStack.append((function, parenLevel))
                injectToken((NAME, function))
                injectToken((LPAR, '('))
                if argument is not None:
                    injectToken((NAME, argument))
                    injectToken((COMMA, ','))
                skip = True
                matched = True

            # Catch Python operators that can't be used in Scenic
            if ttype in illegalTokens:
                raise TokenParseError(token, f'illegal operator "{tstring}"')

            # Determine which operators are allowed in current context
            context, startLevel = functionStack[-1] if functionStack else (None, None)
            inConstructorContext = (context in constructors and parenLevel == startLevel)
            if inConstructorContext:
                inConstructor = True
                allowedPrefixOps = self.specifiersForConstructor(context)
                allowedInfixOps = dict()
            else:
                allowedPrefixOps = prefixOperators
                allowedInfixOps = infixTokens

            # Parse next token
            if ttype == LPAR or ttype == LSQB:    # keep track of nesting level
                parenLevel += 1
            elif ttype == RPAR or ttype == RSQB:    # ditto
                parenLevel -= 1
            elif ttype == STRING:
                # special case for global parameters with quoted names:
                # transform "name"=value into "name", value
                if (len(functionStack) > 0 and functionStack[-1][0] == paramStatement
                        and peek(tokens).string == '='):
                    next(tokens)    # consume '='
                    injectToken(token)
                    injectToken((COMMA, ','))
                    skip = True
            elif ttype == NAME:    # the interesting case: almost all new syntax falls in here
                # try to match 2-word language constructs
                matched = False
                nextToken = peek(tokens)    # lookahead so we can give 2-word ops precedence
                if nextToken is not None:
                    endToken = nextToken    # tentatively; will be overridden if no match
                    nextString = nextToken.string
                    twoWords = (tstring, nextString)
                    if startOfLine and tstring == 'for':    # TODO improve hack?
                        matched = True
                        endToken = token
                    elif startOfLine and tstring in constructorStatements:    # class definition
                        if nextToken.type != NAME or nextString in keywords:
                            raise TokenParseError(nextToken,
                                                  f'invalid class name "{nextString}"')
                        nextToken = next(tokens)    # consume name
                        parent = None
                        pythonClass = False
                        if peek(tokens).exact_type == LPAR:    # superclass specification
                            next(tokens)
                            nextToken = next(tokens)
                            parent = nextToken.string
                            if nextToken.exact_type != NAME or parent in keywords:
                                raise TokenParseError(nextToken,
                                                      f'invalid superclass "{parent}"')
                            if parent not in self.constructors:
                                if tstring != 'class':
                                    raise TokenParseError(nextToken,
                                        f'superclass "{parent}" is not a Scenic class')
                                # appears to be a Python class definition
                                pythonClass = True
                            else:
                                nextToken = next(tokens)
                                if nextToken.exact_type != RPAR:
                                    raise TokenParseError(nextToken,
                                                          'malformed class definition')
                        injectToken((NAME, 'class'), spaceAfter=1)
                        injectToken((NAME, nextString))
                        injectToken((LPAR, '('))
                        if pythonClass:    # pass Python class definitions through unchanged
                            while nextToken.exact_type != COLON:
                                injectToken(nextToken)
                                nextToken = next(tokens)
                            injectToken(nextToken)
                        else:
                            if peek(tokens).exact_type != COLON:
                                raise TokenParseError(nextToken, 'malformed class definition')
                            parent = self.createConstructor(nextString, parent)
                            injectToken((NAME, parent))
                            injectToken((RPAR, ')'))
                        skip = True
                        matched = True
                        endToken = nextToken
                    elif twoWords in allowedPrefixOps:    # 2-word prefix operator
                        callFunction(allowedPrefixOps[twoWords])
                        advance()    # consume second word
                    elif not startOfLine and twoWords in allowedInfixOps:    # 2-word infix operator
                        injectToken(allowedInfixOps[twoWords])
                        advance()    # consume second word
                        skip = True
                        matched = True
                    elif inConstructorContext and tstring == 'with':    # special case for 'with' specifier
                        callFunction('With', argument=f'"{nextString}"')
                        advance()    # consume property name
                    elif tstring == requireStatement and nextString == '[':
                        # special case for require[p]
                        next(tokens)    # consume '['
                        nextToken = next(tokens)
                        if nextToken.exact_type != NUMBER:
                            raise TokenParseError(nextToken,
                                'soft requirement must have constant probability')
                        prob = nextToken.string
                        nextToken = next(tokens)
                        if nextToken.exact_type != RSQB:
                            raise TokenParseError(nextToken, 'malformed soft requirement')
                        callFunction(requireStatement, argument=prob)
                        endToken = nextToken
                if not matched:    # 2-word constructs don't match; try 1-word
                    endToken = token
                    oneWord = (tstring,)
                    if oneWord in allowedPrefixOps:    # 1-word prefix operator
                        callFunction(allowedPrefixOps[oneWord])
                    elif not startOfLine and oneWord in allowedInfixOps:    # 1-word infix operator
                        injectToken(allowedInfixOps[oneWord])
                        skip = True
                    elif inConstructorContext:    # couldn't match any 1- or 2-word specifier
                        raise TokenParseError(token, f'unknown specifier "{tstring}"')
                    elif tstring in functions:    # built-in function
                        callFunction(tstring)
                    elif tstring in replacements:    # direct replacement
                        for tok in replacements[tstring]:
                            injectToken(tok)
                        skip = True
                    elif startOfLine and tstring == 'from':    # special case to allow 'from X import Y'
                        pass
                    elif tstring in keywords:    # some malformed usage
                        raise TokenParseError(token, f'unexpected keyword "{tstring}"')
                    else:
                        pass    # nothing matched; pass through unchanged to Python

            # Detect the end of function argument lists
            if len(functionStack) > 0:
                context, startLevel = functionStack[-1]
                while parenLevel < startLevel:    # we've closed all parens for the current function
                    functionStack.pop()
                    injectToken((RPAR, ')'))
                    context, startLevel = (None, 0) if len(functionStack) == 0 else functionStack[-1]
                if inConstructor and parenLevel == startLevel and ttype == COMMA:
                    # starting a new specifier
                    while functionStack and context not in constructors:
                        functionStack.pop()
                        injectToken((RPAR, ')'))
                        context, startLevel = (None, 0) if len(functionStack) == 0 else functionStack[-1]
                    # allow the next specifier to be on the next line, if indented
                    injectToken(token)    # emit comma immediately
                    skip = True
                    nextToken = peek(tokens)
                    specOnNewLine = False
                    while nextToken.exact_type in (NEWLINE, NL, COMMENT, ENDMARKER):
                        specOnNewLine = True
                        if nextToken.exact_type == COMMENT:
                            advance(skip=False)    # preserve comment
                            nextToken = peek(tokens)
                            if nextToken.exact_type not in (NEWLINE, NL):
                                raise TokenParseError(nextToken,
                                                      'comma with no specifier following')
                        advance(skip=False)    # preserve newline
                        nextToken = peek(tokens)
                    if specOnNewLine and not specifiersIndented:
                        nextToken = next(tokens)    # consume indent
                        if nextToken.exact_type != INDENT:
                            raise TokenParseError(nextToken,
                                'expected indented specifier (extra comma on previous line?)')
                        injectToken(nextToken)
                        specifiersIndented = True
                elif ttype == NEWLINE or ttype == ENDMARKER or ttype == COMMENT:    # end of line
                    inConstructor = False
                    if parenLevel != 0:
                        raise TokenParseError(token, 'unmatched parens/brackets')
                    while len(functionStack) > 0:
                        functionStack.pop()
                        injectToken((RPAR, ')'))

            # Output token unchanged, unless handled above
            if not skip:
                injectToken(token)
            else:
                moveBeyond(endToken)
            startOfLine = (ttype in (ENCODING, NEWLINE, NL, INDENT, DEDENT))

        rewrittenSource = tokenize.untokenize(newTokens)
        if not isinstance(rewrittenSource, str):    # TODO improve?
            rewrittenSource = str(rewrittenSource, encoding='utf-8')
        return rewrittenSource, self.constructors

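# An illustrative sketch of the rewriting performed by TokenTranslator above
# (the Scenic class Car is hypothetical; it must derive from Object):
#
#     Scenic:  ego = Car at 1 @ 2, facing 30 deg
#     Python:  ego(Car(At(1 @ 2), Facing(30 * 0.01745329252)))
#
# The leftover '@' is a pseudo-operator; translation phase four below rewrites
# it into a call to Vector.
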
### TRANSLATION PHASE THREE: parsing of Python resulting from token translation

class PythonParseError(SyntaxError, ParseError):
    """Parse error occurring during Python parsing or compilation."""
    @classmethod
    def fromSyntaxError(cls, exc):
        msg, (filename, lineno, offset, line) = exc.args
        try:    # attempt to recover line from original file
            with open(filename, 'r') as f:
                line = list(itertools.islice(f, lineno-1, lineno))
            assert len(line) == 1
            line = line[0]
            offset = min(offset, len(line))    # TODO improve?
        except FileNotFoundError:
            pass
        newExc = cls(msg, (filename, lineno, offset, line))
        return newExc.with_traceback(exc.__traceback__)

def parseTranslatedSource(source, filename):
    try:
        tree = parse(source, filename=filename)
        return tree
    except SyntaxError as e:
        cause = e if showInternalBacktrace else None
        raise PythonParseError.fromSyntaxError(e) from cause

### TRANSLATION PHASE FOUR: modifying the parse tree

noArgs = ast.arguments(
    args=[], vararg=None,
    kwonlyargs=[], kw_defaults=[],
    kwarg=None, defaults=[])
selfArg = ast.arguments(
    args=[ast.arg(arg='self', annotation=None)], vararg=None,
    kwonlyargs=[], kw_defaults=[],
    kwarg=None, defaults=[])
if sys.version_info >= (3, 8):    # TODO cleaner way to handle this?
    noArgs.posonlyargs = []
    selfArg.posonlyargs = []

class AttributeFinder(NodeVisitor):
    """Utility class for finding all referenced attributes of a given name."""
    @staticmethod
    def find(target, node):
        af = AttributeFinder(target)
        af.visit(node)
        return af.attributes

    def __init__(self, target):
        super().__init__()
        self.target = target
        self.attributes = set()

    def visit_Attribute(self, node):
        val = node.value
        if isinstance(val, Name) and val.id == self.target:
            self.attributes.add(node.attr)
        self.visit(val)

class ASTParseError(ParseError):
    """Parse error occurring during modification of the Python AST."""
    def __init__(self, line, message):
        self.lineno = line
        super().__init__('Parse error in line ' + str(line) + ': ' + message)

class ASTSurgeon(NodeTransformer):
    def __init__(self, constructors):
        super().__init__()
        self.constructors = set(constructors.keys())
        self.requirements = []

    def parseError(self, node, message):
        raise ASTParseError(node.lineno, message)

    def unpack(self, arg, expected, node):
        """Unpack arguments to ternary (and up) infix operators."""
        assert expected > 0
        if isinstance(arg, BinOp) and isinstance(arg.op, packageNode):
            if expected == 1:
                raise self.parseError(node, 'gave too many arguments to infix operator')
            else:
                return self.unpack(arg.left, expected - 1, node) + [self.visit(arg.right)]
        elif expected > 1:
            raise self.parseError(node, 'gave too few arguments to infix operator')
        else:
            return [self.visit(arg)]

    def visit_BinOp(self, node):
        """Convert infix operators to calls to the corresponding Scenic operator implementations."""
        left = node.left
        right = node.right
        op = node.op
        if isinstance(op, packageNode):    # unexpected argument package
            raise self.parseError(node, 'unexpected keyword "by"')
        elif type(op) in infixImplementations:    # an operator with non-Python semantics
            arity, impName = infixImplementations[type(op)]
            implementation = Name(impName, Load())
            copy_location(implementation, node)
            assert arity >= 2
            args = [self.visit(left)] + self.unpack(right, arity-1, node)
            newNode = Call(implementation, args, [])
        else:    # all other operators have the Python semantics
            newNode = BinOp(self.visit(left), op, self.visit(right))
        return copy_location(newNode, node)

    def visit_Tuple(self, node):
        """Convert pairs into uniform distributions."""
        if isinstance(node.ctx, Store):
            return self.generic_visit(node)
        if len(node.elts) != 2:
            raise self.parseError(node, 'interval must have exactly two endpoints')
        newElts = [self.visit(elt) for elt in node.elts]
        return copy_location(Call(Name(rangeConstructor, Load()), newElts, []), node)

    def visit_Call(self, node):
        """Wrap require statements with lambdas and unpack any argument packages."""
        func = node.func
        if isinstance(func, Name) and func.id == requireStatement:    # Require statement
            # Soft reqs have 2 arguments, including the probability, which is given as the
            # first argument by the token translator; so we allow an extra argument here
            # and validate it later on (in case the user wrongly gives 2 arguments to
            # require).
            if not (1 <= len(node.args) <= 2):
                raise self.parseError(node, 'require takes exactly one argument')
            if len(node.keywords) != 0:
                raise self.parseError(node, 'require takes no keyword arguments')
            cond = node.args[-1]
            if isinstance(cond, Starred):
                raise self.parseError(node, 'argument unpacking cannot be used with require')
            req = self.visit(cond)
            reqID = Num(len(self.requirements))    # save ID number
            self.requirements.append(req)    # save condition for later inspection when pruning
            closure = Lambda(noArgs, req)    # enclose requirement in a lambda
            lineNum = Num(node.lineno)       # save line number for error messages
            copy_location(closure, req)
            copy_location(lineNum, req)
            newArgs = [reqID, closure, lineNum]
            if len(node.args) == 2:    # get probability for soft requirements
                prob = node.args[0]
                if not isinstance(prob, Num):
                    raise self.parseError(node, 'malformed requirement '
                                                '(should be a single expression)')
                newArgs.append(prob)
            return copy_location(Call(func, newArgs, []), node)
        else:    # Ordinary function call
            newFunc = self.visit(func)
            # Translate arguments, unpacking any argument packages
            newArgs = []
            for arg in node.args:
                if isinstance(arg, BinOp) and isinstance(arg.op, packageNode):
                    newArgs.extend(self.unpack(arg, 2, node))
                else:
                    newArgs.append(self.visit(arg))
            newKeywords = [self.visit(kwarg) for kwarg in node.keywords]
            return copy_location(Call(newFunc, newArgs, newKeywords), node)

    def visit_ClassDef(self, node):
        """Process property defaults for Scenic classes."""
        if node.name in self.constructors:    # Scenic class definition
            newBody = []
            for child in node.body:
                child = self.visit(child)
                if isinstance(child, AnnAssign):    # default value for property
                    origValue = child.annotation
                    target = child.target
                    # extract any attributes for this property
                    metaAttrs = []
                    if isinstance(target, Subscript):
                        sl = target.slice
                        if not isinstance(sl, Index):
                            self.parseError(sl, 'malformed attributes for property default')
                        sl = sl.value
                        if isinstance(sl, Name):
                            metaAttrs.append(sl.id)
                        elif isinstance(sl, Tuple):
                            for elt in sl.elts:
                                if not isinstance(elt, Name):
                                    self.parseError(elt,
                                        'malformed attributes for property default')
                                metaAttrs.append(elt.id)
                        else:
                            self.parseError(sl, 'malformed attributes for property default')
                        newTarget = Name(target.value.id, Store())
                        copy_location(newTarget, target)
                        target = newTarget
                    # find dependencies of the default value
                    properties = AttributeFinder.find('self', origValue)
                    # create default value object
                    args = [
                        Set([Str(prop) for prop in properties]),
                        Set([Str(attr) for attr in metaAttrs]),
                        Lambda(selfArg, origValue)
                    ]
                    value = Call(Name(createDefault, Load()), args, [])
                    copy_location(value, origValue)
                    newChild = AnnAssign(
                        target=target, annotation=value, value=None, simple=True)
                    child = copy_location(newChild, child)
                newBody.append(child)
            node.body = newBody
            return node
        else:    # ordinary Python class
            # it's impossible at the moment to define a Python class in a Scenic file,
            # but we'll leave this check here for future-proofing
            for base in node.bases:
                name = None
                if isinstance(base, Call):
                    name = base.func.id
                elif isinstance(base, Name):
                    name = base.id
                if name is not None and name in self.constructors:
                    self.parseError(node,
                        f'Python class {node.name} derives from Scenic class {name}')
            return self.generic_visit(node)

def translateParseTree(tree, constructors):
    """Modify the Python AST to produce the desired Scenic semantics."""
    surgeon = ASTSurgeon(constructors)
    tree = fix_missing_locations(surgeon.visit(tree))
    return tree, surgeon.requirements

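# Illustrative sketches of the AST rewriting performed by ASTSurgeon (the line
# number 12 and the property 'width' are hypothetical):
#
#     (1, 5)             -->  Range(1, 5)                       # visit_Tuple
#     1 @ 2              -->  Vector(1, 2)                      # visit_BinOp
#     require(x > 0)     -->  require(0, lambda: x > 0, 12)     # visit_Call
#
# A property default 'width: self.length' in a Scenic class body becomes an
# annotated assignment whose annotation is the call
# PropertyDefault({'length'}, set(), lambda self: self.length).
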
### TRANSLATION PHASE FIVE: AST compilation

def compileTranslatedTree(tree, filename):
    try:
        return compile(tree, filename, 'exec')
    except SyntaxError as e:
        cause = e if showInternalBacktrace else None
        raise PythonParseError.fromSyntaxError(e) from cause

### TRANSLATION PHASE SIX: Python execution

def generateTracebackFrom(exc, sourceFile):
    """Trim an exception's traceback to the last line of Scenic code."""
    # find last stack frame in the source file
    tbexc = traceback.TracebackException.from_exception(exc)
    last = None
    tbs = []
    currentTb = exc.__traceback__
    for depth, frame in enumerate(tbexc.stack):
        assert currentTb is not None
        tbs.append(currentTb)
        currentTb = currentTb.tb_next
        if frame.filename == sourceFile:
            last = depth
    assert last is not None

    # create new trimmed traceback
    lastTb = tbs[last]
    lastLine = lastTb.tb_lineno
    tbs = tbs[:last]
    try:
        currentTb = types.TracebackType(None, lastTb.tb_frame,
                                        lastTb.tb_lasti, lastLine)
    except TypeError:
        # Python 3.6 does not allow creation of traceback objects, so we just
        # return the original traceback
        return exc.__traceback__, lastLine
    for tb in reversed(tbs):
        currentTb = types.TracebackType(currentTb, tb.tb_frame,
                                        tb.tb_lasti, tb.tb_lineno)
    return currentTb, lastLine

class InterpreterParseError(ParseError):
    """Parse error occurring during Python execution."""
    def __init__(self, exc, line):
        self.lineno = line
        exc_name = type(exc).__name__
        super().__init__(f'Parse error in line {line}: {exc_name}: {exc}')

def executeCodeIn(code, namespace, filename):
    """Execute the final translated Python code in the given namespace."""
    executePythonFunction(lambda: exec(code, namespace), filename)

def executePythonFunction(func, filename):
    """Execute a Python function, giving correct Scenic backtraces for any exceptions."""
    try:
        return func()
    except RuntimeParseError as e:
        cause = e if showInternalBacktrace else None
        tb, line = generateTracebackFrom(e, filename)
        raise InterpreterParseError(e, line).with_traceback(tb) from cause

### TRANSLATION PHASE SEVEN: scenario construction

def storeScenarioStateIn(namespace, requirementSyntax, filename):
    """Post-process an executed Scenic module, extracting state from the veneer."""
    # Extract created Objects
    namespace['_objects'] = tuple(veneer.allObjects)
    namespace['_egoObject'] = veneer.egoObject

    # Extract global parameters
    namespace['_params'] = veneer.globalParameters
    for name, value in veneer.globalParameters.items():
        if needsLazyEvaluation(value):
            raise InvalidScenarioError(f'parameter {name} uses value {value}'
                                       ' undefined outside of object definition')

    # Extract external parameters
    namespace['_externalParams'] = tuple(veneer.externalParameters)

    # Extract requirements, scan for relations used for pruning, and create closures
    requirements = veneer.pendingRequirements
    finalReqs = veneer.inheritedReqs
    requirementDeps = set()    # things needing to be sampled to evaluate the requirements
    namespace['_requirements'] = finalReqs
    namespace['_requirementDeps'] = requirementDeps

    def makeClosure(req, bindings, ego, line):
        """Create a closure testing the requirement in the correct runtime state."""
        def evaluator():
            result = req()
            assert not needsSampling(result)
            if needsLazyEvaluation(result):
                raise InvalidScenarioError(f'requirement on line {line} uses value'
                                           ' undefined outside of object definition')
            return result

        def closure(values):
            # rebind any names referring to sampled objects
            for name, value in bindings.items():
                if value in values:
                    namespace[name] = values[value]
            # evaluate requirement condition, reporting errors on the correct line
            try:
                veneer.evaluatingRequirement = True
                # rebind ego object, which can be referred to implicitly
                assert veneer.egoObject is None
                if ego is not None:
                    veneer.egoObject = values[ego]
                result = executePythonFunction(evaluator, filename)
            finally:
                veneer.evaluatingRequirement = False
                veneer.egoObject = None
            return result
        return closure

    for reqID, (req, bindings, ego, line, prob) in requirements.items():
        # Check whether requirement implies any relations used for pruning
        reqNode = requirementSyntax[reqID]
        relations.inferRelationsFrom(reqNode, bindings, ego, line)
        # Gather dependencies of the requirement
        for value in bindings.values():
            if needsSampling(value):
                requirementDeps.add(value)
            if needsLazyEvaluation(value):
                raise InvalidScenarioError(f'requirement on line {line} uses value {value}'
                                           ' undefined outside of object definition')
        if ego is not None:
            assert isinstance(ego, Samplable)
            requirementDeps.add(ego)
        # Construct closure
        finalReqs.append((makeClosure(req, bindings, ego, line), prob))

def constructScenarioFrom(namespace):
    """Build a Scenario object from an executed Scenic module."""
    # Extract ego object
    if namespace['_egoObject'] is None:
        raise InvalidScenarioError('did not specify ego object')

    # Extract workspace, if one is specified
    if 'workspace' in namespace:
        workspace = namespace['workspace']
        if not isinstance(workspace, Workspace):
            raise InvalidScenarioError(f'workspace {workspace} is not a Workspace')
        if needsSampling(workspace):
            raise InvalidScenarioError('workspace must be a fixed region')
        if needsLazyEvaluation(workspace):
            raise InvalidScenarioError('workspace uses value undefined '
                                       'outside of object definition')
    else:
        workspace = None

    # Create Scenario object
    scenario = Scenario(workspace,
                        namespace['_objects'], namespace['_egoObject'],
                        namespace['_params'], namespace['_externalParams'],
                        namespace['_requirements'], namespace['_requirementDeps'])

    # Prune infeasible parts of the space
    if usePruning:
        pruning.prune(scenario, verbosity=verbosity)

    return scenario