File CorePolicy.hpp

File List > astutedds > dcps > CorePolicy.hpp

Go to the documentation of this file

//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file CorePolicy.hpp
// @brief DDS C++ PSM (OMG DDS-PSM-Cxx 1.0 / ISO/IEC 19568:2017) namespace wrappers.
//
// Provides the standard dds:: namespace hierarchy over the native astutedds::dcps::
// API.  Compatible with code written for RTI Connext modern C++ API and Eclipse
// Cyclone DDS C++ bindings.
//
// Namespace mapping:
//   dds::core::policy::*          QoS policy factory helpers (14 policies)
//   dds::core::cond::*            Condition types for WaitSet
//   dds::core::WaitSet            Synchronous event-wait facility
//   dds::domain::DomainParticipant
//   dds::topic::Topic<T>, ContentFilteredTopic<T>, TypeTraits<T>
//   dds::sub::Subscriber, DataReader<T>
//   dds::pub::Publisher, DataWriter<T>
//

#ifndef ASTUTEDDS_DCPS_COREPOLICY_HPP
#define ASTUTEDDS_DCPS_COREPOLICY_HPP

#include <astutedds/cdr/xcdr2_codec.hpp>
#include <astutedds/dcps/content_filtered_topic.hpp>
#include <astutedds/dcps/qos.hpp>
#include <astutedds/dcps/typed.hpp>

#include <algorithm>
#include <chrono>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <typeinfo>
#include <vector>

