fix: Correct FPS calculation to count actual inference results only
Key changes:

1. FPS calculation: only count when the stage receives an actual inference result
   - Add a _has_inference_result() method to check for valid results
   - Only increment processed_count when a real inference result is available
   - This measures "inferences per second", not "frames per second"
2. Reduced log spam: remove the excessive preprocessing debug logs
   - Remove the shape/dtype logs emitted for every frame
   - Only log successful inference results
   - Keep essential error logs
3. Maintain the async pattern: keep processing non-blocking
   - Still use timeout=0.001 for get_latest_inference_result
   - Still use block=False for put_input
   - No blocking while loops

Expected result: ~4 FPS (1 dongle) vs ~9 FPS (2 dongles), matching the standalone code's behavior.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
parent 67a1031009
commit 80275bc774
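For context on the pattern the commit message describes, here is a minimal sketch of the intended measurement loop. It assumes a MultiDongle-like object whose `put_input` and `get_latest_inference_result` calls behave as stated above; `measure_inference_rate` and its arguments are hypothetical names for illustration, not part of this commit.

```python
import time

def measure_inference_rate(multidongle, frames):
    """Feed frames without blocking and count only real inference results."""
    inference_count = 0
    start = time.time()
    for frame in frames:
        try:
            # Non-blocking submit, as in the commit: never stall on a full queue
            multidongle.put_input(frame, 'BGR565', block=False)
        except Exception:
            pass  # input queue full; drop this frame and keep going
        # Near-zero timeout poll, matching the async pattern kept by this commit
        result = multidongle.get_latest_inference_result(timeout=0.001)
        # Count only genuine results, so the rate is inferences/sec, not frames/sec
        if result is not None and result != (None, None):
            inference_count += 1
    elapsed = time.time() - start
    return inference_count / elapsed if elapsed > 0 else 0.0
```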
@@ -139,13 +139,15 @@ class PipelineStage:
                 # Process data through this stage
                 processed_data = self._process_data(pipeline_data)
 
-                # Record processing time
-                processing_time = time.time() - start_time
-                self.processing_times.append(processing_time)
-                if len(self.processing_times) > 1000:  # Keep only recent times
-                    self.processing_times = self.processing_times[-500:]
+                # Only count and record timing for actual inference results
+                if processed_data and self._has_inference_result(processed_data):
+                    # Record processing time
+                    processing_time = time.time() - start_time
+                    self.processing_times.append(processing_time)
+                    if len(self.processing_times) > 1000:  # Keep only recent times
+                        self.processing_times = self.processing_times[-500:]
 
-                self.processed_count += 1
+                    self.processed_count += 1
 
                 # Put result to output queue
                 try:
@@ -165,33 +167,51 @@ class PipelineStage:
 
         print(f"[Stage {self.stage_id}] Worker loop stopped")
 
+    def _has_inference_result(self, processed_data) -> bool:
+        """Check if processed_data contains a valid inference result"""
+        if not processed_data:
+            return False
+
+        try:
+            # Check if it's a PipelineData with stage results
+            if hasattr(processed_data, 'stage_results') and processed_data.stage_results:
+                stage_result = processed_data.stage_results.get(self.stage_id)
+                if stage_result:
+                    # Check for tuple result (prob, result_str)
+                    if isinstance(stage_result, tuple) and len(stage_result) == 2:
+                        prob, result_str = stage_result
+                        return prob is not None and result_str is not None
+                    # Check for dict result with actual inference data
+                    elif isinstance(stage_result, dict):
+                        return (stage_result.get("status") != "processing" and
+                                stage_result.get("status") != "async" and
+                                stage_result)
+                    else:
+                        return stage_result is not None
+        except Exception:
+            pass
+
+        return False
+
     def _process_data(self, pipeline_data: PipelineData) -> PipelineData:
         """Process data through this stage"""
         try:
             current_data = pipeline_data.data
 
-            # Debug: Print data info
-            if isinstance(current_data, np.ndarray):
-                print(f"[Stage {self.stage_id}] Input data: shape={current_data.shape}, dtype={current_data.dtype}")
-
             # Step 1: Input preprocessing (inter-stage)
             if self.input_preprocessor:
                 if isinstance(current_data, np.ndarray):
-                    print(f"[Stage {self.stage_id}] Applying input preprocessor...")
                     current_data = self.input_preprocessor.process(
                         current_data,
                         self.multidongle.model_input_shape,
                         'BGR565'  # Default format
                     )
-                    print(f"[Stage {self.stage_id}] After input preprocess: shape={current_data.shape}, dtype={current_data.dtype}")
 
             # Step 2: Always preprocess image data for MultiDongle
             processed_data = None
             if isinstance(current_data, np.ndarray) and len(current_data.shape) == 3:
                 # Always use MultiDongle's preprocess_frame to ensure correct format
-                print(f"[Stage {self.stage_id}] Preprocessing frame for MultiDongle...")
                 processed_data = self.multidongle.preprocess_frame(current_data, 'BGR565')
-                print(f"[Stage {self.stage_id}] After MultiDongle preprocess: shape={processed_data.shape}, dtype={processed_data.dtype}")
 
             # Validate processed data
             if processed_data is None:
@@ -201,15 +221,12 @@ class PipelineStage:
 
             elif isinstance(current_data, dict) and 'raw_output' in current_data:
                 # This is result from previous stage, not suitable for direct inference
-                print(f"[Stage {self.stage_id}] Warning: Received processed result instead of image data")
                 processed_data = current_data
             else:
-                print(f"[Stage {self.stage_id}] Warning: Unexpected data type: {type(current_data)}")
                 processed_data = current_data
 
             # Step 3: MultiDongle inference
             if isinstance(processed_data, np.ndarray):
-                print(f"[Stage {self.stage_id}] Sending to MultiDongle: shape={processed_data.shape}, dtype={processed_data.dtype}")
                 self.multidongle.put_input(processed_data, 'BGR565', block=False)
 
                 # Get inference result (non-blocking, async pattern like standalone code)
@@ -222,36 +239,17 @@ class PipelineStage:
                     # Handle tuple results like (probability, result_string)
                     prob, result_str = result
                     if prob is not None and result_str is not None:
-                        print(f"[Stage {self.stage_id}] Valid result: prob={prob}, result={result_str}")
+                        print(f"[Stage {self.stage_id}] ✅ Inference result: prob={prob:.3f}, result={result_str}")
                         inference_result = result
-                    else:
-                        print(f"[Stage {self.stage_id}] Invalid tuple result: prob={prob}, result={result_str}")
-                elif isinstance(result, dict):
-                    if result:  # Non-empty dict
-                        print(f"[Stage {self.stage_id}] Valid dict result: {result}")
-                        inference_result = result
-                    else:
-                        print(f"[Stage {self.stage_id}] Empty dict result")
+                elif isinstance(result, dict) and result:  # Non-empty dict
+                    print(f"[Stage {self.stage_id}] ✅ Dict result: {result}")
+                    inference_result = result
                 else:
-                    print(f"[Stage {self.stage_id}] Other result type: {type(result)}")
                     inference_result = result
-            else:
-                # No result available - this is normal in async processing
-                print(f"[Stage {self.stage_id}] No result available (async processing)")
-                inference_result = {"status": "processing"}
 
-            # Handle result status (async processing doesn't need timeout warnings)
-            if (inference_result is None or
-                (isinstance(inference_result, dict) and inference_result.get("status") == "processing")):
-                # This is normal in async processing - use previous result or default
-                print(f"[Stage {self.stage_id}] Using async processing mode")
+            # If no result, use default (don't spam logs)
+            if not inference_result:
                 inference_result = {'probability': 0.0, 'result': 'Processing', 'status': 'async'}
-            elif (isinstance(inference_result, dict) and not inference_result) or \
-                 (isinstance(inference_result, tuple) and (not inference_result or inference_result == (None, None))):
-                print(f"[Stage {self.stage_id}] No valid result available")
-                inference_result = {'probability': 0.0, 'result': 'No Result'}
-            else:
-                print(f"[Stage {self.stage_id}] ✅ Successfully received inference result: {inference_result}")
 
             # Step 3: Output postprocessing (inter-stage)
             processed_result = inference_result
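As a quick sanity check of the validity rules the new `_has_inference_result()` helper encodes: the committed method first unwraps `PipelineData.stage_results`, and its dict branch returns the (truthy) dict itself rather than a strict bool. The standalone predicate below is a hypothetical sketch that applies the same per-result rules directly to a single stage result.

```python
def is_real_result(stage_result) -> bool:
    """Mirror of the per-result rules in _has_inference_result (sketch only)."""
    if not stage_result:
        return False  # None, empty dicts/tuples, and other falsy values don't count
    if isinstance(stage_result, tuple) and len(stage_result) == 2:
        prob, result_str = stage_result
        return prob is not None and result_str is not None
    if isinstance(stage_result, dict):
        # "processing"/"async" placeholders are not real inference results
        return stage_result.get("status") not in ("processing", "async")
    return True

assert is_real_result((0.93, "cat"))                          # genuine tuple result
assert not is_real_result((None, None))                       # empty tuple: not counted
assert not is_real_result({"status": "async"})                # async placeholder: not counted
assert not is_real_result({})                                 # empty dict: not counted
assert is_real_result({"probability": 0.9, "result": "cat"})  # real dict result
assert not is_real_result(None)                               # no result at all
```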