Skip to main content

SDK File Upload Implementation - Complete

Overview

This document describes the complete implementation of file upload functionality in the roboticks-SDK, integrating with AWS IoT presigned URLs for secure, direct-to-S3 uploads.

Implementation Summary

1. ✅ Session Control (Completed Previously)

  • Disabled auto-session creation
  • Added startSession() and stopSession() methods
  • MQTT-based remote commands (START_SESSION, STOP_SESSION)
  • ZeroMQ-based module-to-DeviceManager commands
  • HelloWorld module demonstration

2. ✅ File Upload During Session Teardown

Implementation Location: DeviceManager.cpp lines 1808-2033

Key Methods

uploadSessionFiles(session_id) - Main upload orchestrator
  • Retrieves all artifacts from completed session
  • Subscribes to file upload response topic
  • Requests presigned URL for each artifact
  • Waits for responses with 30-second timeout
  • Returns true if all files uploaded successfully
requestPresignedUrl(session_id, filename, content_type, size_mb) - Request handler
  • Creates FileUploadRequest tracking structure
  • Publishes to roboticks/fleet/{dsn}/file_upload/request with:
    {
      "session_id": 123,
      "filename": "image.jpg",
      "content_type": "image/jpeg",
      "size_mb": 5
    }
    
  • Marks request as pending for callback tracking
onFileUploadResponse(topic, payload) - Response handler
  • Parses presigned URL and form fields from Lambda response
  • Extracts filename from S3 key
  • Calls uploadFileToS3() with presigned URL
  • Updates pending upload state
  • Notifies waiting threads via condition variable
uploadFileToS3(file_path, presigned_url, form_fields) - S3 uploader
  • TODO: Implement actual S3 upload using libcurl
  • Should perform multipart/form-data POST with:
    • All form_fields from presigned response
    • File content as last field
  • Currently logs operation and returns success (placeholder)

Content Type Detection

Automatically determines content type based on file extension:
  • .logtext/plain
  • .jsonapplication/json
  • .jpg, .jpegimage/jpeg
  • .pngimage/png
  • Default: application/octet-stream

Integration with Session Finalization

Modified finalizeSession() to call uploadSessionFiles() after completing session:
// Upload session files to S3 using presigned URLs
LOG_INFO("Uploading session files for: {}", session_id);
bool files_uploaded = uploadSessionFiles(session_id);

if (files_uploaded) {
    LOG_INFO("Session files uploaded successfully: {}", session_id);
} else {
    LOG_WARN("Some session files failed to upload: {}", session_id);
    // Continue anyway - session is still finalized
}

3. ✅ Signal Handling for Graceful Teardown

Implementation Location: DeviceManager.cpp lines 27-40, 237-242, 306-307

Signal Handler Setup

Global State:
static DeviceManager* g_device_manager_instance = nullptr;

static void signalHandler(int signal) {
    std::cout << "\n[DeviceManager] Received signal " << signal
              << " - initiating graceful shutdown" << std::endl;

    if (g_device_manager_instance) {
        g_device_manager_instance->shutdown();
    }

    std::signal(signal, SIG_DFL);
}
Registration (during initialize()):
// Register signal handlers for graceful shutdown
g_device_manager_instance = this;
std::signal(SIGINT, signalHandler);   // Ctrl+C
std::signal(SIGTERM, signalHandler);  // Docker stop / kill
LOG_INFO("Signal handlers registered (SIGINT, SIGTERM)");
Cleanup (during shutdown()):
g_device_manager_instance = nullptr;

Graceful Shutdown Flow

  1. Signal Received (SIGINT/SIGTERM)
  2. signalHandler() Called
  3. DeviceManager::shutdown() Triggered:
    • Stop heartbeat and command polling threads
    • Shutdown log collection (flush all logs)
    • Shutdown session control
    • Finalize active session (includes file upload)
    • Save all remaining sessions to disk
    • Disconnect IoT client
    • Clear global pointer
  4. Exit
This ensures that when a container is stopped (Docker, Kubernetes) or Ctrl+C is pressed:
  • Active session is properly completed
  • All files are uploaded to S3
  • Logs are flushed
  • Graceful cleanup occurs

