File size: 3,759 Bytes
b057ee3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import requests
from config import BOTS, TIMEOUT_SEC


def get_bot_config(device_key: str):
    """Return the configuration entry stored in ``BOTS`` for ``device_key``.

    Raises:
        ValueError: If ``BOTS`` has no entry for ``device_key`` or the
            stored entry is falsy (for example an empty mapping).
    """
    config = BOTS.get(device_key)
    if config:
        return config
    raise ValueError(f"Unknown device_key: {device_key}")


def get_device_key_by_instance_id(instance_id: str):
    """Find the device key of the first bot whose ``instance_id`` matches.

    Both sides are compared as strings, so an integer id in the config
    matches the same id given as text (and vice versa).

    Returns:
        The matching key from ``BOTS``, or ``None`` when no bot matches.
    """
    matches = (
        key
        for key, config in BOTS.items()
        if str(config.get("instance_id")) == str(instance_id)
    )
    return next(matches, None)


def build_greenapi_base(instance_id: str, api_host: str = "https://7105.api.greenapi.com") -> str:
    """Return the base URL for a Green API instance.

    Args:
        instance_id: Instance identifier appended after ``waInstance``.
        api_host: Scheme and host of the API server. Defaults to the host
            this module has always used, so existing callers are
            unaffected; pass another value for an instance served from a
            different host. A trailing slash is tolerated.

    Returns:
        The string ``"<api_host>/waInstance<instance_id>"``.
    """
    # Strip a trailing slash so the result never contains "//waInstance".
    host = api_host.rstrip("/")
    return f"{host}/waInstance{instance_id}"


def get_greenapi_urls(device_key: str):
    """Build the endpoint URLs for the bot registered under ``device_key``.

    The bot's ``instance_id`` and ``token`` are read from its config entry;
    each URL has the form ``<base>/<method>/<token>``.

    Returns:
        A dict with the keys ``"text"``, ``"reply_buttons"`` and
        ``"url_buttons"`` mapped to their endpoint URLs.

    Raises:
        ValueError: Propagated from ``get_bot_config`` for an unknown key.
        KeyError: If the config entry lacks ``instance_id`` or ``token``.
    """
    config = get_bot_config(device_key)
    instance_id, token = config["instance_id"], config["token"]
    root = build_greenapi_base(instance_id)

    # Result key -> API method name used in the URL path.
    methods = {
        "text": "sendMessage",
        "reply_buttons": "sendInteractiveButtonsReply",
        "url_buttons": "sendInteractiveButtons",
    }
    return {name: f"{root}/{method}/{token}" for name, method in methods.items()}


def send_text_message(device_key: str, chat_id: str, message: str):
    """POST a plain text message to the bot's ``text`` endpoint.

    Args:
        device_key: Key of the bot whose endpoint URLs are used.
        chat_id: Value sent as ``chatId`` in the JSON payload.
        message: Value sent as ``message`` in the JSON payload.

    Returns:
        A ``(status_code, body)`` tuple, where ``body`` is the response
        text truncated to its first 1500 characters ("" when empty).
    """
    endpoint = get_greenapi_urls(device_key)["text"]

    response = requests.post(
        endpoint,
        json={"chatId": chat_id, "message": message},
        headers={"Content-Type": "application/json"},
        timeout=TIMEOUT_SEC,
    )

    status = response.status_code
    excerpt = (response.text or "")[:1500]
    return status, excerpt


def send_reply_buttons(device_key: str, chat_id: str, header: str, body: str, footer: str, buttons: list):
    """POST an interactive reply-buttons message to the ``reply_buttons`` endpoint.

    Args:
        device_key: Key of the bot whose endpoint URLs are used.
        chat_id: Value sent as ``chatId`` in the JSON payload.
        header: Value sent as ``header``.
        body: Value sent as ``body``.
        footer: Value sent as ``footer``.
        buttons: Value sent unchanged as ``buttons``.

    Returns:
        A ``(status_code, body)`` tuple, where ``body`` is the response
        text truncated to its first 1500 characters ("" when empty).
    """
    endpoint = get_greenapi_urls(device_key)["reply_buttons"]

    request_body = dict(
        chatId=chat_id,
        header=header,
        body=body,
        footer=footer,
        buttons=buttons,
    )
    response = requests.post(endpoint, json=request_body, timeout=TIMEOUT_SEC)

    status = response.status_code
    excerpt = (response.text or "")[:1500]
    return status, excerpt


