- Move test scripts to tests/ directory for better organization - Add improved YOLOv5 postprocessing with reference implementation - Update gitignore to exclude *.mflow files and include main.spec - Add debug capabilities and coordinate scaling improvements - Enhance multi-series support with proper validation - Add AGENTS.md documentation and example utilities 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
2389 lines
104 KiB
Python
2389 lines
104 KiB
Python
from typing import Union, Tuple
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import time
|
|
import threading
|
|
import queue
|
|
import numpy as np
|
|
import kp
|
|
import cv2
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from typing import Callable, Optional, Any, Dict, List
|
|
from dataclasses import dataclass
|
|
from collections import defaultdict
|
|
from enum import Enum
|
|
from .yolo_v5_postprocess_reference import post_process_yolo_v5_reference
|
|
|
|
# Verbose debug output is opt-in: it is enabled only when the C4NPU_DEBUG
# environment variable is exactly "1" (unset counts as "0").
DEBUG_VERBOSE = os.environ.get('C4NPU_DEBUG', '0') == '1'
|
|
|
|
|
|
@dataclass
class InferenceTask:
    """One unit of inference input.

    All four fields are required and are stored exactly as given; the class
    performs no validation or conversion.
    """

    # Identifier supplied by the producer of the task.
    sequence_id: int
    # Image buffer for this task.
    image_data: np.ndarray
    # Image format descriptor (a kp.ImageFormat value per the original note).
    image_format: Any
    # Timestamp supplied by the producer of the task.
    timestamp: float
|
|
|
|
|
|
@dataclass
class BoundingBox:
    """Bounding box descriptor for object detection results.

    Every field has a default, so ``BoundingBox()`` yields an all-zero box
    with an empty class name.
    """

    # Corner coordinates: (x1, y1) and (x2, y2).
    x1: int = 0
    y1: int = 0
    x2: int = 0
    y2: int = 0
    # Detection score attached to the box.
    score: float = 0.0
    # Numeric class index and its display name.
    class_num: int = 0
    class_name: str = ""
|
|
|
|
@dataclass
class ObjectDetectionResult:
    """Object detection result descriptor.

    Holds the detected boxes plus optional letterbox mapping values that can
    be used to map boxes back to the original frame. All fields default to
    zero / empty.
    """

    class_count: int = 0
    box_count: int = 0
    # ``None`` is only a placeholder: ``__post_init__`` swaps it for a fresh
    # list so that instances never share one default list object.
    box_list: Optional[List[BoundingBox]] = None

    # Optional letterbox mapping info for reversing to original frame
    model_input_width: int = 0
    model_input_height: int = 0
    resized_img_width: int = 0
    resized_img_height: int = 0
    pad_left: int = 0
    pad_top: int = 0
    pad_right: int = 0
    pad_bottom: int = 0

    def __post_init__(self):
        """Replace a missing ``box_list`` with a new empty list."""
        self.box_list = [] if self.box_list is None else self.box_list
|
|
|
|
@dataclass
class ClassificationResult:
    """Classification result descriptor."""

    probability: float = 0.0
    class_name: str = ""
    class_num: int = 0
    confidence_threshold: float = 0.5

    @property
    def is_positive(self) -> bool:
        """Whether ``probability`` is strictly above ``confidence_threshold``."""
        return self.probability > self.confidence_threshold

    def __str__(self) -> str:
        """Render as ``"<class_name> (Prob: <probability to 3 decimals>)"``."""
        return f"{self.class_name} (Prob: {self.probability:.3f})"

    def __format__(self, format_spec: str) -> str:
        """Apply *format_spec* to the ``str()`` form (enables ``f"{r:>20}"``)."""
        text = str(self)
        if format_spec == '':
            return text
        return format(text, format_spec)
|
|
|
|
class PostProcessType(Enum):
    """Enumeration of available postprocessing types.

    Each member's value is its lower-case string identifier.
    """

    FIRE_DETECTION = "fire_detection"
    YOLO_V3 = "yolo_v3"
    YOLO_V5 = "yolo_v5"
    CLASSIFICATION = "classification"
    RAW_OUTPUT = "raw_output"
|
|
|
|
@dataclass
class InferenceResult:
    """Result record for one inference task.

    ``postprocess_type`` defaults to ``PostProcessType.RAW_OUTPUT``; the
    other fields are required and stored as given.
    """

    # Identifier of the task this result belongs to.
    sequence_id: int
    # Result payload; its concrete type is not constrained here.
    result: Any
    # Name of the device series associated with the result.
    series_name: str
    # Timestamp supplied by the producer of the result.
    timestamp: float
    # Kind of postprocessing associated with ``result``.
    postprocess_type: PostProcessType = PostProcessType.RAW_OUTPUT
|
|
|
|
|
|
class DongleSeriesSpec:
    """Dongle series specifications with GOPS capacity for load balancing.

    ``SERIES_SPECS`` maps a series name to its USB ``product_id`` and its
    ``gops`` figure.
    """

    # Named capacities reused inside SERIES_SPECS below.
    KL520_GOPS = 2
    KL720_GOPS = 28

    SERIES_SPECS = {
        "KL520": {"product_id": 0x100, "gops": KL520_GOPS},
        "KL720": {"product_id": 0x720, "gops": KL720_GOPS},
        "KL630": {"product_id": 0x630, "gops": 400},
        "KL730": {"product_id": 0x730, "gops": 1600},
        # KL540 is not enabled:
        # "KL540": {"product_id": 0x540, "gops": 800}
    }
|
|
|
|
|
|
class DataProcessor(ABC):
    """Abstract base class for data processors in the pipeline.

    Subclasses must implement :meth:`process`; the class itself cannot be
    instantiated.
    """

    @abstractmethod
    def process(self, data: Any, *args, **kwargs) -> Any:
        """Process data and return result.

        The base implementation does nothing and returns ``None``.
        """
|
|
|
|
|
|
class PreProcessor(DataProcessor):
    """Resize a frame and convert its pixel format through pluggable callables."""

    def __init__(self, resize_fn: Optional[Callable] = None,
                 format_convert_fn: Optional[Callable] = None):
        """Store the resize/convert callables, using the built-in defaults for falsy arguments."""
        self.resize_fn = resize_fn if resize_fn else self._default_resize
        self.format_convert_fn = (
            format_convert_fn if format_convert_fn else self._default_format_convert
        )

    def process(self, frame: np.ndarray, target_size: tuple, target_format: str) -> np.ndarray:
        """Main processing pipeline: resize first, then convert the format."""
        return self.format_convert_fn(self.resize_fn(frame, target_size), target_format)

    def _default_resize(self, frame: np.ndarray, target_size: tuple) -> np.ndarray:
        """Default resize implementation (``cv2.resize``)."""
        return cv2.resize(frame, target_size)

    def _default_format_convert(self, frame: np.ndarray, target_format: str) -> np.ndarray:
        """Default format conversion.

        ``'BGR565'`` and ``'RGB8888'`` are converted with ``cv2.cvtColor``;
        any other value returns *frame* unchanged.
        """
        if target_format == 'BGR565':
            conversion = cv2.COLOR_BGR2BGR565
        elif target_format == 'RGB8888':
            conversion = cv2.COLOR_BGR2RGBA
        else:
            return frame
        return cv2.cvtColor(frame, conversion)
|
|
|
|
|
|
class PostProcessorOptions:
    """Configuration for postprocessing options."""

    def __init__(self,
                 postprocess_type: PostProcessType = PostProcessType.FIRE_DETECTION,
                 threshold: float = 0.5,
                 class_names: List[str] = None,
                 nms_threshold: float = 0.45,
                 max_detections_per_class: int = 100):
        """Store the options as attributes of the same names.

        A falsy ``class_names`` (``None`` or an empty list) is stored as a
        new empty list.
        """
        self.postprocess_type = postprocess_type
        self.threshold = threshold
        self.class_names = class_names if class_names else []
        self.nms_threshold = nms_threshold
        self.max_detections_per_class = max_detections_per_class
|
|
|
|
class PostProcessor(DataProcessor):
|
|
"""Post-processor for handling output data from inference stages"""
|
|
|
|
def __init__(self, options: PostProcessorOptions = None):
    """Keep *options*, or create default ``PostProcessorOptions`` when it is falsy."""
    if not options:
        options = PostProcessorOptions()
    self.options = options
|
|
|
|
def process(self, data: Any, *args, **kwargs) -> Any:
    """Process inference output data based on configured type.

    Dispatches on ``self.options.postprocess_type``; any type without a
    dedicated handler goes to ``_process_raw_output``. Extra positional and
    keyword arguments are forwarded to the handler unchanged.
    """
    selected = self.options.postprocess_type
    routes = (
        (PostProcessType.FIRE_DETECTION, self._process_fire_detection),
        (PostProcessType.CLASSIFICATION, self._process_classification),
        (PostProcessType.YOLO_V3, self._process_yolo_v3),
        (PostProcessType.YOLO_V5, self._process_yolo_v5),
    )
    for candidate, handler in routes:
        if selected == candidate:
            return handler(data, *args, **kwargs)
    return self._process_raw_output(data, *args, **kwargs)
|
|
|
|
def _process_fire_detection(self, raw_output: Any, *args, **kwargs) -> ClassificationResult:
    """Process fire detection output.

    The probability is the first element of *raw_output*: the first
    flattened value for a non-empty object exposing ``size``, or item 0 of a
    non-empty list/tuple. Anything else yields 0.0. The result is "Fire"
    (class 1) when the probability is strictly above
    ``self.options.threshold``, otherwise "No Fire" (class 0).
    """
    probability = 0.0
    if hasattr(raw_output, 'size') and raw_output.size > 0:
        probability = float(raw_output.flatten()[0])
    elif isinstance(raw_output, (list, tuple)) and len(raw_output) > 0:
        probability = float(raw_output[0])

    threshold = self.options.threshold
    fire_detected = probability > threshold
    return ClassificationResult(
        probability=probability,
        class_name="Fire" if fire_detected else "No Fire",
        class_num=1 if fire_detected else 0,
        confidence_threshold=threshold,
    )
|
|
|
|
def _process_classification(self, raw_output: Any, *args, **kwargs) -> ClassificationResult:
    """Process general classification output.

    A single score is treated as binary classification (class 1 when the
    score is strictly above ``self.options.threshold``). Several scores are
    treated as multi-class and the arg-max entry wins. Unsupported or empty
    input yields a default ``ClassificationResult``.
    """
    if hasattr(raw_output, 'flatten'):
        scores = raw_output.flatten()
    elif isinstance(raw_output, (list, tuple)):
        scores = np.array(raw_output)
    else:
        return ClassificationResult()

    score_count = len(scores)
    if score_count == 0:
        return ClassificationResult()

    if score_count == 1:
        # Binary classification
        probability = float(scores[0])
        class_num = 1 if probability > self.options.threshold else 0
    else:
        # Multi-class classification
        class_num = int(np.argmax(scores))
        probability = float(scores[class_num])

    # Fall back to a synthetic label when no name is configured for the index.
    names = self.options.class_names
    if class_num < len(names):
        class_name = names[class_num]
    else:
        class_name = f"Class_{class_num}"

    return ClassificationResult(
        probability=probability,
        class_name=class_name,
        class_num=class_num,
        confidence_threshold=self.options.threshold,
    )
|
|
|
|
def _process_yolo_v3(self, inference_output_list: List, hardware_preproc_info=None, *args, **kwargs) -> ObjectDetectionResult:
    """Process YOLO v3 output for object detection.

    This is the simplified built-in path: it delegates to
    ``_process_yolo_generic`` with ``version="v3"``. Extra positional and
    keyword arguments are accepted but ignored. For full functionality,
    refer to the Kneron examples.
    """
    return self._process_yolo_generic(
        inference_output_list,
        hardware_preproc_info,
        version="v3",
    )
