File persistence_service.hpp
File List > astutedds > dcps > persistence_service.hpp
Go to the documentation of this file
//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file persistence_service.hpp
// @brief DDS Persistence Service for TRANSIENT and PERSISTENT durability
//
// Implements the OMG DDS Persistence Service as specified in DDS 1.4 §2.2.3.4.
// The Persistence Service provides historical data delivery for DataReaders that
// have TRANSIENT or PERSISTENT durability QoS, enabling late-joining readers to
// receive data written before they were created.
//
// - TRANSIENT durability: samples kept in memory; survive DataWriter deletion
// but not Persistence Service restart.
// - PERSISTENT durability: samples written to disk; survive Persistence Service
// restart, delivering data even after service and writer restart.
//
// Reference: OMG DDS 1.4, §2.2.3.4 (DurabilityQosPolicy)
// OMG DDS 1.4, §2.2.3.20 (DurabilityServiceQosPolicy)
//
#ifndef ASTUTEDDS_DCPS_PERSISTENCE_SERVICE_HPP
#define ASTUTEDDS_DCPS_PERSISTENCE_SERVICE_HPP
#include <astutedds/dcps/qos.hpp>
#include <astutedds/rtps/rtps_types.hpp>
#include <deque>
#include <filesystem>
#include <map>
#include <mutex>
#include <string>
#include <vector>
namespace astutedds::dcps
{
// Forward declaration (CacheChange is defined in dcps.hpp)
struct CacheChange;
struct PersistedSample
{
rtps::SequenceNumber_t sequence_number{};
std::vector<uint8_t> serialized_data;
rtps::Time_t source_timestamp{};
rtps::ChangeKind_t kind{rtps::ChangeKind_t::ALIVE};
};
class PersistenceService
{
public:
// ── Singleton access ───────────────────────────────────────────────
static PersistenceService &instance();
// Non-copyable, non-movable singleton
PersistenceService(const PersistenceService &) = delete;
PersistenceService &operator=(const PersistenceService &) = delete;
PersistenceService(PersistenceService &&) = delete;
PersistenceService &operator=(PersistenceService &&) = delete;
// ── Configuration ──────────────────────────────────────────────────
void set_persistence_dir(const std::string &dir);
const std::string &persistence_dir() const { return persistence_dir_; }
// ── Write path ─────────────────────────────────────────────────────
void store_sample(const std::string &topic_name,
DurabilityQosPolicyKind durability,
const DurabilityServiceQosPolicy &service_qos,
const CacheChange &change);
// ── Read path ──────────────────────────────────────────────────────
std::vector<PersistedSample> get_samples(const std::string &topic_name);
// ── Lifecycle ──────────────────────────────────────────────────────
void notify_writer_deleted(const std::string &topic_name);
void remove_topic(const std::string &topic_name);
void purge_persistent_store(const std::string &topic_name);
bool has_samples(const std::string &topic_name) const;
std::size_t sample_count(const std::string &topic_name) const;
private:
// Private constructor — use instance()
PersistenceService();
~PersistenceService() = default;
// ── Per-topic store ────────────────────────────────────────────────
struct TopicStore
{
DurabilityQosPolicyKind durability{DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS};
DurabilityServiceQosPolicy service_qos{};
std::deque<PersistedSample> samples;
bool loaded_from_disk{false}; // For PERSISTENT: true after load_from_disk()
};
// ── Helpers ────────────────────────────────────────────────────────
void enforce_history_limits(TopicStore &store);
void save_to_disk(const std::string &topic_name, const TopicStore &store);
void load_from_disk(const std::string &topic_name, TopicStore &store);
std::filesystem::path store_path(const std::string &topic_name) const;
// ── State ──────────────────────────────────────────────────────────
mutable std::mutex mutex_;
std::map<std::string, TopicStore> stores_;
std::string persistence_dir_;
};
} // namespace astutedds::dcps
#endif // ASTUTEDDS_DCPS_PERSISTENCE_SERVICE_HPP