Spaces:
Paused
Paused
File size: 4,765 Bytes
4c10904 ecfb6e4 4c10904 240e068 4c10904 240e068 4c10904 240e068 4c10904 ecfb6e4 4c10904 240e068 4c10904 ecfb6e4 4c10904 ecfb6e4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | import asyncio
import copy
import logging
import os
import time
from jobs.storage import get_job_storage
from models.isr.assessor import ISRAssessor
logger = logging.getLogger(__name__)
async def run_isr_assessor_loop(
    job_id: str,
    mission: str,
    interval_sec: float = 5.0,
) -> None:
    """
    Run ISR assessment loop alongside inference pipeline.

    Every interval_sec seconds:
    1. Read latest tracks from JobStorage
    2. Get latest frame for cropping
    3. Call ISRAssessor.assess_batch_sync (in thread pool)
    4. Store verdicts in separate verdict store keyed by track_id

    Args:
        job_id: Identifier of the job whose tracks are assessed.
        mission: Mission description handed to the ISRAssessor.
        interval_sec: Seconds to sleep between assessment rounds.

    The loop stops when the job disappears from storage. On task
    cancellation it runs one final assessment and then re-raises
    asyncio.CancelledError so the cancellation propagates to the caller.
    """
    # No API key means no GPT calls are possible; bail out quietly.
    if not os.environ.get("OPENAI_API_KEY"):
        logger.warning("OPENAI_API_KEY not set, ISR assessment disabled for job %s", job_id)
        return

    assessor = ISRAssessor(mission=mission)
    storage = get_job_storage()
    assessed_track_ids: dict[str, dict] = {}  # cache of latest verdicts by track_id
    logger.info("ISR assessor started for job %s (interval=%.1fs)", job_id, interval_sec)

    try:
        while True:
            await asyncio.sleep(interval_sec)

            # Check if job still exists and is processing
            job = storage.get(job_id)
            if not job:
                logger.info("ISR assessor: job %s gone, stopping", job_id)
                break

            snapshot = _snapshot_latest_tracks(storage, job_id)
            if snapshot is None:
                continue
            frame, latest_idx, tracks_copy = snapshot

            # Run GPT assessment in thread pool (blocking IO)
            t0 = time.perf_counter()
            try:
                verdicts = await asyncio.to_thread(
                    assessor.assess_batch_sync, tracks_copy, frame
                )
            except Exception:
                # Best-effort: one failed GPT round must not kill the loop.
                logger.exception("ISR assessment call failed for job %s", job_id)
                continue
            elapsed = time.perf_counter() - t0
            logger.info(
                "ISR assessment for job %s: %d tracks assessed in %.1fs",
                job_id, len(verdicts), elapsed
            )
            if not verdicts:
                continue
            # Update cache
            assessed_track_ids.update(verdicts)
            # Merge verdicts into ALL stored frames
            _store_verdicts(storage, job_id, assessed_track_ids, latest_idx)
    except asyncio.CancelledError:
        logger.info("ISR assessor cancelled for job %s, running final assessment", job_id)
        # Run one final assessment on cancellation. This call is synchronous
        # (blocks the event loop briefly); tolerable since the task is being
        # torn down anyway.
        snapshot = _snapshot_latest_tracks(storage, job_id)
        if snapshot is not None:
            frame, latest_idx, tracks_copy = snapshot
            try:
                verdicts = assessor.assess_batch_sync(tracks_copy, frame)
                if verdicts:
                    assessed_track_ids.update(verdicts)
                    _store_verdicts(storage, job_id, assessed_track_ids, latest_idx)
                    logger.info("ISR final assessment: %d tracks", len(verdicts))
            except Exception:
                logger.exception("ISR final assessment failed for job %s", job_id)
        # BUG FIX: re-raise so the cancellation propagates — swallowing
        # CancelledError violates the asyncio contract (the awaiting caller
        # would never observe the cancellation).
        raise
    except Exception:
        # Top-level boundary: log and stop; never let the loop crash a caller.
        logger.exception("ISR assessor loop crashed for job %s", job_id)


def _snapshot_latest_tracks(storage, job_id: str):
    """Return (frame, latest_frame_idx, deep-copied tracks) or None if any piece is missing.

    Deep-copies the track dicts so the assessor can never mutate shared state.
    NOTE(review): reaches into storage._lock / storage._tracks private state —
    consider adding a public accessor on JobStorage; confirm with its owner.
    """
    # Get latest frame
    frame = storage.get_latest_frame(job_id)
    if frame is None:
        return None
    # Find latest frame index with track data
    with storage._lock:
        frame_indices = list(storage._tracks.get(job_id, {}).keys())
    if not frame_indices:
        return None
    latest_idx = max(frame_indices)
    tracks = storage.get_track_data(job_id, latest_idx)
    if not tracks:
        return None
    # Deep copy tracks to avoid mutation during assessment
    return frame, latest_idx, copy.deepcopy(tracks)
def _store_verdicts(storage, job_id: str, verdicts: dict, assessment_frame_idx: int) -> None:
"""Store verdict data in the verdict store (no mutation of track dicts)."""
batch = {}
for tid, v in verdicts.items():
batch[tid] = {
"mission_relevant": v.get("mission_relevant", True),
"satisfies": v.get("satisfies"),
"reason": v.get("reason", ""),
"features": v.get("features", {}),
"assessment_status": "ASSESSED",
"assessment_frame_index": assessment_frame_idx,
"gpt_raw": {
"satisfies": v.get("satisfies"),
"reason": v.get("reason", ""),
**v.get("features", {}),
},
}
storage.set_verdicts_batch(job_id, batch)
|