hackathon_code4change / src /eda_exploration.py
RoyAalekh's picture
WIP
4ffade4
raw
history blame
18.6 kB
"""Module 2: Visual and descriptive EDA.
Responsibilities:
- Case type distribution, filing trends, disposal distribution.
- Hearing gap distributions by type.
- Stage transition Sankey & stage bottlenecks.
- Cohorts by filing year.
- Seasonality and monthly anomalies.
- Judge and courtroom workload.
- Purpose tags and stage frequency.
Inputs:
- Cleaned Parquet from eda_load_clean.
Outputs:
- Interactive HTML plots in FIGURES_DIR and versioned copies in RUN_DIR.
- Some CSV summaries (e.g., stage_duration.csv, transitions.csv, monthly_anomalies.csv).
"""
from datetime import timedelta
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
import polars as pl
from src.eda_config import (
CASES_CLEAN_PARQUET,
FIGURES_DIR,
HEARINGS_CLEAN_PARQUET,
RUN_DIR,
copy_to_versioned,
)
pio.renderers.default = "browser"
def load_cleaned():
cases = pl.read_parquet(CASES_CLEAN_PARQUET)
hearings = pl.read_parquet(HEARINGS_CLEAN_PARQUET)
print("Loaded cleaned data for exploration")
print("Cases:", cases.shape, "Hearings:", hearings.shape)
return cases, hearings
def run_exploration() -> None:
cases, hearings = load_cleaned()
cases_pd = cases.to_pandas()
hearings_pd = hearings.to_pandas()
# --------------------------------------------------
# 1. Case Type Distribution
# --------------------------------------------------
fig1 = px.bar(
cases_pd,
x="CASE_TYPE",
color="CASE_TYPE",
title="Case Type Distribution",
)
fig1.update_layout(showlegend=False, xaxis_title="Case Type", yaxis_title="Number of Cases")
f1 = "1_case_type_distribution.html"
fig1.write_html(FIGURES_DIR / f1)
copy_to_versioned(f1)
# --------------------------------------------------
# 2. Filing Trends by Year
# --------------------------------------------------
if "YEAR_FILED" in cases_pd.columns:
year_counts = cases_pd.groupby("YEAR_FILED")["CNR_NUMBER"].count().reset_index(name="Count")
fig2 = px.line(
year_counts, x="YEAR_FILED", y="Count", markers=True, title="Cases Filed by Year"
)
fig2.update_traces(line_color="royalblue")
fig2.update_layout(xaxis=dict(rangeslider=dict(visible=True)))
f2 = "2_cases_filed_by_year.html"
fig2.write_html(FIGURES_DIR / f2)
copy_to_versioned(f2)
# --------------------------------------------------
# 3. Disposal Duration Distribution
# --------------------------------------------------
if "DISPOSALTIME_ADJ" in cases_pd.columns:
fig3 = px.histogram(
cases_pd,
x="DISPOSALTIME_ADJ",
nbins=50,
title="Distribution of Disposal Time (Adjusted Days)",
color_discrete_sequence=["indianred"],
)
fig3.update_layout(xaxis_title="Days", yaxis_title="Cases")
f3 = "3_disposal_time_distribution.html"
fig3.write_html(FIGURES_DIR / f3)
copy_to_versioned(f3)
# --------------------------------------------------
# 4. Hearings vs Disposal Time
# --------------------------------------------------
if {"N_HEARINGS", "DISPOSALTIME_ADJ"}.issubset(cases_pd.columns):
fig4 = px.scatter(
cases_pd,
x="N_HEARINGS",
y="DISPOSALTIME_ADJ",
color="CASE_TYPE",
hover_data=["CNR_NUMBER", "YEAR_FILED"],
title="Hearings vs Disposal Duration",
)
fig4.update_traces(marker=dict(size=6, opacity=0.7))
f4 = "4_hearings_vs_disposal.html"
fig4.write_html(FIGURES_DIR / f4)
copy_to_versioned(f4)
# --------------------------------------------------
# 5. Boxplot by Case Type
# --------------------------------------------------
fig5 = px.box(
cases_pd,
x="CASE_TYPE",
y="DISPOSALTIME_ADJ",
color="CASE_TYPE",
title="Disposal Time (Adjusted) by Case Type",
)
fig5.update_layout(showlegend=False)
f5 = "5_box_disposal_by_type.html"
fig5.write_html(FIGURES_DIR / f5)
copy_to_versioned(f5)
# --------------------------------------------------
# 6. Stage Frequency
# --------------------------------------------------
if "Remappedstages" in hearings_pd.columns:
stage_counts = hearings_pd["Remappedstages"].value_counts().reset_index()
stage_counts.columns = ["Stage", "Count"]
fig6 = px.bar(
stage_counts,
x="Stage",
y="Count",
color="Stage",
title="Frequency of Hearing Stages",
)
fig6.update_layout(showlegend=False, xaxis_title="Stage", yaxis_title="Count")
f6 = "6_stage_frequency.html"
fig6.write_html(FIGURES_DIR / f6)
copy_to_versioned(f6)
# --------------------------------------------------
# 7. Gap median by case type
# --------------------------------------------------
if "GAP_MEDIAN" in cases_pd.columns:
fig_gap = px.box(
cases_pd,
x="CASE_TYPE",
y="GAP_MEDIAN",
points=False,
title="Median Hearing Gap by Case Type",
)
fg = "9_gap_median_by_type.html"
fig_gap.write_html(FIGURES_DIR / fg)
copy_to_versioned(fg)
# --------------------------------------------------
# 8. Stage transitions & bottleneck plot
# --------------------------------------------------
stage_col = "Remappedstages" if "Remappedstages" in hearings.columns else None
transitions = None
stage_duration = None
if stage_col and "BusinessOnDate" in hearings.columns:
STAGE_ORDER = [
"PRE-ADMISSION",
"ADMISSION",
"FRAMING OF CHARGES",
"EVIDENCE",
"ARGUMENTS",
"INTERLOCUTORY APPLICATION",
"SETTLEMENT",
"ORDERS / JUDGMENT",
"FINAL DISPOSAL",
"OTHER",
"NA",
]
order_idx = {s: i for i, s in enumerate(STAGE_ORDER)}
h_stage = (
hearings.filter(pl.col("BusinessOnDate").is_not_null())
.sort(["CNR_NUMBER", "BusinessOnDate"])
.with_columns(
[
pl.col(stage_col)
.fill_null("NA")
.map_elements(
lambda s: s if s in STAGE_ORDER else ("OTHER" if s is not None else "NA")
)
.alias("STAGE"),
pl.col("BusinessOnDate").alias("DT"),
]
)
.with_columns(
[
(pl.col("STAGE") != pl.col("STAGE").shift(1))
.over("CNR_NUMBER")
.alias("STAGE_CHANGE"),
]
)
)
transitions_raw = (
h_stage.with_columns(
[
pl.col("STAGE").alias("STAGE_FROM"),
pl.col("STAGE").shift(-1).over("CNR_NUMBER").alias("STAGE_TO"),
]
)
.filter(pl.col("STAGE_TO").is_not_null())
.group_by(["STAGE_FROM", "STAGE_TO"])
.agg(pl.len().alias("N"))
)
transitions = transitions_raw.filter(
pl.col("STAGE_FROM").map_elements(lambda s: order_idx.get(s, 10))
<= pl.col("STAGE_TO").map_elements(lambda s: order_idx.get(s, 10))
).sort("N", descending=True)
transitions.write_csv(RUN_DIR / "transitions.csv")
runs = (
h_stage.with_columns(
[
pl.when(pl.col("STAGE_CHANGE"))
.then(1)
.otherwise(0)
.cum_sum()
.over("CNR_NUMBER")
.alias("RUN_ID")
]
)
.group_by(["CNR_NUMBER", "STAGE", "RUN_ID"])
.agg(
[
pl.col("DT").min().alias("RUN_START"),
pl.col("DT").max().alias("RUN_END"),
pl.len().alias("HEARINGS_IN_RUN"),
]
)
.with_columns(
((pl.col("RUN_END") - pl.col("RUN_START")) / timedelta(days=1)).alias("RUN_DAYS")
)
)
stage_duration = (
runs.group_by("STAGE")
.agg(
[
pl.col("RUN_DAYS").median().alias("RUN_MEDIAN_DAYS"),
pl.col("RUN_DAYS").mean().alias("RUN_MEAN_DAYS"),
pl.col("HEARINGS_IN_RUN").median().alias("HEARINGS_PER_RUN_MED"),
pl.len().alias("N_RUNS"),
]
)
.sort("RUN_MEDIAN_DAYS", descending=True)
)
stage_duration.write_csv(RUN_DIR / "stage_duration.csv")
# Sankey
try:
tr_df = transitions.to_pandas()
labels = [
s
for s in STAGE_ORDER
if s in set(tr_df["STAGE_FROM"]).union(set(tr_df["STAGE_TO"]))
]
idx = {l: i for i, l in enumerate(labels)}
tr_df = tr_df[tr_df["STAGE_FROM"].isin(labels) & tr_df["STAGE_TO"].isin(labels)].copy()
tr_df = tr_df.sort_values(by=["STAGE_FROM", "STAGE_TO"], key=lambda c: c.map(idx))
sankey = go.Figure(
data=[
go.Sankey(
arrangement="snap",
node=dict(label=labels, pad=15, thickness=18),
link=dict(
source=tr_df["STAGE_FROM"].map(idx).tolist(),
target=tr_df["STAGE_TO"].map(idx).tolist(),
value=tr_df["N"].tolist(),
),
)
]
)
sankey.update_layout(title_text="Stage Transition Sankey (Ordered)")
f10 = "10_stage_transition_sankey.html"
sankey.write_html(FIGURES_DIR / f10)
copy_to_versioned(f10)
except Exception as e:
print("Sankey error:", e)
# Bottleneck impact
try:
st_pd = stage_duration.with_columns(
(pl.col("RUN_MEDIAN_DAYS") * pl.col("N_RUNS")).alias("IMPACT")
).to_pandas()
fig_b = px.bar(
st_pd.sort_values("IMPACT", ascending=False),
x="STAGE",
y="IMPACT",
title="Stage Bottleneck Impact (Median Days x Runs)",
)
fb = "15_bottleneck_impact.html"
fig_b.write_html(FIGURES_DIR / fb)
copy_to_versioned(fb)
except Exception as e:
print("Bottleneck plot error:", e)
# --------------------------------------------------
# 9. Monthly seasonality and anomalies
# --------------------------------------------------
if "BusinessOnDate" in hearings.columns:
m_hear = (
hearings.filter(pl.col("BusinessOnDate").is_not_null())
.with_columns(
[
pl.col("BusinessOnDate").dt.year().alias("Y"),
pl.col("BusinessOnDate").dt.month().alias("M"),
]
)
.with_columns(pl.date(pl.col("Y"), pl.col("M"), pl.lit(1)).alias("YM"))
)
monthly_listings = m_hear.group_by("YM").agg(pl.len().alias("N_HEARINGS")).sort("YM")
monthly_listings.write_csv(RUN_DIR / "monthly_hearings.csv")
try:
fig_m = px.line(
monthly_listings.to_pandas(),
x="YM",
y="N_HEARINGS",
title="Monthly Hearings Listed",
)
fig_m.update_layout(yaxis=dict(tickformat=",d"))
fm = "11_monthly_hearings.html"
fig_m.write_html(FIGURES_DIR / fm)
copy_to_versioned(fm)
except Exception as e:
print("Monthly listings error:", e)
# Waterfall + anomalies
try:
ml = monthly_listings.with_columns(
[
pl.col("N_HEARINGS").shift(1).alias("PREV"),
(pl.col("N_HEARINGS") - pl.col("N_HEARINGS").shift(1)).alias("DELTA"),
]
)
ml_pd = ml.to_pandas()
ml_pd["ROLL_MEAN"] = ml_pd["N_HEARINGS"].rolling(window=12, min_periods=6).mean()
ml_pd["ROLL_STD"] = ml_pd["N_HEARINGS"].rolling(window=12, min_periods=6).std()
ml_pd["Z"] = (ml_pd["N_HEARINGS"] - ml_pd["ROLL_MEAN"]) / ml_pd["ROLL_STD"]
ml_pd["ANOM"] = ml_pd["Z"].abs() >= 3.0
measures = ["relative"] * len(ml_pd)
measures[0] = "absolute"
y_vals = ml_pd["DELTA"].astype(float).fillna(ml_pd["N_HEARINGS"].astype(float)).tolist()
fig_w = go.Figure(
go.Waterfall(
x=ml_pd["YM"],
measure=measures,
y=y_vals,
text=[f"{int(v):,}" if pd.notnull(v) else "" for v in ml_pd["N_HEARINGS"]],
increasing=dict(marker=dict(color="seagreen")),
decreasing=dict(marker=dict(color="indianred")),
connector={"line": {"color": "rgb(110,110,110)"}},
)
)
fig_w.add_trace(
go.Scatter(
x=ml_pd.loc[ml_pd["ANOM"], "YM"],
y=ml_pd.loc[ml_pd["ANOM"], "N_HEARINGS"],
mode="markers",
marker=dict(color="crimson", size=8),
name="Anomaly (|z|>=3)",
)
)
fig_w.update_layout(
title="Monthly Hearings Waterfall (MoM change) with Anomalies",
yaxis=dict(tickformat=",d"),
)
fw = "11b_monthly_waterfall.html"
fig_w.write_html(FIGURES_DIR / fw)
copy_to_versioned(fw)
ml_pd_out = ml_pd.copy()
ml_pd_out["YM"] = ml_pd_out["YM"].astype(str)
ml_pd_out.to_csv(RUN_DIR / "monthly_anomalies.csv", index=False)
except Exception as e:
print("Monthly waterfall error:", e)
# --------------------------------------------------
# 10. Judge and court workload
# --------------------------------------------------
judge_col = None
for c in [
"BeforeHonourableJudge",
"Before Hon'ble Judges",
"Before_Honble_Judges",
"NJDG_JUDGE_NAME",
]:
if c in hearings.columns:
judge_col = c
break
if judge_col and "BusinessOnDate" in hearings.columns:
jday = (
hearings.filter(pl.col("BusinessOnDate").is_not_null())
.group_by([judge_col, "BusinessOnDate"])
.agg(pl.len().alias("N_HEARINGS"))
)
try:
fig_j = px.box(
jday.to_pandas(),
x=judge_col,
y="N_HEARINGS",
title="Per-day Hearings per Judge",
)
fig_j.update_layout(
xaxis={"categoryorder": "total descending"}, yaxis=dict(tickformat=",d")
)
fj = "12_judge_day_load.html"
fig_j.write_html(FIGURES_DIR / fj)
copy_to_versioned(fj)
except Exception as e:
print("Judge workload error:", e)
court_col = None
for cc in ["COURT_NUMBER", "CourtName"]:
if cc in hearings.columns:
court_col = cc
break
if court_col and "BusinessOnDate" in hearings.columns:
cday = (
hearings.filter(pl.col("BusinessOnDate").is_not_null())
.group_by([court_col, "BusinessOnDate"])
.agg(pl.len().alias("N_HEARINGS"))
)
try:
fig_court = px.box(
cday.to_pandas(),
x=court_col,
y="N_HEARINGS",
title="Per-day Hearings per Courtroom",
)
fig_court.update_layout(
xaxis={"categoryorder": "total descending"}, yaxis=dict(tickformat=",d")
)
fc = "12b_court_day_load.html"
fig_court.write_html(FIGURES_DIR / fc)
copy_to_versioned(fc)
except Exception as e:
print("Court workload error:", e)
# --------------------------------------------------
# 11. Purpose tagging distributions
# --------------------------------------------------
text_col = None
for c in ["PurposeofHearing", "Purpose of Hearing", "PURPOSE_OF_HEARING"]:
if c in hearings.columns:
text_col = c
break
def _has_kw_expr(col: str, kws: list[str]):
expr = None
for k in kws:
e = pl.col(col).str.contains(k)
expr = e if expr is None else (expr | e)
return (expr if expr is not None else pl.lit(False)).fill_null(False)
if text_col:
hear_txt = hearings.with_columns(
pl.col(text_col).cast(pl.Utf8).str.strip_chars().str.to_uppercase().alias("PURPOSE_TXT")
)
async_kw = ["NON-COMPLIANCE", "OFFICE OBJECTION", "COMPLIANCE", "NOTICE", "SERVICE"]
subs_kw = ["EVIDENCE", "ARGUMENT", "FINAL HEARING", "JUDGMENT", "ORDER", "DISPOSAL"]
hear_txt = hear_txt.with_columns(
pl.when(_has_kw_expr("PURPOSE_TXT", async_kw))
.then(pl.lit("ASYNC_OR_ADMIN"))
.when(_has_kw_expr("PURPOSE_TXT", subs_kw))
.then(pl.lit("SUBSTANTIVE"))
.otherwise(pl.lit("UNKNOWN"))
.alias("PURPOSE_TAG")
)
tag_share = (
hear_txt.group_by(["CASE_TYPE", "PURPOSE_TAG"])
.agg(pl.len().alias("N"))
.with_columns((pl.col("N") / pl.col("N").sum().over("CASE_TYPE")).alias("SHARE"))
.sort(["CASE_TYPE", "SHARE"], descending=[False, True])
)
tag_share.write_csv(RUN_DIR / "purpose_tag_shares.csv")
try:
fig_t = px.bar(
tag_share.to_pandas(),
x="CASE_TYPE",
y="SHARE",
color="PURPOSE_TAG",
title="Purpose Tag Shares by Case Type",
barmode="stack",
)
ft = "14_purpose_tag_shares.html"
fig_t.write_html(FIGURES_DIR / ft)
copy_to_versioned(ft)
except Exception as e:
print("Purpose shares error:", e)
if __name__ == "__main__":
run_exploration()