Spaces:
Sleeping
Sleeping
| import asyncio | |
| import copy | |
| import logging | |
| import os | |
| import time | |
| from jobs.storage import get_job_storage | |
| from models.isr.assessor import ISRAssessor | |
| logger = logging.getLogger(__name__) | |
async def run_isr_assessor_loop(
    job_id: str,
    mission: str,
    interval_sec: float = 5.0,
) -> None:
    """
    Run ISR assessment loop alongside inference pipeline.

    Every interval_sec seconds:
    1. Read latest tracks from JobStorage
    2. Get latest frame for cropping
    3. Call ISRAssessor.assess_batch_sync (in thread pool)
    4. Merge verdicts back into all stored frames by track_id

    The loop ends when the job disappears from storage or when the task is
    cancelled. On cancellation one last assessment pass is attempted and the
    coroutine then returns normally, i.e. the CancelledError is absorbed
    rather than re-raised. A second cancellation that arrives while that
    final pass is awaiting its worker thread does propagate.

    Args:
        job_id: Key of the job in the job storage.
        mission: Mission text handed to ISRAssessor.
        interval_sec: Delay between assessment passes, in seconds.

    Returns:
        None. Does nothing (apart from a warning) when OPENAI_API_KEY is
        not set in the environment.
    """
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        logger.warning("OPENAI_API_KEY not set, ISR assessment disabled for job %s", job_id)
        return

    assessor = ISRAssessor(mission=mission, api_key=api_key)
    storage = get_job_storage()
    assessed_track_ids: dict[str, dict] = {}  # cache of latest verdicts by track_id

    logger.info("ISR assessor started for job %s (interval=%.1fs)", job_id, interval_sec)

    try:
        while True:
            await asyncio.sleep(interval_sec)

            # Stop once the job is no longer in storage. Only existence is
            # checked here; the job's status is not inspected.
            job = storage.get(job_id)
            if not job:
                logger.info("ISR assessor: job %s gone, stopping", job_id)
                break

            snapshot = _latest_frame_and_tracks(storage, job_id)
            if snapshot is None:
                continue
            frame, latest_idx, tracks = snapshot

            # Run GPT assessment in thread pool (blocking IO). The assessor
            # gets private copies so storage can keep changing meanwhile.
            t0 = time.perf_counter()
            try:
                verdicts = await asyncio.to_thread(
                    assessor.assess_batch_sync, copy.deepcopy(tracks), frame.copy()
                )
            except Exception:
                logger.exception("ISR assessment call failed for job %s", job_id)
                continue
            elapsed = time.perf_counter() - t0

            # Normalise a None result so the len() below cannot raise and
            # take the whole loop down through the outer handler.
            verdicts = verdicts or {}
            logger.info(
                "ISR assessment for job %s: %d tracks assessed in %.1fs",
                job_id, len(verdicts), elapsed
            )
            if not verdicts:
                continue

            # Update cache, then merge every cached verdict into ALL stored
            # frames so frames added since the last pass are covered too.
            assessed_track_ids.update(verdicts)
            _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx)
    except asyncio.CancelledError:
        logger.info("ISR assessor cancelled for job %s, running final assessment", job_id)
        # Run one final assessment on cancellation.
        snapshot = _latest_frame_and_tracks(storage, job_id)
        if snapshot is not None:
            frame, latest_idx, tracks = snapshot
            try:
                # Still off-loaded to a worker thread: calling the blocking
                # assessor directly here would stall the event loop.
                verdicts = await asyncio.to_thread(
                    assessor.assess_batch_sync, copy.deepcopy(tracks), frame.copy()
                )
                if verdicts:
                    assessed_track_ids.update(verdicts)
                    _merge_verdicts(storage, job_id, assessed_track_ids, latest_idx)
                    logger.info("ISR final assessment: %d tracks", len(verdicts))
            except Exception:
                logger.exception("ISR final assessment failed for job %s", job_id)
    except Exception:
        logger.exception("ISR assessor loop crashed for job %s", job_id)


def _latest_frame_and_tracks(storage, job_id: str):
    """Return (frame, latest_idx, tracks) for the newest tracked frame, or None.

    None is returned when there is no latest frame, no track entries for the
    job, or the newest track entry is empty.
    """
    frame = storage.get_latest_frame(job_id)
    if frame is None:
        return None

    # NOTE(review): reaches into the storage object's private _lock/_tracks
    # attributes; only the key list is copied while the lock is held.
    with storage._lock:
        frame_indices = list(storage._tracks.get(job_id, {}).keys())
    if not frame_indices:
        return None

    latest_idx = max(frame_indices)
    tracks = storage.get_track_data(job_id, latest_idx)
    if not tracks:
        return None
    return frame, latest_idx, tracks
def _merge_verdicts(storage, job_id: str, verdicts: dict, assessment_frame_idx: int) -> None:
    """Merge verdict data into all stored frames for matching track_ids.

    Every detection dict stored for ``job_id`` whose "track_id" is a key of
    ``verdicts`` is updated in place while ``storage._lock`` is held.
    Detections without a "track_id", or with one that has no verdict, are
    left untouched.

    Args:
        storage: Object exposing ``_lock`` (a context manager) and ``_tracks``
            (mapping job_id -> {frame_idx: list of detection dicts}).
        job_id: Job whose stored frames are updated.
        verdicts: Mapping track_id -> verdict dict. The keys read are
            "mission_relevant", "satisfies", "reason" and "features".
        assessment_frame_idx: Written to each updated detection as
            "assessment_frame_index".
    """
    if not verdicts:
        return

    with storage._lock:
        frames = storage._tracks.get(job_id, {})
        for frame_tracks in frames.values():
            for det in frame_tracks:
                tid = det.get("track_id")
                # Compare against None rather than truthiness so a valid but
                # falsy id (e.g. integer 0) is not silently skipped.
                if tid is None or tid not in verdicts:
                    continue

                v = verdicts[tid]
                # An explicit None for "features" is treated like a missing key.
                features = v.get("features") or {}
                satisfies = v.get("satisfies")
                reason = v.get("reason", "")

                det["mission_relevant"] = v.get("mission_relevant", True)
                det["satisfies"] = satisfies
                det["reason"] = reason
                # Shallow copy per detection so the same dict object is not
                # shared by every frame that carries this track.
                det["features"] = dict(features)
                det["assessment_status"] = "ASSESSED"
                det["assessment_frame_index"] = assessment_frame_idx
                # Store gpt_raw for frontend feature table. Features are
                # spread last, so a feature named "satisfies" or "reason"
                # overrides the verdict-level value here.
                det["gpt_raw"] = {
                    "satisfies": satisfies,
                    "reason": reason,
                    **features,
                }