|
|
|
|
def _process_yolo_v5(self, inference_output_list: List, hardware_preproc_info=None, *args, **kwargs) -> ObjectDetectionResult:
    """Process YOLO v5 output using reference implementation copied into codebase.

    Detections come from ``post_process_yolo_v5_reference`` as
    ``(x1, y1, x2, y2, score, class_num)`` tuples and are wrapped into
    ``BoundingBox`` objects. Letterbox mapping values are copied from
    same-named attributes of *hardware_preproc_info* when present. Any
    failure is printed with a traceback and an empty result is returned.
    """
    try:
        if not inference_output_list:
            return ObjectDetectionResult(class_count=len(self.options.class_names), box_count=0, box_list=[])

        names = self.options.class_names

        # Run reference postprocess (returns list of tuples)
        detections = post_process_yolo_v5_reference(
            inference_output_list, hardware_preproc_info, thresh_value=self.options.threshold
        )

        boxes: List[BoundingBox] = []
        for x1, y1, x2, y2, score, class_num in detections:
            if names and class_num < len(names):
                label = names[class_num]
            else:
                label = f"class_{class_num}"
            boxes.append(BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2, score=score,
                                     class_num=class_num, class_name=label))

        # Attach letterbox/mapping metadata into ObjectDetectionResult
        mapping_keys = (
            'model_input_width',
            'model_input_height',
            'resized_img_width',
            'resized_img_height',
            'pad_left',
            'pad_top',
            'pad_right',
            'pad_bottom',
        )
        mapping = dict.fromkeys(mapping_keys, 0)
        try:
            if hardware_preproc_info is not None:
                # Filled key by key so that values read before a failure are kept.
                for key in mapping_keys:
                    if hasattr(hardware_preproc_info, key):
                        mapping[key] = int(getattr(hardware_preproc_info, key))
        except Exception:
            pass

        return ObjectDetectionResult(
            class_count=len(names) if names else 1,
            box_count=len(boxes),
            box_list=boxes,
            **mapping,
        )
    except Exception as e:
        print(f"Error in YOLOv5 reference postprocessing: {e}")
        import traceback
        traceback.print_exc()
        fallback_class_count = len(self.options.class_names) if self.options.class_names else 1
        return ObjectDetectionResult(class_count=fallback_class_count, box_count=0, box_list=[])
|
|
|
|
def _process_yolo_generic(self, inference_output_list: List, hardware_preproc_info=None, version="v3") -> ObjectDetectionResult:
    """Run YOLO postprocessing and wrap the boxes with letterbox mapping info.

    Box decoding is delegated to ``_process_yolo_v5_corrected`` regardless of
    *version*; the parameter is accepted for interface compatibility only and
    is not consulted here.

    Args:
        inference_output_list: Output nodes of the model.
        hardware_preproc_info: Optional preprocessing descriptor. Its
            ``model_input_*`` (falling back to ``img_*``), ``resized_img_*``
            and ``pad_*`` attributes are copied into the result when present.
        version: Unused (see above).

    Returns:
        ``ObjectDetectionResult``. If decoding raises, the error is printed
        with a traceback and the result carries no boxes.
    """
    boxes = []

    try:
        if not inference_output_list or len(inference_output_list) == 0:
            return ObjectDetectionResult(class_count=len(self.options.class_names), box_count=0, box_list=[])

        if DEBUG_VERBOSE:
            print(f"DEBUG: Processing {len(inference_output_list)} YOLO output nodes")
            print("=" * 60)
            print("RAW INFERENCE OUTPUT DATA:")
            for i, output in enumerate(inference_output_list):
                print(f"\nOutput node {i}:")
                print(f" Type: {type(output)}")
                # A plain numpy array also has ``flatten``, so it is handled
                # by the second branch; no separate ndarray branch is needed.
                if hasattr(output, 'ndarray'):
                    arr = output.ndarray
                    print(f" Has ndarray attribute, shape: {arr.shape}")
                elif hasattr(output, 'flatten'):
                    arr = output
                    print(f" Direct array, shape: {arr.shape}")
                else:
                    arr = None
                    print(f" Unknown type: {type(output)}")
                    try:
                        print(f" String representation: {str(output)[:200]}")
                    except Exception:
                        print(" Cannot convert to string")
                if arr is not None:
                    print(f" Data type: {arr.dtype}")
                    print(f" Min value: {np.min(arr):.6f}")
                    print(f" Max value: {np.max(arr):.6f}")
                    print(f" Mean value: {np.mean(arr):.6f}")
                    print(f" Raw values (first 20): {arr.flatten()[:20]}")
            print("=" * 60)
            print("HARDWARE PREPROCESSING INFO:")
            if hardware_preproc_info:
                print(f" Type: {type(hardware_preproc_info)}")
                if hasattr(hardware_preproc_info, 'img_width'):
                    print(f" Image width: {hardware_preproc_info.img_width}")
                if hasattr(hardware_preproc_info, 'img_height'):
                    print(f" Image height: {hardware_preproc_info.img_height}")
            else:
                print(" No hardware preprocessing info available")
            print("=" * 60)
            print("USING CORRECTED YOLOV5 POSTPROCESSING")

        # All decoding, thresholding and NMS happens in the helper.
        boxes = self._process_yolo_v5_corrected(inference_output_list, hardware_preproc_info)

        if DEBUG_VERBOSE:
            print(f"INFO: Using corrected YOLOv5 processing, found {len(boxes)} detections")
            if boxes:
                print("Final detections:")
                for box in boxes[:5]:
                    print(f" {box.class_name}: ({box.x1},{box.y1})-({box.x2},{box.y2}) conf={box.score:.3f}")
                if len(boxes) > 5:
                    print(f" ... and {len(boxes) - 5} more")
            print(f"DEBUG: Final detection count: {len(boxes)}")

    except Exception as e:
        print(f"Error in YOLO postprocessing: {e}")
        import traceback
        traceback.print_exc()

    # Capture letterbox mapping info from hardware_preproc_info when available.
    # Each entry lists the attribute names to try, in order of preference:
    # explicit model_input_* first, then img_* (model space) as a fallback.
    mapping_sources = (
        ('model_input_width', ('model_input_width', 'img_width')),
        ('model_input_height', ('model_input_height', 'img_height')),
        # Resized (pre-pad) image size inside model input window
        ('resized_img_width', ('resized_img_width',)),
        ('resized_img_height', ('resized_img_height',)),
        # Padding applied
        ('pad_left', ('pad_left',)),
        ('pad_top', ('pad_top',)),
        ('pad_right', ('pad_right',)),
        ('pad_bottom', ('pad_bottom',)),
    )
    mapping = {key: 0 for key, _ in mapping_sources}
    try:
        if hardware_preproc_info is not None:
            for key, attr_names in mapping_sources:
                for attr in attr_names:
                    if hasattr(hardware_preproc_info, attr):
                        mapping[key] = int(getattr(hardware_preproc_info, attr))
                        break
    except Exception as exc:
        # Best effort: keep whatever was read before the failure, but do not
        # hide the problem completely when debugging.
        if DEBUG_VERBOSE:
            print(f"WARNING: Could not read letterbox mapping info: {exc}")

    return ObjectDetectionResult(
        class_count=len(self.options.class_names) if self.options.class_names else 1,
        box_count=len(boxes),
        box_list=boxes,
        **mapping,
    )
|
|
|
|
def _process_yolo_v5_corrected(self, inference_output_list: List, hardware_preproc_info=None) -> List[BoundingBox]:
    """
    Decode multi-scale YOLOv5 head outputs into NMS-filtered bounding boxes.

    (Translated from the original note: correct YOLOv5 post-processing based
    on the reference implementation, with multi-scale output support.)

    Each output node must be shaped
    ``[batch, num_anchors * (5 + num_classes), grid_h, grid_w]``; only batch
    index 0 is decoded. Nodes are paired in order with the three anchor sets
    below, so at most three nodes are consumed. The class count is derived
    from the channel dimension (255 channels -> 80 classes).

    The input arrays are never modified: activations are computed into new
    arrays.

    Args:
        inference_output_list: Output nodes, each a ``numpy.ndarray`` or an
            object exposing the array as ``.ndarray``.
        hardware_preproc_info: Object providing ``img_width`` and
            ``img_height``; boxes are scaled and clamped to that size.

    Returns:
        ``BoundingBox`` list in ``img_width`` x ``img_height`` pixel
        coordinates. Empty for empty input, for a missing
        ``hardware_preproc_info``, or when decoding fails (the error is
        printed together with a traceback).
    """
    # Greedy per-class NMS settings.
    # NOTE(review): fixed values; self.options.nms_threshold and
    # self.options.max_detections_per_class are not consulted here — confirm
    # whether that is intended.
    nms_iou_threshold = 0.5
    max_per_class = 10

    def sigmoid(x):
        # Clipping keeps np.exp from overflowing on extreme logits.
        return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))

    try:
        if not inference_output_list or len(inference_output_list) == 0:
            return []

        if hardware_preproc_info is None:
            # Box scaling needs the image size; fail early with a clear message.
            print("WARNING: YOLOv5 postprocessing requires hardware_preproc_info (img_width/img_height)")
            return []

        img_width = hardware_preproc_info.img_width
        img_height = hardware_preproc_info.img_height
        threshold = self.options.threshold
        class_names = self.options.class_names

        # YOLOv5 uses 3 output scales with different anchors
        scales_info = [
            {"size": 80, "stride": 8, "anchors": [[10, 13], [16, 30], [33, 23]]},
            {"size": 40, "stride": 16, "anchors": [[30, 61], [62, 45], [59, 119]]},
            {"size": 20, "stride": 32, "anchors": [[116, 90], [156, 198], [373, 326]]}
        ]

        all_detections = []

        # Process each output scale
        for scale_idx, (output, scale_info) in enumerate(zip(inference_output_list, scales_info)):
            # Extract numpy array
            if hasattr(output, 'ndarray'):
                raw_data = output.ndarray
            elif isinstance(output, np.ndarray):
                raw_data = output
            else:
                print(f"WARNING: Unsupported output type for scale {scale_idx}: {type(output)}")
                continue

            if DEBUG_VERBOSE:
                print(f"DEBUG: Scale {scale_idx} raw shape: {raw_data.shape}")

            batch_size, channels, grid_h, grid_w = raw_data.shape
            anchors = scale_info["anchors"]
            num_anchors = len(anchors)
            attrs_per_anchor = channels // num_anchors  # 5 box/conf values + classes
            if channels % num_anchors != 0 or attrs_per_anchor <= 5:
                print(f"WARNING: Unexpected channel count {channels} for scale {scale_idx}")
                continue

            # Sigmoid over the whole head: x, y, w, h, objectness and class
            # scores are all logits in the YOLOv5 decode. This builds a new
            # array, so the caller's tensor is left untouched.
            activated = sigmoid(raw_data)

            # [batch, anchors, attrs, grid_h, grid_w] -> [batch, anchors, grid_h, grid_w, attrs]
            activated = activated.reshape(batch_size, num_anchors, attrs_per_anchor, grid_h, grid_w)
            activated = activated.transpose(0, 1, 3, 4, 2)

            # Cell coordinate grids are shared by every anchor of this scale.
            grid_x, grid_y = np.meshgrid(np.arange(grid_w), np.arange(grid_h))

            for anchor_idx, (anchor_w, anchor_h) in enumerate(anchors):
                anchor_data = activated[0, anchor_idx]  # [grid_h, grid_w, attrs]

                # Box centre, normalised to [0, 1] of the grid extent.
                x_offset = (anchor_data[..., 0] * 2 - 0.5 + grid_x) / grid_w
                y_offset = (anchor_data[..., 1] * 2 - 0.5 + grid_y) / grid_h

                # Box size, normalised by the image size.
                w = (anchor_data[..., 2] * 2) ** 2 * anchor_w / img_width
                h = (anchor_data[..., 3] * 2) ** 2 * anchor_h / img_height

                obj_conf = anchor_data[..., 4]
                class_probs = anchor_data[..., 5:]

                # Filter by objectness threshold
                obj_mask = obj_conf > threshold
                if not np.any(obj_mask):
                    continue

                valid_x = x_offset[obj_mask]
                valid_y = y_offset[obj_mask]
                valid_w = w[obj_mask]
                valid_h = h[obj_mask]

                # Best class per candidate: class probability x objectness.
                class_scores = class_probs[obj_mask] * obj_conf[obj_mask].reshape(-1, 1)
                best_classes = np.argmax(class_scores, axis=1)
                best_scores = np.max(class_scores, axis=1)

                # Filter by class confidence threshold
                class_mask = best_scores > threshold
                if not np.any(class_mask):
                    continue

                anchor_detections = []
                for i in np.where(class_mask)[0]:
                    half_w = valid_w[i] / 2
                    half_h = valid_h[i] / 2

                    # Centre/size -> corners, scaled to pixels and clamped so
                    # that every box keeps at least one pixel of extent.
                    x1 = max(0, min((valid_x[i] - half_w) * img_width, img_width - 1))
                    y1 = max(0, min((valid_y[i] - half_h) * img_height, img_height - 1))
                    x2 = max(x1 + 1, min((valid_x[i] + half_w) * img_width, img_width))
                    y2 = max(y1 + 1, min((valid_y[i] + half_h) * img_height, img_height))

                    class_num = int(best_classes[i])  # plain int, not numpy
                    if class_names and class_num < len(class_names):
                        class_name = class_names[class_num]
                    else:
                        class_name = f"class_{class_num}"

                    anchor_detections.append({
                        'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2,
                        'score': best_scores[i],
                        'class_num': class_num,
                        'class_name': class_name
                    })

                all_detections.extend(anchor_detections)
                if DEBUG_VERBOSE:
                    print(f"DEBUG: Scale {scale_idx}, anchor {anchor_idx}: {len(anchor_detections)} detections")

        if not all_detections:
            return []

        # Group by class, then apply greedy NMS across all scales.
        class_detections = defaultdict(list)
        for det in all_detections:
            class_detections[det['class_num']].append(det)

        final_boxes = []
        for detections in class_detections.values():
            # Highest score first.
            detections.sort(key=lambda d: d['score'], reverse=True)

            keep = []
            while detections and len(keep) < max_per_class:
                current = detections[0]
                keep.append(current)
                # Drop every remaining box that overlaps the kept one too much.
                detections = [
                    det for det in detections[1:]
                    if self._calculate_iou_dict(current, det) <= nms_iou_threshold
                ]

            for det in keep:
                final_boxes.append(BoundingBox(
                    x1=int(det['x1']),
                    y1=int(det['y1']),
                    x2=int(det['x2']),
                    y2=int(det['y2']),
                    score=float(det['score']),
                    class_num=det['class_num'],
                    class_name=det['class_name']
                ))
                if DEBUG_VERBOSE:
                    print(f"DEBUG: {det['class_name']}: ({int(det['x1'])},{int(det['y1'])})-({int(det['x2'])},{int(det['y2'])}) conf={det['score']:.3f}")

        if DEBUG_VERBOSE:
            print(f"DEBUG: Total final detections after NMS: {len(final_boxes)}")
        return final_boxes

    except Exception as e:
        print(f"ERROR in corrected YOLOv5 processing: {e}")
        import traceback
        traceback.print_exc()
        return []
