File size: 19,190 Bytes
2358888
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
"""
Token Estimator MCP Server.

Provides tools for simulating API integrations with cost estimation.
"""

import asyncio
import json
from typing import Any, Optional

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

from simulators.github import GitHubSimulator
from simulators.slack import SlackSimulator
from simulators.linear import LinearSimulator
from simulators.notion import NotionSimulator
from simulators.stripe import StripeSimulator
from simulators.sentry import SentrySimulator
from permissions.requirements import get_required_permissions
from permissions.validator import PermissionValidator
from pricing.providers import calculate_cost, compare_costs
from utils.session_tracker import SessionTracker
from utils.credential_manager import get_credential_manager
from utils.comparator import MockRealComparator
from utils.token_counter import get_token_counter


# Initialize server
app = Server("token-estimator")

# Initialize components
simulators = {
    'github': GitHubSimulator(),
    'slack': SlackSimulator(),
    'linear': LinearSimulator(),
    'notion': NotionSimulator(),
    'stripe': StripeSimulator(),
    'sentry': SentrySimulator(),
}

permission_validator = PermissionValidator()
session_tracker = SessionTracker()


@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available MCP tools."""
    return [
        Tool(
            name="simulate_integration",
            description=(
                "Simulate an API integration (GitHub, Slack, Linear, etc.) without making real calls. "
                "Returns realistic mock responses based on official API specs, estimates token costs, "
                "and validates permissions. Use this to test your AI agent's integration logic before "
                "deploying to production."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "service": {
                        "type": "string",
                        "description": "Service to integrate with",
                        "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"]
                    },
                    "action": {
                        "type": "string",
                        "description": "Action to perform (e.g., 'get_pull_request', 'post_message')"
                    },
                    "params": {
                        "type": "object",
                        "description": "Parameters for the action (varies by service/action)"
                    },
                    "credentials": {
                        "type": "object",
                        "description": "Optional credentials for permission validation",
                        "properties": {
                            "token": {"type": "string"},
                            "api_key": {"type": "string"}
                        }
                    },
                    "mock": {
                        "type": "boolean",
                        "description": "If true (default), return mock response. If false, make real API call.",
                        "default": True
                    },
                    "model": {
                        "type": "string",
                        "description": "LLM model to estimate costs for",
                        "enum": ["claude-sonnet-4", "claude-opus-4", "claude-haiku-4", "gpt-4-turbo", "gpt-4o", "gemini-pro"],
                        "default": "claude-sonnet-4"
                    }
                },
                "required": ["service", "action", "params"]
            }
        ),
        Tool(
            name="validate_permissions",
            description=(
                "Validate that provided credentials have the required permissions for an action. "
                "This helps catch permission issues during development before they cause production failures."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "service": {
                        "type": "string",
                        "description": "Service name",
                        "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"]
                    },
                    "action": {
                        "type": "string",
                        "description": "Action to validate permissions for"
                    },
                    "credentials": {
                        "type": "object",
                        "description": "Credentials to validate",
                        "properties": {
                            "token": {"type": "string"},
                            "api_key": {"type": "string"}
                        }
                    }
                },
                "required": ["service", "action", "credentials"]
            }
        ),
        Tool(
            name="get_session_report",
            description=(
                "Get a detailed report of all API calls made in this session, including total costs saved, "
                "token usage, and breakdowns by service. Use this to see how much money and time you've "
                "saved by using mock mode during development."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "format": {
                        "type": "string",
                        "description": "Report format",
                        "enum": ["text", "json"],
                        "default": "text"
                    }
                }
            }
        ),
        Tool(
            name="compare_model_costs",
            description=(
                "Compare estimated costs for a response across different LLM providers. "
                "Helps you choose the most cost-effective model for your use case."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "tokens_input": {
                        "type": "integer",
                        "description": "Number of input tokens"
                    },
                    "tokens_output": {
                        "type": "integer",
                        "description": "Number of output tokens"
                    }
                },
                "required": ["tokens_input", "tokens_output"]
            }
        ),
        Tool(
            name="list_available_actions",
            description=(
                "List all available actions for a specific service. "
                "Use this to discover what you can test."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "service": {
                        "type": "string",
                        "description": "Service to list actions for",
                        "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"]
                    }
                },
                "required": ["service"]
            }
        ),
        Tool(
            name="compare_mock_vs_real",
            description=(
                "Compare a mock response against a real API response to validate mock accuracy. "
                "Useful for ensuring your mocks stay up-to-date with the actual API."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "service": {
                        "type": "string",
                        "description": "Service name",
                        "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"]
                    },
                    "action": {
                        "type": "string",
                        "description": "Action to compare"
                    },
                    "params": {
                        "type": "object",
                        "description": "Parameters for the action"
                    },
                    "credentials": {
                        "type": "object",
                        "description": "Credentials for real API call"
                    }
                },
                "required": ["service", "action", "params", "credentials"]
            }
        ),
        Tool(
            name="manage_credentials",
            description=(
                "Manage stored credentials for real API calls. "
                "List, add, or check stored credentials."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "Command to execute",
                        "enum": ["list", "check", "info"]
                    },
                    "service": {
                        "type": "string",
                        "description": "Service name (for check/info commands)",
                        "enum": ["github", "slack", "linear", "notion", "stripe", "sentry"]
                    }
                },
                "required": ["command"]
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Handle tool calls."""
    
    try:
        if name == "simulate_integration":
            result = await handle_simulate_integration(arguments)
        elif name == "validate_permissions":
            result = await handle_validate_permissions(arguments)
        elif name == "get_session_report":
            result = await handle_get_session_report(arguments)
        elif name == "compare_model_costs":
            result = await handle_compare_costs(arguments)
        elif name == "list_available_actions":
            result = await handle_list_actions(arguments)
        elif name == "compare_mock_vs_real":
            result = await handle_compare_mock_real(arguments)
        elif name == "manage_credentials":
            result = await handle_manage_credentials(arguments)
        else:
            result = {"error": f"Unknown tool: {name}"}
        
        return [TextContent(type="text", text=json.dumps(result, indent=2))]
    
    except Exception as e:
        error_result = {
            "error": "TOOL_EXECUTION_ERROR",
            "message": str(e),
            "tool": name
        }
        return [TextContent(type="text", text=json.dumps(error_result, indent=2))]


