"""
Chat Service - Intelligent chat with RAG capabilities.

Combines entities/relationships from the local database with web results
from the Lancer search API and sends the assembled context to an LLM.
"""
import logging
from typing import Any, Dict, List, Optional

import httpx
from sqlalchemy.orm import Session

from app.config import settings
from app.models.entity import Entity, Relationship

logger = logging.getLogger(__name__)

LANCER_URL = "https://madras1-lancer.hf.space/api/v1"

# Runtime prompt sent to the LLM (kept in Portuguese on purpose).
SYSTEM_PROMPT = (
    "Você é um assistente de inteligência do NUMIDIUM. "
    "Você tem acesso a um grafo de conhecimento com entidades e relacionamentos, "
    "e pode pesquisar na web para informações atualizadas. "
    "Responda em português brasileiro de forma clara e direta. "
    "Se não tiver certeza, diga que não sabe em vez de inventar."
)

# Escape character used for LIKE patterns built from user input.
_LIKE_ESCAPE = "/"

# Prefix of every entity line produced by ChatService._get_local_context.
_BULLET = "• "


def _contains_pattern(term: str) -> str:
    """Return a ``%term%`` LIKE pattern with the wildcards in *term* escaped.

    Without escaping, a ``%`` or ``_`` typed by the user would be interpreted
    as a LIKE wildcard instead of a literal character.
    """
    escaped = (
        term.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", _LIKE_ESCAPE + "%")
        .replace("_", _LIKE_ESCAPE + "_")
    )
    return f"%{escaped}%"


