From c9f294bb4c76b4c3547291f41bcaa422a226fe85 Mon Sep 17 00:00:00 2001 From: HuangMason320 Date: Wed, 30 Jul 2025 19:45:34 +0800 Subject: [PATCH] fix: Improve FPS calculation and filter async results MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要改進: - 修改 FPS 計算邏輯為累積式計算,避免初期不穩定的高 FPS 值 - 過濾掉 async 和 processing 狀態的結果,不顯示也不計入統計 - 只有真正的推理結果才會被計入 FPS 和處理計數 - 新增 _has_valid_inference_result 方法來驗證結果有效性 - 改善 MultiDongle 的 stop 方法,正確斷開設備連接 - 清理不必要的檔案和更新測試配置 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../core/functions/InferencePipeline.py | 74 +-- cluster4npu_ui/core/functions/Multidongle.py | 9 + cluster4npu_ui/debug_deployment.py | 273 ---------- cluster4npu_ui/example.py | 504 ++++++++++++++++++ cluster4npu_ui/resources/{__init__.py} | 0 cluster4npu_ui/test.mflow | 26 +- cluster4npu_ui/ui/dialogs/deployment.py | 36 +- cluster4npu_ui/ui/{__init__.py} | 0 8 files changed, 597 insertions(+), 325 deletions(-) delete mode 100644 cluster4npu_ui/debug_deployment.py create mode 100644 cluster4npu_ui/example.py delete mode 100644 cluster4npu_ui/resources/{__init__.py} delete mode 100644 cluster4npu_ui/ui/{__init__.py} diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py index bc79140..48ed746 100644 --- a/cluster4npu_ui/core/functions/InferencePipeline.py +++ b/cluster4npu_ui/core/functions/InferencePipeline.py @@ -350,9 +350,8 @@ class InferencePipeline: self.completed_counter = 0 self.error_counter = 0 - # FPS calculation based on output queue throughput - self.fps_window_size = 10.0 # 10 second window - self.output_timestamps = [] # Track when outputs are generated + # FPS calculation based on output queue throughput (cumulative approach) + self.fps_start_time = None # Start time for FPS calculation self.fps_lock = threading.Lock() # Thread safety for FPS calculation def initialize(self): @@ -375,39 +374,48 @@ class InferencePipeline: def _record_output_timestamp(self): """Record timestamp when output is generated for FPS calculation""" with self.fps_lock: - current_time = time.time() - self.output_timestamps.append(current_time) - - # Remove timestamps older than window - cutoff_time = current_time - self.fps_window_size - self.output_timestamps = [t for t in self.output_timestamps if t > cutoff_time] + # Set start time only when we have our first completed result + if self.fps_start_time is None and self.completed_counter == 1: + self.fps_start_time = time.time() def get_current_fps(self) -> float: - """Calculate current FPS based on output queue throughput""" + """Calculate current FPS based on output queue throughput (cumulative approach like example.py)""" with self.fps_lock: - if len(self.output_timestamps) < 2: + if self.fps_start_time is None or self.completed_counter == 0: return 0.0 - current_time = time.time() - # Clean old timestamps - cutoff_time = current_time - self.fps_window_size - valid_timestamps = [t for t in self.output_timestamps if t > cutoff_time] - - if len(valid_timestamps) < 2: - return 0.0 - - # Calculate FPS over the time window - time_span = valid_timestamps[-1] - valid_timestamps[0] - if time_span > 0: - return (len(valid_timestamps) - 1) / time_span + elapsed_time = time.time() - self.fps_start_time + if elapsed_time > 0: + return self.completed_counter / elapsed_time return 0.0 + def _has_valid_inference_result(self, pipeline_data) -> bool: + """Check if pipeline data contains valid inference results (not async/processing status)""" + for stage_id, stage_result in pipeline_data.stage_results.items(): + if stage_result: + # Check for tuple result (prob, result_str) + if isinstance(stage_result, tuple) and len(stage_result) == 2: + prob, result_str = stage_result + if prob is not None and result_str not in ['Processing']: + return True + # Check for dict result with actual inference data + elif isinstance(stage_result, dict): + # Don't count "Processing" or "async" status as real results + if stage_result.get("status") in ["processing", "async"]: + continue + # Don't count empty results + if stage_result.get("result") == "Processing": + continue + # If we have a meaningful result, count it + return True + return False + def start(self): """Start the pipeline""" # Clear previous FPS data when starting with self.fps_lock: - self.output_timestamps.clear() + self.fps_start_time = None print(f"[{self.pipeline_name}] Starting pipeline...") @@ -507,8 +515,11 @@ class InferencePipeline: except Exception as e: print(f"[{self.pipeline_name}] Final postprocessing error: {e}") - # Output result + # Output result - but only if it's a real inference result, not async if success: + # Check if we have valid inference results (not async/processing status) + has_valid_inference = self._has_valid_inference_result(current_data) + current_data.metadata['end_timestamp'] = time.time() current_data.metadata['total_processing_time'] = ( current_data.metadata['end_timestamp'] - @@ -517,10 +528,12 @@ class InferencePipeline: try: self.pipeline_output_queue.put(current_data, block=False) - self.completed_counter += 1 - # Record output timestamp for FPS calculation - self._record_output_timestamp() + # Only count completed results if they contain valid inference + if has_valid_inference: + self.completed_counter += 1 + # Record output timestamp for FPS calculation + self._record_output_timestamp() # Debug: Log pipeline activity every 10 results if self.completed_counter % 10 == 0: @@ -536,8 +549,9 @@ class InferencePipeline: try: self.pipeline_output_queue.get_nowait() self.pipeline_output_queue.put(current_data, block=False) - # Record output timestamp even when queue was full - self._record_output_timestamp() + # Only record timestamp and count if valid inference result + if has_valid_inference: + self._record_output_timestamp() except queue.Empty: pass else: diff --git a/cluster4npu_ui/core/functions/Multidongle.py b/cluster4npu_ui/core/functions/Multidongle.py index e05fccd..9e51c5d 100644 --- a/cluster4npu_ui/core/functions/Multidongle.py +++ b/cluster4npu_ui/core/functions/Multidongle.py @@ -530,6 +530,15 @@ class MultiDongle: if thread.is_alive(): print(f"Warning: {name} thread didn't stop cleanly") + print("Disconnecting device group...") + if self.device_group: + try: + kp.core.disconnect_devices(device_group=self.device_group) + print("Device group disconnected successfully.") + except kp.ApiKPException as e: + print(f"Error disconnecting device group: {e}") + self.device_group = None + def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None): """ Put an image into the input queue with flexible preprocessing diff --git a/cluster4npu_ui/debug_deployment.py b/cluster4npu_ui/debug_deployment.py deleted file mode 100644 index c004f5f..0000000 --- a/cluster4npu_ui/debug_deployment.py +++ /dev/null @@ -1,273 +0,0 @@ -#!/usr/bin/env python3 -""" -Debug script to trace deployment pipeline data flow. -This script helps identify where data flow breaks during deployment. -""" - -import sys -import os -import json -from typing import Dict, Any - -# Add the project root to the Python path -project_root = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, project_root) -sys.path.insert(0, os.path.join(project_root, 'core', 'functions')) - -try: - from core.functions.mflow_converter import MFlowConverter - from core.functions.workflow_orchestrator import WorkflowOrchestrator - from core.functions.InferencePipeline import InferencePipeline - IMPORTS_AVAILABLE = True -except ImportError as e: - print(f"❌ Import error: {e}") - IMPORTS_AVAILABLE = False - -def create_test_pipeline_data() -> Dict[str, Any]: - """Create a minimal test pipeline that should work.""" - return { - 'project_name': 'Debug Test Pipeline', - 'description': 'Simple test pipeline for debugging data flow', - 'version': '1.0', - 'nodes': [ - { - 'id': 'input_1', - 'name': 'Camera Input', - 'type': 'ExactInputNode', - 'pos': [100, 100], - 'properties': { - 'source_type': 'camera', # lowercase to match WorkflowOrchestrator - 'device_id': 0, - 'resolution': '640x480', # smaller resolution for testing - 'fps': 10 # lower fps for testing - } - }, - { - 'id': 'model_1', - 'name': 'Test Model', - 'type': 'ExactModelNode', - 'pos': [300, 100], - 'properties': { - 'model_path': 'C:/Users/mason/AppData/Local/Kneron_Academy/utils/yolov5s/yolov5s/kl520_20005_yolov5-noupsample_w640h640.nef', - 'scpu_fw_path': 'C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_scpu.bin', - 'ncpu_fw_path': 'C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_ncpu.bin', - 'port_ids': '32', - 'upload_fw': True - } - }, - { - 'id': 'output_1', - 'name': 'Debug Output', - 'type': 'ExactOutputNode', - 'pos': [500, 100], - 'properties': { - 'output_type': 'console', - 'destination': './debug_output' - } - } - ], - 'connections': [ - { - 'input_node': 'input_1', - 'input_port': 'output', - 'output_node': 'model_1', - 'output_port': 'input' - }, - { - 'input_node': 'model_1', - 'input_port': 'output', - 'output_node': 'output_1', - 'output_port': 'input' - } - ] - } - -def trace_pipeline_conversion(pipeline_data: Dict[str, Any]): - """Trace the conversion process step by step.""" - print("🔍 DEBUGGING PIPELINE CONVERSION") - print("=" * 60) - - if not IMPORTS_AVAILABLE: - print("❌ Cannot trace conversion - imports not available") - return None, None, None - - try: - print("1️⃣ Creating MFlowConverter...") - converter = MFlowConverter() - - print("2️⃣ Converting pipeline data to config...") - config = converter._convert_mflow_to_config(pipeline_data) - - print(f"✅ Conversion successful!") - print(f" Pipeline name: {config.pipeline_name}") - print(f" Total stages: {len(config.stage_configs)}") - - print("\n📊 INPUT CONFIG:") - print(json.dumps(config.input_config, indent=2)) - - print("\n📊 OUTPUT CONFIG:") - print(json.dumps(config.output_config, indent=2)) - - print("\n📊 STAGE CONFIGS:") - for i, stage_config in enumerate(config.stage_configs, 1): - print(f" Stage {i}: {stage_config.stage_id}") - print(f" Port IDs: {stage_config.port_ids}") - print(f" Model: {stage_config.model_path}") - - print("\n3️⃣ Validating configuration...") - is_valid, errors = converter.validate_config(config) - if is_valid: - print("✅ Configuration is valid") - else: - print("❌ Configuration validation failed:") - for error in errors: - print(f" - {error}") - - return converter, config, is_valid - - except Exception as e: - print(f"❌ Conversion failed: {e}") - import traceback - traceback.print_exc() - return None, None, False - -def trace_workflow_creation(converter, config): - """Trace the workflow orchestrator creation.""" - print("\n🔧 DEBUGGING WORKFLOW ORCHESTRATOR") - print("=" * 60) - - try: - print("1️⃣ Creating InferencePipeline...") - pipeline = converter.create_inference_pipeline(config) - print("✅ Pipeline created") - - print("2️⃣ Creating WorkflowOrchestrator...") - orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config) - print("✅ Orchestrator created") - - print("3️⃣ Checking data source creation...") - data_source = orchestrator._create_data_source() - if data_source: - print(f"✅ Data source created: {type(data_source).__name__}") - - # Check if the data source can initialize - print("4️⃣ Testing data source initialization...") - if hasattr(data_source, 'initialize'): - init_result = data_source.initialize() - print(f" Initialization result: {init_result}") - else: - print(" Data source has no initialize method") - - else: - print("❌ Data source creation failed") - print(f" Source type: {config.input_config.get('source_type', 'MISSING')}") - - print("5️⃣ Checking result handler creation...") - result_handler = orchestrator._create_result_handler() - if result_handler: - print(f"✅ Result handler created: {type(result_handler).__name__}") - else: - print("⚠️ No result handler created (may be expected)") - - return orchestrator, data_source, result_handler - - except Exception as e: - print(f"❌ Workflow creation failed: {e}") - import traceback - traceback.print_exc() - return None, None, None - -def test_data_flow(orchestrator): - """Test the actual data flow without real dongles.""" - print("\n🌊 TESTING DATA FLOW") - print("=" * 60) - - # Set up result callback to track data - results_received = [] - - def debug_result_callback(result_dict): - print(f"🎯 RESULT RECEIVED: {result_dict}") - results_received.append(result_dict) - - def debug_frame_callback(frame): - print(f"📸 FRAME RECEIVED: {type(frame)} shape={getattr(frame, 'shape', 'N/A')}") - - try: - print("1️⃣ Setting up callbacks...") - orchestrator.set_result_callback(debug_result_callback) - orchestrator.set_frame_callback(debug_frame_callback) - - print("2️⃣ Starting orchestrator (this will fail with dongles, but should show data source activity)...") - orchestrator.start() - - print("3️⃣ Running for 5 seconds to capture any activity...") - import time - time.sleep(5) - - print("4️⃣ Stopping orchestrator...") - orchestrator.stop() - - print(f"📊 Results summary:") - print(f" Total results received: {len(results_received)}") - - return len(results_received) > 0 - - except Exception as e: - print(f"❌ Data flow test failed: {e}") - print(" This might be expected if dongles are not available") - return False - -def main(): - """Main debugging function.""" - print("🚀 CLUSTER4NPU DEPLOYMENT DEBUG TOOL") - print("=" * 60) - - # Create test pipeline data - pipeline_data = create_test_pipeline_data() - - # Trace conversion - converter, config, is_valid = trace_pipeline_conversion(pipeline_data) - - if not converter or not config or not is_valid: - print("\n❌ Cannot proceed - conversion failed or invalid") - return - - # Trace workflow creation - orchestrator, data_source, result_handler = trace_workflow_creation(converter, config) - - if not orchestrator: - print("\n❌ Cannot proceed - workflow creation failed") - return - - # Test data flow (this will likely fail with dongle connection, but shows data source behavior) - print("\n⚠️ Note: The following test will likely fail due to missing dongles,") - print(" but it will help us see if the data source is working correctly.") - - data_flowing = test_data_flow(orchestrator) - - print("\n📋 DEBUGGING SUMMARY") - print("=" * 60) - print(f"✅ Pipeline conversion: {'SUCCESS' if converter else 'FAILED'}") - print(f"✅ Configuration validation: {'SUCCESS' if is_valid else 'FAILED'}") - print(f"✅ Workflow orchestrator: {'SUCCESS' if orchestrator else 'FAILED'}") - print(f"✅ Data source creation: {'SUCCESS' if data_source else 'FAILED'}") - print(f"✅ Result handler creation: {'SUCCESS' if result_handler else 'N/A'}") - print(f"✅ Data flow test: {'SUCCESS' if data_flowing else 'FAILED (expected without dongles)'}") - - if data_source and not data_flowing: - print("\n🔍 DIAGNOSIS:") - print("The issue appears to be that:") - print("1. Pipeline configuration is working correctly") - print("2. Data source can be created") - print("3. BUT: Either the data source cannot initialize (camera not available)") - print(" OR: The pipeline cannot start (dongles not available)") - print(" OR: No data is being sent to the pipeline") - - print("\n💡 RECOMMENDATIONS:") - print("1. Check if a camera is connected at index 0") - print("2. Check if dongles are properly connected") - print("3. Add more detailed logging to WorkflowOrchestrator.start()") - print("4. Verify the pipeline.put_data() callback is being called") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/cluster4npu_ui/example.py b/cluster4npu_ui/example.py new file mode 100644 index 0000000..6b73ded --- /dev/null +++ b/cluster4npu_ui/example.py @@ -0,0 +1,504 @@ +from typing import Union, Tuple +import os +import sys +import argparse +import time +import threading +import queue +import numpy as np +import kp +import cv2 +import time +from abc import ABC, abstractmethod +from typing import Callable, Optional, Any, Dict + + +# class PreProcessor(DataProcessor): # type: ignore +# def __init__(self, resize_fn: Optional[Callable] = None, +# format_convert_fn: Optional[Callable] = None): +# self.resize_fn = resize_fn or self._default_resize +# self.format_convert_fn = format_convert_fn or self._default_format_convert + +# def process(self, frame: np.ndarray, target_size: tuple, target_format: str) -> np.ndarray: +# """Main processing pipeline""" +# resized = self.resize_fn(frame, target_size) +# return self.format_convert_fn(resized, target_format) + +# def _default_resize(self, frame: np.ndarray, target_size: tuple) -> np.ndarray: +# """Default resize implementation""" +# return cv2.resize(frame, target_size) + +# def _default_format_convert(self, frame: np.ndarray, target_format: str) -> np.ndarray: +# """Default format conversion""" +# if target_format == 'BGR565': +# return cv2.cvtColor(frame, cv2.COLOR_BGR2BGR565) +# elif target_format == 'RGB8888': +# return cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA) +# return frame + +class MultiDongle: + # Curently, only BGR565, RGB8888, YUYV, and RAW8 formats are supported + _FORMAT_MAPPING = { + 'BGR565': kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, + 'RGB8888': kp.ImageFormat.KP_IMAGE_FORMAT_RGBA8888, + 'YUYV': kp.ImageFormat.KP_IMAGE_FORMAT_YUYV, + 'RAW8': kp.ImageFormat.KP_IMAGE_FORMAT_RAW8, + # 'YCBCR422_CRY1CBY0': kp.ImageFormat.KP_IMAGE_FORMAT_YCBCR422_CRY1CBY0, + # 'YCBCR422_CBY1CRY0': kp.ImageFormat.KP_IMAGE_FORMAT_CBY1CRY0, + # 'YCBCR422_Y1CRY0CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CRY0CB, + # 'YCBCR422_Y1CBY0CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y1CBY0CR, + # 'YCBCR422_CRY0CBY1': kp.ImageFormat.KP_IMAGE_FORMAT_CRY0CBY1, + # 'YCBCR422_CBY0CRY1': kp.ImageFormat.KP_IMAGE_FORMAT_CBY0CRY1, + # 'YCBCR422_Y0CRY1CB': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CRY1CB, + # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR, + } + + def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False): + """ + Initialize the MultiDongle class. + :param port_id: List of USB port IDs for the same layer's devices. + :param scpu_fw_path: Path to the SCPU firmware file. + :param ncpu_fw_path: Path to the NCPU firmware file. + :param model_path: Path to the model file. + :param upload_fw: Flag to indicate whether to upload firmware. + """ + self.port_id = port_id + self.upload_fw = upload_fw + + # Check if the firmware is needed + if self.upload_fw: + self.scpu_fw_path = scpu_fw_path + self.ncpu_fw_path = ncpu_fw_path + + self.model_path = model_path + self.device_group = None + + # generic_inference_input_descriptor will be prepared in initialize + self.model_nef_descriptor = None + self.generic_inference_input_descriptor = None + # Queues for data + # Input queue for images to be sent + self._input_queue = queue.Queue() + # Output queue for received results + self._output_queue = queue.Queue() + + # Threading attributes + self._send_thread = None + self._receive_thread = None + self._stop_event = threading.Event() # Event to signal threads to stop + + self._inference_counter = 0 + + def initialize(self): + """ + Connect devices, upload firmware (if upload_fw is True), and upload model. + Must be called before start(). + """ + # Connect device and assign to self.device_group + try: + print('[Connect Device]') + self.device_group = kp.core.connect_devices(usb_port_ids=self.port_id) + print(' - Success') + except kp.ApiKPException as exception: + print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(self.port_id, str(exception))) + sys.exit(1) + + # setting timeout of the usb communication with the device + # print('[Set Device Timeout]') + # kp.core.set_timeout(device_group=self.device_group, milliseconds=5000) + # print(' - Success') + + if self.upload_fw: + try: + print('[Upload Firmware]') + kp.core.load_firmware_from_file(device_group=self.device_group, + scpu_fw_path=self.scpu_fw_path, + ncpu_fw_path=self.ncpu_fw_path) + print(' - Success') + except kp.ApiKPException as exception: + print('Error: upload firmware failed, error = \'{}\''.format(str(exception))) + sys.exit(1) + + # upload model to device + try: + print('[Upload Model]') + self.model_nef_descriptor = kp.core.load_model_from_file(device_group=self.device_group, + file_path=self.model_path) + print(' - Success') + except kp.ApiKPException as exception: + print('Error: upload model failed, error = \'{}\''.format(str(exception))) + sys.exit(1) + + # Extract model input dimensions automatically from model metadata + if self.model_nef_descriptor and self.model_nef_descriptor.models: + model = self.model_nef_descriptor.models[0] + if hasattr(model, 'input_nodes') and model.input_nodes: + input_node = model.input_nodes[0] + # From your JSON: "shape_npu": [1, 3, 128, 128] -> (width, height) + shape = input_node.tensor_shape_info.data.shape_npu + self.model_input_shape = (shape[3], shape[2]) # (width, height) + self.model_input_channels = shape[1] # 3 for RGB + print(f"Model input shape detected: {self.model_input_shape}, channels: {self.model_input_channels}") + else: + self.model_input_shape = (128, 128) # fallback + self.model_input_channels = 3 + print("Using default input shape (128, 128)") + else: + self.model_input_shape = (128, 128) + self.model_input_channels = 3 + print("Model info not available, using default shape") + + # Prepare generic inference input descriptor after model is loaded + if self.model_nef_descriptor: + self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( + model_id=self.model_nef_descriptor.models[0].id, + ) + else: + print("Warning: Could not get generic inference input descriptor from model.") + self.generic_inference_input_descriptor = None + + def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray: + """ + Preprocess frame for inference + """ + resized_frame = cv2.resize(frame, self.model_input_shape) + + if target_format == 'BGR565': + return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2BGR565) + elif target_format == 'RGB8888': + return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGBA) + elif target_format == 'YUYV': + return cv2.cvtColor(resized_frame, cv2.COLOR_BGR2YUV_YUYV) + else: + return resized_frame # RAW8 or other formats + + def get_latest_inference_result(self, timeout: float = 0.01) -> Tuple[float, str]: + """ + Get the latest inference result + Returns: (probability, result_string) or (None, None) if no result + """ + output_descriptor = self.get_output(timeout=timeout) + if not output_descriptor: + return None, None + + # Process the output descriptor + if hasattr(output_descriptor, 'header') and \ + hasattr(output_descriptor.header, 'num_output_node') and \ + hasattr(output_descriptor.header, 'inference_number'): + + inf_node_output_list = [] + retrieval_successful = True + + for node_idx in range(output_descriptor.header.num_output_node): + try: + inference_float_node_output = kp.inference.generic_inference_retrieve_float_node( + node_idx=node_idx, + generic_raw_result=output_descriptor, + channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW + ) + inf_node_output_list.append(inference_float_node_output.ndarray.copy()) + except kp.ApiKPException as e: + retrieval_successful = False + break + except Exception as e: + retrieval_successful = False + break + + if retrieval_successful and len(inf_node_output_list) > 0: + # Process output nodes + if output_descriptor.header.num_output_node == 1: + raw_output_array = inf_node_output_list[0].flatten() + else: + concatenated_outputs = [arr.flatten() for arr in inf_node_output_list] + raw_output_array = np.concatenate(concatenated_outputs) if concatenated_outputs else np.array([]) + + if raw_output_array.size > 0: + probability = postprocess(raw_output_array) + result_str = "Fire" if probability > 0.5 else "No Fire" + return probability, result_str + + return None, None + + # Modified _send_thread_func to get data from input queue + def _send_thread_func(self): + """Internal function run by the send thread, gets images from input queue.""" + print("Send thread started.") + while not self._stop_event.is_set(): + if self.generic_inference_input_descriptor is None: + # Wait for descriptor to be ready or stop + self._stop_event.wait(0.1) # Avoid busy waiting + continue + + try: + # Get image and format from the input queue + # Blocks until an item is available or stop event is set/timeout occurs + try: + # Use get with timeout or check stop event in a loop + # This pattern allows thread to check stop event while waiting on queue + item = self._input_queue.get(block=True, timeout=0.1) + # Check if this is our sentinel value + if item is None: + continue + + # Now safely unpack the tuple + image_data, image_format_enum = item + except queue.Empty: + # If queue is empty after timeout, check stop event and continue loop + continue + + # Configure and send the image + self._inference_counter += 1 # Increment counter for each image + self.generic_inference_input_descriptor.inference_number = self._inference_counter + self.generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage( + image=image_data, + image_format=image_format_enum, # Use the format from the queue + resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, + padding_mode=kp.PaddingMode.KP_PADDING_CORNER, + normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON + )] + + kp.inference.generic_image_inference_send(device_group=self.device_group, + generic_inference_input_descriptor=self.generic_inference_input_descriptor) + # print("Image sent.") # Optional: add log + # No need for sleep here usually, as queue.get is blocking + except kp.ApiKPException as exception: + print(f' - Error in send thread: inference send failed, error = {exception}') + self._stop_event.set() # Signal other thread to stop + except Exception as e: + print(f' - Unexpected error in send thread: {e}') + self._stop_event.set() + + print("Send thread stopped.") + + # _receive_thread_func remains the same + def _receive_thread_func(self): + """Internal function run by the receive thread, puts results into output queue.""" + print("Receive thread started.") + while not self._stop_event.is_set(): + try: + generic_inference_output_descriptor = kp.inference.generic_image_inference_receive(device_group=self.device_group) + self._output_queue.put(generic_inference_output_descriptor) + except kp.ApiKPException as exception: + if not self._stop_event.is_set(): # Avoid printing error if we are already stopping + print(f' - Error in receive thread: inference receive failed, error = {exception}') + self._stop_event.set() + except Exception as e: + print(f' - Unexpected error in receive thread: {e}') + self._stop_event.set() + + print("Receive thread stopped.") + + def start(self): + """ + Start the send and receive threads. + Must be called after initialize(). + """ + if self.device_group is None: + raise RuntimeError("MultiDongle not initialized. Call initialize() first.") + + if self._send_thread is None or not self._send_thread.is_alive(): + self._stop_event.clear() # Clear stop event for a new start + self._send_thread = threading.Thread(target=self._send_thread_func, daemon=True) + self._send_thread.start() + print("Send thread started.") + + if self._receive_thread is None or not self._receive_thread.is_alive(): + self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True) + self._receive_thread.start() + print("Receive thread started.") + + def stop(self): + """Improved stop method with better cleanup""" + if self._stop_event.is_set(): + return # Already stopping + + print("Stopping threads...") + self._stop_event.set() + + # Clear queues to unblock threads + while not self._input_queue.empty(): + try: + self._input_queue.get_nowait() + except queue.Empty: + break + + # Signal send thread to wake up + self._input_queue.put(None) + + # Join threads with timeout + for thread, name in [(self._send_thread, "Send"), (self._receive_thread, "Receive")]: + if thread and thread.is_alive(): + thread.join(timeout=2.0) + if thread.is_alive(): + print(f"Warning: {name} thread didn't stop cleanly") + + def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None): + """ + Put an image into the input queue with flexible preprocessing + """ + if isinstance(image, str): + image_data = cv2.imread(image) + if image_data is None: + raise FileNotFoundError(f"Image file not found at {image}") + if target_size: + image_data = cv2.resize(image_data, target_size) + elif isinstance(image, np.ndarray): + # Don't modify original array, make copy if needed + image_data = image.copy() if target_size is None else cv2.resize(image, target_size) + else: + raise ValueError("Image must be a file path (str) or a numpy array (ndarray).") + + if format in self._FORMAT_MAPPING: + image_format_enum = self._FORMAT_MAPPING[format] + else: + raise ValueError(f"Unsupported format: {format}") + + self._input_queue.put((image_data, image_format_enum)) + + def get_output(self, timeout: float = None): + """ + Get the next received data from the output queue. + This method is non-blocking by default unless a timeout is specified. + :param timeout: Time in seconds to wait for data. If None, it's non-blocking. + :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout. + """ + try: + return self._output_queue.get(block=timeout is not None, timeout=timeout) + except queue.Empty: + return None + + def __del__(self): + """Ensure resources are released when the object is garbage collected.""" + self.stop() + if self.device_group: + try: + kp.core.disconnect_devices(device_group=self.device_group) + print("Device group disconnected in destructor.") + except Exception as e: + print(f"Error disconnecting device group in destructor: {e}") + +def postprocess(raw_model_output: list) -> float: + """ + Post-processes the raw model output. + Assumes the model output is a list/array where the first element is the desired probability. + """ + if raw_model_output is not None and len(raw_model_output) > 0: + probability = raw_model_output[0] + return float(probability) + return 0.0 # Default or error value + +class WebcamInferenceRunner: + def __init__(self, multidongle: MultiDongle, image_format: str = 'BGR565'): + self.multidongle = multidongle + self.image_format = image_format + self.latest_probability = 0.0 + self.result_str = "No Fire" + + # Statistics tracking + self.processed_inference_count = 0 + self.inference_fps_start_time = None + self.display_fps_start_time = None + self.display_frame_counter = 0 + + def run(self, camera_id: int = 0): + cap = cv2.VideoCapture(camera_id) + if not cap.isOpened(): + raise RuntimeError("Cannot open webcam") + + try: + while True: + ret, frame = cap.read() + if not ret: + break + + # Track display FPS + if self.display_fps_start_time is None: + self.display_fps_start_time = time.time() + self.display_frame_counter += 1 + + # Preprocess and send frame + processed_frame = self.multidongle.preprocess_frame(frame, self.image_format) + self.multidongle.put_input(processed_frame, self.image_format) + + # Get inference result + prob, result = self.multidongle.get_latest_inference_result() + if prob is not None: + # Track inference FPS + if self.inference_fps_start_time is None: + self.inference_fps_start_time = time.time() + self.processed_inference_count += 1 + + self.latest_probability = prob + self.result_str = result + + # Display frame with results + self._display_results(frame) + + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + finally: + # self._print_statistics() + cap.release() + cv2.destroyAllWindows() + + def _display_results(self, frame): + display_frame = frame.copy() + text_color = (0, 255, 0) if "Fire" in self.result_str else (0, 0, 255) + + # Display inference result + cv2.putText(display_frame, f"{self.result_str} (Prob: {self.latest_probability:.2f})", + (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2) + + # Calculate and display inference FPS + if self.inference_fps_start_time and self.processed_inference_count > 0: + elapsed_time = time.time() - self.inference_fps_start_time + if elapsed_time > 0: + inference_fps = self.processed_inference_count / elapsed_time + cv2.putText(display_frame, f"Inference FPS: {inference_fps:.2f}", + (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) + + cv2.imshow('Fire Detection', display_frame) + + # def _print_statistics(self): + # """Print final statistics""" + # print(f"\n--- Summary ---") + # print(f"Total inferences processed: {self.processed_inference_count}") + + # if self.inference_fps_start_time and self.processed_inference_count > 0: + # elapsed = time.time() - self.inference_fps_start_time + # if elapsed > 0: + # avg_inference_fps = self.processed_inference_count / elapsed + # print(f"Average Inference FPS: {avg_inference_fps:.2f}") + + # if self.display_fps_start_time and self.display_frame_counter > 0: + # elapsed = time.time() - self.display_fps_start_time + # if elapsed > 0: + # avg_display_fps = self.display_frame_counter / elapsed + # print(f"Average Display FPS: {avg_display_fps:.2f}") + +if __name__ == "__main__": + PORT_IDS = [32] + SCPU_FW = r'C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_scpu.bin' + NCPU_FW = r'C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_ncpu.bin' + MODEL_PATH = r'C:/Users/mason/AppData/Local/Kneron_Academy/utils/yolov5s/yolov5s/kl520_20005_yolov5-noupsample_w640h640.nef' + + try: + # Initialize inference engine + print("Initializing MultiDongle...") + multidongle = MultiDongle(PORT_IDS, SCPU_FW, NCPU_FW, MODEL_PATH, upload_fw=True) + multidongle.initialize() + multidongle.start() + + # Run using the new runner class + print("Starting webcam inference...") + runner = WebcamInferenceRunner(multidongle, 'BGR565') + runner.run() + + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + finally: + if 'multidongle' in locals(): + multidongle.stop() \ No newline at end of file diff --git a/cluster4npu_ui/resources/{__init__.py} b/cluster4npu_ui/resources/{__init__.py} deleted file mode 100644 index e69de29..0000000 diff --git a/cluster4npu_ui/test.mflow b/cluster4npu_ui/test.mflow index a9174d4..acda708 100644 --- a/cluster4npu_ui/test.mflow +++ b/cluster4npu_ui/test.mflow @@ -3,7 +3,7 @@ "description": "", "nodes": [ { - "id": "0x1ba4f6792d0", + "id": "0x1cfb7f56610", "name": "Input Node", "type": "ExactInputNode", "pos": [ @@ -19,12 +19,12 @@ } }, { - "id": "0x1ba4f6948d0", + "id": "0x1cfb7f75b90", "name": "Model Node", "type": "ExactModelNode", "pos": [ - 245.18958624423732, - 292.00000000000006 + 246.43484658813134, + 294.4905206877882 ], "properties": { "dongle_series": "520", @@ -32,11 +32,11 @@ "model_path": "C:/Users/mason/AppData/Local/Kneron_Academy/utils/yolov5s/yolov5s/kl520_20005_yolov5-noupsample_w640h640.nef", "scpu_fw_path": "C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_scpu.bin", "ncpu_fw_path": "C:/Users/mason/Downloads/kneron_plus_v3.1.2/kneron_plus/res/firmware/KL520/fw_ncpu.bin", - "port_id": "6, 32" + "port_id": "6" } }, { - "id": "0x1ba4f696510", + "id": "0x1cfb7f77790", "name": "Output Node", "type": "ExactOutputNode", "pos": [ @@ -51,7 +51,7 @@ } }, { - "id": "0x1ba4f697810", + "id": "0x1cfb7f80a90", "name": "Preprocess Node", "type": "ExactPreprocessNode", "pos": [ @@ -67,21 +67,21 @@ ], "connections": [ { - "input_node": "0x1ba4f697810", + "input_node": "0x1cfb7f80a90", "input_port": "input", - "output_node": "0x1ba4f6792d0", + "output_node": "0x1cfb7f56610", "output_port": "output" }, { - "input_node": "0x1ba4f696510", + "input_node": "0x1cfb7f77790", "input_port": "input", - "output_node": "0x1ba4f6948d0", + "output_node": "0x1cfb7f75b90", "output_port": "output" }, { - "input_node": "0x1ba4f6948d0", + "input_node": "0x1cfb7f75b90", "input_port": "input", - "output_node": "0x1ba4f697810", + "output_node": "0x1cfb7f80a90", "output_port": "output" } ], diff --git a/cluster4npu_ui/ui/dialogs/deployment.py b/cluster4npu_ui/ui/dialogs/deployment.py index 7de2ecb..1525bc3 100644 --- a/cluster4npu_ui/ui/dialogs/deployment.py +++ b/cluster4npu_ui/ui/dialogs/deployment.py @@ -190,16 +190,34 @@ class DeploymentWorker(QThread): # Set up both GUI and terminal result callbacks def combined_result_callback(result_dict): - # Add current FPS from pipeline to result_dict - current_fps = pipeline.get_current_fps() - result_dict['current_pipeline_fps'] = current_fps - print(f"DEBUG: Pipeline FPS = {current_fps:.2f}") # Debug info + # Check if this is a valid result (not async/processing status) + stage_results = result_dict.get('stage_results', {}) + has_valid_result = False - # Send to GUI terminal and results display - terminal_output = self._format_terminal_results(result_dict) - self.terminal_output.emit(terminal_output) - # Emit for GUI - self.result_updated.emit(result_dict) + for stage_id, result in stage_results.items(): + if isinstance(result, dict): + status = result.get('status', '') + if status not in ['async', 'processing']: + has_valid_result = True + break + elif isinstance(result, tuple) and len(result) == 2: + prob, result_str = result + if prob is not None and result_str not in ['Processing']: + has_valid_result = True + break + + # Only display and process if we have valid results + if has_valid_result: + # Add current FPS from pipeline to result_dict + current_fps = pipeline.get_current_fps() + result_dict['current_pipeline_fps'] = current_fps + print(f"DEBUG: Pipeline FPS = {current_fps:.2f}") # Debug info + + # Send to GUI terminal and results display + terminal_output = self._format_terminal_results(result_dict) + self.terminal_output.emit(terminal_output) + # Emit for GUI + self.result_updated.emit(result_dict) self.orchestrator.set_result_callback(combined_result_callback) diff --git a/cluster4npu_ui/ui/{__init__.py} b/cluster4npu_ui/ui/{__init__.py} deleted file mode 100644 index e69de29..0000000