|
|
|
|
def _calculate_iou_dict(self, det1: dict, det2: dict) -> float:
|
|
"""Calculate IoU between two detection dictionaries"""
|
|
try:
|
|
# Calculate intersection
|
|
x1_i = max(det1['x1'], det2['x1'])
|
|
y1_i = max(det1['y1'], det2['y1'])
|
|
x2_i = min(det1['x2'], det2['x2'])
|
|
y2_i = min(det1['y2'], det2['y2'])
|
|
|
|
if x2_i <= x1_i or y2_i <= y1_i:
|
|
return 0.0
|
|
|
|
intersection = (x2_i - x1_i) * (y2_i - y1_i)
|
|
|
|
# Calculate union
|
|
area1 = (det1['x2'] - det1['x1']) * (det1['y2'] - det1['y1'])
|
|
area2 = (det2['x2'] - det2['x1']) * (det2['y2'] - det2['y1'])
|
|
union = area1 + area2 - intersection
|
|
|
|
if union <= 0:
|
|
return 0.0
|
|
|
|
return intersection / union
|
|
|
|
except Exception:
|
|
return 0.0
|
|
|
|
def _apply_nms_numpy(self, detections: np.ndarray, class_idx: int, nms_threshold: float = 0.5, max_detections: int = 10) -> List[int]:
|
|
"""Apply Non-Maximum Suppression using numpy operations"""
|
|
if detections.shape[0] == 0:
|
|
return []
|
|
|
|
if detections.shape[0] == 1:
|
|
return [0]
|
|
|
|
keep_indices = []
|
|
|
|
for i in range(min(detections.shape[0], max_detections)):
|
|
if detections[i, class_idx] == 0: # Already suppressed
|
|
continue
|
|
|
|
keep_indices.append(i)
|
|
|
|
# Calculate IoU with remaining boxes
|
|
for j in range(i + 1, detections.shape[0]):
|
|
if detections[j, class_idx] == 0: # Already suppressed
|
|
continue
|
|
|
|
# IoU calculation
|
|
iou = self._calculate_iou_numpy(detections[i], detections[j])
|
|
|
|
if iou > nms_threshold:
|
|
detections[j, class_idx] = 0 # Suppress this detection
|
|
|
|
return keep_indices[:max_detections]
|
|
|
|
def _calculate_iou_numpy(self, det1: np.ndarray, det2: np.ndarray) -> float:
|
|
"""Calculate IoU between two detections [x_center, y_center, w, h, ...]"""
|
|
try:
|
|
# Convert to x1, y1, x2, y2
|
|
x1_1, y1_1 = det1[0] - det1[2]/2, det1[1] - det1[3]/2
|
|
x2_1, y2_1 = det1[0] + det1[2]/2, det1[1] + det1[3]/2
|
|
|
|
x1_2, y1_2 = det2[0] - det2[2]/2, det2[1] - det2[3]/2
|
|
x2_2, y2_2 = det2[0] + det2[2]/2, det2[1] + det2[3]/2
|
|
|
|
# Calculate intersection
|
|
x1_i = max(x1_1, x1_2)
|
|
y1_i = max(y1_1, y1_2)
|
|
x2_i = min(x2_1, x2_2)
|
|
y2_i = min(y2_1, y2_2)
|
|
|
|
if x2_i <= x1_i or y2_i <= y1_i:
|
|
return 0.0
|
|
|
|
intersection = (x2_i - x1_i) * (y2_i - y1_i)
|
|
|
|
# Calculate union
|
|
area1 = (x2_1 - x1_1) * (y2_1 - y1_1)
|
|
area2 = (x2_2 - x1_2) * (y2_2 - y1_2)
|
|
union = area1 + area2 - intersection
|
|
|
|
if union <= 0:
|
|
return 0.0
|
|
|
|
return intersection / union
|
|
|
|
except Exception:
|
|
return 0.0
|
|
|
|
def _apply_nms(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
    """Apply Non-Maximum Suppression to remove duplicate detections.

    Boxes are grouped by ``class_num``. Within each class the highest-scoring
    box is kept and every remaining box whose IoU with it exceeds
    ``self.options.nms_threshold`` is dropped; this repeats until no
    candidates are left. At most ``self.options.max_detections_per_class``
    boxes are kept per class.

    Args:
        boxes: Candidate boxes (objects with ``class_num``, ``score`` and
            corner coordinates).

    Returns:
        The filtered boxes. Inputs with fewer than two boxes are returned
        unchanged; if NMS itself fails, a warning is printed and the
        unfiltered input is returned.
    """
    if not boxes or len(boxes) <= 1:
        return boxes

    try:
        # Group boxes by class
        class_boxes = defaultdict(list)
        for box in boxes:
            class_boxes[box.class_num].append(box)

        final_boxes = []
        for candidates in class_boxes.values():
            if len(candidates) <= 1:
                final_boxes.extend(candidates)
                continue

            # Sort by confidence (descending)
            candidates.sort(key=lambda b: b.score, reverse=True)

            keep = []
            while candidates:
                # Keep the best remaining box, drop whatever overlaps it too much.
                current_box = candidates[0]
                keep.append(current_box)
                candidates = [
                    other for other in candidates[1:]
                    if self._calculate_iou(current_box, other) <= self.options.nms_threshold
                ]

            final_boxes.extend(keep[:self.options.max_detections_per_class])

        # Debug output is opt-in, like the rest of the verbose logging.
        if DEBUG_VERBOSE:
            print(f"DEBUG: NMS reduced {len(boxes)} to {len(final_boxes)} boxes")
        return final_boxes

    except Exception as e:
        print(f"Warning: NMS failed: {e}")
        return boxes
|
|
|
|
def _calculate_iou(self, box1: BoundingBox, box2: BoundingBox) -> float:
|
|
"""Calculate Intersection over Union (IoU) between two bounding boxes"""
|
|
try:
|
|
# Calculate intersection area
|
|
x1 = max(box1.x1, box2.x1)
|
|
y1 = max(box1.y1, box2.y1)
|
|
x2 = min(box1.x2, box2.x2)
|
|
y2 = min(box1.y2, box2.y2)
|
|
|
|
if x1 >= x2 or y1 >= y2:
|
|
return 0.0
|
|
|
|
intersection = (x2 - x1) * (y2 - y1)
|
|
|
|
# Calculate union area
|
|
area1 = (box1.x2 - box1.x1) * (box1.y2 - box1.y1)
|
|
area2 = (box2.x2 - box2.x1) * (box2.y2 - box2.y1)
|
|
union = area1 + area2 - intersection
|
|
|
|
return intersection / union if union > 0 else 0.0
|
|
|
|
except Exception:
|
|
return 0.0
|
|
|
|
def _process_raw_output(self, data: Any, *args, **kwargs) -> Any:
|
|
"""Default post-processing - returns data unchanged"""
|
|
return data
|
|
|
|
|
|
class MultiDongle:
|
|
# Currently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported.
# Maps the format names used by this module to Kneron PLUS image formats.
# NOTE(review): "BGR565" maps to KP_IMAGE_FORMAT_RGB565 — looks intentional, confirm.
_FORMAT_MAPPING = {
    "BGR565": kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
    "RGB8888": kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888,
    "YUYV": kp.ImageFormat.KP_IMAGE_FORMAT_YUYV,
    "RAW8": kp.ImageFormat.KP_IMAGE_FORMAT_RAW8,
    # YCbCr 4:2:2 variants are not enabled:
    # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0,
    # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0,
    # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB,
    # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR,
    # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1,
    # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1,
    # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB,
    # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR,
}

# USB product id, as the lower-case string produced by hex(), -> series name.
DongleModelMap = {
    "0x100": "KL520",
    "0x720": "KL720",
    "0x630": "KL630",
    "0x730": "KL730",
    # "0x540": "KL540",
}
|
|
|
|
@staticmethod
def scan_devices():
    """
    Scan for available Kneron devices and return their information.

    Progress and the raw descriptor list are printed to stdout. A device
    whose descriptor cannot be processed is reported and skipped.

    Returns:
        List[Dict]: One dict per device with the keys ``port_id``,
        ``product_id`` (hex string), ``kn_number``, ``dongle``, ``series``
        and ``device_descriptor``. Empty when no device is found or when the
        scan raises ``kp.ApiKPException``.
    """
    try:
        print('[Scanning Devices]')
        device_descriptors = kp.core.scan_devices()

        print(device_descriptors)

        if not device_descriptors or device_descriptors.device_descriptor_number == 0:
            print(' - No devices found')
            return []

        # Access the actual device list from the DeviceDescriptorList object
        devices = device_descriptors.device_descriptor_list
        print(f' - Found {len(devices)} device(s):')

        devices_info = []
        for position, device_desc in enumerate(devices, start=1):
            try:
                # hex() already yields a lower-case string such as "0x100".
                product_id_hex = hex(device_desc.product_id)
                dongle_model = MultiDongle.DongleModelMap.get(product_id_hex, "Unknown")
                port_id = device_desc.usb_port_id
                kn_number = device_desc.kn_number

                devices_info.append({
                    'port_id': port_id,
                    'product_id': product_id_hex,
                    'kn_number': kn_number,
                    'dongle': dongle_model,
                    'series': dongle_model,  # Assuming series is the same as dongle model
                    'device_descriptor': device_desc
                })

                print(f' [{position}] Port ID: {port_id}, Series: {dongle_model}, Product ID: {product_id_hex}, KN Number: {kn_number}')

            except Exception as e:
                print(f"Error processing device: {e}")

        return devices_info

    except kp.ApiKPException as exception:
        print(f'Error: scan devices fail, error msg: [{str(exception)}]')
        return []
|
|
|
|
@staticmethod
def _get_device_series(device_descriptor):
    """
    Extract device series from device descriptor using product_id.

    The product id is normalised before the lookup in
    ``MultiDongle.DongleModelMap``: integers are converted with ``hex()``
    and strings are stripped and lower-cased. This applies to both the dict
    and the object form, so ``{'product_id': 0x100}`` and
    ``{'product_id': '0X100'}`` resolve like the SDK descriptor does.

    Args:
        device_descriptor: Device descriptor from scan_devices() - can be dict or object

    Returns:
        str: Device series (e.g., 'KL520', 'KL720', etc.), ``'Unknown (<id>)'``
        for an unmapped product id, or ``'Unknown'`` when no series can be
        determined.
    """
    try:
        has_product_id = False
        product_id = None

        if isinstance(device_descriptor, dict):
            # Handle dict format (from JSON)
            has_product_id = True
            product_id = device_descriptor.get('product_id', '')
        elif hasattr(device_descriptor, 'product_id'):
            # Handle object format (from SDK)
            has_product_id = True
            product_id = device_descriptor.product_id

        if has_product_id:
            if isinstance(product_id, int):
                lookup_key = shown_id = hex(product_id)
            elif isinstance(product_id, str):
                lookup_key = product_id.strip().lower()
                shown_id = product_id  # report the value exactly as received
            else:
                lookup_key = shown_id = product_id

            if lookup_key in MultiDongle.DongleModelMap:
                return MultiDongle.DongleModelMap[lookup_key]
            return f'Unknown ({shown_id})'

        # Legacy chip-based detection (fallback), only reached for objects
        # that expose no product_id.
        if hasattr(device_descriptor, 'chip'):
            chip = device_descriptor.chip
            if chip == kp.ModelNefDescriptor.KP_CHIP_KL520:
                return 'KL520'
            elif chip == kp.ModelNefDescriptor.KP_CHIP_KL720:
                return 'KL720'
            elif chip == kp.ModelNefDescriptor.KP_CHIP_KL630:
                return 'KL630'
            elif chip == kp.ModelNefDescriptor.KP_CHIP_KL730:
                return 'KL730'
            # KL540 is not enabled:
            # elif chip == kp.ModelNefDescriptor.KP_CHIP_KL540:
            #     return 'KL540'

        # Final fallback
        return 'Unknown'

    except Exception as e:
        print(f'Warning: Unable to determine device series: {str(e)}')
        return 'Unknown'
|
|
|
|
@staticmethod
def connect_auto_detected_devices(device_count: int = None):
    """
    Auto-detect and connect to available Kneron devices.

    Devices are taken in the order returned by ``MultiDongle.scan_devices()``.

    Args:
        device_count: Number of devices to connect. If None, connect to all available devices.
            Values larger than the number of detected devices are capped.

    Returns:
        Tuple[kp.DeviceGroup, List[Dict]]: Device group and list of connected device info

    Raises:
        ValueError: If ``device_count`` is given but smaller than 1.
        Exception: If no device is found or the connection fails (the
            original ``kp.ApiKPException`` is chained as the cause).
    """
    # Reject non-positive counts up front: a negative count used to produce
    # an empty port list while still reporting devices as connected.
    if device_count is not None and device_count < 1:
        raise ValueError(f"device_count must be at least 1 or None, got {device_count}")

    devices_info = MultiDongle.scan_devices()
    if not devices_info:
        raise Exception("No Kneron devices found")

    # Slicing with None selects everything; a too-large count is capped.
    connected_devices = devices_info[:device_count]
    port_ids = [device['port_id'] for device in connected_devices]

    try:
        print(f'[Connecting to {len(connected_devices)} device(s)]')
        device_group = kp.core.connect_devices(usb_port_ids=port_ids)
        print(' - Success')
        return device_group, connected_devices

    except kp.ApiKPException as exception:
        raise Exception(f'Failed to connect devices: {str(exception)}') from exception
|
|
|
|
def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None,
             model_path: str = None, upload_fw: bool = False, auto_detect: bool = False,
             max_queue_size: int = 0, multi_series_config: dict = None,
             postprocess_options: PostProcessorOptions = None):
    """
    Build a MultiDongle in either single-series (legacy) or multi-series mode.

    Multi-series mode is selected whenever ``multi_series_config`` is not None;
    otherwise the legacy single-series arguments are used.

    :param port_id: List of USB port IDs for single-series (legacy). If None and auto_detect=True, will auto-detect.
    :param scpu_fw_path: Path to the SCPU firmware file for single-series (legacy).
    :param ncpu_fw_path: Path to the NCPU firmware file for single-series (legacy).
    :param model_path: Path to the model file for single-series (legacy).
    :param upload_fw: Flag to indicate whether to upload firmware for single-series (legacy).
    :param auto_detect: Flag to auto-detect and connect to available devices for single-series (legacy).
    :param max_queue_size: Maximum size for internal queues. If 0, unlimited queues are used.
    :param multi_series_config: Multi-series configuration dict. Format:
        {
            "KL520": {
                "port_ids": [28, 32],
                "model_path": "path/to/kl520_model.nef",
                "firmware_paths": {  # Optional
                    "scpu": "path/to/kl520_scpu.bin",
                    "ncpu": "path/to/kl520_ncpu.bin"
                }
            }
        }
    :param postprocess_options: PostProcessorOptions for configuring output processing
    """
    # Post-processing is configured the same way in both modes.
    options = postprocess_options or PostProcessorOptions()
    self.postprocess_options = options
    self.postprocessor = PostProcessor(options)

    # A config dict (even an empty one) switches to multi-series mode.
    self.multi_series_mode = multi_series_config is not None

    if not self.multi_series_mode:
        # Legacy single-series initialization
        self._init_single_series(port_id, scpu_fw_path, ncpu_fw_path, model_path,
                                 upload_fw, auto_detect, max_queue_size)
        return

    # Multi-series initialization
    self._init_multi_series(multi_series_config, max_queue_size)
