- Frontend: rewrite Home.vue to match backend POST /jobs API (remove single-stage options)
- Frontend: add Monitor page (/monitor) for queue and job monitoring
- Frontend: add job history with localStorage tracking (per-browser)
- Frontend: fix Nginx proxy rewrite (/api -> /) and add 500MB upload limit
- Backend: add MinIO storage support (STORAGE_BACKEND=minio) alongside local mode
- Backend: add GET /queues/stats API for queue monitoring
- Backend: fix download handler for MinIO (buffer mode for Node 18 compat)
- Workers: add S3/MinIO download/upload in consumer.py with isolated temp dirs (flow sketched below)
- Workers: add s3_storage.py helper with lifecycle rule (7-day TTL)
- Docker: add docker-compose.yml with all services (web, scheduler, redis, workers)
- Docker: ports mapped to 9500 (web) and 9501 (scheduler)
- Config: add .env to .gitignore to protect secrets

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
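For context, a minimal sketch of the worker flow the Workers bullets describe. The names handle_job and process, and the jobs/<id>/input|output key layout, are illustrative assumptions; the real wiring lives in consumer.py, which is not shown here:

    import os
    import tempfile

    from s3_storage import MinIOStorage


    def process(in_dir: str, out_dir: str) -> None:
        ...  # conversion stages would run here


    def handle_job(job_id: str) -> None:
        storage = MinIOStorage()  # reads MINIO_* settings from the environment
        with tempfile.TemporaryDirectory() as work_dir:  # isolated per-job temp dir
            in_dir = os.path.join(work_dir, "in")
            out_dir = os.path.join(work_dir, "out")
            os.makedirs(out_dir)
            storage.download_prefix(f"jobs/{job_id}/input", in_dir)  # fetch inputs
            process(in_dir, out_dir)
            storage.upload_directory(out_dir, f"jobs/{job_id}/output")  # publish results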
146 lines · 5.6 KiB · Python
"""
|
|
MinIO storage helper for workers.
|
|
|
|
Provides upload/download functionality to replace Docker Shared Volume.
|
|
Workers download inputs from MinIO to local temp dir, process, then upload results.
|
|
|
|
Uses boto3 (S3-compatible API) to communicate with MinIO.
|
|
"""
import logging
import os
from typing import Optional

import boto3
from botocore.config import Config as BotoConfig

logger = logging.getLogger(__name__)
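
# Backend selection happens in the caller, not here. A sketch of the wiring the
# commit describes for consumer.py (STORAGE_BACKEND=minio vs. local mode; the
# exact code is not part of this file):
#
#     if os.environ.get("STORAGE_BACKEND") == "minio":
#         storage = MinIOStorage()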


class MinIOStorage:
    """MinIO storage client for job file exchange."""

    def __init__(
        self,
        endpoint_url: Optional[str] = None,
        bucket: Optional[str] = None,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        region: Optional[str] = None,
        lifecycle_days: Optional[int] = None,
    ):
        self.endpoint_url = endpoint_url or os.environ.get("MINIO_ENDPOINT_URL", "http://192.168.0.130:9000")
        self.bucket = bucket or os.environ.get("MINIO_BUCKET", "convertet-working-space")
        self.access_key = access_key or os.environ.get("MINIO_ACCESS_KEY", "convuser")
        self.secret_key = secret_key or os.environ.get("MINIO_SECRET_KEY", "")
        self.region = region or os.environ.get("MINIO_REGION", "us-east-1")
        self.lifecycle_days = (
            lifecycle_days
            if lifecycle_days is not None
            else int(os.environ.get("MINIO_LIFECYCLE_DAYS", "7"))
        )

        self.client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region,
            # MinIO expects AWS Signature Version 4.
            config=BotoConfig(signature_version="s3v4"),
        )
        self._ensure_bucket()
        if self.lifecycle_days > 0:
            self._ensure_lifecycle_rule()

    def _ensure_bucket(self):
        """Verify that the bucket exists (no auto-create; the bucket is managed externally)."""
        try:
            self.client.head_bucket(Bucket=self.bucket)
            logger.info(f"MinIO bucket verified: {self.bucket}")
        except Exception as e:
            logger.error(f"MinIO bucket '{self.bucket}' is not accessible: {e}")
            raise

    def _ensure_lifecycle_rule(self):
        """Set a lifecycle rule to auto-expire objects under the jobs/ prefix."""
        rule_id = "auto-cleanup-jobs"
        try:
            self.client.put_bucket_lifecycle_configuration(
                Bucket=self.bucket,
                LifecycleConfiguration={
                    "Rules": [
                        {
                            "ID": rule_id,
                            "Status": "Enabled",
                            "Filter": {"Prefix": "jobs/"},
                            "Expiration": {"Days": self.lifecycle_days},
                        }
                    ]
                },
            )
            logger.info(f"MinIO lifecycle rule set: jobs/* expire after {self.lifecycle_days} days")
        except Exception as e:
            logger.warning(f"Could not set lifecycle rule: {e}")

    def upload_file(self, local_path: str, key: str):
        """Upload a local file to MinIO."""
        self.client.upload_file(local_path, self.bucket, key)
        logger.debug(f"Uploaded {local_path} -> minio://{self.bucket}/{key}")

    def upload_data(self, data: bytes, key: str):
        """Upload raw bytes to MinIO."""
        self.client.put_object(Bucket=self.bucket, Key=key, Body=data)
        logger.debug(f"Uploaded {len(data)} bytes -> minio://{self.bucket}/{key}")

    def download_file(self, key: str, local_path: str):
        """Download a file from MinIO to a local path."""
        parent = os.path.dirname(local_path)
        if parent:  # guard: makedirs("") raises when the path has no directory part
            os.makedirs(parent, exist_ok=True)
        self.client.download_file(self.bucket, key, local_path)
        logger.debug(f"Downloaded minio://{self.bucket}/{key} -> {local_path}")

    def download_to_stream(self, key: str):
        """Get a streaming body for a MinIO object; the caller must read and close it."""
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return response["Body"]

    def list_keys(self, prefix: str) -> list:
        """List all keys under a prefix."""
        keys = []
        paginator = self.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                keys.append(obj["Key"])
        return keys

    def upload_directory(self, local_dir: str, prefix: str):
        """Upload all files in a local directory to MinIO under a prefix."""
        for root, _, files in os.walk(local_dir):
            for fname in files:
                local_path = os.path.join(root, fname)
                rel_path = os.path.relpath(local_path, local_dir)
                key = f"{prefix}/{rel_path}"
                self.upload_file(local_path, key)

    def download_prefix(self, prefix: str, local_dir: str):
        """Download all files under a MinIO prefix to a local directory."""
        keys = self.list_keys(prefix)
        for key in keys:
            rel_path = key[len(prefix):].lstrip("/")
            if not rel_path:
                continue
            local_path = os.path.join(local_dir, rel_path)
            self.download_file(key, local_path)
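
    # Round-trip sketch (the "jobs/abc/..." key layout is illustrative):
    #
    #     storage.upload_directory("/tmp/job-abc/out", "jobs/abc/output")
    #     storage.download_prefix("jobs/abc/output", "/tmp/fetched")
    #
    # download_prefix recreates the relative layout that upload_directory stored.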

    def exists(self, key: str) -> bool:
        """Check whether a MinIO key exists."""
        try:
            self.client.head_object(Bucket=self.bucket, Key=key)
            return True
        except Exception:
            return False

    def get_size(self, key: str) -> Optional[int]:
        """Get the size of a MinIO object in bytes, or None if it is missing."""
        try:
            response = self.client.head_object(Bucket=self.bucket, Key=key)
            return response["ContentLength"]
        except Exception:
            return None
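

if __name__ == "__main__":
    # Smoke-test sketch, not part of the worker path: round-trips one small
    # object through the bucket. Assumes the MINIO_* env vars point at a live
    # MinIO instance; the "jobs/_smoke/" key is illustrative and is cleaned up
    # by the lifecycle rule above.
    import tempfile

    logging.basicConfig(level=logging.INFO)
    storage = MinIOStorage()
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "hello.txt")
        with open(src, "w") as f:
            f.write("hello from s3_storage\n")
        storage.upload_file(src, "jobs/_smoke/hello.txt")
        assert storage.exists("jobs/_smoke/hello.txt")
        print("object size:", storage.get_size("jobs/_smoke/hello.txt"))
        storage.download_file("jobs/_smoke/hello.txt", os.path.join(tmp, "copy.txt"))
        print("keys under jobs/_smoke/:", storage.list_keys("jobs/_smoke/"))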