Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import uuid | |
| from datetime import datetime, timezone | |
| from typing import List, Optional | |
| from langchain_core.documents import Document as LangChainDocument | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.exceptions import UnexpectedResponse | |
| from qdrant_client.models import ( | |
| Distance, | |
| FieldCondition, | |
| Filter, | |
| MatchValue, | |
| PayloadSchemaType, | |
| PointStruct, | |
| VectorParams, | |
| ) | |
| from rag.chunking import smart_chunking | |
| from core.config import QDRANT_API_KEY, QDRANT_URL | |
| from database.document_db import Document, DocumentChunk, SessionLocal | |
| from rag.models import embeddings | |
| from utils.text_utils import clean_text | |
| from rag.vectorstore import load_documents_from_file | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# File extensions accepted by _load_documents_for_ingest, which lowercases
# the candidate extension before testing membership in this set.
_ALLOWED_EXTENSIONS = {".pdf", ".docx", ".txt"}
def _load_documents_for_ingest(path: str, extension: str) -> List[LangChainDocument]:
    """Load a file into LangChain documents after validating its extension.

    Args:
        path: Location of the file to load.
        extension: File extension including the leading dot (for example
            ``".pdf"``). It is lowercased before being validated.

    Returns:
        The documents produced by ``load_documents_from_file`` for ``path``.

    Raises:
        ValueError: If the lowercased extension is not in
            ``_ALLOWED_EXTENSIONS``.
    """
    normalized_extension = extension.lower()
    if normalized_extension in _ALLOWED_EXTENSIONS:
        file_name = os.path.basename(path)
        return load_documents_from_file(path, file_name)
    raise ValueError(f"Unsupported file extension: {normalized_extension}")
def _clean_documents_for_ingest(docs: List[LangChainDocument], source_name: str) -> List[LangChainDocument]:
    """Normalize document text and drop documents with too little content.

    Each document's text is passed through ``clean_text``. Documents whose
    cleaned text is empty or shorter than 20 whitespace-separated words are
    skipped. Surviving documents are rebuilt with a copy of their metadata
    extended by ``source_file`` and ``page_number``.

    Args:
        docs: Documents as returned by the loader.
        source_name: Value stored under the ``source_file`` metadata key.

    Returns:
        New documents holding the cleaned text and enriched metadata.
    """
    kept: List[LangChainDocument] = []
    for position, raw_doc in enumerate(docs, start=1):
        text = clean_text(raw_doc.page_content)
        if not text or len(text.split()) < 20:
            continue

        meta = raw_doc.metadata.copy() if isinstance(raw_doc.metadata, dict) else {}
        loader_page = meta.get("page")
        meta["source_file"] = source_name
        # Fall back to the 1-based position in ``docs`` when the loader did
        # not record a page.
        meta["page_number"] = position if loader_page is None else loader_page
        kept.append(LangChainDocument(page_content=text, metadata=meta))
    return kept
def chunk_documents_for_ingest(
    path: str,
    extension: str,
    source_name: str,
    source_relpath: str,
) -> List[LangChainDocument]:
    """Load, clean and chunk a file, returning only non-blank chunks.

    Args:
        path: Location of the file to ingest.
        extension: File extension (with leading dot) used for validation.
        source_name: Stored as ``source_file`` in each document's metadata.
        source_relpath: Stored as ``source_relpath`` in each document's
            metadata before chunking.

    Returns:
        The chunks produced by ``smart_chunking`` whose ``page_content`` is
        not blank, or an empty list when nothing survives cleaning.

    Raises:
        ValueError: If the extension is rejected by
            ``_load_documents_for_ingest``.
    """
    prepared = _clean_documents_for_ingest(
        _load_documents_for_ingest(path, extension),
        source_name,
    )
    if not prepared:
        return []

    for item in prepared:
        tagged = item.metadata.copy() if isinstance(item.metadata, dict) else {}
        tagged["source_relpath"] = source_relpath
        item.metadata = tagged

    non_blank_chunks: List[LangChainDocument] = []
    for piece in smart_chunking(prepared):
        if (piece.page_content or "").strip():
            non_blank_chunks.append(piece)
    return non_blank_chunks
