# Page-scrape residue from the file's hosting page (author avatar caption,
# commit message, and commit hash) — commented out so the module parses:
# mojad121's picture
# Update src/streamlit_app.py
# 35359c5 verified
import os
import tempfile
import streamlit as st
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
from youtubesearchpython import VideosSearch
import yt_dlp
import whisper
from moviepy.editor import AudioFileClip
from textblob import TextBlob
from langchain_groq import ChatGroq
# Initialize LLM
# Shared ChatGroq chat-model client used by every agent node below.
# temperature=0 keeps the yes/no judgments deterministic. GROQ_API_KEY is
# read from the environment; if unset, None is passed through and the
# request fails at call time rather than here.
llm = ChatGroq(
    model_name="llama3-70b-8192",
    temperature=0,
    groq_api_key=os.getenv("GROQ_API_KEY")
)
# Define state
class AppState(TypedDict):
    """Shared pipeline state passed between the LangGraph agent nodes.

    Only ``product_query`` is set initially; the Optional fields are
    populated as the graph advances node by node.
    """

    # Raw user query, e.g. "Sony WH-1000XM5 headphones".
    product_query: str
    # LLM verdict on whether the query is specific enough.
    is_specific: Optional[bool]
    # NOTE(review): written nowhere in this file — search_youtube fills
    # video_data instead; presumably vestigial. Confirm before removing.
    youtube_videos: Optional[List[dict]]
    # Search hits (title/link/id), later enriched with a "transcript" key.
    video_data: Optional[List[dict]]
    # Subset of video_data the LLM judged relevant to the product.
    relevant_videos: Optional[List[dict]]
    # Per-video dicts with "title" and "summary" (pros/cons text).
    summaries: Optional[List[dict]]
    # TextBlob polarity over the combined summaries, in [-1.0, 1.0].
    sentiment_score: Optional[float]
    # "Recommended" or "Not Recommended".
    recommendation: Optional[str]
# Define agent functions
def check_product_specificity(state):
    """Ask the LLM whether the user's product query is specific enough.

    Stores the boolean verdict in state["is_specific"] and mirrors it into
    Streamlit session state for the UI metrics.
    """
    user_query = state["product_query"]
    question = (
        "Is the following product query specific enough for recommendation "
        f"search? Be strict. Query: {user_query}"
    )
    answer = llm.invoke(question)
    # Loose match: any "yes" in the reply counts as a positive verdict.
    verdict = "yes" in answer.content.lower()
    state["is_specific"] = verdict
    st.session_state.specificity_check = verdict
    return state
def search_youtube(state):
    """Search YouTube for the product query and record up to 5 hits.

    Each hit is reduced to a small dict (title/link/id) and stored in
    state["video_data"]; the hit count is mirrored into session state.
    """
    search = VideosSearch(state["product_query"], limit=5)  # Reduced for demo
    hits = search.result()
    found = [
        {"title": hit["title"], "link": hit["link"], "id": hit["id"]}
        for hit in hits["result"]
    ]
    state["video_data"] = found
    st.session_state.videos_found = len(found)
    return state
