Skip to main content

Roboticks SDK - File Upload Implementation Guide

Executive Summary

The roboticks-sdk is a sophisticated C++17 framework with:
  • 2500+ lines of session/device management code
  • Working HTTP client with file upload via libcurl
  • MQTT integration for AWS IoT Core
  • Artifact tracking system ready for extensions
  • Background thread patterns for async operations
  • Signal-safe shutdown with resource cleanup
The best place to add file upload is by extending the existing Session and DeviceManager classes. No new infrastructure needed.

Current State: What Works

Already Implemented

  1. HTTP multipart upload - HttpClient::uploadFile() exists and works
  2. Session artifact tracking - Session::addArtifact() stores file metadata
  3. Session persistence - Basic framework for saving/loading sessions
  4. Status publishing - MQTT events for session lifecycle
  5. Thread patterns - Log upload thread shows async batching strategy
  6. Error handling - HTTP error codes, JSON parsing with try/catch

Already Partially Done (TODOs)

  1. File size calculation - Session.cpp line 104
    artifact.size_bytes = 0;  // TODO: Get actual file size
    
  2. Session serialization - DeviceManager.cpp line 1247
    bool DeviceManager::saveSessionToFile(std::shared_ptr<Session> session) {
        // TODO: Implement actual file saving
    }
    
  3. Session persistence - DeviceManager.cpp line 1253
    bool DeviceManager::loadSessionsFromDisk() {
        // TODO: Implement actual loading from disk
    }
    
  4. Upload implementation - DeviceManager.cpp line 1162
    bool DeviceManager::uploadSession(const std::string& session_id) {
        // TODO: Implement actual upload to backend
    }
    

Implementation Plan

Phase 1: Quick Wins (1-2 hours)

1.1 Fix artifact size calculation File: /Users/mujacic/roboticks-sdk/packages/roboticks-device/src/Session.cpp
// Before (line 104):
artifact.size_bytes = 0;  // TODO: Get actual file size

// After:
artifact.size_bytes = getFileSize(file_path);  // NEW
Add helper method to Session class:
// Private helper in Session.hpp:
private:
    size_t getFileSize(const std::string& file_path) const;
Implement in Session.cpp:
size_t Session::getFileSize(const std::string& file_path) const {
    try {
        return std::filesystem::file_size(file_path);
    } catch (const std::exception& e) {
        LOG_ERROR("Failed to get file size for {}: {}", file_path, e.what());
        return 0;
    }
}
Benefits:
  • Accurate artifact metadata
  • Needed for upload progress reporting
  • 10 lines of code
  • Prerequisite for upload features

1.2 Implement Session JSON serialization File: /Users/mujacic/roboticks-sdk/packages/roboticks-device/src/DeviceManager.cpp Currently saveSessionToFile() is stubbed:
bool DeviceManager::saveSessionToFile(std::shared_ptr<Session> session) {
    LOG_INFO("Saving session to file: {}", session->getSessionId());
    return true;  // STUB - needs real implementation
}
Implement using existing Session::toJson():
bool DeviceManager::saveSessionToFile(std::shared_ptr<Session> session) {
    try {
        fs::path session_dir = fs::path(config_.storage_path) / "active" / session->getSessionId();
        fs::create_directories(session_dir);

        fs::path session_file = session_dir / "session.json";
        std::ofstream file(session_file);
        if (!file) {
            LOG_ERROR("Failed to open session file for writing: {}", session_file.string());
            return false;
        }

        file << session->toJson();
        file.close();

        LOG_INFO("Session saved to file: {}", session_file.string());
        return true;
    } catch (const std::exception& e) {
        LOG_ERROR("Failed to save session: {}", e.what());
        return false;
    }
}
Benefits:
  • Session state survives restart
  • Enables recovery of incomplete sessions
  • Prerequisite for upload queue persistence
  • ~20 lines of code

Phase 2: Core Upload Feature (3-4 hours)

