import base64
import copy
import os
import sys
import tempfile
from typing import Any, Dict

import torch
from huggingface_hub import snapshot_download
from peft import PeftModel

DEFAULT_PROMPT = "Please describe the audio in detail."


def _log(msg: str) -> None:
    print(f"[AF3 NVIDIA handler] {msg}", flush=True)


def _env_true(name: str, default: bool = False) -> bool:
    raw = os.getenv(name)
    if raw is None:
        return default
    return str(raw).strip().lower() in {"1", "true", "yes", "on"}
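
# Illustrative parses (hypothetical values): _env_true("AF3_NV_LOAD_THINK")
# returns True for "1", "true", "Yes", or " on " (case- and whitespace-insensitive),
# and falls back to `default` when the variable is unset.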


def _strip_state_dict_prefixes(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    for key, value in state_dict.items():
        key2 = key[len("model."):] if key.startswith("model.") else key
        out[key2] = value
    return out
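
# Example of the intended mapping (hypothetical keys, sketched for clarity):
#   {"model.llm.lm_head.weight": t, "mm_projector.bias": u}
#   -> {"llm.lm_head.weight": t, "mm_projector.bias": u}
# Trainer checkpoints often wrap the module and prefix every parameter name
# with "model."; stripping it lets load_state_dict line up with the bare model.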


class EndpointHandler:
    """
    NVIDIA AF3 stack endpoint handler (closely mirrors the AF3 Space architecture).

    Request:
        {
            "inputs": {
                "prompt": "...",
                "audio_base64": "...",
                "think_mode": true,
                "max_new_tokens": 2048,
                "temperature": 0.2
            }
        }

    Response:
        {"generated_text": "...", "mode": "think|single"}
    """

    def __init__(self, model_dir: str = ""):
        # Everything is resolved from environment variables, so the positional
        # model_dir that Inference Endpoints passes in is intentionally unused.
        del model_dir
        self.hf_token = os.getenv("HF_TOKEN", "")
        self.code_repo_id = os.getenv("AF3_NV_CODE_REPO_ID", "nvidia/audio-flamingo-3")
        self.model_repo_id = os.getenv("AF3_NV_MODEL_REPO_ID", "nvidia/audio-flamingo-3")
        self.code_repo_type = os.getenv("AF3_NV_CODE_REPO_TYPE", "space")
        self.model_repo_type = os.getenv("AF3_NV_MODEL_REPO_TYPE", "model")
        self.default_mode = os.getenv("AF3_NV_DEFAULT_MODE", "think").strip().lower()
        if self.default_mode not in {"think", "single"}:
            self.default_mode = "think"

        self.load_think = _env_true("AF3_NV_LOAD_THINK", True)
        self.load_single = _env_true("AF3_NV_LOAD_SINGLE", self.default_mode == "single")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        _log(f"torch={torch.__version__} cuda={torch.cuda.is_available()} device={self.device}")
        _log(
            f"code_repo={self.code_repo_type}:{self.code_repo_id} "
            f"model_repo={self.model_repo_type}:{self.model_repo_id} default_mode={self.default_mode}"
        )

        self.llava = self._load_llava_runtime()
        self.model_root = self._download_model_root()

        self.model_single = None
        self.model_think = None

        if self.load_single:
            self.model_single = self._load_single_model()
        if self.load_think:
            self.model_think = self._load_think_model()

        if self.model_single is None and self.model_think is None:
            raise RuntimeError("No model loaded. Enable AF3_NV_LOAD_THINK or AF3_NV_LOAD_SINGLE.")

    def _load_llava_runtime(self):
        # Download only the llava/ package from the code repo and make it
        # importable; the AF3 runtime ships as repo code, not as a pip package.
        code_root = snapshot_download(
            repo_id=self.code_repo_id,
            repo_type=self.code_repo_type,
            allow_patterns=["llava/**"],
            token=self.hf_token or None,
        )
        if code_root not in sys.path:
            sys.path.insert(0, code_root)
        import llava

        _log(f"Loaded llava runtime from {code_root}")
        return llava

    def _download_model_root(self) -> str:
        model_root = snapshot_download(
            repo_id=self.model_repo_id,
            repo_type=self.model_repo_type,
            token=self.hf_token or None,
        )
        _log(f"Model root: {model_root}")
        return model_root

    def _load_single_model(self):
        _log("Loading single-turn model...")
        model = self.llava.load(self.model_root, model_base=None)
        model = model.to(self.device)
        model.eval()
        return model

    def _load_think_model(self):
        _log("Loading think/long model (stage35 adapter)...")
        stage35_dir = os.path.join(self.model_root, "stage35")
        non_lora_path = os.path.join(stage35_dir, "non_lora_trainables.bin")
        if not os.path.exists(non_lora_path):
            raise RuntimeError(f"stage35 non_lora_trainables missing: {non_lora_path}")

        # Three-step composition: base weights, then the non-LoRA trainables
        # from stage35, then the LoRA adapter itself via PEFT.
        model = self.llava.load(self.model_root, model_base=None)
        model = model.to(self.device)

        non_lora_trainables = torch.load(non_lora_path, map_location="cpu")
        non_lora_trainables = _strip_state_dict_prefixes(non_lora_trainables)
        # strict=False: the checkpoint covers only a subset of the parameters.
        model.load_state_dict(non_lora_trainables, strict=False)

        dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model = PeftModel.from_pretrained(
            model,
            stage35_dir,
            device_map="auto" if torch.cuda.is_available() else None,
            torch_dtype=dtype,
        )
        model.eval()
        return model

    def _select_model(self, think_mode: bool):
        # Prefer the requested mode; fall back to whichever model is loaded.
        if think_mode and self.model_think is not None:
            return self.model_think, "think"
        if (not think_mode) and self.model_single is not None:
            return self.model_single, "single"
        if self.model_think is not None:
            return self.model_think, "think"
        return self.model_single, "single"

    def _build_generation_config(self, model, max_new_tokens: int, temperature: float):
        base_cfg = getattr(model, "default_generation_config", None)
        if base_cfg is None:
            return None
        cfg = copy.deepcopy(base_cfg)
        if max_new_tokens > 0:
            cfg.max_new_tokens = int(max_new_tokens)
        if temperature > 0:
            cfg.temperature = float(temperature)
            cfg.do_sample = True
        else:
            # temperature <= 0 is treated as a request for greedy decoding.
            cfg.do_sample = False
        return cfg
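
    # Worked example (illustrative values): a request with temperature=0.2 and
    # max_new_tokens=2048 yields cfg.do_sample=True, cfg.temperature=0.2,
    # cfg.max_new_tokens=2048; temperature=0 flips do_sample to False (greedy).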

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        payload = data.get("inputs", data) if isinstance(data, dict) else {}
        if not isinstance(payload, dict):
            return {"error": "inputs must be a JSON object"}
        audio_b64 = payload.get("audio_base64")
        if not audio_b64:
            return {"error": "audio_base64 is required"}

        prompt = str(payload.get("prompt", DEFAULT_PROMPT)).strip() or DEFAULT_PROMPT
        think_mode_val = payload.get("think_mode")
        if think_mode_val is None:
            think_mode = self.default_mode == "think"
        else:
            think_mode = bool(think_mode_val)

        max_new_tokens = int(payload.get("max_new_tokens", 2048))
        temperature = float(payload.get("temperature", 0.2))
        model, mode = self._select_model(think_mode)

        # Decode up front so a malformed payload surfaces as a JSON error
        # instead of an unhandled exception (and never leaves a temp file behind).
        try:
            audio_bytes = base64.b64decode(audio_b64)
        except Exception as exc:
            return {"error": f"invalid audio_base64: {exc}", "mode": mode}

        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(audio_bytes)

        try:
            sound = self.llava.Sound(tmp_path)
            full_prompt = f"<sound>\n{prompt}"
            gen_cfg = self._build_generation_config(model, max_new_tokens=max_new_tokens, temperature=temperature)

            with torch.inference_mode():
                if gen_cfg is not None:
                    response = model.generate_content([sound, full_prompt], generation_config=gen_cfg)
                else:
                    response = model.generate_content([sound, full_prompt])
            return {"generated_text": str(response).strip(), "mode": mode}
        except Exception as exc:
            return {"error": str(exc), "mode": mode}
        finally:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass
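

# A minimal local smoke test, sketched under assumptions: it expects a short
# WAV file at ./sample.wav (hypothetical path) and the env vars above to point
# at reachable repos. Illustration only, not part of the serving contract.
if __name__ == "__main__":
    with open("sample.wav", "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    handler = EndpointHandler()
    result = handler({"inputs": {"prompt": DEFAULT_PROMPT, "audio_base64": b64, "think_mode": True}})
    print(result)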