InferencePipeline
A high-performance multi-stage inference pipeline system designed for Kneron NPU dongles, enabling flexible single-stage and cascaded multi-stage AI inference workflows.
Installation
This project uses uv for fast Python package management.
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create and activate virtual environment
uv venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dependencies
uv pip install -r requirements.txt
Requirements
"numpy>=2.2.6",
"opencv-python>=4.11.0.86",
Hardware Requirements
- Kneron AI dongles (KL520, KL720, etc.)
- USB ports for device connections
- Compatible firmware files (fw_scpu.bin, fw_ncpu.bin)
- Trained model files (.nef format)
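The port_ids values used throughout this README are USB port IDs as reported by the Kneron PLUS tooling. If the kp Python package is installed, a quick scan lists the connected dongles and their port IDs (a sketch, assuming kp.core.scan_devices() is available in your PLUS version):
# a sketch, assuming the Kneron PLUS Python package ("kp") is installed
import kp

print(kp.core.scan_devices())  # inspect the listing for each dongle's USB port ID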
Quick Start
Single-Stage Pipeline
Replace your existing MultiDongle usage with InferencePipeline for enhanced features:
from InferencePipeline import InferencePipeline, StageConfig
# Configure single stage
stage_config = StageConfig(
    stage_id="fire_detection",
    port_ids=[28, 32],  # USB port IDs for your dongles
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_detection_520.nef",
    upload_fw=True
)
# Create and start pipeline
pipeline = InferencePipeline([stage_config], pipeline_name="FireDetection")
pipeline.initialize()
pipeline.start()
# Set up result callback
def handle_result(pipeline_data):
    result = pipeline_data.stage_results.get("fire_detection", {})
    print(f"🔥 Detection: {result.get('result', 'Unknown')} "
          f"(Probability: {result.get('probability', 0.0):.3f})")
pipeline.set_result_callback(handle_result)
# Process frames
import cv2
cap = cv2.VideoCapture(0)
try:
    while True:
        ret, frame = cap.read()
        if ret:
            pipeline.put_data(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    pipeline.stop()
Multi-Stage Cascade Pipeline
Chain multiple models for complex workflows:
import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor
# Custom preprocessing for second stage
def roi_extraction(frame, target_size):
    """Extract region of interest from detection results"""
    # Extract center region as example
    h, w = frame.shape[:2]
    center_crop = frame[h//4:3*h//4, w//4:3*w//4]
    return cv2.resize(center_crop, target_size)
# Custom result fusion
def combine_results(raw_output, **kwargs):
    """Combine detection + classification results"""
    classification_prob = float(raw_output[0]) if raw_output.size > 0 else 0.0
    detection_conf = kwargs.get('detection_conf', 0.5)
    # Weighted combination
    combined_score = (classification_prob * 0.7) + (detection_conf * 0.3)
    return {
        'combined_probability': combined_score,
        'classification_prob': classification_prob,
        'detection_conf': detection_conf,
        'result': 'Fire Detected' if combined_score > 0.6 else 'No Fire',
        'confidence': 'High' if combined_score > 0.8 else 'Low'
    }
# Stage 1: Object Detection
detection_stage = StageConfig(
    stage_id="object_detection",
    port_ids=[28, 30],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="object_detection_520.nef",
    upload_fw=True
)
# Stage 2: Fire Classification with preprocessing
classification_stage = StageConfig(
    stage_id="fire_classification",
    port_ids=[32, 34],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_classification_520.nef",
    upload_fw=True,
    input_preprocessor=PreProcessor(resize_fn=roi_extraction),
    output_postprocessor=PostProcessor(process_fn=combine_results)
)
# Create two-stage pipeline
pipeline = InferencePipeline(
    [detection_stage, classification_stage],
    pipeline_name="DetectionClassificationCascade"
)
# Enhanced result handler
def handle_cascade_result(pipeline_data):
    detection = pipeline_data.stage_results.get("object_detection", {})
    classification = pipeline_data.stage_results.get("fire_classification", {})
    print(f"🎯 Detection: {detection.get('result', 'Unknown')} "
          f"(Conf: {detection.get('probability', 0.0):.3f})")
    print(f"🔥 Classification: {classification.get('result', 'Unknown')} "
          f"(Combined: {classification.get('combined_probability', 0.0):.3f})")
    print(f"⏱️ Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
    print("-" * 50)
pipeline.set_result_callback(handle_cascade_result)
pipeline.initialize()
pipeline.start()
# Your processing loop here...
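One way to drive the cascade is to reuse the webcam loop from the Quick Start; the sketch below feeds frames from a video file instead (the file name is a placeholder):
import cv2

cap = cv2.VideoCapture("input.mp4")  # placeholder path
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        pipeline.put_data(frame)
finally:
    cap.release()
    pipeline.stop()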
Usage Examples
Example 1: Real-time Webcam Processing
import time

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import WebcamSource

def run_realtime_detection():
    # Configure pipeline
    config = StageConfig(
        stage_id="realtime_detection",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="your_model.nef",
        upload_fw=True,
        max_queue_size=30  # Prevent memory buildup
    )

    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()

    # Use webcam source
    source = WebcamSource(camera_id=0)
    source.start()

    def display_results(pipeline_data):
        result = pipeline_data.stage_results["realtime_detection"]
        probability = result.get('probability', 0.0)
        detection = result.get('result', 'Unknown')
        # Your visualization logic here
        print(f"Detection: {detection} ({probability:.3f})")

    pipeline.set_result_callback(display_results)

    try:
        while True:
            frame = source.get_frame()
            if frame is not None:
                pipeline.put_data(frame)
            time.sleep(0.033)  # ~30 FPS
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        source.stop()
        pipeline.stop()

if __name__ == "__main__":
    run_realtime_detection()
Example 2: Complex Multi-Modal Pipeline
import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

def run_multimodal_pipeline():
    """Multi-modal fire detection with RGB, edge, and thermal-like analysis"""

    def edge_preprocessing(frame, target_size):
        """Extract edge features"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.resize(edges_3ch, target_size)

    def thermal_preprocessing(frame, target_size):
        """Simulate thermal processing"""
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        thermal_like = hsv[:, :, 2]  # Value channel
        thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR)
        return cv2.resize(thermal_3ch, target_size)

    def fusion_postprocessing(raw_output, **kwargs):
        """Fuse results from multiple modalities"""
        if raw_output.size > 0:
            current_prob = float(raw_output[0])
            rgb_conf = kwargs.get('rgb_conf', 0.5)
            edge_conf = kwargs.get('edge_conf', 0.5)
            # Weighted fusion
            fused_prob = (current_prob * 0.5) + (rgb_conf * 0.3) + (edge_conf * 0.2)
            return {
                'fused_probability': fused_prob,
                'modality_scores': {
                    'thermal': current_prob,
                    'rgb': rgb_conf,
                    'edge': edge_conf
                },
                'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire',
                'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium'
            }
        return {'fused_probability': 0.0, 'result': 'No Fire'}

    # Define stages
    stages = [
        StageConfig("rgb_analysis", [28, 30], "fw_scpu.bin", "fw_ncpu.bin", "rgb_model.nef", True),
        StageConfig("edge_analysis", [32, 34], "fw_scpu.bin", "fw_ncpu.bin", "edge_model.nef", True,
                    input_preprocessor=PreProcessor(resize_fn=edge_preprocessing)),
        StageConfig("thermal_analysis", [36, 38], "fw_scpu.bin", "fw_ncpu.bin", "thermal_model.nef", True,
                    input_preprocessor=PreProcessor(resize_fn=thermal_preprocessing)),
        StageConfig("fusion", [40, 42], "fw_scpu.bin", "fw_ncpu.bin", "fusion_model.nef", True,
                    output_postprocessor=PostProcessor(process_fn=fusion_postprocessing))
    ]

    pipeline = InferencePipeline(stages, pipeline_name="MultiModalFireDetection")

    def handle_multimodal_result(pipeline_data):
        print("\n🔥 Multi-Modal Fire Detection Results:")
        for stage_id, result in pipeline_data.stage_results.items():
            if 'probability' in result:
                print(f"  {stage_id}: {result['result']} ({result['probability']:.3f})")
        if 'fusion' in pipeline_data.stage_results:
            fusion = pipeline_data.stage_results['fusion']
            print(f"  🎯 FINAL: {fusion['result']} (Fused: {fusion['fused_probability']:.3f})")
            print(f"  Confidence: {fusion.get('confidence', 'Unknown')}")

    pipeline.set_result_callback(handle_multimodal_result)

    # Start pipeline
    pipeline.initialize()
    pipeline.start()

    # Your processing loop here...
Example 3: Batch Processing
import time

import cv2

from InferencePipeline import InferencePipeline, StageConfig

def process_image_batch(image_paths):
    """Process a batch of images through pipeline"""
    config = StageConfig(
        stage_id="batch_processing",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="batch_model.nef",
        upload_fw=True
    )

    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()

    results = []

    def collect_result(pipeline_data):
        result = pipeline_data.stage_results["batch_processing"]
        results.append({
            'pipeline_id': pipeline_data.pipeline_id,
            'result': result,
            'processing_time': pipeline_data.metadata.get('total_processing_time', 0.0)
        })

    pipeline.set_result_callback(collect_result)

    # Submit all images
    for img_path in image_paths:
        image = cv2.imread(img_path)
        if image is not None:
            pipeline.put_data(image)

    # Wait for all results
    while len(results) < len(image_paths):
        time.sleep(0.1)

    pipeline.stop()
    return results
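Note that the wait loop above never finishes if an image fails to load or a frame is dropped; a bounded wait (a sketch with an arbitrary 30-second budget) is safer for unattended batch jobs:
import time

# stop waiting after a deadline so dropped frames cannot hang the batch
deadline = time.time() + 30.0
while len(results) < len(image_paths) and time.time() < deadline:
    time.sleep(0.1)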
Configuration
StageConfig Parameters
StageConfig(
    stage_id="unique_stage_name",   # Required: Unique identifier
    port_ids=[28, 32],              # Required: USB port IDs for dongles
    scpu_fw_path="fw_scpu.bin",     # Required: SCPU firmware path
    ncpu_fw_path="fw_ncpu.bin",     # Required: NCPU firmware path
    model_path="model.nef",         # Required: Model file path
    upload_fw=True,                 # Upload firmware on init
    max_queue_size=50,              # Queue size limit
    input_preprocessor=None,        # Optional: Inter-stage preprocessing
    output_postprocessor=None,      # Optional: Inter-stage postprocessing
    stage_preprocessor=None,        # Optional: MultiDongle preprocessing
    stage_postprocessor=None        # Optional: MultiDongle postprocessing
)
Performance Tuning
# For high-throughput scenarios
config = StageConfig(
    stage_id="high_performance",
    port_ids=[28, 30, 32, 34],  # Use more dongles
    max_queue_size=100,         # Larger queues
    # ... other params
)

# For low-latency scenarios
config = StageConfig(
    stage_id="low_latency",
    port_ids=[28, 32],
    max_queue_size=10,  # Smaller queues
    # ... other params
)
Statistics and Monitoring
# Enable statistics reporting
def print_stats(stats):
    print("\n📊 Pipeline Statistics:")
    print(f"  Input: {stats['pipeline_input_submitted']}")
    print(f"  Completed: {stats['pipeline_completed']}")
    print(f"  Success Rate: {stats['pipeline_completed']/max(stats['pipeline_input_submitted'], 1)*100:.1f}%")
    for stage_stat in stats['stage_statistics']:
        print(f"  Stage {stage_stat['stage_id']}: "
              f"Processed={stage_stat['processed_count']}, "
              f"AvgTime={stage_stat['avg_processing_time']:.3f}s")

pipeline.set_stats_callback(print_stats)
pipeline.start_stats_reporting(interval=5.0)  # Report every 5 seconds
Running Examples
The project includes comprehensive examples in test.py:
# Single-stage pipeline
uv run python test.py --example single
# Two-stage cascade pipeline
uv run python test.py --example cascade
# Complex multi-stage pipeline
uv run python test.py --example complex
API Reference
InferencePipeline
Main pipeline orchestrator class.
Methods:
- initialize(): Initialize all pipeline stages
- start(): Start pipeline processing threads
- stop(): Gracefully stop the pipeline
- put_data(data, timeout=1.0): Submit data for processing
- get_result(timeout=0.1): Get processed results
- set_result_callback(callback): Set success callback
- set_error_callback(callback): Set error callback
- get_pipeline_statistics(): Get performance metrics
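Most examples in this README use the callback interface; results can also be pulled synchronously with get_result(). A minimal sketch, assuming get_result() returns a PipelineData object or None when nothing is ready within the timeout:
# polling sketch (assumption: get_result() returns None on timeout;
# adapt if your version raises an exception instead)
pipeline.put_data(frame)  # frame obtained as in the Quick Start loop
result = pipeline.get_result(timeout=0.5)
if result is not None:
    print(result.pipeline_id, result.stage_results)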
StageConfig
Configuration for individual pipeline stages.
PipelineData
Data structure flowing through pipeline stages.
Attributes:
- data: Main data payload
- metadata: Processing metadata
- stage_results: Results from each stage
- pipeline_id: Unique identifier
- timestamp: Creation timestamp
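For reference, a small callback that dumps every field (attribute names taken from the list above):
def inspect(pipeline_data):
    # attribute names follow the PipelineData list above
    print(pipeline_data.pipeline_id, pipeline_data.timestamp)
    print(pipeline_data.metadata)
    for stage_id, result in pipeline_data.stage_results.items():
        print(f"  {stage_id}: {result}")

pipeline.set_result_callback(inspect)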
Performance Considerations
- Queue Sizing: Balance memory usage vs. throughput with max_queue_size
- Dongle Distribution: Distribute dongles across stages for optimal performance
- Preprocessing: Minimize expensive operations in preprocessors
- Memory Management: Monitor queue sizes and processing times
- Threading: The pipeline uses multiple threads, so keep callbacks and shared state thread-safe (see the sketch below)
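Because result callbacks run on the pipeline's worker threads, guard any state shared with the main thread. A minimal sketch using a lock (the callback and variable names are illustrative):
import threading

results_lock = threading.Lock()
collected = []

def thread_safe_callback(pipeline_data):
    # called from a pipeline worker thread
    with results_lock:
        collected.append(pipeline_data.stage_results)

pipeline.set_result_callback(thread_safe_callback)

# read from the main thread under the same lock
with results_lock:
    print(f"Collected {len(collected)} results so far")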
Troubleshooting
Common Issues
Pipeline hangs or stops processing:
- Check dongle connections and firmware compatibility
- Monitor queue sizes for bottlenecks
- Verify model file paths and formats
High memory usage:
- Reduce max_queue_size parameters
- Ensure proper cleanup in custom processors
- Monitor statistics for processing times
Poor performance:
- Distribute dongles optimally across stages
- Profile preprocessing/postprocessing functions
- Consider batch processing for high throughput
Debug Mode
Enable detailed logging for troubleshooting:
import logging
logging.basicConfig(level=logging.DEBUG)
# Pipeline will output detailed processing information
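To keep the debug output from a long run, the same configuration can also write to a file; a minimal sketch with the standard logging module (the log file name is an arbitrary choice):
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s: %(message)s",
    handlers=[
        logging.StreamHandler(),                    # keep console output
        logging.FileHandler("pipeline_debug.log"),  # hypothetical file name
    ],
)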