Spaces:
Sleeping
Sleeping
"""Document ingestion pipeline — .docx files to Qdrant vectors."""
| import os | |
| import re | |
| from docx import Document | |
| from app.config import DOCS_DIR, CHUNK_SIZE, CHUNK_OVERLAP, COLLECTION_NAME | |
| from app.retriever import get_retriever | |
def find_all_docx(root_dir: str) -> list[str]:
    """Walk root_dir and collect the paths of every .docx file beneath it.

    Word temporary/lock files (names beginning with "~$") are excluded.
    """
    found: list[str] = []
    for current_dir, _subdirs, names in os.walk(root_dir):
        found.extend(
            os.path.join(current_dir, name)
            for name in names
            if name.endswith(".docx") and not name.startswith("~$")
        )
    return found
def extract_text_from_docx(filepath: str) -> str:
    """Return the readable text of a .docx file: paragraphs, then table rows.

    Each table row is flattened into one line with " | " between non-empty
    cells. Empty paragraphs and empty cells are dropped. Output pieces are
    joined with newlines.
    """
    document = Document(filepath)
    pieces: list[str] = []
    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if stripped:
            pieces.append(stripped)
    # Tables are not part of doc.paragraphs, so harvest them separately.
    for tbl in document.tables:
        for table_row in tbl.rows:
            cell_texts = (cell.text.strip() for cell in table_row.cells)
            joined = " | ".join(t for t in cell_texts if t)
            if joined:
                pieces.append(joined)
    return "\n".join(pieces)
def clean_text(text: str) -> str:
    """Normalize extracted text: collapse whitespace, drop non-ASCII, trim."""
    # Any run of whitespace (incl. newlines/tabs) becomes a single space.
    collapsed = re.sub(r"\s+", " ", text)
    # Discard characters outside ASCII rather than mangling them.
    ascii_only = collapsed.encode("ascii", errors="ignore").decode("ascii")
    return ascii_only.strip()
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split text into overlapping chunks of at most roughly chunk_size chars.

    Prefers to end a chunk at a sentence boundary (". " or newline) located in
    the last 80 characters of the window. Fragments of 30 characters or fewer
    are discarded. Consecutive chunks share `overlap` characters.

    Args:
        text: Text to split. Empty input yields an empty list.
        chunk_size: Target maximum chunk length in characters.
        overlap: Characters of overlap between consecutive chunks.

    Returns:
        List of chunk strings.
    """
    if not text:
        return []
    chunks: list[str] = []
    start = 0
    text_len = len(text)
    while start < text_len:
        end = start + chunk_size
        if end < text_len:
            # Look for sentence-ending punctuation near the chunk boundary.
            zone_start = max(end - 80, start)
            search_zone = text[zone_start:end]
            last_period = search_zone.rfind(". ")
            last_newline = search_zone.rfind("\n")
            break_point = max(last_period, last_newline)
            if break_point != -1:
                end = zone_start + break_point + 1
        chunk = text[start:end].strip()
        if chunk and len(chunk) > 30:  # Skip tiny fragments
            chunks.append(chunk)
        if end >= text_len:
            break
        # BUGFIX: always advance by at least one character. If a sentence
        # break shortened the chunk so that end - overlap <= start, the
        # original `start = end - overlap` made no progress and the loop
        # spun forever on such inputs.
        start = max(end - overlap, start + 1)
    return chunks
def build_metadata(filepath: str, docs_root: str) -> dict:
    """Derive chunk metadata (file name, folder, department) from a path.

    The path relative to docs_root is split on "/" (backslashes normalized
    first); the department is inferred from the folder nesting.
    """
    rel_path = os.path.relpath(filepath, docs_root)
    *folders, source_file = rel_path.replace("\\", "/").split("/")
    folder = "/".join(folders) if folders else "root"
    # Department heuristic: second component when nested two+ levels deep,
    # else the single top folder, else "general" for root-level files.
    parts_count = len(folders) + 1
    if parts_count > 2:
        department = folders[1]
    elif parts_count > 1:
        department = folders[0]
    else:
        department = "general"
    return {
        "source_file": source_file,
        "folder": folder,
        "department": department,
        "relative_path": rel_path,
    }
def ingest_all_documents() -> None:
    """Main ingestion pipeline: find -> extract -> chunk -> embed -> store.

    Walks DOCS_DIR for .docx files, extracts and cleans each file's text,
    splits it into overlapping chunks, then embeds and upserts every chunk
    into the Qdrant collection via the retriever. The existing collection is
    dropped first so each run is a clean re-ingestion. Per-file failures are
    printed and skipped; they do not abort the run.
    """
    print(f"Scanning for documents in: {DOCS_DIR}")
    docx_files = find_all_docx(DOCS_DIR)
    print(f"Found {len(docx_files)} .docx files")
    if not docx_files:
        print("No documents found. Exiting.")
        return
    retriever = get_retriever()
    # Ensure collection exists
    retriever.ensure_collection()
    # Clear existing data for clean re-ingestion.
    # NOTE(review): the broad except is best-effort — if delete_collection
    # raises (e.g. collection absent), the inner ensure_collection is also
    # skipped; presumably the ensure_collection call above already created
    # the collection, so this is safe — confirm retriever semantics.
    try:
        retriever.client.delete_collection(COLLECTION_NAME)
        print(f"Cleared existing collection: {COLLECTION_NAME}")
        retriever.ensure_collection()
    except Exception:
        pass
    all_chunks: list[dict] = []
    total_chars = 0
    for i, filepath in enumerate(docx_files, 1):
        rel_path = os.path.relpath(filepath, DOCS_DIR)
        print(f" [{i}/{len(docx_files)}] Processing: {rel_path}")
        try:
            raw_text = extract_text_from_docx(filepath)
            cleaned = clean_text(raw_text)
            total_chars += len(cleaned)
            if not cleaned:
                print(f" β Skipped (empty after cleaning)")
                continue
            metadata = build_metadata(filepath, DOCS_DIR)
            chunks = chunk_text(cleaned)
            print(f" β {len(chunks)} chunks ({len(cleaned)} chars)")
            # Each chunk carries the file metadata plus its own index so the
            # retriever can cite its position within the source document.
            for idx, chunk in enumerate(chunks):
                all_chunks.append({
                    "text": chunk,
                    "metadata": {**metadata, "chunk_index": idx},
                })
        except Exception as e:
            # Broad catch is deliberate: one unreadable file must not stop
            # the whole ingestion run.
            print(f" β ERROR: {e}")
    print(f"\nTotal: {len(all_chunks)} chunks from {len(docx_files)} files ({total_chars:,} chars)")
    print("Embedding and uploading to Qdrant...")
    # Upsert all chunks
    retriever.upsert_chunks(all_chunks)
    # Verify
    info = retriever.get_collection_info()
    print(f"\nDone! Collection info: {info}")
if __name__ == "__main__":
    # Allow running the ingestion pipeline directly as a script.
    ingest_all_documents()