jim800121chen efa67d59a4 Add web frontend, MinIO storage, monitoring, and docker-compose deployment
- Frontend: rewrite Home.vue to match backend POST /jobs API (remove single-stage options)
- Frontend: add Monitor page (/monitor) for queue and job monitoring
- Frontend: add job history with localStorage tracking (per-browser)
- Frontend: fix Nginx proxy rewrite (/api -> /) and add 500MB upload limit
- Backend: add MinIO storage support (STORAGE_BACKEND=minio) alongside local mode
- Backend: add GET /queues/stats API for queue monitoring
- Backend: fix download handler for MinIO (buffer mode for Node 18 compat)
- Workers: add S3/MinIO download/upload in consumer.py with isolated temp dirs
- Workers: add s3_storage.py helper with lifecycle rule (7-day TTL)
- Docker: add docker-compose.yml with all services (web, scheduler, redis, workers)
- Docker: ports mapped to 9500 (web) and 9501 (scheduler)
- Config: add .env to .gitignore to protect secrets

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 15:04:09 +08:00

146 lines
5.6 KiB
Python

"""
MinIO storage helper for workers.
Provides upload/download functionality to replace Docker Shared Volume.
Workers download inputs from MinIO to local temp dir, process, then upload results.
Uses boto3 (S3-compatible API) to communicate with MinIO.
"""
import logging
import os
import shutil
from typing import Optional
import boto3
from botocore.config import Config as BotoConfig
# Module-scoped logger, named after this module via __name__; used by MinIOStorage below.
logger = logging.getLogger(__name__)
class MinIOStorage:
    """MinIO storage client for job file exchange.

    Thin wrapper around a boto3 S3 client pointed at a MinIO endpoint.
    Every constructor argument falls back to a ``MINIO_*`` environment
    variable and then to a built-in default.

    Constructing an instance performs network calls: the bucket is verified
    with ``head_bucket`` and, when ``lifecycle_days > 0``, an expiration rule
    for the ``jobs/`` prefix is installed.
    """

    # ID of the lifecycle rule owned by this helper.
    _LIFECYCLE_RULE_ID = "auto-cleanup-jobs"
    # Error codes that mean "object does not exist". HEAD responses carry no
    # body, so a missing key is normally reported with the bare code "404".
    _NOT_FOUND_CODES = frozenset({"404", "NoSuchKey", "NotFound"})

    def __init__(
        self,
        endpoint_url: Optional[str] = None,
        bucket: Optional[str] = None,
        access_key: Optional[str] = None,
        secret_key: Optional[str] = None,
        region: Optional[str] = None,
        lifecycle_days: Optional[int] = None,
    ):
        """Create the S3 client, verify the bucket and install the TTL rule.

        Args:
            endpoint_url: MinIO endpoint; falls back to ``MINIO_ENDPOINT_URL``.
            bucket: Bucket name; falls back to ``MINIO_BUCKET``.
            access_key: Access key; falls back to ``MINIO_ACCESS_KEY``.
            secret_key: Secret key; falls back to ``MINIO_SECRET_KEY``.
            region: Region name; falls back to ``MINIO_REGION``.
            lifecycle_days: Days before objects under ``jobs/`` expire; falls
                back to ``MINIO_LIFECYCLE_DAYS`` (default 7). A value <= 0
                disables lifecycle management.

        Raises:
            Exception: whatever ``head_bucket`` raises when the bucket is not
                accessible (logged, then re-raised).
            ValueError: if ``MINIO_LIFECYCLE_DAYS`` is not an integer.
        """
        self.endpoint_url = endpoint_url or os.environ.get("MINIO_ENDPOINT_URL", "http://192.168.0.130:9000")
        self.bucket = bucket or os.environ.get("MINIO_BUCKET", "convertet-working-space")
        self.access_key = access_key or os.environ.get("MINIO_ACCESS_KEY", "convuser")
        self.secret_key = secret_key or os.environ.get("MINIO_SECRET_KEY", "")
        self.region = region or os.environ.get("MINIO_REGION", "us-east-1")
        # An explicit 0 must be honoured (it disables the rule), hence the
        # "is not None" test instead of a plain truthiness fallback.
        self.lifecycle_days = (
            lifecycle_days
            if lifecycle_days is not None
            else int(os.environ.get("MINIO_LIFECYCLE_DAYS", "7"))
        )
        self.client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region,
            config=BotoConfig(signature_version="s3v4"),
        )
        self._ensure_bucket()
        if self.lifecycle_days > 0:
            self._ensure_lifecycle_rule()

    @staticmethod
    def _error_code(exc: BaseException) -> str:
        """Return the S3 error code carried by ``exc`` or ``""`` if it has none.

        Reads the ``response["Error"]["Code"]`` structure that botocore client
        errors expose, without importing botocore's exception classes.
        """
        response = getattr(exc, "response", None)
        if not isinstance(response, dict):
            return ""
        error = response.get("Error")
        if not isinstance(error, dict):
            return ""
        return str(error.get("Code", ""))

    def _ensure_bucket(self):
        """Verify bucket exists (do not auto-create — bucket is managed externally)."""
        try:
            self.client.head_bucket(Bucket=self.bucket)
            logger.info("MinIO bucket verified: %s", self.bucket)
        except Exception as e:
            logger.error("MinIO bucket '%s' is not accessible: %s", self.bucket, e)
            raise

    def _ensure_lifecycle_rule(self):
        """Ensure objects under the ``jobs/`` prefix auto-expire.

        ``put_bucket_lifecycle_configuration`` replaces the bucket's whole
        lifecycle configuration, so the existing rules are read first and our
        rule is merged in by ID; rules managed by someone else are preserved.
        The write is skipped when an identical rule is already installed.

        Best-effort: any failure is logged as a warning and never raised.
        """
        desired = {
            "ID": self._LIFECYCLE_RULE_ID,
            "Status": "Enabled",
            "Filter": {"Prefix": "jobs/"},
            "Expiration": {"Days": self.lifecycle_days},
        }
        try:
            try:
                current = self.client.get_bucket_lifecycle_configuration(Bucket=self.bucket)
                existing_rules = list(current.get("Rules", []))
            except Exception as e:
                # "No configuration yet" is the normal first-run state; any
                # other failure means we cannot merge safely, so give up
                # rather than risk clobbering rules we could not read.
                if self._error_code(e) != "NoSuchLifecycleConfiguration":
                    raise
                existing_rules = []

            if desired in existing_rules:
                logger.debug("MinIO lifecycle rule already present: %s", self._LIFECYCLE_RULE_ID)
                return

            merged = [rule for rule in existing_rules if rule.get("ID") != self._LIFECYCLE_RULE_ID]
            merged.append(desired)
            self.client.put_bucket_lifecycle_configuration(
                Bucket=self.bucket,
                LifecycleConfiguration={"Rules": merged},
            )
            logger.info("MinIO lifecycle rule set: jobs/* expire after %s days", self.lifecycle_days)
        except Exception as e:
            logger.warning("Could not set lifecycle rule: %s", e)

    def upload_file(self, local_path: str, key: str):
        """Upload a local file to MinIO under ``key``."""
        self.client.upload_file(local_path, self.bucket, key)
        logger.debug("Uploaded %s -> minio://%s/%s", local_path, self.bucket, key)

    def upload_data(self, data: bytes, key: str):
        """Upload raw bytes to MinIO under ``key``."""
        self.client.put_object(Bucket=self.bucket, Key=key, Body=data)
        logger.debug("Uploaded %s bytes -> minio://%s/%s", len(data), self.bucket, key)

    def download_file(self, key: str, local_path: str):
        """Download the object ``key`` to ``local_path``, creating parent dirs."""
        parent = os.path.dirname(local_path)
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)
        self.client.download_file(self.bucket, key, local_path)
        logger.debug("Downloaded minio://%s/%s -> %s", self.bucket, key, local_path)

    def download_to_stream(self, key: str):
        """Return the ``Body`` of the ``get_object`` response for ``key``.

        The caller is responsible for reading and closing the returned body.
        """
        response = self.client.get_object(Bucket=self.bucket, Key=key)
        return response["Body"]

    def list_keys(self, prefix: str) -> list:
        """List all keys under a prefix, following pagination."""
        paginator = self.client.get_paginator("list_objects_v2")
        return [
            obj["Key"]
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix)
            for obj in page.get("Contents", [])
        ]

    def upload_directory(self, local_dir: str, prefix: str):
        """Upload every file under ``local_dir`` to MinIO below ``prefix``.

        Keys are ``<prefix>/<path relative to local_dir>`` and always use
        ``/`` as separator. A trailing ``/`` on ``prefix`` is ignored and an
        empty prefix yields keys without a leading ``/``.
        """
        base = prefix.rstrip("/")
        for root, _, files in os.walk(local_dir):
            for fname in files:
                local_path = os.path.join(root, fname)
                # relpath uses the OS separator; object keys must use "/".
                rel_path = os.path.relpath(local_path, local_dir).replace(os.sep, "/")
                key = f"{base}/{rel_path}" if base else rel_path
                self.upload_file(local_path, key)

    def download_prefix(self, prefix: str, local_dir: str):
        """Download all objects below ``prefix`` into ``local_dir``.

        ``prefix`` is treated as a directory: ``"jobs/1"`` matches
        ``"jobs/1/..."`` but not ``"jobs/10/..."``. Keys whose relative path
        contains a ``..`` segment are skipped with a warning so a crafted key
        cannot write outside ``local_dir``.
        """
        # List on a path boundary so sibling prefixes are never returned.
        listing_prefix = prefix if not prefix or prefix.endswith("/") else prefix + "/"
        for key in self.list_keys(listing_prefix):
            rel_path = key[len(listing_prefix):].lstrip("/")
            if not rel_path:
                # Directory placeholder object (key equals the prefix itself).
                continue
            if ".." in rel_path.split("/"):
                logger.warning("Skipping MinIO key with unsafe path: %s", key)
                continue
            self.download_file(key, os.path.join(local_dir, rel_path))

    def _head_object(self, key: str) -> Optional[dict]:
        """Return the ``head_object`` response for ``key`` or ``None`` on failure.

        A missing object is the expected negative case and stays silent; any
        other failure is logged so it is not mistaken for "object missing".
        """
        try:
            return self.client.head_object(Bucket=self.bucket, Key=key)
        except Exception as e:
            if self._error_code(e) not in self._NOT_FOUND_CODES:
                logger.warning(
                    "MinIO head_object failed for minio://%s/%s (treated as missing): %s",
                    self.bucket, key, e,
                )
            return None

    def exists(self, key: str) -> bool:
        """Check if a MinIO key exists. Never raises; failures yield ``False``."""
        return self._head_object(key) is not None

    def get_size(self, key: str) -> Optional[int]:
        """Get the size of a MinIO object in bytes, or ``None`` if unavailable."""
        meta = self._head_object(key)
        if meta is None:
            return None
        return meta.get("ContentLength")