"""
FinMK MCP Server
================
A full Model Context Protocol (MCP) server that exposes every existing Django REST API
as an agent-callable tool. Zero business logic duplication — all tools proxy through
the existing Django REST endpoints and reuse all validation, MongoDB operations,
and business logic already in place.

Exception: the two tools in the "DIRECT MONGODB READ/WRITE TOOLS" section
(`get_my_full_data`, `query_mongodb`) read MongoDB directly instead of going
through the REST API.

Works in two modes:
  - Local dev: LM Studio (qwen3.5:2b) calls tools via MCP protocol
  - Production: Gemini / Cerebras function-calling integrates same tools via chatbot

Usage:
  python mcp_server.py              (for LM Studio / stdio)
  npx @modelcontextprotocol/inspector python mcp_server.py  (for browser inspector)
"""

import asyncio
import json
import os
import sys
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional

import httpx
import pdfplumber
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

load_dotenv()

# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────
mcp = FastMCP("FinMK AI Agent")

PORT = os.getenv("PORT", "8000")
BASE_URL = (os.getenv("API_BASE_URL") or f"http://localhost:{PORT}/api").rstrip("/")
# NOTE(review): this branch yields the same value the line above already
# produced when API_BASE_URL is unset; kept to preserve the original behavior.
if os.getenv("RENDER") == "true" and not os.getenv("API_BASE_URL"):
    BASE_URL = f"http://localhost:{PORT}/api".rstrip("/")

# Shared auth state — persists for the lifetime of the MCP session
_auth: Dict[str, Optional[str]] = {
    "access_token": None,
    "refresh_token": None,
    "username": None,
}

# Top-level fields stripped from MongoDB documents before they are returned.
_SENSITIVE_FIELDS = ("password", "is_staff", "is_superuser")


# ──────────────────────────────────────────────
# Private helpers
# ──────────────────────────────────────────────
def _log(message: str) -> None:
    """Write a '[MCP] '-prefixed diagnostic line to stderr.

    NOTE(review): presumably stderr is used because stdout carries the MCP
    stdio transport — confirm before redirecting logs elsewhere.
    """
    print(f"[MCP] {message}", file=sys.stderr)


def _url(path: str) -> str:
    """Join BASE_URL and an API path, tolerating a leading slash on `path`."""
    return f"{BASE_URL}/{path.lstrip('/')}"


def _headers() -> Dict[str, str]:
    """Return JSON request headers, plus a Bearer token when one is stored."""
    h = {"Content-Type": "application/json"}
    if _auth["access_token"]:
        h["Authorization"] = f"Bearer {_auth['access_token']}"
    return h


def _multipart_headers() -> Dict[str, str]:
    """Return `_headers()` without Content-Type so httpx can set the multipart boundary."""
    headers = _headers()
    headers.pop("Content-Type", None)
    return headers


def _require_auth() -> Optional[str]:
    """Returns an error string if not authenticated, else None."""
    if not _auth["access_token"]:
        return "Error: Not authenticated. Call 'authenticate_agent' first."
    return None


def _project_json(raw: str, pick: Callable[[Any], Any]) -> str:
    """Parse `raw` as JSON, apply `pick`, and return the result as indented JSON.

    Falls back to returning `raw` unchanged when it is not JSON (for example an
    "Error: ..." string produced by `_get`) or when `pick` fails on its shape.
    `except Exception` rather than a bare `except` so task cancellation and
    KeyboardInterrupt are not swallowed.
    """
    try:
        return json.dumps(pick(json.loads(raw)), indent=2)
    except Exception:
        return raw


def _filename_from_disposition(content_disposition: str, default: str) -> str:
    """Extract a safe bare filename from a Content-Disposition header value.

    Only the final path component is kept so a server-supplied name such as
    '../../x' cannot escape the download directory. Returns `default` when the
    header carries no usable `filename=` value.
    """
    if "filename=" not in content_disposition:
        return default
    raw = content_disposition.split("filename=", 1)[1].split(";", 1)[0]
    raw = raw.strip().strip('"')
    # Normalise Windows separators first so basename strips them on any OS.
    name = os.path.basename(raw.replace("\\", "/"))
    if name in ("", ".", ".."):
        return default
    return name


async def _handle_binary_save(r: httpx.Response) -> str:
    """Detects binary content (PDF/CSV) and saves to Downloads instead of returning raw bytes to Agent."""
    ctype = r.headers.get("Content-Type", "").lower()
    if "application/pdf" in ctype or "text/csv" in ctype:
        try:
            downloads_path = os.path.join(os.path.expanduser("~"), "Downloads")
            os.makedirs(downloads_path, exist_ok=True)

            # Extract filename from header or use a default based on content type
            default_name = "FinMK_Export.csv" if "text/csv" in ctype else "FinMK_Export.pdf"
            filename = _filename_from_disposition(
                r.headers.get("Content-Disposition", ""), default_name
            )

            filepath = os.path.join(downloads_path, filename)
            with open(filepath, "wb") as f:
                f.write(r.content)

            return f"SUCCESS: The file has been generated and saved directly to your Downloads folder: {filepath}"
        except Exception as e:
            return f"Error saving binary file: {e}"

    # Standard text/json response
    return r.text


