"""
MinIO storage helper for workers.

Provides upload/download functionality to replace the Docker shared volume.
Workers download inputs from MinIO to a local temp dir, process them, then
upload the results. Uses boto3 (S3-compatible API) to communicate with MinIO.
"""

import logging
import os
import shutil
from typing import Optional

import boto3
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class MinIOStorage:
    """MinIO storage client for job file exchange.

    Every operation addresses objects inside the single bucket ``self.bucket``.
    Constructor arguments override the corresponding ``MINIO_*`` environment
    variables; when neither is set, built-in defaults are used.
    """

    def __init__(
        self,
        endpoint_url: Optional[str] = None,
        bucket: Optional[str] = None,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        region: Optional[str] = None,
        lifecycle_days: Optional[int] = None,
    ):
        """Create the S3 client, verify the bucket and optionally set expiry.

        Args:
            endpoint_url: MinIO endpoint; falls back to ``MINIO_ENDPOINT_URL``
                when not given (or empty).
            bucket: Bucket name; falls back to ``MINIO_BUCKET``.
            access_key: Access key; falls back to ``MINIO_ACCESS_KEY``.
            secret_key: Secret key; falls back to ``MINIO_SECRET_KEY``.
            region: Signing region; falls back to ``MINIO_REGION``.
            lifecycle_days: Days after which objects under ``jobs/`` expire;
                falls back to ``MINIO_LIFECYCLE_DAYS``. A value <= 0 disables
                the lifecycle rule.

        Raises:
            ValueError: If ``lifecycle_days`` is omitted and
                ``MINIO_LIFECYCLE_DAYS`` is not an integer.
            Exception: Whatever ``head_bucket`` raises when the bucket is not
                accessible (re-raised by ``_ensure_bucket``).
        """
        # NOTE(review): the endpoint and access-key defaults are
        # deployment-specific; consider requiring them via the environment.
        self.endpoint_url = endpoint_url or os.environ.get("MINIO_ENDPOINT_URL", "http://192.168.0.130:9000")
        self.bucket = bucket or os.environ.get("MINIO_BUCKET", "convertet-working-space")
        self.access_key = access_key or os.environ.get("MINIO_ACCESS_KEY", "convuser")
        self.secret_key = secret_key or os.environ.get("MINIO_SECRET_KEY", "")
        self.region = region or os.environ.get("MINIO_REGION", "us-east-1")
        # `is not None` (not `or`) so an explicit 0 disables the rule.
        self.lifecycle_days = (
            lifecycle_days
            if lifecycle_days is not None
            else int(os.environ.get("MINIO_LIFECYCLE_DAYS", "7"))
        )

        self.client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region,
            config=BotoConfig(signature_version="s3v4"),
        )
        self._ensure_bucket()
        if self.lifecycle_days > 0:
            self._ensure_lifecycle_rule()

    def _ensure_bucket(self):
        """Verify the bucket exists (never auto-create: it is managed externally).

        Raises:
            Exception: Re-raises whatever ``head_bucket`` raised, after logging.
        """
        try:
            self.client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            logger.error("MinIO bucket '%s' is not accessible: %s", self.bucket, e)
            raise
        logger.info("MinIO bucket verified: %s", self.bucket)

    def _get_lifecycle_rules(self) -> list:
        """Return the bucket's current lifecycle rules ([] if none are configured).

        Raises:
            ClientError: For any error other than "no lifecycle configuration".
        """
        try:
            response = self.client.get_bucket_lifecycle_configuration(Bucket=self.bucket)
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") == "NoSuchLifecycleConfiguration":
                return []
            raise
        return list(response.get("Rules", []))

    def _ensure_lifecycle_rule(self):
        """Ensure a rule auto-expires objects under the ``jobs/`` prefix.

        ``put_bucket_lifecycle_configuration`` replaces the bucket's *entire*
        lifecycle configuration. Because the bucket is managed externally, the
        existing rules are read first and preserved; only the rule with our ID
        is added or replaced. If the existing rules cannot be read, nothing is
        written rather than risk clobbering them. All failures are logged and
        swallowed: the rule is a best-effort convenience.
        """
        rule_id = "auto-cleanup-jobs"
        desired_rule = {
            "ID": rule_id,
            "Status": "Enabled",
            "Filter": {"Prefix": "jobs/"},
            "Expiration": {"Days": self.lifecycle_days},
        }
        try:
            existing_rules = self._get_lifecycle_rules()
            if desired_rule in existing_rules:
                logger.info("MinIO lifecycle rule already present: jobs/* expire after %s days", self.lifecycle_days)
                return
            merged_rules = [rule for rule in existing_rules if rule.get("ID") != rule_id]
            merged_rules.append(desired_rule)
            self.client.put_bucket_lifecycle_configuration(
                Bucket=self.bucket,
                LifecycleConfiguration={"Rules": merged_rules},
            )
            logger.info("MinIO lifecycle rule set: jobs/* expire after %s days", self.lifecycle_days)
        except Exception as e:
            logger.warning("Could not set lifecycle rule: %s", e)

    def upload_file(self, local_path: str, key: str):
        """Upload the local file ``local_path`` to ``key``."""
        self.client.upload_file(local_path, self.bucket, key)
        logger.debug("Uploaded %s -> minio://%s/%s", local_path, self.bucket, key)

    def upload_data(self, data: bytes, key: str):
        """Upload raw bytes to ``key``."""
        self.client.put_object(Bucket=self.bucket, Key=key, Body=data)
        logger.debug("Uploaded %d bytes -> minio://%s/%s", len(data), self.bucket, key)

    def download_file(self, key: str, local_path: str):
        """Download ``key`` to ``local_path``, creating parent directories."""
        parent_dir = os.path.dirname(local_path)
        # A bare filename has no parent component, and os.makedirs("") raises.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.client.download_file(self.bucket, key, local_path)
        logger.debug("Downloaded minio://%s/%s -> %s", self.bucket, key, local_path)

    def download_to_stream(self, key: str):
        """Return the streaming body of the object ``key``.

        The caller should read and close the returned body so the underlying
        HTTP connection is released.
        """
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return response["Body"]

    def list_keys(self, prefix: str) -> list:
        """Return every key that starts with ``prefix``, following pagination."""
        paginator = self.client.get_paginator("list_objects_v2")
        return [
            obj["Key"]
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix)
            for obj in page.get("Contents", [])
        ]

    @staticmethod
    def _join_key(prefix: str, rel_path: str) -> str:
        """Join a key prefix and a '/'-separated relative path with exactly one '/'."""
        base = prefix.rstrip("/")
        return f"{base}/{rel_path}" if base else rel_path

    def upload_directory(self, local_dir: str, prefix: str):
        """Upload every file below ``local_dir`` to MinIO under ``prefix``.

        Keys are ``<prefix>/<path relative to local_dir>`` and always use '/'
        separators, regardless of the local OS.
        """
        for root, _, files in os.walk(local_dir):
            for fname in files:
                local_path = os.path.join(root, fname)
                # Object keys are '/'-separated; os.sep is '\\' on Windows.
                rel_path = os.path.relpath(local_path, local_dir).replace(os.sep, "/")
                self.upload_file(local_path, self._join_key(prefix, rel_path))

    @staticmethod
    def _is_within(root: str, path: str) -> bool:
        """Return True if ``path`` resolves to a location inside ``root``."""
        real_root = os.path.realpath(root)
        try:
            return os.path.commonpath([real_root, os.path.realpath(path)]) == real_root
        except ValueError:  # e.g. paths on different drives (Windows)
            return False

    def download_prefix(self, prefix: str, local_dir: str):
        """Download every object under the "directory" ``prefix`` into ``local_dir``.

        ``prefix`` is treated as a directory: ``"jobs/1"`` matches
        ``jobs/1/...`` but not ``jobs/10/...``. The following keys are skipped:

        - the prefix object itself;
        - directory-marker keys (ending in '/');
        - keys that would resolve outside ``local_dir`` (e.g. via ``..``).
        """
        base = prefix.rstrip("/")
        list_prefix = f"{base}/" if base else ""
        for key in self.list_keys(list_prefix):
            rel_path = key[len(list_prefix):].lstrip("/")
            if not rel_path or rel_path.endswith("/"):
                continue
            local_path = os.path.join(local_dir, rel_path)
            if not self._is_within(local_dir, local_path):
                logger.warning("Skipping key outside %s: minio://%s/%s", local_dir, self.bucket, key)
                continue
            self.download_file(key, local_path)

    @staticmethod
    def _is_not_found(exc: Exception) -> bool:
        """Return True if ``exc`` is an S3 "object not found" client error."""
        if not isinstance(exc, ClientError):
            return False
        code = str(exc.response.get("Error", {}).get("Code", ""))
        return code in ("404", "NoSuchKey", "NotFound")

    def exists(self, key: str) -> bool:
        """Return True if ``key`` exists in the bucket.

        A missing object returns False. Other failures (auth, network, ...)
        also return False, but they are logged so they are not silently
        mistaken for a missing object.
        """
        try:
            self.client.head_object(Bucket=self.bucket, Key=key)
        except Exception as e:
            if not self._is_not_found(e):
                logger.warning("Could not check minio://%s/%s: %s", self.bucket, key, e)
            return False
        return True

    def get_size(self, key: str) -> Optional[int]:
        """Return the size of ``key`` in bytes, or None if it cannot be determined.

        A missing object returns None. Other failures also return None, but
        they are logged.
        """
        try:
            response = self.client.head_object(Bucket=self.bucket, Key=key)
        except Exception as e:
            if not self._is_not_found(e):
                logger.warning("Could not stat minio://%s/%s: %s", self.bucket, key, e)
            return None
        return response["ContentLength"]