Spaces:
Sleeping
Sleeping
File size: 4,818 Bytes
9eaac57 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
from __future__ import annotations
from pathlib import Path
from typing import Tuple
import polars as pl
def build_ticket_views(run_dir: Path) -> Tuple[Path, Path, Path]:
    """Materialize post-run ticket views from events.csv into Parquet files.

    Creates three artifacts in the run directory:
      - ticket_journal.parquet: every event with its date parsed, ordered
        per case, and tagged with a per-case sequence number.
      - ticket_summary.parquet: one row per case with hearing counts,
        stage/ripeness transition counts, age in days, and final status.
      - ticket_state_spans.parquet: one row per stage change with the
        date range the case spent in that stage.

    Parameters
    ----------
    run_dir : Path
        Directory containing events.csv; outputs are written alongside it.

    Returns
    -------
    Tuple[Path, Path, Path]
        Paths to (journal, summary, spans) parquet files, in that order.

    Raises
    ------
    FileNotFoundError
        If events.csv does not exist in ``run_dir``.
    """
    run_dir = Path(run_dir)
    events_csv = run_dir / "events.csv"
    if not events_csv.exists():
        raise FileNotFoundError(f"events.csv not found in run dir: {events_csv}")

    journal_pq = run_dir / "ticket_journal.parquet"
    summary_pq = run_dir / "ticket_summary.parquet"
    spans_pq = run_dir / "ticket_state_spans.parquet"

    journal = _build_journal(events_csv)
    journal.write_parquet(str(journal_pq))

    summary = _build_summary(journal)
    summary.write_parquet(str(summary_pq))

    spans = _build_state_spans(journal)
    spans.write_parquet(str(spans_pq))

    return journal_pq, summary_pq, spans_pq


def _build_journal(events_csv: Path) -> pl.DataFrame:
    """Parse dates, order events per case, and attach a per-case seq_no."""
    return (
        pl.scan_csv(str(events_csv))
        .with_columns(pl.col("date").str.to_date().alias("date"))
        # Sort lazily so the window below numbers events in date order
        # within each case.
        .sort(["case_id", "date"])
        .with_columns(pl.arange(0, pl.len()).over("case_id").alias("seq_no"))
        .collect(streaming=True)
    )


def _build_summary(journal: pl.DataFrame) -> pl.DataFrame:
    """One row per case: counts, dates, current stage, and final status."""
    # Hearing outcomes; "heard" vs anything else (treated as adjourned).
    heard = journal.filter(pl.col("type") == "outcome").with_columns(
        (pl.col("detail") == "heard").alias("is_heard")
    )

    base_summary = journal.group_by("case_id").agg(
        [
            pl.first("case_type").alias("case_type"),
            pl.first("date").alias("first_seen_date"),
            pl.last("date").alias("last_seen_date"),
            # Latest stage by event order, not by group order.
            pl.col("stage").sort_by("seq_no").last().alias("current_stage"),
            (pl.col("type") == "stage_change")
            .cast(pl.Int64)
            .sum()
            .alias("stage_changes"),
            (pl.col("type") == "ripeness_change")
            .cast(pl.Int64)
            .sum()
            .alias("ripeness_transitions"),
        ]
    )

    outcome_summary = (
        heard.group_by("case_id")
        .agg(
            [
                pl.len().alias("total_hearings"),
                pl.col("is_heard").cast(pl.Int64).sum().alias("heard_count"),
            ]
        )
        .with_columns(
            (pl.col("total_hearings") - pl.col("heard_count")).alias("adjourned_count")
        )
    )

    # Earliest disposal event per case (if any).
    disposed = (
        journal.filter(pl.col("type") == "disposed")
        .group_by("case_id")
        .agg([pl.min("date").alias("disposal_date")])
    )

    return (
        base_summary.join(outcome_summary, on="case_id", how="left")
        .with_columns(
            [
                # Cases with no outcome events get zero counts, not nulls.
                pl.col("total_hearings").fill_null(0),
                pl.col("heard_count").fill_null(0),
                pl.col("adjourned_count").fill_null(0),
            ]
        )
        .join(disposed, on="case_id", how="left")
        .with_columns(
            [
                # Compute age in full days from first to last seen.
                # Use total_days() on duration to be compatible across Polars versions.
                (pl.col("last_seen_date") - pl.col("first_seen_date"))
                .dt.total_days()
                .alias("age_days_end"),
                pl.when(pl.col("disposal_date").is_not_null())
                .then(pl.lit("DISPOSED"))
                .otherwise(pl.lit("ACTIVE"))
                .alias("final_status"),
            ]
        )
    )


def _build_state_spans(journal: pl.DataFrame) -> pl.DataFrame:
    """One row per stage_change: [start_date, end_date) span in that stage."""
    sc = (
        journal.filter(pl.col("type") == "stage_change")
        .select(["case_id", "date", "stage"])
        .rename({"date": "start_date"})
    )
    return sc.with_columns(
        [
            # A span ends when the next stage change for the same case starts.
            pl.col("start_date").shift(-1).over("case_id").alias("end_date"),
        ]
    ).with_columns(
        [
            # Open spans (no later stage change) collapse to zero length at
            # their own start date rather than remaining null.
            pl.when(pl.col("end_date").is_null())
            .then(pl.col("start_date"))
            .otherwise(pl.col("end_date"))
            .alias("end_date")
        ]
    )
def load_ticket_views(run_dir: Path) -> Tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    """Load ticket views for a run, building them first if any are missing.

    Note: this module requires Polars unconditionally (``pl.read_parquet``);
    there is no pandas fallback.

    Parameters
    ----------
    run_dir : Path
        Run directory containing (or to receive) the parquet artifacts.

    Returns
    -------
    Tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]
        (journal, summary, spans) as Polars DataFrames.
    """
    run_dir = Path(run_dir)
    journal_pq = run_dir / "ticket_journal.parquet"
    summary_pq = run_dir / "ticket_summary.parquet"
    spans_pq = run_dir / "ticket_state_spans.parquet"

    # Rebuild all three if any single artifact is missing, so the loaded
    # views always come from one consistent build.
    if not (journal_pq.exists() and summary_pq.exists() and spans_pq.exists()):
        build_ticket_views(run_dir)

    journal = pl.read_parquet(str(journal_pq))
    summary = pl.read_parquet(str(summary_pq))
    spans = pl.read_parquet(str(spans_pq))
    return journal, summary, spans
|