import kp from collections import defaultdict from typing import Union import os import sys import argparse import time import threading import queue import numpy as np import cv2 # PWD = os.path.dirname(os.path.abspath(__file__)) # sys.path.insert(1, os.path.join(PWD, '..')) IMAGE_FILE_PATH = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp" LOOP_TIME = 100 def _image_send_function(_device_group: kp.DeviceGroup, _loop_time: int, _generic_inference_input_descriptor: kp.GenericImageInferenceDescriptor, _image: Union[bytes, np.ndarray], _image_format: kp.ImageFormat) -> None: for _loop in range(_loop_time): try: _generic_inference_input_descriptor.inference_number = _loop _generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage( image=_image, image_format=_image_format, resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, padding_mode=kp.PaddingMode.KP_PADDING_CORNER, normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON )] kp.inference.generic_image_inference_send(device_group=device_groups[1], generic_inference_input_descriptor=_generic_inference_input_descriptor) except kp.ApiKPException as exception: print(' - Error: inference failed, error = {}'.format(exception)) exit(0) def _result_receive_function(_device_group: kp.DeviceGroup, _loop_time: int, _result_queue: queue.Queue) -> None: _generic_raw_result = None for _loop in range(_loop_time): try: _generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_groups[1]) if _generic_raw_result.header.inference_number != _loop: print(' - Error: incorrect inference_number {} at frame {}'.format( _generic_raw_result.header.inference_number, _loop)) print('.', end='', flush=True) except kp.ApiKPException as exception: print(' - Error: inference failed, error = {}'.format(exception)) exit(0) _result_queue.put(_generic_raw_result) model_path = ["C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\models\\KL520\\yolov5-noupsample_w640h640_kn-model-zoo\\kl520_20005_yolov5-noupsample_w640h640.nef", r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef"] SCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_scpu.bin" NCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_ncpu.bin" SCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_scpu.bin" NCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_ncpu.bin" device_list = kp.core.scan_devices() grouped_devices = defaultdict(list) for device in device_list.device_descriptor_list: grouped_devices[device.product_id].append(device.usb_port_id) print(f"Found device groups: {dict(grouped_devices)}") device_groups = [] for product_id, usb_port_id in grouped_devices.items(): try: group = kp.core.connect_devices(usb_port_id) device_groups.append(group) print(f"Successfully connected to group for product ID {product_id} with ports{usb_port_id}") except kp.ApiKPException as e: print(f"Failed to connect to group for product ID {product_id}: {e}") print(device_groups) print('[Set Device Timeout]') kp.core.set_timeout(device_group=device_groups[0], milliseconds=5000) kp.core.set_timeout(device_group=device_groups[1], milliseconds=5000) print(' - Success') try: print('[Upload Firmware]') kp.core.load_firmware_from_file(device_group=device_groups[0], scpu_fw_path=SCPU_FW_PATH_520, ncpu_fw_path=NCPU_FW_PATH_520) kp.core.load_firmware_from_file(device_group=device_groups[1], scpu_fw_path=SCPU_FW_PATH_720, ncpu_fw_path=NCPU_FW_PATH_720) print(' - Success') except kp.ApiKPException as exception: print('Error: upload firmware failed, error = \'{}\''.format(str(exception))) exit(0) print('[Upload Model]') model_nef_descriptors = [] # for group in device_groups: model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[0], file_path=model_path[0]) model_nef_descriptors.append(model_nef_descriptor) model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[1], file_path=model_path[1]) model_nef_descriptors.append(model_nef_descriptor) print(' - Success') """ prepare the image """ print('[Read Image]') img = cv2.imread(filename=IMAGE_FILE_PATH) img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565) print(' - Success') """ prepare generic image inference input descriptor """ print(model_nef_descriptors) generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( model_id=model_nef_descriptors[1].models[0].id, ) """ starting inference work """ print('[Starting Inference Work]') print(' - Starting inference loop {} times'.format(LOOP_TIME)) print(' - ', end='') result_queue = queue.Queue() send_thread = threading.Thread(target=_image_send_function, args=(device_groups[1], LOOP_TIME, generic_inference_input_descriptor, img_bgr565, kp.ImageFormat.KP_IMAGE_FORMAT_RGB565)) receive_thread = threading.Thread(target=_result_receive_function, args=(device_groups[1], LOOP_TIME, result_queue)) start_inference_time = time.time() send_thread.start() receive_thread.start() try: while send_thread.is_alive(): send_thread.join(1) while receive_thread.is_alive(): receive_thread.join(1) except (KeyboardInterrupt, SystemExit): print('\n - Received keyboard interrupt, quitting threads.') exit(0) end_inference_time = time.time() time_spent = end_inference_time - start_inference_time try: generic_raw_result = result_queue.get(timeout=3) except Exception as exception: print('Error: Result queue is empty !') exit(0) print() print('[Result]') print(" - Total inference {} images".format(LOOP_TIME)) print(" - Time spent: {:.2f} secs, FPS = {:.1f}".format(time_spent, LOOP_TIME / time_spent)) """ retrieve inference node output """ print('[Retrieve Inference Node Output ]') inf_node_output_list = [] for node_idx in range(generic_raw_result.header.num_output_node): inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(node_idx=node_idx, generic_raw_result=generic_raw_result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW) inf_node_output_list.append(inference_float_node_output) print(' - Success') print('[Result]') print(inf_node_output_list)