// ============================================================================
// dds — OMG DDS-PSM-Cxx 1.0 top-level namespace
// ============================================================================
namespace dds
{

// Forward declarations
namespace topic
{
template <typename T>
class Topic;
template <typename T>
class ContentFilteredTopic;
} // namespace topic

namespace sub
{
class Subscriber;
template <typename T>
class DataReader;
} // namespace sub

namespace pub
{
class Publisher;
template <typename T>
class DataWriter;
} // namespace pub

namespace core
{
namespace cond
{
class StatusCondition;
class ReadCondition;
class GuardCondition;
} // namespace cond
} // namespace core

// ============================================================================
// dds::topic::TypeTraits
// ============================================================================
namespace topic
{
template <typename T>
struct TypeTraits
{
    using xcdr2_codec = T;
    using xcdr1_codec = T;
};
} // namespace topic

// ============================================================================
// dds::core — Duration, QoS policies, conditions, WaitSet
// ============================================================================
namespace core
{

using Duration = astutedds::rtps::Duration_t;

inline Duration Duration_infinite() { return astutedds::rtps::Time_t::TIME_INFINITE(); }
inline Duration Duration_zero()     { return astutedds::rtps::Time_t::TIME_ZERO(); }

// ============================================================================
// dds::core::policy
// ============================================================================
namespace policy
{

enum class DataRepresentationId : int16_t
{
    XCDR1 = 0x0000,
    XCDR2 = 0x0002
};

inline astutedds::dcps::DataRepresentationModifier
DataRepresentation(std::initializer_list<DataRepresentationId> ids)
{
    std::vector<int16_t> values;
    values.reserve(ids.size());
    std::transform(ids.begin(), ids.end(), std::back_inserter(values),
                   [](auto id) { return static_cast<int16_t>(id); });
    return astutedds::dcps::DataRepresentation(std::move(values));
}

struct Reliability
{
    static astutedds::dcps::ReliabilityQosModifier Reliable(
        Duration max_blocking_time = Duration(0, 100000000))
    {
        return astutedds::dcps::Reliability(
            astutedds::dcps::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS,
            max_blocking_time);
    }
    static astutedds::dcps::ReliabilityQosModifier BestEffort()
    {
        return astutedds::dcps::Reliability(
            astutedds::dcps::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS,
            Duration(0, 0));
    }
};

struct History
{
    static astutedds::dcps::HistoryKeepLastModifier KeepLast(int32_t depth)
    {
        return astutedds::dcps::HistoryKeepLast(depth);
    }
    static astutedds::dcps::HistoryKeepAllModifier KeepAll()
    {
        return astutedds::dcps::HistoryKeepAll();
    }
};

struct Durability
{
    static astutedds::dcps::DurabilityQosModifier Volatile()
    {
        return astutedds::dcps::Durability(
            astutedds::dcps::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS);
    }
    static astutedds::dcps::DurabilityQosModifier TransientLocal()
    {
        return astutedds::dcps::Durability(
            astutedds::dcps::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
    }
    static astutedds::dcps::DurabilityQosModifier Transient()
    {
        return astutedds::dcps::Durability(
            astutedds::dcps::DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS);
    }
    static astutedds::dcps::DurabilityQosModifier Persistent()
    {
        return astutedds::dcps::Durability(
            astutedds::dcps::DurabilityQosPolicyKind::PERSISTENT_DURABILITY_QOS);
    }
};

struct Deadline
{
    static astutedds::dcps::DeadlineModifier period(Duration d)
    {
        return astutedds::dcps::Deadline(d);
    }
};

struct Liveliness
{
    static astutedds::dcps::LivelinessModifier Automatic(
        Duration lease = astutedds::rtps::Time_t::TIME_INFINITE())
    {
        return astutedds::dcps::Liveliness(
            astutedds::dcps::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS, lease);
    }
    static astutedds::dcps::LivelinessModifier ManualByParticipant(
        Duration lease = astutedds::rtps::Time_t::TIME_INFINITE())
    {
        return astutedds::dcps::Liveliness(
            astutedds::dcps::LivelinessQosPolicyKind::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, lease);
    }
    static astutedds::dcps::LivelinessModifier ManualByTopic(
        Duration lease = astutedds::rtps::Time_t::TIME_INFINITE())
    {
        return astutedds::dcps::Liveliness(
            astutedds::dcps::LivelinessQosPolicyKind::MANUAL_BY_TOPIC_LIVELINESS_QOS, lease);
    }
};

struct Ownership
{
    static astutedds::dcps::OwnershipModifier Shared()
    {
        return astutedds::dcps::Ownership(
            astutedds::dcps::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS);
    }
    static astutedds::dcps::OwnershipModifier Exclusive()
    {
        return astutedds::dcps::Ownership(
            astutedds::dcps::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS);
    }
};

inline astutedds::dcps::OwnershipStrengthModifier OwnershipStrength(int32_t value)
{
    return astutedds::dcps::OwnershipStrength(value);
}

inline astutedds::dcps::PartitionModifier Partition(std::vector<std::string> names)
{
    return astutedds::dcps::Partition(std::move(names));
}
inline astutedds::dcps::PartitionModifier Partition(std::string name)
{
    return astutedds::dcps::Partition(std::vector<std::string>{std::move(name)});
}

inline astutedds::dcps::ResourceLimitsModifier ResourceLimits(
    int32_t max_samples, int32_t max_instances = -1, int32_t max_samples_per_instance = -1)
{
    return astutedds::dcps::ResourceLimits(max_samples, max_instances,
                                           max_samples_per_instance);
}

inline astutedds::dcps::LifespanModifier Lifespan(Duration duration)
{
    return astutedds::dcps::Lifespan(duration);
}

// ---------------------------------------------------------------------------
// LatencyBudget (DDS §2.2.3.9) — DataWriter and DataReader
// ---------------------------------------------------------------------------
inline astutedds::dcps::LatencyBudgetModifier LatencyBudget(Duration duration)
{
    return astutedds::dcps::LatencyBudget(duration);
}

// ---------------------------------------------------------------------------
// TimeBasedFilter (DDS §2.2.3.16) — DataReader only
// ---------------------------------------------------------------------------
inline astutedds::dcps::TimeBasedFilterModifier TimeBasedFilter(Duration minimum_separation)
{
    return astutedds::dcps::TimeBasedFilter(minimum_separation);
}

// ---------------------------------------------------------------------------
// DestinationOrder (DDS §2.2.3.7)
// ---------------------------------------------------------------------------
struct DestinationOrder
{
    static astutedds::dcps::DestinationOrderModifier ByReceptionTimestamp()
    {
        return astutedds::dcps::DestinationOrder(
            astutedds::dcps::DestinationOrderQosPolicyKind::
                BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS);
    }

