diff --git a/CLAUDE.md b/CLAUDE.md
deleted file mode 100644
index 113263e..0000000
--- a/CLAUDE.md
+++ /dev/null
@@ -1,191 +0,0 @@
-# CLAUDE.md
-
-This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
-
-## Project Overview
-
-**cluster4npu** is a high-performance multi-stage inference pipeline system for Kneron NPU dongles. The project enables flexible single-stage and cascaded multi-stage AI inference workflows optimized for real-time video processing and high-throughput scenarios.
-
-### Core Architecture
-
-- **InferencePipeline**: Main orchestrator managing multi-stage workflows with automatic queue management and thread coordination
-- **MultiDongle**: Hardware abstraction layer for Kneron NPU devices (KL520, KL720, etc.)
-- **StageConfig**: Configuration system for individual pipeline stages
-- **PipelineData**: Data structure that flows through pipeline stages, accumulating results
-- **PreProcessor/PostProcessor**: Flexible data transformation components for inter-stage processing
-
-### Key Design Patterns
-
-- **Producer-Consumer**: Each stage runs in separate threads with input/output queues
-- **Pipeline Architecture**: Linear data flow through configurable stages with result accumulation
-- **Hardware Abstraction**: MultiDongle encapsulates Kneron SDK complexity
-- **Callback-Based**: Asynchronous result handling via configurable callbacks
-
-## Development Commands
-
-### Environment Setup
-```bash
-# Set up virtual environment with uv
-uv venv
-source .venv/bin/activate  # Windows: .venv\Scripts\activate
-
-# Install dependencies
-uv pip install -r requirements.txt
-```
-
-### Running Examples
-```bash
-# Single-stage pipeline
-uv run python src/cluster4npu/test.py --example single
-
-# Two-stage cascade pipeline
-uv run python src/cluster4npu/test.py --example cascade
-
-# Complex multi-stage pipeline
-uv run python src/cluster4npu/test.py --example complex
-
-# Basic MultiDongle usage
-uv run python src/cluster4npu/Multidongle.py
-
-# Complete UI application with full workflow
-uv run python UI.py
-
-# UI integration examples
-uv run python ui_integration_example.py
-
-# Test UI configuration system
-uv run python ui_config.py
-```
-
-### UI Application Workflow
-UI.py provides a complete visual workflow:
-
-1. **Dashboard/Home** - Main entry point with recent files
-2. **Pipeline Editor** - Visual node-based pipeline design
-3. **Stage Configuration** - Dongle allocation and hardware setup
-4. **Performance Estimation** - FPS calculations and optimization
-5. **Save & Deploy** - Export configurations and cost estimation
-6. **Monitoring & Management** - Real-time pipeline monitoring
-
-```bash
-# Access different workflow stages directly:
-# 1. Create new pipeline → Pipeline Editor
-# 2. Configure Stages & Deploy → Stage Configuration
-# 3. Pipeline menu → Performance Analysis → Performance Panel
-# 4. Pipeline menu → Deploy Pipeline → Save & Deploy Dialog
-```
-
-### Testing
-```bash
-# Run pipeline tests
-uv run python test_pipeline.py
-
-# Test MultiDongle functionality
-uv run python src/cluster4npu/test.py
-```
-
-## Hardware Requirements
-
-- **Kneron NPU dongles**: KL520, KL720, etc.
-- **Firmware files**: `fw_scpu.bin`, `fw_ncpu.bin`
-- **Models**: `.nef` format files
-- **USB ports**: Multiple ports required for multi-dongle setups
-
-## Critical Implementation Notes
-
-### Pipeline Configuration
-- Each stage requires a unique `stage_id` and dedicated `port_ids`
-- Queue sizes (`max_queue_size`) must be balanced between memory usage and throughput
-- Stages process sequentially - output from stage N becomes input to stage N+1
-
-### Thread Safety
-- All pipeline operations are thread-safe
-- Each stage runs in isolated worker threads
-- Use callbacks for result handling, not direct queue access
-
-### Data Flow
-```
-Input → Stage1 → Stage2 → ... → StageN → Output
-          ↓         ↓              ↓         ↓
-        Queue    Process        Process   Result
-                + Results      + Results  Callback
-```
-
-### Hardware Management
-- Always call `initialize()` before `start()`
-- Always call `stop()` for clean shutdown
-- Firmware upload (`upload_fw=True`) is only needed once per session
-- Port IDs must match actual USB connections
-
-### Error Handling
-- Pipeline continues on individual stage errors
-- Failed stages return error results rather than blocking
-- Comprehensive statistics available via `get_pipeline_statistics()`
-
-## UI Application Architecture
-
-### Complete Workflow Components
-
-- **DashboardLogin**: Main entry point with project management
-- **PipelineEditor**: Node-based visual pipeline design using NodeGraphQt
-- **StageConfigurationDialog**: Hardware allocation and dongle assignment
-- **PerformanceEstimationPanel**: Real-time performance analysis and optimization
-- **SaveDeployDialog**: Export configurations and deployment cost estimation
-- **MonitoringDashboard**: Live pipeline monitoring and cluster management
-
-### UI Integration System
-
-- **ui_config.py**: Configuration management and UI/core integration
-- **ui_integration_example.py**: Demonstrates conversion from UI to core tools
-- **UIIntegration class**: Bridges UI configurations to InferencePipeline
-
-### Key UI Features
-
-- **Auto-dongle allocation**: Smart assignment of dongles to pipeline stages
-- **Performance estimation**: Real-time FPS and latency calculations
-- **Cost analysis**: Hardware and operational cost projections
-- **Export formats**: Python scripts, JSON configs, YAML, Docker containers
-- **Live monitoring**: Real-time metrics and cluster scaling controls
-
-## Code Patterns
-
-### Basic Pipeline Setup
-```python
-config = StageConfig(
-    stage_id="unique_name",
-    port_ids=[28, 32],
-    scpu_fw_path="fw_scpu.bin",
-    ncpu_fw_path="fw_ncpu.bin",
-    model_path="model.nef",
-    upload_fw=True
-)
-
-pipeline = InferencePipeline([config])
-pipeline.initialize()
-pipeline.start()
-pipeline.set_result_callback(callback_func)
-# ... processing ...
-pipeline.stop()
-```
-
-### Inter-Stage Processing
-```python
-# Custom preprocessing for stage input
-preprocessor = PreProcessor(resize_fn=custom_resize_func)
-
-# Custom postprocessing for stage output
-postprocessor = PostProcessor(process_fn=custom_process_func)
-
-config = StageConfig(
-    # ... basic config ...
-    input_preprocessor=preprocessor,
-    output_postprocessor=postprocessor
-)
-```
-
-## Performance Considerations
-
-- **Queue Sizing**: Smaller queues = lower latency, larger queues = higher throughput
-- **Dongle Distribution**: Spread dongles across stages for optimal parallelization
-- **Processing Functions**: Keep preprocessors/postprocessors lightweight
-- **Memory Management**: Monitor queue sizes to prevent memory buildup
\ No newline at end of file
diff --git a/Flowchart.jpg b/Flowchart.jpg
deleted file mode 100644
index 3c27e39..0000000
Binary files a/Flowchart.jpg and /dev/null differ
diff --git a/README.md b/README.md
deleted file mode 100644
index 7fc7d6e..0000000
--- a/README.md
+++ /dev/null
@@ -1,488 +0,0 @@
-# InferencePipeline
-
-A high-performance multi-stage inference pipeline system designed for Kneron NPU dongles, enabling flexible single-stage and cascaded multi-stage AI inference workflows.
-
-
-
-## Installation
-
-This project uses [uv](https://github.com/astral-sh/uv) for fast Python package management.
-
-```bash
-# Install uv if you haven't already
-curl -LsSf https://astral.sh/uv/install.sh | sh
-
-# Create and activate virtual environment
-uv venv
-source .venv/bin/activate  # On Windows: .venv\Scripts\activate
-
-# Install dependencies
-uv pip install -r requirements.txt
-```
-
-### Requirements
-
-```txt
-numpy>=2.2.6
-opencv-python>=4.11.0.86
-```
-
-### Hardware Requirements
-
-- Kneron AI dongles (KL520, KL720, etc.)
-- USB ports for device connections -- Compatible firmware files (`fw_scpu.bin`, `fw_ncpu.bin`) -- Trained model files (`.nef` format) - -## Quick Start - -### Single-Stage Pipeline - -Replace your existing MultiDongle usage with InferencePipeline for enhanced features: - -```python -from InferencePipeline import InferencePipeline, StageConfig - -# Configure single stage -stage_config = StageConfig( - stage_id="fire_detection", - port_ids=[28, 32], # USB port IDs for your dongles - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="fire_detection_520.nef", - upload_fw=True -) - -# Create and start pipeline -pipeline = InferencePipeline([stage_config], pipeline_name="FireDetection") -pipeline.initialize() -pipeline.start() - -# Set up result callback -def handle_result(pipeline_data): - result = pipeline_data.stage_results.get("fire_detection", {}) - print(f"🔥 Detection: {result.get('result', 'Unknown')} " - f"(Probability: {result.get('probability', 0.0):.3f})") - -pipeline.set_result_callback(handle_result) - -# Process frames -import cv2 -cap = cv2.VideoCapture(0) - -try: - while True: - ret, frame = cap.read() - if ret: - pipeline.put_data(frame) - if cv2.waitKey(1) & 0xFF == ord('q'): - break -finally: - cap.release() - pipeline.stop() -``` - -### Multi-Stage Cascade Pipeline - -Chain multiple models for complex workflows: - -```python -from InferencePipeline import InferencePipeline, StageConfig -from Multidongle import PreProcessor, PostProcessor - -# Custom preprocessing for second stage -def roi_extraction(frame, target_size): - """Extract region of interest from detection results""" - # Extract center region as example - h, w = frame.shape[:2] - center_crop = frame[h//4:3*h//4, w//4:3*w//4] - return cv2.resize(center_crop, target_size) - -# Custom result fusion -def combine_results(raw_output, **kwargs): - """Combine detection + classification results""" - classification_prob = float(raw_output[0]) if raw_output.size > 0 else 0.0 - 
detection_conf = kwargs.get('detection_conf', 0.5) - - # Weighted combination - combined_score = (classification_prob * 0.7) + (detection_conf * 0.3) - - return { - 'combined_probability': combined_score, - 'classification_prob': classification_prob, - 'detection_conf': detection_conf, - 'result': 'Fire Detected' if combined_score > 0.6 else 'No Fire', - 'confidence': 'High' if combined_score > 0.8 else 'Low' - } - -# Stage 1: Object Detection -detection_stage = StageConfig( - stage_id="object_detection", - port_ids=[28, 30], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="object_detection_520.nef", - upload_fw=True -) - -# Stage 2: Fire Classification with preprocessing -classification_stage = StageConfig( - stage_id="fire_classification", - port_ids=[32, 34], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="fire_classification_520.nef", - upload_fw=True, - input_preprocessor=PreProcessor(resize_fn=roi_extraction), - output_postprocessor=PostProcessor(process_fn=combine_results) -) - -# Create two-stage pipeline -pipeline = InferencePipeline( - [detection_stage, classification_stage], - pipeline_name="DetectionClassificationCascade" -) - -# Enhanced result handler -def handle_cascade_result(pipeline_data): - detection = pipeline_data.stage_results.get("object_detection", {}) - classification = pipeline_data.stage_results.get("fire_classification", {}) - - print(f"🎯 Detection: {detection.get('result', 'Unknown')} " - f"(Conf: {detection.get('probability', 0.0):.3f})") - print(f"🔥 Classification: {classification.get('result', 'Unknown')} " - f"(Combined: {classification.get('combined_probability', 0.0):.3f})") - print(f"⏱️ Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s") - print("-" * 50) - -pipeline.set_result_callback(handle_cascade_result) -pipeline.initialize() -pipeline.start() - -# Your processing loop here... 
-``` - -## Usage Examples - -### Example 1: Real-time Webcam Processing - -```python -from InferencePipeline import InferencePipeline, StageConfig -from Multidongle import WebcamSource - -def run_realtime_detection(): - # Configure pipeline - config = StageConfig( - stage_id="realtime_detection", - port_ids=[28, 32], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="your_model.nef", - upload_fw=True, - max_queue_size=30 # Prevent memory buildup - ) - - pipeline = InferencePipeline([config]) - pipeline.initialize() - pipeline.start() - - # Use webcam source - source = WebcamSource(camera_id=0) - source.start() - - def display_results(pipeline_data): - result = pipeline_data.stage_results["realtime_detection"] - probability = result.get('probability', 0.0) - detection = result.get('result', 'Unknown') - - # Your visualization logic here - print(f"Detection: {detection} ({probability:.3f})") - - pipeline.set_result_callback(display_results) - - try: - while True: - frame = source.get_frame() - if frame is not None: - pipeline.put_data(frame) - time.sleep(0.033) # ~30 FPS - except KeyboardInterrupt: - print("Stopping...") - finally: - source.stop() - pipeline.stop() - -if __name__ == "__main__": - run_realtime_detection() -``` - -### Example 2: Complex Multi-Modal Pipeline - -```python -def run_multimodal_pipeline(): - """Multi-modal fire detection with RGB, edge, and thermal-like analysis""" - - def edge_preprocessing(frame, target_size): - """Extract edge features""" - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - edges = cv2.Canny(gray, 50, 150) - edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) - return cv2.resize(edges_3ch, target_size) - - def thermal_preprocessing(frame, target_size): - """Simulate thermal processing""" - hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) - thermal_like = hsv[:, :, 2] # Value channel - thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR) - return cv2.resize(thermal_3ch, target_size) - - def 
fusion_postprocessing(raw_output, **kwargs): - """Fuse results from multiple modalities""" - if raw_output.size > 0: - current_prob = float(raw_output[0]) - rgb_conf = kwargs.get('rgb_conf', 0.5) - edge_conf = kwargs.get('edge_conf', 0.5) - - # Weighted fusion - fused_prob = (current_prob * 0.5) + (rgb_conf * 0.3) + (edge_conf * 0.2) - - return { - 'fused_probability': fused_prob, - 'modality_scores': { - 'thermal': current_prob, - 'rgb': rgb_conf, - 'edge': edge_conf - }, - 'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire', - 'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium' - } - return {'fused_probability': 0.0, 'result': 'No Fire'} - - # Define stages - stages = [ - StageConfig("rgb_analysis", [28, 30], "fw_scpu.bin", "fw_ncpu.bin", "rgb_model.nef", True), - StageConfig("edge_analysis", [32, 34], "fw_scpu.bin", "fw_ncpu.bin", "edge_model.nef", True, - input_preprocessor=PreProcessor(resize_fn=edge_preprocessing)), - StageConfig("thermal_analysis", [36, 38], "fw_scpu.bin", "fw_ncpu.bin", "thermal_model.nef", True, - input_preprocessor=PreProcessor(resize_fn=thermal_preprocessing)), - StageConfig("fusion", [40, 42], "fw_scpu.bin", "fw_ncpu.bin", "fusion_model.nef", True, - output_postprocessor=PostProcessor(process_fn=fusion_postprocessing)) - ] - - pipeline = InferencePipeline(stages, pipeline_name="MultiModalFireDetection") - - def handle_multimodal_result(pipeline_data): - print(f"\n🔥 Multi-Modal Fire Detection Results:") - for stage_id, result in pipeline_data.stage_results.items(): - if 'probability' in result: - print(f" {stage_id}: {result['result']} ({result['probability']:.3f})") - - if 'fusion' in pipeline_data.stage_results: - fusion = pipeline_data.stage_results['fusion'] - print(f" 🎯 FINAL: {fusion['result']} (Fused: {fusion['fused_probability']:.3f})") - print(f" Confidence: {fusion.get('confidence', 'Unknown')}") - - pipeline.set_result_callback(handle_multimodal_result) - - # Start pipeline - 
pipeline.initialize() - pipeline.start() - - # Your processing logic here... -``` - -### Example 3: Batch Processing - -```python -def process_image_batch(image_paths): - """Process a batch of images through pipeline""" - - config = StageConfig( - stage_id="batch_processing", - port_ids=[28, 32], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="batch_model.nef", - upload_fw=True - ) - - pipeline = InferencePipeline([config]) - pipeline.initialize() - pipeline.start() - - results = [] - - def collect_result(pipeline_data): - result = pipeline_data.stage_results["batch_processing"] - results.append({ - 'pipeline_id': pipeline_data.pipeline_id, - 'result': result, - 'processing_time': pipeline_data.metadata.get('total_processing_time', 0.0) - }) - - pipeline.set_result_callback(collect_result) - - # Submit all images - for img_path in image_paths: - image = cv2.imread(img_path) - if image is not None: - pipeline.put_data(image) - - # Wait for all results - import time - while len(results) < len(image_paths): - time.sleep(0.1) - - pipeline.stop() - return results -``` - -## Configuration - -### StageConfig Parameters - -```python -StageConfig( - stage_id="unique_stage_name", # Required: Unique identifier - port_ids=[28, 32], # Required: USB port IDs for dongles - scpu_fw_path="fw_scpu.bin", # Required: SCPU firmware path - ncpu_fw_path="fw_ncpu.bin", # Required: NCPU firmware path - model_path="model.nef", # Required: Model file path - upload_fw=True, # Upload firmware on init - max_queue_size=50, # Queue size limit - input_preprocessor=None, # Optional: Inter-stage preprocessing - output_postprocessor=None, # Optional: Inter-stage postprocessing - stage_preprocessor=None, # Optional: MultiDongle preprocessing - stage_postprocessor=None # Optional: MultiDongle postprocessing -) -``` - -### Performance Tuning - -```python -# For high-throughput scenarios -config = StageConfig( - stage_id="high_performance", - port_ids=[28, 30, 32, 34], # Use 
more dongles - max_queue_size=100, # Larger queues - # ... other params -) - -# For low-latency scenarios -config = StageConfig( - stage_id="low_latency", - port_ids=[28, 32], - max_queue_size=10, # Smaller queues - # ... other params -) -``` - -## Statistics and Monitoring - -```python -# Enable statistics reporting -def print_stats(stats): - print(f"\n📊 Pipeline Statistics:") - print(f" Input: {stats['pipeline_input_submitted']}") - print(f" Completed: {stats['pipeline_completed']}") - print(f" Success Rate: {stats['pipeline_completed']/max(stats['pipeline_input_submitted'], 1)*100:.1f}%") - - for stage_stat in stats['stage_statistics']: - print(f" Stage {stage_stat['stage_id']}: " - f"Processed={stage_stat['processed_count']}, " - f"AvgTime={stage_stat['avg_processing_time']:.3f}s") - -pipeline.set_stats_callback(print_stats) -pipeline.start_stats_reporting(interval=5.0) # Report every 5 seconds -``` - -## Running Examples - -The project includes comprehensive examples in `test.py`: - -```bash -# Single-stage pipeline -uv run python test.py --example single - -# Two-stage cascade pipeline -uv run python test.py --example cascade - -# Complex multi-stage pipeline -uv run python test.py --example complex -``` - -## API Reference - -### InferencePipeline - -Main pipeline orchestrator class. - -**Methods:** -- `initialize()`: Initialize all pipeline stages -- `start()`: Start pipeline processing threads -- `stop()`: Gracefully stop pipeline -- `put_data(data, timeout=1.0)`: Submit data for processing -- `get_result(timeout=0.1)`: Get processed results -- `set_result_callback(callback)`: Set success callback -- `set_error_callback(callback)`: Set error callback -- `get_pipeline_statistics()`: Get performance metrics - -### StageConfig - -Configuration for individual pipeline stages. - -### PipelineData - -Data structure flowing through pipeline stages. 
-
-**Attributes:**
-- `data`: Main data payload
-- `metadata`: Processing metadata
-- `stage_results`: Results from each stage
-- `pipeline_id`: Unique identifier
-- `timestamp`: Creation timestamp
-
-## Performance Considerations
-
-1. **Queue Sizing**: Balance memory usage vs. throughput with `max_queue_size`
-2. **Dongle Distribution**: Distribute dongles across stages for optimal performance
-3. **Preprocessing**: Minimize expensive operations in preprocessors
-4. **Memory Management**: Monitor queue sizes and processing times
-5. **Threading**: Pipeline uses multiple threads - ensure thread-safe operations
-
-## Troubleshooting
-
-### Common Issues
-
-**Pipeline hangs or stops processing:**
-- Check dongle connections and firmware compatibility
-- Monitor queue sizes for bottlenecks
-- Verify model file paths and formats
-
-**High memory usage:**
-- Reduce `max_queue_size` parameters
-- Ensure proper cleanup in custom processors
-- Monitor statistics for processing times
-
-**Poor performance:**
-- Distribute dongles optimally across stages
-- Profile preprocessing/postprocessing functions
-- Consider batch processing for high throughput
-
-### Debug Mode
-
-Enable detailed logging for troubleshooting:
-
-```python
-import logging
-logging.basicConfig(level=logging.DEBUG)
-
-# Pipeline will output detailed processing information
-```
\ No newline at end of file
diff --git a/multidongle.py b/multidongle.py
deleted file mode 100644
index 983cfaf..0000000
--- a/multidongle.py
+++ /dev/null
@@ -1,521 +0,0 @@
-from typing import Union, Tuple
-import os
-import sys
-import argparse
-import time
-import threading
-import queue
-import numpy as np
-import kp
-import cv2
-
-class MultiDongle:
-    # Currently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported
-    _FORMAT_MAPPING = {
-        'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
-        'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888,
-        'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV,
-        'RAW8': 
kp.ImageFormat.KP_IMAGE_FORMAT_RAW8, - # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0, - # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0, - # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB, - # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR, - # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1, - # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1, - # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB, - # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR, - } - - def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False): - """ - Initialize the MultiDongle class. - :param port_id: List of USB port IDs for the same layer's devices. - :param scpu_fw_path: Path to the SCPU firmware file. - :param ncpu_fw_path: Path to the NCPU firmware file. - :param model_path: Path to the model file. - :param upload_fw: Flag to indicate whether to upload firmware. - """ - self.port_id = port_id - self.upload_fw = upload_fw - - # Check if the firmware is needed - if self.upload_fw: - self.scpu_fw_path = scpu_fw_path - self.ncpu_fw_path = ncpu_fw_path - - self.model_path = model_path - self.device_group = None - - # generic_inference_input_descriptor will be prepared in initialize - self.model_nef_descriptor = None - self.generic_inference_input_descriptor = None - # Queues for data - # Input queue for images to be sent - self._input_queue = queue.Queue() - # Output queue for received results - self._output_queue = queue.Queue() - - # Threading attributes - self._send_thread = None - self._receive_thread = None - self._stop_event = threading.Event() # Event to signal threads to stop - - self._inference_counter = 0 - - def initialize(self): - """ - Connect devices, upload firmware (if upload_fw is True), and upload model. - Must be called before start(). 
- """ - # Connect device and assign to self.device_group - try: - print('[Connect Device]') - self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id) - print(' - Success') - except kp.ApiKPException as exception: - print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception))) - sys.exit(1) - - # setting timeout of the usb communication with the device - # print('[Set Device Timeout]') - # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000) - # print(' - Success') - - if self.upload_fw: - try: - print('[Upload Firmware]') - kp.core.load_firmware_from_file(device_group=self.device_group, - scpu_fw_path=self.scpu_fw_path, - ncpu_fw_path=self.ncpu_fw_path) - print(' - Success') - except kp.ApiKPException as exception: - print('Error: upload firmware failed, error = \'{}\''.format(str(exception))) - sys.exit(1) - - # upload model to device - try: - print('[Upload Model]') - self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group, - file_path=self.model_path) - print(' - Success') - except kp.ApiKPException as exception: - print('Error: upload model failed, error = \'{}\''.format(str(exception))) - sys.exit(1) - - # Extract model input dimensions automatically from model metadata - if self.model_nef_descriptor and self.model_nef_descriptor.models: - model = self.model_nef_descriptor.models[0] - if hasattr(model, 'input_nodes') and model.input_nodes: - input_node = model.input_nodes[0] - # From your JSON: "shape_npu": [1, 3, 128, 128] -> (width, height) - shape = input_node.tensor_shape_info.data.shape_npu - self.model_input_shape = (shape[3], shape[2]) # (width, height) - self.model_input_channels = shape[1] # 3 for RGB - print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}") - else: - self.model_input_shape = (128, 128) # fallback - self.model_input_channels = 3 - print("Using default input shape (128, 128)") - 
else: - self.model_input_shape = (128, 128) - self.model_input_channels = 3 - print("Model info not available, using default shape") - - # Prepare generic inference input descriptor after model is loaded - if self.model_nef_descriptor: - self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( - model_id=self.model_nef_descriptor.models[0].id, - ) - else: - print("Warning: Could not get generic inference input descriptor from model.") - self.generic_inference_input_descriptor = None - - def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray: - """ - Preprocess frame for inference - """ - resized_frame = cv2.resize(frame, self.model_input_shape) - - if target_format == 'BGR565': - return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565) - elif target_format == 'RGB8888': - return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA) - elif target_format == 'YUYV': - return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV) - else: - return resized_frame # RAW8 or other formats - - def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[float, str]: - """ - Get the latest inference result - Returns: (probability, result_string) or (None, None) if no result - """ - output_descriptor = self.get_output(timeout=timeout) - if not output_descriptor: - return None, None - - # Process the output descriptor - if hasattr(output_descriptor, 'header') and \ - hasattr(output_descriptor.header, 'num_output_node') and \ - hasattr(output_descriptor.header, 'inference_number'): - - inf_node_output_list = [] - retrieval_successful = True - - for node_idx in range(output_descriptor.header.num_output_node): - try: - inference_float_node_output = kp.inference.generic_inference_retrieve_float_node( - node_idx=node_idx, - generic_raw_result=output_descriptor, - channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW - ) - inf_node_output_list.append(inference_float_node_output.ndarray.copy()) - except kp.ApiKPException 
as e: - retrieval_successful = False - break - except Exception as e: - retrieval_successful = False - break - - if retrieval_successful and inf_node_output_list: - # Process output nodes - if output_descriptor.header.num_output_node == 1: - raw_output_array = inf_node_output_list[0].flatten() - else: - concatenated_outputs = [arr.flatten() for arr in inf_node_output_list] - raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([]) - - if raw_output_array.size > 0: - probability = postprocess(raw_output_array) - result_str = "Fire" if probability > 0.5 else "No Fire" - return probability, result_str - - return None, None - - - # Modified _send_thread_func to get data from input queue - def _send_thread_func(self): - """Internal function run by the send thread, gets images from input queue.""" - print("Send thread started.") - while not self._stop_event.is_set(): - if self.generic_inference_input_descriptor is None: - # Wait for descriptor to be ready or stop - self._stop_event.wait(0.1) # Avoid busy waiting - continue - - try: - # Get image and format from the input queue - # Blocks until an item is available or stop event is set/timeout occurs - try: - # Use get with timeout or check stop event in a loop - # This pattern allows thread to check stop event while waiting on queue - item = self._input_queue.get(block=True, timeout=0.1) - # Check if this is our sentinel value - if item is None: - continue - - # Now safely unpack the tuple - image_data, image_format_enum = item - except queue.Empty: - # If queue is empty after timeout, check stop event and continue loop - continue - - # Configure and send the image - self._inference_counter += 1 # Increment counter for each image - self.generic_inference_input_descriptor.inference_number = self._inference_counter - self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage( - image=image_data, - image_format=image_format_enum, # Use the format from 
the queue - resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, - padding_mode=kp.PaddingMode.KP_PADDING_CORNER, - normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON - )] - - kp.inference.generic_image_inference_send(device_group=self.device_group, - generic_inference_input_descriptor=self.generic_inference_input_descriptor) - # print("Image sent.") # Optional: add log - # No need for sleep here usually, as queue.get is blocking - except kp.ApiKPException as exception: - print(f' - Error in send thread: inference send failed, error = {exception}') - self._stop_event.set() # Signal other thread to stop - except Exception as e: - print(f' - Unexpected error in send thread: {e}') - self._stop_event.set() - - print("Send thread stopped.") - - # _receive_thread_func remains the same - def _receive_thread_func(self): - """Internal function run by the receive thread, puts results into output queue.""" - print("Receive thread started.") - while not self._stop_event.is_set(): - try: - generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group) - self._output_queue.put(generic_inference_output_descriptor) - except kp.ApiKPException as exception: - if not self._stop_event.is_set(): # Avoid printing error if we are already stopping - print(f' - Error in receive thread: inference receive failed, error = {exception}') - self._stop_event.set() - except Exception as e: - print(f' - Unexpected error in receive thread: {e}') - self._stop_event.set() - - print("Receive thread stopped.") - - # start method signature changed (no image/format parameters) - def start(self): - """ - Start the send and receive threads. - Must be called after initialize(). - """ - if self.device_group is None: - raise RuntimeError("MultiDongle not initialized. 
Call initialize() first.") - - if self._send_thread is None or not self._send_thread.is_alive(): - self._stop_event.clear() # Clear stop event for a new start - self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True) - self._send_thread.start() - print("Send thread started.") - - if self._receive_thread is None or not self._receive_thread.is_alive(): - self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True) - self._receive_thread.start() - print("Receive thread started.") - - def stop(self): - """Improved stop method with better cleanup""" - if self._stop_event.is_set(): - return # Already stopping - - print("Stopping threads...") - self._stop_event.set() - - # Clear queues to unblock threads - while not self._input_queue.empty(): - try: - self._input_queue.get_nowait() - except queue.Empty: - break - - # Signal send thread to wake up - self._input_queue.put(None) - - # Join threads with timeout - for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]: - if thread and thread.is_alive(): - thread.join(timeout=2.0) - if thread.is_alive(): - print(f"Warning: {name} thread didn't stop cleanly") - - def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None): - """ - Put an image into the input queue with flexible preprocessing - """ - if isinstance(image, str): - image_data = cv2.imread(image) - if image_data is None: - raise FileNotFoundError(f"Image file not found at {image}") - if target_size: - image_data = cv2.resize(image_data, target_size) - elif isinstance(image, np.ndarray): - # Don't modify original array, make copy if needed - image_data = image.copy() if target_size is None else cv2.resize(image, target_size) - else: - raise ValueError("Image must be a file path (str) or a numpy array (ndarray).") - - if format in 
self._FORMAT_MAPPING: - image_format_enum = self._FORMAT_MAPPING[format] - else: - raise ValueError(f"Unsupported format: {format}") - - self._input_queue.put((image_data, image_format_enum)) - - def get_output(self, timeout: float = None): - """ - Get the next received data from the output queue. - This method is non-blocking by default unless a timeout is specified. - :param timeout: Time in seconds to wait for data. If None, it's non-blocking. - :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout. - """ - try: - return self._output_queue.get(block=timeout is not None, timeout=timeout) - except queue.Empty: - return None - - def __del__(self): - """Ensure resources are released when the object is garbage collected.""" - self.stop() - if self.device_group: - try: - kp.core.disconnect_devices(device_group=self.device_group) - print("Device group disconnected in destructor.") - except Exception as e: - print(f"Error disconnecting device group in destructor: {e}") - -def postprocess(raw_model_output: list) -> float: - """ - Post-processes the raw model output. - Assumes the model output is a list/array where the first element is the desired probability. 
- """ - if raw_model_output is not None and len(raw_model_output) > 0: - probability = raw_model_output[0] - return float(probability) - return 0.0 # Default or error value - -class WebcamInferenceRunner: - def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'): - self.multidongle = multidongle - self.image_format = image_format - self.latest_probability = 0.0 - self.result_str = "No Fire" - - # Statistics tracking - self.processed_inference_count = 0 - self.inference_fps_start_time = None - self.display_fps_start_time = None - self.display_frame_counter = 0 - - def run(self, camera_id: int = 0): - cap = cv2.VideoCapture(camera_id) - if not cap.isOpened(): - raise RuntimeError("Cannot open webcam") - - try: - while True: - ret, frame = cap.read() - if not ret: - break - - # Track display FPS - if self.display_fps_start_time is None: - self.display_fps_start_time = time.time() - self.display_frame_counter += 1 - - # Preprocess and send frame - processed_frame = self.multidongle.preprocess_frame(frame, self.image_format) - self.multidongle.put_input(processed_frame, self.image_format) - - # Get inference result - prob, result = self.multidongle.get_latest_inference_result() - if prob is not None: - # Track inference FPS - if self.inference_fps_start_time is None: - self.inference_fps_start_time = time.time() - self.processed_inference_count += 1 - - self.latest_probability = prob - self.result_str = result - - # Display frame with results - self._display_results(frame) - - if cv2.waitKey(1) & 0xFF == ord('q'): - break - - finally: - # self._print_statistics() - cap.release() - cv2.destroyAllWindows() - - def _display_results(self, frame): - display_frame = frame.copy() - text_color = (0, 255, 0) if "Fire" in self.result_str else (0, 0, 255) - - # Display inference result - cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})", - (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2) - - # Calculate and display inference FPS - 
if self.inference_fps_start_time and self.processed_inference_count > 0: - elapsed_time = time.time() - self.inference_fps_start_time - if elapsed_time > 0: - inference_fps = self.processed_inference_count / elapsed_time - cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}", - (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) - - cv2.imshow('Fire Detection', display_frame) - - # def _print_statistics(self): - # """Print final statistics""" - # print(f"\n--- Summary ---") - # print(f"Total inferences processed: {self.processed_inference_count}") - - # if self.inference_fps_start_time and self.processed_inference_count > 0: - # elapsed = time.time() - self.inference_fps_start_time - # if elapsed > 0: - # avg_inference_fps = self.processed_inference_count / elapsed - # print(f"Average Inference FPS: {avg_inference_fps:.2f}") - - # if self.display_fps_start_time and self.display_frame_counter > 0: - # elapsed = time.time() - self.display_fps_start_time - # if elapsed > 0: - # avg_display_fps = self.display_frame_counter / elapsed - # print(f"Average Display FPS: {avg_display_fps:.2f}") - -if __name__ == "__main__": - PORT_IDS = [28, 32] - SCPU_FW = r'fw_scpu.bin' - NCPU_FW = r'fw_ncpu.bin' - MODEL_PATH = r'fire_detection_520.nef' - - try: - # Initialize inference engine - print("Initializing MultiDongle...") - multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True) - multidongle.initialize() - multidongle.start() - - # Run using the new runner class - print("Starting webcam inference...") - runner = WebcamInferenceRunner(multidongle, 'BGR565') - runner.run() - - except Exception as e: - print(f"Error: {e}") - import traceback - traceback.print_exc() - finally: - if 'multidongle' in locals(): - multidongle.stop() \ No newline at end of file diff --git a/src/cluster4npu/InferencePipeline.py b/src/cluster4npu/InferencePipeline.py deleted file mode 100644 index 4571420..0000000 --- a/src/cluster4npu/InferencePipeline.py +++ 
/dev/null @@ -1,563 +0,0 @@ -from typing import List, Dict, Any, Optional, Callable, Union -import threading -import queue -import time -import traceback -from dataclasses import dataclass -from concurrent.futures import ThreadPoolExecutor -import numpy as np - -from Multidongle import MultiDongle, PreProcessor, PostProcessor, DataProcessor - -@dataclass -class StageConfig: - """Configuration for a single pipeline stage""" - stage_id: str - port_ids: List[int] - scpu_fw_path: str - ncpu_fw_path: str - model_path: str - upload_fw: bool = False - max_queue_size: int = 50 - # Inter-stage processing - input_preprocessor: Optional[PreProcessor] = None # Before this stage - output_postprocessor: Optional[PostProcessor] = None # After this stage - # Stage-specific processing - stage_preprocessor: Optional[PreProcessor] = None # MultiDongle preprocessor - stage_postprocessor: Optional[PostProcessor] = None # MultiDongle postprocessor - -@dataclass -class PipelineData: - """Data structure flowing through pipeline""" - data: Any # Main data (image, features, etc.) 
- metadata: Dict[str, Any] # Additional info - stage_results: Dict[str, Any] # Results from each stage - pipeline_id: str # Unique identifier for this data flow - timestamp: float - -class PipelineStage: - """Single stage in the inference pipeline""" - - def __init__(self, config: StageConfig): - self.config = config - self.stage_id = config.stage_id - - # Initialize MultiDongle for this stage - self.multidongle = MultiDongle( - port_id=config.port_ids, - scpu_fw_path=config.scpu_fw_path, - ncpu_fw_path=config.ncpu_fw_path, - model_path=config.model_path, - upload_fw=config.upload_fw, - preprocessor=config.stage_preprocessor, - postprocessor=config.stage_postprocessor, - max_queue_size=config.max_queue_size - ) - - # Inter-stage processors - self.input_preprocessor = config.input_preprocessor - self.output_postprocessor = config.output_postprocessor - - # Threading for this stage - self.input_queue = queue.Queue(maxsize=config.max_queue_size) - self.output_queue = queue.Queue(maxsize=config.max_queue_size) - self.worker_thread = None - self.running = False - self._stop_event = threading.Event() - - # Statistics - self.processed_count = 0 - self.error_count = 0 - self.processing_times = [] - - def initialize(self): - """Initialize the stage""" - print(f"[Stage {self.stage_id}] Initializing...") - try: - self.multidongle.initialize() - self.multidongle.start() - print(f"[Stage {self.stage_id}] Initialized successfully") - except Exception as e: - print(f"[Stage {self.stage_id}] Initialization failed: {e}") - raise - - def start(self): - """Start the stage worker thread""" - if self.worker_thread and self.worker_thread.is_alive(): - return - - self.running = True - self._stop_event.clear() - self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True) - self.worker_thread.start() - print(f"[Stage {self.stage_id}] Worker thread started") - - def stop(self): - """Stop the stage gracefully""" - print(f"[Stage {self.stage_id}] Stopping...") - self.running 
= False - self._stop_event.set() - - # Put sentinel to unblock worker - try: - self.input_queue.put(None, timeout=1.0) - except queue.Full: - pass - - # Wait for worker thread - if self.worker_thread and self.worker_thread.is_alive(): - self.worker_thread.join(timeout=3.0) - if self.worker_thread.is_alive(): - print(f"[Stage {self.stage_id}] Warning: Worker thread didn't stop cleanly") - - # Stop MultiDongle - self.multidongle.stop() - print(f"[Stage {self.stage_id}] Stopped") - - def _worker_loop(self): - """Main worker loop for processing data""" - print(f"[Stage {self.stage_id}] Worker loop started") - - while self.running and not self._stop_event.is_set(): - try: - # Get input data - try: - pipeline_data = self.input_queue.get(timeout=0.1) - if pipeline_data is None: # Sentinel value - continue - except queue.Empty: - continue - - start_time = time.time() - - # Process data through this stage - processed_data = self._process_data(pipeline_data) - - # Record processing time - processing_time = time.time() - start_time - self.processing_times.append(processing_time) - if len(self.processing_times) > 1000: # Keep only recent times - self.processing_times = self.processing_times[-500:] - - self.processed_count += 1 - - # Put result to output queue - try: - self.output_queue.put(processed_data, block=False) - except queue.Full: - # Drop oldest and add new - try: - self.output_queue.get_nowait() - self.output_queue.put(processed_data, block=False) - except queue.Empty: - pass - - except Exception as e: - self.error_count += 1 - print(f"[Stage {self.stage_id}] Processing error: {e}") - traceback.print_exc() - - print(f"[Stage {self.stage_id}] Worker loop stopped") - - def _process_data(self, pipeline_data: PipelineData) -> PipelineData: - """Process data through this stage""" - try: - current_data = pipeline_data.data - - # Debug: Print data info - if isinstance(current_data, np.ndarray): - print(f"[Stage {self.stage_id}] Input data: shape={current_data.shape}, 
dtype={current_data.dtype}") - - # Step 1: Input preprocessing (inter-stage) - if self.input_preprocessor: - if isinstance(current_data, np.ndarray): - print(f"[Stage {self.stage_id}] Applying input preprocessor...") - current_data = self.input_preprocessor.process( - current_data, - self.multidongle.model_input_shape, - 'BGR565' # Default format - ) - print(f"[Stage {self.stage_id}] After input preprocess: shape={current_data.shape}, dtype={current_data.dtype}") - - # Step 2: Always preprocess image data for MultiDongle - processed_data = None - if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3: - # Always use MultiDongle's preprocess_frame to ensure correct format - print(f"[Stage {self.stage_id}] Preprocessing frame for MultiDongle...") - processed_data = self.multidongle.preprocess_frame(current_data, 'BGR565') - print(f"[Stage {self.stage_id}] After MultiDongle preprocess: shape={processed_data.shape}, dtype={processed_data.dtype}") - - # Validate processed data - if processed_data is None: - raise ValueError("MultiDongle preprocess_frame returned None") - if not isinstance(processed_data, np.ndarray): - raise ValueError(f"MultiDongle preprocess_frame returned {type(processed_data)}, expected np.ndarray") - - elif isinstance(current_data, dict) and 'raw_output' in current_data: - # This is result from previous stage, not suitable for direct inference - print(f"[Stage {self.stage_id}] Warning: Received processed result instead of image data") - processed_data = current_data - else: - print(f"[Stage {self.stage_id}] Warning: Unexpected data type: {type(current_data)}") - processed_data = current_data - - # Step 3: MultiDongle inference - if isinstance(processed_data, np.ndarray): - print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}") - self.multidongle.put_input(processed_data, 'BGR565') - - # Get inference result with timeout - inference_result = {} - timeout_start = 
time.time() - while time.time() - timeout_start < 5.0: # 5 second timeout - result = self.multidongle.get_latest_inference_result(timeout=0.1) - if result: - inference_result = result - break - time.sleep(0.01) - - if not inference_result: - print(f"[Stage {self.stage_id}] Warning: No inference result received") - inference_result = {'probability': 0.0, 'result': 'No Result'} - - # Step 4: Output postprocessing (inter-stage) - processed_result = inference_result - if self.output_postprocessor: - if 'raw_output' in inference_result: - processed_result = self.output_postprocessor.process( - inference_result['raw_output'] - ) - # Merge with original result - processed_result.update(inference_result) - - # Step 5: Update pipeline data - pipeline_data.stage_results[self.stage_id] = processed_result - pipeline_data.data = processed_result # Pass result as data to next stage - pipeline_data.metadata[f'{self.stage_id}_timestamp'] = time.time() - - return pipeline_data - - except Exception as e: - print(f"[Stage {self.stage_id}] Data processing error: {e}") - # Return data with error info - pipeline_data.stage_results[self.stage_id] = { - 'error': str(e), - 'probability': 0.0, - 'result': 'Processing Error' - } - return pipeline_data - - def put_data(self, data: PipelineData, timeout: float = 1.0) -> bool: - """Put data into this stage's input queue""" - try: - self.input_queue.put(data, timeout=timeout) - return True - except queue.Full: - return False - - def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]: - """Get result from this stage's output queue""" - try: - return self.output_queue.get(timeout=timeout) - except queue.Empty: - return None - - def get_statistics(self) -> Dict[str, Any]: - """Get stage statistics""" - avg_processing_time = ( - sum(self.processing_times) / len(self.processing_times) - if self.processing_times else 0.0 - ) - - multidongle_stats = self.multidongle.get_statistics() - - return { - 'stage_id': self.stage_id, - 
'processed_count': self.processed_count, - 'error_count': self.error_count, - 'avg_processing_time': avg_processing_time, - 'input_queue_size': self.input_queue.qsize(), - 'output_queue_size': self.output_queue.qsize(), - 'multidongle_stats': multidongle_stats - } - -class InferencePipeline: - """Multi-stage inference pipeline""" - - def __init__(self, stage_configs: List[StageConfig], - final_postprocessor: Optional[PostProcessor] = None, - pipeline_name: str = "InferencePipeline"): - """ - Initialize inference pipeline - :param stage_configs: List of stage configurations - :param final_postprocessor: Final postprocessor after all stages - :param pipeline_name: Name for this pipeline instance - """ - self.pipeline_name = pipeline_name - self.stage_configs = stage_configs - self.final_postprocessor = final_postprocessor - - # Create stages - self.stages: List[PipelineStage] = [] - for config in stage_configs: - stage = PipelineStage(config) - self.stages.append(stage) - - # Pipeline coordinator - self.coordinator_thread = None - self.running = False - self._stop_event = threading.Event() - - # Input/Output queues for the entire pipeline - self.pipeline_input_queue = queue.Queue(maxsize=100) - self.pipeline_output_queue = queue.Queue(maxsize=100) - - # Callbacks - self.result_callback = None - self.error_callback = None - self.stats_callback = None - - # Statistics - self.pipeline_counter = 0 - self.completed_counter = 0 - self.error_counter = 0 - - def initialize(self): - """Initialize all stages""" - print(f"[{self.pipeline_name}] Initializing pipeline with {len(self.stages)} stages...") - - for i, stage in enumerate(self.stages): - try: - stage.initialize() - print(f"[{self.pipeline_name}] Stage {i+1}/{len(self.stages)} initialized") - except Exception as e: - print(f"[{self.pipeline_name}] Failed to initialize stage {stage.stage_id}: {e}") - # Cleanup already initialized stages - for j in range(i): - self.stages[j].stop() - raise - - 
print(f"[{self.pipeline_name}] All stages initialized successfully") - - def start(self): - """Start the pipeline""" - print(f"[{self.pipeline_name}] Starting pipeline...") - - # Start all stages - for stage in self.stages: - stage.start() - - # Start coordinator - self.running = True - self._stop_event.clear() - self.coordinator_thread = threading.Thread(target=self._coordinator_loop, daemon=True) - self.coordinator_thread.start() - - print(f"[{self.pipeline_name}] Pipeline started successfully") - - def stop(self): - """Stop the pipeline gracefully""" - print(f"[{self.pipeline_name}] Stopping pipeline...") - - self.running = False - self._stop_event.set() - - # Stop coordinator - if self.coordinator_thread and self.coordinator_thread.is_alive(): - try: - self.pipeline_input_queue.put(None, timeout=1.0) - except queue.Full: - pass - self.coordinator_thread.join(timeout=3.0) - - # Stop all stages - for stage in self.stages: - stage.stop() - - print(f"[{self.pipeline_name}] Pipeline stopped") - - def _coordinator_loop(self): - """Coordinate data flow between stages""" - print(f"[{self.pipeline_name}] Coordinator started") - - while self.running and not self._stop_event.is_set(): - try: - # Get input data - try: - input_data = self.pipeline_input_queue.get(timeout=0.1) - if input_data is None: # Sentinel - continue - except queue.Empty: - continue - - # Create pipeline data - pipeline_data = PipelineData( - data=input_data, - metadata={'start_timestamp': time.time()}, - stage_results={}, - pipeline_id=f"pipeline_{self.pipeline_counter}", - timestamp=time.time() - ) - self.pipeline_counter += 1 - - # Process through each stage - current_data = pipeline_data - success = True - - for i, stage in enumerate(self.stages): - # Send data to stage - if not stage.put_data(current_data, timeout=1.0): - print(f"[{self.pipeline_name}] Stage {stage.stage_id} input queue full, dropping data") - success = False - break - - # Get result from stage - result_data = None - timeout_start 
= time.time() - while time.time() - timeout_start < 10.0: # 10 second timeout per stage - result_data = stage.get_result(timeout=0.1) - if result_data: - break - if self._stop_event.is_set(): - break - time.sleep(0.01) - - if not result_data: - print(f"[{self.pipeline_name}] Stage {stage.stage_id} timeout") - success = False - break - - current_data = result_data - - # Final postprocessing - if success and self.final_postprocessor: - try: - if isinstance(current_data.data, dict) and 'raw_output' in current_data.data: - final_result = self.final_postprocessor.process(current_data.data['raw_output']) - current_data.stage_results['final'] = final_result - current_data.data = final_result - except Exception as e: - print(f"[{self.pipeline_name}] Final postprocessing error: {e}") - - # Output result - if success: - current_data.metadata['end_timestamp'] = time.time() - current_data.metadata['total_processing_time'] = ( - current_data.metadata['end_timestamp'] - - current_data.metadata['start_timestamp'] - ) - - try: - self.pipeline_output_queue.put(current_data, block=False) - self.completed_counter += 1 - - # Call result callback - if self.result_callback: - self.result_callback(current_data) - - except queue.Full: - # Drop oldest and add new - try: - self.pipeline_output_queue.get_nowait() - self.pipeline_output_queue.put(current_data, block=False) - except queue.Empty: - pass - else: - self.error_counter += 1 - if self.error_callback: - self.error_callback(current_data) - - except Exception as e: - print(f"[{self.pipeline_name}] Coordinator error: {e}") - traceback.print_exc() - self.error_counter += 1 - - print(f"[{self.pipeline_name}] Coordinator stopped") - - def put_data(self, data: Any, timeout: float = 1.0) -> bool: - """Put data into pipeline""" - try: - self.pipeline_input_queue.put(data, timeout=timeout) - return True - except queue.Full: - return False - - def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]: - """Get result from pipeline""" 
- try: - return self.pipeline_output_queue.get(timeout=timeout) - except queue.Empty: - return None - - def set_result_callback(self, callback: Callable[[PipelineData], None]): - """Set callback for successful results""" - self.result_callback = callback - - def set_error_callback(self, callback: Callable[[PipelineData], None]): - """Set callback for errors""" - self.error_callback = callback - - def set_stats_callback(self, callback: Callable[[Dict[str, Any]], None]): - """Set callback for statistics""" - self.stats_callback = callback - - def get_pipeline_statistics(self) -> Dict[str, Any]: - """Get comprehensive pipeline statistics""" - stage_stats = [] - for stage in self.stages: - stage_stats.append(stage.get_statistics()) - - return { - 'pipeline_name': self.pipeline_name, - 'total_stages': len(self.stages), - 'pipeline_input_submitted': self.pipeline_counter, - 'pipeline_completed': self.completed_counter, - 'pipeline_errors': self.error_counter, - 'pipeline_input_queue_size': self.pipeline_input_queue.qsize(), - 'pipeline_output_queue_size': self.pipeline_output_queue.qsize(), - 'stage_statistics': stage_stats - } - - def start_stats_reporting(self, interval: float = 5.0): - """Start periodic statistics reporting""" - def stats_loop(): - while self.running: - if self.stats_callback: - stats = self.get_pipeline_statistics() - self.stats_callback(stats) - time.sleep(interval) - - stats_thread = threading.Thread(target=stats_loop, daemon=True) - stats_thread.start() - -# Utility functions for common inter-stage processing -def create_feature_extractor_preprocessor() -> PreProcessor: - """Create preprocessor for feature extraction stage""" - def extract_features(frame, target_size): - # Example: extract edges, keypoints, etc. 
- import cv2 - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - edges = cv2.Canny(gray, 50, 150) - return cv2.resize(edges, target_size) - - return PreProcessor(resize_fn=extract_features) - -def create_result_aggregator_postprocessor() -> PostProcessor: - """Create postprocessor for aggregating multiple stage results""" - def aggregate_results(raw_output, **kwargs): - # Example: combine results from multiple stages - if isinstance(raw_output, dict): - # If raw_output is already processed results - return raw_output - - # Standard processing - if raw_output.size > 0: - probability = float(raw_output[0]) - return { - 'aggregated_probability': probability, - 'confidence': 'High' if probability > 0.8 else 'Medium' if probability > 0.5 else 'Low', - 'result': 'Detected' if probability > 0.5 else 'Not Detected' - } - return {'aggregated_probability': 0.0, 'confidence': 'Low', 'result': 'Not Detected'} - - return PostProcessor(process_fn=aggregate_results) \ No newline at end of file diff --git a/src/cluster4npu/Multidongle.py b/src/cluster4npu/Multidongle.py deleted file mode 100644 index 0dfb2df..0000000 --- a/src/cluster4npu/Multidongle.py +++ /dev/null @@ -1,505 +0,0 @@ -from typing import Union, Tuple -import os -import sys -import argparse -import time -import threading -import queue -import numpy as np -import kp -import cv2 -from abc import ABC, abstractmethod -from typing import Callable, Optional, Any, Dict - - -class PreProcessor(DataProcessor): # type: ignore - def __init__(self, resize_fn: Optional[Callable] = None, - format_convert_fn: Optional[Callable] = None): - self.resize_fn = resize_fn or self._default_resize - self.format_convert_fn = format_convert_fn or self._default_format_convert - - def process(self, frame: np.ndarray, target_size: tuple, target_format: str) -> np.ndarray: - """Main processing pipeline""" - resized = self.resize_fn(frame, target_size) - return self.format_convert_fn(resized, target_format) - - def 
_default_resize(self, frame: np.ndarray, target_size: tuple) -> np.ndarray: - """Default resize implementation""" - return cv2.resize(frame, target_size) - - def _default_format_convert(self, frame: np.ndarray, target_format: str) -> np.ndarray: - """Default format conversion""" - if target_format == 'BGR565': - return cv2.cvtColor(frame, cv2.COLOR_BGR2BGR565) - elif target_format == 'RGB8888': - return cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA) - return frame - -class MultiDongle: - # Currently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported - _FORMAT_MAPPING = { - 'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, - 'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888, - 'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV, - 'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8, - # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0, - # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0, - # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB, - # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR, - # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1, - # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1, - # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB, - # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR, - } - - def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False): - """ - Initialize the MultiDongle class. - :param port_id: List of USB port IDs for the same layer's devices. - :param scpu_fw_path: Path to the SCPU firmware file. - :param ncpu_fw_path: Path to the NCPU firmware file. - :param model_path: Path to the model file. - :param upload_fw: Flag to indicate whether to upload firmware. 
- """ - self.port_id = port_id - self.upload_fw = upload_fw - - # Check if the firmware is needed - if self.upload_fw: - self.scpu_fw_path = scpu_fw_path - self.ncpu_fw_path = ncpu_fw_path - - self.model_path = model_path - self.device_group = None - - # generic_inference_input_descriptor will be prepared in initialize - self.model_nef_descriptor = None - self.generic_inference_input_descriptor = None - # Queues for data - # Input queue for images to be sent - self._input_queue = queue.Queue() - # Output queue for received results - self._output_queue = queue.Queue() - - # Threading attributes - self._send_thread = None - self._receive_thread = None - self._stop_event = threading.Event() # Event to signal threads to stop - - self._inference_counter = 0 - - def initialize(self): - """ - Connect devices, upload firmware (if upload_fw is True), and upload model. - Must be called before start(). - """ - # Connect device and assign to self.device_group - try: - print('[Connect Device]') - self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id) - print(' - Success') - except kp.ApiKPException as exception: - print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception))) - sys.exit(1) - - # setting timeout of the usb communication with the device - # print('[Set Device Timeout]') - # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000) - # print(' - Success') - - if self.upload_fw: - try: - print('[Upload Firmware]') - kp.core.load_firmware_from_file(device_group=self.device_group, - scpu_fw_path=self.scpu_fw_path, - ncpu_fw_path=self.ncpu_fw_path) - print(' - Success') - except kp.ApiKPException as exception: - print('Error: upload firmware failed, error = \'{}\''.format(str(exception))) - sys.exit(1) - - # upload model to device - try: - print('[Upload Model]') - self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group, - file_path=self.model_path) - print(' - 
Success') - except kp.ApiKPException as exception: - print('Error: upload model failed, error = \'{}\''.format(str(exception))) - sys.exit(1) - - # Extract model input dimensions automatically from model metadata - if self.model_nef_descriptor and self.model_nef_descriptor.models: - model = self.model_nef_descriptor.models[0] - if hasattr(model, 'input_nodes') and model.input_nodes: - input_node = model.input_nodes[0] - # From your JSON: "shape_npu": [1, 3, 128, 128] -> (width, height) - shape = input_node.tensor_shape_info.data.shape_npu - self.model_input_shape = (shape[3], shape[2]) # (width, height) - self.model_input_channels = shape[1] # 3 for RGB - print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}") - else: - self.model_input_shape = (128, 128) # fallback - self.model_input_channels = 3 - print("Using default input shape (128, 128)") - else: - self.model_input_shape = (128, 128) - self.model_input_channels = 3 - print("Model info not available, using default shape") - - # Prepare generic inference input descriptor after model is loaded - if self.model_nef_descriptor: - self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( - model_id=self.model_nef_descriptor.models[0].id, - ) - else: - print("Warning: Could not get generic inference input descriptor from model.") - self.generic_inference_input_descriptor = None - - def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray: - """ - Preprocess frame for inference - """ - resized_frame = cv2.resize(frame, self.model_input_shape) - - if target_format == 'BGR565': - return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565) - elif target_format == 'RGB8888': - return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA) - elif target_format == 'YUYV': - return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV) - else: - return resized_frame # RAW8 or other formats - - def get_latest_inference_result(self, timeout: 
float = 0.01) -> Tuple[Optional[float], Optional[str]]: - """ - Get the latest inference result - Returns: (probability, result_string) or (None, None) if no result - """ - output_descriptor = self.get_output(timeout=timeout) - if not output_descriptor: - return None, None - - # Process the output descriptor - if hasattr(output_descriptor, 'header') and \ - hasattr(output_descriptor.header, 'num_output_node') and \ - hasattr(output_descriptor.header, 'inference_number'): - - inf_node_output_list = [] - retrieval_successful = True - - for node_idx in range(output_descriptor.header.num_output_node): - try: - inference_float_node_output = kp.inference.generic_inference_retrieve_float_node( - node_idx=node_idx, - generic_raw_result=output_descriptor, - channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW - ) - inf_node_output_list.append(inference_float_node_output.ndarray.copy()) - except kp.ApiKPException as e: - retrieval_successful = False - break - except Exception as e: - retrieval_successful = False - break - - if retrieval_successful and inf_node_output_list: - # Process output nodes - if output_descriptor.header.num_output_node == 1: - raw_output_array = inf_node_output_list[0].flatten() - else: - concatenated_outputs = [arr.flatten() for arr in inf_node_output_list] - raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([]) - - if raw_output_array.size > 0: - probability = postprocess(raw_output_array) - result_str = "Fire" if probability > 0.5 else "No Fire" - return probability, result_str - - return None, None - - - # Modified _send_thread_func to get data from input queue - def _send_thread_func(self): - """Internal function run by the send thread, gets images from input queue.""" - print("Send thread started.") - while not self._stop_event.is_set(): - if self.generic_inference_input_descriptor is None: - # Wait for descriptor to be ready or stop - self._stop_event.wait(0.1) # Avoid busy waiting - continue - - try: - # Get 
image and format from the input queue - # Blocks until an item is available or stop event is set/timeout occurs - try: - # Use get with timeout or check stop event in a loop - # This pattern allows thread to check stop event while waiting on queue - item = self._input_queue.get(block=True, timeout=0.1) - # Check if this is our sentinel value - if item is None: - continue - - # Now safely unpack the tuple - image_data, image_format_enum = item - except queue.Empty: - # If queue is empty after timeout, check stop event and continue loop - continue - - # Configure and send the image - self._inference_counter += 1 # Increment counter for each image - self.generic_inference_input_descriptor.inference_number = self._inference_counter - self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage( - image=image_data, - image_format=image_format_enum, # Use the format from the queue - resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, - padding_mode=kp.PaddingMode.KP_PADDING_CORNER, - normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON - )] - - kp.inference.generic_image_inference_send(device_group=self.device_group, - generic_inference_input_descriptor=self.generic_inference_input_descriptor) - # print("Image sent.") # Optional: add log - # No need for sleep here usually, as queue.get is blocking - except kp.ApiKPException as exception: - print(f' - Error in send thread: inference send failed, error = {exception}') - self._stop_event.set() # Signal other thread to stop - except Exception as e: - print(f' - Unexpected error in send thread: {e}') - self._stop_event.set() - - print("Send thread stopped.") - - # _receive_thread_func remains the same - def _receive_thread_func(self): - """Internal function run by the receive thread, puts results into output queue.""" - print("Receive thread started.") - while not self._stop_event.is_set(): - try: - generic_inference_output_descriptor = 
kp.inference.generic_image_inference_receive(device_group=self.device_group) - self._output_queue.put(generic_inference_output_descriptor) - except kp.ApiKPException as exception: - if not self._stop_event.is_set(): # Avoid printing error if we are already stopping - print(f' - Error in receive thread: inference receive failed, error = {exception}') - self._stop_event.set() - except Exception as e: - print(f' - Unexpected error in receive thread: {e}') - self._stop_event.set() - - print("Receive thread stopped.") - - def start(self): - """ - Start the send and receive threads. - Must be called after initialize(). - """ - if self.device_group is None: - raise RuntimeError("MultiDongle not initialized. Call initialize() first.") - - if self._send_thread is None or not self._send_thread.is_alive(): - self._stop_event.clear() # Clear stop event for a new start - self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True) - self._send_thread.start() - print("Send thread started.") - - if self._receive_thread is None or not self._receive_thread.is_alive(): - self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True) - self._receive_thread.start() - print("Receive thread started.") - - def stop(self): - """Improved stop method with better cleanup""" - if self._stop_event.is_set(): - return # Already stopping - - print("Stopping threads...") - self._stop_event.set() - - # Clear queues to unblock threads - while not self._input_queue.empty(): - try: - self._input_queue.get_nowait() - except queue.Empty: - break - - # Signal send thread to wake up - self._input_queue.put(None) - - # Join threads with timeout - for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]: - if thread and thread.is_alive(): - thread.join(timeout=2.0) - if thread.is_alive(): - print(f"Warning: {name} thread didn't stop cleanly") - - def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, 
int] = None): - """ - Put an image into the input queue with flexible preprocessing - """ - if isinstance(image, str): - image_data = cv2.imread(image) - if image_data is None: - raise FileNotFoundError(f"Image file not found at {image}") - if target_size: - image_data = cv2.resize(image_data, target_size) - elif isinstance(image, np.ndarray): - # Don't modify original array, make copy if needed - image_data = image.copy() if target_size is None else cv2.resize(image, target_size) - else: - raise ValueError("Image must be a file path (str) or a numpy array (ndarray).") - - if format in self._FORMAT_MAPPING: - image_format_enum = self._FORMAT_MAPPING[format] - else: - raise ValueError(f"Unsupported format: {format}") - - self._input_queue.put((image_data, image_format_enum)) - - def get_output(self, timeout: float = None): - """ - Get the next received data from the output queue. - This method is non-blocking by default unless a timeout is specified. - :param timeout: Time in seconds to wait for data. If None, it's non-blocking. - :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout. - """ - try: - return self._output_queue.get(block=timeout is not None, timeout=timeout) - except queue.Empty: - return None - - def __del__(self): - """Ensure resources are released when the object is garbage collected.""" - self.stop() - if self.device_group: - try: - kp.core.disconnect_devices(device_group=self.device_group) - print("Device group disconnected in destructor.") - except Exception as e: - print(f"Error disconnecting device group in destructor: {e}") - -def postprocess(raw_model_output: list) -> float: - """ - Post-processes the raw model output. - Assumes the model output is a list/array where the first element is the desired probability. 
- """ - if raw_model_output and len(raw_model_output) > 0: - probability = raw_model_output[0] - return float(probability) - return 0.0 # Default or error value - -class WebcamInferenceRunner: - def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'): - self.multidongle = multidongle - self.image_format = image_format - self.latest_probability = 0.0 - self.result_str = "No Fire" - - # Statistics tracking - self.processed_inference_count = 0 - self.inference_fps_start_time = None - self.display_fps_start_time = None - self.display_frame_counter = 0 - - def run(self, camera_id: int = 0): - cap = cv2.VideoCapture(camera_id) - if not cap.isOpened(): - raise RuntimeError("Cannot open webcam") - - try: - while True: - ret, frame = cap.read() - if not ret: - break - - # Track display FPS - if self.display_fps_start_time is None: - self.display_fps_start_time = time.time() - self.display_frame_counter += 1 - - # Preprocess and send frame - processed_frame = self.multidongle.preprocess_frame(frame, self.image_format) - self.multidongle.put_input(processed_frame, self.image_format) - - # Get inference result - prob, result = self.multidongle.get_latest_inference_result() - if prob is not None: - # Track inference FPS - if self.inference_fps_start_time is None: - self.inference_fps_start_time = time.time() - self.processed_inference_count += 1 - - self.latest_probability = prob - self.result_str = result - - # Display frame with results - self._display_results(frame) - - if cv2.waitKey(1) & 0xFF == ord('q'): - break - - finally: - # self._print_statistics() - cap.release() - cv2.destroyAllWindows() - - def _display_results(self, frame): - display_frame = frame.copy() - text_color = (0, 255, 0) if "Fire" in self.result_str else (0, 0, 255) - - # Display inference result - cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})", - (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2) - - # Calculate and display inference FPS - 
if self.inference_fps_start_time and self.processed_inference_count > 0: - elapsed_time = time.time() - self.inference_fps_start_time - if elapsed_time > 0: - inference_fps = self.processed_inference_count / elapsed_time - cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}", - (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) - - cv2.imshow('Fire Detection', display_frame) - - # def _print_statistics(self): - # """Print final statistics""" - # print(f"\n--- Summary ---") - # print(f"Total inferences processed: {self.processed_inference_count}") - - # if self.inference_fps_start_time and self.processed_inference_count > 0: - # elapsed = time.time() - self.inference_fps_start_time - # if elapsed > 0: - # avg_inference_fps = self.processed_inference_count / elapsed - # print(f"Average Inference FPS: {avg_inference_fps:.2f}") - - # if self.display_fps_start_time and self.display_frame_counter > 0: - # elapsed = time.time() - self.display_fps_start_time - # if elapsed > 0: - # avg_display_fps = self.display_frame_counter / elapsed - # print(f"Average Display FPS: {avg_display_fps:.2f}") - -if __name__ == "__main__": - PORT_IDS = [28, 32] - SCPU_FW = r'fw_scpu.bin' - NCPU_FW = r'fw_ncpu.bin' - MODEL_PATH = r'fire_detection_520.nef' - - try: - # Initialize inference engine - print("Initializing MultiDongle...") - multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True) - multidongle.initialize() - multidongle.start() - - # Run using the new runner class - print("Starting webcam inference...") - runner = WebcamInferenceRunner(multidongle, 'BGR565') - runner.run() - - except Exception as e: - print(f"Error: {e}") - import traceback - traceback.print_exc() - finally: - if 'multidongle' in locals(): - multidongle.stop() \ No newline at end of file diff --git a/src/cluster4npu/test.py b/src/cluster4npu/test.py deleted file mode 100644 index bf5682e..0000000 --- a/src/cluster4npu/test.py +++ /dev/null @@ -1,407 +0,0 @@ -""" 
-InferencePipeline Usage Examples -================================ - -This file demonstrates how to use the InferencePipeline for various scenarios: -1. Single stage (equivalent to MultiDongle) -2. Two-stage cascade (detection -> classification) -3. Multi-stage complex pipeline -""" - -import cv2 -import numpy as np -import time -from InferencePipeline import ( - InferencePipeline, StageConfig, - create_feature_extractor_preprocessor, - create_result_aggregator_postprocessor -) -from Multidongle import PreProcessor, PostProcessor, WebcamSource, RTSPSource - -# ============================================================================= -# Example 1: Single Stage Pipeline (Basic Usage) -# ============================================================================= - -def example_single_stage(): - """Single stage pipeline - equivalent to using MultiDongle directly""" - print("=== Single Stage Pipeline Example ===") - - # Create stage configuration - stage_config = StageConfig( - stage_id="fire_detection", - port_ids=[28, 32], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="fire_detection_520.nef", - upload_fw=True, - max_queue_size=30 - # Note: No inter-stage processors needed for single stage - # MultiDongle will handle internal preprocessing/postprocessing - ) - - # Create pipeline with single stage - pipeline = InferencePipeline( - stage_configs=[stage_config], - pipeline_name="SingleStageFireDetection" - ) - - # Initialize and start - pipeline.initialize() - pipeline.start() - - # Process some data - data_source = WebcamSource(camera_id=0) - data_source.start() - - def handle_result(pipeline_data): - result = pipeline_data.stage_results.get("fire_detection", {}) - print(f"Fire Detection: {result.get('result', 'Unknown')} " - f"(Prob: {result.get('probability', 0.0):.3f})") - - def handle_error(pipeline_data): - print(f"❌ Error: {pipeline_data.stage_results}") - - pipeline.set_result_callback(handle_result) - 
pipeline.set_error_callback(handle_error) - - try: - print("🚀 Starting single stage pipeline...") - for i in range(100): # Process 100 frames - frame = data_source.get_frame() - if frame is not None: - success = pipeline.put_data(frame, timeout=1.0) - if not success: - print("Pipeline input queue full, dropping frame") - time.sleep(0.1) - except KeyboardInterrupt: - print("\nStopping...") - finally: - data_source.stop() - pipeline.stop() - print("Single stage pipeline test completed") - -# ============================================================================= -# Example 2: Two-Stage Cascade Pipeline -# ============================================================================= - -def example_two_stage_cascade(): - """Two-stage cascade: Object Detection -> Fire Classification""" - print("=== Two-Stage Cascade Pipeline Example ===") - - # Custom preprocessor for second stage - def roi_extraction_preprocess(frame, target_size): - """Extract ROI from detection results and prepare for classification""" - # This would normally extract bounding box from first stage results - # For demo, we'll just do center crop - h, w = frame.shape[:2] if len(frame.shape) == 3 else frame.shape - center_x, center_y = w // 2, h // 2 - crop_size = min(w, h) // 2 - - x1 = max(0, center_x - crop_size // 2) - y1 = max(0, center_y - crop_size // 2) - x2 = min(w, center_x + crop_size // 2) - y2 = min(h, center_y + crop_size // 2) - - if len(frame.shape) == 3: - cropped = frame[y1:y2, x1:x2] - else: - cropped = frame[y1:y2, x1:x2] - - return cv2.resize(cropped, target_size) - - # Custom postprocessor for combining results - def combine_detection_classification(raw_output, **kwargs): - """Combine detection and classification results""" - if raw_output.size > 0: - classification_prob = float(raw_output[0]) - - # Get detection result from metadata (would be passed from first stage) - detection_confidence = kwargs.get('detection_conf', 0.5) - - # Combined confidence - combined_prob = 
(classification_prob * 0.7) + (detection_confidence * 0.3) - - return { - 'combined_probability': combined_prob, - 'classification_prob': classification_prob, - 'detection_conf': detection_confidence, - 'result': 'Fire Detected' if combined_prob > 0.6 else 'No Fire', - 'confidence': 'High' if combined_prob > 0.8 else 'Medium' if combined_prob > 0.5 else 'Low' - } - return {'combined_probability': 0.0, 'result': 'No Fire', 'confidence': 'Low'} - - # Set up callbacks - def handle_cascade_result(pipeline_data): - """Handle results from cascade pipeline""" - detection_result = pipeline_data.stage_results.get("object_detection", {}) - classification_result = pipeline_data.stage_results.get("fire_classification", {}) - - print(f"Detection: {detection_result.get('result', 'Unknown')} " - f"(Prob: {detection_result.get('probability', 0.0):.3f})") - print(f"Classification: {classification_result.get('result', 'Unknown')} " - f"(Combined: {classification_result.get('combined_probability', 0.0):.3f})") - print(f"Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s") - print("-" * 50) - - def handle_pipeline_stats(stats): - """Handle pipeline statistics""" - print(f"\n📊 Pipeline Stats:") - print(f" Submitted: {stats['pipeline_input_submitted']}") - print(f" Completed: {stats['pipeline_completed']}") - print(f" Errors: {stats['pipeline_errors']}") - - for stage_stat in stats['stage_statistics']: - print(f" Stage {stage_stat['stage_id']}: " - f"Processed={stage_stat['processed_count']}, " - f"AvgTime={stage_stat['avg_processing_time']:.3f}s") - - # Stage 1: Object Detection - stage1_config = StageConfig( - stage_id="object_detection", - port_ids=[28, 30], # First set of dongles - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="object_detection_520.nef", - upload_fw=True, - max_queue_size=30 - ) - - # Stage 2: Fire Classification - stage2_config = StageConfig( - stage_id="fire_classification", - port_ids=[32, 34], # Second set 
of dongles - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="fire_classification_520.nef", - upload_fw=True, - max_queue_size=30, - # Inter-stage processing - input_preprocessor=PreProcessor(resize_fn=roi_extraction_preprocess), - output_postprocessor=PostProcessor(process_fn=combine_detection_classification) - ) - - # Create two-stage pipeline - pipeline = InferencePipeline( - stage_configs=[stage1_config, stage2_config], - pipeline_name="TwoStageCascade" - ) - - pipeline.set_result_callback(handle_cascade_result) - pipeline.set_stats_callback(handle_pipeline_stats) - - # Initialize and start - pipeline.initialize() - pipeline.start() - pipeline.start_stats_reporting(interval=10.0) # Stats every 10 seconds - - # Process data - # data_source = RTSPSource("rtsp://your-camera-url") - data_source = WebcamSource(0) - data_source.start() - - try: - frame_count = 0 - while frame_count < 200: - frame = data_source.get_frame() - if frame is not None: - if pipeline.put_data(frame, timeout=1.0): - frame_count += 1 - else: - print("Pipeline input queue full, dropping frame") - time.sleep(0.05) - except KeyboardInterrupt: - print("\nStopping cascade pipeline...") - finally: - data_source.stop() - pipeline.stop() - -# ============================================================================= -# Example 3: Complex Multi-Stage Pipeline -# ============================================================================= - -def example_complex_pipeline(): - """Complex multi-stage pipeline with feature extraction and fusion""" - print("=== Complex Multi-Stage Pipeline Example ===") - - # Custom processors for different stages - def edge_detection_preprocess(frame, target_size): - """Extract edge features""" - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - edges = cv2.Canny(gray, 50, 150) - edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) - return cv2.resize(edges_3ch, target_size) - - def thermal_simulation_preprocess(frame, target_size): - """Simulate 
thermal-like processing""" - # Convert to HSV and extract V channel as pseudo-thermal - hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) - thermal_like = hsv[:, :, 2] # Value channel - thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR) - return cv2.resize(thermal_3ch, target_size) - - def fusion_postprocess(raw_output, **kwargs): - """Fuse results from multiple modalities""" - if raw_output.size > 0: - current_prob = float(raw_output[0]) - - # This would get previous stage results from pipeline metadata - # For demo, we'll simulate - rgb_confidence = kwargs.get('rgb_conf', 0.5) - edge_confidence = kwargs.get('edge_conf', 0.5) - - # Weighted fusion - fused_prob = (current_prob * 0.5) + (rgb_confidence * 0.3) + (edge_confidence * 0.2) - - return { - 'fused_probability': fused_prob, - 'individual_probs': { - 'thermal': current_prob, - 'rgb': rgb_confidence, - 'edge': edge_confidence - }, - 'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire', - 'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium' if fused_prob > 0.5 else 'Low' - } - return {'fused_probability': 0.0, 'result': 'No Fire', 'confidence': 'Low'} - - # Stage 1: RGB Analysis - rgb_stage = StageConfig( - stage_id="rgb_analysis", - port_ids=[28, 30], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="rgb_fire_detection_520.nef", - upload_fw=True - ) - - # Stage 2: Edge Feature Analysis - edge_stage = StageConfig( - stage_id="edge_analysis", - port_ids=[32, 34], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="edge_fire_detection_520.nef", - upload_fw=True, - input_preprocessor=PreProcessor(resize_fn=edge_detection_preprocess) - ) - - # Stage 3: Thermal-like Analysis - thermal_stage = StageConfig( - stage_id="thermal_analysis", - port_ids=[36, 38], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="thermal_fire_detection_520.nef", - upload_fw=True, - 
input_preprocessor=PreProcessor(resize_fn=thermal_simulation_preprocess) - ) - - # Stage 4: Fusion - fusion_stage = StageConfig( - stage_id="result_fusion", - port_ids=[40, 42], - scpu_fw_path="fw_scpu.bin", - ncpu_fw_path="fw_ncpu.bin", - model_path="fusion_520.nef", - upload_fw=True, - output_postprocessor=PostProcessor(process_fn=fusion_postprocess) - ) - - # Create complex pipeline - pipeline = InferencePipeline( - stage_configs=[rgb_stage, edge_stage, thermal_stage, fusion_stage], - pipeline_name="ComplexMultiModalPipeline" - ) - - # Advanced result handling - def handle_complex_result(pipeline_data): - """Handle complex pipeline results""" - print(f"\n🔥 Multi-Modal Fire Detection Results:") - print(f" Pipeline ID: {pipeline_data.pipeline_id}") - - for stage_id, result in pipeline_data.stage_results.items(): - if 'probability' in result: - print(f" {stage_id}: {result.get('result', 'Unknown')} " - f"(Prob: {result.get('probability', 0.0):.3f})") - - # Final fused result - if 'result_fusion' in pipeline_data.stage_results: - fusion_result = pipeline_data.stage_results['result_fusion'] - print(f" 🎯 FINAL: {fusion_result.get('result', 'Unknown')} " - f"(Fused: {fusion_result.get('fused_probability', 0.0):.3f})") - print(f" Confidence: {fusion_result.get('confidence', 'Unknown')}") - - print(f" Total Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s") - print("=" * 60) - - def handle_error(pipeline_data): - """Handle pipeline errors""" - print(f"❌ Pipeline Error for {pipeline_data.pipeline_id}") - for stage_id, result in pipeline_data.stage_results.items(): - if 'error' in result: - print(f" Stage {stage_id} error: {result['error']}") - - pipeline.set_result_callback(handle_complex_result) - pipeline.set_error_callback(handle_error) - - # Initialize and start - try: - pipeline.initialize() - pipeline.start() - - # Simulate data input - data_source = WebcamSource(camera_id=0) - data_source.start() - - print("🚀 Complex pipeline 
started. Processing frames...") - - frame_count = 0 - start_time = time.time() - - while frame_count < 50: # Process 50 frames for demo - frame = data_source.get_frame() - if frame is not None: - if pipeline.put_data(frame): - frame_count += 1 - if frame_count % 10 == 0: - elapsed = time.time() - start_time - fps = frame_count / elapsed - print(f"📈 Processed {frame_count} frames, Pipeline FPS: {fps:.2f}") - time.sleep(0.1) - - except Exception as e: - print(f"Error in complex pipeline: {e}") - finally: - data_source.stop() - pipeline.stop() - - # Final statistics - final_stats = pipeline.get_pipeline_statistics() - print(f"\n📊 Final Pipeline Statistics:") - print(f" Total Input: {final_stats['pipeline_input_submitted']}") - print(f" Completed: {final_stats['pipeline_completed']}") - print(f" Success Rate: {final_stats['pipeline_completed']/max(final_stats['pipeline_input_submitted'], 1)*100:.1f}%") - -# ============================================================================= -# Main Function - Run Examples -# ============================================================================= - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description="InferencePipeline Examples") - parser.add_argument("--example", choices=["single", "cascade", "complex"], - default="single", help="Which example to run") - args = parser.parse_args() - - if args.example == "single": - example_single_stage() - elif args.example == "cascade": - example_two_stage_cascade() - elif args.example == "complex": - example_complex_pipeline() - else: - print("Available examples:") - print(" python pipeline_example.py --example single") - print(" python pipeline_example.py --example cascade") - print(" python pipeline_example.py --example complex") \ No newline at end of file
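The deleted `MultiDongle` code above relies on one recurring pattern: worker threads drain an input `queue.Queue` with a short `get` timeout so they can re-check a shared `threading.Event`, and `stop()` pushes a `None` sentinel to wake any blocked `get`. A minimal, self-contained sketch of that pattern (the Kneron SDK send/receive calls are replaced here by a stand-in "inference" that just doubles its input, so nothing below is the real `kp` API):

```python
import queue
import threading


class QueuedWorker:
    """Sketch of the stop-event + sentinel queue pattern used by MultiDongle."""

    def __init__(self, max_queue_size: int = 30):
        self._input_queue = queue.Queue(maxsize=max_queue_size)
        self._output_queue = queue.Queue()
        self._stop_event = threading.Event()
        self._worker = None

    def _worker_func(self):
        while not self._stop_event.is_set():
            try:
                # The timeout lets the thread re-check the stop event
                # periodically instead of blocking forever on the queue.
                item = self._input_queue.get(block=True, timeout=0.1)
            except queue.Empty:
                continue
            if item is None:  # sentinel pushed by stop() to unblock get()
                continue
            # Stand-in for the real inference send/receive round trip.
            self._output_queue.put(item * 2)

    def start(self):
        self._stop_event.clear()
        self._worker = threading.Thread(target=self._worker_func, daemon=True)
        self._worker.start()

    def put_input(self, item):
        self._input_queue.put(item)

    def get_output(self, timeout=None):
        # Non-blocking unless a timeout is given, mirroring get_output() above.
        try:
            return self._output_queue.get(block=timeout is not None, timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        if self._stop_event.is_set():
            return
        self._stop_event.set()
        self._input_queue.put(None)  # wake a worker blocked on get()
        if self._worker and self._worker.is_alive():
            self._worker.join(timeout=2.0)
```

Usage follows the same shape as the deleted class: `start()`, feed items with `put_input()`, poll `get_output(timeout=...)`, then `stop()`. The sentinel plus timeout combination is what makes `stop()` deterministic: without it, a worker blocked on an empty queue would never observe the event.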