主要改進: - 修改 FPS 計算邏輯為累積式計算,避免初期不穩定的高 FPS 值 - 過濾掉 async 和 processing 狀態的結果,不顯示也不計入統計 - 只有真正的推理結果才會被計入 FPS 和處理計數 - 新增 _has_valid_inference_result 方法來驗證結果有效性 - 改善 MultiDongle 的 stop 方法,正確斷開設備連接 - 清理不必要的檔案和更新測試配置 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
504 lines
22 KiB
Python
504 lines
22 KiB
Python
from typing import Union, Tuple
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import time
|
|
import threading
|
|
import queue
|
|
import numpy as np
|
|
import kp
|
|
import cv2
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from typing import Callable, Optional, Any, Dict
|
|
|
|
|
|
# class PreProcessor(DataProcessor): # type: ignore
|
|
# def __init__(self, resize_fn: Optional[Callable] = None,
|
|
# format_convert_fn: Optional[Callable] = None):
|
|
# self.resize_fn = resize_fn or self._default_resize
|
|
# self.format_convert_fn = format_convert_fn or self._default_format_convert
|
|
|
|
# def process(self, frame: np.ndarray, target_size: tuple, target_format: str) -> np.ndarray:
|
|
# """Main processing pipeline"""
|
|
# resized = self.resize_fn(frame, target_size)
|
|
# return self.format_convert_fn(resized, target_format)
|
|
|
|
# def _default_resize(self, frame: np.ndarray, target_size: tuple) -> np.ndarray:
|
|
# """Default resize implementation"""
|
|
# return cv2.resize(frame, target_size)
|
|
|
|
# def _default_format_convert(self, frame: np.ndarray, target_format: str) -> np.ndarray:
|
|
# """Default format conversion"""
|
|
# if target_format == 'BGR565':
|
|
# return cv2.cvtColor(frame, cv2.COLOR_BGR2BGR565)
|
|
# elif target_format == 'RGB8888':
|
|
# return cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
|
|
# return frame
|
|
|
|
class MultiDongle:
    """Drive one or more Kneron dongles as a single inference pipeline.

    Connects the devices, optionally uploads firmware, uploads a model, and
    then runs two daemon threads: one feeding images from an input queue to
    the device group, one collecting raw inference results into an output
    queue.  Callers push frames with :meth:`put_input` and poll results with
    :meth:`get_output` / :meth:`get_latest_inference_result`.
    """

    # Currently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported.
    _FORMAT_MAPPING = {
        'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
        'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888,
        'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV,
        'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8,
    }

    def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False):
        """
        Initialize the MultiDongle class.

        :param port_id: List of USB port IDs for the same layer's devices.
        :param scpu_fw_path: Path to the SCPU firmware file.
        :param ncpu_fw_path: Path to the NCPU firmware file.
        :param model_path: Path to the model file.
        :param upload_fw: Flag to indicate whether to upload firmware.
        """
        self.port_id = port_id
        self.upload_fw = upload_fw

        # Firmware paths are only needed (and only stored) when uploading.
        if self.upload_fw:
            self.scpu_fw_path = scpu_fw_path
            self.ncpu_fw_path = ncpu_fw_path

        self.model_path = model_path
        self.device_group = None

        # Filled in by initialize(); descriptor creation needs the model id.
        self.model_nef_descriptor = None
        self.generic_inference_input_descriptor = None

        # Safe defaults so preprocess_frame() does not raise AttributeError
        # if it is called before initialize() has detected the real shape.
        self.model_input_shape: Tuple[int, int] = (128, 128)  # (width, height)
        self.model_input_channels: int = 3

        # Input queue for images to be sent; output queue for received results.
        self._input_queue = queue.Queue()
        self._output_queue = queue.Queue()

        # Threading attributes.
        self._send_thread = None
        self._receive_thread = None
        self._stop_event = threading.Event()  # Event to signal threads to stop

        # Monotonic counter used as the per-image inference_number.
        self._inference_counter = 0

    def initialize(self):
        """
        Connect devices, upload firmware (if upload_fw is True), and upload model.
        Must be called before start().

        NOTE(review): failures call sys.exit(1), matching the original
        behavior; callers cannot catch these as normal exceptions.
        """
        try:
            print('[Connect Device]')
            self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception)))
            sys.exit(1)

        if self.upload_fw:
            try:
                print('[Upload Firmware]')
                kp.core.load_firmware_from_file(device_group=self.device_group,
                                                scpu_fw_path=self.scpu_fw_path,
                                                ncpu_fw_path=self.ncpu_fw_path)
                print(' - Success')
            except kp.ApiKPException as exception:
                print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
                sys.exit(1)

        try:
            print('[Upload Model]')
            self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group,
                                                                     file_path=self.model_path)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: upload model failed, error = \'{}\''.format(str(exception)))
            sys.exit(1)

        # Extract model input dimensions automatically from model metadata.
        if self.model_nef_descriptor and self.model_nef_descriptor.models:
            model = self.model_nef_descriptor.models[0]
            if hasattr(model, 'input_nodes') and model.input_nodes:
                input_node = model.input_nodes[0]
                # shape_npu is NCHW, e.g. [1, 3, 128, 128] -> (width, height).
                shape = input_node.tensor_shape_info.data.shape_npu
                self.model_input_shape = (shape[3], shape[2])  # (width, height)
                self.model_input_channels = shape[1]  # 3 for RGB
                print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}")
            else:
                self.model_input_shape = (128, 128)  # fallback
                self.model_input_channels = 3
                print("Using default input shape (128, 128)")
        else:
            self.model_input_shape = (128, 128)
            self.model_input_channels = 3
            print("Model info not available, using default shape")

        # Prepare generic inference input descriptor after model is loaded.
        if self.model_nef_descriptor:
            self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                model_id=self.model_nef_descriptor.models[0].id,
            )
        else:
            print("Warning: Could not get generic inference input descriptor from model.")
            self.generic_inference_input_descriptor = None

    def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray:
        """
        Resize *frame* to the model input size and convert it to the given
        on-wire pixel format.  Unknown formats are returned resized only.
        """
        resized_frame = cv2.resize(frame, self.model_input_shape)

        if target_format == 'BGR565':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565)
        elif target_format == 'RGB8888':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA)
        elif target_format == 'YUYV':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV)
        else:
            return resized_frame  # RAW8 or other formats

    def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[Optional[float], Optional[str]]:
        """
        Get the latest inference result.

        Returns: (probability, result_string) or (None, None) if no result
        is available, the descriptor is malformed, or retrieval fails.
        """
        output_descriptor = self.get_output(timeout=timeout)
        if not output_descriptor:
            return None, None

        # Only process descriptors that carry the expected header fields.
        if hasattr(output_descriptor, 'header') and \
                hasattr(output_descriptor.header, 'num_output_node') and \
                hasattr(output_descriptor.header, 'inference_number'):

            inf_node_output_list = []
            retrieval_successful = True

            for node_idx in range(output_descriptor.header.num_output_node):
                try:
                    inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                        node_idx=node_idx,
                        generic_raw_result=output_descriptor,
                        channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
                    )
                    # Copy: the SDK may reuse the underlying buffer.
                    inf_node_output_list.append(inference_float_node_output.ndarray.copy())
                except Exception:
                    # Covers kp.ApiKPException and anything unexpected; a
                    # partial retrieval invalidates the whole result.
                    retrieval_successful = False
                    break

            if retrieval_successful and len(inf_node_output_list) > 0:
                # Single node: use it directly; multiple: concatenate flat.
                if output_descriptor.header.num_output_node == 1:
                    raw_output_array = inf_node_output_list[0].flatten()
                else:
                    concatenated_outputs = [arr.flatten() for arr in inf_node_output_list]
                    raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([])

                if raw_output_array.size > 0:
                    probability = postprocess(raw_output_array)
                    result_str = "Fire" if probability > 0.5 else "No Fire"
                    return probability, result_str

        return None, None

    def _send_thread_func(self):
        """Internal function run by the send thread, gets images from input queue."""
        print("Send thread started.")
        while not self._stop_event.is_set():
            if self.generic_inference_input_descriptor is None:
                # Descriptor not ready yet; wait briefly instead of spinning.
                self._stop_event.wait(0.1)
                continue

            try:
                try:
                    # Short timeout so the loop can re-check the stop event
                    # while waiting on the queue.
                    item = self._input_queue.get(block=True, timeout=0.1)
                    # None is the sentinel pushed by stop() to wake us up.
                    if item is None:
                        continue
                    image_data, image_format_enum = item
                except queue.Empty:
                    continue

                # Configure and send the image.
                self._inference_counter += 1  # one inference_number per image
                self.generic_inference_input_descriptor.inference_number = self._inference_counter
                self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
                    image=image_data,
                    image_format=image_format_enum,  # format travels with the image
                    resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                    padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                    normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                )]

                kp.inference.generic_image_inference_send(device_group=self.device_group,
                                                          generic_inference_input_descriptor=self.generic_inference_input_descriptor)
            except kp.ApiKPException as exception:
                print(f' - Error in send thread: inference send failed, error = {exception}')
                self._stop_event.set()  # Signal other thread to stop
            except Exception as e:
                print(f' - Unexpected error in send thread: {e}')
                self._stop_event.set()

        print("Send thread stopped.")

    def _receive_thread_func(self):
        """Internal function run by the receive thread, puts results into output queue."""
        print("Receive thread started.")
        while not self._stop_event.is_set():
            try:
                generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group)
                self._output_queue.put(generic_inference_output_descriptor)
            except kp.ApiKPException as exception:
                if not self._stop_event.is_set():  # avoid noise during shutdown
                    print(f' - Error in receive thread: inference receive failed, error = {exception}')
                self._stop_event.set()
            except Exception as e:
                print(f' - Unexpected error in receive thread: {e}')
                self._stop_event.set()

        print("Receive thread stopped.")

    def start(self):
        """
        Start the send and receive threads.
        Must be called after initialize().

        :raises RuntimeError: if initialize() has not connected the devices.
        """
        if self.device_group is None:
            raise RuntimeError("MultiDongle not initialized. Call initialize() first.")

        if self._send_thread is None or not self._send_thread.is_alive():
            self._stop_event.clear()  # allow restart after a previous stop()
            self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True)
            self._send_thread.start()
            print("Send thread started.")

        if self._receive_thread is None or not self._receive_thread.is_alive():
            self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True)
            self._receive_thread.start()
            print("Receive thread started.")

    def stop(self):
        """Signal both worker threads to stop, drain the input queue, and join them."""
        if self._stop_event.is_set():
            return  # Already stopping

        print("Stopping threads...")
        self._stop_event.set()

        # Drain pending inputs so the send thread does not keep working.
        while not self._input_queue.empty():
            try:
                self._input_queue.get_nowait()
            except queue.Empty:
                break

        # Sentinel wakes the send thread if it is blocked on get().
        self._input_queue.put(None)

        # Join threads with a bounded wait; report any that linger.
        for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]:
            if thread and thread.is_alive():
                thread.join(timeout=2.0)
                if thread.is_alive():
                    print(f"Warning: {name} thread didn't stop cleanly")

    def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
        """
        Put an image into the input queue.

        :param image: A file path or an already-decoded BGR ndarray.
        :param format: Key into _FORMAT_MAPPING (e.g. 'BGR565').
        :param target_size: Optional (width, height) resize applied before queuing.
        :raises FileNotFoundError: if a path is given and cannot be read.
        :raises ValueError: on an unsupported image type or format name.
        """
        if isinstance(image, str):
            image_data = cv2.imread(image)
            if image_data is None:
                raise FileNotFoundError(f"Image file not found at {image}")
            if target_size:
                image_data = cv2.resize(image_data, target_size)
        elif isinstance(image, np.ndarray):
            # Never mutate the caller's array; copy when not resizing.
            image_data = image.copy() if target_size is None else cv2.resize(image, target_size)
        else:
            raise ValueError("Image must be a file path (str) or a numpy array (ndarray).")

        if format in self._FORMAT_MAPPING:
            image_format_enum = self._FORMAT_MAPPING[format]
        else:
            raise ValueError(f"Unsupported format: {format}")

        self._input_queue.put((image_data, image_format_enum))

    def get_output(self, timeout: float = None):
        """
        Get the next received data from the output queue.
        This method is non-blocking by default unless a timeout is specified.

        :param timeout: Time in seconds to wait for data. If None, it's non-blocking.
        :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout.
        """
        try:
            return self._output_queue.get(block=timeout is not None, timeout=timeout)
        except queue.Empty:
            return None

    def __del__(self):
        """Ensure resources are released when the object is garbage collected."""
        # Guard attribute access: __del__ may run on a partially-constructed
        # object if __init__ raised before these attributes were assigned.
        if getattr(self, '_stop_event', None) is not None:
            self.stop()
        if getattr(self, 'device_group', None):
            try:
                kp.core.disconnect_devices(device_group=self.device_group)
                print("Device group disconnected in destructor.")
            except Exception as e:
                print(f"Error disconnecting device group in destructor: {e}")
