"""v0.5 issue B+C — Pre-flight 0.4: validate the 24h time-bucket choice.

Walk data/processed/cascade_chains/*.json, collect the time_offset_hours of
every node once per parent edge (nodes without parents contribute nothing),
and print three histograms (12h / 24h / 48h bucket widths).

The bucket constant in src/rag/evidence_templating.py (_TIME_BUCKET_HOURS)
defaults to 24h per spec §5.4; if the 24h histogram shows < 5 edges per
(parent_domain × time bucket) on average, switch to 48h and document in the
ablation report.

Files that cannot be read or parsed, and nodes whose offset is not a finite
number, are skipped with a warning on stderr so the histogram on stdout stays
clean.

One-shot helper. No CLI, no flags. Run from project root::

    PYTHONPATH=. python scripts/v05_time_offset_histogram.py
"""

from __future__ import annotations

import json
import math
import sys
from collections import Counter
from pathlib import Path

CHAINS_DIR = Path("data/processed/cascade_chains")

# Candidate bucket widths (hours) compared by the report.
_BUCKET_WIDTHS_HOURS = (12, 24, 48)


def _bucket(hours: float, width: int) -> int:
    """Return the lower edge of the bucket containing `hours`."""
    return int(hours // width) * width


def _load_nodes(path: Path) -> list[dict]:
    """Return the dict entries of ``cascade_events`` in *path*, or ``[]``.

    Unreadable, undecodable, or malformed files are reported on stderr and
    yield an empty list instead of aborting the whole run.
    """
    try:
        # Explicit encoding: the default is locale-dependent and would make
        # the result differ between machines.
        chain = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"warning: skipping {path}: {exc}", file=sys.stderr)
        return []
    if not isinstance(chain, dict):
        print(
            f"warning: skipping {path}: top-level JSON value is not an object",
            file=sys.stderr,
        )
        return []
    events = chain.get("cascade_events") or []
    if not isinstance(events, list):
        print(
            f"warning: skipping {path}: 'cascade_events' is not a list",
            file=sys.stderr,
        )
        return []
    return [node for node in events if isinstance(node, dict)]


def _parse_offset(raw: object) -> float | None:
    """Convert *raw* to a finite float, or return ``None`` if impossible."""
    try:
        value = float(raw)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return None
    # json.loads accepts NaN / Infinity, which cannot be bucketed.
    return value if math.isfinite(value) else None


def _collect_offsets(chains_dir: Path) -> list[tuple[str, float]]:
    """Return one ``(parent_domain, offset_hours)`` pair per parent edge.

    The parent domain is ``""`` when the parent id is not present (with a
    domain) in the same chain file.
    """
    offsets: list[tuple[str, float]] = []
    skipped = 0
    for path in sorted(chains_dir.glob("*.json")):
        nodes = _load_nodes(path)
        # Build id → domain map so we can look up parents.
        domain_by_id = {
            n["id"]: n["domain"] for n in nodes if "id" in n and "domain" in n
        }
        for node in nodes:
            raw = node.get("time_offset_hours")
            if raw is None:
                continue
            offset = _parse_offset(raw)
            if offset is None:
                skipped += 1
                continue
            for pid in node.get("parent_ids") or []:
                offsets.append((domain_by_id.get(pid, ""), offset))
    if skipped:
        print(
            f"warning: ignored {skipped} node(s) with a non-numeric or "
            "non-finite time_offset_hours",
            file=sys.stderr,
        )
    return offsets


def _print_histogram(offsets: list[tuple[str, float]], width: int) -> None:
    """Print the bucket statistics of *offsets* for one bucket *width*."""
    all_buckets: Counter = Counter(_bucket(t, width) for _, t in offsets)
    domain_bucket: Counter = Counter(
        (pd, _bucket(t, width)) for pd, t in offsets
    )
    avg_per_dom_bucket = (
        sum(domain_bucket.values()) / len(domain_bucket) if domain_bucket else 0
    )
    print(f"=== Bucket width = {width}h ===")
    print(f"Distinct buckets: {len(all_buckets)}")
    print(f"Distinct (parent_domain × bucket): {len(domain_bucket)}")
    print(f"Avg edges per (domain × bucket): {avg_per_dom_bucket:.2f}")
    print("Top buckets (overall):")
    for lo, count in all_buckets.most_common(8):
        hi = lo + width
        print(f" +{lo:>3}~{hi:<3}h: {count}")
    print()


def main(chains_dir: Path | None = None) -> None:
    """Print the 12h / 24h / 48h offset histograms for the chain files.

    Args:
        chains_dir: Directory holding the ``*.json`` chain files. Defaults to
            ``CHAINS_DIR`` (resolved relative to the working directory).
    """
    offsets = _collect_offsets(CHAINS_DIR if chains_dir is None else chains_dir)
    if not offsets:
        print("No node offsets found.")
        return

    hours = [t for _, t in offsets]
    print(f"Total parent-edge offsets observed: {len(offsets)}")
    print(f"Range: min={min(hours):.1f}h max={max(hours):.1f}h")
    print()

    for width in _BUCKET_WIDTHS_HOURS:
        _print_histogram(offsets, width)


if __name__ == "__main__":
    main()