    static astutedds::dcps::DestinationOrderModifier BySourceTimestamp()
    {
        return astutedds::dcps::DestinationOrder(
            astutedds::dcps::DestinationOrderQosPolicyKind::
                BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS);
    }
};

// ---------------------------------------------------------------------------
// WriterDataLifecycle (DDS §2.2.3.18) — DataWriter only
// ---------------------------------------------------------------------------
inline astutedds::dcps::WriterDataLifecycleModifier WriterDataLifecycle(
    bool autodispose_unregistered_instances = true)
{
    return astutedds::dcps::WriterDataLifecycle(autodispose_unregistered_instances);
}

// ---------------------------------------------------------------------------
// ReaderDataLifecycle (DDS §2.2.3.19) — DataReader only
// ---------------------------------------------------------------------------
inline astutedds::dcps::ReaderDataLifecycleModifier ReaderDataLifecycle(
    Duration nowriter_delay = astutedds::rtps::Time_t::TIME_INFINITE(),
    Duration disposed_delay = astutedds::rtps::Time_t::TIME_INFINITE())
{
    return astutedds::dcps::ReaderDataLifecycle(nowriter_delay, disposed_delay);
}

// ---------------------------------------------------------------------------
// Presentation (DDS §2.2.3.6) — Publisher and Subscriber
// ---------------------------------------------------------------------------
struct Presentation
{
    enum class AccessScopeKind
    {
        INSTANCE = 0, // INSTANCE_PRESENTATION_QOS
        TOPIC    = 1, // TOPIC_PRESENTATION_QOS
        GROUP    = 2  // GROUP_PRESENTATION_QOS
    };

    static astutedds::dcps::PresentationModifier InstanceAccessScope(
        bool coherent = false, bool ordered = false)
    {
        return astutedds::dcps::Presentation(
            astutedds::dcps::PresentationQosPolicyAccessScopeKind::INSTANCE_PRESENTATION_QOS,
            coherent, ordered);
    }

    static astutedds::dcps::PresentationModifier TopicAccessScope(
        bool coherent = false, bool ordered = false)
    {
        return astutedds::dcps::Presentation(
            astutedds::dcps::PresentationQosPolicyAccessScopeKind::TOPIC_PRESENTATION_QOS,
            coherent, ordered);
    }

    static astutedds::dcps::PresentationModifier GroupAccessScope(
        bool coherent = false, bool ordered = false)
    {
        return astutedds::dcps::Presentation(
            astutedds::dcps::PresentationQosPolicyAccessScopeKind::GROUP_PRESENTATION_QOS,
            coherent, ordered);
    }
};

// ---------------------------------------------------------------------------
// UserData (DDS §2.2.3.2) — DataWriter / DataReader
// ---------------------------------------------------------------------------
inline astutedds::dcps::UserDataModifier UserData(std::vector<uint8_t> value)
{
    return astutedds::dcps::UserData(std::move(value));
}

// ---------------------------------------------------------------------------
// GroupData (DDS §2.2.3.x) — Publisher / Subscriber
// ---------------------------------------------------------------------------
inline astutedds::dcps::GroupDataModifier GroupData(std::vector<uint8_t> value)
{
    return astutedds::dcps::GroupData(std::move(value));
}

}  // namespace policy

// ============================================================================
// dds::core::cond
// ============================================================================
namespace cond
{

class Condition
{
public:
    virtual ~Condition() = default;
    virtual bool trigger_value() const = 0;
};

class ReadCondition : public Condition
{
public:
    explicit ReadCondition(astutedds::dcps::DataReader* reader) : reader_(reader) {}
    bool trigger_value() const override
    {
        return reader_ != nullptr && reader_->unread_count() > 0;
    }
    astutedds::dcps::DataReader* reader() const { return reader_; }
private:
    astutedds::dcps::DataReader* reader_{nullptr};
};

class StatusCondition : public Condition
{
public:
    explicit StatusCondition(astutedds::dcps::DataReader* reader) : reader_(reader) {}
    bool trigger_value() const override
    {
        return reader_ != nullptr && reader_->unread_count() > 0;
    }
    astutedds::dcps::DataReader* reader() const { return reader_; }
private:
    astutedds::dcps::DataReader* reader_{nullptr};
};

class GuardCondition : public Condition
{
public:
    GuardCondition() = default;
    bool trigger_value() const override { return trigger_; }
    void set_trigger_value(bool v)
    {
        trigger_ = v;
        if (v && notify_cb_) notify_cb_();
    }

