The MARLMapManager extends OpenCDA's map management with MARL-specific features for multi-agent scenarios, handling custom maps, junction-based spawn points, and visualization.
import carla
from omegaconf import DictConfig

from opencda_marl.core.world.map_manager import MARLMapManager

# Initialize with detailed config
config = DictConfig({
    "map": {
        "name": "intersection",
        "safe_distance": 5.0,  # Base spacing for waypoints
        "spawn_offset": 2.0,   # Multiplier for upstream spawn distance
        "dest_offset": 2.0,    # Multiplier for downstream dest distance
        "spawn_z_lift": 0.3,   # Z lift to avoid ground collision
        "wp_step": 1.0,        # Stepping granularity along lanes
    }
})

client = carla.Client("localhost", 2000)
map_manager = MARLMapManager(config, client)
# Uses hardcoded MAP_REGISTRY from opencda_marl.core.world
MAP_REGISTRY = {
    "intersection": {
        "type": "marl",
        "description": "4-way intersection for MARL training",
        "xodr_path": "opencda_marl/assets/maps/intersection.xodr",
        "fbx_path": "opencda_marl/assets/maps/intersection.fbx",  # Optional
        "built_in": False,  # Auto-detected if available in CARLA
    },
    "Town05": {
        "type": "carla",
        "description": "CARLA built-in town",
        "built_in": True,
    },
}
class MARLMapManager:
    """Map manager for multi-agent (MARL) scenarios.

    Holds the CARLA client, the ``map`` section of the configuration, a
    private copy of the map registry, and metadata (junctions and spawn
    points) describing the currently loaded map.
    """

    def __init__(self, config: DictConfig, client: carla.Client):
        """Store configuration and client, then verify the registry and load the map.

        Args:
            config: Full configuration; only its ``"map"`` section is kept
                (an empty mapping is used when the section is absent).
            client: Connected CARLA client used for map queries and loading.
        """
        import copy  # local import: keeps this block self-contained

        self.config = config.get("map", {})
        self.client = client
        self.world = None
        self.map = None

        # Deep copy: registry verification writes "built_in" into the nested
        # per-map dicts, and a shallow ``.copy()`` would leak those writes
        # into the module-level MAP_REGISTRY shared by every manager.
        self.map_registry = copy.deepcopy(MAP_REGISTRY)

        # Enhanced metadata with junction connections
        self._meta = {
            "map_name": None,
            "map_type": None,
            "junctions": [],     # Junction data with connections
            "spawn_points": [],  # Generated from junction connections
        }

        # Automatic initialization
        self._verify_registry()
        self._initialize_map()
def load_map(self, map_name: str) -> carla.World:
    """Load a map for MARL training.

    Built-in maps are loaded through the CARLA client; every other map is
    loaded from the ``xodr_path`` recorded in the registry.

    Args:
        map_name: Key of the map in ``self.map_registry``.

    Returns:
        The loaded world (also stored on ``self.world``).

    Raises:
        KeyError: If ``map_name`` is not present in the registry.
    """
    if map_name not in self.map_registry:
        # Same exception type a plain lookup would raise, but with a message
        # that tells the caller which names are valid.
        raise KeyError(
            f"Map '{map_name}' is not registered; "
            f"known maps: {sorted(self.map_registry)}"
        )
    entry = self.map_registry[map_name]

    if entry.get("built_in"):
        # Load built-in CARLA map
        world = self.client.load_world(map_name)
    else:
        # Load custom XODR map using OpenCDA utilities
        xodr_path = entry.get("xodr_path")
        world = load_customized_world(xodr_path, self.client)

    self.world = world
    self.map = world.get_map()

    # Update metadata
    self._meta["map_name"] = map_name
    self._meta["map_type"] = entry.get("type")

    logger.success(f"✓ Map '{map_name}' loaded successfully")
    return self.world
