# cluster4npu/test_pipeline.py
# 2025-05-29 15:27:12 +08:00
#
# 534 lines
# 25 KiB
# Python

import multiprocessing
import time
import os
import sys
import cv2
import numpy as np
import math
# --- Import Kneron Specific Libraries and Utilities ---
# Both the Kneron SDK (`kp`) and the example `utils` package must be importable.
# Adjust sys.path or the project layout if they are not.
#
# The two imports are guarded separately so that a missing `utils` package is
# reported as such instead of being misattributed to the SDK.
try:
    import kp
except ImportError as e:
    print(f"Error importing Kneron SDK (kp): {e}")
    print("Please ensure Kneron SDK is installed and in your Python path.")
    print("Cannot run Kneron pipeline without the SDK.")
    sys.exit("Kneron SDK not found.")
print("Kneron SDK (kp) imported successfully.")

try:
    from utils.ExampleHelper import get_device_usb_speed_by_port_id
    from utils.ExamplePostProcess import post_process_yolo_v5
except ImportError as e:
    print(f"Error importing Kneron utility modules (e.g., utils.ExampleHelper): {e}")
    print("Please ensure the 'utils' directory is in your Python path or copy the necessary functions.")
    # Exit (as the script always did on this path), but with an accurate reason.
    sys.exit("Kneron utility modules not found.")


def get_palette(mapping, seed=9487):
    """
    Build a reproducible list of random colours.

    Args:
        mapping (int): Number of colours to generate.
        seed (int): Seed for the pseudo-random generator.
    Returns:
        list: `mapping` entries, each a list of three integers drawn from 0..255.
    """
    print("Using get_palette from snippet.")
    # A private RandomState yields the same sequence as seeding the global
    # generator, without clobbering np.random state for the rest of the program.
    rng = np.random.RandomState(seed)
    return [list(rng.choice(range(256), size=3)) for _ in range(mapping)]


def convert_numpy_to_rgba_and_width_align_4(data):
    """
    Pack an (H, W, C) image into a 4-channel uint8 byte buffer whose width is
    rounded up to a multiple of 4.

    The input is copied into the first C channels of a zero-filled buffer, so
    padding columns and any channel the input does not provide stay 0.
    NOTE(review): channel order is copied through unchanged; confirm the order
    the device expects (cv2.imread yields BGR).

    Args:
        data (np.ndarray): Image array with exactly three dimensions.
    Returns:
        bytes: Row-major bytes of the (H, W_aligned, 4) buffer.
    """
    print("Using convert_numpy_to_rgba_and_width_align_4 from snippet.")
    height, width, channel = data.shape
    width_aligned = 4 * math.ceil(width / 4.0)
    aligned_data = np.zeros((height, width_aligned, 4), dtype=np.uint8)
    aligned_data[:height, :width, :channel] = data
    # tobytes() already serialises in C order; a separate flatten() would only
    # add another full-frame copy.
    return aligned_data.tobytes()


