File endpoint_proxy.hpp

File List > astutedds > rtps > endpoint_proxy.hpp

Go to the documentation of this file


#ifndef ASTUTEDDS_RTPS_ENDPOINT_PROXY_HPP
#define ASTUTEDDS_RTPS_ENDPOINT_PROXY_HPP

#include <astutedds/rtps/rtps_types.hpp>
#include <astutedds/rtps/type_information_registry.hpp>
#include <astutedds/dcps/qos.hpp>
#include <string>
#include <vector>
#include <cstring>

namespace astutedds::rtps {

// PID constants for SEDP parameter list serialization
namespace PID {
    static constexpr uint16_t PAD = 0x0000;
    static constexpr uint16_t SENTINEL = 0x0001;
    static constexpr uint16_t TOPIC_NAME = 0x0005;
    static constexpr uint16_t TYPE_NAME = 0x0007;
    static constexpr uint16_t DURABILITY = 0x001d;
    static constexpr uint16_t DEADLINE = 0x0023;
    static constexpr uint16_t LATENCY_BUDGET = 0x0027;
    static constexpr uint16_t LIVELINESS = 0x001b;
    static constexpr uint16_t RELIABILITY = 0x001a;
    static constexpr uint16_t OWNERSHIP = 0x001f;
    static constexpr uint16_t OWNERSHIP_STRENGTH = 0x0006;
    static constexpr uint16_t DESTINATION_ORDER = 0x0025;
    static constexpr uint16_t USER_DATA = 0x002c;
    static constexpr uint16_t TIME_BASED_FILTER = 0x0004;
    static constexpr uint16_t PRESENTATION = 0x0021;
    static constexpr uint16_t PARTITION = 0x0029;
    static constexpr uint16_t TOPIC_DATA = 0x002e;
    static constexpr uint16_t GROUP_DATA = 0x002d;
    static constexpr uint16_t HISTORY = 0x0040;
    static constexpr uint16_t RESOURCE_LIMITS = 0x0041;
    static constexpr uint16_t ENDPOINT_GUID = 0x005a;
    static constexpr uint16_t PARTICIPANT_GUID = 0x0050;
    static constexpr uint16_t UNICAST_LOCATOR = 0x002f;
    static constexpr uint16_t MULTICAST_LOCATOR = 0x0030;
    static constexpr uint16_t DEFAULT_UNICAST_LOCATOR = 0x0031;
    static constexpr uint16_t DEFAULT_MULTICAST_LOCATOR = 0x0048;
    static constexpr uint16_t METATRAFFIC_UNICAST_LOCATOR = 0x0032;
    static constexpr uint16_t METATRAFFIC_MULTICAST_LOCATOR = 0x0033;
    static constexpr uint16_t PROTOCOL_VERSION = 0x0015;
    static constexpr uint16_t VENDOR_ID = 0x0016;
    static constexpr uint16_t EXPECTS_INLINE_QOS = 0x0043;
    static constexpr uint16_t CONTENT_FILTER_PROPERTY = 0x0035;
    static constexpr uint16_t DATA_REPRESENTATION = 0x0073;
    static constexpr uint16_t TYPE_CONSISTENCY = 0x0074;
    static constexpr uint16_t TYPE_INFORMATION = 0x0075;
}

inline size_t align4(size_t offset) {
    return (offset + 3) & ~static_cast<size_t>(3);
}

class ParameterListBuilder {
public:
    ParameterListBuilder() {
        // CDR encapsulation header: PL_CDR_LE (0x0003)
        buffer_.push_back(0x00);
        buffer_.push_back(0x03);
        buffer_.push_back(0x00);  // Options
        buffer_.push_back(0x00);
    }

    void add_guid(uint16_t pid, const GUID_t& guid) {
        add_pid_header(pid, 16);
        for (int i = 0; i < 12; ++i)
            buffer_.push_back(guid.guidPrefix.value[i]);
        buffer_.push_back(guid.entityId.entityKey[0]);
        buffer_.push_back(guid.entityId.entityKey[1]);
        buffer_.push_back(guid.entityId.entityKey[2]);
        buffer_.push_back(guid.entityId.entityKind);
    }

