|
|
|
|
|
import time |
|
|
import re |
|
|
import requests |
|
|
from typing import List, Dict |
|
|
from loguru import logger |
|
|
from bs4 import BeautifulSoup |
|
|
import os |
|
|
import config |
|
|
|
|
|
class SimpleCache:
    """Minimal in-memory key/value cache with a per-instance time-to-live."""

    def __init__(self, ttl: int = 900):
        """
        Args:
            ttl: Lifetime of each entry in seconds (default: 15 minutes).
        """
        self.ttl = ttl
        # Maps key -> (value, insertion timestamp from time.time()).
        self._data = {}

    def get(self, key):
        """Return the cached value for *key*, or None if missing or expired.

        Expired entries are evicted on access so the cache cannot grow
        without bound (previously stale tuples were kept forever).
        """
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.time() - stored_at < self.ttl:
            return value
        # Stale: drop it so the memory is reclaimed.
        del self._data[key]
        return None

    def set(self, key, value):
        """Store *value* under *key*, timestamped with the current time."""
        self._data[key] = (value, time.time())
|
|
|
|
|
class WebSearch:
    """Web lookup helper: scrapes Angolan news portals for local topics and
    falls back to the Serper.dev search API for general questions.

    All results are cached via :class:`SimpleCache` (15-minute TTL).
    """

    def __init__(self):
        self.cache = SimpleCache()
        # A single Session reuses TCP connections and carries shared headers.
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })
        # Angolan news portals scraped by _scraping_angola().
        self.fontes_angola = [
            "https://www.angop.ao/ultimas",
            "https://www.novojornal.co.ao/",
            "https://www.jornaldeangola.ao/",
            "https://www.verangola.net/va/noticias"
        ]

    def _limpar(self, texto: str) -> str:
        """Collapse whitespace runs to single spaces, trim, cap at 200 chars."""
        return re.sub(r'\s+', ' ', texto).strip()[:200]

    def _scraping_angola(self) -> str:
        """Scrape headlines from the configured Angolan news sites.

        Returns:
            A formatted bullet list (at most 5 headlines) or a fallback
            message when nothing could be scraped. The result is cached
            under a fixed key for the cache's TTL.
        """
        key = "noticias_angola"
        cached = self.cache.get(key)
        if cached:
            return cached

        noticias = []
        for url in self.fontes_angola:
            try:
                r = self.session.get(url, timeout=8)
                if r.status_code != 200:
                    continue
                soup = BeautifulSoup(r.text, 'html.parser')
                # Common headline selectors across the listed portals.
                for item in soup.select('.titulo a, h3 a, .noticia-item a')[:3]:
                    titulo = self._limpar(item.get_text())
                    # Very short strings are usually nav links, not headlines.
                    if titulo and len(titulo) > 20:
                        noticias.append(f"• {titulo}")
            except Exception as e:
                # Best-effort: one failing source must not abort the others,
                # but log it instead of swallowing silently (was a bare except,
                # which would also trap KeyboardInterrupt/SystemExit).
                logger.warning(f"Erro ao raspar {url}: {e}")
                continue

        if not noticias:
            result = "Sem notícias recentes de Angola."
        else:
            result = "NOTÍCIAS DE ANGOLA:\n" + "\n".join(noticias[:5])

        self.cache.set(key, result)
        return result

    def _busca_geral(self, query: str) -> str:
        """Run *query* through the Serper.dev search API.

        Returns:
            Up to 5 "title: snippet" bullets, a configuration hint when
            SERPER_API_KEY is missing, or an error message. Results (and
            error messages) are cached per lowercased query.
        """
        key = f"geral_{query.lower()}"
        cached = self.cache.get(key)
        if cached:
            return cached

        if not config.SERPER_API_KEY:
            # Not cached: the key may be configured later in the process.
            return "Busca geral não configurada. Configure SERPER_API_KEY no HF Space Secrets."

        try:
            url = "https://google.serper.dev/search"
            payload = {"q": query}
            headers = {"X-API-KEY": config.SERPER_API_KEY}
            # Use the shared session (was bare requests.post) so the call
            # benefits from connection reuse and the configured User-Agent.
            r = self.session.post(url, json=payload, headers=headers, timeout=10)

            if r.status_code != 200:
                return "Erro na API de busca geral."

            data = r.json()
            results = []
            for hit in data.get('organic', [])[:5]:
                title = hit.get('title', '')[:100]
                snippet = hit.get('snippet', '')[:150]
                if title:
                    results.append(f"• {title}: {snippet}")

            if not results:
                result = "Nada encontrado na busca geral."
            else:
                result = "INFORMAÇÕES:\n" + "\n".join(results)

        except Exception as e:
            logger.error(f"Erro Serper: {e}")
            result = "Erro na busca geral."

        self.cache.set(key, result)
        return result

    def pesquisar(self, mensagem: str) -> str:
        """Decide on its own whether the message needs a web lookup and route
        it (no search keywords exposed in the prompt).

        Returns an empty string when no lookup is warranted.
        """
        texto = mensagem.lower()  # lowercase once, reused by both checks

        # Angola-related terms route to the local news scraper.
        if any(w in texto for w in ["angola", "luanda", "notícia", "jornal", "governo", "presidente"]):
            return self._scraping_angola()

        # Generic question words route to the general search API.
        if any(w in texto for w in ["quem é", "o que é", "quando", "onde", "como", "por que", "quanto", "qual"]):
            return self._busca_geral(mensagem)
        return ""