| import os |
| import json |
| import requests |
| import re |
| import tempfile |
| import torch |
| import soundfile as sf |
|
|
| from bs4 import BeautifulSoup |
| from groq import Groq |
| import gradio as gr |
| from dotenv import load_dotenv |
| from youtube_transcript_api import YouTubeTranscriptApi |
|
|
| from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan |
|
|
# Load secrets from a local .env file into the process environment.
load_dotenv()


# API credentials (any of these may be None if the .env is incomplete).
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
BRIGHT_API_KEY = os.getenv("BRIGHT_API_KEY")  # Bright Data proxy API token
BRIGHT_ZONE = os.getenv("BRIGHT_ZONE")        # Bright Data zone name


# Shared Groq client used for chat completions and Whisper transcription.
client = Groq(api_key=GROQ_API_KEY)


# On-disk persistence for conversation memory and user style preferences.
CHAT_FILE = "chat_history.json"
PREF_FILE = "preferences.json"
|
|
|
|
| |
|
|
def load_json(file, default):
    """Load JSON data from *file*, falling back to *default* on any failure.

    Args:
        file: Path to the JSON file.
        default: Value returned when the file is absent, unreadable,
            or contains invalid JSON.
    """
    if os.path.exists(file):
        try:
            with open(file, "r") as f:
                return json.load(f)
        # Narrow except: a bare `except:` would also swallow
        # KeyboardInterrupt/SystemExit.
        except (OSError, json.JSONDecodeError):
            return default
    return default
|
|
|
|
def save_json(file, data):
    """Serialize *data* to *file* as pretty-printed JSON (overwrites)."""
    serialized = json.dumps(data, indent=4)
    with open(file, "w") as handle:
        handle.write(serialized)
|
|
|
|
# Persisted state restored at startup: chat memory (list of role/content
# dicts) and preferences (dict with at least a "style" key).
conversation_history = load_json(CHAT_FILE, [])
user_preferences = load_json(PREF_FILE, {"style": "Default"})
|
|
|
|
|
|
| |
|
|
|
|
print("Loading SpeechT5 model...")


# Text-to-speech pipeline: text -> SpeechT5 spectrogram -> HiFi-GAN waveform.
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
tts_model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")


# Random 512-dim speaker embedding: produces an arbitrary voice that changes
# on every process start. NOTE(review): consider loading a real x-vector
# embedding for a stable, natural-sounding voice — confirm desired behavior.
speaker_embeddings = torch.randn(1, 512)


print("TTS Model Loaded")
|
|
|
|
|
|
| |
|
|
|
|
def brightdata_request(target_url):
    """Fetch *target_url* through the Bright Data request API, returning raw HTML.

    Returns the response body as text. Note: an HTTP error still returns the
    error body (no exception), so callers must tolerate non-HTML responses.
    """
    response = requests.post(
        "https://api.brightdata.com/request",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {BRIGHT_API_KEY}"
        },
        json={
            "zone": BRIGHT_ZONE,
            "url": target_url,
            "format": "raw"
        },
        # Without a timeout a stalled proxy request would hang the UI forever.
        timeout=60,
    )
    return response.text
|
|
|
|
def scrape_goodreads(url):
    """Scrape a Goodreads list page and return up to 10 book records.

    Each record is a dict with "title", "author" and "rating" keys; table
    rows missing any of the three fields are skipped.
    """
    soup = BeautifulSoup(brightdata_request(url), "html.parser")

    books = []
    for row in soup.find_all("tr"):
        fields = (
            row.find("a", class_="bookTitle"),
            row.find("a", class_="authorName"),
            row.find("span", class_="minirating"),
        )
        if all(fields):
            title, author, rating = (tag.get_text(strip=True) for tag in fields)
            books.append({"title": title, "author": author, "rating": rating})

    return books[:10]
|
|
|
|
def qa_bot(url, question):
    """Answer *question* using only the book data scraped from *url*.

    Returns the model's answer, or a fallback message when scraping
    yields no books.
    """
    books = scrape_goodreads(url)
    if not books:
        return "No book data found."

    # Number the scraped books so the model can reference them.
    lines = [
        f"{idx}. {book['title']} by {book['author']} - {book['rating']}"
        for idx, book in enumerate(books, start=1)
    ]
    context = "\n".join(lines)

    system_prompt = f"""
You are a helpful assistant.
Answer ONLY using the following scraped Goodreads data.

{context}
"""

    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question}
        ]
    )
    return response.choices[0].message.content
