"""Benchmark a two-stage Kneron pipeline (YOLOv5 detection -> STDC segmentation).

Each stage runs in its own process, owns its own Kneron device connection and
model, and the stages are chained with ``multiprocessing.Queue`` objects.
``None`` is the end-of-stream sentinel on every data queue; per-frame failures
travel through the pipeline as string markers so the main process can always
account for every frame it submitted.
"""
import math
import multiprocessing
import os
import queue
import sys
import time
from typing import Optional

import cv2
import numpy as np

# --- Import Kneron Specific Libraries and Utilities ---
# Assuming your Kneron SDK and example files are set up such that these imports work.
# You might need to adjust sys.path or your project structure.
try:
    import kp
except ImportError as e:
    print(f"Error importing Kneron SDK (kp): {e}")
    print("Please ensure Kneron SDK is installed and in your Python path.")
    print("Cannot run Kneron pipeline without the SDK.")
    sys.exit("Kneron SDK not found.")
print("Kneron SDK (kp) imported successfully.")

# Imported separately from `kp` so that a missing 'utils' package is reported as
# such, instead of being caught by the SDK handler and misreported as a missing SDK.
try:
    from utils.ExampleHelper import get_device_usb_speed_by_port_id  # noqa: F401  (optional diagnostics)
    from utils.ExamplePostProcess import post_process_yolo_v5
except ImportError as e:
    print(f"Error importing Kneron utility modules (e.g., utils.ExampleHelper): {e}")
    print("Please ensure the 'utils' directory is in your Python path or copy the necessary functions.")
    raise  # Re-raise the error to indicate missing dependencies


# --- Pipeline protocol constants ---
STDC_FRAME_DONE = "STDC_Frame_Done"    # STDC -> main: frame fully processed
STDC_FRAME_ERROR = "STDC_Frame_Error"  # STDC -> main: frame failed in either stage
YOLO_FRAME_ERROR = "YOLO_Frame_Error"  # YOLO -> STDC: frame failed in the YOLO stage

YOLO_THRESHOLD = 0.2   # Example detection threshold, adjust as needed
KP_TIMEOUT_MS = 5000   # Per-call timeout applied to each device group


# --- Utility functions (adapted from the Kneron example snippets) ---

def get_palette(mapping, seed=9487):
    """Return ``mapping`` pseudo-random colours, each a list of three ints in [0, 255].

    A private ``RandomState`` is used so NumPy's global RNG is not reseeded; the
    legacy ``RandomState`` stream yields the same colours as seeding the global RNG.
    """
    rng = np.random.RandomState(seed)
    return [list(rng.choice(range(256), size=3)) for _ in range(mapping)]


def convert_numpy_to_rgba_and_width_align_4(data):
    """Pack an (H, W, C) uint8 image into a zero-padded 4-channel buffer and return its bytes.

    The width is rounded up to a multiple of 4. The C input channels are copied
    unchanged into the first C slots (an OpenCV image therefore stays in BGR order)
    and all remaining slots stay 0. The pipeline below does not call this; it is
    kept for callers that feed raw RGBA8888 buffers.
    """
    height, width, channel = data.shape
    width_aligned = 4 * math.ceil(width / 4.0)
    aligned_data = np.zeros((height, width_aligned, 4), dtype=np.uint8)
    aligned_data[:height, :width, :channel] = data
    return aligned_data.flatten().tobytes()


def retrieve_inference_node_output(generic_raw_result, verbose=False):
    """Return every output node of ``generic_raw_result`` as a float node, in CHW ordering.

    Args:
        generic_raw_result: Raw result received from ``kp.inference``.
        verbose: Print progress messages. Off by default because this runs once
            per frame and console output would distort the FPS measurement.
    """
    if verbose:
        print('[Retrieve Inference Node Output ]')
    inf_node_output_list = [
        kp.inference.generic_inference_retrieve_float_node(
            node_idx=node_idx,
            generic_raw_result=generic_raw_result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW,
        )
        for node_idx in range(generic_raw_result.header.num_output_node)
    ]
    if verbose:
        print(' - Success')
    return inf_node_output_list


