Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import re | |
| import tempfile | |
| import torch | |
| import soundfile as sf | |
| from bs4 import BeautifulSoup | |
| from groq import Groq | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan | |
# Load variables from a local .env file (if any) before the lookups below.
load_dotenv()
# API KEYS
# os.getenv returns None for an unset variable; nothing here validates that.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
BRIGHT_API_KEY = os.getenv("BRIGHT_API_KEY")
BRIGHT_ZONE = os.getenv("BRIGHT_ZONE")
# One Groq client shared by the chat-completion and transcription calls below.
client = Groq(api_key=GROQ_API_KEY)
# JSON files (relative paths) that persist the chat memory and style preference.
CHAT_FILE = "chat_history.json"
PREF_FILE = "preferences.json"
| # SAFE JSON FUNCTIONS | |
def load_json(file, default):
    """Read a JSON file, falling back to ``default`` when it cannot be used.

    Args:
        file: Path of the JSON file to read.
        default: Value returned when the file is missing, unreadable, or
            does not contain valid JSON.

    Returns:
        The decoded JSON document, or ``default``.
    """
    try:
        with open(file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError. Unrelated exceptions
        # (KeyboardInterrupt, programming errors) are no longer swallowed.
        return default
def save_json(file, data):
    """Write ``data`` to ``file`` as JSON indented by four spaces.

    Args:
        file: Destination path; an existing file is overwritten.
        data: JSON-serializable object to store.

    Raises:
        TypeError, ValueError: If ``data`` cannot be serialized. The
            existing file is left untouched in that case.
        OSError: If the file cannot be opened or written.
    """
    # Serialize before opening: mode "w" truncates immediately, so a
    # serialization failure would otherwise destroy the previously saved file.
    payload = json.dumps(data, indent=4)
    with open(file, "w") as f:
        f.write(payload)
# Restore persisted state when the module is loaded; each call passes the
# value to use when the file yields nothing.
conversation_history = load_json(CHAT_FILE, [])
user_preferences = load_json(PREF_FILE, {"style": "Default"})
| # LOAD TTS MODEL (Version 4) | |
# The processor, acoustic model and vocoder are loaded once at module load
# and reused by text_to_speech().
print("Loading SpeechT5 model...")
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
tts_model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
# NOTE(review): the speaker embedding is drawn at random on every start, so
# the synthesized voice differs between restarts. SpeechT5 examples usually
# load a real x-vector here instead — TODO confirm this is intended.
speaker_embeddings = torch.randn(1, 512)
print("TTS Model Loaded")
# VERSION 1 — WEBSITE SCRAPER
def brightdata_request(target_url, timeout=60):
    """Fetch a page through the Bright Data request API and return its body.

    Args:
        target_url: URL of the page to fetch.
        timeout: Seconds to wait for the API before ``requests`` gives up.
            Without a timeout a stalled connection would block the calling
            UI handler indefinitely.

    Returns:
        The response body as text. The HTTP status is not checked, so the
        body of an error response is returned as-is.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            when the timeout expires.
    """
    response = requests.post(
        "https://api.brightdata.com/request",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {BRIGHT_API_KEY}"
        },
        json={
            "zone": BRIGHT_ZONE,
            "url": target_url,
            "format": "raw"
        },
        timeout=timeout,
    )
    return response.text
def scrape_goodreads(url):
    """Return up to ten books parsed from the table rows of a Goodreads page.

    Args:
        url: Page to fetch via ``brightdata_request``.

    Returns:
        A list of at most ten dicts with ``title``, ``author`` and
        ``rating`` keys, in page order.
    """
    page = BeautifulSoup(brightdata_request(url), "html.parser")
    found = []
    for tr in page.find_all("tr"):
        title = tr.find("a", class_="bookTitle")
        author = tr.find("a", class_="authorName")
        rating = tr.find("span", class_="minirating")
        # Ignore rows that lack any of the three fields.
        if not (title and author and rating):
            continue
        found.append(dict(
            title=title.get_text(strip=True),
            author=author.get_text(strip=True),
            rating=rating.get_text(strip=True),
        ))
    return found[:10]
