Spaces:
Sleeping
Sleeping
| # app.py — MCP server (single-file) | |
| # - FastMCP-based Zoho MCP POC with local HF model (DeepSeek / fallback) and Zoho Invoice v3 support. | |
| # - Place next to config.py which must define: | |
| # CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, | |
| # INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL (or LOCAL_MODEL=None) | |
| # - Developer note: this file will demo process_document on startup using the uploaded file | |
| # path: /mnt/data/script_zoho_mcp (the file you uploaded). That path will be converted | |
| # to a file:// URL automatically when tools receive it. | |
| # | |
| # IMPORTANT: Do NOT commit API keys to repos. Keep them in config.py locally. | |
| from mcp.server.fastmcp import FastMCP | |
| from typing import Optional, List, Tuple, Any, Dict | |
| import requests | |
| import os | |
| import gradio as gr | |
| import json | |
| import time | |
| import traceback | |
| import inspect | |
| import re | |
| import logging | |
| import base64 | |
# Logging: INFO-level root configuration plus the logger used by this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# transformers is optional; TRANSFORMERS_AVAILABLE records whether the import worked.
try:
    from transformers import (
        AutoModelForCausalLM,
        AutoModelForSeq2SeqLM,
        AutoTokenizer,
        pipeline,
    )
except Exception as import_error:
    # Any failure while importing is treated as "not available", not only ImportError.
    TRANSFORMERS_AVAILABLE = False
    logger.warning("transformers not available or failed to import: %s", import_error)
else:
    TRANSFORMERS_AVAILABLE = True
| # ---------------------------- | |
| # Load config (must exist) | |
| # ---------------------------- | |
try:
    from config import (
        CLIENT_ID,
        CLIENT_SECRET,
        REFRESH_TOKEN,
        API_BASE,  # Zoho CRM base if used
        INVOICE_API_BASE,  # Zoho Invoice API base (e.g., https://invoice.zoho.in/api/v3)
        ORGANIZATION_ID,  # Zoho Invoice organization id
        LOCAL_MODEL,  # local HF model (e.g., "google/flan-t5-small") or None
    )
except Exception as config_error:
    # Keep the original guidance but also report the real cause (missing file,
    # missing name, syntax error in config.py, ...) instead of hiding it.
    raise SystemExit(
        "Make sure config.py exists and contains CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, "
        "INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL (or set LOCAL_MODEL=None). "
        f"(import failed: {config_error!r})"
    ) from config_error

# Optional tokenizer override. It has to be imported from config explicitly:
# looking it up in this module's globals() can never see a value that was only
# defined in config.py, so the override would always be None.
try:
    from config import LOCAL_TOKENIZER
except ImportError:
    LOCAL_TOKENIZER = None

# ----------------------------
# Initialize FastMCP
# ----------------------------
# NOTE(review): no tool registrations (e.g. @mcp.tool()) and no mcp.run() call are
# visible in this file — confirm whether the functions below are meant to be exposed.
mcp = FastMCP("ZohoCRMAgent")
| # ---------------------------- | |
| # Analytics / KPI logging (simple JSON file) | |
| # ---------------------------- | |
ANALYTICS_PATH = "mcp_analytics.json"


def _default_analytics() -> Dict[str, Any]:
    """Return a fresh, empty analytics document."""
    return {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None, "created_at": time.time()}


