Source code for scenic.simulators.webots.simulator

"""Interface to Webots for dynamic simulations.

This interface is intended to be instantiated from inside the controller script
of a Webots `Robot node`_ with the ``supervisor`` field set to true. Such a
script can create a `WebotsSimulator` (passing in a reference to the supervisor
node) and then call its `simulate` method as usual to run a simulation. For an
example, see :file:`examples/webots/generic/controllers/scenic_supervisor.py`.

Scenarios written for this interface should use our generic Webots world model
:doc:`scenic.simulators.webots.model` or a model derived from it. Objects which
are instances of `WebotsObject` will be matched to Webots nodes; see the model
documentation for details.

.. _Robot node: https://www.cyberbotics.com/doc/reference/robot
"""

from collections import defaultdict
import ctypes
import math
from os import path
import tempfile
from textwrap import dedent

import trimesh

from scenic.core.regions import MeshVolumeRegion
from scenic.core.simulators import Simulation, Simulator
from scenic.core.type_support import toOrientation
from scenic.core.vectors import Vector
from scenic.simulators.webots.utils import ENU, WebotsCoordinateSystem


[docs]class WebotsSimulator(Simulator): """`Simulator` object for Webots. Args: supervisor: Supervisor node handle from the Webots Python API. """ def __init__(self, supervisor): super().__init__() self.supervisor = supervisor topLevelNodes = supervisor.getRoot().getField("children") worldInfo = None for i in range(topLevelNodes.getCount()): child = topLevelNodes.getMFNode(i) if child.getTypeName() == "WorldInfo": worldInfo = child break if not worldInfo: raise RuntimeError("Webots world does not contain a WorldInfo node") system = worldInfo.getField("coordinateSystem").getSFString() self.coordinateSystem = WebotsCoordinateSystem(system) def createSimulation(self, scene, **kwargs): return WebotsSimulation( scene, self.supervisor, coordinateSystem=self.coordinateSystem, **kwargs )
[docs]class WebotsSimulation(Simulation): """`Simulation` object for Webots. Attributes: supervisor: Webots supervisor node used for the simulation. This is exposed for the use of scenarios which need to call Webots APIs directly; e.g. :scenic:`simulation().supervisor.setLabel({...})`. """ def __init__(self, scene, supervisor, coordinateSystem=ENU, *, timestep, **kwargs): self.supervisor = supervisor self.coordinateSystem = coordinateSystem self.mode2D = scene.compileOptions.mode2D self.nextAdHocObjectId = 1 self.usedObjectNames = defaultdict(lambda: 0) # directory to store proto files for adhoc webots objects self.tmpMeshDir = tempfile.mkdtemp() timestep = supervisor.getBasicTimeStep() / 1000 if timestep is None else timestep super().__init__(scene, timestep=timestep, **kwargs) def setup(self): super().setup() # Reset Webots simulation self.supervisor.simulationResetPhysics() def createObjectInSimulator(self, obj): if not hasattr(obj, "webotsName"): return # not a Webots object # Find the name of the Webots node for this object. name = None if obj.webotsAdhoc is not None: # Dynamically generate object from Scenic object objectRawMesh = obj.shape.mesh objectScaledMesh = MeshVolumeRegion( mesh=objectRawMesh, dimensions=(obj.width, obj.length, obj.height), ).mesh objFilePath = path.join(self.tmpMeshDir, f"{self.nextAdHocObjectId}.obj") trimesh.exchange.export.export_mesh(objectScaledMesh, objFilePath) name = self._getAdhocObjectName(self.nextAdHocObjectId) protoName = ( "ScenicObjectWithPhysics" if isPhysicsEnabled(obj) else "ScenicObject" ) protoDef = dedent( f""" DEF {name} {protoName} {{ url "{objFilePath}" }} """ ) rootNode = self.supervisor.getRoot() rootChildrenField = rootNode.getField("children") rootChildrenField.importMFNodeFromString(-1, protoDef) self.nextAdHocObjectId += 1 else: if obj.webotsName: name = obj.webotsName else: ty = obj.webotsType if not ty: raise RuntimeError(f"object {obj} has no webotsName or webotsType") nextID = self.usedObjectNames[ty] self.usedObjectNames[ty] += 1 if nextID == 0 and self.supervisor.getFromDef(ty): name = ty else: name = f"{ty}_{nextID}" # Get handle to Webots node. webotsObj = self.supervisor.getFromDef(name) if webotsObj is None: raise SimulationCreationError(f"Webots object {name} does not exist in world") obj.webotsObject = webotsObj obj.webotsName = name # Set the fields of the Webots object: # position if self.mode2D: # 2D compatibility mode # Set initial elevation if unspecified if obj.elevation is None: pos = webotsObj.getField("translation").getSFVec3f() spos = self.coordinateSystem.positionToScenic(pos) obj.elevation = spos[2] # Overwrite Z value with elevation pos = self.coordinateSystem.positionFromScenic( Vector(obj.position.x, obj.position.y, obj.elevation) + obj.positionOffset ) webotsObj.getField("translation").setSFVec3f(pos) else: pos = self.coordinateSystem.positionFromScenic( obj.position + obj.positionOffset ) webotsObj.getField("translation").setSFVec3f(pos) # orientation offsetOrientation = toOrientation(obj.rotationOffset) webotsObj.getField("rotation").setSFRotation( self.coordinateSystem.orientationFromScenic( obj.orientation, offsetOrientation ) ) # density densityField = getFieldSafe(webotsObj, "density") if densityField is not None: if obj.density is None: # Get initial value for property if unspecified obj.density = densityField.getSFFloat() else: densityField.setSFFloat(float(obj.density)) # battery battery = getattr(obj, "battery", None) if battery: if not isinstance(battery, (tuple, list)) or len(battery) != 3: raise TypeError(f'"battery" of {name} does not have 3 components') field = webotsObj.getField("battery") field.setMFFloat(0, battery[0]) field.setMFFloat(1, battery[1]) field.setMFFloat(2, battery[2]) # customData customData = getattr(obj, "customData", None) if customData: if not isinstance(customData, str): raise TypeError(f'"customData" of {name} is not a string') webotsObj.getField("customData").setSFString(customData) # controller if obj.controller: controllerField = webotsObj.getField("controller") curCont = controllerField.getSFString() if obj.controller != curCont: # the following operation also causes the controller to be restarted controllerField.setSFString(obj.controller) elif obj.resetController: webotsObj.restartController() def step(self): ms = round(1000 * self.timestep) self.supervisor.step(ms) def getProperties(self, obj, properties): webotsObj = getattr(obj, "webotsObject", None) if not webotsObj: # static object with no Webots counterpart return {prop: getattr(obj, prop) for prop in properties} pos = webotsObj.getField("translation").getSFVec3f() x, y, z = self.coordinateSystem.positionToScenic(pos) lx, ly, lz, ax, ay, az = webotsObj.getVelocity() vx, vy, vz = self.coordinateSystem.positionToScenic((lx, ly, lz)) velocity = Vector(vx, vy, vz) speed = math.hypot(*velocity) angularSpeed = math.hypot(ax, ay, az) offsetOrientation = toOrientation(obj.rotationOffset) globalOrientation = self.coordinateSystem.orientationToScenic( webotsObj.getField("rotation").getSFRotation(), offsetOrientation ) yaw, pitch, roll = obj.parentOrientation.localAnglesFor(globalOrientation) values = dict( position=Vector(x, y, z), velocity=velocity, speed=speed, angularSpeed=angularSpeed, angularVelocity=Vector(ax, ay, az), yaw=yaw, pitch=pitch, roll=roll, elevation=z, ) if hasattr(obj, "battery"): field = webotsObj.getField("battery") val = (field.getMFFloat(0), obj.battery[1], obj.battery[2]) values["battery"] = val return values def destroy(self): # Destroy adhoc objects generated at the beginning of the simulation for i in range(1, self.nextAdHocObjectId): name = self._getAdhocObjectName(i) node = self.supervisor.getFromDef(name) node.remove() def _getAdhocObjectName(self, i: int) -> str: return f"SCENIC_ADHOC_{i}"
[docs]def getFieldSafe(webotsObject, fieldName): """Get field from webots object. Return null if no such field exists. Needed to workaround this issue (https://github.com/cyberbotics/webots/issues/5646) Args: webotsObject: webots object fieldName: name of the field to look for Returns: Field|None: Field object if the field with the given name exists. None otherwise. """ field = webotsObject.getField(fieldName) # this seems to always return some object, but return None if field is None if field is None: return None # if field is valid, it has a valid pointer if isinstance(field._ref, ctypes.c_void_p) and field._ref.value is not None: # then the field is valid and we return the reference return field # if the pointer points to None, then the field does not exist on this object return None
[docs]def isPhysicsEnabled(webotsObject): """Whether or not physics is enabled for this `WebotsObject`""" if webotsObject.webotsAdhoc is None: return webotsObject if isinstance(webotsObject.webotsAdhoc, dict): return webotsObject.webotsAdhoc.get("physics", True) raise TypeError(f"webotsAdhoc must be None or a dictionary")