| """Lightweight news sentiment scorer for use during ML stock scanning. |
| |
| This module: |
| 1. Fetches recent news for a given stock symbol via Google News RSS |
| 2. Runs sentiment analysis on each headline+snippet |
| 3. Returns an aggregate sentiment score + details |
| |
| Designed to be fast (< 3 seconds per stock) so it doesn't block the scanner. |
| Uses in-memory caching to avoid duplicate RSS fetches within the same scan run. |
| """ |
|
|
from __future__ import annotations

import logging
import re
import time
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote_plus

import requests
|
|
# Module-level logger, named after this module per the logging convention.
logger = logging.getLogger(__name__)
|
|
| |
| |
# Positive-sentiment vocabulary: Turkish financial terms plus a smaller
# English set.  Tokens are matched lowercased, either exactly or after
# naive Turkish suffix stripping (see _match_word below).
POSITIVE_WORDS = {
    # Turkish -- upward movement, gains, strength
    'yükseliş', 'yükseldi', 'yükseliyor', 'yükselişe', 'yükselişte',
    'arttı', 'artış', 'artıyor', 'artıda', 'artacak',
    'kazanç', 'kar', 'kâr', 'kazandı', 'kazandırdı', 'kazanıyor', 'kazandıran',
    'başarılı', 'başarı', 'olumlu', 'pozitif', 'güçlü', 'sağlam',
    'iyileşti', 'iyileşiyor', 'iyileşme', 'toparlanma', 'toparlandı',
    'büyüme', 'büyüyor', 'gelişim', 'avantaj', 'fırsat',
    'iyimser', 'parlak', 'rekor', 'zirve', 'zirvede', 'tavan',
    'yeşil', 'prim', 'alım', 'talep', 'al',
    'değer', 'hedef', 'potansiyel', 'beklenti',
    # Turkish -- fundamentals terms treated as positive signals here
    # NOTE(review): 'kar' (profit) also means "snow"; 'net'/'faiz'-style
    # neutral finance words can inflate positives -- accepted tradeoff.
    'temettü', 'dividend', 'bilanço', 'ciro',
    'ihracat', 'net', 'brüt', 'marj',
    'kırıyor', 'aşıyor', 'geçti', 'üzerinde', 'üstünde',

    # English
    'rally', 'surge', 'gain', 'profit', 'bullish', 'upgrade',
    'outperform', 'beat', 'record', 'growth', 'boost', 'strong',
    'rise', 'rises', 'soars', 'buy', 'jump', 'high',
}
|
|
# Negative-sentiment vocabulary: Turkish financial terms plus a smaller
# English set.  Tokens are matched lowercased, either exactly or after
# naive Turkish suffix stripping (see _match_word below).
# Fix: removed a duplicate 'satış' entry that appeared twice in the
# literal -- sets dedupe silently, so it was dead weight and a
# maintenance trap.
NEGATIVE_WORDS = {
    # Turkish -- downward movement / decline / weakness
    'düşüş', 'düştü', 'düşüyor', 'düşüşte', 'düşecek', 'düşme',
    'azaldı', 'azalma', 'azalıyor', 'geriledi', 'gerileme', 'geriliyor',
    'zarar', 'kayıp', 'kaybetti', 'kaybettirdi', 'kaybediyor', 'kaybeden', 'kaybettiren',
    'başarısız', 'olumsuz', 'negatif', 'zayıf', 'kırılgan',
    'kötüleşti', 'kötüleşiyor', 'kriz', 'risk', 'riskli',
    'tehlike', 'tehlikeli', 'endişe', 'endişeli', 'karamsar',
    'iskonto', 'dip', 'dipte', 'taban', 'kırmızı', 'ekside', 'eksiye',
    'satış', 'baskı', 'darbe', 'çöküş', 'çöktü',
    # Turkish -- corporate / macro distress
    'iflas', 'borç', 'borçlanma', 'temerrüt',
    'ceza', 'soruşturma', 'dava', 'yaptırım',
    'enflasyon', 'devalüasyon', 'faiz',
    'altında', 'altına', 'kırıldı', 'kırdı', 'aşağı', 'aşağıda',
    'sat', 'sattı',
    'alçalan', 'düşen', 'gerileyen', 'zayıflayan',

    # English
    'crash', 'plunge', 'slump', 'drop', 'decline', 'loss',
    'fail', 'bearish', 'downgrade', 'underperform', 'miss',
    'warn', 'warning', 'threat', 'deficit', 'recession', 'sell',
}
|
|
# Modifier words consumed by _analyze_text: an intensifier boosts the
# weight of the NEXT sentiment-bearing word (1.0 -> 1.5); a negator flips
# the polarity of the NEXT word.
# NOTE(review): 'son derece' is a two-word phrase, but the analyzer scans
# single tokens (\w+), so this entry can never match -- confirm intent.
INTENSIFIERS = {'çok', 'aşırı', 'son derece', 'oldukça', 'büyük', 'ciddi', 'önemli', 'resmen'}
NEGATORS = {'değil', 'yok', 'hayır', 'asla', 'hiç', 'hiçbir', 'olmadan'}
|
|
|
|
@dataclass
class NewsSentimentItem:
    """A single news headline together with its computed sentiment."""
    title: str              # headline text (always non-empty)
    source: str             # publisher name; "Google News" when RSS omits it
    published_at: str       # ISO-8601 timestamp, or the raw pubDate if unparseable
    url: str                # article link (may be "")
    sentiment_label: str    # "positive" / "negative" / "neutral"
    sentiment_score: float  # in [-1.0, 1.0], rounded to 3 decimals
