forked from masonhuang/KNEO-Academy
- Remove unused files (model_controller.py, model_service.py, device_connection_popup.py) - Clean up commented code in device_service.py, device_popup.py, config.py - Update docstrings and comments across all modules - Improve code organization and readability
300 lines
11 KiB
Python
300 lines
11 KiB
Python
"""
|
|
device_controller.py - Device Controller
|
|
|
|
This module handles Kneron device connection, selection, and management.
|
|
It provides functionality for scanning, connecting, and disconnecting devices.
|
|
"""
|
|
|
|
from PyQt5.QtWidgets import QWidget, QListWidgetItem
|
|
from PyQt5.QtGui import QPixmap, QIcon
|
|
from PyQt5.QtCore import Qt
|
|
import os
|
|
import kp
|
|
|
|
from src.services.device_service import check_available_device
|
|
from src.config import UXUI_ASSETS, DongleModelMap, DongleIconMap, FW_DIR
|
|
|
|
|
|
class DeviceController:
    """
    Controller class for managing Kneron device connections.

    Attributes:
        main_window: Reference to the main application window
        selected_device: Currently selected device
        connected_devices: List of dicts (usb_port_id, product_id, kn_number,
            dongle), one per device found by the most recent scan
        device_group: Kneron Plus device group for connected devices
    """

    def __init__(self, main_window):
        """
        Initialize the DeviceController.

        Args:
            main_window: Reference to the main application window.
        """
        self.main_window = main_window
        self.selected_device = None
        self.connected_devices = []
        self.device_group = None  # Stores connected device group

    @staticmethod
    def _product_key(product_id):
        """
        Normalize a product id into the key format used for map lookups.

        DongleModelMap and DongleIconMap are queried with the lowercase hex
        string produced by ``hex()`` (for example ``"0x100"``), so every
        lookup in this class goes through this helper.

        Args:
            product_id: Product id as an int or as a decimal/hex string.

        Returns:
            str: Lowercase hex string for ints and numeric strings. A string
            that cannot be parsed as a number is returned stripped and
            lowercased. Any other non-integer value is returned unchanged so
            the lookup simply falls back to its default.
        """
        if isinstance(product_id, str):
            text = product_id.strip()
            try:
                # Base 0 accepts "256", "0x100" and "0X100" alike; going
                # through int() also drops padding such as "0x0100".
                return hex(int(text, 0))
            except ValueError:
                return text.lower()
        try:
            return hex(product_id)  # hex() output is already lowercase
        except TypeError:
            return product_id

    def refresh_devices(self):
        """
        Refresh the list of connected devices.

        Scans for available Kneron devices and updates the UI.

        Returns:
            bool: True if devices were found, False otherwise (including
            when the scan itself raised an error).
        """
        try:
            print("[CTRL] Refreshing devices...")
            device_descriptors = check_available_device()
            print("[CTRL] check_available_device returned")
            print(f"[CTRL] device_descriptors type: {type(device_descriptors)}")

            # Access attributes separately for debugging
            print("[CTRL] Accessing device_descriptor_number...")
            desc_num = device_descriptors.device_descriptor_number
            print(f"[CTRL] device_descriptor_number: {desc_num}")

            # Start from an empty list so unplugged devices do not linger
            self.connected_devices = []

            if desc_num > 0:
                descriptor_list = device_descriptors.device_descriptor_list
                print("[DEBUG] Starting parse_and_store_devices...")
                self.parse_and_store_devices(descriptor_list)
                print("[DEBUG] parse_and_store_devices completed")
                print("[DEBUG] Starting display_devices...")
                self.display_devices(descriptor_list)
                print("[DEBUG] display_devices completed")
                return True

            print("[DEBUG] No devices detected")
            self.main_window.show_no_device_gif()
            return False
        except Exception as e:
            print(f"Error in refresh_devices: {e}")
            import traceback
            print(traceback.format_exc())
            return False

    def parse_and_store_devices(self, devices):
        """
        Parse device information and store it in connected_devices list.

        An entry whose usb_port_id is already present is replaced in place;
        otherwise the entry is appended. A device that fails to parse is
        reported and skipped so the remaining devices are still stored.

        Args:
            devices: List of device descriptors from the scanner.
        """
        for device in devices:
            try:
                dongle = DongleModelMap.get(
                    self._product_key(device.product_id), "unknown"
                )
                # The resolved model name is also attached to the descriptor
                device.dongle = dongle

                new_device = {
                    'usb_port_id': device.usb_port_id,
                    'product_id': device.product_id,
                    'kn_number': device.kn_number,
                    'dongle': dongle,
                }

                for index, known in enumerate(self.connected_devices):
                    if known['usb_port_id'] == new_device['usb_port_id']:
                        self.connected_devices[index] = new_device
                        break
                else:
                    # No entry for this USB port yet
                    self.connected_devices.append(new_device)
            except Exception as e:
                print(f"Error processing device: {e}")

    def display_devices(self, devices):
        """
        Display the connected devices in the UI list widget.

        The list widget is cleared first. Each device becomes one item that
        carries the descriptor under Qt.UserRole, an icon chosen via
        DongleIconMap, and a "<model> (KN: <kn_number>)" label.

        Args:
            devices: List of device descriptors to display.
        """
        try:
            if not hasattr(self.main_window, 'device_list_widget'):
                print("Warning: main_window does not have device_list_widget attribute")
                return

            list_widget = self.main_window.device_list_widget
            list_widget.clear()

            if not devices:
                print("No devices to display")
                return

            for device in devices:
                # Per-device guard: one bad descriptor must not hide the rest
                try:
                    product_id = self._product_key(device.product_id)
                    icon_path = os.path.join(
                        UXUI_ASSETS,
                        DongleIconMap.get(product_id, "unknown_dongle.png"),
                    )

                    item = QListWidgetItem()
                    item.setData(Qt.UserRole, device)

                    pixmap = QPixmap(icon_path)
                    item.setIcon(QIcon(pixmap))  # Convert QPixmap to QIcon

                    # Set device name as the display text
                    dongle_name = DongleModelMap.get(product_id, "Unknown Device")
                    item.setText(f"{dongle_name} (KN: {device.kn_number})")

                    list_widget.addItem(item)
                    print(f"Added device to list: {dongle_name} (KN: {device.kn_number})")
                except Exception as e:
                    print(f"Error adding device to list: {e}")
        except Exception as e:
            print(f"Error in display_devices: {e}")

    def get_devices(self):
        """
        Get the list of connected devices.

        Performs a fresh scan and rebuilds connected_devices from its result,
        so the stored list never keeps devices that are no longer present.
        If the scan raises, connected_devices is left unchanged.

        Returns:
            list: List of device descriptors, or empty list if no devices found.
        """
        try:
            device_descriptors = check_available_device()
            device_count = device_descriptors.device_descriptor_number

            # Same policy as refresh_devices: rebuild instead of only
            # upserting, otherwise unplugged devices would stay listed.
            self.connected_devices = []

            if device_count > 0:
                descriptor_list = device_descriptors.device_descriptor_list
                self.parse_and_store_devices(descriptor_list)
                return descriptor_list
            return []
        except Exception as e:
            print(f"Error in get_devices: {e}")
            return []

    def get_selected_device(self):
        """
        Get the currently selected device.

        Returns:
            The selected device object, or None if no device is selected.
        """
        return self.selected_device

    def select_device(self, device, list_item, list_widget):
        """
        Select a device (does not automatically connect or load firmware).

        Args:
            device: The device object to select.
            list_item: The QListWidgetItem representing the device.
            list_widget: The QListWidget containing the devices.
        """
        self.selected_device = device
        print("Selected dongle:", device)

        # Update visual selection for list items: reset every row first
        for index in range(list_widget.count()):
            widget = list_widget.itemWidget(list_widget.item(index))
            if widget:  # Items without a custom widget have nothing to style
                widget.setStyleSheet("background: none;")

        # Then highlight the selected row
        list_item_widget = list_widget.itemWidget(list_item)
        if list_item_widget:
            list_item_widget.setStyleSheet("background-color: lightblue;")

    def connect_device(self):
        """
        Connect to the selected device and upload firmware.

        The selected device may be a dict (as stored in connected_devices)
        or a descriptor object; usb_port_id and product_id are read from
        either form, defaulting to 0 when missing.

        Returns:
            bool: True if connection successful, False otherwise. False is
            also returned when no device is selected or when the firmware
            files for the device model are missing.
        """
        if not self.selected_device:
            print("No device selected, cannot connect")
            return False

        try:
            # Get USB port ID
            if isinstance(self.selected_device, dict):
                usb_port_id = self.selected_device.get("usb_port_id", 0)
                product_id = self.selected_device.get("product_id", 0)
            else:
                usb_port_id = getattr(self.selected_device, "usb_port_id", 0)
                product_id = getattr(self.selected_device, "product_id", 0)

            # Convert product_id to lowercase hex string
            product_id = self._product_key(product_id)

            # Map product_id to dongle type
            dongle = DongleModelMap.get(product_id, "unknown")
            print(f"Connecting device: product_id={product_id}, mapped to={dongle}")

            # Set firmware paths
            scpu_path = os.path.join(FW_DIR, dongle, "fw_scpu.bin")
            ncpu_path = os.path.join(FW_DIR, dongle, "fw_ncpu.bin")

            # Check if firmware files exist
            if not (os.path.exists(scpu_path) and os.path.exists(ncpu_path)):
                print(f"Firmware files not found: {scpu_path} or {ncpu_path}")
                return False

            # Connect to device
            print('[Connecting device]')
            self.device_group = kp.core.connect_devices(usb_port_ids=[usb_port_id])
            print(' - Connection successful')

            # Upload firmware
            print('[Uploading firmware]')
            kp.core.load_firmware_from_file(
                device_group=self.device_group,
                scpu_fw_path=scpu_path,
                ncpu_fw_path=ncpu_path,
            )
            print(' - Upload successful')

            return True
        except Exception as e:
            print(f"Error connecting to device: {e}")
            # Try to clean up on error
            if self.device_group:
                try:
                    kp.core.disconnect_devices(device_group=self.device_group)
                except Exception:
                    # Best effort: the handle is dropped below either way
                    pass
                self.device_group = None
            return False

    def disconnect_device(self):
        """
        Disconnect from the currently connected device.

        The stored device group is always cleared, even when the SDK call
        fails.

        Returns:
            bool: True if disconnection successful, False otherwise.
        """
        if not self.device_group:
            return True  # If no connected device, treat as success

        try:
            print('[Disconnecting device]')
            kp.core.disconnect_devices(device_group=self.device_group)
            print(' - Disconnected')
            return True
        except Exception as e:
            print(f"Error disconnecting device: {e}")
            return False
        finally:
            self.device_group = None

    def get_device_group(self):
        """
        Get the connected device group.

        Returns:
            The Kneron Plus device group object, or None if not connected.
        """
        return self.device_group