def retrieve_inference_node_output(generic_raw_result):
    """
    Collect the float output of every node of a raw inference result.

    Args:
        generic_raw_result: Raw result object; its `header.num_output_node`
            gives the number of nodes to retrieve.
    Returns:
        list: One retrieved node output per output node, in node-index order
            (CHW channel ordering is requested).
    """
    print('[Retrieve Inference Node Output ]')
    inf_node_output_list = [
        kp.inference.generic_inference_retrieve_float_node(
            node_idx=node_idx,
            generic_raw_result=generic_raw_result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
        for node_idx in range(generic_raw_result.header.num_output_node)
    ]
    print(' - Success')
    return inf_node_output_list


print("Kneron utility functions imported/defined from snippets.")
# --- Worker Functions ---
def yolo_worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue,
                firmware_path: str, model_path: str, port_id: int):
    """
    YOLOv5 processing layer worker.

    Connects to the Kneron device on `port_id`, loads the YOLO model, then for
    every image read from `input_queue` runs inference plus YOLO
    post-processing and forwards the *original* image to `output_queue` for the
    next stage. A `None` item is the termination signal.

    Exactly one `None` is always written to `output_queue` before the worker
    returns, including when initialization or the loop fails, so the
    downstream stage never blocks forever waiting for input.

    Frames whose inference fails or returns an empty result are logged and
    skipped (not forwarded).

    Args:
        input_queue: Source of image arrays; `None` terminates the worker.
        output_queue: Destination for processed images and the final `None`.
        firmware_path (str): Currently unused; firmware loading is disabled.
        model_path (str): Path of the model file to load.
        port_id (int): USB port id of the device to connect to.

    NOTE(review): the `kp` calls (`device_group.devices`,
    `kp.inference.generic_inference_send_image`) are kept as written; confirm
    them against the installed Kneron SDK version.
    """
    device_group = None
    sentinel_forwarded = False  # True once None has been put on output_queue
    print("YOLO Worker: Starting and initializing Kneron device using kp.core...")
    try:
        # --- Device and Model Initialization (per process) ---
        print(f"YOLO Worker: Connecting to device on port {port_id}")
        device_group = kp.core.connect_devices(usb_port_ids=[port_id])
        if not device_group or not device_group.devices:
            raise RuntimeError(f"YOLO Worker: Failed to connect to device on port {port_id}")
        # The inference call below needs a real device object; it used to
        # receive None because this assignment was commented out.
        device = device_group.devices[0]
        print(f"YOLO Worker: Device connected")
        # NOTE(review): firmware loading is disabled, so firmware_path is unused.
        print("YOLO Worker: Loading firmware")
        print("YOLO Worker: Loading YOLO model using kp.core")
        model_yolo_descriptor = kp.core.load_model_from_file(
            device_group=device_group,
            file_path=model_path
        )
        if not model_yolo_descriptor:
            raise RuntimeError(f"YOLO Worker: Failed to load YOLO model from {model_path}")
        print("YOLO Worker: Initialization complete. Waiting for data.")
        # Add any device feature configuration required by your setup here.

        while True:
            image_data = input_queue.get()
            if image_data is None:
                print("YOLO Worker: Received termination signal. Propagating None to STDC queue.")
                output_queue.put(None)
                sentinel_forwarded = True
                break

            # --- Perform YOLO Inference ---
            img_height, img_width, _ = image_data.shape
            inference_input_size = (img_width, img_height)  # (width, height) order
            aligned_image_data = convert_numpy_to_rgba_and_width_align_4(image_data)
            try:
                generic_raw_result = kp.inference.generic_inference_send_image(
                    device=device,
                    data=aligned_image_data,
                    size=inference_input_size
                )
                if not generic_raw_result:
                    print("YOLO Worker: Warning - generic_inference_send_image returned None.")
                    continue  # Skip this frame
                inf_node_output_list = retrieve_inference_node_output(generic_raw_result)
                # The detections are not consumed downstream; the call is kept so
                # post-processing cost stays part of the measured pipeline.
                post_process_yolo_v5(
                    inference_float_node_list=inf_node_output_list,
                    hardware_preproc_info=generic_raw_result.header.hw_pre_proc_info_list[0],
                    thresh_value=0.2  # Example threshold, adjust as needed
                )
                # STDC segments the whole image, so forward the original frame.
                output_queue.put(image_data)
            except Exception as inference_e:
                # Best effort: log, skip this frame, and keep processing.
                print(f"YOLO Worker Inference Error: {inference_e}")
        print("YOLO Worker: Exiting loop.")
    except Exception as e:
        print(f"YOLO Worker Initialization or Runtime Error: {e}")
    finally:
        # Unblock the downstream stage first, then release the device.
        if not sentinel_forwarded:
            print("YOLO Worker: Terminating early. Propagating None to STDC queue.")
            output_queue.put(None)
        if device_group:
            print("YOLO Worker: Disconnecting device group.")
            kp.core.disconnect_devices(device_group=device_group)
        print("YOLO Worker: Exiting.")
