# Source code for scenic.core.dynamics.actions

"""Actions taken by dynamic agents."""

import abc


class Action(abc.ABC):
    """An :term:`action` which can be taken by an agent for one step of a simulation."""

    def canBeTakenBy(self, agent):
        """Whether this action is allowed to be taken by the given agent.

        The default implementation always returns True.

        Args:
            agent: The agent which would take this action.

        Returns:
            bool: True if the agent may take this action.
        """
        return True

    @abc.abstractmethod
    def applyTo(self, agent, simulation):
        """Apply this action to the given agent in the given simulation.

        This method should call simulator APIs so that the agent will take this
        action during the next simulated time step. Depending on the simulator
        API, it may be necessary to batch each agent's actions into a single
        update: in that case you can have this method set some state on the
        agent, then apply the actual update in an overridden implementation of
        `Simulation.executeActions`. For examples, see the CARLA interface:
        `scenic.simulators.carla.actions` has some CARLA-specific actions which
        directly call CARLA APIs, while the generic steering and braking actions
        from `scenic.domains.driving.actions` are implemented using the batching
        approach (see for example the ``setThrottle`` method of the class
        `scenic.simulators.carla.model.Vehicle`, which sets state later read by
        ``CarlaSimulation.executeActions`` in
        `scenic.simulators.carla.simulator`).

        Args:
            agent: The agent taking this action.
            simulation: The simulation in which the action is taken.

        Raises:
            NotImplementedError: If a subclass delegates to this base
                implementation instead of providing its own behavior.
        """
        raise NotImplementedError
class _EndSimulationAction(Action):
    """Special action indicating it is time to end the simulation.

    Only for internal use.
    """

    def __init__(self, line):
        """Record where the termination was requested.

        Args:
            line: Line reported in this action's string form as the place
                where "terminate simulation" was executed.
        """
        self.line = line

    def __str__(self):
        """Return a human-readable description including the originating line."""
        return f'"terminate simulation" executed on line {self.line}'

    def applyTo(self, agent, simulation):
        """Always fail: this marker action is never meant to be applied.

        Raises:
            AssertionError: Unconditionally.
        """
        # Raise explicitly instead of `assert False`: assert statements are
        # removed when Python runs with -O, which would turn this into a
        # silent no-op returning None.
        raise AssertionError(
            "_EndSimulationAction should never be applied to an agent"
        )
class _EndScenarioAction(Action):
    """Special action indicating it is time to end the current scenario.

    Only for internal use.
    """

    def __init__(self, scenario, line):
        """Record which scenario is ending and where that was requested.

        Args:
            scenario: The scenario associated with this termination request.
            line: Line reported in this action's string form as the place
                where "terminate" was executed.
        """
        self.scenario = scenario
        self.line = line

    def __str__(self):
        """Return a human-readable description including the originating line."""
        return f'"terminate" executed on line {self.line}'

    def applyTo(self, agent, simulation):
        """Always fail: this marker action is never meant to be applied.

        Raises:
            AssertionError: Unconditionally.
        """
        # Raise explicitly instead of `assert False`: assert statements are
        # removed when Python runs with -O, which would turn this into a
        # silent no-op returning None.
        raise AssertionError(
            "_EndScenarioAction should never be applied to an agent"
        )