diff --git a/cluster4npu_ui/ui/dialogs/deployment.py b/cluster4npu_ui/ui/dialogs/deployment.py index 52f6253..3a40024 100644 --- a/cluster4npu_ui/ui/dialogs/deployment.py +++ b/cluster4npu_ui/ui/dialogs/deployment.py @@ -23,6 +23,8 @@ import sys import json import threading import traceback +import io +import contextlib from typing import Dict, Any, List, Optional from PyQt5.QtWidgets import ( QDialog, QVBoxLayout, QHBoxLayout, QLabel, QTextEdit, QPushButton, @@ -54,6 +56,55 @@ except ImportError as e: PIPELINE_AVAILABLE = False +class StdoutCapture: + """Context manager to capture stdout/stderr and emit to signal.""" + + def __init__(self, signal_emitter): + self.signal_emitter = signal_emitter + self.original_stdout = None + self.original_stderr = None + self.captured_output = io.StringIO() + + def __enter__(self): + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + # Create a custom write function that both prints to original and captures + class TeeWriter: + def __init__(self, original, captured, emitter): + self.original = original + self.captured = captured + self.emitter = emitter + self._emitting = False # Prevent recursion + + def write(self, text): + # Write to original stdout/stderr (so it still appears in terminal) + self.original.write(text) + self.original.flush() + + # Capture for GUI if it's a substantial message and not already emitting + if text.strip() and not self._emitting: + self._emitting = True + try: + self.emitter(text) + finally: + self._emitting = False + + def flush(self): + self.original.flush() + + # Replace stdout and stderr with our tee writers + sys.stdout = TeeWriter(self.original_stdout, self.captured_output, self.signal_emitter) + sys.stderr = TeeWriter(self.original_stderr, self.captured_output, self.signal_emitter) + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # Restore original stdout/stderr + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + + class DeploymentWorker(QThread): """Worker thread for pipeline deployment to avoid blocking UI.""" @@ -67,6 +118,7 @@ class DeploymentWorker(QThread): frame_updated = pyqtSignal('PyQt_PyObject') # For live view result_updated = pyqtSignal(dict) # For inference results terminal_output = pyqtSignal(str) # For terminal output in GUI + stdout_captured = pyqtSignal(str) # For captured stdout/stderr def __init__(self, pipeline_data: Dict[str, Any]): super().__init__() @@ -123,36 +175,37 @@ class DeploymentWorker(QThread): self.deployment_completed.emit(True, "Pipeline configuration prepared successfully. Dongle system not available for actual deployment.") return - # Create InferencePipeline instance + # Create InferencePipeline instance with stdout capture try: - pipeline = converter.create_inference_pipeline(config) - - self.progress_updated.emit(80, "Initializing workflow orchestrator...") - self.deployment_started.emit() - - # Create and start the orchestrator - self.orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config) - self.orchestrator.set_frame_callback(self.frame_updated.emit) - - # Set up both GUI and terminal result callbacks - def combined_result_callback(result_dict): - # Send to GUI terminal and results display - terminal_output = self._format_terminal_results(result_dict) - self.terminal_output.emit(terminal_output) - # Emit for GUI - self.result_updated.emit(result_dict) - - self.orchestrator.set_result_callback(combined_result_callback) - - - self.orchestrator.start() - - self.progress_updated.emit(100, "Pipeline deployed successfully!") - self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages") - - # Keep running until stop is requested - while not self.should_stop: - self.msleep(100) # Sleep for 100ms and check again + # Capture all stdout/stderr during pipeline operations + with StdoutCapture(self.stdout_captured.emit): + pipeline = converter.create_inference_pipeline(config) + + self.progress_updated.emit(80, "Initializing workflow orchestrator...") + self.deployment_started.emit() + + # Create and start the orchestrator + self.orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config) + self.orchestrator.set_frame_callback(self.frame_updated.emit) + + # Set up both GUI and terminal result callbacks + def combined_result_callback(result_dict): + # Send to GUI terminal and results display + terminal_output = self._format_terminal_results(result_dict) + self.terminal_output.emit(terminal_output) + # Emit for GUI + self.result_updated.emit(result_dict) + + self.orchestrator.set_result_callback(combined_result_callback) + + self.orchestrator.start() + + self.progress_updated.emit(100, "Pipeline deployed successfully!") + self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages") + + # Keep running until stop is requested with continued stdout capture + while not self.should_stop: + self.msleep(100) # Sleep for 100ms and check again except Exception as e: self.error_occurred.emit(f"Pipeline deployment failed: {str(e)}") @@ -657,6 +710,7 @@ Stage Configurations: self.deployment_worker.frame_updated.connect(self.update_live_view) self.deployment_worker.result_updated.connect(self.update_inference_results) self.deployment_worker.terminal_output.connect(self.update_terminal_output) + self.deployment_worker.stdout_captured.connect(self.update_terminal_output) self.deployment_worker.start()