Spaces:
Paused
Paused
| #!/usr/bin/env python | |
| """ | |
| PowerPoint MCP Server with Gradio interface and Streamable HTTP transport | |
| Integrated with existing tool structure from ppt_mcp_server.py | |
| """ | |
| import os | |
| import json | |
| import tempfile | |
| import time | |
| import uuid | |
| import logging | |
| import gradio as gr | |
| from threading import Thread | |
| from typing import Dict, Any, Optional | |
| from mcp.server.fastmcp import FastMCP | |
| import asyncio | |
| # Import all your existing tools registration functions | |
| from tools import ( | |
| register_presentation_tools, | |
| register_content_tools, | |
| register_structural_tools, | |
| register_professional_tools, | |
| register_template_tools, | |
| register_hyperlink_tools, | |
| register_chart_tools, | |
| register_connector_tools, | |
| register_master_tools, | |
| register_transition_tools | |
| ) | |
# Configure logging once at import time and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastMCP server instance on which the tool modules below are registered
mcp_app = FastMCP(name="ppt-mcp-server")

# In-memory registry of loaded presentations, keyed by presentation ID
presentations: Dict[str, Any] = {}
# ID of the presentation currently selected; None until one is stored
current_presentation_id: Optional[str] = None
| # Template configuration (from original) | |
def get_template_search_directories():
    """Return the directories to search for presentation templates.

    When the ``PPT_TEMPLATE_PATH`` environment variable is set, it is split on
    the platform's path-list separator (``os.pathsep``: ``;`` on Windows,
    ``:`` elsewhere). Each entry is stripped, has ``~`` expanded, and is kept
    only if it is an existing directory. The surviving entries are placed
    ahead of the built-in default directories.

    Returns:
        list[str]: Valid ``PPT_TEMPLATE_PATH`` directories (if any) followed
        by ``'.'``, ``'./templates'``, ``'./assets'`` and ``'./resources'``.
    """
    default_dirs = ['.', './templates', './assets', './resources']

    template_env_path = os.environ.get('PPT_TEMPLATE_PATH')
    if not template_env_path:
        return default_dirs

    expanded_entries = (
        os.path.expanduser(entry.strip())
        for entry in template_env_path.split(os.pathsep)
        if entry.strip()
    )
    # isdir() is False for missing paths, so no separate exists() check is needed
    valid_env_dirs = [path for path in expanded_entries if os.path.isdir(path)]

    if not valid_env_dirs:
        # Report the misconfiguration through logging rather than stdout
        logging.getLogger(__name__).warning(
            "PPT_TEMPLATE_PATH directories not found: %s", template_env_path
        )
    return valid_env_dirs + default_dirs
| # Helper Functions (from original) | |
def get_current_presentation():
    """Return the presentation registered under the current presentation ID.

    Raises:
        ValueError: If no current ID is set or the ID is not in the registry.
    """
    active_id = current_presentation_id
    if active_id is not None and active_id in presentations:
        return presentations[active_id]
    raise ValueError("No presentation is currently loaded. Please create or open a presentation first.")
def get_current_presentation_id():
    """Return the module-level current presentation ID (``None`` if unset)."""
    active_id = current_presentation_id
    return active_id
def set_current_presentation_id(pres_id):
    """Record ``pres_id`` as the module-level current presentation ID.

    No check is made that ``pres_id`` exists in the presentation registry.
    """
    # Rebind the module global; without this declaration the name would be local
    global current_presentation_id
    current_presentation_id = pres_id
def validate_parameters(params):
    """Check each parameter value against its list of constraints.

    Args:
        params: Mapping of parameter name to ``(value, constraints)``, where
            ``constraints`` is an iterable of ``(predicate, error_message)``
            pairs.

    Returns:
        tuple: ``(True, None)`` when every predicate accepts its value,
        otherwise ``(False, message)`` describing the first failing check.
    """
    for name in params:
        candidate, checks = params[name]
        for check, message in checks:
            if check(candidate):
                continue
            # Stop at the first violated constraint
            return False, f"Parameter '{name}': {message}"
    return True, None