print("Kneron utility functions imported/defined from snippets.")


# --- Device / inference helpers (run inside the worker processes) ---

def _connect_and_load_model(tag, port_id, model_path):
    """Connect to the device on ``port_id`` and load the NEF model at ``model_path``.

    Returns ``(device_group, model_descriptor)``. If anything fails after the
    connection was opened, the device group is disconnected before re-raising,
    so the caller only owns the connection when this function returns normally.
    """
    print(f"{tag} Worker: Connecting to device on port {port_id}")
    device_group = kp.core.connect_devices(usb_port_ids=[port_id])
    if device_group is None:
        raise RuntimeError(f"{tag} Worker: Failed to connect to device on port {port_id}")
    try:
        kp.core.set_timeout(device_group=device_group, milliseconds=KP_TIMEOUT_MS)
        print(f"{tag} Worker: Device connected")
        # NOTE(review): firmware is not uploaded here; this assumes the device already
        # runs suitable firmware (e.g. from flash) -- confirm for your hardware.
        print(f"{tag} Worker: Loading {tag} model using kp.core")
        model_descriptor = kp.core.load_model_from_file(device_group=device_group, file_path=model_path)
        if not model_descriptor:
            raise RuntimeError(f"{tag} Worker: Failed to load {tag} model from {model_path}")
    except Exception:
        kp.core.disconnect_devices(device_group=device_group)
        raise
    return device_group, model_descriptor


def _run_generic_inference(device_group, model_descriptor, image_bgr):
    """Run one synchronous inference of the first model in ``model_descriptor`` on a BGR image.

    Follows the Kneron PLUS generic image inference flow (send, then receive on
    the same device group) and returns the raw result.
    """
    image_bgr565 = cv2.cvtColor(src=image_bgr, code=cv2.COLOR_BGR2BGR565)
    descriptor = kp.GenericImageInferenceDescriptor(
        model_id=model_descriptor.models[0].id,
        inference_number=0,
        input_node_image_list=[
            kp.GenericInputNodeImage(
                image=image_bgr565,
                image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                # NOTE(review): must match how each model was compiled -- confirm per model.
                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON,
            )
        ],
    )
    kp.inference.generic_image_inference_send(
        device_group=device_group, generic_inference_input_descriptor=descriptor)
    generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
    if generic_raw_result is None:
        raise RuntimeError("generic_image_inference_receive returned no result")
    return generic_raw_result


def _yolo_process_frame(device_group, model_descriptor, image_data):
    """Run YOLOv5 inference and post-processing, then return the original image for STDC."""
    generic_raw_result = _run_generic_inference(device_group, model_descriptor, image_data)
    inf_node_output_list = retrieve_inference_node_output(generic_raw_result)
    # Detections are discarded; post-processing runs so the FPS figure includes its cost.
    # Arguments are positional to avoid depending on the helper's parameter names.
    post_process_yolo_v5(
        inf_node_output_list,
        generic_raw_result.header.hw_pre_proc_info_list[0],
        YOLO_THRESHOLD,
    )
    # STDC segments the whole frame, so the original image is what gets forwarded.
    return image_data


def _stdc_process_frame(device_group, model_descriptor, image_data):
    """Run STDC inference and return a per-frame completion (or error) marker."""
    generic_raw_result = _run_generic_inference(device_group, model_descriptor, image_data)
    inf_node_output_list = retrieve_inference_node_output(generic_raw_result)
    if not inf_node_output_list:
        print("STDC Worker: Warning - No output nodes retrieved.")
        return STDC_FRAME_ERROR
    # To produce a mask instead of a marker, something like
    # np.argmax(inf_node_output_list[0].ndarray.squeeze(), axis=0) could be used,
    # assuming the first node is (C, H, W) class scores -- TODO confirm for your model.
    return STDC_FRAME_DONE


