Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.roboticks.io/llms.txt

Use this file to discover all available pages before exploring further.

Assertions

roboticks.assertions ships five rclpy-aware assertion helpers. Each one spins a node briefly, evaluates a predicate, and raises AssertionError (or TimeoutError) with an informative message on failure.
from roboticks.assertions import (
    assert_topic_published,
    assert_service_response,
    assert_action_result,
    assert_param_equals,
    assert_tf_transform,
)
rclpy-guarded. Importing roboticks.assertions on a host without rclpy installed raises a clear RuntimeError pointing you at the ROS2 install docs. Decorators in roboticks itself are not guarded — you can import them anywhere.

assert_topic_published

Wait until a message matching the predicate is published on topic.

Signature

def assert_topic_published(
    topic: str,
    msg_type: type,
    *,
    within: float = 5.0,
    predicate: Callable[[Any], bool] | None = None,
    qos: rclpy.qos.QoSProfile | None = None,
    node: rclpy.node.Node | None = None,
) -> Any: ...
ParameterTypeDefaultDescription
topicstrFully-qualified topic (e.g. /cmd_vel).
msg_typemessage classThe ROS2 message class to subscribe with.
withinfloat5.0Wall-clock seconds to wait.
predicateCallable[[msg], bool]accept-firstFilter — wait for the first message that returns True.
qosQoSProfilerclpy defaultSubscription QoS.
nodeNodeephemeralUse an existing node instead of spinning a one-shot subscriber.

Raises

  • TimeoutError — no matching message in within seconds.
  • RuntimeError — rclpy not initialised, topic type mismatch.

Returns

The first message matching the predicate, as a deserialised Python object of msg_type.

Example

from geometry_msgs.msg import Twist
from roboticks.assertions import assert_topic_published

msg = assert_topic_published(
    "/cmd_vel", Twist,
    within=3.0,
    predicate=lambda m: m.linear.x > 0.1,
)
assert msg.linear.x > 0.1

assert_service_response

Send a request, wait for the response.

Signature

def assert_service_response(
    service: str,
    srv_type: type,
    request: Any,
    *,
    within: float = 5.0,
    node: rclpy.node.Node | None = None,
) -> Any: ...

Raises

  • TimeoutError — service unavailable or response not received in within seconds.
  • RuntimeError — rclpy not initialised, service type mismatch.

Returns

The deserialised response object.

Example

from example_interfaces.srv import AddTwoInts
from roboticks.assertions import assert_service_response

response = assert_service_response(
    "/add_two_ints", AddTwoInts,
    AddTwoInts.Request(a=2, b=3),
    within=2.0,
)
assert response.sum == 5

assert_action_result

Send a goal, wait for the terminal result (success or failure).

Signature

def assert_action_result(
    action: str,
    action_type: type,
    goal: Any,
    *,
    within: float = 30.0,
    feedback_cb: Callable[[Any], None] | None = None,
    node: rclpy.node.Node | None = None,
) -> ClientGoalHandle: ...
ParameterDescription
actionAction name (e.g. /navigate_to_pose).
action_typeThe ROS2 action class.
goalA <ActionType>.Goal() instance.
withinSeconds to wait for a terminal result.
feedback_cbOptional callback fired on each feedback message.

Raises

  • TimeoutError — no terminal result in within seconds.
  • AssertionError — goal rejected by action server.
  • RuntimeError — rclpy not initialised.

Returns

The completed ClientGoalHandle. Access .result() for the action-specific result.

Example

from nav2_msgs.action import NavigateToPose
from roboticks.assertions import assert_action_result

goal = NavigateToPose.Goal()
goal.pose.pose.position.x = 2.0

handle = assert_action_result(
    "/navigate_to_pose", NavigateToPose,
    goal, within=45.0,
)
assert handle.result().result.error_code == 0

assert_param_equals

Read a parameter from a node and assert its value.

Signature

def assert_param_equals(
    node_name: str,
    param: str,
    expected: Any,
    *,
    within: float = 2.0,
    node: rclpy.node.Node | None = None,
) -> None: ...
ParameterDescription
node_nameFully-qualified node name (e.g. /nav2_planner).
paramParameter name.
expectedExpected value. Comparison is ==.
withinSeconds to wait for the param service to be available.

Raises

  • TimeoutError — parameter service unavailable.
  • AssertionError — parameter value doesn’t match expected.
  • RuntimeError — node doesn’t exist, parameter not declared.

Example

from roboticks.assertions import assert_param_equals

assert_param_equals("/nav2_planner", "use_sim_time", True)
assert_param_equals("/nav2_planner", "plugin_lib_names", ["GridBased"])

assert_tf_transform

Wait until a TF transform from source_frame to target_frame is available, then assert on its components.

Signature

def assert_tf_transform(
    source_frame: str,
    target_frame: str,
    *,
    within: float = 5.0,
    translation: tuple[float, float, float] | None = None,
    rotation: tuple[float, float, float, float] | None = None,
    tol: float = 0.01,
    node: rclpy.node.Node | None = None,
) -> TransformStamped: ...
ParameterDescription
source_frameSource TF frame ID.
target_frameTarget TF frame ID.
withinSeconds to wait for the transform.
translationOptional expected (x, y, z) — asserted within tol.
rotationOptional expected quaternion (x, y, z, w) — asserted within tol.
tolTolerance for translation/rotation comparison.

Raises

  • TimeoutError — TF transform not available.
  • AssertionError — translation/rotation deviates beyond tol.

Returns

The geometry_msgs/msg/TransformStamped object.

Example

from roboticks.assertions import assert_tf_transform

tf = assert_tf_transform(
    "map", "base_link",
    within=5.0,
    translation=(0.0, 0.0, 0.0),
    tol=0.05,
)
print(tf.header.stamp)

Shared semantics

ConcernBehaviour
Node reuseAll helpers accept node= to share a long-lived node. Defaults to an ephemeral node created per-call.
ThreadingEach helper internally spins on a SingleThreadedExecutor for the duration of the wait. Safe to call from pytest’s default sync runner.
rclpy.initCaller must have called rclpy.init() (typical in a session-scoped fixture).
CancellationTest framework SIGINT (Ctrl-C) cleanly aborts the wait.
LoggingEach helper logs to the roboticks.assertions logger at DEBUG.

Next

Fault injection

Pair assertions with fault injection for negative tests.

Writing tests in pytest

Worked examples that put these helpers in context.

MCAP capture

Record bag files when assertions fail.

Launch testing

System-test patterns that use these helpers across processes.