Spaces:
Sleeping
Sleeping
| """ | |
| MCP Server configuration with correct FastMCP host and port setup | |
| """ | |
| import os | |
| import logging | |
| from typing import Dict | |
| from mcp.server.fastmcp import FastMCP | |
| import asyncio | |
| from presentation_manager import presentations, get_current_presentation_id, set_current_presentation_id | |
| from config import get_server_config | |
| # Import all tool modules | |
| from tools.presentation_tools import register_presentation_tools | |
| from tools.content_tools import register_content_tools | |
| from tools.template_tools import register_template_tools | |
| from tools.structural_tools import register_structural_tools | |
| from tools.professional_tools import register_professional_tools | |
| from tools.hyperlink_tools import register_hyperlink_tools | |
| from tools.chart_tools import register_chart_tools | |
| from tools.connector_tools import register_connector_tools | |
| from tools.master_tools import register_master_tools | |
| from tools.transition_tools import register_transition_tools | |
| # Import utils with fallbacks | |
| try: | |
| from utils.core_utils import * | |
| except ImportError: | |
| pass | |
| try: | |
| from utils.presentation_utils import * | |
| except ImportError: | |
| pass | |
| try: | |
| from utils.content_utils import * | |
| except ImportError: | |
| pass | |
| try: | |
| from utils.design_utils import * | |
| except ImportError: | |
| pass | |
| try: | |
| from utils.template_utils import * | |
| except ImportError: | |
| pass | |
| try: | |
| from utils.validation_utils import * | |
| except ImportError: | |
| pass | |
| # Fallback utility functions | |
| def safe_get_function(module_name, function_name, fallback_func): | |
| """Safely get a function from a module, return fallback if not found""" | |
| try: | |
| import importlib | |
| module = importlib.import_module(module_name) | |
| return getattr(module, function_name) | |
| except (ImportError, AttributeError): | |
| return fallback_func | |
| def fallback_validate_parameters(params): | |
| """Fallback validate parameters function""" | |
| for param_name, (value, constraints) in params.items(): | |
| for constraint_func, error_msg in constraints: | |
| if not constraint_func(value): | |
| return False, f"Parameter '{param_name}': {error_msg}" | |
| return True, None | |
| def fallback_is_positive(value): | |
| """Fallback is_positive function""" | |
| return value > 0 | |
| def fallback_is_non_negative(value): | |
| """Fallback is_non_negative function""" | |
| return value >= 0 | |
| def fallback_is_in_range(min_val, max_val): | |
| """Fallback is_in_range function""" | |
| return lambda x: min_val <= x <= max_val | |
| def fallback_is_valid_rgb(color_list): | |
| """Fallback is_valid_rgb function""" | |
| if not isinstance(color_list, list) or len(color_list) != 3: | |
| return False | |
| return all(isinstance(c, int) and 0 <= c <= 255 for c in color_list) | |
| def fallback_get_template_search_directories(): | |
| """Fallback get_template_search_directories function""" | |
| template_env_path = os.environ.get('PPT_TEMPLATE_PATH') | |
| if template_env_path: | |
| import platform | |
| separator = ';' if platform.system() == "Windows" else ':' | |
| env_dirs = [path.strip() for path in template_env_path.split(separator) if path.strip()] | |
| valid_env_dirs = [] | |
| for dir_path in env_dirs: | |
| expanded_path = os.path.expanduser(dir_path) | |
| if os.path.exists(expanded_path) and os.path.isdir(expanded_path): | |
| valid_env_dirs.append(expanded_path) | |
| if valid_env_dirs: | |
| return valid_env_dirs + ['.', './templates', './assets', './resources'] | |
| else: | |
| print(f"Warning: PPT_TEMPLATE_PATH directories not found: {template_env_path}") | |
| return ['.', './templates', './assets', './resources'] | |
| def fallback_add_shape_direct(slide, shape_type: str, left: float, top: float, width: float, height: float): | |
| """Fallback add_shape_direct function""" | |
| from pptx.util import Inches | |
| shape_type_map = { | |
| 'rectangle': 1, 'rounded_rectangle': 2, 'oval': 9, 'diamond': 4, | |
| 'triangle': 5, 'right_triangle': 6, 'pentagon': 56, 'hexagon': 10, | |
| 'heptagon': 11, 'octagon': 12, 'star': 12, 'arrow': 13, 'cloud': 35, | |
| 'heart': 21, 'lightning_bolt': 22, 'sun': 23, 'moon': 24, | |
| 'smiley_face': 17, 'no_symbol': 19, 'flowchart_process': 112, | |
| 'flowchart_decision': 114, 'flowchart_data': 115, 'flowchart_document': 119 | |
| } | |
| shape_type_lower = str(shape_type).lower() | |
| if shape_type_lower not in shape_type_map: | |
| available_shapes = ', '.join(sorted(shape_type_map.keys())) | |
| raise ValueError(f"Unsupported shape type: '{shape_type}'. Available shape types: {available_shapes}") | |
| shape_value = shape_type_map[shape_type_lower] | |
| try: | |
| shape = slide.shapes.add_shape( | |
| shape_value, Inches(left), Inches(top), Inches(width), Inches(height) | |
| ) | |
| return shape | |
| except Exception as e: | |
| raise ValueError(f"Failed to create '{shape_type}' shape using direct value {shape_value}: {str(e)}") | |
| # Get utility functions (existing or fallbacks) | |
| validate_parameters = safe_get_function('utils.validation_utils', 'validate_parameters', fallback_validate_parameters) | |
| is_positive = safe_get_function('utils.core_utils', 'is_positive', fallback_is_positive) | |
| is_non_negative = safe_get_function('utils.core_utils', 'is_non_negative', fallback_is_non_negative) | |
| is_in_range = safe_get_function('utils.core_utils', 'is_in_range', fallback_is_in_range) | |
| is_valid_rgb = safe_get_function('utils.validation_utils', 'is_valid_rgb', fallback_is_valid_rgb) | |
| get_template_search_directories = safe_get_function('utils.template_utils', 'get_template_search_directories', fallback_get_template_search_directories) | |
| add_shape_direct = safe_get_function('utils.design_utils', 'add_shape_direct', fallback_add_shape_direct) | |
| logger = logging.getLogger(__name__) | |
| # Get config FIRST | |
| config = get_server_config() | |
| # ✅ CORRECT: Set host and port in FastMCP constructor | |
| mcp_app = FastMCP( | |
| name="ppt-mcp-server", | |
| host=config.get('host', '0.0.0.0'), # ✅ Set host here | |
| port=config.get('mcp_port', 8000), # ✅ Set port here | |
| stateless_http=True | |
| ) | |
| # Attach friendly root and health routes to avoid 404s on '/' | |
| def _attach_basic_http_routes(): | |
| try: | |
| # Try common attributes; some FastMCP versions expose different names | |
| http_app = ( | |
| getattr(mcp_app, 'app', None) | |
| or getattr(mcp_app, 'http_app', None) | |
| or getattr(mcp_app, 'fastapi_app', None) | |
| or getattr(mcp_app, 'fastapi', None) | |
| ) | |
| if http_app is None: | |
| return | |
| # Prefer FastAPI's add_api_route API to avoid decorator binding issues | |
| from starlette.responses import JSONResponse, PlainTextResponse, Response | |
| try: | |
| from fastapi import APIRouter | |
| router = APIRouter() | |
| except Exception: | |
| router = None | |
| async def root_handler(request=None): | |
| info = get_mcp_server_info() | |
| return JSONResponse({ | |
| "status": "ok", | |
| "message": "PowerPoint MCP Server", | |
| "endpoint": "/mcp", | |
| "transport": info.get("transport", "streamable-http"), | |
| "version": info.get("version", "2.1.0") | |
| }) | |
| async def health_handler(request=None): | |
| return PlainTextResponse("ok") | |
| async def mcp_get_handler(request=None): | |
| info = get_mcp_server_info() | |
| return JSONResponse({ | |
| "status": "ok", | |
| "message": "MCP endpoint (GET)", | |
| "note": "Use POST with streaming for actual MCP interactions", | |
| "endpoint": "/mcp", | |
| "transport": info.get("transport", "streamable-http"), | |
| "version": info.get("version", "2.1.0") | |
| }) | |
| async def favicon_handler(request=None): | |
| # Empty favicon to avoid 404 noise | |
| return Response(content=b"", media_type="image/x-icon", status_code=204) | |
| # Add routes via router if available, then include | |
| if router is not None: | |
| router.add_api_route('/', root_handler, methods=['GET']) | |
| router.add_api_route('', root_handler, methods=['GET']) | |
| router.add_api_route('/health', health_handler, methods=['GET']) | |
| router.add_api_route('/healthz', health_handler, methods=['GET']) | |
| router.add_api_route('/mcp', mcp_get_handler, methods=['GET']) | |
| router.add_api_route('/mcp/', mcp_get_handler, methods=['GET']) | |
| router.add_api_route('/favicon.ico', favicon_handler, methods=['GET']) | |
| if hasattr(http_app, 'include_router'): | |
| http_app.include_router(router) | |
| else: | |
| # Directly add routes on the app | |
| if hasattr(http_app, 'add_api_route'): | |
| http_app.add_api_route('/', root_handler, methods=['GET']) | |
| http_app.add_api_route('', root_handler, methods=['GET']) | |
| http_app.add_api_route('/health', health_handler, methods=['GET']) | |
| http_app.add_api_route('/healthz', health_handler, methods=['GET']) | |
| http_app.add_api_route('/mcp', mcp_get_handler, methods=['GET']) | |
| http_app.add_api_route('/mcp/', mcp_get_handler, methods=['GET']) | |
| http_app.add_api_route('/favicon.ico', favicon_handler, methods=['GET']) | |
| elif hasattr(http_app, 'add_route'): | |
| http_app.add_route('/', root_handler, methods=['GET']) | |
| http_app.add_route('', root_handler, methods=['GET']) | |
| http_app.add_route('/health', health_handler, methods=['GET']) | |
| http_app.add_route('/healthz', health_handler, methods=['GET']) | |
| http_app.add_route('/mcp', mcp_get_handler, methods=['GET']) | |
| http_app.add_route('/mcp/', mcp_get_handler, methods=['GET']) | |
| http_app.add_route('/favicon.ico', favicon_handler, methods=['GET']) | |
| except Exception: | |
| # Non-fatal; server will still run | |
| pass | |
| def register_all_tools_on(app_instance: FastMCP): | |
| """Register all MCP tools with the server""" | |
| try: | |
| logger.info("Registering all PowerPoint MCP tools...") | |
| register_presentation_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| get_template_search_directories | |
| ) | |
| register_content_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb | |
| ) | |
| register_template_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id | |
| ) | |
| register_structural_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb, | |
| add_shape_direct | |
| ) | |
| register_professional_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id | |
| ) | |
| register_hyperlink_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb | |
| ) | |
| register_chart_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb | |
| ) | |
| register_connector_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb | |
| ) | |
| register_master_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb | |
| ) | |
| register_transition_tools( | |
| app_instance, | |
| presentations, | |
| get_current_presentation_id, | |
| validate_parameters, | |
| is_positive, | |
| is_non_negative, | |
| is_in_range, | |
| is_valid_rgb | |
| ) | |
| # Register utility tools | |
| register_utility_tools_on(app_instance) | |
| logger.info("All MCP tools registered successfully (32+ tools from 11 modules)") | |
| except Exception as e: | |
| logger.error(f"Error registering tools: {str(e)}") | |
| # Don't raise - allow partial tool registration | |
| pass | |
| def register_all_tools(): | |
| register_all_tools_on(mcp_app) | |
| def register_utility_tools_on(app_instance: FastMCP): | |
| """Register additional utility tools""" | |
| def list_presentations() -> Dict: | |
| current_id = get_current_presentation_id() | |
| return { | |
| "presentations": [ | |
| { | |
| "id": pres_id, | |
| "slide_count": len(pres.slides), | |
| "is_current": pres_id == current_id | |
| } | |
| for pres_id, pres in presentations.items() | |
| ], | |
| "current_presentation_id": current_id, | |
| "total_presentations": len(presentations) | |
| } | |
| def switch_presentation(presentation_id: str) -> Dict: | |
| if presentation_id not in presentations: | |
| return { | |
| "error": f"Presentation '{presentation_id}' not found. Available presentations: {list(presentations.keys())}" | |
| } | |
| old_id = get_current_presentation_id() | |
| set_current_presentation_id(presentation_id) | |
| return { | |
| "message": f"Switched from presentation '{old_id}' to '{presentation_id}'", | |
| "previous_presentation_id": old_id, | |
| "current_presentation_id": presentation_id | |
| } | |
| def get_server_info() -> Dict: | |
| return get_mcp_server_info() | |
| def start_mcp_server(): | |
| """Start the MCP server with streamable HTTP transport""" | |
| try: | |
| register_all_tools() | |
| _attach_basic_http_routes() | |
| config = get_server_config() | |
| logger.info("Starting PowerPoint MCP Server with streamable HTTP transport...") | |
| logger.info(f"Server will be available at: {config.get('base_url', 'localhost')}/mcp") | |
| # Run a composite FastAPI app that mounts MCP at /mcp and serves root routes | |
| try: | |
| from fastapi import FastAPI | |
| from starlette.responses import JSONResponse, PlainTextResponse, Response | |
| import uvicorn | |
| # Build root app with slash redirects disabled to avoid 307s on /mcp | |
| root_app = FastAPI(redirect_slashes=False) | |
| async def _root(): | |
| info = get_mcp_server_info() | |
| return JSONResponse({ | |
| "status": "ok", | |
| "message": "PowerPoint MCP Server", | |
| "endpoint": "/mcp", | |
| "ui": "/ui", | |
| "transport": info.get("transport", "streamable-http"), | |
| "version": info.get("version", "2.1.0") | |
| }) | |
| async def _health(): | |
| return PlainTextResponse("ok") | |
| async def _healthz(): | |
| return PlainTextResponse("ok") | |
| async def _favicon(): | |
| return Response(content=b"", media_type="image/x-icon", status_code=204) | |
| # Provide harmless GET handlers on /mcp and /mcp/ | |
| async def _mcp_get(): | |
| info = get_mcp_server_info() | |
| return JSONResponse({ | |
| "status": "ok", | |
| "message": "MCP endpoint (GET)", | |
| "note": "Use POST with streaming for actual MCP interactions", | |
| "endpoint": "/mcp", | |
| "transport": info.get("transport", "streamable-http"), | |
| "version": info.get("version", "2.1.0") | |
| }) | |
| async def _mcp_get_slash(): | |
| return await _mcp_get() | |
| # Minimal manifest to avoid 404 noise | |
| async def _manifest(): | |
| info = get_mcp_server_info() | |
| return JSONResponse({ | |
| "name": info.get("name", "PowerPoint MCP Server"), | |
| "short_name": "PPT-MCP", | |
| "start_url": "/", | |
| "display": "standalone", | |
| "background_color": "#ffffff", | |
| "theme_color": "#0b5fff" | |
| }) | |
| # Mount MCP app under /mcp preserving all POST/streaming behavior | |
| # Create a fresh FastMCP instance for mounting to ensure attributes exist | |
| mounted_mcp = FastMCP( | |
| name="ppt-mcp-server", | |
| host=config.get('host', '0.0.0.0'), | |
| port=config.get('mcp_port', 8000), | |
| stateless_http=True | |
| ) | |
| register_all_tools_on(mounted_mcp) | |
| # DEBUG: log available attributes so we can locate the underlying FastAPI app for this MCP version | |
| try: | |
| logger.warning("FastMCP attributes (mounted_mcp): %s", dir(mounted_mcp)) | |
| except Exception: | |
| pass | |
| # Prefer streamable HTTP app for mounting; call if attribute is a factory method | |
| candidates = [ | |
| 'streamable_http_app', | |
| 'sse_app', | |
| 'app', | |
| 'http_app', | |
| 'fastapi_app', | |
| 'fastapi', | |
| 'asgi_app', | |
| 'application' | |
| ] | |
| underlying = None | |
| for attr in candidates: | |
| obj = getattr(mounted_mcp, attr, None) | |
| if obj is None: | |
| continue | |
| try: | |
| # If it's callable with zero args, try to get the ASGI app instance | |
| if callable(obj): | |
| # Ensure zero-arg call; some methods may require no params to return the app | |
| try: | |
| test = obj() | |
| if test is not None: | |
| underlying = test | |
| break | |
| except TypeError: | |
| # Not a zero-arg factory; skip | |
| pass | |
| else: | |
| # Non-callable; assume it's already an ASGI app | |
| underlying = obj | |
| break | |
| except Exception: | |
| continue | |
| if underlying is not None: | |
| # Mount underlying MCP ASGI app at an internal path to avoid route conflicts | |
| try: | |
| root_app.mount("/_mcp_internal", underlying) | |
| except Exception: | |
| pass | |
| # Add POST proxy that rewrites path to root for the underlying ASGI app | |
| from starlette.requests import Request | |
| from starlette.responses import Response | |
| async def _proxy_post_to_underlying(req: Request): | |
| body = await req.body() | |
| # Build a minimal ASGI receive that feeds the body once | |
| received = {"sent": False} | |
| async def _receive(): | |
| if not received["sent"]: | |
| received["sent"] = True | |
| return { | |
| "type": "http.request", | |
| "body": body, | |
| "more_body": False, | |
| } | |
| return {"type": "http.request", "body": b"", "more_body": False} | |
| # Capture response from underlying | |
| resp_status = {"code": 500} | |
| resp_headers = [] | |
| resp_body_parts = [] | |
| async def _send(event): | |
| et = event.get("type") | |
| if et == "http.response.start": | |
| resp_status["code"] = event["status"] | |
| resp_headers.extend(event.get("headers", [])) | |
| elif et == "http.response.body": | |
| data = event.get("body", b"") or b"" | |
| if data: | |
| resp_body_parts.append(data) | |
| else: | |
| pass | |
| # Rewrite scope path to '/mcp' for the underlying ASGI app | |
| scope = dict(req.scope) | |
| scope["path"] = "/mcp" | |
| scope["raw_path"] = b"/mcp" | |
| # Preserve query string | |
| if "query_string" not in scope and req.url.query: | |
| scope["query_string"] = req.url.query.encode() | |
| await underlying(scope, _receive, _send) | |
| return Response(content=b"".join(resp_body_parts), status_code=resp_status["code"], headers={k.decode(): v.decode() for k, v in resp_headers}) | |
| root_app.add_api_route("/mcp", _proxy_post_to_underlying, methods=["POST"]) | |
| root_app.add_api_route("/mcp/", _proxy_post_to_underlying, methods=["POST"]) | |
| # Catch-all for any subpath under /mcp (some clients may send /mcp/ with extra path) | |
| from typing import Optional | |
| async def _proxy_post_catch_all(rest_path: Optional[str] = None, request=None): | |
| # Reuse request object | |
| return await _proxy_post_to_underlying(request) | |
| root_app.add_api_route("/mcp/{rest_path:path}", _proxy_post_catch_all, methods=["POST"]) | |
| else: | |
| raise RuntimeError("FastMCP underlying FastAPI app not found for mounting") | |
| # Optionally mount Gradio UI at /ui (so Spaces has a visible page) | |
| try: | |
| from gradio_interface import create_gradio_interface | |
| demo = create_gradio_interface() | |
| try: | |
| import gradio as gr | |
| root_app = gr.mount_gradio_app(root_app, demo, path="/ui") | |
| except Exception: | |
| # Fallback: mount underlying FastAPI app of Gradio Blocks | |
| if hasattr(demo, 'app'): | |
| root_app.mount("/ui", demo.app) | |
| except Exception: | |
| pass | |
| # Add middleware to intercept POST /mcp* and proxy to underlying ASGI app | |
| try: | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.requests import Request as StarletteRequest | |
| from starlette.responses import Response as StarletteResponse | |
| class McpProxyMiddleware(BaseHTTPMiddleware): | |
| async def dispatch(self, request: StarletteRequest, call_next): | |
| try: | |
| path = request.url.path | |
| if request.method.upper() == 'POST' and (path == '/mcp' or path.startswith('/mcp/')): | |
| body = await request.body() | |
| received = {"sent": False} | |
| async def _receive(): | |
| if not received["sent"]: | |
| received["sent"] = True | |
| return {"type": "http.request", "body": body, "more_body": False} | |
| return {"type": "http.request", "body": b"", "more_body": False} | |
| resp_status = {"code": 500} | |
| resp_headers = [] | |
| resp_body_parts = [] | |
| async def _send(event): | |
| et = event.get("type") | |
| if et == "http.response.start": | |
| resp_status["code"] = event["status"] | |
| resp_headers.extend(event.get("headers", [])) | |
| elif et == "http.response.body": | |
| data = event.get("body", b"") or b"" | |
| if data: | |
| resp_body_parts.append(data) | |
| scope = dict(request.scope) | |
| scope["path"] = "/mcp" | |
| scope["raw_path"] = b"/mcp" | |
| if "query_string" not in scope and request.url.query: | |
| scope["query_string"] = request.url.query.encode() | |
| await underlying(scope, _receive, _send) | |
| headers_dict = {} | |
| for k, v in resp_headers: | |
| try: | |
| headers_dict[k.decode()] = v.decode() | |
| except Exception: | |
| pass | |
| return StarletteResponse(content=b"".join(resp_body_parts), status_code=resp_status["code"], headers=headers_dict) | |
| except Exception: | |
| # Fall through to next app on any proxy error | |
| pass | |
| return await call_next(request) | |
| root_app.add_middleware(McpProxyMiddleware) | |
| except Exception: | |
| pass | |
| # Start Uvicorn | |
| uvicorn.run(root_app, host=config.get('host', '0.0.0.0'), port=config.get('mcp_port', 8000), log_level="info") | |
| except Exception as run_err: | |
| # Fallback to FastMCP's built-in runner if mounting fails | |
| logger.warning(f"Composite app startup failed, falling back to FastMCP runner: {run_err}") | |
| mcp_app.run(transport='streamable-http') | |
| except Exception as e: | |
| logger.error(f"Failed to start MCP server: {str(e)}") | |
| raise | |
| def get_mcp_server_info() -> Dict: | |
| return { | |
| "name": "PowerPoint MCP Server - Complete Edition", | |
| "version": "2.1.0", | |
| "total_tools": 32, | |
| "loaded_presentations": len(presentations), | |
| "current_presentation": get_current_presentation_id(), | |
| "transport": "streamable-http", | |
| "endpoint": "/mcp", | |
| "deployment": "Hugging Face Spaces", | |
| "architecture": "Modular with complete utils/ and tools/ integration", | |
| "modules": { | |
| "utils": ["core_utils", "presentation_utils", "content_utils", "design_utils", "template_utils", "validation_utils"], | |
| "tools": ["presentation_tools", "content_tools", "template_tools", "structural_tools", "professional_tools", | |
| "hyperlink_tools", "chart_tools", "connector_tools", "master_tools", "transition_tools"] | |
| }, | |
| "features": [ | |
| "Presentation Management (7 tools)", | |
| "Content Management (8 tools)", | |
| "Template Operations (7 tools)", | |
| "Structural Elements (4 tools)", | |
| "Professional Design (3 tools)", | |
| "Specialized Features (5 tools)" | |
| ] | |
| } | |
| __all__ = ['mcp_app', 'start_mcp_server', 'get_mcp_server_info'] | |