File dcps.hpp

File List > astutedds > dcps > dcps.hpp

Go to the documentation of this file


#ifndef ASTUTEDDS_DCPS_DCPS_HPP
#define ASTUTEDDS_DCPS_DCPS_HPP

#include <astutedds/dcps/qos.hpp>
#include <astutedds/dcps/dds_psm_types.hpp>
#include <astutedds/dcps/topic_description.hpp>
#include <astutedds/rtps/rtps_types.hpp>
#include <astutedds/rtps/udp_transport.hpp>
#include <memory>
#include <mutex>
#include <vector>
#include <deque>
#include <string>
#include <functional>
#include <set>
#include <map>
#include <atomic>
#include <thread>
#include <chrono>
#include <condition_variable>

namespace astutedds::dcps
{

    // Forward declarations
    class DomainParticipant;
    class DomainParticipantFactory;
    class Topic;
    class Publisher;
    class Subscriber;
    class DataWriter;
    class DataReader;
    class ContentFilteredTopic;

    // ReturnCode_t is now defined as an unscoped enum in dds_psm_types.hpp
    // (included above). No duplicate definition here.

    struct SampleInfo
    {
        bool valid_data{false};
        rtps::Time_t source_timestamp{};
        InstanceHandle_t instance_handle{HANDLE_NIL};
        InstanceHandle_t publication_handle{HANDLE_NIL};
        int32_t disposed_generation_count{0};
        int32_t no_writers_generation_count{0};
        int32_t sample_rank{0};
        int32_t generation_rank{0};
        int32_t absolute_generation_rank{0};

        // DDS PSM state fields
        SampleStateKind sample_state{NOT_READ_SAMPLE_STATE};
        ViewStateKind view_state{NEW_VIEW_STATE};
        InstanceStateKind instance_state{ALIVE_INSTANCE_STATE};
    };

    struct CacheChange
    {
        rtps::SequenceNumber_t sequenceNumber{};
        std::vector<uint8_t> serializedData;
        rtps::Time_t sourceTimestamp{};
        rtps::ChangeKind_t kind{rtps::ChangeKind_t::ALIVE};
    };

    struct ReceivedSample
    {
        rtps::SequenceNumber_t sequenceNumber{};
        std::vector<uint8_t> serializedData;
        rtps::Time_t sourceTimestamp{};
        rtps::GUID_t writerGuid{};
        bool valid{true};
        bool read{false};
        int32_t ownershipStrength{0};   // For EXCLUSIVE ownership filtering
        InstanceHandle_t instance_handle{HANDLE_NIL};  // Computed from @key fields
    };

    class Topic : public TopicDescription
    {
    public:
        Topic(const std::string &name, const std::string &type_name, DomainParticipant *participant);
        ~Topic() override = default;

        // TopicDescription interface
        const char *get_name() const override { return name_.c_str(); }
        const char *get_type_name() const override { return type_name_.c_str(); }

        // Legacy accessors (used by shapes_demo)
        const std::string &name() const { return name_; }
        const std::string &type_name() const { return type_name_; }
        DomainParticipant *participant() const { return participant_; }

        void set_qos(const TopicQos &qos) { qos_ = qos; }
        const TopicQos &get_qos() const { return qos_; }

    private:
        std::string name_;
        std::string type_name_;
        DomainParticipant *participant_;
        TopicQos qos_;
    };

    class DataWriter
    {
    public:
        DataWriter(Topic *topic, Publisher *publisher);
        virtual ~DataWriter() = default;
        DataWriter(const DataWriter &) = delete;
        DataWriter &operator=(const DataWriter &) = delete;

        bool write(const std::vector<uint8_t> &data);

        bool write(const std::vector<uint8_t> &xcdr1Data,
                   const std::vector<uint8_t> &xcdr2Data);

        uint64_t register_instance();

        bool unregister_instance(uint64_t handle);

        bool dispose(uint64_t handle);

        ReturnCode_t wait_for_acknowledgments(const Duration_t &max_wait)
        {
            if (qos_.reliability.kind != ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS)
                return ReturnCode_t::RETCODE_OK;

            // Target: all samples written so far have been acknowledged.
            // last_acked_sn_ is advanced by process_acknack() as readers confirm receipt.
            const uint64_t target =
                next_sequence_number_.low > 1 ? next_sequence_number_.low - 1 : 0;
            if (last_acked_sn_ >= target)
                return ReturnCode_t::RETCODE_OK;

            const auto duration = std::chrono::seconds(max_wait.sec)
                + std::chrono::nanoseconds(max_wait.nanosec);
            std::unique_lock<std::mutex> lock(mutex_);
            const bool acked = ack_cv_.wait_for(lock, duration,
                [this, target] { return last_acked_sn_ >= target; });
            return acked ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT;
        }