async def _get(path: str, params: Optional[Dict[str, Any]] = None) -> str:
    """Authenticated GET. Returns the response text, a saved-file notice, or an 'Error: ...' string."""
    err = _require_auth()
    if err:
        return err
    url = _url(path)
    _log(f"GET {url}")
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            r = await client.get(url, headers=_headers(), params=params)
            _log(f"Response {r.status_code} from {url}")
            if r.status_code == 404:
                _log(f"404 ERROR Body: {r.text[:500]}")
            return await _handle_binary_save(r)
        except Exception as e:
            _log(f"Error: {e}")
            return f"Error: {e}"


async def _post(path: str, payload: dict) -> str:
    """Authenticated JSON POST. Returns the response text, a saved-file notice, or an 'Error: ...' string."""
    err = _require_auth()
    if err:
        return err
    url = _url(path)
    _log(f"POST {url} | Body: {json.dumps(payload)[:100]}...")
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            r = await client.post(url, json=payload, headers=_headers())
            _log(f"Response {r.status_code}")
            return await _handle_binary_save(r)
        except Exception as e:
            _log(f"Error: {e}")
            return f"Error: {e}"


async def _patch(path: str, payload: dict) -> str:
    """Authenticated JSON PATCH. Returns the response text or an 'Error: ...' string."""
    err = _require_auth()
    if err:
        return err
    url = _url(path)
    _log(f"PATCH {url} | Body: {json.dumps(payload)[:100]}...")
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            r = await client.patch(url, json=payload, headers=_headers())
            _log(f"Response {r.status_code}")
            return r.text
        except Exception as e:
            _log(f"Error: {e}")
            return f"Error: {e}"


async def _delete(path: str) -> str:
    """Authenticated DELETE.

    Returns a JSON success marker for 200/204, otherwise the response text,
    or an 'Error: ...' string on connection failure.
    """
    err = _require_auth()
    if err:
        return err
    url = _url(path)
    _log(f"DELETE {url}")
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            r = await client.delete(url, headers=_headers())
            _log(f"Response {r.status_code}")
            if r.status_code in (200, 204):
                return json.dumps({"success": True, "status": r.status_code})
            return r.text
        except Exception as e:
            _log(f"Error: {e}")
            return f"Error: {e}"


def _mongo_db() -> Any:
    """Configure Django on first use and return `MongoDBClient.get_client()`.

    Imports are local so the REST-only tools work without Django installed.
    """
    import django
    from django.conf import settings

    if not settings.configured:
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "expense_tracker.settings")
        django.setup()

    from expense_tracker.utils import MongoDBClient

    return MongoDBClient.get_client()


def _bson_to_json(obj: Any) -> str:
    """Serialise BSON-typed data (ObjectId, datetime, ...) as indented JSON text."""
    from bson import json_util

    return json.dumps(json.loads(json_util.dumps(obj)), indent=2)


def _redact(doc: Any) -> Any:
    """Remove `_SENSITIVE_FIELDS` from the top level of a MongoDB document."""
    if isinstance(doc, dict):
        for key in _SENSITIVE_FIELDS:
            doc.pop(key, None)
    return doc


# ══════════════════════════════════════════════
# AUTH TOOLS
# ══════════════════════════════════════════════

@mcp.tool()
async def authenticate_agent(username: str, password: str) -> str:
    """
    Log into the FinMK backend with username and password.
    Optional if already logged in via session/token.
    """
    url = f"{BASE_URL}/auth/login/"
    _log(f"Authenticating: {username} at {url}")
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            r = await client.post(
                url,
                json={"username": username, "password": password}
            )
            _log(f"Auth Response: {r.status_code}")
            if r.status_code != 200:
                ret = f"Login failed ({r.status_code}): {r.text}"
                _log(ret)
                return ret

            data = r.json()
            access = data.get("access")
            if not access:
                # Without a token every later call would fail the auth guard,
                # so do not claim the session is active.
                ret = "Login failed: response did not contain an access token."
                _log(ret)
                return ret

            _auth["access_token"] = access
            _auth["refresh_token"] = data.get("refresh")
            _auth["username"] = username
            return f"Authenticated as '{username}'. Session is active — you can now call all other tools."
        except Exception as e:
            err = f"Connection error to {BASE_URL}: {e}"
            _log(err)
            return err


@mcp.tool()
async def get_auth_status() -> str:
    """Check whether the AI agent is currently authenticated."""
    if _auth["access_token"]:
        return f"Authenticated as '{_auth['username']}'. Session is active."
    return "Not authenticated. Call 'authenticate_agent' first."


