Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import requests | |
| import numpy as np | |
| import openai | |
| import paramiko | |
| from nltk.stem import WordNetLemmatizer | |
| from nltk.corpus import wordnet, stopwords | |
| from nltk.tokenize import word_tokenize | |
| import nltk | |
| from ..tool import Tool | |
| from swarms.tools.tools.database.utils.db_parser import get_conf | |
| from swarms.tools.tools.database.utils.database import DBArgs, Database | |
| from swarms.tools.models.customllm import CustomLLM | |
| from swarms.tools.knowledge.knowledge_extraction import KnowledgeExtraction | |
| from swarms.tools.tools.db_diag.anomaly_detection import detect_anomalies | |
| from swarms.tools.tools.db_diag.anomaly_detection import prometheus | |
| from swarms.tools.tools.db_diag.example_generate import bm25 | |
| import warnings | |
| def obtain_values_of_metrics(start_time, end_time, metrics): | |
| if ( | |
| end_time - start_time > 11000 * 3 | |
| ): # maximum resolution of 11,000 points per timeseries | |
| # raise Exception("The time range is too large, please reduce the time range") | |
| warnings.warn( | |
| "The time range ({}, {}) is too large, please reduce the time range".format( | |
| start_time, end_time | |
| ) | |
| ) | |
| required_values = {} | |
| print(" ====> metrics: ", metrics) | |
| for metric in metrics: | |
| metric_values = prometheus( | |
| "api/v1/query_range", | |
| {"query": metric, "start": start_time, "end": end_time, "step": "3"}, | |
| ) | |
| if metric_values["data"]["result"] != []: | |
| metric_values = metric_values["data"]["result"][0]["values"] | |
| else: | |
| raise Exception("No metric values found for the given time range") | |
| # compute the average value of the metric | |
| max_value = np.max(np.array([float(value) for _, value in metric_values])) | |
| required_values[metric] = max_value | |
| return required_values | |
| def find_abnormal_metrics(start_time, end_time, monitoring_metrics, resource): | |
| resource_keys = ["memory", "cpu", "disk", "network"] | |
| abnormal_metrics = [] | |
| for metric_name in monitoring_metrics: | |
| interval_time = 5 | |
| metric_values = prometheus( | |
| "api/v1/query_range", | |
| { | |
| "query": metric_name, | |
| "start": start_time - interval_time * 60, | |
| "end": end_time + interval_time * 60, | |
| "step": "3", | |
| }, | |
| ) | |
| if metric_values["data"]["result"] != []: | |
| metric_values = metric_values["data"]["result"][0]["values"] | |
| else: | |
| continue | |
| if detect_anomalies(np.array([float(value) for _, value in metric_values])): | |
| success = True | |
| for key in resource_keys: | |
| if key in metric_name and key != resource: | |
| success = False | |
| break | |
| if success: | |
| abnormal_metrics.append(metric_name) | |
| return abnormal_metrics | |
| def build_db_diag_tool(config) -> Tool: | |
| tool = Tool( | |
| "Database Diagnosis", | |
| "Diagnose the bottlenecks of a database based on relevant metrics", | |
| name_for_model="db_diag", | |
| description_for_model="Plugin for diagnosing the bottlenecks of a database based on relevant metrics", | |
| logo_url="https://commons.wikimedia.org/wiki/File:Postgresql_elephant.svg", | |
| contact_email="hello@contact.com", | |
| legal_info_url="hello@legal.com", | |
| ) | |
| # URL_CURRENT_WEATHER= "http://api.weatherapi.com/v1/current.json" | |
| # URL_FORECAST_WEATHER = "http://api.weatherapi.com/v1/forecast.json" | |
| URL_PROMETHEUS = "http://8.131.229.55:9090/" | |
| prometheus_metrics = { | |
| "cpu_usage": 'avg(rate(process_cpu_seconds_total{instance="172.27.58.65:9187"}[5m]) * 1000)', | |
| "cpu_metrics": [ | |
| 'node_scrape_collector_duration_seconds{instance="172.27.58.65:9100"}', | |
| 'node_procs_running{instance="172.27.58.65:9100"}', | |
| 'node_procs_blocked{instance="172.27.58.65:9100"}', | |
| 'node_entropy_available_bits{instance="172.27.58.65:9100"}', | |
| 'node_load1{instance="172.27.58.65:9100"}', | |
| 'node_load5{instance="172.27.58.65:9100"}', | |
| 'node_load15{instance="172.27.58.65:9100"}', | |
| ], | |
| "memory_usage": 'node_memory_MemTotal_bytes{instance=~"172.27.58.65:9100"} - (node_memory_Cached_bytes{instance=~"172.27.58.65:9100"} + node_memory_Buffers_bytes{instance=~"172.27.58.65:9100"} + node_memory_MemFree_bytes{instance=~"172.27.58.65:9100"})', | |
| "memory_metrics": [ | |
| 'node_memory_Inactive_anon_bytes{instance="172.27.58.65:9100"}', | |
| 'node_memory_MemFree_bytes{instance="172.27.58.65:9100"}', | |
| 'node_memory_Dirty_bytes{instance="172.27.58.65:9100"}', | |
| 'pg_stat_activity_count{datname=~"(imdbload|postgres|sysbench|template0|template1|tpcc|tpch)", instance=~"172.27.58.65:9187", state="active"} !=0', | |
| ], | |
| "network_metrics": [ | |
| 'node_sockstat_TCP_tw{instance="172.27.58.65:9100"}', | |
| 'node_sockstat_TCP_orphan{instance="172.27.58.65:9100"}', | |
| ], | |
| } | |
| # "node_sockstat_TCP_tw{instance=\"172.27.58.65:9100\"}", | |
| # load knowlege extractor | |
| knowledge_matcher = KnowledgeExtraction( | |
| "/swarms.tools/tools/db_diag/root_causes_dbmind.jsonl" | |
| ) | |
| # load db settings | |
| script_path = os.path.abspath(__file__) | |
| script_dir = os.path.dirname(script_path) | |
| config = get_conf(script_dir + "/my_config.ini", "postgresql") | |
| dbargs = DBArgs("postgresql", config=config) # todo assign database name | |
| # send request to database | |
| db = Database(dbargs, timeout=-1) | |
| server_config = get_conf(script_dir + "/my_config.ini", "benchserver") | |
| monitoring_metrics = [] | |
| with open( | |
| str(os.getcwd()) + "/swarms/tools/db_diag/database_monitoring_metrics", "r" | |
| ) as f: | |
| monitoring_metrics = f.read() | |
| monitoring_metrics = eval(monitoring_metrics) | |
| def obtain_start_and_end_time_of_anomaly(): | |
| # Create SSH client | |
| ssh = paramiko.SSHClient() | |
| ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) | |
| start_time = 0 | |
| end_time = 0 | |
| try: | |
| # Connect to the remote server | |
| ssh.connect( | |
| server_config["server_address"], | |
| username=server_config["username"], | |
| password=server_config["password"], | |
| ) | |
| # Create an SFTP client | |
| sftp = ssh.open_sftp() | |
| # Change to the remote directory | |
| sftp.chdir(server_config["remote_directory"]) | |
| # Get a list of files in the directory | |
| files = sftp.listdir() | |
| required_file_name = "" | |
| required_tp = -1 | |
| # Read the contents of each file | |
| for filename in files: | |
| remote_filepath = server_config["remote_directory"] + "/" + filename | |
| if "trigger_time_log" not in filename: | |
| continue | |
| tp = filename.split("_")[0] | |
| if tp.isdigit(): | |
| tp = int(tp) | |
| if required_tp < tp: | |
| required_tp = tp | |
| required_file_name = filename | |
| file_content = sftp.open( | |
| server_config["remote_directory"] + "/" + required_file_name | |
| ).read() | |
| file_content = file_content.decode() | |
| tps = file_content.split("\n")[0] | |
| start_time = tps.split(";")[0] | |
| end_time = tps.split(";")[1] | |
| finally: | |
| # Close the SFTP session and SSH connection | |
| sftp.close() | |
| ssh.close() | |
| return {"start_time": start_time, "end_time": end_time} | |
| def whether_is_abnormal_metric( | |
| start_time: int, end_time: int, metric_name: str = "cpu_usage" | |
| ): | |
| interval_time = 5 | |
| metric_values = prometheus( | |
| "api/v1/query_range", | |
| { | |
| "query": prometheus_metrics[metric_name], | |
| "start": start_time - interval_time * 60, | |
| "end": end_time + interval_time * 60, | |
| "step": "3", | |
| }, | |
| ) | |
| # prometheus('api/v1/query_range', {'query': '100 - (avg(irate(node_cpu_seconds_total{instance=~"172.27.58.65:9100",mode="idle"}[1m])) * 100)', 'start': '1684412385', 'end': '1684413285', 'step': '3'}) | |
| # print(" === metric_values", metric_values) | |
| if metric_values["data"]["result"] != []: | |
| metric_values = metric_values["data"]["result"][0]["values"] | |
| else: | |
| raise Exception("No metric values found for the given time range") | |
| # is_abnormal = detect_anomalies(np.array([float(value) for _, value in metric_values])) | |
| is_abnormal = True | |
| if is_abnormal: | |
| return "The metric is abnormal" | |
| else: | |
| return "The metric is normal" | |
| def cpu_diagnosis_agent(start_time: int, end_time: int): | |
| # live_tuples\n- dead_tuples\n- table_size | |
| cpu_metrics = prometheus_metrics["cpu_metrics"] | |
| cpu_metrics = cpu_metrics # + find_abnormal_metrics(start_time, end_time, monitoring_metrics, 'cpu') | |
| print("==== cpu_metrics", cpu_metrics) | |
| detailed_cpu_metrics = obtain_values_of_metrics( | |
| start_time, end_time, cpu_metrics | |
| ) | |
| docs_str = knowledge_matcher.match(detailed_cpu_metrics) | |
| prompt = """The CPU metric is abnormal. Then obtain the CPU relevant metric values from Prometheus: {}. | |
| Next output the analysis of potential causes of the high CPU usage based on the CPU relevant metric values, | |
| {}""".format( | |
| detailed_cpu_metrics, docs_str | |
| ) | |
| print(prompt) | |
| # response = openai.Completion.create( | |
| # model="text-davinci-003", | |
| # prompt=prompt, | |
| # temperature=0, | |
| # max_tokens=1000, | |
| # top_p=1.0, | |
| # frequency_penalty=0.0, | |
| # presence_penalty=0.0, | |
| # stop=["#", ";"] | |
| # ) | |
| # output_text = response.choices[0].text.strip() | |
| # Set up the OpenAI GPT-3 model | |
| # model_engine = "gpt-3.5-turbo" | |
| # prompt_response = openai.ChatCompletion.create( | |
| # engine="gpt-3.5-turbo", | |
| # messages=[ | |
| # {"role": "assistant", "content": "The table schema is as follows: " + str(schema)}, | |
| # {"role": "user", "content": str(prompt)} | |
| # ] | |
| # ) | |
| # output_text = prompt_response['choices'][0]['message']['content'] | |
| llm = CustomLLM() | |
| output_analysis = llm(prompt) | |
| return {"diagnose": output_analysis, "knowledge": docs_str} | |
| def memory_diagnosis_agent(start_time: int, end_time: int): | |
| memory_metrics = prometheus_metrics["memory_metrics"] | |
| memory_metrics = prometheus_metrics["memory_metrics"] | |
| memory_metrics = memory_metrics # + find_abnormal_metrics(start_time, end_time, monitoring_metrics, 'memory') | |
| detailed_memory_metrics = obtain_values_of_metrics( | |
| start_time, end_time, memory_metrics | |
| ) | |
| openai.api_key = os.environ["OPENAI_API_KEY"] | |
| db = Database(dbargs, timeout=-1) | |
| slow_queries = db.obtain_historical_slow_queries() | |
| slow_query_state = "" | |
| for i, query in enumerate(slow_queries): | |
| slow_query_state += str(i + 1) + ". " + str(query) + "\n" | |
| print(slow_query_state) | |
| # TODO: need a similarity match function to match the top-K examples | |
| # 1. get the categories of incoming metrics. Such as "The abnormal metrics include A, B, C, D" | |
| # 2. embedding the metrics | |
| # note: 这个metrics的embedding有可能预计算吗?如果metrics的种类(组合数)有限的话 | |
| # 3. match the top-K examples(embedding) | |
| # note: 不用embedding如何有效的筛选出来与当前metrics最相关的example呢?可以枚举吗?比如如果我知道某一个example涉及到哪些metrics? | |
| # 该如何判断某一个metrics跟一段文本是相关的呢?能否用一个模型来判断一段文本涉及到哪些metrics呢?重新训练的话感觉需要很多样本才行 | |
| # 能不能用关键词数量? | |
| docs_str = knowledge_matcher.match(detailed_memory_metrics) | |
| prompt = """The memory metric is abnormal. Then obtain the memory metric values from Prometheus: {}. The slow queries are: | |
| {} | |
| Output the analysis of potential causes of the high memory usage based on the memory metric values and slow queries, e.g., | |
| {} | |
| Note: include the important slow queries in the output. | |
| """.format( | |
| detailed_memory_metrics, slow_query_state, docs_str | |
| ) | |
| # print(prompt) | |
| # response = openai.Completion.create( | |
| # model="text-davinci-003", | |
| # prompt=prompt, | |
| # temperature=0, | |
| # max_tokens=1000, | |
| # top_p=1.0, | |
| # frequency_penalty=0.0, | |
| # presence_penalty=0.0, | |
| # stop=["#", ";"] | |
| # ) | |
| # output_text = response.choices[0].text.strip() | |
| # Set up the OpenAI GPT-3 model | |
| # model_engine = "gpt-3.5-turbo" | |
| # prompt_response = openai.ChatCompletion.create( | |
| # engine="gpt-3.5-turbo", | |
| # messages=[ | |
| # {"role": "assistant", "content": "The table schema is as follows: " + str(schema)}, | |
| # {"role": "user", "content": str(prompt)} | |
| # ] | |
| # ) | |
| # output_text = prompt_response['choices'][0]['message']['content'] | |
| llm = CustomLLM() | |
| output_analysis = llm(prompt) | |
| return {"diagnose": output_analysis, "knowledge": docs_str} | |
| return tool | |