Context-Aware Actions

In previous recipes, our actions used static arguments – pre-defined at configuration time. For example, in Self-Healing with Fallbacks, we defined Action(method=controller.set_algorithm, args=(ControllersID.PURE_PURSUIT,)) where the target algorithm is hardcoded.

But what if the action depends on what the robot is seeing, or where it was told to go? Real-world autonomy requires dynamic data injection – action arguments fetched from the system at the time of execution.


The Concept: Static vs Dynamic

Type

Argument Set At

Example

Static

Configuration time

args=(ControllersID.PURE_PURSUIT,)

Dynamic

Event firing time

args=(command_topic.msg.data,)

With dynamic injection, you pass a topic message field as an argument. EMOS resolves the actual value when the event fires, not when the recipe is written.



Intelligence Example: Dynamic Prompt Injection

The same pattern works for the intelligence layer. Consider a Vision component that detects objects, and a VLM that should describe whatever was detected – not just “person”:

from agents.ros import Topic, Event, Action, FixedInput
from agents.components import Vision, VLM

# Vision outputs
detections = Topic(name="/detections", msg_type="Detections")
camera_image = Topic(name="/image_raw", msg_type="Image")

# Event: any object detected
event_object_detected = Event(
    detections.msg.labels.length() > 0,
    on_change=True,
    keep_event_delay=5
)

# Dynamic prompt: inject the detected label into the VLM query
def describe_detected_object(label: str):
    """Called with the actual detected label at event time."""
    return f"A {label} has been detected. Describe what you see."

action_describe = Action(
    method=describe_detected_object,
    args=(detections.msg.labels[0],)  # First detected label, resolved dynamically
)

Complete Navigation Recipe

Launch this script, then publish a string to /user_command (e.g., ros2 topic pub /user_command std_msgs/String "data: kitchen" --once) to see the robot navigate.

semantic_navigation.py
 1import os
 2import subprocess
 3import numpy as np
 4
 5from sugar.msg import ComponentStatus
 6from kompass.components import (
 7    Controller, DriveManager, Planner, PlannerConfig, LocalMapper,
 8)
 9from kompass.config import RobotConfig
10from kompass.control import ControllersID
11from kompass.robot import (
12    AngularCtrlLimits, LinearCtrlLimits, RobotGeometry, RobotType,
13)
14from kompass.ros import Topic, Launcher, Action, Event
15
16# --- Waypoint Database ---
17WAYPOINTS = {
18    "kitchen":   {"x": 2.0, "y": 0.5},
19    "reception": {"x": 0.0, "y": 0.0},
20    "station_a": {"x": -1.5, "y": 2.0},
21}
22
23def navigate_to_location(location_name: str):
24    key = location_name.strip().lower()
25    if key not in WAYPOINTS:
26        print(f"Unknown location: {key}")
27        return
28    coords = WAYPOINTS[key]
29    topic_cmd = (
30        f"ros2 topic pub --once /clicked_point geometry_msgs/msg/PointStamped "
31        f"'{{header: {{frame_id: \"map\"}}, point: {{x: {coords['x']}, y: {coords['y']}, z: 0.0}}}}'"
32    )
33    subprocess.run(topic_cmd, shell=True)
34
35# --- Command Topic ---
36command_topic = Topic(name="/user_command", msg_type="String")
37
38# --- Robot Configuration ---
39my_robot = RobotConfig(
40    model_type=RobotType.DIFFERENTIAL_DRIVE,
41    geometry_type=RobotGeometry.Type.CYLINDER,
42    geometry_params=np.array([0.1, 0.3]),
43    ctrl_vx_limits=LinearCtrlLimits(max_vel=0.2, max_acc=1.5, max_decel=2.5),
44    ctrl_omega_limits=AngularCtrlLimits(
45        max_vel=0.4, max_acc=2.0, max_decel=2.0, max_steer=np.pi / 3
46    ),
47)
48
49# --- Components ---
50planner = Planner(component_name="planner", config=PlannerConfig(loop_rate=1.0))
51goal_topic = Topic(name="/clicked_point", msg_type="PointStamped")
52planner.inputs(goal_point=goal_topic)
53
54controller = Controller(component_name="controller")
55controller.direct_sensor = False
56controller.algorithm = ControllersID.DWA
57
58mapper = LocalMapper(component_name="mapper")
59driver = DriveManager(component_name="drive_manager")
60
61if os.environ.get("ROS_DISTRO") in ["rolling", "jazzy", "kilted"]:
62    cmd_msg_type = "TwistStamped"
63else:
64    cmd_msg_type = "Twist"
65driver.outputs(robot_command=Topic(name="/cmd_vel", msg_type=cmd_msg_type))
66
67# --- Context-Aware Event & Action ---
68event_command_received = Event(
69    event_condition=(
70        command_topic
71        & (mapper.status_topic.msg.status == ComponentStatus.STATUS_HEALTHY)
72    ),
73)
74
75action_process_command = Action(
76    method=navigate_to_location,
77    args=(command_topic.msg.data,)  # Dynamic injection
78)
79
80events_actions = {
81    event_command_received: action_process_command,
82}
83
84# --- Launch ---
85launcher = Launcher()
86launcher.kompass(
87    components=[planner, controller, driver, mapper],
88    activate_all_components_on_start=True,
89    multi_processing=True,
90    events_actions=events_actions,
91)
92
93odom_topic = Topic(name="/odometry/filtered", msg_type="Odometry")
94launcher.inputs(location=odom_topic)
95launcher.robot = my_robot
96launcher.bringup()