# borsa/data/news_sentiment.py
# (web-page residue removed: commit c3c81a6 "fix: add ai/sentiment_model.py facade")
"""Lightweight news sentiment scorer for use during ML stock scanning.
This module:
1. Fetches recent news for a given stock symbol via Google News RSS
2. Runs sentiment analysis on each headline+snippet
3. Returns an aggregate sentiment score + details
Designed to be fast (< 3 seconds per stock) so it doesn't block the scanner.
Uses in-memory caching to avoid duplicate RSS fetches within the same scan run.
"""
from __future__ import annotations
import logging
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote_plus
import re
import requests
logger = logging.getLogger(__name__)
# ── Word-based Turkish sentiment lexicon ─────────────────────────
# VADER/TextBlob perform poorly on Turkish, so we use a finance-specific
# Turkish (plus English) word list instead.
POSITIVE_WORDS = {
    'yükseliş', 'yükseldi', 'yükseliyor', 'yükselişe', 'yükselişte',
    'arttı', 'artış', 'artıyor', 'artıda', 'artacak',
    'kazanç', 'kar', 'kâr', 'kazandı', 'kazandırdı', 'kazanıyor', 'kazandıran',
    'başarılı', 'başarı', 'olumlu', 'pozitif', 'güçlü', 'sağlam',
    'iyileşti', 'iyileşiyor', 'iyileşme', 'toparlanma', 'toparlandı',
    'büyüme', 'büyüyor', 'gelişim', 'avantaj', 'fırsat',
    'iyimser', 'parlak', 'rekor', 'zirve', 'zirvede', 'tavan',
    'yeşil', 'prim', 'alım', 'talep', 'al',
    'değer', 'hedef', 'potansiyel', 'beklenti',
    'temettü', 'dividend', 'bilanço', 'ciro',
    'ihracat', 'net', 'brüt', 'marj',
    'kırıyor', 'aşıyor', 'geçti', 'üzerinde', 'üstünde',
    # English finance terms
    'rally', 'surge', 'gain', 'profit', 'bullish', 'upgrade',
    'outperform', 'beat', 'record', 'growth', 'boost', 'strong',
    'rise', 'rises', 'soars', 'buy', 'jump', 'high',
}
NEGATIVE_WORDS = {
    'düşüş', 'düştü', 'düşüyor', 'düşüşte', 'düşecek', 'düşme',
    'azaldı', 'azalma', 'azalıyor', 'geriledi', 'gerileme', 'geriliyor',
    'zarar', 'kayıp', 'kaybetti', 'kaybettirdi', 'kaybediyor', 'kaybeden', 'kaybettiren',
    'başarısız', 'olumsuz', 'negatif', 'zayıf', 'kırılgan',
    'kötüleşti', 'kötüleşiyor', 'kriz', 'risk', 'riskli',
    'tehlike', 'tehlikeli', 'endişe', 'endişeli', 'karamsar',
    'iskonto', 'dip', 'dipte', 'taban', 'kırmızı', 'ekside', 'eksiye',
    'satış', 'baskı', 'darbe', 'çöküş', 'çöktü',
    'iflas', 'borç', 'borçlanma', 'temerrüt',
    'ceza', 'soruşturma', 'dava', 'yaptırım',
    'enflasyon', 'devalüasyon', 'faiz',
    'altında', 'altına', 'kırıldı', 'kırdı', 'aşağı', 'aşağıda',
    # 'satış' was listed twice in the original literal; deduplicated (set value unchanged)
    'sat', 'sattı',
    'alçalan', 'düşen', 'gerileyen', 'zayıflayan',
    # English finance terms
    'crash', 'plunge', 'slump', 'drop', 'decline', 'loss',
    'fail', 'bearish', 'downgrade', 'underperform', 'miss',
    'warn', 'warning', 'threat', 'deficit', 'recession', 'sell',
}
# Amplify the weight of the next sentiment word (x1.5 in _analyze_text).
# NOTE(review): 'son derece' is two words; the single-word tokenizer can
# never produce it as one token — TODO confirm and split or drop it.
INTENSIFIERS = {'çok', 'aşırı', 'son derece', 'oldukça', 'büyük', 'ciddi', 'önemli', 'resmen'}
# Flip the polarity of the next sentiment word.
NEGATORS = {'değil', 'yok', 'hayır', 'asla', 'hiç', 'hiçbir', 'olmadan'}
@dataclass
class NewsSentimentItem:
    """Single news item with its sentiment score and label."""
    title: str              # headline text from the RSS <title> element
    source: str             # publisher from the RSS <source> element, or "Google News"
    published_at: str       # ISO-8601 timestamp when parseable, else the raw pubDate string
    url: str                # article link from the RSS <link> element
    sentiment_label: str    # "positive" | "negative" | "neutral"
    sentiment_score: float  # -1 to 1
