#!/usr/bin/env python3
"""
UI Integration Example for cluster4npu Tools
============================================

This file demonstrates how to integrate the UI application with the core
cluster4npu tools:
- InferencePipeline
- Multidongle
- StageConfig

Usage:
    python ui_integration_example.py

This example shows how stage configurations from the UI can be converted
to actual InferencePipeline configurations and executed.
"""

import json
import os
import sys
import time

sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

try:
    from src.cluster4npu.InferencePipeline import InferencePipeline, StageConfig
    from src.cluster4npu.Multidongle import PreProcessor, PostProcessor
    CLUSTER4NPU_AVAILABLE = True
except ImportError:
    print("cluster4npu modules not available - running in simulation mode")
    CLUSTER4NPU_AVAILABLE = False

    # Mock classes for demonstration
    class StageConfig:
        """Stand-in for the real StageConfig: stores every keyword as an attribute."""

        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    class InferencePipeline:
        """Stand-in for the real InferencePipeline that only prints what it would do."""

        def __init__(self, stage_configs=None, **kwargs):
            # This file constructs the pipeline with ``stage_configs=...``; the
            # previous mock only accepted ``stages`` and therefore raised
            # TypeError in simulation mode. ``stages=`` is still honored.
            if stage_configs is None:
                stage_configs = kwargs.pop('stages', [])
            self.stages = stage_configs

        def initialize(self):
            print("Mock: Initializing pipeline...")

        def start(self):
            print("Mock: Starting pipeline...")

        def stop(self):
            print("Mock: Stopping pipeline...")

        def set_result_callback(self, callback):
            print("Mock: Registering result callback...")


# Defaults shared by the converter and the script generator so both agree.
_DEFAULT_SCPU_FW_PATH = "fw_scpu.bin"
_DEFAULT_NCPU_FW_PATH = "fw_ncpu.bin"
_DEFAULT_MODEL_PATH = "default_model.nef"
_DEFAULT_MAX_QUEUE_SIZE = 50


def _stage_id_from_name(name):
    """Derive a stage identifier from a UI stage name (lowercase, spaces -> '_')."""
    return name.lower().replace(' ', '_')


def _parse_port_ids(raw_port_ids, stage_index):
    """Turn the UI ``port_ids`` string into a list of integer port IDs.

    Args:
        raw_port_ids: Either ``'auto'`` or a comma-separated list such as
            ``'28,30'``. Empty items are ignored.
        stage_index: Position of the stage in the configuration list; only
            used for ``'auto'``.

    Returns:
        List of ints.

    Raises:
        ValueError: If a non-empty item is not an integer.
    """
    if raw_port_ids.strip().lower() == 'auto':
        # NOTE(review): this formula is kept from the original; consecutive
        # "auto" stages share one port (e.g. [28, 30] and [30, 32]) - confirm
        # whether that overlap is intended.
        return [28 + (stage_index * 2), 30 + (stage_index * 2)]
    return [int(p.strip()) for p in raw_port_ids.split(',') if p.strip()]


def convert_ui_config_to_pipeline(stage_configs):
    """
    Convert UI stage configurations to InferencePipeline configurations

    Args:
        stage_configs: List of stage configurations from UI. Each entry is a
            dict with the keys 'name', 'dongles', 'port_ids' and 'model_path'.

    Returns:
        List of StageConfig objects for InferencePipeline
    """
    pipeline_stages = []

    # enumerate() gives the true position even when two stage dicts compare
    # equal; list.index() would return the first match for both.
    for stage_idx, config in enumerate(stage_configs):
        port_ids = _parse_port_ids(config['port_ids'], stage_idx)
        model_path = config['model_path'] or _DEFAULT_MODEL_PATH

        stage_config = StageConfig(
            stage_id=_stage_id_from_name(config['name']),
            port_ids=port_ids,
            scpu_fw_path=_DEFAULT_SCPU_FW_PATH,  # Default firmware paths
            ncpu_fw_path=_DEFAULT_NCPU_FW_PATH,
            model_path=model_path,
            upload_fw=True,
            max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
        )
        pipeline_stages.append(stage_config)

        print(f"✓ Created stage: {config['name']}")
        print(f"  - Dongles: {config['dongles']}")
        print(f"  - Ports: {port_ids}")
        print(f"  - Model: {model_path}")
        print()

    return pipeline_stages


def create_sample_ui_config():
    """Create a sample UI configuration for testing"""
    return [
        {
            'name': 'Input Processing',
            'dongles': 2,
            'port_ids': '28,30',
            'model_path': 'models/input_processor.nef'
        },
        {
            'name': 'Main Inference',
            'dongles': 4,
            'port_ids': '32,34,36,38',
            'model_path': 'models/main_model.nef'
        },
        {
            'name': 'Post Processing',
            'dongles': 2,
            'port_ids': 'auto',
            'model_path': 'models/post_processor.nef'
        }
    ]


