File udp_transport.hpp

File List > astutedds > rtps > udp_transport.hpp

Go to the documentation of this file


#ifndef ASTUTEDDS_RTPS_UDP_TRANSPORT_HPP
#define ASTUTEDDS_RTPS_UDP_TRANSPORT_HPP

#include <astutedds/rtps/rtps_types.hpp>
#include <astutedds/rtps/logging.hpp>
#include <astutedds/rtps/discovery_config.hpp>
#include <astutedds/rtps/type_lookup_wire.hpp>
#include <astutedds/dcps/qos.hpp>
#include <functional>
#include <memory>
#include <map>
#include <set>
#include <string>
#include <vector>
#include <array>
#include <filesystem>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>

namespace astutedds::rtps
{

    // RTPS Protocol constants
    static constexpr uint8_t RTPS_MAGIC[4] = {'R', 'T', 'P', 'S'};
    static constexpr uint8_t RTPS_VERSION_MAJOR = 2;
    static constexpr uint8_t RTPS_VERSION_MINOR = 5;
    static constexpr uint8_t ASTUTEDDS_VENDOR_ID[2] = {0x01, 0x21};

    // Port calculation constants per RTPS 2.5 spec
    static constexpr uint16_t RTPS_PORT_BASE = 7400;
    static constexpr uint16_t RTPS_DOMAIN_GAIN = 250;
    static constexpr uint16_t RTPS_PARTICIPANT_GAIN = 2;

    // Standard RTPS multicast addresses (per RTPS spec Table 9.8)
    static constexpr const char *RTPS_DISCOVERY_MULTICAST_ADDRESS = "239.255.0.1";
    // Default user data multicast address per RTPS spec Table 9.8.
    // Used as fallback when no PID_DEFAULT_MULTICAST_LOCATOR or
    // PID_MULTICAST_LOCATOR has been learned from remote participants.
    static constexpr const char *RTPS_DEFAULT_USER_MULTICAST_ADDRESS = "239.255.0.2";

    // CDR encapsulation identifiers
    static constexpr uint16_t CDR_BE = 0x0000;
    static constexpr uint16_t CDR_LE = 0x0001;
    static constexpr uint16_t PL_CDR_BE = 0x0002;
    static constexpr uint16_t PL_CDR_LE = 0x0003;

    struct DiscoveredParticipant
    {
        GuidPrefix_t guidPrefix;
        std::array<uint8_t, 4> metatrafficUnicastAddress{};
        uint16_t metatrafficUnicastPort{0};
        std::array<uint8_t, 4> defaultUnicastAddress{};
        uint16_t defaultUnicastPort{0};
        std::array<uint8_t, 4> defaultMulticastAddress{};
        uint16_t defaultMulticastPort{0};
        uint32_t leaseDurationSec{100};  // From PID_PARTICIPANT_LEASE_DURATION (default 100s per RTPS 2.5)
        std::array<uint8_t, 2> vendorId{};            // PID_VENDOR_ID (§8.2.4.1)
        std::array<uint8_t, 2> protocolVersion{2, 1}; // PID_PROTOCOL_VERSION (§8.2.4.1) — default RTPS 2.1
        std::string participantName;                  // PID_ENTITY_NAME (0x0062) — optional
        std::chrono::steady_clock::time_point lastSeen;
    };

    struct DiscoveredWriter
    {
        GUID_t guid;
        std::string topicName;
        std::string typeName;
        std::array<uint8_t, 4> unicastAddress{};
        uint16_t unicastPort{0};
        std::vector<std::string> partitions;
        std::chrono::steady_clock::time_point lastSeen;

        // QoS fields parsed from SEDP
        int32_t reliabilityKind{-1};   // -1 = not present; 1 = BEST_EFFORT, 2 = RELIABLE (RTPS wire values)
        int32_t durabilityKind{-1};    // -1 = not present; 0 = VOLATILE, 1 = TRANSIENT_LOCAL, 2 = TRANSIENT, 3 = PERSISTENT
        int32_t ownershipKind{-1};     // -1 = not present; 0 = SHARED, 1 = EXCLUSIVE
        int32_t ownershipStrength{0};
        int32_t deadlineSec{-1};       // -1 = not present (infinite)
        uint32_t deadlineNanosec{0};
        std::vector<uint16_t> dataRepresentations;  // PID_DATA_REPRESENTATION values

        bool supportsXCDR1() const {
            if (dataRepresentations.empty()) return true;  // default per spec
            for (auto r : dataRepresentations)
                if (r == 0x0000) return true;
            return false;
        }
        bool supportsXCDR2() const {
            for (auto r : dataRepresentations)
                if (r == 0x0002) return true;
            return false;
        }

