visionA/local-tool/server/scripts/test_kneron_bridge_firmware.py
jim800121chen d7b5a2398a feat(local-tool): M9-1 — bridge.py firmware_upgrade handler(KL520+KL720 KDP1→KDP2)
A 階段第一個 milestone、純 bridge.py 層 + ctypes 直接呼叫 KneronPLUS C symbol。

Source:
- server/scripts/kneron_bridge.py: 1207 → 2058 行(+851)
- server/scripts/test_kneron_bridge_firmware.py: 新檔 840 行、36 unit tests 全綠 0.076s

Firmware bundled:
- server/scripts/firmware/KL520/fw_loader.bin(90112 bytes、MD5 aef7cca17bc023abbd6152c46c18e774、與 warrenchen 一致)
- server/scripts/firmware/{KL520,KL720}/VERSION(v2.2.0)

實作對齊 TDD §6.1 規格(98% 對齊度):
- handler input/output schema 100%
- stage enum: preparing/loading/flashing/verifying/done/error(採 Design 命名)
- reason enum 7/8(disconnect_during_op 留 M9-5 實機測試)
- ctypes binding 1:1 對齊 warrenchen legacy_plus121_runner.py
- 4 個情境 stage 序列驗證通過(KL520 KDP1+loader / KL520 KDP1 缺 loader / KL720 legacy / 已 KDP2)
- timeout 60s/200s、USB stable 5-8s wait、SIGTERM 拒絕邏輯
- progress event schema 完整(percent/stage/message/elapsed_ms/eta_ms/extra)

Reviewer 兩輪審查:
- 第 1 輪:0 Critical / 3 Major / 4 Minor / 4 Suggestion
- 第 2 輪:通過 with 1 Minor + 1 Suggestion(m5 test 死碼 / s5 test 註解、留 M9-2 順手清)
- M3 firmware 字串覆蓋從 substring → 顯式 enumeration + KDP3+ forward-compat(防未來 brick 風險)
- M2 控制流重構(needs_loader/should_run_loader_stage/loader_required_but_missing 三個顯式 bool)
- m3 single-owner disconnect 原則完整落地

既有 6 個 handler(scan/connect/disconnect/reset/load_model/inference)零改動、無 spillover risk。

下一步:M9-2 Go driver UpgradeFirmware + firmware/service.go

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 08:10:46 +08:00

