"""
PDF Processing System for LiMp Training Data
============================================

Advanced PDF processing system for generating training data from various document types.
"""

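# Typical pipeline (mirrored by main() below): extract text from a PDF, chunk
# it, derive training entries, and persist everything as JSON. A minimal,
# illustrative sketch using this module's own API:
#
#     processor = PDFProcessor(output_dir="processed_pdfs")
#     doc = processor.process_pdf_file("document.pdf")
#     chunks = processor.chunk_document(doc, chunk_size=1000, overlap=200)
#     entries = processor.create_training_entries(chunks)
#     processor.save_processed_data()
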
import os
import json
import asyncio
import logging
import hashlib
import statistics
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime

try:
    import PyPDF2
    import pdfplumber
    import fitz  # PyMuPDF
    PDF_PROCESSING_AVAILABLE = True
except ImportError:
    PDF_PROCESSING_AVAILABLE = False
    print("⚠️ PDF processing libraries not available. Install with: pip install PyPDF2 pdfplumber PyMuPDF")

try:
    import nltk
    from nltk.tokenize import sent_tokenize, word_tokenize
    from nltk.corpus import stopwords
    from nltk.stem import WordNetLemmatizer
    TEXT_PROCESSING_AVAILABLE = True
except ImportError:
    TEXT_PROCESSING_AVAILABLE = False
    print("⚠️ NLTK not available. Install with: pip install nltk")

try:
    import numpy as np
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.cluster import KMeans
    from sklearn.decomposition import LatentDirichletAllocation
    ML_AVAILABLE = True
except ImportError:
    ML_AVAILABLE = False
    print("⚠️ ML libraries not available. Install with: pip install scikit-learn")

logger = logging.getLogger(__name__)


@dataclass
class PDFDocument:
    """PDF document structure."""
    file_path: str
    filename: str
    file_size: int
    page_count: int
    text_content: str
    metadata: Dict[str, Any]
    processing_timestamp: str
    content_hash: str


@dataclass
class ProcessedChunk:
    """Processed text chunk."""
    chunk_id: str
    source_document: str
    chunk_text: str
    chunk_type: str
    page_number: int
    position_in_document: int
    word_count: int
    character_count: int
    semantic_features: Dict[str, Any]
    processing_timestamp: str


@dataclass
class TrainingDataEntry:
    """Training data entry for LiMp system."""
    entry_id: str
    source_chunks: List[str]
    processed_text: str
    content_type: str
    complexity_score: float
    semantic_category: str
    keywords: List[str]
    entities: List[str]
    mathematical_expressions: List[str]
    dimensional_features: Dict[str, Any]
    metadata: Dict[str, Any]
    creation_timestamp: str


class PDFProcessor:
    """Advanced PDF processing system."""

    def __init__(self, output_dir: str = "processed_pdfs"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        # Text-processing resources (lemmatizer, stop words) when NLTK is installed.
        self.lemmatizer = None
        self.stop_words = set()
        if TEXT_PROCESSING_AVAILABLE:
            try:
                nltk.download('punkt', quiet=True)
                nltk.download('stopwords', quiet=True)
                nltk.download('wordnet', quiet=True)
                self.lemmatizer = WordNetLemmatizer()
                self.stop_words = set(stopwords.words('english'))
            except Exception as e:
                logger.warning(f"NLTK initialization failed: {e}")
                self.lemmatizer = None
                self.stop_words = set()

        # ML components are only created when scikit-learn is available.
        self.tfidf_vectorizer = None
        self.lda_model = None
        if ML_AVAILABLE:
            self.tfidf_vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')

        self.processed_documents = []
        self.processed_chunks = []
        self.training_entries = []

    def process_pdf_file(self, file_path: str) -> PDFDocument:
        """Process a single PDF file and extract comprehensive information."""
        logger.info(f"Processing PDF: {file_path}")

        if not PDF_PROCESSING_AVAILABLE:
            raise ImportError("PDF processing libraries not available")

        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"PDF file not found: {file_path}")

        file_size = file_path.stat().st_size
        filename = file_path.name

        text_content = ""
        metadata = {}
        page_count = 0

        # Extraction cascade: try PyMuPDF first, then PyPDF2, then pdfplumber.
        try:
            doc = fitz.open(str(file_path))
            page_count = doc.page_count
            metadata = doc.metadata or {}

            for page_num in range(page_count):
                page = doc.load_page(page_num)
                text_content += page.get_text() + "\n"

            doc.close()

        except Exception as e:
            logger.warning(f"PyMuPDF failed, trying PyPDF2: {e}")
            try:
                with open(file_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    page_count = len(pdf_reader.pages)
                    # Convert to a plain dict so the result stays JSON-serializable.
                    metadata = dict(pdf_reader.metadata) if pdf_reader.metadata else {}

                    for page in pdf_reader.pages:
                        text_content += page.extract_text() + "\n"

            except Exception as e2:
                logger.warning(f"PyPDF2 failed, trying pdfplumber: {e2}")
                try:
                    with pdfplumber.open(file_path) as pdf:
                        page_count = len(pdf.pages)
                        metadata = pdf.metadata or {}

                        for page in pdf.pages:
                            page_text = page.extract_text()
                            if page_text:
                                text_content += page_text + "\n"

                except Exception as e3:
                    raise RuntimeError(f"All PDF processing methods failed: {e3}")

        text_content = self._clean_text(text_content)

        # Short, stable identifier used for deduplication.
        content_hash = hashlib.sha256(text_content.encode()).hexdigest()[:16]

        pdf_doc = PDFDocument(
            file_path=str(file_path),
            filename=filename,
            file_size=file_size,
            page_count=page_count,
            text_content=text_content,
            metadata=metadata or {},
            processing_timestamp=datetime.now().isoformat(),
            content_hash=content_hash
        )

        self.processed_documents.append(pdf_doc)
        logger.info(f"Successfully processed PDF: {filename} ({page_count} pages, {len(text_content)} chars)")

        return pdf_doc

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text content."""
        # Collapse all whitespace runs into single spaces.
        text = ' '.join(text.split())

        # Drop characters outside the common textual/mathematical set.
        import re
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\+\*\/\=\<\>\^\%\$\#\@]', ' ', text)

        # Normalize whitespace again after the substitution.
        text = re.sub(r'\s+', ' ', text)

        return text.strip()

    def chunk_document(self, pdf_doc: PDFDocument, chunk_size: int = 1000, overlap: int = 200) -> List[ProcessedChunk]:
        """Chunk document into processable segments."""
        logger.info(f"Chunking document: {pdf_doc.filename}")

        chunks = []
        text = pdf_doc.text_content

        if not text.strip():
            logger.warning(f"No text content found in {pdf_doc.filename}")
            return chunks

        # Prefer sentence-aware splitting when NLTK is available.
        if TEXT_PROCESSING_AVAILABLE:
            sentences = sent_tokenize(text)
        else:
            sentences = text.split('. ')

        current_chunk = ""
        chunk_id = 0
        position = 0

        for sentence in sentences:
            if len(current_chunk + sentence) > chunk_size and current_chunk:
                chunk = self._process_chunk(
                    chunk_id=str(chunk_id),
                    source_document=pdf_doc.filename,
                    chunk_text=current_chunk.strip(),
                    page_number=1,  # page-level mapping is not tracked by this chunker
                    position_in_document=position
                )
                chunks.append(chunk)

                # Carry trailing overlap into the next chunk and advance the
                # character position by the non-overlapping part just emitted.
                overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk
                position += len(current_chunk) - len(overlap_text)
                current_chunk = overlap_text + " " + sentence
                chunk_id += 1
            else:
                current_chunk += " " + sentence if current_chunk else sentence

        # Emit the final partial chunk, if any.
        if current_chunk.strip():
            chunk = self._process_chunk(
                chunk_id=str(chunk_id),
                source_document=pdf_doc.filename,
                chunk_text=current_chunk.strip(),
                page_number=1,
                position_in_document=position
            )
            chunks.append(chunk)

        self.processed_chunks.extend(chunks)
        logger.info(f"Created {len(chunks)} chunks from {pdf_doc.filename}")

        return chunks

    def _process_chunk(self, chunk_id: str, source_document: str, chunk_text: str,
                       page_number: int, position_in_document: int) -> ProcessedChunk:
        """Process individual text chunk."""
        chunk_type = self._classify_chunk_type(chunk_text)
        semantic_features = self._extract_semantic_features(chunk_text)

        return ProcessedChunk(
            chunk_id=chunk_id,
            source_document=source_document,
            chunk_text=chunk_text,
            chunk_type=chunk_type,
            page_number=page_number,
            position_in_document=position_in_document,
            word_count=len(chunk_text.split()),
            character_count=len(chunk_text),
            semantic_features=semantic_features,
            processing_timestamp=datetime.now().isoformat()
        )

    def _classify_chunk_type(self, text: str) -> str:
        """Classify chunk type based on content."""
        text_lower = text.lower()

        # Mathematical content
        math_indicators = ['equation', 'formula', 'theorem', 'proof', 'calculate', 'solve', '=', '+', '-', '*', '/', '^']
        if any(indicator in text_lower for indicator in math_indicators):
            return "mathematical"

        # Tabular content
        if 'table' in text_lower or '|' in text or '\t' in text:
            return "table"

        # Figure captions
        if 'figure' in text_lower or 'fig.' in text_lower or 'image' in text_lower:
            return "figure_caption"

        # Code fragments
        code_indicators = ['def ', 'function', 'class ', 'import', 'return', '{', '}', ';']
        if any(indicator in text for indicator in code_indicators):
            return "code"

        return "paragraph"

    def _extract_semantic_features(self, text: str) -> Dict[str, Any]:
        """Extract semantic features from text chunk."""
        words_in_text = text.split()
        features = {
            "word_count": len(words_in_text),
            "sentence_count": len(text.split('.')),
            "avg_word_length": statistics.mean(len(word) for word in words_in_text) if words_in_text else 0,
            "complexity_score": 0.0,
            "topics": [],
            "entities": [],
            "keywords": []
        }

        if TEXT_PROCESSING_AVAILABLE:
            # Keep up to ten distinct non-stop-word keywords.
            words = word_tokenize(text.lower())
            keywords = [word for word in words if word.isalpha() and word not in self.stop_words]
            features["keywords"] = list(set(keywords))[:10]

            # Rough complexity estimate: keyword count normalized and capped at 1.0.
            features["complexity_score"] = min(1.0, len(keywords) / 50.0)

        return features

    def create_training_entries(self, chunks: List[ProcessedChunk]) -> List[TrainingDataEntry]:
        """Create training data entries from processed chunks."""
        logger.info(f"Creating training entries from {len(chunks)} chunks")

        training_entries = []

        # Group chunks by source document and chunk type.
        chunk_groups = {}
        for chunk in chunks:
            key = f"{chunk.source_document}_{chunk.chunk_type}"
            if key not in chunk_groups:
                chunk_groups[key] = []
            chunk_groups[key].append(chunk)

        for group_key, group_chunks in chunk_groups.items():
            if len(group_chunks) < 1:
                continue

            combined_text = " ".join([chunk.chunk_text for chunk in group_chunks])
            source_chunks = [chunk.chunk_id for chunk in group_chunks]

            content_type = group_chunks[0].chunk_type
            complexity_score = statistics.mean(
                [chunk.semantic_features.get("complexity_score", 0) for chunk in group_chunks]
            )

            semantic_category = self._determine_semantic_category(combined_text, content_type)

            # Aggregate keywords and entities from all chunks in the group.
            all_keywords = []
            all_entities = []
            for chunk in group_chunks:
                all_keywords.extend(chunk.semantic_features.get("keywords", []))
                all_entities.extend(chunk.semantic_features.get("entities", []))

            dimensional_features = self._create_dimensional_features(combined_text, group_chunks)

            entry = TrainingDataEntry(
                entry_id=f"entry_{len(training_entries)}_{group_key}",
                source_chunks=source_chunks,
                processed_text=combined_text,
                content_type=content_type,
                complexity_score=complexity_score,
                semantic_category=semantic_category,
                keywords=list(set(all_keywords))[:20],
                entities=list(set(all_entities))[:10],
                mathematical_expressions=self._extract_math_expressions(combined_text),
                dimensional_features=dimensional_features,
                metadata={
                    "source_document": group_chunks[0].source_document,
                    "chunk_count": len(group_chunks),
                    "avg_word_count": statistics.mean([chunk.word_count for chunk in group_chunks]),
                    "processing_method": "pdf_processing_system"
                },
                creation_timestamp=datetime.now().isoformat()
            )

            training_entries.append(entry)

        self.training_entries.extend(training_entries)
        logger.info(f"Created {len(training_entries)} training entries")

        return training_entries

    def _determine_semantic_category(self, text: str, content_type: str) -> str:
        """Determine semantic category of the content."""
        text_lower = text.lower()

        if any(term in text_lower for term in ['algorithm', 'programming', 'code', 'software', 'system']):
            return "technical"
        elif any(term in text_lower for term in ['research', 'study', 'experiment', 'analysis', 'data']):
            return "research"
        elif any(term in text_lower for term in ['theory', 'concept', 'principle', 'framework', 'model']):
            return "theoretical"
        elif any(term in text_lower for term in ['application', 'use', 'practice', 'implementation']):
            return "practical"
        else:
            return "general"

    def _create_dimensional_features(self, text: str, chunks: List[ProcessedChunk]) -> Dict[str, Any]:
        """Create dimensional features for LiMp processing."""
        return {
            "text_dimension": len(text),
            "complexity_dimension": statistics.mean(
                [chunk.semantic_features.get("complexity_score", 0) for chunk in chunks]
            ) if chunks else 0.0,
            "semantic_density": len(text.split()) / len(text) if text else 0,
            "coherence_score": self._calculate_coherence_score(text),
            "novelty_score": self._calculate_novelty_score(text),
            "dimensional_entanglement": self._calculate_dimensional_entanglement(text, chunks)
        }

    def _calculate_coherence_score(self, text: str) -> float:
        """Calculate text coherence score."""
        sentences = text.split('.')
        if len(sentences) < 2:
            return 0.5

        # Use transition-word frequency as a simple coherence proxy.
        transition_words = ['however', 'therefore', 'moreover', 'furthermore', 'consequently', 'thus', 'hence']
        transitions = sum(1 for word in transition_words if word in text.lower())

        return min(1.0, transitions / len(sentences))

    def _calculate_novelty_score(self, text: str) -> float:
        """Calculate content novelty score."""
        words = text.lower().split()
        unique_words = set(words)

        if not words:
            return 0.0

        # Type-token ratio: share of distinct words.
        return len(unique_words) / len(words)

    def _calculate_dimensional_entanglement(self, text: str, chunks: List[ProcessedChunk]) -> float:
        """Calculate dimensional entanglement score."""
        chunk_count = len(chunks)
        if chunk_count < 2:
            return 0.0

        # Average Jaccard similarity between consecutive chunks.
        similarities = []
        for i in range(chunk_count - 1):
            chunk1_words = set(chunks[i].chunk_text.lower().split())
            chunk2_words = set(chunks[i + 1].chunk_text.lower().split())

            if chunk1_words and chunk2_words:
                similarity = len(chunk1_words.intersection(chunk2_words)) / len(chunk1_words.union(chunk2_words))
                similarities.append(similarity)

        return statistics.mean(similarities) if similarities else 0.0

    def _extract_math_expressions(self, text: str) -> List[str]:
        """Extract mathematical expressions from text."""
        import re

        # Simple patterns for assignments, arithmetic, powers, and f(x) = y forms.
        patterns = [
            r'\b[a-zA-Z]\s*=\s*[^=]+\b',
            r'\b\d+[\+\-\*\/]\d+\b',
            r'\b[a-zA-Z]\^?\d+\b',
            r'\b\w+\s*\(\s*\w+\s*\)\s*=\s*\w+\b'
        ]

        expressions = []
        for pattern in patterns:
            matches = re.findall(pattern, text)
            expressions.extend(matches)

        return expressions[:5]

    def save_processed_data(self, filename_prefix: str = "pdf_processing_results") -> Dict[str, str]:
        """Save all processed data to files."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        saved_files = {}

        # Save processed documents
        if self.processed_documents:
            docs_file = self.output_dir / f"{filename_prefix}_documents_{timestamp}.json"
            with open(docs_file, 'w', encoding='utf-8') as f:
                json.dump([asdict(doc) for doc in self.processed_documents], f, indent=2, ensure_ascii=False)
            saved_files["documents"] = str(docs_file)

        # Save processed chunks
        if self.processed_chunks:
            chunks_file = self.output_dir / f"{filename_prefix}_chunks_{timestamp}.json"
            with open(chunks_file, 'w', encoding='utf-8') as f:
                json.dump([asdict(chunk) for chunk in self.processed_chunks], f, indent=2, ensure_ascii=False)
            saved_files["chunks"] = str(chunks_file)

        # Save training entries
        if self.training_entries:
            entries_file = self.output_dir / f"{filename_prefix}_training_entries_{timestamp}.json"
            with open(entries_file, 'w', encoding='utf-8') as f:
                json.dump([asdict(entry) for entry in self.training_entries], f, indent=2, ensure_ascii=False)
            saved_files["training_entries"] = str(entries_file)

        # Save a run summary alongside the data files.
        summary = {
            "timestamp": datetime.now().isoformat(),
            "documents_processed": len(self.processed_documents),
            "chunks_created": len(self.processed_chunks),
            "training_entries_created": len(self.training_entries),
            "saved_files": saved_files
        }

        summary_file = self.output_dir / f"{filename_prefix}_summary_{timestamp}.json"
        with open(summary_file, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)
        saved_files["summary"] = str(summary_file)

        logger.info(f"Saved processed data to {len(saved_files)} files")
        return saved_files


def main():
    """Main function to demonstrate PDF processing."""
    print("📄 LiMp PDF Processing System")
    print("=" * 50)

    if not PDF_PROCESSING_AVAILABLE:
        print("❌ PDF processing libraries not available")
        print("Install with: pip install PyPDF2 pdfplumber PyMuPDF")
        return

    processor = PDFProcessor()

    print("📋 PDF Processing System Ready")
    print("\n🔧 Features:")
    print("   ✅ Multi-method PDF text extraction")
    print("   ✅ Intelligent document chunking")
    print("   ✅ Semantic feature extraction")
    print("   ✅ Training data generation")
    print("   ✅ Dimensional feature analysis")
    print("   ✅ Mathematical expression detection")

    print("\n💡 Usage:")
    print("   processor = PDFProcessor()")
    print("   pdf_doc = processor.process_pdf_file('document.pdf')")
    print("   chunks = processor.chunk_document(pdf_doc)")
    print("   training_entries = processor.create_training_entries(chunks)")
    print("   saved_files = processor.save_processed_data()")

    print("\n🎯 Ready for PDF processing and training data generation!")
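
    # Optional demo sketch: if PDF paths are supplied on the command line, run
    # the full pipeline on them. The sys.argv handling below is an assumption
    # added for illustration; it is not part of the original interface.
    import sys
    pdf_paths = [arg for arg in sys.argv[1:] if arg.lower().endswith(".pdf")]
    for pdf_path in pdf_paths:
        try:
            pdf_doc = processor.process_pdf_file(pdf_path)
            chunks = processor.chunk_document(pdf_doc)
            processor.create_training_entries(chunks)
        except Exception as exc:
            print(f"❌ Failed to process {pdf_path}: {exc}")
    if pdf_paths:
        saved_files = processor.save_processed_data()
        print(f"💾 Saved files: {', '.join(saved_files)}")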


if __name__ == "__main__":
    main()