HectorPortillaGomez commited on
Commit
742a1e3
·
1 Parent(s): f7c9e17
Files changed (2) hide show
  1. gradio_interface.py +83 -308
  2. mcp_server_core.py +164 -354
gradio_interface.py CHANGED
@@ -1,323 +1,98 @@
 
1
  """
2
- Interfaz Gradio para el Servidor MCP
3
- Proporciona una interfaz web para interactuar con las herramientas MCP
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  """
5
 
6
- import gradio as gr
 
7
  import json
8
- from typing import Dict, List, Any
9
- from mcp_server_core import get_mcp_server, MCPTool
 
 
 
 
 
 
 
10
 
11
 
12
  class GradioMCPInterface:
13
- """Interfaz Gradio para el servidor MCP"""
14
-
15
- def __init__(self):
16
- self.mcp_server = get_mcp_server()
17
-
18
- def format_tools_list(self, tools: List[Dict[str, Any]]) -> str:
19
- """Formatea la lista de herramientas para mostrar"""
20
- if not tools:
21
- return " No hay herramientas disponibles."
22
-
23
- formatted = "## 🛠️ Herramientas Disponibles\n\n"
24
- formatted += f"**Total:** {len(tools)} herramientas\n\n"
25
-
26
- # Agrupar por categoría
27
- categories = {}
28
- for tool in tools:
29
- cat = tool.get('category', 'general')
30
- if cat not in categories:
31
- categories[cat] = []
32
- categories[cat].append(tool)
33
-
34
- for category, cat_tools in sorted(categories.items()):
35
- formatted += f"### 📂 {category.title()}\n\n"
36
- for tool in sorted(cat_tools, key=lambda x: x['name']):
37
- status = "✅" if tool.get('enabled', True) else "❌"
38
- version = tool.get('version', 'N/A')
39
- created = tool.get('created_at', 'N/A')[:10] if tool.get('created_at') else 'N/A'
40
-
41
- formatted += f"**{status} {tool['name']}** (v{version})\n"
42
- formatted += f" 📝 {tool['description']}\n"
43
- formatted += f" 📅 Creada: {created}\n\n"
44
-
45
- return formatted
46
-
47
- def format_tool_detail(self, tool_data: Dict[str, Any]) -> str:
48
- """Formatea los detalles de una herramienta específica"""
49
- if 'error' in tool_data:
50
- return f"❌ **Error:** {tool_data['error']}"
51
-
52
- formatted = f"## 🔧 {tool_data['name']}\n\n"
53
- formatted += f"**📝 Descripción:** {tool_data['description']}\n\n"
54
- formatted += f"**📂 Categoría:** {tool_data.get('category', 'N/A')}\n\n"
55
- formatted += f"**🏷️ Versión:** {tool_data.get('version', 'N/A')}\n\n"
56
-
57
- status = '✅ Habilitada' if tool_data.get('enabled', True) else '❌ Deshabilitada'
58
- formatted += f"**⚡ Estado:** {status}\n\n"
59
-
60
- created = tool_data.get('created_at', 'N/A')
61
- if created != 'N/A':
62
- formatted += f"**📅 Creada:** {created[:19].replace('T', ' ')}\n\n"
63
-
64
- # Mostrar esquema de entrada
65
- if 'input_schema' in tool_data:
66
- formatted += "### 📥 Esquema de Entrada\n\n"
67
-
68
- # Mostrar propiedades de forma más legible
69
- schema = tool_data['input_schema']
70
- if 'properties' in schema:
71
- formatted += "**Parámetros:**\n\n"
72
- for prop_name, prop_info in schema['properties'].items():
73
- required = prop_name in schema.get('required', [])
74
- req_text = "*(requerido)*" if required else "*(opcional)*"
75
- prop_type = prop_info.get('type', 'unknown')
76
-
77
- formatted += f"- **{prop_name}** {req_text}: `{prop_type}`\n"
78
-
79
- if 'description' in prop_info:
80
- formatted += f" - {prop_info['description']}\n"
81
-
82
- if 'examples' in prop_info:
83
- examples = ', '.join([f"`{ex}`" for ex in prop_info['examples'][:3]])
84
- formatted += f" - Ejemplos: {examples}\n"
85
-
86
- if 'enum' in prop_info:
87
- values = ', '.join([f"`{val}`" for val in prop_info['enum']])
88
- formatted += f" - Valores válidos: {values}\n"
89
-
90
- formatted += "\n"
91
-
92
- formatted += "\n**Esquema JSON completo:**\n\n"
93
- formatted += f"```json\n{json.dumps(schema, indent=2, ensure_ascii=False)}\n```\n\n"
94
-
95
- return formatted
96
-
97
- def format_execution_result(self, result: Dict[str, Any]) -> str:
98
- """Formatea el resultado de ejecución de una herramienta"""
99
- if 'error' in result:
100
- return f"❌ **Error:** {result['error']}"
101
-
102
- formatted = "## ✅ Resultado de Ejecución\n\n"
103
-
104
- # Formatear según el tipo de resultado
105
- for key, value in result.items():
106
- if key == 'error':
107
- continue
108
-
109
- formatted += f"**{key.replace('_', ' ').title()}:** "
110
-
111
- if isinstance(value, (dict, list)):
112
- formatted += f"\n```json\n{json.dumps(value, indent=2, ensure_ascii=False)}\n```\n\n"
113
- elif isinstance(value, str) and len(value) > 50:
114
- formatted += f"\n```\n{value}\n```\n\n"
115
- else:
116
- formatted += f"`{value}`\n\n"
117
-
118
- return formatted
119
-
120
- def list_all_tools(self):
121
- """Lista todas las herramientas disponibles"""
122
- tools = self.mcp_server.list_tools()
123
- return self.format_tools_list(tools)
124
-
125
- def get_tool_details(self, tool_name: str):
126
- """Obtiene detalles de una herramienta específica"""
127
- if not tool_name.strip():
128
- return "⚠️ Por favor, ingresa el nombre de una herramienta."
129
-
130
- tool_data = self.mcp_server.get_tool(tool_name.strip())
131
- return self.format_tool_detail(tool_data)
132
-
133
- def list_tools_by_category(self, category: str):
134
- """Lista herramientas por categoría"""
135
- if not category.strip():
136
- return "⚠️ Por favor, selecciona una categoría."
137
-
138
- tools = self.mcp_server.get_tools_by_category(category.strip())
139
- return self.format_tools_list(tools)
140
-
141
- def show_server_info(self):
142
- """Muestra información del servidor"""
143
- info = self.mcp_server.get_server_info()
144
- formatted = "## 🖥️ Información del Servidor MCP\n\n"
145
-
146
- formatted += f"**📛 Nombre:** {info['name']}\n\n"
147
- formatted += f"**🏷️ Versión:** {info['version']}\n\n"
148
- formatted += f"**📋 Versión del Protocolo:** {info['protocol_version']}\n\n"
149
- formatted += f"**📅 Creado:** {info.get('created_at', 'N/A')[:19].replace('T', ' ')}\n\n"
150
-
151
- # Estadísticas
152
- if 'stats' in info:
153
- stats = info['stats']
154
- formatted += "### 📊 Estadísticas\n\n"
155
- formatted += f"- **Total de herramientas:** {stats['total_tools']}\n"
156
- formatted += f"- **Herramientas habilitadas:** {stats['enabled_tools']}\n"
157
- formatted += f"- **Categorías:** {stats['categories_count']}\n"
158
- formatted += f"- **Lista de categorías:** {', '.join(stats['categories'])}\n\n"
159
-
160
- # Capacidades
161
- formatted += "### ⚙️ Capacidades del Servidor\n\n"
162
- formatted += f"```json\n{json.dumps(info['capabilities'], indent=2, ensure_ascii=False)}\n```\n\n"
163
-
164
- return formatted
165
-
166
- def execute_tool(self, tool_name: str, params_json: str):
167
- """Ejecuta una herramienta con parámetros JSON"""
168
  if not tool_name.strip():