def download_and_transcribe_audio(state):
    """Download each video's audio, convert it to WAV, and transcribe it.

    Pipeline per video: yt-dlp downloads best-quality audio into a temp
    dir, moviepy re-encodes it to 16-bit PCM WAV, and Whisper ("base")
    transcribes it. Videos that fail any step are skipped with a Streamlit
    warning. Successful videos gain a "transcript" key, and
    state["video_data"] is replaced with only those videos.
    """
    model = whisper.load_model("base")
    videos = state.get("video_data", [])
    transcripts = []
    for video in videos:
        # BUGFIX: bind title BEFORE the try block — the original assigned it
        # inside, so a failure on video["link"] made the except handler
        # raise NameError when formatting the warning message.
        title = video.get("title", "<unknown title>")
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                video_url = video["link"]
                video_id = video["id"]
                # Sanitize the title for safe use in a filename.
                clean_title = "".join(
                    c for c in title if c.isalnum() or c in (' ', '_')
                ).rstrip()
                # Download audio
                ydl_opts = {
                    'format': 'bestaudio/best',
                    'outtmpl': os.path.join(temp_dir, f"{clean_title}_{video_id}.%(ext)s"),
                    'quiet': True,
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([video_url])
                # Convert to WAV (the downloaded extension is unknown, so
                # locate the file by its sanitized-title prefix).
                audio_file = [f for f in os.listdir(temp_dir) if f.startswith(clean_title)][0]
                audio_path = os.path.join(temp_dir, audio_file)
                final_audio_path = os.path.join(temp_dir, f"{clean_title}_{video_id}.wav")
                clip = AudioFileClip(audio_path)
                try:
                    clip.write_audiofile(final_audio_path, codec='pcm_s16le')
                finally:
                    # BUGFIX: always release the ffmpeg reader, even when
                    # write_audiofile raises (original leaked the handle).
                    clip.close()
                # Transcribe
                result = model.transcribe(final_audio_path)
                video["transcript"] = result["text"]
                transcripts.append(video)
        except Exception as e:
            # Best-effort: report and move on to the next video.
            st.warning(f"Error processing {title}: {str(e)}")
            continue
    state["video_data"] = transcripts
    st.session_state.transcripts_processed = len(transcripts)
    return state
def filter_relevant_videos(state):
    """Keep only transcribed videos the LLM judges relevant to the product.

    Writes the surviving videos to state["relevant_videos"] and mirrors
    them into Streamlit session state for the UI.
    """
    product = state["product_query"]
    relevant_videos = []
    for video in state["video_data"]:
        # Cap the transcript so the prompt stays within context limits.
        transcript = video["transcript"][:2000]
        prompt = f"Is this transcript relevant to the product: {product}?\n\nTranscript:\n{transcript}\n\nAnswer only yes or no."
        # CONSISTENCY FIX: use invoke() (as check_product_specificity does)
        # instead of the deprecated LangChain predict() helper; the reply
        # text lives on .content for chat models.
        result = llm.invoke(prompt)
        if "yes" in result.content.lower():
            relevant_videos.append(video)
    state["relevant_videos"] = relevant_videos
    st.session_state.relevant_videos = relevant_videos
    return state
def summarize_videos(state):
    """Summarize up to 5 relevant videos with pros/cons via the LLM.

    Appends {"title", "summary"} dicts to state["summaries"] and mirrors
    them into Streamlit session state for display.
    """
    summaries = []
    for video in state["relevant_videos"][:5]:
        # Cap the transcript so the prompt stays within context limits.
        transcript = video["transcript"][:3000]
        prompt = f"Summarize the following transcript and list pros and cons clearly:\n\n{transcript}"
        # CONSISTENCY FIX: use invoke() (as check_product_specificity does)
        # instead of the deprecated LangChain predict() helper; the reply
        # text lives on .content for chat models.
        result = llm.invoke(prompt)
        summaries.append({
            "title": video["title"],
            "summary": result.content
        })
    state["summaries"] = summaries
    st.session_state.summaries = summaries
    return state
def final_recommendation(state):
    """Score overall sentiment of the summaries and emit a verdict.

    Concatenates every summary, runs TextBlob polarity over the combined
    text, and maps strictly-positive polarity to "Recommended". Both the
    score and verdict are stored in state and session state.
    """
    combined = " ".join(entry["summary"] for entry in state["summaries"])
    polarity = TextBlob(combined).sentiment.polarity
    verdict = "Recommended" if polarity > 0 else "Not Recommended"
    state["sentiment_score"] = polarity
    state["recommendation"] = verdict
    st.session_state.sentiment_score = polarity
    st.session_state.recommendation = verdict
    return state
# Build the graph
# Strictly linear LangGraph pipeline:
#   specificity check -> YouTube search -> transcribe -> relevance filter
#   -> summarize -> final recommendation -> END
# NOTE(review): is_specific is computed but never gates the flow — the
# graph proceeds to the search step even for queries judged too vague.
graph = StateGraph(AppState)
graph.add_node("Product Specificity", check_product_specificity)
graph.add_node("YouTube Search", search_youtube)
graph.add_node("Transcript Fetcher", download_and_transcribe_audio)
graph.add_node("Relevance Filter", filter_relevant_videos)
graph.add_node("Summarizer", summarize_videos)
graph.add_node("Final Recommendation", final_recommendation)
graph.set_entry_point("Product Specificity")
graph.add_edge("Product Specificity", "YouTube Search")
graph.add_edge("YouTube Search", "Transcript Fetcher")
graph.add_edge("Transcript Fetcher", "Relevance Filter")
graph.add_edge("Relevance Filter", "Summarizer")
graph.add_edge("Summarizer", "Final Recommendation")
graph.add_edge("Final Recommendation", END)
# Compile once at import time; invoked per button click in the UI below.
compiled_graph = graph.compile()
# Streamlit UI
st.title("Product Recommendation System")
st.write("Analyze YouTube videos to get product recommendations")
product_query = st.text_input("Enter a product query (e.g., 'Sony WH-1000XM5 headphones'):")
if st.button("Analyze"):
    if not product_query:
        st.warning("Please enter a product query")
    else:
        with st.spinner("Analyzing product query..."):
            initial_state = {"product_query": product_query}
            # Reset session state
            # Clear leftovers from a previous run so the metrics below
            # reflect only this invocation.
            for key in ['specificity_check', 'videos_found', 'transcripts_processed',
                        'relevant_videos', 'summaries', 'sentiment_score', 'recommendation']:
                if key in st.session_state:
                    del st.session_state[key]
            # Execute the graph
            # The agent nodes write their outputs into st.session_state as
            # a side effect; the returned state is not read below.
            result = compiled_graph.invoke(initial_state)
        # Display results
        st.subheader("Analysis Results")
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Query Specific", st.session_state.get('specificity_check', False))
            st.metric("Videos Found", st.session_state.get('videos_found', 0))
            st.metric("Transcripts Processed", st.session_state.get('transcripts_processed', 0))
            st.metric("Relevant Videos", len(st.session_state.get('relevant_videos', [])))
        with col2:
            st.metric("Sentiment Score", round(st.session_state.get('sentiment_score', 0), 2))
            st.metric("Final Recommendation", st.session_state.get('recommendation', ''))
        if 'summaries' in st.session_state:
            st.subheader("Video Summaries")
            for summary in st.session_state.summaries:
                with st.expander(summary['title']):
                    st.write(summary['summary'])