# File size: 8,459 Bytes
# 498af49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import spacy
import tiktoken
from lemminflect import getLemma
import re
from llm_optimizer import (
    optimize_with_llm, 
    optimize_with_agent, 
    get_accurate_token_count,
    PERSONAS
)
from fastmcp import FastMCP
from pydantic import Field

# ============================================================================
# spaCy Optimizer (kept separate as it's unique to this implementation)
# ============================================================================

class AdvancedPromptOptimizer:
    """Shorten prompts with regex rewrite rules plus spaCy-based token filtering.

    Spans that must survive verbatim (fenced code, inline code, URLs and a few
    comparator phrases) are swapped for placeholders before any rewriting and
    restored afterwards.
    """

    # Shape of the placeholders produced by ``_mask_spans``. Input that already
    # contains text of this shape may collide with a generated placeholder.
    _PLACEHOLDER_RE = re.compile(r"<(?:CODE|IC|URL|CMP)\d+>")

    def __init__(self):
        """Load the spaCy pipeline and the tiktoken encoding used for counting."""
        self.nlp = spacy.load("en_core_web_sm")
        self.nlp.Defaults.stop_words -= {"not", "no", "never"}
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        self.negation_words = {"not", "no", "never", "without", "except"}

    def _mask_spans(self, s):
        """Replace protected spans in ``s`` with placeholders.

        Returns:
            ``(masked_text, masks)`` where ``masks`` maps each placeholder to
            the exact text it replaced, in the order the spans were masked.
        """
        masks = {}

        def stash(prefix):
            def replace(match):
                key = f"<{prefix}{len(masks)}>"
                masks[key] = match.group(0)
                # Return the placeholder, not the matched text; pad it so it is
                # always a whitespace-delimited chunk of its own.
                return f" {key} "
            return replace

        # triple backticks
        s = re.sub(r"```.*?```", stash("CODE"), s, flags=re.S)
        # inline code
        s = re.sub(r"`[^`]+`", stash("IC"), s)
        # urls
        s = re.sub(r"https?://\S+", stash("URL"), s)
        # comparators
        s = re.sub(r"\b(less than|at least|no more than)\b", stash("CMP"), s, flags=re.I)
        return s, masks

    def _unmask_spans(self, s, masks):
        """Substitute every placeholder in ``s`` with its original text."""
        # Reverse insertion order: a later mask may have swallowed an earlier
        # placeholder, so the outer one has to be expanded first.
        for key, original in reversed(list(masks.items())):
            s = s.replace(key, original)
        return s

    def optimize(self, prompt: str, aggressiveness: float = 0.7) -> tuple:
        """Optimize ``prompt`` and report token counts.

        Args:
            prompt: Text to shorten.
            aggressiveness: Threshold compared against each rule's priority and
                against the cut-offs in ``_linguistic_optimize``.

        Returns:
            ``(optimized_text, original_token_count, optimized_token_count)``.
        """
        masked_prompt, masks = self._mask_spans(prompt)
        optimized = self._apply_rules(masked_prompt, aggressiveness)
        optimized = self._linguistic_optimize(optimized, aggressiveness)
        optimized = self._unmask_spans(optimized, masks)
        optimized = re.sub(r"\s+", " ", optimized).strip()

        try:
            orig_tokens = len(self.tokenizer.encode(prompt))
            new_tokens = len(self.tokenizer.encode(optimized))
        except Exception:
            # Best-effort fallback: approximate with whitespace-separated words
            # when the tokenizer rejects the text.
            orig_tokens = len(prompt.split())
            new_tokens = len(optimized.split())

        return optimized, orig_tokens, new_tokens

    def _apply_rules(self, text: str, aggressiveness: float) -> str:
        """Apply each regex rule whose priority is <= ``aggressiveness``."""
        rules = [
            (r"\s{2,}", " ", 0.0),
            (r"\b(\w+)\s+\1\b", r"\1", 0.0),
            (r"\b(advantages and disadvantages)\b", "pros/cons", 0.5),
            (r"\b(in a detailed manner|in a detailed way)\b", "", 0.7),
            (r"\b(I want to|I need to|I would like to)\b", "", 0.7),
            # "i.e." is deliberately not rewritten to "e.g.": the two
            # abbreviations mean different things.
            (r"\b(for example|such as)\b", "e.g.", 0.8),
            (r"\b(please\s+)?(kindly\s+)?(carefully|very|extremely|really|quite)\b", "", 0.8),
            (r"\b(can you|could you|would you)\b", "", 0.9),
            (r"\b(output|provide|give|return)\s+in\s+(JSON|json)\s+format\b", "JSON:", 1.0),
        ]
        for pattern, repl, priority in rules:
            if aggressiveness >= priority:
                text = re.sub(pattern, repl, text, flags=re.IGNORECASE)
        return text

    def _linguistic_optimize(self, text: str, aggressiveness: float) -> str:
        """Drop low-information tokens based on spaCy part-of-speech tags.

        Placeholders from ``_mask_spans`` are emitted verbatim so they can be
        restored later; punctuation and whitespace tokens are removed.
        """
        if not text.strip():
            return text
        doc = self.nlp(text)
        # Character spans of placeholders, in text order.
        placeholders = [
            (m.start(), m.end(), m.group(0))
            for m in self._PLACEHOLDER_RE.finditer(text)
        ]
        cursor = 0
        emitted = -1
        out = []
        for token in doc:
            tok_start = token.idx
            tok_end = tok_start + len(token.text)
            while cursor < len(placeholders) and placeholders[cursor][1] <= tok_start:
                cursor += 1
            if cursor < len(placeholders) and placeholders[cursor][0] < tok_end:
                # The tokenizer may split "<CODE0>" into several tokens; emit
                # the whole placeholder once and skip its remaining pieces.
                if emitted != cursor:
                    out.append(placeholders[cursor][2])
                    emitted = cursor
                continue

            # NOTE(review): this only fires if the tokenizer keeps the trailing
            # colon attached to the header word - confirm it is reachable.
            if token.text.lower() in ["deliverables:", "constraints:", "metrics:"] and token.is_sent_start:
                out.append(token.text)
                continue

            if token.pos_ in ("PUNCT", "SPACE"):
                continue
            if token.like_num or token.ent_type_ or token.dep_ == "neg" or token.text.lower() in self.negation_words:
                out.append(token.text)
                continue
            if token.pos_ in ("PROPN", "NUM", "NOUN", "ADJ"):
                out.append(token.text)
                continue
            if token.pos_ == "VERB":
                if aggressiveness >= 0.8:
                    lemma = getLemma(token.text, upos="VERB") or [token.lemma_]
                    out.append(lemma[0])
                else:
                    out.append(token.text)
                continue
            if token.pos_ in ("ADV", "DET", "PRON"):
                # Kept only at low aggressiveness.
                if aggressiveness < 0.6:
                    out.append(token.text)
                continue
            out.append(token.text)
        return " ".join(out)

