Spaces:
Sleeping
Sleeping
| """Module 2: Visual and descriptive EDA. | |
| Responsibilities: | |
| - Case type distribution, filing trends, disposal distribution. | |
| - Hearing gap distributions by type. | |
| - Stage transition Sankey & stage bottlenecks. | |
| - Cohorts by filing year. | |
| - Seasonality and monthly anomalies. | |
| - Judge and courtroom workload. | |
| - Purpose tags and stage frequency. | |
| Inputs: | |
| - Cleaned Parquet from eda_load_clean. | |
| Outputs: | |
| - Interactive HTML plots in FIGURES_DIR and versioned copies in RUN_DIR. | |
| - Some CSV summaries (e.g., stage_duration.csv, transitions.csv, monthly_anomalies.csv). | |
| """ | |
| from datetime import timedelta | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import plotly.io as pio | |
| import polars as pl | |
| from src.eda_config import ( | |
| CASES_CLEAN_PARQUET, | |
| FIGURES_DIR, | |
| HEARINGS_CLEAN_PARQUET, | |
| RUN_DIR, | |
| copy_to_versioned, | |
| ) | |
| pio.renderers.default = "browser" | |
| def load_cleaned(): | |
| cases = pl.read_parquet(CASES_CLEAN_PARQUET) | |
| hearings = pl.read_parquet(HEARINGS_CLEAN_PARQUET) | |
| print("Loaded cleaned data for exploration") | |
| print("Cases:", cases.shape, "Hearings:", hearings.shape) | |
| return cases, hearings | |
| def run_exploration() -> None: | |
| cases, hearings = load_cleaned() | |
| cases_pd = cases.to_pandas() | |
| hearings_pd = hearings.to_pandas() | |
| # -------------------------------------------------- | |
| # 1. Case Type Distribution | |
| # -------------------------------------------------- | |
| fig1 = px.bar( | |
| cases_pd, | |
| x="CASE_TYPE", | |
| color="CASE_TYPE", | |
| title="Case Type Distribution", | |
| ) | |
| fig1.update_layout(showlegend=False, xaxis_title="Case Type", yaxis_title="Number of Cases") | |
| f1 = "1_case_type_distribution.html" | |
| fig1.write_html(FIGURES_DIR / f1) | |
| copy_to_versioned(f1) | |
| # -------------------------------------------------- | |
| # 2. Filing Trends by Year | |
| # -------------------------------------------------- | |
| if "YEAR_FILED" in cases_pd.columns: | |
| year_counts = cases_pd.groupby("YEAR_FILED")["CNR_NUMBER"].count().reset_index(name="Count") | |
| fig2 = px.line( | |
| year_counts, x="YEAR_FILED", y="Count", markers=True, title="Cases Filed by Year" | |
| ) | |
| fig2.update_traces(line_color="royalblue") | |
| fig2.update_layout(xaxis=dict(rangeslider=dict(visible=True))) | |
| f2 = "2_cases_filed_by_year.html" | |
| fig2.write_html(FIGURES_DIR / f2) | |
| copy_to_versioned(f2) | |
| # -------------------------------------------------- | |
| # 3. Disposal Duration Distribution | |
| # -------------------------------------------------- | |
| if "DISPOSALTIME_ADJ" in cases_pd.columns: | |
| fig3 = px.histogram( | |
| cases_pd, | |
| x="DISPOSALTIME_ADJ", | |
| nbins=50, | |
| title="Distribution of Disposal Time (Adjusted Days)", | |
| color_discrete_sequence=["indianred"], | |
| ) | |
| fig3.update_layout(xaxis_title="Days", yaxis_title="Cases") | |
| f3 = "3_disposal_time_distribution.html" | |
| fig3.write_html(FIGURES_DIR / f3) | |
| copy_to_versioned(f3) | |
| # -------------------------------------------------- | |
| # 4. Hearings vs Disposal Time | |
| # -------------------------------------------------- | |
| if {"N_HEARINGS", "DISPOSALTIME_ADJ"}.issubset(cases_pd.columns): | |
| fig4 = px.scatter( | |
| cases_pd, | |
| x="N_HEARINGS", | |
| y="DISPOSALTIME_ADJ", | |
| color="CASE_TYPE", | |
| hover_data=["CNR_NUMBER", "YEAR_FILED"], | |
| title="Hearings vs Disposal Duration", | |
| ) | |
| fig4.update_traces(marker=dict(size=6, opacity=0.7)) | |
| f4 = "4_hearings_vs_disposal.html" | |
| fig4.write_html(FIGURES_DIR / f4) | |
| copy_to_versioned(f4) | |
| # -------------------------------------------------- | |
| # 5. Boxplot by Case Type | |
| # -------------------------------------------------- | |
| fig5 = px.box( | |
| cases_pd, | |
| x="CASE_TYPE", | |
| y="DISPOSALTIME_ADJ", | |
| color="CASE_TYPE", | |
| title="Disposal Time (Adjusted) by Case Type", | |
| ) | |
| fig5.update_layout(showlegend=False) | |
| f5 = "5_box_disposal_by_type.html" | |
| fig5.write_html(FIGURES_DIR / f5) | |
| copy_to_versioned(f5) | |
| # -------------------------------------------------- | |
| # 6. Stage Frequency | |
| # -------------------------------------------------- | |
| if "Remappedstages" in hearings_pd.columns: | |
| stage_counts = hearings_pd["Remappedstages"].value_counts().reset_index() | |
| stage_counts.columns = ["Stage", "Count"] | |
| fig6 = px.bar( | |
| stage_counts, | |
| x="Stage", | |
| y="Count", | |
| color="Stage", | |
| title="Frequency of Hearing Stages", | |
| ) | |
| fig6.update_layout(showlegend=False, xaxis_title="Stage", yaxis_title="Count") | |
| f6 = "6_stage_frequency.html" | |
| fig6.write_html(FIGURES_DIR / f6) | |
| copy_to_versioned(f6) | |
| # -------------------------------------------------- | |
| # 7. Gap median by case type | |
| # -------------------------------------------------- | |
| if "GAP_MEDIAN" in cases_pd.columns: | |
| fig_gap = px.box( | |
| cases_pd, | |
| x="CASE_TYPE", | |
| y="GAP_MEDIAN", | |
| points=False, | |
| title="Median Hearing Gap by Case Type", | |
| ) | |
| fg = "9_gap_median_by_type.html" | |
| fig_gap.write_html(FIGURES_DIR / fg) | |
| copy_to_versioned(fg) | |
| # -------------------------------------------------- | |
| # 8. Stage transitions & bottleneck plot | |
| # -------------------------------------------------- | |
| stage_col = "Remappedstages" if "Remappedstages" in hearings.columns else None | |
| transitions = None | |
| stage_duration = None | |
| if stage_col and "BusinessOnDate" in hearings.columns: | |
| STAGE_ORDER = [ | |
| "PRE-ADMISSION", | |
| "ADMISSION", | |
| "FRAMING OF CHARGES", | |
| "EVIDENCE", | |
| "ARGUMENTS", | |
| "INTERLOCUTORY APPLICATION", | |
| "SETTLEMENT", | |
| "ORDERS / JUDGMENT", | |
| "FINAL DISPOSAL", | |
| "OTHER", | |
| "NA", | |
| ] | |
| order_idx = {s: i for i, s in enumerate(STAGE_ORDER)} | |
| h_stage = ( | |
| hearings.filter(pl.col("BusinessOnDate").is_not_null()) | |
| .sort(["CNR_NUMBER", "BusinessOnDate"]) | |
| .with_columns( | |
| [ | |
| pl.col(stage_col) | |
| .fill_null("NA") | |
| .map_elements( | |
| lambda s: s if s in STAGE_ORDER else ("OTHER" if s is not None else "NA") | |
| ) | |
| .alias("STAGE"), | |
| pl.col("BusinessOnDate").alias("DT"), | |
| ] | |
| ) | |
| .with_columns( | |
| [ | |
| (pl.col("STAGE") != pl.col("STAGE").shift(1)) | |
| .over("CNR_NUMBER") | |
| .alias("STAGE_CHANGE"), | |
| ] | |
| ) | |
| ) | |
| transitions_raw = ( | |
| h_stage.with_columns( | |
| [ | |
| pl.col("STAGE").alias("STAGE_FROM"), | |
| pl.col("STAGE").shift(-1).over("CNR_NUMBER").alias("STAGE_TO"), | |
| ] | |
| ) | |
| .filter(pl.col("STAGE_TO").is_not_null()) | |
| .group_by(["STAGE_FROM", "STAGE_TO"]) | |
| .agg(pl.len().alias("N")) | |
| ) | |
| transitions = transitions_raw.filter( | |
| pl.col("STAGE_FROM").map_elements(lambda s: order_idx.get(s, 10)) | |
| <= pl.col("STAGE_TO").map_elements(lambda s: order_idx.get(s, 10)) | |
| ).sort("N", descending=True) | |
| transitions.write_csv(RUN_DIR / "transitions.csv") | |
| runs = ( | |
| h_stage.with_columns( | |
| [ | |
| pl.when(pl.col("STAGE_CHANGE")) | |
| .then(1) | |
| .otherwise(0) | |
| .cum_sum() | |
| .over("CNR_NUMBER") | |
| .alias("RUN_ID") | |
| ] | |
| ) | |
| .group_by(["CNR_NUMBER", "STAGE", "RUN_ID"]) | |
| .agg( | |
| [ | |
| pl.col("DT").min().alias("RUN_START"), | |
| pl.col("DT").max().alias("RUN_END"), | |
| pl.len().alias("HEARINGS_IN_RUN"), | |
| ] | |
| ) | |
| .with_columns( | |
| ((pl.col("RUN_END") - pl.col("RUN_START")) / timedelta(days=1)).alias("RUN_DAYS") | |
| ) | |
| ) | |
| stage_duration = ( | |
| runs.group_by("STAGE") | |
| .agg( | |
| [ | |
| pl.col("RUN_DAYS").median().alias("RUN_MEDIAN_DAYS"), | |
| pl.col("RUN_DAYS").mean().alias("RUN_MEAN_DAYS"), | |
| pl.col("HEARINGS_IN_RUN").median().alias("HEARINGS_PER_RUN_MED"), | |
| pl.len().alias("N_RUNS"), | |
| ] | |
| ) | |
| .sort("RUN_MEDIAN_DAYS", descending=True) | |
| ) | |
| stage_duration.write_csv(RUN_DIR / "stage_duration.csv") | |
| # Sankey | |
| try: | |
| tr_df = transitions.to_pandas() | |
| labels = [ | |
| s | |
| for s in STAGE_ORDER | |
| if s in set(tr_df["STAGE_FROM"]).union(set(tr_df["STAGE_TO"])) | |
| ] | |
| idx = {l: i for i, l in enumerate(labels)} | |
| tr_df = tr_df[tr_df["STAGE_FROM"].isin(labels) & tr_df["STAGE_TO"].isin(labels)].copy() | |
| tr_df = tr_df.sort_values(by=["STAGE_FROM", "STAGE_TO"], key=lambda c: c.map(idx)) | |
| sankey = go.Figure( | |
| data=[ | |
| go.Sankey( | |
| arrangement="snap", | |
| node=dict(label=labels, pad=15, thickness=18), | |
| link=dict( | |
| source=tr_df["STAGE_FROM"].map(idx).tolist(), | |
| target=tr_df["STAGE_TO"].map(idx).tolist(), | |
| value=tr_df["N"].tolist(), | |
| ), | |
| ) | |
| ] | |
| ) | |
| sankey.update_layout(title_text="Stage Transition Sankey (Ordered)") | |
| f10 = "10_stage_transition_sankey.html" | |
| sankey.write_html(FIGURES_DIR / f10) | |
| copy_to_versioned(f10) | |
| except Exception as e: | |
| print("Sankey error:", e) | |
| # Bottleneck impact | |
| try: | |
| st_pd = stage_duration.with_columns( | |
| (pl.col("RUN_MEDIAN_DAYS") * pl.col("N_RUNS")).alias("IMPACT") | |
| ).to_pandas() | |
| fig_b = px.bar( | |
| st_pd.sort_values("IMPACT", ascending=False), | |
| x="STAGE", | |
| y="IMPACT", | |
| title="Stage Bottleneck Impact (Median Days x Runs)", | |
| ) | |
| fb = "15_bottleneck_impact.html" | |
| fig_b.write_html(FIGURES_DIR / fb) | |
| copy_to_versioned(fb) | |
| except Exception as e: | |
| print("Bottleneck plot error:", e) | |
| # -------------------------------------------------- | |
| # 9. Monthly seasonality and anomalies | |
| # -------------------------------------------------- | |
| if "BusinessOnDate" in hearings.columns: | |
| m_hear = ( | |
| hearings.filter(pl.col("BusinessOnDate").is_not_null()) | |
| .with_columns( | |
| [ | |
| pl.col("BusinessOnDate").dt.year().alias("Y"), | |
| pl.col("BusinessOnDate").dt.month().alias("M"), | |
| ] | |
| ) | |
| .with_columns(pl.date(pl.col("Y"), pl.col("M"), pl.lit(1)).alias("YM")) | |
| ) | |
| monthly_listings = m_hear.group_by("YM").agg(pl.len().alias("N_HEARINGS")).sort("YM") | |
| monthly_listings.write_csv(RUN_DIR / "monthly_hearings.csv") | |
| try: | |
| fig_m = px.line( | |
| monthly_listings.to_pandas(), | |
| x="YM", | |
| y="N_HEARINGS", | |
| title="Monthly Hearings Listed", | |
| ) | |
| fig_m.update_layout(yaxis=dict(tickformat=",d")) | |
| fm = "11_monthly_hearings.html" | |
| fig_m.write_html(FIGURES_DIR / fm) | |
| copy_to_versioned(fm) | |
| except Exception as e: | |
| print("Monthly listings error:", e) | |
| # Waterfall + anomalies | |
| try: | |
| ml = monthly_listings.with_columns( | |
| [ | |
| pl.col("N_HEARINGS").shift(1).alias("PREV"), | |
| (pl.col("N_HEARINGS") - pl.col("N_HEARINGS").shift(1)).alias("DELTA"), | |
| ] | |
| ) | |
| ml_pd = ml.to_pandas() | |
| ml_pd["ROLL_MEAN"] = ml_pd["N_HEARINGS"].rolling(window=12, min_periods=6).mean() | |
| ml_pd["ROLL_STD"] = ml_pd["N_HEARINGS"].rolling(window=12, min_periods=6).std() | |
| ml_pd["Z"] = (ml_pd["N_HEARINGS"] - ml_pd["ROLL_MEAN"]) / ml_pd["ROLL_STD"] | |
| ml_pd["ANOM"] = ml_pd["Z"].abs() >= 3.0 | |
| measures = ["relative"] * len(ml_pd) | |
| measures[0] = "absolute" | |
| y_vals = ml_pd["DELTA"].astype(float).fillna(ml_pd["N_HEARINGS"].astype(float)).tolist() | |
| fig_w = go.Figure( | |
| go.Waterfall( | |
| x=ml_pd["YM"], | |
| measure=measures, | |
| y=y_vals, | |
| text=[f"{int(v):,}" if pd.notnull(v) else "" for v in ml_pd["N_HEARINGS"]], | |
| increasing=dict(marker=dict(color="seagreen")), | |
| decreasing=dict(marker=dict(color="indianred")), | |
| connector={"line": {"color": "rgb(110,110,110)"}}, | |
| ) | |
| ) | |
| fig_w.add_trace( | |
| go.Scatter( | |
| x=ml_pd.loc[ml_pd["ANOM"], "YM"], | |
| y=ml_pd.loc[ml_pd["ANOM"], "N_HEARINGS"], | |
| mode="markers", | |
| marker=dict(color="crimson", size=8), | |
| name="Anomaly (|z|>=3)", | |
| ) | |
| ) | |
| fig_w.update_layout( | |
| title="Monthly Hearings Waterfall (MoM change) with Anomalies", | |
| yaxis=dict(tickformat=",d"), | |
| ) | |
| fw = "11b_monthly_waterfall.html" | |
| fig_w.write_html(FIGURES_DIR / fw) | |
| copy_to_versioned(fw) | |
| ml_pd_out = ml_pd.copy() | |
| ml_pd_out["YM"] = ml_pd_out["YM"].astype(str) | |
| ml_pd_out.to_csv(RUN_DIR / "monthly_anomalies.csv", index=False) | |
| except Exception as e: | |
| print("Monthly waterfall error:", e) | |
| # -------------------------------------------------- | |
| # 10. Judge and court workload | |
| # -------------------------------------------------- | |
| judge_col = None | |
| for c in [ | |
| "BeforeHonourableJudge", | |
| "Before Hon'ble Judges", | |
| "Before_Honble_Judges", | |
| "NJDG_JUDGE_NAME", | |
| ]: | |
| if c in hearings.columns: | |
| judge_col = c | |
| break | |
| if judge_col and "BusinessOnDate" in hearings.columns: | |
| jday = ( | |
| hearings.filter(pl.col("BusinessOnDate").is_not_null()) | |
| .group_by([judge_col, "BusinessOnDate"]) | |
| .agg(pl.len().alias("N_HEARINGS")) | |
| ) | |
| try: | |
| fig_j = px.box( | |
| jday.to_pandas(), | |
| x=judge_col, | |
| y="N_HEARINGS", | |
| title="Per-day Hearings per Judge", | |
| ) | |
| fig_j.update_layout( | |
| xaxis={"categoryorder": "total descending"}, yaxis=dict(tickformat=",d") | |
| ) | |
| fj = "12_judge_day_load.html" | |
| fig_j.write_html(FIGURES_DIR / fj) | |
| copy_to_versioned(fj) | |
| except Exception as e: | |
| print("Judge workload error:", e) | |
| court_col = None | |
| for cc in ["COURT_NUMBER", "CourtName"]: | |
| if cc in hearings.columns: | |
| court_col = cc | |
| break | |
| if court_col and "BusinessOnDate" in hearings.columns: | |
| cday = ( | |
| hearings.filter(pl.col("BusinessOnDate").is_not_null()) | |
| .group_by([court_col, "BusinessOnDate"]) | |
| .agg(pl.len().alias("N_HEARINGS")) | |
| ) | |
| try: | |
| fig_court = px.box( | |
| cday.to_pandas(), | |
| x=court_col, | |
| y="N_HEARINGS", | |
| title="Per-day Hearings per Courtroom", | |
| ) | |
| fig_court.update_layout( | |
| xaxis={"categoryorder": "total descending"}, yaxis=dict(tickformat=",d") | |
| ) | |
| fc = "12b_court_day_load.html" | |
| fig_court.write_html(FIGURES_DIR / fc) | |
| copy_to_versioned(fc) | |
| except Exception as e: | |
| print("Court workload error:", e) | |
| # -------------------------------------------------- | |
| # 11. Purpose tagging distributions | |
| # -------------------------------------------------- | |
| text_col = None | |
| for c in ["PurposeofHearing", "Purpose of Hearing", "PURPOSE_OF_HEARING"]: | |
| if c in hearings.columns: | |
| text_col = c | |
| break | |
| def _has_kw_expr(col: str, kws: list[str]): | |
| expr = None | |
| for k in kws: | |
| e = pl.col(col).str.contains(k) | |
| expr = e if expr is None else (expr | e) | |
| return (expr if expr is not None else pl.lit(False)).fill_null(False) | |
| if text_col: | |
| hear_txt = hearings.with_columns( | |
| pl.col(text_col).cast(pl.Utf8).str.strip_chars().str.to_uppercase().alias("PURPOSE_TXT") | |
| ) | |
| async_kw = ["NON-COMPLIANCE", "OFFICE OBJECTION", "COMPLIANCE", "NOTICE", "SERVICE"] | |
| subs_kw = ["EVIDENCE", "ARGUMENT", "FINAL HEARING", "JUDGMENT", "ORDER", "DISPOSAL"] | |
| hear_txt = hear_txt.with_columns( | |
| pl.when(_has_kw_expr("PURPOSE_TXT", async_kw)) | |
| .then(pl.lit("ASYNC_OR_ADMIN")) | |
| .when(_has_kw_expr("PURPOSE_TXT", subs_kw)) | |
| .then(pl.lit("SUBSTANTIVE")) | |
| .otherwise(pl.lit("UNKNOWN")) | |
| .alias("PURPOSE_TAG") | |
| ) | |
| tag_share = ( | |
| hear_txt.group_by(["CASE_TYPE", "PURPOSE_TAG"]) | |
| .agg(pl.len().alias("N")) | |
| .with_columns((pl.col("N") / pl.col("N").sum().over("CASE_TYPE")).alias("SHARE")) | |
| .sort(["CASE_TYPE", "SHARE"], descending=[False, True]) | |
| ) | |
| tag_share.write_csv(RUN_DIR / "purpose_tag_shares.csv") | |
| try: | |
| fig_t = px.bar( | |
| tag_share.to_pandas(), | |
| x="CASE_TYPE", | |
| y="SHARE", | |
| color="PURPOSE_TAG", | |
| title="Purpose Tag Shares by Case Type", | |
| barmode="stack", | |
| ) | |
| ft = "14_purpose_tag_shares.html" | |
| fig_t.write_html(FIGURES_DIR / ft) | |
| copy_to_versioned(ft) | |
| except Exception as e: | |
| print("Purpose shares error:", e) | |
| if __name__ == "__main__": | |
| run_exploration() | |