Roboticks SDK Architecture Summary

Executive Overview

The Roboticks SDK is a C++17-based robotics framework for edge devices with:
  • Unified session management for tracking robot execution lifecycle
  • AWS IoT MQTT integration for cloud communication
  • Multi-tier artifact management (logs, recordings, telemetry)
  • Background thread-based collectors for logs and session data
  • HTTP REST client for device registration and capsule downloads
  • File-based persistence with auto-recovery on restart

1. Session Management Architecture

Session Lifecycle (Session.hpp/cpp)

CREATED → ACTIVE → [PAUSED ↔ ACTIVE] → COMPLETED → UPLOADING → UPLOADED
           ↓ (error)
         FAILED
Key States:
  • CREATED: Session initialized but not started
  • ACTIVE: Running and collecting data
  • PAUSED: Temporarily suspended
  • COMPLETED: Finished successfully
  • UPLOADING: Artifacts being sent to backend
  • UPLOADED: All data transferred
  • FAILED: Terminated with error
Session Data Structure:
struct SessionArtifact {
    std::string name;           // e.g., "runtime.log"
    std::string type;           // log, recording, telemetry, map, etc.
    std::string file_path;      // Local storage path
    size_t size_bytes;          // File size (TODO: needs implementation)
    uint64_t created_at_us;     // Timestamp in microseconds
};

class Session {
    std::string session_id_;
    std::atomic<SessionStatus> status_;      // Thread-safe status
    uint64_t start_time_us_;
    uint64_t end_time_us_;
    std::unordered_map<std::string, std::string> metadata_;
    std::vector<SessionArtifact> artifacts_;
    std::mutex mutex_;                       // Protects non-atomic members
};
Key Methods:
  • start() - Begin session (CREATED → ACTIVE)
  • pause() / resume() - Pause/resume execution
  • complete() - Finish session (records end time)
  • fail(reason) - Terminate with error
  • addArtifact() - Register collected file
  • getArtifacts() - Retrieve all collected files
  • toJson() - Serialize for storage/transmission
Important Details:
  • Uses std::atomic<SessionStatus> for thread-safe state transitions
  • Mutex protects collections and metadata map
  • Destructor auto-completes active sessions on shutdown
  • Times stored in microseconds (epoch-based)
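
The atomic status described above can enforce the lifecycle with a compare-and-swap loop. The following is a minimal sketch, not the SDK's actual code: the enum mirrors the states listed in this section, while isValidTransition() and tryTransition() are hypothetical helpers. The allowed transitions are inferred from the lifecycle diagram, so FAILED is assumed to be reachable only from ACTIVE.

```cpp
#include <atomic>

// Hypothetical mirror of the SDK's SessionStatus enum (states from the list above).
enum class SessionStatus { CREATED, ACTIVE, PAUSED, COMPLETED, UPLOADING, UPLOADED, FAILED };

// Returns true if the lifecycle diagram permits moving from `from` to `to`.
// Assumption: only ACTIVE may fail, as drawn in the diagram.
inline bool isValidTransition(SessionStatus from, SessionStatus to) {
    switch (from) {
        case SessionStatus::CREATED:
            return to == SessionStatus::ACTIVE;
        case SessionStatus::ACTIVE:
            return to == SessionStatus::PAUSED || to == SessionStatus::COMPLETED ||
                   to == SessionStatus::FAILED;
        case SessionStatus::PAUSED:
            return to == SessionStatus::ACTIVE;
        case SessionStatus::COMPLETED:
            return to == SessionStatus::UPLOADING;
        case SessionStatus::UPLOADING:
            return to == SessionStatus::UPLOADED;
        case SessionStatus::UPLOADED:
        case SessionStatus::FAILED:
            return false;  // terminal states
    }
    return false;
}

// Atomically moves `status` to `to` if the current state allows it.
// On a failed exchange, `current` is refreshed and validity is re-checked.
inline bool tryTransition(std::atomic<SessionStatus>& status, SessionStatus to) {
    SessionStatus current = status.load();
    while (isValidTransition(current, to)) {
        if (status.compare_exchange_weak(current, to)) {
            return true;
        }
    }
    return false;
}
```

With this shape, two threads racing to call start() and fail() cannot both succeed from the same prior state, and no mutex is needed for the status itself.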

2. DeviceManager Overview

Core Responsibilities

The DeviceManager is a singleton facade that:
  1. Manages device registration with backend (one-time, cached in /etc/roboticks/)
  2. Maintains active sessions (create, pause, resume, complete)
  3. Publishes telemetry via MQTT to AWS IoT Core
  4. Collects logs from modules via ZeroMQ
  5. Handles commands from backend (via MQTT subscriptions)
  6. Manages storage (sessions directory with active/completed subdirs)

Configuration

struct DeviceManagerConfig {
    // Device identification (backend-assigned during registration)
    std::string dsn;                // Device Serial Number
    std::string project_secret;     // For auto-registration
    std::string device_type;        // DRONE, ROBOT, CAMERA, etc.
    std::string iot_endpoint;       // AWS IoT endpoint

    // Timing
    uint32_t heartbeat_interval_ms;      // 30000 (30s default)
    uint32_t command_poll_interval_ms;   // 5000 (polling disabled, using MQTT)

    // Storage
    std::string storage_path;            // /var/roboticks/sessions
    size_t max_local_storage_mb;         // 1024
    bool auto_upload;                    // false (manual upload)
    bool offline_buffering;              // true

    // Log collection
    size_t log_buffer_size;              // 10000 (ring buffer)
    size_t log_batch_size;               // 25 logs per upload
    uint32_t log_upload_interval_ms;     // 30000 (30s timeout)
    std::string log_topic;               // "/roboticks/logs" (ZeroMQ)
};

Storage Directory Structure

/var/roboticks/sessions/
├── active/                          # Currently running sessions
│   └── {session_id}/
│       ├── session.json             # Session metadata
│       ├── runtime.log              # Runtime logs
│       └── {module_name}.log        # Per-module logs
├── completed/                       # Finished, not yet uploaded
│   └── {session_id}/
│       ├── session.json
│       └── artifacts/
├── logs/                           # Legacy log storage (TODO)
└── {config_path}                   # YAML device config (if specified)

3. Initialization & Registration Flow

Step 1: Load Configuration

  • Checks multiple locations for device.yaml
  • Falls back to defaults if not found
  • Loads from YAML or environment variables

Step 2: Check for Existing Registration

If certificates exist in /etc/roboticks/:
  1. Load DSN from device_id file
  2. Load IoT endpoint from config.json
  3. Configure HTTP client with mTLS
  4. Create IoT client connection to AWS
If certificates don’t exist:
  1. Check for project_secret (required for auto-registration)
  2. Call registerDevice() → POST /api/v1/fleet/devices/register
  3. Backend responds with certificate + DSN
  4. Save to /etc/roboticks/ (600 permissions for private key)
  5. Proceed with IoT connection
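
The registration check and the 600-permission save in Step 2 could look roughly like the sketch below, which uses only std::filesystem. The file names device.cert.pem and device.private.key are assumptions for illustration; the source names only the /etc/roboticks/ directory. hasRegistration() and savePrivateKey() are hypothetical helper names.

```cpp
#include <filesystem>
#include <fstream>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// True when both credential files are present in `dir`.
// File names are hypothetical; adjust to the real layout.
inline bool hasRegistration(const fs::path& dir) {
    std::error_code ec;
    return fs::exists(dir / "device.cert.pem", ec) &&
           fs::exists(dir / "device.private.key", ec);
}

// Writes the private key, then restricts it to owner read/write (mode 600).
inline bool savePrivateKey(const fs::path& key_path, const std::string& pem) {
    {
        std::ofstream out(key_path, std::ios::binary | std::ios::trunc);
        if (!out) return false;
        out << pem;
        out.flush();
        if (!out) return false;
    }  // file is closed here
    std::error_code ec;
    fs::permissions(key_path,
                    fs::perms::owner_read | fs::perms::owner_write,
                    fs::perm_options::replace, ec);
    return !ec;
}
```

Note that the file briefly exists with default permissions before they are tightened; a production version would more likely create it with a restrictive umask or with open(2) and mode 0600.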

Step 3: Initialize Subsystems

  1. Messaging System - ZeroMQ transport for internal comms (binds early)
  2. Session Storage - Create directories, cleanup leftover sessions
  3. MQTT Connection - Subscribe to command topic
  4. Log Collection - Create ZeroMQ subscriber + start upload thread
  5. Background Threads - Start heartbeat (command polling is disabled in favor of MQTT subscriptions)

Step 4: Auto-Start Session

  • Creates initial session with metadata
  • Publishes creation to backend via MQTT

Step 5: Write Ready File

  • Creates /tmp/roboticks-device-ready for orchestration tools

4. MQTT Communication Patterns

Topics & QoS

FLEET_HEARTBEAT        = "roboticks/fleet/heartbeat"           (QoS 0)
FLEET_SESSIONS         = "roboticks/fleet/sessions"            (QoS 1)
FLEET_LOGS             = "roboticks/fleet/logs"                (QoS 1)
DEVICE_COMMANDS_PREFIX = "roboticks/devices/{dsn}/commands"    (subscribed)
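
As an illustration of how the {dsn} placeholder might be expanded, here is a small sketch of a topics namespace. The constant values come from the list above; the deviceCommands() helper is a hypothetical name, not a confirmed SDK function.

```cpp
#include <string>

namespace topics {
inline constexpr const char* FLEET_HEARTBEAT = "roboticks/fleet/heartbeat";  // QoS 0
inline constexpr const char* FLEET_SESSIONS  = "roboticks/fleet/sessions";   // QoS 1
inline constexpr const char* FLEET_LOGS      = "roboticks/fleet/logs";       // QoS 1

// Expands the {dsn} placeholder of the per-device command topic.
inline std::string deviceCommands(const std::string& dsn) {
    return "roboticks/devices/" + dsn + "/commands";
}
}  // namespace topics
```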

Heartbeat Flow (every 30s)

Device → MQTT → Backend
{
    "dsn": "device-123",
    "status": "ONLINE",
    "cpu_usage_percent": 45.5,
    "memory_usage_percent": 62.3,
    // ... other telemetry
}

Session Lifecycle Events

1. CREATE: {"action": "create", "session_id": "...", "device_id": "...", "status": "active"}
2. COMPLETE: {"action": "complete", "session_id": "...", "status": "completed", "duration_seconds": 123.45}
3. CLEANUP: {"action": "complete", "status": "interrupted", "cleanup_reason": "device_restart"}

Command Handling

  • Subscribed to topic at startup (not polling)
  • Callback: onMqttCommandReceived() → parse JSON → find handler → execute
  • Command types: DEPLOY_CAPSULE, RESTART, UPDATE_CONFIG, START_SESSION, STOP_SESSION, OTA_UPDATE
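
The "find handler → execute" step can be sketched as a mutex-protected map from command type to callback. This is an assumption about the shape of the dispatch table, not the SDK's implementation: CommandDispatcher and its methods are hypothetical names, and JSON parsing is left out so that the caller passes an already-extracted type string and raw payload.

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

class CommandDispatcher {
public:
    // A handler receives the raw payload and reports success or failure.
    using Handler = std::function<bool(const std::string& payload)>;

    void registerHandler(const std::string& type, Handler handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        handlers_[type] = std::move(handler);
    }

    // Returns false when no handler is registered or the handler fails.
    bool dispatch(const std::string& type, const std::string& payload) {
        Handler handler;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = handlers_.find(type);
            if (it == handlers_.end()) return false;
            handler = it->second;  // copy, so the handler runs without the lock held
        }
        return handler(payload);
    }

private:
    std::mutex mutex_;
    std::unordered_map<std::string, Handler> handlers_;
};
```

Copying the handler out of the map before calling it keeps a slow command (for example a capsule deployment) from blocking registration of other handlers.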

Log Upload

MQTT Topic: "roboticks/fleet/logs"
Payload: JSON object wrapping a "logs" array of up to 25 entries (batched)
{
    "logs": [
        {
            "session_id": "...",
            "module_name": "camera_driver",
            "level": "INFO",
            "message": "Frame captured",
            "timestamp_us": 1731520000000000
        }
    ],
    "batch_size": 25,
    "timestamp_us": 1731520000000000
}

5. Log Collection System

Architecture

Modules (ZeroMQ PUB)
    ↓
/roboticks/logs topic (ZeroMQ SUB)
    ↓
DeviceManager::onLogReceived()
    ↓
Ring Buffer (max 10,000 logs)
    ↓
Upload Thread (batches of 25 logs, or whatever is buffered after 30s)
    ↓
MQTT Publish → AWS IoT Core
    ↓
Backend Lambda → Database

Log Buffer Management

std::deque<LogMessage> log_buffer_;              // Ring buffer
std::condition_variable log_upload_cv_;          // Signal upload thread
std::atomic<uint64_t> total_logs_collected_;     // Statistics
std::atomic<uint64_t> total_logs_uploaded_;
std::atomic<uint64_t> total_logs_failed_;
std::atomic<uint64_t> total_logs_dropped_;       // Overflow drops

Upload Strategy

  • Batch Trigger 1: Buffer reaches 25 logs → upload immediately
  • Batch Trigger 2: 30 seconds timeout → upload whatever is buffered
  • On Failure: Requeue failed logs at the front of the buffer (bounded by buffer size)
  • On Shutdown: Flush all remaining logs before exit
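
The buffer and upload triggers above can be sketched as a small bounded queue. This is a simplified illustration under stated assumptions, not the DeviceManager code: LogBuffer, takeBatch(), and requeueFront() are hypothetical names, LogMessage is reduced to four fields, and on requeue the oldest entries are the ones dropped when the batch no longer fits.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct LogMessage {
    std::string module_name;
    std::string level;
    std::string message;
    std::uint64_t timestamp_us;
};

class LogBuffer {
public:
    LogBuffer(std::size_t capacity, std::size_t batch_size)
        : capacity_(capacity), batch_size_(batch_size) {}

    // Producer side: drops the oldest entry when the buffer is full.
    void push(LogMessage msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (buffer_.size() >= capacity_) {
                buffer_.pop_front();
                ++dropped_;
            }
            buffer_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }

    // Consumer side: waits for a full batch or for `timeout`, whichever comes
    // first, then removes and returns up to batch_size entries (possibly none).
    std::vector<LogMessage> takeBatch(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait_for(lock, timeout, [this] { return buffer_.size() >= batch_size_; });
        std::vector<LogMessage> batch;
        while (!buffer_.empty() && batch.size() < batch_size_) {
            batch.push_back(std::move(buffer_.front()));
            buffer_.pop_front();
        }
        return batch;
    }

    // Failure path: restores a batch at the front in its original order.
    // Entries that no longer fit (the oldest ones) are counted as dropped.
    void requeueFront(std::vector<LogMessage> batch) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto it = batch.rbegin(); it != batch.rend(); ++it) {
            if (buffer_.size() >= capacity_) {
                ++dropped_;
                continue;
            }
            buffer_.push_front(std::move(*it));
        }
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return buffer_.size();
    }

    std::uint64_t dropped() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return dropped_;
    }

private:
    const std::size_t capacity_;
    const std::size_t batch_size_;
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<LogMessage> buffer_;
    std::uint64_t dropped_ = 0;
};
```

With the defaults in this document the buffer would be constructed as LogBuffer(10000, 25), and the upload thread would call takeBatch(std::chrono::milliseconds(30000)) in a loop.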

6. HTTP Communication

Client Setup

http_client_.setBaseUrl("https://api.roboticks.io");
http_client_.setTimeout(30000);
http_client_.setClientCertificate(cert_path, key_path);

Key Endpoints

POST   /api/v1/fleet/devices/register
       → Returns: certificate, private_key, public_key, iot_endpoint, role_alias, dsn

GET    /api/v1/fleet/devices/{dsn}/commands
       → Returns: [Command] array

POST   /api/v1/fleet/commands/{command_id}/ack
       → Acknowledges command completion

POST   /api/v1/fleet/deployments/{capsule_id}/status
       → Reports deployment progress

GET    /api/v1/capsules/{capsule_id}/download
       → Downloads capsule tar.gz file

POST   /api/v1/sessions/{session_id}/artifacts
       → Uploads session artifacts

File Upload Method

  • Uses multipart/form-data
  • Supports progress callbacks
  • Implemented via libcurl

7. Background Threads

Heartbeat Thread

Location: DeviceManager::heartbeatThread()
Interval: config_.heartbeat_interval_ms (30s default)
Work:     sendHeartbeat(device_info)
Control:  heartbeat_running_ atomic flag

Command Polling Thread

Location: DeviceManager::commandPollingThread()
Status:   DISABLED (using MQTT subscriptions instead)
Interval: config_.command_poll_interval_ms (would be 5s)

Log Upload Thread

Location: DeviceManager::logUploadThread()
Trigger:  Buffer size ≥ 25 logs OR 30s timeout
Work:     uploadLogBatch() → MQTT publish
Wait:     condition_variable with timeout
Control:  log_upload_running_ atomic flag

Shutdown Flow

signal(SIGINT/SIGTERM) → shutdown_requested = 1
    ↓
main loop breaks
    ↓
stopHeartbeat()
    ↓
stopCommandPolling()
    ↓
shutdownLogCollection() (flushes buffer)
    ↓
finalizeSession()
    ↓
saveSessionToFile()
    ↓
Device Manager cleaned up

8. Teardown & Cleanup

DeviceManager::shutdown()

  1. Stop heartbeat thread
  2. Stop command polling thread
  3. Shutdown log collection (flush all buffered logs)
  4. Finalize active session (publish completion event)
  5. Save all sessions to disk
  6. Mark as uninitialized

Session Destruction

  • Destructor auto-completes active sessions
  • All files persisted to disk before cleanup

File Cleanup

  • Old sessions removed by cleanupOldSessions(30) (>30 days old)
  • Incomplete sessions recovered on startup
  • Session files saved to /var/roboticks/sessions/{active|completed}/{session_id}/

9. Signal Handling

Setup (in device_manager_main.cpp)

static volatile sig_atomic_t shutdown_requested = 0;

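void signalHandler(int signal) {
    // Caution: logging from a signal handler is generally not async-signal-safe;
    // only the sig_atomic_t flag assignment below is guaranteed to be safe.
    LOG_ERROR("[DeviceManager] Received signal {}, shutting down...", signal);
    shutdown_requested = 1;
}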

// In main():
signal(SIGINT, signalHandler);   // Ctrl+C
signal(SIGTERM, signalHandler);  // Graceful shutdown

Main Loop

while (!shutdown_requested) {
    std::this_thread::sleep_for(std::chrono::seconds(5));

    collectRuntimeLogs();
    collectModuleLogs();

    if (iteration % 12 == 0)  // Every 60s
        uploadCompletedSessions();

    if (iteration % 720 == 0) // Every hour
        cleanupOldSessions(30);

    iteration++;
}
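
The modulo constants follow from the 5-second loop period: 60 s / 5 s = 12 iterations and 3600 s / 5 s = 720 iterations. A small sketch makes that derivation explicit and checks it at compile time. The helper names kLoopPeriod, iterationsPer(), and isDue() are hypothetical, and the task period is assumed to be at least one loop period.

```cpp
#include <chrono>
#include <cstdint>

constexpr std::chrono::seconds kLoopPeriod{5};

// Number of loop iterations between runs of a task with the given period.
// Assumes task_period >= kLoopPeriod, otherwise the result would be 0.
constexpr std::uint64_t iterationsPer(std::chrono::seconds task_period) {
    return static_cast<std::uint64_t>(task_period / kLoopPeriod);
}

// True on the iterations where the task should run.
constexpr bool isDue(std::uint64_t iteration, std::chrono::seconds task_period) {
    return iteration % iterationsPer(task_period) == 0;
}

static_assert(iterationsPer(std::chrono::seconds{60}) == 12, "upload cadence");
static_assert(iterationsPer(std::chrono::hours{1}) == 720, "cleanup cadence");
```

If the counter starts at 0, both tasks are due on the very first pass, so an upload and a cleanup would each run once about 5 seconds after startup.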

10. File Handling & Artifacts

Current Implementation

bool Session::addArtifact(const std::string& name,
                         const std::string& type,
                         const std::string& file_path) {
    SessionArtifact artifact;
    artifact.name = name;
    artifact.type = type;
    artifact.file_path = file_path;
    artifact.size_bytes = 0;  // ← TODO: Get actual file size
    artifact.created_at_us = current_timestamp();

    artifacts_.push_back(artifact);
    return true;
}

Collected File Types

  • runtime.log - Main runtime log
  • {module_name}.log - Per-module logs
  • (Future) recording - Video/sensor recordings
  • (Future) telemetry - Raw sensor data
  • (Future) map - Generated maps/point clouds

Collection Methods

collectRuntimeLogs()  → Copy /var/roboticks/logs/runtime.log
collectModuleLogs()   → Find all *.log in /var/roboticks/logs/modules/

Storage Layout

/var/roboticks/sessions/active/{session_id}/
├── session.json              (metadata)
├── runtime.log               (collected)
├── perception_node.log       (collected)
├── control_node.log          (collected)
└── (more files as collected)

11. Key Design Patterns

Patterns Used

Pattern            Location         Purpose
─────────────────────────────────────────────────────────────────────────────
Singleton          DeviceManager    Global device state, thread-safe
Facade             DeviceManager    Unified interface to complex subsystems
Observer           Event callbacks  Decouple session events from handlers
Ring Buffer        Log buffer       Fixed memory, auto-drop oldest on overflow
Producer-Consumer  Log system       Async log collection ↔ upload threads
RAII               Lock guards      Exception-safe mutex protection
Atomic types       Flags/counters   Lock-free thread synchronization

Thread Safety

  • std::atomic<bool> for flags (stop signals, ready states)
  • std::mutex + std::lock_guard<> for collections (sessions, logs, handlers)
  • std::condition_variable for thread coordination (log upload)
  • All callbacks execute in caller’s thread (no spawning)

Option A: Extend Session Class

File: /Users/mujacic/roboticks-sdk/packages/roboticks-device/include/roboticks/device/Session.hpp
Add methods to Session class:
// Add to Session class
class Session {
public:
    // ... existing methods ...

    // File upload to backend
    bool uploadArtifact(const std::string& artifact_name, const std::string& api_endpoint);
    bool uploadAllArtifacts(const std::string& api_endpoint);

    // Batch upload with progress tracking
    bool uploadArtifactBatch(const std::vector<std::string>& artifact_names,
                            const std::string& api_endpoint,
                            std::function<void(size_t completed, size_t total)> progress_cb);
};
Storage: Add to Session private members:
std::unordered_map<std::string, uint64_t> artifact_upload_sizes_;  // Track file sizes
std::unordered_map<std::string, SessionStatus> artifact_upload_status_;
Benefits:
  • Sessions already track artifacts
  • Fits existing lifecycle (COMPLETED → UPLOADING → UPLOADED)
  • Can reuse existing HTTP client

Option B: Extend DeviceManager Upload (Better for Device-Level)

File: /Users/mujacic/roboticks-sdk/packages/roboticks-device/src/DeviceManager.cpp
Add to DeviceManager:
bool DeviceManager::uploadSessionArtifacts(const std::string& session_id) {
    auto session = getSession(session_id);
    if (!session) return false;

    // Enter UPLOADING before any transfer starts (COMPLETED → UPLOADING)
    session->setStatus(SessionStatus::UPLOADING);

    // Upload each artifact via HTTP multipart; stop at the first failure
    for (const auto& artifact : session->getArtifacts()) {
        if (!uploadFileArtifact(session_id, artifact)) {
            // Suggested: fall back to COMPLETED so a later pass can retry
            session->setStatus(SessionStatus::COMPLETED);
            return false;
        }
    }

    // All artifacts transferred (UPLOADING → UPLOADED)
    session->setStatus(SessionStatus::UPLOADED);
    return true;
}
Benefits:
  • Device-level control of uploads
  • Can rate-limit/batch multiple sessions
  • Better for offline buffering strategy

Option C: New Artifact Upload Thread (Best for Scale)

File: Create new ArtifactUploader class
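class ArtifactUploader {
public:
    void enqueueSession(const std::string& session_id);  // Called when a session completes

private:
    void uploadThread();  // Background upload of completed sessions

    std::queue<std::string> upload_queue_;
    std::mutex queue_mutex_;             // Protects upload_queue_
    std::condition_variable queue_cv_;   // Wakes uploadThread() on enqueue or stop
    std::thread upload_thread_;
    std::atomic<bool> running_{false};
    // ... with same thread patterns as log uploader
};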
Benefits:
  • Parallel uploads don’t block log system
  • Independent retry logic
  • Scales to multiple sessions

13. Integration Points Summary

For File Upload Implementation

Must Use:
  1. Session::addArtifact() - Register files before upload
  2. DeviceManager::http_client_ - HTTP requests (already with mTLS)
  3. DeviceManager::iot_client_ - Publish upload status events
  4. config_.storage_path - Know where files are stored
Must Respect:
  1. ✅ Thread safety - Use std::lock_guard + mutexes
  2. ✅ Shutdown sequence - Check shutdown_requested flag
  3. ✅ Signal handling - Don’t block graceful SIGTERM
  4. ✅ MQTT QoS - Use QoS=1 for important updates
Can Reuse:
  1. ✅ http_client_.uploadFile() - Already supports multipart uploads
  2. ✅ Progress callback pattern - Used in capsule download
  3. ✅ Status reporting - Device already publishes to backend
  4. ✅ Event system - registerEventCallback() for progress tracking

14. Current TODOs in Codebase

Location                TODO                                  Priority
─────────────────────────────────────────────────────────────────────────
Session.cpp:104         Get actual file size on addArtifact   High
DeviceManager.cpp:1162  Implement actual upload to backend    High
DeviceManager.cpp:1247  Implement session file serialization  High
DeviceManager.cpp:1253  Load sessions from disk on startup    Medium
Various                 Support multipart form uploads        High
Various                 Offline buffering (queue uploads)     Medium
Various                 Retry logic for failed uploads        Medium

Summary Table

Component          Language  Files                      Purpose
──────────────────────────────────────────────────────────────────────────────────────────
Session            C++       Session.hpp/cpp            Session lifecycle & artifact tracking
DeviceManager      C++       DeviceManager.hpp/cpp      Device orchestration & cloud sync
IoTClient          C++       IoTClient.hpp/cpp          AWS MQTT communication
HttpClient         C++       HttpClient.hpp/cpp         REST API communication
DeviceManagerMain  C++       device_manager_main.cpp    Entrypoint & signal handling
Configuration      YAML      device.yaml                Runtime config
Certificates       PEM       /etc/roboticks/*           Device identity (mTLS)
Session Storage    JSON      /var/roboticks/sessions/*  Persistent session state

Quick Reference: Where to Add What

Need to...                          → Add to...
─────────────────────────────────────────────────────────
Upload a file to backend            → DeviceManager::uploadSessionArtifacts()
Collect a new artifact type         → DeviceManager::collectModuleLogs() pattern
Handle device lifecycle event       → registerEventCallback() in run()
Add new MQTT topic                  → topics namespace + subscribe/publish
Implement new command type          → setupDefaultCommandHandlers()
Store file metadata                 → Session::artifact_* private members
Retry failed uploads                → logUploadThread() pattern for queuing
Monitor upload progress             → http_client_.uploadFile(progress_cb)