"""Data manager for loading and caching pipeline results.
This module encapsulates the logic for computing the heavy data
transformations in ``pipeline.py`` and persisting the results to disk.
It adds a small amount of resilience around caching and uses
``logging`` instead of printing directly to stdout. The cache files
include a version tag to make it easy to invalidate caches when
fundamental changes are made to the pipeline logic.
"""
from __future__ import annotations
import logging
import os
import tempfile
from functools import lru_cache
from pathlib import Path
from typing import Dict
import pandas as pd
from . import config, pipeline
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cache setup
# ---------------------------------------------------------------------------
# A version tag to embed into the cache filenames. Bump this value
# whenever the underlying ``pipeline`` logic changes in a way that
# invalidates existing caches.
CACHE_VERSION: str = "v1"
def _resolve_cache_dir() -> Path:
"""Select a writable directory for caching.
The lookup order is:
1. The ``DATA_CACHE_DIR`` environment variable, if set.
2. A ``data`` folder at the repository root.
3. A temporary directory in ``/tmp``.
Each candidate path is tested for writability by attempting to
create and delete a sentinel file. The first path that succeeds
is returned. If none succeed, a final fallback directory in ``/tmp``
is created and returned.
"""
candidates: list[Path] = []
env = os.getenv("DATA_CACHE_DIR")
if env:
candidates.append(Path(env).expanduser().resolve())
candidates.append(Path(__file__).resolve().parent.parent / "data")
candidates.append(Path(tempfile.gettempdir()) / "employment_ai_cache")
for path in candidates:
try:
path.mkdir(parents=True, exist_ok=True)
test_file = path / f".write_test_{os.getpid()}"
test_file.write_text("ok", encoding="utf-8")
test_file.unlink()
return path
except Exception:
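            # Candidate directory missing or not writable; try the next one.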
continue
fallback = Path(tempfile.gettempdir()) / "employment_ai_cache"
fallback.mkdir(parents=True, exist_ok=True)
return fallback
DATA_DIR: Path = _resolve_cache_dir()
WEIGHTED_CACHE: Path = DATA_DIR / f"daioe_weighted_{CACHE_VERSION}.csv"
SIMPLE_CACHE: Path = DATA_DIR / f"daioe_simple_{CACHE_VERSION}.csv"
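
# ``DATA_DIR`` is resolved once at import time, so ``DATA_CACHE_DIR`` must be
# set before this module is first imported, e.g. (hypothetical path and entry
# point):
#
#     DATA_CACHE_DIR=/mnt/persistent/cache python -m src.app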
def _atomic_to_csv(df: pd.DataFrame, path: Path) -> None:
"""Write a DataFrame to CSV atomically."""
path.parent.mkdir(parents=True, exist_ok=True)
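    # Write to a sibling ``*.csv.tmp`` file first, then rename over the
    # target. ``Path.replace`` maps to an atomic rename on POSIX, so readers
    # never observe a half-written CSV.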
tmp_path = path.with_suffix(path.suffix + ".tmp")
df.to_csv(tmp_path, index=False)
tmp_path.replace(path)
@lru_cache(maxsize=1)
def _compute_pipeline_payload() -> Dict[str, pd.DataFrame]:
"""Runs the heavy pipeline calculation."""
results = pipeline.run_pipeline()
weighted_frames: list[pd.DataFrame] = []
simple_frames: list[pd.DataFrame] = []
for _, taxonomy in config.TAXONOMY_OPTIONS:
payload = results.get(taxonomy)
if not payload:
continue
weighted = payload.get("weighted")
simple = payload.get("simple")
if isinstance(weighted, pd.DataFrame) and not weighted.empty:
weighted_frames.append(weighted)
if isinstance(simple, pd.DataFrame) and not simple.empty:
simple_frames.append(simple)
if not weighted_frames or not simple_frames:
raise RuntimeError("Pipeline did not return weighted + simple datasets.")
return {
"weighted": pd.concat(weighted_frames, ignore_index=True),
"simple": pd.concat(simple_frames, ignore_index=True),
}
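
# Two cache layers sit in front of the pipeline: ``lru_cache`` memoises the
# payload within a process, while ``load_payload`` persists it to CSV so
# later processes can skip the computation entirely.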
def load_payload(force_recompute: bool = False) -> Dict[str, pd.DataFrame]:
"""Load data from disk cache if available, otherwise compute and save."""
if not force_recompute and WEIGHTED_CACHE.exists() and SIMPLE_CACHE.exists():
logger.info("Loading pipeline output from cache directory %s", DATA_DIR)
try:
dtype = {"taxonomy": str, "code": str}
weighted_df = pd.read_csv(WEIGHTED_CACHE, dtype=dtype)
simple_df = pd.read_csv(SIMPLE_CACHE, dtype=dtype)
return {"weighted": weighted_df, "simple": simple_df}
except Exception as exc:
logger.warning(
"Error reading cache files %s and %s: %s; falling back to recompute",
WEIGHTED_CACHE,
SIMPLE_CACHE,
exc,
)
if force_recompute:
_compute_pipeline_payload.cache_clear()
logger.info("Computing pipeline data – this may take a while…")
payload = _compute_pipeline_payload()
try:
_atomic_to_csv(payload["weighted"], WEIGHTED_CACHE)
_atomic_to_csv(payload["simple"], SIMPLE_CACHE)
logger.info(
"Cache updated: weighted=%s, simple=%s",
WEIGHTED_CACHE.name,
SIMPLE_CACHE.name,
)
except Exception as exc:
logger.warning("Could not write cache files: %s", exc)
return payload
def load_data(force_recompute: bool = False) -> Dict[str, pd.DataFrame]:
"""Load app-ready frames per taxonomy (from cache when possible)."""
payload = load_payload(force_recompute=force_recompute)
weighted_all = payload["weighted"]
simple_all = payload["simple"]
frames: Dict[str, pd.DataFrame] = {}
for _, taxonomy in config.TAXONOMY_OPTIONS:
        dfs: list[pd.DataFrame] = []
weighted = (
weighted_all[weighted_all["taxonomy"] == taxonomy]
if "taxonomy" in weighted_all.columns
else weighted_all
)
simple = (
simple_all[simple_all["taxonomy"] == taxonomy]
if "taxonomy" in simple_all.columns
else simple_all
)
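        # ``config.WEIGHTING_OPTIONS`` is assumed to be ordered as
        # (employment-weighted, simple average); the labels below index into
        # it on that basis.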
if not weighted.empty:
tmp = weighted.copy()
tmp["weighting"] = "emp_weighted"
tmp["weighting_label"] = config.WEIGHTING_OPTIONS[0][0]
dfs.append(tmp)
if not simple.empty:
tmp = simple.copy()
tmp["weighting"] = "simple_avg"
tmp["weighting_label"] = config.WEIGHTING_OPTIONS[1][0]
dfs.append(tmp)
if dfs:
full = pd.concat(dfs, ignore_index=True)
if "year" in full.columns:
full["year"] = full["year"].astype(int)
if "level" in full.columns:
full["level"] = full["level"].astype(int)
frames[taxonomy] = full
if not frames:
raise RuntimeError("No aggregated DAIOE datasets could be built in-memory.")
return frames
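
if __name__ == "__main__":
    # Minimal smoke test (a sketch): run as a module, e.g.
    # ``python -m src.data_manager`` (adjust to your layout), so the
    # relative imports resolve.
    logging.basicConfig(level=logging.INFO)
    for taxonomy, frame in load_data().items():
        print(f"{taxonomy}: {len(frame)} rows, {frame.shape[1]} columns")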