File size: 4,869 Bytes
0e61117
 
 
 
 
 
 
 
 
 
 
80ac589
0e61117
 
 
 
 
 
 
 
 
 
80ac589
 
 
 
0e61117
 
 
 
 
 
 
 
 
 
 
 
4ac1f80
 
 
198d594
 
 
 
 
 
 
 
 
0e61117
 
 
 
198d594
 
 
0e61117
198d594
0e61117
 
198d594
0e61117
 
198d594
0e61117
198d594
0e61117
 
 
 
 
 
 
198d594
0e61117
198d594
 
0e61117
 
 
 
198d594
 
 
 
0e61117
 
 
 
 
 
198d594
0e61117
 
 
198d594
0e61117
 
 
 
 
 
 
 
 
 
 
198d594
 
0e61117
 
4ac1f80
 
 
0e61117
 
198d594
 
 
0e61117
198d594
 
0e61117
 
198d594
0e61117
198d594
 
4ac1f80
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Background task processing (no Celery, no AWS).
"""

import json
import os
import threading
import time
from datetime import datetime, timezone

from .inference import run_inference
from .config import OUTPUTS_DIR

# In-memory runs store and lock for thread-safe updates
runs: dict[str, dict] = {}
runs_lock = threading.Lock()

# Optional dev flags for testing
FORCE_ERROR = os.getenv("FORCE_ERROR") == "1"
SLEEP_SECS = int(os.getenv("SLEEP_SECS", "0"))

# Get the base directory for file storage (project root)
# BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
# OUTPUTS_DIR = os.path.join(BASE_DIR, "data", "outputs")

# OUTPUTS_DIR is now imported from config


def run_task(
    run_id: str,
    image_path: str,
    topics: list = None,
    creators: list = None,
    model: str = "paintingclip",
) -> None:
    """
    Process a single run: load image from disk, run ML inference, save output, update status.
    """
    print(f"πŸš€ Starting task for run {run_id}")
    print(f"πŸš€ Image path: {image_path}")
    print(f"πŸš€ Topics: {topics}, Creators: {creators}, Model: {model}")
    
    # Enhanced logging: Check environment and paths
    print(f"πŸ” Environment check:")
    print(f"   STUB_MODE: {os.getenv('STUB_MODE', 'not set')}")
    print(f"   Current working directory: {os.getcwd()}")
    print(f"   Image file exists: {os.path.exists(image_path)}")
    if os.path.exists(image_path):
        print(f"   Image file size: {os.path.getsize(image_path)} bytes")
    
    # Clear any cached images from patch inference
    try:
        from .patch_inference import _prepare_image
        _prepare_image.cache_clear()
        print(f"βœ… Cleared patch inference cache")
    except ImportError as e:
        print(f"⚠️  patch_inference import failed: {e}")

    # Mark as processing
    with runs_lock:
        if run_id not in runs:
            print(f"❌ Run {run_id} not found in runs store")
            return
        runs[run_id]["status"] = "processing"
        runs[run_id]["startedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
        runs[run_id]["updatedAt"] = runs[run_id]["startedAt"]
        print(f"βœ… Run {run_id} marked as processing")

    try:
        # 1. Check if the image file exists
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        if SLEEP_SECS:
            time.sleep(SLEEP_SECS)

        print(f"πŸ” About to call run_inference...")
        
        # 2. Run the ML inference with filtering
        labels = run_inference(
            image_path, filter_topics=topics, filter_creators=creators, model_type=model
        )
        
        print(f"βœ… run_inference completed successfully")
        print(f"βœ… Labels type: {type(labels)}")
        print(f"βœ… Labels length: {len(labels) if isinstance(labels, list) else 'not a list'}")

        # If FORCE_ERROR is enabled (for testing), raise an error to simulate a failure
        if FORCE_ERROR:
            raise RuntimeError("Forced error for testing")

        # 3. Save the labels to a JSON file in the outputs folder
        print(f"πŸ” Saving results to outputs directory...")
        os.makedirs(OUTPUTS_DIR, exist_ok=True)
        output_filename = f"{run_id}.json"
        output_path = os.path.join(OUTPUTS_DIR, output_filename)
        output_key = f"outputs/{output_filename}"

        with open(output_path, "w") as f:
            json.dump(labels, f)

        # Verify the file was created
        if not os.path.exists(output_path):
            raise RuntimeError(f"Failed to create output file: {output_path}")

        # 4. Mark the run as done and store the output path
        with runs_lock:
            runs[run_id]["status"] = "done"
            runs[run_id]["outputKey"] = output_key
            runs[run_id]["finishedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
            runs[run_id]["updatedAt"] = runs[run_id]["finishedAt"]
            runs[run_id].pop("errorMessage", None)
            print(f"βœ… Task completed successfully for run {run_id}")
            print(f"βœ… Output saved to: {output_path}")
            print(f"βœ… Output key: {output_key}")

    except Exception as exc:
        # Enhanced error logging
        print(f"❌ Error in run {run_id}: {exc}")
        print(f"❌ Error type: {type(exc).__name__}")
        import traceback
        print(f"❌ Full traceback:")
        traceback.print_exc()

        with runs_lock:
            if run_id in runs:
                runs[run_id]["status"] = "error"
                runs[run_id]["errorMessage"] = str(exc)[:500]
                runs[run_id]["updatedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
                print(f"❌ Run {run_id} marked as error: {runs[run_id]['errorMessage']}")