Spaces:
Sleeping
Sleeping
| import torch | |
| import torch.nn as nn | |
| import os | |
| from peft import PeftModel | |
| from PIL import Image | |
| import gradio as gr | |
| import librosa | |
| import nltk | |
| import re | |
| from transformers import PreTrainedModel | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from transformers import CLIPProcessor, CLIPModel | |
| from transformers import WhisperProcessor, WhisperForConditionalGeneration | |
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Tokenizer for the language-model checkpoint named by model_name.
model_name = "microsoft/Phi-3.5-mini-instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

# CLIP model and its matching processor, both from the same checkpoint.
clipmodel = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clipprocessor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# NLTK resources requested at import time (word_tokenize is used further down).
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource)
def remove_punctuation(text):
    """Strip every character that is neither alphanumeric nor whitespace.

    After filtering, runs of whitespace are collapsed to a single space and
    leading/trailing whitespace is dropped.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string.
    """
    kept = "".join(ch for ch in text if ch.isalnum() or ch.isspace())
    # split() with no argument breaks on any whitespace run and discards
    # empty pieces, so re-joining normalises the spacing.
    return " ".join(kept.split())
def preprocess_text(text):
    """Return ``text`` after cleaning it with ``remove_punctuation``."""
    return remove_punctuation(text)
# Whisper checkpoint used for speech transcription; the processor and the
# generation model are both loaded from the same checkpoint name.
whisper_model_name = "openai/whisper-small"
whisper_processor, whisper_model = (
    WhisperProcessor.from_pretrained(whisper_model_name),
    WhisperForConditionalGeneration.from_pretrained(whisper_model_name),
)
def transcribe_speech(audiopath):
    """Transcribe the audio file at ``audiopath`` with the Whisper model.

    The audio is loaded at 16 kHz, converted to model inputs by
    ``whisper_processor`` and decoded from the ids that
    ``whisper_model.generate`` produces.

    Args:
        audiopath: Path of the audio file to transcribe.

    Returns:
        The first decoded transcription, with special tokens skipped.
    """
    sampling_rate = 16000
    waveform, _ = librosa.load(audiopath, sr=sampling_rate)
    features = whisper_processor(waveform, return_tensors="pt", sampling_rate=sampling_rate)

    # Generation is inference only, so autograd bookkeeping is switched off.
    with torch.no_grad():
        token_ids = whisper_model.generate(features["input_features"])

    decoded = whisper_processor.batch_decode(token_ids, skip_special_tokens=True)
    return decoded[0]
class ProjectionBlock(nn.Module):
    """Layer-normalise the input, then map it through a two-layer MLP.

    Args:
        input_dim_CLIP: Size of the last dimension of the input.
        input_dim_phi2: Size of the last dimension of the output.
    """

    def __init__(self, input_dim_CLIP, input_dim_phi2):
        super().__init__()
        self.pre_norm = nn.LayerNorm(input_dim_CLIP)
        # Linear -> GELU -> Linear, wrapped in nn.Sequential so the parameter
        # names stay "proj.0.*" and "proj.2.*" for saved state dicts.
        layers = [
            nn.Linear(input_dim_CLIP, input_dim_phi2),
            nn.GELU(),
            nn.Linear(input_dim_phi2, input_dim_phi2),
        ]
        self.proj = nn.Sequential(*layers)

    def forward(self, x):
        """Return the MLP projection of the layer-normalised ``x``."""
        normalized = self.pre_norm(x)
        return self.proj(normalized)
