# cluster4npu/core/functions/multi_series_pipeline.py
"""
Multi-Series Inference Pipeline
This module extends the InferencePipeline to support multi-series dongle configurations
using the MultiSeriesDongleManager for improved performance across different dongle series.
Main Components:
- MultiSeriesPipelineStage: Pipeline stage supporting both single and multi-series modes
- Enhanced InferencePipeline with multi-series support
- Configuration adapters for seamless integration
Usage:
    from core.functions.multi_series_pipeline import MultiSeriesInferencePipeline

    # Multi-series configuration
    config = MultiSeriesStageConfig(
        stage_id="detection",
        multi_series_mode=True,
        firmware_paths={"KL520": {"scpu": "...", "ncpu": "..."}, ...},
        model_paths={"KL520": "...", "KL720": "..."}
    )
"""
from typing import List, Dict, Any, Optional, Callable, Union
import threading
import queue
import time
import traceback
import numpy as np
from dataclasses import dataclass
# Import existing pipeline components
from .InferencePipeline import (
PipelineData, InferencePipeline, PreProcessor, PostProcessor, DataProcessor
)
from .Multidongle import MultiDongle
# Import multi-series manager
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from multi_series_dongle_manager import MultiSeriesDongleManager
@dataclass
class MultiSeriesStageConfig:
    """Enhanced configuration for multi-series pipeline stages.

    Exactly one of the two mode-specific field groups is meant to be populated:
    set ``multi_series_mode=True`` and supply ``firmware_paths``/``model_paths``,
    or leave it ``False`` and supply the single-series fields below.
    """
    stage_id: str
    max_queue_size: int = 100
    # Multi-series mode configuration
    multi_series_mode: bool = False
    firmware_paths: Optional[Dict[str, Dict[str, str]]] = None  # {"KL520": {"scpu": path, "ncpu": path}}
    model_paths: Optional[Dict[str, str]] = None  # {"KL520": model_path, "KL720": model_path}
    result_buffer_size: int = 1000
    # Single-series mode configuration (backward compatibility)
    port_ids: Optional[List[int]] = None
    scpu_fw_path: Optional[str] = None
    ncpu_fw_path: Optional[str] = None
    model_path: Optional[str] = None
    upload_fw: bool = False
    # Processing configuration
    input_preprocessor: Optional[PreProcessor] = None
    output_postprocessor: Optional[PostProcessor] = None
    # NOTE(review): the two stage_* processors below are not read by
    # MultiSeriesPipelineStage (it only stores input_preprocessor and
    # output_postprocessor) — confirm intended consumers.
    stage_preprocessor: Optional[PreProcessor] = None
    stage_postprocessor: Optional[PostProcessor] = None
