# Streamlit app: YouTube-based product recommendation pipeline (LangGraph + Groq LLM).
| import os | |
| import tempfile | |
| import streamlit as st | |
| from typing import TypedDict, List, Optional | |
| from langgraph.graph import StateGraph, END | |
| from youtubesearchpython import VideosSearch | |
| import yt_dlp | |
| import whisper | |
| from moviepy.editor import AudioFileClip | |
| from textblob import TextBlob | |
| from langchain_groq import ChatGroq | |
# Shared LLM client used by every agent node below.
# The Groq API key is read from the environment (GROQ_API_KEY).
llm = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=os.getenv("GROQ_API_KEY"),
)
class AppState(TypedDict):
    """Shared state threaded through every node of the LangGraph pipeline.

    Only ``product_query`` is provided by the caller; each downstream node
    fills in one or more of the remaining (initially absent) keys.
    """

    product_query: str                       # raw user query from the UI
    is_specific: Optional[bool]              # LLM verdict: query specific enough?
    youtube_videos: Optional[List[dict]]     # declared but unused by current nodes
    video_data: Optional[List[dict]]         # search results, later with transcripts
    relevant_videos: Optional[List[dict]]    # subset judged relevant by the LLM
    summaries: Optional[List[dict]]          # {"title", "summary"} per video
    sentiment_score: Optional[float]         # TextBlob polarity of all summaries
    recommendation: Optional[str]            # "Recommended" / "Not Recommended"
| # Define agent functions | |
def check_product_specificity(state):
    """Ask the LLM whether the product query is specific enough to act on.

    Sets state["is_specific"] and mirrors the flag into Streamlit session
    state so the UI can display it. Always returns the (mutated) state.
    """
    query = state["product_query"]
    # Explicitly instruct the model to answer yes/no; without this the
    # substring check below is unreliable (matches the prompt style used
    # by filter_relevant_videos).
    prompt = (
        f"Is the following product query specific enough for recommendation "
        f"search? Be strict. Query: {query}\n\nAnswer only yes or no."
    )
    result = llm.invoke(prompt)
    state["is_specific"] = "yes" in result.content.lower()
    st.session_state.specificity_check = state["is_specific"]
    return state
def search_youtube(state):
    """Search YouTube for the product query and record basic video metadata.

    Stores a list of {"title", "link", "id"} dicts in state["video_data"]
    and the result count in Streamlit session state.
    """
    # limit=5 keeps the demo fast; raise for broader coverage.
    search_results = VideosSearch(state["product_query"], limit=5).result()
    found = [
        {"title": entry["title"], "link": entry["link"], "id": entry["id"]}
        for entry in search_results["result"]
    ]
    state["video_data"] = found
    st.session_state.videos_found = len(found)
    return state
