# app.py — MCP server (single-file)
# - FastMCP-based Zoho MCP POC with local HF model (DeepSeek / fallback) and Zoho Invoice v3 support.
# - Place next to config.py which must define:
#     CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE,
#     INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL (or LOCAL_MODEL=None)
#   config.py may optionally define LOCAL_TOKENIZER (tokenizer repo id override).
# - Developer note: this file will demo process_document on startup using the uploaded file
#   path: /mnt/data/script_zoho_mcp (the file you uploaded). That path will be converted
#   to a file:// URL automatically when tools receive it.
#
# IMPORTANT: Do NOT commit API keys to repos. Keep them in config.py locally.

from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import time
import traceback
import inspect
import re
import logging
import base64

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# Attempt to import transformers (optional). Any failure (missing package,
# broken native dependency) only disables the local LLM; the server still runs.
TRANSFORMERS_AVAILABLE = False
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM
    TRANSFORMERS_AVAILABLE = True
except Exception as e:
    logger.warning("transformers not available or failed to import: %s", e)
    TRANSFORMERS_AVAILABLE = False

# ----------------------------
# Load config (must exist)
# ----------------------------
try:
    import config as _config
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,          # Zoho CRM base if used
        INVOICE_API_BASE,  # Zoho Invoice API base (e.g., https://invoice.zoho.in/api/v3)
        ORGANIZATION_ID,   # Zoho Invoice organization id
        LOCAL_MODEL,       # local HF model (e.g., "google/flan-t5-small") or None
    )
except Exception as e:
    # Log the underlying cause (missing file, missing name, syntax error in
    # config.py) before exiting with the original user-facing message.
    logger.error("Could not load config.py: %s", e)
    raise SystemExit("Make sure config.py exists and contains CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL (or set LOCAL_MODEL=None).") from e

# Optional setting: read it from the config module itself. Looking it up in this
# module's globals() could never see a value defined in config.py, because the
# name is not part of the import list above.
LOCAL_TOKENIZER = getattr(_config, "LOCAL_TOKENIZER", None)
# ----------------------------
# Initialize FastMCP
# ----------------------------
mcp = FastMCP("ZohoCRMAgent")

# ----------------------------
# Analytics / KPI logging (simple JSON file)
# ----------------------------
ANALYTICS_PATH = "mcp_analytics.json"


def _default_analytics() -> dict:
    """Return a fresh, empty analytics document."""
    return {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None}


def _load_analytics() -> dict:
    """Read the analytics file, falling back to an empty document.

    A missing, unreadable or malformed file never raises: analytics are
    best-effort and must not break the tool that is being measured.
    """
    try:
        with open(ANALYTICS_PATH, "r") as f:
            data = json.load(f)
    except Exception:
        return _default_analytics()
    if not isinstance(data, dict):
        return _default_analytics()
    if not isinstance(data.get("tool_calls"), dict):
        data["tool_calls"] = {}
    return data


def _save_analytics(data: dict) -> None:
    """Write the analytics document; log (do not raise) on failure."""
    try:
        with open(ANALYTICS_PATH, "w") as f:
            json.dump(data, f, indent=2)
    except Exception as e:
        logger.warning("Could not write analytics file %s: %s", ANALYTICS_PATH, e)


def _init_analytics():
    """Create the analytics file with zeroed counters if it does not exist yet."""
    if not os.path.exists(ANALYTICS_PATH):
        base = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None, "created_at": time.time()}
        _save_analytics(base)


def _log_tool_call(tool_name: str, success: bool = True):
    """Increment the call / success / fail counters for ``tool_name``.

    Never raises: this is called from inside the tools' own ``except`` handlers,
    so an analytics failure must not replace the tool's real result.
    """
    try:
        data = _load_analytics()
        stats = data["tool_calls"].get(tool_name)
        if not isinstance(stats, dict):
            stats = {"count": 0, "success": 0, "fail": 0}
            data["tool_calls"][tool_name] = stats
        stats["count"] = stats.get("count", 0) + 1
        outcome = "success" if success else "fail"
        stats[outcome] = stats.get(outcome, 0) + 1
        _save_analytics(data)
    except Exception as e:
        logger.warning("Analytics update failed for tool %s: %s", tool_name, e)


def _log_llm_call(confidence: Optional[float] = None):
    """Increment the LLM call counter and optionally record the last confidence.

    Never raises, for the same reason as ``_log_tool_call``.
    """
    try:
        data = _load_analytics()
        data["llm_calls"] = data.get("llm_calls", 0) + 1
        if confidence is not None:
            data["last_llm_confidence"] = confidence
        _save_analytics(data)
    except Exception as e:
        logger.warning("Analytics update failed for LLM call: %s", e)


