From 67a1031009b5db4edb35423beb632845ce50b054 Mon Sep 17 00:00:00 2001 From: Masonmason Date: Thu, 24 Jul 2025 10:52:58 +0800 Subject: [PATCH] fix: Remove blocking while loop that prevented multi-dongle scaling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The key issue was in PipelineStage._process_data() (InferencePipeline.py), where a while loop blocked for up to 5 seconds waiting for inference results. This completely serialized processing and prevented multiple dongles from working in parallel. Changes: - Replace blocking while loop with single non-blocking call - Use timeout=0.001 for get_latest_inference_result (async pattern) - Use block=False for put_input to prevent queue blocking - Increase worker queue timeout from 0.1s to 1.0s - Handle async processing status properly This matches the pattern from the standalone code that achieved 4.xx FPS (1 dongle) vs 9.xx FPS (2 dongles) scaling. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../core/functions/InferencePipeline.py | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py index 760f672..13ed863 100644 --- a/cluster4npu_ui/core/functions/InferencePipeline.py +++ b/cluster4npu_ui/core/functions/InferencePipeline.py @@ -126,10 +126,12 @@ class PipelineStage: try: # Get input data try: - pipeline_data = self.input_queue.get(timeout=0.1) + pipeline_data = self.input_queue.get(timeout=1.0) if pipeline_data is None: # Sentinel value continue except queue.Empty: + if self._stop_event.is_set(): + break continue start_time = time.time() @@ -208,46 +210,45 @@ class PipelineStage: # Step 3: MultiDongle inference if isinstance(processed_data, np.ndarray): print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}") - self.multidongle.put_input(processed_data, 'BGR565') + 
self.multidongle.put_input(processed_data, 'BGR565', block=False) - # Get inference result with timeout + # Get inference result (non-blocking, async pattern like standalone code) + result = self.multidongle.get_latest_inference_result(timeout=0.001) + + # Process result if available inference_result = {} - timeout_start = time.time() - while time.time() - timeout_start < 5.0: # 5 second timeout - result = self.multidongle.get_latest_inference_result(timeout=0.1) - print(f"[Stage {self.stage_id}] Got result from MultiDongle: {result}") - - # Check if result is valid (not None, not (None, None)) - if result is not None: - if isinstance(result, tuple) and len(result) == 2: - # Handle tuple results like (probability, result_string) - prob, result_str = result - if prob is not None and result_str is not None: - print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}") - inference_result = result - break - else: - print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}") - elif isinstance(result, dict): - if result: # Non-empty dict - print(f"[Stage {self.stage_id}] Valid dict result: {result}") - inference_result = result - break - else: - print(f"[Stage {self.stage_id}] Empty dict result") - else: - print(f"[Stage {self.stage_id}] Other result type: {type(result)}") + if result is not None: + if isinstance(result, tuple) and len(result) == 2: + # Handle tuple results like (probability, result_string) + prob, result_str = result + if prob is not None and result_str is not None: + print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}") inference_result = result - break + else: + print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}") + elif isinstance(result, dict): + if result: # Non-empty dict + print(f"[Stage {self.stage_id}] Valid dict result: {result}") + inference_result = result + else: + print(f"[Stage {self.stage_id}] Empty dict result") else: - 
print(f"[Stage {self.stage_id}] No result yet, waiting...") - time.sleep(0.01) + print(f"[Stage {self.stage_id}] Other result type: {type(result)}") + inference_result = result + else: + # No result available - this is normal in async processing + print(f"[Stage {self.stage_id}] No result available (async processing)") + inference_result = {"status": "processing"} - # Check if inference_result is empty (handle both dict and tuple types) + # Handle result status (async processing doesn't need timeout warnings) if (inference_result is None or - (isinstance(inference_result, dict) and not inference_result) or - (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None)))): - print(f"[Stage {self.stage_id}] Warning: No inference result received after 5 second timeout") + (isinstance(inference_result, dict) and inference_result.get("status") == "processing")): + # This is normal in async processing - use previous result or default + print(f"[Stage {self.stage_id}] Using async processing mode") + inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'} + elif (isinstance(inference_result, dict) and not inference_result) or \ + (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None))): + print(f"[Stage {self.stage_id}] No valid result available") inference_result = {'probability': 0.0, 'result': 'No Result'} else: print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")