File size: 5,090 Bytes
6ca2339
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Document ingestion pipeline β€” .docx files to Qdrant vectors."""

import os
import re
from docx import Document

from app.config import DOCS_DIR, CHUNK_SIZE, CHUNK_OVERLAP, COLLECTION_NAME
from app.retriever import get_retriever


def find_all_docx(root_dir: str) -> list[str]:
    """Collect the paths of every .docx file beneath *root_dir*.

    Word temporary/lock files (names starting with "~$") are excluded.
    """
    matches: list[str] = []
    for current_dir, _subdirs, names in os.walk(root_dir):
        matches.extend(
            os.path.join(current_dir, name)
            for name in names
            if name.endswith(".docx") and not name.startswith("~$")
        )
    return matches


def extract_text_from_docx(filepath: str) -> str:
    """Pull all non-empty text out of a .docx file, one line per paragraph.

    Table rows are flattened into single lines with " | " between cells.
    """
    doc = Document(filepath)

    # Body paragraphs first, skipping blanks.
    pieces = [p.text.strip() for p in doc.paragraphs if p.text.strip()]

    # Then table content, row by row.
    for table in doc.tables:
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
            if cells:
                pieces.append(" | ".join(cells))

    return "\n".join(pieces)


def clean_text(text: str) -> str:
    """Return *text* with whitespace collapsed, non-ASCII dropped, ends trimmed.

    NOTE: all runs of whitespace (including newlines) become a single space,
    and any character outside the ASCII range is silently removed.
    """
    collapsed = re.sub(r"\s+", " ", text)
    ascii_only = collapsed.encode("ascii", errors="ignore").decode("ascii")
    return ascii_only.strip()


def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split *text* into overlapping chunks of roughly *chunk_size* characters.

    Each chunk preferentially ends at a sentence boundary (". " or a newline)
    found within the last 80 characters of the window.  Fragments of 30
    characters or fewer are dropped.

    Args:
        text: Input string; may be empty.
        chunk_size: Target maximum chunk length in characters.
        overlap: Characters shared between consecutive chunks.

    Returns:
        Chunk strings in document order (possibly empty).
    """
    if not text:
        return []

    chunks: list[str] = []
    start = 0
    text_len = len(text)

    while start < text_len:
        end = start + chunk_size

        # Prefer to break at a sentence boundary near the window's end.
        if end < text_len:
            zone_start = max(end - 80, start)
            search_zone = text[zone_start:end]
            break_point = max(search_zone.rfind(". "), search_zone.rfind("\n"))
            if break_point != -1:
                end = zone_start + break_point + 1  # keep the period/newline

        chunk = text[start:end].strip()
        if len(chunk) > 30:  # skip tiny fragments
            chunks.append(chunk)

        if end >= text_len:
            break
        # Bug fix: always advance by at least one character.  The original
        # `start = end - overlap` could stall forever when overlap >= end - start
        # (large CHUNK_OVERLAP, or an early sentence break shrinking `end`).
        start = max(end - overlap, start + 1)

    return chunks


def build_metadata(filepath: str, docs_root: str) -> dict:
    """Derive chunk metadata (source file, folder, department) from a path.

    The department is taken from the folder structure relative to
    *docs_root*: the second path component when the file is nested two or
    more folders deep, the first component for one level, else "general".
    """
    rel_path = os.path.relpath(filepath, docs_root)
    parts = rel_path.replace("\\", "/").split("/")

    if len(parts) > 2:
        department = parts[1]
    elif len(parts) > 1:
        department = parts[0]
    else:
        department = "general"

    return {
        "source_file": parts[-1],
        "folder": "/".join(parts[:-1]) if len(parts) > 1 else "root",
        "department": department,
        "relative_path": rel_path,
    }


def ingest_all_documents():
    """Run the full pipeline: discover .docx files, extract and clean their
    text, chunk it, and upload embedded chunks to the Qdrant collection."""
    print(f"Scanning for documents in: {DOCS_DIR}")
    docx_files = find_all_docx(DOCS_DIR)
    print(f"Found {len(docx_files)} .docx files")

    if not docx_files:
        print("No documents found. Exiting.")
        return

    retriever = get_retriever()
    retriever.ensure_collection()

    # Best-effort wipe so a re-run starts from an empty collection.
    try:
        retriever.client.delete_collection(COLLECTION_NAME)
        print(f"Cleared existing collection: {COLLECTION_NAME}")
        retriever.ensure_collection()
    except Exception:
        pass

    chunk_records = []
    char_total = 0

    for file_num, path in enumerate(docx_files, 1):
        rel_path = os.path.relpath(path, DOCS_DIR)
        print(f"  [{file_num}/{len(docx_files)}] Processing: {rel_path}")

        try:
            cleaned = clean_text(extract_text_from_docx(path))
            char_total += len(cleaned)

            if not cleaned:
                print(f"    β†’ Skipped (empty after cleaning)")
                continue

            metadata = build_metadata(path, DOCS_DIR)
            chunks = chunk_text(cleaned)
            print(f"    β†’ {len(chunks)} chunks ({len(cleaned)} chars)")

            chunk_records.extend(
                {"text": piece, "metadata": {**metadata, "chunk_index": n}}
                for n, piece in enumerate(chunks)
            )

        except Exception as exc:
            print(f"    β†’ ERROR: {exc}")

    print(f"\nTotal: {len(chunk_records)} chunks from {len(docx_files)} files ({char_total:,} chars)")
    print("Embedding and uploading to Qdrant...")

    # Embed + store everything in one call.
    retriever.upsert_chunks(chunk_records)

    # Sanity check: report what the collection now holds.
    info = retriever.get_collection_info()
    print(f"\nDone! Collection info: {info}")


# Script entry point: run the full ingestion pipeline when executed directly.
if __name__ == "__main__":
    ingest_all_documents()