Spaces:
Sleeping
Sleeping
| import requests | |
| import json | |
| import config | |
| from utils import normalize_date | |
| import time | |
# =====================================================
# 1. AUTHENTICATION
# =====================================================
def get_headers():
    """Exchange the configured refresh token for an API authorization header.

    Posts the refresh-token grant to ``config.AUTH_URL`` with a 10 second
    timeout.

    Returns:
        ``{"Authorization": "Zoho-oauthtoken <token>"}`` on success, or
        ``None`` when the request raises, the response is not HTTP 200, or
        the response body does not contain an ``access_token``.
    """
    try:
        params = {
            "refresh_token": config.REFRESH_TOKEN,
            "client_id": config.CLIENT_ID,
            "client_secret": config.CLIENT_SECRET,
            "grant_type": "refresh_token",
            "redirect_uri": "http://www.google.com",
        }
        # 10s timeout so a stalled auth server cannot hang the caller.
        resp = requests.post(config.AUTH_URL, data=params, timeout=10)
        if resp.status_code != 200:
            print(f"Auth Error: HTTP {resp.status_code}")
            return None

        body = resp.json()
        token = body.get("access_token")
        if not token:
            # Without this check a 200 response lacking the token would yield
            # the truthy header "Zoho-oauthtoken None" and every later API
            # call would fail with a confusing authorization error.
            print(f"Auth Error: {body.get('error', 'no access_token in response')}")
            return None

        return {"Authorization": f"Zoho-oauthtoken {token}"}
    except Exception as e:
        # Best-effort boundary: any failure (network, bad JSON, missing
        # config attribute) is reported and mapped to None.
        print(f"Auth Error: {e}")
    return None
# =====================================================
# 2. DIRECT CUSTOMER RESOLVER
# =====================================================
def _find_contact(name, headers, exact=False):
    """Query the contacts endpoint for `name` and return the best match, or None.

    With ``exact=False`` the ``contact_name_contains`` filter is used; with
    ``exact=True`` the ``contact_name`` filter is used. Among the returned
    contacts, one whose name equals `name` (ignoring case and surrounding
    whitespace) is preferred; otherwise the first result is returned.
    Network and JSON errors propagate to the caller.
    """
    filter_key = 'contact_name' if exact else 'contact_name_contains'
    res = requests.get(
        f"{config.API_BASE}/contacts",
        headers=headers,
        params={'organization_id': config.ORGANIZATION_ID, filter_key: name},
        timeout=10,
    )
    if res.status_code != 200:
        print(f" β οΈ Search API Status: {res.status_code} (Not 200)")
        return None

    contacts = res.json().get('contacts') or []
    if not contacts:
        return None

    wanted = str(name).strip().casefold()
    for contact in contacts:
        if str(contact.get('contact_name', '')).strip().casefold() == wanted:
            return contact
    return contacts[0]


def get_customer_id(name, headers):
    """Resolve a customer name to a contact ID, creating the contact if needed.

    Flow: search -> found? return its ID. Otherwise create -> return new ID.
    If creation is rejected with error code 3062 (handled here as "duplicate
    contact"), an exact-name search is performed to recover the existing ID.

    Args:
        name: Customer name. Empty, whitespace-only, "unknown" or "none"
            (case-insensitive) is replaced by "MCP Generic Customer".
        headers: Authorization headers to send with every request.

    Returns:
        The contact ID, or ``None`` if it could not be found or created.
    """
    name = str(name).strip() if name else ""
    if not name or name.lower() in ("unknown", "none"):
        name = "MCP Generic Customer"

    print(f" π Searching Zoho for: '{name}'...")

    # --- STEP 1: SEARCH ---
    try:
        contact = _find_contact(name, headers)
        if contact:
            found_id = contact['contact_id']
            print(f" β Found Existing Customer: {contact['contact_name']} ({found_id})")
            return found_id
    except Exception as e:
        print(f" β Search Exception: {e}")

    # --- STEP 2: CREATE (only reached when the search returned nothing) ---
    print(f" π Not found. Creating new customer: '{name}'...")
    create_payload = {
        "contact_name": name,
        "contact_type": "customer",
    }
    duplicate = False
    try:
        res = requests.post(
            f"{config.API_BASE}/contacts",
            headers=headers,
            params={'organization_id': config.ORGANIZATION_ID},
            json=create_payload,
            timeout=10,
        )
        if res.status_code == 201:
            new_id = res.json().get('contact', {}).get('contact_id')
            print(f" β Created New: {new_id}")
            return new_id

        if res.json().get('code') == 3062:
            duplicate = True
        else:
            print(f" β Creation Failed: {res.text}")
    except Exception as e:
        print(f" β Create Exception: {e}")

    # --- STEP 3: DUPLICATE RECOVERY ---
    # The contact exists but the first search missed it (race with another
    # writer, or a failed search request). Actually perform the exact-name
    # lookup that the log message announces instead of giving up.
    if duplicate:
        print(" β οΈ Zoho says Duplicate. Forcing Search on exact name...")
        try:
            contact = _find_contact(name, headers, exact=True)
        except Exception as e:
            print(f" β Search Exception: {e}")
        else:
            if contact:
                return contact.get('contact_id')

    return None