|
|
|
|
|
|
| |
|
|
|
|
def extract_video_id(url):
    """Extract the 11-character YouTube video id from *url*, or None.

    Handles both `watch?v=<id>` and `youtu.be/<id>` URL forms.
    """
    found = re.search(r"(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})", url)
    if found:
        return found.group(1)
    return None
|
|
|
|
def get_youtube_transcript(url):
    """Return the full transcript text of a YouTube video, or an error message.

    Error strings begin with "Invalid" (bad URL) or "No" (no transcript);
    callers rely on these prefixes to detect failure.
    """
    video_id = extract_video_id(url)
    if not video_id:
        return "Invalid YouTube URL."

    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
    # Narrowed from a bare `except:`, which would also swallow
    # KeyboardInterrupt/SystemExit. Covers disabled subtitles,
    # private/removed videos, and network failures.
    except Exception:
        return "No transcript available for this video."

    return " ".join(entry["text"] for entry in transcript)
|
|
|
|
def youtube_qa(video_url, question):
    """Answer *question* grounded only in the video's transcript."""
    transcript = get_youtube_transcript(video_url)

    # get_youtube_transcript signals failure with these message prefixes.
    if transcript.startswith(("No", "Invalid")):
        return transcript

    system_prompt = f"""
You are a helpful assistant.
Answer ONLY using this transcript.

Transcript:
{transcript[:6000]}
"""

    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question}
        ]
    )
    return response.choices[0].message.content
|
|
|
|
|
|
| |
|
|
|
|
def chat_with_memory(user_message, preference_text):
    """Send *user_message* to the LLM with persisted history and style prefs.

    Side effects: a non-blank *preference_text* overwrites and persists the
    stored style; the new user/assistant exchange is always appended to the
    on-disk chat history.

    Returns the assistant's reply text.
    """
    global conversation_history, user_preferences

    if preference_text and preference_text.strip():
        user_preferences["style"] = preference_text
        save_json(PREF_FILE, user_preferences)

    system_prompt = f"""
You are a helpful AI assistant.

User Preferences:
{user_preferences.get("style","Default")}

Follow the preferred style in all responses.
Maintain conversational memory.
"""

    # System prompt first, then the full prior history, then the new turn.
    messages = (
        [{"role": "system", "content": system_prompt}]
        + conversation_history
        + [{"role": "user", "content": user_message}]
    )

    response = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=messages
    )
    assistant_reply = response.choices[0].message.content

    conversation_history += [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": assistant_reply},
    ]
    save_json(CHAT_FILE, conversation_history)

    return assistant_reply
|
|
|
|
def clear_memory():
    """Wipe the in-memory and on-disk chat history; return an empty chat display."""
    global conversation_history

    conversation_history = []
    save_json(CHAT_FILE, [])

    # Empty list resets the Chatbot component in the UI.
    return []
|
|
|
|
|
|
| |
|
|
|
|
def transcribe_audio(audio_path):
    """Transcribe the audio file at *audio_path* via Groq's Whisper endpoint."""
    with open(audio_path, "rb") as audio_file:
        result = client.audio.transcriptions.create(
            file=audio_file,
            model="whisper-large-v3"
        )
    return result.text
|
|
|
|
def text_to_speech(text):
    """Synthesize *text* into a 16 kHz WAV file and return its path.

    The temporary file is not auto-deleted; the caller (Gradio audio output)
    reads it afterwards.
    """
    inputs = processor(text=text, return_tensors="pt")

    speech = tts_model.generate_speech(
        inputs["input_ids"],
        speaker_embeddings,
        vocoder=vocoder
    )

    # mkstemp + close instead of NamedTemporaryFile(delete=False): the latter
    # leaks an open fd, and re-opening the same path for writing while the
    # handle is open fails on Windows.
    fd, wav_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    sf.write(wav_path, speech.numpy(), samplerate=16000)

    return wav_path
|
|
|
|
def process_text(user_message, preference_text, chat_display):
    """Handle a typed message: clear the textbox, update the chat display,
    and synthesize the assistant's reply to audio."""
    if not user_message.strip():
        # Nothing to send; keep the display unchanged and emit no audio.
        return "", chat_display, None

    assistant_reply = chat_with_memory(user_message, preference_text)

    chat_display += [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": assistant_reply},
    ]

    return "", chat_display, text_to_speech(assistant_reply)
