Cross-Component Healing¶
In the Self-Healing with Fallbacks recipe, we learned how a component can heal itself (e.g., restarting or switching algorithms). But sophisticated autonomy requires more than self-repair – it requires system-level awareness, where components monitor each other and take corrective action.
In this recipe, we use Events to implement cross-component healing: one component detects a failure, and a different component executes the recovery.
Scenario A: The “Unstuck” Reflex¶
The Controller gets stuck in a local minimum (e.g., the robot is facing a corner). It reports an ALGORITHM_FAILURE because it cannot find a valid velocity command. We detect this status and ask the DriveManager to execute a blind “Unblock” maneuver – rotate in place or back up.
Tip
All component health status topics are accessible via component.status_topic.
Define the Event and Action¶
from kompass.ros import Event, Action, Topic
from sugar.msg import ComponentStatus
# Event: Controller reports algorithm failure
# keep_event_delay prevents re-triggering while recovery is in progress
event_controller_fail = Event(
controller.status_topic.msg.status
== ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
keep_event_delay=60.0
)
# Action: DriveManager executes a recovery maneuver
unblock_action = Action(method=driver.move_to_unblock)
The keep_event_delay=60.0 ensures the unblock action fires at most once per minute, giving the controller time to recover before trying again.
Scenario B: The “Blind Mode” Reflex¶
The LocalMapper crashes, failing to provide the high-fidelity local map that the Controller depends on. Instead of halting, the Controller reconfigures itself to use raw sensor data directly (reactive mode).
from kompass.actions import update_parameter
# Event: Mapper is NOT healthy
# handle_once=True means this fires only ONCE during the system's lifetime
event_mapper_fault = Event(
mapper.status_topic.msg.status != ComponentStatus.STATUS_HEALTHY,
handle_once=True
)
# Action: Reconfigure Controller to bypass the mapper
activate_direct_sensor_mode = update_parameter(
component=controller,
param_name="use_direct_sensor",
new_value=True
)
Scenario C: Goal Handling via Events¶
In a production system, goals often arrive from external interfaces like RViz rather than being hardcoded. Events bridge the gap: we listen for clicked points and forward them to the Planner’s ActionServer.
Define the Goal Event¶
from kompass import event
from kompass.actions import ComponentActions
# Fire whenever a new PointStamped arrives on /clicked_point
event_clicked_point = event.OnGreater(
"rviz_goal",
Topic(name="/clicked_point", msg_type="PointStamped"),
0,
["header", "stamp", "sec"],
)
Define the Goal Action with a Parser¶
The clicked point message needs to be converted into a PlanPath.Goal. We write a parser function and attach it to the action:
from kompass_interfaces.action import PlanPath
from kompass_interfaces.msg import PathTrackingError
from geometry_msgs.msg import Pose, PointStamped
from kompass.actions import LogInfo
# Create the action server goal action
send_goal = ComponentActions.send_action_goal(
action_name="/planner/plan_path",
action_type=PlanPath,
action_request_msg=PlanPath.Goal(),
)
# Parse PointStamped into PlanPath.Goal
def goal_point_parser(*, msg: PointStamped, **_):
action_request = PlanPath.Goal()
goal = Pose()
goal.position.x = msg.point.x
goal.position.y = msg.point.y
action_request.goal = goal
end_tolerance = PathTrackingError()
end_tolerance.orientation_error = 0.2
end_tolerance.lateral_distance_error = 0.05
action_request.end_tolerance = end_tolerance
return action_request
send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
Tip
ComponentActions.send_srv_request and ComponentActions.send_action_goal let you call any ROS 2 service or action server from an event – not just EMOS services.
Wiring Events to Actions¶
With all events and actions defined, we assemble the event-action dictionary. Each event maps to one or more actions:
events_actions = {
# RViz click -> log + send goal to planner
event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
# Controller stuck -> unblock maneuver
event_controller_fail: unblock_action,
# Mapper down -> switch controller to direct sensor mode
event_mapper_fault: activate_direct_sensor_mode,
}
Complete Recipe¶
1import numpy as np
2import os
3
4from sugar.msg import ComponentStatus
5from kompass_interfaces.action import PlanPath
6from kompass_interfaces.msg import PathTrackingError
7from geometry_msgs.msg import Pose, PointStamped
8
9from kompass import event
10from kompass.actions import Action, ComponentActions, LogInfo, update_parameter
11from kompass.components import (
12 Controller, DriveManager, Planner, PlannerConfig, LocalMapper,
13)
14from kompass.config import RobotConfig
15from kompass.robot import (
16 AngularCtrlLimits, LinearCtrlLimits, RobotGeometry, RobotType,
17)
18from kompass.ros import Topic, Launcher, Event
19
20# --- Robot Configuration ---
21my_robot = RobotConfig(
22 model_type=RobotType.DIFFERENTIAL_DRIVE,
23 geometry_type=RobotGeometry.Type.CYLINDER,
24 geometry_params=np.array([0.1, 0.3]),
25 ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
26 ctrl_omega_limits=AngularCtrlLimits(
27 max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3
28 ),
29)
30
31# --- Components ---
32planner = Planner(component_name="planner", config=PlannerConfig(loop_rate=1.0))
33planner.run_type = "ActionServer"
34
35controller = Controller(component_name="controller")
36controller.direct_sensor = False
37
38mapper = LocalMapper(component_name="mapper")
39driver = DriveManager(component_name="drive_manager")
40
41if os.environ.get("ROS_DISTRO") in ["rolling", "jazzy", "kilted"]:
42 cmd_msg_type = "TwistStamped"
43else:
44 cmd_msg_type = "Twist"
45
46driver.outputs(robot_command=Topic(name="/cmd_vel", msg_type=cmd_msg_type))
47
48# --- Cross-Component Events ---
49# 1. Controller stuck -> DriveManager unblocks
50event_controller_fail = Event(
51 controller.status_topic.msg.status
52 == ComponentStatus.STATUS_FAILURE_ALGORITHM_LEVEL,
53 keep_event_delay=60.0
54)
55unblock_action = Action(method=driver.move_to_unblock)
56
57# 2. Mapper down -> Controller switches to direct sensor mode
58event_mapper_fault = Event(
59 mapper.status_topic.msg.status != ComponentStatus.STATUS_HEALTHY,
60 handle_once=True
61)
62activate_direct_sensor_mode = update_parameter(
63 component=controller, param_name="use_direct_sensor", new_value=True
64)
65
66# 3. RViz click -> Planner goal
67event_clicked_point = event.OnGreater(
68 "rviz_goal",
69 Topic(name="/clicked_point", msg_type="PointStamped"),
70 0, ["header", "stamp", "sec"],
71)
72
73send_goal = ComponentActions.send_action_goal(
74 action_name="/planner/plan_path",
75 action_type=PlanPath,
76 action_request_msg=PlanPath.Goal(),
77)
78
79def goal_point_parser(*, msg: PointStamped, **_):
80 action_request = PlanPath.Goal()
81 goal = Pose()
82 goal.position.x = msg.point.x
83 goal.position.y = msg.point.y
84 action_request.goal = goal
85 end_tolerance = PathTrackingError()
86 end_tolerance.orientation_error = 0.2
87 end_tolerance.lateral_distance_error = 0.05
88 action_request.end_tolerance = end_tolerance
89 return action_request
90
91send_goal.event_parser(goal_point_parser, output_mapping="action_request_msg")
92
93# --- Wire Events -> Actions ---
94events_actions = {
95 event_clicked_point: [LogInfo(msg="Got new goal point"), send_goal],
96 event_controller_fail: unblock_action,
97 event_mapper_fault: activate_direct_sensor_mode,
98}
99
100# --- Launch ---
101odom_topic = Topic(name="/odometry/filtered", msg_type="Odometry")
102
103launcher = Launcher()
104launcher.kompass(
105 components=[planner, controller, mapper, driver],
106 events_actions=events_actions,
107 activate_all_components_on_start=True,
108 multi_processing=True,
109)
110launcher.inputs(location=odom_topic)
111launcher.robot = my_robot
112launcher.bringup()