#!/usr/bin/env python3
"""
UI Configuration and Integration Settings
=========================================

This module provides configuration settings and helper functions for
integrating the UI application with cluster4npu tools.
"""

import json
import os
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, Iterable, List, Optional

# Dongle count assumed for a stage that does not specify (or mis-specifies) one.
_DEFAULT_STAGE_DONGLES = 2

# Simplified per-dongle throughput (frames per second) by dongle series.
_FPS_PER_DONGLE = {
    'KL520': 30,
    'KL720': 45,
    'KL1080': 60,
}
_FALLBACK_FPS_PER_DONGLE = 30

# Simplified power model: every dongle is assumed to draw this many watts.
_WATTS_PER_DONGLE = 5


def _default_fw_paths() -> Dict[str, str]:
    """Return a fresh copy of the default firmware path mapping."""
    return {
        "scpu": "fw_scpu.bin",
        "ncpu": "fw_ncpu.bin",
    }


def _dataclass_from_dict(cls, raw: Any):
    """
    Build dataclass ``cls`` from a decoded JSON object.

    Keys that are not fields of ``cls`` are ignored (with a warning) so that
    a config file written by a different version does not invalidate every
    other setting.

    Args:
        cls: Dataclass type to instantiate
        raw: Decoded JSON value for the section

    Returns:
        A new ``cls`` instance

    Raises:
        TypeError: If ``raw`` is not a dict
    """
    if not isinstance(raw, dict):
        raise TypeError(
            f"expected a JSON object for {cls.__name__}, got {type(raw).__name__}"
        )

    known = {f.name for f in fields(cls)}
    ignored = sorted(str(key) for key in raw if key not in known)
    if ignored:
        print(f"Warning: Ignoring unknown {cls.__name__} keys: {', '.join(ignored)}")

    return cls(**{key: value for key, value in raw.items() if key in known})


@dataclass
class UISettings:
    """UI application settings"""

    theme: str = "harmonious_dark"
    auto_save_interval: int = 300  # seconds
    max_recent_files: int = 10
    default_dongle_count: int = 16
    default_fw_paths: Dict[str, str] = field(default_factory=_default_fw_paths)

    def __post_init__(self):
        # ``None`` is still accepted (older callers and config files pass it),
        # and a partial mapping is completed with defaults so that later
        # lookups of 'scpu' / 'ncpu' cannot raise KeyError.
        defaults = _default_fw_paths()
        if self.default_fw_paths is None:
            self.default_fw_paths = defaults
        else:
            self.default_fw_paths = {**defaults, **self.default_fw_paths}


@dataclass
class ClusterConfig:
    """Cluster hardware configuration"""

    available_dongles: int = 16
    dongle_series: str = "KL520"
    port_range_start: int = 28
    port_range_end: int = 60
    power_limit_watts: int = 200
    cooling_type: str = "standard"


