- Add InferencePipeline: Multi-stage inference orchestrator with thread-safe queue management
- Add Multidongle: Hardware abstraction layer for Kneron NPU devices
- Add comprehensive UI framework with node-based pipeline editor
- Add performance estimation and monitoring capabilities
- Add extensive documentation and examples
- Update project structure and dependencies

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
415 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
UI Configuration and Integration Settings
|
|
=========================================
|
|
|
|
This module provides configuration settings and helper functions for integrating
|
|
the UI application with cluster4npu tools.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass, asdict
|
|
|
|
|
|
@dataclass
class UISettings:
    """UI application settings.

    ``default_fw_paths`` uses ``None`` as a sentinel default and is resolved
    to a fresh firmware-path dict in ``__post_init__``, so instances never
    share a mutable default dict.
    """
    theme: str = "harmonious_dark"     # UI color theme name
    auto_save_interval: int = 300      # seconds between auto-saves
    max_recent_files: int = 10         # length of the recent-files list
    default_dongle_count: int = 16     # dongle count pre-filled for new stages
    # Maps firmware kind ("scpu"/"ncpu") to its binary path; None means
    # "use the built-in defaults" (resolved below).
    default_fw_paths: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # Build a fresh dict per instance -- assigning a dict literal as the
        # field default would be the classic shared-mutable-default bug.
        if self.default_fw_paths is None:
            self.default_fw_paths = {
                "scpu": "fw_scpu.bin",
                "ncpu": "fw_ncpu.bin"
            }
|
@dataclass
class ClusterConfig:
    """Cluster hardware configuration"""
    # Total number of NPU dongles installed in the cluster.
    available_dongles: int = 16
    # Kneron dongle product series; used to pick the per-dongle FPS figure
    # in UIIntegration.estimate_performance.
    dongle_series: str = "KL520"
    # Inclusive USB port-ID range scanned for dongles; only even IDs are
    # handed out (see UIIntegration.get_available_ports).
    port_range_start: int = 28
    port_range_end: int = 60
    # Power budget for the whole cluster, in watts.
    power_limit_watts: int = 200
    # Cooling solution identifier (informational; not used in calculations here).
    cooling_type: str = "standard"
