Python Examples
Worked Python examples live in examples/python/ in the source tree. Each
example is a standalone script that interoperates with any other DDS
implementation running on the same domain.
Hello World
A reliable string publisher / subscriber pair on topic HelloWorldTopic.
Covered in full in the Quick Start.
| File | Role |
|---|---|
| examples/python/hello_world/hello_publisher.py | Publishes 10 string samples, then exits. |
| examples/python/hello_world/hello_subscriber.py | WaitSet-driven receive loop with Ctrl-C shutdown. |
Run it:
# Terminal A
python examples/python/hello_world/hello_subscriber.py
# Terminal B
python examples/python/hello_world/hello_publisher.py
Sensor (multi-field CDR)
A Python port of examples/cxx/sensor/. Publishes a SensorReading
struct at 2 Hz on topic TemperatureSensor.
| File | Role |
|---|---|
| examples/python/sensor/sensor_publisher.py | Streams synthetic temperature + sine + triangle channels. |
| examples/python/sensor/sensor_subscriber.py | Decodes and prints every sample. |
The wire layout (SensorReading):
[0x00 0x01 0x00 0x00] XCDR1 little-endian encapsulation header
[uint32 LE] sensor_id
[float32 LE] temperature_c
[float32 LE] value_sine
[float32 LE] value_triangle
[uint32 LE] sequence
Encode in Python:
import struct

def encode_sensor_reading(sensor_id, temperature_c, value_sine, value_triangle, sequence):
    # 4-byte XCDR1 little-endian encapsulation header, then the five fields.
    header = b"\x00\x01\x00\x00"
    fields = struct.pack(
        "<I f f f I",
        sensor_id, temperature_c, value_sine, value_triangle, sequence,
    )
    return header + fields
Decode on the receive side:
import struct

def decode_sensor_reading(raw: bytes):
    # Skip the 4-byte encapsulation header, then unpack the five little-endian fields.
    sensor_id, temperature_c, value_sine, value_triangle, sequence = struct.unpack_from(
        "<I f f f I", raw, 4
    )
    return sensor_id, temperature_c, value_sine, value_triangle, sequence
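The layout can be sanity-checked without any DDS traffic. The sketch below repeats the encode and decode helpers so that it runs on its own, and adds a hypothetical `check_encapsulation` guard (not part of the example scripts) that rejects payloads that are too short or that do not start with the little-endian XCDR1 representation identifier.

```python
import struct

CDR_LE_HEADER = b"\x00\x01\x00\x00"  # XCDR1 little-endian, options bytes zero
BODY_FORMAT = "<IfffI"               # same layout as "<I f f f I"


def encode_sensor_reading(sensor_id, temperature_c, value_sine, value_triangle, sequence):
    body = struct.pack(
        BODY_FORMAT, sensor_id, temperature_c, value_sine, value_triangle, sequence
    )
    return CDR_LE_HEADER + body


def check_encapsulation(raw: bytes) -> None:
    """Hypothetical guard: raise ValueError unless raw looks like a CDR_LE SensorReading."""
    expected = len(CDR_LE_HEADER) + struct.calcsize(BODY_FORMAT)
    if len(raw) < expected:
        raise ValueError(f"payload too short: {len(raw)} < {expected} bytes")
    # Only the first two bytes identify the representation; the options bytes are ignored.
    if raw[:2] != CDR_LE_HEADER[:2]:
        raise ValueError(f"unexpected representation identifier: {raw[:2].hex()}")


def decode_sensor_reading(raw: bytes):
    check_encapsulation(raw)
    return struct.unpack_from(BODY_FORMAT, raw, len(CDR_LE_HEADER))


# Values chosen to be exactly representable as float32, so the round trip is lossless.
payload = encode_sensor_reading(7, 21.5, 0.5, -0.25, 1234)
decoded = decode_sensor_reading(payload)
print(len(payload), decoded)
```

Arbitrary floats will generally come back rounded to float32 precision, so compare them with a tolerance rather than with `==`.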
Run it:
# Terminal A
python examples/python/sensor/sensor_subscriber.py
# Terminal B
python examples/python/sensor/sensor_publisher.py
QoS — overriding common policies
The QoS classes are plain Python objects with read/write attributes. Mutate
them, then pass to the relevant create_* call:
import astutedds as dds

# DataWriter QoS: reliable, transient-local, keep-last 100, 50 ms latency budget.
wqos = dds.DataWriterQos()
wqos.reliability.kind = dds.ReliabilityKind.RELIABLE
wqos.history.kind = dds.HistoryKind.KEEP_LAST
wqos.history.depth = 100
wqos.latency_budget.duration = dds.Duration.from_seconds(0.05)
wqos.durability.kind = dds.DurabilityKind.TRANSIENT_LOCAL

# DataReader QoS: match the writer for compatibility.
rqos = dds.DataReaderQos()
rqos.reliability.kind = dds.ReliabilityKind.RELIABLE
rqos.history.kind = dds.HistoryKind.KEEP_LAST
rqos.history.depth = 100
# The DDS spec requires offered latency budget <= requested, so request the same 50 ms.
rqos.latency_budget.duration = dds.Duration.from_seconds(0.05)
rqos.durability.kind = dds.DurabilityKind.TRANSIENT_LOCAL
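"Compatible" here follows the requested/offered rules of the DDS specification: the writer must offer at least the reliability and durability the reader requests, and a latency budget no larger than the reader's. The sketch below models those three rules in plain Python to show which combinations match. `EndpointQos` and `incompatible_policies` are hypothetical names for illustration only, not part of the astutedds bindings, and how the bindings report a mismatch is not covered here.

```python
from dataclasses import dataclass
from enum import IntEnum


class Reliability(IntEnum):
    BEST_EFFORT = 0
    RELIABLE = 1


class Durability(IntEnum):
    VOLATILE = 0
    TRANSIENT_LOCAL = 1
    TRANSIENT = 2
    PERSISTENT = 3


@dataclass
class EndpointQos:
    """Hypothetical stand-in for the subset of QoS that takes part in matching."""
    reliability: Reliability
    durability: Durability
    latency_budget_s: float = 0.0  # the DDS default latency budget is zero


def incompatible_policies(offered: EndpointQos, requested: EndpointQos) -> list:
    """Return the names of policies where the writer's offer fails the reader's request."""
    problems = []
    if offered.reliability < requested.reliability:
        problems.append("reliability")
    if offered.durability < requested.durability:
        problems.append("durability")
    if offered.latency_budget_s > requested.latency_budget_s:
        problems.append("latency_budget")
    return problems


writer = EndpointQos(Reliability.RELIABLE, Durability.TRANSIENT_LOCAL, 0.05)
matched_reader = EndpointQos(Reliability.RELIABLE, Durability.TRANSIENT_LOCAL, 0.05)
default_budget_reader = EndpointQos(Reliability.RELIABLE, Durability.TRANSIENT_LOCAL)

print(incompatible_policies(writer, matched_reader))         # → []
print(incompatible_policies(writer, default_budget_reader))  # → ['latency_budget']
```

History kind and depth are local to each endpoint and do not take part in matching, which is why they are absent from the check.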
WaitSet vs polling
For most real applications, prefer the event-driven WaitSet over polling. The Hello World subscriber demonstrates the canonical pattern:
waitset = dds.WaitSet()
read_condition = dds.ReadCondition(reader)
guard = dds.GuardCondition()
waitset.attach(read_condition)
waitset.attach(guard)

running = True
while running:
    triggered = waitset.wait(timeout_secs=5.0)
    for cond in triggered or []:
        if cond is guard:
            # Shutdown requested: leave the loop after this wake-up.
            running = False
            break
        if cond is read_condition:
            for sample in reader.take():
                handle(sample.data)
A GuardCondition is the idiomatic way to wake the WaitSet from a signal
handler or worker thread.
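The wake-up pattern can be tried without DDS. The sketch below is a stdlib-only analogy and does not use the astutedds API: `threading.Event` stands in for the GuardCondition, `Event.wait(timeout)` stands in for `waitset.wait`, and a timer thread plays the role of the signal handler. `receive_loop` is a hypothetical name.

```python
import threading
import time


def receive_loop(stop_event: threading.Event, timeout_secs: float = 5.0) -> int:
    """Block until stop_event is set; return how many timeouts elapsed first."""
    timeouts = 0
    while True:
        # wait() returns True as soon as another thread calls set(),
        # or False once timeout_secs passes with no wake-up.
        if stop_event.wait(timeout=timeout_secs):
            return timeouts
        timeouts += 1


stop_event = threading.Event()

# A timer thread triggers the "guard" 50 ms from now.
threading.Timer(0.05, stop_event.set).start()

started = time.monotonic()
timeouts_seen = receive_loop(stop_event)
elapsed = time.monotonic() - started
print(f"woken after {elapsed:.2f}s with {timeouts_seen} timeouts")
```

The loop returns well before the 5-second timeout, which is the point of the guard: shutdown latency no longer depends on how long the wait timeout is.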
Listeners and per-sample callbacks
The listener trampoline classes (DataReaderListener, DataWriterListener,
SubscriberListener, PublisherListener, TopicListener,
DomainParticipantListener) can be subclassed in Python. For the common
"data arrived" path, a DataReader also exposes a lightweight per-sample
callback via set_data_callback:
reader = subscriber.create_datareader(topic, qos)

def on_sample(sample):
    print(f"got {len(sample.data)} bytes, valid={sample.info.valid_data}")

reader.set_data_callback(on_sample)
For structured event handling, subclass a listener and route the events yourself:
class MyReaderListener(dds.DataReaderListener):
    def on_data_available(self, reader):
        for sample in reader.take():
            print(f"got {len(sample.data)} bytes")

    def on_subscription_matched(self, reader, status):
        print(f"matched writers: total={status.total_count}, "
              f"current={status.current_count}")

listener = MyReaderListener()
reader.set_data_callback(lambda sample: listener.on_data_available(reader))
Note
Attaching listeners directly through create_datareader(..., listener)
is not yet exposed in the bindings — use set_data_callback (or the
WaitSet pattern above) to drive your application loop today.
Multi-domain isolation
Each DomainParticipant is bound to a numeric DDS domain ID. Participants on
different IDs never see each other, so pass the same ID to both scripts when
they should communicate:
python hello_subscriber.py 42 # domain 42 only
python hello_publisher.py 42 # publishes to domain 42 only
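One reason domains stay isolated is that the RTPS specification derives the UDP ports from the domain ID, so participants on different domains listen on different ports. The sketch below computes the well-known ports using the specification's default parameters (PB=7400, DG=250, PG=2, d0=0, d1=10, d2=1, d3=11). It assumes the implementation keeps those defaults; `rtps_default_ports` is a hypothetical helper, not part of the bindings.

```python
# RTPS default port-mapping parameters.
PB, DG, PG = 7400, 250, 2       # port base, domain ID gain, participant ID gain
D0, D1, D2, D3 = 0, 10, 1, 11   # offsets for the four port kinds


def rtps_default_ports(domain_id: int, participant_id: int = 0) -> dict:
    """Return the default RTPS UDP ports for a domain and participant index."""
    if domain_id < 0 or participant_id < 0:
        raise ValueError("domain_id and participant_id must be non-negative")
    base = PB + DG * domain_id
    ports = {
        "discovery_multicast": base + D0,
        "discovery_unicast": base + D1 + PG * participant_id,
        "user_multicast": base + D2,
        "user_unicast": base + D3 + PG * participant_id,
    }
    if max(ports.values()) > 65535:
        raise ValueError(f"domain {domain_id} maps outside the UDP port range")
    return ports


print(rtps_default_ports(0))
# → {'discovery_multicast': 7400, 'discovery_unicast': 7410, 'user_multicast': 7401, 'user_unicast': 7411}
print(rtps_default_ports(42))
# → {'discovery_multicast': 17900, 'discovery_unicast': 17910, 'user_multicast': 17901, 'user_unicast': 17911}
```

Because each domain advances the base port by 250, the port sets for domain 0 and domain 42 do not overlap, and very large domain IDs are rejected because they would exceed port 65535.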
Interop with other DDS vendors
The Python bindings are wire-interoperable with any RTPS 2.5 implementation that respects XCDR1/XCDR2 encapsulation, including RTI Connext, eProsima Fast DDS, Eclipse Cyclone DDS, and OpenDDS. Match the topic name and type name on both sides, and use compatible QoS settings.