# Hugging Face Space app (page-status banner "Spaces: Sleeping" removed from capture)
| import requests | |
| from bs4 import BeautifulSoup | |
| import os | |
| import json | |
| from datetime import datetime | |
| from groq import Groq | |
| import gradio as gr | |
# -----------------------------------------------
# LOAD API KEYS
# -----------------------------------------------
# load_dotenv() # Remove / comment this line when deploying to Hugging Face
# Both keys come from environment variables (on Hugging Face Spaces, set them
# as repository secrets). os.getenv returns None when a variable is unset.
BRIGHTDATA_API_KEY = os.getenv("BRIGHTDATA_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# NOTE(review): the Groq client is constructed at import time even when the
# key is None; every chat handler guards on GROQ_API_KEY before calling it.
client = Groq(api_key=GROQ_API_KEY)
| # ----------------------------------------------- | |
| # PERSISTENT STORAGE β JSON Files | |
| # ----------------------------------------------- | |
# JSON files used for cross-session persistence (stored next to the app).
HISTORY_FILE = "chat_history.json"
PREFERENCES_FILE = "user_preferences.json"


def load_chat_history():
    """Return the persisted chat history as a list; empty list on any problem."""
    if not os.path.exists(HISTORY_FILE):
        return []
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as fh:
            stored = json.load(fh)
    except Exception as e:
        # Corrupt/unreadable file: log and fall back to a fresh history.
        print(f"[Storage] Could not load history: {e}")
        return []
    return stored if isinstance(stored, list) else []


def save_chat_history(history):
    """Write *history* to HISTORY_FILE; failures are logged, never raised."""
    try:
        with open(HISTORY_FILE, "w", encoding="utf-8") as fh:
            json.dump(history, fh, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"[Storage] Could not save history: {e}")


def load_preferences():
    """Return saved user preferences, falling back to built-in defaults."""
    defaults = {
        "tone": "friendly",
        "language": "English",
        "format": "concise",
        "custom_rules": "",
    }
    if not os.path.exists(PREFERENCES_FILE):
        return defaults
    try:
        with open(PREFERENCES_FILE, "r", encoding="utf-8") as fh:
            stored = json.load(fh)
    except Exception as e:
        print(f"[Storage] Could not load preferences: {e}")
        return defaults
    return stored if isinstance(stored, dict) else defaults


def save_preferences(prefs):
    """Persist *prefs* to disk and return a human-readable status string."""
    try:
        with open(PREFERENCES_FILE, "w", encoding="utf-8") as fh:
            json.dump(prefs, fh, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"β Could not save: {str(e)}"
    return "β Preferences saved!"
# Load at startup
# Module-level mutable state shared by all Gradio handlers: the persisted
# conversation and the saved user preferences.
chat_history_store = load_chat_history()
user_preferences = load_preferences()
print(f"[Startup] Loaded {len(chat_history_store)} past messages")
print(f"[Startup] Preferences: {user_preferences}")
| # ================================================================ | |
| # GOODREADS SCRAPER | |
| # ================================================================ | |
def get_books():
    """Scrape the Goodreads "Best Books Ever" list via the Bright Data proxy.

    Returns:
        (results, status): ``results`` is a list of up to 20 dicts with keys
        ``rank``, ``title``, ``author`` and ``rating``; ``status`` is a
        human-readable message. On any failure the list is empty and the
        status explains why.
    """
    if not BRIGHTDATA_API_KEY:
        return [], "β BRIGHTDATA_API_KEY not found!"
    headers = {
        "Authorization": f"Bearer {BRIGHTDATA_API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "zone": "web_unlocker1",
        "url": "https://www.goodreads.com/list/show/1.Best_Books_Ever",
        "format": "raw",
    }
    try:
        response = requests.post(
            "https://api.brightdata.com/request", json=data, headers=headers, timeout=30
        )
        if response.status_code != 200:
            return [], f"β Scraping failed! Status: {response.status_code}"
        soup = BeautifulSoup(response.text, "html.parser")
        books = soup.select("tr[itemtype='http://schema.org/Book']")
        if not books:
            return [], "β οΈ No books found."
        results = []
        for i, book in enumerate(books[:20], start=1):
            # select_one returns None when a selector misses, so ``.text``
            # raises AttributeError; skip only that row. The original bare
            # ``except: pass`` also swallowed KeyboardInterrupt and real bugs.
            try:
                title = book.select_one("a.bookTitle span").text.strip()
                author = book.select_one("a.authorName span").text.strip()
                rating = book.select_one("span.minirating").text.strip()
            except AttributeError:
                continue
            results.append({"rank": i, "title": title, "author": author, "rating": rating})
        return results, f"β Scraped {len(results)} books!"
    except Exception as e:
        return [], f"β Error: {str(e)}"
def format_books_as_text(books):
    """Render the scraped book list as one line per book for the LLM prompt."""
    if not books:
        return "No book data available."
    return "\n".join(
        f'Rank #{entry["rank"]}: "{entry["title"]}" by {entry["author"]} β Rating: {entry["rating"]}'
        for entry in books
    )
| print("π Scraping Goodreads...") | |
| books_data, scrape_status = get_books() | |
| print(scrape_status) | |
| # ================================================================ | |
| # YOUTUBE TRANSCRIPT | |
| # ================================================================ | |
def get_youtube_transcript(video_id: str):
    """Fetch the full transcript text for a YouTube video.

    Accepts a bare video ID, a ``youtube.com/watch`` URL, or a ``youtu.be``
    short URL. Returns ``(transcript_text, status_message)``;
    ``transcript_text`` is None on any failure.
    """
    candidate = video_id.strip()
    # Normalize full URLs down to the bare video ID.
    if "youtube.com/watch" in candidate:
        candidate = candidate.split("v=")[-1].split("&")[0]
    elif "youtu.be/" in candidate:
        candidate = candidate.split("youtu.be/")[-1].split("?")[0]
    if not candidate:
        return None, "β Please enter a valid YouTube Video ID or URL."
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        try:
            # Newer library API: instance method returning snippet objects.
            snippets = YouTubeTranscriptApi().fetch(candidate)
            full_text = " ".join(s.text for s in snippets)
        except Exception:
            # Older library API: classmethod returning a list of dicts.
            snippets = YouTubeTranscriptApi.get_transcript(candidate)
            full_text = " ".join(s["text"] for s in snippets)
        if not full_text.strip():
            return None, "β οΈ Transcript empty or not available."
        return full_text, f"β Transcript fetched! ({len(full_text)} chars)"
    except Exception as e:
        return None, f"β Error: {str(e)}"
# Module-level cache for the most recently fetched transcript; written by
# load_transcript and read by ask_youtube_ai.
current_transcript = {"text": None, "status": "No transcript loaded."}


def load_transcript(video_id):
    """Fetch a transcript, cache it globally, return (status, preview_markdown)."""
    global current_transcript
    text, status = get_youtube_transcript(video_id)
    current_transcript.update(text=text, status=status)
    if not text:
        return status, "No preview available."
    snippet = text if len(text) <= 600 else text[:600] + "..."
    return status, f"**π Preview:**\n\n_{snippet}_"
| # ================================================================ | |
| # PREFERENCES + PERSISTENT CHAT HELPERS | |
| # ================================================================ | |
def build_system_prompt(base_prompt, prefs):
    """Append the user's saved preferences to *base_prompt* and return it.

    Missing preference keys fall back to friendly/English/concise; a
    non-empty ``custom_rules`` value adds one extra bullet.
    """
    lines = [
        "USER PREFERENCES (follow these always):",
        f"- Tone: {prefs.get('tone', 'friendly')}",
        f"- Language: {prefs.get('language', 'English')}",
        f"- Response Format: {prefs.get('format', 'concise')}",
    ]
    extra = prefs.get("custom_rules", "").strip()
    if extra:
        lines.append(f"- Custom Rules: {extra}")
    return base_prompt + "\n".join(lines)
def convert_history_for_display(history_store):
    """Strip stored messages down to the role/content pairs Gradio expects."""
    return [{key: entry[key] for key in ("role", "content")} for entry in history_store]
def ask_main_ai(message, history):
    """Main chatbot handler: answers with full persistent-memory context.

    Builds the prompt from the persisted store plus the current UI session,
    then appends the new exchange to the store and saves it to disk.
    """
    global chat_history_store, user_preferences
    if not GROQ_API_KEY:
        return "β GROQ_API_KEY not set."
    base_prompt = """You are a smart, helpful AI assistant with memory of past conversations.
You help users with general questions, book recommendations, research, and more.
Always maintain context from previous messages."""
    messages = [
        {"role": "system", "content": build_system_prompt(base_prompt, user_preferences)}
    ]
    # Replay everything persisted from earlier sessions first...
    messages.extend(
        {"role": entry["role"], "content": entry["content"]} for entry in chat_history_store
    )
    # ...then whatever the current UI session holds that is not already stored.
    for entry in history:
        if isinstance(entry, dict):
            if entry not in chat_history_store:
                messages.append({"role": entry["role"], "content": entry["content"]})
        else:
            # Legacy Gradio tuple format: (user_text, assistant_text).
            messages.append({"role": "user", "content": entry[0]})
            messages.append({"role": "assistant", "content": entry[1]})
    messages.append({"role": "user", "content": message})
    try:
        response = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=messages,
            temperature=0.6,
            max_tokens=1024,
        )
        reply = response.choices[0].message.content
        # Persist the new exchange so future sessions remember it.
        chat_history_store.append({"role": "user", "content": message})
        chat_history_store.append({"role": "assistant", "content": reply})
        save_chat_history(chat_history_store)
        return reply
    except Exception as e:
        return f"β AI Error: {str(e)}"
def ask_books_ai(message, history):
    """Book Q&A handler grounded in the Goodreads data scraped at startup."""
    if not GROQ_API_KEY:
        return "β GROQ_API_KEY not set."
    if not books_data:
        return f"β οΈ No book data. Status: {scrape_status}"
    base = """You are a smart and friendly book assistant named BookBot π.
Books data from Goodreads:
{context}
RULES: Only answer from this data. Be friendly and concise."""
    system = build_system_prompt(
        base.format(context=format_books_as_text(books_data)), user_preferences
    )
    messages = [{"role": "system", "content": system}]
    for turn in history:
        if isinstance(turn, dict):
            messages.append({"role": turn["role"], "content": turn["content"]})
            continue
        # Legacy Gradio tuple format: (user_text, assistant_text).
        messages.append({"role": "user", "content": turn[0]})
        messages.append({"role": "assistant", "content": turn[1]})
    messages.append({"role": "user", "content": message})
    try:
        resp = client.chat.completions.create(
            model="llama-3.1-8b-instant",
            messages=messages,
            temperature=0.5,
            max_tokens=1024,
        )
        return resp.choices[0].message.content
    except Exception as e:
        return f"β AI Error: {str(e)}"
def ask_youtube_ai(message, history):
    """YouTube Q&A handler grounded in the currently cached transcript."""
    if not GROQ_API_KEY:
        return "β GROQ_API_KEY not set."
    transcript = current_transcript.get("text")
    if not transcript:
        return "β οΈ No transcript loaded. Enter a YouTube Video ID and click 'Load Transcript' first."
    base = "You are a helpful assistant answering ONLY from this transcript:\n{transcript}\nRULES: Only use transcript info. Be concise."
    # Cap the transcript at 6000 chars to keep the prompt within context limits.
    system = build_system_prompt(base.format(transcript=transcript[:6000]), user_preferences)
    messages = [{"role": "system", "content": system}]
    for turn in history:
        if isinstance(turn, dict):
            messages.append({"role": turn["role"], "content": turn["content"]})
            continue
        # Legacy Gradio tuple format: (user_text, assistant_text).
        messages.append({"role": "user", "content": turn[0]})
        messages.append({"role": "assistant", "content": turn[1]})
    messages.append({"role": "user", "content": message})
    try:
        resp = client.chat.completions.create(
            model="llama-3.1-8b-instant",
            messages=messages,
            temperature=0.4,
            max_tokens=1024,
        )
        return resp.choices[0].message.content
    except Exception as e:
        return f"β AI Error: {str(e)}"
def clear_history():
    """Wipe persistent chat memory (both the in-RAM store and the JSON file)."""
    global chat_history_store
    chat_history_store = []
    save_chat_history(chat_history_store)
    return "ποΈ Chat history cleared!"
# ================================================================
# GRADIO UI β Now only 3 tabs
# ================================================================
# Seed the main chat widget with previously persisted messages so the first
# tab opens showing earlier conversations.
initial_display = convert_history_for_display(chat_history_store)
with gr.Blocks(title="AI Assistant β Version 3") as demo:
    gr.Markdown("""
# AI Assistant β Version 3
Persistent memory β’ Goodreads + YouTube Q&A
Powered by Groq β’ MSDSF25M011-ver3
""")
    with gr.Tabs():
        # --- Tab 1: general chat with cross-session memory ---
        with gr.Tab("π¬ Main Chatbot"):
            gr.Markdown("General chat with memory across sessions")
            main_chatbot = gr.ChatInterface(
                fn=ask_main_ai,
                examples=[
                    "What did we talk about last time?",
                    "Recommend some good sci-fi books",
                    "Summarize our conversation so far"
                ],
                chatbot=gr.Chatbot(
                    value=initial_display,  # pre-load persisted messages
                    height=500,
                    placeholder="Ask anything β I remember past chats!"
                ),
                textbox=gr.Textbox(placeholder="Your message...", scale=7),
            )
            with gr.Row():
                clear_btn = gr.Button("ποΈ Clear History", variant="stop")
                clear_status = gr.Textbox(label="Status", interactive=False, scale=3)
            # clear_history wipes both the in-memory store and chat_history.json.
            clear_btn.click(fn=clear_history, outputs=clear_status)
            #gr.Markdown(f"*πΎ {len(chat_history_store)} messages loaded from storage*")
        # --- Tab 2: Q&A over the Goodreads list scraped at import time ---
        with gr.Tab("π Goodreads Q&A"):
            gr.Markdown("Ask anything about the Goodreads Best Books list")
            gr.ChatInterface(
                fn=ask_books_ai,
                examples=[
                    "What is the top-rated book?",
                    "Who wrote the second book?",
                    "List top 5 books with authors",
                    "Which book has highest rating?"
                ],
                chatbot=gr.Chatbot(height=480, placeholder="Ask about books!"),
                textbox=gr.Textbox(placeholder="Your question...", scale=7),
            )
            gr.Markdown(f"**Books loaded:** {scrape_status}")
        # --- Tab 3: load a YouTube transcript, then chat about it ---
        with gr.Tab("π¬ YouTube Q&A"):
            gr.Markdown("### Load Video Transcript")
            with gr.Row():
                video_id_input = gr.Textbox(placeholder="dQw4w9WgXcQ or full URL", label="YouTube Video ID / URL", scale=4)
                load_btn = gr.Button("π₯ Load", variant="primary", scale=1)
            transcript_status = gr.Textbox(label="Status", interactive=False)
            transcript_preview = gr.Markdown("Preview will appear here...")
            # load_transcript mutates the module-level current_transcript cache
            # that ask_youtube_ai reads.
            load_btn.click(fn=load_transcript, inputs=[video_id_input], outputs=[transcript_status, transcript_preview])
            gr.Markdown("### Ask About the Video")
            gr.ChatInterface(
                fn=ask_youtube_ai,
                examples=["What is the main topic?", "Summarize in 3 bullets", "Key points?"],
                chatbot=gr.Chatbot(height=450, placeholder="Load transcript first!"),
                textbox=gr.Textbox(placeholder="Your question...", scale=7),
            )
    gr.Markdown("---\nVersion 3 β’ Persistent memory β’ Ready to submit")
demo.launch()