# cluster4npu/core/functions/multi_series_mflow_converter.py
"""
Enhanced MFlow to Multi-Series API Converter
This module extends the MFlowConverter to support multi-series dongle configurations
by detecting multi-series model nodes and generating appropriate configurations for
the MultiSeriesDongleManager.
Key Features:
- Detect multi-series enabled model nodes
- Generate MultiSeriesStageConfig objects
- Maintain backward compatibility with single-series configurations
- Validate multi-series folder structures
- Optimize pipeline for mixed single/multi-series stages
Usage:
from multi_series_mflow_converter import MultiSeriesMFlowConverter
converter = MultiSeriesMFlowConverter()
pipeline_config = converter.load_and_convert("pipeline.mflow")
# Automatically creates appropriate pipeline type
if pipeline_config.has_multi_series:
pipeline = MultiSeriesInferencePipeline(pipeline_config.stage_configs)
else:
pipeline = InferencePipeline(pipeline_config.stage_configs)
"""
import json
import os
from typing import List, Dict, Any, Tuple, Union
from dataclasses import dataclass
# Import base converter and pipeline components
from .mflow_converter import MFlowConverter, PipelineConfig
from .multi_series_pipeline import MultiSeriesStageConfig, MultiSeriesInferencePipeline
from .InferencePipeline import StageConfig
@dataclass
class EnhancedPipelineConfig:
    """Enhanced pipeline configuration supporting both single and multi-series"""
    # Ordered stage configurations; each entry is either a plain StageConfig
    # or a MultiSeriesStageConfig, depending on the node's configured mode.
    stage_configs: List[Union[StageConfig, MultiSeriesStageConfig]]
    # Human-readable pipeline name (taken from the .mflow 'project_name').
    pipeline_name: str
    # Free-form description (taken from the .mflow 'description').
    description: str
    # Configuration extracted from the pipeline's input nodes.
    input_config: Dict[str, Any]
    # Configuration extracted from the pipeline's output nodes.
    output_config: Dict[str, Any]
    # Per-node preprocessing settings extracted from preprocess nodes.
    preprocessing_configs: List[Dict[str, Any]]
    # Per-node postprocessing settings extracted from postprocess nodes.
    postprocessing_configs: List[Dict[str, Any]]
    # True when at least one stage is configured for multi-series mode.
    has_multi_series: bool = False
    # Number of multi-series stages present in stage_configs.
    multi_series_count: int = 0
