File size: 19,665 Bytes
ae74af5
 
bc642b5
ae74af5
 
f491e48
ae74af5
 
df6bf75
ae74af5
df6bf75
ae74af5
 
bc642b5
8202626
 
ae74af5
 
 
 
f491e48
a6638ce
f491e48
ae74af5
5bf6898
 
 
 
 
 
 
 
 
 
d2a9a16
 
 
 
 
 
 
bc642b5
 
d2a9a16
 
 
 
ef89b38
 
 
 
 
 
 
d2a9a16
ef89b38
d2a9a16
 
 
 
 
 
 
 
 
ef89b38
 
 
 
 
 
 
 
 
 
 
 
 
 
bc642b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df6bf75
ae74af5
 
 
 
 
 
85f7c19
 
df6bf75
 
 
749b346
df6bf75
 
 
f491e48
df6bf75
f491e48
df6bf75
 
ad28ab3
df6bf75
 
c42be1f
 
 
 
 
df6bf75
 
ad28ab3
6ccfecb
 
c42be1f
 
 
 
 
 
df6bf75
c42be1f
 
df6bf75
c42be1f
 
df6bf75
c42be1f
 
 
 
 
df6bf75
c42be1f
 
 
df6bf75
 
c42be1f
 
 
df6bf75
c42be1f
 
 
 
 
 
 
 
 
df6bf75
f491e48
85f7c19
c42be1f
 
 
 
 
 
df6bf75
 
c42be1f
 
 
 
 
 
df6bf75
85f7c19
df6bf75
 
ad28ab3
df6bf75
85f7c19
df6bf75
 
 
 
ad28ab3
 
 
 
df6bf75
ad28ab3
df6bf75
ae74af5
df6bf75
85f7c19
ae74af5
 
 
 
 
 
 
85f7c19
ae74af5
749b346
ae74af5
 
749b346
ae74af5
 
 
 
 
 
bc642b5
749b346
 
1ee8d52
 
4939573
749b346
df6bf75
 
 
1ee8d52
 
 
 
4939573
1ee8d52
 
 
 
4939573
 
1ee8d52
 
 
85f7c19
bc642b5
85f7c19
 
 
 
bc642b5
 
 
 
 
 
 
 
 
749b346
d2a9a16
 
 
 
 
 
85f7c19
1e8f4b0
 
df6bf75
1e8f4b0
749b346
df6bf75
 
 
1e8f4b0
 
749b346
1e8f4b0
df6bf75
1e8f4b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
749b346
1e8f4b0
749b346
1e8f4b0
 
b0128ec
 
 
 
 
 
1e8f4b0
 
b0128ec
 
 
 
 
 
 
 
 
 
1e8f4b0
 
 
b0128ec
 
749b346
df6bf75
1e8f4b0
749b346
1e8f4b0
b0128ec
 
 
 
 
 
1e8f4b0
 
7daf007
1e8f4b0
7daf007
1e8f4b0
 
 
 
 
 
7daf007
1e8f4b0
 
 
 
 
 
 
 
 
 
7daf007
 
df6bf75
7daf007
 
749b346
1e8f4b0
 
 
 
 
 
 
1da14bf
df6bf75
1da14bf
749b346
1da14bf
749b346
1da14bf
 
 
 
 
 
 
 
 
 
 
8202626
 
 
 
 
 
 
df6bf75
8202626
 
749b346
df6bf75
 
8202626
 
 
 
 
 
 
 
 
 
 
 
 
 
ae74af5
 
 
 
 
 
 
1da14bf
df6bf75
 
8202626
 
1e8f4b0
ae74af5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df6bf75
ae74af5
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import traceback
from app.database import Database
from app.eo_products.base import ProductRegistry
from app.models import JobStatus
from app.outputs.report import generate_pdf_report, _product_label
from app.outputs.package import create_data_package
from app.outputs.charts import render_timeseries_chart
from app.outputs.maps import render_indicator_map, render_status_map
from app.outputs.overview import compute_composite_score, write_overview_score
from app.outputs.maps import render_overview_map
from app.core.email import send_completion_email

# Module-level logger for the background worker.
logger = logging.getLogger(__name__)