def _signal_ready(ready_queue, tag, ok):
    """Report the initialisation outcome ``(tag, ok)`` to the main process, if it is listening."""
    if ready_queue is not None:
        ready_queue.put((tag, ok))


def _worker_loop(tag, input_queue, output_queue, model_path, port_id, ready_queue,
                 process_frame, error_marker):
    """Shared worker body: initialise the device, process frames until ``None``, clean up.

    Guarantees, whatever happens:
      * exactly one ``(tag, ok)`` readiness message on ``ready_queue`` (when given);
      * exactly one output item per input frame (a result or ``error_marker``);
      * exactly one trailing ``None`` on ``output_queue`` so downstream stages stop;
      * the device group is disconnected if it was opened.
    """
    device_group = None
    ready_sent = False
    print(f"{tag} Worker: Starting and initializing Kneron device using kp.core...")
    try:
        device_group, model_descriptor = _connect_and_load_model(tag, port_id, model_path)
        _signal_ready(ready_queue, tag, True)
        ready_sent = True
        print(f"{tag} Worker: Initialization complete. Waiting for data.")

        while True:
            data_item = input_queue.get()
            if data_item is None:
                print(f"{tag} Worker: Received termination signal. Propagating None downstream.")
                break
            if not isinstance(data_item, np.ndarray):
                # Upstream already failed on this frame; keep per-frame accounting intact.
                output_queue.put(error_marker)
                continue
            try:
                result = process_frame(device_group, model_descriptor, data_item)
            except Exception as inference_e:
                print(f"{tag} Worker Inference Error: {inference_e}")
                result = error_marker
            output_queue.put(result)
        print(f"{tag} Worker: Exiting loop.")
    except Exception as e:
        print(f"{tag} Worker Initialization or Runtime Error: {e}")
    finally:
        if not ready_sent:
            _signal_ready(ready_queue, tag, False)
        output_queue.put(None)  # always let the next stage terminate
        if device_group is not None:
            print(f"{tag} Worker: Disconnecting device group.")
            kp.core.disconnect_devices(device_group=device_group)
        print(f"{tag} Worker: Exiting.")


# --- Worker Functions ---

def yolo_worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue,
                firmware_path: str, model_path: str, port_id: int,
                ready_queue: Optional[multiprocessing.Queue] = None):
    """YOLOv5 stage: run detection on each incoming image and forward the image to STDC.

    Args:
        input_queue: Images (``np.ndarray``) from the main process; ``None`` ends the stream.
        output_queue: Receives the original image for each successful frame,
            ``YOLO_FRAME_ERROR`` for each failed frame, then exactly one ``None``.
        firmware_path: Unused (firmware is not uploaded); kept for interface compatibility.
        model_path: Path to the YOLOv5 ``.nef`` model.
        port_id: USB port of the Kneron device used by this stage.
        ready_queue: Optional; receives ``("YOLO", True)`` after a successful
            initialisation or ``("YOLO", False)`` if initialisation failed.
    """
    _worker_loop("YOLO", input_queue, output_queue, model_path, port_id, ready_queue,
                 process_frame=_yolo_process_frame, error_marker=YOLO_FRAME_ERROR)


def stdc_worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue,
                firmware_path: str, model_path: str, port_id: int,
                ready_queue: Optional[multiprocessing.Queue] = None):
    """STDC stage: run segmentation on each incoming image and report a per-frame marker.

    Args:
        input_queue: Images or ``YOLO_FRAME_ERROR`` markers from the YOLO stage;
            ``None`` ends the stream.
        output_queue: Receives ``STDC_FRAME_DONE`` or ``STDC_FRAME_ERROR`` per
            frame, then exactly one ``None``.
        firmware_path: Unused (firmware is not uploaded); kept for interface compatibility.
        model_path: Path to the STDC ``.nef`` model.
        port_id: USB port of the Kneron device used by this stage.
        ready_queue: Optional; receives ``("STDC", True)`` or ``("STDC", False)``.
    """
    _worker_loop("STDC", input_queue, output_queue, model_path, port_id, ready_queue,
                 process_frame=_stdc_process_frame, error_marker=STDC_FRAME_ERROR)


