Hugging Face Space — build status: Runtime error (the module-level script below references several undefined names; see inline notes).
| import torch | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForCausalLM, | |
| AutoModelForSeq2SeqLM, | |
| AutoProcessor, | |
| AutoModelForSpeechSeq2Seq, | |
| AutoModelForTextToWaveform | |
| ) | |
| from diffusers import DiffusionPipeline | |
| import time | |
| import os | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi, HfFolder, Repository | |
| import gradio as gr | |
| load_dotenv() | |
def prune_model(model, amount=0.5):
    """Apply L1 unstructured pruning to every Linear/Conv2d layer in-place.

    The pruning reparametrization is removed immediately afterwards, so the
    zeroed weights are baked directly into each layer's weight tensor.

    Args:
        model: any torch.nn.Module tree.
        amount: fraction of weights to zero per layer (default 0.5).

    Returns:
        The same model object, pruned.
    """
    from torch.nn.utils import prune
    prunable_types = (torch.nn.Linear, torch.nn.Conv2d)
    for _, submodule in model.named_modules():
        if not isinstance(submodule, prunable_types):
            continue
        prune.l1_unstructured(submodule, name='weight', amount=amount)
        prune.remove(submodule, 'weight')  # make pruning permanent
    return model
def quantize_to_q1_with_min(tensor, min_value=-1):
    """Collapse a tensor to its elementwise sign (-1/0/1), floored at min_value.

    Values whose sign falls below min_value are raised to min_value; the
    input tensor itself is not modified.
    """
    signed = torch.sign(tensor)
    # sign yields only {-1, 0, 1}; clamping below is equivalent to the
    # masked assignment `signed[signed < min_value] = min_value`.
    return signed.clamp(min=min_value)
def quantize_model_to_q1_with_min(model, min_value=-1):
    """In-place: replace every fp32/fp16 parameter with its sign, floored at min_value.

    Non-float parameters (e.g. integer buffers exposed as parameters) are
    left untouched. Returns None; the model is mutated.
    """
    float_dtypes = (torch.float32, torch.float16)
    with torch.no_grad():
        for _, param in model.named_parameters():
            if param.dtype not in float_dtypes:
                continue
            param.copy_(quantize_to_q1_with_min(param.data, min_value))
def disable_unnecessary_components(model):
    """Neutralize stochastic/normalizing layers for inference.

    Sets every Dropout probability to 0.0 and switches every BatchNorm1d to
    eval mode (running statistics instead of batch statistics).
    """
    for _, layer in model.named_modules():
        if isinstance(layer, torch.nn.Dropout):
            layer.p = 0.0
        if isinstance(layer, torch.nn.BatchNorm1d):
            layer.eval()
def ultra_max_compress(model):
    """Aggressively shrink a model for deployment.

    Pipeline: prune 80% -> sign-quantize -> disable dropout/BN -> freeze,
    clamp to [-1, 1] and cast params to fp16 -> best-effort TorchScript ->
    prune again at 90% -> eval mode -> drop zero-element buffers.

    Destructive: mutates the input; returns the (possibly scripted) model.
    """
    model = prune_model(model, amount=0.8)
    quantize_model_to_q1_with_min(model, min_value=-0.05)
    disable_unnecessary_components(model)
    with torch.no_grad():
        for _, param in model.named_parameters():
            if param.requires_grad:
                param.requires_grad = False
            param.data = torch.nn.functional.hardtanh(param.data, min_val=-1.0, max_val=1.0)
            param.data = param.data.half()
    try:
        model = torch.jit.script(model)
    except Exception:
        pass  # best-effort: keep the eager model if scripting fails
    prune_model(model, amount=0.9)
    model.eval()
    # BUG FIX: the original popped from model._buffers while iterating
    # named_buffers() (RuntimeError: dict changed size during iteration) and
    # used fully-qualified dotted names, which are never keys of the root
    # module's _buffers dict. Collect per-module, then remove.
    for submodule in model.modules():
        empty_names = [n for n, b in submodule.named_buffers(recurse=False) if b.numel() == 0]
        for n in empty_names:
            submodule._buffers.pop(n, None)
    return model
