// File dcps.hpp
// File List > astutedds > dcps > dcps.hpp
// Go to the documentation of this file
#ifndef ASTUTEDDS_DCPS_DCPS_HPP
#define ASTUTEDDS_DCPS_DCPS_HPP
#include <astutedds/dcps/qos.hpp>
#include <astutedds/dcps/dds_psm_types.hpp>
#include <astutedds/dcps/topic_description.hpp>
#include <astutedds/rtps/rtps_types.hpp>
#include <astutedds/rtps/udp_transport.hpp>
#include <memory>
#include <mutex>
#include <vector>
#include <deque>
#include <string>
#include <functional>
#include <set>
#include <map>
#include <atomic>
#include <thread>
#include <chrono>
#include <condition_variable>
namespace astutedds::dcps
{
// Forward declarations
class DomainParticipant;
class DomainParticipantFactory;
class Topic;
class Publisher;
class Subscriber;
class DataWriter;
class DataReader;
class ContentFilteredTopic;
// ReturnCode_t is now defined as an unscoped enum in dds_psm_types.hpp
// (included above). No duplicate definition here.
/// Metadata that accompanies one sample handed out by a DataReader.
///
/// Passed by reference to DataReader::read_next_sample() and
/// DataReader::take_next_sample(). Every member carries a default, so a
/// default-constructed SampleInfo reports no valid data, nil handles,
/// zeroed counters/ranks and the NOT_READ / NEW / ALIVE states.
struct SampleInfo
{
    /// False by default; see the reader implementation for when it is set.
    bool valid_data = false;

    /// Timestamp attached at the source (value-initialized by default).
    rtps::Time_t source_timestamp{};

    /// Handles default to HANDLE_NIL.
    InstanceHandle_t instance_handle{HANDLE_NIL};
    InstanceHandle_t publication_handle{HANDLE_NIL};

    // Generation counters and ranks; all start at zero.
    int32_t disposed_generation_count = 0;
    int32_t no_writers_generation_count = 0;
    int32_t sample_rank = 0;
    int32_t generation_rank = 0;
    int32_t absolute_generation_rank = 0;

    // DDS PSM state fields
    SampleStateKind sample_state{NOT_READ_SAMPLE_STATE};
    ViewStateKind view_state{NEW_VIEW_STATE};
    InstanceStateKind instance_state{ALIVE_INSTANCE_STATE};
};
/// One entry of a DataWriter's history cache (see DataWriter::history_cache()).
///
/// Plain aggregate: a sequence number, the serialized payload bytes, a source
/// timestamp and the kind of change (ALIVE unless set otherwise).
struct CacheChange
{
    rtps::SequenceNumber_t sequenceNumber{};
    std::vector<uint8_t> serializedData{}; // empty until a payload is assigned
    rtps::Time_t sourceTimestamp{};
    rtps::ChangeKind_t kind{rtps::ChangeKind_t::ALIVE};
};
/// A sample as held and returned by a DataReader (read()/take() return
/// vectors of these, and add_sample() accepts one).
struct ReceivedSample
{
    rtps::SequenceNumber_t sequenceNumber{};
    std::vector<uint8_t> serializedData{}; // serialized payload bytes
    rtps::Time_t sourceTimestamp{};
    rtps::GUID_t writerGuid{};             // GUID of the writer the sample came from

    bool valid = true;  // defaults to true
    bool read = false;  // defaults to false (not yet read)

    int32_t ownershipStrength = 0;                // For EXCLUSIVE ownership filtering
    InstanceHandle_t instance_handle{HANDLE_NIL}; // Computed from @key fields
};
/// Named, typed topic that belongs to a DomainParticipant.
///
/// Implements the TopicDescription interface and additionally exposes
/// std::string-based accessors plus the topic QoS.
class Topic : public TopicDescription
{
public:
    /// @param name         Topic name.
    /// @param type_name    Name of the data type carried on the topic.
    /// @param participant  Participant this topic is associated with (stored
    ///                     as a raw, non-owning pointer).
    Topic(const std::string &name, const std::string &type_name, DomainParticipant *participant);
    ~Topic() override = default;

    // TopicDescription interface.
    // The returned pointers refer to storage owned by this Topic.
    const char *get_name() const override
    {
        return name_.c_str();
    }
    const char *get_type_name() const override
    {
        return type_name_.c_str();
    }