4. ✅ Interrupted Session Handling

Implementation Location: DeviceManager.cpp lines 2432-2522

Cleanup on Startup

Modified cleanupLeftoverSessions() to handle interrupted sessions: Flow:
  1. Scan storage_path/active/ directory for session folders
  2. For each session directory:
    • Read session.json
    • Check if completed_at is null (incomplete session)
    • If incomplete:
      • Mark as “interrupted” status
      • Send completion message via MQTT:
        {
          "action": "complete",
          "session_id": "uuid",
          "device_id": "ROBOT-xxxxx",
          "status": "interrupted",
          "completed_at": 1234567890,
          "cleanup_reason": "device_restart"
        }
        
      • Load session from disk
      • Upload all session files via uploadSessionFiles()
      • Update session.json with completed_at and interrupted status
Key Code:
// Upload session files for interrupted session
LOG_INFO("Uploading files for interrupted session: {}", session_id);

auto loaded_session = loadSessionFromDisk(session_id);
if (loaded_session) {
    bool uploaded = uploadSessionFiles(session_id);
    if (uploaded) {
        LOG_INFO("Successfully uploaded files for interrupted session: {}", session_id);
    } else {
        LOG_WARN("Failed to upload some files for interrupted session: {}", session_id);
    }
} else {
    LOG_WARN("Could not load interrupted session for file upload: {}", session_id);
}
This ensures that even if a device crashes or loses power:
  • On next startup, interrupted sessions are detected
  • Files are uploaded to S3
  • Backend is notified of interrupted status
  • Data is not lost

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                     Session Lifecycle with File Upload               │
└─────────────────────────────────────────────────────────────────────┘

1. Session Start
   ├─> startSession() called
   ├─> Session status: CREATED → ACTIVE
   └─> MQTT publish: session created

2. Session Running
   ├─> Modules add artifacts via addArtifact()
   ├─> Logs collected
   └─> Data written to session directory

3. Session Stop (Normal)
   ├─> stopSession() called
   ├─> finalizeSession() invoked
   │   ├─> Complete session (status → COMPLETED)
   │   ├─> Save session.json to disk
   │   ├─> MQTT publish: session completed
   │   └─> uploadSessionFiles()
   │       ├─> For each artifact:
   │       │   ├─> Request presigned URL (MQTT)
   │       │   ├─> Wait for response
   │       │   └─> Upload to S3 via presigned URL
   │       └─> Mark session as UPLOADED
   └─> Trigger "completed" event

4. Session Stop (Signal - Ctrl+C / SIGTERM)
   ├─> signalHandler() triggered
   ├─> shutdown() called
   │   ├─> Flush logs
   │   ├─> Finalize active session
   │   │   └─> (Same as normal stop - includes file upload)
   │   └─> Save all sessions
   └─> Graceful exit

5. Interrupted Session (Crash/Power Loss)
   ├─> Next startup: initialize() called
   ├─> cleanupLeftoverSessions() runs
   │   ├─> Find sessions without completed_at
   │   ├─> Load session from disk
   │   ├─> Mark as "interrupted"
   │   ├─> MQTT publish: session interrupted
   │   ├─> uploadSessionFiles()
   │   │   └─> (Same as normal upload)
   │   └─> Update session.json
   └─> Continue normal operation

Data Structures

FileUploadRequest

struct FileUploadRequest {
    std::string session_id;
    std::string filename;
    std::string file_path;
    std::string content_type;
    size_t size_mb;
    bool pending;  // true until response received
};
Stored in: std::unordered_map<std::string, FileUploadRequest> pending_uploads_

Session Artifact (from Session.hpp)

struct SessionArtifact {
    std::string name;
    std::string type;  // log, recording, telemetry, map, etc.
    std::string file_path;
    size_t size_bytes;
    uint64_t created_at_us;
};

MQTT Topics

Device Publishes (Request)

  • Topic: roboticks/fleet/{device_id}/file_upload/request
  • Payload:
    {
      "session_id": 123,
      "filename": "image.jpg",
      "content_type": "image/jpeg",
      "size_mb": 5
    }
    

