File size: 12,428 Bytes
4fb223c
 
 
 
e9e68a0
f9006d9
4fb223c
28454be
 
 
 
 
 
 
 
 
 
4fb223c
89c8b6a
c712e9a
89c8b6a
 
 
 
4fb223c
 
 
 
f42dd10
 
f9006d9
4fb223c
 
 
 
f9006d9
4fb223c
 
f9006d9
 
4fb223c
f9006d9
 
 
 
4fb223c
f9006d9
 
 
 
4fb223c
f9006d9
 
4fb223c
f9006d9
 
 
 
 
 
4fb223c
f9006d9
4fb223c
 
f9006d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
f9006d9
4fb223c
 
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
e9e68a0
4fb223c
 
 
28454be
 
 
 
148671a
28454be
 
148671a
 
28454be
 
d942ae3
28454be
 
 
 
 
4fb223c
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28454be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
 
e9e68a0
 
f9006d9
4fb223c
 
e9e68a0
 
 
 
 
4fb223c
 
 
 
f9006d9
 
 
 
e9e68a0
f9006d9
e9e68a0
 
 
 
f9006d9
 
 
 
 
 
 
 
 
4fb223c
 
 
 
 
 
 
 
 
 
 
 
cb01350
e9e68a0
 
 
 
 
4fb223c
 
148671a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
148671a
4fb223c
 
148671a
4fb223c
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
 
e9e68a0
4fb223c
 
 
e9e68a0
4fb223c
 
 
 
 
 
 
 
 
 
e9e68a0
4fb223c
e9e68a0
 
 
 
 
 
4fb223c
 
 
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
28454be
 
 
 
 
 
 
e9e68a0
28454be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89c8b6a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import List, Optional
from langchain_core.documents import Document as LangChainDocument
from qdrant_client import QdrantClient
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchValue,
    PayloadSchemaType,
    PointStruct,
    VectorParams,
)

from rag.chunking import smart_chunking
from core.config import QDRANT_API_KEY, QDRANT_URL
from database.document_db import Document, DocumentChunk, SessionLocal
from rag.models import embeddings
from utils.text_utils import clean_text
from rag.vectorstore import load_documents_from_file

logger = logging.getLogger(__name__)

_ALLOWED_EXTENSIONS = {".pdf", ".docx", ".txt"}


def _load_documents_for_ingest(path: str, extension: str) -> List[LangChainDocument]:
    """Load a file into LangChain documents after validating its extension.

    Args:
        path: Filesystem path of the file to load.
        extension: File extension including the leading dot (e.g. ``".pdf"``);
            compared case-insensitively against ``_ALLOWED_EXTENSIONS``.

    Returns:
        The result of ``load_documents_from_file`` for ``path``, called with
        the file's base name as the second argument.

    Raises:
        ValueError: If the lower-cased extension is not an allowed one.
    """
    normalized_ext = extension.lower()
    if normalized_ext in _ALLOWED_EXTENSIONS:
        return load_documents_from_file(path, os.path.basename(path))
    raise ValueError(f"Unsupported file extension: {normalized_ext}")


def _clean_documents_for_ingest(docs: List[LangChainDocument], source_name: str) -> List[LangChainDocument]:
    """Normalize document text and drop documents that are too short.

    Each document's ``page_content`` is passed through ``clean_text``.
    Documents whose cleaned text is empty or has fewer than 20
    whitespace-separated words are skipped. Every surviving document is
    rebuilt from a copy of its metadata (an empty dict when the metadata is
    not a dict), augmented with:

    * ``source_file``: ``source_name``.
    * ``page_number``: the existing ``page`` metadata value when it is not
      ``None``; otherwise the document's 1-based position in ``docs``.
      Skipped documents still count toward that position.

    The input documents and their metadata dicts are not mutated.
    """
    kept: List[LangChainDocument] = []

    for position, original in enumerate(docs, start=1):
        text = clean_text(original.page_content)
        if not text:
            continue
        if len(text.split()) < 20:
            continue

        new_meta = original.metadata.copy() if isinstance(original.metadata, dict) else {}
        page_value = new_meta.get("page")
        new_meta["source_file"] = source_name
        new_meta["page_number"] = position if page_value is None else page_value

        kept.append(LangChainDocument(page_content=text, metadata=new_meta))

    return kept