    // Legacy accessors (used by shapes_demo)
    const std::string &name() const
    {
        return name_;
    }
    const std::string &type_name() const
    {
        return type_name_;
    }
    DomainParticipant *participant() const
    {
        return participant_;
    }

    /// Replaces the stored QoS with a copy of @p qos.
    void set_qos(const TopicQos &qos)
    {
        qos_ = qos;
    }
    /// @return Reference to the currently stored QoS.
    const TopicQos &get_qos() const
    {
        return qos_;
    }

private:
    std::string name_;
    std::string type_name_;
    DomainParticipant *participant_;
    TopicQos qos_;
};
/// Writer endpoint bound to a Topic and created through a Publisher.
///
/// Holds a history cache of CacheChange entries, the writer QoS, transport
/// callbacks installed by the DomainParticipant, matched-reader bookkeeping
/// and deadline/lifespan state. Non-copyable.
class DataWriter
{
public:
    DataWriter(Topic *topic, Publisher *publisher);

    // NOTE(review): the destructor is defaulted while the class owns a
    // std::thread (deadline_thread_). Destroying a joinable std::thread calls
    // std::terminate, so stop_deadline_timer() presumably has to run before
    // the writer is destroyed — confirm against the implementation.
    virtual ~DataWriter() = default;

    DataWriter(const DataWriter &) = delete;
    DataWriter &operator=(const DataWriter &) = delete;

    /// Writes one serialized sample.
    bool write(const std::vector<uint8_t> &data);

    /// Writes one sample supplied in both XCDR1 and XCDR2 encodings.
    bool write(const std::vector<uint8_t> &xcdr1Data,
               const std::vector<uint8_t> &xcdr2Data);

    uint64_t register_instance();
    bool unregister_instance(uint64_t handle);
    bool dispose(uint64_t handle);

    /// Blocks until every sample written so far has been acknowledged or
    /// @p max_wait has elapsed.
    ///
    /// @param max_wait Maximum time to block (seconds + nanoseconds).
    /// @return RETCODE_OK immediately when the writer is not RELIABLE, or once
    ///         last_acked_sn_ has reached the last written sequence number;
    ///         RETCODE_TIMEOUT if the wait expired first.
    ReturnCode_t wait_for_acknowledgments(const Duration_t &max_wait)
    {
        // Only RELIABLE writers wait; any other reliability kind returns at once.
        if (qos_.reliability.kind != ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS)
            return ReturnCode_t::RETCODE_OK;

        const auto timeout = std::chrono::seconds(max_wait.sec)
                             + std::chrono::nanoseconds(max_wait.nanosec);

        // Take the lock *before* sampling the sequence-number state so the
        // reads of next_sequence_number_ and last_acked_sn_ cannot race with
        // concurrent writers of those fields (assumed to hold mutex_, as the
        // condition variable below is paired with it — confirm in the .cpp).
        std::unique_lock<std::mutex> lock(mutex_);

        // Target: all samples written so far have been acknowledged.
        // last_acked_sn_ is advanced by process_acknack() as readers confirm receipt.
        const uint64_t target =
            next_sequence_number_.low > 1 ? next_sequence_number_.low - 1 : 0;

        // The predicate overload evaluates the condition before blocking, so
        // the "already acknowledged" case returns without waiting.
        const bool acked = ack_cv_.wait_for(lock, timeout,
                                            [this, target] { return last_acked_sn_ >= target; });
        return acked ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT;
    }

    // Accessors
    Topic *topic() const { return topic_; }
    Topic *get_topic() const { return topic_; } // PSM alias
    Publisher *publisher() const { return publisher_; }
    const rtps::GUID_t &guid() const { return guid_; }
    void set_guid(const rtps::GUID_t &guid) { guid_ = guid; }
    void set_qos(const DataWriterQos &qos);
    const DataWriterQos &get_qos() const { return qos_; }

    /// Direct reference to the internal cache; no lock is taken here.
    const std::deque<CacheChange> &history_cache() const { return history_cache_; }

    // Reliability support
    void add_matched_reader(const rtps::GUID_t &reader_guid);
    void remove_matched_reader(const rtps::GUID_t &reader_guid);
    void process_acknack(const rtps::GUID_t &reader_guid,
                         const std::set<uint32_t> &requested_sns,
                         rtps::Count_t count);

    // Transport callback - set by DomainParticipant
    using SendCallback = std::function<void(const std::vector<uint8_t> &)>;
    /// Stores @p callback (moved from the by-value argument).
    void set_send_callback(SendCallback callback) { send_callback_ = std::move(callback); }

