Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ITI110_Final.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1wAe1__d6108Sb-qIL2rOlwhLXhE3B_Yo | |
| """ | |
| # Install and import necessary libraries to access Groq. | |
| import subprocess | |
| import sys | |
| # Install required packages | |
def install_packages():
    """Install the third-party packages this script imports.

    Runs ``pip install`` through the current interpreter
    (``sys.executable -m pip``) so the packages land in the environment
    that is executing this file.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    required = [
        "groq", "gradio", "ultralytics", "moviepy", "requests", "soundfile",
        "pandas", "datetime", "openai", "pydub", "matplotlib", "numpy", "fpdf",
    ]
    pip_command = [sys.executable, "-m", "pip", "install", *required]
    subprocess.check_call(pip_command)
# Install the dependencies now, at import time, before the third-party
# imports further down are executed.
install_packages() # Call function to install packages
import os
# Uninstall moviepy and install it again without pip's cache (shell command;
# the exit status returned by os.system is not checked).
os.system("pip uninstall -y moviepy && pip install --no-cache-dir moviepy")
| # FOR SENTIMENT ANALYSIS - SETYANI | |
| # Install and import necessary libraries to access Groq | |
| #!pip install groq gradio opencv-python moviepy requests soundfile pydub matplotlib numpy fpdf | |
| import os | |
| import groq | |
| from groq import Groq | |
| import gradio as gr | |
| import numpy as np | |
| import tempfile | |
| import requests | |
| from moviepy import VideoFileClip | |
| from pydub import AudioSegment | |
| import matplotlib.pyplot as plt | |
| import time | |
| import seaborn as sns | |
| from collections import Counter | |
| from fpdf import FPDF | |
# Global Variables
# Numeric score for each sentiment label; analyze_sentiment looks labels up
# here by their lower-cased name.
sentiment_scores = {"positive": 1, "neutral": 0, "negative": -1}
# One score per analysed audio segment; reset by process_video_gradio.
sentiment_history = []
transcribed_text = "Listening..."
# Output locations of the PDF report and of the generated chart images.
report_path = "sentiment_report.pdf"
sentiment_trend_path = "sentiment_trend.png"
sentiment_heatmap_path = "sentiment_heatmap.png"
sentiment_pie_chart_path = "sentiment_pie_chart.png"
# NOTE(review): the file name is spelled "emotiont" — looks like a typo, but
# every reader/writer goes through this variable so it is self-consistent.
emotion_trend_path = "emotiont_trend.png"
emotion_heatmap_path = "emotion_heatmap.png"
emotion_pie_chart_path = "emotion_pie_chart.png"
# Get the key to access Groq
# When GROQ_API_KEY is not set, the placeholder string "No Key Found" is
# passed to the client as the key instead of failing here.
API_KEY = os.environ.get("GROQ_API_KEY", "No Key Found")
# Initialize Groq Client
grog_client = groq.Groq(api_key=API_KEY)
| # MAIN function to convert audio into text using Groq Whisper speech-to-text service | |
def transcribe_audio(audio_file_path):
    """Transcribe one audio file to text with the Groq-hosted Whisper model.

    Args:
        audio_file_path: Path of the audio file to upload.

    Returns:
        The ``text`` attribute of the transcription response.
    """
    # Fixed request settings: English audio, JSON response, deterministic
    # decoding (temperature 0.0).
    request_settings = {
        "model": "whisper-large-v3",
        "prompt": "Specify context or spelling",
        "response_format": "json",
        "language": "en",
        "temperature": 0.0,
    }
    with open(audio_file_path, "rb") as audio_stream:
        payload = (audio_file_path, audio_stream.read())
        result = grog_client.audio.transcriptions.create(file=payload, **request_settings)
    return result.text