        std::array<uint8_t, 14> completeTypeIdHash{};
        bool hasCompleteTypeId{false};
    };

    struct DiscoveredReader
    {
        GUID_t guid;
        std::string topicName;
        std::string typeName;
        std::array<uint8_t, 4> unicastAddress{};
        uint16_t unicastPort{0};
        std::array<uint8_t, 4> multicastAddress{};
        uint16_t multicastPort{0};
        std::vector<uint16_t> dataRepresentations;  // PID_DATA_REPRESENTATION values (e.g. 0x0000=XCDR1, 0x0002=XCDR2)
        std::vector<std::string> partitions;
        std::chrono::steady_clock::time_point lastSeen;

        // QoS fields parsed from SEDP
        int32_t reliabilityKind{-1};   // -1 = not present; 1 = BEST_EFFORT, 2 = RELIABLE (RTPS wire values)
        int32_t durabilityKind{-1};    // -1 = not present; 0 = VOLATILE, 1 = TRANSIENT_LOCAL, 2 = TRANSIENT, 3 = PERSISTENT
        int32_t ownershipKind{-1};     // -1 = not present; 0 = SHARED, 1 = EXCLUSIVE
        int32_t deadlineSec{-1};       // -1 = not present (infinite)
        uint32_t deadlineNanosec{0};

        bool supportsXCDR1() const {
            if (dataRepresentations.empty()) return true;  // default per spec
            for (auto r : dataRepresentations)
                if (r == 0x0000) return true;
            return false;
        }
        bool supportsXCDR2() const {
            for (auto r : dataRepresentations)
                if (r == 0x0002) return true;
            return false;
        }

        std::array<uint8_t, 14> completeTypeIdHash{};
        bool hasCompleteTypeId{false};

        bool hasUnicast() const {
            return unicastPort != 0;
        }
        bool hasMulticast() const {
            return multicastPort != 0;
        }
    };

    using DataReceivedCallback = std::function<void(
        const std::string &topicName,
        const std::vector<uint8_t> &data,
        const GUID_t &writerGuid,
        const SequenceNumber_t &sequenceNumber)>;

    using ParticipantDiscoveredCallback = std::function<void(
        const GuidPrefix_t &guidPrefix)>;

    using WriterDiscoveredCallback = std::function<void(
        const DiscoveredWriter &writer)>;

    using ReaderDiscoveredCallback = std::function<void(
        const DiscoveredReader &reader)>;

    struct DiscoveryRuntimeInfo
    {
        bool config_loaded{false};
        bool auto_reload_enabled{false};
        bool allow_multicast{true};
        bool strict_validation{false};
        uint32_t domain_id{0};
        std::array<uint8_t, 4> selected_interface{127, 0, 0, 1};
        std::string config_path;
        std::vector<std::string> peers;
        std::string last_reload_error;
    };

    class RtpsUdpTransport
    {
    public:
        explicit RtpsUdpTransport(uint32_t domainId = 0);
        RtpsUdpTransport(uint32_t domainId, uint32_t participantId);
        ~RtpsUdpTransport();

        // Non-copyable
        RtpsUdpTransport(const RtpsUdpTransport &) = delete;
        RtpsUdpTransport &operator=(const RtpsUdpTransport &) = delete;

        bool init();

        void stop();

        bool is_running() const { return running_.load(); }

        const GuidPrefix_t &guid_prefix() const { return guidPrefix_; }

        uint32_t participant_id() const { return participantId_; }

        // =====================================================================
        // Discovery configuration
        // =====================================================================

        void set_discovery_config(const DiscoveryConfig& config);

        bool load_discovery_config_file(const std::string& path);

        bool reload_discovery_config();

        DiscoveryRuntimeInfo get_discovery_runtime_info() const;

        const DiscoveryConfig& discovery_config() const { return discovery_config_; }

        // =====================================================================
        // Publishing
        // =====================================================================

        EntityId_t register_writer(const std::string &topicName,
                                   const std::string &typeName,
                                   const std::vector<std::string> &partitions = {},
                                   const dcps::DataWriterQos &qos = {});

        void unregister_writer(const EntityId_t &entityId);

        bool publish(const std::string &topicName,
                     const std::vector<uint8_t> &data);

        bool publish(const std::string &topicName,
                     const std::vector<uint8_t> &xcdr1Data,
                     const std::vector<uint8_t> &xcdr2Data);

        // =====================================================================
        // Subscribing
        // =====================================================================

