FinMK / backend /mcp_server.py
Kumar
Refactor: Exclude PDF and CSV files from Git to fix HF push error
24e6f5b
"""
FinMK MCP Server
================
A full Model Context Protocol (MCP) server that exposes every existing Django REST API
as an agent-callable tool. Zero business logic duplication — all tools proxy through
the existing Django REST endpoints and reuse all validation, MongoDB operations, and
business logic already in place.
Works in two modes:
- Local dev: LM Studio (qwen3.5:2b) calls tools via MCP protocol
- Production: Gemini / Cerebras function-calling integrates same tools via chatbot
Usage:
python mcp_server.py (for LM Studio / stdio)
npx @modelcontextprotocol/inspector python mcp_server.py (for browser inspector)
"""
import os
import sys
import json
import httpx
import pdfplumber
from typing import Optional, Dict, Any
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
load_dotenv()
# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────
# The FastMCP application object; every tool below registers itself on it.
mcp = FastMCP("FinMK AI Agent")

# Port of the local Django backend (kept as a string, used only in URLs).
PORT = os.environ.get("PORT", "8000")

# API root without a trailing slash: API_BASE_URL wins, otherwise the local backend.
BASE_URL = (os.environ.get("API_BASE_URL") or f"http://localhost:{PORT}/api").rstrip("/")
if not os.environ.get("API_BASE_URL") and os.environ.get("RENDER") == "true":
    # On Render without an explicit override, talk to the backend on localhost.
    BASE_URL = f"http://localhost:{PORT}/api".rstrip("/")

# Shared auth state — persists for the lifetime of the MCP session.
_auth: Dict[str, Optional[str]] = dict.fromkeys(
    ("access_token", "refresh_token", "username")
)
def _headers() -> Dict[str, str]:
    """Build request headers: JSON content type plus a Bearer token when one is stored."""
    token = _auth["access_token"]
    headers = {"Content-Type": "application/json"}
    if not token:
        return headers
    headers["Authorization"] = f"Bearer {token}"
    return headers
def _require_auth() -> Optional[str]:
    """Return an error message when no access token is stored, otherwise None."""
    if _auth["access_token"]:
        return None
    return "Error: Not authenticated. Call 'authenticate_agent' first."
async def _get(path: str, params: Optional[Dict[str, Any]] = None) -> str:
    """
    Send an authenticated GET to ``BASE_URL/<path>`` and return the response as text.

    - path: API path; a leading slash is ignored.
    - params: optional query parameters passed to httpx.

    Returns the auth error string when not logged in, the result of
    ``_handle_binary_save`` on a response, or ``"Error: ..."`` if the request raises.
    """
    auth_error = _require_auth()
    if auth_error:
        return auth_error
    url = f"{BASE_URL}/{path.lstrip('/')}"
    print(f"[MCP] GET {url}", file=sys.stderr)
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            response = await client.get(url, headers=_headers(), params=params)
            print(f"[MCP] Response {response.status_code} from {url}", file=sys.stderr)
            if response.status_code == 404:
                # Log the start of the body to help diagnose wrong routes.
                print(f"[MCP] 404 ERROR Body: {response.text[:500]}", file=sys.stderr)
            return await _handle_binary_save(response)
        except Exception as exc:
            print(f"[MCP] Error: {exc}", file=sys.stderr)
            return f"Error: {exc}"
async def _handle_binary_save(r: "httpx.Response") -> str:
    """
    Return the response text, or save a PDF/CSV payload to ~/Downloads.

    When the Content-Type contains ``application/pdf`` or ``text/csv`` the body
    is written to the user's Downloads folder and a confirmation message with
    the file path is returned, so raw bytes are never handed to the agent.
    Any other response is returned as ``r.text``.

    The file name comes from the Content-Disposition header when present, but
    only its final path component is used so a server-supplied name such as
    ``../../x`` cannot escape the Downloads folder.
    """
    ctype = r.headers.get("Content-Type", "").lower()
    is_csv = "text/csv" in ctype
    if "application/pdf" not in ctype and not is_csv:
        # Standard text/json response
        return r.text
    try:
        downloads_path = os.path.join(os.path.expanduser("~"), "Downloads")
        os.makedirs(downloads_path, exist_ok=True)

        filename = ""
        cd = r.headers.get("Content-Disposition", "")
        if "filename=" in cd:
            # Take the value up to the next header parameter and drop quotes.
            raw_name = cd.split("filename=", 1)[1].split(";", 1)[0].strip().strip("\"'")
            # Keep only the last path component, whichever separator was used.
            filename = os.path.basename(raw_name.replace("\\", "/")).strip()
        if filename in ("", ".", ".."):
            filename = "FinMK_Export.csv" if is_csv else "FinMK_Export.pdf"

        filepath = os.path.join(downloads_path, filename)
        with open(filepath, "wb") as f:
            f.write(r.content)
        return f"SUCCESS: The file has been generated and saved directly to your Downloads folder: {filepath}"
    except Exception as e:
        return f"Error saving binary file: {e}"
async def _post(path: str, payload: dict) -> str:
    """
    Send an authenticated JSON POST to ``BASE_URL/<path>``.

    Returns the auth error string when not logged in, the result of
    ``_handle_binary_save`` on a response, or ``"Error: ..."`` if the request raises.
    """
    auth_error = _require_auth()
    if auth_error:
        return auth_error
    url = f"{BASE_URL}/{path.lstrip('/')}"
    # Only the first 100 characters of the body are logged.
    body_preview = json.dumps(payload)[:100]
    print(f"[MCP] POST {url} | Body: {body_preview}...", file=sys.stderr)
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            response = await client.post(url, json=payload, headers=_headers())
            print(f"[MCP] Response {response.status_code}", file=sys.stderr)
            return await _handle_binary_save(response)
        except Exception as exc:
            print(f"[MCP] Error: {exc}", file=sys.stderr)
            return f"Error: {exc}"
