# Source: app.py by vachaspathi — commit b36a95b ("Update app.py", verified)
# app.py — MCP server (single-file)
# - FastMCP-based Zoho MCP POC with local HF model (DeepSeek / fallback) and Zoho Invoice v3 support.
# - Place next to config.py which must define:
# CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE,
# INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL (or LOCAL_MODEL=None)
# - Developer note: this file will demo process_document on startup using the uploaded file
# path: /mnt/data/script_zoho_mcp (the file you uploaded). That path will be converted
# to a file:// URL automatically when tools receive it.
#
# IMPORTANT: Do NOT commit API keys to repos. Keep them in config.py locally.
from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import time
import traceback
import inspect
import re
import logging
import base64
# Module-wide logger; INFO level so model-loading and tool activity are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# transformers is optional: when it cannot be imported the app keeps running
# and falls back to rule-based replies.
TRANSFORMERS_AVAILABLE = False
try:
    from transformers import (
        pipeline,
        AutoTokenizer,
        AutoModelForCausalLM,
        AutoModelForSeq2SeqLM,
    )
except Exception as import_error:
    logger.warning("transformers not available or failed to import: %s", import_error)
else:
    TRANSFORMERS_AVAILABLE = True
# ----------------------------
# Load config (must exist)
# ----------------------------
# Import the required settings from config.py. Any failure (missing file,
# missing name, error raised while executing config.py) aborts startup with a
# message that now also carries the underlying cause.
try:
    import config as _config
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,  # Zoho CRM base if used
        INVOICE_API_BASE,  # Zoho Invoice API base (e.g., https://invoice.zoho.in/api/v3)
        ORGANIZATION_ID,  # Zoho Invoice organization id
        LOCAL_MODEL,  # local HF model (e.g., "google/flan-t5-small") or None
    )
except Exception as e:
    raise SystemExit(
        "Make sure config.py exists and contains CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL (or set LOCAL_MODEL=None)."
        f" Import error: {e!r}"
    ) from e
# Safe optional globals (can be set in config).
# Read the optional value from the config module itself: globals() of this
# file never contains a name that was only defined in config.py.
LOCAL_TOKENIZER = getattr(_config, "LOCAL_TOKENIZER", None)
# ----------------------------
# Initialize FastMCP
# ----------------------------
# Server instance; the functions decorated with @mcp.tool() below register on it.
mcp = FastMCP("ZohoCRMAgent")
# ----------------------------
# Analytics / KPI logging (simple JSON file)
# ----------------------------
# JSON file holding per-tool and LLM call counters (relative path, so it is
# resolved against the process working directory).
ANALYTICS_PATH = "mcp_analytics.json"
def _init_analytics():
    """Create the analytics file with zeroed counters unless it already exists."""
    if os.path.exists(ANALYTICS_PATH):
        return
    initial_state = {
        "tool_calls": {},
        "llm_calls": 0,
        "last_llm_confidence": None,
        "created_at": time.time(),
    }
    with open(ANALYTICS_PATH, "w") as handle:
        json.dump(initial_state, handle, indent=2)
def _log_tool_call(tool_name: str, success: bool = True):
    """Record one invocation of ``tool_name`` in the analytics file.

    Increments the tool's ``count`` and either ``success`` or ``fail``.
    Analytics are best-effort: an unreadable or malformed file is replaced by
    fresh counters, missing keys are re-created, and a failed write is logged
    instead of raised so that bookkeeping never breaks the tool call itself.

    Args:
        tool_name: Name under which the call is counted.
        success: Whether the call succeeded.
    """
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = None
    if not isinstance(data, dict):
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    # The file may be valid JSON yet lack (or corrupt) the "tool_calls" table.
    tool_calls = data.get("tool_calls")
    if not isinstance(tool_calls, dict):
        tool_calls = {}
        data["tool_calls"] = tool_calls

    stats = tool_calls.get(tool_name)
    if not isinstance(stats, dict):
        stats = {}
        tool_calls[tool_name] = stats
    for counter in ("count", "success", "fail"):
        if not isinstance(stats.get(counter), int):
            stats[counter] = 0

    stats["count"] += 1
    stats["success" if success else "fail"] += 1

    try:
        with open(ANALYTICS_PATH, "w") as f:
            json.dump(data, f, indent=2)
    except OSError as e:
        logger.warning("Could not write analytics file %s: %s", ANALYTICS_PATH, e)
def _log_llm_call(confidence: Optional[float] = None):
    """Increment the LLM call counter in the analytics file.

    Analytics are best-effort: an unreadable file or one whose top level is
    not a JSON object is replaced by fresh counters, and a failed write is
    logged instead of raised so a generation result is never lost because of
    bookkeeping.

    Args:
        confidence: Optional confidence to store as ``last_llm_confidence``;
            the stored value is left untouched when this is None.
    """
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        data = None
    if not isinstance(data, dict):
        data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}

    previous = data.get("llm_calls", 0)
    if not isinstance(previous, (int, float)):
        previous = 0
    data["llm_calls"] = previous + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence

    try:
        with open(ANALYTICS_PATH, "w") as f:
            json.dump(data, f, indent=2)
    except OSError as e:
        logger.warning("Could not write analytics file %s: %s", ANALYTICS_PATH, e)
