# Source: cluster4npu/core/functions/multi_series_converter.py
# (repository viewer metadata: 398 lines, 16 KiB, Python)

"""
Multi-Series UI Bridge Converter

This module provides a simplified bridge between the UI pipeline data and the
MultiSeriesDongleManager system, making it easy to convert UI configurations
into working multi-series inference pipelines.

Key Features:
    - Direct conversion from UI pipeline data to MultiSeriesDongleManager config
    - Simplified interface for the deployment system
    - Automatic validation and configuration generation
    - Support for both folder-based and individual-file configurations

Usage:
    from multi_series_converter import MultiSeriesConverter

    converter = MultiSeriesConverter()
    manager = converter.create_multi_series_manager(pipeline_data, ui_config)
    if manager is not None:  # None is returned when creation fails
        manager.start()
        sequence_id = manager.put_input(image, 'BGR565')
        result = manager.get_result()
"""
import os
import sys
from typing import Dict, Any, List, Tuple, Optional
# Put the directory two levels above this file's directory at the front of
# sys.path so the import below can be resolved from there.
# abspath() guards against a relative __file__ (e.g. when run as a script from
# its own directory), which would otherwise collapse parent_dir to ''.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
sys.path.insert(0, parent_dir)

try:
    from multi_series_dongle_manager import MultiSeriesDongleManager, DongleSeriesSpec
    MULTI_SERIES_AVAILABLE = True
except ImportError as e:
    print(f"MultiSeriesDongleManager not available: {e}")
    # Bind placeholders so later references to these names (e.g. in return
    # annotations, which are evaluated at definition time) do not raise
    # NameError and defeat the MULTI_SERIES_AVAILABLE fallback.
    MultiSeriesDongleManager = None
    DongleSeriesSpec = None
    MULTI_SERIES_AVAILABLE = False
