Skip to main content

Session UUID Generation Workflow

Overview

This document describes the complete session creation workflow where the backend generates session UUIDs and communicates them back to devices via MQTT.

Architecture

┌─────────────────┐         MQTT Publish              ┌─────────────────┐
│                 │  roboticks/fleet/{device_id}/     │                 │
│  Device (SDK)   │ ──────── sessions ─────────────▶  │  AWS IoT Core   │
│                 │                                    │                 │
└─────────────────┘                                    └────────┬────────┘
         ▲                                                      │
         │                                                      │ IoT Rule
         │                                                      │ Trigger
         │                                                      ▼
         │                                             ┌─────────────────┐
         │                                             │                 │
         │                                             │ Lambda Function │
         │  MQTT Response                              │ session-handler │
         │  roboticks/fleet/{device_id}/               │                 │
         │    session/response                         └────────┬────────┘
         │                                                      │
         │                                                      │ INSERT
         │                                                      ▼
         │                                             ┌─────────────────┐
         │                                             │   PostgreSQL    │
         │                                             │   (generates    │
         └─────────────────────────────────────────────│    UUID via     │
                                                       │    default)     │
                                                       └─────────────────┘

Step-by-Step Flow

1. Device Initiates Session Creation

Device Code (C++):
void DeviceManager::createSession(const std::string& session_name) {
    nlohmann::json payload;
    payload["action"] = "create";
    payload["name"] = session_name;
    payload["status"] = "active";
    payload["metadata"] = {
        {"user", "operator_name"},
        {"location", "warehouse_a"}
    };
    payload["started_at"] = getCurrentTimestampMs();

    // NOTE: Do NOT include session_id - backend will generate it

    std::string topic = "roboticks/fleet/" + device_id_ + "/sessions";
    iot_client_->publish(topic, payload.dump(), AWS_MQTT_QOS_AT_LEAST_ONCE);

    // Wait for response on session/response topic
    session_creation_pending_ = true;
}
MQTT Message:
Topic: roboticks/fleet/ROBOT-123456/sessions
QoS: 1
Payload: {
  "action": "create",
  "name": "Morning Data Collection",
  "status": "active",
  "metadata": {
    "user": "operator_name",
    "location": "warehouse_a"
  },
  "started_at": 1699564800000
}

2. IoT Rule Processes Message

IoT Rule SQL:
SELECT
  principal() as certificate_arn,
  topic(3) as device_id,
  *
FROM 'roboticks/fleet/+/sessions'
Transformed Event:
{
  "certificate_arn": "arn:aws:iot:us-west-2:123456:cert/abc123...",
  "device_id": "ROBOT-123456",
  "action": "create",
  "name": "Morning Data Collection",
  "status": "active",
  "metadata": {...},
  "started_at": 1699564800000
}

3. Lambda Validates and Creates Session

Lambda Function (session-handler/index.py):
def lambda_handler(event, context):
    # 1. Extract and validate certificate
    certificate_arn = event.get("certificate_arn")
    certificate_id = certificate_arn.split("/")[-1]

    # 2. Extract device_id from topic
    device_id_from_topic = event.get("device_id")

    # 3. Query database for device info using certificate
    device_info = get_device_info_from_certificate(certificate_id)
    # Returns: {"device_id": 42, "device_id_dsn": "ROBOT-123456", "project_id": 10}

    # 4. SECURITY: Verify device_id matches certificate
    if device_info['device_id_dsn'] != device_id_from_topic:
        return {"statusCode": 403, "body": "Device ID mismatch"}

    # 5. Create session (backend generates UUID)
    result = create_session(device_info, event)

    # 6. Publish response back to device
    publish_session_response(device_info['device_id_dsn'], result)

    return {"statusCode": 200, "body": json.dumps(result)}

4. Database Generates UUID

