KSvend Claude Happy committed on
Commit
ad28ab3
·
1 Parent(s): f79ac92

fix: sequential batch indicator processing with 40-min per-indicator timeout

Browse files

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>

Files changed (1) hide show
  1. app/worker.py +59 -64
app/worker.py CHANGED
@@ -19,7 +19,7 @@ from app.core.email import send_completion_email
19
  logger = logging.getLogger(__name__)
20
 
21
  BATCH_POLL_INTERVAL = 30 # seconds between status checks
22
- BATCH_TIMEOUT = 1200 # 20 minutes maximum wait
23
 
24
 
25
  def _save_spatial_json(spatial, status_value: str, path: str) -> None:
@@ -67,10 +67,11 @@ async def process_job(job_id: str, db: Database, registry: IndicatorRegistry) ->
67
  else:
68
  process_indicators.append((indicator_id, indicator))
69
 
70
- # -- Phase 1: Submit batch jobs --
71
- batch_submissions = {}
72
- fallback_ids = set()
73
  for indicator_id, indicator in batch_indicators.items():
 
 
 
74
  await db.update_job_progress(job_id, indicator_id, "submitting")
75
  try:
76
  jobs = await indicator.submit_batch(
@@ -78,71 +79,52 @@ async def process_job(job_id: str, db: Database, registry: IndicatorRegistry) ->
78
  job.request.time_range,
79
  season_months=job.request.season_months(),
80
  )
81
- batch_submissions[indicator_id] = jobs
82
  job_ids = [getattr(j, 'job_id', '?') for j in jobs]
83
  print(f"[Aperture] Submitted {indicator_id} batch jobs: {job_ids}")
84
  await db.update_job_progress(job_id, indicator_id, "processing on CDSE")
85
  except Exception as exc:
86
  logger.warning("Batch submit failed for %s, will use fallback: %s", indicator_id, exc)
87
- fallback_ids.add(indicator_id)
88
-
89
- # -- Phase 2: Poll until all batch jobs finish --
90
- poll_start = time.monotonic()
91
- pending = dict(batch_submissions)
92
-
93
- while pending:
94
- # Check current statuses before sleeping
95
- elapsed = time.monotonic() - poll_start
96
- for indicator_id in list(pending.keys()):
97
- jobs = pending[indicator_id]
98
- statuses = [j.status() for j in jobs]
99
- job_ids = [getattr(j, 'job_id', '?') for j in jobs]
100
- print(f"[Aperture] Poll {indicator_id} ({elapsed:.0f}s): {list(zip(job_ids, statuses))}")
101
- if all(s == "finished" for s in statuses):
102
- logger.info("Batch jobs finished for %s", indicator_id)
103
- del pending[indicator_id]
104
- elif any(s in ("error", "canceled") for s in statuses):
105
- logger.warning("Batch job failed for %s: %s", indicator_id, statuses)
106
- del pending[indicator_id]
107
-
108
- if not pending:
109
- break
110
-
111
- if elapsed >= BATCH_TIMEOUT:
112
- logger.warning("Batch poll timeout after %.0fs, remaining: %s", elapsed, list(pending.keys()))
113
- fallback_ids.update(pending.keys())
114
- break
115
-
116
- await asyncio.sleep(BATCH_POLL_INTERVAL)
117
-
118
- # -- Phase 3: Harvest batch results + process non-batch indicators --
119
- for indicator_id in job.request.indicator_ids:
120
- indicator = registry.get(indicator_id)
121
-
122
- if indicator_id in fallback_ids:
123
- await db.update_job_progress(job_id, indicator_id, "processing")
124
- result = await indicator.process(
125
- job.request.aoi,
126
- job.request.time_range,
127
- season_months=job.request.season_months(),
128
- )
129
- elif indicator_id in batch_submissions:
130
- await db.update_job_progress(job_id, indicator_id, "downloading")
131
- try:
132
- result = await indicator.harvest(
133
- job.request.aoi,
134
- job.request.time_range,
135
- season_months=job.request.season_months(),
136
- batch_jobs=batch_submissions[indicator_id],
137
- )
138
- except Exception as exc:
139
- logger.warning("Harvest failed for %s, using fallback: %s", indicator_id, exc)
140
- result = await indicator.process(
141
- job.request.aoi,
142
- job.request.time_range,
143
- season_months=job.request.season_months(),
144
- )
145
- else:
146
  await db.update_job_progress(job_id, indicator_id, "processing")
