fix: Remove blocking while loop that prevented multi-dongle scaling
The key issue was in InferencePipeline._process_data() where a 5-second while loop was blocking waiting for inference results. This completely serialized processing and prevented multiple dongles from working in parallel. Changes: - Replace blocking while loop with single non-blocking call - Use timeout=0.001 for get_latest_inference_result (async pattern) - Use block=False for put_input to prevent queue blocking - Increase worker queue timeout from 0.1s to 1.0s - Handle async processing status properly This matches the pattern from the standalone code that achieved 4.xx FPS (1 dongle) vs 9.xx FPS (2 dongles) scaling. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
bc92761a83
commit
67a1031009
@ -126,10 +126,12 @@ class PipelineStage:
|
|||||||
try:
|
try:
|
||||||
# Get input data
|
# Get input data
|
||||||
try:
|
try:
|
||||||
pipeline_data = self.input_queue.get(timeout=0.1)
|
pipeline_data = self.input_queue.get(timeout=1.0)
|
||||||
if pipeline_data is None: # Sentinel value
|
if pipeline_data is None: # Sentinel value
|
||||||
continue
|
continue
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
|
if self._stop_event.is_set():
|
||||||
|
break
|
||||||
continue
|
continue
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
@ -208,46 +210,45 @@ class PipelineStage:
|
|||||||
# Step 3: MultiDongle inference
|
# Step 3: MultiDongle inference
|
||||||
if isinstance(processed_data, np.ndarray):
|
if isinstance(processed_data, np.ndarray):
|
||||||
print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}")
|
print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}")
|
||||||
self.multidongle.put_input(processed_data, 'BGR565')
|
self.multidongle.put_input(processed_data, 'BGR565', block=False)
|
||||||
|
|
||||||
# Get inference result with timeout
|
# Get inference result (non-blocking, async pattern like standalone code)
|
||||||
|
result = self.multidongle.get_latest_inference_result(timeout=0.001)
|
||||||
|
|
||||||
|
# Process result if available
|
||||||
inference_result = {}
|
inference_result = {}
|
||||||
timeout_start = time.time()
|
if result is not None:
|
||||||
while time.time() - timeout_start < 5.0: # 5 second timeout
|
if isinstance(result, tuple) and len(result) == 2:
|
||||||
result = self.multidongle.get_latest_inference_result(timeout=0.1)
|
# Handle tuple results like (probability, result_string)
|
||||||
print(f"[Stage {self.stage_id}] Got result from MultiDongle: {result}")
|
prob, result_str = result
|
||||||
|
if prob is not None and result_str is not None:
|
||||||
# Check if result is valid (not None, not (None, None))
|
print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}")
|
||||||
if result is not None:
|
|
||||||
if isinstance(result, tuple) and len(result) == 2:
|
|
||||||
# Handle tuple results like (probability, result_string)
|
|
||||||
prob, result_str = result
|
|
||||||
if prob is not None and result_str is not None:
|
|
||||||
print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}")
|
|
||||||
inference_result = result
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}")
|
|
||||||
elif isinstance(result, dict):
|
|
||||||
if result: # Non-empty dict
|
|
||||||
print(f"[Stage {self.stage_id}] Valid dict result: {result}")
|
|
||||||
inference_result = result
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print(f"[Stage {self.stage_id}] Empty dict result")
|
|
||||||
else:
|
|
||||||
print(f"[Stage {self.stage_id}] Other result type: {type(result)}")
|
|
||||||
inference_result = result
|
inference_result = result
|
||||||
break
|
else:
|
||||||
|
print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}")
|
||||||
|
elif isinstance(result, dict):
|
||||||
|
if result: # Non-empty dict
|
||||||
|
print(f"[Stage {self.stage_id}] Valid dict result: {result}")
|
||||||
|
inference_result = result
|
||||||
|
else:
|
||||||
|
print(f"[Stage {self.stage_id}] Empty dict result")
|
||||||
else:
|
else:
|
||||||
print(f"[Stage {self.stage_id}] No result yet, waiting...")
|
print(f"[Stage {self.stage_id}] Other result type: {type(result)}")
|
||||||
time.sleep(0.01)
|
inference_result = result
|
||||||
|
else:
|
||||||
|
# No result available - this is normal in async processing
|
||||||
|
print(f"[Stage {self.stage_id}] No result available (async processing)")
|
||||||
|
inference_result = {"status": "processing"}
|
||||||
|
|
||||||
# Check if inference_result is empty (handle both dict and tuple types)
|
# Handle result status (async processing doesn't need timeout warnings)
|
||||||
if (inference_result is None or
|
if (inference_result is None or
|
||||||
(isinstance(inference_result, dict) and not inference_result) or
|
(isinstance(inference_result, dict) and inference_result.get("status") == "processing")):
|
||||||
(isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None)))):
|
# This is normal in async processing - use previous result or default
|
||||||
print(f"[Stage {self.stage_id}] Warning: No inference result received after 5 second timeout")
|
print(f"[Stage {self.stage_id}] Using async processing mode")
|
||||||
|
inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
|
||||||
|
elif (isinstance(inference_result, dict) and not inference_result) or \
|
||||||
|
(isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None))):
|
||||||
|
print(f"[Stage {self.stage_id}] No valid result available")
|
||||||
inference_result = {'probability': 0.0, 'result': 'No Result'}
|
inference_result = {'probability': 0.0, 'result': 'No Result'}
|
||||||
else:
|
else:
|
||||||
print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")
|
print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user