Spaces:
Sleeping
Sleeping
| """ | |
| Token Estimator MCP Server. | |
| Provides tools for simulating API integrations with cost estimation. | |
| """ | |
| import asyncio | |
| import json | |
| from typing import Any, Optional | |
| from mcp.server import Server | |
| from mcp.server.stdio import stdio_server | |
| from mcp.types import Tool, TextContent | |
| from simulators.github import GitHubSimulator | |
| from simulators.slack import SlackSimulator | |
| from simulators.linear import LinearSimulator | |
| from simulators.notion import NotionSimulator | |
| from simulators.stripe import StripeSimulator | |
| from simulators.sentry import SentrySimulator | |
| from permissions.requirements import get_required_permissions | |
| from permissions.validator import PermissionValidator | |
| from pricing.providers import calculate_cost, compare_costs | |
| from utils.session_tracker import SessionTracker | |
| from utils.credential_manager import get_credential_manager | |
| from utils.comparator import MockRealComparator | |
| from utils.token_counter import get_token_counter | |
| # Initialize server | |
| app = Server("token-estimator") | |
| # Initialize components | |
| simulators = { | |
| 'github': GitHubSimulator(), | |
| 'slack': SlackSimulator(), | |
| 'linear': LinearSimulator(), | |
| 'notion': NotionSimulator(), | |
| 'stripe': StripeSimulator(), | |
| 'sentry': SentrySimulator(), | |
| } | |
| permission_validator = PermissionValidator() | |
| session_tracker = SessionTracker() | |
| async def list_tools() -> list[Tool]: | |
| """List available MCP tools.""" | |
| return [ | |
| Tool( | |
| name="simulate_integration", | |
| description=( | |
| "Simulate an API integration (GitHub, Slack, Linear, etc.) without making real calls. " | |
| "Returns realistic mock responses based on official API specs, estimates token costs, " | |
| "and validates permissions. Use this to test your AI agent's integration logic before " | |
| "deploying to production." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "service": { | |
| "type": "string", | |
| "description": "Service to integrate with", | |
| "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"] | |
| }, | |
| "action": { | |
| "type": "string", | |
| "description": "Action to perform (e.g., 'get_pull_request', 'post_message')" | |
| }, | |
| "params": { | |
| "type": "object", | |
| "description": "Parameters for the action (varies by service/action)" | |
| }, | |
| "credentials": { | |
| "type": "object", | |
| "description": "Optional credentials for permission validation", | |
| "properties": { | |
| "token": {"type": "string"}, | |
| "api_key": {"type": "string"} | |
| } | |
| }, | |
| "mock": { | |
| "type": "boolean", | |
| "description": "If true (default), return mock response. If false, make real API call.", | |
| "default": True | |
| }, | |
| "model": { | |
| "type": "string", | |
| "description": "LLM model to estimate costs for", | |
| "enum": ["claude-sonnet-4", "claude-opus-4", "claude-haiku-4", "gpt-4-turbo", "gpt-4o", "gemini-pro"], | |
| "default": "claude-sonnet-4" | |
| } | |
| }, | |
| "required": ["service", "action", "params"] | |
| } | |
| ), | |
| Tool( | |
| name="validate_permissions", | |
| description=( | |
| "Validate that provided credentials have the required permissions for an action. " | |
| "This helps catch permission issues during development before they cause production failures." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "service": { | |
| "type": "string", | |
| "description": "Service name", | |
| "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"] | |
| }, | |
| "action": { | |
| "type": "string", | |
| "description": "Action to validate permissions for" | |
| }, | |
| "credentials": { | |
| "type": "object", | |
| "description": "Credentials to validate", | |
| "properties": { | |
| "token": {"type": "string"}, | |
| "api_key": {"type": "string"} | |
| } | |
| } | |
| }, | |
| "required": ["service", "action", "credentials"] | |
| } | |
| ), | |
| Tool( | |
| name="get_session_report", | |
| description=( | |
| "Get a detailed report of all API calls made in this session, including total costs saved, " | |
| "token usage, and breakdowns by service. Use this to see how much money and time you've " | |
| "saved by using mock mode during development." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "format": { | |
| "type": "string", | |
| "description": "Report format", | |
| "enum": ["text", "json"], | |
| "default": "text" | |
| } | |
| } | |
| } | |
| ), | |
| Tool( | |
| name="compare_model_costs", | |
| description=( | |
| "Compare estimated costs for a response across different LLM providers. " | |
| "Helps you choose the most cost-effective model for your use case." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "tokens_input": { | |
| "type": "integer", | |
| "description": "Number of input tokens" | |
| }, | |
| "tokens_output": { | |
| "type": "integer", | |
| "description": "Number of output tokens" | |
| } | |
| }, | |
| "required": ["tokens_input", "tokens_output"] | |
| } | |
| ), | |
| Tool( | |
| name="list_available_actions", | |
| description=( | |
| "List all available actions for a specific service. " | |
| "Use this to discover what you can test." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "service": { | |
| "type": "string", | |
| "description": "Service to list actions for", | |
| "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"] | |
| } | |
| }, | |
| "required": ["service"] | |
| } | |
| ), | |
| Tool( | |
| name="compare_mock_vs_real", | |
| description=( | |
| "Compare a mock response against a real API response to validate mock accuracy. " | |
| "Useful for ensuring your mocks stay up-to-date with the actual API." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "service": { | |
| "type": "string", | |
| "description": "Service name", | |
| "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"] | |
| }, | |
| "action": { | |
| "type": "string", | |
| "description": "Action to compare" | |
| }, | |
| "params": { | |
| "type": "object", | |
| "description": "Parameters for the action" | |
| }, | |
| "credentials": { | |
| "type": "object", | |
| "description": "Credentials for real API call" | |
| } | |
| }, | |
| "required": ["service", "action", "params", "credentials"] | |
| } | |
| ), | |
| Tool( | |
| name="manage_credentials", | |
| description=( | |
| "Manage stored credentials for real API calls. " | |
| "List, add, or check stored credentials." | |
| ), | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "command": { | |
| "type": "string", | |
| "description": "Command to execute", | |
| "enum": ["list", "check", "info"] | |
| }, | |
| "service": { | |
| "type": "string", | |
| "description": "Service name (for check/info commands)", | |
| "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"] | |
| } | |
| }, | |
| "required": ["command"] | |
| } | |
| ) | |
| ] | |
| async def call_tool(name: str, arguments: Any) -> list[TextContent]: | |
| """Handle tool calls.""" | |
| try: | |
| if name == "simulate_integration": | |
| result = await handle_simulate_integration(arguments) | |
| elif name == "validate_permissions": | |
| result = await handle_validate_permissions(arguments) | |
| elif name == "get_session_report": | |
| result = await handle_get_session_report(arguments) | |
| elif name == "compare_model_costs": | |
| result = await handle_compare_costs(arguments) | |
| elif name == "list_available_actions": | |
| result = await handle_list_actions(arguments) | |
| elif name == "compare_mock_vs_real": | |
| result = await handle_compare_mock_real(arguments) | |
| elif name == "manage_credentials": | |
| result = await handle_manage_credentials(arguments) | |
| else: | |
| result = {"error": f"Unknown tool: {name}"} | |
| return [TextContent(type="text", text=json.dumps(result, indent=2))] | |
| except Exception as e: | |
| error_result = { | |
| "error": "TOOL_EXECUTION_ERROR", | |
| "message": str(e), | |
| "tool": name | |
| } | |
| return [TextContent(type="text", text=json.dumps(error_result, indent=2))] | |
| async def handle_simulate_integration(args: dict) -> dict: | |
| """Handle simulate_integration tool call.""" | |
| service = args['service'] | |
| action = args['action'] | |
| params = args['params'] | |
| credentials = args.get('credentials') | |
| mock = args.get('mock', True) | |
| model = args.get('model', 'claude-sonnet-4') | |
| # Check if service is supported | |
| if service not in simulators: | |
| return { | |
| "error": "SERVICE_NOT_SUPPORTED", | |
| "message": f"Service '{service}' is not supported. Available services: {list(simulators.keys())}", | |
| "available_services": list(simulators.keys()) | |
| } | |
| simulator = simulators[service] | |
| # Validate permissions if credentials provided | |
| permission_result = None | |
| if credentials: | |
| try: | |
| required_scopes = get_required_permissions(service, action)['scopes'] | |
| permission_result = permission_validator.validate( | |
| service, action, credentials, set(required_scopes) | |
| ) | |
| if not permission_result['valid']: | |
| # Return permission error but continue with mock | |
| response = { | |
| "warning": "PERMISSION_VALIDATION_FAILED", | |
| "permission_error": permission_result, | |
| "note": "Continuing with mock response, but this would fail in production!" | |
| } | |
| except Exception as e: | |
| permission_result = {"error": str(e)} | |
| # Execute the simulation | |
| response = simulator.execute(action, params, mock=mock, credentials=credentials) | |
| # Estimate tokens and cost | |
| if response.get('success'): | |
| tokens = simulator.estimate_tokens(response.get('data', {})) | |
| cost = calculate_cost(tokens, 0, model) # Simplified: assuming all input tokens | |
| # Add cost info to response | |
| response['cost_estimate'] = { | |
| 'model': model, | |
| 'tokens_estimated': tokens, | |
| 'cost_usd': f'${cost:.6f}', | |
| 'note': 'Mock mode - you saved this cost!' if mock else 'Real API call - cost incurred' | |
| } | |
| # Track in session | |
| session_tracker.record_call( | |
| service=service, | |
| action=action, | |
| mode='mock' if mock else 'real', | |
| tokens_estimated=tokens, | |
| cost_estimated=cost, | |
| model=model, | |
| success=True | |
| ) | |
| # Add permission validation result if available | |
| if permission_result: | |
| response['permission_validation'] = permission_result | |
| return response | |
| async def handle_validate_permissions(args: dict) -> dict: | |
| """Handle validate_permissions tool call.""" | |
| service = args['service'] | |
| action = args['action'] | |
| credentials = args['credentials'] | |
| try: | |
| required_perms = get_required_permissions(service, action) | |
| required_scopes = set(required_perms['scopes']) | |
| validation_result = permission_validator.validate( | |
| service, action, credentials, required_scopes | |
| ) | |
| return { | |
| "service": service, | |
| "action": action, | |
| "required_permissions": list(required_scopes), | |
| "validation": validation_result, | |
| "documentation": required_perms.get('docs_url'), | |
| "description": required_perms.get('description') | |
| } | |
| except Exception as e: | |
| return { | |
| "error": "VALIDATION_ERROR", | |
| "message": str(e), | |
| "service": service, | |
| "action": action | |
| } | |
| async def handle_get_session_report(args: dict) -> dict: | |
| """Handle get_session_report tool call.""" | |
| format_type = args.get('format', 'text') | |
| if format_type == 'json': | |
| return session_tracker.get_summary() | |
| else: | |
| return { | |
| "report": session_tracker.get_detailed_report() | |
| } | |
| async def handle_compare_costs(args: dict) -> dict: | |
| """Handle compare_model_costs tool call.""" | |
| tokens_input = args['tokens_input'] | |
| tokens_output = args['tokens_output'] | |
| comparison = compare_costs(tokens_input, tokens_output) | |
| # Format for readability | |
| formatted = { | |
| "tokens": { | |
| "input": tokens_input, | |
| "output": tokens_output, | |
| "total": tokens_input + tokens_output | |
| }, | |
| "cost_comparison": comparison, | |
| "cheapest": list(comparison.keys())[0], | |
| "most_expensive": list(comparison.keys())[-1] | |
| } | |
| return formatted | |
| async def handle_list_actions(args: dict) -> dict: | |
| """Handle list_available_actions tool call.""" | |
| service = args['service'] | |
| if service not in simulators: | |
| return { | |
| "error": "SERVICE_NOT_SUPPORTED", | |
| "message": f"Service '{service}' not yet implemented", | |
| "available_services": list(simulators.keys()) | |
| } | |
| simulator = simulators[service] | |
| actions = simulator.get_action_list() | |
| # Get details for each action | |
| action_details = [] | |
| for action in actions: | |
| try: | |
| perms = get_required_permissions(service, action) | |
| action_details.append({ | |
| "action": action, | |
| "description": perms['description'], | |
| "required_scopes": perms['scopes'], | |
| "docs": perms.get('docs_url') | |
| }) | |
| except: | |
| action_details.append({ | |
| "action": action, | |
| "description": "No description available" | |
| }) | |
| return { | |
| "service": service, | |
| "available_actions": action_details, | |
| "count": len(action_details) | |
| } | |
| async def main(): | |
| """Run the MCP server.""" | |
| async with stdio_server() as (read_stream, write_stream): | |
| await app.run( | |
| read_stream, | |
| write_stream, | |
| app.create_initialization_options() | |
| ) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |
| async def handle_compare_mock_real(args: dict) -> dict: | |
| """Handle compare_mock_vs_real tool call.""" | |
| service = args['service'] | |
| action = args['action'] | |
| params = args['params'] | |
| credentials = args['credentials'] | |
| if service not in simulators: | |
| return { | |
| "error": "SERVICE_NOT_SUPPORTED", | |
| "message": f"Service '{service}' not supported" | |
| } | |
| simulator = simulators[service] | |
| # Execute mock | |
| mock_response = simulator.execute(action, params, mock=True) | |
| # Execute real | |
| real_response = simulator.execute(action, params, mock=False, credentials=credentials) | |
| # Compare | |
| comparator = MockRealComparator() | |
| comparison = comparator.compare(mock_response, real_response) | |
| return { | |
| "service": service, | |
| "action": action, | |
| "mock_response": mock_response, | |
| "real_response": real_response, | |
| "comparison": comparison, | |
| "report": comparator.generate_report(comparison) | |
| } | |
| async def handle_manage_credentials(args: dict) -> dict: | |
| """Handle manage_credentials tool call.""" | |
| command = args['command'] | |
| credential_mgr = get_credential_manager() | |
| if command == "list": | |
| services = credential_mgr.list_services() | |
| return { | |
| "command": "list", | |
| "services_with_credentials": services, | |
| "count": len(services) | |
| } | |
| elif command == "check": | |
| service = args.get('service') | |
| if not service: | |
| return {"error": "SERVICE_REQUIRED", "message": "Service name required for check command"} | |
| has_creds = credential_mgr.has_credentials(service) | |
| masked = credential_mgr.get_masked_credentials(service) if has_creds else None | |
| return { | |
| "command": "check", | |
| "service": service, | |
| "has_credentials": has_creds, | |
| "credentials": masked | |
| } | |
| elif command == "info": | |
| service = args.get('service') | |
| if not service: | |
| return {"error": "SERVICE_REQUIRED", "message": "Service name required for info command"} | |
| has_creds = credential_mgr.has_credentials(service) | |
| return { | |
| "command": "info", | |
| "service": service, | |
| "has_credentials": has_creds, | |
| "can_use_real_mode": has_creds, | |
| "note": "Credentials are loaded from environment variables or ~/.token-estimator/credentials.json" | |
| } | |
| return {"error": "UNKNOWN_COMMAND", "message": f"Unknown command: {command}"} | |