Cluster/core/functions/demo_topology_clean.py
#!/usr/bin/env python3
"""
智慧拓撲排序算法演示 (獨立版本)
不依賴外部模組,純粹展示拓撲排序算法的核心功能
"""
import json
from typing import List, Dict, Any, Tuple
from collections import deque
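
# Run this file directly to execute all four demo pipelines defined in
# create_demo_pipelines():
#
#   python demo_topology_clean.py
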
class TopologyDemo:
    """Demonstrates the topological sorting algorithm."""

    def __init__(self):
        self.stage_order = []

    def analyze_pipeline(self, pipeline_data: Dict[str, Any]):
        """Analyze a pipeline and perform topological sorting."""
        print("Starting intelligent pipeline topology analysis...")
        # Extract the model nodes
        model_nodes = [node for node in pipeline_data.get('nodes', [])
                       if 'model' in node.get('type', '').lower()]
        connections = pipeline_data.get('connections', [])
        if not model_nodes:
            print(" Warning: No model nodes found!")
            return []
        # Build the dependency graph
        dependency_graph = self._build_dependency_graph(model_nodes, connections)
        # Detect cycles
        cycles = self._detect_cycles(dependency_graph)
        if cycles:
            print(f" Warning: Found {len(cycles)} cycles!")
            dependency_graph = self._resolve_cycles(dependency_graph, cycles)
        # Run the topological sort
        sorted_stages = self._topological_sort_with_optimization(dependency_graph, model_nodes)
        # Compute metrics
        metrics = self._calculate_pipeline_metrics(sorted_stages, dependency_graph)
        self._display_pipeline_analysis(sorted_stages, metrics)
        return sorted_stages

    def _build_dependency_graph(self, model_nodes: List[Dict], connections: List[Dict]) -> Dict[str, Dict]:
        """Build the dependency graph."""
        print(" Building dependency graph...")
        graph = {}
        for node in model_nodes:
            graph[node['id']] = {
                'node': node,
                'dependencies': set(),
                'dependents': set(),
                'depth': 0
            }
        # Analyze connections: an edge runs from output_node to input_node,
        # so the input node depends on the output node.
        for conn in connections:
            output_node_id = conn.get('output_node')
            input_node_id = conn.get('input_node')
            if output_node_id in graph and input_node_id in graph:
                graph[input_node_id]['dependencies'].add(output_node_id)
                graph[output_node_id]['dependents'].add(input_node_id)
        dep_count = sum(len(data['dependencies']) for data in graph.values())
        print(f" Graph built: {len(graph)} nodes, {dep_count} dependencies")
        return graph

    def _detect_cycles(self, graph: Dict[str, Dict]) -> List[List[str]]:
        """Detect dependency cycles."""
        print(" Checking for dependency cycles...")
        cycles = []
        visited = set()
        rec_stack = set()

        def dfs_cycle_detect(node_id, path):
            visited.add(node_id)
            rec_stack.add(node_id)
            path.append(node_id)
            for dependent in graph[node_id]['dependents']:
                if dependent in rec_stack:
                    # Back edge found: record the cycle and keep searching,
                    # so the recursion stack stays consistent.
                    cycle_start = path.index(dependent)
                    cycles.append(path[cycle_start:] + [dependent])
                elif dependent not in visited:
                    dfs_cycle_detect(dependent, path)
            path.pop()
            rec_stack.remove(node_id)

        for node_id in graph:
            if node_id not in visited:
                dfs_cycle_detect(node_id, [])
        if cycles:
            print(f" Warning: Found {len(cycles)} cycles")
        else:
            print(" No cycles detected")
        return cycles

    def _resolve_cycles(self, graph: Dict[str, Dict], cycles: List[List[str]]) -> Dict[str, Dict]:
        """Break dependency cycles."""
        print(" Resolving dependency cycles...")
        for cycle in cycles:
            node_names = [graph[nid]['node']['name'] for nid in cycle]
            print(f" Breaking cycle: {' -> '.join(node_names)}")
            if len(cycle) >= 2:
                # Break the closing edge of the cycle
                node_to_break = cycle[-2]
                dependent_to_break = cycle[-1]
                graph[dependent_to_break]['dependencies'].discard(node_to_break)
                graph[node_to_break]['dependents'].discard(dependent_to_break)
                print(f" Broke dependency: {graph[node_to_break]['node']['name']} -> {graph[dependent_to_break]['node']['name']}")
        return graph

    def _topological_sort_with_optimization(self, graph: Dict[str, Dict], model_nodes: List[Dict]) -> List[Dict]:
        """Perform an optimized topological sort."""
        print(" Performing optimized topological sort...")
        # Compute depth levels
        self._calculate_depth_levels(graph)
        # Group nodes by depth
        depth_groups = self._group_by_depth(graph)
        # Sort within each level: fewer dependencies first, more dependents
        # first, then alphabetically by name for a stable order.
        sorted_nodes = []
        for depth in sorted(depth_groups.keys()):
            group_nodes = depth_groups[depth]
            group_nodes.sort(key=lambda nid: (
                len(graph[nid]['dependencies']),
                -len(graph[nid]['dependents']),
                graph[nid]['node']['name']
            ))
            for node_id in group_nodes:
                sorted_nodes.append(graph[node_id]['node'])
        print(f" Sorted {len(sorted_nodes)} stages into {len(depth_groups)} execution levels")
        return sorted_nodes

    def _calculate_depth_levels(self, graph: Dict[str, Dict]):
        """Compute execution depth levels (longest path from any root)."""
        print(" Calculating execution depth levels...")
        no_deps = [nid for nid, data in graph.items() if not data['dependencies']]
        queue = deque(no_deps)
        while queue:
            node_id = queue.popleft()
            for dependent in graph[node_id]['dependents']:
                # A node's level is one past its deepest prerequisite.
                new_depth = graph[node_id]['depth'] + 1
                if graph[dependent]['depth'] < new_depth:
                    graph[dependent]['depth'] = new_depth
                    queue.append(dependent)
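
    # For example, in a diamond-shaped graph A -> {B, C} -> D, the pass above
    # assigns A depth 0, B and C depth 1, and D depth 2, so B and C land in
    # the same execution level and can run in parallel.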

    def _group_by_depth(self, graph: Dict[str, Dict]) -> Dict[int, List[str]]:
        """Group node ids by depth level."""
        depth_groups = {}
        for node_id, data in graph.items():
            depth_groups.setdefault(data['depth'], []).append(node_id)
        return depth_groups

    def _calculate_pipeline_metrics(self, sorted_stages: List[Dict], graph: Dict[str, Dict]) -> Dict[str, Any]:
        """Compute pipeline metrics."""
        print(" Calculating pipeline metrics...")
        total_stages = len(sorted_stages)
        max_depth = max(data['depth'] for data in graph.values()) + 1 if graph else 1
        depth_distribution = {}
        for data in graph.values():
            depth = data['depth']
            depth_distribution[depth] = depth_distribution.get(depth, 0) + 1
        max_parallel = max(depth_distribution.values()) if depth_distribution else 1
        critical_path = self._find_critical_path(graph)
        return {
            'total_stages': total_stages,
            'pipeline_depth': max_depth,
            'max_parallel_stages': max_parallel,
            # Average number of stages per level; values above 1.0 mean
            # some stages can run in parallel.
            'parallelization_efficiency': (total_stages / max_depth) if max_depth > 0 else 1.0,
            'critical_path_length': len(critical_path),
            'critical_path': critical_path
        }

    def _find_critical_path(self, graph: Dict[str, Dict]) -> List[str]:
        """Find the critical (longest) path through the graph."""
        longest_path = []

        def dfs_longest_path(node_id, current_path):
            nonlocal longest_path
            current_path.append(node_id)
            if not graph[node_id]['dependents']:
                # Reached a sink; keep this path if it is the longest so far.
                if len(current_path) > len(longest_path):
                    longest_path = current_path.copy()
            else:
                for dependent in graph[node_id]['dependents']:
                    dfs_longest_path(dependent, current_path)
            current_path.pop()

        for node_id, data in graph.items():
            if not data['dependencies']:
                dfs_longest_path(node_id, [])
        return longest_path

    def _display_pipeline_analysis(self, sorted_stages: List[Dict], metrics: Dict[str, Any]):
        """Display the analysis results."""
        print("\n" + "="*60)
        print("INTELLIGENT PIPELINE TOPOLOGY ANALYSIS COMPLETE")
        print("="*60)
        print("Pipeline Metrics:")
        print(f" Total Stages: {metrics['total_stages']}")
        print(f" Pipeline Depth: {metrics['pipeline_depth']} levels")
        print(f" Max Parallel Stages: {metrics['max_parallel_stages']}")
        print(f" Parallelization Efficiency: {metrics['parallelization_efficiency']:.1%}")
        print("\nOptimized Execution Order:")
        for i, stage in enumerate(sorted_stages, 1):
            print(f" {i:2d}. {stage['name']} (ID: {stage['id'][:8]}...)")
        if metrics['critical_path']:
            print(f"\nCritical Path ({metrics['critical_path_length']} stages):")
            critical_names = []
            for node_id in metrics['critical_path']:
                node_name = next((stage['name'] for stage in sorted_stages if stage['id'] == node_id), 'Unknown')
                critical_names.append(node_name)
            print(f" {' -> '.join(critical_names)}")
        print("\nPerformance Insights:")
        if metrics['parallelization_efficiency'] > 0.8:
            print(" Excellent parallelization potential!")
        elif metrics['parallelization_efficiency'] > 0.6:
            print(" Good parallelization opportunities available")
        else:
            print(" Limited parallelization - consider pipeline redesign")
        if metrics['pipeline_depth'] <= 3:
            print(" Low latency pipeline - great for real-time applications")
        elif metrics['pipeline_depth'] <= 6:
            print(" Balanced pipeline depth - good throughput/latency trade-off")
        else:
            print(" Deep pipeline - optimized for maximum throughput")
        print("="*60 + "\n")


def create_demo_pipelines():
    """Create the demo pipelines."""
    # Demo 1: simple linear pipeline
    simple_pipeline = {
        "project_name": "Simple Linear Pipeline",
        "nodes": [
            {"id": "model_001", "name": "Object Detection", "type": "ExactModelNode"},
            {"id": "model_002", "name": "Fire Classification", "type": "ExactModelNode"},
            {"id": "model_003", "name": "Result Verification", "type": "ExactModelNode"}
        ],
        "connections": [
            {"output_node": "model_001", "input_node": "model_002"},
            {"output_node": "model_002", "input_node": "model_003"}
        ]
    }
    # Demo 2: parallel pipeline
    parallel_pipeline = {
        "project_name": "Parallel Processing Pipeline",
        "nodes": [
            {"id": "model_001", "name": "RGB Processor", "type": "ExactModelNode"},
            {"id": "model_002", "name": "IR Processor", "type": "ExactModelNode"},
            {"id": "model_003", "name": "Depth Processor", "type": "ExactModelNode"},
            {"id": "model_004", "name": "Fusion Engine", "type": "ExactModelNode"}
        ],
        "connections": [
            {"output_node": "model_001", "input_node": "model_004"},
            {"output_node": "model_002", "input_node": "model_004"},
            {"output_node": "model_003", "input_node": "model_004"}
        ]
    }
    # Demo 3: complex multi-level pipeline
    complex_pipeline = {
        "project_name": "Advanced Multi-Stage Fire Detection Pipeline",
        "nodes": [
            {"id": "model_rgb_001", "name": "RGB Feature Extractor", "type": "ExactModelNode"},
            {"id": "model_edge_002", "name": "Edge Feature Extractor", "type": "ExactModelNode"},
            {"id": "model_thermal_003", "name": "Thermal Feature Extractor", "type": "ExactModelNode"},
            {"id": "model_fusion_004", "name": "Feature Fusion", "type": "ExactModelNode"},
            {"id": "model_attention_005", "name": "Attention Mechanism", "type": "ExactModelNode"},
            {"id": "model_classifier_006", "name": "Fire Classifier", "type": "ExactModelNode"}
        ],
        "connections": [
            {"output_node": "model_rgb_001", "input_node": "model_fusion_004"},
            {"output_node": "model_edge_002", "input_node": "model_fusion_004"},
            {"output_node": "model_thermal_003", "input_node": "model_attention_005"},
            {"output_node": "model_fusion_004", "input_node": "model_classifier_006"},
            {"output_node": "model_attention_005", "input_node": "model_classifier_006"}
        ]
    }
    # Demo 4: pipeline with a cycle (exercises cycle detection)
    cycle_pipeline = {
        "project_name": "Pipeline with Cycles (Testing)",
        "nodes": [
            {"id": "model_A", "name": "Model A", "type": "ExactModelNode"},
            {"id": "model_B", "name": "Model B", "type": "ExactModelNode"},
            {"id": "model_C", "name": "Model C", "type": "ExactModelNode"}
        ],
        "connections": [
            {"output_node": "model_A", "input_node": "model_B"},
            {"output_node": "model_B", "input_node": "model_C"},
            {"output_node": "model_C", "input_node": "model_A"}  # creates a cycle!
        ]
    }
    return [simple_pipeline, parallel_pipeline, complex_pipeline, cycle_pipeline]
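
# Rough expectations for these demos, based on the logic above: Demo 2 should
# place the three processors in one execution level with Fusion Engine in a
# second level, and Demo 4 should trip the cycle detector, which breaks the
# Model C -> Model A dependency before sorting.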


def main():
    """Main demo entry point."""
    print("INTELLIGENT PIPELINE TOPOLOGY SORTING DEMONSTRATION")
    print("="*60)
    print("This demo showcases our advanced pipeline analysis capabilities:")
    print("• Automatic dependency resolution")
    print("• Parallel execution optimization")
    print("• Cycle detection and prevention")
    print("• Critical path analysis")
    print("• Performance metrics calculation")
    print("="*60 + "\n")
    demo = TopologyDemo()
    pipelines = create_demo_pipelines()
    demo_names = ["Simple Linear", "Parallel Processing", "Complex Multi-Stage", "Cycle Detection"]
    for i, (pipeline, name) in enumerate(zip(pipelines, demo_names), 1):
        print(f"DEMO {i}: {name} Pipeline")
        print("="*50)
        demo.analyze_pipeline(pipeline)
        print("\n")
    print("ALL DEMONSTRATIONS COMPLETED SUCCESSFULLY!")
    print("Ready for production deployment and progress reporting!")


if __name__ == "__main__":
    main()