# Monitor script: basic usage statistics computed over vote logs stored in Alibaba OSS.
| import argparse | |
| import code | |
| import datetime | |
| import json | |
| import os | |
| import oss2 | |
| from pytz import timezone | |
| import time | |
| import pandas as pd # pandas>=2.0.3 | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from tqdm import tqdm | |
# Number of serving processes writing logs (not referenced in the visible
# portion of this script; kept for parity with sibling monitor scripts).
NUM_SERVERS = 1
# Root directory for local logs; empty string when the env var is unset.
LOG_ROOT_DIR = os.getenv("LOG_ROOT_DIR", "")
# OSS credentials/endpoint come from the environment. Empty defaults mean a
# misconfigured environment fails at request time, not import time.
OSS_ACCESS_KEY_ID = os.getenv("OSS_ACCESS_KEY_ID", "")
OSS_ACCESS_KEY_SECRET = os.getenv("OSS_ACCESS_KEY_SECRET", "")
OSS_ENDPOINT = os.getenv("OSS_ENDPOINT", "")
OSS_BUCKET_NAME = os.getenv("OSS_BUCKET_NAME", "")
# Key prefix under which the vote logs live in the bucket.
OSS_FILE_PREFIX = "logs/vote_log/"
# Module-level OSS client shared by all helpers below (module import has the
# side effect of constructing it).
auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
def get_log_files(bucket, max_num_files=None):
    """
    Fetch log file keys from OSS, ordered by last-modified timestamp.

    :param bucket: oss2.Bucket instance
    :param max_num_files: If given, keep only the newest this-many files
    :return: List of log file keys, oldest first
    """
    # Collect (key, last_modified) pairs for every "-conv.json" log object.
    entries = [
        (obj.key, obj.last_modified)
        for obj in oss2.ObjectIterator(bucket, prefix=OSS_FILE_PREFIX)
        if obj.key.endswith("-conv.json")
    ]
    # Oldest first, so a tail slice keeps the most recent files.
    entries.sort(key=lambda pair: pair[1])
    keys = [key for key, _ in entries]
    limit = max_num_files or len(keys)
    return keys[-limit:]
def load_log_files(filename):
    """
    Parse one local JSON-lines log file into a list of compact row dicts.

    Retries up to 5 times (2 s apart) when the file does not exist yet,
    which can happen while a server is still flushing its logs.

    :param filename: Path to a JSON-lines log file
    :return: List of dicts with keys "type", "tstamp", "model", "models"
    :raises FileNotFoundError: If the file is still missing after all retries
    """
    for retry in range(5):
        try:
            # Context manager closes the handle (the original leaked it).
            with open(filename) as f:
                lines = f.readlines()
            break
        except FileNotFoundError:
            time.sleep(2)
    else:
        # All retries exhausted: fail explicitly instead of hitting an
        # UnboundLocalError on `lines` below.
        raise FileNotFoundError(filename)

    data = []
    for line in lines:
        row = json.loads(line)
        data.append(
            dict(
                type=row["type"],
                tstamp=row["tstamp"],
                # "model" is set for direct-chat rows, "models" for battles;
                # default each to its empty form.
                model=row.get("model", ""),
                models=row.get("models", ["", ""]),
            )
        )
    return data
def load_log_files_parallel(log_files, num_threads=16):
    """
    Parse many local log files concurrently and concatenate their rows.

    :param log_files: List of local log file paths
    :param num_threads: Size of the worker process pool
    :return: Flat list of row dicts from every file
    """
    from multiprocessing import Pool

    with Pool(num_threads) as pool:
        per_file = list(tqdm(pool.imap(load_log_files, log_files), total=len(log_files)))
    # Flatten the per-file result lists into one list, preserving order.
    return [row for chunk in per_file for row in chunk]
