Python Quick Start
A minimal publisher and subscriber in roughly 30 lines of code each, using raw
CDR-encoded bytes on the wire.
Wire format
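The current bindings exchange CDR-encoded payloads as bytes. The
examples below use the standard CDR string encoding: a 4-byte
encapsulation header (0x00 0x01 0x00 0x00, the little-endian CDR
identifier), a little-endian uint32 length that counts the trailing NUL,
and the UTF-8 bytes followed by that NUL.
Typed marshalling driven by IDL is on the roadmap.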
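To make the byte layout concrete, here is a minimal standard-library sketch of the round trip. The helper names mirror the ones used later in this guide, but the header check and length validation are additions for illustration, not something the astutedds bindings are known to require.

```python
import struct

ENCAPSULATION_HEADER = b"\x00\x01\x00\x00"  # header bytes used throughout this guide


def encode_string(text: str) -> bytes:
    """Header + little-endian uint32 length (counting the NUL) + UTF-8 + NUL."""
    payload = text.encode("utf-8") + b"\x00"
    return ENCAPSULATION_HEADER + struct.pack("<I", len(payload)) + payload


def decode_string(raw: bytes) -> str:
    """Inverse of encode_string; raises ValueError on a malformed payload."""
    if len(raw) < 8 or raw[:4] != ENCAPSULATION_HEADER:
        raise ValueError("unexpected or missing encapsulation header")
    (length,) = struct.unpack_from("<I", raw, 4)
    if length < 1 or 8 + length > len(raw):
        raise ValueError("declared string length does not fit the payload")
    # The declared length includes the trailing NUL, so drop one byte.
    return raw[8 : 8 + length - 1].decode("utf-8")


if __name__ == "__main__":
    wire = encode_string("Hi")
    print(wire.hex(" "))  # 00 01 00 00 03 00 00 00 48 69 00
    print(decode_string(wire))  # Hi
```

The length field is 3 for the two-character string because the terminating NUL is counted.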
1. Install
pip install astutedds
See Installation for venv, source, and .deb/.rpm
options.
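If you want to confirm the package is visible to the interpreter you plan to run the examples with, a small standard-library check can help. This is only a sketch; `is_installed` is a hypothetical helper, not part of astutedds.

```python
import importlib.util


def is_installed(package: str) -> bool:
    """Return True if `package` can be found on the current import path."""
    return importlib.util.find_spec(package) is not None


if __name__ == "__main__":
    print("astutedds installed:", is_installed("astutedds"))
```

Run it with the same Python (or inside the same venv) that you used for pip.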
2. Subscriber (run first)
Save as hello_subscriber.py:
#!/usr/bin/env python3
import signal, struct, sys

import astutedds as dds


def decode_string(raw: bytes) -> str:
    # Skip the 4-byte encapsulation header, then read the uint32 length.
    length = struct.unpack_from("<I", raw, 4)[0]
    # The length counts the trailing NUL, so drop one byte before decoding.
    return raw[8 : 8 + length - 1].decode("utf-8")


def main() -> int:
    domain_id = int(sys.argv[1]) if len(sys.argv) > 1 else 0

    factory = dds.DomainParticipantFactory.get_instance()
    participant = factory.create_participant(domain_id)
    participant.enable()

    topic = participant.create_topic("HelloWorldTopic", "String")
    subscriber = participant.create_subscriber()

    qos = dds.DataReaderQos()
    qos.reliability.kind = dds.ReliabilityKind.RELIABLE
    qos.history.kind = dds.HistoryKind.KEEP_LAST
    qos.history.depth = 100
    reader = subscriber.create_datareader(topic, qos)

    waitset = dds.WaitSet()
    read_condition = dds.ReadCondition(reader)
    guard = dds.GuardCondition()
    waitset.attach(read_condition)
    waitset.attach(guard)
    # Ctrl-C triggers the guard condition, which wakes the WaitSet.
    signal.signal(signal.SIGINT, lambda *_: guard.set_trigger_value(True))

    print("Waiting for data (Ctrl-C to stop)...")
    while True:
        for cond in waitset.wait(timeout_secs=5.0) or []:
            if cond is guard:
                return 0
        for sample in reader.take():
            print(f" [SUB] {decode_string(sample.data)!r}")


if __name__ == "__main__":
    sys.exit(main())
Run it:
python hello_subscriber.py
3. Publisher
Save as hello_publisher.py:
#!/usr/bin/env python3
import struct, sys, time

import astutedds as dds


def encode_string(s: str) -> bytes:
    # UTF-8 bytes plus the trailing NUL that the length field also counts.
    payload = s.encode("utf-8") + b"\x00"
    # 4-byte encapsulation header, little-endian uint32 length, then the bytes.
    return b"\x00\x01\x00\x00" + struct.pack("<I", len(payload)) + payload


def main() -> int:
    domain_id = int(sys.argv[1]) if len(sys.argv) > 1 else 0
    num_samples = 10

    factory = dds.DomainParticipantFactory.get_instance()
    participant = factory.create_participant(domain_id)
    participant.enable()

    topic = participant.create_topic("HelloWorldTopic", "String")
    publisher = participant.create_publisher()

    qos = dds.DataWriterQos()
    qos.reliability.kind = dds.ReliabilityKind.RELIABLE
    qos.history.kind = dds.HistoryKind.KEEP_LAST
    qos.history.depth = num_samples
    writer = publisher.create_datawriter(topic, qos)

    time.sleep(1.0)  # allow SPDP/SEDP discovery to match endpoints

    for i in range(1, num_samples + 1):
        msg = f"Hello, World! [{i}/{num_samples}]"
        writer.write(encode_string(msg))
        print(f" [PUB] {msg!r}")
        time.sleep(0.5)

    participant.stop()
    factory.delete_participant(participant)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Run it in a second terminal:
python hello_publisher.py
4. Expected output
Subscriber:
Waiting for data (Ctrl-C to stop)...
[SUB] 'Hello, World! [1/10]'
[SUB] 'Hello, World! [2/10]'
...
Publisher:
[PUB] 'Hello, World! [1/10]'
[PUB] 'Hello, World! [2/10]'
...
What just happened?
- Discovery: participant.enable() starts SPDP. Once both sides see each other (~1 s), SEDP exchanges endpoint descriptions and the reader and writer match.
- QoS matching: both sides use RELIABLE + KEEP_LAST, so reliable delivery kicks in automatically.
- Wire format: writer.write(bytes) hands a CDR-encoded payload to the RTPS wire; reader.take() returns ReceivedSample objects whose .data is the raw CDR payload.
- WaitSet: the subscriber is event-driven. WaitSet.wait() blocks until either a sample arrives (via ReadCondition) or the user hits Ctrl-C (via GuardCondition).
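The KEEP_LAST history setting, together with its depth, bounds how many samples an endpoint retains. As a rough mental model only (this is plain Python, not astutedds code, and real DDS history is kept per instance), a bounded deque behaves similarly. `KeepLastHistory` is a hypothetical name used for illustration.

```python
from collections import deque


class KeepLastHistory:
    """Toy model of KEEP_LAST history: retain only the newest `depth` samples."""

    def __init__(self, depth: int) -> None:
        if depth < 1:
            raise ValueError("depth must be at least 1")
        self._samples = deque(maxlen=depth)

    def add(self, sample: bytes) -> None:
        # When the deque is full, appending silently drops the oldest entry.
        self._samples.append(sample)

    def take(self) -> list:
        """Return all retained samples, oldest first, and clear the history."""
        taken = list(self._samples)
        self._samples.clear()
        return taken


history = KeepLastHistory(depth=3)
for i in range(1, 6):
    history.add(f"sample {i}".encode())
print(history.take())  # [b'sample 3', b'sample 4', b'sample 5']
```

With depth 100 on the reader and depth num_samples on the writer, the ten messages sent in this guide fit within both histories.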
Next steps
- Examples: Hello World and a sensor publisher/subscriber with multi-field CDR encoding.
- API Surface: what's exported from the astutedds package.
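As a preview of what multi-field encoding can look like, the sketch below extends the string encoding from this guide to a small record. The field layout (uint32 sensor_id, float32 value, string unit) and the function names are assumptions made for illustration; the sensor example on the Examples page may use a different layout.

```python
import struct

HEADER = b"\x00\x01\x00\x00"  # same encapsulation header as encode_string in this guide
FIXED = "<IfI"  # sensor_id, value, string length: three little-endian 4-byte fields


def encode_reading(sensor_id: int, value: float, unit: str) -> bytes:
    """Hypothetical layout: uint32 sensor_id, float32 value, string unit."""
    text = unit.encode("utf-8") + b"\x00"
    # Each fixed field is 4 bytes wide, so every one already sits on a
    # 4-byte boundary and no padding is needed in this particular layout.
    return HEADER + struct.pack(FIXED, sensor_id, value, len(text)) + text


def decode_reading(raw: bytes) -> tuple:
    """Inverse of encode_reading; returns (sensor_id, value, unit)."""
    if raw[:4] != HEADER:
        raise ValueError("unexpected encapsulation header")
    sensor_id, value, length = struct.unpack_from(FIXED, raw, 4)
    start = 4 + struct.calcsize(FIXED)  # header + three 4-byte fields = 16
    unit = raw[start : start + length - 1].decode("utf-8")
    return sensor_id, value, unit


if __name__ == "__main__":
    wire = encode_reading(7, 21.5, "C")
    print(len(wire))  # 18
    print(decode_reading(wire))  # (7, 21.5, 'C')
```

Fields wider than 4 bytes, or fields that follow a variable-length string, generally need padding to their alignment boundary. This sketch avoids that by construction, so treat it as a starting point rather than a general encoder.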