RoyAalekh's picture
enhancements, added view for scheduled cases as tickets
9eaac57
from __future__ import annotations
from pathlib import Path
from typing import Tuple
import polars as pl
def _build_journal(events_csv: Path) -> pl.DataFrame:
    """Parse events.csv, order events per case and number them with ``seq_no``."""
    return (
        pl.scan_csv(str(events_csv))
        .with_columns(pl.col("date").str.to_date().alias("date"))
        # Stable sort: events sharing the same case_id and date keep their
        # CSV order, so seq_no (and the "current_stage" derived from it) is
        # deterministic across runs.
        .sort(["case_id", "date"], maintain_order=True)
        # 0, 1, 2, ... within each case, following the sorted row order.
        .with_columns(pl.int_range(0, pl.len()).over("case_id").alias("seq_no"))
        .collect(streaming=True)
    )


def _build_summary(journal: pl.DataFrame) -> pl.DataFrame:
    """Aggregate the (sorted) journal into one row per case_id."""
    base_summary = journal.group_by("case_id").agg(
        [
            pl.first("case_type").alias("case_type"),
            # The journal is sorted by (case_id, date), so first/last are the
            # earliest/latest event dates of the case.
            pl.first("date").alias("first_seen_date"),
            pl.last("date").alias("last_seen_date"),
            pl.col("stage").sort_by("seq_no").last().alias("current_stage"),
            (pl.col("type") == "stage_change")
            .cast(pl.Int64)
            .sum()
            .alias("stage_changes"),
            (pl.col("type") == "ripeness_change")
            .cast(pl.Int64)
            .sum()
            .alias("ripeness_transitions"),
        ]
    )

    # Every "outcome" event counts as a hearing; those whose detail is
    # "heard" are counted as heard, the remainder as adjourned.
    outcomes = journal.filter(pl.col("type") == "outcome")
    outcome_summary = (
        outcomes.group_by("case_id")
        .agg(
            [
                pl.len().alias("total_hearings"),
                (pl.col("detail") == "heard")
                .cast(pl.Int64)
                .sum()
                .alias("heard_count"),
            ]
        )
        .with_columns(
            (pl.col("total_hearings") - pl.col("heard_count")).alias(
                "adjourned_count"
            )
        )
    )

    # Earliest "disposed" event per case, if any.
    disposed = (
        journal.filter(pl.col("type") == "disposed")
        .group_by("case_id")
        .agg([pl.min("date").alias("disposal_date")])
    )

    return (
        base_summary.join(outcome_summary, on="case_id", how="left")
        # Cases with no outcome events get zero counts instead of nulls.
        .with_columns(
            pl.col(["total_hearings", "heard_count", "adjourned_count"]).fill_null(0)
        )
        .join(disposed, on="case_id", how="left")
        .with_columns(
            [
                # Age in full days from first to last seen. total_days() on a
                # Duration is used for compatibility across Polars versions.
                (pl.col("last_seen_date") - pl.col("first_seen_date"))
                .dt.total_days()
                .alias("age_days_end"),
                pl.when(pl.col("disposal_date").is_not_null())
                .then(pl.lit("DISPOSED"))
                .otherwise(pl.lit("ACTIVE"))
                .alias("final_status"),
            ]
        )
        # group_by output order is arbitrary; sort for a reproducible file.
        .sort("case_id")
    )


def _build_spans(journal: pl.DataFrame) -> pl.DataFrame:
    """One row per stage_change event with the date the next stage began."""
    return (
        journal.filter(pl.col("type") == "stage_change")
        .select(["case_id", "date", "stage"])
        .rename({"date": "start_date"})
        # The journal is ordered by (case_id, date), so the next row within a
        # case is the following stage change.
        .with_columns(pl.col("start_date").shift(-1).over("case_id").alias("end_date"))
        # The latest stage of a case has no successor and is closed at its own
        # start date (a zero-length span).
        # NOTE(review): closing it at the case's last_seen_date may be more useful.
        .with_columns(pl.col("end_date").fill_null(pl.col("start_date")))
    )


def build_ticket_views(run_dir: Path) -> Tuple[Path, Path, Path]:
    """Materialize post-run ticket views from events.csv into Parquet files.

    Creates three artifacts in the run directory:

    - ticket_journal.parquet: every event with a parsed ``date`` and a
      per-case ``seq_no`` (0, 1, 2, ... in (date, CSV) order).
    - ticket_summary.parquet: one row per ``case_id`` with first/last seen
      dates, current stage, stage/ripeness change counts, hearing counts,
      disposal date, age in days and final status (DISPOSED/ACTIVE).
    - ticket_state_spans.parquet: one row per ``stage_change`` event with its
      ``start_date`` and the ``end_date`` at which the next stage began.

    Args:
        run_dir: Run directory containing ``events.csv``.

    Returns:
        Paths to the three files in the order above.

    Raises:
        FileNotFoundError: if ``events.csv`` is not present in ``run_dir``.
    """
    run_dir = Path(run_dir)
    events_csv = run_dir / "events.csv"
    if not events_csv.exists():
        raise FileNotFoundError(f"events.csv not found in run dir: {events_csv}")

    journal_pq = run_dir / "ticket_journal.parquet"
    summary_pq = run_dir / "ticket_summary.parquet"
    spans_pq = run_dir / "ticket_state_spans.parquet"

    journal = _build_journal(events_csv)
    journal.write_parquet(str(journal_pq))
    _build_summary(journal).write_parquet(str(summary_pq))
    _build_spans(journal).write_parquet(str(spans_pq))
    return journal_pq, summary_pq, spans_pq
def load_ticket_views(
    run_dir: Path,
) -> Tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    """Load the ticket views for a run, building them first when needed.

    The views are (re)built with ``build_ticket_views`` when any of the three
    Parquet files is missing, or when ``events.csv`` exists and is newer than
    one of them. The second check keeps a re-run written into the same
    directory from serving stale views. If ``events.csv`` is absent but all
    three views exist, the existing views are loaded as-is.

    Args:
        run_dir: Run directory holding ``events.csv`` and/or the
            ``ticket_*.parquet`` views.

    Returns:
        ``(journal, summary, spans)`` as Polars DataFrames.

    Raises:
        FileNotFoundError: raised by ``build_ticket_views`` if the views must
            be built but ``events.csv`` is missing.
    """
    run_dir = Path(run_dir)
    events_csv = run_dir / "events.csv"
    journal_pq = run_dir / "ticket_journal.parquet"
    summary_pq = run_dir / "ticket_summary.parquet"
    spans_pq = run_dir / "ticket_state_spans.parquet"
    view_paths = (journal_pq, summary_pq, spans_pq)

    needs_build = not all(p.exists() for p in view_paths)
    if not needs_build and events_csv.exists():
        # Rebuild when the source event log was rewritten after the views.
        source_mtime = events_csv.stat().st_mtime
        needs_build = any(p.stat().st_mtime < source_mtime for p in view_paths)
    if needs_build:
        build_ticket_views(run_dir)

    journal = pl.read_parquet(str(journal_pq))
    summary = pl.read_parquet(str(summary_pq))
    spans = pl.read_parquet(str(spans_pq))
    return journal, summary, spans