import asyncio import copy import logging import os import time from jobs.storage import get_job_storage from models.isr.assessor import ISRAssessor logger = logging.getLogger(__name__) async def run_isr_assessor_loop( job_id: str, mission: str, interval_sec: float = 5.0, ) -> None: """ Run ISR assessment loop alongside inference pipeline. Every interval_sec seconds: 1. Read latest tracks from JobStorage 2. Get latest frame for cropping 3. Call ISRAssessor.assess_batch_sync (in thread pool) 4. Merge verdicts back into all stored frames by track_id """ api_key = os.environ.get("OPENAI_API_KEY") if not api_key: logger.warning("OPENAI_API_KEY not set, ISR assessment disabled for job %s", job_id) return assessor = ISRAssessor(mission=mission, api_key=api_key) storage = get_job_storage() assessed_track_ids: dict[str, dict] = {} # cache of latest verdicts by track_id logger.info("ISR assessor started for job %s (interval=%.1fs)", job_id, interval_sec) try: while True: await asyncio.sleep(interval_sec) # Check if job still exists and is processing job = storage.get(job_id) if not job: logger.info("ISR assessor: job %s gone, stopping", job_id) break # Get latest frame frame = storage.get_latest_frame(job_id) if frame is None: continue # Find latest frame index with track data with storage._lock: frame_indices = list(storage._tracks.get(job_id, {}).keys()) if not frame_indices: continue latest_idx = max(frame_indices) tracks = storage.get_track_data(job_id, latest_idx) if not tracks: continue # Deep copy tracks to avoid mutation during assessment tracks_copy = copy.deepcopy(tracks) # Run GPT assessment in thread pool (blocking IO) t0 = time.perf_counter() try: verdicts = await asyncio.to_thread( assessor.assess_batch_sync, tracks_copy, frame.copy() ) except Exception: logger.exception("ISR assessment call failed for job %s", job_id) continue elapsed = time.perf_counter() - t0 logger.info( "ISR assessment for job %s: %d tracks assessed in %.1fs", job_id, len(verdicts), elapsed ) if not verdicts: continue # Update cache assessed_track_ids.update(verdicts) # Merge verdicts into ALL stored frames _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx) except asyncio.CancelledError: logger.info("ISR assessor cancelled for job %s, running final assessment", job_id) # Run one final assessment on cancellation frame = storage.get_latest_frame(job_id) if frame is not None: with storage._lock: frame_indices = list(storage._tracks.get(job_id, {}).keys()) if frame_indices: latest_idx = max(frame_indices) tracks = storage.get_track_data(job_id, latest_idx) if tracks: try: verdicts = assessor.assess_batch_sync(copy.deepcopy(tracks), frame.copy()) if verdicts: assessed_track_ids.update(verdicts) _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx) logger.info("ISR final assessment: %d tracks", len(verdicts)) except Exception: logger.exception("ISR final assessment failed for job %s", job_id) except Exception: logger.exception("ISR assessor loop crashed for job %s", job_id) def _merge_verdicts(storage, job_id: str, verdicts: dict, assessment_frame_idx: int) -> None: """Merge verdict data into all stored frames for matching track_ids.""" with storage._lock: frames = storage._tracks.get(job_id, {}) for frame_idx, frame_tracks in frames.items(): for det in frame_tracks: tid = det.get("track_id") if tid and tid in verdicts: v = verdicts[tid] det["mission_relevant"] = v.get("mission_relevant", True) det["satisfies"] = v.get("satisfies") det["reason"] = v.get("reason", "") det["features"] = v.get("features", {}) det["assessment_status"] = "ASSESSED" det["assessment_frame_index"] = assessment_frame_idx # Store gpt_raw for frontend feature table det["gpt_raw"] = { "satisfies": v.get("satisfies"), "reason": v.get("reason", ""), **v.get("features", {}), }