    // Dual-encoding transport callback (XCDR1 + XCDR2)
    using SendDualCallback = std::function<void(const std::vector<uint8_t> &,
                                                const std::vector<uint8_t> &)>;
    /// Stores @p callback (moved from the by-value argument).
    void set_send_dual_callback(SendDualCallback callback) { send_dual_callback_ = std::move(callback); }

    // Heartbeat callback — invoked by send_heartbeat(); set by DomainParticipant
    // to trigger an out-of-band HEARTBEAT submessage via the transport.
    using HeartbeatCallback = std::function<void()>;
    void set_heartbeat_callback(HeartbeatCallback callback) { heartbeat_callback_ = std::move(callback); }

    // DataWriter listener
    /// Stores the (non-owning) listener pointer together with its status mask.
    void set_listener(DataWriterListener *listener, StatusMask mask = STATUS_MASK_ALL)
    {
        listener_ = listener;
        listener_mask_ = mask;
    }
    DataWriterListener *get_listener() const { return listener_; }
    StatusMask get_listener_mask() const { return listener_mask_; }

    // Deadline monitoring
    void start_deadline_timer();
    void stop_deadline_timer();
    void notify_write(); // Reset deadline timer on each write

    // Lifespan: remove expired samples from cache
    void remove_expired_samples();

private:
    void send_heartbeat();
    void retransmit_samples(const std::set<uint32_t> &sequence_numbers);
    bool check_resource_limits();
    void apply_history_and_limits();
    void deadline_thread_func();

    Topic *topic_;
    Publisher *publisher_;
    rtps::GUID_t guid_;
    rtps::SequenceNumber_t next_sequence_number_{0, 1};
    std::deque<CacheChange> history_cache_;
    DataWriterQos qos_;
    mutable std::mutex mutex_;
    SendCallback send_callback_;
    SendDualCallback send_dual_callback_;
    HeartbeatCallback heartbeat_callback_;

    // Matched reader tracking (for wait_for_acknowledgments and status reporting)
    std::set<rtps::GUID_t> matched_readers_;

    // Acknowledgment tracking: last_acked_sn_ is set to the highest SN
    // confirmed by all matched readers; ack_cv_ is signaled on each update.
    uint64_t last_acked_sn_{0};
    std::condition_variable ack_cv_;

    // DataWriter listener
    DataWriterListener *listener_{nullptr};
    StatusMask listener_mask_{STATUS_MASK_ALL};

    // Deadline monitoring
    std::thread deadline_thread_;
    std::condition_variable deadline_cv_;
    std::atomic<bool> deadline_running_{false};
    std::chrono::steady_clock::time_point last_write_time_;

    // Lifespan tracking — each cache entry gets a creation timestamp
    std::deque<std::chrono::steady_clock::time_point> cache_timestamps_;
};
/// Reader endpoint bound to a Topic and created through a Subscriber.
///
/// Holds a history cache of ReceivedSample entries, the reader QoS,
/// data/notify/ACKNACK callbacks, matched-writer bookkeeping, ownership and
/// instance-key tracking, and deadline state. Non-copyable.
class DataReader
{
public:
    DataReader(Topic *topic, Subscriber *subscriber);

    // NOTE(review): the destructor is defaulted while the class owns a
    // std::thread (deadline_thread_). Destroying a joinable std::thread calls
    // std::terminate, so stop_deadline_timer() presumably has to run before
    // the reader is destroyed — confirm against the implementation.
    virtual ~DataReader() = default;

    DataReader(const DataReader &) = delete;
    DataReader &operator=(const DataReader &) = delete;

    /// @param max_samples Upper bound on the number of samples returned
    ///                    (defaults to 0xFFFFFFFF).
    std::vector<ReceivedSample> read(size_t max_samples = 0xFFFFFFFF);
    std::vector<ReceivedSample> take(size_t max_samples = 0xFFFFFFFF);

    ReturnCode_t read_next_sample(std::vector<uint8_t> &data, SampleInfo &info);
    ReturnCode_t take_next_sample(std::vector<uint8_t> &data, SampleInfo &info);
    void add_sample(const ReceivedSample &sample);
    ReturnCode_t get_key_value(std::vector<uint8_t> &key_holder, InstanceHandle_t handle);

