Spaces:
Sleeping
Sleeping
File size: 9,721 Bytes
e9e68a0 4fb223c e9e68a0 4fb223c e9e68a0 4fb223c 89c8b6a 4fb223c e9e68a0 4fb223c e9e68a0 4fb223c e9e68a0 4fb223c e9e68a0 4fb223c e9e68a0 4fb223c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 | import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, create_engine, func, inspect, or_, text
from sqlalchemy.orm import Session, declarative_base, relationship, sessionmaker
from core.config import DOCUMENTS_DATABASE_URL
# Declarative base class that every ORM model in this module inherits from.
Base = declarative_base()
logger = logging.getLogger(__name__)
# Only SQLite URLs receive check_same_thread=False (this turns off the sqlite3
# driver's same-thread check); every other backend gets no extra connect args.
_connect_args = {"check_same_thread": False} if DOCUMENTS_DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(DOCUMENTS_DATABASE_URL, connect_args=_connect_args)
# Session factory bound to the engine above; sessions neither autocommit nor
# autoflush, so callers must commit explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Default back-off parameters, in seconds: the delay used for the first retry
# and the upper bound that the doubling delay is clamped to.
RETRY_BASE_SECONDS_DEFAULT = 60
RETRY_MAX_SECONDS_DEFAULT = 3600
def utcnow() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
# ORM model for one row of the "documents" table: a stored file plus its
# source/sync metadata and processing state. Owns its DocumentChunk rows.
class Document(Base):
    __tablename__ = "documents"
    # Primary key is a string UUID4 generated in Python when no id is supplied.
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    original_name = Column(String(512), nullable=False)
    stored_name = Column(String(512), nullable=False)
    path = Column(String(1024), nullable=False)
    # Optional, but unique and indexed when present.
    # NOTE(review): presumably the object's key in the remote source being synced — confirm.
    object_path = Column(String(1024), nullable=True, unique=True, index=True)
    folder_key = Column(String(255), nullable=True)
    collection_name = Column(String(255), nullable=True)
    source_etag = Column(String(255), nullable=True)
    source_updated_at = Column(DateTime(timezone=True), nullable=True)
    # Rows with a non-NULL deleted_at are excluded by list_active_collection_names.
    deleted_at = Column(DateTime(timezone=True), nullable=True)
    last_synced_at = Column(DateTime(timezone=True), nullable=True)
    mime_type = Column(String(255), nullable=False)
    size = Column(Integer, nullable=False)
    # Starts as "pending"; list_active_collection_names only counts rows whose status is "done".
    status = Column(String(32), nullable=False, default="pending")
    total_chunks = Column(Integer, nullable=False, default=0)
    error_message = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)
    # Chunks are cascaded with the document and deleted when orphaned.
    chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")
# ORM model for one row of the "document_chunks" table: a single indexed
# chunk that belongs to exactly one Document.
class DocumentChunk(Base):
    __tablename__ = "document_chunks"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Foreign key to documents.id, declared with ON DELETE CASCADE.
    document_id = Column(String(36), ForeignKey("documents.id", ondelete="CASCADE"), nullable=False)
    chunk_index = Column(Integer, nullable=False)
    # Limited to 200 characters by the column type.
    content_preview = Column(String(200), nullable=False)
    # NOTE(review): presumably the id of the matching point in a Qdrant vector store — confirm.
    qdrant_point_id = Column(String(64), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)
    # Back-reference to the owning Document (mirrors Document.chunks).
    document = relationship("Document", back_populates="chunks")
# ORM model for one row of the "document_sync_errors" table: a failed sync
# operation together with its retry bookkeeping. log_document_sync_error reuses
# the latest unresolved row for a given (object_path, operation) pair.
class DocumentSyncError(Base):
    __tablename__ = "document_sync_errors"
    id = Column(Integer, primary_key=True, autoincrement=True)
    # log_document_sync_error stores "__global__" here when the caller gives a blank path.
    object_path = Column(String(1024), nullable=False, index=True)
    folder_key = Column(String(255), nullable=True, index=True)
    collection_name = Column(String(255), nullable=True)
    # log_document_sync_error stores "sync" here when the caller gives a blank operation.
    operation = Column(String(64), nullable=False, index=True)
    error_message = Column(Text, nullable=False)
    # JSON-encoded payload dict written by log_document_sync_error, or NULL.
    payload_json = Column(Text, nullable=True)
    # Incremented by one each time the same unresolved error is logged again.
    retry_count = Column(Integer, nullable=False, default=0)
    # Earliest time the operation should be retried; NULL is treated as due
    # by list_due_document_sync_errors.
    next_retry_at = Column(DateTime(timezone=True), nullable=True, index=True)
    last_error_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)
    # NULL while the error is still open; set by mark_document_sync_error_resolved.
    resolved_at = Column(DateTime(timezone=True), nullable=True, index=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)
    # Refreshed automatically on every UPDATE through the ORM.
    updated_at = Column(DateTime(timezone=True), nullable=False, default=utcnow, onupdate=utcnow)