async def handle_simulate_integration(args: dict) -> dict:
    """Handle simulate_integration tool call."""
    service = args['service']
    action = args['action']
    params = args['params']
    credentials = args.get('credentials')
    mock = args.get('mock', True)
    model = args.get('model', 'claude-sonnet-4')
    
    # Check if service is supported
    if service not in simulators:
        return {
            "error": "SERVICE_NOT_SUPPORTED",
            "message": f"Service '{service}' is not supported. Available services: {list(simulators.keys())}",
            "available_services": list(simulators.keys())
        }
    
    simulator = simulators[service]
    
    # Validate permissions if credentials provided
    permission_result = None
    if credentials:
        try:
            required_scopes = get_required_permissions(service, action)['scopes']
            permission_result = permission_validator.validate(
                service, action, credentials, set(required_scopes)
            )
            
            if not permission_result['valid']:
                # Return permission error but continue with mock
                response = {
                    "warning": "PERMISSION_VALIDATION_FAILED",
                    "permission_error": permission_result,
                    "note": "Continuing with mock response, but this would fail in production!"
                }
        except Exception as e:
            permission_result = {"error": str(e)}
    
    # Execute the simulation
    response = simulator.execute(action, params, mock=mock, credentials=credentials)
    
    # Estimate tokens and cost
    if response.get('success'):
        tokens = simulator.estimate_tokens(response.get('data', {}))
        cost = calculate_cost(tokens, 0, model)  # Simplified: assuming all input tokens
        
        # Add cost info to response
        response['cost_estimate'] = {
            'model': model,
            'tokens_estimated': tokens,
            'cost_usd': f'${cost:.6f}',
            'note': 'Mock mode - you saved this cost!' if mock else 'Real API call - cost incurred'
        }
        
        # Track in session
        session_tracker.record_call(
            service=service,
            action=action,
            mode='mock' if mock else 'real',
            tokens_estimated=tokens,
            cost_estimated=cost,
            model=model,
            success=True
        )
    
    # Add permission validation result if available
    if permission_result:
        response['permission_validation'] = permission_result
    
    return response


