from typing import Union, Tuple import os import sys import argparse import time import threading import queue import numpy as np import kp import cv2 import time class MultiDongle: # Curently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported _FORMAT_MAPPING = { 'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, 'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888, 'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV, 'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8, # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0, # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0, # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB, # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR, # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1, # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1, # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB, # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR, } def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False): """ Initialize the MultiDongle class. :param port_id: List of USB port IDs for the same layer's devices. :param scpu_fw_path: Path to the SCPU firmware file. :param ncpu_fw_path: Path to the NCPU firmware file. :param model_path: Path to the model file. :param upload_fw: Flag to indicate whether to upload firmware. """ self.port_id = port_id self.upload_fw = upload_fw # Check if the firmware is needed if self.upload_fw: self.scpu_fw_path = scpu_fw_path self.ncpu_fw_path = ncpu_fw_path self.model_path = model_path self.device_group = None # generic_inference_input_descriptor will be prepared in initialize self.model_nef_descriptor = None self.generic_inference_input_descriptor = None # Queues for data # Input queue for images to be sent self._input_queue = queue.Queue() # Output queue for received results self._output_queue = queue.Queue() # Threading attributes self._send_thread = None self._receive_thread = None self._stop_event = threading.Event() # Event to signal threads to stop self._inference_counter = 0 def initialize(self): """ Connect devices, upload firmware (if upload_fw is True), and upload model. Must be called before start(). """ # Connect device and assign to self.device_group try: print('[Connect Device]') self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id) print(' - Success') except kp.ApiKPException as exception: print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception))) sys.exit(1) # setting timeout of the usb communication with the device # print('[Set Device Timeout]') # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000) # print(' - Success') if self.upload_fw: try: print('[Upload Firmware]') kp.core.load_firmware_from_file(device_group=self.device_group, scpu_fw_path=self.scpu_fw_path, ncpu_fw_path=self.ncpu_fw_path) print(' - Success') except kp.ApiKPException as exception: print('Error: upload firmware failed, error = \'{}\''.format(str(exception))) sys.exit(1) # upload model to device try: print('[Upload Model]') self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group, file_path=self.model_path) print(' - Success') except kp.ApiKPException as exception: print('Error: upload model failed, error = \'{}\''.format(str(exception))) sys.exit(1) # Extract model input dimensions automatically from model metadata if self.model_nef_descriptor and self.model_nef_descriptor.models: model = self.model_nef_descriptor.models[0] if hasattr(model, 'input_nodes') and model.input_nodes: input_node = model.input_nodes[0] # From your JSON: "shape_npu": [1, 3, 128, 128] -> (width, height) shape = input_node.tensor_shape_info.data.shape_npu self.model_input_shape = (shape[3], shape[2]) # (width, height) self.model_input_channels = shape[1] # 3 for RGB print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}") else: self.model_input_shape = (128, 128) # fallback self.model_input_channels = 3 print("Using default input shape (128, 128)") else: self.model_input_shape = (128, 128) self.model_input_channels = 3 print("Model info not available, using default shape") # Prepare generic inference input descriptor after model is loaded if self.model_nef_descriptor: self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( model_id=self.model_nef_descriptor.models[0].id, ) else: print("Warning: Could not get generic inference input descriptor from model.") self.generic_inference_input_descriptor = None def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray: """ Preprocess frame for inference """ resized_frame = cv2.resize(frame, self.model_input_shape) if target_format == 'BGR565': return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565) elif target_format == 'RGB8888': return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA) elif target_format == 'YUYV': return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV) else: return resized_frame # RAW8 or other formats def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[float, str]: """ Get the latest inference result Returns: (probability, result_string) or (None, None) if no result """ output_descriptor = self.get_output(timeout=timeout) if not output_descriptor: return None, None # Process the output descriptor if hasattr(output_descriptor, 'header') and \ hasattr(output_descriptor.header, 'num_output_node') and \ hasattr(output_descriptor.header, 'inference_number'): inf_node_output_list = [] retrieval_successful = True for node_idx in range(output_descriptor.header.num_output_node): try: inference_float_node_output = kp.inference.generic_inference_retrieve_float_node( node_idx=node_idx, generic_raw_result=output_descriptor, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW ) inf_node_output_list.append(inference_float_node_output.ndarray.copy()) except kp.ApiKPException as e: retrieval_successful = False break except Exception as e: retrieval_successful = False break if retrieval_successful and inf_node_output_list: # Process output nodes if output_descriptor.header.num_output_node == 1: raw_output_array = inf_node_output_list[0].flatten() else: concatenated_outputs = [arr.flatten() for arr in inf_node_output_list] raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([]) if raw_output_array.size > 0: probability = postprocess(raw_output_array) result_str = "Fire" if probability > 0.5 else "No Fire" return probability, result_str return None, None # Modified _send_thread_func to get data from input queue def _send_thread_func(self): """Internal function run by the send thread, gets images from input queue.""" print("Send thread started.") while not self._stop_event.is_set(): if self.generic_inference_input_descriptor is None: # Wait for descriptor to be ready or stop self._stop_event.wait(0.1) # Avoid busy waiting continue try: # Get image and format from the input queue # Blocks until an item is available or stop event is set/timeout occurs try: # Use get with timeout or check stop event in a loop # This pattern allows thread to check stop event while waiting on queue item = self._input_queue.get(block=True, timeout=0.1) # Check if this is our sentinel value if item is None: continue # Now safely unpack the tuple image_data, image_format_enum = item except queue.Empty: # If queue is empty after timeout, check stop event and continue loop continue # Configure and send the image self._inference_counter += 1 # Increment counter for each image self.generic_inference_input_descriptor.inference_number = self._inference_counter self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage( image=image_data, image_format=image_format_enum, # Use the format from the queue resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, padding_mode=kp.PaddingMode.KP_PADDING_CORNER, normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON )] kp.inference.generic_image_inference_send(device_group=self.device_group, generic_inference_input_descriptor=self.generic_inference_input_descriptor) # print("Image sent.") # Optional: add log # No need for sleep here usually, as queue.get is blocking except kp.ApiKPException as exception: print(f' - Error in send thread: inference send failed, error = {exception}') self._stop_event.set() # Signal other thread to stop except Exception as e: print(f' - Unexpected error in send thread: {e}') self._stop_event.set() print("Send thread stopped.") # _receive_thread_func remains the same def _receive_thread_func(self): """Internal function run by the receive thread, puts results into output queue.""" print("Receive thread started.") while not self._stop_event.is_set(): try: generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group) self._output_queue.put(generic_inference_output_descriptor) except kp.ApiKPException as exception: if not self._stop_event.is_set(): # Avoid printing error if we are already stopping print(f' - Error in receive thread: inference receive failed, error = {exception}') self._stop_event.set() except Exception as e: print(f' - Unexpected error in receive thread: {e}') self._stop_event.set() print("Receive thread stopped.") # start method signature changed (no image/format parameters) def start(self): """ Start the send and receive threads. Must be called after initialize(). """ if self.device_group is None: raise RuntimeError("MultiDongle not initialized. Call initialize() first.") if self._send_thread is None or not self._send_thread.is_alive(): self._stop_event.clear() # Clear stop event for a new start self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True) self._send_thread.start() print("Send thread started.") if self._receive_thread is None or not self._receive_thread.is_alive(): self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True) self._receive_thread.start() print("Receive thread started.") # stop method remains the same # def stop(self): # """ # Signal the threads to stop and wait for them to finish. # """ # print("Stopping threads...") # self._stop_event.set() # Signal stop # # Put a dummy item in the input queue to unblock the send thread if it's waiting # try: # self._input_queue.put(None) # except Exception as e: # print(f"Error putting dummy item in input queue: {e}") # if self._send_thread and self._send_thread.is_alive(): # self._send_thread.join() # print("Send thread joined.") # if self._receive_thread and self._receive_thread.is_alive(): # # DON'T disconnect the device group unless absolutely necessary # # Instead, use a timeout and warning # self._receive_thread.join(timeout=5) # if self._receive_thread.is_alive(): # print("Warning: Receive thread did not join within timeout. It might be blocked.") # # Only disconnect as a last resort for stuck threads # if self.device_group: # try: # print("Thread stuck - disconnecting device group as last resort...") # kp.core.disconnect_devices(device_group=self.device_group) # # IMPORTANT: Re-connect immediately to keep device available # self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id) # print("Device group reconnected.") # except Exception as e: # print(f"Error during device reconnect: {e}") # self.device_group = None # Only set to None if reconnect fails # else: # print("Receive thread joined.") # print("Threads stopped.") def stop(self): """Improved stop method with better cleanup""" if self._stop_event.is_set(): return # Already stopping print("Stopping threads...") self._stop_event.set() # Clear queues to unblock threads while not self._input_queue.empty(): try: self._input_queue.get_nowait() except queue.Empty: break # Signal send thread to wake up self._input_queue.put(None) # Join threads with timeout for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]: if thread and thread.is_alive(): thread.join(timeout=2.0) if thread.is_alive(): print(f"Warning: {name} thread didn't stop cleanly") def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None): """ Put an image into the input queue with flexible preprocessing """ if isinstance(image, str): image_data = cv2.imread(image) if image_data is None: raise FileNotFoundError(f"Image file not found at {image}") if target_size: image_data = cv2.resize(image_data, target_size) elif isinstance(image, np.ndarray): # Don't modify original array, make copy if needed image_data = image.copy() if target_size is None else cv2.resize(image, target_size) else: raise ValueError("Image must be a file path (str) or a numpy array (ndarray).") if format in self._FORMAT_MAPPING: image_format_enum = self._FORMAT_MAPPING[format] else: raise ValueError(f"Unsupported format: {format}") self._input_queue.put((image_data, image_format_enum)) def get_output(self, timeout: float = None): """ Get the next received data from the output queue. This method is non-blocking by default unless a timeout is specified. :param timeout: Time in seconds to wait for data. If None, it's non-blocking. :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout. """ try: return self._output_queue.get(block=timeout is not None, timeout=timeout) except queue.Empty: return None def __del__(self): """Ensure resources are released when the object is garbage collected.""" self.stop() if self.device_group: try: kp.core.disconnect_devices(device_group=self.device_group) print("Device group disconnected in destructor.") except Exception as e: print(f"Error disconnecting device group in destructor: {e}") def postprocess(raw_model_output: list) -> float: """ Post-processes the raw model output. Assumes the model output is a list/array where the first element is the desired probability. """ if raw_model_output and len(raw_model_output) > 0: probability = raw_model_output[0] return float(probability) return 0.0 # Default or error value class WebcamInferenceRunner: def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'): self.multidongle = multidongle self.image_format = image_format self.latest_probability = 0.0 self.result_str = "No Fire" # Statistics tracking self.processed_inference_count = 0 self.inference_fps_start_time = None self.display_fps_start_time = None self.display_frame_counter = 0 def run(self, camera_id: int = 0): cap = cv2.VideoCapture(camera_id) if not cap.isOpened(): raise RuntimeError("Cannot open webcam") try: while True: ret, frame = cap.read() if not ret: break # Track display FPS if self.display_fps_start_time is None: self.display_fps_start_time = time.time() self.display_frame_counter += 1 # Preprocess and send frame processed_frame = self.multidongle.preprocess_frame(frame, self.image_format) self.multidongle.put_input(processed_frame, self.image_format) # Get inference result prob, result = self.multidongle.get_latest_inference_result() if prob is not None: # Track inference FPS if self.inference_fps_start_time is None: self.inference_fps_start_time = time.time() self.processed_inference_count += 1 self.latest_probability = prob self.result_str = result # Display frame with results self._display_results(frame) if cv2.waitKey(1) & 0xFF == ord('q'): break finally: # self._print_statistics() cap.release() cv2.destroyAllWindows() def _display_results(self, frame): display_frame = frame.copy() text_color = (0, 255, 0) if "Fire" in self.result_str else (0, 0, 255) # Display inference result cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2) # Calculate and display inference FPS if self.inference_fps_start_time and self.processed_inference_count > 0: elapsed_time = time.time() - self.inference_fps_start_time if elapsed_time > 0: inference_fps = self.processed_inference_count / elapsed_time cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) cv2.imshow('Fire Detection', display_frame) # def _print_statistics(self): # """Print final statistics""" # print(f"\n--- Summary ---") # print(f"Total inferences processed: {self.processed_inference_count}") # if self.inference_fps_start_time and self.processed_inference_count > 0: # elapsed = time.time() - self.inference_fps_start_time # if elapsed > 0: # avg_inference_fps = self.processed_inference_count / elapsed # print(f"Average Inference FPS: {avg_inference_fps:.2f}") # if self.display_fps_start_time and self.display_frame_counter > 0: # elapsed = time.time() - self.display_fps_start_time # if elapsed > 0: # avg_display_fps = self.display_frame_counter / elapsed # print(f"Average Display FPS: {avg_display_fps:.2f}") if __name__ == "_main_": PORT_IDS = [28, 32] SCPU_FW = r'fw_scpu.bin' NCPU_FW = r'fw_ncpu.bin' MODEL_PATH = r'fire_detection_520.nef' try: # Initialize inference engine print("Initializing MultiDongle...") multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True) multidongle.initialize() multidongle.start() # Run using the new runner class print("Starting webcam inference...") runner = WebcamInferenceRunner(multidongle, 'BGR565') runner.run() except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() finally: if 'multidongle' in locals(): multidongle.stop()