# USAMA BHATTI
# Feat: Added Visual Search, API Key Auth, and Docker Optimization
# ba2fc46
# # backend/src/services/chat_service.py
# import json
# from sqlalchemy.ext.asyncio import AsyncSession
# from sqlalchemy.future import select
# # --- Model Imports ---
# from backend.src.models.chat import ChatHistory
# from backend.src.models.integration import UserIntegration
# from backend.src.models.user import User
# # --- Dynamic Factory & Tool Imports ---
# from backend.src.services.llm.factory import get_llm_model
# from backend.src.services.vector_store.qdrant_adapter import get_vector_store
# from backend.src.services.security.pii_scrubber import PIIScrubber
# # --- Agents ---
# from backend.src.services.tools.secure_agent import get_secure_agent
# from backend.src.services.tools.nosql_agent import get_nosql_agent
# from backend.src.services.tools.cms_agent import get_cms_agent
# # --- Router ---
# from backend.src.services.routing.semantic_router import SemanticRouter
# # --- LangChain Core ---
# from langchain_core.messages import HumanMessage, AIMessage
# from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# # ==========================================
# # HELPER FUNCTIONS
# # ==========================================
# async def get_user_integrations(user_id: str, db: AsyncSession) -> dict:
# """Fetches active integrations and filters valid descriptions."""
# if not user_id: return {}
# query = select(UserIntegration).where(UserIntegration.user_id == user_id, UserIntegration.is_active == True)
# result = await db.execute(query)
# integrations = result.scalars().all()
# settings = {}
# for i in integrations:
# try:
# creds = json.loads(i.credentials)
# creds['provider'] = i.provider
# creds['schema_map'] = i.schema_map if i.schema_map else {}
# if i.profile_description:
# creds['description'] = i.profile_description
# settings[i.provider] = creds
# except (json.JSONDecodeError, TypeError):
# continue
# return settings
# async def save_chat_to_db(db: AsyncSession, session_id: str, human_msg: str, ai_msg: str, provider: str):
# """Saves chat history with PII redaction."""
# if not session_id: return
# safe_human = PIIScrubber.scrub(human_msg)
# safe_ai = PIIScrubber.scrub(ai_msg)
# new_chat = ChatHistory(
# session_id=session_id, human_message=safe_human, ai_message=safe_ai, provider=provider
# )
# db.add(new_chat)
# await db.commit()
# async def get_chat_history(session_id: str, db: AsyncSession):
# """Retrieves past conversation history."""
# if not session_id: return []
# query = select(ChatHistory).where(ChatHistory.session_id == session_id).order_by(ChatHistory.timestamp.asc())
# result = await db.execute(query)
# return result.scalars().all()
# async def get_bot_persona(user_id: str, db: AsyncSession):
# """Fetches custom Bot Name and Instructions from User table."""
# try:
# stmt = select(User).where(User.id == int(user_id))
# result = await db.execute(stmt)
# user = result.scalars().first()
# if user:
# return {
# "name": getattr(user, "bot_name", "OmniAgent"),
# "instruction": getattr(user, "bot_instruction", "You are a helpful AI assistant.")
# }
# except Exception as e:
# print(f"⚠️ Error fetching persona: {e}")
# pass
# return {"name": "OmniAgent", "instruction": "You are a helpful AI assistant."}
# # ==========================================
# # MAIN CHAT LOGIC (Ultra-Strict Isolated Mode)
# # ==========================================
# async def process_chat(message: str, session_id: str, user_id: str, db: AsyncSession):
# # 1. Fetch User Settings & Persona
# user_settings = await get_user_integrations(user_id, db)
# bot_persona = await get_bot_persona(user_id, db)
# # 2. LLM Check
# llm_creds = user_settings.get('groq') or user_settings.get('openai')
# if not llm_creds:
# return "Please configure your AI Model in Settings."
# # 3. Build Tool Map for Router
# tools_map = {}
# for provider, config in user_settings.items():
# if provider in ['sanity', 'sql', 'mongodb']:
# if config.get('description'):
# tools_map[provider] = config['description']
# # 4. SEMANTIC DECISION (Router)
# selected_provider = None
# if tools_map:
# router = SemanticRouter()
# selected_provider = router.route(message, tools_map)
# response_text = ""
# provider_name = "general_chat"
# # 5. Route to Winner (Agent Execution)
# if selected_provider:
# print(f"πŸ‘‰ [Router] Selected Tool: {selected_provider.upper()}")
# try:
# if selected_provider == 'sanity':
# schema = user_settings['sanity'].get('schema_map', {})
# agent = get_cms_agent(user_id=user_id, schema_map=schema, llm_credentials=llm_creds)
# res = await agent.ainvoke({"input": message})
# response_text = str(res.get('output', ''))
# provider_name = "cms_agent"
# elif selected_provider == 'sql':
# role = "admin" if user_id == '99' else "customer"
# agent = get_secure_agent(int(user_id), role, user_settings['sql'], llm_credentials=llm_creds)
# res = await agent.ainvoke({"input": message})
# response_text = str(res.get('output', ''))
# provider_name = "sql_agent"
# elif selected_provider == 'mongodb':
# agent = get_nosql_agent(user_id, user_settings['mongodb'], llm_credentials=llm_creds)
# res = await agent.ainvoke({"input": message})
# response_text = str(res.get('output', ''))
# provider_name = "nosql_agent"
# if not response_text or "error" in response_text.lower():
# response_text = ""
# except Exception as e:
# print(f"❌ Agent Execution Failed: {e}")
# response_text = ""
# # 6. Fallback / RAG (ULTRA-STRICT MODE πŸ›‘οΈ)
# if not response_text:
# print("πŸ‘‰ [Router] Executing Strict RAG Fallback...")
# try:
# llm = get_llm_model(credentials=llm_creds)
# # Context from Vector DB
# context = ""
# if 'qdrant' in user_settings:
# try:
# vector_store = get_vector_store(credentials=user_settings['qdrant'])
# docs = await vector_store.asimilarity_search(message, k=3)
# if docs:
# context = "\n\n".join([d.page_content for d in docs])
# except Exception as e:
# print(f"⚠️ RAG Warning: {e}")
# # --- πŸ”₯ THE ULTRA-STRICT SYSTEM PROMPT ---
# system_instruction = f"""
# SYSTEM IDENTITY:
# You are the '{bot_persona['name']}'. You are a 'Knowledge-Isolated' AI Assistant for this specific platform.
# CORE MISSION:
# Your ONLY source of truth is the 'CONTEXT FROM KNOWLEDGE BASE' provided below.
# You must ignore ALL of your internal pre-trained general knowledge about the world, geography, famous people, or general facts.
# STRICT OPERATING RULES:
# 1. MANDATORY REFUSAL: If the user's question cannot be answered using ONLY the provided context, you MUST exactly say: "I apologize, but I am only authorized to provide information based on the provided database. This specific information is not currently available in my knowledge base."
# 2. NO HALLUCINATION: Never attempt to be helpful using outside information. If a fact (like 'Japan's location') is not in the context, you do NOT know it.
# 3. CONTEXT-ONLY: Your existence is bounded by the data below. If the data is empty, you cannot answer anything except greetings.
# 4. GREETINGS: You may respond to 'Hi' or 'Hello' by briefly identifying yourself as '{bot_persona['name']}' and asking what data the user is looking for.
# 5. PROHIBITED TOPICS: Do not discuss any topic that is not present in the provided context.
# CONTEXT FROM KNOWLEDGE BASE:
# ---------------------------
# {context if context else "THE DATABASE IS CURRENTLY EMPTY. DO NOT PROVIDE ANY INFORMATION."}
# ---------------------------
# """
# # History Load
# history = await get_chat_history(session_id, db)
# formatted_history = []
# for chat in history:
# formatted_history.append(HumanMessage(content=chat.human_message))
# if chat.ai_message: formatted_history.append(AIMessage(content=chat.ai_message))
# # LLM Chain Setup
# prompt = ChatPromptTemplate.from_messages([
# ("system", system_instruction),
# MessagesPlaceholder(variable_name="chat_history"),
# ("human", "{question}")
# ])
# chain = prompt | llm
# ai_response = await chain.ainvoke({"chat_history": formatted_history, "question": message})
# response_text = ai_response.content
# provider_name = "rag_fallback"
# except Exception as e:
# print(f"❌ Fallback Error: {e}")
# response_text = "I apologize, but I am currently unable to process your request due to a system error."
# # 7. Save to DB
# await save_chat_to_db(db, session_id, message, response_text, provider_name)
# return response_text
import json
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from qdrant_client.http import models  # <--- NEW IMPORT (for building per-user Qdrant filters)
# --- Model Imports ---
from backend.src.models.chat import ChatHistory
from backend.src.models.integration import UserIntegration
from backend.src.models.user import User
# --- Dynamic Factory & Tool Imports ---
from backend.src.services.llm.factory import get_llm_model
from backend.src.services.vector_store.qdrant_adapter import get_vector_store
from backend.src.services.security.pii_scrubber import PIIScrubber
# --- Agents ---
from backend.src.services.tools.secure_agent import get_secure_agent
from backend.src.services.tools.nosql_agent import get_nosql_agent
from backend.src.services.tools.cms_agent import get_cms_agent
# --- Router ---
from backend.src.services.routing.semantic_router import SemanticRouter
# --- LangChain Core ---
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# ==========================================
# HELPER FUNCTIONS
# ==========================================
async def get_user_integrations(user_id: str, db: AsyncSession) -> dict:
    """Return a provider -> credentials map for the user's active integrations.

    Each value is the decoded credentials JSON, augmented with the provider
    name, its schema map, and (when present) its routing description. Rows
    whose stored credentials are not usable JSON objects are skipped.
    """
    if not user_id:
        return {}

    stmt = select(UserIntegration).where(
        UserIntegration.user_id == user_id,
        UserIntegration.is_active == True,
    )
    rows = (await db.execute(stmt)).scalars().all()

    settings = {}
    for row in rows:
        try:
            payload = json.loads(row.credentials)
            payload['provider'] = row.provider
            payload['schema_map'] = row.schema_map if row.schema_map else {}
            if row.profile_description:
                payload['description'] = row.profile_description
            settings[row.provider] = payload
        except (json.JSONDecodeError, TypeError):
            # Corrupt or non-object credentials -> ignore this integration.
            continue
    return settings