    void add_string(uint16_t pid, const std::string& str) {
        uint32_t str_len = static_cast<uint32_t>(str.size() + 1);  // Include null terminator
        // Be liberal in what we send: use a 4-byte aligned parameterLength.
        // Some DDS stacks treat parameterLength as excluding padding (and then align), others
        // treat it as including padding. If we always use an aligned length, both styles work.
        const uint16_t unpadded = static_cast<uint16_t>(4 + str_len); // 4 for the CDR string length field
        const uint16_t len = static_cast<uint16_t>((unpadded + 3u) & ~3u);

        add_pid_header(pid, len);

        // String length (4 bytes, little-endian)
        buffer_.push_back(str_len & 0xFF);
        buffer_.push_back((str_len >> 8) & 0xFF);
        buffer_.push_back((str_len >> 16) & 0xFF);
        buffer_.push_back((str_len >> 24) & 0xFF);

        // String data + null terminator
        buffer_.insert(buffer_.end(), str.begin(), str.end());
        buffer_.push_back(0);

        // Padding to 4-byte boundary
        while ((buffer_.size() % 4) != 0)
            buffer_.push_back(0);
    }

    void add_locator(uint16_t pid, const Locator_t& loc) {
        add_pid_header(pid, 24);

        // Kind (4 bytes)
        uint32_t kind = static_cast<uint32_t>(loc.kind);
        buffer_.push_back(kind & 0xFF);
        buffer_.push_back((kind >> 8) & 0xFF);
        buffer_.push_back((kind >> 16) & 0xFF);
        buffer_.push_back((kind >> 24) & 0xFF);

        // Port (4 bytes)
        buffer_.push_back(loc.port & 0xFF);
        buffer_.push_back((loc.port >> 8) & 0xFF);
        buffer_.push_back((loc.port >> 16) & 0xFF);
        buffer_.push_back((loc.port >> 24) & 0xFF);

        // Address (16 bytes)
        buffer_.insert(buffer_.end(), loc.address.begin(), loc.address.end());
    }

    void add_unicast_locator_ipv4(uint16_t pid, const std::array<uint8_t, 4>& ip, uint32_t port) {
        Locator_t loc;
        loc.kind = LocatorKind_t::LOCATOR_KIND_UDPv4;
        loc.port = port;
        // IPv4-mapped IPv6 format: first 12 bytes are 0, last 4 are IPv4
        std::fill(loc.address.begin(), loc.address.begin() + 12, 0);
        loc.address[12] = ip[0];
        loc.address[13] = ip[1];
        loc.address[14] = ip[2];
        loc.address[15] = ip[3];
        add_locator(pid, loc);
    }

    void add_reliability(dcps::ReliabilityQosPolicyKind kind,
                        const Duration_t& max_blocking_time = Duration_t{0, 100000000}) {
        add_pid_header(PID::RELIABILITY, 12);

        // Reliability kind: 1=BEST_EFFORT, 2=RELIABLE (DDS wire format differs from enum)
        uint32_t wire_kind = (kind == dcps::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS) ? 1 : 2;
        buffer_.push_back(wire_kind & 0xFF);
        buffer_.push_back((wire_kind >> 8) & 0xFF);
        buffer_.push_back((wire_kind >> 16) & 0xFF);
        buffer_.push_back((wire_kind >> 24) & 0xFF);

        // max_blocking_time (seconds)
        int32_t sec = max_blocking_time.seconds;
        buffer_.push_back(sec & 0xFF);
        buffer_.push_back((sec >> 8) & 0xFF);
        buffer_.push_back((sec >> 16) & 0xFF);
        buffer_.push_back((sec >> 24) & 0xFF);

        // max_blocking_time (nanoseconds)
        uint32_t nsec = max_blocking_time.fraction;
        buffer_.push_back(nsec & 0xFF);
        buffer_.push_back((nsec >> 8) & 0xFF);
        buffer_.push_back((nsec >> 16) & 0xFF);
        buffer_.push_back((nsec >> 24) & 0xFF);
    }

