| import gradio as gr |
| import torch |
| import os |
| import numpy as np |
| from groq import Groq |
| from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig |
| from diffusers import StableDiffusionXLPipeline, UNet2DConditionModel, EulerDiscreteScheduler |
| from parler_tts import ParlerTTSForConditionalGeneration |
| import soundfile as sf |
| from langchain_community.embeddings import OpenAIEmbeddings |
| from langchain_community.vectorstores import Chroma |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain.chains import RetrievalQA |
| from langchain_community.llms import OpenAI |
| from PIL import Image |
| from decord import VideoReader, cpu |
| import requests |
|
|
# Groq API client; the key is read from the GROQ_API_KEY environment variable
# (None is passed through when the variable is unset).
client = Groq(api_key=os.getenv("GROQ_API_KEY"))

# Model id used for the text-only chat completions in handle_input.
MODEL = "llama3-groq-70b-8192-tool-use-preview"
|
|
| |
|
|
| |
# Vision-language chat model and its tokenizer, both loaded from the same
# Hugging Face repository with the repository's custom code enabled.
# NOTE(review): older transformers releases spell the `dtype` keyword
# `torch_dtype` - confirm against the pinned version.
# NOTE(review): confirm this checkpoint's remote `chat()` accepts the
# `chat(image=None, msgs=..., tokenizer=...)` call used elsewhere in this file.
_VISION_CHAT_REPO = "openbmb/MiniCPM-V-2"

text_model = AutoModel.from_pretrained(
    _VISION_CHAT_REPO,
    trust_remote_code=True,
    device_map="auto",
    dtype=torch.float16,
)
tokenizer = AutoTokenizer.from_pretrained(
    _VISION_CHAT_REPO,
    trust_remote_code=True,
)
|
|
# Parler-TTS speech model (moved to the GPU) and the tokenizer used for both
# the voice description and the text to be spoken.
_TTS_REPO = "parler-tts/parler-tts-large-v1"

tts_model = ParlerTTSForConditionalGeneration.from_pretrained(_TTS_REPO)
tts_model = tts_model.to('cuda')
tts_tokenizer = AutoTokenizer.from_pretrained(_TTS_REPO)
|
|
# Stable Diffusion XL text-to-image pipeline, loaded in fp16 on the GPU.
_SDXL_REPO = "stabilityai/stable-diffusion-xl-base-1.0"

# from_pretrained loads the trained UNet weights. from_config only builds the
# architecture with randomly initialised parameters, and nothing in this file
# loads a checkpoint into it afterwards, so the pipeline would produce noise.
image_model = UNet2DConditionModel.from_pretrained(
    _SDXL_REPO,
    subfolder="unet",
    torch_dtype=torch.float16,
    variant="fp16",
).to("cuda")
image_pipe = StableDiffusionXLPipeline.from_pretrained(
    _SDXL_REPO,
    unet=image_model,
    torch_dtype=torch.float16,
    variant="fp16",
).to("cuda")
# Swap in an Euler scheduler that reuses the pipeline's scheduler config but
# with "trailing" timestep spacing.
image_pipe.scheduler = EulerDiscreteScheduler.from_config(
    image_pipe.scheduler.config,
    timestep_spacing="trailing",
)
|
|
| |
def play_voice_output(response):
    """Synthesize ``response`` as speech and return the path of the WAV file.

    The speaker style is fixed by a hard-coded description. Both the
    description and ``response`` are tokenized with ``tts_tokenizer`` and fed
    to ``tts_model.generate``; the resulting waveform is written to
    ``output.wav`` in the current working directory at the model's configured
    sampling rate, overwriting any previous file of that name.

    Args:
        response: Text to be spoken.

    Returns:
        The string ``"output.wav"``.
    """
    voice_description = "Jon's voice is monotone yet slightly fast in delivery, with a very close recording that almost has no background noise."
    wav_path = "output.wav"

    def _token_ids(text):
        # Token ids as a PyTorch tensor on the same device as the TTS model.
        return tts_tokenizer(text, return_tensors="pt").input_ids.to('cuda')

    waveform = tts_model.generate(
        input_ids=_token_ids(voice_description),
        prompt_input_ids=_token_ids(response),
    )
    samples = waveform.cpu().numpy().squeeze()
    sf.write(wav_path, samples, tts_model.config.sampling_rate)
    return wav_path
