# StructEval-T Analyzer — Hugging Face Space app
"""
StructEval-T Analyzer
松尾研LLM講義2025 メインコンペ用 推論結果分析ツール
"""
| import json | |
| import csv | |
| import io | |
| import re | |
| import traceback | |
| from collections import Counter, defaultdict | |
| from pathlib import Path | |
| import gradio as gr | |
| import pandas as pd | |
| # --------------------------------------------------------------------------- | |
| # 1. Syntax Validators | |
| # --------------------------------------------------------------------------- | |
def validate_json(text):
    """Check whether *text* parses as JSON.

    Returns a ``(is_valid, error_message)`` tuple; the message is empty
    on success.
    """
    try:
        json.loads(text)
    except json.JSONDecodeError as exc:
        return False, f"JSONDecodeError: {exc.msg} (line {exc.lineno})"
    return True, ""
def validate_yaml(text):
    """Check whether *text* parses as YAML.

    Returns a ``(is_valid, error_message)`` tuple.  PyYAML is imported
    lazily; if it is missing, the ImportError is reported like any other
    validation failure.
    """
    try:
        import yaml  # lazy: PyYAML may not be installed

        yaml.safe_load(text)
    except Exception as exc:
        return False, f"YAMLError: {exc}"
    return True, ""
| def validate_toml(text): | |
| try: | |
| import tomllib | |
| tomllib.loads(text) | |
| return True, "" | |
| except Exception as e: | |
| return False, f"TOMLError: {e}" | |
def validate_xml(text):
    """Check whether *text* parses as a well-formed XML document.

    Returns a ``(is_valid, error_message)`` tuple.
    """
    try:
        import xml.etree.ElementTree as ET

        ET.fromstring(text)
    except Exception as exc:
        return False, f"XMLError: {exc}"
    return True, ""
def validate_csv(text):
    """Check *text* as CSV: non-empty, has data rows, consistent column counts.

    Returns a ``(is_valid, error_message)`` tuple.  The first row is
    treated as a header, so a single-row input fails with "Only header".
    """
    try:
        parsed = list(csv.reader(io.StringIO(text)))
        if not parsed:
            return False, "Empty CSV"
        if len(parsed) == 1:
            return False, "Only header"
        widths = {len(r) for r in parsed}
        if len(widths) > 1:
            return False, f"Inconsistent cols: {widths}"
    except Exception as exc:
        return False, f"CSVError: {exc}"
    return True, ""
# Maps an output-format label (the "output_type" field of public_150.json
# tasks) to its syntax validator.  Every validator has the same contract:
# it takes the generated text and returns (is_valid, error_message).
VALIDATORS = {
    "JSON": validate_json,
    "YAML": validate_yaml,
    "TOML": validate_toml,
    "XML": validate_xml,
    "CSV": validate_csv,
}
| # --------------------------------------------------------------------------- | |
| # 2. Error Pattern Classifier | |
| # --------------------------------------------------------------------------- | |
def classify_error_patterns(generation, output_type):
    """Heuristically tag why *generation* failed to parse as *output_type*.

    Returns a list of pattern labels (see the legend in the UI footer);
    ``["unknown"]`` when no heuristic matched.
    """
    found = []
    stripped = generation.strip()

    # Markdown code fences wrapped around the payload.
    if re.search(r"```\w*", generation):
        found.append("markdown_block")

    # Conversational lead-in on the first line.
    head = stripped.split("\n")[0].lower() if stripped else ""
    prefix_markers = ("here is", "here's", "below is", "sure", "certainly", "let me")
    if any(marker in head for marker in prefix_markers):
        found.append("natural_language_prefix")

    # Trailing commentary within the last three lines.
    tail = " ".join(stripped.split("\n")[-3:]).lower() if stripped else ""
    if any(marker in tail for marker in ("note:", "explanation:", "this ", "the above")):
        found.append("natural_language_suffix")

    # Format-specific truncation heuristics (unbalanced brackets / tags).
    if output_type == "JSON":
        opens = generation.count("{") + generation.count("[")
        closes = generation.count("}") + generation.count("]")
        if opens > closes:
            found.append("truncation")
    elif output_type == "XML":
        n_open = len(re.findall(r"<[^/!?][^>]*>", generation))
        n_close = len(re.findall(r"</[^>]+>", generation))
        # Allow one unmatched tag (e.g. the root) before flagging.
        if n_open > n_close + 1:
            found.append("truncation")

    if not stripped:
        found.append("empty_output")
    if re.search(r"<think>|</think>", generation):
        found.append("cot_leakage")
    if re.search(r"<tool_call>", generation):
        found.append("tool_call_leakage")

    return found or ["unknown"]
