|
|
""" |
|
|
Background task processing (no Celery, no AWS). |
|
|
""" |
|
|
|
|
|
import json |
|
|
import os |
|
|
import threading |
|
|
import time |
|
|
from datetime import datetime, timezone |
|
|
|
|
|
from .inference import run_inference |
|
|
from .config import OUTPUTS_DIR |
|
|
|
|
|
|
|
|
# In-memory run store: run_id -> mutable status record (keys observed in this
# module: "status", "startedAt", "finishedAt", "updatedAt", "outputKey",
# "errorMessage"). Not persisted — contents are lost on process restart.
runs: dict[str, dict] = {}

# Guards every read/modify of `runs`; worker threads mutate entries concurrently.
runs_lock = threading.Lock()
|
|
|
|
|
|
|
|
# Test/debug switches, read once at import time from the environment.
# FORCE_ERROR=1 makes every task raise after inference (exercises the error path);
# SLEEP_SECS=<int> inserts an artificial delay before inference.
FORCE_ERROR = os.environ.get("FORCE_ERROR") == "1"
SLEEP_SECS = int(os.environ.get("SLEEP_SECS", "0"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_task(
    run_id: str,
    image_path: str,
    topics: list | None = None,
    creators: list | None = None,
    model: str = "paintingclip",
) -> None:
    """
    Process a single run: load image from disk, run ML inference, save output,
    update the run's status record.

    Args:
        run_id: Key into the module-level ``runs`` store; if absent, the task
            logs and returns without doing anything.
        topics: Optional topic filter forwarded to ``run_inference``.
        creators: Optional creator filter forwarded to ``run_inference``.
        model: Model identifier forwarded to ``run_inference``.

    Never raises: every failure is caught, logged, and recorded on the run
    entry as ``status="error"`` with a truncated ``errorMessage``. All status
    transitions happen under ``runs_lock``.
    """
    print(f"Starting task for run {run_id}")
    print(f"Image path: {image_path}")
    print(f"Topics: {topics}, Creators: {creators}, Model: {model}")

    # Diagnostic dump of the environment before doing any real work.
    print("Environment check:")
    print(f"  STUB_MODE: {os.getenv('STUB_MODE', 'not set')}")
    print(f"  Current working directory: {os.getcwd()}")
    print(f"  Image file exists: {os.path.exists(image_path)}")
    if os.path.exists(image_path):
        print(f"  Image file size: {os.path.getsize(image_path)} bytes")

    # Best-effort: clear the cached prepared image so this run sees fresh
    # data. patch_inference is optional, so an ImportError is tolerated.
    try:
        from .patch_inference import _prepare_image
        _prepare_image.cache_clear()
        print("Cleared patch inference cache")
    except ImportError as e:
        print(f"patch_inference import failed: {e}")

    # Transition: queued -> processing. startedAt and updatedAt are the
    # same instant, so compute the timestamp once.
    started = datetime.now(timezone.utc).isoformat(timespec="seconds")
    with runs_lock:
        if run_id not in runs:
            print(f"Run {run_id} not found in runs store")
            return
        runs[run_id]["status"] = "processing"
        runs[run_id]["startedAt"] = started
        runs[run_id]["updatedAt"] = started
        print(f"Run {run_id} marked as processing")

    try:
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        # Optional artificial delay for testing slow-task behaviour.
        if SLEEP_SECS:
            time.sleep(SLEEP_SECS)

        print("About to call run_inference...")
        labels = run_inference(
            image_path, filter_topics=topics, filter_creators=creators, model_type=model
        )
        print("run_inference completed successfully")
        print(f"Labels type: {type(labels)}")
        print(f"Labels length: {len(labels) if isinstance(labels, list) else 'not a list'}")

        # Deliberate failure hook for exercising the error path in tests.
        if FORCE_ERROR:
            raise RuntimeError("Forced error for testing")

        print("Saving results to outputs directory...")
        os.makedirs(OUTPUTS_DIR, exist_ok=True)
        output_filename = f"{run_id}.json"
        output_path = os.path.join(OUTPUTS_DIR, output_filename)
        # Key under which clients can locate the output (relative, URL-style).
        output_key = f"outputs/{output_filename}"

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(labels, f)

        # Paranoia check: fail loudly if the write silently produced nothing.
        if not os.path.exists(output_path):
            raise RuntimeError(f"Failed to create output file: {output_path}")

        # Transition: processing -> done. Prints happen after the lock is
        # released so console I/O never holds up other threads.
        finished = datetime.now(timezone.utc).isoformat(timespec="seconds")
        with runs_lock:
            runs[run_id]["status"] = "done"
            runs[run_id]["outputKey"] = output_key
            runs[run_id]["finishedAt"] = finished
            runs[run_id]["updatedAt"] = finished
            # A retry that succeeds should not keep a stale error message.
            runs[run_id].pop("errorMessage", None)
        print(f"Task completed successfully for run {run_id}")
        print(f"Output saved to: {output_path}")
        print(f"Output key: {output_key}")

    except Exception as exc:
        print(f"Error in run {run_id}: {exc}")
        print(f"Error type: {type(exc).__name__}")
        import traceback
        print("Full traceback:")
        traceback.print_exc()

        # Transition: processing -> error; message truncated to keep the
        # record (and any UI showing it) bounded.
        with runs_lock:
            if run_id in runs:
                runs[run_id]["status"] = "error"
                runs[run_id]["errorMessage"] = str(exc)[:500]
                runs[run_id]["updatedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
                print(f"Run {run_id} marked as error: {runs[run_id]['errorMessage']}")
|
|