_init_analytics()


# ----------------------------
# Helper: normalize local file_path args
# ----------------------------
def _normalize_local_path_args(args: Any) -> Any:
    """Enrich tool args that reference an existing local ``/mnt/data/`` file.

    Non-dict input is returned untouched. For a dict, the first truthy value of
    ``file_path`` / ``path`` / ``file`` is inspected; when it is a string under
    ``/mnt/data/`` that exists on disk, ``file_url`` (``file://`` form) and
    ``file_b64`` (base64 of the file contents) are added. The dict is modified
    in place and also returned.
    """
    if not isinstance(args, dict):
        return args
    fp = args.get("file_path") or args.get("path") or args.get("file")
    if isinstance(fp, str) and fp.startswith("/mnt/data/") and os.path.exists(fp):
        try:
            args["file_url"] = f"file://{fp}"
            try:
                with open(fp, "rb") as f:
                    args["file_b64"] = base64.b64encode(f.read()).decode("utf-8")
            except Exception as e:
                logger.warning("Could not read file for base64: %s", e)
                # Do not leave a stale/partial payload behind.
                args.pop("file_b64", None)
        except Exception as e:
            logger.warning("Normalization error for file_path %s: %s", fp, e)
    return args


# ----------------------------
# Local LLM loader (DeepSeek style or fallback)
# ----------------------------
LLM_PIPELINE = None
TOKENIZER = None
LOADED_MODEL_NAME = None


def _parse_model_and_revision(model_string: str) -> Tuple[str, Optional[str]]:
    """Split ``"repo_id:revision"`` into its parts; revision is None when absent."""
    if ":" in model_string:
        repo_id, revision = model_string.split(":", 1)
        return repo_id.strip(), revision.strip()
    return model_string, None


def init_local_model():
    """Load ``LOCAL_MODEL`` into the module-level ``LLM_PIPELINE``.

    Sets ``LLM_PIPELINE``, ``TOKENIZER`` and ``LOADED_MODEL_NAME``. Does nothing
    (pipeline stays None) when ``LOCAL_MODEL`` is unset or transformers is not
    importable. Repo ids containing "flan", "t5" or "seq2seq" are loaded as
    seq2seq models, everything else as causal LMs. If loading fails, the small
    ``google/flan-t5-small`` model is tried as a fallback; if that also fails
    the pipeline is left as None so callers use the rule-based responder.
    """
    global LLM_PIPELINE, TOKENIZER, LOADED_MODEL_NAME
    if not LOCAL_MODEL:
        logger.info("LOCAL_MODEL not set — skipping local LLM load.")
        LLM_PIPELINE = None
        return
    if not TRANSFORMERS_AVAILABLE:
        logger.warning("transformers not available — skipping model load.")
        LLM_PIPELINE = None
        return
    try:
        repo_id, revision = _parse_model_and_revision(LOCAL_MODEL)
        tokenizer_name = LOCAL_TOKENIZER or repo_id
        LOADED_MODEL_NAME = f"{repo_id}" + (f" (rev={revision})" if revision else "")
        # Only pass `revision` when one was given, so the library default applies otherwise.
        rev_kwargs = {"revision": revision} if revision else {}
        seq2seq_keywords = ["flan", "t5", "seq2seq"]
        is_seq2seq = any(k in repo_id.lower() for k in seq2seq_keywords)

        TOKENIZER = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True, **rev_kwargs)
        if is_seq2seq:
            model = AutoModelForSeq2SeqLM.from_pretrained(repo_id, **rev_kwargs)
            LLM_PIPELINE = pipeline("text2text-generation", model=model, tokenizer=TOKENIZER)
            logger.info("Loaded seq2seq model: %s", LOADED_MODEL_NAME)
        else:
            model = AutoModelForCausalLM.from_pretrained(repo_id, **rev_kwargs)
            LLM_PIPELINE = pipeline("text-generation", model=model, tokenizer=TOKENIZER)
            logger.info("Loaded causal model: %s", LOADED_MODEL_NAME)
    except Exception as e:
        logger.error("Failed to load LOCAL_MODEL '%s': %s", LOCAL_MODEL, e)
        traceback.print_exc()
        # fallback
        try:
            fallback = "google/flan-t5-small"
            TOKENIZER = AutoTokenizer.from_pretrained(fallback, use_fast=True)
            model = AutoModelForSeq2SeqLM.from_pretrained(fallback)
            LLM_PIPELINE = pipeline("text2text-generation", model=model, tokenizer=TOKENIZER)
            LOADED_MODEL_NAME = fallback
            logger.info("Loaded fallback model: %s", fallback)
        except Exception as e2:
            logger.error("Fallback model load failed: %s", e2)
            traceback.print_exc()
            LLM_PIPELINE = None
            LOADED_MODEL_NAME = None