# ══════════════════════════════════════════════
# DASHBOARD
# ══════════════════════════════════════════════

@mcp.tool()
async def get_dashboard_summary() -> str:
    """
    Get the full financial dashboard: total income, expenses, savings, net balance,
    budget adherence, recent transactions, and category breakdowns.
    """
    return await _get("/finance/dashboard-summary/")


# ══════════════════════════════════════════════
# TRANSACTIONS — INCOME
# ══════════════════════════════════════════════

@mcp.tool()
async def list_incomes() -> str:
    """List all income transactions for the authenticated user, sorted by date descending."""
    return await _get("/finance/income/")


@mcp.tool()
async def add_income(title: str, amount: float, date: str, category: str = "") -> str:
    """
    Add a new income transaction.
    - title: Description (e.g. 'Freelance project payment')
    - amount: Positive number (e.g. 5000.00)
    - date: YYYY-MM-DD format (e.g. '2025-03-15')
    - category: (Optional) Leave empty for auto-classification by AI
    """
    payload: Dict[str, Any] = {"title": title, "amount": amount, "date": date}
    if category:
        payload["category"] = category
    return await _post("/finance/income/", payload)


@mcp.tool()
async def update_income(income_id: str, title: str = "", amount: float = 0, date: str = "", category: str = "") -> str:
    """
    Update an existing income transaction by its ID.
    Only provide fields you want to change.
    """
    # Empty string / 0 mean "not provided", so an amount cannot be set to 0 here.
    payload: Dict[str, Any] = {}
    if title:
        payload["title"] = title
    if amount:
        payload["amount"] = amount
    if date:
        payload["date"] = date
    if category:
        payload["category"] = category
    if not payload:
        return "Error: Provide at least one field to update."
    return await _patch(f"/finance/income/{income_id}/", payload)


@mcp.tool()
async def delete_income(income_id: str) -> str:
    """Delete an income transaction permanently by its ID."""
    return await _delete(f"/finance/income/{income_id}/")


# ══════════════════════════════════════════════
# TRANSACTIONS — EXPENSE
# ══════════════════════════════════════════════

@mcp.tool()
async def list_expenses() -> str:
    """List all expense transactions for the authenticated user, sorted by date descending."""
    return await _get("/finance/expense/")


@mcp.tool()
async def add_expense(title: str, amount: float, date: str, category: str = "") -> str:
    """
    Add a new expense transaction.
    - title: Description (e.g. 'Grocery shopping at Reliance Fresh')
    - amount: Positive number (e.g. 1200.50)
    - date: YYYY-MM-DD format
    - category: (Optional) Leave empty for auto-classification by AI
    """
    payload: Dict[str, Any] = {"title": title, "amount": amount, "date": date}
    if category:
        payload["category"] = category
    return await _post("/finance/expense/", payload)


@mcp.tool()
async def update_expense(expense_id: str, title: str = "", amount: float = 0, date: str = "", category: str = "") -> str:
    """
    Update an existing expense transaction by its ID.
    Only provide fields you want to change.
    """
    # Empty string / 0 mean "not provided", so an amount cannot be set to 0 here.
    payload: Dict[str, Any] = {}
    if title:
        payload["title"] = title
    if amount:
        payload["amount"] = amount
    if date:
        payload["date"] = date
    if category:
        payload["category"] = category
    if not payload:
        return "Error: Provide at least one field to update."
    return await _patch(f"/finance/expense/{expense_id}/", payload)


@mcp.tool()
async def delete_expense(expense_id: str) -> str:
    """Delete an expense transaction permanently by its ID."""
    return await _delete(f"/finance/expense/{expense_id}/")


@mcp.tool()
async def list_all_transactions() -> str:
    """
    List ALL transactions (both incomes and expenses) in one call.
    Returns a combined JSON with 'incomes' and 'expenses' arrays.
    """
    err = _require_auth()
    if err:
        return err
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            # The two requests are independent, so issue them concurrently.
            inc_r, exp_r = await asyncio.gather(
                client.get(f"{BASE_URL}/finance/income/", headers=_headers()),
                client.get(f"{BASE_URL}/finance/expense/", headers=_headers()),
            )
            # A non-200 response for either list degrades to an empty array.
            return json.dumps({
                "incomes": inc_r.json() if inc_r.status_code == 200 else [],
                "expenses": exp_r.json() if exp_r.status_code == 200 else [],
            }, indent=2, default=str)
        except Exception as e:
            return f"Error: {e}"


@mcp.tool()
async def delete_transaction(transaction_id: str, transaction_type: str) -> str:
    """
    Delete an income or expense by ID.
    - transaction_type: must be 'income' or 'expense'
    """
    if transaction_type not in ("income", "expense"):
        return "Error: transaction_type must be 'income' or 'expense'."
    return await _delete(f"/finance/{transaction_type}/{transaction_id}/")


# ══════════════════════════════════════════════
# BUDGETS
# ══════════════════════════════════════════════