def is_positive(value):
    """Return whether ``value`` is strictly greater than zero.

    Values that cannot be ordered against ``0`` (for example ``None`` or a
    string) are reported as not positive instead of raising ``TypeError``, so
    a validation pass yields an error message rather than an exception.
    """
    try:
        return value > 0
    except TypeError:
        return False
def is_non_negative(value):
    """Return whether ``value`` is greater than or equal to zero.

    Values that cannot be ordered against ``0`` (for example ``None`` or a
    string) are reported as failing the check instead of raising
    ``TypeError``, so a validation pass yields an error message rather than
    an exception.
    """
    try:
        return value >= 0
    except TypeError:
        return False
def is_in_range(min_val, max_val):
    """Build a predicate that tests ``min_val <= x <= max_val`` (inclusive).

    The returned predicate reports ``False`` for values that cannot be
    ordered against the bounds (for example ``None`` or a string) instead of
    raising ``TypeError``.
    """
    def _within_bounds(x):
        try:
            return min_val <= x <= max_val
        except TypeError:
            return False

    return _within_bounds
def is_in_list(valid_list):
    """Build a predicate that tests membership in ``valid_list``."""
    def _is_member(x):
        return x in valid_list

    return _is_member
def is_valid_rgb(color_list):
    """Return whether ``color_list`` is a valid RGB triple.

    A valid triple is a list or tuple of exactly three integers, each in the
    range 0-255. ``bool`` values are rejected even though ``bool`` is a
    subclass of ``int``, so ``[True, False, True]`` is not a colour.
    """
    if not isinstance(color_list, (list, tuple)) or len(color_list) != 3:
        return False
    return all(
        isinstance(channel, int)
        and not isinstance(channel, bool)
        and 0 <= channel <= 255
        for channel in color_list
    )
def add_shape_direct(slide, shape_type: str, left: float, top: float, width: float, height: float) -> Any:
    """Add an auto shape to a slide, selected by a friendly shape name.

    Args:
        slide: Slide object exposing ``shapes.add_shape`` (python-pptx).
        shape_type: Case-insensitive friendly name, e.g. ``'rectangle'``.
        left: Left edge position, in inches.
        top: Top edge position, in inches.
        width: Shape width, in inches.
        height: Shape height, in inches.

    Returns:
        The shape object returned by ``slide.shapes.add_shape``.

    Raises:
        ValueError: If ``shape_type`` is not a supported name, or if
            python-pptx fails to create the shape.
    """
    # Friendly name -> MSO_SHAPE member name. Resolving by member name keeps
    # every entry distinct and avoids hand-maintained numeric ids.
    shape_member_names = {
        'rectangle': 'RECTANGLE',
        'rounded_rectangle': 'ROUNDED_RECTANGLE',
        'oval': 'OVAL',
        'diamond': 'DIAMOND',
        'triangle': 'ISOSCELES_TRIANGLE',
        'right_triangle': 'RIGHT_TRIANGLE',
        'pentagon': 'REGULAR_PENTAGON',
        'hexagon': 'HEXAGON',
        'heptagon': 'HEPTAGON',
        'octagon': 'OCTAGON',
        'star': 'STAR_5_POINT',
        'arrow': 'RIGHT_ARROW',
        'cloud': 'CLOUD',
        'heart': 'HEART',
        'lightning_bolt': 'LIGHTNING_BOLT',
        'sun': 'SUN',
        'moon': 'MOON',
        'smiley_face': 'SMILEY_FACE',
        'no_symbol': 'NO_SYMBOL',
        'flowchart_process': 'FLOWCHART_PROCESS',
        'flowchart_decision': 'FLOWCHART_DECISION',
        'flowchart_data': 'FLOWCHART_DATA',
        'flowchart_document': 'FLOWCHART_DOCUMENT',
    }

    # Validate the name first so a bad request fails before any pptx work
    shape_type_lower = str(shape_type).lower()
    if shape_type_lower not in shape_member_names:
        available_shapes = ', '.join(sorted(shape_member_names))
        raise ValueError(f"Unsupported shape type: '{shape_type}'. Available shape types: {available_shapes}")

    from pptx.enum.shapes import MSO_SHAPE
    from pptx.util import Inches

    shape_value = getattr(MSO_SHAPE, shape_member_names[shape_type_lower])
    try:
        return slide.shapes.add_shape(
            shape_value, Inches(left), Inches(top), Inches(width), Inches(height)
        )
    except Exception as e:
        # Surface library failures as ValueError, keeping the original cause
        raise ValueError(
            f"Failed to create '{shape_type}' shape using direct value {int(shape_value)}: {str(e)}"
        ) from e
