ifieryarrows committed on
Commit
58106a0
·
verified ·
1 Parent(s): 1a2d1b3

Sync from GitHub (tests passed)

Browse files
Files changed (1) hide show
  1. pipelines/ingestion/news.py +40 -24
pipelines/ingestion/news.py CHANGED
@@ -47,14 +47,14 @@ def compute_url_hash(url: Optional[str]) -> Optional[str]:
47
  def insert_raw_article(
48
  session: Session,
49
  url: Optional[str],
50
- title: str,
51
  description: Optional[str],
52
  source: str,
53
  source_feed: str,
54
- published_at: datetime,
55
  run_id: uuid.UUID,
56
  raw_payload: Optional[dict] = None,
57
- ) -> Optional[int]:
58
  """
59
  Insert single article to news_raw.
60
 
@@ -63,21 +63,28 @@ def insert_raw_article(
63
  Args:
64
  session: Database session
65
  url: Article URL (can be None)
66
- title: Article title
67
  description: Article description
68
  source: Source name (e.g., "google_news", "newsapi")
69
  source_feed: Exact feed URL or query string
70
- published_at: Publication timestamp (UTC)
71
  run_id: Pipeline run UUID
72
  raw_payload: Original response fragment for debugging
73
 
74
  Returns:
75
- raw_id if inserted, None if duplicate or error
76
  """
77
- if not title or not title.strip():
78
- return None
79
 
 
 
 
80
  title = clean_text(title)[:500] # Truncate to column limit
 
 
 
 
 
81
  url_hash = compute_url_hash(url)
82
 
83
  try:
@@ -93,6 +100,7 @@ def insert_raw_article(
93
  source=source,
94
  source_feed=source_feed[:500] if source_feed else None,
95
  published_at=published_at,
 
96
  run_id=run_id,
97
  raw_payload=raw_payload,
98
  )
