| """ |
| Search MCP Client - Find migration guides and documentation using Tavily MCP server. |
| """ |
|
|
| import logging |
| from typing import List, Dict, Optional |
| from mcp import ClientSession |
| from mcp.client.stdio import stdio_client |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SearchMCPClient: |
| """ |
| Client for Tavily Search MCP server to find migration guides and best practices. |
| """ |
| |
| def __init__(self, mcp_manager): |
| """ |
| Initialize Search MCP client. |
| |
| Args: |
| mcp_manager: MCPManager instance |
| """ |
| self.mcp_manager = mcp_manager |
| self.server_name = "tavily" |
| |
| logger.info("SearchMCPClient initialized") |
| |
| async def find_migration_guide(self, from_tech: str, to_tech: str, max_results: int = 5) -> List[Dict]: |
| """ |
| Find migration documentation for technology upgrade. |
| |
| Args: |
| from_tech: Source technology (e.g., "Python 2.7") |
| to_tech: Target technology (e.g., "Python 3.12") |
| max_results: Maximum number of results to return |
| |
| Returns: |
| List of search results with URLs and snippets |
| """ |
| try: |
| server_params = self.mcp_manager.get_server_params(self.server_name) |
| if not server_params: |
| logger.warning(f"{self.server_name} MCP server not registered, returning empty results") |
| return [] |
| |
| query = f"{from_tech} to {to_tech} migration guide best practices" |
| |
| async with stdio_client(server_params) as (read, write): |
| async with ClientSession(read, write) as session: |
| await session.initialize() |
| |
| result = await session.call_tool( |
| "search", |
| arguments={ |
| "query": query, |
| "max_results": max_results |
| } |
| ) |
| |
| |
| results = [] |
| if result and hasattr(result, 'content'): |
| for item in result.content: |
| if hasattr(item, 'text'): |
| results.append({ |
| 'title': item.text.get('title', ''), |
| 'url': item.text.get('url', ''), |
| 'snippet': item.text.get('snippet', ''), |
| 'score': item.text.get('score', 0) |
| }) |
| |
| logger.info(f"Found {len(results)} migration guides for {from_tech} to {to_tech}") |
| return results |
| |
| except Exception as e: |
| logger.error(f"Error finding migration guide: {e}") |
| return [] |
| |
| async def find_library_documentation(self, library_name: str, version: Optional[str] = None) -> List[Dict]: |
| """ |
| Find official documentation for a library. |
| |
| Args: |
| library_name: Name of the library |
| version: Optional specific version |
| |
| Returns: |
| List of documentation links |
| """ |
| try: |
| server_params = self.mcp_manager.get_server_params(self.server_name) |
| if not server_params: |
| logger.warning(f"{self.server_name} MCP server not registered, returning empty results") |
| return [] |
| |
| query = f"{library_name} official documentation" |
| if version: |
| query += f" version {version}" |
| |
| async with stdio_client(server_params) as (read, write): |
| async with ClientSession(read, write) as session: |
| await session.initialize() |
| |
| result = await session.call_tool( |
| "search", |
| arguments={ |
| "query": query, |
| "max_results": 3 |
| } |
| ) |
| |
| results = [] |
| if result and hasattr(result, 'content'): |
| for item in result.content: |
| if hasattr(item, 'text'): |
| results.append({ |
| 'title': item.text.get('title', ''), |
| 'url': item.text.get('url', ''), |
| 'snippet': item.text.get('snippet', '') |
| }) |
| |
| logger.info(f"Found {len(results)} documentation links for {library_name}") |
| return results |
| |
| except Exception as e: |
| logger.error(f"Error finding library documentation: {e}") |
| return [] |
| |
| async def find_best_practices(self, topic: str, language: str = "python") -> List[Dict]: |
| """ |
| Find best practices for a specific topic. |
| |
| Args: |
| topic: Topic to search for (e.g., "database connection pooling") |
| language: Programming language |
| |
| Returns: |
| List of best practice resources |
| """ |
| try: |
| server_params = self.mcp_manager.get_server_params(self.server_name) |
| if not server_params: |
| logger.warning(f"{self.server_name} MCP server not registered, returning empty results") |
| return [] |
| |
| query = f"{language} {topic} best practices 2024" |
| |
| async with stdio_client(server_params) as (read, write): |
| async with ClientSession(read, write) as session: |
| await session.initialize() |
| |
| result = await session.call_tool( |
| "search", |
| arguments={ |
| "query": query, |
| "max_results": 5 |
| } |
| ) |
| |
| results = [] |
| if result and hasattr(result, 'content'): |
| for item in result.content: |
| if hasattr(item, 'text'): |
| results.append({ |
| 'title': item.text.get('title', ''), |
| 'url': item.text.get('url', ''), |
| 'snippet': item.text.get('snippet', '') |
| }) |
| |
| logger.info(f"Found {len(results)} best practice resources for {topic}") |
| return results |
| |
| except Exception as e: |
| logger.error(f"Error finding best practices: {e}") |
| return [] |
| |
| async def find_security_vulnerabilities(self, pattern: str, language: str = "python") -> List[Dict]: |
| """ |
| Find information about security vulnerabilities in a code pattern. |
| |
| Args: |
| pattern: Code pattern to check (e.g., "SQL string interpolation") |
| language: Programming language |
| |
| Returns: |
| List of security resources |
| """ |
| try: |
| server_params = self.mcp_manager.get_server_params(self.server_name) |
| if not server_params: |
| logger.warning(f"{self.server_name} MCP server not registered, returning empty results") |
| return [] |
| |
| query = f"{language} {pattern} security vulnerability CVE" |
| |
| async with stdio_client(server_params) as (read, write): |
| async with ClientSession(read, write) as session: |
| await session.initialize() |
| |
| result = await session.call_tool( |
| "search", |
| arguments={ |
| "query": query, |
| "max_results": 5 |
| } |
| ) |
| |
| results = [] |
| if result and hasattr(result, 'content'): |
| for item in result.content: |
| if hasattr(item, 'text'): |
| results.append({ |
| 'title': item.text.get('title', ''), |
| 'url': item.text.get('url', ''), |
| 'snippet': item.text.get('snippet', ''), |
| 'severity': self._extract_severity(item.text.get('snippet', '')) |
| }) |
| |
| logger.info(f"Found {len(results)} security resources for {pattern}") |
| return results |
| |
| except Exception as e: |
| logger.error(f"Error finding security vulnerabilities: {e}") |
| return [] |
| |
| def _extract_severity(self, text: str) -> str: |
| """ |
| Extract severity level from text. |
| |
| Args: |
| text: Text to analyze |
| |
| Returns: |
| Severity level (critical, high, medium, low, unknown) |
| """ |
| text_lower = text.lower() |
| if 'critical' in text_lower: |
| return 'critical' |
| elif 'high' in text_lower: |
| return 'high' |
| elif 'medium' in text_lower or 'moderate' in text_lower: |
| return 'medium' |
| elif 'low' in text_lower: |
| return 'low' |
| return 'unknown' |
|
|