| # Custom presentation management wrapper (from original) | |
class PresentationManager:
    """Small facade over a presentation registry.

    Storing a presentation through this object also marks it as the current
    presentation via ``set_current_presentation_id``.
    """

    def __init__(self, presentations_dict):
        # Hold a reference, not a copy, so writes land in the caller's dict
        self.presentations = presentations_dict

    def store_presentation(self, pres, pres_id):
        """Register ``pres`` under ``pres_id``, make it current, return the ID."""
        registry = self.presentations
        registry[pres_id] = pres
        set_current_presentation_id(pres_id)
        return pres_id
# Wrapper around the shared registry that also updates the current ID on store
presentation_manager = PresentationManager(presentations)

# Arguments every tool module receives, in positional order
_core_args = (mcp_app, presentations, get_current_presentation_id)
# Validation helpers handed to the modules that validate their inputs
_validators = (
    validate_parameters,
    is_positive,
    is_non_negative,
    is_in_range,
    is_valid_rgb,
)

# Register every tool module on the server (call order is kept as-is)
register_presentation_tools(*_core_args, get_template_search_directories)
register_content_tools(*_core_args, *_validators)
register_structural_tools(*_core_args, *_validators, add_shape_direct)
register_professional_tools(*_core_args)
register_template_tools(*_core_args)
for _register in (
    register_hyperlink_tools,
    register_chart_tools,
    register_connector_tools,
    register_master_tools,
    register_transition_tools,
):
    _register(*_core_args, *_validators)
| # Additional Utility Tools (from original) | |
def list_presentations() -> Dict:
    """Summarise every presentation held in the in-memory registry.

    Returns:
        Dict: ``presentations`` (one entry per deck with its ID, slide count
        and whether it is the current one), ``current_presentation_id`` and
        ``total_presentations``.
    """
    active_id = current_presentation_id
    summaries = []
    for pres_id, pres in presentations.items():
        summaries.append({
            "id": pres_id,
            "slide_count": len(pres.slides),
            "is_current": pres_id == active_id,
        })
    return {
        "presentations": summaries,
        "current_presentation_id": active_id,
        "total_presentations": len(presentations),
    }
def switch_presentation(presentation_id: str) -> Dict:
    """Make an already-loaded presentation the current one.

    Args:
        presentation_id: ID of a presentation present in the registry.

    Returns:
        Dict: A confirmation with the previous and new current IDs, or a
        dict with a single ``error`` key when the ID is unknown.
    """
    global current_presentation_id

    if presentation_id in presentations:
        # Remember the outgoing ID while installing the new one
        old_id, current_presentation_id = current_presentation_id, presentation_id
        return {
            "message": f"Switched from presentation '{old_id}' to '{presentation_id}'",
            "previous_presentation_id": old_id,
            "current_presentation_id": current_presentation_id
        }

    return {
        "error": f"Presentation '{presentation_id}' not found. Available presentations: {list(presentations.keys())}"
    }
def get_server_info() -> Dict:
    """Describe the server: static metadata plus live registry counters.

    Returns:
        Dict: Name, version, advertised tool count, number of loaded
        presentations, the current presentation ID, transport name and a
        list of feature-group labels.
    """
    feature_groups = [
        "Presentation Management (7 tools)",
        "Content Management (6 tools)",
        "Template Operations (7 tools)",
        "Structural Elements (4 tools)",
        "Professional Design (3 tools)",
        "Specialized Features (5 tools)"
    ]
    info = {
        "name": "PowerPoint MCP Server - Enhanced Edition",
        "version": "2.1.0",
        "total_tools": 32,
    }
    # Live values are read from module state at call time
    info["loaded_presentations"] = len(presentations)
    info["current_presentation"] = current_presentation_id
    info["transport"] = "streamable-http"
    info["features"] = feature_groups
    return info
