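"""Run all benchmarks under the `benchmark/` directory and report the results.

Each benchmark folder is executed in a separate subprocess via
`gpt_engineer.cli.main --steps benchmark`, with stdout/stderr streamed to a
`log.txt` inside that folder. Once every run has finished, each result is
evaluated interactively (`--steps evaluate`) and the summary table can be
appended to `benchmark/RESULTS.md`.

Usage (typer maps the parameter to a `--n-benchmarks` flag):

    python <this script> [--n-benchmarks N]
"""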
import contextlib
import json
import subprocess

from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Iterable, Union

from tabulate import tabulate
from typer import run


def main(
    n_benchmarks: Union[int, None] = None,
):
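    """Launch every benchmark in parallel, then wait on and evaluate each run."""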
    path = Path("benchmark")

    folders: Iterable[Path] = path.iterdir()

    if n_benchmarks:
        folders = islice(folders, n_benchmarks)

    benchmarks = []
    results = []
    for bench_folder in folders:
        if bench_folder.is_dir():
            print(f"Running benchmark for {bench_folder}")

            log_path = bench_folder / "log.txt"
            # Deliberately not a context manager: the handle must stay open
            # while the subprocess runs; it is closed after process.wait().
            log_file = open(log_path, "w")
            process = subprocess.Popen(
                [
                    "python",
                    "-u",  # unbuffered output, so the log can be tailed live
                    "-m",
                    "gpt_engineer.cli.main",
                    bench_folder,
                    "--steps",
                    "benchmark",
                ],
                stdout=log_file,
                stderr=log_file,
                bufsize=0,
            )
            benchmarks.append(bench_folder)
            results.append((process, log_file))

            print("You can stream the log file by running:")
            print(f"tail -f {log_path}")
            print()

    for bench_folder, (process, file) in zip(benchmarks, results):
        process.wait()
        file.close()

        print("process", bench_folder.name, "finished with code", process.returncode)
        print("Running it. Original benchmark prompt:")
        print()
        with open(bench_folder / "prompt") as f:
            print(f.read())
        print()

        # Evaluation is interactive; Ctrl-C skips it for this benchmark.
        with contextlib.suppress(KeyboardInterrupt):
            subprocess.run(
                [
                    "python",
                    "-m",
                    "gpt_engineer.cli.main",
                    bench_folder,
                    "--steps",
                    "evaluate",
                ],
            )

    generate_report(benchmarks, path)


def generate_report(benchmarks, benchmark_path):
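    """Tabulate each benchmark's review and optionally append it to RESULTS.md.

    Expects each benchmark folder to contain a `.gpteng/memory/review` JSON
    file with (possibly missing) keys "ran", "works", "perfect", "comments".
    """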
    headers = ["Benchmark", "Ran", "Works", "Perfect", "Notes"]
    rows = []
    for bench_folder in benchmarks:
        memory = bench_folder / ".gpteng" / "memory"
        with open(memory / "review") as f:
            review = json.loads(f.read())
        rows.append(
            [
                bench_folder.name,
                to_emoji(review.get("ran", None)),
                to_emoji(review.get("works", None)),
                to_emoji(review.get("perfect", None)),
                review.get("comments", None),
            ]
        )
    table: str = tabulate(rows, headers, tablefmt="pipe")
    print("\nBenchmark report:\n")
    print(table)
    print()
    append_to_results = ask_yes_no("Append report to the results file?")
    if append_to_results:
        results_path = benchmark_path / "RESULTS.md"
        current_date = datetime.now().strftime("%Y-%m-%d")
        # Insert as a level-2 section so the newest report appears first.
        insert_markdown_section(results_path, current_date, table, 2)


def to_emoji(value: Union[bool, None]) -> str:
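    """Render a review flag as ✅ (truthy) or ❌ (falsy or missing)."""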
    return "\U00002705" if value else "\U0000274C"


def insert_markdown_section(file_path, section_title, section_text, level):
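    """Insert a new section above the first existing header of the same level."""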
    with open(file_path, "r") as file:
        lines = file.readlines()

    header_prefix = "#" * level
    new_section = f"{header_prefix} {section_title}\n\n{section_text}\n\n"

    # Find the first header of exactly this level. The trailing space matters:
    # "##" alone would also match deeper headers such as "###".
    line_number = -1
    for i, line in enumerate(lines):
        if line.startswith(header_prefix + " "):
            line_number = i
            break

    if line_number != -1:
        lines.insert(line_number, new_section)
    else:
        print(
            f"Markdown file was of unexpected format. No section of level {level} found. "
            "Did not write results."
        )
        return

    with open(file_path, "w") as file:
        file.writelines(lines)


def ask_yes_no(question: str) -> bool:
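    """Prompt until the user answers 'y' or 'n'; return True for 'y'."""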
    while True:
        response = input(question + " (y/n): ").lower().strip()
        if response == "y":
            return True
        elif response == "n":
            return False
        else:
            print("Please enter either 'y' or 'n'.")


if __name__ == "__main__":
    run(main)