init_local_model()


def rule_based_response(message: str) -> str:
    """Return a canned usage hint; used whenever no local LLM is available."""
    msg = (message or "").strip().lower()
    if msg.startswith(("create record", "create contact")):
        return "To create a record, use: create_record MODULE_NAME {\"Field\":\"value\"}"
    if msg.startswith("create_invoice"):
        return "To create invoice: create_invoice {\"customer_id\":\"...\",\"line_items\":[...]} (JSON)"
    if msg.startswith("help") or "what can you do" in msg:
        return "I can run create_record/update_record/delete_record or process local files by pasting their /mnt/data/ path."
    return "(fallback) No local LLM loaded. Use explicit commands like create_record or paste /mnt/data/ path."
def local_llm_generate(prompt: str, max_tokens: int = 256) -> Dict[str, Any]:
    """Generate a completion for ``prompt`` with the local pipeline.

    Returns a dict with ``text`` (the reply), ``confidence`` (always None here)
    and ``raw`` (pipeline output, or the error string on failure). When no
    pipeline is loaded, or generation raises, the rule-based responder is used.
    """
    if LLM_PIPELINE is None:
        return {"text": rule_based_response(prompt), "confidence": None, "raw": None}
    try:
        out = LLM_PIPELINE(prompt, max_new_tokens=max_tokens)
        if isinstance(out, list) and len(out) > 0:
            first = out[0]
            if isinstance(first, dict):
                text = first.get("generated_text") or first.get("text") or str(first)
            else:
                text = str(first)
        else:
            text = str(out)
        # text-generation pipelines echo the prompt by default; strip it so the
        # caller can recognise a reply that *starts* with a JSON tool instruction.
        if isinstance(text, str) and prompt and text.startswith(prompt):
            text = text[len(prompt):]
        _log_llm_call(None)
        return {"text": text, "confidence": None, "raw": out}
    except Exception as e:
        logger.error("LLM error: %s", e)
        traceback.print_exc()
        return {"text": rule_based_response(prompt), "confidence": None, "raw": str(e)}


# ----------------------------
# Zoho OAuth helper & MCP tools
# ----------------------------
# In-process cache of the last access token, so every tool call does not hit
# the token endpoint. NOTE(review): token lifetime is taken from the response
# and capped at one hour — confirm against your Zoho data centre's settings.
_TOKEN_CACHE: Dict[str, Any] = {"access_token": None, "expires_at": 0.0}
_TOKEN_EXPIRY_MARGIN_SECONDS = 60
_TOKEN_MAX_LIFETIME_SECONDS = 3600.0


def _get_valid_token_headers(force_refresh: bool = False) -> dict:
    """Return the ``Authorization`` header dict for Zoho API calls.

    Args:
        force_refresh: when True, ignore the cached token and request a new one.

    Raises:
        RuntimeError: if the token endpoint does not answer 200, or answers 200
            without an ``access_token`` in the body.
    """
    cached = _TOKEN_CACHE.get("access_token")
    if cached and not force_refresh and time.time() < _TOKEN_CACHE.get("expires_at", 0.0):
        return {"Authorization": f"Zoho-oauthtoken {cached}"}

    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token"
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    try:
        body = r.json()
    except ValueError:
        body = None
    t = body.get("access_token") if isinstance(body, dict) else None
    if not t:
        # A 200 response without a token would otherwise produce the header
        # "Zoho-oauthtoken None" and fail later with a confusing 401.
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")

    lifetime = body.get("expires_in_sec", body.get("expires_in", _TOKEN_MAX_LIFETIME_SECONDS))
    try:
        lifetime = float(lifetime)
    except (TypeError, ValueError):
        lifetime = _TOKEN_MAX_LIFETIME_SECONDS
    # Cap the lifetime so a value reported in another unit cannot keep a dead token cached.
    lifetime = max(0.0, min(lifetime, _TOKEN_MAX_LIFETIME_SECONDS))
    _TOKEN_CACHE["access_token"] = t
    _TOKEN_CACHE["expires_at"] = time.time() + lifetime - _TOKEN_EXPIRY_MARGIN_SECONDS
    return {"Authorization": f"Zoho-oauthtoken {t}"}


@mcp.tool()
def authenticate_zoho() -> str:
    """Force a Zoho access-token refresh and report whether it succeeded."""
    try:
        _ = _get_valid_token_headers(force_refresh=True)
        _log_tool_call("authenticate_zoho", True)
        return "Zoho token refreshed (ok)."
    except Exception as e:
        _log_tool_call("authenticate_zoho", False)
        return f"Failed to authenticate: {e}"