def stdc_worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue,
                firmware_path: str, model_path: str, port_id: int):
    """
    STDC processing layer worker.

    Connects to the Kneron device on `port_id`, loads the STDC model, then for
    every image read from `input_queue` runs inference and reports one marker
    per frame on `output_queue`: "STDC_Frame_Done" on success,
    "STDC_Frame_Error" on any per-frame failure (exception, empty inference
    result, or no output nodes). A `None` item is the termination signal.

    Exactly one `None` is always written to `output_queue` before the worker
    returns, including when initialization or the loop fails, so the consumer
    is not left waiting for its timeout.

    Args:
        input_queue: Source of image arrays; `None` terminates the worker.
        output_queue: Destination for per-frame markers and the final `None`.
        firmware_path (str): Currently unused; firmware loading is disabled.
        model_path (str): Path of the model file to load.
        port_id (int): USB port id of the device to connect to.

    NOTE(review): the `kp` calls (`device_group.devices`,
    `kp.inference.generic_inference_send_image`) are kept as written; confirm
    them against the installed Kneron SDK version.
    """
    device_group = None
    sentinel_sent = False  # True once None has been put on output_queue
    print("STDC Worker: Starting and initializing Kneron device using kp.core...")
    try:
        # --- Device and Model Initialization (per process) ---
        print(f"STDC Worker: Connecting to device on port {port_id}")
        device_group = kp.core.connect_devices(usb_port_ids=[port_id])
        if not device_group or not device_group.devices:
            raise RuntimeError(f"STDC Worker: Failed to connect to device on port {port_id}")
        # The inference call below needs a real device object; it used to
        # receive None because this assignment was commented out.
        device = device_group.devices[0]
        print(f"STDC Worker: Device connected")
        # NOTE(review): firmware loading is disabled, so firmware_path is unused.
        print("STDC Worker: Loading STDC model using kp.core")
        model_stdc_descriptor = kp.core.load_model_from_file(
            device_group=device_group,
            file_path=model_path
        )
        if not model_stdc_descriptor:
            raise RuntimeError(f"STDC Worker: Failed to load STDC model from {model_path}")
        print("STDC Worker: Initialization complete. Waiting for data.")
        # Add any device feature configuration required by your setup here.

        while True:
            image_data = input_queue.get()
            if image_data is None:
                print("STDC Worker: Received termination signal. Putting None to final output queue and exiting.")
                output_queue.put(None)
                sentinel_sent = True
                break

            # --- Perform STDC Inference ---
            img_height, img_width, _ = image_data.shape
            inference_input_size = (img_width, img_height)  # (width, height) order
            aligned_image_data = convert_numpy_to_rgba_and_width_align_4(image_data)
            try:
                generic_raw_result = kp.inference.generic_inference_send_image(
                    device=device,
                    data=aligned_image_data,
                    size=inference_input_size
                )
                if not generic_raw_result:
                    print("STDC Worker: Warning - generic_inference_send_image returned None.")
                    # Report the frame like every other failure so the
                    # consumer's frame count still adds up.
                    output_queue.put("STDC_Frame_Error")
                    continue
                inf_node_output_list = retrieve_inference_node_output(generic_raw_result)
                if inf_node_output_list:
                    # The first node's output is read but not forwarded; for the
                    # FPS measurement a per-frame marker is enough.
                    # To return a mask instead, e.g.:
                    #   output_queue.put(np.argmax(pred_raw, axis=0).astype(np.uint8))
                    # (assumes pred_raw is (C, H, W) after squeeze - TODO confirm)
                    pred_raw = inf_node_output_list[0].ndarray.squeeze()
                    output_queue.put("STDC_Frame_Done")
                else:
                    print("STDC Worker: Warning - No output nodes retrieved.")
                    output_queue.put("STDC_Frame_Error")
            except Exception as inference_e:
                print(f"STDC Worker Inference Error: {inference_e}")
                output_queue.put("STDC_Frame_Error")
        print("STDC Worker: Exiting loop.")
    except Exception as e:
        print(f"STDC Worker Initialization or Runtime Error: {e}")
    finally:
        # Unblock the consumer first, then release the device.
        if not sentinel_sent:
            print("STDC Worker: Terminating early. Putting None to final output queue.")
            output_queue.put(None)
        if device_group:
            print("STDC Worker: Disconnecting device group.")
            kp.core.disconnect_devices(device_group=device_group)
        print("STDC Worker: Exiting.")