# Polling cadence and hard cap for CDSE batch jobs (see process_job).
BATCH_POLL_INTERVAL = 30  # seconds between status checks
BATCH_TIMEOUT = 5400  # 90 minutes per indicator


class _SkippedJob:
    """Stub for batch jobs that didn't finish — fails fast on download."""
    def __init__(self, job_id: str):
        self.job_id = job_id
    def download_results(self, *a, **kw):
        raise RuntimeError(f"Job {self.job_id} not finished, skipping download")
    def status(self):
        return "skipped"


def _save_spatial_json(spatial, status_value: str, path: str, product_obj=None) -> None:
    """Serialize spatial data to JSON for the frontend.

    Raster-type spatial data is downsampled into the same "grid" format
    the frontend already knows how to render — so the dashboard map
    shows the actual indicator overlay instead of just a tinted AOI box.
    """
    if spatial is None:
        obj = {"map_type": "status", "status": status_value}
    elif spatial.map_type == "raster" and product_obj is not None:
        from app.outputs.spatial_web import raster_to_grid_dict
        raster_path = getattr(product_obj, "_product_raster_path", None)
        render_band = getattr(product_obj, "_render_band", 1)
        logger.info(
            "Serializing raster spatial for %s: path=%s band=%s exists=%s",
            getattr(product_obj, "id", "?"),
            raster_path,
            render_band,
            os.path.exists(raster_path) if raster_path else False,
        )
        grid = None
        if raster_path and os.path.exists(raster_path):
            grid = raster_to_grid_dict(
                raster_path,
                band=render_band,
                spatial=spatial,
                status_value=status_value,
            )
        if grid is not None:
            obj = grid
        else:
            # Fall back to a descriptive payload so the frontend can still
            # show the legend + color hint instead of blanking the map.
            logger.warning(
                "No grid spatial for %s — falling back to no-overlay payload",
                getattr(product_obj, "id", "?"),
            )
            obj = {
                "map_type": "raster-unavailable",
                "status": status_value,
                "label": spatial.label,
                "colormap": spatial.colormap,
                "vmin": spatial.vmin,
                "vmax": spatial.vmax,
            }
    elif spatial.map_type == "grid":
        obj = {
            "map_type": "grid",
            "status": status_value,
            "data": spatial.data.tolist(),
            "lats": spatial.lats.tolist(),
            "lons": spatial.lons.tolist(),
            "label": spatial.label,
            "colormap": spatial.colormap,
        }
    else:
        obj = {
            "map_type": spatial.map_type,
            "status": status_value,
            "geojson": spatial.geojson,
            "label": spatial.label,
            "colormap": spatial.colormap,
        }
    with open(path, "w") as f:
        json.dump(obj, f)


