Skip to main content

Examples

Four end-to-end examples. Each one is self-contained, traces back to a requirement, and exercises a different SDK capability.
ExampleCapability shown
1. E-stop deadline test@confirms, @deadline, rclpy assertion helpers
2. Navigation drift test@requires_sim, assert_tf_transform, MCAP capture
3. Sensor-loss fault injectionfault_injection.drop_messages, mixed assertions
4. Multi-node launch testlaunch_testing, post-shutdown assertions

1. E-stop deadline test

RequirementREQ-001: E-stop halts motion within 100 ms. This is the minimal interesting test. One decorator, one deadline, one rclpy helper.
# tests/test_estop.py
import pytest
import rclpy
from std_msgs.msg import Bool
from roboticks import confirms, deadline
from roboticks.assertions import assert_topic_published


@pytest.fixture(scope="session", autouse=True)
def _ros():
    rclpy.init()
    yield
    rclpy.shutdown()


@confirms("REQ-001")
@deadline(milliseconds=100)
def test_estop_halts_motion_within_100ms(robot_under_test):
    """When the E-stop input is asserted, /safety/estop_engaged
    publishes True within 100 ms."""
    robot_under_test.trigger_estop()
    msg = assert_topic_published(
        topic="/safety/estop_engaged",
        msg_type=Bool,
        within=0.1,
        predicate=lambda m: m.data is True,
    )
    assert msg.data is True
What runs at upload:
<testcase name="test_estop_halts_motion_within_100ms" time="0.062">
  <properties>
    <property name="roboticks.confirms" value="REQ-001"/>
    <property name="roboticks.deadline_ms" value="100"/>
  </properties>
</testcase>
The platform closes the diagonal: REQ-001 → tests.test_estop::test_estop_halts_motion_within_100ms → passed.

2. Navigation drift test

RequirementREQ-031: Robot reaches the goal pose within 0.5 m of target after 30 s in the warehouse_loop world. This test needs a 3D physics world (Gazebo) and ends with a TF assertion. We record an MCAP for failure forensics.
# tests/sim/test_nav_drift.py
import time
import pytest
import rclpy
from geometry_msgs.msg import PoseStamped
from roboticks import confirms, requires_sim, mcap_capture
from roboticks.assertions import assert_tf_transform


@pytest.fixture(scope="session", autouse=True)
def _ros():
    rclpy.init()
    yield
    rclpy.shutdown()


@confirms("REQ-031")
@requires_sim("gazebo", gpu=True)
def test_robot_reaches_goal_within_tolerance(nav_stack, goal_publisher):
    """In warehouse_loop, the robot reaches (2.0, 1.0) within 0.5 m
    after at most 30 seconds."""

    with mcap_capture(
        topics=["/tf", "/cmd_vel", "/goal_pose", "/scan"],
        upload_on="failure",
    ):
        goal = PoseStamped()
        goal.header.frame_id = "map"
        goal.pose.position.x = 2.0
        goal.pose.position.y = 1.0
        goal_publisher.publish(goal)

        # Allow the stack 30 s to plan + execute
        time.sleep(30.0)

        tf = assert_tf_transform(
            source_frame="map",
            target_frame="base_link",
            within=2.0,
            translation=(2.0, 1.0, 0.0),
            tol=0.5,
        )

    assert tf is not None
The scheduler routes this test to the hosted-gazebo-gpu pool because of @requires_sim("gazebo", gpu=True). If you have a self-hosted GPU runner labelled gpu=true, it runs there for free instead.

3. Sensor-loss fault injection

RequirementsREQ-061: Robot decelerates gracefully on LIDAR loss. REQ-062: Robot publishes a warning on LIDAR degradation. This test confirms two requirements with one scenario. We drop 80% of /scan messages and assert that the robot slows (not stops, not crashes) and emits a warning.
# tests/test_sensor_loss.py
import pytest
import rclpy
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from roboticks import confirms, deadline
from roboticks.assertions import assert_topic_published
from roboticks.fault_injection import drop_messages


