Spaces:
Running
Running
| # type: ignore | |
| """ | |
| modules/web_search.py | |
| ================================================================================ | |
| WEB SEARCH MÓDULO - BUSCA AUTÔNOMA COMPLETA E PROFISSIONAL | |
| ================================================================================ | |
| Versão 3.0 - Motor de busca autônomo e inteligente | |
| Features: | |
| - DuckDuckGo via biblioteca `ddgs` (production-ready, sem scraping frágil) | |
| - Busca de Texto, Notícias, Imagens e Vídeos (multi-tipo) | |
| - Wikipedia via API oficial (conteúdo completo) | |
| - Clima via OpenWeatherMap API (com fallback para wttr.in) | |
| - Pesquisa Autônoma: AI decide QUANDO e O QUE buscar sem comando explícito | |
| - Raspagem profunda de página web com extração de conteúdo limpo | |
| - Cache TTL inteligente por tipo de busca | |
| - Rate limiting respeitoso e rotação de User-Agent | |
| - Integração direta com banco de dados (salva pesquisas para RAG) | |
| Uso: | |
| ws = WebSearch(db=db_instance) | |
| resultado = ws.pesquisar("capital de angola") | |
| conteudo = ws.buscar_conteudo_completo("presidente João Lourenço") | |
| deve_ir = ws.deve_buscar_na_web("quem ganhou a copa ontem?") | |
| ================================================================================ | |
| """ | |
| import os | |
| import re | |
| import random | |
| import time | |
| import hashlib | |
| import sqlite3 | |
| import json | |
| from dataclasses import dataclass | |
| from typing import Dict, Any, List, Optional, Tuple, Union | |
| from datetime import datetime | |
| from loguru import logger | |
| try: | |
| from .config import DB_PATH | |
| except (ImportError, ValueError): | |
| try: | |
| from modules.config import DB_PATH | |
| except ImportError: | |
| DB_PATH = "akira.db" | |
| # ============================================================ | |
| # Imports opcionais com fallbacks | |
| # ============================================================ | |
| try: | |
| from ddgs import DDGS # type: ignore | |
| DDGS_AVAILABLE = True | |
| except ImportError: | |
| try: | |
| from duckduckgo_search import DDGS # type: ignore # nome antigo | |
| DDGS_AVAILABLE = True | |
| except ImportError: | |
| DDGS_AVAILABLE = False | |
| DDGS = None # type: ignore | |
| try: | |
| import requests # type: ignore | |
| REQUESTS_AVAILABLE = True | |
| except ImportError: | |
| REQUESTS_AVAILABLE = False | |
| requests = None # type: ignore | |
| try: | |
| from bs4 import BeautifulSoup # type: ignore | |
| BS4_AVAILABLE = True | |
| except ImportError: | |
| BS4_AVAILABLE = False | |
| BeautifulSoup = None # type: ignore | |
| try: | |
| from loguru import logger # type: ignore | |
| except ImportError: | |
| class _DummyLogger: | |
| def info(self, *a, **k): pass | |
| def success(self, *a, **k): pass | |
| def warning(self, *a, **k): pass | |
| def error(self, *a, **k): pass | |
| def debug(self, *a, **k): pass | |
| logger = _DummyLogger() # type: ignore | |
| try: | |
| from cachetools import TTLCache # type: ignore | |
| _CacheOK = True | |
| except ImportError: | |
| _CacheOK = False | |
| class TTLCache(dict): # type: ignore | |
| def __init__(self, maxsize=100, ttl=900, **kwargs): | |
| super().__init__(**kwargs) | |
| self.maxsize = maxsize | |
| self.ttl = ttl | |
| self._ts: Dict[str, float] = {} | |
| def __setitem__(self, key, value): | |
| super().__setitem__(key, value) | |
| self._ts[key] = time.time() | |
| if len(self) > self.maxsize: | |
| oldest = min(self._ts, key=lambda k: self._ts[k]) | |
| self.pop(oldest, None) | |
| self._ts.pop(oldest, None) | |
| def get(self, key, default=None): | |
| if key in self._ts and time.time() - self._ts[key] > self.ttl: | |
| self.pop(key, None) | |
| self._ts.pop(key, None) | |
| return default | |
| return super().get(key, default) | |
| # ============================================================ | |
| # CONFIGURAÇÕES GLOBAIS | |
| # ============================================================ | |
| REQUEST_TIMEOUT = 12 | |
| # Cache com diferentes TTLs por tipo (segundos) | |
| _CACHE_GERAL = TTLCache(maxsize=60, ttl=900) # 15 min | |
| _CACHE_NOTICIAS= TTLCache(maxsize=30, ttl=300) # 5 min (notícias mudam rápido) | |
| _CACHE_WIKI = TTLCache(maxsize=50, ttl=3600) # 1h (Wikipedia é estável) | |
| _CACHE_CLIMA = TTLCache(maxsize=20, ttl=600) # 10 min | |
| USER_AGENTS = [ | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15", | |
| "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", | |
| ] | |
| OPENWEATHER_KEY = os.getenv("OPENWEATHER_API_KEY", "") | |
| # Palavras-gatilho para busca autônoma (contexto NLP) | |
| _TRIGGERS_BUSCA = [ | |
| # Comandos explícitos | |
| "pesquisa", "busca na web", "buscar na internet", "pesquise", | |
| "me busca", "google", "procura", | |
| # Eventos atuais | |
| "o que está acontecendo", "últimas notícias", "notícias de hoje", | |
| "o que aconteceu", "aconteceu", "novidades", | |
| # Perguntas factuais específicas | |
| "quem é o presidente", "qual é a população", "quantos habitantes", | |
| "qual a capital", "onde fica", "quando foi fundado", | |
| # Sports/resultados | |
| "placar", "resultado do jogo", "ganhou a copa", "eliminado", | |
| # Temporal | |
| "ontem", "esta semana", "esse mês", "ano passado", "2025", "2026", | |
| # Pessoas | |
| "morreu", "foi preso", "foi assassinado", "renunciou", "eleito", | |
| # Tempo/clima | |
| "vai chover", "temperatura em", "clima em", "previsão do tempo", | |
| ] | |
| _PERGUNTAS_FATOS = [ | |
| "?", "quem", "qual", "quando", "onde", "quanto", "quantos", | |
| "por que", "como é", "o que é", "me conta", "explica", | |
| ] | |
| # ============================================================ | |
| # CLASSE PRINCIPAL | |
| # ============================================================ | |
| class WebSearchConfig: | |
| db_path: str = DB_PATH | |
| class WebSearch: | |
| """ | |
| Motor de busca autônoma profissional para AKIRA. | |
| Prioridade de backends: | |
| 1. DDGS (duckduckgo-search) - principal, sem API key | |
| 2. Wikipedia API - para perguntas conceituais | |
| 3. OpenWeatherMap - para clima | |
| 4. Scraping direto via BeautifulSoup - fallback | |
| """ | |
| def __init__(self, db=None): | |
| """ | |
| Args: | |
| db: Instância do Database para persistência das buscas (opcional) | |
| """ | |
| self.db = db | |
| self._session = None | |
| self._setup_session() | |
| if DDGS_AVAILABLE: | |
| logger.success("🔍 WebSearch: DDGS (DuckDuckGo) disponível e ativo") | |
| else: | |
| logger.warning("⚠️ WebSearch: ddgs não instalado – fallback via scraping") | |
| def _setup_session(self): | |
| """Configura sessão HTTP com headers realistas.""" | |
| if not REQUESTS_AVAILABLE: | |
| return | |
| self._session = requests.Session() | |
| self._session.headers.update({ | |
| "User-Agent": random.choice(USER_AGENTS), | |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
| "Accept-Language": "pt-BR,pt;q=0.9,en-US;q=0.8", | |
| "Accept-Encoding": "gzip, deflate", | |
| "Connection": "keep-alive", | |
| }) | |
| def _rotate_ua(self): | |
| """Rotaciona User-Agent para evitar bloqueio.""" | |
| if self._session: | |
| self._session.headers["User-Agent"] = random.choice(USER_AGENTS) | |
| # ================================================================== | |
| # 🌐 INTERFACE PRINCIPAL | |
| # ================================================================== | |
| def pesquisar( | |
| self, | |
| query: str, | |
| num_results: int = 5, | |
| tipo: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Pesquisa completa com detecção automática de tipo. | |
| Args: | |
| query: Termo de pesquisa | |
| num_results: Número de resultados (max 10) | |
| tipo: Forçar tipo: 'geral'|'noticias'|'wikipedia'|'clima'|'imagens' | |
| Returns: | |
| Dict com 'conteudo_bruto', 'resumo', 'tipo', 'resultados' | |
| """ | |
| if not query or not query.strip(): | |
| return self._erro("Query vazia") | |
| query = query.strip() | |
| cache_key = hashlib.md5(f"{query}:{num_results}:{tipo}".encode()).hexdigest()[:16] | |
| # Detecta tipo se não especificado | |
| tipo_detectado = tipo or self.detectar_tipo_pesquisa(query) | |
| # Verifica cache específico por tipo | |
| cache = self._get_cache(tipo_detectado) | |
| cached = cache.get(cache_key) | |
| if cached: | |
| logger.debug(f"📦 Cache hit [{tipo_detectado}]: {query[:40]}") | |
| return cached | |
| # Rotaciona UA | |
| self._rotate_ua() | |
| # Executa busca pelo tipo | |
| resultado: Dict[str, Any] | |
| if tipo_detectado == "wikipedia": | |
| resultado = self._buscar_wikipedia(query) | |
| elif tipo_detectado == "noticias": | |
| resultado = self._buscar_noticias(query, num_results) | |
| elif tipo_detectado == "clima": | |
| resultado = self._buscar_clima(query) | |
| elif tipo_detectado == "imagens": | |
| resultado = self._buscar_imagens(query, num_results) | |
| else: | |
| resultado = self._buscar_texto_ddgs(query, num_results) | |
| # Salva no cache | |
| cache[cache_key] = resultado | |
| # Persiste no banco de dados para RAG futuro | |
| self._persistir_busca(query, tipo_detectado, resultado) | |
| return resultado | |
| def buscar_conteudo_completo(self, query: str) -> str: | |
| """Retorna string bruta pronta para inserir no prompt.""" | |
| r = self.pesquisar(query) | |
| return r.get("conteudo_bruto", "Sem resultados disponíveis.") | |
| def buscar_resumido(self, query: str) -> str: | |
| r = self.pesquisar(query, num_results=3) | |
| return r.get("resumo", "Sem resumo disponível.") | |
| # ================================================================== | |
| # 🤖 PESQUISA AUTÔNOMA – a IA decide sozinha se deve buscar | |
| # ================================================================== | |
| def deve_buscar_na_web(self, mensagem: str, historico: Optional[List[str]] = None) -> bool: | |
| """ | |
| Decisão autônoma: a AKIRA deve buscar na web por conta própria? | |
| Lógica em camadas: | |
| 1. Gatilhos explícitos (o usuário pediu) | |
| 2. Perguntas factuais com marcadores temporais | |
| 3. Tópicos que o modelo definitivamente não sabe (eventos pós-treino) | |
| 4. Palavras de eventos conhecidos recentes | |
| Args: | |
| mensagem: Última mensagem do usuário | |
| historico: Últimas mensagens do histórico (contexto adicional) | |
| Returns: | |
| True se deve pesquisar na web | |
| """ | |
| msg = mensagem.lower().strip() | |
| # 1. Gatilhos explícitos | |
| if any(t in msg for t in _TRIGGERS_BUSCA): | |
| logger.info(f"🔍 Pesquisa autônoma ativada [gatilho explícito]: {msg[:60]}") | |
| return True | |
| # 2. Pergunta + indicador temporal/factual | |
| is_pergunta = ( | |
| "?" in msg or | |
| any(msg.startswith(p) for p in _PERGUNTAS_FATOS) | |
| ) | |
| indicadores_atuais = [ | |
| "atual", "recente", "novo", "último", "agora", | |
| "hoje", "ontem", "semana", "mês", "2024", "2025", "2026", | |
| "presidente", "governo", "eleição", "guerra", "acordo", | |
| "crise", "epidemia", "terremoto", "furacão" | |
| ] | |
| if is_pergunta and any(p in msg for p in indicadores_atuais): | |
| logger.info(f"🔍 Pesquisa autônoma ativada [pergunta+temporal]: {msg[:60]}") | |
| return True | |
| # 3. Pessoa pede para contar/explicar com contexto que muda | |
| frases_dinamicas = [ | |
| "me conta sobre", "o que você sabe sobre", "quem é", | |
| "o que é", "me fala sobre", "sabes de", "sabe de" | |
| ] | |
| if any(f in msg for f in frases_dinamicas): | |
| # Verifica se é sobre algo que pode ser evento recente | |
| entidades_suspeitas = msg.split() | |
| # Heurística: mais de 1 palavra após a frase → provavelmente nome próprio | |
| for frase in frases_dinamicas: | |
| if frase in msg: | |
| pos = msg.find(frase) + len(frase) | |
| resto = msg[pos:].strip() | |
| if len(resto.split()) >= 1: | |
| logger.info(f"🔍 Pesquisa autônoma ativada [entidade]: {resto[:60]}") | |
| return True | |
| # 4. Contexto do histórico (se usuário estava pedindo info antes) | |
| if historico and isinstance(historico, list): | |
| try: | |
| # Conversão ultra-segura: ignora None, extrai de tupla/dict ou converte str | |
| historico_limpo = [] | |
| for h in historico[-5:]: | |
| if h is None: continue | |
| if isinstance(h, tuple) and len(h) > 0: | |
| historico_limpo.append(str(h[0])) | |
| elif isinstance(h, dict): | |
| historico_limpo.append(str(h.get('content', h.get('mensagem', '')))) | |
| else: | |
| historico_limpo.append(str(h)) | |
| ultima_5 = " ".join(historico_limpo).lower() | |
| if any(t in ultima_5 for t in ["pesquisa", "busca", "notícia", "aconteceu", "saber sobre"]): | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Erro ao processar histórico na busca: {e}") | |
| return False | |
| def extrair_assunto_busca(self, mensagem: str) -> str: | |
| """ | |
| Extrai o assunto principal da mensagem para usar como query. | |
| Remove ruído, stopwords e foca em termos de busca eficientes. | |
| """ | |
| msg = mensagem.strip() | |
| msg_lower = msg.lower() | |
| # 1. Padrões de extração semântica | |
| padroes = [ | |
| r"(?:pesquisa|busca|pesquise|procura|me busca|me fala|sabe sobre)\s+(?:sobre|de|a respeito de|do que|da)?\s*(.+)", | |
| r"(?:quem é|o que é|o que são|onde fica|qual é|quando foi|como é|pq que|por que)\s+(.+)", | |
| r"(?:me conta|me fala|explica|me explica|notícia|noticia|novidade)\s+(?:sobre|de)?\s*(.+)", | |
| ] | |
| query_candidata = "" | |
| for pat in padroes: | |
| m = re.search(pat, msg_lower) | |
| if m: | |
| query_candidata = m.group(1).strip().rstrip(".,!?") | |
| break | |
| if not query_candidata: | |
| query_candidata = msg_lower | |
| # 2. Limpeza profunda de ruído conversacional (Stopwords e muletas) | |
| stopwords = [ | |
| "pesquisa", "busca", "buscar", "procura", "me", "por favor", "pf", "pfv", | |
| "akira", "você", "sabe", "dizer", "quero", "queria", "estão", "logo", "parece", | |
| "que", "essa", "entre", "uma", "uns", "pelo", "pela", "num", "numa", "este", "esta" | |
| ] | |
| tokens = query_candidata.split() | |
| tokens_final = [] | |
| for t in tokens: | |
| t_limpo = t.rstrip(".,!?;") | |
| if t_limpo not in stopwords and len(t_limpo) > 1: | |
| tokens_final.append(t_limpo) | |
| # Se a limpeza removeu tudo, volta para a candidata original | |
| return " ".join(tokens_final) if len(tokens_final) >= 2 else query_candidata | |
| # ================================================================== | |
| # 🎯 DETECÇÃO DE TIPO | |
| # ================================================================== | |
| def detectar_tipo_pesquisa(self, query: str) -> str: | |
| """ | |
| Detecta automaticamente o melhor tipo de busca para a query. | |
| Returns: | |
| 'wikipedia' | 'noticias' | 'clima' | 'imagens' | 'geral' | |
| """ | |
| q = query.lower() | |
| # Clima | |
| clima_kws = ["clima", "tempo", "temperatura", "vai chover", "previsão", "chuva", "sol", "humidade"] | |
| if any(k in q for k in clima_kws): | |
| return "clima" | |
| # Notícias – eventos atuais | |
| news_kws = [ | |
| "notícia", "noticia", "última hora", "breaking", "aconteceu", | |
| "hoje", "eleição", "guerra", "crise", "julgamento", | |
| "preso", "morreu", "assassinado", "renunciou", "ganhou" | |
| ] | |
| if any(k in q for k in news_kws): | |
| return "noticias" | |
| # Imagens | |
| img_kws = ["foto de", "imagem de", "fotos de", "imagens de", "como é", "me mostra"] | |
| if any(k in q for k in img_kws): | |
| return "imagens" | |
| # IMPORTANTE: Desativada a rota 'wikipedia' pois estava dando erro lib/HTTP. | |
| # Agora perguntas que seriam wiki (biografias, o que é) caem na busca geral | |
| # que já raspa o extract de boas fontes. | |
| return "geral" | |
| # ================================================================== | |
| # 📰 BUSCA DE TEXTO VIA DDGS (principal) | |
| # ================================================================== | |
| def _buscar_texto_ddgs(self, query: str, num: int = 5) -> Dict[str, Any]: | |
| """Busca geral usando a biblioteca DDGS (DuckDuckGo Search).""" | |
| if not DDGS_AVAILABLE: | |
| return self._buscar_texto_fallback(query, num) | |
| try: | |
| resultados = [] | |
| with DDGS() as ddgs: | |
| for r in ddgs.text( | |
| query, | |
| region="pt-pt", # Alterado de wt-wt para evitar erros de conexão | |
| safesearch="off", | |
| timelimit=None, | |
| max_results=num, | |
| ): | |
| resultados.append({ | |
| "titulo": r.get("title", ""), | |
| "url": r.get("href", ""), | |
| "snippet": r.get("body", ""), | |
| }) | |
| if not resultados: | |
| return self._erro("DDGS: nenhum resultado") | |
| # Tenta enriquecer com conteúdo das páginas | |
| for res in resultados[:2]: # Só as 2 primeiras para não overload | |
| conteudo = self._raspar_pagina(res["url"]) | |
| if conteudo: | |
| res["conteudo_pagina"] = conteudo[:2000] | |
| bruto = self._montar_bruto_geral(query, resultados) | |
| return { | |
| "tipo": "geral", | |
| "query": query, | |
| "resumo": f"Web Search: '{query}' – {len(resultados)} resultados", | |
| "conteudo_bruto": bruto, | |
| "resultados": resultados, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "ddgs", | |
| } | |
| except Exception as e: | |
| # Silencia erros de conexão específicos do DuckDuckGo para evitar log ruidoso | |
| if "ConnectError" in str(e) or "DDGSException" in str(e): | |
| logger.debug(f"DDGS redundante/conexão erro: {e}") | |
| else: | |
| logger.warning(f"DDGS texto error: {e}") | |
| return self._buscar_texto_fallback(query, num) | |
| # ================================================================== | |
| # 📰 BUSCA DE NOTÍCIAS VIA DDGS | |
| # ================================================================== | |
| def _buscar_noticias(self, query: str, num: int = 5) -> Dict[str, Any]: | |
| """Busca notícias usando DDGS News backend.""" | |
| if not DDGS_AVAILABLE: | |
| return self._buscar_texto_ddgs(query, num) # fallback para geral | |
| try: | |
| noticias = [] | |
| with DDGS() as ddgs: | |
| for r in ddgs.news( | |
| query, | |
| region="pt-pt", # Alterado de wt-wt para evitar erros de conexão | |
| safesearch="off", | |
| timelimit="w", # última semana | |
| max_results=num, | |
| ): | |
| noticias.append({ | |
| "titulo": r.get("title", ""), | |
| "url": r.get("url", ""), | |
| "snippet": r.get("body", ""), | |
| "fonte": r.get("source", ""), | |
| "data": r.get("date", ""), | |
| }) | |
| if not noticias: | |
| # Tenta sem filtro de tempo | |
| with DDGS() as ddgs: | |
| for r in ddgs.news(query, max_results=num): | |
| noticias.append({ | |
| "titulo": r.get("title", ""), | |
| "url": r.get("url", ""), | |
| "snippet": r.get("body", ""), | |
| "fonte": r.get("source", ""), | |
| "data": r.get("date", ""), | |
| }) | |
| if not noticias: | |
| return self._erro("Noticias: sem resultados") | |
| bruto = f"=== 📰 NOTÍCIAS: {query.upper()} ===\n" | |
| bruto += f"DATA DA BUSCA: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n\n" | |
| for i, n in enumerate(noticias, 1): | |
| bruto += f"[{i}] {n['titulo']}\n" | |
| if n.get("fonte"): | |
| bruto += f" Fonte: {n['fonte']}" | |
| if n.get("data"): | |
| bruto += f" | Data: {n['data']}" | |
| bruto += "\n" | |
| if n.get("snippet"): | |
| bruto += f" {n['snippet'][:300]}\n" | |
| if n.get("url"): | |
| bruto += f" 🔗 {n['url']}\n" | |
| bruto += "\n" | |
| bruto += "--- FIM DAS NOTÍCIAS ---\n" | |
| return { | |
| "tipo": "noticias", | |
| "query": query, | |
| "resumo": f"Notícias sobre '{query}': {len(noticias)} encontradas", | |
| "conteudo_bruto": bruto, | |
| "resultados": noticias, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "ddgs_news", | |
| } | |
| except Exception as e: | |
| logger.warning(f"DDGS noticias error: {e}") | |
| return self._buscar_texto_ddgs(query, num) | |
| # ================================================================== | |
| # 📚 WIKIPEDIA | |
| # ================================================================== | |
| def _buscar_wikipedia(self, query: str) -> Dict[str, Any]: | |
| """Busca na Wikipedia PT via API oficial com extração completa.""" | |
| if not REQUESTS_AVAILABLE: | |
| return self._erro("Wikipedia: requests não disponível") | |
| try: | |
| # 1. Pesquisa para encontrar o artigo correto | |
| search_url = "https://pt.wikipedia.org/w/api.php" | |
| r = self._session.get(search_url, params={ | |
| "action": "query", | |
| "format": "json", | |
| "list": "search", | |
| "srsearch": query, | |
| "srlimit": 3, | |
| }, timeout=REQUEST_TIMEOUT) | |
| if r.status_code != 200: | |
| return self._erro(f"Wikipedia HTTP {r.status_code}") | |
| data = r.json() | |
| resultados = data.get("query", {}).get("search", []) | |
| if not resultados: | |
| return self._erro("Wikipedia: nenhuma página encontrada") | |
| # Pega o mais relevante | |
| page_title = resultados[0]["title"] | |
| # 2. Busca conteúdo completo da página | |
| r2 = self._session.get(search_url, params={ | |
| "action": "query", | |
| "format": "json", | |
| "prop": "extracts|info", | |
| "exintro": False, | |
| "explaintext": True, | |
| "titles": page_title, | |
| "inprop": "url", | |
| }, timeout=REQUEST_TIMEOUT) | |
| data2 = r2.json() | |
| pages = data2.get("query", {}).get("pages", {}) | |
| page = next(iter(pages.values()), {}) | |
| extract = page.get("extract", "") | |
| fullurl = page.get("fullurl", f"https://pt.wikipedia.org/wiki/{page_title.replace(' ', '_')}") | |
| # Limpa e formata | |
| extract = re.sub(r'\[\d+\]', '', extract) | |
| extract = re.sub(r'\s+', ' ', extract).strip() | |
| bruto = f"=== 📚 WIKIPEDIA: {page_title} ===\n" | |
| bruto += f"Fonte: {fullurl}\n" | |
| bruto += f"Data da consulta: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n\n" | |
| bruto += "CONTEÚDO:\n" | |
| bruto += extract[:6000] | |
| bruto += "\n\n--- FIM WIKIPEDIA ---\n" | |
| return { | |
| "tipo": "wikipedia", | |
| "titulo": page_title, | |
| "url": fullurl, | |
| "resumo": f"Wikipedia: {page_title}", | |
| "conteudo_bruto": bruto, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "wikipedia_api", | |
| } | |
| except Exception as e: | |
| logger.warning(f"Wikipedia error: {e}") | |
| return self._erro(f"Wikipedia: {e}") | |
| # ================================================================== | |
| # 🌤️ CLIMA | |
| # ================================================================== | |
| def _buscar_clima(self, query: str) -> Dict[str, Any]: | |
| """ | |
| Busca clima via OpenWeatherMap (se API key disponível) | |
| ou via wttr.in (sempre disponível, sem key). | |
| """ | |
| # Extrai cidade da query | |
| cidade = self._extrair_cidade(query) | |
| # Tenta wttr.in (sempre gratuito) | |
| try: | |
| if self._session: | |
| url = f"https://wttr.in/{cidade}?format=j1&lang=pt" | |
| r = self._session.get(url, timeout=REQUEST_TIMEOUT) | |
| if r.status_code == 200: | |
| data = r.json() | |
| cc = data.get("current_condition", [{}])[0] | |
| area = data.get("nearest_area", [{}])[0] | |
| nome_area = area.get("areaName", [{}])[0].get("value", cidade) | |
| pais = area.get("country", [{}])[0].get("value", "") | |
| temp_c = cc.get("temp_C", "?") | |
| sensacao = cc.get("FeelsLikeC", "?") | |
| humidade = cc.get("humidity", "?") | |
| vento_kmh = cc.get("windspeedKmph", "?") | |
| descricao = cc.get("weatherDesc", [{}])[0].get("value", "") | |
| bruto = f"=== 🌤️ CLIMA: {nome_area}, {pais} ===\n" | |
| bruto += f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n\n" | |
| bruto += f"🌡️ Temperatura atual: {temp_c}°C (sensação: {sensacao}°C)\n" | |
| bruto += f"💧 Humidade: {humidade}%\n" | |
| bruto += f"💨 Vento: {vento_kmh} km/h\n" | |
| bruto += f"☁️ Condição: {descricao}\n" | |
| bruto += "\n--- FIM CLIMA ---\n" | |
| return { | |
| "tipo": "clima", | |
| "cidade": nome_area, | |
| "resumo": f"Clima em {nome_area}: {temp_c}°C, {descricao}", | |
| "conteudo_bruto": bruto, | |
| "temperatura": temp_c, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "wttr.in", | |
| } | |
| except Exception as e: | |
| # Ignora erros de JSON format porque o wttr.in as vezes retorna HTML de erro | |
| if "Expecting value" not in str(e) and "JSONDecodeError" not in str(e): | |
| logger.warning(f"wttr.in error: {e}") | |
| # Fallback: OpenWeatherMap se key disponível | |
| if OPENWEATHER_KEY: | |
| return self._clima_openweather(cidade) | |
| return self._erro(f"Clima: não foi possível obter dados para '{cidade}'") | |
| def _clima_openweather(self, cidade: str) -> Dict[str, Any]: | |
| """Fallback via OpenWeatherMap API.""" | |
| try: | |
| url = "https://api.openweathermap.org/data/2.5/weather" | |
| r = self._session.get(url, params={ | |
| "q": cidade, | |
| "appid": OPENWEATHER_KEY, | |
| "units": "metric", | |
| "lang": "pt", | |
| }, timeout=REQUEST_TIMEOUT) | |
| if r.status_code != 200: | |
| return self._erro(f"OpenWeather HTTP {r.status_code}") | |
| data = r.json() | |
| temp = data["main"]["temp"] | |
| sensacao = data["main"]["feels_like"] | |
| humidade = data["main"]["humidity"] | |
| vento = data["wind"]["speed"] * 3.6 # m/s → km/h | |
| desc = data["weather"][0]["description"] | |
| nome = data.get("name", cidade) | |
| bruto = f"=== 🌤️ CLIMA: {nome} ===\n" | |
| bruto += f"Temperatura: {temp:.1f}°C (sensação: {sensacao:.1f}°C)\n" | |
| bruto += f"Humidade: {humidade}%\n" | |
| bruto += f"Vento: {vento:.1f} km/h\n" | |
| bruto += f"Condição: {desc.capitalize()}\n" | |
| bruto += "--- FIM CLIMA ---\n" | |
| return { | |
| "tipo": "clima", "cidade": nome, | |
| "resumo": f"Clima em {nome}: {temp}°C, {desc}", | |
| "conteudo_bruto": bruto, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "openweathermap", | |
| } | |
| except Exception as e: | |
| return self._erro(f"OpenWeather: {e}") | |
| # ================================================================== | |
| # 🖼️ IMAGENS VIA DDGS | |
| # ================================================================== | |
| def _buscar_imagens(self, query: str, num: int = 5) -> Dict[str, Any]: | |
| """Busca URLs de imagens via DDGS.""" | |
| if not DDGS_AVAILABLE: | |
| return self._erro("DDGS não disponível para imagens") | |
| try: | |
| imagens = [] | |
| with DDGS() as ddgs: | |
| for r in ddgs.images( | |
| query, | |
| region="wt-wt", | |
| safesearch="off", | |
| size=None, | |
| max_results=num, | |
| ): | |
| imagens.append({ | |
| "titulo": r.get("title", ""), | |
| "url_imagem": r.get("image", ""), | |
| "url_pagina": r.get("url", ""), | |
| "thumbnail": r.get("thumbnail", ""), | |
| "fonte": r.get("source", ""), | |
| }) | |
| if not imagens: | |
| return self._erro("Imagens: sem resultados") | |
| bruto = f"=== 🖼️ IMAGENS: {query} ===\n" | |
| bruto += f"Data: {datetime.now().strftime('%d/%m/%Y')}\n\n" | |
| for i, img in enumerate(imagens, 1): | |
| bruto += f"[{i}] {img['titulo']}\n" | |
| bruto += f" URL: {img['url_imagem']}\n" | |
| if img.get("fonte"): | |
| bruto += f" Fonte: {img['fonte']}\n" | |
| bruto += "\n" | |
| bruto += "--- FIM IMAGENS ---\n" | |
| return { | |
| "tipo": "imagens", | |
| "query": query, | |
| "resumo": f"Imagens de '{query}': {len(imagens)} encontradas", | |
| "conteudo_bruto": bruto, | |
| "resultados": imagens, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "ddgs_images", | |
| } | |
| except Exception as e: | |
| logger.warning(f"DDGS imagens error: {e}") | |
| return self._erro(f"Imagens: {e}") | |
| # ================================================================== | |
| # 🔄 FALLBACK – Scraping manual via BeautifulSoup | |
| # ================================================================== | |
| def _buscar_texto_fallback(self, query: str, num: int = 5) -> Dict[str, Any]: | |
| """Fallback: scraping HTML do DuckDuckGo se DDGS não estiver instalado.""" | |
| if not REQUESTS_AVAILABLE or not BS4_AVAILABLE: | |
| return self._erro("Dependências insuficientes para busca fallback") | |
| try: | |
| from urllib.parse import urlencode | |
| url = f"https://html.duckduckgo.com/html/?{urlencode({'q': query, 'kl': 'pt-pt'})}" | |
| r = self._session.get(url, timeout=REQUEST_TIMEOUT) | |
| if r.status_code != 200: | |
| return self._erro(f"DuckDuckGo HTML: HTTP {r.status_code}") | |
| soup = BeautifulSoup(r.text, "html.parser") | |
| resultados = [] | |
| for res in soup.find_all("div", class_="result")[:num]: | |
| a = res.find("a", class_="result__a") | |
| snip = res.find("a", class_="result__snippet") | |
| if a: | |
| resultados.append({ | |
| "titulo": a.get_text(strip=True), | |
| "url": a.get("href", ""), | |
| "snippet": snip.get_text(strip=True) if snip else "", | |
| }) | |
| if not resultados: | |
| return self._erro("Fallback: sem resultados") | |
| bruto = self._montar_bruto_geral(query, resultados) | |
| return { | |
| "tipo": "geral", | |
| "query": query, | |
| "resumo": f"Web: '{query}' – {len(resultados)} resultados", | |
| "conteudo_bruto": bruto, | |
| "resultados": resultados, | |
| "timestamp": datetime.now().isoformat(), | |
| "fonte": "scraping_fallback", | |
| } | |
| except Exception as e: | |
| return self._erro(f"Fallback: {e}") | |
| # ================================================================== | |
| # 🌐 RASPAGEM DE CONTEÚDO DE PÁGINA | |
| # ================================================================== | |
| def _raspar_pagina(self, url: str) -> str: | |
| """ | |
| Extrai conteúdo relevante de uma URL. | |
| Retorna texto limpo ou string vazia se falhar. | |
| """ | |
| if not REQUESTS_AVAILABLE or not BS4_AVAILABLE or not url: | |
| return "" | |
| # Evita PDFs, binários, etc. | |
| ignorar = [".pdf", ".doc", ".xls", ".zip", ".exe", "javascript:", "mailto:"] | |
| if any(url.lower().endswith(ext) or ext in url.lower() for ext in ignorar): | |
| return "" | |
| try: | |
| r = self._session.get(url, timeout=8) | |
| if r.status_code != 200: | |
| return "" | |
| soup = BeautifulSoup(r.text, "html.parser") | |
| # Remove scripts, style, nav, footer | |
| for tag in soup.find_all(["script", "style", "nav", "footer", "header", "aside"]): | |
| tag.decompose() | |
| # Tenta encontrar conteúdo principal | |
| main_content = ( | |
| soup.find("article") or | |
| soup.find("main") or | |
| soup.find("div", {"id": re.compile(r"content|main|article", re.I)}) or | |
| soup.find("div", {"class": re.compile(r"content|main|article|post", re.I)}) | |
| ) | |
| if main_content: | |
| texto = main_content.get_text(separator=" ", strip=True) | |
| else: | |
| texto = soup.get_text(separator=" ", strip=True) | |
| # Limpa espaços excessivos | |
| texto = re.sub(r"\s+", " ", texto).strip() | |
| return texto[:3000] | |
| except Exception: | |
| return "" | |
| # ================================================================== | |
| # 🛠️ UTILITÁRIOS | |
| # ================================================================== | |
| def _montar_bruto_geral(self, query: str, resultados: List[Dict]) -> str: | |
| bruto = f"=== 🔎 PESQUISA WEB: {query.upper()} ===\n" | |
| bruto += f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n" | |
| bruto += f"Total de resultados: {len(resultados)}\n\n" | |
| for i, r in enumerate(resultados, 1): | |
| bruto += f"[{i}] {r.get('titulo', 'Sem título')}\n" | |
| bruto += f" 🔗 {r.get('url', '')}\n" | |
| if r.get("snippet"): | |
| bruto += f" {r['snippet'][:400]}\n" | |
| if r.get("conteudo_pagina"): | |
| bruto += f" [CONTEÚDO] {r['conteudo_pagina'][:800]}\n" | |
| bruto += "\n" | |
| bruto += "--- FIM DOS RESULTADOS ---\n" | |
| return bruto | |
| def _extrair_cidade(self, query: str) -> str: | |
| """Extrai nome de cidade de uma query sobre clima.""" | |
| q = query.lower() | |
| prefixos = ["clima em", "tempo em", "temperatura em", "previsão em", "vai chover em", "como está o tempo em"] | |
| for p in prefixos: | |
| if p in q: | |
| return q.split(p)[-1].strip().split()[0].capitalize() | |
| # Heurística: última palavra relevante | |
| tokens = [t for t in query.split() if t.lower() not in | |
| ["clima", "tempo", "temperatura", "previsão", "hoje", "amanhã", "de", "em", "o", "a"]] | |
| return tokens[-1].capitalize() if tokens else "Luanda" | |
| def _get_cache(self, tipo: str) -> TTLCache: | |
| if tipo == "noticias": | |
| return _CACHE_NOTICIAS | |
| if tipo == "wikipedia": | |
| return _CACHE_WIKI | |
| if tipo == "clima": | |
| return _CACHE_CLIMA | |
| return _CACHE_GERAL | |
| def _persistir_busca(self, query: str, tipo: str, resultado: Dict): | |
| """Salva a busca no banco para uso como contexto RAG futuro.""" | |
| if not self.db: | |
| return | |
| try: | |
| resumo = resultado.get("resumo", "") | |
| self.db.salvar_aprendizado_detalhado( | |
| usuario="sistema", | |
| chave=f"web_search_{tipo}_{hashlib.md5(query.encode()).hexdigest()[:8]}", | |
| valor=json.dumps({ | |
| "query": query, | |
| "tipo": tipo, | |
| "resumo": resumo, | |
| "timestamp": datetime.now().isoformat(), | |
| }, ensure_ascii=False) | |
| ) | |
| except Exception as e: | |
| logger.debug(f"Persistência de busca ignorada: {e}") | |
| def _erro(self, mensagem: str) -> Dict[str, Any]: | |
| return { | |
| "tipo": "erro", | |
| "resumo": mensagem, | |
| "conteudo_bruto": f"=== ⚠️ ERRO NA PESQUISA ===\n{mensagem}\n---", | |
| "timestamp": datetime.now().isoformat(), | |
| "erro": True, | |
| } | |
| def limpar_cache(self): | |
| _CACHE_GERAL.clear() | |
| _CACHE_NOTICIAS.clear() | |
| _CACHE_WIKI.clear() | |
| _CACHE_CLIMA.clear() | |
| logger.info("🧹 Todos os caches de WebSearch limpos") | |
| # ============================================================ | |
| # SINGLETON & HELPERS PÚBLICOS | |
| # ============================================================ | |
| _instance: Optional[WebSearch] = None | |
| def get_web_search(db=None) -> WebSearch: | |
| """Retorna instância singleton do WebSearch.""" | |
| global _instance | |
| if _instance is None: | |
| _instance = WebSearch(db=db) | |
| return _instance | |
| def buscar_na_web(query: str, db=None) -> str: | |
| """Helper rápido: busca e retorna conteúdo bruto.""" | |
| return get_web_search(db=db).buscar_conteudo_completo(query) | |
| def deve_pesquisar(mensagem: str, historico: Optional[List[str]] = None) -> bool: | |
| """Helper: decide se deve pesquisar na web.""" | |
| return get_web_search().deve_buscar_na_web(mensagem, historico) | |
| def extrair_pesquisa(mensagem: str) -> str: | |
| """Helper: extrai assunto de busca da mensagem.""" | |
| return get_web_search().extrair_assunto_busca(mensagem) | |
| __all__ = [ | |
| "WebSearch", | |
| "get_web_search", | |
| "buscar_na_web", | |
| "deve_pesquisar", | |
| "extrair_pesquisa", | |
| ] | |