# Events & Actions

**Dynamic behavior switching based on real-time environmental context.**

EMOS's Event-Driven architecture enables dynamic behavior switching based on real-time environmental context. This allows robots to react instantly to changes in their internal state or external environment without complex, brittle if/else chains.

## Events

An Event in EMOS monitors a specific **ROS2 Topic** and defines a triggering condition based on the incoming topic data. You can write natural Python expressions (e.g., `topic.msg.data > 5`) to define exactly when an event should trigger the associated Action(s).

- {material-regular}`hub;1.5em;sd-text-primary` Compose Logic - Combine triggers using simple Pythonic syntax (`(lidar_clear) & (goal_seen)`).
- {material-regular}`sync;1.5em;sd-text-primary` Fuse Data - Monitor multiple topics simultaneously via a synchronized **Blackboard** that ensures data freshness.
- {material-regular}`speed;1.5em;sd-text-primary` Stay Fast - All evaluation happens asynchronously in a dedicated worker pool. Your main component loop **never blocks**.

:::{admonition} Think in Behaviors
:class: tip

Events are designed to be read like a sentence: *"If the battery is low AND we are far from home, THEN navigate to the charging dock."*
:::

:::{tip}
Events can be paired with EMOS [`Action`](#actions)(s) or with any standard [ROS2 Launch Action](https://docs.ros.org/en/kilted/Tutorials/Intermediate/Launch/Using-Event-Handlers.html).
:::

### Defining Events

The Event API uses a fluent, expressive syntax that allows you to access ROS2 message attributes directly via `topic.msg`.

#### Basic Single-Topic Event

```python
from ros_sugar.core import Event
from ros_sugar.io import Topic

# 1. Define the Source
# `data_timeout` parameter is optional. It ensures data is considered "stale" after 0.5s
battery = Topic(name="/battery_level", msg_type="Float32", data_timeout=0.5)

# 2. Define the Event
# Triggers when percentage drops below 20%
low_batt_event = Event(battery.msg.data < 20.0)
```

#### Composed Conditions (Logic & Multi-Topic)

You can combine multiple conditions using standard Python bitwise operators (`&`, `|`, `~`) to create complex behavioral triggers.

Events can also span multiple different topics. EMOS automatically manages a "Blackboard" of the latest messages from all involved topics, ensuring synchronization and data "freshness".

- **Example**: Trigger a "Stop" event only if an obstacle is detected AND the robot is currently in "Auto" mode.

```python
from ros_sugar.core import Event
from ros_sugar.io import Topic

lidar_topic = Topic(name="/person_detected", msg_type="Bool", data_timeout=0.5)
status_topic = Topic(name="/robot_mode", msg_type="String", data_timeout=60.0)

# Complex Multi-Topic Condition
emergency_stop_event = Event((lidar_topic.msg.data.is_true()) & (status_topic.msg.data == "AUTO"))
```

:::{admonition} Handling Stale Data
:class: warning

When combining multiple topics, data synchronization is critical. Use the `data_timeout` parameter on your `Topic` definition to ensure you never act on old sensor data.
:::

### Event Configuration

Refine *when* and *how* the event triggers using these parameters:

* {material-regular}`change_circle` On Change (`on_change=True`) - Triggers **only** when the condition transitions from `False` to `True` (Edge Trigger). Useful for state transitions (e.g., "Goal Reached") rather than continuous firing.
* {material-regular}`all_inclusive` On Any (`Topic`) - If you pass the `Topic` object itself as the condition, the event triggers on **every received message**, regardless of content.
* {material-regular}`looks_one` Handle Once (`handle_once=True`) - The event will fire exactly one time during the lifecycle of the system. Useful for initialization sequences.
* {material-regular}`timer` Event Delay (`keep_event_delay=2.0`) - Prevents rapid firing (debouncing).
  Ignores subsequent triggers for the specified duration (in seconds).

### Supported Conditional Operators

You can use standard Python operators or specific helper methods on any topic attribute to define the event triggering condition.

| Operator / Method | Description | Example |
| :--- | :--- | :--- |
| **`==`**, **`!=`** | Equality checks. | `topic.msg.status == "IDLE"` |
| **`>`**, **`>=`**, **`<`**, **`<=`** | Numeric comparisons. | `topic.msg.temperature > 75.0` |
| **`.is_true()`** | Boolean True check. | `topic.msg.is_ready.is_true()` |
| **`.is_false()`**, **`~`** | Boolean False check. | `topic.msg.is_ready.is_false()` or `~topic.msg.is_ready` |
| **`.is_in(list)`** | Value exists in a list. | `topic.msg.mode.is_in(["AUTO", "TELEOP"])` |
| **`.not_in(list)`** | Value is not in a list. | `topic.msg.id.not_in([0, 1])` |
| **`.contains(val)`** | String/List contains a value. | `topic.msg.description.contains("error")` |
| **`.contains_any(list)`** | List contains *at least one* of the values. | `topic.msg.error_codes.contains_any([404, 500])` |
| **`.contains_all(list)`** | List contains *all* of the values. | `topic.msg.detections.labels.contains_all(["window", "desk"])` |
| **`.not_contains_any(list)`** | List contains *none* of the values. | `topic.msg.active_ids.not_contains_any([99, 100])` |

### Event Usage Examples

#### Automatic Adaptation (Terrain Switching)

Scenario: A perception or ML node publishes a string to `/terrain_type`. We want to change the robot's gait when the terrain changes.

```{code-block} python
:caption: quadruped_controller.py
:linenos:

from typing import Literal
from ros_sugar.component import BaseComponent


class QuadrupedController(BaseComponent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Some logic

    def switch_gait_controller(self, controller_type: Literal['stairs', 'sand', 'snow', 'gravel']):
        self.get_logger().info("New terrain detected! Switching gait.")
        # Logic to change controller parameters...
```

```{code-block} python
:caption: quadruped_controller_recipe.py
:linenos:

from my_pkg.components import QuadrupedController
from ros_sugar.core import Event, Action
from ros_sugar.io import Topic
from ros_sugar import Launcher

quad_controller = QuadrupedController(component_name="quadruped_controller")

# Define the Event Topic
terrain_topic = Topic(name="/terrain_type", msg_type="String")

# Define the Event
# Logic: Trigger when the detected terrain changes
# on_change=True ensures we only trigger the switch when the terrain changes, not on every message.
# Add an optional delay to prevent rapid event triggering
event_terrain_changed = Event(terrain_topic, on_change=True, keep_event_delay=60.0)

# Define the Action
# Call quad_controller.switch_gait_controller() when triggered and pass the detected terrain to the method
# Note the trailing comma: `args` must be a tuple
change_gait_action = Action(method=quad_controller.switch_gait_controller, args=(terrain_topic.msg.data,))

# Register
my_launcher = Launcher()
my_launcher.add_pkg(
    components=[quad_controller],
    events_actions={event_terrain_changed: change_gait_action},
)
```

#### Autonomous Drone Safety

Scenario: An autonomous drone **stops** if an obstacle is close OR the bumper is hit. It also raises a return-home event if the battery is low AND the drone is far from the ground (altitude above 100 m).

```python
from ros_sugar.core import Event, Action
from ros_sugar.io import Topic

# --- Topics ---
proximity_sensor = Topic(name="/radar_front", msg_type="Float32", data_timeout=0.2)
bumper = Topic(name="/bumper", msg_type="Bool", data_timeout=0.1)
battery = Topic(name="/battery", msg_type="Float32")
location = Topic(name="/pose", msg_type="Pose")

# --- Conditions ---
# 1. Safety Condition (Composite OR)
# Stop if proximity_sensor < 0.2m OR Bumper is Hit
is_danger = (proximity_sensor.msg.data < 0.2) | (bumper.msg.data.is_true())

# 2. Return Home Condition (Composite AND)
# Return if Battery < 20% AND altitude (z) > 100m
# Message fields are accessed through `.msg`, as with the other topics
needs_return = (battery.msg.data < 20.0) & (location.msg.position.z > 100.0)

# --- Events ---
safety_event = Event(is_danger)
return_event = Event(needs_return, on_change=True)
```

---

## Actions

**Executable context-aware behaviors for your robotic system.**

Actions are not just static function calls; they are **dynamic, context-aware routines** that can adapt their parameters in real-time based on live system data.

They can represent:

- {material-regular}`smart_toy;1.2em;sd-text-primary` Component Behaviors - Routines defined within your components. *e.g., Stopping the robot, executing a motion pattern, or saying a sentence.*
- {material-regular}`settings;1.2em;sd-text-primary` System Behaviors - Lifecycle management, configuration and plumbing. *e.g., Reconfiguring a node, restarting a driver, or re-routing input streams.*
- {material-regular}`extension;1.2em;sd-text-primary` User Custom Behaviors - Arbitrary Python functions. *e.g., Calling an external REST API, logging to a file, or sending a Slack notification.*

### Trigger Mechanisms

Actions sit dormant until activated by one of two mechanisms:

- {material-regular}`flash_on;1.2em;sd-text-primary` Event-Driven (Reflexive) - Triggered instantly when a specific **Event** condition is met. **Example:** "Obstacle Detected" $\rightarrow$ `stop_robot()`
- {material-regular}`healing;1.2em;sd-text-primary` Fallback-Driven (Restorative) - Triggered automatically by a Component when its internal **Health Status** degrades. **Example:** "Camera Driver Failed" $\rightarrow$ `restart_driver()`

### The `Action` Class

At its core, the `Action` class is a wrapper around any Python callable. It packages a function along with its arguments, preparing them for execution at runtime.

But unlike standard Python functions, EMOS Actions possess a superpower: [Dynamic Data Injection](#dynamic-data-injection).
You can bind their arguments directly to live ROS2 Topics, allowing the Action to fetch the latest topic message or a specific message attribute the moment it triggers.

```python
from typing import Callable, Dict, Optional


class Action:
    def __init__(self, method: Callable, args: tuple = (), kwargs: Optional[Dict] = None):
        ...  # Signature only; body omitted
```

- `method`: The function or routine to execute.
- `args`: Positional arguments (can be static values OR dynamic Topic values).
- `kwargs`: Keyword arguments (can be static values OR dynamic Topic values).

### Basic Usage

```python
import logging

from ros_sugar.component import BaseComponent
from ros_sugar.core import Action


def custom_routine():
    logging.info("I am executing an action!")


my_component = BaseComponent(component_name='test_component')

# 1. Component Method
action1 = Action(method=my_component.start)

# 2. Method with keyword arguments
action2 = Action(method=my_component.update_parameter, kwargs={"param_name": "fallback_rate", "new_value": 1000})

# 3. External Function
action3 = Action(method=custom_routine)
```

### Dynamic Data Injection

**This is EMOS's superpower.** You can create complex, context-aware behaviors without writing any "glue code" or custom parsers.

When you bind an Action argument to a `Topic`, the system automatically resolves the binding at runtime, fetching the current value from the topic attributes and injecting it into your function.

#### Example: Cross-Topic Data Access

**Scenario**: An event occurs on Topic 1. You want to log a message that includes the current status from Topic 2 and a sensor reading from Topic 3.

```python
from ros_sugar.core import Event, Action
from ros_sugar.io import Topic

# 1. Define Topics
topic_1 = Topic(name="system_alarm", msg_type="Bool")
topic_2 = Topic(name="robot_mode", msg_type="String")
topic_3 = Topic(name="battery_voltage", msg_type="Float32")

# 2. Define the Event
# Trigger when Topic 1 becomes True
event_on_first_topic = Event(topic_1.msg.data.is_true())


# 3. Define the Target Function
def log_context_message(mode, voltage):
    print(f"System Alarm! Current Mode: {mode}, Voltage: {voltage}V")


# 4. Define the Dynamic Action
# We bind the function arguments directly to the data fields of Topic 2 and Topic 3
my_action = Action(
    method=log_context_message,
    # At runtime, these are replaced by the actual values from the topics
    args=(topic_2.msg.data, topic_3.msg.data)
)
```

### Pre-defined Actions

EMOS provides a suite of pre-defined, thread-safe actions for managing components and system resources via the `ros_sugar.actions` module.

:::{admonition} Import Note
:class: tip

All pre-defined actions take **keyword-only** arguments. They can be imported directly: `from ros_sugar.actions import start, stop, reconfigure`
:::

#### Component-Level Actions

These actions directly manipulate the state or configuration of a specific `BaseComponent` derived object.

| Action Method | Arguments | Description |
| :--- | :--- | :--- |
| **`start`** | `component` | Triggers the component's Lifecycle transition to **Active**. |
| **`stop`** | `component` | Triggers the component's Lifecycle transition to **Inactive**. |
| **`restart`** | `component`<br/>`wait_time` (opt) | Stops the component, waits `wait_time` seconds (default 0), and starts it again. |
| **`reconfigure`** | `component`<br/>`new_config`<br/>`keep_alive` | Reloads the component with a new configuration object or file path.<br/>`keep_alive=True` (default) keeps the node running during update. |
| **`update_parameter`** | `component`<br/>`param_name`<br/>`new_value`<br/>`keep_alive` | Updates a **single** configuration parameter. |
| **`update_parameters`** | `component`<br/>`params_names`<br/>`new_values`<br/>`keep_alive` | Updates **multiple** configuration parameters simultaneously. |
| **`send_component_service_request`** | `component`<br/>`srv_request_msg` | Sends a request to the component's main service with a specific message. |
| **`trigger_component_service`** | `component` | Triggers the component's main service.<br/>Creates the request message dynamically during runtime from the incoming Event topic data. |
| **`send_component_action_server_goal`** | `component`<br/>`request_msg` | Sends a goal to the component's main action server with a specific message. |
| **`trigger_component_action_server`** | `component` | Triggers the component's main action server.<br/>Creates the request message dynamically during runtime from the incoming Event topic data. |

#### System-Level Actions

These actions interact with the broader ROS2 system and are executed by the central `Monitor`.

| Action Method | Arguments | Description |
| :--- | :--- | :--- |
| **`log`** | `msg`<br/>`logger_name` (opt) | Logs a message to the ROS console. |
| **`publish_message`** | `topic`<br/>`msg`<br/>`publish_rate`/`period` | Publishes a specific message to a topic. Can be single-shot or periodic. |
| **`send_srv_request`** | `srv_name`<br/>`srv_type`<br/>`srv_request_msg` | Sends a request to a ROS2 service with a specific message. |
| **`trigger_service`** | `srv_name`<br/>`srv_type` | Triggers a given ROS2 service. |
| **`send_action_goal`** | `server_name`<br/>`server_type`<br/>`request_msg` | Sends a specific goal to a ROS2 action server. |
| **`trigger_action_server`** | `server_name`<br/>`server_type` | Triggers a given ROS2 action server. |

:::{admonition} Automatic Data Conversion
:class: note

When using **`trigger_*`** actions paired with an Event, EMOS attempts to create the required service/action request from the incoming Event topic data automatically via **duck typing**. If automatic conversion is not possible, or if the action is not paired with an Event, it sends a default (empty) request.
:::
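
The duck-typing idea in the note above can be pictured with a small, ROS-free sketch. This is not EMOS's actual implementation: `DetectionMsg`, `TrackRequest`, and `build_request` are hypothetical names invented for illustration, and plain dataclasses stand in for ROS2 message and request types. The sketch assumes that "conversion" means copying every field the request shares (by name and type) with the incoming message, and falling back to a default request otherwise.

```python
from dataclasses import dataclass, fields


@dataclass
class DetectionMsg:
    """Hypothetical stand-in for the message that triggered the Event."""
    label: str = ""
    confidence: float = 0.0


@dataclass
class TrackRequest:
    """Hypothetical stand-in for a service request type."""
    label: str = ""
    timeout: float = 5.0


def build_request(request_type, source=None):
    """Create a default request, then copy compatible fields from `source`."""
    request = request_type()
    if source is None:
        # Not paired with an Event: keep the default (empty) request
        return request
    for field in fields(request_type):
        if not hasattr(source, field.name):
            continue  # The source has no attribute with this name
        value = getattr(source, field.name)
        # Copy only when the value has the same type as the request's default
        if isinstance(value, type(getattr(request, field.name))):
            setattr(request, field.name, value)
    return request


event_msg = DetectionMsg(label="person", confidence=0.9)
print(build_request(TrackRequest, event_msg))  # TrackRequest(label='person', timeout=5.0)
print(build_request(TrackRequest))             # TrackRequest(label='', timeout=5.0)
```

In this sketch only `label` is shared between the two types, so it is the only field copied; `timeout` keeps its default. A source object with no compatible fields yields the same result as calling `build_request` without a source, which mirrors the default-request fallback described in the note.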