| # --------------------------------------------------------------------------- | |
| # 3. Core Analysis | |
| # --------------------------------------------------------------------------- | |
def load_public_150(file_path):
    """Load public_150.json and index its task entries by ``task_id``."""
    with open(file_path, "r", encoding="utf-8") as fh:
        entries = json.load(fh)
    return {entry["task_id"]: entry for entry in entries}
def analyze_single_inference(inference_data, task_info):
    """Validate every generation in *inference_data* and tabulate the results.

    Args:
        inference_data: list of dicts with "task_id" and "generation" keys.
        task_info: mapping of task_id -> task metadata from public_150.json.

    Returns:
        pandas.DataFrame with one row per inference record (validity,
        error message, error-pattern tags, and a 200-char preview).
    """
    rows = []
    for record in inference_data:
        tid = record.get("task_id", "")
        gen = record.get("generation", "")
        meta = task_info.get(tid, {})
        fmt = meta.get("output_type", "UNKNOWN")

        checker = VALIDATORS.get(fmt)
        if checker is None:
            ok, err = False, f"Unknown format: {fmt}"
        else:
            ok, err = checker(gen)

        # Only failed outputs are classified for error patterns.
        patterns = [] if ok else classify_error_patterns(gen, fmt)

        rows.append({
            "task_id": tid,
            "task_name": meta.get("task_name", "UNKNOWN"),
            "output_type": fmt,
            "is_valid": ok,
            "error_msg": err,
            "error_patterns": ",".join(patterns) if patterns else "",
            "generation_length": len(gen),
            "generation_preview": gen[:200],
        })
    return pd.DataFrame(rows)
def compute_summary(df):
    """Aggregate parse statistics from an ``analyze_single_inference`` DataFrame.

    Args:
        df: DataFrame with at least the columns "is_valid" (bool),
            "output_type" (str) and "error_patterns" (comma-joined str).

    Returns:
        dict with overall counts and rate, per-format stats under
        "by_format", and a most-common-first frequency table of error
        patterns under "error_pattern_counts".
    """
    total = len(df)
    valid = int(df["is_valid"].sum())
    summary = {
        "total_tasks": total,
        "parse_success": valid,
        "parse_fail": total - valid,
        # Guard the empty-DataFrame case to avoid ZeroDivisionError.
        "parse_rate": f"{valid / total * 100:.1f}%" if total > 0 else "N/A",
    }

    format_stats = {}
    for fmt in ["JSON", "YAML", "TOML", "XML", "CSV"]:
        fmt_df = df[df["output_type"] == fmt]
        fmt_total = len(fmt_df)
        fmt_valid = int(fmt_df["is_valid"].sum())
        format_stats[fmt] = {
            "total": fmt_total,
            "success": fmt_valid,
            "fail": fmt_total - fmt_valid,
            "rate": f"{fmt_valid / fmt_total * 100:.1f}%" if fmt_total > 0 else "N/A",
        }
    summary["by_format"] = format_stats

    # Tally error patterns across failed rows.  Use the idiomatic boolean
    # negation mask rather than comparing a boolean Series with `== False`.
    all_patterns = []
    for patterns_str in df[~df["is_valid"]]["error_patterns"]:
        if patterns_str:
            all_patterns.extend(patterns_str.split(","))
    summary["error_pattern_counts"] = dict(Counter(all_patterns).most_common())
    return summary