    void add_durability(dcps::DurabilityQosPolicyKind kind) {
        add_pid_header(PID::DURABILITY, 4);

        uint32_t wire_kind = static_cast<uint32_t>(kind);
        buffer_.push_back(wire_kind & 0xFF);
        buffer_.push_back((wire_kind >> 8) & 0xFF);
        buffer_.push_back((wire_kind >> 16) & 0xFF);
        buffer_.push_back((wire_kind >> 24) & 0xFF);
    }

    void add_ownership(dcps::OwnershipQosPolicyKind kind) {
        add_pid_header(PID::OWNERSHIP, 4);

        uint32_t wire_kind = static_cast<uint32_t>(kind);
        buffer_.push_back(wire_kind & 0xFF);
        buffer_.push_back((wire_kind >> 8) & 0xFF);
        buffer_.push_back((wire_kind >> 16) & 0xFF);
        buffer_.push_back((wire_kind >> 24) & 0xFF);
    }

    void add_ownership_strength(int32_t value) {
        add_pid_header(PID::OWNERSHIP_STRENGTH, 4);

        uint32_t v = static_cast<uint32_t>(value);
        buffer_.push_back(v & 0xFF);
        buffer_.push_back((v >> 8) & 0xFF);
        buffer_.push_back((v >> 16) & 0xFF);
        buffer_.push_back((v >> 24) & 0xFF);
    }

    void add_history(dcps::HistoryQosPolicyKind kind, int32_t depth) {
        add_pid_header(PID::HISTORY, 8);

        uint32_t wire_kind = static_cast<uint32_t>(kind);
        buffer_.push_back(wire_kind & 0xFF);
        buffer_.push_back((wire_kind >> 8) & 0xFF);
        buffer_.push_back((wire_kind >> 16) & 0xFF);
        buffer_.push_back((wire_kind >> 24) & 0xFF);

        buffer_.push_back(depth & 0xFF);
        buffer_.push_back((depth >> 8) & 0xFF);
        buffer_.push_back((depth >> 16) & 0xFF);
        buffer_.push_back((depth >> 24) & 0xFF);
    }

    void add_deadline(const Duration_t& period) {
        add_pid_header(PID::DEADLINE, 8);

        int32_t sec = period.seconds;
        buffer_.push_back(sec & 0xFF);
        buffer_.push_back((sec >> 8) & 0xFF);
        buffer_.push_back((sec >> 16) & 0xFF);
        buffer_.push_back((sec >> 24) & 0xFF);

        // DDS wire format uses nanoseconds for Duration_t sub-second field.
        // For DURATION_INFINITE (seconds=0x7FFFFFFF), send 0xFFFFFFFF per spec.
        uint32_t nsec = (sec >= 0x7FFFFFFF) ? 0xFFFFFFFF : period.nanosec;
        buffer_.push_back(nsec & 0xFF);
        buffer_.push_back((nsec >> 8) & 0xFF);
        buffer_.push_back((nsec >> 16) & 0xFF);
        buffer_.push_back((nsec >> 24) & 0xFF);
    }

    void add_liveliness(dcps::LivelinessQosPolicyKind kind, const Duration_t& lease_duration) {
        add_pid_header(PID::LIVELINESS, 12);

        uint32_t wire_kind = static_cast<uint32_t>(kind);
        buffer_.push_back(wire_kind & 0xFF);
        buffer_.push_back((wire_kind >> 8) & 0xFF);
        buffer_.push_back((wire_kind >> 16) & 0xFF);
        buffer_.push_back((wire_kind >> 24) & 0xFF);

        int32_t sec = lease_duration.seconds;
        buffer_.push_back(sec & 0xFF);
        buffer_.push_back((sec >> 8) & 0xFF);
        buffer_.push_back((sec >> 16) & 0xFF);
        buffer_.push_back((sec >> 24) & 0xFF);

        // DDS wire format uses nanoseconds for Duration_t sub-second field.
        // For DURATION_INFINITE (seconds=0x7FFFFFFF), send 0xFFFFFFFF per spec.
        uint32_t nsec = (lease_duration.seconds >= 0x7FFFFFFF) ? 0xFFFFFFFF : lease_duration.nanosec;
        buffer_.push_back(nsec & 0xFF);
        buffer_.push_back((nsec >> 8) & 0xFF);
        buffer_.push_back((nsec >> 16) & 0xFF);
        buffer_.push_back((nsec >> 24) & 0xFF);
    }

