// File: liveliness_manager.hpp
// Location: astutedds/dcps/liveliness_manager.hpp
// (Source listing extracted from the generated API documentation.)
//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file liveliness_manager.hpp
// @brief DDS Liveliness QoS Policy Manager
//
// Implements AUTOMATIC, MANUAL_BY_PARTICIPANT, and MANUAL_BY_TOPIC liveliness policies.
// Reference: DDS DCPS specification 2.2.3.10
//
#ifndef ASTUTEDDS_DCPS_LIVELINESS_MANAGER_HPP
#define ASTUTEDDS_DCPS_LIVELINESS_MANAGER_HPP
#include <astutedds/dcps/qos.hpp>
#include <astutedds/rtps/rtps_types.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
namespace astutedds::dcps
{
// Forward declarations
// NOTE(review): none of these three types is referenced by a declaration in
// this header; presumably kept for translation units that include it — confirm
// before removing.
class DataWriter;
class DataReader;
class DomainParticipant;
/// Liveliness state of a single writer, as returned by
/// LivelinessManager::get_status().
struct LivelinessStatus
{
    /// Whether the writer is currently considered alive; defaults to true.
    bool alive{true};

    /// Steady-clock time of the most recent liveliness assertion.
    std::chrono::steady_clock::time_point last_asserted;

    /// Liveliness policy kind; defaults to AUTOMATIC.
    LivelinessQosPolicyKind kind{LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS};

    /// Lease duration; defaults to the value of rtps::Time_t::TIME_INFINITE().
    rtps::Duration_t lease_duration{rtps::Time_t::TIME_INFINITE()};
};
/// Reader-side liveliness summary passed to LivelinessChangedCallback.
///
/// Field names follow the DDS LIVELINESS_CHANGED status; the exact update
/// rules live in the implementation, which is not visible in this header.
struct LivelinessChangedStatus
{
    /// Number of writers currently counted as alive.
    int32_t alive_count{0};

    /// Number of writers currently counted as not alive.
    int32_t not_alive_count{0};

    /// Change in alive_count (presumably since the status was last reported — TODO confirm).
    int32_t alive_count_change{0};

    /// Change in not_alive_count (presumably since the status was last reported — TODO confirm).
    int32_t not_alive_count_change{0};

    /// GUID of the publication associated with the most recent change
    /// (assumption based on the field name — confirm against the implementation).
    rtps::GUID_t last_publication_handle;
};
/// Writer-side liveliness summary passed to LivelinessLostCallback.
///
/// Field names follow the DDS LIVELINESS_LOST status; both counters start at
/// zero.
struct LivelinessLostStatus
{
    /// Running total of liveliness-lost events.
    int32_t total_count{0};

    /// Change in total_count (presumably since the status was last reported — TODO confirm).
    int32_t total_count_change{0};
};
// Callback signature for liveliness-changed notifications: receives the
// current LivelinessChangedStatus by const reference and returns nothing.
using LivelinessChangedCallback = std::function<void(const LivelinessChangedStatus&)>;
// Callback signature for liveliness-lost notifications: receives the current
// LivelinessLostStatus by const reference and returns nothing.
using LivelinessLostCallback = std::function<void(const LivelinessLostStatus&)>;
/// Bookkeeping record for one writer; the value type of
/// LivelinessManager::tracked_writers_.
///
/// Every member has a defined default, so a default-initialized
/// `TrackedWriter w;` never exposes indeterminate values. The struct remains
/// an aggregate, so brace initialization in member order keeps working.
struct TrackedWriter
{
    /// GUID identifying the writer.
    rtps::GUID_t guid;

    /// Liveliness policy kind; defaults to AUTOMATIC, matching LivelinessStatus::kind.
    LivelinessQosPolicyKind kind{LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS};

    /// Steady-clock time of the most recent liveliness assertion
    /// (a default-constructed time_point is the clock's epoch).
    std::chrono::steady_clock::time_point last_asserted;

    /// Lease duration in milliseconds. Zero until populated; this is the same
    /// value that `TrackedWriter{}` already produced before the explicit
    /// initializer was added.
    std::chrono::milliseconds lease_duration{0};

    /// Whether the writer is currently considered alive; defaults to true.
    bool alive{true};

    /// Whether the writer is local; defaults to false.
    /// (Presumably "belongs to this participant" — TODO confirm.)
    bool is_local{false};
};
/// Tracks the liveliness of writers and reports changes through callbacks.
///
/// Only declarations are visible in this header. Behavioural remarks that go
/// beyond the signatures are marked as assumptions and should be confirmed
/// against the implementation file.
class LivelinessManager
{
public:
    /// @param participant_guid GUID of the participant this manager is created for.
    explicit LivelinessManager(const rtps::GUID_t& participant_guid);
    ~LivelinessManager();

    // Non-copyable
    LivelinessManager(const LivelinessManager&) = delete;
    LivelinessManager& operator=(const LivelinessManager&) = delete;

    // --- Lifecycle -------------------------------------------------------
    // Presumably start/stop the monitoring thread that runs monitoring_loop()
    // — TODO confirm in the implementation.
    void start();
    void stop();

    // --- Writer registration ---------------------------------------------
    /// Starts tracking a writer with the given liveliness QoS.
    /// (Presumably for writers local to this participant — TODO confirm.)
    void register_writer(const rtps::GUID_t& writer_guid, const LivelinessQosPolicy& qos);

    /// Stops tracking a writer previously passed to register_writer().
    void unregister_writer(const rtps::GUID_t& writer_guid);

    /// Starts tracking a remote writer with the given liveliness QoS.
    void add_remote_writer(const rtps::GUID_t& writer_guid, const LivelinessQosPolicy& qos);

