import os
import tempfile
from typing import TypedDict, List, Optional

import streamlit as st
from langgraph.graph import StateGraph, END
from youtubesearchpython import VideosSearch
import yt_dlp
import whisper
from moviepy.editor import AudioFileClip
from textblob import TextBlob
from langchain_groq import ChatGroq

# Groq-hosted Llama 3 model shared by every LLM-backed node below.
llm = ChatGroq(
    model_name="llama3-70b-8192",
    temperature=0,
    groq_api_key=os.getenv("GROQ_API_KEY"),
)


class AppState(TypedDict):
    """State dict threaded through the LangGraph pipeline nodes."""

    product_query: str                     # raw user query from the UI
    is_specific: Optional[bool]            # result of the specificity gate
    youtube_videos: Optional[List[dict]]   # reserved — no node writes it (TODO confirm/remove)
    video_data: Optional[List[dict]]       # search hits; later carry "transcript"
    relevant_videos: Optional[List[dict]]  # subset judged on-topic by the LLM
    summaries: Optional[List[dict]]        # {"title", "summary"} per video
    sentiment_score: Optional[float]       # TextBlob polarity over all summaries
    recommendation: Optional[str]          # "Recommended" / "Not Recommended"


def _ask_llm(prompt: str) -> str:
    """Send a prompt to the LLM and return its plain-text reply.

    Uses invoke() consistently everywhere; the predict() API used by some
    of the original nodes is deprecated in LangChain.
    """
    return llm.invoke(prompt).content


def check_product_specificity(state):
    """Ask the LLM whether the query is specific enough to search for."""
    query = state["product_query"]
    prompt = (
        "Is the following product query specific enough for recommendation "
        f"search? Be strict. Query: {query}"
    )
    state["is_specific"] = "yes" in _ask_llm(prompt).lower()
    st.session_state.specificity_check = state["is_specific"]
    return state


def search_youtube(state):
    """Fetch the top YouTube hits for the query; record title/link/id."""
    search = VideosSearch(state["product_query"], limit=5)  # Reduced for demo
    hits = search.result().get("result", [])  # tolerate a missing key
    state["video_data"] = [
        {"title": v["title"], "link": v["link"], "id": v["id"]} for v in hits
    ]
    st.session_state.videos_found = len(state["video_data"])
    return state