# Make sure the analytics file exists at import time, before anything is logged.
_init_analytics()
# ----------------------------
# Helper: normalize local file_path args
# ----------------------------
def _normalize_local_path_args(args: Any) -> Any:
    """Augment tool arguments that point at an existing /mnt/data/ file.

    Non-dict input is returned untouched. For a dict, the first truthy value
    among ``file_path``, ``path`` and ``file`` is inspected; when it is a
    string under ``/mnt/data/`` that exists on disk, the same dict gains a
    ``file_url`` (``file://`` form) and, if the file can be read, ``file_b64``
    with its base64-encoded contents. The dict is modified in place and
    returned.
    """
    if not isinstance(args, dict):
        return args

    local_path = args.get("file_path") or args.get("path") or args.get("file")
    points_at_upload = (
        isinstance(local_path, str)
        and local_path.startswith("/mnt/data/")
        and os.path.exists(local_path)
    )
    if not points_at_upload:
        return args

    try:
        args["file_url"] = f"file://{local_path}"
        try:
            with open(local_path, "rb") as handle:
                raw_bytes = handle.read()
            args["file_b64"] = base64.b64encode(raw_bytes).decode("utf-8")
        except Exception as read_error:
            logger.warning("Could not read file for base64: %s", read_error)
            args.pop("file_b64", None)
    except Exception as normalize_error:
        logger.warning("Normalization error for file_path %s: %s", local_path, normalize_error)
    return args
# ----------------------------
# Local LLM loader (DeepSeek style or fallback)
# ----------------------------
# Generation pipeline assigned by init_local_model(); None means no model is loaded.
LLM_PIPELINE = None
# Tokenizer assigned by init_local_model() alongside the pipeline.
TOKENIZER = None
# Display name of the loaded model (repo id plus optional revision), used in logs.
LOADED_MODEL_NAME = None
def _parse_model_and_revision(model_string: str) -> Tuple[str, Optional[str]]:
    """Split ``"repo:revision"`` at the first colon.

    Returns ``(repo_id, revision)`` with both parts stripped when a colon is
    present, otherwise ``(model_string, None)`` with the string unchanged.
    """
    repo_part, separator, revision_part = model_string.partition(":")
    if not separator:
        return model_string, None
    return repo_part.strip(), revision_part.strip()
def init_local_model():
    """Load the model named by LOCAL_MODEL into the module-level LLM_PIPELINE.

    Nothing is loaded when LOCAL_MODEL is unset or transformers is not
    available. Repo ids containing "flan", "t5" or "seq2seq" are loaded as
    seq2seq models (text2text-generation); everything else as causal models
    (text-generation). If loading fails, "google/flan-t5-small" is tried as a
    fallback; if that fails too, LLM_PIPELINE and LOADED_MODEL_NAME are reset
    to None.
    """
    global LLM_PIPELINE, TOKENIZER, LOADED_MODEL_NAME

    if not LOCAL_MODEL:
        logger.info("LOCAL_MODEL not set — skipping local LLM load.")
        LLM_PIPELINE = None
        return
    if not TRANSFORMERS_AVAILABLE:
        logger.warning("transformers not available — skipping model load.")
        LLM_PIPELINE = None
        return

    try:
        repo_id, revision = _parse_model_and_revision(LOCAL_MODEL)
        tokenizer_name = LOCAL_TOKENIZER or repo_id
        LOADED_MODEL_NAME = f"{repo_id}" + (f" (rev={revision})" if revision else "")

        # Only forward a revision when one was actually given.
        revision_kwargs = {"revision": revision} if revision else {}
        lowered_repo = repo_id.lower()
        is_seq2seq = any(marker in lowered_repo for marker in ("flan", "t5", "seq2seq"))

        TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True, **revision_kwargs)
        if is_seq2seq:
            model = AutoModelForSeq2SeqLM.from_pretrained(repo_id, **revision_kwargs)
            LLM_PIPELINE = pipeline("text2text-generation", model=model, tokenizer=TOKENIZER)
            logger.info("Loaded seq2seq model: %s", LOADED_MODEL_NAME)
        else:
            model = AutoModelForCausalLM.from_pretrained(repo_id, **revision_kwargs)
            LLM_PIPELINE = pipeline("text-generation", model=model, tokenizer=TOKENIZER)
            logger.info("Loaded causal model: %s", LOADED_MODEL_NAME)
    except Exception as load_error:
        logger.error("Failed to load LOCAL_MODEL '%s': %s", LOCAL_MODEL, load_error)
        traceback.print_exc()
        # fallback
        try:
            fallback = "google/flan-t5-small"
            TOKENIZER = AutoTokenizer.from_pretrained(fallback, use_fast=True)
            model = AutoModelForSeq2SeqLM.from_pretrained(fallback)
            LLM_PIPELINE = pipeline("text2text-generation", model=model, tokenizer=TOKENIZER)
            LOADED_MODEL_NAME = fallback
            logger.info("Loaded fallback model: %s", fallback)
        except Exception as fallback_error:
            logger.error("Fallback model load failed: %s", fallback_error)
            traceback.print_exc()
            LLM_PIPELINE = None
            LOADED_MODEL_NAME = None
