Zhen Ye committed on
Commit
517108e
·
1 Parent(s): d257dcc

feat: add job processing timing and latency measurements to backend and frontend

Browse files
Files changed (5) hide show
  1. app.py +28 -2
  2. frontend/js/api/client.js +12 -0
  3. inference.py +4 -0
  4. jobs/background.py +30 -7
  5. jobs/models.py +6 -0
app.py CHANGED
@@ -28,6 +28,7 @@ except Exception as e:
28
  import asyncio
29
  import shutil
30
  import tempfile
 
31
  import uuid
32
  from contextlib import asynccontextmanager
33
  from datetime import timedelta
@@ -88,6 +89,7 @@ async def _enrich_first_frame_gpt(
88
  """
89
  if not enable_gpt or not detections:
90
  return
 
91
  try:
92
  # Non-LLM_EXTRACTED relevance filter runs BEFORE run_enrichment (FAST_PATH case)
93
  if mission_spec and mission_spec.parse_mode != "LLM_EXTRACTED":
@@ -99,7 +101,13 @@ async def _enrich_first_frame_gpt(
99
  if not filtered:
100
  for det in detections:
101
  det["assessment_status"] = AssessmentStatus.ASSESSED
102
- get_job_storage().update(job_id, first_frame_detections=detections)
 
 
 
 
 
 
103
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
104
  return
105
 
@@ -107,13 +115,19 @@ async def _enrich_first_frame_gpt(
107
  run_enrichment, 0, frame, detections, mission_spec,
108
  job_id=job_id,
109
  )
 
 
110
  logging.info("Background GPT enrichment complete for job %s", job_id)
111
 
112
  if not gpt_results:
113
  # All detections filtered as not relevant
114
  for det in detections:
115
  det["assessment_status"] = AssessmentStatus.ASSESSED
116
- get_job_storage().update(job_id, first_frame_detections=detections)
 
 
 
 
117
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
118
  return
119
 
@@ -127,6 +141,7 @@ async def _enrich_first_frame_gpt(
127
  job_id,
128
  first_frame_detections=detections,
129
  first_frame_gpt_results=gpt_results,
 
130
  )
131
  logging.info("Updated first_frame_detections with GPT results for job %s", job_id)
132
 
@@ -410,6 +425,7 @@ async def detect_async_endpoint(
410
  raise HTTPException(status_code=400, detail="Video file is required.")
411
 
412
  job_id = uuid.uuid4().hex
 
413
  job_dir = get_job_directory(job_id)
414
  input_path = get_input_video_path(job_id)
415
  output_path = get_output_video_path(job_id)
@@ -482,6 +498,8 @@ async def detect_async_endpoint(
482
  segmenter_name=segmenter,
483
  )
484
  cv2.imwrite(str(first_frame_path), processed_frame)
 
 
485
  # GPT and depth are now handled in the async pipeline (enrichment thread)
486
  depth_map = None
487
  first_frame_gpt_results = None
@@ -509,6 +527,7 @@ async def detect_async_endpoint(
509
  mission_spec=mission_spec,
510
  mission_mode=mission_mode,
511
  first_frame_gpt_results=first_frame_gpt_results,
 
512
  )
513
  get_job_storage().create(job)
514
  asyncio.create_task(process_video_async(job_id))
@@ -563,6 +582,13 @@ async def detect_status(job_id: str):
563
  "completed_at": job.completed_at.isoformat() if job.completed_at else None,
564
  "error": job.error,
565
  "first_frame_detections": job.first_frame_detections,
 
 
 
 
 
 
 
566
  }
567
 
568
 
 
28
  import asyncio
29
  import shutil
30
  import tempfile
31
+ import time
32
  import uuid
33
  from contextlib import asynccontextmanager
34
  from datetime import timedelta
 
89
  """
90
  if not enable_gpt or not detections:
91
  return
92
+ t_gpt_start = time.monotonic()
93
  try:
94
  # Non-LLM_EXTRACTED relevance filter runs BEFORE run_enrichment (FAST_PATH case)
95
  if mission_spec and mission_spec.parse_mode != "LLM_EXTRACTED":
 
101
  if not filtered:
102
  for det in detections:
103
  det["assessment_status"] = AssessmentStatus.ASSESSED
104
+ gpt_elapsed = time.monotonic() - t_gpt_start
105
+ logging.info("TIMING gpt_first_frame=%.3fs for job %s (all non-relevant)", gpt_elapsed, job_id)
106
+ get_job_storage().update(
107
+ job_id,
108
+ first_frame_detections=detections,
109
+ timing_gpt_first_frame_s=gpt_elapsed,
110
+ )
111
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
112
  return
113
 
 
115
  run_enrichment, 0, frame, detections, mission_spec,
116
  job_id=job_id,
117
  )