# --- API Function to Run the Pipeline ---
def run_yolo_stdc_pipeline(image_file_path: str, firmware_path: str,
                           yolo_model_path: str, stdc_model_path: str,
                           loop_count: int = 100, port_id: int = 0):
    """
    Runs the YOLOv5 + STDC pipeline using multiprocessing.Queue.

    Two worker processes (`yolo_worker` -> `stdc_worker`) are started, the same
    image is queued `loop_count` times followed by a `None` termination signal,
    and per-frame markers are collected from the last queue to compute FPS.
    Timing starts after a fixed initialization wait and stops when collection
    ends. Frames reported as "STDC_Frame_Error" are counted as processed.

    NOTE(review): both workers are given the same `port_id`; confirm that the
    SDK allows two processes to connect to the same device.

    Args:
        image_file_path (str): Path to the input image file (e.g., .bmp).
        firmware_path (str): Path to the Kneron firmware file (.bin).
        yolo_model_path (str): Path to the YOLOv5 model file (.nef).
        stdc_model_path (str): Path to the STDC model file (.nef).
        loop_count (int): Number of times to process the image through the pipeline.
        port_id (int): Kneron device port ID to connect to.
    Returns:
        float: Calculated FPS for the collected frames; 0.0 if the image cannot
            be read or no frame was processed.
    """
    # Queue.get(timeout=...) raises queue.Empty; import it directly instead of
    # reaching through the lazily-loaded multiprocessing.queues submodule.
    from queue import Empty

    initialization_wait_time = 10  # seconds; heuristic, workers do not signal readiness
    result_timeout_s = 60          # max wait for each per-frame marker
    final_signal_timeout_s = 10    # max wait for the closing None

    # Read the input image ONCE
    print(f"Main: Reading input image from {image_file_path}")
    image_data = cv2.imread(image_file_path)
    if image_data is None:
        print(f"Error: Could not read image from {image_file_path}")
        return 0.0
    print(f"Main: Image read successfully. Shape: {image_data.shape}")

    # main -> YOLO worker -> STDC worker -> main
    yolo_input_q = multiprocessing.Queue()
    stdc_input_q = multiprocessing.Queue()
    stdc_output_q = multiprocessing.Queue()

    yolo_process = multiprocessing.Process(
        target=yolo_worker,
        args=(yolo_input_q, stdc_input_q, firmware_path, yolo_model_path, port_id)
    )
    stdc_process = multiprocessing.Process(
        target=stdc_worker,
        args=(stdc_input_q, stdc_output_q, firmware_path, stdc_model_path, port_id)
    )

    print("Main: Starting YOLO and STDC worker processes...")
    yolo_process.start()
    stdc_process.start()
    print("Main: Worker processes started.")

    print(f"Main: Waiting {initialization_wait_time}s for workers to initialize devices and models.")
    time.sleep(initialization_wait_time)
    print("Main: Finished initialization waiting period.")

    print(f"Main: Putting the same image into YOLO input queue {loop_count} times...")
    start_time = time.perf_counter()
    for _ in range(loop_count):
        yolo_input_q.put(image_data)
    print(f"Main: Finished queuing {loop_count} images. Sending termination signal to YOLO worker.")
    yolo_input_q.put(None)

    # Collect one marker per frame from the final output queue.
    print("Main: Collecting results from STDC output queue...")
    processed_frame_count = 0
    sentinel_seen = False  # True once the closing None has been consumed
    while processed_frame_count < loop_count:
        try:
            result = stdc_output_q.get(timeout=result_timeout_s)
        except Empty:
            print(f"Main: Timeout ({result_timeout_s}s) while waiting for results from STDC output queue. {processed_frame_count}/{loop_count} frames processed.")
            break
        except Exception as e:
            print(f"Main: Error collecting result: {e}")
            break
        if result is None:
            # A worker stopped early or frames were skipped upstream.
            print("Main: Warning - Received None from STDC output queue before collecting all frames.")
            sentinel_seen = True
            break
        if result == "STDC_Frame_Done":
            processed_frame_count += 1
        elif result == "STDC_Frame_Error":
            processed_frame_count += 1  # Count it as a processed frame, albeit with error
            print(f"Main: Collected error marker for a frame ({processed_frame_count}).")
        else:
            print(f"Main: Warning - Received unexpected item in STDC output queue: {result}")
    end_time = time.perf_counter()
    print(f"Main: Collected {processed_frame_count} results/markers.")

    # Only wait for the closing None if it has not been consumed already;
    # otherwise this would just burn the whole timeout.
    if not sentinel_seen:
        print("Main: Waiting for final termination signal from STDC output queue...")
        try:
            final_signal = stdc_output_q.get(timeout=final_signal_timeout_s)
            if final_signal is None:
                print("Main: Received final termination signal from STDC output queue.")
            else:
                print(f"Main: Warning - Expected final None, but received: {final_signal}")
        except Empty:
            print("Main: Timeout while waiting for final None from STDC output queue.")
        except Exception as e:
            print(f"Main: Error getting final signal: {e}")

    print("Main: Joining worker processes...")
    _join_or_terminate_worker(yolo_process, "YOLO")
    _join_or_terminate_worker(stdc_process, "STDC")
    print("Main: All processes joined.")
    # Both workers are gone, so nothing will read yolo_input_q any more. Do not
    # let its feeder thread block interpreter exit on undelivered frames.
    yolo_input_q.cancel_join_thread()

    # Calculate FPS
    duration = end_time - start_time
    if duration > 0 and processed_frame_count > 0:
        fps = processed_frame_count / duration
        print(f"\n--- Pipeline Performance ---")
        print(f"Processed {processed_frame_count} frames in {duration:.4f} seconds.")
        print(f"Calculated FPS: {fps:.2f}")
    else:
        fps = 0.0
        print("Could not calculate FPS (duration is zero or no frames processed).")
    return fps


