KSvend Claude Happy committed on
Commit
5bf6898
·
1 Parent(s): c90dd1c

fix: wrap non-finished batch jobs to fail fast on download

Browse files

When poll times out with 2/3 jobs finished, calling
download_results() on a still-running job blocks for 30 min
(HTTP read timeout). Now non-finished jobs are wrapped with
_SkippedJob that raises immediately — the harvest methods'
existing try/except handles this as graceful degradation.

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>

Files changed (1) hide show
  1. app/worker.py +17 -1
app/worker.py CHANGED
@@ -22,6 +22,16 @@ BATCH_POLL_INTERVAL = 30 # seconds between status checks
22
  BATCH_TIMEOUT = 5400 # 90 minutes per indicator
23
 
24
 
 
 
 
 
 
 
 
 
 
 
25
  def _save_spatial_json(spatial, status_value: str, path: str) -> None:
26
  """Serialize spatial data to JSON for the frontend."""
27
  if spatial is None:
@@ -113,13 +123,19 @@ async def process_job(job_id: str, db: Database, registry: IndicatorRegistry) ->
113
  # Harvest if any job finished (harvest methods handle per-job failures)
114
  any_finished = any(s == "finished" for s in statuses)
115
  if any_finished:
 
 
 
 
 
 
116
  await db.update_job_progress(job_id, indicator_id, "downloading")
117
  try:
118
  result = await indicator.harvest(
119
  job.request.aoi,
120
  job.request.time_range,
121
  season_months=job.request.season_months(),
122
- batch_jobs=jobs,
123
  )
124
  except Exception as exc:
125
  logger.warning("Harvest failed for %s, using fallback: %s", indicator_id, exc)
 
22
  BATCH_TIMEOUT = 5400 # 90 minutes per indicator
23
 
24
 
25
+ class _SkippedJob:
26
+ """Stub for batch jobs that didn't finish — fails fast on download."""
27
+ def __init__(self, job_id: str):
28
+ self.job_id = job_id
29
+ def download_results(self, *a, **kw):
30
+ raise RuntimeError(f"Job {self.job_id} not finished, skipping download")
31
+ def status(self):
32
+ return "skipped"
33
+
34
+
35
  def _save_spatial_json(spatial, status_value: str, path: str) -> None:
36
  """Serialize spatial data to JSON for the frontend."""
37
  if spatial is None:
 
123
  # Harvest if any job finished (harvest methods handle per-job failures)
124
  any_finished = any(s == "finished" for s in statuses)
125
  if any_finished:
126
+ # Wrap non-finished jobs so download_results() fails fast
127
+ # instead of blocking for 30 min on a still-running job
128
+ harvest_jobs = [
129
+ j if s == "finished" else _SkippedJob(getattr(j, 'job_id', '?'))
130
+ for j, s in zip(jobs, statuses)
131
+ ]
132
  await db.update_job_progress(job_id, indicator_id, "downloading")
133
  try:
134
  result = await indicator.harvest(
135
  job.request.aoi,
136
  job.request.time_range,
137
  season_months=job.request.season_months(),
138
+ batch_jobs=harvest_jobs,
139
  )
140
  except Exception as exc:
141
  logger.warning("Harvest failed for %s, using fallback: %s", indicator_id, exc)