| |
| from h2o_wave import main, app, Q, ui, data |
| from youtube_transcript_api import YouTubeTranscriptApi |
| from h2ogpte import H2OGPTE |
| import re |
| import os |
| from dotenv import load_dotenv |
| from collections import Counter |
| import nltk |
| from nltk.tokenize import word_tokenize |
| from nltk.corpus import stopwords |
| from nltk.sentiment import SentimentIntensityAnalyzer |
| from textblob import TextBlob |
| from nltk.tokenize import sent_tokenize |
| import asyncio |
| import logging |
|
|
# Module-wide logging configuration; child loggers inherit this setup.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
class TranscriptAnalyzer:
    """Compute analytics over a YouTube transcript.

    Takes the list returned by ``YouTubeTranscriptApi.get_transcript`` —
    dicts with ``text``, ``start`` and ``duration`` keys (start/duration
    in seconds) — and derives word frequencies, per-sentence sentiment,
    key topics, and timestamped segments for the UI.
    """

    def __init__(self, transcript_list):
        # Flatten the per-caption entries into one continuous text.
        self.transcript = ' '.join([entry['text'] for entry in transcript_list])
        self.transcript_list = transcript_list
        self.sia = SentimentIntensityAnalyzer()
        # Standard English stop words plus common spoken fillers.
        self.stop_words = set(stopwords.words('english'))
        self.additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so'}
        self.stop_words.update(self.additional_stops)

        self.sentences = sent_tokenize(self.transcript)
        self.words = word_tokenize(self.transcript.lower())

    def analyze(self):
        """Run all analyses.

        Returns a dict with keys 'word_freq', 'sentiments', 'topics' and
        'time_segments'. Never raises: any failure yields placeholder
        values of the same shape so the UI can still render.
        """
        try:
            return {
                'word_freq': self._analyze_word_frequency(),
                'sentiments': self._analyze_sentiment(),
                'topics': self._extract_topics(),
                'time_segments': self._create_time_segments()
            }
        except Exception as e:
            logger.error(f"Error in transcript analysis: {e}")
            return {
                'word_freq': [('no data', 1)],
                'sentiments': [0.0],
                'topics': [('no topics', 1, ['none'])],
                'time_segments': [{
                    'start_time': '0:00',
                    'end_time': '0:00',
                    'text': 'Analysis not available',
                    'sentiment': 0.0,
                    'topics': []
                }]
            }

    def _analyze_word_frequency(self):
        """Return the 15 most common meaningful words as (word, count) pairs."""
        # Strip residual punctuation, then keep alphanumeric, non-numeric
        # words longer than two characters that are not stop words.
        stripped = (word.strip('.,!?()[]{}":;') for word in self.words)
        words = [
            word for word in stripped
            if word.isalnum()
            and not word.isnumeric()
            and len(word) > 2
            and word not in self.stop_words
        ]
        return Counter(words).most_common(15)

    def _analyze_sentiment(self):
        """Return one VADER compound score (-1..1) per sentence."""
        return [self.sia.polarity_scores(sentence)['compound'] for sentence in self.sentences]

    def _extract_topics(self):
        """Return up to 10 multi-word noun phrases as (topic, count, related).

        The third tuple element is a placeholder 'related terms' list the
        UI unpacks; real related-term extraction is not implemented.
        """
        blob = TextBlob(self.transcript)
        noun_phrases = [phrase for phrase in blob.noun_phrases if len(phrase.split()) >= 2]
        return [
            (topic, count, ['related'])
            for topic, count in Counter(noun_phrases).most_common(10)
        ]

    def _create_time_segments(self):
        """Group captions into 5-entry segments with real YouTube timestamps,
        a sentiment score and up to three noun-phrase topics each."""
        segments = []
        segment_size = 5

        for i in range(0, len(self.transcript_list), segment_size):
            segment_entries = self.transcript_list[i:i + segment_size]
            segment_text = ' '.join(entry['text'] for entry in segment_entries)

            # Segment spans first entry's start to last entry's end.
            start_time = segment_entries[0]['start']
            end_time = segment_entries[-1]['start'] + segment_entries[-1]['duration']

            start_min, start_sec = divmod(int(start_time), 60)
            end_min, end_sec = divmod(int(end_time), 60)

            sentiment_scores = self.sia.polarity_scores(segment_text)
            blob = TextBlob(segment_text)

            segments.append({
                'start_time': f"{start_min}:{start_sec:02d}",
                'end_time': f"{end_min}:{end_sec:02d}",
                'text': segment_text,
                'sentiment': sentiment_scores['compound'],
                # Slice the phrase list directly instead of copying it
                # through an identity comprehension.
                'topics': list(blob.noun_phrases)[:3]
            })

        return segments