def chunk_documents_for_ingest(
    path: str,
    extension: str,
    source_name: str,
    source_relpath: str,
) -> List[LangChainDocument]:
    """Load, clean, tag and chunk a file for ingestion.

    Args:
        path: Local filesystem path of the file to read.
        extension: File extension used to validate the file type.
        source_name: Value stored as ``source_file`` in each document's metadata.
        source_relpath: Value stored as ``source_relpath`` in each document's
            metadata before chunking.

    Returns:
        The chunks produced by ``smart_chunking`` whose ``page_content`` is
        not blank, or an empty list when no document survives cleaning.
    """
    documents = _clean_documents_for_ingest(
        _load_documents_for_ingest(path, extension),
        source_name,
    )
    if not documents:
        return []

    # Give each document a fresh metadata dict carrying the relative path.
    for document in documents:
        tagged = document.metadata.copy() if isinstance(document.metadata, dict) else {}
        tagged["source_relpath"] = source_relpath
        document.metadata = tagged

    produced = smart_chunking(documents)
    return [piece for piece in produced if (piece.page_content or "").strip()]


def _parse_datetime(value: Optional[str]):
    raw = (value or "").strip()
    if not raw:
        return None

    normalized = raw.replace("Z", "+00:00")
    try:
        return datetime.fromisoformat(normalized)
    except ValueError:
        return None


def _ensure_qdrant_collection(client: QdrantClient, vector_size: int, collection_name: str) -> None:
    """Create ``collection_name`` if it is missing, then ensure its payload indexes.

    A newly created collection uses cosine distance and vectors of
    ``vector_size`` dimensions. An existing collection is left as-is; its
    vector configuration is not compared against ``vector_size``.
    """
    already_present = client.collection_exists(collection_name=collection_name)
    if not already_present:
        vector_config = VectorParams(size=vector_size, distance=Distance.COSINE)
        client.create_collection(collection_name=collection_name, vectors_config=vector_config)

    # Indexes are ensured on every call, including for pre-existing collections.
    _ensure_payload_indexes(client, collection_name)


def _ensure_payload_indexes(client: QdrantClient, collection_name: str) -> None:
    """Create KEYWORD payload indexes on ``object_path`` and ``document_id``.

    Each request waits for completion (``wait=True``). This is called on
    every ingest. NOTE(review): it relies on re-creating an existing index
    being harmless on the Qdrant server; confirm against the deployed version.
    """
    indexed_fields = ["object_path", "document_id"]
    for field in indexed_fields:
        client.create_payload_index(
            collection_name=collection_name,
            field_name=field,
            field_schema=PayloadSchemaType.KEYWORD,
            wait=True,
        )


def _is_missing_payload_index_error(error: Exception) -> bool:
    message = str(error)
    return "Index required but not found" in message


def _delete_existing_document_points(
    client: QdrantClient,
    collection_name: str,
    object_path: Optional[str],
    document_id: str,
) -> None:
    """Delete the points previously stored for a document.

    Points are matched on the ``object_path`` payload field when
    ``object_path`` is truthy, and on ``document_id`` otherwise.

    If the delete fails with a missing-payload-index error, the indexes are
    created and the delete is retried exactly once. Any other
    ``UnexpectedResponse``, or a failure of the retry, propagates to the
    caller.
    """
    if object_path:
        key, value = "object_path", object_path
    else:
        key, value = "document_id", document_id
    selector = Filter(must=[FieldCondition(key=key, match=MatchValue(value=value))])

    def _run_delete() -> None:
        client.delete(collection_name=collection_name, points_selector=selector, wait=True)

    try:
        _run_delete()
    except UnexpectedResponse as error:
        if not _is_missing_payload_index_error(error):
            raise

        logger.warning(
            "Missing payload index detected while deleting old points in collection=%s. Rebuilding indexes and retrying once.",
            collection_name,
        )
        _ensure_payload_indexes(client, collection_name)
        _run_delete()