    /// Stops tracking a writer previously passed to add_remote_writer().
    void remove_remote_writer(const rtps::GUID_t& writer_guid);

    // --- Liveliness assertion --------------------------------------------
    /// Asserts liveliness for one writer.
    /// @return success flag; the exact failure conditions are defined by the
    ///         implementation (presumably false for an unknown writer — TODO confirm).
    bool assert_liveliness(const rtps::GUID_t& writer_guid);

    /// Asserts liveliness at participant scope.
    /// @return success flag; see the implementation for the failure conditions.
    bool assert_liveliness_for_participant();

    /// Notification hook for data arriving from @p writer_guid.
    /// (Presumably refreshes that writer's last-asserted time — TODO confirm.)
    void on_data_received(const rtps::GUID_t& writer_guid);

    // --- Status queries ---------------------------------------------------
    /// Returns the liveliness status of one writer, by value.
    LivelinessStatus get_status(const rtps::GUID_t& writer_guid) const;

    /// Returns the liveliness-changed status, by value.
    LivelinessChangedStatus get_liveliness_changed_status() const;

    /// Reports whether @p writer_guid is currently considered alive.
    bool is_writer_alive(const rtps::GUID_t& writer_guid) const;

    /// Number of writers currently counted as alive.
    int32_t alive_count() const;

    /// Number of writers currently counted as not alive.
    int32_t not_alive_count() const;

    // --- Callback installation -------------------------------------------
    /// Installs the callback used for liveliness-changed notifications.
    void set_liveliness_changed_callback(LivelinessChangedCallback callback);

    /// Installs the callback used for liveliness-lost notifications.
    void set_liveliness_lost_callback(LivelinessLostCallback callback);

private:
    // --- Internal helpers -------------------------------------------------
    void monitoring_loop();
    void check_lease_expiration();
    void assert_automatic_liveliness();

    /// Converts an RTPS duration to milliseconds.
    static std::chrono::milliseconds duration_to_ms(const rtps::Duration_t& duration);

    void notify_liveliness_changed(const rtps::GUID_t& writer_guid, bool alive);
    void notify_liveliness_lost(const rtps::GUID_t& writer_guid);

    // --- State (declaration order is kept as in the original) ------------
    rtps::GUID_t participant_guid_;

    // Writer tracking
    mutable std::mutex writers_mutex_;
    std::map<rtps::GUID_t, TrackedWriter> tracked_writers_;

    // Status tracking
    mutable std::mutex status_mutex_;
    LivelinessChangedStatus liveliness_changed_status_;
    LivelinessLostStatus liveliness_lost_status_;

    // Callbacks
    LivelinessChangedCallback liveliness_changed_callback_;
    LivelinessLostCallback liveliness_lost_callback_;

    // Monitoring thread
    std::thread monitoring_thread_;
    std::atomic<bool> running_{false};
    std::mutex stop_mutex_;
    std::condition_variable stop_cv_;
    std::chrono::milliseconds check_interval_{100};  // 100ms default check interval
};
/// Per-writer liveliness helper bound to a LivelinessManager.
///
/// Holds a reference to the manager, so the manager must outlive this object.
///
/// NOTE(review): the class declares a destructor but no copy operations, so
/// it is still implicitly copy-constructible (copy assignment is already
/// unavailable because of the reference member). If the destructor releases
/// something keyed on writer_guid_ — not visible in this header — a copy
/// would release it twice. Consider deleting the copy constructor once
/// callers have been checked.
class WriterLiveliness
{
public:
    /// @param manager     Manager this helper is bound to.
    /// @param writer_guid GUID of the writer this helper represents.
    /// @param qos         Liveliness QoS of that writer.
    WriterLiveliness(LivelinessManager& manager,
                     const rtps::GUID_t& writer_guid,
                     const LivelinessQosPolicy& qos);
    ~WriterLiveliness();

    /// Asserts liveliness for this writer.
    /// @return success flag; see the implementation for the failure conditions.
    bool assert_liveliness();

    /// Notification hook for a data write by this writer.
    void on_data_written();

    /// Returns the liveliness-lost status, by value.
    LivelinessLostStatus get_liveliness_lost_status() const;

    /// Installs the liveliness-lost listener callback.
    void set_listener(LivelinessLostCallback callback);

private:
    LivelinessManager& manager_;
    rtps::GUID_t writer_guid_;
    LivelinessQosPolicy qos_;
    LivelinessLostCallback callback_;
};
/// Per-reader liveliness helper bound to a LivelinessManager.
///
/// Holds a reference to the manager, so the manager must outlive this object.
class ReaderLiveliness
{
public:
    /// @param manager     Manager this helper is bound to.
    /// @param reader_guid GUID of the reader this helper represents.
    /// @param qos         Liveliness QoS of that reader.
    ReaderLiveliness(LivelinessManager& manager,
                     const rtps::GUID_t& reader_guid,
                     const LivelinessQosPolicy& qos);
    ~ReaderLiveliness() = default;

    /// Returns the liveliness-changed status, by value.
    LivelinessChangedStatus get_liveliness_changed_status() const;

    /// Installs the liveliness-changed listener callback.
    void set_listener(LivelinessChangedCallback callback);

    /// Reports whether the matched writer @p writer_guid is currently alive.
    bool is_matched_writer_alive(const rtps::GUID_t& writer_guid) const;

private:
    LivelinessManager& manager_;
    rtps::GUID_t reader_guid_;
    LivelinessQosPolicy qos_;
};
} // namespace astutedds::dcps
#endif // ASTUTEDDS_DCPS_LIVELINESS_MANAGER_HPP