# Writing Tests
This guide shows you how to write effective tests for your robotics modules and compositions using the Roboticks testing framework.
## Test File Structure

Tests should follow pytest conventions and be placed in a `tests/` directory:

```
tests/
|-- conftest.py             # Shared fixtures
|-- test_my_module.py       # Tests for MyModule
|-- test_composition.py     # Integration tests
+-- test_infrastructure.py  # Infrastructure validation
```
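If several test files need the same setup, the shared fixtures belong in `conftest.py`. Here is a minimal sketch of what such a fixture might look like, assuming the endpoint and catch-all topic filter below (adjust both to your composition):

```python
# tests/conftest.py -- illustrative sketch of a shared fixture, not a required layout
import sys
import time
from pathlib import Path

import pytest

# Make the mounted test framework importable (see the next section)
test_framework_path = Path("/opt/roboticks/test-framework")
if test_framework_path.exists():
    sys.path.insert(0, str(test_framework_path))


@pytest.fixture
def zmq_monitor():
    """ZMQ monitor shared by every test module under tests/."""
    from zmq_monitor import ZmqMonitor

    monitor = ZmqMonitor()
    monitor.subscribe("tcp://localhost:5556", "")  # empty topic filter = all messages
    monitor.start()
    time.sleep(0.5)  # allow the subscription to establish
    yield monitor
    monitor.stop()
```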
## Adding the Test Framework to Path

The test framework files (`zmq_monitor.py`, `pytest_roboticks_progress.py`) are mounted at runtime in `/opt/roboticks/test-framework/`. Since this directory isn't installed as a Python package, you need to add it to `sys.path` to import these utilities.
```python
import sys
from pathlib import Path

# Add test framework to path to access:
# - zmq_monitor.py: ZMQ message capture for asserting on module output
# - pytest_roboticks_progress.py: Progress reporting plugin
test_framework_path = Path("/opt/roboticks/test-framework")
if test_framework_path.exists():
    sys.path.insert(0, str(test_framework_path))

# Now import framework components
from zmq_monitor import ZmqMonitor, ZmqMessage
```
Always check whether the path exists before adding it. This lets your tests also run locally during development, where the framework might be installed via pip (`pip install -e submodules/roboticks-tests`) or located elsewhere.

See the reference implementation at `tests/test_example_hello_world.py` for a complete example.
## ZMQ Message Monitoring

The `ZmqMonitor` class lets you subscribe to ZeroMQ messages published by your modules and make assertions on them.

### Basic Usage
```python
import pytest

from zmq_monitor import ZmqMonitor


class TestMyModule:
    @pytest.fixture
    def zmq_monitor(self):
        """Create and start a ZMQ monitor for the test."""
        monitor = ZmqMonitor()
        # Subscribe to the DeviceManager's publisher endpoint
        # Modules publish to 5555, DeviceManager relays to 5556
        monitor.subscribe("tcp://localhost:5556", "/my/topic")
        monitor.start()
        yield monitor
        monitor.stop()

    def test_module_publishes_data(self, zmq_monitor):
        """Verify module publishes messages on expected topic."""
        msg = zmq_monitor.wait_for_message(
            topic="/my/topic",
            timeout=30.0
        )
        assert msg is not None, "Expected message not received within timeout"
        assert msg.topic == "/my/topic"
        assert msg.payload is not None
```
### ZmqMonitor API
```python
class ZmqMonitor:
    def subscribe(self, endpoint: str, topic: str = "") -> None:
        """
        Subscribe to a ZeroMQ endpoint with optional topic filter.

        Args:
            endpoint: ZeroMQ endpoint (e.g., 'tcp://localhost:5556')
            topic: Topic to filter (empty string = all messages)

        Note: Call before start(). Subscriptions after start() won't work.
        """

    def start(self) -> None:
        """Start monitoring in background thread."""

    def stop(self) -> None:
        """Stop monitoring and cleanup."""

    def wait_for_message(
        self,
        topic: str | None = None,
        predicate: Callable[[Any], bool] | None = None,
        timeout: float = 30.0
    ) -> ZmqMessage | None:
        """
        Wait for a message matching criteria.

        Args:
            topic: Optional topic to match
            predicate: Optional function to test message payload
            timeout: Timeout in seconds

        Returns:
            Matching message or None if timeout
        """

    def get_messages(self, topic: str | None = None) -> list[ZmqMessage]:
        """Get all captured messages, optionally filtered by topic."""

    def clear_messages(self) -> None:
        """Clear all captured messages."""
```
### ZmqMessage Structure
```python
@dataclass
class ZmqMessage:
    topic: str           # Message topic (e.g., "/hello/message")
    payload: Any         # Parsed JSON payload or raw bytes
    timestamp: datetime  # When message was received
    raw_data: bytes      # Original raw message data
```
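Because every captured message carries a `timestamp`, you can also assert on ordering or spacing between messages. A small sketch, reusing the fixture from above (the topic name is illustrative):

```python
def test_messages_arrive_in_order(self, zmq_monitor):
    """Verify captured messages are ordered by receive time."""
    import time

    time.sleep(5)  # let a few messages accumulate
    messages = zmq_monitor.get_messages(topic="/hello/message")
    assert len(messages) >= 2, "Need at least two messages to compare"

    timestamps = [msg.timestamp for msg in messages]
    assert timestamps == sorted(timestamps), "Messages not ordered by receive time"
```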
## Advanced ZMQ Examples

### Wait for Message with Specific Content
```python
def test_counter_increments(self, zmq_monitor):
    """Wait for a message where counter exceeds threshold."""
    msg = zmq_monitor.wait_for_message(
        topic="/counter/output",
        predicate=lambda payload: payload.get("count", 0) > 10,
        timeout=60.0
    )
    assert msg is not None, "Never received counter > 10"
    assert msg.payload["count"] > 10
```
### Verify Message Rate
```python
def test_message_rate(self, zmq_monitor):
    """Verify module publishes at expected rate."""
    import time

    # Wait for messages to accumulate
    time.sleep(10)
    messages = zmq_monitor.get_messages(topic="/sensor/data")

    # Expect roughly 10 messages over 10 seconds (1 Hz module)
    assert len(messages) >= 8, f"Expected ~10 messages, got {len(messages)}"
    assert len(messages) <= 12, f"Too many messages: {len(messages)}"
```
### Verify Monotonic Counter
```python
def test_counters_are_monotonic(self, zmq_monitor):
    """Verify counter values never decrease."""
    import time

    time.sleep(5)
    messages = zmq_monitor.get_messages(topic="/counter/output")

    counters = [
        msg.payload["count"]
        for msg in messages
        if isinstance(msg.payload, dict) and "count" in msg.payload
    ]

    for i in range(1, len(counters)):
        assert counters[i] >= counters[i - 1], \
            f"Counter decreased: {counters[i - 1]} -> {counters[i]}"
```
## Log File Testing
For modules that write to log files, you can parse and assert on log contents:
```python
import re
from pathlib import Path


class TestModuleLogs:
    def test_module_startup_logged(self):
        """Verify module logs successful startup."""
        log_file = Path("/var/roboticks/logs/modules/my-module.log")
        assert log_file.exists(), "Module log file not created"

        content = log_file.read_text()
        assert "Module initialized successfully" in content

    def test_no_errors_in_log(self):
        """Verify no ERROR level messages in module log."""
        log_file = Path("/var/roboticks/logs/modules/my-module.log")
        content = log_file.read_text()

        error_pattern = re.compile(r"\[ERROR\]")
        errors = error_pattern.findall(content)
        assert len(errors) == 0, f"Found {len(errors)} errors in log"
```
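Log output may lag behind the event you are waiting for, so a plain read can be flaky right after startup. A small polling helper can wait for a line to appear before asserting; this is a sketch using the same illustrative path and message as above:

```python
import time
from pathlib import Path


def wait_for_log_line(log_file: Path, text: str, timeout: float = 30.0) -> bool:
    """Poll a log file until `text` appears or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if log_file.exists() and text in log_file.read_text():
            return True
        time.sleep(0.5)
    return False


def test_module_startup_logged_with_wait():
    log_file = Path("/var/roboticks/logs/modules/my-module.log")
    assert wait_for_log_line(log_file, "Module initialized successfully"), \
        "Startup message not logged within timeout"
```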
## Infrastructure Tests
Test device registration and session creation:
```python
import re
from pathlib import Path


class TestInfrastructure:
    def test_device_registered(self):
        """Verify device registration completed."""
        dsn_file = Path("/etc/roboticks/device_id")
        assert dsn_file.exists(), "Device ID file not created"

        dsn = dsn_file.read_text().strip()
        assert dsn.startswith("device-"), f"Unexpected DSN format: {dsn}"

    def test_session_created(self):
        """Verify session was created with valid UUID."""
        session_file = Path("/var/roboticks/sessions/.session_id")
        assert session_file.exists(), "Session ID file not created"

        session_id = session_file.read_text().strip()
        uuid_pattern = re.compile(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
            re.IGNORECASE
        )
        assert uuid_pattern.match(session_id), \
            f"Session ID is not a valid UUID: {session_id}"
```
## Progress Reporting

The `pytest_roboticks_progress` plugin automatically reports test progress to the Roboticks backend. It's enabled when these environment variables are set:

- `ROBOTICKS_TEST_JOB_ID` - Test job ID
- `ROBOTICKS_API_URL` - Backend API URL
### How It Works

The plugin hooks into pytest's lifecycle:

- Reports each test as "running" when it starts
- Reports "passed", "failed", or "skipped" when it completes
- Includes duration, error messages, and captured output
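For orientation only, the sketch below shows how a pytest plugin can observe that lifecycle. It is not the actual `pytest_roboticks_progress` implementation, and the `report_status` helper is hypothetical:

```python
# Illustrative sketch only -- not the real pytest_roboticks_progress plugin.
def report_status(test_id: str, status: str, **details) -> None:
    """Hypothetical helper that would POST progress to ROBOTICKS_API_URL."""
    ...


def pytest_runtest_setup(item):
    # Called as each test begins its setup phase
    report_status(item.nodeid, "running")


def pytest_runtest_logreport(report):
    # Skips surface in the setup phase; pass/fail in the call phase
    if report.when == "call" or (report.when == "setup" and report.skipped):
        report_status(
            report.nodeid,
            report.outcome,  # "passed", "failed", or "skipped"
            duration=report.duration,
            error=report.longreprtext if report.failed else None,
        )
```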
### Viewing Progress
Test progress appears in real-time on the Roboticks dashboard under your test job.
## Complete Example
Here’s a complete test file demonstrating multiple testing approaches:
"""
HelloWorld Module Tests
Tests the HelloWorldModule functionality using ZMQ message monitoring.
"""
import os
import re
import sys
import time
from pathlib import Path
import pytest
# Add test framework to path
test_framework_path = Path( "/opt/roboticks/test-framework" )
if test_framework_path.exists():
sys.path.insert( 0 , str (test_framework_path))
try :
from zmq_monitor import ZmqMonitor, ZmqMessage
ZMQ_AVAILABLE = True
except ImportError :
ZMQ_AVAILABLE = False
ZmqMonitor = None
# Configuration
ZMQ_ENDPOINT = os.environ.get( "ZMQ_TEST_ENDPOINT" , "tcp://localhost:5556" )
HELLO_TOPIC = "/hello/message"
class TestHelloWorldZmq :
"""Test HelloWorld module using ZMQ monitoring."""
@pytest.fixture
def zmq_monitor ( self ):
if not ZMQ_AVAILABLE :
pytest.skip( "ZMQ monitor not available" )
monitor = ZmqMonitor()
monitor.subscribe( ZMQ_ENDPOINT , HELLO_TOPIC )
monitor.start()
time.sleep( 0.5 ) # Allow connection to establish
yield monitor
monitor.stop()
@pytest.mark.skipif ( not ZMQ_AVAILABLE , reason = "ZMQ not available" )
def test_publishes_messages ( self , zmq_monitor ):
"""Verify module publishes messages."""
msg = zmq_monitor.wait_for_message(
topic = HELLO_TOPIC ,
timeout = 30.0
)
assert msg is not None
assert msg.topic == HELLO_TOPIC
@pytest.mark.skipif ( not ZMQ_AVAILABLE , reason = "ZMQ not available" )
def test_message_structure ( self , zmq_monitor ):
"""Verify message has expected fields."""
msg = zmq_monitor.wait_for_message(
topic = HELLO_TOPIC ,
timeout = 30.0
)
assert msg is not None
payload = msg.payload
if isinstance (payload, dict ):
assert "message" in payload or "data" in payload
@pytest.mark.skipif ( not ZMQ_AVAILABLE , reason = "ZMQ not available" )
def test_receives_multiple_messages ( self , zmq_monitor ):
"""Verify we receive multiple messages over time."""
time.sleep( 10 )
messages = zmq_monitor.get_messages( topic = HELLO_TOPIC )
assert len (messages) >= 5 , \
f "Expected at least 5 messages, got { len (messages) } "
class TestHelloWorldInfrastructure :
"""Test infrastructure (always runs)."""
def test_session_created ( self ):
"""Verify session file exists with valid UUID."""
session_file = Path( "/var/roboticks/sessions/.session_id" )
assert session_file.exists()
content = session_file.read_text().strip()
uuid_re = re.compile(
r " ^ [ 0-9a-f ] {8} - [ 0-9a-f ] {4} - [ 0-9a-f ] {4} - [ 0-9a-f ] {4} - [ 0-9a-f ] {12} $ " ,
re. IGNORECASE
)
assert uuid_re.match(content)
def test_device_registered ( self ):
"""Verify device registration completed."""
dsn_file = Path( "/etc/roboticks/device_id" )
assert dsn_file.exists()
dsn = dsn_file.read_text().strip()
assert dsn.startswith( "device-" )
if __name__ == "__main__" :
pytest.main([ __file__ , "-v" ])
## Best Practices

### Use Fixtures for Setup/Teardown
Always use pytest fixtures for ZMQ monitors to ensure proper cleanup:

```python
@pytest.fixture
def zmq_monitor(self):
    monitor = ZmqMonitor()
    monitor.subscribe(...)
    monitor.start()
    yield monitor
    monitor.stop()  # Always called, even on test failure
```
### Handle Missing Dependencies Gracefully

Check for optional dependencies and skip tests gracefully:

```python
try:
    from zmq_monitor import ZmqMonitor
    ZMQ_AVAILABLE = True
except ImportError:
    ZMQ_AVAILABLE = False


@pytest.mark.skipif(not ZMQ_AVAILABLE, reason="ZMQ not available")
def test_zmq_messages(self):
    ...
```
### Use Appropriate Timeouts

Set timeouts based on expected module behavior (see the sketch after this list):

- Fast modules (>1 Hz): 10-30 second timeouts
- Slow modules (<0.1 Hz): 60-120 second timeouts
- Initialization-dependent: Allow extra time for startup
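One way to keep timeouts consistent is to derive them from the module's expected publish rate plus a startup allowance. This is a sketch; the rate, grace period, and multiplier below are illustrative choices, not framework defaults:

```python
EXPECTED_RATE_HZ = 1.0  # illustrative: your module's nominal publish rate
STARTUP_GRACE_S = 10.0  # illustrative: extra headroom for module startup


def message_timeout(rate_hz: float = EXPECTED_RATE_HZ) -> float:
    """Allow a few publish periods on top of the startup grace period."""
    return STARTUP_GRACE_S + 3.0 / rate_hz


def test_publishes_within_expected_time(zmq_monitor):
    msg = zmq_monitor.wait_for_message(
        topic="/hello/message",
        timeout=message_timeout(),
    )
    assert msg is not None, "No message within the derived timeout"
```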
### Clear Messages Between Tests

If running multiple ZMQ tests, clear messages to avoid interference:

```python
def test_first(self, zmq_monitor):
    msg = zmq_monitor.wait_for_message(...)
    zmq_monitor.clear_messages()  # Clear for next test
```
## Next Steps