def send_url_button(device_key: str, chat_id: str, header: str, body: str, footer: str, url: str, button_text: str = "فتح الرابط"):
    """POST a message with a single URL button to the ``url_buttons`` endpoint.

    Args:
        device_key: Key of the bot whose endpoint URLs are used.
        chat_id: Value sent as ``chatId`` in the JSON payload.
        header: Value sent as ``header``.
        body: Value sent as ``body``.
        footer: Value sent as ``footer``.
        url: Target sent as the button's ``url``.
        button_text: Label sent as the button's ``buttonText``.

    Returns:
        A ``(status_code, body)`` tuple, where ``body`` is the response
        text truncated to its first 1500 characters ("" when empty).
    """
    endpoint = get_greenapi_urls(device_key)["url_buttons"]

    # The payload always carries exactly one button, with the fixed id "1".
    link_button = {
        "type": "url",
        "buttonId": "1",
        "buttonText": button_text,
        "url": url,
    }
    request_body = {
        "chatId": chat_id,
        "header": header,
        "body": body,
        "footer": footer,
        "buttons": [link_button],
    }
    response = requests.post(endpoint, json=request_body, timeout=TIMEOUT_SEC)

    status = response.status_code
    excerpt = (response.text or "")[:1500]
    return status, excerpt


def extract_text_message(msg_data: dict) -> str:
    """Extract the user-visible text from an incoming message payload.

    The ``typeMessage`` field selects where the text is read from:

    * ``textMessage`` -> ``textMessageData.textMessage``
    * ``extendedTextMessage`` -> ``extendedTextMessageData.text``
    * ``interactiveButtonsResponse`` -> ``selectedDisplayText``, falling
      back to ``selectedId`` when the display text is blank

    For any other (or missing) type, a fixed list of known locations is
    probed in order and the first non-blank string wins.

    Malformed payloads never raise: a value that is not the expected dict
    or string is treated as absent.

    Args:
        msg_data: The message-data mapping; anything that is not a dict
            yields ``""``.

    Returns:
        The stripped text, or ``""`` when none can be found.
    """
    if not isinstance(msg_data, dict):
        return ""

    def _lookup(*path: str) -> str:
        """Follow *path* through nested dicts; return the stripped string or ""."""
        node = msg_data
        for key in path:
            if not isinstance(node, dict):
                return ""
            node = node.get(key)
        return node.strip() if isinstance(node, str) else ""

    # A non-string type is treated as unknown so the generic probing below
    # still gets a chance to find text.
    raw_type = msg_data.get("typeMessage")
    message_type = raw_type.strip() if isinstance(raw_type, str) else ""

    if message_type == "textMessage":
        return _lookup("textMessageData", "textMessage")

    if message_type == "extendedTextMessage":
        return _lookup("extendedTextMessageData", "text")

    if message_type == "interactiveButtonsResponse":
        return (
            _lookup("interactiveButtonsResponse", "selectedDisplayText")
            or _lookup("interactiveButtonsResponse", "selectedId")
        )

    # Unknown type: probe every known location, most specific first.
    fallback_paths = (
        ("textMessageData", "textMessage"),
        ("extendedTextMessageData", "text"),
        ("interactiveButtonsResponse", "selectedDisplayText"),
        ("interactiveButtonsResponse", "selectedId"),
        ("text",),
        ("message",),
    )
    for path in fallback_paths:
        text = _lookup(*path)
        if text:
            return text

    return ""