# Wrapper that lets the Phi language model be used like a HuggingFace PreTrainedModel
class MultimodalPhiModel(PreTrainedModel):
    """Phi causal language model combined with an image-feature projection.

    The wrapper embeds two pieces of prompt token ids with the language
    model's own embedding layer, optionally places projected image features
    between them, and runs the language model on the concatenated embeddings.

    Attributes are documented where they are assigned in ``__init__``.
    """

    def __init__(self, phi_model, tokenizer, projection):
        """Store the language model, tokenizer and projection module.

        Args:
            phi_model: The language model; its ``config`` is reused as the
                wrapper's config.
            tokenizer: Tokenizer kept on the instance for callers.
            projection: Module applied to image features in ``encode``.
        """
        super().__init__(phi_model.config)
        self.phi_model = phi_model
        self.image_projection = projection
        self.tokenizer = tokenizer
        # Filled in by from_pretrained with the model object that was handed
        # to PeftModel.from_pretrained.
        self.base_phi_model = None

    def gradient_checkpointing_enable(self, **kwargs):
        """Forward the request to the wrapped language model."""
        self.phi_model.gradient_checkpointing_enable(**kwargs)

    def gradient_checkpointing_disable(self):
        """Forward the request to the wrapped language model."""
        self.phi_model.gradient_checkpointing_disable()

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, debug=False, **kwargs):
        """Build the wrapper from a directory holding an adapter and projector weights.

        This is an alternate constructor and must be callable on the class
        (``MultimodalPhiModel.from_pretrained(path)``), hence ``@classmethod``.

        Args:
            pretrained_model_name_or_path: Directory passed to
                ``PeftModel.from_pretrained`` and searched for
                ``image_projector.pth``.
            *model_args: Accepted for signature compatibility; not used.
            debug: Accepted for signature compatibility; not used.
            **kwargs: Accepted for signature compatibility; not used.

        Returns:
            A new instance whose ``base_phi_model`` attribute is set.
        """
        model_name = "microsoft/Phi-3.5-mini-instruct"
        base_phi_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
        )
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

        # The adapter is read directly from the given directory, then merged
        # so that a plain language model is returned.
        phi_path = pretrained_model_name_or_path
        peft_model = PeftModel.from_pretrained(base_phi_model, phi_path)
        phi_model = peft_model.merge_and_unload()

        input_dim = 512
        output_dim = 3072

        projector_path = os.path.join(pretrained_model_name_or_path, "image_projector.pth")
        if os.path.exists(projector_path):
            # NOTE(review): torch.load unpickles the file; only point this at
            # checkpoints from a trusted source.
            projector_state_dict = torch.load(projector_path, map_location=phi_model.device)
            projector = ProjectionBlock(input_dim, output_dim)
            # strict=False: missing or unexpected keys are tolerated.
            projector.load_state_dict(projector_state_dict, strict=False)
            print(f"Loaded projector with input_dim={input_dim}, output_dim={output_dim}")
        else:
            print(f"Projector weights not found at {projector_path}. Initializing with default dimensions.")
            input_dim = 512  # Default CLIP embedding size
            output_dim = phi_model.config.hidden_size
            projector = ProjectionBlock(input_dim, output_dim)

        model = cls(phi_model, tokenizer, projector)
        model.base_phi_model = base_phi_model
        return model

    def save_pretrained(self, save_directory):
        """Save the language model, the projector weights and the config.

        Args:
            save_directory: Target directory; the projector state dict is
                written there as ``image_projector.pth``.
        """
        # Save the wrapped language model
        self.phi_model.save_pretrained(save_directory)

        # Save the projector weights
        projector_path = os.path.join(save_directory, "image_projector.pth")
        torch.save(self.image_projection.state_dict(), projector_path)

        # Save the config
        self.config.save_pretrained(save_directory)

    def encode(self, image_features):
        """Return ``image_features`` passed through the image projection."""
        return self.image_projection(image_features)

    def forward(self, start_input_ids, end_input_ids, image_features, attention_mask, labels):
        """Run the language model on ``start + [image] + end`` embeddings.

        Args:
            start_input_ids: Token ids embedded and placed first.
            end_input_ids: Token ids embedded and placed last.
            image_features: Features to project and insert between the two
                text pieces, or None for a text-only input.
            attention_mask: Mask passed to the language model; the caller
                must build it to match the concatenated sequence.
            labels: Labels passed to the language model, or None.

        Returns:
            The language model's output object (``return_dict=True``).
        """
        device = next(self.parameters()).device
        embed_tokens = self.phi_model.get_input_embeddings()

        start_embeds = embed_tokens(start_input_ids.to(device))
        end_embeds = embed_tokens(end_input_ids.to(device))

        if image_features is not None:
            # Cast the projected features to the text embeddings' dtype so the
            # concatenation matches whatever precision the language model uses.
            image_embeddings = self.encode(image_features.to(device)).to(start_embeds.dtype)
            input_embeds = torch.cat([start_embeds, image_embeddings, end_embeds], dim=1)
        else:
            input_embeds = torch.cat([start_embeds, end_embeds], dim=1)

        # Forward pass through the language model
        outputs = self.phi_model(
            inputs_embeds=input_embeds.to(device),
            attention_mask=attention_mask.to(device),
            labels=labels.to(device) if labels is not None else None,
            return_dict=True,
        )
        return outputs
def getImageArray(image_path):
    """Open the image at ``image_path`` and return the PIL image object."""
    return Image.open(image_path)
def getAudioArray(audio_path):
    """Load the audio at ``audio_path`` at 16 kHz and return only the samples."""
    # librosa.load returns (samples, sampling_rate); the rate is not needed.
    return librosa.load(audio_path, sr=16000)[0]