169
- return "⚠️ Por favor, especifica el nombre de la herramienta."
170
-
171
  try:
172
- # Parsear parámetros JSON
173
- if params_json.strip():
174
- params = json.loads(params_json)
175
- else:
176
- params = {}
177
-
178
- # Ejecutar herramienta
179
- result = self.mcp_server.execute_tool(tool_name.strip(), **params)
180
- return self.format_execution_result(result)
181
-
182
- except json.JSONDecodeError as e:
183
- return f"❌ **Error en JSON:** {str(e)}\n\nVerifica que el formato JSON sea válido."
184
- except Exception as e:
185
- return f"❌ **Error:** {str(e)}"
186
-
187
- def toggle_tool_status(self, tool_name: str):
188
- """Cambia el estado de una herramienta"""
189
- if not tool_name.strip():
190
- return "⚠️ Por favor, especifica el nombre de la herramienta."
191
-
192
- result = self.mcp_server.toggle_tool(tool_name.strip())
193
-
194
- if 'error' in result:
195
- return f"❌ **Error:** {result['error']}"
196
- else:
197
- status_emoji = "✅" if result['enabled'] else "❌"
198
- return f"{status_emoji} **Herramienta '{result['tool']}'** ahora está **{result['status']}**"
199
-
200
- def get_available_tools_list(self):
201
- """Obtiene lista de nombres de herramientas para dropdowns"""
202
- tools = self.mcp_server.list_tools()
203
- return [tool['name'] for tool in tools]
204
-
205
- def get_categories_list(self):
206
- """Obtiene lista de categorías para dropdowns"""
207
- return self.mcp_server.get_categories()
208
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
 
