diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py
index 13ed863..d1b4b29 100644
--- a/cluster4npu_ui/core/functions/InferencePipeline.py
+++ b/cluster4npu_ui/core/functions/InferencePipeline.py
@@ -139,13 +139,15 @@ class PipelineStage:
                 # Process data through this stage
                 processed_data = self._process_data(pipeline_data)
 
-                # Record processing time
-                processing_time = time.time() - start_time
-                self.processing_times.append(processing_time)
-                if len(self.processing_times) > 1000:  # Keep only recent times
-                    self.processing_times = self.processing_times[-500:]
-
-                self.processed_count += 1
+                # Only count and record timing for actual inference results
+                if processed_data and self._has_inference_result(processed_data):
+                    # Record processing time
+                    processing_time = time.time() - start_time
+                    self.processing_times.append(processing_time)
+                    if len(self.processing_times) > 1000:  # Keep only recent times
+                        self.processing_times = self.processing_times[-500:]
+
+                    self.processed_count += 1
 
                 # Put result to output queue
                 try:
@@ -165,33 +167,51 @@ class PipelineStage:
         print(f"[Stage {self.stage_id}] Worker loop stopped")
 
+    def _has_inference_result(self, processed_data) -> bool:
+        """Check if processed_data contains a valid inference result"""
+        if not processed_data:
+            return False
+
+        try:
+            # Check if it's a PipelineData with stage results
+            if hasattr(processed_data, 'stage_results') and processed_data.stage_results:
+                stage_result = processed_data.stage_results.get(self.stage_id)
+                if stage_result:
+                    # Check for tuple result (prob, result_str)
+                    if isinstance(stage_result, tuple) and len(stage_result) == 2:
+                        prob, result_str = stage_result
+                        return prob is not None and result_str is not None
+                    # Check for dict result with actual inference data
+                    elif isinstance(stage_result, dict):
+                        return (stage_result.get("status") != "processing" and
+                                stage_result.get("status") != "async" and
+                                bool(stage_result))
+                    else:
+                        return stage_result is not None
+        except Exception:
+            pass
+
+        return False
+
     def _process_data(self, pipeline_data: PipelineData) -> PipelineData:
         """Process data through this stage"""
         try:
             current_data = pipeline_data.data
 
-            # Debug: Print data info
-            if isinstance(current_data, np.ndarray):
-                print(f"[Stage {self.stage_id}] Input data: shape={current_data.shape}, dtype={current_data.dtype}")
-
             # Step 1: Input preprocessing (inter-stage)
             if self.input_preprocessor:
                 if isinstance(current_data, np.ndarray):
-                    print(f"[Stage {self.stage_id}] Applying input preprocessor...")
                     current_data = self.input_preprocessor.process(
                         current_data,
                         self.multidongle.model_input_shape,
                         'BGR565'  # Default format
                     )
-                    print(f"[Stage {self.stage_id}] After input preprocess: shape={current_data.shape}, dtype={current_data.dtype}")
 
             # Step 2: Always preprocess image data for MultiDongle
             processed_data = None
             if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3:
                 # Always use MultiDongle's preprocess_frame to ensure correct format
-                print(f"[Stage {self.stage_id}] Preprocessing frame for MultiDongle...")
                 processed_data = self.multidongle.preprocess_frame(current_data, 'BGR565')
-                print(f"[Stage {self.stage_id}] After MultiDongle preprocess: shape={processed_data.shape}, dtype={processed_data.dtype}")
 
                 # Validate processed data
                 if processed_data is None:
@@ -201,15 +221,12 @@ class PipelineStage:
 
             elif isinstance(current_data, dict) and 'raw_output' in current_data:
                 # This is result from previous stage, not suitable for direct inference
-                print(f"[Stage {self.stage_id}] Warning: Received processed result instead of image data")
                 processed_data = current_data
             else:
-                print(f"[Stage {self.stage_id}] Warning: Unexpected data type: {type(current_data)}")
                 processed_data = current_data
 
             # Step 3: MultiDongle inference
             if isinstance(processed_data, np.ndarray):
-                print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}")
                 self.multidongle.put_input(processed_data, 'BGR565', block=False)
 
                 # Get inference result (non-blocking, async pattern like standalone code)
@@ -222,36 +239,17 @@ class PipelineStage:
                     # Handle tuple results like (probability, result_string)
                     prob, result_str = result
                     if prob is not None and result_str is not None:
-                        print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}")
+                        print(f"[Stage {self.stage_id}] ✅ Inference result: prob={prob:.3f}, result={result_str}")
                         inference_result = result
-                    else:
-                        print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}")
-                elif isinstance(result, dict):
-                    if result:  # Non-empty dict
-                        print(f"[Stage {self.stage_id}] Valid dict result: {result}")
-                        inference_result = result
-                    else:
-                        print(f"[Stage {self.stage_id}] Empty dict result")
-                else:
-                    print(f"[Stage {self.stage_id}] Other result type: {type(result)}")
+                elif isinstance(result, dict) and result:  # Non-empty dict
+                    print(f"[Stage {self.stage_id}] ✅ Dict result: {result}")
+                    inference_result = result
+                else:
                     inference_result = result
-            else:
-                # No result available - this is normal in async processing
-                print(f"[Stage {self.stage_id}] No result available (async processing)")
-                inference_result = {"status": "processing"}
 
-            # Handle result status (async processing doesn't need timeout warnings)
-            if (inference_result is None or
-                (isinstance(inference_result, dict) and inference_result.get("status") == "processing")):
-                # This is normal in async processing - use previous result or default
-                print(f"[Stage {self.stage_id}] Using async processing mode")
+            # If no result, use default (don't spam logs)
+            if not inference_result:
                 inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
-            elif (isinstance(inference_result, dict) and not inference_result) or \
-                 (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None))):
-                print(f"[Stage {self.stage_id}] No valid result available")
-                inference_result = {'probability': 0.0, 'result': 'No Result'}
-            else:
-                print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")
 
             # Step 3: Output postprocessing (inter-stage)
             processed_result = inference_result