Spaces:
Sleeping
Sleeping
| import json | |
| from openai import OpenAI | |
| import streamlit as st | |
| from dotenv import load_dotenv | |
| import os | |
| from supabase_db import DatabaseConnection | |
| from pytube import Search, YouTube | |
| import psycopg2 | |
| from datetime import datetime | |
| from psycopg2 import OperationalError, sql | |
| from decimal import Decimal | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
# Use only API key without ADC authentication.
# NOTE: this constant is captured at import time, which is before
# load_dotenv() runs further down in this file, so a key that lives only in
# a .env file is not visible here yet. get_youtube_service() therefore
# re-reads the environment at call time and uses this value as a fallback.
YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY")


def get_youtube_service():
    """Build a YouTube Data API v3 client authenticated with an API key.

    The key is looked up in the environment when the function is called,
    falling back to the module-level ``YOUTUBE_API_KEY`` constant.

    Returns:
        The client object returned by ``build``, or ``None`` when no API key
        is configured or ``build`` raises ``HttpError``.
    """
    api_key = os.environ.get("YOUTUBE_API_KEY") or YOUTUBE_API_KEY
    if not api_key:
        # Without a developer key the client cannot authenticate the way this
        # app intends (API key only), so fail early with a clear message.
        print("An error occurred: YOUTUBE_API_KEY is not set")
        return None
    try:
        return build("youtube", "v3", developerKey=api_key, cache_discovery=False)
    except HttpError as e:
        print(f"β An error occurred: {e}")
        return None
# Load environment variables (python-dotenv) before any key is read below.
load_dotenv()