2.1 Add upload methods to Session class File: /Users/mujacic/roboticks-sdk/packages/roboticks-device/include/roboticks/device/Session.hpp Add to Session class:
public:
    /**
     * @brief Upload single artifact to backend
     * @param artifact_name Name of artifact to upload
     * @param api_endpoint Full API endpoint URL
     * @param http_client Reference to HTTP client
     * @return true if upload successful
     */
    bool uploadArtifact(const std::string& artifact_name,
                       const std::string& api_endpoint,
                       messaging::HttpClient& http_client);

    /**
     * @brief Upload all artifacts in session
     * @param api_endpoint Base API endpoint
     * @param http_client Reference to HTTP client
     * @param progress_callback Optional progress callback (completed, total)
     * @return true if all uploads successful
     */
    bool uploadAllArtifacts(const std::string& api_endpoint,
                           messaging::HttpClient& http_client,
                           std::function<void(size_t, size_t)> progress_callback = nullptr);

private:
    std::unordered_map<std::string, uint64_t> artifact_upload_sizes_;  // Track sizes
    std::unordered_map<std::string, bool> artifact_upload_status_;     // Track success
Implement in Session.cpp:
bool Session::uploadArtifact(const std::string& artifact_name,
                            const std::string& api_endpoint,
                            messaging::HttpClient& http_client) {
    std::lock_guard<std::mutex> lock(mutex_);

    // Find artifact
    auto artifact_it = std::find_if(artifacts_.begin(), artifacts_.end(),
        [&artifact_name](const SessionArtifact& a) { return a.name == artifact_name; });

    if (artifact_it == artifacts_.end()) {
        LOG_ERROR("Artifact not found: {}", artifact_name);
        return false;
    }

    const auto& artifact = *artifact_it;

    LOG_INFO("Uploading artifact: {} (size: {} bytes)", artifact.name, artifact.size_bytes);

    // Upload via HTTP multipart
    auto progress_cb = [&artifact](uint64_t uploaded, uint64_t total) {
        int percent = total > 0 ? (uploaded * 100) / total : 0;
        LOG_DEBUG("Upload progress: {}%", percent);
    };

    std::string url = api_endpoint + "/api/v1/sessions/" + session_id_ + "/artifacts";
    messaging::HttpResponse response = http_client.uploadFile(
        url,
        artifact.file_path,
        artifact.name,
        progress_cb
    );

    if (response.isSuccess()) {
        LOG_INFO("Artifact uploaded successfully: {}", artifact.name);
        artifact_upload_status_[artifact.name] = true;
        return true;
    } else {
        LOG_ERROR("Failed to upload artifact: {} (HTTP {})", artifact.name, response.status_code);
        artifact_upload_status_[artifact.name] = false;
        return false;
    }
}

bool Session::uploadAllArtifacts(const std::string& api_endpoint,
                               messaging::HttpClient& http_client,
                               std::function<void(size_t, size_t)> progress_callback) {
    std::lock_guard<std::mutex> lock(mutex_);

    if (artifacts_.empty()) {
        LOG_INFO("No artifacts to upload for session: {}", session_id_);
        return true;
    }

    LOG_INFO("Uploading {} artifacts for session: {}", artifacts_.size(), session_id_);

    size_t completed = 0;
    bool all_success = true;

    for (auto& artifact : artifacts_) {
        if (!uploadArtifact(artifact.name, api_endpoint, http_client)) {
            all_success = false;
        }
        completed++;

        if (progress_callback) {
            progress_callback(completed, artifacts_.size());
        }
    }

    return all_success;
}
Benefits:
  • Encapsulates upload logic in Session
  • Tracks individual artifact status
  • Supports progress callbacks
  • ~70 lines of code
  • Reusable from anywhere with Session reference