class MultiSeriesMFlowConverter(MFlowConverter):
    """Enhanced converter supporting multi-series configurations.

    Detects model nodes flagged with 'multi_series_mode', builds
    MultiSeriesStageConfig objects for them, and keeps plain single-series
    nodes working unchanged for backward compatibility.
    """

    def __init__(self, default_fw_path: str = "./firmware", default_assets_path: str = "./assets"):
        """
        Initialize enhanced converter

        Args:
            default_fw_path: Default path for single-series firmware files
            default_assets_path: Default path for multi-series assets folder structure
        """
        super().__init__(default_fw_path)
        self.default_assets_path = default_assets_path

    @staticmethod
    def _get_node_properties(node: Dict) -> Dict[str, Any]:
        """Return a node's properties, checking both 'custom_properties' and
        'custom' keys for compatibility with older .mflow files."""
        return node.get('custom_properties') or node.get('custom', {})

    def load_and_convert(self, mflow_file_path: str) -> EnhancedPipelineConfig:
        """
        Load .mflow file and convert to enhanced API configuration

        Args:
            mflow_file_path: Path to the .mflow file

        Returns:
            EnhancedPipelineConfig: Configuration supporting both single and multi-series
        """
        with open(mflow_file_path, 'r') as f:
            mflow_data = json.load(f)
        return self._convert_mflow_to_enhanced_config(mflow_data)

    def _convert_mflow_to_enhanced_config(self, mflow_data: Dict[str, Any]) -> EnhancedPipelineConfig:
        """Convert loaded .mflow data to EnhancedPipelineConfig"""
        # Extract basic metadata
        pipeline_name = mflow_data.get('project_name', 'Enhanced Pipeline')
        description = mflow_data.get('description', '')
        nodes = mflow_data.get('nodes', [])
        connections = mflow_data.get('connections', [])
        # Build node lookup and categorize nodes (helpers inherited from MFlowConverter)
        self._build_node_map(nodes)
        model_nodes, input_nodes, output_nodes, preprocess_nodes, postprocess_nodes = self._categorize_nodes()
        # Determine stage order based on connections
        self._determine_stage_order(model_nodes, connections)
        # Create enhanced stage configs (supporting both single and multi-series)
        stage_configs, has_multi_series, multi_series_count = self._create_enhanced_stage_configs(
            model_nodes, preprocess_nodes, postprocess_nodes, connections
        )
        # Extract input/output configurations
        input_config = self._extract_input_config(input_nodes)
        output_config = self._extract_output_config(output_nodes)
        # Extract preprocessing/postprocessing configurations
        preprocessing_configs = self._extract_preprocessing_configs(preprocess_nodes)
        postprocessing_configs = self._extract_postprocessing_configs(postprocess_nodes)
        return EnhancedPipelineConfig(
            stage_configs=stage_configs,
            pipeline_name=pipeline_name,
            description=description,
            input_config=input_config,
            output_config=output_config,
            preprocessing_configs=preprocessing_configs,
            postprocessing_configs=postprocessing_configs,
            has_multi_series=has_multi_series,
            multi_series_count=multi_series_count
        )

    def _create_enhanced_stage_configs(self, model_nodes: List[Dict], preprocess_nodes: List[Dict],
                                       postprocess_nodes: List[Dict], connections: List[Dict]
                                       ) -> Tuple[List[Union[StageConfig, MultiSeriesStageConfig]], bool, int]:
        """
        Create stage configurations supporting both single and multi-series modes

        Returns:
            Tuple of (stage_configs, has_multi_series, multi_series_count)
        """
        stage_configs = []
        has_multi_series = False
        multi_series_count = 0
        for node in self.stage_order:
            node_properties = self._get_node_properties(node)
            # A node opts into multi-series via its 'multi_series_mode' property
            if node_properties.get('multi_series_mode', False):
                stage_config = self._create_multi_series_stage_config(node, preprocess_nodes, postprocess_nodes, connections)
                stage_configs.append(stage_config)
                has_multi_series = True
                multi_series_count += 1
                print(f"Created multi-series stage config for node: {node.get('name', 'Unknown')}")
            else:
                # Single-series stage config (backward compatibility)
                stage_config = self._create_single_series_stage_config(node, preprocess_nodes, postprocess_nodes, connections)
                stage_configs.append(stage_config)
                print(f"Created single-series stage config for node: {node.get('name', 'Unknown')}")
        return stage_configs, has_multi_series, multi_series_count

    def _create_multi_series_stage_config(self, node: Dict, preprocess_nodes: List[Dict],
                                          postprocess_nodes: List[Dict], connections: List[Dict]) -> MultiSeriesStageConfig:
        """Create multi-series stage configuration from model node.

        Expects an assets folder laid out as Firmware/KL<series>/ and
        Models/KL<series>/ (see create_assets_folder_structure).

        Raises:
            ValueError: If the assets folder is missing/unspecified, no series
                are enabled, or no firmware/models exist for the enabled series.
        """
        node_properties = self._get_node_properties(node)
        stage_id = node.get('name', f"stage_{node.get('id', 'unknown')}")
        # Extract assets folder and validate it exists before scanning it
        assets_folder = node_properties.get('assets_folder', '')
        if not assets_folder or not os.path.exists(assets_folder):
            raise ValueError(f"Multi-series assets folder not found or not specified for node {stage_id}: {assets_folder}")
        # Get enabled series
        enabled_series = node_properties.get('enabled_series', ['520', '720'])
        if not enabled_series:
            raise ValueError(f"No series enabled for multi-series node {stage_id}")
        # Build firmware and model paths
        firmware_paths = {}
        model_paths = {}
        firmware_folder = os.path.join(assets_folder, 'Firmware')
        models_folder = os.path.join(assets_folder, 'Models')
        for series in enabled_series:
            series_name = f'KL{series}'
            # Firmware paths; existence of the individual .bin files is
            # checked later in _validate_stage_config, not here
            series_fw_folder = os.path.join(firmware_folder, series_name)
            if os.path.exists(series_fw_folder):
                firmware_paths[series_name] = {
                    'scpu': os.path.join(series_fw_folder, 'fw_scpu.bin'),
                    'ncpu': os.path.join(series_fw_folder, 'fw_ncpu.bin')
                }
            # Model paths - pick the first .nef file; sorted() makes the
            # choice deterministic (os.listdir order is arbitrary)
            series_model_folder = os.path.join(models_folder, series_name)
            if os.path.exists(series_model_folder):
                model_files = sorted(f for f in os.listdir(series_model_folder) if f.endswith('.nef'))
                if model_files:
                    model_paths[series_name] = os.path.join(series_model_folder, model_files[0])
        # Validate that at least one series yielded both firmware and a model
        if not firmware_paths:
            raise ValueError(f"No firmware found for multi-series node {stage_id} in enabled series: {enabled_series}")
        if not model_paths:
            raise ValueError(f"No models found for multi-series node {stage_id} in enabled series: {enabled_series}")
        return MultiSeriesStageConfig(
            stage_id=stage_id,
            multi_series_mode=True,
            firmware_paths=firmware_paths,
            model_paths=model_paths,
            max_queue_size=node_properties.get('max_queue_size', 100),
            result_buffer_size=node_properties.get('result_buffer_size', 1000),
            # TODO: Add preprocessor/postprocessor support if needed
        )

    def _create_single_series_stage_config(self, node: Dict, preprocess_nodes: List[Dict],
                                           postprocess_nodes: List[Dict], connections: List[Dict]) -> MultiSeriesStageConfig:
        """Create single-series stage configuration for backward compatibility.

        Returns a MultiSeriesStageConfig with multi_series_mode=False so the
        rest of the pipeline can treat all stages uniformly.

        Raises:
            ValueError: If the node has no model path configured.
        """
        node_properties = self._get_node_properties(node)
        stage_id = node.get('name', f"stage_{node.get('id', 'unknown')}")
        # Extract single-series paths
        model_path = node_properties.get('model_path', '')
        scpu_fw_path = node_properties.get('scpu_fw_path', '')
        ncpu_fw_path = node_properties.get('ncpu_fw_path', '')
        # Validate single-series configuration
        if not model_path:
            raise ValueError(f"Model path required for single-series node {stage_id}")
        return MultiSeriesStageConfig(
            stage_id=stage_id,
            multi_series_mode=False,
            port_ids=[],  # Will be auto-detected
            scpu_fw_path=scpu_fw_path,
            ncpu_fw_path=ncpu_fw_path,
            model_path=model_path,
            # Only upload firmware when both images are configured
            upload_fw=bool(scpu_fw_path and ncpu_fw_path),
            max_queue_size=node_properties.get('max_queue_size', 50),
            # TODO: Add preprocessor/postprocessor support if needed
        )

    def validate_enhanced_config(self, config: EnhancedPipelineConfig) -> Tuple[bool, List[str]]:
        """
        Validate enhanced pipeline configuration

        Returns:
            Tuple of (is_valid, list_of_error_messages)
        """
        errors = []
        # Basic validation
        if not config.stage_configs:
            errors.append("No stages configured")
        if not config.pipeline_name:
            errors.append("Pipeline name is required")
        # Validate each stage
        for i, stage_config in enumerate(config.stage_configs):
            stage_errors = self._validate_stage_config(stage_config, i)
            errors.extend(stage_errors)
        # Multi-series specific validation
        if config.has_multi_series:
            multi_series_errors = self._validate_multi_series_configuration(config)
            errors.extend(multi_series_errors)
        return len(errors) == 0, errors

    def _validate_stage_config(self, stage_config: Union[StageConfig, MultiSeriesStageConfig], stage_index: int) -> List[str]:
        """Validate individual stage configuration.

        For multi-series stages, checks the firmware/model path maps and the
        existence of every referenced file; for single-series stages, checks
        the model path. Plain StageConfig objects are not inspected here.
        """
        errors = []
        stage_name = getattr(stage_config, 'stage_id', f'Stage {stage_index}')
        if isinstance(stage_config, MultiSeriesStageConfig):
            if stage_config.multi_series_mode:
                # Validate multi-series configuration
                if not stage_config.firmware_paths:
                    errors.append(f"{stage_name}: No firmware paths configured for multi-series mode")
                if not stage_config.model_paths:
                    errors.append(f"{stage_name}: No model paths configured for multi-series mode")
                # Validate file existence for every configured series
                for series_name, fw_paths in (stage_config.firmware_paths or {}).items():
                    scpu_path = fw_paths.get('scpu')
                    ncpu_path = fw_paths.get('ncpu')
                    if not scpu_path or not os.path.exists(scpu_path):
                        errors.append(f"{stage_name}: SCPU firmware not found for {series_name}: {scpu_path}")
                    if not ncpu_path or not os.path.exists(ncpu_path):
                        errors.append(f"{stage_name}: NCPU firmware not found for {series_name}: {ncpu_path}")
                for series_name, model_path in (stage_config.model_paths or {}).items():
                    if not model_path or not os.path.exists(model_path):
                        errors.append(f"{stage_name}: Model not found for {series_name}: {model_path}")
            else:
                # Validate single-series configuration
                if not stage_config.model_path:
                    errors.append(f"{stage_name}: Model path is required for single-series mode")
                elif not os.path.exists(stage_config.model_path):
                    errors.append(f"{stage_name}: Model file not found: {stage_config.model_path}")
        return errors

    def _validate_multi_series_configuration(self, config: EnhancedPipelineConfig) -> List[str]:
        """Validate multi-series specific requirements.

        Currently only warns (via print) about mixed single/multi-series
        pipelines; it never produces hard errors.
        """
        errors = []
        # Check for mixed configurations
        single_series_count = len(config.stage_configs) - config.multi_series_count
        if config.multi_series_count > 0 and single_series_count > 0:
            # Mixed pipeline - warn but do not fail validation
            print(f"Warning: Mixed pipeline detected - {config.multi_series_count} multi-series stages and {single_series_count} single-series stages")
        # Additional multi-series validations can be added here
        return errors

    def create_enhanced_inference_pipeline(self, config: EnhancedPipelineConfig) -> Union[MultiSeriesInferencePipeline, 'InferencePipeline']:
        """
        Create appropriate inference pipeline based on configuration

        Returns:
            MultiSeriesInferencePipeline if multi-series stages detected, otherwise regular InferencePipeline
        """
        if config.has_multi_series:
            print(f"Creating MultiSeriesInferencePipeline with {config.multi_series_count} multi-series stages")
            return MultiSeriesInferencePipeline(
                stage_configs=config.stage_configs,
                pipeline_name=config.pipeline_name
            )
        else:
            print("Creating standard InferencePipeline (single-series only)")
            # Convert to standard StageConfig objects for backward compatibility
            from .InferencePipeline import InferencePipeline
            standard_configs = []
            for stage_config in config.stage_configs:
                if isinstance(stage_config, MultiSeriesStageConfig) and not stage_config.multi_series_mode:
                    # Convert to standard StageConfig
                    standard_config = StageConfig(
                        stage_id=stage_config.stage_id,
                        port_ids=stage_config.port_ids or [],
                        scpu_fw_path=stage_config.scpu_fw_path or '',
                        ncpu_fw_path=stage_config.ncpu_fw_path or '',
                        model_path=stage_config.model_path or '',
                        upload_fw=stage_config.upload_fw,
                        max_queue_size=stage_config.max_queue_size
                    )
                    standard_configs.append(standard_config)
                else:
                    # Bug fix: previously a plain StageConfig was silently
                    # dropped here; pass it through unchanged instead.
                    standard_configs.append(stage_config)
            return InferencePipeline(
                stage_configs=standard_configs,
                pipeline_name=config.pipeline_name
            )
