File writer_history_cache.hpp

File List > astutedds > rtps > writer_history_cache.hpp

Go to the documentation of this file

//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file writer_history_cache.hpp
// @brief RTPS Writer History Cache for reliable data delivery
//
// Implements the writer-side sample cache required for retransmission
// as defined in DDSI-RTPS 2.5 Section 8.4.7.
//

#ifndef ASTUTEDDS_RTPS_WRITER_HISTORY_CACHE_HPP
#define ASTUTEDDS_RTPS_WRITER_HISTORY_CACHE_HPP

#include <astutedds/rtps/rtps_types.hpp>

#include <chrono>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <vector>

namespace astutedds::rtps
{

struct CacheChange
{
    SequenceNumber_t sequenceNumber;
    std::vector<uint8_t> serializedData;
    std::chrono::steady_clock::time_point timestamp;
    bool acknowledged{false};

    // Instance key (for keyed data)
    std::vector<uint8_t> instanceKey;
};

class WriterHistoryCache
{
public:
    enum class HistoryKind
    {
        KEEP_LAST,
        KEEP_ALL
    };

    explicit WriterHistoryCache(const GUID_t& writer_guid, HistoryKind history_kind = HistoryKind::KEEP_LAST,
                                size_t history_depth = 10);

    SequenceNumber_t add_sample(const std::vector<uint8_t>& data, const std::vector<uint8_t>& instance_key = {});

    std::optional<CacheChange> get_sample(const SequenceNumber_t& seq_num) const;

    std::vector<CacheChange> get_samples_in_range(const SequenceNumber_t& first, const SequenceNumber_t& last) const;

    void mark_acknowledged(const SequenceNumber_t& seq_num);

    void mark_acknowledged_up_to(const SequenceNumber_t& up_to_seq_num);

    SequenceNumber_t get_first_seq_num() const;

    SequenceNumber_t get_last_seq_num() const;

    std::vector<CacheChange> get_unacknowledged() const;

    bool empty() const;

    size_t size() const;

    void clear();

    void remove_older_than(std::chrono::milliseconds max_age);

private:
    void enforce_history_depth();
    uint64_t seq_to_key(const SequenceNumber_t& seq) const;

    GUID_t writer_guid_;
    HistoryKind history_kind_;
    size_t history_depth_;

    SequenceNumber_t next_seq_num_{0, 1};  // Start at 1
    std::map<uint64_t, CacheChange> samples_;
    mutable std::mutex mutex_;
};

class ReaderHistoryCache
{
public:
    explicit ReaderHistoryCache(const GUID_t& reader_guid);

    void record_received(const GUID_t& writer_guid, const SequenceNumber_t& seq_num, const std::vector<uint8_t>& data);

    std::set<uint64_t> get_missing(const GUID_t& writer_guid, const SequenceNumber_t& first_available,
                                   const SequenceNumber_t& last_available) const;

    SequenceNumber_t get_highest_received(const GUID_t& writer_guid) const;

    void clear_writer(const GUID_t& writer_guid);

private:
    struct WriterState
    {
        std::set<uint64_t> received_seq_nums;
        uint64_t highest_consecutive{0};
    };

    GUID_t reader_guid_;
    std::map<GUID_t, WriterState> writer_states_;
    mutable std::mutex mutex_;
};

}  // namespace astutedds::rtps

#endif  // ASTUTEDDS_RTPS_WRITER_HISTORY_CACHE_HPP