forked from masonhuang/cluster4npu
Phase 1 — Performance Benchmarking: - PerformanceBenchmarker: sequential vs parallel benchmark with injectable runner - PerformanceHistory: JSON-backed benchmark history with regression support - PerformanceDashboard: real-time FPS/latency display widget - BenchmarkDialog: one-click benchmark with 3-phase progress bar Phase 2 — Device Management: - DeviceManager: NPU dongle scan, assign/unassign, load balance recommendation - DeviceManagementPanel: live device status cards with auto-refresh - BottleneckAlert: dataclass for pipeline bottleneck detection Phase 3 — Advanced Features: - OptimizationEngine: 3 optimization rules (rebalance/adjust_queue/add_devices) - TemplateManager: 3 built-in pipeline templates (YOLOv5, fire detection, dual-model) Phase 4 — Report Export: - ReportExporter: PDF (reportlab, optional) and CSV export - ExportReportDialog: format selection + path picker UI 192 unit tests, all passing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
225 lines
8.5 KiB
Python
225 lines
8.5 KiB
Python
"""
Unit tests for PerformanceHistory.

Test coverage:
- Recording a BenchmarkResult
- Querying history records with filters (limit / mode)
- Regression comparison report
- Persistence (JSON read/write)
"""
|
||
import json
|
||
import os
|
||
import time
|
||
import tempfile
|
||
import pytest
|
||
|
||
from core.performance.benchmarker import BenchmarkResult
|
||
from core.performance.history import PerformanceHistory
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 輔助函式
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def make_result(mode: str = "sequential", fps: float = 30.0, avg_latency_ms: float = 33.3,
                p95_latency_ms: float = 50.0, total_frames: int = 900) -> BenchmarkResult:
    """Build a BenchmarkResult pre-filled with defaults convenient for tests.

    Every argument is forwarded to the field of the same name. The timestamp
    is taken from ``time.time()`` at call time and the device configuration
    is always ``{"KL520": 1}``.
    """
    fields = {
        "mode": mode,
        "fps": fps,
        "avg_latency_ms": avg_latency_ms,
        "p95_latency_ms": p95_latency_ms,
        "total_frames": total_frames,
        "timestamp": time.time(),
        "device_config": {"KL520": 1},
    }
    return BenchmarkResult(**fields)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fixture
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@pytest.fixture
def tmp_history(tmp_path):
    """Provide a PerformanceHistory whose storage file lives under pytest's tmp_path."""
    history_file = tmp_path / "benchmark_history.json"
    return PerformanceHistory(storage_path=str(history_file))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 測試:基本記錄功能
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRecord:
    """Tests for PerformanceHistory.record()."""

    # NOTE(review): methods use the `should_` prefix, which pytest's default
    # discovery does not collect; presumably the project sets
    # `python_functions` accordingly -- confirm against the pytest config.

    def should_record_result_to_storage(self, tmp_history):
        """After one record() call, get_history() should return exactly one entry."""
        tmp_history.record(make_result())

        assert len(tmp_history.get_history()) == 1

    def should_persist_across_instances(self, tmp_path):
        """A record written by one instance should be readable by a new instance on the same path."""
        history_file = str(tmp_path / "benchmark_history.json")

        writer = PerformanceHistory(storage_path=history_file)
        writer.record(make_result(fps=42.0))

        # A fresh instance shares no in-memory state with `writer`.
        reader = PerformanceHistory(storage_path=history_file)
        reloaded = reader.get_history()
        assert len(reloaded) == 1
        assert reloaded[0].fps == 42.0

    def should_assign_unique_id_to_each_record(self, tmp_history):
        """Two separately recorded results should end up with different ids."""
        tmp_history.record(make_result())
        # NOTE(review): the pause suggests ids may be time-derived -- confirm
        # against PerformanceHistory before removing it.
        time.sleep(0.01)
        tmp_history.record(make_result())

        distinct_ids = {entry.id for entry in tmp_history.get_history()}
        assert len(distinct_ids) == 2

    def should_store_all_benchmark_fields(self, tmp_history):
        """Every benchmark field should be read back with the value that was recorded."""
        expected = {
            "mode": "parallel",
            "fps": 60.5,
            "avg_latency_ms": 16.5,
            "p95_latency_ms": 25.0,
            "total_frames": 1815,
        }
        tmp_history.record(make_result(**expected))

        saved = tmp_history.get_history()[0]
        assert saved.mode == expected["mode"]
        assert saved.fps == pytest.approx(expected["fps"])
        assert saved.avg_latency_ms == pytest.approx(expected["avg_latency_ms"])
        assert saved.p95_latency_ms == pytest.approx(expected["p95_latency_ms"])
        assert saved.total_frames == expected["total_frames"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 測試:get_history 查詢
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestGetHistory:
    """Tests for PerformanceHistory.get_history() ordering, limit and mode filtering."""

    # NOTE(review): methods use the `should_` prefix, which pytest's default
    # discovery does not collect; presumably the project sets
    # `python_functions` accordingly -- confirm against the pytest config.

    def should_return_records_in_reverse_chronological_order(self, tmp_history):
        """get_history() should return records newest-first."""
        base_time = 1000000.0
        for i, fps in enumerate([10.0, 20.0, 30.0]):
            result = make_result(fps=fps)
            result.timestamp = base_time + i  # force strictly increasing timestamps
            tmp_history.record(result)

        records = tmp_history.get_history()
        fps_values = [r.fps for r in records]
        # Newest first: fps=30 carries the largest timestamp, so it comes first.
        assert fps_values == [30.0, 20.0, 10.0]

    def should_respect_limit_parameter(self, tmp_history):
        """get_history(limit=N) should return only the newest N records."""
        base_time = 1000000.0
        for i in range(5):
            result = make_result(fps=float(i + 1))
            # Pin the timestamps so "newest" is well defined regardless of
            # how coarse the wall clock is on the test machine.
            result.timestamp = base_time + i
            tmp_history.record(result)

        records = tmp_history.get_history(limit=3)
        assert len(records) == 3
        # Check WHICH records survived, not only how many: the three newest,
        # newest first.
        assert [r.fps for r in records] == [5.0, 4.0, 3.0]

    def should_filter_by_mode(self, tmp_history):
        """get_history(mode='parallel') should return only parallel-mode records."""
        for mode in ("sequential", "parallel", "sequential"):
            tmp_history.record(make_result(mode=mode))

        records = tmp_history.get_history(mode="parallel")
        assert len(records) == 1
        assert records[0].mode == "parallel"

    def should_return_empty_list_when_no_records(self, tmp_history):
        """An empty store should yield an empty list."""
        assert tmp_history.get_history() == []

    def should_apply_limit_after_mode_filter(self, tmp_history):
        """limit should be applied after the mode filter, not before it."""
        for mode in ("sequential", "parallel"):
            for _ in range(4):
                tmp_history.record(make_result(mode=mode))

        records = tmp_history.get_history(limit=2, mode="parallel")
        # If limit were applied first, fewer than two parallel records could remain.
        assert len(records) == 2
        assert all(r.mode == "parallel" for r in records)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 測試:回歸報告
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestGetRegressionReport:
    """Tests for PerformanceHistory.get_regression_report()."""

    # NOTE(review): methods use the `should_` prefix, which pytest's default
    # discovery does not collect; presumably the project sets
    # `python_functions` accordingly -- confirm against the pytest config.

    # Fixed, well-separated timestamps. Two back-to-back time.time() calls can
    # return the same value on a coarse clock, which would make "the newest
    # record" ambiguous.
    _BASELINE_TIMESTAMP = 1000000.0
    _COMPARE_TIMESTAMP = 1000001.0

    def _record_pair(self, history, baseline, compare):
        """Record ``baseline`` then ``compare`` and return ``(baseline_id, compare_id)``.

        Each id is read back from ``get_history()[0]`` right after its record
        is stored; the pinned timestamps make the compare record the newest.
        """
        baseline.timestamp = self._BASELINE_TIMESTAMP
        history.record(baseline)
        baseline_id = history.get_history()[0].id

        compare.timestamp = self._COMPARE_TIMESTAMP
        history.record(compare)
        compare_id = history.get_history()[0].id  # newest record

        # Guard against comparing a record with itself, which would pass or
        # fail for the wrong reason.
        assert compare_id != baseline_id, "baseline and compare records share the same id"
        return baseline_id, compare_id

    def should_report_fps_improvement(self, tmp_history):
        """get_regression_report() should report the FPS change as a percentage."""
        baseline_id, compare_id = self._record_pair(
            tmp_history,
            make_result(fps=30.0, avg_latency_ms=33.3, p95_latency_ms=50.0),
            make_result(fps=45.0, avg_latency_ms=22.2, p95_latency_ms=35.0),
        )

        report = tmp_history.get_regression_report(baseline_id, compare_id)

        assert "fps_change_pct" in report
        # 30 -> 45 FPS is a +50% change.
        assert report["fps_change_pct"] == pytest.approx(50.0, rel=1e-2)

    def should_report_latency_change(self, tmp_history):
        """get_regression_report() should report the average-latency change as a percentage."""
        baseline_id, compare_id = self._record_pair(
            tmp_history,
            make_result(avg_latency_ms=40.0, p95_latency_ms=60.0),
            make_result(avg_latency_ms=20.0, p95_latency_ms=30.0),
        )

        report = tmp_history.get_regression_report(baseline_id, compare_id)

        assert "avg_latency_change_pct" in report
        # 40 ms -> 20 ms is a -50% change.
        assert report["avg_latency_change_pct"] == pytest.approx(-50.0, rel=1e-2)

    def should_raise_error_for_invalid_id(self, tmp_history):
        """Unknown record ids should raise ValueError."""
        with pytest.raises(ValueError):
            tmp_history.get_regression_report("nonexistent_baseline", "nonexistent_compare")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 測試:JSON 檔案格式
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestStorageFormat:
    """Tests for the on-disk JSON file written by PerformanceHistory."""

    # NOTE(review): methods use the `should_` prefix, which pytest's default
    # discovery does not collect; presumably the project sets
    # `python_functions` accordingly -- confirm against the pytest config.

    def should_produce_valid_json_file(self, tmp_path):
        """The stored file should parse as JSON and hold one record with every expected key."""
        history_file = tmp_path / "benchmark_history.json"
        PerformanceHistory(storage_path=str(history_file)).record(
            make_result(mode="parallel", fps=45.2)
        )

        data = json.loads(history_file.read_text(encoding="utf-8"))

        assert "records" in data
        stored = data["records"]
        assert len(stored) == 1

        record = stored[0]
        expected_fields = (
            "id", "mode", "fps", "avg_latency_ms", "p95_latency_ms",
            "total_frames", "timestamp", "device_config",
        )
        for field in expected_fields:
            assert field in record, f"缺少欄位:{field}"

    def should_create_parent_directory_if_not_exists(self, tmp_path):
        """Recording to a path with missing parent directories should still create the file."""
        nested_file = tmp_path / "deep" / "nested" / "history.json"
        PerformanceHistory(storage_path=str(nested_file)).record(make_result())

        assert nested_file.exists()
|