akira / web_search.py
akra35567's picture
Upload 4 files
61fbf18 verified
# type: ignore
"""
modules/web_search.py
================================================================================
WEB SEARCH MÓDULO - BUSCA AUTÔNOMA COMPLETA E PROFISSIONAL
================================================================================
Versão 3.0 - Motor de busca autônomo e inteligente
Features:
- DuckDuckGo via biblioteca `ddgs` (production-ready, sem scraping frágil)
- Busca de Texto, Notícias, Imagens e Vídeos (multi-tipo)
- Wikipedia via API oficial (conteúdo completo)
- Clima via OpenWeatherMap API (com fallback para wttr.in)
- Pesquisa Autônoma: AI decide QUANDO e O QUE buscar sem comando explícito
- Raspagem profunda de página web com extração de conteúdo limpo
- Cache TTL inteligente por tipo de busca
- Rate limiting respeitoso e rotação de User-Agent
- Integração direta com banco de dados (salva pesquisas para RAG)
Uso:
ws = WebSearch(db=db_instance)
resultado = ws.pesquisar("capital de angola")
conteudo = ws.buscar_conteudo_completo("presidente João Lourenço")
deve_ir = ws.deve_buscar_na_web("quem ganhou a copa ontem?")
================================================================================
"""
import os
import re
import random
import time
import hashlib
import sqlite3
import json
from dataclasses import dataclass
from typing import Dict, Any, List, Optional, Tuple, Union
from datetime import datetime
from loguru import logger
try:
from .config import DB_PATH
except (ImportError, ValueError):
try:
from modules.config import DB_PATH
except ImportError:
DB_PATH = "akira.db"
# ============================================================
# Imports opcionais com fallbacks
# ============================================================
try:
from ddgs import DDGS # type: ignore
DDGS_AVAILABLE = True
except ImportError:
try:
from duckduckgo_search import DDGS # type: ignore # nome antigo
DDGS_AVAILABLE = True
except ImportError:
DDGS_AVAILABLE = False
DDGS = None # type: ignore
try:
import requests # type: ignore
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
requests = None # type: ignore
try:
from bs4 import BeautifulSoup # type: ignore
BS4_AVAILABLE = True
except ImportError:
BS4_AVAILABLE = False
BeautifulSoup = None # type: ignore
try:
from loguru import logger # type: ignore
except ImportError:
class _DummyLogger:
def info(self, *a, **k): pass
def success(self, *a, **k): pass
def warning(self, *a, **k): pass
def error(self, *a, **k): pass
def debug(self, *a, **k): pass
logger = _DummyLogger() # type: ignore
try:
from cachetools import TTLCache # type: ignore
_CacheOK = True
except ImportError:
_CacheOK = False
class TTLCache(dict): # type: ignore
def __init__(self, maxsize=100, ttl=900, **kwargs):
super().__init__(**kwargs)
self.maxsize = maxsize
self.ttl = ttl
self._ts: Dict[str, float] = {}
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._ts[key] = time.time()
if len(self) > self.maxsize:
oldest = min(self._ts, key=lambda k: self._ts[k])
self.pop(oldest, None)
self._ts.pop(oldest, None)
def get(self, key, default=None):
if key in self._ts and time.time() - self._ts[key] > self.ttl:
self.pop(key, None)
self._ts.pop(key, None)
return default
return super().get(key, default)
# ============================================================
# CONFIGURAÇÕES GLOBAIS
# ============================================================
REQUEST_TIMEOUT = 12
# Cache com diferentes TTLs por tipo (segundos)
_CACHE_GERAL = TTLCache(maxsize=60, ttl=900) # 15 min
_CACHE_NOTICIAS= TTLCache(maxsize=30, ttl=300) # 5 min (notícias mudam rápido)
_CACHE_WIKI = TTLCache(maxsize=50, ttl=3600) # 1h (Wikipedia é estável)
_CACHE_CLIMA = TTLCache(maxsize=20, ttl=600) # 10 min
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
]
OPENWEATHER_KEY = os.getenv("OPENWEATHER_API_KEY", "")
# Palavras-gatilho para busca autônoma (contexto NLP)
_TRIGGERS_BUSCA = [
# Comandos explícitos
"pesquisa", "busca na web", "buscar na internet", "pesquise",
"me busca", "google", "procura",
# Eventos atuais
"o que está acontecendo", "últimas notícias", "notícias de hoje",
"o que aconteceu", "aconteceu", "novidades",
# Perguntas factuais específicas
"quem é o presidente", "qual é a população", "quantos habitantes",
"qual a capital", "onde fica", "quando foi fundado",
# Sports/resultados
"placar", "resultado do jogo", "ganhou a copa", "eliminado",
# Temporal
"ontem", "esta semana", "esse mês", "ano passado", "2025", "2026",
# Pessoas
"morreu", "foi preso", "foi assassinado", "renunciou", "eleito",
# Tempo/clima
"vai chover", "temperatura em", "clima em", "previsão do tempo",
]
_PERGUNTAS_FATOS = [
"?", "quem", "qual", "quando", "onde", "quanto", "quantos",
"por que", "como é", "o que é", "me conta", "explica",
]
# ============================================================
# CLASSE PRINCIPAL
# ============================================================
@dataclass
class WebSearchConfig:
db_path: str = DB_PATH
class WebSearch:
"""
Motor de busca autônoma profissional para AKIRA.
Prioridade de backends:
1. DDGS (duckduckgo-search) - principal, sem API key
2. Wikipedia API - para perguntas conceituais
3. OpenWeatherMap - para clima
4. Scraping direto via BeautifulSoup - fallback
"""
def __init__(self, db=None):
"""
Args:
db: Instância do Database para persistência das buscas (opcional)
"""
self.db = db
self._session = None
self._setup_session()
if DDGS_AVAILABLE:
logger.success("🔍 WebSearch: DDGS (DuckDuckGo) disponível e ativo")
else:
logger.warning("⚠️ WebSearch: ddgs não instalado – fallback via scraping")
def _setup_session(self):
"""Configura sessão HTTP com headers realistas."""
if not REQUESTS_AVAILABLE:
return
self._session = requests.Session()
self._session.headers.update({
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pt-BR,pt;q=0.9,en-US;q=0.8",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
})
def _rotate_ua(self):
"""Rotaciona User-Agent para evitar bloqueio."""
if self._session:
self._session.headers["User-Agent"] = random.choice(USER_AGENTS)
# ==================================================================
# 🌐 INTERFACE PRINCIPAL
# ==================================================================
def pesquisar(
self,
query: str,
num_results: int = 5,
tipo: Optional[str] = None,
) -> Dict[str, Any]:
"""
Pesquisa completa com detecção automática de tipo.
Args:
query: Termo de pesquisa
num_results: Número de resultados (max 10)
tipo: Forçar tipo: 'geral'|'noticias'|'wikipedia'|'clima'|'imagens'
Returns:
Dict com 'conteudo_bruto', 'resumo', 'tipo', 'resultados'
"""
if not query or not query.strip():
return self._erro("Query vazia")
query = query.strip()
cache_key = hashlib.md5(f"{query}:{num_results}:{tipo}".encode()).hexdigest()[:16]
# Detecta tipo se não especificado
tipo_detectado = tipo or self.detectar_tipo_pesquisa(query)
# Verifica cache específico por tipo
cache = self._get_cache(tipo_detectado)
cached = cache.get(cache_key)
if cached:
logger.debug(f"📦 Cache hit [{tipo_detectado}]: {query[:40]}")
return cached
# Rotaciona UA
self._rotate_ua()
# Executa busca pelo tipo
resultado: Dict[str, Any]
if tipo_detectado == "wikipedia":
resultado = self._buscar_wikipedia(query)
elif tipo_detectado == "noticias":
resultado = self._buscar_noticias(query, num_results)
elif tipo_detectado == "clima":
resultado = self._buscar_clima(query)
elif tipo_detectado == "imagens":
resultado = self._buscar_imagens(query, num_results)
else:
resultado = self._buscar_texto_ddgs(query, num_results)
# Salva no cache
cache[cache_key] = resultado
# Persiste no banco de dados para RAG futuro
self._persistir_busca(query, tipo_detectado, resultado)
return resultado
def buscar_conteudo_completo(self, query: str) -> str:
"""Retorna string bruta pronta para inserir no prompt."""
r = self.pesquisar(query)
return r.get("conteudo_bruto", "Sem resultados disponíveis.")
def buscar_resumido(self, query: str) -> str:
r = self.pesquisar(query, num_results=3)
return r.get("resumo", "Sem resumo disponível.")
# ==================================================================
# 🤖 PESQUISA AUTÔNOMA – a IA decide sozinha se deve buscar
# ==================================================================
def deve_buscar_na_web(self, mensagem: str, historico: Optional[List[str]] = None) -> bool:
"""
Decisão autônoma: a AKIRA deve buscar na web por conta própria?
Lógica em camadas:
1. Gatilhos explícitos (o usuário pediu)
2. Perguntas factuais com marcadores temporais
3. Tópicos que o modelo definitivamente não sabe (eventos pós-treino)
4. Palavras de eventos conhecidos recentes
Args:
mensagem: Última mensagem do usuário
historico: Últimas mensagens do histórico (contexto adicional)
Returns:
True se deve pesquisar na web
"""
msg = mensagem.lower().strip()
# 1. Gatilhos explícitos
if any(t in msg for t in _TRIGGERS_BUSCA):
logger.info(f"🔍 Pesquisa autônoma ativada [gatilho explícito]: {msg[:60]}")
return True
# 2. Pergunta + indicador temporal/factual
is_pergunta = (
"?" in msg or
any(msg.startswith(p) for p in _PERGUNTAS_FATOS)
)
indicadores_atuais = [
"atual", "recente", "novo", "último", "agora",
"hoje", "ontem", "semana", "mês", "2024", "2025", "2026",
"presidente", "governo", "eleição", "guerra", "acordo",
"crise", "epidemia", "terremoto", "furacão"
]
if is_pergunta and any(p in msg for p in indicadores_atuais):
logger.info(f"🔍 Pesquisa autônoma ativada [pergunta+temporal]: {msg[:60]}")
return True
# 3. Pessoa pede para contar/explicar com contexto que muda
frases_dinamicas = [
"me conta sobre", "o que você sabe sobre", "quem é",
"o que é", "me fala sobre", "sabes de", "sabe de"
]
if any(f in msg for f in frases_dinamicas):
# Verifica se é sobre algo que pode ser evento recente
entidades_suspeitas = msg.split()
# Heurística: mais de 1 palavra após a frase → provavelmente nome próprio
for frase in frases_dinamicas:
if frase in msg:
pos = msg.find(frase) + len(frase)
resto = msg[pos:].strip()
if len(resto.split()) >= 1:
logger.info(f"🔍 Pesquisa autônoma ativada [entidade]: {resto[:60]}")
return True
# 4. Contexto do histórico (se usuário estava pedindo info antes)
if historico and isinstance(historico, list):
try:
# Conversão ultra-segura: ignora None, extrai de tupla/dict ou converte str
historico_limpo = []
for h in historico[-5:]:
if h is None: continue
if isinstance(h, tuple) and len(h) > 0:
historico_limpo.append(str(h[0]))
elif isinstance(h, dict):
historico_limpo.append(str(h.get('content', h.get('mensagem', ''))))
else:
historico_limpo.append(str(h))
ultima_5 = " ".join(historico_limpo).lower()
if any(t in ultima_5 for t in ["pesquisa", "busca", "notícia", "aconteceu", "saber sobre"]):
return True
except Exception as e:
logger.warning(f"Erro ao processar histórico na busca: {e}")
return False
def extrair_assunto_busca(self, mensagem: str) -> str:
"""
Extrai o assunto principal da mensagem para usar como query.
Remove ruído, stopwords e foca em termos de busca eficientes.
"""
msg = mensagem.strip()
msg_lower = msg.lower()
# 1. Padrões de extração semântica
padroes = [
r"(?:pesquisa|busca|pesquise|procura|me busca|me fala|sabe sobre)\s+(?:sobre|de|a respeito de|do que|da)?\s*(.+)",
r"(?:quem é|o que é|o que são|onde fica|qual é|quando foi|como é|pq que|por que)\s+(.+)",
r"(?:me conta|me fala|explica|me explica|notícia|noticia|novidade)\s+(?:sobre|de)?\s*(.+)",
]
query_candidata = ""
for pat in padroes:
m = re.search(pat, msg_lower)
if m:
query_candidata = m.group(1).strip().rstrip(".,!?")
break
if not query_candidata:
query_candidata = msg_lower
# 2. Limpeza profunda de ruído conversacional (Stopwords e muletas)
stopwords = [
"pesquisa", "busca", "buscar", "procura", "me", "por favor", "pf", "pfv",
"akira", "você", "sabe", "dizer", "quero", "queria", "estão", "logo", "parece",
"que", "essa", "entre", "uma", "uns", "pelo", "pela", "num", "numa", "este", "esta"
]
tokens = query_candidata.split()
tokens_final = []
for t in tokens:
t_limpo = t.rstrip(".,!?;")
if t_limpo not in stopwords and len(t_limpo) > 1:
tokens_final.append(t_limpo)
# Se a limpeza removeu tudo, volta para a candidata original
return " ".join(tokens_final) if len(tokens_final) >= 2 else query_candidata
# ==================================================================
# 🎯 DETECÇÃO DE TIPO
# ==================================================================
def detectar_tipo_pesquisa(self, query: str) -> str:
"""
Detecta automaticamente o melhor tipo de busca para a query.
Returns:
'wikipedia' | 'noticias' | 'clima' | 'imagens' | 'geral'
"""
q = query.lower()
# Clima
clima_kws = ["clima", "tempo", "temperatura", "vai chover", "previsão", "chuva", "sol", "humidade"]
if any(k in q for k in clima_kws):
return "clima"
# Notícias – eventos atuais
news_kws = [
"notícia", "noticia", "última hora", "breaking", "aconteceu",
"hoje", "eleição", "guerra", "crise", "julgamento",
"preso", "morreu", "assassinado", "renunciou", "ganhou"
]
if any(k in q for k in news_kws):
return "noticias"
# Imagens
img_kws = ["foto de", "imagem de", "fotos de", "imagens de", "como é", "me mostra"]
if any(k in q for k in img_kws):
return "imagens"
# IMPORTANTE: Desativada a rota 'wikipedia' pois estava dando erro lib/HTTP.
# Agora perguntas que seriam wiki (biografias, o que é) caem na busca geral
# que já raspa o extract de boas fontes.
return "geral"
# ==================================================================
# 📰 BUSCA DE TEXTO VIA DDGS (principal)
# ==================================================================
def _buscar_texto_ddgs(self, query: str, num: int = 5) -> Dict[str, Any]:
"""Busca geral usando a biblioteca DDGS (DuckDuckGo Search)."""
if not DDGS_AVAILABLE:
return self._buscar_texto_fallback(query, num)
try:
resultados = []
with DDGS() as ddgs:
for r in ddgs.text(
query,
region="pt-pt", # Alterado de wt-wt para evitar erros de conexão
safesearch="off",
timelimit=None,
max_results=num,
):
resultados.append({
"titulo": r.get("title", ""),
"url": r.get("href", ""),
"snippet": r.get("body", ""),
})
if not resultados:
return self._erro("DDGS: nenhum resultado")
# Tenta enriquecer com conteúdo das páginas
for res in resultados[:2]: # Só as 2 primeiras para não overload
conteudo = self._raspar_pagina(res["url"])
if conteudo:
res["conteudo_pagina"] = conteudo[:2000]
bruto = self._montar_bruto_geral(query, resultados)
return {
"tipo": "geral",
"query": query,
"resumo": f"Web Search: '{query}' – {len(resultados)} resultados",
"conteudo_bruto": bruto,
"resultados": resultados,
"timestamp": datetime.now().isoformat(),
"fonte": "ddgs",
}
except Exception as e:
# Silencia erros de conexão específicos do DuckDuckGo para evitar log ruidoso
if "ConnectError" in str(e) or "DDGSException" in str(e):
logger.debug(f"DDGS redundante/conexão erro: {e}")
else:
logger.warning(f"DDGS texto error: {e}")
return self._buscar_texto_fallback(query, num)
# ==================================================================
# 📰 BUSCA DE NOTÍCIAS VIA DDGS
# ==================================================================
def _buscar_noticias(self, query: str, num: int = 5) -> Dict[str, Any]:
"""Busca notícias usando DDGS News backend."""
if not DDGS_AVAILABLE:
return self._buscar_texto_ddgs(query, num) # fallback para geral
try:
noticias = []
with DDGS() as ddgs:
for r in ddgs.news(
query,
region="pt-pt", # Alterado de wt-wt para evitar erros de conexão
safesearch="off",
timelimit="w", # última semana
max_results=num,
):
noticias.append({
"titulo": r.get("title", ""),
"url": r.get("url", ""),
"snippet": r.get("body", ""),
"fonte": r.get("source", ""),
"data": r.get("date", ""),
})
if not noticias:
# Tenta sem filtro de tempo
with DDGS() as ddgs:
for r in ddgs.news(query, max_results=num):
noticias.append({
"titulo": r.get("title", ""),
"url": r.get("url", ""),
"snippet": r.get("body", ""),
"fonte": r.get("source", ""),
"data": r.get("date", ""),
})
if not noticias:
return self._erro("Noticias: sem resultados")
bruto = f"=== 📰 NOTÍCIAS: {query.upper()} ===\n"
bruto += f"DATA DA BUSCA: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n\n"
for i, n in enumerate(noticias, 1):
bruto += f"[{i}] {n['titulo']}\n"
if n.get("fonte"):
bruto += f" Fonte: {n['fonte']}"
if n.get("data"):
bruto += f" | Data: {n['data']}"
bruto += "\n"
if n.get("snippet"):
bruto += f" {n['snippet'][:300]}\n"
if n.get("url"):
bruto += f" 🔗 {n['url']}\n"
bruto += "\n"
bruto += "--- FIM DAS NOTÍCIAS ---\n"
return {
"tipo": "noticias",
"query": query,
"resumo": f"Notícias sobre '{query}': {len(noticias)} encontradas",
"conteudo_bruto": bruto,
"resultados": noticias,
"timestamp": datetime.now().isoformat(),
"fonte": "ddgs_news",
}
except Exception as e:
logger.warning(f"DDGS noticias error: {e}")
return self._buscar_texto_ddgs(query, num)
# ==================================================================
# 📚 WIKIPEDIA
# ==================================================================
def _buscar_wikipedia(self, query: str) -> Dict[str, Any]:
"""Busca na Wikipedia PT via API oficial com extração completa."""
if not REQUESTS_AVAILABLE:
return self._erro("Wikipedia: requests não disponível")
try:
# 1. Pesquisa para encontrar o artigo correto
search_url = "https://pt.wikipedia.org/w/api.php"
r = self._session.get(search_url, params={
"action": "query",
"format": "json",
"list": "search",
"srsearch": query,
"srlimit": 3,
}, timeout=REQUEST_TIMEOUT)
if r.status_code != 200:
return self._erro(f"Wikipedia HTTP {r.status_code}")
data = r.json()
resultados = data.get("query", {}).get("search", [])
if not resultados:
return self._erro("Wikipedia: nenhuma página encontrada")
# Pega o mais relevante
page_title = resultados[0]["title"]
# 2. Busca conteúdo completo da página
r2 = self._session.get(search_url, params={
"action": "query",
"format": "json",
"prop": "extracts|info",
"exintro": False,
"explaintext": True,
"titles": page_title,
"inprop": "url",
}, timeout=REQUEST_TIMEOUT)
data2 = r2.json()
pages = data2.get("query", {}).get("pages", {})
page = next(iter(pages.values()), {})
extract = page.get("extract", "")
fullurl = page.get("fullurl", f"https://pt.wikipedia.org/wiki/{page_title.replace(' ', '_')}")
# Limpa e formata
extract = re.sub(r'\[\d+\]', '', extract)
extract = re.sub(r'\s+', ' ', extract).strip()
bruto = f"=== 📚 WIKIPEDIA: {page_title} ===\n"
bruto += f"Fonte: {fullurl}\n"
bruto += f"Data da consulta: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n\n"
bruto += "CONTEÚDO:\n"
bruto += extract[:6000]
bruto += "\n\n--- FIM WIKIPEDIA ---\n"
return {
"tipo": "wikipedia",
"titulo": page_title,
"url": fullurl,
"resumo": f"Wikipedia: {page_title}",
"conteudo_bruto": bruto,
"timestamp": datetime.now().isoformat(),
"fonte": "wikipedia_api",
}
except Exception as e:
logger.warning(f"Wikipedia error: {e}")
return self._erro(f"Wikipedia: {e}")
# ==================================================================
# 🌤️ CLIMA
# ==================================================================
def _buscar_clima(self, query: str) -> Dict[str, Any]:
"""
Busca clima via OpenWeatherMap (se API key disponível)
ou via wttr.in (sempre disponível, sem key).
"""
# Extrai cidade da query
cidade = self._extrair_cidade(query)
# Tenta wttr.in (sempre gratuito)
try:
if self._session:
url = f"https://wttr.in/{cidade}?format=j1&lang=pt"
r = self._session.get(url, timeout=REQUEST_TIMEOUT)
if r.status_code == 200:
data = r.json()
cc = data.get("current_condition", [{}])[0]
area = data.get("nearest_area", [{}])[0]
nome_area = area.get("areaName", [{}])[0].get("value", cidade)
pais = area.get("country", [{}])[0].get("value", "")
temp_c = cc.get("temp_C", "?")
sensacao = cc.get("FeelsLikeC", "?")
humidade = cc.get("humidity", "?")
vento_kmh = cc.get("windspeedKmph", "?")
descricao = cc.get("weatherDesc", [{}])[0].get("value", "")
bruto = f"=== 🌤️ CLIMA: {nome_area}, {pais} ===\n"
bruto += f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n\n"
bruto += f"🌡️ Temperatura atual: {temp_c}°C (sensação: {sensacao}°C)\n"
bruto += f"💧 Humidade: {humidade}%\n"
bruto += f"💨 Vento: {vento_kmh} km/h\n"
bruto += f"☁️ Condição: {descricao}\n"
bruto += "\n--- FIM CLIMA ---\n"
return {
"tipo": "clima",
"cidade": nome_area,
"resumo": f"Clima em {nome_area}: {temp_c}°C, {descricao}",
"conteudo_bruto": bruto,
"temperatura": temp_c,
"timestamp": datetime.now().isoformat(),
"fonte": "wttr.in",
}
except Exception as e:
# Ignora erros de JSON format porque o wttr.in as vezes retorna HTML de erro
if "Expecting value" not in str(e) and "JSONDecodeError" not in str(e):
logger.warning(f"wttr.in error: {e}")
# Fallback: OpenWeatherMap se key disponível
if OPENWEATHER_KEY:
return self._clima_openweather(cidade)
return self._erro(f"Clima: não foi possível obter dados para '{cidade}'")
def _clima_openweather(self, cidade: str) -> Dict[str, Any]:
"""Fallback via OpenWeatherMap API."""
try:
url = "https://api.openweathermap.org/data/2.5/weather"
r = self._session.get(url, params={
"q": cidade,
"appid": OPENWEATHER_KEY,
"units": "metric",
"lang": "pt",
}, timeout=REQUEST_TIMEOUT)
if r.status_code != 200:
return self._erro(f"OpenWeather HTTP {r.status_code}")
data = r.json()
temp = data["main"]["temp"]
sensacao = data["main"]["feels_like"]
humidade = data["main"]["humidity"]
vento = data["wind"]["speed"] * 3.6 # m/s → km/h
desc = data["weather"][0]["description"]
nome = data.get("name", cidade)
bruto = f"=== 🌤️ CLIMA: {nome} ===\n"
bruto += f"Temperatura: {temp:.1f}°C (sensação: {sensacao:.1f}°C)\n"
bruto += f"Humidade: {humidade}%\n"
bruto += f"Vento: {vento:.1f} km/h\n"
bruto += f"Condição: {desc.capitalize()}\n"
bruto += "--- FIM CLIMA ---\n"
return {
"tipo": "clima", "cidade": nome,
"resumo": f"Clima em {nome}: {temp}°C, {desc}",
"conteudo_bruto": bruto,
"timestamp": datetime.now().isoformat(),
"fonte": "openweathermap",
}
except Exception as e:
return self._erro(f"OpenWeather: {e}")
# ==================================================================
# 🖼️ IMAGENS VIA DDGS
# ==================================================================
def _buscar_imagens(self, query: str, num: int = 5) -> Dict[str, Any]:
"""Busca URLs de imagens via DDGS."""
if not DDGS_AVAILABLE:
return self._erro("DDGS não disponível para imagens")
try:
imagens = []
with DDGS() as ddgs:
for r in ddgs.images(
query,
region="wt-wt",
safesearch="off",
size=None,
max_results=num,
):
imagens.append({
"titulo": r.get("title", ""),
"url_imagem": r.get("image", ""),
"url_pagina": r.get("url", ""),
"thumbnail": r.get("thumbnail", ""),
"fonte": r.get("source", ""),
})
if not imagens:
return self._erro("Imagens: sem resultados")
bruto = f"=== 🖼️ IMAGENS: {query} ===\n"
bruto += f"Data: {datetime.now().strftime('%d/%m/%Y')}\n\n"
for i, img in enumerate(imagens, 1):
bruto += f"[{i}] {img['titulo']}\n"
bruto += f" URL: {img['url_imagem']}\n"
if img.get("fonte"):
bruto += f" Fonte: {img['fonte']}\n"
bruto += "\n"
bruto += "--- FIM IMAGENS ---\n"
return {
"tipo": "imagens",
"query": query,
"resumo": f"Imagens de '{query}': {len(imagens)} encontradas",
"conteudo_bruto": bruto,
"resultados": imagens,
"timestamp": datetime.now().isoformat(),
"fonte": "ddgs_images",
}
except Exception as e:
logger.warning(f"DDGS imagens error: {e}")
return self._erro(f"Imagens: {e}")
# ==================================================================
# 🔄 FALLBACK – Scraping manual via BeautifulSoup
# ==================================================================
def _buscar_texto_fallback(self, query: str, num: int = 5) -> Dict[str, Any]:
"""Fallback: scraping HTML do DuckDuckGo se DDGS não estiver instalado."""
if not REQUESTS_AVAILABLE or not BS4_AVAILABLE:
return self._erro("Dependências insuficientes para busca fallback")
try:
from urllib.parse import urlencode
url = f"https://html.duckduckgo.com/html/?{urlencode({'q': query, 'kl': 'pt-pt'})}"
r = self._session.get(url, timeout=REQUEST_TIMEOUT)
if r.status_code != 200:
return self._erro(f"DuckDuckGo HTML: HTTP {r.status_code}")
soup = BeautifulSoup(r.text, "html.parser")
resultados = []
for res in soup.find_all("div", class_="result")[:num]:
a = res.find("a", class_="result__a")
snip = res.find("a", class_="result__snippet")
if a:
resultados.append({
"titulo": a.get_text(strip=True),
"url": a.get("href", ""),
"snippet": snip.get_text(strip=True) if snip else "",
})
if not resultados:
return self._erro("Fallback: sem resultados")
bruto = self._montar_bruto_geral(query, resultados)
return {
"tipo": "geral",
"query": query,
"resumo": f"Web: '{query}' – {len(resultados)} resultados",
"conteudo_bruto": bruto,
"resultados": resultados,
"timestamp": datetime.now().isoformat(),
"fonte": "scraping_fallback",
}
except Exception as e:
return self._erro(f"Fallback: {e}")
# ==================================================================
# 🌐 RASPAGEM DE CONTEÚDO DE PÁGINA
# ==================================================================
def _raspar_pagina(self, url: str) -> str:
"""
Extrai conteúdo relevante de uma URL.
Retorna texto limpo ou string vazia se falhar.
"""
if not REQUESTS_AVAILABLE or not BS4_AVAILABLE or not url:
return ""
# Evita PDFs, binários, etc.
ignorar = [".pdf", ".doc", ".xls", ".zip", ".exe", "javascript:", "mailto:"]
if any(url.lower().endswith(ext) or ext in url.lower() for ext in ignorar):
return ""
try:
r = self._session.get(url, timeout=8)
if r.status_code != 200:
return ""
soup = BeautifulSoup(r.text, "html.parser")
# Remove scripts, style, nav, footer
for tag in soup.find_all(["script", "style", "nav", "footer", "header", "aside"]):
tag.decompose()
# Tenta encontrar conteúdo principal
main_content = (
soup.find("article") or
soup.find("main") or
soup.find("div", {"id": re.compile(r"content|main|article", re.I)}) or
soup.find("div", {"class": re.compile(r"content|main|article|post", re.I)})
)
if main_content:
texto = main_content.get_text(separator=" ", strip=True)
else:
texto = soup.get_text(separator=" ", strip=True)
# Limpa espaços excessivos
texto = re.sub(r"\s+", " ", texto).strip()
return texto[:3000]
except Exception:
return ""
# ==================================================================
# 🛠️ UTILITÁRIOS
# ==================================================================
def _montar_bruto_geral(self, query: str, resultados: List[Dict]) -> str:
bruto = f"=== 🔎 PESQUISA WEB: {query.upper()} ===\n"
bruto += f"Data: {datetime.now().strftime('%d/%m/%Y %H:%M')}\n"
bruto += f"Total de resultados: {len(resultados)}\n\n"
for i, r in enumerate(resultados, 1):
bruto += f"[{i}] {r.get('titulo', 'Sem título')}\n"
bruto += f" 🔗 {r.get('url', '')}\n"
if r.get("snippet"):
bruto += f" {r['snippet'][:400]}\n"
if r.get("conteudo_pagina"):
bruto += f" [CONTEÚDO] {r['conteudo_pagina'][:800]}\n"
bruto += "\n"
bruto += "--- FIM DOS RESULTADOS ---\n"
return bruto
def _extrair_cidade(self, query: str) -> str:
"""Extrai nome de cidade de uma query sobre clima."""
q = query.lower()
prefixos = ["clima em", "tempo em", "temperatura em", "previsão em", "vai chover em", "como está o tempo em"]
for p in prefixos:
if p in q:
return q.split(p)[-1].strip().split()[0].capitalize()
# Heurística: última palavra relevante
tokens = [t for t in query.split() if t.lower() not in
["clima", "tempo", "temperatura", "previsão", "hoje", "amanhã", "de", "em", "o", "a"]]
return tokens[-1].capitalize() if tokens else "Luanda"
def _get_cache(self, tipo: str) -> TTLCache:
if tipo == "noticias":
return _CACHE_NOTICIAS
if tipo == "wikipedia":
return _CACHE_WIKI
if tipo == "clima":
return _CACHE_CLIMA
return _CACHE_GERAL
def _persistir_busca(self, query: str, tipo: str, resultado: Dict):
"""Salva a busca no banco para uso como contexto RAG futuro."""
if not self.db:
return
try:
resumo = resultado.get("resumo", "")
self.db.salvar_aprendizado_detalhado(
usuario="sistema",
chave=f"web_search_{tipo}_{hashlib.md5(query.encode()).hexdigest()[:8]}",
valor=json.dumps({
"query": query,
"tipo": tipo,
"resumo": resumo,
"timestamp": datetime.now().isoformat(),
}, ensure_ascii=False)
)
except Exception as e:
logger.debug(f"Persistência de busca ignorada: {e}")
def _erro(self, mensagem: str) -> Dict[str, Any]:
return {
"tipo": "erro",
"resumo": mensagem,
"conteudo_bruto": f"=== ⚠️ ERRO NA PESQUISA ===\n{mensagem}\n---",
"timestamp": datetime.now().isoformat(),
"erro": True,
}
def limpar_cache(self):
_CACHE_GERAL.clear()
_CACHE_NOTICIAS.clear()
_CACHE_WIKI.clear()
_CACHE_CLIMA.clear()
logger.info("🧹 Todos os caches de WebSearch limpos")
# ============================================================
# SINGLETON & HELPERS PÚBLICOS
# ============================================================
_instance: Optional[WebSearch] = None
def get_web_search(db=None) -> WebSearch:
"""Retorna instância singleton do WebSearch."""
global _instance
if _instance is None:
_instance = WebSearch(db=db)
return _instance
def buscar_na_web(query: str, db=None) -> str:
"""Helper rápido: busca e retorna conteúdo bruto."""
return get_web_search(db=db).buscar_conteudo_completo(query)
def deve_pesquisar(mensagem: str, historico: Optional[List[str]] = None) -> bool:
"""Helper: decide se deve pesquisar na web."""
return get_web_search().deve_buscar_na_web(mensagem, historico)
def extrair_pesquisa(mensagem: str) -> str:
"""Helper: extrai assunto de busca da mensagem."""
return get_web_search().extrair_assunto_busca(mensagem)
__all__ = [
"WebSearch",
"get_web_search",
"buscar_na_web",
"deve_pesquisar",
"extrair_pesquisa",
]