|
|
| |
def web_search(query):
    """Search the web through the Bing Web Search API and return the snippets.

    Args:
        query: The search query string.

    Returns:
        The ``snippet`` fields of the returned web pages joined by newlines,
        or an empty string when the response contains no web pages.

    Raises:
        RuntimeError: If the BING_API_KEY environment variable is not set.
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer in time.
    """
    api_key = os.environ.get("BING_API_KEY")
    if not api_key:
        # Fail with a clear message instead of an invalid-header error from requests.
        raise RuntimeError("BING_API_KEY environment variable is not set")

    search_url = "https://api.bing.microsoft.com/v7.0/search"
    headers = {"Ocp-Apim-Subscription-Key": api_key}
    params = {"q": query, "textDecorations": True, "textFormat": "HTML"}

    # Bounded wait so a stalled connection cannot hang the caller indefinitely.
    response = requests.get(search_url, headers=headers, params=params, timeout=10)
    response.raise_for_status()

    pages = response.json().get('webPages', {}).get('value', [])
    # Skip entries without a snippet rather than failing the whole search.
    return "\n".join(page['snippet'] for page in pages if page.get('snippet'))
|
|
| |
def numpy_calculate(code: str) -> str:
    """Execute a Python snippet with NumPy available and report its ``result``.

    The snippet runs with ``np`` bound to NumPy and is expected to assign its
    answer to a variable named ``result``.

    Args:
        code: Python source to execute.

    Returns:
        ``str(result)`` when the snippet defines ``result``, the text
        ``"No result found"`` when it does not, or
        ``"An error occurred: <message>"`` when execution raises.
    """
    # SECURITY(review): exec() runs arbitrary Python with full builtins. The
    # code comes from a language model driven by user input, so this must run
    # in a sandbox (or be replaced by a restricted evaluator) before exposure
    # to untrusted users.
    namespace = {"np": np}
    try:
        # A single dict serves as both globals and locals. With two separate
        # dicts, functions and lambdas defined by the snippet cannot see the
        # snippet's own variables, because they resolve free names in globals.
        exec(code, namespace)
        return str(namespace.get("result", "No result found"))
    except Exception as e:
        return f"An error occurred: {e}"
|
|
| |
def _upload_path(upload):
    """Return the filesystem path of an upload given as a path or a file-like object."""
    if isinstance(upload, (str, os.PathLike)):
        return os.fspath(upload)
    return upload.name


def _read_upload(upload):
    """Return the contents of an upload given as a path or a file-like object."""
    if hasattr(upload, "read"):
        return upload.read()
    with open(_upload_path(upload), "rb") as handle:
        return handle.read()


def _chat_with_tools(messages):
    """Run a Groq chat completion and execute any tool calls the model requests."""
    import json  # local import: only this helper needs it

    # Split the tool list into the JSON schema sent to the API and the local
    # callables; function objects cannot be serialised into the request body.
    implementations = {}
    api_tools = []
    for tool in initialize_tools():
        spec = dict(tool["function"])
        implementation = spec.pop("implementation", None)
        if implementation is not None:
            implementations[spec["name"]] = implementation
        api_tools.append({**tool, "function": spec})

    reply = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        tools=api_tools,
    ).choices[0].message

    tool_calls = getattr(reply, "tool_calls", None)
    if not tool_calls:
        return reply.content

    # The model asked for tools: run them, attach their outputs as "tool"
    # messages, and ask the model for the final answer.
    messages.append(reply)
    for call in tool_calls:
        tool_name = call.function.name
        implementation = implementations.get(tool_name)
        if implementation is None:
            tool_output = f"Tool '{tool_name}' is not available."
        else:
            try:
                arguments = json.loads(call.function.arguments or "{}")
                tool_output = str(implementation(**arguments))
            except Exception as exc:
                # Report the failure to the model instead of aborting the chat.
                tool_output = f"Tool '{tool_name}' failed: {exc}"
        messages.append({
            "role": "tool",
            "tool_call_id": call.id,
            "name": tool_name,
            "content": tool_output,
        })

    return client.chat.completions.create(
        model=MODEL,
        messages=messages,
    ).choices[0].message.content