| # Direct tool usage functions for Gradio (using internal MCP mechanisms) | |
def create_test_presentation():
    """Create an empty presentation, register it, and make it current.

    The deck is built directly with python-pptx (not through the registered
    MCP tools) and stored in the module-level ``presentations`` registry
    under a fresh UUID4 string.

    Returns:
        tuple: ``(presentation_id, message)`` on success, or
        ``(None, message)`` when creation fails.
    """
    from pptx import Presentation

    try:
        prs = Presentation()
        presentation_id = str(uuid.uuid4())
        presentations[presentation_id] = prs
        set_current_presentation_id(presentation_id)
    except Exception as e:
        # UI boundary: report the failure as a status message instead of raising
        return None, f"❌ Error creating presentation: {str(e)}"
    return presentation_id, "✅ Test presentation created successfully"
def add_test_slide(presentation_id, title="Test Slide", content="Test content"):
    """Append a slide built from layout index 1 to a registered presentation.

    Args:
        presentation_id: Key of the target deck in ``presentations``.
        title: Text for the slide's title shape, when the slide has one.
        content: Text for placeholder 1, when the slide has more than one
            placeholder.

    Returns:
        tuple: ``(True, message)`` on success, ``(False, message)`` when the
        deck is unknown or any step raises.
    """
    try:
        if presentation_id not in presentations:
            return False, "Presentation not found"

        deck = presentations[presentation_id]
        # Layout index 1 is used as the title-and-content layout
        new_slide = deck.slides.add_slide(deck.slide_layouts[1])

        title_shape = new_slide.shapes.title
        if title_shape:
            title_shape.text = title

        slots = new_slide.placeholders
        if len(slots) > 1:
            slots[1].text = content

        return True, f"Slide added: {title}"
    except Exception as e:
        return False, f"Error adding slide: {str(e)}"
def save_test_presentation(presentation_id):
    """Save a registered presentation to a new temporary ``.pptx`` file.

    Args:
        presentation_id: Key of the deck in ``presentations``.

    Returns:
        tuple: ``(file_path, message)`` on success, or ``(None, message)``
        when the deck is unknown or saving fails. The file is not deleted
        automatically; the caller owns it.
    """
    try:
        if presentation_id not in presentations:
            return None, "Presentation not found"
        prs = presentations[presentation_id]

        # Reserve a unique path, then close our handle before the deck is
        # written: an open handle leaks a descriptor and blocks the write on
        # platforms that forbid reopening an open temporary file.
        with tempfile.NamedTemporaryFile(suffix='.pptx', delete=False) as temp_file:
            output_path = temp_file.name

        try:
            prs.save(output_path)
        except Exception:
            # Do not leave an empty orphan behind when the save fails
            if os.path.exists(output_path):
                os.remove(output_path)
            raise

        return output_path, "✅ Presentation saved successfully"
    except Exception as e:
        return None, f"❌ Error saving presentation: {str(e)}"
