File size: 3,197 Bytes
aa988a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from __future__ import annotations

import argparse
import ast
import json
import re
import sys
from collections import Counter
from pathlib import Path

# Absolute path of the directory that contains this file.
_METRICS_DIR = Path(__file__).resolve().parent
# Put that directory at the front of sys.path (once) before the imports below
# run — presumably so the sibling modules they name resolve when this file is
# executed directly as a script; confirm against how the script is launched.
if str(_METRICS_DIR) not in sys.path:
    sys.path.insert(0, str(_METRICS_DIR))

from broken_code_generation import EVAL_FILE, FILE_CODE, FILE_JSON_VALIDITY, MODEL_ID  # noqa: E402
from report_io import metrics_path, write_report  # noqa: E402


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the code-metrics script.

    Options:
        --from-generation-report: path of the generation report to score;
            defaults to ``metrics_path(FILE_JSON_VALIDITY)``.
        --output: optional path for the resulting report; ``None`` when omitted.

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(description=f"Code metrics for {MODEL_ID} only.")
    default_generation_report = metrics_path(FILE_JSON_VALIDITY)
    cli.add_argument("--from-generation-report", type=Path, default=default_generation_report)
    cli.add_argument("--output", type=Path, default=None)
    return cli.parse_args()


def normalize_code(code: str) -> str:
    """Return *code* with escaped newlines and escaped double quotes unescaped.

    The two-character sequence backslash + ``n`` becomes a real newline, and
    backslash + ``"`` becomes a plain double quote. Replacements are applied in
    that order; everything else is left untouched.
    """
    # (escaped form, literal form), applied sequentially in this order.
    replacements = (
        ("\\n", "\n"),
        ('\\"', '"'),
    )
    for escaped, literal in replacements:
        code = code.replace(escaped, literal)
    return code


def is_valid_python(code: str) -> bool:
    """Report whether *code* parses as Python source.

    The text is passed through ``normalize_code`` first and then handed to
    ``ast.parse``; nothing is executed.

    Args:
        code: Candidate source text (typically model output).

    Returns:
        ``True`` if the normalized text parses, ``False`` if the parser
        rejects it.
    """
    try:
        ast.parse(normalize_code(code))
    except (SyntaxError, ValueError, RecursionError):
        # SyntaxError: ordinary parse failure.
        # ValueError: source containing NUL bytes is rejected this way by
        #   interpreters before 3.12 (newer ones raise SyntaxError).
        # RecursionError: pathologically nested input can exhaust the
        #   parser/AST builder depth.
        # All of these mean "this is not usable Python", so one bad sample
        # must not abort scoring of the remaining ones.
        return False
    return True


def code_tokens(code: str) -> Counter:
    """Count the lexical tokens found in *code*.

    A token is, in order of preference at each position: an identifier-like
    word, a run of digits, or any single non-whitespace character. Whitespace
    is never a token.

    Returns:
        A ``Counter`` mapping each token string to its number of occurrences.
    """
    token_matches = re.finditer(r"[A-Za-z_][A-Za-z0-9_]*|\d+|[^\s]", code)
    return Counter(found.group(0) for found in token_matches)


def code_token_f1(reference: str, hypothesis: str) -> float:
    """Compute the token-level F1 score between two code strings.

    Both strings are tokenized with ``code_tokens``; the overlap is the size
    of the multiset intersection of the two token counts.

    Args:
        reference: The expected code.
        hypothesis: The generated code.

    Returns:
        ``1.0`` when both strings have no tokens, ``0.0`` when exactly one of
        them has no tokens or when no token is shared, otherwise the harmonic
        mean of precision and recall.
    """
    ref_counts = code_tokens(reference)
    hyp_counts = code_tokens(hypothesis)

    # Handle empty sides up front: two empties agree perfectly, one empty
    # side cannot match anything.
    if not (ref_counts and hyp_counts):
        if not (ref_counts or hyp_counts):
            return 1.0
        return 0.0

    shared = sum((ref_counts & hyp_counts).values())
    precision = shared / sum(hyp_counts.values())
    recall = shared / sum(ref_counts.values())

    denominator = precision + recall
    if denominator == 0:
        return 0.0
    return 2 * precision * recall / denominator


def main() -> None:
    """Score the generation report and write the code-metrics report.

    Steps:
        1. Parse CLI arguments and make sure the generation report exists.
        2. Load the reference records from ``EVAL_FILE`` and the generation
           report, and check the report belongs to ``MODEL_ID``.
        3. Pair references with results positionally; for every result whose
           status is ``"ok"``, check whether its ``broken_code`` parses and
           compute its token F1 against the reference ``broken_code``.
        4. Write the report to ``--output`` or ``metrics_path(FILE_CODE)``.

    The syntax-valid rate is divided by the number of all results (not only
    the ``"ok"`` ones). The ``codebleu_broken_code`` entry is filled with the
    same token-F1 mean as ``code_token_f1_broken_code``.

    Raises:
        FileNotFoundError: If the generation report does not exist.
        ValueError: If the generation report was produced for another model.
    """
    args = parse_args()
    gen_path = args.from_generation_report
    if not gen_path.exists():
        raise FileNotFoundError(f"Run 02_json_validity.py first. Missing: {gen_path}")

    references = json.loads(EVAL_FILE.read_text(encoding="utf-8"))
    gen_report = json.loads(gen_path.read_text(encoding="utf-8"))

    reported_model = gen_report.get("model")
    if reported_model != MODEL_ID:
        raise ValueError(f"Report is not for {MODEL_ID}: {reported_model}")

    results = gen_report.get("results", [])
    total = len(results)

    valid_count = 0
    f1_values: list[float] = []
    # zip pairs entries by position and stops at the shorter sequence.
    for reference, result in zip(references, results):
        if result.get("status") == "ok":
            candidate = str(result["generated"].get("broken_code", ""))
            valid_count += int(is_valid_python(candidate))
            expected = str(reference.get("broken_code", ""))
            f1_values.append(code_token_f1(expected, candidate))

    # No scored rows means there is no mean to report.
    if f1_values:
        f1_mean = round(sum(f1_values) / len(f1_values), 4)
    else:
        f1_mean = None

    # Guard the division for an empty result list.
    denominator = max(total, 1)
    metrics = {
        "broken_code_syntax_valid_rate": round(valid_count / denominator, 4),
        "code_token_f1_broken_code": f1_mean,
        "codebleu_broken_code": f1_mean,
    }
    report = {
        "metric_group": "code_metrics",
        "model": MODEL_ID,
        "source_report": str(gen_path),
        "metrics": metrics,
    }

    destination = args.output or metrics_path(FILE_CODE)
    write_report(destination, report)


if __name__ == "__main__":
    main()