3d_model / ylff /utils /training_profiler.py
Azan
Clean deployment build (Squashed)
7a87926
"""
Training profiler utilities for identifying bottlenecks.
Uses PyTorch profiler to analyze training performance.
"""
import logging
from pathlib import Path
from typing import Any, Dict, Optional
import torch
from torch.profiler import (
ProfilerActivity,
profile,
record_function,
schedule,
tensorboard_trace_handler,
)
logger = logging.getLogger(__name__)
class TrainingProfiler:
"""
Profiler for training loops.
Identifies bottlenecks in forward pass, backward pass, and data loading.
"""
def __init__(
self,
output_dir: Optional[Path] = None,
activities: Optional[list] = None,
record_shapes: bool = True,
profile_memory: bool = True,
with_stack: bool = False,
):
"""
Args:
output_dir: Directory to save profiling results
activities: Activities to profile (default: CUDA + CPU)
record_shapes: Record tensor shapes
profile_memory: Profile memory usage
with_stack: Record stack traces
"""
self.output_dir = Path(output_dir) if output_dir else None
if self.output_dir:
self.output_dir.mkdir(parents=True, exist_ok=True)
if activities is None:
activities = [ProfilerActivity.CUDA, ProfilerActivity.CPU]
self.activities = activities
self.record_shapes = record_shapes
self.profile_memory = profile_memory
self.with_stack = with_stack
self.profiler = None
self.trace_handler = None
if self.output_dir:
self.trace_handler = tensorboard_trace_handler(str(self.output_dir))
def start(self):
"""Start profiling."""
schedule_fn = schedule(
wait=1, # Wait 1 step before profiling
warmup=1, # Warmup for 1 step
active=3, # Profile for 3 steps
repeat=2, # Repeat 2 times
)
self.profiler = profile(
activities=self.activities,
schedule=schedule_fn,
record_shapes=self.record_shapes,
profile_memory=self.profile_memory,
with_stack=self.with_stack,
on_trace_ready=self.trace_handler,
)
self.profiler.start()
logger.info("Profiling started")
def stop(self):
"""Stop profiling and generate report."""
if self.profiler is None:
return
self.profiler.stop()
# Generate summary
if self.output_dir:
summary_path = self.output_dir / "profiler_summary.txt"
with open(summary_path, "w") as f:
f.write(
self.profiler.key_averages().table(
sort_by=(
"cuda_time_total" if torch.cuda.is_available() else "cpu_time_total"
),
row_limit=100,
)
)
logger.info(f"Profiler summary saved to {summary_path}")
logger.info("Profiling stopped")
def step(self):
"""Step profiler (call at each training step)."""
if self.profiler:
self.profiler.step()
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def profile_training_step(
model: torch.nn.Module,
loss_fn: callable,
optimizer: torch.optim.Optimizer,
sample_batch: Dict,
device: str = "cuda",
output_dir: Optional[Path] = None,
) -> Dict[str, Any]:
"""
Profile a single training step.
Args:
model: Model to profile
loss_fn: Loss function
optimizer: Optimizer
sample_batch: Sample batch of data
device: Device to run on
output_dir: Directory to save results
Returns:
Dict with profiling results
"""
activities = [ProfilerActivity.CPU]
if device == "cuda" and torch.cuda.is_available():
activities.append(ProfilerActivity.CUDA)
with profile(
activities=activities,
record_shapes=True,
profile_memory=True,
with_stack=True,
) as prof:
with record_function("forward"):
# Forward pass
output = model(sample_batch["images"].to(device))
loss = loss_fn(output, sample_batch["targets"].to(device))
with record_function("backward"):
# Backward pass
loss.backward()
with record_function("optimizer_step"):
# Optimizer step
optimizer.step()
optimizer.zero_grad()
# Get results
results = {
"forward_time_ms": 0,
"backward_time_ms": 0,
"optimizer_time_ms": 0,
"total_time_ms": 0,
"memory_allocated_mb": 0,
"memory_reserved_mb": 0,
}
# Parse profiler output
key_averages = prof.key_averages()
for event in key_averages:
if "forward" in event.key:
results["forward_time_ms"] += (
event.cuda_time_total if device == "cuda" else event.cpu_time_total
)
elif "backward" in event.key:
results["backward_time_ms"] += (
event.cuda_time_total if device == "cuda" else event.cpu_time_total
)
elif "optimizer" in event.key:
results["optimizer_time_ms"] += (
event.cuda_time_total if device == "cuda" else event.cpu_time_total
)
# Convert to milliseconds
if device == "cuda":
results["forward_time_ms"] /= 1000
results["backward_time_ms"] /= 1000
results["optimizer_time_ms"] /= 1000
results["total_time_ms"] = (
results["forward_time_ms"] + results["backward_time_ms"] + results["optimizer_time_ms"]
)
# Memory stats
if device == "cuda" and torch.cuda.is_available():
results["memory_allocated_mb"] = torch.cuda.memory_allocated() / 1024 / 1024
results["memory_reserved_mb"] = torch.cuda.memory_reserved() / 1024 / 1024
# Save detailed table
if output_dir:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
table_path = output_dir / "profiler_table.txt"
with open(table_path, "w") as f:
f.write(
prof.key_averages().table(
sort_by="cuda_time_total" if device == "cuda" else "cpu_time_total",
row_limit=50,
)
)
logger.info(f"Profiling results saved to {output_dir}")
return results
def analyze_bottlenecks(profiler_output: str) -> Dict[str, Any]:
"""
Analyze profiler output to identify bottlenecks.
Args:
profiler_output: Profiler table output as string
Returns:
Dict with bottleneck analysis
"""
lines = profiler_output.split("\n")
bottlenecks = {
"slowest_operations": [],
"memory_hotspots": [],
"recommendations": [],
}
# Parse table (simplified - in practice, use proper parsing)
for line in lines:
if "forward" in line.lower() and "backward" not in line.lower():
bottlenecks["recommendations"].append(
"Consider gradient checkpointing for forward pass"
)
if "data_loader" in line.lower() or "dataloader" in line.lower():
bottlenecks["recommendations"].append(
"Data loading may be a bottleneck - increase num_workers"
)
if "memory" in line.lower() and "high" in line.lower():
bottlenecks["recommendations"].append(
"High memory usage - consider gradient checkpointing or smaller batch size"
)
return bottlenecks