Spaces:
Sleeping
Sleeping
| """ | |
| Simplified RAG system - No language detection | |
| Passes query directly to Qdrant and LLM | |
| """ | |
| import os | |
| import re | |
| import logging | |
| from typing import List, Dict, Optional | |
| from qdrant_setup import QdrantSetup | |
| from embedding_generator import EmbeddingGenerator | |
| from llm_manager import get_llm | |
| from language_constants import ( | |
| get_supported_languages, | |
| is_language_supported, | |
| get_language_info, | |
| SUPPORTED_LANGUAGES, | |
| ) | |
| from langchain_core.output_parsers import StrOutputParser | |
| from qdrant_client.http import models | |
# Module-level logging: INFO level, named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prompt for AI4Bharat IndicLLM - Creative writing for 11 Indic languages
# Placeholders: {context_str} = formatted retrieved documents,
# {query} = the user's request, {language} = target response language.
CREATIVE_PROMPT = """You are a creative writer inspired by Hindi literature and nature poetry.
Context for inspiration:
{context_str}
Write a poem or short story about: {query}
Language: {language}
Write only your creative piece - no explanations or meta-commentary:"""

# Apology template for unsupported languages.
# Placeholder: {supported_languages_list} = bullet list of supported languages
# (see _get_supported_languages_list). NOTE(review): not referenced in this
# chunk — presumably used by a caller elsewhere; confirm before removing.
UNSUPPORTED_LANG_PROMPT = """I apologize, but I only support the following languages:
{supported_languages_list}
Please try asking your question in one of these supported languages."""
class SimpleRAGSystem:
    """Simplified RAG pipeline: embed the query, search Qdrant, generate with an LLM."""

    def __init__(self, llm_provider=None, model_kwargs=None):
        """Wire up the Qdrant client, embedding generator, LLM, and language list.

        Args:
            llm_provider: Optional provider name forwarded to get_llm().
            model_kwargs: Optional model keyword arguments forwarded to get_llm().
        """
        # Vector-store connection details come from the shared QdrantSetup helper.
        setup = QdrantSetup()
        self.qdrant_client = setup.get_client()
        self.collection_name = setup.get_collection_name()

        # Model used to embed incoming queries.
        self.embedding_generator = EmbeddingGenerator()

        # Forward overrides to get_llm() only when the caller supplied any;
        # otherwise let get_llm() apply its own defaults.
        if llm_provider or model_kwargs:
            self.llm = get_llm(provider=llm_provider, model_kwargs=model_kwargs)
        else:
            self.llm = get_llm()
        if self.llm is None:
            logger.error("LLM initialization failed. RAG system will have limited functionality.")

        # Cache language metadata plus a set of codes for fast membership tests.
        self.supported_languages = get_supported_languages()
        self.supported_language_codes = {entry["code"] for entry in self.supported_languages}