def _run_on_hardware(pipeline):
    """Start the pipeline and block until Ctrl+C, always stopping it afterwards."""
    print("⚡ Starting pipeline (real hardware)...")
    pipeline.initialize()
    pipeline.start()

    try:
        # Set up result callback
        def handle_results(pipeline_data):
            print(f"📊 Pipeline Results: {pipeline_data.stage_results}")

        pipeline.set_result_callback(handle_results)

        print("✅ Pipeline running! Press Ctrl+C to stop...")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n🛑 Stopping pipeline...")
    finally:
        # Runs for Ctrl+C and for unexpected errors alike, so a started
        # pipeline is never left running.
        pipeline.stop()
    print("✅ Pipeline stopped successfully")


def _run_simulation(pipeline, frames, frame_delay):
    """Drive the mock pipeline through a fixed number of pretend frames."""
    print("🎭 Running in simulation mode...")
    pipeline.initialize()
    pipeline.start()

    try:
        # Simulate some processing
        for i in range(frames):
            print(f"⏳ Processing frame {i+1}...")
            time.sleep(frame_delay)
    finally:
        pipeline.stop()
    print("✅ Simulation complete")


def run_pipeline_from_ui_config(stage_configs, simulated_frames=5, frame_delay=1.0):
    """
    Run an InferencePipeline based on UI stage configurations

    Args:
        stage_configs: List of stage configurations from UI
        simulated_frames: Number of frames to pretend to process when the
            cluster4npu modules are not available.
        frame_delay: Seconds to sleep per simulated frame.

    Returns:
        True if the pipeline ran and was stopped without an error, False if
        any exception was raised while creating or running it.
    """
    print("🚀 Converting UI Configuration to Pipeline...")
    print("=" * 50)

    # Convert UI config to pipeline stages
    pipeline_stages = convert_ui_config_to_pipeline(stage_configs)

    print(f"📊 Created {len(pipeline_stages)} pipeline stages")
    print()

    # Create and run pipeline
    try:
        print("🔧 Initializing InferencePipeline...")
        pipeline = InferencePipeline(
            stage_configs=pipeline_stages,
            pipeline_name="UI_Generated_Pipeline",
        )

        if CLUSTER4NPU_AVAILABLE:
            _run_on_hardware(pipeline)
        else:
            _run_simulation(pipeline, simulated_frames, frame_delay)
    except Exception as e:
        # Top-level boundary of the example: report the failure to the user
        # and signal it through the return value.
        print(f"❌ Error running pipeline: {e}")
        return False

    return True


def calculate_performance_metrics(stage_configs, *, base_fps_per_dongle=30):
    """
    Calculate performance metrics based on stage configurations

    Args:
        stage_configs: List of stage configurations from UI
        base_fps_per_dongle: Estimated frames per second one dongle provides.

    Returns:
        Dict with performance metrics: 'total_dongles', 'pipeline_fps',
        'total_latency' (ms), 'stage_fps' and 'bottleneck_stage' (name of the
        first slowest stage, or None when there are no stages).
    """
    total_dongles = sum(config['dongles'] for config in stage_configs)

    # Simple performance estimation
    stage_fps = [config['dongles'] * base_fps_per_dongle for config in stage_configs]

    if not stage_fps:
        return {
            'total_dongles': total_dongles,
            'pipeline_fps': 0,
            'total_latency': 0,
            'stage_fps': stage_fps,
            'bottleneck_stage': None,
        }

    # Pipeline FPS is limited by slowest stage
    pipeline_fps = min(stage_fps)

    # Total latency is sum of stage latencies. A stage without throughput
    # (0 dongles) never completes a frame, so its latency is infinite rather
    # than a ZeroDivisionError.
    total_latency = sum(
        1000 / fps if fps > 0 else float('inf') for fps in stage_fps
    )  # ms

    return {
        'total_dongles': total_dongles,
        'pipeline_fps': pipeline_fps,
        'total_latency': total_latency,
        'stage_fps': stage_fps,
        'bottleneck_stage': stage_configs[stage_fps.index(pipeline_fps)]['name'],
    }


