Implementing BIT Subscribers
This guide shows how to create custom applications that subscribe to BIT test results in any programming language.
Overview
The BIT Manager publishes test results via Zenoh pub/sub when run with the --zenoh flag. Any application can subscribe to these results by:
- Connecting to the Zenoh network
- Subscribing to BIT topics (bit/{hostname}/{PBIT|CBIT|FBIT})
- Deserializing protobuf messages
- Processing test results
This enables you to:
- Build custom dashboards and monitoring tools
- Integrate BIT results into existing systems
- Create language-specific clients (Python, C++, JavaScript, etc.)
- Store results in databases or analytics platforms
- Trigger alerts and automated responses
Architecture
┌─────────────────┐                    ┌──────────────────┐
│   BIT Manager   │   Zenoh pub/sub    │ Your Subscriber  │
│   (Publisher)   │───────────────────>│ (Any Language)   │
│                 │   protobuf msgs    │                  │
└─────────────────┘                    └──────────────────┘
         │
         ├─ bit/hostname/PBIT
         ├─ bit/hostname/CBIT
         └─ bit/hostname/FBIT
Example Implementations
Complete working examples are provided in the examples/ directory:
- Rust: examples/bit_monitor_rust/ - Self-contained Rust subscriber
- C++: examples/bit_monitor_cpp/ - C++17 with zenoh-c
- Python: examples/bit_monitor_python/ - Python 3.10+ with zenoh-python
Each example includes:
- Full source code
- Build instructions
- README with usage details
- Shared protobuf definitions (examples/proto/bit.proto)
Topic Structure
Subscription Pattern
Subscribe to topics in the format:
bit/{hostname}/{PBIT|CBIT|FBIT}
Or specifically:
bit/{hostname}/PBIT # Power-On Built-In Tests
bit/{hostname}/CBIT # Continuous Built-In Tests
bit/{hostname}/FBIT # Factory Built-In Tests
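Since the hostname and test type are both encoded in the topic, a subscriber can recover them from the key alone. The following is a minimal sketch of that parsing, assuming keys always have exactly the three chunks shown above; `BitTopic`, `TEST_TYPES`, and `parse_topic` are hypothetical names introduced for illustration and are not part of the BIT Manager code.

```python
from typing import NamedTuple

# Test types named in the topic structure above.
TEST_TYPES = ("PBIT", "CBIT", "FBIT")


class BitTopic(NamedTuple):
    hostname: str
    test_type: str


def parse_topic(key: str) -> BitTopic:
    """Split a 'bit/{hostname}/{type}' key into its parts.

    Raises ValueError if the key does not follow that layout.
    """
    parts = key.split("/")
    if len(parts) != 3 or parts[0] != "bit" or not parts[1]:
        raise ValueError(f"not a BIT topic: {key!r}")
    if parts[2] not in TEST_TYPES:
        raise ValueError(f"unknown test type in topic: {key!r}")
    return BitTopic(hostname=parts[1], test_type=parts[2])


topic = parse_topic("bit/my-device/CBIT")
print(topic.hostname, topic.test_type)  # prints: my-device CBIT
```

With zenoh-python, the string form of a sample's key (`str(sample.key_expr)`) can be passed to such a helper.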
Message Format
All messages are serialized using Protocol Buffers (bit.proto):
message BuiltInTest {
  uint64 timestamp = 1;
  optional PbitResult pbit = 2;
  optional CbitResult cbit = 3;
  optional FbitResult fbit = 4;
}

message PbitResult {
  TestResult result = 1;
}

message TestResult {
  string test_name = 1;
  string description = 2;
  bool success = 3;
  optional string error_message = 4;
}
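Every subscriber has to work out which of the three optional fields is populated before it can read the `TestResult`. That step can be factored into one helper. This is a sketch under two assumptions: that `CbitResult` and `FbitResult` expose a `result` field the same way `PbitResult` does (as the subscriber examples in this guide imply), and that the message object offers protobuf's Python `HasField` method. `RESULT_FIELDS` and `extract_result` are hypothetical names.

```python
from typing import Any, Optional, Tuple

# Field names of the per-type results in BuiltInTest, with display labels.
RESULT_FIELDS = (("pbit", "PBIT"), ("cbit", "CBIT"), ("fbit", "FBIT"))


def extract_result(msg: Any) -> Optional[Tuple[str, Any]]:
    """Return (test_type, TestResult) for whichever field is populated,
    or None when the message carries no result at all."""
    for field, label in RESULT_FIELDS:
        if msg.HasField(field):
            return label, getattr(msg, field).result
    return None
```

Returning `None` for an empty message lets callers skip it explicitly instead of falling through to an unset variable.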
Quick Start Guides
Python Subscriber
import zenoh
from bit_pb2 import BuiltInTest

# Open Zenoh session with the default configuration
session = zenoh.open(zenoh.Config())

# Subscribe to all test types for a specific hostname
hostname = "my-device"
subscriber = session.declare_subscriber(f"bit/{hostname}/*")

# Process messages
for sample in subscriber:
    # Deserialize protobuf
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Determine test type
    if result.HasField('pbit'):
        test = result.pbit.result
        test_type = "PBIT"
    elif result.HasField('cbit'):
        test = result.cbit.result
        test_type = "CBIT"
    elif result.HasField('fbit'):
        test = result.fbit.result
        test_type = "FBIT"
    else:
        # No result field is populated; skip this message
        continue

    # Process result
    status = "PASS" if test.success else "FAIL"
    print(f"{test_type} [{status}] {test.test_name}: {test.description}")
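Installation: install the dependencies listed in examples/bit_monitor_python/requirements.txt (pip install -r requirements.txt) and generate bit_pb2.py with protoc --python_out=. examples/proto/bit.proto.
See examples/bit_monitor_python/ for the complete implementation.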
C++ Subscriber
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <utility>

#include <zenoh.hxx>
#include "bit.pb.h"

// Note: exact zenoh-cpp signatures (subscriber callbacks, payload accessors)
// differ between releases; examples/bit_monitor_cpp/ is the reference build.
int main() {
    // Open Zenoh session
    auto config = zenoh::Config::create_default();
    auto session = zenoh::Session::open(std::move(config));

    // Subscribe to topics
    std::string hostname = "my-device";
    std::string key_expr = "bit/" + hostname + "/*";
    auto subscriber = session.declare_subscriber(
        key_expr,
        [](const zenoh::Sample& sample) {
            // Deserialize protobuf
            bit::BuiltInTest result;
            auto payload = sample.get_payload();
            if (!result.ParseFromArray(payload.data(), payload.size())) {
                std::cerr << "Failed to parse BuiltInTest message" << std::endl;
                return;
            }

            // Process result
            std::string test_type;
            const bit::TestResult* test = nullptr;
            if (result.has_pbit()) {
                test = &result.pbit().result();
                test_type = "PBIT";
            } else if (result.has_cbit()) {
                test = &result.cbit().result();
                test_type = "CBIT";
            } else if (result.has_fbit()) {
                test = &result.fbit().result();
                test_type = "FBIT";
            }
            if (test == nullptr) {
                return;  // No result field is populated; skip this message
            }

            std::string status = test->success() ? "PASS" : "FAIL";
            std::cout << test_type << " [" << status << "] "
                      << test->test_name() << ": "
                      << test->description() << std::endl;
        }
    );

    // Keep running
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
Building:
# Install dependencies
sudo apt install libzenohc-dev protobuf-compiler
# Generate protobuf
protoc --cpp_out=. examples/proto/bit.proto
# Compile
g++ -std=c++17 subscriber.cpp bit.pb.cc -lzenohc -lprotobuf -o subscriber
See examples/bit_monitor_cpp/ for the complete implementation.
Rust Subscriber
use zenoh::prelude::*;
use protobuf::Message;

#[tokio::main]
async fn main() {
    // Open Zenoh session
    let session = zenoh::open(zenoh::Config::default()).await.unwrap();

    // Subscribe to topics
    let hostname = "my-device";
    let key_expr = format!("bit/{hostname}/*");
    let subscriber = session.declare_subscriber(&key_expr).await.unwrap();

    // Process messages
    while let Ok(sample) = subscriber.recv_async().await {
        // Deserialize protobuf
        let result = bit_proto::BuiltInTest::parse_from_bytes(
            &sample.payload().to_bytes()
        ).unwrap();

        // Determine test type and extract result
        let (test_type, test) = if result.has_pbit() {
            ("PBIT", result.pbit().result.clone())
        } else if result.has_cbit() {
            ("CBIT", result.cbit().result.clone())
        } else if result.has_fbit() {
            ("FBIT", result.fbit().result.clone())
        } else {
            continue;
        };

        // Process result
        let status = if test.success { "PASS" } else { "FAIL" };
        println!("{} [{}] {}: {}",
            test_type, status, test.test_name, test.description);
    }
}
Dependencies (Cargo.toml):
[dependencies]
zenoh = "1.3.3"
protobuf = "3.0.0"
tokio = { version = "1", features = ["full"] }
[build-dependencies]
protobuf-codegen = "3"
See examples/bit_monitor_rust/ for the complete implementation.
Protocol Buffer Schema
The shared protobuf schema is located at examples/proto/bit.proto. To generate bindings for your language:
# Python
protoc --python_out=. examples/proto/bit.proto
# C++
protoc --cpp_out=. examples/proto/bit.proto
# Java
protoc --java_out=. examples/proto/bit.proto
# C#
protoc --csharp_out=. examples/proto/bit.proto
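# JavaScript (protobuf 21 and later need the separate protoc-gen-js plugin)
protoc --js_out=. examples/proto/bit.proto
# Go (needs the protoc-gen-go plugin on your PATH)
protoc --go_out=. examples/proto/bit.proto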
For Rust, use the protobuf-codegen crate in your build.rs.
Zenoh Configuration
Default Configuration
By default, subscribers use peer-to-peer discovery on the local network. No configuration required.
Custom Configuration
For advanced setups, create a Zenoh config file:
# config.yaml
mode: client
connect:
  endpoints:
    - tcp/192.168.1.100:7447  # Connect to specific router
Then load it in your application:
// C++
auto config = zenoh::Config::from_file("config.yaml");
auto session = zenoh::Session::open(std::move(config));
// Rust
let config = zenoh::Config::from_file("config.yaml").unwrap();
let session = zenoh::open(config).await.unwrap();
Environment Variable
Set ZENOH_CONFIG to point to your configuration file:
export ZENOH_CONFIG=/path/to/config.yaml
The example monitors will automatically use this if set.
Common Integration Patterns
Filtering by Test Type
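Because the test type is the last chunk of the topic, filtering by type can be done either by narrowing the key expression passed to the subscriber or by checking the key of each received sample. The sketch below illustrates both approaches with plain string handling; `type_key_expr` and `is_wanted` are hypothetical helper names, and the `*` wildcard is assumed to match exactly one chunk, as in the quick start examples.

```python
TEST_TYPES = ("PBIT", "CBIT", "FBIT")


def type_key_expr(test_type: str, hostname: str = "*") -> str:
    """Build a key expression matching one test type.

    hostname defaults to '*', which matches any single host chunk.
    """
    if test_type not in TEST_TYPES:
        raise ValueError(f"unknown test type: {test_type!r}")
    return f"bit/{hostname}/{test_type}"


def is_wanted(key: str, wanted: set) -> bool:
    """Client-side filter: True if the key's last chunk is a wanted type."""
    return key.rsplit("/", 1)[-1] in wanted


print(type_key_expr("CBIT", "my-device"))                    # bit/my-device/CBIT
print(type_key_expr("PBIT"))                                 # bit/*/PBIT
print(is_wanted("bit/my-device/FBIT", {"PBIT", "CBIT"}))     # False
```

Narrowing the key expression (for example `session.declare_subscriber(type_key_expr("CBIT", hostname))`) avoids receiving unwanted messages at all, while the client-side check is useful when one subscriber needs two of the three types.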
Filtering by Test Name
for sample in subscriber:
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))
    test_name = (result.pbit.result.test_name if result.HasField('pbit')
                 else result.cbit.result.test_name if result.HasField('cbit')
                 else result.fbit.result.test_name)
    # Only process CPU tests
    if 'cpu' in test_name.lower():
        process_test(result)
Storing Results in Database
import sqlite3

db = sqlite3.connect('bit_results.db')
db.execute('''CREATE TABLE IF NOT EXISTS results
              (timestamp INTEGER, hostname TEXT, test_type TEXT,
               test_name TEXT, success INTEGER, description TEXT)''')

for sample in subscriber:
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Extract fields from the topic: bit/{hostname}/{test_type}
    # (key_expr is a KeyExpr object, so convert it to a string first)
    _, hostname, test_type = str(sample.key_expr).split('/')

    if result.HasField('pbit'):
        test = result.pbit.result
    elif result.HasField('cbit'):
        test = result.cbit.result
    else:
        test = result.fbit.result

    # Insert into database
    db.execute('INSERT INTO results VALUES (?,?,?,?,?,?)',
               (result.timestamp, hostname, test_type,
                test.test_name, test.success, test.description))
    db.commit()
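Once results are stored, they can be queried with ordinary SQL. As a minimal sketch, the function below counts failures per host using the same table layout as above; `failures_by_host` is a hypothetical helper, and the rows inserted here are made-up data used only to make the example runnable against an in-memory database.

```python
import sqlite3


def failures_by_host(db: sqlite3.Connection) -> list:
    """Return (hostname, failure_count) rows, most failures first."""
    return db.execute(
        """SELECT hostname, COUNT(*) AS failures
           FROM results
           WHERE success = 0
           GROUP BY hostname
           ORDER BY failures DESC, hostname"""
    ).fetchall()


# Demo against an in-memory database with made-up rows.
db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE IF NOT EXISTS results
              (timestamp INTEGER, hostname TEXT, test_type TEXT,
               test_name TEXT, success INTEGER, description TEXT)""")
db.executemany(
    "INSERT INTO results VALUES (?,?,?,?,?,?)",
    [
        (1, "host-a", "PBIT", "pbit_cpu_usage", 1, "made-up row"),
        (2, "host-a", "CBIT", "cbit_example", 0, "made-up row"),
        (3, "host-b", "CBIT", "cbit_example", 0, "made-up row"),
        (4, "host-b", "FBIT", "fbit_example", 0, "made-up row"),
    ],
)
print(failures_by_host(db))  # [('host-b', 2), ('host-a', 1)]
```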
Sending Alerts
import requests

for sample in subscriber:
    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Check all test types for failures
    test = None
    if result.HasField('pbit'):
        test = result.pbit.result
    elif result.HasField('cbit'):
        test = result.cbit.result
    elif result.HasField('fbit'):
        test = result.fbit.result

    if test and not test.success:
        # Send alert via webhook; the timeout keeps a slow endpoint
        # from stalling the subscriber loop indefinitely
        requests.post('https://alerts.example.com/webhook', json={
            'test_name': test.test_name,
            'description': test.description,
            'error': test.error_message if test.HasField('error_message') else 'Unknown'
        }, timeout=5)
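A continuously failing CBIT test would trigger the webhook on every message. One possible mitigation, which is not part of the original examples, is a per-test cooldown that suppresses repeat alerts for a while. The sketch below assumes alerts are keyed by hostname and test name; `AlertCooldown` and its `clock` parameter are hypothetical and exist mainly so the behaviour can be tested without waiting.

```python
import time
from typing import Callable, Dict, Tuple


class AlertCooldown:
    """Suppress repeat alerts for the same (hostname, test_name) pair
    until `seconds` have elapsed since the last alert that was allowed."""

    def __init__(self, seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.seconds = seconds
        self.clock = clock
        self._last: Dict[Tuple[str, str], float] = {}

    def should_alert(self, hostname: str, test_name: str) -> bool:
        now = self.clock()
        key = (hostname, test_name)
        last = self._last.get(key)
        if last is not None and now - last < self.seconds:
            return False  # still inside the cooldown window
        self._last[key] = now
        return True
```

In the loop above, the webhook call would then be guarded by something like `if cooldown.should_alert(hostname, test.test_name):`, with the hostname taken from the sample's topic.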
Multi-Host Monitoring
# Subscribe to all hosts and all test types.
# Topics have three chunks (bit/{hostname}/{type}), so use two wildcards.
subscriber = session.declare_subscriber("bit/*/*")
hosts = {}

for sample in subscriber:
    # Extract hostname from topic (key_expr is a KeyExpr object)
    hostname = str(sample.key_expr).split('/')[1]

    # Track per-host statistics
    if hostname not in hosts:
        hosts[hostname] = {'pass': 0, 'fail': 0}

    result = BuiltInTest()
    result.ParseFromString(bytes(sample.payload))

    # Update statistics
    if result.HasField('pbit'):
        success = result.pbit.result.success
    elif result.HasField('cbit'):
        success = result.cbit.result.success
    else:
        success = result.fbit.result.success

    if success:
        hosts[hostname]['pass'] += 1
    else:
        hosts[hostname]['fail'] += 1

    print(f"Statistics: {hosts}")
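The per-host counters collected above can be reduced to a failure rate, which is often easier to compare across hosts than raw counts. A minimal sketch, assuming the same `{'pass': n, 'fail': m}` layout; `failure_rates` is a hypothetical helper and the sample data is made up.

```python
def failure_rates(hosts: dict) -> dict:
    """Map each hostname to its failure fraction (0.0 when no results)."""
    rates = {}
    for hostname, counts in hosts.items():
        total = counts["pass"] + counts["fail"]
        rates[hostname] = counts["fail"] / total if total else 0.0
    return rates


hosts = {"host-a": {"pass": 3, "fail": 1}, "host-b": {"pass": 0, "fail": 0}}
print(failure_rates(hosts))  # {'host-a': 0.25, 'host-b': 0.0}
```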
Running the Publisher
To enable Zenoh publishing from BIT Manager:
# Start BIT Manager with Zenoh enabled
BIT_CONFIG_PATH=/etc/bit \
BIT_TEST_PATH=/usr/local/lib/bit_manager \
bit-manager --zenoh
# Or as a service (edit /etc/systemd/system/bit_manager.service)
ExecStart=/usr/local/bin/bit-manager --zenoh -c
Testing Your Subscriber
Using the Example Monitors
Test your Zenoh setup by running one of the example monitors:
# Python
cd examples/bit_monitor_python
pip install -r requirements.txt
python bit_monitor.py -hostname=my-device
# C++
cd examples/bit_monitor_cpp
./build-example.sh
./build/bit_monitor_cpp
# Rust
cd examples/bit_monitor_rust
cargo run --release -- -hostname=my-device
Manual Testing
Generate test results manually:
# Run a single test
bit-manager -t pbit_cpu_usage -o --zenoh
# Run all PBIT tests
bit-manager -p --zenoh
Your subscriber should receive and display the results.
Troubleshooting
No Messages Received
Check Zenoh connectivity:
# Install Zenoh tools
cargo install zenoh
# Test publisher
zenoh-pub -k bit/test/PBIT -v "test message"
# Test subscriber
zenoh-sub -k "bit/**"
Verify BIT Manager is publishing:
# Run with Zenoh enabled
bit-manager --zenoh -o -t pbit_cpu_usage
# Check logs
grep -i zenoh /tmp/bit-manager_INFO_*.log
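Check firewall rules: the router endpoint in the configuration example above uses TCP port 7447, and default peer-to-peer discovery relies on UDP multicast on the local network, so both must be allowed between the publisher and the subscriber.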
Deserialization Errors
Verify protobuf versions match:
# Check protoc version
protoc --version
# Regenerate bindings
cd examples/proto
./generate-protobuf-bindings.sh
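Verify message format: decode a saved payload against the schema, for example with protoc --decode=bit.BuiltInTest examples/proto/bit.proto < payload.bin. This assumes the schema's package is bit, as the bit:: namespace in the C++ example suggests; payload.bin is a placeholder for a captured message.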
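Performance Issues
For high-frequency CBIT tests:
# Use async processing
import asyncio
import zenoh

async def process_sample(sample):
    # Your processing code
    pass

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    session = zenoh.open(zenoh.Config())
    # Zenoh subscribers are not async iterators. The callback runs on a
    # Zenoh thread, so hand each sample to the event loop thread-safely.
    subscriber = session.declare_subscriber(
        "bit/*/*",
        lambda sample: loop.call_soon_threadsafe(queue.put_nowait, sample))
    tasks = set()
    while True:
        sample = await queue.get()
        # Process in background, keeping a reference until the task is done
        task = asyncio.create_task(process_sample(sample))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

asyncio.run(main())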
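Spawning one task per sample is simple, but under sustained load the number of in-flight tasks is unbounded. An alternative, offered here as a sketch and not as the approach used by the example monitors, is a bounded queue drained by a fixed pool of workers, so that a full queue applies back-pressure to the producer. `worker`, `run_pool`, and `handle` are hypothetical names, and integers stand in for Zenoh samples.

```python
import asyncio


async def worker(queue: asyncio.Queue, handle, results: list) -> None:
    """Pull items off the queue until cancelled."""
    while True:
        item = await queue.get()
        try:
            results.append(await handle(item))
        finally:
            queue.task_done()


async def run_pool(items, handle, workers: int = 4, maxsize: int = 100) -> list:
    """Feed items through a bounded queue drained by a fixed worker pool."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    results: list = []
    tasks = [asyncio.create_task(worker(queue, handle, results))
             for _ in range(workers)]
    for item in items:
        await queue.put(item)   # waits when the queue is full (back-pressure)
    await queue.join()          # wait until every item has been handled
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


async def handle(item: int) -> int:
    await asyncio.sleep(0)      # stand-in for real processing
    return item * 2


print(sorted(asyncio.run(run_pool(range(5), handle))))  # [0, 2, 4, 6, 8]
```

This sketch does not handle exceptions raised by `handle`; a production version would catch and log them inside the worker so that a single bad message cannot stop the pool.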
Additional Resources
- Example Code: See examples/ directory for complete implementations
- Zenoh Documentation: https://zenoh.io/docs/
- Protocol Buffers: https://protobuf.dev/
- BIT Manager Source: bit_manager/src/main.rs - Publisher implementation
Next Steps
- Choose a language and review the corresponding example in examples/
- Generate protobuf bindings for your language
- Implement subscriber logic based on the examples
- Test with bit-manager --zenoh
- Integrate into your monitoring/alerting system
For questions or contributions, see the repository README.