def optimize_model_resources(model):
    """Freeze gradients, cast fp32 params to fp16, cap config sizes, and
    best-effort TorchScript-optimize the model for inference.

    BUG FIX: the original unconditionally called
    torch.jit.optimize_for_inference(model), which requires a ScriptModule
    and raised for every eager nn.Module. We now script first and fall back
    to the (still optimized) eager model on failure.

    NOTE: torch.set_grad_enabled(False) is a process-global side effect,
    kept from the original behavior.
    """
    torch.set_grad_enabled(False)
    model.eval()
    for _, param in model.named_parameters():
        param.requires_grad = False
        if param.dtype == torch.float32:
            param.data = param.data.half()
    if hasattr(model, 'config'):
        if hasattr(model.config, 'max_position_embeddings'):
            model.config.max_position_embeddings = min(model.config.max_position_embeddings, 512)
        if hasattr(model.config, 'hidden_size'):
            model.config.hidden_size = min(model.config.hidden_size, 768)
    try:
        scripted = model if isinstance(model, torch.jit.ScriptModule) else torch.jit.script(model)
        model = torch.jit.optimize_for_inference(scripted)
    except Exception:
        pass  # unsupported architectures keep the eager module
    return model
def generate_random_responses(model, tokenizer, prompt, num_responses=5, max_length=50):
    """Sample `num_responses` completions of `prompt` via top-k (k=50) sampling.

    Returns a list of decoded response strings.
    """
    def _sample_once():
        encoded = tokenizer.encode(prompt, return_tensors="pt")
        generated = model.generate(encoded, max_length=max_length, do_sample=True, top_k=50)
        return tokenizer.decode(generated[0], skip_special_tokens=True)

    return [_sample_once() for _ in range(num_responses)]
def patched_distilbert_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    """Monkey-patch replacement for DistilBertModel.forward that, when
    return_dict is falsy, returns a tuple of the non-None fields
    (last_hidden_state, hidden_states, attentions).

    BUG FIXES:
    - DistilBertModel was referenced without ever being imported in this
      file, so every call raised NameError; import it locally.
    - The base forward was called with the caller's return_dict, so with
      return_dict=False it returned a plain tuple and the subsequent
      attribute accesses (.last_hidden_state, ...) raised AttributeError.
      We always request the dict-style output and convert ourselves.
    """
    from transformers import DistilBertModel
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict
    outputs = DistilBertModel.forward(
        self, input_ids, attention_mask=attention_mask, head_mask=head_mask,
        inputs_embeds=inputs_embeds, output_attentions=output_attentions,
        output_hidden_states=output_hidden_states, return_dict=True)
    if not return_dict:
        return tuple(
            v for v in (outputs.last_hidden_state, outputs.hidden_states, outputs.attentions)
            if v is not None)
    return outputs
def patched_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    """Classification head over self.distilbert: CLS token -> pre_classifier
    -> dropout -> classifier.

    Returns raw logits when return_dict is truthy, otherwise
    (logits,) + the encoder's extra outputs. NOTE(review): `labels` is
    accepted but unused, as in the original.
    """
    if return_dict is None:
        return_dict = self.config.use_return_dict
    encoder_out = self.distilbert(
        input_ids, attention_mask=attention_mask, head_mask=head_mask,
        inputs_embeds=inputs_embeds, output_attentions=output_attentions,
        output_hidden_states=output_hidden_states, return_dict=return_dict)
    cls_state = encoder_out[0][:, 0]  # first token of the last hidden state
    pooled = self.dropout(self.pre_classifier(cls_state))
    logits = self.classifier(pooled)
    if return_dict:
        return logits
    return (logits,) + encoder_out[1:]
def patched_roberta_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    """Classification head over self.roberta: CLS token -> dropout -> classifier.

    Returns raw logits when return_dict is truthy, otherwise
    (logits,) + the encoder's extra outputs. NOTE(review): `labels` is
    accepted but unused, as in the original.
    """
    if return_dict is None:
        return_dict = self.config.use_return_dict
    encoder_out = self.roberta(
        input_ids, attention_mask=attention_mask, head_mask=head_mask,
        inputs_embeds=inputs_embeds, output_attentions=output_attentions,
        output_hidden_states=output_hidden_states, return_dict=return_dict)
    pooled = self.dropout(encoder_out[0][:, 0])
    logits = self.classifier(pooled)
    if return_dict:
        return logits
    return (logits,) + encoder_out[1:]
def optimize_for_low_resources(model):
    """Compress the model and cap its config for low-memory hosts
    (256 max positions, 384 hidden size)."""
    model = optimize_model_resources(ultra_max_compress(model))
    model.config.max_position_embeddings = 256
    model.config.hidden_size = 384
    return model