def _load_analytics() -> Dict[str, Any]:
    """Read the analytics file, repairing anything that is missing or malformed.

    A missing, unreadable, or syntactically invalid file yields a fresh
    document. A file that parses but has the wrong shape (not an object, or
    without a usable ``tool_calls`` / ``llm_calls``) is repaired in memory so
    the counters below never raise on it.
    """
    try:
        with open(ANALYTICS_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return _default_analytics()
    if not isinstance(data, dict):
        return _default_analytics()
    if not isinstance(data.get("tool_calls"), dict):
        data["tool_calls"] = {}
    if not isinstance(data.get("llm_calls"), int):
        data["llm_calls"] = 0
    data.setdefault("last_llm_confidence", None)
    return data


def _save_analytics(data: Dict[str, Any]) -> None:
    """Persist the analytics document; failures are logged, never raised.

    Analytics are best-effort: a read-only or full disk must not make the
    tool call that triggered the logging fail. The write goes through a
    temporary file and ``os.replace`` so a crash cannot leave a half-written
    JSON file behind.
    """
    tmp_path = f"{ANALYTICS_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, ANALYTICS_PATH)
    except OSError as e:
        logger.warning("Could not write analytics file %s: %s", ANALYTICS_PATH, e)


def _init_analytics():
    """Create the analytics file with empty counters if it does not exist yet."""
    if not os.path.exists(ANALYTICS_PATH):
        _save_analytics(_default_analytics())


def _log_tool_call(tool_name: str, success: bool = True):
    """Increment the call counter and the success/fail counter for ``tool_name``."""
    data = _load_analytics()
    entry = data["tool_calls"].get(tool_name)
    if not isinstance(entry, dict):
        entry = {}
        data["tool_calls"][tool_name] = entry
    for counter in ("count", "success", "fail"):
        entry.setdefault(counter, 0)
    entry["count"] += 1
    entry["success" if success else "fail"] += 1
    _save_analytics(data)


def _log_llm_call(confidence: Optional[float] = None):
    """Increment the LLM call counter; store ``confidence`` when one is given."""
    data = _load_analytics()
    data["llm_calls"] = data.get("llm_calls", 0) + 1
    if confidence is not None:
        data["last_llm_confidence"] = confidence
    _save_analytics(data)


_init_analytics()
| # ---------------------------- | |
| # Helper: normalize local file_path args | |
| # ---------------------------- | |
def _normalize_local_path_args(args: Any, allowed_root: str = "/mnt/data/") -> Any:
    """Attach ``file_url`` and ``file_b64`` for a local file referenced in ``args``.

    The path is taken from ``file_path``, ``path`` or ``file`` (first truthy
    one). It is only honoured when it exists and really lives under
    ``allowed_root`` once ``..`` segments and symlinks are resolved; a plain
    prefix test would accept ``/mnt/data/../etc/passwd`` and embed that
    file's contents in the arguments.

    Args:
        args: Tool arguments. Anything that is not a dict is returned as-is.
        allowed_root: Directory prefix local files must live under.

    Returns:
        The same ``args`` object (mutated in place when a file was accepted).
    """
    if not isinstance(args, dict):
        return args
    fp = args.get("file_path") or args.get("path") or args.get("file")
    if not (isinstance(fp, str) and fp.startswith(allowed_root) and os.path.exists(fp)):
        return args

    root = os.path.realpath(allowed_root)
    resolved = os.path.realpath(fp)
    if resolved != root and not resolved.startswith(root.rstrip(os.sep) + os.sep):
        logger.warning("Refusing file_path outside %s: %s", allowed_root, fp)
        return args

    args["file_url"] = f"file://{fp}"
    try:
        with open(fp, "rb") as f:
            args["file_b64"] = base64.b64encode(f.read()).decode("utf-8")
    except OSError as e:
        # e.g. a directory or an unreadable file: keep file_url, drop the payload.
        logger.warning("Could not read file for base64: %s", e)
        args.pop("file_b64", None)
    return args
| # ---------------------------- | |
| # Local LLM loader (DeepSeek style or fallback) | |
| # ---------------------------- | |
# Module-level model state, filled in by init_local_model().
LLM_PIPELINE = None
TOKENIZER = None
LOADED_MODEL_NAME = None


def _parse_model_and_revision(model_string: str) -> Tuple[str, Optional[str]]:
    """Split ``"repo_id:revision"`` into ``(repo_id, revision)``.

    Surrounding whitespace is removed from both parts whether or not a colon
    is present. A missing or empty revision is returned as ``None``, never as
    an empty string, so the result matches the annotated type.
    """
    repo_id, _, revision = model_string.partition(":")
    return repo_id.strip(), (revision.strip() or None)
def _load_hf_pipeline(repo_id: str, tokenizer_name: str, revision: Optional[str] = None):
    """Load tokenizer + model for ``repo_id`` and wrap them in a pipeline.

    Repos whose id contains "flan", "t5" or "seq2seq" are loaded as seq2seq
    models ("text2text-generation"); everything else as causal LMs
    ("text-generation"). ``revision`` is forwarded to both loaders only when set.

    Returns:
        ``(pipeline, tokenizer, kind)`` where kind is "seq2seq" or "causal".
    """
    revision_kwargs = {"revision": revision} if revision else {}
    is_seq2seq = any(keyword in repo_id.lower() for keyword in ("flan", "t5", "seq2seq"))
    model_cls = AutoModelForSeq2SeqLM if is_seq2seq else AutoModelForCausalLM
    task = "text2text-generation" if is_seq2seq else "text-generation"
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True, **revision_kwargs)
    model = model_cls.from_pretrained(repo_id, **revision_kwargs)
    return pipeline(task, model=model, tokenizer=tokenizer), tokenizer, ("seq2seq" if is_seq2seq else "causal")


def init_local_model():
    """Populate LLM_PIPELINE / TOKENIZER / LOADED_MODEL_NAME from LOCAL_MODEL.

    Nothing is loaded when LOCAL_MODEL is unset or transformers is missing.
    If the configured model fails to load, "google/flan-t5-small" is tried as
    a fallback. If that fails too, all three globals are reset to None so no
    stale tokenizer from a half-finished load is left behind.
    """
    global LLM_PIPELINE, TOKENIZER, LOADED_MODEL_NAME
    if not LOCAL_MODEL:
        logger.info("LOCAL_MODEL not set — skipping local LLM load.")
        LLM_PIPELINE = None
        return
    if not TRANSFORMERS_AVAILABLE:
        logger.warning("transformers not available — skipping model load.")
        LLM_PIPELINE = None
        return

    try:
        repo_id, revision = _parse_model_and_revision(LOCAL_MODEL)
        display_name = f"{repo_id}" + (f" (rev={revision})" if revision else "")
        # Globals are assigned only after the whole load succeeded.
        LLM_PIPELINE, TOKENIZER, kind = _load_hf_pipeline(repo_id, LOCAL_TOKENIZER or repo_id, revision)
        LOADED_MODEL_NAME = display_name
        logger.info("Loaded %s model: %s", kind, LOADED_MODEL_NAME)
        return
    except Exception as e:
        logger.exception("Failed to load LOCAL_MODEL '%s': %s", LOCAL_MODEL, e)

    fallback = "google/flan-t5-small"
    try:
        LLM_PIPELINE, TOKENIZER, _ = _load_hf_pipeline(fallback, fallback)
        LOADED_MODEL_NAME = fallback
        logger.info("Loaded fallback model: %s", fallback)
    except Exception as e2:
        logger.exception("Fallback model load failed: %s", e2)
        LLM_PIPELINE = None
        TOKENIZER = None
        LOADED_MODEL_NAME = None


