From 0a946c5aaa1565ad7eb000965e13ee5514481140 Mon Sep 17 00:00:00 2001
From: HuangMason320
Date: Wed, 30 Jul 2025 22:46:08 +0800
Subject: [PATCH] feat: Implement memory management and queue optimization
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Major improvements:
- Add intelligent memory management for both input and output queues
- Implement frame dropping strategy to prevent memory overflow
- Set output queue limit to 50 results with FIFO cleanup
- Add input queue management with real-time frame dropping
- Filter async results from callbacks and display to reduce noise
- Improve system stability and prevent queue-related hangs
- Add comprehensive logging for dropped frames and results

Performance enhancements:
- Maintain real-time processing by prioritizing latest frames
- Prevent memory accumulation that previously caused system freezes
- Ensure consistent queue size reporting and FPS calculations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude
---
 .../core/functions/InferencePipeline.py      | 80 +++++++++++++------
 .../core/functions/workflow_orchestrator.py  |  2 +-
 cluster4npu_ui/release_note.md               |  8 ++
 cluster4npu_ui/ui/dialogs/deployment.py      |  2 +-
 4 files changed, 65 insertions(+), 27 deletions(-)
 create mode 100644 cluster4npu_ui/release_note.md

diff --git a/cluster4npu_ui/core/functions/InferencePipeline.py b/cluster4npu_ui/core/functions/InferencePipeline.py
index 48ed746..1bdceb5 100644
--- a/cluster4npu_ui/core/functions/InferencePipeline.py
+++ b/cluster4npu_ui/core/functions/InferencePipeline.py
@@ -526,34 +526,43 @@ class InferencePipeline:
                     current_data.metadata['start_timestamp']
                 )
 
-                try:
-                    self.pipeline_output_queue.put(current_data, block=False)
+                # Only put valid inference results into output queue
+                if has_valid_inference:
+                    # Manage output queue size - maintain fixed upper limit for memory management
+                    MAX_OUTPUT_QUEUE_SIZE = 50  # Set maximum output queue size
-                    # Only count completed results if they contain valid inference
-                    if has_valid_inference:
+                    # If queue is getting full, remove old results to make space
+                    while self.pipeline_output_queue.qsize() >= MAX_OUTPUT_QUEUE_SIZE:
+                        try:
+                            dropped_result = self.pipeline_output_queue.get_nowait()
+                            # Track dropped results for debugging
+                            if not hasattr(self, '_dropped_results_count'):
+                                self._dropped_results_count = 0
+                            self._dropped_results_count += 1
+                        except queue.Empty:
+                            break
+
+                    try:
+                        self.pipeline_output_queue.put(current_data, block=False)
                         self.completed_counter += 1
                         # Record output timestamp for FPS calculation
                         self._record_output_timestamp()
-
-                        # Debug: Log pipeline activity every 10 results
-                        if self.completed_counter % 10 == 0:
-                            print(f"[{self.pipeline_name}] Processed {self.completed_counter} results")
-                            print(f"[{self.pipeline_name}] Queue sizes - Input: {self.pipeline_input_queue.qsize()}, Output: {self.pipeline_output_queue.qsize()}")
-
-                        # Call result callback
-                        if self.result_callback:
-                            self.result_callback(current_data)
-                except queue.Full:
-                    # Drop oldest and add new
-                    try:
-                        self.pipeline_output_queue.get_nowait()
-                        self.pipeline_output_queue.put(current_data, block=False)
-                        # Only record timestamp and count if valid inference result
-                        if has_valid_inference:
-                            self._record_output_timestamp()
-                    except queue.Empty:
-                        pass
+                        # Debug: Log pipeline activity every 10 results
+                        if self.completed_counter % 10 == 0:
+                            print(f"[{self.pipeline_name}] Processed {self.completed_counter} results")
+                            print(f"[{self.pipeline_name}] Queue sizes - Input: {self.pipeline_input_queue.qsize()}, Output: {self.pipeline_output_queue.qsize()}")
+                            # Show dropped results info if any
+                            if hasattr(self, '_dropped_results_count') and self._dropped_results_count > 0:
+                                print(f"[{self.pipeline_name}] Dropped {self._dropped_results_count} old results for memory management")
+
+                        # Call result callback for valid inference results
+                        if self.result_callback:
+                            self.result_callback(current_data)
+
+                    except queue.Full:
+                        # Fallback: should rarely happen due to pre-emptive cleaning above
+                        print(f"[{self.pipeline_name}] Warning: Output queue still full after cleanup")
             else:
                 self.error_counter += 1
                 if self.error_callback:
@@ -567,12 +576,33 @@ class InferencePipeline:
             print(f"[{self.pipeline_name}] Coordinator stopped")
 
     def put_data(self, data: Any, timeout: float = 1.0) -> bool:
-        """Put data into pipeline"""
+        """Put data into pipeline with memory management"""
         try:
             self.pipeline_input_queue.put(data, timeout=timeout)
             return True
         except queue.Full:
-            return False
+            # Drop oldest frames to make space for new ones (for real-time processing)
+            try:
+                dropped_data = self.pipeline_input_queue.get_nowait()
+                self.pipeline_input_queue.put(data, block=False)
+
+                # Track dropped frames for debugging
+                if not hasattr(self, '_dropped_frames_count'):
+                    self._dropped_frames_count = 0
+                self._dropped_frames_count += 1
+
+                # Log occasionally to show frame dropping (every 50 drops)
+                if self._dropped_frames_count % 50 == 0:
+                    print(f"[{self.pipeline_name}] Dropped {self._dropped_frames_count} input frames for real-time processing")
+
+                return True
+            except queue.Empty:
+                # Rare case: queue became empty between full check and get
+                try:
+                    self.pipeline_input_queue.put(data, block=False)
+                    return True
+                except queue.Full:
+                    return False
 
     def get_result(self, timeout: float = 0.1) -> Optional[PipelineData]:
         """Get result from pipeline"""
diff --git a/cluster4npu_ui/core/functions/workflow_orchestrator.py b/cluster4npu_ui/core/functions/workflow_orchestrator.py
index 5235ba3..4bdafe7 100644
--- a/cluster4npu_ui/core/functions/workflow_orchestrator.py
+++ b/cluster4npu_ui/core/functions/workflow_orchestrator.py
@@ -181,7 +181,7 @@ class WorkflowOrchestrator:
                     self.result_callback(result_dict)
 
         except Exception as e:
-            print(f"❌ Error handling result: {e}")
+            print(f"Error handling result: {e}")
 
     def _parse_resolution(self, resolution_str: Optional[str]) -> Optional[tuple[int, int]]:
         """
diff --git a/cluster4npu_ui/release_note.md b/cluster4npu_ui/release_note.md
new file mode 100644
index 0000000..4344cac
--- /dev/null
+++ b/cluster4npu_ui/release_note.md
@@ -0,0 +1,8 @@
+# Release Note for Cluster4NPU
+## v0.0.2 (2025/7/31)
+### Update
+-
+### Fix
+- Can't inference twice in the same app
+- FPS computation
+-
\ No newline at end of file
diff --git a/cluster4npu_ui/ui/dialogs/deployment.py b/cluster4npu_ui/ui/dialogs/deployment.py
index 1525bc3..be174bb 100644
--- a/cluster4npu_ui/ui/dialogs/deployment.py
+++ b/cluster4npu_ui/ui/dialogs/deployment.py
@@ -366,7 +366,7 @@ class DeploymentDialog(QDialog):
 
         # Topology tab
         self.topology_tab = self.create_topology_tab()
-        self.tab_widget.addTab(self.topology_tab, "Topology Analysis")
+        self.tab_widget.addTab(self.topology_tab, "Analysis")
 
         # Configuration tab
         self.config_tab = self.create_configuration_tab()
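For reference, the input-side change in `put_data` reduces to a drop-oldest put on a bounded queue: when the queue is full, discard the oldest frame so the newest one is always accepted. Below is a minimal, standalone sketch of that pattern; the helper name `put_latest` and the demo queue are illustrative only and not part of this patch (the actual implementation is `InferencePipeline.put_data` above).

```python
import queue

def put_latest(q: queue.Queue, item) -> bool:
    """Drop-oldest put: if the bounded queue is full, discard the oldest
    entry so the newest item is accepted. Returns True if the item was stored."""
    try:
        q.put(item, block=False)
        return True
    except queue.Full:
        try:
            q.get_nowait()        # drop the oldest queued item
        except queue.Empty:
            pass                  # a consumer drained the queue in the meantime
        try:
            q.put(item, block=False)
            return True
        except queue.Full:
            return False          # another producer refilled the queue first

if __name__ == "__main__":
    frames = queue.Queue(maxsize=5)   # small bound for demonstration
    for frame_id in range(20):
        put_latest(frames, frame_id)
    # Only the most recent frames survive once the producer outpaces the consumer.
    print([frames.get_nowait() for _ in range(frames.qsize())])  # -> [15, 16, 17, 18, 19]
```

The trade-off this illustrates is the one the commit message describes: memory stays bounded and the pipeline always works on the latest frames, at the cost of silently discarding older ones when the consumer falls behind.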