#!/usr/bin/env python3
"""
Training Data Collection Pipeline for Crowe Logic Mini

Target: 1-2 billion tokens from scientific and domain-specific sources

Data sources:
1. Public datasets (The Pile, RedPajama, arXiv, Wikipedia) - 1.5B tokens
2. Domain-specific scraping (mycology, drug discovery) - 200M tokens
3. Proprietary data (Southwest Mushrooms, CrowLogic) - 20M tokens
4. Curated examples - 30M tokens
"""

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional


@dataclass
class DataSource:
    """Configuration for a single training-data source."""
    name: str
    url: Optional[str]            # None when the source has no single URL
    estimated_tokens: int         # rough token budget for this source
    priority: str                 # "critical", "high", "medium", "low"
    collection_method: str        # "download", "api", "scrape", "manual"
    status: str = "pending"


class DataCollectionPipeline:
    """Automated pipeline to collect 1-2B tokens of training data.

    Most methods print step-by-step manual instructions (wget commands,
    directory layouts) rather than performing the downloads themselves,
    and return a status dict describing what still needs to be done.
    """

    def __init__(self, output_dir: str = "./data/raw",
                 target_tokens: int = 1_500_000_000):
        """Create the pipeline and ensure the output directory exists.

        Args:
            output_dir: Root directory where raw data is stored.
            target_tokens: Overall token-count goal for the corpus.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.target_tokens = target_tokens
        # Running total of tokens found on disk; refreshed by
        # check_existing_data().
        self.collected_tokens = 0
        self.sources = self._define_data_sources()

    def _define_data_sources(self) -> List[DataSource]:
        """Define all data sources with metadata."""
        return [
            # ===== PUBLIC DATASETS (Automated) =====
            DataSource(
                name="Wikipedia Science",
                url="https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2",
                estimated_tokens=500_000_000,
                priority="critical",
                collection_method="download"
            ),
            DataSource(
                name="arXiv Papers",
                url="s3://arxiv/",  # AWS S3 bucket
                estimated_tokens=300_000_000,
                priority="critical",
                collection_method="download"
            ),
            DataSource(
                name="The Pile - arXiv subset",
                url="https://the-eye.eu/public/AI/pile/train/",
                estimated_tokens=200_000_000,
                priority="high",
                collection_method="download"
            ),
            DataSource(
                name="The Pile - PubMed subset",
                url="https://the-eye.eu/public/AI/pile/train/",
                estimated_tokens=150_000_000,
                priority="high",
                collection_method="download"
            ),
            DataSource(
                name="PubMed Abstracts",
                url="https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/",
                estimated_tokens=200_000_000,
                priority="high",
                collection_method="download"
            ),
            DataSource(
                name="RedPajama - Wikipedia",
                url="https://data.together.xyz/redpajama-data-1T/v1.0.0/",
                estimated_tokens=100_000_000,
                priority="medium",
                collection_method="download"
            ),
            # ===== DOMAIN-SPECIFIC SOURCES =====
            DataSource(
                name="Mycology Literature",
                url=None,  # Multiple sources
                estimated_tokens=50_000_000,
                priority="critical",
                collection_method="scrape"
            ),
            DataSource(
                name="Drug Discovery Papers",
                url="https://www.ebi.ac.uk/chembl/",
                estimated_tokens=50_000_000,
                priority="critical",
                collection_method="api"
            ),
            DataSource(
                name="AI/ML Papers (arXiv cs.AI)",
                url="https://arxiv.org/list/cs.AI/recent",
                estimated_tokens=100_000_000,
                priority="high",
                collection_method="api"
            ),
            DataSource(
                name="GitHub AI Documentation",
                url="https://github.com/",
                estimated_tokens=50_000_000,
                priority="medium",
                collection_method="api"
            ),
            # ===== PROPRIETARY DATA =====
            DataSource(
                name="Southwest Mushrooms Data",
                url=None,
                estimated_tokens=10_000_000,
                priority="critical",
                collection_method="manual"
            ),
            DataSource(
                name="CrowLogic Documentation",
                url=None,
                estimated_tokens=5_000_000,
                priority="critical",
                collection_method="manual"
            ),
            DataSource(
                name="Prologic Methodology Examples",
                url=None,
                estimated_tokens=5_000_000,
                priority="critical",
                collection_method="manual"
            ),
            # ===== CURATED EXAMPLES =====
            DataSource(
                name="Chain-of-Thought Examples",
                url=None,
                estimated_tokens=10_000_000,
                priority="high",
                collection_method="manual"
            ),
            DataSource(
                name="Domain Q&A Pairs",
                url=None,
                estimated_tokens=20_000_000,
                priority="high",
                collection_method="manual"
            ),
        ]

    def download_wikipedia(self) -> Dict:
        """Print instructions for downloading and extracting the Wikipedia dump.

        Returns:
            Status dict with manual instructions and the token estimate.
        """
        print("\n" + "=" * 70)
        print("Downloading Wikipedia Science Articles")
        print("=" * 70)
        wiki_dir = self.output_dir / "wikipedia"
        wiki_dir.mkdir(exist_ok=True)
        # Download latest dump
        dump_url = "https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2"
        dump_file = wiki_dir / "enwiki-latest.xml.bz2"
        print(f"\nšŸ“„ Downloading from: {dump_url}")
        print(f" Destination: {dump_file}")
        print(" Size: ~20GB (compressed), ~80GB (uncompressed)")
        print("\nāš ļø This will take 1-4 hours depending on connection speed")
        print("\nCommands to run:")
        print(f" wget {dump_url} -O {dump_file}")
        print(f" python -m wikiextractor.WikiExtractor {dump_file} -o {wiki_dir / 'extracted'} --json")
        return {
            "status": "manual_steps_needed",
            "instructions": "Run wget and WikiExtractor commands above",
            "estimated_tokens": 500_000_000
        }

    def download_arxiv(self) -> Dict:
        """Print the available options for obtaining arXiv papers.

        Returns:
            Status dict with manual instructions and the token estimate.
        """
        print("\n" + "=" * 70)
        print("Downloading arXiv Papers")
        print("=" * 70)
        arxiv_dir = self.output_dir / "arxiv"
        arxiv_dir.mkdir(exist_ok=True)
        print("\nOptions for arXiv data:")
        print("\n1. Bulk download from S3 (recommended):")
        print(" aws s3 sync s3://arxiv/src/ ./data/raw/arxiv/ --no-sign-request")
        print("\n2. Use arXiv API:")
        print(" pip install arxiv")
        print(" python scripts/download_arxiv_api.py")
        print("\n3. Use existing preprocessed datasets:")
        print(" - RedPajama arXiv subset")
        print(" - The Pile arXiv subset")
        return {
            "status": "manual_steps_needed",
            "instructions": "Choose one of the methods above",
            "estimated_tokens": 300_000_000
        }

    def download_pubmed(self) -> Dict:
        """Print the wget command for fetching PubMed baseline abstracts.

        Returns:
            Status dict with manual instructions and the token estimate.
        """
        print("\n" + "=" * 70)
        print("Downloading PubMed Abstracts")
        print("=" * 70)
        pubmed_dir = self.output_dir / "pubmed"
        pubmed_dir.mkdir(exist_ok=True)
        base_url = "https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
        print("\nšŸ“„ PubMed Baseline Files")
        print(f" URL: {base_url}")
        print(" Files: pubmed24n*.xml.gz (1000+ files)")
        print(" Total size: ~30GB")
        print("\nCommand to download:")
        print(f" wget -r -np -nd -A 'pubmed24n*.xml.gz' {base_url} -P {pubmed_dir}")
        return {
            "status": "manual_steps_needed",
            "instructions": "Run wget command above",
            "estimated_tokens": 200_000_000
        }

    def download_the_pile_subset(self, subset: str = "arxiv") -> Dict:
        """Print download instructions for one subset of The Pile.

        Args:
            subset: Name of the Pile subset (used for the directory name
                and the printed instructions).

        Returns:
            Status dict with manual instructions and the token estimate.
        """
        print("\n" + "=" * 70)
        print(f"Downloading The Pile - {subset} subset")
        print("=" * 70)
        pile_dir = self.output_dir / "the_pile" / subset
        pile_dir.mkdir(parents=True, exist_ok=True)
        print("\nThe Pile subsets available:")
        print(" - ArXiv")
        print(" - PubMed Abstracts")
        print(" - PubMed Central")
        print(" - FreeLaw")
        print(" - USPTO Backgrounds")
        print(" - Wikipedia (en)")
        print("\nDownload from: https://the-eye.eu/public/AI/pile/train/")
        print(f"Save to: {pile_dir}")
        return {
            "status": "manual_steps_needed",
            "instructions": f"Download {subset} subset from The Pile",
            "estimated_tokens": 200_000_000
        }

    def create_mycology_corpus(self) -> Dict:
        """Print instructions for assembling the mycology domain corpus.

        Returns:
            Status dict with the target directory and the token estimate.
        """
        print("\n" + "=" * 70)
        print("Collecting Mycology Domain Data")
        print("=" * 70)
        myco_dir = self.output_dir / "mycology"
        myco_dir.mkdir(exist_ok=True)
        sources = {
            "MushroomExpert.com": "http://www.mushroomexpert.com/",
            "Shroomery": "https://www.shroomery.org/",
            "MycoWorks Papers": "Research papers on fungal materials",
            "Cultivation Guides": "Paul Stamets, Tradd Cotter books",
            "Scientific Papers": "Search PubMed/arXiv for mycology",
            "Southwest Mushrooms Data": "Your proprietary cultivation data",
        }
        print("\nMycology data sources:")
        for name, desc in sources.items():
            print(f" āœ“ {name}: {desc}")
        print(f"\nSave all mycology text to: {myco_dir}")
        print("\nRecommended structure:")
        print(" mycology/")
        print(" ā”œā”€ā”€ cultivation_guides.txt")
        print(" ā”œā”€ā”€ species_descriptions.txt")
        print(" ā”œā”€ā”€ scientific_papers.txt")
        print(" ā”œā”€ā”€ forum_discussions.txt")
        print(" └── southwest_mushrooms.txt")
        return {
            "status": "manual_collection_needed",
            "directory": str(myco_dir),
            "estimated_tokens": 50_000_000
        }

    def create_drug_discovery_corpus(self) -> Dict:
        """Print instructions for assembling the drug-discovery corpus.

        Returns:
            Status dict with the target directory and the token estimate.
        """
        print("\n" + "=" * 70)
        print("Collecting Drug Discovery Domain Data")
        print("=" * 70)
        drug_dir = self.output_dir / "drug_discovery"
        drug_dir.mkdir(exist_ok=True)
        sources = {
            "ChEMBL": "https://www.ebi.ac.uk/chembl/ (API available)",
            "PubChem": "https://pubchem.ncbi.nlm.nih.gov/",
            "DrugBank": "https://www.drugbank.com/",
            "Clinical Trials": "https://clinicaltrials.gov/",
            "Patents": "USPTO chemical patents",
            "Papers": "PubMed chemistry/pharmacology papers",
        }
        print("\nDrug discovery data sources:")
        for name, url in sources.items():
            print(f" āœ“ {name}: {url}")
        print(f"\nSave to: {drug_dir}")
        return {
            "status": "manual_collection_needed",
            "directory": str(drug_dir),
            "estimated_tokens": 50_000_000
        }

    def estimate_tokens(self, text_file: Path) -> int:
        """Estimate the token count in a text file from its byte size.

        Args:
            text_file: Path to the file to measure.

        Returns:
            Approximate token count; 0 when the file does not exist.
        """
        if not text_file.exists():
            return 0
        # Rough estimate: 1 token ā‰ˆ 0.75 words ā‰ˆ 4 characters
        file_size = text_file.stat().st_size
        estimated_tokens = file_size // 4
        return estimated_tokens

    def generate_collection_plan(self) -> Dict:
        """Generate and print a detailed, phased data collection plan.

        Returns:
            Plan dict with the overall token target and a list of phases,
            each carrying its own timeline, target, and sources.
        """
        print("\n" + "=" * 70)
        print("CROWE LOGIC MINI - DATA COLLECTION PLAN")
        print("Target: 1-2 Billion Tokens")
        print("=" * 70)
        plan = {
            "target_tokens": self.target_tokens,
            "phases": []
        }
        # Phase 1: Automated downloads (1 week)
        phase1 = {
            "name": "Phase 1: Public Datasets (Automated)",
            "timeline": "Week 1",
            "target_tokens": 1_200_000_000,
            "sources": [
                {"name": "Wikipedia", "tokens": 500_000_000, "time": "6-12 hours"},
                {"name": "arXiv", "tokens": 300_000_000, "time": "12-24 hours"},
                {"name": "PubMed", "tokens": 200_000_000, "time": "6-12 hours"},
                {"name": "The Pile subsets", "tokens": 200_000_000, "time": "6-12 hours"},
            ]
        }
        # Phase 2: Domain-specific collection (3-5 days)
        phase2 = {
            "name": "Phase 2: Domain-Specific Data",
            "timeline": "Week 2 (3-5 days)",
            "target_tokens": 200_000_000,
            "sources": [
                {"name": "Mycology", "tokens": 50_000_000, "method": "web scraping + papers"},
                {"name": "Drug Discovery", "tokens": 50_000_000, "method": "APIs + databases"},
                {"name": "AI/ML", "tokens": 100_000_000, "method": "arXiv subset + docs"},
            ]
        }
        # Phase 3: Proprietary data (1-2 days)
        phase3 = {
            "name": "Phase 3: Proprietary Data",
            "timeline": "Week 2 (1-2 days)",
            "target_tokens": 20_000_000,
            "sources": [
                {"name": "Southwest Mushrooms", "tokens": 10_000_000, "method": "extract from records"},
                {"name": "CrowLogic/CriOS", "tokens": 10_000_000, "method": "documentation"},
            ]
        }
        # Phase 4: Curated examples (2-3 days)
        phase4 = {
            "name": "Phase 4: Curated Examples",
            "timeline": "Week 2-3 (2-3 days)",
            "target_tokens": 30_000_000,
            "sources": [
                {"name": "Chain-of-thought", "tokens": 10_000_000, "method": "manual creation"},
                {"name": "Domain Q&A", "tokens": 20_000_000, "method": "curated + generated"},
            ]
        }
        plan["phases"] = [phase1, phase2, phase3, phase4]
        # Print plan
        for phase in plan["phases"]:
            print(f"\n{phase['name']}")
            print(f"Timeline: {phase['timeline']}")
            print(f"Target: {phase['target_tokens']:,} tokens")
            print("\nSources:")
            for source in phase["sources"]:
                print(f" āœ“ {source['name']}: {source['tokens']:,} tokens")
        total_tokens = sum(p["target_tokens"] for p in plan["phases"])
        print(f"\n{'='*70}")
        print(f"TOTAL: {total_tokens:,} tokens ({total_tokens/1e9:.1f}B)")
        print("Timeline: 2-3 weeks")
        print(f"{'='*70}")
        return plan

    def check_existing_data(self) -> Dict:
        """Scan the output directory and report data already collected.

        Counts tokens (via estimate_tokens) in every *.txt / *.json file
        under each immediate subdirectory of ``output_dir``, and updates
        ``self.collected_tokens`` with the grand total.

        Returns:
            Mapping of subdirectory name -> {"files": count, "tokens": count}
            for subdirectories containing any data.
        """
        print("\n" + "=" * 70)
        print("Checking Existing Data")
        print("=" * 70)
        collected: Dict = {}
        total_tokens = 0
        if not self.output_dir.exists():
            print("\nāš ļø No data directory found. Starting from scratch.")
            return collected
        for subdir in self.output_dir.iterdir():
            if subdir.is_dir():
                tokens = 0
                files = list(subdir.glob("**/*.txt")) + list(subdir.glob("**/*.json"))
                for f in files:
                    tokens += self.estimate_tokens(f)
                if tokens > 0:
                    collected[subdir.name] = {
                        "files": len(files),
                        "tokens": tokens
                    }
                    total_tokens += tokens
        if collected:
            print("\nāœ“ Found existing data:")
            for name, info in collected.items():
                print(f" {name}: {info['files']} files, ~{info['tokens']:,} tokens")
            print(f"\nTotal collected: ~{total_tokens:,} tokens ({total_tokens/1e9:.2f}B)")
        else:
            print("\nāš ļø No existing data found.")
        # Fix: keep the instance-level running total in sync with what is
        # actually on disk (previously computed but discarded).
        self.collected_tokens = total_tokens
        remaining = max(0, self.target_tokens - total_tokens)
        print(f"Remaining to collect: ~{remaining:,} tokens ({remaining/1e9:.2f}B)")
        return collected


def main():
    """Main execution: report existing data, print the plan and next steps."""
    print("\nšŸš€ Crowe Logic Mini - Training Data Collection Pipeline\n")
    pipeline = DataCollectionPipeline(
        output_dir="./data/raw",
        target_tokens=1_500_000_000  # 1.5B tokens
    )
    # Check existing data
    pipeline.check_existing_data()
    # Generate collection plan
    plan = pipeline.generate_collection_plan()
    # Provide next steps
    print("\n" + "=" * 70)
    print("šŸ“‹ NEXT STEPS")
    print("=" * 70)
    print("\n1. Review the collection plan above")
    print("2. Start with Phase 1 (automated downloads)")
    print("3. Run individual collection scripts:")
    print("\n python data_collection/download_wikipedia.py")
    print(" python data_collection/download_arxiv.py")
    print(" python data_collection/download_pubmed.py")
    print("\n4. For domain-specific data, see instructions above")
    print("5. Once data is collected, run preprocessing:")
    print("\n python data_collection/preprocess_training_data.py")
    print("\n6. Train tokenizer:")
    print("\n python tokenizer/build_scientific_tokenizer.py")
    print("\n" + "=" * 70)
    print("For detailed instructions, see: DATA_COLLECTION_GUIDE.md")
    print("=" * 70)


if __name__ == "__main__":
    main()