2.2 Integrate with DeviceManager upload workflow File: /Users/mujacic/roboticks-sdk/packages/roboticks-device/src/DeviceManager.cpp Update the existing uploadSession() method (currently stubbed at line 1153):
bool DeviceManager::uploadSession(const std::string& session_id) {
    auto session = getSession(session_id);
    if (!session) {
        LOG_ERROR("Session not found: {}", session_id);
        return false;
    }

    LOG_INFO("Uploading session: {}", session_id);

    // Mark as uploading
    session->setStatus(SessionStatus::UPLOADING);  // TODO: add setStatus() method

    // Publish upload started event
    if (iot_client_ && iot_client_->isConnected()) {
        std::lock_guard<std::mutex> lock(iot_mutex_);
        json status_msg = {
            {"action", "upload_started"},
            {"session_id", session_id},
            {"device_id", config_.dsn},
            {"timestamp_us", std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::system_clock::now().time_since_epoch()).count()}
        };
        iot_client_->publish(topics::FLEET_SESSIONS, status_msg.dump(), 1);
    }

    // Upload all artifacts
    bool success = session->uploadAllArtifacts(
        std::string(ROBOTICKS_API_URL),
        http_client_,
        [session_id](size_t completed, size_t total) {
            LOG_DEBUG("Session {} upload progress: {}/{}", session_id, completed, total);
        }
    );

    if (success) {
        // Mark as uploaded
        session->setStatus(SessionStatus::UPLOADED);  // TODO: add setStatus() method

        // Publish upload completed event
        if (iot_client_ && iot_client_->isConnected()) {
            std::lock_guard<std::mutex> lock(iot_mutex_);
            json status_msg = {
                {"action", "upload_completed"},
                {"session_id", session_id},
                {"device_id", config_.dsn},
                {"status", "uploaded"},
                {"timestamp_us", std::chrono::duration_cast<std::chrono::microseconds>(
                    std::chrono::system_clock::now().time_since_epoch()).count()}
            };
            iot_client_->publish(topics::FLEET_SESSIONS, status_msg.dump(), 1);
        }

        LOG_INFO("Session uploaded successfully: {}", session_id);
        triggerEvent("uploaded", session_id);
        return true;
    } else {
        LOG_ERROR("Session upload failed: {}", session_id);

        // Retry will happen on next cycle via uploadCompletedSessions()
        return false;
    }
}
Update uploadCompletedSessions() to try uploads:
void DeviceManager::uploadCompletedSessions() {
    auto sessions = getAllSessions();

    for (const auto& session : sessions) {
        SessionStatus status = session->getStatus();

        // Upload completed sessions
        if (status == SessionStatus::COMPLETED) {
            LOG_INFO("Uploading session: {}", session->getSessionId());

            if (uploadSession(session->getSessionId())) {
                LOG_INFO("Session uploaded successfully");
            } else {
                LOG_WARN("Failed to upload session, will retry later");
                // Retry will happen on next cycle
            }
        }
        // Skip already uploaded sessions
        else if (status == SessionStatus::UPLOADED) {
            LOG_DEBUG("Session already uploaded: {}", session->getSessionId());
        }
    }
}
Add setStatus() method to Session class for proper state transitions:
// In Session.hpp (public):
void setStatus(SessionStatus new_status) { status_.store(new_status); }

// In Session.cpp (if you want logging):
void Session::setStatus(SessionStatus new_status) {
    SessionStatus old_status = status_.exchange(new_status);
    LOG_DEBUG("Session {} status changed: {} → {}",
        session_id_, static_cast<int>(old_status), static_cast<int>(new_status));
}
Benefits:
  • Automatic upload on session completion
  • Publish status events via MQTT
  • Retry logic built-in
  • Integration with existing DeviceManager cycle

Phase 3: Advanced Features (2-3 hours, optional)

3.1 Background upload thread (like log uploader) For high-volume scenarios, create a dedicated uploader:
// In DeviceManager.hpp:
private:
    std::thread artifact_upload_thread_;
    std::atomic<bool> artifact_upload_running_;
    std::queue<std::string> artifact_upload_queue_;
    std::mutex artifact_upload_mutex_;
    std::condition_variable artifact_upload_cv_;

    void artifactUploadThread();
    void enqueueSessionForUpload(const std::string& session_id);
This allows:
  • Parallel uploading without blocking main loop
  • Rate limiting per device
  • Better resource utilization for large sessions
  • Same pattern as log upload thread
3.2 Retry logic with exponential backoff
// In Session class:
struct UploadRetry {
    std::string artifact_name;
    int retry_count;
    std::chrono::steady_clock::time_point next_retry_time;
};

std::vector<UploadRetry> pending_retries_;

bool uploadWithRetry(const std::string& artifact_name,
                    messaging::HttpClient& http_client,
                    int max_retries = 3);
