// File: udp_transport.hpp
// File List > astutedds > rtps > udp_transport.hpp
// (Text below was extracted from the generated documentation page for this file.)
#ifndef ASTUTEDDS_RTPS_UDP_TRANSPORT_HPP
#define ASTUTEDDS_RTPS_UDP_TRANSPORT_HPP
#include <astutedds/rtps/rtps_types.hpp>
#include <astutedds/rtps/logging.hpp>
#include <astutedds/rtps/discovery_config.hpp>
#include <astutedds/rtps/type_lookup_wire.hpp>
#include <astutedds/dcps/qos.hpp>
#include <functional>
#include <memory>
#include <map>
#include <set>
#include <string>
#include <vector>
#include <array>
#include <filesystem>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
namespace astutedds::rtps
{
// RTPS protocol constants.
//
// These are declared `inline constexpr` (C++17) rather than `static constexpr`
// so that every translation unit including this header refers to the same
// entity instead of getting its own internal-linkage copy.

// Four-byte protocol identifier that opens every RTPS message header.
inline constexpr uint8_t RTPS_MAGIC[4] = {'R', 'T', 'P', 'S'};
// Protocol version advertised by this implementation (2.5).
inline constexpr uint8_t RTPS_VERSION_MAJOR = 2;
inline constexpr uint8_t RTPS_VERSION_MINOR = 5;
// Two-byte vendor identifier placed in outgoing messages.
inline constexpr uint8_t ASTUTEDDS_VENDOR_ID[2] = {0x01, 0x21};

// Port calculation constants per RTPS 2.5 spec:
// port base (PB), domain ID gain (DG) and participant ID gain (PG).
inline constexpr uint16_t RTPS_PORT_BASE = 7400;
inline constexpr uint16_t RTPS_DOMAIN_GAIN = 250;
inline constexpr uint16_t RTPS_PARTICIPANT_GAIN = 2;

// Standard RTPS multicast addresses (per RTPS spec Table 9.8)
inline constexpr const char *RTPS_DISCOVERY_MULTICAST_ADDRESS = "239.255.0.1";
// Default user data multicast address per RTPS spec Table 9.8.
// Used as fallback when no PID_DEFAULT_MULTICAST_LOCATOR or
// PID_MULTICAST_LOCATOR has been learned from remote participants.
inline constexpr const char *RTPS_DEFAULT_USER_MULTICAST_ADDRESS = "239.255.0.2";

// CDR encapsulation identifiers: plain CDR and parameter-list (PL) CDR,
// each in big-endian (BE) and little-endian (LE) form.
inline constexpr uint16_t CDR_BE = 0x0000;
inline constexpr uint16_t CDR_LE = 0x0001;
inline constexpr uint16_t PL_CDR_BE = 0x0002;
inline constexpr uint16_t PL_CDR_LE = 0x0003;
// Snapshot of what is known about a remote participant.
// Address fields are four raw bytes each; a port of 0 is the initial value.
struct DiscoveredParticipant
{
    // Identity of the remote participant.
    GuidPrefix_t guidPrefix;

    // Locator for metatraffic sent by unicast.
    std::array<uint8_t, 4> metatrafficUnicastAddress = {};
    uint16_t metatrafficUnicastPort = 0;

    // Default locator for unicast traffic.
    std::array<uint8_t, 4> defaultUnicastAddress = {};
    uint16_t defaultUnicastPort = 0;

    // Default locator for multicast traffic.
    std::array<uint8_t, 4> defaultMulticastAddress = {};
    uint16_t defaultMulticastPort = 0;

    // Lease duration in seconds, taken from PID_PARTICIPANT_LEASE_DURATION
    // (initialised to the 100 s default per RTPS 2.5).
    uint32_t leaseDurationSec = 100;

    // PID_VENDOR_ID (§8.2.4.1)
    std::array<uint8_t, 2> vendorId = {};

    // PID_PROTOCOL_VERSION (§8.2.4.1) — initialised to RTPS 2.1
    std::array<uint8_t, 2> protocolVersion = {2, 1};

