# stacklogix/app/ingestion.py (deployment commit 6ca2339)
"""Document ingestion pipeline β€” .docx files to Qdrant vectors."""
import os
import re
from docx import Document
from app.config import DOCS_DIR, CHUNK_SIZE, CHUNK_OVERLAP, COLLECTION_NAME
from app.retriever import get_retriever
def find_all_docx(root_dir: str) -> list[str]:
    """Recursively collect paths of all .docx files under *root_dir*.

    Word lock files (names starting with "~$") are excluded.
    """
    found: list[str] = []
    for folder, _subdirs, names in os.walk(root_dir):
        found.extend(
            os.path.join(folder, name)
            for name in names
            if name.endswith(".docx") and not name.startswith("~$")
        )
    return found
def extract_text_from_docx(filepath: str) -> str:
    """Extract all readable text from a .docx file.

    Paragraph text comes first, then table content; each table row is
    flattened to "cell | cell | cell" so tabular data stays searchable.
    Empty paragraphs, cells, and rows are dropped.
    """
    pieces = []
    document = Document(filepath)
    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if stripped:
            pieces.append(stripped)
    for table in document.tables:
        for row in table.rows:
            cells = [c.text.strip() for c in row.cells if c.text.strip()]
            if cells:
                pieces.append(" | ".join(cells))
    return "\n".join(pieces)
def clean_text(text: str) -> str:
    """Normalize and clean extracted text.

    Collapses runs of horizontal whitespace but PRESERVES newlines:
    chunk_text() uses "\\n" as a preferred break point, so flattening all
    whitespace (the previous behavior) made that boundary search dead code.
    Non-ASCII characters are dropped (lossy: accented characters are
    removed, not transliterated).
    """
    # Collapse spaces/tabs/etc. within lines, but keep "\n" intact.
    text = re.sub(r"[ \t\r\f\v]+", " ", text)
    # Trim spaces hugging newlines, then collapse blank-line runs.
    text = re.sub(r" ?\n ?", "\n", text)
    text = re.sub(r"\n{2,}", "\n", text)
    # Remove special unicode characters (ASCII-only downstream).
    text = text.encode("ascii", errors="ignore").decode("ascii")
    return text.strip()
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split *text* into overlapping chunks of at most *chunk_size* chars.

    Prefers to break at a sentence end (". ") or newline found within the
    last 80 characters of the tentative chunk. Fragments of 30 characters
    or fewer are discarded as noise.

    Bug fix: the previous version advanced with ``start = end - overlap``
    unconditionally. When the boundary backtrack pulled ``end`` close to
    ``start`` (so that ``end - overlap <= start``), the loop stopped
    advancing — an infinite loop — and ``start`` could even go negative.
    ``start`` now always moves forward by at least one character.
    """
    if not text:
        return []
    chunks = []
    start = 0
    text_len = len(text)
    while start < text_len:
        end = start + chunk_size
        if end < text_len:
            # Look for sentence-ending punctuation near the chunk boundary.
            window_start = max(end - 80, start)
            zone = text[window_start:end]
            break_point = max(zone.rfind(". "), zone.rfind("\n"))
            if break_point != -1:
                end = window_start + break_point + 1
        chunk = text[start:end].strip()
        if len(chunk) > 30:  # Skip tiny fragments
            chunks.append(chunk)
        if end >= text_len:
            break
        # Guarantee forward progress even when overlap >= effective chunk size.
        start = max(end - overlap, start + 1)
    return chunks
def build_metadata(filepath: str, docs_root: str) -> dict:
    """Derive per-document metadata (file name, folder, department) from its path.

    The department is inferred from the folder hierarchy: the second path
    component when nested at least two levels deep, else the first, else
    "general" for files directly under the root.
    """
    rel_path = os.path.relpath(filepath, docs_root)
    parts = rel_path.replace("\\", "/").split("/")
    if len(parts) > 2:
        department = parts[1]
    elif len(parts) > 1:
        department = parts[0]
    else:
        department = "general"
    return {
        "source_file": parts[-1],
        "folder": "/".join(parts[:-1]) or "root",
        "department": department,
        "relative_path": rel_path,
    }
def ingest_all_documents():
    """Main ingestion pipeline: find → extract → chunk → embed → store.

    Scans DOCS_DIR for .docx files, extracts and cleans their text, splits
    it into overlapping chunks, and upserts the chunks (with per-file
    metadata) into the Qdrant collection via the retriever.
    """
    print(f"Scanning for documents in: {DOCS_DIR}")
    docx_files = find_all_docx(DOCS_DIR)
    print(f"Found {len(docx_files)} .docx files")
    if not docx_files:
        print("No documents found. Exiting.")
        return
    retriever = get_retriever()
    # Clear existing data for a clean re-ingestion. Only the delete is
    # best-effort (the collection may not exist on a first run); the
    # ensure_collection() that follows must NOT be swallowed, otherwise a
    # failed re-create would leave us upserting into a missing collection.
    try:
        retriever.client.delete_collection(COLLECTION_NAME)
        print(f"Cleared existing collection: {COLLECTION_NAME}")
    except Exception:
        pass  # nothing to clear yet
    retriever.ensure_collection()
    all_chunks = []
    total_chars = 0
    for i, filepath in enumerate(docx_files, 1):
        rel_path = os.path.relpath(filepath, DOCS_DIR)
        print(f" [{i}/{len(docx_files)}] Processing: {rel_path}")
        try:
            raw_text = extract_text_from_docx(filepath)
            cleaned = clean_text(raw_text)
            total_chars += len(cleaned)
            if not cleaned:
                print(" → Skipped (empty after cleaning)")
                continue
            metadata = build_metadata(filepath, DOCS_DIR)
            chunks = chunk_text(cleaned)
            print(f" → {len(chunks)} chunks ({len(cleaned)} chars)")
            for idx, chunk in enumerate(chunks):
                all_chunks.append({
                    "text": chunk,
                    "metadata": {**metadata, "chunk_index": idx},
                })
        except Exception as e:
            # One unreadable document must not abort the whole run.
            print(f" → ERROR: {e}")
    print(f"\nTotal: {len(all_chunks)} chunks from {len(docx_files)} files ({total_chars:,} chars)")
    if not all_chunks:
        # Every file was empty or failed — skip the upload entirely.
        print("No chunks produced. Exiting.")
        return
    print("Embedding and uploading to Qdrant...")
    retriever.upsert_chunks(all_chunks)
    # Verify by reporting the collection's post-ingest state.
    info = retriever.get_collection_info()
    print(f"\nDone! Collection info: {info}")
if __name__ == "__main__":
ingest_all_documents()