"""
Training profiler utilities for identifying bottlenecks.
Uses PyTorch profiler to analyze training performance.
"""
import logging
from pathlib import Path
from typing import Any, Dict, Optional
import torch
from torch.profiler import (
ProfilerActivity,
profile,
record_function,
schedule,
tensorboard_trace_handler,
)
# Module-level logger, named after this module, used by the profiling helpers below.
logger = logging.getLogger(__name__)
class TrainingProfiler:
    """
    Profiler for training loops.

    Thin wrapper around ``torch.profiler.profile`` driven by a step schedule.
    Call :meth:`start` before the loop, :meth:`step` once per training step and
    :meth:`stop` afterwards, or use the instance as a context manager.

    When ``output_dir`` is given, traces are handed to the TensorBoard trace
    handler and a text summary (``profiler_summary.txt``) is written on stop.
    """

    def __init__(
        self,
        output_dir: Optional[Path] = None,
        activities: Optional[list] = None,
        record_shapes: bool = True,
        profile_memory: bool = True,
        with_stack: bool = False,
        wait: int = 1,
        warmup: int = 1,
        active: int = 3,
        repeat: int = 2,
    ):
        """
        Args:
            output_dir: Directory to save profiling results (created if missing).
            activities: Activities to profile. Defaults to CPU, plus CUDA when
                a CUDA device is available.
            record_shapes: Record tensor shapes.
            profile_memory: Profile memory usage.
            with_stack: Record stack traces.
            wait: Steps to skip at the start of each schedule cycle.
            warmup: Warm-up steps per cycle (traced but discarded).
            active: Recorded steps per cycle.
            repeat: Number of schedule cycles.
        """
        self.output_dir = Path(output_dir) if output_dir else None
        if self.output_dir:
            self.output_dir.mkdir(parents=True, exist_ok=True)

        if activities is None:
            # Only request CUDA when it can actually be profiled; this mirrors
            # the availability check done by profile_training_step.
            activities = [ProfilerActivity.CPU]
            if torch.cuda.is_available():
                activities.insert(0, ProfilerActivity.CUDA)
        # Copy so later mutation of the caller's list cannot affect us.
        self.activities = list(activities)

        self.record_shapes = record_shapes
        self.profile_memory = profile_memory
        self.with_stack = with_stack

        self.wait = wait
        self.warmup = warmup
        self.active = active
        self.repeat = repeat

        self.profiler = None
        self.trace_handler = None
        if self.output_dir:
            self.trace_handler = tensorboard_trace_handler(str(self.output_dir))

    def start(self):
        """Start profiling. Does nothing if a profiling session is already running."""
        if self.profiler is not None:
            # Overwriting the handle would leave the previous profiler running.
            logger.warning("Profiler already running; ignoring start()")
            return

        schedule_fn = schedule(
            wait=self.wait,
            warmup=self.warmup,
            active=self.active,
            repeat=self.repeat,
        )
        self.profiler = profile(
            activities=self.activities,
            schedule=schedule_fn,
            record_shapes=self.record_shapes,
            profile_memory=self.profile_memory,
            with_stack=self.with_stack,
            on_trace_ready=self.trace_handler,
        )
        self.profiler.start()
        logger.info("Profiling started")

    def stop(self):
        """Stop profiling and, when ``output_dir`` is set, write a text summary."""
        if self.profiler is None:
            return

        # Clear the handle first so a repeated stop() (e.g. an explicit call
        # followed by __exit__) is a no-op instead of re-stopping the profiler.
        finished, self.profiler = self.profiler, None
        finished.stop()

        if self.output_dir:
            self._write_summary(finished)
        logger.info("Profiling stopped")

    def _write_summary(self, finished):
        """Write the key-averages table of a stopped profiler to ``output_dir``."""
        summary_path = self.output_dir / "profiler_summary.txt"
        # Sorting by CUDA time is only meaningful if CUDA was actually profiled.
        cuda_profiled = (
            ProfilerActivity.CUDA in self.activities and torch.cuda.is_available()
        )
        sort_key = "cuda_time_total" if cuda_profiled else "cpu_time_total"
        try:
            table = finished.key_averages().table(sort_by=sort_key, row_limit=100)
        except AssertionError:
            # torch.profiler asserts that a trace exists; none does when the
            # run ended before the schedule left its initial wait phase.
            logger.warning("No profiling data was recorded; summary not written")
            return
        with open(summary_path, "w", encoding="utf-8") as f:
            f.write(table)
        logger.info("Profiler summary saved to %s", summary_path)

    def step(self):
        """Advance the profiler schedule (call once per training step)."""
        if self.profiler:
            self.profiler.step()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