| # --------------------------------------------------------------------------- | |
| # 4. Main Processing | |
| # --------------------------------------------------------------------------- | |
def process_files(public_150_file, inference_files):
    """Gradio callback: analyze uploaded inference files against public_150.json.

    Args:
        public_150_file: filepath (or file-like with .name) of public_150.json.
        inference_files: list of filepaths/file-likes of inference.json files.

    Returns:
        Three markdown strings: (summary, comparison table, error details).
        Validation and runtime errors are returned in-band as the first
        string so Gradio displays them to the user instead of crashing.
    """
    if public_150_file is None:
        return "❌ public_150.json をアップロードしてください", "", ""
    if not inference_files:
        return "❌ inference.json を1つ以上アップロードしてください", "", ""
    try:
        # Gradio may hand back either a filepath string or a file-like
        # object depending on component configuration/version.
        pub_path = public_150_file if isinstance(public_150_file, str) else public_150_file.name
        task_info = load_public_150(pub_path)
        all_results = {}
        all_summaries = {}
        for inf_file in inference_files:
            inf_path = inf_file if isinstance(inf_file, str) else inf_file.name
            # Experiment name = uploaded filename without extension.
            filename = Path(inf_path).stem
            with open(inf_path, "r", encoding="utf-8") as f:
                inference_data = json.load(f)
            df = analyze_single_inference(inference_data, task_info)
            summary = compute_summary(df)
            all_results[filename] = df
            all_summaries[filename] = summary
        # --- Output 1: Summary ---
        summary_text = "## 📊 分析結果サマリー\n\n"
        for name, s in all_summaries.items():
            summary_text += f"### {name}\n"
            summary_text += f"- パース成功: {s['parse_success']}/{s['total_tasks']} ({s['parse_rate']})\n"
            summary_text += f"- フォーマット別:\n"
            for fmt, fs in s["by_format"].items():
                summary_text += f" - {fmt}: {fs['success']}/{fs['total']} ({fs['rate']})\n"
            if s["error_pattern_counts"]:
                summary_text += f"- エラーパターン:\n"
                for pattern, count in s["error_pattern_counts"].items():
                    summary_text += f" - {pattern}: {count}件\n"
            summary_text += "\n"
        # --- Output 2: Comparison table as markdown ---
        comp_lines = ["## 📈 実験比較\n"]
        comp_lines.append("| experiment | total | pass | rate | JSON | YAML | TOML | XML | CSV |")
        comp_lines.append("|---|---|---|---|---|---|---|---|---|")
        for name, df in all_results.items():
            total = len(df)
            valid = int(df["is_valid"].sum())
            rate = f"{valid/total*100:.1f}%" if total > 0 else "N/A"
            fmt_rates = {}
            for fmt in ["JSON", "YAML", "TOML", "XML", "CSV"]:
                fmt_df = df[df["output_type"] == fmt]
                ft = len(fmt_df)
                fv = int(fmt_df["is_valid"].sum())
                fmt_rates[fmt] = f"{fv/ft*100:.1f}%" if ft > 0 else "N/A"
            comp_lines.append(f"| {name} | {total} | {valid} | {rate} | {fmt_rates['JSON']} | {fmt_rates['YAML']} | {fmt_rates['TOML']} | {fmt_rates['XML']} | {fmt_rates['CSV']} |")
        comparison_md = "\n".join(comp_lines)
        # --- Output 3: Error details as markdown ---
        # Detail table covers only the first uploaded experiment.
        first_name = list(all_results.keys())[0]
        first_df = all_results[first_name]
        error_df = first_df[first_df["is_valid"] == False]
        error_lines = [f"## ❌ エラー詳細 ({first_name})\n"]
        error_lines.append(f"パース失敗: {len(error_df)}件\n")
        error_lines.append("| task_name | output_type | error_patterns | error_msg |")
        error_lines.append("|---|---|---|---|")
        for _, row in error_df.iterrows():
            # Truncate long parser messages so table cells stay readable.
            err_msg_short = str(row['error_msg'])[:60]
            error_lines.append(f"| {row['task_name']} | {row['output_type']} | {row['error_patterns']} | {err_msg_short} |")
        error_md = "\n".join(error_lines)
        return summary_text, comparison_md, error_md
    except Exception as e:
        # Surface the full traceback in the UI rather than failing silently.
        error_trace = traceback.format_exc()
        return f"❌ エラー:\n```\n{error_trace}\n```", "", ""