def download_and_transcribe_audio(state):
    """Download each video's audio, convert it to WAV, transcribe with Whisper.

    Replaces state["video_data"] with only the successfully transcribed
    videos (each gains a "transcript" key). Failures are reported as
    Streamlit warnings and skipped.
    """
    model = whisper.load_model("base")
    videos = state.get("video_data", [])
    transcripts = []
    for video in videos:
        # Bind title BEFORE the try: the original assigned it inside the
        # try, so an early failure (e.g. missing "link") made the except
        # handler itself raise NameError.
        title = video.get("title", "unknown")
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                video_url = video["link"]
                video_id = video["id"]
                # Sanitize the title for use in a filename.
                clean_title = "".join(
                    c for c in title if c.isalnum() or c in (' ', '_')
                ).rstrip()
                # Download best-quality audio into the temp dir.
                ydl_opts = {
                    'format': 'bestaudio/best',
                    'outtmpl': os.path.join(temp_dir, f"{clean_title}_{video_id}.%(ext)s"),
                    'quiet': True,
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([video_url])
                # Locate the downloaded file (yt-dlp picks the extension)
                # and convert it to 16-bit PCM WAV for Whisper.
                audio_file = next(
                    f for f in os.listdir(temp_dir) if f.startswith(clean_title)
                )
                audio_path = os.path.join(temp_dir, audio_file)
                final_audio_path = os.path.join(temp_dir, f"{clean_title}_{video_id}.wav")
                clip = AudioFileClip(audio_path)
                try:
                    clip.write_audiofile(final_audio_path, codec='pcm_s16le')
                finally:
                    # Always release the ffmpeg handle, even if the
                    # conversion raises (was leaked on error before).
                    clip.close()
                # Transcribe the WAV and attach the text to the video dict.
                result = model.transcribe(final_audio_path)
                video["transcript"] = result["text"]
                transcripts.append(video)
        except Exception as e:
            st.warning(f"Error processing {title}: {str(e)}")
            continue
    state["video_data"] = transcripts
    st.session_state.transcripts_processed = len(transcripts)
    return state
def filter_relevant_videos(state):
    """Keep only videos whose transcript the LLM judges relevant to the product.

    Stores the surviving videos in state["relevant_videos"] and mirrors the
    list into Streamlit session state for the UI.
    """
    product = state["product_query"]
    relevant_videos = []
    for video in state["video_data"]:
        # Truncate the transcript to keep the prompt within context limits.
        transcript = video["transcript"][:2000]
        prompt = f"Is this transcript relevant to the product: {product}?\n\nTranscript:\n{transcript}\n\nAnswer only yes or no."
        # Use invoke() — predict() is deprecated in modern LangChain, and
        # the other nodes already use invoke(); .content holds the text.
        result = llm.invoke(prompt)
        if "yes" in result.content.lower():
            relevant_videos.append(video)
    state["relevant_videos"] = relevant_videos
    st.session_state.relevant_videos = relevant_videos
    return state
def summarize_videos(state):
    """Summarize each relevant transcript (with pros/cons) via the LLM.

    Stores {"title", "summary"} dicts in state["summaries"] and mirrors
    them into Streamlit session state.
    """
    summaries = []
    # Cap at five videos to bound LLM usage per run.
    for video in state["relevant_videos"][:5]:
        # Truncate the transcript to keep the prompt within context limits.
        transcript = video["transcript"][:3000]
        prompt = f"Summarize the following transcript and list pros and cons clearly:\n\n{transcript}"
        # invoke() replaces the deprecated predict(); the summary text is
        # on .content, so the stored value is still a plain string.
        result = llm.invoke(prompt)
        summaries.append({
            "title": video["title"],
            "summary": result.content,
        })
    state["summaries"] = summaries
    st.session_state.summaries = summaries
    return state
def final_recommendation(state):
    """Turn the combined summary sentiment into a final verdict.

    Computes TextBlob polarity over all summaries concatenated; any
    strictly positive score yields "Recommended". Results go into state
    and Streamlit session state.
    """
    combined = " ".join(entry["summary"] for entry in state["summaries"])
    polarity = TextBlob(combined).sentiment.polarity
    verdict = "Recommended" if polarity > 0 else "Not Recommended"
    state["sentiment_score"] = polarity
    state["recommendation"] = verdict
    st.session_state.sentiment_score = polarity
    st.session_state.recommendation = verdict
    return state
# Build the LangGraph pipeline: a strictly linear chain of agent nodes.
graph = StateGraph(AppState)

_PIPELINE = [
    ("Product Specificity", check_product_specificity),
    ("YouTube Search", search_youtube),
    ("Transcript Fetcher", download_and_transcribe_audio),
    ("Relevance Filter", filter_relevant_videos),
    ("Summarizer", summarize_videos),
    ("Final Recommendation", final_recommendation),
]
for _name, _fn in _PIPELINE:
    graph.add_node(_name, _fn)

graph.set_entry_point(_PIPELINE[0][0])
# Wire each node to its successor, then terminate the chain.
for (_src, _), (_dst, _) in zip(_PIPELINE, _PIPELINE[1:]):
    graph.add_edge(_src, _dst)
graph.add_edge(_PIPELINE[-1][0], END)

compiled_graph = graph.compile()
# ----- Streamlit UI -----
st.title("Product Recommendation System")
st.write("Analyze YouTube videos to get product recommendations")

product_query = st.text_input("Enter a product query (e.g., 'Sony WH-1000XM5 headphones'):")

if st.button("Analyze"):
    if not product_query:
        st.warning("Please enter a product query")
    else:
        with st.spinner("Analyzing product query..."):
            initial_state = {"product_query": product_query}
            # Clear any results left over from a previous run so stale
            # metrics never leak into this one.
            _result_keys = (
                'specificity_check', 'videos_found', 'transcripts_processed',
                'relevant_videos', 'summaries', 'sentiment_score', 'recommendation',
            )
            for _key in _result_keys:
                st.session_state.pop(_key, None)
            # Run the full agent pipeline.
            result = compiled_graph.invoke(initial_state)
            # Surface the metrics each node dropped into session state.
            st.subheader("Analysis Results")
            col1, col2 = st.columns(2)
            with col1:
                st.metric("Query Specific", st.session_state.get('specificity_check', False))
                st.metric("Videos Found", st.session_state.get('videos_found', 0))
                st.metric("Transcripts Processed", st.session_state.get('transcripts_processed', 0))
                st.metric("Relevant Videos", len(st.session_state.get('relevant_videos', [])))
            with col2:
                st.metric("Sentiment Score", round(st.session_state.get('sentiment_score', 0), 2))
                st.metric("Final Recommendation", st.session_state.get('recommendation', ''))
            if 'summaries' in st.session_state:
                st.subheader("Video Summaries")
                for summary in st.session_state.summaries:
                    with st.expander(summary['title']):
                        st.write(summary['summary'])