def handle_input(user_prompt, image=None, video=None, audio=None, doc=None):
    """Answer a request with whichever backend matches the supplied inputs.

    If ``audio`` is given it is transcribed with Whisper through the Groq
    client and the transcription replaces ``user_prompt``. Routing then
    follows this priority:

    1. ``doc``   - answered by ``use_langchain_rag`` over the document text.
    2. ``video`` - frames from ``encode_video`` plus the prompt go to the
       vision chat model.
    3. ``image`` - the RGB image plus the prompt go to the vision chat model.
    4. otherwise - the prompt goes to the Groq chat model with the tools from
       ``initialize_tools``.

    Args:
        user_prompt: The user's text prompt.
        image: Optional image, as anything ``PIL.Image.open`` accepts.
        video: Optional video upload (path or object with ``.name``).
        audio: Optional audio upload (path or object with ``.name``/``.read()``).
        doc: Optional document upload (path or object with ``.name``/``.read()``).

    Returns:
        The backend's answer.
    """
    if audio:
        transcription = client.audio.transcriptions.create(
            file=(_upload_path(audio), _read_upload(audio)),
            model="whisper-large-v3",
        )
        user_prompt = transcription.text

    if doc:
        doc_content = _read_upload(doc)
        if isinstance(doc_content, bytes):
            # The text splitter needs str; undecodable bytes are replaced, not fatal.
            doc_content = doc_content.decode("utf-8", errors="replace")
        return use_langchain_rag(_upload_path(doc), doc_content, user_prompt)

    if video:
        frames = encode_video(_upload_path(video))
        vision_msgs = [{"role": "user", "content": frames + [user_prompt]}]
        return text_model.chat(image=None, msgs=vision_msgs, tokenizer=tokenizer)

    if image:
        picture = Image.open(image).convert('RGB')
        vision_msgs = [{"role": "user", "content": [picture, user_prompt]}]
        return text_model.chat(image=None, msgs=vision_msgs, tokenizer=tokenizer)

    # Built after transcription so a spoken prompt is what reaches the model.
    messages = [{"role": "user", "content": user_prompt}]
    return _chat_with_tools(messages)
|
|
| |
def use_langchain_rag(file_name, file_content, query):
    """Answer ``query`` from the text of a single document using retrieval QA.

    The text is split into 1000-character chunks, embedded with OpenAI
    embeddings into a Chroma collection created for this call only, and
    queried through a "stuff" RetrievalQA chain backed by the OpenAI LLM.

    Args:
        file_name: Name of the source file. Currently unused.
        file_content: Document text; ``bytes`` are decoded as UTF-8.
        query: The question to answer.

    Returns:
        The chain's answer, or a short notice when the document has no text.
    """
    import uuid  # local import: only needed for the per-call collection name

    if isinstance(file_content, bytes):
        # The splitter works on str; undecodable bytes are replaced, not fatal.
        file_content = file_content.decode("utf-8", errors="replace")

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    docs = text_splitter.create_documents([file_content])
    if not docs:
        # Nothing to index; building a vector store from zero chunks would fail.
        return "The uploaded document contains no readable text."

    embeddings = OpenAIEmbeddings()
    # A unique, non-persisted collection per call. Reusing one persisted
    # default collection would add every new document to the previous ones,
    # so answers could draw on earlier (possibly other users') uploads.
    db = Chroma.from_documents(
        docs,
        embeddings,
        collection_name=f"doc-{uuid.uuid4().hex}",
    )
    try:
        qa = RetrievalQA.from_chain_type(
            llm=OpenAI(),
            chain_type="stuff",
            retriever=db.as_retriever(),
        )
        return qa.run(query)
    finally:
        # Drop the temporary collection so indexes do not pile up in memory.
        db.delete_collection()
|
|
| |
def encode_video(video_path):
    """Sample frames from a video and return them as PIL images.

    Frames are taken roughly one per second of video (every ``round(fps)``-th
    frame). If that yields more than 64 frames, 64 evenly spaced frames are
    kept instead.

    Args:
        video_path: Path of the video file to decode.

    Returns:
        A list of ``PIL.Image.Image`` frames in playback order.
    """
    MAX_NUM_FRAMES = 64

    def uniform_sample(items, n):
        """Pick ``n`` evenly spaced elements, each from the middle of its slice."""
        gap = len(items) / n
        return [items[int(i * gap + gap / 2)] for i in range(n)]

    vr = VideoReader(video_path, ctx=cpu(0))
    # At least 1 so range() never receives a zero step for clips below 0.5 fps.
    sample_fps = max(1, round(vr.get_avg_fps()))
    frame_idx = list(range(0, len(vr), sample_fps))
    if len(frame_idx) > MAX_NUM_FRAMES:
        frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES)

    frames = vr.get_batch(frame_idx).asnumpy()
    return [Image.fromarray(frame.astype('uint8')) for frame in frames]
