From af9adc8e823ccbce20d5106e843ff410431ee834 Mon Sep 17 00:00:00 2001 From: Masonmason Date: Thu, 17 Jul 2025 09:18:27 +0800 Subject: [PATCH] fix: Address file path and data processing bugs, add real-time viewer --- cluster4npu_ui/TODO.md | 8 ++-- .../core/functions/InferencePipeline.py | 3 +- .../core/functions/camera_source.py | 13 +++++- .../core/functions/result_handler.py | 24 ++++++++-- cluster4npu_ui/core/functions/video_source.py | 11 ++++- .../core/functions/workflow_orchestrator.py | 15 +++++- cluster4npu_ui/ui/dialogs/deployment.py | 46 ++++++++++++++++++- 7 files changed, 105 insertions(+), 15 deletions(-) diff --git a/cluster4npu_ui/TODO.md b/cluster4npu_ui/TODO.md index d884355..b9142fa 100644 --- a/cluster4npu_ui/TODO.md +++ b/cluster4npu_ui/TODO.md @@ -4,9 +4,11 @@ ✅ **Pipeline Core**: Multi-stage pipeline with device auto-detection working ✅ **Hardware Integration**: Kneron NPU dongles connecting and initializing successfully ✅ **Auto-resize Preprocessing**: Model input shape detection and automatic preprocessing implemented -❌ **Data Input Sources**: Missing camera and file input implementations -❌ **Result Persistence**: No result saving or output mechanisms -❌ **End-to-End Workflow**: Gaps between UI configuration and core pipeline execution +✅ **Data Input Sources**: Camera and video file inputs implemented +✅ **Result Persistence**: Result saving to file implemented +✅ **End-to-End Workflow**: UI configuration now connects to core pipeline execution +✅ **Bug Fixes**: Addressed file path and data processing issues +✅ **Real-time Viewer**: Implemented a live view for real-time inference visualization --- diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py index cd47888..83d8914 100644 --- a/cluster4npu_ui/core/functions/InferencePipeline.py +++ b/cluster4npu_ui/core/functions/InferencePipeline.py @@ -215,7 +215,8 @@ class PipelineStage: timeout_start = time.time() while time.time() - timeout_start < 5.0: # 5 second timeout result = self.multidongle.get_latest_inference_result(timeout=0.1) - if result: + # Check if result is not None and not an empty dict + if result is not None and (not isinstance(result, dict) or result): inference_result = result break time.sleep(0.01) diff --git a/cluster4npu_ui/core/functions/camera_source.py b/cluster4npu_ui/core/functions/camera_source.py index 862a268..0d860bc 100644 --- a/cluster4npu_ui/core/functions/camera_source.py +++ b/cluster4npu_ui/core/functions/camera_source.py @@ -13,7 +13,8 @@ class CameraSource: camera_index: int = 0, resolution: Optional[tuple[int, int]] = None, fps: Optional[int] = None, - data_callback: Optional[Callable[[object], None]] = None): + data_callback: Optional[Callable[[object], None]] = None, + frame_callback: Optional[Callable[[object], None]] = None): """ Initializes the CameraSource. @@ -21,12 +22,14 @@ class CameraSource: camera_index (int): The index of the camera to use. resolution (Optional[tuple[int, int]]): The desired resolution (width, height). fps (Optional[int]): The desired frames per second. - data_callback (Optional[Callable[[object], None]]): A callback function to send data to. + data_callback (Optional[Callable[[object], None]]): A callback function to send data to the pipeline. + frame_callback (Optional[Callable[[object], None]]): A callback function for raw frame updates. """ self.camera_index = camera_index self.resolution = resolution self.fps = fps self.data_callback = data_callback + self.frame_callback = frame_callback self.cap = None self.running = False @@ -107,6 +110,12 @@ class CameraSource: self.data_callback(frame) except Exception as e: print(f"Error in data_callback: {e}") + + if self.frame_callback: + try: + self.frame_callback(frame) + except Exception as e: + print(f"Error in frame_callback: {e}") # Control frame rate if FPS is set if self.fps: diff --git a/cluster4npu_ui/core/functions/result_handler.py b/cluster4npu_ui/core/functions/result_handler.py index 5078b35..4d98b53 100644 --- a/cluster4npu_ui/core/functions/result_handler.py +++ b/cluster4npu_ui/core/functions/result_handler.py @@ -50,9 +50,18 @@ class FileOutputManager: format (str): The format to save the result in ('json' or 'csv'). """ try: + # Sanitize pipeline_name to be a valid directory name + sanitized_pipeline_name = "".join(c for c in pipeline_name if c.isalnum() or c in (' ', '_')).rstrip() + if not sanitized_pipeline_name: + sanitized_pipeline_name = "default_pipeline" + + # Ensure base_path is valid + if not self.base_path or not isinstance(self.base_path, str): + self.base_path = "./output" + # Create directory structure today = time.strftime("%Y-%m-%d") - output_dir = os.path.join(self.base_path, pipeline_name, today) + output_dir = os.path.join(self.base_path, sanitized_pipeline_name, today) os.makedirs(output_dir, exist_ok=True) # Create filename @@ -69,10 +78,15 @@ class FileOutputManager: # For CSV, we expect a list of dicts. If it's a single dict, wrap it. data_to_save = result_data if isinstance(result_data, list) else [result_data] if data_to_save: - fieldnames = list(data_to_save[0].keys()) - content = self.serializer.to_csv(data_to_save, fieldnames) - with open(file_path, 'w') as f: - f.write(content) + # Ensure all items in the list are dictionaries + if all(isinstance(item, dict) for item in data_to_save): + fieldnames = list(data_to_save[0].keys()) + content = self.serializer.to_csv(data_to_save, fieldnames) + with open(file_path, 'w') as f: + f.write(content) + else: + print(f"Error: CSV data must be a list of dictionaries.") + return else: print(f"Error: Unsupported format '{format}'") return diff --git a/cluster4npu_ui/core/functions/video_source.py b/cluster4npu_ui/core/functions/video_source.py index 623f53b..ff77915 100644 --- a/cluster4npu_ui/core/functions/video_source.py +++ b/cluster4npu_ui/core/functions/video_source.py @@ -12,17 +12,20 @@ class VideoFileSource: def __init__(self, file_path: str, data_callback: Optional[Callable[[object], None]] = None, + frame_callback: Optional[Callable[[object], None]] = None, loop: bool = False): """ Initializes the VideoFileSource. Args: file_path (str): The path to the video file. - data_callback (Optional[Callable[[object], None]]): A callback function to send data to. + data_callback (Optional[Callable[[object], None]]): A callback function to send data to the pipeline. + frame_callback (Optional[Callable[[object], None]]): A callback function for raw frame updates. loop (bool): Whether to loop the video when it ends. """ self.file_path = file_path self.data_callback = data_callback + self.frame_callback = frame_callback self.loop = loop self.cap = None @@ -105,6 +108,12 @@ class VideoFileSource: self.data_callback(frame) except Exception as e: print(f"Error in data_callback: {e}") + + if self.frame_callback: + try: + self.frame_callback(frame) + except Exception as e: + print(f"Error in frame_callback: {e}") # Control frame rate time.sleep(1.0 / self.fps) diff --git a/cluster4npu_ui/core/functions/workflow_orchestrator.py b/cluster4npu_ui/core/functions/workflow_orchestrator.py index d3aedac..cef8042 100644 --- a/cluster4npu_ui/core/functions/workflow_orchestrator.py +++ b/cluster4npu_ui/core/functions/workflow_orchestrator.py @@ -30,6 +30,7 @@ class WorkflowOrchestrator: self.result_handler = None self.running = False self._stop_event = threading.Event() + self.frame_callback = None def start(self): """ @@ -88,6 +89,12 @@ class WorkflowOrchestrator: print("Workflow orchestrator stopped.") + def set_frame_callback(self, callback): + """ + Sets the callback function for frame updates. + """ + self.frame_callback = callback + def _create_data_source(self) -> Optional[Any]: """ Creates the appropriate data source based on the input configuration. @@ -99,13 +106,17 @@ class WorkflowOrchestrator: return CameraSource( camera_index=self.input_config.get('device_id', 0), resolution=self._parse_resolution(self.input_config.get('resolution')), - fps=self.input_config.get('fps', 30) + fps=self.input_config.get('fps', 30), + data_callback=self.pipeline.put_data, + frame_callback=self.frame_callback ) elif source_type == 'file': # Assuming 'file' means video file for now return VideoFileSource( file_path=self.input_config.get('source_path', ''), - loop=True # Or get from config if available + loop=True, # Or get from config if available + data_callback=self.pipeline.put_data, + frame_callback=self.frame_callback ) # Add other source types here (e.g., 'rtsp stream', 'image file') else: diff --git a/cluster4npu_ui/ui/dialogs/deployment.py b/cluster4npu_ui/ui/dialogs/deployment.py index 4d084f8..bf1d033 100644 --- a/cluster4npu_ui/ui/dialogs/deployment.py +++ b/cluster4npu_ui/ui/dialogs/deployment.py @@ -31,7 +31,7 @@ from PyQt5.QtWidgets import ( QHeaderView, QMessageBox, QSplitter, QFrame ) from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer -from PyQt5.QtGui import QFont, QColor, QPalette +from PyQt5.QtGui import QFont, QColor, QPalette, QImage, QPixmap # Import our converter and pipeline system sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'core', 'functions')) @@ -63,6 +63,7 @@ class DeploymentWorker(QThread): deployment_started = pyqtSignal() deployment_completed = pyqtSignal(bool, str) # success, message error_occurred = pyqtSignal(str) + frame_updated = pyqtSignal(object) # For live view def __init__(self, pipeline_data: Dict[str, Any]): super().__init__() @@ -128,6 +129,7 @@ class DeploymentWorker(QThread): # Create and start the orchestrator self.orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config) + self.orchestrator.set_frame_callback(self.frame_updated.emit) self.orchestrator.start() self.progress_updated.emit(100, "Pipeline deployed successfully!") @@ -188,6 +190,10 @@ class DeploymentDialog(QDialog): # Deployment tab self.deployment_tab = self.create_deployment_tab() self.tab_widget.addTab(self.deployment_tab, "Deployment") + + # Live View tab + self.live_view_tab = self.create_live_view_tab() + self.tab_widget.addTab(self.live_view_tab, "Live View") layout.addWidget(self.tab_widget) @@ -321,6 +327,30 @@ class DeploymentDialog(QDialog): layout.addWidget(status_group) return widget + + def create_live_view_tab(self) -> QWidget: + """Create the live view tab for real-time output.""" + widget = QWidget() + layout = QHBoxLayout(widget) + + # Video display + video_group = QGroupBox("Live Video Feed") + video_layout = QVBoxLayout(video_group) + self.live_view_label = QLabel("Live view will appear here after deployment.") + self.live_view_label.setAlignment(Qt.AlignCenter) + self.live_view_label.setMinimumSize(640, 480) + video_layout.addWidget(self.live_view_label) + layout.addWidget(video_group, 2) + + # Inference results + results_group = QGroupBox("Inference Results") + results_layout = QVBoxLayout(results_group) + self.results_text = QTextEdit() + self.results_text.setReadOnly(True) + results_layout.addWidget(self.results_text) + layout.addWidget(results_group, 1) + + return widget def populate_overview(self): """Populate overview tab with pipeline data.""" @@ -483,6 +513,7 @@ Stage Configurations: self.deployment_worker.deployment_started.connect(self.on_deployment_started) self.deployment_worker.deployment_completed.connect(self.on_deployment_completed) self.deployment_worker.error_occurred.connect(self.on_deployment_error) + self.deployment_worker.frame_updated.connect(self.update_live_view) self.deployment_worker.start() @@ -531,6 +562,19 @@ Stage Configurations: self.deploy_button.setEnabled(True) self.close_button.setText("Close") self.progress_bar.setVisible(False) + + def update_live_view(self, frame): + """Update the live view with a new frame.""" + try: + # Convert the OpenCV frame to a QImage + height, width, channel = frame.shape + bytes_per_line = 3 * width + q_image = QImage(frame.data, width, height, bytes_per_line, QImage.Format_RGB888).rgbSwapped() + + # Display the QImage in the QLabel + self.live_view_label.setPixmap(QPixmap.fromImage(q_image)) + except Exception as e: + print(f"Error updating live view: {e}") def apply_theme(self): """Apply consistent theme to the dialog."""