File liveliness_manager.hpp

File List > astutedds > dcps > liveliness_manager.hpp

Go to the documentation of this file

//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file liveliness_manager.hpp
// @brief DDS Liveliness QoS Policy Manager
//
// Implements AUTOMATIC, MANUAL_BY_PARTICIPANT, and MANUAL_BY_TOPIC liveliness policies.
// Reference: DDS DCPS specification 2.2.3.10
//

#ifndef ASTUTEDDS_DCPS_LIVELINESS_MANAGER_HPP
#define ASTUTEDDS_DCPS_LIVELINESS_MANAGER_HPP

#include <astutedds/dcps/qos.hpp>
#include <astutedds/rtps/rtps_types.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace astutedds::dcps
{

// Forward declarations
class DataWriter;
class DataReader;
class DomainParticipant;

struct LivelinessStatus
{
    bool alive{true};
    std::chrono::steady_clock::time_point last_asserted;
    LivelinessQosPolicyKind kind{LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS};
    rtps::Duration_t lease_duration{rtps::Time_t::TIME_INFINITE()};
};

struct LivelinessChangedStatus
{
    int32_t alive_count{0};                
    int32_t not_alive_count{0};            
    int32_t alive_count_change{0};         
    int32_t not_alive_count_change{0};     
    rtps::GUID_t last_publication_handle;  
};

struct LivelinessLostStatus
{
    int32_t total_count{0};         
    int32_t total_count_change{0};  
};

using LivelinessChangedCallback = std::function<void(const LivelinessChangedStatus&)>;
using LivelinessLostCallback = std::function<void(const LivelinessLostStatus&)>;

struct TrackedWriter
{
    rtps::GUID_t guid;
    LivelinessQosPolicyKind kind;
    std::chrono::steady_clock::time_point last_asserted;
    std::chrono::milliseconds lease_duration;
    bool alive{true};
    bool is_local{false};
};

class LivelinessManager
{
public:
    explicit LivelinessManager(const rtps::GUID_t& participant_guid);

    ~LivelinessManager();

    // Non-copyable
    LivelinessManager(const LivelinessManager&) = delete;
    LivelinessManager& operator=(const LivelinessManager&) = delete;

    void start();

    void stop();

    void register_writer(const rtps::GUID_t& writer_guid, const LivelinessQosPolicy& qos);

    void unregister_writer(const rtps::GUID_t& writer_guid);

    void add_remote_writer(const rtps::GUID_t& writer_guid, const LivelinessQosPolicy& qos);

    void remove_remote_writer(const rtps::GUID_t& writer_guid);

    bool assert_liveliness(const rtps::GUID_t& writer_guid);

    bool assert_liveliness_for_participant();

    void on_data_received(const rtps::GUID_t& writer_guid);

    LivelinessStatus get_status(const rtps::GUID_t& writer_guid) const;

    LivelinessChangedStatus get_liveliness_changed_status() const;

    void set_liveliness_changed_callback(LivelinessChangedCallback callback);

    void set_liveliness_lost_callback(LivelinessLostCallback callback);

    bool is_writer_alive(const rtps::GUID_t& writer_guid) const;

    int32_t alive_count() const;

    int32_t not_alive_count() const;

private:
    void monitoring_loop();

    void check_lease_expiration();

    void assert_automatic_liveliness();

    static std::chrono::milliseconds duration_to_ms(const rtps::Duration_t& duration);

    void notify_liveliness_changed(const rtps::GUID_t& writer_guid, bool alive);

    void notify_liveliness_lost(const rtps::GUID_t& writer_guid);

private:
    rtps::GUID_t participant_guid_;

    // Writer tracking
    mutable std::mutex writers_mutex_;
    std::map<rtps::GUID_t, TrackedWriter> tracked_writers_;

    // Status tracking
    mutable std::mutex status_mutex_;
    LivelinessChangedStatus liveliness_changed_status_;
    LivelinessLostStatus liveliness_lost_status_;

    // Callbacks
    LivelinessChangedCallback liveliness_changed_callback_;
    LivelinessLostCallback liveliness_lost_callback_;

    // Monitoring thread
    std::thread monitoring_thread_;
    std::atomic<bool> running_{false};
    std::mutex stop_mutex_;
    std::condition_variable stop_cv_;
    std::chrono::milliseconds check_interval_{100};  // 100ms default check interval
};

class WriterLiveliness
{
public:
    WriterLiveliness(LivelinessManager& manager, const rtps::GUID_t& writer_guid, const LivelinessQosPolicy& qos);

    ~WriterLiveliness();

    bool assert_liveliness();

    void on_data_written();

    LivelinessLostStatus get_liveliness_lost_status() const;

    void set_listener(LivelinessLostCallback callback);

private:
    LivelinessManager& manager_;
    rtps::GUID_t writer_guid_;
    LivelinessQosPolicy qos_;
    LivelinessLostCallback callback_;
};

class ReaderLiveliness
{
public:
    ReaderLiveliness(LivelinessManager& manager, const rtps::GUID_t& reader_guid, const LivelinessQosPolicy& qos);

    ~ReaderLiveliness() = default;

    LivelinessChangedStatus get_liveliness_changed_status() const;

    void set_listener(LivelinessChangedCallback callback);

    bool is_matched_writer_alive(const rtps::GUID_t& writer_guid) const;

private:
    LivelinessManager& manager_;
    rtps::GUID_t reader_guid_;
    LivelinessQosPolicy qos_;
};

}  // namespace astutedds::dcps

#endif  // ASTUTEDDS_DCPS_LIVELINESS_MANAGER_HPP