3.3 Offline buffering Queue uploads to persistent storage if backend unavailable:
// In DeviceManager.hpp:
std::queue<std::string> offline_upload_queue_;  // Persistent queue
bool saveUploadQueue();
bool loadUploadQueue();
void processOfflineQueue();

File Change Summary

Modified Files

FileChangesLOC
Session.hppAdd upload methods, size helper30
Session.cppImplement upload logic, size calc80
DeviceManager.hppAdd upload thread members (optional)5
DeviceManager.cppReal uploadSession(), integrate with cycle60

Total Code to Write: ~150-200 LOC

Total Time Estimate:

  • Phase 1 (Quick wins): 1-2 hours
  • Phase 2 (Core feature): 3-4 hours
  • Phase 3 (Advanced): 2-3 hours (optional)

Testing Strategy

Unit Tests

// Test Session::uploadArtifact()
TEST(SessionUpload, SingleArtifactUpload) {
    auto session = std::make_shared<Session>("test-session");
    session->addArtifact("test.log", "log", "/tmp/test.log");

    // Mock HTTP client
    MockHttpClient mock_client;
    EXPECT_CALL(mock_client, uploadFile)
        .WillOnce(Return(HttpResponse{.status_code = 200}));

    EXPECT_TRUE(session->uploadArtifact("test.log", "http://localhost", mock_client));
}

// Test Session::uploadAllArtifacts()
TEST(SessionUpload, MultipleArtifacts) {
    auto session = std::make_shared<Session>("test-session");
    session->addArtifact("log1.log", "log", "/tmp/log1.log");
    session->addArtifact("log2.log", "log", "/tmp/log2.log");

    // Both succeed
    MockHttpClient mock_client;
    EXPECT_CALL(mock_client, uploadFile)
        .Times(2)
        .WillRepeatedly(Return(HttpResponse{.status_code = 200}));

    EXPECT_TRUE(session->uploadAllArtifacts("http://localhost", mock_client));
}

// Test file size calculation
TEST(Session, GetFileSizeLookup) {
    auto session = std::make_shared<Session>("test");
    // Create test file
    std::ofstream file("/tmp/test_1kb.log");
    file.write(std::string(1024, 'x').c_str(), 1024);
    file.close();

    session->addArtifact("test_1kb.log", "log", "/tmp/test_1kb.log");
    auto artifacts = session->getArtifacts();

    EXPECT_EQ(artifacts[0].size_bytes, 1024);
}

Integration Tests

TEST(DeviceManager, UploadCompletedSession) {
    // Setup
    DeviceManager& manager = DeviceManager::getInstance();
    auto session = manager.createSession({}, false);

    // Complete session
    session->complete();

    // Trigger upload
    manager.uploadSession(session->getSessionId());

    // Verify MQTT event published
    // Verify artifacts uploaded
    // Verify session status is UPLOADED
}

Manual Testing

  1. Create test device config
  2. Start device manager with debug logging
  3. Let it collect artifacts
  4. Verify HTTP requests logged
  5. Check backend received files

Deployment Checklist

  • Unit tests pass (>90% coverage on upload code)
  • Integration tests pass
  • Manual testing on Jetson device
  • Graceful shutdown with pending uploads
  • MQTT status events publishing correctly
  • File permissions correct (600 for private keys)
  • Error handling for network failures
  • Progress callbacks working
  • Cleanup old uploaded sessions
  • Thread safety with stress tests

Common Pitfalls to Avoid

  1. Forgetting to lock mutex - Always use std::lock_guard when accessing shared state
  2. Blocking main thread - Keep uploads in background or async
  3. Not flushing on shutdown - Ensure graceful SIGTERM flushes pending uploads
  4. Hardcoding URLs - Use config.storage_path, ROBOTICKS_API_URL consistently
  5. No error recovery - Retry failed uploads, don’t just log and exit
  6. Missing size validation - Check file exists before upload
  7. Race conditions - Session state changes must be atomic or protected

Success Criteria

Final implementation should:
  • Upload all session artifacts to backend on completion
  • Support progress tracking
  • Handle network failures gracefully
  • Publish MQTT status events
  • Clean shutdown without data loss
  • Thread-safe concurrent uploads
  • Respect device storage limits
  • Pass all automated tests
  • Scale to multiple large sessions