"""
Multi-Series UI Bridge Converter

This module provides a simplified bridge between the UI pipeline data and the
MultiSeriesDongleManager system, making it easy to convert UI configurations
to working multi-series inference pipelines.

Key Features:
- Direct conversion from UI pipeline data to MultiSeriesDongleManager config
- Simplified interface for deployment system
- Automatic validation and configuration generation
- Support for both folder-based and individual file configurations

Usage:
    from multi_series_converter import MultiSeriesConverter

    converter = MultiSeriesConverter()
    manager = converter.create_multi_series_manager(pipeline_data, ui_config)
    manager.start()

    sequence_id = manager.put_input(image, 'BGR565')
    result = manager.get_result()
"""

import os
import sys
from typing import Any, Dict, List, Optional, Tuple

# Make the directory two levels above this file's folder importable.
# abspath() guards against a relative __file__, whose dirname would be ''.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
sys.path.insert(0, parent_dir)

try:
    from multi_series_dongle_manager import MultiSeriesDongleManager, DongleSeriesSpec
    MULTI_SERIES_AVAILABLE = True
except ImportError as e:
    print(f"MultiSeriesDongleManager not available: {e}")
    MULTI_SERIES_AVAILABLE = False
    # Placeholders so the names always exist at module level; all uses below
    # are guarded by MULTI_SERIES_AVAILABLE.
    MultiSeriesDongleManager = None
    DongleSeriesSpec = None

# Series table used when DongleSeriesSpec cannot be imported.
# Keys are product ids; values carry the series name and its GOPS rating.
_FALLBACK_SERIES_SPECS = {
    0x100: {"name": "KL520", "gops": 3},
    0x720: {"name": "KL720", "gops": 28},
    0x630: {"name": "KL630", "gops": 400},
    0x730: {"name": "KL730", "gops": 1600},
    0x540: {"name": "KL540", "gops": 800},
}


