Python Quick Start

A minimal publisher and subscriber in under 30 lines each, using raw CDR-encoded bytes on the wire.

Wire format

The current bindings exchange CDR-encoded payloads as bytes. The examples below show the standard XCDR2 string encapsulation (0x00 0x01 0x00 0x00 header + uint32 length + UTF-8 bytes). Typed marshalling driven by IDL is on the roadmap.

1. Install

pip install astutedds

See Installation for venv, source, and .deb/.rpm options.

2. Subscriber (run first)

Save as hello_subscriber.py:

#!/usr/bin/env python3
import signal, struct, sys
import astutedds as dds

def decode_string(raw: bytes) -> str:
    length = struct.unpack_from("<I", raw, 4)[0]
    return raw[8 : 8 + length - 1].decode("utf-8")

def main() -> int:
    domain_id = int(sys.argv[1]) if len(sys.argv) > 1 else 0

    factory     = dds.DomainParticipantFactory.get_instance()
    participant = factory.create_participant(domain_id)
    participant.enable()

    topic      = participant.create_topic("HelloWorldTopic", "String")
    subscriber = participant.create_subscriber()

    qos = dds.DataReaderQos()
    qos.reliability.kind = dds.ReliabilityKind.RELIABLE
    qos.history.kind     = dds.HistoryKind.KEEP_LAST
    qos.history.depth    = 100
    reader = subscriber.create_datareader(topic, qos)

    waitset        = dds.WaitSet()
    read_condition = dds.ReadCondition(reader)
    guard          = dds.GuardCondition()
    waitset.attach(read_condition)
    waitset.attach(guard)

    signal.signal(signal.SIGINT, lambda *_: guard.set_trigger_value(True))

    print("Waiting for data (Ctrl-C to stop)...")
    while True:
        for cond in waitset.wait(timeout_secs=5.0) or []:
            if cond is guard:
                return 0
            for sample in reader.take():
                print(f"  [SUB] {decode_string(sample.data)!r}")

if __name__ == "__main__":
    sys.exit(main())

Run it:

python hello_subscriber.py

3. Publisher

Save as hello_publisher.py:

#!/usr/bin/env python3
import struct, sys, time
import astutedds as dds

def encode_string(s: str) -> bytes:
    payload = s.encode("utf-8") + b"\x00"
    return b"\x00\x01\x00\x00" + struct.pack("<I", len(payload)) + payload

def main() -> int:
    domain_id   = int(sys.argv[1]) if len(sys.argv) > 1 else 0
    num_samples = 10

    factory     = dds.DomainParticipantFactory.get_instance()
    participant = factory.create_participant(domain_id)
    participant.enable()

    topic     = participant.create_topic("HelloWorldTopic", "String")
    publisher = participant.create_publisher()

    qos = dds.DataWriterQos()
    qos.reliability.kind = dds.ReliabilityKind.RELIABLE
    qos.history.kind     = dds.HistoryKind.KEEP_LAST
    qos.history.depth    = num_samples
    writer = publisher.create_datawriter(topic, qos)

    time.sleep(1.0)   # allow SPDP/SEDP discovery to match endpoints
    for i in range(1, num_samples + 1):
        msg = f"Hello, World! [{i}/{num_samples}]"
        writer.write(encode_string(msg))
        print(f"  [PUB] {msg!r}")
        time.sleep(0.5)

    participant.stop()
    factory.delete_participant(participant)
    return 0

if __name__ == "__main__":
    sys.exit(main())

Run it in a second terminal:

python hello_publisher.py

4. Expected output

Subscriber:

Waiting for data (Ctrl-C to stop)...
  [SUB] 'Hello, World! [1/10]'
  [SUB] 'Hello, World! [2/10]'
  ...

Publisher:

  [PUB] 'Hello, World! [1/10]'
  [PUB] 'Hello, World! [2/10]'
  ...

What just happened?

  1. Discoveryparticipant.enable() starts SPDP. Once both sides see each other (~1 s), SEDP exchanges endpoint descriptions and the reader and writer match.
  2. QoS matching — both sides use RELIABLE + KEEP_LAST so reliable delivery kicks in automatically.
  3. Wire formatwriter.write(bytes) hands a CDR-encoded payload to the RTPS wire; reader.take() returns ReceivedSample objects whose .data is the raw CDR payload.
  4. WaitSet — the subscriber is event-driven: WaitSet.wait() blocks until either a sample arrives (via ReadCondition) or the user hits Ctrl-C (via GuardCondition).

Next steps

  • Examples — Hello World and a sensor publisher/subscriber with multi-field CDR encoding.
  • API Surface — what's exported from the astutedds package.