Merge branch 'developer' of github.com:HuangMason320/cluster4npu into developer
This commit is contained in:
commit
bfac50f066
@ -13,6 +13,7 @@ from abc import ABC, abstractmethod
|
|||||||
from typing import Callable, Optional, Any, Dict, List
|
from typing import Callable, Optional, Any, Dict, List
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -23,12 +24,55 @@ class InferenceTask:
|
|||||||
timestamp: float
|
timestamp: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BoundingBox:
|
||||||
|
"""Bounding box descriptor for object detection results"""
|
||||||
|
x1: int = 0
|
||||||
|
y1: int = 0
|
||||||
|
x2: int = 0
|
||||||
|
y2: int = 0
|
||||||
|
score: float = 0.0
|
||||||
|
class_num: int = 0
|
||||||
|
class_name: str = ""
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ObjectDetectionResult:
|
||||||
|
"""Object detection result descriptor"""
|
||||||
|
class_count: int = 0
|
||||||
|
box_count: int = 0
|
||||||
|
box_list: List[BoundingBox] = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.box_list is None:
|
||||||
|
self.box_list = []
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ClassificationResult:
|
||||||
|
"""Classification result descriptor"""
|
||||||
|
probability: float = 0.0
|
||||||
|
class_name: str = ""
|
||||||
|
class_num: int = 0
|
||||||
|
confidence_threshold: float = 0.5
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_positive(self) -> bool:
|
||||||
|
return self.probability > self.confidence_threshold
|
||||||
|
|
||||||
|
class PostProcessType(Enum):
|
||||||
|
"""Enumeration of available postprocessing types"""
|
||||||
|
FIRE_DETECTION = "fire_detection"
|
||||||
|
YOLO_V3 = "yolo_v3"
|
||||||
|
YOLO_V5 = "yolo_v5"
|
||||||
|
CLASSIFICATION = "classification"
|
||||||
|
RAW_OUTPUT = "raw_output"
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class InferenceResult:
|
class InferenceResult:
|
||||||
sequence_id: int
|
sequence_id: int
|
||||||
result: Any
|
result: Any
|
||||||
series_name: str
|
series_name: str
|
||||||
timestamp: float
|
timestamp: float
|
||||||
|
postprocess_type: PostProcessType = PostProcessType.RAW_OUTPUT
|
||||||
|
|
||||||
|
|
||||||
class DongleSeriesSpec:
|
class DongleSeriesSpec:
|
||||||
@ -78,17 +122,141 @@ class PreProcessor(DataProcessor):
|
|||||||
return frame
|
return frame
|
||||||
|
|
||||||
|
|
||||||
|
class PostProcessorOptions:
|
||||||
|
"""Configuration for postprocessing options"""
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
postprocess_type: PostProcessType = PostProcessType.FIRE_DETECTION,
|
||||||
|
threshold: float = 0.5,
|
||||||
|
class_names: List[str] = None,
|
||||||
|
nms_threshold: float = 0.45,
|
||||||
|
max_detections_per_class: int = 100):
|
||||||
|
self.postprocess_type = postprocess_type
|
||||||
|
self.threshold = threshold
|
||||||
|
self.class_names = class_names or []
|
||||||
|
self.nms_threshold = nms_threshold
|
||||||
|
self.max_detections_per_class = max_detections_per_class
|
||||||
|
|
||||||
class PostProcessor(DataProcessor):
|
class PostProcessor(DataProcessor):
|
||||||
"""Post-processor for handling output data from inference stages"""
|
"""Post-processor for handling output data from inference stages"""
|
||||||
|
|
||||||
def __init__(self, process_fn: Optional[Callable] = None):
|
def __init__(self, options: PostProcessorOptions = None):
|
||||||
self.process_fn = process_fn or self._default_process
|
self.options = options or PostProcessorOptions()
|
||||||
|
|
||||||
def process(self, data: Any, *args, **kwargs) -> Any:
|
def process(self, data: Any, *args, **kwargs) -> Any:
|
||||||
"""Process inference output data"""
|
"""Process inference output data based on configured type"""
|
||||||
return self.process_fn(data, *args, **kwargs)
|
if self.options.postprocess_type == PostProcessType.FIRE_DETECTION:
|
||||||
|
return self._process_fire_detection(data, *args, **kwargs)
|
||||||
|
elif self.options.postprocess_type == PostProcessType.CLASSIFICATION:
|
||||||
|
return self._process_classification(data, *args, **kwargs)
|
||||||
|
elif self.options.postprocess_type == PostProcessType.YOLO_V3:
|
||||||
|
return self._process_yolo_v3(data, *args, **kwargs)
|
||||||
|
elif self.options.postprocess_type == PostProcessType.YOLO_V5:
|
||||||
|
return self._process_yolo_v5(data, *args, **kwargs)
|
||||||
|
else:
|
||||||
|
return self._process_raw_output(data, *args, **kwargs)
|
||||||
|
|
||||||
def _default_process(self, data: Any, *args, **kwargs) -> Any:
|
def _process_fire_detection(self, raw_output: Any, *args, **kwargs) -> ClassificationResult:
|
||||||
|
"""Process fire detection output"""
|
||||||
|
if hasattr(raw_output, 'size') and raw_output.size > 0:
|
||||||
|
probability = float(raw_output.flatten()[0]) if raw_output.size > 0 else 0.0
|
||||||
|
elif isinstance(raw_output, (list, tuple)) and len(raw_output) > 0:
|
||||||
|
probability = float(raw_output[0])
|
||||||
|
else:
|
||||||
|
probability = 0.0
|
||||||
|
|
||||||
|
class_name = "Fire" if probability > self.options.threshold else "No Fire"
|
||||||
|
return ClassificationResult(
|
||||||
|
probability=probability,
|
||||||
|
class_name=class_name,
|
||||||
|
class_num=1 if probability > self.options.threshold else 0,
|
||||||
|
confidence_threshold=self.options.threshold
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_classification(self, raw_output: Any, *args, **kwargs) -> ClassificationResult:
|
||||||
|
"""Process general classification output"""
|
||||||
|
if hasattr(raw_output, 'flatten'):
|
||||||
|
output_array = raw_output.flatten()
|
||||||
|
elif isinstance(raw_output, (list, tuple)):
|
||||||
|
output_array = np.array(raw_output)
|
||||||
|
else:
|
||||||
|
return ClassificationResult()
|
||||||
|
|
||||||
|
if len(output_array) == 0:
|
||||||
|
return ClassificationResult()
|
||||||
|
|
||||||
|
if len(output_array) == 1:
|
||||||
|
# Binary classification
|
||||||
|
probability = float(output_array[0])
|
||||||
|
class_num = 1 if probability > self.options.threshold else 0
|
||||||
|
else:
|
||||||
|
# Multi-class classification
|
||||||
|
class_num = int(np.argmax(output_array))
|
||||||
|
probability = float(output_array[class_num])
|
||||||
|
|
||||||
|
class_name = self.options.class_names[class_num] if class_num < len(self.options.class_names) else f"Class_{class_num}"
|
||||||
|
|
||||||
|
return ClassificationResult(
|
||||||
|
probability=probability,
|
||||||
|
class_name=class_name,
|
||||||
|
class_num=class_num,
|
||||||
|
confidence_threshold=self.options.threshold
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_yolo_v3(self, inference_output_list: List, hardware_preproc_info=None, *args, **kwargs) -> ObjectDetectionResult:
|
||||||
|
"""Process YOLO v3 output for object detection"""
|
||||||
|
# Simplified YOLO v3 postprocessing (built-in version)
|
||||||
|
# This is a basic implementation - for full functionality, refer to Kneron examples
|
||||||
|
return self._process_yolo_generic(inference_output_list, hardware_preproc_info, version="v3")
|
||||||
|
|
||||||
|
def _process_yolo_v5(self, inference_output_list: List, hardware_preproc_info=None, *args, **kwargs) -> ObjectDetectionResult:
|
||||||
|
"""Process YOLO v5 output for object detection"""
|
||||||
|
# Simplified YOLO v5 postprocessing (built-in version)
|
||||||
|
return self._process_yolo_generic(inference_output_list, hardware_preproc_info, version="v5")
|
||||||
|
|
||||||
|
def _process_yolo_generic(self, inference_output_list: List, hardware_preproc_info=None, version="v3") -> ObjectDetectionResult:
|
||||||
|
"""Generic YOLO postprocessing - simplified version"""
|
||||||
|
# This is a basic implementation for demonstration
|
||||||
|
# For production use, implement full YOLO postprocessing based on Kneron examples
|
||||||
|
boxes = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
if inference_output_list and len(inference_output_list) > 0:
|
||||||
|
# Basic bounding box extraction (simplified)
|
||||||
|
# In a real implementation, this would include proper anchor handling, NMS, etc.
|
||||||
|
for output in inference_output_list:
|
||||||
|
if hasattr(output, 'ndarray'):
|
||||||
|
arr = output.ndarray
|
||||||
|
elif hasattr(output, 'flatten'):
|
||||||
|
arr = output
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Simplified box extraction - this is just a placeholder
|
||||||
|
# Real implementation would parse YOLO output format properly
|
||||||
|
if arr.size >= 6: # Basic check for minimum box data
|
||||||
|
flat = arr.flatten()
|
||||||
|
if len(flat) >= 6 and flat[4] > self.options.threshold: # confidence check
|
||||||
|
box = BoundingBox(
|
||||||
|
x1=max(0, int(flat[0])),
|
||||||
|
y1=max(0, int(flat[1])),
|
||||||
|
x2=int(flat[2]),
|
||||||
|
y2=int(flat[3]),
|
||||||
|
score=float(flat[4]),
|
||||||
|
class_num=int(flat[5]) if len(flat) > 5 else 0,
|
||||||
|
class_name=self.options.class_names[int(flat[5])] if int(flat[5]) < len(self.options.class_names) else f"Object_{int(flat[5])}"
|
||||||
|
)
|
||||||
|
boxes.append(box)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: YOLO postprocessing error: {e}")
|
||||||
|
|
||||||
|
return ObjectDetectionResult(
|
||||||
|
class_count=len(self.options.class_names) if self.options.class_names else 1,
|
||||||
|
box_count=len(boxes),
|
||||||
|
box_list=boxes
|
||||||
|
)
|
||||||
|
|
||||||
|
def _process_raw_output(self, data: Any, *args, **kwargs) -> Any:
|
||||||
"""Default post-processing - returns data unchanged"""
|
"""Default post-processing - returns data unchanged"""
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@ -256,7 +424,8 @@ class MultiDongle:
|
|||||||
|
|
||||||
def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None,
|
def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None,
|
||||||
model_path: str = None, upload_fw: bool = False, auto_detect: bool = False,
|
model_path: str = None, upload_fw: bool = False, auto_detect: bool = False,
|
||||||
max_queue_size: int = 0, multi_series_config: dict = None):
|
max_queue_size: int = 0, multi_series_config: dict = None,
|
||||||
|
postprocess_options: PostProcessorOptions = None):
|
||||||
"""
|
"""
|
||||||
Initialize the MultiDongle class with support for both single and multi-series configurations.
|
Initialize the MultiDongle class with support for both single and multi-series configurations.
|
||||||
|
|
||||||
@ -278,7 +447,12 @@ class MultiDongle:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
:param postprocess_options: PostProcessorOptions for configuring output processing
|
||||||
"""
|
"""
|
||||||
|
# Set up postprocessing
|
||||||
|
self.postprocess_options = postprocess_options or PostProcessorOptions()
|
||||||
|
self.postprocessor = PostProcessor(self.postprocess_options)
|
||||||
|
|
||||||
# Determine if we're using multi-series mode
|
# Determine if we're using multi-series mode
|
||||||
self.multi_series_mode = multi_series_config is not None
|
self.multi_series_mode = multi_series_config is not None
|
||||||
|
|
||||||
@ -701,10 +875,10 @@ class MultiDongle:
|
|||||||
else:
|
else:
|
||||||
return resized_frame # RAW8 or other formats
|
return resized_frame # RAW8 or other formats
|
||||||
|
|
||||||
def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[float, str]:
|
def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[Any, str]:
|
||||||
"""
|
"""
|
||||||
Get the latest inference result
|
Get the latest inference result with postprocessing
|
||||||
Returns: (probability, result_string) or (None, None) if no result
|
Returns: (processed_result, result_string) or (None, None) if no result
|
||||||
"""
|
"""
|
||||||
output_descriptor = self.get_output(timeout=timeout)
|
output_descriptor = self.get_output(timeout=timeout)
|
||||||
if not output_descriptor:
|
if not output_descriptor:
|
||||||
@ -725,7 +899,7 @@ class MultiDongle:
|
|||||||
generic_raw_result=output_descriptor,
|
generic_raw_result=output_descriptor,
|
||||||
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
|
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
|
||||||
)
|
)
|
||||||
inf_node_output_list.append(inference_float_node_output.ndarray.copy())
|
inf_node_output_list.append(inference_float_node_output)
|
||||||
except kp.ApiKPException as e:
|
except kp.ApiKPException as e:
|
||||||
retrieval_successful = False
|
retrieval_successful = False
|
||||||
break
|
break
|
||||||
@ -734,20 +908,43 @@ class MultiDongle:
|
|||||||
break
|
break
|
||||||
|
|
||||||
if retrieval_successful and len(inf_node_output_list) > 0:
|
if retrieval_successful and len(inf_node_output_list) > 0:
|
||||||
# Process output nodes
|
# Get hardware preprocessing info for YOLO models
|
||||||
if output_descriptor.header.num_output_node == 1:
|
hardware_preproc_info = None
|
||||||
raw_output_array = inf_node_output_list[0].flatten()
|
if hasattr(output_descriptor.header, 'hw_pre_proc_info_list') and len(output_descriptor.header.hw_pre_proc_info_list) > 0:
|
||||||
|
hardware_preproc_info = output_descriptor.header.hw_pre_proc_info_list[0]
|
||||||
|
|
||||||
|
# Process with configured postprocessor
|
||||||
|
if self.postprocess_options.postprocess_type in [PostProcessType.YOLO_V3, PostProcessType.YOLO_V5]:
|
||||||
|
# For YOLO models, pass the full output list and hardware info
|
||||||
|
processed_result = self.postprocessor.process(inf_node_output_list, hardware_preproc_info=hardware_preproc_info)
|
||||||
else:
|
else:
|
||||||
concatenated_outputs = [arr.flatten() for arr in inf_node_output_list]
|
# For classification models, process the raw array
|
||||||
|
if output_descriptor.header.num_output_node == 1:
|
||||||
|
raw_output_array = inf_node_output_list[0].ndarray.flatten()
|
||||||
|
else:
|
||||||
|
concatenated_outputs = [node.ndarray.flatten() for node in inf_node_output_list]
|
||||||
raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([])
|
raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([])
|
||||||
|
|
||||||
if raw_output_array.size > 0:
|
processed_result = self.postprocessor.process(raw_output_array)
|
||||||
probability = postprocess(raw_output_array)
|
|
||||||
result_str = "Fire" if probability > 0.5 else "No Fire"
|
# Generate result string based on output type
|
||||||
return probability, result_str
|
result_str = self._generate_result_string(processed_result)
|
||||||
|
return processed_result, result_str
|
||||||
|
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
|
def _generate_result_string(self, processed_result: Any) -> str:
|
||||||
|
"""Generate a human-readable result string from processed output"""
|
||||||
|
if isinstance(processed_result, ClassificationResult):
|
||||||
|
return f"{processed_result.class_name} (Prob: {processed_result.probability:.2f})"
|
||||||
|
elif isinstance(processed_result, ObjectDetectionResult):
|
||||||
|
if processed_result.box_count == 0:
|
||||||
|
return "No objects detected"
|
||||||
|
else:
|
||||||
|
return f"Detected {processed_result.box_count} object(s)"
|
||||||
|
else:
|
||||||
|
return str(processed_result)
|
||||||
|
|
||||||
|
|
||||||
# Modified _send_thread_func to get data from input queue
|
# Modified _send_thread_func to get data from input queue
|
||||||
def _send_thread_func(self):
|
def _send_thread_func(self):
|
||||||
@ -1326,6 +1523,19 @@ class MultiDongle:
|
|||||||
for i, device_info in enumerate(devices_info):
|
for i, device_info in enumerate(devices_info):
|
||||||
print(f" [{i+1}] Port ID: {device_info['port_id']}, Series: {device_info['series']}")
|
print(f" [{i+1}] Port ID: {device_info['port_id']}, Series: {device_info['series']}")
|
||||||
|
|
||||||
|
def set_postprocess_options(self, options: PostProcessorOptions):
|
||||||
|
"""Update postprocessing options"""
|
||||||
|
self.postprocess_options = options
|
||||||
|
self.postprocessor = PostProcessor(self.postprocess_options)
|
||||||
|
|
||||||
|
def get_postprocess_options(self) -> PostProcessorOptions:
|
||||||
|
"""Get current postprocessing options"""
|
||||||
|
return self.postprocess_options
|
||||||
|
|
||||||
|
def get_available_postprocess_types(self) -> List[PostProcessType]:
|
||||||
|
"""Get list of available postprocessing types"""
|
||||||
|
return list(PostProcessType)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
"""Ensure resources are released when the object is garbage collected."""
|
"""Ensure resources are released when the object is garbage collected."""
|
||||||
self.stop()
|
self.stop()
|
||||||
@ -1338,7 +1548,8 @@ class MultiDongle:
|
|||||||
|
|
||||||
def postprocess(raw_model_output: list) -> float:
|
def postprocess(raw_model_output: list) -> float:
|
||||||
"""
|
"""
|
||||||
Post-processes the raw model output.
|
Legacy postprocess function for backward compatibility.
|
||||||
|
Post-processes the raw model output for fire detection.
|
||||||
Assumes the model output is a list/array where the first element is the desired probability.
|
Assumes the model output is a list/array where the first element is the desired probability.
|
||||||
"""
|
"""
|
||||||
if raw_model_output is not None and len(raw_model_output) > 0:
|
if raw_model_output is not None and len(raw_model_output) > 0:
|
||||||
@ -1350,8 +1561,8 @@ class WebcamInferenceRunner:
|
|||||||
def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'):
|
def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'):
|
||||||
self.multidongle = multidongle
|
self.multidongle = multidongle
|
||||||
self.image_format = image_format
|
self.image_format = image_format
|
||||||
self.latest_probability = 0.0
|
self.latest_result = None
|
||||||
self.result_str = "No Fire"
|
self.result_str = "No result"
|
||||||
|
|
||||||
# Statistics tracking
|
# Statistics tracking
|
||||||
self.processed_inference_count = 0
|
self.processed_inference_count = 0
|
||||||
@ -1380,15 +1591,15 @@ class WebcamInferenceRunner:
|
|||||||
self.multidongle.put_input(processed_frame, self.image_format)
|
self.multidongle.put_input(processed_frame, self.image_format)
|
||||||
|
|
||||||
# Get inference result
|
# Get inference result
|
||||||
prob, result = self.multidongle.get_latest_inference_result()
|
result, result_str = self.multidongle.get_latest_inference_result()
|
||||||
if prob is not None:
|
if result is not None:
|
||||||
# Track inference FPS
|
# Track inference FPS
|
||||||
if self.inference_fps_start_time is None:
|
if self.inference_fps_start_time is None:
|
||||||
self.inference_fps_start_time = time.time()
|
self.inference_fps_start_time = time.time()
|
||||||
self.processed_inference_count += 1
|
self.processed_inference_count += 1
|
||||||
|
|
||||||
self.latest_probability = prob
|
self.latest_result = result
|
||||||
self.result_str = result
|
self.result_str = result_str
|
||||||
|
|
||||||
# Display frame with results
|
# Display frame with results
|
||||||
self._display_results(frame)
|
self._display_results(frame)
|
||||||
@ -1403,11 +1614,16 @@ class WebcamInferenceRunner:
|
|||||||
|
|
||||||
def _display_results(self, frame):
|
def _display_results(self, frame):
|
||||||
display_frame = frame.copy()
|
display_frame = frame.copy()
|
||||||
text_color = (0, 255, 0) if "Fire" in self.result_str else (0, 0, 255)
|
|
||||||
|
|
||||||
# Display inference result
|
# Handle different result types
|
||||||
cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})",
|
if isinstance(self.latest_result, ClassificationResult):
|
||||||
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2)
|
self._draw_classification_result(display_frame, self.latest_result)
|
||||||
|
elif isinstance(self.latest_result, ObjectDetectionResult):
|
||||||
|
self._draw_object_detection_result(display_frame, self.latest_result)
|
||||||
|
else:
|
||||||
|
# Fallback for other result types
|
||||||
|
cv2.putText(display_frame, self.result_str,
|
||||||
|
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||||
|
|
||||||
# Calculate and display inference FPS
|
# Calculate and display inference FPS
|
||||||
if self.inference_fps_start_time and self.processed_inference_count > 0:
|
if self.inference_fps_start_time and self.processed_inference_count > 0:
|
||||||
@ -1417,7 +1633,66 @@ class WebcamInferenceRunner:
|
|||||||
cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}",
|
cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}",
|
||||||
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
|
||||||
|
|
||||||
cv2.imshow('Fire Detection', display_frame)
|
# Window title based on postprocessing type
|
||||||
|
window_title = f"Inference - {self.multidongle.postprocess_options.postprocess_type.value}"
|
||||||
|
cv2.imshow(window_title, display_frame)
|
||||||
|
|
||||||
|
def _draw_classification_result(self, frame, result: ClassificationResult):
|
||||||
|
"""Draw classification results on frame"""
|
||||||
|
color = (0, 255, 0) if result.is_positive else (0, 0, 255)
|
||||||
|
|
||||||
|
# Main result text
|
||||||
|
cv2.putText(frame, f"{result.class_name} (Prob: {result.probability:.2f})",
|
||||||
|
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
|
||||||
|
|
||||||
|
# Confidence indicator bar
|
||||||
|
bar_width = 200
|
||||||
|
bar_height = 20
|
||||||
|
bar_x, bar_y = 10, 80
|
||||||
|
|
||||||
|
# Background bar
|
||||||
|
cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (100, 100, 100), -1)
|
||||||
|
|
||||||
|
# Confidence bar
|
||||||
|
confidence_width = int(bar_width * result.probability)
|
||||||
|
cv2.rectangle(frame, (bar_x, bar_y), (bar_x + confidence_width, bar_y + bar_height), color, -1)
|
||||||
|
|
||||||
|
# Threshold line
|
||||||
|
threshold_x = int(bar_x + bar_width * result.confidence_threshold)
|
||||||
|
cv2.line(frame, (threshold_x, bar_y), (threshold_x, bar_y + bar_height), (255, 255, 255), 2)
|
||||||
|
|
||||||
|
def _draw_object_detection_result(self, frame, result: ObjectDetectionResult):
|
||||||
|
"""Draw object detection results on frame"""
|
||||||
|
# Draw bounding boxes
|
||||||
|
for i, box in enumerate(result.box_list):
|
||||||
|
# Color based on class
|
||||||
|
b = 100 + (25 * box.class_num) % 156
|
||||||
|
g = 100 + (80 + 40 * box.class_num) % 156
|
||||||
|
r = 100 + (120 + 60 * box.class_num) % 156
|
||||||
|
color = (b, g, r)
|
||||||
|
|
||||||
|
# Draw bounding box
|
||||||
|
cv2.rectangle(frame, (box.x1, box.y1), (box.x2, box.y2), color, 2)
|
||||||
|
|
||||||
|
# Draw label with score
|
||||||
|
label = f"{box.class_name}: {box.score:.2f}"
|
||||||
|
label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
|
||||||
|
|
||||||
|
# Label background
|
||||||
|
cv2.rectangle(frame,
|
||||||
|
(box.x1, box.y1 - label_size[1] - 10),
|
||||||
|
(box.x1 + label_size[0], box.y1),
|
||||||
|
color, -1)
|
||||||
|
|
||||||
|
# Label text
|
||||||
|
cv2.putText(frame, label,
|
||||||
|
(box.x1, box.y1 - 5),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
|
||||||
|
|
||||||
|
# Summary text
|
||||||
|
summary_text = f"Objects: {result.box_count}"
|
||||||
|
cv2.putText(frame, summary_text,
|
||||||
|
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||||
|
|
||||||
# def _print_statistics(self):
|
# def _print_statistics(self):
|
||||||
# """Print final statistics"""
|
# """Print final statistics"""
|
||||||
@ -1443,12 +1718,47 @@ if __name__ == "__main__":
|
|||||||
MODEL_PATH = r'fire_detection_520.nef'
|
MODEL_PATH = r'fire_detection_520.nef'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Initialize inference engine
|
# Configure postprocessing options
|
||||||
|
# Default: Fire detection (classification)
|
||||||
|
postprocess_options = PostProcessorOptions(
|
||||||
|
postprocess_type=PostProcessType.FIRE_DETECTION,
|
||||||
|
threshold=0.5,
|
||||||
|
class_names=["No Fire", "Fire"]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Alternative options for different model types:
|
||||||
|
#
|
||||||
|
# For YOLO v3 object detection:
|
||||||
|
# postprocess_options = PostProcessorOptions(
|
||||||
|
# postprocess_type=PostProcessType.YOLO_V3,
|
||||||
|
# threshold=0.3,
|
||||||
|
# class_names=["person", "bicycle", "car", "motorbike", "aeroplane"],
|
||||||
|
# nms_threshold=0.45
|
||||||
|
# )
|
||||||
|
#
|
||||||
|
# For general classification:
|
||||||
|
# postprocess_options = PostProcessorOptions(
|
||||||
|
# postprocess_type=PostProcessType.CLASSIFICATION,
|
||||||
|
# threshold=0.5,
|
||||||
|
# class_names=["class1", "class2", "class3"]
|
||||||
|
# )
|
||||||
|
|
||||||
|
# Initialize inference engine with postprocessing options
|
||||||
print("Initializing MultiDongle...")
|
print("Initializing MultiDongle...")
|
||||||
multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True)
|
multidongle = MultiDongle(
|
||||||
|
port_id=PORT_IDS,
|
||||||
|
scpu_fw_path=SCPU_FW,
|
||||||
|
ncpu_fw_path=NCPU_FW,
|
||||||
|
model_path=MODEL_PATH,
|
||||||
|
upload_fw=True,
|
||||||
|
postprocess_options=postprocess_options
|
||||||
|
)
|
||||||
multidongle.initialize()
|
multidongle.initialize()
|
||||||
multidongle.start()
|
multidongle.start()
|
||||||
|
|
||||||
|
print(f"Postprocessing type: {postprocess_options.postprocess_type.value}")
|
||||||
|
print(f"Available types: {[t.value for t in multidongle.get_available_postprocess_types()]}")
|
||||||
|
|
||||||
# Run using the new runner class
|
# Run using the new runner class
|
||||||
print("Starting webcam inference...")
|
print("Starting webcam inference...")
|
||||||
runner = WebcamInferenceRunner(multidongle, 'BGR565')
|
runner = WebcamInferenceRunner(multidongle, 'BGR565')
|
||||||
|
|||||||
@ -19,6 +19,7 @@ Usage:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
from .base_node import BaseNodeWithProperties
|
from .base_node import BaseNodeWithProperties
|
||||||
|
from ..functions.Multidongle import PostProcessType, PostProcessorOptions
|
||||||
|
|
||||||
|
|
||||||
class PostprocessNode(BaseNodeWithProperties):
|
class PostprocessNode(BaseNodeWithProperties):
|
||||||
@ -45,6 +46,17 @@ class PostprocessNode(BaseNodeWithProperties):
|
|||||||
|
|
||||||
def setup_properties(self):
|
def setup_properties(self):
|
||||||
"""Initialize postprocessing-specific properties."""
|
"""Initialize postprocessing-specific properties."""
|
||||||
|
# Postprocessing type - NEW: Integration with MultiDongle postprocessing
|
||||||
|
self.create_business_property('postprocess_type', 'fire_detection', [
|
||||||
|
'fire_detection', 'yolo_v3', 'yolo_v5', 'classification', 'raw_output'
|
||||||
|
])
|
||||||
|
|
||||||
|
# Class names for postprocessing
|
||||||
|
self.create_business_property('class_names', 'No Fire,Fire', {
|
||||||
|
'placeholder': 'comma-separated class names',
|
||||||
|
'description': 'Class names for model output (e.g., "No Fire,Fire" or "person,car,bicycle")'
|
||||||
|
})
|
||||||
|
|
||||||
# Output format
|
# Output format
|
||||||
self.create_business_property('output_format', 'JSON', [
|
self.create_business_property('output_format', 'JSON', [
|
||||||
'JSON', 'XML', 'CSV', 'Binary', 'MessagePack', 'YAML'
|
'JSON', 'XML', 'CSV', 'Binary', 'MessagePack', 'YAML'
|
||||||
@ -179,6 +191,33 @@ class PostprocessNode(BaseNodeWithProperties):
|
|||||||
|
|
||||||
return True, ""
|
return True, ""
|
||||||
|
|
||||||
|
def get_multidongle_postprocess_options(self) -> 'PostProcessorOptions':
|
||||||
|
"""Create PostProcessorOptions from node configuration."""
|
||||||
|
postprocess_type_str = self.get_property('postprocess_type')
|
||||||
|
|
||||||
|
# Map string to enum
|
||||||
|
type_mapping = {
|
||||||
|
'fire_detection': PostProcessType.FIRE_DETECTION,
|
||||||
|
'yolo_v3': PostProcessType.YOLO_V3,
|
||||||
|
'yolo_v5': PostProcessType.YOLO_V5,
|
||||||
|
'classification': PostProcessType.CLASSIFICATION,
|
||||||
|
'raw_output': PostProcessType.RAW_OUTPUT
|
||||||
|
}
|
||||||
|
|
||||||
|
postprocess_type = type_mapping.get(postprocess_type_str, PostProcessType.FIRE_DETECTION)
|
||||||
|
|
||||||
|
# Parse class names
|
||||||
|
class_names_str = self.get_property('class_names')
|
||||||
|
class_names = [name.strip() for name in class_names_str.split(',') if name.strip()] if class_names_str else []
|
||||||
|
|
||||||
|
return PostProcessorOptions(
|
||||||
|
postprocess_type=postprocess_type,
|
||||||
|
threshold=self.get_property('confidence_threshold'),
|
||||||
|
class_names=class_names,
|
||||||
|
nms_threshold=self.get_property('nms_threshold'),
|
||||||
|
max_detections_per_class=self.get_property('max_detections')
|
||||||
|
)
|
||||||
|
|
||||||
def get_postprocessing_config(self) -> dict:
|
def get_postprocessing_config(self) -> dict:
|
||||||
"""
|
"""
|
||||||
Get postprocessing configuration for pipeline execution.
|
Get postprocessing configuration for pipeline execution.
|
||||||
@ -189,6 +228,11 @@ class PostprocessNode(BaseNodeWithProperties):
|
|||||||
return {
|
return {
|
||||||
'node_id': self.id,
|
'node_id': self.id,
|
||||||
'node_name': self.name(),
|
'node_name': self.name(),
|
||||||
|
# NEW: MultiDongle postprocessing integration
|
||||||
|
'postprocess_type': self.get_property('postprocess_type'),
|
||||||
|
'class_names': self._parse_class_list(self.get_property('class_names')),
|
||||||
|
'multidongle_options': self.get_multidongle_postprocess_options(),
|
||||||
|
# Original properties
|
||||||
'output_format': self.get_property('output_format'),
|
'output_format': self.get_property('output_format'),
|
||||||
'confidence_threshold': self.get_property('confidence_threshold'),
|
'confidence_threshold': self.get_property('confidence_threshold'),
|
||||||
'enable_confidence_filter': self.get_property('enable_confidence_filter'),
|
'enable_confidence_filter': self.get_property('enable_confidence_filter'),
|
||||||
|
|||||||
141
example_postprocess_options.py
Normal file
141
example_postprocess_options.py
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Example demonstrating the new default postprocess options in the app.
|
||||||
|
|
||||||
|
This script shows how to use the different postprocessing types:
|
||||||
|
- Fire detection (classification)
|
||||||
|
- YOLO v3/v5 (object detection with bounding boxes)
|
||||||
|
- General classification
|
||||||
|
- Raw output
|
||||||
|
|
||||||
|
The postprocessing options are built-in to the app and provide text output
|
||||||
|
and bounding box visualization in live view windows.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Add the project root to Python path
|
||||||
|
sys.path.insert(0, os.path.dirname(__file__))
|
||||||
|
|
||||||
|
from core.functions.Multidongle import (
|
||||||
|
MultiDongle,
|
||||||
|
PostProcessorOptions,
|
||||||
|
PostProcessType,
|
||||||
|
WebcamInferenceRunner
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def demo_fire_detection():
|
||||||
|
"""Demo fire detection postprocessing (default)"""
|
||||||
|
print("=== Fire Detection Demo ===")
|
||||||
|
|
||||||
|
# Configure for fire detection
|
||||||
|
options = PostProcessorOptions(
|
||||||
|
postprocess_type=PostProcessType.FIRE_DETECTION,
|
||||||
|
threshold=0.5,
|
||||||
|
class_names=["No Fire", "Fire"]
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"Postprocess type: {options.postprocess_type.value}")
|
||||||
|
print(f"Threshold: {options.threshold}")
|
||||||
|
print(f"Class names: {options.class_names}")
|
||||||
|
return options
|
||||||
|
|
||||||
|
|
||||||
|
def demo_yolo_object_detection():
|
||||||
|
"""Demo YOLO object detection with bounding boxes"""
|
||||||
|
print("=== YOLO Object Detection Demo ===")
|
||||||
|
|
||||||
|
# Configure for YOLO v5 object detection
|
||||||
|
options = PostProcessorOptions(
|
||||||
|
postprocess_type=PostProcessType.YOLO_V5,
|
||||||
|
threshold=0.3,
|
||||||
|
class_names=["person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck"],
|
||||||
|
nms_threshold=0.5,
|
||||||
|
max_detections_per_class=50
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"Postprocess type: {options.postprocess_type.value}")
|
||||||
|
print(f"Detection threshold: {options.threshold}")
|
||||||
|
print(f"NMS threshold: {options.nms_threshold}")
|
||||||
|
print(f"Class names: {options.class_names[:5]}...") # Show first 5
|
||||||
|
return options
|
||||||
|
|
||||||
|
|
||||||
|
def demo_general_classification():
|
||||||
|
"""Demo general classification"""
|
||||||
|
print("=== General Classification Demo ===")
|
||||||
|
|
||||||
|
# Configure for general classification
|
||||||
|
options = PostProcessorOptions(
|
||||||
|
postprocess_type=PostProcessType.CLASSIFICATION,
|
||||||
|
threshold=0.6,
|
||||||
|
class_names=["cat", "dog", "bird", "fish", "horse"]
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"Postprocess type: {options.postprocess_type.value}")
|
||||||
|
print(f"Threshold: {options.threshold}")
|
||||||
|
print(f"Class names: {options.class_names}")
|
||||||
|
return options
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main demo function"""
|
||||||
|
print("Default Postprocess Options Demo")
|
||||||
|
print("=" * 40)
|
||||||
|
|
||||||
|
# Demo different postprocessing options
|
||||||
|
fire_options = demo_fire_detection()
|
||||||
|
print()
|
||||||
|
|
||||||
|
yolo_options = demo_yolo_object_detection()
|
||||||
|
print()
|
||||||
|
|
||||||
|
classification_options = demo_general_classification()
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Example of how to initialize MultiDongle with options
|
||||||
|
print("=== MultiDongle Integration Example ===")
|
||||||
|
|
||||||
|
# NOTE: Update these paths according to your setup
|
||||||
|
PORT_IDS = [28, 32] # Update with your device port IDs
|
||||||
|
SCPU_FW = 'fw_scpu.bin' # Update with your firmware path
|
||||||
|
NCPU_FW = 'fw_ncpu.bin' # Update with your firmware path
|
||||||
|
MODEL_PATH = 'your_model.nef' # Update with your model path
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Example 1: Fire detection (default)
|
||||||
|
print("Initializing with fire detection...")
|
||||||
|
multidongle_fire = MultiDongle(
|
||||||
|
port_id=PORT_IDS,
|
||||||
|
scpu_fw_path=SCPU_FW,
|
||||||
|
ncpu_fw_path=NCPU_FW,
|
||||||
|
model_path=MODEL_PATH,
|
||||||
|
upload_fw=False, # Set to True if you need firmware upload
|
||||||
|
postprocess_options=fire_options
|
||||||
|
)
|
||||||
|
print(f"✓ Fire detection configured: {multidongle_fire.postprocess_options.postprocess_type.value}")
|
||||||
|
|
||||||
|
# Example 2: Change postprocessing options dynamically
|
||||||
|
print("Changing to YOLO detection...")
|
||||||
|
multidongle_fire.set_postprocess_options(yolo_options)
|
||||||
|
print(f"✓ YOLO detection configured: {multidongle_fire.postprocess_options.postprocess_type.value}")
|
||||||
|
|
||||||
|
# Example 3: Get available types
|
||||||
|
available_types = multidongle_fire.get_available_postprocess_types()
|
||||||
|
print(f"Available postprocess types: {[t.value for t in available_types]}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Note: MultiDongle initialization skipped (no hardware): {e}")
|
||||||
|
|
||||||
|
print("\n=== Usage Notes ===")
|
||||||
|
print("1. Fire detection option is set as default")
|
||||||
|
print("2. Text output shows classification results with probabilities")
|
||||||
|
print("3. Bounding box output visualizes detected objects in live view")
|
||||||
|
print("4. All postprocessing is built-in to the app (no external dependencies)")
|
||||||
|
print("5. Exact nodes can configure postprocessing through UI properties")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
x
Reference in New Issue
Block a user