"""Module 1: Load, clean, and augment the High Court dataset.

Responsibilities:
- Read CSVs with robust null handling.
- Normalise key text columns (case type, stages, judge names).
- Basic integrity checks (nulls, duplicates, lifecycle).
- Compute core per-case hearing gap stats (mean/median/std).
- Save cleaned data as Parquet for downstream modules.
"""
| from datetime import timedelta | |
| from pathlib import Path | |
| import polars as pl | |
| from eda.config import ( | |
| CASE_FILE_PARQUET, | |
| HEARING_FILE_PARQUET, | |
| RUN_TS, | |
| VERSION, | |
| _get_cases_parquet, | |
| _get_hearings_parquet, | |
| write_metadata, | |
| ) | |
| # ------------------------------------------------------------------- | |
| # Helpers | |
| # ------------------------------------------------------------------- | |
def _norm_text_col(df: pl.DataFrame, col: str) -> pl.DataFrame:
    """Trim/uppercase a text column and convert null-like tokens to nulls.

    After trimming and uppercasing, values in {"", "NA", "N/A", "NULL",
    "NONE", "-", "--"} become None; every other value is kept in its
    trimmed, uppercased form.  A column missing from *df* is a no-op.

    Args:
        df: Frame to normalise.
        col: Name of the text column to normalise.

    Returns:
        A new DataFrame with *col* normalised, or *df* unchanged if absent.
    """
    if col not in df.columns:
        return df
    # Build the normalised expression once instead of duplicating the
    # cast/strip/uppercase chain in both the when() and otherwise() branches.
    normalised = pl.col(col).cast(pl.Utf8).str.strip_chars().str.to_uppercase()
    null_tokens = ["", "NA", "N/A", "NULL", "NONE", "-", "--"]
    return df.with_columns(
        pl.when(normalised.is_in(null_tokens))
        .then(pl.lit(None))
        .otherwise(normalised)
        .alias(col)
    )
def _null_summary(df: pl.DataFrame, name: str) -> None:
    """Print a one-row null-count audit for every column of *df*.

    Args:
        df: Frame to audit.
        name: Label used in the printed header and summary row.
    """
    print(f"\n=== Null summary ({name}) ===")
    row: dict[str, object] = {"TABLE": name, "ROWS": df.height}
    if df.columns:
        # df.null_count() computes all per-column null counts in a single
        # pass, instead of issuing one select() per column.
        for col, n_nulls in zip(df.columns, df.null_count().row(0)):
            row[f"{col}__nulls"] = int(n_nulls)
    print(row)
def load_raw() -> tuple[pl.DataFrame, pl.DataFrame]:
    """Load the raw cases and hearings tables from their Parquet files.

    Returns:
        The (cases, hearings) DataFrames.

    Raises:
        FileNotFoundError: If either Parquet file is missing.
    """
    cases_path = Path(CASE_FILE_PARQUET)
    hearings_path = Path(HEARING_FILE_PARQUET)
    # Refuse to run unless both inputs are present.
    if not cases_path.exists() or not hearings_path.exists():
        raise FileNotFoundError(
            "Parquet files not found. Will not proceed with loading cleaned data."
        )
    print(f"Loading Parquet files:\n- {cases_path}\n- {hearings_path}")
    # low_memory trades some speed for a smaller peak footprint on load.
    cases = pl.read_parquet(cases_path, low_memory=True)
    hearings = pl.read_parquet(hearings_path, low_memory=True)
    print(f"Cases shape: {cases.shape}")
    print(f"Hearings shape: {hearings.shape}")
    return cases, hearings
def _parse_case_dates(cases: pl.DataFrame) -> pl.DataFrame:
    """Parse string-typed date columns in *cases* to pl.Date ("%d-%m-%Y")."""
    for col in ["DATE_FILED", "DECISION_DATE", "REGISTRATION_DATE", "LAST_SYNC_TIME"]:
        if col in cases.columns and cases[col].dtype == pl.Utf8:
            # strict=False turns unparseable values into nulls instead of raising.
            cases = cases.with_columns(
                pl.col(col).str.strptime(pl.Date, "%d-%m-%Y", strict=False)
            )
    return cases


