# chat is working + key topics working
"""YouTube transcript chatbot and analysis app built on H2O Wave.

Fetches a YouTube video's transcript, runs offline NLP analysis over it
(word frequency, sentiment flow, key topics, timestamped segments) and
lets the user chat about the video through an H2OGPTE RAG collection.
"""
from h2o_wave import main, app, Q, ui, data
from youtube_transcript_api import YouTubeTranscriptApi
from h2ogpte import H2OGPTE
import re
import os
from dotenv import load_dotenv
from collections import Counter
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
import asyncio
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class TranscriptAnalyzer:
    """Runs all offline NLP analysis over a fetched YouTube transcript."""

    def __init__(self, transcript_list):
        """transcript_list: list of {'text', 'start', 'duration'} dicts as
        returned by YouTubeTranscriptApi.get_transcript."""
        # Join all text for general analysis
        self.transcript = ' '.join(entry['text'] for entry in transcript_list)
        self.transcript_list = transcript_list
        self.sia = SentimentIntensityAnalyzer()
        self.stop_words = set(stopwords.words('english'))
        # Filler words common in spoken transcripts that add noise.
        self.additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so'}
        self.stop_words.update(self.additional_stops)
        self.sentences = sent_tokenize(self.transcript)
        self.words = word_tokenize(self.transcript.lower())

    def analyze(self):
        """Return a dict with word_freq, sentiments, topics, time_segments.

        Falls back to harmless placeholder values if any analysis step
        raises, so the UI always has something to render.
        """
        try:
            return {
                'word_freq': self._analyze_word_frequency(),
                'sentiments': self._analyze_sentiment(),
                'topics': self._extract_topics(),
                'time_segments': self._create_time_segments(),
            }
        except Exception as e:
            logger.error(f"Error in transcript analysis: {e}")
            return {
                'word_freq': [('no data', 1)],
                'sentiments': [0.0],
                'topics': [('no topics', 1, ['none'])],
                'time_segments': [{
                    'start_time': '0:00',
                    'end_time': '0:00',
                    'text': 'Analysis not available',
                    'sentiment': 0.0,
                    'topics': []
                }]
            }

    def _analyze_word_frequency(self):
        """Top 15 meaningful words: no stop words, numbers or short tokens."""
        words = [word.strip('.,!?()[]{}":;') for word in self.words]
        words = [
            word for word in words
            if word.isalnum()
            and not word.isnumeric()
            and len(word) > 2
            and word not in self.stop_words
        ]
        return Counter(words).most_common(15)

    def _analyze_sentiment(self):
        """Analyze sentiment: VADER compound score per sentence, in order."""
        return [
            self.sia.polarity_scores(sentence)['compound']
            for sentence in self.sentences
        ]

    def _extract_topics(self):
        """Top 10 multi-word noun phrases as (topic, count, related) tuples."""
        blob = TextBlob(self.transcript)
        noun_phrases = [
            phrase for phrase in blob.noun_phrases if len(phrase.split()) >= 2
        ]
        topic_counter = Counter(noun_phrases)
        # 'related' terms are a placeholder; TextBlob gives no relations.
        return [
            (topic, count, ['related'])
            for topic, count in topic_counter.most_common(10)
        ]

    def _create_time_segments(self):
        """Create time-based segments using actual YouTube timestamps."""
        segments = []
        segment_size = 5  # Number of transcript entries per segment
        for i in range(0, len(self.transcript_list), segment_size):
            segment_entries = self.transcript_list[i:i + segment_size]
            segment_text = ' '.join(entry['text'] for entry in segment_entries)
            # Get start time from first entry and end time from last entry
            start_time = segment_entries[0]['start']
            end_time = segment_entries[-1]['start'] + segment_entries[-1]['duration']
            # Convert times to minutes:seconds format
            start_min, start_sec = divmod(int(start_time), 60)
            end_min, end_sec = divmod(int(end_time), 60)
            sentiment_scores = self.sia.polarity_scores(segment_text)
            blob = TextBlob(segment_text)
            segments.append({
                'start_time': f"{start_min}:{start_sec:02d}",
                'end_time': f"{end_min}:{end_sec:02d}",
                'text': segment_text,
                'sentiment': sentiment_scores['compound'],
                'topics': list(blob.noun_phrases)[:3],
            })
        return segments


# Download NLTK data safely (idempotent; quiet to avoid console noise)
nltk_dependencies = ['punkt', 'stopwords', 'vader_lexicon']
for dep in nltk_dependencies:
    try:
        nltk.download(dep, quiet=True)
    except Exception as e:
        # Use the module logger instead of print for consistent reporting.
        logger.warning(f"Error downloading {dep}: {e}")

