import json
import os
import re

import gradio as gr
import librosa
import requests
import torch
import torch.nn.functional as F
from dotenv import load_dotenv
from transformers import AutoTokenizer, AutoModel, WhisperProcessor, WhisperForConditionalGeneration
# Prefer Apple MPS, then CUDA, then fall back to CPU.
device = (
    torch.device("mps") if torch.backends.mps.is_available() else
    torch.device("cuda") if torch.cuda.is_available() else
    torch.device("cpu")
)
# Multilingual sentence-embedding model used for retrieval.
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
model = AutoModel.from_pretrained("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
model = model.to(device)

# Whisper model used for speech-to-text.
model_id = "openai/whisper-small"
processor_transcribe = WhisperProcessor.from_pretrained(model_id)
model_transcribe = WhisperForConditionalGeneration.from_pretrained(model_id)
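# Optional: move Whisper onto the same accelerator. transcribe() already reads
# model_transcribe.device, so this one line would be the only change needed:
#   model_transcribe = model_transcribe.to(device)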
load_dotenv()
secret_key = os.getenv("MY_SECRET_KEY")
invoke_url = os.getenv("invoke_url")

stream = True
headers = {
    "Authorization": secret_key,
    "Accept": "text/event-stream" if stream else "application/json",
}
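# Note: many OpenAI-compatible endpoints expect "Authorization: Bearer <key>",
# so MY_SECRET_KEY may need to include the "Bearer " prefix for your endpoint.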
def gen_llm(text):
    payload = {
        "model": "meta/llama-4-maverick-17b-128e-instruct",
        "messages": [{"role": "user", "content": text}],
        "max_tokens": 512,
        "temperature": 1.00,
        "top_p": 1.00,
        "frequency_penalty": 0.00,
        "presence_penalty": 0.00,
        "stream": stream,
    }
    response = requests.post(invoke_url, headers=headers, json=payload, stream=stream)
    full_text = []
    if stream:
        for line in response.iter_lines():
            if not line or not line.strip():
                continue  # skip keep-alive blank lines
            try:
                # Decode bytes to string
                line_str = line.decode('utf-8')
                # SSE streams prefix each line with "data: "
                if line_str.startswith("data: "):
                    line_str = line_str[6:]
                # Stop at the end-of-stream marker
                if line_str.strip() == "[DONE]":
                    break
                # Parse JSON and extract the content delta
                line_dict = json.loads(line_str)
                content = line_dict["choices"][0]["delta"].get("content")
                if content is not None:
                    full_text.append(content)
            except json.JSONDecodeError as e:
                return f"JSON decode error: {e}"
            except (KeyError, IndexError, TypeError) as e:
                return f"Malformed line or missing fields: {e}"
    else:
        return response.json()
    return ''.join(full_text)
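# Example usage (assuming MY_SECRET_KEY and invoke_url are set in .env):
#   answer = gen_llm("Summarize retrieval-augmented generation in one sentence.")
#   print(answer)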
def embed_text(texts):
    inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    # First-token ([CLS]) pooling. Note that sentence-transformers models are
    # trained with mean pooling, so this is an approximation.
    embeddings = outputs.last_hidden_state[:, 0, :]
    embeddings = F.normalize(embeddings, p=2, dim=1)  # normalize for cosine similarity
    return embeddings
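# Because the embeddings are L2-normalized, cosine similarity reduces to a
# plain dot product. A quick sanity check:
#   a = embed_text(["hello world"])
#   b = embed_text(["hi there"])
#   score = (a @ b.T).item()  # in [-1, 1]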
def check_and_read_txt(file):
    if file is None:
        # Must return one value per output component (8 in total).
        return (gr.update(visible=False), "", gr.update(visible=False), gr.update(visible=False),
                gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False))
    try:
        with open(file.name, 'r') as f:
            content = f.read()
        # The corpus file is expected to hold one entry per line; anything after
        # the first sentence boundary (". ") is discarded by this split.
        corpus_list = [s.strip() for s in re.split(r'\.\s+', content) if s.strip()]
        global corpus
        corpus = corpus_list[0].split('\n')
        corpus_embeddings = embed_text(corpus)
        global state_global
        state_global = corpus_embeddings
        return (gr.update(visible=True), content, gr.update(visible=True), gr.update(visible=True),
                gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True))
    except Exception as e:
        return (gr.update(visible=True), f"Error reading file: {e}", gr.update(visible=True), gr.update(visible=True),
                gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True))
def save_text(input_text):
    # Re-embed the (possibly edited) corpus text from the textbox.
    corpus_list = [s.strip() for s in re.split(r'\.\s+', input_text) if s.strip()]
    global corpus
    corpus = corpus_list[0].split('\n')
    corpus_embeddings = embed_text(corpus)
    global state_global
    state_global = corpus_embeddings
    return input_text
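# Splitting behavior on a hypothetical one-fact-per-line corpus:
#   "Paris is the capital of France\nThe Seine flows through Paris"
#   -> corpus == ["Paris is the capital of France", "The Seine flows through Paris"]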
transcribe_list = ['english', 'chinese', 'german', 'spanish', 'russian', 'korean', 'french', 'japanese', 'portuguese', 'turkish', 'polish', 'catalan', 'dutch', 'arabic', 'swedish', 'italian', 'indonesian', 'hindi', 'finnish', 'vietnamese', 'hebrew', 'ukrainian', 'greek', 'malay', 'czech', 'romanian', 'danish', 'hungarian', 'tamil', 'norwegian', 'thai', 'urdu', 'croatian', 'bulgarian', 'lithuanian', 'latin', 'maori', 'malayalam', 'welsh', 'slovak', 'telugu', 'persian', 'latvian', 'bengali', 'serbian', 'azerbaijani', 'slovenian', 'kannada', 'estonian', 'macedonian', 'breton', 'basque', 'icelandic', 'armenian', 'nepali', 'mongolian', 'bosnian', 'kazakh', 'albanian', 'swahili', 'galician', 'marathi', 'punjabi', 'sinhala', 'khmer', 'shona', 'yoruba', 'somali', 'afrikaans', 'occitan', 'georgian', 'belarusian', 'tajik', 'sindhi', 'gujarati', 'amharic', 'yiddish', 'lao', 'uzbek', 'faroese', 'haitian creole', 'pashto', 'turkmen', 'nynorsk', 'maltese', 'sanskrit', 'luxembourgish', 'myanmar', 'tibetan', 'tagalog', 'malagasy', 'assamese', 'tatar', 'hawaiian', 'lingala', 'hausa', 'bashkir', 'javanese', 'sundanese', 'cantonese', 'burmese', 'valencian', 'flemish', 'haitian', 'letzeburgesch', 'pushto', 'panjabi', 'moldavian', 'moldovan', 'sinhalese', 'castilian', 'mandarin']

selected_lang = "english"  # default transcription language
def transcribe(audio):
    if audio is None:
        return "No audio recorded."
    # print("selected_lang", selected_lang)
    forced_decoder_ids = processor_transcribe.get_decoder_prompt_ids(language=selected_lang, task="transcribe")
    # Resample to 16 kHz, the rate Whisper expects.
    audio_array, sampling_rate = librosa.load(audio, sr=16000)
    inputs = processor_transcribe(audio_array, sampling_rate=16000, return_tensors="pt")
    input_features = inputs.input_features.to(model_transcribe.device)
    predicted_ids = model_transcribe.generate(input_features, forced_decoder_ids=forced_decoder_ids)
    transcription = processor_transcribe.batch_decode(predicted_ids, skip_special_tokens=True)[0]
    return transcription
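# Note: recent transformers releases deprecate forced_decoder_ids in favor of
# passing the options directly to generate:
#   model_transcribe.generate(input_features, language=selected_lang, task="transcribe")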
def select_lang(language):
    # Dropdown callback: remember the chosen transcription language.
    global selected_lang
    selected_lang = language
chatbot = gr.Chatbot(label="💬 Chat", type="messages", visible=False)
chat_input = gr.Textbox(label="Enter text here:", visible=False)
file_input = gr.File(label="Upload File", file_types=[".txt", "*"])
output_text = gr.Textbox(label="Result", lines=20, interactive=True, visible=False)
btn = gr.Button("Update Information", visible=False)
transcribe_button = gr.Button("Transcribe", visible=False)
transcribe_dropdown = gr.Dropdown(choices=transcribe_list, label="Transcribe Language", visible=False)
audio_input = gr.Audio(sources="microphone", type="filepath", label="Record Audio", visible=False)
state_text = gr.State("")
state_global = ""  # corpus embeddings tensor, set once a file is loaded
corpus = []        # corpus entries matching state_global row-for-row
with gr.Blocks() as demo:
    # gr.Markdown("### Upload a file to check if it's a .txt file")
    with gr.Row():
        with gr.Column(scale=1):
            file_input.render()
            output_text.render()
            btn.render()
            # output_text is listed twice: the first update toggles its
            # visibility, the second sets its value.
            file_input.change(
                fn=check_and_read_txt,
                inputs=file_input,
                outputs=[output_text, output_text, btn, chatbot, chat_input, transcribe_button, transcribe_dropdown, audio_input]
            )
            btn.click(
                fn=save_text,
                inputs=output_text,
                outputs=state_text
            )
        with gr.Column(scale=2):
            chatbot.render()
            chat_input.render()
            with gr.Row():
                audio_input.render()
                transcribe_button.render()
            transcribe_button.click(fn=transcribe, inputs=audio_input, outputs=chat_input)
            transcribe_dropdown.render()
            transcribe_dropdown.change(fn=select_lang, inputs=transcribe_dropdown)
    def respond(message, history):
        if history is None:
            history = []
        # Retrieve the top-k most similar corpus entries (embeddings are
        # L2-normalized, so the dot product is the cosine similarity).
        query_embedding = embed_text([message])
        cosine_scores = torch.matmul(query_embedding, state_global.T).squeeze(0)
        k = min(3, cosine_scores.numel())  # guard against corpora shorter than 3 entries
        top_k_indices = torch.topk(cosine_scores, k=k).indices
        context = [corpus[i] for i in top_k_indices]
        prompt = (
            f"Use the following information to answer the user's question: '{context}'\n"
            f"This is the chat history: {history}\n"
            f"Then answer this question: '{message}'\n"
            f"Respond clearly and helpfully as an assistant. Keep your response focused and informative, but not overly brief. No need to explain how you got the answer.\n"
            f"Answer in the same language as the user input. If the answer is not in the context, say something like: "
            f"'I do not quite understand. Could you please try rephrasing or describing it differently?'\n"
        )
        # print(prompt)
        bot_ans = gen_llm(prompt)
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": bot_ans})
        return history, ""  # updated history, and clear the input box
    chat_input.submit(fn=respond, inputs=[chat_input, chatbot], outputs=[chatbot, chat_input])
if __name__ == "__main__":
    demo.launch()