Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.roboticks.io/llms.txt

Use this file to discover all available pages before exploring further.

Fault injection reference

roboticks.fault_injection ships four context managers that manipulate the rclpy DDS layer for the duration of a with block. For the task-oriented introduction with worked examples, see Testing → Fault injection; this page is the API reference.
from roboticks.fault_injection import (
    drop_messages,
    delay_messages,
    kill_node,
    corrupt_topic,
)

drop_messages

Probabilistically drop messages on a topic.

Signature

def drop_messages(
    topic: str,
    *,
    rate: float,
    seed: int | None = None,
) -> ContextManager[None]: ...
ParameterTypeConstraint
topicstrFully-qualified topic name.
ratefloat0.0 (drop nothing) to 1.0 (drop everything).
seedint | NoneOptional RNG seed for reproducible drops.

Behaviour

  • Installs an interceptor on the topic at context entry; uninstalls at exit.
  • Drops are evaluated per message, not per second. A rate of 0.5 drops roughly half the messages.
  • If multiple drop_messages blocks nest on the same topic, the product of rates applies (composition is multiplicative).
  • Out-of-range rate raises ValueError at entry.

Example

with drop_messages("/scan", rate=0.3):
    # 30% of /scan messages are dropped during this block
    run_test_body()

delay_messages

Add a fixed latency to every message on a topic.

Signature

def delay_messages(
    topic: str,
    *,
    ms: int,
    jitter_ms: int = 0,
) -> ContextManager[None]: ...
ParameterTypeConstraint
topicstrFully-qualified topic name.
msintBase latency in milliseconds. Must be ≥ 0.
jitter_msintUniform jitter added to each message (±jitter_ms / 2). Defaults to 0.

Behaviour

  • Each intercepted message is held for ms ± jitter_ms/2 before being released to subscribers.
  • Holding is implemented as a threading.Timer per message. High-rate topics with high ms will hold many in-flight messages; budget memory accordingly.
  • Order is preserved (per-topic FIFO).

Example

with delay_messages("/scan", ms=300, jitter_ms=50):
    # Each /scan message held 275-325 ms
    run_test_body()

kill_node

SIGTERM a named node and assert it stays down for the duration of the block.

Signature

def kill_node(
    name: str,
    *,
    signal: int = signal.SIGTERM,
    wait_for_exit: float = 2.0,
) -> ContextManager[NodeKilledHandle]: ...
ParameterTypeConstraint
namestrFully-qualified node name (e.g. /nav2_planner).
signalintSignal to send. Defaults to SIGTERM.
wait_for_exitfloatSeconds to wait for the process to actually exit.

Returns (as context value)

A NodeKilledHandle with:
AttributeDescription
pidPID of the killed process.
started_atTimestamp of the kill.
respawnedTrue if a watchdog/respawn brought the node back during the block.

Raises

  • LookupError — no node with that name visible on the ROS graph at entry.
  • RuntimeError — process refused to exit within wait_for_exit.

Example

with kill_node("/nav2_planner") as killed:
    # planner is down here
    time.sleep(3.0)
    assert killed.respawned, "watchdog did not bring the planner back"

corrupt_topic

Apply a mutator function to every message on a topic.

Signature

def corrupt_topic(
    topic: str,
    *,
    mutator: Callable[[Any], Any],
) -> ContextManager[None]: ...
ParameterTypeConstraint
topicstrFully-qualified topic name.
mutatorCallable[[msg], msg]Receives a deserialised message; must return one of the same type.

Behaviour

  • The mutator runs between the publisher and subscribers — every subscriber sees the mutated value.
  • Mutator exceptions propagate up to the publishing thread and abort the test. Catch inside the mutator if you want resilient behaviour.
  • The mutator may mutate in place and return the same object, or return a fresh one.

Example

from sensor_msgs.msg import LaserScan

def clamp_max(msg: LaserScan) -> LaserScan:
    msg.ranges = [min(r, 1.0) for r in msg.ranges]
    return msg

with corrupt_topic("/scan", mutator=clamp_max):
    # planner sees a LIDAR that never reads beyond 1 m
    run_test_body()

Composition

All four context managers compose with each other and with mcap_capture:
with mcap_capture(topics=["/scan", "/cmd_vel"]):
    with delay_messages("/scan", ms=200):
        with drop_messages("/scan", rate=0.05):
            run_test_body()
The order of nesting doesn’t change the observable result; the DDS-layer interceptor multiplexes per-topic state.

DDS implementation notes

  • Implemented via rclpy’s middleware plug-points — no DDS vendor extensions required.
  • Per-test isolated: the interceptor installs into the current process only; other processes in a launch_testing graph see the un-faulted topic.
  • For across-process fault injection (e.g. fault a topic between two separately-launched nodes), use launch_testing integration and inject from inside the test process that owns the topic.

Limits

LimitValueNotes
Concurrent drop/delay per topic8More than 8 nested blocks raise.
Concurrent corrupt per topic4More than 4 nested blocks raise.
Max delay per message60 sDefends against unbounded queues.
Max in-flight delayed messages per topic1024Excess is silently dropped with a logger warning.

Next

Fault injection tutorial

Task-oriented examples and decision tree.

Assertions

The natural pair with fault injection.

MCAP capture

Record the degraded signal for forensics.

Launch testing

Fault injection across a multi-process graph.