File size: 2,957 Bytes
ee3e701 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | import json
import math
import os
import re
import time
from typing import Dict
import requests
from internlm.utils.logger import get_logger
logger = get_logger(__file__)
def initialize_light_monitor(monitor_address: str = None):
try:
from uniscale_monitoring import init_monitor
init_monitor(monitor_address)
except Exception as e:
logger.warning(f"init monitor meet error: {e}")
def send_heartbeat(msg_type: str, msg: Dict):
def nan2none(v):
if isinstance(v, float) and math.isnan(v):
return None
return v
try:
from uniscale_monitoring import send_meta
data = {}
for k, v in msg.items():
if isinstance(v, Dict):
for k1, v1 in v.items():
new_k = f"{k}_{k1}".split(" ")[0]
new_k = re.sub(r"[^a-zA-Z0-9_]", "_", new_k)
data[new_k] = nan2none(v1)
else:
new_k = k.split(" ")[0]
new_k = re.sub(r"[^a-zA-Z0-9_]", "_", new_k)
data[new_k] = nan2none(v)
if os.getenv("CLUSTER_NAME"):
data.update({"cluster": os.getenv("CLUSTER_NAME")})
if msg_type == "train_metrics":
data.update({"msg_type": "train_metrics"})
elif msg_type == "init_time":
data.update({"msg_type": "init_time"})
elif msg_type == "stage_time":
data.update({"msg_type": "stage_time"})
send_meta(data, timeout=0.1)
except Exception as e:
logger.warning(f"send heartbeat meet error: {e}")
def send_feishu_msg_with_webhook(webhook: str, title: str, message: str):
"""
Use Feishu robot to send messages with the given webhook.
Args:
webhook (str): The webhook to be used to send message.
title (str): The message title.
message (str): The message body.
Returns:
The response from the request. Or catch the exception and return None.
Raises:
Exception: An exception rasied by the HTTP post request.
"""
headers = {"Content-Type": "application/json;charset=utf-8"}
msg_body = {
"timestamp": int(time.time()),
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": title,
"content": [
[
{
"tag": "text",
"text": message,
},
],
],
},
},
},
}
try:
res = requests.post(webhook, data=json.dumps(msg_body), headers=headers, timeout=30)
res = res.json()
print(f"Feishu webhook response: {res}")
except Exception as err: # pylint: disable=W0703
print(f"HTTP Post error: {err}")
res = None
return res
|