|
|
|
|
| |
# Fetch the NLTK resources used by the analysis code. With quiet=True,
# nltk.download signals most failures by returning False rather than
# raising, so check the return value as well as catching exceptions.
nltk_dependencies = ['punkt', 'stopwords', 'vader_lexicon']
for dep in nltk_dependencies:
    try:
        if not nltk.download(dep, quiet=True):
            logger.warning("Could not download NLTK resource %r", dep)
    except Exception as e:
        # Route through the module logger (original used print).
        logger.error("Error downloading %s: %s", dep, e)
|
|
| |
# Load environment variables from a local .env file, if present.
load_dotenv()

# h2oGPTe connection settings; both must be provided via the environment
# or the .env file.
h2ogpt_url = os.getenv('H2OGPT_URL')
h2ogpt_api_key = os.getenv('H2OGPT_API_KEY')

# Module-level h2oGPTe client shared by all handlers.
# NOTE(review): constructed at import time — behavior when the env vars
# are unset (address=None) depends on H2OGPTE; confirm it fails loudly.
client = H2OGPTE(
    address=h2ogpt_url,
    api_key=h2ogpt_api_key
)
|
|
def analyze_transcript(transcript):
    """Analyze a flat transcript string.

    Returns a dict with:
      'word_freq'  - top-10 (word, count) pairs,
      'sentiments' - up to 50 VADER compound scores sampled across sentences,
      'topics'     - top-5 (noun_phrase, count) pairs.
    Never raises: any failure yields placeholder values of the same shape.
    """
    try:
        tokens = word_tokenize(transcript.lower())
        stop_words = set(stopwords.words('english'))
        # Spoken fillers plus very common words (overlap with the NLTK
        # list is harmless).
        additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so', 'and', 'the', 'to', 'of', 'in', 'a', 'is', 'that'}
        stop_words.update(additional_stops)

        # Keep alphanumeric, non-numeric words longer than two characters
        # that are not stop words.
        words = [word for word in tokens if (
            word.isalnum() and
            not word.isnumeric() and
            len(word) > 2 and
            word not in stop_words
        )]

        word_freq = Counter(words).most_common(10)

        # Split on sentence-ending punctuation; drop fragments <= 20 chars.
        sentences = [s.strip() for s in re.split('[.!?]', transcript) if len(s.strip()) > 20]
        sia = SentimentIntensityAnalyzer()
        sentiments = [sia.polarity_scores(sentence)['compound'] for sentence in sentences]

        # Downsample to at most 50 points so the sentiment plot stays readable.
        if len(sentiments) > 50:
            step = len(sentiments) // 50
            sentiments = sentiments[::step][:50]

        # Multi-word noun phrases as candidate topics.
        blob = TextBlob(transcript)
        noun_phrases = [phrase for phrase in blob.noun_phrases if len(phrase.split()) >= 2]
        topics = Counter(noun_phrases).most_common(5)

        # Guarantee non-empty results so downstream plotting never sees [].
        if not word_freq:
            word_freq = [('no', 1)]
        if not sentiments:
            sentiments = [0.0]
        if not topics:
            topics = [('no topics found', 1)]

        return {
            'word_freq': word_freq,
            'sentiments': sentiments,
            'topics': topics
        }
    except Exception as e:
        # Use the module logger (not print) so failures reach the app logs.
        logger.error(f"Error in transcript analysis: {e}")
        return {
            'word_freq': [('error', 1)],
            'sentiments': [0.0],
            'topics': [('error', 1)]
        }
