File size: 9,721 Bytes
e9e68a0
 
4fb223c
e9e68a0
 
4fb223c
e9e68a0
 
4fb223c
89c8b6a
4fb223c
 
e9e68a0
4fb223c
 
 
 
 
e9e68a0
 
 
4fb223c
 
 
 
 
 
 
 
 
 
 
 
e9e68a0
 
 
 
 
 
 
4fb223c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
 
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb223c
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, create_engine, func, inspect, or_, text
from sqlalchemy.orm import Session, declarative_base, relationship, sessionmaker

from core.config import DOCUMENTS_DATABASE_URL

# Declarative base class that every ORM model in this module inherits from.
Base = declarative_base()
logger = logging.getLogger(__name__)

# For SQLite URLs only, pass check_same_thread=False to the driver so a
# connection may be used from a thread other than the one that created it.
# Other backends receive no extra connect arguments.
_connect_args = {"check_same_thread": False} if DOCUMENTS_DATABASE_URL.startswith("sqlite") else {}
engine = create_engine(DOCUMENTS_DATABASE_URL, connect_args=_connect_args)
# Session factory bound to the engine above; autocommit and autoflush are both
# disabled, so callers must commit explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Default lower/upper bounds, in seconds, for the exponential retry delay
# computed by _compute_next_retry_at().
RETRY_BASE_SECONDS_DEFAULT = 60
RETRY_MAX_SECONDS_DEFAULT = 3600


def utcnow() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)


class Document(Base):
    """ORM model for one row of the ``documents`` table.

    Each row owns zero or more ``DocumentChunk`` rows through ``chunks``.
    """

    __tablename__ = "documents"

    # Primary key: a UUID4 rendered as a 36-character string, generated in
    # Python when no id is supplied.
    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # File naming/location fields; all three are required.
    original_name = Column(String(512), nullable=False)
    stored_name = Column(String(512), nullable=False)
    path = Column(String(1024), nullable=False)
    # Optional, but unique and indexed when present.
    # NOTE(review): presumably the key of the object in the sync source — confirm.
    object_path = Column(String(1024), nullable=True, unique=True, index=True)
    folder_key = Column(String(255), nullable=True)
    collection_name = Column(String(255), nullable=True)
    # Optional sync bookkeeping (timezone-aware timestamps).
    # NOTE(review): deleted_at looks like a soft-delete marker;
    # list_active_collection_names() only counts rows where it is NULL.
    source_etag = Column(String(255), nullable=True)
    source_updated_at = Column(DateTime(timezone=True), nullable=True)
    deleted_at = Column(DateTime(timezone=True), nullable=True)
    last_synced_at = Column(DateTime(timezone=True), nullable=True)
    mime_type = Column(String(255), nullable=False)
    size = Column(Integer, nullable=False)
    # Processing state; new rows default to "pending". This module also
    # filters on the value "done" when listing active collections.
    status = Column(String(32), nullable=False, default="pending")
    total_chunks = Column(Integer, nullable=False, default=0)
    error_message = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)

    # Chunks are cascaded with the document: removing a chunk from this
    # collection, or deleting the document, deletes the chunk row too.
    chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")


class DocumentChunk(Base):
    """ORM model for one row of ``document_chunks``, owned by a ``Document``."""

    __tablename__ = "document_chunks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Required link to the owning document; the foreign key is declared with
    # ON DELETE CASCADE.
    document_id = Column(String(36), ForeignKey("documents.id", ondelete="CASCADE"), nullable=False)
    chunk_index = Column(Integer, nullable=False)
    # Required text preview limited to 200 characters by the column type.
    content_preview = Column(String(200), nullable=False)
    # NOTE(review): presumably the id of the matching point in Qdrant — confirm.
    qdrant_point_id = Column(String(64), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)

    # Back-reference to the owning Document (mirror of Document.chunks).
    document = relationship("Document", back_populates="chunks")