@mcp.tool()
async def get_budgets() -> str:
    """List all budget limits set by the user across categories and months."""
    return await _get("/finance/budget/")


@mcp.tool()
async def create_budget(category: str, limit_amount: float, month: str) -> str:
    """
    Create a new budget limit for a category.
    - category: Expense category name (e.g. 'Food & Dining')
    - limit_amount: Maximum allowed spending (e.g. 5000.00)
    - month: YYYY-MM format (e.g. '2025-03') or 'all' for a permanent limit
    """
    return await _post("/finance/budget/", {
        "category": category,
        "limit_amount": limit_amount,
        "month": month,
    })


@mcp.tool()
async def update_budget(budget_id: str, limit_amount: float) -> str:
    """Update the spending limit of an existing budget entry by its ID."""
    return await _patch(f"/finance/budget/{budget_id}/", {"limit_amount": limit_amount})


@mcp.tool()
async def delete_budget(budget_id: str) -> str:
    """Delete a budget entry permanently by its ID."""
    return await _delete(f"/finance/budget/{budget_id}/")


# ══════════════════════════════════════════════
# SAVINGS GOALS
# ══════════════════════════════════════════════

@mcp.tool()
async def get_savings_goals() -> str:
    """List all savings goals with their target amounts and current progress."""
    return await _get("/finance/savings-goals/")


@mcp.tool()
async def create_savings_goal(name: str, target_amount: float, current_amount: float = 0.0, deadline: str = "") -> str:
    """
    Create a new savings goal.
    - name: Goal name (e.g. 'Emergency Fund', 'Europe Trip')
    - target_amount: Goal target in the user's currency
    - current_amount: Amount already saved (default 0)
    - deadline: Optional target date in YYYY-MM-DD format
    """
    payload: Dict[str, Any] = {"name": name, "target_amount": target_amount, "current_amount": current_amount}
    if deadline:
        payload["deadline"] = deadline
    return await _post("/finance/savings-goals/", payload)


@mcp.tool()
async def update_savings_goal(goal_id: str, name: str = "", target_amount: float = 0,
                               current_amount: float = -1, deadline: str = "") -> str:
    """
    Update an existing savings goal by its ID. Provide only the fields to change.
    Use current_amount to log deposits/progress toward the goal.
    """
    payload: Dict[str, Any] = {}
    if name:
        payload["name"] = name
    if target_amount:
        payload["target_amount"] = target_amount
    # -1 is the "not provided" sentinel so that 0 remains a settable value.
    if current_amount >= 0:
        payload["current_amount"] = current_amount
    if deadline:
        payload["deadline"] = deadline
    if not payload:
        return "Error: Provide at least one field to update."
    return await _patch(f"/finance/savings-goals/{goal_id}/", payload)


@mcp.tool()
async def delete_savings_goal(goal_id: str) -> str:
    """Delete a savings goal permanently by its ID."""
    return await _delete(f"/finance/savings-goals/{goal_id}/")


# ══════════════════════════════════════════════
# ANALYTICS
# ══════════════════════════════════════════════

@mcp.tool()
async def get_cash_flow_forecast() -> str:
    """
    Get a machine-learning cash flow forecast for the next 3 months based on
    historical income and expense patterns. Powered by Moirai / ARIMA models.
    """
    return await _get("/analytics/forecast/")


@mcp.tool()
async def get_income_forecast() -> str:
    """Get a detailed income forecast using time-series ML models."""
    return await _get("/analytics/income/forecast/")


@mcp.tool()
async def get_anomaly_report() -> str:
    """
    Get the anomaly detection report. Returns unusual/suspicious transactions
    identified by the Isolation Forest algorithm that may warrant review.
    """
    return await _get("/analytics/anomalies/")


# ══════════════════════════════════════════════
# USER PROFILE
# ══════════════════════════════════════════════

@mcp.tool()
async def get_user_profile() -> str:
    """Retrieve the authenticated user's profile: name, email, currency preference, etc."""
    return await _get("/auth/profile/")


@mcp.tool()
async def update_user_profile(first_name: str = "", last_name: str = "",
                               email: str = "", currency: str = "") -> str:
    """
    Update the user's profile settings. Only provide fields to change.
    - currency: ISO currency code (e.g. 'USD', 'INR', 'EUR')
    """
    payload: Dict[str, Any] = {}
    if first_name:
        payload["first_name"] = first_name
    if last_name:
        payload["last_name"] = last_name
    if email:
        payload["email"] = email
    if currency:
        payload["currency"] = currency
    if not payload:
        return "Error: Provide at least one field to update."
    return await _patch("/auth/profile/", payload)


# ══════════════════════════════════════════════
# CHAT HISTORY
# ══════════════════════════════════════════════

@mcp.tool()
async def get_chat_history() -> str:
    """Retrieve the last 50 chat messages for the authenticated user."""
    return await _get("/chat/message/")