|
|
| |
def initialize_tools():
    """Build the list of function-calling tool definitions offered to the chat model.

    Three tools are described, each taking one required string argument:
    ``calculate`` (``expression``), ``web_search`` (``query``) and
    ``numpy_calculate`` (``code``). The latter two also carry the Python
    callable that implements them under the ``"implementation"`` key;
    ``calculate`` has no implementation attached.

    NOTE(review): the ``"implementation"`` values are function objects, which
    are not JSON-serializable - strip them before sending the list over HTTP.

    Returns:
        A new list of tool dictionaries on every call.
    """

    def function_tool(name, description, arg_name, arg_description, implementation=None):
        # One "function" tool entry with a single required string parameter.
        spec = {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    arg_name: {"type": "string", "description": arg_description}
                },
                "required": [arg_name],
            },
        }
        if implementation is not None:
            spec["implementation"] = implementation
        return {"type": "function", "function": spec}

    return [
        function_tool(
            "calculate",
            "Evaluate a mathematical expression",
            "expression",
            "The mathematical expression to evaluate",
        ),
        function_tool(
            "web_search",
            "Perform a web search",
            "query",
            "The search query",
            implementation=web_search,
        ),
        function_tool(
            "numpy_calculate",
            "Execute NumPy-based Python code for calculations",
            "code",
            "The Python code with NumPy operations",
            implementation=numpy_calculate,
        ),
    ]
# NOTE(review): `spaces` is not imported anywhere in the visible part of this
# file; the decorator below needs `import spaces` at module level to resolve.
@spaces.GPU()
def main_interface(user_prompt, image=None, video=None, audio=None, doc=None, voice_only=False):
    """Gradio callback: answer the request as text or as synthesized speech.

    Args:
        user_prompt: The user's text prompt.
        image: Optional image upload, forwarded to ``handle_input``.
        video: Optional video upload, forwarded to ``handle_input``.
        audio: Optional audio upload, forwarded to ``handle_input``.
        doc: Optional document upload, forwarded to ``handle_input``.
        voice_only: When true, the answer is spoken instead of shown.

    Returns:
        The answer from ``handle_input`` when ``voice_only`` is false;
        otherwise a Gradio update that sets the output's value to the
        generated WAV file and makes it visible.
    """
    response = handle_input(user_prompt, image=image, video=video, audio=audio, doc=doc)
    if not voice_only:
        return response

    audio_file = play_voice_output(response)
    # gr.update is the component-agnostic update helper; the per-component
    # `.update()` classmethods (e.g. gr.Audio.update) were removed in Gradio 4.
    return gr.update(value=audio_file, visible=True)
|
|
| |
def _route_outputs(user_prompt, image, video, audio, doc, voice_only):
    """Split main_interface's single return value across the text and audio outputs."""
    result = main_interface(
        user_prompt, image=image, video=video, audio=audio, doc=doc, voice_only=voice_only
    )
    if voice_only:
        # In voice mode the result is a component update carrying the WAV
        # path: send it to the audio player and clear the text box.
        return "", result
    # In text mode show the answer and hide any audio from an earlier request.
    return result, gr.update(value=None, visible=False)


# UI layout: one prompt box, four optional uploads, a voice-mode switch, and
# separate text and audio outputs (no single component can display both).
with gr.Blocks() as demo:
    user_prompt = gr.Textbox(placeholder="Type your message here...", lines=1)
    # "filepath" hands the callback a path string; "file" is not an accepted
    # `type` for these components, and gr.Video / gr.File take their default.
    image_input = gr.Image(type="filepath", label="Upload an image")
    video_input = gr.Video(label="Upload a video")
    audio_input = gr.Audio(type="filepath", label="Upload audio")
    doc_input = gr.File(label="Upload a document")
    voice_only_mode = gr.Checkbox(label="Enable Voice Only Mode")
    text_output = gr.Textbox(label="Response", interactive=False)
    audio_output = gr.Audio(label="Voice response", visible=False)

    submit = gr.Button("Submit")
    submit.click(
        fn=_route_outputs,
        inputs=[user_prompt, image_input, video_input, audio_input, doc_input, voice_only_mode],
        outputs=[text_output, audio_output],
    )


demo.launch(inline=False)