def process_document_ingest(
    document_id: str,
    file_path: Optional[str] = None,
    collection_name: Optional[str] = None,
    source_path: Optional[str] = None,
    source_object_path: Optional[str] = None,
    source_updated_at: Optional[str] = None,
    source_etag: Optional[str] = None,
    cleanup_file: bool = False,
    size: Optional[int] = None,
) -> bool:
    """Ingest one downloaded document into Qdrant and record the result in the DB.

    Steps:
      1. Mark the ``Document`` row ``"processing"``.
      2. Load, clean and chunk the local file, then embed the chunks.
      3. Ensure the target collection and its payload indexes exist.
      4. Delete the document's previous points, selected by
         ``source_object_path``.
      5. Upsert the new points and replace the document's ``DocumentChunk``
         rows.
      6. Update the ``Document`` bookkeeping fields and mark it ``"done"``.

    Args:
        document_id: Primary key of the ``Document`` row to ingest.
        file_path: Local path of the already-downloaded file (required).
        collection_name: Target Qdrant collection. Blank values fall back to
            the document's stored collection, then to ``"rag_docs"``.
        source_path: Optional path stored in the point payload and on the row.
        source_object_path: Storage object path (required). It is also the
            key used to delete this document's previous points.
        source_updated_at: Source modification timestamp (ISO-8601 text).
        source_etag: Source ETag, stored on the row when given.
        cleanup_file: Remove ``file_path`` afterwards, on success or failure.
        size: File size stored on the row when given.

    Returns:
        True on success. False when the document row does not exist or any
        step fails. Failures are logged and, when the session still works,
        recorded on the row as ``status="failed"`` with ``error_message``.
    """
    db = SessionLocal()

    effective_file_path = (file_path or "").strip()
    effective_source_path = (source_path or "").strip()
    source_object_ref = (source_object_path or "").strip()

    try:
        document = db.query(Document).filter(Document.id == document_id).first()
        if document is None:
            logger.error("Document not found for ingest: %s", document_id)
            return False

        document.status = "processing"
        document.error_message = None
        db.commit()

        if not effective_file_path:
            raise ValueError("Supabase-only ingest requires downloaded file_path.")
        if not source_object_ref:
            raise ValueError("Supabase-only ingest requires source_object_path.")

        ingest_file_path = effective_file_path

        # source_object_ref is guaranteed non-empty above, so it is the
        # authoritative name for extension detection and source metadata.
        _, extension = os.path.splitext(source_object_ref)
        source_name = os.path.basename(source_object_ref)
        source_relpath = source_object_ref

        chunk_docs = chunk_documents_for_ingest(
            path=ingest_file_path,
            extension=extension,
            source_name=source_name,
            source_relpath=source_relpath,
        )
        chunks = [doc.page_content for doc in chunk_docs]

        if not chunks:
            raise ValueError("Document has no readable content after normalization.")

        if not QDRANT_URL:
            raise ValueError("QDRANT_URL is required for ingest.")

        # Resolve the collection before the (expensive) embedding call. Each
        # candidate is stripped on its own so that a blank argument falls back
        # to the next candidate instead of producing an empty name.
        target_collection = (
            (collection_name or "").strip()
            or (document.collection_name or "").strip()
            or "rag_docs"
        )

        client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY or None)
        vectors = embeddings.embed_documents(chunks)

        if not vectors or not vectors[0]:
            raise ValueError("Failed to create embeddings for chunks.")
        # zip() below would silently drop trailing chunks if the embedder
        # returned fewer vectors than inputs; fail loudly instead.
        if len(vectors) != len(chunks):
            raise ValueError(
                f"Embedding count mismatch: expected {len(chunks)} vectors, got {len(vectors)}."
            )

        _ensure_qdrant_collection(client, len(vectors[0]), target_collection)
        # NOTE(review): old points are deleted before the new ones are
        # upserted. If the upsert fails, this document has no vectors until it
        # is re-ingested.
        _delete_existing_document_points(client, target_collection, source_object_ref, document.id)

        created_at = datetime.now(timezone.utc).isoformat()
        points: List[PointStruct] = []
        db_chunk_rows: List[DocumentChunk] = []

        for index, (chunk_doc, vector) in enumerate(zip(chunk_docs, vectors)):
            chunk_text = chunk_doc.page_content
            metadata = chunk_doc.metadata if isinstance(chunk_doc.metadata, dict) else {}
            point_id = str(uuid.uuid4())
            payload = {
                "document_id": document.id,
                "filename": document.original_name,
                "stored_name": document.stored_name,
                "path": effective_source_path or document.path,
                "object_path": source_object_ref,
                "folder_key": document.folder_key,
                "collection_name": target_collection,
                "source_file": metadata.get("source_file") or source_name,
                "source_relpath": metadata.get("source_relpath") or source_relpath,
                "page_number": metadata.get("page_number"),
                "source_updated_at": source_updated_at,
                "source_etag": source_etag,
                "chunk_index": index,
                "created_at": created_at,
                "content": chunk_text,
            }

            points.append(PointStruct(id=point_id, vector=vector, payload=payload))
            db_chunk_rows.append(
                DocumentChunk(
                    document_id=document.id,
                    chunk_index=index,
                    content_preview=chunk_text[:200],
                    qdrant_point_id=point_id,
                )
            )

        client.upsert(collection_name=target_collection, points=points, wait=True)

        # Replace this document's chunk rows with the freshly created ones.
        db.query(DocumentChunk).filter(DocumentChunk.document_id == document.id).delete()
        db.bulk_save_objects(db_chunk_rows)

        if effective_source_path:
            document.path = effective_source_path
        if source_object_ref:
            document.object_path = source_object_ref
        if source_etag:
            document.source_etag = source_etag
        if source_updated_at:
            parsed_source_updated = _parse_datetime(source_updated_at)
            if parsed_source_updated is not None:
                document.source_updated_at = parsed_source_updated
        if size is not None:
            document.size = int(size)

        document.collection_name = target_collection
        document.last_synced_at = datetime.now(timezone.utc)
        document.deleted_at = None
        document.total_chunks = len(chunks)
        document.status = "done"
        document.error_message = None
        db.commit()

        logger.info("Document ingest success. document_id=%s total_chunks=%s", document.id, len(chunks))
        return True
    except Exception as error:
        logger.exception("Document ingest failed. document_id=%s", document_id)

        # Recording the failure must not break the bool contract. If the
        # session itself is unusable (e.g. rollback/commit raise), log that
        # too and still return False.
        try:
            db.rollback()
            failed_doc = db.query(Document).filter(Document.id == document_id).first()
            if failed_doc is not None:
                failed_doc.status = "failed"
                # Some exceptions stringify to "", so fall back to the type name.
                failed_doc.error_message = str(error) or type(error).__name__
                db.commit()
        except Exception:
            logger.exception("Failed to record ingest failure. document_id=%s", document_id)

        return False
    finally:
        if cleanup_file and effective_file_path and os.path.exists(effective_file_path):
            try:
                os.remove(effective_file_path)
            except Exception:
                logger.exception("Failed to remove temporary ingest file: %s", effective_file_path)

        db.close()