async def save_chat_to_db(db: AsyncSession, session_id: str, human_msg: str, ai_msg: str, provider: str):
    """Persist one chat turn, redacting PII from both sides before storage."""
    if not session_id:
        return
    entry = ChatHistory(
        session_id=session_id,
        human_message=PIIScrubber.scrub(human_msg),
        ai_message=PIIScrubber.scrub(ai_msg),
        provider=provider,
    )
    db.add(entry)
    await db.commit()
async def get_chat_history(session_id: str, db: AsyncSession):
    """Return the session's past messages ordered oldest-first ([] without a session id)."""
    if not session_id:
        return []
    stmt = (
        select(ChatHistory)
        .where(ChatHistory.session_id == session_id)
        .order_by(ChatHistory.timestamp.asc())
    )
    result = await db.execute(stmt)
    return result.scalars().all()
async def get_bot_persona(user_id: str, db: AsyncSession):
    """Fetch the user's custom bot name and system instruction.

    Falls back to the default persona when the user row is missing, when a
    persona column is NULL/empty, or when the lookup fails entirely (e.g. a
    non-numeric user_id makes int() raise).
    """
    default = {"name": "OmniAgent", "instruction": "You are a helpful AI assistant."}
    try:
        stmt = select(User).where(User.id == int(user_id))
        result = await db.execute(stmt)
        user = result.scalars().first()
        if user:
            # BUGFIX: getattr's default only applies when the attribute is
            # absent; a NULL DB column yields None. `or` covers both cases.
            return {
                "name": getattr(user, "bot_name", None) or default["name"],
                "instruction": getattr(user, "bot_instruction", None) or default["instruction"],
            }
    except Exception as e:
        # Broad by design: any failure here must degrade to the default
        # persona rather than break the chat flow.
        print(f"⚠️ Error fetching persona: {e}")
    return default