| # MAIN function to do sentiment analysis using Groq LLM model llama3-8b-8192 | |
def analyze_sentiment(text):
    """Classify the sentiment of ``text`` with the Groq chat model and record it.

    The numeric score of the returned label (see ``sentiment_scores``) is
    appended to the module-level ``sentiment_history``; labels that cannot be
    recognised are recorded as 0 (neutral).

    Args:
        text: The text to classify.

    Returns:
        The model's reply as a string (empty string if the reply had no content).
    """
    response = grog_client.chat.completions.create(
        model="llama3-8b-8192",
        messages=[
            {"role": "system", "content": "You are an expert in text sentiment analysis. Analyze the sentiment of this text and return only 'Positive', 'Negative', or 'Neutral'."},
            {"role": "user", "content": text}
        ],
        temperature=0.0,  # deterministic output
        max_tokens=200,   # cap the reply length
    )
    sentiment = response.choices[0].message.content or ""

    # The reply is free text: tolerate surrounding whitespace, quotes and
    # punctuation ("Positive.", " neutral\n", "'Negative'") instead of
    # silently scoring such replies as neutral.
    label = sentiment.strip().strip("'\"`*.!:").lower()
    score = sentiment_scores.get(label)
    if score is None:
        # Longer reply: accept it only when exactly one known label is mentioned.
        mentioned = [name for name in sentiment_scores if name in label]
        score = sentiment_scores[mentioned[0]] if len(mentioned) == 1 else 0

    sentiment_history.append(score)
    print(sentiment_history)
    return sentiment
| # Integrated and tested AZURE services for Speech-to-text using Whisper and | |
| # Azure Sentiment Analysis using gpt-35-turbo-16k vs Azure LANGUAGE service for text analytic | |
| #!pip install azure-cognitiveservices-speech azure-ai-textanalytics azure-core azure-identity | |
| # Removed Azure codes here to protect the keys, only included in the project submission | |
| # CLEANUP transcribed text before doing Sentiment Analysis | |
| import re #used for regular expressions | |
| # Helper function to remove suffixes from numbers in the input text. | |
def remove_suffixes(text):
    """Strip ordinal suffixes (st, nd, rd, th) that directly follow a number.

    The suffix must end at a word boundary, so only real ordinals such as
    "1st" or "22nd" are changed; a number glued to a longer word
    (e.g. "10things") is left untouched. Matching is case-insensitive.

    Args:
        text: Input string.

    Returns:
        The string with ordinal suffixes removed, e.g. "4th" -> "4".
    """
    # Group 1 keeps the digits; the trailing \b prevents eating the start of
    # an ordinary word that happens to begin with st/nd/rd/th.
    pattern = r'(\d+)(?:st|nd|rd|th)\b'
    return re.sub(pattern, r'\1', text, flags=re.IGNORECASE)
| # Helper function to remove repeated phrases in the transcript text which sometimes exist due to transcription error | |
def remove_repeated_phrases(text):
    """Collapse phrases repeated three or more times in a row down to two.

    A phrase is one to three whitespace-separated words. Two consecutive
    occurrences are kept because they can be genuine speech (e.g. "bye bye");
    longer runs are treated as transcription stutter and trimmed to two.
    Matching is case-insensitive.

    Example:
        "hello world hello world hello world test test test again again again"
        -> "hello world hello world test test again again"

    Args:
        text: Transcript text.

    Returns:
        The text with runs of repeated phrases reduced to two occurrences.
    """
    # Group 1 is the phrase; it must be followed by at least TWO more copies
    # of itself for the run to be trimmed. (Matching a single repeat and
    # replacing it with two copies would leave the text unchanged.)
    pattern = r'\b(\w+\s+\w+\s+\w+|\w+\s+\w+|\w+)(?:\s+\1){2,}\b'
    prev_text = None
    # Repeat until stable: trimming one run can expose another.
    while prev_text != text:
        prev_text = text
        text = re.sub(pattern, r'\1 \1', text, flags=re.IGNORECASE)
    return text