|
|
|
|
def _init_multi_series(self, multi_series_config: dict, max_queue_size: int):
|
|
"""Initialize multi-series configuration"""
|
|
self.series_config = multi_series_config
|
|
self.series_groups = {} # series_name -> config
|
|
self.device_groups = {} # series_name -> device_group
|
|
self.model_descriptors = {} # series_name -> model descriptor
|
|
self.gops_weights = {} # series_name -> normalized weight
|
|
self.current_loads = {} # series_name -> current queue size
|
|
|
|
# Set up series groups and calculate weights
|
|
total_gops = 0
|
|
for series_name, config in multi_series_config.items():
|
|
if series_name not in DongleSeriesSpec.SERIES_SPECS:
|
|
raise ValueError(f"Unknown series: {series_name}")
|
|
|
|
self.series_groups[series_name] = config
|
|
self.current_loads[series_name] = 0
|
|
|
|
# Calculate effective GOPS (series GOPS * number of devices)
|
|
port_count = len(config.get("port_ids", []))
|
|
series_gops = DongleSeriesSpec.SERIES_SPECS[series_name]["gops"]
|
|
effective_gops = series_gops * port_count
|
|
total_gops += effective_gops
|
|
|
|
# Calculate normalized weights
|
|
for series_name, config in multi_series_config.items():
|
|
port_count = len(config.get("port_ids", []))
|
|
series_gops = DongleSeriesSpec.SERIES_SPECS[series_name]["gops"]
|
|
effective_gops = series_gops * port_count
|
|
self.gops_weights[series_name] = effective_gops / total_gops if total_gops > 0 else 0
|
|
|
|
# Multi-series threading and queues
|
|
if max_queue_size > 0:
|
|
self._input_queue = queue.Queue(maxsize=max_queue_size)
|
|
self._ordered_output_queue = queue.Queue(maxsize=max_queue_size)
|
|
else:
|
|
self._input_queue = queue.Queue()
|
|
self._ordered_output_queue = queue.Queue()
|
|
|
|
# Create output queue for legacy compatibility
|
|
self._output_queue = self._ordered_output_queue # Point to the same queue
|
|
|
|
self.result_queues = {} # series_name -> queue
|
|
for series_name in multi_series_config.keys():
|
|
self.result_queues[series_name] = queue.Queue()
|
|
|
|
# Sequence management for ordered results
|
|
self.sequence_counter = 0
|
|
self.sequence_lock = threading.Lock()
|
|
self.pending_results = {} # sequence_id -> InferenceResult
|
|
self.next_output_sequence = 0
|
|
|
|
# Threading
|
|
self._stop_event = threading.Event()
|
|
self.dispatcher_thread = None
|
|
self.send_threads = {} # series_name -> thread
|
|
self.receive_threads = {} # series_name -> thread
|
|
self.result_ordering_thread = None
|
|
|
|
# Legacy attributes for compatibility
|
|
self.port_id = []
|
|
self.device_group = None
|
|
self.model_nef_descriptor = None
|
|
self.generic_inference_input_descriptor = None
|
|
self._inference_counter = 0
|
|
|
|
def _init_single_series(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str,
|
|
model_path: str, upload_fw: bool, auto_detect: bool, max_queue_size: int):
|
|
"""Initialize legacy single-series configuration"""
|
|
self.auto_detect = auto_detect
|
|
self.connected_devices_info = []
|
|
|
|
if auto_detect:
|
|
# Auto-detect devices
|
|
devices_info = self.scan_devices()
|
|
if devices_info:
|
|
self.port_id = [device['port_id'] for device in devices_info]
|
|
self.connected_devices_info = devices_info
|
|
else:
|
|
raise Exception("No Kneron devices found for auto-detection")
|
|
else:
|
|
self.port_id = port_id or []
|
|
|
|
self.upload_fw = upload_fw
|
|
|
|
# Always store firmware paths when provided
|
|
self.scpu_fw_path = scpu_fw_path
|
|
self.ncpu_fw_path = ncpu_fw_path
|
|
self.model_path = model_path
|
|
self.device_group = None
|
|
|
|
# generic_inference_input_descriptor will be prepared in initialize
|
|
self.model_nef_descriptor = None
|
|
self.generic_inference_input_descriptor = None
|
|
|
|
# Queues for data
|
|
if max_queue_size > 0:
|
|
self._input_queue = queue.Queue(maxsize=max_queue_size)
|
|
self._output_queue = queue.Queue(maxsize=max_queue_size)
|
|
else:
|
|
self._input_queue = queue.Queue()
|
|
self._output_queue = queue.Queue()
|
|
|
|
# Threading attributes
|
|
self._send_thread = None
|
|
self._receive_thread = None
|
|
self._stop_event = threading.Event()
|
|
|
|
self._inference_counter = 0
|
|
|
|
# Convert single-series to multi-series format internally for unified processing
|
|
self._convert_single_to_multi_series()
|
|
|
|
def _convert_single_to_multi_series(self):
|
|
"""
|
|
Convert single-series configuration to multi-series format internally
|
|
This allows unified processing regardless of initialization mode
|
|
"""
|
|
if not self.port_id:
|
|
# No ports specified, create empty structure
|
|
self.series_groups = {}
|
|
self.gops_weights = {}
|
|
self.current_loads = {}
|
|
return
|
|
|
|
# Detect series from connected devices or use default
|
|
detected_series = self._detect_series_from_ports(self.port_id)
|
|
|
|
# Create multi-series config format
|
|
self.series_groups = {
|
|
detected_series: {
|
|
"port_ids": self.port_id.copy(),
|
|
"model_path": self.model_path,
|
|
"firmware_paths": {
|
|
"scpu": self.scpu_fw_path,
|
|
"ncpu": self.ncpu_fw_path
|
|
} if self.scpu_fw_path and self.ncpu_fw_path else {}
|
|
}
|
|
}
|
|
|
|
# Calculate GOPS weights (100% since it's single series)
|
|
self.gops_weights = {detected_series: 1.0}
|
|
|
|
# Initialize load tracking
|
|
self.current_loads = {detected_series: 0}
|
|
|
|
print(f"Single-series config converted to multi-series format: {detected_series}")
|
|
|
|
def _detect_series_from_ports(self, port_ids: List[int]) -> str:
|
|
"""
|
|
Detect series from port IDs by scanning connected devices
|
|
Falls back to KL520 if unable to detect
|
|
"""
|
|
try:
|
|
# Try to scan devices and match port IDs
|
|
devices_info = self.scan_devices()
|
|
|
|
for device_info in devices_info:
|
|
if device_info['port_id'] in port_ids:
|
|
series = device_info.get('series', 'Unknown')
|
|
if series != 'Unknown':
|
|
return series
|
|
|
|
# If scanning didn't work, try to auto-detect from the first available device
|
|
if self.auto_detect and self.connected_devices_info:
|
|
for device_info in self.connected_devices_info:
|
|
series = device_info.get('series', 'Unknown')
|
|
if series != 'Unknown':
|
|
return series
|
|
except Exception as e:
|
|
print(f"Warning: Could not detect series from devices: {e}")
|
|
|
|
# Fallback to KL520 (most common series)
|
|
print("Warning: Could not detect device series, defaulting to KL520")
|
|
return "KL520"
|
|
|
|
def _select_optimal_series(self) -> Optional[str]:
|
|
"""
|
|
Select optimal series based on current load and GOPS capacity with performance bias
|
|
Returns the series name with the best load/capacity ratio, favoring high-performance dongles
|
|
"""
|
|
if not self.multi_series_mode or not self.series_groups:
|
|
return None
|
|
|
|
best_score = float('inf')
|
|
selected_series = None
|
|
|
|
# Get series GOPS values for performance bias
|
|
series_gops = {}
|
|
for series_name in self.series_groups.keys():
|
|
# Extract GOPS from DongleSeriesSpec
|
|
for spec_name, spec_info in DongleSeriesSpec.SERIES_SPECS.items():
|
|
if spec_name == series_name:
|
|
series_gops[series_name] = spec_info["gops"]
|
|
break
|
|
|
|
for series_name in self.series_groups.keys():
|
|
current_load = self.current_loads.get(series_name, 0)
|
|
weight = self.gops_weights.get(series_name, 0)
|
|
gops = series_gops.get(series_name, 1)
|
|
|
|
if weight <= 0:
|
|
continue
|
|
|
|
# Calculate load ratio (lower is better)
|
|
load_ratio = current_load / weight
|
|
|
|
# Add performance bias: penalize low-GOPS devices more heavily
|
|
# This encourages using high-performance dongles even if they have slightly higher load
|
|
if gops < 10: # Low-performance threshold (like KL520 with 2 GOPS)
|
|
performance_penalty = 2.0 # 2x penalty for slow devices
|
|
else:
|
|
performance_penalty = 1.0
|
|
|
|
# Combined score considers both load and performance
|
|
combined_score = load_ratio * performance_penalty
|
|
|
|
if combined_score < best_score:
|
|
best_score = combined_score
|
|
selected_series = series_name
|
|
|
|
return selected_series
|
|
|
|
def initialize(self):
    """
    Connect the devices and load firmware/model for the configured mode.

    Delegates to the multi-series or the legacy single-series routine
    depending on ``self.multi_series_mode``.  Must be called before start().
    """
    if not self.multi_series_mode:
        # Legacy single-series initialization
        self._initialize_single_series()
        return

    # Multi-series initialization
    self._initialize_multi_series()
