test multidongle

This commit is contained in:
Masonmason 2025-05-29 15:27:12 +08:00
parent 2ed3d2cb49
commit 58f0dd75ac
10 changed files with 1868 additions and 1 deletions

View File

@ -0,0 +1,216 @@
# ******************************************************************************
# Copyright (c) 2021-2022. Kneron Inc. All rights reserved. *
# ******************************************************************************
from typing import Union
import os
import sys
import argparse
import time
import threading
import queue
import numpy as np
from utils.ExampleHelper import get_device_usb_speed_by_port_id
import kp
import cv2
PWD = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(1, os.path.join(PWD, '..'))
SCPU_FW_PATH = os.path.join(PWD, '../../res/firmware/KL520/fw_scpu.bin')
NCPU_FW_PATH = os.path.join(PWD, '../../res/firmware/KL520/fw_ncpu.bin')
MODEL_FILE_PATH = os.path.join(PWD, '../../res/models/KL520/tiny_yolo_v3/models_520.nef')
IMAGE_FILE_PATH = os.path.join(PWD, '../../res/images/bike_cars_street_224x224.bmp')
LOOP_TIME = 100
def _image_send_function(_device_group: kp.DeviceGroup,
                         _loop_time: int,
                         _generic_inference_input_descriptor: kp.GenericImageInferenceDescriptor,
                         _image: Union[bytes, np.ndarray],
                         _image_format: kp.ImageFormat) -> None:
    """Send `_loop_time` inference requests for `_image` to the device group.

    Runs on the sender thread; pairs with `_result_receive_function`, which
    matches results by `inference_number`.

    :param _device_group: connected Kneron device group to send images to.
    :param _loop_time: number of inference iterations to send.
    :param _generic_inference_input_descriptor: reusable input descriptor (mutated each loop).
    :param _image: image payload (raw bytes or numpy array).
    :param _image_format: pixel format of `_image`.
    """
    for _loop in range(_loop_time):
        try:
            # Tag each request so the receiver can verify ordering.
            _generic_inference_input_descriptor.inference_number = _loop
            _generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
                image=_image,
                image_format=_image_format,
                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
            )]
            # BUG FIX: use the `_device_group` parameter; the original referenced the
            # module-level `device_group` global, which only exists when run as a script.
            kp.inference.generic_image_inference_send(
                device_group=_device_group,
                generic_inference_input_descriptor=_generic_inference_input_descriptor)
        except kp.ApiKPException as exception:
            print(' - Error: inference failed, error = {}'.format(exception))
            exit(0)
def _result_receive_function(_device_group: kp.DeviceGroup,
                             _loop_time: int,
                             _result_queue: queue.Queue) -> None:
    """Receive `_loop_time` inference results and publish the last one.

    Runs on the receiver thread; pairs with `_image_send_function`. Only the
    final raw result is put on `_result_queue` for the main thread to consume.

    :param _device_group: connected Kneron device group to receive from.
    :param _loop_time: number of results expected.
    :param _result_queue: queue receiving the last raw result descriptor.
    """
    _generic_raw_result = None
    for _loop in range(_loop_time):
        try:
            # BUG FIX: use the `_device_group` parameter; the original referenced the
            # module-level `device_group` global, which only exists when run as a script.
            _generic_raw_result = kp.inference.generic_image_inference_receive(device_group=_device_group)
            # Results are expected in send order; report any mismatch.
            if _generic_raw_result.header.inference_number != _loop:
                print(' - Error: incorrect inference_number {} at frame {}'.format(
                    _generic_raw_result.header.inference_number, _loop))
            print('.', end='', flush=True)
        except kp.ApiKPException as exception:
            print(' - Error: inference failed, error = {}'.format(exception))
            exit(0)
    _result_queue.put(_generic_raw_result)
if __name__ == '__main__':
    # --- Command-line arguments ---
    parser = argparse.ArgumentParser(description='KL520 Demo Generic Image Inference Multi-Thread Example.')
    parser.add_argument('-p',
                        '--port_id',
                        help='Using specified port ID for connecting device (Default: port ID of first scanned Kneron '
                             'device)',
                        default=0,
                        type=int)
    args = parser.parse_args()
    usb_port_id = args.port_id

    """
    check device USB speed (Recommend run KL520 at high speed)
    """
    try:
        if kp.UsbSpeed.KP_USB_SPEED_HIGH != get_device_usb_speed_by_port_id(usb_port_id=usb_port_id):
            print('\033[91m' + '[Error] Device is not run at high speed.' + '\033[0m')
            exit(0)
    except Exception as exception:
        print('Error: check device USB speed fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
                                                                                             str(exception)))
        exit(0)

    """
    connect the device
    """
    try:
        print('[Connect Device]')
        device_group = kp.core.connect_devices(usb_port_ids=[usb_port_id])
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
                                                                                     str(exception)))
        exit(0)

    """
    setting timeout of the usb communication with the device
    """
    print('[Set Device Timeout]')
    kp.core.set_timeout(device_group=device_group, milliseconds=5000)
    print(' - Success')

    """
    upload firmware to device
    """
    try:
        print('[Upload Firmware]')
        kp.core.load_firmware_from_file(device_group=device_group,
                                        scpu_fw_path=SCPU_FW_PATH,
                                        ncpu_fw_path=NCPU_FW_PATH)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
        exit(0)

    """
    upload model to device
    """
    try:
        print('[Upload Model]')
        model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group,
                                                            file_path=MODEL_FILE_PATH)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload model failed, error = \'{}\''.format(str(exception)))
        exit(0)

    """
    prepare the image
    """
    print('[Read Image]')
    img = cv2.imread(filename=IMAGE_FILE_PATH)
    # Convert from OpenCV's default BGR to the 565 format the device expects.
    img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
    print(' - Success')

    """
    prepare generic image inference input descriptor
    """
    generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
        model_id=model_nef_descriptor.models[0].id,
    )

    """
    starting inference work
    """
    print('[Starting Inference Work]')
    print(' - Starting inference loop {} times'.format(LOOP_TIME))
    print(' - ', end='')
    # The receiver thread puts the last raw result here for the main thread.
    result_queue = queue.Queue()
    send_thread = threading.Thread(target=_image_send_function, args=(device_group,
                                                                      LOOP_TIME,
                                                                      generic_inference_input_descriptor,
                                                                      img_bgr565,
                                                                      kp.ImageFormat.KP_IMAGE_FORMAT_RGB565))
    receive_thread = threading.Thread(target=_result_receive_function, args=(device_group,
                                                                             LOOP_TIME,
                                                                             result_queue))
    start_inference_time = time.time()
    send_thread.start()
    receive_thread.start()

    try:
        # join() with a 1-second timeout in a loop keeps the main thread
        # responsive to Ctrl-C while the workers run.
        while send_thread.is_alive():
            send_thread.join(1)
        while receive_thread.is_alive():
            receive_thread.join(1)
    except (KeyboardInterrupt, SystemExit):
        print('\n - Received keyboard interrupt, quitting threads.')
        exit(0)

    end_inference_time = time.time()
    time_spent = end_inference_time - start_inference_time

    try:
        # Only the final raw result is published by the receiver thread.
        generic_raw_result = result_queue.get(timeout=3)
    except Exception as exception:
        print('Error: Result queue is empty !')
        exit(0)

    print()
    print('[Result]')
    print(" - Total inference {} images".format(LOOP_TIME))
    print(" - Time spent: {:.2f} secs, FPS = {:.1f}".format(time_spent, LOOP_TIME / time_spent))

    """
    retrieve inference node output
    """
    print('[Retrieve Inference Node Output ]')
    inf_node_output_list = []
    for node_idx in range(generic_raw_result.header.num_output_node):
        inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(node_idx=node_idx,
                                                                                         generic_raw_result=generic_raw_result,
                                                                                         channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
        inf_node_output_list.append(inference_float_node_output)
    print(' - Success')

    print('[Result]')
    print(inf_node_output_list)

View File

@ -0,0 +1,215 @@
# ******************************************************************************
# Copyright (c) 2022. Kneron Inc. All rights reserved. *
# ******************************************************************************
import os
import sys
import argparse
PWD = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(1, os.path.join(PWD, '..'))
sys.path.insert(1, os.path.join(PWD, '../example/'))
from utils.ExampleHelper import get_device_usb_speed_by_port_id
import kp
import cv2
import numpy as np
import math
import multiprocessing
import threading
def get_palette(mapping, seed=9487):
    """Return `mapping` pseudo-random RGB colors, reproducible via `seed`.

    Each color is a 3-element list of integers in [0, 255].
    """
    np.random.seed(seed)
    palette = []
    for _ in range(mapping):
        palette.append(list(np.random.choice(range(256), size=3)))
    return palette
def convert_numpy_to_rgba_and_width_align_4(data):
    """Converts the numpy data into RGBA.

    720 input is 4 byte width aligned.
    """
    height, width, channel = data.shape
    # Round width up to the next multiple of 4 (KL720 alignment requirement).
    padded_width = int(math.ceil(width / 4.0)) * 4
    # Zero-filled canvas: padding columns and the alpha channel stay zero.
    canvas = np.zeros((height, padded_width, 4), dtype=np.int8)
    canvas[:, :width, :channel] = data
    return canvas.flatten().tobytes()
if __name__ == '__main__':
    # --- Command-line arguments ---
    parser = argparse.ArgumentParser(description='KL720 Kneron Model Zoo Generic Data Inference Example - STDC.')
    parser.add_argument('-p',
                        '--port_id',
                        help='Using specified port ID for connecting device (Default: port ID of first scanned Kneron '
                             'device)',
                        default=0,
                        type=int)
    parser.add_argument('-img',
                        '--img_path',
                        help='input image path',
                        default=os.path.join(PWD, '../../res/images/pic_0456_jpg.rf.6aa4e19498fc69214a37fc278b23aa6b_leftImg8bit.png'),
                        type=str)
    parser.add_argument('-nef',
                        '--nef_model_path',
                        help='input NEF model path',
                        default=os.path.join(PWD,
                                             '../../res/models/KL720/kn-model-zoo-mmseg_stdc/724models_720.nef'),
                        type=str)
    args = parser.parse_args()

    assert args.img_path is not None, "need to set input image but got None"
    assert args.nef_model_path is not None, "need to set nef model path but got None"

    usb_port_id = args.port_id
    nef_model_path = args.nef_model_path
    image_file_path = args.img_path

    """
    check device USB speed (Recommend run KL720 at super speed)
    """
    try:
        if kp.UsbSpeed.KP_USB_SPEED_SUPER != get_device_usb_speed_by_port_id(usb_port_id=usb_port_id):
            print('\033[91m' + '[Warning] Device is not run at super speed.' + '\033[0m')
    except Exception as exception:
        print('Error: check device USB speed fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
                                                                                             str(exception)))
        exit(0)

    """
    connect the device
    """
    try:
        print('[Connect Device]')
        device_group = kp.core.connect_devices(usb_port_ids=[usb_port_id])
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(usb_port_id,
                                                                                     str(exception)))
        exit(0)

    """
    setting timeout of the usb communication with the device
    """
    print('[Set Device Timeout]')
    kp.core.set_timeout(device_group=device_group, milliseconds=5000)
    print(' - Success')

    """
    upload model to device
    """
    try:
        print('[Upload Model]')
        model_nef_descriptor = kp.core.load_model_from_file(device_group=device_group,
                                                            file_path=nef_model_path)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload model failed, error = \'{}\''.format(str(exception)))
        exit(0)

    """
    extract input radix from NEF
    """
    nef_radix = model_nef_descriptor.models[0].input_nodes[0].quantization_parameters.v1.quantized_fixed_point_descriptor_list[0].radix  # only support single model NEF

    """
    prepare the image
    """
    # NPU shape is NCHW: index 3 = width, index 2 = height.
    nef_model_width = model_nef_descriptor.models[0].input_nodes[0].tensor_shape_info.v1.shape_npu[3]
    nef_model_height = model_nef_descriptor.models[0].input_nodes[0].tensor_shape_info.v1.shape_npu[2]

    print('[Read Image]')
    img = cv2.imread(filename=image_file_path)
    img_height, img_width, img_channels = img.shape
    # resize to model input size
    img = cv2.resize(img, (nef_model_width, nef_model_height), interpolation=cv2.INTER_AREA)
    # to rgb
    img_input = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2RGB)
    # this model trained with normalize method: (data - 128)/256 ,
    img_input = img_input / 256.
    img_input -= 0.5
    # toolchain calculate the radix value from input data (after normalization), and set it into NEF model.
    # NPU will divide input data "2^radix" automatically, so, we have to scaling the input data here due to this reason.
    img_input *= pow(2, nef_radix)
    # convert rgb to rgba and width align 4, due to npu requirement.
    img_buffer = convert_numpy_to_rgba_and_width_align_4(img_input)
    print(' - Success')

    """
    prepare generic data inference input descriptor
    """
    generic_inference_input_descriptor = kp.GenericDataInferenceDescriptor(
        model_id=model_nef_descriptor.models[0].id,
        inference_number=0,
        input_node_data_list=[kp.GenericInputNodeData(buffer=img_buffer)]
    )

    """
    starting inference work
    """
    print('[Starting Inference Work]')
    try:
        kp.inference.generic_data_inference_send(device_group=device_group,
                                                 generic_inference_input_descriptor=generic_inference_input_descriptor)
        generic_raw_result = kp.inference.generic_data_inference_receive(device_group=device_group)
    except kp.ApiKPException as exception:
        print(' - Error: inference failed, error = {}'.format(exception))
        exit(0)
    print()

    """
    retrieve inference node output
    """
    print('[Retrieve Inference Node Output ]')
    inf_node_output_list = []
    for node_idx in range(generic_raw_result.header.num_output_node):
        inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=node_idx,
            generic_raw_result=generic_raw_result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
        inf_node_output_list.append(inference_float_node_output)
    print(' - Success')

    # Re-read the original (full-size) image for visualization.
    o_im = cv2.imread(filename=image_file_path)
    # change output array data order from nchw to hwc
    pred = inf_node_output_list[0].ndarray.squeeze().transpose(1, 2, 0)  # should only one output node
    # channel number means all possible class number
    n_c = pred.shape[2]
    # upscaling inference result array to origin image size
    pred = cv2.resize(pred, (o_im.shape[1], o_im.shape[0]), interpolation=cv2.INTER_LINEAR)
    # find max score class
    pred = pred.argmax(2)

    print('[Result]')
    print(' - segmentation result \n{}'.format(pred))

    """
    output result image
    """
    # One pseudo-random color per class; paint each pixel by its argmax class.
    colors = get_palette(n_c)
    seg_res_vis = np.zeros(o_im.shape, np.uint8)
    for c in range(n_c):
        seg_res_vis[pred == c] = colors[c]

    print('[Output Result Image]')
    output_img_name = 'output_{}'.format(os.path.basename(image_file_path))
    print(' - Output Segmentation result on \'{}\''.format(output_img_name))
    cv2.imwrite(output_img_name, seg_res_vis)

