812 lines
34 KiB
Python
812 lines
34 KiB
Python
from typing import Union, Tuple
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import time
|
|
import threading
|
|
import queue
|
|
import numpy as np
|
|
import kp
|
|
import cv2
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from typing import Callable, Optional, Any, Dict
|
|
|
|
|
|
class DataProcessor(ABC):
    """Common interface shared by every processing stage of the pipeline."""

    @abstractmethod
    def process(self, data: Any, *args, **kwargs) -> Any:
        """Transform ``data`` and hand back the result.

        Concrete stages must override this method. Additional positional and
        keyword arguments are stage-specific.
        """
        return None
|
|
|
|
|
|
class PreProcessor(DataProcessor):
    """Pipeline stage that resizes a frame and then converts its pixel format.

    Both steps are pluggable: pass ``resize_fn`` and/or ``format_convert_fn``
    to replace the OpenCV-based defaults defined on this class.
    """

    def __init__(self, resize_fn: Optional[Callable] = None,
                 format_convert_fn: Optional[Callable] = None):
        # A missing (falsy) callable selects the built-in implementation.
        self.resize_fn = resize_fn if resize_fn else self._default_resize
        self.format_convert_fn = (
            format_convert_fn if format_convert_fn else self._default_format_convert
        )

    def process(self, frame: np.ndarray, target_size: tuple, target_format: str) -> np.ndarray:
        """Resize ``frame`` to ``target_size``, then convert it to ``target_format``."""
        scaled_frame = self.resize_fn(frame, target_size)
        converted_frame = self.format_convert_fn(scaled_frame, target_format)
        return converted_frame

    def _default_resize(self, frame: np.ndarray, target_size: tuple) -> np.ndarray:
        """Resize with ``cv2.resize`` using OpenCV's default interpolation."""
        return cv2.resize(frame, target_size)

    def _default_format_convert(self, frame: np.ndarray, target_format: str) -> np.ndarray:
        """Convert a frame to 'BGR565' or 'RGB8888'.

        Any other ``target_format`` returns the frame untouched.
        NOTE(review): the conversion codes assume a BGR input frame - confirm
        against the callers.
        """
        if target_format == 'BGR565':
            conversion_code = cv2.COLOR_BGR2BGR565
        elif target_format == 'RGB8888':
            conversion_code = cv2.COLOR_BGR2RGBA
        else:
            return frame
        return cv2.cvtColor(frame, conversion_code)
|
|
|
|
|
|
class PostProcessor(DataProcessor):
    """Pipeline stage applied to the output of an inference stage.

    The actual work is delegated to ``process_fn``; when none is supplied the
    data passes through unchanged.
    """

    def __init__(self, process_fn: Optional[Callable] = None):
        # A missing (falsy) callable selects the pass-through implementation.
        self.process_fn = process_fn if process_fn else self._default_process

    def process(self, data: Any, *args, **kwargs) -> Any:
        """Run the configured callable on ``data``, forwarding all extra arguments."""
        handler = self.process_fn
        return handler(data, *args, **kwargs)

    def _default_process(self, data: Any, *args, **kwargs) -> Any:
        """Identity post-processing: return ``data`` as received."""
        return data