    using NotifyCallback = std::function<void()>;
    void set_notify_callback(NotifyCallback cb) { notify_cb_ = std::move(cb); }

private:
    bool trigger_{false};
    NotifyCallback notify_cb_;
};

}  // namespace cond

// ============================================================================
// dds::core::WaitSet
// ============================================================================
class WaitSet
{
public:
    using ConditionSeq = std::vector<std::shared_ptr<cond::Condition>>;

    WaitSet& operator+=(std::shared_ptr<cond::Condition> cond)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        conditions_.push_back(cond);

        // Register notify callback on the underlying DataReader so it
        // wakes us when data arrives (avoids polling).
        auto* reader = get_reader_from_condition(cond.get());
        if (reader)
        {
            reader->add_notify_callback([this] {
                cv_.notify_all();
            });
        }

        // Wire GuardCondition notification
        wire_guard_condition(cond.get());

        return *this;
    }

    WaitSet& operator-=(const std::shared_ptr<cond::Condition>& cond)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        conditions_.erase(
            std::remove(conditions_.begin(), conditions_.end(), cond),
            conditions_.end());
        return *this;
    }

    void notify() { cv_.notify_all(); }

    ConditionSeq wait(const Duration& timeout = astutedds::rtps::Time_t::TIME_INFINITE())
    {
        using clock = std::chrono::steady_clock;
        const bool infinite_wait = (timeout.seconds == 0x7fffffff);
        const auto deadline = infinite_wait
            ? clock::time_point::max()
            : clock::now() +
                std::chrono::seconds(timeout.seconds) +
                std::chrono::nanoseconds(timeout.nanosec);

        std::unique_lock<std::mutex> lock(mutex_);
        while (true)
        {
            // Check all conditions
            ConditionSeq triggered;
            for (auto& c : conditions_)
            {
                if (c->trigger_value())
                    triggered.push_back(c);
            }
            if (!triggered.empty())
                return triggered;

            // Block until notified or timeout
            if (infinite_wait)
            {
                cv_.wait(lock);
            }
            else
            {
                if (cv_.wait_until(lock, deadline) == std::cv_status::timeout)
                    return {};  // Timeout — no conditions triggered
            }
        }
    }

    ConditionSeq get_conditions() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return conditions_;
    }

private:
    static astutedds::dcps::DataReader* get_reader_from_condition(cond::Condition* c)
    {
        if (auto* rc = dynamic_cast<cond::ReadCondition*>(c))
            return rc->reader();
        if (auto* sc = dynamic_cast<cond::StatusCondition*>(c))
            return sc->reader();
        return nullptr;
    }

    void wire_guard_condition(cond::Condition* c)
    {
        if (auto* gc = dynamic_cast<cond::GuardCondition*>(c))
        {
            gc->set_notify_callback([this] { cv_.notify_all(); });
        }
    }

    mutable std::mutex mutex_;
    std::condition_variable cv_;
    ConditionSeq conditions_;
};

}  // namespace core

// ============================================================================
// dds::domain::DomainParticipant
// ============================================================================
namespace domain
{

class DomainParticipant
{
public:
    explicit DomainParticipant(uint32_t domain_id, uint32_t participant_id = 0)
        : native_(new astutedds::dcps::DomainParticipant(domain_id, participant_id))
    {
        native_->enable();
    }

    DomainParticipant(uint32_t domain_id,
                      const astutedds::dcps::DomainParticipantQos& qos,
                      uint32_t participant_id = 0)
        : native_(new astutedds::dcps::DomainParticipant(domain_id, participant_id))
    {
        native_->set_qos(qos);
        native_->enable();
    }

    DomainParticipant(const DomainParticipant&) = delete;
    DomainParticipant& operator=(const DomainParticipant&) = delete;

    ~DomainParticipant() { delete native_; }

    astutedds::dcps::DomainParticipant* native() const { return native_; }

private:
    astutedds::dcps::DomainParticipant* native_{nullptr};
};

}  // namespace domain