# =====================================================
# 3. INVOICE PUSHER
# =====================================================
def _to_float(value, default):
    """Coerce `value` to float; fall back to `default` when it is None or unparseable."""
    try:
        return float(value)
    except (TypeError, ValueError):
        if value is not None:
            # Surface the substitution so a wrong amount is not silent.
            print(f"   Could not parse number {value!r}; using {default}")
        return float(default)


def create_invoice(cid, ai_data, headers):
    """Build an invoice payload from extracted data and post it to the invoices endpoint.

    Args:
        cid: Customer (contact) ID the invoice is issued to.
        ai_data: Mapping with optional keys ``line_items`` (list of mappings
            with ``name``/``rate``/``quantity``), ``total``, ``date`` and
            ``reference_number``.
        headers: Authorization headers to send with the request.

    Returns:
        A human-readable status string: a success message with the invoice
        number and link, the API error body, or a connection error message.
    """
    print(" π¦ Preparing Invoice Payload...")

    # 1. Process line items. Values may be missing or null, so every numeric
    #    field goes through _to_float instead of a bare float() that would
    #    raise outside the try block below. Non-mapping entries are skipped.
    raw_items = ai_data.get('line_items') or []
    clean_items = [
        {
            "name": str(item.get("name") or "Service")[:100],  # Limit 100 chars
            "rate": _to_float(item.get("rate"), 0),
            "quantity": _to_float(item.get("quantity"), 1),
        }
        for item in raw_items
        if isinstance(item, dict)
    ]

    # Fallback when no usable line items were supplied.
    if not clean_items:
        print(" β οΈ No items found. Using Default 'Services Rendered' Item.")
        clean_items = [{
            "name": "Services Rendered",
            "description": "Auto-generated from document total",
            "rate": _to_float(ai_data.get("total"), 0),
            "quantity": 1,
        }]

    # 2. Build payload. A timestamp-based number is used when the document
    #    carries no reference number.
    payload = {
        "customer_id": cid,
        "date": normalize_date(ai_data.get("date")),
        "invoice_number": ai_data.get("reference_number") or f"INV-{int(time.time())}",
        "line_items": clean_items,
        "status": "draft",
    }

    # 3. Send
    url = f"{config.API_BASE}/invoices?organization_id={config.ORGANIZATION_ID}"
    try:
        print(" π Sending Invoice to Zoho...")
        res = requests.post(url, headers=headers, json=payload, timeout=15)

        # Error code 4097 is handled here as "auto-numbering is active":
        # drop our invoice number and retry once.
        if res.status_code != 201 and res.json().get('code') == 4097:
            print(" β οΈ Auto-Numbering active. Removing ID and Retrying...")
            payload.pop('invoice_number', None)
            res = requests.post(url, headers=headers, json=payload, timeout=15)

        if res.status_code == 201:
            inv = res.json().get('invoice', {})
            return f"β SUCCESS! Invoice Created.\nNumber: {inv.get('invoice_number')}\nLink: https://books.zoho.in/app/{config.ORGANIZATION_ID}#/invoices/{inv.get('invoice_id')}"
        return f"β Zoho Error: {res.text}"
    except Exception as e:
        return f"β Connection Error: {e}"
# =====================================================
# 4. MAIN EXECUTOR
# =====================================================
def route_and_execute(ai_output):
    """Run the full pipeline for one extracted document, yielding progress text.

    Steps: authenticate, resolve (or create) the customer, then create the
    invoice. Each step yields a status string; the generator stops early
    when authentication fails or no customer can be resolved.

    Args:
        ai_output: Mapping whose ``data`` entry holds the extracted fields
            (``vendor_name`` plus the keys consumed by ``create_invoice``).

    Yields:
        Progress and result messages as strings.
    """
    # 1. Get Auth
    headers = get_headers()
    if not headers:
        yield "β Auth Failed. Check Credentials."
        return

    # 2. Get Data. `or {}` also covers a present-but-null "data" entry (and a
    #    null ai_output), which would otherwise raise AttributeError below.
    data = (ai_output or {}).get('data') or {}
    vendor_name = data.get('vendor_name')
    yield f"π Processing Invoice for: {vendor_name}\n"

    # 3. Get Customer ID
    cid = get_customer_id(vendor_name, headers)
    if not cid:
        # HARD FALLBACK: if the specific name cannot be resolved, try generic.
        yield " β οΈ Specific customer failed. Trying 'Generic' fallback...\n"
        cid = get_customer_id("MCP Generic Customer", headers)
    if not cid:
        yield "π Critical: Could not find/create any customer."
        return
    yield f" -> Customer ID: {cid}\n"

    # 4. Create Invoice
    yield create_invoice(cid, data, headers)