Device Subscribes (Response)

  • Topic: roboticks/fleet/{device_id}/file_upload/response
  • Payload:
    {
      "success": true,
      "url": "https://s3.amazonaws.com/...",
      "fields": {
        "key": "sessions/{device_id}/{session_id}/image.jpg",
        "AWSAccessKeyId": "...",
        "policy": "...",
        "signature": "...",
        ...
      }
    }
    

Session Events

  • Topic: roboticks/fleet/sessions
  • Actions: “create”, “complete”
  • Statuses: “active”, “completed”, “interrupted”

Backend Integration

Lambda: presigned-url-handler

Trigger: IoT Rule on roboticks/fleet/+/file_upload/request Validation:
  1. Extract certificate_arn from principal
  2. Extract device_id from topic(3)
  3. Query PostgreSQL to validate device exists
  4. Verify certificate matches device
  5. Verify session belongs to device
Response:
  • Generate presigned POST URL for S3
  • Include form fields for secure upload
  • Publish to device’s response topic
S3 Path: sessions/{device_id}/{session_id}/{filename}

Backend Session API

Updates:
  • Session create: Expect “active” status from devices
  • Session complete: Can be “completed” or “interrupted”
  • interrupted status: Device restarted, files uploaded on next boot

Usage Examples

Example 1: Module Adds Artifacts

// In module's onUpdate() or onStop()
auto& dm = roboticks::device::DeviceManager::getInstance();
auto session = dm.getActiveSession();

if (session) {
    // Add log file
    session->addArtifact("module.log", "log", "/var/roboticks/logs/module.log");

    // Add recording
    session->addArtifact("recording.mp4", "recording", "/data/recording.mp4");

    // Add telemetry
    session->addArtifact("telemetry.json", "telemetry", "/data/telemetry.json");
}

Example 2: Manual Session with Files

auto& dm = roboticks::device::DeviceManager::getInstance();

// Start session
dm.startSession();

auto session = dm.getActiveSession();

// ... do work, add artifacts ...
session->addArtifact("data.csv", "data", "/tmp/data.csv");

// Stop session (triggers file upload)
dm.stopSession();
// Files are automatically uploaded before stopSession() returns

Example 3: HelloWorld with Session

// In HelloWorldModule::onStart()
sendSessionControlCommand("start");

// In HelloWorldModule::onUpdate() after 60s
sendSessionControlCommand("stop");
// DeviceManager handles file upload automatically

Testing

Test Scenario 1: Normal Session Completion

  1. Start HelloWorld composition
  2. Session starts automatically
  3. Wait 60 seconds
  4. Session stops automatically
  5. Verify:
    • Session marked as completed in backend
    • Files appear in S3 under correct path
    • No pending uploads remain

Test Scenario 2: Graceful Shutdown (Ctrl+C)

  1. Start HelloWorld composition
  2. Session starts
  3. Press Ctrl+C after 30 seconds
  4. Verify:
    • Log shows “Received signal 2”
    • Log shows “Finalizing active session”
    • Log shows “Uploading session files”
    • Session marked as completed
    • Files uploaded to S3
    • Process exits cleanly

Test Scenario 3: Interrupted Session Recovery

  1. Start HelloWorld composition
  2. Session starts, files created
  3. Kill process forcefully: kill -9 <pid>
  4. Restart composition
  5. Verify:
    • Log shows “Found incomplete session from previous run”
    • Log shows “Uploading files for interrupted session”
    • Session marked as “interrupted” in backend
    • Files uploaded to S3
    • New session can start normally

Test Scenario 4: Docker Container Stop

  1. Run in Docker container
  2. docker stop <container>
  3. Verify:
    • SIGTERM received
    • Graceful shutdown occurs
    • Session finalized
    • Files uploaded

Known Limitations & TODOs

1. S3 Upload Implementation (TODO)