# Prompt text that goes before the image embedding.
start_text = "<|system|> \n You are an assistant good at understanding the context.<|end|> \n <|user|> \n"
# Prompt text that goes after the image embedding.
end_text = "\n Describe the objects and their relationship in the given context.<|end|> \n <|assistant|> \n"
# Unique word tokens of both prompt pieces; getAnswerPart filters these out
# of decoded model output. The list order carries no meaning.
words = [*nltk.word_tokenize(start_text), *nltk.word_tokenize(end_text)]
input_words = list(set(words))
def getInputs(image_path, question, answer=""):
    """Build prompt token ids, optional CLIP image features and the attention mask.

    Args:
        image_path: Path of the image to embed, or None for a text-only prompt.
        question: Accepted for interface compatibility; it is not inserted
            into the prompt built here.
        answer: Text appended after the assistant tag of the closing prompt piece.

    Returns:
        Tuple ``(start_input_ids, end_input_ids, image_features, attention_mask)``.
        ``image_features`` is None when ``image_path`` is None. The attention
        mask covers the start tokens, the image positions (if any) and the
        end tokens, concatenated along dim 1.
    """
    image_features = None
    num_image_tokens = 0

    if image_path is not None:
        # Close the file handle once the processor has read the pixels.
        with Image.open(image_path) as image:
            clip_inputs = clipprocessor(images=image, return_tensors="pt")
        # Inference only: avoid building an autograd graph through CLIP.
        with torch.no_grad():
            image_features = clipmodel.get_image_features(**clip_inputs)
        # Add a leading dimension (same result as torch.stack on a one-item list).
        image_features = image_features.unsqueeze(0)
        num_image_tokens = image_features.shape[1]

    # Text placed before the image embedding
    start_text = "<|system|>\nYou are an assistant good at understanding the context.<|end|>\n<|user|>\n "
    # Text placed after the image embedding, ending with the answer so far
    end_text = f" .\n Describe the objects and their relationship from the context. <|end|>\n<|assistant|>\n {answer}"

    start_tokens = tokenizer(start_text, padding=True, truncation=True, max_length=512, return_tensors="pt")
    end_tokens = tokenizer(end_text, padding=True, truncation=True, max_length=512, return_tensors="pt")

    mask_parts = [start_tokens['attention_mask']]
    if image_path is not None:
        # Image positions are always attended to.
        mask_parts.append(torch.ones((1, num_image_tokens), dtype=torch.long))
    mask_parts.append(end_tokens['attention_mask'])
    attention_mask = torch.cat(mask_parts, dim=1)

    return start_tokens['input_ids'], end_tokens['input_ids'], image_features, attention_mask
# Directory that from_pretrained reads the adapter and image projector from.
model_location = "./MM_FT_C1_V2"
model = MultimodalPhiModel.from_pretrained(model_location)
model = model.to(device)

# Separately loaded Phi-3.5 checkpoint; generateOutput uses it for the final
# text generation step.
model_name = "microsoft/Phi-3.5-mini-instruct"
base_phi_model = AutoModelForCausalLM.from_pretrained(
    model_name, torch_dtype=torch.bfloat16, trust_remote_code=True
)
base_phi_model = base_phi_model.to(device)
def getStringAfter(output, start_str):
    """Return the cleaned text that follows the first ``start_str`` in ``output``.

    Everything after the first occurrence is kept, even if ``start_str``
    appears again later. When ``start_str`` is empty or does not occur, the
    whole of ``output`` is used.

    Args:
        output: Text to search.
        start_str: Marker whose first occurrence ends the part to discard.

    Returns:
        The selected text after ``preprocess_text`` has been applied.
    """
    if start_str:
        # partition splits on the first occurrence only, so a repeated marker
        # no longer cuts the answer short.
        _, marker, tail = output.partition(start_str)
        answer = tail if marker else output
    else:
        # An empty marker cannot be split on; keep the full text.
        answer = output
    return preprocess_text(answer)
def getAnswerPart(output):
    """Remove prompt words from ``output`` and return the remaining words.

    ``output`` is tokenised with ``nltk.word_tokenize``; any token whose
    lower-cased form matches a lower-cased entry of ``input_words`` is
    dropped. The surviving tokens are joined with single spaces.
    """
    # Build the lower-cased lookup once per call instead of once per token.
    prompt_words = {w.lower() for w in input_words}
    return ' '.join(
        word for word in nltk.word_tokenize(output) if word.lower() not in prompt_words
    )
