| from __future__ import annotations |
| import asyncio |
| import json |
| import logging |
| import os |
| import time |
| import traceback |
| from app.database import Database |
| from app.eo_products.base import ProductRegistry |
| from app.models import JobStatus |
| from app.outputs.report import generate_pdf_report, _product_label |
| from app.outputs.package import create_data_package |
| from app.outputs.charts import render_timeseries_chart |
| from app.outputs.maps import render_indicator_map, render_status_map |
| from app.outputs.overview import compute_composite_score, write_overview_score |
| from app.outputs.maps import render_overview_map |
| from app.core.email import send_completion_email |
|
|
| logger = logging.getLogger(__name__) |
|
|
| BATCH_POLL_INTERVAL = 30 |
| BATCH_TIMEOUT = 5400 |
|
|
|
|
| class _SkippedJob: |
| """Stub for batch jobs that didn't finish — fails fast on download.""" |
| def __init__(self, job_id: str): |
| self.job_id = job_id |
| def download_results(self, *a, **kw): |
| raise RuntimeError(f"Job {self.job_id} not finished, skipping download") |
| def status(self): |
| return "skipped" |
|
|
|
|
| def _save_spatial_json(spatial, status_value: str, path: str, product_obj=None) -> None: |
| """Serialize spatial data to JSON for the frontend. |
| |
| Raster-type spatial data is downsampled into the same "grid" format |
| the frontend already knows how to render — so the dashboard map |
| shows the actual indicator overlay instead of just a tinted AOI box. |
| """ |
| if spatial is None: |
| obj = {"map_type": "status", "status": status_value} |
| elif spatial.map_type == "raster" and product_obj is not None: |
| from app.outputs.spatial_web import raster_to_grid_dict |
| raster_path = getattr(product_obj, "_product_raster_path", None) |
| render_band = getattr(product_obj, "_render_band", 1) |
| logger.info( |
| "Serializing raster spatial for %s: path=%s band=%s exists=%s", |
| getattr(product_obj, "id", "?"), |
| raster_path, |
| render_band, |
| os.path.exists(raster_path) if raster_path else False, |
| ) |
| grid = None |
| if raster_path and os.path.exists(raster_path): |
| grid = raster_to_grid_dict( |
| raster_path, |
| band=render_band, |
| spatial=spatial, |
| status_value=status_value, |
| ) |
| if grid is not None: |
| obj = grid |
| else: |
| |
| |
| logger.warning( |
| "No grid spatial for %s — falling back to no-overlay payload", |
| getattr(product_obj, "id", "?"), |
| ) |
| obj = { |
| "map_type": "raster-unavailable", |
| "status": status_value, |
| "label": spatial.label, |
| "colormap": spatial.colormap, |
| "vmin": spatial.vmin, |
| "vmax": spatial.vmax, |
| } |
| elif spatial.map_type == "grid": |
| obj = { |
| "map_type": "grid", |
| "status": status_value, |
| "data": spatial.data.tolist(), |
| "lats": spatial.lats.tolist(), |
| "lons": spatial.lons.tolist(), |
| "label": spatial.label, |
| "colormap": spatial.colormap, |
| } |
| else: |
| obj = { |
| "map_type": spatial.map_type, |
| "status": status_value, |
| "geojson": spatial.geojson, |
| "label": spatial.label, |
| "colormap": spatial.colormap, |
| } |
| with open(path, "w") as f: |
| json.dump(obj, f) |
|
|
|
|
async def process_job(job_id: str, db: Database, registry: ProductRegistry) -> None:
    """Run a single analysis job end to end.

    Executes every requested EO product (batch products are submitted to a
    remote backend and polled; the rest run in-process), renders charts,
    maps, spatial JSON, hotspot/compound-signal outputs, a PDF report and a
    ZIP package into ``results/<job_id>/``, then marks the job COMPLETE and
    emails the requester. Any exception marks the job FAILED with the
    error message.
    """
    job = await db.get_job(job_id)
    if job is None:
        logger.error(f"Job {job_id} not found")
        return
    await db.update_job_status(job_id, JobStatus.PROCESSING)
    try:
        # product_id -> spatial-data object, captured immediately after each
        # product runs (rasters are released later, see release_rasters below).
        spatial_cache = {}

        # Partition requested products into remote-batch vs in-process ones.
        batch_products = {}
        process_products = []
        for product_id in job.request.product_ids:
            product = registry.get(product_id)
            if product.uses_batch:
                batch_products[product_id] = product
            else:
                process_products.append((product_id, product))

        # ---- Batch products: submit, poll, harvest --------------------------
        for product_id, product in batch_products.items():
            await db.update_job_progress(job_id, product_id, "submitting")
            jobs = await product.submit_batch(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
            )
            job_ids = [getattr(j, 'job_id', '?') for j in jobs]
            print(f"[Aperture] Submitted {product_id} batch jobs: {job_ids}")
            await db.update_job_progress(job_id, product_id, "processing on CDSE")

            # Poll until: all jobs finish, any job fails, the grace period
            # after the first success expires, or the hard timeout hits.
            GRACE_PERIOD = 600  # seconds to wait for stragglers once one job has finished
            poll_start = time.monotonic()
            first_finished_at = None  # set when the first job reaches "finished"
            while True:
                elapsed = time.monotonic() - poll_start
                statuses = [j.status() for j in jobs]
                job_ids = [getattr(j, 'job_id', '?') for j in jobs]
                print(f"[Aperture] Poll {product_id} ({elapsed:.0f}s): {list(zip(job_ids, statuses))}")

                if all(s == "finished" for s in statuses):
                    logger.info("Batch jobs finished for %s", product_id)
                    break
                elif any(s in ("error", "canceled") for s in statuses):
                    # One failure aborts polling; partial harvest is decided below.
                    logger.warning("Batch job failed for %s: %s", product_id, statuses)
                    break

                # Start the grace clock on the first finished job.
                if first_finished_at is None and any(s == "finished" for s in statuses):
                    first_finished_at = time.monotonic()
                    print(f"[Aperture] {product_id}: first job finished, {GRACE_PERIOD}s grace for remaining")

                # Prefer partial results over waiting indefinitely for stragglers.
                if first_finished_at and (time.monotonic() - first_finished_at) >= GRACE_PERIOD:
                    logger.info("Grace period expired for %s, harvesting partial results", product_id)
                    print(f"[Aperture] {product_id}: grace period expired, proceeding with partial results")
                    break

                if elapsed >= BATCH_TIMEOUT:
                    logger.warning("Batch poll timeout after %.0fs for %s", elapsed, product_id)
                    break

                await asyncio.sleep(BATCH_POLL_INTERVAL)

            # If nothing finished at all, the whole product failed.
            # (statuses/job_ids hold the values from the final poll iteration.)
            any_finished = any(s == "finished" for s in statuses)
            if not any_finished:
                failed_statuses = list(zip(job_ids, statuses))
                raise RuntimeError(
                    f"All batch jobs failed for {product_id}: {failed_statuses}"
                )

            # Replace unfinished jobs with stubs so harvest() fails fast on
            # their downloads instead of pulling incomplete data.
            harvest_jobs = [
                j if s == "finished" else _SkippedJob(getattr(j, 'job_id', '?'))
                for j, s in zip(jobs, statuses)
            ]
            await db.update_job_progress(job_id, product_id, "downloading")
            result = await product.harvest(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
                batch_jobs=harvest_jobs,
            )

            spatial = product.get_spatial_data()
            if spatial is not None:
                spatial_cache[product_id] = spatial
            print(f"[Aperture] Saving result for {product_id}: data_source={result.data_source}, headline={result.headline[:60]}")
            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, product_id, "complete")

        # ---- In-process products --------------------------------------------
        for product_id, product in process_products:
            await db.update_job_progress(job_id, product_id, "processing")
            result = await product.process(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
            )
            spatial = product.get_spatial_data()
            if spatial is not None:
                spatial_cache[product_id] = spatial
            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, product_id, "complete")

        # Re-read the job so job.results reflects everything saved above.
        job = await db.get_job(job_id)
        results_dir = os.path.join("results", job_id)
        os.makedirs(results_dir, exist_ok=True)

        # Collected for the ZIP package at the end.
        output_files = []

        # ---- Per-product chart, map image and spatial JSON ------------------
        for result in job.results:
            chart_path = os.path.join(results_dir, f"{result.product_id}_chart.png")
            render_timeseries_chart(
                chart_data=result.chart_data,
                product_name=_product_label(result.product_id),
                status=result.status,
                trend=result.trend,
                output_path=chart_path,
            )
            output_files.append(chart_path)

            spatial = spatial_cache.get(result.product_id)
            map_path = os.path.join(results_dir, f"{result.product_id}_map.png")

            if spatial is not None and spatial.map_type == "raster":
                # Raster products expose their file paths via private attributes.
                product_obj = registry.get(result.product_id)
                raster_path = getattr(product_obj, '_product_raster_path', None)
                true_color_path = getattr(product_obj, '_true_color_path', None)
                render_band = getattr(product_obj, '_render_band', 1)
                from app.outputs.maps import render_raster_map
                render_raster_map(
                    true_color_path=true_color_path,
                    indicator_path=raster_path,
                    indicator_band=render_band,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                    cmap=spatial.colormap,
                    vmin=spatial.vmin,
                    vmax=spatial.vmax,
                    label=spatial.label,
                )
            elif spatial is not None:
                render_indicator_map(
                    spatial=spatial,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            else:
                # No spatial data: plain status-colored AOI map.
                render_status_map(
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            output_files.append(map_path)

            # JSON payload consumed by the dashboard map overlay.
            spatial_json_path = os.path.join(results_dir, f"{result.product_id}_spatial.json")
            _save_spatial_json(
                spatial,
                result.status.value,
                spatial_json_path,
                product_obj=registry.get(result.product_id),
            )

        # ---- Hotspot maps (only for products exposing z-score rasters) ------
        from app.outputs.maps import render_hotspot_map
        product_hotspot_paths = {}
        for result in job.results:
            product_obj = registry.get(result.product_id)
            zscore_raster = getattr(product_obj, '_zscore_raster', None)
            hotspot_mask = getattr(product_obj, '_hotspot_mask', None)
            true_color_path_ind = getattr(product_obj, '_true_color_path', None)

            if zscore_raster is not None and hotspot_mask is not None:
                hotspot_path = os.path.join(results_dir, f"{result.product_id}_hotspot.png")

                # Prefer the raster's own georeferenced bounds; otherwise fall
                # back to the AOI bbox (reordered to [left, right, bottom, top]).
                raster_path = getattr(product_obj, '_product_raster_path', None)
                if raster_path:
                    import rasterio
                    with rasterio.open(raster_path) as src:
                        extent = [src.bounds.left, src.bounds.right, src.bounds.bottom, src.bounds.top]
                else:
                    b = job.request.aoi.bbox
                    extent = [b[0], b[2], b[1], b[3]]

                render_hotspot_map(
                    true_color_path=true_color_path_ind,
                    zscore_raster=zscore_raster,
                    hotspot_mask=hotspot_mask,
                    extent=extent,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=hotspot_path,
                    label=result.product_id.upper(),
                )
                product_hotspot_paths[result.product_id] = hotspot_path
                output_files.append(hotspot_path)

        # ---- Compound signal detection across indicators --------------------
        from app.analysis.compound import detect_compound_signals
        import numpy as np
        from app.models import StatusLevel

        # Exclude indicators whose anomaly signal is not trustworthy:
        # GREEN status (nothing anomalous) or a self-reported shaky baseline.
        unreliable_pids: set[str] = set()
        for result in job.results:
            if result.status == StatusLevel.GREEN:
                unreliable_pids.add(result.product_id)
                continue
            headline_lower = (result.headline or "").lower()
            if "baseline may be unreliable" in headline_lower:
                unreliable_pids.add(result.product_id)

        zscore_rasters = {}
        for result in job.results:
            if result.product_id in unreliable_pids:
                continue
            product_obj = registry.get(result.product_id)
            z = getattr(product_obj, '_zscore_raster', None)
            if z is not None:
                zscore_rasters[result.product_id] = z

        if unreliable_pids:
            logger.info(
                "Compound signal detection skipping unreliable indicators: %s",
                sorted(unreliable_pids),
            )

        compound_signals = []
        if len(zscore_rasters) >= 2:
            # Resample all rasters to the largest (finest) common shape so
            # they can be compared pixel-by-pixel.
            shapes = [z.shape for z in zscore_rasters.values()]
            target_shape = max(shapes, key=lambda s: s[0] * s[1])

            resampled = {}
            for ind_id, z in zscore_rasters.items():
                if z.shape != target_shape:
                    from scipy.ndimage import zoom
                    factors = (target_shape[0] / z.shape[0], target_shape[1] / z.shape[1])
                    resampled[ind_id] = zoom(z, factors, order=1)
                else:
                    resampled[ind_id] = z

            # AOI area in hectares divided by pixel count of the target grid.
            pixel_area_ha = (job.request.aoi.area_km2 * 100) / (target_shape[0] * target_shape[1])

            compound_signals = detect_compound_signals(
                zscore_rasters=resampled,
                pixel_area_ha=pixel_area_ha,
                threshold=2.0,
            )
            del resampled  # free the resampled copies promptly

        # Free raster memory now that all per-pixel work is done.
        del zscore_rasters
        for result in job.results:
            registry.get(result.product_id).release_rasters()

        if compound_signals:
            signals_path = os.path.join(results_dir, "compound_signals.json")
            with open(signals_path, "w") as f:
                json.dump([s.model_dump() for s in compound_signals], f, indent=2)
            output_files.append(signals_path)

        # Map images actually produced above, keyed by product (for the PDF).
        product_map_paths = {}
        for result in job.results:
            mp = os.path.join(results_dir, f"{result.product_id}_map.png")
            if os.path.exists(mp):
                product_map_paths[result.product_id] = mp

        # Summary map colored by the worst status across all results.
        # NOTE(review): StatusLevel is already imported above — redundant but harmless.
        from app.models import StatusLevel
        worst_status = max(
            (r.status for r in job.results),
            key=lambda s: [StatusLevel.GREEN, StatusLevel.AMBER, StatusLevel.RED].index(s),
        )
        summary_map_path = os.path.join(results_dir, "summary_map.png")
        render_status_map(aoi=job.request.aoi, status=worst_status, output_path=summary_map_path)
        output_files.append(summary_map_path)

        # Composite overview score across all product results.
        overview_score = compute_composite_score(job.results)

        overview_score_path = os.path.join(results_dir, "overview_score.json")
        write_overview_score(overview_score, overview_score_path)
        output_files.append(overview_score_path)

        # Overview map uses the first available true-color image, if any.
        overview_map_path = os.path.join(results_dir, "overview_map.png")
        true_color_path = None
        for product_id in job.request.product_ids:
            product_obj = registry.get(product_id)
            tc = getattr(product_obj, '_true_color_path', None)
            if tc and os.path.exists(tc):
                true_color_path = tc
                break

        if true_color_path:
            render_overview_map(
                true_color_path=true_color_path,
                aoi=job.request.aoi,
                output_path=overview_map_path,
                title=f"{job.request.aoi.name} — Satellite Overview",
                date_range=f"{job.request.time_range.start} to {job.request.time_range.end}",
            )
            output_files.append(overview_map_path)

        # ---- Final deliverables: PDF report + ZIP package -------------------
        report_path = os.path.join(results_dir, "report.pdf")
        generate_pdf_report(
            aoi=job.request.aoi,
            time_range=job.request.time_range,
            results=job.results,
            output_path=report_path,
            summary_map_path=summary_map_path,
            product_map_paths=product_map_paths,
            product_hotspot_paths=product_hotspot_paths,
            overview_score=overview_score,
            # Empty string signals "no overview map" to the report generator.
            overview_map_path=overview_map_path if true_color_path else "",
            compound_signals=compound_signals,
        )
        output_files.append(report_path)

        package_path = os.path.join(results_dir, "package.zip")
        create_data_package(files=output_files, output_path=package_path)

        await db.update_job_status(job_id, JobStatus.COMPLETE)

        # Best-effort notification; failures here still fail the job (caught below).
        await send_completion_email(
            to_email=job.request.email,
            job_id=job_id,
            aoi_name=job.request.aoi.name,
        )
    except Exception as e:
        # Top-level boundary: record the failure on the job and keep the worker alive.
        logger.exception(f"Job {job_id} failed: {e}")
        await db.update_job_status(job_id, JobStatus.FAILED, error=str(e))
|
|
|
|
async def worker_loop(db: Database, registry: ProductRegistry) -> None:
    """Poll the queue forever, processing each queued job to completion."""
    logger.info("Background worker started")
    while True:
        queued = await db.get_next_queued_job()
        if queued is None:
            # Nothing to do — back off briefly before the next poll.
            await asyncio.sleep(5)
            continue
        logger.info(f"Processing job {queued.id}")
        await process_job(queued.id, db, registry)
        await asyncio.sleep(5)
|
|