Zhen Ye Claude Opus 4.6 commited on
Commit
4c10904
·
1 Parent(s): 2327048

feat(isr): create async assessor loop with verdict merging

Browse files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (1) hide show
  1. models/isr/loop.py +137 -0
models/isr/loop.py ADDED
@@ -0,0 +1,137 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import copy
3
+ import logging
4
+ import os
5
+ import time
6
+
7
+ from jobs.storage import get_job_storage
8
+ from models.isr.assessor import ISRAssessor
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+
13
+ async def run_isr_assessor_loop(
14
+ job_id: str,
15
+ mission: str,
16
+ interval_sec: float = 5.0,
17
+ ) -> None:
18
+ """
19
+ Run ISR assessment loop alongside inference pipeline.
20
+
21
+ Every interval_sec seconds:
22
+ 1. Read latest tracks from JobStorage
23
+ 2. Get latest frame for cropping
24
+ 3. Call ISRAssessor.assess_batch_sync (in thread pool)
25
+ 4. Merge verdicts back into all stored frames by track_id
26
+ """
27
+ api_key = os.environ.get("OPENAI_API_KEY")
28
+ if not api_key:
29
+ logger.warning("OPENAI_API_KEY not set, ISR assessment disabled for job %s", job_id)
30
+ return
31
+
32
+ assessor = ISRAssessor(mission=mission, api_key=api_key)
33
+ storage = get_job_storage()
34
+ assessed_track_ids: dict[str, dict] = {} # cache of latest verdicts by track_id
35
+
36
+ logger.info("ISR assessor started for job %s (interval=%.1fs)", job_id, interval_sec)
37
+
38
+ try:
39
+ while True:
40
+ await asyncio.sleep(interval_sec)
41
+
42
+ # Check if job still exists and is processing
43
+ job = storage.get(job_id)
44
+ if not job:
45
+ logger.info("ISR assessor: job %s gone, stopping", job_id)
46
+ break
47
+
48
+ # Get latest frame
49
+ frame = storage.get_latest_frame(job_id)
50
+ if frame is None:
51
+ continue
52
+
53
+ # Find latest frame index with track data
54
+ with storage._lock:
55
+ frame_indices = list(storage._tracks.get(job_id, {}).keys())
56
+
57
+ if not frame_indices:
58
+ continue
59
+
60
+ latest_idx = max(frame_indices)
61
+ tracks = storage.get_track_data(job_id, latest_idx)
62
+
63
+ if not tracks:
64
+ continue
65
+
66
+ # Deep copy tracks to avoid mutation during assessment
67
+ tracks_copy = copy.deepcopy(tracks)
68
+
69
+ # Run GPT assessment in thread pool (blocking IO)
70
+ t0 = time.perf_counter()
71
+ try:
72
+ verdicts = await asyncio.to_thread(
73
+ assessor.assess_batch_sync, tracks_copy, frame.copy()
74
+ )
75
+ except Exception:
76
+ logger.exception("ISR assessment call failed for job %s", job_id)
77
+ continue
78
+
79
+ elapsed = time.perf_counter() - t0
80
+ logger.info(
81
+ "ISR assessment for job %s: %d tracks assessed in %.1fs",
82
+ job_id, len(verdicts), elapsed
83
+ )
84
+
85
+ if not verdicts:
86
+ continue
87
+
88
+ # Update cache
89
+ assessed_track_ids.update(verdicts)
90
+
91
+ # Merge verdicts into ALL stored frames
92
+ _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx)
93
+
94
+ except asyncio.CancelledError:
95
+ logger.info("ISR assessor cancelled for job %s, running final assessment", job_id)
96
+ # Run one final assessment on cancellation
97
+ frame = storage.get_latest_frame(job_id)
98
+ if frame is not None:
99
+ with storage._lock:
100
+ frame_indices = list(storage._tracks.get(job_id, {}).keys())
101
+ if frame_indices:
102
+ latest_idx = max(frame_indices)
103
+ tracks = storage.get_track_data(job_id, latest_idx)
104
+ if tracks:
105
+ try:
106
+ verdicts = assessor.assess_batch_sync(copy.deepcopy(tracks), frame.copy())
107
+ if verdicts:
108
+ assessed_track_ids.update(verdicts)
109
+ _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx)
110
+ logger.info("ISR final assessment: %d tracks", len(verdicts))
111
+ except Exception:
112
+ logger.exception("ISR final assessment failed for job %s", job_id)
113
+ except Exception:
114
+ logger.exception("ISR assessor loop crashed for job %s", job_id)
115
+
116
+
117
+ def _merge_verdicts(storage, job_id: str, verdicts: dict, assessment_frame_idx: int) -> None:
118
+ """Merge verdict data into all stored frames for matching track_ids."""
119
+ with storage._lock:
120
+ frames = storage._tracks.get(job_id, {})
121
+ for frame_idx, frame_tracks in frames.items():
122
+ for det in frame_tracks:
123
+ tid = det.get("track_id")
124
+ if tid and tid in verdicts:
125
+ v = verdicts[tid]
126
+ det["mission_relevant"] = v.get("mission_relevant", True)
127
+ det["satisfies"] = v.get("satisfies")
128
+ det["reason"] = v.get("reason", "")
129
+ det["features"] = v.get("features", {})
130
+ det["assessment_status"] = "ASSESSED"
131
+ det["assessment_frame_index"] = assessment_frame_idx
132
+ # Store gpt_raw for frontend feature table
133
+ det["gpt_raw"] = {
134
+ "satisfies": v.get("satisfies"),
135
+ "reason": v.get("reason", ""),
136
+ **v.get("features", {}),
137
+ }