Dmitry Beresnev
committed on
Commit
Β·
64315f0
1
Parent(s):
5fbc88e
add reddit news, cache system (per user), etc
Browse files- .gitignore +3 -1
- app/pages/05_Dashboard.py +157 -52
- app/services/news_scraper.py +6 -10
- app/services/reddit_news.py +316 -0
- app/services/twitter_news_playwright.py +5 -10
- app/utils/news_cache.py +347 -0
.gitignore
CHANGED
|
@@ -33,4 +33,6 @@ tests/__pycache__/
|
|
| 33 |
# Ignore md files
|
| 34 |
*.md
|
| 35 |
#
|
| 36 |
-
docs/
|
|
|
|
|
|
|
|
|
| 33 |
# Ignore md files
|
| 34 |
*.md
|
| 35 |
#
|
| 36 |
+
docs/
|
| 37 |
+
#
|
| 38 |
+
*_example.py
|
app/pages/05_Dashboard.py
CHANGED
|
@@ -31,6 +31,12 @@ try:
|
|
| 31 |
except ImportError:
|
| 32 |
TWITTER_AVAILABLE = False
|
| 33 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 34 |
|
| 35 |
# ---- Page Configuration ----
|
| 36 |
st.set_page_config(
|
|
@@ -50,8 +56,19 @@ if 'rss_monitor' not in st.session_state and RSS_AVAILABLE:
|
|
| 50 |
if 'twitter_monitor' not in st.session_state and TWITTER_AVAILABLE:
|
| 51 |
st.session_state.twitter_monitor = TwitterFinanceMonitor()
|
| 52 |
|
|
|
|
|
|
|
|
|
|
| 53 |
rss_monitor = st.session_state.get('rss_monitor')
|
| 54 |
twitter_monitor = st.session_state.get('twitter_monitor')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 55 |
|
| 56 |
# ---- Header ----
|
| 57 |
st.markdown("# π€ Live Financial News & AI Dashboard")
|
|
@@ -118,6 +135,12 @@ with st.sidebar:
|
|
| 118 |
high_impact_count += twitter_stats['high_impact']
|
| 119 |
breaking_count += twitter_stats['breaking']
|
| 120 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 121 |
if rss_monitor:
|
| 122 |
rss_stats = rss_monitor.get_statistics()
|
| 123 |
total_stories += rss_stats['total']
|
|
@@ -133,8 +156,9 @@ with st.sidebar:
|
|
| 133 |
|
| 134 |
# Count total sources
|
| 135 |
twitter_sources = len(twitter_monitor.SOURCES) if twitter_monitor else 0
|
|
|
|
| 136 |
rss_sources = len(rss_monitor.SOURCES) if rss_monitor else 0
|
| 137 |
-
total_sources = twitter_sources + rss_sources
|
| 138 |
|
| 139 |
st.markdown(f"""
|
| 140 |
<div style='font-size: 11px; line-height: 1.6;'>
|
|
@@ -145,6 +169,11 @@ with st.sidebar:
|
|
| 145 |
β’ CNBC β’ BBC β’ MarketWatch
|
| 146 |
β’ The Economist β’ AP β’ AFP
|
| 147 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 148 |
**RSS + Web Scraping ({rss_sources})**
|
| 149 |
β’ CNBC β’ Bloomberg β’ FT β’ WSJ
|
| 150 |
β’ BBC β’ Yahoo Finance β’ The Economist
|
|
@@ -162,57 +191,133 @@ force_refresh = st.session_state.get('force_refresh', False)
|
|
| 162 |
if force_refresh:
|
| 163 |
st.session_state.force_refresh = False
|
| 164 |
|
| 165 |
-
# Fetch news from all sources
|
| 166 |
import pandas as pd
|
|
|
|
| 167 |
|
| 168 |
twitter_df = pd.DataFrame()
|
|
|
|
| 169 |
rss_all_df = pd.DataFrame()
|
| 170 |
rss_main_df = pd.DataFrame()
|
| 171 |
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
|
| 175 |
-
|
| 176 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 177 |
if twitter_news:
|
| 178 |
-
|
| 179 |
-
if not
|
| 180 |
-
|
| 181 |
-
|
| 182 |
-
|
| 183 |
-
|
| 184 |
-
|
| 185 |
-
|
| 186 |
-
|
| 187 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 188 |
if rss_news:
|
| 189 |
-
|
| 190 |
-
if not
|
| 191 |
-
|
| 192 |
-
|
| 193 |
-
|
| 194 |
-
|
| 195 |
-
|
| 196 |
-
|
| 197 |
-
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
| 204 |
-
|
| 205 |
-
|
| 206 |
-
|
| 207 |
-
|
| 208 |
-
|
| 209 |
-
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 213 |
|
| 214 |
# Combine all for breaking news banner
|
| 215 |
-
all_news_df = pd.concat([twitter_filtered, rss_all_filtered], ignore_index=True) if not twitter_filtered.empty or not rss_all_filtered.empty else pd.DataFrame()
|
| 216 |
|
| 217 |
# Display breaking news banner
|
| 218 |
if not all_news_df.empty:
|
|
@@ -225,20 +330,20 @@ st.markdown("---")
|
|
| 225 |
col1, col2, col3 = st.columns(3)
|
| 226 |
|
| 227 |
with col1:
|
| 228 |
-
# SECTION 1: Twitter/X Breaking News
|
| 229 |
-
if not
|
| 230 |
display_scrollable_news_section(
|
| 231 |
-
|
| 232 |
-
section_title="Twitter/X News",
|
| 233 |
-
section_icon="
|
| 234 |
-
section_subtitle="Real-time
|
| 235 |
-
max_items=
|
| 236 |
height="700px"
|
| 237 |
)
|
| 238 |
-
elif not twitter_df.empty:
|
| 239 |
-
st.info("π No Twitter news matches your current filters.")
|
| 240 |
else:
|
| 241 |
-
st.info("β³ Twitter
|
| 242 |
|
| 243 |
with col2:
|
| 244 |
# SECTION 2: Main Page News (Web-Scraped)
|
|
|
|
| 31 |
except ImportError:
|
| 32 |
TWITTER_AVAILABLE = False
|
| 33 |
|
| 34 |
+
try:
|
| 35 |
+
from services.reddit_news import RedditFinanceMonitor
|
| 36 |
+
REDDIT_AVAILABLE = True
|
| 37 |
+
except ImportError:
|
| 38 |
+
REDDIT_AVAILABLE = False
|
| 39 |
+
|
| 40 |
|
| 41 |
# ---- Page Configuration ----
|
| 42 |
st.set_page_config(
|
|
|
|
| 56 |
if 'twitter_monitor' not in st.session_state and TWITTER_AVAILABLE:
|
| 57 |
st.session_state.twitter_monitor = TwitterFinanceMonitor()
|
| 58 |
|
| 59 |
+
if 'reddit_monitor' not in st.session_state and REDDIT_AVAILABLE:
|
| 60 |
+
st.session_state.reddit_monitor = RedditFinanceMonitor()
|
| 61 |
+
|
| 62 |
rss_monitor = st.session_state.get('rss_monitor')
|
| 63 |
twitter_monitor = st.session_state.get('twitter_monitor')
|
| 64 |
+
reddit_monitor = st.session_state.get('reddit_monitor')
|
| 65 |
+
|
| 66 |
+
# Initialize unified cache manager
|
| 67 |
+
if 'news_cache_manager' not in st.session_state:
|
| 68 |
+
from utils.news_cache import NewsCacheManager
|
| 69 |
+
st.session_state.news_cache_manager = NewsCacheManager(default_ttl=180)
|
| 70 |
+
|
| 71 |
+
cache_manager = st.session_state.news_cache_manager
|
| 72 |
|
| 73 |
# ---- Header ----
|
| 74 |
st.markdown("# π€ Live Financial News & AI Dashboard")
|
|
|
|
| 135 |
high_impact_count += twitter_stats['high_impact']
|
| 136 |
breaking_count += twitter_stats['breaking']
|
| 137 |
|
| 138 |
+
if reddit_monitor:
|
| 139 |
+
reddit_stats = reddit_monitor.get_statistics()
|
| 140 |
+
total_stories += reddit_stats['total']
|
| 141 |
+
high_impact_count += reddit_stats['high_impact']
|
| 142 |
+
breaking_count += reddit_stats['breaking']
|
| 143 |
+
|
| 144 |
if rss_monitor:
|
| 145 |
rss_stats = rss_monitor.get_statistics()
|
| 146 |
total_stories += rss_stats['total']
|
|
|
|
| 156 |
|
| 157 |
# Count total sources
|
| 158 |
twitter_sources = len(twitter_monitor.SOURCES) if twitter_monitor else 0
|
| 159 |
+
reddit_sources = len(reddit_monitor.SUBREDDITS) if reddit_monitor else 0
|
| 160 |
rss_sources = len(rss_monitor.SOURCES) if rss_monitor else 0
|
| 161 |
+
total_sources = twitter_sources + reddit_sources + rss_sources
|
| 162 |
|
| 163 |
st.markdown(f"""
|
| 164 |
<div style='font-size: 11px; line-height: 1.6;'>
|
|
|
|
| 169 |
β’ CNBC β’ BBC β’ MarketWatch
|
| 170 |
β’ The Economist β’ AP β’ AFP
|
| 171 |
|
| 172 |
+
**Reddit Communities ({reddit_sources})**
|
| 173 |
+
β’ r/wallstreetbets β’ r/stocks β’ r/investing
|
| 174 |
+
β’ r/algotrading β’ r/economics β’ r/geopolitics
|
| 175 |
+
β’ r/options β’ r/SecurityAnalysis
|
| 176 |
+
|
| 177 |
**RSS + Web Scraping ({rss_sources})**
|
| 178 |
β’ CNBC β’ Bloomberg β’ FT β’ WSJ
|
| 179 |
β’ BBC β’ Yahoo Finance β’ The Economist
|
|
|
|
| 191 |
if force_refresh:
|
| 192 |
st.session_state.force_refresh = False
|
| 193 |
|
| 194 |
+
# Fetch news from all sources IN PARALLEL for maximum performance
|
| 195 |
import pandas as pd
|
| 196 |
+
from concurrent.futures import ThreadPoolExecutor
|
| 197 |
|
| 198 |
twitter_df = pd.DataFrame()
|
| 199 |
+
reddit_df = pd.DataFrame()
|
| 200 |
rss_all_df = pd.DataFrame()
|
| 201 |
rss_main_df = pd.DataFrame()
|
| 202 |
|
| 203 |
+
def fetch_twitter_news():
|
| 204 |
+
"""Fetch Twitter/X news via cache manager"""
|
| 205 |
+
try:
|
| 206 |
+
if twitter_monitor:
|
| 207 |
+
# Use cache manager for smart caching
|
| 208 |
+
twitter_news = cache_manager.get_news(
|
| 209 |
+
source='twitter',
|
| 210 |
+
fetcher_func=twitter_monitor.scrape_twitter_news,
|
| 211 |
+
force_refresh=force_refresh,
|
| 212 |
+
max_tweets=50
|
| 213 |
+
)
|
| 214 |
if twitter_news:
|
| 215 |
+
df = pd.DataFrame(twitter_news)
|
| 216 |
+
if not df.empty:
|
| 217 |
+
df['timestamp'] = pd.to_datetime(df['timestamp'])
|
| 218 |
+
return df, None
|
| 219 |
+
except Exception as e:
|
| 220 |
+
return pd.DataFrame(), f"Twitter scraping unavailable: {e}"
|
| 221 |
+
return pd.DataFrame(), None
|
| 222 |
+
|
| 223 |
+
def fetch_reddit_news():
|
| 224 |
+
"""Fetch Reddit news via cache manager"""
|
| 225 |
+
try:
|
| 226 |
+
if reddit_monitor:
|
| 227 |
+
# Use cache manager for smart caching
|
| 228 |
+
reddit_news = cache_manager.get_news(
|
| 229 |
+
source='reddit',
|
| 230 |
+
fetcher_func=reddit_monitor.scrape_reddit_news,
|
| 231 |
+
force_refresh=force_refresh,
|
| 232 |
+
max_posts=50,
|
| 233 |
+
hours=12
|
| 234 |
+
)
|
| 235 |
+
if reddit_news:
|
| 236 |
+
df = pd.DataFrame(reddit_news)
|
| 237 |
+
if not df.empty:
|
| 238 |
+
df['timestamp'] = pd.to_datetime(df['timestamp'])
|
| 239 |
+
return df, None
|
| 240 |
+
except Exception as e:
|
| 241 |
+
return pd.DataFrame(), f"Reddit scraping unavailable: {e}"
|
| 242 |
+
return pd.DataFrame(), None
|
| 243 |
+
|
| 244 |
+
def fetch_rss_news():
|
| 245 |
+
"""Fetch RSS + Web scraped news via cache manager"""
|
| 246 |
+
try:
|
| 247 |
+
if rss_monitor:
|
| 248 |
+
# Use cache manager for smart caching
|
| 249 |
+
rss_news = cache_manager.get_news(
|
| 250 |
+
source='rss',
|
| 251 |
+
fetcher_func=rss_monitor.scrape_news,
|
| 252 |
+
force_refresh=force_refresh,
|
| 253 |
+
max_items=100
|
| 254 |
+
)
|
| 255 |
if rss_news:
|
| 256 |
+
df = pd.DataFrame(rss_news)
|
| 257 |
+
if not df.empty:
|
| 258 |
+
df['timestamp'] = pd.to_datetime(df['timestamp'])
|
| 259 |
+
return df, None
|
| 260 |
+
except Exception as e:
|
| 261 |
+
return pd.DataFrame(), f"RSS scraping unavailable: {e}"
|
| 262 |
+
return pd.DataFrame(), None
|
| 263 |
+
|
| 264 |
+
with st.spinner("π Fetching latest financial news in parallel..."):
|
| 265 |
+
# Execute all news fetching operations in parallel using ThreadPoolExecutor
|
| 266 |
+
with ThreadPoolExecutor(max_workers=3) as executor:
|
| 267 |
+
# Submit all tasks
|
| 268 |
+
future_twitter = executor.submit(fetch_twitter_news)
|
| 269 |
+
future_reddit = executor.submit(fetch_reddit_news)
|
| 270 |
+
future_rss = executor.submit(fetch_rss_news)
|
| 271 |
+
|
| 272 |
+
# Collect results as they complete
|
| 273 |
+
futures = {
|
| 274 |
+
'twitter': future_twitter,
|
| 275 |
+
'reddit': future_reddit,
|
| 276 |
+
'rss': future_rss
|
| 277 |
+
}
|
| 278 |
+
|
| 279 |
+
for source_name, future in futures.items():
|
| 280 |
+
try:
|
| 281 |
+
result_df, error = future.result(timeout=90) # 90 second timeout per source
|
| 282 |
+
|
| 283 |
+
if source_name == 'twitter':
|
| 284 |
+
twitter_df = result_df
|
| 285 |
+
if error:
|
| 286 |
+
st.warning(error)
|
| 287 |
+
elif source_name == 'reddit':
|
| 288 |
+
reddit_df = result_df
|
| 289 |
+
if error:
|
| 290 |
+
st.warning(error)
|
| 291 |
+
elif source_name == 'rss':
|
| 292 |
+
rss_all_df = result_df
|
| 293 |
+
if error:
|
| 294 |
+
st.warning(error)
|
| 295 |
+
# Get main page news subset for RSS
|
| 296 |
+
if not rss_all_df.empty and 'from_web' in rss_all_df.columns:
|
| 297 |
+
rss_main_df = rss_all_df[rss_all_df['from_web'] == True].copy()
|
| 298 |
+
|
| 299 |
+
except Exception as e:
|
| 300 |
+
st.warning(f"Error fetching {source_name} news: {e}")
|
| 301 |
+
|
| 302 |
+
# Apply filters using cache manager (with filter result caching)
|
| 303 |
+
filters = {
|
| 304 |
+
'category': category_filter,
|
| 305 |
+
'sentiment': sentiment_filter,
|
| 306 |
+
'impact': impact_filter
|
| 307 |
+
}
|
| 308 |
+
|
| 309 |
+
twitter_filtered = cache_manager.get_filtered_news(twitter_df, filters, 'twitter') if not twitter_df.empty else twitter_df
|
| 310 |
+
reddit_filtered = cache_manager.get_filtered_news(reddit_df, filters, 'reddit') if not reddit_df.empty else reddit_df
|
| 311 |
+
rss_main_filtered = cache_manager.get_filtered_news(rss_main_df, filters, 'rss_main') if not rss_main_df.empty else rss_main_df
|
| 312 |
+
rss_all_filtered = cache_manager.get_filtered_news(rss_all_df, filters, 'rss_all') if not rss_all_df.empty else rss_all_df
|
| 313 |
+
|
| 314 |
+
# Combine Twitter and Reddit for first column
|
| 315 |
+
twitter_reddit_df = pd.concat([twitter_filtered, reddit_filtered], ignore_index=True) if not twitter_filtered.empty or not reddit_filtered.empty else pd.DataFrame()
|
| 316 |
+
if not twitter_reddit_df.empty:
|
| 317 |
+
twitter_reddit_df = twitter_reddit_df.sort_values('timestamp', ascending=False)
|
| 318 |
|
| 319 |
# Combine all for breaking news banner
|
| 320 |
+
all_news_df = pd.concat([twitter_filtered, reddit_filtered, rss_all_filtered], ignore_index=True) if not twitter_filtered.empty or not reddit_filtered.empty or not rss_all_filtered.empty else pd.DataFrame()
|
| 321 |
|
| 322 |
# Display breaking news banner
|
| 323 |
if not all_news_df.empty:
|
|
|
|
| 330 |
col1, col2, col3 = st.columns(3)
|
| 331 |
|
| 332 |
with col1:
|
| 333 |
+
# SECTION 1: Twitter/X & Reddit Breaking News
|
| 334 |
+
if not twitter_reddit_df.empty:
|
| 335 |
display_scrollable_news_section(
|
| 336 |
+
twitter_reddit_df,
|
| 337 |
+
section_title="Twitter/X & Reddit News",
|
| 338 |
+
section_icon="π",
|
| 339 |
+
section_subtitle="Real-time news from premium accounts & communities (last 12h)",
|
| 340 |
+
max_items=100,
|
| 341 |
height="700px"
|
| 342 |
)
|
| 343 |
+
elif not twitter_df.empty or not reddit_df.empty:
|
| 344 |
+
st.info("π No Twitter/Reddit news matches your current filters.")
|
| 345 |
else:
|
| 346 |
+
st.info("β³ Fetching Twitter & Reddit news... This may take 30-60 seconds on first load.")
|
| 347 |
|
| 348 |
with col2:
|
| 349 |
# SECTION 2: Main Page News (Web-Scraped)
|
app/services/news_scraper.py
CHANGED
|
@@ -135,10 +135,7 @@ class FinanceNewsScraper:
|
|
| 135 |
]
|
| 136 |
|
| 137 |
def __init__(self):
|
| 138 |
-
"""Initialize scraper
|
| 139 |
-
self.news_cache = []
|
| 140 |
-
self.last_fetch = None
|
| 141 |
-
self.cache_ttl = 180 # 3 minutes
|
| 142 |
self.session = requests.Session()
|
| 143 |
# Enhanced headers to avoid bot detection
|
| 144 |
self.session.headers.update({
|
|
@@ -331,8 +328,7 @@ class FinanceNewsScraper:
|
|
| 331 |
logger.error(f"Error scraping web page for {source_name}: {e}")
|
| 332 |
return []
|
| 333 |
|
| 334 |
-
|
| 335 |
-
def scrape_news(_self, max_items: int = 100) -> List[Dict]:
|
| 336 |
"""
|
| 337 |
Scrape news from all sources with caching
|
| 338 |
Uses ThreadPoolExecutor for parallel fetching from both RSS and web pages
|
|
@@ -345,12 +341,12 @@ class FinanceNewsScraper:
|
|
| 345 |
futures = []
|
| 346 |
|
| 347 |
# Submit both RSS and web scraping tasks for each source
|
| 348 |
-
for name, info in
|
| 349 |
# RSS feed task
|
| 350 |
-
futures.append((executor.submit(
|
| 351 |
# Web scraping task (only if web URL is configured)
|
| 352 |
if info.get('web'):
|
| 353 |
-
futures.append((executor.submit(
|
| 354 |
|
| 355 |
for future, source_name, method in futures:
|
| 356 |
try:
|
|
@@ -372,7 +368,7 @@ class FinanceNewsScraper:
|
|
| 372 |
# If no news was fetched, use mock data
|
| 373 |
if not all_news:
|
| 374 |
logger.warning("No news fetched from any source - using mock data")
|
| 375 |
-
return
|
| 376 |
|
| 377 |
# Sort by: web-scraped first, then breaking news, then impact, then timestamp
|
| 378 |
all_news.sort(
|
|
|
|
| 135 |
]
|
| 136 |
|
| 137 |
def __init__(self):
|
| 138 |
+
"""Initialize scraper"""
|
|
|
|
|
|
|
|
|
|
| 139 |
self.session = requests.Session()
|
| 140 |
# Enhanced headers to avoid bot detection
|
| 141 |
self.session.headers.update({
|
|
|
|
| 328 |
logger.error(f"Error scraping web page for {source_name}: {e}")
|
| 329 |
return []
|
| 330 |
|
| 331 |
+
def scrape_news(self, max_items: int = 100) -> List[Dict]:
|
|
|
|
| 332 |
"""
|
| 333 |
Scrape news from all sources with caching
|
| 334 |
Uses ThreadPoolExecutor for parallel fetching from both RSS and web pages
|
|
|
|
| 341 |
futures = []
|
| 342 |
|
| 343 |
# Submit both RSS and web scraping tasks for each source
|
| 344 |
+
for name, info in self.SOURCES.items():
|
| 345 |
# RSS feed task
|
| 346 |
+
futures.append((executor.submit(self._fetch_rss_feed, name, info), name, 'RSS'))
|
| 347 |
# Web scraping task (only if web URL is configured)
|
| 348 |
if info.get('web'):
|
| 349 |
+
futures.append((executor.submit(self._scrape_web_page, name, info), name, 'Web'))
|
| 350 |
|
| 351 |
for future, source_name, method in futures:
|
| 352 |
try:
|
|
|
|
| 368 |
# If no news was fetched, use mock data
|
| 369 |
if not all_news:
|
| 370 |
logger.warning("No news fetched from any source - using mock data")
|
| 371 |
+
return self._get_mock_news()
|
| 372 |
|
| 373 |
# Sort by: web-scraped first, then breaking news, then impact, then timestamp
|
| 374 |
all_news.sort(
|
app/services/reddit_news.py
ADDED
|
@@ -0,0 +1,316 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""
|
| 2 |
+
Reddit Financial News Scraper
|
| 3 |
+
Scrapes financial, trading, quant, and geopolitical news from Reddit
|
| 4 |
+
No authentication required - uses public RSS feeds
|
| 5 |
+
"""
|
| 6 |
+
|
| 7 |
+
import feedparser
|
| 8 |
+
import logging
|
| 9 |
+
from datetime import datetime, timedelta
|
| 10 |
+
from typing import List, Dict
|
| 11 |
+
import re
|
| 12 |
+
|
| 13 |
+
logger = logging.getLogger(__name__)
|
| 14 |
+
|
| 15 |
+
|
| 16 |
+
class RedditFinanceMonitor:
    """
    Reddit financial news aggregator using public subreddit RSS feeds.

    No authentication is required: each subreddit's public ``top/.rss``
    feed is fetched with ``feedparser``, then entries are categorized,
    sentiment-tagged, impact-scored and sorted locally.
    """

    # Premium financial subreddits.
    # url            -> public RSS feed (top posts of the day)
    # weight         -> source credibility multiplier used in impact scoring/sorting
    # specialization -> topical tags for the source
    # category       -> default category when title keywords don't override it
    SUBREDDITS = {
        # Financial & Markets
        'wallstreetbets': {
            'url': 'https://www.reddit.com/r/wallstreetbets/top/.rss?t=day',
            'weight': 1.6,
            'specialization': ['markets'],
            'category': 'markets'
        },
        'stocks': {
            'url': 'https://www.reddit.com/r/stocks/top/.rss?t=day',
            'weight': 1.7,
            'specialization': ['markets'],
            'category': 'markets'
        },
        'investing': {
            'url': 'https://www.reddit.com/r/investing/top/.rss?t=day',
            'weight': 1.8,
            'specialization': ['markets', 'macro'],
            'category': 'markets'
        },
        'stockmarket': {
            'url': 'https://www.reddit.com/r/StockMarket/top/.rss?t=day',
            'weight': 1.6,
            'specialization': ['markets'],
            'category': 'markets'
        },
        'options': {
            'url': 'https://www.reddit.com/r/options/top/.rss?t=day',
            'weight': 1.5,
            'specialization': ['markets'],
            'category': 'markets'
        },
        'daytrading': {
            'url': 'https://www.reddit.com/r/Daytrading/top/.rss?t=day',
            'weight': 1.5,
            'specialization': ['markets'],
            'category': 'markets'
        },
        'securityanalysis': {
            'url': 'https://www.reddit.com/r/SecurityAnalysis/top/.rss?t=day',
            'weight': 1.7,
            'specialization': ['markets'],
            'category': 'markets'
        },

        # Economics & Macro
        'economics': {
            'url': 'https://www.reddit.com/r/Economics/top/.rss?t=day',
            'weight': 1.8,
            'specialization': ['macro'],
            'category': 'macro'
        },
        'economy': {
            'url': 'https://www.reddit.com/r/economy/top/.rss?t=day',
            'weight': 1.6,
            'specialization': ['macro'],
            'category': 'macro'
        },

        # Quantitative Finance
        'algotrading': {
            'url': 'https://www.reddit.com/r/algotrading/top/.rss?t=day',
            'weight': 1.7,
            'specialization': ['markets'],
            'category': 'markets'
        },
        'quantfinance': {
            'url': 'https://www.reddit.com/r/quant/top/.rss?t=day',
            'weight': 1.7,
            'specialization': ['markets'],
            'category': 'markets'
        },

        # Geopolitics
        'geopolitics': {
            'url': 'https://www.reddit.com/r/geopolitics/top/.rss?t=day',
            'weight': 1.8,
            'specialization': ['geopolitical'],
            'category': 'geopolitical'
        },
        'worldnews': {
            'url': 'https://www.reddit.com/r/worldnews/top/.rss?t=day',
            'weight': 1.7,
            'specialization': ['geopolitical'],
            'category': 'geopolitical'
        },
        'neutralpolitics': {
            'url': 'https://www.reddit.com/r/NeutralPolitics/top/.rss?t=day',
            'weight': 1.6,
            'specialization': ['geopolitical'],
            'category': 'geopolitical'
        },
    }

    # Keyword lists used by _categorize_post to override a subreddit's
    # default category based on the post title (checked case-insensitively).
    MACRO_KEYWORDS = [
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'inflation', 'CPI', 'PPI', 'GDP',
        'unemployment', 'jobs report', 'NFP', 'central bank',
        'recession', 'QE', 'quantitative easing', 'monetary policy'
    ]

    MARKETS_KEYWORDS = [
        'stock', 'equity', 'bond', 'commodity', 'oil', 'gold',
        'earnings', 'revenue', 'profit', 'IPO', 'merger',
        'acquisition', 'trading', 'options', 'futures', 'forex'
    ]

    GEOPOLITICAL_KEYWORDS = [
        'war', 'conflict', 'sanction', 'trade', 'tariff',
        'election', 'China', 'Russia', 'Ukraine', 'Taiwan',
        'Middle East', 'Iran', 'Israel', 'NATO', 'UN'
    ]

    def __init__(self):
        """Initialize Reddit monitor.

        BUG FIX: ``get_statistics`` reads ``self.news_cache``, but the
        original ``__init__`` body was ``pass`` so the attribute never
        existed and a fresh instance raised AttributeError on the first
        statistics call. Initialize it to an empty list here.
        """
        self.news_cache = []

    def _categorize_post(self, title: str, subreddit_info: Dict) -> str:
        """Categorize a post from its title, falling back to the subreddit default.

        Keyword precedence: macro > geopolitical > markets; if no keyword
        matches, the subreddit's configured 'category' (default 'markets')
        is returned.
        """
        title_lower = title.lower()

        # Use subreddit default category
        default_category = subreddit_info.get('category', 'markets')

        # Check keywords for override (substring match, case-insensitive)
        if any(keyword.lower() in title_lower for keyword in self.MACRO_KEYWORDS):
            return 'macro'
        elif any(keyword.lower() in title_lower for keyword in self.GEOPOLITICAL_KEYWORDS):
            return 'geopolitical'
        elif any(keyword.lower() in title_lower for keyword in self.MARKETS_KEYWORDS):
            return 'markets'

        return default_category

    def _detect_sentiment(self, title: str) -> str:
        """Simple keyword-count sentiment: 'positive', 'negative' or 'neutral'.

        NOTE: matching is by substring, so e.g. 'up' also matches inside
        longer words — intentional, kept for parity with the rest of the app.
        """
        title_lower = title.lower()

        positive_words = ['bullish', 'bull', 'surge', 'gain', 'up', 'rally', 'boom', 'profit', 'growth']
        negative_words = ['bearish', 'bear', 'crash', 'loss', 'down', 'fall', 'decline', 'recession', 'crisis']

        positive_count = sum(1 for word in positive_words if word in title_lower)
        negative_count = sum(1 for word in negative_words if word in title_lower)

        if positive_count > negative_count:
            return 'positive'
        elif negative_count > positive_count:
            return 'negative'
        else:
            return 'neutral'

    def _calculate_impact(self, score: int, num_comments: int, subreddit_weight: float) -> str:
        """Classify impact ('high'/'medium'/'low') from engagement.

        Engagement blends upvote score (70%) and comment count (30%),
        scaled by the subreddit credibility weight; thresholds are 500
        (high) and 100 (medium).
        """
        engagement_score = (score * 0.7) + (num_comments * 0.3)
        weighted_score = engagement_score * subreddit_weight

        if weighted_score > 500:
            return 'high'
        elif weighted_score > 100:
            return 'medium'
        else:
            return 'low'

    def scrape_reddit_news(self, max_posts: int = 100, hours: int = 12) -> List[Dict]:
        """
        Scrape Reddit posts from financial subreddits.

        Args:
            max_posts: Maximum number of posts to return
            hours: Only include posts from the last N hours (default: 12)

        Returns:
            List of news-item dicts with title/url/source/timestamp/
            category/sentiment/impact/is_breaking/engagement/platform keys.
        """
        all_posts = []
        seen_titles = set()
        # NOTE(review): feed timestamps are parsed as naive datetimes and
        # compared against local now(); if the feed times are UTC this can
        # skew the cutoff by the local offset — confirm before tightening.
        cutoff_time = datetime.now() - timedelta(hours=hours)

        logger.info(f"Scraping Reddit posts from last {hours} hours...")

        for subreddit_name, subreddit_info in self.SUBREDDITS.items():
            try:
                logger.info(f"Fetching r/{subreddit_name}...")

                # Parse RSS feed
                feed = feedparser.parse(subreddit_info['url'])

                for entry in feed.entries[:20]:  # Get top 20 per subreddit
                    try:
                        # Parse publication date; feedparser can expose
                        # published_parsed as None when the date is
                        # unparseable, so check the value, not just hasattr.
                        published = getattr(entry, 'published_parsed', None)
                        if published is not None:
                            pub_date = datetime(*published[:6])
                        else:
                            pub_date = datetime.now()

                        # Filter by time (last 12 hours by default)
                        if pub_date < cutoff_time:
                            continue

                        # Extract title and link
                        title = entry.title.strip()
                        link = entry.link

                        # Deduplicate on the first 100 chars of the title
                        title_hash = hash(title[:100])
                        if title_hash in seen_titles:
                            continue
                        seen_titles.add(title_hash)

                        # Extract score and comments from the RSS content blob
                        score = 0
                        num_comments = 0
                        if hasattr(entry, 'content'):
                            content_text = entry.content[0].value if entry.content else ''
                            # Try to extract score from content
                            score_match = re.search(r'(\d+)\s+points?', content_text)
                            if score_match:
                                score = int(score_match.group(1))
                            # Try to extract comments
                            comment_match = re.search(r'(\d+)\s+comments?', content_text)
                            if comment_match:
                                num_comments = int(comment_match.group(1))

                        # Categorize and analyze
                        category = self._categorize_post(title, subreddit_info)
                        sentiment = self._detect_sentiment(title)
                        impact = self._calculate_impact(score, num_comments, subreddit_info['weight'])

                        # Breaking news = high score within the last 3 hours
                        is_breaking = (
                            (datetime.now() - pub_date).total_seconds() < 10800 and  # 3 hours
                            score > 1000
                        )

                        post_data = {
                            'title': title,
                            'summary': title,  # Reddit posts don't have separate summaries
                            'url': link,
                            'source': f"r/{subreddit_name}",
                            'timestamp': pub_date,
                            'category': category,
                            'sentiment': sentiment,
                            'impact': impact,
                            'is_breaking': is_breaking,
                            'engagement': {
                                'score': score,
                                'comments': num_comments
                            },
                            'platform': 'reddit'
                        }

                        all_posts.append(post_data)

                    except Exception as e:
                        logger.error(f"Error processing entry from r/{subreddit_name}: {e}")
                        continue

                logger.info(f"Fetched {len([p for p in all_posts if p['source'] == f'r/{subreddit_name}'])} posts from r/{subreddit_name}")

            except Exception as e:
                logger.error(f"Error fetching r/{subreddit_name}: {e}")
                continue

        # Sort by engagement score weighted by source weight; 'source' stores
        # the SUBREDDITS key (prefixed with 'r/'), so the reverse lookup works.
        all_posts.sort(key=lambda x: x['engagement']['score'] * self.SUBREDDITS.get(
            x['source'].replace('r/', ''), {}
        ).get('weight', 1.0), reverse=True)

        logger.info(f"Total Reddit posts scraped: {len(all_posts)}")

        return all_posts[:max_posts]

    def get_statistics(self) -> Dict:
        """Get statistics about scraped Reddit posts.

        Lazily fills ``self.news_cache`` via ``scrape_reddit_news`` on the
        first call, then aggregates totals, high-impact count, breaking
        count and per-category counts from the cache.
        """
        if not self.news_cache:
            posts = self.scrape_reddit_news()
            self.news_cache = posts

        total = len(self.news_cache)
        high_impact = len([p for p in self.news_cache if p['impact'] == 'high'])
        breaking = len([p for p in self.news_cache if p.get('is_breaking', False)])

        return {
            'total': total,
            'high_impact': high_impact,
            'breaking': breaking,
            'by_category': {
                'macro': len([p for p in self.news_cache if p['category'] == 'macro']),
                'markets': len([p for p in self.news_cache if p['category'] == 'markets']),
                'geopolitical': len([p for p in self.news_cache if p['category'] == 'geopolitical'])
            }
        }
|
app/services/twitter_news_playwright.py
CHANGED
|
@@ -179,10 +179,6 @@ class TwitterFinanceMonitor:
|
|
| 179 |
|
| 180 |
def __init__(self):
|
| 181 |
"""Initialize monitor"""
|
| 182 |
-
self.news_cache = []
|
| 183 |
-
self.last_fetch = None
|
| 184 |
-
self.cache_ttl = 180 # 3 minutes
|
| 185 |
-
|
| 186 |
# Find Chromium executable
|
| 187 |
self.chromium_path = self._find_chromium()
|
| 188 |
|
|
@@ -311,22 +307,21 @@ class TwitterFinanceMonitor:
|
|
| 311 |
logger.error(f"Error scraping {source_name}: {e}")
|
| 312 |
return []
|
| 313 |
|
| 314 |
-
|
| 315 |
-
def scrape_twitter_news(_self, max_tweets: int = 100) -> List[Dict]:
|
| 316 |
"""
|
| 317 |
Scrape latest financial news from Twitter using Playwright
|
| 318 |
Runs in parallel for better performance - 19 sources in ~30-45 seconds
|
| 319 |
"""
|
| 320 |
if not PLAYWRIGHT_AVAILABLE:
|
| 321 |
logger.info("Playwright not available - using mock data")
|
| 322 |
-
return
|
| 323 |
|
| 324 |
all_news = []
|
| 325 |
seen_texts = set()
|
| 326 |
|
| 327 |
# Sort sources by weight (priority) - scrape high-value sources first
|
| 328 |
sorted_sources = sorted(
|
| 329 |
-
|
| 330 |
key=lambda x: x[1]['weight'],
|
| 331 |
reverse=True
|
| 332 |
)
|
|
@@ -337,7 +332,7 @@ class TwitterFinanceMonitor:
|
|
| 337 |
futures = []
|
| 338 |
for name, info in sorted_sources:
|
| 339 |
# Increased timeout for better success rate
|
| 340 |
-
future = executor.submit(
|
| 341 |
futures.append((future, name))
|
| 342 |
|
| 343 |
for future, source_name in futures:
|
|
@@ -365,7 +360,7 @@ class TwitterFinanceMonitor:
|
|
| 365 |
# If no news was fetched, use mock data
|
| 366 |
if not all_news:
|
| 367 |
logger.warning("No tweets fetched - using mock data")
|
| 368 |
-
return
|
| 369 |
|
| 370 |
# Sort by breaking news, then impact, then timestamp
|
| 371 |
all_news.sort(
|
|
|
|
| 179 |
|
| 180 |
def __init__(self):
    """Initialize monitor.

    Note: this class no longer keeps in-memory cache fields here;
    caching appears to be handled externally (presumably by the
    unified NewsCacheManager added in this change — confirm).  The
    only setup performed is locating a Chromium binary for Playwright.
    """
    # Find Chromium executable
    self.chromium_path = self._find_chromium()
|
| 184 |
|
|
|
|
| 307 |
logger.error(f"Error scraping {source_name}: {e}")
|
| 308 |
return []
|
| 309 |
|
| 310 |
+
def scrape_twitter_news(self, max_tweets: int = 100) -> List[Dict]:
|
|
|
|
| 311 |
"""
|
| 312 |
Scrape latest financial news from Twitter using Playwright
|
| 313 |
Runs in parallel for better performance - 19 sources in ~30-45 seconds
|
| 314 |
"""
|
| 315 |
if not PLAYWRIGHT_AVAILABLE:
|
| 316 |
logger.info("Playwright not available - using mock data")
|
| 317 |
+
return self._get_mock_news()
|
| 318 |
|
| 319 |
all_news = []
|
| 320 |
seen_texts = set()
|
| 321 |
|
| 322 |
# Sort sources by weight (priority) - scrape high-value sources first
|
| 323 |
sorted_sources = sorted(
|
| 324 |
+
self.SOURCES.items(),
|
| 325 |
key=lambda x: x[1]['weight'],
|
| 326 |
reverse=True
|
| 327 |
)
|
|
|
|
| 332 |
futures = []
|
| 333 |
for name, info in sorted_sources:
|
| 334 |
# Increased timeout for better success rate
|
| 335 |
+
future = executor.submit(self._scrape_twitter_profile, name, info, timeout=30)
|
| 336 |
futures.append((future, name))
|
| 337 |
|
| 338 |
for future, source_name in futures:
|
|
|
|
| 360 |
# If no news was fetched, use mock data
|
| 361 |
if not all_news:
|
| 362 |
logger.warning("No tweets fetched - using mock data")
|
| 363 |
+
return self._get_mock_news()
|
| 364 |
|
| 365 |
# Sort by breaking news, then impact, then timestamp
|
| 366 |
all_news.sort(
|
app/utils/news_cache.py
ADDED
|
@@ -0,0 +1,347 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""
|
| 2 |
+
Unified News Caching System
|
| 3 |
+
Centralized cache manager for Twitter, Reddit, and RSS news feeds
|
| 4 |
+
"""
|
| 5 |
+
|
| 6 |
+
import hashlib
|
| 7 |
+
import logging
|
| 8 |
+
import re
|
| 9 |
+
import pandas as pd
|
| 10 |
+
from datetime import datetime, timedelta
|
| 11 |
+
from typing import List, Dict, Optional, Callable
|
| 12 |
+
|
| 13 |
+
logger = logging.getLogger(__name__)
|
| 14 |
+
|
| 15 |
+
|
| 16 |
+
class NewsCacheManager:
    """
    Centralized cache manager for news feeds with:
    - Per-source caching with TTL
    - Cross-service deduplication
    - Filtered results caching
    - Force refresh support
    """

    def __init__(self, default_ttl: int = 180):
        """
        Initialize cache manager

        Args:
            default_ttl: Default time-to-live in seconds (default: 180 = 3 minutes)
        """
        # Remember the configured TTL so clear_cache() restores it instead
        # of silently reverting to a hard-coded 180s (bug in the original).
        self.default_ttl = default_ttl
        self.cache = {
            'twitter': {'raw_news': [], 'last_fetch': None, 'ttl': default_ttl},
            'reddit': {'raw_news': [], 'last_fetch': None, 'ttl': default_ttl},
            'rss': {'raw_news': [], 'last_fetch': None, 'ttl': default_ttl},
            'dedup_index': {},    # content-hash -> {first_seen, sources, canonical_item}
            'filtered_cache': {}  # filter-key -> {results, expires_at}
        }
        logger.info(f"NewsCacheManager initialized with {default_ttl}s TTL")

    def get_news(
        self,
        source: str,
        fetcher_func: Callable,
        force_refresh: bool = False,
        **kwargs
    ) -> List[Dict]:
        """
        Get news from cache or fetch fresh if needed

        Args:
            source: News source ('twitter', 'reddit', 'rss')
            fetcher_func: Function to fetch fresh news
            force_refresh: If True, bypass cache and fetch fresh
            **kwargs: Arguments to pass to fetcher_func

        Returns:
            List of news items.  NOTE: a cache HIT returns the raw cached
            list, while a fresh fetch returns the cross-source
            deduplicated list (original behavior, preserved).
        """
        if source not in ('twitter', 'reddit', 'rss'):
            logger.error(f"Invalid source: {source}")
            return []

        # Force refresh clears this source's dedup entries so refetched
        # stories are not flagged as duplicates of themselves.
        if force_refresh:
            self._clear_source_from_dedup(source)

        # Check if cache is valid
        if not force_refresh and self._is_cache_valid(source):
            logger.info(f"✅ Cache HIT for {source} (age: {self._get_cache_age(source):.1f}s)")
            return self.cache[source]['raw_news']

        # Cache miss or force refresh - fetch fresh news
        logger.info(f"🔄 Cache MISS for {source} - fetching fresh news...")
        try:
            new_items = fetcher_func(**kwargs)

            if not new_items:
                logger.warning(f"No news items fetched for {source}")
                # Serve stale cached data rather than nothing (best-effort).
                return self.cache[source]['raw_news']

            # Update cache with the raw fetch result.
            self._update_cache(source, new_items)

            # Deduplicate across sources for the returned list.
            deduplicated = self._deduplicate(new_items, source)

            logger.info(
                f"✅ Fetched {len(new_items)} items for {source}, "
                f"{len(deduplicated)} unique after dedup"
            )
            return deduplicated

        except Exception as e:
            logger.error(f"Error fetching news for {source}: {e}")
            # Serve stale cached data on fetch failure (best-effort).
            return self.cache[source]['raw_news']

    def _is_cache_valid(self, source: str) -> bool:
        """
        Check if cached data is still fresh

        Args:
            source: News source to check

        Returns:
            True if cache is valid, False otherwise
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return False

        age = (datetime.now() - source_cache['last_fetch']).total_seconds()
        return age < source_cache['ttl']

    def _get_cache_age(self, source: str) -> float:
        """
        Get age of cached data in seconds

        Args:
            source: News source

        Returns:
            Age in seconds, or -1 if never fetched
        """
        source_cache = self.cache[source]
        if not source_cache['last_fetch']:
            return -1

        return (datetime.now() - source_cache['last_fetch']).total_seconds()

    def _normalize_text(self, text: str) -> str:
        """
        Normalize text for deduplication: lowercase, strip punctuation,
        collapse whitespace.

        Args:
            text: Text to normalize

        Returns:
            Normalized text ("" for falsy input)
        """
        if not text:
            return ""

        text = text.lower().strip()
        text = re.sub(r'[^\w\s]', '', text)   # drop punctuation
        text = re.sub(r'\s+', ' ', text)      # collapse whitespace
        return text

    def _compute_hash(self, item: Dict) -> str:
        """
        Compute content hash for deduplication from the normalized title
        plus the first 200 chars of the summary.

        Args:
            item: News item dict

        Returns:
            MD5 hash string (fingerprinting only, not security-sensitive)
        """
        title = self._normalize_text(item.get('title', ''))
        summary = self._normalize_text(item.get('summary', '')[:200])
        combined = f"{title}|{summary}"
        return hashlib.md5(combined.encode()).hexdigest()

    def _deduplicate(self, items: List[Dict], source: str) -> List[Dict]:
        """
        Remove duplicates using the global dedup index; repeated stories
        only gain this source in their 'sources' list.

        Args:
            items: List of news items
            source: Source name

        Returns:
            Deduplicated list of items
        """
        deduplicated = []
        duplicate_count = 0

        for item in items:
            content_hash = self._compute_hash(item)

            if content_hash in self.cache['dedup_index']:
                # Duplicate found - record this source on the existing entry.
                dup_entry = self.cache['dedup_index'][content_hash]
                if source not in dup_entry['sources']:
                    dup_entry['sources'].append(source)
                duplicate_count += 1
            else:
                # New item - add to index and result.
                self.cache['dedup_index'][content_hash] = {
                    'first_seen': datetime.now(),
                    'sources': [source],
                    'canonical_item': item
                }
                deduplicated.append(item)

        if duplicate_count > 0:
            logger.info(f"🔁 Deduplication: Found {duplicate_count} duplicates for {source}")

        return deduplicated

    def _update_cache(self, source: str, items: List[Dict]):
        """
        Update cache with new items and stamp the fetch time.

        Args:
            source: News source
            items: List of news items
        """
        self.cache[source]['raw_news'] = items
        self.cache[source]['last_fetch'] = datetime.now()
        logger.info(f"📦 Updated cache for {source} with {len(items)} items")

    def get_filtered_news(
        self,
        source_df: pd.DataFrame,
        filters: Dict,
        source_name: str = "unknown"
    ) -> pd.DataFrame:
        """
        Get filtered news with caching (filtered results cached 5 minutes).

        Args:
            source_df: Source dataframe
            filters: Filter dict with 'category', 'sentiment', 'impact' keys
                     (missing keys default to 'all' = no filtering)
            source_name: Name of source (for the cache key and logging)

        Returns:
            Filtered dataframe
        """
        if source_df.empty:
            return source_df

        # Create cache key from filters
        category = filters.get('category', 'all')
        sentiment = filters.get('sentiment', 'all')
        impact = filters.get('impact', 'all')
        cache_key = f"{source_name}_{category}_{sentiment}_{impact}"

        # Check filtered cache
        cached_entry = self.cache['filtered_cache'].get(cache_key)
        if cached_entry:
            if datetime.now() < cached_entry['expires_at']:
                logger.debug(f"✅ Filtered cache HIT for {cache_key}")
                return cached_entry['results']
            # Evict expired entries so the filtered cache cannot grow
            # unbounded (the original left them in place forever).
            del self.cache['filtered_cache'][cache_key]

        # Apply filters
        filtered_df = source_df.copy()

        if category != 'all':
            filtered_df = filtered_df[filtered_df['category'] == category]

        if sentiment != 'all':
            filtered_df = filtered_df[filtered_df['sentiment'] == sentiment]

        if impact != 'all':
            filtered_df = filtered_df[filtered_df['impact'] == impact]

        logger.debug(f"🔍 Filtered {source_name}: {len(source_df)} → {len(filtered_df)} items")

        # Cache filtered results (5 minute TTL)
        self.cache['filtered_cache'][cache_key] = {
            'results': filtered_df,
            'expires_at': datetime.now() + timedelta(seconds=300)
        }

        return filtered_df

    def _clear_source_from_dedup(self, source: str):
        """
        Remove this source from all dedup-index entries; entries left with
        no sources are dropped entirely.

        Args:
            source: Source to remove from dedup index
        """
        to_remove = []
        for content_hash, entry in self.cache['dedup_index'].items():
            if source in entry['sources']:
                entry['sources'].remove(source)
                # If no sources remain, mark the entry for removal.
                if not entry['sources']:
                    to_remove.append(content_hash)

        for content_hash in to_remove:
            del self.cache['dedup_index'][content_hash]

        if to_remove:
            logger.info(f"🗑️ Removed {len(to_remove)} entries from dedup index for {source}")

    def clear_cache(self, source: Optional[str] = None):
        """
        Clear cache for specific source or all sources.

        Args:
            source: Source to clear, or None to clear all
        """
        if source:
            # FIX: restore the configured TTL instead of a hard-coded 180,
            # which silently overrode any custom default_ttl.
            self.cache[source] = {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}
            self._clear_source_from_dedup(source)
            logger.info(f"🗑️ Cleared cache for {source}")
        else:
            for src in ('twitter', 'reddit', 'rss'):
                self.cache[src] = {'raw_news': [], 'last_fetch': None, 'ttl': self.default_ttl}
            self.cache['dedup_index'] = {}
            self.cache['filtered_cache'] = {}
            logger.info("🗑️ Cleared ALL caches")

    def get_statistics(self) -> Dict:
        """
        Get cache statistics.

        Returns:
            Dict with per-source item counts / ages / validity plus the
            dedup-index and filtered-cache sizes.
        """
        stats = {
            src: {
                'items': len(self.cache[src]['raw_news']),
                'age_seconds': self._get_cache_age(src),
                'is_valid': self._is_cache_valid(src),
            }
            for src in ('twitter', 'reddit', 'rss')
        }
        stats['dedup_index_size'] = len(self.cache['dedup_index'])
        stats['filtered_cache_size'] = len(self.cache['filtered_cache'])
        return stats
|