    /// No-op: both arguments are ignored and RETCODE_OK is always returned.
    ReturnCode_t return_loan(std::vector<std::vector<uint8_t>> & /*data_values*/,
                             std::vector<SampleInfo> & /*sample_infos*/)
    {
        return ReturnCode_t::RETCODE_OK;
    }

    // Accessors
    Topic *topic() const { return topic_; }
    TopicDescription *get_topicdescription() const; // PSM: may be CFT
    Subscriber *subscriber() const { return subscriber_; }
    const rtps::GUID_t &guid() const { return guid_; }
    void set_guid(const rtps::GUID_t &guid) { guid_ = guid; }
    void set_qos(const DataReaderQos &qos);
    const DataReaderQos &get_qos() const { return qos_; }
    size_t unread_count() const;

    // Reliability support
    void add_matched_writer(const rtps::GUID_t &writer_guid);
    void remove_matched_writer(const rtps::GUID_t &writer_guid);
    void process_heartbeat(const rtps::GUID_t &writer_guid,
                           const rtps::SequenceNumber_t &first_sn,
                           const rtps::SequenceNumber_t &last_sn,
                           rtps::Count_t count,
                           bool final);

    // Data received callback
    using DataCallback = std::function<void(const ReceivedSample &)>;
    /// Stores @p callback (moved from the by-value argument).
    void set_data_callback(DataCallback callback) { data_callback_ = std::move(callback); }

    // Listener support (DDS §2.1.4.3 — DataReaderListener dispatch)
    void set_listener(DataReaderListener *listener) { listener_ = listener; }
    DataReaderListener *get_listener() const { return listener_; }

    // WaitSet notification — external condition variables to signal on data arrival
    using NotifyCallback = std::function<void()>;
    /// Appends @p cb to the notify list while holding mutex_.
    void add_notify_callback(NotifyCallback cb)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        notify_callbacks_.push_back(std::move(cb));
    }

    // ACKNACK callback — invoked by send_acknack(); set by DomainParticipant
    // to deliver an ACKNACK via the transport for the given writer.
    using AcknackCallback = std::function<void(const rtps::GUID_t &)>;
    void set_acknack_callback(AcknackCallback cb) { acknack_callback_ = std::move(cb); }

    // Topic description override (used when created via ContentFilteredTopic)
    void set_topicdescription(TopicDescription *td) { topicdescription_ = td; }

    // Deadline monitoring
    void start_deadline_timer();
    void stop_deadline_timer();
    void notify_receive(); // Reset deadline timer on each receive

protected:
    TopicDescription *topicdescription_{nullptr}; // non-null when created via CFT

private:
    void send_acknack(const rtps::GUID_t &writer_guid);
    bool check_resource_limits() const;
    void apply_history_and_limits();
    void deadline_thread_func();

    Topic *topic_;
    Subscriber *subscriber_;
    rtps::GUID_t guid_;
    std::deque<ReceivedSample> history_cache_;
    DataReaderQos qos_;
    mutable std::mutex mutex_;
    DataCallback data_callback_;
    DataReaderListener *listener_{nullptr};
    std::vector<NotifyCallback> notify_callbacks_;

    // Matched writer tracking (for process_heartbeat and send_acknack)
    std::set<rtps::GUID_t> matched_writers_;

    // Installed through set_acknack_callback().
    AcknackCallback acknack_callback_;

    // Time-based filter: track last accepted time per writer
    std::map<rtps::GUID_t, std::chrono::steady_clock::time_point> last_accepted_time_;

    // Ownership: per-instance tracking of current owner (EXCLUSIVE)
    struct InstanceOwnership
    {
        rtps::GUID_t ownerGuid{};
        int32_t ownerStrength{0};
    };
    std::map<int32_t, InstanceOwnership> instance_owners_; // key = instance_handle hash

    // Instance registry: maps instance_handle → serialized key bytes (DDS §2.2.2.5.3)
    std::map<InstanceHandle_t, std::vector<uint8_t>> instance_key_data_;

    // Deadline monitoring
    std::thread deadline_thread_;
    std::condition_variable deadline_cv_;
    std::atomic<bool> deadline_running_{false};
    std::chrono::steady_clock::time_point last_receive_time_;
};
/// Factory and owner of DataWriter objects for one DomainParticipant.
///
/// Writers are stored as std::unique_ptr in writers_; the raw pointers handed
/// out by create_datawriter()/get_datawriters() are non-owning. Non-copyable.
class Publisher
{
public:
    /// @param participant Participant this publisher is associated with
    ///                    (stored as a raw, non-owning pointer).
    /// Marked explicit so a DomainParticipant* never converts to a Publisher
    /// implicitly.
    explicit Publisher(DomainParticipant *participant);
    ~Publisher() = default;

