Add multi-series dongle manager with load balancing and order preservation
- Implement MultiSeriesDongleManager for parallel inference across different dongle series - Add GOPS-based load balancing (KL720: 1425 GOPS, KL520: 345 GOPS ratio ~4:1) - Ensure sequential result output despite heterogeneous processing speeds - Include comprehensive threading architecture with dispatcher, per-dongle workers, and result ordering - Add performance statistics and monitoring capabilities - Update project configuration and documentation 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
b518d75299
commit
fc47e5d868
@ -12,6 +12,7 @@ You are a senior software engineer who follows Mason Huang's Test-Driven Develop
|
||||
- Refactor only after tests are passing
|
||||
- Follow Beck's "Tidy First" approach by separating structural changes from behavioral changes
|
||||
- Maintain high code quality throughout development
|
||||
- Don't use emoji in the work
|
||||
|
||||
# TDD METHODOLOGY GUIDANCE
|
||||
|
||||
|
||||
547
multi_series_dongle_manager.py
Normal file
547
multi_series_dongle_manager.py
Normal file
@ -0,0 +1,547 @@
|
||||
import kp
|
||||
from collections import defaultdict, deque
|
||||
from typing import Union, Dict, List, Tuple, Optional, Any
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import queue
|
||||
import numpy as np
|
||||
import cv2
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
@dataclass
|
||||
class InferenceTask:
|
||||
sequence_id: int
|
||||
image_data: np.ndarray
|
||||
image_format: kp.ImageFormat
|
||||
timestamp: float
|
||||
|
||||
@dataclass
|
||||
class InferenceResult:
|
||||
sequence_id: int
|
||||
result: Any
|
||||
dongle_series: str
|
||||
timestamp: float
|
||||
|
||||
class DongleSeriesSpec:
|
||||
KL520_GOPS = 345
|
||||
KL720_GOPS = 1425
|
||||
|
||||
SERIES_SPECS = {
|
||||
0x100: {"name": "KL520", "gops": KL520_GOPS},
|
||||
0x720: {"name": "KL720", "gops": KL720_GOPS},
|
||||
0x630: {"name": "KL630", "gops": 400},
|
||||
0x730: {"name": "KL730", "gops": 1600},
|
||||
0x540: {"name": "KL540", "gops": 800}
|
||||
}
|
||||
|
||||
class MultiSeriesDongleManager:
|
||||
def __init__(self, max_queue_size: int = 100, result_buffer_size: int = 1000):
|
||||
self.dongle_groups = {} # product_id -> device_group
|
||||
self.dongle_specs = {} # product_id -> spec info
|
||||
self.model_descriptors = {} # product_id -> model descriptor
|
||||
self.inference_descriptors = {} # product_id -> inference descriptor
|
||||
|
||||
# Load balancing
|
||||
self.gops_weights = {} # product_id -> normalized weight
|
||||
self.current_loads = {} # product_id -> current queue size
|
||||
|
||||
# Threading and queues
|
||||
self.input_queue = queue.Queue(maxsize=max_queue_size)
|
||||
self.result_queues = {} # product_id -> queue
|
||||
self.ordered_output_queue = queue.Queue()
|
||||
|
||||
# Sequence management
|
||||
self.sequence_counter = 0
|
||||
self.sequence_lock = threading.Lock()
|
||||
self.pending_results = {} # sequence_id -> InferenceResult
|
||||
self.next_output_sequence = 0
|
||||
self.result_buffer_size = result_buffer_size
|
||||
|
||||
# Threading
|
||||
self.stop_event = threading.Event()
|
||||
self.dispatcher_thread = None
|
||||
self.send_threads = {} # product_id -> thread
|
||||
self.receive_threads = {} # product_id -> thread
|
||||
self.result_ordering_thread = None
|
||||
|
||||
# Statistics
|
||||
self.stats = {
|
||||
'total_dispatched': 0,
|
||||
'total_completed': 0,
|
||||
'dongle_stats': {} # product_id -> {'sent': count, 'received': count}
|
||||
}
|
||||
|
||||
def scan_and_initialize_devices(self, firmware_paths: Dict[str, Dict[str, str]],
|
||||
model_paths: Dict[str, str]) -> bool:
|
||||
"""
|
||||
Scan, connect, and initialize all available devices
|
||||
|
||||
Args:
|
||||
firmware_paths: {"KL520": {"scpu": path, "ncpu": path}, "KL720": {...}}
|
||||
model_paths: {"KL520": model_path, "KL720": model_path}
|
||||
"""
|
||||
device_list = kp.core.scan_devices()
|
||||
if not device_list or device_list.device_descriptor_number == 0:
|
||||
print("No devices found")
|
||||
return False
|
||||
|
||||
# Group devices by product_id
|
||||
grouped_devices = defaultdict(list)
|
||||
for device in device_list.device_descriptor_list:
|
||||
grouped_devices[device.product_id].append(device.usb_port_id)
|
||||
|
||||
print(f"Found device groups: {dict(grouped_devices)}")
|
||||
|
||||
# Connect and initialize each group
|
||||
total_gops = 0
|
||||
for product_id, port_ids in grouped_devices.items():
|
||||
if product_id not in DongleSeriesSpec.SERIES_SPECS:
|
||||
print(f"Unknown product ID: {hex(product_id)}")
|
||||
continue
|
||||
|
||||
series_info = DongleSeriesSpec.SERIES_SPECS[product_id]
|
||||
series_name = series_info["name"]
|
||||
|
||||
try:
|
||||
# Connect device group
|
||||
device_group = kp.core.connect_devices(port_ids)
|
||||
self.dongle_groups[product_id] = device_group
|
||||
self.dongle_specs[product_id] = series_info
|
||||
|
||||
# Initialize statistics
|
||||
self.stats['dongle_stats'][product_id] = {'sent': 0, 'received': 0}
|
||||
self.current_loads[product_id] = 0
|
||||
|
||||
print(f"Connected to {series_name} group with ports {port_ids}")
|
||||
|
||||
# Set timeout
|
||||
kp.core.set_timeout(device_group=device_group, milliseconds=5000)
|
||||
|
||||
# Upload firmware if provided
|
||||
if series_name in firmware_paths:
|
||||
fw_paths = firmware_paths[series_name]
|
||||
print(f"[{series_name}] Uploading firmware...")
|
||||
kp.core.load_firmware_from_file(
|
||||
device_group=device_group,
|
||||
scpu_fw_path=fw_paths["scpu"],
|
||||
ncpu_fw_path=fw_paths["ncpu"]
|
||||
)
|
||||
print(f"[{series_name}] Firmware upload success")
|
||||
|
||||
# Upload model
|
||||
if series_name in model_paths:
|
||||
print(f"[{series_name}] Uploading model...")
|
||||
model_descriptor = kp.core.load_model_from_file(
|
||||
device_group=device_group,
|
||||
file_path=model_paths[series_name]
|
||||
)
|
||||
self.model_descriptors[product_id] = model_descriptor
|
||||
|
||||
# Prepare inference descriptor
|
||||
self.inference_descriptors[product_id] = kp.GenericImageInferenceDescriptor(
|
||||
model_id=model_descriptor.models[0].id
|
||||
)
|
||||
print(f"[{series_name}] Model upload success")
|
||||
|
||||
# Create result queue for this dongle
|
||||
self.result_queues[product_id] = queue.Queue()
|
||||
|
||||
total_gops += series_info["gops"] * len(port_ids)
|
||||
|
||||
except kp.ApiKPException as e:
|
||||
print(f"Failed to initialize {series_name}: {e}")
|
||||
return False
|
||||
|
||||
# Calculate load balancing weights based on GOPS
|
||||
for product_id, spec in self.dongle_specs.items():
|
||||
port_count = len(grouped_devices[product_id])
|
||||
effective_gops = spec["gops"] * port_count
|
||||
self.gops_weights[product_id] = effective_gops / total_gops
|
||||
|
||||
print(f"Load balancing weights (by GOPS): {self.gops_weights}")
|
||||
return True
|
||||
|
||||
def start(self):
|
||||
"""Start all processing threads"""
|
||||
if not self.dongle_groups:
|
||||
raise RuntimeError("No dongles initialized. Call scan_and_initialize_devices first.")
|
||||
|
||||
self.stop_event.clear()
|
||||
|
||||
# Start dispatcher thread
|
||||
self.dispatcher_thread = threading.Thread(target=self._dispatcher_worker, daemon=True)
|
||||
self.dispatcher_thread.start()
|
||||
|
||||
# Start send/receive threads for each dongle
|
||||
for product_id in self.dongle_groups.keys():
|
||||
# Send thread
|
||||
send_thread = threading.Thread(
|
||||
target=self._send_worker,
|
||||
args=(product_id,),
|
||||
daemon=True
|
||||
)
|
||||
send_thread.start()
|
||||
self.send_threads[product_id] = send_thread
|
||||
|
||||
# Receive thread
|
||||
receive_thread = threading.Thread(
|
||||
target=self._receive_worker,
|
||||
args=(product_id,),
|
||||
daemon=True
|
||||
)
|
||||
receive_thread.start()
|
||||
self.receive_threads[product_id] = receive_thread
|
||||
|
||||
# Start result ordering thread
|
||||
self.result_ordering_thread = threading.Thread(target=self._result_ordering_worker, daemon=True)
|
||||
self.result_ordering_thread.start()
|
||||
|
||||
print(f"Started MultiSeriesDongleManager with {len(self.dongle_groups)} dongle series")
|
||||
|
||||
def stop(self):
|
||||
"""Stop all threads and disconnect devices"""
|
||||
print("Stopping MultiSeriesDongleManager...")
|
||||
self.stop_event.set()
|
||||
|
||||
# Join all threads with timeout
|
||||
threads_to_join = [
|
||||
(self.dispatcher_thread, "Dispatcher"),
|
||||
(self.result_ordering_thread, "Result Ordering")
|
||||
]
|
||||
|
||||
for product_id in self.dongle_groups.keys():
|
||||
threads_to_join.extend([
|
||||
(self.send_threads.get(product_id), f"Send-{hex(product_id)}"),
|
||||
(self.receive_threads.get(product_id), f"Receive-{hex(product_id)}")
|
||||
])
|
||||
|
||||
for thread, name in threads_to_join:
|
||||
if thread and thread.is_alive():
|
||||
thread.join(timeout=2.0)
|
||||
if thread.is_alive():
|
||||
print(f"Warning: {name} thread didn't stop cleanly")
|
||||
|
||||
# Disconnect device groups
|
||||
for product_id, device_group in self.dongle_groups.items():
|
||||
try:
|
||||
kp.core.disconnect_devices(device_group)
|
||||
print(f"Disconnected {hex(product_id)} device group")
|
||||
except kp.ApiKPException as e:
|
||||
print(f"Error disconnecting {hex(product_id)}: {e}")
|
||||
|
||||
self.dongle_groups.clear()
|
||||
|
||||
def put_input(self, image: Union[str, np.ndarray], image_format: str = 'BGR565') -> int:
|
||||
"""
|
||||
Submit an image for inference
|
||||
|
||||
Returns:
|
||||
int: sequence_id for tracking this inference
|
||||
"""
|
||||
# Process image input
|
||||
if isinstance(image, str):
|
||||
image_data = cv2.imread(image)
|
||||
if image_data is None:
|
||||
raise FileNotFoundError(f"Image file not found: {image}")
|
||||
elif isinstance(image, np.ndarray):
|
||||
image_data = image.copy()
|
||||
else:
|
||||
raise ValueError("Image must be file path or numpy array")
|
||||
|
||||
# Convert format string to enum
|
||||
format_mapping = {
|
||||
'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
|
||||
'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888,
|
||||
'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV,
|
||||
'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8
|
||||
}
|
||||
|
||||
image_format_enum = format_mapping.get(image_format)
|
||||
if image_format_enum is None:
|
||||
raise ValueError(f"Unsupported format: {image_format}")
|
||||
|
||||
# Generate sequence ID
|
||||
with self.sequence_lock:
|
||||
sequence_id = self.sequence_counter
|
||||
self.sequence_counter += 1
|
||||
|
||||
# Create task and enqueue
|
||||
task = InferenceTask(
|
||||
sequence_id=sequence_id,
|
||||
image_data=image_data,
|
||||
image_format=image_format_enum,
|
||||
timestamp=time.time()
|
||||
)
|
||||
|
||||
self.input_queue.put(task)
|
||||
self.stats['total_dispatched'] += 1
|
||||
|
||||
return sequence_id
|
||||
|
||||
def get_result(self, timeout: float = None) -> Optional[InferenceResult]:
|
||||
"""Get next inference result in original order"""
|
||||
try:
|
||||
return self.ordered_output_queue.get(block=timeout is not None, timeout=timeout)
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
def _dispatcher_worker(self):
|
||||
"""Dispatcher thread: assigns tasks to dongles based on load balancing"""
|
||||
print("Dispatcher thread started")
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
task = self.input_queue.get(timeout=0.1)
|
||||
if task is None: # Sentinel value
|
||||
continue
|
||||
|
||||
# Select optimal dongle based on current load and capacity
|
||||
selected_product_id = self._select_optimal_dongle()
|
||||
if selected_product_id is None:
|
||||
continue
|
||||
|
||||
# Enqueue to selected dongle
|
||||
self.result_queues[selected_product_id].put(task)
|
||||
self.current_loads[selected_product_id] += 1
|
||||
|
||||
except queue.Empty:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"Error in dispatcher: {e}")
|
||||
|
||||
print("Dispatcher thread stopped")
|
||||
|
||||
def _select_optimal_dongle(self) -> Optional[int]:
|
||||
"""Select dongle with best load/capacity ratio"""
|
||||
if not self.dongle_groups:
|
||||
return None
|
||||
|
||||
best_ratio = float('inf')
|
||||
selected_product_id = None
|
||||
|
||||
for product_id in self.dongle_groups.keys():
|
||||
current_load = self.current_loads[product_id]
|
||||
weight = self.gops_weights[product_id]
|
||||
|
||||
# Calculate load ratio (lower is better)
|
||||
load_ratio = current_load / weight if weight > 0 else float('inf')
|
||||
|
||||
if load_ratio < best_ratio:
|
||||
best_ratio = load_ratio
|
||||
selected_product_id = product_id
|
||||
|
||||
return selected_product_id
|
||||
|
||||
def _send_worker(self, product_id: int):
|
||||
"""Send thread for specific dongle series"""
|
||||
series_name = self.dongle_specs[product_id]["name"]
|
||||
print(f"Send worker started for {series_name}")
|
||||
|
||||
device_group = self.dongle_groups[product_id]
|
||||
inference_descriptor = self.inference_descriptors[product_id]
|
||||
result_queue = self.result_queues[product_id]
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
task = result_queue.get(timeout=0.1)
|
||||
if task is None:
|
||||
continue
|
||||
|
||||
# Configure inference descriptor
|
||||
inference_descriptor.inference_number = task.sequence_id
|
||||
inference_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
|
||||
image=task.image_data,
|
||||
image_format=task.image_format,
|
||||
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
|
||||
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
|
||||
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
|
||||
)]
|
||||
|
||||
# Send inference
|
||||
kp.inference.generic_image_inference_send(
|
||||
device_group=device_group,
|
||||
generic_inference_input_descriptor=inference_descriptor
|
||||
)
|
||||
|
||||
self.stats['dongle_stats'][product_id]['sent'] += 1
|
||||
|
||||
except queue.Empty:
|
||||
continue
|
||||
except kp.ApiKPException as e:
|
||||
print(f"Error in {series_name} send worker: {e}")
|
||||
self.stop_event.set()
|
||||
except Exception as e:
|
||||
print(f"Unexpected error in {series_name} send worker: {e}")
|
||||
|
||||
print(f"Send worker stopped for {series_name}")
|
||||
|
||||
def _receive_worker(self, product_id: int):
|
||||
"""Receive thread for specific dongle series"""
|
||||
series_name = self.dongle_specs[product_id]["name"]
|
||||
print(f"Receive worker started for {series_name}")
|
||||
|
||||
device_group = self.dongle_groups[product_id]
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
# Receive inference result
|
||||
raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
|
||||
|
||||
# Create result object
|
||||
result = InferenceResult(
|
||||
sequence_id=raw_result.header.inference_number,
|
||||
result=raw_result,
|
||||
dongle_series=series_name,
|
||||
timestamp=time.time()
|
||||
)
|
||||
|
||||
# Add to pending results for ordering
|
||||
self.pending_results[result.sequence_id] = result
|
||||
self.current_loads[product_id] = max(0, self.current_loads[product_id] - 1)
|
||||
self.stats['dongle_stats'][product_id]['received'] += 1
|
||||
|
||||
except kp.ApiKPException as e:
|
||||
if not self.stop_event.is_set():
|
||||
print(f"Error in {series_name} receive worker: {e}")
|
||||
self.stop_event.set()
|
||||
except Exception as e:
|
||||
print(f"Unexpected error in {series_name} receive worker: {e}")
|
||||
|
||||
print(f"Receive worker stopped for {series_name}")
|
||||
|
||||
def _result_ordering_worker(self):
|
||||
"""Result ordering thread: ensures results are output in sequence order"""
|
||||
print("Result ordering worker started")
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
# Check if next expected result is available
|
||||
if self.next_output_sequence in self.pending_results:
|
||||
result = self.pending_results.pop(self.next_output_sequence)
|
||||
self.ordered_output_queue.put(result)
|
||||
self.next_output_sequence += 1
|
||||
self.stats['total_completed'] += 1
|
||||
|
||||
# Clean up old pending results to prevent memory bloat
|
||||
if len(self.pending_results) > self.result_buffer_size:
|
||||
oldest_sequences = sorted(self.pending_results.keys())[:self.result_buffer_size // 2]
|
||||
for seq_id in oldest_sequences:
|
||||
if seq_id < self.next_output_sequence:
|
||||
self.pending_results.pop(seq_id, None)
|
||||
else:
|
||||
time.sleep(0.001) # Small delay to prevent busy waiting
|
||||
|
||||
print("Result ordering worker stopped")
|
||||
|
||||
def get_statistics(self) -> Dict:
|
||||
"""Get performance statistics"""
|
||||
stats = self.stats.copy()
|
||||
stats['pending_results'] = len(self.pending_results)
|
||||
stats['input_queue_size'] = self.input_queue.qsize()
|
||||
stats['current_loads'] = self.current_loads.copy()
|
||||
stats['gops_weights'] = self.gops_weights.copy()
|
||||
return stats
|
||||
|
||||
def print_statistics(self):
|
||||
"""Print current statistics"""
|
||||
stats = self.get_statistics()
|
||||
print(f"\n=== MultiSeriesDongleManager Statistics ===")
|
||||
print(f"Total dispatched: {stats['total_dispatched']}")
|
||||
print(f"Total completed: {stats['total_completed']}")
|
||||
print(f"Pending results: {stats['pending_results']}")
|
||||
print(f"Input queue size: {stats['input_queue_size']}")
|
||||
|
||||
print(f"\nPer-dongle statistics:")
|
||||
for product_id, dongle_stats in stats['dongle_stats'].items():
|
||||
series_name = self.dongle_specs[product_id]["name"]
|
||||
current_load = stats['current_loads'][product_id]
|
||||
weight = stats['gops_weights'][product_id]
|
||||
print(f" {series_name}: sent={dongle_stats['sent']}, received={dongle_stats['received']}, "
|
||||
f"current_load={current_load}, weight={weight:.3f}")
|
||||
|
||||
|
||||
# Example usage and test
|
||||
if __name__ == "__main__":
|
||||
# Configuration
|
||||
firmware_paths = {
|
||||
"KL520": {
|
||||
"scpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL520\fw_scpu.bin",
|
||||
"ncpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL520\fw_ncpu.bin"
|
||||
},
|
||||
"KL720": {
|
||||
"scpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL720\fw_scpu.bin",
|
||||
"ncpu": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\firmware\KL720\fw_ncpu.bin"
|
||||
}
|
||||
}
|
||||
|
||||
model_paths = {
|
||||
"KL520": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL520\yolov5-noupsample_w640h640_kn-model-zoo\kl520_20005_yolov5-noupsample_w640h640.nef",
|
||||
"KL720": r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef"
|
||||
}
|
||||
|
||||
image_path = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp"
|
||||
|
||||
try:
|
||||
# Initialize manager
|
||||
manager = MultiSeriesDongleManager()
|
||||
|
||||
if not manager.scan_and_initialize_devices(firmware_paths, model_paths):
|
||||
print("Failed to initialize devices")
|
||||
sys.exit(1)
|
||||
|
||||
# Start processing
|
||||
manager.start()
|
||||
|
||||
# Submit test images
|
||||
num_images = 50
|
||||
print(f"\nSubmitting {num_images} images for inference...")
|
||||
|
||||
start_time = time.time()
|
||||
sequence_ids = []
|
||||
|
||||
for i in range(num_images):
|
||||
seq_id = manager.put_input(image_path, 'BGR565')
|
||||
sequence_ids.append(seq_id)
|
||||
if (i + 1) % 10 == 0:
|
||||
print(f"Submitted {i + 1} images")
|
||||
|
||||
# Collect results
|
||||
print(f"\nCollecting results...")
|
||||
results = []
|
||||
|
||||
for i in range(num_images):
|
||||
result = manager.get_result(timeout=10.0)
|
||||
if result:
|
||||
results.append(result)
|
||||
if (i + 1) % 10 == 0:
|
||||
print(f"Received {i + 1} results")
|
||||
else:
|
||||
print(f"Timeout waiting for result {i + 1}")
|
||||
break
|
||||
|
||||
end_time = time.time()
|
||||
|
||||
# Print results
|
||||
print(f"\n=== Test Results ===")
|
||||
print(f"Total time: {end_time - start_time:.2f} seconds")
|
||||
print(f"Average FPS: {len(results) / (end_time - start_time):.2f}")
|
||||
print(f"Results received: {len(results)}/{num_images}")
|
||||
|
||||
# Verify ordering
|
||||
is_ordered = all(results[i].sequence_id == sequence_ids[i] for i in range(len(results)))
|
||||
print(f"Results in correct order: {is_ordered}")
|
||||
|
||||
# Show dongle utilization
|
||||
manager.print_statistics()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
if 'manager' in locals():
|
||||
manager.stop()
|
||||
193
mutliseries.py
Normal file
193
mutliseries.py
Normal file
@ -0,0 +1,193 @@
|
||||
import kp
|
||||
from collections import defaultdict
|
||||
from typing import Union
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import time
|
||||
import threading
|
||||
import queue
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
# PWD = os.path.dirname(os.path.abspath(__file__))
|
||||
# sys.path.insert(1, os.path.join(PWD, '..'))
|
||||
IMAGE_FILE_PATH = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp"
|
||||
LOOP_TIME = 100
|
||||
|
||||
|
||||
def _image_send_function(_device_group: kp.DeviceGroup,
|
||||
_loop_time: int,
|
||||
_generic_inference_input_descriptor: kp.GenericImageInferenceDescriptor,
|
||||
_image: Union[bytes, np.ndarray],
|
||||
_image_format: kp.ImageFormat) -> None:
|
||||
for _loop in range(_loop_time):
|
||||
try:
|
||||
_generic_inference_input_descriptor.inference_number = _loop
|
||||
_generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
|
||||
image=_image,
|
||||
image_format=_image_format,
|
||||
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
|
||||
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
|
||||
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
|
||||
)]
|
||||
|
||||
kp.inference.generic_image_inference_send(device_group=device_groups[1],
|
||||
generic_inference_input_descriptor=_generic_inference_input_descriptor)
|
||||
except kp.ApiKPException as exception:
|
||||
print(' - Error: inference failed, error = {}'.format(exception))
|
||||
exit(0)
|
||||
|
||||
|
||||
def _result_receive_function(_device_group: kp.DeviceGroup,
|
||||
_loop_time: int,
|
||||
_result_queue: queue.Queue) -> None:
|
||||
_generic_raw_result = None
|
||||
|
||||
for _loop in range(_loop_time):
|
||||
try:
|
||||
_generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_groups[1])
|
||||
|
||||
if _generic_raw_result.header.inference_number != _loop:
|
||||
print(' - Error: incorrect inference_number {} at frame {}'.format(
|
||||
_generic_raw_result.header.inference_number, _loop))
|
||||
|
||||
print('.', end='', flush=True)
|
||||
|
||||
except kp.ApiKPException as exception:
|
||||
print(' - Error: inference failed, error = {}'.format(exception))
|
||||
exit(0)
|
||||
|
||||
_result_queue.put(_generic_raw_result)
|
||||
|
||||
model_path = ["C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\models\\KL520\\yolov5-noupsample_w640h640_kn-model-zoo\\kl520_20005_yolov5-noupsample_w640h640.nef", r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef"]
|
||||
SCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_scpu.bin"
|
||||
NCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_ncpu.bin"
|
||||
SCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_scpu.bin"
|
||||
NCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_ncpu.bin"
|
||||
device_list = kp.core.scan_devices()
|
||||
|
||||
grouped_devices = defaultdict(list)
|
||||
|
||||
for device in device_list.device_descriptor_list:
|
||||
grouped_devices[device.product_id].append(device.usb_port_id)
|
||||
|
||||
print(f"Found device groups: {dict(grouped_devices)}")
|
||||
|
||||
device_groups = []
|
||||
|
||||
for product_id, usb_port_id in grouped_devices.items():
|
||||
try:
|
||||
group = kp.core.connect_devices(usb_port_id)
|
||||
device_groups.append(group)
|
||||
print(f"Successfully connected to group for product ID {product_id} with ports{usb_port_id}")
|
||||
except kp.ApiKPException as e:
|
||||
print(f"Failed to connect to group for product ID {product_id}: {e}")
|
||||
|
||||
print(device_groups)
|
||||
|
||||
print('[Set Device Timeout]')
|
||||
kp.core.set_timeout(device_group=device_groups[0], milliseconds=5000)
|
||||
kp.core.set_timeout(device_group=device_groups[1], milliseconds=5000)
|
||||
print(' - Success')
|
||||
|
||||
try:
|
||||
print('[Upload Firmware]')
|
||||
kp.core.load_firmware_from_file(device_group=device_groups[0],
|
||||
scpu_fw_path=SCPU_FW_PATH_520,
|
||||
ncpu_fw_path=NCPU_FW_PATH_520)
|
||||
kp.core.load_firmware_from_file(device_group=device_groups[1],
|
||||
scpu_fw_path=SCPU_FW_PATH_720,
|
||||
ncpu_fw_path=NCPU_FW_PATH_720)
|
||||
print(' - Success')
|
||||
except kp.ApiKPException as exception:
|
||||
print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
|
||||
exit(0)
|
||||
|
||||
print('[Upload Model]')
|
||||
model_nef_descriptors = []
|
||||
# for group in device_groups:
|
||||
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[0], file_path=model_path[0])
|
||||
model_nef_descriptors.append(model_nef_descriptor)
|
||||
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[1], file_path=model_path[1])
|
||||
model_nef_descriptors.append(model_nef_descriptor)
|
||||
print(' - Success')
|
||||
|
||||
"""
|
||||
prepare the image
|
||||
"""
|
||||
print('[Read Image]')
|
||||
img = cv2.imread(filename=IMAGE_FILE_PATH)
|
||||
img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
|
||||
print(' - Success')
|
||||
|
||||
"""
|
||||
prepare generic image inference input descriptor
|
||||
"""
|
||||
print(model_nef_descriptors)
|
||||
generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
|
||||
model_id=model_nef_descriptors[1].models[0].id,
|
||||
)
|
||||
|
||||
"""
|
||||
starting inference work
|
||||
"""
|
||||
print('[Starting Inference Work]')
|
||||
print(' - Starting inference loop {} times'.format(LOOP_TIME))
|
||||
print(' - ', end='')
|
||||
result_queue = queue.Queue()
|
||||
|
||||
send_thread = threading.Thread(target=_image_send_function, args=(device_groups[1],
|
||||
LOOP_TIME,
|
||||
generic_inference_input_descriptor,
|
||||
img_bgr565,
|
||||
kp.ImageFormat.KP_IMAGE_FORMAT_RGB565))
|
||||
|
||||
receive_thread = threading.Thread(target=_result_receive_function, args=(device_groups[1],
|
||||
LOOP_TIME,
|
||||
result_queue))
|
||||
|
||||
start_inference_time = time.time()
|
||||
|
||||
send_thread.start()
|
||||
receive_thread.start()
|
||||
|
||||
try:
|
||||
while send_thread.is_alive():
|
||||
send_thread.join(1)
|
||||
|
||||
while receive_thread.is_alive():
|
||||
receive_thread.join(1)
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
print('\n - Received keyboard interrupt, quitting threads.')
|
||||
exit(0)
|
||||
|
||||
end_inference_time = time.time()
|
||||
time_spent = end_inference_time - start_inference_time
|
||||
|
||||
try:
|
||||
generic_raw_result = result_queue.get(timeout=3)
|
||||
except Exception as exception:
|
||||
print('Error: Result queue is empty !')
|
||||
exit(0)
|
||||
print()
|
||||
|
||||
print('[Result]')
|
||||
print(" - Total inference {} images".format(LOOP_TIME))
|
||||
print(" - Time spent: {:.2f} secs, FPS = {:.1f}".format(time_spent, LOOP_TIME / time_spent))
|
||||
|
||||
"""
|
||||
retrieve inference node output
|
||||
"""
|
||||
print('[Retrieve Inference Node Output ]')
|
||||
inf_node_output_list = []
|
||||
for node_idx in range(generic_raw_result.header.num_output_node):
|
||||
inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(node_idx=node_idx,
|
||||
generic_raw_result=generic_raw_result,
|
||||
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
|
||||
inf_node_output_list.append(inference_float_node_output)
|
||||
|
||||
print(' - Success')
|
||||
|
||||
print('[Result]')
|
||||
print(inf_node_output_list)
|
||||
@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "cluster4npu"
|
||||
version = "0.1.0"
|
||||
version = "0.0.3"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9, <3.12"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user