Implementing BIT Subscribers

This guide shows how to create custom applications that subscribe to BIT test results in any programming language.

Overview

The BIT Manager publishes test results via Zenoh pub/sub when run with the --zenoh flag. Any application can subscribe to these results by:

  1. Connecting to the Zenoh network
  2. Subscribing to BIT topics (bit/{hostname}/{PBIT|CBIT|FBIT})
  3. Deserializing protobuf messages
  4. Processing test results

This enables you to:

  • Build custom dashboards and monitoring tools
  • Integrate BIT results into existing systems
  • Create language-specific clients (Python, C++, JavaScript, etc.)
  • Store results in databases or analytics platforms
  • Trigger alerts and automated responses

Architecture

┌─────────────────┐                    ┌──────────────────┐
│  BIT Manager    │  Zenoh pub/sub     │  Your Subscriber │
│  (Publisher)    │───────────────────>│  (Any Language)  │
│                 │  protobuf msgs     │                  │
└─────────────────┘                    └──────────────────┘
         ├─ bit/hostname/PBIT
         ├─ bit/hostname/CBIT
         └─ bit/hostname/FBIT

Example Implementations

Complete working examples are provided in the examples/ directory:

  • Rust: examples/bit_monitor_rust/ - Self-contained Rust subscriber
  • C++: examples/bit_monitor_cpp/ - C++17 with zenoh-c
  • Python: examples/bit_monitor_python/ - Python 3.10+ with zenoh-python

Each example includes:

  • Full source code
  • Build instructions
  • README with usage details
  • Shared protobuf definitions (examples/proto/bit.proto)

Topic Structure

Subscription Pattern

Subscribe to topics in the format:

bit/{hostname}/*

Or specifically:

bit/{hostname}/PBIT  # Power-On Built-In Tests
bit/{hostname}/CBIT  # Continuous Built-In Tests
bit/{hostname}/FBIT  # Factory Built-In Tests
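
The topic layout above can be captured in a pair of small helpers. This is a minimal sketch, not part of the BIT Manager or the example monitors: the names build_topic, parse_topic, and BitTopic are hypothetical, and the only thing taken from this guide is the bit/{hostname}/{PBIT|CBIT|FBIT} format.

```python
from typing import NamedTuple

# The three test types named in the topic format
TEST_TYPES = ("PBIT", "CBIT", "FBIT")


class BitTopic(NamedTuple):
    hostname: str
    test_type: str


def build_topic(hostname: str, test_type: str) -> str:
    """Return the key for one host and test type, e.g. bit/my-device/PBIT."""
    if test_type not in TEST_TYPES:
        raise ValueError(f"unknown test type: {test_type!r}")
    if not hostname or "/" in hostname:
        raise ValueError(f"invalid hostname: {hostname!r}")
    return f"bit/{hostname}/{test_type}"


def parse_topic(key: str) -> BitTopic:
    """Split a concrete key such as bit/my-device/CBIT into its parts."""
    parts = key.split("/")
    if (len(parts) != 3 or parts[0] != "bit"
            or not parts[1] or parts[2] not in TEST_TYPES):
        raise ValueError(f"not a BIT topic: {key!r}")
    return BitTopic(hostname=parts[1], test_type=parts[2])


if __name__ == "__main__":
    print(build_topic("my-device", "PBIT"))
    print(parse_topic("bit/my-device/CBIT"))
```

Validating the key in one place means a malformed key raises a clear ValueError instead of an IndexError deeper in the processing code.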

Message Format

All messages are serialized using Protocol Buffers (bit.proto):

message BuiltInTest {
    uint64 timestamp = 1;
    optional PbitResult pbit = 2;
    optional CbitResult cbit = 3;
    optional FbitResult fbit = 4;
}

message PbitResult {
    TestResult result = 1;
}

message TestResult {
    string test_name = 1;
    string description = 2;
    bool success = 3;
    optional string error_message = 4;
}
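
For unit-testing handler logic without the generated bindings, the schema can be mirrored with plain dataclasses. The sketch below is illustrative only. It assumes CbitResult and FbitResult wrap a single TestResult in the same way PbitResult does (the snippets in this guide read .cbit.result and .fbit.result, which suggests this), and the names ResultWrapper and extract are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class TestResult:
    test_name: str
    description: str
    success: bool
    error_message: Optional[str] = None


@dataclass
class ResultWrapper:
    """Stand-in for PbitResult / CbitResult / FbitResult (assumed identical)."""
    result: TestResult


@dataclass
class BuiltInTest:
    timestamp: int
    pbit: Optional[ResultWrapper] = None
    cbit: Optional[ResultWrapper] = None
    fbit: Optional[ResultWrapper] = None


def extract(msg: BuiltInTest) -> Optional[Tuple[str, TestResult]]:
    """Return (test_type, result) for the first populated field, else None."""
    for field_name in ("pbit", "cbit", "fbit"):
        wrapper = getattr(msg, field_name)
        if wrapper is not None:
            return field_name.upper(), wrapper.result
    return None


if __name__ == "__main__":
    msg = BuiltInTest(
        timestamp=1,
        cbit=ResultWrapper(TestResult("cbit_example", "Example check", False, "failed")),
    )
    print(extract(msg))
```

Returning None when no field is set forces callers to handle empty messages explicitly, which is the same case a real subscriber has to guard against.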

Quick Start Guides

Python Subscriber

import zenoh
from bit_pb2 import BuiltInTest

# Open Zenoh session
session = zenoh.open()

# Subscribe to all test types for a specific hostname
hostname = "my-device"
subscriber = session.declare_subscriber(f"bit/{hostname}/*")

# Process messages
for sample in subscriber:
    # Deserialize protobuf
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Determine test type
    if result.HasField('pbit'):
        test = result.pbit.result
        test_type = "PBIT"
    elif result.HasField('cbit'):
        test = result.cbit.result
        test_type = "CBIT"
    elif result.HasField('fbit'):
        test = result.fbit.result
        test_type = "FBIT"
    else:
        # No result attached; skip this message
        continue

    # Process result
    status = "PASS" if test.success else "FAIL"
    print(f"{test_type} [{status}] {test.test_name}: {test.description}")

Installation:

pip install eclipse-zenoh protobuf
protoc --python_out=. examples/proto/bit.proto

See examples/bit_monitor_python/ for the complete implementation.

C++ Subscriber

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include <zenoh.hxx>
#include "bit.pb.h"

int main() {
    // Open Zenoh session
    auto config = zenoh::Config::create_default();
    auto session = zenoh::Session::open(std::move(config));

    // Subscribe to topics
    std::string hostname = "my-device";
    std::string key_expr = "bit/" + hostname + "/*";

    auto subscriber = session.declare_subscriber(
        key_expr,
        [](const zenoh::Sample& sample) {
            // Deserialize protobuf
            bit::BuiltInTest result;
            auto payload = sample.get_payload();
            result.ParseFromArray(payload.data(), payload.size());

            // Process result
            std::string test_type;
            const bit::TestResult* test = nullptr;

            if (result.has_pbit()) {
                test = &result.pbit().result();
                test_type = "PBIT";
            } else if (result.has_cbit()) {
                test = &result.cbit().result();
                test_type = "CBIT";
            } else if (result.has_fbit()) {
                test = &result.fbit().result();
                test_type = "FBIT";
            }

            // Skip messages that carry no result
            if (test == nullptr) {
                return;
            }

            std::string status = test->success() ? "PASS" : "FAIL";
            std::cout << test_type << " [" << status << "] "
                      << test->test_name() << ": "
                      << test->description() << std::endl;
        }
    );

    // Keep running
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

Building:

# Install dependencies
sudo apt install libzenohc-dev protobuf-compiler

# Generate protobuf
protoc --cpp_out=. examples/proto/bit.proto

# Compile
g++ -std=c++17 subscriber.cpp bit.pb.cc -lzenohc -lprotobuf -o subscriber

See examples/bit_monitor_cpp/ for the complete implementation.

Rust Subscriber

use zenoh::prelude::*;
use protobuf::Message;

#[tokio::main]
async fn main() {
    // Open Zenoh session
    let session = zenoh::open(zenoh::Config::default()).await.unwrap();

    // Subscribe to topics
    let hostname = "my-device";
    let key_expr = format!("bit/{hostname}/*");
    let subscriber = session.declare_subscriber(&key_expr).await.unwrap();

    // Process messages
    while let Ok(sample) = subscriber.recv_async().await {
        // Deserialize protobuf
        let result = bit_proto::BuiltInTest::parse_from_bytes(
            &sample.payload().to_bytes()
        ).unwrap();

        // Determine test type and extract result
        let (test_type, test) = if result.has_pbit() {
            ("PBIT", result.pbit().result.clone())
        } else if result.has_cbit() {
            ("CBIT", result.cbit().result.clone())
        } else if result.has_fbit() {
            ("FBIT", result.fbit().result.clone())
        } else {
            continue;
        };

        // Process result
        let status = if test.success { "PASS" } else { "FAIL" };
        println!("{} [{}] {}: {}",
            test_type, status, test.test_name, test.description);
    }
}

Dependencies (Cargo.toml):

[dependencies]
zenoh = "1.3.3"
protobuf = "3.0.0"
tokio = { version = "1", features = ["full"] }

[build-dependencies]
protobuf-codegen = "3"

See examples/bit_monitor_rust/ for the complete implementation.

Protocol Buffer Schema

The shared protobuf schema is located at examples/proto/bit.proto. To generate bindings for your language:

# Python
protoc --python_out=. examples/proto/bit.proto

# C++
protoc --cpp_out=. examples/proto/bit.proto

# Java
protoc --java_out=. examples/proto/bit.proto

# C#
protoc --csharp_out=. examples/proto/bit.proto

# JavaScript
protoc --js_out=. examples/proto/bit.proto

# Go
protoc --go_out=. examples/proto/bit.proto

For Rust, use the protobuf-codegen crate in your build.rs.

Zenoh Configuration

Default Configuration

By default, subscribers use peer-to-peer discovery on the local network, so no configuration is required.

Custom Configuration

For advanced setups, create a Zenoh config file:

# config.yaml
mode: client
connect:
  endpoints:
    - tcp/192.168.1.100:7447  # Connect to specific router

Then load it in your application:

# Python
config = zenoh.Config.from_file("config.yaml")
session = zenoh.open(config)

// C++
auto config = zenoh::Config::from_file("config.yaml");
auto session = zenoh::Session::open(std::move(config));

// Rust
let config = zenoh::Config::from_file("config.yaml").unwrap();
let session = zenoh::open(config).await.unwrap();

Environment Variable

Set ZENOH_CONFIG to point to your configuration file:

export ZENOH_CONFIG=/etc/bit/config.yaml

The example monitors will automatically use this if set.
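
A custom subscriber can follow the same convention. The sketch below shows one way to resolve the path before opening a session. The helper name resolve_config_path is hypothetical, and the choice to fail loudly on a missing file is an assumption, not a description of how the example monitors behave.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_config_path(env: Mapping[str, str] = os.environ) -> Optional[Path]:
    """Return the file named by ZENOH_CONFIG, or None to use Zenoh defaults."""
    raw = env.get("ZENOH_CONFIG", "").strip()
    if not raw:
        return None
    path = Path(raw).expanduser()
    if not path.is_file():
        # Fail early instead of silently falling back to defaults
        raise FileNotFoundError(f"ZENOH_CONFIG points to a missing file: {path}")
    return path


if __name__ == "__main__":
    path = resolve_config_path()
    print(f"Using Zenoh config: {path}" if path else "Using default Zenoh configuration")
```

When a path is returned, it can be passed to zenoh.Config.from_file(str(path)) as shown under Custom Configuration. Otherwise, open the session with no arguments.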

Common Integration Patterns

Filtering by Test Type

# Subscribe only to CBIT tests
subscriber = session.declare_subscriber(f"bit/{hostname}/CBIT")

Filtering by Test Name

for sample in subscriber:
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    test_name = (result.pbit.result.test_name if result.HasField('pbit')
                 else result.cbit.result.test_name if result.HasField('cbit')
                 else result.fbit.result.test_name)

    # Only process CPU tests
    if 'cpu' in test_name.lower():
        process_test(result)

Storing Results in Database

import sqlite3

db = sqlite3.connect('bit_results.db')
db.execute('''CREATE TABLE IF NOT EXISTS results
              (timestamp INTEGER, hostname TEXT, test_type TEXT,
               test_name TEXT, success INTEGER, description TEXT)''')

for sample in subscriber:
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Extract fields
    # key_expr is a KeyExpr object, so convert it to a string before splitting
    _, hostname, test_type = str(sample.key_expr).split('/')

    if result.HasField('pbit'):
        test = result.pbit.result
    elif result.HasField('cbit'):
        test = result.cbit.result
    else:
        test = result.fbit.result

    # Insert into database
    db.execute('INSERT INTO results VALUES (?,?,?,?,?,?)',
               (result.timestamp, hostname, test_type,
                test.test_name, test.success, test.description))
    db.commit()
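
Once rows accumulate, the same table can answer questions such as "which host and test type fail most often". The sketch below runs against an in-memory database with the schema above. The function name failure_summary and the sample rows (including the test name cbit_example) are made up for illustration.

```python
import sqlite3


def failure_summary(db: sqlite3.Connection):
    """Return (hostname, test_type, total, failures) rows, most failures first."""
    return db.execute(
        """SELECT hostname, test_type,
                  COUNT(*) AS total,
                  SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failures
           FROM results
           GROUP BY hostname, test_type
           ORDER BY failures DESC, hostname, test_type"""
    ).fetchall()


# Demo data: an in-memory database using the schema from this section
db = sqlite3.connect(":memory:")
db.execute('''CREATE TABLE IF NOT EXISTS results
              (timestamp INTEGER, hostname TEXT, test_type TEXT,
               test_name TEXT, success INTEGER, description TEXT)''')
db.executemany("INSERT INTO results VALUES (?,?,?,?,?,?)", [
    (1, "host-a", "PBIT", "pbit_cpu_usage", True, "CPU usage"),
    (2, "host-a", "CBIT", "cbit_example", False, "Example check"),
    (3, "host-a", "CBIT", "cbit_example", False, "Example check"),
    (4, "host-b", "CBIT", "cbit_example", True, "Example check"),
])

for row in failure_summary(db):
    print(row)
```

Because success is stored as an INTEGER (0 or 1), the failure count is a plain conditional sum and needs no extra columns.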

Sending Alerts

import requests

for sample in subscriber:
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Check all test types for failures
    test = None
    if result.HasField('pbit'):
        test = result.pbit.result
    elif result.HasField('cbit'):
        test = result.cbit.result
    elif result.HasField('fbit'):
        test = result.fbit.result

    if test and not test.success:
        # Send alert via webhook
        requests.post('https://alerts.example.com/webhook', json={
            'test_name': test.test_name,
            'description': test.description,
            'error': test.error_message if test.HasField('error_message') else 'Unknown'
        })

Multi-Host Monitoring

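# Subscribe to all hosts and all test types
# (topics have three chunks: bit/{hostname}/{type})
subscriber = session.declare_subscriber("bit/*/*")

hosts = {}

for sample in subscriber:
    # Extract hostname from topic (convert the KeyExpr to a string first)
    hostname = str(sample.key_expr).split('/')[1]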

    # Track per-host statistics
    if hostname not in hosts:
        hosts[hostname] = {'pass': 0, 'fail': 0}

    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Update statistics
    if result.HasField('pbit'):
        success = result.pbit.result.success
    elif result.HasField('cbit'):
        success = result.cbit.result.success
    else:
        success = result.fbit.result.success

    if success:
        hosts[hostname]['pass'] += 1
    else:
        hosts[hostname]['fail'] += 1

    print(f"Statistics: {hosts}")
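
When choosing a multi-host pattern, it helps to remember that in Zenoh key expressions `*` matches exactly one chunk (the text between slashes) and `**` matches any number of chunks, including none. The matcher below is a simplified illustration of those two rules for checking patterns offline. It is not Zenoh's implementation, it ignores sub-chunk wildcards such as `$*`, and key_matches is a hypothetical name.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Simplified match: '*' is one chunk, '**' is zero or more chunks."""

    def match(p, k):
        if not p:
            # Pattern exhausted: succeed only if the key is exhausted too
            return not k
        if p[0] == "**":
            # '**' absorbs zero chunks, or one chunk and then tries again
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])

    return match(pattern.split("/"), key.split("/"))


if __name__ == "__main__":
    key = "bit/host-a/PBIT"
    for pattern in ("bit/*/*", "bit/**", "bit/host-a/*", "bit/*/CBIT", "bit/*/*/*"):
        print(f"{pattern!r:16} matches {key!r}: {key_matches(pattern, key)}")
```

A three-chunk topic such as bit/host-a/PBIT is matched by bit/*/* and bit/**, but not by a four-chunk pattern, so a pattern with one wildcard too many receives nothing without raising an error.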

Running the Publisher

To enable Zenoh publishing from BIT Manager:

# Start BIT Manager with Zenoh enabled
BIT_CONFIG_PATH=/etc/bit \
BIT_TEST_PATH=/usr/local/lib/bit_manager \
bit-manager --zenoh

# Or as a service (edit /etc/systemd/system/bit_manager.service)
ExecStart=/usr/local/bin/bit-manager --zenoh -c

Testing Your Subscriber

Using the Example Monitors

Test your Zenoh setup by running one of the example monitors:

# Python
cd examples/bit_monitor_python
pip install -r requirements.txt
python bit_monitor.py -hostname=my-device

# C++
cd examples/bit_monitor_cpp
./build-example.sh
./build/bit_monitor_cpp

# Rust
cd examples/bit_monitor_rust
cargo run --release -- -hostname=my-device

Manual Testing

Generate test results manually:

# Run a single test
bit-manager -t pbit_cpu_usage -o --zenoh

# Run all PBIT tests
bit-manager -p --zenoh

Your subscriber should receive and display the results.

Troubleshooting

No Messages Received

Check Zenoh connectivity:

# Install Zenoh tools
cargo install zenoh

# Test publisher
zenoh-pub -k bit/test/PBIT -v "test message"

# Test subscriber
zenoh-sub -k "bit/**"

Verify BIT Manager is publishing:

# Run with Zenoh enabled
bit-manager --zenoh -o -t pbit_cpu_usage

# Check logs
grep -i zenoh /tmp/bit-manager_INFO_*.log

Check firewall rules:

# Zenoh uses UDP multicast for discovery (224.0.0.224:7446 by default)
# and TCP port 7447 for its default endpoint
sudo iptables -L -n | grep -E '7446|7447'

Deserialization Errors

Verify protobuf versions match:

# Check protoc version
protoc --version

# Regenerate bindings
cd examples/proto
./generate-protobuf-bindings.sh

Verify message format:

# Inspect raw payload (requires zenoh tools)
zenoh-sub -k "bit/**" --output raw | hexdump -C
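
If the hexdump looks plausible but parsing still fails, walking the payload's top-level fields by hand can show whether the field numbers and wire types line up with bit.proto. The sketch below decodes the generic protobuf wire format using only the standard library. The function names are hypothetical, the sample payload is hand-built for illustration, and deprecated group wire types are not handled.

```python
from typing import Iterator, Tuple


def read_varint(data: bytes, pos: int) -> Tuple[int, int]:
    """Decode one base-128 varint starting at pos; return (value, next_pos)."""
    value = shift = 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def walk_fields(data: bytes) -> Iterator[Tuple[int, int, object]]:
    """Yield (field_number, wire_type, raw_value) for each top-level field."""
    pos = 0
    while pos < len(data):
        tag, pos = read_varint(data, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:      # varint (uint64, bool, ...)
            value, pos = read_varint(data, pos)
        elif wire_type == 1:    # fixed 64-bit
            value, pos = data[pos:pos + 8], pos + 8
        elif wire_type == 2:    # length-delimited (string, nested message)
            length, pos = read_varint(data, pos)
            value, pos = data[pos:pos + length], pos + length
        elif wire_type == 5:    # fixed 32-bit
            value, pos = data[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if pos > len(data):
            raise ValueError("truncated field")
        yield field_number, wire_type, value


# Hand-built sample: timestamp = 150, pbit.result = {test_name: "cpu", success: true}
payload = bytes.fromhex("089601" "1209" "0a07" "0a03637075" "1801")

if __name__ == "__main__":
    for number, wire_type, value in walk_fields(payload):
        print(f"field {number} (wire type {wire_type}): {value!r}")
```

Length-delimited values that hold nested messages can be passed back into walk_fields to descend one level at a time. A field number that does not appear in bit.proto usually points to mismatched schema versions between publisher and subscriber.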

Performance Issues

For high-frequency CBIT tests:

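# Use async processing
import asyncio

async def process_sample(sample):
    # Your processing code
    pass

async def main():
    # Assumes `session` was opened earlier with zenoh.open()
    subscriber = session.declare_subscriber("bit/*/*")
    tasks = set()

    while True:
        # recv() blocks, so wait for it in a worker thread
        sample = await asyncio.to_thread(subscriber.recv)

        # Process in background; keep a reference until the task finishes
        task = asyncio.create_task(process_sample(sample))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

asyncio.run(main())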

Additional Resources

  • Example Code: See examples/ directory for complete implementations
  • Zenoh Documentation: https://zenoh.io/docs/
  • Protocol Buffers: https://protobuf.dev/
  • BIT Manager Source: bit_manager/src/main.rs - Publisher implementation

Next Steps

  1. Choose a language and review the corresponding example in examples/
  2. Generate protobuf bindings for your language
  3. Implement subscriber logic based on the examples
  4. Test with bit-manager --zenoh
  5. Integrate into your monitoring/alerting system

For questions or contributions, see the repository README.