        EntityId_t register_reader(const std::string &topicName,
                                   const std::string &typeName,
                                   const std::vector<std::string> &partitions = {},
                                   const dcps::DataReaderQos &qos = {});

        void unregister_reader(const EntityId_t &entityId);

        void subscribe(const std::string &topicName);

        void unsubscribe(const std::string &topicName);

        // =====================================================================
        // Callbacks
        // =====================================================================

        void set_data_callback(DataReceivedCallback callback)
        {
            dataCallback_ = std::move(callback);
        }

        void set_participant_discovered_callback(ParticipantDiscoveredCallback callback)
        {
            participantCallback_ = std::move(callback);
        }

        void set_writer_discovered_callback(WriterDiscoveredCallback callback)
        {
            writerCallback_ = std::move(callback);
        }

        void set_reader_discovered_callback(ReaderDiscoveredCallback callback)
        {
            readerCallback_ = std::move(callback);
        }

        // =====================================================================
        // Discovery info
        // =====================================================================

        std::vector<DiscoveredParticipant> get_discovered_participants() const;

        std::vector<DiscoveredWriter> get_discovered_writers(
            const std::string &topicName) const;

        std::vector<DiscoveredWriter> get_all_discovered_writers() const;

        std::vector<DiscoveredReader> get_discovered_readers(
            const std::string &topicName) const;

        std::vector<DiscoveredReader> get_all_discovered_readers() const;

        // =====================================================================
        // XTypes TypeLookup Service (DDS-XTypes 1.3 §7.6.3)
        // =====================================================================

        using TypeLookupCallback =
            std::function<void(const std::string& topicName,
                               const std::vector<std::string>& fieldNames,
                               const std::vector<uint8_t>& fieldKinds,
                               const std::vector<ParsedTypeLookupReply::NestedFieldSchema>& nestedSchemas)>;

        void set_type_lookup_callback(TypeLookupCallback cb)
        {
            tlsCallback_ = std::move(cb);
        }

        void send_type_lookup_request(const GuidPrefix_t&          remoteGuid,
                                      const std::array<uint8_t, 4>& metatrafficAddr,
                                      uint16_t                      metatrafficPort,
                                      const std::string&            topicName,
                                      const std::array<uint8_t, 14>& hash);

        // =====================================================================
        // Traffic statistics
        // =====================================================================

        struct TopicTrafficStats
        {
            uint64_t packetsReceived{0};  
            uint64_t bytesReceived{0};    
            std::chrono::steady_clock::time_point lastReceived{};
        };

        std::map<std::string, TopicTrafficStats> get_all_traffic_stats() const;

    private:
        // Port calculations per RTPS spec
        uint16_t getDiscoveryMulticastPort() const;
        uint16_t getDiscoveryUnicastPort() const;
        uint16_t getUserMulticastPort() const;
        uint16_t getUserUnicastPort() const;

        // GUID generation
        void generateGuidPrefix();
        bool resolve_local_interface_selection();
        bool apply_discovery_config(const DiscoveryConfig& config);
        bool maybe_reload_discovery_config();
        uint32_t getNextWriterId();
        uint32_t getNextReaderId();

        // Socket management
        bool createSockets();
        void closeSockets();

        // Receive thread
        void receiveLoop();

        // Discovery
        void discoveryLoop();
        std::vector<uint8_t> buildSPDPAnnouncementMessage();
        void sendSPDPAnnouncement();
        void sendSPDPAnnouncementTo(const std::array<uint8_t, 4> &address, uint16_t port);
        void sendSEDPAnnouncement(const std::string &topicName, bool isWriter,
                                  const std::vector<std::string> &partitions = {});
        void resendStoredSEDPTo(const GuidPrefix_t &remoteGuidPrefix,
                                const std::array<uint8_t, 4> &destAddr,
                                uint16_t destPort);
        void sendPeriodicSEDPHeartbeats();
        void sendInitialAcknacks(const GuidPrefix_t &remoteGuidPrefix,
                                 const std::array<uint8_t, 4> &destAddr,
                                 uint16_t destPort);

        // RTPS message building
        std::vector<uint8_t> buildRtpsHeader();
        std::vector<uint8_t> buildDataSubmessage(const EntityId_t &writerId,
                                                 const EntityId_t &readerId,
                                                 uint64_t sequenceNumber,
                                                 const std::vector<uint8_t> &payload);
        std::vector<uint8_t> buildHeartbeatSubmessage(const EntityId_t &writerId,
                                                      const EntityId_t &readerId,
                                                      uint64_t firstSeqNum,
                                                      uint64_t lastSeqNum,
                                                      uint32_t count);
        std::vector<uint8_t> buildAckNackSubmessage(const EntityId_t &readerId,
                                                    const EntityId_t &writerId,
                                                    uint64_t firstSeqNum,
                                                    uint64_t lastSeqNum,
                                                    uint32_t count);

