# Hugging Face upload metadata (preserved from original page):
#   Madras1 — "Upload 63 files" — revision 270c1c7 (verified)
"""
Chat Service - Intelligent chat with RAG capabilities
Uses local database + Lancer for comprehensive responses
"""
from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

import httpx
from sqlalchemy.orm import Session

from app.config import settings
from app.models.entity import Entity, Relationship
# Base URL of the Lancer web-search companion service (hosted on HF Spaces).
LANCER_URL = "https://madras1-lancer.hf.space/api/v1"

# System prompt (Brazilian Portuguese). It tells the assistant it has access
# to a knowledge graph with entities/relationships plus web search, to answer
# clearly in pt-BR, and to admit uncertainty instead of inventing answers.
SYSTEM_PROMPT = """Você é um assistente de inteligência do NUMIDIUM.
Você tem acesso a um grafo de conhecimento com entidades e relacionamentos,
e pode pesquisar na web para informações atualizadas.
Responda em português brasileiro de forma clara e direta.
Se não tiver certeza, diga que não sabe em vez de inventar."""
class ChatService:
    """Chat service with RAG using the local database and Lancer.

    For each question the service assembles context from two sources —
    entities/relationships matched in the local knowledge graph and,
    optionally, a web answer fetched from the Lancer search service — then
    calls the Cerebras chat-completions API with that context plus a short
    rolling per-session conversation history.
    """

    # Hard cap on stored messages per session. Only the last 6 messages are
    # ever sent to the LLM, so keeping everything forever only leaks memory.
    MAX_HISTORY_MESSAGES = 20

    def __init__(self):
        # Cerebras OpenAI-compatible chat-completions endpoint.
        self.api_url = "https://api.cerebras.ai/v1/chat/completions"
        # Per-session history: session key -> list of {"role", "content"} dicts.
        self.conversation_history: Dict[str, List[Dict[str, str]]] = {}

    def _get_history(self, session_id: Optional[str]) -> List[Dict[str, str]]:
        """Return the mutable history list for *session_id*, creating it if missing."""
        return self.conversation_history.setdefault(session_id or "default", [])

    def clear_history(self, session_id: Optional[str] = None):
        """Drop the stored history for *session_id* (no-op if none exists)."""
        self.conversation_history.pop(session_id or "default", None)

    def _get_local_context(self, query: str, db: Session, limit: int = 5) -> str:
        """Build a bullet-list context string from entities matching *query*.

        Matching is three-phase: entity name contains the whole query, then
        descriptions fill remaining slots, and finally individual words of
        the query (>3 chars) are tried as a fallback. Each bullet carries the
        entity's type, a truncated description and up to three relationships.
        Returns "" when nothing matches.
        """
        # Phase 1: entity name contains the full query (case-insensitive).
        entities = db.query(Entity).filter(
            Entity.name.ilike(f"%{query}%")
        ).limit(limit).all()

        # Phase 2: fill the remaining slots from description matches.
        if len(entities) < limit:
            desc_entities = db.query(Entity).filter(
                Entity.description.ilike(f"%{query}%")
            ).limit(limit - len(entities)).all()
            entities.extend(desc_entities)

        # Phase 3 (fallback): match individual words, stopping once enough
        # candidates are collected (fix: the old loop queried every word and
        # could grow far past *limit*).
        if not entities:
            for word in query.split():
                if len(word) > 3:
                    entities.extend(
                        db.query(Entity).filter(
                            Entity.name.ilike(f"%{word}%")
                        ).limit(2).all()
                    )
                    if len(entities) >= limit:
                        break

        if not entities:
            return ""

        context_parts = []
        seen_ids = set()  # the phases above can return the same entity twice
        for entity in entities:
            if entity.id in seen_ids:
                continue
            seen_ids.add(entity.id)

            ctx = f"• {entity.name} ({entity.type})"
            if entity.description:
                ctx += f": {entity.description[:200]}"

            # Attach up to three relationships, in either direction.
            relationships = db.query(Relationship).filter(
                (Relationship.source_id == entity.id) |
                (Relationship.target_id == entity.id)
            ).limit(5).all()
            related = []
            for rel in relationships:
                if rel.source_id == entity.id:
                    target = db.query(Entity).filter(Entity.id == rel.target_id).first()
                    if target:
                        # Fix: separate relation type and entity name with a
                        # space (they were previously concatenated directly).
                        related.append(f"{rel.type} {target.name}")
                else:
                    source = db.query(Entity).filter(Entity.id == rel.source_id).first()
                    if source:
                        related.append(f"{source.name} {rel.type}")
            if related:
                ctx += f" | Relações: {', '.join(related[:3])}"
            context_parts.append(ctx)

        return "\n".join(context_parts)

    async def _get_web_context(self, query: str) -> str:
        """Fetch a web-search answer for *query* from Lancer; "" on any failure."""
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{LANCER_URL}/search",
                    json={
                        "query": query,
                        "max_results": 5,
                        "include_answer": True
                    }
                )
                if response.status_code == 200:
                    data = response.json()
                    if data.get("answer"):
                        return f"Informações da web:\n{data['answer'][:1000]}"
            return ""
        except Exception as e:
            # Web search is best-effort: log and fall back to local context only.
            logging.getLogger(__name__).warning("Lancer error: %s", e)
            return ""

    async def _call_llm(self, messages: List[Dict[str, str]]) -> str:
        """Call the Cerebras LLM with *messages*; returns answer or error string.

        Errors are reported in-band as strings (the caller surfaces them as
        the chat answer), matching the original contract.
        """
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(
                    self.api_url,
                    headers={
                        "Authorization": f"Bearer {settings.cerebras_api_key}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": "qwen-3-32b",
                        "messages": messages,
                        "temperature": 0.7,
                        "max_tokens": 2048
                    }
                )
            if response.status_code == 200:
                data = response.json()
                return data["choices"][0]["message"]["content"]
            return f"Erro na API: {response.status_code}"
        except Exception as e:
            logging.getLogger(__name__).warning("Cerebras call failed: %s", e)
            return f"Erro: {str(e)}"

    async def chat(
        self,
        message: str,
        db: Session,
        use_web: bool = True,
        use_history: bool = True,
        session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Process a chat message with RAG.

        Args:
            message: the user's question.
            db: database session used for local entity lookup.
            use_web: also query the Lancer web-search service.
            use_history: include (and update) the per-session history.
            session_id: history bucket; defaults to a shared "default" session.

        Returns:
            Dict with the answer and flags describing which context was used.
        """
        history = self._get_history(session_id)

        # Gather context: local knowledge graph + optional web search.
        local_context = self._get_local_context(message, db)
        web_context = await self._get_web_context(message) if use_web else ""

        context_parts = []
        if local_context:
            context_parts.append(f"📊 Conhecimento local:\n{local_context}")
        if web_context:
            context_parts.append(f"🌐 {web_context}")
        context = "\n\n".join(context_parts) if context_parts else "Nenhum contexto disponível."

        # Assemble the prompt: system message, recent history, then the
        # context-augmented user question.
        messages = [{"role": "system", "content": SYSTEM_PROMPT}]
        if use_history and history:
            messages.extend(history[-6:])
        user_message = f"""Contexto:
{context}
Pergunta: {message}"""
        messages.append({"role": "user", "content": user_message})

        response = await self._call_llm(messages)

        if use_history:
            history.append({"role": "user", "content": message})
            history.append({"role": "assistant", "content": response})
            # Trim in place (fix: history previously grew without bound).
            del history[:-self.MAX_HISTORY_MESSAGES]

        return {
            "answer": response,
            "local_context_used": bool(local_context),
            "web_context_used": bool(web_context),
            "entities_found": local_context.count("•") if local_context else 0
        }
# Module-level singleton: one shared ChatService instance (and thus one shared
# in-memory conversation store) for the whole process.
chat_service = ChatService()