class DocumentSyncError(Base):
    """ORM model for one row of ``document_sync_errors``.

    A row is treated as unresolved while ``resolved_at`` is NULL; the helper
    functions in this module key rows by ``object_path`` and ``operation``.
    """

    __tablename__ = "document_sync_errors"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Required and indexed. log_document_sync_error() stores "__global__"
    # here when the caller supplies a blank path.
    object_path = Column(String(1024), nullable=False, index=True)
    folder_key = Column(String(255), nullable=True, index=True)
    collection_name = Column(String(255), nullable=True)
    # Required and indexed. log_document_sync_error() stores "sync" here when
    # the caller supplies a blank operation.
    operation = Column(String(64), nullable=False, index=True)
    error_message = Column(Text, nullable=False)
    # Optional JSON text written by log_document_sync_error() from its
    # ``payload`` argument.
    payload_json = Column(Text, nullable=True)
    # Number of times the error has been logged for this row.
    retry_count = Column(Integer, nullable=False, default=0)
    # Earliest time the row is due again; NULL is treated as due immediately
    # by list_due_document_sync_errors().
    next_retry_at = Column(DateTime(timezone=True), nullable=True, index=True)
    last_error_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)
    resolved_at = Column(DateTime(timezone=True), nullable=True, index=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)
    # Set on insert and refreshed on every ORM update of the row.
    updated_at = Column(DateTime(timezone=True), nullable=False, default=utcnow, onupdate=utcnow)


def _ensure_documents_schema_compatibility() -> None:
    """Add columns and the unique index that an older ``documents`` table lacks.

    Does nothing when the ``documents`` table does not exist. Otherwise every
    missing column is added with ``ALTER TABLE`` and the unique index on
    ``object_path`` is created if absent, all inside one transaction. Any
    failure while applying the statements is logged and swallowed, so this
    function never raises for a failed migration step.
    """
    db_inspector = inspect(engine)
    if not db_inspector.has_table("documents"):
        return

    present_columns = {info["name"] for info in db_inspector.get_columns("documents")}

    # (column name, DDL that adds it), applied in this order.
    column_migrations = (
        ("object_path", "ALTER TABLE documents ADD COLUMN object_path VARCHAR(1024)"),
        ("folder_key", "ALTER TABLE documents ADD COLUMN folder_key VARCHAR(255)"),
        ("collection_name", "ALTER TABLE documents ADD COLUMN collection_name VARCHAR(255)"),
        ("source_etag", "ALTER TABLE documents ADD COLUMN source_etag VARCHAR(255)"),
        ("source_updated_at", "ALTER TABLE documents ADD COLUMN source_updated_at TIMESTAMP"),
        ("deleted_at", "ALTER TABLE documents ADD COLUMN deleted_at TIMESTAMP"),
        ("last_synced_at", "ALTER TABLE documents ADD COLUMN last_synced_at TIMESTAMP"),
    )

    statements = [ddl for name, ddl in column_migrations if name not in present_columns]
    # The index statement always runs last; IF NOT EXISTS makes it repeatable.
    statements.append("CREATE UNIQUE INDEX IF NOT EXISTS ux_documents_object_path ON documents (object_path)")

    try:
        with engine.begin() as connection:
            for statement in statements:
                connection.execute(text(statement))
    except Exception:
        # Best-effort migration: record the failure but let startup continue.
        logger.exception("Failed to ensure documents schema compatibility")


def init_document_db() -> None:
    """Create the tables declared on ``Base``, then patch an older schema.

    ``create_all`` runs first; ``_ensure_documents_schema_compatibility``
    then adds any columns and the index missing from a pre-existing
    ``documents`` table.
    """
    Base.metadata.create_all(bind=engine)
    _ensure_documents_schema_compatibility()