@dataclass
class StockNewsSentiment:
    """Aggregate sentiment result for a stock (or the whole market)."""
    symbol: str                       # stock symbol, or "MARKET" for index-wide sentiment
    news_count: int = 0               # number of news items analyzed
    positive_count: int = 0           # items labeled "positive"
    negative_count: int = 0           # items labeled "negative"
    neutral_count: int = 0            # items labeled "neutral"
    avg_sentiment_score: float = 0.0  # mean of item scores, -1 to 1
    sentiment_label: str = "neutral"  # overall label derived from the average score
    signal: str = "TUT"               # trading hint: "AL" (buy) | "TUT" (hold) | "SAT" (sell)
    news_items: List[NewsSentimentItem] = field(default_factory=list)
    error: Optional[str] = None       # set (never raised) when fetch/analysis fails
# ── In-memory cache ──────────────────────────────────────────────
# Maps normalized query string -> (fetch timestamp, raw RSS XML).
# Avoids duplicate Google News requests within a single scan run.
_RSS_CACHE: Dict[str, Tuple[float, str]] = {}
_CACHE_TTL_SEC = 600  # 10 minutes
# ── Core sentiment analysis ──────────────────────────────────────
# Turkish suffix list for stem matching
_TR_SUFFIXES = [
'ları', 'leri', 'ında', 'inde', 'ndan', 'nden', 'ıyla', 'iyle',
'dan', 'den', 'tan', 'ten', 'nda', 'nde', 'yla', 'yle',
'da', 'de', 'ta', 'te', 'la', 'le', 'na', 'ne',
'ın', 'in', 'un', 'ün', 'ı', 'i', 'u', 'ü',
]
def _match_word(word: str, dictionary: set) -> bool:
"""Match word against dictionary with Turkish suffix stripping."""
if word in dictionary:
return True
for sfx in _TR_SUFFIXES:
if word.endswith(sfx) and len(word) > len(sfx) + 2:
stem = word[:-len(sfx)]
if stem in dictionary:
return True
return False
def _analyze_text(text: str) -> Tuple[float, str]:
    """Score a Turkish/English financial text.

    Returns:
        (score, label) where score is in [-1, 1] (rounded to 3 decimals)
        and label is "positive" | "negative" | "neutral".
    """
    if not text:
        return 0.0, "neutral"
    tokens = re.findall(r'\b\w+\b', text.lower())
    if not tokens:
        return 0.0, "neutral"

    pos_weight = 0.0
    neg_weight = 0.0
    boost = 1.0   # weight multiplier armed by an intensifier word
    flip = False  # polarity inversion armed by a negator word

    for token in tokens:
        if token in NEGATORS:
            flip = True
            continue
        if token in INTENSIFIERS:
            boost = 1.5
            continue
        hit_pos = _match_word(token, POSITIVE_WORDS)
        hit_neg = _match_word(token, NEGATIVE_WORDS)
        if flip:
            # A preceding negator flips the polarity of this word.
            hit_pos, hit_neg = hit_neg, hit_pos
            flip = False
        if hit_pos:
            pos_weight += boost
        elif hit_neg:
            neg_weight += boost
        # Any non-marker token consumes the pending intensifier.
        boost = 1.0

    total = pos_weight + neg_weight
    if total == 0:
        return 0.0, "neutral"

    # Normalized polarity in [-1, 1], dampened toward 0 when only a small
    # fraction of tokens carry sentiment (coverage scaling).
    polarity = (pos_weight - neg_weight) / total
    coverage = min(total / len(tokens), 1.0)
    score = max(-1.0, min(1.0, polarity * (0.5 + 0.5 * coverage)))

    if score >= 0.1:
        return round(score, 3), "positive"
    if score <= -0.1:
        return round(score, 3), "negative"
    return round(score, 3), "neutral"
