# PaleoData-Explorer / data_processor.py
# Retrieved from the Hugging Face file view (8.32 kB); last commit 81f7f5d
# by benjamintia: "Upload folder using huggingface_hub".
"""
PaleoData Explorer — Data Processor
====================================

Takes the raw JSON list returned by :mod:`api_client` and transforms it
into a clean, analysis-ready ``pandas.DataFrame``.

Key responsibilities
--------------------
1. Parse PBDB JSON records into a flat DataFrame.
2. Filter out records that lack temporal bounds (both ``max_ma`` and
   ``min_ma`` missing) or paleocoordinates (``paleolat`` or ``paleolng``
   missing).
3. Derive a ``middle_age`` column (``(max_ma + min_ma) / 2``) for point
   plotting on temporal charts.
4. Optionally filter by a user-supplied geological time window.
5. Surface statistics (record counts at each stage) so the UI can show
   the user what was discarded and why.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Translation table from the abbreviated keys found in PBDB API records to
# the descriptive column names used by the rest of the pipeline.  Only keys
# listed here are renamed; any other column is left untouched.
PBDB_COLUMN_MAP: Dict[str, str] = {
    # Temporal bounds (Ma): "early age" is the older bound, "late age" the
    # younger one.
    "eag": "max_ma",
    "lag": "min_ma",
    # Taxon name and the named geological intervals of the occurrence.
    "tna": "matched_name",
    "oei": "early_interval",
    "oli": "late_interval",
    # Paleocoordinates.
    "pla": "paleolat",
    "pln": "paleolng",
    # Taxonomic ranks.
    "phl": "phylum",
    "cll": "class",
    "odl": "order",
    "fml": "family",
    "gnl": "genus",
}
# Groups of canonical (post-rename) column names, by role in the pipeline.

# Age bounds in Ma: older bound first, younger bound second.
FIELDS_TEMPORAL: Tuple[str, ...] = ("max_ma", "min_ma")

# Paleocoordinates needed to place an occurrence on a map.
FIELDS_SPATIAL: Tuple[str, ...] = ("paleolat", "paleolng")

# Descriptive columns: taxon name, named intervals and taxonomic ranks.
FIELDS_TAXONOMIC: Tuple[str, ...] = (
    "matched_name",
    "early_interval",
    "late_interval",
    "phylum",
    "class",
    "order",
    "family",
    "genus",
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def records_to_dataframe(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build a :class:`pandas.DataFrame` from raw PBDB occurrence records.

    Parameters
    ----------
    records : list[dict]
        Occurrence records, one dict per occurrence, as returned by
        :func:`api_client.fetch_occurrences`.

    Returns
    -------
    pd.DataFrame
        One row per record, with the records' keys as columns.  No
        renaming or cleaning is applied at this stage.  A falsy input
        (e.g. an empty list) yields a DataFrame with no rows and no
        columns.
    """
    if records:
        frame = pd.DataFrame(records)
        logger.debug("Raw DataFrame shape: %s", frame.shape)
        return frame

    # Nothing to convert: report it and hand back an empty frame so callers
    # can keep treating the result as a DataFrame.
    logger.warning("records_to_dataframe called with empty list")
    return pd.DataFrame()
