Ronio Jerico Roque
Refactor database connection to use environment variable and update MCP server URL
33dc83e
import os
import asyncpg
from fastapi import FastAPI, HTTPException
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
@app.get("/")
async def read_root():
return {"message": "Server is running and healthy!"}
class ToolInput(BaseModel):
text: str
operation: str
class ToolOutput(BaseModel):
result: str
class QueryInput(BaseModel):
query: str
@app.post("/tools/echo", operation_id="echo_tool")
async def echo_tool(input_data: ToolInput) -> ToolOutput:
"""
A simple echo tool that returns the input text with a prefix
"""
try:
result = f"Echo: {input_data.text}"
return ToolOutput(result=result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/tools/query_postgres", operation_id="query_postgres")
async def query_postgres(input_data: QueryInput) -> ToolOutput:
"""
Query PostgreSQL database
"""
try:
# Only allow SELECT queries
if not input_data.query.lower().strip().startswith('select'):
return ToolOutput(result="Error: Only SELECT queries are allowed")
conn = await asyncpg.connect(dsn=os.getenv("db"))
rows = await conn.fetch(input_data.query + " LIMIT 10")
await conn.close()
if not rows:
return ToolOutput(result="No results found")
results = [dict(row) for row in rows]
return ToolOutput(result=f"Found {len(results)} rows: {results}")
except Exception as e:
return ToolOutput(result=f"Database error: {str(e)}")
# Initialize MCP with both tools
mcp = FastApiMCP(
app,
include_operations=["echo_tool", "query_postgres"]
)
mcp.mount()
if __name__ == "__main__":
import uvicorn
port = int(os.environ.get("PORT", "7860"))
uvicorn.run(app, host="0.0.0.0", port=port)