# ── Google News RSS fetch ────────────────────────────────────────
_TAG_RE = re.compile(r'<[^>]+>')
def _strip_html(text: str) -> str:
if not text:
return ""
cleaned = _TAG_RE.sub(' ', text)
cleaned = unescape(cleaned)
cleaned = re.sub(r'\s+', ' ', cleaned).strip()
return cleaned
def _fetch_rss(query: str, timeout: int = 8) -> str:
    """Fetch Google News RSS XML for *query*, using the in-memory cache.

    Best-effort: any fetch error is logged as a warning and "" is
    returned so the scanner can continue without news data.
    """
    import time

    key = query.strip().lower()
    now = time.time()
    entry = _RSS_CACHE.get(key)
    if entry is not None and (now - entry[0]) < _CACHE_TTL_SEC:
        return entry[1]

    url = (
        f"https://news.google.com/rss/search?q={quote_plus(query)}"
        f"&hl=tr-TR&gl=TR&ceid=TR:tr"
    )
    request_headers = {
        'User-Agent': 'borsa-scanner/1.0',
        'Accept': 'application/rss+xml,application/xml,text/xml,*/*',
    }
    try:
        response = requests.get(url, timeout=timeout, headers=request_headers)
        response.raise_for_status()
        body = response.text
        _RSS_CACHE[key] = (now, body)
        return body
    except Exception as e:
        logger.warning(f"Google News RSS fetch failed for '{query}': {e}")
        return ""
def _parse_rss_items(xml_text: str, symbol: Optional[str], limit: int) -> List[NewsSentimentItem]:
    """Parse Google News RSS XML into sentiment-scored news items.

    Args:
        xml_text: Raw RSS XML; "" (e.g. after a failed fetch) yields [].
        symbol: Accepted for interface compatibility; currently unused.
        limit: Maximum number of items to return.

    Returns:
        Up to *limit* NewsSentimentItem entries; [] on any parse failure.
    """
    if not xml_text:
        return []
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []
    channel = root.find("channel")
    if channel is None:
        return []

    parsed: List[NewsSentimentItem] = []
    for node in channel.findall("item"):
        if len(parsed) >= limit:
            break
        title = (node.findtext("title") or "").strip()
        if not title:
            # Untitled entries are skipped and do not count toward the limit.
            continue
        link = (node.findtext("link") or "").strip()
        snippet = _strip_html((node.findtext("description") or "").strip())
        source_node = node.find("source")
        source = (source_node.text or "").strip() if source_node is not None else "Google News"

        # Normalize pubDate to ISO-8601; fall back to the raw string.
        pub_date = (node.findtext("pubDate") or "").strip()
        published_at = pub_date
        for fmt in ("%a, %d %b %Y %H:%M:%S %Z", "%a, %d %b %Y %H:%M:%S %z"):
            try:
                published_at = datetime.strptime(pub_date, fmt).isoformat()
                break
            except Exception:
                continue

        # Sentiment is computed over headline + description snippet.
        score, label = _analyze_text(f"{title} {snippet}".strip())
        parsed.append(NewsSentimentItem(
            title=title,
            source=source,
            published_at=published_at,
            url=link,
            sentiment_label=label,
            sentiment_score=score,
        ))
    return parsed
