"""
Pipeline Deployment Dialog

This dialog handles the conversion of .mflow pipeline data to executable format
and deployment to Kneron dongles using the InferencePipeline system.

Main Components:
    - Pipeline conversion using MFlowConverter
    - Topology analysis and optimization
    - Dongle status monitoring
    - Real-time deployment progress
    - Error handling and troubleshooting

Usage:
    from ui.dialogs.deployment import DeploymentDialog

    dialog = DeploymentDialog(pipeline_data, parent=self)
    dialog.exec_()
"""

import os
import sys
import json
import numbers
import threading
import traceback
from datetime import datetime
from typing import Dict, Any, List, Optional

from PyQt5.QtWidgets import (
    QDialog, QVBoxLayout, QHBoxLayout, QLabel, QTextEdit, QPushButton,
    QProgressBar, QTabWidget, QWidget, QFormLayout, QLineEdit, QSpinBox,
    QCheckBox, QGroupBox, QScrollArea, QTableWidget, QTableWidgetItem,
    QHeaderView, QMessageBox, QSplitter, QFrame
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtGui import QFont, QColor, QPalette, QImage, QPixmap

# Import our converter and pipeline system
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'core', 'functions'))

try:
    from ...core.functions.mflow_converter import MFlowConverter, PipelineConfig
    CONVERTER_AVAILABLE = True
except ImportError as e:
    print(f"Warning: MFlow converter not available: {e}")
    CONVERTER_AVAILABLE = False

try:
    from ...core.functions.Multidongle import MultiDongle
    from ...core.functions.InferencePipeline import InferencePipeline
    from ...core.functions.workflow_orchestrator import WorkflowOrchestrator
    PIPELINE_AVAILABLE = True
except ImportError as e:
    print(f"Warning: Pipeline system not available: {e}")
    PIPELINE_AVAILABLE = False


def _is_real_number(value) -> bool:
    """Return True for real numbers; bool is excluded on purpose."""
    return isinstance(value, numbers.Real) and not isinstance(value, bool)


def _format_probability(value) -> str:
    """Format a probability with three decimals, or fall back to ``str``.

    The fallback keeps a malformed stage result (``None``, a string, ...)
    from aborting the whole result display with a formatting error.
    """
    if _is_real_number(value):
        return f"{value:.3f}"
    return str(value)


def _split_result_tuple(result):
    """Return ``(label, probability)`` from a two-element stage result.

    The two consumers in this module used to disagree on the element order,
    so the order is detected: when only the first element is numeric it is
    taken as the probability. Otherwise the tuple is assumed to already be
    ``(label, probability)``.
    """
    first, second = result
    if _is_real_number(first) and not _is_real_number(second):
        return second, first
    return first, second