841 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Unit tests for kneron_bridge.handle_firmware_upgrade (A 階段 M9-1).
Mock-based tests — no real Kneron dongle needed. Covers TDD §6.1 handler
contract:
- 5 successful path stages all fire progress events
- 4 failure paths (scan_not_found / connect_failed / loader_write_failed /
verify_mismatch)
- timeout护栏 (KL520 60s / KL720 200s)
- graceful shutdown SIGTERM rejection during upgrade in progress
執行方式:
cd server/scripts && python3 test_kneron_bridge_firmware.py
"""
from __future__ import annotations
import io
import json
import os
import sys
import time
import unittest
from unittest import mock
# 確保 import 路徑正確
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# ── 在 import bridge 前 fake kp module避免實機相依─────────────────
class _FakeKpCore:
def scan_devices(self):
raise NotImplementedError("must be patched per test")
def disconnect_devices(self, *args, **kwargs):
return 0
class _FakeKp:
core = _FakeKpCore()
# 注入 fake kp 給 bridge 在 import 時取代真實 kp
sys.modules.setdefault("kp", _FakeKp())
import kneron_bridge as bridge # noqa: E402
# ── Helperfake device descriptor ───────────────────────────────────
class FakeDeviceDescriptor:
def __init__(self, usb_port_id, product_id, firmware, kn_number=0x12345678,
is_connectable=True):
self.usb_port_id = usb_port_id
self.product_id = product_id
self.firmware = firmware
self.kn_number = kn_number
self.is_connectable = is_connectable
class FakeDeviceList:
def __init__(self, devices):
self.device_descriptor_list = devices
self.device_descriptor_number = len(devices)
# ── Helperfake libkplusctypes.CDLL 替身)─────────────────────────
class FakeLib:
"""Mock libkplus shared library with same surface as ctypes binding."""
def __init__(self):
self.upgrade_calls = [] # list of (scpu_or_loader, ncpu_or_None, auto_reboot)
self.load_calls = [] # list of (scpu, ncpu)
self.connect_calls = []
self.disconnect_calls = 0
self.timeout_calls = []
# 控制 mock 行為的 knob
self.upgrade_return = 0
self.load_return = 0
self.connect_return = (0xCAFEBABE, 0) # (handle, status)
# 模擬 time.sleep 時間(測試端不真睡)
self._sleep_skipped = True
def kp_scan_devices(self):
return 0xDEADBEEF # 不會被用到_fw_scan_target 走 kp.core.scan_devices
def kp_connect_devices(self, n, ports_ptr, status_ptr):
# ctypes c_int.value 取出
port_id = ports_ptr[0]
self.connect_calls.append(port_id)
handle, status = self.connect_return
status_ptr._obj.value = status
return handle
def kp_set_timeout(self, dg, ms):
self.timeout_calls.append(ms)
def kp_load_firmware_from_file(self, dg, scpu, ncpu):
self.load_calls.append((scpu, ncpu))
return self.load_return
def kp_update_kdp_firmware_from_files(self, dg, scpu_or_loader, ncpu_or_none, auto_reboot):
self.upgrade_calls.append((scpu_or_loader, ncpu_or_none, auto_reboot))
return self.upgrade_return
def kp_disconnect_devices(self, dg):
self.disconnect_calls += 1
return 0
def kp_error_string(self, code):
return f"mock_err({code})".encode("utf-8")
# ── 共用 fixture每個 test 用乾淨 FakeLib + sleep stub ─────────────
class FirmwareUpgradeTestBase(unittest.TestCase):
"""Patches common to all tests so handler doesn't touch real Kneron stack."""
def setUp(self):
self.fake_lib = FakeLib()
# 收集所有 progress events
self.progress_events = []
# 真實 stderr 改 catch、避免 test output 髒
self._stderr_capture = io.StringIO()
# Patch HAS_KP = True
self._has_kp_patch = mock.patch.object(bridge, "HAS_KP", True)
self._has_kp_patch.start()
# Patch _fw_load_libkplus → return our FakeLib
self._load_lib_patch = mock.patch.object(
bridge, "_fw_load_libkplus", return_value=self.fake_lib
)
self._load_lib_patch.start()
# Patch firmware path resolver預設 scpu/ncpu/loader 都齊
self._fw_paths = {
"scpu": "/fake/firmware/KL520/fw_scpu.bin",
"ncpu": "/fake/firmware/KL520/fw_ncpu.bin",
"loader": "/fake/firmware/KL520/fw_loader.bin",
"version": "v2.2.0",
}
self._resolve_paths_patch = mock.patch.object(
bridge, "_resolve_firmware_paths_full",
side_effect=lambda chip: self._fw_paths,
)
self._resolve_paths_patch.start()
# Patch time.sleep → no-op測試端不真睡
self._sleep_patch = mock.patch.object(bridge.time, "sleep", lambda x: None)
self._sleep_patch.start()
# Patch _fw_emit_progress 收集事件、同時仍寫一份到 stderr stub
original_emit = bridge._fw_emit_progress
def _capture_emit(stage, message="", elapsed_ms=0, eta_ms=0, extra=None):
event = {
"stage": stage,
"message": message,
"elapsed_ms": elapsed_ms,
"eta_ms": eta_ms,
}
if extra:
event.update(extra)
self.progress_events.append(event)
self._emit_patch = mock.patch.object(
bridge, "_fw_emit_progress", side_effect=_capture_emit
)
self._emit_patch.start()
# Patch _clear_device_group → no-op避免 touch _device_group 全域)
self._clear_patch = mock.patch.object(
bridge, "_clear_device_group", lambda: None
)
self._clear_patch.start()
def tearDown(self):
self._has_kp_patch.stop()
self._load_lib_patch.stop()
self._resolve_paths_patch.stop()
self._sleep_patch.stop()
self._emit_patch.stop()
self._clear_patch.stop()
# 確保 sigterm handler 還原(避免 test 間互相影響)
try:
bridge._fw_unregister_sigterm_handler()
except Exception:
pass
bridge._firmware_upgrade_in_progress = False
def stub_scan_returning(self, *device_lists):
"""Patch kp.core.scan_devices 依次回不同的 device list.
Args:
*device_lists: 每個 list 是 [FakeDeviceDescriptor, ...]
"""
results = [FakeDeviceList(devs) for devs in device_lists]
it = iter(results)
def _next_scan():
try:
return next(it)
except StopIteration:
# 多餘的 scan call 重複回最後一個結果(測試容忍)
return results[-1] if results else FakeDeviceList([])
return mock.patch.object(bridge.kp.core, "scan_devices", side_effect=_next_scan)
# ── 5 個成功路徑測試 ──────────────────────────────────────────────────
class TestFirmwareUpgradeSuccess(FirmwareUpgradeTestBase):
def test_kl520_kdp1_legacy_full_5_stages(self):
"""KL520 KDP1 legacy → KDP2preparing/loading/flashing/verifying/done 5 stage 都 fire."""
legacy_dev = FakeDeviceDescriptor(
usb_port_id=42, product_id=0x100, firmware="KDP",
)
post_loader_dev = FakeDeviceDescriptor(
usb_port_id=42, product_id=0x100, firmware="USB Boot Loader",
)
kdp2_dev = FakeDeviceDescriptor(
usb_port_id=42, product_id=0x100, firmware="KDP2",
)
with self.stub_scan_returning([legacy_dev], [post_loader_dev], [kdp2_dev]):
result = bridge.handle_firmware_upgrade({"port": "42", "chip": "KL520"})
self.assertEqual(result["status"], "upgraded", msg=f"unexpected result: {result}")
self.assertEqual(result["before_firmware"], "KDP")
self.assertEqual(result["after_firmware"], "KDP2")
self.assertIn("ctypes", result["method"])
self.assertGreaterEqual(result["duration_ms"], 0)
# 驗證 5 個 stage 都 fire
stages = [e["stage"] for e in self.progress_events]
self.assertEqual(
stages, ["preparing", "loading", "flashing", "verifying", "done"],
msg=f"unexpected stage sequence: {stages}",
)
# KL520 KDP1 legacyloader.bin 寫一次kp_update_kdp_firmware_from_files
# + scpu/ncpu 載一次kp_load_firmware_from_file
self.assertEqual(len(self.fake_lib.upgrade_calls), 1,
msg="loader write should be called exactly once")
self.assertEqual(len(self.fake_lib.load_calls), 1,
msg="kp_load_firmware_from_file should be called once after loader")
def test_kl520_already_kdp2_short_circuit(self):
"""KL520 已是 KDP2跳過 loader stage、直接 flashing用 kp_update_kdp_firmware_from_files."""
kdp2_dev = FakeDeviceDescriptor(
usb_port_id=10, product_id=0x100, firmware="KDP2.5",
)
kdp2_after = FakeDeviceDescriptor(
usb_port_id=10, product_id=0x100, firmware="KDP2.5",
)
with self.stub_scan_returning([kdp2_dev], [kdp2_after]):
result = bridge.handle_firmware_upgrade({"port": "10", "chip": "KL520"})
self.assertEqual(result["status"], "upgraded")
stages = [e["stage"] for e in self.progress_events]
# KDP2 short-circuitpreparing → flashing → verifying → done無 loading
self.assertEqual(stages, ["preparing", "flashing", "verifying", "done"])
# kp_update_kdp_firmware_from_files 用兩 path 模式scpu + ncpu
self.assertEqual(len(self.fake_lib.upgrade_calls), 1)
scpu_path, ncpu_path, auto_reboot = self.fake_lib.upgrade_calls[0]
self.assertIn(b"fw_scpu", scpu_path)
self.assertIn(b"fw_ncpu", ncpu_path)
self.assertTrue(auto_reboot)
def test_kl720_kdp_legacy(self):
"""KL720 KDP1 legacy (pid=0x200):走 flashing 路徑warrenchen 模式)."""
legacy = FakeDeviceDescriptor(
usb_port_id=5, product_id=0x200, firmware="KDP",
)
after = FakeDeviceDescriptor(
usb_port_id=5, product_id=0x720, firmware="KDP2",
)
# KL720 沒 loader.binwarrenchen 也沒附)
self._fw_paths["loader"] = None
with self.stub_scan_returning([legacy], [after]):
result = bridge.handle_firmware_upgrade({"port": "5", "chip": "KL720"})
self.assertEqual(result["status"], "upgraded")
# KL720 legacy 沒 loader.binpreparing → flashing → verifying → done
stages = [e["stage"] for e in self.progress_events]
self.assertEqual(stages, ["preparing", "flashing", "verifying", "done"])
def test_progress_event_schema_has_required_fields(self):
"""進度事件 schema 對齊 TDD §4.2stage, elapsed_ms, eta_ms 必填."""
dev = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP2")
with self.stub_scan_returning([dev], [dev]):
bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
for e in self.progress_events:
self.assertIn("stage", e)
self.assertIn("elapsed_ms", e)
self.assertIn("eta_ms", e)
self.assertIsInstance(e["elapsed_ms"], int)
def test_done_stage_returns_duration_ms(self):
"""done event 必須有 duration_mscaller 取 elapsed_ms."""
dev = FakeDeviceDescriptor(usb_port_id=2, product_id=0x100, firmware="KDP2")
with self.stub_scan_returning([dev], [dev]):
result = bridge.handle_firmware_upgrade({"port": "2", "chip": "KL520"})
self.assertIn("duration_ms", result)
# done event 的 elapsed_ms 應該 = duration_msfinishing-time alignment
done_event = [e for e in self.progress_events if e["stage"] == "done"][0]
self.assertEqual(done_event["elapsed_ms"], result["duration_ms"])
# ── 4 個失敗路徑測試 ──────────────────────────────────────────────────
class TestFirmwareUpgradeFailure(FirmwareUpgradeTestBase):
def test_scan_not_found(self):
"""scan 找不到 target portpreparing stage failure with reason=scan_not_found."""
with self.stub_scan_returning([]): # empty scan
result = bridge.handle_firmware_upgrade({"port": "999", "chip": "KL520"})
self.assertIn("error", result)
self.assertEqual(result["stage"], "preparing")
self.assertEqual(result["reason"], "scan_not_found")
self.assertIn("not found", result["error"].lower())
# 應該 fire preparing + error 兩個 event
stages = [e["stage"] for e in self.progress_events]
self.assertIn("preparing", stages)
self.assertIn("error", stages)
def test_connect_failed(self):
"""ctypes connect 回 status != KP_SUCCESSpreparing/connect_failed."""
dev = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP")
# 讓 connect 回 non-zero status
self.fake_lib.connect_return = (0, -3) # handle=NULL, status=-3
with self.stub_scan_returning([dev]):
result = bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
self.assertEqual(result["stage"], "preparing")
self.assertEqual(result["reason"], "connect_failed")
# 錯誤訊息應該包含 raw error 線索
self.assertIn("connect", result["error"].lower())
def test_loader_write_failed(self):
"""KL520 KDP1 legacyloader 寫入回 non-zero → loading/loader_write_failed."""
legacy_dev = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP")
# 讓 kp_update_kdp_firmware_from_files 回 error code
self.fake_lib.upgrade_return = -7
with self.stub_scan_returning([legacy_dev]):
result = bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
self.assertEqual(result["stage"], "loading")
self.assertEqual(result["reason"], "loader_write_failed")
# loader call 確實有發生
self.assertEqual(len(self.fake_lib.upgrade_calls), 1)
def test_verify_mismatch(self):
"""升級完成但 verify 階段發現 firmware 字串仍 legacy → verify_mismatch."""
legacy_dev = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP")
# 升級完仍是 KDP1mockupgrade 成功但 device firmware 字串沒變)
stuck_legacy = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP")
with self.stub_scan_returning([legacy_dev], [stuck_legacy], [stuck_legacy]):
result = bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
self.assertEqual(result["stage"], "verifying")
self.assertEqual(result["reason"], "verify_mismatch")
self.assertIn("legacy", result["error"].lower())
def test_verify_not_found(self):
"""verify 階段 device disappearrescan 找不到)→ verify_not_found."""
legacy_dev = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP")
# 升級時走 loader → flashing 都 OK、verify 階段 scan 回空device 還沒 re-enumerate
with self.stub_scan_returning([legacy_dev], [legacy_dev], []):
result = bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
self.assertEqual(result["stage"], "verifying")
self.assertEqual(result["reason"], "verify_not_found")
def test_failure_event_carries_reason_and_raw_error(self):
"""error event 必須含 reason + raw_errorTDD §4.2 失敗欄位)."""
with self.stub_scan_returning([]):
bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
err_events = [e for e in self.progress_events if e["stage"] == "error"]
self.assertEqual(len(err_events), 1, "error event 應該 fire 一次")
e = err_events[0]
self.assertIn("reason", e)
self.assertIn("raw_error", e)
self.assertIn("before_version", e)
def test_chip_unsupported(self):
"""A 階段不支援 KL630 / KL730應該直接拒絕preparing/scan_not_found."""
result = bridge.handle_firmware_upgrade({"port": "1", "chip": "KL630"})
self.assertIn("error", result)
self.assertEqual(result["stage"], "preparing")
self.assertIn("KL630", result["error"])
# ── Reviewer s4補 4 個欠缺的 test case ──────────────────────────
def test_loading_stage_disconnect_during_op(self):
"""Reviewer s4 (1)loading stage 寫 loader 成功後 rescan 找不到 device.
對應 kneron_bridge.py 1753-1758 行disconnect_during_op in loading stage
Stage 序列preparing → loading → error(disconnect_during_op)。
"""
legacy_dev = FakeDeviceDescriptor(
usb_port_id=42, product_id=0x100, firmware="KDP",
)
# loader 寫成功upgrade_return=0 default、但 reboot 後 rescan 回空
# device 沒 re-enumerate 回來)
with self.stub_scan_returning([legacy_dev], []): # 第二次 scan 空
result = bridge.handle_firmware_upgrade({"port": "42", "chip": "KL520"})
self.assertEqual(result["stage"], "loading")
self.assertEqual(result["reason"], "disconnect_during_op")
self.assertIn("disappear", result["error"].lower())
# 第一個 upgrade_call 是 loader成功、應有 1 個 call
self.assertEqual(len(self.fake_lib.upgrade_calls), 1)
def test_loading_stage_reconnect_failed(self):
"""Reviewer s4 (2)loading stage 寫 loader 成功、rescan 找到 device、但 reconnect 失敗.
對應 kneron_bridge.py 1759-1765 行connect_failed in loading stage、reconnect 失敗)。
Stage 序列preparing → loading → error(connect_failed)。
"""
legacy_dev = FakeDeviceDescriptor(
usb_port_id=42, product_id=0x100, firmware="KDP",
)
post_loader_dev = FakeDeviceDescriptor(
usb_port_id=42, product_id=0x100, firmware="USB Boot Loader",
)
# 第一次 connectpreparingOK第二次 connectreconnect after loader失敗
call_count = [0]
original_connect = self.fake_lib.kp_connect_devices
def maybe_fail_connect(n, ports_ptr, status_ptr):
call_count[0] += 1
if call_count[0] == 2:
# 第二次 connect 失敗
status_ptr._obj.value = -5
return 0 # NULL handle
return original_connect(n, ports_ptr, status_ptr)
self.fake_lib.kp_connect_devices = maybe_fail_connect
with self.stub_scan_returning([legacy_dev], [post_loader_dev]):
result = bridge.handle_firmware_upgrade({"port": "42", "chip": "KL520"})
self.assertEqual(result["stage"], "loading")
self.assertEqual(result["reason"], "connect_failed")
self.assertIn("reconnect", result["error"].lower())
def test_failure_event_full_extra_fields(self):
"""Reviewer s4 (3)error event 必須含 TDD §4.2 完整失敗欄位.
TDD §4.2 列出 error event extra dict 應含:
error / reason / raw_error / before_version
本 test 驗 caller 的 _fw_handle_failure 確實組裝這些欄位。
"""
legacy_dev = FakeDeviceDescriptor(
usb_port_id=99, product_id=0x100, firmware="KDP1.5",
)
# 讓 loader write 失敗、確保走進 _fw_handle_failure
self.fake_lib.upgrade_return = -7
with self.stub_scan_returning([legacy_dev]):
result = bridge.handle_firmware_upgrade({"port": "99", "chip": "KL520"})
err_events = [e for e in self.progress_events if e["stage"] == "error"]
self.assertEqual(len(err_events), 1)
e = err_events[0]
# TDD §4.2 必填欄位
for field in ("error", "reason", "raw_error", "before_version"):
self.assertIn(field, e, f"error event missing field: {field}")
# before_version 應該抓到 scan 階段的 firmware string
self.assertEqual(e["before_version"], "KDP1.5")
# raw_error 應該帶 SDK error context包含 ret code / 函式名)
self.assertIn("loader", e["raw_error"].lower())
# ── Reviewer s4 (4)ctypes binding 簽名測試 ──────────────────────
class TestCtypesBindingSignatures(unittest.TestCase):
"""驗證 _fw_load_libkplus 設對 argtypes / restype.
在 mock test 階段我們繞過 _fw_load_libkplus、直接餵 FakeLib
但實機跑時 binding 簽名錯會在 first call 拋 ctypes.ArgumentError 或更糟
silently corrupt memory。本 test 用一個 mock CDLL object 跑 _fw_load_libkplus、
驗它對每個 C 符號設了正確的 argtypes / restype。
"""
def test_libkplus_binding_signatures(self):
"""_fw_load_libkplus 對所有 C 符號設了正確 argtypes / restype."""
import ctypes
# Mock CDLL紀錄 argtypes / restype 設定、不執行真實 lib
class MockSymbol:
def __init__(self, name):
self.name = name
self.argtypes = None
self.restype = None
class MockCDLL:
def __init__(self, *args, **kwargs):
self.kp_connect_devices = MockSymbol("kp_connect_devices")
self.kp_set_timeout = MockSymbol("kp_set_timeout")
self.kp_load_firmware_from_file = MockSymbol("kp_load_firmware_from_file")
self.kp_update_kdp_firmware_from_files = MockSymbol(
"kp_update_kdp_firmware_from_files"
)
self.kp_disconnect_devices = MockSymbol("kp_disconnect_devices")
self.kp_error_string = MockSymbol("kp_error_string")
def __getattr__(self, name):
# 任何沒設的符號回 MockSymbolhasattr check 用)
sym = MockSymbol(name)
setattr(self, name, sym)
return sym
# Mock importlib + os.path.isfile + ctypes.CDLL
mock_spec = mock.MagicMock()
mock_spec.submodule_search_locations = ["/fake/kp_dir"]
with mock.patch("importlib.util.find_spec", return_value=mock_spec), \
mock.patch("os.path.isfile", return_value=True), \
mock.patch.object(ctypes, "CDLL", side_effect=lambda p: MockCDLL()):
lib = bridge._fw_load_libkplus()
# 驗 kp_connect_devices(int, c_int*, c_int*) -> c_void_p
self.assertEqual(
lib.kp_connect_devices.argtypes,
[ctypes.c_int, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int)],
)
self.assertEqual(lib.kp_connect_devices.restype, ctypes.c_void_p)
# 驗 kp_set_timeout(c_void_p, c_int) -> None
self.assertEqual(lib.kp_set_timeout.argtypes, [ctypes.c_void_p, ctypes.c_int])
self.assertIsNone(lib.kp_set_timeout.restype)
# 驗 kp_load_firmware_from_file(c_void_p, c_char_p, c_char_p) -> c_int
self.assertEqual(
lib.kp_load_firmware_from_file.argtypes,
[ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p],
)
self.assertEqual(lib.kp_load_firmware_from_file.restype, ctypes.c_int)
# 驗 kp_update_kdp_firmware_from_files(c_void_p, c_char_p, c_char_p, c_bool) -> c_int
self.assertEqual(
lib.kp_update_kdp_firmware_from_files.argtypes,
[ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_bool],
)
self.assertEqual(lib.kp_update_kdp_firmware_from_files.restype, ctypes.c_int)
# 驗 kp_disconnect_devices(c_void_p) -> c_int
self.assertEqual(lib.kp_disconnect_devices.argtypes, [ctypes.c_void_p])
self.assertEqual(lib.kp_disconnect_devices.restype, ctypes.c_int)
# 驗 kp_error_string(c_int) -> c_char_p若存在
self.assertEqual(lib.kp_error_string.argtypes, [ctypes.c_int])
self.assertEqual(lib.kp_error_string.restype, ctypes.c_char_p)
# ── Timeout 測試 ──────────────────────────────────────────────────────
class TestFirmwareUpgradeTimeout(FirmwareUpgradeTestBase):
def test_timeout_kl520(self):
"""KL520 升級 > 60s 撞 timeout → reason=timeout.
Mock time.monotonic 讓每次 call 都讀到一個跳很快的 clock
確保第二個 stage check 之前就撞 timeout60s
"""
legacy_dev = FakeDeviceDescriptor(usb_port_id=1, product_id=0x100, firmware="KDP")
# 第一次 call 回 0start_ts、第二次起回 99s撞 60s timeout
clock_values = iter([0.0, 0.0, 99.0, 99.0, 99.0, 99.0, 99.0, 99.0])
def fake_monotonic():
try:
return next(clock_values)
except StopIteration:
return 99.0
monotonic_patch = mock.patch.object(
bridge.time, "monotonic", side_effect=fake_monotonic
)
with monotonic_patch, self.stub_scan_returning([legacy_dev]):
result = bridge.handle_firmware_upgrade({"port": "1", "chip": "KL520"})
self.assertEqual(result["reason"], "timeout",
msg=f"expected timeout, got: {result}")
self.assertIn(result["stage"], ("preparing", "loading", "flashing", "verifying"))
def test_timeout_kl720_uses_200s_bound(self):
"""KL720 用 200s timeoutKL520 60s 不適用)."""
# 直接驗 constant 是 200防止後續誤改
self.assertEqual(bridge.KL720_UPGRADE_TIMEOUT_S, 200)
self.assertEqual(bridge.KL520_UPGRADE_TIMEOUT_S, 60)
# ── Graceful shutdown (SIGTERM) 拒絕測試 ─────────────────────────────
class TestFirmwareUpgradeGracefulShutdown(unittest.TestCase):
"""Test AC-FW-1.9:升級進行中收到 SIGTERM 不應立即退出."""
def setUp(self):
# 確保旗標歸零
bridge._firmware_upgrade_in_progress = False
bridge._firmware_upgrade_start_ts = 0.0
def tearDown(self):
try:
bridge._fw_unregister_sigterm_handler()
except Exception:
pass
bridge._firmware_upgrade_in_progress = False
@unittest.skipIf(sys.platform == "win32", "SIGTERM not on Windows")
def test_sigterm_rejected_during_upgrade(self):
"""升級進行中SIGTERM handler 拒絕並 push shutdown_rejected event."""
import signal
bridge._firmware_upgrade_in_progress = True
start_ts = time.monotonic()
bridge._firmware_upgrade_start_ts = start_ts
# 攔截 stderr
capture = io.StringIO()
bridge._fw_register_sigterm_handler(start_ts)
with mock.patch.object(sys, "stderr", capture):
os.kill(os.getpid(), signal.SIGTERM)
# 給 signal handler 一點時間執行
time.sleep(0.05)
# 驗證 process 沒退出test 還在跑、能讀到 stderr
output = capture.getvalue()
# 應該找到 shutdown_rejected event
self.assertIn("shutdown_rejected", output,
msg=f"expected shutdown_rejected in stderr, got: {output}")
# 解析 JSON 驗 schema
for line in output.strip().split("\n"):
if not line.strip():
continue
try:
ev = json.loads(line)
except json.JSONDecodeError:
continue
if ev.get("event") == "shutdown_rejected":
self.assertEqual(ev["reason"], "firmware_upgrade_in_progress")
self.assertEqual(ev["task"], "firmware_upgrade")
self.assertIn("elapsed_ms", ev)
return
self.fail("shutdown_rejected event 沒找到")
@unittest.skipIf(sys.platform == "win32", "SIGTERM not on Windows")
def test_sigterm_handler_unregistered_after_upgrade(self):
"""升級結束後 SIGTERM handler 應該還原(避免影響後續 server graceful shutdown."""
import signal
# 預設 handlerpython default 是 SIG_DFL
prev = signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGTERM, prev)
# register
bridge._fw_register_sigterm_handler(time.monotonic())
# 確認 handler 已換
current = signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.assertNotEqual(current, signal.SIG_DFL,
msg="handler should be installed during upgrade")
# 重新 install 後 unregister
bridge._fw_register_sigterm_handler(time.monotonic())
bridge._fw_unregister_sigterm_handler()
# unregister 後 handler 應還原(不再是我們的 wrapper
# 注意unregister 後可能是原 handler 或 SIG_DFL、我們的 wrapper 不該再生效
bridge._firmware_upgrade_in_progress = False
# 這個測試重點是 unregister 不報錯、且 _fw_original_sigterm_handler 已歸 None
self.assertIsNone(bridge._fw_original_sigterm_handler)
# ── _fw_classify_legacy 邏輯測試 ──────────────────────────────────────
class TestClassifyLegacy(unittest.TestCase):
def test_kl720_legacy_by_product_id(self):
self.assertTrue(bridge._fw_classify_legacy("any", 0x0200))
def test_kl520_legacy_by_firmware_string(self):
self.assertTrue(bridge._fw_classify_legacy("KDP", 0x0100))
def test_kl520_kdp2_not_legacy(self):
self.assertFalse(bridge._fw_classify_legacy("KDP2", 0x0100))
self.assertFalse(bridge._fw_classify_legacy("KDP2.5", 0x0100))
def test_kl720_kdp2_not_legacy(self):
self.assertFalse(bridge._fw_classify_legacy("KDP2", 0x0720))
# ── Reviewer M3 + s3firmware 字串覆蓋擴展 ────────────────────
# 原本 substring match `"KDP" in fw and "KDP2" not in fw` 對 USB Boot /
# Loader / 空字串 / KDP3+ 等情境覆蓋不夠或會誤判、改用顯式 enumeration
# + prefix 比對表後、以下 case 必須通過:
def test_kl520_legacy_empty_firmware_string(self):
"""部分 USB Boot state device 不回 firmware string、應視為 legacy."""
self.assertTrue(bridge._fw_classify_legacy("", 0x0100))
self.assertTrue(bridge._fw_classify_legacy(None, 0x0100))
def test_kl520_legacy_usb_boot_strings(self):
"""USB Boot / Loader / Bootloader 等 legacy state 字串都應視為 legacy."""
for fw in ("USB Boot", "USB Boot Loader", "Loader", "Bootloader",
"USB BOOT", "loader", "BOOTLOADER"):
with self.subTest(firmware=fw):
self.assertTrue(
bridge._fw_classify_legacy(fw, 0x0100),
f"firmware={fw!r} should be classified as legacy",
)
def test_kl520_legacy_kdp1_variants(self):
"""KDP1 / KDP1.x / KDP1 space 等版本字串都應視為 legacy."""
for fw in ("KDP1", "KDP1.0", "KDP1.5", "KDP1 v1.0", "kdp1.5"):
with self.subTest(firmware=fw):
self.assertTrue(
bridge._fw_classify_legacy(fw, 0x0100),
f"firmware={fw!r} should be classified as legacy",
)
def test_kdp3_kdp4_not_legacy(self):
"""Reviewer s3KDP3 / KDP4+(未來 firmware不該被 substring match 誤判 legacy."""
# 原本 substring match `"KDP" in fw and "KDP2" not in fw` 對 KDP3.0 會誤判 legacy
# 改用顯式 prefix 比對表後、KDP3 / KDP4 應視為 modern firmware
for fw in ("KDP3", "KDP3.0", "KDP3.5", "KDP4", "KDP4.2", "KDP9"):
with self.subTest(firmware=fw):
self.assertFalse(
bridge._fw_classify_legacy(fw, 0x0100),
f"firmware={fw!r} (modern KDP3+) should NOT be classified as legacy",
)
def test_unknown_firmware_default_not_legacy(self):
"""未知 firmware 字串保守 default = 不走 loader (避免誤觸 brick device)."""
# 例:未來 firmware 用全新命名 → 不確定走 loader 是否會 brick、保守不走
# 若 mis-classify、verify 階段會偵測 verify_mismatch、不致 brick
for fw in ("NEF", "K3", "FOO", "RANDOM"):
with self.subTest(firmware=fw):
self.assertFalse(
bridge._fw_classify_legacy(fw, 0x0100),
f"firmware={fw!r} (unknown) should default to not-legacy",
)
# ── _fw_eta_ms 邏輯測試 ──────────────────────────────────────────────
class TestEtaEstimation(unittest.TestCase):
def test_eta_decreases_through_stages(self):
kl520_etas = [
bridge._fw_eta_ms("KL520", s)
for s in ("preparing", "loading", "flashing", "verifying")
]
# ETA 應該遞減
self.assertEqual(kl520_etas, sorted(kl520_etas, reverse=True))
def test_kl720_eta_larger_than_kl520(self):
self.assertGreater(
bridge._fw_eta_ms("KL720", "preparing"),
bridge._fw_eta_ms("KL520", "preparing"),
)
# ── _resolve_firmware_paths_full 測試(用真實檔案)─────────────────────
class TestResolveFirmwarePathsFull(unittest.TestCase):
def test_kl520_has_loader(self):
"""KL520 升級後應該找到 scpu/ncpu/loader 三個檔案."""
paths = bridge._resolve_firmware_paths_full("KL520")
self.assertIsNotNone(paths["scpu"], "fw_scpu.bin missing")
self.assertIsNotNone(paths["ncpu"], "fw_ncpu.bin missing")
self.assertIsNotNone(paths["loader"],
"fw_loader.bin missing — required for KDP1→KDP2")
self.assertTrue(os.path.exists(paths["scpu"]))
self.assertTrue(os.path.exists(paths["loader"]))
def test_kl720_has_scpu_ncpu(self):
paths = bridge._resolve_firmware_paths_full("KL720")
self.assertIsNotNone(paths["scpu"])
self.assertIsNotNone(paths["ncpu"])
# KL720 沒 loader.bin 預期、不檢查
self.assertTrue(os.path.exists(paths["scpu"]))
def test_unknown_chip_returns_none(self):
paths = bridge._resolve_firmware_paths_full("KL999")
self.assertIsNone(paths["scpu"])
self.assertIsNone(paths["ncpu"])
# ── _fw_emit_progress JSON schema 測試 ───────────────────────────────
class TestEmitProgress(unittest.TestCase):
def test_emit_writes_json_line_to_stderr(self):
capture = io.StringIO()
with mock.patch.object(sys, "stderr", capture):
bridge._fw_emit_progress(
"flashing",
message="testing",
elapsed_ms=1234,
eta_ms=5678,
)
line = capture.getvalue().strip()
ev = json.loads(line)
self.assertEqual(ev["event"], "firmware_progress")
self.assertEqual(ev["stage"], "flashing")
self.assertEqual(ev["percent"], 50)
self.assertEqual(ev["message"], "testing")
self.assertEqual(ev["elapsed_ms"], 1234)
self.assertEqual(ev["eta_ms"], 5678)
def test_emit_with_extra_includes_failure_fields(self):
capture = io.StringIO()
with mock.patch.object(sys, "stderr", capture):
bridge._fw_emit_progress(
"error",
message="bad",
elapsed_ms=100,
extra={"reason": "scan_not_found", "raw_error": "details"},
)
ev = json.loads(capture.getvalue().strip())
self.assertEqual(ev["stage"], "error")
self.assertEqual(ev["percent"], -1)
self.assertEqual(ev["reason"], "scan_not_found")
self.assertEqual(ev["raw_error"], "details")
if __name__ == "__main__":
unittest.main(verbosity=2)