MetaCortex-Dynamics commited on
Commit
de5119b
Β·
verified Β·
1 Parent(s): 35a8c04

Create pipeline/stages/s5_emit.py

Browse files
Files changed (1) hide show
  1. pipeline/stages/s5_emit.py +122 -0
pipeline/stages/s5_emit.py ADDED
@@ -0,0 +1,122 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ S5: EMIT β€” Write validated FrameExamples to CORPUS-SEMI-001/.
3
+
4
+ Per SPEC-PIPELINE-001 Part B.2:
5
+ Format: One JSONL file per source, each line a complete FrameExample.
6
+ Corpus-level manifest.json with aggregate statistics.
7
+ Provenance chain: source β†’ segment β†’ classification β†’ decomposition β†’
8
+ validation β†’ crystallinity β†’ emission timestamp β†’ example SHA-256.
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ import json
14
+ import os
15
+ from dataclasses import asdict
16
+ from datetime import datetime, timezone
17
+ from typing import Sequence
18
+
19
+ from pipeline.stages.s4_validate import ValidationResult, Verdict
20
+ from pipeline.types import ExclusionRecord, FrameExample
21
+
22
+
23
+ def emit(
24
+ examples: Sequence[tuple[FrameExample, ValidationResult]],
25
+ exclusions: Sequence[ExclusionRecord],
26
+ output_dir: str,
27
+ ) -> str:
28
+ """Emit validated examples and exclusion manifest to output directory.
29
+
30
+ Returns path to manifest.json.
31
+ """
32
+ os.makedirs(output_dir, exist_ok=True)
33
+ examples_dir = os.path.join(output_dir, "examples")
34
+ os.makedirs(examples_dir, exist_ok=True)
35
+
36
+ # Group examples by source
37
+ by_source: dict[str, list[dict]] = {}
38
+ total_pass = 0
39
+ total_oracle = 0
40
+ total_fail = 0
41
+
42
+ for example, result in examples:
43
+ if result.verdict == Verdict.FAIL:
44
+ total_fail += 1
45
+ continue
46
+ if result.verdict == Verdict.ORACLE_QUEUE:
47
+ total_oracle += 1
48
+ continue
49
+
50
+ total_pass += 1
51
+ source_id = example.provenance.source_id
52
+ if source_id not in by_source:
53
+ by_source[source_id] = []
54
+
55
+ record = {
56
+ "content_hash": example.content_hash,
57
+ "provenance": asdict(example.provenance),
58
+ "channel_a": _grounding_to_dict(example.channel_a),
59
+ "channel_b": _grounding_to_dict(example.channel_b),
60
+ "channel_c": _grounding_to_dict(example.channel_c),
61
+ "witnesses": {
62
+ w.canonical_name: {"attested": a.attested, "evidence": a.evidence}
63
+ for w, a in example.witnesses.attestations.items()
64
+ },
65
+ "crystallinity": result.crystallinity_score,
66
+ "emitted_at": datetime.now(timezone.utc).isoformat(),
67
+ }
68
+ by_source[source_id].append(record)
69
+
70
+ # Write JSONL files
71
+ source_files = []
72
+ for source_id, records in by_source.items():
73
+ safe_name = source_id.replace("/", "_").replace(":", "_")[:80]
74
+ filename = f"{safe_name}.jsonl"
75
+ filepath = os.path.join(examples_dir, filename)
76
+ with open(filepath, "w", encoding="utf-8") as f:
77
+ for record in records:
78
+ f.write(json.dumps(record, ensure_ascii=False) + "\n")
79
+ source_files.append({
80
+ "source_id": source_id,
81
+ "filename": filename,
82
+ "example_count": len(records),
83
+ })
84
+
85
+ # Write exclusion manifest
86
+ exclusion_path = os.path.join(output_dir, "exclusion_manifest.jsonl")
87
+ with open(exclusion_path, "w", encoding="utf-8") as f:
88
+ for exc in exclusions:
89
+ f.write(json.dumps(asdict(exc), ensure_ascii=False) + "\n")
90
+
91
+ # Write corpus manifest
92
+ manifest = {
93
+ "spec": "SPEC-PIPELINE-001 v0.1.0",
94
+ "generated_at": datetime.now(timezone.utc).isoformat(),
95
+ "statistics": {
96
+ "total_pass": total_pass,
97
+ "total_oracle_queue": total_oracle,
98
+ "total_fail": total_fail,
99
+ "total_exclusions": len(exclusions),
100
+ "source_count": len(by_source),
101
+ },
102
+ "sources": source_files,
103
+ }
104
+ manifest_path = os.path.join(output_dir, "manifest.json")
105
+ with open(manifest_path, "w", encoding="utf-8") as f:
106
+ json.dump(manifest, f, indent=2, ensure_ascii=False)
107
+
108
+ return manifest_path
109
+
110
+
111
+ def _grounding_to_dict(grounding) -> dict:
112
+ return {
113
+ "modality": grounding.modality,
114
+ "operators": [
115
+ {
116
+ "operator": expr.operator.canonical_name,
117
+ "arguments": expr.arguments,
118
+ "evidence": expr.evidence,
119
+ }
120
+ for expr in grounding.operators.expressions
121
+ ],
122
+ }