|
|
|
|
def _initialize_single_series(self):
    """
    Bring up the single (legacy) device group.

    Steps, in order: connect the ports in ``self.port_id``, optionally upload
    firmware, upload the model, read the model's input shape, and build the
    generic image inference descriptor.  Any SDK failure while connecting or
    uploading prints an error and terminates the process via sys.exit(1).
    """
    # Step 1: connect and keep the group on self.device_group.
    try:
        print('[Connect Device]')
        self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception)))
        sys.exit(1)

    # Step 2: the USB timeout is intentionally left untouched; setting it
    # caused crashes when a camera was connected.
    print('[Set Device Timeout]')
    print(' - Skipped (prevents camera connection crashes)')

    # Step 3: optional firmware upload.
    if self.upload_fw:
        try:
            print('[Upload Firmware]')
            kp.core.load_firmware_from_file(device_group=self.device_group,
                                            scpu_fw_path=self.scpu_fw_path,
                                            ncpu_fw_path=self.ncpu_fw_path)
            print(' - Success')
        except kp.ApiKPException as exception:
            print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
            sys.exit(1)

    # Step 4: model upload.
    try:
        print('[Upload Model]')
        self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group,
                                                                 file_path=self.model_path)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload model failed, error = \'{}\''.format(str(exception)))
        sys.exit(1)

    # Step 5: input geometry from the first model's first input node, with
    # a 128x128x3 fallback when the metadata is missing.
    descriptor = self.model_nef_descriptor
    if not (descriptor and descriptor.models):
        self.model_input_shape = (128, 128)
        self.model_input_channels = 3
        print("Model info not available, using default shape")
    else:
        model = descriptor.models[0]
        if not (hasattr(model, 'input_nodes') and model.input_nodes):
            self.model_input_shape = (128, 128)  # fallback
            self.model_input_channels = 3
            print("Using default input shape (128, 128)")
        else:
            # shape_npu is read as [batch, channels, height, width]
            # (e.g. [1, 3, 128, 128]) - presumably NCHW; confirm against the SDK.
            shape = model.input_nodes[0].tensor_shape_info.data.shape_npu
            self.model_input_shape = (shape[3], shape[2])  # (width, height)
            self.model_input_channels = shape[1]  # 3 for RGB
            print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}")

    # Step 6: inference descriptor bound to the first model's id.
    if not self.model_nef_descriptor:
        print("Warning: Could not get generic inference input descriptor from model.")
        self.generic_inference_input_descriptor = None
    else:
        self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
            model_id=self.model_nef_descriptor.models[0].id,
        )
|
|
|
|
def _initialize_multi_series(self):
    """
    Connect, flash and load a model onto every configured series.

    For each entry of ``self.series_config`` the method connects the listed
    ports, uploads firmware when both 'scpu' and 'ncpu' paths are configured,
    and uploads the model when 'model_path' is configured.  Failures are
    reported with print() and only that series is skipped.

    A series whose firmware or model upload fails is disconnected and removed
    from ``self.device_groups``.  start() spawns workers for every entry of
    ``device_groups`` and the send worker looks up the series in
    ``self.model_descriptors``, so a half-initialised entry would break it.

    Afterwards the legacy single-device attributes (device_group,
    model_nef_descriptor, generic_inference_input_descriptor,
    model_input_shape, model_input_channels) are filled from the first
    connected series.  The input shape falls back to (640, 640) with 3
    channels whenever no series-specific value is available, including when
    no series connected at all.
    """
    print('[Multi-Series Initialization]')

    def release_series(name, group):
        """Unregister and disconnect a series whose setup failed."""
        self.device_groups.pop(name, None)
        try:
            kp.core.disconnect_devices(device_group=group)
        except kp.ApiKPException as exception:
            print(f'Error: disconnect devices failed for {name}, error = {str(exception)}')

    # Initialize each series separately
    for series_name, config in self.series_config.items():
        print(f'[Initializing {series_name}]')

        # Get port IDs for this series
        port_ids = config.get('port_ids', [])
        if not port_ids:
            print(f'Warning: No port IDs configured for {series_name}, skipping')
            continue

        # Connect devices for this series
        try:
            print(f' [Connect Devices] Port IDs: {port_ids}')
            device_group = kp.core.connect_devices(usb_port_ids=port_ids)
            self.device_groups[series_name] = device_group
            print(f' - Success ({len(port_ids)} devices)')
        except kp.ApiKPException as exception:
            print(f'Error: connect devices failed for {series_name}, port IDs = {port_ids}, error = {str(exception)}')
            continue

        # Upload firmware if available
        firmware_paths = config.get('firmware_paths')
        if firmware_paths and 'scpu' in firmware_paths and 'ncpu' in firmware_paths:
            try:
                print(' [Upload Firmware]')
                kp.core.load_firmware_from_file(
                    device_group=device_group,
                    scpu_fw_path=firmware_paths['scpu'],
                    ncpu_fw_path=firmware_paths['ncpu']
                )
                print(' - Success')
            except kp.ApiKPException as exception:
                print(f'Error: upload firmware failed for {series_name}, error = {str(exception)}')
                # Do not leave a connected but unusable group registered.
                release_series(series_name, device_group)
                continue
        else:
            print(' [Upload Firmware] - Skipped (no firmware paths configured)')

        # Upload model
        model_path = config.get('model_path')
        if not model_path:
            print(' [Upload Model] - Skipped (no model path configured)')
            continue

        try:
            print(' [Upload Model]')
            model_descriptor = kp.core.load_model_from_file(
                device_group=device_group,
                file_path=model_path
            )
            self.model_descriptors[series_name] = model_descriptor
            print(' - Success')

            # Extract model input dimensions for this series
            if model_descriptor and model_descriptor.models:
                model = model_descriptor.models[0]
                if hasattr(model, 'input_nodes') and model.input_nodes:
                    input_node = model.input_nodes[0]
                    shape = input_node.tensor_shape_info.data.shape_npu
                    model_input_shape = (shape[3], shape[2])  # (width, height)
                    model_input_channels = shape[1]  # 3 for RGB
                    print(f' Model input shape: {model_input_shape}, channels: {model_input_channels}')

                    # Store series-specific model info
                    self.series_groups[series_name]['model_input_shape'] = model_input_shape
                    self.series_groups[series_name]['model_input_channels'] = model_input_channels

        except kp.ApiKPException as exception:
            print(f'Error: upload model failed for {series_name}, error = {str(exception)}')
            # Same as for firmware: drop the series so no worker is started for it.
            release_series(series_name, device_group)
            continue

    print('[Multi-Series Initialization Complete]')

    # Set up legacy compatibility attributes using the first connected series
    series_info = {}
    if self.device_groups:
        first_series = next(iter(self.device_groups))
        self.device_group = self.device_groups[first_series]
        self.model_nef_descriptor = self.model_descriptors.get(first_series)

        # Set up generic inference descriptor from first series
        if self.model_nef_descriptor:
            self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
                model_id=self.model_nef_descriptor.models[0].id,
            )

        series_info = self.series_groups.get(first_series, {})

    # Always define the input geometry so preprocess_frame() has something
    # to work with, even if nothing connected.
    self.model_input_shape = series_info.get('model_input_shape', (640, 640))
    self.model_input_channels = series_info.get('model_input_channels', 3)
|
|
|
|
def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray:
    """
    Resize a frame to the model input size and convert its colour format.

    :param frame: Input image; the colour conversions treat it as BGR.
    :param target_format: 'BGR565', 'RGB8888' or 'YUYV'.  Any other value
        (e.g. RAW8) returns the resized frame without colour conversion.
    :return: The resized, converted frame.
    """
    resized_frame = cv2.resize(frame, self.model_input_shape)

    # Choose the OpenCV conversion code; each constant is only looked up
    # when its format is requested.
    if target_format == 'BGR565':
        conversion = cv2.COLOR_BGR2BGR565
    elif target_format == 'RGB8888':
        conversion = cv2.COLOR_BGR2RGBA
    elif target_format == 'YUYV':
        conversion = cv2.COLOR_BGR2YUV_YUYV
    else:
        # RAW8 or other formats
        return resized_frame

    return cv2.cvtColor(resized_frame, conversion)
|
|
|
|
def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[Any, str]:
    """
    Fetch one raw inference output and run the configured post-processor.

    :param timeout: Passed through to get_output().
    :return: (processed_result, result_string), or (None, None) when no
        output is available, the output lacks the expected header fields,
        or retrieving any output node fails.
    """
    output_descriptor = self.get_output(timeout=timeout)
    if not output_descriptor:
        return None, None

    # The descriptor must expose a header with the fields used below.
    if not hasattr(output_descriptor, 'header'):
        return None, None
    header = output_descriptor.header
    if not (hasattr(header, 'num_output_node') and hasattr(header, 'inference_number')):
        return None, None

    # Convert every output node to float data; any failure discards the result.
    inf_node_output_list = []
    for node_idx in range(header.num_output_node):
        try:
            inf_node_output_list.append(
                kp.inference.generic_inference_retrieve_float_node(
                    node_idx=node_idx,
                    generic_raw_result=output_descriptor,
                    channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
                )
            )
        except (kp.ApiKPException, Exception):
            return None, None

    if not inf_node_output_list:
        return None, None

    # Hardware pre-processing info (first entry), used by the YOLO post-processors.
    hardware_preproc_info = None
    if hasattr(header, 'hw_pre_proc_info_list') and len(header.hw_pre_proc_info_list) > 0:
        hardware_preproc_info = header.hw_pre_proc_info_list[0]

    yolo_types = [PostProcessType.YOLO_V3, PostProcessType.YOLO_V5]
    if self.postprocess_options.postprocess_type in yolo_types:
        # YOLO: hand over all node outputs plus the hardware info.
        processed_result = self.postprocessor.process(inf_node_output_list, hardware_preproc_info=hardware_preproc_info)
    else:
        # Everything else: one flat array built from all node outputs.
        if header.num_output_node == 1:
            raw_output_array = inf_node_output_list[0].ndarray.flatten()
        else:
            flattened = [node.ndarray.flatten() for node in inf_node_output_list]
            raw_output_array = np.concatenate(flattened) if flattened else np.array([])
        processed_result = self.postprocessor.process(raw_output_array)

    return processed_result, self._generate_result_string(processed_result)
|
|
|
|
def _generate_result_string(self, processed_result: Any) -> str:
    """
    Turn a processed result into a short human-readable description.

    Classification results show the class name and probability.  Detection
    results show "No objects detected", a single detection with its
    confidence, a count for one class, or a per-class breakdown sorted by
    class name.  Anything else is rendered with str().
    """
    if isinstance(processed_result, ClassificationResult):
        return f"{processed_result.class_name} (Prob: {processed_result.probability:.2f})"

    if not isinstance(processed_result, ObjectDetectionResult):
        return str(processed_result)

    if processed_result.box_count == 0:
        return "No objects detected"

    # Count detections per class name.
    object_summary = {}
    for box in processed_result.box_list:
        class_name = box.class_name
        object_summary[class_name] = object_summary.get(class_name, 0) + 1

    if len(object_summary) == 1:
        # Exactly one class present.
        class_name, count = next(iter(object_summary.items()))
        if count != 1:
            return f"{count} {class_name}s detected"
        # Lone detection: report its confidence as well.
        confidence = processed_result.box_list[0].score
        return f"{class_name} detected (Conf: {confidence:.2f})"

    # Several classes (or none collected): list them alphabetically.
    parts = []
    for class_name, count in sorted(object_summary.items()):
        parts.append(f"1 {class_name}" if count == 1 else f"{count} {class_name}s")
    return f"Detected: {', '.join(parts)}"
