cluster4npu/tests/unit/test_optimization_engine.py
abin 55040733fe feat: implement Phase 1-4 performance visualization and device management
Phase 1 — Performance Benchmarking:
- PerformanceBenchmarker: sequential vs parallel benchmark with injectable runner
- PerformanceHistory: JSON-backed benchmark history with regression support
- PerformanceDashboard: real-time FPS/latency display widget
- BenchmarkDialog: one-click benchmark with 3-phase progress bar

Phase 2 — Device Management:
- DeviceManager: NPU dongle scan, assign/unassign, load balance recommendation
- DeviceManagementPanel: live device status cards with auto-refresh
- BottleneckAlert: dataclass for pipeline bottleneck detection

Phase 3 — Advanced Features:
- OptimizationEngine: 3 optimization rules (rebalance/adjust_queue/add_devices)
- TemplateManager: 3 built-in pipeline templates (YOLOv5, fire detection, dual-model)

Phase 4 — Report Export:
- ReportExporter: PDF (reportlab, optional) and CSV export
- ExportReportDialog: format selection + path picker UI

192 unit tests, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 19:32:05 +08:00

365 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
tests/unit/test_optimization_engine.py
TDD Phase 3.3.1 — OptimizationEngine 單元測試。
覆蓋範圍:
- analyze_pipeline 的三條優化規則(含邊界值測試)
- predict_performance 計算邏輯
- apply_suggestion 對 rebalance_devices 呼叫 device_manager
"""
import pytest
from unittest.mock import MagicMock, call
from core.optimization.engine import OptimizationEngine, OptimizationSuggestion
from core.device.device_manager import DeviceInfo
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def engine():
    """Provide a fresh OptimizationEngine instance for each test."""
    instance = OptimizationEngine()
    return instance
def _make_stats(
stage_fill_rates=None,
stage_avg_times=None,
device_utilizations=None,
):
"""建立 analyze_pipeline 接受的 stats 字典。"""
stage_fill_rates = stage_fill_rates or {}
stage_avg_times = stage_avg_times or {}
device_utilizations = device_utilizations or {}
stages = {}
all_stage_ids = set(stage_fill_rates) | set(stage_avg_times)
for sid in all_stage_ids:
stages[sid] = {
"queue_fill_rate": stage_fill_rates.get(sid, 0.0),
"avg_processing_time": stage_avg_times.get(sid, 10.0),
"fps": 30.0,
}
devices = {}
for did, util in device_utilizations.items():
devices[did] = {
"utilization_pct": util,
"series": "KL720",
}
return {"stages": stages, "devices": devices}
def _make_device_info(device_id="usb-1", gops=28, series="KL720"):
return DeviceInfo(
device_id=device_id,
series=series,
product_id=0x720,
status="online",
gops=gops,
assigned_stage=None,
current_fps=0.0,
utilization_pct=0.0,
)
# ---------------------------------------------------------------------------
# analyze_pipeline — rule 1: rebalance_devices
# ---------------------------------------------------------------------------
class TestAnalyzePipelineRebalanceDevices:
    """Rule 1: a queue_fill_rate > 0.70 should trigger a rebalance_devices suggestion."""

    def _types_for_fill_rate(self, engine, fill_rate):
        # Helper: analyze a single stage at the given fill rate and collect types.
        stats = _make_stats(stage_fill_rates={"stage_0": fill_rate})
        return [s.type for s in engine.analyze_pipeline(stats)]

    def test_should_suggest_rebalance_when_fill_rate_above_threshold(self, engine):
        assert "rebalance_devices" in self._types_for_fill_rate(engine, 0.71)

    def test_should_not_suggest_rebalance_when_fill_rate_at_threshold(self, engine):
        """Exactly 0.70 must not trigger (the rule requires strictly > 0.70)."""
        assert "rebalance_devices" not in self._types_for_fill_rate(engine, 0.70)

    def test_should_not_suggest_rebalance_when_fill_rate_below_threshold(self, engine):
        assert "rebalance_devices" not in self._types_for_fill_rate(engine, 0.50)

    def test_rebalance_suggestion_has_required_fields(self, engine):
        stats = _make_stats(stage_fill_rates={"stage_0": 0.85})
        suggestion = next(
            s for s in engine.analyze_pipeline(stats) if s.type == "rebalance_devices"
        )
        assert suggestion.suggestion_id
        assert suggestion.description
        assert 0.0 <= suggestion.estimated_improvement_pct
        assert suggestion.confidence in ("high", "medium", "low")
        assert isinstance(suggestion.action_params, dict)

    def test_rebalance_action_params_includes_stage_id(self, engine):
        stats = _make_stats(stage_fill_rates={"stage_0": 0.85})
        suggestion = next(
            s for s in engine.analyze_pipeline(stats) if s.type == "rebalance_devices"
        )
        assert "stage_id" in suggestion.action_params
# ---------------------------------------------------------------------------
# analyze_pipeline — rule 2: adjust_queue
# ---------------------------------------------------------------------------
class TestAnalyzePipelineAdjustQueue:
    """Rule 2: a max/min avg_processing_time ratio > 2.0 should trigger adjust_queue."""

    def test_should_suggest_adjust_queue_when_ratio_above_threshold(self, engine):
        # 25.0 / 10.0 = 2.5 > 2.0 -> rule fires.
        stats = _make_stats(
            stage_avg_times={"stage_0": 10.0, "stage_1": 25.0}
        )
        suggestions = engine.analyze_pipeline(stats)
        types = [s.type for s in suggestions]
        assert "adjust_queue" in types

    def test_should_not_suggest_adjust_queue_when_ratio_at_threshold(self, engine):
        """Exactly 2.0 must not trigger (the rule requires strictly > 2.0)."""
        stats = _make_stats(
            stage_avg_times={"stage_0": 10.0, "stage_1": 20.0}
        )
        suggestions = engine.analyze_pipeline(stats)
        types = [s.type for s in suggestions]
        assert "adjust_queue" not in types

    def test_should_not_suggest_adjust_queue_when_ratio_below_threshold(self, engine):
        stats = _make_stats(
            stage_avg_times={"stage_0": 10.0, "stage_1": 15.0}
        )
        suggestions = engine.analyze_pipeline(stats)
        types = [s.type for s in suggestions]
        assert "adjust_queue" not in types

    def test_should_not_suggest_adjust_queue_with_single_stage(self, engine):
        """With a single stage there is no ratio to compute; the rule must not fire."""
        stats = _make_stats(stage_avg_times={"stage_0": 100.0})
        suggestions = engine.analyze_pipeline(stats)
        types = [s.type for s in suggestions]
        assert "adjust_queue" not in types

    def test_adjust_queue_suggestion_has_required_fields(self, engine):
        stats = _make_stats(
            stage_avg_times={"stage_0": 10.0, "stage_1": 25.0}
        )
        suggestions = engine.analyze_pipeline(stats)
        adj = next(s for s in suggestions if s.type == "adjust_queue")
        assert adj.suggestion_id
        assert adj.description
        assert adj.confidence in ("high", "medium", "low")

    def test_should_not_suggest_adjust_queue_when_min_processing_time_is_zero(self, engine):
        # A zero avg_processing_time makes the max/min ratio meaningless, so the
        # rule must not fire (and must not divide by zero).
        # BUGFIX: this method was missing the "test_" prefix, so pytest never
        # collected or ran it.
        stats = _make_stats(stage_avg_times={"stage_0": 0.0, "stage_1": 50.0})
        suggestions = engine.analyze_pipeline(stats)
        adjust = [s for s in suggestions if s.type == "adjust_queue"]
        assert len(adjust) == 0
# ---------------------------------------------------------------------------
# analyze_pipeline — rule 3: add_devices
# ---------------------------------------------------------------------------
class TestAnalyzePipelineAddDevices:
    """Rule 3: add_devices fires only when every dongle runs above 85% utilization."""

    def _types_for_utilizations(self, engine, utilizations):
        # Helper: analyze the given device utilizations and collect suggestion types.
        stats = _make_stats(device_utilizations=utilizations)
        return [s.type for s in engine.analyze_pipeline(stats)]

    def test_should_suggest_add_devices_when_all_above_threshold(self, engine):
        types = self._types_for_utilizations(engine, {"usb-1": 86.0, "usb-2": 90.0})
        assert "add_devices" in types

    def test_should_not_suggest_add_devices_when_one_device_below_threshold(self, engine):
        types = self._types_for_utilizations(engine, {"usb-1": 90.0, "usb-2": 80.0})
        assert "add_devices" not in types

    def test_should_not_suggest_add_devices_when_all_at_threshold(self, engine):
        """Exactly 85% must not trigger (the rule requires strictly > 85%)."""
        types = self._types_for_utilizations(engine, {"usb-1": 85.0, "usb-2": 85.0})
        assert "add_devices" not in types

    def test_should_not_suggest_add_devices_when_no_devices(self, engine):
        """Without any device information the rule must stay quiet."""
        assert "add_devices" not in self._types_for_utilizations(engine, {})

    def test_add_devices_suggestion_has_required_fields(self, engine):
        stats = _make_stats(device_utilizations={"usb-1": 90.0, "usb-2": 92.0})
        suggestion = next(
            s for s in engine.analyze_pipeline(stats) if s.type == "add_devices"
        )
        assert suggestion.suggestion_id
        assert suggestion.description
        assert suggestion.confidence in ("high", "medium", "low")
# ---------------------------------------------------------------------------
# analyze_pipeline — empty stats
# ---------------------------------------------------------------------------
class TestAnalyzePipelineEmptyStats:
    """analyze_pipeline with no stages and no devices should produce no suggestions."""

    def test_should_return_empty_list_when_stats_empty(self, engine):
        result = engine.analyze_pipeline({"stages": {}, "devices": {}})
        assert result == []
# ---------------------------------------------------------------------------
# predict_performance
# ---------------------------------------------------------------------------
class TestPredictPerformance:
    """predict_performance estimates FPS as sum(gops) / num_stages * 0.6."""

    def test_should_return_expected_fps_with_single_device_single_stage(self, engine):
        # 28 GOPS / 1 stage * 0.6 = 16.8 FPS
        result = engine.predict_performance([MagicMock()], [_make_device_info(gops=28)])
        assert result["estimated_fps"] == pytest.approx(16.8)

    def test_should_return_expected_latency(self, engine):
        result = engine.predict_performance([MagicMock()], [_make_device_info(gops=28)])
        # Latency is the inverse of the estimated FPS, in milliseconds.
        assert result["estimated_latency_ms"] == pytest.approx(1000.0 / 16.8, rel=1e-4)

    def test_should_return_confidence_range_as_tuple(self, engine):
        result = engine.predict_performance([MagicMock()], [_make_device_info(gops=28)])
        low, high = result["confidence_range"]
        estimated = result["estimated_fps"]
        # The range brackets the estimate at +/- 20%.
        assert low == pytest.approx(estimated * 0.8)
        assert high == pytest.approx(estimated * 1.2)

    def test_should_scale_fps_with_multiple_devices(self, engine):
        dongles = [
            _make_device_info("usb-1", gops=28),
            _make_device_info("usb-2", gops=28),
        ]
        two_stages = [MagicMock(), MagicMock()]
        result = engine.predict_performance(two_stages, dongles)
        # (28 + 28) / 2 stages * 0.6 = 16.8
        assert result["estimated_fps"] == pytest.approx(16.8)

    def test_should_decrease_fps_with_more_stages(self, engine):
        dongles = [_make_device_info(gops=28)]
        one_stage = engine.predict_performance([MagicMock()], dongles)
        four_stages = engine.predict_performance([MagicMock()] * 4, dongles)
        assert four_stages["estimated_fps"] < one_stage["estimated_fps"]

    def test_should_handle_zero_stages_without_crash(self, engine):
        """With zero stages the engine must return 0 FPS rather than raise."""
        result = engine.predict_performance([], [_make_device_info(gops=28)])
        assert result["estimated_fps"] == 0.0

    def test_should_return_zero_fps_with_no_devices(self, engine):
        result = engine.predict_performance([MagicMock()], [])
        assert result["estimated_fps"] == 0.0
# ---------------------------------------------------------------------------
# apply_suggestion
# ---------------------------------------------------------------------------
class TestApplySuggestion:
    """apply_suggestion: only rebalance_devices delegates to the device manager."""

    def _make_rebalance_suggestion(self, stage_id="stage_0", device_id="usb-1"):
        # Helper: a minimal rebalance suggestion targeting the given stage/device.
        return OptimizationSuggestion(
            suggestion_id="test-001",
            type="rebalance_devices",
            description="Rebalance test",
            estimated_improvement_pct=10.0,
            confidence="medium",
            action_params={"stage_id": stage_id, "device_id": device_id},
        )

    def test_should_call_assign_device_for_rebalance_suggestion(self, engine):
        dm = MagicMock()
        dm.assign_device.return_value = True
        suggestion = self._make_rebalance_suggestion("stage_0", "usb-1")
        result = engine.apply_suggestion(suggestion, dm)
        dm.assign_device.assert_called_once_with("usb-1", "stage_0")
        assert result is True

    def test_should_return_false_when_assign_device_fails(self, engine):
        dm = MagicMock()
        dm.assign_device.return_value = False
        suggestion = self._make_rebalance_suggestion()
        result = engine.apply_suggestion(suggestion, dm)
        assert result is False

    def test_should_return_true_for_add_devices_without_calling_assign(self, engine):
        dm = MagicMock()
        suggestion = OptimizationSuggestion(
            suggestion_id="test-002",
            type="add_devices",
            description="Add more dongles",
            estimated_improvement_pct=20.0,
            confidence="high",
            action_params={},
        )
        result = engine.apply_suggestion(suggestion, dm)
        dm.assign_device.assert_not_called()
        assert result is True

    def test_should_return_true_for_adjust_queue_without_calling_assign(self, engine):
        dm = MagicMock()
        suggestion = OptimizationSuggestion(
            suggestion_id="test-003",
            type="adjust_queue",
            description="Adjust queue size",
            estimated_improvement_pct=5.0,
            confidence="low",
            action_params={},
        )
        result = engine.apply_suggestion(suggestion, dm)
        dm.assign_device.assert_not_called()
        assert result is True

    def test_should_call_assign_device_with_empty_device_id_when_not_populated(self, engine):
        # Rebalance suggestions from analyze_pipeline default device_id to "";
        # apply_suggestion should pass the empty string through verbatim so the
        # device_manager behavior stays predictable.
        # BUGFIX: this method was missing the "test_" prefix, so pytest never
        # collected or ran it.
        suggestion = OptimizationSuggestion(
            suggestion_id="test",
            type="rebalance_devices",
            description="test",
            estimated_improvement_pct=10.0,
            confidence="medium",
            action_params={"device_id": "", "stage_id": "stage_0"},
        )
        mock_dm = MagicMock()
        mock_dm.assign_device.return_value = False  # empty device_id usually fails
        result = engine.apply_suggestion(suggestion, mock_dm)
        mock_dm.assign_device.assert_called_once_with("", "stage_0")
        # The result mirrors assign_device's return value.
        assert result is False