def _compute_next_retry_at(
    retry_count: int,
    base_retry_seconds: int = RETRY_BASE_SECONDS_DEFAULT,
    max_retry_seconds: int = RETRY_MAX_SECONDS_DEFAULT,
) -> datetime:
    """Return the UTC time of the next retry using capped exponential backoff.

    The delay is ``base * 2 ** (retry_count - 1)`` seconds, limited to the
    maximum. Inputs are coerced with ``int`` and clamped: the retry count and
    the base are at least 1, and the maximum is never below the base.

    Args:
        retry_count: Number of failures recorded so far.
        base_retry_seconds: Delay applied after the first failure.
        max_retry_seconds: Upper bound for the delay.

    Returns:
        A timezone-aware datetime equal to now plus the computed delay.
    """
    attempt = max(1, int(retry_count))
    floor_seconds = max(1, int(base_retry_seconds))
    ceiling_seconds = max(floor_seconds, int(max_retry_seconds))

    backoff_seconds = floor_seconds * (2 ** (attempt - 1))
    if backoff_seconds > ceiling_seconds:
        backoff_seconds = ceiling_seconds

    return utcnow() + timedelta(seconds=backoff_seconds)


def log_document_sync_error(
    db: Session,
    *,
    object_path: str,
    operation: str,
    error_message: str,
    folder_key: Optional[str] = None,
    collection_name: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    base_retry_seconds: int = RETRY_BASE_SECONDS_DEFAULT,
    max_retry_seconds: int = RETRY_MAX_SECONDS_DEFAULT,
) -> DocumentSyncError:
    """Record a sync failure, reusing the open error row for the same target.

    The most recent unresolved row with the same path and operation is
    updated; if none exists a new row is created. Each call increments
    ``retry_count``, refreshes ``last_error_at`` and schedules
    ``next_retry_at`` with exponential backoff. The session is committed.

    Args:
        db: Session used for the lookup and the commit.
        object_path: Path the error relates to; blank becomes ``"__global__"``.
        operation: Operation name; blank becomes ``"sync"``.
        error_message: Error text; blank becomes ``"Unknown sync error"``.
        folder_key: When not ``None``, overwrites the stored folder key
            (a blank value stores NULL).
        collection_name: When not ``None``, overwrites the stored collection
            name (a blank value stores NULL).
        payload: Optional mapping stored as JSON; a falsy payload stores NULL.
        base_retry_seconds: Base delay passed to the backoff computation.
        max_retry_seconds: Maximum delay passed to the backoff computation.

    Returns:
        The persisted and refreshed ``DocumentSyncError`` row.
    """
    fallback_message = "Unknown sync error"
    path_key = (object_path or "").strip() or "__global__"
    operation_key = (operation or "").strip() or "sync"

    open_errors = db.query(DocumentSyncError).filter(
        DocumentSyncError.object_path == path_key,
        DocumentSyncError.operation == operation_key,
        DocumentSyncError.resolved_at.is_(None),
    )
    entry = open_errors.order_by(DocumentSyncError.last_error_at.desc()).first()

    if entry is None:
        entry = DocumentSyncError(object_path=path_key, operation=operation_key, retry_count=0)
        db.add(entry)

    # None means "leave the stored value alone"; blank text clears it.
    if folder_key is not None:
        entry.folder_key = (folder_key or "").strip() or None
    if collection_name is not None:
        entry.collection_name = (collection_name or "").strip() or None

    entry.error_message = (error_message or fallback_message).strip() or fallback_message

    if payload:
        entry.payload_json = json.dumps(payload, ensure_ascii=False)
    else:
        entry.payload_json = None

    attempts = int(entry.retry_count or 0) + 1
    entry.retry_count = attempts
    entry.last_error_at = utcnow()
    entry.next_retry_at = _compute_next_retry_at(
        retry_count=attempts,
        base_retry_seconds=base_retry_seconds,
        max_retry_seconds=max_retry_seconds,
    )
    entry.resolved_at = None

    db.commit()
    db.refresh(entry)
    return entry


