Cross-Component Healing

In the Self-Healing with Fallbacks recipe, we learned how a component can heal itself (e.g., restarting or switching algorithms). But sophisticated autonomy requires more than self-repair – it requires system-level awareness, where components monitor each other and take corrective action.

In this recipe, we use Events to implement cross-component healing: one component detects a failure, and a different component executes the recovery.


Scenario A: The “Unstuck” Reflex

The Controller gets stuck in a local minimum (e.g., the robot is facing a corner). It reports an ALGORITHM_FAILURE because it cannot find a valid velocity command. We detect this status and ask the DriveManager to execute a blind “Unblock” maneuver – rotate in place or back up.

Tip

All component health status topics are accessible via component.status_topic.

Define the Event and Action

from kompass.ros import Event, Action, Topic
from sugar.msg import ComponentStatus

# Event: Controller reports algorithm failure
# keep_event_delay prevents re-triggering while recovery is in progress
event_controller_fail = Event(
    controller.status_topic.msg.status
    == ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
    keep_event_delay=60.0
)

# Action: DriveManager executes a recovery maneuver
unblock_action = Action(method=driver.move_to_unblock)

The keep_event_delay=60.0 ensures the unblock action fires at most once per minute, giving the controller time to recover before trying again.


Scenario B: The “Blind Mode” Reflex

The LocalMapper crashes, failing to provide the high-fidelity local map that the Controller depends on. Instead of halting, the Controller reconfigures itself to use raw sensor data directly (reactive mode).

from kompass.actions import update_parameter

# Event: Mapper is NOT healthy
# handle_once=True means this fires only ONCE during the system's lifetime
event_mapper_fault = Event(
    mapper.status_topic.msg.status != ComponentStatus.STATUS_HEALTHY,
    handle_once=True
)

# Action: Reconfigure Controller to bypass the mapper
activate_direct_sensor_mode = update_parameter(
    component=controller,
    param_name="use_direct_sensor",
    new_value=True
)

Scenario C: Goal Handling via Events

In a production system, goals often arrive from external interfaces like RViz rather than being hardcoded. Events bridge the gap: we listen for clicked points and forward them to the Planner’s ActionServer.

Define the Goal Event

from kompass import event
from kompass.actions import ComponentActions

# Fire whenever a new PointStamped arrives on /clicked_point
event_clicked_point = event.OnGreater(
    "rviz_goal",
    Topic(name="/clicked_point", msg_type="PointStamped"),
    0,
    ["header", "stamp", "sec"],
)

Define the Goal Action with a Parser

The clicked point message needs to be converted into a PlanPath.Goal. We write a parser function and attach it to the action:

from kompass_interfaces.action import PlanPath
from kompass_interfaces.msg import PathTrackingError
from geometry_msgs.msg import Pose, PointStamped
from kompass.actions import LogInfo

# Create the action server goal action
send_goal = ComponentActions.send_action_goal(
    action_name="/planner/plan_path",
    action_type=PlanPath,
    action_request_msg=PlanPath.Goal(),
)

# Parse PointStamped into PlanPath.Goal
def goal_point_parser(*, msg: PointStamped, **_):
    action_request = PlanPath.Goal()
    goal = Pose()
    goal.position.x = msg.point.x
    goal.position.y = msg.point.y
    action_request.goal = goal
    end_tolerance = PathTrackingError()
    end_tolerance.orientation_error = 0.2
    end_tolerance.lateral_distance_error = 0.05
    action_request.end_tolerance = end_tolerance
    return action_request

send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")

Tip

ComponentActions.send_srv_request and ComponentActions.send_action_goal let you call any ROS 2 service or action server from an event – not just EMOS services.


Wiring Events to Actions

With all events and actions defined, we assemble the event-action dictionary. Each event maps to one or more actions:

events_actions = {
    # RViz click -> log + send goal to planner
    event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
    # Controller stuck -> unblock maneuver
    event_controller_fail: unblock_action,
    # Mapper down -> switch controller to direct sensor mode
    event_mapper_fault: activate_direct_sensor_mode,
}

Complete Recipe