210
- def create_gradio_interface():
211
- """Crea y configura la interfaz Gradio"""
212
-
213
- # Crear instancia de la interfaz
214
- interface = GradioMCPInterface()
215
-
216
- # Configurar tema personalizado
217
- theme = gr.themes.Soft(
218
- primary_hue="blue",
219
- secondary_hue="cyan",
220
- neutral_hue="slate",
221
- )
222
-
223
- # Crear la interfaz principal
224
- with gr.Blocks(
225
- title="🛠️ Servidor MCP - Gestor de Herramientas",
226
- theme=theme,
227
- css="""
228
- .gradio-container {
229
- max-width: 1200px !important;
230
- }
231
- .tool-card {
232
- border: 1px solid #e5e7eb;
233
- border-radius: 8px;
234
- padding: 16px;
235
- margin: 8px 0;
236
- }
237
- """
238
- ) as demo:
239
-
240
- # Header
241
- gr.Markdown("""
242
- # 🛠️ Servidor MCP - Gestor de Herramientas
243
-
244
- **Model Context Protocol (MCP) Server** - Interfaz web para gestionar y ejecutar herramientas del protocolo MCP.
245
-
246
- ---
247
- """)
248
-
249
- with gr.Tabs():
250
- # Pestaña: Listar todas las herramientas
251
- with gr.Tab("📋 Herramientas", id="tools_list"):
252
- gr.Markdown("### Lista completa de herramientas disponibles")
253
-
254
- with gr.Row():
255
- list_btn = gr.Button("🔄 Actualizar Lista", variant="primary", scale=1)
256
- gr.Markdown("") # Spacer
257
-
258
- tools_output = gr.Markdown(value="Cargando herramientas...")
259
- list_btn.click(interface.list_all_tools, outputs=tools_output)
260
-
261
- # Pestaña: Detalles de la herramienta
262
- with gr.Tab("🔧 Detalles de la Herramienta"):
263
- gr.Markdown("### Obtén detalles de una herramienta específica")
264
-
265
- with gr.Row():
266
- tool_name_input = gr.Textbox(label="Nombre de la Herramienta", scale=3)
267
- get_details_btn = gr.Button("🔍 Obtener Detalles", variant="primary", scale=1)
268
-
269
- tool_details_output = gr.Markdown(value="")
270
- get_details_btn.click(interface.get_tool_details, inputs=tool_name_input, outputs=tool_details_output)
271
-
272
- # Pestaña: Ejecutar Herramienta
273
- with gr.Tab("💻 Ejecutar Herramienta"):
274
- gr.Markdown("### Ejecuta una herramienta con parámetros")
275
-
276
- with gr.Row():
277
- tool_name_input = gr.Textbox(label="Nombre de la Herramienta", scale=3)
278
- params_input = gr.Textbox(label="Parámetros JSON", lines=3, scale=3)
279
- execute_btn = gr.Button("💻 Ejecutar", variant="primary", scale=1)
280
-
281
- execution_output = gr.Markdown(value="")
282
- execute_btn.click(interface.execute_tool, inputs=[tool_name_input, params_input], outputs=execution_output)
283
-
284
- # Pestaña: Información del Servidor
285
- with gr.Tab("🖥️ Información del Servidor"):
286
- gr.Markdown("### Información general del servidor MCP")
287
-
288
- with gr.Row():
289
- server_info_btn = gr.Button("🔄 Actualizar Información", variant="primary", scale=1)
290
- gr.Markdown("") # Spacer
291
-
292
- server_info_output = gr.Markdown(value="")
293
- server_info_btn.click(interface.show_server_info, outputs=server_info_output)
294
-
295
- # Pestaña: Herramientas por Categoría
296
- with gr.Tab("📂 Herramientas por Categoría"):
297
- gr.Markdown("### Lista de herramientas por categoría")
298
-
299
- with gr.Row():
300
- category_input = gr.Dropdown(label="Categoría", choices=interface.get_categories_list(), scale=3)
301
- list_by_category_btn = gr.Button("🔄 Actualizar Lista", variant="primary", scale=1)
302
-
303
- tools_by_category_output = gr.Markdown(value="")
304
- list_by_category_btn.click(interface.list_tools_by_category, inputs=category_input, outputs=tools_by_category_output)
305
-
306
- # Pestaña: Cambiar Estado de la Herramienta
307
- with gr.Tab("🔌 Cambiar Estado de la Herramienta"):
308
- gr.Markdown("### Cambia el estado de una herramienta")
309
-
310
- with gr.Row():
311
- tool_name_input = gr.Textbox(label="Nombre de la Herramienta", scale=3)
312
- toggle_status_btn = gr.Button("🔄 Cambiar Estado", variant="primary", scale=1)
313
-
314
- toggle_status_output = gr.Markdown(value="")
315
- toggle_status_btn.click(interface.toggle_tool_status, inputs=tool_name_input, outputs=toggle_status_output)
316
-
317
  return demo
318
 
319
 
320
- if __name__ == "__main__":
321
- ui = create_gradio_interface()
322
- ui.launch()
323
 
 
 
 
 
 
1
+ # gradio_interface.py
2
  """
3
+ Gradio front‑end for the MCP server defined in `mcp_server_core.py`.
4
+
5
+ Launches a tiny admin UI **and** exposes a fully‑compliant MCP/SSE endpoint
6
+ at `/gradio_api/mcp/sse` thanks to `launch(mcp_server=True)`.
7
+
8
+ Usage
9
+ -----
10
+ pip install gradio>=4.25 simpleeval # ensure dependencies
11
+ python gradio_interface.py # open UI + MCP endpoint
12
+
13
+ Notes
14
+ -----
15
+ * Handlers are declared `async` so that long‑running tools do not block the
16
+ Gradio event loop (they internally rely on the async helpers in the core).
17
+ * The UI itself is intentionally minimal; extend it with extra tabs or widgets
18
+ without touching `mcp_server_core.py`.
19
  """
20
 
21
+ from __future__ import annotations
22
+
23
  import json
24
+ from typing import Any
25
+
26
+ import gradio as gr
27
+
28
+ from mcp_server_core import get_server
29
+
30
+ # ---------------------------------------------------------------------------
31
+ # Wrapper class that bridges Gradio events ↔ MCPServer methods
32
+ # ---------------------------------------------------------------------------
33
 
34
 
35
  class GradioMCPInterface:
36
+ def __init__(self) -> None:
37
+ self.server = get_server()
38
+
39
+ # ----------------- Utility helpers -----------------
40
+
41
+ @staticmethod
42
+ def _md(obj: Any, lang: str = "json") -> str:
43
+ """Return a fenced code block for pretty Markdown rendering."""
44
+ return f"```{lang}\n{json.dumps(obj, indent=2, ensure_ascii=False)}\n```"
45
+
46
+ # ----------------- Event handlers (async) -----------------
47
+
48
+ async def run_tool(self, tool_name: str, params_json: str) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  if not tool_name.strip():
50
+ return "⚠️ Debes indicar una herramienta."
 
51
  try:
