|
|
import os |
|
|
import re |
|
|
import time |
|
|
from typing import List, Optional, Dict, Any |
|
|
from pathlib import Path |
|
|
|
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_core.documents import Document |
|
|
|
|
|
from config import Config, AppConstants |
|
|
from models import DocumentMetadata, ProcessingStats |
|
|
|
|
|
class DocumentProcessor:
    """Handles document loading, processing, and chunking.

    Loads markdown and PDF documents from the configured courses/programs
    directories, attaches retrieval metadata (category, doc_type, filename,
    and course code where detectable), and splits documents into overlapping
    chunks for embedding.
    """

    def __init__(self, base_path: Optional[str] = None):
        """Initialize the document processor.

        Args:
            base_path: Base path for document directories. Falls back to
                Config.DATA_BASE_PATH when not provided.
        """
        self.base_path = base_path or Config.DATA_BASE_PATH
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=Config.CHUNK_SIZE,
            chunk_overlap=Config.CHUNK_OVERLAP,
            length_function=len,
            # Prefer paragraph, then line, then word boundaries before
            # falling back to hard character splits.
            separators=["\n\n", "\n", " ", ""]
        )

    def process_all_documents(self) -> List[Document]:
        """Process both markdown and PDF documents from courses and programs directories.

        Returns:
            List of processed documents with proper metadata
            (courses first, then programs).
        """
        start_time = time.time()

        documents: Dict[str, List[Document]] = {
            'courses': [],
            'programs': []
        }

        paths = self._get_document_paths()
        self._ensure_directories_exist(paths)

        for category in ('courses', 'programs'):
            md_path = paths[f'{category}_md']
            if os.path.exists(md_path):
                documents[category].extend(self._process_markdown_files(md_path, category))

            pdf_path = paths[f'{category}_pdf']
            if os.path.exists(pdf_path):
                documents[category].extend(self._process_pdf_files(pdf_path, category))

            print(f"Processed {len(documents[category])} {category} documents")

        all_documents = documents['courses'] + documents['programs']
        processing_time = time.time() - start_time

        print(f"Total documents processed: {len(all_documents)}")
        print(f"Courses: {len(documents['courses'])}, Programs: {len(documents['programs'])}")
        print(f"Processing time: {processing_time:.2f} seconds")

        return all_documents

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks for embedding.

        Args:
            documents: List of documents to chunk

        Returns:
            List of document chunks
        """
        print(f"Splitting {len(documents)} documents into chunks...")
        chunks = self.text_splitter.split_documents(documents)
        print(f"Created {len(chunks)} document chunks")
        return chunks

    def _get_document_paths(self) -> Dict[str, str]:
        """Get paths for different document types.

        Returns:
            Dictionary mapping '<category>_<format>' keys
            ('courses_md', 'courses_pdf', 'programs_md', 'programs_pdf')
            to absolute/relative directory paths under self.base_path.
        """
        return {
            'courses_md': os.path.join(self.base_path, Config.COURSES_MD_PATH),
            'courses_pdf': os.path.join(self.base_path, Config.COURSES_PDF_PATH),
            'programs_md': os.path.join(self.base_path, Config.PROGRAMS_MD_PATH),
            'programs_pdf': os.path.join(self.base_path, Config.PROGRAMS_PDF_PATH)
        }

    def _ensure_directories_exist(self, paths: Dict[str, str]) -> None:
        """Ensure all document directories exist, creating any that are missing.

        Args:
            paths: Dictionary of paths to create
        """
        for path in paths.values():
            # The existence check is only used to decide whether to log;
            # exist_ok=True makes the creation itself race-safe.
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
                print(f"Created directory: {path}")

    def _process_markdown_files(self, path: str, category: str) -> List[Document]:
        """Process markdown files in a directory.

        Files that fail to load are logged and skipped; processing continues
        with the remaining files.

        Args:
            path: Path to the markdown files directory
            category: Type of documents ('courses' or 'programs')

        Returns:
            List of processed markdown documents with metadata
        """
        documents: List[Document] = []

        if not os.path.exists(path):
            print(f"Warning: Markdown directory {path} does not exist")
            return documents

        for filename in os.listdir(path):
            if filename.endswith('.md'):
                file_path = os.path.join(path, filename)
                try:
                    content = self._read_file_with_fallback_encoding(file_path)

                    metadata: Dict[str, Any] = {
                        'source': file_path,
                        'type': 'markdown',
                        'category': category,
                        # Singular form: 'courses' -> 'course', 'programs' -> 'program'.
                        'doc_type': category.rstrip('s'),
                        'filename': filename
                    }

                    # Course documents additionally carry a course code when
                    # one can be extracted from the filename or content.
                    if category == 'courses':
                        code = self._extract_course_code(filename, content)
                        if code:
                            metadata['course_code'] = code

                    documents.append(Document(
                        page_content=content,
                        metadata=metadata
                    ))

                except Exception as e:
                    # Include the actual filename so failures are traceable.
                    print(f"Error processing markdown file {filename}: {str(e)}")

        return documents

    def _process_pdf_files(self, path: str, category: str) -> List[Document]:
        """Process PDF files in a directory.

        Each PDF yields one Document per page (PyPDFLoader behavior); the
        shared metadata is merged into every page. Files that fail to load
        are logged and skipped.

        Args:
            path: Path to the PDF files directory
            category: Type of documents ('courses' or 'programs')

        Returns:
            List of processed PDF documents with metadata
        """
        documents: List[Document] = []

        if not os.path.exists(path):
            print(f"Warning: PDF directory {path} does not exist")
            return documents

        for filename in os.listdir(path):
            if filename.endswith('.pdf'):
                file_path = os.path.join(path, filename)
                try:
                    loader = PyPDFLoader(file_path)
                    pdf_docs = loader.load()

                    metadata: Dict[str, Any] = {
                        'type': 'pdf',
                        'category': category,
                        # Singular form: 'courses' -> 'course', 'programs' -> 'program'.
                        'doc_type': category.rstrip('s'),
                        'filename': filename
                    }

                    # Only the first page is scanned for a course code; the
                    # code is then applied to every page of the PDF.
                    if category == 'courses' and pdf_docs:
                        code = self._extract_course_code(filename, pdf_docs[0].page_content)
                        if code:
                            metadata['course_code'] = code

                    for doc in pdf_docs:
                        doc.metadata.update(metadata)

                    documents.extend(pdf_docs)

                except Exception as e:
                    # Include the actual filename so failures are traceable.
                    print(f"Error processing PDF {filename}: {str(e)}")

        return documents

    def _read_file_with_fallback_encoding(self, file_path: str) -> str:
        """Read a file, trying each supported encoding in order.

        Args:
            file_path: Path to the file to read

        Returns:
            File content as string

        Raises:
            UnicodeDecodeError: If the file cannot be decoded with any
                supported encoding.
        """
        for encoding in AppConstants.SUPPORTED_FILE_ENCODINGS:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    return f.read()
            except UnicodeDecodeError:
                continue

        # UnicodeDecodeError requires (encoding, object, start, end, reason);
        # the single-string form would raise TypeError instead.
        raise UnicodeDecodeError(
            'utf-8', b'', 0, 1,
            f"Failed to read {file_path} with any encoding"
        )

    def _extract_course_code(self, filename: str, content: str) -> Optional[str]:
        """Extract course code from filename or content if possible.

        A course code is assumed to match three uppercase letters followed
        by three digits (e.g. 'CSC101'). The filename is checked first,
        then the first 1000 characters of the content.

        Args:
            filename: Name of the file
            content: Content of the document

        Returns:
            Course code if found, None otherwise
        """
        code_match = re.search(r'([A-Z]{3}\d{3})', filename)
        if code_match:
            return code_match.group(1)

        # Fall back to the start of the document body; limiting the scan
        # keeps this cheap for large files.
        code_match = re.search(r'([A-Z]{3}\d{3})', content[:1000])
        if code_match:
            return code_match.group(1)

        return None

    def get_document_stats(self, documents: List[Document]) -> Dict[str, Any]:
        """Get statistics about processed documents.

        Args:
            documents: List of processed documents

        Returns:
            Dictionary with counts by category/type/doc_type, the list of
            distinct course codes, the number of unique codes, and the
            total content length in characters.
        """
        stats: Dict[str, Any] = {
            'total_documents': len(documents),
            'by_category': {},
            'by_type': {},
            'by_doc_type': {},
            'course_codes': set(),
            'total_content_length': 0
        }

        for doc in documents:
            metadata = doc.metadata

            category = metadata.get('category', 'unknown')
            stats['by_category'][category] = stats['by_category'].get(category, 0) + 1

            file_type = metadata.get('type', 'unknown')
            stats['by_type'][file_type] = stats['by_type'].get(file_type, 0) + 1

            doc_type = metadata.get('doc_type', 'unknown')
            stats['by_doc_type'][doc_type] = stats['by_doc_type'].get(doc_type, 0) + 1

            if metadata.get('course_code'):
                stats['course_codes'].add(metadata['course_code'])

            stats['total_content_length'] += len(doc.page_content)

        # Convert the set to a list so the stats dict is JSON-serializable.
        stats['course_codes'] = list(stats['course_codes'])
        stats['unique_course_codes'] = len(stats['course_codes'])

        return stats

    def validate_documents(self, documents: List[Document]) -> Dict[str, Any]:
        """Validate processed documents for common issues.

        Empty documents are counted as issues and excluded from the valid
        count; missing metadata and very short content are recorded as
        warnings but do not invalidate a document.

        Args:
            documents: List of documents to validate

        Returns:
            Dictionary with validation results
        """
        validation_results: Dict[str, Any] = {
            'total_documents': len(documents),
            'issues': [],
            'warnings': [],
            'valid_documents': 0,
            'empty_documents': 0,
            'missing_metadata': 0
        }

        for i, doc in enumerate(documents):
            # Empty content is a hard failure: skip all further checks.
            if not doc.page_content or len(doc.page_content.strip()) == 0:
                validation_results['empty_documents'] += 1
                validation_results['issues'].append(f"Document {i}: Empty content")
                continue

            required_metadata = ['source', 'type', 'category', 'doc_type', 'filename']
            missing_fields = [field for field in required_metadata if not doc.metadata.get(field)]

            if missing_fields:
                validation_results['missing_metadata'] += 1
                validation_results['warnings'].append(
                    f"Document {i}: Missing metadata fields: {missing_fields}"
                )

            if len(doc.page_content) < 50:
                validation_results['warnings'].append(
                    f"Document {i}: Very short content ({len(doc.page_content)} chars)"
                )

            validation_results['valid_documents'] += 1

        return validation_results