Masonmason 080eb5b887 Add intelligent pipeline topology analysis and comprehensive UI framework
Major Features:
• Advanced topological sorting algorithm with cycle detection and resolution
• Intelligent pipeline optimization with parallelization analysis
• Critical path analysis and performance metrics calculation
• Comprehensive .mflow file converter for seamless UI-to-API integration
• Complete modular UI framework with node-based pipeline editor
• Enhanced model node properties (scpu_fw_path, ncpu_fw_path)
• Professional output formatting without emoji decorations

Technical Improvements:
• Graph theory algorithms (DFS, BFS, topological sort)
• Automatic dependency resolution and conflict prevention
• Multi-criteria pipeline optimization
• Real-time stage count calculation and validation
• Comprehensive configuration validation and error handling
• Modular architecture with clean separation of concerns

New Components:
• MFlow converter with topology analysis (core/functions/mflow_converter.py)
• Complete node system with exact property matching
• Pipeline editor with visual node connections
• Performance estimation and dongle management panels
• Comprehensive test suite and demonstration scripts

🤖 Generated with Claude Code (https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-10 12:58:47 +08:00

174 lines
5.9 KiB
Python

"""
Model node implementation for ML inference operations.
This module provides the ModelNode class which represents AI model inference
nodes in the pipeline. It handles model loading, hardware allocation, and
inference configuration for various NPU dongles.
Main Components:
- ModelNode: Core model inference node implementation
- Model configuration and validation
- Hardware dongle management
Usage:
from cluster4npu_ui.core.nodes.model_node import ModelNode
node = ModelNode()
node.set_property('model_path', '/path/to/model.onnx')
node.set_property('dongle_series', '720')
"""
from .base_node import BaseNodeWithProperties
class ModelNode(BaseNodeWithProperties):
"""
Model node for ML inference operations.
This node represents an AI model inference stage in the pipeline, handling
model loading, hardware allocation, and inference configuration.
"""
__identifier__ = 'com.cluster.model_node'
NODE_NAME = 'Model Node'
def __init__(self):
super().__init__()
# Setup node connections
self.add_input('input', multi_input=False, color=(255, 140, 0))
self.add_output('output', color=(0, 255, 0))
self.set_color(65, 84, 102)
# Initialize properties
self.setup_properties()
def setup_properties(self):
"""Initialize model-specific properties."""
# Model configuration
self.create_business_property('model_path', '', {
'type': 'file_path',
'filter': 'Model files (*.onnx *.tflite *.pb *.nef)',
'description': 'Path to the model file'
})
# Hardware configuration
self.create_business_property('dongle_series', '520', [
'520', '720', '1080', 'Custom'
])
self.create_business_property('num_dongles', 1, {
'min': 1,
'max': 16,
'description': 'Number of dongles to use for this model'
})
self.create_business_property('port_id', '', {
'placeholder': 'e.g., 8080 or auto',
'description': 'Port ID for dongle communication'
})
# Performance configuration
self.create_business_property('batch_size', 1, {
'min': 1,
'max': 32,
'description': 'Inference batch size'
})
self.create_business_property('max_queue_size', 10, {
'min': 1,
'max': 100,
'description': 'Maximum input queue size'
})
# Advanced options
self.create_business_property('enable_preprocessing', True, {
'description': 'Enable built-in preprocessing'
})
self.create_business_property('enable_postprocessing', True, {
'description': 'Enable built-in postprocessing'
})
def validate_configuration(self) -> tuple[bool, str]:
"""
Validate the current node configuration.
Returns:
Tuple of (is_valid, error_message)
"""
# Check model path
model_path = self.get_property('model_path')
if not model_path:
return False, "Model path is required"
# Check dongle series
dongle_series = self.get_property('dongle_series')
if dongle_series not in ['520', '720', '1080', 'Custom']:
return False, f"Invalid dongle series: {dongle_series}"
# Check number of dongles
num_dongles = self.get_property('num_dongles')
if not isinstance(num_dongles, int) or num_dongles < 1:
return False, "Number of dongles must be at least 1"
return True, ""
def get_inference_config(self) -> dict:
"""
Get inference configuration for pipeline execution.
Returns:
Dictionary containing inference configuration
"""
return {
'node_id': self.id,
'node_name': self.name(),
'model_path': self.get_property('model_path'),
'dongle_series': self.get_property('dongle_series'),
'num_dongles': self.get_property('num_dongles'),
'port_id': self.get_property('port_id'),
'batch_size': self.get_property('batch_size'),
'max_queue_size': self.get_property('max_queue_size'),
'enable_preprocessing': self.get_property('enable_preprocessing'),
'enable_postprocessing': self.get_property('enable_postprocessing')
}
def get_hardware_requirements(self) -> dict:
"""
Get hardware requirements for this model node.
Returns:
Dictionary containing hardware requirements
"""
return {
'dongle_series': self.get_property('dongle_series'),
'num_dongles': self.get_property('num_dongles'),
'port_id': self.get_property('port_id'),
'estimated_memory': self._estimate_memory_usage(),
'estimated_power': self._estimate_power_usage()
}
def _estimate_memory_usage(self) -> float:
"""Estimate memory usage in MB."""
# Simple estimation based on batch size and number of dongles
base_memory = 512 # Base memory in MB
batch_factor = self.get_property('batch_size') * 50
dongle_factor = self.get_property('num_dongles') * 100
return base_memory + batch_factor + dongle_factor
def _estimate_power_usage(self) -> float:
"""Estimate power usage in Watts."""
# Simple estimation based on dongle series and count
dongle_series = self.get_property('dongle_series')
num_dongles = self.get_property('num_dongles')
power_per_dongle = {
'520': 2.5,
'720': 3.5,
'1080': 5.0,
'Custom': 4.0
}
return power_per_dongle.get(dongle_series, 4.0) * num_dongles