|
|
def postprocess(raw_model_output: list) -> float:
    """
    Post-processes the raw model output.

    The first element of the output is taken as the probability; an empty
    or missing output yields 0.0.
    """
    # Guard clause: nothing to read -> default value.
    if raw_model_output is None or len(raw_model_output) == 0:
        return 0.0  # Default or error value
    return float(raw_model_output[0])
|
|
|
class WebcamInferenceRunner:
    """Capture webcam frames, run them through a MultiDongle pipeline, and
    display each frame annotated with the latest result and cumulative
    inference FPS."""

    def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'):
        """
        :param multidongle: An initialized and started MultiDongle instance.
        :param image_format: Pixel format key used for preprocessing/sending.
        """
        self.multidongle = multidongle
        self.image_format = image_format
        # Most recent inference outputs; re-drawn on every display frame.
        self.latest_probability = 0.0
        self.result_str = "No Fire"

        # Statistics tracking: cumulative counts since the first event,
        # which avoids unstable FPS readings in the first frames.
        self.processed_inference_count = 0
        self.inference_fps_start_time = None
        self.display_fps_start_time = None
        self.display_frame_counter = 0

    def run(self, camera_id: int = 0):
        """
        Run the capture/inference/display loop until the stream ends or the
        user presses 'q'.

        :param camera_id: OpenCV camera index.
        :raises RuntimeError: if the webcam cannot be opened.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            raise RuntimeError("Cannot open webcam")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Track display FPS from the first captured frame.
                if self.display_fps_start_time is None:
                    self.display_fps_start_time = time.time()
                self.display_frame_counter += 1

                # Preprocess and send frame.
                processed_frame = self.multidongle.preprocess_frame(frame, self.image_format)
                self.multidongle.put_input(processed_frame, self.image_format)

                # Poll for an inference result (short timeout, may be None).
                prob, result = self.multidongle.get_latest_inference_result()
                if prob is not None:
                    # Only real inference results count toward FPS.
                    if self.inference_fps_start_time is None:
                        self.inference_fps_start_time = time.time()
                    self.processed_inference_count += 1

                    self.latest_probability = prob
                    self.result_str = result

                # Display frame with results.
                self._display_results(frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            cap.release()
            cv2.destroyAllWindows()

    def _display_results(self, frame):
        """Overlay the latest result string, probability, and cumulative
        inference FPS on a copy of *frame* and show it."""
        display_frame = frame.copy()
        # BUG FIX: the original used `"Fire" in self.result_str`, which is
        # also true for "No Fire", so the (0, 0, 255) branch was unreachable
        # and the text was always green. Compare for equality instead.
        text_color = (0, 255, 0) if self.result_str == "Fire" else (0, 0, 255)

        # Display inference result.
        cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2)

        # Calculate and display cumulative inference FPS.
        if self.inference_fps_start_time and self.processed_inference_count > 0:
            elapsed_time = time.time() - self.inference_fps_start_time
            if elapsed_time > 0:
                inference_fps = self.processed_inference_count / elapsed_time
                cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}",
                            (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

        cv2.imshow('Fire Detection', display_frame)
|
|
|
|
if __name__ == "__main__":
    # Hardware / model configuration.
    PORT_IDS = [32]
    SCPU_FW = r'C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_scpu.bin'
    NCPU_FW = r'C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_ncpu.bin'
    MODEL_PATH = r'C:/Users/mason/AppData/Local/Kneron_Academy/utils/yolov5s/yolov5s/kl520_20005_yolov5-noupsample_w640h640.nef'

    # Sentinel instead of a locals() lookup: stop() only runs if the
    # engine was actually constructed.
    multidongle = None
    try:
        print("Initializing MultiDongle...")
        multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True)
        multidongle.initialize()
        multidongle.start()

        print("Starting webcam inference...")
        WebcamInferenceRunner(multidongle, 'BGR565').run()

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        if multidongle is not None:
            multidongle.stop()