def qa_bot(url, question):
    """Answer a question using only the books scraped from ``url``.

    Args:
        url: Goodreads page to scrape.
        question: The user's question.

    Returns:
        The model's answer, or ``"No book data found."`` when the scrape
        produced no books.
    """
    books = scrape_goodreads(url)
    if not books:
        return "No book data found."

    # One numbered line per book, e.g. "1. Title by Author - rating".
    context = "\n".join(
        f"{rank}. {book['title']} by {book['author']} - {book['rating']}"
        for rank, book in enumerate(books, start=1)
    )
    system_prompt = f"""
You are a helpful assistant.
Answer ONLY using the following scraped Goodreads data.
{context}
"""
    chat = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question},
    ]
    completion = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=chat,
    )
    return completion.choices[0].message.content
# VERSION 2 — YOUTUBE QA
def extract_video_id(url):
    """Extract the 11-character video id from a YouTube URL.

    Recognizes ``watch?v=<id>``, ``youtu.be/<id>`` and the path forms
    ``/shorts/<id>``, ``/embed/<id>``, ``/live/<id>`` and ``/v/<id>``.

    Args:
        url: The URL text entered by the user.

    Returns:
        The video id, or ``None`` when ``url`` is empty or holds no id.
    """
    if not url:
        return None
    pattern = r"(?:v=|youtu\.be/|/(?:shorts|embed|live|v)/)([a-zA-Z0-9_-]{11})"
    match = re.search(pattern, url)
    return match.group(1) if match else None
def get_youtube_transcript(url):
    """Return the transcript of a YouTube video as one space-joined string.

    Args:
        url: YouTube URL; the video id is taken from it with
            ``extract_video_id``.

    Returns:
        The transcript text, ``"Invalid YouTube URL."`` when no video id is
        found, or ``"No transcript available for this video."`` when the
        transcript cannot be fetched.
    """
    video_id = extract_video_id(url)
    if not video_id:
        return "Invalid YouTube URL."
    try:
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Older youtube-transcript-api releases: class-level helper.
            entries = YouTubeTranscriptApi.get_transcript(video_id)
        else:
            # Newer releases removed get_transcript; fetch through an
            # instance and convert back to the list-of-dicts shape.
            entries = YouTubeTranscriptApi().fetch(video_id).to_raw_data()
        return " ".join(entry["text"] for entry in entries)
    except Exception:
        # Best-effort by design: any fetch failure is shown to the user as a
        # missing transcript. Unlike a bare ``except:`` this still lets
        # KeyboardInterrupt and SystemExit propagate.
        return "No transcript available for this video."
def youtube_qa(video_url, question):
    """Answer a question using only the transcript of a YouTube video.

    Args:
        video_url: URL of the video.
        question: The user's question.

    Returns:
        The model's answer, or the failure message produced while fetching
        the transcript.
    """
    transcript = get_youtube_transcript(video_url)
    # get_youtube_transcript reports failures as one of these exact strings.
    # They are matched exactly because a prefix test would also reject real
    # transcripts that merely begin with "No" ("Now ...", "Nobody ...") or
    # "Invalid".
    failure_messages = (
        "Invalid YouTube URL.",
        "No transcript available for this video.",
    )
    if transcript in failure_messages:
        return transcript
    if not transcript.strip():
        # An empty transcript gives the model nothing to answer from.
        return "No transcript available for this video."
    # Only the first 6000 characters of the transcript go into the prompt.
    system_prompt = f"""
You are a helpful assistant.
Answer ONLY using this transcript.
Transcript:
{transcript[:6000]}
"""
    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question}
        ]
    )
    return response.choices[0].message.content