    Publisher(const Publisher &) = delete;
    Publisher &operator=(const Publisher &) = delete;

    /// Creates a writer for @p topic using @p qos (default-constructed QoS
    /// when omitted).
    DataWriter *create_datawriter(Topic *topic, const DataWriterQos &qos = DataWriterQos{});

    /// Listener-taking overload: forwards to the two-argument overload and,
    /// when both the new writer and @p listener are non-null, installs the
    /// listener with @p mask. A null @p listener leaves the writer's listener
    /// settings untouched.
    DataWriter *create_datawriter(Topic *topic, const DataWriterQos &qos,
                                  DataWriterListener *listener,
                                  StatusMask mask = STATUS_MASK_ALL)
    {
        DataWriter *writer = create_datawriter(topic, qos);
        if (writer && listener)
        {
            writer->set_listener(listener, mask);
        }
        return writer;
    }

    /// @return A default-constructed DataWriterQos.
    DataWriterQos default_datawriter_qos() const { return DataWriterQos{}; }

    /// Out-parameter form: overwrites @p qos with a default-constructed
    /// DataWriterQos and returns RETCODE_OK.
    ReturnCode_t get_default_datawriter_qos(DataWriterQos &qos) const
    {
        qos = DataWriterQos{};
        return ReturnCode_t::RETCODE_OK;
    }

    bool delete_datawriter(DataWriter *writer);

    /// Sets the coherent-changes flag; always returns RETCODE_OK.
    ReturnCode_t begin_coherent_changes()
    {
        coherent_active_ = true;
        return ReturnCode_t::RETCODE_OK;
    }
    ReturnCode_t end_coherent_changes();
    bool is_coherent_active() const { return coherent_active_; }

    void buffer_coherent_write(DataWriter *writer,
                               const std::vector<uint8_t> &data);
    void buffer_coherent_write_dual(DataWriter *writer,
                                    const std::vector<uint8_t> &xcdr1,
                                    const std::vector<uint8_t> &xcdr2);

    DomainParticipant *participant() const { return participant_; }
    void set_qos(const PublisherQos &qos) { qos_ = qos; }
    const PublisherQos &get_qos() const { return qos_; }

    /// Direct reference to the owning container.
    const std::vector<std::unique_ptr<DataWriter>> &writers() const { return writers_; }

    /// @return Snapshot of non-owning pointers to every writer currently held.
    std::vector<DataWriter *> get_datawriters() const
    {
        std::vector<DataWriter *> result;
        result.reserve(writers_.size());
        for (const auto &w : writers_)
        {
            result.push_back(w.get());
        }
        return result;
    }

private:
    DomainParticipant *participant_;
    PublisherQos qos_;
    std::vector<std::unique_ptr<DataWriter>> writers_;
    uint32_t next_writer_id_{1};

    // Coherent changes buffering
    bool coherent_active_{false};
    struct CoherentEntry
    {
        // Defaults keep a value-initialized entry well-defined instead of
        // leaving the pointer and flag indeterminate.
        DataWriter *writer{nullptr};
        std::vector<uint8_t> xcdr1;
        std::vector<uint8_t> xcdr2;
        bool dual{false};
    };
    std::vector<CoherentEntry> coherent_buffer_;
};
/// Factory and owner of DataReader objects for one DomainParticipant.
///
/// Readers are stored as std::unique_ptr in readers_; the raw pointers handed
/// out by create_datareader()/get_datareaders() are non-owning. Non-copyable.
class Subscriber
{
public:
    /// @param participant Participant this subscriber is associated with
    ///                    (stored as a raw, non-owning pointer).
    /// Marked explicit so a DomainParticipant* never converts to a Subscriber
    /// implicitly.
    explicit Subscriber(DomainParticipant *participant);
    ~Subscriber() = default;

    Subscriber(const Subscriber &) = delete;
    Subscriber &operator=(const Subscriber &) = delete;

    /// Creates a reader for @p topic using @p qos (default-constructed QoS
    /// when omitted).
    DataReader *create_datareader(Topic *topic, const DataReaderQos &qos = DataReaderQos{});