class DeploymentWorker(QThread):
    """Worker thread for pipeline deployment to avoid blocking UI."""

    # Signals
    progress_updated = pyqtSignal(int, str)  # progress, message
    topology_analyzed = pyqtSignal(dict)  # topology analysis results
    conversion_completed = pyqtSignal(object)  # PipelineConfig object
    deployment_started = pyqtSignal()
    deployment_completed = pyqtSignal(bool, str)  # success, message
    error_occurred = pyqtSignal(str)
    frame_updated = pyqtSignal('PyQt_PyObject')  # For live view
    result_updated = pyqtSignal(dict)  # For inference results

    def __init__(self, pipeline_data: Dict[str, Any]):
        """Store the .mflow pipeline data; nothing runs until ``start()``."""
        super().__init__()
        self.pipeline_data = pipeline_data
        self.should_stop = False
        self.orchestrator = None

    def run(self):
        """Main deployment workflow.

        Converts the pipeline data, validates the resulting configuration and,
        when the dongle system is importable, starts a WorkflowOrchestrator and
        idles until ``stop()`` is requested. Progress and failures are reported
        through the class signals; no exception leaves this method.
        """
        try:
            # Step 1: Convert .mflow to pipeline config
            self.progress_updated.emit(10, "Converting pipeline configuration...")

            if not CONVERTER_AVAILABLE:
                self.error_occurred.emit("MFlow converter not available. Please check installation.")
                return

            converter = MFlowConverter()
            config = converter._convert_mflow_to_config(self.pipeline_data)

            # Emit topology analysis results
            self.topology_analyzed.emit({
                'total_stages': len(config.stage_configs),
                'pipeline_name': config.pipeline_name,
                'input_config': config.input_config,
                'output_config': config.output_config
            })

            self.progress_updated.emit(30, "Pipeline conversion completed")
            self.conversion_completed.emit(config)

            if self.should_stop:
                return

            # Step 2: Validate configuration
            self.progress_updated.emit(40, "Validating pipeline configuration...")
            is_valid, errors = converter.validate_config(config)

            if not is_valid:
                error_msg = "Configuration validation failed:\n" + "\n".join(errors)
                self.error_occurred.emit(error_msg)
                return

            self.progress_updated.emit(60, "Configuration validation passed")

            if self.should_stop:
                return

            # Step 3: Initialize pipeline (if dongle system available)
            self.progress_updated.emit(70, "Initializing inference pipeline...")

            if not PIPELINE_AVAILABLE:
                self.progress_updated.emit(100, "Pipeline configuration ready (dongle system not available)")
                self.deployment_completed.emit(True, "Pipeline configuration prepared successfully. Dongle system not available for actual deployment.")
                return

            try:
                self._deploy_and_wait(converter, config)
            except Exception as e:
                # Do not leave a half-started orchestrator running after a failure.
                self._stop_orchestrator_quietly()
                self.error_occurred.emit(f"Pipeline deployment failed: {str(e)}")

        except Exception as e:
            self.error_occurred.emit(f"Deployment error: {str(e)}")

    def _deploy_and_wait(self, converter, config):
        """Start the orchestrator for ``config`` and block until stop is requested."""
        pipeline = converter.create_inference_pipeline(config)

        self.progress_updated.emit(80, "Initializing workflow orchestrator...")
        self.deployment_started.emit()

        # Create and start the orchestrator
        self.orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config)
        self.orchestrator.set_frame_callback(self.frame_updated.emit)
        self.orchestrator.set_result_callback(self._handle_result)
        self.orchestrator.start()

        self.progress_updated.emit(100, "Pipeline deployed successfully!")
        self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages")

        # Keep the thread alive until stop is requested
        while not self.should_stop:
            self.msleep(100)  # Sleep for 100ms and check again

    def _handle_result(self, result_dict):
        """Forward one inference result to both the terminal and the GUI."""
        self._print_terminal_results(result_dict)
        self.result_updated.emit(result_dict)

    def _stop_orchestrator_quietly(self):
        """Best-effort orchestrator shutdown used on the error path."""
        orchestrator = self.orchestrator
        if orchestrator is None:
            return
        try:
            orchestrator.stop()
        except Exception as stop_error:
            print(f"Warning: failed to stop orchestrator after error: {stop_error}")

    def stop(self):
        """Stop the deployment process."""
        self.should_stop = True
        if self.orchestrator:
            self.orchestrator.stop()

    def _print_terminal_results(self, result_dict):
        """Print inference results to terminal with detailed formatting.

        Reads the ``timestamp``, ``pipeline_id``, ``stage_results`` and
        ``metadata`` keys of ``result_dict``. Any error while printing is
        reported on the terminal instead of being raised to the caller.
        """
        try:
            # Header with timestamp
            timestamp = datetime.fromtimestamp(result_dict.get('timestamp', 0)).strftime("%H:%M:%S.%f")[:-3]
            pipeline_id = result_dict.get('pipeline_id', 'Unknown')

            print(f"\n🔄 INFERENCE RESULT [{timestamp}]")
            print(f" Pipeline ID: {pipeline_id}")
            print(" " + "="*50)

            # Stage results
            stage_results = result_dict.get('stage_results', {})
            if stage_results:
                for stage_id, result in stage_results.items():
                    self._print_stage_result(stage_id, result)
                    print()  # Blank line between stages
            else:
                print(" ⚠️ No stage results available")

            # Processing time if available
            metadata = result_dict.get('metadata', {})
            if 'total_processing_time' in metadata:
                processing_time = metadata['total_processing_time']
                print(f" ⏱️ Processing Time: {processing_time:.3f}s")

                # Add FPS calculation
                if processing_time > 0:
                    fps = 1.0 / processing_time
                    print(f" 🚄 Theoretical FPS: {fps:.2f}")

            # Additional metadata
            if metadata:
                interesting_keys = ['dongle_count', 'stage_count', 'queue_sizes', 'error_count']
                for key in interesting_keys:
                    if key in metadata:
                        print(f" 📋 {key.replace('_', ' ').title()}: {metadata[key]}")

            print(" " + "="*50)

        except Exception as e:
            print(f"❌ Error printing terminal results: {e}")

    @staticmethod
    def _print_stage_result(stage_id, result):
        """Print the result of a single stage (tuple, dict or raw value)."""
        print(f" 📊 Stage: {stage_id}")

        if isinstance(result, tuple) and len(result) == 2:
            # Two-element results carry a label and a probability
            result_string, probability = _split_result_tuple(result)
            print(f" ✅ Result: {result_string}")
            print(f" 📈 Probability: {_format_probability(probability)}")

            # A confidence level only makes sense for a numeric probability
            if _is_real_number(probability):
                if probability > 0.8:
                    confidence = "🟢 Very High"
                elif probability > 0.6:
                    confidence = "🟡 High"
                elif probability > 0.4:
                    confidence = "🟠 Medium"
                else:
                    confidence = "🔴 Low"
                print(f" 🎯 Confidence: {confidence}")

        elif isinstance(result, dict):
            # Handle dict results
            for key, value in result.items():
                if key == 'probability':
                    print(f" 📈 {key.title()}: {_format_probability(value)}")
                elif key == 'result':
                    print(f" ✅ {key.title()}: {value}")
                elif key == 'confidence':
                    print(f" 🎯 {key.title()}: {value}")
                elif key == 'fused_probability':
                    print(f" 🔀 Fused Probability: {_format_probability(value)}")
                elif key == 'individual_probs':
                    print(f" 📋 Individual Probabilities:")
                    for prob_key, prob_value in value.items():
                        print(f" {prob_key}: {_format_probability(prob_value)}")
                else:
                    print(f" 📝 {key}: {value}")
        else:
            # Handle other result types
            print(f" 📝 Raw Result: {result}")