# Load the model once at import time so LLM_PIPELINE is set before any chat request.
init_local_model()
def rule_based_response(message: str) -> str:
    """Return a canned usage hint for ``message`` (used when no LLM is loaded).

    Matching is done on the stripped, lower-cased message; ``None`` is treated
    as an empty message.
    """
    normalized = (message or "").strip().lower()
    if normalized.startswith(("create record", "create contact")):
        return "To create a record, use: create_record MODULE_NAME {\"Field\":\"value\"}"
    if normalized.startswith("create_invoice"):
        return "To create invoice: create_invoice {\"customer_id\":\"...\",\"line_items\":[...]} (JSON)"
    asks_for_help = normalized.startswith("help") or "what can you do" in normalized
    if asks_for_help:
        return "I can run create_record/update_record/delete_record or process local files by pasting their /mnt/data/ path."
    return "(fallback) No local LLM loaded. Use explicit commands like create_record or paste /mnt/data/ path."
def local_llm_generate(prompt: str, max_tokens: int = 256) -> Dict[str, Any]:
    """Generate a completion for ``prompt`` with the local pipeline.

    Falls back to ``rule_based_response`` when no pipeline is loaded or when
    generation raises.

    Args:
        prompt: Full prompt text passed to the pipeline.
        max_tokens: Forwarded to the pipeline as ``max_new_tokens``.

    Returns:
        A dict with ``text`` (the completion), ``confidence`` (always None
        here) and ``raw`` (the pipeline output, the error text on failure, or
        None when no pipeline is loaded).
    """
    if LLM_PIPELINE is None:
        return {"text": rule_based_response(prompt), "confidence": None, "raw": None}

    try:
        out = LLM_PIPELINE(prompt, max_new_tokens=max_tokens)
    except Exception as e:
        logger.error("LLM error: %s", e)
        traceback.print_exc()
        return {"text": rule_based_response(prompt), "confidence": None, "raw": str(e)}

    if isinstance(out, list) and len(out) > 0:
        first = out[0]
        if isinstance(first, dict):
            text = first.get("generated_text") or first.get("text") or str(first)
        else:
            text = str(first)
    else:
        text = str(out)
    if not isinstance(text, str):
        text = str(text)

    # Causal text-generation pipelines return prompt + continuation by
    # default. Drop the echoed prompt so callers see only the new text;
    # otherwise a JSON tool instruction never appears at the start of the
    # output. Seq2seq output does not start with the prompt and is unaffected.
    if prompt and text.startswith(prompt):
        text = text[len(prompt):]

    # Analytics are bookkeeping only: never lose a generation because of them.
    try:
        _log_llm_call(None)
    except Exception as e:
        logger.warning("Could not record LLM call in analytics: %s", e)

    return {"text": text, "confidence": None, "raw": out}
