|
|
""" |
|
|
Unified News Caching System |
|
|
Centralized cache manager for Twitter, Reddit, RSS, and AI/Tech news feeds |
|
|
""" |
|
|
|
|
|
import hashlib |
|
|
import logging |
|
|
import re |
|
|
import pandas as pd |
|
|
from datetime import datetime, timedelta |
|
|
from typing import List, Dict, Optional, Callable |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class NewsCacheManager:
    """
    Centralized cache manager for news feeds with:
    - Per-source caching with TTL
    - Cross-service deduplication
    - Filtered results caching
    - Force refresh support
    """

    # Single authoritative list of recognized sources; used for validation,
    # cache construction, clearing, and statistics (previously duplicated
    # inline in three places).
    SOURCES = (
        'twitter', 'reddit', 'rss', 'ai_tech',
        'predictions', 'sectoral_news', 'market_events', 'economic_calendar',
    )

    # TTL (seconds) for entries in the filtered-results cache.
    FILTERED_CACHE_TTL = 300

    def __init__(self, default_ttl: int = 180):
        """
        Initialize cache manager.

        Args:
            default_ttl: Default time-to-live in seconds (default: 180 = 3 minutes)
        """
        # Remember the configured TTL so clear_cache() can restore it
        # (previously clear_cache hard-coded 180, silently discarding a
        # custom default_ttl).
        self.default_ttl = default_ttl
        self.cache = {
            source: {'raw_news': [], 'last_fetch': None, 'ttl': default_ttl}
            for source in self.SOURCES
        }
        # content_hash -> {'first_seen', 'sources', 'canonical_item'}
        self.cache['dedup_index'] = {}
        # "{source}_{category}_{sentiment}_{impact}" -> {'results', 'expires_at'}
        self.cache['filtered_cache'] = {}
        logger.info(f"NewsCacheManager initialized with {default_ttl}s TTL")

    def get_news(
        self,
        source: str,
        fetcher_func: Callable,
        force_refresh: bool = False,
        **kwargs
    ) -> List[Dict]:
        """
        Get news from cache or fetch fresh if needed.

        Args:
            source: News source (one of SOURCES, e.g. 'twitter', 'reddit')
            fetcher_func: Function to fetch fresh news
            force_refresh: If True, bypass cache and fetch fresh
            **kwargs: Arguments to pass to fetcher_func

        Returns:
            List of news items (deduplicated against the global index).
            On fetch failure or an empty fetch, the previously cached
            list is returned instead.
        """
        if source not in self.SOURCES:
            logger.error(f"Invalid source: {source}")
            return []

        if force_refresh:
            # Drop this source's fingerprints first so re-fetched items are
            # not rejected as duplicates of their own previous fetch.
            self._clear_source_from_dedup(source)

        if not force_refresh and self._is_cache_valid(source):
            logger.info(f"[HIT] Cache HIT for {source} (age: {self._get_cache_age(source):.1f}s)")
            return self.cache[source]['raw_news']

        logger.info(f"[MISS] Cache MISS for {source} - fetching fresh news...")
        try:
            logger.info(f"Calling fetcher for {source} with kwargs: {kwargs}")
            new_items = fetcher_func(**kwargs)
            logger.info(f"Fetcher returned {len(new_items) if new_items else 0} items for {source}")

            if not new_items:
                logger.warning(f"No news items fetched for {source} - returning cached data")
                return self.cache[source]['raw_news']

            deduplicated = self._deduplicate(new_items, source)
            # Bug fix: cache the DEDUPLICATED list, not the raw one, so a
            # subsequent cache hit returns exactly what this fetch returned
            # (previously hits served the raw list while misses served the
            # deduplicated one).
            self._update_cache(source, deduplicated)

            logger.info(
                f"Fetched {len(new_items)} items for {source}, "
                f"{len(deduplicated)} unique after dedup"
            )
            return deduplicated

        except Exception as e:
            logger.error(f"Error fetching news for {source}: {e}")
            # Best-effort: fall back to whatever is cached rather than raising.
            return self.cache[source]['raw_news']

    def _is_cache_valid(self, source: str) -> bool:
        """
        Check if cached data is still fresh.

        Args:
            source: News source to check

        Returns:
            True if cache is valid, False otherwise
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return False

        age = (datetime.now() - source_cache['last_fetch']).total_seconds()
        return age < source_cache['ttl']

    def _get_cache_age(self, source: str) -> float:
        """
        Get age of cached data in seconds.

        Args:
            source: News source

        Returns:
            Age in seconds, or -1 if never fetched
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return -1

        return (datetime.now() - source_cache['last_fetch']).total_seconds()

    def _normalize_text(self, text: str) -> str:
        """
        Normalize text for deduplication: lowercase, strip punctuation,
        collapse whitespace.

        Args:
            text: Text to normalize

        Returns:
            Normalized text ("" for falsy input)
        """
        if not text:
            return ""

        text = text.lower().strip()
        text = re.sub(r'[^\w\s]', '', text)
        text = re.sub(r'\s+', ' ', text)
        return text

    def _compute_hash(self, item: Dict) -> str:
        """
        Compute content hash for deduplication based on normalized
        title + first 200 chars of summary.

        Args:
            item: News item dict

        Returns:
            MD5 hash string (fingerprint only, not security-sensitive)
        """
        # `or ''` guards against explicit None values (bug fix: None[:200]
        # previously raised TypeError).
        title = self._normalize_text(item.get('title') or '')
        summary = self._normalize_text((item.get('summary') or '')[:200])

        combined = f"{title}|{summary}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _deduplicate(self, items: List[Dict], source: str) -> List[Dict]:
        """
        Remove duplicates using the global dedup index.

        An item already seen (from any source) is dropped; its index entry
        just records this source as an additional origin.

        Args:
            items: List of news items
            source: Source name

        Returns:
            Deduplicated list of items
        """
        deduplicated = []
        duplicate_count = 0

        for item in items:
            content_hash = self._compute_hash(item)

            if content_hash in self.cache['dedup_index']:
                dup_entry = self.cache['dedup_index'][content_hash]
                if source not in dup_entry['sources']:
                    dup_entry['sources'].append(source)
                duplicate_count += 1
            else:
                self.cache['dedup_index'][content_hash] = {
                    'first_seen': datetime.now(),
                    'sources': [source],
                    'canonical_item': item
                }
                deduplicated.append(item)

        if duplicate_count > 0:
            logger.info(f"Deduplication: Found {duplicate_count} duplicates for {source}")

        return deduplicated

    def _update_cache(self, source: str, items: List[Dict]):
        """
        Update cache with new items and stamp the fetch time.

        Args:
            source: News source
            items: List of news items
        """
        self.cache[source]['raw_news'] = items
        self.cache[source]['last_fetch'] = datetime.now()
        logger.info(f"Updated cache for {source} with {len(items)} items")

    def get_filtered_news(
        self,
        source_df: pd.DataFrame,
        filters: Dict,
        source_name: str = "unknown"
    ) -> pd.DataFrame:
        """
        Get filtered news with caching.

        NOTE(review): the cache key does not include a data version, so a
        cached result may be served even after source_df changes, until the
        FILTERED_CACHE_TTL expires — confirm this staleness window is
        acceptable to callers.

        Args:
            source_df: Source dataframe (expects 'category', 'sentiment',
                'impact' columns when the corresponding filter is active)
            filters: Filter dict with 'category', 'sentiment', 'impact' keys
                (missing keys default to 'all' = no filtering)
            source_name: Name of source (for logging and cache keying)

        Returns:
            Filtered dataframe
        """
        if source_df.empty:
            return source_df

        category = filters.get('category', 'all')
        sentiment = filters.get('sentiment', 'all')
        impact = filters.get('impact', 'all')
        cache_key = f"{source_name}_{category}_{sentiment}_{impact}"

        if cache_key in self.cache['filtered_cache']:
            cached_entry = self.cache['filtered_cache'][cache_key]
            if datetime.now() < cached_entry['expires_at']:
                logger.debug(f"Filtered cache HIT for {cache_key}")
                return cached_entry['results']

        filtered_df = source_df.copy()

        if category != 'all':
            filtered_df = filtered_df[filtered_df['category'] == category]

        if sentiment != 'all':
            filtered_df = filtered_df[filtered_df['sentiment'] == sentiment]

        if impact != 'all':
            filtered_df = filtered_df[filtered_df['impact'] == impact]

        logger.debug(f"Filtered {source_name}: {len(source_df)} -> {len(filtered_df)} items")

        self.cache['filtered_cache'][cache_key] = {
            'results': filtered_df,
            'expires_at': datetime.now() + timedelta(seconds=self.FILTERED_CACHE_TTL)
        }

        return filtered_df

    def _clear_source_from_dedup(self, source: str):
        """
        Remove this source from all dedup-index entries; entries left with
        no sources at all are dropped entirely.

        Args:
            source: Source to remove from dedup index
        """
        to_remove = []
        for content_hash, entry in self.cache['dedup_index'].items():
            if source in entry['sources']:
                entry['sources'].remove(source)
                if not entry['sources']:
                    to_remove.append(content_hash)

        for content_hash in to_remove:
            del self.cache['dedup_index'][content_hash]

        if to_remove:
            logger.info(f"Removed {len(to_remove)} entries from dedup index for {source}")

    def clear_cache(self, source: Optional[str] = None):
        """
        Clear cache for a specific source or all sources.

        Args:
            source: Source to clear, or None to clear all
        """
        if source:
            # Restore the instance's configured TTL (bug fix: previously
            # reset to a hard-coded 180 regardless of __init__'s default_ttl).
            self.cache[source] = {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}
            self._clear_source_from_dedup(source)
            logger.info(f"Cleared cache for {source}")
        else:
            for src in self.SOURCES:
                self.cache[src] = {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}
            self.cache['dedup_index'] = {}
            self.cache['filtered_cache'] = {}
            logger.info("Cleared ALL caches")

    def get_statistics(self) -> Dict:
        """
        Get cache statistics.

        Returns:
            Dict with one entry per source ({'items', 'age_seconds',
            'is_valid'}) plus 'dedup_index_size' and 'filtered_cache_size'.
        """
        stats = {
            source: {
                'items': len(self.cache[source]['raw_news']),
                'age_seconds': self._get_cache_age(source),
                'is_valid': self._is_cache_valid(source),
            }
            for source in self.SOURCES
        }
        stats['dedup_index_size'] = len(self.cache['dedup_index'])
        stats['filtered_cache_size'] = len(self.cache['filtered_cache'])
        return stats
|
|
|