"""
|
|
Multi-Series Inference Pipeline
|
|
|
|
This module extends the InferencePipeline to support multi-series dongle configurations
|
|
using the MultiSeriesDongleManager for improved performance across different dongle series.
|
|
|
|
Main Components:
|
|
- MultiSeriesPipelineStage: Pipeline stage supporting both single and multi-series modes
|
|
- Enhanced InferencePipeline with multi-series support
|
|
- Configuration adapters for seamless integration
|
|
|
|
Usage:
|
|
from core.functions.multi_series_pipeline import MultiSeriesInferencePipeline
|
|
|
|
# Multi-series configuration
|
|
config = MultiSeriesStageConfig(
|
|
stage_id="detection",
|
|
multi_series_mode=True,
|
|
firmware_paths={"KL520": {"scpu": "...", "ncpu": "..."}, ...},
|
|
model_paths={"KL520": "...", "KL720": "..."}
|
|
)
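
    # Build and run the pipeline (a minimal sketch; the start/stop and
    # result-callback interface is inherited from InferencePipeline and
    # assumed here rather than shown in this module)
    pipeline = MultiSeriesInferencePipeline([config])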
"""

from typing import List, Dict, Any, Optional, Callable, Union
import threading
import queue
import time
import traceback
import numpy as np
from dataclasses import dataclass

# Import existing pipeline components
from .InferencePipeline import (
    PipelineData, InferencePipeline, PreProcessor, PostProcessor, DataProcessor
)
from .Multidongle import MultiDongle

# Import multi-series manager from the project root (three directory levels up)
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from multi_series_dongle_manager import MultiSeriesDongleManager


@dataclass
class MultiSeriesStageConfig:
    """Enhanced configuration for multi-series pipeline stages"""
    stage_id: str
    max_queue_size: int = 100

    # Multi-series mode configuration
    multi_series_mode: bool = False
    firmware_paths: Optional[Dict[str, Dict[str, str]]] = None  # {"KL520": {"scpu": path, "ncpu": path}}
    model_paths: Optional[Dict[str, str]] = None  # {"KL520": model_path, "KL720": model_path}
    result_buffer_size: int = 1000

    # Single-series mode configuration (backward compatibility)
    port_ids: Optional[List[int]] = None
    scpu_fw_path: Optional[str] = None
    ncpu_fw_path: Optional[str] = None
    model_path: Optional[str] = None
    upload_fw: bool = False

    # Processing configuration
    input_preprocessor: Optional[PreProcessor] = None
    output_postprocessor: Optional[PostProcessor] = None
    stage_preprocessor: Optional[PreProcessor] = None
    stage_postprocessor: Optional[PostProcessor] = None
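

# Example (single-series, backward-compatible mode; the port IDs and file
# paths below are placeholders, not values from this repo):
#
#     config = MultiSeriesStageConfig(
#         stage_id="detection",
#         multi_series_mode=False,
#         port_ids=[28, 32],
#         scpu_fw_path="fw_scpu.bin",
#         ncpu_fw_path="fw_ncpu.bin",
#         model_path="model.nef",
#         upload_fw=True,
#     )
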
class MultiSeriesPipelineStage:
    """Enhanced pipeline stage supporting both single and multi-series modes"""

    def __init__(self, config: MultiSeriesStageConfig):
        self.config = config
        self.stage_id = config.stage_id

        # Initialize inference engine based on mode
        if config.multi_series_mode:
            # Multi-series mode using MultiSeriesDongleManager
            self.inference_engine = MultiSeriesDongleManager(
                max_queue_size=config.max_queue_size,
                result_buffer_size=config.result_buffer_size
            )
            self.is_multi_series = True
        else:
            # Single-series mode using MultiDongle (backward compatibility)
            self.inference_engine = MultiDongle(
                port_id=config.port_ids or [],
                scpu_fw_path=config.scpu_fw_path or "",
                ncpu_fw_path=config.ncpu_fw_path or "",
                model_path=config.model_path or "",
                upload_fw=config.upload_fw,
                max_queue_size=config.max_queue_size
            )
            self.is_multi_series = False

        # Store processors
        self.input_preprocessor = config.input_preprocessor
        self.output_postprocessor = config.output_postprocessor

        # Threading for this stage
        self.input_queue = queue.Queue(maxsize=config.max_queue_size)
        self.output_queue = queue.Queue(maxsize=config.max_queue_size)
        self.worker_thread = None
        self.running = False
        self._stop_event = threading.Event()

        # Statistics
        self.processed_count = 0
        self.error_count = 0
        self.processing_times = []

    def initialize(self):
        """Initialize the stage"""
        print(f"[Stage {self.stage_id}] Initializing {'multi-series' if self.is_multi_series else 'single-series'} mode...")

        try:
            if self.is_multi_series:
                # Initialize multi-series manager
                if not self.inference_engine.scan_and_initialize_devices(
                    self.config.firmware_paths,
                    self.config.model_paths
                ):
                    raise RuntimeError("Failed to initialize multi-series dongles")
                print(f"[Stage {self.stage_id}] Multi-series dongles initialized successfully")
            else:
                # Initialize single-series MultiDongle
                self.inference_engine.initialize()
                print(f"[Stage {self.stage_id}] Single-series dongle initialized successfully")

        except Exception as e:
            print(f"[Stage {self.stage_id}] Initialization failed: {e}")
            raise

    def start(self):
        """Start the stage worker thread"""
        if self.worker_thread and self.worker_thread.is_alive():
            return

        self.running = True
        self._stop_event.clear()

        # Start inference engine (same call for both engine types)
        self.inference_engine.start()

        # Start worker thread
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print(f"[Stage {self.stage_id}] Worker thread started")

    def stop(self):
        """Stop the stage gracefully"""
        print(f"[Stage {self.stage_id}] Stopping...")
        self.running = False
        self._stop_event.set()

        # Put sentinel to unblock worker
        try:
            self.input_queue.put(None, timeout=1.0)
        except queue.Full:
            pass

        # Wait for worker thread
        if self.worker_thread and self.worker_thread.is_alive():
            self.worker_thread.join(timeout=3.0)

        # Stop inference engine (same call for both engine types)
        self.inference_engine.stop()
        print(f"[Stage {self.stage_id}] Stopped")

    def _worker_loop(self):
        """Main worker loop for processing data"""
        print(f"[Stage {self.stage_id}] Worker loop started")

        while self.running and not self._stop_event.is_set():
            try:
                # Get input data
                try:
                    pipeline_data = self.input_queue.get(timeout=1.0)
                    if pipeline_data is None:  # Sentinel value
                        continue
                except queue.Empty:
                    if self._stop_event.is_set():
                        break
                    continue

                start_time = time.time()

                # Process data through this stage
                processed_data = self._process_data(pipeline_data)

                # Only count and record timing for actual inference results
                if processed_data and self._has_inference_result(processed_data):
                    processing_time = time.time() - start_time
                    self.processing_times.append(processing_time)
                    if len(self.processing_times) > 1000:
                        self.processing_times = self.processing_times[-500:]

                    self.processed_count += 1

                # Put result to output queue
                try:
                    self.output_queue.put(processed_data, block=False)
                except queue.Full:
                    # Drop the oldest result and enqueue the new one
                    try:
                        self.output_queue.get_nowait()
                        self.output_queue.put(processed_data, block=False)
                    except queue.Empty:
                        pass

            except Exception as e:
                self.error_count += 1
                print(f"[Stage {self.stage_id}] Processing error: {e}")
                traceback.print_exc()

        print(f"[Stage {self.stage_id}] Worker loop stopped")

    def _has_inference_result(self, processed_data) -> bool:
        """Check if processed_data contains a valid inference result"""
        if not processed_data:
            return False

        try:
            if hasattr(processed_data, 'stage_results') and processed_data.stage_results:
                stage_result = processed_data.stage_results.get(self.stage_id)
                if stage_result:
                    if isinstance(stage_result, tuple) and len(stage_result) == 2:
                        prob, result_str = stage_result
                        return prob is not None and result_str is not None and result_str != 'Processing'
                    elif isinstance(stage_result, dict):
                        if stage_result.get("status") in ["processing", "async"]:
                            return False
                        if stage_result.get("result") == "Processing":
                            return False
                        return True
                    else:
                        return stage_result is not None
        except Exception:
            pass

        return False

    def _process_data(self, pipeline_data: PipelineData) -> PipelineData:
        """Process data through this stage"""
        try:
            current_data = pipeline_data.data

            # Step 1: Input preprocessing (inter-stage)
            if self.input_preprocessor and isinstance(current_data, np.ndarray):
                if self.is_multi_series:
                    # For multi-series, we may need different preprocessing
                    current_data = self.input_preprocessor.process(current_data, (640, 640), 'BGR565')
                else:
                    current_data = self.input_preprocessor.process(
                        current_data,
                        self.inference_engine.model_input_shape,
                        'BGR565'
                    )

            # Step 2: Inference
            inference_result = None

            if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3:
                if self.is_multi_series:
                    # Multi-series inference
                    sequence_id = self.inference_engine.put_input(current_data, 'BGR565')  # sequence id currently unused

                    # Try to get result (non-blocking for async processing)
                    result = self.inference_engine.get_result(timeout=0.1)

                    if result is not None and hasattr(result, 'result') and result.result:
                        # Pass the manager's payload through unchanged, whether
                        # it is a (prob, result_str) tuple or a dict
                        inference_result = result.result
                    else:
                        inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
                else:
                    # Single-series inference (existing behavior)
                    processed_data = self.inference_engine.preprocess_frame(current_data, 'BGR565')
                    if processed_data is not None:
                        self.inference_engine.put_input(processed_data, 'BGR565')

                    # Get inference result (tuple or dict, passed through unchanged)
                    result = self.inference_engine.get_latest_inference_result()
                    if result is not None:
                        inference_result = result
                    else:
                        inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
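
            # A 'Processing' placeholder marks a frame whose inference is still
            # in flight; _has_inference_result() later filters these out of the
            # stage statistics.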
            # Step 3: Update pipeline data
            if not inference_result:
                inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}

            pipeline_data.stage_results[self.stage_id] = inference_result
            pipeline_data.data = inference_result
            pipeline_data.metadata[f'{self.stage_id}_timestamp'] = time.time()

            return pipeline_data

        except Exception as e:
            print(f"[Stage {self.stage_id}] Data processing error: {e}")
            pipeline_data.stage_results[self.stage_id] = {
                'error': str(e),
                'probability': 0.0,
                'result': 'Processing Error'
            }
            return pipeline_data

    def put_data(self, data: PipelineData, timeout: float = 1.0) -> bool:
        """Put data into this stage's input queue"""
        try:
            self.input_queue.put(data, timeout=timeout)
            return True
        except queue.Full:
            return False

    def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]:
        """Get result from this stage's output queue"""
        try:
            return self.output_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_statistics(self) -> Dict[str, Any]:
        """Get stage statistics"""
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0.0
        )

        # Both engine types expose the same get_statistics() interface
        engine_stats = self.inference_engine.get_statistics()

        return {
            'stage_id': self.stage_id,
            'mode': 'multi-series' if self.is_multi_series else 'single-series',
            'processed_count': self.processed_count,
            'error_count': self.error_count,
            'avg_processing_time': avg_processing_time,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
            'engine_stats': engine_stats
        }


class MultiSeriesInferencePipeline(InferencePipeline):
    """Enhanced inference pipeline with multi-series support"""

    def __init__(self, stage_configs: List[MultiSeriesStageConfig],
                 final_postprocessor: Optional[PostProcessor] = None,
                 pipeline_name: str = "MultiSeriesInferencePipeline"):
        """
        Initialize multi-series inference pipeline
        """
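        # Note: parent attributes are re-initialized here directly instead of
        # calling super().__init__(), because this class replaces the parent's
        # stages with MultiSeriesPipelineStage instances.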
        self.pipeline_name = pipeline_name
        self.stage_configs = stage_configs
        self.final_postprocessor = final_postprocessor

        # Create enhanced stages
        self.stages: List[MultiSeriesPipelineStage] = []
        for config in stage_configs:
            stage = MultiSeriesPipelineStage(config)
            self.stages.append(stage)

        # Initialize other components from parent class
        self.coordinator_thread = None
        self.running = False
        self._stop_event = threading.Event()

        self.pipeline_input_queue = queue.Queue(maxsize=100)
        self.pipeline_output_queue = queue.Queue(maxsize=100)

        self.result_callback = None
        self.error_callback = None
        self.stats_callback = None

        self.pipeline_counter = 0
        self.completed_counter = 0
        self.error_counter = 0

        self.fps_start_time = None
        self.fps_lock = threading.Lock()


def create_multi_series_config_from_model_node(model_config: Dict[str, Any]) -> MultiSeriesStageConfig:
    """
    Create MultiSeriesStageConfig from model node configuration
    """
    if model_config.get('multi_series_mode', False):
        # Multi-series configuration
        return MultiSeriesStageConfig(
            stage_id=model_config.get('node_name', 'inference_stage'),
            multi_series_mode=True,
            firmware_paths=model_config.get('firmware_paths'),
            model_paths=model_config.get('model_paths'),
            max_queue_size=model_config.get('max_queue_size', 100),
            result_buffer_size=model_config.get('result_buffer_size', 1000)
        )
    else:
        # Single-series configuration (backward compatibility)
        return MultiSeriesStageConfig(
            stage_id=model_config.get('node_name', 'inference_stage'),
            multi_series_mode=False,
            port_ids=[],  # Will be auto-detected
            scpu_fw_path=model_config.get('scpu_fw_path'),
            ncpu_fw_path=model_config.get('ncpu_fw_path'),
            model_path=model_config.get('model_path'),
            upload_fw=True,
            max_queue_size=model_config.get('max_queue_size', 50)
        )
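

# Adapter usage (a sketch; the model_config dict below is illustrative and the
# firmware/model paths are placeholders):
#
#     model_config = {
#         'node_name': 'detection',
#         'multi_series_mode': True,
#         'firmware_paths': {'KL520': {'scpu': 'fw_scpu.bin', 'ncpu': 'fw_ncpu.bin'}},
#         'model_paths': {'KL520': 'detection.nef'},
#     }
#     stage_config = create_multi_series_config_from_model_node(model_config)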