File size: 4,765 Bytes
4c10904
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfb6e4
4c10904
240e068
4c10904
 
 
240e068
4c10904
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240e068
4c10904
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ecfb6e4
4c10904
 
 
 
 
 
 
 
 
 
 
 
 
240e068
4c10904
 
ecfb6e4
4c10904
 
 
 
 
 
 
ecfb6e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
import copy
import logging
import os
import time

from jobs.storage import get_job_storage
from models.isr.assessor import ISRAssessor

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


async def run_isr_assessor_loop(
    job_id: str,
    mission: str,
    interval_sec: float = 5.0,
) -> None:
    """
    Run ISR assessment loop alongside inference pipeline.

    Returns immediately (after logging a warning) when the OPENAI_API_KEY
    environment variable is not set. Otherwise, every interval_sec seconds:

    1. Stop if the job is no longer present in JobStorage
    2. Read the latest frame and the tracks stored for the highest frame index
    3. Call ISRAssessor.assess_batch_sync on a deep copy of those tracks
       (in a worker thread, so the event loop is not blocked)
    4. Merge the returned verdicts into a cache keyed by track_id and store
       the whole cache through _store_verdicts

    On cancellation one final assessment is attempted (also in a worker
    thread); the CancelledError is then swallowed and the coroutine returns
    normally. Any other unexpected error is logged and ends the loop.

    Args:
        job_id: Identifier of the job whose tracks are assessed.
        mission: Mission text handed to ISRAssessor.
        interval_sec: Seconds to sleep before each assessment round.
    """
    if not os.environ.get("OPENAI_API_KEY"):
        logger.warning("OPENAI_API_KEY not set, ISR assessment disabled for job %s", job_id)
        return

    assessor = ISRAssessor(mission=mission)
    storage = get_job_storage()
    assessed_track_ids: dict[str, dict] = {}  # cache of latest verdicts by track_id

    logger.info("ISR assessor started for job %s (interval=%.1fs)", job_id, interval_sec)

    try:
        while True:
            await asyncio.sleep(interval_sec)

            # Stop once the job has been removed from storage
            if not storage.get(job_id):
                logger.info("ISR assessor: job %s gone, stopping", job_id)
                break

            snapshot = _snapshot_latest_tracks(storage, job_id)
            if snapshot is None:
                # No frame or no track data yet; try again next round
                continue
            frame, latest_idx, tracks_copy = snapshot

            # Run GPT assessment in thread pool (blocking IO)
            t0 = time.perf_counter()
            try:
                verdicts = await asyncio.to_thread(
                    assessor.assess_batch_sync, tracks_copy, frame
                )
            except Exception:
                logger.exception("ISR assessment call failed for job %s", job_id)
                continue

            elapsed = time.perf_counter() - t0
            # Guard the count: a None result must not raise here, otherwise the
            # outer handler would treat it as a crash and end the loop for good.
            logger.info(
                "ISR assessment for job %s: %d tracks assessed in %.1fs",
                job_id, len(verdicts) if verdicts else 0, elapsed
            )

            if not verdicts:
                continue

            # Update cache, then store the accumulated verdicts
            assessed_track_ids.update(verdicts)
            _store_verdicts(storage, job_id, assessed_track_ids, latest_idx)

    except asyncio.CancelledError:
        logger.info("ISR assessor cancelled for job %s, running final assessment", job_id)
        # Run one final assessment on cancellation. Storage reads are inside
        # the try as well, so a storage error is logged instead of escaping.
        try:
            snapshot = _snapshot_latest_tracks(storage, job_id)
            if snapshot is not None:
                frame, latest_idx, tracks_copy = snapshot
                # Same worker-thread call as the main loop: the blocking
                # request must not stall the event loop during shutdown.
                verdicts = await asyncio.to_thread(
                    assessor.assess_batch_sync, tracks_copy, frame
                )
                if verdicts:
                    assessed_track_ids.update(verdicts)
                    _store_verdicts(storage, job_id, assessed_track_ids, latest_idx)
                    logger.info("ISR final assessment: %d tracks", len(verdicts))
        except Exception:
            logger.exception("ISR final assessment failed for job %s", job_id)
    except Exception:
        logger.exception("ISR assessor loop crashed for job %s", job_id)


def _snapshot_latest_tracks(storage, job_id: str):
    """Return (frame, latest_idx, tracks_copy) for a job, or None if nothing to assess.

    None is returned when the job has no latest frame, no stored frame
    indices, or no track data at the highest stored frame index.
    """
    frame = storage.get_latest_frame(job_id)
    if frame is None:
        return None

    # Copy the frame indices while holding the storage lock
    with storage._lock:
        frame_indices = list(storage._tracks.get(job_id, {}).keys())
    if not frame_indices:
        return None

    latest_idx = max(frame_indices)
    tracks = storage.get_track_data(job_id, latest_idx)
    if not tracks:
        return None

    # Deep copy tracks to avoid mutation during assessment
    return frame, latest_idx, copy.deepcopy(tracks)


def _store_verdicts(storage, job_id: str, verdicts: dict, assessment_frame_idx: int) -> None:
    """Store verdict data in the verdict store (no mutation of track dicts).

    Builds one record per entry of ``verdicts`` and hands the whole batch to
    ``storage.set_verdicts_batch`` in a single call.

    Args:
        storage: Object providing ``set_verdicts_batch(job_id, batch)``.
        job_id: Identifier of the job the verdicts belong to.
        verdicts: Mapping of track id to a verdict dict; the keys read are
            "mission_relevant", "satisfies", "reason" and "features".
        assessment_frame_idx: Frame index recorded on every stored record.
    """
    batch = {}
    for tid, v in verdicts.items():
        satisfies = v.get("satisfies")
        reason = v.get("reason", "")
        # "features" may be present but null; fall back to an empty dict so
        # the ** expansion below cannot raise TypeError.
        features = v.get("features") or {}
        batch[tid] = {
            "mission_relevant": v.get("mission_relevant", True),
            "satisfies": satisfies,
            "reason": reason,
            "features": features,
            "assessment_status": "ASSESSED",
            "assessment_frame_index": assessment_frame_idx,
            # Flattened copy: feature keys sit next to satisfies/reason
            "gpt_raw": {
                "satisfies": satisfies,
                "reason": reason,
                **features,
            },
        }
    storage.set_verdicts_batch(job_id, batch)