mcp-server-test-CG-compatible / escalation_server.py
prthm11's picture
Update escalation_server.py
e82457c verified
import os
import json
import contextlib
from fastmcp import FastMCP, Context
from fastmcp.dependencies import CurrentContext
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import PlainTextResponse, JSONResponse
from starlette.middleware.cors import CORSMiddleware
import aiosmtplib
from email.message import EmailMessage
# --- EMAIL CONFIG ---
SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465"))
SENDER_EMAIL = os.getenv("SENDER_EMAIL")
SENDER_PASSWORD = os.getenv("SENDER_PASSWORD")
if not SENDER_EMAIL or not SENDER_PASSWORD:
print("WARNING: SMTP credentials are not set. Email tools will fail.")
# --- FastMCP init ---
mcp = FastMCP("escalation_server")
# --- Data ---
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
RESOURCE_PATH = os.path.join(CURRENT_DIR, "resource", "jemh114-min (1).md")
FACULTY_DATA = {
"Mr. Murty": {"email": "dharmin.naik@webashlar.com", "id": "T-MURTY-01"},
"Mrs. Sunita": {"email": "dummyaiapi03@gmail.com", "id": "T-SUNITA-03"},
"Mr. Patel": {"email": "dev.patel@webashlar.com", "id": "T-PATEL-05"},
"Dr. Rao": {"email": "pratham.lakdawala@webashlar.com", "id": "T-RAO-02"},
"Mr. Khanna": {"email": "dummyuseai@gmail.com", "id": "T-KHANNA-04"},
"Ms. Joshi": {"email": "dummyaiapi532@gmail.com", "id": "T-JOSHI-06"},
}
CLASS_ASSIGNMENTS = {
"10-A": {"Math": "Mr. Murty", "Science": "Dr. Rao"},
"10-B": {"Math": "Mrs. Sunita", "Science": "Mr. Khanna"},
"10-C": {"Math": "Mr. Patel", "Science": "Ms. Joshi"},
}
SUBJECT_MAP = {
"mathematics": "Math", "math": "Math", "maths": "Math",
"statistics": "Math", "stats": "Math",
"science": "Science", "physics": "Science", "biology": "Science",
}
def normalize_subject(subject: str) -> str:
if not subject:
return subject
return SUBJECT_MAP.get(subject.lower(), subject.title())
def read_resource_file() -> str:
if not os.path.exists(RESOURCE_PATH):
return "Error: Statistics markdown file not found."
with open(RESOURCE_PATH, "r", encoding="utf-8") as f:
return f.read()
def content_text(obj) -> dict:
text = json.dumps(obj, ensure_ascii=False) if isinstance(obj, (dict, list)) else str(obj)
return {"content": [{"type": "text", "text": text}]}
# --- RESOURCES ---
@mcp.resource("study://math/statistics")
def statistics_resource() -> dict:
return content_text({"id": "study://math/statistics", "text": read_resource_file()})
@mcp.resource("system://tutor/instructions")
def tutor_instructions() -> dict:
return content_text({"instructions": "1. Gateway only. 2. Max 3 attempts. 3. Escalate if limit reached."})
# --- TOOLS ---
# readOnlyHint=True → ChatGPT skips confirmation dialogs for these tools
@mcp.tool(annotations={"readOnlyHint": True})
async def explain_from_resource(concept: str, ctx: Context = CurrentContext()):
"""Explain a math or science concept using the available study material."""
attempts = (await ctx.get_state("math_attempts")) or 0
if attempts >= 3:
return content_text({"error": "TUTORING_LIMIT_REACHED", "next": "REQUEST_CLASS_ID"})
await ctx.set_state("math_attempts", attempts + 1)
return content_text({"attempt": attempts + 1, "material": read_resource_file(), "concept": concept})
@mcp.tool(annotations={"readOnlyHint": True})
async def get_teacher_info(subject: str, class_id: str) -> dict:
"""Get the assigned teacher name and ID for a given subject and class."""
std_subject = normalize_subject(subject)
clean = class_id.upper().replace("CLASS", "").replace(" ", "").strip()
std_class = f"{clean[:2]}-{clean[2:]}" if len(clean) == 3 else clean
class_data = CLASS_ASSIGNMENTS.get(std_class)
if not class_data:
return {"error": f"Class '{std_class}' not found. Valid: 10-A, 10-B, 10-C."}
teacher_name = class_data.get(std_subject)
if not teacher_name:
return {"error": f"No teacher for '{std_subject}' in '{std_class}'."}
teacher_details = FACULTY_DATA.get(teacher_name, {})
return {"name": teacher_name, "id": teacher_details.get("id"), "subject": std_subject}
@mcp.tool(annotations={"readOnlyHint": False, "openWorldHint": False})
async def escalate_to_teacher(
teacher_name: str,
subject: str,
student_query: str,
ai_response: str,
ctx: Context = CurrentContext(),
):
"""Escalate an unresolved student doubt to the assigned teacher via email."""
teacher_info = FACULTY_DATA.get(teacher_name)
recipient = teacher_info["email"] if teacher_info else SENDER_EMAIL
msg = EmailMessage()
msg["Subject"] = f"[Escalation] {subject} Doubt from Student"
msg["From"] = SENDER_EMAIL
msg["To"] = recipient
msg.set_content(
f"A student doubt was escalated after 3 AI attempts.\n\n"
f"Subject: {subject}\n"
f"Student Query: {student_query}\n"
f"Last AI Response: {ai_response}"
)
try:
await aiosmtplib.send(
msg, hostname=SMTP_SERVER, port=SMTP_PORT,
username=SENDER_EMAIL, password=SENDER_PASSWORD, use_tls=True,
)
await ctx.set_state("math_attempts", 0)
return content_text({"status": "success", "notified": recipient})
except Exception as e:
return content_text({"status": "error", "message": str(e)})
# --- MCP PROTOCOL CONSTANTS ---
PROTOCOL_VERSION = "2024-11-05"
# --- CRITICAL: Explicit MCP handshake handler ---
# HF Spaces returns HTML for cold requests. This handler intercepts
# the JSON-RPC initialize call BEFORE FastMCP and returns a valid
# MCP response with the correct headers ChatGPT checks for.
async def mcp_handshake(request: Request):
try:
body = await request.json()
except Exception:
return JSONResponse({"error": "Invalid JSON"}, status_code=400)
method = body.get("method")
req_id = body.get("id")
# 1. initialize
if method == "initialize":
return JSONResponse(
{
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {
"tools": {"listChanged": False},
"resources": {"listChanged": False},
"prompts": {"listChanged": False},
"logging": {},
},
"serverInfo": {"name": "escalation_server", "version": "1.0.0"},
},
},
headers={"MCP-Protocol-Version": PROTOCOL_VERSION},
)
# 2. notifications/initialized — no body needed, just 200
if method == "notifications/initialized":
return PlainTextResponse("", status_code=200)
# 3. tools/list — hardcoded, ZERO latency, no FastMCP delegation
if method == "tools/list":
return JSONResponse(
{
"jsonrpc": "2.0",
"id": req_id,
"result": {
"tools": [
{
"name": "explain_from_resource",
"description": "Explain a math or science concept using available study material.",
"inputSchema": {
"type": "object",
"properties": {
"concept": {"type": "string", "description": "The concept to explain"}
},
"required": ["concept"],
},
},
{
"name": "get_teacher_info",
"description": "Get the assigned teacher name and ID for a subject and class.",
"inputSchema": {
"type": "object",
"properties": {
"subject": {"type": "string", "description": "Subject name e.g. Math, Science"},
"class_id": {"type": "string", "description": "Class ID e.g. 10-A, 10-B, 10-C"},
},
"required": ["subject", "class_id"],
},
},
{
"name": "escalate_to_teacher",
"description": "Escalate an unresolved student doubt to the teacher via email.",
"inputSchema": {
"type": "object",
"properties": {
"teacher_name": {"type": "string"},
"subject": {"type": "string"},
"student_query": {"type": "string"},
"ai_response": {"type": "string"},
},
"required": ["teacher_name", "subject", "student_query", "ai_response"],
},
},
]
},
},
headers={"MCP-Protocol-Version": PROTOCOL_VERSION},
)
# All other calls (tools/call etc.) → delegate to FastMCP
return await mcp_asgi(request.scope, request._receive, request._send)
# Build FastMCP ASGI app
mcp_asgi = mcp.http_app(stateless_http=True)
async def health(request: Request):
return PlainTextResponse("ok")
app = Starlette(
routes=[
Route("/health", health, methods=["GET"]),
# This explicit route intercepts /mcp BEFORE HF can return HTML
Route("/mcp", mcp_handshake, methods=["POST", "GET", "OPTIONS"]),
# Fallback mount for any other FastMCP routes
Mount("/", app=mcp_asgi),
],
lifespan=mcp_asgi.lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://chatgpt.com", "https://chat.openai.com", "*"],
allow_methods=["GET", "POST", "OPTIONS", "DELETE"],
allow_headers=["*"],
expose_headers=["MCP-Protocol-Version"],
)
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "7860"))
uvicorn.run("escalation_server:app", host="0.0.0.0", port=port, workers=1)