# VERSION 3 — MEMORY CHAT
def chat_with_memory(user_message, preference_text, max_history_messages=20):
    """Send a message to the model together with recent conversation memory.

    The exchange is appended to ``conversation_history`` and written to
    ``CHAT_FILE``. A non-blank ``preference_text`` replaces the stored style
    preference and is written to ``PREF_FILE``.

    Args:
        user_message: The user's new message.
        preference_text: Optional style preference; ignored when blank.
        max_history_messages: Most recent stored messages to resend with the
            request. ``None`` resends the entire history. The full history
            is always kept on disk; only the prompt is bounded, so a long
            conversation cannot grow the request until it is rejected.

    Returns:
        The assistant's reply text.
    """
    global conversation_history, user_preferences
    if preference_text and preference_text.strip():
        user_preferences["style"] = preference_text
        save_json(PREF_FILE, user_preferences)
    system_prompt = f"""
You are a helpful AI assistant.
User Preferences:
{user_preferences.get("style","Default")}
Follow the preferred style in all responses.
Maintain conversational memory.
"""
    if max_history_messages is None:
        recent = conversation_history
    else:
        # Messages are appended below in user/assistant pairs, so an
        # even-sized tail begins on a user turn.
        window = max(0, max_history_messages) // 2 * 2
        # A slice of [-0:] would return the whole list, hence the check.
        recent = conversation_history[-window:] if window else []
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(recent)
    messages.append({"role": "user", "content": user_message})
    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=messages
    )
    assistant_reply = response.choices[0].message.content
    conversation_history.append({"role": "user", "content": user_message})
    conversation_history.append({"role": "assistant", "content": assistant_reply})
    save_json(CHAT_FILE, conversation_history)
    return assistant_reply
def clear_memory():
    """Reset the conversation memory in the process and on disk.

    Returns:
        A new empty list, used by the UI to blank the chat display.
    """
    global conversation_history
    # Rebind to a fresh list, then persist that empty state.
    conversation_history = []
    save_json(CHAT_FILE, conversation_history)
    return []
# VERSION 4 — VOICE AI
def transcribe_audio(audio_path):
    """Transcribe an audio file with the ``whisper-large-v3`` model.

    Args:
        audio_path: Path of the recording to transcribe.

    Returns:
        The ``text`` attribute of the transcription response.
    """
    with open(audio_path, "rb") as recording:
        result = client.audio.transcriptions.create(
            model="whisper-large-v3",
            file=recording,
        )
    return result.text
def text_to_speech(text):
    """Synthesize ``text`` to a WAV file and return the file's path.

    Args:
        text: Text to speak.

    Returns:
        Path of a newly created ``.wav`` file written at 16000 Hz. The file
        is not deleted here.
    """
    # NOTE(review): very long replies may exceed the model's maximum input
    # length — confirm whether the text should be truncated or chunked.
    inputs = processor(text=text, return_tensors="pt")
    speech = tts_model.generate_speech(
        inputs["input_ids"],
        speaker_embeddings,
        vocoder=vocoder
    )
    # Reserve a unique path and close the handle straight away. Leaving it
    # open leaks one file descriptor per call and, on Windows, prevents the
    # file from being reopened for writing.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
        wav_path = temp_audio.name
    sf.write(wav_path, speech.numpy(), samplerate=16000)
    return wav_path
def process_text(user_message, preference_text, chat_display):
    """Handle a typed message in the voice tab.

    Args:
        user_message: Text typed by the user.
        preference_text: Optional style preference passed to the chat call.
        chat_display: Message list shown in the chatbot; extended in place.

    Returns:
        A tuple ``("", chat_display, audio_path)``. The empty string clears
        the input box; ``audio_path`` is ``None`` for a blank message.
    """
    if user_message.strip():
        reply = chat_with_memory(user_message, preference_text)
        chat_display.extend([
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": reply},
        ])
        spoken_reply = text_to_speech(reply)
        return "", chat_display, spoken_reply
    # Blank input: leave the conversation untouched and play nothing.
    return "", chat_display, None
def process_voice(audio_file, preference_text, chat_display):
    """Handle a recorded message in the voice tab.

    Args:
        audio_file: Path of the recording, or ``None`` when nothing was
            recorded.
        preference_text: Optional style preference passed to the chat call.
        chat_display: Message list shown in the chatbot; extended in place.

    Returns:
        A tuple ``(chat_display, audio_path)``. ``audio_path`` is ``None``
        when there was no recording or the transcription came back blank.
    """
    if audio_file is None:
        return chat_display, None
    user_text = transcribe_audio(audio_file)
    # Same guard as process_text: do not send an empty message to the model
    # or store it in the conversation memory.
    if not user_text or not user_text.strip():
        return chat_display, None
    assistant_reply = chat_with_memory(user_text, preference_text)
    chat_display.append({"role": "user", "content": user_text})
    chat_display.append({"role": "assistant", "content": assistant_reply})
    audio_output = text_to_speech(assistant_reply)
    return chat_display, audio_output
