Documentation Index Fetch the complete documentation index at: https://docs.roboticks.io/llms.txt
Use this file to discover all available pages before exploring further.
Messaging System
The Roboticks SDK provides a distributed publish/subscribe messaging system for inter-module communication.
Overview
Modules communicate by publishing messages to topics and subscribing to receive messages from topics. This decouples modules and enables flexible system composition.
Defining Ports in YAML
The easiest way to create publishers and subscribers is through YAML configuration:
module :
tasks :
- name : ProcessorTask
inputs :
- name : sensor_data
topic : "/sensors/lidar"
message_type : "roboticks.messages.sensors.PointCloud"
qos : "BEST_EFFORT"
outputs :
- name : processed
topic : "/processed/data"
message_type : "roboticks.messages.sensors.PointCloud"
qos : "RELIABLE"
This generates:
sensor_data_input_ - A subscriber to /sensors/lidar
processed_output_ - A publisher to /processed/data
Using Ports in Code
Publishing Messages
void MyTask :: onUpdate () {
// Create message
roboticks :: messages :: sensors ::PointCloud msg;
msg . timestamp = getCurrentTime ();
msg . points = computePoints ();
// Publish using auto-generated port
processed_output_ -> publish (msg);
}
Receiving Messages
void MyTask :: onUpdate () {
// Check if data is available
if ( sensor_data_input_ -> hasData ()) {
// Get the latest message
const auto & msg = sensor_data_input_ -> getData ();
// Process the data
processPointCloud (msg);
}
}
Quality of Service (QoS)
The messaging system supports two QoS levels:
QoS Use Case Behavior RELIABLECommands, status, critical data Guarantees delivery, may queue messages BEST_EFFORTHigh-frequency sensors, streaming Drops messages if receiver is slow
Choosing QoS
# Use RELIABLE for important data
outputs :
- name : command
topic : "/robot/command"
qos : "RELIABLE"
# Use BEST_EFFORT for sensor streams
inputs :
- name : camera
topic : "/camera/image"
qos : "BEST_EFFORT"
Manual Port Creation
For advanced use cases, create ports programmatically:
#include <roboticks/messaging/Publisher.hpp>
#include <roboticks/messaging/Subscriber.hpp>
bool MyTask :: onInitialize () {
// Create publisher manually
custom_pub_ = createPublisher < StringMessage >(
"/custom/topic" ,
QoS ::RELIABLE
);
// Create subscriber with callback
custom_sub_ = createSubscriber < StringMessage >(
"/other/topic" ,
[ this ]( const StringMessage & msg ) {
handleMessage (msg);
}
);
return true ;
}
Message Types
Built-in Messages
The SDK includes common message types:
#include <roboticks/messages/common/StringMessage.hpp>
#include <roboticks/messages/common/NumberMessage.hpp>
#include <roboticks/messages/sensors/PointCloud.hpp>
#include <roboticks/messages/sensors/Image.hpp>
Defining Custom Messages
Create YAML message definitions in messages/definitions/:
# messages/definitions/my_messages.yaml
namespace : roboticks.messages.custom
messages :
- name : RobotStatus
fields :
- name : robot_id
type : string
- name : battery_level
type : float
- name : position
type : Vector3
- name : is_active
type : bool
Generate C++ headers:
python tools/generate_messages.py
# Creates messages/generated/RobotStatus.hpp
Using Custom Messages
#include <roboticks/messages/custom/RobotStatus.hpp>
void MyTask :: publishStatus () {
roboticks :: messages :: custom ::RobotStatus status;
status . robot_id = "robot-01" ;
status . battery_level = 0.85 f ;
status . position = { 1.0 , 2.0 , 0.0 };
status . is_active = true ;
status_output_ -> publish (status);
}
Topic Naming Conventions
Use hierarchical topic names:
/sensors/lidar/points # Sensor data
/sensors/camera/image
/control/velocity # Control commands
/control/arm/position
/status/robot/health # Status updates
/status/battery/level
Multi-Robot Communication
For fleet-wide communication, use namespaced topics:
# Robot 1
outputs :
- name : status
topic : "/robot1/status"
# Robot 2
inputs :
- name : robot1_status
topic : "/robot1/status"
Performance Tips
Batch high-frequency data
Instead of publishing every sensor reading individually, batch them: if ( buffer_ . size () >= BATCH_SIZE) {
output_ -> publish (buffer_);
buffer_ . clear ();
}
Use appropriate buffer sizes
Configure subscriber buffers based on your needs: inputs :
- name : fast_sensor
buffer_size : 10 # Keep last 10 messages
Avoid blocking in callbacks
Process messages asynchronously if they take time: void onMessage ( const Data & msg ) {
queue_ . push (msg); // Quick enqueue
// Process in onUpdate()
}
Next Steps
Examples See complete working examples