// File reliability_manager.hpp
// File List > astutedds > rtps > reliability_manager.hpp
// Go to the documentation of this file
//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file reliability_manager.hpp
// @brief RTPS Reliability Protocol Manager
//
// Implements the HEARTBEAT/ACKNACK reliability protocol from DDSI-RTPS 2.5.
// Reference: https://www.omg.org/spec/DDSI-RTPS/2.5/PDF Section 8.4
//
#ifndef ASTUTEDDS_RTPS_RELIABILITY_MANAGER_HPP
#define ASTUTEDDS_RTPS_RELIABILITY_MANAGER_HPP
#include <astutedds/rtps/rtps_types.hpp>
#include <chrono>
#include <map>
#include <mutex>
#include <set>
#include <vector>
namespace astutedds::rtps
{
// Bookkeeping record for a single reader. ReliabilityManagerWriter keeps one
// of these per reader GUID in its reader_proxies_ map.
// NOTE(review): only the data layout is visible in this header; the meaning
// of each field below is inferred from its name and should be confirmed
// against the implementation.
struct ReaderProxy
{
// GUID of the reader this record describes.
GUID_t readerGuid;
// Starts at {0, 0}. Presumably the highest sequence number this reader has
// acknowledged so far - confirm.
SequenceNumber_t lastAcknowledgedSN{0, 0};
// NOTE(review): entries are 32-bit, whereas SequenceNumber_t is
// brace-initialised from two fields above; how the wider value is mapped onto
// uint32_t is not visible here - confirm.
std::set<uint32_t> requestedChanges; // Sequence numbers requested for repair
// steady_clock timestamp; presumably when a heartbeat was last exchanged with
// this reader - confirm.
std::chrono::steady_clock::time_point lastHeartbeatTime;
// Starts at 0. Presumably a per-reader heartbeat counter - confirm.
Count_t heartbeatCount{0};
};
// Bookkeeping record for a single writer. ReliabilityManagerReader keeps one
// of these per writer GUID in its writer_proxies_ map.
// NOTE(review): only the data layout is visible in this header; the meaning
// of each field below is inferred from its name and should be confirmed
// against the implementation.
struct WriterProxy
{
// GUID of the writer this record describes.
GUID_t writerGuid;
// Starts at {0, 0}.
SequenceNumber_t availableSeqNumMax{0, 0}; // Highest sequence number available
// NOTE(review): entries are 32-bit, whereas SequenceNumber_t is
// brace-initialised from two fields above; how the wider value is mapped onto
// uint32_t is not visible here - confirm.
std::set<uint32_t> receivedChanges; // Sequence numbers received
// steady_clock timestamp; presumably when a heartbeat from this writer was
// last processed - confirm.
std::chrono::steady_clock::time_point lastHeartbeatTime;
// Starts at 0. Presumably the count carried by the most recent heartbeat
// accepted from this writer - confirm.
Count_t lastHeartbeatCount{0};
};
// Writer-side manager for the HEARTBEAT/ACKNACK reliability protocol. Holds
// one ReaderProxy per reader GUID together with heartbeat period, timestamp
// and counter state, and a mutex declared alongside that state.
// NOTE(review): only declarations are visible in this header. The behavioural
// notes on the members below are inferred from names and signatures and must
// be confirmed against the implementation.
class ReliabilityManagerWriter
{
public:
// Constructs the manager.
// @param writer_guid      GUID of the writer (presumably stored in writer_guid_).
// @param heartbeat_period Heartbeat period; defaults to 200 ms when omitted.
explicit ReliabilityManagerWriter(const GUID_t& writer_guid,
std::chrono::milliseconds heartbeat_period = std::chrono::milliseconds(200));
// Presumably registers a ReaderProxy for reader_guid in reader_proxies_ - confirm.
void add_reader(const GUID_t& reader_guid);
// Presumably drops the ReaderProxy for reader_guid from reader_proxies_ - confirm.
void remove_reader(const GUID_t& reader_guid);
// Handles an ACKNACK from a reader.
// @param reader_guid   GUID of the reader that sent the ACKNACK.
// @param reader_sn_set Sequence numbers supplied by the reader (32-bit values).
// @param count         Count value accompanying the ACKNACK.
// @return A set of sequence numbers; presumably the changes that should be
//         resent to that reader - confirm.
std::set<uint32_t> process_acknack(const GUID_t& reader_guid, const std::set<uint32_t>& reader_sn_set,
Count_t count);
// @param first_sn First sequence number of the range under consideration.
// @param last_sn  Last sequence number of the range under consideration.
// @return true when a heartbeat should be sent. Presumably based on
//         heartbeat_period_ and last_heartbeat_time_ - confirm. Not const, so
//         it may update internal state.
bool should_send_heartbeat(const SequenceNumber_t& first_sn, const SequenceNumber_t& last_sn);
// Builds a HEARTBEAT as a byte buffer.
// @param reader_id Entity id of the destination reader.
// @param first_sn  First sequence number to advertise.
// @param last_sn   Last sequence number to advertise.
// @param final     Defaults to false; presumably maps to the HEARTBEAT Final
//                  flag - confirm.
// @return Byte vector; presumably the serialized submessage - confirm.
std::vector<uint8_t> create_heartbeat(const EntityId_t& reader_id, const SequenceNumber_t& first_sn,
const SequenceNumber_t& last_sn, bool final = false);
// @param up_to_sn Upper bound for the query.
// @return A set of 32-bit sequence numbers; presumably those at or below
//         up_to_sn not yet acknowledged by every reader - confirm.
std::set<uint32_t> get_unacknowledged(const SequenceNumber_t& up_to_sn) const;
private:
GUID_t writer_guid_;
std::chrono::milliseconds heartbeat_period_;
std::chrono::steady_clock::time_point last_heartbeat_time_;
// Starts at 0.
Count_t heartbeat_count_{0};
// One ReaderProxy per reader, keyed by reader GUID.
std::map<GUID_t, ReaderProxy> reader_proxies_;
// Declared mutable, which allows locking from const member functions such as
// get_unacknowledged(). Which members it guards is not visible here.
mutable std::mutex mutex_;
};
// Reader-side manager for the HEARTBEAT/ACKNACK reliability protocol. Holds
// one WriterProxy per writer GUID, an ACKNACK counter, and a mutex declared
// alongside that state.
// NOTE(review): only declarations are visible in this header. The behavioural
// notes on the members below are inferred from names and signatures and must
// be confirmed against the implementation.
class ReliabilityManagerReader
{
public:
// Constructs the manager.
// @param reader_guid GUID of the reader (presumably stored in reader_guid_).
explicit ReliabilityManagerReader(const GUID_t& reader_guid);
// Presumably registers a WriterProxy for writer_guid in writer_proxies_ - confirm.
void add_writer(const GUID_t& writer_guid);
// Presumably drops the WriterProxy for writer_guid from writer_proxies_ - confirm.
void remove_writer(const GUID_t& writer_guid);
// Records that a change was received from a writer.
// @param writer_guid     GUID of the writer the change came from.
// @param sequence_number Sequence number of the received change.
// NOTE(review): WriterProxy::receivedChanges stores uint32_t, so some
// narrowing of sequence_number presumably happens in the implementation - confirm.
void mark_received(const GUID_t& writer_guid, const SequenceNumber_t& sequence_number);
// Handles a HEARTBEAT from a writer.
// @param writer_guid GUID of the writer that sent the HEARTBEAT.
// @param first_sn    First sequence number advertised.
// @param last_sn     Last sequence number advertised.
// @param count       Count value carried by the HEARTBEAT.
// @param final       Presumably the HEARTBEAT Final flag - confirm.
// @return bool; presumably true when an ACKNACK should be sent in response -
//         confirm.
bool process_heartbeat(const GUID_t& writer_guid, const SequenceNumber_t& first_sn, const SequenceNumber_t& last_sn,
Count_t count, bool final);
// Builds an ACKNACK for the given writer as a byte buffer.
// @param writer_guid GUID of the writer the ACKNACK is addressed to.
// @return Byte vector; presumably the serialized submessage - confirm. Not
//         const, so it may update internal state such as acknack_count_.
std::vector<uint8_t> create_acknack(const GUID_t& writer_guid);
// @param writer_guid GUID of the writer to query.
// @return A set of 32-bit sequence numbers; presumably those known to be
//         available from the writer but not yet received - confirm.
std::set<uint32_t> get_missing(const GUID_t& writer_guid) const;
private:
GUID_t reader_guid_;
// One WriterProxy per writer, keyed by writer GUID.
std::map<GUID_t, WriterProxy> writer_proxies_;
// Declared mutable, which allows locking from const member functions such as
// get_missing(). Which members it guards is not visible here.
mutable std::mutex mutex_;
// Starts at 0.
Count_t acknack_count_{0};
};
} // namespace astutedds::rtps
#endif // ASTUTEDDS_RTPS_RELIABILITY_MANAGER_HPP