"""
Comprehensive Data Processor
============================

Processes all available data sources: PDFs, documents, existing training data,
and generates comprehensive training datasets for the enhanced tokenizer system.
"""

import json
import os
import re
from pathlib import Path
from typing import Dict, List, Any, Tuple
from datetime import datetime
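
# Typical programmatic usage (a minimal sketch mirroring main() below; the
# hard-coded input file names in process_all_data_sources() are assumed to
# exist in the current working directory):
#
#   processor = ComprehensiveDataProcessor()
#   results, entries = processor.process_all_data_sources()
#   processor.save_comprehensive_training_data(entries, results)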

# Optional PDF text-extraction backends. extract_pdf_text() prefers pdfplumber
# and falls back to PyPDF2; with neither installed, PDF sources are skipped.
try:
    import PyPDF2
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False

try:
    import pdfplumber
    PDFPLUMBER_AVAILABLE = True
except ImportError:
    PDFPLUMBER_AVAILABLE = False


class ComprehensiveDataProcessor:
    """Processes all available data sources for training."""

    def __init__(self):
        self.all_training_data = []
        self.processing_stats = {
            "files_processed": 0,
            "total_entries": 0,
            "sources": {}
        }

    def extract_pdf_text(self, pdf_path: str) -> str:
        """Extract text from a PDF, preferring pdfplumber over PyPDF2."""
        try:
            if PDFPLUMBER_AVAILABLE:
                text = ""
                with pdfplumber.open(pdf_path) as pdf:
                    for page in pdf.pages:
                        page_text = page.extract_text()
                        if page_text:
                            text += page_text + "\n"
                return text.strip()
            elif PDF_AVAILABLE:
                text = ""
                with open(pdf_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    for page in pdf_reader.pages:
                        text += page.extract_text() + "\n"
                return text.strip()
        except Exception as e:
            print(f"❌ PDF extraction failed for {pdf_path}: {e}")
        # Reached when no PDF backend is installed or extraction failed.
        return ""

    def process_existing_jsonl(self, file_path: str) -> List[Dict[str, Any]]:
        """Process existing JSONL training files."""
        entries = []
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if line:
                        try:
                            data = json.loads(line)

                            entry = {
                                "id": f"{Path(file_path).stem}_{line_num}",
                                "source": "existing_jsonl",
                                "source_file": file_path,
                                "prompt": data.get("prompt", ""),
                                "completion": data.get("completion", ""),
                                "content": f"{data.get('prompt', '')} {data.get('completion', '')}",
                                "metadata": data.get("metadata", {}),
                                "processed_at": datetime.now().isoformat()
                            }
                            entries.append(entry)
                        except json.JSONDecodeError as e:
                            print(f"⚠️ JSON decode error in {file_path} line {line_num}: {e}")
        except Exception as e:
            print(f"❌ Error processing {file_path}: {e}")

        print(f"✅ Processed {len(entries)} entries from {file_path}")
        return entries
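
    # Each JSONL input line processed above is expected to look roughly like
    #   {"prompt": "...", "completion": "...", "metadata": {...}}
    # (illustrative; missing keys default to empty strings / an empty dict).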

    def process_text_file(self, file_path: str) -> List[Dict[str, Any]]:
        """Process text/markdown files."""
        entries = []
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Normalize whitespace, then split into fixed-size word chunks.
            content = re.sub(r'\s+', ' ', content).strip()
            chunks = self.chunk_text(content, chunk_size=512)

            for i, chunk in enumerate(chunks):
                entry = {
                    "id": f"{Path(file_path).stem}_{i+1}",
                    "source": "text_file",
                    "source_file": file_path,
                    "content": chunk,
                    "metadata": {
                        "file_type": Path(file_path).suffix,
                        "chunk_id": i + 1,
                        "total_chunks": len(chunks)
                    },
                    "processed_at": datetime.now().isoformat()
                }
                entries.append(entry)

        except Exception as e:
            print(f"❌ Error processing {file_path}: {e}")

        print(f"✅ Processed {len(entries)} entries from {file_path}")
        return entries

    def process_pdf_file(self, file_path: str) -> List[Dict[str, Any]]:
        """Process PDF files."""
        entries = []
        try:
            text = self.extract_pdf_text(file_path)
            if text:
                text = re.sub(r'\s+', ' ', text).strip()
                chunks = self.chunk_text(text, chunk_size=512)

                for i, chunk in enumerate(chunks):
                    entry = {
                        "id": f"{Path(file_path).stem}_{i+1}",
                        "source": "pdf_file",
                        "source_file": file_path,
                        "content": chunk,
                        "metadata": {
                            "file_type": "pdf",
                            "chunk_id": i + 1,
                            "total_chunks": len(chunks),
                            "extracted_length": len(text)
                        },
                        "processed_at": datetime.now().isoformat()
                    }
                    entries.append(entry)
        except Exception as e:
            print(f"❌ Error processing {file_path}: {e}")

        print(f"✅ Processed {len(entries)} entries from {file_path}")
        return entries

    def chunk_text(self, text: str, chunk_size: int = 512) -> List[str]:
        """Chunk text into pieces of at most `chunk_size` words."""
        words = text.split()
        chunks = []

        for i in range(0, len(words), chunk_size):
            chunk = ' '.join(words[i:i + chunk_size])
            # Skip tiny fragments (50 characters or fewer).
            if len(chunk.strip()) > 50:
                chunks.append(chunk.strip())

        return chunks
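
    # Rough illustration (not executed): a 1,200-word document with
    # chunk_size=512 yields word chunks of 512, 512 and 176 words; any chunk
    # whose text is 50 characters or shorter is dropped by the guard above.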

    def analyze_content_type(self, content: str) -> str:
        """Classify content with simple keyword heuristics, checked in priority order."""
        content_lower = content.lower()

        # Code indicators.
        if any(keyword in content_lower for keyword in ['def ', 'class ', 'import ', 'function', 'var ', 'const ']):
            return "code"

        # Mathematical indicators (loose: any arithmetic/comparison symbol or bracket counts).
        if re.search(r'[\$\^\+\-\*\/\=\<\>\(\)]', content):
            return "mathematical"

        # SQL indicators.
        if any(keyword in content_lower for keyword in ['select', 'from', 'where', 'join', 'sql']):
            return "sql"

        # Academic indicators.
        if any(keyword in content_lower for keyword in ['research', 'study', 'analysis', 'methodology', 'results']):
            return "academic"

        return "general"
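
    # Illustrative classifications under the heuristics above (checks run in
    # the order code -> mathematical -> sql -> academic):
    #   "def tokenize(text): return text.split()"  -> "code"
    #   "E = mc^2"                                  -> "mathematical"
    #   "SELECT id FROM users WHERE active"         -> "sql"
    #   "This study presents a new methodology"     -> "academic"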

    def enhance_training_entries(self, entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Enhance training entries with additional metadata."""
        enhanced_entries = []

        for entry in entries:
            content = entry.get("content", "")
            content_type = self.analyze_content_type(content)

            enhanced_entry = entry.copy()
            enhanced_entry["enhanced_metadata"] = {
                "content_type": content_type,
                "word_count": len(content.split()),
                "char_count": len(content),
                "has_code": "code" in content_type,
                "has_math": "mathematical" in content_type or "$" in content,
                "has_sql": "sql" in content_type,
                # Crude length-based proxy: one point per 100 words.
                "complexity_score": len(content.split()) / 100.0,
                "unique_words": len(set(content.lower().split())),
                "avg_word_length": sum(len(word) for word in content.split()) / len(content.split()) if content.split() else 0
            }

            enhanced_entries.append(enhanced_entry)

        return enhanced_entries
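
    # Worked example (illustrative): for content = "def add(a, b): return a + b"
    # the entry gains enhanced_metadata with content_type="code", word_count=7,
    # has_code=True, has_math=False and complexity_score=0.07.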

    def process_all_data_sources(self) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
        """Process all available data sources and return (results, enhanced entries)."""
        print("🚀 Comprehensive Data Processing")
        print("=" * 40)

        # Note: comprehensive_training_data.jsonl is also the file written by
        # save_comprehensive_training_data(), so re-running this script will
        # re-ingest its own previous output.
        jsonl_files = [
            "matrix_training_data.jsonl",
            "training_data_emergent.jsonl",
            "comprehensive_training_data.jsonl"
        ]

        text_files = [
            "README.md",
            "COMPLETE_INTEGRATION_SUMMARY.md",
            "THE_BLOOM_IS_COMPLETE.md",
            "COMPLETE_ACHIEVEMENT_REPORT.md",
            "BENCHMARK_ANALYSIS.md"
        ]

        pdf_files = [
            "LOOM_OF_EMERGENCE.pdf"
        ]

        all_entries = []

        print("\n📄 Processing JSONL training files...")
        for file_path in jsonl_files:
            if Path(file_path).exists():
                entries = self.process_existing_jsonl(file_path)
                all_entries.extend(entries)
                self.processing_stats["sources"][file_path] = len(entries)
                self.processing_stats["files_processed"] += 1
            else:
                print(f"⚠️ File not found: {file_path}")

        print("\n📝 Processing text/markdown files...")
        for file_path in text_files:
            if Path(file_path).exists():
                entries = self.process_text_file(file_path)
                all_entries.extend(entries)
                self.processing_stats["sources"][file_path] = len(entries)
                self.processing_stats["files_processed"] += 1
            else:
                print(f"⚠️ File not found: {file_path}")

        print("\n📄 Processing PDF files...")
        for file_path in pdf_files:
            if Path(file_path).exists():
                entries = self.process_pdf_file(file_path)
                all_entries.extend(entries)
                self.processing_stats["sources"][file_path] = len(entries)
                self.processing_stats["files_processed"] += 1
            else:
                print(f"⚠️ File not found: {file_path}")

        print("\n🔧 Enhancing training entries...")
        enhanced_entries = self.enhance_training_entries(all_entries)

        self.processing_stats["total_entries"] = len(enhanced_entries)

        # Tally the distribution of detected content types.
        content_types = {}
        for entry in enhanced_entries:
            content_type = entry["enhanced_metadata"]["content_type"]
            content_types[content_type] = content_types.get(content_type, 0) + 1

        results = {
            "processing_stats": self.processing_stats,
            "content_type_distribution": content_types,
            "total_entries": len(enhanced_entries),
            "timestamp": datetime.now().isoformat(),
            "sources_summary": {
                "jsonl_files": len([f for f in jsonl_files if Path(f).exists()]),
                "text_files": len([f for f in text_files if Path(f).exists()]),
                "pdf_files": len([f for f in pdf_files if Path(f).exists()])
            }
        }

        return results, enhanced_entries

    def save_comprehensive_training_data(self, entries: List[Dict[str, Any]], results: Dict[str, Any]):
        """Save comprehensive training data."""
        print(f"\n💾 Saving {len(entries)} training entries...")

        with open("comprehensive_training_data.jsonl", 'w', encoding='utf-8') as f:
            for entry in entries:
                f.write(json.dumps(entry, ensure_ascii=False) + '\n')

        with open("comprehensive_processing_results.json", 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        summary = {
            "total_entries": len(entries),
            "content_types": results["content_type_distribution"],
            "sources": results["processing_stats"]["sources"],
            "files_processed": results["processing_stats"]["files_processed"],
            "timestamp": results["timestamp"]
        }

        with open("training_data_summary.json", 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        print("✅ Training data saved:")
        print(" 📁 comprehensive_training_data.jsonl")
        print(" 📁 comprehensive_processing_results.json")
        print(" 📁 training_data_summary.json")

    def print_processing_summary(self, results: Dict[str, Any], entries: List[Dict[str, Any]]):
        """Print processing summary."""
        print("\n📊 Processing Summary")
        print("=" * 30)
        print(f"✅ Files processed: {results['processing_stats']['files_processed']}")
        print(f"📝 Total entries: {len(entries)}")

        print("\n📋 Content Type Distribution:")
        for content_type, count in results["content_type_distribution"].items():
            percentage = (count / len(entries)) * 100
            print(f" {content_type}: {count} entries ({percentage:.1f}%)")

        print("\n📁 Sources:")
        for source, count in results["processing_stats"]["sources"].items():
            print(f" {Path(source).name}: {count} entries")

        print(f"\n🎯 Ready for training with {len(entries)} comprehensive entries!")


def main():
    """Main processing function."""
    processor = ComprehensiveDataProcessor()

    results, entries = processor.process_all_data_sources()

    processor.save_comprehensive_training_data(entries, results)

    processor.print_processing_summary(results, entries)

    return results, entries


if __name__ == "__main__":
    main()