    /// Overload taking any TopicDescription plus an optional listener and mask.
    DataReader *create_datareader(TopicDescription *td, const DataReaderQos &qos,
                                  DataReaderListener *listener = nullptr,
                                  StatusMask mask = STATUS_MASK_ALL);

    /// @return A default-constructed DataReaderQos.
    DataReaderQos default_datareader_qos() const { return DataReaderQos{}; }

    /// Out-parameter form: overwrites @p qos with a default-constructed
    /// DataReaderQos and returns RETCODE_OK.
    ReturnCode_t get_default_datareader_qos(DataReaderQos &qos) const
    {
        qos = DataReaderQos{};
        return ReturnCode_t::RETCODE_OK;
    }

    bool delete_datareader(DataReader *reader);

    /// Sets the access flag; always returns RETCODE_OK.
    ReturnCode_t begin_access()
    {
        access_active_ = true;
        return ReturnCode_t::RETCODE_OK;
    }
    /// Clears the access flag; always returns RETCODE_OK.
    ReturnCode_t end_access()
    {
        access_active_ = false;
        return ReturnCode_t::RETCODE_OK;
    }
    bool is_access_active() const { return access_active_; }

    DomainParticipant *participant() const { return participant_; }
    void set_qos(const SubscriberQos &qos) { qos_ = qos; }
    const SubscriberQos &get_qos() const { return qos_; }

    /// Direct reference to the owning container.
    const std::vector<std::unique_ptr<DataReader>> &readers() const { return readers_; }

    /// @return Snapshot of non-owning pointers to every reader currently held.
    std::vector<DataReader *> get_datareaders() const
    {
        std::vector<DataReader *> result;
        result.reserve(readers_.size());
        for (const auto &r : readers_)
        {
            result.push_back(r.get());
        }
        return result;
    }

private:
    DomainParticipant *participant_;
    SubscriberQos qos_;
    std::vector<std::unique_ptr<DataReader>> readers_;
    uint32_t next_reader_id_{1};
    bool access_active_{false};
};
/// Root DCPS entity for one (domain id, participant id) pair.
///
/// Owns the topics, publishers, subscribers and content-filtered topics it
/// creates (held as std::unique_ptr), a discovery thread, the built-in RTPS
/// UDP transport and the bookkeeping sets used for endpoint matching.
/// Non-copyable.
class DomainParticipant
{
public:
    DomainParticipant(uint32_t domain_id, uint32_t participant_id);
    ~DomainParticipant();

    DomainParticipant(const DomainParticipant &) = delete;
    DomainParticipant &operator=(const DomainParticipant &) = delete;

    bool enable();
    void stop();

    // ── Topic creation ──────────────────────────────────────────────
    Topic *create_topic(const std::string &topic_name,
                        const std::string &type_name,
                        const TopicQos &qos = TopicQos{});

    /// C-string overload. @p listener and @p mask are accepted for signature
    /// compatibility but are currently ignored.
    ///
    /// @return nullptr when @p topic_name or @p type_name is null (building a
    ///         std::string from a null pointer is undefined behaviour);
    ///         otherwise whatever the std::string overload returns.
    Topic *create_topic(const char *topic_name,
                        const char *type_name,
                        const TopicQos &qos,
                        TopicListener *listener,
                        StatusMask mask = STATUS_MASK_ALL)
    {
        (void)listener;
        (void)mask;
        if (topic_name == nullptr || type_name == nullptr)
        {
            return nullptr;
        }
        return create_topic(std::string(topic_name), std::string(type_name), qos);
    }
    bool delete_topic(Topic *topic);

    // ── ContentFilteredTopic ────────────────────────────────────────
    ContentFilteredTopic *create_contentfilteredtopic(
        const char *name,
        Topic *related_topic,
        const char *filter_expression,
        const StringSeq &expression_parameters);
    bool delete_contentfilteredtopic(ContentFilteredTopic *cft);

    // ── Publisher / Subscriber creation ─────────────────────────────
    Publisher *create_publisher(const PublisherQos &qos = PublisherQos{});

    /// Listener-taking overload; @p listener and @p mask are currently ignored.
    Publisher *create_publisher(const PublisherQos &qos,
                                PublisherListener *listener,
                                StatusMask mask = STATUS_MASK_ALL)
    {
        (void)listener;
        (void)mask;
        return create_publisher(qos);
    }
    bool delete_publisher(Publisher *publisher);

    Subscriber *create_subscriber(const SubscriberQos &qos = SubscriberQos{});

