SDK MQTT Topic and Session UUID Changes

Date: 2025-11-12
Status: ✅ Complete - Ready for Testing

Overview

This document details the SDK changes required to support the unified MQTT topic structure and backend-generated session UUIDs.

What Changed

1. Dynamic MQTT Topics

Before:
// Static topics shared by all devices
constexpr const char* FLEET_HEARTBEAT = "roboticks/fleet/heartbeat";
constexpr const char* FLEET_SESSIONS = "roboticks/fleet/sessions";
constexpr const char* FLEET_LOGS = "roboticks/fleet/logs";
After:
// Dynamic topics constructed with device_id after registration
std::string heartbeat_topic_;        // roboticks/fleet/{device_id}/heartbeat
std::string sessions_topic_;         // roboticks/fleet/{device_id}/sessions
std::string logs_topic_;             // roboticks/fleet/{device_id}/logs
std::string session_response_topic_; // roboticks/fleet/{device_id}/session/response
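
One way to keep the four topic strings consistent is to derive them from a single prefix. The following is a minimal sketch, not the SDK's implementation: the `DeviceTopics` struct and the `buildDeviceTopics` function are hypothetical names introduced for illustration, and only the topic layout is taken from the comments above.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper type, not part of the SDK: groups the per-device topics.
struct DeviceTopics {
    std::string heartbeat;
    std::string sessions;
    std::string logs;
    std::string session_response;
};

// Builds the roboticks/fleet/{device_id}/... topics for one device.
inline DeviceTopics buildDeviceTopics(const std::string& device_id) {
    // The device ID is only known after registration, so it must not be empty.
    assert(!device_id.empty());
    const std::string prefix = "roboticks/fleet/" + device_id;
    return DeviceTopics{
        prefix + "/heartbeat",
        prefix + "/sessions",
        prefix + "/logs",
        prefix + "/session/response",
    };
}
```

Calling `buildDeviceTopics("ROBOT-123456")` would yield `roboticks/fleet/ROBOT-123456/heartbeat` for the heartbeat topic, and likewise for the other three.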

2. Session Creation Workflow

Before:
// Device generated UUID locally
std::string session_id = generateSessionId();
auto session = std::make_shared<Session>(session_id, metadata);

// Published to MQTT with device-generated UUID
json session_msg = {
    {"action", "create"},
    {"session_id", session_id},  // Device-generated
    ...
};
After:
// 1. Publish request WITHOUT session_id
json session_msg = {
    {"action", "create"},
    {"name", session_name},
    {"status", "active"},
    {"metadata", metadata}
    // NO session_id - backend will generate it
};

// 2. Wait for backend response (with 30 second timeout)
session_response_cv_.wait_for(lock, std::chrono::seconds(30), ...);

// 3. Create Session object with backend UUID
std::string backend_session_id = pending_session_id_;
auto session = std::make_shared<Session>(backend_session_id, metadata);
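
The wait in step 2 is a standard condition-variable handshake between the calling thread and the MQTT callback thread. The sketch below isolates that pattern so it can be read on its own. `SessionResponseWaiter` and its method names are hypothetical and stand in for the corresponding `DeviceManager` members; treating an empty ID as a failure is also an assumption of this sketch.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>

// Hypothetical stand-in for the DeviceManager members that coordinate
// createSession() with the session-response callback.
class SessionResponseWaiter {
public:
    // Call before publishing the request so that an early reply is not lost.
    void arm() {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_id_.clear();
        waiting_ = true;
    }

    // Called from the callback thread. An empty ID signals a failed response.
    void deliver(const std::string& session_id) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_id_ = session_id;
            waiting_ = false;
        }
        cv_.notify_all();
    }

    // Blocks until deliver() has run or the timeout expires.
    // Returns std::nullopt on timeout or when an empty ID was delivered.
    std::optional<std::string> wait(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate protects against spurious wakeups.
        const bool received =
            cv_.wait_for(lock, timeout, [this] { return !waiting_; });
        if (!received || pending_id_.empty()) {
            return std::nullopt;
        }
        return pending_id_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::string pending_id_;
    bool waiting_ = false;
};
```

Arming before publishing matters: if the flag were set after the publish call, a fast backend reply could arrive first and be overwritten by the reset.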

Files Modified

Header File: DeviceManager.hpp

Location: packages/roboticks-device/include/roboticks/device/DeviceManager.hpp
Changes:
  1. Added Member Variables (lines 451-464):
// Dynamic MQTT topics (constructed with device_id after registration)
std::string heartbeat_topic_;
std::string sessions_topic_;
std::string logs_topic_;
std::string session_response_topic_;
std::string file_upload_request_topic_;
std::string file_upload_response_topic_;
std::string commands_topic_;

// Session response handling
std::string pending_session_id_;  // Temporary storage for backend UUID
bool waiting_for_session_response_;
mutable std::mutex session_response_mutex_;
std::condition_variable session_response_cv_;
  2. Added Method Declaration (line 325):
void onMqttSessionResponse(const std::string& topic, const std::string& payload);

Implementation File: DeviceManager.cpp

Location: packages/roboticks-device/src/DeviceManager.cpp
Changes:
  1. Updated Topic Constants (lines 42-53):
// MQTT Topic Constants - Unified structure: roboticks/fleet/{device_id}/*
// Topics are built dynamically with device_id after registration
namespace topics {
    constexpr const char* FLEET_COMMANDS = "roboticks/fleet/commands";
    // Per-device topics are constructed after registration:
    // - roboticks/fleet/{device_id}/heartbeat (publish)
    // - roboticks/fleet/{device_id}/sessions (publish)
    // - roboticks/fleet/{device_id}/logs (publish)
    // - roboticks/fleet/{device_id}/session/response (subscribe)
    // - roboticks/fleet/{device_id}/file_upload/request (publish)
    // - roboticks/fleet/{device_id}/file_upload/response (subscribe)
} // namespace topics
  2. Dynamic Topic Construction (lines 147-155, 217-225):
After loading existing certificate:
// Build dynamic MQTT topics with device_id
heartbeat_topic_ = "roboticks/fleet/" + config_.dsn + "/heartbeat";
sessions_topic_ = "roboticks/fleet/" + config_.dsn + "/sessions";
logs_topic_ = "roboticks/fleet/" + config_.dsn + "/logs";
session_response_topic_ = "roboticks/fleet/" + config_.dsn + "/session/response";
file_upload_request_topic_ = "roboticks/fleet/" + config_.dsn + "/file_upload/request";
file_upload_response_topic_ = "roboticks/fleet/" + config_.dsn + "/file_upload/response";
commands_topic_ = "roboticks/fleet/commands";  // Broadcast topic
LOG_INFO("MQTT topics constructed for device: {}", config_.dsn);
After device registration (same code repeated).
  3. Subscribe to Session Response Topic (lines 178-199):
// Subscribe to broadcast command topic
bool subscribed_commands = iot_client_->subscribe(commands_topic_, 1,
    [this](const std::string& topic, const std::string& payload) {
        this->onMqttCommandReceived(topic, payload);
    });

// Subscribe to session response topic to receive backend-generated UUIDs
bool subscribed_session = iot_client_->subscribe(session_response_topic_, 1,
    [this](const std::string& topic, const std::string& payload) {
        this->onMqttSessionResponse(topic, payload);
    });
  4. Session Response Handler (lines 1068-1113):
void DeviceManager::onMqttSessionResponse(const std::string& topic, const std::string& payload) {
    LOG_INFO("Received MQTT session response, payload size: {} bytes", payload.size());

    try {
        json response = json::parse(payload);

        if (!response.contains("session_id")) {
            LOG_ERROR("Session response missing session_id field");
            return;
        }

        std::string backend_session_id = response["session_id"];
        LOG_INFO("Received backend-generated session_id: {}", backend_session_id);

        // Store the received session ID for the waiting createSession call
        {
            std::lock_guard<std::mutex> lock(session_response_mutex_);
            pending_session_id_ = backend_session_id;
            waiting_for_session_response_ = false;
        }

        // Notify the waiting thread
        session_response_cv_.notify_all();

    } catch (const std::exception& e) {
        LOG_ERROR("Error processing MQTT session response: {}", e.what());
        std::lock_guard<std::mutex> lock(session_response_mutex_);
        waiting_for_session_response_ = false;
        session_response_cv_.notify_all();
    }
}
  5. Refactored createSession Method (lines 1119-1215):
Key Changes:
  • Check IoT connection before starting
  • Set waiting flag and publish request WITHOUT session_id
  • Wait for backend response with 30-second timeout
  • Create Session object only after receiving backend UUID
  • Return nullptr if timeout or error
Complete flow:
std::shared_ptr<Session> DeviceManager::createSession(
    const std::unordered_map<std::string, std::string>& metadata,
    bool auto_start) {

    // (IoT connection check omitted here; see Key Changes above.)

    // Assumption: the session name comes from metadata["name"] when present.
    std::string session_name;
    auto name_it = metadata.find("name");
    if (name_it != metadata.end()) {
        session_name = name_it->second;
    }

    // Step 1: Prepare to wait for response
    {
        std::lock_guard<std::mutex> lock(session_response_mutex_);
        pending_session_id_.clear();
        waiting_for_session_response_ = true;
    }

    // Step 2: Publish creation request (WITHOUT session_id)
    json session_msg = {
        {"action", "create"},
        {"status", auto_start ? "active" : "pending"},
        {"name", session_name},
        {"metadata", metadata}
    };
    iot_client_->publish(sessions_topic_, session_msg.dump(), 1);

    // Step 3: Wait for backend response (the predicate guards against spurious wakeups)
    std::unique_lock<std::mutex> lock(session_response_mutex_);
    bool received = session_response_cv_.wait_for(lock, std::chrono::seconds(30),
        [this]() { return !waiting_for_session_response_; });

    if (!received) {
        LOG_ERROR("Timeout waiting for session creation response");
        return nullptr;
    }

    // The handler also wakes this thread on a parse error, leaving the ID empty.
    if (pending_session_id_.empty()) {
        return nullptr;
    }

    const std::string backend_session_id = pending_session_id_;

    // Step 4: Create Session object with backend UUID
    auto session = std::make_shared<Session>(backend_session_id, metadata);
    sessions_[backend_session_id] = session;

    return session;
}
  6. Updated All MQTT Publish Calls:
  • Heartbeat: iot_client_->publish(heartbeat_topic_, ...)
  • Sessions: iot_client_->publish(sessions_topic_, ...)
  • Logs: iot_client_->publish(logs_topic_, ...)

Key Behavior Changes

Session Creation Flow

Old Behavior:
  1. Device calls createSession()
  2. Device generates UUID locally
  3. Device creates Session object immediately
  4. Device publishes session info to MQTT
  5. Backend stores session with device UUID
New Behavior:
  1. Device calls createSession()
  2. Device publishes MQTT request WITHOUT session_id
  3. Device waits for backend response (blocking call)
  4. Backend generates UUID via PostgreSQL
  5. Backend publishes UUID back to device via session/response topic
  6. Device receives UUID and creates Session object
  7. Session ready for use

Timeout Handling

If the backend does not respond within 30 seconds:
  • createSession() returns nullptr
  • Logs error: “Timeout waiting for session creation response”
  • Application must handle null session pointer
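
Because a null return is now a normal outcome, an application may want to retry before giving up. The following is a rough sketch under stated assumptions: `createSessionWithRetry` is a hypothetical wrapper, the `Session` struct is a placeholder for the SDK type, and the doubling backoff is an illustrative policy rather than anything the SDK prescribes.

```cpp
#include <chrono>
#include <functional>
#include <memory>
#include <thread>

// Hypothetical placeholder for the SDK's Session type.
struct Session {};

// Calls `create` up to `max_attempts` times. After each failed attempt except
// the last, sleeps for `backoff` and then doubles it.
// Returns nullptr if every attempt fails.
inline std::shared_ptr<Session> createSessionWithRetry(
    const std::function<std::shared_ptr<Session>()>& create,
    int max_attempts,
    std::chrono::milliseconds backoff) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (auto session = create()) {
            return session;
        }
        if (attempt < max_attempts) {
            std::this_thread::sleep_for(backoff);
            backoff *= 2;
        }
    }
    return nullptr;
}
```

In real use, `create` would be a lambda that calls `createSession(metadata, true)`. Each attempt can itself block for up to 30 seconds, so the worst-case total wait grows with `max_attempts`.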

Topic Isolation

Each device now has its own topic namespace:
Device ROBOT-123456:
  Publishes to:
    - roboticks/fleet/ROBOT-123456/heartbeat
    - roboticks/fleet/ROBOT-123456/sessions
    - roboticks/fleet/ROBOT-123456/logs
    - roboticks/fleet/ROBOT-123456/file_upload/request

  Subscribes to:
    - roboticks/fleet/commands (broadcast)
    - roboticks/fleet/ROBOT-123456/session/response
    - roboticks/fleet/ROBOT-123456/file_upload/response
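
With this layout the device ID is always the third path segment of a per-device topic, which makes it straightforward to recover on the receiving side. A minimal sketch follows; `deviceIdFromTopic` is a hypothetical helper, not an SDK function, and it assumes C++17 for `std::optional`.

```cpp
#include <optional>
#include <string>

// Returns the device ID embedded in a per-device topic, or std::nullopt for
// the broadcast topic and for anything outside roboticks/fleet/.
inline std::optional<std::string> deviceIdFromTopic(const std::string& topic) {
    static const std::string kPrefix = "roboticks/fleet/";
    if (topic.compare(0, kPrefix.size(), kPrefix) != 0) {
        return std::nullopt;
    }
    // A per-device topic has at least one more '/' after the device ID.
    const std::size_t end = topic.find('/', kPrefix.size());
    if (end == std::string::npos || end == kPrefix.size()) {
        return std::nullopt;  // e.g. roboticks/fleet/commands
    }
    return topic.substr(kPrefix.size(), end - kPrefix.size());
}
```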

Testing Instructions

Build SDK

cd ~/roboticks-sdk
./build.sh

Test Session Creation

// In your application code
auto& device_mgr = DeviceManager::getInstance();

// Initialize device manager
DeviceManagerConfig config = loadConfig("config.yaml");
device_mgr.initialize(config);

// Create session (this will now block until backend responds)
std::unordered_map<std::string, std::string> metadata;
metadata["name"] = "Test Session";
metadata["location"] = "Lab A";

auto session = device_mgr.createSession(metadata, true);

if (!session) {
    LOG_ERROR("Failed to create session - backend timeout or error");
    return false;
}

LOG_INFO("Session created with backend UUID: {}", session->getSessionId());

Verify MQTT Traffic

Monitor CloudWatch logs for the following messages.
Device publishes to: roboticks/fleet/{device_id}/sessions
{
  "action": "create",
  "name": "Test Session",
  "status": "active",
  "metadata": {
    "location": "Lab A"
  }
}
Backend responds to: roboticks/fleet/{device_id}/session/response
{
  "session_id": "a3ce880a-89c4-f903-0f1d-53e23562f29b",
  "name": "Test Session",
  "status": "active",
  "s3_base_path": "sessions/a3ce880a-89c4-f903-0f1d-53e23562f29b/"
}
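
When checking captured traffic or writing a test, it can help to confirm that the returned `session_id` at least has the textual shape of a UUID. The sketch below does only that. `looksLikeUuid` is a hypothetical helper and is not part of the SDK; it does not inspect version or variant bits.

```cpp
#include <cctype>
#include <string>

// Returns true if `s` has the canonical 8-4-4-4-12 hexadecimal UUID layout.
// Only the textual shape is checked.
inline bool looksLikeUuid(const std::string& s) {
    if (s.size() != 36) {
        return false;
    }
    for (std::size_t i = 0; i < s.size(); ++i) {
        const bool dash_position = (i == 8 || i == 13 || i == 18 || i == 23);
        if (dash_position) {
            if (s[i] != '-') {
                return false;
            }
        } else if (!std::isxdigit(static_cast<unsigned char>(s[i]))) {
            return false;
        }
    }
    return true;
}
```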

Expected Logs

[DeviceManager] Loaded DSN: ROBOT-123456
[DeviceManager] MQTT topics constructed for device: ROBOT-123456
[DeviceManager] Connected to AWS IoT Core
[DeviceManager] Subscribed to MQTT command topic: roboticks/fleet/commands
[DeviceManager] Subscribed to MQTT session response topic: roboticks/fleet/ROBOT-123456/session/response
[DeviceManager] Session creation request published via MQTT, waiting for backend response...
[DeviceManager] Received MQTT session response, payload size: 234 bytes
[DeviceManager] Received backend-generated session_id: a3ce880a-89c4-f903-0f1d-53e23562f29b
[DeviceManager] Session created successfully with backend UUID: a3ce880a-89c4-f903-0f1d-53e23562f29b

Error Scenarios

1. Backend Timeout

[DeviceManager] Session creation request published via MQTT, waiting for backend response...
[ERROR] Timeout waiting for session creation response from backend
Cause: Backend didn’t respond within 30 seconds
Resolution: Check backend Lambda logs, verify IoT Rule is active

2. Connection Lost

[ERROR] Cannot create session: IoT client not connected
Cause: Device not connected to AWS IoT Core
Resolution: Check certificate, endpoint, network connectivity

3. Invalid Response

[ERROR] Session response missing session_id field
Cause: Backend sent malformed response
Resolution: Check Lambda function implementation

Migration Notes

Breaking Changes

  1. Session Creation is Now Blocking
    • Old: createSession() returned immediately
    • New: createSession() blocks up to 30 seconds
  2. Session Creation Can Fail
    • Old: Always returned a Session object
    • New: Returns nullptr on timeout/error
  3. Application Must Check for Null
    auto session = device_mgr.createSession(metadata, true);
    if (!session) {
        // Handle error
    }
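
Since the call can now block for up to 30 seconds, callers on a latency-sensitive thread (a control loop or UI thread, for example) may prefer to run it in the background. The following is a minimal sketch using `std::async`. `createSessionAsync` is a hypothetical wrapper and `Session` is a placeholder for the SDK type.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Hypothetical placeholder for the SDK's Session type.
struct Session {};

// Runs the (potentially blocking) `create` callable on another thread and
// returns a future that later holds the session, or nullptr on failure.
inline std::future<std::shared_ptr<Session>> createSessionAsync(
    std::function<std::shared_ptr<Session>()> create) {
    return std::async(std::launch::async, std::move(create));
}
```

The caller still has to check the result of `get()` for null. Note that a future returned by `std::async` blocks in its destructor until the task finishes, so it should be kept alive and collected deliberately rather than discarded.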
    

Backward Compatibility

Not backward compatible with old backend infrastructure:
  • Devices with updated SDK require updated backend
  • Old SDK devices will fail with new backend (403 errors)
  • Gradual rollout recommended: update backend first, then devices

Performance Impact

  • Session Creation Latency: +100-150ms (backend round-trip)
  • Heartbeat/Logs: No change (already async)
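  • Memory: +7 topic string member variables per DeviceManager instance, plus pending_session_id_, a bool flag, a mutex, and a condition variable for session response handling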

Rollback Plan

If issues arise with the SDK:
  1. Revert to previous SDK version:
    cd ~/roboticks-sdk
    git checkout <previous-commit>
    ./build.sh
    
  2. Backend must also be rolled back (topics are incompatible)

References