import hashlib
import json
import logging

from fastapi import UploadFile
from langchain_groq import ChatGroq
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.config import get_settings
from app.models import Document, DocumentChunk, User, UserDocument
from app.services.pdf_utils import extract_pdf_pages_from_bytes, extract_pdf_text_from_bytes
from app.services.storage_service import StorageService
from app.services.vector_store import VectorStoreService

logger = logging.getLogger(__name__)


class DocumentService:
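    """Handle PDF uploads shared across users: content-hash deduplication,
    file storage, vector indexing, and LLM-based summarization and relevance
    filtering."""
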
    def __init__(self) -> None:
        self.settings = get_settings()
        self.storage = StorageService()
        self.vector_store = VectorStoreService()
        # LLM clients are created lazily on first use.
        self.summarizer: ChatGroq | None = None
        self.matcher_llm: ChatGroq | None = None

    async def save_upload(self, upload: UploadFile) -> tuple[bytes, str]:
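        """Read the upload into memory and return (content, SHA-256 hex digest)."""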
        content = await upload.read()
        file_hash = hashlib.sha256(content).hexdigest()
        return content, file_hash

    def get_or_create_document(
        self,
        *,
        db: Session,
        user: User,
        upload: UploadFile,
        content: bytes,
        file_hash: str,
    ) -> tuple[Document, bool, bool]:
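        """Fetch or create the Document for this content hash and link it to
        the user.

        Returns (document, created, processed): `created` is True when a new
        row was inserted; `processed` is True when the vector index was built
        or rebuilt.
        """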
        existing_document = db.scalar(select(Document).where(Document.file_hash == file_hash))
        created = False
        processed = False

        if existing_document is None:
            file_path = self.storage.save_pdf(
                file_hash=file_hash,
                filename=upload.filename or "document.pdf",
                content=content,
            )
            preview_text, page_count = extract_pdf_text_from_bytes(content, max_pages=10)
            full_pages, _ = extract_pdf_pages_from_bytes(content)
            summary = self._summarize_preview(preview_text, upload.filename or "document.pdf")
            existing_document = Document(
                filename=upload.filename or "document.pdf",
                file_hash=file_hash,
                file_path=file_path,
                page_count=page_count,
                summary=summary,
                extracted_preview=preview_text[:8000],
                processing_status="completed",
            )
            db.add(existing_document)
            db.flush()
            self.vector_store.add_document(
                db=db,
                document_id=existing_document.id,
                file_hash=file_hash,
                filename=existing_document.filename,
                pages=full_pages,
            )
            created = True
            processed = True
        else:
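            # Document already stored: reindex only if legacy chunks are
            # missing page-number metadata.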
            needs_page_reindex = db.scalar(
                select(DocumentChunk.id)
                .where(DocumentChunk.document_id == existing_document.id, DocumentChunk.page_number.is_(None))
                .limit(1)
            )
            if needs_page_reindex:
                content_bytes = self.storage.read_file_bytes(file_path=existing_document.file_path)
                full_pages, _ = extract_pdf_pages_from_bytes(content_bytes)
                self.vector_store.add_document(
                    db=db,
                    document_id=existing_document.id,
                    file_hash=existing_document.file_hash,
                    filename=existing_document.filename,
                    pages=full_pages,
                )
                processed = True

        link = db.scalar(
            select(UserDocument).where(
                UserDocument.user_id == user.id,
                UserDocument.document_id == existing_document.id,
            )
        )
        if link is None:
            db.add(UserDocument(user_id=user.id, document_id=existing_document.id))
            db.flush()

        return existing_document, created, processed

    def list_user_documents(self, db: Session, user: User) -> list[Document]:
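        """Return the user's linked documents, newest first."""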
        stmt = (
            select(Document)
            .join(UserDocument, UserDocument.document_id == Document.id)
            .where(UserDocument.user_id == user.id)
            .order_by(Document.created_at.desc())
        )
        return list(db.scalars(stmt))

    def delete_user_document(self, db: Session, *, user: User, document_id: int) -> dict[str, str | bool]:
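        """Remove the user's link to a document, deleting the shared row and
        the stored file once no other user still references it."""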
        link = db.scalar(
            select(UserDocument).where(
                UserDocument.user_id == user.id,
                UserDocument.document_id == document_id,
            )
        )
        if link is None:
            raise ValueError("Document not found for this user.")

        document = db.get(Document, document_id)
        if document is None:
            raise ValueError("Document does not exist.")

        # Capture metadata before deletion so the values stay available after
        # the ORM flush removes the row.
        filename = document.filename
        file_path = document.file_path

        db.delete(link)
        db.flush()

        remaining_links = (
            db.scalar(
                select(func.count())
                .select_from(UserDocument)
                .where(UserDocument.document_id == document_id)
            )
            or 0
        )
        deleted_shared_document = False

        if remaining_links == 0:
            db.delete(document)
            db.flush()
            self.storage.delete_file(file_path=file_path)
            deleted_shared_document = True

        return {
            "filename": filename,
            "deleted_shared_document": deleted_shared_document,
        }

    def resolve_relevant_document_hashes(self, db: Session, *, user: User, query: str) -> list[str]:
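        """Return file hashes of the user's documents judged relevant to the
        query, or an empty list if the user has none."""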
        docs = self.list_user_documents(db, user)
        if not docs:
            return []

        # Send all documents to LLM for semantic matching
        matched_hashes = self._llm_filter_documents(query=query, candidates=docs)
        print("Documents Matched ----->", matched_hashes)
        return matched_hashes

    def _llm_filter_documents(self, *, query: str, candidates: list[Document]) -> list[str]:
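        """Ask the LLM to select relevant documents. Returns [] when no API
        key is configured or there are no candidates; falls back to every
        candidate if the response cannot be parsed."""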
        if not self.settings.groq_api_key or not candidates:
            return []
        if self.matcher_llm is None:
            self.matcher_llm = ChatGroq(api_key=self.settings.groq_api_key, model=self.settings.model_name, temperature=0)

        payload = []
        for doc in candidates:
            payload.append(
                {
                    "file_hash": doc.file_hash,
                    "filename": doc.filename,
                    "summary": (doc.summary or "")[:1000],
                    "preview": (doc.extracted_preview or "")[:1200],
                }
            )

        prompt = (
            "You are a document relevance filter. Analyze the user query and select ONLY the truly relevant documents.\n"
            "Consider semantic similarity, topic alignment, and document purpose.\n\n"
            "IMPORTANT: Only include documents that are actually relevant to answering the query.\n"
            "It's better to return fewer relevant documents than to include irrelevant ones.\n"
            "Return ONLY a valid JSON array of relevant file hashes, for example:\n"
            '["<hash1>", "<hash2>"]\n\n'
            f"User query: {query}\n\n"
            f"Available documents:\n{json.dumps(payload, ensure_ascii=True, indent=2)}"
        )
        try:
            response = self.matcher_llm.invoke(prompt)
            content = response.content if isinstance(response.content, str) else str(response.content)
            
            # Handle markdown code blocks
            if "```json" in content:
                content = content.split("```json")[1].split("```")[0].strip()
            elif "```" in content:
                content = content.split("```")[1].split("```")[0].strip()
            
            data = json.loads(content)
            hashes = data if isinstance(data, list) else []
            valid = {item.get("file_hash", "") for item in payload}
            return [value for value in hashes if isinstance(value, str) and value in valid]
        except Exception:
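            # Fail open: return every candidate when the LLM call or JSON
            # parsing fails, rather than silently dropping documents.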
            return [doc.file_hash for doc in candidates]

    def ensure_page_metadata_for_user(self, *, db: Session, user: User) -> None:
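        """Reindex any of the user's documents whose chunks were stored
        without page numbers, backfilling page metadata in the vector store."""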
        docs = self.list_user_documents(db, user)
        changed = False
        for doc in docs:
            needs_page_reindex = db.scalar(
                select(DocumentChunk.id)
                .where(DocumentChunk.document_id == doc.id, DocumentChunk.page_number.is_(None))
                .limit(1)
            )
            if not needs_page_reindex:
                continue
            try:
                content_bytes = self.storage.read_file_bytes(file_path=doc.file_path)
            except Exception:
                continue
            full_pages, _ = extract_pdf_pages_from_bytes(content_bytes)
            self.vector_store.add_document(
                db=db,
                document_id=doc.id,
                file_hash=doc.file_hash,
                filename=doc.filename,
                pages=full_pages,
            )
            changed = True
        if changed:
            db.commit()

    def _summarize_preview(self, preview_text: str, filename: str) -> str:
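        """Summarize the first pages of a document via the LLM, falling back
        to the raw preview text when no API key is configured."""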
        if not preview_text.strip():
            return f"No text could be extracted from the first pages of {filename}."
        if not self.settings.groq_api_key:
            return preview_text[:1200]
        if self.summarizer is None:
            self.summarizer = ChatGroq(api_key=self.settings.groq_api_key, model=self.settings.model_name, temperature=0)
        prompt = (
            "Summarize the following document preview in 6-8 concise bullet-style sentences. "
            "Focus on purpose, key topics, and likely use cases.\n\n"
            f"Filename: {filename}\n\nPreview:\n{preview_text[:16000]}"
        )
        try:
            response = self.summarizer.invoke(prompt)
        except Exception:
            # Mirror the no-API-key fallback rather than failing the upload.
            return preview_text[:1200]
        return response.content if isinstance(response.content, str) else str(response.content)
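

# Usage sketch (illustrative, not part of this module): typically driven from
# a FastAPI route, where `db`, `current_user`, and `upload` come from the
# application's dependencies.
#
#     service = DocumentService()
#     content, file_hash = await service.save_upload(upload)
#     document, created, processed = service.get_or_create_document(
#         db=db, user=current_user, upload=upload, content=content, file_hash=file_hash
#     )
#     db.commit()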