|
|
|
|
|
|
# Modified _send_thread_func to get data from input queue
|
|
def _send_thread_func(self):
|
|
"""Internal function run by the send thread, gets images from input queue."""
|
|
print("Send thread started.")
|
|
send_count = 0
|
|
while not self._stop_event.is_set():
|
|
if self.generic_inference_input_descriptor is None:
|
|
# Wait for descriptor to be ready or stop
|
|
self._stop_event.wait(0.1) # Avoid busy waiting
|
|
continue
|
|
|
|
try:
|
|
# Get image and format from the input queue
|
|
# Blocks until an item is available or stop event is set/timeout occurs
|
|
try:
|
|
# Use get with timeout or check stop event in a loop
|
|
# This pattern allows thread to check stop event while waiting on queue
|
|
item = self._input_queue.get(block=True, timeout=0.1)
|
|
# Check if this is our sentinel value
|
|
if item is None:
|
|
continue
|
|
|
|
# Now safely unpack the tuple
|
|
image_data, image_format_enum = item
|
|
except queue.Empty:
|
|
# If queue is empty after timeout, check stop event and continue loop
|
|
continue
|
|
|
|
# Configure and send the image
|
|
self._inference_counter += 1 # Increment counter for each image
|
|
send_count += 1
|
|
|
|
# Debug: Log send activity every 100 images
|
|
if send_count % 100 == 0:
|
|
print(f"[MultiDongle] Sent {send_count} images to inference")
|
|
|
|
self.generic_inference_input_descriptor.inference_number = self._inference_counter
|
|
self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
|
|
image=image_data,
|
|
image_format=image_format_enum, # Use the format from the queue
|
|
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
|
|
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
|
|
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
|
|
)]
|
|
|
|
kp.inference.generic_image_inference_send(device_group=self.device_group,
|
|
generic_inference_input_descriptor=self.generic_inference_input_descriptor)
|
|
# No need for sleep here usually, as queue.get is blocking
|
|
except kp.ApiKPException as exception:
|
|
print(f' - Error in send thread: inference send failed, error = {exception}')
|
|
self._stop_event.set() # Signal other thread to stop
|
|
except Exception as e:
|
|
print(f' - Unexpected error in send thread: {e}')
|
|
self._stop_event.set()
|
|
|
|
print("Send thread stopped.")
|
|
|
|
# _receive_thread_func remains the same
|
|
def _receive_thread_func(self):
|
|
"""Internal function run by the receive thread, puts results into output queue."""
|
|
print("Receive thread started.")
|
|
receive_count = 0
|
|
while not self._stop_event.is_set():
|
|
try:
|
|
generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group)
|
|
self._output_queue.put(generic_inference_output_descriptor)
|
|
receive_count += 1
|
|
|
|
# Debug: Log receive activity every 100 results
|
|
if receive_count % 100 == 0:
|
|
print(f"[MultiDongle] Received {receive_count} inference results")
|
|
except kp.ApiKPException as exception:
|
|
if not self._stop_event.is_set(): # Avoid printing error if we are already stopping
|
|
print(f' - Error in receive thread: inference receive failed, error = {exception}')
|
|
self._stop_event.set()
|
|
except Exception as e:
|
|
print(f' - Unexpected error in receive thread: {e}')
|
|
self._stop_event.set()
|
|
|
|
print("Receive thread stopped.")
|
|
|
|
def start(self):
    """
    Launch the worker threads for the configured mode.

    Delegates to the multi-series or the legacy single-series start routine
    depending on ``self.multi_series_mode``.  Must be called after
    initialize().
    """
    if not self.multi_series_mode:
        self._start_single_series()
        return

    self._start_multi_series()
|
|
|
|
def _start_single_series(self):
|
|
"""Start single-series (legacy) mode"""
|
|
if self.device_group is None:
|
|
raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
|
|
|
|
if self._send_thread is None or not self._send_thread.is_alive():
|
|
self._stop_event.clear() # Clear stop event for a new start
|
|
self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True)
|
|
self._send_thread.start()
|
|
print("Send thread started.")
|
|
|
|
if self._receive_thread is None or not self._receive_thread.is_alive():
|
|
self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True)
|
|
self._receive_thread.start()
|
|
print("Receive thread started.")
|
|
|
|
def _start_multi_series(self):
|
|
"""Start multi-series mode"""
|
|
if not self.device_groups:
|
|
raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
|
|
|
|
print("[Starting Multi-Series Threads]")
|
|
self._stop_event.clear()
|
|
|
|
# Start dispatcher thread
|
|
if self.dispatcher_thread is None or not self.dispatcher_thread.is_alive():
|
|
self.dispatcher_thread = threading.Thread(target=self._dispatcher_thread_func, daemon=True)
|
|
self.dispatcher_thread.start()
|
|
print("Dispatcher thread started.")
|
|
|
|
# Start send/receive threads for each series
|
|
for series_name in self.device_groups.keys():
|
|
# Start send thread for this series
|
|
if series_name not in self.send_threads or not self.send_threads[series_name].is_alive():
|
|
send_thread = threading.Thread(
|
|
target=self._multi_series_send_thread_func,
|
|
args=(series_name,),
|
|
daemon=True
|
|
)
|
|
self.send_threads[series_name] = send_thread
|
|
send_thread.start()
|
|
print(f"Send thread started for {series_name}.")
|
|
|
|
# Start receive thread for this series
|
|
if series_name not in self.receive_threads or not self.receive_threads[series_name].is_alive():
|
|
receive_thread = threading.Thread(
|
|
target=self._multi_series_receive_thread_func,
|
|
args=(series_name,),
|
|
daemon=True
|
|
)
|
|
self.receive_threads[series_name] = receive_thread
|
|
receive_thread.start()
|
|
print(f"Receive thread started for {series_name}.")
|
|
|
|
# Start result ordering thread
|
|
if self.result_ordering_thread is None or not self.result_ordering_thread.is_alive():
|
|
self.result_ordering_thread = threading.Thread(target=self._result_ordering_thread_func, daemon=True)
|
|
self.result_ordering_thread.start()
|
|
print("Result ordering thread started.")
|
|
|
|
def stop(self):
    """
    Stop all worker threads and disconnect the devices.

    Cleanup always runs, even when the stop event is already set.  The worker
    threads set that event themselves when they hit an error, so treating a
    set event as "already stopped" would skip joining the threads and leave
    the devices connected.

    The per-mode stop routines only join threads that are alive and only
    disconnect device groups that are still registered, so calling stop()
    more than once is harmless.
    """
    if self.multi_series_mode:
        self._stop_multi_series()
    else:
        self._stop_single_series()
|
|
|
|
def _stop_single_series(self):
|
|
"""Stop single-series (legacy) mode"""
|
|
print("Stopping threads...")
|
|
self._stop_event.set()
|
|
|
|
# Clear queues to unblock threads
|
|
while not self._input_queue.empty():
|
|
try:
|
|
self._input_queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
# Signal send thread to wake up
|
|
self._input_queue.put(None)
|
|
|
|
# Join threads with timeout
|
|
for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]:
|
|
if thread and thread.is_alive():
|
|
thread.join(timeout=2.0)
|
|
if thread.is_alive():
|
|
print(f"Warning: {name} thread didn't stop cleanly")
|
|
|
|
print("Disconnecting device group...")
|
|
if self.device_group:
|
|
try:
|
|
kp.core.disconnect_devices(device_group=self.device_group)
|
|
print("Device group disconnected successfully.")
|
|
except kp.ApiKPException as e:
|
|
print(f"Error disconnecting device group: {e}")
|
|
self.device_group = None
|
|
|
|
def _stop_multi_series(self):
|
|
"""Stop multi-series mode"""
|
|
print("[Stopping Multi-Series Threads]")
|
|
self._stop_event.set()
|
|
|
|
# Clear input queue to unblock dispatcher
|
|
while not self._input_queue.empty():
|
|
try:
|
|
self._input_queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
# Signal dispatcher thread to wake up
|
|
self._input_queue.put(None)
|
|
|
|
# Clear series result queues
|
|
for series_name, result_queue in self.result_queues.items():
|
|
while not result_queue.empty():
|
|
try:
|
|
result_queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
# Stop all send threads
|
|
for series_name, send_thread in self.send_threads.items():
|
|
if send_thread and send_thread.is_alive():
|
|
send_thread.join(timeout=2.0)
|
|
if send_thread.is_alive():
|
|
print(f"Warning: Send thread for {series_name} didn't stop cleanly")
|
|
|
|
# Stop all receive threads
|
|
for series_name, receive_thread in self.receive_threads.items():
|
|
if receive_thread and receive_thread.is_alive():
|
|
receive_thread.join(timeout=2.0)
|
|
if receive_thread.is_alive():
|
|
print(f"Warning: Receive thread for {series_name} didn't stop cleanly")
|
|
|
|
# Stop dispatcher thread
|
|
if self.dispatcher_thread and self.dispatcher_thread.is_alive():
|
|
self.dispatcher_thread.join(timeout=2.0)
|
|
if self.dispatcher_thread.is_alive():
|
|
print("Warning: Dispatcher thread didn't stop cleanly")
|
|
|
|
# Stop result ordering thread
|
|
if self.result_ordering_thread and self.result_ordering_thread.is_alive():
|
|
self.result_ordering_thread.join(timeout=2.0)
|
|
if self.result_ordering_thread.is_alive():
|
|
print("Warning: Result ordering thread didn't stop cleanly")
|
|
|
|
# Disconnect all device groups
|
|
print("Disconnecting device groups...")
|
|
for series_name, device_group in self.device_groups.items():
|
|
try:
|
|
kp.core.disconnect_devices(device_group=device_group)
|
|
print(f"Device group for {series_name} disconnected successfully.")
|
|
except kp.ApiKPException as e:
|
|
print(f"Error disconnecting device group for {series_name}: {e}")
|
|
|
|
self.device_groups.clear()
|
|
|
|
def _dispatcher_thread_func(self):
    """Dispatcher worker: hand queued tasks to a dongle series.

    Polls ``self._input_queue`` until ``self._stop_event`` is set. Each task
    is placed on the queue of the series returned by
    ``self._select_optimal_series()`` and that series' entry in
    ``self.current_loads`` is incremented. ``None`` items are ignored (they
    only wake the loop). When no series is returned the task is dropped with
    a warning. Any other exception is printed and sets the stop event.
    """
    print("Dispatcher thread started")

    while not self._stop_event.is_set():
        try:
            job = self._input_queue.get(timeout=0.1)
            if job is not None:
                target = self._select_optimal_series()
                if target is None:
                    print("Warning: No series available for task dispatch")
                else:
                    # Hand the task over first, then account for it.
                    self.result_queues[target].put(job)
                    self.current_loads[target] += 1
        except queue.Empty:
            # Poll timed out; go round again so the stop event is re-checked.
            pass
        except Exception as exc:
            print(f"Error in dispatcher: {exc}")
            if not self._stop_event.is_set():
                self._stop_event.set()

    print("Dispatcher thread stopped")