class MultiSeriesPipelineStage:
    """Pipeline stage that wraps either a single- or multi-series inference engine.

    In multi-series mode the stage drives a ``MultiSeriesDongleManager``;
    otherwise it uses the original ``MultiDongle`` engine for backward
    compatibility. A dedicated worker thread pulls ``PipelineData`` items from
    ``input_queue``, runs preprocessing + inference, and pushes results to
    ``output_queue``.

    Fixes vs. the previous revision (behavior unchanged):
    - ``start()``, ``stop()`` and ``get_statistics()`` had ``if/else`` branches
      with identical bodies for both modes; both engine types expose the same
      ``start()/stop()/get_statistics()`` interface, so the branches are gone.
    - ``_process_data`` contained result-dispatch branches that all assigned the
      same value; they are collapsed.
    - The unused ``sequence_id`` binding from ``put_input`` is removed.
    """

    def __init__(self, config: MultiSeriesStageConfig):
        """Create the stage and its inference engine from ``config``.

        The engine is constructed here but not initialized or started;
        call :meth:`initialize` and :meth:`start` for that.
        """
        self.config = config
        self.stage_id = config.stage_id
        # Select the inference engine implementation based on the configured mode.
        if config.multi_series_mode:
            # Multi-series mode using MultiSeriesDongleManager
            self.inference_engine = MultiSeriesDongleManager(
                max_queue_size=config.max_queue_size,
                result_buffer_size=config.result_buffer_size
            )
            self.is_multi_series = True
        else:
            # Single-series mode using MultiDongle (backward compatibility)
            self.inference_engine = MultiDongle(
                port_id=config.port_ids or [],
                scpu_fw_path=config.scpu_fw_path or "",
                ncpu_fw_path=config.ncpu_fw_path or "",
                model_path=config.model_path or "",
                upload_fw=config.upload_fw,
                max_queue_size=config.max_queue_size
            )
            self.is_multi_series = False
        # Inter-stage processors (either may be None).
        self.input_preprocessor = config.input_preprocessor
        self.output_postprocessor = config.output_postprocessor
        # Threading / queues for this stage.
        self.input_queue = queue.Queue(maxsize=config.max_queue_size)
        self.output_queue = queue.Queue(maxsize=config.max_queue_size)
        self.worker_thread = None
        self.running = False
        self._stop_event = threading.Event()
        # Statistics.
        self.processed_count = 0
        self.error_count = 0
        self.processing_times = []  # bounded rolling window of per-item latencies (seconds)

    def initialize(self):
        """Initialize the underlying inference engine.

        Raises:
            RuntimeError: if multi-series device scan/initialization fails.
            Exception: whatever the single-series engine's initialize() raises.
        """
        print(f"[Stage {self.stage_id}] Initializing {'multi-series' if self.is_multi_series else 'single-series'} mode...")
        try:
            if self.is_multi_series:
                # Scan for connected dongles of each configured series and
                # load the per-series firmware and model files.
                if not self.inference_engine.scan_and_initialize_devices(
                        self.config.firmware_paths,
                        self.config.model_paths):
                    raise RuntimeError("Failed to initialize multi-series dongles")
                print(f"[Stage {self.stage_id}] Multi-series dongles initialized successfully")
            else:
                self.inference_engine.initialize()
                print(f"[Stage {self.stage_id}] Single-series dongle initialized successfully")
        except Exception as e:
            print(f"[Stage {self.stage_id}] Initialization failed: {e}")
            raise

    def start(self):
        """Start the inference engine and this stage's worker thread (idempotent)."""
        if self.worker_thread and self.worker_thread.is_alive():
            return
        self.running = True
        self._stop_event.clear()
        # Both engine types expose the same start() interface; no mode branch needed.
        self.inference_engine.start()
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print(f"[Stage {self.stage_id}] Worker thread started")

    def stop(self):
        """Stop the worker thread and the inference engine gracefully."""
        print(f"[Stage {self.stage_id}] Stopping...")
        self.running = False
        self._stop_event.set()
        # Put sentinel to unblock a worker blocked on input_queue.get().
        try:
            self.input_queue.put(None, timeout=1.0)
        except queue.Full:
            pass
        # Wait (bounded) for the worker thread to drain.
        if self.worker_thread and self.worker_thread.is_alive():
            self.worker_thread.join(timeout=3.0)
        # Both engine types expose the same stop() interface; no mode branch needed.
        self.inference_engine.stop()
        print(f"[Stage {self.stage_id}] Stopped")

    def _worker_loop(self):
        """Main worker loop: pull data, process it, push the result downstream."""
        print(f"[Stage {self.stage_id}] Worker loop started")
        while self.running and not self._stop_event.is_set():
            try:
                # Get input data (short timeout so stop() is noticed promptly).
                try:
                    pipeline_data = self.input_queue.get(timeout=1.0)
                    if pipeline_data is None:  # Sentinel value from stop()
                        continue
                except queue.Empty:
                    if self._stop_event.is_set():
                        break
                    continue
                start_time = time.time()
                processed_data = self._process_data(pipeline_data)
                # Only count and record timing for actual inference results;
                # async "Processing" placeholders are excluded from stats.
                if processed_data and self._has_inference_result(processed_data):
                    processing_time = time.time() - start_time
                    self.processing_times.append(processing_time)
                    if len(self.processing_times) > 1000:
                        # Keep the window bounded; retain the most recent half.
                        self.processing_times = self.processing_times[-500:]
                    self.processed_count += 1
                # Put result to output queue; on back-pressure, drop the oldest.
                try:
                    self.output_queue.put(processed_data, block=False)
                except queue.Full:
                    try:
                        self.output_queue.get_nowait()
                        self.output_queue.put(processed_data, block=False)
                    except queue.Empty:
                        pass
            except Exception as e:
                self.error_count += 1
                print(f"[Stage {self.stage_id}] Processing error: {e}")
                traceback.print_exc()
        print(f"[Stage {self.stage_id}] Worker loop stopped")

    def _has_inference_result(self, processed_data) -> bool:
        """Return True if processed_data carries a real (non-placeholder) result.

        Recognizes two result shapes stored under this stage's id:
        a ``(probability, result_str)`` tuple, or a dict whose ``status``/
        ``result`` fields mark async placeholders.
        """
        if not processed_data:
            return False
        try:
            if hasattr(processed_data, 'stage_results') and processed_data.stage_results:
                stage_result = processed_data.stage_results.get(self.stage_id)
                if stage_result:
                    if isinstance(stage_result, tuple) and len(stage_result) == 2:
                        prob, result_str = stage_result
                        return prob is not None and result_str is not None and result_str != 'Processing'
                    elif isinstance(stage_result, dict):
                        # Async placeholders are not counted as results.
                        if stage_result.get("status") in ["processing", "async"]:
                            return False
                        if stage_result.get("result") == "Processing":
                            return False
                        return True
                    else:
                        return stage_result is not None
        except Exception:
            pass
        return False

    def _process_data(self, pipeline_data: PipelineData) -> PipelineData:
        """Run preprocessing + inference on one item and record the result.

        On error the stage result is replaced by an error dict; the item is
        always returned so downstream stages keep flowing.
        """
        try:
            current_data = pipeline_data.data
            # Step 1: Input preprocessing (inter-stage)
            if self.input_preprocessor and isinstance(current_data, np.ndarray):
                if self.is_multi_series:
                    # NOTE(review): target size is hard-coded to 640x640 here —
                    # confirm this matches every configured series' model input.
                    current_data = self.input_preprocessor.process(current_data, (640, 640), 'BGR565')
                else:
                    current_data = self.input_preprocessor.process(
                        current_data,
                        self.inference_engine.model_input_shape,
                        'BGR565'
                    )
            # Step 2: Inference (only attempted for HxWxC image arrays)
            inference_result = None
            if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3:
                if self.is_multi_series:
                    self.inference_engine.put_input(current_data, 'BGR565')
                    # Non-blocking poll; with the async pipeline the result may
                    # belong to an earlier frame or not be ready yet.
                    result = self.inference_engine.get_result(timeout=0.1)
                    if result is not None and hasattr(result, 'result') and result.result:
                        inference_result = result.result
                    else:
                        inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
                else:
                    # Single-series inference (existing behavior)
                    processed_data = self.inference_engine.preprocess_frame(current_data, 'BGR565')
                    if processed_data is not None:
                        self.inference_engine.put_input(processed_data, 'BGR565')
                    result = self.inference_engine.get_latest_inference_result()
                    if result is not None:
                        inference_result = result
                    else:
                        inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
            # Step 3: Update pipeline data (placeholder if nothing was produced)
            if not inference_result:
                inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
            pipeline_data.stage_results[self.stage_id] = inference_result
            pipeline_data.data = inference_result
            pipeline_data.metadata[f'{self.stage_id}_timestamp'] = time.time()
            return pipeline_data
        except Exception as e:
            print(f"[Stage {self.stage_id}] Data processing error: {e}")
            pipeline_data.stage_results[self.stage_id] = {
                'error': str(e),
                'probability': 0.0,
                'result': 'Processing Error'
            }
            return pipeline_data

    def put_data(self, data: PipelineData, timeout: float = 1.0) -> bool:
        """Enqueue data for this stage; False if the queue stays full for `timeout` s."""
        try:
            self.input_queue.put(data, timeout=timeout)
            return True
        except queue.Full:
            return False

    def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]:
        """Dequeue one processed item, or None if nothing arrives within `timeout` s."""
        try:
            return self.output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of stage-level and engine-level statistics."""
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0.0
        )
        # Both engine types expose the same get_statistics() interface.
        engine_stats = self.inference_engine.get_statistics()
        return {
            'stage_id': self.stage_id,
            'mode': 'multi-series' if self.is_multi_series else 'single-series',
            'processed_count': self.processed_count,
            'error_count': self.error_count,
            'avg_processing_time': avg_processing_time,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
            'engine_stats': engine_stats
        }
class MultiSeriesInferencePipeline(InferencePipeline):
    """Inference pipeline variant whose stages support multi-series dongles."""

    def __init__(self, stage_configs: List[MultiSeriesStageConfig],
                 final_postprocessor: Optional[PostProcessor] = None,
                 pipeline_name: str = "MultiSeriesInferencePipeline"):
        """Build the pipeline from per-stage configs.

        Args:
            stage_configs: One MultiSeriesStageConfig per pipeline stage, in order.
            final_postprocessor: Optional postprocessor applied to final results.
            pipeline_name: Human-readable name used for identification.
        """
        # NOTE(review): super().__init__() is deliberately not called; this
        # constructor re-creates the parent's attributes itself with enhanced
        # stages — confirm it stays in sync with InferencePipeline.__init__.
        self.pipeline_name = pipeline_name
        self.stage_configs = stage_configs
        self.final_postprocessor = final_postprocessor
        # One enhanced stage per configuration entry, in pipeline order.
        self.stages: List[MultiSeriesPipelineStage] = [
            MultiSeriesPipelineStage(cfg) for cfg in stage_configs
        ]
        # Coordinator / lifecycle state.
        self.coordinator_thread = None
        self.running = False
        self._stop_event = threading.Event()
        # Pipeline-level I/O queues.
        self.pipeline_input_queue = queue.Queue(maxsize=100)
        self.pipeline_output_queue = queue.Queue(maxsize=100)
        # Optional user callbacks.
        self.result_callback = None
        self.error_callback = None
        self.stats_callback = None
        # Counters and FPS bookkeeping.
        self.pipeline_counter = 0
        self.completed_counter = 0
        self.error_counter = 0
        self.fps_start_time = None
        self.fps_lock = threading.Lock()
def create_multi_series_config_from_model_node(model_config: Dict[str, Any]) -> MultiSeriesStageConfig:
    """Build a MultiSeriesStageConfig from a model-node configuration dict.

    Args:
        model_config: Node settings; 'multi_series_mode' selects which
            field group of the returned config is populated.

    Returns:
        A MultiSeriesStageConfig in multi-series mode when the node requests
        it, otherwise a single-series (backward-compatible) config.
    """
    node_name = model_config.get('node_name', 'inference_stage')

    if model_config.get('multi_series_mode', False):
        # Multi-series configuration
        return MultiSeriesStageConfig(
            stage_id=node_name,
            multi_series_mode=True,
            firmware_paths=model_config.get('firmware_paths'),
            model_paths=model_config.get('model_paths'),
            max_queue_size=model_config.get('max_queue_size', 100),
            result_buffer_size=model_config.get('result_buffer_size', 1000)
        )

    # Single-series configuration (backward compatibility)
    return MultiSeriesStageConfig(
        stage_id=node_name,
        multi_series_mode=False,
        port_ids=[],  # Will be auto-detected
        scpu_fw_path=model_config.get('scpu_fw_path'),
        ncpu_fw_path=model_config.get('ncpu_fw_path'),
        model_path=model_config.get('model_path'),
        upload_fw=True,
        max_queue_size=model_config.get('max_queue_size', 50)
    )