// ============================================================================
// dds::topic::Topic<T> and ContentFilteredTopic<T>
// ============================================================================
namespace topic
{

template <typename T>
class Topic
{
public:
    Topic(dds::domain::DomainParticipant& participant, const std::string& name)
        : participant_(&participant)
    {
        native_ = participant.native()->create_topic(name, resolve_type_name());
    }

    Topic(const Topic&) = delete;
    Topic& operator=(const Topic&) = delete;

    ~Topic()
    {
        if (native_ && participant_ && participant_->native())
            participant_->native()->delete_topic(native_);
    }

    astutedds::dcps::Topic* native() const { return native_; }

private:
    template <typename U>
    static auto resolve_type_name_impl(int) -> decltype(U::type_name(), std::string())
    {
        return U::type_name();
    }

    template <typename U>
    static auto resolve_type_name_impl(long) -> decltype(U::get_type_name(), std::string())
    {
        return U::get_type_name();
    }

    template <typename U>
    static std::string resolve_type_name_impl(...)
    {
        return typeid(U).name();
    }

    static std::string resolve_type_name() { return resolve_type_name_impl<T>(0); }

    dds::domain::DomainParticipant* participant_{nullptr};
    astutedds::dcps::Topic* native_{nullptr};
};

template <typename T>
class ContentFilteredTopic
{
public:
    ContentFilteredTopic(dds::domain::DomainParticipant& /*participant*/,
                         Topic<T>& topic,
                         const std::string& name,
                         const std::string& filter_expression,
                         const std::vector<std::string>& params = {})
        : native_(new astutedds::dcps::ContentFilteredTopic(
              name, topic.native(), filter_expression, params))
    {
    }

    ContentFilteredTopic(const ContentFilteredTopic&) = delete;
    ContentFilteredTopic& operator=(const ContentFilteredTopic&) = delete;

    ~ContentFilteredTopic() { delete native_; }

    astutedds::dcps::ContentFilteredTopic* native() const { return native_; }

private:
    astutedds::dcps::ContentFilteredTopic* native_{nullptr};
};

}  // namespace topic

// ============================================================================
// dds::sub::Subscriber and DataReader<T>
// ============================================================================
namespace sub
{

class Subscriber
{
public:
    explicit Subscriber(dds::domain::DomainParticipant& participant)
        : participant_(&participant),
          native_(participant.native()->create_subscriber())
    {
    }

    Subscriber(dds::domain::DomainParticipant& participant,
               const astutedds::dcps::SubscriberQos& qos)
        : participant_(&participant),
          native_(participant.native()->create_subscriber(qos))
    {
    }

    Subscriber(const Subscriber&) = delete;
    Subscriber& operator=(const Subscriber&) = delete;

    ~Subscriber()
    {
        if (native_ && participant_ && participant_->native())
            participant_->native()->delete_subscriber(native_);
    }

    astutedds::dcps::Subscriber* native() const { return native_; }

    astutedds::dcps::DataReaderQos default_datareader_qos() const
    {
        return astutedds::dcps::DataReaderQos{};
    }

    void set_qos(const astutedds::dcps::SubscriberQos& qos) { native_->set_qos(qos); }

private:
    dds::domain::DomainParticipant* participant_{nullptr};
    astutedds::dcps::Subscriber* native_{nullptr};
};

template <typename T>
class DataReader
{
public:
    DataReader(Subscriber& subscriber,
               dds::topic::Topic<T>& topic,
               const astutedds::dcps::DataReaderQos& qos = {})
        : subscriber_(&subscriber)
    {
        raw_reader_ = subscriber.native()->create_datareader(topic.native(), qos);
    }

    DataReader(Subscriber& subscriber,
               dds::topic::ContentFilteredTopic<T>& cft,
               const astutedds::dcps::DataReaderQos& qos = {})
        : subscriber_(&subscriber)
    {
        raw_reader_ = subscriber.native()->create_datareader(
            static_cast<astutedds::dcps::TopicDescription*>(cft.native()), qos);
    }

    DataReader(const DataReader&) = delete;
    DataReader& operator=(const DataReader&) = delete;