SQL Execution:
def create_session(device_info, payload):
    # Add device info to metadata (preserved if device deleted)
    metadata = payload.get("metadata", {})
    metadata['device_name'] = "Warehouse Robot 42"
    metadata['device_id'] = "ROBOT-123456"

    # INSERT without session_id - database generates it
    cursor.execute("""
        INSERT INTO sessions (
            name,
            fleet_device_id,
            project_id,
            status,
            session_metadata,
            started_at,
            created_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP
        )
        RETURNING id, session_id
    """, (name, device_info["device_id"], device_info["project_id"],
          status, json.dumps(metadata), started_at))

    result = cursor.fetchone()
    db_id = result["id"]                    # 42 (integer primary key)
    session_id = result["session_id"]       # "a3ce880a-89c4..." (UUID)

    # Update S3 path with generated UUID
    s3_base_path = f"sessions/{session_id}/"
    cursor.execute("""
        UPDATE sessions SET s3_base_path = %s WHERE id = %s
    """, (s3_base_path, db_id))

    return {
        "session_id": session_id,
        "name": name,
        "status": status,
        "s3_base_path": s3_base_path
    }
Database Table:
CREATE TABLE sessions (
    id SERIAL PRIMARY KEY,
    session_id VARCHAR UNIQUE NOT NULL DEFAULT gen_random_uuid()::text,
    name VARCHAR NOT NULL,
    fleet_device_id INTEGER REFERENCES fleet_devices(id),
    project_id INTEGER REFERENCES projects(id) ON DELETE CASCADE,
    status VARCHAR,
    session_metadata JSONB,
    s3_base_path VARCHAR,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

5. Lambda Publishes Response

Python Code:
def publish_session_response(device_id_dsn, session_data):
    import boto3

    iot_client = boto3.client('iot-data')
    response_topic = f"roboticks/fleet/{device_id_dsn}/session/response"

    response_payload = json.dumps(session_data)

    iot_client.publish(
        topic=response_topic,
        qos=1,
        payload=response_payload
    )
MQTT Response:
Topic: roboticks/fleet/ROBOT-123456/session/response
QoS: 1
Payload: {
  "session_id": "a3ce880a-89c4-f903-0f1d-53e23562f29b",
  "name": "Morning Data Collection",
  "status": "active",
  "s3_base_path": "sessions/a3ce880a-89c4-f903-0f1d-53e23562f29b/"
}

6. Device Receives and Stores UUID

Device Code (C++):
void DeviceManager::handleSessionResponse(const std::string& payload) {
    try {
        auto response = nlohmann::json::parse(payload);

        if (!response.contains("session_id")) {
            ROBOTICKS_ERROR(logger_, "Session response missing session_id");
            return;
        }

        std::string session_id = response["session_id"];
        std::string session_name = response["name"];
        std::string s3_base_path = response["s3_base_path"];

        ROBOTICKS_INFO(logger_, "Received session_id: {}", session_id);

        // Store for all subsequent operations
        current_session_id_ = session_id;
        current_session_name_ = session_name;
        current_s3_base_path_ = s3_base_path;

        // Update RemoteSinks with session_id for log tagging
        updateRemoteSinksWithSessionId(session_id);

        // Mark session creation as complete
        session_creation_pending_ = false;

        // Notify application that session is ready
        notifySessionCreated(session_id);

    } catch (const std::exception& e) {
        ROBOTICKS_ERROR(logger_, "Failed to parse session response: {}", e.what());
    }
}

7. Device Uses UUID for All Operations

Logs:
void DeviceManager::publishLogBatch(const std::vector<LogMessage>& logs) {
    nlohmann::json payload;
    payload["session_id"] = current_session_id_;  // Backend-provided UUID
    payload["logs"] = logs;
    payload["batch_size"] = logs.size();
    payload["timestamp_us"] = getCurrentTimestampUs();

    std::string topic = "roboticks/fleet/" + device_id_ + "/logs";
    iot_client_->publish(topic, payload.dump(), AWS_MQTT_QOS_AT_LEAST_ONCE);
}
File Uploads:
void DeviceManager::uploadFile(const std::string& filename) {
    nlohmann::json request;
    request["session_id"] = current_session_id_;  // Backend-provided UUID
    request["filename"] = filename;
    request["content_type"] = "application/octet-stream";

    std::string topic = "roboticks/fleet/" + device_id_ + "/file_upload/request";
    iot_client_->publish(topic, request.dump(), AWS_MQTT_QOS_AT_LEAST_ONCE);
}
Session Completion:
void DeviceManager::completeSession() {
    nlohmann::json payload;
    payload["action"] = "complete";
    payload["session_id"] = current_session_id_;  // Backend-provided UUID
    payload["completed_at"] = getCurrentTimestampMs();
    payload["duration_seconds"] = calculateDuration();

    std::string topic = "roboticks/fleet/" + device_id_ + "/sessions";
    iot_client_->publish(topic, payload.dump(), AWS_MQTT_QOS_AT_LEAST_ONCE);
}

Error Handling

Device Side

void DeviceManager::createSession(const std::string& name) {
    // Start timeout timer (30 seconds)
    session_creation_timeout_ = std::chrono::steady_clock::now() + 30s;

    // Publish create request
    publishSessionCreate(name);

    // Wait for response or timeout
    while (session_creation_pending_) {
        if (std::chrono::steady_clock::now() > session_creation_timeout_) {
            ROBOTICKS_ERROR(logger_, "Session creation timeout");
            session_creation_pending_ = false;
            return false;
        }
        std::this_thread::sleep_for(100ms);
    }

    return current_session_id_.empty() == false;
}

Backend Side

# Lambda returns appropriate status codes
if not certificate_arn:
    return {"statusCode": 400, "body": "Missing certificate_arn"}

if not device_info:
    return {"statusCode": 404, "body": "Device not found"}

if device_id_mismatch:
    return {"statusCode": 403, "body": "Device ID mismatch"}

if database_error:
    return {"statusCode": 500, "body": "Failed to create session"}

Sequence Diagram

Device              IoT Core         Lambda           PostgreSQL        Device
  |                    |                |                  |               |
  |--- CREATE -------->|                |                  |               |
  |  (no session_id)   |                |                  |               |
  |                    |                |                  |               |
  |                    |-- TRIGGER ---->|                  |               |
  |                    |                |                  |               |
  |                    |                |--- VALIDATE ---->|               |
  |                    |                |    certificate   |               |
  |                    |                |<--- DEVICE_INFO -|               |
  |                    |                |                  |               |
  |                    |                |--- INSERT ------>|               |
  |                    |                |  (no session_id) |               |
  |                    |                |                  |               |
  |                    |                |                  | gen_random_   |
  |                    |                |                  | uuid()        |
  |                    |                |                  |               |
  |                    |                |<-- RETURNING ----|               |
  |                    |                |    session_id    |               |
  |                    |                |                  |               |
  |                    |<-- PUBLISH ----|                  |               |
  |                    |    response    |                  |               |
  |                    |                |                  |               |
  |<--- RESPONSE ------|                |                  |               |
  |  (with UUID)       |                |                  |               |
  |                    |                |                  |               |
  | Store session_id   |                |                  |               |
  | Use for logs/files |                |                  |               |

Database Migration

The session_id field is added via Alembic migration:
# File: backend/alembic/versions/20251112_add_session_id_uuid.py

def upgrade():
    # Add session_id column (nullable initially)
    op.add_column('sessions', sa.Column('session_id', sa.String(), nullable=True))

    # Backfill existing sessions with UUIDs
    conn.execute(text("""
        UPDATE sessions
        SET session_id = gen_random_uuid()::text
        WHERE session_id IS NULL
    """))

    # Make non-nullable
    op.alter_column('sessions', 'session_id', nullable=False)

    # Add unique constraint and index
    op.create_unique_constraint('uq_sessions_session_id', 'sessions', ['session_id'])
    op.create_index('ix_sessions_session_id', 'sessions', ['session_id'])

Benefits of Backend-Generated UUIDs

  1. Single Source of Truth: Backend controls session identification
  2. Security: Prevents devices from creating predictable or conflicting IDs
  3. Auditability: All session IDs come from authenticated database inserts
  4. Consistency: UUIDs follow PostgreSQL’s UUID v4 standard
  5. Simplicity: Devices don’t need UUID generation libraries
  6. Debugging: Session creation is traceable through Lambda logs

References

  • Lambda Implementation: infrastructure/lambda/session-handler/index.py
  • Database Migration: backend/alembic/versions/20251112_add_session_id_uuid.py
  • Database Model: backend/app/models/session.py:20
  • API Endpoints: backend/app/api/v1/sessions.py
  • MQTT Topics: docs/MQTT_TOPIC_MIGRATION.md