Skip to main content

Writing Tests

This guide shows you how to write effective tests for your robotics modules and compositions using the Roboticks testing framework.

Test File Structure

Tests should follow pytest conventions and be placed in a tests/ directory:
tests/
|-- conftest.py              # Shared fixtures
|-- test_my_module.py        # Tests for MyModule
|-- test_composition.py      # Integration tests
+-- test_infrastructure.py   # Infrastructure validation

Adding the Test Framework to Path

The test framework files (zmq_monitor.py, pytest_roboticks_progress.py) are mounted at runtime in /opt/roboticks/test-framework/. Since this directory isn’t installed as a Python package, you need to add it to sys.path to import these utilities.
import sys
from pathlib import Path

# Make the runtime-mounted test framework importable. The directory provides:
# - zmq_monitor.py: ZMQ message capture for asserting on module output
# - pytest_roboticks_progress.py: Progress reporting plugin
test_framework_path = Path("/opt/roboticks/test-framework")
# Only touch sys.path when the mount is actually present, so the same test
# file still works where the framework is importable some other way.
if test_framework_path.exists():
    # Inserted at index 0 so this directory is searched first by the import below.
    sys.path.insert(0, str(test_framework_path))

# Now import framework components (this must come after the sys.path update)
from zmq_monitor import ZmqMonitor, ZmqMessage
Always check if the path exists before adding it. This allows your tests to run locally for development where the framework might be installed via pip (pip install -e submodules/roboticks-tests) or located elsewhere.
See the reference implementation at tests/test_example_hello_world.py for a complete example.

ZMQ Message Monitoring

The ZmqMonitor class lets you subscribe to ZeroMQ messages published by your modules and make assertions on them.

Basic Usage

import pytest
from zmq_monitor import ZmqMonitor

class TestMyModule:
    @pytest.fixture
    def zmq_monitor(self):
        """Yield a started ZMQ monitor for "/my/topic" and stop it afterwards."""
        # Modules publish to 5555 and the DeviceManager relays to 5556, so the
        # monitor listens on the DeviceManager's publisher endpoint.
        listener = ZmqMonitor()
        listener.subscribe("tcp://localhost:5556", "/my/topic")
        listener.start()

        yield listener

        listener.stop()

    def test_module_publishes_data(self, zmq_monitor):
        """Check that the module publishes on the expected topic."""
        received = zmq_monitor.wait_for_message(topic="/my/topic", timeout=30.0)

        assert received is not None, "Expected message not received within timeout"
        assert received.topic == "/my/topic"
        assert received.payload is not None

ZmqMonitor API

class ZmqMonitor:
    """Captures ZeroMQ messages in the background so tests can assert on them.

    All methods are instance methods: create a monitor, call subscribe()
    one or more times, then start() it.
    """

    def subscribe(self, endpoint: str, topic: str = "") -> None:
        """
        Subscribe to a ZeroMQ endpoint with optional topic filter.

        Args:
            endpoint: ZeroMQ endpoint (e.g., 'tcp://localhost:5556')
            topic: Topic to filter (empty string = all messages)

        Note: Call before start(). Subscriptions after start() won't work.
        """

    def start(self) -> None:
        """Start monitoring in background thread."""

    def stop(self) -> None:
        """Stop monitoring and cleanup."""

    def wait_for_message(
        self,
        topic: str | None = None,
        predicate: Callable[[Any], bool] | None = None,
        timeout: float = 30.0
    ) -> ZmqMessage | None:
        """
        Wait for a message matching criteria.

        Args:
            topic: Optional topic to match
            predicate: Optional function to test message payload
            timeout: Timeout in seconds

        Returns:
            Matching message or None if timeout
        """

    def get_messages(self, topic: str | None = None) -> list[ZmqMessage]:
        """Get all captured messages, optionally filtered by topic."""

    def clear_messages(self) -> None:
        """Clear all captured messages."""

ZmqMessage Structure

@dataclass
class ZmqMessage:
    """One message captured by the monitor.

    Attributes:
        topic: Message topic (e.g., "/hello/message").
        payload: Parsed JSON payload or raw bytes.
        timestamp: When message was received.
        raw_data: Original raw message data.
    """

    topic: str
    payload: Any
    timestamp: datetime
    raw_data: bytes

Advanced ZMQ Examples

Wait for Message with Specific Content

def test_counter_increments(self, zmq_monitor):
    """Block until the counter topic reports a count above 10."""

    def above_threshold(payload):
        # A payload without a "count" key is treated as 0 and never matches.
        return payload.get("count", 0) > 10

    received = zmq_monitor.wait_for_message(
        topic="/counter/output",
        predicate=above_threshold,
        timeout=60.0,
    )

    assert received is not None, "Never received counter > 10"
    assert received.payload["count"] > 10

Verify Message Rate

def test_message_rate(self, zmq_monitor):
    """Verify module publishes at expected rate.

    Lets messages accumulate for a fixed window, then checks that the number
    captured on "/sensor/data" is within a small tolerance of rate * window.
    """
    import time

    expected_rate_hz = 1  # nominal publish rate of the module under test
    window_seconds = 10   # how long to let messages accumulate
    tolerance = 2         # allowed deviation, in messages, either way

    # Wait for messages to accumulate
    time.sleep(window_seconds)

    messages = zmq_monitor.get_messages(topic="/sensor/data")

    # A 1 Hz module should deliver roughly 10 messages in a 10 second window
    expected_count = expected_rate_hz * window_seconds
    assert len(messages) >= expected_count - tolerance, \
        f"Expected ~{expected_count} messages, got {len(messages)}"
    assert len(messages) <= expected_count + tolerance, \
        f"Too many messages: {len(messages)}"

Verify Monotonic Counter

def test_counters_are_monotonic(self, zmq_monitor):
    """Check that successive counter values never go down."""
    import time

    # Give the module a few seconds to publish before inspecting the capture.
    time.sleep(5)

    counts = []
    for captured in zmq_monitor.get_messages(topic="/counter/output"):
        body = captured.payload
        # Skip anything that is not a dict carrying a "count" field.
        if isinstance(body, dict) and "count" in body:
            counts.append(body["count"])

    # Compare each value with the one received immediately before it.
    for previous, current in zip(counts, counts[1:]):
        assert current >= previous, \
            f"Counter decreased: {previous} -> {current}"

Log File Testing

For modules that write to log files, you can parse and assert on log contents:
import re
from pathlib import Path

class TestModuleLogs:
    """Assertions on the log file written by the module under test."""

    # Single definition of the log location shared by every test in this class.
    LOG_FILE = Path("/var/roboticks/logs/modules/my-module.log")

    def test_module_startup_logged(self):
        """Verify module logs successful startup."""
        assert self.LOG_FILE.exists(), "Module log file not created"

        content = self.LOG_FILE.read_text()
        assert "Module initialized successfully" in content

    def test_no_errors_in_log(self):
        """Verify no ERROR level messages in module log."""
        # Fail with a readable message, rather than a FileNotFoundError from
        # read_text(), when the module never created its log.
        assert self.LOG_FILE.exists(), "Module log file not created"

        content = self.LOG_FILE.read_text()
        errors = re.findall(r"\[ERROR\]", content)

        assert not errors, f"Found {len(errors)} errors in log"

Infrastructure Tests

Test device registration and session creation:
import re
from pathlib import Path

class TestInfrastructure:
    def test_device_registered(self):
        """Check that the device ID file exists and holds a "device-" DSN."""
        device_id_path = Path("/etc/roboticks/device_id")
        assert device_id_path.exists(), "Device ID file not created"

        dsn = device_id_path.read_text().strip()
        assert dsn.startswith("device-"), f"Unexpected DSN format: {dsn}"

    def test_session_created(self):
        """Check that the session ID file exists and contains a UUID."""
        session_id_path = Path("/var/roboticks/sessions/.session_id")
        assert session_id_path.exists(), "Session ID file not created"

        session_id = session_id_path.read_text().strip()
        # 8-4-4-4-12 hex groups; upper- and lower-case digits are both accepted.
        looks_like_uuid = re.match(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
            session_id,
            re.IGNORECASE,
        )
        assert looks_like_uuid, \
            f"Session ID is not a valid UUID: {session_id}"

Progress Reporting

The pytest_roboticks_progress plugin automatically reports test progress to the Roboticks backend. It’s enabled when these environment variables are set:
  • ROBOTICKS_TEST_JOB_ID - Test job ID
  • ROBOTICKS_API_URL - Backend API URL

How It Works

The plugin hooks into pytest’s lifecycle:
  • Reports each test as “running” when it starts
  • Reports “passed”, “failed”, or “skipped” when it completes
  • Includes duration, error messages, and captured output

Viewing Progress

Test progress appears in real-time on the Roboticks dashboard under your test job.

Complete Example

Here’s a complete test file demonstrating multiple testing approaches:
"""
HelloWorld Module Tests

Tests the HelloWorldModule functionality using ZMQ message monitoring.
"""
import os
import re
import sys
import time
from pathlib import Path

import pytest

# Add test framework to path (only when the runtime mount is present)
test_framework_path = Path("/opt/roboticks/test-framework")
if test_framework_path.exists():
    sys.path.insert(0, str(test_framework_path))

# The ZMQ monitor is optional: record whether it could be imported so the
# ZMQ tests can be skipped instead of breaking collection of this file.
try:
    from zmq_monitor import ZmqMonitor, ZmqMessage
except ImportError:
    ZMQ_AVAILABLE = False
    # Define both names so later references fail predictably on None
    # rather than with a NameError.
    ZmqMonitor = None
    ZmqMessage = None
else:
    ZMQ_AVAILABLE = True

# Configuration
# The endpoint can be overridden through the ZMQ_TEST_ENDPOINT environment variable.
ZMQ_ENDPOINT = os.environ.get("ZMQ_TEST_ENDPOINT", "tcp://localhost:5556")
HELLO_TOPIC = "/hello/message"


class TestHelloWorldZmq:
    """ZMQ-based checks on the messages published by the HelloWorld module."""

    @pytest.fixture
    def zmq_monitor(self):
        """Yield a started monitor on HELLO_TOPIC; skip when ZMQ is unavailable."""
        if not ZMQ_AVAILABLE:
            pytest.skip("ZMQ monitor not available")

        listener = ZmqMonitor()
        listener.subscribe(ZMQ_ENDPOINT, HELLO_TOPIC)
        listener.start()
        # Allow connection to establish
        time.sleep(0.5)

        yield listener

        listener.stop()

    @pytest.mark.skipif(not ZMQ_AVAILABLE, reason="ZMQ not available")
    def test_publishes_messages(self, zmq_monitor):
        """Check that at least one message arrives on HELLO_TOPIC."""
        received = zmq_monitor.wait_for_message(topic=HELLO_TOPIC, timeout=30.0)

        assert received is not None
        assert received.topic == HELLO_TOPIC

    @pytest.mark.skipif(not ZMQ_AVAILABLE, reason="ZMQ not available")
    def test_message_structure(self, zmq_monitor):
        """Check that a dict payload carries a "message" or "data" field."""
        received = zmq_monitor.wait_for_message(topic=HELLO_TOPIC, timeout=30.0)
        assert received is not None

        body = received.payload
        if not isinstance(body, dict):
            # Non-dict payloads have no named fields to check.
            return
        assert any(key in body for key in ("message", "data"))

    @pytest.mark.skipif(not ZMQ_AVAILABLE, reason="ZMQ not available")
    def test_receives_multiple_messages(self, zmq_monitor):
        """Check that several messages accumulate over a ten second window."""
        time.sleep(10)

        received = zmq_monitor.get_messages(topic=HELLO_TOPIC)
        assert len(received) >= 5, \
            f"Expected at least 5 messages, got {len(received)}"


class TestHelloWorldInfrastructure:
    """Infrastructure checks that do not depend on ZMQ (always run)."""

    def test_session_created(self):
        """Check that the session file exists and contains a UUID."""
        session_id_path = Path("/var/roboticks/sessions/.session_id")
        assert session_id_path.exists()

        session_id = session_id_path.read_text().strip()
        # 8-4-4-4-12 hex groups; upper- and lower-case digits are both accepted.
        assert re.match(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
            session_id,
            re.IGNORECASE,
        )

    def test_device_registered(self):
        """Check that the device ID file exists and holds a "device-" DSN."""
        device_id_path = Path("/etc/roboticks/device_id")
        assert device_id_path.exists()

        assert device_id_path.read_text().strip().startswith("device-")


# Allow running this file directly as a script: hands this file to pytest
# with verbose output.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

Best Practices

Always use pytest fixtures for ZMQ monitors to ensure proper cleanup:
@pytest.fixture
def zmq_monitor(self):
    """Yield a started monitor to the test and stop it once the test is done."""
    monitor = ZmqMonitor()
    monitor.subscribe(...)  # placeholder: pass your endpoint and topic here
    monitor.start()
    yield monitor
    monitor.stop()  # Always called, even on test failure
Check for optional dependencies and skip tests gracefully:
# Record whether the optional ZMQ monitor could be imported.
try:
    from zmq_monitor import ZmqMonitor
    ZMQ_AVAILABLE = True
except ImportError:
    ZMQ_AVAILABLE = False

# Marked as skipped, with the given reason, when the import above failed.
@pytest.mark.skipif(not ZMQ_AVAILABLE, reason="ZMQ not available")
def test_zmq_messages(self):
    ...
Set timeouts based on expected module behavior:
  • Fast modules (>1 Hz): 10-30 second timeouts
  • Slow modules (<0.1 Hz): 60-120 second timeouts
  • Initialization-dependent: Allow extra time for startup
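If multiple ZMQ tests reuse the same monitor (for example, through a fixture with a wider scope than a single test), clear captured messages so that earlier messages do not interfere with later assertions: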
def test_first(self, zmq_monitor):
    """Wait for a message, then discard everything captured so far."""
    msg = zmq_monitor.wait_for_message(...)  # placeholder: topic/predicate/timeout
    zmq_monitor.clear_messages()  # Clear for next test

Next Steps