async def process_job(job_id: str, db: Database, registry: ProductRegistry) -> None:
    """Run one queued job end to end: EO processing, outputs, packaging, email.

    Loads the job, processes each requested EO product (batch products via
    submit/poll/harvest against CDSE, the rest via a direct ``process`` call),
    renders charts / maps / spatial JSON into ``results/<job_id>/``, runs
    compound-signal detection across z-score rasters, builds the PDF report
    and ZIP package, then marks the job COMPLETE and emails the requester.
    Any exception marks the job FAILED with the error message.

    Args:
        job_id: Primary key of the job to run.
        db: Database accessor for job state, progress, and results.
        registry: Resolves product ids to EO product instances.
    """
    job = await db.get_job(job_id)
    if job is None:
        logger.error(f"Job {job_id} not found")
        return
    await db.update_job_status(job_id, JobStatus.PROCESSING)
    try:
        # product_id -> spatial payload, captured right after each product
        # runs (raster handles are released further down to free memory).
        spatial_cache = {}

        # Separate batch vs non-batch EO products
        batch_products = {}
        process_products = []
        for product_id in job.request.product_ids:
            product = registry.get(product_id)
            if product.uses_batch:
                batch_products[product_id] = product
            else:
                process_products.append((product_id, product))

        # -- Process batch EO products sequentially --
        for product_id, product in batch_products.items():
            # Submit
            await db.update_job_progress(job_id, product_id, "submitting")
            jobs = await product.submit_batch(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
            )
            job_ids = [getattr(j, 'job_id', '?') for j in jobs]
            print(f"[Aperture] Submitted {product_id} batch jobs: {job_ids}")
            await db.update_job_progress(job_id, product_id, "processing on CDSE")

            # Poll — exit early once first job finishes + grace period for others
            GRACE_PERIOD = 600  # 10 min grace after first job finishes
            poll_start = time.monotonic()
            first_finished_at = None
            while True:
                elapsed = time.monotonic() - poll_start
                statuses = [j.status() for j in jobs]
                job_ids = [getattr(j, 'job_id', '?') for j in jobs]
                print(f"[Aperture] Poll {product_id} ({elapsed:.0f}s): {list(zip(job_ids, statuses))}")

                if all(s == "finished" for s in statuses):
                    logger.info("Batch jobs finished for %s", product_id)
                    break
                elif any(s in ("error", "canceled") for s in statuses):
                    logger.warning("Batch job failed for %s: %s", product_id, statuses)
                    break

                # Track when first job finishes
                if first_finished_at is None and any(s == "finished" for s in statuses):
                    first_finished_at = time.monotonic()
                    print(f"[Aperture] {product_id}: first job finished, {GRACE_PERIOD}s grace for remaining")

                # Grace period: once any job finished, give others 10 min then harvest partial
                if first_finished_at and (time.monotonic() - first_finished_at) >= GRACE_PERIOD:
                    logger.info("Grace period expired for %s, harvesting partial results", product_id)
                    print(f"[Aperture] {product_id}: grace period expired, proceeding with partial results")
                    break

                if elapsed >= BATCH_TIMEOUT:
                    logger.warning("Batch poll timeout after %.0fs for %s", elapsed, product_id)
                    break

                await asyncio.sleep(BATCH_POLL_INTERVAL)

            # Harvest if any job finished (harvest methods handle per-job failures)
            # NOTE: `statuses`/`job_ids` are bound here because the poll loop
            # body always executes at least once before any break.
            any_finished = any(s == "finished" for s in statuses)
            if not any_finished:
                failed_statuses = list(zip(job_ids, statuses))
                raise RuntimeError(
                    f"All batch jobs failed for {product_id}: {failed_statuses}"
                )

            # Wrap non-finished jobs so download_results() fails fast
            # instead of blocking for 30 min on a still-running job
            harvest_jobs = [
                j if s == "finished" else _SkippedJob(getattr(j, 'job_id', '?'))
                for j, s in zip(jobs, statuses)
            ]
            await db.update_job_progress(job_id, product_id, "downloading")
            result = await product.harvest(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
                batch_jobs=harvest_jobs,
            )

            spatial = product.get_spatial_data()
            if spatial is not None:
                spatial_cache[product_id] = spatial
            print(f"[Aperture] Saving result for {product_id}: data_source={result.data_source}, headline={result.headline[:60]}")
            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, product_id, "complete")

        # -- Process non-batch EO products --
        for product_id, product in process_products:
            await db.update_job_progress(job_id, product_id, "processing")
            result = await product.process(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
            )
            spatial = product.get_spatial_data()
            if spatial is not None:
                spatial_cache[product_id] = spatial
            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, product_id, "complete")

        # Generate outputs
        # Re-fetch the job so job.results reflects everything saved above.
        job = await db.get_job(job_id)
        results_dir = os.path.join("results", job_id)
        os.makedirs(results_dir, exist_ok=True)

        output_files = []

        # Generate charts and maps for each result
        for result in job.results:
            chart_path = os.path.join(results_dir, f"{result.product_id}_chart.png")
            render_timeseries_chart(
                chart_data=result.chart_data,
                product_name=_product_label(result.product_id),
                status=result.status,
                trend=result.trend,
                output_path=chart_path,
            )
            output_files.append(chart_path)

            # Generate map PNG for every indicator
            spatial = spatial_cache.get(result.product_id)
            map_path = os.path.join(results_dir, f"{result.product_id}_map.png")

            if spatial is not None and spatial.map_type == "raster":
                # Raster-on-true-color rendering for openEO/download indicators
                product_obj = registry.get(result.product_id)
                raster_path = getattr(product_obj, '_product_raster_path', None)
                true_color_path = getattr(product_obj, '_true_color_path', None)
                render_band = getattr(product_obj, '_render_band', 1)
                from app.outputs.maps import render_raster_map
                render_raster_map(
                    true_color_path=true_color_path,
                    indicator_path=raster_path,
                    indicator_band=render_band,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                    cmap=spatial.colormap,
                    vmin=spatial.vmin,
                    vmax=spatial.vmax,
                    label=spatial.label,
                )
            elif spatial is not None:
                render_indicator_map(
                    spatial=spatial,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            else:
                # No spatial data: plain AOI box tinted by status.
                render_status_map(
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            output_files.append(map_path)

            # Save spatial data as JSON for frontend
            spatial_json_path = os.path.join(results_dir, f"{result.product_id}_spatial.json")
            _save_spatial_json(
                spatial,
                result.status.value,
                spatial_json_path,
                product_obj=registry.get(result.product_id),
            )

        # Generate hotspot maps for indicators with z-score data
        from app.outputs.maps import render_hotspot_map
        product_hotspot_paths = {}
        for result in job.results:
            product_obj = registry.get(result.product_id)
            zscore_raster = getattr(product_obj, '_zscore_raster', None)
            hotspot_mask = getattr(product_obj, '_hotspot_mask', None)
            true_color_path_ind = getattr(product_obj, '_true_color_path', None)

            if zscore_raster is not None and hotspot_mask is not None:
                hotspot_path = os.path.join(results_dir, f"{result.product_id}_hotspot.png")

                raster_path = getattr(product_obj, '_product_raster_path', None)
                if raster_path:
                    import rasterio
                    with rasterio.open(raster_path) as src:
                        extent = [src.bounds.left, src.bounds.right, src.bounds.bottom, src.bounds.top]
                else:
                    # Fallback extent from the AOI. Assumes aoi.bbox is
                    # (minx, miny, maxx, maxy), reordered here to the same
                    # [left, right, bottom, top] layout as the rasterio
                    # branch above — TODO confirm bbox ordering.
                    b = job.request.aoi.bbox
                    extent = [b[0], b[2], b[1], b[3]]

                render_hotspot_map(
                    true_color_path=true_color_path_ind,
                    zscore_raster=zscore_raster,
                    hotspot_mask=hotspot_mask,
                    extent=extent,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=hotspot_path,
                    label=result.product_id.upper(),
                )
                product_hotspot_paths[result.product_id] = hotspot_path
                output_files.append(hotspot_path)

        # Cross-indicator compound signal detection.
        # Skip indicators that cannot contribute reliably:
        #   - GREEN status (no signal, including coverage-gated water)
        #   - Headlines flagged as baseline drift
        # This prevents false-positive compound signals fired off pixel-level
        # noise from indicators we already deemed unreliable at the AOI level.
        from app.analysis.compound import detect_compound_signals
        import numpy as np
        from app.models import StatusLevel

        unreliable_pids: set[str] = set()
        for result in job.results:
            if result.status == StatusLevel.GREEN:
                unreliable_pids.add(result.product_id)
                continue
            headline_lower = (result.headline or "").lower()
            if "baseline may be unreliable" in headline_lower:
                unreliable_pids.add(result.product_id)

        zscore_rasters = {}
        for result in job.results:
            if result.product_id in unreliable_pids:
                continue
            product_obj = registry.get(result.product_id)
            z = getattr(product_obj, '_zscore_raster', None)
            if z is not None:
                zscore_rasters[result.product_id] = z

        if unreliable_pids:
            logger.info(
                "Compound signal detection skipping unreliable indicators: %s",
                sorted(unreliable_pids),
            )

        compound_signals = []
        if len(zscore_rasters) >= 2:
            # Upsample to finest resolution for best spatial overlap detection
            shapes = [z.shape for z in zscore_rasters.values()]
            target_shape = max(shapes, key=lambda s: s[0] * s[1])

            resampled = {}
            for ind_id, z in zscore_rasters.items():
                if z.shape != target_shape:
                    from scipy.ndimage import zoom
                    factors = (target_shape[0] / z.shape[0], target_shape[1] / z.shape[1])
                    resampled[ind_id] = zoom(z, factors, order=1)  # bilinear for continuous z-scores
                else:
                    resampled[ind_id] = z

            # Hectares per pixel: AOI area (km2 -> ha) over total pixel count.
            pixel_area_ha = (job.request.aoi.area_km2 * 100) / (target_shape[0] * target_shape[1])

            compound_signals = detect_compound_signals(
                zscore_rasters=resampled,
                pixel_area_ha=pixel_area_ha,
                threshold=2.0,
            )
            # Drop the upsampled copies as soon as detection is done.
            del resampled

        # Release z-score rasters from EO product instances to free memory
        del zscore_rasters
        for result in job.results:
            registry.get(result.product_id).release_rasters()

        if compound_signals:
            signals_path = os.path.join(results_dir, "compound_signals.json")
            with open(signals_path, "w") as f:
                json.dump([s.model_dump() for s in compound_signals], f, indent=2)
            output_files.append(signals_path)

        # Build map paths dict for PDF
        product_map_paths = {}
        for result in job.results:
            mp = os.path.join(results_dir, f"{result.product_id}_map.png")
            if os.path.exists(mp):
                product_map_paths[result.product_id] = mp

        # Generate summary map (worst-case status)
        from app.models import StatusLevel
        # Rank statuses by position in GREEN < AMBER < RED; max picks the worst.
        worst_status = max(
            (r.status for r in job.results),
            key=lambda s: [StatusLevel.GREEN, StatusLevel.AMBER, StatusLevel.RED].index(s),
        )
        summary_map_path = os.path.join(results_dir, "summary_map.png")
        render_status_map(aoi=job.request.aoi, status=worst_status, output_path=summary_map_path)
        output_files.append(summary_map_path)

        # --- Visual Overview ---
        overview_score = compute_composite_score(job.results)

        overview_score_path = os.path.join(results_dir, "overview_score.json")
        write_overview_score(overview_score, overview_score_path)
        output_files.append(overview_score_path)

        # Overview map: reuse true-color from any raster EO product, or skip
        overview_map_path = os.path.join(results_dir, "overview_map.png")
        true_color_path = None
        for product_id in job.request.product_ids:
            product_obj = registry.get(product_id)
            tc = getattr(product_obj, '_true_color_path', None)
            if tc and os.path.exists(tc):
                true_color_path = tc
                break

        if true_color_path:
            render_overview_map(
                true_color_path=true_color_path,
                aoi=job.request.aoi,
                output_path=overview_map_path,
                title=f"{job.request.aoi.name} — Satellite Overview",
                date_range=f"{job.request.time_range.start} to {job.request.time_range.end}",
            )
            output_files.append(overview_map_path)

        # Generate PDF report
        report_path = os.path.join(results_dir, "report.pdf")
        generate_pdf_report(
            aoi=job.request.aoi,
            time_range=job.request.time_range,
            results=job.results,
            output_path=report_path,
            summary_map_path=summary_map_path,
            product_map_paths=product_map_paths,
            product_hotspot_paths=product_hotspot_paths,
            overview_score=overview_score,
            # Empty string when no true-color overview map was rendered.
            overview_map_path=overview_map_path if true_color_path else "",
            compound_signals=compound_signals,
        )
        output_files.append(report_path)

        # Package everything
        package_path = os.path.join(results_dir, "package.zip")
        create_data_package(files=output_files, output_path=package_path)

        await db.update_job_status(job_id, JobStatus.COMPLETE)

        # Send completion email
        await send_completion_email(
            to_email=job.request.email,
            job_id=job_id,
            aoi_name=job.request.aoi.name,
        )
    except Exception as e:
        # Any failure anywhere in the pipeline marks the job FAILED with
        # the error text; the traceback goes to the log.
        logger.exception(f"Job {job_id} failed: {e}")
        await db.update_job_status(job_id, JobStatus.FAILED, error=str(e))


async def worker_loop(db: Database, registry: ProductRegistry) -> None:
    """Poll the queue forever and process jobs one at a time.

    Sleeps 5 seconds between polls (including after finishing a job) so
    the database is not hit in a tight loop.

    Args:
        db: Database accessor used to fetch queued jobs.
        registry: Product registry passed through to process_job.
    """
    logger.info("Background worker started")
    while True:
        job = await db.get_next_queued_job()
        if job is not None:
            # Lazy %-formatting: the message is only built if INFO is enabled.
            logger.info("Processing job %s", job.id)
            await process_job(job.id, db, registry)
        await asyncio.sleep(5)