AlphaV15-Quant-Engine / sentiment_engine.py
Nexo-S's picture
Update sentiment_engine.py
e76b09c verified
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import aiohttp
import asyncio
import socket
# Initialisation de l'analyseur de texte
analyzer = SentimentIntensityAnalyzer()
# --- OPTIMISATION DU LEXIQUE CRYPTO ---
crypto_lexicon = {
'moon': 4.0, 'bullish': 3.5, 'long': 2.0, 'pump': 3.0, 'breakout': 2.5,
'dump': -4.0, 'bearish': -3.5, 'short': -2.0, 'rug': -5.0, 'scam': -4.5,
'crash': -4.0, 'rekt': -3.5, 'dip': -1.0, 'fud': -2.5, 'halt': -2.0
}
analyzer.lexicon.update(crypto_lexicon)
async def fetch_cryptopanic(session, coin):
API_KEY = "6388e0c06c9ea848afee62ffe6a3dc9f1022e7ad"
# 🛠️ FIX : On enlève "filter=important" pour avoir un gros volume de news, on met "kind=news"
url = f"https://cryptopanic.com/api/v1/posts/?auth_token={API_KEY}&currencies={coin}&kind=news"
try:
async with session.get(url, timeout=5) as resp:
if resp.status == 200:
data = await resp.json()
results = data.get('results', [])
if results:
return " ".join([str(post.get('title', '')) for post in results[:15]])
return ""
except Exception as e:
print(f"⚠️ Erreur CryptoPanic : {e}")
return ""
async def fetch_fear_and_greed(session):
"""🧠 NOUVEAU : Récupère l'humeur globale du marché Crypto (0 à 100)"""
url = "https://api.alternative.me/fng/"
try:
async with session.get(url, timeout=5) as resp:
if resp.status == 200:
data = await resp.json()
value = int(data['data'][0]['value'])
return value / 100.0 # Convertit par ex "75" (Greed) en 0.75
except Exception as e:
print(f"⚠️ Erreur Fear&Greed : {e}")
return 0.5 # Neutre en cas de panne mondiale
async def get_crypto_sentiment(symbol):
"""Fusionne CryptoPanic et le Fear & Greed Index"""
try:
base_coin = symbol.split('/')[0]
# 🛡️ FIX DNS : Protection anti-crash Cloud (comme dans app.py)
connector = aiohttp.TCPConnector(family=socket.AF_INET)
async with aiohttp.ClientSession(connector=connector) as session:
# Lancement des deux scans en parallèle
news_text, fng_score = await asyncio.gather(
fetch_cryptopanic(session, base_coin),
fetch_fear_and_greed(session)
)
# Si aucune news n'est trouvée pour cette crypto spécifique
if not news_text.strip():
# On ne renvoie plus 50% ! On renvoie l'humeur générale du marché (Fear & Greed)
return round(fng_score, 3)
# Si on a des news, on les analyse avec VADER
vs = analyzer.polarity_scores(news_text)
news_score = (vs['compound'] + 1) / 2
# ⚖️ PONDÉRATION : 60% News spécifiques / 40% Tendance Macro
final_score = (news_score * 0.6) + (fng_score * 0.4)
return round(final_score, 3)
except Exception as e:
print(f"⚠️ Erreur Sentiment Globale: {e}")
return 0.5