def _ensure_documents_schema_compatibility() -> None:
    """Bring an existing ``documents`` table up to the current column set.

    Does nothing when the table does not exist. Otherwise every sync-related
    column that is missing is added with ``ALTER TABLE`` and the unique index
    on ``object_path`` is created if absent, all inside one transaction.
    Any failure while applying the changes is logged and swallowed so that
    start-up is not interrupted.
    """
    db_inspector = inspect(engine)
    if not db_inspector.has_table("documents"):
        return

    present = {info["name"] for info in db_inspector.get_columns("documents")}
    # (column name, DDL that adds it), kept in the order they must be applied.
    column_migrations = (
        ("object_path", "ALTER TABLE documents ADD COLUMN object_path VARCHAR(1024)"),
        ("folder_key", "ALTER TABLE documents ADD COLUMN folder_key VARCHAR(255)"),
        ("collection_name", "ALTER TABLE documents ADD COLUMN collection_name VARCHAR(255)"),
        ("source_etag", "ALTER TABLE documents ADD COLUMN source_etag VARCHAR(255)"),
        ("source_updated_at", "ALTER TABLE documents ADD COLUMN source_updated_at TIMESTAMP"),
        ("deleted_at", "ALTER TABLE documents ADD COLUMN deleted_at TIMESTAMP"),
        ("last_synced_at", "ALTER TABLE documents ADD COLUMN last_synced_at TIMESTAMP"),
    )
    pending_statements = [
        statement for name, statement in column_migrations if name not in present
    ]
    index_statement = (
        "CREATE UNIQUE INDEX IF NOT EXISTS ux_documents_object_path ON documents (object_path)"
    )

    try:
        with engine.begin() as conn:
            for statement in pending_statements:
                conn.execute(text(statement))
            conn.execute(text(index_statement))
    except Exception:
        logger.exception("Failed to ensure documents schema compatibility")