|
|
|
|
@dataclass
class StockNewsSentiment:
    """Aggregate sentiment result for a stock (or the whole market)."""
    symbol: str                       # e.g. "THYAO"; "MARKET" for index-wide news
    news_count: int = 0               # number of analyzed news items
    positive_count: int = 0           # items labeled "positive"
    negative_count: int = 0           # items labeled "negative"
    neutral_count: int = 0            # items labeled "neutral"
    avg_sentiment_score: float = 0.0  # mean of item scores, rounded to 3 decimals
    sentiment_label: str = "neutral"  # aggregate label via +/-0.1 thresholds
    signal: str = "TUT"               # Turkish: AL=buy, SAT=sell, TUT=hold
    news_items: List[NewsSentimentItem] = field(default_factory=list)
    error: Optional[str] = None       # set when fetch/analysis raised; else None
|
|
|
|
| |
# In-memory RSS cache: normalized query -> (fetch timestamp, raw XML).
# Avoids duplicate fetches within one scan run.  Entries expire after
# _CACHE_TTL_SEC but are never evicted, so memory grows with the number
# of distinct queries per process lifetime.
_RSS_CACHE: Dict[str, Tuple[float, str]] = {}
_CACHE_TTL_SEC = 600  # 10 minutes
|
|
|
|
| |
| |
| _TR_SUFFIXES = [ |
| 'ları', 'leri', 'ında', 'inde', 'ndan', 'nden', 'ıyla', 'iyle', |
| 'dan', 'den', 'tan', 'ten', 'nda', 'nde', 'yla', 'yle', |
| 'da', 'de', 'ta', 'te', 'la', 'le', 'na', 'ne', |
| 'ın', 'in', 'un', 'ün', 'ı', 'i', 'u', 'ü', |
| ] |
|
|
|
|
| def _match_word(word: str, dictionary: set) -> bool: |
| """Match word against dictionary with Turkish suffix stripping.""" |
| if word in dictionary: |
| return True |
| for sfx in _TR_SUFFIXES: |
| if word.endswith(sfx) and len(word) > len(sfx) + 2: |
| stem = word[:-len(sfx)] |
| if stem in dictionary: |
| return True |
| return False |
|
|
|
|
def _analyze_text(text: str) -> Tuple[float, str]:
    """Score a Turkish/English financial text.

    Returns:
        (score, label): score in [-1, 1] rounded to 3 decimals; label is
        "positive" (score >= 0.1), "negative" (score <= -0.1), else "neutral".
    """
    if not text:
        return 0.0, "neutral"

    tokens = re.findall(r'\b\w+\b', text.lower())
    if not tokens:
        return 0.0, "neutral"

    positive_weight = 0.0
    negative_weight = 0.0
    weight = 1.0       # raised to 1.5 by an intensifier, applies to the next word
    flip_next = False  # set by a negator, flips polarity of the next word

    for token in tokens:
        if token in NEGATORS:
            flip_next = True
            continue
        if token in INTENSIFIERS:
            weight = 1.5
            continue

        hit_pos = _match_word(token, POSITIVE_WORDS)
        hit_neg = _match_word(token, NEGATIVE_WORDS)

        if flip_next:
            # "yükseliş değil" style negation: swap polarity once, then reset.
            hit_pos, hit_neg = hit_neg, hit_pos
            flip_next = False

        if hit_pos:
            positive_weight += weight
        elif hit_neg:
            negative_weight += weight

        # Modifiers only reach across one token; reset after every word.
        weight = 1.0

    total = positive_weight + negative_weight
    if total == 0:
        return 0.0, "neutral"

    # Raw polarity in [-1, 1], damped toward 0 when only a small fraction
    # of the words carried any sentiment at all.
    polarity = (positive_weight - negative_weight) / total
    coverage = min(total / len(tokens), 1.0)
    score = polarity * (0.5 + 0.5 * coverage)
    score = max(-1.0, min(1.0, score))

    if score >= 0.1:
        label = "positive"
    elif score <= -0.1:
        label = "negative"
    else:
        label = "neutral"

    return round(score, 3), label