# ============================================================================
# FastMCP Server
# ============================================================================

# Server instance on which the tools, resources and prompt below are registered.
mcp = FastMCP("PromptOptimizer")

# Lazily created optimizer shared by all "spacy" requests, so the spaCy model
# and tiktoken encoding are loaded once instead of on every tool call.
_SPACY_OPTIMIZER = None


def _get_spacy_optimizer():
    """Return the shared AdvancedPromptOptimizer, creating it on first use."""
    global _SPACY_OPTIMIZER
    if _SPACY_OPTIMIZER is None:
        _SPACY_OPTIMIZER = AdvancedPromptOptimizer()
    return _SPACY_OPTIMIZER


@mcp.tool
def optimize_prompt(
    prompt: str = Field(description="The prompt to optimize"),
    method: str = Field(default="simple", description="The optimization method to use. Can be 'simple', 'agent', or 'spacy'"),
    persona: str = Field(default="Default", description="The persona to use for LLM-based optimization"),
    aggressiveness: float = Field(default=0.7, description="The aggressiveness level for spaCy-based optimization"),
) -> str:
    """Optimizes a given prompt using various methods to reduce token count while preserving meaning.

    Returns the optimizer's output for 'simple' and 'agent', a token-savings
    report followed by the optimized prompt for 'spacy', or a string starting
    with "Error:" when a required API key is missing or the method is unknown.
    """
    # Get API keys from environment variables (passed by MCP client)
    aimlapi_key = os.getenv("AIMLAPI_API_KEY")
    tavily_key = os.getenv("TAVILY_API_KEY")

    if method == "simple":
        if not aimlapi_key:
            return "Error: AIMLAPI_API_KEY environment variable is required for simple optimization"
        return optimize_with_llm(prompt, aimlapi_key, persona)

    if method == "agent":
        if not aimlapi_key or not tavily_key:
            return "Error: Both AIMLAPI_API_KEY and TAVILY_API_KEY environment variables are required for agent-based optimization"
        return optimize_with_agent(prompt, aimlapi_key, persona, tavily_key)

    if method == "spacy":
        # Reuse the cached optimizer; it keeps no per-request state.
        optimizer = _get_spacy_optimizer()
        optimized, orig_tokens, new_tokens = optimizer.optimize(prompt, aggressiveness)
        return f"Original tokens: {orig_tokens}\nOptimized tokens: {new_tokens}\nSavings: {orig_tokens - new_tokens} tokens\n\nOptimized prompt:\n{optimized}"

    return "Error: Invalid method. Use 'simple', 'agent', or 'spacy'"

