Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import uuid | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any, Dict, List, Optional | |
| from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, create_engine, func, inspect, or_, text | |
| from sqlalchemy.orm import Session, declarative_base, relationship, sessionmaker | |
| from core.config import DOCUMENTS_DATABASE_URL | |
Base = declarative_base()
logger = logging.getLogger(__name__)

# Relax sqlite3's same-thread check so pooled connections may be reused from
# other threads; no extra connect arguments are passed for other backends.
if DOCUMENTS_DATABASE_URL.startswith("sqlite"):
    _connect_args = {"check_same_thread": False}
else:
    _connect_args = {}
engine = create_engine(DOCUMENTS_DATABASE_URL, connect_args=_connect_args)
# Session factory: callers commit explicitly; queries do not auto-flush.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Exponential-backoff bounds (seconds) used when scheduling sync retries.
RETRY_BASE_SECONDS_DEFAULT = 60
RETRY_MAX_SECONDS_DEFAULT = 3600
def utcnow() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
class Document(Base):
    """A stored file tracked by this service, plus its object-store sync metadata.

    ``status`` defaults to ``"pending"``; ``chunks`` holds the associated
    ``DocumentChunk`` rows, which the ORM deletes along with the document
    (cascade ``"all, delete-orphan"``).
    """

    __tablename__ = "documents"

    # Primary key: a freshly generated UUID4 rendered as a 36-character string.
    id = Column("id", String(36), default=lambda: str(uuid.uuid4()), primary_key=True)
    original_name = Column("original_name", String(512), nullable=False)
    stored_name = Column("stored_name", String(512), nullable=False)
    path = Column("path", String(1024), nullable=False)
    # Source object path; uniquely indexed (NULL allowed).
    object_path = Column("object_path", String(1024), index=True, unique=True, nullable=True)
    folder_key = Column("folder_key", String(255), nullable=True)
    collection_name = Column("collection_name", String(255), nullable=True)
    source_etag = Column("source_etag", String(255), nullable=True)
    source_updated_at = Column("source_updated_at", DateTime(timezone=True), nullable=True)
    deleted_at = Column("deleted_at", DateTime(timezone=True), nullable=True)
    last_synced_at = Column("last_synced_at", DateTime(timezone=True), nullable=True)
    mime_type = Column("mime_type", String(255), nullable=False)
    size = Column("size", Integer, nullable=False)
    status = Column("status", String(32), default="pending", nullable=False)
    total_chunks = Column("total_chunks", Integer, default=0, nullable=False)
    error_message = Column("error_message", Text, nullable=True)
    created_at = Column("created_at", DateTime(timezone=True), default=utcnow, nullable=False)

    chunks = relationship("DocumentChunk", cascade="all, delete-orphan", back_populates="document")
class DocumentChunk(Base):
    """One chunk of a ``Document``, identified by its position ``chunk_index``.

    Keeps a short text preview (up to 200 characters) and an optional Qdrant
    point id for the chunk.
    """

    __tablename__ = "document_chunks"

    id = Column("id", Integer, autoincrement=True, primary_key=True)
    # Owning document; the FK is declared with ON DELETE CASCADE.
    document_id = Column(
        "document_id",
        String(36),
        ForeignKey("documents.id", ondelete="CASCADE"),
        nullable=False,
    )
    chunk_index = Column("chunk_index", Integer, nullable=False)
    content_preview = Column("content_preview", String(200), nullable=False)
    qdrant_point_id = Column("qdrant_point_id", String(64), nullable=True)
    created_at = Column("created_at", DateTime(timezone=True), default=utcnow, nullable=False)

    document = relationship("Document", back_populates="chunks")
class DocumentSyncError(Base):
    """A recorded failure of a sync ``operation`` on an ``object_path``.

    Rows stay "open" while ``resolved_at`` is NULL. ``retry_count`` and
    ``next_retry_at`` drive the retry schedule; ``payload_json`` optionally
    holds a JSON-encoded context dict.
    """

    __tablename__ = "document_sync_errors"

    id = Column("id", Integer, autoincrement=True, primary_key=True)
    object_path = Column("object_path", String(1024), index=True, nullable=False)
    folder_key = Column("folder_key", String(255), index=True, nullable=True)
    collection_name = Column("collection_name", String(255), nullable=True)
    operation = Column("operation", String(64), index=True, nullable=False)
    error_message = Column("error_message", Text, nullable=False)
    payload_json = Column("payload_json", Text, nullable=True)
    retry_count = Column("retry_count", Integer, default=0, nullable=False)
    next_retry_at = Column("next_retry_at", DateTime(timezone=True), index=True, nullable=True)
    last_error_at = Column("last_error_at", DateTime(timezone=True), default=utcnow, nullable=False)
    resolved_at = Column("resolved_at", DateTime(timezone=True), index=True, nullable=True)
    created_at = Column("created_at", DateTime(timezone=True), default=utcnow, nullable=False)
    # Refreshed to the current UTC time on every ORM UPDATE.
    updated_at = Column(
        "updated_at",
        DateTime(timezone=True),
        default=utcnow,
        onupdate=utcnow,
        nullable=False,
    )
