"""
MCP Tools for Session Management and Quiz
"""

import json
import logging
import random
import uuid
from typing import Dict, List, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import Field


def register_tools(mcp: FastMCP, db_getter):
    """Register the quiz-session MCP tools on ``mcp``.

    Args:
        mcp: FastMCP server the tools are attached to via ``@mcp.tool``.
        db_getter: Zero-argument callable returning the database client
            (presumably a Firestore client -- confirm against the caller).
            It is invoked inside each tool call, not at registration time,
            so the client is only obtained when a tool actually runs.

    Every tool returns a string: pretty-printed JSON on success, or an error
    message. Exceptions raised inside a tool are logged and converted into
    an error string rather than propagated to the caller.
    """
    logger = logging.getLogger(__name__)

    @mcp.tool(
        title="Add Player to Session",
        description="Add a player to a session with their pseudo and score in the database",
    )
    async def add_player(
        session_id: str = Field(description="The ID of the session to add the player to"),
        player_pseudo: str = Field(description="The pseudo/username of the player to add"),
    ) -> str:
        """Add a player with score 0 to a session, creating the session if needed.

        Returns a JSON string describing the player. When a player with the
        same pseudo is already present, nothing is written and the existing
        score is returned with ``already_exists`` set to true.

        NOTE(review): this is a plain read-modify-write; two concurrent calls
        on the same session can overwrite each other's player list.
        """
        try:
            db = db_getter()
            session_ref = db.collection('quiz_sessions').document(session_id)
            session_doc = session_ref.get()
            # No debug print here: stdout may carry the MCP protocol stream.
            session_exists = session_doc.exists

            session_data = session_doc.to_dict() if session_exists else {}
            players = session_data.get('players', [])

            existing_player = next(
                (p for p in players if p.get('pseudo') == player_pseudo),
                None,
            )
            if existing_player is not None:
                return json.dumps(
                    {
                        "player_pseudo": player_pseudo,
                        "session_id": session_id,
                        "score": existing_player.get('score', 0),
                        "message": f"Player '{player_pseudo}' already exists in session '{session_id}'",
                        "already_exists": True,
                    },
                    indent=2,
                )

            # Short identifier: the first 8 hex digits of a random UUID.
            player_id = uuid.uuid4().hex[:8]
            players.append(
                {
                    'pseudo': player_pseudo,
                    'score': 0,
                    'player_id': player_id,
                }
            )

            if session_exists:
                # Write only the field that changed so a stale snapshot of the
                # other session fields is never written back.
                session_ref.set({'players': players}, merge=True)
            else:
                # Imported lazily, like the database client itself.
                from firebase_admin import firestore

                # One write creates the session together with its timestamp,
                # so a failure cannot leave a session without 'created_at'.
                session_ref.set(
                    {
                        'players': players,
                        'created_at': firestore.SERVER_TIMESTAMP,
                    }
                )

            return json.dumps(
                {
                    "player_id": player_id,
                    "player_pseudo": player_pseudo,
                    "session_id": session_id,
                    "score": 0,
                    "message": f"Successfully added player '{player_pseudo}' to session '{session_id}'",
                    "already_exists": False,
                },
                indent=2,
            )

        except Exception as e:
            logger.exception("add_player failed for session %r", session_id)
            return f"Error adding player to session: {e}"

    @mcp.tool(
        title="Get Next Question",
        description="Get the next question for a quiz session",
    )
    async def get_next_question(
        session_id: str = Field(description="The ID of the quiz session"),
    ) -> str:
        """Return the question at the session's stored ``current_question`` index.

        This tool only reads: it does not advance ``current_question``. The
        returned JSON contains the question id, text, options and 1-based
        position; the question's ``correct`` field is not included.
        """
        try:
            db = db_getter()
            session_doc = db.collection('quiz_sessions').document(session_id).get()

            if not session_doc.exists:
                return f"Session '{session_id}' not found"

            session_data = session_doc.to_dict()
            questions = session_data.get('questions', [])
            current_question_index = session_data.get('current_question', 0)
            total_questions = len(questions)

            if not questions:
                return json.dumps(
                    {
                        "error": "No questions available",
                        "message": "This session has no questions configured. Please create the session via the backend API first.",
                    },
                    indent=2,
                )

            if current_question_index >= total_questions:
                # NOTE(review): the reported position is index + 1, which
                # exceeds total_questions once the quiz is over.
                return json.dumps(
                    {
                        "finished": True,
                        "message": "Quiz finished - no more questions available",
                        "total_questions": total_questions,
                        "current_question": current_question_index + 1,
                    },
                    indent=2,
                )

            current_question = questions[current_question_index]
            return json.dumps(
                {
                    "question_id": current_question.get('id'),
                    "question_text": current_question.get('question'),
                    "options": current_question.get('options', []),
                    "question_number": current_question_index + 1,
                    "total_questions": total_questions,
                },
                indent=2,
            )

        except Exception as e:
            logger.exception("get_next_question failed for session %r", session_id)
            return f"Error getting next question: {e}"

    @mcp.tool(
        title="Submit Answer",
        description="Submit answer, validate it, and update score in one operation for the current user",
    )
    async def submit_answer(
        session_id: str = Field(description="The ID of the quiz session"),
        player_pseudo: str = Field(description="The pseudo/playername of the player"),
        answer_index: int = Field(description="The selected answer index (0-3)"),
    ) -> str:
        """Check an answer against the current question and update the score.

        The answer is correct when ``answer_index`` equals the current
        question's ``correct`` value; a correct answer adds 1 to the player's
        score. Returns a JSON string with the verdict and the player's score.

        NOTE(review): this is a read-modify-write without a transaction, so
        it is not atomic, and nothing here prevents a player from answering
        the same question more than once.
        """
        try:
            db = db_getter()
            session_ref = db.collection('quiz_sessions').document(session_id)
            session_doc = session_ref.get()

            if not session_doc.exists:
                return f"Session '{session_id}' not found"

            session_data = session_doc.to_dict()
            questions = session_data.get('questions', [])
            current_question_index = session_data.get('current_question', 0)
            players = session_data.get('players', [])

            if current_question_index >= len(questions):
                return "Quiz finished - no active question"

            current_question = questions[current_question_index]
            correct_answer = current_question.get('correct')
            is_correct = answer_index == correct_answer

            player = next(
                (p for p in players if p.get('pseudo') == player_pseudo),
                None,
            )
            if player is None:
                return f"User '{player_pseudo}' not found in session '{session_id}'"

            if is_correct:
                player['score'] = player.get('score', 0) + 1
                # Only the players field changed; a wrong answer changes
                # nothing, so no write is issued for it at all.
                session_ref.set({'players': players}, merge=True)
            new_score = player.get('score', 0)

            return json.dumps(
                {
                    "correct": is_correct,
                    "correct_answer_index": correct_answer,
                    "submitted_answer_index": answer_index,
                    "player_pseudo": player_pseudo,
                    "new_score": new_score,
                    "question_id": current_question.get('id'),
                },
                indent=2,
            )

        except Exception as e:
            logger.exception("submit_answer failed for session %r", session_id)
            return f"Error submitting answer: {e}"

    @mcp.tool(
        title="Get Live Scores",
        description="Get the current scoreboard for a quiz session for all players",
    )
    async def get_scores(
        session_id: str = Field(description="The ID of the quiz session"),
    ) -> str:
        """Return the session scoreboard, highest score first, as JSON.

        The result also carries the 1-based ``current_question`` position and
        the total number of questions in the session.
        """
        try:
            db = db_getter()
            session_doc = db.collection('quiz_sessions').document(session_id).get()

            if not session_doc.exists:
                return f"Session '{session_id}' not found"

            session_data = session_doc.to_dict()
            current_question = session_data.get('current_question', 0)
            total_questions = len(session_data.get('questions', []))

            # sorted() is stable, so players with equal scores keep their
            # stored order, exactly as an in-place list.sort() would.
            scores = sorted(
                (
                    {"pseudo": p.get('pseudo'), "score": p.get('score', 0)}
                    for p in session_data.get('players', [])
                ),
                key=lambda entry: entry["score"],
                reverse=True,
            )

            return json.dumps(
                {
                    "scores": scores,
                    "current_question": current_question + 1,
                    "total_questions": total_questions,
                    "session_id": session_id,
                },
                indent=2,
            )

        except Exception as e:
            logger.exception("get_scores failed for session %r", session_id)
            return f"Error getting scores: {e}"