    // PID_ENTITY_NAME (0x0062) — optional
    std::string participantName;

    // Monotonic timestamp associated with this entry.
    // NOTE(review): presumably refreshed whenever the participant is heard from — confirm.
    std::chrono::steady_clock::time_point lastSeen;
};
// Description of a remote DataWriter, including the QoS values parsed from SEDP.
struct DiscoveredWriter
{
    // Identity and topic binding.
    GUID_t guid;
    std::string topicName;
    std::string typeName;

    // Unicast locator (four raw address bytes plus port).
    std::array<uint8_t, 4> unicastAddress = {};
    uint16_t unicastPort = 0;

    std::vector<std::string> partitions;
    std::chrono::steady_clock::time_point lastSeen;

    // QoS fields parsed from SEDP. A value of -1 means "not present".
    int32_t reliabilityKind = -1; // 1 = BEST_EFFORT, 2 = RELIABLE (RTPS wire values)
    int32_t durabilityKind = -1;  // 0 = VOLATILE, 1 = TRANSIENT_LOCAL, 2 = TRANSIENT, 3 = PERSISTENT
    int32_t ownershipKind = -1;   // 0 = SHARED, 1 = EXCLUSIVE
    int32_t ownershipStrength = 0;
    int32_t deadlineSec = -1;     // -1 = not present (infinite)
    uint32_t deadlineNanosec = 0;

    // PID_DATA_REPRESENTATION values
    std::vector<uint16_t> dataRepresentations;

    // Hash of the complete type identifier; meaningful only when
    // hasCompleteTypeId is true.
    std::array<uint8_t, 14> completeTypeIdHash = {};
    bool hasCompleteTypeId = false;

    // True when representation 0x0000 is listed, or when no representation
    // was listed at all (the default per spec).
    bool supportsXCDR1() const
    {
        bool supported = dataRepresentations.empty();
        for (auto it = dataRepresentations.cbegin();
             !supported && it != dataRepresentations.cend(); ++it)
        {
            supported = (*it == 0x0000);
        }
        return supported;
    }

    // True only when representation 0x0002 is explicitly listed.
    bool supportsXCDR2() const
    {
        bool supported = false;
        for (auto it = dataRepresentations.cbegin();
             !supported && it != dataRepresentations.cend(); ++it)
        {
            supported = (*it == 0x0002);
        }
        return supported;
    }
};
// Description of a remote DataReader, including the QoS values parsed from SEDP.
struct DiscoveredReader
{
    // Identity and topic binding.
    GUID_t guid;
    std::string topicName;
    std::string typeName;

    // Unicast locator (four raw address bytes plus port).
    std::array<uint8_t, 4> unicastAddress = {};
    uint16_t unicastPort = 0;

    // Multicast locator (four raw address bytes plus port).
    std::array<uint8_t, 4> multicastAddress = {};
    uint16_t multicastPort = 0;

    // PID_DATA_REPRESENTATION values (e.g. 0x0000=XCDR1, 0x0002=XCDR2)
    std::vector<uint16_t> dataRepresentations;

    std::vector<std::string> partitions;
    std::chrono::steady_clock::time_point lastSeen;

    // QoS fields parsed from SEDP. A value of -1 means "not present".
    int32_t reliabilityKind = -1; // 1 = BEST_EFFORT, 2 = RELIABLE (RTPS wire values)
    int32_t durabilityKind = -1;  // 0 = VOLATILE, 1 = TRANSIENT_LOCAL, 2 = TRANSIENT, 3 = PERSISTENT
    int32_t ownershipKind = -1;   // 0 = SHARED, 1 = EXCLUSIVE
    int32_t deadlineSec = -1;     // -1 = not present (infinite)
    uint32_t deadlineNanosec = 0;

    // Hash of the complete type identifier; meaningful only when
    // hasCompleteTypeId is true.
    std::array<uint8_t, 14> completeTypeIdHash = {};
    bool hasCompleteTypeId = false;