def _has_unique_index_on(inspector: Any, table_name: str, column_name: str) -> bool:
    """Return True if *table_name* already enforces uniqueness on exactly *column_name*."""
    target = [column_name]
    for index in inspector.get_indexes(table_name):
        if index.get("unique") and list(index.get("column_names") or []) == target:
            return True
    try:
        constraints = inspector.get_unique_constraints(table_name)
    except NotImplementedError:
        # Some dialects cannot reflect unique constraints; indexes were checked above.
        return False
    return any(list(c.get("column_names") or []) == target for c in constraints)


def _run_schema_ddl(statement: str) -> bool:
    """Execute one DDL statement in its own transaction; log and return False on failure."""
    try:
        with engine.begin() as connection:
            connection.execute(text(statement))
    except Exception:
        # Best-effort upgrade: a failed step is logged, never fatal to startup.
        logger.exception("Failed to ensure documents schema compatibility: %s", statement)
        return False
    return True


def _ensure_documents_schema_compatibility() -> None:
    """Best-effort additive upgrade of a pre-existing ``documents`` table.

    Adds any sync-metadata columns missing from older deployments and makes sure
    ``object_path`` is covered by a unique index. Column SQL types are compiled
    from the ``Document`` model for the engine's dialect, so they match what
    ``create_all`` would emit. For example, ``DateTime(timezone=True)`` becomes
    ``TIMESTAMP WITH TIME ZONE`` on PostgreSQL rather than a naive ``TIMESTAMP``.
    Columns that an earlier version already added with another type are left
    unchanged.

    Each DDL statement runs in its own transaction. A failing step (for example,
    a unique index rejected because of duplicate paths) is logged and does not
    roll back the column additions.

    Raises:
        Whatever ``inspect``/``has_table``/``get_columns`` raise (for example,
        an unreachable database). Failures of the DDL statements themselves
        are only logged.
    """
    inspector = inspect(engine)
    if not inspector.has_table("documents"):
        return
    existing_columns = {column["name"] for column in inspector.get_columns("documents")}

    documents_table = Document.__table__
    added_columns = (
        "object_path",
        "folder_key",
        "collection_name",
        "source_etag",
        "source_updated_at",
        "deleted_at",
        "last_synced_at",
    )
    for column_name in added_columns:
        if column_name in existing_columns:
            continue
        column_type = documents_table.c[column_name].type.compile(dialect=engine.dialect)
        _run_schema_ddl(f"ALTER TABLE documents ADD COLUMN {column_name} {column_type}")

    # Fresh tables already get a unique ix_documents_object_path from create_all();
    # only legacy tables (object_path added via ALTER) need the extra index.
    try:
        already_unique = _has_unique_index_on(inspector, "documents", "object_path")
    except Exception:
        logger.exception("Could not inspect indexes on documents; attempting index creation anyway")
        already_unique = False
    if not already_unique:
        _run_schema_ddl(
            "CREATE UNIQUE INDEX IF NOT EXISTS ux_documents_object_path ON documents (object_path)"
        )
def init_document_db() -> None:
    """Create any missing tables, then apply additive upgrades to ``documents``."""
    metadata = Base.metadata
    metadata.create_all(engine)
    _ensure_documents_schema_compatibility()
