import os
import re
import time
from typing import List, Optional, Dict, Any
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from config import Config, AppConstants
from models import DocumentMetadata, ProcessingStats
class DocumentProcessor:
"""Handles document loading, processing, and chunking."""
def __init__(self, base_path: Optional[str] = None):
"""Initialize the document processor.
Args:
base_path: Base path for document directories
"""
self.base_path = base_path or Config.DATA_BASE_PATH
self.last_processing_stats: Optional[ProcessingStats] = None
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=Config.CHUNK_SIZE,
chunk_overlap=Config.CHUNK_OVERLAP,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
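# A minimal sketch of what the splitter does, assuming illustrative values
# CHUNK_SIZE=1000 and CHUNK_OVERLAP=200 (the real values live in config.Config):
#
#     splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
#     chunks = splitter.split_text("...a few thousand characters of course text...")
#
# Each chunk is at most ~1000 characters and consecutive chunks share up to
# 200 characters of overlap; splits prefer "\n\n", then "\n", then spaces,
# before falling back to hard character cuts.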
def process_all_documents(self) -> List[Document]:
"""Process both markdown and PDF documents from courses and programs directories.
Returns:
List of processed documents with proper metadata. A ProcessingStats
snapshot is also stored on self.last_processing_stats.
"""
start_time = time.time()
documents = {
'courses': [],
'programs': []
}
# Define paths for different document types
paths = self._get_document_paths()
# Create directories if they don't exist
self._ensure_directories_exist(paths)
# Process documents by category
for category in ['courses', 'programs']:
# Process markdown files
md_path = paths[f'{category}_md']
if os.path.exists(md_path):
documents[category].extend(self._process_markdown_files(md_path, category))
# Process PDF files
pdf_path = paths[f'{category}_pdf']
if os.path.exists(pdf_path):
documents[category].extend(self._process_pdf_files(pdf_path, category))
print(f"Processed {len(documents[category])} {category} documents")
# Combine all documents while maintaining their metadata
all_documents = documents['courses'] + documents['programs']
# Create processing stats
processing_time = time.time() - start_time
self.last_processing_stats = ProcessingStats(
total_documents=len(all_documents),
courses_processed=len(documents['courses']),
programs_processed=len(documents['programs']),
chunks_created=0,  # updated later by chunk_documents()
processing_time=processing_time
)
print(f"Total documents processed: {len(all_documents)}")
print(f"Courses: {len(documents['courses'])}, Programs: {len(documents['programs'])}")
print(f"Processing time: {processing_time:.2f} seconds")
return all_documents
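# Typical usage, assuming the configured directory layout exists under
# DATA_BASE_PATH (the paths and values shown are illustrative):
#
#     processor = DocumentProcessor()
#     docs = processor.process_all_documents()
#     docs[0].metadata
#     # {'source': 'data/courses/md/DIT134.md', 'type': 'markdown',
#     #  'category': 'courses', 'doc_type': 'course',
#     #  'filename': 'DIT134.md', 'course_code': 'DIT134'}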
def chunk_documents(self, documents: List[Document]) -> List[Document]:
"""Split documents into chunks for embedding.
Args:
documents: List of documents to chunk
Returns:
List of document chunks
"""
print(f"Splitting {len(documents)} documents into chunks...")
chunks = self.text_splitter.split_documents(documents)
if self.last_processing_stats is not None:
    # Record the chunk count on the stats captured by process_all_documents()
    self.last_processing_stats.chunks_created = len(chunks)
print(f"Created {len(chunks)} document chunks")
return chunks
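# split_documents() carries each parent document's metadata onto its chunks,
# so category and course_code filters keep working downstream:
#
#     chunks = processor.chunk_documents(docs)
#     assert all('category' in chunk.metadata for chunk in chunks)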
def _get_document_paths(self) -> Dict[str, str]:
"""Get paths for different document types.
Returns:
Dictionary with document paths
"""
return {
'courses_md': os.path.join(self.base_path, Config.COURSES_MD_PATH),
'courses_pdf': os.path.join(self.base_path, Config.COURSES_PDF_PATH),
'programs_md': os.path.join(self.base_path, Config.PROGRAMS_MD_PATH),
'programs_pdf': os.path.join(self.base_path, Config.PROGRAMS_PDF_PATH)
}
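# With illustrative Config values (e.g. COURSES_MD_PATH='courses/md') and
# base_path='data', the mapping would look like:
#
#     {'courses_md': 'data/courses/md', 'courses_pdf': 'data/courses/pdf',
#      'programs_md': 'data/programs/md', 'programs_pdf': 'data/programs/pdf'}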
def _ensure_directories_exist(self, paths: Dict[str, str]) -> None:
"""Ensure all document directories exist.
Args:
paths: Dictionary of paths to create
"""
for path in paths.values():
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
print(f"Created directory: {path}")
def _process_markdown_files(self, path: str, category: str) -> List[Document]:
"""Process markdown files in a directory.
Args:
path: Path to the markdown files directory
category: Type of documents ('courses' or 'programs')
Returns:
List of processed markdown documents with metadata
"""
documents = []
if not os.path.exists(path):
print(f"Warning: Markdown directory {path} does not exist")
return documents
for filename in os.listdir(path):
if filename.lower().endswith('.md'):
file_path = os.path.join(path, filename)
try:
content = self._read_file_with_fallback_encoding(file_path)
# Create metadata
metadata = {
'source': file_path,
'type': 'markdown',
'category': category,
'doc_type': category.rstrip('s'), # 'course' or 'program'
'filename': filename
}
# Extract course code if it's a course document
if category == 'courses':
code = self._extract_course_code(filename, content)
if code:
metadata['course_code'] = code
doc = Document(
page_content=content,
metadata=metadata
)
documents.append(doc)
except Exception as e:
print(f"Error processing markdown file {filename}: {str(e)}")
return documents
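# Each readable .md file becomes exactly one Document; files that fail to
# decode or load are logged and skipped. A hypothetical direct call:
#
#     md_docs = processor._process_markdown_files('data/courses/md', 'courses')
#     # len(md_docs) == number of markdown files read successfully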
def _process_pdf_files(self, path: str, category: str) -> List[Document]:
"""Process PDF files in a directory.
Args:
path: Path to the PDF files directory
category: Type of documents ('courses' or 'programs')
Returns:
List of processed PDF documents with metadata
"""
documents = []
if not os.path.exists(path):
print(f"Warning: PDF directory {path} does not exist")
return documents
for filename in os.listdir(path):
if filename.lower().endswith('.pdf'):
file_path = os.path.join(path, filename)
try:
loader = PyPDFLoader(file_path)
pdf_docs = loader.load()
# Create base metadata
metadata = {
'type': 'pdf',
'category': category,
'doc_type': category.rstrip('s'), # 'course' or 'program'
'filename': filename
}
# Add course code if it exists and it's a course document
if category == 'courses' and pdf_docs:
code = self._extract_course_code(filename, pdf_docs[0].page_content)
if code:
metadata['course_code'] = code
# Add metadata to each page
for doc in pdf_docs:
doc.metadata.update(metadata)
documents.extend(pdf_docs)
except Exception as e:
print(f"Error processing PDF {filename}: {str(e)}")
return documents
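# PyPDFLoader returns one Document per page, so a 12-page syllabus yields 12
# Documents that all share the metadata added above (plus the loader's own
# 'source' and 'page' fields). A hypothetical direct call:
#
#     pdf_docs = processor._process_pdf_files('data/courses/pdf', 'courses')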
def _read_file_with_fallback_encoding(self, file_path: str) -> str:
"""Read a file with fallback encodings.
Args:
file_path: Path to the file to read
Returns:
File content as string
Raises:
UnicodeError: If the file cannot be read with any of the configured encodings
"""
for encoding in AppConstants.SUPPORTED_FILE_ENCODINGS:
try:
with open(file_path, 'r', encoding=encoding) as f:
return f.read()
except UnicodeDecodeError:
continue
raise UnicodeError(f"Failed to read {file_path} with any of the configured encodings")
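# Assuming SUPPORTED_FILE_ENCODINGS is something like ['utf-8', 'latin-1'],
# a UTF-8 file is read on the first attempt and a legacy Windows export falls
# through to 'latin-1'; only when every candidate fails does the UnicodeError
# propagate to the caller, which logs the error and skips the file:
#
#     content = processor._read_file_with_fallback_encoding('data/courses/md/DIT134.md')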
def _extract_course_code(self, filename: str, content: str) -> Optional[str]:
"""Extract course code from filename or content if possible.
Args:
filename: Name of the file
content: Content of the document
Returns:
Course code if found, None otherwise
"""
# Try to extract from filename first (e.g., "DIT134-advanced-programming.pdf")
code_match = re.search(r'([A-Z]{3}\d{3})', filename)
if code_match:
return code_match.group(1)
# Try to extract from content (first occurrence)
code_match = re.search(r'([A-Z]{3}\d{3})', content[:1000]) # Search in first 1000 chars
if code_match:
return code_match.group(1)
return None
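# Doctest-style examples of the pattern (three capitals followed by three
# digits), assuming p is a DocumentProcessor instance:
#
#     >>> p._extract_course_code('DIT134-advanced-programming.pdf', '')
#     'DIT134'
#     >>> p._extract_course_code('syllabus.md', 'Course DIT042 covers testing.')
#     'DIT042'
#     >>> p._extract_course_code('notes.md', 'no code here') is None
#     True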
def get_document_stats(self, documents: List[Document]) -> Dict[str, Any]:
"""Get statistics about processed documents.
Args:
documents: List of processed documents
Returns:
Dictionary with document statistics
"""
stats = {
'total_documents': len(documents),
'by_category': {},
'by_type': {},
'by_doc_type': {},
'course_codes': set(),
'total_content_length': 0
}
for doc in documents:
metadata = doc.metadata
# Count by category
category = metadata.get('category', 'unknown')
stats['by_category'][category] = stats['by_category'].get(category, 0) + 1
# Count by file type
file_type = metadata.get('type', 'unknown')
stats['by_type'][file_type] = stats['by_type'].get(file_type, 0) + 1
# Count by document type
doc_type = metadata.get('doc_type', 'unknown')
stats['by_doc_type'][doc_type] = stats['by_doc_type'].get(doc_type, 0) + 1
# Collect course codes
if metadata.get('course_code'):
stats['course_codes'].add(metadata['course_code'])
# Sum content length
stats['total_content_length'] += len(doc.page_content)
# Convert set to list for JSON serialization
stats['course_codes'] = list(stats['course_codes'])
stats['unique_course_codes'] = len(stats['course_codes'])
return stats
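# Illustrative output for a small corpus (all values are made up):
#
#     {'total_documents': 42, 'by_category': {'courses': 30, 'programs': 12},
#      'by_type': {'markdown': 25, 'pdf': 17},
#      'by_doc_type': {'course': 30, 'program': 12},
#      'course_codes': ['DIT134', 'DIT042'], 'unique_course_codes': 2,
#      'total_content_length': 183000}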
def validate_documents(self, documents: List[Document]) -> Dict[str, Any]:
"""Validate processed documents for common issues.
Args:
documents: List of documents to validate
Returns:
Dictionary with validation results
"""
validation_results = {
'total_documents': len(documents),
'issues': [],
'warnings': [],
'valid_documents': 0,
'empty_documents': 0,
'missing_metadata': 0
}
for i, doc in enumerate(documents):
# Check for empty content
if not doc.page_content or len(doc.page_content.strip()) == 0:
validation_results['empty_documents'] += 1
validation_results['issues'].append(f"Document {i}: Empty content")
continue
# Check for essential metadata
required_metadata = ['source', 'type', 'category', 'doc_type', 'filename']
missing_fields = [field for field in required_metadata if not doc.metadata.get(field)]
if missing_fields:
validation_results['missing_metadata'] += 1
validation_results['warnings'].append(
f"Document {i}: Missing metadata fields: {missing_fields}"
)
# Check content length
if len(doc.page_content) < 50:
validation_results['warnings'].append(
f"Document {i}: Very short content ({len(doc.page_content)} chars)"
)
validation_results['valid_documents'] += 1
return validation_results
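# Manual smoke test: a minimal end-to-end sketch, assuming Config points at a
# usable data directory. Not part of the library surface.
if __name__ == "__main__":
    processor = DocumentProcessor()
    docs = processor.process_all_documents()
    report = processor.validate_documents(docs)
    print(f"Valid: {report['valid_documents']}, "
          f"empty: {report['empty_documents']}, "
          f"warnings: {len(report['warnings'])}")
    chunks = processor.chunk_documents(docs)
    stats = processor.get_document_stats(chunks)
    print(f"Chunks: {stats['total_documents']}, "
          f"unique course codes: {stats['unique_course_codes']}")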