""" |
|
|
Document Processor for Training Data Generation |
|
|
============================================== |
|
|
Processes PDF files, text files, and markdown documents to create training data |
|
|
for the enhanced tokenizer system. |
|
|
""" |
import json
import re
import hashlib
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime

try:
    import PyPDF2
    PDF_AVAILABLE = True
    print("✅ PyPDF2 available for PDF processing")
except ImportError:
    PDF_AVAILABLE = False
    print("⚠️ PyPDF2 not available - install with: pip install PyPDF2")

try:
    import pdfplumber
    PDFPLUMBER_AVAILABLE = True
    print("✅ pdfplumber available for advanced PDF processing")
except ImportError:
    PDFPLUMBER_AVAILABLE = False
    print("⚠️ pdfplumber not available - install with: pip install pdfplumber")


class DocumentProcessor:
    """Processes various document types for training data generation."""

    def __init__(self):
        self.processed_documents = []
        self.training_data = []

    def extract_text_from_pdf_pypdf2(self, pdf_path: str) -> str:
        """Extract text from PDF using PyPDF2."""
        if not PDF_AVAILABLE:
            return ""

        try:
            text = ""
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
            return text.strip()
        except Exception as e:
            print(f"❌ PyPDF2 extraction failed for {pdf_path}: {e}")
            return ""

    def extract_text_from_pdf_pdfplumber(self, pdf_path: str) -> str:
        """Extract text from PDF using pdfplumber (more accurate)."""
        if not PDFPLUMBER_AVAILABLE:
            return ""

        try:
            text = ""
            with pdfplumber.open(pdf_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            return text.strip()
        except Exception as e:
            print(f"❌ pdfplumber extraction failed for {pdf_path}: {e}")
            return ""

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Extract text from PDF using the best available method."""
        print(f"📄 Processing PDF: {pdf_path}")

        # Try pdfplumber first, since it generally extracts text more accurately.
        if PDFPLUMBER_AVAILABLE:
            text = self.extract_text_from_pdf_pdfplumber(pdf_path)
            if text:
                print(f" ✅ Extracted {len(text)} characters using pdfplumber")
                return text

        # Fall back to PyPDF2.
        if PDF_AVAILABLE:
            text = self.extract_text_from_pdf_pypdf2(pdf_path)
            if text:
                print(f" ✅ Extracted {len(text)} characters using PyPDF2")
                return text

        print(f" ❌ Could not extract text from {pdf_path}")
        return ""

    def extract_text_from_file(self, file_path: str) -> str:
        """Extract text from various file types."""
        file_path = Path(file_path)

        if not file_path.exists():
            print(f"❌ File not found: {file_path}")
            return ""

        try:
            if file_path.suffix.lower() == '.pdf':
                return self.extract_text_from_pdf(str(file_path))

            elif file_path.suffix.lower() in ['.txt', '.md', '.tex']:
                with open(file_path, 'r', encoding='utf-8') as f:
                    text = f.read()
                print(f" ✅ Extracted {len(text)} characters from {file_path.name}")
                return text

            else:
                print(f" ⚠️ Unsupported file type: {file_path.suffix}")
                return ""

        except Exception as e:
            print(f" ❌ Error processing {file_path}: {e}")
            return ""

    def clean_and_preprocess_text(self, text: str) -> str:
        """Clean and preprocess extracted text."""
        if not text:
            return ""

        # Normalize line endings and form feeds to '\n' first, so the
        # newline handling below still has newlines to work with.
        text = re.sub(r'\r\n', '\n', text)
        text = re.sub(r'\r', '\n', text)
        text = re.sub(r'\f', '\n', text)

        # Collapse runs of spaces and tabs within lines, keeping newlines.
        text = re.sub(r'[ \t]+', ' ', text)

        # Remove characters outside the allowed set (word characters, whitespace,
        # punctuation, and common math/code symbols).
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\$\^\+\*\/\=\<\>\%\@\#\&]', '', text)

        # Collapse three or more consecutive newlines into a single blank line.
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)

        return text.strip()

    def detect_content_type(self, text: str) -> str:
        """Detect the type of content in the text."""
        if not text:
            return "empty"

        # Ratio of math-like symbols to total characters.
        math_indicators = len(re.findall(r'[\$\^\+\-\*\/\=\<\>\(\)]', text))
        math_ratio = math_indicators / len(text) if text else 0

        # Programming keywords.
        code_keywords = ['def ', 'class ', 'import ', 'function', 'var ', 'const ', 'if ', 'for ', 'while ']
        code_count = sum(1 for keyword in code_keywords if keyword.lower() in text.lower())

        # Academic-writing keywords.
        academic_keywords = ['research', 'study', 'analysis', 'methodology', 'results', 'conclusion', 'abstract']
        academic_count = sum(1 for keyword in academic_keywords if keyword.lower() in text.lower())

        if math_ratio > 0.01:
            return "mathematical"
        elif code_count > 3:
            return "code"
        elif academic_count > 2:
            return "academic"
        else:
            return "general"

    def chunk_text_for_training(self, text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
        """Chunk text into training-sized pieces."""
        if not text:
            return []

        words = text.split()
        chunks = []

        # Advance by chunk_size - overlap words per step (guarding against a
        # non-positive step), keeping only chunks with some substance.
        step = max(1, chunk_size - overlap)
        for i in range(0, len(words), step):
            chunk = ' '.join(words[i:i + chunk_size])
            if len(chunk.strip()) > 50:
                chunks.append(chunk.strip())

        return chunks

    def create_training_entry(self, chunk: str, source_file: str, chunk_id: int) -> Dict[str, Any]:
        """Create a training data entry from a text chunk."""
        content_type = self.detect_content_type(chunk)

        metadata = {
            "source_file": source_file,
            "chunk_id": chunk_id,
            "content_type": content_type,
            "word_count": len(chunk.split()),
            "char_count": len(chunk),
            "processed_at": datetime.now().isoformat(),
            # Short MD5 prefix, useful as a cheap de-duplication key.
            "chunk_hash": hashlib.md5(chunk.encode()).hexdigest()[:8]
        }

        # Rough count of math expressions: display math first so $$...$$ is not
        # half-matched as inline math, then inline $...$, then a loose operator
        # heuristic.
        math_expressions = re.findall(r'\$\$[^$]+\$\$|\$[^$]+\$|[\w\s]*[\+\-\*\/\=\<\>][\w\s]*', chunk)

        # Rough entity count: capitalized word pairs and all-caps acronyms.
        entities = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b|\b[A-Z][A-Z]+\b', chunk)

        return {
            "id": f"{Path(source_file).stem}_{chunk_id}",
            "content": chunk,
            "metadata": metadata,
            "features": {
                "content_type": content_type,
                "math_expressions": len(math_expressions),
                "entities": len(entities),
                "complexity_score": len(chunk.split()) / 100.0
            },
            "training_ready": True
        }

    def process_document(self, file_path: str) -> Dict[str, Any]:
        """Process a single document and return training data."""
        print(f"📄 Processing document: {file_path}")
        raw_text = self.extract_text_from_file(file_path)
        if not raw_text:
            return {"success": False, "error": "No text extracted"}

        clean_text = self.clean_and_preprocess_text(raw_text)
        if not clean_text:
            return {"success": False, "error": "No text after cleaning"}

        chunks = self.chunk_text_for_training(clean_text)
        if not chunks:
            return {"success": False, "error": "No valid chunks created"}

        training_entries = []
        for i, chunk in enumerate(chunks):
            entry = self.create_training_entry(chunk, file_path, i)
            training_entries.append(entry)

        result = {
            "success": True,
            "source_file": file_path,
            "raw_text_length": len(raw_text),
            "clean_text_length": len(clean_text),
            "chunks_created": len(chunks),
            "training_entries": training_entries,
            "content_types": list(set(entry["features"]["content_type"] for entry in training_entries)),
            "total_math_expressions": sum(entry["features"]["math_expressions"] for entry in training_entries),
            "total_entities": sum(entry["features"]["entities"] for entry in training_entries)
        }

        print(f" ✅ Created {len(training_entries)} training entries")
        print(f" 📊 Content types: {result['content_types']}")
        print(f" 🧮 Math expressions: {result['total_math_expressions']}")
        print(f" 🏷️ Entities: {result['total_entities']}")

        return result

    def process_directory(self, directory_path: str, file_extensions: List[str] = None) -> Dict[str, Any]:
        """Process all documents in a directory."""
        if file_extensions is None:
            file_extensions = ['.pdf', '.txt', '.md', '.tex']

        directory = Path(directory_path)
        if not directory.exists():
            return {"success": False, "error": f"Directory not found: {directory_path}"}

        # Collect matching files recursively.
        files_to_process = []
        for ext in file_extensions:
            files_to_process.extend(directory.glob(f"**/*{ext}"))

        print(f"📁 Found {len(files_to_process)} files to process in {directory_path}")

        all_results = {
            "success": True,
            "directory": directory_path,
            "files_found": len(files_to_process),
            "files_processed": 0,
            "files_failed": 0,
            "total_training_entries": 0,
            "results": []
        }

        # Process each file, recording failures without aborting the run.
        for file_path in files_to_process:
            try:
                result = self.process_document(str(file_path))
                all_results["results"].append(result)

                if result["success"]:
                    all_results["files_processed"] += 1
                    all_results["total_training_entries"] += len(result["training_entries"])
                else:
                    all_results["files_failed"] += 1

            except Exception as e:
                print(f"❌ Error processing {file_path}: {e}")
                all_results["files_failed"] += 1
                all_results["results"].append({
                    "success": False,
                    "source_file": str(file_path),
                    "error": str(e)
                })

        all_results["success_rate"] = all_results["files_processed"] / all_results["files_found"] if all_results["files_found"] > 0 else 0

        print("\n📊 Processing Summary:")
        print(f" ✅ Files processed: {all_results['files_processed']}")
        print(f" ❌ Files failed: {all_results['files_failed']}")
        print(f" 📝 Total training entries: {all_results['total_training_entries']}")
        print(f" 📈 Success rate: {all_results['success_rate']:.1%}")

        return all_results

    def save_training_data(self, results: Dict[str, Any], output_file: str = "document_training_data.jsonl"):
        """Save training data to JSONL file."""
        training_entries = []

        for result in results.get("results", []):
            if result.get("success") and "training_entries" in result:
                training_entries.extend(result["training_entries"])

        print(f"💾 Saving {len(training_entries)} training entries to {output_file}")

        # Write one JSON object per line (JSONL).
        with open(output_file, 'w', encoding='utf-8') as f:
            for entry in training_entries:
                f.write(json.dumps(entry, ensure_ascii=False) + '\n')

        print(f"✅ Training data saved to {output_file}")
        return len(training_entries)


def main():
    """Main function to process documents and generate training data."""
    print("🚀 Document Processor for Training Data Generation")
    print("=" * 55)

    processor = DocumentProcessor()

    # Process documents found in the current directory.
    current_dir = "."

    print(f"📁 Processing directory: {current_dir}")
    results = processor.process_directory(current_dir)

    if results["success"] and results["total_training_entries"] > 0:
        # Save the training data as JSONL.
        entries_saved = processor.save_training_data(results)

        # Save the full processing report alongside it.
        with open("document_processing_results.json", 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        print("\n🎉 Processing complete!")
        print(f"📝 Created {entries_saved} training entries")
        print("📁 Results saved to document_processing_results.json")
        print("📁 Training data saved to document_training_data.jsonl")

        # Summarize the distribution of content types across all entries.
        content_types = {}
        for result in results["results"]:
            if result.get("success"):
                for entry in result.get("training_entries", []):
                    content_type = entry["features"]["content_type"]
                    content_types[content_type] = content_types.get(content_type, 0) + 1

        print("\n📊 Content Type Distribution:")
        for content_type, count in content_types.items():
            print(f" {content_type}: {count} entries")

    else:
        print("❌ No training data generated")
        if "error" in results:
            print(f"Error: {results['error']}")


if __name__ == "__main__":
    main()