init_local_model()
def rule_based_response(message: str) -> str:
    """Return a canned hint for ``message`` (used when no local LLM answers).

    Matching is done on the stripped, lower-cased message; ``None`` is
    treated like an empty string.
    """
    normalized = (message or "").strip().lower()
    if normalized.startswith(("create record", "create contact")):
        return 'To create a record, use: create_record MODULE_NAME {"Field":"value"}'
    if normalized.startswith("create_invoice"):
        return 'To create invoice: create_invoice {"customer_id":"...","line_items":[...]} (JSON)'
    wants_help = normalized.startswith("help") or "what can you do" in normalized
    if wants_help:
        return "I can run create_record/update_record/delete_record or process local files by pasting their /mnt/data/ path."
    return "(fallback) No local LLM loaded. Use explicit commands like create_record or paste /mnt/data/ path."
def local_llm_generate(prompt: str, max_tokens: int = 256) -> Dict[str, Any]:
    """Generate a completion for ``prompt`` with the local pipeline.

    Falls back to ``rule_based_response`` when no pipeline is loaded or the
    pipeline raises.

    Returns:
        ``{"text": str, "confidence": None, "raw": <pipeline output or error text>}``.
        ``text`` never starts with the prompt itself: text-generation
        pipelines return prompt + continuation by default, which would keep
        the caller from ever recognising a JSON tool call at the start of
        the reply.
    """
    if LLM_PIPELINE is None:
        return {"text": rule_based_response(prompt), "confidence": None, "raw": None}
    try:
        out = LLM_PIPELINE(prompt, max_new_tokens=max_tokens)
        if isinstance(out, list) and len(out) > 0:
            first = out[0]
            if isinstance(first, dict):
                text = first.get("generated_text") or first.get("text") or str(first)
            else:
                text = str(first)
        else:
            text = str(out)
        # Drop the echoed prompt (causal models); seq2seq output is unaffected.
        if prompt and text.startswith(prompt):
            text = text[len(prompt):]
        _log_llm_call(None)
        return {"text": text, "confidence": None, "raw": out}
    except Exception as e:
        logger.exception("LLM error: %s", e)
        return {"text": rule_based_response(prompt), "confidence": None, "raw": str(e)}
| # ---------------------------- | |
| # Zoho OAuth helper & MCP tools | |
| # ---------------------------- | |
# Last access token obtained from Zoho and the time.time() value after which
# it must be refreshed.
_ZOHO_TOKEN_CACHE: Dict[str, Any] = {"access_token": None, "expires_at": 0.0}
# Refresh this many seconds before the reported expiry.
_ZOHO_TOKEN_EXPIRY_MARGIN_S = 60


def _get_valid_token_headers(force_refresh: bool = False) -> dict:
    """Return the Authorization header for Zoho API calls.

    The access token is cached until shortly before it expires, so a token is
    not minted for every single API request.

    Args:
        force_refresh: Ignore the cache and request a new token.

    Raises:
        RuntimeError: If the token endpoint answers with a non-200 status, a
            non-JSON body, or a body without ``access_token``.
    """
    now = time.time()
    cached = _ZOHO_TOKEN_CACHE["access_token"]
    if cached and not force_refresh and now < _ZOHO_TOKEN_CACHE["expires_at"]:
        return {"Authorization": f"Zoho-oauthtoken {cached}"}

    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    r = requests.post(token_url, params=params, timeout=20)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")
    try:
        body = r.json()
    except ValueError as e:
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}") from e
    token = body.get("access_token") if isinstance(body, dict) else None
    if not token:
        # A 200 response can still carry an error payload; never build a
        # "Zoho-oauthtoken None" header from it.
        raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}")

    try:
        lifetime = float(body.get("expires_in", 3600))
    except (TypeError, ValueError):
        lifetime = 3600.0
    _ZOHO_TOKEN_CACHE["access_token"] = token
    _ZOHO_TOKEN_CACHE["expires_at"] = now + max(0.0, lifetime - _ZOHO_TOKEN_EXPIRY_MARGIN_S)
    return {"Authorization": f"Zoho-oauthtoken {token}"}
def authenticate_zoho() -> str:
    """Check that a Zoho token can be obtained and report the outcome as text.

    The attempt is recorded in the analytics log under "authenticate_zoho".
    """
    tool = "authenticate_zoho"
    try:
        _get_valid_token_headers()
        _log_tool_call(tool, True)
        message = "Zoho token refreshed (ok)."
    except Exception as auth_error:
        _log_tool_call(tool, False)
        message = f"Failed to authenticate: {auth_error}"
    return message