class MultiSeriesConverter:
    """Simplified converter for UI to MultiSeriesDongleManager bridge"""

    def __init__(self):
        if MULTI_SERIES_AVAILABLE:
            self.series_specs = DongleSeriesSpec.SERIES_SPECS
        else:
            # Fresh copy per instance so callers cannot mutate the shared table.
            self.series_specs = {
                product_id: dict(spec)
                for product_id, spec in _FALLBACK_SERIES_SPECS.items()
            }

    @staticmethod
    def _normalize_series_name(series: Any) -> str:
        """Return the canonical series name for a UI series identifier.

        Purely numeric identifiers (``'520'`` or the int ``520``) get a ``KL``
        prefix; anything else is returned as its string form unchanged.
        """
        text = str(series)
        return f'KL{text}' if text.isdigit() else text

    def create_multi_series_manager(
        self,
        pipeline_data: Dict[str, Any],
        multi_series_config: Dict[str, Any],
    ) -> Optional["MultiSeriesDongleManager"]:
        """
        Create and configure MultiSeriesDongleManager from UI data

        Args:
            pipeline_data: Pipeline data from UI (.mflow format). Currently
                not read by this method; kept for interface compatibility.
            multi_series_config: Configuration from MultiSeriesConfigDialog

        Returns:
            Configured MultiSeriesDongleManager or None if creation fails
            (manager module unavailable, no usable firmware/model paths,
            device initialization failure, or any exception).
        """
        if not MULTI_SERIES_AVAILABLE:
            print("MultiSeriesDongleManager not available")
            return None

        try:
            # Extract firmware and model paths
            firmware_paths, model_paths = self._extract_paths(multi_series_config)

            if not firmware_paths or not model_paths:
                print("Insufficient firmware or model paths")
                return None

            # Create and initialize manager
            manager = MultiSeriesDongleManager(
                max_queue_size=multi_series_config.get('max_queue_size', 100),
                result_buffer_size=multi_series_config.get('result_buffer_size', 1000)
            )

            # Initialize devices
            success = manager.scan_and_initialize_devices(firmware_paths, model_paths)
            if not success:
                print("Failed to initialize multi-series devices")
                return None

            print("Multi-series manager created and initialized successfully")
            return manager

        except Exception as e:
            # Top-level boundary for the UI: report and signal failure via None.
            print(f"Error creating multi-series manager: {e}")
            return None

    def _extract_paths(
        self, multi_series_config: Dict[str, Any]
    ) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str]]:
        """Extract firmware and model paths from multi-series config

        Dispatches on ``config_mode``: ``'folder'`` (the default) uses the
        assets-folder layout; every other value uses the per-series
        ``individual_paths`` mapping.

        Returns:
            Tuple of (firmware_paths, model_paths) keyed by series name.
        """
        config_mode = multi_series_config.get('config_mode', 'folder')
        enabled_series = multi_series_config.get('enabled_series') or []

        if config_mode == 'folder':
            return self._extract_folder_paths(multi_series_config, enabled_series)
        return self._extract_individual_paths(multi_series_config, enabled_series)

    def _extract_folder_paths(
        self, config: Dict[str, Any], enabled_series: List[str]
    ) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str]]:
        """Extract paths from folder-based configuration

        Looks for ``<assets>/Firmware/<series>/fw_scpu.bin`` and
        ``fw_ncpu.bin``, and for ``.nef`` files in ``<assets>/Models/<series>``.
        A series missing either artifact is omitted from the corresponding
        mapping and a warning is printed.
        """
        assets_folder = config.get('assets_folder', '')
        if not assets_folder or not os.path.isdir(assets_folder):
            print(f"Assets folder not found: {assets_folder}")
            return {}, {}

        firmware_base = os.path.join(assets_folder, 'Firmware')
        models_base = os.path.join(assets_folder, 'Models')

        firmware_paths: Dict[str, Dict[str, str]] = {}
        model_paths: Dict[str, str] = {}

        for series in enabled_series:
            series_name = self._normalize_series_name(series)

            # Firmware paths: both binaries are required.
            series_fw_dir = os.path.join(firmware_base, series_name)
            scpu_path = os.path.join(series_fw_dir, 'fw_scpu.bin')
            ncpu_path = os.path.join(series_fw_dir, 'fw_ncpu.bin')
            if os.path.isfile(scpu_path) and os.path.isfile(ncpu_path):
                firmware_paths[series_name] = {
                    'scpu': scpu_path,
                    'ncpu': ncpu_path
                }
            else:
                # Also covers a missing series folder, which used to be silent.
                print(f"Warning: Missing firmware files for {series_name}")

            # Model paths - pick the alphabetically first .nef file.
            # os.listdir() order is arbitrary, so sort for a stable choice.
            series_model_dir = os.path.join(models_base, series_name)
            model_files: List[str] = []
            if os.path.isdir(series_model_dir):
                model_files = sorted(
                    f for f in os.listdir(series_model_dir) if f.endswith('.nef')
                )
            if model_files:
                model_paths[series_name] = os.path.join(series_model_dir, model_files[0])
            else:
                print(f"Warning: No .nef model files found for {series_name}")

        return firmware_paths, model_paths

    def _extract_individual_paths(
        self, config: Dict[str, Any], enabled_series: List[str]
    ) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str]]:
        """Extract paths from individual file configuration

        Reads ``config['individual_paths'][<series>]`` entries with the keys
        ``'scpu'``, ``'ncpu'`` and ``'model'``. Only paths that exist as files
        are accepted. Enabled series without an entry are skipped silently.
        """
        individual_paths = config.get('individual_paths') or {}
        firmware_paths: Dict[str, Dict[str, str]] = {}
        model_paths: Dict[str, str] = {}

        for series in enabled_series:
            series_name = self._normalize_series_name(series)
            if series_name not in individual_paths:
                continue

            series_config = individual_paths[series_name]

            # Firmware paths
            scpu_path = series_config.get('scpu', '')
            ncpu_path = series_config.get('ncpu', '')
            if (scpu_path and ncpu_path
                    and os.path.isfile(scpu_path) and os.path.isfile(ncpu_path)):
                firmware_paths[series_name] = {
                    'scpu': scpu_path,
                    'ncpu': ncpu_path
                }
            else:
                print(f"Warning: Invalid firmware paths for {series_name}")

            # Model path
            model_path = series_config.get('model', '')
            if model_path and os.path.isfile(model_path):
                model_paths[series_name] = model_path
            else:
                print(f"Warning: Invalid model path for {series_name}")

        return firmware_paths, model_paths

    def validate_multi_series_config(
        self, multi_series_config: Dict[str, Any]
    ) -> Tuple[bool, List[str]]:
        """
        Validate multi-series configuration

        Path extraction runs as part of validation, so its warnings are
        printed as a side effect.

        Args:
            multi_series_config: Configuration to validate

        Returns:
            Tuple of (is_valid, list_of_issues)
        """
        issues: List[str] = []

        # Check enabled series
        enabled_series = multi_series_config.get('enabled_series') or []
        if not enabled_series:
            issues.append("No series enabled")

        # Check configuration mode
        config_mode = multi_series_config.get('config_mode', 'folder')
        if config_mode not in ('folder', 'individual'):
            issues.append("Invalid configuration mode")

        # Validate paths
        firmware_paths, model_paths = self._extract_paths(multi_series_config)

        if not firmware_paths:
            issues.append("No valid firmware paths found")

        if not model_paths:
            issues.append("No valid model paths found")

        # Check if all enabled series have both firmware and models
        for series in enabled_series:
            series_name = self._normalize_series_name(series)
            if series_name not in firmware_paths:
                issues.append(f"Missing firmware for {series_name}")
            if series_name not in model_paths:
                issues.append(f"Missing model for {series_name}")

        # Check port mapping
        port_mapping = multi_series_config.get('port_mapping') or {}
        if not port_mapping:
            issues.append("No port mappings configured")

        return not issues, issues

    def generate_config_summary(self, multi_series_config: Dict[str, Any]) -> str:
        """Generate a human-readable summary of the configuration

        Returns:
            Multi-line string covering the mode, enabled series, resolved
            firmware/model paths and the port mapping.
        """
        enabled_series = multi_series_config.get('enabled_series') or []
        config_mode = multi_series_config.get('config_mode', 'folder')
        port_mapping = multi_series_config.get('port_mapping') or {}

        summary = ["Multi-Series Configuration Summary", "=" * 40, ""]

        summary.append(f"Configuration Mode: {config_mode}")
        # str() keeps join() from failing on non-string identifiers.
        summary.append(f"Enabled Series: {', '.join(str(s) for s in enabled_series)}")
        summary.append(f"Port Mappings: {len(port_mapping)}")
        summary.append("")

        # Firmware and model paths
        firmware_paths, model_paths = self._extract_paths(multi_series_config)

        summary.append("Firmware Configuration:")
        for series, fw_config in firmware_paths.items():
            summary.append(f" {series}:")
            summary.append(f" SCPU: {fw_config.get('scpu', 'Not configured')}")
            summary.append(f" NCPU: {fw_config.get('ncpu', 'Not configured')}")
        summary.append("")

        summary.append("Model Configuration:")
        for series, model_path in model_paths.items():
            model_name = os.path.basename(model_path) if model_path else "Not configured"
            summary.append(f" {series}: {model_name}")
        summary.append("")

        # Port mapping
        summary.append("Port Mapping:")
        if port_mapping:
            for port_id, series in port_mapping.items():
                summary.append(f" Port {port_id}: {series}")
        else:
            summary.append(" No port mappings configured")

        return "\n".join(summary)

    def get_performance_estimate(self, multi_series_config: Dict[str, Any]) -> Dict[str, Any]:
        """Get estimated performance for the multi-series configuration

        Counts one device per ``port_mapping`` entry, sums the GOPS rating of
        every device whose series appears in ``self.series_specs``, and derives
        an FPS figure from a 10 FPS baseline, capped at 5x that baseline.

        Returns:
            Dict with ``total_gops``, ``estimated_fps``, ``series_counts``,
            ``total_devices`` and ``load_balancing``.
        """
        port_mapping = multi_series_config.get('port_mapping') or {}

        # Count devices per series
        series_counts: Dict[str, int] = {}
        for series in port_mapping.values():
            series_name = self._normalize_series_name(series)
            series_counts[series_name] = series_counts.get(series_name, 0) + 1

        # Name -> GOPS lookup; setdefault keeps the first spec for a name,
        # matching a first-match search over series_specs.
        gops_by_name: Dict[str, Any] = {}
        for spec in self.series_specs.values():
            gops_by_name.setdefault(spec["name"], spec["gops"])

        # Calculate total GOPS; series unknown to series_specs contribute 0.
        total_gops = sum(
            gops_by_name[name] * count
            for name, count in series_counts.items()
            if name in gops_by_name
        )

        # Estimate FPS improvement
        base_fps = 10  # Baseline single dongle FPS
        estimated_fps = min(base_fps * (total_gops / 10), base_fps * 5)  # Cap at 5x improvement

        return {
            'total_gops': total_gops,
            'estimated_fps': estimated_fps,
            'series_counts': series_counts,
            'total_devices': len(port_mapping),
            'load_balancing': 'automatic_by_gops'
        }


