winbeau commited on
Commit
353b9f4
·
0 Parent(s):

Backend snapshot from 760fa78

Browse files
.env.example ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ DATABASE_URL=postgresql://USER:PASSWORD@HOST:PORT/DATABASE?sslmode=require
2
+
3
+ # Dify Workflow API Configuration
4
+ DIFY_API_KEY=your_dify_api_key_here
5
+ DIFY_API_BASE=http://82.157.209.193:8080/v1
6
+
7
+ # Legacy DeepSeek Configuration (deprecated, use Dify instead)
8
+ # DEEPSEEK_API_KEY=your_api_key_here
9
+ # DEEPSEEK_BASE_URL=https://api.deepseek.com
10
+ # DEEPSEEK_MODEL=deepseek-chat
.gitignore ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Python-generated files
2
+ __pycache__/
3
+ *.py[oc]
4
+ build/
5
+ dist/
6
+ wheels/
7
+ *.egg-info
8
+
9
+ # Virtual environments
10
+ .venv
11
+
12
+ # Environment variables
13
+ .env
.python-version ADDED
@@ -0,0 +1 @@
 
 
1
+ 3.12
Dockerfile ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
FROM python:3.10-slim
# NOTE(review): .python-version in this repo pins 3.12 while the image is 3.10 —
# confirm which interpreter version is actually intended.

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    UV_CACHE_DIR=/tmp/uv-cache \
    PATH="/root/.local/bin:/root/.cargo/bin:$PATH"