# --- Main-process helpers ---

def _wait_for_workers_ready(ready_queue, expected_tags, timeout):
    """Wait until every worker in ``expected_tags`` reports a successful initialisation.

    Returns False as soon as any worker reports failure or ``timeout`` seconds elapse.
    """
    deadline = time.monotonic() + timeout
    pending = set(expected_tags)
    while pending:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            print(f"Main: Timeout ({timeout}s) waiting for workers to initialize: {sorted(pending)}")
            return False
        try:
            tag, ok = ready_queue.get(timeout=remaining)
        except queue.Empty:
            print(f"Main: Timeout ({timeout}s) waiting for workers to initialize: {sorted(pending)}")
            return False
        if not ok:
            print(f"Main: {tag} worker failed to initialize.")
            return False
        pending.discard(tag)
    return True


def _collect_results(output_queue, expected, timeout):
    """Collect ``expected`` per-frame markers from ``output_queue``.

    Returns ``(frames_ok, frames_failed, complete)``; ``complete`` is True only
    when all expected markers arrived (so the trailing ``None`` is still pending).
    """
    frames_ok = frames_failed = 0
    while frames_ok + frames_failed < expected:
        try:
            result = output_queue.get(timeout=timeout)
        except queue.Empty:
            print(f"Main: Timeout ({timeout}s) while waiting for results from STDC output queue. "
                  f"{frames_ok + frames_failed}/{expected} frames processed.")
            return frames_ok, frames_failed, False
        except Exception as e:
            print(f"Main: Error collecting result: {e}")
            return frames_ok, frames_failed, False

        if result is None:
            print("Main: Warning - Received None from STDC output queue before collecting all frames.")
            return frames_ok, frames_failed, False

        # Guard with isinstance so an array payload never hits an elementwise ``==``.
        marker = result if isinstance(result, str) else None
        if marker == STDC_FRAME_DONE:
            frames_ok += 1
        elif marker == STDC_FRAME_ERROR:
            frames_failed += 1
            print(f"Main: Collected error marker for a frame ({frames_ok + frames_failed}).")
        else:
            print(f"Main: Warning - Received unexpected item in STDC output queue: {result!r}")
    return frames_ok, frames_failed, True


def _wait_for_final_signal(output_queue, timeout=10.0):
    """Consume the trailing ``None`` the STDC worker sends after its last marker."""
    print("Main: Waiting for final termination signal from STDC output queue...")
    try:
        final_signal = output_queue.get(timeout=timeout)
    except queue.Empty:
        print("Main: Timeout while waiting for final None from STDC output queue.")
    except Exception as e:
        print(f"Main: Error getting final signal: {e}")
    else:
        if final_signal is None:
            print("Main: Received final termination signal from STDC output queue.")
        else:
            print(f"Main: Warning - Expected final None, but received: {final_signal!r}")


def _join_or_terminate(process, name, timeout=30.0):
    """Join ``process``; terminate and reap it if it does not exit within ``timeout``."""
    process.join(timeout=timeout)
    if process.is_alive():
        print(f"Main: {name} process did not terminate gracefully within timeout. Terminating.")
        process.terminate()
        process.join(timeout=5.0)
    print(f"Main: {name} process joined.")


def _release_queues(*queues):
    """Close queues without letting unread buffered data block interpreter exit.

    Once the workers are gone nothing will read these queues again. Without
    ``cancel_join_thread()``, the main process would block at exit while its feeder
    thread tries to flush leftover items (e.g. images queued for a worker that died)
    into a full pipe.
    """
    for q in queues:
        q.cancel_join_thread()
        q.close()