def load_log_files_from_oss(bucket, filename):
    """
    Load log data from a JSON-lines file stored in OSS.

    Retries up to 5 times (2 s apart) on missing keys or transient errors.

    :param bucket: oss2.Bucket instance
    :param filename: Key of the file in OSS
    :return: Parsed log data as a list of dicts with keys "type", "tstamp",
        "model", "models"
    :raises Exception: Re-raises the last fetch error if all retries fail
    """
    lines = None
    last_err = None
    for retry in range(5):
        try:
            # Read the file from OSS and split into JSON lines.
            result = bucket.get_object(filename)
            lines = result.read().decode("utf-8").splitlines()
            break
        except oss2.exceptions.NoSuchKey as e:
            last_err = e
            # Include the key in the message (the original printed a
            # "(unknown)" placeholder instead of the filename).
            print(f"File not found in OSS: {filename}, retrying ({retry + 1}/5)...")
            time.sleep(2)
        except Exception as e:
            last_err = e
            print(f"Error reading file {filename} from OSS: {e}")
            time.sleep(2)
    if lines is None:
        # All retries failed: surface the real error instead of crashing
        # with an UnboundLocalError on `lines`.
        raise last_err

    data = []
    for line in lines:
        row = json.loads(line)
        data.append(
            dict(
                type=row["type"],
                tstamp=row["tstamp"],
                model=row.get("model", ""),
                models=row.get("models", ["", ""]),
            )
        )
    return data
def load_log_files_parallel_from_oss(bucket, log_files, num_threads=16):
    """
    Load many OSS log files in parallel worker processes.

    :param bucket: oss2.Bucket instance
    :param log_files: List of log file keys in OSS
    :param num_threads: Size of the worker pool
    :return: Flat list of row dicts from every file
    """
    from functools import partial
    from multiprocessing import Pool

    # Bind the bucket so workers receive only the filename argument.
    loader = partial(load_log_files_from_oss, bucket)
    with Pool(num_threads) as pool:
        per_file = list(tqdm(pool.imap(loader, log_files), total=len(log_files)))
    # Flatten the per-file result lists into one list, preserving order.
    return [row for chunk in per_file for row in chunk]
def get_anony_vote_df(df):
    """
    Select anonymous-battle vote rows from a log DataFrame.

    A row counts as an anonymous vote when its type is one of the four vote
    events and the first entry of "models" is the empty string.

    :param df: DataFrame with at least "type" and "models" columns
    :return: Filtered DataFrame containing only anonymous vote rows
    """
    vote_types = ["leftvote", "rightvote", "tievote", "bothbad_vote"]
    votes = df[df["type"].isin(vote_types)]
    is_anony = votes["models"].apply(lambda pair: pair[0] == "")
    return votes[is_anony]
def merge_counts(series, on, names):
    """
    Merge several pandas count Series on a shared key and rename the
    resulting count columns.

    :param series: List (length >= 2) of count Series keyed by `on`
    :param on: Column/index-level name to join on
    :param names: New names for the merged count columns, in order
    :return: DataFrame with the key column plus one renamed count column
        per input series
    """
    merged = series[0]
    for extra in series[1:]:
        merged = pd.merge(merged, extra, on=on)
    merged = merged.reset_index()
    # After reset_index the count columns are the trailing len(series) ones.
    trailing = list(merged.columns)[-len(series):]
    return merged.rename(columns=dict(zip(trailing, names)))