# Build/runtime deps: gcc + libpq-dev presumably for the PostgreSQL driver
# (DATABASE_URL is postgres); curl fetches the uv installer below.
RUN apt-get update \
    && apt-get install -y --no-install-recommends gcc libpq-dev curl \
    && rm -rf /var/lib/apt/lists/*

# Install uv (Python package/dependency manager) into /root/.local/bin.
RUN curl -LsSf https://astral.sh/uv/install.sh | sh

WORKDIR /app

# Copy only the dependency manifests first so the dependency layer is cached
# across source-code-only changes.
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev

COPY . .

# 7860 matches the port declared in the README Space metadata.
EXPOSE 7860

CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"]
README.md ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ title: Paper Insight API
3
+ emoji: "🎉"
4
+ colorFrom: "blue"
5
+ colorTo: "green"
6
+ sdk: "docker"
7
+ app_file: "Dockerfile"
8
+ pinned: false
9
+ ---
10
+
11
+ # Paper Insight API
12
+
13
+ FastAPI backend for the Paper Insight project.
app/__init__.py ADDED
File without changes
app/constants.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
# arXiv category options offered to the frontend via GET /constants.
# Each entry: code (arXiv taxonomy id), human-readable name, short description.
ARXIV_OPTIONS = [
    # Fix: "generated models" -> "generative models" (standard cs.CV terminology)
    {"code": "cs.CV", "name": "Computer Vision", "desc": "Image processing, generative models, segmentation"},
    {"code": "cs.CL", "name": "Computation and Language", "desc": "NLP, LLMs, Text mining"},
    {"code": "cs.LG", "name": "Machine Learning", "desc": "Deep learning architectures, optimization, algorithms"},
    {"code": "cs.AI", "name": "Artificial Intelligence", "desc": "General AI, reasoning, cognitive modeling"},
    {"code": "cs.RO", "name": "Robotics", "desc": "Kinematics, dynamics, sensors, control"},
    {"code": "cs.SD", "name": "Sound", "desc": "Audio processing, speech recognition"},
]
app/database.py ADDED
@@ -0,0 +1,130 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from typing import Generator
3
+ from sqlmodel import SQLModel, create_engine, Session
4
+ from sqlalchemy import inspect, text
5
+ from dotenv import load_dotenv
6
+
7
+ load_dotenv()
8
+
9
+ DATABASE_URL = os.getenv(
10
+ "DATABASE_URL",
11
+ "postgresql://postgres:postgres@localhost:5432/paper_insight"
12
+ )
13
+
14
+ engine = create_engine(DATABASE_URL, echo=False)
15
+
16
+
17
def create_db_and_tables() -> None:
    """Create all database tables registered on SQLModel.metadata.

    Idempotent: existing tables are left untouched (create_all only creates
    missing ones). Called once at application startup (see app.main.lifespan).
    """
    SQLModel.metadata.create_all(engine)
20
+
21
+
22
def ensure_appsettings_schema() -> None:
    """Ensure AppSettings has expected columns for legacy databases.

    A lightweight in-place migration: any column added after the original
    ``appsettings`` table shipped is created via ALTER TABLE, then NULL
    values are backfilled with the application's defaults so code never
    observes NULL in these columns.

    No-op when the table does not exist yet (fresh database, create_all
    builds the full schema).
    """
    inspector = inspect(engine)
    if "appsettings" not in inspector.get_table_names():
        return

    existing = {col["name"] for col in inspector.get_columns("appsettings")}

    # column name -> (SQL type for ALTER TABLE, SQL literal used to backfill NULLs)
    expected = {
        "research_focus": ("TEXT", "''"),
        "focus_keywords": ("JSON", "'[]'"),
        "system_prompt": ("TEXT", "''"),
        "arxiv_categories": ("JSON", "'[\"cs.CV\",\"cs.LG\"]'"),
    }

    missing = [name for name in expected if name not in existing]

    with engine.begin() as conn:
        for name in missing:
            col_type, _ = expected[name]
            conn.execute(text(f"ALTER TABLE appsettings ADD COLUMN {name} {col_type}"))

        # Backfill every expected column (pre-existing or freshly added) so
        # legacy rows get non-NULL defaults.
        for name, (_, default_literal) in expected.items():
            conn.execute(
                text(
                    f"UPDATE appsettings SET {name} = {default_literal} "
                    f"WHERE {name} IS NULL"
                )
            )
72
+
73
+
74
def ensure_paper_schema() -> None:
    """Ensure Paper has expected columns for legacy databases.

    Adds the ``processing_status`` column when missing and backfills it from
    ``is_processed`` / ``relevance_score`` so rows created before the column
    existed get a consistent status value.
    """
    inspector = inspect(engine)
    # Table name depends on how the schema was originally created; support both.
    table_name = None
    if "paper" in inspector.get_table_names():
        table_name = "paper"
    elif "papers" in inspector.get_table_names():
        table_name = "papers"

    if not table_name:
        return  # fresh database: create_all builds the full schema

    columns = {col["name"] for col in inspector.get_columns(table_name)}
    added = set()
    ddl_statements = []

    if "processing_status" not in columns:
        ddl_statements.append(
            f"ALTER TABLE {table_name} ADD COLUMN processing_status TEXT"
        )
        added.add("processing_status")

    # Columns present after DDL runs (pre-existing plus freshly added).
    final_columns = columns | added
    with engine.begin() as conn:
        for stmt in ddl_statements:
            conn.execute(text(stmt))

        if "processing_status" in final_columns:
            # Backfill: processed rows -> 'processed', everything else -> 'pending'.
            conn.execute(
                text(
                    f"UPDATE {table_name} "
                    "SET processing_status = CASE "
                    "WHEN is_processed THEN 'processed' ELSE 'pending' END "
                    "WHERE processing_status IS NULL"
                )
            )
            # Demote low-relevance processed rows to 'skipped' so the API's
            # default listing hides them (score threshold matches ArxivBot).
            conn.execute(
                text(
                    f"UPDATE {table_name} "
                    "SET processing_status = 'skipped' "
                    "WHERE is_processed = TRUE "
                    "AND relevance_score IS NOT NULL "
                    "AND relevance_score < 5 "
                    "AND processing_status = 'processed'"
                )
            )
120
+
121
+
122
def get_session() -> Generator[Session, None, None]:
    """FastAPI dependency yielding a database session.

    The session is always closed once the request handler finishes,
    even when the handler raises.
    """
    db_session = Session(engine)
    try:
        yield db_session
    finally:
        db_session.close()
126
+
127
+
128
def get_sync_session() -> Session:
    """Get a synchronous session for non-FastAPI contexts (e.g. background jobs).

    The caller owns the session and must close it (see run_daily_fetch_async).
    """
    return Session(engine)
app/main.py ADDED
@@ -0,0 +1,365 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from contextlib import asynccontextmanager
2
+ from typing import List, Optional
3
+ import re
4
+ import json
5
+ from fastapi import FastAPI, Depends, HTTPException, Query, BackgroundTasks
6
+ from fastapi.middleware.cors import CORSMiddleware
7
+ from fastapi.staticfiles import StaticFiles
8
+ from fastapi.responses import StreamingResponse
9
+ from pathlib import Path
10
+ from sqlmodel import Session, select
11
+ from sqlalchemy import or_
12
+
13
+ from app.database import create_db_and_tables, ensure_appsettings_schema, ensure_paper_schema, get_session
14
+ from app.models import Paper, PaperRead, AppSettings
15
+ from app.services.arxiv_bot import get_arxiv_bot, run_daily_fetch
16
+ from app.services.dify_client import (
17
+ get_dify_client,
18
+ DifyClientError,
19
+ DifyEntityTooLargeError,
20
+ DifyTimeoutError,
21
+ DifyRateLimitError,
22
+ )
23
+ from app.constants import ARXIV_OPTIONS
24
+
25
+
26
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: create tables and run legacy-schema migrations
    once at startup. No shutdown work is needed."""
    create_db_and_tables()
    ensure_appsettings_schema()
    ensure_paper_schema()
    yield
32
+
33
+
34
# Application object; lifespan handles DB setup at startup.
app = FastAPI(
    title="Paper Insight API",
    description="API for fetching and summarizing arXiv papers focused on Autoregressive DiT and KV Cache Compression",
    version="0.1.0",
    lifespan=lifespan,
)

# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm credentials are
# actually needed, otherwise drop one of the two.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files (e.g. generated thumbnails); the directory is created
# on first run so a fresh checkout does not 500.
static_path = Path(__file__).parent / "static"
static_path.mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory=static_path), name="static")
53
+
54
+
55
@app.get("/health")
def health_check():
    """Liveness probe: always reports healthy when the process is up."""
    payload = {"status": "healthy"}
    return payload
59
+
60
+
61
@app.get("/constants")
def get_constants():
    """Expose static application constants (arXiv category options) to the frontend."""
    response_body = {"arxiv_options": ARXIV_OPTIONS}
    return response_body
65
+
66
+
67
@app.get("/settings", response_model=AppSettings)
def get_settings(session: Session = Depends(get_session)):
    """Get application settings.

    Settings live in a singleton row (id=1); it is created lazily with model
    defaults the first time this endpoint is hit.
    """
    settings = session.get(AppSettings, 1)
    if not settings:
        settings = AppSettings(id=1)
        session.add(settings)
        session.commit()
        session.refresh(settings)
    return settings
77
+
78
+
79
def _parse_focus_keywords(raw_focus: str) -> List[str]:
    """Extract an ordered, de-duplicated keyword list from a research-focus string.

    Two input formats are supported:
    - semicolon-separated keywords, e.g. ``"kv cache; pruning"``;
    - a raw arXiv boolean query, e.g. ``'(ti:"kv cache" OR abs:pruning)'``,
      which is split on OR/AND and stripped of field prefixes (all:/abs:/ti:),
      surrounding parentheses and quotes.
    """
    raw_focus = raw_focus.strip()
    if ";" in raw_focus:
        keywords = [k.strip() for k in re.split(r"[;]+", raw_focus) if k.strip()]
    else:
        keywords = []
        for part in re.split(r"\bOR\b|\bAND\b", raw_focus, flags=re.IGNORECASE):
            cleaned = part.strip()
            if not cleaned:
                continue
            cleaned = re.sub(r"^[()]+|[()]+$", "", cleaned).strip()
            cleaned = re.sub(r"^(?:all|abs|ti):", "", cleaned, flags=re.IGNORECASE).strip()
            cleaned = cleaned.strip('"').strip()
            if cleaned:
                keywords.append(cleaned)

    # De-duplicate while preserving first-seen order (applies to BOTH input
    # formats; previously only the boolean-query branch was de-duplicated).
    seen = set()
    deduped = []
    for keyword in keywords:
        if keyword not in seen:
            deduped.append(keyword)
            seen.add(keyword)
    return deduped


@app.put("/settings", response_model=AppSettings)
def update_settings(new_settings: AppSettings, session: Session = Depends(get_session)):
    """Update the singleton application settings row (id=1).

    ``focus_keywords`` is always derived from ``research_focus`` on the
    server rather than taken from the request body, so the two fields stay
    consistent.
    """
    settings = session.get(AppSettings, 1)
    if not settings:
        settings = AppSettings(id=1)
        session.add(settings)

    settings.research_focus = new_settings.research_focus
    settings.system_prompt = new_settings.system_prompt
    settings.arxiv_categories = new_settings.arxiv_categories

    if new_settings.research_focus:
        settings.focus_keywords = _parse_focus_keywords(new_settings.research_focus)
    else:
        settings.focus_keywords = []

    session.add(settings)
    session.commit()
    session.refresh(settings)
    return settings
128
+
129
+
130
@app.get("/papers", response_model=List[PaperRead])
def get_papers(
    session: Session = Depends(get_session),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    min_score: Optional[float] = Query(None, ge=0, le=10),
    processed_only: bool = Query(False),
):
    """Get papers with optional filtering.

    Papers with processing_status 'skipped' (low relevance) are always hidden;
    ``processed_only`` and ``min_score`` narrow the result further. Ordering is
    processed-first, then by relevance score (NULLs last), then most recently
    published; ``skip``/``limit`` paginate.
    """
    # NULL status counts as visible so rows created before the status column
    # existed are not hidden.
    query = select(Paper).where(
        or_(Paper.processing_status.is_(None), Paper.processing_status != "skipped")
    )

    if processed_only:
        query = query.where(Paper.is_processed == True)

    if min_score is not None:
        query = query.where(Paper.relevance_score >= min_score)

    query = query.order_by(
        Paper.is_processed.desc(),
        Paper.relevance_score.desc().nulls_last(),
        Paper.published.desc()
    ).offset(skip).limit(limit)
    papers = session.exec(query).all()
    return papers
156
+
157
+
158
@app.get("/papers/{paper_id}", response_model=PaperRead)
def get_paper(paper_id: int, session: Session = Depends(get_session)):
    """Return a single paper by primary key; 404 when it does not exist."""
    found = session.get(Paper, paper_id)
    if found is None:
        raise HTTPException(status_code=404, detail="Paper not found")
    return found
165
+
166
+
167
@app.get("/papers/arxiv/{arxiv_id}", response_model=PaperRead)
def get_paper_by_arxiv_id(arxiv_id: str, session: Session = Depends(get_session)):
    """Return a single paper by its arXiv identifier; 404 when unknown."""
    lookup = select(Paper).where(Paper.arxiv_id == arxiv_id)
    match = session.exec(lookup).first()
    if match is None:
        raise HTTPException(status_code=404, detail="Paper not found")
    return match
174
+
175
+
176
@app.post("/papers/fetch")
def fetch_papers(
    background_tasks: BackgroundTasks,
    session: Session = Depends(get_session),
):
    """Trigger paper fetching in the background.

    Returns immediately; run_daily_fetch does the actual fetch + LLM
    processing in a FastAPI background task after the response is sent.
    """
    # NOTE(review): the `session` dependency is unused in this handler —
    # verify nothing relies on it before removing.
    background_tasks.add_task(run_daily_fetch)
    return {"message": "Paper fetch started in background"}
184
+
185
+
186
@app.post("/papers/{paper_id}/process")
async def process_paper(paper_id: int, session: Session = Depends(get_session)):
    """Process a specific paper with LLM analysis.

    Idempotent: already-processed papers return immediately without rework.
    Raises 404 for unknown ids and 500 when the bot reports failure.
    """
    paper = session.get(Paper, paper_id)
    if not paper:
        raise HTTPException(status_code=404, detail="Paper not found")

    if paper.is_processed:
        return {"message": "Paper already processed", "paper_id": paper_id}

    bot = get_arxiv_bot()
    success = await bot.process_paper(session, paper)

    if success:
        return {"message": "Paper processed successfully", "paper_id": paper_id}
    else:
        raise HTTPException(status_code=500, detail="Failed to process paper")
203
+
204
+
205
# NOTE(review): this endpoint mutates state (writes analysis to the DB) but is
# exposed as GET — presumably so the browser EventSource API can be used;
# confirm that is intentional.
@app.get("/papers/{paper_id}/process/stream")
async def process_paper_stream(paper_id: int, session: Session = Depends(get_session)):
    """
    Process a paper with streaming response for real-time updates.

    Returns Server-Sent Events (SSE) with the following event types:
    - thinking: R1 reasoning process (thought field)
    - answer: Partial answer content
    - progress: Processing progress updates
    - result: Final structured analysis result
    - error: Error information
    - done: Stream completion signal
    """
    paper = session.get(Paper, paper_id)
    if not paper:
        raise HTTPException(status_code=404, detail="Paper not found")

    async def generate_events():
        """Generate SSE events for paper analysis."""
        try:
            # Mark as in-flight before streaming so the UI reflects progress.
            paper.processing_status = "processing"
            session.add(paper)
            session.commit()

            # Send initial progress event
            yield f"event: progress\ndata: {json.dumps({'status': 'started', 'message': '开始分析论文...'})}\n\n"

            dify_client = get_dify_client()
            # Accumulated across the stream: R1 reasoning and answer chunks.
            thought_parts = []
            answer_parts = []
            final_outputs = None

            async for event in dify_client.analyze_paper_stream(
                paper.title,
                paper.abstract,
                user_id=f"paper-{paper_id}",
            ):
                # Handle thought (R1 thinking process)
                if event.thought:
                    thought_parts.append(event.thought)
                    yield f"event: thinking\ndata: {json.dumps({'thought': event.thought})}\n\n"

                # Handle answer chunks
                if event.answer:
                    answer_parts.append(event.answer)
                    yield f"event: answer\ndata: {json.dumps({'answer': event.answer})}\n\n"

                # Handle workflow events
                if event.event == "workflow_started":
                    yield f"event: progress\ndata: {json.dumps({'status': 'workflow_started', 'message': 'Dify工作流已启动'})}\n\n"
                elif event.event == "node_started":
                    node_title = event.data.get("data", {}).get("title", "")
                    if node_title:
                        yield f"event: progress\ndata: {json.dumps({'status': 'node_started', 'message': f'执行节点: {node_title}'})}\n\n"
                elif event.event == "workflow_finished":
                    if event.outputs:
                        final_outputs = event.outputs

            # Process final result: prefer structured workflow outputs, fall
            # back to parsing the concatenated answer text.
            if final_outputs:
                result = dify_client._parse_outputs(final_outputs, "".join(thought_parts))
            elif answer_parts:
                result = dify_client._parse_answer("".join(answer_parts), "".join(thought_parts))
            else:
                raise DifyClientError("No output received from Dify workflow")

            # Convert to LLMAnalysis for database storage
            analysis = dify_client.to_llm_analysis(result)

            # Update paper with results
            from datetime import datetime
            paper.summary_zh = analysis.summary_zh
            paper.relevance_score = analysis.relevance_score
            paper.relevance_reason = analysis.relevance_reason
            paper.heuristic_idea = analysis.heuristic_idea
            paper.is_processed = True
            paper.processed_at = datetime.utcnow()

            # Score threshold 5 matches ArxivBot.process_paper and the
            # ensure_paper_schema backfill.
            if analysis.relevance_score >= 5:
                paper.processing_status = "processed"
            else:
                paper.processing_status = "skipped"

            session.add(paper)
            session.commit()

            # Send final result
            result_data = {
                "summary_zh": result.summary_zh,
                "relevance_score": result.relevance_score,
                "relevance_reason": result.relevance_reason,
                "technical_mapping": {
                    "token_vs_patch": result.technical_mapping.token_vs_patch,
                    "temporal_logic": result.technical_mapping.temporal_logic,
                    "frequency_domain": result.technical_mapping.frequency_domain,
                },
                "heuristic_idea": result.heuristic_idea,
                "thought_process": result.thought_process,
            }
            yield f"event: result\ndata: {json.dumps(result_data, ensure_ascii=False)}\n\n"
            yield f"event: done\ndata: {json.dumps({'status': 'completed'})}\n\n"

        # Each Dify failure mode maps to its own SSE error code so the
        # frontend can react specifically; all mark the paper 'failed'.
        except DifyEntityTooLargeError as e:
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()
            yield f"event: error\ndata: {json.dumps({'error': 'entity_too_large', 'message': str(e)})}\n\n"

        except DifyTimeoutError as e:
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()
            yield f"event: error\ndata: {json.dumps({'error': 'timeout', 'message': str(e)})}\n\n"

        except DifyRateLimitError as e:
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()
            yield f"event: error\ndata: {json.dumps({'error': 'rate_limit', 'message': str(e)})}\n\n"

        except DifyClientError as e:
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()
            yield f"event: error\ndata: {json.dumps({'error': 'dify_error', 'message': str(e)})}\n\n"

        except Exception as e:
            # Last-resort handler: errors are reported over the stream since
            # HTTP status can no longer change mid-response.
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()
            yield f"event: error\ndata: {json.dumps({'error': 'unknown', 'message': str(e)})}\n\n"

    return StreamingResponse(
        generate_events(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
347
+
348
+
349
@app.get("/stats")
def get_stats(session: Session = Depends(get_session)):
    """Summary counters over all non-skipped papers."""
    # Same visibility rule as GET /papers: hide 'skipped', keep NULL status.
    visible = session.exec(
        select(Paper).where(
            or_(Paper.processing_status.is_(None), Paper.processing_status != "skipped")
        )
    ).all()

    processed_count = 0
    high_relevance_count = 0
    for row in visible:
        if row.is_processed:
            processed_count += 1
            if row.relevance_score and row.relevance_score >= 9:
                high_relevance_count += 1

    return {
        "total_papers": len(visible),
        "processed_papers": processed_count,
        "high_relevance_papers": high_relevance_count,
        "pending_processing": len(visible) - processed_count,
    }
app/models.py ADDED
@@ -0,0 +1,82 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime
2
+ from typing import Optional, List
3
+ from sqlmodel import SQLModel, Field, Column, Text
4
+ from sqlalchemy import JSON
5
+ from pydantic import BaseModel
6
+
7
+
8
class Paper(SQLModel, table=True):
    """Paper model for storing arXiv papers with AI analysis."""
    # Core arXiv metadata
    id: Optional[int] = Field(default=None, primary_key=True)
    arxiv_id: str = Field(unique=True, index=True)
    title: str
    authors: str  # comma-separated author names
    abstract: str = Field(sa_column=Column(Text))
    categories: str  # comma-separated arXiv category codes
    published: datetime  # stored as naive UTC (see ArxivBot.fetch_recent_papers)
    updated: datetime  # stored as naive UTC
    pdf_url: str
    thumbnail_url: Optional[str] = None  # set after thumbnail generation

    # AI-generated analysis fields (populated when processed)
    summary_zh: Optional[str] = Field(default=None, sa_column=Column(Text))
    relevance_score: Optional[float] = Field(default=None, ge=0, le=10)
    relevance_reason: Optional[str] = Field(default=None, sa_column=Column(Text))
    heuristic_idea: Optional[str] = Field(default=None, sa_column=Column(Text))

    # Metadata
    is_processed: bool = Field(default=False)
    # One of: 'pending' / 'processing' / 'processed' / 'skipped' / 'failed'
    processing_status: str = Field(default="pending", index=True)
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12; consider
    # a timezone-aware factory when the runtime is upgraded.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    processed_at: Optional[datetime] = None
32
+
33
+
34
class PaperCreate(SQLModel):
    """Schema for creating a new paper (fields as fetched from arXiv,
    before any AI analysis)."""
    arxiv_id: str
    title: str
    authors: str  # comma-separated author names
    abstract: str
    categories: str  # comma-separated arXiv category codes
    published: datetime
    updated: datetime
    pdf_url: str
44
+
45
+
46
class PaperRead(SQLModel):
    """Schema for reading paper data (API response model).

    Optional analysis fields default to None so the schema accepts papers
    that have not been processed yet — consistent with thumbnail_url, which
    already declared a default (Optional annotations without defaults are
    treated as required fields by Pydantic v2).
    """
    id: int
    arxiv_id: str
    title: str
    authors: str
    abstract: str
    categories: str
    published: datetime
    updated: datetime
    pdf_url: str
    thumbnail_url: Optional[str] = None
    summary_zh: Optional[str] = None
    relevance_score: Optional[float] = None
    relevance_reason: Optional[str] = None
    heuristic_idea: Optional[str] = None
    is_processed: bool
    processing_status: str
    created_at: datetime
    processed_at: Optional[datetime] = None
66
+
67
+
68
class LLMAnalysis(BaseModel):
    """Schema for LLM analysis response (the subset of analysis persisted
    onto Paper)."""
    summary_zh: str  # Chinese summary
    relevance_score: float  # 0-10; >= 5 counts as relevant (see ArxivBot)
    relevance_reason: str
    heuristic_idea: str
74
+
75
+
76
class AppSettings(SQLModel, table=True):
    """Application settings stored in DB as a singleton row (id=1).

    Defaults are declared on the Field (Python side) rather than on the
    Column: a ``Column(..., default=...)`` only applies at SQL INSERT time
    and leaves newly constructed instances without a value, and the
    previous ``default=[]`` was a shared mutable default.
    """
    id: int = Field(default=1, primary_key=True)
    # Free-form research focus string; keywords below are derived from it.
    research_focus: str = Field(default="", sa_column=Column(Text))
    # default_factory avoids sharing one list instance across objects.
    focus_keywords: List[str] = Field(default_factory=list, sa_column=Column(JSON))
    system_prompt: str = Field(default="", sa_column=Column(Text))
    arxiv_categories: List[str] = Field(
        default_factory=lambda: ["cs.CV", "cs.LG"], sa_column=Column(JSON)
    )
app/services/__init__.py ADDED
File without changes
app/services/arxiv_bot.py ADDED
@@ -0,0 +1,275 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import os
3
+ import arxiv
4
+ from datetime import datetime, timedelta, timezone
5
+ from typing import List, Optional
6
+ from sqlmodel import Session, select
7
+
8
+ from app.models import Paper, PaperCreate, AppSettings
9
+ from app.database import get_sync_session
10
+ from app.services.pdf_renderer import generate_thumbnail
11
+
12
+
13
def _get_analysis_client():
    """
    Select the analysis backend from the environment.

    Returns a ``(client, kind)`` pair: kind is "dify" when DIFY_API_KEY is
    configured, otherwise "deepseek" (legacy LLMBrain fallback). Imports are
    local so only the selected backend's module is loaded.
    """
    if not os.getenv("DIFY_API_KEY"):
        from app.services.llm_brain import get_llm_brain
        return get_llm_brain(), "deepseek"

    from app.services.dify_client import get_dify_client
    return get_dify_client(), "dify"
24
+
25
class ArxivBot:
    """Bot for fetching and processing arXiv papers.

    fetch_recent_papers/save_paper are synchronous; process_paper is async
    so it can await the Dify client and thumbnail generation.
    """

    def __init__(self):
        # Shared arXiv API client; delay_seconds throttles requests between
        # pages, with automatic retries on transient failures.
        self.client = arxiv.Client(
            page_size=50,
            delay_seconds=3,
            num_retries=3
        )

    def build_query(self, session: Session) -> str:
        """Builds a targeted arXiv query using AppSettings or defaults.

        Final shape: ``(cat:... OR cat:...) AND (<focus terms>)``.
        """
        settings = session.get(AppSettings, 1)

        # Defaults
        default_categories = ['cs.CV', 'cs.LG', 'cs.CL']
        default_focus = (
            '((ti:transformer OR abs:transformer OR ti:diffusion OR abs:diffusion OR ti:DiT OR abs:DiT) AND '
            '(ti:"kv cache" OR abs:"kv cache" OR ti:compression OR abs:compression OR ti:pruning OR abs:pruning OR '
            'ti:quantization OR abs:quantization OR ti:sparse OR abs:sparse OR ti:"token merging" OR abs:"token merging" OR '
            'ti:distillation OR abs:distillation OR ti:efficiency OR abs:efficiency))'
        )
        categories = default_categories
        focus_query = default_focus

        if settings:
            if settings.arxiv_categories:
                categories = settings.arxiv_categories

            # Use focus_keywords if available, otherwise fallback to research_focus string or default
            if settings.focus_keywords and (not settings.research_focus or ";" in settings.research_focus):
                # Construct OR logic for keywords: (all:k1) OR (all:"k 2")
                keywords_parts = []
                for k in settings.focus_keywords:
                    # Wrap in quotes if it contains spaces and isn't already quoted
                    if " " in k and not (k.startswith('"') and k.endswith('"')):
                        term = f'"{k}"'
                    else:
                        term = k
                    keywords_parts.append(f'(all:{term})')

                if keywords_parts:
                    focus_query = f"({' OR '.join(keywords_parts)})"

            elif settings.research_focus and settings.research_focus.strip():
                # Fallback to the raw string if keywords list is empty but string exists (backward compatibility)
                focus_query = f"({settings.research_focus})"

        # 1. Categories
        cat_query = "(" + " OR ".join([f"cat:{c}" for c in categories]) + ")"

        # 2. Combine
        final_query = f"{cat_query} AND {focus_query}"

        return final_query

    def fetch_recent_papers(
        self,
        session: Session,
        max_results: int = 50,
        hours_back: int = 168,  # 7 days to catch weekly arXiv updates
    ) -> List[PaperCreate]:
        """Fetch recent targeted papers from arXiv.

        Pure fetch: returns PaperCreate objects and writes nothing to the
        database (see save_paper).
        """
        query = self.build_query(session)
        print(f"Executing Arxiv Query: {query}")

        search = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=arxiv.SortCriterion.SubmittedDate,
            sort_order=arxiv.SortOrder.Descending,
        )

        # Use UTC aware time for comparison
        cutoff_date = datetime.now(timezone.utc) - timedelta(hours=hours_back)
        papers = []

        for result in self.client.results(search):
            # Arxiv dates are timezone aware (UTC)
            if result.published < cutoff_date:
                # Since results are sorted descending, we can stop early
                break

            paper = PaperCreate(
                arxiv_id=result.entry_id.split("/")[-1],
                title=result.title.replace("\n", " ").strip(),
                authors=", ".join([author.name for author in result.authors]),
                abstract=result.summary.replace("\n", " ").strip(),
                categories=", ".join(result.categories),
                # Convert to naive UTC for DB storage
                published=result.published.astimezone(timezone.utc).replace(tzinfo=None),
                updated=result.updated.astimezone(timezone.utc).replace(tzinfo=None),
                pdf_url=result.pdf_url,
            )
            papers.append(paper)

        return papers

    def save_paper(self, session: Session, paper_data: PaperCreate) -> Optional[Paper]:
        """Save a paper to database if not exists.

        Returns the new Paper, or None when the arxiv_id is already stored.
        """
        existing = session.exec(
            select(Paper).where(Paper.arxiv_id == paper_data.arxiv_id)
        ).first()

        if existing:
            return None

        paper = Paper(**paper_data.model_dump())
        session.add(paper)
        session.commit()
        session.refresh(paper)
        return paper

    async def process_paper(self, session: Session, paper: Paper) -> bool:
        """Process a paper with LLM analysis and thumbnail generation.

        Returns True on success; False when the paper is already processed,
        when the analysis came back empty, or when any step raised.
        """
        if paper.is_processed:
            return False

        try:
            # Mark as in-flight so the UI / concurrent runs can see progress.
            paper.processing_status = "processing"
            session.add(paper)
            session.commit()
            session.refresh(paper)

            # Get the appropriate analysis client
            client, client_type = _get_analysis_client()
            settings = session.get(AppSettings, 1)
            system_prompt_override = settings.system_prompt if settings else None

            # Execute analysis based on client type
            loop = asyncio.get_running_loop()

            if client_type == "dify":
                # Use Dify client (async, non-streaming for batch processing)
                result = await client.analyze_paper(
                    paper.title,
                    paper.abstract,
                    user_id=f"batch-paper-{paper.id}",
                )
                if result:
                    analysis = client.to_llm_analysis(result)
                else:
                    analysis = None
            else:
                # Use legacy DeepSeek client (sync, run in executor)
                analysis = await loop.run_in_executor(
                    None,
                    client.analyze_paper,
                    paper.title,
                    paper.abstract,
                    system_prompt_override,
                )

            thumbnail_url = await generate_thumbnail(paper.arxiv_id, paper.pdf_url)

            # Update thumbnail regardless of relevance (visuals are good)
            if thumbnail_url:
                paper.thumbnail_url = thumbnail_url

            if analysis:
                paper.summary_zh = analysis.summary_zh
                paper.relevance_score = analysis.relevance_score
                paper.relevance_reason = analysis.relevance_reason
                paper.heuristic_idea = analysis.heuristic_idea
                paper.is_processed = True
                # NOTE(review): datetime.utcnow() is deprecated since Python 3.12.
                paper.processed_at = datetime.utcnow()

                # NOTE(review): the >=9 and >=5 branches assign the same status;
                # only the <5 'skipped' case actually differs.
                if analysis.relevance_score >= 9:
                    paper.processing_status = "processed"
                elif analysis.relevance_score >= 5:
                    paper.processing_status = "processed"
                else:
                    paper.processing_status = "skipped"

                session.add(paper)
                session.commit()
                return True
            # Analysis returned nothing: record the failure and fall through
            # to the final `return False`.
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()
            session.refresh(paper)

        except Exception as e:
            print(f"Error processing paper {paper.arxiv_id}: {e}")
            paper.processing_status = "failed"
            session.add(paper)
            session.commit()

        return False
214
+
215
+
216
async def run_daily_fetch_async():
    """Async wrapper for daily fetch logic.

    One full cycle: fetch recent papers from arXiv, persist the new ones,
    then run LLM analysis on everything still unprocessed. Errors are
    logged and swallowed so a scheduled run never crashes the caller.
    """
    print(f"[{datetime.now()}] Starting daily paper fetch...")

    bot = ArxivBot()
    session = get_sync_session()

    try:
        # Fetch new papers (Sync)
        # Using run_in_executor to avoid blocking the event loop if this takes time
        # We pass the session to fetch_recent_papers
        loop = asyncio.get_running_loop()
        papers = await loop.run_in_executor(None, bot.fetch_recent_papers, session, 50, 168)
        print(f"Fetched {len(papers)} papers from arXiv")

        # Save to database (Sync)
        saved_count = 0
        for paper_data in papers:
            paper = bot.save_paper(session, paper_data)
            if paper:
                saved_count += 1
        print(f"Saved {saved_count} new papers to database")

        # Process unprocessed papers (Async) — includes papers left over from
        # earlier runs, not just the ones saved above.
        unprocessed = session.exec(
            select(Paper).where(Paper.is_processed == False)
        ).all()
        print(f"Processing {len(unprocessed)} unprocessed papers...")

        processed_count = 0
        for paper in unprocessed:
            # Await the async process
            if await bot.process_paper(session, paper):
                processed_count += 1
                print(f"  Processed: {paper.title[:50]}...")

        print(f"Processed {processed_count} papers with LLM analysis")

    except Exception as e:
        print(f"Error in daily fetch: {e}")
    finally:
        session.close()

    print(f"[{datetime.now()}] Daily fetch completed")
260
+
261
def run_daily_fetch():
    """Run one fetch-and-process cycle synchronously.

    Despite the old comment, this does not loop: it runs a single cycle via
    asyncio.run. Intended to be called from a thread without a running event
    loop (e.g. a FastAPI BackgroundTasks worker or an external scheduler).
    """
    asyncio.run(run_daily_fetch_async())
264
+
265
+
266
+ # Singleton instance
267
+ _arxiv_bot: Optional[ArxivBot] = None
268
+
269
+
270
+ def get_arxiv_bot() -> ArxivBot:
271
+ """Get or create ArxivBot singleton."""
272
+ global _arxiv_bot
273
+ if _arxiv_bot is None:
274
+ _arxiv_bot = ArxivBot()
275
+ return _arxiv_bot
app/services/dify_client.py ADDED
@@ -0,0 +1,341 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Dify Workflow API Client with Streaming Support.
3
+
4
+ This module replaces the DeepSeek direct API calls with Dify Chatflow API,
5
+ supporting streaming responses for long-running R1 reasoning processes.
6
+ """
7
+
8
+ import os
9
+ import json
10
+ import httpx
11
+ from typing import Optional, AsyncGenerator, Dict, Any
12
+ from dataclasses import dataclass
13
+ from dotenv import load_dotenv
14
+
15
+ from app.models import LLMAnalysis
16
+
17
+ load_dotenv()
18
+
19
+
20
@dataclass
class DifyStreamEvent:
    """Represents a single event from Dify's streaming response."""
    # SSE event name (e.g. "message", "workflow_finished"); may be "".
    event: str
    # Full decoded JSON payload of the SSE "data:" field.
    data: Dict[str, Any]
    # Convenience copies of well-known payload keys; None when absent.
    thought: Optional[str] = None
    answer: Optional[str] = None
    outputs: Optional[Dict[str, Any]] = None
28
+
29
+
30
@dataclass
class TechnicalMapping:
    """Technical mapping analysis from Dify workflow."""
    # Free-text analysis fields; each defaults to "" when the workflow
    # output does not include the corresponding key.
    token_vs_patch: str = ""
    temporal_logic: str = ""
    frequency_domain: str = ""
36
+
37
+
38
@dataclass
class DifyAnalysisResult:
    """Complete analysis result from Dify workflow."""
    # Chinese one-sentence summary of the paper's contribution.
    summary_zh: str
    # Relevance rating parsed as float; defaults to 0 when the workflow
    # omits the field.
    relevance_score: float
    # Justification text for the relevance score.
    relevance_reason: str
    # Structured token/temporal/frequency mapping analysis.
    technical_mapping: TechnicalMapping
    # Suggested transferable research idea text.
    heuristic_idea: str
    thought_process: Optional[str] = None  # R1 thinking process
47
+
48
+
49
class DifyClientError(Exception):
    """Base class for all errors raised by the Dify client."""
52
+
53
+
54
class DifyEntityTooLargeError(DifyClientError):
    """HTTP 413 from Dify: the request payload exceeded the size limit."""
57
+
58
+
59
class DifyTimeoutError(DifyClientError):
    """The Dify request exceeded the configured timeout."""
62
+
63
+
64
class DifyRateLimitError(DifyClientError):
    """HTTP 429 from Dify: rate limit exceeded, retry later."""
67
+
68
+
69
class DifyClient:
    """Dify Chatflow API client with streaming support."""

    def __init__(self):
        """Read credentials and endpoint configuration from the environment.

        Raises:
            ValueError: if DIFY_API_KEY is not set.
        """
        self.api_key = os.getenv("DIFY_API_KEY")
        if not self.api_key:
            raise ValueError("DIFY_API_KEY environment variable is not set")

        self.base_url = os.getenv("DIFY_API_BASE", "http://82.157.209.193:8080/v1")
        self.endpoint = f"{self.base_url}/chat-messages"
        self.timeout = httpx.Timeout(120.0, connect=10.0)  # 2 min for R1 reasoning

    def _format_query(
        self,
        topic: str,
        background: str,
        method: str,
        contribution: str,
    ) -> str:
        """Format input according to Dify workflow variable specification."""
        return f"""研究主题:{topic}
技术背景:{background}
核心方法:{method}
预期贡献:{contribution}"""

    def _build_request_body(
        self,
        query: str,
        user_id: str = "paper-insight-user",
        conversation_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Build the request body for Dify API."""
        body = {
            "inputs": {
                "query": query,
            },
            "query": query,  # Also send as direct query for compatibility
            "response_mode": "streaming",
            "user": user_id,
        }

        if conversation_id:
            body["conversation_id"] = conversation_id

        return body

    def _get_headers(self) -> Dict[str, str]:
        """Get request headers with authentication."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    async def analyze_paper_stream(
        self,
        title: str,
        abstract: str,
        user_id: str = "paper-insight-user",
    ) -> AsyncGenerator[DifyStreamEvent, None]:
        """
        Analyze a paper using Dify workflow with streaming.

        Yields DifyStreamEvent objects for each SSE event received.

        Raises:
            DifyEntityTooLargeError: HTTP 413 (payload too large).
            DifyRateLimitError: HTTP 429.
            DifyTimeoutError: request exceeded self.timeout.
            DifyClientError: any other HTTP/transport failure.
        """
        # Format the query using paper information; abstract capped at 500
        # chars to keep the payload small.
        query = self._format_query(
            topic=title,
            background="arXiv论文,需要分析其与DiT/KV Cache研究的相关性",
            method=abstract[:500] if len(abstract) > 500 else abstract,
            contribution="待分析",
        )

        body = self._build_request_body(query, user_id)
        headers = self._get_headers()

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            try:
                async with client.stream(
                    "POST",
                    self.endpoint,
                    json=body,
                    headers=headers,
                ) as response:
                    # Handle error responses before consuming the stream.
                    if response.status_code == 413:
                        raise DifyEntityTooLargeError(
                            "Request payload too large. Consider shortening the abstract."
                        )
                    elif response.status_code == 429:
                        raise DifyRateLimitError(
                            "Rate limit exceeded. Please try again later."
                        )
                    elif response.status_code >= 400:
                        error_text = await response.aread()
                        raise DifyClientError(
                            f"Dify API error {response.status_code}: {error_text.decode()}"
                        )

                    # Parse SSE stream: events are delimited by a blank line.
                    buffer = ""
                    async for chunk in response.aiter_text():
                        buffer += chunk
                        # Process complete SSE events
                        while "\n\n" in buffer:
                            event_str, buffer = buffer.split("\n\n", 1)
                            event = self._parse_sse_event(event_str)
                            if event:
                                yield event

                    # BUGFIX: flush a trailing event that was not terminated
                    # by a blank line before the stream closed; previously it
                    # was silently dropped.
                    if buffer.strip():
                        event = self._parse_sse_event(buffer)
                        if event:
                            yield event

            except httpx.TimeoutException as e:
                raise DifyTimeoutError(f"Request timed out: {e}")
            except httpx.RequestError as e:
                raise DifyClientError(f"Request failed: {e}")

    def _parse_sse_event(self, event_str: str) -> Optional[DifyStreamEvent]:
        """Parse a single SSE event string into DifyStreamEvent.

        Returns None when the event carries no data or the data is not
        valid JSON.
        """
        lines = event_str.strip().split("\n")
        event_type = ""
        data_str = ""

        for line in lines:
            if line.startswith("event:"):
                event_type = line[6:].strip()
            elif line.startswith("data:"):
                data_str = line[5:].strip()

        if not data_str:
            return None

        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            return None

        # Fall back to the payload's own "event" key when the SSE
        # "event:" field is absent.
        event = DifyStreamEvent(
            event=event_type or data.get("event", ""),
            data=data,
        )

        # Extract common fields for convenient access.
        if "thought" in data:
            event.thought = data["thought"]
        if "answer" in data:
            event.answer = data["answer"]
        if "outputs" in data:
            event.outputs = data["outputs"]

        return event

    async def analyze_paper(
        self,
        title: str,
        abstract: str,
        user_id: str = "paper-insight-user",
    ) -> Optional[DifyAnalysisResult]:
        """
        Analyze a paper and return the complete result.

        This method consumes the entire stream and returns the final result.
        Use analyze_paper_stream() for real-time streaming updates.
        Returns None when the stream produced neither outputs nor an answer,
        or when a DifyClientError occurred.
        """
        thought_parts = []
        answer_parts = []
        final_outputs = None

        try:
            async for event in self.analyze_paper_stream(title, abstract, user_id):
                if event.thought:
                    thought_parts.append(event.thought)
                if event.answer:
                    answer_parts.append(event.answer)
                if event.outputs:
                    final_outputs = event.outputs

                # Prefer outputs attached to the workflow completion event.
                if event.event == "workflow_finished" and event.outputs:
                    final_outputs = event.outputs

            # Parse the final outputs when available.
            if final_outputs:
                return self._parse_outputs(final_outputs, "".join(thought_parts))

            # Fall back to parsing the accumulated answer text.
            full_answer = "".join(answer_parts)
            if full_answer:
                return self._parse_answer(full_answer, "".join(thought_parts))

            return None

        except DifyClientError as e:
            print(f"Dify analysis error: {e}")
            return None

    def _parse_outputs(
        self,
        outputs: Dict[str, Any],
        thought_process: str = "",
    ) -> DifyAnalysisResult:
        """Parse Dify workflow outputs into DifyAnalysisResult."""
        technical_mapping = TechnicalMapping()
        if "technical_mapping" in outputs:
            tm = outputs["technical_mapping"]
            if isinstance(tm, dict):
                technical_mapping = TechnicalMapping(
                    token_vs_patch=tm.get("token_vs_patch", ""),
                    temporal_logic=tm.get("temporal_logic", ""),
                    frequency_domain=tm.get("frequency_domain", ""),
                )

        # ROBUSTNESS: the workflow may emit a non-numeric score; fall back
        # to 0.0 instead of letting ValueError escape past analyze_paper's
        # DifyClientError-only handler.
        try:
            score = float(outputs.get("relevance_score", 0))
        except (TypeError, ValueError):
            score = 0.0

        return DifyAnalysisResult(
            summary_zh=outputs.get("summary_zh", ""),
            relevance_score=score,
            relevance_reason=outputs.get("relevance_reason", ""),
            technical_mapping=technical_mapping,
            heuristic_idea=outputs.get("heuristic_idea", ""),
            thought_process=thought_process if thought_process else None,
        )

    def _parse_answer(
        self,
        answer: str,
        thought_process: str = "",
    ) -> Optional[DifyAnalysisResult]:
        """Parse answer string (JSON) into DifyAnalysisResult.

        Non-JSON answers degrade to a truncated summary with a zero score.
        """
        try:
            data = json.loads(answer)
            return self._parse_outputs(data, thought_process)
        except json.JSONDecodeError:
            # If not JSON, fall back to a minimal best-effort result.
            return DifyAnalysisResult(
                summary_zh=answer[:200] if answer else "",
                relevance_score=0,
                relevance_reason="无法解析结构化输出",
                technical_mapping=TechnicalMapping(),
                heuristic_idea="",
                thought_process=thought_process if thought_process else None,
            )

    def to_llm_analysis(self, result: DifyAnalysisResult) -> LLMAnalysis:
        """Convert DifyAnalysisResult to legacy LLMAnalysis model."""
        # Combine technical mapping into heuristic_idea for backward compatibility
        tech_mapping_str = ""
        if result.technical_mapping:
            parts = []
            if result.technical_mapping.token_vs_patch:
                parts.append(f"Token/Patch映射: {result.technical_mapping.token_vs_patch}")
            if result.technical_mapping.temporal_logic:
                parts.append(f"时序逻辑: {result.technical_mapping.temporal_logic}")
            if result.technical_mapping.frequency_domain:
                parts.append(f"频域分析: {result.technical_mapping.frequency_domain}")
            if parts:
                tech_mapping_str = "\n\n【技术映射】\n" + "\n".join(parts)

        heuristic_with_mapping = result.heuristic_idea + tech_mapping_str

        return LLMAnalysis(
            summary_zh=result.summary_zh,
            relevance_score=result.relevance_score,
            relevance_reason=result.relevance_reason,
            heuristic_idea=heuristic_with_mapping,
        )
330
+
331
+
332
# Process-wide singleton holder, created lazily by get_dify_client().
_dify_client: Optional[DifyClient] = None


def get_dify_client() -> DifyClient:
    """Return the shared DifyClient, constructing it on first use."""
    global _dify_client
    client = _dify_client
    if client is None:
        client = DifyClient()
        _dify_client = client
    return client
app/services/llm_brain.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ from typing import Optional
4
+ from openai import OpenAI
5
+ from dotenv import load_dotenv
6
+
7
+ from app.models import LLMAnalysis
8
+
9
+ load_dotenv()
10
+
11
+ SYSTEM_PROMPT = """你是一名资深 AI 研究员,专注于 **Autoregressive Diffusion Transformers (DiT)** 的推理加速与 **KV Cache 压缩**。
12
+ 你的核心能力是**跨领域技术迁移**:你能敏锐地从 LLM(大语言模型)或 ViT(视觉 Transformer)的优化论文中,提取出能应用到 DiT 视频/图像生成上的灵感。
13
+
14
+ 你的任务是阅读给定的论文摘要,并按以下 JSON 格式输出分析结果:
15
+
16
+ {
17
+ "summary_zh": "中文一句话概括核心贡献(直击痛点,如:'提出了一种基于Token重要性的动态剪枝方法,减少50% FLOPs')。",
18
+ "relevance_score": 0-10 评分。
19
+ "relevance_reason": "简述评分理由。如果是 DiT 相关给高分;如果是 LLM KV Cache 相关,评估其迁移潜力。",
20
+ "heuristic_idea": "【核心价值】这是最重要的部分。请进行逻辑推演和思维发散:\n1. **如果这是 LLM 的论文**:它的 'Token' 对应 DiT 的 'Patch' 吗?它的 '序列长度' 对应 DiT 的 '时间步(Timestep)' 还是 '空间分辨率'?\n2. **如果这是剪枝/量化**:DiT 的扩散过程中,早期和晚期的时间步对精度的敏感度不同,这篇论文的方法能利用这一点吗?\n3. **具体建议**:给出一个具体的、可实验的 Idea(例如:'尝试将此文的 Window Attention 机制应用到 DiT 的前 50% 去噪步中')。"
21
+ }
22
+
23
+ 评分标准:
24
+ - **9-10**: 直接针对 DiT/Video Diffusion 的加速/缓存优化。
25
+ - **7-8**: 高质量的 LLM KV Cache、ViT 剪枝、Token Merging 论文,且迁移路径清晰。
26
+ - **4-6**: 通用的 Transformer 量化/硬件加速,参考价值中等。
27
+ - **0-3**: 纯 NLP 任务(如 RAG、Prompt Engineering)或与生成/架构无关。
28
+
29
+ 注意:必须返回纯 JSON 格式,无额外文本。"""
30
+
31
+
32
class LLMBrain:
    """DeepSeek LLM client for paper analysis."""

    def __init__(self):
        """Configure the OpenAI-compatible DeepSeek client from env vars.

        Raises:
            ValueError: if DEEPSEEK_API_KEY is missing.
        """
        key = os.getenv("DEEPSEEK_API_KEY")
        if not key:
            raise ValueError("DEEPSEEK_API_KEY environment variable is not set")

        base_url = os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
        self.client = OpenAI(api_key=key, base_url=base_url)
        self.model = os.getenv("DEEPSEEK_MODEL", "deepseek-chat")

    def analyze_paper(
        self,
        title: str,
        abstract: str,
        system_prompt_override: Optional[str] = None,
    ) -> Optional[LLMAnalysis]:
        """Analyze a paper and return structured analysis.

        Returns None when the model reply is empty, is not valid JSON, or
        the API call fails (errors are printed, not raised).
        """
        # Optional per-request additions are appended after the base prompt.
        prompt = SYSTEM_PROMPT
        if system_prompt_override:
            prompt = f"{SYSTEM_PROMPT}\n\n用户补充要求:\n{system_prompt_override}"

        user_prompt = f"""请分析以下论文:

标题: {title}

摘要: {abstract}

请按照系统提示中的JSON格式返回分析结果。"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": user_prompt},
                ],
                max_tokens=1500,
                temperature=0.3,
                response_format={"type": "json_object"},
            )

            content = response.choices[0].message.content
            if not content:
                return None

            parsed = json.loads(content)
            return LLMAnalysis(
                summary_zh=parsed.get("summary_zh", ""),
                relevance_score=float(parsed.get("relevance_score", 0)),
                relevance_reason=parsed.get("relevance_reason", ""),
                heuristic_idea=parsed.get("heuristic_idea", ""),
            )

        except json.JSONDecodeError as e:
            print(f"JSON parsing error: {e}")
            return None
        except Exception as e:
            print(f"LLM analysis error: {e}")
            return None
94
+
95
+
96
# Process-wide singleton holder, created lazily by get_llm_brain().
_llm_brain: Optional[LLMBrain] = None


def get_llm_brain() -> LLMBrain:
    """Return the shared LLMBrain, constructing it on first use."""
    global _llm_brain
    brain = _llm_brain
    if brain is None:
        brain = LLMBrain()
        _llm_brain = brain
    return brain
app/services/pdf_renderer.py ADDED
@@ -0,0 +1,81 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import fitz # pymupdf
2
+ import httpx
3
+ from pathlib import Path
4
+ from typing import Optional
5
+ import asyncio
6
+
7
# Constants: static assets live next to the app package
# (this file is app/services/pdf_renderer.py, so parent.parent is app/).
STATIC_DIR = Path(__file__).parent.parent / "static"
THUMBNAILS_DIR = STATIC_DIR / "thumbnails"

# Ensure directories exist (import-time side effect, so callers can assume
# the thumbnail directory is always present).
THUMBNAILS_DIR.mkdir(parents=True, exist_ok=True)
13
+
14
+
15
async def generate_thumbnail(arxiv_id: str, pdf_url: str) -> Optional[str]:
    """
    Generates a JPG thumbnail from the first page of an arXiv PDF.

    Args:
        arxiv_id: The arXiv ID of the paper (used for filename).
        pdf_url: The URL to download the PDF from.

    Returns:
        str: Relative URL path to the thumbnail (e.g., "/static/thumbnails/1234.5678.jpg")
        None: If generation fails.
    """
    filename = f"{arxiv_id}.jpg"
    file_path = THUMBNAILS_DIR / filename
    # BUGFIX: interpolate the per-paper filename; previously a literal
    # placeholder was used, so every paper got the same bogus URL.
    relative_url = f"/static/thumbnails/{filename}"

    # 1. Check cache: reuse a previously rendered thumbnail.
    if file_path.exists():
        return relative_url

    try:
        # 2. Download PDF (arXiv redirects and expects a User-Agent).
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            headers = {
                "User-Agent": "PaperInsight/1.0 (mailto:your-email@example.com)"
            }
            # pdf_url is assumed to already be a direct PDF link
            # (e.g. http://arxiv.org/pdf/2312.00001v1).
            response = await client.get(pdf_url, headers=headers)
            response.raise_for_status()
            pdf_data = response.content

        # 3. Render thumbnail; PyMuPDF work is CPU-bound, so offload it to a
        # worker thread to keep the event loop responsive.
        success = await asyncio.to_thread(_render_thumbnail, pdf_data, file_path)
        if not success:
            return None

        return relative_url

    except httpx.HTTPStatusError as e:
        print(f"Failed to download PDF for {arxiv_id}: {e}")
        return None
    except Exception as e:
        print(f"Error generating thumbnail for {arxiv_id}: {e}")
        return None
62
+
63
+
64
def _render_thumbnail(pdf_data: bytes, file_path: Path) -> bool:
    """Render page 1 of the given PDF bytes to an image at file_path.

    Returns True on success, False on any failure (empty PDF, parse or
    render error); failures are printed, never raised.
    """
    document = None
    try:
        document = fitz.open(stream=pdf_data, filetype="pdf")
        if len(document) < 1:
            return False
        first_page = document[0]
        # A 1.5x zoom matrix gives a sharper thumbnail than the default.
        pixmap = first_page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
        pixmap.save(str(file_path))
        return True
    except Exception as e:
        print(f"Error rendering thumbnail: {e}")
        return False
    finally:
        if document is not None:
            document.close()
backfill_thumbnails.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import sys
3
+ from pathlib import Path
4
+
5
+ # Add backend directory to python path so we can import app modules
6
+ sys.path.append(str(Path(__file__).parent))
7
+
8
+ from sqlmodel import select
9
+ from app.database import get_sync_session
10
+ from app.models import Paper
11
+ from app.services.pdf_renderer import generate_thumbnail
12
+
13
async def backfill_thumbnails():
    """Generate thumbnails for every paper that does not have one yet."""
    print("Starting thumbnail backfill...")
    session = get_sync_session()

    try:
        # Papers whose thumbnail_url is NULL still need processing.
        pending = session.exec(
            select(Paper).where(Paper.thumbnail_url == None)
        ).all()
        total = len(pending)
        print(f"Found {total} papers needing thumbnails.")

        for index, paper in enumerate(pending):
            print(f"[{index+1}/{total}] Processing {paper.arxiv_id}...")

            if not paper.pdf_url:
                print(f"  Skipping {paper.arxiv_id}: No PDF URL")
                continue

            url = await generate_thumbnail(paper.arxiv_id, paper.pdf_url)
            if not url:
                print(f"  Failed to generate thumbnail for {paper.arxiv_id}")
                continue

            # Commit per paper so progress survives an interruption.
            paper.thumbnail_url = url
            session.add(paper)
            session.commit()
            print(f"  Generated: {url}")

    except Exception as e:
        print(f"Error during backfill: {e}")
    finally:
        session.close()
        print("Backfill completed.")
46
+
47
# Allow running this module directly: `python backfill_thumbnails.py`.
if __name__ == "__main__":
    asyncio.run(backfill_thumbnails())
main.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from typing import Any, Dict, List, Optional
3
+
4
+ import psycopg2
5
+ import psycopg2.extras
6
+ from fastapi import FastAPI, HTTPException, Query
7
+ from fastapi.middleware.cors import CORSMiddleware
8
+
9
# Postgres connection string; empty string when unset so request handlers
# can return a clear 500 instead of crashing at import time.
DATABASE_URL = os.getenv("DATABASE_URL", "")

app = FastAPI()

# CORS: allow any https://*.github.io origin (frontend is on GitHub Pages).
app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=r"^https://.*\.github\.io$",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
20
+
21
+
22
@app.get("/")
def root() -> Dict[str, str]:
    # Simple liveness response for the root URL.
    return {"status": "ok"}
25
+
26
+
27
@app.get("/health")
def health() -> Dict[str, str]:
    # Same payload as the root route; kept separate for monitoring probes.
    return dict(status="ok")
30
+
31
+
32
def fetch_papers(limit: int, offset: int) -> List[Dict[str, Any]]:
    """Fetch a page of papers ordered by publication date (newest first).

    Tries the candidate table names in order, skipping ones that do not
    exist. Raises HTTPException(500) when DATABASE_URL is unset, no table
    matches, or any other database error occurs.
    """
    if not DATABASE_URL:
        raise HTTPException(status_code=500, detail="DATABASE_URL is not set")

    table_candidates = ["paper", "papers"]
    last_error: Optional[Exception] = None

    for table in table_candidates:
        try:
            conn = psycopg2.connect(DATABASE_URL)
            try:
                # BUGFIX: `with conn` only manages the transaction — it does
                # NOT close the connection — so close it explicitly to avoid
                # leaking one connection per request.
                with conn:
                    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                        # Table name comes from the fixed whitelist above, so the
                        # f-string interpolation is not an injection vector; row
                        # values go through parameter binding.
                        cur.execute(
                            f"SELECT * FROM {table} ORDER BY published DESC NULLS LAST LIMIT %s OFFSET %s",
                            (limit, offset),
                        )
                        rows = cur.fetchall()
                        return [dict(row) for row in rows]
            finally:
                conn.close()
        except psycopg2.errors.UndefinedTable as exc:
            # This table name does not exist; try the next candidate.
            last_error = exc
            continue
        except Exception as exc:
            # Any other failure is fatal; stop probing and report it.
            last_error = exc
            break

    detail = str(last_error) if last_error else "paper table not found"
    raise HTTPException(status_code=500, detail=f"Database error: {detail}")
58
+
59
+
60
@app.get("/papers")
def papers(
    limit: int = Query(20, ge=1, le=100),  # page size, capped at 100
    offset: int = Query(0, ge=0),          # rows to skip
) -> List[Dict[str, Any]]:
    # Thin wrapper: parameter validation here, querying in fetch_papers().
    return fetch_papers(limit, offset)
paper_insight.db ADDED
Binary file (20.5 kB). View file
 
pyproject.toml ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [project]
2
+ name = "backend"
3
+ version = "0.1.0"
4
+ description = "Paper Insight - arXiv paper fetching and summarization API"
5
+ readme = "README.md"
6
+ requires-python = ">=3.12"
7
+ dependencies = [
8
+ "apscheduler>=3.11.2",
9
+ "arxiv>=2.3.1",
10
+ "fastapi>=0.128.0",
11
+ "httpx>=0.28.1",
12
+ "openai>=2.14.0",
13
+ "psycopg2-binary>=2.9.11",
14
+ "pymupdf>=1.26.7",
15
+ "python-dotenv>=1.2.1",
16
+ "socksio>=1.0.0",
17
+ "sqlmodel>=0.0.31",
18
+ "uvicorn>=0.40.0",
19
+ ]
uv.lock ADDED
The diff for this file is too large to render. See raw diff