File size: 4,869 Bytes
0e61117 80ac589 0e61117 80ac589 0e61117 4ac1f80 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 4ac1f80 0e61117 198d594 0e61117 198d594 0e61117 198d594 0e61117 198d594 4ac1f80 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
"""
Background task processing (no Celery, no AWS).
"""
import json
import os
import threading
import time
from datetime import datetime, timezone
from .inference import run_inference
from .config import OUTPUTS_DIR
# In-memory runs store and lock for thread-safe updates
runs: dict[str, dict] = {}
runs_lock = threading.Lock()
# Optional dev flags for testing
FORCE_ERROR = os.getenv("FORCE_ERROR") == "1"
SLEEP_SECS = int(os.getenv("SLEEP_SECS", "0"))
# Get the base directory for file storage (project root)
# BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
# OUTPUTS_DIR = os.path.join(BASE_DIR, "data", "outputs")
# OUTPUTS_DIR is now imported from config
def run_task(
run_id: str,
image_path: str,
topics: list = None,
creators: list = None,
model: str = "paintingclip",
) -> None:
"""
Process a single run: load image from disk, run ML inference, save output, update status.
"""
print(f"π Starting task for run {run_id}")
print(f"π Image path: {image_path}")
print(f"π Topics: {topics}, Creators: {creators}, Model: {model}")
# Enhanced logging: Check environment and paths
print(f"π Environment check:")
print(f" STUB_MODE: {os.getenv('STUB_MODE', 'not set')}")
print(f" Current working directory: {os.getcwd()}")
print(f" Image file exists: {os.path.exists(image_path)}")
if os.path.exists(image_path):
print(f" Image file size: {os.path.getsize(image_path)} bytes")
# Clear any cached images from patch inference
try:
from .patch_inference import _prepare_image
_prepare_image.cache_clear()
print(f"β
Cleared patch inference cache")
except ImportError as e:
print(f"β οΈ patch_inference import failed: {e}")
# Mark as processing
with runs_lock:
if run_id not in runs:
print(f"β Run {run_id} not found in runs store")
return
runs[run_id]["status"] = "processing"
runs[run_id]["startedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
runs[run_id]["updatedAt"] = runs[run_id]["startedAt"]
print(f"β
Run {run_id} marked as processing")
try:
# 1. Check if the image file exists
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
if SLEEP_SECS:
time.sleep(SLEEP_SECS)
print(f"π About to call run_inference...")
# 2. Run the ML inference with filtering
labels = run_inference(
image_path, filter_topics=topics, filter_creators=creators, model_type=model
)
print(f"β
run_inference completed successfully")
print(f"β
Labels type: {type(labels)}")
print(f"β
Labels length: {len(labels) if isinstance(labels, list) else 'not a list'}")
# If FORCE_ERROR is enabled (for testing), raise an error to simulate a failure
if FORCE_ERROR:
raise RuntimeError("Forced error for testing")
# 3. Save the labels to a JSON file in the outputs folder
print(f"π Saving results to outputs directory...")
os.makedirs(OUTPUTS_DIR, exist_ok=True)
output_filename = f"{run_id}.json"
output_path = os.path.join(OUTPUTS_DIR, output_filename)
output_key = f"outputs/{output_filename}"
with open(output_path, "w") as f:
json.dump(labels, f)
# Verify the file was created
if not os.path.exists(output_path):
raise RuntimeError(f"Failed to create output file: {output_path}")
# 4. Mark the run as done and store the output path
with runs_lock:
runs[run_id]["status"] = "done"
runs[run_id]["outputKey"] = output_key
runs[run_id]["finishedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
runs[run_id]["updatedAt"] = runs[run_id]["finishedAt"]
runs[run_id].pop("errorMessage", None)
print(f"β
Task completed successfully for run {run_id}")
print(f"β
Output saved to: {output_path}")
print(f"β
Output key: {output_key}")
except Exception as exc:
# Enhanced error logging
print(f"β Error in run {run_id}: {exc}")
print(f"β Error type: {type(exc).__name__}")
import traceback
print(f"β Full traceback:")
traceback.print_exc()
with runs_lock:
if run_id in runs:
runs[run_id]["status"] = "error"
runs[run_id]["errorMessage"] = str(exc)[:500]
runs[run_id]["updatedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
print(f"β Run {run_id} marked as error: {runs[run_id]['errorMessage']}")
|