    void add_destination_order(dcps::DestinationOrderQosPolicyKind kind) {
        add_pid_header(PID::DESTINATION_ORDER, 4);

        uint32_t wire_kind = static_cast<uint32_t>(kind);
        buffer_.push_back(wire_kind & 0xFF);
        buffer_.push_back((wire_kind >> 8) & 0xFF);
        buffer_.push_back((wire_kind >> 16) & 0xFF);
        buffer_.push_back((wire_kind >> 24) & 0xFF);
    }

    void add_presentation(dcps::PresentationQosPolicyAccessScopeKind scope, 
                         bool coherent, bool ordered) {
        add_pid_header(PID::PRESENTATION, 8);

        uint32_t wire_scope = static_cast<uint32_t>(scope);
        buffer_.push_back(wire_scope & 0xFF);
        buffer_.push_back((wire_scope >> 8) & 0xFF);
        buffer_.push_back((wire_scope >> 16) & 0xFF);
        buffer_.push_back((wire_scope >> 24) & 0xFF);

        buffer_.push_back(coherent ? 1 : 0);
        buffer_.push_back(ordered ? 1 : 0);
        buffer_.push_back(0);  // Padding
        buffer_.push_back(0);  // Padding
    }

    void add_data_representation(const std::vector<int16_t>& representations) {
        if (representations.empty()) return;

        uint32_t count = static_cast<uint32_t>(representations.size());
        // Use a 4-byte aligned parameterLength (see add_string for rationale).
        const uint16_t unpadded = static_cast<uint16_t>(4 + 2 * count);
        const uint16_t len = static_cast<uint16_t>((unpadded + 3u) & ~3u);

        add_pid_header(PID::DATA_REPRESENTATION, len);

        // Count
        buffer_.push_back(count & 0xFF);
        buffer_.push_back((count >> 8) & 0xFF);
        buffer_.push_back((count >> 16) & 0xFF);
        buffer_.push_back((count >> 24) & 0xFF);

        // Representations (short values)
        for (int16_t rep : representations) {
            buffer_.push_back(rep & 0xFF);
            buffer_.push_back((rep >> 8) & 0xFF);
        }

        // Padding
        while ((buffer_.size() % 4) != 0)
            buffer_.push_back(0);
    }

    void add_resource_limits(int32_t max_samples, int32_t max_instances, 
                            int32_t max_samples_per_instance) {
        add_pid_header(PID::RESOURCE_LIMITS, 12);

        buffer_.push_back(max_samples & 0xFF);
        buffer_.push_back((max_samples >> 8) & 0xFF);
        buffer_.push_back((max_samples >> 16) & 0xFF);
        buffer_.push_back((max_samples >> 24) & 0xFF);

        buffer_.push_back(max_instances & 0xFF);
        buffer_.push_back((max_instances >> 8) & 0xFF);
        buffer_.push_back((max_instances >> 16) & 0xFF);
        buffer_.push_back((max_instances >> 24) & 0xFF);

        buffer_.push_back(max_samples_per_instance & 0xFF);
        buffer_.push_back((max_samples_per_instance >> 8) & 0xFF);
        buffer_.push_back((max_samples_per_instance >> 16) & 0xFF);
        buffer_.push_back((max_samples_per_instance >> 24) & 0xFF);
    }

