MQTT Topic Structure Migration Guide

Overview

This document describes the unified MQTT topic structure for Roboticks fleet device communication and the migration from the old structure to the new one.

New Topic Structure

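All per-device communication now uses one consistent pattern: roboticks/fleet/{device_id}/*. The only exception is the broadcast topic roboticks/fleet/commands, which is shared by all devices.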

Device → Backend (Publish Topics)

Topic                                            QoS  Description
roboticks/fleet/{device_id}/heartbeat            0    Device status updates (metrics, location)
roboticks/fleet/{device_id}/sessions             1    Session lifecycle (create/complete)
roboticks/fleet/{device_id}/logs                 1    Batched log messages
roboticks/fleet/{device_id}/file_upload/request  1    Request presigned URLs for S3 uploads

Backend → Device (Subscribe Topics)

Topic                                             QoS  Description
roboticks/fleet/{device_id}/session/response      1    Session creation responses with UUID
roboticks/fleet/{device_id}/file_upload/response  1    Presigned URL responses
roboticks/fleet/commands                          1    Broadcast commands (all devices)

Old vs New Structure

Before (Mixed Structure)

roboticks/fleet/heartbeat                          # All devices publish here
roboticks/fleet/sessions                           # All devices publish here
roboticks/fleet/logs                               # All devices publish here
roboticks/devices/{device_id}/commands             # Per-device commands
roboticks/fleet/{device_id}/file_upload/request    # Per-device (already correct)

After (Unified Structure)

roboticks/fleet/{device_id}/heartbeat              # Per-device publishing
roboticks/fleet/{device_id}/sessions               # Per-device publishing
roboticks/fleet/{device_id}/logs                   # Per-device publishing
roboticks/fleet/commands                           # Broadcast to all devices
roboticks/fleet/{device_id}/session/response       # Per-device responses
roboticks/fleet/{device_id}/file_upload/request    # Per-device (unchanged)
roboticks/fleet/{device_id}/file_upload/response   # Per-device responses
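
As a rough illustration of the mapping above, the following sketch builds unified topic names and translates legacy topics for a single device. The function names `device_topic` and `migrate_topic` are hypothetical and not part of the SDK or backend; only the topic strings come from this guide.

```python
# Hypothetical helpers; only the topic strings are taken from the guide.
FLEET_PREFIX = "roboticks/fleet"


def device_topic(device_id: str, suffix: str) -> str:
    """Build a per-device topic such as roboticks/fleet/<device_id>/heartbeat."""
    # Reject IDs that would change the topic depth or act as MQTT wildcards.
    if not device_id or any(ch in device_id for ch in "/+#"):
        raise ValueError(f"invalid device_id: {device_id!r}")
    return f"{FLEET_PREFIX}/{device_id}/{suffix}"


def migrate_topic(old_topic: str, device_id: str) -> str:
    """Map a legacy topic to its unified equivalent for one device."""
    legacy_shared = {
        "roboticks/fleet/heartbeat": "heartbeat",
        "roboticks/fleet/sessions": "sessions",
        "roboticks/fleet/logs": "logs",
    }
    if old_topic in legacy_shared:
        return device_topic(device_id, legacy_shared[old_topic])
    if old_topic == f"roboticks/devices/{device_id}/commands":
        return f"{FLEET_PREFIX}/commands"  # commands are now broadcast
    return old_topic  # already unified, e.g. file_upload/request
```

For example, `migrate_topic("roboticks/fleet/logs", "robot-42")` yields `roboticks/fleet/robot-42/logs`.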

Security Enhancement

The new structure enables certificate-based device verification:
  1. Topic-based Device ID: The device ID is extracted from the topic path using topic(3) in IoT Rules (the third segment of roboticks/fleet/{device_id}/...)
  2. Certificate-based Verification: Lambda functions verify that the device_id from the topic matches the device registered for the publishing certificate
  3. Defense Against Spoofing: A device cannot publish to another device’s topic without the correct certificate
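
To make the first point concrete, here is a small Python sketch that mimics the 1-based segment lookup that topic(3) performs in an IoT Rule. The helper name `topic_segment` is hypothetical, and raising on an out-of-range index is a choice made for this sketch rather than a description of how the rules engine behaves.

```python
def topic_segment(topic: str, index: int) -> str:
    """Return the 1-based segment of an MQTT topic, similar to topic(n) in IoT Rules."""
    segments = topic.split("/")
    if index < 1 or index > len(segments):
        # Sketch-only behavior: fail loudly instead of returning an undefined value.
        raise IndexError(f"topic has {len(segments)} segments, asked for {index}")
    return segments[index - 1]


# The device ID is the third segment of every per-device topic.
device_id = topic_segment("roboticks/fleet/robot-42/heartbeat", 3)
```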

Lambda Verification Pattern

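# Extract device_id from topic (via IoT Rule, topic(3))
device_id_from_topic = event.get("device_id")

# Extract certificate ARN (via IoT Rule principal())
certificate_arn = event.get("certificate_arn") or ""

# The certificate ID is the last path segment of the ARN;
# a bare certificate ID passes through unchanged
certificate_id = certificate_arn.split("/")[-1]

# Query database: certificate_id → device_id_dsn
device_info = get_device_info_from_certificate(certificate_id)

# SECURITY: Reject unknown certificates and mismatched device IDs
if not device_info or device_info["device_id_dsn"] != device_id_from_topic:
    return {"statusCode": 403, "body": "Device ID mismatch"}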
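
The pattern can be exercised locally with a stubbed lookup. The sketch below is self-contained and makes several assumptions: the in-memory `_DEVICES_BY_CERT` table stands in for the real database query, the wrapper name `verify_device` is hypothetical, and the certificate ID is taken to be the last path segment of the certificate ARN.

```python
from typing import Optional

# Stand-in for the database; the real helper queries the backend database.
_DEVICES_BY_CERT = {"abc123": {"device_id_dsn": "robot-42"}}


def get_device_info_from_certificate(certificate_id: str) -> Optional[dict]:
    """Stubbed lookup: certificate_id -> device record, or None if unknown."""
    return _DEVICES_BY_CERT.get(certificate_id)


def verify_device(event: dict) -> dict:
    """Return 200 only when the topic's device_id matches the certificate's device."""
    device_id_from_topic = event.get("device_id")
    certificate_arn = event.get("certificate_arn") or ""
    certificate_id = certificate_arn.split("/")[-1]  # assumed ARN layout: .../cert/<id>

    device_info = get_device_info_from_certificate(certificate_id)
    if not device_info or device_info["device_id_dsn"] != device_id_from_topic:
        return {"statusCode": 403, "body": "Device ID mismatch"}
    return {"statusCode": 200, "body": "OK"}
```

A matching event passes, while a spoofed device_id or an unknown certificate is rejected with 403.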

Session Creation Workflow

New Request/Response Flow

1. Device Publishes Session Create (WITHOUT session_id)
Topic: roboticks/fleet/{device_id}/sessions
Payload: {
  "action": "create",
  "name": "My Session",
  "status": "active",
  "metadata": {...},
  "started_at": 1699564800000
}
2. Backend Generates UUID
  • Lambda validates certificate matches device_id
  • Inserts session into database WITHOUT session_id
  • PostgreSQL generates UUID automatically via default value
  • Lambda retrieves generated UUID using RETURNING clause
3. Backend Responds with UUID
Topic: roboticks/fleet/{device_id}/session/response
Payload: {
  "session_id": "a3ce880a-89c4-f903-0f1d-53e23562f29b",
  "name": "My Session",
  "status": "active",
  "s3_base_path": "sessions/a3ce880a-89c4-f903-0f1d-53e23562f29b/"
}
4. Device Uses Received UUID
  • Store session_id for all subsequent operations
  • Include in log messages
  • Include in file upload requests
  • Send in completion message
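
The four steps above can be sketched end to end in a few lines of Python. This is a simulation under stated assumptions, not the backend's code: `uuid.uuid4()` stands in for the PostgreSQL default value, both function names are hypothetical, and rejecting a device-supplied session_id is one possible policy (the real handler may simply ignore it).

```python
import json
import uuid


def build_create_payload(name: str, started_at_ms: int, metadata: dict) -> str:
    """Device side: session create request, deliberately without session_id."""
    return json.dumps({
        "action": "create",
        "name": name,
        "status": "active",
        "metadata": metadata,
        "started_at": started_at_ms,
    })


def backend_create_session(request_json: str) -> str:
    """Backend stand-in: assign the UUID and build the response payload."""
    request = json.loads(request_json)
    if "session_id" in request:
        # Assumed policy for this sketch: device-generated IDs are refused.
        raise ValueError("session_id must not be sent by the device")
    session_id = str(uuid.uuid4())  # stands in for the database-generated UUID
    return json.dumps({
        "session_id": session_id,
        "name": request["name"],
        "status": request["status"],
        "s3_base_path": f"sessions/{session_id}/",
    })
```

The device would publish the first payload to roboticks/fleet/{device_id}/sessions and read the second from roboticks/fleet/{device_id}/session/response.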

Old Flow (Deprecated)

# Device generated UUID and sent it (NO LONGER VALID)
{
  "action": "create",
  "session_id": "device-generated-uuid",  # Remove this
  ...
}

S3 Path Structure

New Simplified Structure

sessions/{session_id}/
  ├── file1.jpg
  ├── file2.bag
  └── logs/
      └── batch_001.json

Old Structure (Deprecated)

sessions/{device_id}/{session_id}/
  └── files...

Rationale:
  • Session UUIDs are globally unique, so a device_id prefix is not needed to avoid collisions
  • Device info is preserved in the session metadata
  • The path structure is simpler
  • Paths remain valid even if the device is deleted
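
A short sketch of how object keys could be built under the new layout and translated from the old one. Both helper names are hypothetical, and `migrate_key` assumes its input is known to use the old layout, since the two layouts cannot be told apart from the key alone.

```python
def session_key(session_id: str, relative_path: str) -> str:
    """Build a key in the new layout: sessions/<session_id>/<relative_path>."""
    return f"sessions/{session_id}/{relative_path.lstrip('/')}"


def migrate_key(old_key: str) -> str:
    """Translate sessions/<device_id>/<session_id>/<rest> to sessions/<session_id>/<rest>.

    Only pass keys that are known to use the old layout.
    """
    parts = old_key.split("/", 3)
    if len(parts) < 4 or parts[0] != "sessions":
        raise ValueError(f"not an old-layout session key: {old_key!r}")
    _, _device_id, session_id, rest = parts
    return f"sessions/{session_id}/{rest}"
```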

Migration Steps

Backend Infrastructure (✅ Completed)

  1. IoT Policy Updated
    • File: infrastructure/lib/roboticks-stack.ts:146-190
    • All topics now use roboticks/fleet/{device_id}/* pattern
    • Removed legacy topic permissions
  2. IoT Rules Updated
    • All rules extract device_id using topic(3)
    • DeviceHeartbeatRule: roboticks/fleet/+/heartbeat
    • SessionRule: roboticks/fleet/+/sessions
    • DeviceLogsRule: roboticks/fleet/+/logs
  3. Lambda Functions Updated
    • Added device_id verification in all handlers
    • session-handler publishes responses back to device
    • presigned-url-handler uses UUID session_id
  4. Database Schema Updated
    • Added session_id UUID column with auto-generation
    • Changed S3 paths to sessions/{session_id}/
    • Device info stored in metadata
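
The IoT Rules listed under item 2 rely on the single-level wildcard `+`. The sketch below shows how such a filter matches topics, following standard MQTT topic-filter semantics; `matches_filter` is a hypothetical helper for local testing and is not part of the infrastructure code.

```python
def matches_filter(topic_filter: str, topic: str) -> bool:
    """Match an MQTT topic against a filter with '+' and a trailing '#'."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    for i, level in enumerate(f_parts):
        if level == "#":
            # '#' matches the remaining levels but is valid only as the last level.
            return i == len(f_parts) - 1
        if i >= len(t_parts):
            return False  # the topic is shorter than the filter
        if level != "+" and level != t_parts[i]:
            return False  # literal level mismatch
    return len(f_parts) == len(t_parts)
```

With the filter roboticks/fleet/+/heartbeat, a per-device heartbeat topic matches, while the legacy roboticks/fleet/heartbeat topic does not because it has one level fewer.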

Device SDK (🔄 In Progress)

Required changes in roboticks-sdk:
  1. Update MQTT Topics (DeviceManager.cpp lines 43-48)
    // OLD
    const std::string HEARTBEAT_TOPIC = "roboticks/fleet/heartbeat";
    const std::string SESSIONS_TOPIC = "roboticks/fleet/sessions";
    const std::string LOGS_TOPIC = "roboticks/fleet/logs";
    
    // NEW: topics now depend on device_id_, so build them once device_id_ is set
    // (for example in the constructor), not as file-scope constants
    const std::string HEARTBEAT_TOPIC = "roboticks/fleet/" + device_id_ + "/heartbeat";
    const std::string SESSIONS_TOPIC = "roboticks/fleet/" + device_id_ + "/sessions";
    const std::string LOGS_TOPIC = "roboticks/fleet/" + device_id_ + "/logs";
    
  2. Subscribe to Session Response
    std::string session_response_topic = "roboticks/fleet/" + device_id_ + "/session/response";
    iot_client_->subscribe(session_response_topic, AWS_MQTT_QOS_AT_LEAST_ONCE,
                          [this](auto msg) { handleSessionResponse(msg); });
    
  3. Remove Manual UUID Generation
    • Delete code that generates session_id on device
    • Remove session_id from create payload
    • Wait for backend response
  4. Handle Session Response
    void DeviceManager::handleSessionResponse(const std::string& payload) {
        // Parse without throwing; malformed JSON yields a discarded (non-object) value
        auto response = nlohmann::json::parse(payload, nullptr, false);
        if (!response.is_object() || !response.contains("session_id")) {
            return;  // Ignore malformed responses
        }
    
        // Store for use in logs, files, completion
        current_session_id_ = response["session_id"].get<std::string>();
        current_s3_base_path_ = response.value("s3_base_path", "");
    }
    
  5. Use Received UUID
    • Tag all logs with received session_id
    • Include in file upload requests
    • Send in completion message
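
Steps 3 to 5 imply that the device must hold back session-dependent work until the response arrives. The following is a language-neutral sketch of that wait, written in Python for brevity even though the SDK itself is C++. The class name `PendingSession` and the timeout handling are assumptions, not SDK behavior.

```python
import json
import threading
from typing import Optional


class PendingSession:
    """Holds session state until the backend response arrives."""

    def __init__(self) -> None:
        self._ready = threading.Event()
        self.session_id: Optional[str] = None
        self.s3_base_path: Optional[str] = None

    def on_response(self, payload: str) -> None:
        """Call from the MQTT callback for the session/response topic."""
        response = json.loads(payload)
        self.session_id = response["session_id"]
        self.s3_base_path = response.get("s3_base_path")
        self._ready.set()

    def wait(self, timeout_s: float) -> bool:
        """Block until the UUID is known; False means the wait timed out."""
        return self._ready.wait(timeout_s)
```

A caller would publish the create message, call `wait(...)`, and only then tag logs and file upload requests with `session_id`. What to do on timeout (retry, or buffer locally) is left open here.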

IoT Policy Changes

Before

{
  "Effect": "Allow",
  "Action": ["iot:Publish"],
  "Resource": [
    "arn:aws:iot:region:account:topic/roboticks/fleet/heartbeat",
    "arn:aws:iot:region:account:topic/roboticks/fleet/sessions",
    "arn:aws:iot:region:account:topic/roboticks/fleet/logs"
  ]
}

After

{
  "Effect": "Allow",
  "Action": ["iot:Publish"],
  "Resource": [
    "arn:aws:iot:region:account:topic/roboticks/fleet/${iot:Connection.Thing.ThingName}/heartbeat",
    "arn:aws:iot:region:account:topic/roboticks/fleet/${iot:Connection.Thing.ThingName}/sessions",
    "arn:aws:iot:region:account:topic/roboticks/fleet/${iot:Connection.Thing.ThingName}/logs",
    "arn:aws:iot:region:account:topic/roboticks/fleet/${iot:Connection.Thing.ThingName}/file_upload/request"
  ]
}
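
To see what the policy variable buys, the sketch below substitutes a Thing name into the topic parts of the resources above and checks whether a publish would be permitted. It is a local approximation for reasoning about the policy, not a call into AWS, and it assumes the Thing name equals the device_id used in topics, as the policy implies. `may_publish` is a hypothetical helper.

```python
THING_VAR = "${iot:Connection.Thing.ThingName}"

# Topic portions of the "After" policy resources (ARN prefix omitted).
PUBLISH_TOPIC_TEMPLATES = [
    "roboticks/fleet/" + THING_VAR + "/heartbeat",
    "roboticks/fleet/" + THING_VAR + "/sessions",
    "roboticks/fleet/" + THING_VAR + "/logs",
    "roboticks/fleet/" + THING_VAR + "/file_upload/request",
]


def allowed_publish_topics(thing_name: str) -> set:
    """Expand the policy variable for one connected Thing."""
    return {t.replace(THING_VAR, thing_name) for t in PUBLISH_TOPIC_TEMPLATES}


def may_publish(thing_name: str, topic: str) -> bool:
    """True if the expanded policy lists this exact topic."""
    return topic in allowed_publish_topics(thing_name)
```

Under this model a device connected as robot-42 can publish to roboticks/fleet/robot-42/logs, but not to another device's topic or to the legacy shared topics.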

Deployment

Backend Deployment

cd infrastructure
cdk diff --all       # Review changes
cdk deploy --all     # Deploy updates

SDK Updates

cd roboticks-sdk
# Update code per migration steps above
./build.sh
# Test with staging environment first

Testing Checklist

  • Device can connect with new topic structure
  • Heartbeats publish to correct per-device topic
  • Session create receives UUID response from backend
  • Logs include backend-provided session_id
  • File uploads use backend-provided session_id
  • Session completion works with UUID
  • Security verification blocks mismatched device_id

Rollback Plan

If issues arise:
  1. Backend: Previous CDK stack can be redeployed
  2. IoT Policy: Reverts automatically when the previous CDK stack is redeployed
  3. SDK: Keep old binary version available
  4. Database: Migration has downgrade script

References

  • Infrastructure Stack: infrastructure/lib/roboticks-stack.ts
  • Lambda Functions: infrastructure/lambda/session-handler/index.py
  • SDK Device Manager: roboticks-sdk/packages/roboticks-device/src/DeviceManager.cpp
  • Architecture Design: docs/ARCHITECTURE_DESIGN.md