cascade_risk / scripts /v05_time_offset_histogram.py
Lucasoppem's picture
Sync from GitHub main (part 2)
36f9d47 verified
Raw
History Blame Contribute Delete
2.73 kB
"""v0.5 issue B+C — Pre-flight 0.4: validate the 24h time-bucket choice.
Walk data/processed/cascade_chains/*.json, collect each node's
time_offset_hours once per parent edge (nodes without parents are not
counted), and print three histograms (12h / 24h / 48h bucket widths).
The bucket constant in src/rag/evidence_templating.py
(_TIME_BUCKET_HOURS) defaults to 24h per spec §5.4; if the 24h histogram
shows < 5 edges per (parent_domain × time bucket) on average, switch to
48h and document in the ablation report.
One-shot helper. No CLI, no flags. Run from project root::
PYTHONPATH=. python scripts/v05_time_offset_histogram.py
"""
from __future__ import annotations
import json
from collections import Counter
from pathlib import Path
# Directory globbed for cascade-chain ``*.json`` files. The path is relative,
# so it resolves against the current working directory (run from project root).
CHAINS_DIR = Path("data/processed/cascade_chains")
def _bucket(hours: float, width: int) -> int:
    """Map `hours` to the start of the `width`-hour bucket it falls in.

    Buckets are the half-open intervals ``[k * width, (k + 1) * width)``.
    Floor division is used, so a negative offset lands in the bucket whose
    lower edge is below it (e.g. ``-1`` with width 12 maps to ``-12``).
    """
    bucket_index = int(hours // width)
    return bucket_index * width
def main() -> None:
    """Print 12h / 24h / 48h bucket statistics for parent-edge time offsets.

    Reads every ``*.json`` file in ``CHAINS_DIR`` and, for each node listed
    under ``cascade_events`` that has a ``time_offset_hours``, records one
    ``(parent_domain, offset)`` pair per entry in its ``parent_ids``. A parent
    id that does not resolve to a node with a ``domain`` in the same file is
    labelled ``"<root>"``.

    For each bucket width the report shows the number of distinct buckets,
    the number of distinct (parent_domain × bucket) cells, the average edge
    count per cell, and the eight most populated buckets.

    Files that cannot be read, decoded or parsed, files whose top-level
    structure is not the expected object / list, and nodes whose offset is
    not a finite number are skipped. Each kind of skip is summarised on
    stderr so that a histogram built from partial data is not mistaken for
    a complete one. Output on stdout is unchanged for well-formed data.
    """
    import math
    import sys

    offsets: list[tuple[str, float]] = []  # (parent_domain, offset_hours)
    skipped_files: list[str] = []
    bad_offset_nodes = 0

    for path in sorted(CHAINS_DIR.glob("*.json")):
        try:
            # Decode explicitly as UTF-8 instead of the locale's encoding.
            chain = json.loads(path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            skipped_files.append(path.name)
            continue
        if not isinstance(chain, dict):
            skipped_files.append(path.name)
            continue
        events = chain.get("cascade_events") or []
        if not isinstance(events, list):
            skipped_files.append(path.name)
            continue
        nodes = [n for n in events if isinstance(n, dict)]

        # Build id → domain map so we can look up parents
        domain_by_id = {
            n["id"]: n["domain"] for n in nodes if "id" in n and "domain" in n
        }
        for node in nodes:
            raw_offset = node.get("time_offset_hours")
            if raw_offset is None:
                continue
            parent_ids = node.get("parent_ids") or []
            if not parent_ids:
                # No parent edge means nothing to record for this node.
                continue
            try:
                offset = float(raw_offset)
            except (TypeError, ValueError):
                bad_offset_nodes += 1
                continue
            # json.loads accepts NaN / Infinity literals; they cannot be
            # bucketed (int() of a non-finite float raises).
            if not math.isfinite(offset):
                bad_offset_nodes += 1
                continue
            for pid in parent_ids:
                offsets.append((domain_by_id.get(pid, "<root>"), offset))

    if skipped_files:
        print(
            f"warning: skipped {len(skipped_files)} unreadable or malformed "
            f"file(s): {', '.join(skipped_files)}",
            file=sys.stderr,
        )
    if bad_offset_nodes:
        print(
            f"warning: ignored {bad_offset_nodes} node(s) with a non-numeric "
            "or non-finite time_offset_hours",
            file=sys.stderr,
        )

    if not offsets:
        print("No node offsets found.")
        return

    hours = [t for _, t in offsets]
    print(f"Total parent-edge offsets observed: {len(offsets)}")
    print(f"Range: min={min(hours):.1f}h max={max(hours):.1f}h")
    print()

    for width in (12, 24, 48):
        all_buckets: Counter[int] = Counter(_bucket(t, width) for t in hours)
        domain_bucket: Counter[tuple[str, int]] = Counter(
            (pd, _bucket(t, width)) for pd, t in offsets
        )
        avg_per_dom_bucket = (
            sum(domain_bucket.values()) / len(domain_bucket) if domain_bucket else 0
        )
        print(f"=== Bucket width = {width}h ===")
        print(f"Distinct buckets: {len(all_buckets)}")
        print(f"Distinct (parent_domain × bucket): {len(domain_bucket)}")
        print(f"Avg edges per (domain × bucket): {avg_per_dom_bucket:.2f}")
        print("Top buckets (overall):")
        for lo, count in all_buckets.most_common(8):
            hi = lo + width
            print(f" +{lo:>3}~{hi:<3}h: {count}")
        print()
# Script entry point: run the report only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()