Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """v0.5 issue B+C — Pre-flight 0.4: validate the 24h time-bucket choice. | |
| Walk data/processed/cascade_chains/*.json, collect each node's | |
| time_offset_hours once per parent edge (nodes without parent_ids are | |
| not counted), and print three histograms (12h / 24h / 48h bucket | |
| widths). The bucket constant in src/rag/evidence_templating.py | |
| (_TIME_BUCKET_HOURS) defaults to 24h per spec §5.4; if the 24h histogram | |
| shows < 5 edges per (parent_domain × time bucket) on average, switch to | |
| 48h and document in the ablation report. | |
| One-shot helper. No CLI, no flags. Run from project root:: | |
| PYTHONPATH=. python scripts/v05_time_offset_histogram.py | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from collections import Counter | |
| from pathlib import Path | |
| CHAINS_DIR = Path("data/processed/cascade_chains") | |
| def _bucket(hours: float, width: int) -> int: | |
| """Return the lower edge of the bucket containing `hours`.""" | |
| return int(hours // width) * width | |
def main() -> None:
    """Print 12h / 24h / 48h bucket summaries of parent-edge time offsets.

    Reads every ``*.json`` file in ``CHAINS_DIR``. For each entry of a file's
    ``cascade_events`` list that carries a ``time_offset_hours`` value, one
    ``(parent_domain, offset)`` pair is recorded per element of its
    ``parent_ids``; entries without ``parent_ids`` contribute nothing.

    For each bucket width the function prints the number of distinct
    buckets, the number of distinct (parent_domain, bucket) pairs, the
    average number of edges per pair, and the eight most populated buckets.

    Files that cannot be read, decoded, or parsed, and files whose top level
    is not a JSON object, are skipped with a warning on stderr so that a
    thin histogram is not silently attributed to the data itself. Offsets
    that are not finite numbers are dropped and counted in a stderr summary.
    """
    # Local imports: only this function needs them.
    import math
    import sys

    offsets: list[tuple[str, float]] = []  # (parent_domain, offset)
    dropped_offsets = 0

    for path in sorted(CHAINS_DIR.glob("*.json")):
        try:
            # Explicit UTF-8 so the result does not depend on the platform's
            # default locale encoding.
            chain = json.loads(path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError, OSError) as exc:
            print(f"warning: skipping {path}: {exc}", file=sys.stderr)
            continue
        if not isinstance(chain, dict):
            print(
                f"warning: skipping {path}: top-level JSON value is not an object",
                file=sys.stderr,
            )
            continue

        events = chain.get("cascade_events")
        if not isinstance(events, list):
            events = []
        # Ignore malformed (non-object) entries instead of crashing on them.
        nodes = [n for n in events if isinstance(n, dict)]

        # Build id → domain map so we can look up parents
        domain_by_id = {
            n["id"]: n["domain"] for n in nodes if "id" in n and "domain" in n
        }

        for node in nodes:
            raw = node.get("time_offset_hours")
            if raw is None:
                continue
            try:
                hours = float(raw)
            except (TypeError, ValueError):
                dropped_offsets += 1
                continue
            # json.loads accepts NaN / Infinity; int() on those raises when
            # the offset is bucketed, so drop them here.
            if not math.isfinite(hours):
                dropped_offsets += 1
                continue
            for pid in node.get("parent_ids") or []:
                # A parent id absent from the map is labelled "<root>".
                pdom = domain_by_id.get(pid, "<root>")
                offsets.append((pdom, hours))

    if dropped_offsets:
        print(
            f"warning: ignored {dropped_offsets} non-numeric or non-finite "
            "time_offset_hours value(s)",
            file=sys.stderr,
        )

    if not offsets:
        print("No node offsets found.")
        return

    hours_seen = [t for _, t in offsets]
    print(f"Total parent-edge offsets observed: {len(offsets)}")
    print(f"Range: min={min(hours_seen):.1f}h max={max(hours_seen):.1f}h")
    print()

    for width in (12, 24, 48):
        all_buckets: Counter[int] = Counter(_bucket(t, width) for t in hours_seen)
        domain_bucket: Counter[tuple[str, int]] = Counter(
            (pd, _bucket(t, width)) for pd, t in offsets
        )
        avg_per_dom_bucket = (
            sum(domain_bucket.values()) / len(domain_bucket) if domain_bucket else 0
        )
        print(f"=== Bucket width = {width}h ===")
        print(f"Distinct buckets: {len(all_buckets)}")
        print(f"Distinct (parent_domain × bucket): {len(domain_bucket)}")
        print(f"Avg edges per (domain × bucket): {avg_per_dom_bucket:.2f}")
        print("Top buckets (overall):")
        for lo, count in all_buckets.most_common(8):
            hi = lo + width
            print(f" +{lo:>3}~{hi:<3}h: {count}")
        print()
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()