File persistence_service.hpp

File List > astutedds > dcps > persistence_service.hpp

Go to the documentation of this file

//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file persistence_service.hpp
// @brief DDS Persistence Service for TRANSIENT and PERSISTENT durability
//
// Implements the OMG DDS Persistence Service as specified in DDS 1.4 §2.2.3.4.
// The Persistence Service provides historical data delivery for DataReaders that
// have TRANSIENT or PERSISTENT durability QoS, enabling late-joining readers to
// receive data written before they were created.
//
// - TRANSIENT durability: samples kept in memory; survive DataWriter deletion
//   but not Persistence Service restart.
// - PERSISTENT durability: samples written to disk; survive Persistence Service
//   restart, delivering data even after service and writer restart.
//
// Reference: OMG DDS 1.4, §2.2.3.4 (DurabilityQosPolicy)
//            OMG DDS 1.4, §2.2.3.20 (DurabilityServiceQosPolicy)
//

#ifndef ASTUTEDDS_DCPS_PERSISTENCE_SERVICE_HPP
#define ASTUTEDDS_DCPS_PERSISTENCE_SERVICE_HPP

#include <astutedds/dcps/qos.hpp>
#include <astutedds/rtps/rtps_types.hpp>

#include <deque>
#include <filesystem>
#include <map>
#include <mutex>
#include <string>
#include <vector>

namespace astutedds::dcps
{

    // Forward declaration (CacheChange is defined in dcps.hpp)
    struct CacheChange;

    struct PersistedSample
    {
        rtps::SequenceNumber_t sequence_number{};
        std::vector<uint8_t> serialized_data;
        rtps::Time_t source_timestamp{};
        rtps::ChangeKind_t kind{rtps::ChangeKind_t::ALIVE};
    };

    class PersistenceService
    {
    public:
        // ── Singleton access ───────────────────────────────────────────────

        static PersistenceService &instance();

        // Non-copyable, non-movable singleton
        PersistenceService(const PersistenceService &) = delete;
        PersistenceService &operator=(const PersistenceService &) = delete;
        PersistenceService(PersistenceService &&) = delete;
        PersistenceService &operator=(PersistenceService &&) = delete;

        // ── Configuration ──────────────────────────────────────────────────

        void set_persistence_dir(const std::string &dir);

        const std::string &persistence_dir() const { return persistence_dir_; }

        // ── Write path ─────────────────────────────────────────────────────

        void store_sample(const std::string &topic_name,
                          DurabilityQosPolicyKind durability,
                          const DurabilityServiceQosPolicy &service_qos,
                          const CacheChange &change);

        // ── Read path ──────────────────────────────────────────────────────

        std::vector<PersistedSample> get_samples(const std::string &topic_name);

        // ── Lifecycle ──────────────────────────────────────────────────────

        void notify_writer_deleted(const std::string &topic_name);

        void remove_topic(const std::string &topic_name);

        void purge_persistent_store(const std::string &topic_name);

        bool has_samples(const std::string &topic_name) const;

        std::size_t sample_count(const std::string &topic_name) const;

    private:
        // Private constructor — use instance()
        PersistenceService();
        ~PersistenceService() = default;

        // ── Per-topic store ────────────────────────────────────────────────

        struct TopicStore
        {
            DurabilityQosPolicyKind durability{DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS};
            DurabilityServiceQosPolicy service_qos{};
            std::deque<PersistedSample> samples;
            bool loaded_from_disk{false}; // For PERSISTENT: true after load_from_disk()
        };

        // ── Helpers ────────────────────────────────────────────────────────

        void enforce_history_limits(TopicStore &store);

        void save_to_disk(const std::string &topic_name, const TopicStore &store);

        void load_from_disk(const std::string &topic_name, TopicStore &store);

        std::filesystem::path store_path(const std::string &topic_name) const;

        // ── State ──────────────────────────────────────────────────────────

        mutable std::mutex mutex_;
        std::map<std::string, TopicStore> stores_;
        std::string persistence_dir_;
    };

} // namespace astutedds::dcps

#endif // ASTUTEDDS_DCPS_PERSISTENCE_SERVICE_HPP