@mcp.tool
def get_available_personas() -> str:
    """Return one bullet per persona: its name and the first sentence of its description."""
    bullets = []
    for persona, desc in PERSONAS.items():
        # Keep only the text before the first period as a short summary.
        summary = desc.split(".")[0]
        bullets.append(f"- {persona}: {summary}...")
    return "\n".join(bullets)

@mcp.tool
def count_tokens(
    text: str = Field(description="The text to count tokens for"),
    model: str = Field(default="gpt-4", description="The model to use for tokenization"),
) -> str:
    """Report the token count of ``text`` as computed by ``get_accurate_token_count`` for ``model``."""
    return f"Token count: {get_accurate_token_count(text, model)}"

@mcp.resource("config://optimization-methods")
def list_optimization_methods() -> list[str]:
    """Return the method names accepted by the ``optimize_prompt`` tool."""
    methods = ["simple", "agent", "spacy"]
    return methods

@mcp.resource("config://personas")
def list_personas() -> list[str]:
    """Return the names of all personas defined in ``PERSONAS``."""
    return [name for name in PERSONAS.keys()]

@mcp.resource("config://persona/{persona_id}")
def fetch_persona_details(persona_id: str) -> str:
    """Return the ``PERSONAS`` entry for ``persona_id``.

    Raises:
        ValueError: If ``persona_id`` is not a key of ``PERSONAS``.
    """
    if persona_id in PERSONAS:
        return PERSONAS[persona_id]
    raise ValueError(f"Persona with id {persona_id} not found")

@mcp.prompt
def optimize_for_persona(
    text: str = Field(description="Text to optimize"),
    persona: str = Field(description="Persona to optimize for"),
) -> str:
    """Build the instruction prompt asking for ``text`` to be optimized for a persona.

    A persona that is not a key of ``PERSONAS`` is replaced by "Default".
    """
    # Fall back to the default persona for unknown names.
    persona = persona if persona in PERSONAS else "Default"

    return f"""
Your goal is to optimize the following text for the {persona} persona.

The text to optimize is:
<text>
{text}
</text>

Persona guidelines:
{PERSONAS[persona]}

Use the 'optimize_prompt' tool with method='simple' to optimize the text.
After optimization, respond with the optimized version and explain what changes were made.
"""

if __name__ == "__main__":
    # Start the server only when this file is executed as a script, not on import.
    # Run with stdio transport (default for FastMCP)
    mcp.run()