|
|
|
|
| |
| _TAG_RE = re.compile(r'<[^>]+>') |
|
|
|
|
| def _strip_html(text: str) -> str: |
| if not text: |
| return "" |
| cleaned = _TAG_RE.sub(' ', text) |
| cleaned = unescape(cleaned) |
| cleaned = re.sub(r'\s+', ' ', cleaned).strip() |
| return cleaned |
|
|
|
|
def _fetch_rss(query: str, timeout: int = 8) -> str:
    """Fetch Google News RSS XML for *query*, with in-memory caching.

    Args:
        query: Search query (URL-encoded before use).
        timeout: HTTP timeout in seconds.

    Returns:
        Raw RSS XML text, or "" on any fetch error -- news is a best-effort
        signal and must never crash the scanner.

    Fixes: hoisted the function-level ``import time`` to module scope and
    switched the warning to lazy %-style logging args.
    """
    now = time.time()

    # Normalize so "THYAO hisse" and "thyao hisse " share one cache entry.
    cache_key = query.strip().lower()
    cached = _RSS_CACHE.get(cache_key)
    if cached and (now - cached[0]) < _CACHE_TTL_SEC:
        return cached[1]

    # hl/gl/ceid pin the feed to Turkish-language, Turkey-region results.
    url = (
        f"https://news.google.com/rss/search?q={quote_plus(query)}"
        f"&hl=tr-TR&gl=TR&ceid=TR:tr"
    )

    try:
        resp = requests.get(
            url,
            timeout=timeout,
            headers={
                'User-Agent': 'borsa-scanner/1.0',
                'Accept': 'application/rss+xml,application/xml,text/xml,*/*',
            }
        )
        resp.raise_for_status()
        xml_text = resp.text
        # Only successful responses are cached; failures fall through so
        # the next call retries instead of pinning an empty payload.
        _RSS_CACHE[cache_key] = (now, xml_text)
        return xml_text
    except Exception as e:
        # Broad catch is deliberate at this boundary: log and degrade to "".
        logger.warning("Google News RSS fetch failed for '%s': %s", query, e)
        return ""
|
|
|
|
def _parse_rss_items(xml_text: str, symbol: Optional[str], limit: int) -> List[NewsSentimentItem]:
    """Parse RSS XML and return sentiment-analyzed news items.

    Args:
        xml_text: Raw RSS XML; "" (e.g. from a failed fetch) yields [].
        symbol: Stock symbol; currently unused in this body -- kept for
            interface stability (presumably future relevance filtering).
        limit: Maximum number of items to return.

    Returns:
        Up to *limit* NewsSentimentItem entries; [] on empty/invalid XML
        or a feed without a <channel> element.
    """
    if not xml_text:
        return []

    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        # Malformed feed: treat as "no news" rather than failing the scan.
        return []

    channel = root.find("channel")
    if channel is None:
        return []

    items: List[NewsSentimentItem] = []

    for item_el in channel.findall("item"):
        if len(items) >= limit:
            break

        title = (item_el.findtext("title") or "").strip()
        link = (item_el.findtext("link") or "").strip()
        pub_date = (item_el.findtext("pubDate") or "").strip()
        desc_raw = (item_el.findtext("description") or "").strip()
        content = _strip_html(desc_raw)

        # Google News items carry a <source> element naming the publisher.
        source_el = item_el.find("source")
        source = (source_el.text or "").strip() if source_el is not None else "Google News"

        # Titleless items are useless for sentiment; skip without counting
        # against the limit.
        if not title:
            continue

        # pubDate is RFC-822-style ("Wed, 05 Jun 2024 07:00:00 GMT").
        # Try the naive %Z form first (matches GMT/UTC names), then the
        # numeric-offset %z form; fall back to the raw string so callers
        # always get something displayable.
        published_at = ""
        try:
            dt = datetime.strptime(pub_date, "%a, %d %b %Y %H:%M:%S %Z")
            published_at = dt.isoformat()
        except Exception:
            try:
                dt = datetime.strptime(pub_date, "%a, %d %b %Y %H:%M:%S %z")
                published_at = dt.isoformat()
            except Exception:
                published_at = pub_date

        # Sentiment is computed over headline + cleaned snippet together.
        combined = f"{title} {content}".strip()
        score, label = _analyze_text(combined)

        items.append(NewsSentimentItem(
            title=title,
            source=source,
            published_at=published_at,
            url=link,
            sentiment_label=label,
            sentiment_score=score,
        ))

    return items
