KNEO-Academy/src/controllers/media_controller.py
HuangMason320 17deba3bdb Refactor: Clean up codebase and improve documentation
- Remove unused files (model_controller.py, model_service.py, device_connection_popup.py)
- Clean up commented code in device_service.py, device_popup.py, config.py
- Update docstrings and comments across all modules
- Improve code organization and readability
2025-12-30 16:47:31 +08:00

328 lines
13 KiB
Python

"""
media_controller.py - Media Controller
This module handles media operations including camera capture, video/audio
recording, screenshot capture, and image display with bounding box overlay.
"""
import cv2
import os
from PyQt5.QtWidgets import QFileDialog
from PyQt5.QtGui import QPixmap, QPainter, QPen, QFont, QColor
from PyQt5.QtCore import Qt, QRect
from src.models.video_thread import VideoThread
from src.utils.image_utils import qimage_to_numpy
class MediaController:
"""
Controller class for managing media operations.
Attributes:
main_window: Reference to the main application window
inference_controller: Reference to the inference controller
video_thread: Thread for camera video capture
recording: Flag indicating if video recording is active
recording_audio: Flag indicating if audio recording is active
recorded_frames: List of recorded video frames
"""
def __init__(self, main_window, inference_controller):
"""
Initialize the MediaController.
Args:
main_window: Reference to the main application window.
inference_controller: Reference to the inference controller.
"""
self.main_window = main_window
self.inference_controller = inference_controller
self.video_thread = None
self.recording = False
self.recording_audio = False
self.recorded_frames = []
self._signal_was_connected = False # Track if signal was previously connected
self._inference_paused = False # Track if inference is paused
def start_camera(self):
"""
Start the camera for video capture.
Initializes the video thread and connects the signal for frame updates.
"""
try:
if self.video_thread is None:
print("Initializing camera thread...")
# Clear any text or image on the canvas
if hasattr(self.main_window, 'canvas_label'):
self.main_window.canvas_label.clear()
self.video_thread = VideoThread()
if not self._signal_was_connected:
try:
self.video_thread.change_pixmap_signal.connect(self.update_image)
self._signal_was_connected = True
print("Camera signal connected successfully")
except Exception as e:
print(f"Error connecting camera signal: {e}")
# Start camera thread
self.video_thread.start()
print("Camera thread started successfully")
else:
print("Camera is already running")
except Exception as e:
print(f"Error starting camera: {e}")
import traceback
print(traceback.format_exc())
def stop_camera(self):
"""
Stop the camera and release resources.
Disconnects signal connections and stops the video thread.
"""
try:
if self.video_thread is not None:
print("Stopping camera thread")
# Disconnect signal connection first
if self._signal_was_connected:
try:
self.video_thread.change_pixmap_signal.disconnect()
self._signal_was_connected = False
print("Camera signal disconnected")
except Exception as e:
print(f"Error disconnecting signal: {e}")
# Stop thread
self.video_thread.stop()
self.video_thread = None
print("Camera completely stopped")
except Exception as e:
print(f"Error stopping camera: {e}")
import traceback
print(traceback.format_exc())
def update_image(self, qt_image):
"""
Update the image display and process inference.
Args:
qt_image (QImage): The image frame from the camera.
"""
try:
# Update image on canvas
if hasattr(self.main_window, 'canvas_label'):
pixmap = QPixmap.fromImage(qt_image)
# If there are bounding boxes, draw them
if hasattr(self.main_window, 'current_bounding_boxes') and self.main_window.current_bounding_boxes is not None:
painter = QPainter(pixmap)
pen = QPen(Qt.red)
pen.setWidth(2)
painter.setPen(pen)
# Iterate and draw all bounding boxes
for bbox_info in self.main_window.current_bounding_boxes:
# Ensure bounding box info is valid
if isinstance(bbox_info, list) and len(bbox_info) >= 4:
# Draw rectangle
x1, y1, x2, y2 = bbox_info[0], bbox_info[1], bbox_info[2], bbox_info[3]
painter.drawRect(QRect(x1, y1, x2 - x1, y2 - y1))
# If there's a label (5th element of bounding box), draw it
if len(bbox_info) > 4 and bbox_info[4]:
font = QFont()
font.setPointSize(10)
painter.setFont(font)
painter.setPen(QColor(255, 0, 0)) # Red color
# Calculate label position (above bounding box)
label_x = x1
label_y = y1 - 10
# Ensure label is within canvas bounds
if label_y < 10:
label_y = y2 + 15 # If not enough space above, place below
painter.drawText(label_x, label_y, str(bbox_info[4]))
painter.end()
# Display image
self.main_window.canvas_label.setPixmap(pixmap)
# Only add frame to inference queue if inference is not paused
if not self._inference_paused:
frame_np = qimage_to_numpy(qt_image)
self.inference_controller.add_frame_to_queue(frame_np)
except Exception as e:
print(f"Error updating image: {e}")
import traceback
print(traceback.format_exc())
def reconnect_camera_signal(self):
"""Reconnect the camera signal if it was previously disconnected"""
if self.video_thread is not None and self._signal_was_connected:
try:
# Check if the signal is already connected to avoid duplicate connections
# PyQt doesn't provide a direct way to check if a signal is connected,
# so we use a try-except block
try:
# Attempt to disconnect first to avoid multiple connections
self.video_thread.change_pixmap_signal.disconnect(self.update_image)
except TypeError:
# Signal was not connected, which is fine
pass
# Reconnect the signal
self.video_thread.change_pixmap_signal.connect(self.update_image)
print("Camera signal reconnected")
except Exception as e:
print(f"Error reconnecting camera signal: {e}")
def record_video(self, button=None):
"""Start or stop video recording"""
if not self.recording:
try:
self.recording = True
self.recorded_frames = []
print("Started video recording")
if button:
button.setStyleSheet("""
QPushButton {
background: rgba(255, 0, 0, 0.3);
border: 1px solid red;
border-radius: 10px;
}
""")
except Exception as e:
print(f"Error starting video recording: {e}")
else:
try:
self.recording = False
print("Stopped video recording")
if self.recorded_frames:
filename = QFileDialog.getSaveFileName(
self.main_window,
"Save Video",
"",
"Video Files (*.avi)"
)[0]
if filename:
height, width = self.recorded_frames[0].shape[:2]
out = cv2.VideoWriter(
filename,
cv2.VideoWriter_fourcc(*'XVID'),
20.0,
(width, height)
)
for frame in self.recorded_frames:
out.write(frame)
out.release()
print(f"Video saved to {filename}")
if button:
button.setStyleSheet("""
QPushButton {
background: transparent;
border: 1px transparent;
border-radius: 10px;
}
QPushButton:hover {
background-color: rgba(255, 255, 255, 50);
}
""")
except Exception as e:
print(f"Error stopping video recording: {e}")
def record_audio(self, button=None):
"""Start or stop audio recording"""
if not self.recording_audio:
try:
self.recording_audio = True
print("Started audio recording")
if button:
button.setStyleSheet("""
QPushButton {
background: rgba(255, 0, 0, 0.3);
border: 1px solid red;
border-radius: 10px;
}
""")
except Exception as e:
print(f"Error starting audio recording: {e}")
else:
try:
self.recording_audio = False
print("Stopped audio recording")
if button:
button.setStyleSheet("""
QPushButton {
background: transparent;
border: 1px transparent;
border-radius: 10px;
}
QPushButton:hover {
background-color: rgba(255, 255, 255, 50);
}
""")
except Exception as e:
print(f"Error stopping audio recording: {e}")
def take_screenshot(self):
"""Take a screenshot of the current frame"""
try:
if self.main_window.canvas_label.pixmap():
filename = QFileDialog.getSaveFileName(
self.main_window,
"Save Screenshot",
"",
"Image Files (*.png *.jpg)"
)[0]
if filename:
self.main_window.canvas_label.pixmap().save(filename)
print(f"Screenshot saved to {filename}")
except Exception as e:
print(f"Error taking screenshot: {e}")
def toggle_inference_pause(self):
"""
Toggle the inference pause state.
Returns:
bool: The new pause state (True if paused, False if running).
"""
try:
self._inference_paused = not self._inference_paused
if self._inference_paused:
# Clear bounding boxes when paused
self.main_window.current_bounding_boxes = None
else:
# When resuming inference, ensure camera is still running
if self.video_thread is None or not self.video_thread.isRunning():
self.start_camera()
return self._inference_paused
except Exception as e:
print(f"Error toggling inference pause state: {e}")
import traceback
print(traceback.format_exc())
return self._inference_paused
def is_inference_paused(self):
"""
Check if inference is paused.
Returns:
bool: True if inference is paused, False otherwise.
"""
return self._inference_paused