class DeploymentDialog(QDialog):
    """Main deployment dialog with comprehensive deployment management."""

    # How long to wait for the worker after a stop request before warning.
    STOP_TIMEOUT_MS = 5000
    # How long closing the dialog blocks while the worker shuts down.
    CLOSE_WAIT_MS = 3000
    # Results view: once more than TRIM_THRESHOLD lines are shown,
    # only the newest KEEP_LINES are retained.
    RESULTS_TRIM_THRESHOLD = 100
    RESULTS_KEEP_LINES = 50

    def __init__(self, pipeline_data: Dict[str, Any], parent=None):
        """Build the dialog for ``pipeline_data`` (the loaded .mflow content)."""
        super().__init__(parent)

        self.pipeline_data = pipeline_data
        self.deployment_worker = None
        self.pipeline_config = None

        # Worker for which a stop request is pending, if any.
        self._stopping_worker = None
        # Fires when a stop request takes longer than STOP_TIMEOUT_MS.
        self._stop_timer = QTimer(self)
        self._stop_timer.setSingleShot(True)
        self._stop_timer.timeout.connect(self._on_stop_timeout)

        self.setWindowTitle("Deploy Pipeline to Dongles")
        self.setMinimumSize(800, 600)
        self.setup_ui()
        self.apply_theme()

    def setup_ui(self):
        """Setup the dialog UI."""
        layout = QVBoxLayout(self)

        # Header
        header_label = QLabel("Pipeline Deployment")
        header_label.setFont(QFont("Arial", 16, QFont.Bold))
        header_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(header_label)

        # Main content with tabs
        self.tab_widget = QTabWidget()

        # Overview tab
        self.overview_tab = self.create_overview_tab()
        self.tab_widget.addTab(self.overview_tab, "Overview")

        # Topology tab
        self.topology_tab = self.create_topology_tab()
        self.tab_widget.addTab(self.topology_tab, "Topology Analysis")

        # Configuration tab
        self.config_tab = self.create_configuration_tab()
        self.tab_widget.addTab(self.config_tab, "Configuration")

        # Deployment tab
        self.deployment_tab = self.create_deployment_tab()
        self.tab_widget.addTab(self.deployment_tab, "Deployment")

        # Live View tab
        self.live_view_tab = self.create_live_view_tab()
        self.tab_widget.addTab(self.live_view_tab, "Live View")

        layout.addWidget(self.tab_widget)

        # Progress bar
        self.progress_bar = QProgressBar()
        self.progress_bar.setVisible(False)
        layout.addWidget(self.progress_bar)

        # Status label
        self.status_label = QLabel("Ready to deploy")
        self.status_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.status_label)

        # Buttons
        button_layout = QHBoxLayout()

        self.analyze_button = QPushButton("Analyze Pipeline")
        self.analyze_button.clicked.connect(self.analyze_pipeline)
        button_layout.addWidget(self.analyze_button)

        self.deploy_button = QPushButton("Deploy to Dongles")
        self.deploy_button.clicked.connect(self.start_deployment)
        self.deploy_button.setEnabled(False)
        button_layout.addWidget(self.deploy_button)

        self.stop_button = QPushButton("Stop Inference")
        self.stop_button.clicked.connect(self.stop_deployment)
        self.stop_button.setEnabled(False)
        self.stop_button.setVisible(False)
        button_layout.addWidget(self.stop_button)

        button_layout.addStretch()

        self.close_button = QPushButton("Close")
        # accept() alone would hide the dialog without delivering a close
        # event, leaving a running worker behind; go through the shared check.
        self.close_button.clicked.connect(self._on_close_clicked)
        button_layout.addWidget(self.close_button)

        layout.addLayout(button_layout)

        # Populate initial data
        self.populate_overview()

    def create_overview_tab(self) -> QWidget:
        """Create pipeline overview tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Pipeline info
        info_group = QGroupBox("Pipeline Information")
        info_layout = QFormLayout(info_group)

        self.name_label = QLabel()
        self.description_label = QLabel()
        self.nodes_label = QLabel()
        self.connections_label = QLabel()

        info_layout.addRow("Name:", self.name_label)
        info_layout.addRow("Description:", self.description_label)
        info_layout.addRow("Nodes:", self.nodes_label)
        info_layout.addRow("Connections:", self.connections_label)

        layout.addWidget(info_group)

        # Nodes table
        nodes_group = QGroupBox("Pipeline Nodes")
        nodes_layout = QVBoxLayout(nodes_group)

        self.nodes_table = QTableWidget()
        self.nodes_table.setColumnCount(3)
        self.nodes_table.setHorizontalHeaderLabels(["Name", "Type", "Status"])
        self.nodes_table.horizontalHeader().setStretchLastSection(True)
        nodes_layout.addWidget(self.nodes_table)

        layout.addWidget(nodes_group)

        return widget

    def create_topology_tab(self) -> QWidget:
        """Create topology analysis tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Analysis results
        self.topology_text = QTextEdit()
        self.topology_text.setReadOnly(True)
        self.topology_text.setFont(QFont("Consolas", 10))
        self.topology_text.setText("Click 'Analyze Pipeline' to see topology analysis...")
        layout.addWidget(self.topology_text)

        return widget

    def create_configuration_tab(self) -> QWidget:
        """Create configuration tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        scroll_area = QScrollArea()
        scroll_content = QWidget()
        scroll_layout = QVBoxLayout(scroll_content)

        # Stage configurations will be populated after analysis
        self.config_content = QLabel("Run pipeline analysis to see stage configurations...")
        self.config_content.setAlignment(Qt.AlignCenter)
        scroll_layout.addWidget(self.config_content)

        scroll_area.setWidget(scroll_content)
        scroll_area.setWidgetResizable(True)
        layout.addWidget(scroll_area)

        return widget

    def create_deployment_tab(self) -> QWidget:
        """Create deployment monitoring tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Deployment log
        log_group = QGroupBox("Deployment Log")
        log_layout = QVBoxLayout(log_group)

        self.deployment_log = QTextEdit()
        self.deployment_log.setReadOnly(True)
        self.deployment_log.setFont(QFont("Consolas", 9))
        log_layout.addWidget(self.deployment_log)

        layout.addWidget(log_group)

        # Dongle status (placeholder)
        status_group = QGroupBox("Dongle Status")
        status_layout = QVBoxLayout(status_group)

        self.dongle_status = QLabel("No dongles detected")
        self.dongle_status.setAlignment(Qt.AlignCenter)
        status_layout.addWidget(self.dongle_status)

        layout.addWidget(status_group)

        return widget

    def create_live_view_tab(self) -> QWidget:
        """Create the live view tab for real-time output."""
        widget = QWidget()
        layout = QHBoxLayout(widget)

        # Video display
        video_group = QGroupBox("Live Video Feed")
        video_layout = QVBoxLayout(video_group)
        self.live_view_label = QLabel("Live view will appear here after deployment.")
        self.live_view_label.setAlignment(Qt.AlignCenter)
        self.live_view_label.setMinimumSize(640, 480)
        video_layout.addWidget(self.live_view_label)
        layout.addWidget(video_group, 2)

        # Inference results
        results_group = QGroupBox("Inference Results")
        results_layout = QVBoxLayout(results_group)
        self.results_text = QTextEdit()
        self.results_text.setReadOnly(True)
        results_layout.addWidget(self.results_text)
        layout.addWidget(results_group, 1)

        return widget

    def populate_overview(self):
        """Populate overview tab with pipeline data."""
        self.name_label.setText(self.pipeline_data.get('project_name', 'Untitled'))
        self.description_label.setText(self.pipeline_data.get('description', 'No description'))

        nodes = self.pipeline_data.get('nodes', [])
        connections = self.pipeline_data.get('connections', [])

        self.nodes_label.setText(str(len(nodes)))
        self.connections_label.setText(str(len(connections)))

        # Populate nodes table
        self.nodes_table.setRowCount(len(nodes))
        for row, node in enumerate(nodes):
            # str() guards against non-string names/types in the pipeline file
            self.nodes_table.setItem(row, 0, QTableWidgetItem(str(node.get('name', 'Unknown'))))
            self.nodes_table.setItem(row, 1, QTableWidgetItem(str(node.get('type', 'Unknown'))))
            self.nodes_table.setItem(row, 2, QTableWidgetItem("Ready"))

    def analyze_pipeline(self):
        """Analyze pipeline topology and configuration.

        Converts the pipeline data with MFlowConverter, fills the topology and
        configuration tabs, and enables the deploy button only when the
        converter's validation passes. Failures are shown in message boxes.
        """
        if not CONVERTER_AVAILABLE:
            QMessageBox.warning(self, "Analysis Error",
                                "Pipeline analyzer not available. Please check installation.")
            return

        try:
            self.status_label.setText("Analyzing pipeline...")
            self.analyze_button.setEnabled(False)

            # Create converter and analyze
            converter = MFlowConverter()
            config = converter._convert_mflow_to_config(self.pipeline_data)
            self.pipeline_config = config

            # Update topology tab
            report = [f"""Pipeline Analysis Results:

Name: {config.pipeline_name}
Description: {config.description}
Total Stages: {len(config.stage_configs)}

Input Configuration:
{json.dumps(config.input_config, indent=2)}

Output Configuration:
{json.dumps(config.output_config, indent=2)}

Stage Configurations:
"""]

            for i, stage_config in enumerate(config.stage_configs, 1):
                report.append(f"\nStage {i}: {stage_config.stage_id}\n")
                report.append(f" Port IDs: {stage_config.port_ids}\n")
                report.append(f" Model Path: {stage_config.model_path}\n")
                report.append(f" SCPU Firmware: {stage_config.scpu_fw_path}\n")
                report.append(f" NCPU Firmware: {stage_config.ncpu_fw_path}\n")
                report.append(f" Upload Firmware: {stage_config.upload_fw}\n")
                report.append(f" Max Queue Size: {stage_config.max_queue_size}\n")

            self.topology_text.setText("".join(report))

            # Update configuration tab
            self.update_configuration_tab(config)

            # Validate configuration
            is_valid, errors = converter.validate_config(config)

            if is_valid:
                self.status_label.setText("Pipeline analysis completed successfully")
                self.deploy_button.setEnabled(True)
                self.tab_widget.setCurrentWidget(self.topology_tab)  # Switch to topology tab
            else:
                error_msg = "Configuration validation failed:\n" + "\n".join(errors)
                QMessageBox.warning(self, "Validation Error", error_msg)
                self.status_label.setText("Pipeline analysis failed validation")

        except Exception as e:
            QMessageBox.critical(self, "Analysis Error",
                                 f"Failed to analyze pipeline: {str(e)}")
            self.status_label.setText("Pipeline analysis failed")
        finally:
            self.analyze_button.setEnabled(True)

    @staticmethod
    def _read_only_line(text) -> QLineEdit:
        """Return a read-only line edit showing ``text`` (``None`` shows as empty)."""
        line_edit = QLineEdit("" if text is None else str(text))
        line_edit.setReadOnly(True)
        return line_edit

    def update_configuration_tab(self, config: 'PipelineConfig'):
        """Update configuration tab with detailed stage information.

        Builds one read-only group box per stage and swaps it in for the
        scroll area currently shown in the configuration tab.
        """
        scroll_content = QWidget()
        scroll_layout = QVBoxLayout(scroll_content)

        for i, stage_config in enumerate(config.stage_configs, 1):
            stage_group = QGroupBox(f"Stage {i}: {stage_config.stage_id}")
            stage_layout = QFormLayout(stage_group)

            # Create read-only fields for stage configuration
            stage_layout.addRow("Model Path:", self._read_only_line(stage_config.model_path))
            stage_layout.addRow("SCPU Firmware:", self._read_only_line(stage_config.scpu_fw_path))
            stage_layout.addRow("NCPU Firmware:", self._read_only_line(stage_config.ncpu_fw_path))
            stage_layout.addRow("Port IDs:", self._read_only_line(stage_config.port_ids))

            queue_size_spin = QSpinBox()
            # QSpinBox defaults to a 0..99 range and clamps setValue(); widen
            # the range first so larger queue sizes are displayed correctly.
            queue_size_spin.setMaximum(max(queue_size_spin.maximum(), stage_config.max_queue_size))
            queue_size_spin.setValue(stage_config.max_queue_size)
            queue_size_spin.setReadOnly(True)
            stage_layout.addRow("Queue Size:", queue_size_spin)

            upload_fw_check = QCheckBox()
            upload_fw_check.setChecked(stage_config.upload_fw)
            upload_fw_check.setEnabled(False)
            stage_layout.addRow("Upload Firmware:", upload_fw_check)

            scroll_layout.addWidget(stage_group)

        # Replace the scroll area currently in the configuration tab
        config_tab_layout = self.config_tab.layout()
        old_scroll_area = config_tab_layout.itemAt(0).widget()
        config_tab_layout.removeWidget(old_scroll_area)
        old_scroll_area.deleteLater()

        new_scroll_area = QScrollArea()
        new_scroll_area.setWidget(scroll_content)
        new_scroll_area.setWidgetResizable(True)
        config_tab_layout.addWidget(new_scroll_area)

    def start_deployment(self):
        """Start the deployment process."""
        if not self.pipeline_config:
            QMessageBox.warning(self, "Deployment Error",
                                "Please analyze the pipeline first.")
            return

        # A previous worker can still be alive after an unclean stop; starting
        # a second one would run two deployments at once.
        if self.deployment_worker and self.deployment_worker.isRunning():
            QMessageBox.warning(self, "Deployment Error",
                                "A deployment is still running. Please stop it first.")
            return

        # Switch to deployment tab
        self.tab_widget.setCurrentWidget(self.deployment_tab)

        # Setup UI for deployment
        self.progress_bar.setVisible(True)
        self.progress_bar.setValue(0)
        self.deploy_button.setEnabled(False)
        self.close_button.setText("Cancel")

        # Clear deployment log
        self.deployment_log.clear()
        self.deployment_log.append("Starting pipeline deployment...")

        # Create and start deployment worker
        worker = DeploymentWorker(self.pipeline_data)
        worker.progress_updated.connect(self.update_progress)
        worker.topology_analyzed.connect(self.update_topology_results)
        worker.conversion_completed.connect(self.on_conversion_completed)
        worker.deployment_started.connect(self.on_deployment_started)
        worker.deployment_completed.connect(self.on_deployment_completed)
        worker.error_occurred.connect(self.on_deployment_error)
        worker.frame_updated.connect(self.update_live_view)
        worker.result_updated.connect(self.update_inference_results)
        worker.finished.connect(self._on_worker_finished)
        self.deployment_worker = worker

        worker.start()

    def stop_deployment(self):
        """Stop the current deployment/inference.

        After confirmation the worker is asked to stop. The UI is reset from
        the GUI thread once the worker's ``finished`` signal arrives, or after
        ``STOP_TIMEOUT_MS`` with a warning if it has not finished by then.
        """
        worker = self.deployment_worker
        if not (worker and worker.isRunning()):
            return

        reply = QMessageBox.question(self, "Stop Inference",
                                     "Are you sure you want to stop the inference?",
                                     QMessageBox.Yes | QMessageBox.No)
        if reply != QMessageBox.Yes:
            return

        self.deployment_log.append("Stopping inference...")
        self.status_label.setText("Stopping inference...")

        # Disable stop button immediately to prevent multiple clicks
        self.stop_button.setEnabled(False)

        # Register the pending stop before asking the worker to stop so a
        # quickly delivered finished signal is not missed.
        self._stopping_worker = worker
        worker.stop()
        self._stop_timer.start(self.STOP_TIMEOUT_MS)

    def _on_worker_finished(self):
        """Slot for the worker's ``finished`` signal (runs on the GUI thread)."""
        self._finish_stop(self.sender(), clean=True)

    def _on_stop_timeout(self):
        """Give up waiting for a stop request that exceeded the timeout."""
        worker = self._stopping_worker
        if worker is not None:
            self._finish_stop(worker, clean=not worker.isRunning())

    def _finish_stop(self, worker, clean: bool):
        """Log the stop outcome and reset the UI; no-op unless ``worker`` has a pending stop."""
        if worker is None or worker is not self._stopping_worker:
            return

        self._stopping_worker = None
        self._stop_timer.stop()

        if clean:
            self.deployment_log.append("Inference stopped successfully.")
        else:
            self.deployment_log.append("Warning: Inference may not have stopped cleanly.")

        self.stop_button.setVisible(False)
        self.deploy_button.setEnabled(True)
        self.close_button.setText("Close")
        self.progress_bar.setVisible(False)
        self.status_label.setText("Inference stopped")
        self.dongle_status.setText("Pipeline stopped")

    def update_progress(self, value: int, message: str):
        """Update deployment progress."""
        self.progress_bar.setValue(value)
        self.status_label.setText(message)
        self.deployment_log.append(f"[{value}%] {message}")

    def update_topology_results(self, results: Dict):
        """Update topology analysis results."""
        self.deployment_log.append(f"Topology Analysis: {results['total_stages']} stages detected")

    def on_conversion_completed(self, config):
        """Handle conversion completion."""
        self.deployment_log.append("Pipeline conversion completed successfully")

    def on_deployment_started(self):
        """Handle deployment start."""
        self.deployment_log.append("Connecting to dongles...")
        self.dongle_status.setText("Initializing dongles...")

        # Show stop button and hide deploy button
        self.stop_button.setEnabled(True)
        self.stop_button.setVisible(True)
        self.deploy_button.setEnabled(False)

    def _reset_idle_controls(self):
        """Return buttons and progress bar to the 'nothing running' state."""
        self.stop_button.setEnabled(False)
        self.stop_button.setVisible(False)
        self.deploy_button.setEnabled(True)
        self.close_button.setText("Close")
        self.progress_bar.setVisible(False)

    def on_deployment_completed(self, success: bool, message: str):
        """Handle deployment completion."""
        self.progress_bar.setValue(100)

        if not success:
            self.deployment_log.append(f"FAILED: {message}")
            self.status_label.setText("Deployment failed")
            self._reset_idle_controls()
            return

        self.deployment_log.append(f"SUCCESS: {message}")
        self.status_label.setText("Deployment completed successfully!")

        worker = self.deployment_worker
        if worker is not None and worker.orchestrator is not None:
            self.dongle_status.setText("Pipeline running on dongles")
            # Keep stop button visible while the pipeline is running
            self.stop_button.setEnabled(True)
            self.stop_button.setVisible(True)
        else:
            # The worker reports success without an orchestrator when the
            # dongle system is unavailable; nothing is running to be stopped.
            self._reset_idle_controls()

        QMessageBox.information(self, "Deployment Success", message)

    def on_deployment_error(self, error: str):
        """Handle deployment error."""
        self.deployment_log.append(f"ERROR: {error}")
        self.status_label.setText("Deployment failed")
        QMessageBox.critical(self, "Deployment Error", error)

        # Hide stop button and show deploy button on error
        self._reset_idle_controls()

    def update_live_view(self, frame):
        """Update the live view with a new frame.

        ``frame`` is unpacked as a three-dimensional array (height, width,
        channels) and interpreted as 3 bytes per pixel; anything else is
        reported on the terminal and skipped.
        """
        try:
            # Convert the OpenCV frame to a QImage
            height, width, _channels = frame.shape
            bytes_per_line = 3 * width
            # rgbSwapped() returns a new image, so the pixmap does not keep
            # referencing the frame buffer.
            q_image = QImage(frame.data, width, height, bytes_per_line, QImage.Format_RGB888).rgbSwapped()

            # Display the QImage in the QLabel
            self.live_view_label.setPixmap(QPixmap.fromImage(q_image))
        except Exception as e:
            print(f"Error updating live view: {e}")

    def update_inference_results(self, result_dict):
        """Update the inference results display."""
        try:
            # Format the results for display
            timestamp = datetime.fromtimestamp(result_dict.get('timestamp', 0)).strftime("%H:%M:%S.%f")[:-3]
            stage_results = result_dict.get('stage_results', {})

            parts = [f"[{timestamp}] Pipeline ID: {result_dict.get('pipeline_id', 'Unknown')}\n"]

            # Display results from each stage
            for stage_id, result in stage_results.items():
                parts.append(f" {stage_id}:\n")
                if isinstance(result, tuple) and len(result) == 2:
                    # Two-element results carry a label and a probability
                    result_string, probability = _split_result_tuple(result)
                    parts.append(f" Result: {result_string}\n")
                    parts.append(f" Probability: {_format_probability(probability)}\n")
                elif isinstance(result, dict):
                    # Handle dict results
                    for key, value in result.items():
                        if key == 'probability':
                            parts.append(f" Probability: {_format_probability(value)}\n")
                        else:
                            parts.append(f" {key}: {value}\n")
                else:
                    parts.append(f" {result}\n")

            parts.append("-" * 50 + "\n")
            result_text = "".join(parts)

            # Bound the display: past the threshold keep only the newest lines
            current_text = self.results_text.toPlainText()
            lines = current_text.split('\n')
            if len(lines) > self.RESULTS_TRIM_THRESHOLD:
                current_text = '\n'.join(lines[-self.RESULTS_KEEP_LINES:])

            self.results_text.setPlainText(current_text + result_text)

            # Auto-scroll to bottom
            scrollbar = self.results_text.verticalScrollBar()
            scrollbar.setValue(scrollbar.maximum())

        except Exception as e:
            print(f"Error updating inference results: {e}")

    def apply_theme(self):
        """Apply consistent theme to the dialog."""
        self.setStyleSheet("""
            QDialog {
                background-color: #1e1e2e;
                color: #cdd6f4;
            }
            QTabWidget::pane {
                border: 1px solid #45475a;
                background-color: #313244;
            }
            QTabWidget::tab-bar {
                alignment: center;
            }
            QTabBar::tab {
                background-color: #45475a;
                color: #cdd6f4;
                padding: 8px 16px;
                margin-right: 2px;
                border-top-left-radius: 4px;
                border-top-right-radius: 4px;
            }
            QTabBar::tab:selected {
                background-color: #89b4fa;
                color: #1e1e2e;
            }
            QTabBar::tab:hover {
                background-color: #585b70;
            }
            QGroupBox {
                font-weight: bold;
                border: 2px solid #45475a;
                border-radius: 5px;
                margin-top: 1ex;
                padding-top: 5px;
            }
            QGroupBox::title {
                subcontrol-origin: margin;
                left: 10px;
                padding: 0 10px 0 10px;
            }
            QPushButton {
                background-color: #45475a;
                color: #cdd6f4;
                border: 1px solid #6c7086;
                border-radius: 4px;
                padding: 8px 16px;
                font-weight: bold;
            }
            QPushButton:hover {
                background-color: #585b70;
            }
            QPushButton:pressed {
                background-color: #313244;
            }
            QPushButton:disabled {
                background-color: #313244;
                color: #6c7086;
            }
            QTextEdit, QLineEdit {
                background-color: #313244;
                color: #cdd6f4;
                border: 1px solid #45475a;
                border-radius: 4px;
                padding: 4px;
            }
            QTableWidget {
                background-color: #313244;
                alternate-background-color: #45475a;
                color: #cdd6f4;
                border: 1px solid #45475a;
            }
            QProgressBar {
                background-color: #313244;
                border: 1px solid #45475a;
                border-radius: 4px;
                text-align: center;
            }
            QProgressBar::chunk {
                background-color: #a6e3a1;
                border-radius: 3px;
            }
        """)

    def _confirm_stop_running_worker(self) -> bool:
        """Return True when the dialog may go away.

        With no running worker this is immediately True. Otherwise the user is
        asked to confirm; on "Yes" the worker is stopped and waited for up to
        ``CLOSE_WAIT_MS``, on "No" False is returned and nothing is changed.
        """
        worker = self.deployment_worker
        if not (worker and worker.isRunning()):
            return True

        reply = QMessageBox.question(self, "Cancel Deployment",
                                     "Deployment is in progress. Are you sure you want to cancel?",
                                     QMessageBox.Yes | QMessageBox.No)
        if reply != QMessageBox.Yes:
            return False

        worker.stop()
        worker.wait(self.CLOSE_WAIT_MS)
        return True

    def _on_close_clicked(self):
        """Close/Cancel button: accept the dialog once any worker is stopped."""
        if self._confirm_stop_running_worker():
            self.accept()

    def reject(self):
        """Handle Esc/reject the same way as closing the window."""
        if self._confirm_stop_running_worker():
            super().reject()

    def closeEvent(self, event):
        """Handle dialog close event."""
        if self._confirm_stop_running_worker():
            event.accept()
        else:
            event.ignore()