View File

@ -0,0 +1 @@
# Cluster4NPU

523
multidongle.py Normal file
View File

@ -0,0 +1,523 @@
from typing import Union, Tuple
import os
import sys
import argparse
import time
import threading
import queue
import numpy as np
import kp
import cv2
import time
class MultiDongle:
    """Runs threaded image inference across one or more Kneron dongles.

    Lifecycle: ``initialize()`` connects devices and uploads firmware/model,
    ``start()`` spawns the sender/receiver worker threads, ``put_input()`` /
    ``get_output()`` exchange data through internal queues, and ``stop()``
    shuts the workers down.
    """

    # Currently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported.
    # NOTE(review): 'BGR565' is mapped to the KP_IMAGE_FORMAT_RGB565 enum --
    # confirm this naming mismatch is intentional for the kp SDK.
    _FORMAT_MAPPING = {
        'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
        'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888,
        'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV,
        'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8,
        # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0,
        # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0,
        # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB,
        # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR,
        # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1,
        # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1,
        # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB,
        # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR,
    }

    def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False):
        """
        Initialize the MultiDongle class.

        :param port_id: List of USB port IDs for the same layer's devices.
        :param scpu_fw_path: Path to the SCPU firmware file.
        :param ncpu_fw_path: Path to the NCPU firmware file.
        :param model_path: Path to the model file.
        :param upload_fw: Flag to indicate whether to upload firmware.
        """
        self.port_id = port_id
        self.upload_fw = upload_fw
        # Check if the firmware is needed.
        # NOTE(review): scpu_fw_path/ncpu_fw_path are only stored when
        # upload_fw is True; reading them otherwise raises AttributeError.
        if self.upload_fw:
            self.scpu_fw_path = scpu_fw_path
            self.ncpu_fw_path = ncpu_fw_path
        self.model_path = model_path
        self.device_group = None
        # generic_inference_input_descriptor will be prepared in initialize
        self.model_nef_descriptor = None
        self.generic_inference_input_descriptor = None
        # Queues for data
        # Input queue for images to be sent
        self._input_queue = queue.Queue()
        # Output queue for received results
        self._output_queue = queue.Queue()
        # Threading attributes
        self._send_thread = None
        self._receive_thread = None
        self._stop_event = threading.Event()  # Event to signal threads to stop
        # Monotonic counter used as the per-image inference_number tag.
        self._inference_counter = 0

    def initialize(self):
        """
        Connect devices, upload firmware (if upload_fw is True), and upload model.
        Must be called before start().
        """
        # Connect device and assign to self.device_group
        try:
            print('[Connect Device]')
            self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception)))
            sys.exit(1)
        # setting timeout of the usb communication with the device
        # print('[Set Device Timeout]')
        # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000)
        # print(' - Success')
        if self.upload_fw:
            try:
                print('[Upload Firmware]')
                kp.core.load_firmware_from_file(device_group=self.device_group,
                                                scpu_fw_path=self.scpu_fw_path,
                                                ncpu_fw_path=self.ncpu_fw_path)
                print(' - Success')
            except kp.ApiKPException as exception:
                print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
                sys.exit(1)
        # upload model to device
        try:
            print('[Upload Model]')
            self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group,
                                                                     file_path=self.model_path)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: upload model failed, error = \'{}\''.format(str(exception)))
            sys.exit(1)
        # Extract model input dimensions automatically from model metadata
        if self.model_nef_descriptor and self.model_nef_descriptor.models:
            model = self.model_nef_descriptor.models[0]
            if hasattr(model, 'input_nodes') and model.input_nodes:
                input_node = model.input_nodes[0]
                # From your JSON: "shape_npu": [1, 3, 128, 128] -> (width, height)
                # NOTE(review): other examples in this repo read
                # tensor_shape_info.v1.shape_npu -- confirm the '.data'
                # attribute path matches the installed kp SDK version.
                shape = input_node.tensor_shape_info.data.shape_npu
                self.model_input_shape = (shape[3], shape[2])  # (width, height)
                self.model_input_channels = shape[1]  # 3 for RGB
                print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}")
            else:
                self.model_input_shape = (128, 128)  # fallback
                self.model_input_channels = 3
                print("Using default input shape (128, 128)")
        else:
            self.model_input_shape = (128, 128)
            self.model_input_channels = 3
            print("Model info not available, using default shape")
        # Prepare generic inference input descriptor after model is loaded
        if self.model_nef_descriptor:
            self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                model_id=self.model_nef_descriptor.models[0].id,
            )
        else:
            print("Warning: Could not get generic inference input descriptor from model.")
            self.generic_inference_input_descriptor = None

    def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray:
        """
        Preprocess frame for inference: resize to the model's input size and
        convert the BGR frame to ``target_format``.
        """
        resized_frame = cv2.resize(frame, self.model_input_shape)
        if target_format == 'BGR565':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565)
        elif target_format == 'RGB8888':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA)
        elif target_format == 'YUYV':
            return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV)
        else:
            return resized_frame  # RAW8 or other formats

    def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[float, str]:
        """
        Get the latest inference result.

        Returns: (probability, result_string) or (None, None) if no result
        """
        output_descriptor = self.get_output(timeout=timeout)
        if not output_descriptor:
            return None, None
        # Process the output descriptor only if it has the expected header fields.
        if hasattr(output_descriptor, 'header') and \
           hasattr(output_descriptor.header, 'num_output_node') and \
           hasattr(output_descriptor.header, 'inference_number'):
            inf_node_output_list = []
            retrieval_successful = True
            for node_idx in range(output_descriptor.header.num_output_node):
                try:
                    inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                        node_idx=node_idx,
                        generic_raw_result=output_descriptor,
                        channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
                    )
                    # Copy so the data outlives the descriptor's buffer.
                    inf_node_output_list.append(inference_float_node_output.ndarray.copy())
                except kp.ApiKPException as e:
                    retrieval_successful = False
                    break
                except Exception as e:
                    retrieval_successful = False
                    break
            if retrieval_successful and inf_node_output_list:
                # Process output nodes: single node is flattened directly,
                # multiple nodes are flattened and concatenated.
                if output_descriptor.header.num_output_node == 1:
                    raw_output_array = inf_node_output_list[0].flatten()
                else:
                    concatenated_outputs = [arr.flatten() for arr in inf_node_output_list]
                    raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([])
                if raw_output_array.size > 0:
                    # `postprocess` is the module-level helper defined below.
                    probability = postprocess(raw_output_array)
                    result_str = "Fire" if probability > 0.5 else "No Fire"
                    return probability, result_str
        return None, None

    # Modified _send_thread_func to get data from input queue
    def _send_thread_func(self):
        """Internal function run by the send thread, gets images from input queue."""
        print("Send thread started.")
        while not self._stop_event.is_set():
            if self.generic_inference_input_descriptor is None:
                # Wait for descriptor to be ready or stop
                self._stop_event.wait(0.1)  # Avoid busy waiting
                continue
            try:
                # Get image and format from the input queue
                # Blocks until an item is available or stop event is set/timeout occurs
                try:
                    # Use get with timeout or check stop event in a loop
                    # This pattern allows thread to check stop event while waiting on queue
                    item = self._input_queue.get(block=True, timeout=0.1)
                    # Check if this is our sentinel value (pushed by stop() to unblock us;
                    # the outer while-condition then observes the stop event).
                    if item is None:
                        continue
                    # Now safely unpack the tuple
                    image_data, image_format_enum = item
                except queue.Empty:
                    # If queue is empty after timeout, check stop event and continue loop
                    continue
                # Configure and send the image
                self._inference_counter += 1  # Increment counter for each image
                self.generic_inference_input_descriptor.inference_number = self._inference_counter
                self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
                    image=image_data,
                    image_format=image_format_enum,  # Use the format from the queue
                    resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                    padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                    normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                )]
                kp.inference.generic_image_inference_send(device_group=self.device_group,
                                                          generic_inference_input_descriptor=self.generic_inference_input_descriptor)
                # print("Image sent.") # Optional: add log
                # No need for sleep here usually, as queue.get is blocking
            except kp.ApiKPException as exception:
                print(f' - Error in send thread: inference send failed, error = {exception}')
                self._stop_event.set()  # Signal other thread to stop
            except Exception as e:
                print(f' - Unexpected error in send thread: {e}')
                self._stop_event.set()
        print("Send thread stopped.")

    # _receive_thread_func remains the same
    def _receive_thread_func(self):
        """Internal function run by the receive thread, puts results into output queue."""
        print("Receive thread started.")
        while not self._stop_event.is_set():
            try:
                # NOTE: this call blocks until a result arrives or the device
                # timeout elapses; on timeout the kp exception path stops both threads.
                generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group)
                self._output_queue.put(generic_inference_output_descriptor)
            except kp.ApiKPException as exception:
                if not self._stop_event.is_set():  # Avoid printing error if we are already stopping
                    print(f' - Error in receive thread: inference receive failed, error = {exception}')
                self._stop_event.set()
            except Exception as e:
                print(f' - Unexpected error in receive thread: {e}')
                self._stop_event.set()
        print("Receive thread stopped.")

    # start method signature changed (no image/format parameters)
    def start(self):
        """
        Start the send and receive threads.
        Must be called after initialize().

        :raises RuntimeError: if initialize() has not been called.
        """
        if self.device_group is None:
            raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
        if self._send_thread is None or not self._send_thread.is_alive():
            self._stop_event.clear()  # Clear stop event for a new start
            self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True)
            self._send_thread.start()
            print("Send thread started.")
        if self._receive_thread is None or not self._receive_thread.is_alive():
            self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True)
            self._receive_thread.start()
            print("Receive thread started.")

    # (A previous, more aggressive stop() implementation that disconnected and
    # reconnected the device group to unblock a stuck receive thread was
    # removed; see version control history if it is ever needed again.)
    def stop(self):
        """
        Stop inference threads cleanly
        """
        print("Stopping threads...")
        self._stop_event.set()
        # Unblock send thread if waiting on queue (None is the sentinel value).
        try:
            self._input_queue.put(None, timeout=1.0)
        except:
            pass
        # Join threads with reasonable timeout
        threads = [
            (self._send_thread, "Send thread"),
            (self._receive_thread, "Receive thread")
        ]
        for thread, name in threads:
            if thread and thread.is_alive():
                thread.join(timeout=3.0)
                if thread.is_alive():
                    # Likely blocked inside a device call; threads are daemonic,
                    # so they will not keep the process alive.
                    print(f"Warning: {name} did not stop within timeout")
        print("All threads stopped")

    def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
        """
        Put an image into the input queue with flexible preprocessing.

        :param image: file path or BGR numpy array.
        :param format: key of _FORMAT_MAPPING (note: shadows the builtin `format`).
        :param target_size: optional (width, height) to resize to before queueing.
        :raises FileNotFoundError: if a path is given and cannot be read.
        :raises ValueError: on unsupported image type or format key.
        """
        if isinstance(image, str):
            image_data = cv2.imread(image)
            if image_data is None:
                raise FileNotFoundError(f"Image file not found at {image}")
            if target_size:
                image_data = cv2.resize(image_data, target_size)
        elif isinstance(image, np.ndarray):
            # Don't modify original array, make copy if needed
            image_data = image.copy() if target_size is None else cv2.resize(image, target_size)
        else:
            raise ValueError("Image must be a file path (str) or a numpy array (ndarray).")
        if format in self._FORMAT_MAPPING:
            image_format_enum = self._FORMAT_MAPPING[format]
        else:
            raise ValueError(f"Unsupported format: {format}")
        self._input_queue.put((image_data, image_format_enum))

    def get_output(self, timeout: float = None):
        """
        Get the next received data from the output queue.
        This method is non-blocking by default unless a timeout is specified.

        :param timeout: Time in seconds to wait for data. If None, it's non-blocking.
        :return: Received data (e.g. kp.GenericInferenceOutputDescriptor) or None if no data available within timeout.
        """
        try:
            return self._output_queue.get(block=timeout is not None, timeout=timeout)
        except queue.Empty:
            return None

    def __del__(self):
        """Ensure resources are released when the object is garbage collected."""
        # NOTE(review): if __init__ failed partway, these attributes may not
        # exist and stop() could raise inside the destructor -- verify.
        self.stop()
        if self.device_group:
            try:
                kp.core.disconnect_devices(device_group=self.device_group)
                print("Device group disconnected in destructor.")
            except Exception as e:
                print(f"Error disconnecting device group in destructor: {e}")