def create_assets_folder_structure(base_path: str, series_list: Union[List[str], None] = None) -> None:
    """
    Create the recommended folder structure for multi-series assets.

    Creates <base_path>/Assets/Firmware/KL<series>/ and
    <base_path>/Assets/Models/KL<series>/ for each series, plus a README.md
    describing the expected layout. Idempotent: existing directories are kept.

    Args:
        base_path: Root path where assets folder should be created
        series_list: List of series to create folders for (default: ['520', '720', '630', '730', '540'])
    """
    if series_list is None:
        series_list = ['520', '720', '630', '730', '540']
    assets_path = os.path.join(base_path, 'Assets')
    firmware_path = os.path.join(assets_path, 'Firmware')
    models_path = os.path.join(assets_path, 'Models')
    # Create main directories
    os.makedirs(firmware_path, exist_ok=True)
    os.makedirs(models_path, exist_ok=True)
    # Create series-specific directories
    for series in series_list:
        series_name = f'KL{series}'
        os.makedirs(os.path.join(firmware_path, series_name), exist_ok=True)
        os.makedirs(os.path.join(models_path, series_name), exist_ok=True)
    # Create README file explaining the structure
    readme_content = """
# Multi-Series Assets Folder Structure

This folder contains firmware and models organized by dongle series for multi-series inference.

## Structure:
```
Assets/
├── Firmware/
│   ├── KL520/
│   │   ├── fw_scpu.bin
│   │   └── fw_ncpu.bin
│   ├── KL720/
│   │   ├── fw_scpu.bin
│   │   └── fw_ncpu.bin
│   └── [other series...]
└── Models/
    ├── KL520/
    │   └── [model.nef files]
    ├── KL720/
    │   └── [model.nef files]
    └── [other series...]
```

## Usage:
1. Place firmware files (fw_scpu.bin, fw_ncpu.bin) in the appropriate series subfolder under Firmware/
2. Place model files (.nef) in the appropriate series subfolder under Models/
3. Configure your model node to use this Assets folder in multi-series mode
4. Select which series to enable in the model node properties

## Supported Series:
- KL520: Entry-level performance
- KL720: Mid-range performance
- KL630: High performance
- KL730: Very high performance
- KL540: Specialized performance

The multi-series system will automatically load balance inference across all enabled series
based on their GOPS capacity for optimal performance.
"""
    # encoding='utf-8' is required: the README contains box-drawing characters
    # that the platform-default encoding (e.g. cp1252 on Windows) cannot encode
    with open(os.path.join(assets_path, 'README.md'), 'w', encoding='utf-8') as f:
        f.write(readme_content.strip())
    print(f"Multi-series assets folder structure created at: {assets_path}")
    print("Please copy your firmware and model files to the appropriate series subfolders.")