def _yaml_single_quoted(value):
    """Render a value as a YAML single-quoted scalar (embedded ' is doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def _generate_yaml(stage_configs):
    """Render the stage configurations as a small YAML document."""
    lines = ["# Pipeline Configuration\n", "stages:\n"]
    for config in stage_configs:
        lines.append(f"  - name: {_yaml_single_quoted(config['name'])}\n")
        lines.append(f"    dongles: {config['dongles']}\n")
        lines.append(f"    port_ids: {_yaml_single_quoted(config['port_ids'])}\n")
        lines.append(f"    model_path: {_yaml_single_quoted(config['model_path'])}\n")
    return "".join(lines)


def export_configuration(stage_configs, format_type="python"):
    """
    Export stage configuration to various formats

    Args:
        stage_configs: List of stage configurations from UI
        format_type: Export format ("python", "json", "yaml")

    Returns:
        The exported configuration as a string.

    Raises:
        ValueError: If format_type is not one of the supported formats.
    """
    if format_type == "python":
        return generate_python_script(stage_configs)
    if format_type == "json":
        return json.dumps(stage_configs, indent=2)
    if format_type == "yaml":
        return _generate_yaml(stage_configs)
    raise ValueError(f"Unsupported format: {format_type}")


_GENERATED_SCRIPT_HEADER = '''#!/usr/bin/env python3
"""
Generated Pipeline Script
Auto-generated from UI configuration
"""

from src.cluster4npu.InferencePipeline import InferencePipeline, StageConfig
import time

def main():
    # Stage configurations generated from UI
    stage_configs = [
'''

_GENERATED_SCRIPT_FOOTER = '''    ]

    # Create and run pipeline
    pipeline = InferencePipeline(stage_configs, pipeline_name="GeneratedPipeline")

    try:
        print("Initializing pipeline...")
        pipeline.initialize()

        print("Starting pipeline...")
        pipeline.start()

        def handle_results(pipeline_data):
            print(f"Results: {pipeline_data.stage_results}")

        pipeline.set_result_callback(handle_results)

        print("Pipeline running. Press Ctrl+C to stop.")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        print("Stopping pipeline...")
    finally:
        pipeline.stop()
        print("Pipeline stopped.")

if __name__ == "__main__":
    main()
'''


def generate_python_script(stage_configs):
    """Generate a standalone Python script from stage configurations

    Args:
        stage_configs: List of stage configurations from UI

    Returns:
        Source code of a script that builds the same StageConfig list as
        convert_ui_config_to_pipeline() and runs it until Ctrl+C.
    """
    entries = []
    for stage_idx, config in enumerate(stage_configs):
        # Same parsing as the converter, so the generated script gets integer
        # ports and identical handling of 'auto' and single-port values.
        port_ids = _parse_port_ids(config['port_ids'], stage_idx)
        model_path = config['model_path'] or _DEFAULT_MODEL_PATH

        # repr() embeds strings as valid Python literals even when they
        # contain quotes or backslashes.
        entries.append(
            "        StageConfig(\n"
            f"            stage_id={_stage_id_from_name(config['name'])!r},\n"
            f"            port_ids={port_ids!r},\n"
            f"            scpu_fw_path={_DEFAULT_SCPU_FW_PATH!r},\n"
            f"            ncpu_fw_path={_DEFAULT_NCPU_FW_PATH!r},\n"
            f"            model_path={model_path!r},\n"
            "            upload_fw=True,\n"
            f"            max_queue_size={_DEFAULT_MAX_QUEUE_SIZE}\n"
            "        ),\n"
        )

    return _GENERATED_SCRIPT_HEADER + "".join(entries) + _GENERATED_SCRIPT_FOOTER


def main():
    """Main function demonstrating UI integration"""
    print("🎯 cluster4npu UI Integration Example")
    print("=" * 40)
    print()

    # Create sample configuration (as would come from UI)
    stage_configs = create_sample_ui_config()

    print("📋 Sample UI Configuration:")
    for i, config in enumerate(stage_configs, 1):
        print(f"  {i}. {config['name']}: {config['dongles']} dongles, ports {config['port_ids']}")
    print()

    # Calculate performance metrics
    metrics = calculate_performance_metrics(stage_configs)
    print("📊 Performance Metrics:")
    print(f"  • Total Dongles: {metrics['total_dongles']}")
    print(f"  • Pipeline FPS: {metrics['pipeline_fps']:.1f}")
    print(f"  • Total Latency: {metrics['total_latency']:.1f} ms")
    print(f"  • Bottleneck Stage: {metrics['bottleneck_stage']}")
    print()

    # Export configuration
    print("📄 Export Examples:")
    print("\n--- Python Script ---")
    python_script = export_configuration(stage_configs, "python")
    print(python_script[:300] + "...")

    print("\n--- JSON Config ---")
    json_config = export_configuration(stage_configs, "json")
    print(json_config)

    print("\n--- YAML Config ---")
    yaml_config = export_configuration(stage_configs, "yaml")
    print(yaml_config)

    # Ask user if they want to run the pipeline
    try:
        user_input = input("\n🚀 Run the pipeline? (y/N): ").strip().lower()
        if user_input == 'y':
            success = run_pipeline_from_ui_config(stage_configs)
            if success:
                print("✅ Integration example completed successfully!")
            else:
                print("❌ Integration example failed.")
        else:
            print("✅ Integration example completed (pipeline not run).")
    except (KeyboardInterrupt, EOFError):
        print("\n✅ Integration example completed.")


if __name__ == "__main__":
    main()