    void add_partition(const std::vector<std::string>& partitions) {
        if (partitions.empty()) return;

        // Build the CDR payload for a sequence<string> in a temp buffer
        // so we can compute parameterLength.
        std::vector<uint8_t> body;
        // Sequence length (count of strings)
        uint32_t count = static_cast<uint32_t>(partitions.size());
        body.push_back(count & 0xFF);
        body.push_back((count >> 8) & 0xFF);
        body.push_back((count >> 16) & 0xFF);
        body.push_back((count >> 24) & 0xFF);

        for (const auto& s : partitions) {
            uint32_t str_len = static_cast<uint32_t>(s.size() + 1); // include null
            body.push_back(str_len & 0xFF);
            body.push_back((str_len >> 8) & 0xFF);
            body.push_back((str_len >> 16) & 0xFF);
            body.push_back((str_len >> 24) & 0xFF);
            body.insert(body.end(), s.begin(), s.end());
            body.push_back(0); // null terminator
            // Pad each string to 4-byte boundary
            while ((body.size() % 4) != 0)
                body.push_back(0);
        }

        uint16_t len = static_cast<uint16_t>((body.size() + 3u) & ~3u);
        add_pid_header(PID::PARTITION, len);
        buffer_.insert(buffer_.end(), body.begin(), body.end());
        // Pad overall to 4-byte boundary
        while ((buffer_.size() % 4) != 0)
            buffer_.push_back(0);
    }

    void add_expects_inline_qos(bool expects) {
        add_pid_header(PID::EXPECTS_INLINE_QOS, 4);
        uint32_t val = expects ? 1 : 0;
        buffer_.push_back(val & 0xFF);
        buffer_.push_back((val >> 8) & 0xFF);
        buffer_.push_back((val >> 16) & 0xFF);
        buffer_.push_back((val >> 24) & 0xFF);
    }

    void add_type_information_from_type_name(const std::string& typeName) {
        if (typeName.empty()) return;
        std::array<uint8_t, 14> hash{};
        if (!lookup_type_information_hash(typeName, hash)) {
            // Fallback for types that have not registered metadata yet.
            auto fnv1a64 = [](const std::string& s, uint64_t seed) -> uint64_t {
                uint64_t h = seed;
                for (unsigned char c : s) {
                    h ^= static_cast<uint64_t>(c);
                    h *= 1099511628211ull;
                }
                return h;
            };

            const uint64_t h1 = fnv1a64(typeName, 1469598103934665603ull);
            const uint64_t h2 = fnv1a64(typeName, 1099511628211ull ^ 0x9e3779b97f4a7c15ull);
            for (int i = 0; i < 8; ++i) {
                hash[static_cast<size_t>(i)] = static_cast<uint8_t>((h1 >> (8 * i)) & 0xFF);
            }
            for (int i = 0; i < 6; ++i) {
                hash[static_cast<size_t>(8 + i)] = static_cast<uint8_t>((h2 >> (8 * i)) & 0xFF);
            }

            // Keep fallback hash discoverable for incoming TypeLookup requests
            // that were derived from early SEDP announcements.
            register_type_information_hash(typeName, hash);
        }

        // TypeInformation wire payload (40 bytes):
        // minimal TypeIdentifierWithSize (20)
        // complete TypeIdentifierWithSize (20)
        // each: discriminator(1) + hash(14) + pad(1) + serialized_size(4)
        add_pid_header(PID::TYPE_INFORMATION, 40);

        // minimal: EK_MINIMAL (0x7C)
        buffer_.push_back(0x7C);
        buffer_.insert(buffer_.end(), hash.begin(), hash.end());
        buffer_.push_back(0x00);  // padding
        buffer_.push_back(0x00);  // serialized_size = 0
        buffer_.push_back(0x00);
        buffer_.push_back(0x00);
        buffer_.push_back(0x00);

        // complete: EK_COMPLETE (0x7B)
        buffer_.push_back(0x7B);
        buffer_.insert(buffer_.end(), hash.begin(), hash.end());
        buffer_.push_back(0x00);  // padding
        buffer_.push_back(0x00);  // serialized_size = 0
        buffer_.push_back(0x00);
        buffer_.push_back(0x00);
        buffer_.push_back(0x00);
    }