| # GRADIO UI | |
# Four-tab interface: website Q&A, YouTube Q&A, memory chat, voice chat.
# NOTE(review): the leading symbols in the heading and tab titles look like
# mis-encoded emoji — confirm against the original file before changing them.
with gr.Blocks() as demo:
    gr.Markdown("# π Full AI Assistant (V1 + V2 + V3 + V4)")
    # TAB 1
    with gr.Tab("π Website Scraper Q&A"):
        url_input = gr.Textbox(
            label="Enter Goodreads URL",
            value="https://www.goodreads.com/list/show/1.Best_Books_Ever"
        )
        question_input = gr.Textbox(label="Ask your question")
        output1 = gr.Textbox(label="Answer")
        btn1 = gr.Button("Submit")
        btn1.click(
            qa_bot,
            inputs=[url_input, question_input],
            outputs=output1
        )
    # TAB 2
    with gr.Tab("π₯ YouTube Transcript Q&A"):
        video_input = gr.Textbox(label="Enter YouTube URL")
        yt_question = gr.Textbox(label="Ask your question")
        output2 = gr.Textbox(label="Answer")
        btn2 = gr.Button("Submit")
        btn2.click(
            youtube_qa,
            inputs=[video_input, yt_question],
            outputs=output2
        )
    # TAB 3
    with gr.Tab("π§ Memory Chatbot"):
        # Starts out showing the history loaded from disk.
        chatbot = gr.Chatbot(label="Conversation", value=conversation_history)
        preference_input = gr.Textbox(
            label="User Preferences (Optional)"
        )
        user_message = gr.Textbox(label="Your Message")
        send_btn = gr.Button("Send")
        clear_btn = gr.Button("Clear Memory")
        # Text-only handler: same flow as process_text but without audio.
        # Returns "" to clear the input box plus the updated message list.
        def chat_interface(user_message, preference_text, chat_display):
            if not user_message.strip():
                return "", chat_display
            assistant_reply = chat_with_memory(user_message, preference_text)
            chat_display.append({"role": "user", "content": user_message})
            chat_display.append({"role": "assistant", "content": assistant_reply})
            return "", chat_display
        send_btn.click(
            chat_interface,
            inputs=[user_message, preference_input, chatbot],
            outputs=[user_message, chatbot]
        )
        clear_btn.click(
            clear_memory,
            outputs=chatbot
        )
    # TAB 4
    with gr.Tab("ποΈ Voice AI Assistant"):
        # Seeded from the same conversation_history as the Tab 3 chatbot.
        chatbot_v4 = gr.Chatbot(label="Conversation", value=conversation_history)
        preference_input_v4 = gr.Textbox(
            label="User Preferences (Optional)"
        )
        user_message_v4 = gr.Textbox(label="Type your message")
        # type="filepath": the handler receives a path to the recording.
        audio_input = gr.Audio(
            sources=["microphone"],
            type="filepath",
            label="Speak your question"
        )
        audio_output = gr.Audio(label="Assistant Voice Response")
        send_btn_v4 = gr.Button("Send Text")
        voice_btn = gr.Button("Send Voice")
        clear_btn_v4 = gr.Button("Clear Memory")
        send_btn_v4.click(
            process_text,
            inputs=[user_message_v4, preference_input_v4, chatbot_v4],
            outputs=[user_message_v4, chatbot_v4, audio_output]
        )
        voice_btn.click(
            process_voice,
            inputs=[audio_input, preference_input_v4, chatbot_v4],
            outputs=[chatbot_v4, audio_output]
        )
        # Clears the shared memory and this tab's chat display only.
        clear_btn_v4.click(
            clear_memory,
            outputs=chatbot_v4
        )
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()