| """ |
| FinMK MCP Server |
| ================ |
| A full Model Context Protocol (MCP) server that exposes every existing Django REST API |
| as an agent-callable tool. Zero business logic duplication β all tools proxy through |
| the existing Django REST endpoints and reuse all validation, MongoDB operations, and |
| business logic already in place. |
| |
| Works in two modes: |
| - Local dev: LM Studio (qwen3.5:2b) calls tools via MCP protocol |
| - Production: Gemini / Cerebras function-calling integrates same tools via chatbot |
| |
| Usage: |
| python mcp_server.py (for LM Studio / stdio) |
| npx @modelcontextprotocol/inspector python mcp_server.py (for browser inspector) |
| """ |
|
|
| import os |
| import sys |
| import json |
| import httpx |
| import pdfplumber |
| from typing import Optional, Dict, Any |
| from mcp.server.fastmcp import FastMCP |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| |
| |
| |
| mcp = FastMCP("FinMK AI Agent") |
|
|
| PORT = os.getenv("PORT", "8000") |
| BASE_URL = (os.getenv("API_BASE_URL") or f"http://localhost:{PORT}/api").rstrip("/") |
| if os.getenv("RENDER") == "true" and not os.getenv("API_BASE_URL"): |
| BASE_URL = f"http://localhost:{PORT}/api".rstrip("/") |
|
|
| |
| _auth: Dict[str, Optional[str]] = { |
| "access_token": None, |
| "refresh_token": None, |
| "username": None, |
| } |
|
|
|
|
| def _headers() -> Dict[str, str]: |
| h = {"Content-Type": "application/json"} |
| if _auth["access_token"]: |
| h["Authorization"] = f"Bearer {_auth['access_token']}" |
| return h |
|
|
|
|
| def _require_auth() -> Optional[str]: |
| """Returns an error string if not authenticated, else None.""" |
| if not _auth["access_token"]: |
| return "Error: Not authenticated. Call 'authenticate_agent' first." |
| return None |
|
|
|
|
| async def _get(path: str, params: Optional[Dict[str, Any]] = None) -> str: |
| err = _require_auth() |
| if err: |
| return err |
| full_path = path.lstrip("/") |
| url = f"{BASE_URL}/{full_path}" |
| print(f"[MCP] GET {url}", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=300) as client: |
| try: |
| r = await client.get(url, headers=_headers(), params=params) |
| print(f"[MCP] Response {r.status_code} from {url}", file=sys.stderr) |
| if r.status_code == 404: |
| print(f"[MCP] 404 ERROR Body: {r.text[:500]}", file=sys.stderr) |
| return await _handle_binary_save(r) |
| except Exception as e: |
| print(f"[MCP] Error: {e}", file=sys.stderr) |
| return f"Error: {e}" |
|
|
| async def _handle_binary_save(r: httpx.Response) -> str: |
| """Detects binary content (PDF/CSV) and saves to Downloads instead of returning raw bytes to Agent.""" |
| ctype = r.headers.get("Content-Type", "").lower() |
| |
| if "application/pdf" in ctype or "text/csv" in ctype: |
| try: |
| downloads_path = os.path.join(os.path.expanduser("~"), "Downloads") |
| os.makedirs(downloads_path, exist_ok=True) |
| |
| |
| cd = r.headers.get("Content-Disposition", "") |
| filename = "FinMK_Export.pdf" |
| if "filename=" in cd: |
| filename = cd.split("filename=")[1].strip('"') |
| elif "text/csv" in ctype: |
| filename = "FinMK_Export.csv" |
| |
| filepath = os.path.join(downloads_path, filename) |
| with open(filepath, "wb") as f: |
| f.write(r.content) |
| |
| return f"SUCCESS: The file has been generated and saved directly to your Downloads folder: {filepath}" |
| except Exception as e: |
| return f"Error saving binary file: {e}" |
| |
| |
| return r.text |
|
|
|
|
| async def _post(path: str, payload: dict) -> str: |
| err = _require_auth() |
| if err: |
| return err |
| full_path = path.lstrip("/") |
| url = f"{BASE_URL}/{full_path}" |
| print(f"[MCP] POST {url} | Body: {json.dumps(payload)[:100]}...", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=300) as client: |
| try: |
| r = await client.post(url, json=payload, headers=_headers()) |
| print(f"[MCP] Response {r.status_code}", file=sys.stderr) |
| return await _handle_binary_save(r) |
| except Exception as e: |
| print(f"[MCP] Error: {e}", file=sys.stderr) |
| return f"Error: {e}" |
|
|
|
|
| async def _patch(path: str, payload: dict) -> str: |
| err = _require_auth() |
| if err: |
| return err |
| full_path = path.lstrip("/") |
| url = f"{BASE_URL}/{full_path}" |
| print(f"[MCP] PATCH {url} | Body: {json.dumps(payload)[:100]}...", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=300) as client: |
| try: |
| r = await client.patch(url, json=payload, headers=_headers()) |
| print(f"[MCP] Response {r.status_code}", file=sys.stderr) |
| return r.text |
| except Exception as e: |
| print(f"[MCP] Error: {e}", file=sys.stderr) |
| return f"Error: {e}" |
|
|
|
|
| async def _delete(path: str) -> str: |
| err = _require_auth() |
| if err: |
| return err |
| full_path = path.lstrip("/") |
| url = f"{BASE_URL}/{full_path}" |
| print(f"[MCP] DELETE {url}", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=30) as client: |
| try: |
| r = await client.delete(url, headers=_headers()) |
| print(f"[MCP] Response {r.status_code}", file=sys.stderr) |
| if r.status_code in (200, 204): |
| return json.dumps({"success": True, "status": r.status_code}) |
| return r.text |
| except Exception as e: |
| print(f"[MCP] Error: {e}", file=sys.stderr) |
| return f"Error: {e}" |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def authenticate_agent(username: str, password: str) -> str: |
| """ |
| Log into the FinMK backend with username and password. |
| Optional if already logged in via session/token. |
| """ |
| url = f"{BASE_URL}/auth/login/" |
| print(f"[MCP] Authenticating: {username} at {url}", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=15) as client: |
| try: |
| r = await client.post( |
| url, |
| json={"username": username, "password": password} |
| ) |
| print(f"[MCP] Auth Response: {r.status_code}", file=sys.stderr) |
| if r.status_code != 200: |
| ret = f"Login failed ({r.status_code}): {r.text}" |
| print(f"[MCP] {ret}", file=sys.stderr) |
| return ret |
| data = r.json() |
| _auth["access_token"] = data.get("access") |
| _auth["refresh_token"] = data.get("refresh") |
| _auth["username"] = username |
| return f"Authenticated as '{username}'. Session is active β you can now call all other tools." |
| except Exception as e: |
| err = f"Connection error to {BASE_URL}: {e}" |
| print(f"[MCP] {err}", file=sys.stderr) |
| return err |
|
|
|
|
| @mcp.tool() |
| async def get_auth_status() -> str: |
| """Check whether the AI agent is currently authenticated.""" |
| if _auth["access_token"]: |
| return f"Authenticated as '{_auth['username']}'. Session is active." |
| return "Not authenticated. Call 'authenticate_agent' first." |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_dashboard_summary() -> str: |
| """ |
| Get the full financial dashboard: total income, expenses, savings, net balance, |
| budget adherence, recent transactions, and category breakdowns. |
| """ |
| return await _get("/finance/dashboard-summary/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def list_incomes() -> str: |
| """List all income transactions for the authenticated user, sorted by date descending.""" |
| return await _get("/finance/income/") |
|
|
|
|
| @mcp.tool() |
| async def add_income(title: str, amount: float, date: str, category: str = "") -> str: |
| """ |
| Add a new income transaction. |
| - title: Description (e.g. 'Freelance project payment') |
| - amount: Positive number (e.g. 5000.00) |
| - date: YYYY-MM-DD format (e.g. '2025-03-15') |
| - category: (Optional) Leave empty for auto-classification by AI |
| """ |
| payload: Dict[str, Any] = {"title": title, "amount": amount, "date": date} |
| if category: |
| payload["category"] = category |
| return await _post("/finance/income/", payload) |
|
|
|
|
| @mcp.tool() |
| async def update_income(income_id: str, title: str = "", amount: float = 0, date: str = "", category: str = "") -> str: |
| """ |
| Update an existing income transaction by its ID. |
| Only provide fields you want to change. |
| """ |
| payload: Dict[str, Any] = {} |
| if title: |
| payload["title"] = title |
| if amount: |
| payload["amount"] = amount |
| if date: |
| payload["date"] = date |
| if category: |
| payload["category"] = category |
| if not payload: |
| return "Error: Provide at least one field to update." |
| return await _patch(f"/finance/income/{income_id}/", payload) |
|
|
|
|
| @mcp.tool() |
| async def delete_income(income_id: str) -> str: |
| """Delete an income transaction permanently by its ID.""" |
| return await _delete(f"/finance/income/{income_id}/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def list_expenses() -> str: |
| """List all expense transactions for the authenticated user, sorted by date descending.""" |
| return await _get("/finance/expense/") |
|
|
|
|
| @mcp.tool() |
| async def add_expense(title: str, amount: float, date: str, category: str = "") -> str: |
| """ |
| Add a new expense transaction. |
| - title: Description (e.g. 'Grocery shopping at Reliance Fresh') |
| - amount: Positive number (e.g. 1200.50) |
| - date: YYYY-MM-DD format |
| - category: (Optional) Leave empty for auto-classification by AI |
| """ |
| payload: Dict[str, Any] = {"title": title, "amount": amount, "date": date} |
| if category: |
| payload["category"] = category |
| return await _post("/finance/expense/", payload) |
|
|
|
|
| @mcp.tool() |
| async def update_expense(expense_id: str, title: str = "", amount: float = 0, date: str = "", category: str = "") -> str: |
| """ |
| Update an existing expense transaction by its ID. |
| Only provide fields you want to change. |
| """ |
| payload: Dict[str, Any] = {} |
| if title: |
| payload["title"] = title |
| if amount: |
| payload["amount"] = amount |
| if date: |
| payload["date"] = date |
| if category: |
| payload["category"] = category |
| if not payload: |
| return "Error: Provide at least one field to update." |
| return await _patch(f"/finance/expense/{expense_id}/", payload) |
|
|
|
|
| @mcp.tool() |
| async def delete_expense(expense_id: str) -> str: |
| """Delete an expense transaction permanently by its ID.""" |
| return await _delete(f"/finance/expense/{expense_id}/") |
|
|
|
|
| @mcp.tool() |
| async def list_all_transactions() -> str: |
| """ |
| List ALL transactions (both incomes and expenses) in one call. |
| Returns a combined JSON with 'incomes' and 'expenses' arrays. |
| """ |
| err = _require_auth() |
| if err: |
| return err |
| async with httpx.AsyncClient(timeout=30) as client: |
| try: |
| inc_r, exp_r = await client.get(f"{BASE_URL}/finance/income/", headers=_headers()), \ |
| await client.get(f"{BASE_URL}/finance/expense/", headers=_headers()) |
| return json.dumps({ |
| "incomes": inc_r.json() if inc_r.status_code == 200 else [], |
| "expenses": exp_r.json() if exp_r.status_code == 200 else [], |
| }, indent=2, default=str) |
| except Exception as e: |
| return f"Error: {e}" |
|
|
|
|
| @mcp.tool() |
| async def delete_transaction(transaction_id: str, transaction_type: str) -> str: |
| """ |
| Delete an income or expense by ID. |
| - transaction_type: must be 'income' or 'expense' |
| """ |
| if transaction_type not in ("income", "expense"): |
| return "Error: transaction_type must be 'income' or 'expense'." |
| return await _delete(f"/finance/{transaction_type}/{transaction_id}/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_budgets() -> str: |
| """List all budget limits set by the user across categories and months.""" |
| return await _get("/finance/budget/") |
|
|
|
|
| @mcp.tool() |
| async def create_budget(category: str, limit_amount: float, month: str) -> str: |
| """ |
| Create a new budget limit for a category. |
| - category: Expense category name (e.g. 'Food & Dining') |
| - limit_amount: Maximum allowed spending (e.g. 5000.00) |
| - month: YYYY-MM format (e.g. '2025-03') or 'all' for a permanent limit |
| """ |
| return await _post("/finance/budget/", { |
| "category": category, |
| "limit_amount": limit_amount, |
| "month": month, |
| }) |
|
|
|
|
| @mcp.tool() |
| async def update_budget(budget_id: str, limit_amount: float) -> str: |
| """Update the spending limit of an existing budget entry by its ID.""" |
| return await _patch(f"/finance/budget/{budget_id}/", {"limit_amount": limit_amount}) |
|
|
|
|
| @mcp.tool() |
| async def delete_budget(budget_id: str) -> str: |
| """Delete a budget entry permanently by its ID.""" |
| return await _delete(f"/finance/budget/{budget_id}/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_savings_goals() -> str: |
| """List all savings goals with their target amounts and current progress.""" |
| return await _get("/finance/savings-goals/") |
|
|
|
|
| @mcp.tool() |
| async def create_savings_goal(name: str, target_amount: float, current_amount: float = 0.0, deadline: str = "") -> str: |
| """ |
| Create a new savings goal. |
| - name: Goal name (e.g. 'Emergency Fund', 'Europe Trip') |
| - target_amount: Goal target in the user's currency |
| - current_amount: Amount already saved (default 0) |
| - deadline: Optional target date in YYYY-MM-DD format |
| """ |
| payload: Dict[str, Any] = {"name": name, "target_amount": target_amount, "current_amount": current_amount} |
| if deadline: |
| payload["deadline"] = deadline |
| return await _post("/finance/savings-goals/", payload) |
|
|
|
|
| @mcp.tool() |
| async def update_savings_goal(goal_id: str, name: str = "", target_amount: float = 0, current_amount: float = -1, deadline: str = "") -> str: |
| """ |
| Update an existing savings goal by its ID. Provide only the fields to change. |
| Use current_amount to log deposits/progress toward the goal. |
| """ |
| payload: Dict[str, Any] = {} |
| if name: |
| payload["name"] = name |
| if target_amount: |
| payload["target_amount"] = target_amount |
| if current_amount >= 0: |
| payload["current_amount"] = current_amount |
| if deadline: |
| payload["deadline"] = deadline |
| if not payload: |
| return "Error: Provide at least one field to update." |
| return await _patch(f"/finance/savings-goals/{goal_id}/", payload) |
|
|
|
|
| @mcp.tool() |
| async def delete_savings_goal(goal_id: str) -> str: |
| """Delete a savings goal permanently by its ID.""" |
| return await _delete(f"/finance/savings-goals/{goal_id}/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_cash_flow_forecast() -> str: |
| """ |
| Get a machine-learning cash flow forecast for the next 3 months based on |
| historical income and expense patterns. Powered by Moirai / ARIMA models. |
| """ |
| return await _get("/analytics/forecast/") |
|
|
|
|
| @mcp.tool() |
| async def get_income_forecast() -> str: |
| """Get a detailed income forecast using time-series ML models.""" |
| return await _get("/analytics/income/forecast/") |
|
|
|
|
| @mcp.tool() |
| async def get_anomaly_report() -> str: |
| """ |
| Get the anomaly detection report. Returns unusual/suspicious transactions |
| identified by the Isolation Forest algorithm that may warrant review. |
| """ |
| return await _get("/analytics/anomalies/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_user_profile() -> str: |
| """Retrieve the authenticated user's profile: name, email, currency preference, etc.""" |
| return await _get("/auth/profile/") |
|
|
|
|
| @mcp.tool() |
| async def update_user_profile(first_name: str = "", last_name: str = "", email: str = "", currency: str = "") -> str: |
| """ |
| Update the user's profile settings. Only provide fields to change. |
| - currency: ISO currency code (e.g. 'USD', 'INR', 'EUR') |
| """ |
| payload: Dict[str, Any] = {} |
| if first_name: |
| payload["first_name"] = first_name |
| if last_name: |
| payload["last_name"] = last_name |
| if email: |
| payload["email"] = email |
| if currency: |
| payload["currency"] = currency |
| if not payload: |
| return "Error: Provide at least one field to update." |
| return await _patch("/auth/profile/", payload) |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_chat_history() -> str: |
| """Retrieve the last 50 chat messages for the authenticated user.""" |
| return await _get("/chat/message/") |
|
|
|
|
| @mcp.tool() |
| async def clear_chat_history() -> str: |
| """Clear all chat history for the authenticated user. This action is irreversible.""" |
| return await _delete("/chat/message/clear-all/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def generate_comprehensive_report(provider: str = "Auto Mode") -> str: |
| """ |
| Generate a full financial intelligence report combining all data, trends, |
| forecasts, budget adherence and AI-powered insights. |
| - provider: AI model to use for narrative: 'Auto Mode', 'Gemini', 'OpenRouter', 'Cerebras', 'Groq' |
| """ |
| return await _get(f"/finance/comprehensive-report/?provider={provider}") |
|
|
|
|
| @mcp.tool() |
| async def get_financial_summary() -> str: |
| """ |
| A lightweight version of the comprehensive report. |
| Returns only the top 5 critical financial highlights. |
| Optimized for smaller AI models (e.g. Qwen 2b). |
| """ |
| data_str = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(data_str) |
| summary = { |
| "metric_date": data.get("metric_date"), |
| "net_worth": data.get("savings_wealth", {}).get("current_net_worth"), |
| "savings_rate": data.get("executive_overview", {}).get("savings_rate"), |
| "health_score": data.get("health_scorecard", {}).get("overall_score"), |
| "stability": data.get("health_scorecard", {}).get("metrics", {}).get("stability"), |
| "behavioral_insight": data.get("behavioral_analysis", {}).get("insight_text"), |
| "runway": data.get("solvency_runway") |
| } |
| return json.dumps(summary, indent=2) |
| except: |
| return data_str |
|
|
|
|
| @mcp.tool() |
| async def export_data() -> str: |
| """ |
| Export financial data as a professional Analytics PDF report. |
| The report includes transaction history, AI insights, and forecasts. |
| The file will be saved directly to your Downloads folder. |
| """ |
| |
| res_str = await _post("/finance/generate-report/", {"type": "pdf", "format": "pdf"}) |
| try: |
| data = json.loads(res_str) |
| task_id = data.get("task_id") |
| if not task_id: |
| return res_str |
| |
| |
| import asyncio |
| print(f"[MCP] Polling task {task_id}...", file=sys.stderr) |
| for _ in range(30): |
| await asyncio.sleep(1.5) |
| poll_res = await _get(f"/finance/generate-report/?task_id={task_id}") |
| try: |
| poll_data = json.loads(poll_res) |
| if poll_data.get("status") == "completed": |
| print(f"[MCP] Task {task_id} completed. Downloading...", file=sys.stderr) |
| |
| return await _get(f"/finance/generate-report/?task_id={task_id}&download=true") |
| if poll_data.get("status") == "failed": |
| return f"Error: PDF generation failed: {poll_data.get('error')}" |
| except: |
| |
| if "completed" in poll_res.lower(): |
| return await _get(f"/finance/generate-report/?task_id={task_id}&download=true") |
| pass |
| return "Error: Timeout waiting for PDF generation. The process is still running in the background." |
| except Exception as e: |
| print(f"[MCP] Polling Error: {e}", file=sys.stderr) |
| return res_str |
|
|
|
|
| @mcp.tool() |
| async def send_email_report(email: str, provider: str = "Auto Mode") -> str: |
| """ |
| Send a comprehensive financial report to a specific email address. |
| """ |
| return await _post("/finance/send-email-report/", {"email": email, "provider": provider}) |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_transaction_categories() -> str: |
| """Retrieve all available income and expense categories.""" |
| return await _get("/finance/transactions/categories/") |
|
|
|
|
| @mcp.tool() |
| async def bulk_delete_transactions() -> str: |
| """Delete ALL transactions for the authenticated user. USE WITH EXTREME CAUTION.""" |
| return await _post("/finance/transactions/delete-all/", {}) |
|
|
|
|
| @mcp.tool() |
| async def read_pdf_document(file_path: str) -> str: |
| """ |
| Extract text content from a PDF file (e.g., reports, statements). |
| - file_path: Absolute path to the PDF file on the local system. |
| """ |
| if not os.path.exists(file_path): |
| return f"Error: File not found at {file_path}" |
| |
| if not file_path.lower().endswith(".pdf"): |
| return "Error: This tool only supports PDF files." |
|
|
| try: |
| text_content = [] |
| with pdfplumber.open(file_path) as pdf: |
| for i, page in enumerate(pdf.pages): |
| page_text = page.extract_text() |
| if page_text: |
| text_content.append(f"--- Page {i+1} ---\n{page_text}") |
| |
| if not text_content: |
| return "Note: PDF was opened but no text could be extracted (it might be an image-only PDF)." |
| |
| full_text = "\n\n".join(text_content) |
| |
| if len(full_text) > 50000: |
| full_text = full_text[:50000] + "... [Text Truncated for Brevity]" |
| |
| return full_text |
| except Exception as e: |
| return f"Error reading PDF: {e}" |
|
|
|
|
| @mcp.tool() |
| async def scan_receipt(file_path: str) -> str: |
| """ |
| Scan a receipt image to extract transaction details using AI OCR. |
| - file_path: Absolute path to the receipt image on the local system. |
| """ |
| |
| |
| if not os.path.exists(file_path): |
| return f"Error: File not found at {file_path}" |
| |
| async with httpx.AsyncClient(timeout=60) as client: |
| try: |
| with open(file_path, "rb") as f: |
| files = {"file": f} |
| |
| headers = _headers() |
| if "Content-Type" in headers: del headers["Content-Type"] |
| |
| r = await client.post(f"{BASE_URL}/finance/transactions/scan-receipt/", files=files, headers=headers) |
| return r.text |
| except Exception as e: |
| return f"Error: {e}" |
|
|
|
|
| @mcp.tool() |
| async def process_voice_command(command: str) -> str: |
| """ |
| Process a natural language voice command to log transactions or query data. |
| - command: The spoken text (e.g., 'I spent 500 rupees on coffee today') |
| """ |
| return await _post("/finance/voice/command/", {"command": command}) |
|
|
|
|
| @mcp.tool() |
| async def upload_transactions(file_path: str) -> str: |
| """ |
| Upload a CSV or Excel file containing multiple transactions for bulk import. |
| - file_path: Absolute path to the .csv or .xlsx file on the local system. |
| """ |
| if not os.path.exists(file_path): |
| return f"Error: File not found at {file_path}" |
| |
| url = f"{BASE_URL}/finance/transactions/upload/" |
| print(f"[MCP] Uploading transactions from {file_path} to {url}", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=60) as client: |
| try: |
| with open(file_path, "rb") as f: |
| files = {"file": f} |
| |
| headers = _headers() |
| if "Content-Type" in headers: del headers["Content-Type"] |
| |
| r = await client.post(url, files=files, headers=headers) |
| print(f"[MCP] Upload Response: {r.status_code}", file=sys.stderr) |
| return r.text |
| except Exception as e: |
| err = f"Error: {e}" |
| print(f"[MCP] {err}", file=sys.stderr) |
| return err |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def register_user(username: str, password: str, email: str, first_name: str = "", last_name: str = "") -> str: |
| """Create a new user account in FinMK.""" |
| payload = { |
| "username": username, |
| "password": password, |
| "email": email, |
| "first_name": first_name, |
| "last_name": last_name |
| } |
| url = f"{BASE_URL}/auth/register/" |
| print(f"[MCP] Registering user: {username} at {url}", file=sys.stderr) |
| async with httpx.AsyncClient(timeout=15) as client: |
| try: |
| r = await client.post(url, json=payload) |
| print(f"[MCP] Register Response: {r.status_code}", file=sys.stderr) |
| if r.status_code == 201: |
| return f"Success: User '{username}' registered. You can now call 'authenticate_agent' to log in." |
| return f"Registration failed ({r.status_code}): {r.text}" |
| except Exception as e: |
| err = f"Connection error: {e}" |
| print(f"[MCP] {err}", file=sys.stderr) |
| return err |
|
|
|
|
| @mcp.tool() |
| async def refresh_auth_token() -> str: |
| """Refresh the session access token using the refresh token.""" |
| if not _auth["refresh_token"]: |
| return "Error: No refresh token available. Log in first." |
| |
| async with httpx.AsyncClient(timeout=15) as client: |
| try: |
| r = await client.post(f"{BASE_URL}/auth/token/refresh/", json={"refresh": _auth["refresh_token"]}) |
| if r.status_code == 200: |
| data = r.json() |
| _auth["access_token"] = data.get("access") |
| return "Access token refreshed successfully." |
| return f"Token refresh failed: {r.text}" |
| except Exception as e: |
| return f"Connection error: {e}" |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def delete_chat_message(msg_id: str) -> str: |
| """Delete a specific chat message by its ID.""" |
| return await _delete(f"/chat/message/{msg_id}/") |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_my_full_data() -> str: |
| """ |
| Retrieve the COMPLETE MongoDB document for the currently authenticated user. |
| This includes all financial data, settings, and profile info in one JSON blob. |
| """ |
| err = _require_auth() |
| if err: |
| return err |
|
|
| username = _auth.get("username") |
| if not username: |
| return "Error: Username not found in session. Please authenticate first." |
|
|
| try: |
| import django |
| from django.conf import settings |
| if not settings.configured: |
| os.environ.setdefault("DJANGO_SETTINGS_MODULE", "expense_tracker.settings") |
| django.setup() |
| |
| from expense_tracker.utils import MongoDBClient |
| from bson import json_util |
|
|
| db = MongoDBClient.get_client() |
| user_doc = db.users.find_one({"username": username}) |
| if not user_doc: |
| return f"Error: No MongoDB document found for user '{username}'." |
|
|
| |
| if "password" in user_doc: del user_doc["password"] |
| if "is_staff" in user_doc: del user_doc["is_staff"] |
| if "is_superuser" in user_doc: del user_doc["is_superuser"] |
|
|
| return json.dumps(json.loads(json_util.dumps(user_doc)), indent=2) |
| except Exception as e: |
| return f"MongoDB Error: {e}" |
|
|
|
|
| @mcp.tool() |
| async def query_mongodb(collection: str, filter_json: str = "{}", limit: int = 20) -> str: |
| """ |
| Execute a query directly on MongoDB Atlas (Read-only). |
| - collection: MongoDB collection name ('users', 'income', 'expense', etc.) |
| - filter_json: JSON string of MongoDB query filter (e.g. '{"category": "Food"}') |
| - limit: Maximum number of documents to return (max 50) |
| """ |
| err = _require_auth() |
| if err: |
| return err |
|
|
| limit = min(limit, 50) |
| try: |
| query_filter = json.loads(filter_json) |
| except json.JSONDecodeError as e: |
| return f"Error: filter_json is not valid JSON β {e}" |
|
|
| try: |
| import django |
| from django.conf import settings |
| if not settings.configured: |
| os.environ.setdefault("DJANGO_SETTINGS_MODULE", "expense_tracker.settings") |
| django.setup() |
| |
| from expense_tracker.utils import MongoDBClient |
| from bson import json_util |
|
|
| db = MongoDBClient.get_client() |
| docs = list(db[collection].find(query_filter).limit(limit)) |
| return json.dumps(json.loads(json_util.dumps(docs)), indent=2) |
| except Exception as e: |
| return f"MongoDB query error: {e}" |
| result = db[collection].delete_many(query_filter) |
| return json.dumps({"success": True, "deleted_count": result.deleted_count}) |
| except Exception as e: |
| return f"MongoDB delete error: {e}" |
|
|
|
|
| |
| |
| |
|
|
| @mcp.tool() |
| async def get_api_status() -> str: |
| """ |
| Check the health status of all AI providers (Gemini, OpenRouter, Cerebras, LM Studio) |
| and the MongoDB connection. Useful for diagnosing connectivity issues. |
| """ |
| async with httpx.AsyncClient(timeout=10) as client: |
| try: |
| r = await client.get(f"{BASE_URL}/finance/status/") |
| return r.text |
| except Exception as e: |
| return f"Could not reach backend: {e}" |
|
|
|
|
| |
|
|
| @mcp.tool() |
| async def get_sample_data_preview() -> str: |
| """Get statistics about available sample transaction data.""" |
| return await _get("/finance/sample-data/preview/") |
|
|
| @mcp.tool() |
| async def load_sample_data() -> str: |
| """Load sample transactions into the authenticated user's account.""" |
| return await _post("/finance/sample-data/load/", {}) |
|
|
| @mcp.tool() |
| async def get_budget_category_stats() -> str: |
| """Get AI-driven insights into average monthly spending by category compared to budgets.""" |
| res = await _get("/finance/budget/") |
| try: |
| data = json.loads(res) |
| return json.dumps(data.get("categories", []), indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def get_historical_category_trends() -> str: |
| """Get multi-month historical trends for expense and income categories.""" |
| res = await _get("/finance/dashboard-summary/") |
| try: |
| data = json.loads(res) |
| return json.dumps({ |
| "expense_trends": data.get("category_trends", []), |
| "income_trends": data.get("income_category_trends", []) |
| }, indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def search_transactions_adv(query: str, start_date: str = "", end_date: str = "") -> str: |
| """Advanced search for transactions with optional date filters (YYYY-MM-DD).""" |
| params = {"search": query} |
| if start_date: params["start_date"] = start_date |
| if end_date: params["end_date"] = end_date |
| return await _get("/finance/transactions/list/", params=params) |
|
|
| |
|
|
| @mcp.tool() |
| async def get_financial_health_score() -> str: |
| """Get a comprehensive financial health and stability score (0-100).""" |
| res = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(res) |
| return json.dumps({ |
| "health_score": data.get("executive_overview", {}).get("stability_score"), |
| "savings_rate": data.get("executive_overview", {}).get("savings_rate"), |
| "insight": data.get("behavioral_insight") |
| }, indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def get_tax_liability_estimate() -> str: |
| """Get an AI-powered estimate of potential tax liability based on income patterns.""" |
| res = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(res) |
| return json.dumps(data.get("tax_liability", {}), indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def get_solvency_runway() -> str: |
| """Calculate how many months you can survive on current savings (Solvency Runway).""" |
| res = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(res) |
| return json.dumps(data.get("solvency_runway", {}), indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def get_recurring_payments() -> str: |
| """Detect and list suspected recurring payments and subscriptions.""" |
| res = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(res) |
| return json.dumps(data.get("recurring_commitments", []), indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def get_financial_velocity() -> str: |
| """Get the 'velocity' of your moneyβhow fast you are earning vs. spending.""" |
| res = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(res) |
| return json.dumps(data.get("velocity", {}), indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def get_ai_recommendations() -> str: |
| """Get personalized AI-generated financial recommendations and action items.""" |
| res = await _get("/finance/comprehensive-report/") |
| try: |
| data = json.loads(res) |
| return json.dumps(data.get("recommendations", []), indent=2) |
| except: |
| return res |
|
|
| @mcp.tool() |
| async def simulate_goal_completion(goal_id: str) -> str: |
| """Predict the exact date a savings goal will be reached based on your net cash flow.""" |
| report_res = await _get("/finance/comprehensive-report/") |
| goals_res = await _get("/finance/savings-goals/") |
| try: |
| report_data = json.loads(report_res) |
| goals_data = json.loads(goals_res) |
| |
| net_flow = report_data.get("executive_overview", {}).get("net_savings", 0) |
| if net_flow <= 0: |
| return "Simulation Error: Your net savings flow is negative or zero. You cannot reach this goal at your current spending rate." |
| |
| target_goal = next((g for g in goals_data if g["id"] == goal_id), None) |
| if not target_goal: |
| return f"Goal ID {goal_id} not found." |
| |
| remaining = target_goal["target_amount"] - target_goal["current_amount"] |
| months_needed = remaining / net_flow |
| |
| from datetime import datetime, timedelta |
| predicted_date = (datetime.now() + timedelta(days=months_needed * 30)).strftime('%Y-%m-%d') |
| |
| return json.dumps({ |
| "goal_name": target_goal["title"], |
| "remaining_amount": remaining, |
| "monthly_savings_pacing": round(net_flow, 2), |
| "predicted_months_to_complete": round(months_needed, 1), |
| "estimated_completion_date": predicted_date |
| }, indent=2) |
| except: |
| return "Simulation Error: Unable to compute prediction." |
|
|
|
|
| |
| |
| |
|
|
| MCP_TOOL_CATALOGUE = """ |
| ## π¨ CRITICAL: TOOL CALLING FORMAT |
| You MUST call tools using the standard MCP JSON format. |
| DO NOT use XML tags. Use plain JSON. |
| |
| ### β
CORRECT JSON FORMAT (Example): |
| ```json |
| { |
| "name": "add_income", |
| "arguments": { |
| "title": "Salary", |
| "amount": 5000, |
| "date": "2025-03-01" |
| } |
| } |
| ``` |
| |
| ## AI AGENT CAPABILITIES - MASTER TOOLSET |
| |
| ### Authentication & Users |
| - `authenticate_agent(username, password)` β Login (Optional if already logged in) |
| - `get_auth_status()` β Check if authenticated |
| - `register_user(...)` β Create new account |
| - `refresh_auth_token()` β Refresh JWT access token |
| - `get_user_profile()` β View profile info |
| - `update_user_profile(...)` β Update name, email, currency |
| |
| ### Dashboard & Analytics |
| - `get_dashboard_summary()` β Full financial overview |
| - `get_cash_flow_forecast()` β 3-month ML cash flow forecast |
| - `get_income_forecast()` β Income forecast |
| - `get_anomaly_report()` β Unusual transaction detection |
| |
| ### Transactions |
| - `list_all_transactions()` β All incomes + expenses |
| - `list_incomes()` / `list_expenses()` β Specific lists |
| - `add_income(...)` / `add_expense(...)` β Add new |
| - `update_income(...)` / `update_expense(...)` β Edit existing |
| - `delete_income(...)` / `delete_expense(...)` β Delete existing |
| - `get_transaction_categories()` β List all categories |
| - `bulk_delete_transactions()` β Delete ALL data (Caution!) |
| - `upload_transactions(file_path)` β Bulk import from CSV/Excel |
| |
| ### Advanced Features |
| - `export_data(format)` β Export to PDF/CSV |
| - `send_email_report(email)` β Email the report |
| - `scan_receipt(file_path)` β AI OCR for receipts |
| - `process_voice_command(command)` β Natural language processing |
| - `generate_comprehensive_report()` β AI-powered narrative report |
| - `get_financial_health_score()` β Executive stability & savings report |
| - `get_tax_liability_estimate()` β Estimate potential tax owed |
| - `get_solvency_runway()` β Months of survival on savings |
| - `get_recurring_payments()` β Subscriptions & recurring bills |
| - `get_ai_recommendations()` β Personalized financial advice |
| - `simulate_goal_completion(goal_id)` β Predictive goal reaching date |
| |
| ### Data & Search |
| - `search_transactions_adv(query, start_date?, end_date?)` β Power search |
| - `get_historical_category_trends()` β Multi-month trends |
| - `get_budget_category_stats()` β Avg spend vs Budgets |
| - `get_sample_data_preview()` / `load_sample_data()` β Test data management |
| |
| ### Chat |
| - `get_chat_history()` β Recent chat messages |
| - `clear_chat_history()` β Wipe all chat history |
| - `delete_chat_message(msg_id)` β Remove one message |
| |
| ### Direct Database Access |
| - `get_my_full_data()` β Get your COMPLETE user document (High Performance). |
| - `query_mongodb(collection, filter)` β Read-only access to collections. |
| - `get_api_status()` β Health check. |
| """ |
|
|
|
|
| if __name__ == "__main__": |
| mcp.run() |
|
|