Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from pathlib import Path | |
| from typing import Tuple | |
| import polars as pl | |
def build_ticket_views(run_dir: Path) -> Tuple[Path, Path, Path]:
    """Materialize post-run ticket views from events.csv into Parquet files.

    Reads ``events.csv`` from ``run_dir`` (the columns ``case_id``, ``date``,
    ``type``, ``detail``, ``case_type`` and ``stage`` are used) and writes
    three artifacts into the same directory:

    - ``ticket_journal.parquet``: every event with ``date`` parsed to a date,
      ordered by ``(case_id, date)`` and numbered per case in ``seq_no``.
    - ``ticket_summary.parquet``: one row per ``case_id`` with first/last seen
      dates, current stage, stage/ripeness change counts, hearing counts,
      disposal date, age in days and a final ``DISPOSED``/``ACTIVE`` status.
    - ``ticket_state_spans.parquet``: one row per ``stage_change`` event with
      ``start_date`` and ``end_date`` (the start of the case's next stage
      change, or ``start_date`` itself for the last one).

    Args:
        run_dir: Directory that contains ``events.csv``; outputs go here too.

    Returns:
        Paths to the journal, summary and spans files, in that order.

    Raises:
        FileNotFoundError: If ``events.csv`` does not exist in ``run_dir``.
    """
    run_dir = Path(run_dir)
    events_csv = run_dir / "events.csv"
    if not events_csv.exists():
        raise FileNotFoundError(f"events.csv not found in run dir: {events_csv}")

    journal_pq = run_dir / "ticket_journal.parquet"
    summary_pq = run_dir / "ticket_summary.parquet"
    spans_pq = run_dir / "ticket_state_spans.parquet"

    events = pl.scan_csv(str(events_csv))

    # Normalize and order.
    # maintain_order=True keeps the CSV order for events of the same case on
    # the same date; without it the sort may reorder ties, which would make
    # seq_no, current_stage and the span boundaries non-deterministic.
    journal = (
        events.with_columns(pl.col("date").str.to_date().alias("date"))
        .sort(["case_id", "date"], maintain_order=True)
        .with_columns(
            pl.int_range(0, pl.len()).over("case_id").alias("seq_no")
        )
        .collect(streaming=True)
    )
    journal.write_parquet(str(journal_pq))

    # Outcome events, flagged by whether the hearing was actually heard.
    heard = journal.filter(pl.col("type") == "outcome").with_columns(
        (pl.col("detail") == "heard").alias("is_heard")
    )

    base_summary = journal.group_by("case_id").agg(
        [
            pl.col("case_type").first().alias("case_type"),
            # min/max (not first/last) so the dates do not depend on row order.
            pl.col("date").min().alias("first_seen_date"),
            pl.col("date").max().alias("last_seen_date"),
            pl.col("stage").sort_by("seq_no").last().alias("current_stage"),
            (pl.col("type") == "stage_change")
            .cast(pl.Int64)
            .sum()
            .alias("stage_changes"),
            (pl.col("type") == "ripeness_change")
            .cast(pl.Int64)
            .sum()
            .alias("ripeness_transitions"),
        ]
    )

    outcome_summary = (
        heard.group_by("case_id")
        .agg(
            [
                pl.len().alias("total_hearings"),
                pl.col("is_heard").cast(pl.Int64).sum().alias("heard_count"),
            ]
        )
        .with_columns(
            (pl.col("total_hearings") - pl.col("heard_count")).alias(
                "adjourned_count"
            )
        )
    )

    # Earliest "disposed" event per case; cases without one stay ACTIVE below.
    disposed = (
        journal.filter(pl.col("type") == "disposed")
        .group_by("case_id")
        .agg([pl.col("date").min().alias("disposal_date")])
    )

    summary = (
        base_summary.join(outcome_summary, on="case_id", how="left")
        # Cases with no outcome events get nulls from the left join.
        .with_columns(
            [
                pl.col("total_hearings").fill_null(0),
                pl.col("heard_count").fill_null(0),
                pl.col("adjourned_count").fill_null(0),
            ]
        )
        .join(disposed, on="case_id", how="left")
        .with_columns(
            [
                # Age in full days from first to last seen.
                (pl.col("last_seen_date") - pl.col("first_seen_date"))
                .dt.total_days()
                .alias("age_days_end"),
                pl.when(pl.col("disposal_date").is_not_null())
                .then(pl.lit("DISPOSED"))
                .otherwise(pl.lit("ACTIVE"))
                .alias("final_status"),
            ]
        )
    )
    summary.write_parquet(str(summary_pq))

    # Spans from stage changes: each span ends where the case's next stage
    # change starts. Relies on the journal being ordered by (case_id, date).
    stage_changes = (
        journal.filter(pl.col("type") == "stage_change")
        .select(["case_id", "date", "stage"])
        .rename({"date": "start_date"})
    )
    next_start = pl.col("start_date").shift(-1).over("case_id")
    spans = stage_changes.with_columns(
        # The last span of a case has no successor; close it on its own start.
        next_start.fill_null(pl.col("start_date")).alias("end_date")
    )
    spans.write_parquet(str(spans_pq))

    return journal_pq, summary_pq, spans_pq
def load_ticket_views(run_dir: Path):
    """Load the ticket views for a run, building them first if any is missing.

    Looks for ``ticket_journal.parquet``, ``ticket_summary.parquet`` and
    ``ticket_state_spans.parquet`` in ``run_dir``. If at least one of them does
    not exist, ``build_ticket_views`` is called to (re)create all three.

    Returns:
        A ``(journal, summary, spans)`` tuple of Polars DataFrames read from
        the three Parquet files.
    """
    base = Path(run_dir)
    view_paths = tuple(
        base / file_name
        for file_name in (
            "ticket_journal.parquet",
            "ticket_summary.parquet",
            "ticket_state_spans.parquet",
        )
    )

    # Rebuild everything when any single artifact is absent.
    if not all(path.exists() for path in view_paths):
        build_ticket_views(base)

    journal, summary, spans = (pl.read_parquet(str(path)) for path in view_paths)
    return journal, summary, spans