KSvend Claude Happy committed on
Commit
f20ef5a
·
1 Parent(s): 9791720

docs: add openEO batch jobs implementation plan


9 tasks covering: submit_as_batch helper, BaseIndicator batch interface,
NDVI submit/harvest, three-phase worker, E2E tests, live verification,
cleanup, and baseline restoration.

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>

docs/superpowers/plans/2026-04-01-openeo-batch-jobs.md ADDED
@@ -0,0 +1,1272 @@
# openEO Batch Job Processing — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace synchronous `cube.download()` with openEO batch jobs so NDVI (and later SAR, buildup, water) can process on CDSE free tier without hanging.

**Architecture:** Three-phase worker (submit → poll → harvest). openEO indicators gain `submit_batch()` and `harvest()` methods; non-openEO indicators keep `process()`. All batch jobs run in parallel on CDSE during the poll phase.

**Tech Stack:** openEO Python client (`openeo.rest.job.BatchJob`), asyncio, existing indicator/worker infrastructure.

**Spec:** `docs/superpowers/specs/2026-04-01-openeo-batch-jobs-design.md`

---

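The submit → poll → harvest cycle described above can be sketched in isolation before touching the real worker. This is a minimal illustration with stubbed jobs — `FakeJob` and `run_three_phase` are invented for the sketch and are not part of the codebase:

```python
import asyncio

class FakeJob:
    """Stand-in for openeo.rest.job.BatchJob: reports "running" for a few polls."""
    def __init__(self, polls_until_done: int = 2):
        self._left = polls_until_done

    def status(self) -> str:
        self._left -= 1
        return "finished" if self._left <= 0 else "running"

async def run_three_phase(submit, harvest, poll_interval: float = 0.01):
    jobs = submit()  # Phase 1: create and start all jobs server-side
    while not all(j.status() == "finished" for j in jobs):
        await asyncio.sleep(poll_interval)  # Phase 2: poll until every job finishes
    return harvest(jobs)  # Phase 3: download results and compute the indicator

result = asyncio.run(run_three_phase(
    submit=lambda: [FakeJob(), FakeJob(3)],
    harvest=lambda jobs: f"harvested {len(jobs)} jobs",
))
print(result)  # harvested 2 jobs
```

The key property the real worker preserves: nothing downloads until *all* jobs for an indicator report finished, so CDSE does the heavy lifting concurrently while the worker only sleeps and polls.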
### Task 1: Add `submit_as_batch()` helper to openeo_client.py

**Files:**
- Modify: `app/openeo_client.py` (add function after `_bbox_dict`)
- Test: `tests/test_openeo_client.py` (add test at end)

- [ ] **Step 1: Write the failing test**

Add to `tests/test_openeo_client.py`:

```python
def test_submit_as_batch_creates_and_starts_job():
    """submit_as_batch() creates a batch job and starts it."""
    from app.openeo_client import submit_as_batch

    mock_conn = MagicMock()
    mock_cube = MagicMock()
    mock_job = MagicMock()
    mock_job.job_id = "j-12345"
    mock_conn.create_job.return_value = mock_job

    result = submit_as_batch(mock_conn, mock_cube, "ndvi-current-Test")

    mock_conn.create_job.assert_called_once_with(mock_cube, title="ndvi-current-Test")
    mock_job.start.assert_called_once()
    assert result is mock_job
```

- [ ] **Step 2: Run test to verify it fails**

Run: `pytest tests/test_openeo_client.py::test_submit_as_batch_creates_and_starts_job -v`
Expected: FAIL with `ImportError: cannot import name 'submit_as_batch'`

- [ ] **Step 3: Write the implementation**

Add to `app/openeo_client.py` after the `_bbox_dict` function (after line 59):

```python
def submit_as_batch(
    conn: openeo.Connection, cube: openeo.DataCube, title: str
) -> "openeo.rest.job.BatchJob":
    """Submit a datacube as a batch job on CDSE and start it."""
    job = conn.create_job(cube, title=title)
    job.start()
    logger.info("Batch job %s started: %s", job.job_id, title)
    return job
```

- [ ] **Step 4: Run test to verify it passes**

Run: `pytest tests/test_openeo_client.py::test_submit_as_batch_creates_and_starts_job -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add app/openeo_client.py tests/test_openeo_client.py
git commit -m "feat: add submit_as_batch() helper for openEO batch jobs"
```

---

### Task 2: Add batch interface to BaseIndicator

**Files:**
- Modify: `app/indicators/base.py` (add `uses_batch`, `submit_batch`, `harvest`)
- Test: `tests/test_indicator_base.py` (new file)

- [ ] **Step 1: Write the failing test**

Create `tests/test_indicator_base.py`:

```python
"""Tests for BaseIndicator batch interface."""
from __future__ import annotations

import pytest
from unittest.mock import MagicMock
from app.indicators.base import BaseIndicator
from app.models import AOI, TimeRange, IndicatorResult, StatusLevel, TrendDirection, ConfidenceLevel
from datetime import date


class PlainIndicator(BaseIndicator):
    """Non-batch indicator for testing."""
    id = "plain"
    name = "Plain"
    category = "T1"
    question = "Test?"
    estimated_minutes = 1

    async def process(self, aoi, time_range, season_months=None):
        return IndicatorResult(
            indicator_id="plain", headline="ok",
            status=StatusLevel.GREEN, trend=TrendDirection.STABLE,
            confidence=ConfidenceLevel.HIGH, map_layer_path="",
            chart_data={}, summary="", methodology="", limitations=[],
        )


class BatchIndicator(BaseIndicator):
    """Batch indicator for testing."""
    id = "batch"
    name = "Batch"
    category = "T2"
    question = "Batch test?"
    estimated_minutes = 5
    uses_batch = True

    async def process(self, aoi, time_range, season_months=None):
        return IndicatorResult(
            indicator_id="batch", headline="fallback",
            status=StatusLevel.GREEN, trend=TrendDirection.STABLE,
            confidence=ConfidenceLevel.LOW, map_layer_path="",
            chart_data={}, data_source="placeholder",
            summary="", methodology="", limitations=[],
        )

    async def submit_batch(self, aoi, time_range, season_months=None):
        return [MagicMock()]

    async def harvest(self, aoi, time_range, season_months=None, batch_jobs=None):
        return IndicatorResult(
            indicator_id="batch", headline="harvested",
            status=StatusLevel.GREEN, trend=TrendDirection.STABLE,
            confidence=ConfidenceLevel.HIGH, map_layer_path="",
            chart_data={}, data_source="satellite",
            summary="", methodology="", limitations=[],
        )


def test_plain_indicator_uses_batch_is_false():
    ind = PlainIndicator()
    assert ind.uses_batch is False


def test_batch_indicator_uses_batch_is_true():
    ind = BatchIndicator()
    assert ind.uses_batch is True


@pytest.mark.asyncio
async def test_plain_indicator_submit_batch_raises():
    ind = PlainIndicator()
    with pytest.raises(NotImplementedError):
        await ind.submit_batch(
            AOI(name="T", bbox=[32, 15, 33, 16]),
            TimeRange(start=date(2025, 1, 1), end=date(2025, 6, 30)),
        )


@pytest.mark.asyncio
async def test_plain_indicator_harvest_raises():
    ind = PlainIndicator()
    with pytest.raises(NotImplementedError):
        await ind.harvest(
            AOI(name="T", bbox=[32, 15, 33, 16]),
            TimeRange(start=date(2025, 1, 1), end=date(2025, 6, 30)),
            batch_jobs=[],
        )


@pytest.mark.asyncio
async def test_batch_indicator_submit_returns_jobs():
    ind = BatchIndicator()
    jobs = await ind.submit_batch(
        AOI(name="T", bbox=[32, 15, 33, 16]),
        TimeRange(start=date(2025, 1, 1), end=date(2025, 6, 30)),
    )
    assert len(jobs) == 1


@pytest.mark.asyncio
async def test_batch_indicator_harvest_returns_result():
    ind = BatchIndicator()
    result = await ind.harvest(
        AOI(name="T", bbox=[32, 15, 33, 16]),
        TimeRange(start=date(2025, 1, 1), end=date(2025, 6, 30)),
        batch_jobs=[MagicMock()],
    )
    assert result.data_source == "satellite"
    assert result.headline == "harvested"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_indicator_base.py -v`
Expected: FAIL — `uses_batch` attribute missing on `PlainIndicator`, `submit_batch` and `harvest` not on `BaseIndicator`

- [ ] **Step 3: Write the implementation**

Modify `app/indicators/base.py`. Add to the `BaseIndicator` class, after the `process` abstract method (after line 46):

```python
    uses_batch: bool = False

    async def submit_batch(
        self, aoi: AOI, time_range: TimeRange, season_months: list[int] | None = None
    ) -> list:
        """Submit openEO batch jobs. Override in batch indicators."""
        raise NotImplementedError(f"{self.id} does not support batch processing")

    async def harvest(
        self, aoi: AOI, time_range: TimeRange, season_months: list[int] | None = None,
        batch_jobs: list | None = None,
    ) -> IndicatorResult:
        """Download completed batch jobs and compute result. Override in batch indicators."""
        raise NotImplementedError(f"{self.id} does not support batch harvesting")
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/test_indicator_base.py -v`
Expected: All 6 tests PASS

- [ ] **Step 5: Run full suite to check nothing broke**

Run: `pytest tests/ -x -q`
Expected: 145+ tests pass

- [ ] **Step 6: Commit**

```bash
git add app/indicators/base.py tests/test_indicator_base.py
git commit -m "feat: add batch job interface to BaseIndicator (uses_batch, submit_batch, harvest)"
```

---

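The `uses_batch` flag added in this task is what lets a caller choose a flow per indicator and degrade to `process()` when submission fails (Task 5 implements this for real in the worker). A minimal dispatch sketch, using invented stand-in classes rather than the project's models:

```python
import asyncio

class Indicator:
    """Default: plain indicators only implement process()."""
    uses_batch = False

    async def process(self):
        return "processed"

    async def submit_batch(self):
        raise NotImplementedError

    async def harvest(self, jobs):
        raise NotImplementedError

class BatchCapable(Indicator):
    uses_batch = True

    async def submit_batch(self):
        return ["job-1"]

    async def harvest(self, jobs):
        return f"harvested {len(jobs)}"

async def run(ind: Indicator):
    if not ind.uses_batch:
        return await ind.process()
    try:
        jobs = await ind.submit_batch()
    except Exception:  # submission failed -> degrade to the synchronous path
        return await ind.process()
    return await ind.harvest(jobs)

print(asyncio.run(run(Indicator())))     # processed
print(asyncio.run(run(BatchCapable())))  # harvested 1
```

Keeping the capability as a class attribute (rather than `isinstance` checks) means the registry and worker never need to know about concrete indicator types.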
### Task 3: Implement NDVI `submit_batch()`

**Files:**
- Modify: `app/indicators/ndvi.py` (add `uses_batch`, `submit_batch`)
- Test: `tests/test_indicator_ndvi.py` (add test)

- [ ] **Step 1: Write the failing test**

Add to `tests/test_indicator_ndvi.py`:

```python
@pytest.mark.asyncio
async def test_ndvi_submit_batch_creates_three_jobs(test_aoi, test_time_range):
    """submit_batch() creates current, baseline, and true-color batch jobs."""
    from app.indicators.ndvi import NdviIndicator

    indicator = NdviIndicator()

    mock_conn = MagicMock()
    mock_job = MagicMock()
    mock_job.job_id = "j-test"
    mock_conn.create_job.return_value = mock_job

    with patch("app.indicators.ndvi.get_connection", return_value=mock_conn), \
         patch("app.indicators.ndvi.build_ndvi_graph") as mock_ndvi_graph, \
         patch("app.indicators.ndvi.build_true_color_graph") as mock_tc_graph:

        mock_ndvi_graph.return_value = MagicMock()
        mock_tc_graph.return_value = MagicMock()

        jobs = await indicator.submit_batch(test_aoi, test_time_range)

        assert len(jobs) == 3
        assert mock_conn.create_job.call_count == 3
        assert mock_job.start.call_count == 3

        # Verify graph builders called with correct temporal extents
        assert mock_ndvi_graph.call_count == 2  # current + baseline
        assert mock_tc_graph.call_count == 1  # true-color
```

- [ ] **Step 2: Run test to verify it fails**

Run: `pytest tests/test_indicator_ndvi.py::test_ndvi_submit_batch_creates_three_jobs -v`
Expected: FAIL — `NdviIndicator` has no `submit_batch` override yet

- [ ] **Step 3: Write the implementation**

Add to `app/indicators/ndvi.py` in the `NdviIndicator` class. Add import at top of file:

```python
from app.openeo_client import get_connection, build_ndvi_graph, build_true_color_graph, _bbox_dict, submit_as_batch
```

Add class attribute and method after `_true_color_path` (after line 42):

```python
    uses_batch = True

    async def submit_batch(
        self, aoi: AOI, time_range: TimeRange, season_months: list[int] | None = None
    ) -> list:
        conn = get_connection()
        bbox = _bbox_dict(aoi.bbox)

        current_start = time_range.start.isoformat()
        current_end = time_range.end.isoformat()

        baseline_start = date(
            time_range.start.year - BASELINE_YEARS,
            time_range.start.month,
            time_range.start.day,
        ).isoformat()
        baseline_end = date(
            time_range.start.year,
            time_range.start.month,
            time_range.start.day,
        ).isoformat()

        current_cube = build_ndvi_graph(
            conn=conn, bbox=bbox,
            temporal_extent=[current_start, current_end],
            resolution_m=RESOLUTION_M,
        )
        baseline_cube = build_ndvi_graph(
            conn=conn, bbox=bbox,
            temporal_extent=[baseline_start, baseline_end],
            resolution_m=RESOLUTION_M,
        )
        true_color_cube = build_true_color_graph(
            conn=conn, bbox=bbox,
            temporal_extent=[current_start, current_end],
            resolution_m=RESOLUTION_M,
        )

        return [
            submit_as_batch(conn, current_cube, f"ndvi-current-{aoi.name}"),
            submit_as_batch(conn, baseline_cube, f"ndvi-baseline-{aoi.name}"),
            submit_as_batch(conn, true_color_cube, f"ndvi-truecolor-{aoi.name}"),
        ]
```

- [ ] **Step 4: Run test to verify it passes**

Run: `pytest tests/test_indicator_ndvi.py::test_ndvi_submit_batch_creates_three_jobs -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add app/indicators/ndvi.py tests/test_indicator_ndvi.py
git commit -m "feat: implement NdviIndicator.submit_batch() for openEO batch jobs"
```

---

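One edge case worth noting in the baseline window above: `date(year - BASELINE_YEARS, month, day)` raises `ValueError` when the analysis start is Feb 29 and the target year is not a leap year. A defensive sketch of the same arithmetic — the `shift_years` helper is illustrative, not part of the codebase:

```python
from datetime import date

def shift_years(d: date, years: int) -> date:
    """Subtract whole years, clamping Feb 29 to Feb 28 in non-leap years."""
    try:
        return d.replace(year=d.year - years)
    except ValueError:  # Feb 29 in a non-leap target year
        return d.replace(year=d.year - years, day=28)

BASELINE_YEARS = 3  # assumed value for this sketch

start = date(2025, 3, 1)
baseline_start = shift_years(start, BASELINE_YEARS)  # 2022-03-01
baseline_end = start  # the baseline window ends where the analysis window begins
```

`shift_years(date(2024, 2, 29), 1)` clamps to `date(2023, 2, 28)` instead of raising, which keeps batch submission from failing on one calendar corner case.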
### Task 4: Implement NDVI `harvest()`

**Files:**
- Modify: `app/indicators/ndvi.py` (add `harvest`)
- Test: `tests/test_indicator_ndvi.py` (add test)

- [ ] **Step 1: Write the failing test**

Add to `tests/test_indicator_ndvi.py`:

```python
@pytest.mark.asyncio
async def test_ndvi_harvest_computes_result_from_batch_jobs(test_aoi, test_time_range):
    """harvest() downloads batch results and returns IndicatorResult."""
    from app.indicators.ndvi import NdviIndicator

    indicator = NdviIndicator()

    with tempfile.TemporaryDirectory() as tmpdir:
        ndvi_path = os.path.join(tmpdir, "ndvi.tif")
        rgb_path = os.path.join(tmpdir, "rgb.tif")
        _mock_ndvi_tif(ndvi_path)
        _mock_true_color_tif(rgb_path)

        def make_mock_job(src_path):
            job = MagicMock()
            job.job_id = "j-test"

            def fake_download_results(target):
                import shutil
                os.makedirs(target, exist_ok=True)
                dest = os.path.join(target, "result.tif")
                shutil.copy(src_path, dest)
                from pathlib import Path
                return {Path(dest): {"type": "image/tiff"}}

            job.download_results.side_effect = fake_download_results
            job.status.return_value = "finished"
            return job

        current_job = make_mock_job(ndvi_path)
        baseline_job = make_mock_job(ndvi_path)
        true_color_job = make_mock_job(rgb_path)

        result = await indicator.harvest(
            test_aoi, test_time_range,
            batch_jobs=[current_job, baseline_job, true_color_job],
        )

        assert result.indicator_id == "ndvi"
        assert result.data_source == "satellite"
        assert result.status in (StatusLevel.GREEN, StatusLevel.AMBER, StatusLevel.RED)
        assert result.confidence in (ConfidenceLevel.HIGH, ConfidenceLevel.MODERATE, ConfidenceLevel.LOW)
        assert len(result.chart_data.get("dates", [])) > 0


@pytest.mark.asyncio
async def test_ndvi_harvest_degrades_when_baseline_fails(test_aoi, test_time_range):
    """harvest() returns partial result when baseline job failed."""
    from app.indicators.ndvi import NdviIndicator

    indicator = NdviIndicator()

    with tempfile.TemporaryDirectory() as tmpdir:
        ndvi_path = os.path.join(tmpdir, "ndvi.tif")
        rgb_path = os.path.join(tmpdir, "rgb.tif")
        _mock_ndvi_tif(ndvi_path)
        _mock_true_color_tif(rgb_path)

        def make_mock_job(src_path, status="finished"):
            job = MagicMock()
            job.job_id = "j-test"
            job.status.return_value = status

            def fake_download_results(target):
                if status == "error":
                    raise Exception("Batch job failed on CDSE")
                os.makedirs(target, exist_ok=True)
                dest = os.path.join(target, "result.tif")
                import shutil
                shutil.copy(src_path, dest)
                from pathlib import Path
                return {Path(dest): {"type": "image/tiff"}}

            job.download_results.side_effect = fake_download_results
            return job

        current_job = make_mock_job(ndvi_path)
        baseline_job = make_mock_job(ndvi_path, status="error")
        true_color_job = make_mock_job(rgb_path)

        result = await indicator.harvest(
            test_aoi, test_time_range,
            batch_jobs=[current_job, baseline_job, true_color_job],
        )

        assert result.indicator_id == "ndvi"
        assert result.data_source == "satellite"
        assert result.confidence == ConfidenceLevel.LOW
        assert result.trend == TrendDirection.STABLE


@pytest.mark.asyncio
async def test_ndvi_harvest_falls_back_when_current_fails(test_aoi, test_time_range):
    """harvest() returns placeholder when current NDVI job failed."""
    from app.indicators.ndvi import NdviIndicator

    indicator = NdviIndicator()

    current_job = MagicMock()
    current_job.status.return_value = "error"
    current_job.download_results.side_effect = Exception("failed")
    baseline_job = MagicMock()
    baseline_job.status.return_value = "finished"
    true_color_job = MagicMock()
    true_color_job.status.return_value = "finished"

    result = await indicator.harvest(
        test_aoi, test_time_range,
        batch_jobs=[current_job, baseline_job, true_color_job],
    )

    assert result.data_source == "placeholder"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest tests/test_indicator_ndvi.py -k harvest -v`
Expected: FAIL — `harvest` not implemented

- [ ] **Step 3: Write the implementation**

Add to `app/indicators/ndvi.py` in the `NdviIndicator` class, after `submit_batch`:

```python
    async def harvest(
        self, aoi: AOI, time_range: TimeRange, season_months: list[int] | None = None,
        batch_jobs: list | None = None,
    ) -> IndicatorResult:
        """Download completed batch job results and compute NDVI statistics."""
        current_job, baseline_job, true_color_job = batch_jobs

        results_dir = tempfile.mkdtemp(prefix="aperture_ndvi_batch_")

        # Download current NDVI — required
        try:
            current_dir = os.path.join(results_dir, "current")
            paths = current_job.download_results(current_dir)
            current_path = self._find_tif(paths, current_dir)
        except Exception as exc:
            logger.warning("NDVI current batch download failed: %s", exc)
            return self._fallback(aoi, time_range)

        # Download baseline — optional (degrades gracefully)
        baseline_path = None
        try:
            baseline_dir = os.path.join(results_dir, "baseline")
            paths = baseline_job.download_results(baseline_dir)
            baseline_path = self._find_tif(paths, baseline_dir)
        except Exception as exc:
            logger.warning("NDVI baseline batch download failed, degrading: %s", exc)

        # Download true-color — optional
        true_color_path = None
        try:
            tc_dir = os.path.join(results_dir, "truecolor")
            paths = true_color_job.download_results(tc_dir)
            true_color_path = self._find_tif(paths, tc_dir)
        except Exception as exc:
            logger.warning("NDVI true-color batch download failed: %s", exc)

        # Compute statistics
        current_stats = self._compute_stats(current_path)
        current_mean = current_stats["overall_mean"]

        if baseline_path:
            baseline_stats = self._compute_stats(baseline_path)
            baseline_mean = baseline_stats["overall_mean"]
            change = current_mean - baseline_mean
            confidence = (
                ConfidenceLevel.HIGH if current_stats["valid_months"] >= 6
                else ConfidenceLevel.MODERATE if current_stats["valid_months"] >= 3
                else ConfidenceLevel.LOW
            )
            chart_data = self._build_chart_data(
                current_stats["monthly_means"],
                baseline_stats["monthly_means"],
                time_range,
            )
        else:
            baseline_mean = current_mean
            change = 0.0
            confidence = ConfidenceLevel.LOW
            chart_data = {
                "dates": [f"{time_range.end.year}-{m+1:02d}" for m in range(len(current_stats["monthly_means"]))],
                "values": [round(v, 3) for v in current_stats["monthly_means"]],
                "label": "NDVI",
            }

        status = self._classify(change)
        trend = self._compute_trend(change) if baseline_path else TrendDirection.STABLE

        if abs(change) <= 0.05:
            headline = f"Vegetation stable (NDVI {current_mean:.2f}, Δ{change:+.2f} vs baseline)"
        elif change > 0:
            headline = f"Vegetation greening (NDVI +{change:.2f} vs baseline)"
        else:
            headline = f"Vegetation decline (NDVI {change:.2f} vs baseline)"

        self._spatial_data = SpatialData(
            map_type="raster", label="NDVI", colormap="RdYlGn",
            vmin=-0.2, vmax=0.9,
        )
        self._indicator_raster_path = current_path
        self._true_color_path = true_color_path
        self._ndvi_peak_band = current_stats["peak_month_band"]
        self._render_band = current_stats["peak_month_band"]

        return IndicatorResult(
            indicator_id=self.id,
            headline=headline,
            status=status,
            trend=trend,
            confidence=confidence,
            map_layer_path=current_path,
            chart_data=chart_data,
            data_source="satellite",
            summary=(
                f"Mean NDVI is {current_mean:.3f} compared to a {BASELINE_YEARS}-year "
                f"baseline of {baseline_mean:.3f} (Δ{change:+.3f}). "
                f"Pixel-level analysis at {RESOLUTION_M}m resolution from "
                f"{current_stats['valid_months']} monthly composites."
            ),
            methodology=(
                f"Sentinel-2 L2A pixel-level NDVI = (B08 − B04) / (B08 + B04). "
                f"Cloud-masked using SCL band (classes 4, 5, 6 retained). "
                f"Monthly median composites at {RESOLUTION_M}m resolution. "
                f"Baseline: {BASELINE_YEARS}-year monthly medians. "
                f"Processed server-side via CDSE openEO batch jobs."
            ),
            limitations=[
                f"Resampled to {RESOLUTION_M}m — sub-field variability not captured at this resolution.",
                "Cloud cover reduces observation count in rainy seasons.",
                "NDVI does not distinguish crop from natural vegetation.",
                "Seasonal variation may mask long-term trends if analysis windows differ.",
            ] + (["Baseline unavailable — change and trend not computed."] if not baseline_path else []),
        )

    @staticmethod
    def _find_tif(download_paths: dict, fallback_dir: str) -> str:
        """Find the GeoTIFF file from batch job download results."""
        if download_paths:
            for p in download_paths:
                if str(p).endswith(".tif") or str(p).endswith(".tiff"):
                    return str(p)
        # Fallback: look for any .tif in the directory
        for f in os.listdir(fallback_dir):
            if f.endswith(".tif") or f.endswith(".tiff"):
                return os.path.join(fallback_dir, f)
        raise FileNotFoundError(f"No GeoTIFF found in {fallback_dir}")
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest tests/test_indicator_ndvi.py -k harvest -v`
Expected: All 3 harvest tests PASS

- [ ] **Step 5: Run full suite**

Run: `pytest tests/ -x -q`
Expected: All tests pass (existing tests unaffected since `process()` is unchanged)

- [ ] **Step 6: Commit**

```bash
git add app/indicators/ndvi.py tests/test_indicator_ndvi.py
git commit -m "feat: implement NdviIndicator.harvest() with graceful degradation"
```

---

639
+ ### Task 5: Rewrite worker `process_job()` with three-phase flow
640
+
641
+ **Files:**
642
+ - Modify: `app/worker.py` (rewrite `process_job`)
643
+ - Test: `tests/test_worker.py` (add batch worker test)
644
+
645
+ - [ ] **Step 1: Write the failing test**
646
+
647
+ Add to `tests/test_worker.py`:
648
+
649
+ ```python
650
+ class MockBatchIndicator(BaseIndicator):
651
+ """Batch indicator for testing the three-phase worker."""
652
+ id = "ndvi"
653
+ name = "Vegetation (NDVI)"
654
+ category = "D2"
655
+ question = "Is vegetation cover declining?"
656
+ estimated_minutes = 8
657
+ uses_batch = True
658
+
659
+ async def process(self, aoi, time_range, season_months=None):
660
+ return IndicatorResult(
661
+ indicator_id="ndvi", headline="placeholder",
662
+ status=StatusLevel.GREEN, trend=TrendDirection.STABLE,
663
+ confidence=ConfidenceLevel.LOW, map_layer_path="",
664
+ chart_data={"dates": ["2025"], "values": [0.3], "label": "NDVI"},
665
+ data_source="placeholder",
666
+ summary="Fallback.", methodology="Placeholder.", limitations=[],
667
+ )
668
+
669
+ async def submit_batch(self, aoi, time_range, season_months=None):
670
+ mock_job = MagicMock()
671
+ mock_job.job_id = "j-test"
672
+ mock_job.status.return_value = "finished"
673
+ return [mock_job, mock_job, mock_job]
674
+
675
+ async def harvest(self, aoi, time_range, season_months=None, batch_jobs=None):
676
+ return IndicatorResult(
677
+ indicator_id="ndvi", headline="Real NDVI data",
678
+ status=StatusLevel.GREEN, trend=TrendDirection.STABLE,
679
+ confidence=ConfidenceLevel.HIGH, map_layer_path="",
680
+ chart_data={"dates": ["2025-01"], "values": [0.45], "label": "NDVI"},
681
+ data_source="satellite",
682
+ summary="Real.", methodology="Sentinel-2.", limitations=[],
683
+ )
684
+
685
+
686
+ @pytest.mark.asyncio
687
+ async def test_process_job_uses_batch_flow(temp_db_path):
688
+ """Worker uses submit_batch → poll → harvest for batch indicators."""
689
+ db = Database(temp_db_path)
690
+ await db.init()
691
+ reg = IndicatorRegistry()
692
+ reg.register(MockBatchIndicator())
693
+ request = JobRequest(
694
+ aoi=AOI(name="Test", bbox=[32.45, 15.65, 32.65, 15.80]),
695
+ time_range=TimeRange(start=date(2025, 3, 1), end=date(2026, 3, 1)),
696
+ indicator_ids=["ndvi"],
697
+ email="test@example.com",
698
+ )
699
+ job_id = await db.create_job(request)
700
+ await process_job(job_id, db, reg)
701
+ job = await db.get_job(job_id)
702
+ assert job.status == JobStatus.COMPLETE
703
+ assert len(job.results) == 1
704
+ assert job.results[0].data_source == "satellite"
705
+ assert job.results[0].headline == "Real NDVI data"
706
+
707
+
708
+ @pytest.mark.asyncio
709
+ async def test_process_job_mixes_batch_and_process(temp_db_path):
710
+ """Worker handles batch and non-batch indicators in the same job."""
711
+ db = Database(temp_db_path)
712
+ await db.init()
713
+ reg = IndicatorRegistry()
714
+ reg.register(MockBatchIndicator())
715
+ reg.register(MockFiresIndicator())
716
+ request = JobRequest(
717
+ aoi=AOI(name="Test", bbox=[32.45, 15.65, 32.65, 15.80]),
718
+ time_range=TimeRange(start=date(2025, 3, 1), end=date(2026, 3, 1)),
719
+ indicator_ids=["ndvi", "fires"],
720
+ email="test@example.com",
721
+ )
722
+ job_id = await db.create_job(request)
723
+ await process_job(job_id, db, reg)
724
+ job = await db.get_job(job_id)
725
+ assert job.status == JobStatus.COMPLETE
726
+ assert len(job.results) == 2
727
+
728
+ ndvi_result = next(r for r in job.results if r.indicator_id == "ndvi")
729
+ fires_result = next(r for r in job.results if r.indicator_id == "fires")
730
+ assert ndvi_result.data_source == "satellite"
731
+ assert fires_result.headline == "3 fire events detected"
732
+
733
+
734
+ @pytest.mark.asyncio
735
+ async def test_process_job_batch_submit_failure_falls_back(temp_db_path):
736
+ """If submit_batch() fails, worker falls back to process()."""
737
+
738
+ class FailingBatchIndicator(MockBatchIndicator):
739
+ async def submit_batch(self, aoi, time_range, season_months=None):
740
+ raise ConnectionError("CDSE unreachable")
741
+
742
+ db = Database(temp_db_path)
743
+ await db.init()
744
+ reg = IndicatorRegistry()
745
+ reg.register(FailingBatchIndicator())
746
+ request = JobRequest(
747
+ aoi=AOI(name="Test", bbox=[32.45, 15.65, 32.65, 15.80]),
748
+ time_range=TimeRange(start=date(2025, 3, 1), end=date(2026, 3, 1)),
749
+ indicator_ids=["ndvi"],
750
+ email="test@example.com",
751
+ )
752
+ job_id = await db.create_job(request)
753
+ await process_job(job_id, db, reg)
754
+ job = await db.get_job(job_id)
755
+ assert job.status == JobStatus.COMPLETE
756
+ assert job.results[0].data_source == "placeholder"
757
+ ```
758
+
759
+ - [ ] **Step 2: Run tests to verify they fail**
760
+
761
+ Run: `pytest tests/test_worker.py -k batch -v`
762
+ Expected: FAIL — current `process_job` doesn't call `submit_batch` or `harvest`
763
+
764
+ - [ ] **Step 3: Write the implementation**
765
+
766
+ Replace `process_job` in `app/worker.py` (lines 47-207):
767
+

```python
BATCH_POLL_INTERVAL = 30  # seconds between status checks
BATCH_TIMEOUT = 1200  # 20 minutes maximum wait


async def process_job(job_id: str, db: Database, registry: IndicatorRegistry) -> None:
    job = await db.get_job(job_id)
    if job is None:
        logger.error(f"Job {job_id} not found")
        return
    await db.update_job_status(job_id, JobStatus.PROCESSING)
    try:
        spatial_cache = {}

        # Separate batch vs non-batch indicators
        batch_indicators = {}
        process_indicators = []
        for indicator_id in job.request.indicator_ids:
            indicator = registry.get(indicator_id)
            if indicator.uses_batch:
                batch_indicators[indicator_id] = indicator
            else:
                process_indicators.append((indicator_id, indicator))

        # ── Phase 1: Submit batch jobs ──
        batch_submissions = {}  # {indicator_id: list[BatchJob]}
        fallback_ids = set()  # indicators that failed to submit
        for indicator_id, indicator in batch_indicators.items():
            await db.update_job_progress(job_id, indicator_id, "submitting")
            try:
                jobs = await indicator.submit_batch(
                    job.request.aoi,
                    job.request.time_range,
                    season_months=job.request.season_months(),
                )
                batch_submissions[indicator_id] = jobs
                await db.update_job_progress(job_id, indicator_id, "processing on CDSE")
            except Exception as exc:
                logger.warning("Batch submit failed for %s, will use fallback: %s", indicator_id, exc)
                fallback_ids.add(indicator_id)

        # ── Phase 2: Poll until all batch jobs finish ──
        poll_start = time.monotonic()
        pending = dict(batch_submissions)

        while pending:
            elapsed = time.monotonic() - poll_start
            if elapsed >= BATCH_TIMEOUT:
                logger.warning("Batch poll timeout after %.0fs, remaining: %s", elapsed, list(pending.keys()))
                fallback_ids.update(pending.keys())
                break

            await asyncio.sleep(BATCH_POLL_INTERVAL)

            for indicator_id in list(pending.keys()):
                jobs = pending[indicator_id]
                statuses = [j.status() for j in jobs]
                if all(s == "finished" for s in statuses):
                    logger.info("Batch jobs finished for %s", indicator_id)
                    del pending[indicator_id]
                elif any(s in ("error", "canceled") for s in statuses):
                    logger.warning("Batch job failed for %s: %s", indicator_id, statuses)
                    del pending[indicator_id]
                    # Don't add to fallback — harvest() handles partial failure

        # ── Phase 3: Harvest batch results + process non-batch indicators ──
        for indicator_id in job.request.indicator_ids:
            indicator = registry.get(indicator_id)

            if indicator_id in fallback_ids:
                # Submit failed or timed out — use process() fallback
                await db.update_job_progress(job_id, indicator_id, "processing")
                result = await indicator.process(
                    job.request.aoi,
                    job.request.time_range,
                    season_months=job.request.season_months(),
                )
            elif indicator_id in batch_submissions:
                # Harvest batch results
                await db.update_job_progress(job_id, indicator_id, "downloading")
                try:
                    result = await indicator.harvest(
                        job.request.aoi,
                        job.request.time_range,
                        season_months=job.request.season_months(),
                        batch_jobs=batch_submissions[indicator_id],
                    )
                except Exception as exc:
                    logger.warning("Harvest failed for %s, using fallback: %s", indicator_id, exc)
                    result = await indicator.process(
                        job.request.aoi,
                        job.request.time_range,
                        season_months=job.request.season_months(),
                    )
            else:
                # Non-batch indicator — use process() directly
                await db.update_job_progress(job_id, indicator_id, "processing")
                result = await indicator.process(
                    job.request.aoi,
                    job.request.time_range,
                    season_months=job.request.season_months(),
                )

            spatial = indicator.get_spatial_data()
            if spatial is not None:
                spatial_cache[indicator_id] = spatial

            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, indicator_id, "complete")

        # ── Generate outputs (unchanged) ──
        job = await db.get_job(job_id)
        results_dir = os.path.join("results", job_id)
        os.makedirs(results_dir, exist_ok=True)

        output_files = []

        for result in job.results:
            chart_path = os.path.join(results_dir, f"{result.indicator_id}_chart.png")
            render_timeseries_chart(
                chart_data=result.chart_data,
                indicator_name=_indicator_label(result.indicator_id),
                status=result.status,
                trend=result.trend,
                output_path=chart_path,
            )
            output_files.append(chart_path)

            spatial = spatial_cache.get(result.indicator_id)
            map_path = os.path.join(results_dir, f"{result.indicator_id}_map.png")

            if spatial is not None and spatial.map_type == "raster":
                indicator_obj = registry.get(result.indicator_id)
                raster_path = getattr(indicator_obj, '_indicator_raster_path', None)
                true_color_path = getattr(indicator_obj, '_true_color_path', None)
                render_band = getattr(indicator_obj, '_render_band', 1)
                from app.outputs.maps import render_raster_map
                render_raster_map(
                    true_color_path=true_color_path,
                    indicator_path=raster_path,
                    indicator_band=render_band,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                    cmap=spatial.colormap,
                    vmin=spatial.vmin,
                    vmax=spatial.vmax,
                    label=spatial.label,
                )
            elif spatial is not None:
                render_indicator_map(
                    spatial=spatial,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            else:
                render_status_map(
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            output_files.append(map_path)

            spatial_json_path = os.path.join(results_dir, f"{result.indicator_id}_spatial.json")
            _save_spatial_json(spatial, result.status.value, spatial_json_path)

        indicator_map_paths = {}
        for result in job.results:
            mp = os.path.join(results_dir, f"{result.indicator_id}_map.png")
            if os.path.exists(mp):
                indicator_map_paths[result.indicator_id] = mp

        from app.models import StatusLevel
        worst_status = max(
            (r.status for r in job.results),
            key=lambda s: [StatusLevel.GREEN, StatusLevel.AMBER, StatusLevel.RED].index(s),
        )
        summary_map_path = os.path.join(results_dir, "summary_map.png")
        render_status_map(aoi=job.request.aoi, status=worst_status, output_path=summary_map_path)
        output_files.append(summary_map_path)

        overview_score = compute_composite_score(job.results)

        overview_score_path = os.path.join(results_dir, "overview_score.json")
        write_overview_score(overview_score, overview_score_path)
        output_files.append(overview_score_path)

        overview_map_path = os.path.join(results_dir, "overview_map.png")
        true_color_path = None
        for ind_id in job.request.indicator_ids:
            ind_obj = registry.get(ind_id)
            tc = getattr(ind_obj, '_true_color_path', None)
            if tc and os.path.exists(tc):
                true_color_path = tc
                break

        if true_color_path:
            render_overview_map(
                true_color_path=true_color_path,
                aoi=job.request.aoi,
                output_path=overview_map_path,
                title=f"{job.request.aoi.name} — Satellite Overview",
                date_range=f"{job.request.time_range.start} to {job.request.time_range.end}",
            )
            output_files.append(overview_map_path)

        report_path = os.path.join(results_dir, "report.pdf")
        generate_pdf_report(
            aoi=job.request.aoi,
            time_range=job.request.time_range,
            results=job.results,
            output_path=report_path,
            summary_map_path=summary_map_path,
            indicator_map_paths=indicator_map_paths,
            overview_score=overview_score,
            overview_map_path=overview_map_path if true_color_path else "",
        )
        output_files.append(report_path)

        package_path = os.path.join(results_dir, "package.zip")
        create_data_package(files=output_files, output_path=package_path)

        await db.update_job_status(job_id, JobStatus.COMPLETE)

        await send_completion_email(
            to_email=job.request.email,
            job_id=job_id,
            aoi_name=job.request.aoi.name,
        )
    except Exception as e:
        logger.exception(f"Job {job_id} failed: {e}")
        await db.update_job_status(job_id, JobStatus.FAILED, error=str(e))
```
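
The Phase 2 loop reduces each indicator's list of batch-job statuses to a single outcome. As a standalone sketch of that decision rule (the helper name is illustrative, not part of the worker):

```python
def classify_batch_statuses(statuses: list[str]) -> str:
    """Reduce a list of openEO batch-job statuses to one outcome.

    Mirrors the Phase 2 rule: an indicator is harvestable only when
    every job finished; any error/canceled job drops it from the
    pending set; anything else means keep polling.
    """
    if all(s == "finished" for s in statuses):
        return "finished"
    if any(s in ("error", "canceled") for s in statuses):
        return "failed"
    return "pending"
```

With `BATCH_POLL_INTERVAL = 30` and `BATCH_TIMEOUT = 1200`, the loop makes at most 40 status sweeps before forcing any still-pending indicators onto the fallback path.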

Add `import time` to the module-level imports in `app/worker.py` if it is not already there; the replacement's polling phase calls `time.monotonic()`.

- [ ] **Step 4: Run batch worker tests**

  Run: `pytest tests/test_worker.py -v`
  Expected: All tests PASS (existing + 3 new batch tests)

- [ ] **Step 5: Run full suite**

  Run: `pytest tests/ -x -q`
  Expected: All tests pass

- [ ] **Step 6: Commit**

```bash
git add app/worker.py tests/test_worker.py
git commit -m "feat: three-phase batch job worker (submit → poll → harvest)"
```

---

### Task 6: Update E2E test for batch flow

**Files:**
- Modify: `tests/test_ndvi_e2e.py` (add batch E2E test)

- [ ] **Step 1: Add batch E2E test**

  Add to `tests/test_ndvi_e2e.py`:

```python
@pytest.mark.asyncio
async def test_ndvi_batch_pipeline():
    """Batch pipeline: submit → harvest → render map → render chart."""
    from app.indicators.ndvi import NdviIndicator
    import shutil

    aoi = AOI(name="Khartoum Batch", bbox=BBOX)
    time_range = TimeRange(start=date(2025, 3, 1), end=date(2026, 3, 1))

    with tempfile.TemporaryDirectory() as tmpdir:
        ndvi_path = os.path.join(tmpdir, "ndvi.tif")
        rgb_path = os.path.join(tmpdir, "rgb.tif")
        _write_ndvi_tif(ndvi_path)
        _write_rgb_tif(rgb_path)

        mock_conn = MagicMock()

        def make_mock_job(src_path):
            job = MagicMock()
            job.job_id = "j-e2e"
            job.status.return_value = "finished"

            def fake_download_results(target):
                os.makedirs(target, exist_ok=True)
                dest = os.path.join(target, "result.tif")
                shutil.copy(src_path, dest)
                from pathlib import Path
                return {Path(dest): {"type": "image/tiff"}}

            job.download_results.side_effect = fake_download_results
            return job

        mock_ndvi_job = make_mock_job(ndvi_path)
        mock_tc_job = make_mock_job(rgb_path)
        mock_conn.create_job.return_value = mock_ndvi_job

        with patch("app.indicators.ndvi.get_connection", return_value=mock_conn), \
             patch("app.indicators.ndvi.build_ndvi_graph", return_value=MagicMock()), \
             patch("app.indicators.ndvi.build_true_color_graph", return_value=MagicMock()), \
             patch("app.indicators.ndvi.submit_as_batch") as mock_submit:

            mock_submit.side_effect = [
                make_mock_job(ndvi_path),  # current
                make_mock_job(ndvi_path),  # baseline
                make_mock_job(rgb_path),   # true-color
            ]

            indicator = NdviIndicator()

            # Phase 1: submit
            jobs = await indicator.submit_batch(aoi, time_range)
            assert len(jobs) == 3

            # Phase 3: harvest
            result = await indicator.harvest(aoi, time_range, batch_jobs=jobs)

            assert result.indicator_id == "ndvi"
            assert result.data_source == "satellite"
            assert len(result.chart_data["dates"]) >= 6

            # Render the raster map
            map_out = os.path.join(tmpdir, "ndvi_map.png")
            raster_path = indicator._indicator_raster_path
            tc_path = indicator._true_color_path
            peak = indicator._ndvi_peak_band

            render_raster_map(
                true_color_path=tc_path,
                indicator_path=raster_path,
                indicator_band=peak,
                aoi=aoi,
                status=result.status,
                output_path=map_out,
                cmap="RdYlGn",
                vmin=-0.2,
                vmax=0.9,
                label="NDVI",
            )
            assert os.path.exists(map_out)
            assert os.path.getsize(map_out) > 10000

            # Render the chart
            chart_out = os.path.join(tmpdir, "ndvi_chart.png")
            render_timeseries_chart(
                chart_data=result.chart_data,
                indicator_name="Vegetation (NDVI)",
                status=result.status,
                trend=result.trend,
                output_path=chart_out,
                y_label="NDVI",
            )
            assert os.path.exists(chart_out)
            assert os.path.getsize(chart_out) > 5000
```

- [ ] **Step 2: Run test**

  Run: `pytest tests/test_ndvi_e2e.py::test_ndvi_batch_pipeline -v`
  Expected: PASS

- [ ] **Step 3: Run full suite**

  Run: `pytest tests/ -x -q`
  Expected: All tests pass

- [ ] **Step 4: Commit**

```bash
git add tests/test_ndvi_e2e.py
git commit -m "test: add batch flow E2E test for NDVI pipeline"
```

---

### Task 7: Deploy and verify on HF Space

**Files:**
- No code changes — deployment and live verification

- [ ] **Step 1: Push to both remotes**

```bash
git push origin main && git push hf main
```

- [ ] **Step 2: Wait for Space rebuild**

  Poll until the Space is running with the new SHA:

```bash
python3 -c "
from huggingface_hub import HfApi
api = HfApi()
rt = api.get_space_runtime('MERLx/Aperture')
print('Stage:', rt.stage, 'SHA:', rt.raw.get('sha', 'unknown'))
"
```

  Wait until `Stage: RUNNING` with the latest commit SHA.
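
If the manual check gets tedious, the wait can be wrapped in a generic polling helper; a minimal sketch (the helper name and defaults are illustrative, not part of the codebase):

```python
import time


def wait_for(check, timeout=600, interval=20):
    """Call check() every `interval` seconds until it returns a truthy
    value or `timeout` seconds elapse; return the truthy value."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        value = check()
        if value:
            return value
        time.sleep(interval)
    raise TimeoutError("condition not met before deadline")
```

For example, `wait_for(lambda: api.get_space_runtime('MERLx/Aperture').stage == 'RUNNING')` using the `HfApi` instance from the snippet above.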

- [ ] **Step 3: Submit NDVI test job**

```bash
# Get auth token
curl -s -X POST https://merlx-aperture.hf.space/api/auth/request \
  -H 'Content-Type: application/json' -d '{"email":"test@aperture.dev"}'

# Submit job (use the demo_token from response)
AUTH="Bearer test@aperture.dev:<token>"
curl -s -X POST https://merlx-aperture.hf.space/api/jobs \
  -H "Content-Type: application/json" \
  -H "Authorization: $AUTH" \
  -d '{
    "aoi": {"name": "Khartoum Batch Test", "bbox": [32.52, 15.58, 32.57, 15.63]},
    "time_range": {"start": "2025-01-01", "end": "2025-06-30"},
    "indicator_ids": ["ndvi"],
    "email": "test@aperture.dev"
  }'
```

- [ ] **Step 4: Poll until complete**

```bash
curl -s https://merlx-aperture.hf.space/api/jobs/<job_id> \
  -H "Authorization: $AUTH" | python3 -m json.tool
```

  Poll every 2 minutes. Check progress states: `submitting` → `processing on CDSE` → `downloading` → `complete`.

- [ ] **Step 5: Verify real data**

  Check the response for:
  - `data_source` is `"satellite"` (not `"placeholder"`)
  - `confidence` is `"high"` or `"moderate"` (not `"low"`)
  - `methodology` contains `"batch jobs"`
  - `chart_data.dates` has monthly entries
  - `chart_data.values` has realistic NDVI values (0.1–0.8 range)
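
The checks above can also be scripted against the JSON response; a minimal sketch (field names taken from the checklist, function name hypothetical):

```python
def verify_ndvi_result(result: dict) -> list[str]:
    """Return a list of failed checks for a completed NDVI job result."""
    problems = []
    if result.get("data_source") != "satellite":
        problems.append("data_source is not 'satellite'")
    if result.get("confidence") not in ("high", "moderate"):
        problems.append("confidence is not high/moderate")
    if "batch jobs" not in result.get("methodology", ""):
        problems.append("methodology does not mention batch jobs")
    chart = result.get("chart_data", {})
    if not chart.get("dates"):
        problems.append("chart_data.dates is empty")
    values = [v for v in chart.get("values", []) if v is not None]
    if not values or not all(0.1 <= v <= 0.8 for v in values):
        problems.append("NDVI values missing or outside 0.1-0.8")
    return problems
```

An empty return list means the live job produced real satellite data.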

---

### Task 8: Clean up diagnostic endpoint

**Files:**
- Modify: `app/main.py` (remove `/api/debug/cdse-status`)

- [ ] **Step 1: Remove diagnostic endpoint**

  Delete the entire `@app.get("/api/debug/cdse-status")` function from `app/main.py` (the block added during debugging).

- [ ] **Step 2: Run tests**

  Run: `pytest tests/ -x -q`
  Expected: All tests pass

- [ ] **Step 3: Commit**

```bash
git add app/main.py
git commit -m "chore: remove temporary CDSE diagnostic endpoint"
```

- [ ] **Step 4: Push to both remotes**

```bash
git push origin main && git push hf main
```

---

### Task 9: Restore BASELINE_YEARS to 5

**Files:**
- Modify: `app/indicators/ndvi.py` (change constant)

- [ ] **Step 1: Update constant**

  In `app/indicators/ndvi.py`, change:

```python
BASELINE_YEARS = 1
```

  to:

```python
BASELINE_YEARS = 5
```
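
For scale: the constant widens the baseline from one seasonal cycle to five. A sketch of one plausible window computation (illustrative only; not necessarily how `ndvi.py` derives its baseline range):

```python
from datetime import date


def baseline_window(current_start: date, years: int) -> tuple[date, date]:
    """The `years` calendar years immediately preceding the current range.

    Note: a Feb 29 start date would need leap-year handling.
    """
    return current_start.replace(year=current_start.year - years), current_start
```

With `years=5` and a current range starting 2025-03-01, the baseline spans 2020-03-01 to 2025-03-01.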

- [ ] **Step 2: Run tests**

  Run: `pytest tests/ -x -q`
  Expected: All tests pass (tests don't depend on this value)

- [ ] **Step 3: Commit and push**

```bash
git add app/indicators/ndvi.py
git commit -m "feat: restore 5-year NDVI baseline now that batch jobs handle the load"
git push origin main && git push hf main
```