@mcp.tool()
async def clear_chat_history() -> str:
    """Clear all chat history for the authenticated user. This action is irreversible."""
    return await _delete("/chat/message/clear-all/")


# ══════════════════════════════════════════════
# REPORTING & EXPORT
# ══════════════════════════════════════════════

@mcp.tool()
async def generate_comprehensive_report(provider: str = "Auto Mode") -> str:
    """
    Generate a full financial intelligence report combining all data, trends,
    forecasts, budget adherence and AI-powered insights.
    - provider: AI model to use for narrative: 'Auto Mode', 'Gemini', 'OpenRouter', 'Cerebras', 'Groq'
    """
    # Passed via params so values such as 'Auto Mode' are URL-encoded.
    return await _get("/finance/comprehensive-report/", params={"provider": provider})


@mcp.tool()
async def get_financial_summary() -> str:
    """
    A lightweight version of the comprehensive report.
    Returns only the top 5 critical financial highlights.
    Optimized for smaller AI models (e.g. Qwen 2b).
    """
    data_str = await _get("/finance/comprehensive-report/")

    def pick(data: Any) -> Dict[str, Any]:
        return {
            "metric_date": data.get("metric_date"),
            "net_worth": data.get("savings_wealth", {}).get("current_net_worth"),
            "savings_rate": data.get("executive_overview", {}).get("savings_rate"),
            "health_score": data.get("health_scorecard", {}).get("overall_score"),
            "stability": data.get("health_scorecard", {}).get("metrics", {}).get("stability"),
            "behavioral_insight": data.get("behavioral_analysis", {}).get("insight_text"),
            "runway": data.get("solvency_runway"),
        }

    # Falls back to the raw response if parsing fails
    return _project_json(data_str, pick)


@mcp.tool()
async def export_data() -> str:
    """
    Export financial data as a professional Analytics PDF report.
    The report includes transaction history, AI insights, and forecasts.
    The file will be saved directly to your Downloads folder.
    """
    report_path = "/finance/generate-report/"

    # 1. Start background task
    res_str = await _post(report_path, {"type": "pdf", "format": "pdf"})

    try:
        data = json.loads(res_str)
        task_id = data.get("task_id")
        if not task_id:
            return res_str

        download_params = {"task_id": task_id, "download": "true"}

        # 2. Poll until complete
        _log(f"Polling task {task_id}...")
        for _ in range(30):  # 45 seconds max (30 * 1.5)
            await asyncio.sleep(1.5)
            poll_res = await _get(report_path, params={"task_id": task_id})
            try:
                poll_data = json.loads(poll_res)
                status = poll_data.get("status")
                failure = poll_data.get("error")
            except (ValueError, AttributeError):
                # Not a JSON object — it may be plain status text.
                if "completed" in poll_res.lower():
                    return await _get(report_path, params=download_params)
                continue

            if status == "completed":
                _log(f"Task {task_id} completed. Downloading...")
                # 3. Download final (this triggers _handle_binary_save)
                return await _get(report_path, params=download_params)
            if status == "failed":
                return f"Error: PDF generation failed: {failure}"

        return "Error: Timeout waiting for PDF generation. The process is still running in the background."
    except Exception as e:
        _log(f"Polling Error: {e}")
        return res_str


@mcp.tool()
async def send_email_report(email: str, provider: str = "Auto Mode") -> str:
    """
    Send a comprehensive financial report to a specific email address.
    """
    return await _post("/finance/send-email-report/", {"email": email, "provider": provider})


# ══════════════════════════════════════════════
# ADVANCED TRANSACTION TOOLS
# ══════════════════════════════════════════════

@mcp.tool()
async def get_transaction_categories() -> str:
    """Retrieve all available income and expense categories."""
    return await _get("/finance/transactions/categories/")


@mcp.tool()
async def bulk_delete_transactions() -> str:
    """Delete ALL transactions for the authenticated user. USE WITH EXTREME CAUTION."""
    return await _post("/finance/transactions/delete-all/", {})