    void add_protocol_version(uint8_t major, uint8_t minor) {
        add_pid_header(PID::PROTOCOL_VERSION, 4);
        buffer_.push_back(major);
        buffer_.push_back(minor);
        buffer_.push_back(0);  // Padding
        buffer_.push_back(0);
    }

    void add_vendor_id(uint8_t v0, uint8_t v1) {
        add_pid_header(PID::VENDOR_ID, 4);
        buffer_.push_back(v0);
        buffer_.push_back(v1);
        buffer_.push_back(0);  // Padding
        buffer_.push_back(0);
    }

    void finalize() {
        add_pid_header(PID::SENTINEL, 0);
    }

    const std::vector<uint8_t>& buffer() const { return buffer_; }

    std::vector<uint8_t> take_buffer() { return std::move(buffer_); }

private:
    void add_pid_header(uint16_t pid, uint16_t length) {
        buffer_.push_back(pid & 0xFF);
        buffer_.push_back((pid >> 8) & 0xFF);
        buffer_.push_back(length & 0xFF);
        buffer_.push_back((length >> 8) & 0xFF);
    }

    std::vector<uint8_t> buffer_;
};

class ReaderProxy {
public:
    // Identification
    GUID_t readerGuid{};
    std::string topicName;
    std::string typeName;

    // Locators (where to send data to this reader)
    LocatorList_t unicastLocatorList;
    LocatorList_t multicastLocatorList;

    // QoS policies
    dcps::ReliabilityQosPolicy reliability;
    dcps::DurabilityQosPolicy durability;
    dcps::HistoryQosPolicy history;
    dcps::OwnershipQosPolicy ownership;
    dcps::DeadlineQosPolicy deadline;
    dcps::LivelinessQosPolicy liveliness;
    dcps::DestinationOrderQosPolicy destination_order;
    dcps::PresentationQosPolicy presentation;
    dcps::ResourceLimitsQosPolicy resource_limits;
    dcps::DataRepresentationQosPolicy representation;

    // Partition QoS (Publisher/Subscriber level, but communicated per-endpoint in SEDP)
    std::vector<std::string> partitions;

    // Whether this reader expects inline QoS in DATA submessages
    bool expectsInlineQos{false};

    void set_default_qos() {
          reliability.kind = dcps::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS;
        reliability.max_blocking_time = Duration_t{0, 100000000};  // 100ms

        durability.kind = dcps::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS;

        history.kind = dcps::HistoryQosPolicyKind::KEEP_LAST_HISTORY_QOS;
        history.depth = 1;

        ownership.kind = dcps::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS;

        deadline.period = Duration_t::TIME_INFINITE();

        liveliness.kind = dcps::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS;
        liveliness.lease_duration = Duration_t::TIME_INFINITE();

        destination_order.kind = dcps::DestinationOrderQosPolicyKind::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;

        presentation.access_scope = dcps::PresentationQosPolicyAccessScopeKind::INSTANCE_PRESENTATION_QOS;
        presentation.coherent_access = false;
        presentation.ordered_access = false;

        resource_limits.max_samples = -1;  // unlimited
        resource_limits.max_instances = -1;
        resource_limits.max_samples_per_instance = -1;

        // Support both XCDR1 and XCDR2
        representation.value = {0x0000, 0x0002};
    }

    std::vector<uint8_t> serialize_to_parameter_list() const {
        ParameterListBuilder builder;

        // Required identification
        builder.add_guid(PID::ENDPOINT_GUID, readerGuid);
        builder.add_string(PID::TOPIC_NAME, topicName);
        builder.add_string(PID::TYPE_NAME, typeName);
        builder.add_type_information_from_type_name(typeName);

        // Unicast locators (required for receiving data)
        for (const auto& loc : unicastLocatorList) {
            builder.add_locator(PID::UNICAST_LOCATOR, loc);
        }

        // Multicast locators (optional)
        for (const auto& loc : multicastLocatorList) {
            builder.add_locator(PID::MULTICAST_LOCATOR, loc);
        }

        // QoS policies — all RxO policies must be serialized for correct matching.
        builder.add_reliability(reliability.kind, reliability.max_blocking_time);
        builder.add_durability(durability.kind);
        builder.add_ownership(ownership.kind);
        builder.add_deadline(deadline.period);

        // Partition QoS
        if (!partitions.empty()) {
            builder.add_partition(partitions);
        }

        // Data representation (important for type matching)
        if (!representation.value.empty()) {
            builder.add_data_representation(representation.value);
        }

        // Expects inline QoS flag
        if (expectsInlineQos) {
            builder.add_expects_inline_qos(true);
        }

        builder.finalize();
        return builder.take_buffer();
    }
};

class WriterProxy {
public:
    // Identification
    GUID_t writerGuid{};
    std::string topicName;
    std::string typeName;

