import torch from einops import rearrange from torch import Tensor from typing import Optional, List import os # ============================================================ # Flash Attention 3 Support (for Hopper GPUs: H100/H200) # ============================================================ # Default ON for ZeroGPU (H200), set USE_FA3=0 to disable _USE_FA3 = os.environ.get("USE_FA3", "1") == "1" _flash_attn_func = None if _USE_FA3: try: from kernels import get_kernel _fa3_kernel = get_kernel("kernels-community/vllm-flash-attn3") _flash_attn_func_raw = _fa3_kernel.flash_attn_func @torch.library.custom_op("flash::flash_attn_func", mutates_args=()) def _flash_attn_func( q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, softmax_scale: Optional[float] = None, causal: bool = False, ) -> torch.Tensor: outputs = _flash_attn_func_raw(q, k, v, softmax_scale=softmax_scale, causal=causal) return outputs[0] @_flash_attn_func.register_fake def _(q, k, v, **kwargs): return torch.empty_like(q).contiguous() print("✓ Flash Attention 3 loaded successfully!") except Exception as e: print(f"Flash Attention 3 not available: {e}") _USE_FA3 = False def attention(q: Tensor, k: Tensor, v: Tensor, pe: Tensor) -> Tensor: q, k = apply_rope(q, k, pe) if _USE_FA3 and _flash_attn_func is not None: # FA3 expects (B, L, H, D) format q_fa3 = rearrange(q, "B H L D -> B L H D") k_fa3 = rearrange(k, "B H L D -> B L H D") v_fa3 = rearrange(v, "B H L D -> B L H D") x = _flash_attn_func(q_fa3, k_fa3, v_fa3) x = rearrange(x, "B L H D -> B L (H D)") else: # Standard PyTorch SDPA (uses FA2 if available) x = torch.nn.functional.scaled_dot_product_attention(q, k, v) x = rearrange(x, "B H L D -> B L (H D)") return x def rope(pos: Tensor, dim: int, theta: int) -> Tensor: assert dim % 2 == 0 scale = torch.arange(0, dim, 2, dtype=torch.float64, device=pos.device) / dim omega = 1.0 / (theta**scale) out = torch.einsum("...n,d->...nd", pos, omega) out = torch.stack([torch.cos(out), -torch.sin(out), torch.sin(out), torch.cos(out)], dim=-1) out = rearrange(out, "b n d (i j) -> b n d i j", i=2, j=2) return out.float() def apply_rope(xq: Tensor, xk: Tensor, freqs_cis: Tensor) -> tuple[Tensor, Tensor]: xq_ = xq.float().reshape(*xq.shape[:-1], -1, 1, 2) xk_ = xk.float().reshape(*xk.shape[:-1], -1, 1, 2) xq_out = freqs_cis[..., 0] * xq_[..., 0] + freqs_cis[..., 1] * xq_[..., 1] xk_out = freqs_cis[..., 0] * xk_[..., 0] + freqs_cis[..., 1] * xk_[..., 1] return xq_out.reshape(*xq.shape).type_as(xq), xk_out.reshape(*xk.shape).type_as(xk)