52
+ params = json.loads(params_json) if params_json.strip() else {}
53
+ except json.JSONDecodeError as exc:
54
+ return f"❌ JSON mal formado: {exc}"
55
+
56
+ result = await self.server.execute_tool_async(tool_name.strip(), **params)
57
+ if "error" in result:
58
+ return f"❌ {result['error']}"
59
+ return self._md(result)
60
+
61
+ def list_tools(self) -> str:
62
+ return self._md(self.server.list_tools())
63
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
+ # ---------------------------------------------------------------------------
66
+ # Build Gradio Blocks UI
67
+ # ---------------------------------------------------------------------------
68
+
69
+ def build_ui() -> gr.Blocks: # noqa: D401 – not a public library function
70
+ iface = GradioMCPInterface()
71
+
72
+ theme = gr.themes.Soft()
73
+ with gr.Blocks(title="🛠️ MCP Admin", theme=theme) as demo:
74
+ gr.Markdown("# 🛠️ MCP Server (async + safe eval)")
75
+
76
+ with gr.Tab("Ejecutar herramienta"):
77
+ name_box = gr.Textbox(label="Nombre de la herramienta", placeholder="calculator")
78
+ params_box = gr.Textbox(label="Parámetros JSON", lines=3)
79
+ run_btn = gr.Button("Ejecutar", variant="primary")
80
+ out_md = gr.Markdown()
81
+ run_btn.click(iface.run_tool, inputs=[name_box, params_box], outputs=out_md)
82
+
83
+ with gr.Tab("Lista de herramientas"):
84
+ refresh_btn = gr.Button("Refrescar lista")
85
+ out_list = gr.Markdown()
86
+ refresh_btn.click(iface.list_tools, outputs=out_list)
87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
  return demo
89
 
90
 
91
+ # ---------------------------------------------------------------------------
92
+ # Entry point
93
+ # ---------------------------------------------------------------------------
94
 
95
+ if __name__ == "__main__":
96
+ ui = build_ui()
97
+ # 👇 This flag automatically adds the `/gradio_api/mcp/sse` endpoint.
98
+ ui.launch(server_name="0.0.0.0", mcp_server=True)
mcp_server_core.py CHANGED
@@ -1,428 +1,238 @@
 
1
  """
2
- Servidor MCP (Model Context Protocol) - Módulo Core
3
- Gestiona herramientas y capacidades del protocolo MCP
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  """
5
 
6
- import json
7
- from typing import Dict, List, Any, Optional
8
- from dataclasses import dataclass, asdict
9
- from datetime import datetime
10
- import uuid
11
  import re
12
- import math
 
 
 
 
 
 
 
 
 
13
 
14
 
15
  @dataclass
16
  class MCPTool:
17
- """Representa una herramienta MCP"""
 
18
  name: str
19
  description: str
20
  input_schema: Dict[str, Any]
21
  category: str = "general"
22
  version: str = "1.0.0"
23
  enabled: bool = True
24
- created_at: Optional[str] = None
25
-
26
- def __post_init__(self):
27
  if self.created_at is None:
28
- self.created_at = datetime.now().isoformat()
 
 
 
 
 
29
 
30
 
31
  class MCPToolExecutor:
32
- """Ejecutor de herramientas MCP - Implementa la lógica de cada herramienta"""
33
-
34
  @staticmethod
35
  def execute_calculator(expression: str) -> Dict[str, Any]:
36
- """Ejecuta operaciones matemáticas básicas"""
 
 
 
 
 
 
 
 
 
 
 
37
  try:
38
- # Sanitizar la expresión para seguridad
39
- allowed_chars = set('0123456789+-*/.() ')
40
- if not all(c in allowed_chars for c in expression):
41
- return {"error": "Expresión contiene caracteres no permitidos"}
42
-
43
- result = eval(expression)
44
  return {
45
- "result": result,
46
  "expression": expression,
47
- "type": type(result).__name__
 
48
  }
49
- except Exception as e:
50
- return {"error": f"Error en cálculo: {str(e)}"}
51
-
52
  @staticmethod
53
  def execute_datetime(format_str: str = "%Y-%m-%d %H:%M:%S", timezone: str = "UTC") -> Dict[str, Any]:
54
- """Obtiene información de fecha y hora"""
55
- try:
56
- now = datetime.now()
57
- return {
58
- "timestamp": now.timestamp(),
59
- "formatted": now.strftime(format_str),
60
- "iso_format": now.isoformat(),
61
- "timezone": timezone,
62
- "weekday": now.strftime("%A"),
63
- "year": now.year,
64
- "month": now.month,
65
- "day": now.day,
66
- "hour": now.hour,
67
- "minute": now.minute,
68
- "second": now.second
69
- }
70
- except Exception as e:
71
- return {"error": f"Error obteniendo fecha: {str(e)}"}
72
-
73
  @staticmethod
74
  def execute_email_validator(email: str) -> Dict[str, Any]:
75
- """Valida formato de email"""
76
- email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
77
- is_valid = bool(re.match(email_pattern, email))
78
-
79
  return {
80
  "email": email,
81
  "is_valid": is_valid,
82
- "has_at": "@" in email,
83
- "has_dot": "." in email,
84
- "length": len(email),
85
- "domain": email.split("@")[-1] if "@" in email else None
86
  }
87
-
88
  @staticmethod
89
  def execute_text_generator(text_type: str, length: int = 100) -> Dict[str, Any]:
