from pathlib import Path import random import shutil import sys ROOT = Path(__file__).resolve().parents[2] sys.path.insert(0, str(ROOT)) from services.workers.onnx.core import process_onnx_core from services.workers.bie.core import process_bie_core from services.workers.nef.core import process_nef_core def test_worker_flow_e2e_uses_single_workdir(): base_outputs = ROOT / "tests" / "fixtures" / "outputs" base_outputs.mkdir(parents=True, exist_ok=True) task_id = None work_dir = None for _ in range(50): candidate = random.randint(100, 999) candidate_dir = base_outputs / str(candidate) if not candidate_dir.exists(): task_id = candidate work_dir = candidate_dir break assert task_id is not None, "Unable to allocate a unique task id" work_dir.mkdir(parents=True, exist_ok=False) src_onnx_dir = ROOT / "tests" / "fixtures" / "onnx" src_images_dir = ROOT / "tests" / "fixtures" / "bie_images" ref_images_dir = work_dir / "ref_images" onnx_files = [p for p in src_onnx_dir.iterdir() if p.is_file() and p.suffix == ".onnx"] assert len(onnx_files) == 1, "Expected a single ONNX fixture file" input_file = work_dir / onnx_files[0].name shutil.copy2(onnx_files[0], input_file) shutil.copytree(src_images_dir, ref_images_dir) work_inputs = [p for p in work_dir.iterdir() if p.is_file()] assert len(work_inputs) == 1, "Working directory must contain a single input file" work_input_file = work_inputs[0] onnx_output = work_dir / "out.onnx" onnx_params = {"model_id": 10, "version": "e2e", "platform": "520", "work_dir": str(work_dir)} onnx_result = process_onnx_core( {"file_path": str(work_input_file)}, str(onnx_output), onnx_params, ) assert onnx_output.exists() assert onnx_result["file_path"] == str(onnx_output) assert onnx_result["file_size"] > 0 bie_output = work_dir / "out.bie" bie_params = {"model_id": 11, "version": "e2e", "platform": "530", "work_dir": str(work_dir)} bie_result = process_bie_core( {"onnx_file_path": str(onnx_output), "data_dir": str(ref_images_dir)}, str(bie_output), bie_params, ) assert bie_output.exists() assert bie_result["file_path"] == str(bie_output) assert bie_result["file_size"] > 0 nef_output = work_dir / "out.nef" nef_params = {"model_id": 12, "version": "e2e", "platform": "730", "work_dir": str(work_dir)} nef_result = process_nef_core( {"bie_file_path": str(bie_output)}, str(nef_output), nef_params, ) assert nef_output.exists() assert nef_result["file_path"] == str(nef_output) assert nef_result["file_size"] > 0