diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py index 760f672..13ed863 100644 --- a/cluster4npu_ui/core/functions/InferencePipeline.py +++ b/cluster4npu_ui/core/functions/InferencePipeline.py @@ -126,10 +126,12 @@ class PipelineStage: try: # Get input data try: - pipeline_data = self.input_queue.get(timeout=0.1) + pipeline_data = self.input_queue.get(timeout=1.0) if pipeline_data is None: # Sentinel value continue except queue.Empty: + if self._stop_event.is_set(): + break continue start_time = time.time() @@ -208,46 +210,45 @@ class PipelineStage: # Step 3: MultiDongle inference if isinstance(processed_data, np.ndarray): print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}") - self.multidongle.put_input(processed_data, 'BGR565') + self.multidongle.put_input(processed_data, 'BGR565', block=False) - # Get inference result with timeout + # Get inference result (non-blocking, async pattern like standalone code) + result = self.multidongle.get_latest_inference_result(timeout=0.001) + + # Process result if available inference_result = {} - timeout_start = time.time() - while time.time() - timeout_start < 5.0: # 5 second timeout - result = self.multidongle.get_latest_inference_result(timeout=0.1) - print(f"[Stage {self.stage_id}] Got result from MultiDongle: {result}") - - # Check if result is valid (not None, not (None, None)) - if result is not None: - if isinstance(result, tuple) and len(result) == 2: - # Handle tuple results like (probability, result_string) - prob, result_str = result - if prob is not None and result_str is not None: - print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}") - inference_result = result - break - else: - print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}") - elif isinstance(result, dict): - if result: # Non-empty dict - print(f"[Stage {self.stage_id}] Valid dict result: {result}") - inference_result = result - break - else: - print(f"[Stage {self.stage_id}] Empty dict result") - else: - print(f"[Stage {self.stage_id}] Other result type: {type(result)}") + if result is not None: + if isinstance(result, tuple) and len(result) == 2: + # Handle tuple results like (probability, result_string) + prob, result_str = result + if prob is not None and result_str is not None: + print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}") inference_result = result - break + else: + print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}") + elif isinstance(result, dict): + if result: # Non-empty dict + print(f"[Stage {self.stage_id}] Valid dict result: {result}") + inference_result = result + else: + print(f"[Stage {self.stage_id}] Empty dict result") else: - print(f"[Stage {self.stage_id}] No result yet, waiting...") - time.sleep(0.01) + print(f"[Stage {self.stage_id}] Other result type: {type(result)}") + inference_result = result + else: + # No result available - this is normal in async processing + print(f"[Stage {self.stage_id}] No result available (async processing)") + inference_result = {"status": "processing"} - # Check if inference_result is empty (handle both dict and tuple types) + # Handle result status (async processing doesn't need timeout warnings) if (inference_result is None or - (isinstance(inference_result, dict) and not inference_result) or - (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None)))): - print(f"[Stage {self.stage_id}] Warning: No inference result received after 5 second timeout") + (isinstance(inference_result, dict) and inference_result.get("status") == "processing")): + # This is normal in async processing - use previous result or default + print(f"[Stage {self.stage_id}] Using async processing mode") + inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'} + elif (isinstance(inference_result, dict) and not inference_result) or \ + (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None))): + print(f"[Stage {self.stage_id}] No valid result available") inference_result = {'probability': 0.0, 'result': 'No Result'} else: print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")