"""
Unified News Caching System
Centralized cache manager for Twitter, Reddit, RSS, and AI/Tech news feeds
"""
|
|
import hashlib
import logging
import re
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import pandas as pd
|
|
logger = logging.getLogger(__name__)


class NewsCacheManager:
    """
    Centralized in-memory cache manager for news feeds with:
    - Per-source caching with TTL
    - Cross-service deduplication
    - Filtered results caching
    - Force refresh support

    All state lives in ``self.cache``: one entry per name in ``SOURCES``
    (keys ``raw_news``, ``last_fetch``, ``ttl``) plus the shared
    ``dedup_index`` and ``filtered_cache`` dicts.
    """

    # Every feed that owns a per-source cache entry. Single source of truth
    # for validation, clearing and statistics.
    SOURCES = (
        'twitter',
        'reddit',
        'rss',
        'ai_tech',
        'predictions',
        'sectoral_news',
        'market_events',
        'economic_calendar',
    )

    # Compiled once; used by _normalize_text for every hashed item.
    _PUNCTUATION_RE = re.compile(r'[^\w\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, default_ttl: int = 180, filtered_ttl: int = 300):
        """
        Initialize cache manager

        Args:
            default_ttl: Default time-to-live in seconds for each source
                (default: 180 = 3 minutes)
            filtered_ttl: Time-to-live in seconds for filtered results
                (default: 300 = 5 minutes)
        """
        self.default_ttl = default_ttl
        self.filtered_ttl = filtered_ttl
        self.cache: Dict[str, Any] = {
            src: self._new_source_entry() for src in self.SOURCES
        }
        self.cache['dedup_index'] = {}
        self.cache['filtered_cache'] = {}
        logger.info("NewsCacheManager initialized with %ss TTL", default_ttl)

    def _new_source_entry(self) -> Dict[str, Any]:
        """Return an empty per-source cache entry using the configured TTL."""
        return {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}

    def get_news(
        self,
        source: str,
        fetcher_func: Callable,
        force_refresh: bool = False,
        deduplicate: bool = False,
        **kwargs
    ) -> List[Dict]:
        """
        Get news from cache or fetch fresh if needed

        Args:
            source: News source; must be one of ``SOURCES``
            fetcher_func: Function to fetch fresh news
            force_refresh: If True, bypass cache and fetch fresh
            deduplicate: If True, remove duplicates across sources using
                the global index. Applied on every return path (fresh
                fetch, cache hit, and fallback to cached data)
            **kwargs: Arguments to pass to fetcher_func

        Returns:
            List of news items. An unknown source yields an empty list; a
            failed or empty fetch yields whatever is currently cached.
        """
        if source not in self.SOURCES:
            logger.error("Invalid source: %s", source)
            return []

        if not force_refresh and self._is_cache_valid(source):
            logger.info(
                "Cache HIT for %s (age: %.1fs)",
                source,
                self._get_cache_age(source),
            )
            return self._serve_cached(source, deduplicate)

        logger.info("Cache MISS for %s - fetching fresh news...", source)
        try:
            logger.info("Calling fetcher for %s with kwargs: %s", source, kwargs)
            new_items = fetcher_func(**kwargs)
            logger.info(
                "Fetcher returned %d items for %s",
                len(new_items) if new_items else 0,
                source,
            )

            if not new_items:
                logger.warning(
                    "No news items fetched for %s - returning cached data",
                    source,
                )
                return self._serve_cached(source, deduplicate)

            self._prepare_summaries(new_items)

            # Drop this source's old registrations only once a fetch has
            # succeeded, so a failed refresh leaves the index in step with
            # the cached items that are still being served.
            self._clear_source_from_dedup(source)
            self._update_cache(source, new_items)

            if deduplicate:
                deduplicated = self._deduplicate(new_items, source)
                logger.info(
                    "Fetched %d items for %s, %d unique after dedup",
                    len(new_items),
                    source,
                    len(deduplicated),
                )
                return deduplicated

            logger.info(
                "Fetched %d items for %s (dedup disabled)",
                len(new_items),
                source,
            )
            return new_items

        except Exception as exc:
            # Boundary handler: a broken fetcher must not take the caller
            # down, so log with traceback and fall back to cached data.
            logger.error(
                "Error fetching news for %s: %s", source, exc, exc_info=True
            )
            return self._serve_cached(source, deduplicate)

    def _serve_cached(self, source: str, deduplicate: bool) -> List[Dict]:
        """Return the cached items for source, deduplicated when requested."""
        cached_items = self.cache[source]['raw_news']
        if deduplicate:
            return self._deduplicate(cached_items, source)
        return cached_items

    def _is_cache_valid(self, source: str) -> bool:
        """
        Check if cached data is still fresh

        Args:
            source: News source to check

        Returns:
            True if the source has been fetched and its age is below its
            TTL, False otherwise
        """
        entry = self.cache[source]
        if not entry['last_fetch']:
            return False
        age = (datetime.now() - entry['last_fetch']).total_seconds()
        return age < entry['ttl']

    def _get_cache_age(self, source: str) -> float:
        """
        Get age of cached data in seconds

        Args:
            source: News source

        Returns:
            Age in seconds, or -1 if never fetched
        """
        last_fetch = self.cache[source]['last_fetch']
        if not last_fetch:
            return -1
        return (datetime.now() - last_fetch).total_seconds()

    def _normalize_text(self, text: str) -> str:
        """
        Normalize text for deduplication

        Lower-cases, removes punctuation, collapses whitespace runs to a
        single space and trims the ends.

        Args:
            text: Text to normalize (falsy values normalize to "")

        Returns:
            Normalized text
        """
        if not text:
            return ""
        lowered = str(text).lower()
        without_punctuation = self._PUNCTUATION_RE.sub('', lowered)
        # Strip last: removing punctuation can expose new leading/trailing
        # spaces ("Hello !" -> "hello "), which must not affect the hash.
        return self._WHITESPACE_RE.sub(' ', without_punctuation).strip()

    def _compute_hash(self, item: Dict) -> str:
        """
        Compute content hash for deduplication

        The hash covers the normalized title and the first 200 characters
        of ``summary_raw`` (falling back to ``summary``).

        Args:
            item: News item dict

        Returns:
            MD5 hash string
        """
        title = self._normalize_text(item.get('title', ''))
        summary_source = item.get('summary_raw', item.get('summary', ''))
        # `or ''` keeps a None summary from hashing as the text "None".
        summary = self._normalize_text(str(summary_source or '')[:200])
        combined = f"{title}|{summary}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _deduplicate(self, items: List[Dict], source: str) -> List[Dict]:
        """
        Remove duplicates using global dedup index

        An item is kept when its hash is new, or when this source is the
        first source recorded for that hash. It is dropped when another
        source registered it first, or when it repeats an earlier item in
        the same list.

        Args:
            items: List of news items
            source: Source name

        Returns:
            Deduplicated list of items
        """
        index = self.cache['dedup_index']
        unique_items: List[Dict] = []
        seen_in_batch = set()
        duplicate_count = 0

        for item in items:
            content_hash = self._compute_hash(item)

            if content_hash in seen_in_batch:
                duplicate_count += 1
                continue
            seen_in_batch.add(content_hash)

            entry = index.get(content_hash)
            if entry is None:
                index[content_hash] = {
                    'first_seen': datetime.now(),
                    'sources': [source],
                    'canonical_item': item,
                }
                unique_items.append(item)
            elif entry['sources'] and entry['sources'][0] == source:
                # This source registered the item first; serving it again
                # (cache hit or refresh) is not a duplicate.
                unique_items.append(item)
            else:
                if source not in entry['sources']:
                    entry['sources'].append(source)
                duplicate_count += 1

        if duplicate_count > 0:
            logger.info(
                "Deduplication: Found %d duplicates for %s",
                duplicate_count,
                source,
            )

        return unique_items

    def _update_cache(self, source: str, items: List[Dict]):
        """
        Update cache with new items

        Also drops filtered results cached under this source's name, since
        they were computed from the data being replaced.

        Args:
            source: News source
            items: List of news items
        """
        self.cache[source]['raw_news'] = items
        self.cache[source]['last_fetch'] = datetime.now()
        self._invalidate_filtered(source)
        logger.info("Updated cache for %s with %d items", source, len(items))

    def _invalidate_filtered(self, source: str):
        """
        Drop filtered-cache entries whose key starts with ``"<source>_"``

        Best effort: this only matches entries stored by get_filtered_news
        with ``source_name`` equal to the source key.
        """
        prefix = f"{source}_"
        filtered_cache = self.cache['filtered_cache']
        for key in [k for k in filtered_cache if k.startswith(prefix)]:
            del filtered_cache[key]

    def _prepare_summaries(self, items: List[Dict]):
        """
        Ensure every item has a ``summary_raw`` key

        Items lacking it get a copy of ``summary`` (or "" when that is
        missing too). Items are modified in place.

        Args:
            items: List of news items
        """
        for item in items:
            if 'summary_raw' not in item:
                item['summary_raw'] = item.get('summary', '')

    def get_filtered_news(
        self,
        source_df: pd.DataFrame,
        filters: Dict,
        source_name: str = "unknown"
    ) -> pd.DataFrame:
        """
        Get filtered news with caching

        Results are cached per (source_name, category, sentiment, impact)
        for ``filtered_ttl`` seconds. A cached result is reused only if
        the source dataframe still has the shape it had when the result
        was computed.

        Args:
            source_df: Source dataframe
            filters: Filter dict with 'category', 'sentiment', 'impact'
                keys; a missing key or the value 'all' disables that filter
            source_name: Name of source (used in the cache key and logging)

        Returns:
            Filtered dataframe (the input itself when it is empty)

        Raises:
            KeyError: If an active filter names a column that source_df
                does not have
        """
        if source_df.empty:
            return source_df

        filters = filters or {}
        category = filters.get('category', 'all')
        sentiment = filters.get('sentiment', 'all')
        impact = filters.get('impact', 'all')
        cache_key = f"{source_name}_{category}_{sentiment}_{impact}"

        now = datetime.now()
        cached_entry = self.cache['filtered_cache'].get(cache_key)
        if (
            cached_entry is not None
            and now < cached_entry['expires_at']
            # Cheap staleness guard: a different shape means the cached
            # result was computed from different data.
            and cached_entry.get('source_shape') == source_df.shape
        ):
            logger.debug("Filtered cache HIT for %s", cache_key)
            return cached_entry['results']

        filtered_df = source_df.copy()
        active_filters = (
            ('category', category),
            ('sentiment', sentiment),
            ('impact', impact),
        )
        for column, wanted in active_filters:
            if wanted != 'all':
                filtered_df = filtered_df[filtered_df[column] == wanted]

        logger.debug(
            "Filtered %s: %d -> %d items",
            source_name,
            len(source_df),
            len(filtered_df),
        )

        self.cache['filtered_cache'][cache_key] = {
            'results': filtered_df,
            'expires_at': now + timedelta(seconds=self.filtered_ttl),
            'source_shape': source_df.shape,
        }

        return filtered_df

    def _clear_source_from_dedup(self, source: str):
        """
        Remove this source from the dedup index

        The source is removed from every entry that lists it; entries left
        with no sources are deleted from the index.

        Args:
            source: Source to remove from dedup index
        """
        index = self.cache['dedup_index']
        orphaned = []
        for content_hash, entry in index.items():
            if source in entry['sources']:
                entry['sources'].remove(source)
                if not entry['sources']:
                    orphaned.append(content_hash)

        # Delete after the loop: the dict must not change size while it
        # is being iterated.
        for content_hash in orphaned:
            del index[content_hash]

        if orphaned:
            logger.info(
                "Removed %d entries from dedup index for %s",
                len(orphaned),
                source,
            )

    def clear_cache(self, source: Optional[str] = None):
        """
        Clear cache for specific source or all sources

        Cleared sources are reset to an empty entry with the TTL given to
        the constructor.

        Args:
            source: Source to clear, or None to clear all. A name that is
                not in ``SOURCES`` is logged as an error and ignored.
        """
        if not source:
            for src in self.SOURCES:
                self.cache[src] = self._new_source_entry()
            self.cache['dedup_index'] = {}
            self.cache['filtered_cache'] = {}
            logger.info("Cleared ALL caches")
            return

        if source not in self.SOURCES:
            # Guard the shared 'dedup_index' / 'filtered_cache' keys from
            # being overwritten with a source-shaped entry.
            logger.error("Invalid source: %s", source)
            return

        self.cache[source] = self._new_source_entry()
        self._clear_source_from_dedup(source)
        self._invalidate_filtered(source)
        logger.info("Cleared cache for %s", source)

    def get_statistics(self) -> Dict:
        """
        Get cache statistics

        Returns:
            Dict with one ``{'items', 'age_seconds', 'is_valid'}`` entry
            per source, plus 'dedup_index_size' and 'filtered_cache_size'
        """
        stats: Dict[str, Any] = {
            src: {
                'items': len(self.cache[src]['raw_news']),
                'age_seconds': self._get_cache_age(src),
                'is_valid': self._is_cache_valid(src),
            }
            for src in self.SOURCES
        }
        stats['dedup_index_size'] = len(self.cache['dedup_index'])
        stats['filtered_cache_size'] = len(self.cache['filtered_cache'])
        return stats
|
|