Spaces:
Sleeping
Sleeping
| # # backend/src/services/chat_service.py | |
| # import json | |
| # from sqlalchemy.ext.asyncio import AsyncSession | |
| # from sqlalchemy.future import select | |
| # # --- Model Imports --- | |
| # from backend.src.models.chat import ChatHistory | |
| # from backend.src.models.integration import UserIntegration | |
| # from backend.src.models.user import User | |
| # # --- Dynamic Factory & Tool Imports --- | |
| # from backend.src.services.llm.factory import get_llm_model | |
| # from backend.src.services.vector_store.qdrant_adapter import get_vector_store | |
| # from backend.src.services.security.pii_scrubber import PIIScrubber | |
| # # --- Agents --- | |
| # from backend.src.services.tools.secure_agent import get_secure_agent | |
| # from backend.src.services.tools.nosql_agent import get_nosql_agent | |
| # from backend.src.services.tools.cms_agent import get_cms_agent | |
| # # --- Router --- | |
| # from backend.src.services.routing.semantic_router import SemanticRouter | |
| # # --- LangChain Core --- | |
| # from langchain_core.messages import HumanMessage, AIMessage | |
| # from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| # # ========================================== | |
| # # HELPER FUNCTIONS | |
| # # ========================================== | |
| # async def get_user_integrations(user_id: str, db: AsyncSession) -> dict: | |
| # """Fetches active integrations and filters valid descriptions.""" | |
| # if not user_id: return {} | |
| # query = select(UserIntegration).where(UserIntegration.user_id == user_id, UserIntegration.is_active == True) | |
| # result = await db.execute(query) | |
| # integrations = result.scalars().all() | |
| # settings = {} | |
| # for i in integrations: | |
| # try: | |
| # creds = json.loads(i.credentials) | |
| # creds['provider'] = i.provider | |
| # creds['schema_map'] = i.schema_map if i.schema_map else {} | |
| # if i.profile_description: | |
| # creds['description'] = i.profile_description | |
| # settings[i.provider] = creds | |
| # except (json.JSONDecodeError, TypeError): | |
| # continue | |
| # return settings | |
| # async def save_chat_to_db(db: AsyncSession, session_id: str, human_msg: str, ai_msg: str, provider: str): | |
| # """Saves chat history with PII redaction.""" | |
| # if not session_id: return | |
| # safe_human = PIIScrubber.scrub(human_msg) | |
| # safe_ai = PIIScrubber.scrub(ai_msg) | |
| # new_chat = ChatHistory( | |
| # session_id=session_id, human_message=safe_human, ai_message=safe_ai, provider=provider | |
| # ) | |
| # db.add(new_chat) | |
| # await db.commit() | |
| # async def get_chat_history(session_id: str, db: AsyncSession): | |
| # """Retrieves past conversation history.""" | |
| # if not session_id: return [] | |
| # query = select(ChatHistory).where(ChatHistory.session_id == session_id).order_by(ChatHistory.timestamp.asc()) | |
| # result = await db.execute(query) | |
| # return result.scalars().all() | |
| # async def get_bot_persona(user_id: str, db: AsyncSession): | |
| # """Fetches custom Bot Name and Instructions from User table.""" | |
| # try: | |
| # stmt = select(User).where(User.id == int(user_id)) | |
| # result = await db.execute(stmt) | |
| # user = result.scalars().first() | |
| # if user: | |
| # return { | |
| # "name": getattr(user, "bot_name", "OmniAgent"), | |
| # "instruction": getattr(user, "bot_instruction", "You are a helpful AI assistant.") | |
| # } | |
| # except Exception as e: | |
| # print(f"β οΈ Error fetching persona: {e}") | |
| # pass | |
| # return {"name": "OmniAgent", "instruction": "You are a helpful AI assistant."} | |
| # # ========================================== | |
| # # MAIN CHAT LOGIC (Ultra-Strict Isolated Mode) | |
| # # ========================================== | |
| # async def process_chat(message: str, session_id: str, user_id: str, db: AsyncSession): | |
| # # 1. Fetch User Settings & Persona | |
| # user_settings = await get_user_integrations(user_id, db) | |
| # bot_persona = await get_bot_persona(user_id, db) | |
| # # 2. LLM Check | |
| # llm_creds = user_settings.get('groq') or user_settings.get('openai') | |
| # if not llm_creds: | |
| # return "Please configure your AI Model in Settings." | |
| # # 3. Build Tool Map for Router | |
| # tools_map = {} | |
| # for provider, config in user_settings.items(): | |
| # if provider in ['sanity', 'sql', 'mongodb']: | |
| # if config.get('description'): | |
| # tools_map[provider] = config['description'] | |
| # # 4. SEMANTIC DECISION (Router) | |
| # selected_provider = None | |
| # if tools_map: | |
| # router = SemanticRouter() | |
| # selected_provider = router.route(message, tools_map) | |
| # response_text = "" | |
| # provider_name = "general_chat" | |
| # # 5. Route to Winner (Agent Execution) | |
| # if selected_provider: | |
| # print(f"π [Router] Selected Tool: {selected_provider.upper()}") | |
| # try: | |
| # if selected_provider == 'sanity': | |
| # schema = user_settings['sanity'].get('schema_map', {}) | |
| # agent = get_cms_agent(user_id=user_id, schema_map=schema, llm_credentials=llm_creds) | |
| # res = await agent.ainvoke({"input": message}) | |
| # response_text = str(res.get('output', '')) | |
| # provider_name = "cms_agent" | |
| # elif selected_provider == 'sql': | |
| # role = "admin" if user_id == '99' else "customer" | |
| # agent = get_secure_agent(int(user_id), role, user_settings['sql'], llm_credentials=llm_creds) | |
| # res = await agent.ainvoke({"input": message}) | |
| # response_text = str(res.get('output', '')) | |
| # provider_name = "sql_agent" | |
| # elif selected_provider == 'mongodb': | |
| # agent = get_nosql_agent(user_id, user_settings['mongodb'], llm_credentials=llm_creds) | |
| # res = await agent.ainvoke({"input": message}) | |
| # response_text = str(res.get('output', '')) | |
| # provider_name = "nosql_agent" | |
| # if not response_text or "error" in response_text.lower(): | |
| # response_text = "" | |
| # except Exception as e: | |
| # print(f"β Agent Execution Failed: {e}") | |
| # response_text = "" | |
| # # 6. Fallback / RAG (ULTRA-STRICT MODE π‘οΈ) | |
| # if not response_text: | |
| # print("π [Router] Executing Strict RAG Fallback...") | |
| # try: | |
| # llm = get_llm_model(credentials=llm_creds) | |
| # # Context from Vector DB | |
| # context = "" | |
| # if 'qdrant' in user_settings: | |
| # try: | |
| # vector_store = get_vector_store(credentials=user_settings['qdrant']) | |
| # docs = await vector_store.asimilarity_search(message, k=3) | |
| # if docs: | |
| # context = "\n\n".join([d.page_content for d in docs]) | |
| # except Exception as e: | |
| # print(f"β οΈ RAG Warning: {e}") | |
| # # --- π₯ THE ULTRA-STRICT SYSTEM PROMPT --- | |
| # system_instruction = f""" | |
| # SYSTEM IDENTITY: | |
| # You are the '{bot_persona['name']}'. You are a 'Knowledge-Isolated' AI Assistant for this specific platform. | |
| # CORE MISSION: | |
| # Your ONLY source of truth is the 'CONTEXT FROM KNOWLEDGE BASE' provided below. | |
| # You must ignore ALL of your internal pre-trained general knowledge about the world, geography, famous people, or general facts. | |
| # STRICT OPERATING RULES: | |
| # 1. MANDATORY REFUSAL: If the user's question cannot be answered using ONLY the provided context, you MUST exactly say: "I apologize, but I am only authorized to provide information based on the provided database. This specific information is not currently available in my knowledge base." | |
| # 2. NO HALLUCINATION: Never attempt to be helpful using outside information. If a fact (like 'Japan's location') is not in the context, you do NOT know it. | |
| # 3. CONTEXT-ONLY: Your existence is bounded by the data below. If the data is empty, you cannot answer anything except greetings. | |
| # 4. GREETINGS: You may respond to 'Hi' or 'Hello' by briefly identifying yourself as '{bot_persona['name']}' and asking what data the user is looking for. | |
| # 5. PROHIBITED TOPICS: Do not discuss any topic that is not present in the provided context. | |
| # CONTEXT FROM KNOWLEDGE BASE: | |
| # --------------------------- | |
| # {context if context else "THE DATABASE IS CURRENTLY EMPTY. DO NOT PROVIDE ANY INFORMATION."} | |
| # --------------------------- | |
| # """ | |
| # # History Load | |
| # history = await get_chat_history(session_id, db) | |
| # formatted_history = [] | |
| # for chat in history: | |
| # formatted_history.append(HumanMessage(content=chat.human_message)) | |
| # if chat.ai_message: formatted_history.append(AIMessage(content=chat.ai_message)) | |
| # # LLM Chain Setup | |
| # prompt = ChatPromptTemplate.from_messages([ | |
| # ("system", system_instruction), | |
| # MessagesPlaceholder(variable_name="chat_history"), | |
| # ("human", "{question}") | |
| # ]) | |
| # chain = prompt | llm | |
| # ai_response = await chain.ainvoke({"chat_history": formatted_history, "question": message}) | |
| # response_text = ai_response.content | |
| # provider_name = "rag_fallback" | |
| # except Exception as e: | |
| # print(f"β Fallback Error: {e}") | |
| # response_text = "I apologize, but I am currently unable to process your request due to a system error." | |
| # # 7. Save to DB | |
| # await save_chat_to_db(db, session_id, message, response_text, provider_name) | |
| # return response_text | |
| import json | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.future import select | |
| from qdrant_client.http import models # <--- NEW IMPORT (Filter ke liye) | |
| # --- Model Imports --- | |
| from backend.src.models.chat import ChatHistory | |
| from backend.src.models.integration import UserIntegration | |
| from backend.src.models.user import User | |
| # --- Dynamic Factory & Tool Imports --- | |
| from backend.src.services.llm.factory import get_llm_model | |
| from backend.src.services.vector_store.qdrant_adapter import get_vector_store | |
| from backend.src.services.security.pii_scrubber import PIIScrubber | |
| # --- Agents --- | |
| from backend.src.services.tools.secure_agent import get_secure_agent | |
| from backend.src.services.tools.nosql_agent import get_nosql_agent | |
| from backend.src.services.tools.cms_agent import get_cms_agent | |
| # --- Router --- | |
| from backend.src.services.routing.semantic_router import SemanticRouter | |
| # --- LangChain Core --- | |
| from langchain_core.messages import HumanMessage, AIMessage | |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| # ========================================== | |
| # HELPER FUNCTIONS | |
| # ========================================== | |
| async def get_user_integrations(user_id: str, db: AsyncSession) -> dict: | |
| """Fetches active integrations and filters valid descriptions.""" | |
| if not user_id: return {} | |
| query = select(UserIntegration).where(UserIntegration.user_id == user_id, UserIntegration.is_active == True) | |
| result = await db.execute(query) | |
| integrations = result.scalars().all() | |
| settings = {} | |
| for i in integrations: | |
| try: | |
| creds = json.loads(i.credentials) | |
| creds['provider'] = i.provider | |
| creds['schema_map'] = i.schema_map if i.schema_map else {} | |
| if i.profile_description: | |
| creds['description'] = i.profile_description | |
| settings[i.provider] = creds | |
| except (json.JSONDecodeError, TypeError): | |
| continue | |
| return settings | |
| async def save_chat_to_db(db: AsyncSession, session_id: str, human_msg: str, ai_msg: str, provider: str): | |
| """Saves chat history with PII redaction.""" | |
| if not session_id: return | |
| safe_human = PIIScrubber.scrub(human_msg) | |
| safe_ai = PIIScrubber.scrub(ai_msg) | |
| new_chat = ChatHistory( | |
| session_id=session_id, human_message=safe_human, ai_message=safe_ai, provider=provider | |
| ) | |
| db.add(new_chat) | |
| await db.commit() | |
| async def get_chat_history(session_id: str, db: AsyncSession): | |
| """Retrieves past conversation history.""" | |
| if not session_id: return [] | |
| query = select(ChatHistory).where(ChatHistory.session_id == session_id).order_by(ChatHistory.timestamp.asc()) | |
| result = await db.execute(query) | |
| return result.scalars().all() | |
| async def get_bot_persona(user_id: str, db: AsyncSession): | |
| """Fetches custom Bot Name and Instructions from User table.""" | |
| try: | |
| stmt = select(User).where(User.id == int(user_id)) | |
| result = await db.execute(stmt) | |
| user = result.scalars().first() | |
| if user: | |
| return { | |
| "name": getattr(user, "bot_name", "OmniAgent"), | |
| "instruction": getattr(user, "bot_instruction", "You are a helpful AI assistant.") | |
| } | |
| except Exception as e: | |
| print(f"β οΈ Error fetching persona: {e}") | |
| pass | |
| return {"name": "OmniAgent", "instruction": "You are a helpful AI assistant."} | |
| # ========================================== | |
| # MAIN CHAT LOGIC (Ultra-Strict Isolated Mode) | |
| # ========================================== | |
| async def process_chat(message: str, session_id: str, user_id: str, db: AsyncSession): | |
| # 1. Fetch User Settings & Persona | |
| user_settings = await get_user_integrations(user_id, db) | |
| bot_persona = await get_bot_persona(user_id, db) | |
| # 2. LLM Check | |
| llm_creds = user_settings.get('groq') or user_settings.get('openai') | |
| if not llm_creds: | |
| return "Please configure your AI Model in Settings." | |
| # 3. Build Tool Map for Router | |
| tools_map = {} | |
| for provider, config in user_settings.items(): | |
| if provider in ['sanity', 'sql', 'mongodb']: | |
| if config.get('description'): | |
| tools_map[provider] = config['description'] | |
| # 4. SEMANTIC DECISION (Router) | |
| selected_provider = None | |
| if tools_map: | |
| router = SemanticRouter() | |
| selected_provider = router.route(message, tools_map) | |
| response_text = "" | |
| provider_name = "general_chat" | |
| # 5. Route to Winner (Agent Execution) | |
| if selected_provider: | |
| print(f"π [Router] Selected Tool: {selected_provider.upper()}") | |
| try: | |
| if selected_provider == 'sanity': | |
| schema = user_settings['sanity'].get('schema_map', {}) | |
| agent = get_cms_agent(user_id=user_id, schema_map=schema, llm_credentials=llm_creds) | |
| res = await agent.ainvoke({"input": message}) | |
| response_text = str(res.get('output', '')) | |
| provider_name = "cms_agent" | |
| elif selected_provider == 'sql': | |
| role = "admin" if user_id == '99' else "customer" | |
| agent = get_secure_agent(int(user_id), role, user_settings['sql'], llm_credentials=llm_creds) | |
| res = await agent.ainvoke({"input": message}) | |
| response_text = str(res.get('output', '')) | |
| provider_name = "sql_agent" | |
| elif selected_provider == 'mongodb': | |
| agent = get_nosql_agent(user_id, user_settings['mongodb'], llm_credentials=llm_creds) | |
| res = await agent.ainvoke({"input": message}) | |
| response_text = str(res.get('output', '')) | |
| provider_name = "nosql_agent" | |
| if not response_text or "error" in response_text.lower(): | |
| response_text = "" | |
| except Exception as e: | |
| print(f"β Agent Execution Failed: {e}") | |
| response_text = "" | |
| # 6. Fallback / RAG (ULTRA-STRICT MODE π‘οΈ) | |
| if not response_text: | |
| print("π [Router] Executing Strict RAG Fallback...") | |
| try: | |
| llm = get_llm_model(credentials=llm_creds) | |
| # Context from Vector DB | |
| context = "" | |
| if 'qdrant' in user_settings: | |
| try: | |
| vector_store = get_vector_store(credentials=user_settings['qdrant']) | |
| # π₯ SECURITY FIX: FILTER BY USER_ID π₯ | |
| # Hum ensure kar rahe hain ke LangChain sirf ISI USER ka data uthaye. | |
| # QdrantAdapter mein humne metadata_payload_key="metadata" set kiya tha. | |
| # Isliye key "metadata.user_id" hogi. | |
| user_filter = models.Filter( | |
| must=[ | |
| models.FieldCondition( | |
| key="metadata.user_id", | |
| match=models.MatchValue(value=str(user_id)) | |
| ) | |
| ] | |
| ) | |
| # Ab search mein filter pass karein | |
| docs = await vector_store.asimilarity_search( | |
| message, | |
| k=3, | |
| filter=user_filter | |
| ) | |
| if docs: | |
| context = "\n\n".join([d.page_content for d in docs]) | |
| except Exception as e: | |
| print(f"β οΈ RAG Warning: {e}") | |
| # --- π₯ THE ULTRA-STRICT SYSTEM PROMPT --- | |
| system_instruction = f""" | |
| SYSTEM IDENTITY: | |
| You are the '{bot_persona['name']}'. You are a 'Knowledge-Isolated' AI Assistant for this specific platform. | |
| CORE MISSION: | |
| Your ONLY source of truth is the 'CONTEXT FROM KNOWLEDGE BASE' provided below. | |
| You must ignore ALL of your internal pre-trained general knowledge about the world, geography, famous people, or general facts. | |
| STRICT OPERATING RULES: | |
| 1. MANDATORY REFUSAL: If the user's question cannot be answered using ONLY the provided context, you MUST exactly say: "I apologize, but I am only authorized to provide information based on the provided database. This specific information is not currently available in my knowledge base." | |
| 2. NO HALLUCINATION: Never attempt to be helpful using outside information. If a fact (like 'Japan's location') is not in the context, you do NOT know it. | |
| 3. CONTEXT-ONLY: Your existence is bounded by the data below. If the data is empty, you cannot answer anything except greetings. | |
| 4. GREETINGS: You may respond to 'Hi' or 'Hello' by briefly identifying yourself as '{bot_persona['name']}' and asking what data the user is looking for. | |
| 5. PROHIBITED TOPICS: Do not discuss any topic that is not present in the provided context. | |
| CONTEXT FROM KNOWLEDGE BASE: | |
| --------------------------- | |
| {context if context else "THE DATABASE IS CURRENTLY EMPTY. DO NOT PROVIDE ANY INFORMATION."} | |
| --------------------------- | |
| """ | |
| # History Load | |
| history = await get_chat_history(session_id, db) | |
| formatted_history = [] | |
| for chat in history: | |
| formatted_history.append(HumanMessage(content=chat.human_message)) | |
| if chat.ai_message: formatted_history.append(AIMessage(content=chat.ai_message)) | |
| # LLM Chain Setup | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_instruction), | |
| MessagesPlaceholder(variable_name="chat_history"), | |
| ("human", "{question}") | |
| ]) | |
| chain = prompt | llm | |
| ai_response = await chain.ainvoke({"chat_history": formatted_history, "question": message}) | |
| response_text = ai_response.content | |
| provider_name = "rag_fallback" | |
| except Exception as e: | |
| print(f"β Fallback Error: {e}") | |
| response_text = "I apologize, but I am currently unable to process your request due to a system error." | |
| # 7. Save to DB | |
| await save_chat_to_db(db, session_id, message, response_text, provider_name) | |
| return response_text |