class MultiSeriesConverter:
    """Simplified converter for UI to MultiSeriesDongleManager bridge.

    Translates the configuration dictionary produced by the UI into the
    firmware/model path mappings passed to MultiSeriesDongleManager, and
    offers validation, summary and rough performance-estimate helpers.
    """

    def __init__(self):
        """Load the series spec table.

        Uses ``DongleSeriesSpec.SERIES_SPECS`` when the manager module was
        imported successfully, otherwise a built-in fallback table keyed by
        product id with ``"name"`` and ``"gops"`` entries.
        """
        # NOTE(review): SERIES_SPECS is assumed to have the same shape as the
        # fallback table (get_performance_estimate reads "name" and "gops").
        self.series_specs = DongleSeriesSpec.SERIES_SPECS if MULTI_SERIES_AVAILABLE else {
            0x100: {"name": "KL520", "gops": 3},
            0x720: {"name": "KL720", "gops": 28},
            0x630: {"name": "KL630", "gops": 400},
            0x730: {"name": "KL730", "gops": 1600},
            0x540: {"name": "KL540", "gops": 800}
        }

    @staticmethod
    def _normalize_series_name(series: Any) -> str:
        """Return the canonical series name, e.g. ``'520'``/``520`` -> ``'KL520'``.

        Values that are not purely numeric are returned unchanged (as ``str``).
        """
        name = str(series)
        return f'KL{name}' if name.isdigit() else name

    def create_multi_series_manager(self, pipeline_data: Dict[str, Any],
                                    multi_series_config: Dict[str, Any]) -> Optional["MultiSeriesDongleManager"]:
        """
        Create and configure MultiSeriesDongleManager from UI data

        Args:
            pipeline_data: Pipeline data from UI (.mflow format). Accepted for
                interface compatibility; not read by this method.
            multi_series_config: Configuration from MultiSeriesConfigDialog

        Returns:
            Configured MultiSeriesDongleManager or None if creation fails
        """
        if not MULTI_SERIES_AVAILABLE:
            print("MultiSeriesDongleManager not available")
            return None

        # Deliberate best-effort boundary: any failure is reported and mapped
        # to None so the caller never has to handle manager exceptions.
        try:
            # Extract firmware and model paths
            firmware_paths, model_paths = self._extract_paths(multi_series_config)
            if not firmware_paths or not model_paths:
                print("Insufficient firmware or model paths")
                return None

            # Create and initialize manager
            manager = MultiSeriesDongleManager(
                max_queue_size=multi_series_config.get('max_queue_size', 100),
                result_buffer_size=multi_series_config.get('result_buffer_size', 1000)
            )

            # Initialize devices
            success = manager.scan_and_initialize_devices(firmware_paths, model_paths)
            if not success:
                print("Failed to initialize multi-series devices")
                return None

            print("Multi-series manager created and initialized successfully")
            return manager
        except Exception as e:
            print(f"Error creating multi-series manager: {e}")
            return None

    def _extract_paths(self, multi_series_config: Dict[str, Any]) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str]]:
        """Extract firmware and model paths from multi-series config.

        Dispatches on ``config_mode``: ``'folder'`` (the default) scans an
        assets folder; any other value uses the per-series individual paths.

        Returns:
            Tuple of (firmware_paths, model_paths), both keyed by series name.
        """
        config_mode = multi_series_config.get('config_mode', 'folder')
        # Tolerate an explicit None for the series list.
        enabled_series = multi_series_config.get('enabled_series') or []

        if config_mode == 'folder':
            return self._extract_folder_paths(multi_series_config, enabled_series)
        return self._extract_individual_paths(multi_series_config, enabled_series)

    def _extract_folder_paths(self, config: Dict[str, Any], enabled_series: List[str]) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str]]:
        """Extract paths from folder-based configuration.

        Looks for ``<assets>/Firmware/<series>/fw_scpu.bin`` and
        ``fw_ncpu.bin``, and for the first ``.nef`` file (in sorted name
        order) under ``<assets>/Models/<series>``.

        Returns:
            Tuple of (firmware_paths, model_paths); both empty when the assets
            folder is missing.
        """
        assets_folder = config.get('assets_folder', '')
        if not assets_folder or not os.path.exists(assets_folder):
            print(f"Assets folder not found: {assets_folder}")
            return {}, {}

        firmware_base = os.path.join(assets_folder, 'Firmware')
        models_base = os.path.join(assets_folder, 'Models')
        firmware_paths: Dict[str, Dict[str, str]] = {}
        model_paths: Dict[str, str] = {}

        for series in enabled_series:
            series_name = self._normalize_series_name(series)

            # Firmware paths
            series_fw_dir = os.path.join(firmware_base, series_name)
            if os.path.exists(series_fw_dir):
                scpu_path = os.path.join(series_fw_dir, 'fw_scpu.bin')
                ncpu_path = os.path.join(series_fw_dir, 'fw_ncpu.bin')
                if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
                    firmware_paths[series_name] = {
                        'scpu': scpu_path,
                        'ncpu': ncpu_path
                    }
                else:
                    print(f"Warning: Missing firmware files for {series_name}")

            # Model paths - pick the first .nef file. os.listdir() returns
            # entries in arbitrary order, so sort to make the choice stable.
            series_model_dir = os.path.join(models_base, series_name)
            if os.path.isdir(series_model_dir):
                model_files = sorted(
                    f for f in os.listdir(series_model_dir) if f.endswith('.nef')
                )
                if model_files:
                    model_paths[series_name] = os.path.join(series_model_dir, model_files[0])
                else:
                    print(f"Warning: No .nef model files found for {series_name}")

        return firmware_paths, model_paths

    def _extract_individual_paths(self, config: Dict[str, Any], enabled_series: List[str]) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str]]:
        """Extract paths from individual file configuration.

        Reads ``config['individual_paths'][<series>]`` entries with the keys
        ``'scpu'``, ``'ncpu'`` and ``'model'``; only paths that exist on disk
        are returned. Series without an entry are skipped silently.

        Returns:
            Tuple of (firmware_paths, model_paths), both keyed by series name.
        """
        # Tolerate an explicit None for the mapping.
        individual_paths = config.get('individual_paths') or {}
        firmware_paths: Dict[str, Dict[str, str]] = {}
        model_paths: Dict[str, str] = {}

        for series in enabled_series:
            series_name = self._normalize_series_name(series)
            if series_name not in individual_paths:
                continue
            # An entry that is present but empty/None is treated as "nothing
            # configured" and triggers the warnings below.
            series_config = individual_paths[series_name] or {}

            # Firmware paths
            scpu_path = series_config.get('scpu', '')
            ncpu_path = series_config.get('ncpu', '')
            if scpu_path and ncpu_path and os.path.exists(scpu_path) and os.path.exists(ncpu_path):
                firmware_paths[series_name] = {
                    'scpu': scpu_path,
                    'ncpu': ncpu_path
                }
            else:
                print(f"Warning: Invalid firmware paths for {series_name}")

            # Model path
            model_path = series_config.get('model', '')
            if model_path and os.path.exists(model_path):
                model_paths[series_name] = model_path
            else:
                print(f"Warning: Invalid model path for {series_name}")

        return firmware_paths, model_paths

    def validate_multi_series_config(self, multi_series_config: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Validate multi-series configuration

        Args:
            multi_series_config: Configuration to validate

        Returns:
            Tuple of (is_valid, list_of_issues)
        """
        issues: List[str] = []

        # Check enabled series
        enabled_series = multi_series_config.get('enabled_series') or []
        if not enabled_series:
            issues.append("No series enabled")

        # Check configuration mode
        config_mode = multi_series_config.get('config_mode', 'folder')
        if config_mode not in ('folder', 'individual'):
            issues.append("Invalid configuration mode")

        # Validate paths
        firmware_paths, model_paths = self._extract_paths(multi_series_config)
        if not firmware_paths:
            issues.append("No valid firmware paths found")
        if not model_paths:
            issues.append("No valid model paths found")

        # Check if all enabled series have both firmware and models
        for series in enabled_series:
            series_name = self._normalize_series_name(series)
            if series_name not in firmware_paths:
                issues.append(f"Missing firmware for {series_name}")
            if series_name not in model_paths:
                issues.append(f"Missing model for {series_name}")

        # Check port mapping
        if not multi_series_config.get('port_mapping'):
            issues.append("No port mappings configured")

        return not issues, issues

    def generate_config_summary(self, multi_series_config: Dict[str, Any]) -> str:
        """Generate a human-readable summary of the configuration.

        Returns:
            Multi-line string listing the mode, enabled series, resolved
            firmware/model paths and the port mapping.
        """
        enabled_series = multi_series_config.get('enabled_series') or []
        config_mode = multi_series_config.get('config_mode', 'folder')
        port_mapping = multi_series_config.get('port_mapping') or {}

        summary = ["Multi-Series Configuration Summary", "=" * 40, ""]
        summary.append(f"Configuration Mode: {config_mode}")
        # str() so numeric series identifiers do not break the join.
        summary.append(f"Enabled Series: {', '.join(str(s) for s in enabled_series)}")
        summary.append(f"Port Mappings: {len(port_mapping)}")
        summary.append("")

        # Firmware and model paths
        firmware_paths, model_paths = self._extract_paths(multi_series_config)

        summary.append("Firmware Configuration:")
        for series, fw_config in firmware_paths.items():
            summary.append(f" {series}:")
            summary.append(f" SCPU: {fw_config.get('scpu', 'Not configured')}")
            summary.append(f" NCPU: {fw_config.get('ncpu', 'Not configured')}")
        summary.append("")

        summary.append("Model Configuration:")
        for series, model_path in model_paths.items():
            model_name = os.path.basename(model_path) if model_path else "Not configured"
            summary.append(f" {series}: {model_name}")
        summary.append("")

        # Port mapping
        summary.append("Port Mapping:")
        if port_mapping:
            for port_id, series in port_mapping.items():
                summary.append(f" Port {port_id}: {series}")
        else:
            summary.append(" No port mappings configured")

        return "\n".join(summary)

    def get_performance_estimate(self, multi_series_config: Dict[str, Any]) -> Dict[str, Any]:
        """Get estimated performance for the multi-series configuration.

        Counts devices per series from ``port_mapping``, sums the GOPS of the
        known series and derives a rough FPS figure from it.

        Returns:
            Dict with ``total_gops``, ``estimated_fps``, ``series_counts``,
            ``total_devices`` and ``load_balancing``.
        """
        port_mapping = multi_series_config.get('port_mapping') or {}

        # Count devices per series
        series_counts: Dict[str, int] = {}
        for series in port_mapping.values():
            series_name = self._normalize_series_name(series)
            series_counts[series_name] = series_counts.get(series_name, 0) + 1

        # Calculate total GOPS; series missing from the spec table add nothing.
        total_gops = 0
        for series_name, count in series_counts.items():
            spec = next(
                (s for s in self.series_specs.values() if s["name"] == series_name),
                None,
            )
            if spec is not None:
                total_gops += spec["gops"] * count

        # Estimate FPS improvement
        base_fps = 10  # Baseline single dongle FPS
        estimated_fps = min(base_fps * (total_gops / 10), base_fps * 5)  # Cap at 5x improvement

        return {
            'total_gops': total_gops,
            'estimated_fps': estimated_fps,
            'series_counts': series_counts,
            'total_devices': len(port_mapping),
            'load_balancing': 'automatic_by_gops'
        }
# Convenience function for easy usage
def create_multi_series_manager_from_ui(pipeline_data: Dict[str, Any],
                                        multi_series_config: Dict[str, Any]) -> Optional["MultiSeriesDongleManager"]:
    """
    Convenience function to create MultiSeriesDongleManager from UI data

    Builds a throwaway MultiSeriesConverter and delegates to its
    create_multi_series_manager method.

    Args:
        pipeline_data: Pipeline data from UI (.mflow format)
        multi_series_config: Configuration from MultiSeriesConfigDialog

    Returns:
        Configured MultiSeriesDongleManager or None if creation fails
    """
    # The return annotation is a string forward reference on purpose: the
    # manager class is unbound when its import failed, and an eagerly
    # evaluated annotation would raise NameError at definition time.
    converter = MultiSeriesConverter()
    return converter.create_multi_series_manager(pipeline_data, multi_series_config)
# Example usage and testing
if __name__ == "__main__":
    # Sample folder-mode configuration, shaped like the UI dialog output
    demo_config = {
        'language': 'en',
        'enabled_series': ['KL520', 'KL720'],
        'config_mode': 'folder',
        'assets_folder': r'C:\MyProject\Assets',
        'port_mapping': {28: 'KL520', 32: 'KL720'},
        'max_queue_size': 100,
        'result_buffer_size': 1000,
    }

    # Minimal three-node pipeline: input -> model -> output
    demo_pipeline = {
        'project_name': 'Test Multi-Series Pipeline',
        'description': 'Testing multi-series configuration',
        'nodes': [
            {'id': '1', 'type': 'input', 'name': 'Camera Input'},
            {
                'id': '2',
                'type': 'model',
                'name': 'Detection Model',
                'custom_properties': {'multi_series_mode': True},
            },
            {'id': '3', 'type': 'output', 'name': 'Display Output'},
        ],
    }

    try:
        demo_converter = MultiSeriesConverter()

        # Step 1: validate the configuration and report any problems
        config_ok, problems = demo_converter.validate_multi_series_config(demo_config)
        print("Multi-Series Converter Test")
        print("=" * 30)
        print(f"Configuration valid: {config_ok}")
        if problems:
            print("Issues found:")
            for problem in problems:
                print(f" - {problem}")

        # Step 2: print the human-readable summary
        print("\nConfiguration Summary:")
        print(demo_converter.generate_config_summary(demo_config))

        # Step 3: print the rough performance estimate
        estimate = demo_converter.get_performance_estimate(demo_config)
        print("\nPerformance Estimate:")
        print(f" Total GOPS: {estimate['total_gops']}")
        print(f" Estimated FPS: {estimate['estimated_fps']:.1f}")
        print(f" Total devices: {estimate['total_devices']}")

        # Step 4: try to create the manager (will fail without hardware)
        if not MULTI_SERIES_AVAILABLE:
            print("\n⚠ MultiSeriesDongleManager not available")
        else:
            demo_manager = demo_converter.create_multi_series_manager(demo_pipeline, demo_config)
            if demo_manager:
                print("\n✓ MultiSeriesDongleManager created successfully")
                demo_manager.stop()  # Clean shutdown
            else:
                print("\n✗ Failed to create MultiSeriesDongleManager (expected without hardware)")
    except Exception as e:
        print(f"Error testing multi-series converter: {e}")
        import traceback
        traceback.print_exc()