|
class UIIntegration:
|
|
"""Integration layer between UI and cluster4npu tools"""
|
|
|
|
def __init__(self, config_path: Optional[str] = None):
|
|
self.config_path = config_path or os.path.expanduser("~/.cluster4npu_ui_config.json")
|
|
self.ui_settings = UISettings()
|
|
self.cluster_config = ClusterConfig()
|
|
self.load_config()
|
|
|
|
def load_config(self):
|
|
"""Load configuration from file"""
|
|
try:
|
|
if os.path.exists(self.config_path):
|
|
with open(self.config_path, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
if 'ui_settings' in data:
|
|
self.ui_settings = UISettings(**data['ui_settings'])
|
|
if 'cluster_config' in data:
|
|
self.cluster_config = ClusterConfig(**data['cluster_config'])
|
|
|
|
except Exception as e:
|
|
print(f"Warning: Could not load UI config: {e}")
|
|
|
|
def save_config(self):
|
|
"""Save configuration to file"""
|
|
try:
|
|
data = {
|
|
'ui_settings': asdict(self.ui_settings),
|
|
'cluster_config': asdict(self.cluster_config)
|
|
}
|
|
|
|
with open(self.config_path, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
except Exception as e:
|
|
print(f"Warning: Could not save UI config: {e}")
|
|
|
|
def get_available_ports(self) -> List[int]:
|
|
"""Get list of available USB ports"""
|
|
return list(range(
|
|
self.cluster_config.port_range_start,
|
|
self.cluster_config.port_range_end + 1,
|
|
2 # Even numbers only for dongles
|
|
))
|
|
|
|
def validate_stage_config(self, stage_config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Validate and normalize a stage configuration from UI
|
|
|
|
Args:
|
|
stage_config: Raw stage configuration from UI
|
|
|
|
Returns:
|
|
Validated and normalized configuration
|
|
"""
|
|
# Ensure required fields
|
|
normalized = {
|
|
'name': stage_config.get('name', 'Unnamed Stage'),
|
|
'dongles': max(1, min(stage_config.get('dongles', 2), self.cluster_config.available_dongles)),
|
|
'port_ids': stage_config.get('port_ids', 'auto'),
|
|
'model_path': stage_config.get('model_path', ''),
|
|
}
|
|
|
|
# Auto-assign ports if needed
|
|
if normalized['port_ids'] == 'auto':
|
|
available_ports = self.get_available_ports()
|
|
dongles_needed = normalized['dongles']
|
|
normalized['port_ids'] = ','.join(map(str, available_ports[:dongles_needed]))
|
|
|
|
# Validate model path
|
|
if normalized['model_path'] and not os.path.exists(normalized['model_path']):
|
|
print(f"Warning: Model file not found: {normalized['model_path']}")
|
|
|
|
return normalized
|
|
|
|
def convert_ui_to_inference_config(self, ui_stages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Convert UI stage configurations to InferencePipeline StageConfig format
|
|
|
|
Args:
|
|
ui_stages: List of stage configurations from UI
|
|
|
|
Returns:
|
|
List of configurations ready for InferencePipeline
|
|
"""
|
|
inference_configs = []
|
|
|
|
for stage in ui_stages:
|
|
validated = self.validate_stage_config(stage)
|
|
|
|
# Parse port IDs
|
|
if isinstance(validated['port_ids'], str):
|
|
port_ids = [int(p.strip()) for p in validated['port_ids'].split(',') if p.strip()]
|
|
else:
|
|
port_ids = validated['port_ids']
|
|
|
|
config = {
|
|
'stage_id': validated['name'].lower().replace(' ', '_').replace('-', '_'),
|
|
'port_ids': port_ids,
|
|
'scpu_fw_path': self.ui_settings.default_fw_paths['scpu'],
|
|
'ncpu_fw_path': self.ui_settings.default_fw_paths['ncpu'],
|
|
'model_path': validated['model_path'] or f"default_{len(inference_configs)}.nef",
|
|
'upload_fw': True,
|
|
'max_queue_size': 50
|
|
}
|
|
|
|
inference_configs.append(config)
|
|
|
|
return inference_configs
|
|
|
|
def estimate_performance(self, ui_stages: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
"""
|
|
Estimate performance metrics for given stage configurations
|
|
|
|
Args:
|
|
ui_stages: List of stage configurations from UI
|
|
|
|
Returns:
|
|
Performance metrics dictionary
|
|
"""
|
|
total_dongles = sum(stage.get('dongles', 2) for stage in ui_stages)
|
|
|
|
# Performance estimation based on dongle series
|
|
fps_per_dongle = {
|
|
'KL520': 30,
|
|
'KL720': 45,
|
|
'KL1080': 60
|
|
}.get(self.cluster_config.dongle_series, 30)
|
|
|
|
stage_fps = []
|
|
stage_latencies = []
|
|
|
|
for stage in ui_stages:
|
|
dongles = stage.get('dongles', 2)
|
|
stage_fps_val = dongles * fps_per_dongle
|
|
stage_latency = 1000 / stage_fps_val # ms
|
|
|
|
stage_fps.append(stage_fps_val)
|
|
stage_latencies.append(stage_latency)
|
|
|
|
# Pipeline metrics
|
|
pipeline_fps = min(stage_fps) if stage_fps else 0
|
|
total_latency = sum(stage_latencies)
|
|
|
|
# Resource utilization
|
|
utilization = (total_dongles / self.cluster_config.available_dongles) * 100
|
|
|
|
# Power estimation (simplified)
|
|
estimated_power = total_dongles * 5 # 5W per dongle
|
|
|
|
return {
|
|
'total_dongles': total_dongles,
|
|
'available_dongles': self.cluster_config.available_dongles,
|
|
'utilization_percent': utilization,
|
|
'pipeline_fps': pipeline_fps,
|
|
'total_latency': total_latency,
|
|
'stage_fps': stage_fps,
|
|
'stage_latencies': stage_latencies,
|
|
'estimated_power_watts': estimated_power,
|
|
'power_limit_watts': self.cluster_config.power_limit_watts,
|
|
'within_power_budget': estimated_power <= self.cluster_config.power_limit_watts
|
|
}
|
|
|
|
def generate_deployment_script(self, ui_stages: List[Dict[str, Any]],
|
|
script_format: str = "python") -> str:
|
|
"""
|
|
Generate deployment script from UI configurations
|
|
|
|
Args:
|
|
ui_stages: List of stage configurations from UI
|
|
script_format: Format for the script ("python", "json", "yaml")
|
|
|
|
Returns:
|
|
Generated script content
|
|
"""
|
|
inference_configs = self.convert_ui_to_inference_config(ui_stages)
|
|
|
|
if script_format == "python":
|
|
return self._generate_python_script(inference_configs)
|
|
elif script_format == "json":
|
|
return json.dumps({
|
|
"pipeline_name": "UI_Generated_Pipeline",
|
|
"stages": inference_configs,
|
|
"ui_settings": asdict(self.ui_settings),
|
|
"cluster_config": asdict(self.cluster_config)
|
|
}, indent=2)
|
|
elif script_format == "yaml":
|
|
return self._generate_yaml_script(inference_configs)
|
|
else:
|
|
raise ValueError(f"Unsupported script format: {script_format}")
|
|
|
|
def _generate_python_script(self, inference_configs: List[Dict[str, Any]]) -> str:
|
|
"""Generate Python deployment script"""
|
|
script = '''#!/usr/bin/env python3
|
|
"""
|
|
Generated Deployment Script
|
|
Created by cluster4npu UI
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
|
|
|
|
from src.cluster4npu.InferencePipeline import InferencePipeline, StageConfig
|
|
|
|
def create_pipeline():
|
|
"""Create and configure the inference pipeline"""
|
|
stage_configs = [
|
|
'''
|
|
|
|
for config in inference_configs:
|
|
script += f''' StageConfig(
|
|
stage_id="{config['stage_id']}",
|
|
port_ids={config['port_ids']},
|
|
scpu_fw_path="{config['scpu_fw_path']}",
|
|
ncpu_fw_path="{config['ncpu_fw_path']}",
|
|
model_path="{config['model_path']}",
|
|
upload_fw={config['upload_fw']},
|
|
max_queue_size={config['max_queue_size']}
|
|
),
|
|
'''
|
|
|
|
script += ''' ]
|
|
|
|
return InferencePipeline(stage_configs, pipeline_name="UI_Generated_Pipeline")
|
|
|
|
def main():
|
|
"""Main execution function"""
|
|
print("🚀 Starting UI-generated pipeline...")
|
|
|
|
pipeline = create_pipeline()
|
|
|
|
try:
|
|
print("⚡ Initializing pipeline...")
|
|
pipeline.initialize()
|
|
|
|
print("▶️ Starting pipeline...")
|
|
pipeline.start()
|
|
|
|
# Set up callbacks
|
|
def handle_results(pipeline_data):
|
|
print(f"📊 Results: {pipeline_data.stage_results}")
|
|
|
|
def handle_errors(pipeline_data):
|
|
print(f"❌ Error: {pipeline_data.stage_results}")
|
|
|
|
pipeline.set_result_callback(handle_results)
|
|
pipeline.set_error_callback(handle_errors)
|
|
|
|
print("✅ Pipeline running. Press Ctrl+C to stop.")
|
|
|
|
# Run until interrupted
|
|
while True:
|
|
time.sleep(1)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\\n🛑 Stopping pipeline...")
|
|
except Exception as e:
|
|
print(f"❌ Pipeline error: {e}")
|
|
finally:
|
|
pipeline.stop()
|
|
print("✅ Pipeline stopped.")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
'''
|
|
return script
|
|
|
|
def _generate_yaml_script(self, inference_configs: List[Dict[str, Any]]) -> str:
|
|
"""Generate YAML configuration"""
|
|
yaml_content = '''# cluster4npu Pipeline Configuration
|
|
# Generated by UI Application
|
|
|
|
pipeline:
|
|
name: "UI_Generated_Pipeline"
|
|
|
|
stages:
|
|
'''
|
|
|
|
for config in inference_configs:
|
|
yaml_content += f''' - stage_id: "{config['stage_id']}"
|
|
port_ids: {config['port_ids']}
|
|
scpu_fw_path: "{config['scpu_fw_path']}"
|
|
ncpu_fw_path: "{config['ncpu_fw_path']}"
|
|
model_path: "{config['model_path']}"
|
|
upload_fw: {str(config['upload_fw']).lower()}
|
|
max_queue_size: {config['max_queue_size']}
|
|
|
|
'''
|
|
|
|
yaml_content += f'''
|
|
# Cluster Configuration
|
|
cluster:
|
|
available_dongles: {self.cluster_config.available_dongles}
|
|
dongle_series: "{self.cluster_config.dongle_series}"
|
|
power_limit_watts: {self.cluster_config.power_limit_watts}
|
|
|
|
# UI Settings
|
|
ui:
|
|
theme: "{self.ui_settings.theme}"
|
|
auto_save_interval: {self.ui_settings.auto_save_interval}
|
|
'''
|
|
|
|
return yaml_content
|
|
|
|
|
|
# Global integration instance.
# NOTE: constructed at import time, so importing this module attempts to
# read the JSON config file from the user's home directory.
ui_integration = UIIntegration()
|
|
|
|
|
|
def get_integration() -> UIIntegration:
    """Return the module-level singleton UIIntegration instance."""
    return ui_integration
|
|
|
|
|
|
# Convenience functions for UI components
|
|
def validate_stage_configs(ui_stages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Validate each UI stage configuration via the global integration instance."""
    validated: List[Dict[str, Any]] = []
    for stage_cfg in ui_stages:
        validated.append(ui_integration.validate_stage_config(stage_cfg))
    return validated
|
|
|
|
|
|
def estimate_pipeline_performance(ui_stages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Estimate performance for UI stage configurations.

    Thin wrapper around the module-level ``ui_integration`` singleton.
    """
    return ui_integration.estimate_performance(ui_stages)
|
|
|
|
|
|
def export_pipeline_config(ui_stages: List[Dict[str, Any]], format_type: str = "python") -> str:
    """Export UI configurations to deployment scripts.

    ``format_type`` is one of "python", "json", or "yaml"; delegates to the
    module-level ``ui_integration`` singleton, which raises ValueError for
    unsupported formats.
    """
    return ui_integration.generate_deployment_script(ui_stages, format_type)
|
|
|
|
|
|
def get_available_ports() -> List[int]:
    """Get list of available dongle port IDs.

    Thin wrapper around the module-level ``ui_integration`` singleton.
    """
    return ui_integration.get_available_ports()
|
|
|
|
|
|
def save_ui_settings():
    """Persist current UI settings to disk via the global integration instance."""
    ui_integration.save_config()
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke-test the integration layer end-to-end (no hardware required).
    print("🧪 Testing UI Integration...")

    # Sample UI stage configurations
    test_stages = [
        {'name': 'Input Stage', 'dongles': 2, 'port_ids': 'auto', 'model_path': 'input.nef'},
        {'name': 'Processing Stage', 'dongles': 4, 'port_ids': '32,34,36,38', 'model_path': 'process.nef'},
        {'name': 'Output Stage', 'dongles': 2, 'port_ids': 'auto', 'model_path': 'output.nef'}
    ]

    # Test validation
    validated = validate_stage_configs(test_stages)
    print(f"✅ Validated {len(validated)} stages")

    # Test performance estimation
    performance = estimate_pipeline_performance(test_stages)
    print(f"📊 Pipeline FPS: {performance['pipeline_fps']:.1f}")
    print(f"📊 Total Latency: {performance['total_latency']:.1f} ms")
    print(f"📊 Power Usage: {performance['estimated_power_watts']} W")

    # Test script generation (only the output size is reported here)
    python_script = export_pipeline_config(test_stages, "python")
    print(f"🐍 Generated Python script ({len(python_script)} chars)")

    json_config = export_pipeline_config(test_stages, "json")
    print(f"📄 Generated JSON config ({len(json_config)} chars)")

    print("✅ Integration test completed!")