# ==========================================
# MAIN CHAT LOGIC (Ultra-Strict Isolated Mode)
# ==========================================
async def process_chat(message: str, session_id: str, user_id: str, db: AsyncSession):
    """Produce a reply to `message` for this user/session.

    Pipeline: load the user's integrations and bot persona, semantically route
    the message to a data agent (CMS / SQL / NoSQL) when one matches, otherwise
    fall back to strict context-only RAG over the user's vector store, then
    persist the (PII-scrubbed) exchange and return the reply text.
    """
    # 1. Fetch User Settings & Persona
    user_settings = await get_user_integrations(user_id, db)
    bot_persona = await get_bot_persona(user_id, db)
    # 2. LLM Check — groq credentials take precedence over openai when both exist
    llm_creds = user_settings.get('groq') or user_settings.get('openai')
    if not llm_creds:
        return "Please configure your AI Model in Settings."
    # 3. Build Tool Map for Router — only tools carrying a routing description compete
    tools_map = {}
    for provider, config in user_settings.items():
        if provider in ['sanity', 'sql', 'mongodb']:
            if config.get('description'):
                tools_map[provider] = config['description']
    # 4. SEMANTIC DECISION (Router)
    selected_provider = None
    if tools_map:
        router = SemanticRouter()
        selected_provider = router.route(message, tools_map)
    response_text = ""
    provider_name = "general_chat"
    # 5. Route to Winner (Agent Execution)
    if selected_provider:
        print(f"👉 [Router] Selected Tool: {selected_provider.upper()}")
        try:
            if selected_provider == 'sanity':
                schema = user_settings['sanity'].get('schema_map', {})
                agent = get_cms_agent(user_id=user_id, schema_map=schema, llm_credentials=llm_creds)
                res = await agent.ainvoke({"input": message})
                response_text = str(res.get('output', ''))
                provider_name = "cms_agent"
            elif selected_provider == 'sql':
                # NOTE(review): hard-coded admin user id '99' — confirm this is
                # intentional and not a leftover test shortcut.
                role = "admin" if user_id == '99' else "customer"
                agent = get_secure_agent(int(user_id), role, user_settings['sql'], llm_credentials=llm_creds)
                res = await agent.ainvoke({"input": message})
                response_text = str(res.get('output', ''))
                provider_name = "sql_agent"
            elif selected_provider == 'mongodb':
                agent = get_nosql_agent(user_id, user_settings['mongodb'], llm_credentials=llm_creds)
                res = await agent.ainvoke({"input": message})
                response_text = str(res.get('output', ''))
                provider_name = "nosql_agent"
            # Empty or error-looking answers are discarded so the RAG fallback runs.
            # NOTE(review): the substring check also drops valid answers that merely
            # contain the word "error" — consider tightening this predicate.
            if not response_text or "error" in response_text.lower():
                response_text = ""
        except Exception as e:
            print(f"❌ Agent Execution Failed: {e}")
            response_text = ""  # fall through to the RAG fallback below
    # 6. Fallback / RAG (ULTRA-STRICT MODE 🛡️)
    if not response_text:
        print("👉 [Router] Executing Strict RAG Fallback...")
        try:
            llm = get_llm_model(credentials=llm_creds)
            # Context from Vector DB
            context = ""
            if 'qdrant' in user_settings:
                try:
                    vector_store = get_vector_store(credentials=user_settings['qdrant'])
                    # 🔥 SECURITY FIX: FILTER BY USER_ID 🔥
                    # Ensure the similarity search only returns THIS user's documents.
                    # QdrantAdapter was configured with metadata_payload_key="metadata",
                    # so the payload key here is "metadata.user_id".
                    user_filter = models.Filter(
                        must=[
                            models.FieldCondition(
                                key="metadata.user_id",
                                match=models.MatchValue(value=str(user_id))
                            )
                        ]
                    )
                    # Pass the per-user filter into the search call.
                    docs = await vector_store.asimilarity_search(
                        message,
                        k=3,
                        filter=user_filter
                    )
                    if docs:
                        context = "\n\n".join([d.page_content for d in docs])
                except Exception as e:
                    # Best-effort: a failed retrieval just leaves the context
                    # empty, which the strict prompt handles explicitly.
                    print(f"⚠️ RAG Warning: {e}")
            # --- 🔥 THE ULTRA-STRICT SYSTEM PROMPT ---
            system_instruction = f"""
SYSTEM IDENTITY:
You are the '{bot_persona['name']}'. You are a 'Knowledge-Isolated' AI Assistant for this specific platform.
CORE MISSION:
Your ONLY source of truth is the 'CONTEXT FROM KNOWLEDGE BASE' provided below.
You must ignore ALL of your internal pre-trained general knowledge about the world, geography, famous people, or general facts.
STRICT OPERATING RULES:
1. MANDATORY REFUSAL: If the user's question cannot be answered using ONLY the provided context, you MUST exactly say: "I apologize, but I am only authorized to provide information based on the provided database. This specific information is not currently available in my knowledge base."
2. NO HALLUCINATION: Never attempt to be helpful using outside information. If a fact (like 'Japan's location') is not in the context, you do NOT know it.
3. CONTEXT-ONLY: Your existence is bounded by the data below. If the data is empty, you cannot answer anything except greetings.
4. GREETINGS: You may respond to 'Hi' or 'Hello' by briefly identifying yourself as '{bot_persona['name']}' and asking what data the user is looking for.
5. PROHIBITED TOPICS: Do not discuss any topic that is not present in the provided context.
CONTEXT FROM KNOWLEDGE BASE:
---------------------------
{context if context else "THE DATABASE IS CURRENTLY EMPTY. DO NOT PROVIDE ANY INFORMATION."}
---------------------------
"""
            # History Load — rebuild LangChain message objects from stored rows
            history = await get_chat_history(session_id, db)
            formatted_history = []
            for chat in history:
                formatted_history.append(HumanMessage(content=chat.human_message))
                if chat.ai_message: formatted_history.append(AIMessage(content=chat.ai_message))
            # LLM Chain Setup
            prompt = ChatPromptTemplate.from_messages([
                ("system", system_instruction),
                MessagesPlaceholder(variable_name="chat_history"),
                ("human", "{question}")
            ])
            chain = prompt | llm
            ai_response = await chain.ainvoke({"chat_history": formatted_history, "question": message})
            response_text = ai_response.content
            provider_name = "rag_fallback"
        except Exception as e:
            print(f"❌ Fallback Error: {e}")
            response_text = "I apologize, but I am currently unable to process your request due to a system error."
    # 7. Save to DB
    await save_chat_to_db(db, session_id, message, response_text, provider_name)
    return response_text