# Aperture — app/worker.py
# Last commit ef89b38 (KSvend): "fix: graceful fallback when indicator has no grid overlay"
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import traceback
from app.database import Database
from app.eo_products.base import ProductRegistry
from app.models import JobStatus
from app.outputs.report import generate_pdf_report, _product_label
from app.outputs.package import create_data_package
from app.outputs.charts import render_timeseries_chart
from app.outputs.maps import render_indicator_map, render_status_map
from app.outputs.overview import compute_composite_score, write_overview_score
from app.outputs.maps import render_overview_map
from app.core.email import send_completion_email
# Module-level logger; also used by the worker functions below.
logger = logging.getLogger(__name__)

# Tuning knobs for the CDSE batch-job polling loop in process_job().
BATCH_POLL_INTERVAL = 30 # seconds between status checks
BATCH_TIMEOUT = 5400 # 90 minutes per indicator
class _SkippedJob:
    """Placeholder for a batch job that never reached the finished state.

    Harvest code can keep it in the same list as real batch jobs; any
    attempt to pull results raises immediately ("fails fast") instead of
    blocking on a download that can never succeed.
    """

    def __init__(self, job_id: str):
        # Only the identifier is retained, for the error message below.
        self.job_id = job_id

    def status(self):
        """Report the sentinel status for a job we gave up on."""
        return "skipped"

    def download_results(self, *args, **kwargs):
        """Refuse to download: the underlying job did not finish."""
        raise RuntimeError(f"Job {self.job_id} not finished, skipping download")
def _save_spatial_json(spatial, status_value: str, path: str, product_obj=None) -> None:
    """Write a JSON payload describing *spatial* for the frontend map.

    Raster-type spatial data is downsampled into the same "grid" format
    the frontend already knows how to render, so the dashboard map shows
    the actual indicator overlay instead of just a tinted AOI box. When
    no grid can be produced, a descriptive no-overlay payload is written
    so the legend/color hint still renders.
    """
    payload = None
    if spatial is None:
        # No spatial product at all: frontend falls back to a status tint.
        payload = {"map_type": "status", "status": status_value}
    elif spatial.map_type == "raster" and product_obj is not None:
        from app.outputs.spatial_web import raster_to_grid_dict

        raster_path = getattr(product_obj, "_product_raster_path", None)
        render_band = getattr(product_obj, "_render_band", 1)
        logger.info(
            "Serializing raster spatial for %s: path=%s band=%s exists=%s",
            getattr(product_obj, "id", "?"),
            raster_path,
            render_band,
            os.path.exists(raster_path) if raster_path else False,
        )
        if raster_path and os.path.exists(raster_path):
            payload = raster_to_grid_dict(
                raster_path,
                band=render_band,
                spatial=spatial,
                status_value=status_value,
            )
        if payload is None:
            # Fall back to a descriptive payload so the frontend can still
            # show the legend + color hint instead of blanking the map.
            logger.warning(
                "No grid spatial for %s — falling back to no-overlay payload",
                getattr(product_obj, "id", "?"),
            )
            payload = {
                "map_type": "raster-unavailable",
                "status": status_value,
                "label": spatial.label,
                "colormap": spatial.colormap,
                "vmin": spatial.vmin,
                "vmax": spatial.vmax,
            }
    elif spatial.map_type == "grid":
        # Grid data is already frontend-ready; just flatten arrays to lists.
        payload = {
            "map_type": "grid",
            "status": status_value,
            "data": spatial.data.tolist(),
            "lats": spatial.lats.tolist(),
            "lons": spatial.lons.tolist(),
            "label": spatial.label,
            "colormap": spatial.colormap,
        }
    else:
        # Any other map_type is assumed to carry GeoJSON geometry.
        payload = {
            "map_type": spatial.map_type,
            "status": status_value,
            "geojson": spatial.geojson,
            "label": spatial.label,
            "colormap": spatial.colormap,
        }
    with open(path, "w") as out:
        json.dump(payload, out)