| # Gradio Interface Functions | |
def create_presentation_from_json(slides_data):
    """Build a PowerPoint file from a JSON slide description (Gradio handler).

    Args:
        slides_data: A JSON string, or an already-parsed object, of the form
            ``{"slides": [{"title": ..., "content": ...}, ...]}``.

    Returns:
        tuple: ``(file_path, status_message)`` on success, or
        ``(None, status_message)`` on any failure. The payload is validated
        before a presentation is created, so malformed or empty input does
        not add an entry to the presentation registry.
    """
    # --- Parse and validate the payload before touching any shared state ---
    try:
        data = json.loads(slides_data) if isinstance(slides_data, str) else slides_data
    except json.JSONDecodeError as e:
        return None, f"❌ Error: {str(e)}"

    if not isinstance(data, dict):
        return None, "❌ Error: slides data must be a JSON object with a 'slides' list"

    slide_specs = data.get("slides", [])
    if not isinstance(slide_specs, list):
        return None, "❌ Error: 'slides' must be a list of slide objects"
    if not slide_specs:
        return None, "❌ No slides were created successfully"

    try:
        # Create presentation
        presentation_id, create_message = create_test_presentation()
        if not presentation_id:
            return None, create_message

        # Add slides, remembering the most recent failure for the status text
        slide_count = 0
        last_failure = None
        for slide_info in slide_specs:
            if not isinstance(slide_info, dict):
                last_failure = "slide entries must be JSON objects"
                continue
            success, message = add_test_slide(
                presentation_id,
                slide_info.get("title", f"Slide {slide_count + 1}"),
                slide_info.get("content", "Sample content")
            )
            if success:
                slide_count += 1
            else:
                last_failure = message

        if slide_count == 0:
            detail = f" ({last_failure})" if last_failure else ""
            return None, f"❌ No slides were created successfully{detail}"

        # Save presentation
        file_path, save_message = save_test_presentation(presentation_id)
        if file_path:
            return file_path, f"✅ Presentation created with {slide_count} slides"
        return None, save_message
    except Exception as e:
        # UI boundary: log with traceback and report instead of raising
        logger.exception("Error creating presentation")
        return None, f"❌ Error: {str(e)}"
def get_server_status():
    """Return a multi-line status summary for the Gradio status panel.

    The loaded-presentation count and the current presentation ID are read
    from module state at call time; the remaining lines are fixed text.
    """
    active_label = current_presentation_id or 'None'
    status_lines = [
        "",
        "🟢 **Server Status**: Running",
        "🔧 **Transport**: Streamable HTTP",
        "📦 **Tools Available**: 32+ (All modules registered)",
        "🎯 **MCP Version**: 2024-11-05",
        "⚡ **Server Version**: 2.1.0",
        f"📊 **Presentations Loaded**: {len(presentations)}",
        f"🎮 **Current Presentation**: {active_label}",
        "",
    ]
    return "\n".join(status_lines)