    /// Listener-taking overload; @p listener and @p mask are currently ignored.
    Subscriber *create_subscriber(const SubscriberQos &qos,
                                  SubscriberListener *listener,
                                  StatusMask mask = STATUS_MASK_ALL)
    {
        (void)listener;
        (void)mask;
        return create_subscriber(qos);
    }
    bool delete_subscriber(Subscriber *subscriber);

    // ── Default QoS out-param accessors (PSM) ──────────────────────
    // Each overwrites the argument with a default-constructed QoS and
    // returns RETCODE_OK.
    ReturnCode_t get_default_topic_qos(TopicQos &qos) const
    {
        qos = TopicQos{};
        return ReturnCode_t::RETCODE_OK;
    }
    ReturnCode_t get_default_publisher_qos(PublisherQos &qos) const
    {
        qos = PublisherQos{};
        return ReturnCode_t::RETCODE_OK;
    }
    ReturnCode_t get_default_subscriber_qos(SubscriberQos &qos) const
    {
        qos = SubscriberQos{};
        return ReturnCode_t::RETCODE_OK;
    }

    // ── Cleanup ────────────────────────────────────────────────────
    ReturnCode_t delete_contained_entities();

    // ── Type Support Factory ───────────────────────────────────────
    using DataWriterFactory = std::function<DataWriter *(Topic *, Publisher *)>;
    using DataReaderFactory = std::function<DataReader *(Topic *, Subscriber *)>;
    void register_type_factory(const std::string &type_name,
                               DataWriterFactory wf, DataReaderFactory rf);
    DataWriterFactory get_writer_factory(const std::string &type_name) const;
    DataReaderFactory get_reader_factory(const std::string &type_name) const;

    // ── Accessors ──────────────────────────────────────────────────
    uint32_t domain_id() const { return domain_id_; }
    uint32_t participant_id() const { return participant_id_; }
    const rtps::GuidPrefix_t &guid_prefix() const { return guid_prefix_; }
    void set_qos(const DomainParticipantQos &qos) { qos_ = qos; }
    const DomainParticipantQos &get_qos() const { return qos_; }

    /// @return Current value of the atomic running_ flag.
    bool is_enabled() const { return running_.load(); }

    // Listener management
    /// Stores the (non-owning) listener pointer together with its status mask.
    void set_listener(DomainParticipantListener *listener, StatusMask mask = STATUS_MASK_ALL)
    {
        listener_ = listener;
        listener_mask_ = mask;
    }
    DomainParticipantListener *get_listener() const { return listener_; }
    StatusMask get_listener_mask() const { return listener_mask_; }

    void send_data(const std::string &topic_name, const std::vector<uint8_t> &data);
    void send_data(const std::string &topic_name,
                   const std::vector<uint8_t> &xcdr1Data,
                   const std::vector<uint8_t> &xcdr2Data);

    using SendDataCallback = std::function<void(const std::string &topic_name, const std::vector<uint8_t> &data)>;
    /// Stores @p callback (moved from the by-value argument).
    void set_send_data_callback(SendDataCallback callback) { send_data_callback_ = std::move(callback); }

    using SubscribeCallback = std::function<void(const std::string &topic_name)>;
    /// Stores @p callback (moved from the by-value argument).
    void set_subscribe_callback(SubscribeCallback callback) { subscribe_callback_ = std::move(callback); }

    void register_writer(const std::string &topic_name,
                         const std::string &type_name,
                         const std::vector<std::string> &partitions = {},
                         const DataWriterQos &qos = {});
    void register_reader(const std::string &topic_name,
                         const std::string &type_name,
                         const std::vector<std::string> &partitions = {},
                         const DataReaderQos &qos = {});
    void notify_subscription(const std::string &topic_name);
    void deliver_data(const std::string &topic_name, const std::vector<uint8_t> &data,
                      const rtps::GUID_t &writer_guid = rtps::GUID_t{},
                      int32_t ownershipStrength = 0,
                      const rtps::SequenceNumber_t &sequenceNumber = rtps::SequenceNumber_t{});
    void replay_history_to_reader(DataWriter *writer);

    using DataReceivedCallback = std::function<void(const std::string &topic, const ReceivedSample &)>;
    /// Stores @p callback (moved from the by-value argument).
    void set_data_received_callback(DataReceivedCallback callback) { data_received_callback_ = std::move(callback); }