def profile_training_step(
    model: torch.nn.Module,
    loss_fn: callable,
    optimizer: torch.optim.Optimizer,
    sample_batch: Dict,
    device: str = "cuda",
    output_dir: Optional[Path] = None,
) -> Dict[str, Any]:
    """
    Profile a single training step (forward, backward, optimizer update).

    Args:
        model: Model to profile.
        loss_fn: Loss function, called as ``loss_fn(output, targets)``.
        optimizer: Optimizer; ``step()`` then ``zero_grad()`` are called on it.
        sample_batch: Batch dict with ``"images"`` and ``"targets"`` entries,
            both of which are moved to ``device``.
        device: Device to run on (e.g. ``"cpu"``, ``"cuda"``, ``"cuda:0"``).
        output_dir: Directory to save the profiler table to (created if missing).

    Returns:
        Dict with ``forward_time_ms``, ``backward_time_ms``,
        ``optimizer_time_ms``, ``total_time_ms`` (sum of the three phases) and
        ``memory_allocated_mb`` / ``memory_reserved_mb`` (0 unless running on
        an available CUDA device).
    """
    torch_device = torch.device(device)
    # Compare the device *type* so indexed strings such as "cuda:0" count too,
    # and only use CUDA timings when CUDA was actually profiled.
    use_cuda = torch_device.type == "cuda" and torch.cuda.is_available()

    activities = [ProfilerActivity.CPU]
    if use_cuda:
        activities.append(ProfilerActivity.CUDA)

    with profile(
        activities=activities,
        record_shapes=True,
        profile_memory=True,
        with_stack=True,
    ) as prof:
        with record_function("forward"):
            output = model(sample_batch["images"].to(device))
            loss = loss_fn(output, sample_batch["targets"].to(device))
        with record_function("backward"):
            loss.backward()
        with record_function("optimizer_step"):
            optimizer.step()
            optimizer.zero_grad()

    results = {
        "forward_time_ms": 0,
        "backward_time_ms": 0,
        "optimizer_time_ms": 0,
        "total_time_ms": 0,
        "memory_allocated_mb": 0,
        "memory_reserved_mb": 0,
    }

    # Match the three record_function labels exactly; substring matching could
    # also pick up unrelated ops whose names merely contain one of the words.
    phase_result_keys = {
        "forward": "forward_time_ms",
        "backward": "backward_time_ms",
        "optimizer_step": "optimizer_time_ms",
    }
    key_averages = prof.key_averages()
    for event in key_averages:
        result_key = phase_result_keys.get(event.key)
        if result_key is None:
            continue
        if use_cuda:
            # Newer PyTorch exposes device_time_total and deprecates
            # cuda_time_total; fall back for older releases.
            elapsed_us = getattr(event, "device_time_total", None)
            if elapsed_us is None:
                elapsed_us = event.cuda_time_total
        else:
            elapsed_us = event.cpu_time_total
        # Profiler totals are in microseconds for CPU and CUDA alike, so the
        # conversion to milliseconds must not depend on the device.
        results[result_key] += elapsed_us / 1000

    results["total_time_ms"] = (
        results["forward_time_ms"]
        + results["backward_time_ms"]
        + results["optimizer_time_ms"]
    )

    if use_cuda:
        bytes_per_mb = 1024 * 1024
        # Query the device that was used, not just the current default one.
        results["memory_allocated_mb"] = (
            torch.cuda.memory_allocated(torch_device) / bytes_per_mb
        )
        results["memory_reserved_mb"] = (
            torch.cuda.memory_reserved(torch_device) / bytes_per_mb
        )

    if output_dir:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        table_path = output_dir / "profiler_table.txt"
        table = key_averages.table(
            sort_by="cuda_time_total" if use_cuda else "cpu_time_total",
            row_limit=50,
        )
        with open(table_path, "w", encoding="utf-8") as f:
            f.write(table)
        logger.info("Profiling results saved to %s", output_dir)

    return results
def analyze_bottlenecks(profiler_output: str) -> Dict[str, Any]:
    """
    Scan profiler table text for keywords and derive tuning recommendations.

    This is a simple keyword heuristic, not a real parser of the table: each
    line is lower-cased and checked for forward-pass, data-loading and
    high-memory markers.

    Args:
        profiler_output: Profiler table output as string.

    Returns:
        Dict with ``slowest_operations`` and ``memory_hotspots`` (both always
        empty lists here) and ``recommendations``: each matching message
        listed once, in order of first appearance.
    """
    bottlenecks = {
        "slowest_operations": [],
        "memory_hotspots": [],
        "recommendations": [],
    }
    recommendations = bottlenecks["recommendations"]

    def recommend(message: str) -> None:
        # A real table has many matching rows; report each hint only once.
        if message not in recommendations:
            recommendations.append(message)

    for line in profiler_output.splitlines():
        lowered = line.lower()
        if "forward" in lowered and "backward" not in lowered:
            recommend("Consider gradient checkpointing for forward pass")
        if "data_loader" in lowered or "dataloader" in lowered:
            recommend("Data loading may be a bottleneck - increase num_workers")
        if "memory" in lowered and "high" in lowered:
            recommend(
                "High memory usage - consider gradient checkpointing or smaller batch size"
            )

    return bottlenecks