118
+ gpt_elapsed = time.monotonic() - t_gpt_start
119
+ logging.info("TIMING gpt_first_frame=%.3fs for job %s", gpt_elapsed, job_id)
120
  logging.info("Background GPT enrichment complete for job %s", job_id)
121
 
122
  if not gpt_results:
123
  # All detections filtered as not relevant
124
  for det in detections:
125
  det["assessment_status"] = AssessmentStatus.ASSESSED
126
+ get_job_storage().update(
127
+ job_id,
128
+ first_frame_detections=detections,
129
+ timing_gpt_first_frame_s=gpt_elapsed,
130
+ )
131
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
132
  return
133
 
 
141
  job_id,
142
  first_frame_detections=detections,
143
  first_frame_gpt_results=gpt_results,
144
+ timing_gpt_first_frame_s=gpt_elapsed,
145
  )
146
  logging.info("Updated first_frame_detections with GPT results for job %s", job_id)
147
 
 
425
  raise HTTPException(status_code=400, detail="Video file is required.")
426
 
427
  job_id = uuid.uuid4().hex
428
+ t_job_start = time.monotonic()
429
  job_dir = get_job_directory(job_id)
430
  input_path = get_input_video_path(job_id)
431
  output_path = get_output_video_path(job_id)
 
498
  segmenter_name=segmenter,
499
  )
500
  cv2.imwrite(str(first_frame_path), processed_frame)
501
+ first_frame_elapsed = time.monotonic() - t_job_start
502
+ logging.info("TIMING first_frame=%.3fs for job %s", first_frame_elapsed, job_id)
503
  # GPT and depth are now handled in the async pipeline (enrichment thread)
504
  depth_map = None
505
  first_frame_gpt_results = None
 
527
  mission_spec=mission_spec,
528
  mission_mode=mission_mode,
529
  first_frame_gpt_results=first_frame_gpt_results,
530
+ timing_first_frame_s=first_frame_elapsed,
531
  )
532
  get_job_storage().create(job)
533
  asyncio.create_task(process_video_async(job_id))
 
582
  "completed_at": job.completed_at.isoformat() if job.completed_at else None,
583
  "error": job.error,
584
  "first_frame_detections": job.first_frame_detections,
585
+ "timing": {
586
+ "first_frame_s": job.timing_first_frame_s,
587
+ "video_processing_s": job.timing_video_processing_s,
588
+ "gpt_first_frame_s": job.timing_gpt_first_frame_s,
589
+ "gpt_enrichment_s": job.timing_gpt_enrichment_s,
590
+ "total_s": job.timing_total_s,
591
+ },
592
  }
593
 
594
 
frontend/js/api/client.js CHANGED
@@ -192,6 +192,18 @@ APP.api.client.pollAsyncJob = async function () {
192
  syncGpt(status.first_frame_detections, "final sync");
193
  }
194
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  try {
196
  await fetchProcessedVideo();
197
  await fetchDepthVideo();
 
192
  syncGpt(status.first_frame_detections, "final sync");
193
  }
194
 