def _compute_next_retry_at(
    retry_count: int,
    base_retry_seconds: int = RETRY_BASE_SECONDS_DEFAULT,
    max_retry_seconds: int = RETRY_MAX_SECONDS_DEFAULT,
) -> datetime:
    """Return when the next retry is due, using capped exponential backoff.

    The delay is ``base * 2 ** (attempt - 1)`` seconds, capped at
    ``max_retry_seconds``. ``attempt`` is clamped to at least 1, the base to at
    least 1 second, and the cap to at least the base.
    """
    attempt = int(retry_count)
    if attempt < 1:
        attempt = 1
    base = int(base_retry_seconds)
    if base < 1:
        base = 1
    cap = int(max_retry_seconds)
    if cap < base:
        cap = base
    # A left shift doubles the (positive) base once per prior attempt.
    backoff = base << (attempt - 1)
    delay = backoff if backoff < cap else cap
    return utcnow() + timedelta(seconds=delay)
def log_document_sync_error(
    db: Session,
    *,
    object_path: str,
    operation: str,
    error_message: str,
    folder_key: Optional[str] = None,
    collection_name: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    base_retry_seconds: int = RETRY_BASE_SECONDS_DEFAULT,
    max_retry_seconds: int = RETRY_MAX_SECONDS_DEFAULT,
) -> DocumentSyncError:
    """Record a sync failure (or bump an existing one) and schedule its next retry.

    Failures are de-duplicated per ``(object_path, operation)``. If an
    unresolved row exists, the most recent one is updated in place; otherwise a
    new row is inserted. ``retry_count`` is incremented and ``next_retry_at``
    is set with capped exponential backoff. The change is committed before
    returning.

    Args:
        db: Session to write through; it is committed by this call.
        object_path: Failing object; blank values map to ``"__global__"``.
        operation: Operation name; blank values map to ``"sync"``.
        error_message: Failure text; blank values map to ``"Unknown sync error"``.
        folder_key: Updated when not None (blank -> NULL).
        collection_name: Updated when not None (blank -> NULL).
        payload: Optional diagnostic context stored as JSON. Values JSON cannot
            encode are stored as ``str(value)`` rather than failing the log call.
        base_retry_seconds: Backoff base delay.
        max_retry_seconds: Backoff cap.

    Returns:
        The persisted, refreshed ``DocumentSyncError`` row.

    Raises:
        Any database error from the commit, after rolling the session back so
        the caller can keep using it.
    """
    normalized_path = (object_path or "").strip() or "__global__"
    normalized_operation = (operation or "").strip() or "sync"
    # Serialize before touching the session. Previously a non-JSON payload raised
    # after a new row had been db.add()-ed, leaving a half-built pending row
    # behind. default=str is deliberate: a lossy diagnostic beats losing the record.
    payload_json = json.dumps(payload, ensure_ascii=False, default=str) if payload else None

    row = (
        db.query(DocumentSyncError)
        .filter(
            DocumentSyncError.object_path == normalized_path,
            DocumentSyncError.operation == normalized_operation,
            DocumentSyncError.resolved_at.is_(None),
        )
        .order_by(DocumentSyncError.last_error_at.desc())
        .first()
    )
    if row is None:
        row = DocumentSyncError(
            object_path=normalized_path,
            operation=normalized_operation,
            retry_count=0,
        )
        db.add(row)

    if folder_key is not None:
        row.folder_key = (folder_key or "").strip() or None
    if collection_name is not None:
        row.collection_name = (collection_name or "").strip() or None
    row.error_message = (error_message or "Unknown sync error").strip() or "Unknown sync error"
    row.payload_json = payload_json
    row.retry_count = int(row.retry_count or 0) + 1
    row.last_error_at = utcnow()
    row.next_retry_at = _compute_next_retry_at(
        retry_count=row.retry_count,
        base_retry_seconds=base_retry_seconds,
        max_retry_seconds=max_retry_seconds,
    )
    row.resolved_at = None

    try:
        db.commit()
    except Exception:
        # Leave the session usable (not stuck in a pending-rollback state).
        db.rollback()
        raise
    db.refresh(row)
    return row
