cluster4npu/core/performance/benchmarker.py
abin 55040733fe feat: implement Phase 1-4 performance visualization and device management
Phase 1 — Performance Benchmarking:
- PerformanceBenchmarker: sequential vs parallel benchmark with injectable runner
- PerformanceHistory: JSON-backed benchmark history with regression support
- PerformanceDashboard: real-time FPS/latency display widget
- BenchmarkDialog: one-click benchmark with 3-phase progress bar

Phase 2 — Device Management:
- DeviceManager: NPU dongle scan, assign/unassign, load balance recommendation
- DeviceManagementPanel: live device status cards with auto-refresh
- BottleneckAlert: dataclass for pipeline bottleneck detection

Phase 3 — Advanced Features:
- OptimizationEngine: 3 optimization rules (rebalance/adjust_queue/add_devices)
- TemplateManager: 3 built-in pipeline templates (YOLOv5, fire detection, dual-model)

Phase 4 — Report Export:
- ReportExporter: PDF (reportlab, optional) and CSV export
- ExportReportDialog: format selection + path picker UI

192 unit tests, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 19:32:05 +08:00

248 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
core/performance/benchmarker.py — 效能基準測試模組。
提供 BenchmarkConfig、BenchmarkResult 資料結構,
以及 PerformanceBenchmarker 執行單/多裝置效能測試並計算加速倍數。
設計重點:
- 實際推論呼叫透過 inference_runner callable 注入,
方便在沒有硬體的環境下進行單元測試(注入 Mock
- 純計算邏輯calculate_speedup 等)可直接測試,無需 Mock。
使用範例(測試環境):
config = BenchmarkConfig(pipeline_config=[], test_input_source="test.mp4")
benchmarker = PerformanceBenchmarker()
def mock_runner(frame_data):
return {"result": "ok"}
seq = benchmarker.run_sequential_benchmark(config, inference_runner=mock_runner)
par = benchmarker.run_parallel_benchmark(config, inference_runner=mock_runner)
speedup = benchmarker.calculate_speedup(seq, par)
"""
import time
import statistics
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple
@dataclass
class BenchmarkConfig:
"""Benchmark 測試設定。
屬性:
pipeline_config: Pipeline 各 Stage 的設定列表(來自 UI
test_input_source: 測試輸入來源(影片檔路徑或相機索引)。
test_duration_seconds: 測試持續時間(秒),不含暖機階段。
warmup_frames: 暖機幀數,不計入統計。
"""
pipeline_config: List[Any]
test_input_source: str
test_duration_seconds: float = 30.0
warmup_frames: int = 50
@dataclass
class BenchmarkResult:
"""單次 Benchmark 的測試結果。
屬性:
mode: 測試模式,'sequential'(單裝置)或 'parallel'(多裝置)。
fps: 每秒幀數。
avg_latency_ms: 平均推論延遲(毫秒)。
p95_latency_ms: 95th percentile 延遲(毫秒)。
total_frames: 測試期間處理的總幀數(不含暖機)。
timestamp: 測試開始的 Unix timestamp。
device_config: 裝置分配設定,例如 {"KL520": 1}。
id: 唯一識別碼(由 PerformanceHistory.record() 填入)。
"""
mode: str
fps: float
avg_latency_ms: float
p95_latency_ms: float
total_frames: int
timestamp: float
device_config: Dict[str, Any]
id: Optional[str] = field(default=None)
class PerformanceBenchmarker:
"""執行單裝置 vs 多裝置效能測試,計算加速倍數。
設計為可測試性Testability-First
- run_sequential_benchmark / run_parallel_benchmark 接受 inference_runner 參數,
讓測試時可注入 Mock 而不需要真實硬體。
- calculate_speedup 為純函式,直接接受 BenchmarkResult 計算。
屬性:
device_config: 裝置設定資訊,會填入 BenchmarkResult.device_config。
"""
def __init__(self, device_config: Optional[Dict[str, Any]] = None):
"""初始化 PerformanceBenchmarker。
參數:
device_config: 裝置設定,例如 {"KL520": 1}。未指定時使用空字典。
"""
self.device_config: Dict[str, Any] = device_config or {}
# ------------------------------------------------------------------
# 公開介面
# ------------------------------------------------------------------
def run_sequential_benchmark(
self,
config: BenchmarkConfig,
inference_runner: Optional[Callable[[Any], Any]] = None,
) -> BenchmarkResult:
"""以單裝置(循序)模式執行 Benchmark。
參數:
config: 測試設定。
inference_runner: 推論執行函式,簽名為 ``(frame_data: Any) -> Any``。
若為 None使用 no-op 函式(僅供架構驗證)。
回傳:
mode='sequential' 的 BenchmarkResult。
"""
runner = inference_runner or self._default_runner
return self._run_benchmark(config, runner, mode="sequential")
def run_parallel_benchmark(
self,
config: BenchmarkConfig,
inference_runner: Optional[Callable[[Any], Any]] = None,
) -> BenchmarkResult:
"""以多裝置(平行)模式執行 Benchmark。
參數:
config: 測試設定。
inference_runner: 推論執行函式,簽名為 ``(frame_data: Any) -> Any``。
若為 None使用 no-op 函式(僅供架構驗證)。
回傳:
mode='parallel' 的 BenchmarkResult。
"""
runner = inference_runner or self._default_runner
return self._run_benchmark(config, runner, mode="parallel")
def calculate_speedup(
self,
seq: BenchmarkResult,
par: BenchmarkResult,
) -> float:
"""計算平行相對於循序的加速倍數。
計算公式par.fps / seq.fps
參數:
seq: 循序模式的 BenchmarkResult。
par: 平行模式的 BenchmarkResult。
回傳:
加速倍數float
引發:
ValueError: 當 seq.fps <= 0 時(避免除以零)。
"""
if seq.fps <= 0:
raise ValueError(
f"循序模式的 FPS 必須大於 0收到{seq.fps}"
)
return par.fps / seq.fps
def run_full_benchmark(
self,
config: BenchmarkConfig,
inference_runner: Optional[Callable[[Any], Any]] = None,
) -> Tuple[BenchmarkResult, BenchmarkResult, float]:
"""執行完整 Benchmark循序 → 平行 → 計算加速倍數。
執行序列:
1. 執行循序 Benchmark
2. 執行平行 Benchmark
3. 計算加速倍數
參數:
config: 測試設定。
inference_runner: 推論執行函式(可注入 Mock
回傳:
Tuple[BenchmarkResult, BenchmarkResult, float]
即 (sequential_result, parallel_result, speedup)。
"""
seq_result = self.run_sequential_benchmark(config, inference_runner)
par_result = self.run_parallel_benchmark(config, inference_runner)
speedup = self.calculate_speedup(seq_result, par_result)
return seq_result, par_result, speedup
# ------------------------------------------------------------------
# 內部實作
# ------------------------------------------------------------------
def _run_benchmark(
self,
config: BenchmarkConfig,
runner: Callable[[Any], Any],
mode: str,
) -> BenchmarkResult:
"""執行 Benchmark 的共用邏輯。
流程:
1. 暖機warmup_frames 幀,不計入統計)
2. 正式測試test_duration_seconds 秒)
3. 計算 FPS、平均延遲、p95 延遲
參數:
config: 測試設定。
runner: 推論執行函式。
mode: 'sequential''parallel'
回傳:
BenchmarkResult。
"""
# 暖機階段
for _ in range(config.warmup_frames):
runner(None)
# 正式測試
latencies: List[float] = []
test_start = time.time()
while time.time() - test_start < config.test_duration_seconds:
frame_start = time.time()
runner(None)
frame_end = time.time()
latencies.append((frame_end - frame_start) * 1000.0) # 轉換為毫秒
total_frames = len(latencies)
elapsed = time.time() - test_start
# 計算統計數值
if total_frames == 0:
fps = 0.0
avg_latency_ms = 0.0
p95_latency_ms = 0.0
else:
fps = total_frames / elapsed if elapsed > 0 else 0.0
avg_latency_ms = statistics.mean(latencies)
sorted_latencies = sorted(latencies)
p95_index = int(len(sorted_latencies) * 0.95)
p95_latency_ms = sorted_latencies[min(p95_index, len(sorted_latencies) - 1)]
return BenchmarkResult(
mode=mode,
fps=fps,
avg_latency_ms=avg_latency_ms,
p95_latency_ms=p95_latency_ms,
total_frames=total_frames,
timestamp=test_start,
device_config=dict(self.device_config),
)
@staticmethod
def _default_runner(frame_data: Any) -> Any:
"""預設的推論執行函式no-op僅供架構驗證"""
return None