| def _parse_datetime(value: Optional[str]): | |
| raw = (value or "").strip() | |
| if not raw: | |
| return None | |
| normalized = raw.replace("Z", "+00:00") | |
| try: | |
| return datetime.fromisoformat(normalized) | |
| except ValueError: | |
| return None | |
def _ensure_qdrant_collection(client: QdrantClient, vector_size: int, collection_name: str) -> None:
    """Create the collection if it is missing and ensure its payload indexes.

    Args:
        client: Qdrant client used for all calls.
        vector_size: Vector dimension used when the collection is created.
        collection_name: Name of the collection to check or create.
    """
    already_present = client.collection_exists(collection_name=collection_name)
    if not already_present:
        vector_settings = VectorParams(size=vector_size, distance=Distance.COSINE)
        client.create_collection(
            collection_name=collection_name,
            vectors_config=vector_settings,
        )
    # Runs for pre-existing collections as well as freshly created ones.
    # NOTE(review): confirm this call was not meant to be limited to the
    # "collection was just created" branch above.
    _ensure_payload_indexes(client, collection_name)
def _ensure_payload_indexes(client: QdrantClient, collection_name: str) -> None:
    """Request keyword payload indexes on the fields used for filtering.

    A ``KEYWORD`` index is requested for ``object_path`` and ``document_id``,
    in that order, waiting for each request to complete.

    Args:
        client: Qdrant client used for the calls.
        collection_name: Collection that receives the indexes.
    """
    indexed_fields = ("object_path", "document_id")
    for payload_key in indexed_fields:
        client.create_payload_index(
            collection_name=collection_name,
            field_name=payload_key,
            field_schema=PayloadSchemaType.KEYWORD,
            wait=True,
        )
| def _is_missing_payload_index_error(error: Exception) -> bool: | |
| message = str(error) | |
| return "Index required but not found" in message | |
def _delete_existing_document_points(
    client: QdrantClient,
    collection_name: str,
    object_path: Optional[str],
    document_id: str,
) -> None:
    """Delete the points previously stored for a document.

    Points are matched on the ``object_path`` payload field when
    ``object_path`` is truthy, otherwise on ``document_id``. If the delete is
    rejected because a payload index is missing, the indexes are rebuilt and
    the delete is retried exactly once.

    Args:
        client: Qdrant client used for the calls.
        collection_name: Collection to delete from.
        object_path: Preferred match value; may be empty or ``None``.
        document_id: Match value used when ``object_path`` is falsy.

    Raises:
        UnexpectedResponse: For any Qdrant error other than the missing-index
            case, or when the retry itself fails.
    """
    if object_path:
        match_key, match_value = "object_path", object_path
    else:
        match_key, match_value = "document_id", document_id

    selector = Filter(
        must=[FieldCondition(key=match_key, match=MatchValue(value=match_value))]
    )
    delete_kwargs = {
        "collection_name": collection_name,
        "points_selector": selector,
        "wait": True,
    }

    try:
        client.delete(**delete_kwargs)
    except UnexpectedResponse as error:
        recoverable = _is_missing_payload_index_error(error)
        if not recoverable:
            raise
        logger.warning(
            "Missing payload index detected while deleting old points in collection=%s. Rebuilding indexes and retrying once.",
            collection_name,
        )
        _ensure_payload_indexes(client, collection_name)
        client.delete(**delete_kwargs)