# ── Public API ───────────────────────────────────────────────────
def get_stock_sentiment(symbol: str, max_news: int = 8) -> StockNewsSentiment:
    """Fetch recent news for *symbol* and compute an aggregate sentiment.

    Main entry point for the ML scanner integration; typically completes
    in 1-3 seconds per stock.

    Args:
        symbol: Stock symbol (e.g., "THYAO", "EREGL").
        max_news: Maximum number of news items to fetch.

    Returns:
        StockNewsSentiment with the aggregate score, an AL/TUT/SAT
        signal, and the individual scored items. Failures are reported
        via the ``error`` field rather than raised.
    """
    sym = (symbol or "").strip().upper()
    if not sym:
        return StockNewsSentiment(symbol=sym, error="empty_symbol")

    result = StockNewsSentiment(symbol=sym)
    try:
        # Primary query; fall back to a broader one if it finds nothing.
        items = _parse_rss_items(
            _fetch_rss(f"{sym} hisse OR {sym} BIST OR {sym} borsa", timeout=8),
            sym,
            max_news,
        )
        if not items:
            items = _parse_rss_items(
                _fetch_rss(f"{sym} hisse senedi", timeout=6),
                sym,
                max_news,
            )

        result.news_items = items
        result.news_count = len(items)
        if not items:
            result.signal = "TUT"
            return result

        positives = sum(1 for it in items if it.sentiment_label == "positive")
        negatives = sum(1 for it in items if it.sentiment_label == "negative")
        result.positive_count = positives
        result.negative_count = negatives
        result.neutral_count = len(items) - positives - negatives

        result.avg_sentiment_score = round(
            sum(it.sentiment_score for it in items) / len(items), 3
        )
        avg = result.avg_sentiment_score
        if avg >= 0.1:
            result.sentiment_label = "positive"
        elif avg <= -0.1:
            result.sentiment_label = "negative"
        else:
            result.sentiment_label = "neutral"

        # A signal needs both a strong average score and a clear majority
        # of items on that side (more than double the opposite count).
        if avg >= 0.25 and positives > negatives * 2:
            result.signal = "AL"
        elif avg <= -0.25 and negatives > positives * 2:
            result.signal = "SAT"
        else:
            result.signal = "TUT"
    except Exception as e:
        logger.error(f"News sentiment error for {sym}: {e}")
        result.error = str(e)
    return result
def get_market_sentiment(max_news: int = 15) -> StockNewsSentiment:
    """Compute overall market sentiment from general Borsa Istanbul news.

    Returns a StockNewsSentiment with symbol "MARKET"; the ``signal``
    field keeps its default. Failures set ``error`` instead of raising.
    """
    result = StockNewsSentiment(symbol="MARKET")
    try:
        xml = _fetch_rss("borsa istanbul BIST endeks", timeout=10)
        items = _parse_rss_items(xml, None, max_news)
        result.news_items = items
        result.news_count = len(items)

        positives = sum(1 for it in items if it.sentiment_label == "positive")
        negatives = sum(1 for it in items if it.sentiment_label == "negative")
        result.positive_count = positives
        result.negative_count = negatives
        result.neutral_count = len(items) - positives - negatives

        if items:
            avg = round(sum(it.sentiment_score for it in items) / len(items), 3)
            result.avg_sentiment_score = avg
            if avg >= 0.1:
                result.sentiment_label = "positive"
            elif avg <= -0.1:
                result.sentiment_label = "negative"
            else:
                result.sentiment_label = "neutral"
    except Exception as e:
        logger.error(f"Market sentiment error: {e}")
        result.error = str(e)
    return result