|
|
|
|
def process_voice(audio_file, preference_text, chat_display):
    """Handle a recorded voice message end-to-end: STT -> chat -> TTS."""
    if audio_file is None:
        return chat_display, None

    user_text = transcribe_audio(audio_file)
    assistant_reply = chat_with_memory(user_text, preference_text)

    chat_display += [
        {"role": "user", "content": user_text},
        {"role": "assistant", "content": assistant_reply},
    ]

    return chat_display, text_to_speech(assistant_reply)
|
|
|
|
|
|
| |
|
|
|
|
# ---------------- Gradio UI ----------------
# Four tabs sharing one Groq client and one persisted conversation history.
with gr.Blocks() as demo:

    gr.Markdown("# π Full AI Assistant (V1 + V2 + V3 + V4)")

    # ---- V1: scrape a Goodreads list and answer questions over it ----
    with gr.Tab("π Website Scraper Q&A"):

        url_input = gr.Textbox(
            label="Enter Goodreads URL",
            value="https://www.goodreads.com/list/show/1.Best_Books_Ever"
        )
        question_input = gr.Textbox(label="Ask your question")
        output1 = gr.Textbox(label="Answer")
        btn1 = gr.Button("Submit")

        btn1.click(
            qa_bot,
            inputs=[url_input, question_input],
            outputs=output1
        )

    # ---- V2: Q&A over a YouTube transcript ----
    with gr.Tab("π₯ YouTube Transcript Q&A"):

        video_input = gr.Textbox(label="Enter YouTube URL")
        yt_question = gr.Textbox(label="Ask your question")
        output2 = gr.Textbox(label="Answer")
        btn2 = gr.Button("Submit")

        btn2.click(
            youtube_qa,
            inputs=[video_input, yt_question],
            outputs=output2
        )

    # ---- V3: chatbot with persisted memory and style preferences ----
    with gr.Tab("π§ Memory Chatbot"):

        # type="messages": the handlers append openai-style role/content
        # dicts, which the legacy tuple chat format would reject.
        chatbot = gr.Chatbot(
            label="Conversation",
            value=conversation_history,
            type="messages"
        )
        preference_input = gr.Textbox(
            label="User Preferences (Optional)"
        )
        user_message = gr.Textbox(label="Your Message")
        send_btn = gr.Button("Send")
        clear_btn = gr.Button("Clear Memory")

        def chat_interface(user_message, preference_text, chat_display):
            """Text-only chat handler: returns (cleared textbox, updated display)."""
            if not user_message.strip():
                return "", chat_display

            assistant_reply = chat_with_memory(user_message, preference_text)

            chat_display.append({"role": "user", "content": user_message})
            chat_display.append({"role": "assistant", "content": assistant_reply})
            return "", chat_display

        send_btn.click(
            chat_interface,
            inputs=[user_message, preference_input, chatbot],
            outputs=[user_message, chatbot]
        )

        clear_btn.click(
            clear_memory,
            outputs=chatbot
        )

    # ---- V4: voice in / voice out assistant ----
    with gr.Tab("ποΈ Voice AI Assistant"):

        chatbot_v4 = gr.Chatbot(
            label="Conversation",
            value=conversation_history,
            type="messages"
        )
        preference_input_v4 = gr.Textbox(
            label="User Preferences (Optional)"
        )
        user_message_v4 = gr.Textbox(label="Type your message")
        audio_input = gr.Audio(
            sources=["microphone"],
            type="filepath",
            label="Speak your question"
        )
        audio_output = gr.Audio(label="Assistant Voice Response")
        send_btn_v4 = gr.Button("Send Text")
        voice_btn = gr.Button("Send Voice")
        clear_btn_v4 = gr.Button("Clear Memory")

        send_btn_v4.click(
            process_text,
            inputs=[user_message_v4, preference_input_v4, chatbot_v4],
            outputs=[user_message_v4, chatbot_v4, audio_output]
        )

        voice_btn.click(
            process_voice,
            inputs=[audio_input, preference_input_v4, chatbot_v4],
            outputs=[chatbot_v4, audio_output]
        )

        clear_btn_v4.click(
            clear_memory,
            outputs=chatbot_v4
        )


if __name__ == "__main__":
    demo.launch()