From 273ae71846c5a771e3b009c79b8fec02f3b5bd81 Mon Sep 17 00:00:00 2001
From: Masonmason
Date: Thu, 24 Jul 2025 11:12:42 +0800
Subject: [PATCH] fix: Correct FPS calculation and reduce log spam
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Key fixes:

1. FPS Calculation: Only count actual inference results, not frame processing
   - Previous: counted every frame processed (~90 FPS, incorrect)
   - Now: only counts when actual inference results are received (~9 FPS, correct)
   - Return None from _process_data when no inference result available
   - Skip FPS counting for iterations without real results

2. Log Reduction: Significantly reduced verbose logging
   - Removed excessive debug prints for preprocessing steps
   - Removed "No inference result" spam messages
   - Only log actual successful inference results

3. Async Processing: Maintain proper async pattern
   - Still use non-blocking get_latest_inference_result(timeout=0.001)
   - Still use non-blocking put_input(block=False)
   - But only count real inference throughput for FPS

This should now match standalone code behavior: ~4 FPS (1 dongle) vs ~9 FPS (2 dongles)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude
---
 .../core/functions/InferencePipeline.py      | 89 ++++++++-----------
 1 file changed, 37 insertions(+), 52 deletions(-)

diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py
index 13ed863..9869c0f 100644
--- a/cluster4npu_ui/core/functions/InferencePipeline.py
+++ b/cluster4npu_ui/core/functions/InferencePipeline.py
@@ -139,24 +139,29 @@ class PipelineStage:
             # Process data through this stage
             processed_data = self._process_data(pipeline_data)

-            # Record processing time
-            processing_time = time.time() - start_time
-            self.processing_times.append(processing_time)
-            if len(self.processing_times) > 1000:  # Keep only recent times
-                self.processing_times = self.processing_times[-500:]
-
-            self.processed_count += 1
-
-            # Put result to output queue
-            try:
-                self.output_queue.put(processed_data, block=False)
-            except queue.Full:
-                # Drop oldest and add new
+            # Only record processing and increment counter if we got a real result
+            if processed_data is not None:
+                # Record processing time
+                processing_time = time.time() - start_time
+                self.processing_times.append(processing_time)
+                if len(self.processing_times) > 1000:  # Keep only recent times
+                    self.processing_times = self.processing_times[-500:]
+
+                self.processed_count += 1
+
+                # Put result to output queue
                 try:
-                    self.output_queue.get_nowait()
                     self.output_queue.put(processed_data, block=False)
-                except queue.Empty:
-                    pass
+                except queue.Full:
+                    # Drop oldest and add new
+                    try:
+                        self.output_queue.get_nowait()
+                        self.output_queue.put(processed_data, block=False)
+                    except queue.Empty:
+                        pass
+            else:
+                # No inference result - don't count this iteration
+                pass

         except Exception as e:
             self.error_count += 1
@@ -189,9 +194,7 @@
         processed_data = None
         if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3:
             # Always use MultiDongle's preprocess_frame to ensure correct format
-            print(f"[Stage {self.stage_id}] Preprocessing frame for MultiDongle...")
             processed_data = self.multidongle.preprocess_frame(current_data, 'BGR565')
-            print(f"[Stage {self.stage_id}] After MultiDongle preprocess: shape={processed_data.shape}, dtype={processed_data.dtype}")

             # Validate processed data
             if processed_data is None:
@@ -201,57 +204,39 @@
         elif isinstance(current_data, dict) and 'raw_output' in current_data:
             # This is result from previous stage, not suitable for direct inference
-            print(f"[Stage {self.stage_id}] Warning: Received processed result instead of image data")
             processed_data = current_data
         else:
-            print(f"[Stage {self.stage_id}] Warning: Unexpected data type: {type(current_data)}")
             processed_data = current_data

         # Step 3: MultiDongle inference
         if isinstance(processed_data, np.ndarray):
-            print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}")
             self.multidongle.put_input(processed_data, 'BGR565', block=False)

         # Get inference result (non-blocking, async pattern like standalone code)
         result = self.multidongle.get_latest_inference_result(timeout=0.001)

-        # Process result if available
-        inference_result = {}
+        # Process result if available - only count actual inference results for FPS
+        inference_result = None
+        has_real_result = False
+
         if result is not None:
             if isinstance(result, tuple) and len(result) == 2:
                 # Handle tuple results like (probability, result_string)
                 prob, result_str = result
                 if prob is not None and result_str is not None:
-                    print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}")
+                    print(f"[Stage {self.stage_id}] ✅ Valid inference result: prob={prob}, result={result_str}")
                     inference_result = result
-                else:
-                    print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}")
-            elif isinstance(result, dict):
-                if result:  # Non-empty dict
-                    print(f"[Stage {self.stage_id}] Valid dict result: {result}")
-                    inference_result = result
-                else:
-                    print(f"[Stage {self.stage_id}] Empty dict result")
-            else:
-                print(f"[Stage {self.stage_id}] Other result type: {type(result)}")
-                inference_result = result
-        else:
-            # No result available - this is normal in async processing
-            print(f"[Stage {self.stage_id}] No result available (async processing)")
-            inference_result = {"status": "processing"}
-
-        # Handle result status (async processing doesn't need timeout warnings)
-        if (inference_result is None or
-            (isinstance(inference_result, dict) and inference_result.get("status") == "processing")):
-            # This is normal in async processing - use previous result or default
-            print(f"[Stage {self.stage_id}] Using async processing mode")
-            inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
-        elif (isinstance(inference_result, dict) and not inference_result) or \
-             (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None))):
-            print(f"[Stage {self.stage_id}] No valid result available")
-            inference_result = {'probability': 0.0, 'result': 'No Result'}
-        else:
-            print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")
+                    has_real_result = True
+            elif isinstance(result, dict) and result:  # Non-empty dict
+                print(f"[Stage {self.stage_id}] ✅ Valid dict result: {result}")
+                inference_result = result
+                has_real_result = True
+
+        # If no valid result, don't process this iteration for FPS counting
+        if not has_real_result:
+            # Skip this iteration - no actual inference result to process
+            # (Don't spam logs - this is normal in async processing)
+            return None  # Return None to indicate no processing occurred

         # Step 3: Output postprocessing (inter-stage)
         processed_result = inference_result
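
For illustration, a minimal self-contained sketch of the async FPS-counting
pattern this patch adopts: submit each frame without blocking, poll for a
result with a tiny timeout, and advance the throughput counter only when a
real result comes back. FakeDongle and run_stage are hypothetical stand-ins,
not project code; only the put_input / get_latest_inference_result call
shapes are taken from the diff above.

import queue
import time


class FakeDongle:
    """Stand-in for MultiDongle: accepts frames, answers every 10th one."""

    def __init__(self):
        self._results = queue.Queue()
        self._frames_seen = 0

    def put_input(self, frame, fmt, block=False):
        # Non-blocking submit; a real device would run inference asynchronously.
        self._frames_seen += 1
        if self._frames_seen % 10 == 0:
            self._results.put((0.97, f"class_a@{self._frames_seen}"))

    def get_latest_inference_result(self, timeout=0.001):
        # Returns None when nothing is ready - normal in async processing.
        try:
            return self._results.get(timeout=timeout)
        except queue.Empty:
            return None


def run_stage(dongle, frames):
    """Feed frames through the dongle; count only real inference results."""
    real_results = 0
    start = time.time()
    for frame in frames:
        time.sleep(0.005)  # simulate ~200 FPS capture pacing
        dongle.put_input(frame, "BGR565", block=False)
        result = dongle.get_latest_inference_result(timeout=0.001)
        if result is None:
            continue  # no result yet: skip FPS accounting (the core of the fix)
        prob, label = result
        if prob is None or label is None:
            continue  # malformed result: also not counted
        real_results += 1
    elapsed = time.time() - start
    return real_results / elapsed if elapsed > 0 else 0.0


if __name__ == "__main__":
    fps = run_stage(FakeDongle(), frames=[object()] * 100)
    print(f"inference FPS (real results only): {fps:.1f}")

Counting every loop iteration here would report the ~200 FPS frame rate; the
guard on a real (non-None, well-formed) result reports inference throughput
instead, which is the distinction the commit message describes.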