| # Create Gradio Interface | |
def create_gradio_interface():
    """Assemble and return the Gradio Blocks app for this server.

    The UI has four tabs: a test form that turns a JSON payload into a
    downloadable deck, a page showing the MCP connection settings, a
    refreshable server-status panel, and a static list of tool names.

    Returns:
        The constructed (not yet launched) ``gr.Blocks`` interface.
    """
    endpoint_url = "https://brucewayne1-auxoppt.hf.space/mcp"
    transport_label = "Stream"
    sample_payload = '{"slides": [{"title": "Welcome", "content": "This is slide 1"}, {"title": "About Us", "content": "This is slide 2"}]}'
    payload_hint = '{"slides": [{"title": "Slide Title", "content": "Slide content"}]}'

    connection_details = "\n".join([
        "**Name**: PowerPoint Creator",
        "**Description**: AI-powered PowerPoint presentation generator with 32 professional tools and streamable HTTP transport",
        f"**URL**: {endpoint_url}",
        f"**Transport**: {transport_label}",
    ])

    tools_overview = "\n".join([
        "",
        "### 🎯 Presentation Management (7 tools)",
        "• create_presentation • create_presentation_from_template • open_presentation",
        "• save_presentation • get_presentation_info • get_template_file_info • set_core_properties",
        "### 📝 Content Management (6 tools)",
        "• add_slide • get_slide_info • extract_slide_text • extract_presentation_text",
        "• populate_placeholder • add_bullet_points • manage_text • manage_image",
        "### 🎨 Template Operations (7 tools)",
        "• list_slide_templates • apply_slide_template • create_slide_from_template",
        "• create_presentation_from_templates • get_template_info • auto_generate_presentation • optimize_slide_text",
        "### 🏗️ Structural Elements (4 tools)",
        "• add_table • format_table_cell • add_shape • add_chart",
        "### ✨ Professional Design (3 tools)",
        "• apply_professional_design • apply_picture_effects • manage_fonts",
        "### 🔧 Specialized Features (5 tools)",
        "• manage_hyperlinks • manage_slide_masters • add_connector • update_chart_data • manage_slide_transitions",
        "",
    ])

    def _toggle_download(file_value):
        # Show the download button only once it carries a file
        return gr.update(visible=bool(file_value))

    with gr.Blocks(title="PowerPoint MCP Server - Streamable HTTP", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎯 PowerPoint MCP Server")
        gr.Markdown("**Transport**: Streamable HTTP | **Endpoint**: `/mcp` | **Tools**: 32+ available")

        # --- Tab 1: JSON -> presentation test form ---
        with gr.Tab("🚀 Test Interface"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Create Presentation from JSON")
                    slides_input = gr.Textbox(
                        label="Slides Data (JSON)",
                        value=sample_payload,
                        lines=12,
                        placeholder=payload_hint
                    )
                    create_btn = gr.Button("🚀 Create Presentation", variant="primary", size="lg")
                with gr.Column():
                    status_output = gr.Textbox(
                        label="Creation Status",
                        interactive=False,
                        lines=3
                    )
                    download_btn = gr.DownloadButton(
                        label="📥 Download PowerPoint",
                        visible=False,
                        size="lg"
                    )

        # --- Tab 2: connection settings for MCP clients ---
        with gr.Tab("🔧 MCP Configuration"):
            gr.Markdown("## MCP Server Settings for Your Chatbot")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Configuration Details")
                    gr.Code(connection_details, language="yaml")
                with gr.Column():
                    gr.Markdown("### Quick Copy")
                    gr.Textbox(
                        value=endpoint_url,
                        label="MCP Server URL",
                        interactive=True
                    )
                    gr.Textbox(
                        value=transport_label,
                        label="Transport Type",
                        interactive=False
                    )

        # --- Tab 3: server status with manual refresh ---
        with gr.Tab("📊 Server Status"):
            status_text = gr.Textbox(
                value=get_server_status(),
                label="Server Information",
                lines=10,
                interactive=False
            )
            refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary")
            refresh_btn.click(
                fn=get_server_status,
                outputs=[status_text]
            )

        # --- Tab 4: static tool catalogue ---
        with gr.Tab("📖 Available Tools"):
            gr.Markdown("## All Available MCP Tools")
            gr.Markdown(tools_overview)

        # Event handlers: build the deck, then reveal the download button
        creation_event = create_btn.click(
            fn=create_presentation_from_json,
            inputs=[slides_input],
            outputs=[download_btn, status_output]
        )
        creation_event.then(
            fn=_toggle_download,
            inputs=[download_btn],
            outputs=[download_btn]
        )

    return demo
| # MCP Server startup function | |
def start_mcp_server():
    """Run the MCP server with the streamable HTTP transport on port 8000.

    Blocks for as long as the server is running, so it is intended to be the
    target of a background thread. Any startup or runtime exception is
    logged with its traceback and swallowed, which lets the calling thread
    end without taking the process down.
    """
    try:
        logger.info("Starting MCP Server with streamable HTTP transport...")
        # FastMCP.run() accepts the transport name but no port keyword; the
        # listening port is a server setting, so configure it before running.
        mcp_app.settings.port = 8000
        mcp_app.run(transport='streamable-http')
    except Exception as e:
        logger.exception(f"Failed to start MCP server: {str(e)}")
| # Main execution | |
if __name__ == "__main__":
    # Start MCP server in background thread; daemon=True so it never blocks exit
    mcp_thread = Thread(target=start_mcp_server, daemon=True)
    mcp_thread.start()

    # Give the server a moment to start. start_mcp_server() returns when
    # startup fails, so a dead thread here means the endpoint is not serving.
    time.sleep(3)
    if mcp_thread.is_alive():
        logger.info("MCP Server started on port 8000 with streamable HTTP transport")
    else:
        logger.error("MCP server thread exited during startup; the MCP endpoint is unavailable")

    # Launch Gradio interface (blocks until the UI server stops)
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
        show_api=False
    )