MetaCortex-Dynamics committed on
Commit
80768d2
·
verified ·
1 Parent(s): 2eeb4ee

Create pipeline/ingest/chat_archive.py

Browse files
Files changed (1) hide show
  1. pipeline/ingest/chat_archive.py +161 -0
pipeline/ingest/chat_archive.py ADDED
@@ -0,0 +1,161 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Ingest ChatGPT conversation archives into the governed pipeline.
3
+
4
+ Reads individual conversation JSON files (theory-archive format)
5
+ or the bulk conversations.json from ChatGPT data export.
6
+
7
+ Each message becomes a candidate Segment for S2 classification.
8
+ Technical messages (MaL reasoning, governance proofs, operator derivations)
9
+ proceed to S3 decomposition. Non-technical messages are excluded.
10
+ """
11
+
12
+ from __future__ import annotations
13
+
14
+ import json
15
+ import os
16
+ from datetime import datetime, timezone
17
+ from hashlib import sha256
18
+ from pathlib import Path
19
+ from typing import Iterator
20
+
21
+ from pipeline.stages.s1_segment import Segment
22
+ from pipeline.types import SourceProvenance, Tier
23
+
24
+
25
def ingest_conversation_file(path: str | Path) -> Iterator[Segment]:
    """Ingest a single conversation JSON file (theory-archive format).

    Args:
        path: Path to a conversation JSON file containing a ChatGPT-style
            ``mapping`` of message nodes.

    Yields:
        Segment: one per non-empty, non-system message. ``byte_range`` is a
        running offset over the concatenated message texts (UTF-8 byte
        lengths), not a position in the original file.
    """
    path = Path(path)
    with open(path, encoding="utf-8") as f:
        conv = json.load(f)

    # Hash the raw file bytes so provenance is tied to the exact artifact.
    file_hash = sha256(path.read_bytes()).hexdigest()
    conv_id = conv.get("conversation_id", path.stem)
    create_time = conv.get("create_time")
    # create_time is a Unix epoch (seconds) in ChatGPT exports; normalize to
    # an ISO-8601 UTC string, or "unknown" when absent/null.
    timestamp = (
        datetime.fromtimestamp(create_time, tz=timezone.utc).isoformat()
        if create_time else "unknown"
    )

    provenance = SourceProvenance(
        source_id=f"chat:{conv_id}",
        tier=Tier.T3,
        url=f"theory-archive/{path.name}",
        commit_or_version=timestamp,
        license="proprietary",
        acquired_at=datetime.now(timezone.utc).isoformat(),
        artifact_sha256=file_hash,
    )

    # ChatGPT format: "mapping" is a dict of node_id -> node; each node may
    # carry a message with a list of content parts.
    mapping = conv.get("mapping", {})
    offset = 0

    for node in mapping.values():
        msg = node.get("message")
        if not msg:
            continue

        role = msg.get("author", {}).get("role", "unknown")
        if role == "system":
            continue  # Skip system prompts

        # Non-string parts (e.g. image payload dicts) are ignored.
        parts = msg.get("content", {}).get("parts", [])
        text = "".join(p for p in parts if isinstance(p, str)).strip()
        if not text:
            continue

        byte_start = offset
        byte_end = offset + len(text.encode("utf-8"))
        offset = byte_end

        yield Segment(
            source=provenance,
            byte_range=(byte_start, byte_end),
            text=text,
            segment_type=f"chat_{role}",
        )
85
+
86
+
87
def ingest_conversation_dir(dir_path: str | Path) -> Iterator[Segment]:
    """Ingest every conversation JSON file found in a directory.

    Files matching ``conv_*.json`` are processed in sorted filename order,
    and their segments are yielded in sequence.
    """
    for conv_path in sorted(Path(dir_path).glob("conv_*.json")):
        yield from ingest_conversation_file(conv_path)
93
+
94
+
95
def ingest_bulk_conversations(zip_or_json_path: str | Path) -> Iterator[Segment]:
    """Ingest the bulk conversations.json from a ChatGPT data export.

    Args:
        zip_or_json_path: Either the export ``.zip`` (which contains a
            ``conversations.json`` entry) or an already-extracted
            ``conversations.json`` file.

    Yields:
        Segment: one per non-empty, non-system message across all
        conversations; ``byte_range`` is a per-conversation running offset
        over concatenated message texts.
    """
    path = Path(zip_or_json_path)

    if path.suffix == ".zip":
        import zipfile
        with zipfile.ZipFile(path) as z, z.open("conversations.json") as f:
            conversations = json.load(f)
    else:
        with open(path, encoding="utf-8") as f:
            conversations = json.load(f)

    # One hash for the whole archive file; every conversation's provenance
    # shares it.
    file_hash = sha256(path.read_bytes()).hexdigest()

    for conv in conversations:
        conv_id = conv.get("id") or conv.get("conversation_id") or "unknown"
        # Real exports contain "title": null — `or` also covers the
        # present-but-null case that a plain .get() default would miss.
        title = conv.get("title") or "untitled"
        create_time = conv.get("create_time")
        timestamp = (
            datetime.fromtimestamp(create_time, tz=timezone.utc).isoformat()
            if create_time else "unknown"
        )

        provenance = SourceProvenance(
            source_id=f"chat:{conv_id}",
            tier=Tier.T3,
            url=f"chatgpt-export/{title[:60]}",
            commit_or_version=timestamp,
            license="proprietary",
            acquired_at=datetime.now(timezone.utc).isoformat(),
            artifact_sha256=file_hash,
        )

        # "mapping" may also be null in malformed/empty conversations.
        mapping = conv.get("mapping") or {}
        offset = 0

        for node in mapping.values():
            msg = node.get("message")
            if not msg:
                continue

            role = msg.get("author", {}).get("role", "unknown")
            if role == "system":
                continue

            # Non-string parts (e.g. multimodal payload dicts) are ignored.
            parts = msg.get("content", {}).get("parts", [])
            text = "".join(p for p in parts if isinstance(p, str)).strip()
            if not text:
                continue

            byte_start = offset
            byte_end = offset + len(text.encode("utf-8"))
            offset = byte_end

            yield Segment(
                source=provenance,
                byte_range=(byte_start, byte_end),
                text=text,
                segment_type=f"chat_{role}",
            )