|
|
def create_word_frequency_plot(word_freq):
    """Build the (data, plot) pair for a word-frequency bar chart.

    ``word_freq`` is a sequence of (word, count) pairs.
    """
    rows = [{'word': w, 'count': c} for w, c in word_freq]
    plot_data = data(fields=['word', 'count'], rows=rows)
    plot = ui.plot([ui.mark(type='interval', x='=word', y='=count')])
    return plot_data, plot
|
|
def create_sentiment_plot(sentiments):
    """Build the (data, plot) pair for a sentiment-over-time line chart.

    ``sentiments`` is a sequence of numeric scores; the x axis is the
    (stringified) position of each score.
    """
    rows = [{'index': str(pos), 'sentiment': value}
            for pos, value in enumerate(sentiments)]
    plot_data = data(fields=['index', 'sentiment'], rows=rows)
    plot = ui.plot([ui.mark(type='line', x='=index', y='=sentiment')])
    return plot_data, plot
|
|
def extract_video_id(url):
    """Extract the 11-character YouTube video ID from a URL.

    Handles watch, youtu.be, embed and shorts style URLs. Returns None
    when no ID can be found.
    """
    # The ID follows either "v=" or a path separator and must be
    # terminated by a URL delimiter or end-of-string. The delimiter class
    # includes '#' so fragment URLs (e.g. ...?v=ID#t=30) also match.
    pattern = r'(?:v=|\/)([\w-]{11})(?:[?\/&#]|$)'
    match = re.search(pattern, url)
    return match.group(1) if match else None
|
|
async def get_transcript(video_id: str):
    """Fetch the transcript for a YouTube video without blocking the event loop.

    Returns the transcript list on success, or an "Error: ..." string on
    failure (callers check for the string form).
    """
    try:
        # youtube_transcript_api is synchronous; run it in the default
        # executor. get_running_loop() replaces get_event_loop(), which is
        # deprecated inside coroutines since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, YouTubeTranscriptApi.get_transcript, video_id
        )
    except Exception as e:
        # Log via the module logger (original used print).
        logger.error("Error fetching transcript: %s", e)
        return f"Error: {str(e)}"
|
|
async def setup_h2ogpt_collection(transcript, video_id):
    """Create an h2oGPTe collection and ingest the transcript into it.

    Returns the collection id on success, or an "Error..." string on
    failure (callers check for the string prefix).
    """
    path = f'transcript_{video_id}.txt'
    try:
        collection_id = client.create_collection(
            name=f'YouTube_Video_{video_id}',
            description='YouTube video transcript for chat interaction'
        )

        # h2oGPTe ingests from an uploaded file, so stage the transcript
        # on disk temporarily.
        with open(path, 'w', encoding='utf-8') as f:
            f.write(transcript)

        with open(path, 'rb') as f:
            upload_id = client.upload(path, f)

        client.ingest_uploads(collection_id, [upload_id])
        return collection_id
    except Exception as e:
        return f"Error setting up H2O GPT: {str(e)}"
    finally:
        # Always remove the staging file — the original only removed it on
        # the success path, leaking it when upload/ingest raised.
        if os.path.exists(path):
            os.remove(path)
|
|
async def get_gpt_response(collection_id, question):
    """Ask h2oGPTe a question against a collection and return the answer text.

    On any failure, returns an "Error getting response: ..." string
    instead of raising.
    """
    try:
        session_id = client.create_chat_session(collection_id)
        with client.connect(session_id) as session:
            reply = session.query(
                question,
                timeout=60,
                rag_config={"rag_type": "rag"},
            )
            return reply.content
    except Exception as e:
        return f"Error getting response: {str(e)}"
