Source code for scenic.simulators.lgsvl.simulator

"""Dynamic simulator interface for LGSVL."""

import math
import time
import warnings

import lgsvl

import scenic.core.simulators as simulators
from scenic.core.vectors import Vector
import scenic.simulators.lgsvl.utils as utils
from scenic.syntax.veneer import verbosePrint


[docs]class LGSVLSimulator(simulators.Simulator): """A connection to an instance of LGSVL. See the `SVL documentation`_ for details on how to set the parameters below. Uses a default timestep of 0.1 seconds. Args: sceneID (str): Identifier for the map ("scene") to load in SVL. address (str): Address where SVL is running. port (int): Port on which to connect to SVL. alwaysReload (bool): Whether to force reloading the map upon connecting, even if the simulator already has the desired map loaded. .. _SVL documentation: https://www.svlsimulator.com/docs/python-api/python-api """ def __init__(self, sceneID, address="localhost", port=8181, alwaysReload=False): super().__init__() verbosePrint("Connecting to LGSVL Simulator...") self.client = lgsvl.Simulator(address=address, port=port) if alwaysReload or self.client.current_scene != sceneID: self.client.load(scene=sceneID) verbosePrint("Map loaded in simulator.") def createSimulation(self, scene, **kwargs): return LGSVLSimulation(scene, self.client, **kwargs)
[docs]class LGSVLSimulation(simulators.Simulation): """Subclass of `Simulation` for LGSVL.""" def __init__(self, scene, client, *, timestep, **kwargs): self.client = client self.usingApollo = False self.data = {} self.collisionOccurred = False if timestep is None: timestep = scene.params.get("time_step", 0.1) # backwards-compatibility super().__init__(scene, timestep=timestep, **kwargs) def setup(self): # Reset simulator (deletes all existing objects) self.client.reset() super().setup() def createObjectInSimulator(self, obj): if not hasattr(obj, "lgsvlObject"): return # not an LGSVL object # Figure out what type of LGSVL object this is if not hasattr(obj, "lgsvlName"): raise RuntimeError(f"object {obj} does not have an lgsvlName property") if not hasattr(obj, "lgsvlAgentType"): raise RuntimeError(f"object {obj} does not have an lgsvlAgentType property") name = obj.lgsvlName agentType = obj.lgsvlAgentType # Set up position and orientation state = lgsvl.AgentState() elevation = obj.elevation if elevation is None: elevation = self.groundElevationAt(obj.position) state.transform.position = utils.scenicToLGSVLPosition(obj.position, elevation) state.transform.rotation = utils.scenicToLGSVLRotation(obj.heading) # Create LGSVL object lgsvlObj = self.client.add_agent(name, agentType, state) obj.lgsvlObject = lgsvlObj # Initialize Data self.data[obj] = {} # Initialize Apollo if needed if getattr(obj, "apolloVehicle", None): self.initApolloFor(obj, lgsvlObj) def groundElevationAt(self, pos): origin = utils.scenicToLGSVLPosition(pos, 100000) result = self.client.raycast(origin, lgsvl.Vector(0, -1, 0), 1) if result is None: warnings.warn( f"no ground at position {pos}", simulators.SimulatorInterfaceWarning ) return 0 return result.point.y
[docs] def initApolloFor(self, obj, lgsvlObj): """Initialize Apollo for an ego vehicle. Uses LG's interface which injects packets into Dreamview. """ if self.usingApollo: raise RuntimeError("can only use one Apollo vehicle") self.usingApollo = True def on_collision(agent1, agent2, contact): if agent1 is not None and agent1.name == lgsvlObj.name: self.data[obj]["collision"] = True if agent2 is not None and agent2.name == lgsvlObj.name: self.data[obj]["collision"] = True if self.data[obj]["collision"]: self.collisionOccurred = True # Initialize Data self.data[obj]["collision"] = False lgsvlObj.on_collision(on_collision) # connect bridge from LGSVL to Apollo lgsvlObj.connect_bridge(obj.bridgeHost, obj.bridgePort) # set up connection and map/vehicle configuration from lgsvl import dreamview dv = dreamview.Connection(self.client, lgsvlObj) obj.dreamview = dv hdMap = self.scene.params["apolloHDMap"] dv.set_hd_map(hdMap) dv.set_vehicle(obj.apolloVehicle)
def executeActions(self, allActions): super().executeActions(allActions) # Apply state/control updates which were accumulated while executing the actions for obj in self.agents: if obj._stateUpdated: obj.lgsvlObject.state = obj.state obj._stateUpdated = False ctrl = getattr(obj, "_control", None) if ctrl is not None: obj.lgsvlObject.apply_control(ctrl, obj._stickyControl) obj._control = None def step(self): self.client.run(time_limit=self.timestep) def getProperties(self, obj, properties): lgsvlObj = obj.lgsvlObject state = lgsvlObj.state obj.state = state # cache state for subsequent updates velocity = utils.lgsvlToScenicPosition(state.velocity) speed = math.hypot(*velocity) values = dict( position=utils.lgsvlToScenicPosition(state.position), elevation=utils.lgsvlToScenicElevation(state.position), heading=utils.lgsvlToScenicRotation(state.rotation), velocity=velocity, speed=speed, angularSpeed=utils.lgsvlToScenicAngularSpeed(state.rotation), ) return values def destroy(self): self.client.reset()