# GitHub Actions
# Clean sync from GitHub - no large files in history
# aca8ab4
"""
FastMCP server for arXiv paper search and download operations.
Provides MCP-compliant tools via FastMCP framework with auto-start capability.
"""
import os
import logging
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime
import arxiv
import threading
import time
import urllib.request
# Import FastMCP
try:
from fastmcp import FastMCP
FASTMCP_AVAILABLE = True
except ImportError:
FASTMCP_AVAILABLE = False
logging.warning("FastMCP not available. Install with: pip install fastmcp")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def _extract_pdf_url(result: "arxiv.Result") -> Optional[str]:
    """
    Extract the PDF URL from an arxiv.Result, handling the arxiv library
    v2.2.0 breaking change.

    In arxiv v2.2.0+ the ``pdf_url`` attribute is always None; the PDF link
    moved into the ``links`` field. Resolution order:

    1. Legacy ``pdf_url`` attribute (pre-2.2.0 results).
    2. First entry in ``links`` whose href mentions 'pdf'.
    3. URL constructed from ``entry_id`` as a last resort.

    Args:
        result: arxiv.Result object

    Returns:
        PDF URL string or None if not found
    """
    # Try legacy pdf_url attribute first (backward compatibility).
    # getattr() also tolerates future versions that drop the attribute
    # entirely instead of leaving it as None.
    legacy_url = getattr(result, "pdf_url", None)
    if legacy_url:
        return legacy_url

    # arxiv v2.2.0+: PDF URL is in links.
    # Links typically have format:
    #   [0] abs URL (alternate)
    #   [1] pdf URL (alternate)
    #   [2] DOI URL (related)
    try:
        for link in result.links:
            if 'pdf' in link.href.lower():
                logger.debug("Extracted PDF URL from links: %s", link.href)
                return link.href
    except (AttributeError, TypeError) as e:
        logger.warning("Error extracting PDF URL from links: %s", e)

    # Fallback: construct URL from entry_id.
    # entry_id format: http://arxiv.org/abs/2102.08370v2
    try:
        paper_id = result.entry_id.split('/')[-1]
        fallback_url = f"https://arxiv.org/pdf/{paper_id}"
        logger.warning("Using fallback PDF URL construction: %s", fallback_url)
        return fallback_url
    except (AttributeError, IndexError) as e:
        logger.error("Failed to construct fallback PDF URL: %s", e)
        return None
