File size: 3,080 Bytes
7254b09
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# app.py – MCP wrapper for triggering a Databricks spam check job

import os
import time
import json
import requests
import gradio as gr

# ── Environment setup ─────────────────────────────────────────
# Databricks workspace base URL (presumably "https://<workspace>...." — verify deployment).
DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
# Personal access token used as the Bearer credential for every REST call.
DATABRICKS_TOKEN = os.getenv("DATABRICKS_TOKEN")
# Workspace path of the notebook that performs the spam/fraud check.
DATABRICKS_NOTEBOOK_PATH = os.getenv("DATABRICKS_NOTEBOOK_PATH")
# ID of an already-running cluster that submitted runs attach to.
EXISTING_CLUSTER_ID = os.getenv("EXISTING_CLUSTER_ID")

# Shared headers for all Databricks REST requests.
# NOTE(review): if DATABRICKS_TOKEN is unset this silently becomes
# "Bearer None" — calls will fail with 401/403 instead of a clear
# configuration error; consider validating the env vars at startup.
HEADERS = {
    "Authorization": f"Bearer {DATABRICKS_TOKEN}",
    "Content-Type": "application/json"
}

# ── MCP Tool ──────────────────────────────────────────────────
def run_spam_check(phone_number: str):
    """Trigger a Databricks notebook run that checks *phone_number* for
    spam/fraud, wait for it to finish, and return the notebook's result.

    Args:
        phone_number: Phone number forwarded to the notebook as the
            ``phone_number`` base parameter.

    Returns:
        dict: The parsed notebook output on success; otherwise a dict with
        an ``"error"``, ``"warning"``, or ``"raw_output"`` key describing
        what happened.
    """
    # 1. Submit a one-time run (Jobs API 2.1 runs/submit) on the existing cluster.
    submit_payload = {
        "run_name": f"FraudCheck_{phone_number}",
        "notebook_task": {
            "notebook_path": DATABRICKS_NOTEBOOK_PATH,
            "base_parameters": {
                "phone_number": phone_number
            }
        },
        "existing_cluster_id": EXISTING_CLUSTER_ID
    }

    try:
        response = requests.post(
            f"{DATABRICKS_HOST}/api/2.1/jobs/runs/submit",
            headers=HEADERS,
            json=submit_payload,
            timeout=30,  # don't let an unreachable host hang the MCP tool
        )
    except requests.RequestException as exc:
        return {"error": "Failed to start job", "details": str(exc)}

    if response.status_code != 200:
        return {"error": "Failed to start job", "details": response.text}

    # Guard against a malformed 200 response (was an unhandled KeyError).
    run_id = response.json().get("run_id")
    if run_id is None:
        return {"error": "Failed to start job",
                "details": f"No run_id in submit response: {response.text}"}

    # 2. Poll for completion, with a deadline so a stuck run cannot block
    #    the caller forever (the original loop had no upper bound).
    deadline = time.time() + 600  # 10 minutes
    while True:
        try:
            status_response = requests.get(
                f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get",
                headers=HEADERS,
                params={"run_id": run_id},
                timeout=30,
            )
        except requests.RequestException as exc:
            return {"error": "Failed to poll job status", "details": str(exc)}

        if status_response.status_code != 200:
            return {"error": "Failed to poll job status",
                    "details": status_response.text}

        run_state = status_response.json()["state"]["life_cycle_state"]
        # Terminal lifecycle states per the Jobs API.
        if run_state in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            break
        if time.time() >= deadline:
            return {"error": "Timed out waiting for job to finish",
                    "details": f"run_id={run_id}, last state={run_state}"}
        time.sleep(2)

    # 3. Fetch the notebook's exit output for the finished run.
    try:
        output_response = requests.get(
            f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get-output",
            headers=HEADERS,
            params={"run_id": run_id},
            timeout=30,
        )
    except requests.RequestException as exc:
        return {"error": "Failed to fetch output", "details": str(exc)}

    if output_response.status_code != 200:
        return {"error": "Failed to fetch output", "details": output_response.text}

    output_json = output_response.json()
    notebook_output = output_json.get("notebook_output", {}).get("result")

    if not notebook_output:
        return {"warning": "No result returned in notebook output."}

    # The notebook typically exits with a JSON string; fall back to raw text.
    try:
        return json.loads(notebook_output) if isinstance(notebook_output, str) else notebook_output
    except json.JSONDecodeError:
        return {"raw_output": notebook_output}

# ── Minimal UI to test ────────────────────────────────────────
# Gradio interface doubling as the MCP tool surface: run_spam_check is
# exposed under the API/tool name "spam_check".
demo = gr.Interface(
    fn=run_spam_check,
    inputs=gr.Textbox(label="Phone Number to Check"),
    outputs="json",  # render the returned dict as JSON in the UI
    title="Databricks Spam Check",
    description="Triggers a Databricks job to check for spam/fraud using a phone number.",
    api_name="spam_check"
)

if __name__ == "__main__":
    # mcp_server=True serves the app as an MCP server in addition to the
    # web UI (requires a Gradio version with MCP support — verify pin).
    demo.launch(mcp_server=True)