        // RTPS message parsing
        void parseRtpsMessage(const uint8_t *data, size_t len,
                              const std::array<uint8_t, 4> &senderAddr,
                              uint16_t senderPort);
        void parseDataSubmessage(const uint8_t *data, size_t len,
                                 uint8_t flags,
                                 bool littleEndian,
                                 const GuidPrefix_t &senderGuid);
        void parseHeartbeat(const uint8_t *data, size_t len,
                            uint8_t flags,
                            bool littleEndian,
                            const GuidPrefix_t &senderGuid,
                            const std::array<uint8_t, 4> &senderAddr,
                            uint16_t senderPort);
        void parseAckNack(const uint8_t *data, size_t len,
                          bool littleEndian,
                          const GuidPrefix_t &senderGuid,
                          const uint8_t *senderAddr,
                          uint16_t senderPort);
        void parseSPDP(const uint8_t *data, size_t len,
                       bool littleEndian,
                       const GuidPrefix_t &senderGuid);
        void parseSEDPPublication(const uint8_t *data, size_t len,
                                  bool littleEndian,
                                  const GuidPrefix_t &senderGuid);
        void parseSEDPSubscription(const uint8_t *data, size_t len,
                                   bool littleEndian,
                                   const GuidPrefix_t &senderGuid);

        void parseTlsReply(const uint8_t *data, size_t len);
        void parseTlsRequest(const uint8_t *data, size_t len,
                     const GuidPrefix_t &senderGuid);
        void send_type_lookup_reply(const GuidPrefix_t& remoteGuid,
                        const std::array<uint8_t, 4>& metatrafficAddr,
                        uint16_t metatrafficPort,
                        const std::array<uint8_t, 24>& relatedRequestId,
                        const std::string& typeName,
                        const std::vector<std::string>& fieldNames,
                        const std::vector<uint8_t>& fieldKinds,
                        const std::vector<ParsedTypeLookupReply::NestedFieldSchema>& nestedSchemas = {});

        // Topic lookup from writer GUID
        std::string lookupTopicForWriter(const GuidPrefix_t &prefix,
                                         const EntityId_t &entityId) const;

        // Participant lookup
        bool getParticipantLocator(const GuidPrefix_t &prefix,
                                   std::array<uint8_t, 4> &address,
                                   uint16_t &port) const;

        // Send helpers
        void sendToMulticast(const std::vector<uint8_t> &data, bool discovery);
        void sendToUnicast(const std::vector<uint8_t> &data,
                           const std::array<uint8_t, 4> &address,
                           uint16_t port,
                           bool discovery);

        // Configuration
        uint32_t domainId_;
        uint32_t participantId_;
        GuidPrefix_t guidPrefix_;
        DiscoveryConfig discovery_config_;
        bool discovery_config_set_{false};
        std::array<uint8_t, 4> selected_local_ip_{127, 0, 0, 1};
        std::string discovery_config_path_;
        bool discovery_auto_reload_enabled_{false};
        std::chrono::steady_clock::time_point last_discovery_reload_check_{};
        std::filesystem::file_time_type discovery_config_mtime_{};
        std::string discovery_reload_error_;

        // Entity ID counters
        std::atomic<uint32_t> nextWriterId_{0x100};
        std::atomic<uint32_t> nextReaderId_{0x100};

        // Global sequence number for SPDP only.
        std::atomic<uint64_t> sequenceNumber_{1};

        // Per-writer sequence counters for user-data writers.
        // Each user writer has its own independent sequence space starting at 1.
        std::map<std::string, uint64_t> writerSeqNums_;  // topic -> next seqNum

        // Sockets (file descriptors)
        int metatrafficMulticastFd_{-1};
        int metatrafficUnicastFd_{-1};
        int usertrafficMulticastFd_{-1};
        int usertrafficUnicastFd_{-1};

        // Thread management
        std::atomic<bool> running_{false};
        std::mutex stop_mutex_;
        std::condition_variable stop_cv_;
        std::thread receiveThread_;
        std::thread discoveryThread_;

        // Mutex for thread safety (recursive to allow nested locking)
        mutable std::recursive_mutex mutex_;

        // Subscribed topics
        std::set<std::string> subscribedTopics_;

        // Writer EntityIds per topic
        std::map<std::string, EntityId_t> writerEntities_;

        // Writer type names per topic (for SEDP announcements)
        std::map<std::string, std::string> writerTypeNames_;