def_verify_registry(self):"""Verify and auto-detect maps in registry."""available_maps=self.client.get_available_maps()forname,infoinself.map_registry.items():# Validate required fieldsifnotinfo.get("type"):raiseValueError(f"Map {name} has no type")ifnotinfo.get("description"):raiseValueError(f"Map {name} has no description")# Verify custom map files existifnotinfo.get("built_in")andinfo.get("xodr_path"):ifnotos.path.exists(info["xodr_path"]):raiseFileNotFoundError(f"XODR file not found: {info['xodr_path']}")# Auto-detect CARLA built-in mapsifself._discover_carla_maps(name):self.map_registry[name]["built_in"]=Truelogger.success(f"✓ Registry ({len(self.map_registry)} maps) verification completed")
def register_custom_map(self, name: str, xodr_path: str, **kwargs):
    """Register a custom map at runtime.

    Delegates to ``opencda_marl.core.world.register_map`` and logs the
    registration.

    Args:
        name: Registry key for the new map.
        xodr_path: Path to the map's OpenDRIVE (``.xodr``) file.
        **kwargs: Extra fields forwarded unchanged to ``register_map``.
    """
    # NOTE(review): this does not touch self.map_registry, which is copied
    # once at construction; confirm whether maps registered here are
    # expected to be visible to load_map on this instance.
    from opencda_marl.core.world import register_map

    register_map(name, xodr_path, **kwargs)
    logger.info(f"Registered custom map: {name} from {xodr_path}")
def_get_junctions(self):"""Collect junctions with spawn/dest points from connections."""# Configuration parameterssafe_distance=float(self.config.get("safe_distance",5.0))spawn_offset=float(self.config.get("spawn_offset",2))# Upstream multiplierdest_offset=float(self.config.get("dest_offset",2))# Downstream multiplierz_lift=float(self.config.get("spawn_z_lift",0.3))# Avoid ground collisionstep_size=float(self.config.get("wp_step",1.0))# Lane steppingwaypoints=self.map.generate_waypoints(distance=safe_distance)seen=set()junctions=[]forwpinwaypoints:ifnotwp.is_junction:continuej=wp.get_junction()ifj.idinseen:continueseen.add(j.id)# Get junction properties...# Analyze junction connectionsconnections=j.get_waypoints(carla.LaneType.Driving)conn_info=[]for(entry_wp,exit_wp)inconnections:# Calculate spawn point upstream from entry# Calculate destination downstream from exit# Create transforms with Z lift# Snap to drivable lanes for safety...conn_info.append({"entry_wp":entry_wp,"exit_wp":exit_wp,...})junctions.append({"id":j.id,...})self._meta["junctions"]=junctions
def_marl_spawn_init(self):"""Generate spawn points from junction connections."""self._meta["spawn_points"]=[]forjinself._meta["junctions"]:fori,cinenumerate(j["connections"]):self._meta["spawn_points"].append({"id":f"j{j['id']}_conn{i}",...})
def get_spawn_points(self, num: int = None, dest: bool = False, detail: bool = False):
    """Get spawn points with optional destinations.

    Args:
        num: Maximum number of spawn points to return, taken from the start
            of the list. ``None`` returns all of them; a value larger than
            the number available is clamped.
        dest: When true (and ``detail`` is false), pair each transform with
            its destination instead of ``None``.
        detail: When true, return the full spawn-point records and ignore
            ``dest``.

    Returns:
        The spawn-point records when ``detail`` is true; otherwise a list of
        ``(transform, destination)`` tuples, where ``destination`` is
        ``None`` unless ``dest`` is true.

    Raises:
        ValueError: If ``num`` is negative.
    """
    spawn_points = self._meta["spawn_points"]

    if num is not None:
        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O``, after which a negative ``num`` would silently slice
        # from the end of the list.
        if num < 0:
            raise ValueError("num must be greater than or equal to 0")
        spawn_points = spawn_points[:num]  # slicing clamps to the length

    if detail:
        # Return full spawn point data with IDs
        return spawn_points
    if dest:
        # Return (transform, destination) tuples
        return [(sp["transform"], sp["dest"]) for sp in spawn_points]
    # Return (transform, None) tuples for compatibility
    return [(sp["transform"], None) for sp in spawn_points]