async def handle_validate_permissions(args: dict) -> dict:
    """Handle validate_permissions tool call."""
    service = args['service']
    action = args['action']
    credentials = args['credentials']
    
    try:
        required_perms = get_required_permissions(service, action)
        required_scopes = set(required_perms['scopes'])
        
        validation_result = permission_validator.validate(
            service, action, credentials, required_scopes
        )
        
        return {
            "service": service,
            "action": action,
            "required_permissions": list(required_scopes),
            "validation": validation_result,
            "documentation": required_perms.get('docs_url'),
            "description": required_perms.get('description')
        }
    
    except Exception as e:
        return {
            "error": "VALIDATION_ERROR",
            "message": str(e),
            "service": service,
            "action": action
        }


async def handle_get_session_report(args: dict) -> dict:
    """Handle get_session_report tool call."""
    format_type = args.get('format', 'text')
    
    if format_type == 'json':
        return session_tracker.get_summary()
    else:
        return {
            "report": session_tracker.get_detailed_report()
        }


async def handle_compare_costs(args: dict) -> dict:
    """Handle compare_model_costs tool call."""
    tokens_input = args['tokens_input']
    tokens_output = args['tokens_output']
    
    comparison = compare_costs(tokens_input, tokens_output)
    
    # Format for readability
    formatted = {
        "tokens": {
            "input": tokens_input,
            "output": tokens_output,
            "total": tokens_input + tokens_output
        },
        "cost_comparison": comparison,
        "cheapest": list(comparison.keys())[0],
        "most_expensive": list(comparison.keys())[-1]
    }
    
    return formatted


async def handle_list_actions(args: dict) -> dict:
    """Handle list_available_actions tool call."""
    service = args['service']
    
    if service not in simulators:
        return {
            "error": "SERVICE_NOT_SUPPORTED",
            "message": f"Service '{service}' not yet implemented",
            "available_services": list(simulators.keys())
        }
    
    simulator = simulators[service]
    actions = simulator.get_action_list()
    
    # Get details for each action
    action_details = []
    for action in actions:
        try:
            perms = get_required_permissions(service, action)
            action_details.append({
                "action": action,
                "description": perms['description'],
                "required_scopes": perms['scopes'],
                "docs": perms.get('docs_url')
            })
        except:
            action_details.append({
                "action": action,
                "description": "No description available"
            })
    
    return {
        "service": service,
        "available_actions": action_details,
        "count": len(action_details)
    }


async def main():
    """Run the MCP server."""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())


async def handle_compare_mock_real(args: dict) -> dict:
    """Handle compare_mock_vs_real tool call."""
    service = args['service']
    action = args['action']
    params = args['params']
    credentials = args['credentials']
    
    if service not in simulators:
        return {
            "error": "SERVICE_NOT_SUPPORTED",
            "message": f"Service '{service}' not supported"
        }
    
    simulator = simulators[service]
    
    # Execute mock
    mock_response = simulator.execute(action, params, mock=True)
    
    # Execute real
    real_response = simulator.execute(action, params, mock=False, credentials=credentials)
    
    # Compare
    comparator = MockRealComparator()
    comparison = comparator.compare(mock_response, real_response)
    
    return {
        "service": service,
        "action": action,
        "mock_response": mock_response,
        "real_response": real_response,
        "comparison": comparison,
        "report": comparator.generate_report(comparison)
    }


async def handle_manage_credentials(args: dict) -> dict:
    """Handle manage_credentials tool call."""
    command = args['command']
    credential_mgr = get_credential_manager()
    
    if command == "list":
        services = credential_mgr.list_services()
        return {
            "command": "list",
            "services_with_credentials": services,
            "count": len(services)
        }
    
    elif command == "check":
        service = args.get('service')
        if not service:
            return {"error": "SERVICE_REQUIRED", "message": "Service name required for check command"}
        
        has_creds = credential_mgr.has_credentials(service)
        masked = credential_mgr.get_masked_credentials(service) if has_creds else None
        
        return {
            "command": "check",
            "service": service,
            "has_credentials": has_creds,
            "credentials": masked
        }
    
    elif command == "info":
        service = args.get('service')
        if not service:
            return {"error": "SERVICE_REQUIRED", "message": "Service name required for info command"}
        
        has_creds = credential_mgr.has_credentials(service)
        
        return {
            "command": "info",
            "service": service,
            "has_credentials": has_creds,
            "can_use_real_mode": has_creds,
            "note": "Credentials are loaded from environment variables or ~/.token-estimator/credentials.json"
        }
    
    return {"error": "UNKNOWN_COMMAND", "message": f"Unknown command: {command}"}