from typing import List, Optional, Dict
from sqlalchemy import select
from lpm_kernel.common.repository.base_repository import BaseRepository
from lpm_kernel.file_data.document import Document
from lpm_kernel.file_data.process_status import ProcessStatus
from lpm_kernel.file_data.document_dto import DocumentDTO
from lpm_kernel.file_data.models import ChunkModel, DocumentModel
from .dto.chunk_dto import ChunkDTO
import logging

logger = logging.getLogger(__name__)


class DocumentRepository(BaseRepository[Document]):
    """Data-access layer for documents and their chunks.

    Every operation opens a short-lived session via ``self._db.session()``
    (inherited from ``BaseRepository``) and converts ORM rows to DTOs
    before returning, so no live ORM objects escape the session scope
    (except ``save_chunk`` — see its docstring).
    """

    def __init__(self):
        super().__init__(Document)

    def update_document_analysis(
        self, doc_id: int, insight: Dict, summary: Dict
    ) -> Optional[DocumentDTO]:
        """Persist analysis results and mark the document as analyzed.

        Args:
            doc_id: primary key of the document to update.
            insight: analysis insight payload to store on the row.
            summary: analysis summary payload to store on the row.

        Returns:
            The updated document as a DTO, or ``None`` when no row with
            ``doc_id`` exists.
        """
        with self._db.session() as session:
            document = session.get(self.model, doc_id)
            if document is None:
                return None
            document.insight = insight
            document.summary = summary
            # NOTE(review): assigns the enum member itself, whereas
            # update_embedding_status below assigns ``status.value`` —
            # confirm which representation the column actually stores.
            document.analyze_status = ProcessStatus.SUCCESS
            session.commit()
            return Document.to_dto(document)

    def _find_by_pending_status(self, status_column) -> List[DocumentDTO]:
        """Return documents whose *status_column* is INITIALIZED or FAILED.

        Shared implementation behind ``find_unanalyzed`` and
        ``find_unembedding``, which previously duplicated this query.
        """
        with self._db.session() as session:
            query = select(self.model).where(
                status_column.in_([ProcessStatus.INITIALIZED, ProcessStatus.FAILED])
            )
            result = session.execute(query)
            return [Document.to_dto(doc) for doc in result.scalars().all()]

    def find_unanalyzed(self) -> List[DocumentDTO]:
        """Return documents whose analysis has not yet succeeded."""
        return self._find_by_pending_status(self.model.analyze_status)

    def find_unembedding(self) -> List[DocumentDTO]:
        """Return documents whose embedding has not yet succeeded."""
        return self._find_by_pending_status(self.model.embedding_status)

    def find_chunks(self, document_id: int) -> List[ChunkDTO]:
        """Return all chunks belonging to the given document as DTOs.

        The embedding vector itself is intentionally not copied into the
        DTO (only the ``has_embedding`` flag), keeping the payload small.
        """
        with self._db.session() as session:
            chunks = (
                session.query(ChunkModel)
                .filter(ChunkModel.document_id == document_id)
                .all()
            )
            return [
                ChunkDTO(
                    id=chunk.id,
                    document_id=chunk.document_id,
                    has_embedding=chunk.has_embedding,
                    length=len(chunk.content) if chunk.content else 0,
                    content=chunk.content,
                    tags=chunk.tags,
                    topic=chunk.topic,
                )
                for chunk in chunks
            ]

    def save_chunk(self, chunk: ChunkModel) -> ChunkModel:
        """Add *chunk* to the session and return it with its generated id.

        ``flush`` forces the INSERT so the auto-generated primary key is
        populated; ``refresh`` re-reads server-side defaults. Persistence
        relies on the session context manager committing on exit —
        NOTE(review): confirm ``self._db.session()`` commits on close,
        otherwise this write is lost.
        """
        with self._db.session() as session:
            session.add(chunk)
            session.flush()  # get auto-gen ID
            session.refresh(chunk)
            return chunk

    def find_one(self, document_id: int) -> Optional[DocumentDTO]:
        """Return the document with the given id as a DTO, or ``None``."""
        with self._db.session() as session:
            document = session.get(self.model, document_id)
            return Document.to_dto(document) if document else None

    def update_chunk_embedding_status(self, chunk_id: int, has_embedding: bool) -> None:
        """Set a chunk's ``has_embedding`` flag.

        Logs a warning (without raising) when the chunk does not exist;
        re-raises any database error after logging it.
        """
        try:
            with self._db.session() as session:
                chunk = (
                    session.query(ChunkModel).filter(ChunkModel.id == chunk_id).first()
                )
                if chunk:
                    chunk.has_embedding = has_embedding
                    session.commit()
                    logger.debug(f"Updated embedding status for chunk {chunk_id}")
                else:
                    logger.warning(f"Chunk not found with id: {chunk_id}")
        except Exception as e:
            logger.error(f"Error updating chunk embedding status: {str(e)}")
            raise

    def update_embedding_status(self, document_id: int, status: ProcessStatus) -> None:
        """Set a document's ``embedding_status`` column to ``status.value``.

        Logs a warning (without raising) when the document does not exist;
        re-raises any database error after logging it.

        NOTE(review): queries ``DocumentModel`` directly instead of
        ``self.model`` (used everywhere else), and stores ``status.value``
        rather than the enum member — verify both are intentional.
        """
        try:
            with self._db.session() as session:
                document = (
                    session.query(DocumentModel)
                    .filter(DocumentModel.id == document_id)
                    .first()
                )
                if document:
                    document.embedding_status = status.value
                    session.commit()
                    logger.debug(
                        f"Updated embedding status for document {document_id} to {status.value}"
                    )
                else:
                    logger.warning(f"Document not found with id: {document_id}")
        except Exception as e:
            logger.error(f"Error updating document embedding status: {str(e)}")
            raise