# cluster4npu/cluster4npu_ui/core/functions/InferencePipeline.py
from typing import List, Dict, Any, Optional, Callable, Union
import threading
import queue
import time
import traceback
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor

import numpy as np

from Multidongle import MultiDongle, PreProcessor, PostProcessor, DataProcessor

@dataclass
class StageConfig:
    """Configuration for a single pipeline stage."""
    stage_id: str
    port_ids: List[int]
    scpu_fw_path: str
    ncpu_fw_path: str
    model_path: str
    upload_fw: bool
    auto_detect: bool = False
    max_queue_size: int = 50

    # Inter-stage processing
    input_preprocessor: Optional[PreProcessor] = None     # Runs before this stage
    output_postprocessor: Optional[PostProcessor] = None  # Runs after this stage

    # Stage-specific processing
    stage_preprocessor: Optional[PreProcessor] = None     # MultiDongle preprocessor
    stage_postprocessor: Optional[PostProcessor] = None   # MultiDongle postprocessor
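
# Illustrative sketch: a minimal single-stage configuration. The firmware and
# model paths below are placeholders, not files shipped with this repo.
#
#   detector_cfg = StageConfig(
#       stage_id="detector",
#       port_ids=[0, 1],
#       scpu_fw_path="fw_scpu.bin",
#       ncpu_fw_path="fw_ncpu.bin",
#       model_path="model.nef",
#       upload_fw=True,
#   )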


@dataclass
class PipelineData:
    """Data structure flowing through the pipeline."""
    data: Any                      # Main payload (image, features, etc.)
    metadata: Dict[str, Any]       # Additional info
    stage_results: Dict[str, Any]  # Results keyed by stage_id
    pipeline_id: str               # Unique identifier for this data flow
    timestamp: float
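
# Illustrative note: as an item flows through the stages, each stage writes its
# output under stage_results[stage_id]. After a hypothetical two-stage run:
#
#   item.stage_results == {
#       "detector": (0.93, "Detected"),                        # tuple (prob, result_str)
#       "classifier": {"probability": 0.88, "result": "Cat"},  # or a dict result
#   }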


class PipelineStage:
    """Single stage in the inference pipeline."""

    def __init__(self, config: StageConfig):
        self.config = config
        self.stage_id = config.stage_id

        # Initialize MultiDongle for this stage
        self.multidongle = MultiDongle(
            port_id=config.port_ids,
            scpu_fw_path=config.scpu_fw_path,
            ncpu_fw_path=config.ncpu_fw_path,
            model_path=config.model_path,
            upload_fw=config.upload_fw,
            auto_detect=config.auto_detect,
            max_queue_size=config.max_queue_size
        )

        # Store stage-level processors for later use
        self.stage_preprocessor = config.stage_preprocessor
        self.stage_postprocessor = config.stage_postprocessor
        self.max_queue_size = config.max_queue_size

        # Inter-stage processors
        self.input_preprocessor = config.input_preprocessor
        self.output_postprocessor = config.output_postprocessor

        # Threading for this stage
        self.input_queue = queue.Queue(maxsize=config.max_queue_size)
        self.output_queue = queue.Queue(maxsize=config.max_queue_size)
        self.worker_thread = None
        self.running = False
        self._stop_event = threading.Event()

        # Statistics
        self.processed_count = 0
        self.error_count = 0
        self.processing_times = []

    def initialize(self):
        """Initialize the stage's hardware."""
        print(f"[Stage {self.stage_id}] Initializing...")
        try:
            self.multidongle.initialize()
            self.multidongle.start()
            print(f"[Stage {self.stage_id}] Initialized successfully")
        except Exception as e:
            print(f"[Stage {self.stage_id}] Initialization failed: {e}")
            raise

    def start(self):
        """Start the stage worker thread."""
        if self.worker_thread and self.worker_thread.is_alive():
            return
        self.running = True
        self._stop_event.clear()
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print(f"[Stage {self.stage_id}] Worker thread started")

    def stop(self):
        """Stop the stage gracefully."""
        print(f"[Stage {self.stage_id}] Stopping...")
        self.running = False
        self._stop_event.set()

        # Put sentinel to unblock worker
        try:
            self.input_queue.put(None, timeout=1.0)
        except queue.Full:
            pass

        # Wait for worker thread
        if self.worker_thread and self.worker_thread.is_alive():
            self.worker_thread.join(timeout=3.0)
            if self.worker_thread.is_alive():
                print(f"[Stage {self.stage_id}] Warning: Worker thread didn't stop cleanly")

        # Stop MultiDongle
        self.multidongle.stop()
        print(f"[Stage {self.stage_id}] Stopped")

    def _worker_loop(self):
        """Main worker loop for processing data."""
        print(f"[Stage {self.stage_id}] Worker loop started")
        while self.running and not self._stop_event.is_set():
            try:
                # Get input data
                try:
                    pipeline_data = self.input_queue.get(timeout=1.0)
                    if pipeline_data is None:  # Sentinel value
                        continue
                except queue.Empty:
                    if self._stop_event.is_set():
                        break
                    continue

                start_time = time.time()

                # Process data through this stage
                processed_data = self._process_data(pipeline_data)

                # Only count and record timing for actual inference results
                if processed_data and self._has_inference_result(processed_data):
                    processing_time = time.time() - start_time
                    self.processing_times.append(processing_time)
                    if len(self.processing_times) > 1000:  # Keep only recent times
                        self.processing_times = self.processing_times[-500:]
                    self.processed_count += 1

                # Put result to output queue (placeholders included, so the
                # coordinator is never left waiting)
                try:
                    self.output_queue.put(processed_data, block=False)
                except queue.Full:
                    # Drop oldest and add new
                    try:
                        self.output_queue.get_nowait()
                        self.output_queue.put(processed_data, block=False)
                    except queue.Empty:
                        pass
            except Exception as e:
                self.error_count += 1
                print(f"[Stage {self.stage_id}] Processing error: {e}")
                traceback.print_exc()

        print(f"[Stage {self.stage_id}] Worker loop stopped")

    def _has_inference_result(self, processed_data) -> bool:
        """Check whether processed_data contains a valid inference result
        (mirrors the standalone code's result handling)."""
        if not processed_data:
            return False
        try:
            # Check if it's a PipelineData with stage results
            if hasattr(processed_data, 'stage_results') and processed_data.stage_results:
                stage_result = processed_data.stage_results.get(self.stage_id)
                if stage_result:
                    # Tuple result (prob, result_str), as in the standalone code
                    if isinstance(stage_result, tuple) and len(stage_result) == 2:
                        prob, result_str = stage_result
                        return prob is not None and result_str is not None
                    # Dict result with actual inference data (not status messages)
                    elif isinstance(stage_result, dict):
                        # Don't count "processing"/"async" status as real results
                        if stage_result.get("status") in ["processing", "async"]:
                            return False
                        # Don't count empty or placeholder results
                        if not stage_result or stage_result.get("result") == "Processing":
                            return False
                        return True
                    else:
                        return stage_result is not None
        except Exception:
            pass
        return False

    def _process_data(self, pipeline_data: PipelineData) -> PipelineData:
        """Process data through this stage."""
        try:
            current_data = pipeline_data.data

            # Step 1: Input preprocessing (inter-stage)
            if self.input_preprocessor:
                if isinstance(current_data, np.ndarray):
                    current_data = self.input_preprocessor.process(
                        current_data,
                        self.multidongle.model_input_shape,
                        'BGR565'  # Default format
                    )

            # Step 2: Always preprocess image data for MultiDongle
            processed_data = None
            if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3:
                # Always use MultiDongle's preprocess_frame to ensure correct format
                processed_data = self.multidongle.preprocess_frame(current_data, 'BGR565')

                # Validate processed data
                if processed_data is None:
                    raise ValueError("MultiDongle preprocess_frame returned None")
                if not isinstance(processed_data, np.ndarray):
                    raise ValueError(
                        f"MultiDongle preprocess_frame returned {type(processed_data)}, "
                        f"expected np.ndarray"
                    )
            elif isinstance(current_data, dict) and 'raw_output' in current_data:
                # Result from a previous stage; not suitable for direct inference
                processed_data = current_data
            else:
                processed_data = current_data

            # Step 3: MultiDongle inference
            if isinstance(processed_data, np.ndarray):
                self.multidongle.put_input(processed_data, 'BGR565')

            # Get inference result (non-blocking, async pattern like standalone code)
            result = self.multidongle.get_latest_inference_result()

            # Process result if available; only actual inference results count for FPS
            inference_result = None
            if result is not None:
                if isinstance(result, tuple) and len(result) == 2:
                    # Tuple results like (probability, result_string)
                    prob, result_str = result
                    if prob is not None and result_str is not None:
                        # Logging is handled by the GUI callback to avoid duplicates
                        inference_result = result
                elif isinstance(result, dict) and result:  # Non-empty dict
                    # Logging is handled by the GUI callback to avoid duplicates
                    inference_result = result
                else:
                    inference_result = result

            # If no result yet, use an async placeholder (don't spam logs)
            if not inference_result:
                inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}

            # Step 4: Update pipeline data
            pipeline_data.stage_results[self.stage_id] = inference_result
            pipeline_data.data = inference_result  # Pass result as data to next stage
            pipeline_data.metadata[f'{self.stage_id}_timestamp'] = time.time()
            return pipeline_data

        except Exception as e:
            print(f"[Stage {self.stage_id}] Data processing error: {e}")
            # Return data with error info
            pipeline_data.stage_results[self.stage_id] = {
                'error': str(e),
                'probability': 0.0,
                'result': 'Processing Error'
            }
            return pipeline_data

    def put_data(self, data: PipelineData, timeout: float = 1.0) -> bool:
        """Put data into this stage's input queue."""
        try:
            self.input_queue.put(data, timeout=timeout)
            return True
        except queue.Full:
            return False

    def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]:
        """Get a result from this stage's output queue."""
        try:
            return self.output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_statistics(self) -> Dict[str, Any]:
        """Get stage statistics."""
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0.0
        )
        multidongle_stats = self.multidongle.get_statistics()
        return {
            'stage_id': self.stage_id,
            'processed_count': self.processed_count,
            'error_count': self.error_count,
            'avg_processing_time': avg_processing_time,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
            'multidongle_stats': multidongle_stats
        }
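
# Sketch (hedged): driving a single PipelineStage by hand, assuming a BGR frame
# from OpenCV and the hypothetical detector_cfg shown above.
#
#   stage = PipelineStage(detector_cfg)
#   stage.initialize()
#   stage.start()
#   item = PipelineData(data=frame, metadata={}, stage_results={},
#                       pipeline_id="demo_0", timestamp=time.time())
#   if stage.put_data(item):
#       result = stage.get_result(timeout=1.0)  # None until inference completes
#   stage.stop()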


class InferencePipeline:
    """Multi-stage inference pipeline."""

    def __init__(self, stage_configs: List[StageConfig],
                 final_postprocessor: Optional[PostProcessor] = None,
                 pipeline_name: str = "InferencePipeline"):
        """
        Initialize the inference pipeline.

        :param stage_configs: List of stage configurations
        :param final_postprocessor: Final postprocessor applied after all stages
        :param pipeline_name: Name for this pipeline instance
        """
        self.pipeline_name = pipeline_name
        self.stage_configs = stage_configs
        self.final_postprocessor = final_postprocessor

        # Create stages
        self.stages: List[PipelineStage] = [PipelineStage(config) for config in stage_configs]

        # Pipeline coordinator
        self.coordinator_thread = None
        self.running = False
        self._stop_event = threading.Event()

        # Input/output queues for the entire pipeline
        self.pipeline_input_queue = queue.Queue(maxsize=100)
        self.pipeline_output_queue = queue.Queue(maxsize=100)

        # Callbacks
        self.result_callback = None
        self.error_callback = None
        self.stats_callback = None

        # Statistics
        self.pipeline_counter = 0
        self.completed_counter = 0
        self.error_counter = 0

        # FPS calculation based on output-queue throughput (cumulative approach)
        self.fps_start_time = None        # Start time for FPS calculation
        self.fps_lock = threading.Lock()  # Thread safety for FPS calculation

    def initialize(self):
        """Initialize all stages."""
        print(f"[{self.pipeline_name}] Initializing pipeline with {len(self.stages)} stages...")
        for i, stage in enumerate(self.stages):
            try:
                stage.initialize()
                print(f"[{self.pipeline_name}] Stage {i + 1}/{len(self.stages)} initialized")
            except Exception as e:
                print(f"[{self.pipeline_name}] Failed to initialize stage {stage.stage_id}: {e}")
                # Clean up stages that were already initialized
                for j in range(i):
                    self.stages[j].stop()
                raise
        print(f"[{self.pipeline_name}] All stages initialized successfully")

    def _record_output_timestamp(self):
        """Record the start timestamp for cumulative FPS calculation."""
        with self.fps_lock:
            # Set the start time only when the first completed result arrives
            if self.fps_start_time is None and self.completed_counter == 1:
                self.fps_start_time = time.time()

    def get_current_fps(self) -> float:
        """Calculate current FPS from output throughput (cumulative, as in example.py)."""
        with self.fps_lock:
            if self.fps_start_time is None or self.completed_counter == 0:
                return 0.0
            elapsed_time = time.time() - self.fps_start_time
            if elapsed_time > 0:
                return self.completed_counter / elapsed_time
            return 0.0

    def _has_valid_inference_result(self, pipeline_data) -> bool:
        """Check whether pipeline data contains a valid inference result
        (i.e. not an async/processing placeholder)."""
        for stage_id, stage_result in pipeline_data.stage_results.items():
            if stage_result:
                # Tuple result (prob, result_str)
                if isinstance(stage_result, tuple) and len(stage_result) == 2:
                    prob, result_str = stage_result
                    if prob is not None and result_str not in ['Processing']:
                        return True
                # Dict result with actual inference data
                elif isinstance(stage_result, dict):
                    # Don't count "processing"/"async" status as real results
                    if stage_result.get("status") in ["processing", "async"]:
                        continue
                    # Don't count placeholder results
                    if stage_result.get("result") == "Processing":
                        continue
                    # Meaningful result found
                    return True
        return False

    def start(self):
        """Start the pipeline."""
        # Reset FPS tracking on each start
        with self.fps_lock:
            self.fps_start_time = None

        print(f"[{self.pipeline_name}] Starting pipeline...")

        # Start all stages
        for stage in self.stages:
            stage.start()

        # Start coordinator
        self.running = True
        self._stop_event.clear()
        self.coordinator_thread = threading.Thread(target=self._coordinator_loop, daemon=True)
        self.coordinator_thread.start()
        print(f"[{self.pipeline_name}] Pipeline started successfully")

    def stop(self):
        """Stop the pipeline gracefully."""
        print(f"[{self.pipeline_name}] Stopping pipeline...")
        self.running = False
        self._stop_event.set()

        # Stop coordinator
        if self.coordinator_thread and self.coordinator_thread.is_alive():
            try:
                self.pipeline_input_queue.put(None, timeout=1.0)
            except queue.Full:
                pass
            self.coordinator_thread.join(timeout=3.0)

        # Stop all stages
        for stage in self.stages:
            stage.stop()

        print(f"[{self.pipeline_name}] Pipeline stopped")

    def _coordinator_loop(self):
        """Coordinate data flow between stages."""
        print(f"[{self.pipeline_name}] Coordinator started")
        while self.running and not self._stop_event.is_set():
            try:
                # Get input data
                try:
                    input_data = self.pipeline_input_queue.get(timeout=0.1)
                    if input_data is None:  # Sentinel
                        continue
                except queue.Empty:
                    continue

                # Create pipeline data
                pipeline_data = PipelineData(
                    data=input_data,
                    metadata={'start_timestamp': time.time()},
                    stage_results={},
                    pipeline_id=f"pipeline_{self.pipeline_counter}",
                    timestamp=time.time()
                )
                self.pipeline_counter += 1

                # Process through each stage
                current_data = pipeline_data
                success = True
                for i, stage in enumerate(self.stages):
                    # Send data to stage
                    if not stage.put_data(current_data, timeout=1.0):
                        print(f"[{self.pipeline_name}] Stage {stage.stage_id} input queue full, dropping data")
                        success = False
                        break

                    # Get result from stage (10-second timeout per stage)
                    result_data = None
                    timeout_start = time.time()
                    while time.time() - timeout_start < 10.0:
                        result_data = stage.get_result(timeout=0.1)
                        if result_data:
                            break
                        if self._stop_event.is_set():
                            break
                        time.sleep(0.01)

                    if not result_data:
                        print(f"[{self.pipeline_name}] Stage {stage.stage_id} timeout")
                        success = False
                        break

                    current_data = result_data

                # Final postprocessing
                if success and self.final_postprocessor:
                    try:
                        if isinstance(current_data.data, dict) and 'raw_output' in current_data.data:
                            final_result = self.final_postprocessor.process(current_data.data['raw_output'])
                            current_data.stage_results['final'] = final_result
                            current_data.data = final_result
                    except Exception as e:
                        print(f"[{self.pipeline_name}] Final postprocessing error: {e}")

                # Output the result, but only if it's a real inference result
                # rather than an async placeholder
                if success:
                    has_valid_inference = self._has_valid_inference_result(current_data)
                    current_data.metadata['end_timestamp'] = time.time()
                    current_data.metadata['total_processing_time'] = (
                        current_data.metadata['end_timestamp'] -
                        current_data.metadata['start_timestamp']
                    )

                    if has_valid_inference:
                        # Bound the output queue size for memory management
                        MAX_OUTPUT_QUEUE_SIZE = 50

                        # If the queue is getting full, drop old results to make space
                        while self.pipeline_output_queue.qsize() >= MAX_OUTPUT_QUEUE_SIZE:
                            try:
                                self.pipeline_output_queue.get_nowait()
                                # Track dropped results for debugging
                                if not hasattr(self, '_dropped_results_count'):
                                    self._dropped_results_count = 0
                                self._dropped_results_count += 1
                            except queue.Empty:
                                break

                        try:
                            self.pipeline_output_queue.put(current_data, block=False)
                            self.completed_counter += 1

                            # Record output timestamp for FPS calculation
                            self._record_output_timestamp()

                            # Debug: log pipeline activity every 10 results
                            if self.completed_counter % 10 == 0:
                                print(f"[{self.pipeline_name}] Processed {self.completed_counter} results")
                                print(f"[{self.pipeline_name}] Queue sizes - Input: {self.pipeline_input_queue.qsize()}, Output: {self.pipeline_output_queue.qsize()}")
                                if getattr(self, '_dropped_results_count', 0) > 0:
                                    print(f"[{self.pipeline_name}] Dropped {self._dropped_results_count} old results for memory management")

                            # Call result callback for valid inference results
                            if self.result_callback:
                                self.result_callback(current_data)
                        except queue.Full:
                            # Should rarely happen thanks to the pre-emptive cleanup above
                            print(f"[{self.pipeline_name}] Warning: Output queue still full after cleanup")
                else:
                    self.error_counter += 1
                    if self.error_callback:
                        self.error_callback(current_data)

            except Exception as e:
                print(f"[{self.pipeline_name}] Coordinator error: {e}")
                traceback.print_exc()
                self.error_counter += 1

        print(f"[{self.pipeline_name}] Coordinator stopped")

    def put_data(self, data: Any, timeout: float = 1.0) -> bool:
        """Put data into the pipeline, dropping the oldest frame when full."""
        try:
            self.pipeline_input_queue.put(data, timeout=timeout)
            return True
        except queue.Full:
            # Drop the oldest frame to make space for the new one (real-time processing)
            try:
                self.pipeline_input_queue.get_nowait()
                # Track dropped frames for debugging
                if not hasattr(self, '_dropped_frames_count'):
                    self._dropped_frames_count = 0
                self._dropped_frames_count += 1
                # Log occasionally (every 50 drops)
                if self._dropped_frames_count % 50 == 0:
                    print(f"[{self.pipeline_name}] Dropped {self._dropped_frames_count} input frames for real-time processing")
            except queue.Empty:
                # Rare case: the queue emptied between the full check and the get
                pass
            try:
                self.pipeline_input_queue.put(data, block=False)
                return True
            except queue.Full:
                return False

    def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]:
        """Get a result from the pipeline."""
        try:
            return self.pipeline_output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def set_result_callback(self, callback: Callable[[PipelineData], None]):
        """Set the callback for successful results."""
        self.result_callback = callback

    def set_error_callback(self, callback: Callable[[PipelineData], None]):
        """Set the callback for errors."""
        self.error_callback = callback

    def set_stats_callback(self, callback: Callable[[Dict[str, Any]], None]):
        """Set the callback for statistics."""
        self.stats_callback = callback
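
    # Example callback wiring (illustrative; 'pipeline' is an InferencePipeline):
    #
    #   def on_result(data: PipelineData):
    #       print(data.pipeline_id, data.stage_results)
    #
    #   pipeline.set_result_callback(on_result)
    #   pipeline.set_error_callback(lambda d: print("error:", d.stage_results))
    #   pipeline.set_stats_callback(lambda s: print("fps:", s["current_fps"]))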

    def get_pipeline_statistics(self) -> Dict[str, Any]:
        """Get comprehensive pipeline statistics."""
        stage_stats = [stage.get_statistics() for stage in self.stages]
        return {
            'pipeline_name': self.pipeline_name,
            'total_stages': len(self.stages),
            'pipeline_input_submitted': self.pipeline_counter,
            'pipeline_completed': self.completed_counter,
            'pipeline_errors': self.error_counter,
            'pipeline_input_queue_size': self.pipeline_input_queue.qsize(),
            'pipeline_output_queue_size': self.pipeline_output_queue.qsize(),
            'current_fps': self.get_current_fps(),  # Real-time FPS
            'stage_statistics': stage_stats
        }

    def start_stats_reporting(self, interval: float = 5.0):
        """Start periodic statistics reporting on a daemon thread."""
        def stats_loop():
            while self.running:
                if self.stats_callback:
                    stats = self.get_pipeline_statistics()
                    self.stats_callback(stats)
                time.sleep(interval)

        stats_thread = threading.Thread(target=stats_loop, daemon=True)
        stats_thread.start()


# Utility functions for common inter-stage processing

def create_feature_extractor_preprocessor() -> PreProcessor:
    """Create a preprocessor for a feature-extraction stage."""
    def extract_features(frame, target_size):
        # Example: extract edges, keypoints, etc.
        import cv2
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        return cv2.resize(edges, target_size)
    return PreProcessor(resize_fn=extract_features)
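
# Usage sketch: plug the feature extractor in as an inter-stage preprocessor,
# e.g. StageConfig(..., input_preprocessor=create_feature_extractor_preprocessor()).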


def create_result_aggregator_postprocessor() -> PostProcessor:
    """Create a postprocessor that aggregates results from multiple stages."""
    def aggregate_results(raw_output, **kwargs):
        # If raw_output is already a dict of processed results, pass it through
        if isinstance(raw_output, dict):
            return raw_output

        # Standard processing of a raw output array
        if raw_output.size > 0:
            probability = float(raw_output[0])
            return {
                'aggregated_probability': probability,
                'confidence': 'High' if probability > 0.8 else 'Medium' if probability > 0.5 else 'Low',
                'result': 'Detected' if probability > 0.5 else 'Not Detected'
            }
        return {'aggregated_probability': 0.0, 'confidence': 'Low', 'result': 'Not Detected'}
    return PostProcessor(process_fn=aggregate_results)
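

if __name__ == "__main__":
    # Minimal end-to-end sketch, not a definitive runner: the port id and the
    # firmware/model paths below are placeholder assumptions, and frames are
    # read from the default camera via OpenCV.
    import cv2

    configs = [
        StageConfig(
            stage_id="stage1",
            port_ids=[0],
            scpu_fw_path="fw_scpu.bin",   # placeholder path
            ncpu_fw_path="fw_ncpu.bin",   # placeholder path
            model_path="model.nef",       # placeholder path
            upload_fw=False,
        ),
    ]
    pipeline = InferencePipeline(
        configs,
        final_postprocessor=create_result_aggregator_postprocessor(),
        pipeline_name="DemoPipeline",
    )
    pipeline.set_result_callback(lambda d: print(d.pipeline_id, d.stage_results))
    pipeline.initialize()
    pipeline.start()

    cap = cv2.VideoCapture(0)
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            pipeline.put_data(frame)  # oldest frames are dropped if the queue fills
            time.sleep(0.01)          # cap the submission rate
    except KeyboardInterrupt:
        pass
    finally:
        cap.release()
        pipeline.stop()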