Spaces:
Sleeping
Sleeping
Furqan Ahmad Rao
feat: implement in-memory ZIP creation and enhance file management functionality
cfda7db | import os | |
| from typing import Dict, Any, List, Optional | |
| import time | |
| import requests | |
| def _base_url(env_key: str, default: str) -> str: | |
| return os.getenv(env_key, default).rstrip("/") | |
| def _retry_request(func, max_retries: int = 3, timeout: int = 10): | |
| """ | |
| Retry wrapper with exponential backoff for HTTP requests. | |
| Args: | |
| func: Function that makes the HTTP request | |
| max_retries: Maximum number of retry attempts | |
| timeout: Request timeout in seconds | |
| Returns: | |
| Response dict or error dict | |
| """ | |
| for attempt in range(max_retries): | |
| try: | |
| return func(timeout=timeout) | |
| except (requests.exceptions.Timeout, | |
| requests.exceptions.ConnectionError, | |
| requests.exceptions.RequestException) as e: | |
| if attempt < max_retries - 1: | |
| # Exponential backoff: 1s, 2s, 4s | |
| backoff = 2 ** attempt | |
| print(f"[MCP][RETRY] Attempt {attempt + 1}/{max_retries} failed: {e}. Retrying in {backoff}s...") | |
| time.sleep(backoff) | |
| else: | |
| print(f"[MCP][ERROR] All {max_retries} attempts failed: {e}") | |
| raise | |
| return {"success": False, "message": "Max retries exceeded"} | |
| class FileManagerMCPClient: | |
| """ | |
| HTTP client for file-manager-mcp. | |
| Endpoints (default base http://localhost:8081): | |
| - POST /create_file { filename, content } | |
| - POST /validate_markdown { filename } | |
| - POST /zip_files { source_dir?, zip_path? } | |
| """ | |
| def __init__(self, base_url: Optional[str] = None): | |
| self.base_url = base_url or _base_url("FILE_MANAGER_MCP_URL", "http://localhost:8081") | |
| def create_file(self, filename: str, content: str) -> Dict[str, Any]: | |
| url = f"{self.base_url}/create_file" | |
| payload = {"filename": filename, "content": content} | |
| print(f"[MCP] β file-manager-mcp.create_file {payload['filename']}") | |
| def make_request(timeout): | |
| resp = requests.post(url, json=payload, timeout=timeout) | |
| return resp.json() | |
| try: | |
| return _retry_request(make_request, max_retries=3, timeout=10) | |
| except Exception as e: | |
| print(f"[MCP][ERROR] file-manager-mcp.create_file failed: {e}") | |
| return {"success": False, "path": filename, "message": str(e)} | |
| def validate_markdown(self, filename: str) -> Dict[str, Any]: | |
| url = f"{self.base_url}/validate_markdown" | |
| payload = {"filename": filename} | |
| print(f"[MCP] β file-manager-mcp.validate_markdown {filename}") | |
| def make_request(timeout): | |
| resp = requests.post(url, json=payload, timeout=timeout) | |
| return resp.json() | |
| try: | |
| return _retry_request(make_request, max_retries=3, timeout=10) | |
| except Exception as e: | |
| print(f"[MCP][ERROR] file-manager-mcp.validate_markdown failed: {e}") | |
| return {"success": False, "path": filename, "errors": [str(e)], "message": "Request failed"} | |
| def zip_files(self, source_dir: str = "outputs", zip_path: str = "outputs/mvp_package.zip") -> Dict[str, Any]: | |
| url = f"{self.base_url}/zip_files" | |
| payload = {"source_dir": source_dir, "zip_path": zip_path} | |
| print(f"[MCP] β file-manager-mcp.zip_files {source_dir} -> {zip_path}") | |
| def make_request(timeout): | |
| resp = requests.post(url, json=payload, timeout=timeout) | |
| return resp.json() | |
| try: | |
| return _retry_request(make_request, max_retries=3, timeout=20) | |
| except Exception as e: | |
| print(f"[MCP][ERROR] file-manager-mcp.zip_files failed: {e}") | |
| return {"success": False, "path": None, "message": str(e)} | |
| def create_zip_from_memory(self, files: Dict[str, str], output_filename: str) -> Dict[str, Any]: | |
| url = f"{self.base_url}/create_zip_from_memory" | |
| payload = {"files": files, "output_filename": output_filename} | |
| print(f"[MCP] β file-manager-mcp.create_zip_from_memory {output_filename}") | |
| def make_request(timeout): | |
| resp = requests.post(url, json=payload, timeout=timeout) | |
| return resp.json() | |
| try: | |
| return _retry_request(make_request, max_retries=3, timeout=30) | |
| except Exception as e: | |
| print(f"[MCP][ERROR] file-manager-mcp.create_zip_from_memory failed: {e}") | |
| return {"success": False, "path": None, "message": str(e)} | |
| class GoogleSearchMCPClient: | |
| """ | |
| HTTP client for google-search-mcp. | |
| Endpoint (default base http://localhost:8082): | |
| - POST /search { query, limit } | |
| """ | |
| def __init__(self, base_url: Optional[str] = None): | |
| self.base_url = base_url or _base_url("GOOGLE_SEARCH_MCP_URL", "http://localhost:8082") | |
| def search(self, query: str, limit: int = 10) -> Dict[str, Any]: | |
| url = f"{self.base_url}/search" | |
| payload = {"query": query, "limit": limit} | |
| print(f"[MCP] β google-search-mcp.search '{query}' (limit={limit})") | |
| def make_request(timeout): | |
| resp = requests.post(url, json=payload, timeout=timeout) | |
| return resp.json() | |
| try: | |
| return _retry_request(make_request, max_retries=3, timeout=15) | |
| except Exception as e: | |
| print(f"[MCP][ERROR] google-search-mcp.search failed: {e}") | |
| return {"success": False, "results": [], "message": str(e)} | |
| class MarkdownifyMCPClient: | |
| """ | |
| HTTP client for markdownify-mcp. | |
| Endpoint (default base http://localhost:8083): | |
| - POST /format { text } | |
| """ | |
| def __init__(self, base_url: Optional[str] = None): | |
| self.base_url = base_url or _base_url("MARKDOWNIFY_MCP_URL", "http://localhost:8083") | |
| def format_markdown(self, text: str) -> str: | |
| url = f"{self.base_url}/format" | |
| print(f"[MCP] β markdownify-mcp.format_markdown (len={len(text)})") | |
| def make_request(timeout): | |
| resp = requests.post(url, json={"text": text}, timeout=timeout) | |
| data = resp.json() | |
| if data.get("success") and "markdown" in data: | |
| return data["markdown"] | |
| # Fallback to original text if MCP reports failure or no markdown field. | |
| return text | |
| try: | |
| return _retry_request(make_request, max_retries=2, timeout=10) | |
| except Exception as e: | |
| print(f"[MCP][ERROR] markdownify-mcp.format_markdown failed: {e}") | |
| # Fallback: basic local cleanup | |
| return self._local_markdown_cleanup(text) | |
| def _local_markdown_cleanup(self, text: str) -> str: | |
| """ | |
| Local fallback for markdown formatting when MCP fails. | |
| Performs basic cleanup and normalization. | |
| """ | |
| import re | |
| # Remove dangerous HTML tags | |
| text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL | re.IGNORECASE) | |
| text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE) | |
| # Normalize line breaks | |
| text = text.replace('\r\n', '\n') | |
| # Ensure headers have space after # | |
| text = re.sub(r'^(#{1,6})([^\s])', r'\1 \2', text, flags=re.MULTILINE) | |
| # Remove excessive blank lines (more than 2 consecutive) | |
| text = re.sub(r'\n{3,}', '\n\n', text) | |
| return text.strip() | |