def optimize_for_very_low_resources(model):
    """Compress the model and cap its config for very-low-memory hosts
    (128 max positions, 256 hidden size)."""
    model = optimize_model_resources(ultra_max_compress(model))
    model.config.max_position_embeddings = 128
    model.config.hidden_size = 256
    return model
def remove_unused_model_components(model):
    """Remove zero-element parameters from a model tree.

    BUG FIX: the original popped from model._parameters while iterating
    named_parameters() (mutation during iteration) and used fully-qualified
    dotted names, which are never keys of the root module's _parameters
    dict for submodule parameters. We now collect the empty parameter names
    per owning module first, then remove them from that module.

    Returns the same (mutated) model.
    """
    for submodule in model.modules():
        empty_names = [n for n, p in submodule.named_parameters(recurse=False) if p.numel() == 0]
        for n in empty_names:
            submodule._parameters.pop(n, None)
    return model
def auto_train_model(model, train_data, epochs=3):
    """Run a minimal Adam (lr=1e-5) training loop.

    Each batch is (inputs_dict, labels); the model must return an object
    with a .loss attribute when called as model(**inputs, labels=labels).
    Returns the trained model (still in train mode).
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
    model.train()
    for _ in range(epochs):
        for inputs, labels in train_data:
            optimizer.zero_grad()
            loss = model(**inputs, labels=labels).loss
            loss.backward()
            optimizer.step()
    return model
def apply_extreme_filters(model):
    """Maximum-strength compression chain: ultra compress + resource
    optimization, cap config at 128 positions / 256 hidden, jit-optimize,
    prune 95%, then sign-quantize with floor -0.1."""
    model = optimize_model_resources(ultra_max_compress(model))
    model.config.max_position_embeddings = 128
    model.config.hidden_size = 256
    model = torch.jit.optimize_for_inference(model)
    model = prune_model(model, amount=0.95)
    quantize_model_to_q1_with_min(model, min_value=-0.1)
    return model
def reduce_latency(model, tokenizer, prompt, num_responses=5, max_length=50):
    """Sample `num_responses` completions and measure mean per-response latency.

    Returns (responses, latency_ms) where latency_ms is the average wall
    time per response in milliseconds (encoding + generation + decoding).
    """
    started = time.time()
    responses = []
    for _ in range(num_responses):
        token_ids = tokenizer.encode(prompt, return_tensors="pt")
        generated = model.generate(token_ids, max_length=max_length, do_sample=True, top_k=50)
        responses.append(tokenizer.decode(generated[0], skip_special_tokens=True))
    mean_latency_ms = (time.time() - started) / num_responses * 1000
    return responses, mean_latency_ms
def create_gpt_distill_model():
    """Load the GPT-2 language model and tokenizer from the Hub.

    BUG FIX: the original used GPT2LMHeadModel, which is never imported in
    this file, so the call raised NameError. AutoModelForCausalLM (already
    imported at the top of the file) resolves to the same class for the
    "gpt2" checkpoint.

    Returns:
        (model, tokenizer) tuple.
    """
    gpt_model = AutoModelForCausalLM.from_pretrained("gpt2")
    gpt_tokenizer = AutoTokenizer.from_pretrained("gpt2")
    return gpt_model, gpt_tokenizer
def create_gemma_distill_model():
    """Load the google/gemma-2-9b causal LM and its tokenizer from the Hub.

    Returns:
        (model, tokenizer) tuple.
    """
    checkpoint = "google/gemma-2-9b"
    return (AutoModelForCausalLM.from_pretrained(checkpoint),
            AutoTokenizer.from_pretrained(checkpoint))
def measure_performance(model, tokenizer, sequence_length=20, num_tokens=100):
    """Time `num_tokens` generate() calls on a fixed 'A'*sequence_length prompt.

    Returns (latency_ms_per_call, calls_per_second). NOTE(review): despite
    the parameter name, each iteration is one full generate() call, not one
    token.
    """
    encoded = tokenizer("A" * sequence_length, return_tensors="pt")
    started = time.time()
    for _ in range(num_tokens):
        model.generate(**encoded)
    total = time.time() - started
    return total / num_tokens * 1000, num_tokens / total
def apply_diffusion_pipeline(prompt):
    """Run the FLUX.1-schnell text-to-image pipeline; returns its images list."""
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell")
    return pipeline(prompt).images
def generate_responses_with_diffusion(prompt, use_diffusion):
    """Return diffusion images when the prompt asks to 'imagine'/'imagina';
    otherwise None.

    NOTE(review): `use_diffusion` is accepted but never consulted — kept
    unchanged for interface compatibility with existing callers.
    """
    lowered = prompt.lower()
    if "imagina" not in lowered and "imagine" not in lowered:
        return None
    return apply_diffusion_pipeline(prompt)
def generate_summary_with_bart(prompt):
    """Summarize `prompt` with facebook/bart-large-cnn (4-beam search,
    30-130 token summary); returns the decoded summary string."""
    checkpoint = "facebook/bart-large-cnn"
    bart_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    bart_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    encoded = bart_tokenizer.encode(prompt, return_tensors="pt")
    summary_ids = bart_model.generate(
        encoded, max_length=130, min_length=30, length_penalty=2.0,
        num_beams=4, early_stopping=True)
    return bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def generate_responses_with_bart(prompt, use_bart=True):
    """Return a BART summary when the prompt asks to 'resumir'/'resumime';
    otherwise None.

    BUG FIX: the module-level script calls this with a second positional
    argument (True), which raised TypeError with the original one-parameter
    signature. The flag is now an optional, backward-compatible parameter;
    passing a falsy value disables summarization.
    """
    if not use_bart:
        return None
    if "resumir" in prompt.lower() or "resumime" in prompt.lower():
        return generate_summary_with_bart(prompt)
    return None
def apply_whisper_pipeline(prompt):
    """Run openai/whisper-small on the given input and return the decoded
    transcription list.

    NOTE(review): the processor is fed `prompt` directly; Whisper processors
    normally expect audio arrays — confirm the caller supplies audio.
    """
    checkpoint = "openai/whisper-small"
    whisper_processor = AutoProcessor.from_pretrained(checkpoint)
    whisper_model = AutoModelForSpeechSeq2Seq.from_pretrained(checkpoint)
    features = whisper_processor(prompt, return_tensors="pt")
    generated = whisper_model.generate(**features)
    return whisper_processor.batch_decode(generated, skip_special_tokens=True)
def generate_transcription_with_whisper(prompt, use_whisper=True):
    """Return a Whisper transcription when the prompt asks to
    'transcribe'/'transcribime'; otherwise None.

    BUG FIX: the module-level script calls this with a second positional
    argument (True), which raised TypeError with the original one-parameter
    signature. The flag is now optional and backward compatible; a falsy
    value disables transcription.
    """
    if not use_whisper:
        return None
    if "transcribe" in prompt.lower() or "transcribime" in prompt.lower():
        return apply_whisper_pipeline(prompt)
    return None
def apply_translation_pipeline(prompt):
    """Run google-t5/t5-base seq2seq generation on the prompt and return the
    decoded text.

    NOTE(review): T5 translation normally needs a task prefix such as
    'translate English to German: '; the raw prompt is passed through
    unchanged here, as in the original.
    """
    checkpoint = "google-t5/t5-base"
    t5_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    t5_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    encoded = t5_tokenizer.encode(prompt, return_tensors="pt")
    generated = t5_model.generate(encoded, max_length=50)
    return t5_tokenizer.decode(generated[0], skip_special_tokens=True)
def generate_translation_with_t5(prompt, use_t5=True):
    """Return a T5 translation when the prompt asks to 'traducir'/'traducime';
    otherwise None.

    BUG FIX: the module-level script calls this with a second positional
    argument (True), which raised TypeError with the original one-parameter
    signature. The flag is now optional and backward compatible; a falsy
    value disables translation.
    """
    if not use_t5:
        return None
    if "traducir" in prompt.lower() or "traducime" in prompt.lower():
        return apply_translation_pipeline(prompt)
    return None
def apply_musicgen_pipeline(prompt):
    """Generate an audio waveform from text with facebook/musicgen-small.

    BUG FIX: the original called model.generate(inputs), passing the whole
    BatchEncoding as the first positional argument instead of unpacking it;
    generate() expects input_ids / attention_mask as separate (keyword)
    arguments, so the call failed. Unpack with **inputs.
    """
    tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-small")
    model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-small")
    inputs = tokenizer(prompt, return_tensors="pt")
    audio = model.generate(**inputs)
    return audio
def generate_music_with_musicgen(prompt, use_musicgen=True):
    """Return MusicGen audio when the prompt mentions 'música'/'canción';
    otherwise None.

    BUG FIX: the module-level script calls this with a second positional
    argument (True), which raised TypeError with the original one-parameter
    signature. The flag is now optional and backward compatible; a falsy
    value disables generation.
    """
    if not use_musicgen:
        return None
    if "música" in prompt.lower() or "canción" in prompt.lower():
        return apply_musicgen_pipeline(prompt)
    return None
def apply_musicgen_melody_pipeline(prompt):
    """Generate an audio waveform from text with facebook/musicgen-melody.

    BUG FIX: the original called model.generate(inputs), passing the whole
    BatchEncoding as the first positional argument instead of unpacking it;
    generate() expects input_ids / attention_mask as separate (keyword)
    arguments, so the call failed. Unpack with **inputs.
    """
    tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-melody")
    model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-melody")
    inputs = tokenizer(prompt, return_tensors="pt")
    audio = model.generate(**inputs)
    return audio
def generate_music_with_musicgen_melody(prompt, use_melody=True):
    """Return MusicGen-melody audio when the prompt mentions
    'melodía'/'melodia'; otherwise None.

    BUG FIX: the module-level script calls this with a second positional
    argument (True), which raised TypeError with the original one-parameter
    signature. The flag is now optional and backward compatible; a falsy
    value disables generation.
    """
    if not use_melody:
        return None
    if "melodía" in prompt.lower() or "melodia" in prompt.lower():
        return apply_musicgen_melody_pipeline(prompt)
    return None
def apply_stable_diffusion_pipeline(prompt):
    """Run Stable Diffusion 2.1 text-to-image; returns the generated images list."""
    sd_pipeline = DiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1")
    return sd_pipeline(prompt).images
def generate_responses_with_stable_diffusion(prompt, use_sd=True):
    """Return Stable Diffusion images when the prompt mentions
    'imagen'/'image'; otherwise None.

    BUG FIX: the module-level script calls this with a second positional
    argument (True), which raised TypeError with the original one-parameter
    signature. The flag is now optional and backward compatible; a falsy
    value disables generation.
    """
    if not use_sd:
        return None
    if "imagen" in prompt.lower() or "image" in prompt.lower():
        return apply_stable_diffusion_pipeline(prompt)
    return None
def unify_models(*models):
    """Bundle the given modules into a single torch.nn.ModuleList container."""
    return torch.nn.ModuleList(models)
def combined_filter(model):
    """Full compression chain; NOTE(review): byte-for-byte duplicate of
    apply_extreme_filters — consider delegating to it."""
    model = optimize_model_resources(ultra_max_compress(model))
    model.config.max_position_embeddings = 128
    model.config.hidden_size = 256
    model = torch.jit.optimize_for_inference(model)
    model = prune_model(model, amount=0.95)
    quantize_model_to_q1_with_min(model, min_value=-0.1)
    return model
def apply_filters_and_unify(model):
    """Run the combined compression filter, then strip empty parameters."""
    return remove_unused_model_components(combined_filter(model))
def upload_to_huggingface(model, repo_name, tokenizer=None):
    """Create the Hub repo (best-effort), save the model (and tokenizer if
    given) locally under repo_name, and push.

    BUG FIX: the original referenced a bare global `tokenizer`, which is not
    guaranteed to be defined when this runs (NameError). It is now an
    optional parameter, backward compatible with existing two-argument
    callers.
    """
    api = HfApi()
    try:
        api.create_repo(repo_id=repo_name, repo_type="model")
    except Exception:
        pass  # best-effort: the repo may already exist
    model.save_pretrained(repo_name)
    if tokenizer is not None:
        tokenizer.save_pretrained(repo_name)
    repo = Repository(repo_name)
    repo.push_to_hub()
def apply_extreme_filters_and_upload(model, repo_name):
    """Compress with the extreme filter chain, then push the result to the Hub."""
    compressed = apply_extreme_filters(model)
    upload_to_huggingface(compressed, repo_name)
def start_gradio_interface():
    """Launch a Gradio UI that routes one prompt to every task-specific
    generator (summary, transcription, translation, music, melody, image,
    diffusion).

    BUG FIX: process_prompt returned a dict while the interface declares
    seven positional output components; Gradio maps one returned value per
    component, so we now return a tuple in component order.
    """
    def process_prompt(prompt):
        # One value per declared output component, in order.
        return (
            generate_responses_with_bart(prompt),
            generate_transcription_with_whisper(prompt),
            generate_translation_with_t5(prompt),
            generate_music_with_musicgen(prompt),
            generate_music_with_musicgen_melody(prompt),
            generate_responses_with_stable_diffusion(prompt),
            generate_responses_with_diffusion(prompt, True),
        )

    interface = gr.Interface(
        fn=process_prompt,
        inputs=gr.Textbox(label="Enter Prompt"),
        outputs=[gr.Textbox(label="Summary"), gr.Textbox(label="Transcription"), gr.Textbox(label="Translation"),
                 gr.Audio(label="Music"), gr.Audio(label="Melody Music"), gr.Image(label="Image"), gr.Image(label="Diffusion")],
        title="Multi-Function AI Model",
        description="Generate summaries, transcriptions, translations, music, melodies, images, and diffusion responses.",
    )
    interface.launch()
start_gradio_interface()
# --- Module-level driver script --------------------------------------------
# NOTE(review): this section references several names that are never defined
# or imported anywhere in this file: GPT2LMHeadModel,
# optimize_model_with_all_optimizations, optimize_for_1gb_ram,
# optimize_for_old_cpu, optimize_for_old_gpu. Each such call raises
# NameError at import time — presumably the cause of the Space's
# "Runtime error" status. TODO: define or import them, or guard these calls.
model_infos = [
    # NOTE(review): GPT2LMHeadModel is not imported in this file.
    {"model_name": "gpt2", "class": GPT2LMHeadModel},
    {"model_name": "google/gemma-2-9b", "class": AutoModelForCausalLM}
]
for model_info in model_infos:
    model = model_info["class"].from_pretrained(model_info["model_name"])
    tokenizer = AutoTokenizer.from_pretrained(model_info["model_name"])
    # NOTE(review): optimize_model_with_all_optimizations is undefined in this file.
    optimized_model, responses, latency = optimize_model_with_all_optimizations(model, tokenizer, "Sample prompt for optimization.")
    print(f"Model: {model_info['model_name']}")
    print(f"Latency: {latency:.2f} ms")
    print(f"Sample Responses: {responses}")
gpt_model, gpt_tokenizer = create_gpt_distill_model()
gemma_model, gemma_tokenizer = create_gemma_distill_model()
optimized_gpt_model, gpt_responses, gpt_latency = optimize_model_with_all_optimizations(gpt_model, gpt_tokenizer, "Sample prompt for GPT optimization.")
optimized_gemma_model, gemma_responses, gemma_latency = optimize_model_with_all_optimizations(gemma_model, gemma_tokenizer, "Sample prompt for Gemma optimization.")
combined_model = unify_models(optimized_gpt_model, optimized_gemma_model)
# NOTE(review): optimize_for_1gb_ram / optimize_for_old_cpu /
# optimize_for_old_gpu are undefined in this file.
optimized_gpt_model_1gb = optimize_for_1gb_ram(optimized_gpt_model)
optimized_gemma_model_1gb = optimize_for_1gb_ram(optimized_gemma_model)
optimized_gpt_model_low = optimize_for_very_low_resources(optimized_gpt_model)
optimized_gemma_model_low = optimize_for_very_low_resources(optimized_gemma_model)
optimized_gpt_model_cpu = optimize_for_old_cpu(optimized_gpt_model)
optimized_gemma_model_cpu = optimize_for_old_cpu(optimized_gemma_model)
optimized_gpt_model_gpu = optimize_for_old_gpu(optimized_gpt_model)
optimized_gemma_model_gpu = optimize_for_old_gpu(optimized_gemma_model)
print("Models optimized for various resource constraints.")
diffusion_response = generate_responses_with_diffusion("Imagine a serene landscape", True)
if diffusion_response:
    print("Diffusion response generated.")
# NOTE(review): the five generate_* calls below pass two positional
# arguments, but the definitions above each accept only `prompt` — every
# one of these calls raises TypeError as written.
summary_response = generate_responses_with_bart("Resumir este texto para obtener un resumen efectivo.", True)
if summary_response:
    print("Summary response generated.")
transcription_response = generate_transcription_with_whisper("Transcribe this audio file.", True)
if transcription_response:
    print("Transcription response generated.")
translation_response = generate_translation_with_t5("Traducir este texto al inglés.", True)
if translation_response:
    print("Translation response generated.")
music_response = generate_music_with_musicgen("Música para una tarde tranquila.", True)
if music_response:
    print("Music response generated.")
melody_music_response = generate_music_with_musicgen_melody("Melodía para relajación.", True)
if melody_music_response:
    print("Melody music response generated.")
image_response = generate_responses_with_stable_diffusion("Imagen de un paisaje sereno.", True)
if image_response:
    print("Image response generated.")
# NOTE(review): the upload helper above also references an undefined global
# `tokenizer` on its tokenizer.save_pretrained line when reached via this call.
upload_to_huggingface(combined_model, "Ffftdtd5dtft/my_model")