def generateOutput(image_path, audio_path, context_text, question, max_length=3):
    """Answer ``question`` from an optional image, optional audio and context text.

    When an image is given, the multimodal model is run ``max_length`` times;
    each pass feeds the previous pass's filtered output back in as the answer
    so far. When audio is given it is transcribed. The image description,
    transcription and ``context_text`` are then placed in a prompt for
    ``base_phi_model``, which generates the final answer.

    Args:
        image_path: Path of the image, or None.
        audio_path: Path of the audio file, or None.
        context_text: Extra context inserted into the final prompt.
        question: Question to answer; a default is used when it is None or empty.
        max_length: Number of multimodal refinement passes (not a token count).

    Returns:
        The text after the question in the decoded generation, cleaned by
        ``getStringAfter`` and stripped.
    """
    answerPart = ""
    speech_text = ""

    if image_path is not None:
        for _ in range(max_length):
            start_tokens, end_tokens, image_features, attention_mask = getInputs(
                image_path, question, answer=answerPart
            )
            # Inference only: do not keep an autograd graph for each pass.
            with torch.no_grad():
                output = model(start_tokens, end_tokens, image_features, attention_mask, labels=None)
            # Greedy pick at every position, decoded as one string.
            tokens = output.logits.argmax(dim=-1)
            output = tokenizer.decode(
                tokens[0],
                skip_special_tokens=True
            )
            answerPart = getAnswerPart(output)
            print("Answerpart:", answerPart)

    if audio_path is not None:
        speech_text = transcribe_speech(audio_path)
        print("Speech Text:", speech_text)

    if not question:
        question = " Describe the objects and their relationships in 1 sentence."

    input_text = (
        "<|system|>\n Please understand the context "
        "and answer the question in 1 or 2 summarized sentences.\n"
        f"<|end|>\n<|user|>\n<|context|> {answerPart} \n {speech_text} \n {context_text} "
        f"\n<|question|>: {question} \n<|end|>\n<|assistant|>\n"
    )
    print("input_text:", input_text)

    tokens = tokenizer(input_text, padding=True, truncation=True, max_length=1024, return_tensors="pt")
    start_tokens = tokens['input_ids'].to(device)
    attention_mask = tokens['attention_mask'].to(device)

    # NOTE(review): max_length=1024 here is the same limit the prompt is
    # truncated to; a prompt that long would leave no room for new tokens.
    # Consider max_new_tokens — confirm the intended budget first.
    generated = base_phi_model.generate(
        start_tokens,
        attention_mask=attention_mask,
        max_length=1024,
        do_sample=False,
        pad_token_id=tokenizer.pad_token_id,
    )
    output_text = tokenizer.decode(generated[0], skip_special_tokens=True)
    output_text = getStringAfter(output_text, question).strip()
    return output_text
# Heading and blurb rendered at the top of the demo page.
title = "Created Fine Tuned MultiModal model"
description = (
    "Test the fine tuned multimodal model created using "
    "clip, phi3.5 mini instruct, whisper models"
)
# Container that the UI components below are added to.
demo = gr.Blocks()
def process_inputs(image, audio_source, audio_file, audio_mic, context_text, question):
    """Pick the audio input matching ``audio_source`` and generate an answer.

    Args:
        image: Image value forwarded to ``generateOutput``.
        audio_source: ``"Microphone"`` or ``"Audio File"``; any other value
            means no audio is used.
        audio_file: Audio value used when the source is ``"Audio File"``.
        audio_mic: Audio value used when the source is ``"Microphone"``.
        context_text: Context text forwarded to ``generateOutput``.
        question: Question forwarded to ``generateOutput``.

    Returns:
        Whatever ``generateOutput`` returns.
    """
    speech = None
    if audio_source == "Microphone":
        speech = audio_mic
    elif audio_source == "Audio File":
        speech = audio_file
    return generateOutput(image, speech, context_text, question)
# NOTE(review): the source lost its indentation, so the container nesting
# below is a reconstruction — confirm it against the intended page layout.
with demo:
    gr.Markdown(f"# {title}")
    gr.Markdown(f" {description}")

    with gr.Row():
        with gr.Column(scale=1, min_width=300):
            image_input = gr.Image(type="filepath", label="Upload Image")
        with gr.Column(scale=2, min_width=300):
            question = gr.Textbox(label="Question")
            with gr.Row():
                audio_source = gr.Radio(choices=["Microphone", "Audio File"], label="Select Audio Source")
                # Both audio widgets start hidden; the radio choice reveals one.
                audio_file = gr.Audio(sources="upload", type="filepath", visible=False)
                audio_mic = gr.Audio(sources="microphone", type="filepath", visible=False)
            context_text = gr.Textbox(label="Context Text")
            output_text = gr.Textbox(label="Output")

    def update_audio_input(source):
        """Return visibility updates for (microphone widget, file widget)."""
        show_mic = source == "Microphone"
        show_file = source == "Audio File"
        return gr.update(visible=show_mic), gr.update(visible=show_file)

    # Output order matches the tuple returned above: microphone first, file second.
    audio_source.change(fn=update_audio_input, inputs=audio_source, outputs=[audio_mic, audio_file])

    submit_button = gr.Button("Submit")
    submit_button.click(
        fn=process_inputs,
        inputs=[image_input, audio_source, audio_file, audio_mic, context_text, question],
        outputs=output_text,
    )

demo.launch(debug=True)