# Shared OpenAI client and the chat model used for every completion request.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
MODEL = "gpt-4o-mini"
# Tool definitions advertised to the model for function calling.
def _function_tool(name, description, properties, required):
    """Wrap one function's JSON-schema parameters in the tool envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


TOOLS = [
    # Runs a SQL statement against the trending-videos table.
    _function_tool(
        "execute_sql_query",
        "Executes SQL queries on the 'youtube_videos' table to retrieve trending video insights.",
        {
            "query": {
                "type": "string",
                "description": "The SQL query string to execute on the database.",
            },
        },
        ["query"],
    ),
    # Looks up the top comments for a list of videos.
    _function_tool(
        "fetch_top_comments",
        "Fetches top trending comments and their YouTube links.",
        {
            "thumbnail_links": {
                "type": "array",
                "items": {"type": "string"},
                "description": "An array of video IDs to fetch comments from.",
            },
        },
        ["thumbnail_links"],
    ),
]
# Create an empty chat history in Streamlit's session state when it is missing.
if "messages" not in st.session_state:
    st.session_state["messages"] = []
def execute_sql_query(query: str) -> str:
    """Run a SQL statement and return its rows as text for the model.

    Args:
        query: SQL text to execute. A missing or blank query is reported as
            an error without touching the database.

    Returns:
        A context string containing ``str()`` of the fetched rows on success,
        or an error description on any failure. Exceptions are never
        propagated, so the caller always gets a string back.
    """
    print(query)
    conn = None
    try:
        if not isinstance(query, str) or not query.strip():
            raise ValueError("No SQL query was provided")

        conn = DatabaseConnection().get_connection()
        if not conn:
            raise Exception("Database connection error")

        with conn.cursor() as cur:
            cur.execute(query)
            results = cur.fetchall()

        res = (
            "Here are the results based on your query. "
            "Please answer the previous question using this context:\n"
            + str(results)
        )
        print(res)
        return res
    except Exception as e:
        return f"""An error occurred while querying the database {e}. Tell a general message to try again later."""
    finally:
        # This function never commits, so always end the transaction here.
        # After a failed statement a DB-API connection stays in an aborted
        # transaction until rollback; if get_connection() hands out a reused
        # connection (TODO confirm), later queries would otherwise keep
        # failing.
        if conn:
            try:
                conn.rollback()
            except Exception as rollback_error:
                print(f"Could not roll back the database transaction: {rollback_error}")
def _video_id_from_link(link: str) -> str:
    """Return the video ID inside a ``.../vi/<id>/...`` thumbnail URL, else *link*."""
    _, marker, tail = link.partition("/vi/")
    if not marker:
        return link
    return tail.split("/", 1)[0] or link


def fetch_top_comments(thumbnail_links: list) -> str:
    """Fetch up to five top comments per video via the YouTube Data API.

    Args:
        thumbnail_links: Video IDs to look up. Thumbnail URLs containing
            ``/vi/<id>/`` are also accepted and reduced to their video ID.
            ``None``, an empty list, or a single string are tolerated.

    Returns:
        The formatted comments joined by blank lines, with an error line for
        each video whose request failed; ``"No comments found."`` when there
        is nothing to report; or an error message when the API client could
        not be created.
    """
    if not thumbnail_links:
        return "No comments found."
    if isinstance(thumbnail_links, str):
        # A bare string would otherwise be iterated character by character.
        thumbnail_links = [thumbnail_links]

    # Build the client once instead of once per video.
    youtube = get_youtube_service()
    if youtube is None:
        return "Error fetching comments: the YouTube API client could not be created."

    comments = []
    for thumbnail_link in thumbnail_links:
        video_id = _video_id_from_link(str(thumbnail_link).strip())
        if not video_id:
            continue
        try:
            # Fetch top 5 comments
            response = youtube.commentThreads().list(
                part="snippet",
                videoId=video_id,
                maxResults=5,
                order="relevance",
            ).execute()
            for item in response.get("items", []):
                comment = item["snippet"]["topLevelComment"]["snippet"]
                author = comment["authorDisplayName"]
                text = comment["textOriginal"]
                like_count = comment["likeCount"]
                comments.append(
                    f"π€ **{author}**: {text} π {like_count} likes"
                )
        except Exception as e:
            # Best effort: report the failure for this video and keep going.
            print(f"Error fetching comments for {thumbnail_link}: {e}")
            comments.append(f"Error fetching comments for {thumbnail_link}: {e}")

    return "\n\n".join(comments) if comments else "No comments found."
# Main Streamlit application logic.

# Upper bound on model -> tool -> model round trips for a single user prompt,
# so a model that keeps requesting tools cannot loop forever.
_MAX_TOOL_ROUNDS = 5


def _message_field(message, field):
    """Read *field* from a chat message stored as a dict or as an SDK object."""
    if isinstance(message, dict):
        return message.get(field)
    return getattr(message, field, None)


def _run_tool_call(tool_call) -> str:
    """Execute one tool call requested by the model and return its text result."""
    tool_name = tool_call.function.name
    try:
        tool_args = json.loads(tool_call.function.arguments or "{}")
    except json.JSONDecodeError as e:
        return f"Invalid arguments for tool '{tool_name}': {e}"
    if not isinstance(tool_args, dict):
        return f"Invalid arguments for tool '{tool_name}': expected a JSON object."

    if tool_name == "execute_sql_query":
        with st.spinner("Querying the database..."):
            return execute_sql_query(tool_args.get("query"))
    if tool_name == "fetch_top_comments":
        with st.spinner("Fetching comments..."):
            # The key must match the name declared in the TOOLS schema.
            return fetch_top_comments(tool_args.get("thumbnail_links") or [])
    return f"Unknown tool '{tool_name}'."


def main():
    """
    Streamlit app entry point.

    Renders the page header and the chat history, sends each new user prompt
    to the model, runs any tool calls the model requests (SQL queries and
    YouTube comment lookups), feeds the tool results back to the model and
    displays its final answer.
    """
    # Add YouTube logo and title
    st.markdown(
        """
        <div style="display: flex; align-items: center; gap: 20px;">
        <img src="https://upload.wikimedia.org/wikipedia/commons/b/b8/YouTube_Logo_2017.svg"
        alt="YouTube Logo" style="height: 80px;"/>
        <h1 style="margin: 0; font-size: 64px; display: flex; align-items: center;">Chatbot</h1>
        </div>
        """,
        unsafe_allow_html=True
    )

    if "messages" not in st.session_state:
        st.session_state.messages = []
    messages = st.session_state.messages

    # Install the system prompt whenever the history lacks one. Checking only
    # for the key's existence is not enough: an empty list may already have
    # been stored before main() runs.
    if not any(_message_field(m, "role") == "system" for m in messages):
        messages.insert(0, {
            "role": "system",
            "content": """
            You are a helpful assistant that provides insights and accurate analysis of YouTube trending video data.
            Always use real data for if user asks any queries related to top most videos and comments.
            To work with real data, use the following database:
            - The table name is "youtube_videos"
            - Column Structure:
            | column_name | data_type |
            |--------------------|----------------------------|
            | video_id | text |
            | title | text |
            | publishedAt | timestamp with time zone |
            | channelId | text |
            | channelTitle | text |
            | category_id | bigint |
            | trending_date | timestamp with time zone |
            | tags | text |
            | views | bigint |
            | likes | numeric |
            | dislikes | numeric |
            | comment_count | bigint |
            | thumbnail_link | text |
            | comments_disabled | boolean |
            | ratings_disabled | boolean |
            | description | text |
            | category_name | text |
            """
        })

    # Display previous chat messages: Streamlit reruns the script on every
    # interaction, so earlier turns must be redrawn from the stored history.
    for message in messages:
        role = _message_field(message, "role")
        content = _message_field(message, "content")
        if role in ("user", "assistant") and content:
            with st.chat_message(role):
                st.write(content)

    # User chat input
    user_prompt = st.chat_input("Ask me anything!")
    if not user_prompt:
        return

    # Append user message to session state
    messages.append({"role": "user", "content": user_prompt})
    with st.chat_message("user"):
        st.write(user_prompt)

    # Generate the LLM response. While the model asks for tools, run them and
    # hand the results back; the last permitted round disables tools so the
    # model has to produce a text answer.
    assistant_message = None
    for round_index in range(_MAX_TOOL_ROUNDS + 1):
        final_round = round_index == _MAX_TOOL_ROUNDS
        response = client.chat.completions.create(
            model=MODEL,
            messages=messages,
            tools=TOOLS,
            tool_choice="none" if final_round else "auto"
        )
        assistant_message = response.choices[0].message
        messages.append(assistant_message)

        tool_calls = getattr(assistant_message, "tool_calls", None)
        if not tool_calls:
            break

        # Every requested tool call gets exactly one "tool" reply message.
        for tool_call in tool_calls:
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": _run_tool_call(tool_call),
            })

    with st.chat_message("assistant"):
        if assistant_message.content:
            st.write(assistant_message.content)
        else:
            st.write("No content available.")
# Run the app: call main() only when this file is executed as the main script.
if __name__ == "__main__":
    main()