|
|
|
|
|
|
class MultiDongle:
    """Manage a group of Kneron dongles and stream images through them.

    Typical use: construct the object, call :meth:`initialize` (connect,
    upload firmware and model), call :meth:`start` (spawn the send/receive
    worker threads), feed frames with :meth:`put_input`, collect results with
    :meth:`get_output` or :meth:`get_latest_inference_result`, and finally
    call :meth:`stop`.
    """

    # Currently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported.
    # NOTE(review): 'BGR565' is mapped to the SDK's RGB565 enum - presumably
    # to match OpenCV's BGR565 byte layout; confirm against the SDK docs.
    _FORMAT_MAPPING = {
        'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
        'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888,
        'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV,
        'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8,
        # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0,
        # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0,
        # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB,
        # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR,
        # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1,
        # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1,
        # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB,
        # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR,
    }

    # Product ID (lower-case hex string) -> device series.
    # TODO: Check Product ID to device series mapping
    _PRODUCT_ID_TO_SERIES = {
        '0x100': 'KL520',
        '0x720': 'KL720',
        '0x630': 'KL630',
        '0x730': 'KL730',
        '0x540': 'KL540',
    }

    # Series probed, in this order, by the legacy chip-based fallback.
    _CHIP_SERIES = ('KL520', 'KL720', 'KL630', 'KL730', 'KL540')

    # Input geometry used whenever the model metadata cannot be read.
    _DEFAULT_INPUT_SHAPE = (128, 128)
    _DEFAULT_INPUT_CHANNELS = 3

    @staticmethod
    def _descriptor_field(device_desc, names, default):
        """Return the first of ``names`` found on a dict or object descriptor.

        Args:
            device_desc: Device descriptor, either a dict or an object.
            names: Field names to try, in priority order.
            default: Value returned when none of the names is present.
        """
        for name in names:
            if isinstance(device_desc, dict):
                if name in device_desc:
                    return device_desc[name]
            elif hasattr(device_desc, name):
                return getattr(device_desc, name)
        return default

    @staticmethod
    def _describe_device(device_desc):
        """Build the ``port_id`` / ``series`` / ``device_descriptor`` record for one device."""
        return {
            'port_id': MultiDongle._descriptor_field(device_desc, ('usb_port_id', 'port_id'), 0),
            'series': MultiDongle._get_device_series(device_desc),
            'device_descriptor': device_desc,
        }

    @staticmethod
    def scan_devices():
        """
        Scan for available Kneron devices and return their information.

        The scan result may be a dict keyed by index, a list/tuple of
        descriptors, an SDK container exposing ``device_descriptor_list``, or
        a single descriptor; all of them are normalised to the same output.

        Returns:
            List[Dict]: List of device information containing port_id, series,
            and device_descriptor. Empty when nothing is found or the scan fails.
        """
        print('[Scanning Devices]')
        try:
            device_descriptors = kp.core.scan_devices()
        except kp.ApiKPException as exception:
            print(f'Error: scan devices fail, error msg: [{str(exception)}]')
            return []

        print(device_descriptors)

        # Per the Kneron PLUS docs the SDK returns a DeviceDescriptorList
        # container; unwrap it so each device is reported individually instead
        # of the container being mistaken for one device on port 0.
        wrapped = getattr(device_descriptors, 'device_descriptor_list', None)
        if wrapped is not None:
            device_descriptors = list(wrapped)

        if not device_descriptors:
            print(' - No devices found')
            return []

        devices_info = []

        if isinstance(device_descriptors, dict):
            # JSON dict format: {"0": {...}, "1": {...}}
            print(f' - Found {len(device_descriptors)} device(s):')
            for position, (key, device_desc) in enumerate(device_descriptors.items(), start=1):
                device_info = MultiDongle._describe_device(device_desc)
                devices_info.append(device_info)
                try:
                    label = int(key) + 1
                except (TypeError, ValueError):
                    # Non-numeric key: fall back to the running position.
                    label = position
                product_id = MultiDongle._descriptor_field(device_desc, ('product_id',), 'Unknown')
                print(f' [{label}] Port ID: {device_info["port_id"]}, '
                      f'Series: {device_info["series"]}, Product ID: {product_id}')
        elif isinstance(device_descriptors, (list, tuple)):
            print(f' - Found {len(device_descriptors)} device(s):')
            for position, device_desc in enumerate(device_descriptors, start=1):
                device_info = MultiDongle._describe_device(device_desc)
                devices_info.append(device_info)
                print(f' [{position}] Port ID: {device_info["port_id"]}, Series: {device_info["series"]}')
        else:
            # A single descriptor object (or any other truthy value).
            print(' - Found 1 device:')
            device_info = MultiDongle._describe_device(device_descriptors)
            devices_info.append(device_info)
            print(f' [1] Port ID: {device_info["port_id"]}, Series: {device_info["series"]}')

        return devices_info

    @staticmethod
    def _series_from_product_id(product_id):
        """Translate a product ID (int or hex string) into a series name.

        Integers are converted with ``hex()`` and strings are compared
        case-insensitively. Unmapped IDs yield ``'Unknown (<id>)'``.
        """
        if isinstance(product_id, int):
            product_id = hex(product_id)
        lookup_key = product_id.strip().lower() if isinstance(product_id, str) else product_id
        series = MultiDongle._PRODUCT_ID_TO_SERIES.get(lookup_key)
        if series is not None:
            return series
        return f'Unknown ({product_id})'

    @staticmethod
    def _get_device_series(device_descriptor):
        """
        Extract device series from device descriptor using product_id.

        Args:
            device_descriptor: Device descriptor from scan_devices() - can be dict or object

        Returns:
            str: Device series (e.g., 'KL520', 'KL720', etc.), ``'Unknown (<id>)'``
            for an unmapped product ID, or ``'Unknown'`` when nothing usable is found.
        """
        try:
            # Dict format (from JSON): always resolved through product_id.
            if isinstance(device_descriptor, dict):
                return MultiDongle._series_from_product_id(device_descriptor.get('product_id', ''))

            # Object format (from SDK).
            if hasattr(device_descriptor, 'product_id'):
                return MultiDongle._series_from_product_id(device_descriptor.product_id)

            # Legacy chip-based detection (fallback).
            # NOTE(review): assumes kp.ModelNefDescriptor exposes KP_CHIP_* constants;
            # if it does not, the AttributeError is reported below and 'Unknown' is returned.
            if hasattr(device_descriptor, 'chip'):
                chip = device_descriptor.chip
                for series in MultiDongle._CHIP_SERIES:
                    if chip == getattr(kp.ModelNefDescriptor, f'KP_CHIP_{series}'):
                        return series

            return 'Unknown'

        except Exception as e:
            # Series detection is best-effort; never let it break a scan.
            print(f'Warning: Unable to determine device series: {str(e)}')
            return 'Unknown'

    @staticmethod
    def connect_auto_detected_devices(device_count: int = None):
        """
        Auto-detect and connect to available Kneron devices.

        Args:
            device_count: Number of devices to connect. If None, connect to all
                available devices. Values above the number found are clamped.

        Returns:
            Tuple[kp.DeviceGroup, List[Dict]]: Device group and list of connected device info

        Raises:
            Exception: If no device is found or the SDK fails to connect.
        """
        devices_info = MultiDongle.scan_devices()
        if not devices_info:
            raise Exception("No Kneron devices found")

        if device_count is None:
            device_count = len(devices_info)
        else:
            # Clamp to [0, found]; a negative request selects no devices, as before.
            device_count = max(0, min(device_count, len(devices_info)))

        connected_devices = devices_info[:device_count]
        port_ids = [device['port_id'] for device in connected_devices]

        try:
            print(f'[Connecting to {device_count} device(s)]')
            device_group = kp.core.connect_devices(usb_port_ids=port_ids)
            print(' - Success')
        except kp.ApiKPException as exception:
            raise Exception(f'Failed to connect devices: {str(exception)}') from exception

        return device_group, connected_devices

    def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None, model_path: str = None, upload_fw: bool = False, auto_detect: bool = False, max_queue_size: int = 0):
        """
        Initialize the MultiDongle class.

        :param port_id: List of USB port IDs for the same layer's devices. If None and auto_detect=True, will auto-detect devices.
        :param scpu_fw_path: Path to the SCPU firmware file.
        :param ncpu_fw_path: Path to the NCPU firmware file.
        :param model_path: Path to the model file.
        :param upload_fw: Flag to indicate whether to upload firmware.
        :param auto_detect: Flag to auto-detect and connect to available devices.
        :param max_queue_size: Maximum size for internal queues. If 0, unlimited queues are used.
        :raises Exception: If auto_detect is True and no device is found.
        """
        # Create everything stop()/__del__ touch first, so a failure further
        # down (e.g. auto-detection) never leaves a half-built object behind.
        self.device_group = None
        self._send_thread = None
        self._receive_thread = None
        self._stop_event = threading.Event()  # Event to signal threads to stop
        self._inference_counter = 0

        # queue.Queue treats maxsize <= 0 as "unbounded".
        queue_capacity = max_queue_size if max_queue_size > 0 else 0
        self._input_queue = queue.Queue(maxsize=queue_capacity)   # (image, format enum) tuples to send
        self._output_queue = queue.Queue(maxsize=queue_capacity)  # raw results received from the device

        self.upload_fw = upload_fw
        # Always store firmware paths when provided
        self.scpu_fw_path = scpu_fw_path
        self.ncpu_fw_path = ncpu_fw_path
        self.model_path = model_path

        # Filled in by initialize(); defaults keep preprocess_frame() usable before that.
        self.model_nef_descriptor = None
        self.generic_inference_input_descriptor = None
        self.model_input_shape = self._DEFAULT_INPUT_SHAPE
        self.model_input_channels = self._DEFAULT_INPUT_CHANNELS

        self.auto_detect = auto_detect
        self.connected_devices_info = []

        if auto_detect:
            devices_info = self.scan_devices()
            if not devices_info:
                raise Exception("No Kneron devices found for auto-detection")
            self.port_id = [device['port_id'] for device in devices_info]
            self.connected_devices_info = devices_info
        else:
            self.port_id = port_id or []

    def initialize(self):
        """
        Connect devices, upload firmware, and upload model.
        Must be called before start().

        Firmware upload is skipped only when ``upload_fw`` is False and no
        firmware path was supplied; in every other case it is attempted.
        Any SDK failure prints an error and terminates the process with
        ``sys.exit(1)``.
        """
        # Connect device and assign to self.device_group
        try:
            print('[Connect Device]')
            self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception)))
            sys.exit(1)

        # The USB timeout is left at the SDK default. To change it:
        # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000)

        firmware_path_given = self.scpu_fw_path is not None or self.ncpu_fw_path is not None
        if self.upload_fw or firmware_path_given:
            try:
                print('[Upload Firmware]')
                kp.core.load_firmware_from_file(device_group=self.device_group,
                                                scpu_fw_path=self.scpu_fw_path,
                                                ncpu_fw_path=self.ncpu_fw_path)
                print(' - Success')
            except kp.ApiKPException as exception:
                print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
                sys.exit(1)
        else:
            # Nothing to upload: the caller neither asked for it nor gave files.
            print('[Upload Firmware]')
            print(' - Skipped (upload_fw is False and no firmware path was given)')

        # upload model to device
        try:
            print('[Upload Model]')
            self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group,
                                                                     file_path=self.model_path)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: upload model failed, error = \'{}\''.format(str(exception)))
            sys.exit(1)

        # Extract model input dimensions automatically from model metadata
        models = self.model_nef_descriptor.models if self.model_nef_descriptor else None
        if models:
            model = models[0]
            if hasattr(model, 'input_nodes') and model.input_nodes:
                input_node = model.input_nodes[0]
                # e.g. "shape_npu": [1, 3, 128, 128] -> (width, height) = (128, 128)
                # NOTE(review): assumes shape_npu is laid out as (N, C, H, W) - confirm.
                shape = input_node.tensor_shape_info.data.shape_npu
                self.model_input_shape = (shape[3], shape[2])  # (width, height)
                self.model_input_channels = shape[1]
                print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}")
            else:
                self.model_input_shape = self._DEFAULT_INPUT_SHAPE
                self.model_input_channels = self._DEFAULT_INPUT_CHANNELS
                print("Using default input shape (128, 128)")
        else:
            self.model_input_shape = self._DEFAULT_INPUT_SHAPE
            self.model_input_channels = self._DEFAULT_INPUT_CHANNELS
            print("Model info not available, using default shape")

        # Prepare generic inference input descriptor after model is loaded.
        # An empty model list is treated like a missing descriptor instead of
        # raising IndexError on models[0].
        if models:
            self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                model_id=models[0].id,
            )
        else:
            print("Warning: Could not get generic inference input descriptor from model.")
            self.generic_inference_input_descriptor = None

    def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray:
        """
        Preprocess frame for inference.

        The frame is resized to ``self.model_input_shape`` and converted to
        ``target_format`` ('BGR565', 'RGB8888' or 'YUYV'). Any other format
        (e.g. 'RAW8') returns the resized frame without colour conversion.
        NOTE(review): the conversion codes assume a BGR input frame - confirm.
        """
        resized_frame = cv2.resize(frame, self.model_input_shape)

        if target_format == 'BGR565':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565)
        if target_format == 'RGB8888':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA)
        if target_format == 'YUYV':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV)
        return resized_frame  # RAW8 or other formats

    def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[Optional[float], Optional[str]]:
        """
        Fetch the next queued inference result and classify it.

        All output nodes are retrieved as float arrays, flattened and
        concatenated; the module-level ``postprocess`` turns that into a
        probability, which is labelled "Fire" above 0.5 and "No Fire" otherwise.

        Returns: (probability, result_string) or (None, None) if no result
        is available within ``timeout`` or the result cannot be decoded.
        """
        output_descriptor = self.get_output(timeout=timeout)
        if not output_descriptor:
            return None, None

        header = getattr(output_descriptor, 'header', None)
        if not (hasattr(output_descriptor, 'header')
                and hasattr(header, 'num_output_node')
                and hasattr(header, 'inference_number')):
            return None, None

        node_outputs = []
        for node_idx in range(header.num_output_node):
            try:
                float_node = kp.inference.generic_inference_retrieve_float_node(
                    node_idx=node_idx,
                    generic_raw_result=output_descriptor,
                    channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
                )
                node_outputs.append(float_node.ndarray.copy())
            except Exception:
                # Any retrieval failure (SDK error or otherwise) voids the whole result.
                return None, None

        if not node_outputs:
            return None, None

        raw_output_array = np.concatenate([node.flatten() for node in node_outputs])
        if raw_output_array.size == 0:
            return None, None

        probability = postprocess(raw_output_array)
        result_str = "Fire" if probability > 0.5 else "No Fire"
        return probability, result_str

    def _send_thread_func(self):
        """Internal function run by the send thread, gets images from input queue."""
        print("Send thread started.")
        while not self._stop_event.is_set():
            if self.generic_inference_input_descriptor is None:
                # Wait for descriptor to be ready or stop (avoids busy waiting).
                self._stop_event.wait(0.1)
                continue

            # Poll with a timeout so the stop event is re-checked regularly.
            try:
                item = self._input_queue.get(block=True, timeout=0.1)
            except queue.Empty:
                continue

            # None is the wake-up sentinel posted by stop().
            if item is None:
                continue

            try:
                image_data, image_format_enum = item

                # Configure and send the image
                self._inference_counter += 1  # Increment counter for each image
                descriptor = self.generic_inference_input_descriptor
                descriptor.inference_number = self._inference_counter
                descriptor.input_node_image_list = [kp.GenericInputNodeImage(
                    image=image_data,
                    image_format=image_format_enum,  # Use the format from the queue
                    resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                    padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                    normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                )]

                kp.inference.generic_image_inference_send(device_group=self.device_group,
                                                          generic_inference_input_descriptor=descriptor)
            except kp.ApiKPException as exception:
                print(f' - Error in send thread: inference send failed, error = {exception}')
                self._stop_event.set()  # Signal other thread to stop
            except Exception as e:
                print(f' - Unexpected error in send thread: {e}')
                self._stop_event.set()

        print("Send thread stopped.")

    def _enqueue_output(self, result):
        """Put ``result`` on the output queue without blocking a shutdown.

        The put is retried while the (bounded) queue is full; once the stop
        event is set the result is dropped so the receive thread can exit.
        """
        while True:
            try:
                self._output_queue.put(result, timeout=0.1)
                return
            except queue.Full:
                if self._stop_event.is_set():
                    return

    def _receive_thread_func(self):
        """Internal function run by the receive thread, puts results into output queue."""
        print("Receive thread started.")
        while not self._stop_event.is_set():
            try:
                generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group)
                self._enqueue_output(generic_inference_output_descriptor)
            except kp.ApiKPException as exception:
                if not self._stop_event.is_set():  # Avoid printing error if we are already stopping
                    print(f' - Error in receive thread: inference receive failed, error = {exception}')
                self._stop_event.set()
            except Exception as e:
                print(f' - Unexpected error in receive thread: {e}')
                self._stop_event.set()

        print("Receive thread stopped.")

    def start(self):
        """
        Start the send and receive threads.
        Must be called after initialize().

        Threads that are already alive are left untouched.

        :raises RuntimeError: If initialize() has not connected a device group yet.
        """
        if self.device_group is None:
            raise RuntimeError("MultiDongle not initialized. Call initialize() first.")

        if self._send_thread is None or not self._send_thread.is_alive():
            self._stop_event.clear()  # Clear stop event for a new start
            self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True)
            self._send_thread.start()
            print("Send thread started.")

        if self._receive_thread is None or not self._receive_thread.is_alive():
            self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True)
            self._receive_thread.start()
            print("Receive thread started.")

    def stop(self):
        """Signal both worker threads to stop, drain pending input and join them.

        Safe to call repeatedly. It returns immediately only when the stop
        event is already set AND no worker is still alive, so threads that
        stopped themselves after an error are still joined.
        """
        workers = [(self._send_thread, "Send"), (self._receive_thread, "Receive")]
        any_alive = any(thread is not None and thread.is_alive() for thread, _ in workers)
        if self._stop_event.is_set() and not any_alive:
            return  # Already stopped

        print("Stopping threads...")
        self._stop_event.set()

        # Discard pending input so nothing more is sent to the device.
        while True:
            try:
                self._input_queue.get_nowait()
            except queue.Empty:
                break

        # Wake the send thread. Never block here: if a bounded queue filled up
        # again, the send thread still notices the stop event on its next poll.
        try:
            self._input_queue.put_nowait(None)
        except queue.Full:
            pass

        # Join threads with timeout
        for thread, name in workers:
            if thread and thread.is_alive():
                thread.join(timeout=2.0)
                if thread.is_alive():
                    print(f"Warning: {name} thread didn't stop cleanly")

    def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
        """
        Put an image into the input queue with flexible preprocessing.

        :param image: Path of an image file, or an image array (copied unless resized).
        :param format: One of the keys of ``_FORMAT_MAPPING``.
        :param target_size: Optional (width, height) the image is resized to first.
        :raises FileNotFoundError: If ``image`` is a path that cannot be read.
        :raises ValueError: If ``image`` has an unsupported type or ``format`` is unknown.
        """
        if isinstance(image, str):
            image_data = cv2.imread(image)
            if image_data is None:
                raise FileNotFoundError(f"Image file not found at {image}")
            if target_size:
                image_data = cv2.resize(image_data, target_size)
        elif isinstance(image, np.ndarray):
            # Never hand the caller's own array to the worker thread.
            image_data = image.copy() if target_size is None else cv2.resize(image, target_size)
        else:
            raise ValueError("Image must be a file path (str) or a numpy array (ndarray).")

        if format not in self._FORMAT_MAPPING:
            raise ValueError(f"Unsupported format: {format}")
        image_format_enum = self._FORMAT_MAPPING[format]

        self._input_queue.put((image_data, image_format_enum))

    def get_output(self, timeout: float = None):
        """
        Get the next received data from the output queue.
        This method is non-blocking by default unless a timeout is specified.

        :param timeout: Time in seconds to wait for data. If None, it's non-blocking.
        :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout.
        """
        try:
            return self._output_queue.get(block=timeout is not None, timeout=timeout)
        except queue.Empty:
            return None

    def get_device_info(self):
        """
        Get information about connected devices including port IDs and series.

        Returns:
            List[Dict]: List of device information with port_id and series
        """
        if self.auto_detect and self.connected_devices_info:
            return self.connected_devices_info

        # If not auto-detected, try to get info from device group
        if self.device_group:
            try:
                device_info_list = []
                device_group_content = self.device_group.content

                for i, port_id in enumerate(self.port_id):
                    device_info = {
                        'port_id': port_id,
                        'series': 'Unknown',  # Refined below when possible
                        'device_descriptor': None
                    }

                    try:
                        # This is a simplified approach - it might need adjusting
                        # to the actual device group structure.
                        if hasattr(device_group_content, 'devices') and i < len(device_group_content.devices):
                            device = device_group_content.devices[i]
                            if hasattr(device, 'chip_id'):
                                device_info['series'] = self._chip_id_to_series(device.chip_id)
                    except Exception:
                        # Best effort: keep 'Unknown' when the series cannot be read.
                        pass

                    device_info_list.append(device_info)

                return device_info_list

            except Exception as e:
                print(f"Warning: Could not get device info from device group: {str(e)}")

        # Fallback: return basic info based on port_id
        return [{'port_id': port_id, 'series': 'Unknown', 'device_descriptor': None} for port_id in self.port_id]

    def _chip_id_to_series(self, chip_id):
        """
        Convert chip ID to series name.

        Args:
            chip_id: Chip ID from device

        Returns:
            str: Device series name, or 'Unknown' for non-string or unmapped IDs
        """
        chip_mapping = {
            'kl520': 'KL520',
            'kl720': 'KL720',
            'kl630': 'KL630',
            'kl730': 'KL730',
            'kl540': 'KL540',
        }

        if isinstance(chip_id, str):
            return chip_mapping.get(chip_id.lower(), 'Unknown')

        return 'Unknown'

    def print_device_info(self):
        """
        Print detailed information about connected devices.
        """
        devices_info = self.get_device_info()

        if not devices_info:
            print("No device information available")
            return

        print(f"\n[Connected Devices - {len(devices_info)} device(s)]")
        for i, device_info in enumerate(devices_info):
            print(f" [{i+1}] Port ID: {device_info['port_id']}, Series: {device_info['series']}")

    def __del__(self):
        """Ensure resources are released when the object is garbage collected."""
        # __init__ may have failed before the threading state existed.
        if getattr(self, '_stop_event', None) is None:
            return

        try:
            self.stop()
        except Exception as e:
            # A destructor must never raise.
            print(f"Error stopping threads in destructor: {e}")

        if getattr(self, 'device_group', None):
            try:
                kp.core.disconnect_devices(device_group=self.device_group)
                print("Device group disconnected in destructor.")
            except Exception as e:
                print(f"Error disconnecting device group in destructor: {e}")
            self.device_group = None