@@ -105,14 +113,13 @@ def insert_raw_article(
105
 
106
  if result.rowcount > 0:
107
  # Get the inserted ID
108
- # For PostgreSQL, we need to query it
109
  row = session.execute(
110
  text("SELECT id FROM news_raw WHERE url_hash = :hash ORDER BY id DESC LIMIT 1"),
111
  {"hash": url_hash}
112
  ).fetchone()
113
- return row[0] if row else None
114
 
115
- return None # Duplicate
116
 
117
  else:
118
  # SQLite fallback - simple insert with error handling
@@ -124,17 +131,22 @@ def insert_raw_article(
124
  source=source,
125
  source_feed=source_feed[:500] if source_feed else None,
126
  published_at=published_at,
 
127
  run_id=run_id,
128
  raw_payload=raw_payload,
129
  )
130
  session.add(article)
131
  session.flush()
132
- return article.id
133
 
134
  except Exception as e:
135
- logger.debug(f"Insert raw article failed: {e}")
 
 
 
 
136
  session.rollback()
137
- return None
138
 
139
 
140
  def ingest_news_to_raw(
@@ -202,22 +214,24 @@ def ingest_news_to_raw(
202
  stats["fetched"] += len(articles)
203
 
204
  for article in articles:
205
- raw_id = insert_raw_article(
206
  session=session,
207
  url=article.get("url"),
208
- title=article.get("title", ""),
209
  description=article.get("description"),
210
  source="google_news",
211
  source_feed=f"google_news:{query}",
212
- published_at=article.get("published_at", datetime.now(timezone.utc)),
213
  run_id=run_id,
214
  raw_payload={"query": query, "source": article.get("source")},
215
  )
216
 
217
- if raw_id:
218
  stats["inserted"] += 1
219
- else:
220
  stats["duplicates"] += 1
 
 
221
 
222
  except Exception as e:
223
  logger.warning(f"Error fetching {source} for '{query}': {e}")
@@ -239,22 +253,24 @@ def ingest_news_to_raw(
239
  stats["fetched"] += len(articles)
240
 
241
  for article in articles:
242
- raw_id = insert_raw_article(
243
  session=session,
244
  url=article.get("url"),
245
- title=article.get("title", ""),
246
  description=article.get("description"),
247
  source="newsapi",
248
  source_feed=f"newsapi:{query}",
249
- published_at=article.get("published_at", datetime.now(timezone.utc)),
250
  run_id=run_id,
251
  raw_payload={"query": query, "author": article.get("author")},
252
  )
253
 
254
- if raw_id:
255
  stats["inserted"] += 1
256
- else:
257
  stats["duplicates"] += 1
 
 
258
 
259
  except Exception as e:
260
  logger.warning(f"Error fetching newsapi for '{query}': {e}")
 
47
  def insert_raw_article(
48
  session: Session,
49
  url: Optional[str],
50
+ title: Optional[str],
51
  description: Optional[str],
52
  source: str,
53
  source_feed: str,
54
+ published_at: Optional[datetime],
55
  run_id: uuid.UUID,
56
  raw_payload: Optional[dict] = None,
57
+ ) -> tuple[Optional[int], str]:
58
  """
59
  Insert single article to news_raw.
60
 
 
63
  Args:
64
  session: Database session
65
  url: Article URL (can be None)
66
+ title: Article title (will use fallback if empty)
67
  description: Article description
68
  source: Source name (e.g., "google_news", "newsapi")
69
  source_feed: Exact feed URL or query string
70
+ published_at: Publication timestamp (will use now if None)
71
  run_id: Pipeline run UUID
72
  raw_payload: Original response fragment for debugging
73
 
74
  Returns:
75
+ Tuple of (raw_id, status) where status is 'inserted', 'duplicate', or 'error'
76
  """
77
+ fetched_at = datetime.now(timezone.utc)
 
78
 
79
+ # Ensure title is never None/empty (NOT NULL constraint)
80
+ if not title or not title.strip():
81
+ title = "(untitled)"
82
  title = clean_text(title)[:500] # Truncate to column limit
83
+
84
+ # Ensure published_at is never None (NOT NULL constraint)
85
+ if not published_at:
86
+ published_at = fetched_at
87
+
88
  url_hash = compute_url_hash(url)
89
 
90
  try:
 
100
  source=source,
101
  source_feed=source_feed[:500] if source_feed else None,
102
  published_at=published_at,
103
+ fetched_at=fetched_at,
104
  run_id=run_id,
105
  raw_payload=raw_payload,
106
  )
 
113
 
114
  if result.rowcount > 0:
115
  # Get the inserted ID
 
116
  row = session.execute(
117
  text("SELECT id FROM news_raw WHERE url_hash = :hash ORDER BY id DESC LIMIT 1"),
118
  {"hash": url_hash}
119
  ).fetchone()
120
+ return (row[0] if row else None, "inserted")
121
 
122
+ return (None, "duplicate")
123
 
124
  else:
125
  # SQLite fallback - simple insert with error handling
 
131
  source=source,
132
  source_feed=source_feed[:500] if source_feed else None,
133
  published_at=published_at,
134
+ fetched_at=fetched_at,
135
  run_id=run_id,
136
  raw_payload=raw_payload,
137
  )
138
  session.add(article)
139
  session.flush()
140
+ return (article.id, "inserted")
141
 
142
  except Exception as e:
143
+ # Distinguish between duplicate (23505) and other errors
144
+ pgcode = getattr(getattr(e, "orig", None), "pgcode", None)
145
+ if pgcode == "23505": # Unique violation
146
+ return (None, "duplicate")
147
+ logger.warning(f"Insert raw article failed (pgcode={pgcode}): {e}")
148
  session.rollback()
149
+ return (None, "error")
150
 
151
 
152
  def ingest_news_to_raw(
 
214
  stats["fetched"] += len(articles)
215
 
216
  for article in articles:
217
+ raw_id, status = insert_raw_article(
218
  session=session,
219
  url=article.get("url"),
220
+ title=article.get("title"),
221
  description=article.get("description"),
222
  source="google_news",
223
  source_feed=f"google_news:{query}",
224
+ published_at=article.get("published_at"),
225
  run_id=run_id,
226
  raw_payload={"query": query, "source": article.get("source")},
227
  )
228
 
229
+ if status == "inserted":
230
  stats["inserted"] += 1
231
+ elif status == "duplicate":
232
  stats["duplicates"] += 1
233
+ else:
234
+ stats["errors"] += 1
235
 
236
  except Exception as e:
237
  logger.warning(f"Error fetching {source} for '{query}': {e}")
 
253
  stats["fetched"] += len(articles)
254
 
255
  for article in articles:
256
+ raw_id, status = insert_raw_article(
257
  session=session,
258
  url=article.get("url"),
259
+ title=article.get("title"),
260
  description=article.get("description"),
261
  source="newsapi",
262
  source_feed=f"newsapi:{query}",
263
+ published_at=article.get("published_at"),
264
  run_id=run_id,
265
  raw_payload={"query": query, "author": article.get("author")},
266
  )
267
 
268
+ if status == "inserted":
269
  stats["inserted"] += 1
270
+ elif status == "duplicate":
271
  stats["duplicates"] += 1
272
+ else:
273
+ stats["errors"] += 1
274
 
275
  except Exception as e:
276
  logger.warning(f"Error fetching newsapi for '{query}': {e}")