File writer_filter.hpp

File List > astutedds > dcps > writer_filter.hpp

Go to the documentation of this file

//
// Copyright (c) 2026, Astute Systems PTY LTD
//
// This file is part of the Astute DDS developed by Astute Systems.
//
// See the commercial LICENSE file in the project root for full license details.
//
// @file writer_filter.hpp
// @brief Writer-side content filtering support
//
// Implements content filtering on the writer side to reduce network traffic
// by only sending data that matches reader filter expressions.
// Reference: DDS 1.4 Section 2.2.3
//

#ifndef ASTUTEDDS_DCPS_WRITER_FILTER_HPP
#define ASTUTEDDS_DCPS_WRITER_FILTER_HPP

#include <astutedds/rtps/rtps_types.hpp>

#include <any>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

namespace astutedds::dcps
{

// ============================================================================
// Filter Expression Types
// ============================================================================

using FilterValue = std::variant<std::monostate,  // null
                                 bool, int32_t, int64_t, float, double, std::string>;

enum class FilterOperator
{
    EQUAL,
    NOT_EQUAL,
    LESS_THAN,
    LESS_EQUAL,
    GREATER_THAN,
    GREATER_EQUAL,
    LIKE,     
    BETWEEN,  
    IN,       
    IS_NULL,  
    IS_NOT_NULL
};

enum class LogicalOperator
{
    AND,
    OR,
    NOT
};

// ============================================================================
// Filter Condition Nodes
// ============================================================================

struct FilterNode
{
    virtual ~FilterNode() = default;
    virtual bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const = 0;
    virtual std::unique_ptr<FilterNode> clone() const = 0;
};

struct ComparisonNode : FilterNode
{
    std::string field_name;
    FilterOperator op;
    FilterValue value;
    FilterValue value2;  

    bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const override;
    std::unique_ptr<FilterNode> clone() const override;
};

struct LogicalNode : FilterNode
{
    LogicalOperator op;
    std::vector<std::unique_ptr<FilterNode>> children;

    bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const override;
    std::unique_ptr<FilterNode> clone() const override;
};

struct InNode : FilterNode
{
    std::string field_name;
    std::vector<FilterValue> values;
    bool negated{false};  

    bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const override;
    std::unique_ptr<FilterNode> clone() const override;
};

// ============================================================================
// Filter Expression Parser
// ============================================================================

class FilterExpressionParser
{
public:
    static std::unique_ptr<FilterNode> parse(const std::string& expression,
                                             const std::vector<std::string>& parameters = {});

    static std::string get_last_error();

private:
    static std::string last_error_;
};

// ============================================================================
// Compiled Filter
// ============================================================================

class CompiledFilter
{
public:
    CompiledFilter() = default;
    explicit CompiledFilter(std::unique_ptr<FilterNode> root);

    CompiledFilter(const CompiledFilter& other);
    CompiledFilter& operator=(const CompiledFilter& other);
    CompiledFilter(CompiledFilter&&) = default;
    CompiledFilter& operator=(CompiledFilter&&) = default;

    bool is_valid() const { return root_ != nullptr; }

    bool evaluate(const std::function<FilterValue(const std::string&)>& field_accessor) const;

    std::vector<std::string> get_field_names() const;

private:
    void collect_field_names(const FilterNode* node, std::vector<std::string>& names) const;

    std::unique_ptr<FilterNode> root_;
};

// ============================================================================
// Reader Filter Info
// ============================================================================

struct ReaderFilterInfo
{
    rtps::GUID_t reader_guid;
    std::string filter_class_name;  
    std::string filter_expression;
    std::vector<std::string> expression_parameters;
    CompiledFilter compiled_filter;
    bool filter_enabled{true};

    static std::optional<ReaderFilterInfo> create(const rtps::GUID_t& guid, const std::string& class_name,
                                                  const std::string& expression,
                                                  const std::vector<std::string>& parameters);
};

// ============================================================================
// Writer Filter Manager
// ============================================================================

using FieldAccessor = std::function<FilterValue(const void* data, const std::string& field_name)>;

class WriterFilterManager
{
public:
    explicit WriterFilterManager(FieldAccessor field_accessor);

    ~WriterFilterManager();

    // Non-copyable
    WriterFilterManager(const WriterFilterManager&) = delete;
    WriterFilterManager& operator=(const WriterFilterManager&) = delete;

    void register_reader_filter(const ReaderFilterInfo& info);

    void unregister_reader_filter(const rtps::GUID_t& reader_guid);

    bool update_filter_parameters(const rtps::GUID_t& reader_guid, const std::vector<std::string>& parameters);

    std::vector<rtps::GUID_t> get_matching_readers(const void* data) const;

    bool should_send_to_reader(const rtps::GUID_t& reader_guid, const void* data) const;

    bool has_filters() const;

    size_t filter_count() const;

    void set_enabled(bool enabled);
    bool is_enabled() const { return enabled_.load(); }

    struct Stats
    {
        uint64_t samples_evaluated{0};
        uint64_t samples_filtered{0};    
        uint64_t filter_evaluations{0};  
        uint64_t filter_passes{0};       
    };
    Stats get_stats() const;

private:
    FieldAccessor field_accessor_;
    std::atomic<bool> enabled_{true};

    mutable std::mutex mutex_;
    std::unordered_map<std::string, ReaderFilterInfo> reader_filters_;  // Key: GUID string

    mutable Stats stats_;

    std::string guid_to_string(const rtps::GUID_t& guid) const;
};

// ============================================================================
// Type-Specific Filter Helpers
// ============================================================================

template <typename T>
class TypedFieldAccessor
{
public:
    using AccessorFunc = std::function<FilterValue(const T&, const std::string&)>;

    void register_field(const std::string& name, std::function<FilterValue(const T&)> getter)
    {
        fields_[name] = [getter](const T& data, const std::string&) { return getter(data); };
    }

    FilterValue get_field(const T& data, const std::string& field_name) const
    {
        auto it = fields_.find(field_name);
        if (it == fields_.end())
        {
            return std::monostate{};
        }
        return it->second(data, field_name);
    }

    FieldAccessor to_field_accessor() const
    {
        return [this](const void* data, const std::string& field_name) -> FilterValue
        {
            if (!data)
                return std::monostate{};
            return get_field(*static_cast<const T*>(data), field_name);
        };
    }

private:
    std::unordered_map<std::string, AccessorFunc> fields_;
};

// ============================================================================
// Filter Propagation via Discovery
// ============================================================================

struct ContentFilterProperty
{
    std::string content_filtered_topic_name;
    std::string related_topic_name;
    std::string filter_class_name;
    std::string filter_expression;
    std::vector<std::string> expression_parameters;

    std::vector<uint8_t> serialize() const;

    static std::optional<ContentFilterProperty> deserialize(const std::vector<uint8_t>& data);
};

}  // namespace astutedds::dcps

#endif  // ASTUTEDDS_DCPS_WRITER_FILTER_HPP