import os
import pickle

from flask import Flask, jsonify, request, render_template

# ===============================
# Base path configuration
# ===============================
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
RESULT_DIR = os.path.join(PROJECT_ROOT, "results")
DATASET_DIR = os.path.join(PROJECT_ROOT, "datasets")

# Make sure both directories exist before anything tries to read or write them.
os.makedirs(RESULT_DIR, exist_ok=True)
os.makedirs(DATASET_DIR, exist_ok=True)

# ===============================
# Flask App
# ===============================
app = Flask(__name__)

# ===============================
# In-memory cache
# ===============================
RESULT_CACHE = {}  # {dataset_name: results}


# ===============================
# PKL IO helpers
# ===============================
def save_result_pkl(dataset, results):
    """Pickle ``results`` to ``RESULT_DIR/<dataset>.pkl``, overwriting any existing file.

    Args:
        dataset: Dataset name; used verbatim as the file stem.
        results: Any picklable object.

    NOTE(review): ``dataset`` is joined into the path without validation, so
    callers must reject names containing path separators before calling this.
    """
    path = os.path.join(RESULT_DIR, f"{dataset}.pkl")
    with open(path, "wb") as f:
        pickle.dump(results, f)


def load_result_pkl(dataset):
    """Load the pickled results stored for ``dataset``.

    Args:
        dataset: Dataset name; used verbatim as the file stem.

    Returns:
        The unpickled object, or ``None`` when ``RESULT_DIR/<dataset>.pkl``
        does not exist.

    NOTE(review): ``pickle.load`` can execute arbitrary code from the file it
    reads. Only load files this application wrote itself, and never pass an
    unvalidated, user-supplied ``dataset`` name.
    """
    path = os.path.join(RESULT_DIR, f"{dataset}.pkl")
    # Open directly instead of checking existence first, so there is no
    # window between the check and the open.
    try:
        with open(path, "rb") as f:
            return pickle.load(f)
    except FileNotFoundError:
        return None


def list_available_datasets():
    """Return the sorted names of all datasets that can be displayed.

    A dataset is listed when a ``<name>.pkl`` file exists in ``RESULT_DIR``.
    "Authorship" is always included as the default dataset.
    """
    suffix = ".pkl"
    # Strip only the trailing suffix; ``str.replace`` would also remove any
    # ".pkl" occurring in the middle of a file name.
    datasets = {
        entry[: -len(suffix)]
        for entry in os.listdir(RESULT_DIR)
        if entry.endswith(suffix)
    }

    # Default dataset to display
    datasets.add("Authorship")
    return sorted(datasets)


# ===============================
# WARNING: your own Agent entry point
# ===============================
def run_agent_for_dataset(dataset):
    """
    WARNING: replace this with your own router / agent.
    Must return List[Dict].
    """
    # ---------------------------
    # Example (placeholder)
    # ---------------------------
    return [
        {
            "algorithm": "UCRFS",
            "num_features": 15,
            "mean_f1": 0.9233716475,
            "mean_auc": 0.9816098520,
            "time": 5.75,
            "score": 0.9408431088,
        },
        {
            "algorithm": "JMIM",
            "num_features": 15,
            "mean_f1": 0.918,
            "mean_auc": 0.979,
            "time": 7.32,
            "score": 0.932,
        },
    ]


# ===============================
# Pages
# ===============================
@app.route("/")
def index():
    """Render ``index.html`` with the dataset list and the default dataset."""
    return render_template(
        "index.html",
        datasets=list_available_datasets(),
        default_dataset="Authorship",
    )


# ===============================
# API: fetch results
# ===============================
def _is_safe_dataset_name(dataset):
    """Return True when ``dataset`` can be used as a plain file stem.

    Rejects empty names and names containing a path separator or NUL byte, so
    a request cannot make the pickle helpers read or write outside the
    results directory.
    """
    if not dataset:
        return False
    return not any(ch in dataset for ch in ("/", "\\", "\x00"))


@app.route("/api/results")
def get_results():
    """Return the results for the requested dataset as JSON.

    Query parameters:
        dataset: Dataset name; defaults to "Authorship".

    Lookup order: in-memory cache, then the on-disk pkl, and only if both
    miss, a live agent run whose output is persisted. Whatever is found is
    stored in the in-memory cache for later requests.

    Returns:
        A JSON response with the results, or a 400 JSON error when the
        dataset name is not a valid file stem.
    """
    dataset = request.args.get("dataset", "Authorship")

    # The name comes from the query string and ends up in a file path that is
    # pickled/unpickled, so refuse anything that is not a bare name.
    if not _is_safe_dataset_name(dataset):
        return jsonify({"error": "invalid dataset name"}), 400

    # 1) In-memory cache
    if dataset in RESULT_CACHE:
        return jsonify(RESULT_CACHE[dataset])

    # 2) On-disk pkl
    results = load_result_pkl(dataset)
    if results is None:
        # 3) Run the agent live
        results = run_agent_for_dataset(dataset)
        # 4) Persist
        save_result_pkl(dataset, results)

    RESULT_CACHE[dataset] = results

    # NOTE(review): an earlier revision called ``rank_results(...)`` here, but
    # no such function is defined in this file. Results are returned in the
    # order they were produced; add an explicit ranking step if one is wanted.
    return jsonify(results)


# ===============================
# API: dataset list
# ===============================
@app.route("/api/datasets")
def api_datasets():
    """Return the sorted list of available dataset names as JSON."""
    return jsonify(list_available_datasets())


if __name__ == "__main__":
    # debug=True turns on Flask's reloader and interactive debugger; keep it
    # for local development only.
    app.run(debug=True)