        // Accessors
        Topic *topic() const { return topic_; }
        Topic *get_topic() const { return topic_; }  // PSM alias
        Publisher *publisher() const { return publisher_; }
        const rtps::GUID_t &guid() const { return guid_; }
        void set_guid(const rtps::GUID_t &guid) { guid_ = guid; }

        void set_qos(const DataWriterQos &qos);
        const DataWriterQos &get_qos() const { return qos_; }

        const std::deque<CacheChange> &history_cache() const { return history_cache_; }

        // Reliability support
        void add_matched_reader(const rtps::GUID_t &reader_guid);
        void remove_matched_reader(const rtps::GUID_t &reader_guid);
        void process_acknack(const rtps::GUID_t &reader_guid,
                             const std::set<uint32_t> &requested_sns,
                             rtps::Count_t count);

        // Transport callback - set by DomainParticipant
        using SendCallback = std::function<void(const std::vector<uint8_t> &)>;
        void set_send_callback(SendCallback callback) { send_callback_ = callback; }

        // Dual-encoding transport callback (XCDR1 + XCDR2)
        using SendDualCallback = std::function<void(const std::vector<uint8_t> &,
                                                     const std::vector<uint8_t> &)>;
        void set_send_dual_callback(SendDualCallback callback) { send_dual_callback_ = callback; }

        // Heartbeat callback — invoked by send_heartbeat(); set by DomainParticipant
        // to trigger an out-of-band HEARTBEAT submessage via the transport.
        using HeartbeatCallback = std::function<void()>;
        void set_heartbeat_callback(HeartbeatCallback callback) { heartbeat_callback_ = std::move(callback); }

        // DataWriter listener
        void set_listener(DataWriterListener *listener, StatusMask mask = STATUS_MASK_ALL)
        {
            listener_ = listener;
            listener_mask_ = mask;
        }
        DataWriterListener *get_listener() const { return listener_; }
        StatusMask get_listener_mask() const { return listener_mask_; }

        // Deadline monitoring
        void start_deadline_timer();
        void stop_deadline_timer();
        void notify_write();  // Reset deadline timer on each write

        // Lifespan: remove expired samples from cache
        void remove_expired_samples();

    private:
        void send_heartbeat();
        void retransmit_samples(const std::set<uint32_t> &sequence_numbers);
        bool check_resource_limits();
        void apply_history_and_limits();
        void deadline_thread_func();

        Topic *topic_;
        Publisher *publisher_;
        rtps::GUID_t guid_;
        rtps::SequenceNumber_t next_sequence_number_{0, 1};
        std::deque<CacheChange> history_cache_;
        DataWriterQos qos_;
        mutable std::mutex mutex_;
        SendCallback send_callback_;
        SendDualCallback send_dual_callback_;
        HeartbeatCallback heartbeat_callback_;

        // Matched reader tracking (for wait_for_acknowledgments and status reporting)
        std::set<rtps::GUID_t> matched_readers_;

        // Acknowledgment tracking: last_acked_sn_ is set to the highest SN
        // confirmed by all matched readers; ack_cv_ is signaled on each update.
        uint64_t last_acked_sn_{0};
        std::condition_variable ack_cv_;

        // DataWriter listener
        DataWriterListener *listener_{nullptr};
        StatusMask listener_mask_{STATUS_MASK_ALL};

        // Deadline monitoring
        std::thread deadline_thread_;
        std::condition_variable deadline_cv_;
        std::atomic<bool> deadline_running_{false};
        std::chrono::steady_clock::time_point last_write_time_;

        // Lifespan tracking — each cache entry gets a creation timestamp
        std::deque<std::chrono::steady_clock::time_point> cache_timestamps_;
    };

    class DataReader
    {
    public:
        DataReader(Topic *topic, Subscriber *subscriber);
        virtual ~DataReader() = default;
        DataReader(const DataReader &) = delete;
        DataReader &operator=(const DataReader &) = delete;

        std::vector<ReceivedSample> read(size_t max_samples = 0xFFFFFFFF);

        std::vector<ReceivedSample> take(size_t max_samples = 0xFFFFFFFF);

        ReturnCode_t read_next_sample(std::vector<uint8_t> &data, SampleInfo &info);

