# FinancialPlatform/app/utils/news_cache.py
# Author: Dmitry Beresnev
# Last commit: 650204f — "fix prediction market section, etc"
"""
Unified News Caching System
Centralized cache manager for Twitter, Reddit, RSS, and AI/Tech news feeds
"""
import hashlib
import logging
import re
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
logger = logging.getLogger(__name__)
class NewsCacheManager:
    """
    Centralized cache manager for news feeds with:
    - Per-source caching with TTL
    - Cross-service deduplication
    - Filtered results caching
    - Force refresh support

    Not thread-safe: callers sharing one instance across threads must
    synchronize externally.
    """

    # Single source of truth for valid feed names. Previously this list was
    # duplicated in __init__, get_news and clear_cache and could drift.
    SOURCES = (
        'twitter', 'reddit', 'rss', 'ai_tech', 'predictions',
        'sectoral_news', 'market_events', 'economic_calendar',
    )

    # TTL (seconds) for cached filtered DataFrame results.
    FILTERED_CACHE_TTL = 300

    # Same logger object as the module-level one (both use __name__);
    # bound here so the class is self-contained.
    _log = logging.getLogger(__name__)

    def __init__(self, default_ttl: int = 180):
        """
        Initialize cache manager.

        Args:
            default_ttl: Default time-to-live in seconds (default: 180 = 3 minutes)
        """
        # Remember the configured TTL so clear_cache() can reuse it instead of
        # silently resetting sources back to a hard-coded 180s (previous bug).
        self.default_ttl = default_ttl
        self.cache: Dict = {src: self._empty_entry() for src in self.SOURCES}
        self.cache['dedup_index'] = {}     # Global deduplication index
        self.cache['filtered_cache'] = {}  # Cached filtered results
        self._log.info("NewsCacheManager initialized with %ss TTL", default_ttl)

    def _empty_entry(self) -> Dict:
        """Build a fresh, empty per-source cache entry with the configured TTL."""
        return {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}

    def get_news(
        self,
        source: str,
        fetcher_func: Callable,
        force_refresh: bool = False,
        **kwargs
    ) -> List[Dict]:
        """
        Get news from cache or fetch fresh if needed.

        Args:
            source: News source (one of SOURCES)
            fetcher_func: Function to fetch fresh news
            force_refresh: If True, bypass cache and fetch fresh
            **kwargs: Arguments to pass to fetcher_func

        Returns:
            List of news items. Cache hits return the raw cached list; fresh
            fetches return the deduplicated list. On fetch error or an empty
            fetch, falls back to whatever is cached (possibly stale).
        """
        if source not in self.SOURCES:
            self._log.error("Invalid source: %s", source)
            return []

        # Force refresh clears this source's entries from the dedup index so
        # re-fetched items are not all discarded as duplicates of themselves.
        if force_refresh:
            self._clear_source_from_dedup(source)

        if not force_refresh and self._is_cache_valid(source):
            self._log.info("βœ… Cache HIT for %s (age: %.1fs)",
                           source, self._get_cache_age(source))
            return self.cache[source]['raw_news']

        self._log.info("πŸ”„ Cache MISS for %s - fetching fresh news...", source)
        try:
            self._log.info("πŸ“ž Calling fetcher for %s with kwargs: %s", source, kwargs)
            new_items = fetcher_func(**kwargs)
            self._log.info("πŸ“¦ Fetcher returned %s items for %s",
                           len(new_items) if new_items else 0, source)
            if not new_items:
                # Best-effort fallback: serve stale data rather than nothing.
                self._log.warning("⚠️ No news items fetched for %s - returning cached data", source)
                return self.cache[source]['raw_news']

            self._update_cache(source, new_items)
            deduplicated = self._deduplicate(new_items, source)
            self._log.info("βœ… Fetched %d items for %s, %d unique after dedup",
                           len(new_items), source, len(deduplicated))
            return deduplicated
        except Exception as e:
            self._log.error("Error fetching news for %s: %s", source, e)
            # Fetcher failed: fall back to cached data (possibly stale/empty).
            return self.cache[source]['raw_news']

    def _is_cache_valid(self, source: str) -> bool:
        """
        Check if cached data is still fresh.

        Args:
            source: News source to check

        Returns:
            True if cache is valid (age < TTL), False otherwise
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return False
        age = (datetime.now() - source_cache['last_fetch']).total_seconds()
        return age < source_cache['ttl']

    def _get_cache_age(self, source: str) -> float:
        """
        Get age of cached data in seconds.

        Args:
            source: News source

        Returns:
            Age in seconds, or -1 if never fetched
        """
        last_fetch = self.cache[source]['last_fetch']
        if not last_fetch:
            return -1
        return (datetime.now() - last_fetch).total_seconds()

    def _normalize_text(self, text: str) -> str:
        """
        Normalize text for deduplication: lowercase, strip punctuation,
        collapse whitespace.

        Args:
            text: Text to normalize

        Returns:
            Normalized text ('' for falsy input)
        """
        if not text:
            return ""
        text = text.lower().strip()
        text = re.sub(r'[^\w\s]', '', text)   # Remove punctuation
        text = re.sub(r'\s+', ' ', text)      # Normalize whitespace
        return text

    def _compute_hash(self, item: Dict) -> str:
        """
        Compute content hash for deduplication.

        Args:
            item: News item dict (uses 'title' and 'summary' keys)

        Returns:
            MD5 hash string (MD5 is fine here: dedup key, not security)
        """
        # `or ''` guards against explicit None values, which previously
        # crashed the [:200] slice with a TypeError.
        title = self._normalize_text(item.get('title') or '')
        summary = self._normalize_text((item.get('summary') or '')[:200])  # First 200 chars
        combined = f"{title}|{summary}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _deduplicate(self, items: List[Dict], source: str) -> List[Dict]:
        """
        Remove duplicates using the global dedup index.

        Args:
            items: List of news items
            source: Source name

        Returns:
            Deduplicated list of items (first occurrence kept as canonical)
        """
        deduplicated = []
        duplicate_count = 0
        dedup_index = self.cache['dedup_index']
        for item in items:
            content_hash = self._compute_hash(item)
            if content_hash in dedup_index:
                # Duplicate found - record that this source also carries it.
                dup_entry = dedup_index[content_hash]
                if source not in dup_entry['sources']:
                    dup_entry['sources'].append(source)
                duplicate_count += 1
            else:
                # New item - add to index and result.
                dedup_index[content_hash] = {
                    'first_seen': datetime.now(),
                    'sources': [source],
                    'canonical_item': item
                }
                deduplicated.append(item)
        if duplicate_count > 0:
            self._log.info("πŸ” Deduplication: Found %d duplicates for %s",
                           duplicate_count, source)
        return deduplicated

    def _update_cache(self, source: str, items: List[Dict]):
        """
        Update cache with new items and stamp the fetch time.

        Args:
            source: News source
            items: List of news items
        """
        self.cache[source]['raw_news'] = items
        self.cache[source]['last_fetch'] = datetime.now()
        self._log.info("πŸ“¦ Updated cache for %s with %d items", source, len(items))

    def get_filtered_news(
        self,
        source_df: pd.DataFrame,
        filters: Dict,
        source_name: str = "unknown"
    ) -> pd.DataFrame:
        """
        Get filtered news with caching.

        Args:
            source_df: Source dataframe (must have 'category', 'sentiment',
                'impact' columns when the corresponding filter is not 'all')
            filters: Filter dict with 'category', 'sentiment', 'impact' keys
            source_name: Name of source (used in the cache key and logging)

        Returns:
            Filtered dataframe
        """
        if source_df.empty:
            return source_df

        category = filters.get('category', 'all')
        sentiment = filters.get('sentiment', 'all')
        impact = filters.get('impact', 'all')
        cache_key = f"{source_name}_{category}_{sentiment}_{impact}"

        # Check filtered cache (entries expire after FILTERED_CACHE_TTL).
        cached_entry = self.cache['filtered_cache'].get(cache_key)
        if cached_entry and datetime.now() < cached_entry['expires_at']:
            self._log.debug("βœ… Filtered cache HIT for %s", cache_key)
            return cached_entry['results']

        # Apply filters; 'all' means no filtering on that dimension.
        filtered_df = source_df.copy()
        if category != 'all':
            filtered_df = filtered_df[filtered_df['category'] == category]
        if sentiment != 'all':
            filtered_df = filtered_df[filtered_df['sentiment'] == sentiment]
        if impact != 'all':
            filtered_df = filtered_df[filtered_df['impact'] == impact]
        self._log.debug("πŸ” Filtered %s: %d β†’ %d items",
                        source_name, len(source_df), len(filtered_df))

        self.cache['filtered_cache'][cache_key] = {
            'results': filtered_df,
            'expires_at': datetime.now() + timedelta(seconds=self.FILTERED_CACHE_TTL)
        }
        return filtered_df

    def _clear_source_from_dedup(self, source: str):
        """
        Remove this source from the dedup index, deleting entries that
        belonged only to it.

        Args:
            source: Source to remove from dedup index
        """
        to_remove = []
        for content_hash, entry in self.cache['dedup_index'].items():
            if source in entry['sources']:
                entry['sources'].remove(source)
                # No sources left -> entry is orphaned; mark for removal.
                if not entry['sources']:
                    to_remove.append(content_hash)
        for content_hash in to_remove:
            del self.cache['dedup_index'][content_hash]
        if to_remove:
            self._log.info("πŸ—‘οΈ Removed %d entries from dedup index for %s",
                           len(to_remove), source)

    def clear_cache(self, source: Optional[str] = None):
        """
        Clear cache for specific source or all sources.

        Args:
            source: Source to clear, or None to clear all
        """
        if source:
            # Bug fix: previously reset TTL to a hard-coded 180s regardless of
            # the configured default_ttl.
            self.cache[source] = self._empty_entry()
            self._clear_source_from_dedup(source)
            # Also drop filtered results derived from this source (keys have
            # the form "{source_name}_{category}_{sentiment}_{impact}").
            stale_keys = [k for k in self.cache['filtered_cache']
                          if k.startswith(f"{source}_")]
            for key in stale_keys:
                del self.cache['filtered_cache'][key]
            self._log.info("πŸ—‘οΈ Cleared cache for %s", source)
        else:
            for src in self.SOURCES:
                self.cache[src] = self._empty_entry()
            self.cache['dedup_index'] = {}
            self.cache['filtered_cache'] = {}
            self._log.info("πŸ—‘οΈ Cleared ALL caches")

    def get_statistics(self) -> Dict:
        """
        Get cache statistics.

        Returns:
            Dict with per-source item counts, ages and validity, plus the
            sizes of the dedup index and filtered-results cache.
        """
        stats: Dict = {
            src: {
                'items': len(self.cache[src]['raw_news']),
                'age_seconds': self._get_cache_age(src),
                'is_valid': self._is_cache_valid(src),
            }
            for src in self.SOURCES
        }
        stats['dedup_index_size'] = len(self.cache['dedup_index'])
        stats['filtered_cache_size'] = len(self.cache['filtered_cache'])
        return stats