async def process_job(job_id: str, db: Database, registry: ProductRegistry) -> None:
    """Execute one analysis job end to end.

    Runs every requested EO product (CDSE batch products first, then
    locally-processed ones), renders charts/maps/spatial JSON, detects
    cross-indicator compound signals, builds the PDF report and ZIP
    package, then marks the job COMPLETE and emails the requester.
    Any exception anywhere in the pipeline flips the job to FAILED with
    the error text recorded in the database.
    """
    job = await db.get_job(job_id)
    if job is None:
        logger.error(f"Job {job_id} not found")
        return
    await db.update_job_status(job_id, JobStatus.PROCESSING)
    try:
        # product_id -> spatial data object, captured while the product
        # instance still holds its rasters (released later).
        spatial_cache = {}
        # Separate batch vs non-batch EO products
        batch_products = {}
        process_products = []
        for product_id in job.request.product_ids:
            product = registry.get(product_id)
            if product.uses_batch:
                batch_products[product_id] = product
            else:
                process_products.append((product_id, product))
        # -- Process batch EO products sequentially --
        for product_id, product in batch_products.items():
            # Submit
            await db.update_job_progress(job_id, product_id, "submitting")
            jobs = await product.submit_batch(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
            )
            job_ids = [getattr(j, 'job_id', '?') for j in jobs]
            print(f"[Aperture] Submitted {product_id} batch jobs: {job_ids}")
            await db.update_job_progress(job_id, product_id, "processing on CDSE")
            # Poll — exit early once first job finishes + grace period for others
            GRACE_PERIOD = 600 # 10 min grace after first job finishes
            poll_start = time.monotonic()
            first_finished_at = None
            while True:
                elapsed = time.monotonic() - poll_start
                statuses = [j.status() for j in jobs]
                job_ids = [getattr(j, 'job_id', '?') for j in jobs]
                print(f"[Aperture] Poll {product_id} ({elapsed:.0f}s): {list(zip(job_ids, statuses))}")
                if all(s == "finished" for s in statuses):
                    logger.info("Batch jobs finished for %s", product_id)
                    break
                elif any(s in ("error", "canceled") for s in statuses):
                    logger.warning("Batch job failed for %s: %s", product_id, statuses)
                    break
                # Track when first job finishes
                if first_finished_at is None and any(s == "finished" for s in statuses):
                    first_finished_at = time.monotonic()
                    print(f"[Aperture] {product_id}: first job finished, {GRACE_PERIOD}s grace for remaining")
                # Grace period: once any job finished, give others 10 min then harvest partial
                if first_finished_at and (time.monotonic() - first_finished_at) >= GRACE_PERIOD:
                    logger.info("Grace period expired for %s, harvesting partial results", product_id)
                    print(f"[Aperture] {product_id}: grace period expired, proceeding with partial results")
                    break
                if elapsed >= BATCH_TIMEOUT:
                    logger.warning("Batch poll timeout after %.0fs for %s", elapsed, product_id)
                    break
                await asyncio.sleep(BATCH_POLL_INTERVAL)
            # NOTE: `statuses`/`job_ids` are always bound here — the while
            # body runs at least once before any break.
            # Harvest if any job finished (harvest methods handle per-job failures)
            any_finished = any(s == "finished" for s in statuses)
            if not any_finished:
                failed_statuses = list(zip(job_ids, statuses))
                raise RuntimeError(
                    f"All batch jobs failed for {product_id}: {failed_statuses}"
                )
            # Wrap non-finished jobs so download_results() fails fast
            # instead of blocking for 30 min on a still-running job
            harvest_jobs = [
                j if s == "finished" else _SkippedJob(getattr(j, 'job_id', '?'))
                for j, s in zip(jobs, statuses)
            ]
            await db.update_job_progress(job_id, product_id, "downloading")
            result = await product.harvest(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
                batch_jobs=harvest_jobs,
            )
            # Capture spatial data now, while the product holds its rasters.
            spatial = product.get_spatial_data()
            if spatial is not None:
                spatial_cache[product_id] = spatial
            print(f"[Aperture] Saving result for {product_id}: data_source={result.data_source}, headline={result.headline[:60]}")
            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, product_id, "complete")
        # -- Process non-batch EO products --
        for product_id, product in process_products:
            await db.update_job_progress(job_id, product_id, "processing")
            result = await product.process(
                job.request.aoi,
                job.request.time_range,
                season_months=job.request.season_months(),
            )
            spatial = product.get_spatial_data()
            if spatial is not None:
                spatial_cache[product_id] = spatial
            await db.save_job_result(job_id, result)
            await db.update_job_progress(job_id, product_id, "complete")
        # Generate outputs
        # Re-fetch the job so job.results reflects everything saved above.
        job = await db.get_job(job_id)
        results_dir = os.path.join("results", job_id)
        os.makedirs(results_dir, exist_ok=True)
        output_files = []
        # Generate charts and maps for each result
        for result in job.results:
            chart_path = os.path.join(results_dir, f"{result.product_id}_chart.png")
            render_timeseries_chart(
                chart_data=result.chart_data,
                product_name=_product_label(result.product_id),
                status=result.status,
                trend=result.trend,
                output_path=chart_path,
            )
            output_files.append(chart_path)
            # Generate map PNG for every indicator
            spatial = spatial_cache.get(result.product_id)
            map_path = os.path.join(results_dir, f"{result.product_id}_map.png")
            if spatial is not None and spatial.map_type == "raster":
                # Raster-on-true-color rendering for openEO/download indicators
                product_obj = registry.get(result.product_id)
                raster_path = getattr(product_obj, '_product_raster_path', None)
                true_color_path = getattr(product_obj, '_true_color_path', None)
                render_band = getattr(product_obj, '_render_band', 1)
                from app.outputs.maps import render_raster_map
                render_raster_map(
                    true_color_path=true_color_path,
                    indicator_path=raster_path,
                    indicator_band=render_band,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                    cmap=spatial.colormap,
                    vmin=spatial.vmin,
                    vmax=spatial.vmax,
                    label=spatial.label,
                )
            elif spatial is not None:
                render_indicator_map(
                    spatial=spatial,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            else:
                # No spatial data: plain AOI map tinted by status.
                render_status_map(
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=map_path,
                )
            output_files.append(map_path)
            # Save spatial data as JSON for frontend
            spatial_json_path = os.path.join(results_dir, f"{result.product_id}_spatial.json")
            _save_spatial_json(
                spatial,
                result.status.value,
                spatial_json_path,
                product_obj=registry.get(result.product_id),
            )
        # Generate hotspot maps for indicators with z-score data
        from app.outputs.maps import render_hotspot_map
        product_hotspot_paths = {}
        for result in job.results:
            product_obj = registry.get(result.product_id)
            zscore_raster = getattr(product_obj, '_zscore_raster', None)
            hotspot_mask = getattr(product_obj, '_hotspot_mask', None)
            true_color_path_ind = getattr(product_obj, '_true_color_path', None)
            if zscore_raster is not None and hotspot_mask is not None:
                hotspot_path = os.path.join(results_dir, f"{result.product_id}_hotspot.png")
                raster_path = getattr(product_obj, '_product_raster_path', None)
                if raster_path:
                    import rasterio
                    with rasterio.open(raster_path) as src:
                        extent = [src.bounds.left, src.bounds.right, src.bounds.bottom, src.bounds.top]
                else:
                    # NOTE(review): reorders bbox into [left, right, bottom, top];
                    # assumes aoi.bbox is (minx, miny, maxx, maxy) — confirm.
                    b = job.request.aoi.bbox
                    extent = [b[0], b[2], b[1], b[3]]
                render_hotspot_map(
                    true_color_path=true_color_path_ind,
                    zscore_raster=zscore_raster,
                    hotspot_mask=hotspot_mask,
                    extent=extent,
                    aoi=job.request.aoi,
                    status=result.status,
                    output_path=hotspot_path,
                    label=result.product_id.upper(),
                )
                product_hotspot_paths[result.product_id] = hotspot_path
                output_files.append(hotspot_path)
        # Cross-indicator compound signal detection.
        # Skip indicators that cannot contribute reliably:
        # - GREEN status (no signal, including coverage-gated water)
        # - Headlines flagged as baseline drift
        # This prevents false-positive compound signals fired off pixel-level
        # noise from indicators we already deemed unreliable at the AOI level.
        from app.analysis.compound import detect_compound_signals
        import numpy as np
        from app.models import StatusLevel
        unreliable_pids: set[str] = set()
        for result in job.results:
            if result.status == StatusLevel.GREEN:
                unreliable_pids.add(result.product_id)
                continue
            # Headline text is the only marker we have for baseline drift.
            headline_lower = (result.headline or "").lower()
            if "baseline may be unreliable" in headline_lower:
                unreliable_pids.add(result.product_id)
        zscore_rasters = {}
        for result in job.results:
            if result.product_id in unreliable_pids:
                continue
            product_obj = registry.get(result.product_id)
            z = getattr(product_obj, '_zscore_raster', None)
            if z is not None:
                zscore_rasters[result.product_id] = z
        if unreliable_pids:
            logger.info(
                "Compound signal detection skipping unreliable indicators: %s",
                sorted(unreliable_pids),
            )
        compound_signals = []
        if len(zscore_rasters) >= 2:
            # Upsample to finest resolution for best spatial overlap detection
            shapes = [z.shape for z in zscore_rasters.values()]
            target_shape = max(shapes, key=lambda s: s[0] * s[1])
            resampled = {}
            for ind_id, z in zscore_rasters.items():
                if z.shape != target_shape:
                    from scipy.ndimage import zoom
                    factors = (target_shape[0] / z.shape[0], target_shape[1] / z.shape[1])
                    resampled[ind_id] = zoom(z, factors, order=1) # bilinear for continuous z-scores
                else:
                    resampled[ind_id] = z
            # Approximate per-pixel area from AOI area spread over the grid.
            pixel_area_ha = (job.request.aoi.area_km2 * 100) / (target_shape[0] * target_shape[1])
            compound_signals = detect_compound_signals(
                zscore_rasters=resampled,
                pixel_area_ha=pixel_area_ha,
                threshold=2.0,
            )
            del resampled
        # Release z-score rasters from EO product instances to free memory
        del zscore_rasters
        for result in job.results:
            registry.get(result.product_id).release_rasters()
        if compound_signals:
            signals_path = os.path.join(results_dir, "compound_signals.json")
            with open(signals_path, "w") as f:
                json.dump([s.model_dump() for s in compound_signals], f, indent=2)
            output_files.append(signals_path)
        # Build map paths dict for PDF
        product_map_paths = {}
        for result in job.results:
            mp = os.path.join(results_dir, f"{result.product_id}_map.png")
            if os.path.exists(mp):
                product_map_paths[result.product_id] = mp
        # Generate summary map (worst-case status)
        # (StatusLevel re-import is harmless; already imported above.)
        from app.models import StatusLevel
        worst_status = max(
            (r.status for r in job.results),
            key=lambda s: [StatusLevel.GREEN, StatusLevel.AMBER, StatusLevel.RED].index(s),
        )
        summary_map_path = os.path.join(results_dir, "summary_map.png")
        render_status_map(aoi=job.request.aoi, status=worst_status, output_path=summary_map_path)
        output_files.append(summary_map_path)
        # --- Visual Overview ---
        overview_score = compute_composite_score(job.results)
        overview_score_path = os.path.join(results_dir, "overview_score.json")
        write_overview_score(overview_score, overview_score_path)
        output_files.append(overview_score_path)
        # Overview map: reuse true-color from any raster EO product, or skip
        overview_map_path = os.path.join(results_dir, "overview_map.png")
        true_color_path = None
        for product_id in job.request.product_ids:
            product_obj = registry.get(product_id)
            tc = getattr(product_obj, '_true_color_path', None)
            if tc and os.path.exists(tc):
                true_color_path = tc
                break
        if true_color_path:
            render_overview_map(
                true_color_path=true_color_path,
                aoi=job.request.aoi,
                output_path=overview_map_path,
                title=f"{job.request.aoi.name} — Satellite Overview",
                date_range=f"{job.request.time_range.start} to {job.request.time_range.end}",
            )
            output_files.append(overview_map_path)
        # Generate PDF report
        report_path = os.path.join(results_dir, "report.pdf")
        generate_pdf_report(
            aoi=job.request.aoi,
            time_range=job.request.time_range,
            results=job.results,
            output_path=report_path,
            summary_map_path=summary_map_path,
            product_map_paths=product_map_paths,
            product_hotspot_paths=product_hotspot_paths,
            overview_score=overview_score,
            # Empty string signals "no overview map was rendered".
            overview_map_path=overview_map_path if true_color_path else "",
            compound_signals=compound_signals,
        )
        output_files.append(report_path)
        # Package everything
        package_path = os.path.join(results_dir, "package.zip")
        create_data_package(files=output_files, output_path=package_path)
        await db.update_job_status(job_id, JobStatus.COMPLETE)
        # Send completion email
        await send_completion_email(
            to_email=job.request.email,
            job_id=job_id,
            aoi_name=job.request.aoi.name,
        )
    except Exception as e:
        # Any failure anywhere in the pipeline marks the job FAILED.
        logger.exception(f"Job {job_id} failed: {e}")
        await db.update_job_status(job_id, JobStatus.FAILED, error=str(e))
async def worker_loop(db: Database, registry: ProductRegistry) -> None:
    """Poll the job queue forever, processing queued jobs one at a time.

    Sleeps 5 seconds between polls. Job failures are caught inside
    process_job, so a bad job cannot kill this loop.
    """
    logger.info("Background worker started")
    while True:
        job = await db.get_next_queued_job()
        if job is None:
            # Queue empty — back off briefly before polling again.
            await asyncio.sleep(5)
            continue
        logger.info(f"Processing job {job.id}")
        await process_job(job.id, db, registry)
        # Same pause after finishing a job, before the next poll.
        await asyncio.sleep(5)