def postprocess(raw_model_output: list) -> float:
    """
    Post-processes the raw model output.

    Assumes the model output is a sequence (list or flat numpy array) whose
    first element is the desired probability.

    :param raw_model_output: flattened model output scores.
    :return: the first score as a float, or 0.0 for None/empty output.
    """
    # BUG FIX: the original `if raw_model_output and len(...)` raises
    # ValueError ("truth value of an array ... is ambiguous") for the
    # multi-element numpy arrays that get_latest_inference_result passes in.
    if raw_model_output is not None and len(raw_model_output) > 0:
        probability = raw_model_output[0]
        return float(probability)
    return 0.0  # Default or error value
class WebcamInferenceRunner:
    """Captures webcam frames, feeds them to a MultiDongle pipeline, and
    displays the latest fire-detection result with an FPS overlay."""

    def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'):
        """
        :param multidongle: initialized and started MultiDongle instance.
        :param image_format: key of MultiDongle._FORMAT_MAPPING used for frames.
        """
        self.multidongle = multidongle
        self.image_format = image_format
        # Most recent inference outcome (kept between frames so the overlay
        # always shows something, even when no fresh result is available).
        self.latest_probability = 0.0
        self.result_str = "No Fire"
        # Statistics tracking
        self.processed_inference_count = 0
        self.inference_fps_start_time = None
        self.display_fps_start_time = None
        self.display_frame_counter = 0

    def run(self, camera_id: int = 0):
        """Main capture/infer/display loop; press 'q' to quit.

        :param camera_id: OpenCV camera index.
        :raises RuntimeError: if the webcam cannot be opened.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            raise RuntimeError("Cannot open webcam")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Track display FPS
                if self.display_fps_start_time is None:
                    self.display_fps_start_time = time.time()
                self.display_frame_counter += 1
                # Preprocess and enqueue frame; the worker threads do the device I/O.
                processed_frame = self.multidongle.preprocess_frame(frame, self.image_format)
                self.multidongle.put_input(processed_frame, self.image_format)
                # Poll for the newest inference result (None if none ready yet).
                prob, result = self.multidongle.get_latest_inference_result()
                if prob is not None:
                    # Track inference FPS from the first completed inference.
                    if self.inference_fps_start_time is None:
                        self.inference_fps_start_time = time.time()
                    self.processed_inference_count += 1
                    self.latest_probability = prob
                    self.result_str = result
                # Display frame with results
                self._display_results(frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            cap.release()
            cv2.destroyAllWindows()

    def _display_results(self, frame):
        """Overlay the latest result and inference FPS on a copy of `frame` and show it."""
        display_frame = frame.copy()
        # BUG FIX: the original test `"Fire" in self.result_str` was always True
        # because "No Fire" contains "Fire", so the red branch was unreachable.
        text_color = (0, 255, 0) if self.result_str == "Fire" else (0, 0, 255)
        # Display inference result
        cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2)
        # Calculate and display inference FPS
        if self.inference_fps_start_time and self.processed_inference_count > 0:
            elapsed_time = time.time() - self.inference_fps_start_time
            if elapsed_time > 0:
                inference_fps = self.processed_inference_count / elapsed_time
                cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}",
                            (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.imshow('Fire Detection', display_frame)
# BUG FIX: the guard was `if __name__ == "_main_":` (single underscores),
# which never matches, so the demo entry point was dead code.
if __name__ == "__main__":
    # Demo configuration: two dongles, local firmware and fire-detection model.
    PORT_IDS = [28, 32]
    SCPU_FW = r'fw_scpu.bin'
    NCPU_FW = r'fw_ncpu.bin'
    MODEL_PATH = r'fire_detection_520.nef'
    try:
        # Initialize inference engine
        print("Initializing MultiDongle...")
        multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True)
        multidongle.initialize()
        multidongle.start()
        # Run using the new runner class
        print("Starting webcam inference...")
        runner = WebcamInferenceRunner(multidongle, 'BGR565')
        runner.run()
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Only stop if construction got far enough to bind the name.
        if 'multidongle' in locals():
            multidongle.stop()

View File

@ -4,4 +4,7 @@ version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = []
dependencies = [
"numpy>=2.2.6",
"opencv-python>=4.11.0.86",
]

View File

297
test.py Normal file
View File

@ -0,0 +1,297 @@
import kp
import time
import numpy as np
from typing import List, Dict, Any, Callable, Optional
import queue
import threading
import multiprocessing
import cv2
import os
# 定義一個 Dongle 的設定結構
class DongleConfig:
    """Holds the connection, firmware, and model settings for one dongle group."""

    def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str,
                 model_path: str, device_type: str = "KL520"):
        """
        :param port_id: USB port IDs of the devices in this group.
        :param scpu_fw_path: path to the SCPU firmware binary.
        :param ncpu_fw_path: path to the NCPU firmware binary.
        :param model_path: path to the NEF model file.
        :param device_type: Kneron device family (default "KL520").
        """
        self.device_type = device_type
        self.port_id = port_id
        self.model_path = model_path
        self.scpu_fw_path = scpu_fw_path
        self.ncpu_fw_path = ncpu_fw_path
# 定義一個 Pipeline 的層級結構
class PipelineLayer:
    """One named stage of the pipeline: a dongle config plus optional
    pre/post-processing hooks applied around its inference."""

    def __init__(self, name: str, dongle_config: DongleConfig,
                 preprocess_func: Optional[Callable] = None,
                 postprocess_func: Optional[Callable] = None):
        """
        :param name: unique layer name used when wiring connections.
        :param dongle_config: device/firmware/model settings for this stage.
        :param preprocess_func: optional callable applied to inputs.
        :param postprocess_func: optional callable applied to outputs.
        """
        self.postprocess_func = postprocess_func
        self.preprocess_func = preprocess_func
        self.dongle_config = dongle_config
        self.name = name
class KneronPipeline:
    """Sequential inference pipeline spanning one or more Kneron dongles.

    Each PipelineLayer owns its own kp DeviceGroup (connected in
    ``initialize()``); ``run()`` pushes data through the layers in list order.
    Usable as a context manager so devices are always released.
    """

    def __init__(self, pipeline_layers: List[PipelineLayer]):
        if not pipeline_layers:
            raise ValueError("Pipeline must have at least one layer.")
        self.pipeline_layers = pipeline_layers
        self._dongles: Dict[str, Any] = {}  # kp.core.DeviceGroup per layer name
        self._model_descriptors: Dict[str, Any] = {}  # model descriptor per layer name
        self._layer_connections: List[tuple] = []  # (from_layer, to_layer) pairs
        self._initialized = False
        # RLock, not Lock: initialize() calls release() on its error path while
        # already holding this lock, and a non-reentrant Lock would deadlock.
        self._lock = threading.RLock()

    def add_layer_connection(self, from_layer_name: str, to_layer_name: str):
        """
        Define the data flow between layers.
        e.g. pipeline.add_layer_connection("layer1", "layer2")
        means layer1's output is used as layer2's input.
        More complex wiring (e.g. mapping specific output nodes to specific
        input nodes) would require a richer definition.
        """
        from_layer = next((layer for layer in self.pipeline_layers if layer.name == from_layer_name), None)
        to_layer = next((layer for layer in self.pipeline_layers if layer.name == to_layer_name), None)
        if not from_layer or not to_layer:
            raise ValueError(f"Invalid layer names: {from_layer_name} or {to_layer_name} not found.")
        self._layer_connections.append((from_layer_name, to_layer_name))

    def initialize(self):
        """
        Initialize all dongles: connect, set timeouts, load firmware and models.

        Raises the original exception after cleaning up any devices that were
        already connected when a later layer fails.
        """
        with self._lock:
            if self._initialized:
                print("Pipeline already initialized.")
                return
            print("[初始化 Pipeline...]")
            for layer in self.pipeline_layers:
                config = layer.dongle_config
                print(f"[連接設備] Layer: {layer.name}, Port: {config.port_id}")
                try:
                    # A separate DeviceGroup manages each dongle.
                    device_group = kp.core.connect_devices(usb_port_ids=config.port_id)
                    self._dongles[layer.name] = device_group
                    print(f" - {layer.name}: 連接成功")
                    print(f"[設置超時] Layer: {layer.name}")
                    kp.core.set_timeout(device_group=device_group, milliseconds=5000)
                    print(f" - {layer.name}: 超時設置成功")
                    print(f"[上傳韌體] Layer: {layer.name}")
                    kp.core.load_firmware_from_file(device_group=device_group,
                                                    scpu_fw_path=config.scpu_fw_path,
                                                    ncpu_fw_path=config.ncpu_fw_path)
                    print(f" - {layer.name}: 韌體上傳成功")
                    print(f"[上傳模型] Layer: {layer.name}")
                    model_descriptor = kp.core.load_model_from_file(device_group=device_group,
                                                                    file_path=config.model_path)
                    self._model_descriptors[layer.name] = model_descriptor
                    print(f" - {layer.name}: 模型上傳成功")
                except Exception as e:
                    print(f"錯誤: 初始化 Layer {layer.name} 失敗: {str(e)}")
                    # Clean up devices that were already connected. release()
                    # is reentrant-safe (RLock) and works even though
                    # _initialized is still False at this point.
                    self.release()
                    raise e
            self._initialized = True
            print("[Pipeline 初始化完成]")

    def run(self, input_data: Any) -> Dict[str, Any]:
        """
        Execute the whole pipeline sequentially.

        The input may be raw data (e.g. an image path); the first layer's
        preprocess is expected to handle it. Returns a dict mapping each
        layer name to that layer's (post-processed) output.
        """
        if not self._initialized:
            raise RuntimeError("Pipeline not initialized. Call .initialize() first.")
        # Parallel / branched execution would go here. Candidates:
        # ThreadPoolExecutor / ProcessPoolExecutor, or manual Thread/Process
        # management. Dongle communication is I/O-bound, so threads are the
        # natural fit; if pre/post-processing turns out CPU-bound,
        # multiprocessing would be the better choice. For now we assume
        # dongle I/O dominates and keep the flow simple and sequential.
        results: Dict[str, Any] = {}
        # This implementation only handles a simple sequential pipeline;
        # parallel and branched wiring need more logic.
        # TODO: implement parallel and complex chaining logic
        current_input = input_data
        for i, layer in enumerate(self.pipeline_layers):
            print(f"[執行 Layer] {layer.name}")
            dongle = self._dongles[layer.name]
            model_descriptor = self._model_descriptors[layer.name]
            # Preprocess
            processed_input = current_input
            if layer.preprocess_func:
                print(f" - 執行 {layer.name} 的預處理")
                processed_input = layer.preprocess_func(current_input)
            # Inference
            print(f" - 執行 {layer.name} 的推論")
            try:
                # processed_input is either a list of kp.GenericInputNodeImage
                # (used as-is) or a numpy array (wrapped below); adjust to
                # match your preprocess output and model input.
                if isinstance(processed_input, list) and all(isinstance(item, kp.GenericInputNodeImage) for item in processed_input):
                    inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                        model_id=model_descriptor.models[0].id,
                        inference_number=0,
                        input_node_image_list=processed_input
                    )
                elif isinstance(processed_input, np.ndarray):
                    # Preprocess produced a numpy array; wrap it as a
                    # GenericInputNodeImage. Format / resize / padding /
                    # normalize below are example settings (e.g. a BGR565
                    # 128x128 input) — adjust to your model.
                    inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                        model_id=model_descriptor.models[0].id,
                        inference_number=0,
                        input_node_image_list=[
                            kp.GenericInputNodeImage(
                                image=processed_input,
                                image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,  # adjust to your preprocess output
                                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                            )
                        ]
                    )
                else:
                    raise TypeError(f"Unsupported processed input type for layer {layer.name}: {type(processed_input)}")
                kp.inference.generic_image_inference_send(device_group=dongle,
                                                          generic_inference_input_descriptor=inference_input_descriptor)
                generic_raw_result = kp.inference.generic_image_inference_receive(device_group=dongle)
                # Collect the raw per-node outputs.
                inf_node_output_list = []
                for node_idx in range(generic_raw_result.header.num_output_node):
                    # Assumes float output nodes; adjust retrieval to your
                    # model's output type if needed.
                    inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                        node_idx=node_idx,
                        generic_raw_result=generic_raw_result,
                        channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW  # adjust to model output
                    )
                    inf_node_output_list.append(inference_float_node_output.ndarray.copy())
                raw_output = inf_node_output_list  # list of numpy arrays
            except Exception as e:
                print(f"錯誤: Layer {layer.name} 推論失敗: {str(e)}")
                raise e
            # Postprocess
            final_output = raw_output
            if layer.postprocess_func:
                print(f" - 執行 {layer.name} 的後處理")
                final_output = layer.postprocess_func(raw_output)
            results[layer.name] = final_output
            # Feed the next layer (simple chaining; richer wiring would use
            # _layer_connections).
            current_input = final_output
        return results

    def release(self):
        """
        Release all dongle connections.

        Also used by initialize() to clean up partially-connected devices
        when initialization fails midway, so it must not bail out just
        because _initialized is still False.
        """
        with self._lock:
            if not self._initialized and not self._dongles:
                print("Pipeline not initialized.")
                return
            print("[釋放 Pipeline...]")
            for layer_name, dongle in self._dongles.items():
                try:
                    kp.core.disconnect_devices(device_group=dongle)
                    print(f" - {layer_name}: 已斷開連接")
                except Exception as e:
                    print(f"錯誤: 斷開 Layer {layer_name} 連接失敗: {str(e)}")
            self._dongles = {}
            self._model_descriptors = {}
            self._initialized = False
            print("[Pipeline 釋放完成]")

    def __enter__(self):
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
# Example usage
if __name__ == '__main__':
    # Define your preprocess and postprocess functions.
    def my_preprocess(image_path: str):
        # Mirrors the flow in 2_2nef_test.py: load, resize to the model's
        # 128x128 input, convert to BGR565.
        img = cv2.imread(image_path)
        if img is None:
            raise Exception(f"無法讀取圖片: {image_path}")
        img_resized = cv2.resize(img, (128, 128))
        img_bgr565 = cv2.cvtColor(img_resized, cv2.COLOR_BGR2BGR565)
        # Return a numpy array; KneronPipeline.run wraps it in a
        # GenericInputNodeImage.
        return img_bgr565
    def my_postprocess(raw_output: List[np.ndarray]):
        # Mirrors 2_2nef_test.py: single output node, first value is the
        # fire probability.
        probability = raw_output[0].flatten()[0]
        result = "Fire" if probability > 0.5 else "No Fire"
        return {"result": result, "confidence": probability}
    def another_preprocess(data: Any):
        # Preprocess for the second layer.
        print("執行第二層的預處理...")
        return data  # placeholder: adapt the previous layer's output to this model's input
    def another_postprocess(raw_output: List[np.ndarray]):
        # Postprocess for the second layer.
        print("執行第二層的後處理...")
        # Assume this layer produces another classification result.
        class_id = np.argmax(raw_output[0].flatten())
        return {"class_id": class_id}
    # Dongle configurations.
    # FIX: DongleConfig.port_id is declared as a list and is forwarded
    # unwrapped to kp.core.connect_devices(usb_port_ids=...), so the port ids
    # must be lists (the original passed bare ints).
    dongle_config1 = DongleConfig(port_id=[0], scpu_fw_path='fw_scpu.bin', ncpu_fw_path='fw_ncpu.bin', model_path='models_520.nef')
    # A second dongle and model, if available.
    dongle_config2 = DongleConfig(port_id=[1], scpu_fw_path='fw_scpu.bin', ncpu_fw_path='fw_ncpu.bin', model_path='another_model.nef')
    # Pipeline layers.
    # Single-layer pipeline (this structure could also fan out over multiple
    # dongles in parallel, but run() would need to be parallelized):
    # layers_single = [
    #     PipelineLayer(name="detector_dongle_0", dongle_config=dongle_config1, preprocess_func=my_preprocess, postprocess_func=my_postprocess),
    #     # PipelineLayer(name="detector_dongle_1", dongle_config=dongle_config2, preprocess_func=my_preprocess, postprocess_func=my_postprocess),
    # ]
    # Multi-layer pipeline (chaining different dongles):
    layers_multi = [
        PipelineLayer(name="detector_layer", dongle_config=dongle_config1, preprocess_func=my_preprocess, postprocess_func=my_postprocess),
        PipelineLayer(name="classifier_layer", dongle_config=dongle_config2, preprocess_func=another_preprocess, postprocess_func=another_postprocess),
    ]
    # Build the pipeline instance.
    # pipeline = KneronPipeline(pipeline_layers=layers_single)  # single-layer example
    pipeline = KneronPipeline(pipeline_layers=layers_multi)  # multi-layer example
    # Layer connections (only needed for multi-layer graphs; run() currently
    # supports simple sequential chaining only).
    # pipeline.add_layer_connection("detector_layer", "classifier_layer")
    # Use the context manager so resources are always released.
    try:
        with pipeline:
            # Run inference.
            image_path = r'C:\Users\USER\Desktop\Yu-An\Firedetection\test_images\fire4.jpeg'
            results = pipeline.run(input_data=image_path)
            print("\nPipeline 執行結果:")
            for layer_name, output in results.items():
                print(f" Layer '{layer_name}' 輸出: {output}")
            # For parallel processing, feed multiple image paths here and let
            # run() dispatch them to different dongles.
    except Exception as e:
        print(f"Pipeline 執行過程中發生錯誤: {str(e)}")

534
test_pipeline.py Normal file
View File

@ -0,0 +1,534 @@
import multiprocessing
import time
import os
import sys
import cv2
import numpy as np
import math
# --- Import Kneron Specific Libraries and Utilities ---
# Assuming your Kneron SDK and example files are set up such that these imports work.
# You might need to adjust sys.path or your project structure.
try:
    # Attempt to import the core Kneron library
    import kp
    print("Kneron SDK (kp) imported successfully.")
    # Attempt to import utilities from your specific example files
    # Adjust these import paths based on where your files are located relative to this script
    # from utils.ExampleHelper import get_device_usb_speed_by_port_id # Assuming this is in utils
    # from utils.ExamplePostProcess import post_process_yolo_v5 # Assuming this is in utils
    # Import from your provided files directly or ensure they are in Python path
    # Placeholder imports - **YOU MUST ENSURE THESE ACTUALLY WORK**
    # Depending on your setup, you might need to copy the functions directly or fix paths.
    try:
        # Assuming these are in your utils or directly importable
        from utils.ExampleHelper import get_device_usb_speed_by_port_id
        from utils.ExamplePostProcess import post_process_yolo_v5
        # Based on snippets from your files
        # NOTE(review): the helper defs below only come into existence when the
        # two utils imports above succeed; on ImportError none are defined.
        def get_palette(mapping, seed=9487):
            # Deterministic random color palette: 'mapping' entries of RGB triples.
            print("Using get_palette from snippet.")
            np.random.seed(seed)
            return [list(np.random.choice(range(256), size=3))
                    for _ in range(mapping)]
        # Based on snippet from your files - ensure dtype is correct (np.uint8 or np.int8)
        def convert_numpy_to_rgba_and_width_align_4(data):
            # Pads an (H, W, C) image to RGBA with width aligned up to a
            # multiple of 4, then flattens to bytes for device transfer.
            # NOTE(review): this print fires on every frame — noisy in a loop.
            print("Using convert_numpy_to_rgba_and_width_align_4 from snippet.")
            height, width, channel = data.shape
            width_aligned = 4 * math.ceil(width / 4.0)
            # Use np.uint8 for image data conversion usually
            aligned_data = np.zeros((height, width_aligned, 4), dtype=np.uint8)
            aligned_data[:height, :width, :channel] = data
            aligned_data = aligned_data.flatten() # Flatten as shown in snippet
            return aligned_data.tobytes()
        # Based on snippet from your files (adapted to take device_group or device)
        # It seems inference calls might take a single device object from the group.
        # Let's assume retrieve_inference_node_output needs the raw_result, not device.
        def retrieve_inference_node_output(generic_raw_result):
            # Extracts every float output node from a raw inference result.
            print('[Retrieve Inference Node Output ]')
            inf_node_output_list = []
            for node_idx in range(generic_raw_result.header.num_output_node):
                inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                    node_idx=node_idx,
                    generic_raw_result=generic_raw_result,
                    channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW) # Use actual kp enum
                inf_node_output_list.append(inference_float_node_output)
                print(' - Success')
            return inf_node_output_list
        print("Kneron utility functions imported/defined from snippets.")
    except ImportError as e:
        print(f"Error importing Kneron utility modules (e.g., utils.ExampleHelper): {e}")
        print("Please ensure the 'utils' directory is in your Python path or copy the necessary functions.")
        raise # Re-raise the error to indicate missing dependencies
except ImportError as e:
    print(f"Error importing Kneron SDK (kp): {e}")
    print("Please ensure Kneron SDK is installed and in your Python path.")
    print("Cannot run Kneron pipeline without the SDK.")
    sys.exit("Kneron SDK not found.")
# --- Worker Functions ---
def yolo_worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue,
                firmware_path: str, model_path: str, port_id: int):
    """
    YOLOv5 processing layer worker. Initializes Kneron device and model using kp.core.
    Reads image data, performs YOLO inference, and passes the original image data
    to the next layer's queue.

    NOTE(review): as written, ``device`` is never assigned (the assignment is
    commented out below), so the inference call receives ``device=None`` and
    will fail at runtime. ``firmware_path`` is likewise unused because the
    firmware-load call is commented out. Confirm the intended kp API usage.
    """
    device_group = None
    model_yolo_descriptor = None
    device = None # Will get the specific device object from the group
    print("YOLO Worker: Starting and initializing Kneron device using kp.core...")
    try:
        # --- Device and Model Initialization (per process) ---
        print(f"YOLO Worker: Connecting to device on port {port_id}")
        # Use kp.core.connect_devices
        device_group = kp.core.connect_devices(usb_port_ids=[port_id])
        if not device_group or not device_group.devices:
            raise RuntimeError(f"YOLO Worker: Failed to connect to device on port {port_id}")
        # Get the specific device object from the group (assuming single device per worker)
        # device = device_group.devices[0]
        print(f"YOLO Worker: Device connected")
        print("YOLO Worker: Loading firmware")
        # Firmware loading seems to be a method on the device object
        # device.load_firmware_from_file(firmware_path)
        print("YOLO Worker: Loading YOLO model using kp.core")
        # Use kp.core.load_model_from_file with the device_group
        model_yolo_descriptor = kp.core.load_model_from_file(
            device_group=device_group,
            file_path=model_path
        )
        if not model_yolo_descriptor:
            raise RuntimeError(f"YOLO Worker: Failed to load YOLO model from {model_path}")
        print("YOLO Worker: Initialization complete. Waiting for data.")
        # Optional: Check USB speed if needed, using the imported utility
        # usb_speed = get_device_usb_speed_by_port_id(port_id) # This utility might need adaptation or just be illustrative
        # print(f"YOLO Worker: Device USB Speed: {usb_speed}") # This utility might need adaptation
        # Set inference feature if required (e.g., for image format)
        # Based on examples, sometimes necessary before inference
        try:
            # Example, check your original code for required features
            # device.set_feature(kp.InferenceFeature.INF_FEATURE_IMAGE_FORMAT, kp.ImageFormat.IMAGE_FORMAT_RGBA)
            pass # Add relevant set_feature calls from your original code if needed
        except Exception as set_feature_e:
            print(f"YOLO Worker: Error setting inference features: {set_feature_e}")
            # Decide if this is a critical error or warning
        # ---------------------------------------
        while True:
            # Get image data from the input queue
            data_item = input_queue.get()
            if data_item is None:
                print("YOLO Worker: Received termination signal. Propagating None to STDC queue.")
                output_queue.put(None) # Propagate the signal
                break # Exit the worker loop
            # Assuming data_item is the image numpy array
            image_data = data_item
            # print("YOLO Worker: Received image data for processing.") # Too verbose for loop
            # --- Perform YOLO Inference ---
            img_height, img_width, _ = image_data.shape
            inference_input_size = (img_width, img_height) # Kneron expects (width, height)
            # Convert image data format for Kneron inference using the utility
            aligned_image_data = convert_numpy_to_rgba_and_width_align_4(image_data)
            # Send image to device and get raw results
            try:
                # Use kp.inference with the specific device object
                # NOTE(review): device is still None here (see docstring above).
                generic_raw_result = kp.inference.generic_inference_send_image(
                    device=device, # Use the device object from the group
                    data=aligned_image_data,
                    size=inference_input_size
                )
                if not generic_raw_result:
                    print("YOLO Worker: Warning - generic_inference_send_image returned None.")
                    continue # Skip post-processing if raw result is none
                # Retrieve raw node outputs using the utility
                # retrieve_inference_node_output utility likely takes the raw_result
                inf_node_output_list = retrieve_inference_node_output(generic_raw_result)
                # Perform YOLO specific post-processing using the utility
                yolo_results = post_process_yolo_v5(
                    inference_float_node_list=inf_node_output_list,
                    hardware_preproc_info=generic_raw_result.header.hw_pre_proc_info_list[0],
                    thresh_value=0.2 # Example threshold, adjust as needed
                )
                # print(f"YOLO Worker: Detected {len(yolo_results.box_list)} objects.") # Too verbose
                # Pass the *original image data* to the next layer (STDC)
                # STDC will perform segmentation on the whole image.
                # NOTE(review): yolo_results is computed but never forwarded —
                # only the unmodified frame reaches the STDC stage.
                output_queue.put(image_data)
                # print("YOLO Worker: Finished inference, put image data to STDC queue.") # Too verbose
            except Exception as inference_e:
                print(f"YOLO Worker Inference Error: {inference_e}")
                # Handle inference errors - maybe put an error marker in the queue?
                # For simplicity in FPS, we just skip this frame or let it potentially raise further
                pass # Continue processing next item
        print("YOLO Worker: Exiting loop.")
    except Exception as e:
        print(f"YOLO Worker Initialization or Runtime Error: {e}")
    finally:
        # --- Device Disconnection ---
        # Disconnect the device group
        if device_group:
            print("YOLO Worker: Disconnecting device group.")
            kp.core.disconnect_devices(device_group=device_group)
        print("YOLO Worker: Exiting.")
def stdc_worker(input_queue: multiprocessing.Queue, output_queue: multiprocessing.Queue,
                firmware_path: str, model_path: str, port_id: int):
    """
    STDC processing layer worker. Initializes Kneron device and model using kp.core.
    Reads image data, performs STDC inference, and puts a completion marker
    into the final output queue.

    NOTE(review): like yolo_worker, ``device`` is never assigned (assignment
    commented out below), so the inference call receives ``device=None``;
    ``firmware_path`` is unused because the firmware-load call is commented
    out. Confirm the intended kp API usage.
    """
    device_group = None
    model_stdc_descriptor = None
    device = None # Will get the specific device object from the group
    print("STDC Worker: Starting and initializing Kneron device using kp.core...")
    try:
        # --- Device and Model Initialization (per process) ---
        # STDC worker also needs its own device connection and model
        print(f"STDC Worker: Connecting to device on port {port_id}")
        # Use kp.core.connect_devices
        device_group = kp.core.connect_devices(usb_port_ids=[port_id])
        if not device_group or not device_group.devices:
            raise RuntimeError(f"STDC Worker: Failed to connect to device on port {port_id}")
        # Get the specific device object from the group (assuming single device per worker)
        # device = device_group.devices[0]
        print(f"STDC Worker: Device connected")
        # print("STDC Worker: Loading firmware")
        # Firmware loading seems to be a method on the device object
        # device.load_firmware_from_file(firmware_path)
        print("STDC Worker: Loading STDC model using kp.core")
        # Use kp.core.load_model_from_file with the device_group
        model_stdc_descriptor = kp.core.load_model_from_file(
            device_group=device_group,
            file_path=model_path
        )
        if not model_stdc_descriptor:
            raise RuntimeError(f"STDC Worker: Failed to load STDC model from {model_path}")
        print("STDC Worker: Initialization complete. Waiting for data.")
        # Optional: Check USB speed if needed
        # usb_speed = get_device_usb_speed_by_port_id(port_id) # This utility might need adaptation
        # print(f"STDC Worker: Device USB Speed: {usb_speed}") # This utility might need adaptation
        # Set inference feature if required (e.g., for image format)
        try:
            # Example, check your original code for required features
            # device.set_feature(kp.InferenceFeature.INF_FEATURE_IMAGE_FORMAT, kp.ImageFormat.IMAGE_FORMAT_RGBA)
            pass # Add relevant set_feature calls from your original code if needed
        except Exception as set_feature_e:
            print(f"STDC Worker: Error setting inference features: {set_feature_e}")
            # Decide if this is a critical error or warning
        # ---------------------------------------
        while True:
            # Get image data from the input queue (from YOLO worker)
            data_item = input_queue.get()
            if data_item is None:
                print("STDC Worker: Received termination signal. Putting None to final output queue and exiting.")
                output_queue.put(None) # Signal end of results to the main process
                break # Exit the worker loop
            # Assuming data_item is the image numpy array
            image_data = data_item
            # print("STDC Worker: Received image data for processing.") # Too verbose
            # --- Perform STDC Inference ---
            img_height, img_width, _ = image_data.shape
            inference_input_size = (img_width, img_height) # Kneron expects (width, height)
            # Convert image data format for Kneron inference using the utility
            aligned_image_data = convert_numpy_to_rgba_and_width_align_4(image_data)
            # Send image to device and get raw results
            try:
                # Use kp.inference with the specific device object
                # NOTE(review): device is still None here (see docstring above).
                generic_raw_result = kp.inference.generic_inference_send_image(
                    device=device, # Use the device object from the group
                    data=aligned_image_data,
                    size=inference_input_size
                )
                if not generic_raw_result:
                    print("STDC Worker: Warning - generic_inference_send_image returned None.")
                    continue # Skip post-processing if raw result is none
                # Retrieve raw node outputs using the utility
                # retrieve_inference_node_output utility likely takes the raw_result
                inf_node_output_list = retrieve_inference_node_output(generic_raw_result)
                # STDC Post-processing (extracting segmentation mask)
                # Based on your STDC example, the output is likely in the first node
                if inf_node_output_list:
                    pred_raw = inf_node_output_list[0].ndarray.squeeze() # Shape might be (C, H, W)
                    # Transpose to (H, W, C) if needed for further visualization/processing
                    # pred_transposed = pred_raw.transpose(1, 2, 0) # (H, W, C)
                    # Example: Get the argmax mask (most likely class per pixel)
                    # Assuming pred_raw is shaped (C, H, W) after squeeze()
                    # pred_argmax = np.argmax(pred_raw, axis=0) # Shape (H, W)
                    # For FPS, a simple signal per frame is fine:
                    output_queue.put("STDC_Frame_Done")
                    # If you needed the mask: output_queue.put(pred_argmax.astype(np.uint8))
                    # print("STDC Worker: Finished segmentation inference, put result to final output queue.") # Too verbose
                else:
                    print("STDC Worker: Warning - No output nodes retrieved.")
                    output_queue.put("STDC_Frame_Error") # Signal processing error for this frame
            except Exception as inference_e:
                print(f"STDC Worker Inference Error: {inference_e}")
                # Handle inference errors
                output_queue.put("STDC_Frame_Error") # Signal processing error for this frame
        print("STDC Worker: Exiting loop.")
    except Exception as e:
        print(f"STDC Worker Initialization or Runtime Error: {e}")
    finally:
        # --- Device Disconnection ---
        # Disconnect the device group
        if device_group:
            print("STDC Worker: Disconnecting device group.")
            kp.core.disconnect_devices(device_group=device_group)
        print("STDC Worker: Exiting.")
# --- API Function to Run the Pipeline ---
def run_yolo_stdc_pipeline(image_file_path: str, firmware_path: str,
                           yolo_model_path: str, stdc_model_path: str,
                           loop_count: int = 100, port_id: int = 0,
                           stdc_port_id: int = None):
    """
    Runs the YOLOv5 + STDC pipeline using multiprocessing.Queue.
    Initializes Kneron devices and models within worker processes using kp.core.
    Processes the same image 'loop_count' times and calculates FPS.
    Args:
        image_file_path (str): Path to the input image file (e.g., .bmp).
        firmware_path (str): Path to the Kneron firmware file (.bin).
        yolo_model_path (str): Path to the YOLOv5 model file (.nef).
        stdc_model_path (str): Path to the STDC model file (.nef).
        loop_count (int): Number of times to process the image through the pipeline.
        port_id (int): Kneron device port ID used by the YOLO worker.
        stdc_port_id (int): Optional separate port ID for the STDC worker.
            Defaults to 'port_id' (the original behavior), but note both
            workers then try to claim the same device — pass a second port
            when the models run on separate dongles.
    Returns:
        float: Calculated FPS for processing 'loop_count' frames.
    """
    # multiprocessing.Queue.get(timeout=...) raises queue.Empty; the former
    # 'multiprocessing.queues.Empty' spelling relied on an undocumented
    # submodule re-export.
    from queue import Empty
    # Per-result timeout used for both the get() call and the log message.
    result_timeout = 60
    if stdc_port_id is None:
        stdc_port_id = port_id
    # Read the input image ONCE
    print(f"Main: Reading input image from {image_file_path}")
    image_data = cv2.imread(image_file_path)
    if image_data is None:
        print(f"Error: Could not read image from {image_file_path}")
        return 0.0
    print(f"Main: Image read successfully. Shape: {image_data.shape}")
    # Define queues for inter-process communication
    yolo_input_q = multiprocessing.Queue()  # Main process puts image data -> YOLO worker reads
    stdc_input_q = multiprocessing.Queue()  # YOLO worker puts image data -> STDC worker reads
    stdc_output_q = multiprocessing.Queue()  # STDC worker puts results/markers -> Main process reads
    # Create worker processes
    yolo_process = multiprocessing.Process(
        target=yolo_worker,
        args=(yolo_input_q, stdc_input_q, firmware_path, yolo_model_path, port_id)
    )
    stdc_process = multiprocessing.Process(
        target=stdc_worker,
        args=(stdc_input_q, stdc_output_q, firmware_path, stdc_model_path, stdc_port_id)
    )
    # Start the worker processes
    print("Main: Starting YOLO and STDC worker processes...")
    yolo_process.start()
    stdc_process.start()
    print("Main: Worker processes started.")
    # Wait briefly for processes to initialize Kneron devices and load models.
    # This is a heuristic; a more robust method involves workers signaling readiness.
    initialization_wait_time = 10  # seconds
    print(f"Main: Waiting {initialization_wait_time}s for workers to initialize devices and models.")
    time.sleep(initialization_wait_time)
    print("Main: Finished initialization waiting period.")
    print(f"Main: Putting the same image into YOLO input queue {loop_count} times...")
    start_time = time.time()  # Start timing the loop
    # Put the same image data into the input queue 'loop_count' times
    for i in range(loop_count):
        yolo_input_q.put(image_data)
    print(f"Main: Finished queuing {loop_count} images. Sending termination signal to YOLO worker.")
    # Send termination signal to the first worker's input queue
    yolo_input_q.put(None)
    # Collect results/completion markers from the final output queue
    print("Main: Collecting results from STDC output queue...")
    processed_frame_count = 0
    while processed_frame_count < loop_count:  # Collect exactly 'loop_count' valid results/markers
        # Use a timeout in get() to avoid hanging indefinitely if a worker fails
        try:
            result = stdc_output_q.get(timeout=result_timeout)
            if result is None:
                # Workers send None only after all frames; receiving it here
                # means the pipeline terminated early.
                print("Main: Warning - Received None from STDC output queue before collecting all frames.")
                break
            if result == "STDC_Frame_Done":
                processed_frame_count += 1
            elif result == "STDC_Frame_Error":
                processed_frame_count += 1  # Count it as a processed frame, albeit with error
                print(f"Main: Collected error marker for a frame ({processed_frame_count}).")
            else:
                print(f"Main: Warning - Received unexpected item in STDC output queue: {result}")
        except Empty:
            print(f"Main: Timeout ({result_timeout}s) while waiting for results from STDC output queue. {processed_frame_count}/{loop_count} frames processed.")
            break  # Exit collection loop on timeout
        except Exception as e:
            print(f"Main: Error collecting result: {e}")
            break  # Exit collection loop on other errors
    end_time = time.time()  # Stop timing
    print(f"Main: Collected {processed_frame_count} results/markers.")
    # Now wait for the final None signal after collecting all expected results.
    # This ensures queues are flushed and workers are terminating cleanly.
    print("Main: Waiting for final termination signal from STDC output queue...")
    try:
        final_signal = stdc_output_q.get(timeout=10)  # Short timeout for the final None
        if final_signal is None:
            print("Main: Received final termination signal from STDC output queue.")
        else:
            print(f"Main: Warning - Expected final None, but received: {final_signal}")
    except Empty:
        print("Main: Timeout while waiting for final None from STDC output queue.")
    except Exception as e:
        print(f"Main: Error getting final signal: {e}")
    # Wait for the worker processes to fully complete
    print("Main: Joining worker processes...")
    yolo_process.join(timeout=30)  # Add timeout for joining
    if yolo_process.is_alive():
        print("Main: YOLO process did not terminate gracefully within timeout. Terminating.")
        yolo_process.terminate()
    print("Main: YOLO process joined.")
    stdc_process.join(timeout=30)  # Add timeout for joining
    if stdc_process.is_alive():
        print("Main: STDC process did not terminate gracefully within timeout. Terminating.")
        stdc_process.terminate()
    print("Main: STDC process joined.")
    print("Main: All processes joined.")
    # Calculate FPS
    duration = end_time - start_time
    if duration > 0 and processed_frame_count > 0:
        fps = processed_frame_count / duration
        print(f"\n--- Pipeline Performance ---")
        print(f"Processed {processed_frame_count} frames in {duration:.4f} seconds.")
        print(f"Calculated FPS: {fps:.2f}")
    else:
        fps = 0.0
        print("Could not calculate FPS (duration is zero or no frames processed).")
    return fps
# --- Example Usage ---
if __name__ == '__main__':
    # Required for multiprocessing on Windows
    multiprocessing.freeze_support()
    # --- CONFIGURE YOUR FILE PATHS HERE ---
    # !! IMPORTANT !! Replace these placeholder paths with your actual file locations.
    ACTUAL_FIRMWARE_PATH = "path/to/your/KL720.bin"
    ACTUAL_YOLO_MODEL_PATH = "path/to/your/yolov5_model.nef"
    ACTUAL_STDC_MODEL_PATH = "path/to/your/stdc_model.nef"
    ACTUAL_IMAGE_FILE_PATH = "path/to/your/input_image.bmp"
    # A path that still contains the placeholder prefix has not been configured.
    _placeholder_prefix = "path/to/your/"
    paths_configured = all(
        _placeholder_prefix not in configured_path
        for configured_path in (ACTUAL_FIRMWARE_PATH,
                                ACTUAL_YOLO_MODEL_PATH,
                                ACTUAL_STDC_MODEL_PATH,
                                ACTUAL_IMAGE_FILE_PATH)
    )
    if not paths_configured:
        print("\n===================================================================")
        print("!!! WARNING: Please update the file paths in the script before running. !!!")
        print("===================================================================")
    else:
        print("\n--- Running YOLOv5 + STDC Pipeline ---")
        try:
            final_fps = run_yolo_stdc_pipeline(
                image_file_path=ACTUAL_IMAGE_FILE_PATH,
                firmware_path=ACTUAL_FIRMWARE_PATH,
                yolo_model_path=ACTUAL_YOLO_MODEL_PATH,
                stdc_model_path=ACTUAL_STDC_MODEL_PATH,
                loop_count=100,
                port_id=0  # Change if your device is on a different port
            )
            print(f"\nAPI Function Call Complete. Final FPS: {final_fps:.2f}")
        except Exception as main_e:
            print(f"\nAn error occurred during the main pipeline execution: {main_e}")

0
tests/__init__.py Normal file
View File

78
uv.lock generated Normal file
View File

@ -0,0 +1,78 @@
version = 1
revision = 2
requires-python = ">=3.12"
resolution-markers = [
"sys_platform == 'darwin'",
"platform_machine == 'aarch64' and sys_platform == 'linux'",
"(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')",
]
[[package]]
name = "cluster4npu"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "numpy" },
{ name = "opencv-python" },
]
[package.metadata]
requires-dist = [
{ name = "numpy", specifier = ">=2.2.6" },
{ name = "opencv-python", specifier = ">=4.11.0.86" },
]
[[package]]
name = "numpy"
version = "2.2.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/76/21/7d2a95e4bba9dc13d043ee156a356c0a8f0c6309dff6b21b4d71a073b8a8/numpy-2.2.6.tar.gz", hash = "sha256:e29554e2bef54a90aa5cc07da6ce955accb83f21ab5de01a62c8478897b264fd", size = 20276440, upload-time = "2025-05-17T22:38:04.611Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/5d/c00588b6cf18e1da539b45d3598d3557084990dcc4331960c15ee776ee41/numpy-2.2.6-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:41c5a21f4a04fa86436124d388f6ed60a9343a6f767fced1a8a71c3fbca038ff", size = 20875348, upload-time = "2025-05-17T21:34:39.648Z" },
{ url = "https://files.pythonhosted.org/packages/66/ee/560deadcdde6c2f90200450d5938f63a34b37e27ebff162810f716f6a230/numpy-2.2.6-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:de749064336d37e340f640b05f24e9e3dd678c57318c7289d222a8a2f543e90c", size = 14119362, upload-time = "2025-05-17T21:35:01.241Z" },
{ url = "https://files.pythonhosted.org/packages/3c/65/4baa99f1c53b30adf0acd9a5519078871ddde8d2339dc5a7fde80d9d87da/numpy-2.2.6-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:894b3a42502226a1cac872f840030665f33326fc3dac8e57c607905773cdcde3", size = 5084103, upload-time = "2025-05-17T21:35:10.622Z" },
{ url = "https://files.pythonhosted.org/packages/cc/89/e5a34c071a0570cc40c9a54eb472d113eea6d002e9ae12bb3a8407fb912e/numpy-2.2.6-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:71594f7c51a18e728451bb50cc60a3ce4e6538822731b2933209a1f3614e9282", size = 6625382, upload-time = "2025-05-17T21:35:21.414Z" },
{ url = "https://files.pythonhosted.org/packages/f8/35/8c80729f1ff76b3921d5c9487c7ac3de9b2a103b1cd05e905b3090513510/numpy-2.2.6-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f2618db89be1b4e05f7a1a847a9c1c0abd63e63a1607d892dd54668dd92faf87", size = 14018462, upload-time = "2025-05-17T21:35:42.174Z" },
{ url = "https://files.pythonhosted.org/packages/8c/3d/1e1db36cfd41f895d266b103df00ca5b3cbe965184df824dec5c08c6b803/numpy-2.2.6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fd83c01228a688733f1ded5201c678f0c53ecc1006ffbc404db9f7a899ac6249", size = 16527618, upload-time = "2025-05-17T21:36:06.711Z" },
{ url = "https://files.pythonhosted.org/packages/61/c6/03ed30992602c85aa3cd95b9070a514f8b3c33e31124694438d88809ae36/numpy-2.2.6-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:37c0ca431f82cd5fa716eca9506aefcabc247fb27ba69c5062a6d3ade8cf8f49", size = 15505511, upload-time = "2025-05-17T21:36:29.965Z" },
{ url = "https://files.pythonhosted.org/packages/b7/25/5761d832a81df431e260719ec45de696414266613c9ee268394dd5ad8236/numpy-2.2.6-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:fe27749d33bb772c80dcd84ae7e8df2adc920ae8297400dabec45f0dedb3f6de", size = 18313783, upload-time = "2025-05-17T21:36:56.883Z" },
{ url = "https://files.pythonhosted.org/packages/57/0a/72d5a3527c5ebffcd47bde9162c39fae1f90138c961e5296491ce778e682/numpy-2.2.6-cp312-cp312-win32.whl", hash = "sha256:4eeaae00d789f66c7a25ac5f34b71a7035bb474e679f410e5e1a94deb24cf2d4", size = 6246506, upload-time = "2025-05-17T21:37:07.368Z" },
{ url = "https://files.pythonhosted.org/packages/36/fa/8c9210162ca1b88529ab76b41ba02d433fd54fecaf6feb70ef9f124683f1/numpy-2.2.6-cp312-cp312-win_amd64.whl", hash = "sha256:c1f9540be57940698ed329904db803cf7a402f3fc200bfe599334c9bd84a40b2", size = 12614190, upload-time = "2025-05-17T21:37:26.213Z" },
{ url = "https://files.pythonhosted.org/packages/f9/5c/6657823f4f594f72b5471f1db1ab12e26e890bb2e41897522d134d2a3e81/numpy-2.2.6-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:0811bb762109d9708cca4d0b13c4f67146e3c3b7cf8d34018c722adb2d957c84", size = 20867828, upload-time = "2025-05-17T21:37:56.699Z" },
{ url = "https://files.pythonhosted.org/packages/dc/9e/14520dc3dadf3c803473bd07e9b2bd1b69bc583cb2497b47000fed2fa92f/numpy-2.2.6-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:287cc3162b6f01463ccd86be154f284d0893d2b3ed7292439ea97eafa8170e0b", size = 14143006, upload-time = "2025-05-17T21:38:18.291Z" },
{ url = "https://files.pythonhosted.org/packages/4f/06/7e96c57d90bebdce9918412087fc22ca9851cceaf5567a45c1f404480e9e/numpy-2.2.6-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:f1372f041402e37e5e633e586f62aa53de2eac8d98cbfb822806ce4bbefcb74d", size = 5076765, upload-time = "2025-05-17T21:38:27.319Z" },
{ url = "https://files.pythonhosted.org/packages/73/ed/63d920c23b4289fdac96ddbdd6132e9427790977d5457cd132f18e76eae0/numpy-2.2.6-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:55a4d33fa519660d69614a9fad433be87e5252f4b03850642f88993f7b2ca566", size = 6617736, upload-time = "2025-05-17T21:38:38.141Z" },
{ url = "https://files.pythonhosted.org/packages/85/c5/e19c8f99d83fd377ec8c7e0cf627a8049746da54afc24ef0a0cb73d5dfb5/numpy-2.2.6-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f92729c95468a2f4f15e9bb94c432a9229d0d50de67304399627a943201baa2f", size = 14010719, upload-time = "2025-05-17T21:38:58.433Z" },
{ url = "https://files.pythonhosted.org/packages/19/49/4df9123aafa7b539317bf6d342cb6d227e49f7a35b99c287a6109b13dd93/numpy-2.2.6-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1bc23a79bfabc5d056d106f9befb8d50c31ced2fbc70eedb8155aec74a45798f", size = 16526072, upload-time = "2025-05-17T21:39:22.638Z" },
{ url = "https://files.pythonhosted.org/packages/b2/6c/04b5f47f4f32f7c2b0e7260442a8cbcf8168b0e1a41ff1495da42f42a14f/numpy-2.2.6-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:e3143e4451880bed956e706a3220b4e5cf6172ef05fcc397f6f36a550b1dd868", size = 15503213, upload-time = "2025-05-17T21:39:45.865Z" },
{ url = "https://files.pythonhosted.org/packages/17/0a/5cd92e352c1307640d5b6fec1b2ffb06cd0dabe7d7b8227f97933d378422/numpy-2.2.6-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b4f13750ce79751586ae2eb824ba7e1e8dba64784086c98cdbbcc6a42112ce0d", size = 18316632, upload-time = "2025-05-17T21:40:13.331Z" },
{ url = "https://files.pythonhosted.org/packages/f0/3b/5cba2b1d88760ef86596ad0f3d484b1cbff7c115ae2429678465057c5155/numpy-2.2.6-cp313-cp313-win32.whl", hash = "sha256:5beb72339d9d4fa36522fc63802f469b13cdbe4fdab4a288f0c441b74272ebfd", size = 6244532, upload-time = "2025-05-17T21:43:46.099Z" },
{ url = "https://files.pythonhosted.org/packages/cb/3b/d58c12eafcb298d4e6d0d40216866ab15f59e55d148a5658bb3132311fcf/numpy-2.2.6-cp313-cp313-win_amd64.whl", hash = "sha256:b0544343a702fa80c95ad5d3d608ea3599dd54d4632df855e4c8d24eb6ecfa1c", size = 12610885, upload-time = "2025-05-17T21:44:05.145Z" },
{ url = "https://files.pythonhosted.org/packages/6b/9e/4bf918b818e516322db999ac25d00c75788ddfd2d2ade4fa66f1f38097e1/numpy-2.2.6-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:0bca768cd85ae743b2affdc762d617eddf3bcf8724435498a1e80132d04879e6", size = 20963467, upload-time = "2025-05-17T21:40:44Z" },
{ url = "https://files.pythonhosted.org/packages/61/66/d2de6b291507517ff2e438e13ff7b1e2cdbdb7cb40b3ed475377aece69f9/numpy-2.2.6-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:fc0c5673685c508a142ca65209b4e79ed6740a4ed6b2267dbba90f34b0b3cfda", size = 14225144, upload-time = "2025-05-17T21:41:05.695Z" },
{ url = "https://files.pythonhosted.org/packages/e4/25/480387655407ead912e28ba3a820bc69af9adf13bcbe40b299d454ec011f/numpy-2.2.6-cp313-cp313t-macosx_14_0_arm64.whl", hash = "sha256:5bd4fc3ac8926b3819797a7c0e2631eb889b4118a9898c84f585a54d475b7e40", size = 5200217, upload-time = "2025-05-17T21:41:15.903Z" },
{ url = "https://files.pythonhosted.org/packages/aa/4a/6e313b5108f53dcbf3aca0c0f3e9c92f4c10ce57a0a721851f9785872895/numpy-2.2.6-cp313-cp313t-macosx_14_0_x86_64.whl", hash = "sha256:fee4236c876c4e8369388054d02d0e9bb84821feb1a64dd59e137e6511a551f8", size = 6712014, upload-time = "2025-05-17T21:41:27.321Z" },
{ url = "https://files.pythonhosted.org/packages/b7/30/172c2d5c4be71fdf476e9de553443cf8e25feddbe185e0bd88b096915bcc/numpy-2.2.6-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e1dda9c7e08dc141e0247a5b8f49cf05984955246a327d4c48bda16821947b2f", size = 14077935, upload-time = "2025-05-17T21:41:49.738Z" },
{ url = "https://files.pythonhosted.org/packages/12/fb/9e743f8d4e4d3c710902cf87af3512082ae3d43b945d5d16563f26ec251d/numpy-2.2.6-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f447e6acb680fd307f40d3da4852208af94afdfab89cf850986c3ca00562f4fa", size = 16600122, upload-time = "2025-05-17T21:42:14.046Z" },
{ url = "https://files.pythonhosted.org/packages/12/75/ee20da0e58d3a66f204f38916757e01e33a9737d0b22373b3eb5a27358f9/numpy-2.2.6-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:389d771b1623ec92636b0786bc4ae56abafad4a4c513d36a55dce14bd9ce8571", size = 15586143, upload-time = "2025-05-17T21:42:37.464Z" },
{ url = "https://files.pythonhosted.org/packages/76/95/bef5b37f29fc5e739947e9ce5179ad402875633308504a52d188302319c8/numpy-2.2.6-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:8e9ace4a37db23421249ed236fdcdd457d671e25146786dfc96835cd951aa7c1", size = 18385260, upload-time = "2025-05-17T21:43:05.189Z" },
{ url = "https://files.pythonhosted.org/packages/09/04/f2f83279d287407cf36a7a8053a5abe7be3622a4363337338f2585e4afda/numpy-2.2.6-cp313-cp313t-win32.whl", hash = "sha256:038613e9fb8c72b0a41f025a7e4c3f0b7a1b5d768ece4796b674c8f3fe13efff", size = 6377225, upload-time = "2025-05-17T21:43:16.254Z" },
{ url = "https://files.pythonhosted.org/packages/67/0e/35082d13c09c02c011cf21570543d202ad929d961c02a147493cb0c2bdf5/numpy-2.2.6-cp313-cp313t-win_amd64.whl", hash = "sha256:6031dd6dfecc0cf9f668681a37648373bddd6421fff6c66ec1624eed0180ee06", size = 12771374, upload-time = "2025-05-17T21:43:35.479Z" },
]
[[package]]
name = "opencv-python"
version = "4.11.0.86"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/17/06/68c27a523103dad5837dc5b87e71285280c4f098c60e4fe8a8db6486ab09/opencv-python-4.11.0.86.tar.gz", hash = "sha256:03d60ccae62304860d232272e4a4fda93c39d595780cb40b161b310244b736a4", size = 95171956, upload-time = "2025-01-16T13:52:24.737Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/05/4d/53b30a2a3ac1f75f65a59eb29cf2ee7207ce64867db47036ad61743d5a23/opencv_python-4.11.0.86-cp37-abi3-macosx_13_0_arm64.whl", hash = "sha256:432f67c223f1dc2824f5e73cdfcd9db0efc8710647d4e813012195dc9122a52a", size = 37326322, upload-time = "2025-01-16T13:52:25.887Z" },
{ url = "https://files.pythonhosted.org/packages/3b/84/0a67490741867eacdfa37bc18df96e08a9d579583b419010d7f3da8ff503/opencv_python-4.11.0.86-cp37-abi3-macosx_13_0_x86_64.whl", hash = "sha256:9d05ef13d23fe97f575153558653e2d6e87103995d54e6a35db3f282fe1f9c66", size = 56723197, upload-time = "2025-01-16T13:55:21.222Z" },
{ url = "https://files.pythonhosted.org/packages/f3/bd/29c126788da65c1fb2b5fb621b7fed0ed5f9122aa22a0868c5e2c15c6d23/opencv_python-4.11.0.86-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1b92ae2c8852208817e6776ba1ea0d6b1e0a1b5431e971a2a0ddd2a8cc398202", size = 42230439, upload-time = "2025-01-16T13:51:35.822Z" },
{ url = "https://files.pythonhosted.org/packages/2c/8b/90eb44a40476fa0e71e05a0283947cfd74a5d36121a11d926ad6f3193cc4/opencv_python-4.11.0.86-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6b02611523803495003bd87362db3e1d2a0454a6a63025dc6658a9830570aa0d", size = 62986597, upload-time = "2025-01-16T13:52:08.836Z" },
{ url = "https://files.pythonhosted.org/packages/fb/d7/1d5941a9dde095468b288d989ff6539dd69cd429dbf1b9e839013d21b6f0/opencv_python-4.11.0.86-cp37-abi3-win32.whl", hash = "sha256:810549cb2a4aedaa84ad9a1c92fbfdfc14090e2749cedf2c1589ad8359aa169b", size = 29384337, upload-time = "2025-01-16T13:52:13.549Z" },
{ url = "https://files.pythonhosted.org/packages/a4/7d/f1c30a92854540bf789e9cd5dde7ef49bbe63f855b85a2e6b3db8135c591/opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl", hash = "sha256:085ad9b77c18853ea66283e98affefe2de8cc4c1f43eda4c100cf9b2721142ec", size = 39488044, upload-time = "2025-01-16T13:52:21.928Z" },
]