def _canonicalise_stages(hearings: pl.DataFrame) -> pl.DataFrame:
    """Collapse known spelling variants of `Remappedstages` to canonical labels."""
    if "Remappedstages" not in hearings.columns:
        return hearings
    stage_map = {
        "ORDERS/JUDGMENTS": "ORDERS / JUDGMENT",
        "ORDER/JUDGMENT": "ORDERS / JUDGMENT",
        "ORDERS / JUDGMENT": "ORDERS / JUDGMENT",
        "ORDERS /JUDGMENT": "ORDERS / JUDGMENT",
        "INTERLOCUTARY APPLICATION": "INTERLOCUTORY APPLICATION",
        "FRAMING OF CHARGE": "FRAMING OF CHARGES",
        "PRE ADMISSION": "PRE-ADMISSION",
    }
    # Expr.replace maps the listed variants and passes every other value
    # (including nulls) through unchanged -- same semantics as the previous
    # per-element Python lambda, but vectorised and with a known dtype.
    return hearings.with_columns(
        pl.col("Remappedstages").replace(stage_map).alias("Remappedstages")
    )


def _attach_hearing_counts(
    cases: pl.DataFrame, hearings: pl.DataFrame
) -> pl.DataFrame:
    """Left-join a per-case count of non-null hearing dates as N_HEARINGS."""
    if {"CNR_NUMBER", "BusinessOnDate"}.issubset(hearings.columns):
        hearing_freq = hearings.group_by("CNR_NUMBER").agg(
            # pl.col(...).count() counts non-null values; the positional
            # pl.count(name) form is deprecated in recent polars.
            pl.col("BusinessOnDate").count().alias("N_HEARINGS")
        )
        return cases.join(hearing_freq, on="CNR_NUMBER", how="left")
    return cases.with_columns(pl.lit(0).alias("N_HEARINGS"))


def _attach_gap_stats(cases: pl.DataFrame, hearings: pl.DataFrame) -> pl.DataFrame:
    """Attach per-case hearing-gap stats (mean/median/p25/p75/std/count)."""
    gap_cols = ["GAP_MEAN", "GAP_MEDIAN", "GAP_P25", "GAP_P75", "GAP_STD", "N_GAPS"]
    if not {"CNR_NUMBER", "BusinessOnDate"}.issubset(hearings.columns):
        # Keep the schema stable even when gaps cannot be computed.
        return cases.with_columns([pl.lit(None).alias(c) for c in gap_cols])
    hearing_gaps = (
        hearings.filter(pl.col("BusinessOnDate").is_not_null())
        .sort(["CNR_NUMBER", "BusinessOnDate"])
        .with_columns(
            # Gap in days between consecutive hearings of the same case;
            # the first hearing of each case gets a null gap.
            (
                (pl.col("BusinessOnDate") - pl.col("BusinessOnDate").shift(1))
                / timedelta(days=1)
            )
            .over("CNR_NUMBER")
            .alias("HEARING_GAP_DAYS")
        )
    )
    gap_stats = hearing_gaps.group_by("CNR_NUMBER").agg(
        [
            pl.col("HEARING_GAP_DAYS").mean().alias("GAP_MEAN"),
            pl.col("HEARING_GAP_DAYS").median().alias("GAP_MEDIAN"),
            pl.col("HEARING_GAP_DAYS").quantile(0.25).alias("GAP_P25"),
            pl.col("HEARING_GAP_DAYS").quantile(0.75).alias("GAP_P75"),
            pl.col("HEARING_GAP_DAYS").std(ddof=1).alias("GAP_STD"),
            pl.col("HEARING_GAP_DAYS").count().alias("N_GAPS"),
        ]
    )
    return cases.join(gap_stats, on="CNR_NUMBER", how="left")


def _lifecycle_check(cases: pl.DataFrame, hearings: pl.DataFrame) -> None:
    """Print counts of hearings dated before filing or after the decision."""
    if not (
        {"DATE_FILED", "DECISION_DATE"}.issubset(cases.columns)
        and "BusinessOnDate" in hearings.columns
    ):
        return
    h2 = hearings.join(
        cases.select(["CNR_NUMBER", "DATE_FILED", "DECISION_DATE"]),
        on="CNR_NUMBER",
        how="left",
    )
    before_filed = h2.filter(
        pl.col("BusinessOnDate").is_not_null()
        & pl.col("DATE_FILED").is_not_null()
        & (pl.col("BusinessOnDate") < pl.col("DATE_FILED"))
    )
    after_decision = h2.filter(
        pl.col("BusinessOnDate").is_not_null()
        & pl.col("DECISION_DATE").is_not_null()
        & (pl.col("BusinessOnDate") > pl.col("DECISION_DATE"))
    )
    print(
        "Hearings before filing:",
        before_filed.height,
        "| after decision:",
        after_decision.height,
    )


