File size: 1,905 Bytes
b99cd1e
83a4aff
e94f698
 
 
 
b99cd1e
 
 
 
 
e94f698
b99cd1e
e94f698
 
 
 
 
 
 
83a4aff
 
 
e94f698
 
 
 
 
 
 
 
 
 
 
83a4aff
 
 
 
 
 
 
 
 
33dc83e
 
83a4aff
 
 
 
 
 
 
 
 
 
 
 
 
e94f698
 
83a4aff
 
e94f698
b99cd1e
 
 
e94f698
b99cd1e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import os
import asyncpg
from fastapi import FastAPI, HTTPException
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel
from typing import Optional

# FastAPI application instance; the @app.get / @app.post decorators below
# register their handlers on it, and it is what uvicorn serves in __main__.
app = FastAPI()

@app.get("/")
async def read_root():
    # Root route: answers with a fixed status payload and touches no other
    # state, so it can serve as a basic liveness probe.
    status_payload = {"message": "Server is running and healthy!"}
    return status_payload

# Request body accepted by the echo_tool endpoint.
class ToolInput(BaseModel):
    # Text that echo_tool returns with an "Echo: " prefix.
    text: str
    # Required field, but echo_tool in this file never reads it.
    operation: str

# Response body shared by both tool endpoints (echo_tool and query_postgres).
class ToolOutput(BaseModel):
    # Human-readable outcome: the echoed text, formatted query rows, or an
    # error message, depending on the endpoint and what happened.
    result: str

# Request body accepted by the query_postgres endpoint.
class QueryInput(BaseModel):
    # SQL text to execute; query_postgres rejects anything that does not
    # start with "select" (case-insensitive, after trimming whitespace).
    query: str

@app.post("/tools/echo", operation_id="echo_tool")
async def echo_tool(input_data: ToolInput) -> ToolOutput:
    """
    A simple echo tool that returns the input text with a prefix
    """
    # Only `text` is used; the `operation` field of the request is ignored.
    try:
        echoed = "Echo: {}".format(input_data.text)
    except Exception as exc:
        # Any failure while building the reply is surfaced as an HTTP 500.
        raise HTTPException(status_code=500, detail=str(exc))
    else:
        try:
            return ToolOutput(result=echoed)
        except Exception as exc:
            raise HTTPException(status_code=500, detail=str(exc))

@app.post("/tools/query_postgres", operation_id="query_postgres")
async def query_postgres(input_data: QueryInput) -> ToolOutput:
    """
    Run a read-only SELECT query against PostgreSQL and return at most 10 rows.

    Only statements that start with SELECT are accepted. Database failures
    are reported in the ``result`` text instead of being raised.
    """
    max_rows = 10  # hard cap on the number of rows handed back to the caller

    # Cheap first-line filter. It is not a security boundary on its own
    # (e.g. SELECT ... INTO or side-effecting functions still start with
    # "select"), which is why the query also runs in a read-only transaction.
    if not input_data.query.lower().strip().startswith('select'):
        return ToolOutput(result="Error: Only SELECT queries are allowed")

    try:
        # The DSN comes from the "db" environment variable.
        conn = await asyncpg.connect(dsn=os.getenv("db"))
        try:
            # readonly=True makes the server reject any write the statement
            # attempts. A cursor caps the row count without editing the
            # caller's SQL, so queries ending in ";" or carrying their own
            # LIMIT/ORDER BY/OFFSET still work (appending " LIMIT 10" broke
            # them). asyncpg cursors require an open transaction.
            async with conn.transaction(readonly=True):
                cursor = await conn.cursor(input_data.query)
                rows = await cursor.fetch(max_rows)
        finally:
            # Always release the connection, including when the query fails.
            await conn.close()

        if not rows:
            return ToolOutput(result="No results found")

        results = [dict(row) for row in rows]
        return ToolOutput(result=f"Found {len(results)} rows: {results}")

    except Exception as e:
        # Best-effort contract: connection, query and formatting errors all
        # come back as text rather than as an HTTP error.
        return ToolOutput(result=f"Database error: {str(e)}")

# Initialize MCP with both tools
# include_operations lists the operation_id values declared on the two
# @app.post tool routes above; the "/" route has no entry in the list.
mcp = FastApiMCP(
    app,
    include_operations=["echo_tool", "query_postgres"]
)
# NOTE(review): presumably attaches the MCP server to `app` -- confirm
# against the fastapi_mcp documentation.
mcp.mount()

if __name__ == "__main__":
    # Script entry point: serve `app` with uvicorn on every interface.
    import uvicorn

    # The port is read from the PORT environment variable, defaulting to 7860.
    listen_port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)