| | import argparse |
| | import csv |
| | import gc |
| | import os |
| | from dataclasses import dataclass |
| | from typing import Dict, List, Union |
| |
|
| | import torch |
| | import torch.utils.benchmark as benchmark |
| |
|
| |
|
# Commit identifier taken from the environment; None when GITHUB_SHA is not set.
GITHUB_SHA = os.environ.get("GITHUB_SHA")

# Column names, in output order, shared by every CSV this module writes.
BENCHMARK_FIELDS = [
    # What was benchmarked.
    "pipeline_cls",
    "ckpt_id",
    # How it was run.
    "batch_size",
    "num_inference_steps",
    "model_cpu_offload",
    "run_compile",
    # What was measured.
    "time (secs)",
    "memory (gbs)",
    "actual_gpu_memory (gbs)",
    # Provenance.
    "github_sha",
]
| |
|
# Text prompt constant. NOTE(review): not referenced in the visible part of this file;
# presumably consumed by the benchmark scripts -- confirm.
PROMPT = "ghibli style, a fantasy landscape with castles"

# Base directory, overridable through the BASE_PATH environment variable (defaults to ".").
BASE_PATH = os.getenv("BASE_PATH", ".")

# Total GPU memory in GiB, reported in the "actual_gpu_memory (gbs)" CSV column.
# The TOTAL_GPU_MEMORY environment variable takes precedence. CUDA device 0 is queried
# only when the variable is unset: passing the query as the default argument of
# os.getenv() would evaluate it unconditionally, initialising CUDA (and failing on
# machines without a GPU) even when an explicit override was supplied.
_total_gpu_memory_override = os.getenv("TOTAL_GPU_MEMORY")
if _total_gpu_memory_override is not None:
    TOTAL_GPU_MEMORY = float(_total_gpu_memory_override)
else:
    TOTAL_GPU_MEMORY = float(torch.cuda.get_device_properties(0).total_memory / (1024**3))

# NOTE(review): the two names below are not referenced in the visible part of this file;
# they look like the destination repository and the collated output file name -- confirm.
REPO_ID = "diffusers/benchmarks"
FINAL_CSV_FILE = "collated_results.csv"
| |
|
| |
|
@dataclass
class BenchmarkInfo:
    """Pair of measurements produced by a single benchmark run.

    Values are stored exactly as passed in; no conversion or validation is performed.

    NOTE(review): the fields are annotated as ``float``, but ``benchmark_fn`` and
    ``bytes_to_giga_bytes`` in this module return formatted strings -- confirm what
    callers actually store here.
    """

    # Written to the "time (secs)" CSV column.
    time: float
    # Written to the "memory (gbs)" CSV column.
    memory: float
| |
|
| |
|
def flush():
    """Free cached GPU memory and reset the CUDA peak-memory statistics.

    Runs the Python garbage collector, releases the blocks held by the CUDA caching
    allocator, and resets the peak-memory counters.
    """
    # Collect first so that tensors kept alive only by reference cycles are released
    # before the allocator cache is emptied.
    gc.collect()
    torch.cuda.empty_cache()
    # A single reset suffices: torch.cuda.reset_max_memory_allocated() is deprecated and
    # merely forwards to reset_peak_memory_stats(), so calling both was redundant.
    torch.cuda.reset_peak_memory_stats()
| |
|
| |
|
def bytes_to_giga_bytes(bytes):
    """Convert a byte count to gibibytes, returned as a string with three decimals.

    Args:
        bytes: Number of bytes (the name shadows the builtin but is part of the
            existing call signature).

    Returns:
        The value divided by 1024 three times, formatted with ``.3f``.
    """
    gibibytes = bytes / 1024 / 1024 / 1024
    return format(gibibytes, ".3f")
| |
|
| |
|
def benchmark_fn(f, *args, **kwargs):
    """Time ``f(*args, **kwargs)`` with ``torch.utils.benchmark`` and report the mean.

    Args:
        f: Callable to measure.
        *args: Positional arguments forwarded to ``f``.
        **kwargs: Keyword arguments forwarded to ``f``.

    Returns:
        The ``mean`` of the ``blocked_autorange()`` measurement, formatted as a string
        with three decimals.
    """
    # The timed statement is evaluated as source text, so every name it uses has to be
    # supplied through the timer's globals.
    namespace = {"args": args, "kwargs": kwargs, "f": f}
    timer = benchmark.Timer(
        stmt="f(*args, **kwargs)",
        globals=namespace,
        num_threads=torch.get_num_threads(),
    )
    measurement = timer.blocked_autorange()
    return format(measurement.mean, ".3f")
| |
|
| |
|
def generate_csv_dict(
    pipeline_cls: str, ckpt: str, args: argparse.Namespace, benchmark_info: BenchmarkInfo
) -> Dict[str, Union[str, bool, float]]:
    """Assemble one benchmark result row, keyed by CSV column name, for later serialization.

    Args:
        pipeline_cls: Value for the "pipeline_cls" column.
        ckpt: Value for the "ckpt_id" column.
        args: Namespace providing ``batch_size``, ``num_inference_steps``,
            ``model_cpu_offload`` and ``run_compile``.
        benchmark_info: Source of the "time (secs)" and "memory (gbs)" values.

    Returns:
        A dictionary whose keys are the ten CSV column names, in column order.
    """
    run_settings = {
        "batch_size": args.batch_size,
        "num_inference_steps": args.num_inference_steps,
        "model_cpu_offload": args.model_cpu_offload,
        "run_compile": args.run_compile,
    }
    return {
        "pipeline_cls": pipeline_cls,
        "ckpt_id": ckpt,
        **run_settings,
        "time (secs)": benchmark_info.time,
        "memory (gbs)": benchmark_info.memory,
        # Module-level constants, identical for every row produced by this process.
        "actual_gpu_memory (gbs)": format(TOTAL_GPU_MEMORY, ".3f"),
        "github_sha": GITHUB_SHA,
    }
| |
|
| |
|
def write_to_csv(file_name: str, data_dict: Dict[str, Union[str, bool, float]]):
    """Write a single benchmark record to ``file_name`` as a header line plus one row.

    The file is opened in write mode, so existing content is replaced. Columns follow
    ``BENCHMARK_FIELDS``.

    Args:
        file_name: Path of the CSV file to create or overwrite.
        data_dict: Row to write, keyed by column name.
    """
    with open(file_name, "w", newline="") as sink:
        row_writer = csv.DictWriter(sink, BENCHMARK_FIELDS)
        row_writer.writeheader()
        row_writer.writerow(data_dict)
| |
|
| |
|
def collate_csv(input_files: List[str], output_file: str):
    """Merge several identically structured CSV files into one CSV file.

    The output starts with a single header built from ``BENCHMARK_FIELDS``, followed by
    the data rows of each input file in the order the files are listed.

    Args:
        input_files: Paths of the CSV files to read; each is expected to begin with a
            header row.
        output_file: Path of the combined CSV file; it is created or overwritten.
    """
    with open(output_file, mode="w", newline="") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=BENCHMARK_FIELDS)
        writer.writeheader()

        for input_path in input_files:
            # newline="" hands line-ending handling to the csv module; without it,
            # universal-newline translation rewrites "\r\n" inside quoted fields
            # before the reader sees them.
            with open(input_path, mode="r", newline="") as infile:
                writer.writerows(csv.DictReader(infile))
| |
|