|
|
|
|
def _multi_series_send_thread_func(self, series_name: str):
    """Send worker for one dongle series.

    Takes tasks from ``self.result_queues[series_name]`` until
    ``self._stop_event`` is set, wraps each one in a
    ``kp.GenericImageInferenceDescriptor`` for the first model of that
    series' model descriptor and sends it to the series' device group.

    Two task shapes are accepted:

    * ``(image_data, image_format)`` tuples (legacy single-series format);
      the sequence id comes from ``self._inference_counter``, which is
      created on first use and incremented per tuple.
    * dicts with ``image_data``, ``image_format`` and ``sequence_id`` keys.

    A task that cannot be sent (unknown type, or no image data) is dropped.
    Because the dispatcher already counted it in ``self.current_loads``, the
    slot is released here; no result will ever come back to release it, and
    without this the series would look permanently busier than it is.

    Any exception other than ``queue.Empty`` is printed and sets the stop
    event.

    Args:
        series_name: Key into ``device_groups``, ``result_queues``,
            ``model_descriptors`` and ``current_loads``.
    """
    print(f"Send worker started for {series_name}")

    device_group = self.device_groups[series_name]
    result_queue = self.result_queues[series_name]
    model_descriptor = self.model_descriptors[series_name]

    def _release_load():
        # Undo the dispatcher's increment for a task that will never yield a result.
        # NOTE(review): current_loads is updated without a lock here, as in the
        # dispatcher and receive workers - confirm that is acceptable.
        self.current_loads[series_name] = max(0, self.current_loads.get(series_name, 0) - 1)

    while not self._stop_event.is_set():
        try:
            task = result_queue.get(timeout=0.1)
            if task is None:
                continue

            # Handle both tuple and dict formats
            if isinstance(task, tuple):
                # Legacy single-series format: (image_data, image_format)
                image_data, image_format = task
                sequence_id = getattr(self, '_inference_counter', 0)
                self._inference_counter = sequence_id + 1
            elif isinstance(task, dict):
                # Multi-series format: dict with keys
                image_data = task.get('image_data')
                image_format = task.get('image_format', kp.ImageFormat.KP_IMAGE_FORMAT_RGB565)
                sequence_id = task.get('sequence_id', 0)
            else:
                print(f"Error: Unknown task format: {type(task)}")
                _release_load()
                continue

            if image_data is None:
                print("Error: No image data in task")
                _release_load()
                continue

            # Create inference descriptor for this task
            inference_descriptor = kp.GenericImageInferenceDescriptor(
                model_id=model_descriptor.models[0].id,
            )
            # The receive worker reads this number back as the sequence id.
            inference_descriptor.inference_number = sequence_id

            inference_descriptor.input_node_image_list = [
                kp.GenericInputNodeImage(
                    image=image_data,
                    image_format=image_format,
                    resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                    padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                    normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                )
            ]

            # Send inference
            kp.inference.generic_image_inference_send(
                device_group=device_group,
                generic_inference_input_descriptor=inference_descriptor
            )

        except queue.Empty:
            continue
        except kp.ApiKPException as e:
            print(f"Error in {series_name} send worker: {e}")
            if not self._stop_event.is_set():
                self._stop_event.set()
        except Exception as e:
            print(f"Unexpected error in {series_name} send worker: {e}")
            if not self._stop_event.is_set():
                self._stop_event.set()

    print(f"Send worker stopped for {series_name}")
|
|
|
|
def _multi_series_receive_thread_func(self, series_name: str):
    """Receive worker for one dongle series.

    Until ``self._stop_event`` is set, fetches inference results from the
    device group of ``series_name``, stores each one in
    ``self.pending_results`` keyed by the inference number from its header,
    and decrements that series' ``self.current_loads`` entry (never below 0).

    Args:
        series_name: Key into ``device_groups`` and ``current_loads``.
    """
    print(f"Receive worker started for {series_name}")

    group = self.device_groups[series_name]

    while not self._stop_event.is_set():
        try:
            raw = kp.inference.generic_image_inference_receive(device_group=group)

            seq = raw.header.inference_number
            # Stored for the ordering worker, which looks results up by sequence id.
            self.pending_results[seq] = {
                'sequence_id': seq,
                'result': raw,
                'dongle_series': series_name,
                'timestamp': time.time(),
            }

            # One outstanding task fewer for this series; clamp at zero.
            remaining = self.current_loads[series_name] - 1
            self.current_loads[series_name] = max(0, remaining)

        except kp.ApiKPException as api_error:
            # Errors raised after a stop request are not reported.
            if not self._stop_event.is_set():
                print(f"Error in {series_name} receive worker: {api_error}")
                self._stop_event.set()
        except Exception as error:
            # NOTE(review): unlike the ApiKPException branch this neither sets the
            # stop event nor sleeps, so a persistent failure repeats this message
            # on every iteration - confirm that is intended.
            print(f"Unexpected error in {series_name} receive worker: {error}")

    print(f"Receive worker stopped for {series_name}")
|
|
|
|
def _result_ordering_thread_func(self):
    """Result ordering worker: emit results in sequence order.

    Until ``self._stop_event`` is set, moves the entry for
    ``self.next_output_sequence`` from ``self.pending_results`` to
    ``self._ordered_output_queue`` as soon as it is available. If a sequence
    that has already been issued does not show up within the wait limit, a
    placeholder result (``'dongle_series': 'timeout'``) is emitted for it and
    the worker moves on to the next sequence.

    The wait clock only runs for sequences that have actually been issued,
    i.e. ids below ``self.sequence_counter``. Otherwise an idle pipeline
    would emit a fake timeout every few seconds and advance
    ``next_output_sequence`` past ids that ``put_input`` has yet to hand out,
    so later frames would never be delivered. If the instance has no
    ``sequence_counter`` attribute, every sequence is treated as issued.

    The wait limit defaults to 2.0 seconds and can be overridden per
    instance through an optional ``result_wait_timeout`` attribute.
    """
    print("Result ordering worker started")

    # Seconds to wait for a missing sequence before skipping it.
    max_wait_time = getattr(self, 'result_wait_timeout', 2.0)
    # Time at which we started waiting for each sequence id.
    sequence_wait_times = {}

    while not self._stop_event.is_set():
        current_time = time.time()
        expected = self.next_output_sequence

        # Next expected result is available: deliver it.
        if expected in self.pending_results:
            self._ordered_output_queue.put(self.pending_results.pop(expected))
            sequence_wait_times.pop(expected, None)
            self.next_output_sequence += 1

            # Clean up old pending results to prevent memory bloat. Only
            # entries older than the output cursor can be discarded.
            if len(self.pending_results) > 1000:  # result_buffer_size
                for seq_id in sorted(self.pending_results.keys())[:500]:
                    if seq_id < self.next_output_sequence:
                        self.pending_results.pop(seq_id, None)
            continue

        # Nothing has been issued for this id yet: idle without running the clock.
        issued = getattr(self, 'sequence_counter', None)
        if issued is not None and expected >= issued:
            sequence_wait_times.pop(expected, None)
            time.sleep(0.001)  # Small delay to prevent busy waiting
            continue

        # Track how long we've been waiting for this sequence.
        started = sequence_wait_times.setdefault(expected, current_time)
        wait_time = current_time - started

        if wait_time > max_wait_time:
            print(f"Warning: Skipping sequence {expected} after {wait_time:.2f}s timeout")

            # Placeholder so consumers still see one item per sequence id.
            timeout_result = {
                'sequence_id': expected,
                'result': {'error': 'timeout', 'probability': 0.0, 'result_string': 'Timeout'},
                'dongle_series': 'timeout',
                'timestamp': current_time
            }
            self._ordered_output_queue.put(timeout_result)

            # Remove from wait tracking and advance sequence
            sequence_wait_times.pop(expected, None)
            self.next_output_sequence += 1
        else:
            time.sleep(0.001)  # Small delay to prevent busy waiting

    print("Result ordering worker stopped")
|
|
|
|
def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
    """Queue one image for inference.

    Args:
        image: Path of an image file (read with ``cv2.imread``) or an image
            array. An array is never modified; a copy or a resized version
            is queued instead.
        format: Key of ``self._FORMAT_MAPPING`` naming the image format.
        target_size: Optional size passed to ``cv2.resize``.

    Raises:
        FileNotFoundError: ``image`` is a path that ``cv2.imread`` could not load.
        ValueError: ``image`` is neither ``str`` nor ``np.ndarray``, or
            ``format`` is not a known format key.

    In multi-series mode the queued item is a dict carrying a fresh sequence
    id and a timestamp; otherwise it is an ``(image_data, image_format)``
    tuple.
    """
    if not isinstance(image, (str, np.ndarray)):
        raise ValueError("Image must be a file path (str) or a numpy array (ndarray).")

    if isinstance(image, str):
        image_data = cv2.imread(image)
        if image_data is None:
            raise FileNotFoundError(f"Image file not found at {image}")
        if target_size:
            image_data = cv2.resize(image_data, target_size)
    elif target_size is None:
        # Copy so the caller's array can be reused while the task is queued.
        image_data = image.copy()
    else:
        image_data = cv2.resize(image, target_size)

    if format not in self._FORMAT_MAPPING:
        raise ValueError(f"Unsupported format: {format}")
    image_format_enum = self._FORMAT_MAPPING[format]

    if not self.multi_series_mode:
        # Single-series mode keeps the original tuple format.
        self._input_queue.put((image_data, image_format_enum))
        return

    # Multi-series mode: reserve the next sequence id under the lock.
    with self.sequence_lock:
        sequence_id = self.sequence_counter
        self.sequence_counter += 1

    self._input_queue.put({
        'sequence_id': sequence_id,
        'image_data': image_data,
        'image_format': image_format_enum,
        'timestamp': time.time(),
    })
|
|
|
|
def get_output(self, timeout: float = None):
    """Fetch the next available result.

    Reads from the ordered output queue in multi-series mode and from the
    regular output queue otherwise. The call does not block unless a
    ``timeout`` is given.

    :param timeout: Seconds to wait for data; ``None`` means do not wait.
    :return: The next result, or ``None`` when the queue stayed empty. In
        multi-series mode a non-empty dict entry is unwrapped to its
        ``'result'`` value.
    """
    should_block = timeout is not None
    try:
        if not self.multi_series_mode:
            return self._output_queue.get(block=should_block, timeout=timeout)
        entry = self._ordered_output_queue.get(block=should_block, timeout=timeout)
    except queue.Empty:
        return None

    # Ordered entries wrap the actual inference result in a dict.
    if entry and isinstance(entry, dict):
        return entry.get('result')
    return entry
|
|
|
|
def get_device_info(self):
    """
    Get information about connected devices including port IDs and series.

    Resolution order:

    1. With ``auto_detect`` enabled and ``connected_devices_info`` non-empty,
       that list is returned as-is.
    2. Otherwise, if a device group is connected, one entry is built per port
       in ``self.port_id``; the series is taken from the ``chip_id`` of the
       device at the same index in the group's content, when present.
    3. If that fails or there is no device group, every port is reported with
       series ``'Unknown'``.

    Returns:
        List[Dict]: One dict per device with ``port_id``, ``series`` and
        ``device_descriptor`` keys (the auto-detected list is returned
        unchanged, whatever it holds).
    """
    if self.auto_detect and self.connected_devices_info:
        return self.connected_devices_info

    # If not auto-detected, try to get info from device group
    if self.device_group:
        try:
            device_info_list = []
            device_group_content = self.device_group.content

            for i, port_id in enumerate(self.port_id):
                device_info = {
                    'port_id': port_id,
                    'series': 'Unknown',  # refined below when the group exposes a chip id
                    'device_descriptor': None
                }

                # Best effort: the layout of the group content is not guaranteed,
                # so a failed lookup leaves the series as 'Unknown'. Only ordinary
                # errors are swallowed; KeyboardInterrupt/SystemExit propagate.
                try:
                    if hasattr(device_group_content, 'devices') and i < len(device_group_content.devices):
                        device = device_group_content.devices[i]
                        if hasattr(device, 'chip_id'):
                            device_info['series'] = self._chip_id_to_series(device.chip_id)
                except Exception:
                    pass

                device_info_list.append(device_info)

            return device_info_list

        except Exception as e:
            print(f"Warning: Could not get device info from device group: {e}")

    # Fallback: return basic info based on port_id
    return [{'port_id': port_id, 'series': 'Unknown', 'device_descriptor': None} for port_id in self.port_id]
|
|
|
|
def _chip_id_to_series(self, chip_id):
    """
    Translate a chip ID into its series name.

    Args:
        chip_id: Chip ID from device. Only strings are recognised; the match
            is case-insensitive.

    Returns:
        str: 'KL520', 'KL720', 'KL630' or 'KL730', or 'Unknown' for anything
        else (including non-string IDs).
    """
    if not isinstance(chip_id, str):
        return 'Unknown'

    # 'KL540' is intentionally not listed.
    known_series = ('KL520', 'KL720', 'KL630', 'KL730')
    by_chip_id = {series.lower(): series for series in known_series}
    return by_chip_id.get(chip_id.lower(), 'Unknown')
|
|
|
|
def print_device_info(self):
    """
    Print one line per connected device (port ID and series).

    The entries come from ``get_device_info()``; a short notice is printed
    instead when that returns nothing.
    """
    devices_info = self.get_device_info()

    if devices_info:
        print(f"\n[Connected Devices - {len(devices_info)} device(s)]")
        for position, entry in enumerate(devices_info, start=1):
            print(f" [{position}] Port ID: {entry['port_id']}, Series: {entry['series']}")
    else:
        print("No device information available")
|
|
|
|
def set_postprocess_options(self, options: PostProcessorOptions):
    """Replace the postprocessing options.

    Stores ``options`` on the instance and builds a new ``PostProcessor``
    from the stored value, replacing the previous one.

    Args:
        options: New postprocessing configuration.
    """
    self.postprocess_options = options
    # Rebuild from the stored attribute so both members stay in step.
    self.postprocessor = PostProcessor(self.postprocess_options)
