""" MFlow to API Converter This module converts .mflow pipeline files from the UI app into the API format required by MultiDongle and InferencePipeline components. Key Features: - Parse .mflow JSON files - Convert UI node properties to API configurations - Generate StageConfig objects for InferencePipeline - Handle pipeline topology and stage ordering - Validate configurations and provide helpful error messages Usage: from mflow_converter import MFlowConverter converter = MFlowConverter() pipeline_config = converter.load_and_convert("pipeline.mflow") # Use with InferencePipeline inference_pipeline = InferencePipeline(pipeline_config.stage_configs) """ import json import os from typing import List, Dict, Any, Tuple, Optional from dataclasses import dataclass from .InferencePipeline import StageConfig, InferencePipeline from .Multidongle import PostProcessor, PostProcessorOptions, PostProcessType class DefaultProcessors: """Default preprocessing and postprocessing functions""" @staticmethod def resize_and_normalize(frame, target_size=(640, 480), normalize=True): """Default resize and normalize function""" import cv2 import numpy as np # Resize resized = cv2.resize(frame, target_size) # Normalize if requested if normalize: resized = resized.astype(np.float32) / 255.0 return resized @staticmethod def bgr_to_rgb(frame): """Convert BGR to RGB""" import cv2 return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) @staticmethod def format_detection_output(results, confidence_threshold=0.5): """Format detection results""" formatted = [] for result in results: if result.get('confidence', 0) >= confidence_threshold: formatted.append({ 'class': result.get('class', 'unknown'), 'confidence': result.get('confidence', 0), 'bbox': result.get('bbox', [0, 0, 0, 0]) }) return formatted @dataclass class PipelineConfig: """Complete pipeline configuration ready for API use""" stage_configs: List[StageConfig] pipeline_name: str description: str input_config: Dict[str, Any] output_config: Dict[str, Any] preprocessing_configs: List[Dict[str, Any]] postprocessing_configs: List[Dict[str, Any]] class MFlowConverter: """Convert .mflow files to API configurations""" def __init__(self, default_fw_path: str = "./firmware"): """ Initialize converter Args: default_fw_path: Default path for firmware files if not specified """ self.default_fw_path = default_fw_path self.node_id_map = {} # Map node IDs to node objects self.stage_order = [] # Ordered list of model nodes (stages) def load_and_convert(self, mflow_file_path: str) -> PipelineConfig: """ Load .mflow file and convert to API configuration Args: mflow_file_path: Path to .mflow file Returns: PipelineConfig object ready for API use Raises: FileNotFoundError: If .mflow file doesn't exist ValueError: If .mflow format is invalid RuntimeError: If conversion fails """ if not os.path.exists(mflow_file_path): raise FileNotFoundError(f"MFlow file not found: {mflow_file_path}") with open(mflow_file_path, 'r', encoding='utf-8') as f: mflow_data = json.load(f) return self._convert_mflow_to_config(mflow_data) def _convert_mflow_to_config(self, mflow_data: Dict[str, Any]) -> PipelineConfig: """Convert loaded .mflow data to PipelineConfig""" # Extract basic metadata pipeline_name = mflow_data.get('project_name', 'Converted Pipeline') description = mflow_data.get('description', '') nodes = mflow_data.get('nodes', []) connections = mflow_data.get('connections', []) # Build node lookup and categorize nodes self._build_node_map(nodes) model_nodes, input_nodes, output_nodes, preprocess_nodes, 
    def _convert_mflow_to_config(self, mflow_data: Dict[str, Any]) -> PipelineConfig:
        """Convert loaded .mflow data to PipelineConfig"""
        # Extract basic metadata
        pipeline_name = mflow_data.get('project_name', 'Converted Pipeline')
        description = mflow_data.get('description', '')
        nodes = mflow_data.get('nodes', [])
        connections = mflow_data.get('connections', [])

        # Build node lookup and categorize nodes
        self._build_node_map(nodes)
        (model_nodes, input_nodes, output_nodes,
         preprocess_nodes, postprocess_nodes) = self._categorize_nodes()

        # Determine stage order based on connections
        self._determine_stage_order(model_nodes, connections)

        # Convert to StageConfig objects
        stage_configs = self._create_stage_configs(model_nodes, preprocess_nodes,
                                                   postprocess_nodes, connections)

        # Extract input/output configurations
        input_config = self._extract_input_config(input_nodes)
        output_config = self._extract_output_config(output_nodes)

        # Extract preprocessing/postprocessing configurations
        preprocessing_configs = self._extract_preprocessing_configs(preprocess_nodes)
        postprocessing_configs = self._extract_postprocessing_configs(postprocess_nodes)

        return PipelineConfig(
            stage_configs=stage_configs,
            pipeline_name=pipeline_name,
            description=description,
            input_config=input_config,
            output_config=output_config,
            preprocessing_configs=preprocessing_configs,
            postprocessing_configs=postprocessing_configs
        )

    def _build_node_map(self, nodes: List[Dict[str, Any]]):
        """Build lookup map for nodes by ID"""
        self.node_id_map = {node['id']: node for node in nodes}

    def _categorize_nodes(self) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], List[Dict]]:
        """Categorize nodes by type"""
        model_nodes = []
        input_nodes = []
        output_nodes = []
        preprocess_nodes = []
        postprocess_nodes = []

        for node in self.node_id_map.values():
            node_type = node.get('type', '').lower()

            if 'model' in node_type:
                model_nodes.append(node)
            elif 'input' in node_type:
                input_nodes.append(node)
            elif 'output' in node_type:
                output_nodes.append(node)
            elif 'preprocess' in node_type:
                preprocess_nodes.append(node)
            elif 'postprocess' in node_type:
                postprocess_nodes.append(node)

        return model_nodes, input_nodes, output_nodes, preprocess_nodes, postprocess_nodes
    def _determine_stage_order(self, model_nodes: List[Dict], connections: List[Dict]):
        """
        Advanced Topological Sorting Algorithm

        Analyzes connection dependencies to determine optimal pipeline execution order.

        Features:
        - Cycle detection and prevention
        - Parallel stage identification
        - Dependency depth analysis
        - Pipeline efficiency optimization
        """
        print("Starting intelligent pipeline topology analysis...")

        # Build dependency graph
        dependency_graph = self._build_dependency_graph(model_nodes, connections)

        # Detect and handle cycles
        cycles = self._detect_cycles(dependency_graph)
        if cycles:
            print(f"Warning: Detected {len(cycles)} dependency cycles!")
            dependency_graph = self._resolve_cycles(dependency_graph, cycles)

        # Perform topological sort with parallel optimization
        sorted_stages = self._topological_sort_with_optimization(dependency_graph, model_nodes)

        # Calculate and display pipeline metrics
        metrics = self._calculate_pipeline_metrics(sorted_stages, dependency_graph)
        self._display_pipeline_analysis(sorted_stages, metrics)

        self.stage_order = sorted_stages

    def _build_dependency_graph(self, model_nodes: List[Dict], connections: List[Dict]) -> Dict[str, Dict]:
        """Build dependency graph from connections"""
        print("  Building dependency graph...")

        # Initialize graph with all model nodes
        graph = {}
        node_id_to_model = {node['id']: node for node in model_nodes}

        for node in model_nodes:
            graph[node['id']] = {
                'node': node,
                'dependencies': set(),  # What this node depends on
                'dependents': set(),    # What depends on this node
                'depth': 0,             # Distance from input
                'parallel_group': 0     # For parallel execution grouping
            }

        # Analyze connections to build dependencies
        for conn in connections:
            output_node_id = conn.get('output_node')
            input_node_id = conn.get('input_node')

            # Only consider connections between model nodes
            if output_node_id in graph and input_node_id in graph:
                graph[input_node_id]['dependencies'].add(output_node_id)
                graph[output_node_id]['dependents'].add(input_node_id)

        edge_count = len([c for c in connections
                          if c.get('output_node') in graph and c.get('input_node') in graph])
        print(f"  Graph built: {len(graph)} model nodes, {edge_count} dependencies")

        return graph

    def _detect_cycles(self, graph: Dict[str, Dict]) -> List[List[str]]:
        """Detect dependency cycles using DFS"""
        print("  Checking for dependency cycles...")

        cycles = []
        visited = set()
        rec_stack = set()

        def dfs_cycle_detect(node_id, path):
            if node_id in rec_stack:
                # Found cycle - extract the cycle from path
                cycle_start = path.index(node_id)
                cycle = path[cycle_start:] + [node_id]
                cycles.append(cycle)
                return True

            if node_id in visited:
                return False

            visited.add(node_id)
            rec_stack.add(node_id)
            path.append(node_id)

            for dependent in graph[node_id]['dependents']:
                if dfs_cycle_detect(dependent, path):
                    return True

            path.pop()
            rec_stack.remove(node_id)
            return False

        for node_id in graph:
            if node_id not in visited:
                dfs_cycle_detect(node_id, [])

        if cycles:
            print(f"  Warning: Found {len(cycles)} cycles")
        else:
            print("  No cycles detected")

        return cycles
    def _resolve_cycles(self, graph: Dict[str, Dict], cycles: List[List[str]]) -> Dict[str, Dict]:
        """Resolve dependency cycles by breaking weakest links"""
        print("  Resolving dependency cycles...")

        for cycle in cycles:
            print(f"    Breaking cycle: {' → '.join([graph[nid]['node']['name'] for nid in cycle])}")

            # Find the "weakest" dependency to break (arbitrary for now).
            # In a real implementation, this could be based on model complexity,
            # processing time, etc.
            if len(cycle) >= 2:
                node_to_break = cycle[-2]       # Break the last dependency
                dependent_to_break = cycle[-1]

                graph[dependent_to_break]['dependencies'].discard(node_to_break)
                graph[node_to_break]['dependents'].discard(dependent_to_break)

                print(f"    Broke dependency: {graph[node_to_break]['node']['name']} → "
                      f"{graph[dependent_to_break]['node']['name']}")

        return graph

    def _topological_sort_with_optimization(self, graph: Dict[str, Dict], model_nodes: List[Dict]) -> List[Dict]:
        """Advanced topological sort with parallel optimization"""
        print("  Performing optimized topological sort...")

        # Calculate depth levels for each node
        self._calculate_depth_levels(graph)

        # Group nodes by depth for parallel execution
        depth_groups = self._group_by_depth(graph)

        # Sort within each depth group by optimization criteria
        sorted_nodes = []
        for depth in sorted(depth_groups.keys()):
            group_nodes = depth_groups[depth]

            # Sort by complexity/priority within the same depth
            group_nodes.sort(key=lambda nid: (
                len(graph[nid]['dependencies']),   # Fewer dependencies first
                -len(graph[nid]['dependents']),    # More dependents first (critical path)
                graph[nid]['node']['name']         # Stable sort by name
            ))

            for node_id in group_nodes:
                sorted_nodes.append(graph[node_id]['node'])

        print(f"  Sorted {len(sorted_nodes)} stages into {len(depth_groups)} execution levels")

        return sorted_nodes

    def _calculate_depth_levels(self, graph: Dict[str, Dict]):
        """Calculate depth levels using dynamic programming"""
        print("    Calculating execution depth levels...")

        # Find nodes with no dependencies (starting points)
        no_deps = [nid for nid, data in graph.items() if not data['dependencies']]

        # BFS to calculate depths
        from collections import deque
        queue = deque([(nid, 0) for nid in no_deps])

        while queue:
            node_id, depth = queue.popleft()

            if graph[node_id]['depth'] < depth:
                graph[node_id]['depth'] = depth

            # Update dependents
            for dependent in graph[node_id]['dependents']:
                queue.append((dependent, depth + 1))

    def _group_by_depth(self, graph: Dict[str, Dict]) -> Dict[int, List[str]]:
        """Group nodes by execution depth for parallel processing"""
        depth_groups = {}

        for node_id, data in graph.items():
            depth = data['depth']
            if depth not in depth_groups:
                depth_groups[depth] = []
            depth_groups[depth].append(node_id)

        return depth_groups
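    # Illustrative example (not taken from any particular .mflow file): for a
    # graph where stage A feeds B and C, and both B and C feed D,
    # _calculate_depth_levels assigns depths {A: 0, B: 1, C: 1, D: 2} and
    # _group_by_depth yields {0: ['A'], 1: ['B', 'C'], 2: ['D']}, so B and C
    # form one parallel execution level and the pipeline depth reported below
    # is 3.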
    def _calculate_pipeline_metrics(self, sorted_stages: List[Dict], graph: Dict[str, Dict]) -> Dict[str, Any]:
        """Calculate pipeline performance metrics"""
        print("  Calculating pipeline metrics...")

        total_stages = len(sorted_stages)
        max_depth = max([data['depth'] for data in graph.values()]) + 1 if graph else 1

        # Calculate parallelization potential
        depth_distribution = {}
        for data in graph.values():
            depth = data['depth']
            depth_distribution[depth] = depth_distribution.get(depth, 0) + 1

        max_parallel = max(depth_distribution.values()) if depth_distribution else 1
        avg_parallel = sum(depth_distribution.values()) / len(depth_distribution) if depth_distribution else 1

        # Calculate critical path
        critical_path = self._find_critical_path(graph)

        metrics = {
            'total_stages': total_stages,
            'pipeline_depth': max_depth,
            'max_parallel_stages': max_parallel,
            'avg_parallel_stages': avg_parallel,
            'parallelization_efficiency': (total_stages / max_depth) if max_depth > 0 else 1.0,
            'critical_path_length': len(critical_path),
            'critical_path': critical_path
        }

        return metrics

    def _find_critical_path(self, graph: Dict[str, Dict]) -> List[str]:
        """Find the critical path (longest dependency chain)"""
        longest_path = []

        def dfs_longest_path(node_id, current_path):
            nonlocal longest_path

            current_path.append(node_id)

            if not graph[node_id]['dependents']:
                # Leaf node - check if this is the longest path
                if len(current_path) > len(longest_path):
                    longest_path = current_path.copy()
            else:
                for dependent in graph[node_id]['dependents']:
                    dfs_longest_path(dependent, current_path)

            current_path.pop()

        # Start from nodes with no dependencies
        for node_id, data in graph.items():
            if not data['dependencies']:
                dfs_longest_path(node_id, [])

        return longest_path

    def _display_pipeline_analysis(self, sorted_stages: List[Dict], metrics: Dict[str, Any]):
        """Display pipeline analysis results"""
        print("\n" + "=" * 60)
        print("INTELLIGENT PIPELINE TOPOLOGY ANALYSIS COMPLETE")
        print("=" * 60)

        print("Pipeline Metrics:")
        print(f"  Total Stages: {metrics['total_stages']}")
        print(f"  Pipeline Depth: {metrics['pipeline_depth']} levels")
        print(f"  Max Parallel Stages: {metrics['max_parallel_stages']}")
        print(f"  Parallelization Efficiency: {metrics['parallelization_efficiency']:.1%}")

        print("\nOptimized Execution Order:")
        for i, stage in enumerate(sorted_stages, 1):
            print(f"  {i:2d}. {stage['name']} (ID: {stage['id'][:8]}...)")

        if metrics['critical_path']:
            print(f"\nCritical Path ({metrics['critical_path_length']} stages):")
            critical_names = []
            for node_id in metrics['critical_path']:
                node_name = next((stage['name'] for stage in sorted_stages if stage['id'] == node_id), 'Unknown')
                critical_names.append(node_name)
            print(f"  {' → '.join(critical_names)}")

        print("\nPerformance Insights:")
        if metrics['parallelization_efficiency'] > 0.8:
            print("  Excellent parallelization potential!")
        elif metrics['parallelization_efficiency'] > 0.6:
            print("  Good parallelization opportunities available")
        else:
            print("  Limited parallelization - consider pipeline redesign")

        if metrics['pipeline_depth'] <= 3:
            print("  Low latency pipeline - great for real-time applications")
        elif metrics['pipeline_depth'] <= 6:
            print("  Balanced pipeline depth - good throughput/latency trade-off")
        else:
            print("  Deep pipeline - optimized for maximum throughput")

        print("=" * 60 + "\n")
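    # Assumed shape of the multi-series node properties consumed by the method
    # below. The key names mirror the .get() calls in
    # _build_multi_series_config_from_properties, and the folder layout is only
    # what this code looks for, not a guaranteed convention of the UI app:
    #
    #   properties = {
    #       "multi_series_mode": True,
    #       "enabled_series": ["520", "720"],
    #       "assets_folder": "/path/to/assets",   # expects Models/KL<series>/*.nef and
    #                                             # Firmware/KL<series>/fw_scpu.bin, fw_ncpu.bin
    #       "kl520_port_ids": "28,30",
    #       "kl720_port_ids": "32",
    #   }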
    def _build_multi_series_config_from_properties(self, properties: Dict[str, Any]) -> Dict[str, Any]:
        """Build multi-series configuration from node properties"""
        try:
            enabled_series = properties.get('enabled_series', [])
            assets_folder = properties.get('assets_folder', '')

            if not enabled_series:
                print("Warning: No enabled_series found in multi-series mode")
                return {}

            multi_series_config = {}

            for series in enabled_series:
                # Get port IDs for this series
                port_ids_str = properties.get(f'kl{series}_port_ids', '')
                if not port_ids_str or not port_ids_str.strip():
                    print(f"Warning: No port IDs configured for KL{series}")
                    continue

                # Parse port IDs (comma-separated string to list of integers)
                try:
                    port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
                    if not port_ids:
                        continue
                except ValueError:
                    print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}")
                    continue

                # Build series configuration
                series_config = {
                    "port_ids": port_ids
                }

                # Add model path if assets folder is configured
                if assets_folder:
                    model_folder = os.path.join(assets_folder, 'Models', f'KL{series}')
                    if os.path.exists(model_folder):
                        # Look for .nef files in the model folder
                        nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')]
                        if nef_files:
                            series_config["model_path"] = os.path.join(model_folder, nef_files[0])
                            print(f"Found model for KL{series}: {series_config['model_path']}")

                    # Add firmware paths if available
                    firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}')
                    if os.path.exists(firmware_folder):
                        scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin')
                        ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin')
                        if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
                            series_config["firmware_paths"] = {
                                "scpu": scpu_path,
                                "ncpu": ncpu_path
                            }
                            print(f"Found firmware for KL{series}: scpu={scpu_path}, ncpu={ncpu_path}")

                multi_series_config[f'KL{series}'] = series_config
                print(f"Configured KL{series} with {len(port_ids)} devices on ports {port_ids}")

            return multi_series_config if multi_series_config else {}

        except Exception as e:
            print(f"Error building multi-series config from properties: {e}")
            return {}
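    # Assumed single-series model node properties read by _create_stage_configs
    # below. Key names come from the .get() calls in this file; the values shown
    # are illustrative, and the defaults noted are the fallbacks used when a key
    # is missing:
    #
    #   properties = {
    #       "port_id": "32",                        # comma-separated; defaults to [32]
    #       "model_path": "models/detector.nef",
    #       "scpu_fw_path": "./firmware/fw_scpu.bin",
    #       "ncpu_fw_path": "./firmware/fw_ncpu.bin",
    #       "upload_fw": False,
    #       "max_queue_size": 50,
    #       "multi_series_mode": False,
    #   }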
    def _create_stage_configs(self, model_nodes: List[Dict], preprocess_nodes: List[Dict],
                              postprocess_nodes: List[Dict], connections: List[Dict]) -> List[StageConfig]:
        """Create StageConfig objects for each model node with postprocessing support"""
        stage_configs = []

        # Build connection mapping for efficient lookup
        connection_map = {}
        for conn in connections:
            output_node_id = conn.get('output_node')
            input_node_id = conn.get('input_node')
            if output_node_id not in connection_map:
                connection_map[output_node_id] = []
            connection_map[output_node_id].append(input_node_id)

        for i, model_node in enumerate(self.stage_order):
            properties = model_node.get('properties', {})

            # Extract configuration from UI properties
            stage_id = f"stage_{i+1}_{model_node.get('name', 'unknown').replace(' ', '_')}"

            # Convert port_id to list format
            port_id_str = properties.get('port_id', '').strip()
            if port_id_str:
                try:
                    # Handle comma-separated port IDs
                    port_ids = [int(p.strip()) for p in port_id_str.split(',') if p.strip()]
                except ValueError:
                    print(f"Warning: Invalid port_id format '{port_id_str}', using default [32]")
                    port_ids = [32]  # Default port
            else:
                port_ids = [32]  # Default port

            # Model path
            model_path = properties.get('model_path', '')
            if not model_path:
                print(f"Warning: No model_path specified for {model_node.get('name')}")

            # Firmware paths from UI properties
            scpu_fw_path = properties.get('scpu_fw_path', os.path.join(self.default_fw_path, 'fw_scpu.bin'))
            ncpu_fw_path = properties.get('ncpu_fw_path', os.path.join(self.default_fw_path, 'fw_ncpu.bin'))

            # Upload firmware flag
            upload_fw = properties.get('upload_fw', False)

            # Queue size
            max_queue_size = properties.get('max_queue_size', 50)

            # Find connected postprocessing node
            stage_postprocessor = None
            model_node_id = model_node.get('id')

            if model_node_id and model_node_id in connection_map:
                connected_nodes = connection_map[model_node_id]

                # Look for postprocessing nodes among connected nodes
                for connected_id in connected_nodes:
                    for postprocess_node in postprocess_nodes:
                        if postprocess_node.get('id') == connected_id:
                            # Found a connected postprocessing node
                            postprocess_props = postprocess_node.get('properties', {})

                            # Extract postprocessing configuration
                            postprocess_type_str = postprocess_props.get('postprocess_type', 'fire_detection')
                            confidence_threshold = postprocess_props.get('confidence_threshold', 0.5)
                            nms_threshold = postprocess_props.get('nms_threshold', 0.5)
                            max_detections = postprocess_props.get('max_detections', 100)
                            class_names_str = postprocess_props.get('class_names', '')

                            # Parse class names from node (highest priority)
                            if isinstance(class_names_str, str) and class_names_str.strip():
                                class_names = [name.strip() for name in class_names_str.split(',') if name.strip()]
                            else:
                                class_names = []

                            # Map string to PostProcessType enum
                            type_mapping = {
                                'fire_detection': PostProcessType.FIRE_DETECTION,
                                'yolo_v3': PostProcessType.YOLO_V3,
                                'yolo_v5': PostProcessType.YOLO_V5,
                                'classification': PostProcessType.CLASSIFICATION,
                                'raw_output': PostProcessType.RAW_OUTPUT
                            }
                            postprocess_type = type_mapping.get(postprocess_type_str, PostProcessType.FIRE_DETECTION)

                            # Smart defaults for YOLOv5 labels when none provided
                            if postprocess_type == PostProcessType.YOLO_V5 and not class_names:
                                # Try to load labels near the model file
                                loaded = self._load_labels_for_model(model_path)
                                if loaded:
                                    class_names = loaded
                                else:
                                    # Fallback to COCO-80
                                    class_names = self._default_coco_labels()

                            print(f"Found postprocessing for {stage_id}: type={postprocess_type.value}, "
                                  f"threshold={confidence_threshold}, classes={len(class_names)}")

                            # Create PostProcessorOptions and PostProcessor
                            try:
                                postprocess_options = PostProcessorOptions(
                                    postprocess_type=postprocess_type,
                                    threshold=confidence_threshold,
                                    class_names=class_names,
                                    nms_threshold=nms_threshold,
                                    max_detections_per_class=max_detections
                                )
                                stage_postprocessor = PostProcessor(postprocess_options)
                            except Exception as e:
                                print(f"Warning: Failed to create postprocessor for {stage_id}: {e}")

                            break  # Use the first postprocessing node found

            if stage_postprocessor is None:
                print(f"No postprocessing node found for {stage_id}, using default")

            # Check if multi-series mode is enabled
            multi_series_mode = properties.get('multi_series_mode', False)
            multi_series_config = None

            if multi_series_mode:
                # Build multi-series config from node properties
                multi_series_config = self._build_multi_series_config_from_properties(properties)
                print(f"Multi-series config for {stage_id}: {multi_series_config}")

                # Create StageConfig for multi-series mode
                stage_config = StageConfig(
                    stage_id=stage_id,
                    port_ids=[],        # Will be handled by multi_series_config
                    scpu_fw_path='',    # Will be handled by multi_series_config
                    ncpu_fw_path='',    # Will be handled by multi_series_config
                    model_path='',      # Will be handled by multi_series_config
                    upload_fw=upload_fw,
                    max_queue_size=max_queue_size,
                    multi_series_config=multi_series_config,
                    stage_postprocessor=stage_postprocessor
                )
            else:
                # Create StageConfig for single-series mode (legacy)
                stage_config = StageConfig(
                    stage_id=stage_id,
                    port_ids=port_ids,
                    scpu_fw_path=scpu_fw_path,
                    ncpu_fw_path=ncpu_fw_path,
                    model_path=model_path,
                    upload_fw=upload_fw,
                    max_queue_size=max_queue_size,
                    multi_series_config=None,
                    stage_postprocessor=stage_postprocessor
                )

            stage_configs.append(stage_config)

        return stage_configs

    def _extract_input_config(self, input_nodes: List[Dict]) -> Dict[str, Any]:
        """Extract input configuration from input nodes"""
        if not input_nodes:
            return {}

        # Use the first input node
        input_node = input_nodes[0]
        properties = input_node.get('properties', {})

        return {
            'source_type': properties.get('source_type', 'Camera'),
            'device_id': properties.get('device_id', 0),
            'source_path': properties.get('source_path', ''),
            'resolution': properties.get('resolution', '1920x1080'),
            'fps': properties.get('fps', 30)
        }

    def _extract_output_config(self, output_nodes: List[Dict]) -> Dict[str, Any]:
        """Extract output configuration from output nodes"""
        if not output_nodes:
            return {}

        # Use the first output node
        output_node = output_nodes[0]
        properties = output_node.get('properties', {})

        return {
            'output_type': properties.get('output_type', 'File'),
            'format': properties.get('format', 'JSON'),
            'destination': properties.get('destination', ''),
            'save_interval': properties.get('save_interval', 1.0)
        }
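    # Note: DefaultProcessors (defined at the top of this module) is not wired up
    # automatically. One possible way a caller could turn the preprocessing config
    # dicts produced below into callables -- an illustrative sketch only, not part
    # of the converter's behavior:
    #
    #   def make_preprocess_fn(cfg):
    #       size = (cfg.get('resize_width', 640), cfg.get('resize_height', 480))
    #       return lambda frame: DefaultProcessors.resize_and_normalize(
    #           frame, target_size=size, normalize=cfg.get('normalize', True))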
configurations""" configs = [] for node in preprocess_nodes: properties = node.get('properties', {}) config = { 'resize_width': properties.get('resize_width', 640), 'resize_height': properties.get('resize_height', 480), 'normalize': properties.get('normalize', True), 'crop_enabled': properties.get('crop_enabled', False), 'operations': properties.get('operations', 'resize,normalize') } configs.append(config) return configs # ---------- Label helpers ---------- def _load_labels_for_model(self, model_path: str) -> Optional[List[str]]: """Attempt to load class labels from files near the model path. Priority: .names -> names.txt -> classes.txt -> labels.txt -> data.yaml/dataset.yaml (names) Returns None if not found. """ try: if not model_path: return None base = os.path.splitext(model_path)[0] dir_ = os.path.dirname(model_path) candidates = [ f"{base}.names", os.path.join(dir_, 'names.txt'), os.path.join(dir_, 'classes.txt'), os.path.join(dir_, 'labels.txt'), os.path.join(dir_, 'data.yaml'), os.path.join(dir_, 'dataset.yaml'), ] for path in candidates: if os.path.exists(path): if path.lower().endswith('.yaml'): labels = self._load_labels_from_yaml(path) else: labels = self._load_labels_from_lines(path) if labels: print(f"Loaded {len(labels)} labels from {os.path.basename(path)}") return labels except Exception as e: print(f"Warning: failed loading labels near model: {e}") return None def _load_labels_from_lines(self, path: str) -> List[str]: try: with open(path, 'r', encoding='utf-8') as f: lines = [ln.strip() for ln in f.readlines()] return [ln for ln in lines if ln and not ln.startswith('#')] except Exception: return [] def _load_labels_from_yaml(self, path: str) -> List[str]: # Try PyYAML if available; else fallback to simple parse try: import yaml # type: ignore with open(path, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) names = data.get('names') if isinstance(data, dict) else None if isinstance(names, dict): # Ordered by key if numeric, else values items = sorted(names.items(), key=lambda kv: int(kv[0]) if str(kv[0]).isdigit() else kv[0]) return [str(v) for _, v in items] elif isinstance(names, list): return [str(x) for x in names] except Exception: pass # Minimal fallback: naive scan try: with open(path, 'r', encoding='utf-8') as f: content = f.read() if 'names:' in content: after = content.split('names:', 1)[1] # Look for block list lines = [ln.strip() for ln in after.splitlines()] block = [] for ln in lines: if ln.startswith('- '): block.append(ln[2:].strip()) elif block: break if block: return block # Look for bracket list if '[' in after and ']' in after: inside = after.split('[', 1)[1].split(']', 1)[0] return [x.strip().strip('"\'') for x in inside.split(',') if x.strip()] except Exception: pass return [] def _default_coco_labels(self) -> List[str]: # Standard COCO 80 class names return [ 'person', 'bicycle', 'car', 'motorbike', 'aeroplane', 'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'sofa', 'pottedplant', 'bed', 'diningtable', 'toilet', 'tvmonitor', 'laptop', 'mouse', 
    def _default_coco_labels(self) -> List[str]:
        # Standard COCO 80 class names
        return [
            'person', 'bicycle', 'car', 'motorbike', 'aeroplane', 'bus', 'train', 'truck', 'boat',
            'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat',
            'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack',
            'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
            'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
            'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
            'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
            'chair', 'sofa', 'pottedplant', 'bed', 'diningtable', 'toilet', 'tvmonitor', 'laptop',
            'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
            'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
            'toothbrush'
        ]

    def _extract_postprocessing_configs(self, postprocess_nodes: List[Dict]) -> List[Dict[str, Any]]:
        """Extract postprocessing configurations"""
        configs = []

        for node in postprocess_nodes:
            properties = node.get('properties', {})
            config = {
                'output_format': properties.get('output_format', 'JSON'),
                'confidence_threshold': properties.get('confidence_threshold', 0.5),
                'nms_threshold': properties.get('nms_threshold', 0.4),
                'max_detections': properties.get('max_detections', 100)
            }
            configs.append(config)

        return configs

    def create_inference_pipeline(self, config: PipelineConfig) -> InferencePipeline:
        """
        Create InferencePipeline instance from PipelineConfig

        Args:
            config: PipelineConfig object

        Returns:
            Configured InferencePipeline instance
        """
        return InferencePipeline(
            stage_configs=config.stage_configs,
            pipeline_name=config.pipeline_name
        )

    def validate_config(self, config: PipelineConfig) -> Tuple[bool, List[str]]:
        """
        Validate pipeline configuration

        Args:
            config: PipelineConfig to validate

        Returns:
            (is_valid, error_messages)
        """
        errors = []

        # Check if we have at least one stage
        if not config.stage_configs:
            errors.append("Pipeline must have at least one stage (model node)")

        # Validate each stage config
        for i, stage_config in enumerate(config.stage_configs):
            stage_errors = self._validate_stage_config(stage_config, i + 1)
            errors.extend(stage_errors)

        return len(errors) == 0, errors

    def _validate_stage_config(self, stage_config: StageConfig, stage_num: int) -> List[str]:
        """Validate individual stage configuration"""
        errors = []

        # Check if this is multi-series configuration
        if stage_config.multi_series_config:
            # Multi-series validation
            errors.extend(self._validate_multi_series_config(stage_config.multi_series_config, stage_num))
        else:
            # Single-series validation (legacy)
            # Check model path
            if not stage_config.model_path:
                errors.append(f"Stage {stage_num}: Model path is required")
            elif not os.path.exists(stage_config.model_path):
                errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}")

            # Check firmware paths if upload_fw is True
            if stage_config.upload_fw:
                if not os.path.exists(stage_config.scpu_fw_path):
                    errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}")
                if not os.path.exists(stage_config.ncpu_fw_path):
                    errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}")

            # Check port IDs
            if not stage_config.port_ids:
                errors.append(f"Stage {stage_num}: At least one port ID is required")

        return errors
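    # Expected shape of multi_series_config, as produced by
    # _build_multi_series_config_from_properties and checked by the validator
    # below (paths and port numbers are illustrative):
    #
    #   {
    #       "KL520": {
    #           "port_ids": [28, 30],
    #           "model_path": "/assets/Models/KL520/model.nef",          # optional
    #           "firmware_paths": {                                      # optional
    #               "scpu": "/assets/Firmware/KL520/fw_scpu.bin",
    #               "ncpu": "/assets/Firmware/KL520/fw_ncpu.bin"
    #           }
    #       },
    #       "KL720": {"port_ids": [32]}
    #   }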
    def _validate_multi_series_config(self, multi_series_config: Dict[str, Any], stage_num: int) -> List[str]:
        """Validate multi-series configuration"""
        errors = []

        if not multi_series_config:
            errors.append(f"Stage {stage_num}: Multi-series configuration is empty")
            return errors

        print(f"Validating multi-series config for stage {stage_num}: {list(multi_series_config.keys())}")

        # Check each series configuration
        for series_name, series_config in multi_series_config.items():
            if not isinstance(series_config, dict):
                errors.append(f"Stage {stage_num}: Invalid configuration for {series_name}")
                continue

            # Check port IDs
            port_ids = series_config.get('port_ids', [])
            if not port_ids:
                errors.append(f"Stage {stage_num}: {series_name} has no port IDs configured")
                continue

            if not isinstance(port_ids, list) or not all(isinstance(p, int) for p in port_ids):
                errors.append(f"Stage {stage_num}: {series_name} port IDs must be a list of integers")
                continue

            print(f"  {series_name}: {len(port_ids)} ports configured")

            # Check model path
            model_path = series_config.get('model_path')
            if model_path:
                if not os.path.exists(model_path):
                    errors.append(f"Stage {stage_num}: {series_name} model file not found: {model_path}")
                else:
                    print(f"  {series_name}: Model validated: {model_path}")
            else:
                print(f"  {series_name}: No model path specified (optional for multi-series)")

            # Check firmware paths if specified
            firmware_paths = series_config.get('firmware_paths')
            if firmware_paths and isinstance(firmware_paths, dict):
                scpu_path = firmware_paths.get('scpu')
                ncpu_path = firmware_paths.get('ncpu')

                if scpu_path and not os.path.exists(scpu_path):
                    errors.append(f"Stage {stage_num}: {series_name} SCPU firmware not found: {scpu_path}")
                elif scpu_path:
                    print(f"  {series_name}: SCPU firmware validated: {scpu_path}")

                if ncpu_path and not os.path.exists(ncpu_path):
                    errors.append(f"Stage {stage_num}: {series_name} NCPU firmware not found: {ncpu_path}")
                elif ncpu_path:
                    print(f"  {series_name}: NCPU firmware validated: {ncpu_path}")

        if not errors:
            print(f"Stage {stage_num}: Multi-series configuration validation passed")

        return errors


def convert_mflow_file(mflow_path: str, firmware_path: str = "./firmware") -> PipelineConfig:
    """
    Convenience function to convert a .mflow file

    Args:
        mflow_path: Path to .mflow file
        firmware_path: Path to firmware directory

    Returns:
        PipelineConfig ready for API use
    """
    converter = MFlowConverter(default_fw_path=firmware_path)
    return converter.load_and_convert(mflow_path)


if __name__ == "__main__":
    # Example usage
    import sys

    if len(sys.argv) < 2:
        print("Usage: python mflow_converter.py <mflow_file> [firmware_path]")
        sys.exit(1)

    mflow_file = sys.argv[1]
    firmware_path = sys.argv[2] if len(sys.argv) > 2 else "./firmware"

    try:
        converter = MFlowConverter(default_fw_path=firmware_path)
        config = converter.load_and_convert(mflow_file)

        print(f"Converted pipeline: {config.pipeline_name}")
        print(f"Stages: {len(config.stage_configs)}")

        # Validate configuration
        is_valid, errors = converter.validate_config(config)
        if is_valid:
            print("✓ Configuration is valid")

            # Create pipeline instance
            pipeline = converter.create_inference_pipeline(config)
            print(f"✓ InferencePipeline created: {pipeline.pipeline_name}")
        else:
            print("✗ Configuration has errors:")
            for error in errors:
                print(f"  - {error}")

    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)