Spaces:
Runtime error
Runtime error
| # import gradio as gr | |
| # import requests | |
| # import time | |
| # from PIL import Image | |
| # from io import BytesIO | |
| # # AssemblyAI API Key | |
| # ASSEMBLYAI_API_KEY = "your_assemblyai_api_key_here" | |
| # # DeepAI API Key | |
| # DEEPAI_API_KEY = "your_deepai_api_key_here" | |
| # # Function to convert speech to text using AssemblyAI API | |
| # def speech_to_text(audio_file): | |
| # # Upload audio to AssemblyAI for transcription | |
| # upload_url = "https://api.assemblyai.com/v2/upload" | |
| # headers = { | |
| # "authorization": ASSEMBLYAI_API_KEY | |
| # } | |
| # # Upload the audio file to AssemblyAI | |
| # with open(audio_file, 'rb') as file: | |
| # response = requests.post(upload_url, headers=headers, files={"file": file}) | |
| # if response.status_code != 200: | |
| # return "Error uploading audio." | |
| # audio_url = response.json()["upload_url"] | |
| # # Request transcription from AssemblyAI | |
| # transcript_url = "https://api.assemblyai.com/v2/transcript" | |
| # transcript_request = { | |
| # "audio_url": audio_url | |
| # } | |
| # transcript_response = requests.post(transcript_url, json=transcript_request, headers=headers) | |
| # if transcript_response.status_code != 200: | |
| # return "Error requesting transcription." | |
| # transcript_id = transcript_response.json()["id"] | |
| # # Poll for transcription completion | |
| # while True: | |
| # polling_url = f"https://api.assemblyai.com/v2/transcript/{transcript_id}" | |
| # polling_response = requests.get(polling_url, headers=headers) | |
| # if polling_response.status_code != 200: | |
| # return "Error polling for transcription status." | |
| # status = polling_response.json()["status"] | |
| # if status == "completed": | |
| # return polling_response.json()["text"] | |
| # elif status == "failed": | |
| # return "Transcription failed." | |
| # time.sleep(5) # Wait 5 seconds before polling again | |
| # # Function to generate an image based on text using DeepAI's Image Generation API | |
| # def generate_image_from_text(text): | |
| # image_generation_url = "https://api.deepai.org/api/text2img" | |
| # headers = { | |
| # "api-key": DEEPAI_API_KEY | |
| # } | |
| # payload = { | |
| # "text": text | |
| # } | |
| # # Request image generation from DeepAI | |
| # response = requests.post(image_generation_url, data=payload, headers=headers) | |
| # if response.status_code == 200: | |
| # # Get the image URL from the response | |
| # image_url = response.json()["output_url"] | |
| # return image_url | |
| # else: | |
| # return "Failed to generate image." | |
| # # Function to download image from URL and return as a PIL image | |
| # def get_image_from_url(image_url): | |
| # try: | |
| # response = requests.get(image_url) | |
| # img = Image.open(BytesIO(response.content)) | |
| # return img | |
| # except Exception as e: | |
| # return "Error downloading image: " + str(e) | |
| # # Gradio Interface function | |
| # def process_audio(audio_file): | |
| # # Convert speech to text | |
| # text = speech_to_text(audio_file) | |
| # if text and text != "Error uploading audio." and text != "Error requesting transcription.": | |
| # print(f"Transcribed text: {text}") # Debug output for transcribed text | |
| # # Generate image from the transcribed text | |
| # image_url = generate_image_from_text(text) | |
| # if "Failed" not in image_url: | |
| # print(f"Image URL: {image_url}") # Debug output for image URL | |
| # # Download the image from URL and return it as a PIL image | |
| # return get_image_from_url(image_url) | |
| # else: | |
| # return image_url | |
| # else: | |
| # return "Error processing audio." | |
| # # Set up Gradio interface | |
| # iface = gr.Interface(fn=process_audio, | |
| # inputs=gr.Audio(type="filepath"), # Audio input | |
| # outputs=gr.Image(type="pil"), # Image output as PIL image | |
| # live=True, | |
| # title="Speech-to-Text to Image Generator") | |
| # iface.launch() | |
| # import gradio as gr | |
| # import requests | |
| # import time | |
| # from PIL import Image | |
| # from io import BytesIO | |
| # # API keys | |
| # ASSEMBLYAI_API_KEY = "your_assemblyai_api_key_here" | |
| # STABILITY_AI_API_KEY = "your_stability_ai_api_key_here" | |
| # # Function to convert speech to text using AssemblyAI API | |
| # def speech_to_text(audio_file): | |
| # upload_url = "https://api.assemblyai.com/v2/upload" | |
| # headers = { | |
| # "authorization": ASSEMBLYAI_API_KEY | |
| # } | |
| # # Upload the audio file to AssemblyAI | |
| # with open(audio_file, 'rb') as file: | |
| # response = requests.post(upload_url, headers=headers, files={"file": file}) | |
| # if response.status_code != 200: | |
| # return "Error uploading audio." | |
| # audio_url = response.json()["upload_url"] | |
| # # Request transcription from AssemblyAI | |
| # transcript_url = "https://api.assemblyai.com/v2/transcript" | |
| # transcript_request = { | |
| # "audio_url": audio_url | |
| # } | |
| # transcript_response = requests.post(transcript_url, json=transcript_request, headers=headers) | |
| # if transcript_response.status_code != 200: | |
| # return "Error requesting transcription." | |
| # transcript_id = transcript_response.json()["id"] | |
| # # Poll for transcription completion | |
| # while True: | |
| # polling_url = f"https://api.assemblyai.com/v2/transcript/{transcript_id}" | |
| # polling_response = requests.get(polling_url, headers=headers) | |
| # if polling_response.status_code != 200: | |
| # return "Error polling for transcription status." | |
| # status = polling_response.json()["status"] | |
| # if status == "completed": | |
| # return polling_response.json()["text"] | |
| # elif status == "failed": | |
| # return "Transcription failed." | |
| # time.sleep(5) # Wait 5 seconds before polling again | |
| # # Function to generate an image based on text using Stability AI (Stable Diffusion) | |
| # def generate_image_from_text(text): | |
| # image_generation_url = "https://stability.ai/api/v3/generate" # Stability AI API endpoint (assuming) | |
| # headers = { | |
| # "Authorization": f"Bearer {STABILITY_AI_API_KEY}" | |
| # } | |
| # payload = { | |
| # "text": text, | |
| # "width": 512, # Adjust image dimensions as needed | |
| # "height": 512 | |
| # } | |
| # # Request image generation from Stability AI | |
| # response = requests.post(image_generation_url, json=payload, headers=headers) | |
| # if response.status_code == 200: | |
| # # Get the image URL from the response (assuming the response contains a URL) | |
| # image_url = response.json().get("image_url", "") | |
| # if image_url: | |
| # return image_url | |
| # else: | |
| # return "Failed to generate image: No image URL found in response." | |
| # else: | |
| # return f"Failed to generate image: {response.status_code}" | |
| # # Function to download image from URL and return as a PIL image | |
| # def get_image_from_url(image_url): | |
| # try: | |
| # response = requests.get(image_url) | |
| # img = Image.open(BytesIO(response.content)) | |
| # return img | |
| # except Exception as e: | |
| # return f"Error downloading image: {str(e)}" | |
| # # Gradio Interface function | |
| # def process_audio(audio_file): | |
| # # Convert speech to text | |
| # text = speech_to_text(audio_file) | |
| # if text and text != "Error uploading audio." and text != "Error requesting transcription.": | |
| # print(f"Transcribed text: {text}") # Debug output for transcribed text | |
| # # Generate image from the transcribed text | |
| # image_url = generate_image_from_text(text) | |
| # if "Failed" not in image_url: | |
| # print(f"Image URL: {image_url}") # Debug output for image URL | |
| # # Download the image from URL and return it as a PIL image | |
| # return get_image_from_url(image_url) | |
| # else: | |
| # return image_url | |
| # else: | |
| # return "Error processing audio." | |
| # # Set up Gradio interface | |
| # iface = gr.Interface(fn=process_audio, | |
| # inputs=gr.Audio(type="filepath"), # Audio input | |
| # outputs=gr.Image(type="pil"), # Image output as PIL image | |
| # live=True, | |
| # title="Speech-to-Text to Image Generator") | |
| # iface.launch() | |
| #1st D | |
| import subprocess | |
| # Install required libraries | |
| subprocess.check_call(["pip", "install", "torch>=1.11.0"]) | |
| subprocess.check_call(["pip", "install", "transformers"]) | |
| subprocess.check_call(["pip", "install", "diffusers>=0.14.0"]) | |
| subprocess.check_call(["pip", "install", "librosa"]) | |
| subprocess.check_call(["pip", "install", "accelerate>= 0.20.1"]) | |
| subprocess.check_call(["pip", "install", "safetensors>=0.1.0"]) | |
| subprocess.check_call(["pip", "install", "huggingface_hub>=0.16.4"]) | |
| import os | |
| import threading | |
| import numpy as np | |
| import diffusers | |
| from functools import lru_cache | |
| import gradio as gr | |
| from transformers import pipeline | |
| from huggingface_hub import login | |
| from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler | |
| import librosa | |
| import accelerate | |
| import pandas | |
| import safetensors | |
| import torch # Import torch here to avoid the NameError | |
| # Ensure required dependencies are installed | |
| def install_missing_packages(): | |
| required_packages = { | |
| "librosa": None, | |
| "diffusers": ">=0.14.0", | |
| "gradio": ">=3.35.2", | |
| "huggingface_hub": ">=0.16.4", | |
| "accelerate": ">= 0.20.1", | |
| "safetensors":">=0.1.0", | |
| "torch":">=1.11.0", | |
| } | |
| for package, version in required_packages.items(): | |
| try: | |
| __import__(package) | |
| except ImportError: | |
| package_name = f"{package}{version}" if version else package | |
| subprocess.check_call(["pip", "install", package_name]) | |
| install_missing_packages() | |
| # Get Hugging Face token for authentication | |
| hf_token = os.getenv("HF_TOKEN") | |
| if hf_token: | |
| login(hf_token) | |
| else: | |
| raise ValueError("HF_TOKEN environment variable not set.") | |
| # Load speech-to-text model (Whisper) | |
| speech_to_text = pipeline("automatic-speech-recognition", model="openai/whisper-tiny") | |
| #Load Stable Diffusion model for text-to-image | |
| text_to_image = StableDiffusionPipeline.from_pretrained( | |
| "runwayml/stable-diffusion-v1-5" | |
| ) | |
| # text_to_image = StableDiffusionPipeline.from_pretrained( | |
| # "runwayml/stable-diffusion-v1-5", | |
| # cache_dir="./my_model_cache", # Custom cache directory | |
| # revision="fp16" | |
| # ) | |
| device = "cuda" if torch.cuda.is_available() else "cpu" # This will now work since torch is imported | |
| text_to_image.to(device) | |
| text_to_image.enable_attention_slicing() # Optimizes memory usage | |
| text_to_image.safety_checker = None # Disables safety checker to improve speed | |
| text_to_image.scheduler = DPMSolverMultistepScheduler.from_config(text_to_image.scheduler.config) # Faster scheduler | |
| # Preprocess audio file into NumPy array | |
| def preprocess_audio(audio_path): | |
| try: | |
| audio, sr = librosa.load(audio_path, sr=16000) # Resample to 16kHz | |
| return np.array(audio, dtype=np.float16) | |
| except Exception as e: | |
| return f"Error in preprocessing audio: {str(e)}" | |
| # Speech-to-text function | |
| def transcribe_audio(audio_path): | |
| try: | |
| audio_array = preprocess_audio(audio_path) | |
| if isinstance(audio_array, str): # Error message from preprocessing | |
| return audio_array | |
| result = speech_to_text(audio_array) | |
| return result["text"] | |
| except Exception as e: | |
| return f"Error in transcription: {str(e)}" | |
| # Text-to-image function | |
| def generate_image_from_text(text): | |
| try: | |
| image = text_to_image(text, height=256, width=256).images[0] # Generate smaller images for speed | |
| return image | |
| except Exception as e: | |
| return f"Error in image generation: {str(e)}" | |
| # Optimized combined processing function | |
| def process_audio_and_generate_image(audio_path): | |
| transcription_result = {"result": None} | |
| image_result = {"result": None} | |
| # Function to run transcription and image generation in parallel | |
| def transcription_thread(): | |
| transcription_result["result"] = transcribe_audio(audio_path) | |
| def image_generation_thread(): | |
| transcription = transcription_result["result"] | |
| if transcription and "Error" not in transcription: | |
| image_result["result"] = generate_image_from_text(transcription) | |
| # Start both tasks in parallel | |
| t1 = threading.Thread(target=transcription_thread) | |
| t2 = threading.Thread(target=image_generation_thread) | |
| t1.start() | |
| t2.start() | |
| t1.join() # Wait for transcription to finish | |
| t2.join() # Wait for image generation to finish | |
| transcription = transcription_result["result"] | |
| image = image_result["result"] | |
| if "Error" in transcription: | |
| return None, transcription | |
| if isinstance(image, str) and "Error" in image: | |
| return None, image | |
| return image, transcription | |
| # Gradio interface | |
| iface = gr.Interface( | |
| fn=process_audio_and_generate_image, | |
| inputs=gr.Audio(type="filepath", label="Upload audio file (WAV/MP3)"), | |
| outputs=[gr.Image(label="Generated Image"), gr.Textbox(label="Transcription")], | |
| title="Voice-to-Image Generator", | |
| description="Upload an audio file to transcribe speech to text, and then generate an image based on the transcription.", | |
| ) | |
| # Launch Gradio interface | |
| iface.launch(debug=True, share=True) | |
| #2 D | |