def _report_fps(frames_ok, frames_failed, duration):
    """Print and return throughput over successfully processed frames (0.0 if none)."""
    if duration > 0 and frames_ok > 0:
        fps = frames_ok / duration
        print("\n--- Pipeline Performance ---")
        print(f"Processed {frames_ok} frames in {duration:.4f} seconds "
              f"({frames_failed} failed frames excluded).")
        print(f"Calculated FPS: {fps:.2f}")
        return fps
    print("Could not calculate FPS (duration is zero or no frames processed).")
    return 0.0


# --- API Function to Run the Pipeline ---

def run_yolo_stdc_pipeline(image_file_path: str, firmware_path: str, yolo_model_path: str,
                           stdc_model_path: str, loop_count: int = 100, port_id: int = 0,
                           stdc_port_id: Optional[int] = None, init_timeout: float = 60.0,
                           result_timeout: float = 60.0):
    """Run the YOLOv5 -> STDC pipeline on the same image ``loop_count`` times and measure FPS.

    Devices and models are initialised inside the worker processes. Timing starts
    only after both workers report they are ready; it does not use a fixed sleep.

    Args:
        image_file_path (str): Path to the input image file (e.g., .bmp).
        firmware_path (str): Path to the Kneron firmware file (.bin). Currently
            unused by the workers (firmware is not uploaded).
        yolo_model_path (str): Path to the YOLOv5 model file (.nef).
        stdc_model_path (str): Path to the STDC model file (.nef).
        loop_count (int): Number of times to process the image through the pipeline.
        port_id (int): Kneron device port ID used by the YOLO stage.
        stdc_port_id (int | None): Port ID for the STDC stage. None reuses
            ``port_id``, the original behaviour, which makes two processes open the
            same device.
        init_timeout (float): Max seconds to wait for both workers to initialise.
        result_timeout (float): Max seconds to wait for each per-frame result.

    Returns:
        float: Successfully processed frames per second, or 0.0 if nothing succeeded.
    """
    # Read the input image ONCE
    print(f"Main: Reading input image from {image_file_path}")
    image_data = cv2.imread(image_file_path)
    if image_data is None:
        print(f"Error: Could not read image from {image_file_path}")
        return 0.0
    print(f"Main: Image read successfully. Shape: {image_data.shape}")

    if stdc_port_id is None:
        stdc_port_id = port_id
    if stdc_port_id == port_id:
        # NOTE(review): each worker opens this USB device from its own process and loads
        # its own model; the Kneron driver is unlikely to allow that -- TODO confirm.
        print(f"Main: Warning - YOLO and STDC workers both use port {port_id}; "
              "a separate device per stage (stdc_port_id) is recommended.")

    yolo_input_q = multiprocessing.Queue()   # Main -> YOLO: images
    stdc_input_q = multiprocessing.Queue()   # YOLO -> STDC: images / YOLO error markers
    stdc_output_q = multiprocessing.Queue()  # STDC -> Main: per-frame markers
    ready_q = multiprocessing.Queue()        # Workers -> Main: (tag, init_ok)

    yolo_process = multiprocessing.Process(
        target=yolo_worker,
        args=(yolo_input_q, stdc_input_q, firmware_path, yolo_model_path, port_id, ready_q),
    )
    stdc_process = multiprocessing.Process(
        target=stdc_worker,
        args=(stdc_input_q, stdc_output_q, firmware_path, stdc_model_path, stdc_port_id, ready_q),
    )

    print("Main: Starting YOLO and STDC worker processes...")
    yolo_process.start()
    stdc_process.start()
    print("Main: Worker processes started.")

    frames_ok = frames_failed = 0
    duration = 0.0
    try:
        print(f"Main: Waiting up to {init_timeout}s for workers to initialize devices and models.")
        if not _wait_for_workers_ready(ready_q, ("YOLO", "STDC"), init_timeout):
            print("Main: Worker initialization failed; aborting pipeline.")
            yolo_input_q.put(None)  # lets a still-running YOLO worker (and hence STDC) shut down
            return 0.0
        print("Main: Workers initialized.")

        print(f"Main: Putting the same image into YOLO input queue {loop_count} times...")
        start_time = time.perf_counter()
        for _ in range(loop_count):
            yolo_input_q.put(image_data)
        print(f"Main: Finished queuing {loop_count} images. Sending termination signal to YOLO worker.")
        yolo_input_q.put(None)

        print("Main: Collecting results from STDC output queue...")
        frames_ok, frames_failed, complete = _collect_results(stdc_output_q, loop_count, result_timeout)
        duration = time.perf_counter() - start_time
        print(f"Main: Collected {frames_ok + frames_failed} results/markers ({frames_failed} failed).")

        # Only a complete run still has the trailing None pending; otherwise it was
        # already consumed or the workers are stuck and waiting longer is pointless.
        if complete:
            _wait_for_final_signal(stdc_output_q)
    finally:
        print("Main: Joining worker processes...")
        _join_or_terminate(yolo_process, "YOLO")
        _join_or_terminate(stdc_process, "STDC")
        print("Main: All processes joined.")
        _release_queues(yolo_input_q, stdc_input_q, stdc_output_q, ready_q)

    return _report_fps(frames_ok, frames_failed, duration)