195
+ // Display timing summary
196
+ if (status.timing) {
197
+ const t = status.timing;
198
+ const parts = [];
199
+ if (t.first_frame_s != null) parts.push(`1st frame: ${t.first_frame_s.toFixed(2)}s`);
200
+ if (t.video_processing_s != null) parts.push(`video: ${t.video_processing_s.toFixed(2)}s`);
201
+ if (t.gpt_first_frame_s != null) parts.push(`GPT: ${t.gpt_first_frame_s.toFixed(2)}s`);
202
+ if (t.gpt_enrichment_s != null) parts.push(`GPT enrich: ${t.gpt_enrichment_s.toFixed(2)}s`);
203
+ if (t.total_s != null) parts.push(`total: ${t.total_s.toFixed(2)}s`);
204
+ if (parts.length) log(`Timing: ${parts.join(" | ")}`, "t");
205
+ }
206
+
207
  try {
208
  await fetchProcessedVideo();
209
  await fetchDepthVideo();
inference.py CHANGED
@@ -1985,12 +1985,15 @@ def run_grounded_sam2_tracking(
1985
  break
1986
  frame_idx, frame_data, gpt_dets, ms = item
1987
  try:
 
1988
  gpt_res = run_enrichment(
1989
  frame_idx, frame_data, gpt_dets, ms,
1990
  first_frame_gpt_results=first_frame_gpt_results,
1991
  job_id=job_id,
1992
  relevance_refined_event=_relevance_refined,
1993
  )
 
 
1994
 
1995
  # GSAM2-specific: store results in per-track dict and persist to job storage
1996
  if gpt_res:
@@ -2032,6 +2035,7 @@ def run_grounded_sam2_tracking(
2032
  job_id,
2033
  first_frame_detections=_st.first_frame_detections,
2034
  first_frame_gpt_results=gpt_res,
 
2035
  )
2036
  logging.info(
2037
  "GSAM2 enrichment: updated first_frame_detections in job storage for %s",
 
1985
  break
1986
  frame_idx, frame_data, gpt_dets, ms = item
1987
  try:
1988
+ t_enrich_start = time.monotonic()
1989
  gpt_res = run_enrichment(
1990
  frame_idx, frame_data, gpt_dets, ms,
1991
  first_frame_gpt_results=first_frame_gpt_results,
1992
  job_id=job_id,
1993
  relevance_refined_event=_relevance_refined,
1994
  )
1995
+ gpt_enrichment_elapsed = time.monotonic() - t_enrich_start
1996
+ logging.info("TIMING gpt_enrichment=%.3fs for job %s", gpt_enrichment_elapsed, job_id)
1997
 
1998
  # GSAM2-specific: store results in per-track dict and persist to job storage
1999
  if gpt_res:
 
2035
  job_id,
2036
  first_frame_detections=_st.first_frame_detections,
2037
  first_frame_gpt_results=gpt_res,
2038
+ timing_gpt_enrichment_s=gpt_enrichment_elapsed,
2039
  )
2040
  logging.info(
2041
  "GSAM2 enrichment: updated first_frame_detections in job storage for %s",
jobs/background.py CHANGED
@@ -1,5 +1,6 @@
1
  import asyncio
2
  import logging
 
3
  from datetime import datetime
4
 
5
  import torch
@@ -24,6 +25,8 @@ async def process_video_async(job_id: str) -> None:
24
  # Create stream for live view
25
  stream_queue = create_stream(job_id)
26
 
 
 
27
  try:
28
  # Run detection or segmentation first
29
  if job.mode == "segmentation":
@@ -63,8 +66,8 @@ async def process_video_async(job_id: str) -> None:
63
  detection_path, detections_list = result_pkg
64
 
65
  # If depth was requested, checking if output path exists for depth
66
- # The unified pipeline creates 'output_video_path'.
67
- # If depth enabled, it might have written depth there?
68
  # Actually run_inference returns (video_path, detections).
69
  # If depth was ON, the video at video_path *has* depth overlays.
70
  # But the 'Depth Video' (heatmap only) is usually separate.
@@ -74,48 +77,68 @@ async def process_video_async(job_id: str) -> None:
74
  # Let's keep it simple: If depth enabled, the main video IS the depth view (overlay).
75
  # Or if we want separate `depth_output_path`, we need `run_inference` to handle it.
76
  # Let's assume for now `run_inference` writes the main visualization path.
77
-
78
  if job.depth_estimator_name:
79
  # In unified mode, the main video contains the depth viz
80
  depth_path = detection_path
81
  logging.info("Depth estimation included in main video for job %s", job_id)
82
 
 
 
 
 
 
 
83
  # Mark as completed (with or without depth)
84
  storage.update(
85
  job_id,
86
  status=JobStatus.COMPLETED,
87
- completed_at=datetime.utcnow(),
88
  output_video_path=detection_path,
89
  depth_output_path=depth_path,
90
  partial_success=partial_success,
91
  depth_error=depth_error,
 
 
92
  )
93
 
94
  except RuntimeError as exc:
 
 
 
95
  # Handle cancellation specifically
96
  if "cancelled" in str(exc).lower():
97
  logging.info("Job %s was cancelled", job_id)
98
  storage.update(
99
  job_id,
100
  status=JobStatus.CANCELLED,
101
- completed_at=datetime.utcnow(),
102
  error="Cancelled by user",
 
 
103
  )
104
  else:
105
  logging.exception("Background processing failed for job %s", job_id)
106
  storage.update(
107
  job_id,
108
  status=JobStatus.FAILED,
109
- completed_at=datetime.utcnow(),
110
  error=str(exc),
 
 
111
  )
112
  except Exception as exc:
 
 
 
113
  logging.exception("Background processing failed for job %s", job_id)
114
  storage.update(
115
  job_id,
116
  status=JobStatus.FAILED,
117
- completed_at=datetime.utcnow(),
118
  error=str(exc),
 
 
119
  )
120
  finally:
121
  remove_stream(job_id)
 
1
  import asyncio
2
  import logging
3
+ import time
4
  from datetime import datetime
5
 
6
  import torch
 
25
  # Create stream for live view
26
  stream_queue = create_stream(job_id)
27
 
28
+ t_video_start = time.monotonic()
29
+
30
  try:
31
  # Run detection or segmentation first
32
  if job.mode == "segmentation":
 
66
  detection_path, detections_list = result_pkg
67
 
68
  # If depth was requested, checking if output path exists for depth
69
+ # The unified pipeline creates 'output_video_path'.
70
+ # If depth enabled, it might have written depth there?
71
  # Actually run_inference returns (video_path, detections).
72
  # If depth was ON, the video at video_path *has* depth overlays.
73
  # But the 'Depth Video' (heatmap only) is usually separate.
 
77
  # Let's keep it simple: If depth enabled, the main video IS the depth view (overlay).
78
  # Or if we want separate `depth_output_path`, we need `run_inference` to handle it.
79
  # Let's assume for now `run_inference` writes the main visualization path.
80
+
81
  if job.depth_estimator_name:
82
  # In unified mode, the main video contains the depth viz
83
  depth_path = detection_path
84
  logging.info("Depth estimation included in main video for job %s", job_id)
85
 
86
+ video_elapsed = time.monotonic() - t_video_start
87
+ completed_at = datetime.utcnow()
88
+ total_elapsed = (completed_at - job.created_at).total_seconds()
89
+ logging.info("TIMING video_processing=%.3fs for job %s", video_elapsed, job_id)
90
+ logging.info("TIMING total=%.3fs for job %s", total_elapsed, job_id)
91
+
92
  # Mark as completed (with or without depth)
93
  storage.update(
94
  job_id,
95
  status=JobStatus.COMPLETED,
96
+ completed_at=completed_at,
97
  output_video_path=detection_path,
98
  depth_output_path=depth_path,
99
  partial_success=partial_success,
100
  depth_error=depth_error,
101
+ timing_video_processing_s=video_elapsed,
102
+ timing_total_s=total_elapsed,
103
  )
104
 
105
  except RuntimeError as exc:
106
+ video_elapsed = time.monotonic() - t_video_start
107
+ completed_at = datetime.utcnow()
108
+ total_elapsed = (completed_at - job.created_at).total_seconds()
109
  # Handle cancellation specifically
110
  if "cancelled" in str(exc).lower():
111
  logging.info("Job %s was cancelled", job_id)
112
  storage.update(
113
  job_id,
114
  status=JobStatus.CANCELLED,
115
+ completed_at=completed_at,
116
  error="Cancelled by user",
117
+ timing_video_processing_s=video_elapsed,
118
+ timing_total_s=total_elapsed,
119
  )
120
  else:
121
  logging.exception("Background processing failed for job %s", job_id)
122
  storage.update(
123
  job_id,
124
  status=JobStatus.FAILED,
125
+ completed_at=completed_at,
126
  error=str(exc),
127
+ timing_video_processing_s=video_elapsed,
128
+ timing_total_s=total_elapsed,
129
  )
130
  except Exception as exc:
131
+ video_elapsed = time.monotonic() - t_video_start
132
+ completed_at = datetime.utcnow()
133
+ total_elapsed = (completed_at - job.created_at).total_seconds()
134
  logging.exception("Background processing failed for job %s", job_id)
135
  storage.update(
136
  job_id,
137
  status=JobStatus.FAILED,
138
+ completed_at=completed_at,
139
  error=str(exc),
140
+ timing_video_processing_s=video_elapsed,
141
+ timing_total_s=total_elapsed,
142
  )
143
  finally:
144
  remove_stream(job_id)
jobs/models.py CHANGED
@@ -38,3 +38,9 @@ class JobInfo:
38
  mission_spec: Optional[Any] = None # utils.schemas.MissionSpecification
39
  mission_mode: str = "LEGACY" # "MISSION" or "LEGACY"
40
  first_frame_gpt_results: Optional[Dict[str, Any]] = None # Cached GPT results from process_first_frame
 
 
 
 
 
 
 
38
  mission_spec: Optional[Any] = None # utils.schemas.MissionSpecification
39
  mission_mode: str = "LEGACY" # "MISSION" or "LEGACY"
40
  first_frame_gpt_results: Optional[Dict[str, Any]] = None # Cached GPT results from process_first_frame
41
+ # Latency measurements (seconds)
42
+ timing_first_frame_s: Optional[float] = None
43
+ timing_video_processing_s: Optional[float] = None
44
+ timing_gpt_first_frame_s: Optional[float] = None
45
+ timing_gpt_enrichment_s: Optional[float] = None
46
+ timing_total_s: Optional[float] = None