Real-time Streaming API
SDK-Specific Documentation:
Python SDK Users: See Python SDK Streaming Inference for StreamingSession documentation.
Julia SDK Users: This page documents the Julia SDK’s streaming API.
Both SDKs perform streaming inference locally (not via API calls) for low-latency real-time processing.
Overview
Streaming inference allows you to process EEG data in real-time, chunk-by-chunk, as it arrives from your BCI hardware. This is essential for:
Real-time BCI applications: Wheelchair control, gaming, assistive devices
Online feedback systems: Neurofeedback, training applications
Low-latency requirements: Systems requiring <100 ms response times
Continuous monitoring: Long-duration BCI sessions
Model coverage:
Julia SDK streaming supports: NimbusLDA, NimbusQDA, NimbusProbit (Bayesian Softmax)
Python SDK additionally supports: NimbusSTS (Bayesian STS)
How Streaming Works
Unlike batch inference, which processes complete trials, streaming inference processes data incrementally:
EEG Hardware → Preprocessing → Feature Chunks → NimbusSDK → Predictions
 (MNE/LSL)      (CSP/etc)                        (Local)    (Real-time)
Architecture
Hardware Acquisition: EEG amplifier streams data (e.g., via LSL, BrainFlow)
Preprocessing: Apply filters and feature extraction in chunks
SDK Streaming: Process features chunk-by-chunk with process_chunk()
Aggregation: Combine chunk predictions with finalize_trial()
Key Benefit: All processing happens locally; no API calls are made during inference.
Quick Start
Initialize Streaming Session
using NimbusSDK

# 1. Authenticate (one-time, can cache offline)
NimbusSDK.install_core("nbci_live_your_key")

# 2. Load model
model = load_model(NimbusLDA, "motor_imagery_4class_v1")

# 3. Configure streaming metadata
metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1-second chunks at 250 Hz
)

# 4. Initialize streaming session
session = init_streaming(model, metadata)
Process Chunks in Real-Time
# Process chunks as they arrive
for chunk in eeg_feature_stream
    # chunk: (n_features × chunk_size) matrix
    result = process_chunk(session, chunk)

    println("Chunk prediction: $(result.prediction)")
    println("Confidence: $(round(result.confidence, digits=3))")
    println("Posterior: $(result.posterior)")
end
Finalize Trial
After processing all chunks for a trial, aggregate the results:
# Aggregate chunk predictions
final_result = finalize_trial(session; method = :weighted_vote)

println("Final prediction: $(final_result.prediction)")
println("Final confidence: $(round(final_result.confidence, digits=3))")

# Check if trial quality is acceptable
if should_reject_trial(final_result.confidence, 0.7)
    println("⚠️ Low confidence - trial rejected")
else
    println("✓ High confidence - trial accepted")
end
Complete Example
Motor Imagery Streaming
using NimbusSDK

# Setup
NimbusSDK.install_core("nbci_live_your_key")
model = load_model(NimbusLDA, "motor_imagery_4class_v1")

metadata = BCIMetadata(
    sampling_rate = 250.0,
    paradigm = :motor_imagery,
    feature_type = :csp,
    n_features = 16,
    n_classes = 4,
    chunk_size = 250  # 1-second chunks
)

session = init_streaming(model, metadata)

# Real-time trial processing
function process_trial(csp_chunks)
    # Process each chunk as it arrives
    for chunk in csp_chunks
        chunk_result = process_chunk(session, chunk)

        # Real-time feedback
        @info "Chunk $(length(session.chunk_history))" prediction = chunk_result.prediction confidence = chunk_result.confidence

        # Optional: Early stopping
        if chunk_result.confidence > 0.95
            @info "High confidence reached early - consider stopping"
        end
    end

    # Finalize with weighted voting
    final = finalize_trial(session; method = :weighted_vote)
    return final
end

# Simulate streaming from BCI system
trial_chunks = [
    randn(16, 250),  # Chunk 1: 0-1s
    randn(16, 250),  # Chunk 2: 1-2s
    randn(16, 250),  # Chunk 3: 2-3s
    randn(16, 250)   # Chunk 4: 3-4s
]

final_result = process_trial(trial_chunks)

println("\nFinal Result:")
println("  Prediction: $(final_result.prediction)")
println("  Confidence: $(round(final_result.confidence, digits=3))")
println("  Quality: $(should_reject_trial(final_result.confidence, 0.7) ? "Rejected" : "Accepted")")
Streaming Functions
init_streaming()
Initialize a streaming session for a model.
init_streaming(model, metadata::BCIMetadata) -> StreamingSession
Parameters:
model: Loaded streaming-capable model (NimbusLDA, NimbusQDA, or NimbusProbit; see model coverage above)
metadata: BCIMetadata with chunk_size specified
Returns: StreamingSession object
process_chunk()
Process a single chunk of features.
process_chunk(session::StreamingSession, chunk::Matrix{Float64}; iterations::Int = 10) -> StreamingResult
Parameters:
session: Active streaming session
chunk: Feature matrix (n_features × chunk_size)
iterations: Number of RxInfer iterations (default: 10)
Returns:
struct StreamingResult
    prediction::Int             # Predicted class
    confidence::Float64         # Confidence (max posterior)
    posterior::Vector{Float64}  # Full posterior distribution
end
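Beyond the built-in confidence score, the full posterior can be used to derive additional quality metrics. A minimal sketch, assuming an active session and chunk as above (posterior_entropy is an illustrative helper, not an SDK function):

# Illustrative helper, not part of the SDK: Shannon entropy of the posterior.
# 0 bits = fully certain; log2(n_classes) bits = uniform (maximally uncertain).
posterior_entropy(p::Vector{Float64}) = -sum(pi * log2(pi) for pi in p if pi > 0)

result = process_chunk(session, chunk)
H = posterior_entropy(result.posterior)
println("Entropy: $(round(H, digits=3)) of $(log2(length(result.posterior))) bits")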
finalize_trial()
Aggregate chunk predictions into final trial result.
finalize_trial(session::StreamingSession; method::Symbol = :weighted_vote) -> StreamingResult
Aggregation Methods:
:weighted_vote: Weight predictions by confidence (recommended)
:majority_vote: Simple majority vote across chunks
:last_chunk: Use only the last chunk’s prediction
:max_confidence: Use chunk with highest confidence
Parameters:
session: Streaming session with processed chunks
method: Aggregation strategy
Returns: Final aggregated StreamingResult
Chunk Size Selection
Choose chunk size based on your paradigm and latency requirements:
| Paradigm | Recommended Chunk Size | Rationale |
|---|---|---|
| Motor Imagery | 250-500 samples (1-2 s at 250 Hz) | Balance between latency and accuracy |
| P300 | 100-200 samples (0.4-0.8 s at 250 Hz) | Short epochs for rapid serial visual presentation |
| SSVEP | 500-1000 samples (2-4 s at 250 Hz) | Longer windows for frequency analysis |
Trade-offs:
Smaller chunks: Lower latency, more frequent updates, lower per-chunk accuracy
Larger chunks: Higher latency, fewer updates, higher per-chunk accuracy
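To turn a latency target into a chunk size, multiply the desired update interval by the sampling rate:

# Derive chunk_size from a latency target (simple arithmetic sketch)
sampling_rate = 250.0    # Hz
target_window_s = 1.0    # desired update interval in seconds
chunk_size = round(Int, target_window_s * sampling_rate)  # 250 samples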
Aggregation Strategies
Weighted Vote (Recommended)
Weight each chunk’s prediction by its confidence:
final = finalize_trial(session; method = :weighted_vote)
Best for: Most applications, especially with variable confidence across chunks
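Conceptually, weighted voting scales each chunk’s posterior by its confidence, sums across chunks, and takes the argmax. A minimal sketch of the idea; the SDK’s internal :weighted_vote implementation may differ:

# Illustrative sketch of confidence-weighted voting over chunk results;
# not the SDK's internal implementation.
function weighted_vote_sketch(results::Vector{StreamingResult})
    scores = zeros(length(first(results).posterior))
    for r in results
        scores .+= r.confidence .* r.posterior  # weight each posterior by its confidence
    end
    return argmax(scores)  # predicted class index
end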
Majority Vote
Simple democratic vote across chunks:
final = finalize_trial(session; method = :majority_vote)
Best for: Consistent predictions across chunks
Max Confidence
Use prediction from most confident chunk:
final = finalize_trial(session; method = :max_confidence)
Best for: When one chunk is clearly more reliable
Last Chunk
Use only the most recent chunk:
final = finalize_trial(session; method = :last_chunk)
Best for: Real-time systems where latest information is most relevant
Quality Assessment
Confidence Thresholds
Set confidence thresholds to reject low-quality trials:
confidence_threshold = 0.7

if should_reject_trial(final_result.confidence, confidence_threshold)
    @warn "Trial rejected - confidence too low" confidence = final_result.confidence
    # Ask user to repeat trial
else
    @info "Trial accepted" confidence = final_result.confidence
    # Use prediction
end
Per-Chunk Quality Monitoring
Monitor quality during streaming:
for chunk in chunks
    result = process_chunk(session, chunk)

    if result.confidence < 0.5
        @warn "Low confidence chunk detected" chunk_idx = length(session.chunk_history)
    end
end
Integration with Hardware
Lab Streaming Layer (LSL)
using NimbusSDK

# Assuming you have LSL.jl or PyCall with pylsl
function stream_from_lsl(stream_name, session)
    # Connect to LSL stream (pseudocode; adapt to your LSL binding)
    inlet = LSL.resolve_stream("name", stream_name)

    buffer = Vector{Vector{Float64}}()
    chunk_samples = session.metadata.chunk_size

    while true
        # Pull one sample from LSL
        sample, timestamp = inlet.pull_sample()

        # Apply preprocessing (filter, CSP, etc.)
        features = preprocess_sample(sample)
        push!(buffer, features)

        # Process when a full chunk is ready
        if length(buffer) >= chunk_samples
            chunk = hcat(buffer[1:chunk_samples]...)
            result = process_chunk(session, chunk)
            println("Real-time prediction: $(result.prediction)")

            # Slide the window forward
            buffer = buffer[(chunk_samples + 1):end]
        end
    end
end
BrainFlow
using NimbusSDK

# Assuming BrainFlow.jl or PyCall with brainflow
function stream_from_brainflow(board, session)
    while true
        # Get the latest data from the BrainFlow board
        data = BrainFlow.get_current_board_data(board)

        # Apply preprocessing
        filtered = apply_filters(data)
        csp_features = apply_csp(filtered)

        # Process chunk
        result = process_chunk(session, csp_features)

        # Real-time feedback
        display_prediction(result.prediction, result.confidence)

        sleep(1.0)  # 1-second chunks
    end
end
Minimize Latency
# Use fewer iterations for faster processing
result = process_chunk(session, chunk; iterations = 5)  # ~10-20 ms
Precompute Models
# Load and compile model before streaming starts
model = load_model(NimbusLDA, "motor_imagery_4class_v1")
session = init_streaming(model, metadata)

# Warm-up: process a dummy chunk to compile functions
dummy_chunk = randn(16, 250)
process_chunk(session, dummy_chunk)

# Now ready for real-time processing
Parallel Processing
using Base.Threads

# Process multiple channel sessions in parallel,
# pairing each session with its corresponding chunk
@threads for i in eachindex(channel_sessions)
    result = process_chunk(channel_sessions[i], chunks[i])
end
Typical Latencies
| Operation | Latency | Notes |
|---|---|---|
| Chunk processing | 10-30 ms | RxInfer inference (10 iterations) |
| Feature extraction | 5-20 ms | CSP transform, bandpower |
| Trial finalization | <5 ms | Aggregate chunk results |
| Total (end-to-end) | <100 ms | Including preprocessing |
All latencies are for local processing. No network calls are made during streaming.
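To check these numbers on your own hardware, you can time process_chunk directly. A minimal sketch, assuming the session from the Quick Start (timings vary by model and machine):

# Time a single chunk on your hardware (illustrative benchmark)
chunk = randn(16, 250)
process_chunk(session, chunk)  # warm-up call, excludes compilation time

t = @elapsed process_chunk(session, chunk)
println("Chunk latency: $(round(t * 1000, digits=1)) ms")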
Comparison: Streaming vs Batch
| Aspect | Streaming | Batch |
|---|---|---|
| Latency | Low (<100 ms per chunk) | Higher (full trial) |
| Use Case | Real-time BCI | Offline analysis |
| Feedback | Continuous | After trial completion |
| Memory | Low (per-chunk) | Higher (full trials) |
| Accuracy | Aggregated over chunks | Single prediction |
Next Read
Real-time Setup: Hardware integration guide
Batch Processing: Offline analysis workflow
Julia SDK: Complete SDK reference
Examples: Working code examples
Support
For streaming and real-time processing questions: