| """ |
| Text Processing Pipeline for MicroGPT Training |
| |
| Drag-and-drop zip/epub/txt files into inbox/ and run this script |
| to parse, clean, chunk, and split them into train.txt/val.txt. |
| |
| Usage: |
| python pipeline.py # Process inbox and rebuild output |
| python pipeline.py --rebuild # Only rebuild train/val from existing parsed chunks |
| python pipeline.py --stats # Show corpus statistics |
| python pipeline.py --push # Rebuild and push to HuggingFace |
| """ |
|
|
| import argparse |
| import json |
| import logging |
| import random |
| import sys |
| from datetime import datetime |
| from pathlib import Path |
|
|
| import yaml |
|
|
| from cleaner import TextCleaner |
| from chunker import TextChunker |
| from parsers.txt_parser import parse_txt |
| from parsers.epub_parser import parse_epub |
| from parsers.zip_parser import parse_zip |
|
|
# Directory containing this script; the paths in config.yaml are anchored here.
SCRIPT_DIR = Path(__file__).resolve().parents[0]

# Lower-case file extension -> callable that extracts raw text from such a file.
PARSERS = dict(
    [
        (".txt", parse_txt),
        (".epub", parse_epub),
        (".zip", parse_zip),
    ]
)
|
|
|
|
| class Pipeline: |
| """Main text processing pipeline for MicroGPT training data.""" |
|
|
| def __init__(self, config_path: Path | None = None): |
| if config_path is None: |
| config_path = SCRIPT_DIR / "config.yaml" |
|
|
| with open(config_path) as f: |
| self.config = yaml.safe_load(f) |
|
|
| |
| paths = self.config["paths"] |
| self.inbox = SCRIPT_DIR / paths["inbox"] |
| self.output = SCRIPT_DIR / paths["output"] |
| self.archive = SCRIPT_DIR / paths["archive"] |
| self.logs = SCRIPT_DIR / paths["logs"] |
| self.parsed = SCRIPT_DIR / paths["parsed"] |
| self.manifest_path = SCRIPT_DIR / "processed_files.json" |
|
|
| |
| for d in [self.inbox, self.output, self.archive, self.logs, self.parsed]: |
| d.mkdir(parents=True, exist_ok=True) |
|
|
| |
| self.cleaner = TextCleaner(self.config["cleaning"]) |
| self.chunker = TextChunker(self.config["chunking"]) |
|
|
| |
| self._setup_logging() |
|
|
| |
| self.manifest = self._load_manifest() |
|
|
| def _setup_logging(self): |
| log_file = self.logs / f"pipeline_{datetime.now():%Y%m%d}.log" |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", |
| handlers=[ |
| logging.FileHandler(log_file, encoding="utf-8"), |
| logging.StreamHandler(sys.stdout), |
| ], |
| ) |
| self.logger = logging.getLogger("pipeline") |
|
|
| def _load_manifest(self) -> dict: |
| if self.manifest_path.exists(): |
| return json.loads(self.manifest_path.read_text(encoding="utf-8")) |
| return {"processed_files": []} |
|
|
| def _save_manifest(self): |
| self.manifest_path.write_text( |
| json.dumps(self.manifest, indent=2, ensure_ascii=False), |
| encoding="utf-8", |
| ) |
|
|
| def process_file(self, filepath: Path) -> list[str]: |
| """Process a single file through parse -> clean -> chunk. |
| |
| Args: |
| filepath: Path to the input file. |
| |
| Returns: |
| List of text chunks ready for training. |
| """ |
| ext = filepath.suffix.lower() |
| parser = PARSERS.get(ext) |
| if parser is None: |
| self.logger.warning("Unsupported file type: %s (%s)", filepath.name, ext) |
| return [] |
|
|
| self.logger.info("Parsing %s ...", filepath.name) |
| try: |
| raw_text = parser(filepath) |
| except Exception as e: |
| self.logger.error("Parse error for %s: %s", filepath.name, e) |
| return [] |
|
|
| if not raw_text.strip(): |
| self.logger.warning("No text extracted from %s", filepath.name) |
| return [] |
|
|
| self.logger.info(" Raw text: %d chars", len(raw_text)) |
|
|
| |
| cleaned = self.cleaner.clean(raw_text) |
| self.logger.info(" Cleaned text: %d chars", len(cleaned)) |
|
|
| if not cleaned: |
| self.logger.warning(" No text remaining after cleaning for %s", filepath.name) |
| return [] |
|
|
| |
| chunks = self.chunker.chunk(cleaned) |
| self.logger.info(" Chunks: %d (max %d chars each)", len(chunks), self.config["chunking"]["max_chars"]) |
|
|
| return chunks |
|
|
| def process_inbox(self) -> int: |
| """Process all files in the inbox directory. |
| |
| Returns: |
| Total number of new chunks added. |
| """ |
| files = sorted( |
| f for f in self.inbox.iterdir() |
| if f.is_file() and f.suffix.lower() in PARSERS and not f.name.startswith(".") |
| ) |
|
|
| if not files: |
| self.logger.info("No files to process in inbox/") |
| return 0 |
|
|
| self.logger.info("Found %d file(s) in inbox/", len(files)) |
| total_chunks = 0 |
|
|
| for filepath in files: |
| chunks = self.process_file(filepath) |
|
|
| if chunks: |
| |
| slug = filepath.stem.replace(" ", "_").lower() |
| parsed_file = self.parsed / f"{slug}.txt" |
|
|
| |
| counter = 1 |
| while parsed_file.exists(): |
| parsed_file = self.parsed / f"{slug}_{counter}.txt" |
| counter += 1 |
|
|
| parsed_file.write_text("\n".join(chunks), encoding="utf-8") |
| total_chunks += len(chunks) |
|
|
| self.logger.info(" Saved %d chunks to %s", len(chunks), parsed_file.name) |
|
|
| |
| self.manifest["processed_files"].append({ |
| "source": filepath.name, |
| "parsed_file": parsed_file.name, |
| "chunks": len(chunks), |
| "timestamp": datetime.now().isoformat(), |
| }) |
|
|
| |
| archive_dest = self.archive / filepath.name |
| counter = 1 |
| while archive_dest.exists(): |
| archive_dest = self.archive / f"{filepath.stem}_{counter}{filepath.suffix}" |
| counter += 1 |
|
|
| filepath.rename(archive_dest) |
| self.logger.info(" Archived %s -> %s", filepath.name, archive_dest.name) |
|
|
| self._save_manifest() |
| self.logger.info("Processed %d file(s), %d total new chunks", len(files), total_chunks) |
| return total_chunks |
|
|
| def rebuild_output(self) -> tuple[int, int]: |
| """Rebuild train.txt and val.txt from all parsed chunks. |
| |
| Returns: |
| Tuple of (train_count, val_count). |
| """ |
| |
| all_chunks = [] |
| parsed_files = sorted(self.parsed.glob("*.txt")) |
|
|
| for pf in parsed_files: |
| lines = [ |
| line.strip() |
| for line in pf.read_text(encoding="utf-8").splitlines() |
| if line.strip() |
| ] |
| all_chunks.extend(lines) |
| self.logger.info(" Loaded %d chunks from %s", len(lines), pf.name) |
|
|
| if not all_chunks: |
| self.logger.warning("No chunks found in parsed/ directory") |
| return 0, 0 |
|
|
| |
| split_config = self.config["splitting"] |
| rng = random.Random(split_config.get("seed", 42)) |
| if split_config.get("shuffle", True): |
| rng.shuffle(all_chunks) |
|
|
| train_ratio = split_config.get("train_ratio", 0.9) |
| split_idx = int(len(all_chunks) * train_ratio) |
| train_chunks = all_chunks[:split_idx] |
| val_chunks = all_chunks[split_idx:] |
|
|
| |
| train_path = self.output / "train.txt" |
| val_path = self.output / "val.txt" |
| train_path.write_text("\n".join(train_chunks), encoding="utf-8") |
| val_path.write_text("\n".join(val_chunks), encoding="utf-8") |
|
|
| self.logger.info( |
| "Output: %d train chunks (%s), %d val chunks (%s)", |
| len(train_chunks), train_path.name, |
| len(val_chunks), val_path.name, |
| ) |
|
|
| return len(train_chunks), len(val_chunks) |
|
|
| def push_to_hub(self, repo_id: str | None = None) -> str: |
| """Push train/val data to HuggingFace Hub as a dataset. |
| |
| Args: |
| repo_id: HuggingFace repo (e.g. 'username/philosophy-corpus'). |
| Falls back to config.yaml huggingface.repo_id. |
| |
| Returns: |
| The repo URL. |
| """ |
| from datasets import Dataset, DatasetDict |
|
|
| if repo_id is None: |
| hf_config = self.config.get("huggingface", {}) |
| repo_id = hf_config.get("repo_id", "") |
|
|
| if not repo_id: |
| raise ValueError( |
| "No HuggingFace repo_id provided. Set it in config.yaml " |
| "under huggingface.repo_id or pass --hf-repo." |
| ) |
|
|
| train_path = self.output / "train.txt" |
| val_path = self.output / "val.txt" |
|
|
| if not train_path.exists() or not val_path.exists(): |
| raise FileNotFoundError( |
| "train.txt/val.txt not found. Run the pipeline first." |
| ) |
|
|
| self.logger.info("Preparing dataset for HuggingFace Hub...") |
|
|
| def load_chunks(path: Path) -> list[dict]: |
| lines = [ |
| l.strip() |
| for l in path.read_text(encoding="utf-8").splitlines() |
| if l.strip() |
| ] |
| return [{"text": line} for line in lines] |
|
|
| train_data = load_chunks(train_path) |
| val_data = load_chunks(val_path) |
|
|
| ds = DatasetDict({ |
| "train": Dataset.from_list(train_data), |
| "validation": Dataset.from_list(val_data), |
| }) |
|
|
| self.logger.info( |
| "Pushing to %s: %d train / %d val examples", |
| repo_id, len(train_data), len(val_data), |
| ) |
|
|
| ds.push_to_hub(repo_id) |
|
|
| url = f"https://huggingface.co/datasets/{repo_id}" |
| self.logger.info("Dataset pushed: %s", url) |
| return url |
|
|
| def stats(self): |
| """Print corpus statistics.""" |
| parsed_files = sorted(self.parsed.glob("*.txt")) |
| total_chunks = 0 |
| total_chars = 0 |
|
|
| print("\n=== Corpus Statistics ===\n") |
| print(f"{'File':<40} {'Chunks':>8} {'Chars':>10}") |
| print("-" * 60) |
|
|
| for pf in parsed_files: |
| lines = [l for l in pf.read_text(encoding="utf-8").splitlines() if l.strip()] |
| chars = sum(len(l) for l in lines) |
| total_chunks += len(lines) |
| total_chars += chars |
| print(f"{pf.name:<40} {len(lines):>8} {chars:>10}") |
|
|
| print("-" * 60) |
| print(f"{'TOTAL':<40} {total_chunks:>8} {total_chars:>10}") |
|
|
| if total_chunks > 0: |
| avg = total_chars / total_chunks |
| print(f"\nAverage chunk length: {avg:.0f} chars") |
|
|
| |
| train_path = self.output / "train.txt" |
| val_path = self.output / "val.txt" |
| if train_path.exists() and val_path.exists(): |
| train_lines = len([l for l in train_path.read_text(encoding="utf-8").splitlines() if l.strip()]) |
| val_lines = len([l for l in val_path.read_text(encoding="utf-8").splitlines() if l.strip()]) |
| print(f"\nOutput split: {train_lines} train / {val_lines} val") |
| else: |
| print("\nNo output files yet. Run pipeline to generate train.txt/val.txt") |
|
|
| |
| if train_path.exists(): |
| text = train_path.read_text(encoding="utf-8") |
| vocab = sorted(set(text) - {"\n"}) |
| print(f"Vocabulary: {len(vocab)} chars -> {''.join(vocab)}") |
|
|
| print() |
|
|
|
|
def main():
    """Command-line entry point for the pipeline.

    Mode flags are checked in the order --stats, --push, --rebuild, and only
    the first one set runs. With no mode flag, the inbox is processed and
    train/val are rebuilt.

    Exits with status 1 when --push cannot proceed: no chunks, no repo id, or
    missing output files.
    """
    parser = argparse.ArgumentParser(description="MicroGPT Text Processing Pipeline")
    parser.add_argument("--rebuild", action="store_true", help="Only rebuild train/val from existing parsed chunks")
    parser.add_argument("--stats", action="store_true", help="Show corpus statistics")
    parser.add_argument("--push", action="store_true", help="Rebuild and push dataset to HuggingFace Hub")
    parser.add_argument("--hf-repo", type=str, default=None, help="HuggingFace repo ID (e.g. username/dataset)")
    parser.add_argument("--config", type=str, default=None, help="Path to config.yaml")
    args = parser.parse_args()

    config_path = Path(args.config) if args.config else None
    pipeline = Pipeline(config_path)

    if args.stats:
        pipeline.stats()
        return

    if args.push:
        print("Rebuilding output...")
        train_n, val_n = pipeline.rebuild_output()
        print(f"Output: {train_n} train / {val_n} val chunks")
        if train_n + val_n == 0:
            # rebuild_output writes nothing when there are no parsed chunks,
            # so any train.txt/val.txt on disk are stale; don't publish them.
            print("No chunks to push; aborting.", file=sys.stderr)
            sys.exit(1)
        print("Pushing to HuggingFace Hub...")
        try:
            url = pipeline.push_to_hub(repo_id=args.hf_repo)
        except (ValueError, FileNotFoundError) as e:
            # Missing repo id / output files: report cleanly, not as a traceback.
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        print(f"Dataset available at: {url}")
        return

    if args.rebuild:
        print("Rebuilding output from existing parsed chunks...")
        train_n, val_n = pipeline.rebuild_output()
        print(f"Done: {train_n} train / {val_n} val chunks")
        return

    print("Processing inbox...")
    new_chunks = pipeline.process_inbox()

    print("Rebuilding output...")
    train_n, val_n = pipeline.rebuild_output()

    print(f"\n{'='*50}")
    print(f"New chunks added: {new_chunks}")
    print(f"Total output: {train_n} train / {val_n} val chunks")
    # The output directory comes from config.yaml, so report the real paths.
    print(f"Files: {pipeline.output / 'train.txt'}, {pipeline.output / 'val.txt'}")
    print(f"{'='*50}")


if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()
|
|