| # --------------------------------------------------------------------------- | |
| # 5. Gradio App - using only Markdown outputs to avoid Dataframe bugs | |
| # --------------------------------------------------------------------------- | |
# Gradio UI.  Outputs are Markdown-only (no gr.Dataframe) to avoid
# Dataframe component bugs; see the section header comment above.
with gr.Blocks(
    title="StructEval-T Analyzer",
    theme=gr.themes.Soft(),
) as demo:
    # Header / usage instructions (user-facing, Japanese).
    gr.Markdown(
        """
# 🔍 StructEval-T Analyzer
### 松尾研LLM講義2025 メインコンペ用 推論結果分析ツール
`inference.json` と `public_150.json` をアップロードすることで、
モデル出力の構文的正確性(パース可能性)やエラーパターンを分析できます。
**使い方:**
1. `public_150.json` をアップロード
2. 1つ以上の `inference.json` をアップロード(複数ファイル対応・実験比較可能)
3. 「分析開始」ボタンをクリック
"""
    )
    # File inputs: one task file, one-or-more inference files.
    with gr.Row():
        public_file = gr.File(
            label="public_150.json",
            file_types=[".json"],
            type="filepath",
        )
        inference_files = gr.File(
            label="inference.json(複数可)",
            file_types=[".json"],
            file_count="multiple",
            type="filepath",
        )
    analyze_btn = gr.Button("🔬 分析開始", variant="primary", size="lg")
    # Three result tabs, one per markdown string returned by process_files.
    with gr.Tabs():
        with gr.Tab("📊 サマリー"):
            summary_output = gr.Markdown()
        with gr.Tab("📈 実験比較"):
            comparison_output = gr.Markdown()
        with gr.Tab("❌ エラー詳細"):
            error_output = gr.Markdown()
    analyze_btn.click(
        fn=process_files,
        inputs=[public_file, inference_files],
        outputs=[summary_output, comparison_output, error_output],
    )
    # Footer: caveats and the legend for error-pattern labels.
    gr.Markdown(
        """
---
**注意:** このツールは構文的な正確性(パース可能かどうか)のみを検証します。
運営側の採点基準である `raw_output_metric`(特定キーの存在チェック等)は
`public_150.json` から削除されているため、完全なスコア再現はできません。
**エラーパターンの凡例:**
- `markdown_block`: マークダウンコードブロック(\\`\\`\\`json 等)の混入
- `natural_language_prefix`: 先頭に自然言語("Here is..."等)が混入
- `natural_language_suffix`: 末尾に自然言語("Note:"等)が混入
- `truncation`: 出力の途切れ(閉じ括弧・タグの欠落)
- `empty_output`: 空の出力
- `wrong_format`: 要求と異なるフォーマットの出力
- `cot_leakage`: 思考過程(\\<think\\>等)の混入
- `tool_call_leakage`: ツールコール(\\<tool_call\\>等)の混入
- `unknown`: 上記に該当しない構文エラー
"""
    )

if __name__ == "__main__":
    demo.launch()