def list_due_document_sync_errors(
    db: Session,
    limit: int = 100,
    as_of: Optional[datetime] = None,
) -> List[DocumentSyncError]:
    """Return unresolved sync errors whose retry is due at ``as_of`` (default: now).

    A row is due when ``next_retry_at`` is NULL or not later than ``as_of``.
    Results are ordered with NULL ``next_retry_at`` first on every backend, then
    by ``next_retry_at`` and ``last_error_at`` ascending.

    Args:
        db: Session to query.
        limit: Maximum rows to return, clamped to ``[1, 1000]`` (falsy -> 100).
        as_of: Reference time. Aware values are converted to UTC. Naive values
            are treated as UTC, matching ``utcnow()``.

    Returns:
        The due ``DocumentSyncError`` rows.
    """
    if as_of is None:
        target_time = utcnow()
    elif as_of.tzinfo is None or as_of.utcoffset() is None:
        target_time = as_of.replace(tzinfo=timezone.utc)
    else:
        # SQLite stores the wall-clock part without its offset, so a non-UTC
        # aware value must be converted or the comparison is skewed.
        target_time = as_of.astimezone(timezone.utc)
    safe_limit = max(1, min(int(limit or 100), 1000))
    return (
        db.query(DocumentSyncError)
        .filter(
            DocumentSyncError.resolved_at.is_(None),
            or_(
                DocumentSyncError.next_retry_at.is_(None),
                DocumentSyncError.next_retry_at <= target_time,
            ),
        )
        .order_by(
            # "IS NOT NULL" sorts false before true, so NULL (due-now) rows come first.
            # Without this, PostgreSQL puts NULLs last on ASC and limit can starve them.
            DocumentSyncError.next_retry_at.isnot(None),
            DocumentSyncError.next_retry_at.asc(),
            DocumentSyncError.last_error_at.asc(),
        )
        .limit(safe_limit)
        .all()
    )
def mark_document_sync_error_resolved(
    db: Session,
    *,
    object_path: str,
    operation: Optional[str] = None,
) -> int:
    """Mark open sync errors for ``object_path`` as resolved and commit.

    Args:
        db: Session to write through; it is committed when rows change.
        object_path: Target path; blank values map to ``"__global__"``.
        operation: When non-blank, only rows with this operation are resolved.
            Otherwise every operation for the path is resolved.

    Returns:
        The number of rows resolved (0 if none were open; nothing is committed then).

    Raises:
        Any database error from the commit, after rolling the session back.
    """
    normalized_path = (object_path or "").strip() or "__global__"
    query = db.query(DocumentSyncError).filter(
        DocumentSyncError.object_path == normalized_path,
        DocumentSyncError.resolved_at.is_(None),
    )
    normalized_operation = (operation or "").strip()
    if normalized_operation:
        query = query.filter(DocumentSyncError.operation == normalized_operation)
    rows = query.all()
    if not rows:
        return 0
    resolved_time = utcnow()
    for row in rows:
        row.resolved_at = resolved_time
    try:
        db.commit()
    except Exception:
        # Leave the session usable (not stuck in a pending-rollback state).
        db.rollback()
        raise
    return len(rows)
def count_unresolved_document_sync_errors(db: Session) -> int:
    """Return how many sync-error rows are still open (``resolved_at`` is NULL)."""
    unresolved_total = (
        db.query(func.count(DocumentSyncError.id))
        .filter(DocumentSyncError.resolved_at.is_(None))
        .scalar()
    )
    return int(unresolved_total or 0)
def list_active_collection_names(db: Session, limit: int = 3) -> List[str]:
    """Return collection names of live, fully processed documents, most recently synced first.

    Only documents with a non-empty ``collection_name``, no ``deleted_at`` and
    ``status == "done"`` are counted. Collections are ranked as follows:
    those with a sync timestamp before those without (on every backend), then
    by latest ``last_synced_at`` descending, then by document count
    descending, then by name for a deterministic order.

    Args:
        db: Session to query.
        limit: Maximum names to return, clamped to ``[1, 100]`` (falsy -> 3).

    Returns:
        Collection names as strings.
    """
    safe_limit = max(1, min(int(limit or 3), 100))
    last_sync = func.max(Document.last_synced_at)
    doc_count = func.count(Document.id)
    rows = (
        db.query(
            Document.collection_name,
            doc_count.label("doc_count"),
            last_sync.label("last_sync"),
        )
        .filter(
            Document.collection_name.isnot(None),
            Document.collection_name != "",
            Document.deleted_at.is_(None),
            Document.status == "done",
        )
        .group_by(Document.collection_name)
        .order_by(
            # PostgreSQL sorts NULLs first on DESC, which would rank never-synced
            # collections as the most active; push them last explicitly.
            last_sync.is_(None),
            last_sync.desc(),
            doc_count.desc(),
            Document.collection_name.asc(),
        )
        .limit(safe_limit)
        .all()
    )
    return [str(row[0]) for row in rows if row and row[0]]
def get_document_db():
    """Yield a fresh ``SessionLocal`` session; it is closed when the generator finishes."""
    # Session's context-manager exit calls close(), even if the consumer raises.
    with SessionLocal() as session:
        yield session