File CorePolicy.hpp
File List > astutedds > dcps > CorePolicy.hpp
Go to the documentation of this file
//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file CorePolicy.hpp
// @brief DDS C++ PSM (OMG DDS-PSM-Cxx 1.0 / ISO/IEC 19568:2017) namespace wrappers.
//
// Provides the standard dds:: namespace hierarchy over the native astutedds::dcps::
// API. Compatible with code written for RTI Connext modern C++ API and Eclipse
// Cyclone DDS C++ bindings.
//
// Namespace mapping:
// dds::core::policy::* QoS policy factory helpers (14 policies)
// dds::core::cond::* Condition types for WaitSet
// dds::core::WaitSet Synchronous event-wait facility
// dds::domain::DomainParticipant
// dds::topic::Topic<T>, ContentFilteredTopic<T>, TypeTraits<T>
// dds::sub::Subscriber, DataReader<T>
// dds::pub::Publisher, DataWriter<T>
//
#ifndef ASTUTEDDS_DCPS_COREPOLICY_HPP
#define ASTUTEDDS_DCPS_COREPOLICY_HPP
#include <astutedds/cdr/xcdr2_codec.hpp>
#include <astutedds/dcps/content_filtered_topic.hpp>
#include <astutedds/dcps/qos.hpp>
#include <astutedds/dcps/typed.hpp>
#include <algorithm>
#include <chrono>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <typeinfo>
#include <vector>
// ============================================================================
// dds — OMG DDS-PSM-Cxx 1.0 top-level namespace
// ============================================================================
namespace dds
{
// Forward declarations
namespace topic
{
template <typename T>
class Topic;
template <typename T>
class ContentFilteredTopic;
} // namespace topic
namespace sub
{
class Subscriber;
template <typename T>
class DataReader;
} // namespace sub
namespace pub
{
class Publisher;
template <typename T>
class DataWriter;
} // namespace pub
namespace core
{
namespace cond
{
class StatusCondition;
class ReadCondition;
class GuardCondition;
} // namespace cond
} // namespace core
// ============================================================================
// dds::topic::TypeTraits
// ============================================================================
namespace topic
{
template <typename T>
struct TypeTraits
{
using xcdr2_codec = T;
using xcdr1_codec = T;
};
} // namespace topic
// ============================================================================
// dds::core — Duration, QoS policies, conditions, WaitSet
// ============================================================================
namespace core
{
using Duration = astutedds::rtps::Duration_t;
inline Duration Duration_infinite() { return astutedds::rtps::Time_t::TIME_INFINITE(); }
inline Duration Duration_zero() { return astutedds::rtps::Time_t::TIME_ZERO(); }
// ============================================================================
// dds::core::policy
// ============================================================================
namespace policy
{
enum class DataRepresentationId : int16_t
{
XCDR1 = 0x0000,
XCDR2 = 0x0002
};
inline astutedds::dcps::DataRepresentationModifier
DataRepresentation(std::initializer_list<DataRepresentationId> ids)
{
std::vector<int16_t> values;
values.reserve(ids.size());
std::transform(ids.begin(), ids.end(), std::back_inserter(values),
[](auto id) { return static_cast<int16_t>(id); });
return astutedds::dcps::DataRepresentation(std::move(values));
}
struct Reliability
{
static astutedds::dcps::ReliabilityQosModifier Reliable(
Duration max_blocking_time = Duration(0, 100000000))
{
return astutedds::dcps::Reliability(
astutedds::dcps::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS,
max_blocking_time);
}
static astutedds::dcps::ReliabilityQosModifier BestEffort()
{
return astutedds::dcps::Reliability(
astutedds::dcps::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS,
Duration(0, 0));
}
};
struct History
{
static astutedds::dcps::HistoryKeepLastModifier KeepLast(int32_t depth)
{
return astutedds::dcps::HistoryKeepLast(depth);
}
static astutedds::dcps::HistoryKeepAllModifier KeepAll()
{
return astutedds::dcps::HistoryKeepAll();
}
};
struct Durability
{
static astutedds::dcps::DurabilityQosModifier Volatile()
{
return astutedds::dcps::Durability(
astutedds::dcps::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS);
}
static astutedds::dcps::DurabilityQosModifier TransientLocal()
{
return astutedds::dcps::Durability(
astutedds::dcps::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS);
}
static astutedds::dcps::DurabilityQosModifier Transient()
{
return astutedds::dcps::Durability(
astutedds::dcps::DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS);
}
static astutedds::dcps::DurabilityQosModifier Persistent()
{
return astutedds::dcps::Durability(
astutedds::dcps::DurabilityQosPolicyKind::PERSISTENT_DURABILITY_QOS);
}
};
struct Deadline
{
static astutedds::dcps::DeadlineModifier period(Duration d)
{
return astutedds::dcps::Deadline(d);
}
};
struct Liveliness
{
static astutedds::dcps::LivelinessModifier Automatic(
Duration lease = astutedds::rtps::Time_t::TIME_INFINITE())
{
return astutedds::dcps::Liveliness(
astutedds::dcps::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS, lease);
}
static astutedds::dcps::LivelinessModifier ManualByParticipant(
Duration lease = astutedds::rtps::Time_t::TIME_INFINITE())
{
return astutedds::dcps::Liveliness(
astutedds::dcps::LivelinessQosPolicyKind::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, lease);
}
static astutedds::dcps::LivelinessModifier ManualByTopic(
Duration lease = astutedds::rtps::Time_t::TIME_INFINITE())
{
return astutedds::dcps::Liveliness(
astutedds::dcps::LivelinessQosPolicyKind::MANUAL_BY_TOPIC_LIVELINESS_QOS, lease);
}
};
struct Ownership
{
static astutedds::dcps::OwnershipModifier Shared()
{
return astutedds::dcps::Ownership(
astutedds::dcps::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS);
}
static astutedds::dcps::OwnershipModifier Exclusive()
{
return astutedds::dcps::Ownership(
astutedds::dcps::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS);
}
};
inline astutedds::dcps::OwnershipStrengthModifier OwnershipStrength(int32_t value)
{
return astutedds::dcps::OwnershipStrength(value);
}
inline astutedds::dcps::PartitionModifier Partition(std::vector<std::string> names)
{
return astutedds::dcps::Partition(std::move(names));
}
inline astutedds::dcps::PartitionModifier Partition(std::string name)
{
return astutedds::dcps::Partition(std::vector<std::string>{std::move(name)});
}
inline astutedds::dcps::ResourceLimitsModifier ResourceLimits(
int32_t max_samples, int32_t max_instances = -1, int32_t max_samples_per_instance = -1)
{
return astutedds::dcps::ResourceLimits(max_samples, max_instances,
max_samples_per_instance);
}
inline astutedds::dcps::LifespanModifier Lifespan(Duration duration)
{
return astutedds::dcps::Lifespan(duration);
}
// ---------------------------------------------------------------------------
// LatencyBudget (DDS §2.2.3.9) — DataWriter and DataReader
// ---------------------------------------------------------------------------
inline astutedds::dcps::LatencyBudgetModifier LatencyBudget(Duration duration)
{
return astutedds::dcps::LatencyBudget(duration);
}
// ---------------------------------------------------------------------------
// TimeBasedFilter (DDS §2.2.3.16) — DataReader only
// ---------------------------------------------------------------------------
inline astutedds::dcps::TimeBasedFilterModifier TimeBasedFilter(Duration minimum_separation)
{
return astutedds::dcps::TimeBasedFilter(minimum_separation);
}
// ---------------------------------------------------------------------------
// DestinationOrder (DDS §2.2.3.7)
// ---------------------------------------------------------------------------
struct DestinationOrder
{
static astutedds::dcps::DestinationOrderModifier ByReceptionTimestamp()
{
return astutedds::dcps::DestinationOrder(
astutedds::dcps::DestinationOrderQosPolicyKind::
BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS);
}
static astutedds::dcps::DestinationOrderModifier BySourceTimestamp()
{
return astutedds::dcps::DestinationOrder(
astutedds::dcps::DestinationOrderQosPolicyKind::
BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS);
}
};
// ---------------------------------------------------------------------------
// WriterDataLifecycle (DDS §2.2.3.18) — DataWriter only
// ---------------------------------------------------------------------------
inline astutedds::dcps::WriterDataLifecycleModifier WriterDataLifecycle(
bool autodispose_unregistered_instances = true)
{
return astutedds::dcps::WriterDataLifecycle(autodispose_unregistered_instances);
}
// ---------------------------------------------------------------------------
// ReaderDataLifecycle (DDS §2.2.3.19) — DataReader only
// ---------------------------------------------------------------------------
inline astutedds::dcps::ReaderDataLifecycleModifier ReaderDataLifecycle(
Duration nowriter_delay = astutedds::rtps::Time_t::TIME_INFINITE(),
Duration disposed_delay = astutedds::rtps::Time_t::TIME_INFINITE())
{
return astutedds::dcps::ReaderDataLifecycle(nowriter_delay, disposed_delay);
}
// ---------------------------------------------------------------------------
// Presentation (DDS §2.2.3.6) — Publisher and Subscriber
// ---------------------------------------------------------------------------
struct Presentation
{
enum class AccessScopeKind
{
INSTANCE = 0, // INSTANCE_PRESENTATION_QOS
TOPIC = 1, // TOPIC_PRESENTATION_QOS
GROUP = 2 // GROUP_PRESENTATION_QOS
};
static astutedds::dcps::PresentationModifier InstanceAccessScope(
bool coherent = false, bool ordered = false)
{
return astutedds::dcps::Presentation(
astutedds::dcps::PresentationQosPolicyAccessScopeKind::INSTANCE_PRESENTATION_QOS,
coherent, ordered);
}
static astutedds::dcps::PresentationModifier TopicAccessScope(
bool coherent = false, bool ordered = false)
{
return astutedds::dcps::Presentation(
astutedds::dcps::PresentationQosPolicyAccessScopeKind::TOPIC_PRESENTATION_QOS,
coherent, ordered);
}
static astutedds::dcps::PresentationModifier GroupAccessScope(
bool coherent = false, bool ordered = false)
{
return astutedds::dcps::Presentation(
astutedds::dcps::PresentationQosPolicyAccessScopeKind::GROUP_PRESENTATION_QOS,
coherent, ordered);
}
};
// ---------------------------------------------------------------------------
// UserData (DDS §2.2.3.2) — DataWriter / DataReader
// ---------------------------------------------------------------------------
inline astutedds::dcps::UserDataModifier UserData(std::vector<uint8_t> value)
{
return astutedds::dcps::UserData(std::move(value));
}
// ---------------------------------------------------------------------------
// GroupData (DDS §2.2.3.x) — Publisher / Subscriber
// ---------------------------------------------------------------------------
inline astutedds::dcps::GroupDataModifier GroupData(std::vector<uint8_t> value)
{
return astutedds::dcps::GroupData(std::move(value));
}
} // namespace policy
// ============================================================================
// dds::core::cond
// ============================================================================
namespace cond
{
class Condition
{
public:
virtual ~Condition() = default;
virtual bool trigger_value() const = 0;
};
class ReadCondition : public Condition
{
public:
explicit ReadCondition(astutedds::dcps::DataReader* reader) : reader_(reader) {}
bool trigger_value() const override
{
return reader_ != nullptr && reader_->unread_count() > 0;
}
astutedds::dcps::DataReader* reader() const { return reader_; }
private:
astutedds::dcps::DataReader* reader_{nullptr};
};
class StatusCondition : public Condition
{
public:
explicit StatusCondition(astutedds::dcps::DataReader* reader) : reader_(reader) {}
bool trigger_value() const override
{
return reader_ != nullptr && reader_->unread_count() > 0;
}
astutedds::dcps::DataReader* reader() const { return reader_; }
private:
astutedds::dcps::DataReader* reader_{nullptr};
};
class GuardCondition : public Condition
{
public:
GuardCondition() = default;
bool trigger_value() const override { return trigger_; }
void set_trigger_value(bool v)
{
trigger_ = v;
if (v && notify_cb_) notify_cb_();
}
using NotifyCallback = std::function<void()>;
void set_notify_callback(NotifyCallback cb) { notify_cb_ = std::move(cb); }
private:
bool trigger_{false};
NotifyCallback notify_cb_;
};
} // namespace cond
// ============================================================================
// dds::core::WaitSet
// ============================================================================
class WaitSet
{
public:
using ConditionSeq = std::vector<std::shared_ptr<cond::Condition>>;
WaitSet& operator+=(std::shared_ptr<cond::Condition> cond)
{
std::lock_guard<std::mutex> lock(mutex_);
conditions_.push_back(cond);
// Register notify callback on the underlying DataReader so it
// wakes us when data arrives (avoids polling).
auto* reader = get_reader_from_condition(cond.get());
if (reader)
{
reader->add_notify_callback([this] {
cv_.notify_all();
});
}
// Wire GuardCondition notification
wire_guard_condition(cond.get());
return *this;
}
WaitSet& operator-=(const std::shared_ptr<cond::Condition>& cond)
{
std::lock_guard<std::mutex> lock(mutex_);
conditions_.erase(
std::remove(conditions_.begin(), conditions_.end(), cond),
conditions_.end());
return *this;
}
void notify() { cv_.notify_all(); }
ConditionSeq wait(const Duration& timeout = astutedds::rtps::Time_t::TIME_INFINITE())
{
using clock = std::chrono::steady_clock;
const bool infinite_wait = (timeout.seconds == 0x7fffffff);
const auto deadline = infinite_wait
? clock::time_point::max()
: clock::now() +
std::chrono::seconds(timeout.seconds) +
std::chrono::nanoseconds(timeout.nanosec);
std::unique_lock<std::mutex> lock(mutex_);
while (true)
{
// Check all conditions
ConditionSeq triggered;
for (auto& c : conditions_)
{
if (c->trigger_value())
triggered.push_back(c);
}
if (!triggered.empty())
return triggered;
// Block until notified or timeout
if (infinite_wait)
{
cv_.wait(lock);
}
else
{
if (cv_.wait_until(lock, deadline) == std::cv_status::timeout)
return {}; // Timeout — no conditions triggered
}
}
}
ConditionSeq get_conditions() const
{
std::lock_guard<std::mutex> lock(mutex_);
return conditions_;
}
private:
static astutedds::dcps::DataReader* get_reader_from_condition(cond::Condition* c)
{
if (auto* rc = dynamic_cast<cond::ReadCondition*>(c))
return rc->reader();
if (auto* sc = dynamic_cast<cond::StatusCondition*>(c))
return sc->reader();
return nullptr;
}
void wire_guard_condition(cond::Condition* c)
{
if (auto* gc = dynamic_cast<cond::GuardCondition*>(c))
{
gc->set_notify_callback([this] { cv_.notify_all(); });
}
}
mutable std::mutex mutex_;
std::condition_variable cv_;
ConditionSeq conditions_;
};
} // namespace core
// ============================================================================
// dds::domain::DomainParticipant
// ============================================================================
namespace domain
{
class DomainParticipant
{
public:
explicit DomainParticipant(uint32_t domain_id, uint32_t participant_id = 0)
: native_(new astutedds::dcps::DomainParticipant(domain_id, participant_id))
{
native_->enable();
}
DomainParticipant(uint32_t domain_id,
const astutedds::dcps::DomainParticipantQos& qos,
uint32_t participant_id = 0)
: native_(new astutedds::dcps::DomainParticipant(domain_id, participant_id))
{
native_->set_qos(qos);
native_->enable();
}
DomainParticipant(const DomainParticipant&) = delete;
DomainParticipant& operator=(const DomainParticipant&) = delete;
~DomainParticipant() { delete native_; }
astutedds::dcps::DomainParticipant* native() const { return native_; }
private:
astutedds::dcps::DomainParticipant* native_{nullptr};
};
} // namespace domain
// ============================================================================
// dds::topic::Topic<T> and ContentFilteredTopic<T>
// ============================================================================
namespace topic
{
template <typename T>
class Topic
{
public:
Topic(dds::domain::DomainParticipant& participant, const std::string& name)
: participant_(&participant)
{
native_ = participant.native()->create_topic(name, resolve_type_name());
}
Topic(const Topic&) = delete;
Topic& operator=(const Topic&) = delete;
~Topic()
{
if (native_ && participant_ && participant_->native())
participant_->native()->delete_topic(native_);
}
astutedds::dcps::Topic* native() const { return native_; }
private:
template <typename U>
static auto resolve_type_name_impl(int) -> decltype(U::type_name(), std::string())
{
return U::type_name();
}
template <typename U>
static auto resolve_type_name_impl(long) -> decltype(U::get_type_name(), std::string())
{
return U::get_type_name();
}
template <typename U>
static std::string resolve_type_name_impl(...)
{
return typeid(U).name();
}
static std::string resolve_type_name() { return resolve_type_name_impl<T>(0); }
dds::domain::DomainParticipant* participant_{nullptr};
astutedds::dcps::Topic* native_{nullptr};
};
template <typename T>
class ContentFilteredTopic
{
public:
ContentFilteredTopic(dds::domain::DomainParticipant& /*participant*/,
Topic<T>& topic,
const std::string& name,
const std::string& filter_expression,
const std::vector<std::string>& params = {})
: native_(new astutedds::dcps::ContentFilteredTopic(
name, topic.native(), filter_expression, params))
{
}
ContentFilteredTopic(const ContentFilteredTopic&) = delete;
ContentFilteredTopic& operator=(const ContentFilteredTopic&) = delete;
~ContentFilteredTopic() { delete native_; }
astutedds::dcps::ContentFilteredTopic* native() const { return native_; }
private:
astutedds::dcps::ContentFilteredTopic* native_{nullptr};
};
} // namespace topic
// ============================================================================
// dds::sub::Subscriber and DataReader<T>
// ============================================================================
namespace sub
{
class Subscriber
{
public:
explicit Subscriber(dds::domain::DomainParticipant& participant)
: participant_(&participant),
native_(participant.native()->create_subscriber())
{
}
Subscriber(dds::domain::DomainParticipant& participant,
const astutedds::dcps::SubscriberQos& qos)
: participant_(&participant),
native_(participant.native()->create_subscriber(qos))
{
}
Subscriber(const Subscriber&) = delete;
Subscriber& operator=(const Subscriber&) = delete;
~Subscriber()
{
if (native_ && participant_ && participant_->native())
participant_->native()->delete_subscriber(native_);
}
astutedds::dcps::Subscriber* native() const { return native_; }
astutedds::dcps::DataReaderQos default_datareader_qos() const
{
return astutedds::dcps::DataReaderQos{};
}
void set_qos(const astutedds::dcps::SubscriberQos& qos) { native_->set_qos(qos); }
private:
dds::domain::DomainParticipant* participant_{nullptr};
astutedds::dcps::Subscriber* native_{nullptr};
};
template <typename T>
class DataReader
{
public:
DataReader(Subscriber& subscriber,
dds::topic::Topic<T>& topic,
const astutedds::dcps::DataReaderQos& qos = {})
: subscriber_(&subscriber)
{
raw_reader_ = subscriber.native()->create_datareader(topic.native(), qos);
}
DataReader(Subscriber& subscriber,
dds::topic::ContentFilteredTopic<T>& cft,
const astutedds::dcps::DataReaderQos& qos = {})
: subscriber_(&subscriber)
{
raw_reader_ = subscriber.native()->create_datareader(
static_cast<astutedds::dcps::TopicDescription*>(cft.native()), qos);
}
DataReader(const DataReader&) = delete;
DataReader& operator=(const DataReader&) = delete;
~DataReader()
{
if (raw_reader_ && subscriber_ && subscriber_->native())
subscriber_->native()->delete_datareader(raw_reader_);
}
astutedds::dcps::ReturnCode_t take_next(T& sample,
astutedds::dcps::SampleInfo& info)
{
using Codec2 = typename dds::topic::TypeTraits<T>::xcdr2_codec;
using Codec1 = typename dds::topic::TypeTraits<T>::xcdr1_codec;
astutedds::dcps::TypedDataReader<T, Codec2, Codec1> typed(raw_reader_);
return typed.take_next(sample, info);
}
astutedds::dcps::ReturnCode_t read_next(T& sample,
astutedds::dcps::SampleInfo& info)
{
using Codec2 = typename dds::topic::TypeTraits<T>::xcdr2_codec;
using Codec1 = typename dds::topic::TypeTraits<T>::xcdr1_codec;
astutedds::dcps::TypedDataReader<T, Codec2, Codec1> typed(raw_reader_);
return typed.read_next(sample, info);
}
std::shared_ptr<dds::core::cond::StatusCondition> status_condition()
{
return std::make_shared<dds::core::cond::StatusCondition>(raw_reader_);
}
std::shared_ptr<dds::core::cond::ReadCondition> read_condition()
{
return std::make_shared<dds::core::cond::ReadCondition>(raw_reader_);
}
astutedds::dcps::DataReader* raw() const { return raw_reader_; }
private:
Subscriber* subscriber_{nullptr};
astutedds::dcps::DataReader* raw_reader_{nullptr};
};
} // namespace sub
// ============================================================================
// dds::pub::Publisher and DataWriter<T>
// ============================================================================
namespace pub
{
class Publisher
{
public:
explicit Publisher(dds::domain::DomainParticipant& participant)
: participant_(&participant),
native_(participant.native()->create_publisher())
{
}
Publisher(dds::domain::DomainParticipant& participant,
const astutedds::dcps::PublisherQos& qos)
: participant_(&participant),
native_(participant.native()->create_publisher(qos))
{
}
Publisher(const Publisher&) = delete;
Publisher& operator=(const Publisher&) = delete;
~Publisher()
{
if (native_ && participant_ && participant_->native())
participant_->native()->delete_publisher(native_);
}
astutedds::dcps::Publisher* native() const { return native_; }
astutedds::dcps::DataWriterQos default_datawriter_qos() const
{
return astutedds::dcps::DataWriterQos{};
}
void set_qos(const astutedds::dcps::PublisherQos& qos) { native_->set_qos(qos); }
private:
dds::domain::DomainParticipant* participant_{nullptr};
astutedds::dcps::Publisher* native_{nullptr};
};
template <typename T>
class DataWriter
{
public:
DataWriter(Publisher& publisher,
dds::topic::Topic<T>& topic,
const astutedds::dcps::DataWriterQos& qos = {})
: publisher_(&publisher)
{
raw_writer_ = publisher.native()->create_datawriter(topic.native(), qos);
}
DataWriter(const DataWriter&) = delete;
DataWriter& operator=(const DataWriter&) = delete;
~DataWriter()
{
if (raw_writer_ && publisher_ && publisher_->native())
publisher_->native()->delete_datawriter(raw_writer_);
}
bool write(const T& sample)
{
using Codec2 = typename dds::topic::TypeTraits<T>::xcdr2_codec;
using Codec1 = typename dds::topic::TypeTraits<T>::xcdr1_codec;
astutedds::dcps::TypedDataWriter<T, Codec2, Codec1> typed(raw_writer_);
return typed.write(sample);
}
astutedds::dcps::DataWriter* raw() const { return raw_writer_; }
private:
Publisher* publisher_{nullptr};
astutedds::dcps::DataWriter* raw_writer_{nullptr};
};
} // namespace pub
} // namespace dds
#endif // ASTUTEDDS_DCPS_COREPOLICY_HPP