# Load environment variables
load_dotenv()

# Initialize H2O GPT client
h2ogpt_url = os.getenv('H2OGPT_URL')
h2ogpt_api_key = os.getenv('H2OGPT_API_KEY')
client = H2OGPTE(
    address=h2ogpt_url,
    api_key=h2ogpt_api_key
)


def analyze_transcript(transcript):
    """Analyze transcript for insights with more sophisticated processing.

    Standalone variant operating on a raw transcript string; returns a
    dict with 'word_freq', 'sentiments' and 'topics' keys (placeholder
    values on failure, so the caller always gets the same shape).
    """
    try:
        # Word frequency analysis - improved
        # Tokenize and clean text more thoroughly
        tokens = word_tokenize(transcript.lower())
        stop_words = set(stopwords.words('english'))
        # Add common transcript-specific words to stop words
        additional_stops = {
            'um', 'uh', 'like', 'okay', 'right', 'well', 'so',
            'and', 'the', 'to', 'of', 'in', 'a', 'is', 'that',
        }
        stop_words.update(additional_stops)
        # Filter for meaningful words (longer than 2 characters and not numbers)
        words = [
            word for word in tokens
            if word.isalnum()
            and not word.isnumeric()
            and len(word) > 2
            and word not in stop_words
        ]
        # Get word frequency for meaningful words
        word_freq = Counter(words).most_common(10)

        # Enhanced sentiment analysis
        # Break transcript into meaningful chunks
        sentences = [
            s.strip() for s in re.split('[.!?]', transcript)
            if len(s.strip()) > 20
        ]
        sia = SentimentIntensityAnalyzer()
        # Use compound score for overall sentiment
        sentiments = [sia.polarity_scores(s)['compound'] for s in sentences]
        # If we have too many sentences, sample them to get a representative view
        if len(sentiments) > 50:
            step = len(sentiments) // 50
            sentiments = sentiments[::step][:50]

        # Topic extraction with improved filtering
        blob = TextBlob(transcript)
        # Only multi-word phrases
        noun_phrases = [
            phrase for phrase in blob.noun_phrases if len(phrase.split()) >= 2
        ]
        topics = Counter(noun_phrases).most_common(5)

        # Ensure we have at least some data
        if not word_freq:
            word_freq = [('no', 1)]
        if not sentiments:
            sentiments = [0.0]
        if not topics:
            topics = [('no topics found', 1)]

        return {
            'word_freq': word_freq,
            'sentiments': sentiments,
            'topics': topics
        }
    except Exception as e:
        logger.error(f"Error in transcript analysis: {e}")
        return {
            'word_freq': [('error', 1)],
            'sentiments': [0.0],
            'topics': [('error', 1)]
        }


def create_word_frequency_plot(word_freq):
    """Create word frequency plot components"""
    plot_data = data(
        fields=['word', 'count'],
        rows=[{'word': word, 'count': count} for word, count in word_freq]
    )
    plot = ui.plot([
        ui.mark(type='interval', x='=word', y='=count')
    ])
    return plot_data, plot


def create_sentiment_plot(sentiments):
    """Create sentiment plot components"""
    plot_data = data(
        fields=['index', 'sentiment'],
        rows=[
            {'index': str(i), 'sentiment': score}
            for i, score in enumerate(sentiments)
        ]
    )
    plot = ui.plot([
        ui.mark(type='line', x='=index', y='=sentiment')
    ])
    return plot_data, plot


def extract_video_id(url):
    """Extract YouTube video ID from URL (None if no 11-char ID found)."""
    pattern = r'(?:v=|\/)([\w-]{11})(?:\?|\/|&|$)'
    match = re.search(pattern, url)
    return match.group(1) if match else None


async def get_transcript(video_id: str):
    """Get transcript for YouTube video.

    Returns the transcript entry list on success, or an 'Error: ...'
    string on failure (callers check for the string prefix).
    """
    try:
        # The YouTube API call is blocking; run it off the event loop.
        transcript_list = await asyncio.get_event_loop().run_in_executor(
            None, YouTubeTranscriptApi.get_transcript, video_id
        )
        return transcript_list
    except Exception as e:
        logger.error(f"Error fetching transcript: {e}")
        return f"Error: {str(e)}"


