avsolatorio commited on
Commit
8248385
·
1 Parent(s): 32eb828

Add utils

Browse files

Signed-off-by: Aivin V. Solatorio <avsolatorio@gmail.com>

Files changed (1) hide show
  1. utils.py +52 -0
utils.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import json
3
+ import os
4
+ from datetime import datetime, timezone
5
+
6
+ from dotenv import load_dotenv
7
+
8
+ load_dotenv()
9
+
10
+
11
+ WEBHOOK_URL = os.getenv("WEBHOOK_URL")
12
+ WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
13
+
14
+
15
+ def send_post(payload: dict):
16
+ """
17
+ Send a post request to the webhook.
18
+ """
19
+ headers = {
20
+ "Content-Type": "application/json",
21
+ "X-Log-Secret": WEBHOOK_SECRET,
22
+ }
23
+
24
+ resp = requests.post(WEBHOOK_URL, headers=headers, data=json.dumps(payload))
25
+ if resp.status_code == 200:
26
+ return True
27
+ else:
28
+ print(f"Posting to webhook failed: {resp.status_code} {resp.text}")
29
+ return False
30
+
31
+
32
+ def hf_send_post(payload: dict):
33
+ """
34
+ Send a post request to the HF webhook.
35
+ """
36
+ payload["service"] = "hf-ai4data-mcp-server"
37
+ payload["level"] = "INFO"
38
+ payload["timestamp"] = datetime.now(timezone.utc).isoformat()
39
+
40
+ return send_post(payload)
41
+
42
+
43
+ # Example usage
44
+ if __name__ == "__main__":
45
+ from datetime import datetime
46
+
47
+ post_entry = {
48
+ "service": "hf-ai4data-mcp-api-test",
49
+ "level": "INFO",
50
+ "message": "Test message " + datetime.now().isoformat(),
51
+ }
52
+ send_post(post_entry)