youtube_trends / src /app1.py
molehh's picture
created project
eec3758
import json
from openai import OpenAI
import streamlit as st
from dotenv import load_dotenv
import os
from supabase_db import DatabaseConnection
from pytube import Search, YouTube
import psycopg2
from datetime import datetime
from psycopg2 import OperationalError, sql
from decimal import Decimal
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Use only API key without ADC authentication
YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY")
def get_youtube_service():
try:
youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY, cache_discovery=False)
return youtube
except HttpError as e:
print(f"❌ An error occurred: {e}")
return None
# βœ… Load environment variables
load_dotenv()
# βœ… Initialize LLM client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
MODEL = "gpt-4o-mini"
# βœ… Define Tools with Improved Names and Descriptions
TOOLS = [
{
"type": "function",
"function": {
"name": "execute_sql_query",
"description": "Executes SQL queries on the 'youtube_videos' table to retrieve trending video insights.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SQL query string to execute on the database."
},
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "fetch_top_comments",
"description": "Fetches top trending comments and their YouTube links.",
"parameters": {
"type": "object",
"properties": {
"thumbnail_links": {
"type": "array",
"items": {"type": "string"},
"description": "An array of video IDs to fetch comments from."
}
},
"required": ["thumbnail_links"]
}
}
}
]
# βœ… Initialize session state properly
if "messages" not in st.session_state:
st.session_state.messages = []
def execute_sql_query(query: str):
print(query)
try:
db_instance = DatabaseConnection()
conn = db_instance.get_connection()
if conn:
with conn.cursor() as cur:
cur.execute(query)
results = cur.fetchall()
res = "Here are the results based on your query. Please answer the previous question using this context:\n" + \
str(results)
print(res)
return res
else:
raise Exception("Database connection error")
except Exception as e:
return f"""An error occurred while querying the database {e}. Tell a general message to try again later."""
def fetch_top_comments(thumbnail_links: list) -> str:
"""Fetches top comments from YouTube using the YouTube Data API."""
comments = []
for thumbnail_link in thumbnail_links:
try:
# Fetch top 5 comments
youtube = get_youtube_service()
request = youtube.commentThreads().list(
part="snippet",
videoId=thumbnail_link,
maxResults=5,
order="relevance"
)
response = request.execute()
if "items" in response:
for item in response["items"]:
comment = item["snippet"]["topLevelComment"]["snippet"]
author = comment["authorDisplayName"]
text = comment["textOriginal"]
like_count = comment["likeCount"]
comments.append(
f"🎀 **{author}**: {text} πŸ‘ {like_count} likes"
)
except Exception as e:
print(f"Error fetching comments for {thumbnail_link}: {e}")
comments.append(f"Error fetching comments for {thumbnail_link}: {e}")
if comments:
return "\n\n".join(comments)
else:
return "No comments found."
# βœ… Function: Main Streamlit Application Logic
def main():
"""
Streamlit app entry point.
Handles the rendering of the chatbot interface, message exchange,
SQL queries, and video download functionalities.
"""
# πŸ› οΈ Add YouTube logo and title
st.markdown(
"""
<div style="display: flex; align-items: center; gap: 20px;">
<img src="https://upload.wikimedia.org/wikipedia/commons/b/b8/YouTube_Logo_2017.svg"
alt="YouTube Logo" style="height: 80px;"/>
<h1 style="margin: 0; font-size: 64px; display: flex; align-items: center;">Chatbot</h1>
</div>
""",
unsafe_allow_html=True
)
if "messages" not in st.session_state:
st.session_state.messages = [
{
"role": "system",
"content": """
You are a helpful assistant that provides insights and accurate analysis of YouTube trending video data.
Always use real data for if user asks any queries related to top most videos and comments.
To work with real data, use the following database:
- The table name is "youtube_videos"
- Column Structure:
| column_name | data_type |
|--------------------|----------------------------|
| video_id | text |
| title | text |
| publishedAt | timestamp with time zone |
| channelId | text |
| channelTitle | text |
| category_id | bigint |
| trending_date | timestamp with time zone |
| tags | text |
| views | bigint |
| likes | numeric |
| dislikes | numeric |
| comment_count | bigint |
| thumbnail_link | text |
| comments_disabled | boolean |
| ratings_disabled | boolean |
| description | text |
| category_name | text |
"""
}
]
# # βœ… Display previous chat messages
# for message in st.session_state.messages:
# print(message)
# βœ… User chat input
user_prompt = st.chat_input("Ask me anything!")
if user_prompt:
# Append user message to session state
st.session_state.messages.append({"role": "user", "content": user_prompt})
with st.chat_message("user"):
st.write(user_prompt)
# βœ… Generate LLM response
response = client.chat.completions.create(
model=MODEL,
messages=st.session_state.messages,
tools=TOOLS,
tool_choice="auto"
)
assistant_message = response.choices[0].message
st.session_state.messages.append(assistant_message)
if getattr(assistant_message, "tool_calls", None):
tool_response = ""
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
if tool_name == "execute_sql_query":
with st.spinner("Querying the database..."):
tool_response = execute_sql_query(tool_args.get("query"))
elif tool_name == "fetch_top_comments":
with st.spinner("Fetching comments..."):
tool_response = fetch_top_comments(tool_args.get("thumbnail_link"))
# βœ… Append the tool response inside the loop
st.session_state.messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_response,
})
with st.chat_message("assistant"):
if assistant_message.content:
st.write(assistant_message.content)
else:
st.write("No content available.")
# βœ… Add a fallback for when there are no tool calls
else:
st.session_state.messages.append({
"role": "assistant",
"content": "No tool calls were made."
})
# βœ… Run the app
if __name__ == "__main__":
main()