class UIIntegration:
    """Integration layer between UI and cluster4npu tools"""

    def __init__(self, config_path: Optional[str] = None):
        """
        Create the integration layer and load any saved configuration.

        Args:
            config_path: Path of the JSON config file; defaults to
                ``~/.cluster4npu_ui_config.json``
        """
        self.config_path = config_path or os.path.expanduser("~/.cluster4npu_ui_config.json")
        self.ui_settings = UISettings()
        self.cluster_config = ClusterConfig()
        self.load_config()

    def load_config(self):
        """
        Load configuration from file.

        Loading is best-effort: a missing file is silently ignored, and any
        other problem prints a warning and leaves the affected settings at
        their current values. The two sections are loaded independently, so
        a malformed ``cluster_config`` does not discard ``ui_settings`` (or
        vice versa).
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return  # Nothing saved yet; keep defaults.
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Warning: Could not load UI config: {e}")
            return

        if not isinstance(data, dict):
            print("Warning: Could not load UI config: top-level JSON value is not an object")
            return

        # Attribute names on ``self`` match the section keys in the file.
        for section, cls in (('ui_settings', UISettings), ('cluster_config', ClusterConfig)):
            if section not in data:
                continue
            try:
                setattr(self, section, _dataclass_from_dict(cls, data[section]))
            except (TypeError, ValueError) as e:
                print(f"Warning: Could not load UI config: {e}")

    def save_config(self):
        """
        Save configuration to file.

        Saving is best-effort: failures print a warning instead of raising.
        """
        try:
            # Serialize before opening the file so a serialization error
            # cannot leave a truncated config behind.
            payload = json.dumps(
                {
                    'ui_settings': asdict(self.ui_settings),
                    'cluster_config': asdict(self.cluster_config),
                },
                indent=2,
            )
            with open(self.config_path, 'w', encoding='utf-8') as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: Could not save UI config: {e}")

    def get_available_ports(self) -> List[int]:
        """
        Get list of available USB ports.

        Returns:
            The even port numbers within the configured port range
            (inclusive), in ascending order
        """
        start = self.cluster_config.port_range_start
        # Dongles use even port numbers only; round an odd start up to even
        # so that stepping by 2 never yields odd ports.
        start += start % 2
        return list(range(start, self.cluster_config.port_range_end + 1, 2))

    @staticmethod
    def _requested_dongles(stage: Dict[str, Any]) -> int:
        """Return the stage's requested dongle count as an int of at least 1."""
        raw = stage.get('dongles', _DEFAULT_STAGE_DONGLES)
        try:
            count = int(raw)
        except (TypeError, ValueError, OverflowError):
            print(f"Warning: Invalid dongle count {raw!r}; using {_DEFAULT_STAGE_DONGLES}")
            count = _DEFAULT_STAGE_DONGLES
        return max(1, count)

    @staticmethod
    def _parse_port_ids(port_ids: Any) -> Any:
        """Parse a comma-separated port string into ints; pass other values through."""
        if isinstance(port_ids, str):
            return [int(p.strip()) for p in port_ids.split(',') if p.strip()]
        return port_ids

    def validate_stage_config(
        self,
        stage_config: Dict[str, Any],
        reserved_ports: Optional[Iterable[int]] = None,
    ) -> Dict[str, Any]:
        """
        Validate and normalize a stage configuration from UI

        Args:
            stage_config: Raw stage configuration from UI
            reserved_ports: Ports that must not be handed out when
                ``port_ids`` is ``'auto'`` (e.g. ports already used by
                other stages). Defaults to none reserved.

        Returns:
            Validated and normalized configuration with the keys ``name``,
            ``dongles``, ``port_ids`` and ``model_path``
        """
        # Ensure required fields; the dongle count is clamped to
        # [1, available_dongles].
        normalized = {
            'name': stage_config.get('name', 'Unnamed Stage'),
            'dongles': max(
                1,
                min(self._requested_dongles(stage_config), self.cluster_config.available_dongles),
            ),
            'port_ids': stage_config.get('port_ids', 'auto'),
            'model_path': stage_config.get('model_path', ''),
        }

        # Auto-assign ports if needed
        if isinstance(normalized['port_ids'], str) and normalized['port_ids'] == 'auto':
            taken = set(reserved_ports) if reserved_ports is not None else set()
            free_ports = [port for port in self.get_available_ports() if port not in taken]
            dongles_needed = normalized['dongles']
            assigned = free_ports[:dongles_needed]
            if len(assigned) < dongles_needed:
                print(
                    f"Warning: Only {len(assigned)} of {dongles_needed} ports available "
                    f"for stage: {normalized['name']}"
                )
            normalized['port_ids'] = ','.join(map(str, assigned))

        # Validate model path (warning only; the file may be deployed later)
        if normalized['model_path'] and not os.path.exists(normalized['model_path']):
            print(f"Warning: Model file not found: {normalized['model_path']}")

        return normalized

    def convert_ui_to_inference_config(self, ui_stages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Convert UI stage configurations to InferencePipeline StageConfig format

        Ports requested explicitly by any stage are reserved up front, and
        each ``'auto'`` stage then receives ports not used by any other
        stage, so two stages never end up sharing a dongle port through
        auto-assignment.

        Args:
            ui_stages: List of stage configurations from UI

        Returns:
            List of configurations ready for InferencePipeline

        Raises:
            ValueError: If a ``port_ids`` string contains a non-integer entry
        """
        # Pass 1: reserve every explicitly requested port, including those of
        # stages that appear after an 'auto' stage.
        used_ports = set()
        for stage in ui_stages:
            requested = stage.get('port_ids', 'auto')
            if isinstance(requested, str) and requested == 'auto':
                continue
            parsed = self._parse_port_ids(requested)
            if isinstance(parsed, (list, tuple)):
                used_ports.update(parsed)

        # Pass 2: validate each stage and build its pipeline configuration.
        inference_configs = []
        for stage in ui_stages:
            validated = self.validate_stage_config(stage, reserved_ports=used_ports)

            port_ids = self._parse_port_ids(validated['port_ids'])
            if isinstance(port_ids, (list, tuple)):
                used_ports.update(port_ids)

            config = {
                'stage_id': str(validated['name']).lower().replace(' ', '_').replace('-', '_'),
                'port_ids': port_ids,
                'scpu_fw_path': self.ui_settings.default_fw_paths['scpu'],
                'ncpu_fw_path': self.ui_settings.default_fw_paths['ncpu'],
                # Stages without a model get a positional placeholder name.
                'model_path': validated['model_path'] or f"default_{len(inference_configs)}.nef",
                'upload_fw': True,
                'max_queue_size': 50,
            }
            inference_configs.append(config)

        return inference_configs

    def estimate_performance(self, ui_stages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Estimate performance metrics for given stage configurations

        Each stage counts at least one dongle (matching the lower bound that
        ``validate_stage_config`` applies). Counts are not capped at
        ``available_dongles`` here, so over-subscription shows up as a
        utilization above 100%.

        Args:
            ui_stages: List of stage configurations from UI

        Returns:
            Performance metrics dictionary
        """
        dongle_counts = [self._requested_dongles(stage) for stage in ui_stages]
        total_dongles = sum(dongle_counts)

        # Performance estimation based on dongle series
        fps_per_dongle = _FPS_PER_DONGLE.get(
            self.cluster_config.dongle_series, _FALLBACK_FPS_PER_DONGLE
        )

        stage_fps = [dongles * fps_per_dongle for dongles in dongle_counts]
        stage_latencies = [1000 / fps for fps in stage_fps]  # ms

        # Pipeline metrics: throughput is bounded by the slowest stage,
        # latency accumulates across stages.
        pipeline_fps = min(stage_fps) if stage_fps else 0
        total_latency = sum(stage_latencies)

        # Resource utilization (guard against a cluster configured with 0 dongles)
        available = self.cluster_config.available_dongles
        utilization = (total_dongles / available) * 100 if available else 0.0

        # Power estimation (simplified)
        estimated_power = total_dongles * _WATTS_PER_DONGLE

        return {
            'total_dongles': total_dongles,
            'available_dongles': available,
            'utilization_percent': utilization,
            'pipeline_fps': pipeline_fps,
            'total_latency': total_latency,
            'stage_fps': stage_fps,
            'stage_latencies': stage_latencies,
            'estimated_power_watts': estimated_power,
            'power_limit_watts': self.cluster_config.power_limit_watts,
            'within_power_budget': estimated_power <= self.cluster_config.power_limit_watts,
        }

    def generate_deployment_script(self, ui_stages: List[Dict[str, Any]],
                                   script_format: str = "python") -> str:
        """
        Generate deployment script from UI configurations

        Args:
            ui_stages: List of stage configurations from UI
            script_format: Format for the script ("python", "json", "yaml")

        Returns:
            Generated script content

        Raises:
            ValueError: If ``script_format`` is not one of the supported formats
        """
        inference_configs = self.convert_ui_to_inference_config(ui_stages)

        if script_format == "python":
            return self._generate_python_script(inference_configs)
        elif script_format == "json":
            return json.dumps({
                "pipeline_name": "UI_Generated_Pipeline",
                "stages": inference_configs,
                "ui_settings": asdict(self.ui_settings),
                "cluster_config": asdict(self.cluster_config),
            }, indent=2)
        elif script_format == "yaml":
            return self._generate_yaml_script(inference_configs)
        else:
            raise ValueError(f"Unsupported script format: {script_format}")

    def _generate_python_script(self, inference_configs: List[Dict[str, Any]]) -> str:
        """
        Generate Python deployment script

        Every configuration value is emitted with ``repr`` so that paths or
        stage names containing quotes or backslashes still produce a valid
        Python literal instead of broken (or injected) source code.

        Args:
            inference_configs: Output of ``convert_ui_to_inference_config``

        Returns:
            Source code of the deployment script
        """
        header = '''#!/usr/bin/env python3
"""
Generated Deployment Script
Created by cluster4npu UI
"""

import sys
import os
import time
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

from src.cluster4npu.InferencePipeline import InferencePipeline, StageConfig

def create_pipeline():
    """Create and configure the inference pipeline"""
    stage_configs = [
'''

        stage_entries = [
            f'''        StageConfig(
            stage_id={config['stage_id']!r},
            port_ids={config['port_ids']!r},
            scpu_fw_path={config['scpu_fw_path']!r},
            ncpu_fw_path={config['ncpu_fw_path']!r},
            model_path={config['model_path']!r},
            upload_fw={config['upload_fw']!r},
            max_queue_size={config['max_queue_size']!r}
        ),
'''
            for config in inference_configs
        ]

        # Plain (non-f) string: the braces below belong to the generated code.
        footer = '''    ]

    return InferencePipeline(stage_configs, pipeline_name="UI_Generated_Pipeline")

def main():
    """Main execution function"""
    print("🚀 Starting UI-generated pipeline...")

    pipeline = create_pipeline()

    try:
        print("⚡ Initializing pipeline...")
        pipeline.initialize()

        print("▶️ Starting pipeline...")
        pipeline.start()

        # Set up callbacks
        def handle_results(pipeline_data):
            print(f"📊 Results: {pipeline_data.stage_results}")

        def handle_errors(pipeline_data):
            print(f"❌ Error: {pipeline_data.stage_results}")

        pipeline.set_result_callback(handle_results)
        pipeline.set_error_callback(handle_errors)

        print("✅ Pipeline running. Press Ctrl+C to stop.")

        # Run until interrupted
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        print("\\n🛑 Stopping pipeline...")
    except Exception as e:
        print(f"❌ Pipeline error: {e}")
    finally:
        pipeline.stop()
        print("✅ Pipeline stopped.")

if __name__ == "__main__":
    main()
'''

        return header + ''.join(stage_entries) + footer

    def _generate_yaml_script(self, inference_configs: List[Dict[str, Any]]) -> str:
        """
        Generate YAML configuration

        String values are written with ``json.dumps``: a JSON string is a
        valid YAML double-quoted scalar, so embedded quotes and backslashes
        are escaped while ordinary values render exactly as ``"value"``.

        Args:
            inference_configs: Output of ``convert_ui_to_inference_config``

        Returns:
            YAML document text
        """
        parts = ['''# cluster4npu Pipeline Configuration
# Generated by UI Application

pipeline:
  name: "UI_Generated_Pipeline"

  stages:
''']

        for config in inference_configs:
            parts.append(f'''    - stage_id: {json.dumps(config['stage_id'])}
      port_ids: {config['port_ids']}
      scpu_fw_path: {json.dumps(config['scpu_fw_path'])}
      ncpu_fw_path: {json.dumps(config['ncpu_fw_path'])}
      model_path: {json.dumps(config['model_path'])}
      upload_fw: {str(config['upload_fw']).lower()}
      max_queue_size: {config['max_queue_size']}

''')

        parts.append(f'''# Cluster Configuration
cluster:
  available_dongles: {self.cluster_config.available_dongles}
  dongle_series: {json.dumps(self.cluster_config.dongle_series)}
  power_limit_watts: {self.cluster_config.power_limit_watts}

# UI Settings
ui:
  theme: {json.dumps(self.ui_settings.theme)}
  auto_save_interval: {self.ui_settings.auto_save_interval}
''')

        return ''.join(parts)


# Global integration instance (loads the user's saved config on import)
ui_integration = UIIntegration()


def get_integration() -> UIIntegration:
    """Get the global UI integration instance"""
    return ui_integration


# Convenience functions for UI components
def validate_stage_configs(ui_stages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Validate UI stage configurations

    Each stage is validated on its own, so ``'auto'`` ports are assigned per
    stage without regard to the other stages in the list.
    """
    return [ui_integration.validate_stage_config(stage) for stage in ui_stages]


def estimate_pipeline_performance(ui_stages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Estimate performance for UI stage configurations"""
    return ui_integration.estimate_performance(ui_stages)


def export_pipeline_config(ui_stages: List[Dict[str, Any]], format_type: str = "python") -> str:
    """Export UI configurations to deployment scripts"""
    return ui_integration.generate_deployment_script(ui_stages, format_type)


def get_available_ports() -> List[int]:
    """Get list of available dongle ports"""
    return ui_integration.get_available_ports()


def save_ui_settings():
    """Save current UI settings"""
    ui_integration.save_config()


def _run_self_test():
    """Exercise validation, estimation and export with sample stages."""
    print("🧪 Testing UI Integration...")

    # Sample UI stage configurations
    test_stages = [
        {'name': 'Input Stage', 'dongles': 2, 'port_ids': 'auto', 'model_path': 'input.nef'},
        {'name': 'Processing Stage', 'dongles': 4, 'port_ids': '32,34,36,38', 'model_path': 'process.nef'},
        {'name': 'Output Stage', 'dongles': 2, 'port_ids': 'auto', 'model_path': 'output.nef'},
    ]

    # Test validation
    validated = validate_stage_configs(test_stages)
    print(f"✅ Validated {len(validated)} stages")

    # Test performance estimation
    performance = estimate_pipeline_performance(test_stages)
    print(f"📊 Pipeline FPS: {performance['pipeline_fps']:.1f}")
    print(f"📊 Total Latency: {performance['total_latency']:.1f} ms")
    print(f"📊 Power Usage: {performance['estimated_power_watts']} W")

    # Test script generation
    python_script = export_pipeline_config(test_stages, "python")
    print(f"🐍 Generated Python script ({len(python_script)} chars)")

    json_config = export_pipeline_config(test_stages, "json")
    print(f"📄 Generated JSON config ({len(json_config)} chars)")

    print("✅ Integration test completed!")


if __name__ == "__main__":
    # Test the integration
    _run_self_test()