# --- Example Usage ---
if __name__ == '__main__':
    # Required for multiprocessing on Windows
    multiprocessing.freeze_support()

    # --- CONFIGURE YOUR FILE PATHS HERE ---
    # !! IMPORTANT !! Replace these placeholder paths with your actual file locations.
    ACTUAL_FIRMWARE_PATH = "path/to/your/KL720.bin"  # e.g., "C:/Kneron_SDK/firmware/KL720/KL720.bin"
    ACTUAL_YOLO_MODEL_PATH = "path/to/your/yolov5_model.nef"  # e.g., "C:/Kneron_SDK/models/KL720/yolov5/yolov5.nef"
    ACTUAL_STDC_MODEL_PATH = "path/to/your/stdc_model.nef"  # e.g., "C:/Kneron_SDK/models/KL720/stdc/stdc.nef"
    ACTUAL_IMAGE_FILE_PATH = "path/to/your/input_image.bmp"  # e.g., "C:/Kneron_SDK/images/people_talk_in_street_1500x1500.bmp"
    YOLO_PORT_ID = 0      # Change if your device is on a different port
    STDC_PORT_ID = None   # Port of a second device for STDC; None reuses YOLO_PORT_ID

    # Check if the placeholder paths are still being used
    placeholder = "path/to/your/"
    paths_configured = not any(
        placeholder in path
        for path in (ACTUAL_FIRMWARE_PATH, ACTUAL_YOLO_MODEL_PATH,
                     ACTUAL_STDC_MODEL_PATH, ACTUAL_IMAGE_FILE_PATH)
    )

    if not paths_configured:
        print("\n===================================================================")
        print("!!! WARNING: Please update the file paths in the script before running. !!!")
        print("===================================================================")
    else:
        print("\n--- Running YOLOv5 + STDC Pipeline ---")
        try:
            final_fps = run_yolo_stdc_pipeline(
                image_file_path=ACTUAL_IMAGE_FILE_PATH,
                firmware_path=ACTUAL_FIRMWARE_PATH,
                yolo_model_path=ACTUAL_YOLO_MODEL_PATH,
                stdc_model_path=ACTUAL_STDC_MODEL_PATH,
                loop_count=100,
                port_id=YOLO_PORT_ID,
                stdc_port_id=STDC_PORT_ID,
            )
            print(f"\nAPI Function Call Complete. Final FPS: {final_fps:.2f}")
        except Exception as main_e:
            print(f"\nAn error occurred during the main pipeline execution: {main_e}")