async def _patch(path: str, payload: dict) -> str:
    """
    Send an authenticated JSON PATCH to ``BASE_URL/<path>`` and return the response text.

    Returns the auth error string when not logged in, or ``"Error: ..."`` if the
    request raises.
    """
    auth_error = _require_auth()
    if auth_error:
        return auth_error
    url = f"{BASE_URL}/{path.lstrip('/')}"
    # Only the first 100 characters of the body are logged.
    body_preview = json.dumps(payload)[:100]
    print(f"[MCP] PATCH {url} | Body: {body_preview}...", file=sys.stderr)
    async with httpx.AsyncClient(timeout=300) as client:
        try:
            response = await client.patch(url, json=payload, headers=_headers())
            print(f"[MCP] Response {response.status_code}", file=sys.stderr)
            return response.text
        except Exception as exc:
            print(f"[MCP] Error: {exc}", file=sys.stderr)
            return f"Error: {exc}"
async def _delete(path: str) -> str:
    """
    Send an authenticated DELETE to ``BASE_URL/<path>``.

    A 200 or 204 response is reported as a small JSON success object (those
    responses may have no body); any other status returns the response text.
    Returns the auth error string when not logged in, or ``"Error: ..."`` if the
    request raises.
    """
    auth_error = _require_auth()
    if auth_error:
        return auth_error
    url = f"{BASE_URL}/{path.lstrip('/')}"
    print(f"[MCP] DELETE {url}", file=sys.stderr)
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            response = await client.delete(url, headers=_headers())
            status = response.status_code
            print(f"[MCP] Response {status}", file=sys.stderr)
            if status not in (200, 204):
                return response.text
            return json.dumps({"success": True, "status": status})
        except Exception as exc:
            print(f"[MCP] Error: {exc}", file=sys.stderr)
            return f"Error: {exc}"
# ══════════════════════════════════════════════
# AUTH TOOLS
# ══════════════════════════════════════════════
@mcp.tool()
async def authenticate_agent(username: str, password: str) -> str:
    """
    Log into the FinMK backend with username and password.
    Optional if already logged in via session/token.

    On success the access and refresh tokens from the login response are kept
    in the shared session state and used by all other tools.
    """
    url = f"{BASE_URL}/auth/login/"
    print(f"[MCP] Authenticating: {username} at {url}", file=sys.stderr)
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            r = await client.post(
                url,
                json={"username": username, "password": password},
            )
            print(f"[MCP] Auth Response: {r.status_code}", file=sys.stderr)
            if r.status_code != 200:
                ret = f"Login failed ({r.status_code}): {r.text}"
                print(f"[MCP] {ret}", file=sys.stderr)
                return ret
            data = r.json()
            access = data.get("access") if isinstance(data, dict) else None
            if not access:
                # A 200 without a token must not be reported as a live session:
                # every later tool call would fail the auth check anyway.
                ret = "Login failed: the server response did not include an access token."
                print(f"[MCP] {ret}", file=sys.stderr)
                return ret
            _auth["access_token"] = access
            _auth["refresh_token"] = data.get("refresh")
            _auth["username"] = username
            return f"Authenticated as '{username}'. Session is active — you can now call all other tools."
        except Exception as e:
            err = f"Connection error to {BASE_URL}: {e}"
            print(f"[MCP] {err}", file=sys.stderr)
            return err
@mcp.tool()
async def get_auth_status() -> str:
    """Report whether the AI agent currently holds an access token."""
    if not _auth["access_token"]:
        return "Not authenticated. Call 'authenticate_agent' first."
    return f"Authenticated as '{_auth['username']}'. Session is active."
# ══════════════════════════════════════════════
# DASHBOARD
# ══════════════════════════════════════════════
@mcp.tool()
async def get_dashboard_summary() -> str:
    """
    Fetch the full financial dashboard: total income, expenses, savings, net
    balance, budget adherence, recent transactions, and category breakdowns.
    """
    endpoint = "/finance/dashboard-summary/"
    return await _get(endpoint)
# ══════════════════════════════════════════════
# TRANSACTIONS — INCOME
# ══════════════════════════════════════════════
@mcp.tool()
async def list_incomes() -> str:
    """Return all income transactions of the authenticated user, sorted by date descending."""
    endpoint = "/finance/income/"
    return await _get(endpoint)
@mcp.tool()
async def add_income(title: str, amount: float, date: str, category: str = "") -> str:
    """
    Create a new income transaction.
    - title: Description (e.g. 'Freelance project payment')
    - amount: Positive number (e.g. 5000.00)
    - date: YYYY-MM-DD format (e.g. '2025-03-15')
    - category: (Optional) Leave empty for auto-classification by AI
    """
    # The category key is sent only when the caller supplied one.
    optional: Dict[str, Any] = {"category": category} if category else {}
    payload: Dict[str, Any] = {"title": title, "amount": amount, "date": date, **optional}
    return await _post("/finance/income/", payload)
@mcp.tool()
async def update_income(income_id: str, title: str = "", amount: float = 0, date: str = "", category: str = "") -> str:
    """
    Modify an existing income transaction identified by its ID.
    Only provide fields you want to change.
    """
    candidates: Dict[str, Any] = {
        "title": title,
        "amount": amount,
        "date": date,
        "category": category,
    }
    # Empty strings and a zero amount mean "leave this field unchanged".
    payload: Dict[str, Any] = {field: value for field, value in candidates.items() if value}
    if payload:
        return await _patch(f"/finance/income/{income_id}/", payload)
    return "Error: Provide at least one field to update."