class ChatService:
    """Chat service with RAG using the local database and Lancer."""

    # LLM request parameters.
    MODEL = "qwen-3-32b"
    TEMPERATURE = 0.7
    MAX_TOKENS = 2048

    # Number of most recent history messages replayed to the LLM.
    HISTORY_WINDOW = 6

    def __init__(self):
        self.api_url = "https://api.cerebras.ai/v1/chat/completions"
        # Maps a session id (or "default") to its list of chat messages.
        # NOTE(review): entries are never trimmed, so long-lived sessions grow
        # without bound even though only the last HISTORY_WINDOW messages are
        # used; consider capping if nothing else reads the full history.
        self.conversation_history: Dict[str, List[Dict[str, str]]] = {}

    def _get_history(self, session_id: Optional[str]) -> List[Dict[str, str]]:
        """Return the (mutable) history list for a session, creating it if needed."""
        return self.conversation_history.setdefault(session_id or "default", [])

    def clear_history(self, session_id: Optional[str] = None):
        """Clear the conversation history of a session (or the default one)."""
        self.conversation_history.pop(session_id or "default", None)

    def _find_entities(self, query: str, db: Session, limit: int) -> List[Entity]:
        """Return up to *limit* distinct entities matching *query*.

        Search order: name substring, then description substring, and only if
        both found nothing, a per-word name search (words longer than three
        characters, at most two matches per word).
        """
        found: List[Entity] = []
        seen_ids = set()

        def add(candidates) -> None:
            # Append unseen candidates until the limit is reached.
            for candidate in candidates:
                if len(found) >= limit:
                    return
                if candidate.id not in seen_ids:
                    seen_ids.add(candidate.id)
                    found.append(candidate)

        pattern = _contains_pattern(query)
        add(
            db.query(Entity)
            .filter(Entity.name.ilike(pattern, escape=_LIKE_ESCAPE))
            .limit(limit)
            .all()
        )

        if len(found) < limit:
            # Fetch a full `limit` rows: some may duplicate the name matches,
            # and duplicates must not eat into the remaining quota.
            add(
                db.query(Entity)
                .filter(Entity.description.ilike(pattern, escape=_LIKE_ESCAPE))
                .limit(limit)
                .all()
            )

        if not found:
            for word in query.split():
                if len(found) >= limit:
                    break
                if len(word) > 3:
                    add(
                        db.query(Entity)
                        .filter(
                            Entity.name.ilike(
                                _contains_pattern(word), escape=_LIKE_ESCAPE
                            )
                        )
                        .limit(2)
                        .all()
                    )

        return found

    def _describe_relationships(self, entity_id: Any, db: Session) -> List[str]:
        """Return short textual descriptions of an entity's relationships.

        Looks at up to five relationships where the entity is source or
        target. Outgoing ones render as ``"<type> → <target name>"`` and
        incoming ones as ``"<source name> → <type>"``. Relationships whose
        other endpoint no longer exists are skipped.
        """
        relationships = (
            db.query(Relationship)
            .filter(
                (Relationship.source_id == entity_id)
                | (Relationship.target_id == entity_id)
            )
            .limit(5)
            .all()
        )
        if not relationships:
            return []

        # Resolve all "other side" entities with a single query instead of
        # one query per relationship.
        other_ids = {
            rel.target_id if rel.source_id == entity_id else rel.source_id
            for rel in relationships
        }
        other_ids.discard(None)
        names: Dict[Any, Any] = {}
        if other_ids:
            names = {
                other.id: other.name
                for other in db.query(Entity)
                .filter(Entity.id.in_(list(other_ids)))
                .all()
            }

        related = []
        for rel in relationships:
            if rel.source_id == entity_id:
                if rel.target_id in names:
                    related.append(f"{rel.type} → {names[rel.target_id]}")
            elif rel.source_id in names:
                related.append(f"{names[rel.source_id]} → {rel.type}")
        return related

    def _get_local_context(self, query: str, db: Session, limit: int = 5) -> str:
        """Get relevant entities from the local database as a bullet list.

        Args:
            query: Free-text user message used for substring matching.
            db: SQLAlchemy session used for all lookups.
            limit: Maximum number of entities included in the context.

        Returns:
            One line per entity (``"• name (type): description | Relações: ..."``)
            joined with newlines, or an empty string when the query is blank
            or nothing matches.
        """
        query = (query or "").strip()
        # A blank query would become the pattern "%%" and match every row.
        if not query or limit <= 0:
            return ""

        context_parts = []
        for entity in self._find_entities(query, db, limit):
            line = f"{_BULLET}{entity.name} ({entity.type})"
            if entity.description:
                # Collapse whitespace so each entity stays on exactly one line.
                description = " ".join(entity.description.split())
                line += f": {description[:200]}"

            related = self._describe_relationships(entity.id, db)
            if related:
                line += f" | Relações: {', '.join(related[:3])}"

            context_parts.append(line)

        return "\n".join(context_parts)

    async def _get_web_context(self, query: str) -> str:
        """Get context from Lancer web search.

        Best-effort: any failure (network, non-200 status, unexpected payload)
        is logged and results in an empty string so the chat can proceed with
        local context only.
        """
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{LANCER_URL}/search",
                    json={
                        "query": query,
                        "max_results": 5,
                        "include_answer": True,
                    },
                )
                if response.status_code == 200:
                    data = response.json()
                    if data.get("answer"):
                        return f"Informações da web:\n{data['answer'][:1000]}"
                else:
                    logger.warning(
                        "Lancer search returned HTTP %s", response.status_code
                    )
                return ""
        except Exception as e:
            # Deliberately broad: web context is optional and must never
            # break the chat flow.
            logger.warning("Lancer error: %s", e)
            return ""

    async def _call_llm(self, messages: List[Dict[str, str]]) -> str:
        """Call the Cerebras chat-completions API.

        Returns:
            The assistant message content on success. On failure the error is
            logged and a human-readable error string (``"Erro na API: <status>"``
            or ``"Erro: <detail>"``) is returned instead of raising, which is
            the contract existing callers rely on.
        """
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(
                    self.api_url,
                    headers={
                        "Authorization": f"Bearer {settings.cerebras_api_key}",
                        "Content-Type": "application/json",
                    },
                    json={
                        "model": self.MODEL,
                        "messages": messages,
                        "temperature": self.TEMPERATURE,
                        "max_tokens": self.MAX_TOKENS,
                    },
                )
                if response.status_code != 200:
                    logger.warning(
                        "Cerebras API returned HTTP %s", response.status_code
                    )
                    return f"Erro na API: {response.status_code}"

                data = response.json()
                return data["choices"][0]["message"]["content"]
        except Exception as e:
            logger.exception("Cerebras request failed")
            return f"Erro: {str(e)}"

    async def chat(
        self,
        message: str,
        db: Session,
        use_web: bool = True,
        use_history: bool = True,
        session_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Process a chat message with RAG.

        Args:
            message: The user's question.
            db: SQLAlchemy session used to look up local entities.
            use_web: Whether to query Lancer for web context.
            use_history: Whether to replay and record conversation history.
            session_id: History bucket to use; ``None`` means the default one.

        Returns:
            A dict with the keys ``answer``, ``local_context_used``,
            ``web_context_used`` and ``entities_found``.
        """
        # Only touch the history store when history is requested, so
        # stateless calls do not create empty session entries.
        history = self._get_history(session_id) if use_history else []

        local_context = self._get_local_context(message, db)
        web_context = await self._get_web_context(message) if use_web else ""

        # Build context
        context_parts = []
        if local_context:
            context_parts.append(f"📊 Conhecimento local:\n{local_context}")
        if web_context:
            context_parts.append(f"🌐 {web_context}")
        context = (
            "\n\n".join(context_parts)
            if context_parts
            else "Nenhum contexto disponível."
        )

        # Build messages
        messages = [{"role": "system", "content": SYSTEM_PROMPT}]
        messages.extend(history[-self.HISTORY_WINDOW:])

        # The context is multi-line, so it is separated from the question
        # with explicit newlines to keep the two clearly delimited.
        user_message = f"Contexto:\n{context}\n\nPergunta: {message}"
        messages.append({"role": "user", "content": user_message})

        # Call LLM
        response = await self._call_llm(messages)

        # Store the raw question (not the context-augmented prompt).
        # NOTE(review): error strings returned by _call_llm are stored too.
        if use_history:
            history.append({"role": "user", "content": message})
            history.append({"role": "assistant", "content": response})

        # Each entity occupies exactly one bullet-prefixed line, so counting
        # those lines is robust against "•" appearing inside names or
        # descriptions.
        entities_found = sum(
            1 for line in local_context.split("\n") if line.startswith(_BULLET)
        )

        return {
            "answer": response,
            "local_context_used": bool(local_context),
            "web_context_used": bool(web_context),
            "entities_found": entities_found,
        }


# Singleton
chat_service = ChatService()