| | """ |
| | FastMCP server for arXiv paper search and download operations. |
| | Provides MCP-compliant tools via FastMCP framework with auto-start capability. |
| | """ |
| | import os |
| | import logging |
| | from pathlib import Path |
| | from typing import Optional, List, Dict, Any |
| | from datetime import datetime |
| | import arxiv |
| | import threading |
| | import time |
| | import urllib.request |
| |
|
| | |
# FastMCP is an optional dependency: record availability in a flag instead of
# failing at import time, so the rest of the module stays importable.
try:
    from fastmcp import FastMCP
except ImportError:
    FASTMCP_AVAILABLE = False
    logging.warning("FastMCP not available. Install with: pip install fastmcp")
else:
    FASTMCP_AVAILABLE = True
| |
|
# Configure root logging once at import time; the module logger inherits it.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
| |
|
| |
|
def _extract_pdf_url(result: arxiv.Result) -> Optional[str]:
    """
    Resolve the PDF URL for an arXiv search result.

    Works around the arxiv-library v2.2.0 breaking change: there the
    ``pdf_url`` attribute is always None and the real URL must be read from
    the result's ``links``.

    Args:
        result: arxiv.Result object to inspect

    Returns:
        The PDF URL string, or None if one cannot be determined
    """
    # Fast path for pre-v2.2.0 library versions where the attribute is set.
    direct = result.pdf_url
    if direct:
        return direct

    # v2.2.0+ path: scan the result's links for one that mentions "pdf".
    try:
        for candidate in result.links:
            href = candidate.href
            if 'pdf' in href.lower():
                logger.debug(f"Extracted PDF URL from links: {href}")
                return href
    except (AttributeError, TypeError) as exc:
        logger.warning(f"Error extracting PDF URL from links: {exc}")

    # Last resort: rebuild the canonical PDF URL from the entry id
    # (entry_id looks like "http://arxiv.org/abs/<id>").
    try:
        paper_id = result.entry_id.split('/')[-1]
    except (AttributeError, IndexError) as exc:
        logger.error(f"Failed to construct fallback PDF URL: {exc}")
        return None
    fallback_url = f"https://arxiv.org/pdf/{paper_id}"
    logger.warning(f"Using fallback PDF URL construction: {fallback_url}")
    return fallback_url