@mcp.tool()
async def delete_income(income_id: str) -> str:
    """Permanently delete the income transaction with the given ID."""
    endpoint = f"/finance/income/{income_id}/"
    return await _delete(endpoint)
# ══════════════════════════════════════════════
# TRANSACTIONS — EXPENSE
# ══════════════════════════════════════════════
@mcp.tool()
async def list_expenses() -> str:
    """Return all expense transactions of the authenticated user, sorted by date descending."""
    endpoint = "/finance/expense/"
    return await _get(endpoint)
@mcp.tool()
async def add_expense(title: str, amount: float, date: str, category: str = "") -> str:
    """
    Create a new expense transaction.
    - title: Description (e.g. 'Grocery shopping at Reliance Fresh')
    - amount: Positive number (e.g. 1200.50)
    - date: YYYY-MM-DD format
    - category: (Optional) Leave empty for auto-classification by AI
    """
    # The category key is sent only when the caller supplied one.
    optional: Dict[str, Any] = {"category": category} if category else {}
    payload: Dict[str, Any] = {"title": title, "amount": amount, "date": date, **optional}
    return await _post("/finance/expense/", payload)
@mcp.tool()
async def update_expense(expense_id: str, title: str = "", amount: float = 0, date: str = "", category: str = "") -> str:
    """
    Modify an existing expense transaction identified by its ID.
    Only provide fields you want to change.
    """
    candidates: Dict[str, Any] = {
        "title": title,
        "amount": amount,
        "date": date,
        "category": category,
    }
    # Empty strings and a zero amount mean "leave this field unchanged".
    payload: Dict[str, Any] = {field: value for field, value in candidates.items() if value}
    if payload:
        return await _patch(f"/finance/expense/{expense_id}/", payload)
    return "Error: Provide at least one field to update."
@mcp.tool()
async def delete_expense(expense_id: str) -> str:
    """Permanently delete the expense transaction with the given ID."""
    endpoint = f"/finance/expense/{expense_id}/"
    return await _delete(endpoint)
@mcp.tool()
async def list_all_transactions() -> str:
    """
    List ALL transactions (both incomes and expenses) in one call.
    Returns a combined JSON with 'incomes' and 'expenses' arrays; a list whose
    request did not return HTTP 200 is reported as empty.
    """
    auth_error = _require_auth()
    if auth_error:
        return auth_error
    async with httpx.AsyncClient(timeout=30) as client:
        try:
            income_resp = await client.get(f"{BASE_URL}/finance/income/", headers=_headers())
            expense_resp = await client.get(f"{BASE_URL}/finance/expense/", headers=_headers())
            combined = {
                "incomes": [] if income_resp.status_code != 200 else income_resp.json(),
                "expenses": [] if expense_resp.status_code != 200 else expense_resp.json(),
            }
            return json.dumps(combined, indent=2, default=str)
        except Exception as exc:
            return f"Error: {exc}"
@mcp.tool()
async def delete_transaction(transaction_id: str, transaction_type: str) -> str:
    """
    Delete an income or expense by ID.
    - transaction_type: must be 'income' or 'expense'
    """
    # The type becomes a URL path segment, so only the two known values are allowed.
    if transaction_type in ("income", "expense"):
        return await _delete(f"/finance/{transaction_type}/{transaction_id}/")
    return "Error: transaction_type must be 'income' or 'expense'."
# ══════════════════════════════════════════════
# BUDGETS
# ══════════════════════════════════════════════
@mcp.tool()
async def get_budgets() -> str:
    """Return all budget limits the user has set across categories and months."""
    endpoint = "/finance/budget/"
    return await _get(endpoint)
@mcp.tool()
async def create_budget(category: str, limit_amount: float, month: str) -> str:
    """
    Set up a new budget limit for a category.
    - category: Expense category name (e.g. 'Food & Dining')
    - limit_amount: Maximum allowed spending (e.g. 5000.00)
    - month: YYYY-MM format (e.g. '2025-03') or 'all' for a permanent limit
    """
    payload = dict(category=category, limit_amount=limit_amount, month=month)
    return await _post("/finance/budget/", payload)
@mcp.tool()
async def update_budget(budget_id: str, limit_amount: float) -> str:
    """Change the spending limit of an existing budget entry identified by its ID."""
    payload = {"limit_amount": limit_amount}
    return await _patch(f"/finance/budget/{budget_id}/", payload)
@mcp.tool()
async def delete_budget(budget_id: str) -> str:
    """Permanently delete the budget entry with the given ID."""
    endpoint = f"/finance/budget/{budget_id}/"
    return await _delete(endpoint)
# ══════════════════════════════════════════════
# SAVINGS GOALS
# ══════════════════════════════════════════════
@mcp.tool()
async def get_savings_goals() -> str:
    """Return all savings goals with their target amounts and current progress."""
    endpoint = "/finance/savings-goals/"
    return await _get(endpoint)
@mcp.tool()
async def create_savings_goal(name: str, target_amount: float, current_amount: float = 0.0, deadline: str = "") -> str:
    """
    Set up a new savings goal.
    - name: Goal name (e.g. 'Emergency Fund', 'Europe Trip')
    - target_amount: Goal target in the user's currency
    - current_amount: Amount already saved (default 0)
    - deadline: Optional target date in YYYY-MM-DD format
    """
    # The deadline key is sent only when the caller supplied one.
    optional: Dict[str, Any] = {"deadline": deadline} if deadline else {}
    payload: Dict[str, Any] = {
        "name": name,
        "target_amount": target_amount,
        "current_amount": current_amount,
        **optional,
    }
    return await _post("/finance/savings-goals/", payload)