    ~DataReader()
    {
        if (raw_reader_ && subscriber_ && subscriber_->native())
            subscriber_->native()->delete_datareader(raw_reader_);
    }

    astutedds::dcps::ReturnCode_t take_next(T& sample,
                                            astutedds::dcps::SampleInfo& info)
    {
        using Codec2 = typename dds::topic::TypeTraits<T>::xcdr2_codec;
        using Codec1 = typename dds::topic::TypeTraits<T>::xcdr1_codec;
        astutedds::dcps::TypedDataReader<T, Codec2, Codec1> typed(raw_reader_);
        return typed.take_next(sample, info);
    }

    astutedds::dcps::ReturnCode_t read_next(T& sample,
                                            astutedds::dcps::SampleInfo& info)
    {
        using Codec2 = typename dds::topic::TypeTraits<T>::xcdr2_codec;
        using Codec1 = typename dds::topic::TypeTraits<T>::xcdr1_codec;
        astutedds::dcps::TypedDataReader<T, Codec2, Codec1> typed(raw_reader_);
        return typed.read_next(sample, info);
    }

    std::shared_ptr<dds::core::cond::StatusCondition> status_condition()
    {
        return std::make_shared<dds::core::cond::StatusCondition>(raw_reader_);
    }

    std::shared_ptr<dds::core::cond::ReadCondition> read_condition()
    {
        return std::make_shared<dds::core::cond::ReadCondition>(raw_reader_);
    }

    astutedds::dcps::DataReader* raw() const { return raw_reader_; }

private:
    Subscriber* subscriber_{nullptr};
    astutedds::dcps::DataReader* raw_reader_{nullptr};
};

}  // namespace sub

// ============================================================================
// dds::pub::Publisher and DataWriter<T>
// ============================================================================
namespace pub
{

class Publisher
{
public:
    explicit Publisher(dds::domain::DomainParticipant& participant)
        : participant_(&participant),
          native_(participant.native()->create_publisher())
    {
    }

    Publisher(dds::domain::DomainParticipant& participant,
              const astutedds::dcps::PublisherQos& qos)
        : participant_(&participant),
          native_(participant.native()->create_publisher(qos))
    {
    }

    Publisher(const Publisher&) = delete;
    Publisher& operator=(const Publisher&) = delete;

    ~Publisher()
    {
        if (native_ && participant_ && participant_->native())
            participant_->native()->delete_publisher(native_);
    }

    astutedds::dcps::Publisher* native() const { return native_; }

    astutedds::dcps::DataWriterQos default_datawriter_qos() const
    {
        return astutedds::dcps::DataWriterQos{};
    }

    void set_qos(const astutedds::dcps::PublisherQos& qos) { native_->set_qos(qos); }

private:
    dds::domain::DomainParticipant* participant_{nullptr};
    astutedds::dcps::Publisher* native_{nullptr};
};

template <typename T>
class DataWriter
{
public:
    DataWriter(Publisher& publisher,
               dds::topic::Topic<T>& topic,
               const astutedds::dcps::DataWriterQos& qos = {})
        : publisher_(&publisher)
    {
        raw_writer_ = publisher.native()->create_datawriter(topic.native(), qos);
    }

    DataWriter(const DataWriter&) = delete;
    DataWriter& operator=(const DataWriter&) = delete;

    ~DataWriter()
    {
        if (raw_writer_ && publisher_ && publisher_->native())
            publisher_->native()->delete_datawriter(raw_writer_);
    }

    bool write(const T& sample)
    {
        using Codec2 = typename dds::topic::TypeTraits<T>::xcdr2_codec;
        using Codec1 = typename dds::topic::TypeTraits<T>::xcdr1_codec;
        astutedds::dcps::TypedDataWriter<T, Codec2, Codec1> typed(raw_writer_);
        return typed.write(sample);
    }

    astutedds::dcps::DataWriter* raw() const { return raw_writer_; }

private:
    Publisher* publisher_{nullptr};
    astutedds::dcps::DataWriter* raw_writer_{nullptr};
};

}  // namespace pub

}  // namespace dds

#endif  // ASTUTEDDS_DCPS_COREPOLICY_HPP