# secondme-api — lpm_kernel/file_data/document_repository.py
# feat: add detailed logging (commit 01d5a5d)
from typing import List, Optional, Dict
from sqlalchemy import select
from lpm_kernel.common.repository.base_repository import BaseRepository
from lpm_kernel.file_data.document import Document
from lpm_kernel.file_data.process_status import ProcessStatus
from lpm_kernel.file_data.document_dto import DocumentDTO
from lpm_kernel.file_data.models import ChunkModel, DocumentModel
from .dto.chunk_dto import ChunkDTO
import logging
logger = logging.getLogger(__name__)
class DocumentRepository(BaseRepository[Document]):
    """Repository for ``Document`` rows and their associated chunks.

    Read methods return detached DTOs (``DocumentDTO`` / ``ChunkDTO``) so
    callers never hold live ORM objects outside a session. Write methods
    commit explicitly inside their own session (except ``save_chunk`` —
    see its note).
    """

    def __init__(self):
        super().__init__(Document)

    def update_document_analysis(
        self, doc_id: int, insight: Dict, summary: Dict
    ) -> Optional[DocumentDTO]:
        """Store analysis results on a document and mark it analyzed.

        Args:
            doc_id: Primary key of the document to update.
            insight: Analysis insight payload persisted on the row.
            summary: Analysis summary payload persisted on the row.

        Returns:
            The updated document as a DTO, or ``None`` when no row
            matches ``doc_id``.
        """
        with self._db.session() as session:
            document = session.get(self.model, doc_id)
            if document is None:
                return None
            document.insight = insight
            document.summary = summary
            # NOTE(review): update_embedding_status stores the enum's
            # .value while this stores the enum member itself. Behavior
            # kept as-is — confirm which form the column type expects.
            document.analyze_status = ProcessStatus.SUCCESS
            session.commit()
            return Document.to_dto(document)

    def find_unanalyzed(self) -> List[DocumentDTO]:
        """Return documents whose analysis has not yet succeeded.

        A document qualifies when ``analyze_status`` is INITIALIZED
        (never analyzed) or FAILED (eligible for retry).
        """
        with self._db.session() as session:
            query = select(self.model).where(
                self.model.analyze_status.in_(
                    [ProcessStatus.INITIALIZED, ProcessStatus.FAILED]
                )
            )
            result = session.execute(query)
            return [Document.to_dto(doc) for doc in result.scalars().all()]

    def find_chunks(self, document_id: int) -> List[ChunkDTO]:
        """Return all chunks belonging to the given document as DTOs.

        The chunk's raw embedding vector is intentionally not copied into
        the DTO; only the ``has_embedding`` flag is exposed.
        """
        with self._db.session() as session:
            chunks = (
                session.query(ChunkModel)
                .filter(ChunkModel.document_id == document_id)
                .all()
            )
            return [
                ChunkDTO(
                    id=chunk.id,
                    document_id=chunk.document_id,
                    has_embedding=chunk.has_embedding,
                    # embedding vector deliberately omitted from the DTO
                    length=len(chunk.content) if chunk.content else 0,
                    content=chunk.content,
                    tags=chunk.tags,
                    topic=chunk.topic,
                )
                for chunk in chunks
            ]

    def save_chunk(self, chunk: ChunkModel) -> ChunkModel:
        """Persist a chunk and return it with its auto-generated id set.

        NOTE(review): unlike the other mutators here, this only flushes —
        it relies on ``self._db.session()`` committing on exit. If that
        context manager merely closes/rolls back, this write is lost.
        Confirm against the session factory before changing.
        """
        with self._db.session() as session:
            session.add(chunk)
            session.flush()  # assigns the auto-generated primary key
            session.refresh(chunk)
            return chunk

    def find_one(self, document_id: int) -> Optional[DocumentDTO]:
        """Return the document with the given id as a DTO, or ``None``."""
        with self._db.session() as session:
            document = session.get(self.model, document_id)
            return Document.to_dto(document) if document else None

    def update_chunk_embedding_status(self, chunk_id: int, has_embedding: bool) -> None:
        """Set the ``has_embedding`` flag on a single chunk.

        Logs a warning (without raising) when the chunk does not exist.
        Database errors are logged with their traceback and re-raised.
        """
        try:
            with self._db.session() as session:
                chunk = (
                    session.query(ChunkModel).filter(ChunkModel.id == chunk_id).first()
                )
                if chunk:
                    chunk.has_embedding = has_embedding
                    session.commit()
                    # lazy %-args: no formatting cost when DEBUG is off
                    logger.debug("Updated embedding status for chunk %s", chunk_id)
                else:
                    logger.warning("Chunk not found with id: %s", chunk_id)
        except Exception:
            # logger.exception preserves the stack trace that the old
            # logger.error(f"... {str(e)}") discarded; error still propagates.
            logger.exception("Error updating chunk embedding status")
            raise

    def find_unembedding(self) -> List[DocumentDTO]:
        """Return documents whose embedding has not yet succeeded.

        A document qualifies when ``embedding_status`` is INITIALIZED or
        FAILED — the mirror of :meth:`find_unanalyzed` for embeddings.
        """
        with self._db.session() as session:
            query = select(self.model).where(
                self.model.embedding_status.in_(
                    [ProcessStatus.INITIALIZED, ProcessStatus.FAILED]
                )
            )
            result = session.execute(query)
            return [Document.to_dto(doc) for doc in result.scalars().all()]

    def update_embedding_status(self, document_id: int, status: ProcessStatus) -> None:
        """Set a document's ``embedding_status`` to the given status value.

        Args:
            document_id: Primary key of the document to update.
            status: New processing status; its ``.value`` is stored.

        Logs a warning (without raising) when the document does not exist.
        Database errors are logged with their traceback and re-raised.
        """
        try:
            with self._db.session() as session:
                document = (
                    session.query(DocumentModel)
                    .filter(DocumentModel.id == document_id)
                    .first()
                )
                if document:
                    document.embedding_status = status.value
                    session.commit()
                    logger.debug(
                        "Updated embedding status for document %s to %s",
                        document_id,
                        status.value,
                    )
                else:
                    logger.warning("Document not found with id: %s", document_id)
        except Exception:
            # Preserve traceback in the log; the caller still sees the error.
            logger.exception("Error updating document embedding status")
            raise