90
- """Genera diferentes tipos de texto"""
91
- try:
92
- if text_type == "lorem":
93
- lorem_words = [
94
- "lorem", "ipsum", "dolor", "sit", "amet", "consectetur", "adipiscing", "elit",
95
- "sed", "do", "eiusmod", "tempor", "incididunt", "ut", "labore", "et", "dolore",
96
- "magna", "aliqua", "enim", "ad", "minim", "veniam", "quis", "nostrud"
97
- ]
98
- words_needed = min(length // 6, len(lorem_words) * 4) # Aproximación
99
- text = " ".join((lorem_words * ((words_needed // len(lorem_words)) + 1))[:words_needed])
100
- return {"text": text[:length], "type": "lorem", "length": len(text[:length])}
101
-
102
- elif text_type == "uuid":
103
- generated_uuid = str(uuid.uuid4())
104
- return {"text": generated_uuid, "type": "uuid", "length": len(generated_uuid)}
105
-
106
- elif text_type == "random":
107
- import random
108
- import string
109
- chars = string.ascii_letters + string.digits + " "
110
- text = "".join(random.choice(chars) for _ in range(length))
111
- return {"text": text, "type": "random", "length": len(text)}
112
-
113
- else:
114
- return {"error": f"Tipo de texto '{text_type}' no soportado"}
115
-
116
- except Exception as e:
117
- return {"error": f"Error generando texto: {str(e)}"}
118
-
119
- @staticmethod
120
- def execute_unit_converter(value: float, from_unit: str, to_unit: str) -> Dict[str, Any]:
121
- """Convierte entre unidades"""
122
- try:
123
- # Conversiones de temperatura
124
- temp_conversions = {
125
- ("celsius", "fahrenheit"): lambda x: x * 9/5 + 32,
126
- ("fahrenheit", "celsius"): lambda x: (x - 32) * 5/9,
127
- ("celsius", "kelvin"): lambda x: x + 273.15,
128
- ("kelvin", "celsius"): lambda x: x - 273.15,
129
- ("fahrenheit", "kelvin"): lambda x: (x - 32) * 5/9 + 273.15,
130
- ("kelvin", "fahrenheit"): lambda x: (x - 273.15) * 9/5 + 32,
131
- }
132
-
133
- # Conversiones de distancia (a metros como base)
134
- distance_to_meters = {
135
- "meters": 1, "kilometers": 1000, "centimeters": 0.01,
136
- "millimeters": 0.001, "inches": 0.0254, "feet": 0.3048,
137
- "yards": 0.9144, "miles": 1609.34
138
- }
139
-
140
- # Conversiones de peso (a kilogramos como base)
141
- weight_to_kg = {
142
- "kilograms": 1, "grams": 0.001, "pounds": 0.453592,
143
- "ounces": 0.0283495, "tons": 1000
144
- }
145
-
146
- from_unit = from_unit.lower()
147
- to_unit = to_unit.lower()
148
-
149
- # Temperatura
150
- if (from_unit, to_unit) in temp_conversions:
151
- result = temp_conversions[(from_unit, to_unit)](value)
152
- return {
153
- "original_value": value,
154
- "converted_value": round(result, 6),
155
- "from_unit": from_unit,
156
- "to_unit": to_unit,
157
- "conversion_type": "temperature"
158
- }
159
-
160
- # Distancia
161
- elif from_unit in distance_to_meters and to_unit in distance_to_meters:
162
- meters = value * distance_to_meters[from_unit]
163
- result = meters / distance_to_meters[to_unit]
164
- return {
165
- "original_value": value,
166
- "converted_value": round(result, 6),
167
- "from_unit": from_unit,
168
- "to_unit": to_unit,
169
- "conversion_type": "distance"
170
- }
171
-
172
- # Peso
173
- elif from_unit in weight_to_kg and to_unit in weight_to_kg:
174
- kg = value * weight_to_kg[from_unit]
175
- result = kg / weight_to_kg[to_unit]
176
- return {
177
- "original_value": value,
178
- "converted_value": round(result, 6),
179
- "from_unit": from_unit,
180
- "to_unit": to_unit,
181
- "conversion_type": "weight"
182
- }
183
-
184
- else:
185
- available_units = list(set(list(distance_to_meters.keys()) +
186
- list(weight_to_kg.keys()) +
187
- ["celsius", "fahrenheit", "kelvin"]))
188
- return {
189
- "error": f"Conversión no soportada: {from_unit} -> {to_unit}",
190
- "available_units": available_units
191
- }
192
-
193
- except Exception as e:
194
- return {"error": f"Error en conversión: {str(e)}"}
195
 
196
 
197
  class MCPServer:
198
- """Servidor MCP que gestiona herramientas y su ejecución"""
199
-
200
- def __init__(self):
201
- self.tools = self._initialize_tools()
202
  self.executor = MCPToolExecutor()
203
  self.server_info = {
204
- "name": "Gradio MCP Server",
205
- "version": "1.0.0",
206
  "protocol_version": "2024-11-05",
207
- "capabilities": {
208
- "tools": {"listChanged": True},
209
- "resources": {},
210
- "prompts": {}
211
- },
212
- "created_at": datetime.now().isoformat()
213
  }
214
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
215
  def _initialize_tools(self) -> Dict[str, MCPTool]:
216
- """Inicializa las herramientas disponibles"""
217
- tools = {}
218
-
219
- # Herramienta de cálculo
220
  tools["calculator"] = MCPTool(
221
  name="calculator",
222
- description="Realiza operaciones matemáticas básicas (+, -, *, /, parentesis)",
223
  input_schema={
224
  "type": "object",
225
- "properties": {
226
- "expression": {
227
- "type": "string",
228
- "description": "Expresión matemática a evaluar (ej: '2 + 3 * 4')",
229
- "examples": ["2 + 3", "10 * (5 - 2)", "100 / 4"]
230
- }
231
- },
232
- "required": ["expression"]
233
  },
234
- category="matemáticas"
235
  )
236
-
237
- # Herramienta de fecha/hora
238
  tools["datetime"] = MCPTool(
239
  name="datetime",
240
- description="Obtiene información completa de fecha y hora actual del sistema",
241
  input_schema={
242
  "type": "object",
243
  "properties": {
244
- "format": {
245
- "type": "string",
246
- "description": "Formato de fecha personalizado (opcional)",
247
- "default": "%Y-%m-%d %H:%M:%S",
248
- "examples": ["%Y-%m-%d", "%H:%M:%S", "%A, %B %d, %Y"]
249
- },
250
- "timezone": {
251
- "type": "string",
252
- "description": "Zona horaria (solo informativo, no afecta el cálculo)",
253
- "default": "UTC"
254
- }
255
  },
256
- "required": []
257
  },
258
- category="utilidades"
259
  )
260
-
261
- # Herramienta de validación de email
262
  tools["email_validator"] = MCPTool(
263
  name="email_validator",
264
- description="Valida formato de email y proporciona análisis detallado",
265
  input_schema={
266
  "type": "object",
267
- "properties": {
268
- "email": {
269
- "type": "string",
270
- "description": "Dirección de email a validar",
271
- "examples": ["usuario@ejemplo.com", "test@domain.org"]
272
- }
273
- },
274
- "required": ["email"]
275
  },
276
- category="validación"
277
  )
278
-
279
- # Herramienta de generación de texto
280
  tools["text_generator"] = MCPTool(
281
  name="text_generator",
282
- description="Genera diferentes tipos de texto: Lorem Ipsum, UUIDs, o texto aleatorio",
283
  input_schema={
284
  "type": "object",
285
  "properties": {
286
- "type": {
287
- "type": "string",
288
- "enum": ["lorem", "random", "uuid"],
289
- "description": "Tipo de texto a generar"
290
- },
291
- "length": {
292
- "type": "integer",
293
- "description": "Longitud aproximada del texto (no aplica para UUID)",
294
- "minimum": 1,
295
- "maximum": 1000,
296
- "default": 100
297
- }
298
  },
299
- "required": ["type"]
300
  },
301
- category="generación"
302
  )
303
-
304
- # Herramienta de conversión de unidades
305
- tools["unit_converter"] = MCPTool(
306
- name="unit_converter",
307
- description="Convierte entre unidades de temperatura, distancia y peso",
308
- input_schema={
309
- "type": "object",
310
- "properties": {
311
- "value": {
312
- "type": "number",
313
- "description": "Valor numérico a convertir"
314
- },
315
- "from_unit": {
316
- "type": "string",
317
- "description": "Unidad de origen",
318
- "examples": ["celsius", "kilometers", "pounds", "meters", "fahrenheit"]
319
- },
320
- "to_unit": {
321
- "type": "string",
322
- "description": "Unidad de destino",
323
- "examples": ["fahrenheit", "miles", "kilograms", "feet", "kelvin"]
324
- }
325
- },
326
- "required": ["value", "from_unit", "to_unit"]
327
- },
328
- category="conversión"
329
- )
330
-
331
  return tools
332
-
333
- def list_tools(self) -> List[Dict[str, Any]]:
334
- """Lista todas las herramientas disponibles"""
335
- return [asdict(tool) for tool in self.tools.values() if tool.enabled]
336
-
337
- def get_tool(self, name: str) -> Dict[str, Any]:
338
- """Obtiene información de una herramienta específica"""
339
- if name in self.tools:
340
- return asdict(self.tools[name])
341
- return {"error": f"Herramienta '{name}' no encontrada"}
342
-
343
- def get_tools_by_category(self, category: str) -> List[Dict[str, Any]]:
344
- """Obtiene herramientas por categoría"""
345
- return [asdict(tool) for tool in self.tools.values()
346
- if tool.category.lower() == category.lower() and tool.enabled]
347
-
348
- def get_categories(self) -> List[str]:
349
- """Obtiene todas las categorías disponibles"""
350
- return list(set(tool.category for tool in self.tools.values()))
351
-
352
- def get_server_info(self) -> Dict[str, Any]:
353
- """Obtiene información del servidor"""
354
- info = self.server_info.copy()
355
- info["stats"] = {
356
- "total_tools": len(self.tools),
357
- "enabled_tools": len([t for t in self.tools.values() if t.enabled]),
358
- "categories_count": len(self.get_categories()),
359
- "categories": self.get_categories()
360
- }
361
- return info
362
-
363
- def execute_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]:
364
- """Ejecuta una herramienta con los parámetros dados"""
365
- if tool_name not in self.tools:
366
- return {"error": f"Herramienta '{tool_name}' no encontrada"}
367
-
368
- if not self.tools[tool_name].enabled:
369
- return {"error": f"Herramienta '{tool_name}' está deshabilitada"}
370
-
371
- try:
372
- # Mapeo de herramientas a métodos del ejecutor
373
- method_map = {
374
- "calculator": self.executor.execute_calculator,
375
- "datetime": self.executor.execute_datetime,
376
- "email_validator": self.executor.execute_email_validator,
377
- "text_generator": self.executor.execute_text_generator,
378
- "unit_converter": self.executor.execute_unit_converter
379
- }
380
-
381
- if tool_name in method_map:
382
- return method_map[tool_name](**kwargs)
383
- else:
384
- return {"error": f"Ejecutor para '{tool_name}' no implementado"}
385
-
386
- except Exception as e:
387
- return {"error": f"Error ejecutando herramienta: {str(e)}"}
388
-
389
- def toggle_tool(self, tool_name: str) -> Dict[str, Any]:
390
- """Habilita/deshabilita una herramienta"""
391
- if tool_name not in self.tools:
392
- return {"error": f"Herramienta '{tool_name}' no encontrada"}
393
-
394
- self.tools[tool_name].enabled = not self.tools[tool_name].enabled
395
- status = "habilitada" if self.tools[tool_name].enabled else "deshabilitada"
396
-
397
- return {
398
- "tool": tool_name,
399
- "status": status,
400
- "enabled": self.tools[tool_name].enabled
401
- }
402
-
403
- def add_tool(self, tool: MCPTool) -> Dict[str, Any]:
404
- """Agrega una nueva herramienta al servidor"""
405
- if tool.name in self.tools:
406
- return {"error": f"Herramienta '{tool.name}' ya existe"}
407
-
408
- self.tools[tool.name] = tool
409
- return {"message": f"Herramienta '{tool.name}' agregada exitosamente"}
410
-
411
- def remove_tool(self, tool_name: str) -> Dict[str, Any]:
412
- """Elimina una herramienta del servidor"""
413
- if tool_name not in self.tools:
414
- return {"error": f"Herramienta '{tool_name}' no encontrada"}
415
-
416
- del self.tools[tool_name]
417
- return {"message": f"Herramienta '{tool_name}' eliminada exitosamente"}
418
 
419
 
420
- # Instancia global del servidor (singleton)
421
- _mcp_server_instance = None
 
 
 
422
 
423
- def get_mcp_server() -> MCPServer:
424
- """Obtiene la instancia singleton del servidor MCP"""
425
- global _mcp_server_instance
426
- if _mcp_server_instance is None:
427
- _mcp_server_instance = MCPServer()
428
- return _mcp_server_instance
 
1
+ # mcp_server_core.py
2
  """
3
+ Core logic for an MCP server, separated from any user‑interface concerns.
4
+
5
+ Features implemented (reflecting your requested improvements):
6
+ 1. **Safe evaluation** in the `calculator` tool via `simpleeval.simple_eval` plus a light
7
+ regex whitelist for allowed characters (digits, operators, spaces and parentheses).
8
+ 2. **Asynchronous execution** helper (`execute_tool_async`) that delegates the real work
9
+ to a background thread with `asyncio.to_thread`, so long‑running tools never block
10
+ an async‑capable web server.
11
+ 3. A clean **MCPServer** class that registers tools, provides `list_tools`, and performs
12
+ guarded look‑ups before execution.
13
+
14
+ Import this module from your Gradio (or FastAPI) front‑end and call
15
+ `get_server()` to obtain the singleton instance.
16
+
17
+ Dependencies
18
+ ------------
19
+ pip install simpleeval>=0.9.15
20
  """
21
 
22
+ from __future__ import annotations
23
+
24
+ import asyncio
 
 
25
  import re
26
+ import uuid
27
+ from dataclasses import asdict, dataclass
28
+ from datetime import datetime
29
+ from typing import Any, Dict, List
30
+
31
+ from simpleeval import simple_eval
32
+
33
+ # ---------------------------------------------------------------------------
34
+ # Data model
35
+ # ---------------------------------------------------------------------------
36
 
37
 
38
  @dataclass
39
  class MCPTool:
40
+ """Represents a single MCP tool (function) and its metadata."""
41
+
42
  name: str
43
  description: str
44
  input_schema: Dict[str, Any]
45
  category: str = "general"
46
  version: str = "1.0.0"
47
  enabled: bool = True
48
+ created_at: str | None = None
49
+
50
+ def __post_init__(self) -> None: # noqa: D401 – short description already in docstring
51
  if self.created_at is None:
52
+ self.created_at = datetime.utcnow().isoformat()
53
+
54
+
55
+ # ---------------------------------------------------------------------------
56
+ # Executors (business logic)
57
+ # ---------------------------------------------------------------------------
58
 
59
 
60
  class MCPToolExecutor:
61
+ """Concrete implementations for each built‑in tool."""
62
+
63
  @staticmethod
64
  def execute_calculator(expression: str) -> Dict[str, Any]:
65
+ """Safely evaluate basic arithmetic expressions without `eval()`.
66
+
67
+ Only digits, operators (+‑*/), periods, parentheses and whitespace are allowed.
68
+ """
69
+
70
+ if not re.fullmatch(r"[0-9+\-*/().\s]+", expression):
71
+ return {
72
+ "error": (
73
+ "La expresión contiene caracteres no permitidos. "+
74
+ "Solo se aceptan dígitos, operadores (+‑*/), puntos y paréntesis."
75
+ )
76
+ }
77
  try:
78
+ result = simple_eval(expression)
 
 
 
 
 
79
  return {
 
80
  "expression": expression,
81
+ "result": result,
82
+ "type": type(result).__name__,
83
  }
84
+ except Exception as exc: # pylint: disable=broad-except – controlled boundary
85
+ return {"error": f"Error evaluando la expresión: {exc}"}
86
+
87
  @staticmethod
88
  def execute_datetime(format_str: str = "%Y-%m-%d %H:%M:%S", timezone: str = "UTC") -> Dict[str, Any]:
89
+ now = datetime.utcnow()
90
+ return {
91
+ "timestamp": now.timestamp(),
92
+ "formatted": now.strftime(format_str),
93
+ "iso_format": now.isoformat(),
94
+ "timezone": timezone,
95
+ }
96
+
 
 
 
 
 
 
 
 
 
 
 
97
  @staticmethod
98
  def execute_email_validator(email: str) -> Dict[str, Any]:
99
+ pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
100
+ is_valid = bool(re.match(pattern, email))
 
 
101
  return {
102
  "email": email,
103
  "is_valid": is_valid,
104
+ "domain": email.split("@")[-1] if "@" in email else None,
 
 
 
105
  }
106
+
107
  @staticmethod
108
  def execute_text_generator(text_type: str, length: int = 100) -> Dict[str, Any]:
109
+ if text_type == "uuid":
110
+ generated_uuid = str(uuid.uuid4())
111
+ return {"text": generated_uuid, "type": "uuid", "length": len(generated_uuid)}
112
+ if text_type == "random":
113
+ import random, string # local import keeps optional deps light
114
+
115
+ chars = string.ascii_letters + string.digits + " "
116
+ txt = "".join(random.choice(chars) for _ in range(length))
117
+ return {"text": txt, "type": "random", "length": len(txt)}
118
+ if text_type == "lorem":
119
+ lorem = (
120
+ "lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod tempor "
121
+ "incididunt ut labore et dolore magna aliqua"
122
+ )
123
+ repeated = " ".join([lorem] * ((length // len(lorem.split())) + 1))
124
+ return {"text": repeated[:length], "type": "lorem", "length": length}
125
+ return {"error": f"Tipo de texto '{text_type}' no soportado"}
126
+
127
+
128
+ # ---------------------------------------------------------------------------
129
+ # MCP Server façade
130
+ # ---------------------------------------------------------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
 
132
 
133
  class MCPServer:
134
+ """Tool registry, routing and async helpers."""
135
+
136
+ def __init__(self) -> None:
137
+ self.tools: Dict[str, MCPTool] = self._initialize_tools()
138
  self.executor = MCPToolExecutor()
139
  self.server_info = {
140
+ "name": "Gradio MCP Server Core",
141
+ "version": "1.1.0",
142
  "protocol_version": "2024-11-05",
143
+ "created_at": datetime.utcnow().isoformat(),
 
 
 
 
 
144
  }
145
+
146
+ # -------------------- Public API --------------------
147
+
148
+ def list_tools(self) -> List[Dict[str, Any]]:
149
+ """Return enabled tools as serialisable dicts."""
150
+ return [asdict(tool) for tool in self.tools.values() if tool.enabled]
151
+
152
+ def execute_tool(self, tool_name: str, **kwargs: Any) -> Dict[str, Any]:
153
+ """Synchronous execution (will call a *pure* function).
154
+
155
+ Raises no exceptions; always returns a JSON‑serialisable dict.
156
+ """
157
+ if tool_name not in self.tools:
158
+ return {"error": f"Herramienta '{tool_name}' no encontrada"}
159
+ if not self.tools[tool_name].enabled:
160
+ return {"error": f"Herramienta '{tool_name}' deshabilitada"}
161
+
162
+ mapping = {
163
+ "calculator": self.executor.execute_calculator,
164
+ "datetime": self.executor.execute_datetime,
165
+ "email_validator": self.executor.execute_email_validator,
166
+ "text_generator": self.executor.execute_text_generator,
167
+ }
168
+ if tool_name not in mapping:
169
+ return {"error": f"Ejecutor para '{tool_name}' no implementado"}
170
+ return mapping[tool_name](**kwargs)
171
+
172
+ async def execute_tool_async(self, tool_name: str, **kwargs: Any) -> Dict[str, Any]:
173
+ """Non‑blocking wrapper that runs the sync code in a thread."""
174
+ return await asyncio.to_thread(self.execute_tool, tool_name, **kwargs)
175
+
176
+ # ------------------- Internals -------------------
177
+
178
  def _initialize_tools(self) -> Dict[str, MCPTool]:
179
+ """Register the built‑in tools. Add more here or load dynamically."""
180
+ tools: Dict[str, MCPTool] = {}
 
 
181
  tools["calculator"] = MCPTool(
182
  name="calculator",
183
+ description="Realiza operaciones matemáticas básicas (+, -, *, /).",
184
  input_schema={
185
  "type": "object",
186
+ "properties": {"expression": {"type": "string", "description": "Expresión"}},
187
+ "required": ["expression"],
 
 
 
 
 
 
188
  },
189
+ category="matemáticas",
190
  )
 
 
191
  tools["datetime"] = MCPTool(
192
  name="datetime",
193
+ description="Devuelve la fecha y hora UTC actuales en el formato indicado.",
194
  input_schema={
195
  "type": "object",
196
  "properties": {
197
+ "format_str": {"type": "string"},
198
+ "timezone": {"type": "string"},
 
 
 
 
 
 
 
 
 
199
  },
 
200
  },
 
201
  )
 
 
202
  tools["email_validator"] = MCPTool(
203
  name="email_validator",
204
+ description="Valida sintaxis de correo electrónico estándar (RFC 5322).",
205
  input_schema={
206
  "type": "object",
207
+ "properties": {"email": {"type": "string"}},
208
+ "required": ["email"],
 
 
 
 
 
 
209
  },
210
+ category="validación",
211
  )
 
 
212
  tools["text_generator"] = MCPTool(
213
  name="text_generator",
214
+ description="Genera UUID, texto aleatorio o lorem ipsum.",
215
  input_schema={
216
  "type": "object",
217
  "properties": {
218
+ "text_type": {"type": "string"},
219
+ "length": {"type": "integer"},
 
 
 
 
 
 
 
 
 
 
220
  },
221
+ "required": ["text_type"],
222
  },
 
223
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  return tools
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
 
226
 
227
+ # ---------------------------------------------------------------------------
228
+ # Singleton helper for easy import in the UI layer
229
+ # ---------------------------------------------------------------------------
230
+
231
+ _server_instance: MCPServer | None = None
232
 
233
+ def get_server() -> MCPServer:
234
+ """Return a singleton MCPServer so the same registry is shared across the app."""
235
+ global _server_instance # pylint: disable=global-statement
236
+ if _server_instance is None:
237
+ _server_instance = MCPServer()
238
+ return _server_instance