class ArxivFastMCPServer:
    """FastMCP server for arXiv operations with auto-start capability.

    Wraps a FastMCP instance exposing three tools (search_papers,
    download_paper, list_papers) and runs the SSE server in a daemon
    thread so the hosting application is never blocked.
    """

    def __init__(
        self,
        storage_path: Optional[str] = None,
        server_port: int = 5555,
        auto_start: bool = True
    ):
        """
        Initialize FastMCP arXiv server.

        Args:
            storage_path: Directory to store downloaded papers. Falls back
                to the MCP_ARXIV_STORAGE_PATH env var, then "data/mcp_papers".
            server_port: Port for FastMCP server (default: 5555)
            auto_start: Whether to start server automatically

        Raises:
            ImportError: If the fastmcp package is not installed.
        """
        if not FASTMCP_AVAILABLE:
            raise ImportError("FastMCP not installed. Run: pip install fastmcp")
        self.storage_path = Path(storage_path or os.getenv("MCP_ARXIV_STORAGE_PATH", "data/mcp_papers"))
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.server_port = server_port
        # Initialize FastMCP server
        self.mcp = FastMCP("arxiv-server")
        # Register tools
        self._register_tools()
        # Server state
        self._server_thread = None  # daemon thread running the SSE event loop
        self._running = False  # best-effort liveness flag; see is_running()
        logger.info(f"ArxivFastMCPServer initialized with storage: {self.storage_path}")
        if auto_start:
            self.start()

    def _register_tools(self):
        """Register arXiv tools with FastMCP.

        The tools are defined as closures so they can capture
        self.storage_path without FastMCP having to pass an instance.
        """

        @self.mcp.tool()
        def search_papers(
            query: str,
            max_results: int = 5,
            categories: Optional[List[str]] = None,
            sort_by: str = "relevance"
        ) -> Dict[str, Any]:
            """
            Search for papers on arXiv.

            Args:
                query: Search query string
                max_results: Maximum number of papers to return (1-50)
                categories: Optional list of arXiv category filters (e.g., ['cs.AI'])
                sort_by: Sort criterion (relevance, lastUpdatedDate, submittedDate)

            Returns:
                Dictionary with 'papers' list containing paper metadata
            """
            try:
                logger.info(f"Searching arXiv: query='{query}', max_results={max_results}")
                # Build search query with category filter
                search_query = query
                if categories:
                    cat_filter = " OR ".join([f"cat:{cat}" for cat in categories])
                    search_query = f"({query}) AND ({cat_filter})"
                # Map sort_by to arxiv.SortCriterion; unknown values fall back to relevance
                sort_map = {
                    "relevance": arxiv.SortCriterion.Relevance,
                    "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
                    "submittedDate": arxiv.SortCriterion.SubmittedDate
                }
                sort_criterion = sort_map.get(sort_by, arxiv.SortCriterion.Relevance)
                # Create and execute search (max_results capped at 50)
                search = arxiv.Search(
                    query=search_query,
                    max_results=min(max_results, 50),
                    sort_by=sort_criterion
                )
                papers = []
                # NOTE(review): Search.results() is deprecated in arxiv 2.x in
                # favor of Client().results(search) — confirm library version.
                for result in search.results():
                    paper_data = {
                        "id": result.entry_id.split('/')[-1],
                        "title": result.title,
                        "authors": [author.name for author in result.authors],
                        "summary": result.summary,
                        "pdf_url": _extract_pdf_url(result),
                        "published": result.published.isoformat(),
                        "categories": result.categories
                    }
                    papers.append(paper_data)
                logger.info(f"Found {len(papers)} papers")
                return {"papers": papers, "count": len(papers)}
            except Exception as e:
                # Tool contract: never raise — report errors in the payload.
                logger.error(f"Error searching arXiv: {str(e)}")
                return {"status": "error", "message": str(e), "papers": []}

        @self.mcp.tool()
        def download_paper(paper_id: str) -> Dict[str, Any]:
            """
            Download a paper PDF from arXiv.

            Args:
                paper_id: arXiv paper ID (e.g., '2401.00001')

            Returns:
                Dictionary with download status and file path
            """
            try:
                logger.info(f"Downloading paper: {paper_id}")
                # Check if already exists
                # NOTE(review): the cache hit is just "file exists" — a
                # previously interrupted write would be served as cached.
                pdf_path = self.storage_path / f"{paper_id}.pdf"
                if pdf_path.exists():
                    logger.info(f"Paper {paper_id} already cached")
                    return {
                        "status": "cached",
                        "paper_id": paper_id,
                        "path": str(pdf_path),
                        "message": "Paper already in storage"
                    }
                # Get paper metadata to get PDF URL
                search = arxiv.Search(id_list=[paper_id])
                result = next(search.results())  # StopIteration -> "not found" below
                # Extract PDF URL using helper (handles arxiv v2.2.0 breaking change)
                pdf_url = _extract_pdf_url(result)
                if not pdf_url:
                    raise ValueError(f"Could not extract PDF URL for paper {paper_id}")
                # Download PDF directly using urllib to avoid Path/str mixing issues
                headers = {'User-Agent': 'Mozilla/5.0 (FastMCP ArXiv Server)'}
                request = urllib.request.Request(pdf_url, headers=headers)
                with urllib.request.urlopen(request, timeout=30) as response:
                    pdf_content = response.read()
                # Write using pathlib to avoid any string/Path mixing
                pdf_path.write_bytes(pdf_content)
                logger.info(f"Successfully downloaded {paper_id} to {pdf_path}")
                return {
                    "status": "success",
                    "paper_id": paper_id,
                    "path": str(pdf_path),
                    "message": f"Downloaded to {pdf_path}"
                }
            except StopIteration:
                error_msg = f"Paper {paper_id} not found on arXiv"
                logger.error(error_msg)
                return {"status": "error", "paper_id": paper_id, "message": error_msg}
            except Exception as e:
                error_msg = f"Error downloading paper {paper_id}: {str(e)}"
                logger.error(error_msg)
                return {"status": "error", "paper_id": paper_id, "message": error_msg}

        @self.mcp.tool()
        def list_papers() -> Dict[str, Any]:
            """
            List all cached papers in storage.

            Returns:
                Dictionary with list of paper IDs in storage
            """
            try:
                pdf_files = list(self.storage_path.glob("*.pdf"))
                paper_ids = [f.stem for f in pdf_files]  # filename stem == paper ID
                logger.info(f"Found {len(paper_ids)} cached papers")
                return {
                    "papers": paper_ids,
                    "count": len(paper_ids),
                    "storage_path": str(self.storage_path)
                }
            except Exception as e:
                logger.error(f"Error listing papers: {str(e)}")
                return {"status": "error", "message": str(e), "papers": []}

        logger.info("Registered FastMCP tools: search_papers, download_paper, list_papers")

    def start(self):
        """Start FastMCP server in background thread.

        No-op (with a warning) if the server is already flagged as running.
        """
        if self._running:
            logger.warning("Server already running")
            return

        def run_server():
            """Run FastMCP server with asyncio (executes in the daemon thread)."""
            try:
                logger.info(f"Starting FastMCP arXiv server on port {self.server_port}")
                self._running = True
                # Run FastMCP server with SSE transport using async method
                # FastMCP 2.x provides run_sse_async for SSE servers
                import asyncio
                asyncio.run(self.mcp.run_sse_async(
                    host="localhost",
                    port=self.server_port,
                    log_level="INFO"
                ))
            except Exception as e:
                logger.error(f"Error running FastMCP server: {str(e)}", exc_info=True)
                self._running = False

        # Start server in daemon thread so it doesn't block app shutdown
        self._server_thread = threading.Thread(target=run_server, daemon=True)
        self._server_thread.start()
        # Give server time to start
        # NOTE(review): a fixed 1s sleep is a race, not a readiness check —
        # the server may not be accepting connections yet when this returns.
        time.sleep(1)
        logger.info("FastMCP arXiv server started in background")

    def stop(self):
        """Stop FastMCP server."""
        if not self._running:
            logger.warning("Server not running")
            return
        logger.info("Stopping FastMCP arXiv server")
        self._running = False
        # FastMCP should provide graceful shutdown
        # Implementation depends on FastMCP API
        # NOTE(review): clearing the flag does not stop the asyncio loop in
        # the daemon thread; the join() below may simply time out after 5s.
        if self._server_thread and self._server_thread.is_alive():
            # Wait for thread to finish (with timeout)
            self._server_thread.join(timeout=5)
        logger.info("FastMCP arXiv server stopped")

    def is_running(self) -> bool:
        """Check if server is running.

        NOTE(review): the chained `and` can yield None (not a strict bool)
        when _server_thread is unset; callers only use the result in
        boolean context, so behavior is unaffected.
        """
        return self._running and self._server_thread and self._server_thread.is_alive()

    def __enter__(self):
        """Context manager entry; ensures the server is started."""
        if not self._running:
            self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; stops the server (exceptions propagate)."""
        self.stop()

    def __del__(self):
        """Cleanup on deletion; best-effort stop that never raises."""
        try:
            if self._running:
                self.stop()
        except Exception:
            pass
# Singleton instance for application-wide use;
# managed exclusively by get_server() / shutdown_server() below.
_server_instance: Optional[ArxivFastMCPServer] = None
def get_server(
    storage_path: Optional[str] = None,
    server_port: int = 5555,
    auto_start: bool = True
) -> ArxivFastMCPServer:
    """
    Return the process-wide singleton FastMCP server, creating it on first use.

    Args:
        storage_path: Storage directory for papers
        server_port: Port for server
        auto_start: Auto-start server if not running

    Returns:
        ArxivFastMCPServer instance
    """
    global _server_instance

    # Reuse an existing instance, reviving it if it has stopped.
    if _server_instance is not None:
        if not _server_instance.is_running() and auto_start:
            logger.info("Restarting stopped FastMCP server")
            _server_instance.start()
        return _server_instance

    # First call: build (and possibly auto-start) the singleton.
    logger.info("Creating new FastMCP server instance")
    _server_instance = ArxivFastMCPServer(
        storage_path=storage_path,
        server_port=server_port,
        auto_start=auto_start
    )
    return _server_instance
def shutdown_server():
    """Stop the singleton server instance (if any) and forget it."""
    global _server_instance
    if _server_instance is None:
        return
    logger.info("Shutting down FastMCP server")
    _server_instance.stop()
    _server_instance = None
if __name__ == "__main__":
# Test server in standalone mode
import sys
storage = sys.argv[1] if len(sys.argv) > 1 else "data/mcp_papers"
port = int(sys.argv[2]) if len(sys.argv) > 2 else 5555
logger.info(f"Starting standalone FastMCP arXiv server")
logger.info(f"Storage: {storage}")
logger.info(f"Port: {port}")
server = ArxivFastMCPServer(
storage_path=storage,
server_port=port,
auto_start=True
)
try:
# Keep server running
logger.info("Server running. Press Ctrl+C to stop.")
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down server...")
server.stop()