172 lines
5.7 KiB
Python
172 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for pipeline editor integration into dashboard.
|
|
|
|
This script tests the integration of pipeline_editor.py functionality
|
|
into the dashboard.py file.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
|
|
# Add parent directory to path
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
parent_dir = os.path.dirname(current_dir)
|
|
sys.path.insert(0, parent_dir)
|
|
|
|
def test_imports():
|
|
"""Test that all required imports work."""
|
|
print("🔍 Testing imports...")
|
|
|
|
try:
|
|
from cluster4npu_ui.ui.windows.dashboard import IntegratedPipelineDashboard, StageCountWidget
|
|
print("✅ Dashboard components imported successfully")
|
|
|
|
# Test PyQt5 imports
|
|
from PyQt5.QtWidgets import QApplication, QWidget
|
|
from PyQt5.QtCore import QTimer
|
|
print("✅ PyQt5 components imported successfully")
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Import failed: {e}")
|
|
return False
|
|
|
|
def test_stage_count_widget():
|
|
"""Test StageCountWidget functionality."""
|
|
print("\n🔍 Testing StageCountWidget...")
|
|
|
|
try:
|
|
from PyQt5.QtWidgets import QApplication
|
|
from cluster4npu_ui.ui.windows.dashboard import StageCountWidget
|
|
|
|
# Create application if needed
|
|
app = QApplication.instance()
|
|
if app is None:
|
|
app = QApplication([])
|
|
|
|
# Create widget
|
|
widget = StageCountWidget()
|
|
print("✅ StageCountWidget created successfully")
|
|
|
|
# Test stage count updates
|
|
widget.update_stage_count(0, True, "")
|
|
assert widget.stage_count == 0
|
|
print("✅ Initial stage count test passed")
|
|
|
|
widget.update_stage_count(3, True, "")
|
|
assert widget.stage_count == 3
|
|
assert widget.pipeline_valid == True
|
|
print("✅ Valid pipeline test passed")
|
|
|
|
widget.update_stage_count(1, False, "Test error")
|
|
assert widget.stage_count == 1
|
|
assert widget.pipeline_valid == False
|
|
assert widget.pipeline_error == "Test error"
|
|
print("✅ Error state test passed")
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ StageCountWidget test failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
def test_dashboard_methods():
|
|
"""Test that dashboard methods exist and are callable."""
|
|
print("\n🔍 Testing Dashboard methods...")
|
|
|
|
try:
|
|
from cluster4npu_ui.ui.windows.dashboard import IntegratedPipelineDashboard
|
|
|
|
# Check critical methods exist
|
|
required_methods = [
|
|
'setup_analysis_timer',
|
|
'schedule_analysis',
|
|
'analyze_pipeline',
|
|
'print_pipeline_analysis',
|
|
'create_pipeline_toolbar',
|
|
'clear_pipeline',
|
|
'validate_pipeline'
|
|
]
|
|
|
|
for method_name in required_methods:
|
|
if hasattr(IntegratedPipelineDashboard, method_name):
|
|
method = getattr(IntegratedPipelineDashboard, method_name)
|
|
if callable(method):
|
|
print(f"✅ Method {method_name} exists and is callable")
|
|
else:
|
|
print(f"❌ Method {method_name} exists but is not callable")
|
|
return False
|
|
else:
|
|
print(f"❌ Method {method_name} does not exist")
|
|
return False
|
|
|
|
print("✅ All required methods are present and callable")
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Dashboard methods test failed: {e}")
|
|
return False
|
|
|
|
def test_pipeline_analysis_functions():
|
|
"""Test pipeline analysis function imports."""
|
|
print("\n🔍 Testing pipeline analysis functions...")
|
|
|
|
try:
|
|
from cluster4npu_ui.ui.windows.dashboard import get_pipeline_summary, get_stage_count, analyze_pipeline_stages
|
|
print("✅ Pipeline analysis functions imported (or fallbacks created)")
|
|
|
|
# Test fallback functions with None input
|
|
try:
|
|
result = get_pipeline_summary(None)
|
|
print(f"✅ get_pipeline_summary fallback works: {result}")
|
|
|
|
count = get_stage_count(None)
|
|
print(f"✅ get_stage_count fallback works: {count}")
|
|
|
|
stages = analyze_pipeline_stages(None)
|
|
print(f"✅ analyze_pipeline_stages fallback works: {stages}")
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Fallback functions exist but may need graph input: {e}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Pipeline analysis functions test failed: {e}")
|
|
return False
|
|
|
|
def run_all_tests():
|
|
"""Run all integration tests."""
|
|
print("🚀 Starting pipeline editor integration tests...\n")
|
|
|
|
tests = [
|
|
test_imports,
|
|
test_stage_count_widget,
|
|
test_dashboard_methods,
|
|
test_pipeline_analysis_functions
|
|
]
|
|
|
|
passed = 0
|
|
total = len(tests)
|
|
|
|
for test_func in tests:
|
|
try:
|
|
if test_func():
|
|
passed += 1
|
|
else:
|
|
print(f"❌ Test {test_func.__name__} failed")
|
|
except Exception as e:
|
|
print(f"❌ Test {test_func.__name__} raised exception: {e}")
|
|
|
|
print(f"\n📊 Test Results: {passed}/{total} tests passed")
|
|
|
|
if passed == total:
|
|
print("🎉 All integration tests passed! Pipeline editor functionality has been successfully integrated into dashboard.")
|
|
return True
|
|
else:
|
|
print("❌ Some tests failed. Integration may have issues.")
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
success = run_all_tests()
|
|
sys.exit(0 if success else 1) |