|
|
|
|
def get_postprocess_options(self) -> PostProcessorOptions:
    """Return the postprocessing options currently stored on the instance.

    The stored object itself is returned, not a copy.
    """
    current_options = self.postprocess_options
    return current_options
|
|
|
|
def get_available_postprocess_types(self) -> List[PostProcessType]:
    """Return every member of ``PostProcessType`` as a new list."""
    return [*PostProcessType]
|
|
|
|
def __del__(self):
    """Ensure resources are released when the object is garbage collected.

    Calls ``stop()`` and then disconnects ``self.device_group`` if one is
    set. The finalizer also runs for objects whose construction failed part
    way, so it must not assume any attribute exists, and a failure while
    stopping must not prevent the disconnect attempt. Errors are printed,
    never raised.
    """
    try:
        self.stop()
    except Exception as e:
        print(f"Error stopping in destructor: {e}")

    # May be missing entirely if construction failed before it was assigned.
    device_group = getattr(self, 'device_group', None)
    if device_group:
        try:
            kp.core.disconnect_devices(device_group=device_group)
            print("Device group disconnected in destructor.")
        except Exception as e:
            print(f"Error disconnecting device group in destructor: {e}")
|
|
|
|
def postprocess(raw_model_output: list) -> float:
    """
    Legacy postprocess function kept for backward compatibility.

    Reduces raw fire-detection model output to one probability by taking
    the first element of the output and converting it to ``float``.

    Returns 0.0 when the output is ``None`` or empty.
    """
    # len() rather than truthiness: it also works for array-like outputs.
    if raw_model_output is None or len(raw_model_output) == 0:
        return 0.0  # Default or error value
    return float(raw_model_output[0])
|
|
|
|
class WebcamInferenceRunner:
    """Feed webcam frames to a MultiDongle and show the results in an OpenCV window."""

    # Box colours in BGR order, cycled by class number.
    _CLASS_COLORS = [
        (0, 255, 0),    # Green
        (0, 0, 255),    # Red
        (255, 0, 0),    # Blue
        (0, 255, 255),  # Yellow
        (255, 0, 255),  # Magenta
        (255, 255, 0),  # Cyan
        (128, 0, 128),  # Purple
        (255, 165, 0),  # Azure (B=255, G=165, R=0)
        (0, 128, 255),  # Orange
        (128, 255, 0),  # Spring Green
    ]

    def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'):
        """Store the inference engine and reset result/statistics state.

        Args:
            multidongle: Engine that frames are sent to and results read from.
            image_format: Format name passed to ``preprocess_frame`` and
                ``put_input`` for every frame.
        """
        self.multidongle = multidongle
        self.image_format = image_format
        self.latest_result = None
        self.result_str = "No result"

        # Statistics tracking
        self.processed_inference_count = 0
        self.inference_fps_start_time = None
        self.display_fps_start_time = None
        self.display_frame_counter = 0

    def run(self, camera_id: int = 0):
        """Capture, infer and display until the stream ends or 'q' is pressed.

        Args:
            camera_id: Index passed to ``cv2.VideoCapture``.

        Raises:
            RuntimeError: The camera could not be opened.
        """
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            # Release the handle before bailing out; nothing else will.
            cap.release()
            raise RuntimeError("Cannot open webcam")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Track display FPS
                if self.display_fps_start_time is None:
                    self.display_fps_start_time = time.time()
                self.display_frame_counter += 1

                # Preprocess and send frame
                processed_frame = self.multidongle.preprocess_frame(frame, self.image_format)
                self.multidongle.put_input(processed_frame, self.image_format)

                # Pick up the newest result, if any. The previous one stays
                # on screen until a new one arrives.
                result, result_str = self.multidongle.get_latest_inference_result()
                if result is not None:
                    # Track inference FPS
                    if self.inference_fps_start_time is None:
                        self.inference_fps_start_time = time.time()
                    self.processed_inference_count += 1

                    self.latest_result = result
                    self.result_str = result_str

                # Display frame with results
                self._display_results(frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            # self._print_statistics()
            cap.release()
            cv2.destroyAllWindows()

    def _display_results(self, frame):
        """Draw the latest result and the inference FPS on a copy of ``frame`` and show it."""
        display_frame = frame.copy()

        # Handle different result types
        if isinstance(self.latest_result, ClassificationResult):
            self._draw_classification_result(display_frame, self.latest_result)
        elif isinstance(self.latest_result, ObjectDetectionResult):
            self._draw_object_detection_result(display_frame, self.latest_result)
        else:
            # Fallback for other result types
            cv2.putText(display_frame, self.result_str,
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Calculate and display inference FPS
        if self.inference_fps_start_time and self.processed_inference_count > 0:
            elapsed_time = time.time() - self.inference_fps_start_time
            if elapsed_time > 0:
                inference_fps = self.processed_inference_count / elapsed_time
                cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}",
                            (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

        # Window title based on postprocessing type
        window_title = f"Inference - {self.multidongle.postprocess_options.postprocess_type.value}"
        cv2.imshow(window_title, display_frame)

    def _draw_classification_result(self, frame, result: ClassificationResult):
        """Draw classification results on frame: label, probability bar and threshold mark."""
        color = (0, 255, 0) if result.is_positive else (0, 0, 255)

        # Main result text
        cv2.putText(frame, f"{result.class_name} (Prob: {result.probability:.2f})",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

        # Confidence indicator bar
        bar_width = 200
        bar_height = 20
        bar_x, bar_y = 10, 80

        # Background bar
        cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (100, 100, 100), -1)

        # Confidence bar, clamped so an out-of-range value cannot overflow the background
        fill_ratio = min(max(result.probability, 0.0), 1.0)
        confidence_width = int(bar_width * fill_ratio)
        cv2.rectangle(frame, (bar_x, bar_y), (bar_x + confidence_width, bar_y + bar_height), color, -1)

        # Threshold line
        threshold_x = int(bar_x + bar_width * result.confidence_threshold)
        cv2.line(frame, (threshold_x, bar_y), (threshold_x, bar_y + bar_height), (255, 255, 255), 2)

    def _draw_object_detection_result(self, frame, result: ObjectDetectionResult):
        """Draw enhanced object detection results on frame.

        Each box gets a thick outline with corner markers and a label; a
        per-class count summary is written in the top-left corner.
        """
        if not result.box_list:
            # No objects detected - show message
            no_objects_text = "No objects detected"
            cv2.putText(frame, no_objects_text,
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (100, 100, 100), 2)
            return

        class_colors = self._CLASS_COLORS
        corner_length = 15
        thickness = 3
        font_scale = 0.6
        font_thickness = 2

        # Draw bounding boxes
        for box in result.box_list:
            color = class_colors[box.class_num % len(class_colors)]

            # OpenCV needs integer pixel coordinates; keep the origin inside the frame.
            x1, y1 = max(0, int(box.x1)), max(0, int(box.y1))
            x2, y2 = int(box.x2), int(box.y2)

            # Draw thick bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)

            # Draw corner markers for better visibility
            # Top-left corner
            cv2.line(frame, (x1, y1), (x1 + corner_length, y1), color, thickness)
            cv2.line(frame, (x1, y1), (x1, y1 + corner_length), color, thickness)
            # Top-right corner
            cv2.line(frame, (x2, y1), (x2 - corner_length, y1), color, thickness)
            cv2.line(frame, (x2, y1), (x2, y1 + corner_length), color, thickness)
            # Bottom-left corner
            cv2.line(frame, (x1, y2), (x1 + corner_length, y2), color, thickness)
            cv2.line(frame, (x1, y2), (x1, y2 - corner_length), color, thickness)
            # Bottom-right corner
            cv2.line(frame, (x2, y2), (x2 - corner_length, y2), color, thickness)
            cv2.line(frame, (x2, y2), (x2, y2 - corner_length), color, thickness)

            # Confidence marker. It must stay ASCII: the Hershey fonts used by
            # cv2.putText render any other character as '?'.
            confidence_indicator = "***" if box.score > 0.7 else "**" if box.score > 0.5 else "*"
            label = f"{confidence_indicator} {box.class_name}: {box.score:.2f}"

            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, font_scale, font_thickness)[0]

            # Position label above bounding box, but within frame
            label_y = max(y1 - 10, label_size[1] + 10)
            label_x = x1

            # Semi-transparent label background
            overlay = frame.copy()
            cv2.rectangle(overlay,
                          (label_x - 5, label_y - label_size[1] - 8),
                          (label_x + label_size[0] + 5, label_y + 5),
                          color, -1)
            cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame)

            # Label text with outline for better visibility
            cv2.putText(frame, label, (label_x, label_y),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), font_thickness + 1)  # Black outline
            cv2.putText(frame, label, (label_x, label_y),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255), font_thickness)  # White text

        # Enhanced summary with object breakdown
        object_counts = {}
        for box in result.box_list:
            object_counts[box.class_name] = object_counts.get(box.class_name, 0) + 1

        # Multi-line summary display
        y_offset = 30
        cv2.putText(frame, f"Total Objects: {result.box_count}",
                    (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Show breakdown by class (up to 5 classes to avoid clutter)
        displayed_classes = 0
        for class_name, count in sorted(object_counts.items()):
            if displayed_classes >= 5:  # Limit to prevent screen clutter
                remaining = len(object_counts) - displayed_classes
                y_offset += 25
                cv2.putText(frame, f"... and {remaining} more classes",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
                break

            y_offset += 25
            plural = "s" if count > 1 else ""
            cv2.putText(frame, f" {count} {class_name}{plural}",
                        (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
            displayed_classes += 1
|
|
|
|
# def _print_statistics(self):
|
|
# """Print final statistics"""
|
|
# print(f"\n--- Summary ---")
|
|
# print(f"Total inferences processed: {self.processed_inference_count}")
|
|
|
|
# if self.inference_fps_start_time and self.processed_inference_count > 0:
|
|
# elapsed = time.time() - self.inference_fps_start_time
|
|
# if elapsed > 0:
|
|
# avg_inference_fps = self.processed_inference_count / elapsed
|
|
# print(f"Average Inference FPS: {avg_inference_fps:.2f}")
|
|
|
|
# if self.display_fps_start_time and self.display_frame_counter > 0:
|
|
# elapsed = time.time() - self.display_fps_start_time
|
|
# if elapsed > 0:
|
|
# avg_display_fps = self.display_frame_counter / elapsed
|
|
# print(f"Average Display FPS: {avg_display_fps:.2f}")
|
|
|
|
if __name__ == "__main__":
    # Demo entry point: run fire detection on the default webcam.
    PORT_IDS = [28, 32]
    SCPU_FW = r'fw_scpu.bin'
    NCPU_FW = r'fw_ncpu.bin'
    MODEL_PATH = r'fire_detection_520.nef'

    # Stays None until the engine object exists, so cleanup knows whether
    # there is anything to stop.
    multidongle = None

    try:
        # Configure postprocessing options
        # Default: Fire detection (classification)
        postprocess_options = PostProcessorOptions(
            postprocess_type=PostProcessType.FIRE_DETECTION,
            threshold=0.5,
            class_names=["No Fire", "Fire"]
        )

        # Alternative options for different model types:
        #
        # For YOLO v3 object detection:
        # postprocess_options = PostProcessorOptions(
        #     postprocess_type=PostProcessType.YOLO_V3,
        #     threshold=0.3,
        #     class_names=["person", "bicycle", "car", "motorbike", "aeroplane"],
        #     nms_threshold=0.45
        # )
        #
        # For general classification:
        # postprocess_options = PostProcessorOptions(
        #     postprocess_type=PostProcessType.CLASSIFICATION,
        #     threshold=0.5,
        #     class_names=["class1", "class2", "class3"]
        # )

        # Initialize inference engine with postprocessing options
        print("Initializing MultiDongle...")
        multidongle = MultiDongle(
            port_id=PORT_IDS,
            scpu_fw_path=SCPU_FW,
            ncpu_fw_path=NCPU_FW,
            model_path=MODEL_PATH,
            upload_fw=True,
            postprocess_options=postprocess_options
        )
        multidongle.initialize()
        multidongle.start()

        print(f"Postprocessing type: {postprocess_options.postprocess_type.value}")
        available_types = [t.value for t in multidongle.get_available_postprocess_types()]
        print(f"Available types: {available_types}")

        # Run using the new runner class
        print("Starting webcam inference...")
        runner = WebcamInferenceRunner(multidongle, 'BGR565')
        runner.run()

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        if multidongle is not None:
            multidongle.stop()
|