crowe-logic-mini / scripts /collect_data.py
mike1210's picture
Upload scripts/collect_data.py with huggingface_hub
4971a7f verified
#!/usr/bin/env python3
"""
Training Data Collection Pipeline for Crowe Logic Mini
Target: 1-2 billion tokens from scientific and domain-specific sources
Data sources:
1. Public datasets (The Pile, RedPajama, arXiv, Wikipedia) - 1.5B tokens
2. Domain-specific scraping (mycology, drug discovery) - 200M tokens
3. Proprietary data (Southwest Mushrooms, CrowLogic) - 20M tokens
4. Curated examples - 30M tokens
"""
import os
import json
import requests
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
from dataclasses import dataclass
from tqdm import tqdm
import hashlib
@dataclass
class DataSource:
"""Configuration for a data source"""
name: str
url: Optional[str]
estimated_tokens: int
priority: str # "critical", "high", "medium", "low"
collection_method: str # "download", "api", "scrape", "manual"
status: str = "pending"
class DataCollectionPipeline:
"""Automated pipeline to collect 1-2B tokens of training data"""
def __init__(self, output_dir: str = "./data/raw", target_tokens: int = 1_500_000_000):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.target_tokens = target_tokens
self.collected_tokens = 0
self.sources = self._define_data_sources()
def _define_data_sources(self) -> List[DataSource]:
"""Define all data sources with metadata"""
return [
# ===== PUBLIC DATASETS (Automated) =====
DataSource(
name="Wikipedia Science",
url="https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2",
estimated_tokens=500_000_000,
priority="critical",
collection_method="download"
),
DataSource(
name="arXiv Papers",
url="s3://arxiv/", # AWS S3 bucket
estimated_tokens=300_000_000,
priority="critical",
collection_method="download"
),
DataSource(
name="The Pile - arXiv subset",
url="https://the-eye.eu/public/AI/pile/train/",
estimated_tokens=200_000_000,
priority="high",
collection_method="download"
),
DataSource(
name="The Pile - PubMed subset",
url="https://the-eye.eu/public/AI/pile/train/",
estimated_tokens=150_000_000,
priority="high",
collection_method="download"
),
DataSource(
name="PubMed Abstracts",
url="https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/",
estimated_tokens=200_000_000,
priority="high",
collection_method="download"
),
DataSource(
name="RedPajama - Wikipedia",
url="https://data.together.xyz/redpajama-data-1T/v1.0.0/",
estimated_tokens=100_000_000,
priority="medium",
collection_method="download"
),
# ===== DOMAIN-SPECIFIC SOURCES =====
DataSource(
name="Mycology Literature",
url=None, # Multiple sources
estimated_tokens=50_000_000,
priority="critical",
collection_method="scrape"
),
DataSource(
name="Drug Discovery Papers",
url="https://www.ebi.ac.uk/chembl/",
estimated_tokens=50_000_000,
priority="critical",
collection_method="api"
),
DataSource(
name="AI/ML Papers (arXiv cs.AI)",
url="https://arxiv.org/list/cs.AI/recent",
estimated_tokens=100_000_000,
priority="high",
collection_method="api"
),
DataSource(
name="GitHub AI Documentation",
url="https://github.com/",
estimated_tokens=50_000_000,
priority="medium",
collection_method="api"
),
# ===== PROPRIETARY DATA =====
DataSource(
name="Southwest Mushrooms Data",
url=None,
estimated_tokens=10_000_000,
priority="critical",
collection_method="manual"
),
DataSource(
name="CrowLogic Documentation",
url=None,
estimated_tokens=5_000_000,
priority="critical",
collection_method="manual"
),
DataSource(
name="Prologic Methodology Examples",
url=None,
estimated_tokens=5_000_000,
priority="critical",
collection_method="manual"
),
# ===== CURATED EXAMPLES =====
DataSource(
name="Chain-of-Thought Examples",
url=None,
estimated_tokens=10_000_000,
priority="high",
collection_method="manual"
),
DataSource(
name="Domain Q&A Pairs",
url=None,
estimated_tokens=20_000_000,
priority="high",
collection_method="manual"
),
]
def download_wikipedia(self) -> Dict:
"""Download and extract Wikipedia dump"""
print("\n" + "="*70)
print("Downloading Wikipedia Science Articles")
print("="*70)
wiki_dir = self.output_dir / "wikipedia"
wiki_dir.mkdir(exist_ok=True)
# Download latest dump
dump_url = "https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2"
dump_file = wiki_dir / "enwiki-latest.xml.bz2"
print(f"\n📥 Downloading from: {dump_url}")
print(f" Destination: {dump_file}")
print(f" Size: ~20GB (compressed), ~80GB (uncompressed)")
print("\n⚠️ This will take 1-4 hours depending on connection speed")
print("\nCommands to run:")
print(f" wget {dump_url} -O {dump_file}")
print(f" python -m wikiextractor.WikiExtractor {dump_file} -o {wiki_dir / 'extracted'} --json")
return {
"status": "manual_steps_needed",
"instructions": "Run wget and WikiExtractor commands above",
"estimated_tokens": 500_000_000
}
def download_arxiv(self) -> Dict:
"""Download arXiv papers"""
print("\n" + "="*70)
print("Downloading arXiv Papers")
print("="*70)
arxiv_dir = self.output_dir / "arxiv"
arxiv_dir.mkdir(exist_ok=True)
print("\nOptions for arXiv data:")
print("\n1. Bulk download from S3 (recommended):")
print(" aws s3 sync s3://arxiv/src/ ./data/raw/arxiv/ --no-sign-request")
print("\n2. Use arXiv API:")
print(" pip install arxiv")
print(" python scripts/download_arxiv_api.py")
print("\n3. Use existing preprocessed datasets:")
print(" - RedPajama arXiv subset")
print(" - The Pile arXiv subset")
return {
"status": "manual_steps_needed",
"instructions": "Choose one of the methods above",
"estimated_tokens": 300_000_000
}
def download_pubmed(self) -> Dict:
"""Download PubMed abstracts"""
print("\n" + "="*70)
print("Downloading PubMed Abstracts")
print("="*70)
pubmed_dir = self.output_dir / "pubmed"
pubmed_dir.mkdir(exist_ok=True)
base_url = "https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
print(f"\n📥 PubMed Baseline Files")
print(f" URL: {base_url}")
print(f" Files: pubmed24n*.xml.gz (1000+ files)")
print(f" Total size: ~30GB")
print("\nCommand to download:")
print(f" wget -r -np -nd -A 'pubmed24n*.xml.gz' {base_url} -P {pubmed_dir}")
return {
"status": "manual_steps_needed",
"instructions": "Run wget command above",
"estimated_tokens": 200_000_000
}
def download_the_pile_subset(self, subset: str = "arxiv") -> Dict:
"""Download specific subset from The Pile"""
print(f"\n" + "="*70)
print(f"Downloading The Pile - {subset} subset")
print("="*70)
pile_dir = self.output_dir / "the_pile" / subset
pile_dir.mkdir(parents=True, exist_ok=True)
print(f"\nThe Pile subsets available:")
print(" - ArXiv")
print(" - PubMed Abstracts")
print(" - PubMed Central")
print(" - FreeLaw")
print(" - USPTO Backgrounds")
print(" - Wikipedia (en)")
print("\nDownload from: https://the-eye.eu/public/AI/pile/train/")
print(f"Save to: {pile_dir}")
return {
"status": "manual_steps_needed",
"instructions": f"Download {subset} subset from The Pile",
"estimated_tokens": 200_000_000
}
def create_mycology_corpus(self) -> Dict:
"""Instructions for collecting mycology data"""
print("\n" + "="*70)
print("Collecting Mycology Domain Data")
print("="*70)
myco_dir = self.output_dir / "mycology"
myco_dir.mkdir(exist_ok=True)
sources = {
"MushroomExpert.com": "http://www.mushroomexpert.com/",
"Shroomery": "https://www.shroomery.org/",
"MycoWorks Papers": "Research papers on fungal materials",
"Cultivation Guides": "Paul Stamets, Tradd Cotter books",
"Scientific Papers": "Search PubMed/arXiv for mycology",
"Southwest Mushrooms Data": "Your proprietary cultivation data",
}
print("\nMycology data sources:")
for name, desc in sources.items():
print(f" ✓ {name}: {desc}")
print(f"\nSave all mycology text to: {myco_dir}")
print("\nRecommended structure:")
print(" mycology/")
print(" ├── cultivation_guides.txt")
print(" ├── species_descriptions.txt")
print(" ├── scientific_papers.txt")
print(" ├── forum_discussions.txt")
print(" └── southwest_mushrooms.txt")
return {
"status": "manual_collection_needed",
"directory": str(myco_dir),
"estimated_tokens": 50_000_000
}
def create_drug_discovery_corpus(self) -> Dict:
"""Instructions for collecting drug discovery data"""
print("\n" + "="*70)
print("Collecting Drug Discovery Domain Data")
print("="*70)
drug_dir = self.output_dir / "drug_discovery"
drug_dir.mkdir(exist_ok=True)
sources = {
"ChEMBL": "https://www.ebi.ac.uk/chembl/ (API available)",
"PubChem": "https://pubchem.ncbi.nlm.nih.gov/",
"DrugBank": "https://www.drugbank.com/",
"Clinical Trials": "https://clinicaltrials.gov/",
"Patents": "USPTO chemical patents",
"Papers": "PubMed chemistry/pharmacology papers",
}
print("\nDrug discovery data sources:")
for name, url in sources.items():
print(f" ✓ {name}: {url}")
print(f"\nSave to: {drug_dir}")
return {
"status": "manual_collection_needed",
"directory": str(drug_dir),
"estimated_tokens": 50_000_000
}
def estimate_tokens(self, text_file: Path) -> int:
"""Estimate token count in a text file"""
if not text_file.exists():
return 0
# Rough estimate: 1 token ≈ 0.75 words ≈ 4 characters
file_size = text_file.stat().st_size
estimated_tokens = file_size // 4
return estimated_tokens
def generate_collection_plan(self) -> Dict:
"""Generate a detailed data collection plan"""
print("\n" + "="*70)
print("CROWE LOGIC MINI - DATA COLLECTION PLAN")
print("Target: 1-2 Billion Tokens")
print("="*70)
plan = {
"target_tokens": self.target_tokens,
"phases": []
}
# Phase 1: Automated downloads (1 week)
phase1 = {
"name": "Phase 1: Public Datasets (Automated)",
"timeline": "Week 1",
"target_tokens": 1_200_000_000,
"sources": [
{"name": "Wikipedia", "tokens": 500_000_000, "time": "6-12 hours"},
{"name": "arXiv", "tokens": 300_000_000, "time": "12-24 hours"},
{"name": "PubMed", "tokens": 200_000_000, "time": "6-12 hours"},
{"name": "The Pile subsets", "tokens": 200_000_000, "time": "6-12 hours"},
]
}
# Phase 2: Domain-specific collection (3-5 days)
phase2 = {
"name": "Phase 2: Domain-Specific Data",
"timeline": "Week 2 (3-5 days)",
"target_tokens": 200_000_000,
"sources": [
{"name": "Mycology", "tokens": 50_000_000, "method": "web scraping + papers"},
{"name": "Drug Discovery", "tokens": 50_000_000, "method": "APIs + databases"},
{"name": "AI/ML", "tokens": 100_000_000, "method": "arXiv subset + docs"},
]
}
# Phase 3: Proprietary data (1-2 days)
phase3 = {
"name": "Phase 3: Proprietary Data",
"timeline": "Week 2 (1-2 days)",
"target_tokens": 20_000_000,
"sources": [
{"name": "Southwest Mushrooms", "tokens": 10_000_000, "method": "extract from records"},
{"name": "CrowLogic/CriOS", "tokens": 10_000_000, "method": "documentation"},
]
}
# Phase 4: Curated examples (2-3 days)
phase4 = {
"name": "Phase 4: Curated Examples",
"timeline": "Week 2-3 (2-3 days)",
"target_tokens": 30_000_000,
"sources": [
{"name": "Chain-of-thought", "tokens": 10_000_000, "method": "manual creation"},
{"name": "Domain Q&A", "tokens": 20_000_000, "method": "curated + generated"},
]
}
plan["phases"] = [phase1, phase2, phase3, phase4]
# Print plan
for phase in plan["phases"]:
print(f"\n{phase['name']}")
print(f"Timeline: {phase['timeline']}")
print(f"Target: {phase['target_tokens']:,} tokens")
print(f"\nSources:")
for source in phase["sources"]:
print(f" ✓ {source['name']}: {source['tokens']:,} tokens")
total_tokens = sum(p["target_tokens"] for p in plan["phases"])
print(f"\n{'='*70}")
print(f"TOTAL: {total_tokens:,} tokens ({total_tokens/1e9:.1f}B)")
print(f"Timeline: 2-3 weeks")
print(f"{'='*70}")
return plan
def check_existing_data(self) -> Dict:
"""Check what data has already been collected"""
print("\n" + "="*70)
print("Checking Existing Data")
print("="*70)
collected = {}
total_tokens = 0
if not self.output_dir.exists():
print("\n⚠️ No data directory found. Starting from scratch.")
return collected
for subdir in self.output_dir.iterdir():
if subdir.is_dir():
tokens = 0
files = list(subdir.glob("**/*.txt")) + list(subdir.glob("**/*.json"))
for f in files:
tokens += self.estimate_tokens(f)
if tokens > 0:
collected[subdir.name] = {
"files": len(files),
"tokens": tokens
}
total_tokens += tokens
if collected:
print(f"\n✓ Found existing data:")
for name, info in collected.items():
print(f" {name}: {info['files']} files, ~{info['tokens']:,} tokens")
print(f"\nTotal collected: ~{total_tokens:,} tokens ({total_tokens/1e9:.2f}B)")
else:
print("\n⚠️ No existing data found.")
remaining = max(0, self.target_tokens - total_tokens)
print(f"Remaining to collect: ~{remaining:,} tokens ({remaining/1e9:.2f}B)")
return collected
def main():
"""Main execution"""
print("\n🚀 Crowe Logic Mini - Training Data Collection Pipeline\n")
pipeline = DataCollectionPipeline(
output_dir="./data/raw",
target_tokens=1_500_000_000 # 1.5B tokens
)
# Check existing data
pipeline.check_existing_data()
# Generate collection plan
plan = pipeline.generate_collection_plan()
# Provide next steps
print("\n" + "="*70)
print("📋 NEXT STEPS")
print("="*70)
print("\n1. Review the collection plan above")
print("2. Start with Phase 1 (automated downloads)")
print("3. Run individual collection scripts:")
print("\n python data_collection/download_wikipedia.py")
print(" python data_collection/download_arxiv.py")
print(" python data_collection/download_pubmed.py")
print("\n4. For domain-specific data, see instructions above")
print("5. Once data is collected, run preprocessing:")
print("\n python data_collection/preprocess_training_data.py")
print("\n6. Train tokenizer:")
print("\n python tokenizer/build_scientific_tokenizer.py")
print("\n" + "="*70)
print("For detailed instructions, see: DATA_COLLECTION_GUIDE.md")
print("="*70)
if __name__ == "__main__":
main()