def clean_and_augment(
    cases: pl.DataFrame, hearings: pl.DataFrame
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Clean both tables and augment *cases* with per-case hearing statistics.

    Steps: parse date columns, deduplicate on natural keys, normalise key
    text fields, canonicalise stage labels, derive year columns, attach
    hearing counts and hearing-gap stats, then print dtype/null/lifecycle
    audits.

    Args:
        cases: Raw cases frame.
        hearings: Raw hearings frame.

    Returns:
        The (cases, hearings) frames, cleaned and augmented.
    """
    cases = _parse_case_dates(cases)
    # Deduplicate on natural keys when present.
    if "CNR_NUMBER" in cases.columns:
        cases = cases.unique(subset=["CNR_NUMBER"])
    if "Hearing_ID" in hearings.columns:
        hearings = hearings.unique(subset=["Hearing_ID"])
    # Normalise key text fields.
    cases = _norm_text_col(cases, "CASE_TYPE")
    for c in ["Remappedstages", "PurposeofHearing", "BeforeHonourableJudge"]:
        hearings = _norm_text_col(hearings, c)
    hearings = _canonicalise_stages(hearings)
    # Normalise disposal time.
    if "DISPOSALTIME_ADJ" in cases.columns:
        cases = cases.with_columns(pl.col("DISPOSALTIME_ADJ").cast(pl.Int32))
    # Year fields.  Fix: guard each source column independently -- the old
    # code read DECISION_DATE whenever DATE_FILED existed and raised
    # ColumnNotFoundError when DECISION_DATE was absent.
    year_exprs = []
    if "DATE_FILED" in cases.columns:
        year_exprs.append(pl.col("DATE_FILED").dt.year().alias("YEAR_FILED"))
    if "DECISION_DATE" in cases.columns:
        year_exprs.append(pl.col("DECISION_DATE").dt.year().alias("YEAR_DECISION"))
    if year_exprs:
        cases = cases.with_columns(year_exprs)
    # Hearing counts and per-case gap statistics.
    cases = _attach_hearing_counts(cases, hearings)
    cases = _attach_gap_stats(cases, hearings)
    # Fill some basics so downstream modules see concrete dtypes.
    cases = cases.with_columns(
        [
            pl.col("N_HEARINGS").fill_null(0).cast(pl.Int64),
            pl.col("GAP_MEDIAN").fill_null(0.0).cast(pl.Float64),
        ]
    )
    # Print audits.
    print("\n=== dtypes (cases) ===")
    print(cases.dtypes)
    print("\n=== dtypes (hearings) ===")
    print(hearings.dtypes)
    _null_summary(cases, "cases")
    _null_summary(hearings, "hearings")
    _lifecycle_check(cases, hearings)
    return cases, hearings
def save_clean(cases: pl.DataFrame, hearings: pl.DataFrame) -> None:
    """Write the cleaned frames to Parquet and record run metadata.

    Args:
        cases: Cleaned cases frame.
        hearings: Cleaned hearings frame.
    """
    # Resolve each output path once instead of calling the config helpers
    # twice (once for the write, once for the log line).
    cases_path = str(_get_cases_parquet())
    hearings_path = str(_get_hearings_parquet())
    cases.write_parquet(cases_path)
    hearings.write_parquet(hearings_path)
    print(f"Saved cleaned cases -> {cases_path}")
    print(f"Saved cleaned hearings -> {hearings_path}")
    meta = {
        "version": VERSION,
        "timestamp": RUN_TS,
        "cases_shape": list(cases.shape),
        "hearings_shape": list(hearings.shape),
        "cases_columns": cases.columns,
        "hearings_columns": hearings.columns,
    }
    write_metadata(meta)
def run_load_and_clean() -> None:
    """End-to-end pipeline: load raw tables, clean and augment, persist."""
    raw_cases, raw_hearings = load_raw()
    clean_cases, clean_hearings = clean_and_augment(raw_cases, raw_hearings)
    # Release the raw frames before writing to keep peak memory down.
    del raw_cases, raw_hearings
    save_clean(clean_cases, clean_hearings)
# Allow running this module directly as a script.
if __name__ == "__main__":
    run_load_and_clean()