| def _get_supported_languages_list(self) -> str: | |
| """Get formatted list of supported languages""" | |
| return "\n".join([ | |
| f" - {lang['name']} ({lang['native_name']})" | |
| for lang in self.supported_languages | |
| ]) | |
| def retrieve_relevant_documents(self, query: str, top_k: int = 5) -> List[Dict]: | |
| """Retrieve relevant documents from Qdrant based on the query""" | |
| query_embedding = self.embedding_generator.get_embedding(query) | |
| search_result = self.qdrant_client.query_points( | |
| collection_name=self.collection_name, | |
| query=query_embedding, | |
| limit=top_k | |
| ) | |
| retrieved_docs = [] | |
| for result in search_result.points: | |
| payload = result.payload if result.payload is not None else {} | |
| doc = { | |
| "score": result.score or 0, | |
| "title": payload.get("title", "") if payload else "", | |
| "author": payload.get("author", "") if payload else "", | |
| "genre": payload.get("genre", "") if payload else "", | |
| "text": payload.get("full_text", "") if payload else "", | |
| "source_file": payload.get("source_file", "") if payload else "", | |
| } | |
| retrieved_docs.append(doc) | |
| return retrieved_docs | |
| def generate_answer(self, query: str, context_docs: List[Dict], user_selected_language: Optional[str] = None) -> str: | |
| """ | |
| Generate an answer based on the query and retrieved documents | |
| Args: | |
| query: User query | |
| context_docs: Retrieved context documents | |
| user_selected_language: Optional language code selected by user in frontend | |
| """ | |
| if self.llm is None: | |
| logger.error("LLM not initialized. Returning synthesized answer from documents.") | |
| return self._synthesize_answer(query, context_docs) | |
| # Use user-selected language or try to infer from query context | |
| # For simplicity, we just pass the query and let the LLM handle it | |
| # The prompt instructs the LLM to respond in the same language as the query | |
| # Send only top 3 documents to LLM | |
| limited_docs = context_docs[:3] if len(context_docs) > 3 else context_docs | |
| # Format context with truncation | |
| formatted_contexts = [] | |
| for i, doc in enumerate(limited_docs, 1): | |
| # Truncate text to 800 chars per doc | |
| text_snippet = doc["text"][:800] + "..." if len(doc["text"]) > 800 else doc["text"] | |
| formatted_context = f"[{i}] Title: {doc['title']}\nAuthor: {doc['author']}\nGenre: {doc['genre']}\nContent: {text_snippet}\nScore: {doc['score']:.3f}\n" | |
| formatted_contexts.append(formatted_context) | |
| context_str = "\n\n".join(formatted_contexts) | |
| # Determine language for response | |
| response_language = user_selected_language if user_selected_language else "the same language as the query" | |
| # Build prompt | |
| prompt_text = CREATIVE_PROMPT.format( | |
| context_str=context_str, | |
| query=query, | |
| language=response_language | |
| ) | |
| try: | |
| chain = self.llm | StrOutputParser() | |
| response = chain.invoke(prompt_text) | |
| if response and len(response.strip()) > 10: | |
| # Clean up the response - remove meta-commentary and informal text | |
| cleaned = self._clean_output(response.strip()) | |
| # Validate output is actual creative content | |
| if self._is_valid_creative_output(cleaned): | |
| return cleaned | |
| logger.warning("LLM returned empty or invalid response. Using document synthesis.") | |
| except Exception as e: | |
| logger.error(f"LLM generation failed: {e}") | |
| return self._synthesize_answer(query, context_docs) | |
| def _is_valid_creative_output(self, text: str) -> bool: | |
| """Check if output is valid creative writing vs prompt/instruction leakage""" | |
| if not text or len(text) < 30: | |
| return False | |
| text_lower = text.lower() | |
| # Reject if contains prompt structure markers | |
| if any(marker in text_lower for marker in ['# context', '# objective', '# style', '# tone', '# audience', '# response']): | |
| return False | |
| # Reject if contains instruction patterns | |
| if any(marker in text_lower for marker in ['do not', 'you are a', 'you will', 'your task', 'output only', 'no explanations']): | |
| return False | |
| # Reject if contains technical/code patterns | |
| if any(marker in text_lower for marker in ['package ', 'import ', 'public class', 'private void', 'string ', 'return ']): | |
| return False | |
| # Reject if too short or lacks creative structure | |
| lines = [l.strip() for l in text.split('\n') if l.strip() and len(l.strip()) > 15] | |
| return len(lines) >= 2 | |
| def _clean_output(self, text: str) -> str: | |
| """Clean up LLM output - remove artifacts and formatting""" | |
| if not text: | |
| return text | |
| # Remove artifacts | |
| text = re.sub(r'</?s>', '', text) | |
| text = re.sub(r'\[/?INST\]', '', text) | |
| text = re.sub(r'\[/?SYS\]', '', text) | |
| text = re.sub(r'\[\d+\]', '', text) | |
| text = re.sub(r'^-{3,}$', '', text, flags=re.MULTILINE) | |
| # Remove lines that are clearly not creative content | |
| lines = text.split('\n') | |
| cleaned = [] | |
| for line in lines: | |
| line = line.strip() | |
| line_lower = line.lower() | |
| if len(line) < 5: | |
| continue | |
| # Skip instruction-like, technical, or meta lines | |
| if any(marker in line_lower for marker in [ | |
| 'context:', 'question:', 'language:', | |
| 'package ', 'import ', 'public class', 'private void', | |
| '@author', 'serializable', 'gh_stars' | |
| ]): | |
| continue | |
| cleaned.append(line) | |
| result = '\n'.join(cleaned).strip() | |
| return result if result else text | |
| def _synthesize_answer(self, query: str, context_docs: List[Dict]) -> str: | |
| """Synthesize an answer from retrieved documents when LLM is unavailable""" | |
| if not context_docs: | |
| return f"Sorry, no relevant documents found for: '{query}'" | |
| synthesized_answer = f"Question: {query}\n\n" | |
| synthesized_answer += "Information from retrieved documents:\n\n" | |
| for i, doc in enumerate(context_docs[:3], 1): | |
| synthesized_answer += f"{i}. {doc['title']} - {doc['author']} (Score: {doc['score']:.3f})\n" | |
| text_preview = doc["text"][:500] + "..." if len(doc["text"]) > 500 else doc["text"] | |
| synthesized_answer += f" Summary: {text_preview}\n\n" | |
| synthesized_answer += "Information synthesized from the above documents." | |
| return synthesized_answer | |
| def query(self, question: str, top_k: int = 5, user_selected_language: Optional[str] = None) -> Dict: | |
| """ | |
| Main query method that retrieves documents and generates an answer | |
| Args: | |
| question: User question | |
| top_k: Number of documents to retrieve | |
| user_selected_language: Optional language code selected by user | |
| """ | |
| relevant_docs = self.retrieve_relevant_documents(question, top_k) | |
| answer = self.generate_answer(question, relevant_docs, user_selected_language) | |
| return { | |
| "question": question, | |
| "answer": answer, | |
| "user_selected_language": user_selected_language, | |
| "relevant_documents": relevant_docs, | |
| "supported_languages": self.supported_languages, | |
| } | |
| def get_supported_languages(self) -> List[Dict]: | |
| """Get list of supported languages""" | |
| return self.supported_languages | |