""" PaleoData Explorer — Data Processor ==================================== Takes the raw JSON list returned by :mod:`api_client` and transforms it into a clean, analysis-ready ``pandas.DataFrame``. Key responsibilities -------------------- 1. Parse PBDB JSON records into a flat DataFrame. 2. Filter out records that lack temporal bounds (``max_ma`` / ``min_ma``) or paleocoordinates (``paleolat`` / ``paleolng``). 3. Derive a ``middle_age`` column (``(max_ma + min_ma) / 2``) for point plotting on temporal charts. 4. Optionally filter by a user-supplied geological time window. 5. Surface statistics (record counts at each stage) so the UI can show the user what was discarded and why. """ from __future__ import annotations import logging from typing import Any, Dict, List, Optional, Tuple import pandas as pd logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- # PBDB raw field → canonical field name mapping. # The PBDB API returns abbreviated keys; we normalise them here so the # rest of the pipeline works with descriptive names. PBDB_COLUMN_MAP: Dict[str, str] = { "eag": "max_ma", # early age → older bound "lag": "min_ma", # late age → younger bound "tna": "matched_name", # taxon name "oei": "early_interval", "oli": "late_interval", "pla": "paleolat", "pln": "paleolng", "phl": "phylum", "cll": "class", "odl": "order", "fml": "family", "gnl": "genus", } # Canonical field names used after renaming. FIELDS_TEMPORAL = ("max_ma", "min_ma") FIELDS_SPATIAL = ("paleolat", "paleolng") FIELDS_TAXONOMIC = ( "matched_name", "early_interval", "late_interval", "phylum", "class", "order", "family", "genus", ) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def records_to_dataframe(records: List[Dict[str, Any]]) -> pd.DataFrame: """Convert raw PBDB occurrence records into a :class:`pandas.DataFrame`. Parameters ---------- records : list[dict] The list returned by :func:`api_client.fetch_occurrences`. Returns ------- pd.DataFrame A DataFrame where each row is one occurrence. The columns include all keys present in the raw JSON, subject to Pandas' internal flattening of nested structures when possible. Returns an empty DataFrame (0 rows) if the input list is empty. """ if not records: logger.warning("records_to_dataframe called with empty list") return pd.DataFrame() df = pd.DataFrame(records) logger.debug("Raw DataFrame shape: %s", df.shape) return df def clean_occurrence_data( df: pd.DataFrame, *, min_ma: Optional[float] = None, max_ma: Optional[float] = None, ) -> Tuple[pd.DataFrame, Dict[str, int]]: """Clean a raw occurrence DataFrame and optionally subset it to a time window. Cleaning steps (in order) ------------------------- 1. Remove rows where **both** ``max_ma`` and ``min_ma`` are missing (records without any age information are unusable). 2. Impute missing ``max_ma`` / ``min_ma`` where only one is available by taking the available value (conservative estimate). 3. Remove rows where ``paleolat`` **or** ``paleolng`` is missing (cannot plot on a map). 4. Derive ``middle_age`` = ``(max_ma + min_ma) / 2``. 5. If ``min_ma`` and/or ``max_ma`` are supplied, filter the DataFrame to occurrences whose ``middle_age`` falls within the requested window. Parameters ---------- df : pd.DataFrame Raw DataFrame from :func:`records_to_dataframe`. min_ma, max_ma : float or None Optional temporal boundaries in Ma. Only occurrences whose ``middle_age`` lies **between** *max_ma* (older bound) and *min_ma* (younger bound) are kept. Returns ------- df : pd.DataFrame Cleaned, filtered DataFrame. May be empty. stats : dict Counts at each pipeline stage for transparency: ``{"raw": n, "has_temporal": n, "has_spatial": n, "in_window": n}`` """ n_raw = len(df) stats: Dict[str, int] = {"raw": n_raw} if df.empty: logger.warning("Input DataFrame is empty; nothing to clean.") stats.update(has_temporal=0, has_spatial=0, in_window=0) return df, stats # ---- Step 0: normalise PBDB abbreviated column names --------------- rename_map = {k: v for k, v in PBDB_COLUMN_MAP.items() if k in df.columns} df = df.rename(columns=rename_map) logger.debug("Renamed %d columns: %s", len(rename_map), list(rename_map.keys())) # ---- Step 1: require at least one temporal field ------------------- temporal_mask = df[list(FIELDS_TEMPORAL)].notna().any(axis=1) df = df.loc[temporal_mask].copy() stats["has_temporal"] = len(df) logger.debug("After temporal filter: %d rows (dropped %d)", stats["has_temporal"], n_raw - stats["has_temporal"]) # ---- Step 2: impute lone temporal values --------------------------- # If max_ma is NaN but min_ma exists, set max_ma = min_ma (point date) max_null = df["max_ma"].isna() if max_null.any(): df.loc[max_null, "max_ma"] = df.loc[max_null, "min_ma"] # If min_ma is NaN but max_ma exists, set min_ma = max_ma min_null = df["min_ma"].isna() if min_null.any(): df.loc[min_null, "min_ma"] = df.loc[min_null, "max_ma"] # ---- Step 3: require both paleocoordinates ------------------------- spatial_mask = df[list(FIELDS_SPATIAL)].notna().all(axis=1) df = df.loc[spatial_mask].copy() stats["has_spatial"] = len(df) logger.debug("After spatial filter: %d rows (dropped %d)", stats["has_spatial"], stats["has_temporal"] - stats["has_spatial"]) # ---- Step 4: compute middle_age ------------------------------------ df["middle_age"] = (df["max_ma"].astype(float) + df["min_ma"].astype(float)) / 2.0 # ---- Step 5: optional time-window filter --------------------------- if max_ma is not None or min_ma is not None: in_window = pd.Series(True, index=df.index) if max_ma is not None: in_window &= df["middle_age"] <= float(max_ma) if min_ma is not None: in_window &= df["middle_age"] >= float(min_ma) df = df.loc[in_window].copy() stats["in_window"] = len(df) logger.debug("After time-window filter: %d rows (dropped %d)", stats["in_window"], stats["has_spatial"] - stats["in_window"]) else: stats["in_window"] = stats["has_spatial"] # ---- Ensure consistent float types for critical columns ------------ for col in ("max_ma", "min_ma", "middle_age", "paleolat", "paleolng"): if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce") logger.info("Cleaning pipeline done: raw=%d → has_temporal=%d → has_spatial=%d → in_window=%d", stats["raw"], stats["has_temporal"], stats["has_spatial"], stats["in_window"]) return df, stats def get_dataframe_statistics(df: pd.DataFrame) -> Dict[str, Any]: """Return a small summary dict for display in the UI. Includes unique taxa counts, time span, and geographic extent. """ if df.empty: return {"unique_taxa": 0, "time_span_ma": None} stats = { "record_count": len(df), "unique_taxa": int(df["matched_name"].nunique()), "unique_families": int(df["family"].nunique()) if "family" in df.columns else 0, "unique_genera": int(df["genus"].nunique()) if "genus" in df.columns else 0, "oldest_ma": float(df["middle_age"].max()), "youngest_ma": float(df["middle_age"].min()), "time_span_ma": float(df["middle_age"].max() - df["middle_age"].min()), "lat_extent": float(df["paleolat"].max() - df["paleolat"].min()) if "paleolat" in df.columns else None, "lng_extent": float(df["paleolng"].max() - df["paleolng"].min()) if "paleolng" in df.columns else None, } return stats