"""
document_loader.py
Parses uploaded files (PDF, DOCX, TXT/MD, CSV) into plain text.
"""

import os
from pathlib import Path


def load_documents(file_paths: list[str]) -> list[dict]:
    """
    Given a list of file paths, parse each into a dict:
        { "source": filename, "text": full text content }

    Supports: .pdf, .docx, .txt, .md, .csv

    Loading is best-effort: ``None`` entries, unsupported extensions, files
    whose extracted text is empty/whitespace-only, and files that raise while
    being parsed are skipped (with a printed message for all but ``None``)
    instead of aborting the whole batch.

    Returns the successfully parsed documents in input order.
    """
    docs = []
    for path in file_paths:
        if path is None:
            continue

        ext = Path(path).suffix.lower()
        name = Path(path).name

        try:
            if ext == ".pdf":
                text = _load_pdf(path)
            elif ext == ".docx":
                text = _load_docx(path)
            elif ext in (".txt", ".md", ".csv"):
                # _load_text dispatches .csv to the CSV parser itself.
                text = _load_text(path)
            else:
                print(f"[Loader] Unsupported file type: {ext} — skipping {name}")
                continue

            if text.strip():
                docs.append({"source": name, "text": text})
            else:
                print(f"[Loader] Empty content from {name} — skipping")
        except Exception as e:
            # Deliberate catch-all: one unreadable file must not abort the batch.
            print(f"[Loader] Failed to load {name}: {e}")

    return docs


def _load_pdf(path: str) -> str:
    """Extract the text of every page of a PDF, joined with newlines."""
    import fitz  # PyMuPDF; imported lazily so it is only required for PDFs

    doc = fitz.open(path)
    try:
        pages = [page.get_text("text") for page in doc]
    finally:
        # Release the file handle even if text extraction fails on some page.
        doc.close()
    return "\n".join(pages)


def _load_docx(path: str) -> str:
    """
    Extract text from a DOCX file.

    Non-empty body paragraphs come first, followed by table rows. Each table
    row becomes one line with its non-empty cell texts separated by tabs, so
    tables appear after all paragraphs rather than at their original position.
    """
    from docx import Document  # python-docx; imported lazily

    doc = Document(path)
    parts: list[str] = []

    # Body paragraphs
    for p in doc.paragraphs:
        if p.text.strip():
            parts.append(p.text.strip())

    # Tables are not part of doc.paragraphs, so collect them separately.
    for table in doc.tables:
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
            if cells:
                parts.append("\t".join(cells))

    return "\n".join(parts)


def _load_text(path: str) -> str:
    """
    Load plain text files. CSVs are parsed into natural-language row sentences.

    Text is decoded as UTF-8; a leading byte-order mark is dropped and
    undecodable bytes are ignored.
    """
    ext = Path(path).suffix.lower()
    if ext == ".csv":
        return _load_csv(path)
    # "utf-8-sig" behaves like UTF-8 but strips a leading BOM, which would
    # otherwise end up as a stray U+FEFF at the start of the text.
    with open(path, "r", encoding="utf-8-sig", errors="ignore") as f:
        return f.read()


def _load_csv(path: str) -> str:
    """
    Parse a CSV file into natural-language sentences.

    Each row becomes: "ColumnA: value1. ColumnB: value2. ..."
    This makes tabular data semantically meaningful to the LLM rather than
    presenting it as raw comma-separated text.

    The first row is used as the header. Empty values are omitted. Values
    beyond the last header column (ragged rows) have no column name and are
    appended to the row sentence unlabeled. Returns "" for a file with no
    content.
    """
    import csv

    rows: list[str] = []
    # "utf-8-sig" strips a leading BOM so it is not glued onto the first
    # column name; newline="" is what the csv module expects for its input.
    with open(path, "r", encoding="utf-8-sig", errors="ignore", newline="") as f:
        reader = csv.DictReader(f)
        if reader.fieldnames is None:
            # DictReader leaves fieldnames as None only when the file yields
            # no lines at all, i.e. there is nothing to return.
            return ""
        for row in reader:
            parts: list[str] = []
            for col, val in row.items():
                if col is None:
                    # Surplus fields of a ragged row: DictReader stores them
                    # as a list under the restkey (None by default).
                    parts.extend(v.strip() for v in val if v and v.strip())
                elif val and val.strip():
                    parts.append(f"{col}: {val.strip()}")
            if parts:
                rows.append(". ".join(parts) + ".")
    return "\n".join(rows)