@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """POST one record to ``{API_BASE}/{module_name}``.

    Returns the response JSON as a string on HTTP 200/201, otherwise an error
    string containing the status code and body (or the exception text).
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        payload = {"data": [record_data]}
        r = requests.post(url, headers=headers, json=payload, timeout=20)
        if r.status_code in (200, 201):
            _log_tool_call("create_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_record", False)
        return f"Error creating record: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_record", False)
        return f"Exception: {e}"


@mcp.tool()
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """GET a page of records from ``{API_BASE}/{module_name}``.

    Returns the ``data`` list of the response on HTTP 200, otherwise a
    one-element list holding an error string.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data", [])
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]


@mcp.tool()
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """PUT ``data`` to ``{API_BASE}/{module_name}/{record_id}``; returns JSON text or an error string."""
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        payload = {"data": [data]}
        r = requests.put(url, headers=headers, json=payload, timeout=20)
        if r.status_code == 200:
            _log_tool_call("update_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("update_record", False)
        return f"Error updating: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("update_record", False)
        return f"Exception: {e}"


@mcp.tool()
def delete_record(module_name: str, record_id: str) -> str:
    """DELETE ``{API_BASE}/{module_name}/{record_id}``; returns JSON text or an error string."""
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}/{record_id}"
        r = requests.delete(url, headers=headers, timeout=20)
        if r.status_code == 200:
            _log_tool_call("delete_record", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("delete_record", False)
        return f"Error deleting: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("delete_record", False)
        return f"Exception: {e}"


# ----------------------------
# Zoho Invoice v3 functions (create invoice + upload attachment)
# ----------------------------
def _ensure_invoice_config():
    """Raise RuntimeError unless both invoice settings are configured."""
    if not INVOICE_API_BASE or not ORGANIZATION_ID:
        raise RuntimeError("INVOICE_API_BASE and ORGANIZATION_ID must be set in config.py")


@mcp.tool()
def create_invoice(data: dict) -> str:
    """
    Create an invoice in Zoho Invoice v3.
    'data' should follow Zoho Invoice payload conventions (customer_id, line_items, etc.).
    Returns the response JSON as a string on HTTP 200/201, otherwise an error string.
    """
    try:
        _ensure_invoice_config()
        headers = _get_valid_token_headers()
        url = f"{INVOICE_API_BASE}/invoices"
        params = {"organization_id": ORGANIZATION_ID}
        r = requests.post(url, headers=headers, params=params, json=data, timeout=30)
        if r.status_code in (200, 201):
            _log_tool_call("create_invoice", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("create_invoice", False)
        return f"Error creating invoice: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("create_invoice", False)
        return f"Exception: {e}"


def upload_invoice_attachment(invoice_id: str, file_path: str) -> str:
    """
    Upload a local file as an attachment to the given invoice id.
    Returns the response JSON as a string on HTTP 200/201, otherwise an error string.
    """
    try:
        _ensure_invoice_config()
    except Exception as e:
        return f"Missing config: {e}"
    if not os.path.exists(file_path):
        return f"Attachment file not found: {file_path}"
    try:
        headers = _get_valid_token_headers()
        headers.pop("Content-Type", None)  # requests will set for multipart
        url = f"{INVOICE_API_BASE}/invoices/{invoice_id}/attachments"
        params = {"organization_id": ORGANIZATION_ID}
        with open(file_path, "rb") as f:
            files = {"attachment": (os.path.basename(file_path), f)}
            r = requests.post(url, headers=headers, params=params, files=files, timeout=60)
        if r.status_code in (200, 201):
            _log_tool_call("upload_invoice_attachment", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("upload_invoice_attachment", False)
        return f"Error uploading attachment: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("upload_invoice_attachment", False)
        return f"Exception uploading attachment: {e}"


@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """
    Simple placeholder that simulates OCR+parsing. Returns extracted_data with a confidence score
    and normalizes path->file_url. The extracted values are hard-coded sample data; the file
    is only checked for existence, never read.
    """
    try:
        if os.path.exists(file_path):
            file_url = f"file://{file_path}"
            extracted = {
                "Name": "ACME Corp (simulated)",
                "Email": "contact@acme.example",
                "Phone": "+91-99999-00000",
                "Total": "1234.00",
                "Confidence": 0.88
            }
            _log_tool_call("process_document", True)
            return {"status": "success", "file": os.path.basename(file_path), "file_url": file_url, "target_module": target_module, "extracted_data": extracted}
        else:
            _log_tool_call("process_document", False)
            return {"status": "error", "error": "file not found", "file_path": file_path}
    except Exception as e:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(e)}


# ----------------------------
# Helpers: map LLM args -> Zoho payloads and extract ids
# ----------------------------
def _extract_created_id_from_zoho_response(resp_json) -> Optional[str]:
    """Best-effort extraction of a created record id from a create response.

    Accepts a parsed object or a JSON string. Looks at the first element of
    ``data`` / ``result`` (its ``details`` dict when present) for ``id`` /
    ``ID`` / ``Id``; failing that, returns the first str/int value of at least
    four characters (heuristic), then a top-level ``id``. Returns None when
    nothing is found or the input has an unexpected shape.
    """
    try:
        if not resp_json:
            return None
        if isinstance(resp_json, str):
            try:
                resp_json = json.loads(resp_json)
            except Exception:
                return None
        data = resp_json.get("data") or resp_json.get("result") or None
        if isinstance(data, list) and len(data) > 0:
            details = data[0].get("details") or data[0]
            if isinstance(details, dict):
                for key in ("id", "ID", "Id"):
                    if key in details:
                        return str(details[key])
                # Heuristic fallback: first scalar that is long enough to be an id.
                for v in details.values():
                    if isinstance(v, (str, int)) and len(str(v)) >= 4:
                        return str(v)
        if "id" in resp_json:
            return str(resp_json["id"])
    except Exception:
        # Unexpected shape (e.g. a list at top level): treat as "no id found".
        pass
    return None


# LLM-style argument name (lower case) -> Zoho contact field name.
_CONTACT_FIELD_MAP = {
    "contact": "Last_Name",
    "first_name": "First_Name",
    "email": "Email",
    "mobile": "Phone",
    "currency": "Currency",
}
# Argument names that describe the invoice, the target module or local-file
# plumbing; they are not contact fields and must not be sent with the record.
_CONTACT_IGNORED_KEYS = frozenset({
    "invoice", "items", "line_items",
    "module", "module_name",
    "file_path", "path", "file", "file_url", "file_b64",
})


def _map_contact_args_to_zoho_payload(args: dict) -> dict:
    """Translate loosely named contact args into a Zoho contact payload.

    Known aliases (``contact``, ``first_name``, ``email``, ``mobile``,
    ``currency``) are mapped to their Zoho field names, matching keys
    case-insensitively so that e.g. ``"Email"`` is not dropped. Invoice, module
    and file-plumbing keys are skipped. Any other scalar (str/int/float) value
    is passed through under its original key.
    """
    contact_payload = {}
    # Exact lower-case aliases take precedence.
    for alias, zoho_field in _CONTACT_FIELD_MAP.items():
        if alias in args:
            contact_payload[zoho_field] = args.get(alias)
    for k, v in args.items():
        key_lower = k.lower() if isinstance(k, str) else k
        zoho_field = _CONTACT_FIELD_MAP.get(key_lower)
        if zoho_field is not None:
            # Case variants ("Email", "Currency") only fill fields not already set.
            contact_payload.setdefault(zoho_field, v)
            continue
        if key_lower in _CONTACT_IGNORED_KEYS:
            continue
        if isinstance(v, (str, int, float)):
            if k not in contact_payload:
                contact_payload[k] = v
    return contact_payload


def _parse_percentage(value: Any) -> float:
    """Parse a number or a string such as ``"16%"`` into a float; 0.0 when unparseable."""
    try:
        return float(value)
    except Exception:
        try:
            return float(str(value).replace("%", "").strip())
        except Exception:
            return 0.0


def _build_zoho_line_items(invoice_items: Any) -> List[dict]:
    """Convert loosely shaped invoice items into ``{name, rate, quantity}`` dicts.

    A single dict is treated as a one-item list; entries that are not dicts are
    skipped. The amount is the first of ``amount`` / ``price`` / ``rate`` /
    ``item`` that parses as a number (``$`` and ``,`` removed), defaulting to
    0.0. Quantity defaults to 1 when missing or unparseable.
    """
    if isinstance(invoice_items, dict):
        invoice_items = [invoice_items]
    if not isinstance(invoice_items, (list, tuple)):
        return []
    line_items = []
    for it in invoice_items:
        if not isinstance(it, dict):
            continue
        amount = None
        for k in ("amount", "price", "rate", "item"):
            if k in it:
                try:
                    amount = float(str(it[k]).replace("$", "").replace(",", "").strip())
                    break
                except Exception:
                    continue
        name = it.get("description") or it.get("name") or it.get("service") or "Item"
        qty = it.get("quantity") or it.get("qty") or 1
        try:
            qty = int(qty)
        except Exception:
            try:
                qty = int(float(qty))
            except Exception:
                qty = 1
        if amount is None:
            amount = 0.0
        line_items.append({"name": name, "rate": amount, "quantity": qty})
    return line_items


def _build_invoice_payload_for_zoho(contact_id: str, invoice_items: List[dict], currency: str = None, vat_pct: float = 0.0) -> dict:
    """Build the invoice payload: customer id, line items, optional currency and VAT."""
    invoice_payload = {"customer_id": contact_id, "line_items": _build_zoho_line_items(invoice_items)}
    if currency:
        invoice_payload["currency_code"] = currency
    vat_val = _parse_percentage(vat_pct)
    if vat_val > 0:
        invoice_payload["tax"] = {"name": "VAT", "percentage": vat_val}
    return invoice_payload


def _extract_invoice_id(inv_res_text: Any) -> Optional[str]:
    """Pull an invoice id out of a ``create_invoice`` result string.

    Checks, in order: ``invoice.invoice_id``, top-level ``invoice_id`` / ``id``,
    then the first element of ``data``. Returns None for error strings or
    unrecognised shapes.
    """
    if not (isinstance(inv_res_text, str) and inv_res_text.strip().startswith("{")):
        return None
    try:
        parsed = json.loads(inv_res_text)
    except Exception:
        return None
    if not isinstance(parsed, dict):
        return None
    invoice = parsed.get("invoice")
    if isinstance(invoice, dict) and invoice.get("invoice_id"):
        return str(invoice["invoice_id"])
    for key in ("invoice_id", "id"):
        if key in parsed:
            return str(parsed[key])
    data = parsed.get("data")
    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
        for key in ("invoice_id", "id"):
            if key in data[0]:
                return str(data[0][key])
    return None


# Tool names the model may invoke through the generic dispatcher. Anything else
# is rejected, so model output cannot call arbitrary module-level callables.
_DISPATCHABLE_TOOLS = (
    "authenticate_zoho",
    "create_record",
    "get_records",
    "update_record",
    "delete_record",
    "create_invoice",
    "upload_invoice_attachment",
    "process_document",
)
_INVOICE_TOOL_ALIASES = ("create_invoice", "createInvoice", "createInvoiceRecord")


def _dispatch_generic_tool(tool: Any, args: Any) -> str:
    """Call an allow-listed tool by name and return a one-line result string."""
    fn = globals().get(tool) if isinstance(tool, str) and tool in _DISPATCHABLE_TOOLS else None
    if not callable(fn):
        return f"Tool '{tool}' not found."
    try:
        if isinstance(args, dict):
            # Path normalisation adds helper keys (file_url, file_b64) that the
            # tools do not accept; pass only parameters the function declares.
            accepted = inspect.signature(fn).parameters
            out = fn(**{k: v for k, v in args.items() if k in accepted})
        elif isinstance(args, (list, tuple)):
            out = fn(*args)
        else:
            out = fn(args)
        return f"{tool} -> {out}"
    except Exception as e:
        return f"{tool} failed: {e}"


# ----------------------------
# Parse & execute model tool JSON outputs (core executor)
# ----------------------------
def parse_and_execute_model_tool_output(model_text: str, history: Optional[List[Tuple[str,str]]] = None) -> str:
    """Parse model output as tool instruction(s) and execute them in order.

    ``model_text`` must be JSON: either one ``{"tool": ..., "args": ...}``
    object or a list of them. ``create_record`` and the invoice aliases get
    dedicated handling (a contact id created earlier in the same batch is
    reused as the invoice customer); other allow-listed tools are dispatched
    generically. ``history`` is accepted for interface compatibility and is
    not used. Returns a human-readable summary string.
    """
    try:
        payload = json.loads(model_text)
    except Exception:
        return "(Parse) Model output was not valid JSON tool instruction."

    if isinstance(payload, dict) and "tool" in payload:
        instructions = [payload]
    elif isinstance(payload, list):
        instructions = [p for p in payload if isinstance(p, dict) and "tool" in p]
    else:
        return "(Parse) No tool instructions found."

    results = []
    contact_id = None
    for instr in instructions:
        tool = instr.get("tool")
        args = instr.get("args", {}) or {}
        args = _normalize_local_path_args(args)

        if tool == "create_record":
            try:
                contact_payload = _map_contact_args_to_zoho_payload(args)
                module = args.get("module_name") or args.get("module") or "Contacts"
                res = create_record(module, contact_payload)
                results.append(f"create_record -> {res}")
                # Remember the new id so a following create_invoice can use it.
                cid = _extract_created_id_from_zoho_response(res)
                if cid:
                    contact_id = cid
            except Exception as e:
                results.append(f"create_record failed: {e}")
            continue

        if isinstance(tool, str) and tool in _INVOICE_TOOL_ALIASES:
            if not isinstance(args, dict):
                results.append("create_invoice failed: args must be a JSON object")
                continue
            invoice_items = args.get("invoice") or args.get("items") or args.get("line_items") or []
            currency = args.get("currency") or args.get("currency_code") or args.get("currency_symbol") or None
            vat_val = _parse_percentage(args.get("vat") or args.get("tax") or args.get("vat_pct") or 0)
            if not contact_id:
                contact_id = args.get("contact_id") or args.get("customer_id") or None
            if not contact_id:
                results.append("create_invoice skipped: no contact/customer id available. Create contact first.")
                continue

            invoice_payload = _build_invoice_payload_for_zoho(contact_id, invoice_items, currency=currency, vat_pct=vat_val)
            for optional_key in ("notes", "reference_number", "terms"):
                if args.get(optional_key):
                    invoice_payload[optional_key] = args.get(optional_key)

            try:
                inv_res_text = create_invoice(invoice_payload)
                results.append(f"create_invoice -> {inv_res_text}")
            except Exception as e:
                results.append(f"create_invoice failed: {e}")
                continue

            inv_id = _extract_invoice_id(inv_res_text)

            # attempt attachment upload if provided
            attachment_path = args.get("file_path") or None
            if not attachment_path:
                fu = args.get("file_url")
                if isinstance(fu, str) and fu.startswith("file://"):
                    # Strip only the scheme prefix, not later occurrences.
                    attachment_path = fu[len("file://"):]
            if inv_id and attachment_path:
                try:
                    attach_res = upload_invoice_attachment(inv_id, attachment_path)
                    results.append(f"upload_attachment -> {attach_res}")
                except Exception as e:
                    results.append(f"upload_attachment failed: {e}")
            continue

        # Generic attempt to call a local (allow-listed) tool
        results.append(_dispatch_generic_tool(tool, args))

    if not results:
        return "(Execute) No executable results produced."
    reply = "Execution results:\n" + "\n".join(results)
    if contact_id:
        reply += f"\nPrimary contact id: {contact_id}"
    return reply


# ----------------------------
# Simple explicit command parser for debugging
# ----------------------------
def try_parse_and_invoke_command(text: str):
    """Run an explicit debug command if ``text`` is one; otherwise return None.

    Recognised forms: ``create_record MODULE {json}``, ``create_invoice {json}``
    and a bare ``/mnt/data/...`` path (no whitespace), which is passed to
    ``process_document``. Returns whatever the invoked tool returns, or an
    "Invalid JSON" string.
    """
    text = text.strip()
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I)
    if m:
        module = m.group(1)
        body = m.group(2)
        try:
            record_data = json.loads(body)
        except Exception:
            return "Invalid JSON for record_data"
        return create_record(module, record_data)
    m = re.match(r"^create_invoice\s+(.+)$", text, re.I)
    if m:
        body = m.group(1)
        try:
            invoice_data = json.loads(body)
        except Exception:
            return "Invalid JSON for invoice_data"
        return create_invoice(invoice_data)
    m = re.match(r"^(\/mnt\/data\/\S+)$", text)
    if m:
        path = m.group(1)
        return process_document(path)
    return None


# ----------------------------
# Chat flow: call LLM, parse JSON responses and execute
# ----------------------------
def deepseek_response(message: str, history: Optional[List[Tuple[str,str]]] = None) -> str:
    """Ask the local LLM for a reply and execute it when it is a tool instruction.

    The prompt is the system prompt, the (user, assistant) history pairs and
    the new message. A reply that starts with ``{`` or ``[`` is treated as tool
    JSON and executed; anything else is returned as plain text.
    """
    history = history or []
    system_prompt = "You are Zoho Assistant. Prefer concise answers. If you want to call a tool, return a JSON object: {\"tool\": \"create_record\", \"args\": {...}}"
    history_text = ""
    for pair in history:
        try:
            u, a = pair[0], pair[1]
            history_text += f"User: {u}\nAssistant: {a}\n"
        except Exception:
            # Skip history entries that are not (user, assistant) pairs.
            continue
    prompt = f"{system_prompt}\n{history_text}\nUser: {message}\nAssistant:"
    gen = local_llm_generate(prompt, max_tokens=256)
    text = gen.get("text", "")
    payload = text.strip()
    # If model produced JSON instruction, execute
    if payload.startswith("{") or payload.startswith("["):
        try:
            exec_result = parse_and_execute_model_tool_output(payload, history)
            return exec_result
        except Exception as e:
            logger.error("Execution error: %s", e)
            traceback.print_exc()
            return f"(Execute) Error: {e}"
    return text


# ----------------------------
# Gradio chat handler
# ----------------------------
def chat_handler(message, history):
    """Gradio chat callback: explicit commands first, then file paths, then the LLM.

    Always returns a string; structured tool results are serialised to JSON.
    """
    history = history or []
    trimmed = (message or "").strip()
    # explicit debug commands
    cmd = try_parse_and_invoke_command(trimmed)
    if cmd is not None:
        # Some tools return dicts/lists; the chat UI expects text.
        return cmd if isinstance(cmd, str) else json.dumps(cmd, ensure_ascii=False)
    # dev path handling (reached for paths containing whitespace)
    if trimmed.startswith("/mnt/data/"):
        try:
            doc = process_document(trimmed)
            return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'), ensure_ascii=False)}"
        except Exception as e:
            return f"Error processing document: {e}"
    try:
        return deepseek_response(trimmed, history)
    except Exception as e:
        logger.error("deepseek_response error: %s", e)
        traceback.print_exc()
        return rule_based_response(trimmed)


# ----------------------------
# Gradio UI builder
# ----------------------------
def chat_interface():
    """Build the Gradio chat UI wired to ``chat_handler``."""
    return gr.ChatInterface(fn=chat_handler, textbox=gr.Textbox(placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."))


# ----------------------------
# Demo runner: directly perform create_contact -> create_invoice sequence
# ----------------------------
def perform_demo_actions(contact_payload: dict, invoice_items: List[dict], vat_pct: float = 0.0):
    """Directly call create_record and create_invoice to perform actions on Zoho.

    This bypasses the LLM and demonstrates the end-to-end flow.
    Returns a dict with results and any created ids (``create_record``,
    ``contact_id``, ``create_invoice`` and, once an invoice call was made,
    ``invoice_id``). Invoice creation is skipped when no contact id comes back.
    """
    results = {"create_record": None, "contact_id": None, "create_invoice": None}
    try:
        # A payload without Last_Name is taken to use the loose LLM-style names
        # ("contact", "mobile", ...) and is mapped to Zoho field names first.
        record_payload = contact_payload
        if isinstance(contact_payload, dict) and "Last_Name" not in contact_payload:
            record_payload = _map_contact_args_to_zoho_payload(contact_payload)

        # create contact
        res_text = create_record("Contacts", record_payload)
        results["create_record"] = res_text
        cid = _extract_created_id_from_zoho_response(res_text)
        results["contact_id"] = cid
        if not cid:
            logger.warning("No contact id returned from create_record. Aborting invoice creation.")
            return results

        invoice_payload = _build_invoice_payload_for_zoho(cid, invoice_items, currency=contact_payload.get("Currency"), vat_pct=vat_pct)
        inv_res = create_invoice(invoice_payload)
        results["create_invoice"] = inv_res
        results["invoice_id"] = _extract_invoice_id(inv_res)
        return results
    except Exception as e:
        logger.error("perform_demo_actions error: %s", e)
        traceback.print_exc()
        return results


# ----------------------------
# Entrypoint (includes a one-shot demo process_document call using your uploaded path)
# ----------------------------
if __name__ == "__main__":
    logger.info("Starting MCP server. Loaded model: %s", LOADED_MODEL_NAME)

    # Developer demo: process the uploaded file path you provided earlier
    demo_file_path = "/mnt/data/script_zoho_mcp"
    try:
        demo_result = process_document(demo_file_path)
        logger.info("Demo process_document result for %s: %s", demo_file_path, json.dumps(demo_result))
        print("Demo process_document result:", json.dumps(demo_result))
    except Exception as e:
        logger.warning("Demo process_document failed: %s", e)

    # Direct contact+invoice demo using the user's supplied example.
    # This actually calls Zoho APIs (create contact -> create invoice) using
    # config.py credentials, so it is opt-in: running it on every start would
    # create duplicate records in the live organisation.
    if os.environ.get("MCP_RUN_ZOHO_DEMO", "").strip().lower() in ("1", "true", "yes"):
        demo_contact = {
            "contact": "VachMe",
            "Email": "vachaspathi1234@gmail.com",
            "mobile": "9876543210",
            "Currency": "$"
        }
        demo_items = [
            {"name": "Car Wash - Wax Coating", "amount": 150.0, "quantity": 1},
            {"name": "Car Wash - Washing", "amount": 50.0, "quantity": 1}
        ]
        try:
            demo_flow_res = perform_demo_actions(demo_contact, demo_items, vat_pct=16.0)
            logger.info("Demo create contact+invoice result: %s", json.dumps(demo_flow_res))
            print("Demo create contact+invoice result:", json.dumps(demo_flow_res))
        except Exception as e:
            logger.warning("Demo contact+invoice failed: %s", e)
    else:
        logger.info("Skipping live Zoho contact+invoice demo (set MCP_RUN_ZOHO_DEMO=1 to enable).")

    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)