def clean_occurrence_data(
    df: pd.DataFrame,
    *,
    min_ma: Optional[float] = None,
    max_ma: Optional[float] = None,
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """Clean a raw occurrence DataFrame and optionally subset it to a time window.

    Cleaning steps (in order)
    -------------------------
    0. Rename abbreviated PBDB columns via ``PBDB_COLUMN_MAP`` and coerce
       the age and paleocoordinate columns to float.  Values that cannot
       be parsed as numbers become NaN (i.e. are treated as missing), and
       a required column that is absent altogether is added as all-NaN.
    1. Remove rows where **both** ``max_ma`` and ``min_ma`` are missing
       (records without any age information are unusable).
    2. Impute a missing ``max_ma`` / ``min_ma`` from the other bound when
       only one is available.
    3. Remove rows where ``paleolat`` **or** ``paleolng`` is missing
       (cannot plot on a map).
    4. Derive ``middle_age`` = ``(max_ma + min_ma) / 2``.
    5. If ``min_ma`` and/or ``max_ma`` are supplied, keep only occurrences
       whose ``middle_age`` falls within the requested window (bounds
       inclusive).

    Parameters
    ----------
    df : pd.DataFrame
        Raw DataFrame from :func:`records_to_dataframe`.  It is not
        modified; a new DataFrame is returned.
    min_ma, max_ma : float or None
        Optional temporal boundaries in Ma.  Only occurrences whose
        ``middle_age`` lies **between** *max_ma* (older bound) and
        *min_ma* (younger bound) are kept.  A bound left as ``None`` is
        not applied.

    Returns
    -------
    df : pd.DataFrame
        Cleaned, filtered DataFrame.  May be empty.  An empty input is
        returned as-is, without renaming or a ``middle_age`` column.
    stats : dict
        Counts at each pipeline stage for transparency:
        ``{"raw": n, "has_temporal": n, "has_spatial": n, "in_window": n}``
    """
    n_raw = len(df)
    stats: Dict[str, int] = {"raw": n_raw}

    if df.empty:
        logger.warning("Input DataFrame is empty; nothing to clean.")
        stats.update(has_temporal=0, has_spatial=0, in_window=0)
        return df, stats

    # ---- Step 0: normalise PBDB abbreviated column names ---------------
    # ``rename`` returns a new frame, so the caller's DataFrame is never
    # touched by the assignments below.
    rename_map = {k: v for k, v in PBDB_COLUMN_MAP.items() if k in df.columns}
    df = df.rename(columns=rename_map)
    logger.debug("Renamed %d columns: %s", len(rename_map), list(rename_map.keys()))

    # ---- Step 0b: numeric coercion BEFORE any filtering ----------------
    # Coercing up front means unparseable values (e.g. "") count as missing
    # in the filters below instead of slipping through ``notna()`` or making
    # the arithmetic raise.  A column the API did not return at all is
    # treated as entirely missing rather than causing a KeyError.
    for col in (*FIELDS_TEMPORAL, *FIELDS_SPATIAL):
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
        else:
            logger.warning(
                "Column %r is missing from the input; treating all values as absent",
                col,
            )
            df[col] = float("nan")

    # ---- Step 1: require at least one temporal field -------------------
    temporal_mask = df[list(FIELDS_TEMPORAL)].notna().any(axis=1)
    df = df.loc[temporal_mask].copy()
    stats["has_temporal"] = len(df)
    logger.debug("After temporal filter: %d rows (dropped %d)",
                 stats["has_temporal"], n_raw - stats["has_temporal"])

    # ---- Step 2: impute lone temporal values ---------------------------
    # Every surviving row has at least one bound, so after these two fills
    # both bounds are populated (a lone bound becomes a point date).
    df["max_ma"] = df["max_ma"].fillna(df["min_ma"])
    df["min_ma"] = df["min_ma"].fillna(df["max_ma"])

    # ---- Step 3: require both paleocoordinates -------------------------
    spatial_mask = df[list(FIELDS_SPATIAL)].notna().all(axis=1)
    df = df.loc[spatial_mask].copy()
    stats["has_spatial"] = len(df)
    logger.debug("After spatial filter: %d rows (dropped %d)",
                 stats["has_spatial"], stats["has_temporal"] - stats["has_spatial"])

    # ---- Step 4: compute middle_age ------------------------------------
    df["middle_age"] = (df["max_ma"] + df["min_ma"]) / 2.0

    # ---- Step 5: optional time-window filter ---------------------------
    if max_ma is not None or min_ma is not None:
        in_window = pd.Series(True, index=df.index)
        if max_ma is not None:
            in_window &= df["middle_age"] <= float(max_ma)
        if min_ma is not None:
            in_window &= df["middle_age"] >= float(min_ma)
        df = df.loc[in_window].copy()
        stats["in_window"] = len(df)
        logger.debug("After time-window filter: %d rows (dropped %d)",
                     stats["in_window"], stats["has_spatial"] - stats["in_window"])
    else:
        stats["in_window"] = stats["has_spatial"]

    logger.info("Cleaning pipeline done: raw=%d → has_temporal=%d → has_spatial=%d → in_window=%d",
                stats["raw"], stats["has_temporal"], stats["has_spatial"], stats["in_window"])
    return df, stats
def get_dataframe_statistics(df: pd.DataFrame) -> Dict[str, Any]:
    """Return a small summary dict for display in the UI.

    Parameters
    ----------
    df : pd.DataFrame
        Occurrence DataFrame, normally the output of
        :func:`clean_occurrence_data`.

    Returns
    -------
    dict
        Always contains the same keys, whether or not ``df`` is empty:

        ``record_count``
            Number of rows.
        ``unique_taxa``, ``unique_families``, ``unique_genera``
            Distinct non-null values in ``matched_name``, ``family`` and
            ``genus``; ``0`` when the column is absent or ``df`` is empty.
        ``oldest_ma``, ``youngest_ma``, ``time_span_ma``
            Maximum, minimum and range of ``middle_age``; ``None`` when
            the column is absent or ``df`` is empty.
        ``lat_extent``, ``lng_extent``
            Range (max - min) of ``paleolat`` / ``paleolng``; ``None``
            when the column is absent or ``df`` is empty.
    """
    # Defaults double as the empty-input result, so callers can read any
    # key without first checking whether data came back.
    stats: Dict[str, Any] = {
        "record_count": 0,
        "unique_taxa": 0,
        "unique_families": 0,
        "unique_genera": 0,
        "oldest_ma": None,
        "youngest_ma": None,
        "time_span_ma": None,
        "lat_extent": None,
        "lng_extent": None,
    }
    if df.empty:
        return stats

    def _unique_count(column: str) -> int:
        """Count distinct non-null values, or 0 if the column is absent."""
        return int(df[column].nunique()) if column in df.columns else 0

    def _extent(column: str) -> Optional[float]:
        """Return max - min of a column, or None if the column is absent."""
        if column not in df.columns:
            return None
        return float(df[column].max() - df[column].min())

    stats["record_count"] = len(df)
    stats["unique_taxa"] = _unique_count("matched_name")
    stats["unique_families"] = _unique_count("family")
    stats["unique_genera"] = _unique_count("genus")

    if "middle_age" in df.columns:
        oldest = df["middle_age"].max()
        youngest = df["middle_age"].min()
        stats["oldest_ma"] = float(oldest)
        stats["youngest_ma"] = float(youngest)
        stats["time_span_ma"] = float(oldest - youngest)

    stats["lat_extent"] = _extent("paleolat")
    stats["lng_extent"] = _extent("paleolng")
    return stats