@mcp.tool()
async def read_pdf_document(file_path: str) -> str:
    """
    Extract text content from a PDF file (e.g., reports, statements).
    - file_path: Absolute path to the PDF file on the local system.
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"

    if not file_path.lower().endswith(".pdf"):
        return "Error: This tool only supports PDF files."

    try:
        text_content = []
        with pdfplumber.open(file_path) as pdf:
            for page_number, page in enumerate(pdf.pages, start=1):
                page_text = page.extract_text()
                if page_text:
                    text_content.append(f"--- Page {page_number} ---\n{page_text}")

        if not text_content:
            return "Note: PDF was opened but no text could be extracted (it might be an image-only PDF)."

        full_text = "\n\n".join(text_content)
        # Limit text to avoid overwhelming the AI if it's huge
        if len(full_text) > 50000:
            full_text = full_text[:50000] + "... [Text Truncated for Brevity]"

        return full_text
    except Exception as e:
        return f"Error reading PDF: {e}"


@mcp.tool()
async def scan_receipt(file_path: str) -> str:
    """
    Scan a receipt image to extract transaction details using AI OCR.
    - file_path: Absolute path to the receipt image on the local system.
    """
    # Same guard as every other authenticated proxy tool.
    err = _require_auth()
    if err:
        return err
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"

    # The file is read locally and forwarded to the existing OCR endpoint
    # as multipart/form-data.
    async with httpx.AsyncClient(timeout=60) as client:
        try:
            with open(file_path, "rb") as f:
                r = await client.post(
                    f"{BASE_URL}/finance/transactions/scan-receipt/",
                    files={"file": f},
                    headers=_multipart_headers(),
                )
                return r.text
        except Exception as e:
            return f"Error: {e}"


@mcp.tool()
async def process_voice_command(command: str) -> str:
    """
    Process a natural language voice command to log transactions or query data.
    - command: The spoken text (e.g., 'I spent 500 rupees on coffee today')
    """
    return await _post("/finance/voice/command/", {"command": command})


@mcp.tool()
async def upload_transactions(file_path: str) -> str:
    """
    Upload a CSV or Excel file containing multiple transactions for bulk import.
    - file_path: Absolute path to the .csv or .xlsx file on the local system.
    """
    # Same guard as every other authenticated proxy tool.
    err = _require_auth()
    if err:
        return err
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"

    url = f"{BASE_URL}/finance/transactions/upload/"
    _log(f"Uploading transactions from {file_path} to {url}")

    async with httpx.AsyncClient(timeout=60) as client:
        try:
            with open(file_path, "rb") as f:
                r = await client.post(url, files={"file": f}, headers=_multipart_headers())
                _log(f"Upload Response: {r.status_code}")
                return r.text
        except Exception as e:
            err = f"Error: {e}"
            _log(err)
            return err


# ══════════════════════════════════════════════
# AUTH & USER MANAGEMENT
# ══════════════════════════════════════════════

@mcp.tool()
async def register_user(username: str, password: str, email: str, first_name: str = "", last_name: str = "") -> str:
    """Create a new user account in FinMK."""
    payload = {
        "username": username,
        "password": password,
        "email": email,
        "first_name": first_name,
        "last_name": last_name
    }
    url = f"{BASE_URL}/auth/register/"
    _log(f"Registering user: {username} at {url}")
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            r = await client.post(url, json=payload)
            _log(f"Register Response: {r.status_code}")
            if r.status_code == 201:
                return f"Success: User '{username}' registered. You can now call 'authenticate_agent' to log in."
            return f"Registration failed ({r.status_code}): {r.text}"
        except Exception as e:
            err = f"Connection error: {e}"
            _log(err)
            return err


@mcp.tool()
async def refresh_auth_token() -> str:
    """Refresh the session access token using the refresh token."""
    if not _auth["refresh_token"]:
        return "Error: No refresh token available. Log in first."
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            r = await client.post(f"{BASE_URL}/auth/token/refresh/", json={"refresh": _auth["refresh_token"]})
            if r.status_code == 200:
                access = r.json().get("access")
                if not access:
                    # Keep the current token rather than overwriting it with None.
                    return f"Token refresh failed: {r.text}"
                _auth["access_token"] = access
                return "Access token refreshed successfully."
            return f"Token refresh failed: {r.text}"
        except Exception as e:
            return f"Connection error: {e}"


# ══════════════════════════════════════════════
# CHAT MANAGEMENT
# ══════════════════════════════════════════════

@mcp.tool()
async def delete_chat_message(msg_id: str) -> str:
    """Delete a specific chat message by its ID."""
    return await _delete(f"/chat/message/{msg_id}/")


# ══════════════════════════════════════════════
# DIRECT MONGODB READ/WRITE TOOLS
# ══════════════════════════════════════════════

@mcp.tool()
async def get_my_full_data() -> str:
    """
    Retrieve the COMPLETE MongoDB document for the currently authenticated user.
    This includes all financial data, settings, and profile info in one JSON blob.
    """
    err = _require_auth()
    if err:
        return err

    username = _auth.get("username")
    if not username:
        return "Error: Username not found in session. Please authenticate first."

    try:
        db = _mongo_db()
        user_doc = db.users.find_one({"username": username})
        if not user_doc:
            return f"Error: No MongoDB document found for user '{username}'."

        # Security: Remove sensitive fields
        return _bson_to_json(_redact(user_doc))
    except Exception as e:
        return f"MongoDB Error: {e}"


@mcp.tool()
async def query_mongodb(collection: str, filter_json: str = "{}", limit: int = 20) -> str:
    """
    Execute a query directly on MongoDB Atlas (Read-only).
    - collection: MongoDB collection name ('users', 'income', 'expense', etc.)
    - filter_json: JSON string of MongoDB query filter (e.g. '{"category": "Food"}')
    - limit: Maximum number of documents to return (max 50)
    """
    err = _require_auth()
    if err:
        return err

    # Clamp to 1..50: PyMongo treats limit(0) as "no limit" and a negative
    # limit as its absolute value, so min() alone does not enforce the cap.
    limit = max(1, min(limit, 50))

    try:
        query_filter = json.loads(filter_json)
    except json.JSONDecodeError as e:
        return f"Error: filter_json is not valid JSON — {e}"
    if not isinstance(query_filter, dict):
        return "Error: filter_json must be a JSON object."

    # NOTE(review): the filter is caller-controlled and is NOT scoped to the
    # authenticated user, so any collection and any user's documents are
    # readable, and server-side operators such as $where are not blocked.
    # Confirm this is intended before exposing the tool in production.
    try:
        db = _mongo_db()
        cursor = db[collection].find(query_filter).limit(limit)
        # Strip the same sensitive fields get_my_full_data removes.
        docs = [_redact(doc) for doc in cursor]
        return _bson_to_json(docs)
    except Exception as e:
        return f"MongoDB query error: {e}"


# ══════════════════════════════════════════════
# API STATUS
# ══════════════════════════════════════════════

@mcp.tool()
async def get_api_status() -> str:
    """
    Check the health status of all AI providers (Gemini, OpenRouter, Cerebras, LM Studio)
    and the MongoDB connection. Useful for diagnosing connectivity issues.
    """
    # Deliberately unauthenticated so it can be used to diagnose login problems.
    async with httpx.AsyncClient(timeout=10) as client:
        try:
            r = await client.get(f"{BASE_URL}/finance/status/")
            return r.text
        except Exception as e:
            return f"Could not reach backend: {e}"


# --- WAVE 2: ADVANCED FINANCE TOOLS ---

@mcp.tool()
async def get_sample_data_preview() -> str:
    """Get statistics about available sample transaction data."""
    return await _get("/finance/sample-data/preview/")


@mcp.tool()
async def load_sample_data() -> str:
    """Load sample transactions into the authenticated user's account."""
    return await _post("/finance/sample-data/load/", {})


