mcpOptimizer / src /mcp /mcp_server.py
anouar-bm's picture
ai
498af49
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import spacy
import tiktoken
from lemminflect import getLemma
import re
from llm_optimizer import (
optimize_with_llm,
optimize_with_agent,
get_accurate_token_count,
PERSONAS
)
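# Alternative import from the standalone fastmcp package (disabled):
# from fastmcp import FastMCP
# FastMCP is imported from the `mcp` package (the MCP SDK used with Claude) instead.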
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from pydantic import Field
# ============================================================================
# spaCy Optimizer (kept separate as it's unique to this implementation)
# ============================================================================
class AdvancedPromptOptimizer:
    """Shorten prompts with regex rewrite rules followed by a spaCy token filter.

    Code spans, URLs and comparator phrases are swapped for placeholders
    before optimization and swapped back afterwards, so they reach the
    output verbatim.
    """

    # Placeholder format emitted by _mask_spans, e.g. "<CODE0>" or "<URL3>".
    _PLACEHOLDER_RE = re.compile(r"(<(?:CODE|IC|URL|CMP)\d+>)")

    def __init__(self):
        """Load the spaCy pipeline and the tiktoken encoding used for counting."""
        self.nlp = spacy.load("en_core_web_sm")
        # Take negations out of the pipeline defaults' stop-word set.
        self.nlp.Defaults.stop_words -= {"not", "no", "never"}
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        # Words that _linguistic_optimize always keeps, whatever their POS tag.
        self.negation_words = {"not", "no", "never", "without", "except"}

    def _mask_spans(self, s):
        """Replace protected spans in ``s`` with unique placeholders.

        Protected spans, in masking order: triple-backtick blocks, inline
        code, URLs, and the comparator phrases "less than" / "at least" /
        "no more than".

        Returns:
            ``(masked_text, masks)`` where ``masks`` maps each placeholder
            to the exact text it replaced, in insertion order.
        """
        masks = {}

        def protect(label):
            # Build a re.sub callback that records the match and returns the
            # placeholder. Returning the key (not the matched text) is what
            # actually removes the span from the text.
            def _replace(match):
                key = f"<{label}{len(masks)}>"
                masks[key] = match.group(0)
                return key
            return _replace

        # triple backticks
        s = re.sub(r"```.*?```", protect("CODE"), s, flags=re.S)
        # inline code
        s = re.sub(r"`[^`]+`", protect("IC"), s)
        # urls
        s = re.sub(r"https?://\S+", protect("URL"), s)
        # comparators
        s = re.sub(
            r"\b(less than|at least|no more than)\b",
            protect("CMP"),
            s,
            flags=re.I,
        )
        return s, masks

    def _unmask_spans(self, s, masks):
        """Put the original spans back in place of their placeholders.

        Restoration runs newest-first: a later mask (e.g. a URL) may have
        swallowed an earlier placeholder, which only becomes visible again
        once the later mask has been expanded.
        """
        for key, original in reversed(list(masks.items())):
            s = s.replace(key, original)
        return s

    def optimize(self, prompt: str, aggressiveness: float = 0.7) -> tuple:
        """Optimize ``prompt`` and report token counts.

        Args:
            prompt: Text to shorten.
            aggressiveness: Threshold passed to the rule and linguistic
                passes; higher values enable more rewrites.

        Returns:
            ``(optimized_text, original_token_count, new_token_count)``.
            Counts come from the tiktoken encoding; if encoding fails they
            fall back to whitespace-separated word counts.
        """
        masked_prompt, masks = self._mask_spans(prompt)
        optimized = self._apply_rules(masked_prompt, aggressiveness)
        optimized = self._linguistic_optimize(optimized, aggressiveness)
        # Normalize whitespace while the protected spans are still masked so
        # newlines/indentation inside code blocks are not flattened.
        optimized = re.sub(r"\s+", " ", optimized).strip()
        optimized = self._unmask_spans(optimized, masks)
        try:
            orig_tokens = len(self.tokenizer.encode(prompt))
            new_tokens = len(self.tokenizer.encode(optimized))
        except Exception:
            # Best-effort fallback; do not swallow KeyboardInterrupt/SystemExit.
            orig_tokens = len(prompt.split())
            new_tokens = len(optimized.split())
        return optimized, orig_tokens, new_tokens

    def _apply_rules(self, text: str, aggressiveness: float) -> str:
        """Apply every regex rewrite whose priority is <= ``aggressiveness``.

        Each rule is ``(pattern, replacement, priority)``; matching is
        case-insensitive and rules run in the listed order.
        """
        rules = [
            (r"\s{2,}", " ", 0.0),
            # Collapse an immediately repeated word. Letters only, so repeated
            # numbers ("10 10") are left alone as data.
            (r"\b([^\W\d_]+)\s+\1\b", r"\1", 0.0),
            (r"\b(advantages and disadvantages)\b", "pros/cons", 0.5),
            (r"\b(in a detailed manner|in a detailed way)\b", "", 0.7),
            (r"\b(I want to|I need to|I would like to)\b", "", 0.7),
            # "i.e." is deliberately not rewritten: it means "that is", not
            # "for example", so mapping it to "e.g." would change the meaning.
            (r"\b(for example|such as)\b", "e.g.", 0.8),
            (r"\b(please\s+)?(kindly\s+)?(carefully|very|extremely|really|quite)\b", "", 0.8),
            (r"\b(can you|could you|would you)\b", "", 0.9),
            (r"\b(output|provide|give|return)\s+in\s+(JSON|json)\s+format\b", "JSON:", 1.0),
        ]
        for pattern, repl, priority in rules:
            if aggressiveness >= priority:
                text = re.sub(pattern, repl, text, flags=re.IGNORECASE)
        return text

    def _linguistic_optimize(self, text: str, aggressiveness: float) -> str:
        """Filter tokens with spaCy, passing mask placeholders through untouched.

        The text is split on placeholders; only the pieces in between are
        parsed, so a placeholder can never be split or dropped by the
        tokenizer. Blank input is returned unchanged.
        """
        if not text.strip():
            return text
        out = []
        for segment in self._PLACEHOLDER_RE.split(text):
            if self._PLACEHOLDER_RE.fullmatch(segment):
                out.append(segment)
            elif segment.strip():
                out.extend(self._filter_tokens(self.nlp(segment), aggressiveness))
        return " ".join(out)

    def _filter_tokens(self, doc, aggressiveness: float) -> list:
        """Return the token texts of ``doc`` that survive the POS-based filter."""
        out = []
        for token in doc:
            # NOTE(review): spaCy's tokenizer normally splits a trailing ":"
            # into its own token, so this header check may never match - confirm.
            if token.text.lower() in ["deliverables:", "constraints:", "metrics:"] and token.is_sent_start:
                out.append(token.text)
                continue
            if token.pos_ in ("PUNCT", "SPACE"):
                continue
            # Numbers, named entities and negations always survive.
            if token.like_num or token.ent_type_ or token.dep_ == "neg" or token.text.lower() in self.negation_words:
                out.append(token.text)
                continue
            if token.pos_ in ("PROPN", "NUM", "NOUN", "ADJ"):
                out.append(token.text)
                continue
            if token.pos_ == "VERB":
                if aggressiveness >= 0.8:
                    # Prefer lemminflect's lemma; fall back to spaCy's.
                    lemma = getLemma(token.text, upos="VERB") or [token.lemma_]
                    out.append(lemma[0])
                else:
                    out.append(token.text)
                continue
            if token.pos_ in ("ADV", "DET", "PRON"):
                # Dropped once aggressiveness reaches 0.6.
                if aggressiveness < 0.6:
                    out.append(token.text)
                continue
            out.append(token.text)
        return out
# ============================================================================
# MCP Server
# ============================================================================
# Server instance that the tool, resource and prompt decorators below register
# against; created with the name "PromptOptimizer" and log_level "ERROR".
mcp = FastMCP(name="PromptOptimizer", log_level="ERROR")
# Shared optimizer for the 'spacy' method, created on first use so the spaCy
# model is loaded once per process instead of once per request.
_SPACY_OPTIMIZER = None


def _get_spacy_optimizer():
    """Return the process-wide AdvancedPromptOptimizer, building it on first call."""
    global _SPACY_OPTIMIZER
    if _SPACY_OPTIMIZER is None:
        _SPACY_OPTIMIZER = AdvancedPromptOptimizer()
    return _SPACY_OPTIMIZER


@mcp.tool(
    name="optimize_prompt",
    description="Optimizes a given prompt using various methods to reduce token count while preserving meaning.",
)
def optimize_prompt(
    prompt: str = Field(description="The prompt to optimize"),
    method: str = Field(default="simple", description="The optimization method to use. Can be 'simple', 'agent', or 'spacy'"),
    persona: str = Field(default="Default", description="The persona to use for LLM-based optimization"),
    api_key: str | None = Field(default=None, description="The API key for the LLM"),
    tavily_api_key: str | None = Field(default=None, description="The API key for Tavily search"),
    aggressiveness: float = Field(default=0.7, description="The aggressiveness level for spaCy-based optimization"),
) -> str:
    """Optimize ``prompt`` with the selected method and return the result.

    Args:
        prompt: Text to optimize.
        method: 'simple' (LLM), 'agent' (LLM plus Tavily search) or 'spacy'
            (local rule/spaCy optimizer).
        persona: Persona forwarded to the LLM-based methods.
        api_key: Required for 'simple' and 'agent'.
        tavily_api_key: Additionally required for 'agent'.
        aggressiveness: Forwarded to the spaCy optimizer.

    Returns:
        The optimized prompt, or a plain-text message when a required key is
        missing or ``method`` is not recognized.
    """
    if method == "simple":
        if not api_key:
            return "API key is required for simple optimization."
        return optimize_with_llm(prompt, api_key, persona)
    if method == "agent":
        if not api_key or not tavily_api_key:
            return "API key and Tavily API key are required for agent-based optimization."
        return optimize_with_agent(prompt, api_key, persona, tavily_api_key)
    if method == "spacy":
        # Only the optimized text is returned; the token counts are discarded.
        optimized, _, _ = _get_spacy_optimizer().optimize(prompt, aggressiveness)
        return optimized
    return "Invalid method. Use 'simple', 'agent', or 'spacy'."
@mcp.tool(
    name="get_available_personas",
    description="Get list of available optimization personas and their descriptions.",
)
def get_available_personas() -> str:
    """Return one "- name: summary..." line per persona, newline-joined.

    The summary is the part of each persona description before its first
    period.
    """
    lines = []
    for name, description in PERSONAS.items():
        summary = description.split('.')[0]
        lines.append(f"- {name}: {summary}...")
    return "\n".join(lines)
@mcp.tool(
    name="count_tokens",
    description="Count tokens in text using specified model tokenizer.",
)
def count_tokens(
    text: str = Field(description="The text to count tokens for"),
    model: str = Field(default="gpt-4", description="The model to use for tokenization"),
) -> str:
    """Return "Token count: N", where N comes from get_accurate_token_count."""
    return f"Token count: {get_accurate_token_count(text, model)}"
@mcp.resource("prompts://optimization", mime_type="application/json")
def list_optimization_methods() -> list[str]:
    """List the method names accepted by the optimize_prompt tool."""
    supported = ("simple", "agent", "spacy")
    return list(supported)
@mcp.resource("prompts://personas", mime_type="application/json")
def list_personas() -> list[str]:
    """List the keys of PERSONAS."""
    return [persona_name for persona_name in PERSONAS.keys()]
@mcp.resource("prompts://personas/{persona_id}", mime_type="text/plain")
def fetch_persona_details(persona_id: str) -> str:
    """Return the PERSONAS entry for ``persona_id``.

    Raises:
        ValueError: If ``persona_id`` is not a key of PERSONAS.
    """
    if persona_id in PERSONAS:
        return PERSONAS[persona_id]
    raise ValueError(f"Persona with id {persona_id} not found")
@mcp.prompt(
    name="optimize_for_persona",
    description="Creates an optimization prompt tailored to a specific persona.",
)
def optimize_for_persona(
    text: str = Field(description="Text to optimize"),
    persona: str = Field(description="Persona to optimize for"),
) -> list[base.Message]:
    """Build a single user message asking for ``text`` to be optimized for ``persona``.

    A persona that is not a key of PERSONAS is replaced by "Default". The
    message embeds the text, the persona's PERSONAS entry, and an
    instruction to call the 'optimize_prompt' tool with method='simple'.
    """
    # Unknown personas fall back to the "Default" entry.
    persona = persona if persona in PERSONAS else "Default"
    guidelines = PERSONAS[persona]
    request = f"""
Your goal is to optimize the following text for the {persona} persona.
The text to optimize is:
<text>
{text}
</text>
Persona guidelines:
{guidelines}
Use the 'optimize_prompt' tool with method='simple' to optimize the text.
After optimization, respond with the optimized version and explain what changes were made.
"""
    return [base.UserMessage(request)]
if __name__ == "__main__":
    # Earlier invocation, kept for reference:
    #mcp.run(transport="http", host="127.0.0.1", port=8000, path="/mcp")
    # When executed as a script, start the server with the "streamable-http" transport.
    mcp.run(transport="streamable-http")