# Convenience function for easy usage
def create_multi_series_manager_from_ui(
    pipeline_data: Dict[str, Any],
    multi_series_config: Dict[str, Any],
) -> Optional["MultiSeriesDongleManager"]:
    """
    Convenience function to create MultiSeriesDongleManager from UI data

    Args:
        pipeline_data: Pipeline data from UI (.mflow format)
        multi_series_config: Configuration from MultiSeriesConfigDialog

    Returns:
        Configured MultiSeriesDongleManager or None if creation fails
    """
    converter = MultiSeriesConverter()
    return converter.create_multi_series_manager(pipeline_data, multi_series_config)


def _run_example() -> None:
    """Run the validation, summary and estimate helpers on a sample config."""
    # Example configuration for testing
    example_multi_series_config = {
        'language': 'en',
        'enabled_series': ['KL520', 'KL720'],
        'config_mode': 'folder',
        'assets_folder': r'C:\MyProject\Assets',
        'port_mapping': {
            28: 'KL520',
            32: 'KL720'
        },
        'max_queue_size': 100,
        'result_buffer_size': 1000
    }

    example_pipeline_data = {
        'project_name': 'Test Multi-Series Pipeline',
        'description': 'Testing multi-series configuration',
        'nodes': [
            {'id': '1', 'type': 'input', 'name': 'Camera Input'},
            {'id': '2', 'type': 'model', 'name': 'Detection Model',
             'custom_properties': {'multi_series_mode': True}},
            {'id': '3', 'type': 'output', 'name': 'Display Output'}
        ]
    }

    try:
        converter = MultiSeriesConverter()

        # Validate configuration
        is_valid, issues = converter.validate_multi_series_config(example_multi_series_config)

        print("Multi-Series Converter Test")
        print("=" * 30)
        print(f"Configuration valid: {is_valid}")
        if issues:
            print("Issues found:")
            for issue in issues:
                print(f" - {issue}")

        # Generate summary
        print("\nConfiguration Summary:")
        print(converter.generate_config_summary(example_multi_series_config))

        # Get performance estimate
        performance = converter.get_performance_estimate(example_multi_series_config)
        print("\nPerformance Estimate:")
        print(f" Total GOPS: {performance['total_gops']}")
        print(f" Estimated FPS: {performance['estimated_fps']:.1f}")
        print(f" Total devices: {performance['total_devices']}")

        # Try to create manager (will fail without hardware)
        if MULTI_SERIES_AVAILABLE:
            manager = converter.create_multi_series_manager(
                example_pipeline_data,
                example_multi_series_config
            )

            if manager:
                print("\n✓ MultiSeriesDongleManager created successfully")
                manager.stop()  # Clean shutdown
            else:
                print("\n✗ Failed to create MultiSeriesDongleManager (expected without hardware)")
        else:
            print("\n⚠ MultiSeriesDongleManager not available")

    except Exception as e:
        print(f"Error testing multi-series converter: {e}")
        import traceback
        traceback.print_exc()


# Example usage and testing
if __name__ == "__main__":
    _run_example()