def init_document_db() -> None:
    """Create any missing tables, then patch an older ``documents`` table in place."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)
    _ensure_documents_schema_compatibility()
def _compute_next_retry_at(
    retry_count: int,
    base_retry_seconds: int = RETRY_BASE_SECONDS_DEFAULT,
    max_retry_seconds: int = RETRY_MAX_SECONDS_DEFAULT,
) -> datetime:
    """Return the UTC time of the next retry using capped exponential back-off.

    The delay is ``base * 2 ** (retry_count - 1)`` seconds, limited to
    ``max_retry_seconds``. Inputs are sanitised first: the retry count and the
    base are raised to at least 1, and the cap is never below the base.
    """
    attempt = max(1, int(retry_count))
    base = max(1, int(base_retry_seconds))
    ceiling = max(base, int(max_retry_seconds))
    # Shifting left by (attempt - 1) doubles the base once per earlier attempt.
    backoff = base << (attempt - 1)
    if backoff > ceiling:
        backoff = ceiling
    return utcnow() + timedelta(seconds=backoff)
def log_document_sync_error(
    db: Session,
    *,
    object_path: str,
    operation: str,
    error_message: str,
    folder_key: Optional[str] = None,
    collection_name: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    base_retry_seconds: int = RETRY_BASE_SECONDS_DEFAULT,
    max_retry_seconds: int = RETRY_MAX_SECONDS_DEFAULT,
) -> DocumentSyncError:
    """Record a sync failure, creating or updating the matching open error row.

    The most recent unresolved row for the same path and operation is reused;
    if there is none, a new row is added. Each call bumps ``retry_count``,
    stamps ``last_error_at`` and schedules ``next_retry_at`` with the back-off
    helper, then commits.

    Args:
        db: Session used for the lookup and the commit.
        object_path: Path the failure belongs to; blank becomes ``"__global__"``.
        operation: Name of the failed operation; blank becomes ``"sync"``.
        error_message: Failure text; blank becomes ``"Unknown sync error"``.
        folder_key: When not ``None``, overwrites the stored folder key
            (blank is stored as ``None``).
        collection_name: When not ``None``, overwrites the stored collection
            name (blank is stored as ``None``).
        payload: Optional dict stored as JSON; a falsy payload clears the field.
        base_retry_seconds: Base delay passed to the back-off computation.
        max_retry_seconds: Delay cap passed to the back-off computation.

    Returns:
        The refreshed ``DocumentSyncError`` row.
    """
    path_key = (object_path or "").strip() or "__global__"
    operation_key = (operation or "").strip() or "sync"

    open_errors = db.query(DocumentSyncError).filter(
        DocumentSyncError.object_path == path_key,
        DocumentSyncError.operation == operation_key,
        DocumentSyncError.resolved_at.is_(None),
    )
    entry = open_errors.order_by(DocumentSyncError.last_error_at.desc()).first()
    if entry is None:
        entry = DocumentSyncError(
            object_path=path_key,
            operation=operation_key,
            retry_count=0,
        )
        db.add(entry)

    # None means "leave the stored value alone"; any other value replaces it.
    if folder_key is not None:
        entry.folder_key = (folder_key or "").strip() or None
    if collection_name is not None:
        entry.collection_name = (collection_name or "").strip() or None

    entry.error_message = (error_message or "Unknown sync error").strip() or "Unknown sync error"
    if payload:
        entry.payload_json = json.dumps(payload, ensure_ascii=False)
    else:
        entry.payload_json = None

    attempts = int(entry.retry_count or 0) + 1
    entry.retry_count = attempts
    entry.last_error_at = utcnow()
    entry.next_retry_at = _compute_next_retry_at(
        retry_count=attempts,
        base_retry_seconds=base_retry_seconds,
        max_retry_seconds=max_retry_seconds,
    )
    entry.resolved_at = None

    db.commit()
    db.refresh(entry)
    return entry
def list_due_document_sync_errors(
    db: Session,
    limit: int = 100,
    as_of: Optional[datetime] = None,
) -> List[DocumentSyncError]:
    """Return unresolved sync errors whose retry time has arrived.

    A row is due when it has no ``next_retry_at`` or when that time is not
    later than ``as_of`` (the current UTC time when omitted). Results are
    ordered by ``next_retry_at`` and then ``last_error_at``, both ascending.
    ``limit`` is clamped to the range 1..1000; a falsy limit means 100.
    """
    cutoff = as_of or utcnow()
    row_cap = max(1, min(int(limit or 100), 1000))

    still_open = DocumentSyncError.resolved_at.is_(None)
    retry_due = or_(
        DocumentSyncError.next_retry_at.is_(None),
        DocumentSyncError.next_retry_at <= cutoff,
    )

    due_query = db.query(DocumentSyncError).filter(still_open, retry_due)
    due_query = due_query.order_by(
        DocumentSyncError.next_retry_at.asc(),
        DocumentSyncError.last_error_at.asc(),
    )
    return due_query.limit(row_cap).all()
def mark_document_sync_error_resolved(
    db: Session,
    *,
    object_path: str,
    operation: Optional[str] = None,
) -> int:
    """Mark open sync errors for a path as resolved and return how many changed.

    A blank ``object_path`` targets the ``"__global__"`` bucket. When
    ``operation`` is non-blank only errors of that operation are resolved;
    otherwise every open error for the path is. All affected rows get the
    same ``resolved_at`` timestamp. Nothing is committed when no row matches.
    """
    path_key = (object_path or "").strip() or "__global__"
    criteria = [
        DocumentSyncError.object_path == path_key,
        DocumentSyncError.resolved_at.is_(None),
    ]
    operation_key = (operation or "").strip()
    if operation_key:
        criteria.append(DocumentSyncError.operation == operation_key)

    open_entries = db.query(DocumentSyncError).filter(*criteria).all()
    if not open_entries:
        return 0

    stamp = utcnow()
    for entry in open_entries:
        entry.resolved_at = stamp
    db.commit()
    return len(open_entries)
def count_unresolved_document_sync_errors(db: Session) -> int:
    """Return the number of sync-error rows that have not been resolved yet."""
    open_filter = DocumentSyncError.resolved_at.is_(None)
    total = db.query(func.count(DocumentSyncError.id)).filter(open_filter).scalar()
    # scalar() may yield None; report that as zero.
    return int(total or 0)
def list_active_collection_names(db: Session, limit: int = 3) -> List[str]:
    """Return collection names of live, fully processed documents.

    Only documents with a non-empty collection name, no ``deleted_at`` and
    status ``"done"`` are considered. Collections are ranked by their latest
    ``last_synced_at`` (newest first) and then by document count (largest
    first). ``limit`` is clamped to 1..100; a falsy limit means 3.
    """
    row_cap = max(1, min(int(limit or 3), 100))

    grouped = (
        db.query(
            Document.collection_name,
            func.count(Document.id).label("doc_count"),
            func.max(Document.last_synced_at).label("last_sync"),
        )
        .filter(
            Document.collection_name.isnot(None),
            Document.collection_name != "",
            Document.deleted_at.is_(None),
            Document.status == "done",
        )
        .group_by(Document.collection_name)
    )
    ranked = grouped.order_by(
        func.max(Document.last_synced_at).desc(),
        func.count(Document.id).desc(),
    )

    names: List[str] = []
    for record in ranked.limit(row_cap).all():
        # Skip empty rows and rows whose collection name is falsy.
        if not record or not record[0]:
            continue
        names.append(str(record[0]))
    return names
def get_document_db():
    """Yield a new database session and close it once the consumer is done.

    Written as a generator so the session is closed even when the consumer
    raises while using it.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|