147
  result = await indicator.process(
148
  job.request.aoi,
@@ -153,7 +135,20 @@ async def process_job(job_id: str, db: Database, registry: IndicatorRegistry) ->
153
  spatial = indicator.get_spatial_data()
154
  if spatial is not None:
155
  spatial_cache[indicator_id] = spatial
 
 
156
 
 
 
 
 
 
 
 
 
 
 
 
157
  await db.save_job_result(job_id, result)
158
  await db.update_job_progress(job_id, indicator_id, "complete")
159
 
 
19
  logger = logging.getLogger(__name__)
20
 
21
  BATCH_POLL_INTERVAL = 30 # seconds between status checks
22
+ BATCH_TIMEOUT = 2400 # 40 minutes per indicator
23
 
24
 
25
  def _save_spatial_json(spatial, status_value: str, path: str) -> None:
 
67
  else:
68
  process_indicators.append((indicator_id, indicator))
69
 
70
+ # -- Process batch indicators sequentially --
 
 
71
  for indicator_id, indicator in batch_indicators.items():
72
+ result = None
73
+
74
+ # Submit
75
  await db.update_job_progress(job_id, indicator_id, "submitting")
76
  try:
77
  jobs = await indicator.submit_batch(
 
79
  job.request.time_range,
80
  season_months=job.request.season_months(),
81
  )
 
82
  job_ids = [getattr(j, 'job_id', '?') for j in jobs]
83
  print(f"[Aperture] Submitted {indicator_id} batch jobs: {job_ids}")
84
  await db.update_job_progress(job_id, indicator_id, "processing on CDSE")
85
  except Exception as exc:
86
  logger.warning("Batch submit failed for %s, will use fallback: %s", indicator_id, exc)
87
+ jobs = None
88
+
89
+ # Poll
90
+ if jobs is not None:
91
+ poll_start = time.monotonic()
92
+ finished = False
93
+ while True:
94
+ elapsed = time.monotonic() - poll_start
95
+ statuses = [j.status() for j in jobs]
96
+ job_ids = [getattr(j, 'job_id', '?') for j in jobs]
97
+ print(f"[Aperture] Poll {indicator_id} ({elapsed:.0f}s): {list(zip(job_ids, statuses))}")
98
+
99
+ if all(s == "finished" for s in statuses):
100
+ logger.info("Batch jobs finished for %s", indicator_id)
101
+ finished = True
102
+ break
103
+ elif any(s in ("error", "canceled") for s in statuses):
104
+ logger.warning("Batch job failed for %s: %s", indicator_id, statuses)
105
+ break
106
+
107
+ if elapsed >= BATCH_TIMEOUT:
108
+ logger.warning("Batch poll timeout after %.0fs for %s", elapsed, indicator_id)
109
+ break
110
+
111
+ await asyncio.sleep(BATCH_POLL_INTERVAL)
112
+
113
+ # Harvest
114
+ if finished:
115
+ await db.update_job_progress(job_id, indicator_id, "downloading")
116
+ try:
117
+ result = await indicator.harvest(
118
+ job.request.aoi,
119
+ job.request.time_range,
120
+ season_months=job.request.season_months(),
121
+ batch_jobs=jobs,
122
+ )
123
+ except Exception as exc:
124
+ logger.warning("Harvest failed for %s, using fallback: %s", indicator_id, exc)
125
+
126
+ # Fallback if submit failed, poll timed out, jobs errored, or harvest failed
127
+ if result is None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  await db.update_job_progress(job_id, indicator_id, "processing")
129
  result = await indicator.process(
130
  job.request.aoi,
 
135
  spatial = indicator.get_spatial_data()
136
  if spatial is not None:
137
  spatial_cache[indicator_id] = spatial
138
+ await db.save_job_result(job_id, result)
139
+ await db.update_job_progress(job_id, indicator_id, "complete")
140
 
141
+ # -- Process non-batch indicators --
142
+ for indicator_id, indicator in process_indicators:
143
+ await db.update_job_progress(job_id, indicator_id, "processing")
144
+ result = await indicator.process(
145
+ job.request.aoi,
146
+ job.request.time_range,
147
+ season_months=job.request.season_months(),
148
+ )
149
+ spatial = indicator.get_spatial_data()
150
+ if spatial is not None:
151
+ spatial_cache[indicator_id] = spatial
152
  await db.save_job_result(job_id, result)
153
  await db.update_job_progress(job_id, indicator_id, "complete")
154