"""Document ingestion pipeline: .docx files -> Qdrant vectors."""
import os
import re
import unicodedata
from typing import Optional

from docx import Document

from app.config import DOCS_DIR, CHUNK_SIZE, CHUNK_OVERLAP, COLLECTION_NAME
from app.retriever import get_retriever
def find_all_docx(root_dir: str) -> list[str]:
    """Recursively find all .docx files under root_dir.

    Word lock/temp files (``~$`` prefix) are skipped. The extension check
    is case-insensitive so ``.DOCX`` files are also picked up, and the
    result is sorted so ingestion order is deterministic across platforms
    (os.walk order is filesystem-dependent).

    Args:
        root_dir: Directory tree to search.

    Returns:
        Sorted list of paths (joined onto root_dir) to .docx files.
    """
    docx_files = [
        os.path.join(dirpath, fname)
        for dirpath, _dirnames, filenames in os.walk(root_dir)
        for fname in filenames
        if fname.lower().endswith(".docx") and not fname.startswith("~$")
    ]
    return sorted(docx_files)
def extract_text_from_docx(filepath: str) -> str:
    """Pull all readable text out of a .docx file.

    Collects every non-empty paragraph, then flattens each table row into
    a single " | "-separated line, and joins the lot with newlines.

    Args:
        filepath: Path to the .docx document.

    Returns:
        The extracted text, one paragraph or table row per line.
    """
    doc = Document(filepath)
    pieces = [p.text.strip() for p in doc.paragraphs if p.text.strip()]
    # Tables are stored separately from paragraphs in docx; pull their
    # cell text too so tabular content is searchable.
    for table in doc.tables:
        for row in table.rows:
            cells = (cell.text.strip() for cell in row.cells)
            joined = " | ".join(c for c in cells if c)
            if joined:
                pieces.append(joined)
    return "\n".join(pieces)
def clean_text(text: str) -> str:
    """Normalize and clean extracted text.

    Two fixes over naive cleaning:

    - Accented/unicode characters are transliterated to their closest
      ASCII form via NFKD decomposition (e.g. "café" -> "cafe") instead
      of being silently deleted, which previously mangled words
      ("café" -> "caf").
    - Runs of spaces/tabs collapse to a single space, but newlines are
      preserved (blank-line runs squeezed to one newline). Collapsing
      newlines away made chunk_text()'s newline break-point search dead
      code, so chunks could only break at ". ".

    Args:
        text: Raw text extracted from a document.

    Returns:
        Cleaned ASCII text, stripped of leading/trailing whitespace.
    """
    # Decompose so accented letters become base letter + combining mark;
    # the ascii/ignore step then drops only the marks, not whole letters.
    text = unicodedata.normalize("NFKD", text)
    text = text.encode("ascii", errors="ignore").decode("ascii")
    # Collapse horizontal whitespace only ([^\S\n] = whitespace except \n).
    text = re.sub(r"[^\S\n]+", " ", text)
    # Tidy space around newlines, then squeeze newline runs to one.
    text = re.sub(r" ?\n ?", "\n", text)
    text = re.sub(r"\n+", "\n", text)
    return text.strip()