def process_document_ingest(
    document_id: str,
    file_path: Optional[str] = None,
    collection_name: Optional[str] = None,
    source_path: Optional[str] = None,
    source_object_path: Optional[str] = None,
    source_updated_at: Optional[str] = None,
    source_etag: Optional[str] = None,
    cleanup_file: bool = False,
    size: Optional[int] = None,
) -> bool:
    """Chunk, embed and index one document, recording the outcome in the DB.

    The ``Document`` row is marked ``"processing"``, the file is chunked and
    embedded, existing Qdrant points for the object path are deleted, the new
    points are upserted, the ``DocumentChunk`` rows are replaced and the
    document is marked ``"done"``. On any error the document is marked
    ``"failed"`` with the error text.

    Args:
        document_id: Primary key of the ``Document`` row to ingest.
        file_path: Local path of the file to read. Required.
        collection_name: Target Qdrant collection. When blank, the document's
            own ``collection_name`` is used, then ``"rag_docs"``.
        source_path: If non-blank, stored as the point ``path`` payload and
            written to ``document.path``.
        source_object_path: Object path of the source file. Required. Its
            extension and base name determine how the file is loaded and
            labelled.
        source_updated_at: Timestamp text stored verbatim in the payload.
            Written to ``document.source_updated_at`` only when parseable.
        source_etag: Stored in the payload and, when truthy, on the document.
        cleanup_file: When true, ``file_path`` is removed before returning,
            whether or not the ingest succeeded.
        size: If given, stored as ``document.size``.

    Returns:
        ``True`` when the ingest completed. ``False`` when the document row
        does not exist or any step failed. Failures are logged rather than
        raised.
    """
    db = SessionLocal()
    client: Optional[QdrantClient] = None
    effective_file_path = (file_path or "").strip()
    effective_source_path = (source_path or "").strip()
    source_object_ref = (source_object_path or "").strip()
    try:
        document = db.query(Document).filter(Document.id == document_id).first()
        if document is None:
            logger.error("Document not found for ingest: %s", document_id)
            return False

        document.status = "processing"
        document.error_message = None
        db.commit()

        if not effective_file_path:
            raise ValueError("Supabase-only ingest requires downloaded file_path.")
        if not source_object_ref:
            raise ValueError("Supabase-only ingest requires source_object_path.")

        # source_object_ref is guaranteed non-empty here, so it alone
        # determines the extension, display name and relative path.
        _, extension = os.path.splitext(source_object_ref)
        source_name = os.path.basename(source_object_ref)
        source_relpath = source_object_ref

        chunk_docs = chunk_documents_for_ingest(
            path=effective_file_path,
            extension=extension,
            source_name=source_name,
            source_relpath=source_relpath,
        )
        chunks = [doc.page_content for doc in chunk_docs]
        if not chunks:
            raise ValueError("Document has no readable content after normalization.")

        if not QDRANT_URL:
            raise ValueError("QDRANT_URL is required for ingest.")
        client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY or None)

        vectors = embeddings.embed_documents(chunks)
        if not vectors or not vectors[0]:
            raise ValueError("Failed to create embeddings for chunks.")
        # zip() below would silently drop chunks on a count mismatch and leave
        # total_chunks inconsistent with what was actually indexed.
        if len(vectors) != len(chunks):
            raise ValueError(
                f"Embedding count ({len(vectors)}) does not match chunk count ({len(chunks)})."
            )

        # Strip each candidate before testing it, so a whitespace-only name
        # falls through to the next candidate instead of failing the ingest.
        target_collection = (
            (collection_name or "").strip()
            or (document.collection_name or "").strip()
            or "rag_docs"
        )

        _ensure_qdrant_collection(client, len(vectors[0]), target_collection)
        _delete_existing_document_points(client, target_collection, source_object_ref, document.id)

        created_at = datetime.now(timezone.utc).isoformat()
        points: List[PointStruct] = []
        db_chunk_rows: List[DocumentChunk] = []
        for index, (chunk_doc, vector) in enumerate(zip(chunk_docs, vectors)):
            chunk_text = chunk_doc.page_content
            metadata = chunk_doc.metadata if isinstance(chunk_doc.metadata, dict) else {}
            point_id = str(uuid.uuid4())
            payload = {
                "document_id": document.id,
                "filename": document.original_name,
                "stored_name": document.stored_name,
                "path": effective_source_path or document.path,
                "object_path": source_object_ref,
                "folder_key": document.folder_key,
                "collection_name": target_collection,
                "source_file": metadata.get("source_file") or source_name,
                "source_relpath": metadata.get("source_relpath") or source_relpath,
                "page_number": metadata.get("page_number"),
                "source_updated_at": source_updated_at,
                "source_etag": source_etag,
                "chunk_index": index,
                "created_at": created_at,
                "content": chunk_text,
            }
            points.append(PointStruct(id=point_id, vector=vector, payload=payload))
            db_chunk_rows.append(
                DocumentChunk(
                    document_id=document.id,
                    chunk_index=index,
                    content_preview=chunk_text[:200],
                    qdrant_point_id=point_id,
                )
            )

        client.upsert(collection_name=target_collection, points=points, wait=True)

        # Replace the chunk rows so they reference the new point ids.
        db.query(DocumentChunk).filter(DocumentChunk.document_id == document.id).delete()
        db.bulk_save_objects(db_chunk_rows)

        if effective_source_path:
            document.path = effective_source_path
        document.object_path = source_object_ref
        if source_etag:
            document.source_etag = source_etag
        if source_updated_at:
            parsed_source_updated = _parse_datetime(source_updated_at)
            if parsed_source_updated is not None:
                document.source_updated_at = parsed_source_updated
        if size is not None:
            document.size = int(size)
        document.collection_name = target_collection
        document.last_synced_at = datetime.now(timezone.utc)
        document.deleted_at = None
        document.total_chunks = len(chunks)
        document.status = "done"
        document.error_message = None
        db.commit()
        logger.info("Document ingest success. document_id=%s total_chunks=%s", document.id, len(chunks))
        return True
    except Exception as error:
        # Log first so the original traceback survives even if recording the
        # failure in the database fails as well.
        logger.exception("Document ingest failed. document_id=%s", document_id)
        try:
            db.rollback()
            failed_doc = db.query(Document).filter(Document.id == document_id).first()
            if failed_doc is not None:
                failed_doc.status = "failed"
                failed_doc.error_message = str(error)
                db.commit()
        except Exception:
            logger.exception(
                "Failed to record ingest failure status. document_id=%s", document_id
            )
        return False
    finally:
        if client is not None:
            try:
                client.close()
            except Exception:
                logger.exception("Failed to close Qdrant client. document_id=%s", document_id)
        if cleanup_file and effective_file_path:
            try:
                os.remove(effective_file_path)
            except FileNotFoundError:
                # Nothing to clean up.
                pass
            except Exception:
                logger.exception("Failed to remove temporary ingest file: %s", effective_file_path)
        db.close()
