ifieryarrows committed on
Commit
b163e21
·
verified ·
1 Parent(s): 86c0ef4

Update app/data_manager.py

Browse files
Files changed (1) hide show
  1. app/data_manager.py +587 -559
app/data_manager.py CHANGED
@@ -1,559 +1,587 @@
1
- """
2
- Data Manager: News and price data ingestion.
3
-
4
- Handles:
5
- - NewsAPI fetching (if API key provided)
6
- - RSS feed fallback (Google News)
7
- - Fuzzy deduplication for RSS noise
8
- - Multi-symbol yfinance price ingestion
9
- - Language filtering for FinBERT compatibility
10
-
11
- Usage:
12
- python -m app.data_manager --fetch
13
- python -m app.data_manager --fetch --news-only
14
- python -m app.data_manager --fetch --prices-only
15
- """
16
-
17
- import argparse
18
- import logging
19
- from datetime import datetime, timedelta, timezone
20
- from typing import Optional
21
-
22
- import requests
23
- import yfinance as yf
24
- from rapidfuzz import fuzz
25
- from langdetect import detect, LangDetectException
26
- from sqlalchemy.dialects.sqlite import insert as sqlite_insert
27
- from sqlalchemy.orm import Session
28
-
29
- from app.db import SessionLocal, init_db
30
- from app.models import NewsArticle, PriceBar
31
- from app.settings import get_settings
32
- from app.rss_ingest import fetch_google_news
33
- from app.utils import (
34
- clean_text,
35
- canonical_title,
36
- normalize_url,
37
- generate_dedup_key,
38
- truncate_text,
39
- )
40
- from app.lock import pipeline_lock
41
-
42
- logging.basicConfig(
43
- level=logging.INFO,
44
- format="%(asctime)s - %(levelname)s - %(message)s"
45
- )
46
- logger = logging.getLogger(__name__)
47
-
48
-
49
- # =============================================================================
50
- # NewsAPI Fetching
51
- # =============================================================================
52
-
53
- def fetch_newsapi_articles(
54
- api_key: str,
55
- query: str,
56
- language: str = "en",
57
- lookback_days: int = 30,
58
- page_size: int = 100
59
- ) -> list[dict]:
60
- """
61
- Fetch articles from NewsAPI.
62
-
63
- Note: Free plan limits to ~1 month of history.
64
- """
65
- logger.info(f"Fetching from NewsAPI: query='{query}', language={language}")
66
-
67
- # Calculate date range
68
- to_date = datetime.now(timezone.utc)
69
- from_date = to_date - timedelta(days=min(lookback_days, 30)) # API limit
70
-
71
- url = "https://newsapi.org/v2/everything"
72
- params = {
73
- "apiKey": api_key,
74
- "q": query,
75
- "language": language,
76
- "from": from_date.strftime("%Y-%m-%d"),
77
- "to": to_date.strftime("%Y-%m-%d"),
78
- "sortBy": "publishedAt",
79
- "pageSize": page_size,
80
- }
81
-
82
- try:
83
- response = requests.get(url, params=params, timeout=30)
84
- response.raise_for_status()
85
- data = response.json()
86
-
87
- if data.get("status") != "ok":
88
- logger.error(f"NewsAPI error: {data.get('message', 'Unknown error')}")
89
- return []
90
-
91
- articles = []
92
- for item in data.get("articles", []):
93
- try:
94
- published_str = item.get("publishedAt", "")
95
- published_at = datetime.fromisoformat(published_str.replace("Z", "+00:00")) if published_str else datetime.now(timezone.utc)
96
-
97
- articles.append({
98
- "title": item.get("title", ""),
99
- "description": item.get("description", ""),
100
- "content": item.get("content", ""),
101
- "url": item.get("url", ""),
102
- "source": item.get("source", {}).get("name", ""),
103
- "author": item.get("author", ""),
104
- "published_at": published_at,
105
- })
106
- except Exception as e:
107
- logger.debug(f"Error parsing NewsAPI article: {e}")
108
- continue
109
-
110
- logger.info(f"Fetched {len(articles)} articles from NewsAPI")
111
- return articles
112
-
113
- except requests.RequestException as e:
114
- logger.error(f"NewsAPI request failed: {e}")
115
- return []
116
-
117
-
118
- # =============================================================================
119
- # Language Detection
120
- # =============================================================================
121
-
122
- def detect_language(text: str) -> Optional[str]:
123
- """Detect language of text. Returns None if detection fails."""
124
- if not text or len(text) < 20:
125
- return None
126
-
127
- try:
128
- return detect(text)
129
- except LangDetectException:
130
- return None
131
-
132
-
133
- def filter_by_language(
134
- articles: list[dict],
135
- target_language: str = "en"
136
- ) -> tuple[list[dict], int]:
137
- """
138
- Filter articles by language.
139
-
140
- Returns:
141
- Tuple of (filtered_articles, num_filtered_out)
142
- """
143
- filtered = []
144
- filtered_out = 0
145
-
146
- for article in articles:
147
- # Try to detect from title + description
148
- text = f"{article.get('title', '')} {article.get('description', '')}"
149
- lang = detect_language(text)
150
-
151
- if lang is None or lang == target_language:
152
- filtered.append(article)
153
- else:
154
- filtered_out += 1
155
- logger.debug(f"Filtered out ({lang}): {article.get('title', '')[:50]}")
156
-
157
- if filtered_out > 0:
158
- logger.info(f"Language filter: kept {len(filtered)}, filtered out {filtered_out}")
159
-
160
- return filtered, filtered_out
161
-
162
-
163
- # =============================================================================
164
- # Fuzzy Deduplication
165
- # =============================================================================
166
-
167
- def get_recent_titles(
168
- session: Session,
169
- window_hours: int = 48
170
- ) -> list[str]:
171
- """Get canonical titles from recent articles for fuzzy dedup."""
172
- cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
173
-
174
- articles = session.query(NewsArticle.canonical_title).filter(
175
- NewsArticle.published_at >= cutoff,
176
- NewsArticle.canonical_title.isnot(None)
177
- ).all()
178
-
179
- return [a[0] for a in articles if a[0]]
180
-
181
-
182
- def is_fuzzy_duplicate(
183
- title: str,
184
- existing_titles: list[str],
185
- threshold: int = 85
186
- ) -> bool:
187
- """
188
- Check if title is too similar to existing titles.
189
- Uses token_set_ratio for robust matching.
190
- """
191
- if not title or not existing_titles:
192
- return False
193
-
194
- canon = canonical_title(title)
195
-
196
- for existing in existing_titles:
197
- similarity = fuzz.token_set_ratio(canon, existing)
198
- if similarity >= threshold:
199
- logger.debug(f"Fuzzy duplicate ({similarity}%): '{title[:50]}...'")
200
- return True
201
-
202
- return False
203
-
204
-
205
- # =============================================================================
206
- # News Ingestion
207
- # =============================================================================
208
-
209
- def ingest_news(session: Session) -> dict:
210
- """
211
- Ingest news from all configured sources.
212
-
213
- Returns:
214
- Dict with stats: imported, duplicates, language_filtered, fuzzy_filtered
215
- """
216
- settings = get_settings()
217
-
218
- stats = {
219
- "imported": 0,
220
- "duplicates": 0,
221
- "language_filtered": 0,
222
- "fuzzy_filtered": 0,
223
- "source": "unknown",
224
- }
225
-
226
- # Collect articles from sources
227
- all_articles = []
228
-
229
- # Try NewsAPI first if key is available
230
- if settings.newsapi_key:
231
- articles = fetch_newsapi_articles(
232
- api_key=settings.newsapi_key,
233
- query=settings.news_query,
234
- language=settings.news_language,
235
- lookback_days=settings.lookback_days,
236
- )
237
- if articles:
238
- all_articles.extend(articles)
239
- stats["source"] = "newsapi"
240
-
241
- # RSS fallback/supplement
242
- if not all_articles or not settings.newsapi_key:
243
- rss_articles = fetch_google_news(
244
- query=settings.news_query,
245
- language=settings.news_language,
246
- )
247
- all_articles.extend(rss_articles)
248
- stats["source"] = "rss" if not settings.newsapi_key else "newsapi+rss"
249
-
250
- if not all_articles:
251
- logger.warning("No articles fetched from any source")
252
- return stats
253
-
254
- logger.info(f"Total articles fetched: {len(all_articles)}")
255
-
256
- # Language filter
257
- all_articles, lang_filtered = filter_by_language(
258
- all_articles,
259
- target_language=settings.news_language
260
- )
261
- stats["language_filtered"] = lang_filtered
262
-
263
- # Get recent titles for fuzzy dedup
264
- recent_titles = get_recent_titles(
265
- session,
266
- window_hours=settings.fuzzy_dedup_window_hours
267
- )
268
-
269
- # Process articles
270
- for article in all_articles:
271
- try:
272
- title = clean_text(article.get("title", ""))
273
- if not title:
274
- continue
275
-
276
- # Fuzzy dedup check
277
- if is_fuzzy_duplicate(
278
- title,
279
- recent_titles,
280
- threshold=settings.fuzzy_dedup_threshold
281
- ):
282
- stats["fuzzy_filtered"] += 1
283
- continue
284
-
285
- # Prepare fields
286
- description = clean_text(article.get("description", ""))
287
- content = clean_text(article.get("content", ""))
288
- url = normalize_url(article.get("url", ""))
289
- source = article.get("source", "Unknown")
290
- author = article.get("author", "")
291
- published_at = article.get("published_at", datetime.now(timezone.utc))
292
-
293
- # Generate keys
294
- dedup_key = generate_dedup_key(
295
- url=url,
296
- title=title,
297
- published_at=published_at,
298
- source=source
299
- )
300
- canon_title = canonical_title(title)
301
-
302
- # Upsert
303
- stmt = sqlite_insert(NewsArticle).values(
304
- dedup_key=dedup_key,
305
- title=truncate_text(title, 500),
306
- canonical_title=truncate_text(canon_title, 500),
307
- description=truncate_text(description, 2000) if description else None,
308
- content=truncate_text(content, 10000) if content else None,
309
- url=url or None,
310
- source=source,
311
- author=author or None,
312
- language=settings.news_language,
313
- published_at=published_at,
314
- fetched_at=datetime.now(timezone.utc),
315
- ).on_conflict_do_nothing(index_elements=["dedup_key"])
316
-
317
- result = session.execute(stmt)
318
-
319
- if result.rowcount > 0:
320
- stats["imported"] += 1
321
- # Add to recent titles for this batch
322
- recent_titles.append(canon_title)
323
- else:
324
- stats["duplicates"] += 1
325
-
326
- except Exception as e:
327
- logger.warning(f"Error processing article: {e}")
328
- continue
329
-
330
- session.commit()
331
-
332
- logger.info(
333
- f"News ingestion complete: "
334
- f"{stats['imported']} imported, "
335
- f"{stats['duplicates']} duplicates, "
336
- f"{stats['fuzzy_filtered']} fuzzy filtered, "
337
- f"{stats['language_filtered']} language filtered"
338
- )
339
-
340
- return stats
341
-
342
-
343
- # =============================================================================
344
- # Price Ingestion
345
- # =============================================================================
346
-
347
- def ingest_prices(session: Session) -> dict:
348
- """
349
- Ingest price data for all configured symbols.
350
-
351
- Returns:
352
- Dict with stats per symbol
353
- """
354
- settings = get_settings()
355
- symbols = settings.symbols_list
356
-
357
- stats = {}
358
-
359
- # Calculate date range
360
- end_date = datetime.now(timezone.utc)
361
- start_date = end_date - timedelta(days=settings.lookback_days)
362
-
363
- for symbol in symbols:
364
- logger.info(f"Fetching prices for {symbol}...")
365
-
366
- try:
367
- ticker = yf.Ticker(symbol)
368
- df = ticker.history(
369
- start=start_date.strftime("%Y-%m-%d"),
370
- end=end_date.strftime("%Y-%m-%d"),
371
- interval="1d"
372
- )
373
-
374
- if df.empty:
375
- logger.warning(f"No data returned for {symbol}")
376
- stats[symbol] = {"imported": 0, "duplicates": 0, "error": "no_data"}
377
- continue
378
-
379
- imported = 0
380
- duplicates = 0
381
-
382
- for date_idx, row in df.iterrows():
383
- try:
384
- # Convert index to datetime
385
- if hasattr(date_idx, 'to_pydatetime'):
386
- bar_date = date_idx.to_pydatetime()
387
- else:
388
- bar_date = date_idx
389
-
390
- # Ensure timezone
391
- if bar_date.tzinfo is None:
392
- bar_date = bar_date.replace(tzinfo=timezone.utc)
393
-
394
- # Upsert
395
- stmt = sqlite_insert(PriceBar).values(
396
- symbol=symbol,
397
- date=bar_date,
398
- open=float(row.get("Open", 0)) if row.get("Open") else None,
399
- high=float(row.get("High", 0)) if row.get("High") else None,
400
- low=float(row.get("Low", 0)) if row.get("Low") else None,
401
- close=float(row["Close"]),
402
- volume=float(row.get("Volume", 0)) if row.get("Volume") else None,
403
- adj_close=float(row.get("Adj Close", row["Close"])),
404
- fetched_at=datetime.now(timezone.utc),
405
- ).on_conflict_do_update(
406
- index_elements=["symbol", "date"],
407
- set_={
408
- "close": float(row["Close"]),
409
- "adj_close": float(row.get("Adj Close", row["Close"])),
410
- "fetched_at": datetime.now(timezone.utc),
411
- }
412
- )
413
-
414
- result = session.execute(stmt)
415
-
416
- if result.rowcount > 0:
417
- imported += 1
418
- else:
419
- duplicates += 1
420
-
421
- except Exception as e:
422
- logger.debug(f"Error processing price bar: {e}")
423
- continue
424
-
425
- session.commit()
426
-
427
- stats[symbol] = {"imported": imported, "duplicates": duplicates}
428
- logger.info(f"{symbol}: {imported} bars imported, {duplicates} updated")
429
-
430
- except Exception as e:
431
- logger.error(f"Failed to fetch {symbol}: {e}")
432
- stats[symbol] = {"imported": 0, "duplicates": 0, "error": str(e)}
433
-
434
- return stats
435
-
436
-
437
- # =============================================================================
438
- # Main Entry Point
439
- # =============================================================================
440
-
441
- def fetch_all(
442
- news: bool = True,
443
- prices: bool = True
444
- ) -> dict:
445
- """
446
- Run full data ingestion pipeline.
447
-
448
- Args:
449
- news: Whether to fetch news
450
- prices: Whether to fetch prices
451
-
452
- Returns:
453
- Combined stats dict
454
- """
455
- logger.info("Starting data ingestion pipeline...")
456
-
457
- results = {
458
- "news": None,
459
- "prices": None,
460
- "timestamp": datetime.now(timezone.utc).isoformat(),
461
- }
462
-
463
- with SessionLocal() as session:
464
- if news:
465
- results["news"] = ingest_news(session)
466
-
467
- if prices:
468
- results["prices"] = ingest_prices(session)
469
-
470
- logger.info("Data ingestion complete")
471
- return results
472
-
473
-
474
- def main():
475
- parser = argparse.ArgumentParser(
476
- description="Fetch news and price data"
477
- )
478
- parser.add_argument(
479
- "--fetch",
480
- action="store_true",
481
- help="Run data fetch"
482
- )
483
- parser.add_argument(
484
- "--news-only",
485
- action="store_true",
486
- help="Fetch only news"
487
- )
488
- parser.add_argument(
489
- "--prices-only",
490
- action="store_true",
491
- help="Fetch only prices"
492
- )
493
- parser.add_argument(
494
- "--no-lock",
495
- action="store_true",
496
- help="Skip pipeline lock (for testing)"
497
- )
498
- parser.add_argument(
499
- "--verbose", "-v",
500
- action="store_true",
501
- help="Verbose logging"
502
- )
503
-
504
- args = parser.parse_args()
505
-
506
- if args.verbose:
507
- logging.getLogger().setLevel(logging.DEBUG)
508
-
509
- if not args.fetch:
510
- parser.print_help()
511
- return
512
-
513
- # Initialize database
514
- logger.info("Initializing database...")
515
- init_db()
516
-
517
- # Determine what to fetch
518
- fetch_news = not args.prices_only
519
- fetch_prices = not args.news_only
520
-
521
- # Run with or without lock
522
- if args.no_lock:
523
- results = fetch_all(news=fetch_news, prices=fetch_prices)
524
- else:
525
- try:
526
- with pipeline_lock():
527
- results = fetch_all(news=fetch_news, prices=fetch_prices)
528
- except RuntimeError as e:
529
- logger.error(f"Could not acquire lock: {e}")
530
- logger.info("Another pipeline process may be running. Use --no-lock to bypass.")
531
- return
532
-
533
- # Print summary
534
- print("\n" + "=" * 50)
535
- print("DATA INGESTION SUMMARY")
536
- print("=" * 50)
537
-
538
- if results.get("news"):
539
- news = results["news"]
540
- print(f"\nNews ({news.get('source', 'unknown')}):")
541
- print(f" - Imported: {news.get('imported', 0)}")
542
- print(f" - Duplicates: {news.get('duplicates', 0)}")
543
- print(f" - Fuzzy filtered: {news.get('fuzzy_filtered', 0)}")
544
- print(f" - Language filtered: {news.get('language_filtered', 0)}")
545
-
546
- if results.get("prices"):
547
- print("\nPrices:")
548
- for symbol, stats in results["prices"].items():
549
- status = f"{stats.get('imported', 0)} imported"
550
- if stats.get("error"):
551
- status = f"ERROR: {stats['error']}"
552
- print(f" - {symbol}: {status}")
553
-
554
- print(f"\nTimestamp: {results.get('timestamp', 'N/A')}")
555
-
556
-
557
- if __name__ == "__main__":
558
- main()
559
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Data Manager: News and price data ingestion.
3
+
4
+ Handles:
5
+ - NewsAPI fetching (if API key provided)
6
+ - RSS feed fallback (Google News)
7
+ - Fuzzy deduplication for RSS noise
8
+ - Multi-symbol yfinance price ingestion
9
+ - Language filtering for FinBERT compatibility
10
+
11
+ Usage:
12
+ python -m app.data_manager --fetch
13
+ python -m app.data_manager --fetch --news-only
14
+ python -m app.data_manager --fetch --prices-only
15
+ """
16
+
17
+ import argparse
18
+ import logging
19
+ from datetime import datetime, timedelta, timezone
20
+ from typing import Optional
21
+
22
+ import requests
23
+ import yfinance as yf
24
+ from rapidfuzz import fuzz
25
+ from langdetect import detect, LangDetectException
26
+ from sqlalchemy.dialects.sqlite import insert as sqlite_insert
27
+ from sqlalchemy.dialects.postgresql import insert as pg_insert
28
+ from sqlalchemy.orm import Session
29
+
30
+ from app.db import SessionLocal, init_db, get_db_type
31
+ from app.models import NewsArticle, PriceBar
32
+ from app.settings import get_settings
33
+ from app.rss_ingest import fetch_google_news
34
+ from app.utils import (
35
+ clean_text,
36
+ canonical_title,
37
+ normalize_url,
38
+ generate_dedup_key,
39
+ truncate_text,
40
+ )
41
+ from app.lock import pipeline_lock
42
+
43
+ logging.basicConfig(
44
+ level=logging.INFO,
45
+ format="%(asctime)s - %(levelname)s - %(message)s"
46
+ )
47
+ logger = logging.getLogger(__name__)
48
+
49
+
50
def get_upsert_stmt(table, values: dict, index_elements: list, update_set: dict = None):
    """Build a dialect-appropriate ``INSERT ... ON CONFLICT`` statement.

    Args:
        table: Mapped model class (e.g. NewsArticle, PriceBar).
        values: Column -> value mapping for the INSERT.
        index_elements: Columns of the unique constraint that triggers the conflict.
        update_set: Columns to overwrite on conflict; when None the conflicting
            row is left untouched (insert-or-ignore).

    Returns:
        A SQLAlchemy insert statement with the conflict clause attached.
    """
    # Both dialects expose the same on_conflict_* API; only the insert()
    # constructor differs, so select it once and share the conflict handling
    # instead of duplicating the two identical branches.
    insert_fn = pg_insert if get_db_type() == "postgresql" else sqlite_insert
    stmt = insert_fn(table).values(**values)

    if update_set:
        return stmt.on_conflict_do_update(index_elements=index_elements, set_=update_set)
    return stmt.on_conflict_do_nothing(index_elements=index_elements)
+
70
+
71
+ # =============================================================================
72
+ # NewsAPI Fetching
73
+ # =============================================================================
74
+
75
def fetch_newsapi_articles(
    api_key: str,
    query: str,
    language: str = "en",
    lookback_days: int = 30,
    page_size: int = 100
) -> list[dict]:
    """
    Fetch articles from NewsAPI's /v2/everything endpoint.

    Args:
        api_key: NewsAPI key.
        query: Search query string.
        language: ISO language code filter.
        lookback_days: History window in days (capped at 30 — free-plan limit).
        page_size: Articles per request (NewsAPI max is 100).

    Returns:
        List of normalized article dicts; empty list on any network/API error.
    """
    logger.info(f"Fetching from NewsAPI: query='{query}', language={language}")

    # Calculate date range (free plan limits history to ~1 month)
    to_date = datetime.now(timezone.utc)
    from_date = to_date - timedelta(days=min(lookback_days, 30))  # API limit

    url = "https://newsapi.org/v2/everything"
    params = {
        "apiKey": api_key,
        "q": query,
        "language": language,
        "from": from_date.strftime("%Y-%m-%d"),
        "to": to_date.strftime("%Y-%m-%d"),
        "sortBy": "publishedAt",
        "pageSize": page_size,
    }

    # Keep the try body to the operations that can actually raise. Note that
    # response.json() raises a ValueError/JSONDecodeError on a malformed body,
    # which on older requests versions is NOT a RequestException — the original
    # handler let it escape.
    try:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        logger.error(f"NewsAPI request failed: {e}")
        return []
    except ValueError as e:
        logger.error(f"NewsAPI returned an invalid JSON body: {e}")
        return []

    if data.get("status") != "ok":
        logger.error(f"NewsAPI error: {data.get('message', 'Unknown error')}")
        return []

    articles = []
    for item in data.get("articles", []):
        try:
            published_str = item.get("publishedAt", "")
            # NewsAPI timestamps end in "Z"; fromisoformat needs "+00:00".
            published_at = (
                datetime.fromisoformat(published_str.replace("Z", "+00:00"))
                if published_str
                else datetime.now(timezone.utc)
            )

            articles.append({
                "title": item.get("title", ""),
                "description": item.get("description", ""),
                "content": item.get("content", ""),
                "url": item.get("url", ""),
                "source": item.get("source", {}).get("name", ""),
                "author": item.get("author", ""),
                "published_at": published_at,
            })
        except Exception as e:
            # A single malformed item should not abort the whole batch.
            logger.debug(f"Error parsing NewsAPI article: {e}")
            continue

    logger.info(f"Fetched {len(articles)} articles from NewsAPI")
    return articles
138
+
139
+
140
+ # =============================================================================
141
+ # Language Detection
142
+ # =============================================================================
143
+
144
def detect_language(text: str) -> Optional[str]:
    """Detect language of text. Returns None if detection fails."""
    # Very short snippets give unreliable detections — treat them as unknown.
    if text and len(text) >= 20:
        try:
            return detect(text)
        except LangDetectException:
            pass
    return None
153
+
154
+
155
def filter_by_language(
    articles: list[dict],
    target_language: str = "en"
) -> tuple[list[dict], int]:
    """
    Keep only articles whose detected language matches target_language.

    Articles whose language cannot be detected are kept (benefit of the doubt).

    Returns:
        Tuple of (kept_articles, num_filtered_out)
    """
    kept: list[dict] = []
    dropped = 0

    for item in articles:
        # Detection works on title + description combined.
        sample = f"{item.get('title', '')} {item.get('description', '')}"
        detected = detect_language(sample)

        if detected is not None and detected != target_language:
            dropped += 1
            logger.debug(f"Filtered out ({detected}): {item.get('title', '')[:50]}")
        else:
            kept.append(item)

    if dropped > 0:
        logger.info(f"Language filter: kept {len(kept)}, filtered out {dropped}")

    return kept, dropped
183
+
184
+
185
+ # =============================================================================
186
+ # Fuzzy Deduplication
187
+ # =============================================================================
188
+
189
def get_recent_titles(
    session: Session,
    window_hours: int = 48
) -> list[str]:
    """Return canonical titles of articles published within the dedup window."""
    since = datetime.now(timezone.utc) - timedelta(hours=window_hours)

    rows = (
        session.query(NewsArticle.canonical_title)
        .filter(
            NewsArticle.published_at >= since,
            NewsArticle.canonical_title.isnot(None),
        )
        .all()
    )

    # Each row is a one-element tuple; drop empty strings as well as NULLs.
    return [title for (title,) in rows if title]
202
+
203
+
204
def is_fuzzy_duplicate(
    title: str,
    existing_titles: list[str],
    threshold: int = 85
) -> bool:
    """
    Check if title is too similar to any existing title.

    Uses rapidfuzz token_set_ratio, which is robust to word reordering and
    to subset/superset titles common in syndicated feeds.
    """
    # Nothing to compare against — cannot be a duplicate.
    if not title or not existing_titles:
        return False

    candidate = canonical_title(title)

    for prior in existing_titles:
        score = fuzz.token_set_ratio(candidate, prior)
        if score < threshold:
            continue
        logger.debug(f"Fuzzy duplicate ({score}%): '{title[:50]}...'")
        return True

    return False
225
+
226
+
227
+ # =============================================================================
228
+ # News Ingestion
229
+ # =============================================================================
230
+
231
def ingest_news(session: Session) -> dict:
    """
    Ingest news from all configured sources (NewsAPI and/or Google News RSS).

    Pipeline: fetch -> language filter -> fuzzy title dedup -> exact-key upsert.
    The session is committed once at the end, after all articles are processed.

    Args:
        session: Open SQLAlchemy session used for dedup lookups and inserts.

    Returns:
        Dict with stats: imported, duplicates, language_filtered,
        fuzzy_filtered, and the "source" that supplied the articles.
    """
    settings = get_settings()

    stats = {
        "imported": 0,
        "duplicates": 0,
        "language_filtered": 0,
        "fuzzy_filtered": 0,
        "source": "unknown",
    }

    # Collect articles from sources
    all_articles = []

    # Try NewsAPI first if key is available
    if settings.newsapi_key:
        articles = fetch_newsapi_articles(
            api_key=settings.newsapi_key,
            query=settings.news_query,
            language=settings.news_language,
            lookback_days=settings.lookback_days,
        )
        if articles:
            all_articles.extend(articles)
            stats["source"] = "newsapi"

    # RSS runs as the fallback (no key configured) or as a supplement when
    # NewsAPI is configured but returned nothing.
    if not all_articles or not settings.newsapi_key:
        rss_articles = fetch_google_news(
            query=settings.news_query,
            language=settings.news_language,
        )
        all_articles.extend(rss_articles)
        stats["source"] = "rss" if not settings.newsapi_key else "newsapi+rss"

    if not all_articles:
        logger.warning("No articles fetched from any source")
        return stats

    logger.info(f"Total articles fetched: {len(all_articles)}")

    # Language filter (articles with undetectable language are kept).
    all_articles, lang_filtered = filter_by_language(
        all_articles,
        target_language=settings.news_language
    )
    stats["language_filtered"] = lang_filtered

    # Seed the fuzzy-dedup comparison set with recently stored titles.
    recent_titles = get_recent_titles(
        session,
        window_hours=settings.fuzzy_dedup_window_hours
    )

    # Process articles one at a time; a failure on one article is logged and
    # skipped so the rest of the batch still lands.
    for article in all_articles:
        try:
            title = clean_text(article.get("title", ""))
            if not title:
                continue

            # Fuzzy dedup check against stored titles plus earlier items
            # accepted from this same batch.
            if is_fuzzy_duplicate(
                title,
                recent_titles,
                threshold=settings.fuzzy_dedup_threshold
            ):
                stats["fuzzy_filtered"] += 1
                continue

            # Prepare fields (missing values fall back to empty/now defaults).
            description = clean_text(article.get("description", ""))
            content = clean_text(article.get("content", ""))
            url = normalize_url(article.get("url", ""))
            source = article.get("source", "Unknown")
            author = article.get("author", "")
            published_at = article.get("published_at", datetime.now(timezone.utc))

            # Generate keys: dedup_key is the exact-duplicate identity;
            # canon_title feeds future fuzzy comparisons.
            dedup_key = generate_dedup_key(
                url=url,
                title=title,
                published_at=published_at,
                source=source
            )
            canon_title = canonical_title(title)

            # Insert-or-ignore keyed on dedup_key (no update_set -> conflicts
            # leave the existing row untouched).
            stmt = get_upsert_stmt(
                NewsArticle,
                values={
                    "dedup_key": dedup_key,
                    "title": truncate_text(title, 500),
                    "canonical_title": truncate_text(canon_title, 500),
                    "description": truncate_text(description, 2000) if description else None,
                    "content": truncate_text(content, 10000) if content else None,
                    "url": url or None,
                    "source": source,
                    "author": author or None,
                    "language": settings.news_language,
                    "published_at": published_at,
                    "fetched_at": datetime.now(timezone.utc),
                },
                index_elements=["dedup_key"]
            )

            result = session.execute(stmt)

            # rowcount 0 means the conflict clause fired: exact duplicate.
            if result.rowcount > 0:
                stats["imported"] += 1
                # Add to recent titles so later items in this batch dedup
                # against it as well.
                recent_titles.append(canon_title)
            else:
                stats["duplicates"] += 1

        except Exception as e:
            logger.warning(f"Error processing article: {e}")
            continue

    session.commit()

    logger.info(
        f"News ingestion complete: "
        f"{stats['imported']} imported, "
        f"{stats['duplicates']} duplicates, "
        f"{stats['fuzzy_filtered']} fuzzy filtered, "
        f"{stats['language_filtered']} language filtered"
    )

    return stats
367
+
368
+
369
+ # =============================================================================
370
+ # Price Ingestion
371
+ # =============================================================================
372
+
373
+ def ingest_prices(session: Session) -> dict:
374
+ """
375
+ Ingest price data for all configured symbols.
376
+
377
+ Returns:
378
+ Dict with stats per symbol
379
+ """
380
+ settings = get_settings()
381
+ symbols = settings.symbols_list
382
+
383
+ stats = {}
384
+
385
+ # Calculate date range
386
+ end_date = datetime.now(timezone.utc)
387
+ start_date = end_date - timedelta(days=settings.lookback_days)
388
+
389
+ for symbol in symbols:
390
+ logger.info(f"Fetching prices for {symbol}...")
391
+
392
+ try:
393
+ ticker = yf.Ticker(symbol)
394
+ df = ticker.history(
395
+ start=start_date.strftime("%Y-%m-%d"),
396
+ end=end_date.strftime("%Y-%m-%d"),
397
+ interval="1d"
398
+ )
399
+
400
+ if df.empty:
401
+ logger.warning(f"No data returned for {symbol}")
402
+ stats[symbol] = {"imported": 0, "duplicates": 0, "error": "no_data"}
403
+ continue
404
+
405
+ imported = 0
406
+ duplicates = 0
407
+
408
+ for date_idx, row in df.iterrows():
409
+ try:
410
+ # Convert index to datetime
411
+ if hasattr(date_idx, 'to_pydatetime'):
412
+ bar_date = date_idx.to_pydatetime()
413
+ else:
414
+ bar_date = date_idx
415
+
416
+ # Ensure timezone
417
+ if bar_date.tzinfo is None:
418
+ bar_date = bar_date.replace(tzinfo=timezone.utc)
419
+
420
+ # Upsert
421
+ stmt = get_upsert_stmt(
422
+ PriceBar,
423
+ values={
424
+ "symbol": symbol,
425
+ "date": bar_date,
426
+ "open": float(row.get("Open", 0)) if row.get("Open") else None,
427
+ "high": float(row.get("High", 0)) if row.get("High") else None,
428
+ "low": float(row.get("Low", 0)) if row.get("Low") else None,
429
+ "close": float(row["Close"]),
430
+ "volume": float(row.get("Volume", 0)) if row.get("Volume") else None,
431
+ "adj_close": float(row.get("Adj Close", row["Close"])),
432
+ "fetched_at": datetime.now(timezone.utc),
433
+ },
434
+ index_elements=["symbol", "date"],
435
+ update_set={
436
+ "close": float(row["Close"]),
437
+ "adj_close": float(row.get("Adj Close", row["Close"])),
438
+ "fetched_at": datetime.now(timezone.utc),
439
+ }
440
+ )
441
+
442
+ result = session.execute(stmt)
443
+
444
+ if result.rowcount > 0:
445
+ imported += 1
446
+ else:
447
+ duplicates += 1
448
+
449
+ except Exception as e:
450
+ logger.debug(f"Error processing price bar: {e}")
451
+ continue
452
+
453
+ session.commit()
454
+
455
+ stats[symbol] = {"imported": imported, "duplicates": duplicates}
456
+ logger.info(f"{symbol}: {imported} bars imported, {duplicates} updated")
457
+
458
+ except Exception as e:
459
+ logger.error(f"Failed to fetch {symbol}: {e}")
460
+ stats[symbol] = {"imported": 0, "duplicates": 0, "error": str(e)}
461
+
462
+ return stats
463
+
464
+
465
+ # =============================================================================
466
+ # Main Entry Point
467
+ # =============================================================================
468
+
469
def fetch_all(
    news: bool = True,
    prices: bool = True
) -> dict:
    """
    Run the full data ingestion pipeline.

    Args:
        news: Whether to fetch news
        prices: Whether to fetch prices

    Returns:
        Combined stats dict with "news"/"prices" entries (None when skipped)
        and an ISO "timestamp".
    """
    logger.info("Starting data ingestion pipeline...")

    results = {
        "news": None,
        "prices": None,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    # One session shared by both stages; news always runs before prices.
    stages = ((news, "news", ingest_news), (prices, "prices", ingest_prices))
    with SessionLocal() as session:
        for enabled, key, runner in stages:
            if enabled:
                results[key] = runner(session)

    logger.info("Data ingestion complete")
    return results
500
+
501
+
502
def _print_summary(results: dict) -> None:
    """Pretty-print ingestion stats to stdout."""
    print("\n" + "=" * 50)
    print("DATA INGESTION SUMMARY")
    print("=" * 50)

    news = results.get("news")
    if news:
        print(f"\nNews ({news.get('source', 'unknown')}):")
        print(f" - Imported: {news.get('imported', 0)}")
        print(f" - Duplicates: {news.get('duplicates', 0)}")
        print(f" - Fuzzy filtered: {news.get('fuzzy_filtered', 0)}")
        print(f" - Language filtered: {news.get('language_filtered', 0)}")

    if results.get("prices"):
        print("\nPrices:")
        for symbol, sym_stats in results["prices"].items():
            if sym_stats.get("error"):
                status = f"ERROR: {sym_stats['error']}"
            else:
                status = f"{sym_stats.get('imported', 0)} imported"
            print(f" - {symbol}: {status}")

    print(f"\nTimestamp: {results.get('timestamp', 'N/A')}")


def main():
    """CLI entry point: parse flags, init the DB, run ingestion, print a summary."""
    parser = argparse.ArgumentParser(description="Fetch news and price data")
    parser.add_argument("--fetch", action="store_true", help="Run data fetch")
    parser.add_argument("--news-only", action="store_true", help="Fetch only news")
    parser.add_argument("--prices-only", action="store_true", help="Fetch only prices")
    parser.add_argument("--no-lock", action="store_true",
                        help="Skip pipeline lock (for testing)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if not args.fetch:
        parser.print_help()
        return

    logger.info("Initializing database...")
    init_db()

    # --news-only / --prices-only each disable the other stage.
    do_news = not args.prices_only
    do_prices = not args.news_only

    if args.no_lock:
        results = fetch_all(news=do_news, prices=do_prices)
    else:
        try:
            with pipeline_lock():
                results = fetch_all(news=do_news, prices=do_prices)
        except RuntimeError as e:
            logger.error(f"Could not acquire lock: {e}")
            logger.info("Another pipeline process may be running. Use --no-lock to bypass.")
            return

    _print_summary(results)


if __name__ == "__main__":
    main()