def get_random_spawn_points(self, num: int = 1, detail: bool = False):
    """Get random spawn points for agents.

    Args:
        num: Number of spawn points to draw.
        detail: Forwarded to ``get_spawn_points``; selects full records
            instead of ``(transform, None)`` tuples.

    Returns:
        A list of ``num`` spawn points. They are distinct when enough points
        exist; otherwise points are drawn with repetition.

    Raises:
        ValueError: If no spawn points exist while ``num`` is positive, or
            if ``num`` is negative.
    """
    available = self.get_spawn_points(detail=detail)

    # Fail with a clear message instead of the opaque IndexError that
    # random.choice raises on an empty sequence.
    if num > 0 and not available:
        raise ValueError("No spawn points available to sample from")

    if len(available) >= num:
        return random.sample(available, num)

    # Allow repetition if not enough unique points
    return [random.choice(available) for _ in range(num)]
def get_available_maps(self):
    """Get dictionary of all available maps from registry.

    Returns:
        The manager's registry mapping (the same object, not a copy).
    """
    return self.map_registry


def list_maps(self):
    """Print formatted list of available maps.

    Prints one row per registry entry with its name, type, built-in flag,
    whether its FBX file exists on disk, and its description.
    """
    print("\nAvailable MARL Maps (from Registry):")
    print("=" * 120)
    print(f"{'Name':15} | {'Type':12} | {'Built-in':8} | {'FBX':8} | {'Description'}")
    print("-" * 120)

    for name, info in self.map_registry.items():
        # FBX counts as present only if a path is registered and the file exists.
        has_fbx = "✓" if info.get("fbx_path") and os.path.exists(info["fbx_path"]) else "✗"
        built_in = "✓" if info.get("built_in") else "✗"
        print(f"{name:15} | {info['type']:12} | {built_in:^8} | {has_fbx:^8} | {info['description']}")
# configs/marl_training.yaml
map:
  name: "intersection"   # Map name from registry
  safe_distance: 5.0     # Base spacing for waypoint generation (m)
  spawn_offset: 2.0      # Upstream spawn distance multiplier
  dest_offset: 2.0       # Downstream destination multiplier
  spawn_z_lift: 0.3      # Z elevation to avoid ground collision (m)
  wp_step: 1.0           # Lane stepping granularity (m)

# Map registry configuration
MAP_REGISTRY = {
    "intersection": {
        "type": "marl",
        "description": "4-way intersection with junction-based spawns",
        "xodr_path": "opencda_marl/assets/maps/intersection.xodr",
        "fbx_path": "opencda_marl/assets/maps/intersection.fbx",
        "built_in": False,
    }
}
from omegaconf import DictConfig

from opencda.scenario_testing.utils.sim_api import ScenarioManager
from opencda_marl.core.world.map_manager import MARLMapManager

# NOTE: `client` (a carla.Client) and `scenario_params` are assumed to have
# been created beforehand; this example does not define them.

# Initialize MARL map manager with advanced config
config = DictConfig({
    "map": {
        "name": "Town05",
        "safe_distance": 5.0,
        "spawn_offset": 2.0,
        "dest_offset": 2.0,
    }
})
map_manager = MARLMapManager(config, client)

# Get spawn points with destinations for MARL agents
spawn_dest_pairs = map_manager.get_spawn_points(num=4, dest=True)

# Create vehicles using OpenCDA with destination goals
scenario_manager = ScenarioManager(scenario_params)
vehicles = []
for i, (spawn_tf, dest_tf) in enumerate(spawn_dest_pairs):
    vehicle = scenario_manager.create_single_cav(
        spawn_transform=spawn_tf,
        config_override={
            "agent_id": f"agent_{i}",
            "destination": dest_tf,
        },
    )
    vehicles.append(vehicle)