Skip to content

V2XManager API

V2XManager enables Vehicle-to-Everything (V2X) communication for cooperative autonomous driving through information sharing and coordination between CAVs.

Component Purpose Configuration
communication_range V2X range limit Distance-based filtering
message_passing Info sharing Position, velocity, intentions
noise_simulation Realistic errors Position noise, timing delays
platoon_support Coordination Leader-follower communication

Communication Setup

What it does: Configures V2X communication with realistic range limits and noise models

def __init__(self, cav_world, config_yaml, vehicle_id):
    """
    Initialize V2X communication manager

    Args:
        cav_world: Global CAV registry
        config_yaml: V2X configuration parameters
        vehicle_id: Unique vehicle identifier
    """
    # Communication parameters
    self.communication_range = config_yaml.get('communication_range', 35.0)  # meters
    self.loc_noise = config_yaml.get('loc_noise', 0.0)  # position noise
    self.time_delay = config_yaml.get('time_delay', 0.1)  # latency (seconds)

    # Vehicle identification
    self.vid = vehicle_id
    self.cav_world = cav_world

    # Communication state
    self.ego_pos = None
    self.ego_speed = None
    self.neighbors = {}  # Nearby CAVs
    self.platoon_id = None
v2x:
  communication_range: 35.0    # V2X range in meters
  loc_noise: 0.1              # Position uncertainty (meters)
  time_delay: 0.1             # Communication latency (seconds)
  message_loss_rate: 0.05     # Packet loss probability
  bandwidth_limit: 10.0       # Mbps capacity limit

  # Platoon-specific settings
  platoon:
    leader_range: 50.0        # Extended range for platoon leader
    update_frequency: 10      # Hz for platoon coordination
def apply_communication_noise(self, position, velocity):
    """
    Simulate realistic V2X communication errors
    """
    # Position noise (GPS/localization uncertainty)
    if self.loc_noise > 0:
        noise_x = np.random.normal(0, self.loc_noise)
        noise_y = np.random.normal(0, self.loc_noise)
        position = carla.Location(
            x=position.x + noise_x,
            y=position.y + noise_y,
            z=position.z
        )

    # Velocity noise
    if self.loc_noise > 0:
        vel_noise = np.random.normal(0, self.loc_noise * 0.1)
        velocity = carla.Vector3D(
            x=velocity.x + vel_noise,
            y=velocity.y + vel_noise,
            z=velocity.z
        )

    return position, velocity

Benefits: Realistic communication simulation, configurable range and noise

Message Broadcasting

What it does: Handles information sharing between vehicles within communication range

def update_info(self, ego_pos, ego_speed):
    """
    Update vehicle state and broadcast to neighbors

    Args:
        ego_pos: carla.Transform current position
        ego_speed: carla.Vector3D current velocity
    """
    # Apply communication noise
    noisy_pos, noisy_speed = self.apply_communication_noise(
        ego_pos.location, ego_speed
    )

    # Update local state
    self.ego_pos = carla.Transform(
        location=noisy_pos,
        rotation=ego_pos.rotation
    )
    self.ego_speed = noisy_speed

    # Broadcast to nearby vehicles
    self.broadcast_state()

    # Update neighbor information
    self.update_neighbors()
def search(self):
    """
    Discover nearby CAVs within communication range

    Returns:
        dict: {vehicle_id: distance} for neighbors
    """
    neighbors = {}

    # Get all vehicle managers from CAV world
    all_vehicles = self.cav_world.get_all_vehicle_managers()

    for vid, vehicle_manager in all_vehicles.items():
        if vid == self.vid:
            continue  # Skip self

        # Calculate distance
        other_pos = vehicle_manager.localizer.get_ego_pos()
        distance = self._calculate_distance(self.ego_pos, other_pos)

        # Check if within communication range
        if distance <= self.communication_range:
            neighbors[vid] = {
                'distance': distance,
                'position': other_pos,
                'velocity': vehicle_manager.localizer.get_ego_speed(),
                'last_update': time.time()
            }

    self.neighbors = neighbors
    return neighbors
class V2XMessage:
    def __init__(self, sender_id, message_type, content, timestamp):
        self.sender_id = sender_id
        self.message_type = message_type  # 'position', 'intention', 'emergency'
        self.content = content
        self.timestamp = timestamp
        self.received_time = None

def broadcast_state(self):
    """Broadcast current vehicle state"""
    message = V2XMessage(
        sender_id=self.vid,
        message_type='position',
        content={
            'position': self.ego_pos,
            'velocity': self.ego_speed,
            'heading': self.ego_pos.rotation.yaw,
            'acceleration': self.get_acceleration()
        },
        timestamp=time.time()
    )

    # Send to all neighbors
    for neighbor_id in self.neighbors.keys():
        self.send_message(neighbor_id, message)