@pytest.fixture(scope="session", autouse=True)
def _ros():
    rclpy.init()
    yield
    rclpy.shutdown()


@confirms("REQ-061", "REQ-062")
@deadline(milliseconds=10_000)
def test_robot_degrades_gracefully_on_lidar_loss(robot_under_test):
    """With 80% of /scan dropped, /cmd_vel.linear.x drops below 0.3 m/s
    and /diagnostics emits a degraded-LIDAR warning."""

    robot_under_test.command_velocity(1.0)  # full speed initially

    with drop_messages("/scan", rate=0.8):
        # REQ-061: velocity drops
        cmd = assert_topic_published(
            "/cmd_vel", Twist,
            within=3.0,
            predicate=lambda m: 0.0 < m.linear.x < 0.3,
        )
        assert cmd.linear.x < 0.3

        # REQ-062: warning emitted
        warn = assert_topic_published(
            "/diagnostics", String,
            within=3.0,
            predicate=lambda m: "lidar" in m.data.lower(),
        )
        assert "lidar" in warn.data.lower()
The @confirms("REQ-061", "REQ-062") lights up both rows of the matrix from the same test.

4. Multi-node launch test

RequirementREQ-098: When nav2_planner is killed, the watchdog restarts it within 5 s and re-emits a "configure" transition event. This needs the planner running as a real separate process, plus a watchdog, plus an observer. Launch testing is the right tool.
# tests/system/test_watchdog_respawn.py
import unittest
import pytest
import launch_testing.actions
import rclpy
from lifecycle_msgs.msg import TransitionEvent
from roboticks import confirms
from roboticks.fault_injection import kill_node
from roboticks.launch_testing import make_node_action, generate_test_description


@pytest.mark.launch_test
def generate_test_description_fn():
    planner = make_node_action(
        package="nav2_planner",
        executable="planner_server",
    )
    watchdog = make_node_action(
        package="my_safety",
        executable="lifecycle_watchdog",
    )
    return generate_test_description(
        planner,
        watchdog,
        launch_testing.actions.ReadyToTest(),
    )


@confirms("REQ-098")
class TestWatchdogRespawn(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        rclpy.init()
        cls.node = rclpy.create_node("test_observer")
        cls.events: list[TransitionEvent] = []
        cls.sub = cls.node.create_subscription(
            TransitionEvent, "/nav2_planner/transition_event",
            lambda e: cls.events.append(e), 10,
        )

    @classmethod
    def tearDownClass(cls):
        cls.node.destroy_node()
        rclpy.shutdown()

    def test_planner_respawned_within_5s(self):
        with kill_node("/nav2_planner") as killed:
            deadline = self.node.get_clock().now().nanoseconds + 5_000_000_000
            while self.node.get_clock().now().nanoseconds < deadline:
                rclpy.spin_once(self.node, timeout_sec=0.1)
                if any(e.transition.label == "configure" for e in self.events):
                    break
            self.assertTrue(killed.respawned,
                "watchdog did not respawn the planner")
            self.assertTrue(
                any(e.transition.label == "configure" for e in self.events),
                "no configure transition observed within 5 s",
            )

Patterns common to all four

  • Session-scoped rclpy.init() / rclpy.shutdown(). Put it in conftest.py once.
  • One @confirms(...) per intentional traceability link. Avoid spraying requirement IDs across unrelated tests.
  • MCAP capture only when the failure forensics matter. It’s cheap, but not free; default to upload_on="failure".
  • Sim only when sim is the only way. A test that needs Gazebo is a test that needs a GPU and a sim minute — pay for it deliberately.

Where to copy from

The roboticks-sdk repo has these examples (and more) as runnable code:
git clone https://github.com/roboticks-io/roboticks-sdk.git
cd roboticks-sdk/examples
pytest -v

Next

Decorators

The decorators these examples lean on.

Assertions

The rclpy helpers the examples invoke.

Fault injection

The primitives example 3 uses.

Launch testing

The system-test pattern example 4 follows.