"""Document ingestion pipeline — .docx files to Qdrant vectors.""" import os import re from docx import Document from app.config import DOCS_DIR, CHUNK_SIZE, CHUNK_OVERLAP, COLLECTION_NAME from app.retriever import get_retriever def find_all_docx(root_dir: str) -> list[str]: """Recursively find all .docx files under root_dir.""" docx_files = [] for dirpath, dirnames, filenames in os.walk(root_dir): for fname in filenames: if fname.endswith(".docx") and not fname.startswith("~$"): docx_files.append(os.path.join(dirpath, fname)) return docx_files def extract_text_from_docx(filepath: str) -> str: """Extract all text from a .docx file.""" doc = Document(filepath) paragraphs = [] for para in doc.paragraphs: text = para.text.strip() if text: paragraphs.append(text) # Also extract text from tables for table in doc.tables: for row in table.rows: row_text = " | ".join(cell.text.strip() for cell in row.cells if cell.text.strip()) if row_text: paragraphs.append(row_text) return "\n".join(paragraphs) def clean_text(text: str) -> str: """Normalize and clean extracted text.""" # Normalize whitespace text = re.sub(r"\s+", " ", text) # Remove special unicode characters text = text.encode("ascii", errors="ignore").decode("ascii") # Strip leading/trailing whitespace text = text.strip() return text def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]: """Split text into overlapping chunks.""" if not text: return [] chunks = [] start = 0 text_len = len(text) while start < text_len: end = start + chunk_size # Try to break at a sentence boundary if end < text_len: # Look for sentence-ending punctuation near the chunk boundary search_zone = text[max(end - 80, start):end] last_period = search_zone.rfind(". ") last_newline = search_zone.rfind("\n") break_point = max(last_period, last_newline) if break_point != -1: end = max(end - 80, start) + break_point + 1 chunk = text[start:end].strip() if chunk and len(chunk) > 30: # Skip tiny fragments chunks.append(chunk) start = end - overlap if end < text_len else text_len return chunks def build_metadata(filepath: str, docs_root: str) -> dict: """Build metadata dict from file path.""" rel_path = os.path.relpath(filepath, docs_root) parts = rel_path.replace("\\", "/").split("/") source_file = parts[-1] folder = "/".join(parts[:-1]) if len(parts) > 1 else "root" # Infer department/grouping from folder structure department = parts[1] if len(parts) > 2 else parts[0] if len(parts) > 1 else "general" return { "source_file": source_file, "folder": folder, "department": department, "relative_path": rel_path, } def ingest_all_documents(): """Main ingestion pipeline: find → extract → chunk → embed → store.""" print(f"Scanning for documents in: {DOCS_DIR}") docx_files = find_all_docx(DOCS_DIR) print(f"Found {len(docx_files)} .docx files") if not docx_files: print("No documents found. Exiting.") return retriever = get_retriever() # Ensure collection exists retriever.ensure_collection() # Clear existing data for clean re-ingestion try: retriever.client.delete_collection(COLLECTION_NAME) print(f"Cleared existing collection: {COLLECTION_NAME}") retriever.ensure_collection() except Exception: pass all_chunks = [] total_chars = 0 for i, filepath in enumerate(docx_files, 1): rel_path = os.path.relpath(filepath, DOCS_DIR) print(f" [{i}/{len(docx_files)}] Processing: {rel_path}") try: raw_text = extract_text_from_docx(filepath) cleaned = clean_text(raw_text) total_chars += len(cleaned) if not cleaned: print(f" → Skipped (empty after cleaning)") continue metadata = build_metadata(filepath, DOCS_DIR) chunks = chunk_text(cleaned) print(f" → {len(chunks)} chunks ({len(cleaned)} chars)") for idx, chunk in enumerate(chunks): all_chunks.append({ "text": chunk, "metadata": {**metadata, "chunk_index": idx}, }) except Exception as e: print(f" → ERROR: {e}") print(f"\nTotal: {len(all_chunks)} chunks from {len(docx_files)} files ({total_chars:,} chars)") print("Embedding and uploading to Qdrant...") # Upsert all chunks retriever.upsert_chunks(all_chunks) # Verify info = retriever.get_collection_info() print(f"\nDone! Collection info: {info}") if __name__ == "__main__": ingest_all_documents()