cross_component_healing.py
  1import numpy as np
  2import os
  3
  4from sugar.msg import ComponentStatus
  5from kompass_interfaces.action import PlanPath
  6from kompass_interfaces.msg import PathTrackingError
  7from geometry_msgs.msg import Pose, PointStamped
  8
  9from kompass import event
 10from kompass.actions import Action, ComponentActions, LogInfo, update_parameter
 11from kompass.components import (
 12    Controller, DriveManager, Planner, PlannerConfig, LocalMapper,
 13)
 14from kompass.config import RobotConfig
 15from kompass.robot import (
 16    AngularCtrlLimits, LinearCtrlLimits, RobotGeometry, RobotType,
 17)
 18from kompass.ros import Topic, Launcher, Event
 19
 20# --- Robot Configuration ---
 21my_robot = RobotConfig(
 22    model_type=RobotType.DIFFERENTIAL_DRIVE,
 23    geometry_type=RobotGeometry.Type.CYLINDER,
 24    geometry_params=np.array([0.1, 0.3]),
 25    ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
 26    ctrl_omega_limits=AngularCtrlLimits(
 27        max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3
 28    ),
 29)
 30
 31# --- Components ---
 32planner = Planner(component_name="planner", config=PlannerConfig(loop_rate=1.0))
 33planner.run_type = "ActionServer"
 34
 35controller = Controller(component_name="controller")
 36controller.direct_sensor = False
 37
 38mapper = LocalMapper(component_name="mapper")
 39driver = DriveManager(component_name="drive_manager")
 40
 41if os.environ.get("ROS_DISTRO") in ["rolling", "jazzy", "kilted"]:
 42    cmd_msg_type = "TwistStamped"
 43else:
 44    cmd_msg_type = "Twist"
 45
 46driver.outputs(robot_command=Topic(name="/cmd_vel", msg_type=cmd_msg_type))
 47
 48# --- Cross-Component Events ---
 49# 1. Controller stuck -> DriveManager unblocks
 50event_controller_fail = Event(
 51    controller.status_topic.msg.status
 52    == ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
 53    keep_event_delay=60.0
 54)
 55unblock_action = Action(method=driver.move_to_unblock)
 56
 57# 2. Mapper down -> Controller switches to direct sensor mode
 58event_mapper_fault = Event(
 59    mapper.status_topic.msg.status != ComponentStatus.STATUS_HEALTHY,
 60    handle_once=True
 61)
 62activate_direct_sensor_mode = update_parameter(
 63    component=controller, param_name="use_direct_sensor", new_value=True
 64)
 65
 66# 3. RViz click -> Planner goal
 67event_clicked_point = event.OnGreater(
 68    "rviz_goal",
 69    Topic(name="/clicked_point", msg_type="PointStamped"),
 70    0, ["header", "stamp", "sec"],
 71)
 72
 73send_goal = ComponentActions.send_action_goal(
 74    action_name="/planner/plan_path",
 75    action_type=PlanPath,
 76    action_request_msg=PlanPath.Goal(),
 77)
 78
 79def goal_point_parser(*, msg: PointStamped, **_):
 80    action_request = PlanPath.Goal()
 81    goal = Pose()
 82    goal.position.x = msg.point.x
 83    goal.position.y = msg.point.y
 84    action_request.goal = goal
 85    end_tolerance = PathTrackingError()
 86    end_tolerance.orientation_error = 0.2
 87    end_tolerance.lateral_distance_error = 0.05
 88    action_request.end_tolerance = end_tolerance
 89    return action_request
 90
 91send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
 92
 93# --- Wire Events -> Actions ---
 94events_actions = {
 95    event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
 96    event_controller_fail: unblock_action,
 97    event_mapper_fault: activate_direct_sensor_mode,
 98}
 99
100# --- Launch ---
101odom_topic = Topic(name="/odometry/filtered", msg_type="Odometry")
102
103launcher = Launcher()
104launcher.kompass(
105    components=[planner, controller, mapper, driver],
106    events_actions=events_actions,
107    activate_all_components_on_start=True,
108    multi_processing=True,
109)
110launcher.inputs(location=odom_topic)
111launcher.robot = my_robot
112launcher.bringup()