def create_record(module_name: str, record_data: dict) -> str:
    """POST one record to ``{API_BASE}/{module_name}``.

    Returns:
        The response body as a JSON string when the status is 200 or 201,
        otherwise an "Error creating record: ..." or "Exception: ..." message.
        Every outcome is recorded in the analytics log.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{API_BASE}/{module_name}",
            headers=auth_headers,
            json={"data": [record_data]},
            timeout=20,
        )
        if response.status_code not in (200, 201):
            _log_tool_call("create_record", False)
            return f"Error creating record: {response.status_code} {response.text}"
        _log_tool_call("create_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("create_record", False)
        return f"Exception: {err}"
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """GET one page of records from ``{API_BASE}/{module_name}``.

    Returns:
        The list under the response's ``data`` key. A 204 (no content)
        response, or a body whose ``data`` is missing or null, gives an empty
        list rather than an error. On failure the list holds a single error
        string, as before.
    """
    try:
        headers = _get_valid_token_headers()
        url = f"{API_BASE}/{module_name}"
        r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20)
        if r.status_code == 204:
            # No records on this page: a successful, empty result.
            _log_tool_call("get_records", True)
            return []
        if r.status_code == 200:
            _log_tool_call("get_records", True)
            return r.json().get("data") or []
        _log_tool_call("get_records", False)
        return [f"Error retrieving {module_name}: {r.status_code} {r.text}"]
    except Exception as e:
        _log_tool_call("get_records", False)
        return [f"Exception: {e}"]
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """PUT ``data`` to ``{API_BASE}/{module_name}/{record_id}``.

    Returns:
        The response body as a JSON string on status 200, otherwise an
        "Error updating: ..." or "Exception: ..." message. Every outcome is
        recorded in the analytics log.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.put(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            json={"data": [data]},
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call("update_record", False)
            return f"Error updating: {response.status_code} {response.text}"
        _log_tool_call("update_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("update_record", False)
        return f"Exception: {err}"
def delete_record(module_name: str, record_id: str) -> str:
    """DELETE ``{API_BASE}/{module_name}/{record_id}``.

    Returns:
        The response body as a JSON string on status 200, otherwise an
        "Error deleting: ..." or "Exception: ..." message. Every outcome is
        recorded in the analytics log.
    """
    try:
        auth_headers = _get_valid_token_headers()
        response = requests.delete(
            f"{API_BASE}/{module_name}/{record_id}",
            headers=auth_headers,
            timeout=20,
        )
        if response.status_code != 200:
            _log_tool_call("delete_record", False)
            return f"Error deleting: {response.status_code} {response.text}"
        _log_tool_call("delete_record", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("delete_record", False)
        return f"Exception: {err}"
| # ---------------------------- | |
| # Zoho Invoice v3 functions (create invoice + upload attachment) | |
| # ---------------------------- | |
def _ensure_invoice_config():
    """Raise RuntimeError unless both invoice settings from config.py are truthy."""
    if INVOICE_API_BASE and ORGANIZATION_ID:
        return
    raise RuntimeError("INVOICE_API_BASE and ORGANIZATION_ID must be set in config.py")
def create_invoice(data: dict) -> str:
    """Create an invoice by POSTing ``data`` to ``{INVOICE_API_BASE}/invoices``.

    ``data`` is sent as the JSON body unchanged (customer_id, line_items,
    etc.), and the organization id travels as a query parameter.

    Returns:
        The response body as a JSON string on status 200 or 201, otherwise an
        "Error creating invoice: ..." or "Exception: ..." message. Every
        outcome is recorded in the analytics log.
    """
    try:
        _ensure_invoice_config()
        auth_headers = _get_valid_token_headers()
        response = requests.post(
            f"{INVOICE_API_BASE}/invoices",
            headers=auth_headers,
            params={"organization_id": ORGANIZATION_ID},
            json=data,
            timeout=30,
        )
        if response.status_code not in (200, 201):
            _log_tool_call("create_invoice", False)
            return f"Error creating invoice: {response.status_code} {response.text}"
        _log_tool_call("create_invoice", True)
        return json.dumps(response.json(), ensure_ascii=False)
    except Exception as err:
        _log_tool_call("create_invoice", False)
        return f"Exception: {err}"
def upload_invoice_attachment(invoice_id: str, file_path: str) -> str:
    """Upload a local file as an attachment to the given invoice id.

    Returns:
        The response body as a JSON string on status 200 or 201, otherwise a
        message describing what went wrong. Every failure — including a
        missing configuration or a missing file — is recorded in the
        analytics log, like the other tools.
    """
    try:
        _ensure_invoice_config()
    except Exception as e:
        _log_tool_call("upload_invoice_attachment", False)
        return f"Missing config: {e}"
    # isfile, not exists: a directory cannot be uploaded either.
    if not isinstance(file_path, str) or not os.path.isfile(file_path):
        _log_tool_call("upload_invoice_attachment", False)
        return f"Attachment file not found: {file_path}"
    try:
        headers = _get_valid_token_headers()
        headers.pop("Content-Type", None)  # requests will set for multipart
        url = f"{INVOICE_API_BASE}/invoices/{invoice_id}/attachments"
        params = {"organization_id": ORGANIZATION_ID}
        with open(file_path, "rb") as f:
            files = {"attachment": (os.path.basename(file_path), f)}
            r = requests.post(url, headers=headers, params=params, files=files, timeout=60)
        if r.status_code in (200, 201):
            _log_tool_call("upload_invoice_attachment", True)
            return json.dumps(r.json(), ensure_ascii=False)
        _log_tool_call("upload_invoice_attachment", False)
        return f"Error uploading attachment: {r.status_code} {r.text}"
    except Exception as e:
        _log_tool_call("upload_invoice_attachment", False)
        return f"Exception uploading attachment: {e}"
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Placeholder for OCR + parsing: returns fixed, simulated extraction data.

    No file content is read. When ``file_path`` exists, the result carries
    the file's base name, a ``file://`` URL built from the path, the target
    module and a hard-coded ``extracted_data`` dict. Otherwise an error dict
    is returned. Each call is recorded in the analytics log.
    """
    try:
        if not os.path.exists(file_path):
            _log_tool_call("process_document", False)
            return {"status": "error", "error": "file not found", "file_path": file_path}

        simulated_fields = {
            "Name": "ACME Corp (simulated)",
            "Email": "contact@acme.example",
            "Phone": "+91-99999-00000",
            "Total": "1234.00",
            "Confidence": 0.88,
        }
        url_for_file = f"file://{file_path}"
        _log_tool_call("process_document", True)
        return {
            "status": "success",
            "file": os.path.basename(file_path),
            "file_url": url_for_file,
            "target_module": target_module,
            "extracted_data": simulated_fields,
        }
    except Exception as err:
        _log_tool_call("process_document", False)
        return {"status": "error", "error": str(err)}
| # ---------------------------- | |
| # Helpers: map LLM args -> Zoho payloads and extract ids | |
| # ---------------------------- | |
def _extract_created_id_from_zoho_response(resp_json) -> Optional[str]:
    """Return the record id from a create response, or None if there is none.

    Accepts the response as a dict or as a JSON string. The id is looked up
    under ``id`` / ``ID`` / ``Id`` in the first entry of ``data`` (or
    ``result``) — first in its ``details`` object, then in the entry itself —
    and finally at the top level of the response.

    Entries whose ``status`` is "error" are ignored, and no value is ever
    guessed: returning "any field of 4+ characters" would turn things like a
    field name from an error payload into a bogus record id.
    """
    if not resp_json:
        return None
    if isinstance(resp_json, str):
        try:
            resp_json = json.loads(resp_json)
        except ValueError:
            return None
    if not isinstance(resp_json, dict):
        return None

    id_keys = ("id", "ID", "Id")
    records = resp_json.get("data") or resp_json.get("result")
    if isinstance(records, list) and records and isinstance(records[0], dict):
        first = records[0]
        if str(first.get("status", "")).lower() != "error":
            for candidate in (first.get("details"), first):
                if not isinstance(candidate, dict):
                    continue
                for key in id_keys:
                    if candidate.get(key) not in (None, ""):
                        return str(candidate[key])

    if resp_json.get("id") not in (None, ""):
        return str(resp_json["id"])
    return None
def _map_contact_args_to_zoho_payload(args: dict) -> dict:
    """Translate loosely named tool arguments into a contact record payload.

    Known aliases are matched case-insensitively and renamed:
    contact/last_name -> Last_Name, first_name -> First_Name,
    email -> Email, mobile/phone -> Phone, currency -> Currency.
    (A case-sensitive match would silently drop keys such as "Email".)

    Any other key with a scalar value (str, int, float) is passed through
    unchanged unless it is a control or invoice argument (module name, file
    path/url/base64, invoice items); those never belong in the record.
    Mapped fields take precedence over pass-through keys of the same name.
    """
    alias_to_field = {
        "contact": "Last_Name",
        "last_name": "Last_Name",
        "first_name": "First_Name",
        "email": "Email",
        "mobile": "Phone",
        "phone": "Phone",
        "currency": "Currency",
    }
    non_record_keys = {
        "invoice", "items", "line_items",
        "module", "module_name",
        "file_path", "path", "file", "file_url", "file_b64",
    }

    contact_payload = {}
    # Exact lower-case aliases win over differently-cased duplicates.
    for alias, field in alias_to_field.items():
        if alias in args:
            contact_payload.setdefault(field, args[alias])

    extras = {}
    for key, value in args.items():
        lowered = str(key).lower()
        if lowered in alias_to_field:
            contact_payload.setdefault(alias_to_field[lowered], value)
        elif lowered not in non_record_keys and isinstance(value, (str, int, float)):
            extras[key] = value

    for key, value in extras.items():
        contact_payload.setdefault(key, value)
    return contact_payload
def _build_invoice_payload_for_zoho(contact_id: str, invoice_items: List[dict], currency: Optional[str] = None, vat_pct: float = 0.0) -> dict:
    """Build an invoice payload from loosely structured line items.

    For each dict in ``invoice_items``:
      * rate: first of amount / price / rate / item that parses as a number
        ("$" and "," are stripped); 0.0 if none does. This is the same
        precedence the tool executor uses.
      * name: description, name or service, else "Item".
      * quantity: quantity or qty coerced to int, else 1.
    Entries that are not dicts are skipped; ``None`` counts as no items.

    ``vat_pct`` may be a number or a string such as "16%". A positive value
    adds ``{"name": "VAT", "percentage": ...}`` under "tax".
    NOTE(review): confirm the invoice API accepts an invoice-level "tax"
    object in this form.
    """
    line_items = []
    for item in invoice_items or []:
        if not isinstance(item, dict):
            continue

        amount = 0.0
        for key in ("amount", "price", "rate", "item"):
            if key not in item:
                continue
            try:
                amount = float(str(item[key]).replace("$", "").replace(",", "").strip())
                break
            except ValueError:
                continue

        name = item.get("description") or item.get("name") or item.get("service") or "Item"

        raw_qty = item.get("quantity") or item.get("qty") or 1
        try:
            qty = int(raw_qty)
        except (TypeError, ValueError):
            try:
                qty = int(float(raw_qty))
            except (TypeError, ValueError, OverflowError):
                qty = 1

        line_items.append({"name": name, "rate": amount, "quantity": qty})

    try:
        vat_val = float(vat_pct)
    except (TypeError, ValueError):
        try:
            vat_val = float(str(vat_pct).replace("%", "").strip())
        except ValueError:
            vat_val = 0.0

    invoice_payload = {"customer_id": contact_id, "line_items": line_items}
    if currency:
        invoice_payload["currency_code"] = currency
    if vat_val > 0:
        invoice_payload["tax"] = {"name": "VAT", "percentage": vat_val}
    return invoice_payload
| # ---------------------------- | |
| # Parse & execute model tool JSON outputs (core executor) | |
| # ---------------------------- | |
# Functions the model may call through the generic dispatcher. The tool name
# comes from model output, so it must be checked against an explicit list:
# resolving it through globals() alone would let the model invoke any
# module-level callable (token helper, model loader, this executor itself, ...).
_MODEL_GENERIC_TOOLS = frozenset({
    "authenticate_zoho",
    "get_records",
    "update_record",
    "delete_record",
    "process_document",
    "upload_invoice_attachment",
})
_MODEL_INVOICE_TOOLS = ("create_invoice", "createInvoice", "createInvoiceRecord")
# Keys _normalize_local_path_args may add; most tools do not accept them.
_NORMALIZER_KEYS = ("file_url", "file_b64")
# Arguments that steer the executor and must not become record fields.
_NON_RECORD_ARG_KEYS = ("module", "module_name", "file_path", "path", "file", "file_url", "file_b64")


def _invoice_id_from_zoho_text(resp_text: Any) -> Optional[str]:
    """Pull the invoice id out of a create_invoice result string, if present."""
    if not (isinstance(resp_text, str) and resp_text.strip().startswith("{")):
        return None
    try:
        parsed = json.loads(resp_text)
    except ValueError:
        return None
    if not isinstance(parsed, dict):
        return None
    invoice = parsed.get("invoice")
    if isinstance(invoice, dict) and invoice.get("invoice_id"):
        return str(invoice["invoice_id"])
    for key in ("invoice_id", "id"):
        if key in parsed:
            return str(parsed[key])
    data = parsed.get("data")
    if isinstance(data, list) and data and isinstance(data[0], dict):
        for key in ("invoice_id", "id"):
            if key in data[0]:
                return str(data[0][key])
    return None


def _exec_create_record(args: Any, results: List[str]) -> Optional[str]:
    """Run a create_record instruction; return the new record id if one was found."""
    try:
        module = args.get("module_name") or args.get("module") or "Contacts"
        field_args = {k: v for k, v in args.items() if k not in _NON_RECORD_ARG_KEYS}
        res = create_record(module, _map_contact_args_to_zoho_payload(field_args))
        results.append(f"create_record -> {res}")
        return _extract_created_id_from_zoho_response(res)
    except Exception as e:
        results.append(f"create_record failed: {e}")
        return None


def _exec_create_invoice(args: Any, contact_id: Optional[str], results: List[str]) -> Optional[str]:
    """Run a create_invoice instruction (plus optional attachment upload).

    Returns the customer id that was used, so later instructions and the
    final summary can refer to it.
    """
    if not isinstance(args, dict):
        results.append("create_invoice failed: args must be a JSON object")
        return contact_id

    invoice_items = args.get("invoice") or args.get("items") or args.get("line_items") or []
    currency = args.get("currency") or args.get("currency_code") or args.get("currency_symbol") or None
    vat = args.get("vat") or args.get("tax") or args.get("vat_pct") or 0

    # An id produced earlier in this batch wins over one supplied in the args.
    customer_id = contact_id or args.get("contact_id") or args.get("customer_id") or None
    if not customer_id:
        results.append("create_invoice skipped: no contact/customer id available. Create contact first.")
        return contact_id

    try:
        invoice_payload = _build_invoice_payload_for_zoho(customer_id, invoice_items, currency=currency, vat_pct=vat)
        for optional_key in ("notes", "reference_number", "terms"):
            if args.get(optional_key):
                invoice_payload[optional_key] = args.get(optional_key)
        inv_res_text = create_invoice(invoice_payload)
        results.append(f"create_invoice -> {inv_res_text}")
    except Exception as e:
        results.append(f"create_invoice failed: {e}")
        return customer_id

    attachment_path = args.get("file_path")
    if not attachment_path:
        file_url = args.get("file_url")
        if isinstance(file_url, str) and file_url.startswith("file://"):
            attachment_path = file_url[len("file://"):]

    inv_id = _invoice_id_from_zoho_text(inv_res_text)
    if inv_id and attachment_path:
        try:
            attach_res = upload_invoice_attachment(inv_id, attachment_path)
            results.append(f"upload_attachment -> {attach_res}")
        except Exception as e:
            results.append(f"upload_attachment failed: {e}")
    return customer_id


def _exec_generic_tool(tool: str, args: Any, results: List[str]) -> None:
    """Call an allow-listed module-level tool with the model-supplied arguments."""
    fn = globals().get(tool)
    if not callable(fn):
        results.append(f"Tool '{tool}' not found.")
        return
    try:
        if isinstance(args, dict):
            kwargs = args
            try:
                params = inspect.signature(fn).parameters
            except (TypeError, ValueError):
                params = None
            if params is not None and not any(p.kind is p.VAR_KEYWORD for p in params.values()):
                # Drop only the keys the normalizer injected; the tool would
                # reject them as unexpected keyword arguments.
                kwargs = {k: v for k, v in args.items() if k in params or k not in _NORMALIZER_KEYS}
            out = fn(**kwargs)
        elif isinstance(args, (list, tuple)):
            out = fn(*args)
        else:
            out = fn(args)
        results.append(f"{tool} -> {out}")
    except Exception as e:
        results.append(f"{tool} failed: {e}")


def parse_and_execute_model_tool_output(model_text: str, history: Optional[List[Tuple[str, str]]] = None) -> str:
    """Parse JSON tool instruction(s) produced by the model and execute them.

    ``model_text`` must be a JSON object ``{"tool": ..., "args": {...}}`` or a
    list of such objects. Instructions run in order; a record id produced by
    ``create_record`` is reused as the customer id of later ``create_invoice``
    instructions. Only ``create_record``, the invoice aliases and the tools in
    ``_MODEL_GENERIC_TOOLS`` can be executed; any other name is reported as
    not found.

    Args:
        model_text: Raw model output.
        history: Accepted for interface compatibility; not used.

    Returns:
        A human-readable summary of the results, or a "(Parse) ..." /
        "(Execute) ..." message when nothing could be run.
    """
    try:
        payload = json.loads(model_text)
    except (TypeError, ValueError):
        return "(Parse) Model output was not valid JSON tool instruction."

    if isinstance(payload, dict) and "tool" in payload:
        instructions = [payload]
    elif isinstance(payload, list):
        instructions = [p for p in payload if isinstance(p, dict) and "tool" in p]
    else:
        return "(Parse) No tool instructions found."

    results: List[str] = []
    contact_id = None
    for instr in instructions:
        tool = instr.get("tool")
        args = instr.get("args", {}) or {}

        is_generic = isinstance(tool, str) and tool in _MODEL_GENERIC_TOOLS
        if tool != "create_record" and tool not in _MODEL_INVOICE_TOOLS and not is_generic:
            results.append(f"Tool '{tool}' not found.")
            continue

        args = _normalize_local_path_args(args)
        if tool == "create_record":
            created_id = _exec_create_record(args, results)
            if created_id:
                contact_id = created_id
        elif tool in _MODEL_INVOICE_TOOLS:
            contact_id = _exec_create_invoice(args, contact_id, results)
        else:
            _exec_generic_tool(tool, args, results)

    if not results:
        return "(Execute) No executable results produced."
    reply = "Execution results:\n" + "\n".join(results)
    if contact_id:
        reply += f"\nPrimary contact id: {contact_id}"
    return reply
| # ---------------------------- | |
| # Simple explicit command parser for debugging | |
| # ---------------------------- | |
def try_parse_and_invoke_command(text: str):
    """Run an explicit debug command, or return None if ``text`` is not one.

    Recognised forms (command names are case-insensitive):
      * ``create_record MODULE {json}``  -> create_record(MODULE, {...})
      * ``create_invoice {json}``        -> create_invoice({...})
      * ``/mnt/data/<no-whitespace>``    -> process_document(path)

    The JSON body may span several lines and must decode to an object;
    otherwise the matching "Invalid JSON ..." message is returned and no API
    call is made.
    """
    text = (text or "").strip()

    # re.S lets ".+" run across newlines so pretty-printed JSON is accepted.
    m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I | re.S)
    if m:
        module, body = m.group(1), m.group(2)
        try:
            record_data = json.loads(body)
        except ValueError:
            return "Invalid JSON for record_data"
        if not isinstance(record_data, dict):
            return "Invalid JSON for record_data"
        return create_record(module, record_data)

    m = re.match(r"^create_invoice\s+(.+)$", text, re.I | re.S)
    if m:
        try:
            invoice_data = json.loads(m.group(1))
        except ValueError:
            return "Invalid JSON for invoice_data"
        if not isinstance(invoice_data, dict):
            return "Invalid JSON for invoice_data"
        return create_invoice(invoice_data)

    m = re.match(r"^(\/mnt\/data\/\S+)$", text)
    if m:
        return process_document(m.group(1))
    return None
| # ---------------------------- | |
| # Chat flow: call LLM, parse JSON responses and execute | |
| # ---------------------------- | |
def _chat_history_to_prompt_text(history) -> str:
    """Render chat history as "User: ... / Assistant: ..." lines.

    Supports both (user, assistant) pairs and role/content dicts. Entries in
    any other shape are skipped.
    """
    lines = []
    for entry in history:
        if isinstance(entry, dict):
            role, content = entry.get("role"), entry.get("content")
            if role == "user":
                lines.append(f"User: {content}\n")
            elif role == "assistant":
                lines.append(f"Assistant: {content}\n")
            continue
        try:
            user_msg, assistant_msg = entry[0], entry[1]
        except (IndexError, KeyError, TypeError):
            continue
        lines.append(f"User: {user_msg}\nAssistant: {assistant_msg}\n")
    return "".join(lines)


def _unwrap_json_code_fence(text: str) -> str:
    """Return the inside of a ```json ... ``` fence, or ``text`` if it has none."""
    fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, re.S | re.I)
    return fenced.group(1) if fenced else text


def deepseek_response(message: str, history: Optional[List[Tuple[str, str]]] = None) -> str:
    """Ask the local model for a reply and execute it if it is a tool call.

    The prompt is the system prompt, the rendered history and the new user
    message. If the reply (optionally wrapped in a Markdown code fence)
    starts with ``{`` or ``[`` it is handed to
    ``parse_and_execute_model_tool_output`` and that result is returned;
    otherwise the model text is returned unchanged.
    """
    history = history or []
    system_prompt = "You are Zoho Assistant. Prefer concise answers. If you want to call a tool, return a JSON object: {\"tool\": \"create_record\", \"args\": {...}}"
    history_text = _chat_history_to_prompt_text(history)
    prompt = f"{system_prompt}\n{history_text}\nUser: {message}\nAssistant:"
    gen = local_llm_generate(prompt, max_tokens=256)
    text = gen.get("text", "")
    payload = _unwrap_json_code_fence(text.strip())
    # If model produced JSON instruction, execute
    if payload.startswith("{") or payload.startswith("["):
        try:
            return parse_and_execute_model_tool_output(payload, history)
        except Exception as e:
            logger.exception("Execution error: %s", e)
            return f"(Execute) Error: {e}"
    return text
| # ---------------------------- | |
| # Gradio chat handler | |
| # ---------------------------- | |
def chat_handler(message, history):
    """Gradio chat callback: route a message and always return a string.

    Order of handling:
      1. A message starting with ``/mnt/data/`` is passed to
         ``process_document`` and summarised. This runs before the command
         parser so a bare path yields the readable summary rather than the
         raw result dict, and a missing file is reported as an error.
      2. Explicit debug commands (``create_record ...``, ``create_invoice ...``).
      3. Everything else goes to the model via ``deepseek_response``, with
         ``rule_based_response`` as the fallback on errors.
    """
    history = history or []
    trimmed = (message or "").strip()

    # dev path handling
    if trimmed.startswith("/mnt/data/"):
        try:
            doc = process_document(trimmed)
            if doc.get("status") != "success":
                return f"Error processing document: {doc.get('error')}"
            return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'), ensure_ascii=False)}"
        except Exception as e:
            return f"Error processing document: {e}"

    # explicit debug commands
    cmd = try_parse_and_invoke_command(trimmed)
    if cmd is not None:
        # The chat UI needs text; serialise anything that is not already a string.
        return cmd if isinstance(cmd, str) else json.dumps(cmd, ensure_ascii=False, default=str)

    try:
        return deepseek_response(trimmed, history)
    except Exception as e:
        logger.exception("deepseek_response error: %s", e)
        return rule_based_response(trimmed)
| # ---------------------------- | |
| # Gradio UI builder | |
| # ---------------------------- | |
def chat_interface():
    """Build the Gradio ChatInterface that routes messages to chat_handler."""
    prompt_box = gr.Textbox(
        placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev)."
    )
    return gr.ChatInterface(fn=chat_handler, textbox=prompt_box)
| # ---------------------------- | |
| # Demo runner: directly perform create_contact -> create_invoice sequence | |
| # ---------------------------- | |
def _demo_invoice_id_from_response(inv_res: Any) -> Optional[str]:
    """Find the invoice id in a create_invoice result string, or return None.

    Checks, in order: ``invoice.invoice_id``, top-level ``invoice_id`` / ``id``,
    then the first entry of ``data``.
    """
    if not (isinstance(inv_res, str) and inv_res.strip().startswith("{")):
        return None
    try:
        parsed = json.loads(inv_res)
    except ValueError:
        return None
    if not isinstance(parsed, dict):
        return None
    invoice = parsed.get("invoice")
    if isinstance(invoice, dict) and invoice.get("invoice_id"):
        return str(invoice["invoice_id"])
    for key in ("invoice_id", "id"):
        if key in parsed:
            return str(parsed[key])
    data = parsed.get("data")
    if isinstance(data, list) and data and isinstance(data[0], dict):
        for key in ("invoice_id", "id"):
            if key in data[0]:
                return str(data[0][key])
    return None


def perform_demo_actions(contact_payload: dict, invoice_items: List[dict], vat_pct: float = 0.0):
    """Create a contact and then an invoice for it, bypassing the LLM.

    ``contact_payload`` is sent to ``create_record("Contacts", ...)`` as
    given, so it must already use the field names the CRM API expects. Its
    "Currency" entry, if any, is used as the invoice currency.

    Returns:
        A dict with the keys ``create_record``, ``contact_id``,
        ``create_invoice`` and ``invoice_id``. Steps that did not run or
        whose id could not be determined are left as None. Errors are logged
        and the partial results are returned.
    """
    results = {"create_record": None, "contact_id": None, "create_invoice": None, "invoice_id": None}
    try:
        # create contact
        res_text = create_record("Contacts", contact_payload)
        results["create_record"] = res_text
        cid = _extract_created_id_from_zoho_response(res_text)
        results["contact_id"] = cid
        if not cid:
            logger.warning("No contact id returned from create_record. Aborting invoice creation.")
            return results

        invoice_payload = _build_invoice_payload_for_zoho(
            cid, invoice_items, currency=contact_payload.get("Currency"), vat_pct=vat_pct
        )
        inv_res = create_invoice(invoice_payload)
        results["create_invoice"] = inv_res
        results["invoice_id"] = _demo_invoice_id_from_response(inv_res)
    except Exception as e:
        logger.exception("perform_demo_actions error: %s", e)
    return results
| # ---------------------------- | |
| # Entrypoint (includes a one-shot demo process_document call using your uploaded path) | |
| # ---------------------------- | |
if __name__ == "__main__":
    logger.info("Starting MCP server. Loaded model: %s", LOADED_MODEL_NAME)

    # Developer demo: process the uploaded file path you provided earlier
    demo_file_path = "/mnt/data/script_zoho_mcp"
    try:
        demo_result = process_document(demo_file_path)
        logger.info("Demo process_document result for %s: %s", demo_file_path, json.dumps(demo_result))
        print("Demo process_document result:", json.dumps(demo_result))
    except Exception as e:
        logger.warning("Demo process_document failed: %s", e)

    # Direct contact+invoice demo. This calls the real Zoho APIs (create
    # contact -> create invoice) with the credentials from config.py on every
    # start. Set MCP_SKIP_STARTUP_DEMO=1 to start the UI without creating records.
    if os.environ.get("MCP_SKIP_STARTUP_DEMO", "").strip().lower() in ("1", "true", "yes"):
        logger.info("MCP_SKIP_STARTUP_DEMO set — skipping contact+invoice demo.")
    else:
        # perform_demo_actions sends this payload to the CRM unchanged, so it
        # uses CRM field names (Last_Name, Phone) and an ISO currency code.
        demo_contact = {
            "Last_Name": "VachMe",
            "Email": "vachaspathi1234@gmail.com",
            "Phone": "9876543210",
            "Currency": "USD",
        }
        demo_items = [
            {"name": "Car Wash - Wax Coating", "amount": 150.0, "quantity": 1},
            {"name": "Car Wash - Washing", "amount": 50.0, "quantity": 1},
        ]
        try:
            demo_flow_res = perform_demo_actions(demo_contact, demo_items, vat_pct=16.0)
            logger.info("Demo create contact+invoice result: %s", json.dumps(demo_flow_res))
            print("Demo create contact+invoice result:", json.dumps(demo_flow_res))
        except Exception as e:
            logger.warning("Demo contact+invoice failed: %s", e)

    demo = chat_interface()
    demo.launch(server_name="0.0.0.0", server_port=7860)