142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
|
|
import cv2
|
|
import threading
|
|
import time
|
|
from typing import Optional, Callable
|
|
|
|
class CameraSource:
|
|
"""
|
|
A class to handle camera input using cv2.VideoCapture.
|
|
It captures frames in a separate thread and can send them to a pipeline.
|
|
"""
|
|
def __init__(self,
|
|
camera_index: int = 0,
|
|
resolution: Optional[tuple[int, int]] = None,
|
|
fps: Optional[int] = None,
|
|
data_callback: Optional[Callable[[object], None]] = None,
|
|
frame_callback: Optional[Callable[[object], None]] = None):
|
|
"""
|
|
Initializes the CameraSource.
|
|
|
|
Args:
|
|
camera_index (int): The index of the camera to use.
|
|
resolution (Optional[tuple[int, int]]): The desired resolution (width, height).
|
|
fps (Optional[int]): The desired frames per second.
|
|
data_callback (Optional[Callable[[object], None]]): A callback function to send data to the pipeline.
|
|
frame_callback (Optional[Callable[[object], None]]): A callback function for raw frame updates.
|
|
"""
|
|
self.camera_index = camera_index
|
|
self.resolution = resolution
|
|
self.fps = fps
|
|
self.data_callback = data_callback
|
|
self.frame_callback = frame_callback
|
|
|
|
self.cap = None
|
|
self.running = False
|
|
self.thread = None
|
|
self._stop_event = threading.Event()
|
|
|
|
def initialize(self) -> bool:
|
|
"""
|
|
Initializes the camera capture.
|
|
|
|
Returns:
|
|
bool: True if initialization is successful, False otherwise.
|
|
"""
|
|
print(f"Initializing camera at index {self.camera_index}...")
|
|
self.cap = cv2.VideoCapture(self.camera_index)
|
|
if not self.cap.isOpened():
|
|
print(f"Error: Could not open camera at index {self.camera_index}.")
|
|
return False
|
|
|
|
if self.resolution:
|
|
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
|
|
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
|
|
|
|
if self.fps:
|
|
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
|
|
|
|
print("Camera initialized successfully.")
|
|
return True
|
|
|
|
def start(self):
|
|
"""
|
|
Starts the frame capture thread.
|
|
"""
|
|
if self.running:
|
|
print("Camera source is already running.")
|
|
return
|
|
|
|
if not self.cap or not self.cap.isOpened():
|
|
if not self.initialize():
|
|
return
|
|
|
|
self.running = True
|
|
self._stop_event.clear()
|
|
self.thread = threading.Thread(target=self._capture_loop, daemon=True)
|
|
self.thread.start()
|
|
print("Camera capture thread started.")
|
|
|
|
def stop(self):
|
|
"""
|
|
Stops the frame capture thread.
|
|
"""
|
|
self.running = False
|
|
if self.thread and self.thread.is_alive():
|
|
self._stop_event.set()
|
|
self.thread.join(timeout=2)
|
|
|
|
if self.cap and self.cap.isOpened():
|
|
self.cap.release()
|
|
self.cap = None
|
|
print("Camera source stopped.")
|
|
|
|
def _capture_loop(self):
|
|
"""
|
|
The main loop for capturing frames from the camera.
|
|
"""
|
|
while self.running and not self._stop_event.is_set():
|
|
ret, frame = self.cap.read()
|
|
if not ret:
|
|
print("Error: Could not read frame from camera. Reconnecting...")
|
|
self.cap.release()
|
|
time.sleep(1)
|
|
self.initialize()
|
|
continue
|
|
|
|
if self.data_callback:
|
|
try:
|
|
# Assuming the callback is thread-safe or handles its own locking
|
|
self.data_callback(frame)
|
|
except Exception as e:
|
|
print(f"Error in data_callback: {e}")
|
|
|
|
if self.frame_callback:
|
|
try:
|
|
self.frame_callback(frame)
|
|
except Exception as e:
|
|
print(f"Error in frame_callback: {e}")
|
|
|
|
# Control frame rate if FPS is set
|
|
if self.fps:
|
|
time.sleep(1.0 / self.fps)
|
|
|
|
def set_data_callback(self, callback: Callable[[object], None]):
|
|
"""
|
|
Sets the data callback function.
|
|
"""
|
|
self.data_callback = callback
|
|
|
|
def get_frame(self) -> Optional[object]:
|
|
"""
|
|
Gets a single frame from the camera. Not recommended for continuous capture.
|
|
"""
|
|
if not self.cap or not self.cap.isOpened():
|
|
if not self.initialize():
|
|
return None
|
|
|
|
ret, frame = self.cap.read()
|
|
if not ret:
|
|
return None
|
|
return frame
|