|
|
|
|
| |
def get_stock_sentiment(symbol: str, max_news: int = 8) -> StockNewsSentiment:
    """Fetch news for a stock and compute aggregate sentiment.

    Main entry point for the ML scanner integration; typically completes
    in 1-3 seconds per stock (RSS responses are cached in-process).

    Args:
        symbol: Stock symbol (e.g., "THYAO", "EREGL")
        max_news: Maximum number of news items to fetch

    Returns:
        StockNewsSentiment with aggregate score, signal, and individual items
    """
    sym = (symbol or "").strip().upper()
    if not sym:
        return StockNewsSentiment(symbol=sym, error="empty_symbol")

    result = StockNewsSentiment(symbol=sym)

    try:
        # Primary query: symbol combined with Turkish market keywords.
        items = _parse_rss_items(
            _fetch_rss(f"{sym} hisse OR {sym} BIST OR {sym} borsa", timeout=8),
            sym,
            max_news,
        )

        # Simpler fallback query when the first one found nothing.
        if not items:
            items = _parse_rss_items(
                _fetch_rss(f"{sym} hisse senedi", timeout=6),
                sym,
                max_news,
            )

        result.news_items = items
        result.news_count = len(items)

        if not items:
            result.signal = "TUT"
            return result

        # Tally per-item labels.
        labels = [entry.sentiment_label for entry in items]
        result.positive_count = labels.count("positive")
        result.negative_count = labels.count("negative")
        result.neutral_count = len(labels) - result.positive_count - result.negative_count

        # Mean item score drives both the aggregate label and the signal.
        result.avg_sentiment_score = round(
            sum(entry.sentiment_score for entry in items) / len(items), 3
        )

        if result.avg_sentiment_score >= 0.1:
            result.sentiment_label = "positive"
        elif result.avg_sentiment_score <= -0.1:
            result.sentiment_label = "negative"
        else:
            result.sentiment_label = "neutral"

        # Trading signal needs both a strong average and a clear label
        # majority (more than double the opposite side).
        if result.avg_sentiment_score >= 0.25 and result.positive_count > result.negative_count * 2:
            result.signal = "AL"
        elif result.avg_sentiment_score <= -0.25 and result.negative_count > result.positive_count * 2:
            result.signal = "SAT"
        else:
            result.signal = "TUT"

    except Exception as e:
        logger.error(f"News sentiment error for {sym}: {e}")
        result.error = str(e)

    return result
|
|
|
|
def get_market_sentiment(max_news: int = 15) -> StockNewsSentiment:
    """Get overall market sentiment from general borsa news."""
    result = StockNewsSentiment(symbol="MARKET")

    try:
        # Broad index-level query; no per-symbol filtering.
        feed_xml = _fetch_rss("borsa istanbul BIST endeks", timeout=10)
        items = _parse_rss_items(feed_xml, None, max_news)

        result.news_items = items
        result.news_count = len(items)

        # Tally per-item labels.
        for entry in items:
            if entry.sentiment_label == "positive":
                result.positive_count += 1
            elif entry.sentiment_label == "negative":
                result.negative_count += 1
            else:
                result.neutral_count += 1

        # With no items the defaults (0.0 / "neutral") stand.
        if items:
            mean_score = sum(entry.sentiment_score for entry in items) / len(items)
            result.avg_sentiment_score = round(mean_score, 3)

        if result.avg_sentiment_score >= 0.1:
            result.sentiment_label = "positive"
        elif result.avg_sentiment_score <= -0.1:
            result.sentiment_label = "negative"
        else:
            result.sentiment_label = "neutral"

    except Exception as e:
        logger.error(f"Market sentiment error: {e}")
        result.error = str(e)

    return result
|
|