@mcp.tool()
async def get_budget_category_stats() -> str:
    """Get AI-driven insights into average monthly spending by category compared to budgets."""
    res = await _get("/finance/budget/")
    return _project_json(res, lambda data: data.get("categories", []))


@mcp.tool()
async def get_historical_category_trends() -> str:
    """Get multi-month historical trends for expense and income categories."""
    res = await _get("/finance/dashboard-summary/")
    return _project_json(res, lambda data: {
        "expense_trends": data.get("category_trends", []),
        "income_trends": data.get("income_category_trends", []),
    })


@mcp.tool()
async def search_transactions_adv(query: str, start_date: str = "", end_date: str = "") -> str:
    """Advanced search for transactions with optional date filters (YYYY-MM-DD)."""
    params = {"search": query}
    if start_date:
        params["start_date"] = start_date
    if end_date:
        params["end_date"] = end_date
    return await _get("/finance/transactions/list/", params=params)


# --- ADVANCED ANALYTICS (EXECUTIVE ENGINE) ---

@mcp.tool()
async def get_financial_health_score() -> str:
    """Get a comprehensive financial health and stability score (0-100)."""
    res = await _get("/finance/comprehensive-report/")
    # NOTE(review): get_financial_summary reads the score from
    # health_scorecard.overall_score and the insight from
    # behavioral_analysis.insight_text; the keys below differ — confirm
    # against the report schema.
    return _project_json(res, lambda data: {
        "health_score": data.get("executive_overview", {}).get("stability_score"),
        "savings_rate": data.get("executive_overview", {}).get("savings_rate"),
        "insight": data.get("behavioral_insight"),
    })


@mcp.tool()
async def get_tax_liability_estimate() -> str:
    """Get an AI-powered estimate of potential tax liability based on income patterns."""
    res = await _get("/finance/comprehensive-report/")
    return _project_json(res, lambda data: data.get("tax_liability", {}))


@mcp.tool()
async def get_solvency_runway() -> str:
    """Calculate how many months you can survive on current savings (Solvency Runway)."""
    res = await _get("/finance/comprehensive-report/")
    return _project_json(res, lambda data: data.get("solvency_runway", {}))


@mcp.tool()
async def get_recurring_payments() -> str:
    """Detect and list suspected recurring payments and subscriptions."""
    res = await _get("/finance/comprehensive-report/")
    return _project_json(res, lambda data: data.get("recurring_commitments", []))


@mcp.tool()
async def get_financial_velocity() -> str:
    """Get the 'velocity' of your money—how fast you are earning vs. spending."""
    res = await _get("/finance/comprehensive-report/")
    return _project_json(res, lambda data: data.get("velocity", {}))


@mcp.tool()
async def get_ai_recommendations() -> str:
    """Get personalized AI-generated financial recommendations and action items."""
    res = await _get("/finance/comprehensive-report/")
    return _project_json(res, lambda data: data.get("recommendations", []))