@mcp.tool()
async def update_savings_goal(goal_id: str, name: str = "", target_amount: float = 0, current_amount: float = -1, deadline: str = "") -> str:
    """
    Modify an existing savings goal identified by its ID. Provide only the fields to change.
    Use current_amount to log deposits/progress toward the goal.
    """
    # (field, value, include?) — current_amount uses -1 as its "not provided"
    # marker so that an explicit 0 can still be sent.
    fields = (
        ("name", name, bool(name)),
        ("target_amount", target_amount, bool(target_amount)),
        ("current_amount", current_amount, current_amount >= 0),
        ("deadline", deadline, bool(deadline)),
    )
    payload: Dict[str, Any] = {key: value for key, value, include in fields if include}
    if payload:
        return await _patch(f"/finance/savings-goals/{goal_id}/", payload)
    return "Error: Provide at least one field to update."
@mcp.tool()
async def delete_savings_goal(goal_id: str) -> str:
    """Permanently delete the savings goal with the given ID."""
    endpoint = f"/finance/savings-goals/{goal_id}/"
    return await _delete(endpoint)
# ══════════════════════════════════════════════
# ANALYTICS
# ══════════════════════════════════════════════
@mcp.tool()
async def get_cash_flow_forecast() -> str:
    """
    Fetch a machine-learning cash flow forecast for the next 3 months based on
    historical income and expense patterns. Powered by Moirai / ARIMA models.
    """
    endpoint = "/analytics/forecast/"
    return await _get(endpoint)
@mcp.tool()
async def get_income_forecast() -> str:
    """Fetch a detailed income forecast produced by time-series ML models."""
    endpoint = "/analytics/income/forecast/"
    return await _get(endpoint)
@mcp.tool()
async def get_anomaly_report() -> str:
    """
    Fetch the anomaly detection report. Returns unusual/suspicious transactions
    identified by the Isolation Forest algorithm that may warrant review.
    """
    endpoint = "/analytics/anomalies/"
    return await _get(endpoint)
# ══════════════════════════════════════════════
# USER PROFILE
# ══════════════════════════════════════════════
@mcp.tool()
async def get_user_profile() -> str:
    """Fetch the authenticated user's profile: name, email, currency preference, etc."""
    endpoint = "/auth/profile/"
    return await _get(endpoint)
@mcp.tool()
async def update_user_profile(first_name: str = "", last_name: str = "", email: str = "", currency: str = "") -> str:
    """
    Change the user's profile settings. Only provide fields to change.
    - currency: ISO currency code (e.g. 'USD', 'INR', 'EUR')
    """
    candidates: Dict[str, Any] = {
        "first_name": first_name,
        "last_name": last_name,
        "email": email,
        "currency": currency,
    }
    # Empty strings mean "leave this field unchanged".
    payload: Dict[str, Any] = {field: value for field, value in candidates.items() if value}
    if payload:
        return await _patch("/auth/profile/", payload)
    return "Error: Provide at least one field to update."
# ══════════════════════════════════════════════
# CHAT HISTORY
# ══════════════════════════════════════════════
@mcp.tool()
async def get_chat_history() -> str:
    """Fetch the last 50 chat messages for the authenticated user."""
    endpoint = "/chat/message/"
    return await _get(endpoint)
@mcp.tool()
async def clear_chat_history() -> str:
    """Wipe all chat history for the authenticated user. This action is irreversible."""
    endpoint = "/chat/message/clear-all/"
    return await _delete(endpoint)
# ══════════════════════════════════════════════
# REPORTING & EXPORT
# ══════════════════════════════════════════════
@mcp.tool()
async def generate_comprehensive_report(provider: str = "Auto Mode") -> str:
    """
    Generate a full financial intelligence report combining all data, trends,
    forecasts, budget adherence and AI-powered insights.
    - provider: AI model to use for narrative: 'Auto Mode', 'Gemini', 'OpenRouter', 'Cerebras', 'Groq'
    """
    # Pass the provider as a query parameter so httpx URL-encodes it; the
    # default value "Auto Mode" contains a space, and a value containing '&'
    # or '#' would otherwise alter the query string.
    return await _get("/finance/comprehensive-report/", params={"provider": provider})