    // True when representation 0x0000 is listed, or when no representation
    // was listed at all (the default per spec).
    bool supportsXCDR1() const
    {
        bool supported = dataRepresentations.empty();
        for (auto it = dataRepresentations.cbegin();
             !supported && it != dataRepresentations.cend(); ++it)
        {
            supported = (*it == 0x0000);
        }
        return supported;
    }

    // True only when representation 0x0002 is explicitly listed.
    bool supportsXCDR2() const
    {
        bool supported = false;
        for (auto it = dataRepresentations.cbegin();
             !supported && it != dataRepresentations.cend(); ++it)
        {
            supported = (*it == 0x0002);
        }
        return supported;
    }

    // A locator counts as present when its port is non-zero.
    bool hasUnicast() const
    {
        return unicastPort > 0;
    }

    bool hasMulticast() const
    {
        return multicastPort > 0;
    }
};
// Callback type carrying one received sample: the topic name, the payload
// bytes, the GUID of the writer it is attributed to and its sequence number.
// NOTE(review): whether `data` still includes the CDR encapsulation header is
// not visible from this header — confirm against the implementation.
using DataReceivedCallback = std::function<void(
const std::string &topicName,
const std::vector<uint8_t> &data,
const GUID_t &writerGuid,
const SequenceNumber_t &sequenceNumber)>;
// Callback type receiving the GUID prefix of a participant.
using ParticipantDiscoveredCallback = std::function<void(
const GuidPrefix_t &guidPrefix)>;
// Callback type receiving the description of a remote writer.
using WriterDiscoveredCallback = std::function<void(
const DiscoveredWriter &writer)>;
// Callback type receiving the description of a remote reader.
using ReaderDiscoveredCallback = std::function<void(
const DiscoveredReader &reader)>;
// Plain-data summary of the discovery configuration state, as returned by
// RtpsUdpTransport::get_discovery_runtime_info().
struct DiscoveryRuntimeInfo
{
    // Status flags.
    bool config_loaded = false;
    bool auto_reload_enabled = false;
    bool allow_multicast = true;
    bool strict_validation = false;

    uint32_t domain_id = 0;

    // Four raw address bytes; initialised to the loopback address 127.0.0.1.
    std::array<uint8_t, 4> selected_interface = {127, 0, 0, 1};

    // Path of the configuration file; empty by default.
    std::string config_path;

    // Peer entries; empty by default.
    std::vector<std::string> peers;

