InferencePipeline

A high-performance inference pipeline system for Kneron NPU dongles, supporting both single-stage and cascaded multi-stage AI inference workflows.

Installation

This project uses uv for fast Python package management.

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create and activate virtual environment
uv venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies
uv pip install -r requirements.txt
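
# Optional: verify the core dependencies import correctly
uv run python -c "import numpy, cv2; print(numpy.__version__, cv2.__version__)"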

Requirements

"numpy>=2.2.6",
"opencv-python>=4.11.0.86",

Hardware Requirements

  • Kneron AI dongles (KL520, KL720, etc.)
  • USB ports for device connections
  • Compatible firmware files (fw_scpu.bin, fw_ncpu.bin)
  • Trained model files (.nef format)
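
The port_ids passed to StageConfig below identify the USB port each dongle is attached to. A minimal discovery sketch, assuming the Kneron PLUS kp package (which this project's dongle communication builds on) is installed:

import kp

# Print the attached Kneron devices; the listing includes each device's USB port ID
print(kp.core.scan_devices())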

Quick Start

Single-Stage Pipeline

Replace direct MultiDongle usage with InferencePipeline to gain queue management, result callbacks, and multi-stage support:

from InferencePipeline import InferencePipeline, StageConfig

# Configure single stage
stage_config = StageConfig(
    stage_id="fire_detection",
    port_ids=[28, 32],  # USB port IDs for your dongles
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin", 
    model_path="fire_detection_520.nef",
    upload_fw=True
)

# Create and start pipeline
pipeline = InferencePipeline([stage_config], pipeline_name="FireDetection")
pipeline.initialize()
pipeline.start()

# Set up result callback
def handle_result(pipeline_data):
    result = pipeline_data.stage_results.get("fire_detection", {})
    print(f"🔥 Detection: {result.get('result', 'Unknown')} "
          f"(Probability: {result.get('probability', 0.0):.3f})")

pipeline.set_result_callback(handle_result)

# Process frames
import cv2
cap = cv2.VideoCapture(0)

try:
    while True:
        ret, frame = cap.read()
        if ret:
            pipeline.put_data(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    pipeline.stop()
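
Callbacks are push-style; results can also be pulled with get_result() (listed in the API Reference below). A minimal polling sketch, assuming get_result() returns None when nothing arrives within the timeout:

# Pull-style consumption instead of a callback
result = pipeline.get_result(timeout=0.1)  # assumption: returns None on timeout
if result is not None:
    fire = result.stage_results.get("fire_detection", {})
    print(f"Detection: {fire.get('result', 'Unknown')}")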

Multi-Stage Cascade Pipeline

Chain multiple models for complex workflows:

import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

# Custom preprocessing for second stage
def roi_extraction(frame, target_size):
    """Extract region of interest from detection results"""
    # Extract center region as example
    h, w = frame.shape[:2]
    center_crop = frame[h//4:3*h//4, w//4:3*w//4]
    return cv2.resize(center_crop, target_size)

# Custom result fusion
def combine_results(raw_output, **kwargs):
    """Combine detection + classification results"""
    classification_prob = float(raw_output[0]) if raw_output.size > 0 else 0.0
    detection_conf = kwargs.get('detection_conf', 0.5)
    
    # Weighted combination
    combined_score = (classification_prob * 0.7) + (detection_conf * 0.3)
    
    return {
        'combined_probability': combined_score,
        'classification_prob': classification_prob,
        'detection_conf': detection_conf,
        'result': 'Fire Detected' if combined_score > 0.6 else 'No Fire',
        'confidence': 'High' if combined_score > 0.8 else 'Low'
    }

# Stage 1: Object Detection
detection_stage = StageConfig(
    stage_id="object_detection",
    port_ids=[28, 30],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="object_detection_520.nef",
    upload_fw=True
)

# Stage 2: Fire Classification with preprocessing
classification_stage = StageConfig(
    stage_id="fire_classification",
    port_ids=[32, 34],
    scpu_fw_path="fw_scpu.bin", 
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_classification_520.nef",
    upload_fw=True,
    input_preprocessor=PreProcessor(resize_fn=roi_extraction),
    output_postprocessor=PostProcessor(process_fn=combine_results)
)

# Create two-stage pipeline
pipeline = InferencePipeline(
    [detection_stage, classification_stage],
    pipeline_name="DetectionClassificationCascade"
)

# Enhanced result handler
def handle_cascade_result(pipeline_data):
    detection = pipeline_data.stage_results.get("object_detection", {})
    classification = pipeline_data.stage_results.get("fire_classification", {})
    
    print(f"🎯 Detection: {detection.get('result', 'Unknown')} "
          f"(Conf: {detection.get('probability', 0.0):.3f})")
    print(f"🔥 Classification: {classification.get('result', 'Unknown')} "
          f"(Combined: {classification.get('combined_probability', 0.0):.3f})")
    print(f"⏱️  Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
    print("-" * 50)

pipeline.set_result_callback(handle_cascade_result)
pipeline.initialize()
pipeline.start()

# Your processing loop here...

Usage Examples

Example 1: Real-time Webcam Processing

import time

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import WebcamSource

def run_realtime_detection():
    # Configure pipeline
    config = StageConfig(
        stage_id="realtime_detection",
        port_ids=[28, 32], 
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="your_model.nef",
        upload_fw=True,
        max_queue_size=30  # Prevent memory buildup
    )
    
    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()
    
    # Use webcam source
    source = WebcamSource(camera_id=0)
    source.start()
    
    def display_results(pipeline_data):
        result = pipeline_data.stage_results["realtime_detection"]
        probability = result.get('probability', 0.0)
        detection = result.get('result', 'Unknown')
        
        # Your visualization logic here
        print(f"Detection: {detection} ({probability:.3f})")
    
    pipeline.set_result_callback(display_results)
    
    try:
        while True:
            frame = source.get_frame()
            if frame is not None:
                pipeline.put_data(frame)
            time.sleep(0.033)  # ~30 FPS
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        source.stop()
        pipeline.stop()

if __name__ == "__main__":
    run_realtime_detection()

Example 2: Complex Multi-Modal Pipeline

import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

def run_multimodal_pipeline():
    """Multi-modal fire detection with RGB, edge, and thermal-like analysis"""
    
    def edge_preprocessing(frame, target_size):
        """Extract edge features"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.resize(edges_3ch, target_size)
    
    def thermal_preprocessing(frame, target_size):
        """Simulate thermal processing"""
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        thermal_like = hsv[:, :, 2]  # Value channel
        thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR)
        return cv2.resize(thermal_3ch, target_size)
    
    def fusion_postprocessing(raw_output, **kwargs):
        """Fuse results from multiple modalities"""
        if raw_output.size > 0:
            current_prob = float(raw_output[0])
            rgb_conf = kwargs.get('rgb_conf', 0.5)
            edge_conf = kwargs.get('edge_conf', 0.5)
            
            # Weighted fusion
            fused_prob = (current_prob * 0.5) + (rgb_conf * 0.3) + (edge_conf * 0.2)
            
            return {
                'fused_probability': fused_prob,
                'modality_scores': {
                    'thermal': current_prob,
                    'rgb': rgb_conf,
                    'edge': edge_conf
                },
                'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire',
                'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium'
            }
        return {'fused_probability': 0.0, 'result': 'No Fire'}
    
    # Define stages
    stages = [
        StageConfig("rgb_analysis", [28, 30], "fw_scpu.bin", "fw_ncpu.bin", "rgb_model.nef", True),
        StageConfig("edge_analysis", [32, 34], "fw_scpu.bin", "fw_ncpu.bin", "edge_model.nef", True,
                   input_preprocessor=PreProcessor(resize_fn=edge_preprocessing)),
        StageConfig("thermal_analysis", [36, 38], "fw_scpu.bin", "fw_ncpu.bin", "thermal_model.nef", True,
                   input_preprocessor=PreProcessor(resize_fn=thermal_preprocessing)),
        StageConfig("fusion", [40, 42], "fw_scpu.bin", "fw_ncpu.bin", "fusion_model.nef", True,
                   output_postprocessor=PostProcessor(process_fn=fusion_postprocessing))
    ]
    
    pipeline = InferencePipeline(stages, pipeline_name="MultiModalFireDetection")
    
    def handle_multimodal_result(pipeline_data):
        print(f"\n🔥 Multi-Modal Fire Detection Results:")
        for stage_id, result in pipeline_data.stage_results.items():
            if 'probability' in result:
                print(f"   {stage_id}: {result['result']} ({result['probability']:.3f})")
        
        if 'fusion' in pipeline_data.stage_results:
            fusion = pipeline_data.stage_results['fusion']
            print(f"   🎯 FINAL: {fusion['result']} (Fused: {fusion['fused_probability']:.3f})")
            print(f"   Confidence: {fusion.get('confidence', 'Unknown')}")
    
    pipeline.set_result_callback(handle_multimodal_result)
    
    # Start pipeline
    pipeline.initialize()
    pipeline.start()
    
    # Your processing logic here...

Example 3: Batch Processing

import time

import cv2

from InferencePipeline import InferencePipeline, StageConfig

def process_image_batch(image_paths):
    """Process a batch of images through the pipeline"""
    
    config = StageConfig(
        stage_id="batch_processing",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin", 
        ncpu_fw_path="fw_ncpu.bin",
        model_path="batch_model.nef",
        upload_fw=True
    )
    
    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()
    
    results = []
    
    def collect_result(pipeline_data):
        result = pipeline_data.stage_results["batch_processing"]
        results.append({
            'pipeline_id': pipeline_data.pipeline_id,
            'result': result,
            'processing_time': pipeline_data.metadata.get('total_processing_time', 0.0)
        })
    
    pipeline.set_result_callback(collect_result)
    
    # Submit all images, counting how many were actually queued
    submitted = 0
    for img_path in image_paths:
        image = cv2.imread(img_path)
        if image is not None:
            pipeline.put_data(image)
            submitted += 1
    
    # Wait for every queued result (unreadable images are excluded,
    # so the loop cannot wait on them forever)
    while len(results) < submitted:
        time.sleep(0.1)
    
    pipeline.stop()
    return results
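
Hypothetical usage, with placeholder file names:

batch = process_image_batch(["frames/img_001.jpg", "frames/img_002.jpg"])
for entry in batch:
    print(entry['pipeline_id'], entry['processing_time'])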

Configuration

StageConfig Parameters

StageConfig(
    stage_id="unique_stage_name",           # Required: Unique identifier
    port_ids=[28, 32],                      # Required: USB port IDs for dongles
    scpu_fw_path="fw_scpu.bin",            # Required: SCPU firmware path
    ncpu_fw_path="fw_ncpu.bin",            # Required: NCPU firmware path  
    model_path="model.nef",                 # Required: Model file path
    upload_fw=True,                         # Upload firmware on init
    max_queue_size=50,                      # Queue size limit
    input_preprocessor=None,                # Optional: Inter-stage preprocessing
    output_postprocessor=None,              # Optional: Inter-stage postprocessing
    stage_preprocessor=None,                # Optional: MultiDongle preprocessing
    stage_postprocessor=None                # Optional: MultiDongle postprocessing
)

Performance Tuning

# For high-throughput scenarios
config = StageConfig(
    stage_id="high_performance",
    port_ids=[28, 30, 32, 34],  # Use more dongles
    max_queue_size=100,          # Larger queues
    # ... other params
)

# For low-latency scenarios  
config = StageConfig(
    stage_id="low_latency",
    port_ids=[28, 32],
    max_queue_size=10,           # Smaller queues
    # ... other params
)

Statistics and Monitoring

# Enable statistics reporting
def print_stats(stats):
    print(f"\n📊 Pipeline Statistics:")
    print(f"   Input: {stats['pipeline_input_submitted']}")
    print(f"   Completed: {stats['pipeline_completed']}")
    print(f"   Success Rate: {stats['pipeline_completed']/max(stats['pipeline_input_submitted'], 1)*100:.1f}%")
    
    for stage_stat in stats['stage_statistics']:
        print(f"   Stage {stage_stat['stage_id']}: "
              f"Processed={stage_stat['processed_count']}, "
              f"AvgTime={stage_stat['avg_processing_time']:.3f}s")

pipeline.set_stats_callback(print_stats)
pipeline.start_stats_reporting(interval=5.0)  # Report every 5 seconds

Running Examples

The project includes comprehensive examples in test.py:

# Single-stage pipeline
uv run python test.py --example single

# Two-stage cascade pipeline
uv run python test.py --example cascade

# Complex multi-stage pipeline
uv run python test.py --example complex

API Reference

InferencePipeline

Main pipeline orchestrator class.

Methods:

  • initialize(): Initialize all pipeline stages
  • start(): Start pipeline processing threads
  • stop(): Gracefully stop pipeline
  • put_data(data, timeout=1.0): Submit data for processing
  • get_result(timeout=0.1): Get processed results
  • set_result_callback(callback): Set success callback
  • set_error_callback(callback): Set error callback
  • get_pipeline_statistics(): Get performance metrics
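
A minimal lifecycle sketch tying these methods together (assumes a StageConfig named config and a frame from cv2, as in the examples above; get_result() returning a value rather than raising on timeout is an assumption):

pipeline = InferencePipeline([config], pipeline_name="Example")
pipeline.initialize()                       # bring up dongles, load firmware/models
pipeline.start()                            # spawn processing threads
pipeline.put_data(frame, timeout=1.0)       # submit one frame
result = pipeline.get_result(timeout=0.1)   # pull a result (or use a callback)
stats = pipeline.get_pipeline_statistics()  # throughput/latency metrics
pipeline.stop()                             # graceful shutdown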

StageConfig

Configuration for individual pipeline stages.

PipelineData

Data structure flowing through pipeline stages.

Attributes:

  • data: Main data payload
  • metadata: Processing metadata
  • stage_results: Results from each stage
  • pipeline_id: Unique identifier
  • timestamp: Creation timestamp

Performance Considerations

  1. Queue Sizing: Balance memory usage vs. throughput with max_queue_size
  2. Dongle Distribution: Distribute dongles across stages for optimal performance
  3. Preprocessing: Minimize expensive operations in preprocessors
  4. Memory Management: Monitor queue sizes and processing times
  5. Threading: Pipeline uses multiple threads - ensure thread-safe operations (see the sketch following this list)
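
Because result callbacks will typically fire on pipeline worker threads, hand results to your main thread through a thread-safe structure. A minimal sketch using the standard library:

import queue

results = queue.Queue(maxsize=100)

def on_result(pipeline_data):
    # Called from a pipeline thread; Queue.put is thread-safe
    results.put(pipeline_data)

pipeline.set_result_callback(on_result)

# Consume on the main thread
pipeline_data = results.get(timeout=5.0)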

Troubleshooting

Common Issues

Pipeline hangs or stops processing:

  • Check dongle connections and firmware compatibility
  • Monitor queue sizes for bottlenecks (see the statistics sketch after this list)
  • Verify model file paths and formats
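
The statistics sketch referenced above, assuming get_pipeline_statistics() returns the same dictionary passed to the stats callback in Statistics and Monitoring:

stats = pipeline.get_pipeline_statistics()
print(f"submitted={stats['pipeline_input_submitted']}, "
      f"completed={stats['pipeline_completed']}")
for s in stats['stage_statistics']:
    print(f"{s['stage_id']}: processed={s['processed_count']}, "
          f"avg={s['avg_processing_time']:.3f}s")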

High memory usage:

  • Reduce max_queue_size parameters
  • Ensure proper cleanup in custom processors
  • Monitor statistics for processing times

Poor performance:

  • Distribute dongles optimally across stages
  • Profile preprocessing/postprocessing functions
  • Consider batch processing for high throughput

Debug Mode

Enable detailed logging for troubleshooting:

import logging
logging.basicConfig(level=logging.DEBUG)

# Pipeline will output detailed processing information