Fix: resolve 5 bugs found during project onboarding health check

- custom_inference_worker: reuse existing device_group from DeviceController
  to avoid double kp.connect_devices() conflict on same USB port
- custom_inference_worker: add TYPE_CHECKING guard for kp type annotations
  to prevent potential NameError at import time
- utilities_screen: replace missing back_arrow.png with text arrow (←)
- utilities_screen: add set_device_controller() so AppController can inject
  MainWindow's shared DeviceController instance
- main.py: wire UtilitiesScreen to share MainWindow's DeviceController
- video_thread: emit camera_error_signal on failure and max-retry exhaustion
- media_controller: connect camera_error_signal and display error on canvas
- media_panel: fix pause button using wrong delete icon; use video_normal SVG

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
abin 2026-04-07 14:33:37 +08:00
parent 9b11e90505
commit 7e323cf3e1
7 changed files with 80 additions and 19 deletions

View File

@ -67,6 +67,10 @@ class AppController:
self.main_window = MainWindow()
self.stack.addWidget(self.main_window)
# Share MainWindow's DeviceController with UtilitiesScreen so both
# screens reflect the same device connection state.
self.utilities_screen.set_device_controller(self.main_window.device_controller)
def connect_signals(self):
"""
Connect signals between screens for navigation.

View File

@ -11,7 +11,6 @@ import cv2
import json
from PyQt5.QtWidgets import QMessageBox, QApplication
from PyQt5.QtCore import QTimer, Qt
import kp
from src.models.inference_worker import InferenceWorkerThread
from src.models.custom_inference_worker import CustomInferenceWorkerThread
@ -187,6 +186,7 @@ class InferenceController:
# Upload model to device
if device_group:
try:
import kp
print('[Uploading model]')
self.model_descriptor = kp.core.load_model_from_file(
device_group=device_group,
@ -364,6 +364,9 @@ class InferenceController:
input_params["custom_ncpu_path"] = tool_config.get("custom_ncpu_path")
input_params["custom_labels"] = tool_config.get("custom_labels")
# Pass existing device_group to avoid double connection in the worker
input_params["device_group"] = self.device_controller.get_device_group()
# Get device-related settings
selected_device = self.device_controller.get_selected_device()
if selected_device:

View File

@ -66,6 +66,7 @@ class MediaController:
print("Camera signal connected successfully")
except Exception as e:
print(f"Error connecting camera signal: {e}")
self.video_thread.camera_error_signal.connect(self.handle_camera_error)
# Start camera thread
self.video_thread.start()
@ -163,6 +164,20 @@ class MediaController:
import traceback
print(traceback.format_exc())
def handle_camera_error(self, error_msg):
"""
Handle camera error signal from VideoThread.
Args:
error_msg (str): Error message describing what went wrong.
"""
print(f"Camera error: {error_msg}")
if hasattr(self.main_window, 'canvas_label'):
self.main_window.canvas_label.setText(error_msg)
self.main_window.canvas_label.setAlignment(Qt.AlignCenter)
self.video_thread = None
self._signal_was_connected = False
def reconnect_camera_signal(self):
"""Reconnect the camera signal if it was previously disconnected"""
if self.video_thread is not None and self._signal_was_connected:

View File

@ -4,16 +4,17 @@ custom_inference_worker.py - Custom Inference Worker
This module provides a worker thread for running inference using user-uploaded
custom models. It uses YOLO V5 pre/post-processing logic for object detection.
"""
from __future__ import annotations
import os
import time
import queue
import cv2
import numpy as np
from typing import List
from typing import List, TYPE_CHECKING
from PyQt5.QtCore import QThread, pyqtSignal
if TYPE_CHECKING:
import kp
from kp.KPBaseClass.ValueBase import ValueRepresentBase
# COCO dataset class names (80 classes)
@ -31,7 +32,7 @@ COCO_CLASSES = [
]
class ExampleBoundingBox(ValueRepresentBase):
class ExampleBoundingBox:
"""Bounding box descriptor."""
def __init__(self,
@ -59,7 +60,7 @@ class ExampleBoundingBox(ValueRepresentBase):
}
class ExampleYoloResult(ValueRepresentBase):
class ExampleYoloResult:
"""YOLO output result descriptor."""
def __init__(self,
@ -354,6 +355,10 @@ class CustomInferenceWorkerThread(QThread):
"""
Initialize device, upload firmware and model.
If a device_group is already provided in input_params (connected by
DeviceController), reuse it and skip connect_devices to avoid double
connection conflicts with the Kneron SDK.
Returns:
bool: True if initialization successful, False otherwise.
"""
@ -374,12 +379,20 @@ class CustomInferenceWorkerThread(QThread):
print("Missing required file paths")
return False
# Connect to device
import kp
# Reuse existing device_group if provided to avoid double connection
existing_device_group = self.input_params.get("device_group")
if existing_device_group is not None:
print('[Reusing existing device connection]')
self.device_group = existing_device_group
else:
print('[Connecting device]')
self.device_group = kp.core.connect_devices(usb_port_ids=[port_id])
kp.core.set_timeout(device_group=self.device_group, milliseconds=5000)
print(' - Connection successful')
kp.core.set_timeout(device_group=self.device_group, milliseconds=5000)
# Upload firmware
print('[Uploading firmware]')
kp.core.load_firmware_from_file(self.device_group, scpu_path, ncpu_path)
@ -421,6 +434,7 @@ class CustomInferenceWorkerThread(QThread):
img_processed, original_width, original_height = preprocess_frame(frame)
# 建立推論描述符
import kp
descriptor = kp.GenericImageInferenceDescriptor(
model_id=self.model_descriptor.models[0].id,
inference_number=0,
@ -532,11 +546,20 @@ class CustomInferenceWorkerThread(QThread):
self.quit()
def cleanup(self):
"""Clean up resources and disconnect device."""
"""Clean up resources.
Only disconnects device if this worker created the connection itself
(i.e. no device_group was provided via input_params).
"""
try:
if self.device_group is not None:
owned_by_worker = self.input_params.get("device_group") is None
if owned_by_worker:
import kp
kp.core.disconnect_devices(self.device_group)
print('[Device disconnected]')
else:
print('[Device connection owned by DeviceController, skipping disconnect]')
self.device_group = None
except Exception as e:
print(f"Error cleaning up resources: {e}")

View File

@ -125,9 +125,10 @@ class VideoThread(QThread):
height, width, channel = frame.shape
bytes_per_line = channel * width
qt_image = QImage(frame.data, width, height, bytes_per_line, QImage.Format_RGB888)
self.change_pixmap_signal.emit(qt_image)
self.change_pixmap_signal.emit(qt_image.copy())
else:
print("Unable to read camera frame, camera may be disconnected")
self.camera_error_signal.emit("相機連線中斷,嘗試重新連線...")
break
# Release camera resources
@ -141,6 +142,7 @@ class VideoThread(QThread):
if self._camera_open_attempts >= self._max_attempts:
print("Maximum attempts reached, unable to open camera")
self.camera_error_signal.emit("無法開啟相機,請確認相機是否連接")
def stop(self):
"""Stop the video capture thread."""

View File

@ -25,7 +25,7 @@ def create_media_panel(parent, media_controller, file_service):
media_controller.take_screenshot),
('upload file', os.path.join(assets_path, "Assets_svg/bt_function_upload_normal.svg").replace('\\', '/'),
file_service.upload_file),
('pause/resume', os.path.join(assets_path, "Assets_svg/btn_result_image_delete_hover.svg").replace('\\', '/'),
('pause/resume', os.path.join(assets_path, "Assets_svg/bt_function_video_normal.svg").replace('\\', '/'),
lambda: toggle_pause_button(parent, media_controller)),
('voice', os.path.join(assets_path, "Assets_svg/ic_recording_voice.svg").replace('\\', '/'),
lambda: media_controller.record_audio(None)),

View File

@ -14,7 +14,6 @@ from PyQt5.QtWidgets import (
from PyQt5.QtCore import Qt, pyqtSignal, QTimer
from PyQt5.QtGui import QPixmap, QFont, QIcon, QColor
import os
import kp
from src.config import UXUI_ASSETS, WINDOW_SIZE, BACKGROUND_COLOR, DongleModelMap
from src.controllers.device_controller import DeviceController
from src.services.device_service import check_available_device
@ -56,6 +55,18 @@ class UtilitiesScreen(QWidget):
self.current_page = "utilities" # Track current page: "utilities" or "purchased_items"
self.init_ui()
def set_device_controller(self, device_controller):
"""
Replace the local DeviceController with a shared instance.
Call this from AppController after all screens are created so that
UtilitiesScreen and MainWindow share the same device connection state.
Args:
device_controller: Shared DeviceController instance from MainWindow.
"""
self.device_controller = device_controller
def init_ui(self):
"""
Initialize the user interface.
@ -135,14 +146,14 @@ class UtilitiesScreen(QWidget):
header_layout.setContentsMargins(20, 0, 20, 0)
# Back button
back_button = QPushButton("", self)
back_button.setIcon(QIcon(os.path.join(UXUI_ASSETS, "Assets_png/back_arrow.png")))
back_button.setIconSize(QPixmap(os.path.join(UXUI_ASSETS, "Assets_png/back_arrow.png")).size())
back_button = QPushButton("", self)
back_button.setFixedSize(40, 40)
back_button.setStyleSheet("""
QPushButton {
background-color: transparent;
border: none;
color: white;
font-size: 20px;
}
QPushButton:hover {
background-color: rgba(255, 255, 255, 0.1);
@ -851,6 +862,7 @@ class UtilitiesScreen(QWidget):
firmware_version = "-"
try:
if device.is_connectable:
import kp
# Connect to device and get system info
device_group = kp.core.connect_devices(usb_port_ids=[port_id])
system_info = kp.core.get_system_info(
@ -980,6 +992,7 @@ class UtilitiesScreen(QWidget):
self.show_progress(f"Updating firmware for {device_model}...", 0)
# Connect to device
import kp
device_group = kp.core.connect_devices(usb_port_ids=[int(port_id)])
# Build firmware file paths
@ -1026,6 +1039,7 @@ class UtilitiesScreen(QWidget):
self.show_progress("Installing Kneron Device Drivers...", 0)
# List all product IDs
import kp
product_ids = [
kp.ProductId.KP_DEVICE_KL520,
kp.ProductId.KP_DEVICE_KL720_LEGACY,