async def setup_h2ogpt_collection(transcript, video_id):
    """Setup H2O GPT collection for the video transcript.

    Writes the transcript to a temp file, uploads and ingests it, then
    removes the file. Returns the collection id, or an 'Error...' string
    on failure.
    """
    file_name = f'transcript_{video_id}.txt'
    try:
        collection_id = client.create_collection(
            name=f'YouTube_Video_{video_id}',
            description='YouTube video transcript for chat interaction'
        )
        try:
            with open(file_name, 'w', encoding='utf-8') as f:
                f.write(transcript)
            with open(file_name, 'rb') as f:
                upload_id = client.upload(file_name, f)
            client.ingest_uploads(collection_id, [upload_id])
        finally:
            # Fix: remove the temp file even when upload/ingest raises,
            # instead of leaking it on failure.
            if os.path.exists(file_name):
                os.remove(file_name)
        return collection_id
    except Exception as e:
        return f"Error setting up H2O GPT: {str(e)}"


async def get_gpt_response(collection_id, question):
    """Get response from H2O GPT (RAG query against the collection)."""
    try:
        chat_session_id = client.create_chat_session(collection_id)
        with client.connect(chat_session_id) as session:
            response = session.query(
                question,
                timeout=60,
                rag_config={"rag_type": "rag"}
            )
        return response.content
    except Exception as e:
        return f"Error getting response: {str(e)}"


def _set_feedback_text(q: Q, message: str) -> None:
    """Replace the status text inside the 'feedback' card.

    Fix: the original reached into nested component tuples via
    q.page['feedback'].items[0][1].content, which silently breaks when
    the card layout changes; rebuilding the items renders identically
    and is robust.
    """
    q.page['feedback'].items = [
        ui.inline([
            ui.text_l('Response Feedback'),
            ui.text(name='feedback_text', content=message),
            ui.button(name='export_chat', label='Export Chat', icon='Download')
        ])
    ]


