ak0601 committed on
Commit
2ef93e1
·
verified ·
1 Parent(s): 1acc8d7

Update app/fetcher/fetch_pipeline.py

Browse files
Files changed (1) hide show
  1. app/fetcher/fetch_pipeline.py +232 -225
app/fetcher/fetch_pipeline.py CHANGED
@@ -1,225 +1,232 @@
1
- """
2
- ResearchRadar — Fetch pipeline orchestration.
3
-
4
- Contains the main Sunday job logic. Coordinates all API clients,
5
- handles fallback, deduplication, ranking, storage, and notification.
6
-
7
- This function must **never raise** — all exceptions are caught and
8
- logged into ``Digest.fetch_errors``.
9
- """
10
-
11
- from __future__ import annotations
12
-
13
- import logging
14
- from difflib import SequenceMatcher
15
- from typing import Dict, List
16
-
17
- from app.core.config import (
18
- ARXIV_CATEGORY_MAP,
19
- KEYWORD_MAP,
20
- PUBMED_MESH_MAP,
21
- TOP_N_PER_CATEGORY,
22
- AI_FILTERS,
23
- )
24
- from app.core.models import Digest, Paper, UserProfile
25
- from app.core import database
26
- from app.fetcher import arxiv_client, pubmed_client, semantic_scholar
27
- from app.fetcher.http_session import FetchError, RetrySession
28
- from app.ranker import composite_ranker
29
- from app.summarizer.groq_client import GroqSummarizer
30
-
31
- logger = logging.getLogger(__name__)
32
-
33
-
34
- # ---------------------------------------------------------------------------
35
- # Public entry point
36
- # ---------------------------------------------------------------------------
37
-
38
- def run_weekly_fetch(
39
- db_path: str,
40
- profile: UserProfile | None = None,
41
- ) -> Digest:
42
- """
43
- Main weekly pipeline. Called by the scheduler every Sunday.
44
-
45
- 1. Fetch papers from arXiv (primary) with Semantic Scholar fallback.
46
- 2. For neuro/BCI categories, additionally fetch from PubMed and merge.
47
- 3. Enrich citation counts (best-effort).
48
- 4. Rank papers via composite ranker.
49
- 5. Save digest to DB and send notification.
50
- 6. Return the Digest.
51
- """
52
- if profile is None:
53
- profile = UserProfile()
54
-
55
- digest = Digest.create_new()
56
- session = RetrySession()
57
- all_papers: Dict[str, List[Paper]] = {}
58
-
59
- for category, arxiv_cats in ARXIV_CATEGORY_MAP.items():
60
- papers = _fetch_category(category, arxiv_cats, session, digest)
61
-
62
- # PubMed supplement for neuroscience & BCI
63
- if category in PUBMED_MESH_MAP:
64
- pubmed_papers = _fetch_pubmed(category, session, digest)
65
- papers = _deduplicate(papers + pubmed_papers)
66
-
67
- # Enforce AI filter for neuro categories
68
- # "I want only those papers in neuroscience and BCI which has in someway AI or ML"
69
- papers = _ai_filter(papers)
70
-
71
- all_papers[category] = papers
72
-
73
- # Enrich citation counts (best-effort)
74
- flat = [p for cat_list in all_papers.values() for p in cat_list]
75
- try:
76
- semantic_scholar.enrich_citations(flat, session)
77
- except Exception as exc:
78
- logger.warning('Citation enrichment failed: %s', exc)
79
- digest.fetch_errors.append(f'Citation enrichment: {exc}')
80
-
81
- # Rank
82
- digest.total_fetched = sum(len(v) for v in all_papers.values())
83
- ranked = composite_ranker.rank_all(all_papers, profile)
84
-
85
- # After ranking, summarize the top papers for the digest
86
- # (Only summarizes top N results that appear in the final ranked lists)
87
- _summarize_top_papers(ranked)
88
-
89
- digest.papers = ranked
90
- digest.total_ranked = sum(len(v) for v in ranked.values())
91
-
92
- # Persist
93
- try:
94
- database.save_digest(db_path, digest)
95
- except Exception as exc:
96
- logger.error('Failed to save digest: %s', exc)
97
- digest.fetch_errors.append(f'DB save error: {exc}')
98
-
99
- # Notification (best-effort)
100
- try:
101
- from app.core.notifier import send_digest_notification
102
- send_digest_notification(digest)
103
- except Exception as exc:
104
- logger.warning('Notification failed: %s', exc)
105
-
106
- return digest
107
-
108
-
109
- # ---------------------------------------------------------------------------
110
- # Internal helpers
111
- # ---------------------------------------------------------------------------
112
-
113
- def _fetch_category(
114
- category: str,
115
- arxiv_cats: list,
116
- session: RetrySession,
117
- digest: Digest,
118
- ) -> List[Paper]:
119
- """Fetch from arXiv, fall back to Semantic Scholar if empty / error."""
120
- papers: List[Paper] = []
121
-
122
- try:
123
- papers = arxiv_client.fetch_papers(category, arxiv_cats, session)
124
- except Exception as exc:
125
- msg = f'arXiv error [{category}]: {exc}'
126
- logger.warning(msg)
127
- digest.fetch_errors.append(msg)
128
-
129
- if not papers:
130
- logger.info('arXiv empty for [%s] — trying Semantic Scholar', category)
131
- try:
132
- keywords = KEYWORD_MAP.get(category, [category])
133
- papers = semantic_scholar.fetch_papers(category, keywords, session)
134
- except Exception as exc:
135
- msg = f'Semantic Scholar error [{category}]: {exc}'
136
- logger.warning(msg)
137
- digest.fetch_errors.append(msg)
138
-
139
- if not papers:
140
- logger.info('No papers found for [%s] from any source', category)
141
-
142
- return papers
143
-
144
-
145
- def _fetch_pubmed(
146
- category: str,
147
- session: RetrySession,
148
- digest: Digest,
149
- ) -> List[Paper]:
150
- """Fetch supplemental papers from PubMed."""
151
- mesh = PUBMED_MESH_MAP.get(category, '')
152
- if not mesh:
153
- return []
154
- try:
155
- return pubmed_client.fetch_papers(category, mesh, session)
156
- except Exception as exc:
157
- msg = f'PubMed error [{category}]: {exc}'
158
- logger.warning(msg)
159
- digest.fetch_errors.append(msg)
160
- return []
161
-
162
-
163
- def _summarize_top_papers(papers_by_cat: Dict[str, List[Paper]]):
164
- """Call Groq to summarize papers in the final digest list."""
165
- summarizer = GroqSummarizer()
166
- for cat, papers in papers_by_cat.items():
167
- if papers:
168
- logger.info("Summarizing %d papers for category [%s]...", len(papers), cat)
169
- summarizer.summarize_many(papers)
170
-
171
-
172
- def _ai_filter(papers: List[Paper]) -> List[Paper]:
173
- """Filter to only include papers mentioning AI/ML keywords in title or abstract."""
174
- if not papers:
175
- return []
176
-
177
- result = []
178
- for p in papers:
179
- text = (p.title + " " + p.abstract).lower()
180
- if any(f in text for f in AI_FILTERS):
181
- result.append(p)
182
- return result
183
-
184
-
185
- def _deduplicate(papers: List[Paper]) -> List[Paper]:
186
- """
187
- Remove duplicate papers.
188
-
189
- Two papers are considered duplicates if:
190
- - Their paper_id matches, OR
191
- - Their title similarity (SequenceMatcher ratio) > 0.92
192
-
193
- When merging, prefer arXiv > Semantic Scholar > PubMed.
194
- """
195
- SOURCE_PRIORITY = {'arxiv': 0, 'semantic_scholar': 1, 'pubmed': 2}
196
- seen_ids: set = set()
197
- seen_titles: List[str] = []
198
- result: List[Paper] = []
199
-
200
- # Sort by source priority so preferred sources come first
201
- papers.sort(key=lambda p: SOURCE_PRIORITY.get(p.source, 9))
202
-
203
- for paper in papers:
204
- if paper.paper_id in seen_ids:
205
- continue
206
-
207
- is_dup = False
208
- for existing_title in seen_titles:
209
- if SequenceMatcher(None, paper.title.lower(), existing_title).ratio() > 0.92:
210
- is_dup = True
211
- break
212
-
213
- if is_dup:
214
- continue
215
-
216
- seen_ids.add(paper.paper_id)
217
- seen_titles.append(paper.title.lower())
218
- result.append(paper)
219
-
220
- if len(papers) != len(result):
221
- logger.info(
222
- 'Deduplication: %d → %d papers', len(papers), len(result),
223
- )
224
-
225
- return result
 
 
 
 
 
 
 
 
1
+ """
2
+ ResearchRadar — Fetch pipeline orchestration.
3
+
4
+ Contains the main Sunday job logic. Coordinates all API clients,
5
+ handles fallback, deduplication, ranking, storage, and notification.
6
+
7
+ This function must **never raise** — all exceptions are caught and
8
+ logged into ``Digest.fetch_errors``.
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ import logging
14
+ from difflib import SequenceMatcher
15
+ from typing import Dict, List
16
+
17
+ from app.core.config import (
18
+ ARXIV_CATEGORY_MAP,
19
+ KEYWORD_MAP,
20
+ PUBMED_MESH_MAP,
21
+ TOP_N_PER_CATEGORY,
22
+ AI_FILTERS,
23
+ )
24
+ from app.core.models import Digest, Paper, UserProfile
25
+ from app.core import database
26
+ from app.fetcher import arxiv_client, pubmed_client, semantic_scholar, youtube_client
27
+ from app.fetcher.http_session import FetchError, RetrySession
28
+ from app.ranker import composite_ranker
29
+ from app.summarizer.groq_client import GroqSummarizer
30
+
31
+ logger = logging.getLogger(__name__)
32
+
33
+
34
+ # ---------------------------------------------------------------------------
35
+ # Public entry point
36
+ # ---------------------------------------------------------------------------
37
+
38
+ def run_weekly_fetch(
39
+ db_path: str,
40
+ profile: UserProfile | None = None,
41
+ ) -> Digest:
42
+ """
43
+ Main weekly pipeline. Called by the scheduler every Sunday.
44
+
45
+ 1. Fetch papers from arXiv (primary) with Semantic Scholar fallback.
46
+ 2. For neuro/BCI categories, additionally fetch from PubMed and merge.
47
+ 3. Enrich citation counts (best-effort).
48
+ 4. Rank papers via composite ranker.
49
+ 5. Save digest to DB and send notification.
50
+ 6. Return the Digest.
51
+ """
52
+ if profile is None:
53
+ profile = UserProfile()
54
+
55
+ digest = Digest.create_new()
56
+ session = RetrySession()
57
+ all_papers: Dict[str, List[Paper]] = {}
58
+
59
+ for category, arxiv_cats in ARXIV_CATEGORY_MAP.items():
60
+ papers = _fetch_category(category, arxiv_cats, session, digest)
61
+
62
+ # PubMed supplement for neuroscience & BCI
63
+ if category in PUBMED_MESH_MAP:
64
+ pubmed_papers = _fetch_pubmed(category, session, digest)
65
+ papers = _deduplicate(papers + pubmed_papers)
66
+
67
+ # Enforce AI filter for neuro categories
68
+ # "I want only those papers in neuroscience and BCI which has in someway AI or ML"
69
+ papers = _ai_filter(papers)
70
+
71
+ all_papers[category] = papers
72
+
73
+ # Enrich citation counts (best-effort)
74
+ flat = [p for cat_list in all_papers.values() for p in cat_list]
75
+ try:
76
+ semantic_scholar.enrich_citations(flat, session)
77
+ except Exception as exc:
78
+ logger.warning('Citation enrichment failed: %s', exc)
79
+ digest.fetch_errors.append(f'Citation enrichment: {exc}')
80
+
81
+ # Rank
82
+ digest.total_fetched = sum(len(v) for v in all_papers.values())
83
+ ranked = composite_ranker.rank_all(all_papers, profile)
84
+
85
+ # After ranking, summarize the top papers for the digest
86
+ # (Only summarizes top N results that appear in the final ranked lists)
87
+ _summarize_top_papers(ranked)
88
+
89
+ digest.papers = ranked
90
+ digest.total_ranked = sum(len(v) for v in ranked.values())
91
+
92
+ # Fetch YouTube videos for the "while eating" section
93
+ try:
94
+ digest.videos = youtube_client.fetch_latest_videos(limit_per_channel=1)
95
+ except Exception as exc:
96
+ logger.warning('YouTube fetch failed: %s', exc)
97
+ digest.fetch_errors.append(f'YouTube error: {exc}')
98
+
99
+ # Persist
100
+ try:
101
+ database.save_digest(db_path, digest)
102
+ except Exception as exc:
103
+ logger.error('Failed to save digest: %s', exc)
104
+ digest.fetch_errors.append(f'DB save error: {exc}')
105
+
106
+ # Notification (best-effort)
107
+ try:
108
+ from app.core.notifier import send_digest_notification
109
+ send_digest_notification(digest)
110
+ except Exception as exc:
111
+ logger.warning('Notification failed: %s', exc)
112
+
113
+ return digest
114
+
115
+
116
+ # ---------------------------------------------------------------------------
117
+ # Internal helpers
118
+ # ---------------------------------------------------------------------------
119
+
120
def _fetch_category(
    category: str,
    arxiv_cats: list,
    session: RetrySession,
    digest: Digest,
) -> List[Paper]:
    """Fetch from arXiv, fall back to Semantic Scholar if empty / error."""
    results: List[Paper] = []

    try:
        results = arxiv_client.fetch_papers(category, arxiv_cats, session)
    except Exception as exc:
        message = f'arXiv error [{category}]: {exc}'
        logger.warning(message)
        digest.fetch_errors.append(message)

    if results:
        return results

    # Primary source failed or returned nothing — try the fallback.
    logger.info('arXiv empty for [%s] — trying Semantic Scholar', category)
    try:
        fallback_keywords = KEYWORD_MAP.get(category, [category])
        results = semantic_scholar.fetch_papers(category, fallback_keywords, session)
    except Exception as exc:
        message = f'Semantic Scholar error [{category}]: {exc}'
        logger.warning(message)
        digest.fetch_errors.append(message)

    if not results:
        logger.info('No papers found for [%s] from any source', category)

    return results
150
+
151
+
152
def _fetch_pubmed(
    category: str,
    session: RetrySession,
    digest: Digest,
) -> List[Paper]:
    """Fetch supplemental papers from PubMed."""
    mesh_term = PUBMED_MESH_MAP.get(category, '')
    if not mesh_term:
        # No MeSH mapping configured for this category — nothing to query.
        return []

    try:
        return pubmed_client.fetch_papers(category, mesh_term, session)
    except Exception as exc:
        message = f'PubMed error [{category}]: {exc}'
        logger.warning(message)
        digest.fetch_errors.append(message)
        return []
168
+
169
+
170
def _summarize_top_papers(papers_by_cat: Dict[str, List[Paper]]):
    """Call Groq to summarize papers in the final digest list."""
    summarizer = GroqSummarizer()
    for category, category_papers in papers_by_cat.items():
        if not category_papers:
            continue
        logger.info(
            "Summarizing %d papers for category [%s]...",
            len(category_papers),
            category,
        )
        summarizer.summarize_many(category_papers)
177
+
178
+
179
+ def _ai_filter(papers: List[Paper]) -> List[Paper]:
180
+ """Filter to only include papers mentioning AI/ML keywords in title or abstract."""
181
+ if not papers:
182
+ return []
183
+
184
+ result = []
185
+ for p in papers:
186
+ text = (p.title + " " + p.abstract).lower()
187
+ if any(f in text for f in AI_FILTERS):
188
+ result.append(p)
189
+ return result
190
+
191
+
192
+ def _deduplicate(papers: List[Paper]) -> List[Paper]:
193
+ """
194
+ Remove duplicate papers.
195
+
196
+ Two papers are considered duplicates if:
197
+ - Their paper_id matches, OR
198
+ - Their title similarity (SequenceMatcher ratio) > 0.92
199
+
200
+ When merging, prefer arXiv > Semantic Scholar > PubMed.
201
+ """
202
+ SOURCE_PRIORITY = {'arxiv': 0, 'semantic_scholar': 1, 'pubmed': 2}
203
+ seen_ids: set = set()
204
+ seen_titles: List[str] = []
205
+ result: List[Paper] = []
206
+
207
+ # Sort by source priority so preferred sources come first
208
+ papers.sort(key=lambda p: SOURCE_PRIORITY.get(p.source, 9))
209
+
210
+ for paper in papers:
211
+ if paper.paper_id in seen_ids:
212
+ continue
213
+
214
+ is_dup = False
215
+ for existing_title in seen_titles:
216
+ if SequenceMatcher(None, paper.title.lower(), existing_title).ratio() > 0.92:
217
+ is_dup = True
218
+ break
219
+
220
+ if is_dup:
221
+ continue
222
+
223
+ seen_ids.add(paper.paper_id)
224
+ seen_titles.append(paper.title.lower())
225
+ result.append(paper)
226
+
227
+ if len(papers) != len(result):
228
+ logger.info(
229
+ 'Deduplication: %d → %d papers', len(papers), len(result),
230
+ )
231
+
232
+ return result