""" Document Ingestion Module for VDHF Handles loading and preprocessing of documents for retrieval. Supports PDF, TXT, DOCX, and Excel (XLSX/XLS/CSV) files. """ import os import re from typing import List, Dict, Any, Optional from dataclasses import dataclass try: from PyPDF2 import PdfReader except ImportError: PdfReader = None try: from docx import Document as DocxDocument except ImportError: DocxDocument = None try: import openpyxl except ImportError: openpyxl = None import csv import io from config.settings import CHUNK_SIZE, CHUNK_OVERLAP @dataclass class DocumentChunk: """Represents a chunk of a document with metadata.""" content: str metadata: Dict[str, Any] chunk_id: str def __str__(self) -> str: return f"Chunk[{self.chunk_id}]: {self.content[:100]}..." class DocumentIngestion: """ Document Ingestion Module Responsibilities: - Load PDFs, text files, or DOCX content - Clean text (remove noise, headers, footers) - Split text into chunks - Attach metadata such as source and position """ def __init__( self, chunk_size: int = CHUNK_SIZE, chunk_overlap: int = CHUNK_OVERLAP ): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def load_document(self, file_path: str) -> str: """ Load a document from file path. Args: file_path: Path to the document file Returns: Raw text content of the document """ if not os.path.exists(file_path): raise FileNotFoundError(f"Document not found: {file_path}") ext = os.path.splitext(file_path)[1].lower() if ext == ".txt": return self._load_txt(file_path) elif ext == ".pdf": return self._load_pdf(file_path) elif ext == ".docx": return self._load_docx(file_path) elif ext in (".xlsx", ".xls"): return self._load_excel(file_path) elif ext == ".csv": return self._load_csv(file_path) else: raise ValueError(f"Unsupported file format: {ext}") def _load_txt(self, file_path: str) -> str: """Load a text file.""" with open(file_path, "r", encoding="utf-8", errors="ignore") as f: return f.read() def _load_pdf(self, file_path: str) -> str: """Load a PDF file.""" if PdfReader is None: raise ImportError("PyPDF2 is required for PDF support. Install with: pip install PyPDF2") reader = PdfReader(file_path) text_parts = [] for page_num, page in enumerate(reader.pages): page_text = page.extract_text() if page_text: text_parts.append(page_text) return "\n\n".join(text_parts) def _load_docx(self, file_path: str) -> str: """Load a DOCX file.""" if DocxDocument is None: raise ImportError("python-docx is required for DOCX support. Install with: pip install python-docx") doc = DocxDocument(file_path) paragraphs = [para.text for para in doc.paragraphs if para.text.strip()] return "\n\n".join(paragraphs) def _load_excel(self, file_path: str) -> str: """Load an Excel file (.xlsx/.xls) — converts every sheet into readable text. Auto-detects the real header row (skips merged title rows) by looking for the first row where 3+ cells are filled with short text values. Also skips non-student rows like totals or max-marks rows. """ if openpyxl is None: raise ImportError("openpyxl is required for Excel support. Install with: pip install openpyxl") wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) text_parts = [] for sheet_name in wb.sheetnames: ws = wb[sheet_name] rows = list(ws.iter_rows(values_only=True)) if len(rows) < 2: continue # --- Auto-detect header row --- header_idx = self._find_header_row(rows) headers = [str(h).strip() if h is not None else f"Col{i}" for i, h in enumerate(rows[header_idx])] # Collect any title lines above the header (college name, dept, etc.) preamble_lines = [] for r in rows[:header_idx]: vals = [str(v).strip() for v in r if v is not None and str(v).strip()] if vals: preamble_lines.append(" ".join(vals)) sheet_lines = [] if preamble_lines: sheet_lines.append(" | ".join(preamble_lines)) # --- Process data rows (after header) --- for row in rows[header_idx + 1:]: cells = list(row) # Skip rows that are mostly empty filled = [c for c in cells if c is not None and str(c).strip()] if len(filled) < 2: continue # Skip rows without a text name (likely totals / max-marks) has_name = any( isinstance(c, str) and len(c.strip()) > 3 and not c.strip().replace('.', '').isdigit() for c in cells ) if not has_name: continue parts = [] for header, cell in zip(headers, cells): if cell is not None and str(cell).strip(): parts.append(f"{header}: {cell}") if parts: sheet_lines.append(". ".join(parts) + ".") if sheet_lines: text_parts.append("\n".join(sheet_lines)) wb.close() if not text_parts: raise ValueError(f"No readable data found in {file_path}") return "\n\n".join(text_parts) @staticmethod def _find_header_row(rows) -> int: """Find the first row that looks like column headers. A header row has 3+ non-empty short-ish text cells and often contains keywords like 'name', 'no', 'roll', 'total', 'sl'. Falls back to row 0 if nothing better is found. """ header_keywords = {'name', 'no', 'roll', 'sl', 'sno', 'total', 'id', 'section', 'subject', 'marks', 'grade', 'percentage', 'attendance', 'date', 'class', 'student'} best_idx = 0 best_score = 0 for i, row in enumerate(rows[:20]): # only scan first 20 rows cells = [str(c).strip().lower() for c in row if c is not None and str(c).strip()] if len(cells) < 3: continue # Score: how many cells match header keywords keyword_hits = sum( 1 for c in cells if any(kw in c for kw in header_keywords) ) # Also reward rows where most cells are short text (< 30 chars) short_text = sum(1 for c in cells if len(c) < 30) score = keyword_hits * 3 + short_text if score > best_score: best_score = score best_idx = i return best_idx def _load_csv(self, file_path: str) -> str: """Load a CSV file — converts rows into readable text.""" with open(file_path, "r", encoding="utf-8", errors="ignore") as f: reader = csv.reader(f) rows = list(reader) if not rows: raise ValueError(f"CSV file is empty: {file_path}") headers = rows[0] text_lines = [] for row in rows[1:]: parts = [] for header, cell in zip(headers, row): if cell and cell.strip(): parts.append(f"{header}: {cell}") if parts: text_lines.append(". ".join(parts) + ".") return "\n".join(text_lines) def clean_text(self, text: str) -> str: """ Clean text by removing noise. Args: text: Raw text content Returns: Cleaned text """ # Remove excessive whitespace text = re.sub(r'\s+', ' ', text) # Remove page numbers (common patterns) text = re.sub(r'\n\s*\d+\s*\n', '\n', text) text = re.sub(r'Page \d+ of \d+', '', text) # Remove headers/footers markers text = re.sub(r'^\s*[-_=]{3,}\s*$', '', text, flags=re.MULTILINE) # Normalize line breaks text = re.sub(r'\n{3,}', '\n\n', text) return text.strip() def split_into_chunks( self, text: str, source: str = "unknown" ) -> List[DocumentChunk]: """ Split text into overlapping chunks. Args: text: Cleaned text content source: Source identifier for metadata Returns: List of DocumentChunk objects """ chunks = [] start = 0 chunk_index = 0 while start < len(text): # Calculate end position end = start + self.chunk_size # Try to break at sentence boundary if end < len(text): # Look for sentence ending within last 100 chars search_start = max(end - 100, start) last_period = text.rfind('. ', search_start, end) if last_period > start: end = last_period + 1 # Extract chunk content content = text[start:end].strip() if content: chunk = DocumentChunk( content=content, metadata={ "source": source, "chunk_index": chunk_index, "start_char": start, "end_char": end }, chunk_id=f"{os.path.basename(source)}_{chunk_index}" ) chunks.append(chunk) chunk_index += 1 # Move start position with overlap start = end - self.chunk_overlap if start <= chunks[-1].metadata["start_char"] if chunks else 0: start = end # Prevent infinite loop return chunks def ingest_file(self, file_path: str) -> List[DocumentChunk]: """ Full ingestion pipeline for a single file. Args: file_path: Path to the document Returns: List of processed DocumentChunk objects """ raw_text = self.load_document(file_path) cleaned_text = self.clean_text(raw_text) chunks = self.split_into_chunks(cleaned_text, source=os.path.basename(file_path)) return chunks def ingest_directory( self, directory_path: str, extensions: Optional[List[str]] = None ) -> List[DocumentChunk]: """ Ingest all documents from a directory. Args: directory_path: Path to the directory extensions: List of file extensions to process (default: ['.txt', '.pdf', '.docx']) Returns: List of all DocumentChunk objects from all files """ if extensions is None: extensions = ['.txt', '.pdf', '.docx', '.xlsx', '.xls', '.csv'] all_chunks = [] for root, _, files in os.walk(directory_path): for file in files: ext = os.path.splitext(file)[1].lower() if ext in extensions: file_path = os.path.join(root, file) try: chunks = self.ingest_file(file_path) all_chunks.extend(chunks) print(f"Ingested {file}: {len(chunks)} chunks") except Exception as e: print(f"Error ingesting {file}: {e}") return all_chunks def ingest_text(self, text: str, source: str = "direct_input") -> List[DocumentChunk]: """ Ingest raw text directly. Args: text: Raw text content source: Source identifier Returns: List of DocumentChunk objects """ cleaned_text = self.clean_text(text) return self.split_into_chunks(cleaned_text, source=source) def ingest_documents(path: str) -> List[DocumentChunk]: """ Convenience function to ingest documents from a file or directory. Args: path: Path to file or directory Returns: List of DocumentChunk objects """ ingestion = DocumentIngestion() if os.path.isfile(path): return ingestion.ingest_file(path) elif os.path.isdir(path): return ingestion.ingest_directory(path) else: raise ValueError(f"Invalid path: {path}")