    // Text of the most recent reload error; empty by default.
    std::string last_reload_error;
};
// RTPS transport over UDP.
//
// Declares the public API for registering writers/readers, publishing,
// installing callbacks, querying discovery results, issuing XTypes TypeLookup
// requests and reading traffic statistics, together with the private socket,
// thread and bookkeeping state that backs it. Member functions without an
// inline body are defined outside this header.
class RtpsUdpTransport
{
public:
// Construct a transport for the given domain (0 when omitted).
explicit RtpsUdpTransport(uint32_t domainId = 0);
// Construct a transport for the given domain with an explicit participant id.
RtpsUdpTransport(uint32_t domainId, uint32_t participantId);
~RtpsUdpTransport();
// Non-copyable
// (declaring the copy operations also suppresses the implicit move
// operations, so instances can be neither copied nor moved).
RtpsUdpTransport(const RtpsUdpTransport &) = delete;
RtpsUdpTransport &operator=(const RtpsUdpTransport &) = delete;
// NOTE(review): presumably init() opens the sockets and starts the
// receive/discovery threads and stop() shuts them down; the bool result of
// init() presumably reports success — confirm in the implementation.
bool init();
void stop();
// Current value of the atomic running_ flag.
bool is_running() const { return running_.load(); }
// GUID prefix of this participant (reference to the stored member).
const GuidPrefix_t &guid_prefix() const { return guidPrefix_; }
// Participant id held by this transport.
uint32_t participant_id() const { return participantId_; }
// =====================================================================
// Discovery configuration
// =====================================================================
void set_discovery_config(const DiscoveryConfig& config);
bool load_discovery_config_file(const std::string& path);
bool reload_discovery_config();
DiscoveryRuntimeInfo get_discovery_runtime_info() const;
// Returns a reference to the stored configuration.
// NOTE(review): no lock is taken here; confirm callers do not read the
// reference while the configuration is being replaced on another thread.
const DiscoveryConfig& discovery_config() const { return discovery_config_; }
// =====================================================================
// Publishing
// =====================================================================
// Register a writer for a topic/type with optional partitions and QoS;
// returns the entity id of the writer.
EntityId_t register_writer(const std::string &topicName,
const std::string &typeName,
const std::vector<std::string> &partitions = {},
const dcps::DataWriterQos &qos = {});
void unregister_writer(const EntityId_t &entityId);
// Publish one serialized sample on a topic.
bool publish(const std::string &topicName,
const std::vector<uint8_t> &data);
// Publish one sample supplied in both XCDR1 and XCDR2 form.
// NOTE(review): presumably the form sent to each reader depends on the
// reader's advertised data representations — confirm in the implementation.
bool publish(const std::string &topicName,
const std::vector<uint8_t> &xcdr1Data,
const std::vector<uint8_t> &xcdr2Data);
// =====================================================================
// Subscribing
// =====================================================================
// Register a reader for a topic/type with optional partitions and QoS;
// returns the entity id of the reader.
EntityId_t register_reader(const std::string &topicName,
const std::string &typeName,
const std::vector<std::string> &partitions = {},
const dcps::DataReaderQos &qos = {});
void unregister_reader(const EntityId_t &entityId);
void subscribe(const std::string &topicName);
void unsubscribe(const std::string &topicName);
// =====================================================================
// Callbacks
// =====================================================================
// Each setter moves the supplied callable into the matching member,
// replacing any previous one.
// NOTE(review): these assignments are not guarded by mutex_; confirm that
// callbacks are installed before the worker threads can invoke them.
void set_data_callback(DataReceivedCallback callback)
{
dataCallback_ = std::move(callback);
}
void set_participant_discovered_callback(ParticipantDiscoveredCallback callback)
{
participantCallback_ = std::move(callback);
}
void set_writer_discovered_callback(WriterDiscoveredCallback callback)
{
writerCallback_ = std::move(callback);
}
void set_reader_discovered_callback(ReaderDiscoveredCallback callback)
{
readerCallback_ = std::move(callback);
}
// =====================================================================
// Discovery info
// =====================================================================
// All accessors below return their results by value.
std::vector<DiscoveredParticipant> get_discovered_participants() const;
std::vector<DiscoveredWriter> get_discovered_writers(
const std::string &topicName) const;
std::vector<DiscoveredWriter> get_all_discovered_writers() const;
std::vector<DiscoveredReader> get_discovered_readers(
const std::string &topicName) const;
std::vector<DiscoveredReader> get_all_discovered_readers() const;
// =====================================================================
// XTypes TypeLookup Service (DDS-XTypes 1.3 §7.6.3)
// =====================================================================
// Callback type receiving a topic name, parallel lists of field names and
// field kinds, and the schemas of nested fields.
using TypeLookupCallback =
std::function<void(const std::string& topicName,
const std::vector<std::string>& fieldNames,
const std::vector<uint8_t>& fieldKinds,
const std::vector<ParsedTypeLookupReply::NestedFieldSchema>& nestedSchemas)>;
// Moves the supplied callable into tlsCallback_ (no lock is taken here).
void set_type_lookup_callback(TypeLookupCallback cb)
{
tlsCallback_ = std::move(cb);
}
// Send a TypeLookup request for `topicName` to a remote participant at the
// given metatraffic address/port, identifying the type by its 14-byte hash.
void send_type_lookup_request(const GuidPrefix_t& remoteGuid,
const std::array<uint8_t, 4>& metatrafficAddr,
uint16_t metatrafficPort,
const std::string& topicName,
const std::array<uint8_t, 14>& hash);
// =====================================================================
// Traffic statistics
// =====================================================================
// Receive counters for one topic; all fields start at zero / epoch.
struct TopicTrafficStats
{
uint64_t packetsReceived{0};
uint64_t bytesReceived{0};
std::chrono::steady_clock::time_point lastReceived{};
};
// Returns a by-value copy of the statistics, keyed by topic name.
std::map<std::string, TopicTrafficStats> get_all_traffic_stats() const;
private:
// Port calculations per RTPS spec
uint16_t getDiscoveryMulticastPort() const;
uint16_t getDiscoveryUnicastPort() const;
uint16_t getUserMulticastPort() const;
uint16_t getUserUnicastPort() const;
// GUID generation
void generateGuidPrefix();
// Discovery configuration helpers
bool resolve_local_interface_selection();
bool apply_discovery_config(const DiscoveryConfig& config);
bool maybe_reload_discovery_config();
// Entity id allocation
uint32_t getNextWriterId();
uint32_t getNextReaderId();
// Socket management
bool createSockets();
void closeSockets();
// Receive thread
void receiveLoop();
// Discovery
void discoveryLoop();
std::vector<uint8_t> buildSPDPAnnouncementMessage();
void sendSPDPAnnouncement();
void sendSPDPAnnouncementTo(const std::array<uint8_t, 4> &address, uint16_t port);
void sendSEDPAnnouncement(const std::string &topicName, bool isWriter,
const std::vector<std::string> &partitions = {});
void resendStoredSEDPTo(const GuidPrefix_t &remoteGuidPrefix,
const std::array<uint8_t, 4> &destAddr,
uint16_t destPort);
void sendPeriodicSEDPHeartbeats();
void sendInitialAcknacks(const GuidPrefix_t &remoteGuidPrefix,
const std::array<uint8_t, 4> &destAddr,
uint16_t destPort);
// RTPS message building
// Each builder returns the encoded bytes by value.
std::vector<uint8_t> buildRtpsHeader();
std::vector<uint8_t> buildDataSubmessage(const EntityId_t &writerId,
const EntityId_t &readerId,
uint64_t sequenceNumber,
const std::vector<uint8_t> &payload);
std::vector<uint8_t> buildHeartbeatSubmessage(const EntityId_t &writerId,
const EntityId_t &readerId,
uint64_t firstSeqNum,
uint64_t lastSeqNum,
uint32_t count);
std::vector<uint8_t> buildAckNackSubmessage(const EntityId_t &readerId,
const EntityId_t &writerId,
uint64_t firstSeqNum,
uint64_t lastSeqNum,
uint32_t count);
// RTPS message parsing
// All parsers take a raw byte pointer plus length.
void parseRtpsMessage(const uint8_t *data, size_t len,
const std::array<uint8_t, 4> &senderAddr,
uint16_t senderPort);
void parseDataSubmessage(const uint8_t *data, size_t len,
uint8_t flags,
bool littleEndian,
const GuidPrefix_t &senderGuid);
void parseHeartbeat(const uint8_t *data, size_t len,
uint8_t flags,
bool littleEndian,
const GuidPrefix_t &senderGuid,
const std::array<uint8_t, 4> &senderAddr,
uint16_t senderPort);
// NOTE(review): senderAddr is a raw pointer here, whereas parseHeartbeat
// takes std::array<uint8_t, 4>; presumably it points at four address
// bytes — confirm, and consider aligning the two signatures.
void parseAckNack(const uint8_t *data, size_t len,
bool littleEndian,
const GuidPrefix_t &senderGuid,
const uint8_t *senderAddr,
uint16_t senderPort);
void parseSPDP(const uint8_t *data, size_t len,
bool littleEndian,
const GuidPrefix_t &senderGuid);
void parseSEDPPublication(const uint8_t *data, size_t len,
bool littleEndian,
const GuidPrefix_t &senderGuid);
void parseSEDPSubscription(const uint8_t *data, size_t len,
bool littleEndian,
const GuidPrefix_t &senderGuid);
void parseTlsReply(const uint8_t *data, size_t len);
void parseTlsRequest(const uint8_t *data, size_t len,
const GuidPrefix_t &senderGuid);
// Send a TypeLookup reply describing `typeName` to a remote participant,
// tagged with the 24-byte id of the request it relates to.
void send_type_lookup_reply(const GuidPrefix_t& remoteGuid,
const std::array<uint8_t, 4>& metatrafficAddr,
uint16_t metatrafficPort,
const std::array<uint8_t, 24>& relatedRequestId,
const std::string& typeName,
const std::vector<std::string>& fieldNames,
const std::vector<uint8_t>& fieldKinds,
const std::vector<ParsedTypeLookupReply::NestedFieldSchema>& nestedSchemas = {});
// Topic lookup from writer GUID
std::string lookupTopicForWriter(const GuidPrefix_t &prefix,
const EntityId_t &entityId) const;
// Participant lookup
// Writes the locator into `address`/`port`.
// NOTE(review): the bool result presumably reports whether the participant
// was found — confirm in the implementation.
bool getParticipantLocator(const GuidPrefix_t &prefix,
std::array<uint8_t, 4> &address,
uint16_t &port) const;
// Send helpers
// NOTE(review): the `discovery` flag presumably selects the metatraffic
// socket over the user-traffic socket — confirm in the implementation.
void sendToMulticast(const std::vector<uint8_t> &data, bool discovery);
void sendToUnicast(const std::vector<uint8_t> &data,
const std::array<uint8_t, 4> &address,
uint16_t port,
bool discovery);
// Configuration
uint32_t domainId_;
uint32_t participantId_;
GuidPrefix_t guidPrefix_;
DiscoveryConfig discovery_config_;
bool discovery_config_set_{false};
// Four raw address bytes; initialised to the loopback address 127.0.0.1.
std::array<uint8_t, 4> selected_local_ip_{127, 0, 0, 1};
std::string discovery_config_path_;
bool discovery_auto_reload_enabled_{false};
std::chrono::steady_clock::time_point last_discovery_reload_check_{};
std::filesystem::file_time_type discovery_config_mtime_{};
std::string discovery_reload_error_;
// Entity ID counters
// Both start at 0x100.
std::atomic<uint32_t> nextWriterId_{0x100};
std::atomic<uint32_t> nextReaderId_{0x100};
// Global sequence number for SPDP only.
std::atomic<uint64_t> sequenceNumber_{1};
// Per-writer sequence counters for user-data writers.
// Each user writer has its own independent sequence space starting at 1.
std::map<std::string, uint64_t> writerSeqNums_; // topic -> next seqNum
// Sockets (file descriptors)
// -1 is the initial value of each descriptor.
int metatrafficMulticastFd_{-1};
int metatrafficUnicastFd_{-1};
int usertrafficMulticastFd_{-1};
int usertrafficUnicastFd_{-1};
// Thread management
std::atomic<bool> running_{false};
std::mutex stop_mutex_;
std::condition_variable stop_cv_;
std::thread receiveThread_;
std::thread discoveryThread_;
// Mutex for thread safety (recursive to allow nested locking)
// Declared mutable so const member functions can lock it.
mutable std::recursive_mutex mutex_;
// Subscribed topics
std::set<std::string> subscribedTopics_;
// Writer EntityIds per topic
std::map<std::string, EntityId_t> writerEntities_;
// Writer type names per topic (for SEDP announcements)
std::map<std::string, std::string> writerTypeNames_;
// Writer partitions per topic
std::map<std::string, std::vector<std::string>> writerPartitions_;
// Reader EntityIds per topic
std::map<std::string, EntityId_t> readerEntities_;
// Reader type names per topic (for SEDP announcements)
std::map<std::string, std::string> readerTypeNames_;
// Reader partitions per topic
std::map<std::string, std::vector<std::string>> readerPartitions_;
// Writer QoS per topic (for SEDP announcements)
std::map<std::string, dcps::DataWriterQos> writerQos_;
// Reader QoS per topic (for SEDP announcements)
std::map<std::string, dcps::DataReaderQos> readerQos_;
// Discovered participants (keyed by GUID prefix)
std::map<std::array<uint8_t, 12>, DiscoveredParticipant> discoveredParticipants_;
// Discovered writers (keyed by topic name)
std::map<std::string, std::vector<DiscoveredWriter>> discoveredWriters_;
// Discovered readers (keyed by topic name)
std::map<std::string, std::vector<DiscoveredReader>> discoveredReaders_;
// GUID key for mapping
// operator< (defined out of line) supplies the ordering required to use
// this type as a std::map key.
struct GUIDKey
{
GuidPrefix_t prefix;
EntityId_t entityId;
bool operator<(const GUIDKey &other) const;
};
// Writer GUID to topic mapping
std::map<GUIDKey, std::string> writerToTopic_;
// Per-writer sequence tracking for reliable delivery
struct WriterState
{
uint64_t highestReceivedSeqNum{0};
std::set<uint64_t> receivedSeqNums;
// Initialised to first = 1 and last = 0.
uint64_t firstAvailable{1};
uint64_t lastAvailable{0};
std::array<uint8_t, 4> unicastAddress{};
uint16_t unicastPort{0};
};
std::map<GUIDKey, WriterState> writerStates_;
// Sent sample cache for retransmission (per-topic → seqNum → sample)
struct SentSample
{
std::string topicName;
std::vector<uint8_t> serializedData; // XCDR1 payload (universal fallback)
std::vector<uint8_t> xcdr2Data; // XCDR2 payload (for XCDR2-only readers)
uint64_t sequenceNumber;
std::chrono::steady_clock::time_point timestamp;
};
std::map<std::string, std::map<uint64_t, SentSample>> sentSamples_;
// Per-topic limit for the sent-sample cache.
// NOTE(review): the eviction policy applied at this limit is not visible here.
static constexpr size_t MAX_SENT_SAMPLES_PER_TOPIC = 600;
// ACKNACK counters
std::map<GUIDKey, uint32_t> acknackCounts_;
// TRANSIENT_LOCAL reorder buffer: buffer out-of-order samples and
// deliver in seqNum order so historical data arrives before live data.
std::map<GUIDKey, std::map<uint64_t, std::vector<uint8_t>>> reorderBuffers_;
std::map<GUIDKey, uint64_t> nextDeliverSeqNum_;
std::map<GUIDKey, std::chrono::steady_clock::time_point> reorderStartTime_;
// ACKNACK rate-limiting: suppress redundant responses to HEARTBEATs.
struct AcknackState {
std::chrono::steady_clock::time_point lastSendTime{};
uint32_t lastRespondedHBCount{0};
bool lastWasCaughtUp{false};
};
std::map<GUIDKey, AcknackState> acknackRateLimit_;
// TypeLookup Service state (DDS-XTypes §7.6.3)
TypeLookupCallback tlsCallback_;
std::atomic<uint64_t> tlsSeqNum_{1};
// NOTE(review): presumably maps an outstanding request's sequence number to
// the topic it was issued for — confirm in the implementation.
std::map<uint64_t, std::string> tlsPendingTopics_;
std::set<std::string> tlsRequestedTopics_;
// SEDP message cache for retransmission
struct SEDPMessage
{
std::vector<uint8_t> data;
uint64_t sequenceNumber;
bool isWriter; // true for publication, false for subscription
};
std::map<std::string, SEDPMessage> sedpPublications_; // topic -> message
std::map<std::string, SEDPMessage> sedpSubscriptions_; // topic -> message
uint64_t sedpPubSeqNum_{0};
uint64_t sedpSubSeqNum_{0};
// Per-topic traffic statistics (user data, not discovery)
std::map<std::string, TopicTrafficStats> trafficStats_;
// Callbacks
// Default-constructed (empty) until the corresponding setter is called.
DataReceivedCallback dataCallback_;
ParticipantDiscoveredCallback participantCallback_;
WriterDiscoveredCallback writerCallback_;
ReaderDiscoveredCallback readerCallback_;
};
} // namespace astutedds::rtps
#endif // ASTUTEDDS_RTPS_UDP_TRANSPORT_HPP