Masonmason cde1aac908 debug: Add comprehensive logging to diagnose pipeline hanging issue
- Add pipeline activity logging every 10 results to track processing
- Add queue size monitoring in InferencePipeline coordinator
- Add camera frame capture logging every 100 frames
- Add MultiDongle send/receive thread logging every 100 operations
- Add error handling for repeated callback failures in camera source

This will help identify where the pipeline stops processing:
- Camera capture stopping
- MultiDongle threads blocking
- Pipeline coordinator hanging
- Queue capacity issues

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-24 19:49:00 +08:00

152 lines
5.1 KiB
Python

import cv2
import threading
import time
from typing import Optional, Callable
class CameraSource:
    """
    Capture frames from a camera through cv2.VideoCapture.

    Frames are read on a background daemon thread started by start() and are
    handed to the optional ``data_callback`` (pipeline input) and
    ``frame_callback`` (raw frame updates).
    """

    # Number of consecutive data_callback failures tolerated before the
    # capture loop gives up and stops itself.
    MAX_CALLBACK_FAILURES = 10

    def __init__(self,
                 camera_index: int = 0,
                 resolution: Optional[tuple[int, int]] = None,
                 fps: Optional[int] = None,
                 data_callback: Optional[Callable[[object], None]] = None,
                 frame_callback: Optional[Callable[[object], None]] = None):
        """
        Initializes the CameraSource.

        Args:
            camera_index (int): The index of the camera to use.
            resolution (Optional[tuple[int, int]]): The desired resolution (width, height).
            fps (Optional[int]): The desired frames per second.
            data_callback (Optional[Callable[[object], None]]): A callback function to send data to the pipeline.
            frame_callback (Optional[Callable[[object], None]]): A callback function for raw frame updates.
        """
        self.camera_index = camera_index
        self.resolution = resolution
        self.fps = fps
        self.data_callback = data_callback
        self.frame_callback = frame_callback
        self.cap = None
        self.running = False
        self.thread = None
        self._stop_event = threading.Event()

    def initialize(self) -> bool:
        """
        Opens the camera and applies the requested resolution and FPS.

        Returns:
            bool: True if initialization is successful, False otherwise.
        """
        print(f"Initializing camera at index {self.camera_index}...")
        self.cap = cv2.VideoCapture(self.camera_index)
        if not self.cap.isOpened():
            print(f"Error: Could not open camera at index {self.camera_index}.")
            return False
        if self.resolution:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        if self.fps:
            self.cap.set(cv2.CAP_PROP_FPS, self.fps)
        print("Camera initialized successfully.")
        return True

    def start(self):
        """
        Starts the frame capture thread.

        Does nothing if capture is already running, or if the camera has to
        be opened first and initialize() fails.
        """
        if self.running:
            print("Camera source is already running.")
            return
        if not self.cap or not self.cap.isOpened():
            if not self.initialize():
                return
        self.running = True
        self._stop_event.clear()
        self.thread = threading.Thread(target=self._capture_loop, daemon=True)
        self.thread.start()
        print("Camera capture thread started.")

    def stop(self):
        """
        Stops the frame capture thread and releases the camera.

        Safe to call from inside a callback running on the capture thread:
        in that case the thread is signalled but not joined, because a thread
        cannot join itself.
        """
        self.running = False
        self._stop_event.set()
        worker = self.thread
        if (worker is not None and worker.is_alive()
                and worker is not threading.current_thread()):
            worker.join(timeout=2)
        if self.cap and self.cap.isOpened():
            self.cap.release()
        self.cap = None
        print("Camera source stopped.")

    def _capture_loop(self):
        """
        The main loop for capturing frames from the camera.

        Runs until ``running`` is cleared, the stop event is set, or
        ``data_callback`` fails more than MAX_CALLBACK_FAILURES times in a row.
        """
        frame_count = 0
        callback_failures = 0  # consecutive data_callback failures
        while self.running and not self._stop_event.is_set():
            loop_started = time.monotonic()

            # stop() clears self.cap even when its join times out, so work on
            # a local reference and exit instead of crashing on None.
            cap = self.cap
            if cap is None:
                break

            ret, frame = cap.read()
            if not ret:
                print("Error: Could not read frame from camera. Reconnecting...")
                cap.release()
                # Interruptible back-off: a stop request ends the wait early
                # and skips the pointless re-open.
                if self._stop_event.wait(1):
                    break
                self.initialize()
                continue

            frame_count += 1
            # Debug: Log camera activity every 100 frames
            if frame_count % 100 == 0:
                print(f"[Camera] Captured {frame_count} frames")

            data_callback = self.data_callback
            if data_callback:
                try:
                    # Assuming the callback is thread-safe or handles its own locking
                    data_callback(frame)
                except Exception as e:
                    callback_failures += 1
                    print(f"Error in data_callback: {e}")
                    # Only give up when the callback keeps failing; a single
                    # transient error must not kill a long-running capture.
                    if callback_failures > self.MAX_CALLBACK_FAILURES:
                        print("Too many callback failures, stopping camera")
                        # Clear the flag so start() can be called again.
                        self.running = False
                        break
                else:
                    callback_failures = 0

            frame_callback = self.frame_callback
            if frame_callback:
                try:
                    frame_callback(frame)
                except Exception as e:
                    print(f"Error in frame_callback: {e}")

            # Control frame rate if FPS is set: wait only for what is left of
            # the frame period after the read and callbacks, and wake up
            # immediately when a stop is requested.
            fps = self.fps
            if fps and fps > 0:
                remaining = 1.0 / fps - (time.monotonic() - loop_started)
                if remaining > 0:
                    self._stop_event.wait(remaining)

    def set_data_callback(self, callback: Callable[[object], None]):
        """
        Sets the data callback function.
        """
        self.data_callback = callback

    def get_frame(self) -> Optional[object]:
        """
        Gets a single frame from the camera. Not recommended for continuous capture.

        Opens the camera first if it is not already open. No locking is done
        here, so reads are not coordinated with a running capture thread.

        Returns:
            Optional[object]: The frame, or None if the camera could not be
            opened or the read failed.
        """
        if not self.cap or not self.cap.isOpened():
            if not self.initialize():
                return None
        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame