LisaMegaWatts committed on
Commit
d97776b
·
verified ·
1 Parent(s): bc4e57c

Upload pipeline.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. pipeline.py +392 -0
pipeline.py ADDED
@@ -0,0 +1,392 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Text Processing Pipeline for MicroGPT Training
3
+
4
+ Drag-and-drop zip/epub/txt files into inbox/ and run this script
5
+ to parse, clean, chunk, and split them into train.txt/val.txt.
6
+
7
+ Usage:
8
+ python pipeline.py # Process inbox and rebuild output
9
+ python pipeline.py --rebuild # Only rebuild train/val from existing parsed chunks
10
+ python pipeline.py --stats # Show corpus statistics
11
+ python pipeline.py --push # Rebuild and push to HuggingFace
12
+ """
13
+
14
+ import argparse
15
+ import json
16
+ import logging
17
+ import random
18
+ import sys
19
+ from datetime import datetime
20
+ from pathlib import Path
21
+
22
+ import yaml
23
+
24
+ from cleaner import TextCleaner
25
+ from chunker import TextChunker
26
+ from parsers.txt_parser import parse_txt
27
+ from parsers.epub_parser import parse_epub
28
+ from parsers.zip_parser import parse_zip
29
+
30
# Directory containing this script; configured paths are resolved against it.
SCRIPT_DIR = Path(__file__).resolve().parent

# Parser callables keyed by lower-cased file suffix (leading dot included).
PARSERS = {".txt": parse_txt, ".epub": parse_epub, ".zip": parse_zip}
37
+
38
+
39
+ class Pipeline:
40
+ """Main text processing pipeline for MicroGPT training data."""
41
+
42
+ def __init__(self, config_path: Path | None = None):
43
+ if config_path is None:
44
+ config_path = SCRIPT_DIR / "config.yaml"
45
+
46
+ with open(config_path) as f:
47
+ self.config = yaml.safe_load(f)
48
+
49
+ # Resolve paths relative to script directory
50
+ paths = self.config["paths"]
51
+ self.inbox = SCRIPT_DIR / paths["inbox"]
52
+ self.output = SCRIPT_DIR / paths["output"]
53
+ self.archive = SCRIPT_DIR / paths["archive"]
54
+ self.logs = SCRIPT_DIR / paths["logs"]
55
+ self.parsed = SCRIPT_DIR / paths["parsed"]
56
+ self.manifest_path = SCRIPT_DIR / "processed_files.json"
57
+
58
+ # Create directories
59
+ for d in [self.inbox, self.output, self.archive, self.logs, self.parsed]:
60
+ d.mkdir(parents=True, exist_ok=True)
61
+
62
+ # Initialize components
63
+ self.cleaner = TextCleaner(self.config["cleaning"])
64
+ self.chunker = TextChunker(self.config["chunking"])
65
+
66
+ # Setup logging
67
+ self._setup_logging()
68
+
69
+ # Load manifest
70
+ self.manifest = self._load_manifest()
71
+
72
+ def _setup_logging(self):
73
+ log_file = self.logs / f"pipeline_{datetime.now():%Y%m%d}.log"
74
+ logging.basicConfig(
75
+ level=logging.INFO,
76
+ format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
77
+ handlers=[
78
+ logging.FileHandler(log_file, encoding="utf-8"),
79
+ logging.StreamHandler(sys.stdout),
80
+ ],
81
+ )
82
+ self.logger = logging.getLogger("pipeline")
83
+
84
+ def _load_manifest(self) -> dict:
85
+ if self.manifest_path.exists():
86
+ return json.loads(self.manifest_path.read_text(encoding="utf-8"))
87
+ return {"processed_files": []}
88
+
89
+ def _save_manifest(self):
90
+ self.manifest_path.write_text(
91
+ json.dumps(self.manifest, indent=2, ensure_ascii=False),
92
+ encoding="utf-8",
93
+ )
94
+
95
+ def process_file(self, filepath: Path) -> list[str]:
96
+ """Process a single file through parse -> clean -> chunk.
97
+
98
+ Args:
99
+ filepath: Path to the input file.
100
+
101
+ Returns:
102
+ List of text chunks ready for training.
103
+ """
104
+ ext = filepath.suffix.lower()
105
+ parser = PARSERS.get(ext)
106
+ if parser is None:
107
+ self.logger.warning("Unsupported file type: %s (%s)", filepath.name, ext)
108
+ return []
109
+
110
+ self.logger.info("Parsing %s ...", filepath.name)
111
+ try:
112
+ raw_text = parser(filepath)
113
+ except Exception as e:
114
+ self.logger.error("Parse error for %s: %s", filepath.name, e)
115
+ return []
116
+
117
+ if not raw_text.strip():
118
+ self.logger.warning("No text extracted from %s", filepath.name)
119
+ return []
120
+
121
+ self.logger.info(" Raw text: %d chars", len(raw_text))
122
+
123
+ # Clean
124
+ cleaned = self.cleaner.clean(raw_text)
125
+ self.logger.info(" Cleaned text: %d chars", len(cleaned))
126
+
127
+ if not cleaned:
128
+ self.logger.warning(" No text remaining after cleaning for %s", filepath.name)
129
+ return []
130
+
131
+ # Chunk
132
+ chunks = self.chunker.chunk(cleaned)
133
+ self.logger.info(" Chunks: %d (max %d chars each)", len(chunks), self.config["chunking"]["max_chars"])
134
+
135
+ return chunks
136
+
137
+ def process_inbox(self) -> int:
138
+ """Process all files in the inbox directory.
139
+
140
+ Returns:
141
+ Total number of new chunks added.
142
+ """
143
+ files = sorted(
144
+ f for f in self.inbox.iterdir()
145
+ if f.is_file() and f.suffix.lower() in PARSERS and not f.name.startswith(".")
146
+ )
147
+
148
+ if not files:
149
+ self.logger.info("No files to process in inbox/")
150
+ return 0
151
+
152
+ self.logger.info("Found %d file(s) in inbox/", len(files))
153
+ total_chunks = 0
154
+
155
+ for filepath in files:
156
+ chunks = self.process_file(filepath)
157
+
158
+ if chunks:
159
+ # Save chunks to parsed/ directory
160
+ slug = filepath.stem.replace(" ", "_").lower()
161
+ parsed_file = self.parsed / f"{slug}.txt"
162
+
163
+ # Handle name collisions
164
+ counter = 1
165
+ while parsed_file.exists():
166
+ parsed_file = self.parsed / f"{slug}_{counter}.txt"
167
+ counter += 1
168
+
169
+ parsed_file.write_text("\n".join(chunks), encoding="utf-8")
170
+ total_chunks += len(chunks)
171
+
172
+ self.logger.info(" Saved %d chunks to %s", len(chunks), parsed_file.name)
173
+
174
+ # Record in manifest
175
+ self.manifest["processed_files"].append({
176
+ "source": filepath.name,
177
+ "parsed_file": parsed_file.name,
178
+ "chunks": len(chunks),
179
+ "timestamp": datetime.now().isoformat(),
180
+ })
181
+
182
+ # Move to archive
183
+ archive_dest = self.archive / filepath.name
184
+ counter = 1
185
+ while archive_dest.exists():
186
+ archive_dest = self.archive / f"{filepath.stem}_{counter}{filepath.suffix}"
187
+ counter += 1
188
+
189
+ filepath.rename(archive_dest)
190
+ self.logger.info(" Archived %s -> %s", filepath.name, archive_dest.name)
191
+
192
+ self._save_manifest()
193
+ self.logger.info("Processed %d file(s), %d total new chunks", len(files), total_chunks)
194
+ return total_chunks
195
+
196
+ def rebuild_output(self) -> tuple[int, int]:
197
+ """Rebuild train.txt and val.txt from all parsed chunks.
198
+
199
+ Returns:
200
+ Tuple of (train_count, val_count).
201
+ """
202
+ # Collect all chunks from parsed/ directory
203
+ all_chunks = []
204
+ parsed_files = sorted(self.parsed.glob("*.txt"))
205
+
206
+ for pf in parsed_files:
207
+ lines = [
208
+ line.strip()
209
+ for line in pf.read_text(encoding="utf-8").splitlines()
210
+ if line.strip()
211
+ ]
212
+ all_chunks.extend(lines)
213
+ self.logger.info(" Loaded %d chunks from %s", len(lines), pf.name)
214
+
215
+ if not all_chunks:
216
+ self.logger.warning("No chunks found in parsed/ directory")
217
+ return 0, 0
218
+
219
+ # Shuffle and split
220
+ split_config = self.config["splitting"]
221
+ rng = random.Random(split_config.get("seed", 42))
222
+ if split_config.get("shuffle", True):
223
+ rng.shuffle(all_chunks)
224
+
225
+ train_ratio = split_config.get("train_ratio", 0.9)
226
+ split_idx = int(len(all_chunks) * train_ratio)
227
+ train_chunks = all_chunks[:split_idx]
228
+ val_chunks = all_chunks[split_idx:]
229
+
230
+ # Write output files
231
+ train_path = self.output / "train.txt"
232
+ val_path = self.output / "val.txt"
233
+ train_path.write_text("\n".join(train_chunks), encoding="utf-8")
234
+ val_path.write_text("\n".join(val_chunks), encoding="utf-8")
235
+
236
+ self.logger.info(
237
+ "Output: %d train chunks (%s), %d val chunks (%s)",
238
+ len(train_chunks), train_path.name,
239
+ len(val_chunks), val_path.name,
240
+ )
241
+
242
+ return len(train_chunks), len(val_chunks)
243
+
244
+ def push_to_hub(self, repo_id: str | None = None) -> str:
245
+ """Push train/val data to HuggingFace Hub as a dataset.
246
+
247
+ Args:
248
+ repo_id: HuggingFace repo (e.g. 'username/philosophy-corpus').
249
+ Falls back to config.yaml huggingface.repo_id.
250
+
251
+ Returns:
252
+ The repo URL.
253
+ """
254
+ from datasets import Dataset, DatasetDict
255
+
256
+ if repo_id is None:
257
+ hf_config = self.config.get("huggingface", {})
258
+ repo_id = hf_config.get("repo_id", "")
259
+
260
+ if not repo_id:
261
+ raise ValueError(
262
+ "No HuggingFace repo_id provided. Set it in config.yaml "
263
+ "under huggingface.repo_id or pass --hf-repo."
264
+ )
265
+
266
+ train_path = self.output / "train.txt"
267
+ val_path = self.output / "val.txt"
268
+
269
+ if not train_path.exists() or not val_path.exists():
270
+ raise FileNotFoundError(
271
+ "train.txt/val.txt not found. Run the pipeline first."
272
+ )
273
+
274
+ self.logger.info("Preparing dataset for HuggingFace Hub...")
275
+
276
+ def load_chunks(path: Path) -> list[dict]:
277
+ lines = [
278
+ l.strip()
279
+ for l in path.read_text(encoding="utf-8").splitlines()
280
+ if l.strip()
281
+ ]
282
+ return [{"text": line} for line in lines]
283
+
284
+ train_data = load_chunks(train_path)
285
+ val_data = load_chunks(val_path)
286
+
287
+ ds = DatasetDict({
288
+ "train": Dataset.from_list(train_data),
289
+ "validation": Dataset.from_list(val_data),
290
+ })
291
+
292
+ self.logger.info(
293
+ "Pushing to %s: %d train / %d val examples",
294
+ repo_id, len(train_data), len(val_data),
295
+ )
296
+
297
+ ds.push_to_hub(repo_id)
298
+
299
+ url = f"https://huggingface.co/datasets/{repo_id}"
300
+ self.logger.info("Dataset pushed: %s", url)
301
+ return url
302
+
303
+ def stats(self):
304
+ """Print corpus statistics."""
305
+ parsed_files = sorted(self.parsed.glob("*.txt"))
306
+ total_chunks = 0
307
+ total_chars = 0
308
+
309
+ print("\n=== Corpus Statistics ===\n")
310
+ print(f"{'File':<40} {'Chunks':>8} {'Chars':>10}")
311
+ print("-" * 60)
312
+
313
+ for pf in parsed_files:
314
+ lines = [l for l in pf.read_text(encoding="utf-8").splitlines() if l.strip()]
315
+ chars = sum(len(l) for l in lines)
316
+ total_chunks += len(lines)
317
+ total_chars += chars
318
+ print(f"{pf.name:<40} {len(lines):>8} {chars:>10}")
319
+
320
+ print("-" * 60)
321
+ print(f"{'TOTAL':<40} {total_chunks:>8} {total_chars:>10}")
322
+
323
+ if total_chunks > 0:
324
+ avg = total_chars / total_chunks
325
+ print(f"\nAverage chunk length: {avg:.0f} chars")
326
+
327
+ # Check output files
328
+ train_path = self.output / "train.txt"
329
+ val_path = self.output / "val.txt"
330
+ if train_path.exists() and val_path.exists():
331
+ train_lines = len([l for l in train_path.read_text(encoding="utf-8").splitlines() if l.strip()])
332
+ val_lines = len([l for l in val_path.read_text(encoding="utf-8").splitlines() if l.strip()])
333
+ print(f"\nOutput split: {train_lines} train / {val_lines} val")
334
+ else:
335
+ print("\nNo output files yet. Run pipeline to generate train.txt/val.txt")
336
+
337
+ # Vocabulary check
338
+ if train_path.exists():
339
+ text = train_path.read_text(encoding="utf-8")
340
+ vocab = sorted(set(text) - {"\n"})
341
+ print(f"Vocabulary: {len(vocab)} chars -> {''.join(vocab)}")
342
+
343
+ print()
344
+
345
+
346
def main():
    """Command-line entry point for the pipeline.

    Modes, checked in this order: ``--stats`` prints corpus statistics;
    ``--push`` rebuilds train/val and pushes them to HuggingFace Hub;
    ``--rebuild`` only rebuilds train/val from existing parsed chunks.
    With none of these flags the inbox is processed and the output rebuilt.
    """
    parser = argparse.ArgumentParser(description="MicroGPT Text Processing Pipeline")
    parser.add_argument("--rebuild", action="store_true", help="Only rebuild train/val from existing parsed chunks")
    parser.add_argument("--stats", action="store_true", help="Show corpus statistics")
    parser.add_argument("--push", action="store_true", help="Rebuild and push dataset to HuggingFace Hub")
    parser.add_argument("--hf-repo", type=str, default=None, help="HuggingFace repo ID (e.g. username/dataset)")
    parser.add_argument("--config", type=str, default=None, help="Path to config.yaml")
    args = parser.parse_args()

    config_path = Path(args.config) if args.config else None
    pipeline = Pipeline(config_path)

    if args.stats:
        pipeline.stats()
        return

    if args.push:
        print("Rebuilding output...")
        train_n, val_n = pipeline.rebuild_output()
        print(f"Output: {train_n} train / {val_n} val chunks")
        print("Pushing to HuggingFace Hub...")
        url = pipeline.push_to_hub(repo_id=args.hf_repo)
        print(f"Dataset available at: {url}")
        return

    if args.rebuild:
        print("Rebuilding output from existing parsed chunks...")
        train_n, val_n = pipeline.rebuild_output()
        print(f"Done: {train_n} train / {val_n} val chunks")
        return

    # Default: process inbox then rebuild
    print("Processing inbox...")
    new_chunks = pipeline.process_inbox()

    print("Rebuilding output...")
    train_n, val_n = pipeline.rebuild_output()

    separator = "=" * 50
    print(f"\n{separator}")
    print(f"New chunks added: {new_chunks}")
    print(f"Total output: {train_n} train / {val_n} val chunks")
    # Report the configured output directory instead of assuming "output/".
    print(f"Files: {pipeline.output / 'train.txt'}, {pipeline.output / 'val.txt'}")
    print(separator)
389
+
390
+
391
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()