"""
arXiv MCP client wrapper for accessing arXiv papers via Model Context Protocol.
Uses in-process handler calls instead of subprocess stdio protocol.
"""
import asyncio
import json
import logging
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import nest_asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

from utils.schemas import Paper

# Configure root logging at import time (INFO level, timestamped records) and
# create the module-level logger used throughout this file.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

class MCPArxivClient:
    """Wrapper for arXiv MCP server using direct in-process handler calls.

    The ``arxiv_mcp_server`` tool handlers are imported once per process and
    cached on the class, so every instance shares them. Each operation has an
    ``async`` implementation plus a blocking wrapper that drives it on an
    event loop patched with ``nest_asyncio``.
    """

    # Handler cache shared by all instances; filled by _import_handlers().
    _handlers_imported = False
    handle_search = None
    handle_download = None
    handle_list_papers = None

    @classmethod
    def _import_handlers(cls):
        """Import MCP handlers once at class level.

        The handlers are wrapped in ``staticmethod`` so that they are never
        bound to an instance: ``MCPArxivClient.handle_search`` and
        ``self.handle_search`` both yield the plain handler function.
        """
        if cls._handlers_imported:
            return

        from arxiv_mcp_server.tools import handle_search, handle_download, handle_list_papers

        cls.handle_search = staticmethod(handle_search)
        cls.handle_download = staticmethod(handle_download)
        cls.handle_list_papers = staticmethod(handle_list_papers)
        cls._handlers_imported = True

    def __init__(self, storage_path: Optional[str] = None):
        """
        Initialize MCP arXiv client with in-process handlers.

        Args:
            storage_path: Path where papers are stored (reads
                ``MCP_ARXIV_STORAGE_PATH`` from the environment if not
                provided, defaulting to ``data/mcp_papers``)
        """
        # Snapshot argv first so __del__ always has something to restore,
        # even if directory creation below fails.
        self._original_argv = sys.argv.copy()

        self.storage_path = Path(storage_path or os.getenv("MCP_ARXIV_STORAGE_PATH", "data/mcp_papers"))
        self.storage_path.mkdir(parents=True, exist_ok=True)

        # Before the very first handler import, advertise the storage path on
        # the command line. Presumably arxiv_mcp_server derives its storage
        # location from sys.argv -- TODO confirm against the server's config.
        if not self._handlers_imported and "--storage-path" not in sys.argv:
            sys.argv.extend(["--storage-path", str(self.storage_path.resolve())])
            logger.debug(f"Set sys.argv storage path: {self.storage_path.resolve()}")

        self._import_handlers()

        import arxiv_mcp_server.tools.download as download_module

        # NOTE(review): this branch only logs; nothing on the download
        # module's ``settings`` object is actually modified here.
        if hasattr(download_module, 'settings'):
            logger.debug("Updating download module settings storage path")

        logger.info("MCPArxivClient initialized with in-process handlers")
        logger.info(f"Storage path: {self.storage_path.resolve()}")

        existing_files = list(self.storage_path.glob("*.pdf"))
        logger.info(f"Storage directory contains {len(existing_files)} existing PDF files")

    @staticmethod
    def _run_sync(coro):
        """Run *coro* to completion from synchronous code and return its result.

        Reuses the current event loop when one is available and open,
        otherwise creates and installs a new one. The loop is patched with
        ``nest_asyncio`` so this also works when called from inside an
        already-running loop.
        """
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                raise RuntimeError("event loop is closed")
        except RuntimeError:
            # No usable loop in this thread: create a fresh one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        nest_asyncio.apply(loop)
        return loop.run_until_complete(coro)

    async def _call_handler_async(self, handler_func, arguments: Dict[str, Any], handler_name: str) -> Any:
        """
        Call an MCP handler function directly and return parsed result.

        Args:
            handler_func: The async handler function to call
            arguments: Handler arguments as dictionary
            handler_name: Name of handler (for logging)

        Returns:
            The JSON-decoded text of the first result item (dict or list);
            the raw text if it is not valid JSON; ``{}`` if the handler
            returned nothing.

        Raises:
            Exception: If handler call fails
        """
        try:
            logger.debug(f"Calling {handler_name} with arguments: {arguments}")
            result = await handler_func(arguments)

            if not result:
                logger.warning(f"{handler_name} returned empty result")
                return {}

            # Only the first content item is inspected.
            text_content = result[0].text
            logger.debug(f"Raw {handler_name} response: {text_content[:200]}...")

            try:
                parsed_data = json.loads(text_content)
            except json.JSONDecodeError:
                logger.warning(f"Could not parse {handler_name} response as JSON: {text_content[:200]}")
                return text_content

            logger.debug(f"Parsed {handler_name} response type: {type(parsed_data)}")

            # An "error" key is logged but the payload is still returned so
            # the caller can decide how to react.
            if isinstance(parsed_data, dict) and "error" in parsed_data:
                logger.error(f"{handler_name} returned error: {parsed_data['error']}")

            return parsed_data

        except Exception as e:
            logger.error(f"Error calling {handler_name}: {e}")
            raise

    def _download_from_arxiv_direct(self, paper: Paper) -> Optional[Path]:
        """
        Fallback method to download PDF directly from arXiv.
        Used when MCP server download fails or file is not accessible.

        The body is written to a temporary ``.part`` file and renamed into
        place, so a failed or interrupted write never leaves a truncated
        ``.pdf`` that later calls would treat as a cached paper.

        Args:
            paper: Paper object

        Returns:
            Path to downloaded PDF, or None if download fails
        """
        tmp_path = None
        try:
            pdf_path = self.storage_path / f"{paper.arxiv_id}.pdf"

            logger.info(f"Attempting direct download from arXiv for {paper.arxiv_id}")
            logger.debug(f"PDF URL: {paper.pdf_url}")

            # The URL originates from handler output; urlopen would also
            # follow file:// and similar schemes, so only allow HTTP(S).
            pdf_url = str(paper.pdf_url)
            if not pdf_url.lower().startswith(("http://", "https://")):
                logger.error(f"Refusing to download non-HTTP(S) URL for {paper.arxiv_id}: {pdf_url}")
                return None

            headers = {'User-Agent': 'Mozilla/5.0 (Research Paper Analysis System)'}
            request = urllib.request.Request(pdf_url, headers=headers)

            with urllib.request.urlopen(request, timeout=30) as response:
                pdf_content = response.read()

            if not pdf_content:
                logger.error(f"Empty response downloading {paper.arxiv_id} from arXiv")
                return None

            # IDs containing "/" map to a sub-directory; make sure it exists.
            pdf_path.parent.mkdir(parents=True, exist_ok=True)

            tmp_path = pdf_path.with_name(f"{pdf_path.name}.part")
            tmp_path.write_bytes(pdf_content)
            tmp_path.replace(pdf_path)
            tmp_path = None  # renamed into place; nothing left to clean up
            logger.info(f"Successfully downloaded {len(pdf_content)} bytes to {pdf_path}")

            return pdf_path

        except urllib.error.HTTPError as e:
            logger.error(f"HTTP error downloading from arXiv: {e.code} {e.reason}")
            return None
        except urllib.error.URLError as e:
            logger.error(f"URL error downloading from arXiv: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error in direct arXiv download: {e}", exc_info=True)
            return None
        finally:
            # Best-effort removal of a leftover partial file; must not raise,
            # because callers rely on the "None on failure" contract.
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError as cleanup_error:
                    logger.debug(f"Could not remove partial download {tmp_path}: {cleanup_error}")

    @staticmethod
    def _parse_published(value: Any) -> datetime:
        """Convert a raw ``published`` value to ``datetime``.

        Falls back to the current UTC time (timezone-aware) when the value
        is missing, unparseable, or of an unexpected type.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                # fromisoformat() on older Pythons rejects a trailing "Z".
                return datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError as e:
                logger.warning(f"Failed to parse published date '{value}': {e}, using current time")
        else:
            logger.warning(f"Published field has unexpected type: {type(value)}, using current time")
        return datetime.now(timezone.utc)

    @staticmethod
    def _coerce_str_list(raw: Any, label: str, nested_key: str, arxiv_id: str) -> List[str]:
        """Normalize a list/dict/str field (authors, categories) to ``List[str]``.

        Args:
            raw: Value taken from the MCP payload
            label: Field name used in log messages (e.g. ``"authors"``)
            nested_key: Key looked up first when *raw* is a dict
            arxiv_id: Paper identifier, for log messages only
        """
        if isinstance(raw, list):
            return [str(item) for item in raw]
        if isinstance(raw, dict):
            logger.warning(f"{label.capitalize()} field is dict for paper {arxiv_id}: {raw}")
            if nested_key in raw:
                nested = raw[nested_key]
                if isinstance(nested, list):
                    return [str(item) for item in nested]
                return [str(nested)]
            return [str(val) for val in raw.values() if val]
        if isinstance(raw, str):
            return [raw]
        logger.warning(f"Unexpected {label} format for paper {arxiv_id}: {type(raw)}")
        return []

    def _parse_mcp_paper(self, paper_data: Dict[str, Any]) -> Paper:
        """
        Convert MCP tool response to Paper object with robust type validation.

        Args:
            paper_data: Paper data from MCP tool

        Returns:
            Paper object with validated and normalized fields

        Raises:
            Exception: If critical fields are missing or invalid
        """
        try:
            arxiv_id = paper_data.get("id") or paper_data.get("arxiv_id", "")
            if not arxiv_id:
                raise ValueError("Missing required field: arxiv_id")
            arxiv_id = str(arxiv_id)

            published = self._parse_published(paper_data.get("published", ""))
            authors = self._coerce_str_list(paper_data.get("authors", []), "authors", "names", arxiv_id)
            categories = self._coerce_str_list(
                paper_data.get("categories", []), "categories", "categories", arxiv_id
            )

            title_raw = paper_data.get("title", "")
            if isinstance(title_raw, dict):
                logger.warning(f"Title field is dict for paper {arxiv_id}: {title_raw}")
                title = title_raw.get("title") or str(title_raw)
            else:
                title = str(title_raw) if title_raw else ""

            # The payload may name the abstract either "summary" or "abstract".
            abstract_raw = paper_data.get("summary") or paper_data.get("abstract", "")
            if isinstance(abstract_raw, dict):
                logger.warning(f"Abstract field is dict for paper {arxiv_id}: {abstract_raw}")
                abstract = abstract_raw.get("abstract") or abstract_raw.get("summary") or str(abstract_raw)
            else:
                abstract = str(abstract_raw) if abstract_raw else ""

            # Fall back to the canonical arXiv PDF location when no URL is given.
            default_pdf_url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
            pdf_url_raw = paper_data.get("pdf_url")
            if not pdf_url_raw:
                pdf_url = default_pdf_url
            elif isinstance(pdf_url_raw, dict):
                logger.warning(f"pdf_url field is dict for paper {arxiv_id}: {pdf_url_raw}")
                pdf_url = pdf_url_raw.get("url") or pdf_url_raw.get("pdf_url") or default_pdf_url
            else:
                pdf_url = str(pdf_url_raw)

            paper = Paper(
                arxiv_id=arxiv_id,
                title=title,
                authors=authors,
                abstract=abstract,
                pdf_url=pdf_url,
                published=published,
                categories=categories,
            )

            logger.debug(f"Successfully parsed paper {arxiv_id}: {len(authors)} authors, {len(categories)} categories")
            return paper

        except Exception as e:
            logger.error(f"Error parsing MCP paper data: {e}")
            logger.error(f"Raw paper data: {paper_data}")
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def search_papers_async(
        self,
        query: str,
        max_results: int = 5,
        category: Optional[str] = None,
        sort_by: str = "relevance"
    ) -> List[Paper]:
        """
        Search for papers on arXiv using direct MCP handler calls.

        Entries that cannot be parsed are logged and skipped rather than
        failing the whole search.

        Args:
            query: Search query
            max_results: Maximum number of papers to return
            category: Optional arXiv category filter (e.g., 'cs.AI')
            sort_by: Sort criterion (relevance, lastUpdatedDate, submittedDate)

        Returns:
            List of Paper objects

        Raises:
            Exception: If handler call fails after retries
        """
        try:
            logger.info(f"Searching arXiv via MCP for: {query}")

            search_args = {
                "query": query,
                "max_results": max_results,
                "sort_by": sort_by,
            }
            if category:
                search_args["categories"] = [category]

            result = await self._call_handler_async(MCPArxivClient.handle_search, search_args, "handle_search")

            if isinstance(result, dict):
                # "or []" also covers an explicit null, which would otherwise
                # raise on iteration and trigger pointless retries.
                paper_list = result.get("papers") or []
            elif isinstance(result, list):
                paper_list = result
            else:
                logger.warning(f"Unexpected result format: {type(result)}")
                paper_list = []

            papers = []
            for paper_data in paper_list:
                try:
                    papers.append(self._parse_mcp_paper(paper_data))
                except Exception as e:
                    logger.warning(f"Failed to parse paper: {e}")

            logger.info(f"Found {len(papers)} papers via MCP")
            return papers

        except Exception as e:
            logger.error(f"Error searching arXiv via MCP: {e}")
            raise

    def search_papers(
        self,
        query: str,
        max_results: int = 5,
        category: Optional[str] = None,
        sort_by: str = "relevance"
    ) -> List[Paper]:
        """
        Synchronous wrapper for search_papers_async.

        Args:
            query: Search query
            max_results: Maximum number of papers to return
            category: Optional arXiv category filter
            sort_by: Sort criterion

        Returns:
            List of Paper objects
        """
        return self._run_sync(
            self.search_papers_async(query, max_results, category, sort_by)
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def download_paper_async(self, paper: Paper) -> Optional[Path]:
        """
        Download paper PDF using direct MCP handler calls.

        Resolution order: an existing ``<arxiv_id>.pdf`` in storage, the MCP
        download handler, any PDF in storage whose name contains the ID, and
        finally a direct download from ``paper.pdf_url``.

        Note: every exception is handled inside this method (by falling back
        to the direct download), so the ``@retry`` decorator never sees a
        failure to retry.

        Args:
            paper: Paper object

        Returns:
            Path to downloaded PDF, or None if download fails
        """
        try:
            pdf_path = self.storage_path / f"{paper.arxiv_id}.pdf"

            if pdf_path.exists():
                logger.info(f"Paper {paper.arxiv_id} already in storage")
                return pdf_path

            logger.info(f"Downloading paper {paper.arxiv_id} via MCP handler")
            logger.debug(f"Expected download path: {pdf_path}")

            result = await self._call_handler_async(
                MCPArxivClient.handle_download,
                {"paper_id": paper.arxiv_id},
                "handle_download"
            )
            logger.debug(f"MCP download response: {result}")

            if isinstance(result, dict) and result.get("status") == "error":
                error_msg = result.get("message", "Unknown error")
                logger.error(f"MCP download failed for {paper.arxiv_id}: {error_msg}")
                return self._download_from_arxiv_direct(paper)

            if pdf_path.exists():
                logger.info(f"Successfully downloaded paper to {pdf_path}")
                return pdf_path

            # The handler may have stored the file under a variant name
            # (e.g. with a version suffix). Sorted so the pick is stable.
            matching_files = sorted(
                f for f in self.storage_path.glob("*.pdf") if paper.arxiv_id in f.name
            )
            if matching_files:
                found_file = matching_files[0]
                logger.info(f"Found downloaded file: {found_file}")
                return found_file

            logger.warning(f"MCP download completed but PDF not found for {paper.arxiv_id}")
            logger.warning("Falling back to direct arXiv download...")
            return self._download_from_arxiv_direct(paper)

        except Exception as e:
            logger.error(f"Error downloading paper {paper.arxiv_id} via MCP: {e}", exc_info=True)
            logger.warning("Attempting direct arXiv download as fallback...")
            return self._download_from_arxiv_direct(paper)

    def download_paper(self, paper: Paper) -> Optional[Path]:
        """
        Synchronous wrapper for download_paper_async.

        Args:
            paper: Paper object

        Returns:
            Path to downloaded PDF, or None if download fails
        """
        return self._run_sync(self.download_paper_async(paper))

    def download_papers(self, papers: List[Paper]) -> List[Path]:
        """
        Download multiple papers.

        Papers are downloaded one after another; papers whose download fails
        are omitted from the result.

        Args:
            papers: List of Paper objects

        Returns:
            List of Paths to downloaded PDFs
        """
        return [path for path in map(self.download_paper, papers) if path]

    async def get_cached_papers_async(self) -> List[Path]:
        """
        Get list of cached paper PDFs using direct MCP handler calls.

        Only IDs whose ``<id>.pdf`` exists in the storage directory are
        returned. If the handler call fails, every ``*.pdf`` in the storage
        directory is returned instead.

        Returns:
            List of Paths to cached PDFs
        """
        try:
            result = await self._call_handler_async(MCPArxivClient.handle_list_papers, {}, "handle_list_papers")

            if isinstance(result, dict):
                paper_ids = result.get("papers") or []
            elif isinstance(result, list):
                paper_ids = result
            else:
                logger.warning("Unexpected format from list_papers")
                paper_ids = []

            candidates = (self.storage_path / f"{pid}.pdf" for pid in paper_ids)
            return [path for path in candidates if path.exists()]
        except Exception as e:
            logger.warning(f"Error listing cached papers via MCP: {e}")
            return list(self.storage_path.glob("*.pdf"))

    def get_cached_papers(self) -> List[Path]:
        """
        Synchronous wrapper for get_cached_papers_async.

        Returns:
            List of Paths to cached PDFs
        """
        return self._run_sync(self.get_cached_papers_async())

    def __del__(self):
        """Cleanup on deletion - restore original sys.argv."""
        # NOTE(review): this resets the process-wide argv whenever ANY
        # instance is collected, including while other instances are alive.
        # Deliberately best-effort: during interpreter shutdown or after a
        # failed __init__ the names used here may be unavailable, and a
        # finalizer must not raise.
        try:
            sys.argv = self._original_argv
        except Exception:
            pass