        ReturnCode_t take_next_sample(std::vector<uint8_t> &data, SampleInfo &info);

        void add_sample(const ReceivedSample &sample);

        ReturnCode_t get_key_value(std::vector<uint8_t> &key_holder, InstanceHandle_t handle);

        ReturnCode_t return_loan(std::vector<std::vector<uint8_t>> & /*data_values*/,
                                 std::vector<SampleInfo> & /*sample_infos*/)
        {
            return ReturnCode_t::RETCODE_OK;
        }

        // Accessors
        Topic *topic() const { return topic_; }
        TopicDescription *get_topicdescription() const;  // PSM: may be CFT
        Subscriber *subscriber() const { return subscriber_; }
        const rtps::GUID_t &guid() const { return guid_; }
        void set_guid(const rtps::GUID_t &guid) { guid_ = guid; }

        void set_qos(const DataReaderQos &qos);
        const DataReaderQos &get_qos() const { return qos_; }

        size_t unread_count() const;

        // Reliability support
        void add_matched_writer(const rtps::GUID_t &writer_guid);
        void remove_matched_writer(const rtps::GUID_t &writer_guid);
        void process_heartbeat(const rtps::GUID_t &writer_guid,
                               const rtps::SequenceNumber_t &first_sn,
                               const rtps::SequenceNumber_t &last_sn,
                               rtps::Count_t count,
                               bool final);

        // Data received callback
        using DataCallback = std::function<void(const ReceivedSample &)>;
        void set_data_callback(DataCallback callback) { data_callback_ = callback; }

        // Listener support (DDS §2.1.4.3 — DataReaderListener dispatch)
        void set_listener(DataReaderListener* listener) { listener_ = listener; }
        DataReaderListener* get_listener() const { return listener_; }

        // WaitSet notification — external condition variables to signal on data arrival
        using NotifyCallback = std::function<void()>;
        void add_notify_callback(NotifyCallback cb)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            notify_callbacks_.push_back(std::move(cb));
        }

        // Topic description override (used when created via ContentFilteredTopic)
        void set_topicdescription(TopicDescription *td) { topicdescription_ = td; }

        // Deadline monitoring
        void start_deadline_timer();
        void stop_deadline_timer();
        void notify_receive();  // Reset deadline timer on each receive

    protected:
        TopicDescription *topicdescription_{nullptr};  // non-null when created via CFT

    private:
        void send_acknack(const rtps::GUID_t &writer_guid);
        bool check_resource_limits() const;
        void apply_history_and_limits();
        void deadline_thread_func();

        Topic *topic_;
        Subscriber *subscriber_;
        rtps::GUID_t guid_;
        std::deque<ReceivedSample> history_cache_;
        DataReaderQos qos_;
        mutable std::mutex mutex_;
        DataCallback data_callback_;
        DataReaderListener* listener_{nullptr};
        std::vector<NotifyCallback> notify_callbacks_;

        // Matched writer tracking (for process_heartbeat and send_acknack)
        std::set<rtps::GUID_t> matched_writers_;

        // ACKNACK callback — invoked by send_acknack(); set by DomainParticipant
        // to deliver an ACKNACK via the transport for the given writer.
        using AcknackCallback = std::function<void(const rtps::GUID_t &)>;
        AcknackCallback acknack_callback_;

    public:
        void set_acknack_callback(AcknackCallback cb) { acknack_callback_ = std::move(cb); }

    private:

        // Time-based filter: track last accepted time per writer
        std::map<rtps::GUID_t, std::chrono::steady_clock::time_point> last_accepted_time_;

        // Ownership: per-instance tracking of current owner (EXCLUSIVE)
        struct InstanceOwnership {
            rtps::GUID_t ownerGuid{};
            int32_t ownerStrength{0};
        };
        std::map<int32_t, InstanceOwnership> instance_owners_;  // key = instance_handle hash

        // Instance registry: maps instance_handle → serialized key bytes (DDS §2.2.2.5.3)
        std::map<InstanceHandle_t, std::vector<uint8_t>> instance_key_data_;