Current State: uploadFileToS3() is a placeholder Required:
bool DeviceManager::uploadFileToS3(
    const std::string& file_path,
    const std::string& presigned_url,
    const std::unordered_map<std::string, std::string>& form_fields) {

    // TODO: Implement using libcurl
    // 1. Initialize curl multi-part form
    // 2. Add all form_fields as form fields
    // 3. Add file content as last field
    // 4. POST to presigned_url
    // 5. Check response status (204 No Content = success)

    return true;
}
Dependencies: libcurl (already used by HttpClient) Example Implementation:
CURL* curl = curl_easy_init();
struct curl_httppost* formpost = nullptr;
struct curl_httppost* lastptr = nullptr;

// Add form fields
for (const auto& [key, value] : form_fields) {
    curl_formadd(&formpost, &lastptr,
                 CURLFORM_COPYNAME, key.c_str(),
                 CURLFORM_COPYCONTENTS, value.c_str(),
                 CURLFORM_END);
}

// Add file
curl_formadd(&formpost, &lastptr,
             CURLFORM_COPYNAME, "file",
             CURLFORM_FILE, file_path.c_str(),
             CURLFORM_END);

curl_easy_setopt(curl, CURLOPT_URL, presigned_url.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPPOST, formpost);

CURLcode res = curl_easy_perform(curl);

curl_formfree(formpost);
curl_easy_cleanup(curl);

return res == CURLE_OK;

2. File Size Calculation

Current: Artifact size_bytes is always 0 (TODO in Session.cpp line 104) Fix:
bool Session::addArtifact(const std::string& name,
                         const std::string& type,
                         const std::string& file_path) {
    // ... existing code ...

    // Get actual file size
    if (std::filesystem::exists(file_path)) {
        artifact.size_bytes = std::filesystem::file_size(file_path);
    } else {
        artifact.size_bytes = 0;
    }

    // ... rest of code ...
}

3. Upload Retry Logic

Current: Single attempt per file Enhancement: Add retry with exponential backoff for failed uploads

4. Upload Progress Tracking

Enhancement: Callback mechanism for upload progress reporting

5. Concurrent Uploads

Current: Sequential uploads (one at a time) Enhancement: Parallel uploads with thread pool for faster completion

Performance Considerations

Upload Timeout

  • 30 seconds per file
  • For large files or slow networks, may need adjustment
  • Consider: timeout = std::max(30, size_mb * 10) seconds

Memory Usage

  • Files not loaded into memory (direct streaming upload)
  • Minimal overhead per artifact tracking

Network Usage

  • Direct device-to-S3 (no backend proxy)
  • Efficient use of AWS infrastructure
  • Bandwidth costs apply to device’s network

Security

Authentication

  • mTLS certificate validates device identity
  • Certificate ID matched against database
  • Session ID validated to prevent cross-device access

Authorization

  • Device can only upload to its own sessions
  • S3 path enforced: sessions/{device_id}/{session_id}/
  • Presigned URL expires after 1 hour

Data Integrity

  • S3 calculates MD5 checksums automatically
  • ETag returned can be stored for verification

Monitoring & Debugging

Log Messages

Success:
[DeviceManager] Uploading session files for: <session_id>
[DeviceManager] Requesting presigned URL for <filename>
[DeviceManager] Received file upload response
[DeviceManager] Successfully uploaded file: <filename>
[DeviceManager] All artifacts uploaded successfully
Failures:
[ERROR] Failed to request presigned URL for: <filename>
[ERROR] Timeout waiting for presigned URL response for: <filename>
[ERROR] Failed to upload file: <filename>
[WARN] Some session files failed to upload

Metrics to Track

  • Upload success rate per session
  • Average upload time per file size
  • Timeout frequency
  • Retry counts
  • Interrupted session recovery rate

Changelog

2025-01-12

  • ✅ Implemented uploadSessionFiles() method
  • ✅ Implemented requestPresignedUrl() method
  • ✅ Implemented onFileUploadResponse() callback
  • ✅ Added uploadFileToS3() placeholder (needs libcurl implementation)
  • ✅ Integrated file upload into finalizeSession()
  • ✅ Added SIGINT/SIGTERM signal handlers
  • ✅ Implemented graceful shutdown with file upload
  • ✅ Enhanced interrupted session cleanup with file upload
  • ✅ Added FileUploadRequest tracking structure
  • ✅ Documented complete implementation