|
|
|
|
|
|
|
|
|
|
|
"""Unified evaluation for MeCab (JUMANDIC) and the trained model. |
|
|
|
|
|
Evaluates both systems on the same KWDLC test data and compares results. |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from typing import Dict, List |
|
|
|
|
|
import torch |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
def parse_knp_file(knp_file: Path) -> List[Dict]: |
|
|
"""Extract gold morphemes from a KNP file.""" |
|
|
sentences = [] |
|
|
current_sentence = [] |
|
|
current_text = "" |
|
|
|
|
|
with open(knp_file, "r", encoding="utf-8") as f: |
|
|
for line in f: |
|
|
line = line.rstrip("\n") |
|
|
|
|
|
if line.startswith("#"): |
|
|
if line.startswith("# S-ID:"): |
|
|
if current_sentence: |
|
|
sentences.append({"morphemes": current_sentence, "text": current_text}) |
|
|
current_sentence = [] |
|
|
current_text = "" |
|
|
continue |
|
|
elif line == "EOS": |
|
|
if current_sentence: |
|
|
sentences.append({"morphemes": current_sentence, "text": current_text}) |
|
|
current_sentence = [] |
|
|
current_text = "" |
|
|
elif line.startswith("+") or line.startswith("*"): |
|
|
continue |
|
|
elif line: |
|
|
parts = line.split(" ") |
|
|
if len(parts) >= 4: |
|
|
surface = parts[0] |
|
|
reading = parts[1] |
|
|
pos = parts[3] |
|
|
|
|
|
current_sentence.append({"surface": surface, "reading": reading, "pos": pos}) |
|
|
current_text += surface |
|
|
|
|
|
return sentences |
|
|
|
|
|
|
|
|
def analyze_with_mecab(text: str) -> List[Dict]: |
|
|
"""Analyze text with MeCab (JUMANDIC) using a simple best-path parse.""" |
|
|
try: |
|
|
result = subprocess.run( |
|
|
["mecab", "-d", "/var/lib/mecab/dic/juman-utf8"], |
|
|
input=text, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
encoding="utf-8", |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
return [] |
|
|
|
|
|
morphemes = [] |
|
|
for line in result.stdout.strip().split("\n"): |
|
|
if line == "EOS": |
|
|
break |
|
|
parts = line.split("\t") |
|
|
if len(parts) >= 2: |
|
|
surface = parts[0] |
|
|
features = parts[1].split(",") |
|
|
if len(features) >= 7: |
|
|
pos = features[0] |
|
|
|
|
|
reading = features[7] if len(features) > 7 and features[7] != "*" else "" |
|
|
|
|
|
morphemes.append({"surface": surface, "reading": reading, "pos": pos}) |
|
|
|
|
|
return morphemes |
|
|
except Exception as e: |
|
|
print(f"MeCab error: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def analyze_with_jumanpp(text: str) -> List[Dict]: |
|
|
"""Analyze text with JUMAN++ (optional baseline).""" |
|
|
try: |
|
|
result = subprocess.run(["jumanpp"], input=text, capture_output=True, text=True, encoding="utf-8") |
|
|
|
|
|
if result.returncode != 0: |
|
|
return [] |
|
|
|
|
|
morphemes = [] |
|
|
for line in result.stdout.strip().split("\n"): |
|
|
if line.startswith("@") or line == "EOS": |
|
|
continue |
|
|
parts = line.split(" ") |
|
|
if len(parts) >= 12: |
|
|
surface = parts[0] |
|
|
reading = parts[1] |
|
|
pos = parts[3] |
|
|
|
|
|
morphemes.append({"surface": surface, "reading": reading, "pos": pos}) |
|
|
|
|
|
return morphemes |
|
|
except Exception as e: |
|
|
print(f"JUMAN++ error: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def analyze_with_model(text: str, model, experiment_info) -> List[Dict]: |
|
|
"""Analyze text with the trained model.""" |
|
|
try: |
|
|
import infer |
|
|
|
|
|
results, optimal_morphemes = infer.predict_morphemes_from_text( |
|
|
text, model=model, experiment_info=experiment_info, silent=True |
|
|
) |
|
|
|
|
|
morphemes = [] |
|
|
for morph in optimal_morphemes: |
|
|
morphemes.append( |
|
|
{"surface": morph["surface"], "reading": morph.get("reading", ""), "pos": morph.get("pos", "*")} |
|
|
) |
|
|
|
|
|
return morphemes |
|
|
except Exception as e: |
|
|
print(f"Model inference error: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def evaluate_morphemes(gold_morphemes: List[Dict], pred_morphemes: List[Dict]) -> Dict: |
|
|
"""Compute segmentation and POS F1 between gold and predictions.""" |
|
|
gold_spans = [] |
|
|
pred_spans = [] |
|
|
|
|
|
|
|
|
pos = 0 |
|
|
for m in gold_morphemes: |
|
|
surface = m["surface"] |
|
|
end = pos + len(surface) |
|
|
gold_spans.append((pos, end, m["pos"])) |
|
|
pos = end |
|
|
|
|
|
|
|
|
pos = 0 |
|
|
for m in pred_morphemes: |
|
|
surface = m["surface"] |
|
|
end = pos + len(surface) |
|
|
pred_spans.append((pos, end, m["pos"])) |
|
|
pos = end |
|
|
|
|
|
|
|
|
gold_seg = {(s, e) for s, e, _ in gold_spans} |
|
|
pred_seg = {(s, e) for s, e, _ in pred_spans} |
|
|
|
|
|
seg_correct = len(gold_seg & pred_seg) |
|
|
seg_precision = seg_correct / len(pred_seg) if pred_seg else 0 |
|
|
seg_recall = seg_correct / len(gold_seg) if gold_seg else 0 |
|
|
seg_f1 = 2 * seg_precision * seg_recall / (seg_precision + seg_recall) if (seg_precision + seg_recall) > 0 else 0 |
|
|
|
|
|
|
|
|
gold_pos = set(gold_spans) |
|
|
pred_pos = set(pred_spans) |
|
|
|
|
|
pos_correct = len(gold_pos & pred_pos) |
|
|
pos_precision = pos_correct / len(pred_pos) if pred_pos else 0 |
|
|
pos_recall = pos_correct / len(gold_pos) if gold_pos else 0 |
|
|
pos_f1 = 2 * pos_precision * pos_recall / (pos_precision + pos_recall) if (pos_precision + pos_recall) > 0 else 0 |
|
|
|
|
|
return { |
|
|
"seg_precision": seg_precision, |
|
|
"seg_recall": seg_recall, |
|
|
"seg_f1": seg_f1, |
|
|
"pos_precision": pos_precision, |
|
|
"pos_recall": pos_recall, |
|
|
"pos_f1": pos_f1, |
|
|
} |
|
|
|
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser(description="Unified evaluation script") |
|
|
parser.add_argument("--kwdlc-dir", type=str, default="KWDLC", help="Path to KWDLC root directory") |
|
|
parser.add_argument( |
|
|
"--test-ids", type=str, default="KWDLC/id/split_for_pas/test.id", help="File containing test IDs (one per line)" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--max-samples", type=int, default=None, help="Max number of samples to evaluate (default: all)" |
|
|
) |
|
|
parser.add_argument("--experiment", "-e", type=str, required=True, help="Experiment name to evaluate") |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
test_ids = [] |
|
|
with open(args.test_ids, "r") as f: |
|
|
for line in f: |
|
|
test_ids.append(line.strip()) |
|
|
|
|
|
if args.max_samples is not None: |
|
|
test_ids = test_ids[: args.max_samples] |
|
|
|
|
|
print(f"Evaluating: {len(test_ids)} files") |
|
|
|
|
|
import infer |
|
|
|
|
|
model_info = infer.load_model(experiment_name=args.experiment) |
|
|
if model_info: |
|
|
model, experiment_info = model_info |
|
|
|
|
|
device = torch.device("cpu") |
|
|
model = model.to(device) |
|
|
experiment_info["device"] = device |
|
|
print(f"Model: {experiment_info['name']}") |
|
|
else: |
|
|
print("Failed to load model") |
|
|
model = None |
|
|
experiment_info = None |
|
|
|
|
|
mecab_results = [] |
|
|
model_results = [] |
|
|
|
|
|
print("\nStart evaluation...") |
|
|
for test_id in tqdm(test_ids, desc="evaluating"): |
|
|
|
|
|
found = False |
|
|
knp_base = Path(args.kwdlc_dir) / "knp" |
|
|
|
|
|
for subdir in knp_base.glob("w*"): |
|
|
candidate = subdir / f"{test_id}.knp" |
|
|
if candidate.exists(): |
|
|
knp_path = candidate |
|
|
found = True |
|
|
break |
|
|
|
|
|
if not found: |
|
|
continue |
|
|
|
|
|
|
|
|
gold_sentences = parse_knp_file(knp_path) |
|
|
|
|
|
for sent_data in gold_sentences: |
|
|
text = sent_data["text"] |
|
|
gold_morphemes = sent_data["morphemes"] |
|
|
|
|
|
|
|
|
pred_mecab = analyze_with_mecab(text) |
|
|
if pred_mecab: |
|
|
result = evaluate_morphemes(gold_morphemes, pred_mecab) |
|
|
mecab_results.append(result) |
|
|
|
|
|
|
|
|
if model is not None: |
|
|
pred_model = analyze_with_model(text, model, experiment_info) |
|
|
if pred_model: |
|
|
model_eval = evaluate_morphemes(gold_morphemes, pred_model) |
|
|
model_results.append(model_eval) |
|
|
|
|
|
|
|
|
print("\n" + "=" * 70) |
|
|
print("Evaluation Results (KWDLC test data)") |
|
|
print("=" * 70) |
|
|
print(f"Num evaluated: MeCab={len(mecab_results)}, Model={len(model_results)}") |
|
|
|
|
|
|
|
|
if mecab_results: |
|
|
avg_seg_f1 = sum(r["seg_f1"] for r in mecab_results) / len(mecab_results) |
|
|
avg_pos_f1 = sum(r["pos_f1"] for r in mecab_results) / len(mecab_results) |
|
|
print("\n[1] MeCab (JUMANDIC):") |
|
|
print(f" Seg F1: {avg_seg_f1:.4f}") |
|
|
print(f" POS F1: {avg_pos_f1:.4f}") |
|
|
|
|
|
|
|
|
if model_results: |
|
|
avg_seg_f1 = sum(r["seg_f1"] for r in model_results) / len(model_results) |
|
|
avg_pos_f1 = sum(r["pos_f1"] for r in model_results) / len(model_results) |
|
|
print(f"\n[2] Trained model ({experiment_info['name']}):") |
|
|
print(f" Seg F1: {avg_seg_f1:.4f}") |
|
|
print(f" POS F1: {avg_pos_f1:.4f}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|