def chunk_text(text: str, chunk_size: Optional[int] = None, overlap: Optional[int] = None) -> list[str]:
    """Split text into overlapping chunks.

    Chunks are ``chunk_size`` characters, preferring to break at a
    sentence end (". ") or newline within the last 80 characters of the
    chunk. Fragments of 30 characters or fewer are dropped. Consecutive
    chunks overlap by ``overlap`` characters.

    Fixes over the original:
    - Defaults are resolved at call time from app.config instead of being
      frozen at import time, and explicit values remain fully supported.
    - A forward-progress guard prevents an infinite loop when
      ``overlap >= `` the effective chunk length (previously ``start``
      could stall or run backwards forever).

    Args:
        text: Text to split.
        chunk_size: Target chunk length; defaults to CHUNK_SIZE.
        overlap: Characters shared between consecutive chunks; defaults
            to CHUNK_OVERLAP.

    Returns:
        List of chunk strings (possibly empty).
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if overlap is None:
        overlap = CHUNK_OVERLAP
    if not text:
        return []
    chunks: list[str] = []
    start = 0
    text_len = len(text)
    while start < text_len:
        end = start + chunk_size
        if end < text_len:
            # Look for a natural break near the end of the chunk so we
            # avoid cutting mid-sentence.
            zone_start = max(end - 80, start)
            search_zone = text[zone_start:end]
            break_point = max(search_zone.rfind(". "), search_zone.rfind("\n"))
            if break_point != -1:
                end = zone_start + break_point + 1  # keep the "." / "\n"
        chunk = text[start:end].strip()
        if len(chunk) > 30:  # skip tiny fragments
            chunks.append(chunk)
        if end >= text_len:
            break
        # Guard: always advance at least one character, otherwise large
        # overlaps relative to chunk_size loop forever.
        start = max(end - overlap, start + 1)
    return chunks
def build_metadata(filepath: str, docs_root: str) -> dict:
    """Build metadata dict from file path.

    Derives the file name, containing folder, and a department label from
    the file's position under docs_root. Backslashes are normalized so the
    folder/department fields are stable across OSes.

    Args:
        filepath: Full path to the document.
        docs_root: Root directory the path is relative to.

    Returns:
        Dict with source_file, folder, department, and relative_path keys.
    """
    rel_path = os.path.relpath(filepath, docs_root)
    segments = rel_path.replace("\\", "/").split("/")
    *dirs, filename = segments
    # Department heuristic: second path level when nested two deep or
    # more, first level when one folder deep, otherwise "general".
    if len(segments) > 2:
        department = segments[1]
    elif len(segments) > 1:
        department = segments[0]
    else:
        department = "general"
    return {
        "source_file": filename,
        "folder": "/".join(dirs) if dirs else "root",
        "department": department,
        "relative_path": rel_path,
    }
def ingest_all_documents():
    """Main ingestion pipeline: find -> extract -> chunk -> embed -> store.

    Walks DOCS_DIR for .docx files, extracts and cleans their text, splits
    each document into overlapping chunks tagged with per-file metadata,
    then embeds and upserts everything into the Qdrant collection via the
    retriever.

    Side effects: deletes and recreates the existing collection (full
    re-ingestion, not incremental) and prints progress to stdout.
    """
    print(f"Scanning for documents in: {DOCS_DIR}")
    docx_files = find_all_docx(DOCS_DIR)
    print(f"Found {len(docx_files)} .docx files")
    if not docx_files:
        print("No documents found. Exiting.")
        return
    retriever = get_retriever()
    # Ensure collection exists
    retriever.ensure_collection()
    # Clear existing data for clean re-ingestion
    try:
        retriever.client.delete_collection(COLLECTION_NAME)
        print(f"Cleared existing collection: {COLLECTION_NAME}")
        retriever.ensure_collection()
    except Exception:
        # Best-effort wipe: a missing collection (first run) is expected.
        # NOTE(review): this also swallows a failed ensure_collection()
        # after a successful delete, which could leave no collection at
        # all — confirm the retriever recovers in upsert_chunks().
        pass
    all_chunks = []
    total_chars = 0
    # 1-based enumerate purely for the "[i/N]" progress display.
    for i, filepath in enumerate(docx_files, 1):
        rel_path = os.path.relpath(filepath, DOCS_DIR)
        print(f"  [{i}/{len(docx_files)}] Processing: {rel_path}")
        try:
            raw_text = extract_text_from_docx(filepath)
            cleaned = clean_text(raw_text)
            total_chars += len(cleaned)
            if not cleaned:
                print(f"  β Skipped (empty after cleaning)")
                continue
            metadata = build_metadata(filepath, DOCS_DIR)
            chunks = chunk_text(cleaned)
            print(f"  β {len(chunks)} chunks ({len(cleaned)} chars)")
            # One record per chunk; chunk_index preserves in-document order.
            for idx, chunk in enumerate(chunks):
                all_chunks.append({
                    "text": chunk,
                    "metadata": {**metadata, "chunk_index": idx},
                })
        except Exception as e:
            # Keep going on a bad file: one corrupt .docx should not
            # abort the whole ingestion run.
            print(f"  β ERROR: {e}")
    print(f"\nTotal: {len(all_chunks)} chunks from {len(docx_files)} files ({total_chars:,} chars)")
    print("Embedding and uploading to Qdrant...")
    # Upsert all chunks
    retriever.upsert_chunks(all_chunks)
    # Verify
    info = retriever.get_collection_info()
    print(f"\nDone! Collection info: {info}")


if __name__ == "__main__":
    ingest_all_documents()