Spaces:
Running
Running
File size: 2,506 Bytes
4d0ffdd f6c65ef 4d0ffdd efb0735 4d0ffdd efb0735 4d0ffdd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
"""Event schema and writer for simulation audit trail.
Each event is a flat dict suitable for CSV logging with a 'type' field.
Types:
- filing: a new case filed into the system
- scheduled: a case scheduled on a date
- outcome: hearing outcome (heard/adjourned)
- stage_change: case progresses to a new stage
- disposed: case disposed
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from datetime import date
from pathlib import Path
@dataclass
class EventWriter:
path: Path
def __post_init__(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
self._buffer = [] # in-memory rows to append
if not self.path.exists():
with self.path.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow([
"date", "type", "case_id", "case_type", "stage", "courtroom_id",
"detail", "extra",
"priority_score", "age_days", "readiness_score", "is_urgent",
"adj_boost", "ripeness_status", "days_since_hearing"
])
def write(self, date_: date, type_: str, case_id: str, case_type: str = "",
stage: str = "", courtroom_id: int | None = None,
detail: str = "", extra: str = "",
priority_score: float | None = None, age_days: int | None = None,
readiness_score: float | None = None, is_urgent: bool | None = None,
adj_boost: float | None = None, ripeness_status: str = "",
days_since_hearing: int | None = None) -> None:
self._buffer.append([
date_.isoformat(), type_, case_id, case_type, stage,
courtroom_id if courtroom_id is not None else "",
detail, extra,
f"{priority_score:.4f}" if priority_score is not None else "",
age_days if age_days is not None else "",
f"{readiness_score:.4f}" if readiness_score is not None else "",
int(is_urgent) if is_urgent is not None else "",
f"{adj_boost:.4f}" if adj_boost is not None else "",
ripeness_status,
days_since_hearing if days_since_hearing is not None else "",
])
def flush(self) -> None:
if not self._buffer:
return
with self.path.open("a", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerows(self._buffer)
self._buffer.clear()
|