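# NOTE: the next three lines look like hosting-page residue captured with the
# file (uploader / commit header), not program text; kept here as comments.
#   Eji-Sensei14's picture
#   Upload folder using huggingface_hub
#   c6535db verified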
# layerstyle advance
import numpy as np
import os
import torch
from PIL import Image
from transformers import AutoModelForCausalLM, AutoProcessor, AutoTokenizer, pipeline
import folder_paths
from .imagefunc import log, clear_memory
# Base directory for local LLM checkpoints: the 'LLM' sub-folder of
# folder_paths.models_dir. load_phi_model() joins a model name onto this path.
model_path = os.path.join(folder_paths.models_dir, 'LLM')
class LS_PhiModel:
    """Container pairing a Phi model's load settings with its loaded parts.

    ``name``, ``device`` and ``dtype`` record how the model was requested;
    ``model``, ``tokenizer`` and ``processor`` start out as ``None`` and are
    filled in later by whoever performs the actual loading.
    """

    def __init__(self, name, device, dtype):
        # Settings that identify this load request.
        self.name, self.device, self.dtype = name, device, dtype
        # Loaded components; nothing is loaded at construction time.
        self.model = self.tokenizer = self.processor = None
class LS_Phi_Prompt:
    """Node that produces text with a locally stored Phi-3.5 model.

    The node is described to its host through ``INPUT_TYPES``, ``FUNCTION``,
    ``RETURN_TYPES`` and ``RETURN_NAMES``. Two checkpoints are supported:
    the text-only "Phi-3.5-mini-instruct" and the image-aware
    "Phi-3.5-vision-instruct"; "auto" picks between them depending on
    whether an image was supplied.
    """

    CATEGORY = '😺dzNodes/LayerUtility'
    FUNCTION = "phi_prompt"
    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)

    # Checkpoint folder names expected under ``model_path``.
    _TEXT_MODEL = "Phi-3.5-mini-instruct"
    _VISION_MODEL = "Phi-3.5-vision-instruct"

    def __init__(self):
        self.NODE_NAME = 'Phi Prompt'
        # Empty placeholder means "nothing cached yet".
        self.previous_model = LS_PhiModel("", "", "")

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input schema (required and optional sockets/widgets)."""
        phi_model_list = ["auto", cls._TEXT_MODEL, cls._VISION_MODEL]
        device_list = ['cuda', 'cpu']
        dtype_list = ['fp16', 'bf16', 'fp32']
        return {
            "required": {
                "model": (phi_model_list,),
                "device": (device_list,),
                "dtype": (dtype_list,),
                "cache_model": ("BOOLEAN", {"default": False}),
                "system_prompt": ("STRING", {"default": "You are a helpful AI assistant.", "multiline": False}),
                "user_prompt": ("STRING", {"default": "Describe this image", "multiline": True}),
                "do_sample": ("BOOLEAN", {"default": True}),
                "temperature": ("FLOAT", {"default": 0.5, "min": 0.01, "max": 1, "step": 0.01}),
                "max_new_tokens": ("INT", {"default": 512, "min": 8, "max": 4096, "step": 1}),
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    def phi_prompt(self, model, device, dtype, cache_model,
                   system_prompt, user_prompt, do_sample,
                   temperature, max_new_tokens, image=None):
        """Generate a text response and return it as a one-element tuple.

        Args:
            model: "auto", "Phi-3.5-mini-instruct" or "Phi-3.5-vision-instruct".
            device: Device string handed to the loaders ('cuda' or 'cpu').
            dtype: 'fp16', 'bf16' or 'fp32'.
            cache_model: Keep the loaded model on this node for the next call.
            system_prompt: System message; only used on the text-only path.
            user_prompt: User message.
            do_sample: Sample instead of greedy decoding.
            temperature: Sampling temperature; only forwarded when sampling.
            max_new_tokens: Upper bound on generated tokens.
            image: Optional image tensor; required for the vision model.

        Returns:
            ``(text,)`` with the stripped response, or ``("",)`` after an
            error has been logged (missing image, unsupported model name,
            or a model that failed to load).
        """
        if model == self._TEXT_MODEL or (model == "auto" and image is None):
            model_name = self._TEXT_MODEL
        elif model == self._VISION_MODEL or (model == "auto" and image is not None):
            if image is None:
                log(f"{self.NODE_NAME} input is vision model but image is None.", message_type="error")
                return ("",)
            model_name = self._VISION_MODEL
        else:
            # Previously this fell through to an UnboundLocalError.
            log(f"{self.NODE_NAME} unsupported model '{model}'.", message_type="error")
            return ("",)

        is_text = model_name == self._TEXT_MODEL
        phi_model = self._get_model(model_name, device, dtype)

        # load_phi_model() only logs on failure, so verify the parts we need
        # before using them; never cache or run a half-loaded model.
        companion = phi_model.tokenizer if is_text else phi_model.processor
        if phi_model.model is None or companion is None:
            log(f"{self.NODE_NAME} cannot run because {model_name} is not loaded.", message_type="error")
            self.previous_model = LS_PhiModel("", "", "")
            del phi_model, companion
            clear_memory()
            return ("",)
        del companion

        if is_text:
            response = self._generate_text(phi_model, system_prompt, user_prompt,
                                           do_sample, temperature, max_new_tokens)
        else:
            response = self._generate_vision(phi_model, device, user_prompt, image,
                                             do_sample, temperature, max_new_tokens)

        log(f"{self.NODE_NAME} processed successfully.", message_type="finish")

        if cache_model:
            self.previous_model = phi_model
        else:
            self.previous_model = LS_PhiModel("", "", "")
            del phi_model
            clear_memory()

        return (response.strip(),)

    def _get_model(self, name, device, dtype):
        """Return the cached model if its settings match, else load a new one."""
        cached = self.previous_model
        if (cached.name, cached.device, cached.dtype) == (name, device, dtype):
            return cached
        # Release the stale entry before loading so both models are not
        # held in memory at the same time.
        self.previous_model = LS_PhiModel("", "", "")
        del cached
        return self.load_phi_model(name, device, dtype)

    def _generate_text(self, phi_model, system_prompt, user_prompt,
                       do_sample, temperature, max_new_tokens):
        """Run the text-only model through a text-generation pipeline."""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        pipe = pipeline("text-generation", model=phi_model.model, tokenizer=phi_model.tokenizer)
        generation_args = {
            "return_full_text": False,
            "do_sample": do_sample,
            "max_new_tokens": max_new_tokens,
        }
        # Temperature only applies when sampling; mirrors the vision path.
        if do_sample:
            generation_args["temperature"] = temperature
        output = pipe(messages, **generation_args)
        return output[0]["generated_text"]

    def _generate_vision(self, phi_model, device, user_prompt, image,
                         do_sample, temperature, max_new_tokens):
        """Run the vision model on the image batch plus the user prompt.

        Note: only a single user message is built here; no system message
        is included on this path.
        """
        images = self.tensor2batch_pil(image)
        # One "<|image_N|>" placeholder (1-based) per image, ahead of the prompt.
        placeholder = "".join(f"<|image_{index}|>\n" for index in range(1, len(images) + 1))
        messages = [{"role": "user", "content": placeholder + user_prompt}]

        tokenizer = phi_model.processor.tokenizer
        prompt = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        inputs = phi_model.processor(prompt, images, return_tensors="pt").to(device)

        generate_args = {"do_sample": do_sample}
        if do_sample:
            generate_args["temperature"] = temperature

        generate_ids = phi_model.model.generate(
            **inputs,
            eos_token_id=tokenizer.eos_token_id,
            max_new_tokens=max_new_tokens,
            **generate_args
        )
        # Drop the prompt tokens so only newly generated text is decoded.
        generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:]
        return phi_model.processor.batch_decode(
            generate_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False
        )[0]

    def load_phi_model(self, model, device, dtype):
        """Load ``model`` from ``model_path`` and return it as an LS_PhiModel.

        Load errors are logged, not raised; in that case (or for an unknown
        ``model`` name) the returned object still has ``None`` components,
        so callers must check before use.
        """
        phi_model = LS_PhiModel(model, device, dtype)
        model_dir = os.path.join(model_path, model)
        # Anything other than 'fp16' / 'bf16' falls back to float32.
        torch_dtype = {'fp16': torch.float16, 'bf16': torch.bfloat16}.get(dtype, torch.float32)
        clear_memory()

        if model == self._TEXT_MODEL:
            try:
                phi_model.model = AutoModelForCausalLM.from_pretrained(
                    pretrained_model_name_or_path=model_dir,
                    device_map=device,
                    torch_dtype=torch_dtype,
                    trust_remote_code=True
                )
                phi_model.tokenizer = AutoTokenizer.from_pretrained(
                    model_dir,
                )
            except Exception as e:
                log(f"{self.NODE_NAME} failed to load {model}. Error: {e}", message_type="error")
        elif model == self._VISION_MODEL:
            try:
                phi_model.model = AutoModelForCausalLM.from_pretrained(
                    model_dir,
                    device_map=device,
                    trust_remote_code=True,
                    torch_dtype=torch_dtype,
                    # Alternative left disabled here: "flash_attention_2".
                    _attn_implementation="eager"
                )
                # For best performance, use num_crops=4 for multi-frame, num_crops=16 for single-frame.
                phi_model.processor = AutoProcessor.from_pretrained(
                    model_dir,
                    trust_remote_code=True,
                    num_crops=16
                )
            except Exception as e:
                log(f"{self.NODE_NAME} failed to load {model}. Error: {e}", message_type="error")
        return phi_model

    def tensor2batch_pil(self, image):
        """Convert an image tensor into a list of PIL images.

        A tensor with more than three dimensions is treated as a batch along
        its first axis; otherwise it is treated as a single image.
        Assumes float values scaled to [0, 1] — TODO confirm against callers.
        """
        frames = image if len(image.shape) > 3 else [image]
        return [self._tensor2pil(frame) for frame in frames]

    @staticmethod
    def _tensor2pil(frame):
        """Convert one image tensor to PIL: scale by 255, clip, cast to uint8."""
        pixels = np.clip(255.0 * frame.cpu().numpy().squeeze(), 0, 255).astype(np.uint8)
        return Image.fromarray(pixels)
# Registration tables exported by this module: node identifier -> implementing
# class, and the same identifier -> human-readable display name.
# NOTE(review): presumably consumed by the host's node loader — confirm.
NODE_CLASS_MAPPINGS = {
"LayerUtility: PhiPrompt": LS_Phi_Prompt
}
NODE_DISPLAY_NAME_MAPPINGS = {
"LayerUtility: PhiPrompt": "LayerUtility: Phi Prompt(Advance)"
}