def delete_vectors_for_object_path(collection_name: str, object_path: str) -> bool:
    """Delete every point whose ``object_path`` payload equals ``object_path``.

    Both arguments are stripped before use. Payload indexes are ensured
    before deleting. If the delete still fails with a missing-payload-index
    error, indexes are rebuilt and the delete is retried exactly once.

    Returns:
        False, without contacting Qdrant, when ``QDRANT_URL`` is unset or
        either argument is blank. False when the collection does not exist.
        True once the delete request has completed, even if no point matched.

    Raises:
        UnexpectedResponse: Any Qdrant response error other than a missing
            payload index. Other client errors, and a failed retry, also
            propagate.
    """
    if not QDRANT_URL:
        return False

    collection = (collection_name or "").strip()
    path_key = (object_path or "").strip()
    if not (collection and path_key):
        return False

    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY or None)
    if not client.collection_exists(collection_name=collection):
        return False

    selector = Filter(must=[FieldCondition(key="object_path", match=MatchValue(value=path_key))])
    delete_kwargs = {"collection_name": collection, "points_selector": selector, "wait": True}

    try:
        _ensure_payload_indexes(client, collection)
        client.delete(**delete_kwargs)
    except UnexpectedResponse as error:
        if not _is_missing_payload_index_error(error):
            raise

        logger.warning(
            "Missing payload index detected while deleting object_path in collection=%s. Rebuilding indexes and retrying once.",
            collection,
        )
        _ensure_payload_indexes(client, collection)
        client.delete(**delete_kwargs)

    return True