def _join_or_terminate_worker(process, label, timeout=30):
    """Join `process`; if it is still alive after `timeout` seconds, terminate and reap it."""
    process.join(timeout=timeout)
    if process.is_alive():
        print(f"Main: {label} process did not terminate gracefully within timeout. Terminating.")
        process.terminate()
        # Bounded join so the terminated child is reaped without risking a hang.
        process.join(timeout=5)
    print(f"Main: {label} process joined.")
# --- Example Usage ---
if __name__ == '__main__':
    # Supports running from a frozen Windows executable; harmless elsewhere.
    multiprocessing.freeze_support()

    # --- CONFIGURE YOUR FILE PATHS HERE ---
    # Replace every placeholder below with a real location before running.
    ACTUAL_FIRMWARE_PATH = "path/to/your/KL720.bin"  # e.g., "C:/Kneron_SDK/firmware/KL720/KL720.bin"
    ACTUAL_YOLO_MODEL_PATH = "path/to/your/yolov5_model.nef"  # e.g., "C:/Kneron_SDK/models/KL720/yolov5/yolov5.nef"
    ACTUAL_STDC_MODEL_PATH = "path/to/your/stdc_model.nef"  # e.g., "C:/Kneron_SDK/models/KL720/stdc/stdc.nef"
    ACTUAL_IMAGE_FILE_PATH = "path/to/your/input_image.bmp"  # e.g., "C:/Kneron_SDK/images/people_talk_in_street_1500x1500.bmp"

    # The run is allowed only when none of the four paths still contains the
    # placeholder prefix.
    paths_configured = all(
        "path/to/your/" not in candidate
        for candidate in (
            ACTUAL_FIRMWARE_PATH,
            ACTUAL_YOLO_MODEL_PATH,
            ACTUAL_STDC_MODEL_PATH,
            ACTUAL_IMAGE_FILE_PATH,
        )
    )

    if paths_configured:
        print("\n--- Running YOLOv5 + STDC Pipeline ---")
        try:
            final_fps = run_yolo_stdc_pipeline(
                image_file_path=ACTUAL_IMAGE_FILE_PATH,
                firmware_path=ACTUAL_FIRMWARE_PATH,
                yolo_model_path=ACTUAL_YOLO_MODEL_PATH,
                stdc_model_path=ACTUAL_STDC_MODEL_PATH,
                loop_count=100,
                port_id=0,  # Change if your device is on a different port
            )
            print(f"\nAPI Function Call Complete. Final FPS: {final_fps:.2f}")
        except Exception as main_e:
            print(f"\nAn error occurred during the main pipeline execution: {main_e}")
    else:
        print("\n===================================================================")
        print("!!! WARNING: Please update the file paths in the script before running. !!!")
        print("===================================================================")