cluster4npu/mutliseries.py
Mason fc47e5d868 Add multi-series dongle manager with load balancing and order preservation
- Implement MultiSeriesDongleManager for parallel inference across different dongle series
- Add GOPS-based load balancing (KL720: 1425 GOPS, KL520: 345 GOPS ratio ~4:1)
- Ensure sequential result output despite heterogeneous processing speeds
- Include comprehensive threading architecture with dispatcher, per-dongle workers, and result ordering
- Add performance statistics and monitoring capabilities
- Update project configuration and documentation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-04 23:34:16 +08:00
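
The header above describes GOPS-based load balancing between KL720 (1425 GOPS) and KL520 (345 GOPS) devices together with strictly sequential result output. The sketch below is one minimal way to wire such a dispatcher and reordering stage: a weighted round-robin schedule derived from the GOPS ratio (~4:1) plus a heap keyed on frame index. The queue layout, function names, and frame_id convention are illustrative assumptions; they are not taken from this file or from the kneron-plus API.

# --- Illustrative sketch (not part of mutliseries.py) -----------------------
# Frames are dispatched to per-series work queues in proportion to each
# series' compute budget, and a reordering stage releases results strictly by
# frame index even when the faster dongle finishes later frames first.
import heapq
import queue

GOPS = {'KL720': 1425, 'KL520': 345}  # figures quoted in the commit message (~4:1)


def build_schedule(weights: dict) -> list:
    """Expand the GOPS weights into a repeating dispatch pattern (here 4x KL720, 1x KL520)."""
    base = min(weights.values())
    pattern = []
    for series, gops in weights.items():
        pattern += [series] * max(1, round(gops / base))
    return pattern


def dispatch(frames, worker_queues: dict, weights: dict = GOPS) -> None:
    """Put (frame_id, frame) pairs onto per-series queues following the weighted schedule."""
    schedule = build_schedule(weights)
    for frame_id, frame in enumerate(frames):
        series = schedule[frame_id % len(schedule)]
        worker_queues[series].put((frame_id, frame))


def ordered_results(result_queue: queue.Queue, total: int):
    """Yield results in frame order, buffering out-of-order completions in a heap."""
    heap, next_expected = [], 0
    while next_expected < total:
        frame_id, result = result_queue.get()  # workers push (frame_id, result)
        heapq.heappush(heap, (frame_id, result))
        while heap and heap[0][0] == next_expected:
            yield heapq.heappop(heap)[1]
            next_expected += 1
# -----------------------------------------------------------------------------

With this split the KL720 queue receives roughly four of every five frames, and results wait in the heap until the next expected frame index has arrived, so downstream consumers see them in submission order.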


import kp
from collections import defaultdict
from typing import Union
import os
import sys
import argparse
import time
import threading
import queue
import numpy as np
import cv2
# PWD = os.path.dirname(os.path.abspath(__file__))
# sys.path.insert(1, os.path.join(PWD, '..'))
IMAGE_FILE_PATH = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp"
LOOP_TIME = 100


def _image_send_function(_device_group: kp.DeviceGroup,
                         _loop_time: int,
                         _generic_inference_input_descriptor: kp.GenericImageInferenceDescriptor,
                         _image: Union[bytes, np.ndarray],
                         _image_format: kp.ImageFormat) -> None:
    """Send `_loop_time` inference requests for the same image to `_device_group`."""
    for _loop in range(_loop_time):
        try:
            _generic_inference_input_descriptor.inference_number = _loop
            _generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
                image=_image,
                image_format=_image_format,
                resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
            )]
            # Send to the device group passed in rather than the global device_groups[1].
            kp.inference.generic_image_inference_send(
                device_group=_device_group,
                generic_inference_input_descriptor=_generic_inference_input_descriptor)
        except kp.ApiKPException as exception:
            print(' - Error: inference failed, error = {}'.format(exception))
            exit(0)


def _result_receive_function(_device_group: kp.DeviceGroup,
                             _loop_time: int,
                             _result_queue: queue.Queue) -> None:
    """Receive `_loop_time` results from `_device_group` and check they arrive in order."""
    _generic_raw_result = None
    for _loop in range(_loop_time):
        try:
            # Receive from the device group passed in rather than the global device_groups[1].
            _generic_raw_result = kp.inference.generic_image_inference_receive(device_group=_device_group)
            if _generic_raw_result.header.inference_number != _loop:
                print(' - Error: incorrect inference_number {} at frame {}'.format(
                    _generic_raw_result.header.inference_number, _loop))
            print('.', end='', flush=True)
        except kp.ApiKPException as exception:
            print(' - Error: inference failed, error = {}'.format(exception))
            exit(0)
    # Only the last raw result is queued; it is used for node-output retrieval below.
    _result_queue.put(_generic_raw_result)


model_path = [
    r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL520\yolov5-noupsample_w640h640_kn-model-zoo\kl520_20005_yolov5-noupsample_w640h640.nef",
    r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef",
]
SCPU_FW_PATH_520 = r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL520\fw_scpu.bin"
NCPU_FW_PATH_520 = r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL520\fw_ncpu.bin"
SCPU_FW_PATH_720 = r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL720\fw_scpu.bin"
NCPU_FW_PATH_720 = r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL720\fw_ncpu.bin"
device_list = kp.core.scan_devices()
grouped_devices = defaultdict(list)
for device in device_list.device_descriptor_list:
grouped_devices[device.product_id].append(device.usb_port_id)
print(f"Found device groups: {dict(grouped_devices)}")

device_groups = []
for product_id, usb_port_ids in grouped_devices.items():
    try:
        group = kp.core.connect_devices(usb_port_ids)
        device_groups.append(group)
        print(f"Successfully connected to group for product ID {product_id} with ports {usb_port_ids}")
    except kp.ApiKPException as e:
        print(f"Failed to connect to group for product ID {product_id}: {e}")
print(device_groups)

# The rest of the script assumes exactly two connected groups, with
# device_groups[0] holding the KL520 dongles and device_groups[1] the KL720
# dongles, matching the firmware and model files loaded below.
print('[Set Device Timeout]')
kp.core.set_timeout(device_group=device_groups[0], milliseconds=5000)
kp.core.set_timeout(device_group=device_groups[1], milliseconds=5000)
print(' - Success')
try:
print('[Upload Firmware]')
kp.core.load_firmware_from_file(device_group=device_groups[0],
scpu_fw_path=SCPU_FW_PATH_520,
ncpu_fw_path=NCPU_FW_PATH_520)
kp.core.load_firmware_from_file(device_group=device_groups[1],
scpu_fw_path=SCPU_FW_PATH_720,
ncpu_fw_path=NCPU_FW_PATH_720)
print(' - Success')
except kp.ApiKPException as exception:
print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
exit(0)
print('[Upload Model]')
model_nef_descriptors = []
# for group in device_groups:
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[0], file_path=model_path[0])
model_nef_descriptors.append(model_nef_descriptor)
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[1], file_path=model_path[1])
model_nef_descriptors.append(model_nef_descriptor)
print(' - Success')
"""
prepare the image
"""
print('[Read Image]')
img = cv2.imread(filename=IMAGE_FILE_PATH)
img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
print(' - Success')
"""
prepare generic image inference input descriptor
"""
print(model_nef_descriptors)
# Inference runs on the KL720 group, so use the KL720 model's ID (descriptor index 1).
generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
    model_id=model_nef_descriptors[1].models[0].id,
)
"""
starting inference work
"""
print('[Starting Inference Work]')
print(' - Starting inference loop {} times'.format(LOOP_TIME))
print(' - ', end='')
result_queue = queue.Queue()
send_thread = threading.Thread(target=_image_send_function, args=(device_groups[1],
LOOP_TIME,
generic_inference_input_descriptor,
img_bgr565,
kp.ImageFormat.KP_IMAGE_FORMAT_RGB565))
receive_thread = threading.Thread(target=_result_receive_function, args=(device_groups[1],
LOOP_TIME,
result_queue))
start_inference_time = time.time()
send_thread.start()
receive_thread.start()
try:
while send_thread.is_alive():
send_thread.join(1)
while receive_thread.is_alive():
receive_thread.join(1)
except (KeyboardInterrupt, SystemExit):
print('\n - Received keyboard interrupt, quitting threads.')
exit(0)
end_inference_time = time.time()
time_spent = end_inference_time - start_inference_time
try:
    generic_raw_result = result_queue.get(timeout=3)
except queue.Empty:
    print('Error: Result queue is empty!')
    exit(0)
print()
print('[Result]')
print(" - Total inference {} images".format(LOOP_TIME))
print(" - Time spent: {:.2f} secs, FPS = {:.1f}".format(time_spent, LOOP_TIME / time_spent))
"""
retrieve inference node output
"""
print('[Retrieve Inference Node Output]')
inf_node_output_list = []
for node_idx in range(generic_raw_result.header.num_output_node):
inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(node_idx=node_idx,
generic_raw_result=generic_raw_result,
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
inf_node_output_list.append(inference_float_node_output)
print(' - Success')
print('[Result]')
print(inf_node_output_list)