Events & Actions

Dynamic behavior switching based on real-time environmental context.

EMOS’s Event-Driven architecture enables dynamic behavior switching based on real-time environmental context. This allows robots to react instantly to changes in their internal state or external environment without complex, brittle if/else chains.

Events

An Event in EMOS monitors one or more ROS2 Topics and defines a triggering condition based on the incoming topic data. You can write natural Python expressions (e.g., topic.msg.data > 5) to define exactly when an event should trigger the associated Action(s).

  • Compose Logic - Combine triggers using simple Pythonic syntax ((lidar_clear) & (goal_seen)).

  • Fuse Data - Monitor multiple topics simultaneously via a synchronized Blackboard that ensures data freshness.

  • Stay Fast - All evaluation happens asynchronously in a dedicated worker pool. Your main component loop never blocks.
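
The non-blocking evaluation mentioned in the last bullet can be pictured with a small standard-library sketch. This only illustrates the general pattern and is not EMOS's internal implementation; `evaluate_condition`, `on_message`, and the pool size are hypothetical names chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for an event condition: battery below 20%.
def evaluate_condition(value: float) -> bool:
    return value < 20.0

triggered = []  # values whose condition evaluated to True

def _record(value: float, future) -> None:
    # Runs once the evaluation has finished.
    if future.result():
        triggered.append(value)

pool = ThreadPoolExecutor(max_workers=2)

def on_message(value: float):
    """Called from the main loop: submits the check and returns immediately."""
    future = pool.submit(evaluate_condition, value)
    future.add_done_callback(lambda f, v=value: _record(v, f))
    return future

futures = [on_message(v) for v in (55.0, 18.5, 42.0, 7.0)]
pool.shutdown(wait=True)  # demo only: wait until every check has completed
```

The caller of `on_message` never waits for the condition to be evaluated; the result is handled in a callback once a worker finishes.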

Think in Behaviors

Events are designed to be read like a sentence: “If the battery is low AND we are far from home, THEN navigate to the charging dock.”

Tip

Events can be paired with EMOS Action(s) or with any standard ROS2 Launch Action.

Defining Events

The Event API uses a fluent, expressive syntax that allows you to access ROS2 message attributes directly via topic.msg.

Basic Single-Topic Event

from ros_sugar.core import Event
from ros_sugar.io import Topic

# 1. Define the Source
# `data_timeout` parameter is optional. It ensures data is considered "stale" after 0.5s
battery = Topic(name="/battery_level", msg_type="Float32", data_timeout=0.5)

# 2. Define the Event
# Triggers when percentage drops below 20%
low_batt_event = Event(battery.msg.data < 20.0)

Composed Conditions (Logic & Multi-Topic)

You can combine multiple conditions using standard Python bitwise operators (&, |, ~) to create complex behavioral triggers. Events can also span multiple different topics. EMOS automatically manages a “Blackboard” of the latest messages from all involved topics, ensuring synchronization and data “freshness”.

  • Example: Trigger a “Stop” event only if an obstacle is detected AND the robot is currently in “Auto” mode.

from ros_sugar.core import Event
from ros_sugar.io import Topic

detection_topic = Topic(name="/person_detected", msg_type="Bool", data_timeout=0.5)
status_topic = Topic(name="/robot_mode", msg_type="String", data_timeout=60.0)

# Complex Multi-Topic Condition
emergency_stop_event = Event((detection_topic.msg.data.is_true()) & (status_topic.msg.data == "AUTO"))
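
To see why conditions are combined with &, | and ~ rather than and, or and not, here is a minimal pure-Python sketch of deferred conditions built with operator overloading. It is an illustration under stated assumptions, not the EMOS implementation: the `Condition` class and the `Snapshot` dictionary are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict

Snapshot = Dict[str, Any]  # hypothetical: latest value per topic name

@dataclass(frozen=True)
class Condition:
    """A deferred boolean test, evaluated later against a snapshot."""
    test: Callable[[Snapshot], bool]

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda s: self.test(s) and other.test(s))

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda s: self.test(s) or other.test(s))

    def __invert__(self) -> "Condition":
        return Condition(lambda s: not self.test(s))

    def __call__(self, snapshot: Snapshot) -> bool:
        return self.test(snapshot)

obstacle = Condition(lambda s: s["/person_detected"] is True)
auto_mode = Condition(lambda s: s["/robot_mode"] == "AUTO")

# Nothing is evaluated here; the operators only build a new Condition.
emergency_stop = obstacle & auto_mode
safe_to_move = ~obstacle | ~auto_mode
```

Python does not allow `and`, `or` and `not` to be overloaded, which is why the bitwise operators are used. Because `&` and `|` bind more tightly than comparisons such as `<` or `==`, each comparison needs its own parentheses.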

Handling Stale Data

When combining multiple topics, data synchronization is critical. Use the data_timeout parameter on your Topic definition to ensure you never act on old sensor data.
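
The freshness idea can be sketched in a few lines of plain Python. This is a simplified model and not the EMOS Blackboard itself: the `Blackboard` class below, its method names, and the explicit `now` argument (used instead of a real clock to keep the example deterministic) are all assumptions made for illustration.

```python
from typing import Any, Dict, Optional, Tuple

class Blackboard:
    """Stores the latest value per topic together with its arrival time."""

    def __init__(self) -> None:
        self._entries: Dict[str, Tuple[Any, float]] = {}
        self._timeouts: Dict[str, Optional[float]] = {}

    def register(self, topic: str, data_timeout: Optional[float] = None) -> None:
        self._timeouts[topic] = data_timeout

    def update(self, topic: str, value: Any, now: float) -> None:
        self._entries[topic] = (value, now)

    def get_fresh(self, topic: str, now: float) -> Optional[Any]:
        """Return the value, or None if it is missing or older than its timeout."""
        if topic not in self._entries:
            return None
        value, stamp = self._entries[topic]
        timeout = self._timeouts.get(topic)
        if timeout is not None and now - stamp > timeout:
            return None
        return value

board = Blackboard()
board.register("/person_detected", data_timeout=0.5)
board.register("/robot_mode", data_timeout=60.0)

board.update("/robot_mode", "AUTO", now=0.0)
board.update("/person_detected", True, now=10.0)

# 0.2 s after the detection both values are still usable.
fresh = (board.get_fresh("/person_detected", now=10.2),
         board.get_fresh("/robot_mode", now=10.2))
# 1.0 s after the detection it has exceeded its 0.5 s timeout.
stale = board.get_fresh("/person_detected", now=11.0)
```

In this model a multi-topic condition would be evaluated only when every involved topic still returns a fresh value, so a fast sensor topic gets a short timeout and a slowly changing status topic a long one.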

Event Configuration

Refine when and how the event triggers using these parameters:

  • On Change (on_change=True) - Triggers only when the condition transitions from False to True (Edge Trigger). Useful for state transitions (e.g., “Goal Reached”) rather than continuous firing.

  • On Any (Topic) - If you pass the Topic object itself as the condition, the event triggers on every received message, regardless of content.

  • Handle Once (handle_once=True) - The event will fire exactly one time during the lifecycle of the system. Useful for initialization sequences.

  • Event Delay (keep_event_delay=2.0) - Prevents rapid firing (debouncing). Ignores subsequent triggers for the specified duration (in seconds).
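
The difference between these options is easiest to see on a timeline. The sketch below models them in plain Python, following the descriptions in the list above. It is an illustration, not the EMOS implementation: `TriggerGate` is a hypothetical class, and details such as how the options interact are assumptions.

```python
from typing import List, Optional

class TriggerGate:
    """Decides whether a condition that is currently true should fire."""

    def __init__(self, on_change: bool = False, handle_once: bool = False,
                 keep_event_delay: float = 0.0) -> None:
        self.on_change = on_change
        self.handle_once = handle_once
        self.keep_event_delay = keep_event_delay
        self._previous = False
        self._fired_once = False
        self._last_fire: Optional[float] = None

    def should_fire(self, condition: bool, now: float) -> bool:
        rising_edge = condition and not self._previous
        self._previous = condition
        if not condition:
            return False
        if self.on_change and not rising_edge:
            return False  # edge trigger: only False -> True transitions
        if self.handle_once and self._fired_once:
            return False  # already handled once
        if (self._last_fire is not None
                and now - self._last_fire < self.keep_event_delay):
            return False  # still inside the delay window
        self._fired_once = True
        self._last_fire = now
        return True

# Condition value sampled at t = 0, 1, 2, 3, 4, 5 seconds.
samples = [False, True, True, True, False, True]

def run(gate: TriggerGate) -> List[bool]:
    return [gate.should_fire(c, float(t)) for t, c in enumerate(samples)]

level = run(TriggerGate())
edge = run(TriggerGate(on_change=True))
once = run(TriggerGate(handle_once=True))
debounced = run(TriggerGate(keep_event_delay=2.0))
```

With the default settings the gate fires on every true sample. With `on_change=True` it fires at t = 1 and t = 5 only, with `handle_once=True` at t = 1 only, and with `keep_event_delay=2.0` at t = 1, 3 and 5.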

Supported Conditional Operators

You can use standard Python operators or specific helper methods on any topic attribute to define the event triggering condition.

| Operator / Method | Description | Example |
| --- | --- | --- |
| ==, != | Equality checks. | topic.msg.status == "IDLE" |
| >, >=, <, <= | Numeric comparisons. | topic.msg.temperature > 75.0 |
| .is_true() | Boolean True check. | topic.msg.is_ready.is_true() |
| .is_false(), ~ | Boolean False check. | topic.msg.is_ready.is_false() or ~topic.msg.is_ready |
| .is_in(list) | Value exists in a list. | topic.msg.mode.is_in(["AUTO", "TELEOP"]) |
| .not_in(list) | Value is not in a list. | topic.msg.id.not_in([0, 1]) |
| .contains(val) | String/List contains a value. | topic.msg.description.contains("error") |
| .contains_any(list) | List contains at least one of the values. | topic.msg.error_codes.contains_any([404, 500]) |
| .contains_all(list) | List contains all of the values. | topic.msg.detections.labels.contains_all(["window", "desk"]) |
| .not_contains_any(list) | List contains none of the values. | topic.msg.active_ids.not_contains_any([99, 100]) |
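
As a reading aid, the membership helpers can be expressed as plain Python predicates. The functions below are hypothetical equivalents written from the descriptions in the table; they are not the ros_sugar implementation, and edge cases (for example empty lists) may be handled differently there.

```python
from typing import Any, Container, Iterable

def is_in(value: Any, options: Container) -> bool:
    return value in options

def not_in(value: Any, options: Container) -> bool:
    return value not in options

def contains(container: Container, value: Any) -> bool:
    # Works for both strings (substring) and lists (membership).
    return value in container

def contains_any(container: Container, values: Iterable) -> bool:
    return any(v in container for v in values)

def contains_all(container: Container, values: Iterable) -> bool:
    return all(v in container for v in values)

def not_contains_any(container: Container, values: Iterable) -> bool:
    return not contains_any(container, values)

# Sample message field values, made up for the example.
description = "motor error: overheat"
error_codes = [200, 500]
labels = ["window", "desk", "chair"]
active_ids = [1, 2, 3]
```

For instance, `contains_any(error_codes, [404, 500])` is true because 500 is present, while `contains_all(labels, ["window", "door"])` is false because "door" is missing.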

Event Usage Examples

Automatic Adaptation (Terrain Switching)

Scenario: A perception or ML node publishes a string to /terrain_type. We want to change the robot’s gait when the terrain changes.

quadruped_controller.py
from typing import Literal
from ros_sugar.component import BaseComponent

class QuadrupedController(BaseComponent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Some logic

    def switch_gait_controller(self, controller_type: Literal['stairs', 'sand', 'snow', 'gravel']):
        self.get_logger().info("New terrain detected! Switching gait.")
        # Logic to change controller parameters...
quadruped_controller_recipe.py
from my_pkg.components import QuadrupedController
from ros_sugar.core import Event, Action
from ros_sugar.io import Topic
from ros_sugar import Launcher

quad_controller = QuadrupedController(component_name="quadruped_controller")

# Define the Event Topic
terrain_topic = Topic(name="/terrain_type", msg_type="String")

# Define the Event
# Logic: Trigger when the detected terrain changes
# on_change=True is meant to trigger only when the terrain value changes, not on every message.
# keep_event_delay adds an optional delay (in seconds) to prevent rapid event triggering
event_terrain_changed = Event(terrain_topic, on_change=True, keep_event_delay=60.0)

# Define the Action
# Call quad_controller.switch_gait_controller() when triggered and pass the detected terrain to the method
# Note the trailing comma: args must be a tuple
change_gait_action = Action(method=quad_controller.switch_gait_controller, args=(terrain_topic.msg.data,))

# Register
my_launcher = Launcher()
my_launcher.add_pkg(
    components=[quad_controller],
    events_actions={event_terrain_changed: change_gait_action},
)

Autonomous Drone Safety

Scenario: An autonomous drone stops if an obstacle is close OR the bumper is hit. It also sends a warning if the battery is low AND the drone is far from the ground (altitude above 100 m).

from ros_sugar.core import Event, Action
from ros_sugar.io import Topic

# --- Topics ---
proximity_sensor = Topic(name="/radar_front", msg_type="Float32", data_timeout=0.2)
bumper = Topic(name="/bumper", msg_type="Bool", data_timeout=0.1)
battery = Topic(name="/battery", msg_type="Float32")
location = Topic(name="/pose", msg_type="Pose")

# --- Conditions ---
# 1. Safety Condition (Composite OR)
# Stop if proximity_sensor < 0.2m OR Bumper is Hit
is_danger = (proximity_sensor.msg.data < 0.2) | (bumper.msg.data.is_true())

# 2. Return Home Condition (Composite AND)
# Return if Battery < 20% AND altitude (z) > 100m
# Message fields are accessed through `.msg`, as with the other topics
needs_return = (battery.msg.data < 20.0) & (location.msg.position.z > 100.0)

# --- Events ---
safety_event = Event(is_danger)

return_event = Event(needs_return, on_change=True)

Actions

Executable context-aware behaviors for your robotic system.

Actions are not just static function calls; they are dynamic, context-aware routines that can adapt their parameters in real-time based on live system data.

They can represent:

  • Component Behaviors - Routines defined within your components, e.g., stopping the robot, executing a motion pattern, or saying a sentence.

  • System Behaviors - Lifecycle management, configuration, and plumbing, e.g., reconfiguring a node, restarting a driver, or re-routing input streams.

  • User Custom Behaviors - Arbitrary Python functions, e.g., calling an external REST API, logging to a file, or sending a Slack notification.

Trigger Mechanisms

Actions sit dormant until activated by one of two mechanisms:

  • Event-Driven (Reflexive) - Triggered instantly when a specific Event condition is met. Example: “Obstacle Detected” → stop_robot()

  • Fallback-Driven (Restorative) - Triggered automatically by a Component when its internal Health Status degrades. Example: “Camera Driver Failed” → restart_driver()

The Action Class

At its core, the Action class is a wrapper around any Python callable. It packages a function along with its arguments, preparing them for execution at runtime.

But unlike standard Python functions, EMOS Actions possess a superpower: Dynamic Data Injection. You can bind their arguments directly to live ROS2 Topics, allowing the Action to fetch the latest topic message, or a specific field of that message, the moment it triggers.

from typing import Callable, Dict, Optional

class Action:
    def __init__(self, method: Callable, args: tuple = (), kwargs: Optional[Dict] = None):
        ...

  • method: The function or routine to execute.

  • args: Positional arguments (can be static values OR dynamic Topic values).

  • kwargs: Keyword arguments (can be static values OR dynamic Topic values).

Basic Usage

from ros_sugar.component import BaseComponent
from ros_sugar.core import Action
import logging

def custom_routine():
    logging.info("I am executing an action!")

my_component = BaseComponent(component_name='test_component')

# 1. Component Method
action1 = Action(method=my_component.start)

# 2. Method with keyword arguments
action2 = Action(method=my_component.update_parameter, kwargs={"param_name": "fallback_rate", "new_value": 1000})

# 3. External Function
action3 = Action(method=custom_routine)

Dynamic Data Injection

This is EMOS’s superpower.

You can create complex, context-aware behaviors without writing any “glue code” or custom parsers.

When you bind an Action argument to a Topic, the system automatically resolves the binding at runtime, fetching the current value from the topic attributes and injecting it into your function.

Example: Cross-Topic Data Access

Scenario: An event occurs on Topic 1. You want to log a message that includes the current status from Topic 2 and a sensor reading from Topic 3.

from ros_sugar.core import Event, Action
from ros_sugar.io import Topic

# 1. Define Topics
topic_1 = Topic(name="system_alarm", msg_type="Bool")
topic_2 = Topic(name="robot_mode", msg_type="String")
topic_3 = Topic(name="battery_voltage", msg_type="Float32")

# 2. Define the Event
# Trigger when Topic 1 becomes True
event_on_first_topic = Event(topic_1.msg.data.is_true())

# 3. Define the Target Function
def log_context_message(mode, voltage):
    print(f"System Alarm! Current Mode: {mode}, Voltage: {voltage}V")

# 4. Define the Dynamic Action
# We bind the function arguments directly to the data fields of Topic 2 and Topic 3
my_action = Action(
    method=log_context_message,
    # At runtime, these are replaced by the actual values from the topics
    args=(topic_2.msg.data, topic_3.msg.data)
)
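
To make the mechanism less magical, here is a minimal pure-Python sketch of deferred argument resolution. It only models the idea and is not how ros_sugar implements it: `TopicField`, `LazyAction`, and the `latest` dictionary are hypothetical stand-ins for topic bindings, the Action class, and the latest received messages.

```python
from typing import Any, Callable, Dict, Optional, Tuple

class TopicField:
    """Placeholder meaning 'the latest value of this topic', resolved later."""

    def __init__(self, topic_name: str) -> None:
        self.topic_name = topic_name

class LazyAction:
    """Wraps a callable whose arguments may be static values or TopicFields."""

    def __init__(self, method: Callable, args: Tuple = (),
                 kwargs: Optional[Dict[str, Any]] = None) -> None:
        self.method = method
        self.args = args
        self.kwargs = kwargs or {}

    @staticmethod
    def _resolve(value: Any, latest: Dict[str, Any]) -> Any:
        # Placeholders are swapped for live data; static values pass through.
        if isinstance(value, TopicField):
            return latest[value.topic_name]
        return value

    def __call__(self, latest: Dict[str, Any]) -> Any:
        args = tuple(self._resolve(a, latest) for a in self.args)
        kwargs = {k: self._resolve(v, latest) for k, v in self.kwargs.items()}
        return self.method(*args, **kwargs)

def format_context_message(mode, voltage, prefix="System Alarm!"):
    return f"{prefix} Current Mode: {mode}, Voltage: {voltage}V"

action = LazyAction(
    format_context_message,
    args=(TopicField("robot_mode"), TopicField("battery_voltage")),
    kwargs={"prefix": "Alarm!"},  # static values are passed unchanged
)

latest = {"robot_mode": "AUTO", "battery_voltage": 11.7}
first = action(latest)

latest["battery_voltage"] = 10.9  # a newer message arrives
second = action(latest)
```

The same action object produces a different message on each call because its arguments are looked up at execution time, not when the action is defined.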

Pre-defined Actions

EMOS provides a suite of pre-defined, thread-safe actions for managing components and system resources via the ros_sugar.actions module.

Import Note

All pre-defined actions take keyword-only arguments. They can be imported directly: from ros_sugar.actions import start, stop, reconfigure

Component-Level Actions

These actions directly manipulate the state or configuration of a specific BaseComponent derived object.

| Action Method | Arguments | Description |
| --- | --- | --- |
| start | component | Triggers the component’s Lifecycle transition to Active. |
| stop | component | Triggers the component’s Lifecycle transition to Inactive. |
| restart | component, wait_time (opt) | Stops the component, waits wait_time seconds (default 0), and Starts it again. |
| reconfigure | component, new_config, keep_alive | Reloads the component with a new configuration object or file path. keep_alive=True (default) keeps the node running during update. |
| update_parameter | component, param_name, new_value, keep_alive | Updates a single configuration parameter. |
| update_parameters | component, params_names, new_values, keep_alive | Updates multiple configuration parameters simultaneously. |
| send_component_service_request | component, srv_request_msg | Sends a request to the component’s main service with a specific message. |
| trigger_component_service | component | Triggers the component’s main service. Creates the request message dynamically during runtime from the incoming Event topic data. |
| send_component_action_server_goal | component, request_msg | Sends a goal to the component’s main action server with a specific message. |
| trigger_component_action_server | component | Triggers the component’s main action server. Creates the request message dynamically during runtime from the incoming Event topic data. |

System-Level Actions

These actions interact with the broader ROS2 system and are executed by the central Monitor.

| Action Method | Arguments | Description |
| --- | --- | --- |
| log | msg, logger_name (opt) | Logs a message to the ROS console. |
| publish_message | topic, msg, publish_rate/period | Publishes a specific message to a topic. Can be single-shot or periodic. |
| send_srv_request | srv_name, srv_type, srv_request_msg | Sends a request to a ROS2 service with a specific message. |
| trigger_service | srv_name, srv_type | Triggers a given ROS2 service. |
| send_action_goal | server_name, server_type, request_msg | Sends a specific goal to a ROS2 action server. |
| trigger_action_server | server_name, server_type | Triggers a given ROS2 action server. |

Automatic Data Conversion

When using trigger_* actions paired with an Event, EMOS attempts to create the required service/action request from the incoming Event topic data automatically via duck typing.

If automatic conversion is not possible, or if the action is not paired with an Event, it sends a default (empty) request.
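
The duck-typing idea can be sketched as "copy every field whose name exists on both sides". The example below is an assumption-laden illustration, not the EMOS conversion logic: `PoseMsg`, `GoToRequest`, and `build_request` are hypothetical, and plain dataclasses stand in for ROS2 message and request types.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional

@dataclass
class PoseMsg:
    """Hypothetical incoming Event topic message."""
    x: float = 0.0
    y: float = 0.0
    frame_id: str = ""

@dataclass
class GoToRequest:
    """Hypothetical service request type."""
    x: float = 0.0
    y: float = 0.0
    speed: float = 0.0

def build_request(request_type: type, event_msg: Optional[Any] = None) -> Any:
    """Fill a default request with same-named attributes of the event message."""
    request = request_type()  # default (empty) request
    if event_msg is None:
        return request  # not paired with an Event: nothing to copy
    for field in fields(request_type):
        if hasattr(event_msg, field.name):
            setattr(request, field.name, getattr(event_msg, field.name))
    return request

from_event = build_request(GoToRequest, PoseMsg(x=1.0, y=2.0, frame_id="map"))
without_event = build_request(GoToRequest)
```

In this sketch `from_event` carries `x` and `y` from the message while `speed` keeps its default, and `without_event` is an empty request, mirroring the fallback behavior described above.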