File reliability_manager.hpp

File List > astutedds > rtps > reliability_manager.hpp

Go to the documentation of this file

//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file reliability_manager.hpp
// @brief RTPS Reliability Protocol Manager
//
// Implements the HEARTBEAT/ACKNACK reliability protocol from DDSI-RTPS 2.5.
// Reference: https://www.omg.org/spec/DDSI-RTPS/2.5/PDF Section 8.4
//

#ifndef ASTUTEDDS_RTPS_RELIABILITY_MANAGER_HPP
#define ASTUTEDDS_RTPS_RELIABILITY_MANAGER_HPP

#include <astutedds/rtps/rtps_types.hpp>

#include <chrono>
#include <map>
#include <mutex>
#include <set>
#include <vector>

namespace astutedds::rtps
{

struct ReaderProxy
{
    GUID_t readerGuid;
    SequenceNumber_t lastAcknowledgedSN{0, 0};
    std::set<uint32_t> requestedChanges;  // Sequence numbers requested for repair
    std::chrono::steady_clock::time_point lastHeartbeatTime;
    Count_t heartbeatCount{0};
};

struct WriterProxy
{
    GUID_t writerGuid;
    SequenceNumber_t availableSeqNumMax{0, 0};  // Highest sequence number available
    std::set<uint32_t> receivedChanges;         // Sequence numbers received
    std::chrono::steady_clock::time_point lastHeartbeatTime;
    Count_t lastHeartbeatCount{0};
};

class ReliabilityManagerWriter
{
public:
    explicit ReliabilityManagerWriter(const GUID_t& writer_guid,
                                      std::chrono::milliseconds heartbeat_period = std::chrono::milliseconds(200));

    void add_reader(const GUID_t& reader_guid);

    void remove_reader(const GUID_t& reader_guid);

    std::set<uint32_t> process_acknack(const GUID_t& reader_guid, const std::set<uint32_t>& reader_sn_set,
                                       Count_t count);

    bool should_send_heartbeat(const SequenceNumber_t& first_sn, const SequenceNumber_t& last_sn);

    std::vector<uint8_t> create_heartbeat(const EntityId_t& reader_id, const SequenceNumber_t& first_sn,
                                          const SequenceNumber_t& last_sn, bool final = false);

    std::set<uint32_t> get_unacknowledged(const SequenceNumber_t& up_to_sn) const;

private:
    GUID_t writer_guid_;
    std::chrono::milliseconds heartbeat_period_;
    std::chrono::steady_clock::time_point last_heartbeat_time_;
    Count_t heartbeat_count_{0};

    std::map<GUID_t, ReaderProxy> reader_proxies_;
    mutable std::mutex mutex_;
};

class ReliabilityManagerReader
{
public:
    explicit ReliabilityManagerReader(const GUID_t& reader_guid);

    void add_writer(const GUID_t& writer_guid);

    void remove_writer(const GUID_t& writer_guid);

    void mark_received(const GUID_t& writer_guid, const SequenceNumber_t& sequence_number);

    bool process_heartbeat(const GUID_t& writer_guid, const SequenceNumber_t& first_sn, const SequenceNumber_t& last_sn,
                           Count_t count, bool final);

    std::vector<uint8_t> create_acknack(const GUID_t& writer_guid);

    std::set<uint32_t> get_missing(const GUID_t& writer_guid) const;

private:
    GUID_t reader_guid_;
    std::map<GUID_t, WriterProxy> writer_proxies_;
    mutable std::mutex mutex_;
    Count_t acknack_count_{0};
};

}  // namespace astutedds::rtps

#endif  // ASTUTEDDS_RTPS_RELIABILITY_MANAGER_HPP