        // Writer partitions per topic
        std::map<std::string, std::vector<std::string>> writerPartitions_;

        // Reader EntityIds per topic
        std::map<std::string, EntityId_t> readerEntities_;

        // Reader type names per topic (for SEDP announcements)
        std::map<std::string, std::string> readerTypeNames_;

        // Reader partitions per topic
        std::map<std::string, std::vector<std::string>> readerPartitions_;

        // Writer QoS per topic (for SEDP announcements)
        std::map<std::string, dcps::DataWriterQos> writerQos_;

        // Reader QoS per topic (for SEDP announcements)
        std::map<std::string, dcps::DataReaderQos> readerQos_;

        // Discovered participants (keyed by GUID prefix)
        std::map<std::array<uint8_t, 12>, DiscoveredParticipant> discoveredParticipants_;

        // Discovered writers (keyed by topic name)
        std::map<std::string, std::vector<DiscoveredWriter>> discoveredWriters_;

        // Discovered readers (keyed by topic name)
        std::map<std::string, std::vector<DiscoveredReader>> discoveredReaders_;

        // GUID key for mapping
        struct GUIDKey
        {
            GuidPrefix_t prefix;
            EntityId_t entityId;

            bool operator<(const GUIDKey &other) const;
        };

        // Writer GUID to topic mapping
        std::map<GUIDKey, std::string> writerToTopic_;

        // Per-writer sequence tracking for reliable delivery
        struct WriterState
        {
            uint64_t highestReceivedSeqNum{0};
            std::set<uint64_t> receivedSeqNums;
            uint64_t firstAvailable{1};
            uint64_t lastAvailable{0};
            std::array<uint8_t, 4> unicastAddress{};
            uint16_t unicastPort{0};
        };
        std::map<GUIDKey, WriterState> writerStates_;

        // Sent sample cache for retransmission (per-topic → seqNum → sample)
        struct SentSample
        {
            std::string topicName;
            std::vector<uint8_t> serializedData;   // XCDR1 payload (universal fallback)
            std::vector<uint8_t> xcdr2Data;        // XCDR2 payload (for XCDR2-only readers)
            uint64_t sequenceNumber;
            std::chrono::steady_clock::time_point timestamp;
        };
        std::map<std::string, std::map<uint64_t, SentSample>> sentSamples_;
        static constexpr size_t MAX_SENT_SAMPLES_PER_TOPIC = 600;

        // ACKNACK counters
        std::map<GUIDKey, uint32_t> acknackCounts_;

        // TRANSIENT_LOCAL reorder buffer: buffer out-of-order samples and
        // deliver in seqNum order so historical data arrives before live data.
        std::map<GUIDKey, std::map<uint64_t, std::vector<uint8_t>>> reorderBuffers_;
        std::map<GUIDKey, uint64_t> nextDeliverSeqNum_;
        std::map<GUIDKey, std::chrono::steady_clock::time_point> reorderStartTime_;

        // ACKNACK rate-limiting: suppress redundant responses to HEARTBEATs.
        struct AcknackState {
            std::chrono::steady_clock::time_point lastSendTime{};
            uint32_t lastRespondedHBCount{0};
            bool lastWasCaughtUp{false};
        };
        std::map<GUIDKey, AcknackState> acknackRateLimit_;

        // TypeLookup Service state (DDS-XTypes §7.6.3)
        TypeLookupCallback tlsCallback_;
        std::atomic<uint64_t> tlsSeqNum_{1};  
        std::map<uint64_t, std::string> tlsPendingTopics_;  
        std::set<std::string> tlsRequestedTopics_;          

        // SEDP message cache for retransmission
        struct SEDPMessage
        {
            std::vector<uint8_t> data;
            uint64_t sequenceNumber;
            bool isWriter; // true for publication, false for subscription
        };
        std::map<std::string, SEDPMessage> sedpPublications_;  // topic -> message
        std::map<std::string, SEDPMessage> sedpSubscriptions_; // topic -> message
        uint64_t sedpPubSeqNum_{0};
        uint64_t sedpSubSeqNum_{0};

        // Per-topic traffic statistics (user data, not discovery)
        std::map<std::string, TopicTrafficStats> trafficStats_;

        // Callbacks
        DataReceivedCallback dataCallback_;
        ParticipantDiscoveredCallback participantCallback_;
        WriterDiscoveredCallback writerCallback_;
        ReaderDiscoveredCallback readerCallback_;
    };

} // namespace astutedds::rtps

#endif // ASTUTEDDS_RTPS_UDP_TRANSPORT_HPP