def report_basic_stats(bucket, log_files):
    """
    Aggregate usage statistics from OSS-hosted vote logs.

    :param bucket: oss2.Bucket instance used to fetch the log files
    :param log_files: List of log file keys in OSS
    :return: Dict with a plotly figure ("chat_dates_bar"), markdown tables
        ("model_hist_md", "action_hist_md", "anony_vote_hist_md",
        "num_chats_last_24_hours"), and "last_updated_datetime".
    """
    df_all = load_log_files_parallel_from_oss(bucket, log_files)
    df_all = pd.DataFrame(df_all)

    # Time windows are anchored at the newest log record rather than
    # wall-clock now, so stale logs still yield sensible slices.
    now_t = df_all["tstamp"].max()
    df_1_hour = df_all[df_all["tstamp"] > (now_t - 3600)]
    df_1_day = df_all[df_all["tstamp"] > (now_t - 3600 * 24)]
    anony_vote_df_all = get_anony_vote_df(df_all)

    def to_pacific_date(tstamp):
        # Bucket a unix timestamp into a US/Pacific calendar-date string.
        return datetime.datetime.fromtimestamp(
            tstamp, tz=timezone("US/Pacific")
        ).strftime("%Y-%m-%d")

    # Chat trends: daily chat and anonymous-vote counts as a stacked bar.
    chat_dates = [to_pacific_date(x) for x in df_all[df_all["type"] == "chat"]["tstamp"]]
    # pd.Series(...).value_counts() replaces the deprecated top-level
    # pd.value_counts() (removed in pandas 3.0).
    chat_dates_counts = pd.Series(chat_dates).value_counts()
    vote_dates = [to_pacific_date(x) for x in anony_vote_df_all["tstamp"]]
    vote_dates_counts = pd.Series(vote_dates).value_counts()
    chat_dates_bar = go.Figure(
        data=[
            go.Bar(
                name="Anony. Vote",
                x=vote_dates_counts.index,
                y=vote_dates_counts,
                text=[f"{val:.0f}" for val in vote_dates_counts],
                textposition="auto",
            ),
            go.Bar(
                name="Chat",
                x=chat_dates_counts.index,
                y=chat_dates_counts,
                text=[f"{val:.0f}" for val in chat_dates_counts],
                textposition="auto",
            ),
        ]
    )
    chat_dates_bar.update_layout(
        barmode="stack",
        xaxis_title="Dates",
        yaxis_title="Count",
        height=300,
        width=1200,
    )

    # Model call counts over all time, the last day, and the last hour.
    model_hist_all = df_all[df_all["type"] == "chat"]["model"].value_counts()
    model_hist_1_day = df_1_day[df_1_day["type"] == "chat"]["model"].value_counts()
    model_hist_1_hour = df_1_hour[df_1_hour["type"] == "chat"]["model"].value_counts()
    model_hist = merge_counts(
        [model_hist_all, model_hist_1_day, model_hist_1_hour],
        on="model",
        names=["All", "Last Day", "Last Hour"],
    )
    model_hist_md = model_hist.to_markdown(index=False, tablefmt="github")

    # Action (event-type) counts over the same three windows.
    action_hist_all = df_all["type"].value_counts()
    action_hist_1_day = df_1_day["type"].value_counts()
    action_hist_1_hour = df_1_hour["type"].value_counts()
    action_hist = merge_counts(
        [action_hist_all, action_hist_1_day, action_hist_1_hour],
        on="type",
        names=["All", "Last Day", "Last Hour"],
    )
    action_hist_md = action_hist.to_markdown(index=False, tablefmt="github")

    # Anonymous-vote counts (all time and last day only).
    anony_vote_hist_all = anony_vote_df_all["type"].value_counts()
    anony_vote_df_1_day = get_anony_vote_df(df_1_day)
    anony_vote_hist_1_day = anony_vote_df_1_day["type"].value_counts()
    anony_vote_hist = merge_counts(
        [anony_vote_hist_all, anony_vote_hist_1_day],
        on="type",
        names=["All", "Last Day"],
    )
    anony_vote_hist_md = anony_vote_hist.to_markdown(index=False, tablefmt="github")

    # Hourly chat counts over the last 24 hours, newest bucket first.
    chat_1_day = df_1_day[df_1_day["type"] == "chat"]
    num_chats_last_24_hours = []
    base = df_1_day["tstamp"].min()
    for i in range(24, 0, -1):
        left = base + (i - 1) * 3600
        right = base + i * 3600
        num = ((chat_1_day["tstamp"] >= left) & (chat_1_day["tstamp"] < right)).sum()
        num_chats_last_24_hours.append(num)
    # NOTE: each label is the *end* timestamp of its hourly bucket.
    times = [
        datetime.datetime.fromtimestamp(
            base + i * 3600, tz=timezone("US/Pacific")
        ).strftime("%Y-%m-%d %H:%M:%S %Z")
        for i in range(24, 0, -1)
    ]
    last_24_hours_df = pd.DataFrame({"time": times, "value": num_chats_last_24_hours})
    last_24_hours_md = last_24_hours_df.to_markdown(index=False, tablefmt="github")

    # Timestamp of the newest record, shown as the "last updated" time.
    last_updated_datetime = datetime.datetime.fromtimestamp(
        now_t, tz=timezone("US/Pacific")
    ).strftime("%Y-%m-%d %H:%M:%S %Z")

    return {
        "chat_dates_bar": chat_dates_bar,
        "model_hist_md": model_hist_md,
        "action_hist_md": action_hist_md,
        "anony_vote_hist_md": anony_vote_hist_md,
        "num_chats_last_24_hours": last_24_hours_md,
        "last_updated_datetime": last_updated_datetime,
    }
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--max-num-files", type=int) | |
| args = parser.parse_args() | |
| log_files = get_log_files(bucket, args.max_num_files) | |
| basic_stats = report_basic_stats(bucket, log_files) | |
| print(basic_stats["action_hist_md"] + "\n") | |
| print(basic_stats["model_hist_md"] + "\n") | |
| print(basic_stats["anony_vote_hist_md"] + "\n") | |
| print(basic_stats["num_chats_last_24_hours"] + "\n") | |