def broadcast_intention(self, intention):
    """Broadcast driving intention (lane change, merging, etc.)"""
    message = V2XMessage(
        sender_id=self.vid,
        message_type='intention',
        content={
            'intention': intention,  # 'lane_change_left', 'merging', etc.
            'target_location': self.get_target_location(),
            'confidence': 0.9
        },
        timestamp=time.time()
    )

    self.broadcast_to_neighbors(message)

Features: State broadcasting, neighbor discovery, intention sharing

Platoon Communication

What it does: Specialized communication protocols for platoon coordination

def join_platoon(self, platoon_id, role='follower'):
    """
    Join a vehicle platoon

    Args:
        platoon_id: str unique platoon identifier
        role: 'leader' or 'follower'
    """
    self.platoon_id = platoon_id
    self.platoon_role = role

    # Extended communication range for platoons
    if role == 'leader':
        self.communication_range = max(
            self.communication_range, 50.0
        )

    # Register with platoon
    platoon_message = V2XMessage(
        sender_id=self.vid,
        message_type='platoon_join',
        content={
            'platoon_id': platoon_id,
            'role': role,
            'capabilities': self.get_vehicle_capabilities()
        },
        timestamp=time.time()
    )

    self.broadcast_to_platoon(platoon_message)

def in_platoon(self):
    """Check if vehicle is in an active platoon"""
    return self.platoon_id is not None
def update_platoon_coordination(self):
    """
    Handle platoon-specific communication
    """
    if not self.in_platoon():
        return

    if self.platoon_role == 'leader':
        # Broadcast platoon commands
        platoon_command = {
            'target_speed': self.get_target_speed(),
            'formation': self.get_desired_formation(),
            'next_maneuver': self.get_planned_maneuver()
        }

        message = V2XMessage(
            sender_id=self.vid,
            message_type='platoon_command',
            content=platoon_command,
            timestamp=time.time()
        )

        self.broadcast_to_platoon(message)

    else:  # follower
        # Send status to leader
        follower_status = {
            'following_distance': self.get_following_distance(),
            'readiness': self.assess_maneuver_readiness(),
            'health_status': self.get_system_health()
        }

        message = V2XMessage(
            sender_id=self.vid,
            message_type='platoon_status',
            content=follower_status,
            timestamp=time.time()
        )

        self.send_to_platoon_leader(message)
def execute_coordinated_lane_change(self, direction):
    """
    Coordinate lane change across entire platoon
    """
    if self.platoon_role == 'leader':
        # Initiate coordinated maneuver
        maneuver_plan = {
            'maneuver_type': 'lane_change',
            'direction': direction,
            'execution_time': time.time() + 3.0,  # 3 second prep
            'sequence': self.calculate_lane_change_sequence()
        }

        message = V2XMessage(
            sender_id=self.vid,
            message_type='coordinated_maneuver',
            content=maneuver_plan,
            timestamp=time.time()
        )

        self.broadcast_to_platoon(message)
        return True

    return False  # Only leader can initiate

Features: Leader-follower protocols, coordinated maneuvers, formation control

Integration Examples

Usage Patterns

Common ways to use V2XManager:

# Create V2X manager
v2x_manager = V2XManager(
    cav_world=cav_world,
    config_yaml=config['v2x'],
    vehicle_id=vehicle_id
)

# In simulation loop
while True:
    # Update vehicle state
    v2x_manager.update_info(ego_pos, ego_speed)

    # Discover neighbors
    neighbors = v2x_manager.search()

    # Use neighbor information for cooperative planning
    if neighbors:
        cooperative_plan = plan_with_neighbors(neighbors)
    else:
        cooperative_plan = plan_independently()
# Join platoon as follower
v2x_manager.join_platoon('platoon_001', role='follower')

# Coordination loop
while v2x_manager.in_platoon():
    # Update platoon communication
    v2x_manager.update_platoon_coordination()

    # Get platoon commands
    if v2x_manager.has_platoon_command():
        command = v2x_manager.get_latest_platoon_command()
        target_speed = command['target_speed']
        formation = command['formation']

        # Execute platoon behavior
        behavior_agent.execute_platoon_behavior(target_speed, formation)
# Share perception data via V2X
class CooperativePerceptionManager(PerceptionManager):
    def __init__(self, vehicle, config, ml_manager, v2x_manager):
        super().__init__(vehicle, config, ml_manager)
        self.v2x_manager = v2x_manager

    def detect_with_cooperation(self, ego_pos):
        # Local detection
        local_objects = super().detect(ego_pos)

        # Get shared detections from neighbors
        neighbors = self.v2x_manager.get_neighbors()
        shared_objects = []

        for neighbor_id, neighbor_info in neighbors.items():
            if neighbor_info.get('shared_detections'):
                shared_objects.extend(neighbor_info['shared_detections'])

        # Fuse local and shared detections
        fused_objects = self.fuse_detections(local_objects, shared_objects)

        # Share local detections
        self.v2x_manager.broadcast_detections(local_objects)

        return fused_objects

Related: