Spaces:
No application file
No application file
import datetime
import hashlib
import json
import os
from dataclasses import dataclass
# NOTE: the from-imports below rebind the name ``datetime`` to the class, so
# ``datetime.timedelta`` is not available; use the imported ``timedelta``.
from datetime import datetime, timedelta
from datetime import datetime
from html import escape
from typing import List, Optional, Any, Dict
from urllib.parse import quote_plus

import gradio as gr
import requests
# Data classes for the API payload: every field is Optional and defaults to
# None so that partially populated records can still be constructed.
@dataclass
class Author:
    """One author entry of a paper.

    Declared as a dataclass so it can be built with keyword arguments.
    """

    _id: Optional[str] = None
    name: Optional[str] = None
    hidden: Optional[bool] = None
@dataclass
class Paper:
    """Metadata of a single paper; every field is optional and defaults to None.

    Declared as a dataclass so it can be built with keyword arguments.
    """

    id: Optional[str] = None
    # List of Author objects; None (not an empty list) when never populated.
    authors: Optional[List["Author"]] = None
    publishedAt: Optional[datetime] = None
    title: Optional[str] = None
    summary: Optional[str] = None
    upvotes: Optional[int] = None
    discussionId: Optional[str] = None
@dataclass
class SubmittedBy:
    """Profile of the user who submitted an article; all fields default to None.

    Declared as a dataclass so it can be built with keyword arguments.
    """

    _id: Optional[str] = None
    avatarUrl: Optional[str] = None
    fullname: Optional[str] = None
    name: Optional[str] = None
    type: Optional[str] = None
    isPro: Optional[bool] = None
    isHf: Optional[bool] = None
    isMod: Optional[bool] = None
    followerCount: Optional[int] = None
@dataclass
class Article:
    """One daily-papers entry: the paper plus listing metadata.

    Declared as a dataclass so it can be built with keyword arguments.
    All fields default to None.
    """

    paper: Optional["Paper"] = None
    publishedAt: Optional[datetime] = None
    title: Optional[str] = None
    thumbnail: Optional[str] = None
    numComments: Optional[int] = None
    submittedBy: Optional["SubmittedBy"] = None
    isAuthorParticipating: Optional[bool] = None
def safe_get(data: Dict, *keys: str) -> Any:
    """Walk ``keys`` through nested dictionaries without raising.

    Returns the value found at the end of the key path. Returns None when a
    key is missing, when an intermediate value is not a dict, or when the
    final value is an empty dict.
    """
    current = data
    for name in keys:
        if not isinstance(current, dict):
            # Once the path leaves dict territory the result stays None.
            current = None
            continue
        current = current.get(name, {})
    if current == {}:
        return None
    return current
def parse_article(data: Dict[str, Any]) -> Article:
    """Build an Article from one raw API item, tolerating missing fields.

    Missing keys become None. Malformed pieces (non-dict author entries, a
    non-dict ``submittedBy``, timestamps that are not valid ISO strings) are
    dropped instead of raising.

    Args:
        data: One decoded JSON object from the daily-papers listing.

    Returns:
        An Article whose ``paper`` / ``submittedBy`` are None when the
        corresponding section is absent or empty.
    """

    def parse_datetime(dt_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO timestamp; return None when absent or malformed."""
        if not dt_str or not isinstance(dt_str, str):
            return None
        # Normalise a trailing 'Z' to an explicit UTC offset before parsing.
        if dt_str.endswith('Z'):
            dt_str = dt_str[:-1] + '+00:00'
        try:
            return datetime.fromisoformat(dt_str)
        except ValueError:
            return None

    if not isinstance(data, dict):
        # Nothing usable: fall through and return an all-None Article.
        data = {}

    # Paper section (built only when a non-empty "paper" value is present)
    paper = None
    if safe_get(data, "paper"):
        raw_authors = safe_get(data, "paper", "authors")
        if not isinstance(raw_authors, list):
            raw_authors = []
        authors = [
            Author(
                _id=entry.get("_id"),
                name=entry.get("name"),
                hidden=entry.get("hidden"),
            )
            for entry in raw_authors
            if isinstance(entry, dict)  # skip malformed author entries
        ]
        paper = Paper(
            id=safe_get(data, "paper", "id"),
            authors=authors,
            publishedAt=parse_datetime(safe_get(data, "paper", "publishedAt")),
            title=safe_get(data, "paper", "title"),
            summary=safe_get(data, "paper", "summary"),
            upvotes=safe_get(data, "paper", "upvotes"),
            discussionId=safe_get(data, "paper", "discussionId"),
        )

    # Submitter section (safe_get already maps an empty dict to None)
    submitted_by = None
    submitted_by_data = safe_get(data, "submittedBy")
    if isinstance(submitted_by_data, dict):
        submitted_by = SubmittedBy(
            _id=submitted_by_data.get("_id"),
            avatarUrl=submitted_by_data.get("avatarUrl"),
            fullname=submitted_by_data.get("fullname"),
            name=submitted_by_data.get("name"),
            type=submitted_by_data.get("type"),
            isPro=submitted_by_data.get("isPro"),
            isHf=submitted_by_data.get("isHf"),
            isMod=submitted_by_data.get("isMod"),
            followerCount=submitted_by_data.get("followerCount"),
        )

    # Assemble the final object
    return Article(
        paper=paper,
        publishedAt=parse_datetime(data.get("publishedAt")),
        title=data.get("title"),
        thumbnail=data.get("thumbnail"),
        numComments=data.get("numComments"),
        submittedBy=submitted_by,
        isAuthorParticipating=data.get("isAuthorParticipating"),
    )
# Endpoint of the daily-papers listing; fetch_papers_with_date appends "?date=YYYY-MM-DD".
API_URL = "https://huggingface.co/api/daily_papers"
# In-memory response cache used by make_request: MD5 hex digest of the URL -> decoded JSON.
cache = {}
def make_request(url: str, timeout: float = 30.0, max_attempts: int = 3):
    """GET ``url`` and return its decoded JSON body, cached per URL.

    Successful responses are stored in the module-level ``cache`` (keyed by
    the MD5 hex digest of the URL) and served from there on later calls.
    Failed attempts are never cached.

    Proxies are taken from the ``HF_HTTP_PROXY`` / ``HF_HTTPS_PROXY``
    environment variables when at least one of them is set.

    Args:
        url: Address to fetch.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            cannot block the caller forever.
        max_attempts: Number of tries before giving up.

    Returns:
        The decoded JSON payload, or an empty list once every attempt failed.
    """
    # Hash of the URL is the cache key
    url_hash = hashlib.md5(url.encode()).hexdigest()
    if url_hash in cache:
        print(f"Cache hit for URL: {url}")
        return cache[url_hash]

    http_proxy = os.getenv("HF_HTTP_PROXY")
    https_proxy = os.getenv("HF_HTTPS_PROXY")
    proxies = None
    if http_proxy or https_proxy:
        proxies = {"http": http_proxy, "https": https_proxy}

    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, proxies=proxies, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"Attempt {attempt} failed: {e}")
            continue
        cache[url_hash] = data
        return data
    return []
def fetch_papers():
    """Fetch the default listing from API_URL and parse each item into an Article."""
    parsed = []
    for raw_item in make_request(API_URL):
        parsed.append(parse_article(raw_item))
    return parsed
def fetch_papers_with_date(date: datetime):
    """Fetch the listing for one day (sent as ``?date=YYYY-MM-DD``) and parse it.

    Returns a list with one Article per item in the response.
    """
    day = date.strftime("%Y-%m-%d")
    url = f"{API_URL}?date={day}"
    return list(map(parse_article, make_request(url)))
def fetch_papers_with_daterange(start_date: datetime, end_date: datetime):
    """Fetch every day from ``start_date`` to ``end_date`` (inclusive), de-duplicated.

    Each day is requested independently through fetch_papers_with_date.
    Articles are then de-duplicated by ``paper.id``, keeping the first
    occurrence. Articles without a paper or without a paper id are dropped,
    because they cannot be keyed.

    Returns:
        A list of unique Article objects; empty when ``start_date`` is after
        ``end_date``.
    """
    articles = []
    # ``datetime`` names the class in this module, so the step has to come
    # from the separately imported ``timedelta``.
    one_day = timedelta(days=1)
    current_date = start_date
    while current_date <= end_date:
        print(current_date)
        articles.extend(fetch_papers_with_date(current_date))
        print(f"Total articles: {len(articles)}")
        current_date += one_day

    # De-duplicate on paper.id, first occurrence wins
    unique_articles = {}
    for article in articles:
        paper = article.paper
        if paper is None or paper.id is None:
            continue
        unique_articles.setdefault(paper.id, article)
    return list(unique_articles.values())
def sort_by_date(articles):
    """Return the articles ordered newest first by ``publishedAt``.

    Articles whose ``publishedAt`` is None are placed last instead of raising
    TypeError when compared with real timestamps.
    """
    def newest_first_key(article):
        stamp = article.publishedAt
        # The leading flag keeps None from ever being compared with a datetime.
        return (stamp is not None, stamp)

    return sorted(articles, key=newest_first_key, reverse=True)
def sort_by_upvotes(articles):
    """Return the articles ordered by ``paper.upvotes``, highest first.

    A missing paper or a None upvote count is treated as 0, matching the
    ``or 0`` fallback used when the count is rendered.
    """
    def upvote_key(article):
        paper = article.paper
        if not paper:
            return 0
        return paper.upvotes or 0

    return sorted(articles, key=upvote_key, reverse=True)
def sort_by_comments(articles):
    """Return the articles ordered by ``numComments``, highest first.

    A None comment count is treated as 0 so the sort does not raise.
    """
    return sorted(articles, key=lambda article: article.numComments or 0, reverse=True)
def format_author(author):
    """Render one author as an HTML fragment.

    A named author becomes a link to a Google Scholar author search. The
    name is URL-encoded for the query string and HTML-escaped for the link
    text, because it comes from the remote API. Authors flagged ``hidden``
    get a marker suffix.

    Returns:
        "" for a falsy ``author``, an anonymous placeholder when the author
        has no name, otherwise the link markup.
    """
    if not author:
        return ""
    hidden_status = "(隐藏)" if author.hidden else ""
    if not author.name:
        return f"匿名作者{hidden_status}"
    name = str(author.name)
    # quote_plus turns spaces into '+' and percent-encodes everything unsafe.
    query = quote_plus(name)
    label = escape(name)
    return f"<a href='https://scholar.google.com/citations?view_op=search_authors&mauthors={query}'>{label}</a>{hidden_status}"
def format_paper_info(article):
    """Build the HTML detail view for one article.

    Every value taken from the article is HTML-escaped before it is put into
    the markup, because the data comes from the remote API.

    Returns:
        A placeholder message when the article has no paper, otherwise the
        concatenated HTML for title, thumbnail, id, publication time,
        authors, summary, vote/comment counts, discussion link and submitter.
    """
    paper = article.paper
    if not paper:
        return "论文信息缺失"

    def esc(value):
        """Escape any value for use as HTML text or inside a quoted attribute."""
        return escape(str(value), quote=True)

    info = []
    # Title
    info.append(f"<h2>{esc(article.title) if article.title else '无标题论文'}</h2>")
    # Thumbnail
    if article.thumbnail:
        info.append(f"<p><img src='{esc(article.thumbnail)}' style='max-width: 30em; width: 100%; margin: auto'/></p>")
    # Basic facts; link the id only when there is one, to avoid a ".../papers/None" URL
    if paper.id:
        paper_id = esc(paper.id)
        info.append(f"<p><strong>论文 ID</strong>:<a href='https://huggingface.co/papers/{paper_id}'>{paper_id}</a></p>")
    else:
        info.append("<p><strong>论文 ID</strong>:未知</p>")
    published = paper.publishedAt.strftime('%Y-%m-%d %H:%M') if paper.publishedAt else '未知'
    info.append(f"<p><strong>发布时间</strong>:{published}</p>")
    # Authors
    authors = "、".join([format_author(a) for a in paper.authors]) if paper.authors else "作者信息暂缺"
    info.append(f"<p><strong>作者</strong>:{authors}</p>")
    # Summary
    if paper.summary:
        summary = str(paper.summary).replace('{{', '{').replace('}}', '}').replace('\n', ' ')
        info.append(f"<h3>摘要</h3><p>{esc(summary)}</p>")
    # Votes and comments share one paragraph
    info.append(f"<p><strong>点赞数</strong>:{esc(paper.upvotes or 0)}<span style='margin-left: .5rem'></span>")
    info.append(f"<strong>评论数</strong>:{esc(article.numComments or 0)}</p>")
    if paper.discussionId:
        # Own paragraph: the previous one is already closed.
        info.append(f"<p><a href='https://huggingface.co/papers/{esc(paper.id)}/discussion/{esc(paper.discussionId)}'>进入讨论</a></p>")
    # Submitter
    if article.submittedBy:
        submitter = article.submittedBy
        info.append("<hr><p><strong>提交者</strong>: ")
        info.append(
            f"<span><img src='{esc(submitter.avatarUrl)}' class='author' /></span>{esc(submitter.fullname)}(<a href='https://huggingface.co/{esc(submitter.name)}'>@{esc(submitter.name)}</a>) ")
        info.append(f"粉丝数:{esc(submitter.followerCount or 0)}</p>")
    return "".join(info)
def generate_table_html(papers):
    """Build the results table with one clickable row per article.

    Clicking a title calls the page's ``showDetail`` JavaScript function
    with the paper id. The id is encoded as a JSON string literal and then
    HTML-escaped, so it cannot break out of the inline ``onclick`` handler.
    All other cell values are HTML-escaped. Articles without a paper are
    skipped because they have no id to show details for.

    Returns:
        The table markup as a single string.
    """
    rows = ['<table class="paper-table"><tr><th>标题</th><th>👍点赞</th><th>💬评论</th><th>📅日期</th></tr>']
    for article in papers:
        paper = article.paper
        if not paper:
            continue
        title = escape(str(article.title or "无标题"))
        upvotes = escape(str(paper.upvotes or 0))
        comments = escape(str(article.numComments or 0))
        date = paper.publishedAt.strftime("%Y-%m-%d") if paper.publishedAt else "未知"
        # JSON gives a valid JS string literal; escape() makes it attribute-safe.
        js_arg = escape(json.dumps(str(paper.id)), quote=True)
        rows.append(f"""
<tr>
<td><a class="paper-title" href="javascript:void(0)" onclick="showDetail({js_arg})">{title}</a></td>
<td>{upvotes}</td>
<td>{comments}</td>
<td>{date}</td>
</tr>
""")
    rows.append("</table>")
    return "".join(rows)
def build_html(papers):
    """Render every article's detail view inside its own hidden ``<div>``.

    Each div gets the id ``smartflow-paper-<paper id with dots replaced by
    dashes>`` and starts hidden (``display: none``). Articles without a
    paper or without a paper id are skipped, since no element id can be
    derived for them.

    Returns:
        The concatenated markup, or "" when nothing can be rendered.
    """
    sections = []
    for article in papers:
        paper = article.paper
        if not paper or not paper.id:
            continue
        dashed_id = str(paper.id).replace('.', '-')
        # The id comes from the remote API, so escape it for the attribute.
        element_id = escape(f"smartflow-paper-{dashed_id}", quote=True)
        article_html = format_paper_info(article)
        sections.append(f"<div id='{element_id}' style='display: none'>{article_html}</div>")
    return "".join(sections)
def query_papers(start_date_str, end_date_str):
    """Handle a date-range query coming from the UI.

    Parses both dates as ``YYYY-MM-DD``, fetches the range, sorts by upvotes
    and renders the results.

    Returns:
        A ``(table_html, details_html)`` tuple. If a date cannot be parsed,
        both elements carry the date-format warning. If fetching or
        rendering fails, both carry a generic failure message, so the user
        is not told to fix a date that was valid.
    """
    try:
        start_date = datetime.strptime(start_date_str.strip(), "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str.strip(), "%Y-%m-%d")
    except (ValueError, TypeError, AttributeError) as e:
        # Bad or missing input (non-matching text, None, non-string).
        print(f"查询出错: {e}")
        message = "<p>⚠️ 查询失败,请检查日期格式(YYYY-MM-DD)</p>"
        return message, message

    try:
        papers = fetch_papers_with_daterange(start_date, end_date)
        papers = sort_by_upvotes(papers)
        return generate_table_html(papers), build_html(papers)
    except Exception as e:
        # UI boundary: report the failure instead of letting the handler crash.
        print(f"查询出错: {e}")
        message = "<p>⚠️ 查询失败,请稍后重试</p>"
        return message, message
def show_detail(paper_id, papers):
    """Return the detail markup for ``papers``, or a prompt when there are none.

    ``paper_id`` is accepted but not used by this function.
    """
    return build_html(papers) if papers else "请先进行查询"
# CSS for the results table and the detail area (could be moved to a separate file).
# Passed to gr.Blocks(css=...) in create_interface.
custom_css = """
.paper-table { width: 100%; border-collapse: collapse; }
.paper-table td { padding: 12px; border-bottom: 1px solid #ddd; }
.paper-table th { font-weight: bold; background: #f9f9f920; }
.paper-table tr:hover { background: #f9f9f920; }
.paper-title { color: #1a73e8; cursor: pointer; text-decoration: none !important; }
.paper-title:hover { text-decoration: underline !important; }
.paper-table td:nth-child(2), .paper-table td:nth-child(3), .paper-table td:nth-child(4) { text-align: center; }
.paper-table th:nth-child(2), .paper-table th:nth-child(3), .paper-table th:nth-child(4) { text-align: center; }
.detail-area { margin-top: 20px; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
"""
# JavaScript injected into the page head by create_interface. showDetail(paperId)
# shows the detail <div> of one paper and hides all the others.
# The element id is derived the same way build_html derives it: every '.' in
# the paper id becomes '-'. JavaScript's String.replace('.', '-') would only
# replace the first dot, so split/join is used. Panels are matched by
# comparing ids rather than by interpolating the id into a CSS selector,
# which would throw on ids containing selector metacharacters.
custom_js = """
function showDetail(paperId) {
    var targetId = 'smartflow-paper-' + String(paperId).split('.').join('-');
    var panels = document.querySelectorAll("div[id^='smartflow-paper-']");
    panels.forEach(function (panel) {
        panel.style.display = panel.id === targetId ? 'block' : 'none';
    });
}
"""
def create_interface():
    """Build the Gradio Blocks app and return it (not launched).

    Layout: a heading, a row with start/end date textboxes (pre-filled with
    today's date) and a query button, an HTML area for the results table,
    and an HTML area for the paper details. Clicking the button runs
    query_papers and writes its two return values into the two HTML areas.
    """
    with gr.Blocks(title="Hugging Face Daily Paper", css=custom_css, head=f"<script>{custom_js}</script>") as app:
        # Main heading
        gr.Markdown("# 📚 Hugging Face Daily Paper")
        # Query controls
        with gr.Row():
            start_date = gr.Textbox(label="起始日期", placeholder="YYYY-MM-DD", value=datetime.now().strftime("%Y-%m-%d"))
            end_date = gr.Textbox(label="结束日期", placeholder="YYYY-MM-DD", value=datetime.now().strftime("%Y-%m-%d"))
            query_btn = gr.Button("🔍 查询", variant="primary")
        # Results area
        with gr.Column(visible=True):
            results_html = gr.HTML(label="查询结果")
        # Paper detail area
        with gr.Column(visible=True, elem_classes="detail-area"):
            gr.Markdown("## 论文详情")
            detail_html = gr.HTML(elem_id="detail-html")
        # Event wiring: one click fills both the table and the hidden detail panels
        query_btn.click(
            fn=query_papers,
            inputs=[start_date, end_date],
            outputs=[results_html, detail_html]
        )
    return app
# Script entry point: close any Gradio apps already running in this process,
# then build and launch the interface with default launch settings.
if __name__ == "__main__":
    gr.close_all()
    app = create_interface()
    app.launch(
        # Optional launch settings, currently disabled:
        # server_name="localhost",
        # server_port=7860,
        # share=True
    )