InferencePipeline

A high-performance multi-stage inference pipeline system designed for Kneron NPU dongles, enabling flexible single-stage and cascaded multi-stage AI inference workflows.

Installation

This project uses uv for fast Python package management.

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create and activate virtual environment
uv venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies
uv pip install -r requirements.txt

Requirements

"numpy>=2.2.6",
"opencv-python>=4.11.0.86",

Hardware Requirements

  • Kneron AI dongles (KL520, KL720, etc.)
  • USB ports for device connections
  • Compatible firmware files (fw_scpu.bin, fw_ncpu.bin)
  • Trained model files (.nef format)
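
The port IDs used throughout the examples below (e.g. 28, 32) identify the USB ports the dongles are plugged into. If the Kneron PLUS Python package (kp) is available, a sketch like the following can list them; this is an assumption, as kp is not part of this project:

import kp  # Kneron PLUS Python API (assumed to be installed separately)

device_list = kp.core.scan_devices()
for descriptor in device_list.device_descriptor_list:
    # usb_port_id is presumably the value StageConfig's port_ids expects
    print("USB port ID:", descriptor.usb_port_id)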

Quick Start

Single-Stage Pipeline

Replace direct MultiDongle usage with InferencePipeline to gain queue management, result callbacks, and multi-stage orchestration:

from InferencePipeline import InferencePipeline, StageConfig

# Configure single stage
stage_config = StageConfig(
    stage_id="fire_detection",
    port_ids=[28, 32],  # USB port IDs for your dongles
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin", 
    model_path="fire_detection_520.nef",
    upload_fw=True
)

# Create and start pipeline
pipeline = InferencePipeline([stage_config], pipeline_name="FireDetection")
pipeline.initialize()
pipeline.start()

# Set up result callback
def handle_result(pipeline_data):
    result = pipeline_data.stage_results.get("fire_detection", {})
    print(f"🔥 Detection: {result.get('result', 'Unknown')} "
          f"(Probability: {result.get('probability', 0.0):.3f})")

pipeline.set_result_callback(handle_result)

# Process frames
import cv2
cap = cv2.VideoCapture(0)

try:
    while True:
        ret, frame = cap.read()
        if ret:
            pipeline.put_data(frame)
            cv2.imshow("preview", frame)  # waitKey only registers keys while a window is shown
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()
    pipeline.stop()
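
Callbacks are the push-style interface; results can also be polled with get_result (see API Reference). A minimal sketch, assuming it returns a PipelineData object and None when the timeout expires:

# Polling alternative to set_result_callback
while True:
    pipeline_data = pipeline.get_result(timeout=0.1)
    if pipeline_data is None:
        continue  # nothing ready yet
    result = pipeline_data.stage_results.get("fire_detection", {})
    print(f"Detection: {result.get('result', 'Unknown')}")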

Multi-Stage Cascade Pipeline

Chain multiple models for complex workflows:

import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

# Custom preprocessing for second stage
def roi_extraction(frame, target_size):
    """Extract region of interest from detection results"""
    # Extract center region as example
    h, w = frame.shape[:2]
    center_crop = frame[h//4:3*h//4, w//4:3*w//4]
    return cv2.resize(center_crop, target_size)

# Custom result fusion
def combine_results(raw_output, **kwargs):
    """Combine detection + classification results"""
    classification_prob = float(raw_output[0]) if raw_output.size > 0 else 0.0
    detection_conf = kwargs.get('detection_conf', 0.5)
    
    # Weighted combination
    combined_score = (classification_prob * 0.7) + (detection_conf * 0.3)
    
    return {
        'combined_probability': combined_score,
        'classification_prob': classification_prob,
        'detection_conf': detection_conf,
        'result': 'Fire Detected' if combined_score > 0.6 else 'No Fire',
        'confidence': 'High' if combined_score > 0.8 else 'Low'
    }

# Stage 1: Object Detection
detection_stage = StageConfig(
    stage_id="object_detection",
    port_ids=[28, 30],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="object_detection_520.nef",
    upload_fw=True
)

# Stage 2: Fire Classification with preprocessing
classification_stage = StageConfig(
    stage_id="fire_classification",
    port_ids=[32, 34],
    scpu_fw_path="fw_scpu.bin", 
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_classification_520.nef",
    upload_fw=True,
    input_preprocessor=PreProcessor(resize_fn=roi_extraction),
    output_postprocessor=PostProcessor(process_fn=combine_results)
)

# Create two-stage pipeline
pipeline = InferencePipeline(
    [detection_stage, classification_stage],
    pipeline_name="DetectionClassificationCascade"
)

# Enhanced result handler
def handle_cascade_result(pipeline_data):
    detection = pipeline_data.stage_results.get("object_detection", {})
    classification = pipeline_data.stage_results.get("fire_classification", {})
    
    print(f"🎯 Detection: {detection.get('result', 'Unknown')} "
          f"(Conf: {detection.get('probability', 0.0):.3f})")
    print(f"🔥 Classification: {classification.get('result', 'Unknown')} "
          f"(Combined: {classification.get('combined_probability', 0.0):.3f})")
    print(f"⏱️  Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
    print("-" * 50)

pipeline.set_result_callback(handle_cascade_result)
pipeline.initialize()
pipeline.start()

# Your processing loop here...

Usage Examples

Example 1: Real-time Webcam Processing

import time

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import WebcamSource

def run_realtime_detection():
    # Configure pipeline
    config = StageConfig(
        stage_id="realtime_detection",
        port_ids=[28, 32], 
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="your_model.nef",
        upload_fw=True,
        max_queue_size=30  # Prevent memory buildup
    )
    
    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()
    
    # Use webcam source
    source = WebcamSource(camera_id=0)
    source.start()
    
    def display_results(pipeline_data):
        result = pipeline_data.stage_results["realtime_detection"]
        probability = result.get('probability', 0.0)
        detection = result.get('result', 'Unknown')
        
        # Your visualization logic here
        print(f"Detection: {detection} ({probability:.3f})")
    
    pipeline.set_result_callback(display_results)
    
    try:
        while True:
            frame = source.get_frame()
            if frame is not None:
                pipeline.put_data(frame)
            time.sleep(0.033)  # ~30 FPS
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        source.stop()
        pipeline.stop()

if __name__ == "__main__":
    run_realtime_detection()

Example 2: Complex Multi-Modal Pipeline

import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

def run_multimodal_pipeline():
    """Multi-modal fire detection with RGB, edge, and thermal-like analysis"""
    
    def edge_preprocessing(frame, target_size):
        """Extract edge features"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.resize(edges_3ch, target_size)
    
    def thermal_preprocessing(frame, target_size):
        """Simulate thermal processing"""
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        thermal_like = hsv[:, :, 2]  # Value channel
        thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR)
        return cv2.resize(thermal_3ch, target_size)
    
    def fusion_postprocessing(raw_output, **kwargs):
        """Fuse results from multiple modalities"""
        if raw_output.size > 0:
            current_prob = float(raw_output[0])
            rgb_conf = kwargs.get('rgb_conf', 0.5)
            edge_conf = kwargs.get('edge_conf', 0.5)
            
            # Weighted fusion
            fused_prob = (current_prob * 0.5) + (rgb_conf * 0.3) + (edge_conf * 0.2)
            
            return {
                'fused_probability': fused_prob,
                'modality_scores': {
                    'thermal': current_prob,
                    'rgb': rgb_conf,
                    'edge': edge_conf
                },
                'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire',
                'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium'
            }
        return {'fused_probability': 0.0, 'result': 'No Fire'}
    
    # Define stages
    stages = [
        StageConfig("rgb_analysis", [28, 30], "fw_scpu.bin", "fw_ncpu.bin", "rgb_model.nef", True),
        StageConfig("edge_analysis", [32, 34], "fw_scpu.bin", "fw_ncpu.bin", "edge_model.nef", True,
                   input_preprocessor=PreProcessor(resize_fn=edge_preprocessing)),
        StageConfig("thermal_analysis", [36, 38], "fw_scpu.bin", "fw_ncpu.bin", "thermal_model.nef", True,
                   input_preprocessor=PreProcessor(resize_fn=thermal_preprocessing)),
        StageConfig("fusion", [40, 42], "fw_scpu.bin", "fw_ncpu.bin", "fusion_model.nef", True,
                   output_postprocessor=PostProcessor(process_fn=fusion_postprocessing))
    ]
    
    pipeline = InferencePipeline(stages, pipeline_name="MultiModalFireDetection")
    
    def handle_multimodal_result(pipeline_data):
        print(f"\n🔥 Multi-Modal Fire Detection Results:")
        for stage_id, result in pipeline_data.stage_results.items():
            if 'probability' in result:
                print(f"   {stage_id}: {result['result']} ({result['probability']:.3f})")
        
        if 'fusion' in pipeline_data.stage_results:
            fusion = pipeline_data.stage_results['fusion']
            print(f"   🎯 FINAL: {fusion['result']} (Fused: {fusion['fused_probability']:.3f})")
            print(f"   Confidence: {fusion.get('confidence', 'Unknown')}")
    
    pipeline.set_result_callback(handle_multimodal_result)
    
    # Start pipeline
    pipeline.initialize()
    pipeline.start()
    
    # Your processing logic here...

Example 3: Batch Processing

import time

import cv2

from InferencePipeline import InferencePipeline, StageConfig

def process_image_batch(image_paths):
    """Process a batch of images through the pipeline"""
    
    config = StageConfig(
        stage_id="batch_processing",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin", 
        ncpu_fw_path="fw_ncpu.bin",
        model_path="batch_model.nef",
        upload_fw=True
    )
    
    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()
    
    results = []
    
    def collect_result(pipeline_data):
        result = pipeline_data.stage_results["batch_processing"]
        results.append({
            'pipeline_id': pipeline_data.pipeline_id,
            'result': result,
            'processing_time': pipeline_data.metadata.get('total_processing_time', 0.0)
        })
    
    pipeline.set_result_callback(collect_result)
    
    # Submit all images, counting how many frames were actually queued
    submitted = 0
    for img_path in image_paths:
        image = cv2.imread(img_path)
        if image is not None:
            pipeline.put_data(image)
            submitted += 1
    
    # Wait for all results (unreadable files would otherwise stall this loop forever)
    while len(results) < submitted:
        time.sleep(0.1)
    
    pipeline.stop()
    return results
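
The batch helper can then be driven from a directory listing; for example:

from glob import glob

batch_results = process_image_batch(sorted(glob("images/*.jpg")))
for entry in batch_results:
    print(entry['pipeline_id'], entry['processing_time'])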

Configuration

StageConfig Parameters

StageConfig(
    stage_id="unique_stage_name",           # Required: Unique identifier
    port_ids=[28, 32],                      # Required: USB port IDs for dongles
    scpu_fw_path="fw_scpu.bin",            # Required: SCPU firmware path
    ncpu_fw_path="fw_ncpu.bin",            # Required: NCPU firmware path  
    model_path="model.nef",                 # Required: Model file path
    upload_fw=True,                         # Upload firmware on init
    max_queue_size=50,                      # Queue size limit
    input_preprocessor=None,                # Optional: Inter-stage preprocessing
    output_postprocessor=None,              # Optional: Inter-stage postprocessing
    stage_preprocessor=None,                # Optional: MultiDongle preprocessing
    stage_postprocessor=None                # Optional: MultiDongle postprocessing
)

Performance Tuning

# For high-throughput scenarios
config = StageConfig(
    stage_id="high_performance",
    port_ids=[28, 30, 32, 34],  # Use more dongles
    max_queue_size=100,          # Larger queues
    # ... other params
)

# For low-latency scenarios  
config = StageConfig(
    stage_id="low_latency",
    port_ids=[28, 32],
    max_queue_size=10,           # Smaller queues
    # ... other params
)

Statistics and Monitoring

# Enable statistics reporting
def print_stats(stats):
    print(f"\n📊 Pipeline Statistics:")
    print(f"   Input: {stats['pipeline_input_submitted']}")
    print(f"   Completed: {stats['pipeline_completed']}")
    print(f"   Success Rate: {stats['pipeline_completed']/max(stats['pipeline_input_submitted'], 1)*100:.1f}%")
    
    for stage_stat in stats['stage_statistics']:
        print(f"   Stage {stage_stat['stage_id']}: "
              f"Processed={stage_stat['processed_count']}, "
              f"AvgTime={stage_stat['avg_processing_time']:.3f}s")

pipeline.set_stats_callback(print_stats)
pipeline.start_stats_reporting(interval=5.0)  # Report every 5 seconds
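
Statistics can also be pulled on demand with get_pipeline_statistics() (see API Reference); a sketch, assuming it returns the same dictionary shape the callback receives:

# One-off snapshot instead of periodic reporting
stats = pipeline.get_pipeline_statistics()
print_stats(stats)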

Running Examples

The project includes comprehensive examples in test.py:

# Single-stage pipeline
uv run python test.py --example single

# Two-stage cascade pipeline
uv run python test.py --example cascade

# Complex multi-stage pipeline
uv run python test.py --example complex

API Reference

InferencePipeline

Main pipeline orchestrator class.

Methods:

  • initialize(): Initialize all pipeline stages
  • start(): Start pipeline processing threads
  • stop(): Gracefully stop pipeline
  • put_data(data, timeout=1.0): Submit data for processing
  • get_result(timeout=0.1): Get processed results
  • set_result_callback(callback): Set success callback
  • set_error_callback(callback): Set error callback (see the sketch below)
  • get_pipeline_statistics(): Get performance metrics
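
A minimal error-callback sketch (the callback signature is an assumption; adapt it to whatever set_error_callback actually passes):

def handle_error(error):
    # Signature is assumed; the pipeline may pass richer context than a single value
    print(f"Pipeline error: {error}")

pipeline.set_error_callback(handle_error)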

StageConfig

Configuration for individual pipeline stages.

PipelineData

Data structure flowing through pipeline stages.

Attributes:

  • data: Main data payload
  • metadata: Processing metadata
  • stage_results: Results from each stage
  • pipeline_id: Unique identifier
  • timestamp: Creation timestamp
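
For example, a result callback can inspect all of these attributes directly (metadata keys follow the earlier examples):

def inspect_result(pipeline_data):
    print("id:", pipeline_data.pipeline_id)
    print("created:", pipeline_data.timestamp)
    print("stages:", list(pipeline_data.stage_results.keys()))
    print("total time:", pipeline_data.metadata.get('total_processing_time', 0.0))

pipeline.set_result_callback(inspect_result)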

Performance Considerations

  1. Queue Sizing: Balance memory usage vs. throughput with max_queue_size
  2. Dongle Distribution: Distribute dongles across stages for optimal performance
  3. Preprocessing: Minimize expensive operations in preprocessors
  4. Memory Management: Monitor queue sizes and processing times
  5. Threading: Pipeline uses multiple threads - ensure thread-safe operations (see the collector sketch below)
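
Because result callbacks fire on pipeline threads, shared state should be guarded. A minimal thread-safe collector sketch:

import threading

class ResultCollector:
    """Accumulates results from callback threads behind a lock."""
    def __init__(self):
        self._lock = threading.Lock()
        self._results = []

    def __call__(self, pipeline_data):
        with self._lock:
            self._results.append(pipeline_data)

    def drain(self):
        """Return and clear everything collected so far."""
        with self._lock:
            collected, self._results = self._results, []
        return collected

collector = ResultCollector()
pipeline.set_result_callback(collector)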

Troubleshooting

Common Issues

Pipeline hangs or stops processing:

  • Check dongle connections and firmware compatibility
  • Monitor queue sizes for bottlenecks
  • Verify model file paths and formats

High memory usage:

  • Reduce max_queue_size parameters
  • Ensure proper cleanup in custom processors
  • Monitor statistics for processing times

Poor performance:

  • Distribute dongles optimally across stages
  • Profile preprocessing/postprocessing functions (see the timing sketch below)
  • Consider batch processing for high throughput
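
One lightweight way to profile a custom processor is to wrap its function with a timer before handing it to PreProcessor (a sketch; roi_extraction is the helper from the cascade example):

import time

def timed(fn):
    """Wrap a (frame, target_size) preprocessing function and print its runtime."""
    def wrapper(frame, target_size):
        t0 = time.perf_counter()
        out = fn(frame, target_size)
        print(f"{fn.__name__}: {(time.perf_counter() - t0) * 1000:.1f} ms")
        return out
    return wrapper

# e.g. input_preprocessor=PreProcessor(resize_fn=timed(roi_extraction))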

Debug Mode

Enable detailed logging for troubleshooting:

import logging
logging.basicConfig(level=logging.DEBUG)

# Pipeline will output detailed processing information
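
Adding timestamps to the log format makes it easier to correlate log entries with the per-stage processing times reported in the statistics:

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s: %(message)s",
)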