# ----------------------------
# Zoho OAuth helper & MCP tools
# ----------------------------
def _get_valid_token_headers() -> dict:
    """Exchange the configured refresh token for an access token.

    Returns:
        Headers dict carrying ``Authorization: Zoho-oauthtoken <token>``.

    Raises:
        RuntimeError: When the token endpoint answers with a non-200 status,
            or with a 200 whose body has no ``access_token`` (for example an
            error object). Without that check the header would be built from
            ``None`` and every later API call would fail with a confusing
            authorization error.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token"
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    try:
        body = r.json()
    except ValueError:
        body = None
    t = body.get("access_token") if isinstance(body, dict) else None
    if not t:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    return {"Authorization": f"Zoho-oauthtoken {t}"}
@mcp.tool()
def authenticate_zoho() -> str:
    """Check that a Zoho access token can be obtained and report the outcome as text."""
    try:
        _get_valid_token_headers()
        _log_tool_call("authenticate_zoho", True)
        return "Zoho token refreshed (ok)."
    except Exception as auth_error:
        _log_tool_call("authenticate_zoho", False)
        return f"Failed to authenticate: {auth_error}"
@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """POST one record to ``{API_BASE}/{module_name}``.

    The record is wrapped as ``{"data": [record_data]}``. Returns the response
    JSON as a string on HTTP 200/201, otherwise an error string; exceptions
    are reported as ``"Exception: ..."`` rather than raised.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{API_BASE}/{module_name}",
            headers=auth_headers,
            json={"data": [record_data]},
            timeout=20,
        )
        if response.status_code not in (200, 201):
            _log_tool_call("create_record", False)
            return f"Error creating record: {response.status_code} {response.text}"
        _log_tool_call("create_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call("create_record", False)
        return f"Exception: {exc}"
@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """GET one page of records from ``{API_BASE}/{module_name}``.

    Returns the ``data`` list of the response on HTTP 200 (empty list when the
    key is absent); otherwise a one-element list holding an error string.
    """
    try:
        auth_headers = _get_valid_token_headers()
        paging = {"page": page, "per_page": per_page}
        response = requests.get(
            f"{API_BASE}/{module_name}",
            headers=auth_headers,
            params=paging,
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call("get_records", False)
            return [f"Error retrieving {module_name}: {response.status_code} {response.text}"]
        _log_tool_call("get_records", True)
        return response.json().get("data", [])
    except Exception as exc:
        _log_tool_call("get_records", False)
        return [f"Exception: {exc}"]
@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """PUT ``{"data": [data]}`` to ``{API_BASE}/{module_name}/{record_id}``.

    Returns the response JSON as a string on HTTP 200, otherwise an error
    string; exceptions are reported as ``"Exception: ..."``.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.put(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            json={"data": [data]},
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call("update_record", False)
            return f"Error updating: {response.status_code} {response.text}"
        _log_tool_call("update_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call("update_record", False)
        return f"Exception: {exc}"
@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """DELETE ``{API_BASE}/{module_name}/{record_id}``.

    Returns the response JSON as a string on HTTP 200, otherwise an error
    string; exceptions are reported as ``"Exception: ..."``.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.delete(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call("delete_record", False)
            return f"Error deleting: {response.status_code} {response.text}"
        _log_tool_call("delete_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call("delete_record", False)
        return f"Exception: {exc}"
# ----------------------------
# Zoho Invoice v3 functions (create invoice + upload attachment)
# ----------------------------
def _ensure_invoice_config():
    """Raise RuntimeError unless both INVOICE_API_BASE and ORGANIZATION_ID are set."""
    if INVOICE_API_BASE and ORGANIZATION_ID:
        return
    raise RuntimeError("INVOICE_API_BASE and ORGANIZATION_ID must be set in config.py")
@mcp.tool()
def create_invoice(data: dict) -> str:
    """
    POST ``data`` as a new invoice to ``{INVOICE_API_BASE}/invoices``.

    ``data`` is sent unchanged as the JSON body and is expected to follow the
    Zoho Invoice payload conventions (customer_id, line_items, etc.). The
    organization id is passed as a query parameter. Returns the response JSON
    as a string on HTTP 200/201, otherwise an error string.
    """
    try:
        _ensure_invoice_config()
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{INVOICE_API_BASE}/invoices",
            headers=auth_headers,
            params={"organization_id": ORGANIZATION_ID},
            json=data,
            timeout=30,
        )
        if response.status_code not in (200, 201):
            _log_tool_call("create_invoice", False)
            return f"Error creating invoice: {response.status_code} {response.text}"
        _log_tool_call("create_invoice", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call("create_invoice", False)
        return f"Exception: {exc}"
def upload_invoice_attachment(invoice_id: str, file_path: str) -> str:
    """
    Send a local file as a multipart ``attachment`` to
    ``{INVOICE_API_BASE}/invoices/{invoice_id}/attachments``.

    Returns the response JSON as a string on HTTP 200/201; otherwise a text
    message describing the missing config, missing file, HTTP error or
    exception.
    """
    try:
        _ensure_invoice_config()
    except Exception as config_error:
        return f"Missing config: {config_error}"

    if not os.path.exists(file_path):
        return f"Attachment file not found: {file_path}"

    try:
        auth_headers = _get_valid_token_headers()
        auth_headers.pop("Content-Type", None)  # requests will set for multipart
        endpoint = f"{INVOICE_API_BASE}/invoices/{invoice_id}/attachments"
        query = {"organization_id": ORGANIZATION_ID}
        with open(file_path, "rb") as attachment:
            multipart = {"attachment": (os.path.basename(file_path), attachment)}
            response = requests.post(
                endpoint,
                headers=auth_headers,
                params=query,
                files=multipart,
                timeout=60,
            )
        if response.status_code not in (200, 201):
            _log_tool_call("upload_invoice_attachment", False)
            return f"Error uploading attachment: {response.status_code} {response.text}"
        _log_tool_call("upload_invoice_attachment", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as exc:
        _log_tool_call("upload_invoice_attachment", False)
        return f"Exception uploading attachment: {exc}"
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """
    Placeholder for OCR + parsing: no file content is actually read.

    When ``file_path`` exists, returns a success dict holding the file name,
    a ``file://`` URL, ``target_module`` and fixed, simulated
    ``extracted_data`` (including a ``Confidence`` value). Otherwise returns
    an error dict.
    """
    try:
        if not os.path.exists(file_path):
            _log_tool_call("process_document", False)
            return {"status": "error", "error": "file not found", "file_path": file_path}

        file_url = f"file://{file_path}"
        # Hard-coded sample values standing in for a real extraction result.
        simulated_fields = {
            "Name": "ACME Corp (simulated)",
            "Email": "contact@acme.example",
            "Phone": "+91-99999-00000",
            "Total": "1234.00",
            "Confidence": 0.88
        }
        _log_tool_call("process_document", True)
        return {
            "status": "success",
            "file": os.path.basename(file_path),
            "file_url": file_url,
            "target_module": target_module,
            "extracted_data": simulated_fields,
        }
    except Exception as exc:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(exc)}
# ----------------------------
# Helpers: map LLM args -> Zoho payloads and extract ids
# ----------------------------
def _extract_created_id_from_zoho_response(resp_json) -> Optional[str]:
    """Pull the id of a created record out of a Zoho response.

    Accepts the parsed response or its JSON text. Looks at the first entry of
    ``data`` (or ``result``): its ``details`` dict, or the entry itself when
    there are no details. ``id``/``ID``/``Id`` win; otherwise the first value
    stored under a key ending in "id" is used. Falls back to a top-level
    ``id``.

    Entries whose ``status`` is "error" are never mined for an id, and
    booleans or arbitrary long values (field names, codes, timestamps) are no
    longer mistaken for one.

    Returns:
        The id as a string, or None when no id can be found.
    """
    if not resp_json:
        return None
    if isinstance(resp_json, str):
        try:
            resp_json = json.loads(resp_json)
        except ValueError:
            return None
    if not isinstance(resp_json, dict):
        return None

    data = resp_json.get("data") or resp_json.get("result") or None
    if isinstance(data, list) and data and isinstance(data[0], dict):
        record = data[0]
        if str(record.get("status", "")).lower() != "error":
            details = record.get("details") or record
            if isinstance(details, dict):
                for key in ("id", "ID", "Id"):
                    if key in details:
                        return str(details[key])
                # Heuristic fallback, restricted to id-like keys such as
                # "contact_id" so unrelated values are not picked up.
                for key, value in details.items():
                    if not str(key).lower().endswith("id"):
                        continue
                    if isinstance(value, bool):
                        continue
                    if isinstance(value, (str, int)) and len(str(value)) >= 4:
                        return str(value)

    if "id" in resp_json:
        return str(resp_json["id"])
    return None
def _map_contact_args_to_zoho_payload(args: dict) -> dict:
    """Translate loosely named contact arguments into a Zoho record payload.

    Known aliases are mapped to Zoho field names (``contact`` -> Last_Name,
    ``first_name`` -> First_Name, ``email`` -> Email, ``mobile`` -> Phone,
    ``currency`` -> Currency). Aliases are matched case-insensitively, so
    ``"Email"`` is mapped instead of being silently dropped. Remaining scalar
    values (str/int/float) are copied through under their own key.

    Invoice keys and call-routing / file keys (module_name, file_path,
    file_b64, ...) are never copied into the record.

    Returns:
        The payload dict; empty when ``args`` is not a dict.
    """
    if not isinstance(args, dict):
        return {}

    alias_to_field = (
        ("contact", "Last_Name"),
        ("first_name", "First_Name"),
        ("email", "Email"),
        ("mobile", "Phone"),
        ("currency", "Currency"),
    )
    excluded = {
        "contact", "email", "mobile", "first_name", "currency",
        "invoice", "items", "line_items",
        # routing / attachment keys describe the call, not the contact
        "module_name", "module", "file_path", "path", "file", "file_url", "file_b64",
    }

    # Case-insensitive view of the arguments; an exact lower-case key wins
    # over a differently-cased duplicate.
    by_lower = {}
    for key, value in args.items():
        lowered = str(key).lower()
        if lowered not in by_lower or key == lowered:
            by_lower[lowered] = value

    contact_payload = {}
    for alias, zoho_field in alias_to_field:
        if alias in by_lower:
            contact_payload[zoho_field] = by_lower[alias]

    for key, value in args.items():
        if str(key).lower() in excluded:
            continue
        if isinstance(value, (str, int, float)) and key not in contact_payload:
            contact_payload[key] = value
    return contact_payload
def _invoice_item_amount(item: dict) -> float:
    """Return the first parseable amount of ``item`` ("$" and "," are ignored), else 0.0."""
    # Explicit money fields come before the ambiguous "item" key, matching the
    # order used by the tool-output executor in this file.
    for key in ("amount", "price", "rate", "item"):
        if key in item:
            try:
                return float(str(item[key]).replace("$", "").replace(",", "").strip())
            except (TypeError, ValueError):
                continue
    return 0.0


def _invoice_item_quantity(item: dict) -> int:
    """Return the item's quantity as an int, defaulting to 1 when missing or unparseable."""
    raw = item.get("quantity") or item.get("qty") or 1
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return 1


def _build_invoice_payload_for_zoho(contact_id: str, invoice_items: List[dict], currency: Optional[str] = None, vat_pct: float = 0.0) -> dict:
    """Build the invoice payload passed to ``create_invoice``.

    Args:
        contact_id: Stored as ``customer_id``.
        invoice_items: Loosely shaped item dicts. The amount is read from
            amount/price/rate/item, the name from description/name/service,
            the quantity from quantity/qty. ``None`` is treated as no items
            and entries that are not dicts are skipped.
        currency: Stored as ``currency_code`` when truthy.
        vat_pct: VAT percentage as a number or a string such as "16%"; a
            ``tax`` entry is added only when it is greater than zero.

    Returns:
        Dict with ``customer_id``, ``line_items`` and the optional
        ``currency_code`` / ``tax`` keys.
    """
    line_items = []
    for item in invoice_items or []:
        if not isinstance(item, dict):
            continue
        name = item.get("description") or item.get("name") or item.get("service") or "Item"
        line_items.append({
            "name": name,
            "rate": _invoice_item_amount(item),
            "quantity": _invoice_item_quantity(item),
        })

    try:
        vat_val = float(vat_pct)
    except (TypeError, ValueError):
        try:
            vat_val = float(str(vat_pct).replace("%", "").strip())
        except (TypeError, ValueError):
            vat_val = 0.0

    invoice_payload = {"customer_id": contact_id, "line_items": line_items}
    if currency:
        invoice_payload["currency_code"] = currency
    if vat_val > 0:
        invoice_payload["tax"] = {"name": "VAT", "percentage": vat_val}
    return invoice_payload
# ----------------------------
# Parse & execute model tool JSON outputs (core executor)
# ----------------------------
# Functions the model may reach through the generic dispatch path. Model
# output is untrusted input, so dispatch is restricted to this allowlist
# instead of "any callable found in globals()", which also exposed internals
# such as the OAuth token helper.
_MODEL_CALLABLE_TOOLS = (
    "authenticate_zoho",
    "get_records",
    "update_record",
    "delete_record",
    "process_document",
    "upload_invoice_attachment",
)

# Argument keys that route the call or describe an attached file; they must
# not end up as fields of a created record.
_NON_RECORD_ARG_KEYS = ("module_name", "module", "file_path", "path", "file", "file_url", "file_b64")


def _select_supported_kwargs(func, kwargs: dict) -> dict:
    """Keep only the keyword arguments ``func`` can accept (all of them if it takes **kwargs)."""
    try:
        parameters = inspect.signature(func).parameters
    except (TypeError, ValueError):
        return kwargs
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values()):
        return kwargs
    return {key: value for key, value in kwargs.items() if key in parameters}


def _extract_invoice_id_from_response(response_text) -> Optional[str]:
    """Return the invoice id found in a create_invoice response string, or None."""
    if not (isinstance(response_text, str) and response_text.strip().startswith("{")):
        return None
    try:
        parsed = json.loads(response_text)
    except ValueError:
        return None
    if not isinstance(parsed, dict):
        return None
    invoice = parsed.get("invoice")
    if isinstance(invoice, dict) and invoice.get("invoice_id"):
        return str(invoice["invoice_id"])
    for key in ("invoice_id", "id"):
        if key in parsed:
            return str(parsed[key])
    data = parsed.get("data")
    if isinstance(data, list) and data and isinstance(data[0], dict):
        for key in ("invoice_id", "id"):
            if key in data[0]:
                return str(data[0][key])
    return None


def parse_and_execute_model_tool_output(model_text: str, history: Optional[List[Tuple[str,str]]] = None) -> str:
    """Parse JSON tool instructions produced by the model and run them in order.

    ``model_text`` must be a JSON object ``{"tool": ..., "args": {...}}`` or a
    list of such objects. ``create_record`` and the create-invoice aliases get
    dedicated handling; a contact id obtained from ``create_record`` is reused
    as the customer of later invoices. An attachment is uploaded after an
    invoice is created when the args name a local file. Any other tool is
    executed only if it is listed in ``_MODEL_CALLABLE_TOOLS``.

    Args:
        model_text: Raw model output expected to be JSON.
        history: Accepted for interface compatibility; not used.

    Returns:
        A human-readable report with one line per executed step, or a
        "(Parse)" / "(Execute)" message when nothing could be run.
    """
    try:
        payload = json.loads(model_text)
    except Exception:
        return "(Parse) Model output was not valid JSON tool instruction."

    if isinstance(payload, dict) and "tool" in payload:
        instructions = [payload]
    elif isinstance(payload, list):
        instructions = [p for p in payload if isinstance(p, dict) and "tool" in p]
    else:
        return "(Parse) No tool instructions found."

    results = []
    contact_id = None
    for instr in instructions:
        tool = instr.get("tool")
        args = instr.get("args", {}) or {}

        if tool == "create_record":
            if not isinstance(args, dict):
                results.append("create_record failed: args must be a JSON object")
                continue
            try:
                # Strip routing/file keys so they are not sent as record fields.
                record_args = {k: v for k, v in args.items() if k not in _NON_RECORD_ARG_KEYS}
                contact_payload = _map_contact_args_to_zoho_payload(record_args)
                module = args.get("module_name") or args.get("module") or "Contacts"
                res = create_record(module, contact_payload)
                results.append(f"create_record -> {res}")
                # The extractor accepts the JSON text directly and yields
                # None for error strings.
                cid = _extract_created_id_from_zoho_response(res)
                if cid:
                    contact_id = cid
            except Exception as e:
                results.append(f"create_record failed: {e}")
            continue

        if tool in ("create_invoice", "createInvoice", "createInvoiceRecord"):
            if not isinstance(args, dict):
                results.append("create_invoice failed: args must be a JSON object")
                continue
            args = _normalize_local_path_args(args)
            invoice_items = args.get("invoice") or args.get("items") or args.get("line_items") or []
            if isinstance(invoice_items, dict):
                invoice_items = [invoice_items]
            elif not isinstance(invoice_items, (list, tuple)):
                invoice_items = []
            currency = args.get("currency") or args.get("currency_code") or args.get("currency_symbol") or None
            vat = args.get("vat") or args.get("tax") or args.get("vat_pct") or 0
            if not contact_id:
                contact_id = args.get("contact_id") or args.get("customer_id") or None
            if not contact_id:
                results.append("create_invoice skipped: no contact/customer id available. Create contact first.")
                continue
            try:
                # Shared builder instead of a second, slightly different copy
                # of the line-item parsing.
                invoice_payload = _build_invoice_payload_for_zoho(contact_id, invoice_items, currency=currency, vat_pct=vat)
                for optional_key in ("notes", "reference_number", "terms"):
                    if args.get(optional_key):
                        invoice_payload[optional_key] = args.get(optional_key)
                inv_res_text = create_invoice(invoice_payload)
                results.append(f"create_invoice -> {inv_res_text}")
            except Exception as e:
                results.append(f"create_invoice failed: {e}")
                continue

            inv_id = _extract_invoice_id_from_response(inv_res_text)

            # attempt attachment upload if provided
            attachment_path = None
            if args.get("file_path"):
                attachment_path = args.get("file_path")
            else:
                fu = args.get("file_url")
                if isinstance(fu, str) and fu.startswith("file://"):
                    attachment_path = fu[len("file://"):]
            if inv_id and attachment_path:
                try:
                    attach_res = upload_invoice_attachment(inv_id, attachment_path)
                    results.append(f"upload_attachment -> {attach_res}")
                except Exception as e:
                    results.append(f"upload_attachment failed: {e}")
            continue

        # Generic dispatch, restricted to the allowlist.
        handler = globals().get(tool) if tool in _MODEL_CALLABLE_TOOLS else None
        if not callable(handler):
            results.append(f"Tool '{tool}' not found.")
            continue
        try:
            call_args = _normalize_local_path_args(args)
            if isinstance(call_args, dict):
                # Normalization may add file_url / file_b64; pass only what
                # the target function accepts so the call does not fail with
                # an unexpected-keyword TypeError.
                out = handler(**_select_supported_kwargs(handler, call_args))
            elif isinstance(call_args, (list, tuple)):
                out = handler(*call_args)
            else:
                out = handler(call_args)
            results.append(f"{tool} -> {out}")
        except Exception as e:
            results.append(f"{tool} failed: {e}")

    if not results:
        return "(Execute) No executable results produced."
    reply = "Execution results:\n" + "\n".join(results)
    if contact_id:
        reply += f"\nPrimary contact id: {contact_id}"
    return reply
# ----------------------------
# Simple explicit command parser for debugging
# ----------------------------
def try_parse_and_invoke_command(text: str):
    """Run an explicit debug command typed into the chat, if ``text`` is one.

    Recognized forms:
      * ``create_record MODULE {json}`` -> ``create_record``
      * ``create_invoice {json}``       -> ``create_invoice``
      * a bare ``/mnt/data/...`` path   -> ``process_document``

    The JSON body may span several lines (pretty-printed JSON pasted into the
    chat) and must decode to an object.

    Returns:
        The tool result, an "Invalid JSON ..." message, or None when ``text``
        is not a recognized command.
    """
    text = (text or "").strip()

    # re.S lets "." cross newlines so multi-line JSON bodies still match.
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I | re.S)
    if m:
        module = m.group(1)
        body = m.group(2)
        try:
            record_data = json.loads(body)
        except Exception:
            return "Invalid JSON for record_data"
        if not isinstance(record_data, dict):
            return "Invalid JSON for record_data"
        return create_record(module, record_data)

    m = re.match(r"^create_invoice\s+(.+)$", text, re.I | re.S)
    if m:
        body = m.group(1)
        try:
            invoice_data = json.loads(body)
        except Exception:
            return "Invalid JSON for invoice_data"
        if not isinstance(invoice_data, dict):
            return "Invalid JSON for invoice_data"
        return create_invoice(invoice_data)

    m = re.match(r"^(\/mnt\/data\/\S+)$", text)
    if m:
        path = m.group(1)
        return process_document(path)
    return None
# ----------------------------
# Chat flow: call LLM, parse JSON responses and execute
# ----------------------------
def deepseek_response(message: str, history: Optional[List[Tuple[str,str]]] = None) -> str:
    """Answer ``message`` with the local LLM, executing tool JSON when produced.

    The prompt is the system prompt, the prior (user, assistant) turns and the
    new message. Output that starts with "{" or "[" after stripping is treated
    as a tool instruction and executed; anything else is returned unchanged.
    """
    history = history or []
    system_prompt = "You are Zoho Assistant. Prefer concise answers. If you want to call a tool, return a JSON object: {\"tool\": \"create_record\", \"args\": {...}}"

    turns = []
    for pair in history:
        try:
            user_turn, assistant_turn = pair[0], pair[1]
            turns.append(f"User: {user_turn}\nAssistant: {assistant_turn}\n")
        except Exception:
            # Entries that are not indexable pairs are skipped.
            continue
    history_text = "".join(turns)

    prompt = f"{system_prompt}\n{history_text}\nUser: {message}\nAssistant:"
    generation = local_llm_generate(prompt, max_tokens=256)
    text = generation.get("text", "")
    payload = text.strip()

    # Plain answers go straight back to the caller.
    if not payload.startswith(("{", "[")):
        return text

    # If model produced JSON instruction, execute
    try:
        return parse_and_execute_model_tool_output(payload, history)
    except Exception as exec_error:
        logger.error("Execution error: %s", exec_error)
        traceback.print_exc()
        return f"(Execute) Error: {exec_error}"
# ----------------------------
# Gradio chat handler
# ----------------------------
def chat_handler(message, history):
    """Gradio chat callback.

    Tries an explicit debug command first, then handles a message starting
    with /mnt/data/ as a document to process, and finally asks the LLM. If
    the LLM path raises, a rule-based reply is returned instead.
    """
    history = history or []
    trimmed = (message or "").strip()

    # explicit debug commands
    command_result = try_parse_and_invoke_command(trimmed)
    if command_result is not None:
        return command_result

    # dev path handling
    if trimmed.startswith("/mnt/data/"):
        try:
            document = process_document(trimmed)
            extracted_json = json.dumps(document.get('extracted_data'), ensure_ascii=False)
            return f"Processed file {document.get('file')}. Extracted: {extracted_json}"
        except Exception as doc_error:
            return f"Error processing document: {doc_error}"

    try:
        return deepseek_response(trimmed, history)
    except Exception as llm_error:
        logger.error("deepseek_response error: %s", llm_error)
        traceback.print_exc()
        return rule_based_response(trimmed)
# ----------------------------
# Gradio UI builder
# ----------------------------
def chat_interface():
    """Build the Gradio ChatInterface that routes messages to chat_handler."""
    prompt_box = gr.Textbox(
        placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."
    )
    return gr.ChatInterface(fn=chat_handler, textbox=prompt_box)
# ----------------------------
# Demo runner: directly perform create_contact -> create_invoice sequence
# ----------------------------
def perform_demo_actions(contact_payload: dict, invoice_items: List[dict], vat_pct: float = 0.0):
    """Create a contact and then an invoice for it, bypassing the LLM.

    Calls ``create_record("Contacts", contact_payload)`` and, when a contact
    id can be extracted from the response, ``create_invoice`` with a payload
    built from ``invoice_items``.

    Args:
        contact_payload: Sent unchanged as the record data; its ``Currency``
            value, if any, becomes the invoice currency.
        invoice_items: Items for the invoice payload builder.
        vat_pct: VAT percentage for the invoice payload builder.

    Returns:
        Dict with ``create_record``, ``contact_id``, ``create_invoice`` and
        ``invoice_id``; steps that did not run or produced no id stay None.
        Errors are logged and the partial results are returned.
    """
    results = {"create_record": None, "contact_id": None, "create_invoice": None, "invoice_id": None}
    try:
        # create contact
        res_text = create_record("Contacts", contact_payload)
        results["create_record"] = res_text
        cid = _extract_created_id_from_zoho_response(res_text)
        results["contact_id"] = cid
        if not cid:
            logger.warning("No contact id returned from create_record. Aborting invoice creation.")
            return results

        invoice_payload = _build_invoice_payload_for_zoho(cid, invoice_items, currency=contact_payload.get("Currency"), vat_pct=vat_pct)
        inv_res = create_invoice(invoice_payload)
        results["create_invoice"] = inv_res

        # attempt to extract invoice id
        inv_id = None
        try:
            parsed = json.loads(inv_res) if isinstance(inv_res, str) and inv_res.strip().startswith("{") else None
            if isinstance(parsed, dict):
                # Check the nested {"invoice": {"invoice_id": ...}} shape
                # first, as the tool-output executor in this file does; the
                # flat keys are kept as fallbacks.
                invoice = parsed.get("invoice")
                if isinstance(invoice, dict) and invoice.get("invoice_id"):
                    inv_id = str(invoice["invoice_id"])
                else:
                    for k in ("invoice_id", "id"):
                        if k in parsed:
                            inv_id = str(parsed[k])
                            break
                data = parsed.get("data")
                if not inv_id and isinstance(data, list) and len(data) > 0:
                    d0 = data[0]
                    if isinstance(d0, dict):
                        for key in ("invoice_id", "id"):
                            if key in d0:
                                inv_id = str(d0[key])
                                break
        except Exception:
            inv_id = None
        results["invoice_id"] = inv_id
        return results
    except Exception as e:
        logger.error("perform_demo_actions error: %s", e)
        traceback.print_exc()
        return results
# ----------------------------
# Entrypoint (includes a one-shot demo process_document call using your uploaded path)
# ----------------------------
if __name__ == "__main__":
    # Script entrypoint: runs two one-shot demos, then serves the Gradio chat UI.
    # NOTE(review): only the Gradio app is launched here; no call that starts
    # the FastMCP server itself is visible in this file — confirm that is intended.
    logger.info("Starting MCP server. Loaded model: %s", LOADED_MODEL_NAME)
    # Developer demo: process the uploaded file path you provided earlier
    demo_file_path = "/mnt/data/script_zoho_mcp"
    try:
        demo_result = process_document(demo_file_path)
        logger.info("Demo process_document result for %s: %s", demo_file_path, json.dumps(demo_result))
        print("Demo process_document result:", json.dumps(demo_result))
    except Exception as e:
        logger.warning("Demo process_document failed: %s", e)
    # --- NEW: run a direct contact+invoice demo using the user's supplied example ---
    # This will actually call Zoho APIs (create contact -> create invoice) using config.py credentials.
    # NOTE(review): this runs on every start of the script, so each restart
    # attempts to create another contact and invoice in the configured Zoho
    # organization — consider making it opt-in.
    # NOTE(review): the keys below are passed to create_record unchanged
    # ("contact" / "mobile" are not mapped to Zoho field names on this path),
    # and "Currency" is a symbol rather than a currency code — verify against
    # what the Zoho APIs accept.
    demo_contact = {
        "contact": "VachMe",
        "Email": "vachaspathi1234@gmail.com",
        "mobile": "9876543210",
        "Currency": "$"
    }
    demo_items = [
        {"name": "Car Wash - Wax Coating", "amount": 150.0, "quantity": 1},
        {"name": "Car Wash - Washing", "amount": 50.0, "quantity": 1}
    ]
    try:
        demo_flow_res = perform_demo_actions(demo_contact, demo_items, vat_pct=16.0)
        logger.info("Demo create contact+invoice result: %s", json.dumps(demo_flow_res))
        print("Demo create contact+invoice result:", json.dumps(demo_flow_res))
    except Exception as e:
        logger.warning("Demo contact+invoice failed: %s", e)
    # Serve the chat UI on all interfaces, port 7860.
    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)