        // Deadline monitoring
        std::thread deadline_thread_;
        std::condition_variable deadline_cv_;
        std::atomic<bool> deadline_running_{false};
        std::chrono::steady_clock::time_point last_receive_time_;
    };

    class Publisher
    {
    public:
        Publisher(DomainParticipant *participant);
        ~Publisher() = default;
        Publisher(const Publisher &) = delete;
        Publisher &operator=(const Publisher &) = delete;

        DataWriter *create_datawriter(Topic *topic, const DataWriterQos &qos = DataWriterQos{});

        DataWriter *create_datawriter(Topic *topic, const DataWriterQos &qos,
                                      DataWriterListener *listener,
                                      StatusMask mask = STATUS_MASK_ALL)
        {
            DataWriter *writer = create_datawriter(topic, qos);
            if (writer && listener)
            {
                writer->set_listener(listener, mask);
            }
            return writer;
        }

        DataWriterQos default_datawriter_qos() const { return DataWriterQos{}; }

        ReturnCode_t get_default_datawriter_qos(DataWriterQos &qos) const
        {
            qos = DataWriterQos{};
            return ReturnCode_t::RETCODE_OK;
        }

        bool delete_datawriter(DataWriter *writer);

        ReturnCode_t begin_coherent_changes()
        {
            coherent_active_ = true;
            return ReturnCode_t::RETCODE_OK;
        }

        ReturnCode_t end_coherent_changes();

        bool is_coherent_active() const { return coherent_active_; }

        void buffer_coherent_write(DataWriter *writer,
                                   const std::vector<uint8_t> &data);
        void buffer_coherent_write_dual(DataWriter *writer,
                                        const std::vector<uint8_t> &xcdr1,
                                        const std::vector<uint8_t> &xcdr2);

        DomainParticipant *participant() const { return participant_; }

        void set_qos(const PublisherQos &qos) { qos_ = qos; }
        const PublisherQos &get_qos() const { return qos_; }

        const std::vector<std::unique_ptr<DataWriter>> &writers() const { return writers_; }

        std::vector<DataWriter *> get_datawriters() const
        {
            std::vector<DataWriter *> result;
            result.reserve(writers_.size());
            for (const auto &w : writers_)
            {
                result.push_back(w.get());
            }
            return result;
        }

    private:
        DomainParticipant *participant_;
        PublisherQos qos_;
        std::vector<std::unique_ptr<DataWriter>> writers_;
        uint32_t next_writer_id_{1};

        // Coherent changes buffering
        bool coherent_active_{false};
        struct CoherentEntry {
            DataWriter *writer;
            std::vector<uint8_t> xcdr1;
            std::vector<uint8_t> xcdr2;
            bool dual;
        };
        std::vector<CoherentEntry> coherent_buffer_;
    };

    class Subscriber
    {
    public:
        Subscriber(DomainParticipant *participant);
        ~Subscriber() = default;
        Subscriber(const Subscriber &) = delete;
        Subscriber &operator=(const Subscriber &) = delete;

        DataReader *create_datareader(Topic *topic, const DataReaderQos &qos = DataReaderQos{});

        DataReader *create_datareader(TopicDescription *td, const DataReaderQos &qos,
                                      DataReaderListener *listener = nullptr,
                                      StatusMask mask = STATUS_MASK_ALL);

        DataReaderQos default_datareader_qos() const { return DataReaderQos{}; }

        ReturnCode_t get_default_datareader_qos(DataReaderQos &qos) const
        {
            qos = DataReaderQos{};
            return ReturnCode_t::RETCODE_OK;
        }

        bool delete_datareader(DataReader *reader);

        ReturnCode_t begin_access()
        {
            access_active_ = true;
            return ReturnCode_t::RETCODE_OK;
        }

        ReturnCode_t end_access()
        {
            access_active_ = false;
            return ReturnCode_t::RETCODE_OK;
        }

        bool is_access_active() const { return access_active_; }

        DomainParticipant *participant() const { return participant_; }

        void set_qos(const SubscriberQos &qos) { qos_ = qos; }
        const SubscriberQos &get_qos() const { return qos_; }

        const std::vector<std::unique_ptr<DataReader>> &readers() const { return readers_; }

        std::vector<DataReader *> get_datareaders() const
        {
            std::vector<DataReader *> result;
            result.reserve(readers_.size());
            for (const auto &r : readers_)
            {
                result.push_back(r.get());
            }
            return result;
        }

    private:
        DomainParticipant *participant_;
        SubscriberQos qos_;
        std::vector<std::unique_ptr<DataReader>> readers_;
        uint32_t next_reader_id_{1};
        bool access_active_{false};
    };

    class DomainParticipant
    {
    public:
        DomainParticipant(uint32_t domain_id, uint32_t participant_id);
        ~DomainParticipant();
        DomainParticipant(const DomainParticipant &) = delete;
        DomainParticipant &operator=(const DomainParticipant &) = delete;

        bool enable();

        void stop();

        // ── Topic creation ──────────────────────────────────────────────

        Topic *create_topic(const std::string &topic_name,
                            const std::string &type_name,
                            const TopicQos &qos = TopicQos{});

        Topic *create_topic(const char *topic_name,
                            const char *type_name,
                            const TopicQos &qos,
                            TopicListener *listener,
                            StatusMask mask = STATUS_MASK_ALL)
        {
            (void)listener;
            (void)mask;
            return create_topic(std::string(topic_name), std::string(type_name), qos);
        }

        bool delete_topic(Topic *topic);

        // ── ContentFilteredTopic ────────────────────────────────────────

        ContentFilteredTopic *create_contentfilteredtopic(
            const char *name,
            Topic *related_topic,
            const char *filter_expression,
            const StringSeq &expression_parameters);

        bool delete_contentfilteredtopic(ContentFilteredTopic *cft);

        // ── Publisher / Subscriber creation ─────────────────────────────

        Publisher *create_publisher(const PublisherQos &qos = PublisherQos{});

        Publisher *create_publisher(const PublisherQos &qos,
                                    PublisherListener *listener,
                                    StatusMask mask = STATUS_MASK_ALL)
        {
            (void)listener;
            (void)mask;
            return create_publisher(qos);
        }

        bool delete_publisher(Publisher *publisher);

        Subscriber *create_subscriber(const SubscriberQos &qos = SubscriberQos{});

        Subscriber *create_subscriber(const SubscriberQos &qos,
                                      SubscriberListener *listener,
                                      StatusMask mask = STATUS_MASK_ALL)
        {
            (void)listener;
            (void)mask;
            return create_subscriber(qos);
        }

        bool delete_subscriber(Subscriber *subscriber);

        // ── Default QoS out-param accessors (PSM) ──────────────────────

        ReturnCode_t get_default_topic_qos(TopicQos &qos) const
        {
            qos = TopicQos{};
            return ReturnCode_t::RETCODE_OK;
        }

        ReturnCode_t get_default_publisher_qos(PublisherQos &qos) const
        {
            qos = PublisherQos{};
            return ReturnCode_t::RETCODE_OK;
        }

        ReturnCode_t get_default_subscriber_qos(SubscriberQos &qos) const
        {
            qos = SubscriberQos{};
            return ReturnCode_t::RETCODE_OK;
        }

        // ── Cleanup ────────────────────────────────────────────────────

        ReturnCode_t delete_contained_entities();

        // ── Type Support Factory ───────────────────────────────────────

        using DataWriterFactory = std::function<DataWriter *(Topic *, Publisher *)>;
        using DataReaderFactory = std::function<DataReader *(Topic *, Subscriber *)>;

        void register_type_factory(const std::string &type_name,
                                   DataWriterFactory wf, DataReaderFactory rf);

        DataWriterFactory get_writer_factory(const std::string &type_name) const;

        DataReaderFactory get_reader_factory(const std::string &type_name) const;

        // ── Accessors ──────────────────────────────────────────────────

        uint32_t domain_id() const { return domain_id_; }
        uint32_t participant_id() const { return participant_id_; }
        const rtps::GuidPrefix_t &guid_prefix() const { return guid_prefix_; }

        void set_qos(const DomainParticipantQos &qos) { qos_ = qos; }
        const DomainParticipantQos &get_qos() const { return qos_; }

        bool is_enabled() const { return running_.load(); }

        // Listener management
        void set_listener(DomainParticipantListener *listener, StatusMask mask = STATUS_MASK_ALL)
        {
            listener_ = listener;
            listener_mask_ = mask;
        }
        DomainParticipantListener *get_listener() const { return listener_; }
        StatusMask get_listener_mask() const { return listener_mask_; }

        void send_data(const std::string &topic_name, const std::vector<uint8_t> &data);

        void send_data(const std::string &topic_name,
                       const std::vector<uint8_t> &xcdr1Data,
                       const std::vector<uint8_t> &xcdr2Data);

        using SendDataCallback = std::function<void(const std::string &topic_name, const std::vector<uint8_t> &data)>;
        void set_send_data_callback(SendDataCallback callback) { send_data_callback_ = callback; }

        using SubscribeCallback = std::function<void(const std::string &topic_name)>;
        void set_subscribe_callback(SubscribeCallback callback) { subscribe_callback_ = callback; }

        void register_writer(const std::string &topic_name,
                             const std::string &type_name,
                             const std::vector<std::string> &partitions = {},
                             const DataWriterQos &qos = {});

        void register_reader(const std::string &topic_name,
                             const std::string &type_name,
                             const std::vector<std::string> &partitions = {},
                             const DataReaderQos &qos = {});

        void notify_subscription(const std::string &topic_name);

        void deliver_data(const std::string &topic_name, const std::vector<uint8_t> &data,
                          const rtps::GUID_t &writer_guid = rtps::GUID_t{},
                          int32_t ownershipStrength = 0,
                          const rtps::SequenceNumber_t &sequenceNumber = rtps::SequenceNumber_t{});

        void replay_history_to_reader(DataWriter *writer);

        using DataReceivedCallback = std::function<void(const std::string &topic, const ReceivedSample &)>;
        void set_data_received_callback(DataReceivedCallback callback) { data_received_callback_ = callback; }

        void match_reader_to_discovered_writers(DataReader *reader, const std::vector<std::string> &subPartitions);

        void match_writer_to_discovered_readers(DataWriter *writer, const std::vector<std::string> &pubPartitions);

    private:
        void discovery_loop();
        void generate_guid_prefix();
        rtps::GUID_t create_writer_guid(uint32_t writer_id);
        rtps::GUID_t create_reader_guid(uint32_t reader_id);

        uint32_t domain_id_;
        uint32_t participant_id_;
        rtps::GuidPrefix_t guid_prefix_;
        DomainParticipantQos qos_;

        std::atomic<bool> running_{false};
        std::thread discovery_thread_;
        mutable std::mutex mutex_;

        std::vector<std::unique_ptr<Topic>> topics_;
        std::vector<std::unique_ptr<Publisher>> publishers_;
        std::vector<std::unique_ptr<Subscriber>> subscribers_;
        std::vector<std::unique_ptr<ContentFilteredTopic>> cfts_;

        SendDataCallback send_data_callback_;
        SubscribeCallback subscribe_callback_;
        DataReceivedCallback data_received_callback_;

        // Listener
        DomainParticipantListener *listener_{nullptr};
        StatusMask listener_mask_{0};

        // Type support factories (populated by register_type_factory)
        std::map<std::string, DataWriterFactory> writer_factories_;
        std::map<std::string, DataReaderFactory> reader_factories_;

        // Built-in RTPS transport
        std::unique_ptr<rtps::RtpsUdpTransport> transport_;

        // Tracking sets for SEDP callback deduplication and data delivery gating.
        // matchedRemoteWriters_ contains GUIDs of remote writers whose QoS is
        // compatible with at least one local reader. deliver_data() only delivers
        // samples from writers in this set.
        std::set<rtps::GUID_t> matchedRemoteWriters_;
        std::set<rtps::GUID_t> incompatRemoteWriters_;
        std::set<rtps::GUID_t> matchedRemoteReaders_;
        std::set<rtps::GUID_t> incompatRemoteReaders_;
    };

    class DomainParticipantFactory
    {
    public:
        static DomainParticipantFactory &instance();

        static DomainParticipantFactory *get_instance()
        {
            return &instance();
        }

        DomainParticipant *create_participant(uint32_t domain_id,
                                              const DomainParticipantQos &qos = DomainParticipantQos{});

        DomainParticipant *create_participant(DomainId_t domain_id,
                                              const DomainParticipantQos &qos,
                                              DomainParticipantListener *listener,
                                              StatusMask mask = STATUS_MASK_ALL)
        {
            auto *dp = create_participant(static_cast<uint32_t>(domain_id), qos);
            if (dp && listener)
            {
                dp->set_listener(listener, mask);
            }
            return dp;
        }

        ReturnCode_t get_default_participant_qos(DomainParticipantQos &qos) const
        {
            qos = DomainParticipantQos{};
            return ReturnCode_t::RETCODE_OK;
        }

        bool delete_participant(DomainParticipant *participant);

    private:
        DomainParticipantFactory() = default;

        std::mutex mutex_;
        std::vector<std::unique_ptr<DomainParticipant>> participants_;
        uint32_t next_participant_id_{0};
    };

} // namespace astutedds::dcps

// Include Cyclone-style namespace compatibility layer after class definitions
#include <astutedds/dcps/CorePolicy.hpp>

// Standard DDS namespace alias for compatibility
namespace DDS = astutedds::dcps;

#endif // ASTUTEDDS_DCPS_DCPS_HPP