@app('/chatbot')
async def serve(q: Q):
    """Main Wave handler: builds the UI on first visit, then routes events
    (URL submission, chat messages, clear, export, feedback)."""
    if not q.client.initialized:
        q.client.initialized = True
        # Header
        q.page['header'] = ui.header_card(
            box='1 1 12 1',
            title='YouTube Video Transcript Chatbot & Analysis | xAmplify',
            subtitle='Enter a YouTube URL to analyse and chat about the video content',
            color='primary'
        )
        # URL input form
        q.page['url_form'] = ui.form_card(
            box='1 2 12 1',
            items=[
                ui.inline([
                    ui.textbox(
                        name='video_url',
                        placeholder='Enter YouTube video URL...',
                        width='800px'
                    ),
                    ui.button(
                        name='submit_url',
                        label='Fetch Transcript',
                        primary=True
                    ),
                    ui.button(
                        name='clear_chat',
                        label='Clear Chat',
                        icon='Delete'
                    )
                ])
            ]
        )
        # Status card
        q.page['status'] = ui.form_card(
            box='1 3 12 1',
            items=[
                ui.text('Please enter a YouTube URL to begin.')
            ]
        )
        # Left column - Transcript and Analysis
        q.page['transcript'] = ui.form_card(
            box='1 4 6 4',
            title='Video Transcript',
            items=[
                ui.text('Transcript will appear here...')
            ]
        )
        # Initialize plots with dummy data
        q.page['word_freq'] = ui.plot_card(
            box='1 8 3 4',
            title='Word Frequency Analysis',
            data=data('word count', rows=[('', 0)], pack=True),
            plot=ui.plot([ui.mark(type='interval', x='=word', y='=count')])
        )
        q.page['sentiment'] = ui.plot_card(
            box='4 8 3 4',
            title='Sentiment Flow',
            data=data('index sentiment', rows=[(0, 0)], pack=True),
            plot=ui.plot([ui.mark(type='line', x='=index', y='=sentiment')])
        )
        # Key topics
        q.page['topics'] = ui.markdown_card(
            box='7 8 6 4',
            title='Key Topics',
            content='Key topics discussed in the video with their frequency of mention',
        )
        # Right column - Chat interface (disabled until a transcript loads)
        q.page['chat'] = ui.chatbot_card(
            box='7 4 6 4',
            data=data(fields='content from_user', t='list'),
            name='chatbot',
            events=['feedback'],
            placeholder='Type your question here...',
            disabled=True,
        )
        # Feedback card
        q.page['feedback'] = ui.form_card(
            box='1 12 12 1',
            items=[
                ui.inline([
                    ui.text_l('Response Feedback'),
                    ui.text(name='feedback_text', content='No feedback yet.'),
                    ui.button(name='export_chat', label='Export Chat', icon='Download')
                ])
            ]
        )

    # Handle URL submission
    if q.args.submit_url:
        url = q.args.video_url
        video_id = extract_video_id(url)
        if not video_id:
            q.page['status'].items = [
                ui.message_bar(
                    type='error',
                    text='Invalid YouTube URL. Please check and try again.'
                )
            ]
            return

        # Update status to processing
        q.page['status'].items = [
            ui.progress(label='Processing video transcript...', value=True)
        ]
        await q.page.save()

        # Get and process transcript
        transcript_list = await get_transcript(video_id)
        if isinstance(transcript_list, str) and transcript_list.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=transcript_list)
            ]
            return

        # Store transcript and analyze
        q.client.transcript = ' '.join(entry['text'] for entry in transcript_list)
        analyzer = TranscriptAnalyzer(transcript_list)
        analysis = analyzer.analyze()

        # Update transcript display with time segments
        transcript_items = [ui.text_xl('Video Transcript'), ui.separator()]
        for segment in analysis['time_segments']:
            # Add timestamp header with markdown for bold text
            transcript_items.append(
                ui.text(
                    f"**[{segment['start_time']} - {segment['end_time']}]**",
                    size='s'
                )
            )
            # Add segment text
            transcript_items.append(ui.text(segment['text']))
            # Add sentiment indicator; convert score from [-1,1] to [0,1]
            sentiment_value = (segment['sentiment'] + 1) / 2
            transcript_items.append(
                ui.progress(
                    label='Sentiment',
                    value=sentiment_value,
                    caption=f"{'Positive' if segment['sentiment'] > 0.1 else 'Negative' if segment['sentiment'] < -0.1 else 'Neutral'}"
                )
            )
            # Add segment topics if available
            if segment['topics']:
                transcript_items.append(
                    ui.text(
                        f"Topics: {', '.join(segment['topics'])}",
                        size='s'
                    )
                )
            # Add separator between segments
            transcript_items.append(ui.separator())
        q.page['transcript'].items = transcript_items

        # Update analysis visualizations
        word_freq_data = [(word, count) for word, count in analysis['word_freq']]
        q.page['word_freq'].data = data('word count', rows=word_freq_data, pack=True)

        # Update sentiment plot
        sentiment_data = list(enumerate(analysis['sentiments']))
        q.page['sentiment'].data = data('index sentiment', rows=sentiment_data, pack=True)

        # Update topics
        topics_md = "## Key Topics Discussed\n" + "\n".join([
            f"- {topic} ({count} mentions)\n Related: {', '.join(related)}"
            for topic, count, related in analysis['topics']
        ])
        q.page['topics'].content = topics_md

        # Setup H2O GPT collection
        collection_id = await setup_h2ogpt_collection(q.client.transcript, video_id)
        if isinstance(collection_id, str) and collection_id.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=collection_id)
            ]
            return

        # Store collection ID
        q.client.collection_id = collection_id

        # Enable chat and update status
        q.page['chat'].disabled = False
        q.page['status'].items = [
            ui.message_bar(
                type='success',
                text='Transcript processed successfully! You can now ask questions about the video.'
            )
        ]

    # Handle chat clear
    if q.args.clear_chat:
        q.page['chat'].data = data(fields='content from_user', t='list')
        _set_feedback_text(q, 'Chat history cleared.')

    # Handle chat export
    if q.args.export_chat:
        if hasattr(q.client, 'transcript'):
            chat_history = []
            for msg in q.page['chat'].data:
                # Updated to use User: and Response: prefixes
                prefix = "User: " if msg[1] else "Response: "
                chat_history.append(f'{prefix}{msg[0]}')
            # Create formatted chat history text
            chat_history_text = '\n'.join(chat_history)
            # Create full export content
            export_content = f'''YouTube Video Transcript Chatbot

Transcript:
{q.client.transcript}

Chat History:
{chat_history_text}'''
            q.page['export'] = ui.form_card(
                box='1 13 12 2',
                items=[
                    ui.text_area(
                        name='export_content',
                        label='Chat Export (Copy and save)',
                        value=export_content,
                        height='200px'
                    )
                ]
            )

    # Handle chat messages
    if q.args.chatbot:
        # Add user message with "User:" prefix
        user_message = f"User: {q.args.chatbot}"
        q.page['chat'].data += [user_message, True]
        await q.page.save()
        if hasattr(q.client, 'collection_id'):
            response = await get_gpt_response(q.client.collection_id, q.args.chatbot)
            # Add response with "Response:" prefix
            formatted_response = f"Response: {response}"
            q.page['chat'].data += [formatted_response, False]
        else:
            # Add error message with "Response:" prefix
            q.page['chat'].data += ['Response: Please fetch a video transcript first.', False]

    # Handle feedback
    if q.events.chatbot and q.events.chatbot.feedback:
        feedback = q.events.chatbot.feedback
        _set_feedback_text(
            q,
            f'Latest feedback: {feedback.type} on "{feedback.message}"'
        )

    await q.page.save()