
Commands & Rollouts Feature Specification

Table of Contents

  1. Overview
  2. Commands Feature
  3. Rollouts Feature
  4. Tunnel Feature
  5. Device Groups
  6. Data Flow
  7. Technical Specifications
  8. Implementation Checklist

Overview

This document describes the Commands and Rollouts features for the Roboticks platform. These features enable remote device management, command execution, and controlled deployment of packages to fleet devices via MQTT (AWS IoT Core).

Key Capabilities

  • Commands: Send commands to devices for diagnostics, control, and management
  • Rollouts: Controlled deployment of packages to device fleets with progressive strategies
  • Tunnel: Secure remote access to devices via SSH/terminal with a web-based interface
  • Device Groups: Organize devices for targeted commands and rollouts
  • Audit Trail: Complete history of all commands, deployments, and tunnel sessions with user attribution

Commands Feature

Overview

The Commands feature allows users to send commands to individual or multiple devices via MQTT. Commands can be predefined (system operations) or custom (user-defined scripts).

Database Models

DeviceCommand Model

File: backend/app/models/command.py
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional
import enum

from sqlalchemy import JSON, DateTime, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.db.base import Base

if TYPE_CHECKING:
    from app.models.fleet import FleetDevice
    from app.models.project import Project
    from app.models.user import User


class CommandType(str, enum.Enum):
    """Predefined command types"""
    # System commands
    REBOOT = "reboot"
    SHUTDOWN = "shutdown"
    RESTART_SERVICE = "restart_service"
    UPDATE_FIRMWARE = "update_firmware"

    # Deployment commands
    DEPLOY_PACKAGE = "deploy_package"
    ROLLBACK_DEPLOYMENT = "rollback_deployment"

    # Session commands
    START_SESSION = "start_session"
    STOP_SESSION = "stop_session"

    # Diagnostic commands
    RUN_DIAGNOSTIC = "run_diagnostic"

    # Custom
    CUSTOM = "custom"


class CommandStatus(str, enum.Enum):
    """Command execution status"""
    PENDING = "pending"       # Command created, not yet sent
    SENT = "sent"             # Published to MQTT
    DELIVERED = "delivered"   # Device acknowledged receipt
    RUNNING = "running"       # Device executing command
    COMPLETED = "completed"   # Successfully executed
    FAILED = "failed"         # Execution failed
    TIMEOUT = "timeout"       # No response within timeout period
    CANCELLED = "cancelled"   # User cancelled before execution


class DeviceCommand(Base):
    """Commands sent to fleet devices"""
    __tablename__ = "device_commands"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    project_id: Mapped[int] = mapped_column(Integer, ForeignKey("projects.id"), nullable=False, index=True)
    device_id: Mapped[int] = mapped_column(Integer, ForeignKey("fleet_devices.id"), nullable=False, index=True)

    # Command details
    command_type: Mapped[CommandType] = mapped_column(
        Enum(CommandType, values_callable=lambda x: [e.value for e in x]),
        nullable=False,
        index=True
    )
    command_name: Mapped[str] = mapped_column(String(255), nullable=False)
    command_payload: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False)

    # Example payloads:
    # {"service": "roboticks-agent"} for restart_service
    # {"deployment_id": 123, "download_url": "https://..."} for deploy_package
    # {"script": "systemctl status roboticks"} for custom

    # Execution tracking
    status: Mapped[CommandStatus] = mapped_column(
        Enum(CommandStatus, values_callable=lambda x: [e.value for e in x]),
        default=CommandStatus.PENDING,
        index=True
    )
    priority: Mapped[int] = mapped_column(Integer, default=5)  # 1=highest, 10=lowest

    # MQTT tracking
    mqtt_message_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    mqtt_topic: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)

    # Timing
    timeout_seconds: Mapped[int] = mapped_column(Integer, default=300)  # 5 min default
    sent_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    delivered_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)

    # Response from device
    response_payload: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)
    error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    exit_code: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Metadata and audit trail
    created_by: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False)
    notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    project: Mapped["Project"] = relationship("Project", back_populates="device_commands")
    device: Mapped["FleetDevice"] = relationship("FleetDevice", back_populates="commands")
    creator: Mapped["User"] = relationship("User")
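The status comments above imply a lifecycle, but they do not say which moves are legal. The sketch below is one possible transition guard. The transition table is an assumption and is not part of this spec. So that the sketch runs on its own, it mirrors the CommandStatus values in a local enum. It lets SENT skip ahead, because a device may never report intermediate states.

```python
import enum


class CommandStatus(str, enum.Enum):
    """Local mirror of app.models.command.CommandStatus so the sketch runs standalone."""
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"


S = CommandStatus
# Assumed lifecycle (hypothetical). PENDING -> FAILED covers a failed MQTT publish;
# only PENDING commands can be cancelled, matching "Cancel pending command".
ALLOWED_TRANSITIONS: dict[CommandStatus, set[CommandStatus]] = {
    S.PENDING: {S.SENT, S.FAILED, S.CANCELLED},
    S.SENT: {S.DELIVERED, S.RUNNING, S.COMPLETED, S.FAILED, S.TIMEOUT},
    S.DELIVERED: {S.RUNNING, S.COMPLETED, S.FAILED, S.TIMEOUT},
    S.RUNNING: {S.COMPLETED, S.FAILED, S.TIMEOUT},
    S.COMPLETED: set(),
    S.FAILED: set(),
    S.TIMEOUT: set(),
    S.CANCELLED: set(),
}

# Final states are the ones with no outgoing transitions
TERMINAL = {status for status, targets in ALLOWED_TRANSITIONS.items() if not targets}


def can_transition(current: CommandStatus, new: CommandStatus) -> bool:
    """Return True if moving a command from `current` to `new` is permitted."""
    return new in ALLOWED_TRANSITIONS[current]
```

With a guard like this, both the response handler and the cancel endpoint could reject invalid updates. One example is a late "running" report for a command that has already timed out.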

API Endpoints

File: backend/app/api/v1/commands.py
# List predefined command templates
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/commands/templates
Response: [
    {
        "type": "reboot",
        "name": "Reboot Device",
        "description": "Safely reboot the device",
        "parameters": [],
        "estimated_duration_seconds": 60,
        "icon": "restart_alt"
    },
    {
        "type": "deploy_package",
        "name": "Deploy Package",
        "description": "Deploy a package to the device",
        "parameters": [
            {"name": "deployment_id", "type": "int", "required": true, "description": "Deployment package ID"}
        ],
        "estimated_duration_seconds": 300,
        "icon": "cloud_upload"
    },
    {
        "type": "custom",
        "name": "Custom Command",
        "description": "Execute a custom shell command",
        "parameters": [
            {"name": "script", "type": "string", "required": true, "description": "Shell script to execute"}
        ],
        "estimated_duration_seconds": null,
        "icon": "code"
    }
]

# Send command to single device
POST /api/v1/organizations/{org_slug}/projects/{project_slug}/commands
Body: {
    "device_id": 123,
    "command_type": "reboot",
    "command_payload": {},
    "priority": 5,
    "timeout_seconds": 300,
    "notes": "Scheduled maintenance reboot"
}
Response: {
    "id": 1001,
    "status": "pending",
    "created_at": "2025-11-14T10:00:00Z"
}

# Send command to multiple devices (bulk operation)
POST /api/v1/organizations/{org_slug}/projects/{project_slug}/commands/bulk
Body: {
    "device_ids": [123, 456, 789],
    "command_type": "run_diagnostic",
    "command_payload": {"log_types": ["system", "application"]},
    "priority": 3,
    "timeout_seconds": 600
}
Response: {
    "commands_created": 3,
    "command_ids": [1001, 1002, 1003]
}

# List commands with filters (includes full audit trail)
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/commands
Query params:
  - device_id: int (optional) - Filter by device
  - status: CommandStatus (optional) - Filter by status
  - command_type: CommandType (optional) - Filter by type
  - created_by: int (optional) - Filter by user who created
  - created_after: datetime (optional) - Filter by creation date
  - created_before: datetime (optional)
  - page: int (default: 1)
  - page_size: int (default: 20, max: 100)
Response: {
    "items": [
        {
            "id": 1001,
            "command_type": "reboot",
            "command_name": "Reboot Device",
            "device_id": 123,
            "device_name": "Bot-01",
            "status": "completed",
            "created_by": 5,
            "creator_email": "john@example.com",
            "created_at": "2025-11-14T10:00:00Z",
            "completed_at": "2025-11-14T10:01:30Z",
            "duration_seconds": 90
        }
    ],
    "total": 150,
    "page": 1,
    "page_size": 20
}

# Get command details
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/commands/{command_id}
Response: {
    "id": 1001,
    "command_type": "deploy_package",
    "command_name": "Deploy v2.1.0",
    "command_payload": {
        "deployment_id": 456,
        "download_url": "https://api.roboticks.io/..."
    },
    "status": "running",
    "priority": 2,
    "device": {
        "id": 123,
        "name": "Bot-01",
        "device_id": "bot-01-abc123",
        "status": "ONLINE"
    },
    "created_by": 5,
    "creator_email": "john@example.com",
    "notes": "Production rollout stage 1",
    "sent_at": "2025-11-14T10:00:00Z",
    "delivered_at": "2025-11-14T10:00:02Z",
    "started_at": "2025-11-14T10:00:05Z",
    "response_payload": {
        "progress": 75,
        "status": "installing"
    },
    "created_at": "2025-11-14T09:59:55Z"
}

# Cancel pending command
DELETE /api/v1/organizations/{org_slug}/projects/{project_slug}/commands/{command_id}
Response: {
    "id": 1001,
    "status": "cancelled",
    "cancelled_at": "2025-11-14T10:05:00Z"
}
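The bulk endpoint returns one command ID per device, which suggests that it fans the request out into individual DeviceCommand rows. Below is a minimal sketch of that expansion. It assumes that duplicate device IDs are collapsed, that the defaults match the model (priority 5, timeout 300 s), and that any command_type outside CommandType is rejected. The function name expand_bulk_request is hypothetical.

```python
from typing import Any

# Values of CommandType in backend/app/models/command.py
VALID_COMMAND_TYPES = frozenset({
    "reboot", "shutdown", "restart_service", "update_firmware",
    "deploy_package", "rollback_deployment", "start_session",
    "stop_session", "run_diagnostic", "custom",
})


def expand_bulk_request(body: dict[str, Any], created_by: int) -> list[dict[str, Any]]:
    """Fan one bulk request out into one pending command per unique device."""
    command_type = body["command_type"]
    if command_type not in VALID_COMMAND_TYPES:
        raise ValueError(f"unknown command_type: {command_type!r}")
    device_ids = list(dict.fromkeys(body["device_ids"]))  # de-duplicate, keep order
    if not device_ids:
        raise ValueError("device_ids must not be empty")
    return [
        {
            "device_id": device_id,
            "command_type": command_type,
            # Copy per row so later edits to one command do not leak into the others
            "command_payload": dict(body.get("command_payload") or {}),
            "priority": body.get("priority", 5),                  # model default
            "timeout_seconds": body.get("timeout_seconds", 300),  # model default
            "notes": body.get("notes"),
            "status": "pending",
            "created_by": created_by,
        }
        for device_id in device_ids
    ]
```

In this sketch, commands_created in the response is simply the number of rows returned.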

MQTT Service

File: backend/app/services/mqtt_command_service.py
import json
from datetime import datetime, timezone

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from app.core.config import settings
from app.models.command import CommandStatus, DeviceCommand
from app.models.fleet import FleetDevice


class MQTTCommandService:
    """Service for sending commands to devices via AWS IoT Core MQTT"""

    def __init__(self):
        self.iot_client = boto3.client('iot-data', region_name=settings.AWS_REGION)

    def send_command(self, command: DeviceCommand, device: FleetDevice) -> bool:
        """
        Publish command to device's command topic.

        Topic format: roboticks/{project_id}/devices/{device_id}/commands
        QoS: 1 (at-least-once delivery). AWS IoT Core does not support QoS 2,
        so devices must de-duplicate redelivered messages by command_id.

        The caller is responsible for committing the updated command.

        Args:
            command: DeviceCommand instance
            device: FleetDevice instance

        Returns:
            True if successfully published, False otherwise
        """
        topic = f"roboticks/{device.project_id}/devices/{device.device_id}/commands"
        now = datetime.now(timezone.utc)  # columns are timezone-aware

        payload = {
            "command_id": command.id,
            "command_type": command.command_type,
            "payload": command.command_payload,
            "timeout_seconds": command.timeout_seconds,
            "priority": command.priority,
            "timestamp": now.isoformat()
        }

        try:
            response = self.iot_client.publish(
                topic=topic,
                qos=1,  # At-least-once; IoT Core supports only QoS 0 and 1
                payload=json.dumps(payload)
            )

            # Update command status
            command.status = CommandStatus.SENT
            command.sent_at = now
            command.mqtt_topic = topic
            # publish() returns no MQTT message ID; keep the AWS request ID for tracing
            command.mqtt_message_id = response.get('ResponseMetadata', {}).get('RequestId')

            return True

        except (BotoCoreError, ClientError) as e:
            command.status = CommandStatus.FAILED
            command.error_message = f"Failed to publish to MQTT: {e}"
            return False

    def process_command_response(
        self,
        project_id: int,
        device_id: str,
        command_id: int,
        response_data: dict
    ) -> None:
        """
        Process command response from device.

        Called by webhook/Lambda when device publishes to response topic:
        roboticks/{project_id}/devices/{device_id}/commands/response

        Response data format:
        {
            "command_id": 1001,
            "status": "completed",  # or "running", "failed"
            "response_payload": {...},
            "error_message": "...",  # if failed
            "exit_code": 0,
            "timestamp": "2025-11-14T10:00:00Z"
        }
        """
        # Implementation will update command status in database
        # This is called by a Lambda function or webhook endpoint
        pass


# Singleton instance
mqtt_command_service = MQTTCommandService()
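process_command_response is left as a stub above. The following sketch shows the state update it might perform. It uses a hypothetical CommandRecord dataclass in place of the ORM model so that it runs without a database. The sketch assumes that responses for a command already in a final state should be ignored. That rule also makes the handler safe against duplicate MQTT deliveries. apply_command_response and parse_ts are illustrative names.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional

TERMINAL_STATUSES = {"completed", "failed", "timeout", "cancelled"}


@dataclass
class CommandRecord:
    """Hypothetical stand-in for the DeviceCommand columns touched here."""
    id: int
    status: str = "sent"
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    response_payload: Optional[dict[str, Any]] = None
    error_message: Optional[str] = None
    exit_code: Optional[int] = None


def parse_ts(value: str) -> datetime:
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def apply_command_response(command: CommandRecord, response: dict[str, Any]) -> bool:
    """Apply one device response to the command; return False if it was ignored."""
    if response.get("command_id") != command.id:
        return False
    if command.status in TERMINAL_STATUSES:
        return False  # duplicate or late message: keep the final state
    new_status = response["status"]
    ts = parse_ts(response["timestamp"])
    if new_status == "running":
        command.started_at = command.started_at or ts
    elif new_status in ("completed", "failed"):
        command.completed_at = ts
        command.exit_code = response.get("exit_code")
        command.error_message = response.get("error_message")
    else:
        return False  # status not in the documented response format
    command.status = new_status
    if response.get("response_payload") is not None:
        command.response_payload = response["response_payload"]
    return True
```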

Frontend Components

Commands Page

File: frontend/src/pages/Commands.tsx

Features:
  • Table view of all commands in the project
  • Filters: Status, Type, Device, Date Range, User
  • Search by command name or device name
  • Real-time status updates (polling every 5 seconds for active commands)
  • Click row to view detailed command info
  • Bulk operations: Cancel multiple pending commands
Columns:
  1. Command (icon + name)
  2. Type (chip with color)
  3. Device (name + status indicator)
  4. Status (with progress for running commands)
  5. Created By (user name)
  6. Created At (relative time)
  7. Duration (calculated from timestamps)
  8. Actions (View Details, Cancel if pending)
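The Duration column and the polling rule can both be computed from fields the list endpoint already returns. In the example list response, duration_seconds equals completed_at minus created_at (90 s), so this sketch uses created_at as the start. Measuring from sent_at or started_at would be an equally valid design. The helper names and the set of "active" statuses are assumptions.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional

# Statuses treated as "active" for the 5-second poll (assumed: every non-final state)
ACTIVE_STATUSES = {"pending", "sent", "delivered", "running"}


def duration_seconds(created_at: datetime, completed_at: Optional[datetime],
                     now: Optional[datetime] = None) -> int:
    """Elapsed seconds from creation to completion, or to `now` if still active."""
    end = completed_at or now or datetime.now(timezone.utc)
    return max(0, int((end - created_at).total_seconds()))


def needs_polling(statuses: Iterable[str]) -> bool:
    """Keep polling while any command shown in the table is still active."""
    return any(status in ACTIVE_STATUSES for status in statuses)
```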

New Command Dialog

Wizard Steps:
  1. Select Target(s)
    • Single device: Dropdown with search
    • Multiple devices: Multi-select with device groups
    • Show device online/offline status
  2. Choose Command
    • Card-based selection of predefined commands
    • Each card shows: Icon, Name, Description, Est. Duration
    • “Custom Command” option at the end
  3. Configure
    • Dynamic form based on command type
    • For deploy_package: Deployment selector
    • For custom: Code editor for script
    • Priority slider: Low (7-10), Normal (4-6), High (1-3)
    • Timeout setting with presets
  4. Review & Send
    • Summary of all selections
    • Warning if any devices are offline
    • Estimated total execution time
    • Option to add notes
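The priority slider bands and the offline warning in the review step reduce to two small rules. The sketch below assumes that reachable devices report the status string "ONLINE", as in the command detail response, and that any other value counts as offline. Both function names are hypothetical.

```python
from typing import Any


def priority_band(priority: int) -> str:
    """Map a 1-10 priority (1 = highest) to the slider label."""
    if not 1 <= priority <= 10:
        raise ValueError(f"priority must be between 1 and 10, got {priority}")
    if priority <= 3:
        return "High"
    if priority <= 6:
        return "Normal"
    return "Low"


def offline_device_names(devices: list[dict[str, Any]]) -> list[str]:
    """Names of selected devices that are not ONLINE, for the review-step warning."""
    return [d["name"] for d in devices if d.get("status") != "ONLINE"]
```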

Command Detail Dialog

Tabs:
  1. Overview: Status, timestamps, creator, device info
  2. Payload: JSON viewer for command_payload
  3. Response: JSON viewer for response_payload (if available)
  4. Logs: Error messages and execution details

Rollouts Feature

Overview

Rollouts enable controlled deployment of packages to device fleets with progressive strategies, health monitoring, and automatic rollback capabilities.

Database Models

DeviceGroup Model

File: backend/app/models/device_group.py
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional

from sqlalchemy import JSON, Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.db.base import Base

if TYPE_CHECKING:
    from app.models.project import Project
    from app.models.rollout import Rollout
    from app.models.user import User


class DeviceGroup(Base):
    """Logical grouping of devices for targeted operations"""
    __tablename__ = "device_groups"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    project_id: Mapped[int] = mapped_column(Integer, ForeignKey("projects.id"), nullable=False, index=True)

    name: Mapped[str] = mapped_column(String(255), nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Group membership definition
    selection_type: Mapped[str] = mapped_column(String(50), nullable=False)
    # Values: "manual", "tag_based", "query"

    # For manual groups: explicit device list
    device_ids: Mapped[Optional[list[int]]] = mapped_column(JSON, nullable=True)

    # For tag_based groups: tag filters
    tag_filters: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)
    # Example: {"device_type": "DRONE", "site": "warehouse-1", "environment": "production"}

    # For query groups: dynamic query definition
    query_definition: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)
    # Example: {"where": {"status": "ONLINE", "firmware_version": ">=1.0.0"}}

    # Metadata
    created_by: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    project: Mapped["Project"] = relationship("Project", back_populates="device_groups")
    rollouts: Mapped[list["Rollout"]] = relationship("Rollout", back_populates="device_group")
    creator: Mapped["User"] = relationship("User")
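This spec does not say how group membership is evaluated. The sketch below covers the manual and tag_based selection types. It assumes each device is available as a dict with an id and an attributes mapping. That shape is hypothetical, since FleetDevice's actual tag storage is not shown here. Query groups are left out because their comparison syntax (">=1.0.0") is not yet defined.

```python
from typing import Any


def resolve_group_members(group: dict[str, Any], devices: list[dict[str, Any]]) -> list[int]:
    """Return the ids of project devices that belong to a DeviceGroup definition."""
    selection_type = group["selection_type"]
    if selection_type == "manual":
        wanted = set(group.get("device_ids") or [])
        # Iterate project devices so IDs of deleted devices are silently dropped
        return [d["id"] for d in devices if d["id"] in wanted]
    if selection_type == "tag_based":
        filters = group.get("tag_filters") or {}
        # Every filter key must match exactly (logical AND)
        return [
            d["id"] for d in devices
            if all(d.get("attributes", {}).get(key) == value for key, value in filters.items())
        ]
    raise NotImplementedError(f"selection_type {selection_type!r} is not covered by this sketch")
```

Resolving membership at rollout start, rather than storing the result, keeps tag-based groups current as devices are added or retagged.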

Rollout Model

File: backend/app/models/rollout.py
import enum
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional

from sqlalchemy import JSON, Boolean, DateTime, Enum, Float, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.db.base import Base

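if TYPE_CHECKING:
    from app.models.command import DeviceCommand
    from app.models.deployment import Deployment
    from app.models.device_group import DeviceGroup
    from app.models.fleet import FleetDevice
    from app.models.project import Project
    from app.models.rollout_template import RolloutTemplate
    from app.models.user import User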


class RolloutStrategy(str, enum.Enum):
    """Deployment strategy types"""
    ALL_AT_ONCE = "all_at_once"       # Deploy to all devices simultaneously
    PROGRESSIVE = "progressive"        # Gradual rollout with stages
    CANARY = "canary"                  # Small test group first, then all
    BLUE_GREEN = "blue_green"          # 50/50 split for A/B testing


class RolloutStatus(str, enum.Enum):
    """Rollout execution status"""
    DRAFT = "draft"                    # Created but not started
    SCHEDULED = "scheduled"            # Scheduled for future
    IN_PROGRESS = "in_progress"        # Currently rolling out
    PAUSED = "paused"                  # Temporarily paused
    COMPLETED = "completed"            # Successfully completed
    FAILED = "failed"                  # Failed (with optional rollback)
    CANCELLED = "cancelled"            # User cancelled


class Rollout(Base):
    """Controlled deployment of packages to device fleets"""
    __tablename__ = "rollouts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    project_id: Mapped[int] = mapped_column(Integer, ForeignKey("projects.id"), nullable=False, index=True)

    name: Mapped[str] = mapped_column(String(255), nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Source template (if created from template)
    template_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("rollout_templates.id"),
        nullable=True
    )

    # Target
    deployment_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("deployments.id"),
        nullable=False,
        index=True
    )
    device_group_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("device_groups.id"),
        nullable=True
    )
    # If device_group_id is null, target_device_ids must be set
    target_device_ids: Mapped[Optional[list[int]]] = mapped_column(JSON, nullable=True)

    # Strategy
    strategy: Mapped[RolloutStrategy] = mapped_column(
        Enum(RolloutStrategy, values_callable=lambda x: [e.value for e in x]),
        nullable=False
    )
    strategy_config: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False)
    # Example for progressive:
    # {
    #     "stages": [
    #         {"percentage": 10, "wait_minutes": 30},
    #         {"percentage": 50, "wait_minutes": 60},
    #         {"percentage": 100, "wait_minutes": 0}
    #     ],
    #     "health_check_enabled": true,
    #     "auto_rollback_on_failure": true,
    #     "failure_threshold_percentage": 20
    # }

    # Status tracking
    status: Mapped[RolloutStatus] = mapped_column(
        Enum(RolloutStatus, values_callable=lambda x: [e.value for e in x]),
        default=RolloutStatus.DRAFT,
        index=True
    )
    current_stage: Mapped[int] = mapped_column(Integer, default=0)

    # Progress metrics
    total_devices: Mapped[int] = mapped_column(Integer, default=0)
    devices_targeted: Mapped[int] = mapped_column(Integer, default=0)
    devices_succeeded: Mapped[int] = mapped_column(Integer, default=0)
    devices_failed: Mapped[int] = mapped_column(Integer, default=0)
    devices_pending: Mapped[int] = mapped_column(Integer, default=0)

    # Health monitoring
    health_check_enabled: Mapped[bool] = mapped_column(Boolean, default=True)
    health_check_passing_percentage: Mapped[Optional[float]] = mapped_column(Float, nullable=True)

    # Timing
    scheduled_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    started_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    paused_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)

    # Rollback
    rollback_deployment_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("deployments.id"),
        nullable=True
    )
    auto_rollback_enabled: Mapped[bool] = mapped_column(Boolean, default=False)
    rollback_triggered_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True
    )

    # Metadata
    created_by: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False)
    notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    project: Mapped["Project"] = relationship("Project", back_populates="rollouts")
    deployment: Mapped["Deployment"] = relationship("Deployment", foreign_keys=[deployment_id])
    rollback_deployment: Mapped[Optional["Deployment"]] = relationship(
        "Deployment",
        foreign_keys=[rollback_deployment_id]
    )
    device_group: Mapped[Optional["DeviceGroup"]] = relationship(
        "DeviceGroup",
        back_populates="rollouts"
    )
    device_rollouts: Mapped[list["DeviceRollout"]] = relationship(
        "DeviceRollout",
        back_populates="rollout",
        cascade="all, delete-orphan"
    )
    template: Mapped[Optional["RolloutTemplate"]] = relationship("RolloutTemplate")
    creator: Mapped["User"] = relationship("User")
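For the progressive strategy, each stage's percentage appears to be cumulative, since the last stage is always 100. Under that assumption, the sketch below assigns every target device a 0-based stage number, matching the default of current_stage and of DeviceRollout.stage_number. It rounds up, so even a small fleet gets at least one device in the first stage. assign_stages is a hypothetical helper.

```python
from typing import Any


def assign_stages(device_ids: list[int], stages: list[dict[str, Any]]) -> dict[int, int]:
    """Map each device id to the 0-based stage in which it receives the deployment."""
    total = len(device_ids)
    assignment: dict[int, int] = {}
    previous = 0  # devices already covered by earlier stages
    for stage_number, stage in enumerate(stages):
        pct = stage["percentage"]
        if not 0 < pct <= 100:
            raise ValueError(f"invalid stage percentage: {pct}")
        # Integer ceil(total * pct / 100); never shrink below earlier stages
        cumulative = max(previous, (total * pct + 99) // 100)
        for device_id in device_ids[previous:cumulative]:
            assignment[device_id] = stage_number
        previous = cumulative
    if previous < total:
        raise ValueError("final stage must reach 100% of devices")
    return assignment
```

With 37 devices and stages of 10/50/100%, this yields 4, 15 and 18 devices per stage. Shuffling device_ids first, or ordering them by site, is a policy choice the spec leaves open.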

DeviceRollout Model

File: backend/app/models/rollout.py (continued)
class DeviceRolloutStatus(str, enum.Enum):
    """Device-level rollout status"""
    PENDING = "pending"           # Waiting for rollout to reach this device
    DOWNLOADING = "downloading"   # Downloading package
    INSTALLING = "installing"     # Installing/deploying
    VERIFYING = "verifying"       # Health check in progress
    COMPLETED = "completed"       # Successfully deployed
    FAILED = "failed"             # Deployment failed
    ROLLED_BACK = "rolled_back"   # Rolled back to previous version


class DeviceRollout(Base):
    """Tracks rollout status for individual devices"""
    __tablename__ = "device_rollouts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    rollout_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("rollouts.id"),
        nullable=False,
        index=True
    )
    device_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("fleet_devices.id"),
        nullable=False,
        index=True
    )

    # Version tracking
    previous_deployment_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("deployments.id"),
        nullable=True
    )
    target_deployment_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("deployments.id"),
        nullable=False
    )

    # Status
    status: Mapped[DeviceRolloutStatus] = mapped_column(
        Enum(DeviceRolloutStatus, values_callable=lambda x: [e.value for e in x]),
        default=DeviceRolloutStatus.PENDING,
        index=True
    )
    stage_number: Mapped[int] = mapped_column(Integer, default=0)

    # Download progress (proxied through API)
    download_url: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)
    download_started_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True
    )
    download_completed_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True
    )
    download_progress_percentage: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Installation
    installation_started_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True
    )
    installation_completed_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True
    )

    # Health check
    health_check_passed: Mapped[Optional[bool]] = mapped_column(Boolean, nullable=True)
    health_check_details: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)

    # Error tracking
    error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    error_code: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
    retry_count: Mapped[int] = mapped_column(Integer, default=0)

    # Command reference
    command_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("device_commands.id"),
        nullable=True
    )

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    rollout: Mapped["Rollout"] = relationship("Rollout", back_populates="device_rollouts")
    device: Mapped["FleetDevice"] = relationship("FleetDevice")
    target_deployment: Mapped["Deployment"] = relationship(
        "Deployment",
        foreign_keys=[target_deployment_id]
    )
    previous_deployment: Mapped[Optional["Deployment"]] = relationship(
        "Deployment",
        foreign_keys=[previous_deployment_id]
    )
    command: Mapped[Optional["DeviceCommand"]] = relationship("DeviceCommand")
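The Rollout progress counters and the failure_threshold_percentage setting can be derived from DeviceRollout rows. This sketch rests on two assumptions that the spec does not define precisely: devices_targeted means devices that have left the pending state, and auto-rollback fires once failures reach the threshold share of those devices. Both function names are hypothetical.

```python
from collections import Counter
from typing import Iterable


def rollout_metrics(statuses: Iterable[str]) -> dict[str, int]:
    """Aggregate DeviceRollout statuses into the Rollout progress counters."""
    counts = Counter(statuses)
    total = sum(counts.values())
    return {
        "total_devices": total,
        "devices_pending": counts["pending"],
        "devices_targeted": total - counts["pending"],  # assumed meaning
        "devices_succeeded": counts["completed"],
        "devices_failed": counts["failed"],
    }


def should_roll_back(metrics: dict[str, int], failure_threshold_percentage: float) -> bool:
    """True once failures reach the threshold share of devices targeted so far."""
    targeted = metrics["devices_targeted"]
    if targeted == 0:
        return False
    # Cross-multiply to avoid float division
    return metrics["devices_failed"] * 100 >= failure_threshold_percentage * targeted
```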

RolloutTemplate Model

File: backend/app/models/rollout_template.py
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional

from sqlalchemy import JSON, Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.db.base import Base

if TYPE_CHECKING:
    from app.models.project import Project
    from app.models.user import User


class RolloutTemplate(Base):
    """Reusable rollout configuration templates"""
    __tablename__ = "rollout_templates"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    project_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("projects.id"),
        nullable=False,
        index=True
    )

    name: Mapped[str] = mapped_column(String(255), nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Template configuration
    strategy: Mapped[str] = mapped_column(String(50), nullable=False)
    strategy_config: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False)

    # Default settings
    health_check_enabled: Mapped[bool] = mapped_column(Boolean, default=True)
    auto_rollback_enabled: Mapped[bool] = mapped_column(Boolean, default=False)

    # Metadata
    is_default: Mapped[bool] = mapped_column(Boolean, default=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_by: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False)

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    project: Mapped["Project"] = relationship("Project", back_populates="rollout_templates")
    creator: Mapped["User"] = relationship("User")


# Example templates:
# 1. "Conservative Progressive" - 10%, 30%, 100% with 2hr waits
# 2. "Fast Progressive" - 20%, 50%, 100% with 30min and 60min waits
# 3. "Canary (5 devices)" - 5 devices, then all
# 4. "All at Once" - Deploy to all immediately

Rollout Templates

Templates allow users to save and reuse rollout configurations for consistency across deployments.

Template API Endpoints

File: backend/app/api/v1/rollout_templates.py
# List templates
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/rollout-templates
Response: {
    "items": [
        {
            "id": 1,
            "name": "Conservative Progressive",
            "description": "Slow rollout with verification at each stage",
            "strategy": "progressive",
            "strategy_config": {
                "stages": [
                    {"percentage": 10, "wait_minutes": 120},
                    {"percentage": 30, "wait_minutes": 120},
                    {"percentage": 100, "wait_minutes": 0}
                ]
            },
            "is_default": true,
            "created_by": 5,
            "creator_email": "john@example.com"
        }
    ]
}

# Create template
POST /api/v1/organizations/{org_slug}/projects/{project_slug}/rollout-templates
Body: {
    "name": "Custom Progressive",
    "description": "My custom rollout strategy",
    "strategy": "progressive",
    "strategy_config": {
        "stages": [
            {"percentage": 10, "wait_minutes": 30},
            {"percentage": 50, "wait_minutes": 60},
            {"percentage": 100, "wait_minutes": 0}
        ],
        "health_check_enabled": true,
        "auto_rollback_on_failure": true,
        "failure_threshold_percentage": 15
    },
    "health_check_enabled": true,
    "auto_rollback_enabled": true,
    "is_default": false
}

# Update template
PATCH /api/v1/organizations/{org_slug}/projects/{project_slug}/rollout-templates/{template_id}

# Delete template
DELETE /api/v1/organizations/{org_slug}/projects/{project_slug}/rollout-templates/{template_id}

# Set as default
POST /api/v1/organizations/{org_slug}/projects/{project_slug}/rollout-templates/{template_id}/set-default

Default Templates

The system should create these default templates when a project is created:
  1. All at Once
    • Deploy to all devices simultaneously
    • Use for: Small fleets, non-critical updates
  2. Fast Progressive
    • 20% → 50% → 100%
    • Wait times: 30min, 60min
    • Use for: Standard rollouts
  3. Conservative Progressive (default)
    • 10% → 30% → 100%
    • Wait times: 2hr, 2hr
    • Use for: Critical updates, production
  4. Canary (5 devices)
    • 5 devices → all remaining
    • Wait time: 1hr
    • Use for: Testing new versions
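The four default templates could be seeded from a static list. The stage lists below mirror the descriptions above. The strategy_config keys for all_at_once and canary (canary_device_count, wait_minutes) are hypothetical, because this spec defines only the progressive format. validate_templates is also an illustrative helper. It enforces a single default template and well-formed cumulative stages.

```python
from typing import Any

DEFAULT_TEMPLATES: list[dict[str, Any]] = [
    {"name": "All at Once", "strategy": "all_at_once", "is_default": False,
     "strategy_config": {}},
    {"name": "Fast Progressive", "strategy": "progressive", "is_default": False,
     "strategy_config": {"stages": [
         {"percentage": 20, "wait_minutes": 30},
         {"percentage": 50, "wait_minutes": 60},
         {"percentage": 100, "wait_minutes": 0}]}},
    {"name": "Conservative Progressive", "strategy": "progressive", "is_default": True,
     "strategy_config": {"stages": [
         {"percentage": 10, "wait_minutes": 120},
         {"percentage": 30, "wait_minutes": 120},
         {"percentage": 100, "wait_minutes": 0}]}},
    # Hypothetical canary config keys
    {"name": "Canary (5 devices)", "strategy": "canary", "is_default": False,
     "strategy_config": {"canary_device_count": 5, "wait_minutes": 60}},
]


def validate_templates(templates: list[dict[str, Any]]) -> None:
    """Raise ValueError unless exactly one template is default and stages are well formed."""
    defaults = [t["name"] for t in templates if t.get("is_default")]
    if len(defaults) != 1:
        raise ValueError(f"expected exactly one default template, got {defaults}")
    for t in templates:
        if t["strategy"] != "progressive":
            continue
        pcts = [s["percentage"] for s in t["strategy_config"].get("stages", [])]
        # Percentages are cumulative: strictly increasing and ending at 100
        if not pcts or pcts != sorted(set(pcts)) or pcts[-1] != 100:
            raise ValueError(f"{t['name']}: invalid stage percentages {pcts}")
```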

Package Download Proxy

Devices download packages through an API proxy endpoint rather than directly from S3. This lets the backend validate device access, log download start and completion, and update DeviceRollout progress.

Download Proxy Endpoint

File: backend/app/api/v1/downloads.py
# Generate download URL for device
GET /api/v1/downloads/packages/{deployment_id}
Headers:
  - X-Device-ID: device_id (from IoT certificate)
  - X-Device-Secret: project_secret
Response: {
    "download_url": "https://api.roboticks.io/api/v1/downloads/packages/456/file",
    "expires_at": "2025-11-14T11:00:00Z",
    "file_size_bytes": 125829120,
    "checksum_sha256": "abc123..."
}

# Download package file (proxied from S3)
GET /api/v1/downloads/packages/{deployment_id}/file
Headers:
  - X-Device-ID: device_id
  - X-Download-Token: token (from previous endpoint)
Response: Binary stream (package file)

# This endpoint:
# 1. Validates device access
# 2. Logs download start
# 3. Streams from S3
# 4. Tracks download completion
# 5. Updates DeviceRollout progress
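
An HMAC-signed token is one way to make X-Download-Token short-lived and bound to a single device and deployment. Below is a minimal sketch, assuming a server-side signing secret. The helpers make_download_token and verify_download_token are hypothetical, and the token layout and default TTL are illustrative only.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def make_download_token(secret: bytes, device_id: str, deployment_id: int,
                        ttl_seconds: int = 3600, now: Optional[float] = None) -> str:
    """Issue a token binding one device to one deployment until an expiry time."""
    expires = int((time.time() if now is None else now) + ttl_seconds)
    payload = f"{device_id}|{deployment_id}|{expires}".encode("utf-8")
    signature = hmac.new(secret, payload, hashlib.sha256).digest()
    return f"{_b64url(payload)}.{_b64url(signature)}"


def verify_download_token(secret: bytes, token: str, device_id: str, deployment_id: int,
                          now: Optional[float] = None) -> bool:
    """Check the signature, device binding, deployment binding, and expiry."""
    try:
        payload_b64, signature_b64 = token.split(".")
        payload = _b64url_decode(payload_b64)
        signature = _b64url_decode(signature_b64)
    except ValueError:  # wrong number of parts or invalid base64 (binascii.Error)
        return False
    expected = hmac.new(secret, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(signature, expected):
        return False
    # rsplit keeps any '|' inside the device ID intact
    token_device, token_deployment, expires = payload.decode("utf-8").rsplit("|", 2)
    current = time.time() if now is None else now
    return (token_device == device_id
            and token_deployment == str(deployment_id)
            and current < int(expires))
```

Under this scheme, the first endpoint would call make_download_token and report the expiry as expires_at. The /file endpoint would call verify_download_token with the X-Device-ID header and the path's deployment_id before streaming from S3.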

Progress Reporting

Devices report progress every 15 seconds during download and installation.

Progress Report Format

MQTT Topic: roboticks/{project_id}/devices/{device_id}/rollout/progress

Payload:
{
    "device_rollout_id": 1234,
    "rollout_id": 567,
    "deployment_id": 890,
    "status": "downloading",  // or "installing", "verifying", "completed", "failed"
    "progress_percentage": 45,
    "current_step": "Downloading package",
    "error_message": null,
    "timestamp": "2025-11-14T10:15:30Z"
}
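
On the webhook side, each progress message can be validated before DeviceRollout is updated. Below is a minimal sketch, assuming the field names in the payload above. The `//` comment there is illustrative only, because JSON has no comment syntax. The helper parse_progress_report and the ProgressReport type are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Optional

# Status values listed in the payload example above
PROGRESS_STATUSES = {"downloading", "installing", "verifying", "completed", "failed"}
FINAL_STATUSES = {"completed", "failed"}


@dataclass(frozen=True)
class ProgressReport:
    device_rollout_id: int
    rollout_id: int
    deployment_id: int
    status: str
    progress_percentage: int
    current_step: str
    error_message: Optional[str]
    timestamp: str  # kept as the ISO 8601 string the device sent

    @property
    def is_final(self) -> bool:
        return self.status in FINAL_STATUSES


def parse_progress_report(raw: bytes) -> ProgressReport:
    """Validate one progress message; malformed input raises ValueError or KeyError."""
    data = json.loads(raw)
    status = data["status"]
    if status not in PROGRESS_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    percentage = int(data["progress_percentage"])
    if not 0 <= percentage <= 100:
        raise ValueError(f"progress_percentage out of range: {percentage}")
    return ProgressReport(
        device_rollout_id=int(data["device_rollout_id"]),
        rollout_id=int(data["rollout_id"]),
        deployment_id=int(data["deployment_id"]),
        status=status,
        progress_percentage=percentage,
        current_step=str(data.get("current_step", "")),
        error_message=data.get("error_message"),
        timestamp=str(data["timestamp"]),
    )
```

A report with `is_final` set is the point at which the Rollout Service would stop expecting further updates from that device.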

Tunnel Feature

Overview

The Tunnel feature provides secure remote access to fleet devices through a web-based terminal and SSH tunneling. It uses AWS IoT Core MQTT for bidirectional communication, allowing users to execute commands, debug issues, and interact with devices in real time without a VPN or direct network access.

Key Features

  • Web-Based Terminal: xterm.js-powered terminal in the browser
  • SSH Protocol Support: Standard SSH clients can connect through the tunnel
  • Session Management: Track all tunnel sessions with a full audit trail
  • Multi-User Support: Multiple users can tunnel to the same device (in separate sessions)
  • Session Recording: Optional session recording for compliance
  • Idle Timeout: Automatic disconnection after inactivity
  • Rate Limiting: Prevent abuse with per-user connection limits

Database Models

DeviceTunnel Model

File: backend/app/models/tunnel.py
from datetime import datetime
from typing import TYPE_CHECKING, Optional
import enum

from sqlalchemy import Boolean, DateTime, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.db.base import Base

if TYPE_CHECKING:
    from app.models.fleet import FleetDevice
    from app.models.project import Project
    from app.models.user import User


class TunnelStatus(str, enum.Enum):
    """Tunnel session status"""
    PENDING = "pending"           # Created, waiting for device connection
    CONNECTING = "connecting"     # Device acknowledged, establishing tunnel
    CONNECTED = "connected"       # Active tunnel session
    DISCONNECTED = "disconnected" # Cleanly closed
    TIMEOUT = "timeout"           # Timed out waiting for device
    ERROR = "error"               # Connection error


class TunnelType(str, enum.Enum):
    """Type of tunnel session"""
    WEB_TERMINAL = "web_terminal"  # Web-based xterm.js terminal
    SSH = "ssh"                     # SSH client connection
    PORT_FORWARD = "port_forward"   # Port forwarding tunnel


class DeviceTunnel(Base):
    """Remote access tunnel sessions to devices"""
    __tablename__ = "device_tunnels"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    project_id: Mapped[int] = mapped_column(Integer, ForeignKey("projects.id"), nullable=False, index=True)
    device_id: Mapped[int] = mapped_column(Integer, ForeignKey("fleet_devices.id"), nullable=False, index=True)
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False, index=True)

    # Tunnel configuration
    tunnel_type: Mapped[TunnelType] = mapped_column(
        Enum(TunnelType, values_callable=lambda x: [e.value for e in x]),
        nullable=False
    )
    tunnel_id: Mapped[str] = mapped_column(String(64), unique=True, index=True)  # UUID for this tunnel

    # Port forwarding configuration (for SSH and port_forward types)
    local_port: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    remote_port: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    # Example: SSH would be remote_port=22, local_port=dynamic

    # Status tracking
    status: Mapped[TunnelStatus] = mapped_column(
        Enum(TunnelStatus, values_callable=lambda x: [e.value for e in x]),
        default=TunnelStatus.PENDING,
        index=True
    )

    # MQTT topics for this tunnel
    device_tunnel_topic: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
    # roboticks/{project_id}/devices/{device_id}/tunnel/{tunnel_id}/input
    server_tunnel_topic: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
    # roboticks/{project_id}/devices/{device_id}/tunnel/{tunnel_id}/output

    # Session management
    timeout_seconds: Mapped[int] = mapped_column(Integer, default=1800)  # 30min default
    idle_timeout_seconds: Mapped[int] = mapped_column(Integer, default=600)  # 10min idle
    last_activity_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)

    # Connection tracking
    connected_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    disconnected_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    disconnect_reason: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    # Reasons: "user_closed", "idle_timeout", "device_offline", "max_duration", "error"

    # Session recording (optional, for compliance)
    recording_enabled: Mapped[bool] = mapped_column(Boolean, default=False)
    recording_s3_path: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)

    # Statistics
    bytes_sent: Mapped[int] = mapped_column(Integer, default=0)
    bytes_received: Mapped[int] = mapped_column(Integer, default=0)
    commands_executed: Mapped[int] = mapped_column(Integer, default=0)  # For web terminal

    # Error tracking
    error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Client info
    client_ip: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)  # IPv6 support
    user_agent: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now()
    )

    # Relationships
    project: Mapped["Project"] = relationship("Project", back_populates="device_tunnels")
    device: Mapped["FleetDevice"] = relationship("FleetDevice", back_populates="tunnels")
    user: Mapped["User"] = relationship("User")
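
The TunnelStatus comments imply a small state machine. Below is a sketch of guarding status updates with an explicit transition table. The table is an interpretation of those comments, not something the model enforces. For example, it assumes an idle or max-duration close ends in DISCONNECTED with a disconnect_reason rather than TIMEOUT. The enum is repeated so the sketch runs on its own, and transition and is_terminal are hypothetical helpers.

```python
import enum


class TunnelStatus(str, enum.Enum):
    """Mirrors the TunnelStatus enum in the model so this sketch is standalone."""
    PENDING = "pending"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"
    TIMEOUT = "timeout"
    ERROR = "error"


# Hypothetical table: TIMEOUT only applies while waiting for the device;
# a user close, idle timeout, or max duration ends in DISCONNECTED.
ALLOWED_TRANSITIONS = {
    TunnelStatus.PENDING: {TunnelStatus.CONNECTING, TunnelStatus.TIMEOUT,
                           TunnelStatus.ERROR, TunnelStatus.DISCONNECTED},
    TunnelStatus.CONNECTING: {TunnelStatus.CONNECTED, TunnelStatus.TIMEOUT,
                              TunnelStatus.ERROR, TunnelStatus.DISCONNECTED},
    TunnelStatus.CONNECTED: {TunnelStatus.DISCONNECTED, TunnelStatus.ERROR},
    TunnelStatus.DISCONNECTED: set(),
    TunnelStatus.TIMEOUT: set(),
    TunnelStatus.ERROR: set(),
}


def transition(current: TunnelStatus, new: TunnelStatus) -> TunnelStatus:
    """Return the new status, or raise ValueError if the move is not allowed."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"invalid tunnel transition: {current.value} -> {new.value}")
    return new


def is_terminal(status: TunnelStatus) -> bool:
    """A status with no outgoing transitions ends the session."""
    return not ALLOWED_TRANSITIONS[status]
```

Routing every status change through a check like this keeps a late device acknowledgement from reviving a tunnel that has already timed out or been closed.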

API Endpoints

File: backend/app/api/v1/tunnels.py
# Create new tunnel session
POST /api/v1/organizations/{org_slug}/projects/{project_slug}/tunnels
Body: {
    "device_id": 123,
    "tunnel_type": "web_terminal",
    "remote_port": 22,  # Device port for ssh/port_forward tunnels (not used by web_terminal)
    "timeout_seconds": 1800,
    "recording_enabled": false
}
Response: {
    "id": 1001,
    "tunnel_id": "tun-abc123def456",
    "status": "pending",
    "websocket_url": "wss://api.roboticks.io/api/v1/tunnels/tun-abc123def456/ws",
    "created_at": "2025-11-14T10:00:00Z",
    "expires_at": "2025-11-14T10:30:00Z"
}

# Get tunnel status
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/tunnels/{tunnel_id}
Response: {
    "id": 1001,
    "tunnel_id": "tun-abc123def456",
    "tunnel_type": "web_terminal",
    "status": "connected",
    "device": {
        "id": 123,
        "name": "Bot-01",
        "status": "ONLINE"
    },
    "user_id": 5,
    "user_email": "john@example.com",
    "connected_at": "2025-11-14T10:00:05Z",
    "last_activity_at": "2025-11-14T10:15:32Z",
    "bytes_sent": 12480,
    "bytes_received": 5632,
    "created_at": "2025-11-14T10:00:00Z"
}

# List tunnel sessions (with filters)
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/tunnels
Query params:
  - device_id: int (optional)
  - user_id: int (optional)
  - status: TunnelStatus (optional)
  - tunnel_type: TunnelType (optional)
  - active_only: bool (default: false) - Only show connected tunnels
  - page: int (default: 1)
  - page_size: int (default: 20, max: 100)
Response: {
    "items": [
        {
            "id": 1001,
            "tunnel_id": "tun-abc123def456",
            "device_name": "Bot-01",
            "user_email": "john@example.com",
            "status": "connected",
            "tunnel_type": "web_terminal",
            "connected_at": "2025-11-14T10:00:05Z",
            "duration_seconds": 932
        }
    ],
    "total": 45,
    "page": 1,
    "page_size": 20
}

# Close tunnel session
DELETE /api/v1/organizations/{org_slug}/projects/{project_slug}/tunnels/{tunnel_id}
Body: {
    "reason": "user_closed"
}
Response: {
    "id": 1001,
    "status": "disconnected",
    "disconnected_at": "2025-11-14T10:30:00Z",
    "duration_seconds": 1795
}

# WebSocket endpoint for web terminal
WS /api/v1/tunnels/{tunnel_id}/ws
Protocol: WebSocket with JSON messages
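Auth: JWT token in a query param or the Authorization header (browser WebSocket clients cannot set custom headers, so the web terminal passes it as a query param)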

# Inbound messages (client → server → device):
{
    "type": "input",
    "data": "ls -la\n"  # Terminal input (base64 encoded for binary)
}
{
    "type": "resize",
    "rows": 24,
    "cols": 80
}
{
    "type": "ping"  # Keepalive
}

# Outbound messages (device → server → client):
{
    "type": "output",
    "data": "total 48\ndrwxr-xr-x..."  # Terminal output (always base64-encoded; shown decoded here for readability)
}
{
    "type": "status",
    "status": "connected"
}
{
    "type": "error",
    "message": "Device disconnected"
}
{
    "type": "pong"
}
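
The backend needs to turn each inbound WebSocket frame into one of the three message kinds above. Below is a minimal parsing sketch. It assumes plain-text `input` data as in the example (binary input would need base64 decoding). It bounds terminal sizes by the unsigned-short fields that the device agent packs with struct format 'H'. All names are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Union

MAX_WINSIZE = 65535  # winsize fields are unsigned shorts (struct format 'H')


@dataclass(frozen=True)
class InputMessage:
    data: bytes


@dataclass(frozen=True)
class ResizeMessage:
    rows: int
    cols: int


@dataclass(frozen=True)
class PingMessage:
    pass


ClientMessage = Union[InputMessage, ResizeMessage, PingMessage]


def parse_client_message(text: str) -> ClientMessage:
    """Parse one inbound frame; malformed frames raise ValueError, KeyError, or TypeError."""
    msg = json.loads(text)
    if not isinstance(msg, dict):
        raise ValueError("message must be a JSON object")
    kind = msg.get("type")
    if kind == "input":
        # Assumes plain-text input, encoded to bytes for send_to_device()
        return InputMessage(str(msg["data"]).encode("utf-8"))
    if kind == "resize":
        rows, cols = int(msg["rows"]), int(msg["cols"])
        if not (0 < rows <= MAX_WINSIZE and 0 < cols <= MAX_WINSIZE):
            raise ValueError(f"invalid terminal size: {rows}x{cols}")
        return ResizeMessage(rows, cols)
    if kind == "ping":
        return PingMessage()
    raise ValueError(f"unknown message type: {kind!r}")


def pong() -> dict:
    """Keepalive reply sent back to the client."""
    return {"type": "pong"}
```

The input topic carries raw terminal bytes. A ResizeMessage therefore needs a separate path to the device agent's resize_terminal(), for example a control message, and that path is not specified here.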

MQTT Service

File: backend/app/services/mqtt_tunnel_service.py
import json
import uuid
from datetime import datetime, timedelta
from typing import Optional

import boto3

from app.core.config import settings
from app.models.tunnel import DeviceTunnel, TunnelStatus, TunnelType
from app.models.fleet import FleetDevice


class MQTTTunnelService:
    """Service for managing device tunnels via AWS IoT Core MQTT"""

    def __init__(self):
        self.iot_client = boto3.client('iot-data', region_name=settings.AWS_REGION)

    def create_tunnel(
        self,
        tunnel: DeviceTunnel,
        device: FleetDevice
    ) -> bool:
        """
        Create a new tunnel by publishing tunnel request to device.

        Topic: roboticks/{project_id}/devices/{device_id}/tunnel/control
        QoS: 1 (at least once delivery)

        Args:
            tunnel: DeviceTunnel instance
            device: FleetDevice instance

        Returns:
            True if successfully published, False otherwise
        """
        control_topic = f"roboticks/{device.project_id}/devices/{device.device_id}/tunnel/control"

        # Generate tunnel topics
        tunnel.device_tunnel_topic = (
            f"roboticks/{device.project_id}/devices/{device.device_id}/tunnel/{tunnel.tunnel_id}/input"
        )
        tunnel.server_tunnel_topic = (
            f"roboticks/{device.project_id}/devices/{device.device_id}/tunnel/{tunnel.tunnel_id}/output"
        )

        payload = {
            "action": "create_tunnel",
            "tunnel_id": tunnel.tunnel_id,
            "tunnel_type": tunnel.tunnel_type,
            "remote_port": tunnel.remote_port or 22,
            "input_topic": tunnel.device_tunnel_topic,
            "output_topic": tunnel.server_tunnel_topic,
            "timeout_seconds": tunnel.timeout_seconds,
            "timestamp": datetime.utcnow().isoformat()
        }

        try:
            self.iot_client.publish(
                topic=control_topic,
                qos=1,
                payload=json.dumps(payload)
            )

            tunnel.status = TunnelStatus.CONNECTING
            return True

        except Exception as e:
            tunnel.status = TunnelStatus.ERROR
            tunnel.error_message = f"Failed to publish tunnel request: {str(e)}"
            return False

    def close_tunnel(
        self,
        tunnel: DeviceTunnel,
        device: FleetDevice,
        reason: str = "user_closed"
    ) -> bool:
        """
        Close an active tunnel by publishing close request to device.

        Args:
            tunnel: DeviceTunnel instance
            device: FleetDevice instance
            reason: Disconnect reason

        Returns:
            True if successfully published, False otherwise
        """
        control_topic = f"roboticks/{device.project_id}/devices/{device.device_id}/tunnel/control"

        payload = {
            "action": "close_tunnel",
            "tunnel_id": tunnel.tunnel_id,
            "reason": reason,
            "timestamp": datetime.utcnow().isoformat()
        }

        try:
            self.iot_client.publish(
                topic=control_topic,
                qos=1,
                payload=json.dumps(payload)
            )

            tunnel.status = TunnelStatus.DISCONNECTED
            tunnel.disconnected_at = datetime.utcnow()
            tunnel.disconnect_reason = reason
            return True

        except Exception as e:
            tunnel.error_message = f"Failed to publish close request: {str(e)}"
            return False

    def send_to_device(
        self,
        tunnel: DeviceTunnel,
        data: bytes
    ) -> bool:
        """
        Send data from web client to device through tunnel.

        Publishes to: roboticks/{project_id}/devices/{device_id}/tunnel/{tunnel_id}/input

        Args:
            tunnel: DeviceTunnel instance
            data: Raw bytes to send to device

        Returns:
            True if successfully published, False otherwise
        """
        if not tunnel.device_tunnel_topic:
            return False

        try:
            self.iot_client.publish(
                topic=tunnel.device_tunnel_topic,
                qos=0,  # QoS 0 for real-time data (fire and forget)
                payload=data
            )

            tunnel.bytes_sent += len(data)
            tunnel.last_activity_at = datetime.utcnow()
            return True

        except Exception as e:
            return False

    def process_device_output(
        self,
        tunnel_id: str,
        data: bytes
    ) -> None:
        """
        Process output from device (called by MQTT webhook).

        Device publishes to: roboticks/{project_id}/devices/{device_id}/tunnel/{tunnel_id}/output
        Lambda/webhook receives this and calls this method.

        Args:
            tunnel_id: Tunnel ID
            data: Raw bytes from device
        """
        # Implementation:
        # 1. Look up tunnel by tunnel_id
        # 2. Update bytes_received and last_activity_at
        # 3. Forward data to WebSocket client via WebSocket manager
        # 4. Optionally write to recording file if recording_enabled
        pass


# Singleton instance
mqtt_tunnel_service = MQTTTunnelService()
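
process_device_output() receives only a tunnel_id, so whatever sits between the IoT rule and the backend must recover it from the topic. Below is a minimal sketch of building and parsing the tunnel data topics that create_tunnel() uses. tunnel_topic and parse_tunnel_topic are hypothetical helpers. The sketch relies on project, device, and tunnel IDs never containing '/', which MQTT treats as a topic level separator.

```python
from typing import NamedTuple, Optional, Union


class TunnelTopic(NamedTuple):
    project_id: str
    device_id: str
    tunnel_id: str
    direction: str  # "input" (server -> device) or "output" (device -> server)


def tunnel_topic(project_id: Union[int, str], device_id: str, tunnel_id: str, direction: str) -> str:
    """Build a tunnel data topic in the layout used by create_tunnel()."""
    if direction not in ("input", "output"):
        raise ValueError(f"unknown direction: {direction!r}")
    return f"roboticks/{project_id}/devices/{device_id}/tunnel/{tunnel_id}/{direction}"


def parse_tunnel_topic(topic: str) -> Optional[TunnelTopic]:
    """Extract IDs from roboticks/{project_id}/devices/{device_id}/tunnel/{tunnel_id}/{direction}.

    Returns None for anything else, including the .../tunnel/control topic.
    """
    parts = topic.split("/")
    if (len(parts) != 7 or parts[0] != "roboticks" or parts[2] != "devices"
            or parts[4] != "tunnel" or parts[6] not in ("input", "output")):
        return None
    if not all(parts[i] for i in (1, 3, 5)):
        return None  # empty ID segment
    return TunnelTopic(parts[1], parts[3], parts[5], parts[6])
```

A webhook handler could call parse_tunnel_topic on the incoming topic, ignore anything that is not an "output" topic, and pass the tunnel_id to process_device_output().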

WebSocket Manager

File: backend/app/services/websocket_manager.py
import asyncio
import base64
from typing import Dict, Set

from fastapi import WebSocket


class WebSocketManager:
    """Manages WebSocket connections for tunnel sessions"""

    def __init__(self):
        # tunnel_id -> Set of WebSocket connections
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        self._lock = asyncio.Lock()

    async def connect(self, tunnel_id: str, websocket: WebSocket):
        """Register new WebSocket connection for tunnel"""
        await websocket.accept()
        async with self._lock:
            if tunnel_id not in self.active_connections:
                self.active_connections[tunnel_id] = set()
            self.active_connections[tunnel_id].add(websocket)

    async def disconnect(self, tunnel_id: str, websocket: WebSocket):
        """Unregister WebSocket connection"""
        async with self._lock:
            if tunnel_id in self.active_connections:
                self.active_connections[tunnel_id].discard(websocket)
                if not self.active_connections[tunnel_id]:
                    del self.active_connections[tunnel_id]

    async def send_to_tunnel(self, tunnel_id: str, message: dict):
        """Send message to all WebSocket clients connected to tunnel"""
        # Iterate over a snapshot: connect()/disconnect() can change the set
        # while send_json() is awaiting, which would break direct iteration
        async with self._lock:
            websockets = list(self.active_connections.get(tunnel_id, ()))

        disconnected = set()
        for websocket in websockets:
            try:
                await websocket.send_json(message)
            except Exception:
                disconnected.add(websocket)

        # Clean up clients whose send failed; the tunnel entry may already be gone
        if disconnected:
            async with self._lock:
                remaining = self.active_connections.get(tunnel_id)
                if remaining is not None:
                    remaining -= disconnected
                    if not remaining:
                        del self.active_connections[tunnel_id]

    async def broadcast_output(self, tunnel_id: str, data: bytes):
        """Broadcast device output to all connected WebSocket clients (base64-encoded)"""
        message = {
            "type": "output",
            "data": base64.b64encode(data).decode("ascii")
        }
        await self.send_to_tunnel(tunnel_id, message)


# Singleton instance
ws_manager = WebSocketManager()

Frontend Components

Terminal Page (New)

File: frontend/src/pages/Terminal.tsx

Features:
  • Embedded xterm.js terminal
  • Device selector dropdown
  • Connection status indicator
  • Session controls (Connect, Disconnect)
  • Terminal settings (font size, theme)
  • Session statistics (bytes sent/received, duration)
  • Reconnect on disconnect
Layout:
┌────────────────────────────────────────────────────┐
│ Terminal                                           │
├────────────────────────────────────────────────────┤
│ Device: [Bot-01 ▼] [Connect] [Disconnect]         │
│ Status: ● Connected | Duration: 05:32 | ↑ 12KB ↓ 5KB
├────────────────────────────────────────────────────┤
│                                                    │
│ $ ls -la                                           │
│ total 48                                           │
│ drwxr-xr-x  5 root root 4096 Nov 14 10:00 .       │
│ drwxr-xr-x 18 root root 4096 Nov 13 09:30 ..      │
│ -rw-r--r--  1 root root  220 Nov 10 08:15 .bashrc │
│ $█                                                 │
│                                                    │
│                                                    │
│                                                    │
│                                                    │
│                                                    │
└────────────────────────────────────────────────────┘
Key Components:
  • xterm.js for terminal emulation
  • WebSocket client for bidirectional communication
  • Automatic reconnection logic
  • Terminal resize handling (responsive)

Tunnel Sessions Page

File: frontend/src/pages/Tunnels.tsx (or add a tab to the Fleet page)

Features:
  • Table view of active and historical tunnel sessions
  • Filters: Device, User, Status, Date Range
  • Real-time status updates for active tunnels
  • “Kill Session” button for active tunnels (admin only)
  • Session details dialog with statistics
Columns:
  1. User (name + avatar)
  2. Device (name + status)
  3. Type (Web Terminal / SSH / Port Forward)
  4. Status (with duration for active)
  5. Connected At
  6. Duration / Disconnected At
  7. Data Transfer (↑↓ bytes)
  8. Actions (View Details, Kill Session)

Device Agent Implementation

The device agent (roboticks-agent) needs to implement tunnel functionality:

File: device-agent/tunnel_handler.py (pseudocode)
import pty
import os
import select
import termios
import struct
import fcntl


class TunnelHandler:
    """Handles tunnel sessions on device"""

    def __init__(self, iot_client):
        self.iot_client = iot_client
        self.active_tunnels = {}  # tunnel_id -> tunnel_info

    def on_tunnel_control(self, message):
        """Handle tunnel control messages"""
        action = message['action']

        if action == 'create_tunnel':
            self.create_tunnel(message)
        elif action == 'close_tunnel':
            self.close_tunnel(message['tunnel_id'])

    def create_tunnel(self, config):
        """Create new tunnel session"""
        tunnel_id = config['tunnel_id']
        tunnel_type = config['tunnel_type']
        input_topic = config['input_topic']
        output_topic = config['output_topic']
        remote_port = config.get('remote_port', 22)

        if tunnel_type == 'web_terminal':
            # Spawn PTY (pseudo-terminal)
            pid, fd = pty.fork()

            if pid == 0:  # Child process
                # Execute shell
                os.execvp('/bin/bash', ['/bin/bash', '-l'])
            else:  # Parent process
                # Store tunnel info
                self.active_tunnels[tunnel_id] = {
                    'type': tunnel_type,
                    'fd': fd,
                    'pid': pid,
                    'input_topic': input_topic,
                    'output_topic': output_topic
                }

                # Subscribe to input topic
                self.iot_client.subscribe(input_topic, qos=0)

                # Start reading from PTY and publishing to output topic
                self.start_pty_reader(tunnel_id, fd, output_topic)

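        elif tunnel_type in ('ssh', 'port_forward'):
            # Bridge MQTT to a local TCP service on the requested port
            # (sshd on 22 by default); create_tcp_tunnel() is not shown here
            self.create_tcp_tunnel(tunnel_id, '127.0.0.1', remote_port, input_topic, output_topic)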

    def start_pty_reader(self, tunnel_id, fd, output_topic):
        """Read from PTY and publish to MQTT"""
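        def reader_loop():
            while tunnel_id in self.active_tunnels:
                try:
                    # Wait up to 100 ms for output so the loop notices closed tunnels
                    r, _, _ = select.select([fd], [], [], 0.1)
                    if not r:
                        continue
                    data = os.read(fd, 1024)
                    if not data:
                        # EOF: some platforms return b'' instead of raising when the shell exits
                        self.close_tunnel(tunnel_id)
                        break
                    # Publish raw PTY output to the tunnel's output topic
                    self.iot_client.publish(
                        topic=output_topic,
                        qos=0,
                        payload=data
                    )
                except OSError:
                    # PTY closed (Linux raises EIO once the shell exits; a closed fd raises EBADF)
                    self.close_tunnel(tunnel_id)
                    break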

        # Run in separate thread
        import threading
        thread = threading.Thread(target=reader_loop, daemon=True)
        thread.start()

    def on_tunnel_input(self, tunnel_id, data):
        """Handle input data from server"""
        if tunnel_id not in self.active_tunnels:
            return

        tunnel = self.active_tunnels[tunnel_id]
        fd = tunnel['fd']

        try:
            # Write to PTY
            os.write(fd, data)
        except OSError:
            # PTY closed
            self.close_tunnel(tunnel_id)

    def resize_terminal(self, tunnel_id, rows, cols):
        """Resize terminal window"""
        if tunnel_id not in self.active_tunnels:
            return

        tunnel = self.active_tunnels[tunnel_id]
        fd = tunnel['fd']

        # Set terminal size
        size = struct.pack('HHHH', rows, cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, size)

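    def close_tunnel(self, tunnel_id):
        """Close tunnel session (safe to call twice, e.g. from the reader thread)"""
        import signal

        # pop() removes the entry in one step, so a concurrent second call returns early
        tunnel = self.active_tunnels.pop(tunnel_id, None)
        if tunnel is None:
            return

        if 'fd' in tunnel:
            # Close the PTY master, which hangs up the terminal
            try:
                os.close(tunnel['fd'])
            except OSError:
                pass  # already closed

        if 'pid' in tunnel:
            # Interactive bash ignores SIGTERM, so send SIGHUP (a terminal hangup)
            # and reap the child so it does not linger as a zombie
            try:
                os.kill(tunnel['pid'], signal.SIGHUP)
                os.waitpid(tunnel['pid'], 0)
            except (ProcessLookupError, ChildProcessError):
                pass  # already exited and reaped

        # Unsubscribe from input topic
        self.iot_client.unsubscribe(tunnel['input_topic'])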

Security Considerations

  1. Authentication:
    • WebSocket connections require valid JWT token
    • Device authentication via AWS IoT Core certificates
    • Per-user rate limiting (max concurrent tunnels)
  2. Authorization:
    • Check user has project access before creating tunnel
    • Check device belongs to project
    • Optional: Role-based access (only admins can tunnel to production)
  3. Session Recording:
    • Configurable per project or per tunnel
    • Stored in S3 with encryption
    • Retention policy (default: 90 days)
    • Audit log of who accessed recordings
  4. Network Security:
    • WebSocket over TLS (wss://)
    • MQTT over TLS
    • No direct device network access (all via AWS IoT Core)
  5. Rate Limiting:
    • Max 3 concurrent tunnels per user
    • Max 10 concurrent tunnels per device
    • Max tunnel duration: 2 hours (configurable)
    • Idle timeout: 10 minutes (configurable)
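
A monitor for these limits (the checklist's "tunnel idle timeout monitor") could run two checks. It would refuse new tunnels over the concurrency caps and close connected tunnels that exceed their duration or idle limit. Below is a sketch with the defaults above hard-coded. The function names are hypothetical, and the reason strings reuse the disconnect_reason values from the DeviceTunnel model. Treating a tunnel's timeout_seconds as capped by the 2-hour maximum is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Defaults from the rate-limiting list above (all configurable)
MAX_TUNNELS_PER_USER = 3
MAX_TUNNELS_PER_DEVICE = 10
MAX_TUNNEL_DURATION = timedelta(hours=2)


def admission_error(active_for_user: int, active_for_device: int) -> Optional[str]:
    """Return why a new tunnel must be refused, or None if it may be created."""
    if active_for_user >= MAX_TUNNELS_PER_USER:
        return "too many concurrent tunnels for this user"
    if active_for_device >= MAX_TUNNELS_PER_DEVICE:
        return "too many concurrent tunnels for this device"
    return None


def expiry_reason(now: datetime, connected_at: Optional[datetime],
                  last_activity_at: Optional[datetime],
                  timeout_seconds: int, idle_timeout_seconds: int) -> Optional[str]:
    """Return a disconnect_reason if a connected tunnel should be closed, else None."""
    if connected_at is None:
        return None  # not connected yet; waiting-for-device timeouts are handled separately
    limit = min(timedelta(seconds=timeout_seconds), MAX_TUNNEL_DURATION)
    if now - connected_at >= limit:
        return "max_duration"
    last_seen = last_activity_at or connected_at
    if now - last_seen >= timedelta(seconds=idle_timeout_seconds):
        return "idle_timeout"
    return None
```

A periodic job could evaluate expiry_reason for every CONNECTED tunnel using timezone-aware timestamps (`datetime.now(timezone.utc)`). It would then call close_tunnel() with the returned reason.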

Performance Considerations

  1. Latency:
    • Expected latency: 50-200ms (depends on device location)
    • MQTT QoS 0 for tunnel data (minimal overhead)
    • Direct WebSocket connection (no polling)
  2. Bandwidth:
    • Terminal data is small (~1-10KB/s typical usage)
    • MQTT message size limit: 128KB
    • Chunk large outputs into multiple messages
  3. Scalability:
    • WebSocket connections handled by FastAPI/Uvicorn
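    • Horizontal scaling via load balancer; because WebSocketManager tracks connections in process memory, device output that reaches a different instance than the client's WebSocket must be relayed between instances (e.g. through a shared pub/sub channel) or routed to the right instance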
    • MQTT scales to millions of devices (AWS IoT Core)
  4. Resource Usage:
    • One WebSocket connection per user session
    • One MQTT subscription per tunnel per device
    • Memory: ~1MB per active tunnel (server-side)
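
AWS IoT Core rejects publishes whose payload exceeds 128 KB, so any path that can carry more than that needs to split it. Below is a minimal chunking sketch; chunk_payload is a hypothetical helper. The device agent's 1024-byte PTY reads already stay far below the limit, so this mainly matters for larger transfers.

```python
from typing import List

# AWS IoT Core per-message payload limit
MAX_MQTT_PAYLOAD_BYTES = 128 * 1024


def chunk_payload(data: bytes, max_bytes: int = MAX_MQTT_PAYLOAD_BYTES) -> List[bytes]:
    """Split data into consecutive pieces of at most max_bytes each (empty input -> [])."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    return [data[i:i + max_bytes] for i in range(0, len(data), max_bytes)]
```

If chunks are later base64-encoded inside a JSON envelope, base64 grows the data by a third, so pass a smaller max_bytes to leave headroom.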

Device Groups

Group Types

  1. Manual Groups
    • Explicitly list device IDs
    • Static membership
    • Example: “Test Fleet” = [device1, device2, device3]
  2. Tag-Based Groups
    • Dynamic membership based on device tags
    • Automatically includes/excludes devices as tags change
    • Example: All devices with {"environment": "production", "site": "warehouse-1"}
  3. Query Groups
    • Advanced filtering with conditions
    • Example: All DRONE devices with firmware >= 1.0.0 and status ONLINE
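
Tag-based and query groups are evaluated against device attributes when membership is requested. Below is a minimal sketch of that evaluation. The device field names device_type, status, and firmware_version are assumptions about the FleetDevice model, and versions are assumed to be plain dotted numbers.

```python
from typing import Any, Mapping, Tuple


def matches_tags(device_tags: Mapping[str, str], tag_filters: Mapping[str, str]) -> bool:
    """Tag-based group: every filter key must be present with exactly that value."""
    return all(device_tags.get(key) == value for key, value in tag_filters.items())


def _version_tuple(version: str, width: int) -> Tuple[int, ...]:
    parts = [int(p) for p in version.split(".")]
    return tuple(parts + [0] * (width - len(parts)))  # pad so "1.0" == "1.0.0"


def version_at_least(version: str, minimum: str) -> bool:
    """Numeric dotted-version comparison (no pre-release suffixes such as -rc1)."""
    width = max(version.count("."), minimum.count(".")) + 1
    return _version_tuple(version, width) >= _version_tuple(minimum, width)


def matches_example_query(device: Mapping[str, Any]) -> bool:
    """The query-group example: DRONE devices on firmware >= 1.0.0 that are ONLINE."""
    return (device.get("device_type") == "DRONE"
            and device.get("status") == "ONLINE"
            and version_at_least(device.get("firmware_version", "0"), "1.0.0"))
```

Comparing numeric tuples avoids the string-comparison trap where "1.10.0" sorts before "1.9.0". An empty tag_filters mapping matches every device.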

Group API Endpoints

File: backend/app/api/v1/device_groups.py
# Create group
POST /api/v1/organizations/{org_slug}/projects/{project_slug}/device-groups
Body: {
    "name": "Production Warehouse",
    "description": "All production devices in warehouse",
    "selection_type": "tag_based",
    "tag_filters": {
        "environment": "production",
        "site": "warehouse-1"
    }
}

# List groups
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/device-groups

# Get group with device count
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/device-groups/{group_id}
Response: {
    "id": 123,
    "name": "Production Warehouse",
    "selection_type": "tag_based",
    "tag_filters": {"environment": "production", "site": "warehouse-1"},
    "device_count": 45,
    "created_by": 5,
    "creator_email": "john@example.com"
}

# Get devices in group (evaluates dynamic groups)
GET /api/v1/organizations/{org_slug}/projects/{project_slug}/device-groups/{group_id}/devices
Response: {
    "devices": [
        {
            "id": 1,
            "name": "Bot-01",
            "device_id": "bot-01-abc",
            "status": "ONLINE",
            "current_deployment_id": 456,
            "tags": {"environment": "production", "site": "warehouse-1"}
        }
    ],
    "total": 45
}

# Update group
PATCH /api/v1/organizations/{org_slug}/projects/{project_slug}/device-groups/{group_id}

# Delete group
DELETE /api/v1/organizations/{org_slug}/projects/{project_slug}/device-groups/{group_id}

Data Flow

Command Flow

User (WebUI)
    │
    ├─> POST /commands
    │       │
    │       ▼
    │   FastAPI Backend
    │       │
    │       ├─> Create DeviceCommand in DB
    │       ├─> Call MQTTCommandService.send_command()
    │       │       │
    │       │       ▼
    │       │   AWS IoT Core
    │       │   Topic: roboticks/{proj}/devices/{dev}/commands
    │       │   QoS: 1 (at least once)
    │       │       │
    │       │       ▼
    │       │   Device Agent (subscribes to topic)
    │       │       │
    │       │       ├─> Receives command
    │       │       ├─> Executes command
    │       │       └─> Publishes response
    │       │               │
    │       │               ▼
    │       │       AWS IoT Core
    │       │       Topic: roboticks/{proj}/devices/{dev}/commands/response
    │       │               │
    │       │               ├─> IoT Rule triggers Lambda
    │       │               └─> Lambda calls webhook
    │       │                       │
    │       │                       ▼
    │       └───────────────> POST /webhooks/command-response
    │                               │
    │                               ├─> Update DeviceCommand status
    │                               └─> Store response_payload
    │
    └─> WebUI polls GET /commands/{id} for updates
            (every 5 seconds for active commands)

Rollout Flow

User (WebUI)
    │
    ├─> POST /rollouts (with template_id optional)
    │       │
    │       ▼
    │   FastAPI Backend
    │       │
    │       ├─> Create Rollout in DB
    │       ├─> Resolve target devices (from group or list)
    │       ├─> Create DeviceRollout for each device
    │       ├─> Assign devices to stages
    │       │       │
    │       │       ▼
    │       └─> POST /rollouts/{id}/start
    │               │
    │               ▼
    │           Rollout Service
    │               │
    │               ├─> Execute Stage 1
    │               │       │
    │               │       ├─> For each device in stage:
    │               │       │   ├─> Generate download URL (API proxy)
    │               │       │   ├─> Create DeviceCommand (DEPLOY_PACKAGE)
    │               │       │   └─> Send via MQTT (QoS 1)
    │               │       │
    │               │       ▼
    │               │   AWS IoT Core
    │               │   Topic: roboticks/{proj}/devices/{dev}/commands
    │               │       │
    │               │       ▼
    │               │   Device Agent
    │               │       │
    │               │       ├─> Download package via API proxy
    │               │       │   (GET /downloads/packages/{id}/file)
    │               │       │
    │               │       ├─> Install package
    │               │       │
    │               │       └─> Report progress every 15 seconds
    │               │               │
    │               │               ▼
    │               │       AWS IoT Core
    │               │       Topic: roboticks/{proj}/devices/{dev}/rollout/progress
    │               │               │
    │               │               ├─> IoT Rule → Lambda → Webhook
    │               │               └─> POST /webhooks/rollout-progress
    │               │                       │
    │               │                       ▼
    │               │                   Rollout Service
    │               │                       │
    │               │                       ├─> Update DeviceRollout
    │               │                       ├─> Update Rollout progress
    │               │                       ├─> Check failure threshold
    │               │                       └─> Advance stage if complete
    │               │
    │               ├─> Wait for stage completion
    │               ├─> Check health & failures
    │               ├─> Execute Stage 2
    │               └─> ... repeat until complete
    │
    └─> WebUI polls GET /rollouts/{id} for updates
            (every 5 seconds during rollout)

Technical Specifications

MQTT Configuration

  • QoS Level: 1 (at-least-once delivery; AWS IoT Core does not support QoS 2, so devices must tolerate duplicate messages)
  • Topic Structure:
    • Commands: roboticks/{project_id}/devices/{device_id}/commands
    • Command Responses: roboticks/{project_id}/devices/{device_id}/commands/response
    • Rollout Progress: roboticks/{project_id}/devices/{device_id}/rollout/progress
  • Message Format: JSON
  • Offline delivery: QoS 1 commands are queued for offline devices that use persistent sessions (AWS IoT Core's default session expiry is 1 hour)

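AWS IoT Core supports MQTT QoS 0 and 1 but not QoS 2, so command delivery is at least once. A device agent can therefore receive the same command more than once, for example after a reconnect. Below is a minimal device-side sketch of dropping repeats by command ID. CommandDeduplicator is a hypothetical helper, and the sketch assumes every command payload carries a unique ID.

```python
from collections import OrderedDict


class CommandDeduplicator:
    """Remembers recently seen command IDs so a redelivered command runs only once.

    In-memory only: the history is lost when the agent restarts.
    """

    def __init__(self, capacity: int = 1000):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def is_duplicate(self, command_id: str) -> bool:
        """Return True if command_id was already seen; otherwise record it and return False."""
        if command_id in self._seen:
            self._seen.move_to_end(command_id)
            return True
        self._seen[command_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the least recently seen ID
        return False
```

Because the history lives in memory, a REBOOT command redelivered after the device restarts would not be recognized. Commands with side effects like that need their ID persisted on the device before execution.
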
Package Downloads

  • Method: API Proxy (not direct S3)
  • Endpoint: GET /api/v1/downloads/packages/{deployment_id}/file
  • Authentication: Device certificate + download token
  • Tracking: Backend logs download start, progress, completion
  • Benefits:
    • Better access control
    • Download metrics
    • Ability to throttle/cancel
    • Error tracking

Progress Reporting

  • Frequency: Every 15 seconds during active operations
  • Topic: roboticks/{project_id}/devices/{device_id}/rollout/progress (one per device)
  • Payload: Status, progress percentage, current step, errors
  • Processing: IoT Rule → Lambda → POST /webhooks/rollout-progress → Rollout Service updates DeviceRollout
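The payload fields listed above (status, progress percentage, current step, errors) could be validated by the webhook handler before DeviceRollout is updated. A minimal standard-library sketch; the JSON key names (`status`, `progress`, `current_step`, `errors`) and the allowed status values are assumptions, since the exact schema is not given here.

```python
import json
from dataclasses import dataclass, field
from typing import List

# Assumed status vocabulary for a device rollout.
ALLOWED_STATUSES = {"pending", "downloading", "installing", "verifying", "succeeded", "failed"}


@dataclass
class ProgressReport:
    status: str
    progress: int  # percentage, 0-100
    current_step: str = ""
    errors: List[str] = field(default_factory=list)


def parse_progress(raw: bytes) -> ProgressReport:
    """Decode and validate one progress message; raise ValueError if malformed.

    Invalid JSON raises json.JSONDecodeError, which is a subclass of ValueError.
    """
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    status = data.get("status")
    if not isinstance(status, str) or status not in ALLOWED_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    progress = data.get("progress")
    # bool is a subclass of int in Python, so reject it explicitly.
    if not isinstance(progress, int) or isinstance(progress, bool) or not 0 <= progress <= 100:
        raise ValueError(f"progress must be an integer 0-100, got {progress!r}")
    errors = data.get("errors", [])
    if not isinstance(errors, list) or not all(isinstance(e, str) for e in errors):
        raise ValueError("errors must be a list of strings")
    return ProgressReport(
        status=status,
        progress=progress,
        current_step=str(data.get("current_step", "")),
        errors=errors,
    )
```

Rejecting malformed reports at the webhook keeps bad device data out of DeviceRollout and out of the failure-threshold calculation.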

Concurrency Control

  • Per Device: Maximum 1 active deployment at a time
  • Per Rollout: No limit on devices (scales with fleet size)
  • Per Project: No hard limit, but monitored for performance
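The per-device limit of one active deployment is most robust when enforced by the database, so two concurrent requests cannot both succeed. A sketch using a partial unique index, shown with the standard-library `sqlite3` module so it runs anywhere; PostgreSQL supports the same `CREATE UNIQUE INDEX ... WHERE` form. The table layout, the index name, and the set of "active" status values are assumptions.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE device_rollouts ("
        " id INTEGER PRIMARY KEY,"
        " rollout_id INTEGER NOT NULL,"
        " device_id TEXT NOT NULL,"
        " status TEXT NOT NULL)"
    )
    # At most one row per device may be in an (assumed) active status at any time.
    conn.execute(
        "CREATE UNIQUE INDEX uq_device_active_rollout ON device_rollouts(device_id) "
        "WHERE status IN ('pending', 'downloading', 'installing')"
    )


def assign_device(conn: sqlite3.Connection, rollout_id: int, device_id: str) -> bool:
    """Try to start a deployment; return False if the device is already busy."""
    try:
        with conn:  # commits on success, rolls back if the insert fails
            conn.execute(
                "INSERT INTO device_rollouts (rollout_id, device_id, status) "
                "VALUES (?, ?, 'pending')",
                (rollout_id, device_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

A `False` result could mark the device as skipped or queued for a later stage rather than starting a second deployment; once the earlier row leaves the active statuses, the device can be assigned again.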

Data Retention

  • Commands: Forever (full audit trail)
  • Command Responses: Forever
  • Rollouts: Forever
  • DeviceRollouts: Forever
  • MQTT Messages: 1 hour retention on broker

Database Indexes

Required indexes for performance:
-- Commands
CREATE INDEX idx_commands_project_status ON device_commands(project_id, status);
CREATE INDEX idx_commands_device ON device_commands(device_id);
CREATE INDEX idx_commands_created_by ON device_commands(created_by);
CREATE INDEX idx_commands_created_at ON device_commands(created_at DESC);

-- Rollouts
CREATE INDEX idx_rollouts_project_status ON rollouts(project_id, status);
CREATE INDEX idx_rollouts_deployment ON rollouts(deployment_id);

-- DeviceRollouts
CREATE INDEX idx_device_rollouts_rollout ON device_rollouts(rollout_id);
CREATE INDEX idx_device_rollouts_device ON device_rollouts(device_id);
CREATE INDEX idx_device_rollouts_status ON device_rollouts(status);
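One way to confirm that the indexes above serve the expected queries is to inspect the query plan. The sketch below uses SQLite's `EXPLAIN QUERY PLAN` through the standard library for a self-contained check (against PostgreSQL the equivalent is `EXPLAIN`). The table columns are limited to what the indexes reference, the `rollouts` table is omitted for brevity, and the two example queries are hypothetical.

```python
import sqlite3

SCHEMA = """
CREATE TABLE device_commands (
    id INTEGER PRIMARY KEY, project_id INTEGER, device_id INTEGER,
    status TEXT, created_by INTEGER, created_at TEXT
);
CREATE TABLE device_rollouts (
    id INTEGER PRIMARY KEY, rollout_id INTEGER, device_id INTEGER, status TEXT
);
CREATE INDEX idx_commands_project_status ON device_commands(project_id, status);
CREATE INDEX idx_commands_device ON device_commands(device_id);
CREATE INDEX idx_commands_created_by ON device_commands(created_by);
CREATE INDEX idx_commands_created_at ON device_commands(created_at DESC);
CREATE INDEX idx_device_rollouts_rollout ON device_rollouts(rollout_id);
CREATE INDEX idx_device_rollouts_device ON device_rollouts(device_id);
CREATE INDEX idx_device_rollouts_status ON device_rollouts(status);
"""


def plan(conn: sqlite3.Connection, sql: str, params: tuple) -> str:
    """Return SQLite's query-plan detail lines joined into one string."""
    rows = conn.execute(f"EXPLAIN QUERY PLAN {sql}", params).fetchall()
    # The last column of each row is the human-readable plan detail.
    return " | ".join(row[-1] for row in rows)


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

# "Pending commands for a project" should use the composite index.
print(plan(conn, "SELECT id FROM device_commands WHERE project_id = ? AND status = ?", (1, "pending")))
# "All devices in a rollout" should use the rollout_id index.
print(plan(conn, "SELECT device_id, status FROM device_rollouts WHERE rollout_id = ?", (7,)))
```

Running a check like this in CI can catch a query that silently stops using its index after a schema change.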

Implementation Checklist

Phase 1: Core Models & Database

  • Create tunnel.py model
  • Create command.py model
  • Create device_group.py model
  • Create rollout.py model
  • Create rollout_template.py model
  • Generate Alembic migration
  • Add relationships to Project and FleetDevice models
  • Run migration on dev database

Phase 2: Backend Services - Tunnel (PRIORITY)

  • Implement MQTTTunnelService
  • Implement WebSocketManager
  • Create webhook endpoint for tunnel output
  • Add Lambda function for tunnel MQTT routing
  • Implement tunnel idle timeout monitor
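For the tunnel idle timeout monitor, a minimal sketch: record the last activity per session and periodically collect sessions that have been idle past a limit. The timeout value is not specified in this section, so `idle_timeout` is a parameter; the class name, the 30-second check interval, and the `close_session` callback are hypothetical, and the callback is where the MQTT close message and WebSocket teardown would go. A single asyncio event loop is assumed, so no locking is used.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, List


class IdleTunnelMonitor:
    """Tracks each tunnel session's last activity and reports sessions that went idle."""

    def __init__(self, idle_timeout: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.idle_timeout = idle_timeout
        self._clock = clock
        self._last_activity: Dict[str, float] = {}

    def touch(self, session_id: str) -> None:
        """Record activity (input from the user or output from the device)."""
        self._last_activity[session_id] = self._clock()

    def remove(self, session_id: str) -> None:
        """Stop tracking a session that was closed explicitly."""
        self._last_activity.pop(session_id, None)

    def expired(self) -> List[str]:
        """Return and forget sessions idle for at least idle_timeout seconds."""
        now = self._clock()
        stale = [sid for sid, ts in self._last_activity.items() if now - ts >= self.idle_timeout]
        for sid in stale:
            del self._last_activity[sid]
        return stale


async def run_idle_monitor(
    monitor: IdleTunnelMonitor,
    close_session: Callable[[str], Awaitable[None]],
    interval: float = 30.0,
) -> None:
    """Periodically close idle sessions; runs until the task is cancelled."""
    while True:
        for session_id in monitor.expired():
            await close_session(session_id)
        await asyncio.sleep(interval)
```

Injecting the clock keeps the timeout logic testable without real waiting.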

Phase 3: Backend API Endpoints - Tunnel (PRIORITY)

  • Tunnel API (create, get, list, close)
  • WebSocket endpoint for tunnel connections
  • Tunnel session recording storage
  • Rate limiting middleware
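The checklist does not specify the rate-limiting policy. A token bucket per key (for example per user or per device) is one common choice, sketched here under that assumption; the class names, rate, and burst size are hypothetical, and wiring the limiter into the web framework's middleware is omitted.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class TokenBucket:
    rate: float      # tokens added per second
    capacity: float  # maximum burst size
    tokens: float
    updated: float

    def take(self, now: float) -> bool:
        """Refill for the elapsed time (capped at capacity), then spend one token if available."""
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class RateLimiter:
    """One token bucket per key; buckets start full so a new client can burst."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._buckets: Dict[str, TokenBucket] = {}

    def allow(self, key: str) -> bool:
        now = self._clock()
        bucket = self._buckets.get(key)
        if bucket is None:
            bucket = TokenBucket(self.rate, self.capacity, self.capacity, now)
            self._buckets[key] = bucket
        return bucket.take(now)
```

A token bucket allows short bursts (such as opening a terminal and typing quickly) while capping the sustained rate, which suits interactive tunnel traffic better than a fixed per-minute counter.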

Phase 4: Frontend - Tunnel (PRIORITY)

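  • Install xterm.js and its fit addon (now published as @xterm/xterm and @xterm/addon-fit; the unscoped xterm-addon-fit package is deprecated)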
  • Create Terminal page with xterm.js
  • Implement WebSocket client for tunnel
  • Add device selector and connection controls
  • Add session statistics display
  • Create Tunnel Sessions management page
  • Add tunnel session history view

Phase 5: Backend Services - Commands & Rollouts

  • Implement MQTTCommandService
  • Implement RolloutService
  • Implement download proxy endpoint
  • Create webhook endpoints for MQTT responses
  • Add Lambda functions for IoT Rules

Phase 6: Backend API Endpoints - Commands & Rollouts

  • Commands API (list, create, get, cancel)
  • Command templates endpoint
  • Device Groups API (CRUD)
  • Rollouts API (CRUD, start, pause, resume, rollback)
  • Rollout Templates API (CRUD)
  • Download proxy endpoints

Phase 7: Frontend - Commands

  • Create Commands page with table view
  • Add filters and search
  • Implement New Command wizard dialog
  • Create Command Detail dialog
  • Add real-time status polling
  • Add bulk operations

Phase 8: Frontend - Device Groups

  • Create Device Groups page
  • New Group wizard (manual, tag-based, query)
  • Group editing and device preview
  • Integration with Commands and Rollouts

Phase 9: Frontend - Rollouts

  • Create Rollouts page with active/completed sections
  • Implement New Rollout wizard
  • Create Rollout Detail view with progress
  • Add rollout controls (pause, resume, rollback)
  • Real-time progress updates
  • Rollout Templates management page

Phase 10: Testing & Documentation

  • Unit tests for models
  • Integration tests for services
  • API endpoint tests
  • Frontend component tests
  • End-to-end tunnel test
  • End-to-end rollout test
  • Update API documentation
  • Create user guide

Phase 11: Device Agent Updates

  • Implement TunnelHandler for PTY management
  • Update agent to subscribe to tunnel control topic
  • Subscribe to command topics
  • Implement command execution handlers
  • Add package download via proxy
  • Implement progress reporting (every 15s)
  • Add health check reporting
  • Error handling and retry logic
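On the device side, the command execution handlers can be organized as a dispatch table keyed by command type. Because MQTT QoS 1 (the highest level AWS IoT Core supports) delivers at least once, the sketch also returns the cached response for a command ID it has already handled instead of executing it twice. The message keys (`command_id`, `command_type`, `parameters`), the response shape, and the handler stubs are hypothetical; the command type strings reuse values from `CommandType`, and the returned response would be published to the commands/response topic.

```python
import json
from collections import OrderedDict
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


def handle_restart_service(params: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stub: a real agent would restart the named service here.
    return {"restarted": params["service"]}


def handle_reboot(params: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical stub: acknowledge first, then schedule the actual reboot.
    return {"scheduled": True}


class CommandDispatcher:
    def __init__(self, handlers: Dict[str, Handler], remember: int = 1000) -> None:
        self._handlers = handlers
        self._seen: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()
        self._remember = remember  # how many recent command IDs to keep for de-duplication

    def dispatch(self, raw: bytes) -> Dict[str, Any]:
        """Run one command message and return the response payload."""
        msg = json.loads(raw)
        command_id = msg["command_id"]
        if command_id in self._seen:
            # Redelivered message: return the cached response without re-executing.
            return self._seen[command_id]
        handler = self._handlers.get(msg["command_type"])
        if handler is None:
            response = {"command_id": command_id, "status": "failed",
                        "error": f"unsupported command: {msg['command_type']}"}
        else:
            try:
                result = handler(msg.get("parameters", {}))
                response = {"command_id": command_id, "status": "succeeded", "result": result}
            except Exception as exc:  # report handler errors instead of crashing the agent
                response = {"command_id": command_id, "status": "failed", "error": str(exc)}
        self._seen[command_id] = response
        if len(self._seen) > self._remember:
            self._seen.popitem(last=False)  # evict the oldest remembered command
        return response
```

Caching the response, not just the ID, lets the agent re-publish the same result if the backend never received the first one.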

Future Enhancements (Not in Initial Implementation)

  1. Command Templates Library: User-created reusable commands
  2. Command Scheduling: Schedule commands for future execution
  3. Command Chaining: Execute a sequence of commands
  4. Approval Workflow: Require approval for critical commands
  5. Advanced Health Checks: Custom health check scripts
  6. A/B Testing Metrics: Automatic version comparison
  7. Notification System: Email/Slack alerts
  8. Geographic Grouping: Location-based device groups
  9. Time-zone Aware Rollouts: Deploy during each region's off-peak hours
  10. Rollout Simulation: Dry-run mode
  11. Auto-advance Stages: Advance automatically when health checks pass
  12. Manual Stage Approval: Require manual approval per stage


Document Version: 1.0
Last Updated: 2025-11-14
Author: Roboticks Engineering Team