Skip to main content

Messaging System

The Roboticks SDK provides a distributed publish/subscribe messaging system for inter-module communication.

Overview

Modules communicate by publishing messages to topics and subscribing to receive messages from topics. This decouples modules and enables flexible system composition.

Defining Ports in YAML

The easiest way to create publishers and subscribers is through YAML configuration:
module:
  tasks:
    - name: ProcessorTask
      inputs:
        - name: sensor_data
          topic: "/sensors/lidar"
          message_type: "roboticks.messages.sensors.PointCloud"
          qos: "BEST_EFFORT"

      outputs:
        - name: processed
          topic: "/processed/data"
          message_type: "roboticks.messages.sensors.PointCloud"
          qos: "RELIABLE"
This generates:
  • sensor_data_input_ - A subscriber to /sensors/lidar
  • processed_output_ - A publisher to /processed/data

Using Ports in Code

Publishing Messages

void MyTask::onUpdate() {
    // Create message
    roboticks::messages::sensors::PointCloud msg;
    msg.timestamp = getCurrentTime();
    msg.points = computePoints();

    // Publish using auto-generated port
    processed_output_->publish(msg);
}

Receiving Messages

void MyTask::onUpdate() {
    // Check if data is available
    if (sensor_data_input_->hasData()) {
        // Get the latest message
        const auto& msg = sensor_data_input_->getData();

        // Process the data
        processPointCloud(msg);
    }
}

Quality of Service (QoS)

The messaging system supports two QoS levels:
QoSUse CaseBehavior
RELIABLECommands, status, critical dataGuarantees delivery, may queue messages
BEST_EFFORTHigh-frequency sensors, streamingDrops messages if receiver is slow

Choosing QoS

# Use RELIABLE for important data
outputs:
  - name: command
    topic: "/robot/command"
    qos: "RELIABLE"

# Use BEST_EFFORT for sensor streams
inputs:
  - name: camera
    topic: "/camera/image"
    qos: "BEST_EFFORT"

Manual Port Creation

For advanced use cases, create ports programmatically:
#include <roboticks/messaging/Publisher.hpp>
#include <roboticks/messaging/Subscriber.hpp>

bool MyTask::onInitialize() {
    // Create publisher manually
    custom_pub_ = createPublisher<StringMessage>(
        "/custom/topic",
        QoS::RELIABLE
    );

    // Create subscriber with callback
    custom_sub_ = createSubscriber<StringMessage>(
        "/other/topic",
        [this](const StringMessage& msg) {
            handleMessage(msg);
        }
    );

    return true;
}

Message Types

Built-in Messages

The SDK includes common message types:
#include <roboticks/messages/common/StringMessage.hpp>
#include <roboticks/messages/common/NumberMessage.hpp>
#include <roboticks/messages/sensors/PointCloud.hpp>
#include <roboticks/messages/sensors/Image.hpp>

Defining Custom Messages

Create YAML message definitions in messages/definitions/:
# messages/definitions/my_messages.yaml
namespace: roboticks.messages.custom

messages:
  - name: RobotStatus
    fields:
      - name: robot_id
        type: string
      - name: battery_level
        type: float
      - name: position
        type: Vector3
      - name: is_active
        type: bool
Generate C++ headers:
python tools/generate_messages.py
# Creates messages/generated/RobotStatus.hpp

Using Custom Messages

#include <roboticks/messages/custom/RobotStatus.hpp>

void MyTask::publishStatus() {
    roboticks::messages::custom::RobotStatus status;
    status.robot_id = "robot-01";
    status.battery_level = 0.85f;
    status.position = {1.0, 2.0, 0.0};
    status.is_active = true;

    status_output_->publish(status);
}

Topic Naming Conventions

Use hierarchical topic names:
/sensors/lidar/points      # Sensor data
/sensors/camera/image
/control/velocity          # Control commands
/control/arm/position
/status/robot/health       # Status updates
/status/battery/level

Multi-Robot Communication

For fleet-wide communication, use namespaced topics:
# Robot 1
outputs:
  - name: status
    topic: "/robot1/status"

# Robot 2
inputs:
  - name: robot1_status
    topic: "/robot1/status"

Performance Tips

Instead of publishing every sensor reading individually, batch them:
if (buffer_.size() >= BATCH_SIZE) {
    output_->publish(buffer_);
    buffer_.clear();
}
Configure subscriber buffers based on your needs:
inputs:
  - name: fast_sensor
    buffer_size: 10  # Keep last 10 messages
Process messages asynchronously if they take time:
void onMessage(const Data& msg) {
    queue_.push(msg);  // Quick enqueue
    // Process in onUpdate()
}

Next Steps

Examples

See complete working examples