File writer_filter.hpp
File List > astutedds > dcps > writer_filter.hpp
Go to the documentation of this file
//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file writer_filter.hpp
// @brief Writer-side content filtering support
//
// Implements content filtering on the writer side to reduce network traffic
// by only sending data that matches reader filter expressions.
// Reference: DDS 1.4 Section 2.2.3
//
#ifndef ASTUTEDDS_DCPS_WRITER_FILTER_HPP
#define ASTUTEDDS_DCPS_WRITER_FILTER_HPP
#include <astutedds/rtps/rtps_types.hpp>
#include <any>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>
namespace astutedds::dcps
{
// ============================================================================
// Filter Expression Types
// ============================================================================
using FilterValue = std::variant<std::monostate, // null
bool, int32_t, int64_t, float, double, std::string>;
enum class FilterOperator
{
EQUAL,
NOT_EQUAL,
LESS_THAN,
LESS_EQUAL,
GREATER_THAN,
GREATER_EQUAL,
LIKE,
BETWEEN,
IN,
IS_NULL,
IS_NOT_NULL
};
enum class LogicalOperator
{
AND,
OR,
NOT
};
// ============================================================================
// Filter Condition Nodes
// ============================================================================
struct FilterNode
{
virtual ~FilterNode() = default;
virtual bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const = 0;
virtual std::unique_ptr<FilterNode> clone() const = 0;
};
struct ComparisonNode : FilterNode
{
std::string field_name;
FilterOperator op;
FilterValue value;
FilterValue value2;
bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const override;
std::unique_ptr<FilterNode> clone() const override;
};
struct LogicalNode : FilterNode
{
LogicalOperator op;
std::vector<std::unique_ptr<FilterNode>> children;
bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const override;
std::unique_ptr<FilterNode> clone() const override;
};
struct InNode : FilterNode
{
std::string field_name;
std::vector<FilterValue> values;
bool negated{false};
bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const override;
std::unique_ptr<FilterNode> clone() const override;
};
// ============================================================================
// Filter Expression Parser
// ============================================================================
class FilterExpressionParser
{
public:
static std::unique_ptr<FilterNode> parse(const std::string& expression,
const std::vector<std::string>& parameters = {});
static std::string get_last_error();
private:
static std::string last_error_;
};
// ============================================================================
// Compiled Filter
// ============================================================================
class CompiledFilter
{
public:
CompiledFilter() = default;
explicit CompiledFilter(std::unique_ptr<FilterNode> root);
CompiledFilter(const CompiledFilter& other);
CompiledFilter& operator=(const CompiledFilter& other);
CompiledFilter(CompiledFilter&&) = default;
CompiledFilter& operator=(CompiledFilter&&) = default;
bool is_valid() const { return root_ != nullptr; }
bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const;
std::vector<std::string> get_field_names() const;
private:
void collect_field_names(const FilterNode* node, std::vector<std::string>& names) const;
std::unique_ptr<FilterNode> root_;
};
// ============================================================================
// Reader Filter Info
// ============================================================================
struct ReaderFilterInfo
{
rtps::GUID_t reader_guid;
std::string filter_class_name;
std::string filter_expression;
std::vector<std::string> expression_parameters;
CompiledFilter compiled_filter;
bool filter_enabled{true};
static std::optional<ReaderFilterInfo> create(const rtps::GUID_t& guid, const std::string& class_name,
const std::string& expression,
const std::vector<std::string>& parameters);
};
// ============================================================================
// Writer Filter Manager
// ============================================================================
using FieldAccessor = std::function<FilterValue(const void* data, const std::string& field_name)>;
class WriterFilterManager
{
public:
explicit WriterFilterManager(FieldAccessor field_accessor);
~WriterFilterManager();
// Non-copyable
WriterFilterManager(const WriterFilterManager&) = delete;
WriterFilterManager& operator=(const WriterFilterManager&) = delete;
void register_reader_filter(const ReaderFilterInfo& info);
void unregister_reader_filter(const rtps::GUID_t& reader_guid);
bool update_filter_parameters(const rtps::GUID_t& reader_guid, const std::vector<std::string>& parameters);
std::vector<rtps::GUID_t> get_matching_readers(const void* data) const;
bool should_send_to_reader(const rtps::GUID_t& reader_guid, const void* data) const;
bool has_filters() const;
size_t filter_count() const;
void set_enabled(bool enabled);
bool is_enabled() const { return enabled_.load(); }
struct Stats
{
uint64_t samples_evaluated{0};
uint64_t samples_filtered{0};
uint64_t filter_evaluations{0};
uint64_t filter_passes{0};
};
Stats get_stats() const;
private:
FieldAccessor field_accessor_;
std::atomic<bool> enabled_{true};
mutable std::mutex mutex_;
std::unordered_map<std::string, ReaderFilterInfo> reader_filters_; // Key: GUID string
mutable Stats stats_;
std::string guid_to_string(const rtps::GUID_t& guid) const;
};
// ============================================================================
// Type-Specific Filter Helpers
// ============================================================================
template <typename T>
class TypedFieldAccessor
{
public:
using AccessorFunc = std::function<FilterValue(const T&, const std::string&)>;
void register_field(const std::string& name, std::function<FilterValue(const T&)> getter)
{
fields_[name] = [getter](const T& data, const std::string&) { return getter(data); };
}
FilterValue get_field(const T& data, const std::string& field_name) const
{
auto it = fields_.find(field_name);
if (it == fields_.end())
{
return std::monostate{};
}
return it->second(data, field_name);
}
FieldAccessor to_field_accessor() const
{
return [this](const void* data, const std::string& field_name) -> FilterValue
{
if (!data)
return std::monostate{};
return get_field(*static_cast<const T*>(data), field_name);
};
}
private:
std::unordered_map<std::string, AccessorFunc> fields_;
};
// ============================================================================
// Filter Propagation via Discovery
// ============================================================================
struct ContentFilterProperty
{
std::string content_filtered_topic_name;
std::string related_topic_name;
std::string filter_class_name;
std::string filter_expression;
std::vector<std::string> expression_parameters;
std::vector<uint8_t> serialize() const;
static std::optional<ContentFilterProperty> deserialize(const std::vector<uint8_t>& data);
};
} // namespace astutedds::dcps
#endif // ASTUTEDDS_DCPS_WRITER_FILTER_HPP