@mcp.tool()
async def simulate_goal_completion(goal_id: str) -> str:
    """Predict the exact date a savings goal will be reached based on your net cash flow."""
    # Surface the real cause instead of a generic simulation error.
    err = _require_auth()
    if err:
        return err

    report_res = await _get("/finance/comprehensive-report/")
    goals_res = await _get("/finance/savings-goals/")

    try:
        report_data = json.loads(report_res)
        goals_data = json.loads(goals_res)

        net_flow = report_data.get("executive_overview", {}).get("net_savings", 0)
        if net_flow <= 0:
            return "Simulation Error: Your net savings flow is negative or zero. You cannot reach this goal at your current spending rate."

        # Compare as strings so a numeric id in the API response still matches.
        target_goal = next((g for g in goals_data if str(g.get("id")) == str(goal_id)), None)
        if not target_goal:
            return f"Goal ID {goal_id} not found."

        # An already-reached goal has nothing remaining (never a negative duration).
        remaining = max(target_goal["target_amount"] - target_goal["current_amount"], 0)
        months_needed = remaining / net_flow

        # A month is approximated as 30 days.
        predicted_date = (datetime.now() + timedelta(days=months_needed * 30)).strftime('%Y-%m-%d')

        # Goals are created with a "name" field (see create_savings_goal);
        # "title" is kept as a fallback — TODO confirm the response schema.
        goal_name = target_goal.get("name", target_goal.get("title"))

        return json.dumps({
            "goal_name": goal_name,
            "remaining_amount": remaining,
            "monthly_savings_pacing": round(net_flow, 2),
            "predicted_months_to_complete": round(months_needed, 1),
            "estimated_completion_date": predicted_date
        }, indent=2)
    except Exception:
        return "Simulation Error: Unable to compute prediction."


# ══════════════════════════════════════════════
# TOOL CATALOGUE (for injection into AI prompts)
# ══════════════════════════════════════════════

MCP_TOOL_CATALOGUE = """
## 🚨 CRITICAL: TOOL CALLING FORMAT
You MUST call tools using the standard MCP JSON format. DO NOT use XML tags. Use plain JSON.

### ✅ CORRECT JSON FORMAT (Example):
```json
{
  "name": "add_income",
  "arguments": {
    "title": "Salary",
    "amount": 5000,
    "date": "2025-03-01"
  }
}
```

## AI AGENT CAPABILITIES - MASTER TOOLSET

### Authentication & Users
- `authenticate_agent(username, password)` — Login (Optional if already logged in)
- `get_auth_status()` — Check if authenticated
- `register_user(...)` — Create new account
- `refresh_auth_token()` — Refresh JWT access token
- `get_user_profile()` — View profile info
- `update_user_profile(...)` — Update name, email, currency

### Dashboard & Analytics
- `get_dashboard_summary()` — Full financial overview
- `get_cash_flow_forecast()` — 3-month ML cash flow forecast
- `get_income_forecast()` — Income forecast
- `get_anomaly_report()` — Unusual transaction detection

### Transactions
- `list_all_transactions()` — All incomes + expenses
- `list_incomes()` / `list_expenses()` — Specific lists
- `add_income(...)` / `add_expense(...)` — Add new
- `update_income(...)` / `update_expense(...)` — Edit existing
- `delete_income(...)` / `delete_expense(...)` — Delete existing
- `get_transaction_categories()` — List all categories
- `bulk_delete_transactions()` — Delete ALL data (Caution!)
- `upload_transactions(file_path)` — Bulk import from CSV/Excel

### Advanced Features
- `export_data()` — Export to PDF
- `send_email_report(email)` — Email the report
- `scan_receipt(file_path)` — AI OCR for receipts
- `process_voice_command(command)` — Natural language processing
- `generate_comprehensive_report()` — AI-powered narrative report
- `get_financial_health_score()` — Executive stability & savings report
- `get_tax_liability_estimate()` — Estimate potential tax owed
- `get_solvency_runway()` — Months of survival on savings
- `get_recurring_payments()` — Subscriptions & recurring bills
- `get_ai_recommendations()` — Personalized financial advice
- `simulate_goal_completion(goal_id)` — Predictive goal reaching date

### Data & Search
- `search_transactions_adv(query, start_date?, end_date?)` — Power search
- `get_historical_category_trends()` — Multi-month trends
- `get_budget_category_stats()` — Avg spend vs Budgets
- `get_sample_data_preview()` / `load_sample_data()` — Test data management

### Chat
- `get_chat_history()` — Recent chat messages
- `clear_chat_history()` — Wipe all chat history
- `delete_chat_message(msg_id)` — Remove one message

### Direct Database Access
- `get_my_full_data()` — Get your COMPLETE user document (High Performance).
- `query_mongodb(collection, filter_json, limit?)` — Read-only access to collections.
- `get_api_status()` — Health check.
"""


if __name__ == "__main__":
    mcp.run()