from __future__ import annotations import asyncio import json import logging import os import time import traceback from app.database import Database from app.eo_products.base import ProductRegistry from app.models import JobStatus from app.outputs.report import generate_pdf_report, _product_label from app.outputs.package import create_data_package from app.outputs.charts import render_timeseries_chart from app.outputs.maps import render_indicator_map, render_status_map from app.outputs.overview import compute_composite_score, write_overview_score from app.outputs.maps import render_overview_map from app.core.email import send_completion_email logger = logging.getLogger(__name__) BATCH_POLL_INTERVAL = 30 # seconds between status checks BATCH_TIMEOUT = 5400 # 90 minutes per indicator class _SkippedJob: """Stub for batch jobs that didn't finish — fails fast on download.""" def __init__(self, job_id: str): self.job_id = job_id def download_results(self, *a, **kw): raise RuntimeError(f"Job {self.job_id} not finished, skipping download") def status(self): return "skipped" def _save_spatial_json(spatial, status_value: str, path: str, product_obj=None) -> None: """Serialize spatial data to JSON for the frontend. Raster-type spatial data is downsampled into the same "grid" format the frontend already knows how to render — so the dashboard map shows the actual indicator overlay instead of just a tinted AOI box. """ if spatial is None: obj = {"map_type": "status", "status": status_value} elif spatial.map_type == "raster" and product_obj is not None: from app.outputs.spatial_web import raster_to_grid_dict raster_path = getattr(product_obj, "_product_raster_path", None) render_band = getattr(product_obj, "_render_band", 1) logger.info( "Serializing raster spatial for %s: path=%s band=%s exists=%s", getattr(product_obj, "id", "?"), raster_path, render_band, os.path.exists(raster_path) if raster_path else False, ) grid = None if raster_path and os.path.exists(raster_path): grid = raster_to_grid_dict( raster_path, band=render_band, spatial=spatial, status_value=status_value, ) if grid is not None: obj = grid else: # Fall back to a descriptive payload so the frontend can still # show the legend + color hint instead of blanking the map. logger.warning( "No grid spatial for %s — falling back to no-overlay payload", getattr(product_obj, "id", "?"), ) obj = { "map_type": "raster-unavailable", "status": status_value, "label": spatial.label, "colormap": spatial.colormap, "vmin": spatial.vmin, "vmax": spatial.vmax, } elif spatial.map_type == "grid": obj = { "map_type": "grid", "status": status_value, "data": spatial.data.tolist(), "lats": spatial.lats.tolist(), "lons": spatial.lons.tolist(), "label": spatial.label, "colormap": spatial.colormap, } else: obj = { "map_type": spatial.map_type, "status": status_value, "geojson": spatial.geojson, "label": spatial.label, "colormap": spatial.colormap, } with open(path, "w") as f: json.dump(obj, f) async def process_job(job_id: str, db: Database, registry: ProductRegistry) -> None: job = await db.get_job(job_id) if job is None: logger.error(f"Job {job_id} not found") return await db.update_job_status(job_id, JobStatus.PROCESSING) try: spatial_cache = {} # Separate batch vs non-batch EO products batch_products = {} process_products = [] for product_id in job.request.product_ids: product = registry.get(product_id) if product.uses_batch: batch_products[product_id] = product else: process_products.append((product_id, product)) # -- Process batch EO products sequentially -- for product_id, product in batch_products.items(): # Submit await db.update_job_progress(job_id, product_id, "submitting") jobs = await product.submit_batch( job.request.aoi, job.request.time_range, season_months=job.request.season_months(), ) job_ids = [getattr(j, 'job_id', '?') for j in jobs] print(f"[Aperture] Submitted {product_id} batch jobs: {job_ids}") await db.update_job_progress(job_id, product_id, "processing on CDSE") # Poll — exit early once first job finishes + grace period for others GRACE_PERIOD = 600 # 10 min grace after first job finishes poll_start = time.monotonic() first_finished_at = None while True: elapsed = time.monotonic() - poll_start statuses = [j.status() for j in jobs] job_ids = [getattr(j, 'job_id', '?') for j in jobs] print(f"[Aperture] Poll {product_id} ({elapsed:.0f}s): {list(zip(job_ids, statuses))}") if all(s == "finished" for s in statuses): logger.info("Batch jobs finished for %s", product_id) break elif any(s in ("error", "canceled") for s in statuses): logger.warning("Batch job failed for %s: %s", product_id, statuses) break # Track when first job finishes if first_finished_at is None and any(s == "finished" for s in statuses): first_finished_at = time.monotonic() print(f"[Aperture] {product_id}: first job finished, {GRACE_PERIOD}s grace for remaining") # Grace period: once any job finished, give others 10 min then harvest partial if first_finished_at and (time.monotonic() - first_finished_at) >= GRACE_PERIOD: logger.info("Grace period expired for %s, harvesting partial results", product_id) print(f"[Aperture] {product_id}: grace period expired, proceeding with partial results") break if elapsed >= BATCH_TIMEOUT: logger.warning("Batch poll timeout after %.0fs for %s", elapsed, product_id) break await asyncio.sleep(BATCH_POLL_INTERVAL) # Harvest if any job finished (harvest methods handle per-job failures) any_finished = any(s == "finished" for s in statuses) if not any_finished: failed_statuses = list(zip(job_ids, statuses)) raise RuntimeError( f"All batch jobs failed for {product_id}: {failed_statuses}" ) # Wrap non-finished jobs so download_results() fails fast # instead of blocking for 30 min on a still-running job harvest_jobs = [ j if s == "finished" else _SkippedJob(getattr(j, 'job_id', '?')) for j, s in zip(jobs, statuses) ] await db.update_job_progress(job_id, product_id, "downloading") result = await product.harvest( job.request.aoi, job.request.time_range, season_months=job.request.season_months(), batch_jobs=harvest_jobs, ) spatial = product.get_spatial_data() if spatial is not None: spatial_cache[product_id] = spatial print(f"[Aperture] Saving result for {product_id}: data_source={result.data_source}, headline={result.headline[:60]}") await db.save_job_result(job_id, result) await db.update_job_progress(job_id, product_id, "complete") # -- Process non-batch EO products -- for product_id, product in process_products: await db.update_job_progress(job_id, product_id, "processing") result = await product.process( job.request.aoi, job.request.time_range, season_months=job.request.season_months(), ) spatial = product.get_spatial_data() if spatial is not None: spatial_cache[product_id] = spatial await db.save_job_result(job_id, result) await db.update_job_progress(job_id, product_id, "complete") # Generate outputs job = await db.get_job(job_id) results_dir = os.path.join("results", job_id) os.makedirs(results_dir, exist_ok=True) output_files = [] # Generate charts and maps for each result for result in job.results: chart_path = os.path.join(results_dir, f"{result.product_id}_chart.png") render_timeseries_chart( chart_data=result.chart_data, product_name=_product_label(result.product_id), status=result.status, trend=result.trend, output_path=chart_path, ) output_files.append(chart_path) # Generate map PNG for every indicator spatial = spatial_cache.get(result.product_id) map_path = os.path.join(results_dir, f"{result.product_id}_map.png") if spatial is not None and spatial.map_type == "raster": # Raster-on-true-color rendering for openEO/download indicators product_obj = registry.get(result.product_id) raster_path = getattr(product_obj, '_product_raster_path', None) true_color_path = getattr(product_obj, '_true_color_path', None) render_band = getattr(product_obj, '_render_band', 1) from app.outputs.maps import render_raster_map render_raster_map( true_color_path=true_color_path, indicator_path=raster_path, indicator_band=render_band, aoi=job.request.aoi, status=result.status, output_path=map_path, cmap=spatial.colormap, vmin=spatial.vmin, vmax=spatial.vmax, label=spatial.label, ) elif spatial is not None: render_indicator_map( spatial=spatial, aoi=job.request.aoi, status=result.status, output_path=map_path, ) else: render_status_map( aoi=job.request.aoi, status=result.status, output_path=map_path, ) output_files.append(map_path) # Save spatial data as JSON for frontend spatial_json_path = os.path.join(results_dir, f"{result.product_id}_spatial.json") _save_spatial_json( spatial, result.status.value, spatial_json_path, product_obj=registry.get(result.product_id), ) # Generate hotspot maps for indicators with z-score data from app.outputs.maps import render_hotspot_map product_hotspot_paths = {} for result in job.results: product_obj = registry.get(result.product_id) zscore_raster = getattr(product_obj, '_zscore_raster', None) hotspot_mask = getattr(product_obj, '_hotspot_mask', None) true_color_path_ind = getattr(product_obj, '_true_color_path', None) if zscore_raster is not None and hotspot_mask is not None: hotspot_path = os.path.join(results_dir, f"{result.product_id}_hotspot.png") raster_path = getattr(product_obj, '_product_raster_path', None) if raster_path: import rasterio with rasterio.open(raster_path) as src: extent = [src.bounds.left, src.bounds.right, src.bounds.bottom, src.bounds.top] else: b = job.request.aoi.bbox extent = [b[0], b[2], b[1], b[3]] render_hotspot_map( true_color_path=true_color_path_ind, zscore_raster=zscore_raster, hotspot_mask=hotspot_mask, extent=extent, aoi=job.request.aoi, status=result.status, output_path=hotspot_path, label=result.product_id.upper(), ) product_hotspot_paths[result.product_id] = hotspot_path output_files.append(hotspot_path) # Cross-indicator compound signal detection. # Skip indicators that cannot contribute reliably: # - GREEN status (no signal, including coverage-gated water) # - Headlines flagged as baseline drift # This prevents false-positive compound signals fired off pixel-level # noise from indicators we already deemed unreliable at the AOI level. from app.analysis.compound import detect_compound_signals import numpy as np from app.models import StatusLevel unreliable_pids: set[str] = set() for result in job.results: if result.status == StatusLevel.GREEN: unreliable_pids.add(result.product_id) continue headline_lower = (result.headline or "").lower() if "baseline may be unreliable" in headline_lower: unreliable_pids.add(result.product_id) zscore_rasters = {} for result in job.results: if result.product_id in unreliable_pids: continue product_obj = registry.get(result.product_id) z = getattr(product_obj, '_zscore_raster', None) if z is not None: zscore_rasters[result.product_id] = z if unreliable_pids: logger.info( "Compound signal detection skipping unreliable indicators: %s", sorted(unreliable_pids), ) compound_signals = [] if len(zscore_rasters) >= 2: # Upsample to finest resolution for best spatial overlap detection shapes = [z.shape for z in zscore_rasters.values()] target_shape = max(shapes, key=lambda s: s[0] * s[1]) resampled = {} for ind_id, z in zscore_rasters.items(): if z.shape != target_shape: from scipy.ndimage import zoom factors = (target_shape[0] / z.shape[0], target_shape[1] / z.shape[1]) resampled[ind_id] = zoom(z, factors, order=1) # bilinear for continuous z-scores else: resampled[ind_id] = z pixel_area_ha = (job.request.aoi.area_km2 * 100) / (target_shape[0] * target_shape[1]) compound_signals = detect_compound_signals( zscore_rasters=resampled, pixel_area_ha=pixel_area_ha, threshold=2.0, ) del resampled # Release z-score rasters from EO product instances to free memory del zscore_rasters for result in job.results: registry.get(result.product_id).release_rasters() if compound_signals: signals_path = os.path.join(results_dir, "compound_signals.json") with open(signals_path, "w") as f: json.dump([s.model_dump() for s in compound_signals], f, indent=2) output_files.append(signals_path) # Build map paths dict for PDF product_map_paths = {} for result in job.results: mp = os.path.join(results_dir, f"{result.product_id}_map.png") if os.path.exists(mp): product_map_paths[result.product_id] = mp # Generate summary map (worst-case status) from app.models import StatusLevel worst_status = max( (r.status for r in job.results), key=lambda s: [StatusLevel.GREEN, StatusLevel.AMBER, StatusLevel.RED].index(s), ) summary_map_path = os.path.join(results_dir, "summary_map.png") render_status_map(aoi=job.request.aoi, status=worst_status, output_path=summary_map_path) output_files.append(summary_map_path) # --- Visual Overview --- overview_score = compute_composite_score(job.results) overview_score_path = os.path.join(results_dir, "overview_score.json") write_overview_score(overview_score, overview_score_path) output_files.append(overview_score_path) # Overview map: reuse true-color from any raster EO product, or skip overview_map_path = os.path.join(results_dir, "overview_map.png") true_color_path = None for product_id in job.request.product_ids: product_obj = registry.get(product_id) tc = getattr(product_obj, '_true_color_path', None) if tc and os.path.exists(tc): true_color_path = tc break if true_color_path: render_overview_map( true_color_path=true_color_path, aoi=job.request.aoi, output_path=overview_map_path, title=f"{job.request.aoi.name} — Satellite Overview", date_range=f"{job.request.time_range.start} to {job.request.time_range.end}", ) output_files.append(overview_map_path) # Generate PDF report report_path = os.path.join(results_dir, "report.pdf") generate_pdf_report( aoi=job.request.aoi, time_range=job.request.time_range, results=job.results, output_path=report_path, summary_map_path=summary_map_path, product_map_paths=product_map_paths, product_hotspot_paths=product_hotspot_paths, overview_score=overview_score, overview_map_path=overview_map_path if true_color_path else "", compound_signals=compound_signals, ) output_files.append(report_path) # Package everything package_path = os.path.join(results_dir, "package.zip") create_data_package(files=output_files, output_path=package_path) await db.update_job_status(job_id, JobStatus.COMPLETE) # Send completion email await send_completion_email( to_email=job.request.email, job_id=job_id, aoi_name=job.request.aoi.name, ) except Exception as e: logger.exception(f"Job {job_id} failed: {e}") await db.update_job_status(job_id, JobStatus.FAILED, error=str(e)) async def worker_loop(db: Database, registry: ProductRegistry) -> None: logger.info("Background worker started") while True: job = await db.get_next_queued_job() if job is not None: logger.info(f"Processing job {job.id}") await process_job(job.id, db, registry) await asyncio.sleep(5)