import os
from typing import AsyncGenerator, List, Optional

from llama_index.core import Document, Settings, VectorStoreIndex
from llama_index.core.chat_engine import CondensePlusContextChatEngine
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.llms.groq import Groq

from app.config import settings
from app.models.schemas import SourceInfo


class RAGService:
    """Retrieval-augmented generation service built on a LlamaIndex vector index.

    The service keeps a single in-memory ``VectorStoreIndex``, a chat engine
    derived from it, and the list of source names that have been indexed.
    The LLM and embedding model are configured lazily on the first indexing
    call rather than in ``__init__``.
    """

    # Single definition of the message raised by every query-style method
    # when nothing has been indexed yet.
    _NO_DOCUMENTS_MESSAGE = "No documents indexed. Please upload a document first."

    _LLM_MODEL = "llama-3.1-8b-instant"
    _EMBED_MODEL = "BAAI/bge-small-en-v1.5"

    def __init__(self):
        """Export the Groq API key to the environment and start with no index."""
        os.environ["GROQ_API_KEY"] = settings.groq_api_key
        self._llm_initialized = False
        self.index: Optional[VectorStoreIndex] = None
        self.chat_engine = None
        self.indexed_documents = []

    def _initialize_llm(self):
        """Configure the global LlamaIndex ``Settings`` once per service instance."""
        if self._llm_initialized:
            return
        Settings.llm = Groq(model=self._LLM_MODEL, api_key=settings.groq_api_key)
        # Use FastEmbed - lightweight embeddings optimized for low memory
        Settings.embed_model = FastEmbedEmbedding(model_name=self._EMBED_MODEL)
        self._llm_initialized = True

    def _add_to_index(self, documents: List[Document]) -> None:
        """Create the index from ``documents``, or insert them into the existing one."""
        if self.index is None:
            self.index = VectorStoreIndex.from_documents(documents)
            return
        for doc in documents:
            self.index.insert(doc)

    def _refresh_chat_engine(self) -> None:
        """Rebuild the chat engine from the current index.

        A new engine object is created on every call.
        NOTE(review): this presumably discards any conversation state held by
        the previous engine - confirm against the LlamaIndex chat engine docs.
        """
        self.chat_engine = self.index.as_chat_engine(
            chat_mode="condense_plus_context",
            verbose=True,
        )

    def create_index_from_text(self, text: str, source_name: str) -> None:
        """Index a single piece of text under ``source_name``.

        Args:
            text: Raw document text to index.
            source_name: Stored in the document metadata under ``"source"``
                and appended to the list of indexed documents.
        """
        self._initialize_llm()
        document = Document(text=text, metadata={"source": source_name})
        self._add_to_index([document])
        # Record the name only after indexing succeeded, so a failed
        # embedding/insert does not leave a phantom entry behind.
        self.indexed_documents.append(source_name)
        self._refresh_chat_engine()

    def create_index_from_documents(self, documents: List[Document]) -> None:
        """Index several pre-built documents.

        Documents whose metadata contains a ``"source"`` key have that value
        appended to the list of indexed documents. An empty ``documents``
        list is a no-op, so ``has_documents()`` does not start reporting
        ``True`` for an index that contains nothing.

        Args:
            documents: Documents to add to the index.
        """
        if not documents:
            return
        self._initialize_llm()
        self._add_to_index(documents)
        # Recorded after the index update for the same reason as in
        # create_index_from_text.
        self.indexed_documents.extend(
            doc.metadata["source"] for doc in documents if "source" in doc.metadata
        )
        self._refresh_chat_engine()

    async def stream_query(self, question: str) -> AsyncGenerator[str, None]:
        """Stream the chat engine's answer to ``question`` token by token.

        Args:
            question: User message passed to the chat engine.

        Yields:
            Each token produced by the streaming chat response.

        Raises:
            ValueError: If no chat engine exists because nothing was indexed.
        """
        if self.chat_engine is None:
            raise ValueError(self._NO_DOCUMENTS_MESSAGE)

        response = await self.chat_engine.astream_chat(question)
        async for token in response.async_response_gen():
            yield token

    async def query(self, question: str) -> tuple[str, List[SourceInfo]]:
        """Answer ``question`` with a one-off query engine and report its sources.

        The query engine is created with ``similarity_top_k=3``.

        Args:
            question: Question passed to the query engine.

        Returns:
            A tuple of the response rendered as a string and one
            ``SourceInfo`` per source node on the response. Each entry carries
            the node's ``"source"`` metadata (``"Unknown"`` when absent), the
            first 300 characters of its text, and its score if it has one.

        Raises:
            ValueError: If nothing has been indexed.
        """
        if self.index is None:
            raise ValueError(self._NO_DOCUMENTS_MESSAGE)

        query_engine = self.index.as_query_engine(similarity_top_k=3)
        response = await query_engine.aquery(question)

        # Responses without a ``source_nodes`` attribute yield no sources.
        sources = [
            SourceInfo(
                file_name=node.metadata.get("source", "Unknown"),
                text=node.text[:300],
                score=getattr(node, "score", None),
            )
            for node in getattr(response, "source_nodes", ())
        ]
        return str(response), sources

    async def summarize(self, max_length: int = 500) -> str:
        """Ask the query engine for a summary of the indexed documents.

        Args:
            max_length: Approximate word count requested in the prompt. It is
                only interpolated into the prompt text; nothing in this method
                truncates the response.

        Returns:
            The response rendered as a string.

        Raises:
            ValueError: If nothing has been indexed.
        """
        if self.index is None:
            raise ValueError(self._NO_DOCUMENTS_MESSAGE)

        query_engine = self.index.as_query_engine()
        summary_prompt = f"Provide a comprehensive summary of all the documents in approximately {max_length} words. Focus on the main ideas, key points, and important details."
        response = await query_engine.aquery(summary_prompt)
        return str(response)

    def reset_index(self) -> None:
        """Drop the index, the chat engine, and the list of indexed documents."""
        self.index = None
        self.chat_engine = None
        self.indexed_documents = []

    def get_indexed_documents(self) -> List[str]:
        """Return the recorded source names, in the order they were indexed.

        A copy is returned so callers cannot mutate the service's own list.
        """
        return list(self.indexed_documents)

    def has_documents(self) -> bool:
        """Return ``True`` once an index has been created and not yet reset."""
        return self.index is not None