    // Locators
    LocatorList_t unicastLocatorList;
    LocatorList_t multicastLocatorList;

    // QoS policies
    dcps::ReliabilityQosPolicy reliability;
    dcps::DurabilityQosPolicy durability;
    dcps::HistoryQosPolicy history;
    dcps::OwnershipQosPolicy ownership;
    dcps::OwnershipStrengthQosPolicy ownership_strength;
    dcps::DeadlineQosPolicy deadline;
    dcps::LivelinessQosPolicy liveliness;
    dcps::DestinationOrderQosPolicy destination_order;
    dcps::PresentationQosPolicy presentation;
    dcps::ResourceLimitsQosPolicy resource_limits;
    dcps::DataRepresentationQosPolicy representation;

    // Partition QoS (Publisher/Subscriber level, but communicated per-endpoint in SEDP)
    std::vector<std::string> partitions;

    void set_default_qos() {
        reliability.kind = dcps::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS;
        reliability.max_blocking_time = Duration_t{0, 100000000};

        durability.kind = dcps::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS;

        history.kind = dcps::HistoryQosPolicyKind::KEEP_LAST_HISTORY_QOS;
        history.depth = 1;

        ownership.kind = dcps::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS;
        ownership_strength.value = 0;

        deadline.period = Duration_t::TIME_INFINITE();

        liveliness.kind = dcps::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS;
        liveliness.lease_duration = Duration_t::TIME_INFINITE();

        destination_order.kind = dcps::DestinationOrderQosPolicyKind::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;

        presentation.access_scope = dcps::PresentationQosPolicyAccessScopeKind::INSTANCE_PRESENTATION_QOS;
        presentation.coherent_access = false;
        presentation.ordered_access = false;

        resource_limits.max_samples = -1;
        resource_limits.max_instances = -1;
        resource_limits.max_samples_per_instance = -1;

        representation.value = {0x0000, 0x0002};
    }

    std::vector<uint8_t> serialize_to_parameter_list() const {
        ParameterListBuilder builder;

        // Required identification
        builder.add_guid(PID::ENDPOINT_GUID, writerGuid);
        builder.add_string(PID::TOPIC_NAME, topicName);
        builder.add_string(PID::TYPE_NAME, typeName);
        builder.add_type_information_from_type_name(typeName);

        // Locators
        for (const auto& loc : unicastLocatorList) {
            builder.add_locator(PID::UNICAST_LOCATOR, loc);
        }
        for (const auto& loc : multicastLocatorList) {
            builder.add_locator(PID::MULTICAST_LOCATOR, loc);
        }

        // QoS policies — all RxO policies must be serialized for correct matching.
        builder.add_reliability(reliability.kind, reliability.max_blocking_time);
        builder.add_durability(durability.kind);
        builder.add_ownership(ownership.kind);
        builder.add_ownership_strength(ownership_strength.value);
        builder.add_deadline(deadline.period);

        // Partition QoS
        if (!partitions.empty()) {
            builder.add_partition(partitions);
        }

        if (!representation.value.empty()) {
            builder.add_data_representation(representation.value);
        }

        builder.finalize();
        return builder.take_buffer();
    }
};

} // namespace astutedds::rtps

#endif // ASTUTEDDS_RTPS_ENDPOINT_PROXY_HPP