Python Examples

Worked Python examples live in examples/python/ in the source tree. Each example is a standalone script that runs against any other DDS implementation on the same domain.

Hello World

A reliable string publisher / subscriber pair on topic HelloWorldTopic. Covered in full in the Quick Start.

File Role
examples/python/hello_world/hello_publisher.py Publishes 10 string samples, then exits.
examples/python/hello_world/hello_subscriber.py WaitSet-driven receive loop with Ctrl-C shutdown.

Run it:

# Terminal A
python examples/python/hello_world/hello_subscriber.py

# Terminal B
python examples/python/hello_world/hello_publisher.py

Sensor (multi-field CDR)

A Python port of examples/cxx/sensor/. Publishes a SensorReading struct at 2 Hz on topic TemperatureSensor.

File Role
examples/python/sensor/sensor_publisher.py Streams synthetic temperature + sine + triangle channels.
examples/python/sensor/sensor_subscriber.py Decodes and prints every sample.

The wire layout (SensorReading):

[0x00 0x01 0x00 0x00]   XCDR1 little-endian encapsulation header
[uint32 LE]             sensor_id
[float32 LE]            temperature_c
[float32 LE]            value_sine
[float32 LE]            value_triangle
[uint32 LE]             sequence

Encode in Python:

import struct

def encode_sensor_reading(sensor_id, temperature_c, value_sine, value_triangle, sequence):
    header = b"\x00\x01\x00\x00"
    fields = struct.pack(
        "<I f f f I",
        sensor_id, temperature_c, value_sine, value_triangle, sequence,
    )
    return header + fields

Decode on the receive side:

import struct

def decode_sensor_reading(raw: bytes):
    sensor_id, temperature_c, value_sine, value_triangle, sequence = struct.unpack_from(
        "<I f f f I", raw, 4
    )
    return sensor_id, temperature_c, value_sine, value_triangle, sequence

Run it:

# Terminal A
python examples/python/sensor/sensor_subscriber.py

# Terminal B
python examples/python/sensor/sensor_publisher.py

QoS — overriding common policies

The QoS classes are plain Python objects with read/write attributes. Mutate them, then pass to the relevant create_* call:

import astutedds as dds

# DataWriter QoS — reliable, keep-last 100, 50 ms latency budget.
wqos = dds.DataWriterQos()
wqos.reliability.kind        = dds.ReliabilityKind.RELIABLE
wqos.history.kind            = dds.HistoryKind.KEEP_LAST
wqos.history.depth           = 100
wqos.latency_budget.duration = dds.Duration.from_seconds(0.05)
wqos.durability.kind         = dds.DurabilityKind.TRANSIENT_LOCAL

# DataReader QoS — match the writer for compatibility.
rqos = dds.DataReaderQos()
rqos.reliability.kind = dds.ReliabilityKind.RELIABLE
rqos.history.kind     = dds.HistoryKind.KEEP_LAST
rqos.history.depth    = 100
rqos.durability.kind  = dds.DurabilityKind.TRANSIENT_LOCAL

WaitSet vs polling

For most real applications prefer the event-driven WaitSet over polling. The Hello World subscriber demonstrates the canonical pattern:

waitset        = dds.WaitSet()
read_condition = dds.ReadCondition(reader)
guard          = dds.GuardCondition()
waitset.attach(read_condition)
waitset.attach(guard)

while running:
    triggered = waitset.wait(timeout_secs=5.0)
    for cond in triggered or []:
        if cond is guard:
            running = False
            break
        if cond is read_condition:
            for sample in reader.take():
                handle(sample.data)

A GuardCondition is the idiomatic way to wake the WaitSet from a signal handler or worker thread.

Listeners and per-sample callbacks

The listener trampoline classes (DataReaderListener, DataWriterListener, SubscriberListener, PublisherListener, TopicListener, DomainParticipantListener) can be subclassed in Python. For the common "data arrived" path, a DataReader also exposes a lightweight per-sample callback via set_data_callback:

reader = subscriber.create_datareader(topic, qos)

def on_sample(sample):
    print(f"got {len(sample.data)} bytes, valid={sample.info.valid_data}")

reader.set_data_callback(on_sample)

For structured event handling, subclass a listener and route the events yourself:

class MyReaderListener(dds.DataReaderListener):
    def on_data_available(self, reader):
        for sample in reader.take():
            print(f"got {len(sample.data)} bytes")

    def on_subscription_matched(self, reader, status):
        print(f"matched writers: total={status.total_count}, "
              f"current={status.current_count}")

listener = MyReaderListener()
reader.set_data_callback(lambda sample: listener.on_data_available(reader))

Note

Attaching listeners directly through create_datareader(..., listener) is not yet exposed in the bindings — use set_data_callback (or the WaitSet pattern above) to drive your application loop today.

Multi-domain isolation

Each DomainParticipant is bound to a numeric DDS domain ID. Two scripts on different IDs never see each other:

python hello_subscriber.py 42   # domain 42 only
python hello_publisher.py  42   # publishes to domain 42 only

Interop with other DDS vendors

The Python bindings are wire-interoperable with any RTPS 2.5 implementation that respects XCDR1/XCDR2 encapsulation — including RTI Connext, eProsima Fast DDS, Eclipse Cyclone DDS, and OpenDDS. Just match the topic name, type name, and QoS profile on both sides.