def delete_vectors_for_object_path(collection_name: str, object_path: str) -> bool:
    """Delete every point in a collection whose ``object_path`` matches.

    The payload indexes are ensured before deleting. If Qdrant still reports
    a missing payload index, the indexes are rebuilt and the delete is
    retried once. The Qdrant client opened for the operation is always closed
    before returning.

    Args:
        collection_name: Collection to delete from; surrounding whitespace is
            ignored.
        object_path: Payload ``object_path`` value to match; surrounding
            whitespace is ignored.

    Returns:
        ``True`` when the delete request completed. ``False`` when
        ``QDRANT_URL`` is unset, either argument is blank, or the collection
        does not exist.

    Raises:
        UnexpectedResponse: For any Qdrant error other than the missing-index
            case, or when the retry itself fails.
    """
    if not QDRANT_URL:
        return False

    target_collection = (collection_name or "").strip()
    normalized_object_path = (object_path or "").strip()
    if not target_collection or not normalized_object_path:
        return False

    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY or None)
    try:
        if not client.collection_exists(collection_name=target_collection):
            return False

        point_filter = Filter(
            must=[
                FieldCondition(
                    key="object_path",
                    match=MatchValue(value=normalized_object_path),
                )
            ]
        )
        try:
            _ensure_payload_indexes(client, target_collection)
            client.delete(
                collection_name=target_collection,
                points_selector=point_filter,
                wait=True,
            )
        except UnexpectedResponse as error:
            if not _is_missing_payload_index_error(error):
                raise
            logger.warning(
                "Missing payload index detected while deleting object_path in collection=%s. Rebuilding indexes and retrying once.",
                target_collection,
            )
            _ensure_payload_indexes(client, target_collection)
            client.delete(
                collection_name=target_collection,
                points_selector=point_filter,
                wait=True,
            )
        return True
    finally:
        # Release the client's connections on every exit path. A failure to
        # close must not mask the result or the original exception.
        try:
            client.close()
        except Exception:
            logger.exception("Failed to close Qdrant client for collection=%s", target_collection)