ISR / models / isr / loop.py
Zhen Ye
feat(isr): create async assessor loop with verdict merging
a74fcd3
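"""Async ISR assessor loop.

Periodically reads the latest tracks for a job, sends them to GPT for
mission-relevance assessment, and merges the verdicts back into every
stored frame.
"""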
import asyncio
import copy
import logging
import os
import time

from jobs.storage import get_job_storage
from models.isr.assessor import ISRAssessor

logger = logging.getLogger(__name__)


async def run_isr_assessor_loop(
    job_id: str,
    mission: str,
    interval_sec: float = 5.0,
) -> None:
    """
    Run the ISR assessment loop alongside the inference pipeline.

    Every interval_sec seconds:
    1. Read the latest tracks from JobStorage
    2. Get the latest frame for cropping
    3. Call ISRAssessor.assess_batch_sync (in a thread pool)
    4. Merge verdicts back into all stored frames by track_id
    """
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        logger.warning("OPENAI_API_KEY not set, ISR assessment disabled for job %s", job_id)
        return

    assessor = ISRAssessor(mission=mission, api_key=api_key)
    storage = get_job_storage()
    assessed_track_ids: dict[str, dict] = {}  # cache of latest verdicts by track_id

    logger.info("ISR assessor started for job %s (interval=%.1fs)", job_id, interval_sec)
    try:
        while True:
            await asyncio.sleep(interval_sec)

            # Check if job still exists and is processing
            job = storage.get(job_id)
            if not job:
                logger.info("ISR assessor: job %s gone, stopping", job_id)
                break

            # Get latest frame
            frame = storage.get_latest_frame(job_id)
            if frame is None:
                continue
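
            # NOTE: this reaches into JobStorage internals (_lock, _tracks);
            # it assumes no public accessor exposes the stored frame indices.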
            # Find latest frame index with track data
            with storage._lock:
                frame_indices = list(storage._tracks.get(job_id, {}).keys())
            if not frame_indices:
                continue
            latest_idx = max(frame_indices)

            tracks = storage.get_track_data(job_id, latest_idx)
            if not tracks:
                continue

            # Deep copy tracks to avoid mutation during assessment
            tracks_copy = copy.deepcopy(tracks)

            # Run GPT assessment in a thread pool (blocking IO)
            t0 = time.perf_counter()
            try:
                verdicts = await asyncio.to_thread(
                    assessor.assess_batch_sync, tracks_copy, frame.copy()
                )
            except Exception:
                logger.exception("ISR assessment call failed for job %s", job_id)
                continue
            elapsed = time.perf_counter() - t0
            logger.info(
                "ISR assessment for job %s: %d tracks assessed in %.1fs",
                job_id, len(verdicts), elapsed,
            )
            if not verdicts:
                continue
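
            # The accumulated verdict cache is merged (not just this batch) so
            # frames stored since an earlier assessment also pick up verdicts.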
            # Update cache
            assessed_track_ids.update(verdicts)
            # Merge verdicts into ALL stored frames
            _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx)
    except asyncio.CancelledError:
        logger.info("ISR assessor cancelled for job %s, running final assessment", job_id)
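        # Note: CancelledError is not re-raised, so a caller awaiting the
        # cancelled task sees a normal return once the final pass finishes.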
        # Run one final assessment on cancellation
        frame = storage.get_latest_frame(job_id)
        if frame is not None:
            with storage._lock:
                frame_indices = list(storage._tracks.get(job_id, {}).keys())
            if frame_indices:
                latest_idx = max(frame_indices)
                tracks = storage.get_track_data(job_id, latest_idx)
                if tracks:
                    try:
                        verdicts = assessor.assess_batch_sync(copy.deepcopy(tracks), frame.copy())
                        if verdicts:
                            assessed_track_ids.update(verdicts)
                            _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx)
                            logger.info("ISR final assessment: %d tracks", len(verdicts))
                    except Exception:
                        logger.exception("ISR final assessment failed for job %s", job_id)
    except Exception:
        logger.exception("ISR assessor loop crashed for job %s", job_id)


def _merge_verdicts(storage, job_id: str, verdicts: dict[str, dict], assessment_frame_idx: int) -> None:
    """Merge verdict data into all stored frames for matching track_ids."""
    with storage._lock:
        frames = storage._tracks.get(job_id, {})
        for frame_tracks in frames.values():
            for det in frame_tracks:
                tid = det.get("track_id")
                # Compare against None explicitly: a track_id of 0 is falsy
                # and would be skipped by a bare truthiness check.
                if tid is not None and tid in verdicts:
                    v = verdicts[tid]
                    det["mission_relevant"] = v.get("mission_relevant", True)
                    det["satisfies"] = v.get("satisfies")
                    det["reason"] = v.get("reason", "")
                    det["features"] = v.get("features", {})
                    det["assessment_status"] = "ASSESSED"
                    det["assessment_frame_index"] = assessment_frame_idx
                    # Store gpt_raw for the frontend feature table
                    det["gpt_raw"] = {
                        "satisfies": v.get("satisfies"),
                        "reason": v.get("reason", ""),
                        **v.get("features", {}),
                    }
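

# --- Usage sketch (illustrative; not part of this module) ---
# A minimal sketch of how the loop might be driven alongside the pipeline,
# assuming the caller owns the asyncio event loop. `run_inference_pipeline`
# is a hypothetical placeholder, not a real function in this repo.
#
#   task = asyncio.create_task(run_isr_assessor_loop(job_id, mission))
#   try:
#       await run_inference_pipeline(job_id)
#   finally:
#       task.cancel()  # triggers the final assessment pass in the loop
#       await asyncio.gather(task, return_exceptions=True)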