|
|
@app('/chatbot')
async def serve(q: Q):
    """Wave request handler for the /chatbot app.

    First visit from a browser client: lays out all cards. Every request
    then dispatches on q.args (submit_url, clear_chat, export_chat,
    chatbot message) and q.events (chatbot feedback), and finally saves
    the page.
    """
    # One-time page construction per browser client.
    if not q.client.initialized:
        q.client.initialized = True

        # Page header.
        q.page['header'] = ui.header_card(
            box='1 1 12 1',
            title='YouTube Video Transcript Chatbot & Analysis | xAmplify',
            subtitle='Enter a YouTube URL to analyse and chat about the video content',
            color='primary'
        )

        # URL input plus fetch / clear controls.
        q.page['url_form'] = ui.form_card(
            box='1 2 12 1',
            items=[
                ui.inline([
                    ui.textbox(
                        name='video_url',
                        placeholder='Enter YouTube video URL...',
                        width='800px'
                    ),
                    ui.button(
                        name='submit_url',
                        label='Fetch Transcript',
                        primary=True
                    ),
                    ui.button(
                        name='clear_chat',
                        label='Clear Chat',
                        icon='Delete'
                    )
                ])
            ]
        )

        # Status / progress messages.
        q.page['status'] = ui.form_card(
            box='1 3 12 1',
            items=[
                ui.text('Please enter a YouTube URL to begin.')
            ]
        )

        # Segmented transcript display.
        q.page['transcript'] = ui.form_card(
            box='1 4 6 4',
            title='Video Transcript',
            items=[
                ui.text('Transcript will appear here...')
            ]
        )

        # Word-frequency bar chart (placeholder row until analysis runs).
        q.page['word_freq'] = ui.plot_card(
            box='1 8 3 4',
            title='Word Frequency Analysis',
            data=data('word count', rows=[('', 0)], pack=True),
            plot=ui.plot([ui.mark(type='interval', x='=word', y='=count')])
        )

        # Per-sentence sentiment line chart (placeholder row).
        q.page['sentiment'] = ui.plot_card(
            box='4 8 3 4',
            title='Sentiment Flow',
            data=data('index sentiment', rows=[(0, 0)], pack=True),
            plot=ui.plot([ui.mark(type='line', x='=index', y='=sentiment')])
        )

        # Key-topics markdown panel.
        q.page['topics'] = ui.markdown_card(
            box='7 8 6 4',
            title='Key Topics',
            content='Key topics discussed in the video with their frequency of mention',
        )

        # Chat UI; disabled until a transcript has been processed.
        q.page['chat'] = ui.chatbot_card(
            box='7 4 6 4',
            data=data(fields='content from_user', t='list'),
            name='chatbot',
            events=['feedback'],
            placeholder='Type your question here...',
            disabled=True,
        )

        # Feedback text + export button row.
        q.page['feedback'] = ui.form_card(
            box='1 12 12 1',
            items=[
                ui.inline([
                    ui.text_l('Response Feedback'),
                    ui.text(name='feedback_text', content='No feedback yet.'),
                    ui.button(name='export_chat', label='Export Chat', icon='Download')
                ])
            ]
        )

    # --- "Fetch Transcript" button ------------------------------------
    if q.args.submit_url:
        url = q.args.video_url
        video_id = extract_video_id(url)

        if not video_id:
            q.page['status'].items = [
                ui.message_bar(
                    type='error',
                    text='Invalid YouTube URL. Please check and try again.'
                )
            ]
            return

        # Show indeterminate progress while fetching and analyzing.
        q.page['status'].items = [
            ui.progress(label='Processing video transcript...', value=True)
        ]
        await q.page.save()

        # get_transcript signals failure with an "Error..." string.
        transcript_list = await get_transcript(video_id)
        if isinstance(transcript_list, str) and transcript_list.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=transcript_list)
            ]
            return

        # Keep the flat transcript for export / GPT ingestion; analyze it.
        q.client.transcript = ' '.join([entry['text'] for entry in transcript_list])
        analyzer = TranscriptAnalyzer(transcript_list)
        analysis = analyzer.analyze()

        # Build the transcript panel: one timestamped chunk per segment.
        transcript_items = []
        transcript_items.append(ui.text_xl('Video Transcript'))
        transcript_items.append(ui.separator())

        for segment in analysis['time_segments']:
            # Timestamp range header.
            transcript_items.append(
                ui.text(
                    f"**[{segment['start_time']} - {segment['end_time']}]**",
                    size='s'
                )
            )

            transcript_items.append(ui.text(segment['text']))

            # Map compound sentiment from [-1, 1] to [0, 1] for the
            # progress-bar widget.
            sentiment_value = (segment['sentiment'] + 1) / 2
            transcript_items.append(
                ui.progress(
                    label='Sentiment',
                    value=sentiment_value,
                    caption=f"{'Positive' if segment['sentiment'] > 0.1 else 'Negative' if segment['sentiment'] < -0.1 else 'Neutral'}"
                )
            )

            if segment['topics']:
                transcript_items.append(
                    ui.text(
                        f"Topics: {', '.join(segment['topics'])}",
                        size='s'
                    )
                )

            transcript_items.append(ui.separator())

        q.page['transcript'].items = transcript_items

        # Refresh the two plots with the analysis results.
        word_freq_data = [(word, count) for word, count in analysis['word_freq']]
        q.page['word_freq'].data = data('word count', rows=word_freq_data, pack=True)

        sentiment_data = [(i, score) for i, score in enumerate(analysis['sentiments'])]
        q.page['sentiment'].data = data('index sentiment', rows=sentiment_data, pack=True)

        # Topics panel as markdown (topics are (name, count, related) triples).
        topics_md = "## Key Topics Discussed\n" + "\n".join([
            f"- {topic} ({count} mentions)\n Related: {', '.join(related)}"
            for topic, count, related in analysis['topics']
        ])
        q.page['topics'].content = topics_md

        # Ingest the transcript into h2oGPTe so the chatbot can answer;
        # failure is signalled with an "Error..." string.
        collection_id = await setup_h2ogpt_collection(q.client.transcript, video_id)
        if isinstance(collection_id, str) and collection_id.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=collection_id)
            ]
            return

        q.client.collection_id = collection_id

        # Enable chat and report success.
        q.page['chat'].disabled = False
        q.page['status'].items = [
            ui.message_bar(
                type='success',
                text='Transcript processed successfully! You can now ask questions about the video.'
            )
        ]

    # --- "Clear Chat" button ------------------------------------------
    if q.args.clear_chat:
        q.page['chat'].data = data(fields='content from_user', t='list')
        # NOTE(review): reaches into items[0][1] (the feedback_text inside
        # the inline) by position — fragile if the card layout changes.
        q.page['feedback'].items[0][1].content = 'Chat history cleared.'

    # --- "Export Chat" button -----------------------------------------
    if q.args.export_chat:
        if hasattr(q.client, 'transcript'):
            chat_history = []
            # NOTE(review): assumes the chat buffer iterates as
            # (content, from_user) pairs — confirm against the Wave
            # version in use.
            for msg in q.page['chat'].data:
                prefix = "User: " if msg[1] else "Response: "
                chat_history.append(f'{prefix}{msg[0]}')

            chat_history_text = '\n'.join(chat_history)

            # Plain-text export of transcript plus chat history.
            export_content = f'''YouTube Video Transcript Chatbot

Transcript:
{q.client.transcript}

Chat History:
{chat_history_text}'''

            q.page['export'] = ui.form_card(
                box='1 13 12 2',
                items=[
                    ui.text_area(
                        name='export_content',
                        label='Chat Export (Copy and save)',
                        value=export_content,
                        height='200px'
                    )
                ]
            )

    # --- Chat message --------------------------------------------------
    if q.args.chatbot:
        # Echo the user message immediately, then fetch the model reply.
        user_message = f"User: {q.args.chatbot}"
        q.page['chat'].data += [user_message, True]
        await q.page.save()

        if hasattr(q.client, 'collection_id'):
            response = await get_gpt_response(q.client.collection_id, q.args.chatbot)
            formatted_response = f"Response: {response}"
            q.page['chat'].data += [formatted_response, False]
        else:
            # No transcript ingested yet — prompt the user.
            q.page['chat'].data += ['Response: Please fetch a video transcript first.', False]

    # --- Chatbot feedback event ----------------------------------------
    if q.events.chatbot and q.events.chatbot.feedback:
        feedback = q.events.chatbot.feedback
        q.page['feedback'].items[0][1].content = f'Latest feedback: {feedback.type} on "{feedback.message}"'

    await q.page.save()