| |
|
| |
|
| | import argparse
|
| | import os
|
| | import sys
|
| | import importlib
|
| | import torch
|
| | import numpy as np
|
| |
|
| | from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForImageTextToText, AutoConfig
|
| |
|
| |
|
| | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
| | from utils.common import debug_hook, save_output_data
|
| |
|
| | def parse_arguments():
|
| | parser = argparse.ArgumentParser(description="Process model with specified path")
|
| | parser.add_argument("--model-path", "-m", help="Path to the model")
|
| | parser.add_argument("--prompt-file", "-f", help="Optional prompt file", required=False)
|
| | parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose debug output")
|
| | parser.add_argument("--device", "-d", help="Device to use (cpu, cuda, mps, auto)", default="auto")
|
| | return parser.parse_args()
|
| |
|
| | def load_model_and_tokenizer(model_path, device="auto"):
|
| | print("Loading model and tokenizer using AutoTokenizer:", model_path)
|
| | tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
|
| | config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
|
| | multimodal = False
|
| | full_config = config
|
| |
|
| |
|
| | if device == "cpu":
|
| | device_map = {"": "cpu"}
|
| | print("Forcing CPU usage")
|
| | elif device == "auto":
|
| | device_map = "auto"
|
| | else:
|
| | device_map = {"": device}
|
| |
|
| | print("Model type: ", config.model_type)
|
| | if "vocab_size" not in config and "text_config" in config:
|
| | config = config.text_config
|
| | multimodal = True
|
| |
|
| | def print_if_exists(label, obj, attr, default="N/A"):
|
| | val = getattr(obj, attr) if hasattr(obj, attr) else default
|
| | print(f"{label}", val)
|
| |
|
| | print_if_exists("Vocab size: ", config, "vocab_size")
|
| | print_if_exists("Hidden size: ", config, "hidden_size")
|
| | print_if_exists("Number of layers: ", config, "num_hidden_layers")
|
| | print_if_exists("BOS token id: ", config, "bos_token_id")
|
| | print_if_exists("EOS token id: ", config, "eos_token_id")
|
| |
|
| | unreleased_model_name = os.getenv("UNRELEASED_MODEL_NAME")
|
| | if unreleased_model_name:
|
| | model_name_lower = unreleased_model_name.lower()
|
| | unreleased_module_path = (
|
| | f"transformers.models.{model_name_lower}.modular_{model_name_lower}"
|
| | )
|
| | class_name = f"{unreleased_model_name}ForCausalLM"
|
| | print(f"Importing unreleased model module: {unreleased_module_path}")
|
| |
|
| | try:
|
| | model_class = getattr(importlib.import_module(unreleased_module_path), class_name)
|
| | model = model_class.from_pretrained(
|
| | model_path,
|
| | device_map=device_map,
|
| | offload_folder="offload",
|
| | trust_remote_code=True,
|
| | config=config
|
| | )
|
| | except (ImportError, AttributeError) as e:
|
| | print(f"Failed to import or load model: {e}")
|
| | exit(1)
|
| | else:
|
| | if multimodal:
|
| | model = AutoModelForImageTextToText.from_pretrained(
|
| | model_path,
|
| | device_map=device_map,
|
| | offload_folder="offload",
|
| | trust_remote_code=True,
|
| | config=full_config
|
| | )
|
| | else:
|
| | model = AutoModelForCausalLM.from_pretrained(
|
| | model_path,
|
| | device_map=device_map,
|
| | offload_folder="offload",
|
| | trust_remote_code=True,
|
| | config=config
|
| | )
|
| |
|
| | print(f"Model class: {model.__class__.__name__}")
|
| |
|
| | return model, tokenizer, config
|
| |
|
| | def enable_torch_debugging(model):
|
| | for name, module in model.named_modules():
|
| | if len(list(module.children())) == 0:
|
| | module.register_forward_hook(debug_hook(name))
|
| |
|
| | def get_prompt(args):
|
| | if args.prompt_file:
|
| | with open(args.prompt_file, encoding='utf-8') as f:
|
| | return f.read()
|
| | elif os.getenv("MODEL_TESTING_PROMPT"):
|
| | return os.getenv("MODEL_TESTING_PROMPT")
|
| | else:
|
| | return "Hello, my name is"
|
| |
|
| | def main():
|
| | args = parse_arguments()
|
| | model_path = os.environ.get("MODEL_PATH", args.model_path)
|
| | if model_path is None:
|
| | print("Error: Model path must be specified either via --model-path argument or MODEL_PATH environment variable")
|
| | sys.exit(1)
|
| |
|
| |
|
| | model, tokenizer, config = load_model_and_tokenizer(model_path, args.device)
|
| |
|
| | if args.verbose:
|
| | enable_torch_debugging(model)
|
| |
|
| | model_name = os.path.basename(model_path)
|
| |
|
| |
|
| |
|
| | device = next(model.parameters()).device
|
| | prompt = get_prompt(args)
|
| | input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
|
| | token_ids = input_ids[0].cpu().tolist()
|
| |
|
| | print(f"Input tokens: {input_ids}")
|
| | print(f"Input text: {repr(prompt)}")
|
| | print(f"Tokenized: {tokenizer.convert_ids_to_tokens(input_ids[0])}")
|
| |
|
| | batch_size = 512
|
| |
|
| | with torch.no_grad():
|
| | past = None
|
| | outputs = None
|
| | for i in range(0, input_ids.size(1), batch_size):
|
| | print(f"Processing chunk with tokens {i} to {i + batch_size}")
|
| | chunk = input_ids[:, i:i + batch_size]
|
| | outputs = model(chunk.to(model.device), past_key_values=past, use_cache=True)
|
| | past = outputs.past_key_values
|
| |
|
| | logits = outputs.logits
|
| |
|
| |
|
| | last_logits = logits[0, -1, :].float().cpu().numpy()
|
| |
|
| | print(f"Logits shape: {logits.shape}")
|
| | print(f"Last token logits shape: {last_logits.shape}")
|
| | print(f"Vocab size: {len(last_logits)}")
|
| |
|
| |
|
| | print(f"First 10 logits: {last_logits[:10]}")
|
| | print(f"Last 10 logits: {last_logits[-10:]}")
|
| |
|
| |
|
| | top_indices = np.argsort(last_logits)[-5:][::-1]
|
| | print("Top 5 predictions:")
|
| | for idx in top_indices:
|
| | token = tokenizer.decode([idx])
|
| | print(f" Token {idx} ({repr(token)}): {last_logits[idx]:.6f}")
|
| |
|
| | save_output_data(last_logits, token_ids, prompt, model_name)
|
| |
|
| | if __name__ == "__main__":
|
| | main()
|
| |
|