from dotenv import load_dotenv load_dotenv() import os # 環境変数から取得 SUPABASE_URL = os.environ.get('SUPABASE_URL') SUPABASE_KEY = os.environ.get('SUPABASE_KEY') import supabase table_threshold = "Threshold_data" table_sensor = "Sensor_data" table_troubleshooting = "Troubleshooting_collection" # クライアントの初期化 supabase_client = supabase.create_client(SUPABASE_URL, SUPABASE_KEY) # データ取得 (初回のみ実行) threshold_data = supabase_client.table(table_threshold).select("*").execute() sensor_data = supabase_client.table(table_sensor).select("*").execute() troubleshooting_data = supabase_client.table(table_troubleshooting).select("*").execute() import pandas as pd # データフレームへの変換 (初回のみ実行) threshold_df = pd.DataFrame(threshold_data.data) sensor_df = pd.DataFrame(sensor_data.data) troubleshooting_df = pd.DataFrame(troubleshooting_data.data) # Convert 'datetime' column to datetime objects (初回のみ実行) sensor_df['datetime'] = pd.to_datetime(sensor_df['datetime'], utc=True) # 閾値チェック関数の定義 def check_thresholds(sensor_df_filtered, threshold_df): # Renamed parameter to clarify it's the filtered data alerts = [] # '下限'と'上限'カラムを数値型に変換。変換できない値はNaNとする。 threshold_df['下限'] = pd.to_numeric(threshold_df['下限'], errors='coerce') threshold_df['上限'] = pd.to_numeric(threshold_df['上限'], errors='coerce') for _, row in threshold_df.iterrows(): metric = row["指標名"] min_val = row["下限"] max_val = row["上限"] data_no = row["No."] # Get the 'No.' from threshold_df # センサーデータに指標が存在しない場合はスキップ if metric not in sensor_df_filtered.columns: # Use filtered data continue # センサーデータの該当カラムを数値型に変換。変換できない値はNaNとする。 sensor_metric_data = pd.to_numeric(sensor_df_filtered[metric], errors='coerce') # Use filtered data for index, value in sensor_metric_data.items(): # Use the index from sensor_metric_data to get the timestamp from the filtered sensor_df passed to the function # Ensure the index exists in the filtered sensor_df if index in sensor_df_filtered.index: timestamp = sensor_df_filtered.loc[index, "datetime"] if "datetime" in sensor_df_filtered.columns else index else: # Handle cases where the index might not be in the filtered dataframe (shouldn't happen with .copy() and .items()) continue # 下限チェック if pd.notna(min_val) and pd.notna(value) and value < min_val: alerts.append({ "timestamp": timestamp, "metric": metric, "value": value, "status": f"下限値 {min_val} 未満", "data no.": data_no # Add the 'data no.' }) # 上限チェック if pd.notna(max_val) and pd.notna(value) and value > max_val: alerts.append({ "timestamp": timestamp, "metric": metric, "value": value, "status": f"上限値 {max_val} 超過", "data no.": data_no # Add the 'data no.' }) return pd.DataFrame(alerts) # Gradioインターフェースの構築 import gradio as gr import pandas as pd import supabase import datetime # Import datetime here as it's used in run_troubleshooting import pytz # Import pytz for timezone conversion from typing import List, Dict, Union # 閾値チェック関数 def check_thresholds(sensor_df_filtered: pd.DataFrame, threshold_df: pd.DataFrame) -> pd.DataFrame: """ センサーデータに対して閾値チェックを行い、下限値未満や上限値超過を検出する。 Args: sensor_df_filtered (pd.DataFrame): 対象期間で抽出したセンサーデータ。 - 必須列: "datetime"(時刻情報), センサー値列(指標名と一致する列) threshold_df (pd.DataFrame): 閾値情報のデータフレーム。 - 必須列: "指標名", "下限", "上限", "No." Returns: pd.DataFrame: 異常が検出された場合の結果データフレーム。 - 列: ["timestamp", "metric", "value", "status", "data no."] - 検出されなければ空の DataFrame(ただし列は固定)。 """ alerts: List[Dict[str, Union[str, float, datetime.datetime]]] = [] threshold_df['下限'] = pd.to_numeric(threshold_df['下限'], errors='coerce') threshold_df['上限'] = pd.to_numeric(threshold_df['上限'], errors='coerce') for _, row in threshold_df.iterrows(): metric: str = row["指標名"] min_val: float = row["下限"] max_val: float = row["上限"] data_no: int = row["No."] if metric not in sensor_df_filtered.columns: continue sensor_metric_data = pd.to_numeric(sensor_df_filtered[metric], errors='coerce') for index, value in sensor_metric_data.items(): if index not in sensor_df_filtered.index: continue timestamp: Union[pd.Timestamp, int] = ( sensor_df_filtered.loc[index, "datetime"] if "datetime" in sensor_df_filtered.columns else index ) if pd.notna(min_val) and pd.notna(value) and value < min_val: alerts.append({ "timestamp": timestamp, "metric": metric, "value": float(value), "status": f"下限値 {min_val} 未満", "data no.": data_no }) if pd.notna(max_val) and pd.notna(value) and value > max_val: alerts.append({ "timestamp": timestamp, "metric": metric, "value": float(value), "status": f"上限値 {max_val} 超過", "data no.": data_no }) return pd.DataFrame(alerts, columns=["timestamp", "metric", "value", "status", "data no."]) # トラブルシューティング実行関数 def run_troubleshooting(hours: int = 24) -> str: """ 指定時間内のセンサーデータを対象に閾値チェックを行い、 異常が同時に複数指標で発生した場合に対応策を返す。 Args: hours (int, optional): 過去何時間分のデータをチェックするか。デフォルトは24。 Returns: str: トラブルシューティング情報のテキスト。 - 異常がない場合: 「過去◯時間 異常ありません」 - 閾値超過がある場合: タイムスタンプと状況・解決策の一覧 - エラー時: エラーメッセージ """ try: current_time_utc = datetime.datetime.now(datetime.timezone.utc) time_start_utc = current_time_utc - datetime.timedelta(hours=hours) global sensor_df, threshold_df, troubleshooting_df recent_sensor_df = sensor_df[ (sensor_df['datetime'] >= time_start_utc) & (sensor_df['datetime'] <= current_time_utc) ].copy() alerts_df = check_thresholds(recent_sensor_df, threshold_df) if alerts_df.empty: return f"過去{hours}時間 異常ありません" grouped_alerts = alerts_df.groupby('timestamp')['data no.'].nunique() multiple_data_nos_timestamps = grouped_alerts[grouped_alerts > 1].index.tolist() filtered_alerts_df = alerts_df[alerts_df['timestamp'].isin(multiple_data_nos_timestamps)] if filtered_alerts_df.empty: return f"過去{hours}時間 異常ありません(複数指標の同時異常なし)" data_nos_by_timestamp = filtered_alerts_df.groupby('timestamp')['data no.'].unique().apply(list) result_list: List[Dict[str, Union[str, datetime.datetime]]] = [] for timestamp, data_nos in data_nos_by_timestamp.items(): data_nos_str = ', '.join(map(str, data_nos)) result_list.append({"timestamp": timestamp, "data_nos": data_nos_str}) result_df = pd.DataFrame(result_list, columns=["timestamp", "data_nos"]) JST = pytz.timezone('Asia/Tokyo') result_df['timestamp'] = result_df['timestamp'].dt.tz_convert(JST) if result_df.empty: return f"過去{hours}時間 異常ありません" if '指標No.' not in troubleshooting_df.columns: return "設定テーブルに『指標No.』列が見つかりません。" troubleshooting_indicator_lists = troubleshooting_df['指標No.'].astype(str).str.split(',').apply( lambda x: [int(i) for i in x if i.strip().isdigit()] ) result_data_nos_lists = result_df['data_nos'].astype(str).str.split(', ').apply( lambda x: [int(i) for i in x if i.strip().isdigit()] ) output_text: str = "" for i, result_nos in enumerate(result_data_nos_lists): result_timestamp = result_df.loc[i, 'timestamp'] for j, troubleshooting_nos in enumerate(troubleshooting_indicator_lists): if set(troubleshooting_nos).issubset(set(result_nos)): if ('シチュエーション\n(対応が必要な状況)' in troubleshooting_df.columns and 'sub goal到達のために必要な行動\n(解決策)' in troubleshooting_df.columns): troubleshooting_situation = troubleshooting_df.loc[j, 'シチュエーション\n(対応が必要な状況)'] troubleshooting_action = troubleshooting_df.loc[j, 'sub goal到達のために必要な行動\n(解決策)'] else: troubleshooting_situation = "(シチュエーション列なし)" troubleshooting_action = "(解決策列なし)" output_text += f"Timestamp: {result_timestamp}\n" output_text += f"Trouble: {troubleshooting_situation}\n" output_text += f"Troubleshooting: {troubleshooting_action}\n" output_text += "-" * 20 + "\n" return output_text if output_text else "該当するトラブルシューティングの組み合わせはありませんでした。" except Exception as e: return f"エラーが発生しました: {type(e).__name__} - {e}" # Gradioインターフェースの設定 iface = gr.Interface( fn=run_troubleshooting, inputs=gr.Number(value=24, label="過去◯時間"), # ← デフォルトは24 outputs="text", title="トラブル原因・解決策提示", description="指定した時間数のセンサーデータから閾値チェックを行います" ) # Gradioインターフェースの起動 iface.launch(mcp_server=True)