def list_due_document_sync_errors(
    db: Session,
    limit: int = 100,
    as_of: Optional[datetime] = None,
) -> List[DocumentSyncError]:
    """Return unresolved sync errors whose retry time has arrived.

    A row is due when ``next_retry_at`` is NULL or not later than the
    reference time. Results are ordered by ``next_retry_at`` and then
    ``last_error_at``, both ascending.

    Args:
        db: Session used for the query.
        limit: Maximum rows to return; clamped to 1..1000, falsy means 100.
        as_of: Reference time; defaults to the current UTC time.

    Returns:
        The matching ``DocumentSyncError`` rows.
    """
    cutoff = as_of or utcnow()
    max_rows = max(1, min(int(limit or 100), 1000))

    is_unresolved = DocumentSyncError.resolved_at.is_(None)
    is_due = or_(
        DocumentSyncError.next_retry_at.is_(None),
        DocumentSyncError.next_retry_at <= cutoff,
    )

    due_query = db.query(DocumentSyncError).filter(is_unresolved, is_due)
    due_query = due_query.order_by(
        DocumentSyncError.next_retry_at.asc(),
        DocumentSyncError.last_error_at.asc(),
    )
    return due_query.limit(max_rows).all()


def mark_document_sync_error_resolved(
    db: Session,
    *,
    object_path: str,
    operation: Optional[str] = None,
) -> int:
    """Mark the unresolved sync errors for a path as resolved.

    Args:
        db: Session used for the query and the commit.
        object_path: Path to match; blank becomes ``"__global__"``.
        operation: When non-blank, only rows for this operation are resolved;
            otherwise every unresolved row for the path is resolved.

    Returns:
        The number of rows marked resolved (0 when nothing matched, in which
        case no commit is issued).
    """
    path_key = (object_path or "").strip() or "__global__"

    pending = db.query(DocumentSyncError).filter(
        DocumentSyncError.object_path == path_key,
        DocumentSyncError.resolved_at.is_(None),
    )

    operation_key = (operation or "").strip()
    if operation_key:
        pending = pending.filter(DocumentSyncError.operation == operation_key)

    matches = pending.all()
    if matches:
        # One shared timestamp so every row resolved together carries the same time.
        stamp = utcnow()
        for entry in matches:
            entry.resolved_at = stamp
        db.commit()

    return len(matches)


def count_unresolved_document_sync_errors(db: Session) -> int:
    """Return how many sync error rows still have ``resolved_at`` set to NULL."""
    unresolved_query = db.query(func.count(DocumentSyncError.id)).filter(
        DocumentSyncError.resolved_at.is_(None)
    )
    total = unresolved_query.scalar()
    return int(total or 0)


def list_active_collection_names(db: Session, limit: int = 3) -> List[str]:
    """Return collection names of live, finished documents, most recent first.

    Only documents with a non-empty ``collection_name``, a NULL
    ``deleted_at`` and ``status == "done"`` are considered. Collections are
    ordered by their latest ``last_synced_at`` (descending) and then by
    document count (descending).

    Args:
        db: Session used for the query.
        limit: Maximum names to return; clamped to 1..100, falsy means 3.

    Returns:
        The collection names as strings.
    """
    max_names = max(1, min(int(limit or 3), 100))

    grouped = db.query(
        Document.collection_name,
        func.count(Document.id).label("doc_count"),
        func.max(Document.last_synced_at).label("last_sync"),
    )
    grouped = grouped.filter(
        Document.collection_name.isnot(None),
        Document.collection_name != "",
        Document.deleted_at.is_(None),
        Document.status == "done",
    )
    grouped = grouped.group_by(Document.collection_name)
    grouped = grouped.order_by(
        func.max(Document.last_synced_at).desc(),
        func.count(Document.id).desc(),
    )
    records = grouped.limit(max_names).all()

    names: List[str] = []
    for record in records:
        # Skip empty rows and falsy names defensively.
        if record and record[0]:
            names.append(str(record[0]))
    return names


def get_document_db():
    """Yield a new session from ``SessionLocal`` and close it afterwards.

    This is a generator: the session is closed when the generator is resumed
    after the ``yield`` or is closed, including when an exception is thrown
    into it.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()