|
|
|
|
def postprocess(raw_model_output: list) -> float:
    """
    Reduce a raw model output to a single probability.

    The first element of the list/array is taken as the probability and
    converted to ``float``. A missing (``None``) or empty output yields 0.0.
    """
    # len() rather than truthiness: multi-element arrays have no truth value.
    if raw_model_output is None or len(raw_model_output) == 0:
        return 0.0  # Default or error value

    first_value = raw_model_output[0]
    return float(first_value)
|
|
|
|
class WebcamInferenceRunner:
    """Feed webcam frames to a MultiDongle and show the classification live.

    Each captured frame is preprocessed, queued for inference, and displayed
    with the most recent result and the running inference rate overlaid.
    """

    # Overlay colours in OpenCV's BGR channel order.
    _ALERT_COLOR = (0, 0, 255)  # red
    _CLEAR_COLOR = (0, 255, 0)  # green

    def __init__(self, multidongle: "MultiDongle", image_format: str = 'BGR565'):
        """
        :param multidongle: Initialized and started MultiDongle used for inference.
        :param image_format: Pixel format used for preprocessing and for sending frames.
        """
        self.multidongle = multidongle
        self.image_format = image_format
        self.latest_probability = 0.0
        self.result_str = "No Fire"

        # Statistics tracking
        self.processed_inference_count = 0
        self.inference_fps_start_time = None
        self.display_fps_start_time = None
        self.display_frame_counter = 0

    def run(self, camera_id: int = 0):
        """Capture, infer and display until 'q' is pressed or the camera stops delivering.

        :param camera_id: Index of the capture device passed to ``cv2.VideoCapture``.
        :raises RuntimeError: If the webcam cannot be opened.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            raise RuntimeError("Cannot open webcam")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Track display FPS
                if self.display_fps_start_time is None:
                    self.display_fps_start_time = time.time()
                self.display_frame_counter += 1

                # Preprocess and send frame
                processed_frame = self.multidongle.preprocess_frame(frame, self.image_format)
                self.multidongle.put_input(processed_frame, self.image_format)

                # Pick up a result if one is ready; otherwise keep showing the last one.
                prob, result = self.multidongle.get_latest_inference_result()
                if prob is not None:
                    # Track inference FPS
                    if self.inference_fps_start_time is None:
                        self.inference_fps_start_time = time.time()
                    self.processed_inference_count += 1

                    self.latest_probability = prob
                    self.result_str = result

                self._display_results(frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            # The end-of-run summary stays disabled; call
            # self._print_statistics() here to enable it.
            cap.release()
            cv2.destroyAllWindows()

    def _result_color(self):
        """Return the overlay colour (BGR): red for "Fire", green for anything else.

        An exact comparison is required: a substring test such as
        ``"Fire" in result_str`` is also true for "No Fire".
        """
        return self._ALERT_COLOR if self.result_str == "Fire" else self._CLEAR_COLOR

    def _display_results(self, frame):
        """Draw the latest result and inference FPS on a copy of ``frame`` and show it."""
        display_frame = frame.copy()
        text_color = self._result_color()

        # Display inference result
        cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2)

        # Calculate and display inference FPS
        if self.inference_fps_start_time and self.processed_inference_count > 0:
            elapsed_time = time.time() - self.inference_fps_start_time
            if elapsed_time > 0:
                inference_fps = self.processed_inference_count / elapsed_time
                cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}",
                            (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

        cv2.imshow('Fire Detection', display_frame)

    def _print_statistics(self):
        """Print final statistics: total inferences and average inference/display FPS."""
        print("\n--- Summary ---")
        print(f"Total inferences processed: {self.processed_inference_count}")

        if self.inference_fps_start_time and self.processed_inference_count > 0:
            elapsed = time.time() - self.inference_fps_start_time
            if elapsed > 0:
                avg_inference_fps = self.processed_inference_count / elapsed
                print(f"Average Inference FPS: {avg_inference_fps:.2f}")

        if self.display_fps_start_time and self.display_frame_counter > 0:
            elapsed = time.time() - self.display_fps_start_time
            if elapsed > 0:
                avg_display_fps = self.display_frame_counter / elapsed
                print(f"Average Display FPS: {avg_display_fps:.2f}")
|
|
|
|
if __name__ == "__main__":
    # Demo configuration: two dongles on fixed USB ports, firmware and model
    # files resolved relative to the current working directory.
    PORT_IDS = [28, 32]
    SCPU_FW = r'fw_scpu.bin'
    NCPU_FW = r'fw_ncpu.bin'
    MODEL_PATH = r'fire_detection_520.nef'

    # Stays None when construction fails, so cleanup knows there is nothing to stop.
    multidongle = None
    try:
        # Bring up the inference engine: connect, upload firmware/model, start threads.
        print("Initializing MultiDongle...")
        multidongle = MultiDongle(
            port_id=PORT_IDS,
            scpu_fw_path=SCPU_FW,
            ncpu_fw_path=NCPU_FW,
            model_path=MODEL_PATH,
            upload_fw=True,
        )
        multidongle.initialize()
        multidongle.start()

        # Hand control to the webcam loop until the user quits.
        print("Starting webcam inference...")
        runner = WebcamInferenceRunner(multidongle, 'BGR565')
        runner.run()

    except Exception as e:
        import traceback

        print(f"Error: {e}")
        traceback.print_exc()
    finally:
        if multidongle is not None:
            multidongle.stop()