import os import requests from loguru import logger from typing import List, Dict, Any class TwitterAPI: """ Integração simples com Twitter API v2 para busca de 'tretas' e 'mitadas'. """ def __init__(self, bearer_token: str = None): self.bearer_token = bearer_token or os.getenv("TWITTER_BEARER_TOKEN") self.base_url = "https://api.twitter.com/2" def search_tweets(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]: """ Busca tweets recentes com base em uma query. """ if not self.bearer_token: logger.warning("⚠️ TWITTER_BEARER_TOKEN não configurado.") return [] headers = { "Authorization": f"Bearer {self.bearer_token}", "User-Agent": "v2RecentSearchPython" } params = { "query": f"{query} lang:pt -is:retweet", "max_results": max_results, "tweet.fields": "text,public_metrics,created_at" } try: response = requests.get(f"{self.base_url}/tweets/search/recent", headers=headers, params=params) if response.status_code == 200: data = response.json() return data.get("data", []) else: logger.error(f"❌ Erro Twitter API ({response.status_code}): {response.text}") return [] except Exception as e: logger.error(f"❌ Falha ao buscar tweets: {e}") return [] def get_savage_context(self, topic: str) -> str: """ Busca exemplos de 'mitadas' ou discussões acaloradas sobre um tema. """ queries = [ f"{topic} mita", f"{topic} treta", f"{topic} cancelado", f"{topic} 'na cara'", f"{topic} 'jantou'" ] all_tweets = [] for q in queries[:2]: # Tenta as 2 primeiras queries para economizar cota tweets = self.search_tweets(q, max_results=10) all_tweets.extend(tweets) if len(all_tweets) >= 10: break if not all_tweets: return "Nenhuma 'treta' recente encontrada no Twitter sobre este assunto." context = "Exemplos de discussões/mitadas no Twitter sobre este assunto:\n" for i, tweet in enumerate(all_tweets[:10]): text = tweet['text'].replace('\n', ' ') context += f"{i+1}. {text}\n" return context # Singleton _instance = None def get_twitter_api(): global _instance if _instance is None: _instance = TwitterAPI() return _instance