| |
|
| |
|
class ArxivFastMCPServer:
    """FastMCP server for arXiv operations with auto-start capability.

    Wraps a FastMCP instance exposing three tools (search_papers,
    download_paper, list_papers) and runs the server's SSE transport in a
    background daemon thread so the constructing code is not blocked.
    """

    def __init__(
        self,
        storage_path: Optional[str] = None,
        server_port: int = 5555,
        auto_start: bool = True
    ):
        """
        Initialize FastMCP arXiv server.

        Args:
            storage_path: Directory to store downloaded papers. Falls back to
                the MCP_ARXIV_STORAGE_PATH env var, then "data/mcp_papers".
            server_port: Port for FastMCP server (default: 5555)
            auto_start: Whether to start server automatically

        Raises:
            ImportError: If the fastmcp package is not installed.
        """
        if not FASTMCP_AVAILABLE:
            raise ImportError("FastMCP not installed. Run: pip install fastmcp")

        # Precedence: explicit argument > environment variable > default.
        self.storage_path = Path(storage_path or os.getenv("MCP_ARXIV_STORAGE_PATH", "data/mcp_papers"))
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.server_port = server_port

        self.mcp = FastMCP("arxiv-server")

        # Tools must be registered before the server starts serving requests.
        self._register_tools()

        # Background-thread state; populated by start().
        self._server_thread: Optional[threading.Thread] = None
        self._running: bool = False

        logger.info(f"ArxivFastMCPServer initialized with storage: {self.storage_path}")

        if auto_start:
            self.start()

    def _register_tools(self) -> None:
        """Register arXiv tools with FastMCP.

        NOTE: the inner functions' docstrings are exposed by FastMCP to MCP
        clients as tool descriptions — they are user-facing text, not just
        internal documentation.
        """

        @self.mcp.tool()
        def search_papers(
            query: str,
            max_results: int = 5,
            categories: Optional[List[str]] = None,
            sort_by: str = "relevance"
        ) -> Dict[str, Any]:
            """
            Search for papers on arXiv.

            Args:
                query: Search query string
                max_results: Maximum number of papers to return (1-50)
                categories: Optional list of arXiv category filters (e.g., ['cs.AI'])
                sort_by: Sort criterion (relevance, lastUpdatedDate, submittedDate)

            Returns:
                Dictionary with 'papers' list containing paper metadata
            """
            try:
                logger.info(f"Searching arXiv: query='{query}', max_results={max_results}")

                # AND the free-text query with an OR of category filters,
                # e.g. "(transformers) AND (cat:cs.AI OR cat:cs.LG)".
                search_query = query
                if categories:
                    cat_filter = " OR ".join([f"cat:{cat}" for cat in categories])
                    search_query = f"({query}) AND ({cat_filter})"

                # Unrecognized sort_by values silently fall back to relevance.
                sort_map = {
                    "relevance": arxiv.SortCriterion.Relevance,
                    "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
                    "submittedDate": arxiv.SortCriterion.SubmittedDate
                }
                sort_criterion = sort_map.get(sort_by, arxiv.SortCriterion.Relevance)

                # Cap at 50 per the docstring; values < 1 pass through as-is.
                search = arxiv.Search(
                    query=search_query,
                    max_results=min(max_results, 50),
                    sort_by=sort_criterion
                )

                # NOTE(review): Search.results() is deprecated in arxiv 2.x in
                # favor of arxiv.Client().results(search) — confirm the
                # installed library version before changing.
                papers = []
                for result in search.results():
                    paper_data = {
                        # entry_id looks like "http://arxiv.org/abs/<id>".
                        "id": result.entry_id.split('/')[-1],
                        "title": result.title,
                        "authors": [author.name for author in result.authors],
                        "summary": result.summary,
                        "pdf_url": _extract_pdf_url(result),
                        "published": result.published.isoformat(),
                        "categories": result.categories
                    }
                    papers.append(paper_data)

                logger.info(f"Found {len(papers)} papers")
                # NOTE(review): success payload has no "status" key while the
                # error payload does — clients must key off "papers"/"message".
                return {"papers": papers, "count": len(papers)}

            except Exception as e:
                logger.error(f"Error searching arXiv: {str(e)}")
                return {"status": "error", "message": str(e), "papers": []}

        @self.mcp.tool()
        def download_paper(paper_id: str) -> Dict[str, Any]:
            """
            Download a paper PDF from arXiv.

            Args:
                paper_id: arXiv paper ID (e.g., '2401.00001')

            Returns:
                Dictionary with download status and file path
            """
            try:
                logger.info(f"Downloading paper: {paper_id}")

                # Cache check: reuse an already-downloaded PDF.
                # NOTE(review): paper_id is interpolated into a filesystem path
                # unsanitized; an id like "../x" could escape storage_path —
                # consider validating against the arXiv id format.
                pdf_path = self.storage_path / f"{paper_id}.pdf"
                if pdf_path.exists():
                    logger.info(f"Paper {paper_id} already cached")
                    return {
                        "status": "cached",
                        "paper_id": paper_id,
                        "path": str(pdf_path),
                        "message": "Paper already in storage"
                    }

                # Look up the paper's metadata to resolve its PDF URL.
                # NOTE(review): deprecated Search.results() API — see
                # search_papers above.
                search = arxiv.Search(id_list=[paper_id])
                result = next(search.results())  # StopIteration => not found

                pdf_url = _extract_pdf_url(result)
                if not pdf_url:
                    raise ValueError(f"Could not extract PDF URL for paper {paper_id}")

                # Custom User-Agent: arXiv rejects some default urllib agents.
                headers = {'User-Agent': 'Mozilla/5.0 (FastMCP ArXiv Server)'}
                request = urllib.request.Request(pdf_url, headers=headers)

                # The whole PDF is buffered in memory before being written.
                with urllib.request.urlopen(request, timeout=30) as response:
                    pdf_content = response.read()

                pdf_path.write_bytes(pdf_content)

                logger.info(f"Successfully downloaded {paper_id} to {pdf_path}")
                return {
                    "status": "success",
                    "paper_id": paper_id,
                    "path": str(pdf_path),
                    "message": f"Downloaded to {pdf_path}"
                }

            except StopIteration:
                error_msg = f"Paper {paper_id} not found on arXiv"
                logger.error(error_msg)
                return {"status": "error", "paper_id": paper_id, "message": error_msg}
            except Exception as e:
                error_msg = f"Error downloading paper {paper_id}: {str(e)}"
                logger.error(error_msg)
                return {"status": "error", "paper_id": paper_id, "message": error_msg}

        @self.mcp.tool()
        def list_papers() -> Dict[str, Any]:
            """
            List all cached papers in storage.

            Returns:
                Dictionary with list of paper IDs in storage
            """
            try:
                # Paper id == filename stem, mirroring download_paper's naming.
                pdf_files = list(self.storage_path.glob("*.pdf"))
                paper_ids = [f.stem for f in pdf_files]

                logger.info(f"Found {len(paper_ids)} cached papers")
                return {
                    "papers": paper_ids,
                    "count": len(paper_ids),
                    "storage_path": str(self.storage_path)
                }
            except Exception as e:
                logger.error(f"Error listing papers: {str(e)}")
                return {"status": "error", "message": str(e), "papers": []}

        logger.info("Registered FastMCP tools: search_papers, download_paper, list_papers")

    def start(self) -> None:
        """Start FastMCP server in background thread (no-op if running)."""
        if self._running:
            logger.warning("Server already running")
            return

        def run_server():
            """Run FastMCP server with asyncio."""
            try:
                logger.info(f"Starting FastMCP arXiv server on port {self.server_port}")
                self._running = True

                # NOTE(review): assumes FastMCP exposes
                # run_sse_async(host, port, log_level) — confirm against the
                # installed fastmcp version; this API has changed between
                # releases.
                import asyncio
                asyncio.run(self.mcp.run_sse_async(
                    host="localhost",
                    port=self.server_port,
                    log_level="INFO"
                ))

            except Exception as e:
                logger.error(f"Error running FastMCP server: {str(e)}", exc_info=True)
                self._running = False

        # Daemon thread: it will not keep the process alive at interpreter exit.
        self._server_thread = threading.Thread(target=run_server, daemon=True)
        self._server_thread.start()

        # Fixed grace period rather than a real readiness probe; callers may
        # race the server during the first second.
        time.sleep(1)
        logger.info("FastMCP arXiv server started in background")

    def stop(self) -> None:
        """Stop FastMCP server.

        NOTE(review): this only clears the running flag and briefly joins the
        thread; the asyncio server loop itself is not cancelled, so the daemon
        thread may keep serving until process exit.
        """
        if not self._running:
            logger.warning("Server not running")
            return

        logger.info("Stopping FastMCP arXiv server")
        self._running = False

        # Best-effort wait; the thread may still be alive after the timeout.
        if self._server_thread and self._server_thread.is_alive():

            self._server_thread.join(timeout=5)

        logger.info("FastMCP arXiv server stopped")

    def is_running(self) -> bool:
        """Check if server is running.

        NOTE(review): despite the bool annotation, the chained ``and`` returns
        None when _server_thread is None — truthy use is fine, but an
        ``is False`` comparison would not be.
        """
        return self._running and self._server_thread and self._server_thread.is_alive()

    def __enter__(self):
        """Context manager entry: ensure the server is started."""
        if not self._running:
            self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: stop the server (exceptions propagate)."""
        self.stop()

    def __del__(self):
        """Best-effort cleanup on garbage collection.

        Swallows all errors because interpreter shutdown may already have torn
        down globals that stop() needs.
        """
        try:
            if self._running:
                self.stop()
        except Exception:
            pass
| |
|
| |
|
| | |
# Module-level singleton, managed exclusively by get_server()/shutdown_server().
_server_instance: Optional[ArxivFastMCPServer] = None
| |
|
| |
|
def get_server(
    storage_path: Optional[str] = None,
    server_port: int = 5555,
    auto_start: bool = True
) -> ArxivFastMCPServer:
    """
    Get or create singleton FastMCP server instance.

    When an instance already exists, ``storage_path`` and ``server_port`` are
    ignored; only ``auto_start`` is honored (a stopped server is restarted).

    Args:
        storage_path: Storage directory for papers
        server_port: Port for server
        auto_start: Auto-start server if not running

    Returns:
        ArxivFastMCPServer instance
    """
    global _server_instance

    # First call: build the singleton with the caller's configuration.
    if _server_instance is None:
        logger.info("Creating new FastMCP server instance")
        _server_instance = ArxivFastMCPServer(
            storage_path=storage_path,
            server_port=server_port,
            auto_start=auto_start
        )
        return _server_instance

    # Subsequent calls: optionally revive a stopped server.
    if auto_start and not _server_instance.is_running():
        logger.info("Restarting stopped FastMCP server")
        _server_instance.start()

    return _server_instance
| |
|
| |
|
def shutdown_server():
    """Shutdown singleton server instance.

    Stops the server (if one exists) and clears the module-level reference so
    the next get_server() call builds a fresh instance.
    """
    global _server_instance

    # Nothing to do when no singleton was ever created.
    if _server_instance is None:
        return

    logger.info("Shutting down FastMCP server")
    _server_instance.stop()
    _server_instance = None
| |
|
| |
|
if __name__ == "__main__":
    # Standalone entry point: python <this file> [storage_path] [port]
    import sys

    cli_args = sys.argv[1:]
    storage = cli_args[0] if cli_args else "data/mcp_papers"
    port = int(cli_args[1]) if len(cli_args) > 1 else 5555

    logger.info(f"Starting standalone FastMCP arXiv server")
    logger.info(f"Storage: {storage}")
    logger.info(f"Port: {port}")

    standalone_server = ArxivFastMCPServer(
        storage_path=storage,
        server_port=port,
        auto_start=True
    )

    # The server runs in a daemon thread, so keep the main thread alive
    # until the operator interrupts with Ctrl+C.
    try:
        logger.info("Server running. Press Ctrl+C to stop.")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down server...")
        standalone_server.stop()
| |
|