| # Example Usage | |
| #text = "hello world hello world hello world test test test again again again" | |
| #cleaned_text = remove_repeated_phrases(text) | |
| #print(cleaned_text) # Output: "hello world hello world test test again again" | |
| # Helper function for text preprocessing before calculating WER | |
def preprocess_text(text):
    """Normalise a transcript into a canonical lower-case form.

    Steps, in order: trim repeated phrases, turn newlines into spaces,
    lower-case, drop hyphens, blank out every character that is not a
    lower-case letter, digit, whitespace, '!' or '?', standardise
    "okay"/"yeah", drop the fillers "um"/"uh", strip ordinal suffixes from
    numbers and finally squeeze whitespace.

    Args:
        text: Raw transcript text.

    Returns:
        The normalised text.
    """
    cleaned = remove_repeated_phrases(text)
    cleaned = cleaned.replace('\n', ' ').lower().replace('-', '')

    # (pattern, replacement) pairs, applied strictly in this order.
    substitutions = (
        (r'[^a-z\s0-9!?]', ' '),   # anything outside the allowed character set
        (r'\b(okay)\b', 'ok'),     # standardise spelling
        (r'\b(yeah)\b', 'yes'),
        (r'\b(um)\b', ''),         # filler words
        (r'\b(uh)\b', ''),
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    cleaned = remove_suffixes(cleaned)
    # Collapse runs of whitespace and trim both ends.
    return re.sub(r'\s+', ' ', cleaned).strip()
| # HELPER function for Display Output of Sentiment Analysis | |
| # Update the Sentiment Trend Over Time real-time graph | |
def update_plot():
    """Redraw the "Sentiment Trend Over Time" chart and save it to disk.

    Reads the module-level ``sentiment_history`` and writes the image to
    ``sentiment_trend_path``. Each point, and the line segment leading into
    it, is coloured red (score < -0.3), yellow (-0.3..0.3) or green (> 0.3).
    """
    scores = list(sentiment_history)
    colors = ["red" if s < -0.3 else "yellow" if -0.3 <= s <= 0.3 else "green" for s in scores]

    # Draw on a dedicated figure and always close it. (Calling plt.clf()
    # first would implicitly create an extra figure that is never closed.)
    fig = plt.figure(figsize=(8, 4))
    try:
        # One coloured marker per segment.
        for index, (score, color) in enumerate(zip(scores, colors)):
            plt.plot(index, score, marker="o", color=color, markersize=8)
        # Connect consecutive points; a segment takes the colour of its end point.
        for index in range(len(scores) - 1):
            plt.plot([index, index + 1], scores[index:index + 2], linestyle="-", color=colors[index + 1], linewidth=2)
        plt.title("Sentiment Trend Over Time")
        plt.xlabel("Time (Speech Segments)")
        plt.ylabel("Sentiment Score")
        plt.ylim([-1, 1])
        plt.yticks([-1, 0, 1], ["Negative", "Neutral", "Positive"])
        plt.savefig(sentiment_trend_path)  # Save the plot as an image
    finally:
        plt.close(fig)
| # Generate the sentiment heatmap using red, yellow, and green colors. | |
def generate_sentiment_heatmap():
    """Render the per-segment sentiment scores as a one-row heatmap image.

    Reads the module-level ``sentiment_history`` and writes the image to
    ``sentiment_heatmap_path`` (red = negative, yellow = neutral,
    green = positive). Does nothing when there are no scores yet.
    """
    if not sentiment_history:
        return  # nothing to plot; an empty (1, 0) grid is not a useful heatmap

    heatmap_data = np.array(sentiment_history).reshape(1, -1)
    color_mapping = ["red", "yellow", "green"]

    fig = plt.figure(figsize=(6, 3))
    try:
        ax = sns.heatmap(heatmap_data, annot=True, cmap=color_mapping, xticklabels=False,
                         yticklabels=["Sentiment"], cbar=True, vmin=-1, vmax=1)
        # Label the colour bar with sentiment names instead of raw numbers.
        colorbar = ax.collections[0].colorbar
        colorbar.set_ticks([-1, 0, 1])
        colorbar.set_ticklabels(["Negative", "Neutral", "Positive"])
        plt.title("Sentiment Heatmap")
        # Save directly: calling plt.show() before savefig can leave an empty
        # figure to be saved, and this app only needs the image file.
        plt.savefig(sentiment_heatmap_path)
    finally:
        plt.close(fig)
| # Generate a Pie Chart for Sentiment Distribution. | |
def generate_sentiment_pie_chart():
    """Render the share of negative / neutral / positive segments as a pie chart.

    Reads the module-level ``sentiment_history`` and writes the image to
    ``sentiment_pie_chart_path``. Scores below -0.3 count as negative, above
    0.3 as positive, everything in between as neutral. Does nothing when
    there are no scores yet.
    """
    if not sentiment_history:
        return  # all counts would be zero; there is no distribution to draw

    sentiment_labels = ["Negative", "Neutral", "Positive"]
    sentiment_counts = Counter(
        "Negative" if s < -0.3 else "Neutral" if -0.3 <= s <= 0.3 else "Positive"
        for s in sentiment_history
    )
    # Counter returns 0 for labels that never occurred.
    counts = [sentiment_counts[label] for label in sentiment_labels]
    colors = ["red", "yellow", "green"]

    fig = plt.figure(figsize=(4, 4))
    try:
        plt.pie(counts, labels=sentiment_labels, autopct="%1.1f%%", colors=colors, startangle=140)
        plt.title("Sentiment Distribution")
        plt.savefig(sentiment_pie_chart_path)
    finally:
        plt.close(fig)
| # Create and save a PDF report with transcription and sentiment analysis graphs. | |
def generate_pdf_report(text):
    """Write a PDF report containing the transcript and every available chart.

    The first page holds the title and the transcript; each chart image that
    exists on disk is then added on its own page. The file is written to the
    module-level ``report_path``.

    Args:
        text: The transcript to include in the report.

    Returns:
        The path of the written PDF (``report_path``).
    """
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()

    # Title
    pdf.set_font("Arial", style='B', size=16)
    pdf.cell(200, 10, "Sentiment Analysis Report", ln=True, align="C")
    pdf.ln(10)

    # Transcribed Text
    # The built-in Arial font is Latin-1 only; characters outside Latin-1
    # (curly quotes, ellipsis, ...) are replaced with '?' so that a
    # transcript containing them cannot abort report generation.
    safe_text = str(text).encode("latin-1", "replace").decode("latin-1")
    pdf.set_font("Arial", size=12)
    pdf.multi_cell(0, 10, f"Transcribed Text:\n\n{safe_text}")
    pdf.ln(10)

    # Add images: one page per chart that has actually been generated.
    chart_paths = [sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path,
                   emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path]
    for img_path in chart_paths:
        if os.path.exists(img_path):
            pdf.add_page()
            pdf.image(img_path, x=10, w=180)

    pdf.output(report_path)
    return report_path
# FOR FACE EMOTION ANALYSIS - SONG MING
| #!pip install gradio ultralytics pandas matplotlib datetime | |
| import gradio as gr | |
| from ultralytics import YOLO | |
| import pandas as pd | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| import logging | |
| import cv2 | |
| from datetime import datetime | |
| import os | |
# Configure logging (optional)
# Messages of level INFO and above are written to emotion_analysis.log.
logging.basicConfig(filename='emotion_analysis.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Load model (outside the function)
# If loading fails the error is logged and printed and `model` is set to None;
# analyze_video and analyze_emotion check for None before using it.
try:
    model = YOLO('yolo11m_affectnet_best.pt') # Replace with your model path. Download this model first!
except Exception as e:
    logging.error(f"Error loading YOLO model: {e}. Make sure the path is correct.")
    print(f"Error loading YOLO model: {e}. Make sure the path is correct.")
    model = None
# Class index -> emotion name; analyze_emotion indexes this list with the
# predicted class id.
# NOTE(review): presumably this order matches the class order the weights were
# trained with — verify against the model's own class names.
emotion_labels = ["neutral", "happy", "sad", "angry", "fearful", "disgusted", "surprised", "not_detected"]
# Initialize an empty global DataFrame
# NOTE(review): analyze_video assigns a local variable of the same name, so
# this module-level frame is not updated by it.
combined_df = pd.DataFrame(columns=['Emotion', 'Confidence', 'Frame', 'Class', 'Timestamp'])
def analyze_video(video_file, interval_seconds=5, confidence=30, iou=30):
    """Sample frames from a video, detect face emotions and save three charts.

    One frame is analysed every ``interval_seconds`` seconds. The detections
    are averaged per (frame, emotion) and rendered as a line plot
    (``emotion_trend_path``), a pie chart (``emotion_pie_chart_path``) and a
    heatmap (``emotion_heatmap_path``).

    Args:
        video_file: Path of the video to analyse.
        interval_seconds: Seconds between sampled frames.
        confidence: Confidence threshold in percent; stored on the model as
            ``model.conf`` and read back by ``analyze_emotion``.
        iou: IoU threshold in percent; stored on the model as ``model.iou``.

    Returns:
        A short message string when the model is missing, the video cannot be
        opened or nothing was detected; otherwise ``None`` after the charts
        have been written.
    """
    if model is None:
        return "<p>YOLO model failed to load. Check the logs.</p>"

    model.conf = confidence / 100.0
    model.iou = iou / 100.0

    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        cap.release()
        print(f"Error opening video file: {video_file}")
        return "<p>Error opening video file.</p>"

    all_emotions_data = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Always step by at least one frame: a reported FPS of 0 would
        # otherwise give a step of 0 and the loop would never advance.
        interval_frames = max(1, int(fps * interval_seconds))

        for current_frame in range(0, total_frame_count, interval_frames):
            cap.set(cv2.CAP_PROP_POS_FRAMES, current_frame)
            ret, frame = cap.read()
            if not ret:
                # Skip unreadable frames and move on to the next sample
                # (retrying the same position would loop forever).
                logging.warning(f"Could not read frame {current_frame}; skipping.")
                continue
            analyze_emotion(frame, current_frame, all_emotions_data)
            print(f"Finished Processing : {current_frame + interval_frames}")
    finally:
        cap.release()
    print(f"Finished Processing all frames")

    all_emotions_df = pd.DataFrame(all_emotions_data)
    if all_emotions_df.empty:
        return "No emotions detected in the video."

    # Local aggregate (one row per frame/emotion pair). This name is local to
    # the function; the module-level ``combined_df`` is not modified.
    combined_df = all_emotions_df.groupby(['Frame', 'Emotion'], as_index=False).agg({'Confidence': 'mean', 'Class': 'first', 'Timestamp': 'first'})

    # Line plot
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=combined_df, x='Frame', y='Confidence', hue='Emotion', marker='o')
    plt.title('Emotion Detections Over Time')
    plt.xlabel('Frame')
    plt.ylabel('Confidence')
    plt.savefig(emotion_trend_path)
    plt.close()

    # Pie chart
    pie_data = combined_df['Emotion'].value_counts()
    plt.figure(figsize=(20, 12))
    plt.pie(pie_data, labels=pie_data.index, autopct='%1.1f%%', startangle=90)
    plt.title('Emotion Distribution')
    plt.savefig(emotion_pie_chart_path)
    plt.close()

    # Heatmap
    plt.figure(figsize=(10, 6))
    heatmap_data = pd.pivot_table(combined_df, values='Confidence', index='Frame', columns='Emotion', fill_value=0)
    sns.heatmap(heatmap_data, cmap='YlGnBu', cbar_kws={'label': 'Confidence'})
    plt.title('Emotion Heatmap')
    plt.xlabel('Emotion')
    plt.ylabel('Frame')
    plt.savefig(emotion_heatmap_path)
    plt.close()
def analyze_emotion(frame, frame_index, all_emotions_data):
    """Run the emotion model on one frame and record the confident detections.

    Every detected box whose confidence is above ``model.conf`` is appended
    to ``all_emotions_data`` as a dict with the keys ``Emotion``,
    ``Confidence``, ``Frame``, ``Class`` and ``Timestamp`` (the wall-clock
    time at which the detection was recorded). Nothing happens when the
    model failed to load.

    Args:
        frame: The image to analyse, passed straight to the model.
        frame_index: Index of the frame, stored with each detection.
        all_emotions_data: List that receives the detection dicts (mutated
            in place).
    """
    if model is None:
        return

    # NOTE(review): the model is called without explicit thresholds —
    # confirm that the conf/iou attributes set on the model object are
    # actually honoured by inference.
    for detection in model(frame):
        for box in detection.boxes:
            score = float(box.conf)
            class_id = int(box.cls.item())

            if class_id < len(emotion_labels):
                label = emotion_labels[class_id]
            else:
                # Unknown class id: record it as undetected with zero confidence.
                label = 'not_detected'
                logging.warning(f"Predicted class {class_id} out of range. Setting to 'not_detected'.")
                score = 0.0

            if not (score > model.conf):
                continue

            recorded_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
            all_emotions_data.append({
                'Emotion': label,
                'Confidence': score,
                'Frame': frame_index,
                'Class': class_id,
                'Timestamp': recorded_at
            })
| # MAIN FUNCTIONs FOR GRADIO APPLICATION - SETYANI | |
| # 17/2 video file sentiment analysis working | |
| # 21/2 fixed heatmap display, add button click handler for clear, download report | |
| # 23/2 integrated Azure Whisper, GPT and Language services created by Thim Wai, however the performance is too slow so switch back to Groq | |
| # 25/2 integrated Face Emotion analysis from SongMing | |
| #========================================================================================================================================== | |
| # MAIN function to process uploaded video from Gradio User Interface | |
def _existing_file_or_none(path):
    """Return ``path`` if that file exists, else ``None`` (an empty image in the UI)."""
    return path if os.path.isfile(path) else None


def _collect_outputs(transcript):
    """Build the 7-item output tuple: the transcript followed by the six chart images."""
    chart_paths = (sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path,
                   emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path)
    return (transcript, *(_existing_file_or_none(p) for p in chart_paths))


def process_video_gradio(video_path):
    """Analyse an uploaded video and stream the results to the Gradio UI.

    The audio track is cut into 5-second segments; each segment is
    transcribed and scored for sentiment, and the trend chart is refreshed
    after every segment. Afterwards the sentiment heatmap and pie chart, the
    face-emotion charts and the PDF report are generated.

    Args:
        video_path: Path of the uploaded video, or ``None``/empty when the
            video component was cleared.

    Yields:
        ``(transcript, sentiment_trend, sentiment_heatmap, sentiment_pie,
        emotion_trend, emotion_heatmap, emotion_pie)``; an image slot is
        ``None`` until its file has been generated.

    Raises:
        ValueError: if ``video_path`` does not exist or the video has no
            audio track.
    """
    global sentiment_history
    sentiment_history = []  # Reset sentiment history

    # The change event also fires when the video is cleared; there is nothing
    # to analyse then, so just blank the outputs.
    if not video_path:
        yield "", None, None, None, None, None, None
        return
    if not os.path.exists(video_path):
        raise ValueError("File not found.")
    clear_function()  # clear the previous analysis files if they exist

    # Extract the audio track to a temporary WAV file and load it into memory.
    video_clip = VideoFileClip(video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError("The uploaded video has no audio track.")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            full_audio_path = temp_audio.name
        try:
            audio_clip.write_audiofile(full_audio_path)
            audio = AudioSegment.from_wav(full_audio_path)
        finally:
            os.remove(full_audio_path)  # the temp file is removed even on failure
    finally:
        video_clip.close()

    segment_length = 5000  # 5 seconds per segment (milliseconds)
    # Round up so the trailing audio shorter than one segment is analysed too.
    num_segments = (len(audio) + segment_length - 1) // segment_length
    transcribed_text = ""
    for i in range(num_segments):
        segment = audio[i * segment_length: (i + 1) * segment_length]
        segment_path = f"temp_segment_{i}.wav"
        try:
            segment.export(segment_path, format="wav")
            raw_text = transcribe_audio(segment_path)  # Groq Whisper
        finally:
            if os.path.exists(segment_path):
                os.remove(segment_path)  # Cleanup segment files

        # Prefix the segment number so the transcript lines up with the
        # x axis of the sentiment trend chart.
        transcribed_text += f"[{i}] {raw_text}\n"

        # Score the cleaned transcript (the result of preprocess_text must be
        # used, not discarded). A segment with no usable words is neutral.
        cleaned_text = preprocess_text(raw_text)
        if cleaned_text:
            analyze_sentiment(cleaned_text)  # Groq Llama; appends to sentiment_history
        else:
            sentiment_history.append(sentiment_scores["neutral"])

        update_plot()  # refresh the trend chart after every segment
        yield _collect_outputs(transcribed_text)

    if sentiment_history:
        generate_sentiment_heatmap()
        generate_sentiment_pie_chart()
    yield _collect_outputs(transcribed_text)

    analyze_video(video_path)  # face-emotion charts
    generate_pdf_report(transcribed_text)
    yield _collect_outputs(transcribed_text)
| # Function to handle 'Download Report' button | |
def download_report_function():
    """Return the path of the generated PDF report for the download button.

    Returns:
        ``report_path`` when the report file exists.

    Raises:
        ValueError: if no report has been generated yet.
    """
    if os.path.exists(report_path):
        return report_path
    raise ValueError("Please upload video file for report analysis.")
| # Function to handle 'Clear' button | |
def clear_function():
    """Delete the generated chart images and reset the eight UI components.

    Only regular files are removed; the PDF report is left in place.

    Returns:
        A tuple of eight ``None`` values, one per output component wired to
        the "Clear" button.
    """
    generated_charts = (
        sentiment_trend_path,
        sentiment_heatmap_path,
        sentiment_pie_chart_path,
        emotion_trend_path,
        emotion_heatmap_path,
        emotion_pie_chart_path,
    )
    for chart_path in generated_charts:
        # Ensure it is a file before attempting to delete
        if os.path.isfile(chart_path):
            os.remove(chart_path)
    return (None,) * 8
# Simple Interface wrapper around process_video_gradio: one video input,
# one transcript textbox and six chart images as outputs.
# NOTE: the name `iface` is rebound by the `with gr.Blocks() as iface:` block
# that follows, so this Interface object is not the one that gets launched.
iface = gr.Interface(
    fn=process_video_gradio,
    inputs=gr.Video(label="Video"),
    outputs=[
        gr.Textbox(label="Transcribed Text"),
        gr.Image(label="Sentiment Trend Over Time"),
        gr.Image(label="Sentiment Heatmap"),
        gr.Image(label="Sentiment Distribution Pie Chart"),
        gr.Image(label="Emotion Trend Over Time"),
        gr.Image(label="Emotion Heatmap"),
        gr.Image(label="Emotion Distribution Pie Chart")
    ],
    allow_flagging="never", # Disable flag button
    title="Real-Time Video Sentiment Analysis",
    description="Upload a video file or use your webcam for live video streaming to analyze speech sentiment dynamically.",
    live=True # Enable live updates for streaming
)
# Custom Blocks layout: video + transcript on the first row, the three
# sentiment charts on the second, the three emotion charts on the third and
# the action buttons on the last row.
with gr.Blocks() as iface:
    with gr.Row():
        video_input = gr.Video(label="Video", scale=1, interactive = True) # Uploaded video (same scale as the transcript box)
        transcribed_text = gr.Textbox(label="Transcribed Text", lines=15, max_lines=15, interactive=False, scale=1)
    with gr.Row():
        sentiment_trend = gr.Image(label="Sentiment Trend Over Time", scale=2)
        sentiment_heatmap = gr.Image(label="Sentiment Heatmap", scale=1)
        sentiment_pie_chart = gr.Image(label="Sentiment Distribution Pie Chart", scale=1)
    with gr.Row():
        emotion_trend = gr.Image(label="Emotion Trend Over Time", scale=2)
        emotion_heatmap = gr.Image(label="Emotion Heatmap", scale=1)
        emotion_pie_chart = gr.Image(label="Emotion Distribution Pie Chart", scale=1)
    with gr.Row():
        # Buttons for manual control
        download_button = gr.Button("Download Report")
        clear_button = gr.Button("Clear")
    # Run the analysis whenever the video value changes; the seven outputs
    # match the seven values yielded by process_video_gradio.
    video_input.change(fn=process_video_gradio, inputs=video_input, outputs=[transcribed_text,
        sentiment_trend, sentiment_heatmap, sentiment_pie_chart,
        emotion_trend, emotion_heatmap, emotion_pie_chart
        ])
    # Add custom JavaScript to trigger play button after uploading
    # NOTE(review): confirm that a <script> tag inside gr.HTML is actually
    # executed by the Gradio front end.
    instructions = gr.HTML("""
<script>
document.querySelector('input[type="file"]').addEventListener('change', function() {
var intervalId = setInterval(function() {
var videoPlayer = document.querySelector('video');
if (videoPlayer) {
videoPlayer.play();
clearInterval(intervalId);
}
}, 500);
});
</script>
""")
    # Link the button clicks to the functions that handle them
    download_button.click(fn=download_report_function, inputs=[], outputs=gr.File())
    # clear_function returns eight values, one for each component listed here.
    clear_button.click(
        fn=clear_function,
        inputs=[],
        outputs=[video_input, transcribed_text, sentiment_trend, sentiment_heatmap, sentiment_pie_chart, emotion_trend, emotion_heatmap, emotion_pie_chart])
# Start the app with a public share link.
iface.launch(inline=False, share=True)