@mcp.tool()
async def get_financial_summary() -> str:
    """
    A lightweight version of the comprehensive report.
    Returns only a compact set of headline metrics (net worth, savings rate,
    health score, stability, behavioral insight, runway).
    Optimized for smaller AI models (e.g. Qwen 2b).
    """
    data_str = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(data_str)
        scorecard = data.get("health_scorecard", {})
        summary = {
            "metric_date": data.get("metric_date"),
            "net_worth": data.get("savings_wealth", {}).get("current_net_worth"),
            "savings_rate": data.get("executive_overview", {}).get("savings_rate"),
            "health_score": scorecard.get("overall_score"),
            "stability": scorecard.get("metrics", {}).get("stability"),
            "behavioral_insight": data.get("behavioral_analysis", {}).get("insight_text"),
            "runway": data.get("solvency_runway"),
        }
        return json.dumps(summary, indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not the expected object
        # shape: fall back to the raw response.
        return data_str
@mcp.tool()
async def export_data() -> str:
    """
    Export financial data as a professional Analytics PDF report.
    The report includes transaction history, AI insights, and forecasts.
    The file will be saved directly to your Downloads folder.

    Starts the report task on the backend, polls it (30 attempts, 1.5 s apart)
    and downloads the file once the task reports completion.
    """
    import asyncio

    report_path = "/finance/generate-report/"
    # 1. Start background task
    res_str = await _post(report_path, {"type": "pdf", "format": "pdf"})
    try:
        task_id = json.loads(res_str).get("task_id")
    except (ValueError, AttributeError):
        # Not JSON (e.g. an error string) or not an object: return it as-is.
        return res_str
    if not task_id:
        return res_str
    try:
        # 2. Poll until complete
        print(f"[MCP] Polling task {task_id}...", file=sys.stderr)
        for _ in range(30):  # 30 polls, 1.5 s apart (plus request time)
            await asyncio.sleep(1.5)
            poll_res = await _get(report_path, params={"task_id": task_id})
            # Only the parsing is guarded: a bare ``except`` around the awaits
            # would also swallow task cancellation.
            try:
                poll_data = json.loads(poll_res)
                status = poll_data.get("status")
                error = poll_data.get("error")
            except (ValueError, AttributeError):
                # Non-JSON status text: treat a body mentioning "completed" as done.
                status = "completed" if "completed" in poll_res.lower() else None
                error = None
            if status == "completed":
                print(f"[MCP] Task {task_id} completed. Downloading...", file=sys.stderr)
                # 3. Download final (this triggers _handle_binary_save)
                return await _get(report_path, params={"task_id": task_id, "download": "true"})
            if status == "failed":
                return f"Error: PDF generation failed: {error}"
        return "Error: Timeout waiting for PDF generation. The process is still running in the background."
    except Exception as e:
        print(f"[MCP] Polling Error: {e}", file=sys.stderr)
        return res_str
@mcp.tool()
async def send_email_report(email: str, provider: str = "Auto Mode") -> str:
    """
    Email a comprehensive financial report to the given address.
    """
    payload = dict(email=email, provider=provider)
    return await _post("/finance/send-email-report/", payload)
# ══════════════════════════════════════════════
# ADVANCED TRANSACTION TOOLS
# ══════════════════════════════════════════════
@mcp.tool()
async def get_transaction_categories() -> str:
    """Fetch all available income and expense categories."""
    endpoint = "/finance/transactions/categories/"
    return await _get(endpoint)
@mcp.tool()
async def bulk_delete_transactions() -> str:
    """Delete ALL transactions for the authenticated user. USE WITH EXTREME CAUTION."""
    # The endpoint is called with an empty JSON body.
    empty_body: dict = {}
    return await _post("/finance/transactions/delete-all/", empty_body)
@mcp.tool()
async def read_pdf_document(file_path: str) -> str:
    """
    Extract text content from a PDF file (e.g., reports, statements).
    - file_path: Absolute path to the PDF file on the local system.

    Each page with text is prefixed with a "--- Page N ---" header; output
    longer than 50,000 characters is truncated.
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"
    if not file_path.lower().endswith(".pdf"):
        return "Error: This tool only supports PDF files."
    try:
        with pdfplumber.open(file_path) as pdf:
            numbered = ((number, page.extract_text()) for number, page in enumerate(pdf.pages, start=1))
            sections = [f"--- Page {number} ---\n{text}" for number, text in numbered if text]
        if not sections:
            return "Note: PDF was opened but no text could be extracted (it might be an image-only PDF)."
        full_text = "\n\n".join(sections)
        # Cap the size so a huge document does not overwhelm the model.
        max_chars = 50000
        if len(full_text) > max_chars:
            full_text = full_text[:max_chars] + "... [Text Truncated for Brevity]"
        return full_text
    except Exception as exc:
        return f"Error reading PDF: {exc}"
@mcp.tool()
async def scan_receipt(file_path: str) -> str:
    """
    Scan a receipt image to extract transaction details using AI OCR.
    - file_path: Absolute path to the receipt image on the local system.

    The file is uploaded as multipart/form-data to the backend's
    scan-receipt endpoint and the response text is returned.
    """
    # Same guard as every other authenticated tool: give the agent the
    # actionable "authenticate first" message instead of a raw backend error.
    err = _require_auth()
    if err:
        return err
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"
    # Drop the JSON Content-Type so httpx can set the multipart/form-data
    # header (with its boundary) itself.
    headers = {k: v for k, v in _headers().items() if k != "Content-Type"}
    async with httpx.AsyncClient(timeout=60) as client:
        try:
            with open(file_path, "rb") as f:
                r = await client.post(
                    f"{BASE_URL}/finance/transactions/scan-receipt/",
                    files={"file": f},
                    headers=headers,
                )
            return r.text
        except Exception as e:
            return f"Error: {e}"
@mcp.tool()
async def process_voice_command(command: str) -> str:
    """
    Handle a natural language voice command to log transactions or query data.
    - command: The spoken text (e.g., 'I spent 500 rupees on coffee today')
    """
    payload = {"command": command}
    return await _post("/finance/voice/command/", payload)
@mcp.tool()
async def upload_transactions(file_path: str) -> str:
    """
    Upload a CSV or Excel file containing multiple transactions for bulk import.
    - file_path: Absolute path to the .csv or .xlsx file on the local system.

    The file is sent as multipart/form-data and the backend's response text
    is returned.
    """
    # Same guard as every other authenticated tool: give the agent the
    # actionable "authenticate first" message instead of a raw backend error.
    err = _require_auth()
    if err:
        return err
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"
    url = f"{BASE_URL}/finance/transactions/upload/"
    print(f"[MCP] Uploading transactions from {file_path} to {url}", file=sys.stderr)
    # Drop the JSON Content-Type so httpx can set the multipart/form-data
    # header (with its boundary) itself.
    headers = {k: v for k, v in _headers().items() if k != "Content-Type"}
    async with httpx.AsyncClient(timeout=60) as client:
        try:
            with open(file_path, "rb") as f:
                r = await client.post(url, files={"file": f}, headers=headers)
            print(f"[MCP] Upload Response: {r.status_code}", file=sys.stderr)
            return r.text
        except Exception as e:
            err = f"Error: {e}"
            print(f"[MCP] {err}", file=sys.stderr)
            return err
# ══════════════════════════════════════════════
# AUTH & USER MANAGEMENT
# ══════════════════════════════════════════════
@mcp.tool()
async def register_user(username: str, password: str, email: str, first_name: str = "", last_name: str = "") -> str:
    """Register a new user account in FinMK (no authentication needed)."""
    url = f"{BASE_URL}/auth/register/"
    payload = dict(
        username=username,
        password=password,
        email=email,
        first_name=first_name,
        last_name=last_name,
    )
    print(f"[MCP] Registering user: {username} at {url}", file=sys.stderr)
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            response = await client.post(url, json=payload)
            print(f"[MCP] Register Response: {response.status_code}", file=sys.stderr)
            # Only HTTP 201 counts as a successful registration.
            if response.status_code != 201:
                return f"Registration failed ({response.status_code}): {response.text}"
            return f"Success: User '{username}' registered. You can now call 'authenticate_agent' to log in."
        except Exception as exc:
            message = f"Connection error: {exc}"
            print(f"[MCP] {message}", file=sys.stderr)
            return message
@mcp.tool()
async def refresh_auth_token() -> str:
    """Refresh the session access token using the stored refresh token."""
    if not _auth["refresh_token"]:
        return "Error: No refresh token available. Log in first."
    async with httpx.AsyncClient(timeout=15) as client:
        try:
            r = await client.post(f"{BASE_URL}/auth/token/refresh/", json={"refresh": _auth["refresh_token"]})
            if r.status_code != 200:
                return f"Token refresh failed: {r.text}"
            data = r.json()
            access = data.get("access") if isinstance(data, dict) else None
            if not access:
                # Keep the current token rather than overwriting it with None.
                return f"Token refresh failed: {r.text}"
            _auth["access_token"] = access
            # If the backend also returns a new refresh token (token rotation),
            # keep it; the old one may no longer be accepted.
            # NOTE(review): presumably a SimpleJWT-style endpoint — confirm.
            if data.get("refresh"):
                _auth["refresh_token"] = data["refresh"]
            return "Access token refreshed successfully."
        except Exception as e:
            return f"Connection error: {e}"
# ══════════════════════════════════════════════
# CHAT MANAGEMENT
# ══════════════════════════════════════════════
@mcp.tool()
async def delete_chat_message(msg_id: str) -> str:
    """Remove a single chat message identified by its ID."""
    endpoint = f"/chat/message/{msg_id}/"
    return await _delete(endpoint)
# ══════════════════════════════════════════════
# DIRECT MONGODB READ/WRITE TOOLS
# ══════════════════════════════════════════════
@mcp.tool()
async def get_my_full_data() -> str:
    """
    Fetch the COMPLETE MongoDB document for the currently authenticated user.
    This includes all financial data, settings, and profile info in one JSON blob.
    The 'password', 'is_staff' and 'is_superuser' fields are removed first.
    """
    auth_error = _require_auth()
    if auth_error:
        return auth_error
    username = _auth.get("username")
    if not username:
        return "Error: Username not found in session. Please authenticate first."
    try:
        # Django is configured lazily because this tool reads the database
        # directly instead of going through the REST API.
        import django
        from django.conf import settings
        if not settings.configured:
            os.environ.setdefault("DJANGO_SETTINGS_MODULE", "expense_tracker.settings")
            django.setup()
        from expense_tracker.utils import MongoDBClient
        from bson import json_util

        db = MongoDBClient.get_client()
        user_doc = db.users.find_one({"username": username})
        if not user_doc:
            return f"Error: No MongoDB document found for user '{username}'."
        # Security: Remove sensitive fields
        for sensitive in ("password", "is_staff", "is_superuser"):
            user_doc.pop(sensitive, None)
        # Round-trip through bson.json_util so BSON values become plain JSON.
        as_plain_json = json.loads(json_util.dumps(user_doc))
        return json.dumps(as_plain_json, indent=2)
    except Exception as exc:
        return f"MongoDB Error: {exc}"
@mcp.tool()
async def query_mongodb(collection: str, filter_json: str = "{}", limit: int = 20) -> str:
    """
    Execute a query directly on MongoDB Atlas (Read-only).
    - collection: MongoDB collection name ('users', 'income', 'expense', etc.)
    - filter_json: JSON string of MongoDB query filter (e.g. '{"category": "Food"}')
    - limit: Maximum number of documents to return (1 to 50)
    """
    err = _require_auth()
    if err:
        return err
    # Clamp to 1..50: for a PyMongo cursor, limit(0) means "no limit", so a
    # zero (or negative) value would defeat the cap.
    limit = max(1, min(limit, 50))
    try:
        query_filter = json.loads(filter_json)
    except json.JSONDecodeError as e:
        return f"Error: filter_json is not valid JSON — {e}"
    if not isinstance(query_filter, dict):
        return "Error: filter_json must be a JSON object."
    # NOTE(security): the filter comes from the agent and results are not
    # scoped to the authenticated user; review whether arbitrary collections
    # and operators (e.g. $where) should be reachable from this tool.
    try:
        import django
        from django.conf import settings
        if not settings.configured:
            os.environ.setdefault("DJANGO_SETTINGS_MODULE", "expense_tracker.settings")
            django.setup()
        from expense_tracker.utils import MongoDBClient
        from bson import json_util

        db = MongoDBClient.get_client()
        docs = list(db[collection].find(query_filter).limit(limit))
        # Strip the same sensitive fields that get_my_full_data removes.
        for doc in docs:
            if isinstance(doc, dict):
                for sensitive in ("password", "is_staff", "is_superuser"):
                    doc.pop(sensitive, None)
        return json.dumps(json.loads(json_util.dumps(docs)), indent=2)
    except Exception as e:
        return f"MongoDB query error: {e}"
# ══════════════════════════════════════════════
# API STATUS
# ══════════════════════════════════════════════
@mcp.tool()
async def get_api_status() -> str:
    """
    Check the health status of all AI providers (Gemini, OpenRouter, Cerebras, LM Studio)
    and the MongoDB connection. Useful for diagnosing connectivity issues.
    The request is sent without auth headers.
    """
    status_url = f"{BASE_URL}/finance/status/"
    async with httpx.AsyncClient(timeout=10) as client:
        try:
            response = await client.get(status_url)
            return response.text
        except Exception as exc:
            return f"Could not reach backend: {exc}"
# --- WAVE 2: ADVANCED FINANCE TOOLS ---
@mcp.tool()
async def get_sample_data_preview() -> str:
    """Fetch statistics about the available sample transaction data."""
    endpoint = "/finance/sample-data/preview/"
    return await _get(endpoint)
@mcp.tool()
async def load_sample_data() -> str:
    """Import the sample transactions into the authenticated user's account."""
    # The endpoint is called with an empty JSON body.
    empty_body: dict = {}
    return await _post("/finance/sample-data/load/", empty_body)
@mcp.tool()
async def get_budget_category_stats() -> str:
    """Get AI-driven insights into average monthly spending by category compared to budgets."""
    res = await _get("/finance/budget/")
    try:
        data = json.loads(res)
        # NOTE(review): assumes the budget endpoint returns an object with a
        # "categories" key; any other shape falls back to the raw response.
        return json.dumps(data.get("categories", []), indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def get_historical_category_trends() -> str:
    """Get multi-month historical trends for expense and income categories."""
    res = await _get("/finance/dashboard-summary/")
    try:
        data = json.loads(res)
        return json.dumps({
            "expense_trends": data.get("category_trends", []),
            "income_trends": data.get("income_category_trends", []),
        }, indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def search_transactions_adv(query: str, start_date: str = "", end_date: str = "") -> str:
    """Advanced transaction search with optional date filters (YYYY-MM-DD)."""
    date_filters = {"start_date": start_date, "end_date": end_date}
    params = {"search": query}
    # Date bounds are sent only when provided.
    params.update({name: value for name, value in date_filters.items() if value})
    return await _get("/finance/transactions/list/", params=params)
# --- ADVANCED ANALYTICS (EXECUTIVE ENGINE) ---
@mcp.tool()
async def get_financial_health_score() -> str:
    """Get a comprehensive financial health and stability score (0-100)."""
    res = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(res)
        overview = data.get("executive_overview", {})
        # NOTE(review): get_financial_summary reads the score from
        # health_scorecard.overall_score and the insight from
        # behavioral_analysis.insight_text; confirm which keys the report
        # actually provides.
        return json.dumps({
            "health_score": overview.get("stability_score"),
            "savings_rate": overview.get("savings_rate"),
            "insight": data.get("behavioral_insight"),
        }, indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not the expected shape.
        return res
@mcp.tool()
async def get_tax_liability_estimate() -> str:
    """Get an AI-powered estimate of potential tax liability based on income patterns."""
    res = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(res)
        return json.dumps(data.get("tax_liability", {}), indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def get_solvency_runway() -> str:
    """Calculate how many months you can survive on current savings (Solvency Runway)."""
    res = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(res)
        return json.dumps(data.get("solvency_runway", {}), indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def get_recurring_payments() -> str:
    """Detect and list suspected recurring payments and subscriptions."""
    res = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(res)
        return json.dumps(data.get("recurring_commitments", []), indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def get_financial_velocity() -> str:
    """Get the 'velocity' of your money — how fast you are earning vs. spending."""
    res = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(res)
        return json.dumps(data.get("velocity", {}), indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def get_ai_recommendations() -> str:
    """Get personalized AI-generated financial recommendations and action items."""
    res = await _get("/finance/comprehensive-report/")
    try:
        data = json.loads(res)
        return json.dumps(data.get("recommendations", []), indent=2)
    except (ValueError, AttributeError, TypeError):
        # Not JSON (e.g. an "Error: ..." string) or not a JSON object.
        return res
@mcp.tool()
async def simulate_goal_completion(goal_id: str) -> str:
    """
    Predict the date a savings goal will be reached based on your net cash flow.
    - goal_id: ID of the savings goal (as returned by get_savings_goals)

    The estimate divides the remaining amount by the report's net savings
    figure and converts the result to a date using 30-day months.
    """
    report_res = await _get("/finance/comprehensive-report/")
    goals_res = await _get("/finance/savings-goals/")
    try:
        report_data = json.loads(report_res)
        goals_data = json.loads(goals_res)
        # NOTE(review): net_savings is treated as a per-month figure — confirm
        # against the report endpoint.
        net_flow = report_data.get("executive_overview", {}).get("net_savings", 0)
        if net_flow <= 0:
            return "Simulation Error: Your net savings flow is negative or zero. You cannot reach this goal at your current spending rate."
        # Compare as strings so a numeric id in the payload still matches.
        target_goal = next(
            (g for g in goals_data if str(g.get("id")) == str(goal_id)),
            None,
        )
        if not target_goal:
            return f"Goal ID {goal_id} not found."
        # A goal that is already met has nothing remaining (never negative).
        remaining = max(target_goal["target_amount"] - target_goal["current_amount"], 0)
        months_needed = remaining / net_flow
        from datetime import datetime, timedelta
        predicted_date = (datetime.now() + timedelta(days=months_needed * 30)).strftime('%Y-%m-%d')
        return json.dumps({
            # Goals are created with a "name" field; "title" is kept as a fallback.
            "goal_name": target_goal.get("name") or target_goal.get("title"),
            "remaining_amount": remaining,
            "monthly_savings_pacing": round(net_flow, 2),
            "predicted_months_to_complete": round(months_needed, 1),
            "estimated_completion_date": predicted_date,
        }, indent=2)
    except (ValueError, TypeError, KeyError, AttributeError, OverflowError):
        # Non-JSON responses, unexpected shapes, or out-of-range dates.
        return "Simulation Error: Unable to compute prediction."
# ══════════════════════════════════════════════
# TOOL CATALOGUE (for injection into AI prompts)
# ══════════════════════════════════════════════
# Human-readable list of the tools above, meant to be pasted into AI prompts.
# Keep the signatures here in sync with the @mcp.tool() definitions.
MCP_TOOL_CATALOGUE = """
## 🚨 CRITICAL: TOOL CALLING FORMAT
You MUST call tools using the standard MCP JSON format.
DO NOT use XML tags. Use plain JSON.
### ✅ CORRECT JSON FORMAT (Example):
```json
{
  "name": "add_income",
  "arguments": {
    "title": "Salary",
    "amount": 5000,
    "date": "2025-03-01"
  }
}
```
## AI AGENT CAPABILITIES - MASTER TOOLSET
### Authentication & Users
- `authenticate_agent(username, password)` — Login (Optional if already logged in)
- `get_auth_status()` — Check if authenticated
- `register_user(...)` — Create new account
- `refresh_auth_token()` — Refresh JWT access token
- `get_user_profile()` — View profile info
- `update_user_profile(...)` — Update name, email, currency
### Dashboard & Analytics
- `get_dashboard_summary()` — Full financial overview
- `get_cash_flow_forecast()` — 3-month ML cash flow forecast
- `get_income_forecast()` — Income forecast
- `get_anomaly_report()` — Unusual transaction detection
### Transactions
- `list_all_transactions()` — All incomes + expenses
- `list_incomes()` / `list_expenses()` — Specific lists
- `add_income(...)` / `add_expense(...)` — Add new
- `update_income(...)` / `update_expense(...)` — Edit existing
- `delete_income(...)` / `delete_expense(...)` — Delete existing
- `delete_transaction(transaction_id, transaction_type)` — Delete by ID and type
- `get_transaction_categories()` — List all categories
- `bulk_delete_transactions()` — Delete ALL data (Caution!)
- `upload_transactions(file_path)` — Bulk import from CSV/Excel
### Budgets & Savings Goals
- `get_budgets()` — List budget limits
- `create_budget(category, limit_amount, month)` — Add a budget limit
- `update_budget(budget_id, limit_amount)` — Change a budget limit
- `delete_budget(budget_id)` — Remove a budget limit
- `get_savings_goals()` — List savings goals
- `create_savings_goal(name, target_amount, ...)` — Add a savings goal
- `update_savings_goal(goal_id, ...)` — Edit a goal or log progress
- `delete_savings_goal(goal_id)` — Remove a savings goal
### Advanced Features
- `export_data()` — Export a PDF report to the Downloads folder
- `send_email_report(email)` — Email the report
- `scan_receipt(file_path)` — AI OCR for receipts
- `read_pdf_document(file_path)` — Extract text from a PDF
- `process_voice_command(command)` — Natural language processing
- `generate_comprehensive_report()` — AI-powered narrative report
- `get_financial_summary()` — Lightweight report highlights
- `get_financial_health_score()` — Executive stability & savings report
- `get_tax_liability_estimate()` — Estimate potential tax owed
- `get_solvency_runway()` — Months of survival on savings
- `get_recurring_payments()` — Subscriptions & recurring bills
- `get_financial_velocity()` — Earning vs. spending velocity
- `get_ai_recommendations()` — Personalized financial advice
- `simulate_goal_completion(goal_id)` — Predictive goal reaching date
### Data & Search
- `search_transactions_adv(query, start_date?, end_date?)` — Power search
- `get_historical_category_trends()` — Multi-month trends
- `get_budget_category_stats()` — Avg spend vs Budgets
- `get_sample_data_preview()` / `load_sample_data()` — Test data management
### Chat
- `get_chat_history()` — Recent chat messages
- `clear_chat_history()` — Wipe all chat history
- `delete_chat_message(msg_id)` — Remove one message
### Direct Database Access
- `get_my_full_data()` — Get your COMPLETE user document (High Performance).
- `query_mongodb(collection, filter_json, limit)` — Read-only access to collections.
- `get_api_status()` — Health check.
"""
# Start the MCP server only when this file is executed directly.
if __name__ == "__main__":
    mcp.run()