    void match_reader_to_discovered_writers(DataReader *reader, const std::vector<std::string> &subPartitions);
    void match_writer_to_discovered_readers(DataWriter *writer, const std::vector<std::string> &pubPartitions);

private:
    void discovery_loop();
    void generate_guid_prefix();
    rtps::GUID_t create_writer_guid(uint32_t writer_id);
    rtps::GUID_t create_reader_guid(uint32_t reader_id);

    uint32_t domain_id_;
    uint32_t participant_id_;
    rtps::GuidPrefix_t guid_prefix_;
    DomainParticipantQos qos_;
    std::atomic<bool> running_{false};
    std::thread discovery_thread_;
    mutable std::mutex mutex_;
    std::vector<std::unique_ptr<Topic>> topics_;
    std::vector<std::unique_ptr<Publisher>> publishers_;
    std::vector<std::unique_ptr<Subscriber>> subscribers_;
    std::vector<std::unique_ptr<ContentFilteredTopic>> cfts_;
    SendDataCallback send_data_callback_;
    SubscribeCallback subscribe_callback_;
    DataReceivedCallback data_received_callback_;

    // Listener
    // NOTE(review): the mask defaults to 0 here, whereas DataWriter defaults
    // its listener mask to STATUS_MASK_ALL — confirm this is intentional.
    DomainParticipantListener *listener_{nullptr};
    StatusMask listener_mask_{0};

    // Type support factories (populated by register_type_factory)
    std::map<std::string, DataWriterFactory> writer_factories_;
    std::map<std::string, DataReaderFactory> reader_factories_;

    // Built-in RTPS transport
    std::unique_ptr<rtps::RtpsUdpTransport> transport_;

    // Tracking sets for SEDP callback deduplication and data delivery gating.
    // matchedRemoteWriters_ contains GUIDs of remote writers whose QoS is
    // compatible with at least one local reader. deliver_data() only delivers
    // samples from writers in this set.
    std::set<rtps::GUID_t> matchedRemoteWriters_;
    std::set<rtps::GUID_t> incompatRemoteWriters_;
    std::set<rtps::GUID_t> matchedRemoteReaders_;
    std::set<rtps::GUID_t> incompatRemoteReaders_;
};
/// Process-wide factory for DomainParticipant objects.
///
/// The constructor is private; obtain the factory through instance() or
/// get_instance(). Created participants are held as std::unique_ptr in
/// participants_, so pointers returned to callers are non-owning.
class DomainParticipantFactory
{
public:
    /// @return Reference to the factory object.
    static DomainParticipantFactory &instance();

    /// Pointer-returning alias of instance().
    static DomainParticipantFactory *get_instance()
    {
        DomainParticipantFactory &factory = instance();
        return &factory;
    }

    /// Creates a participant in @p domain_id with @p qos
    /// (default-constructed QoS when omitted).
    DomainParticipant *create_participant(uint32_t domain_id,
                                          const DomainParticipantQos &qos = DomainParticipantQos{});

    /// Listener-taking overload: forwards to the overload above and installs
    /// @p listener with @p mask when both the new participant and the
    /// listener are non-null.
    DomainParticipant *create_participant(DomainId_t domain_id,
                                          const DomainParticipantQos &qos,
                                          DomainParticipantListener *listener,
                                          StatusMask mask = STATUS_MASK_ALL)
    {
        DomainParticipant *const created =
            create_participant(static_cast<uint32_t>(domain_id), qos);

        // Nothing to attach when creation failed or no listener was given.
        if (created == nullptr || listener == nullptr)
        {
            return created;
        }

        created->set_listener(listener, mask);
        return created;
    }

    /// Overwrites @p qos with a default-constructed DomainParticipantQos and
    /// returns RETCODE_OK.
    ReturnCode_t get_default_participant_qos(DomainParticipantQos &qos) const
    {
        qos = DomainParticipantQos{};
        return ReturnCode_t::RETCODE_OK;
    }

    bool delete_participant(DomainParticipant *participant);

private:
    DomainParticipantFactory() = default;

    std::mutex mutex_;
    std::vector<std::unique_ptr<DomainParticipant>> participants_;
    uint32_t next_participant_id_{0};
};
} // namespace astutedds::dcps
// Include Cyclone-style namespace compatibility layer after class definitions
#include <astutedds/dcps/CorePolicy.hpp>
// Standard DDS namespace alias for compatibility
namespace DDS = astutedds::dcps;
#endif // ASTUTEDDS_DCPS_DCPS_HPP