def download_and_transcribe_audio(state):
    """Download each video's audio, convert to WAV, transcribe with Whisper.

    Videos that fail at any stage are skipped with a UI warning; the
    survivors (now carrying a "transcript" key) replace state["video_data"].
    """
    model = whisper.load_model("base")
    transcripts = []
    for video in state.get("video_data", []):
        # Bind title BEFORE the try: the original assigned it inside, so a
        # failure on video["id"]/["link"] made the except clause itself
        # raise NameError when formatting the warning.
        title = video.get("title", "<unknown>")
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                video_id = video["id"]
                clean_title = "".join(
                    c for c in title if c.isalnum() or c in (' ', '_')
                ).rstrip()

                # Download best-quality audio into the fresh temp dir.
                ydl_opts = {
                    'format': 'bestaudio/best',
                    'outtmpl': os.path.join(
                        temp_dir, f"{clean_title}_{video_id}.%(ext)s"
                    ),
                    'quiet': True,
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([video["link"]])

                # Convert whatever container yt-dlp produced to 16-bit PCM WAV.
                audio_file = next(
                    f for f in os.listdir(temp_dir) if f.startswith(clean_title)
                )
                audio_path = os.path.join(temp_dir, audio_file)
                final_audio_path = os.path.join(
                    temp_dir, f"{clean_title}_{video_id}.wav"
                )
                clip = AudioFileClip(audio_path)
                try:
                    clip.write_audiofile(final_audio_path, codec='pcm_s16le')
                finally:
                    clip.close()  # release the handle even if conversion fails

                video["transcript"] = model.transcribe(final_audio_path)["text"]
                transcripts.append(video)
        except Exception as e:
            # Best-effort per video: warn and keep going with the rest.
            st.warning(f"Error processing {title}: {str(e)}")
            continue
    state["video_data"] = transcripts
    st.session_state.transcripts_processed = len(transcripts)
    return state


def filter_relevant_videos(state):
    """Keep only the videos whose transcript the LLM judges on-topic."""
    product = state["product_query"]
    relevant_videos = []
    for video in state.get("video_data", []):
        transcript = video["transcript"][:2000]  # cap prompt size
        prompt = (
            f"Is this transcript relevant to the product: {product}?"
            f"\n\nTranscript:\n{transcript}\n\nAnswer only yes or no."
        )
        if "yes" in _ask_llm(prompt).lower():
            relevant_videos.append(video)
    state["relevant_videos"] = relevant_videos
    st.session_state.relevant_videos = relevant_videos
    return state


def summarize_videos(state):
    """Summarize up to five relevant transcripts with explicit pros/cons."""
    summaries = []
    for video in state.get("relevant_videos", [])[:5]:
        transcript = video["transcript"][:3000]  # cap prompt size
        prompt = (
            "Summarize the following transcript and list pros and cons "
            f"clearly:\n\n{transcript}"
        )
        summaries.append({"title": video["title"], "summary": _ask_llm(prompt)})
    state["summaries"] = summaries
    st.session_state.summaries = summaries
    return state


def final_recommendation(state):
    """Aggregate summary sentiment into the final verdict.

    With no summaries the combined text is empty, TextBlob scores it 0.0,
    and the strict `> 0` threshold yields "Not Recommended".
    """
    combined_text = " ".join(s["summary"] for s in state.get("summaries", []))
    sentiment = TextBlob(combined_text).sentiment.polarity
    state["sentiment_score"] = sentiment
    state["recommendation"] = "Recommended" if sentiment > 0 else "Not Recommended"
    st.session_state.sentiment_score = sentiment
    st.session_state.recommendation = state["recommendation"]
    return state


# Build the graph (nodes and edges are wired below).
graph = StateGraph(AppState)
# Wire the agent pipeline:
# specificity gate -> search -> transcribe -> relevance filter ->
# summarize -> final recommendation.
graph.add_node("Product Specificity", check_product_specificity)
graph.add_node("YouTube Search", search_youtube)
graph.add_node("Transcript Fetcher", download_and_transcribe_audio)
graph.add_node("Relevance Filter", filter_relevant_videos)
graph.add_node("Summarizer", summarize_videos)
graph.add_node("Final Recommendation", final_recommendation)

graph.set_entry_point("Product Specificity")


def _route_after_specificity(state):
    """Return the next node name, or END for a too-vague query.

    The original wiring computed is_specific but then ran the full
    (download + transcribe) pipeline unconditionally; this conditional
    edge makes the specificity gate actually gate.
    """
    return "YouTube Search" if state.get("is_specific") else END


graph.add_conditional_edges("Product Specificity", _route_after_specificity)
graph.add_edge("YouTube Search", "Transcript Fetcher")
graph.add_edge("Transcript Fetcher", "Relevance Filter")
graph.add_edge("Relevance Filter", "Summarizer")
graph.add_edge("Summarizer", "Final Recommendation")
graph.add_edge("Final Recommendation", END)

compiled_graph = graph.compile()

# ---------------------------- Streamlit UI ----------------------------
st.title("Product Recommendation System")
st.write("Analyze YouTube videos to get product recommendations")

product_query = st.text_input(
    "Enter a product query (e.g., 'Sony WH-1000XM5 headphones'):"
)

if st.button("Analyze"):
    if not product_query:
        st.warning("Please enter a product query")
    else:
        with st.spinner("Analyzing product query..."):
            # Clear any previous run so stale values never leak into display.
            for key in ['specificity_check', 'videos_found',
                        'transcripts_processed', 'relevant_videos',
                        'summaries', 'sentiment_score', 'recommendation']:
                if key in st.session_state:
                    del st.session_state[key]

            result = compiled_graph.invoke({"product_query": product_query})

        st.subheader("Analysis Results")
        if not st.session_state.get('specificity_check', False):
            # The conditional edge short-circuits vague queries; tell the user
            # why the remaining metrics are empty.
            st.warning("Query was judged too vague; please be more specific.")

        col1, col2 = st.columns(2)
        with col1:
            st.metric("Query Specific",
                      st.session_state.get('specificity_check', False))
            st.metric("Videos Found",
                      st.session_state.get('videos_found', 0))
            st.metric("Transcripts Processed",
                      st.session_state.get('transcripts_processed', 0))
            st.metric("Relevant Videos",
                      len(st.session_state.get('relevant_videos', [])))
        with col2:
            st.metric("Sentiment Score",
                      round(st.session_state.get('sentiment_score', 0), 2))
            st.metric("Final Recommendation",
                      st.session_state.get('recommendation', ''))

        if 'summaries' in st.session_state:
            st.subheader("Video Summaries")
            for summary in st.session_state.summaries:
                with st.expander(summary['title']):
                    st.write(summary['summary'])