From fc47e5d868ef151328b2f2873e186085aaa1ad1d Mon Sep 17 00:00:00 2001 From: Mason Date: Mon, 4 Aug 2025 23:34:16 +0800 Subject: [PATCH] Add multi-series dongle manager with load balancing and order preservation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement MultiSeriesDongleManager for parallel inference across different dongle series - Add GOPS-based load balancing (KL720: 1425 GOPS, KL520: 345 GOPS ratio ~4:1) - Ensure sequential result output despite heterogeneous processing speeds - Include comprehensive threading architecture with dispatcher, per-dongle workers, and result ordering - Add performance statistics and monitoring capabilities - Update project configuration and documentation 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- CLAUDE.md | 1 + multi_series_dongle_manager.py | 547 +++++++++++++++++++++++++++++++++ mutliseries.py | 193 ++++++++++++ pyproject.toml | 2 +- uv.lock | 8 + 5 files changed, 750 insertions(+), 1 deletion(-) create mode 100644 multi_series_dongle_manager.py create mode 100644 mutliseries.py create mode 100644 uv.lock diff --git a/CLAUDE.md b/CLAUDE.md index 2502efd..82ad345 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -12,6 +12,7 @@ You are a senior software engineer who follows Mason Huang's Test-Driven Develop - Refactor only after tests are passing - Follow Beck's "Tidy First" approach by separating structural changes from behavioral changes - Maintain high code quality throughout development +- Don't use emoji in the work # TDD METHODOLOGY GUIDANCE diff --git a/multi_series_dongle_manager.py b/multi_series_dongle_manager.py new file mode 100644 index 0000000..ce5fdff --- /dev/null +++ b/multi_series_dongle_manager.py @@ -0,0 +1,547 @@ +import kp +from collections import defaultdict, deque +from typing import Union, Dict, List, Tuple, Optional, Any +import os +import sys +import time +import threading +import queue +import numpy as np +import 
cv2 +from dataclasses import dataclass +from enum import Enum + +@dataclass +class InferenceTask: + sequence_id: int + image_data: np.ndarray + image_format: kp.ImageFormat + timestamp: float + +@dataclass +class InferenceResult: + sequence_id: int + result: Any + dongle_series: str + timestamp: float + +class DongleSeriesSpec: + KL520_GOPS = 345 + KL720_GOPS = 1425 + + SERIES_SPECS = { + 0x100: {"name": "KL520", "gops": KL520_GOPS}, + 0x720: {"name": "KL720", "gops": KL720_GOPS}, + 0x630: {"name": "KL630", "gops": 400}, + 0x730: {"name": "KL730", "gops": 1600}, + 0x540: {"name": "KL540", "gops": 800} + } + +class MultiSeriesDongleManager: + def __init__(self, max_queue_size: int = 100, result_buffer_size: int = 1000): + self.dongle_groups = {} # product_id -> device_group + self.dongle_specs = {} # product_id -> spec info + self.model_descriptors = {} # product_id -> model descriptor + self.inference_descriptors = {} # product_id -> inference descriptor + + # Load balancing + self.gops_weights = {} # product_id -> normalized weight + self.current_loads = {} # product_id -> current queue size + + # Threading and queues + self.input_queue = queue.Queue(maxsize=max_queue_size) + self.result_queues = {} # product_id -> queue + self.ordered_output_queue = queue.Queue() + + # Sequence management + self.sequence_counter = 0 + self.sequence_lock = threading.Lock() + self.pending_results = {} # sequence_id -> InferenceResult + self.next_output_sequence = 0 + self.result_buffer_size = result_buffer_size + + # Threading + self.stop_event = threading.Event() + self.dispatcher_thread = None + self.send_threads = {} # product_id -> thread + self.receive_threads = {} # product_id -> thread + self.result_ordering_thread = None + + # Statistics + self.stats = { + 'total_dispatched': 0, + 'total_completed': 0, + 'dongle_stats': {} # product_id -> {'sent': count, 'received': count} + } + + def scan_and_initialize_devices(self, firmware_paths: Dict[str, Dict[str, str]], + 
model_paths: Dict[str, str]) -> bool: + """ + Scan, connect, and initialize all available devices + + Args: + firmware_paths: {"KL520": {"scpu": path, "ncpu": path}, "KL720": {...}} + model_paths: {"KL520": model_path, "KL720": model_path} + """ + device_list = kp.core.scan_devices() + if not device_list or device_list.device_descriptor_number == 0: + print("No devices found") + return False + + # Group devices by product_id + grouped_devices = defaultdict(list) + for device in device_list.device_descriptor_list: + grouped_devices[device.product_id].append(device.usb_port_id) + + print(f"Found device groups: {dict(grouped_devices)}") + + # Connect and initialize each group + total_gops = 0 + for product_id, port_ids in grouped_devices.items(): + if product_id not in DongleSeriesSpec.SERIES_SPECS: + print(f"Unknown product ID: {hex(product_id)}") + continue + + series_info = DongleSeriesSpec.SERIES_SPECS[product_id] + series_name = series_info["name"] + + try: + # Connect device group + device_group = kp.core.connect_devices(port_ids) + self.dongle_groups[product_id] = device_group + self.dongle_specs[product_id] = series_info + + # Initialize statistics + self.stats['dongle_stats'][product_id] = {'sent': 0, 'received': 0} + self.current_loads[product_id] = 0 + + print(f"Connected to {series_name} group with ports {port_ids}") + + # Set timeout + kp.core.set_timeout(device_group=device_group, milliseconds=5000) + + # Upload firmware if provided + if series_name in firmware_paths: + fw_paths = firmware_paths[series_name] + print(f"[{series_name}] Uploading firmware...") + kp.core.load_firmware_from_file( + device_group=device_group, + scpu_fw_path=fw_paths["scpu"], + ncpu_fw_path=fw_paths["ncpu"] + ) + print(f"[{series_name}] Firmware upload success") + + # Upload model + if series_name in model_paths: + print(f"[{series_name}] Uploading model...") + model_descriptor = kp.core.load_model_from_file( + device_group=device_group, + file_path=model_paths[series_name] + ) 
+ self.model_descriptors[product_id] = model_descriptor + + # Prepare inference descriptor + self.inference_descriptors[product_id] = kp.GenericImageInferenceDescriptor( + model_id=model_descriptor.models[0].id + ) + print(f"[{series_name}] Model upload success") + + # Create result queue for this dongle + self.result_queues[product_id] = queue.Queue() + + total_gops += series_info["gops"] * len(port_ids) + + except kp.ApiKPException as e: + print(f"Failed to initialize {series_name}: {e}") + return False + + # Calculate load balancing weights based on GOPS + for product_id, spec in self.dongle_specs.items(): + port_count = len(grouped_devices[product_id]) + effective_gops = spec["gops"] * port_count + self.gops_weights[product_id] = effective_gops / total_gops + + print(f"Load balancing weights (by GOPS): {self.gops_weights}") + return True + + def start(self): + """Start all processing threads""" + if not self.dongle_groups: + raise RuntimeError("No dongles initialized. Call scan_and_initialize_devices first.") + + self.stop_event.clear() + + # Start dispatcher thread + self.dispatcher_thread = threading.Thread(target=self._dispatcher_worker, daemon=True) + self.dispatcher_thread.start() + + # Start send/receive threads for each dongle + for product_id in self.dongle_groups.keys(): + # Send thread + send_thread = threading.Thread( + target=self._send_worker, + args=(product_id,), + daemon=True + ) + send_thread.start() + self.send_threads[product_id] = send_thread + + # Receive thread + receive_thread = threading.Thread( + target=self._receive_worker, + args=(product_id,), + daemon=True + ) + receive_thread.start() + self.receive_threads[product_id] = receive_thread + + # Start result ordering thread + self.result_ordering_thread = threading.Thread(target=self._result_ordering_worker, daemon=True) + self.result_ordering_thread.start() + + print(f"Started MultiSeriesDongleManager with {len(self.dongle_groups)} dongle series") + + def stop(self): + """Stop all 
threads and disconnect devices""" + print("Stopping MultiSeriesDongleManager...") + self.stop_event.set() + + # Join all threads with timeout + threads_to_join = [ + (self.dispatcher_thread, "Dispatcher"), + (self.result_ordering_thread, "Result Ordering") + ] + + for product_id in self.dongle_groups.keys(): + threads_to_join.extend([ + (self.send_threads.get(product_id), f"Send-{hex(product_id)}"), + (self.receive_threads.get(product_id), f"Receive-{hex(product_id)}") + ]) + + for thread, name in threads_to_join: + if thread and thread.is_alive(): + thread.join(timeout=2.0) + if thread.is_alive(): + print(f"Warning: {name} thread didn't stop cleanly") + + # Disconnect device groups + for product_id, device_group in self.dongle_groups.items(): + try: + kp.core.disconnect_devices(device_group) + print(f"Disconnected {hex(product_id)} device group") + except kp.ApiKPException as e: + print(f"Error disconnecting {hex(product_id)}: {e}") + + self.dongle_groups.clear() + + def put_input(self, image: Union[str, np.ndarray], image_format: str = 'BGR565') -> int: + """ + Submit an image for inference + + Returns: + int: sequence_id for tracking this inference + """ + # Process image input + if isinstance(image, str): + image_data = cv2.imread(image) + if image_data is None: + raise FileNotFoundError(f"Image file not found: {image}") + elif isinstance(image, np.ndarray): + image_data = image.copy() + else: + raise ValueError("Image must be file path or numpy array") + + # Convert format string to enum + format_mapping = { + 'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, + 'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888, + 'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV, + 'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8 + } + + image_format_enum = format_mapping.get(image_format) + if image_format_enum is None: + raise ValueError(f"Unsupported format: {image_format}") + + # Generate sequence ID + with self.sequence_lock: + sequence_id = self.sequence_counter + 
self.sequence_counter += 1 + + # Create task and enqueue + task = InferenceTask( + sequence_id=sequence_id, + image_data=image_data, + image_format=image_format_enum, + timestamp=time.time() + ) + + self.input_queue.put(task) + self.stats['total_dispatched'] += 1 + + return sequence_id + + def get_result(self, timeout: float = None) -> Optional[InferenceResult]: + """Get next inference result in original order""" + try: + return self.ordered_output_queue.get(block=timeout is not None, timeout=timeout) + except queue.Empty: + return None + + def _dispatcher_worker(self): + """Dispatcher thread: assigns tasks to dongles based on load balancing""" + print("Dispatcher thread started") + + while not self.stop_event.is_set(): + try: + task = self.input_queue.get(timeout=0.1) + if task is None: # Sentinel value + continue + + # Select optimal dongle based on current load and capacity + selected_product_id = self._select_optimal_dongle() + if selected_product_id is None: + continue + + # Enqueue to selected dongle + self.result_queues[selected_product_id].put(task) + self.current_loads[selected_product_id] += 1 + + except queue.Empty: + continue + except Exception as e: + print(f"Error in dispatcher: {e}") + + print("Dispatcher thread stopped") + + def _select_optimal_dongle(self) -> Optional[int]: + """Select dongle with best load/capacity ratio""" + if not self.dongle_groups: + return None + + best_ratio = float('inf') + selected_product_id = None + + for product_id in self.dongle_groups.keys(): + current_load = self.current_loads[product_id] + weight = self.gops_weights[product_id] + + # Calculate load ratio (lower is better) + load_ratio = current_load / weight if weight > 0 else float('inf') + + if load_ratio < best_ratio: + best_ratio = load_ratio + selected_product_id = product_id + + return selected_product_id + + def _send_worker(self, product_id: int): + """Send thread for specific dongle series""" + series_name = self.dongle_specs[product_id]["name"] + 
print(f"Send worker started for {series_name}") + + device_group = self.dongle_groups[product_id] + inference_descriptor = self.inference_descriptors[product_id] + result_queue = self.result_queues[product_id] + + while not self.stop_event.is_set(): + try: + task = result_queue.get(timeout=0.1) + if task is None: + continue + + # Configure inference descriptor + inference_descriptor.inference_number = task.sequence_id + inference_descriptor.input_node_image_list = [kp.GenericInputNodeImage( + image=task.image_data, + image_format=task.image_format, + resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, + padding_mode=kp.PaddingMode.KP_PADDING_CORNER, + normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON + )] + + # Send inference + kp.inference.generic_image_inference_send( + device_group=device_group, + generic_inference_input_descriptor=inference_descriptor + ) + + self.stats['dongle_stats'][product_id]['sent'] += 1 + + except queue.Empty: + continue + except kp.ApiKPException as e: + print(f"Error in {series_name} send worker: {e}") + self.stop_event.set() + except Exception as e: + print(f"Unexpected error in {series_name} send worker: {e}") + + print(f"Send worker stopped for {series_name}") + + def _receive_worker(self, product_id: int): + """Receive thread for specific dongle series""" + series_name = self.dongle_specs[product_id]["name"] + print(f"Receive worker started for {series_name}") + + device_group = self.dongle_groups[product_id] + + while not self.stop_event.is_set(): + try: + # Receive inference result + raw_result = kp.inference.generic_image_inference_receive(device_group=device_group) + + # Create result object + result = InferenceResult( + sequence_id=raw_result.header.inference_number, + result=raw_result, + dongle_series=series_name, + timestamp=time.time() + ) + + # Add to pending results for ordering + self.pending_results[result.sequence_id] = result + self.current_loads[product_id] = max(0, self.current_loads[product_id] - 1) + 
self.stats['dongle_stats'][product_id]['received'] += 1 + + except kp.ApiKPException as e: + if not self.stop_event.is_set(): + print(f"Error in {series_name} receive worker: {e}") + self.stop_event.set() + except Exception as e: + print(f"Unexpected error in {series_name} receive worker: {e}") + + print(f"Receive worker stopped for {series_name}") + + def _result_ordering_worker(self): + """Result ordering thread: ensures results are output in sequence order""" + print("Result ordering worker started") + + while not self.stop_event.is_set(): + # Check if next expected result is available + if self.next_output_sequence in self.pending_results: + result = self.pending_results.pop(self.next_output_sequence) + self.ordered_output_queue.put(result) + self.next_output_sequence += 1 + self.stats['total_completed'] += 1 + + # Clean up old pending results to prevent memory bloat + if len(self.pending_results) > self.result_buffer_size: + oldest_sequences = sorted(self.pending_results.keys())[:self.result_buffer_size // 2] + for seq_id in oldest_sequences: + if seq_id < self.next_output_sequence: + self.pending_results.pop(seq_id, None) + else: + time.sleep(0.001) # Small delay to prevent busy waiting + + print("Result ordering worker stopped") + + def get_statistics(self) -> Dict: + """Get performance statistics""" + stats = self.stats.copy() + stats['pending_results'] = len(self.pending_results) + stats['input_queue_size'] = self.input_queue.qsize() + stats['current_loads'] = self.current_loads.copy() + stats['gops_weights'] = self.gops_weights.copy() + return stats + + def print_statistics(self): + """Print current statistics""" + stats = self.get_statistics() + print(f"\n=== MultiSeriesDongleManager Statistics ===") + print(f"Total dispatched: {stats['total_dispatched']}") + print(f"Total completed: {stats['total_completed']}") + print(f"Pending results: {stats['pending_results']}") + print(f"Input queue size: {stats['input_queue_size']}") + + print(f"\nPer-dongle 
statistics:") + for product_id, dongle_stats in stats['dongle_stats'].items(): + series_name = self.dongle_specs[product_id]["name"] + current_load = stats['current_loads'][product_id] + weight = stats['gops_weights'][product_id] + print(f" {series_name}: sent={dongle_stats['sent']}, received={dongle_stats['received']}, " + f"current_load={current_load}, weight={weight:.3f}") + + +# Example usage and test +if __name__ == "__main__": + # Configuration + firmware_paths = { + "KL520": { + "scpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL520\fw_scpu.bin", + "ncpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL520\fw_ncpu.bin" + }, + "KL720": { + "scpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL720\fw_scpu.bin", + "ncpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL720\fw_ncpu.bin" + } + } + + model_paths = { + "KL520": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL520\yolov5-noupsample_w640h640_kn-model-zoo\kl520_20005_yolov5-noupsample_w640h640.nef", + "KL720": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef" + } + + image_path = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp" + + try: + # Initialize manager + manager = MultiSeriesDongleManager() + + if not manager.scan_and_initialize_devices(firmware_paths, model_paths): + print("Failed to initialize devices") + sys.exit(1) + + # Start processing + manager.start() + + # Submit test images + num_images = 50 + print(f"\nSubmitting {num_images} images for inference...") + + start_time = time.time() + sequence_ids = [] + + for i in range(num_images): + seq_id = manager.put_input(image_path, 'BGR565') + sequence_ids.append(seq_id) + if (i + 1) % 10 == 0: + print(f"Submitted {i + 1} images") + + # Collect results + 
print(f"\nCollecting results...") + results = [] + + for i in range(num_images): + result = manager.get_result(timeout=10.0) + if result: + results.append(result) + if (i + 1) % 10 == 0: + print(f"Received {i + 1} results") + else: + print(f"Timeout waiting for result {i + 1}") + break + + end_time = time.time() + + # Print results + print(f"\n=== Test Results ===") + print(f"Total time: {end_time - start_time:.2f} seconds") + print(f"Average FPS: {len(results) / (end_time - start_time):.2f}") + print(f"Results received: {len(results)}/{num_images}") + + # Verify ordering + is_ordered = all(results[i].sequence_id == sequence_ids[i] for i in range(len(results))) + print(f"Results in correct order: {is_ordered}") + + # Show dongle utilization + manager.print_statistics() + + except KeyboardInterrupt: + print("\nInterrupted by user") + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + finally: + if 'manager' in locals(): + manager.stop() \ No newline at end of file diff --git a/mutliseries.py b/mutliseries.py new file mode 100644 index 0000000..2501742 --- /dev/null +++ b/mutliseries.py @@ -0,0 +1,193 @@ +import kp +from collections import defaultdict +from typing import Union +import os +import sys +import argparse +import time +import threading +import queue +import numpy as np +import cv2 + +# PWD = os.path.dirname(os.path.abspath(__file__)) +# sys.path.insert(1, os.path.join(PWD, '..')) +IMAGE_FILE_PATH = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp" +LOOP_TIME = 100 + + +def _image_send_function(_device_group: kp.DeviceGroup, + _loop_time: int, + _generic_inference_input_descriptor: kp.GenericImageInferenceDescriptor, + _image: Union[bytes, np.ndarray], + _image_format: kp.ImageFormat) -> None: + for _loop in range(_loop_time): + try: + _generic_inference_input_descriptor.inference_number = _loop + _generic_inference_input_descriptor.input_node_image_list = 
[kp.GenericInputNodeImage( + image=_image, + image_format=_image_format, + resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, + padding_mode=kp.PaddingMode.KP_PADDING_CORNER, + normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON + )] + + kp.inference.generic_image_inference_send(device_group=device_groups[1], + generic_inference_input_descriptor=_generic_inference_input_descriptor) + except kp.ApiKPException as exception: + print(' - Error: inference failed, error = {}'.format(exception)) + exit(0) + + +def _result_receive_function(_device_group: kp.DeviceGroup, + _loop_time: int, + _result_queue: queue.Queue) -> None: + _generic_raw_result = None + + for _loop in range(_loop_time): + try: + _generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_groups[1]) + + if _generic_raw_result.header.inference_number != _loop: + print(' - Error: incorrect inference_number {} at frame {}'.format( + _generic_raw_result.header.inference_number, _loop)) + + print('.', end='', flush=True) + + except kp.ApiKPException as exception: + print(' - Error: inference failed, error = {}'.format(exception)) + exit(0) + + _result_queue.put(_generic_raw_result) + +model_path = ["C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\models\\KL520\\yolov5-noupsample_w640h640_kn-model-zoo\\kl520_20005_yolov5-noupsample_w640h640.nef", r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef"] +SCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_scpu.bin" +NCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_ncpu.bin" +SCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_scpu.bin" +NCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_ncpu.bin" 
# --- Device discovery, connection, firmware/model upload, image preparation ---
# Scans for attached Kneron dongles, groups them by USB product ID, connects
# each group, uploads the firmware/model matching that series, and prepares the
# test image. Defines the module-level globals consumed by the inference
# section below: device_groups, model_nef_descriptors, img, img_bgr565.
device_list = kp.core.scan_devices()

grouped_devices = defaultdict(list)
for device in device_list.device_descriptor_list:
    grouped_devices[device.product_id].append(device.usb_port_id)

print(f"Found device groups: {dict(grouped_devices)}")

# Per-series assets keyed by USB product ID so firmware/model selection no
# longer depends on USB enumeration order: the original code assumed group 0
# was always the KL520 and group 1 the KL720, which mis-flashes firmware (and
# raises IndexError with a single series) when the scan order differs.
# Product IDs 0x100 (KL520) and 0x720 (KL720) match the SERIES_SPECS table in
# multi_series_dongle_manager.py -- TODO confirm against the kp SDK constants.
SERIES_ASSETS = {
    0x100: {"scpu": SCPU_FW_PATH_520, "ncpu": NCPU_FW_PATH_520, "model": model_path[0]},  # KL520
    0x720: {"scpu": SCPU_FW_PATH_720, "ncpu": NCPU_FW_PATH_720, "model": model_path[1]},  # KL720
}

device_groups = []
model_nef_descriptors = []

# Sort by product ID so index 0 is deterministically the KL520 group and
# index 1 the KL720 group, matching the device_groups[1] usage further down.
for product_id in sorted(grouped_devices):
    usb_port_id = grouped_devices[product_id]
    if product_id not in SERIES_ASSETS:
        print(f"Skipping unsupported product ID {hex(product_id)} on ports {usb_port_id}")
        continue
    assets = SERIES_ASSETS[product_id]

    try:
        group = kp.core.connect_devices(usb_port_id)
        print(f"Successfully connected to group for product ID {product_id} with ports {usb_port_id}")
    except kp.ApiKPException as e:
        print(f"Failed to connect to group for product ID {product_id}: {e}")
        continue

    print('[Set Device Timeout]')
    kp.core.set_timeout(device_group=group, milliseconds=5000)
    print(' - Success')

    try:
        print('[Upload Firmware]')
        kp.core.load_firmware_from_file(device_group=group,
                                        scpu_fw_path=assets["scpu"],
                                        ncpu_fw_path=assets["ncpu"])
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
        sys.exit(1)  # non-zero exit: this is a failure path, not success

    print('[Upload Model]')
    model_nef_descriptors.append(
        kp.core.load_model_from_file(device_group=group, file_path=assets["model"])
    )
    print(' - Success')

    # Append only after the group is fully provisioned, so the parallel lists
    # device_groups / model_nef_descriptors stay index-aligned.
    device_groups.append(group)

print(device_groups)

"""
prepare the image
"""
print('[Read Image]')
img = cv2.imread(filename=IMAGE_FILE_PATH)
# Convert BGR888 -> BGR565 because the inference descriptor below sends the
# buffer as KP_IMAGE_FORMAT_RGB565 (16-bit), not raw 24-bit BGR.
img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
print(' - Success')

"""
prepare generic image inference input descriptor
"""
print(model_nef_descriptors)
+generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( + model_id=model_nef_descriptors[1].models[0].id, +) + +""" +starting inference work +""" +print('[Starting Inference Work]') +print(' - Starting inference loop {} times'.format(LOOP_TIME)) +print(' - ', end='') +result_queue = queue.Queue() + +send_thread = threading.Thread(target=_image_send_function, args=(device_groups[1], + LOOP_TIME, + generic_inference_input_descriptor, + img_bgr565, + kp.ImageFormat.KP_IMAGE_FORMAT_RGB565)) + +receive_thread = threading.Thread(target=_result_receive_function, args=(device_groups[1], + LOOP_TIME, + result_queue)) + +start_inference_time = time.time() + +send_thread.start() +receive_thread.start() + +try: + while send_thread.is_alive(): + send_thread.join(1) + + while receive_thread.is_alive(): + receive_thread.join(1) +except (KeyboardInterrupt, SystemExit): + print('\n - Received keyboard interrupt, quitting threads.') + exit(0) + +end_inference_time = time.time() +time_spent = end_inference_time - start_inference_time + +try: + generic_raw_result = result_queue.get(timeout=3) +except Exception as exception: + print('Error: Result queue is empty !') + exit(0) +print() + +print('[Result]') +print(" - Total inference {} images".format(LOOP_TIME)) +print(" - Time spent: {:.2f} secs, FPS = {:.1f}".format(time_spent, LOOP_TIME / time_spent)) + +""" +retrieve inference node output +""" +print('[Retrieve Inference Node Output ]') +inf_node_output_list = [] +for node_idx in range(generic_raw_result.header.num_output_node): + inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(node_idx=node_idx, + generic_raw_result=generic_raw_result, + channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW) + inf_node_output_list.append(inference_float_node_output) + +print(' - Success') + +print('[Result]') +print(inf_node_output_list) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7e6c98b..38b0783 100644 
--- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cluster4npu" -version = "0.1.0" +version = "0.0.3" description = "Add your description here" readme = "README.md" requires-python = ">=3.9, <3.12" diff --git a/uv.lock b/uv.lock new file mode 100644 index 0000000..be041db --- /dev/null +++ b/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 3 +requires-python = ">=3.9, <3.12" + +[[package]] +name = "cluster4npu" +version = "0.0.3" +source = { virtual = "." }