File writer_history_cache.hpp
File List > astutedds > rtps > writer_history_cache.hpp
Go to the documentation of this file
//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file writer_history_cache.hpp
// @brief RTPS Writer History Cache for reliable data delivery
//
// Implements the writer-side sample cache required for retransmission
// as defined in DDSI-RTPS 2.5 Section 8.4.7.
//
#ifndef ASTUTEDDS_RTPS_WRITER_HISTORY_CACHE_HPP
#define ASTUTEDDS_RTPS_WRITER_HISTORY_CACHE_HPP
#include <astutedds/rtps/rtps_types.hpp>
#include <chrono>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <vector>
namespace astutedds::rtps
{
struct CacheChange
{
SequenceNumber_t sequenceNumber;
std::vector<uint8_t> serializedData;
std::chrono::steady_clock::time_point timestamp;
bool acknowledged{false};
// Instance key (for keyed data)
std::vector<uint8_t> instanceKey;
};
class WriterHistoryCache
{
public:
enum class HistoryKind
{
KEEP_LAST,
KEEP_ALL
};
explicit WriterHistoryCache(const GUID_t& writer_guid, HistoryKind history_kind = HistoryKind::KEEP_LAST,
size_t history_depth = 10);
SequenceNumber_t add_sample(const std::vector<uint8_t>& data, const std::vector<uint8_t>& instance_key = {});
std::optional<CacheChange> get_sample(const SequenceNumber_t& seq_num) const;
std::vector<CacheChange> get_samples_in_range(const SequenceNumber_t& first, const SequenceNumber_t& last) const;
void mark_acknowledged(const SequenceNumber_t& seq_num);
void mark_acknowledged_up_to(const SequenceNumber_t& up_to_seq_num);
SequenceNumber_t get_first_seq_num() const;
SequenceNumber_t get_last_seq_num() const;
std::vector<CacheChange> get_unacknowledged() const;
bool empty() const;
size_t size() const;
void clear();
void remove_older_than(std::chrono::milliseconds max_age);
private:
void enforce_history_depth();
uint64_t seq_to_key(const SequenceNumber_t& seq) const;
GUID_t writer_guid_;
HistoryKind history_kind_;
size_t history_depth_;
SequenceNumber_t next_seq_num_{0, 1}; // Start at 1
std::map<uint64_t, CacheChange> samples_;
mutable std::mutex mutex_;
};
class ReaderHistoryCache
{
public:
explicit ReaderHistoryCache(const GUID_t& reader_guid);
void record_received(const GUID_t& writer_guid, const SequenceNumber_t& seq_num, const std::vector<uint8_t>& data);
std::set<uint64_t> get_missing(const GUID_t& writer_guid, const SequenceNumber_t& first_available,
const SequenceNumber_t& last_available) const;
SequenceNumber_t get_highest_received(const GUID_t& writer_guid) const;
void clear_writer(const GUID_t& writer_guid);
private:
struct WriterState
{
std::set<uint64_t> received_seq_nums;
uint64_t highest_consecutive{0};
};
GUID_t reader_guid_;
std::map<GUID_t, WriterState> writer_states_;
mutable std::mutex mutex_;
};
} // namespace astutedds::rtps
#endif // ASTUTEDDS_RTPS_WRITER_HISTORY_CACHE_HPP