# ryangpt / app.py
# Author: debaghtk
# Last commit: bdde360 ("add submit button")
import os
import gradio as gr
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document
from dotenv import load_dotenv
from openai import AsyncOpenAI
import tiktoken
import asyncio
import re
import faiss
import numpy as np
import requests
import tempfile
import random
# Configuration comes from the process environment; load_dotenv() lets a local
# .env file supply the values during development.
load_dotenv()

# OpenAI credentials: the key is reused for embeddings, the async client for chat.
openai_api_key = os.getenv("OPENAI_API_KEY")
client = AsyncOpenAI(api_key=openai_api_key)

# ElevenLabs text-to-speech credentials and the voice to synthesise with.
elevenlabs_api_key, voice_id = (
    os.getenv("ELEVENLABS_API_KEY"),
    os.getenv("ELEVENLABS_VOICE_ID"),
)
# Load and process the transcripts.
# A transcript line looks like "[12.34 - 15.67] spoken text". The pattern is
# compiled once instead of being looked up again for every line.
TRANSCRIPT_LINE_RE = re.compile(r"^\[(\d+\.\d+) - (\d+\.\d+)\]\s*(.*)$")


def _load_transcript_documents(folder):
    """Build one Document per timestamped line of every ``*.txt`` file in *folder*.

    The YouTube video ID is the file name without its extension and with
    ``_transcription`` removed (``abc123_transcription.txt`` -> ``abc123``).
    Lines that do not match ``TRANSCRIPT_LINE_RE`` are ignored.

    Args:
        folder: Directory containing the transcript files.

    Returns:
        A list of Documents whose metadata holds the video link, the start/end
        times in seconds and a link that jumps to the start time.
    """
    docs = []
    # os.listdir order is arbitrary; sorting makes document (and FAISS id)
    # order reproducible between runs.
    for filename in sorted(os.listdir(folder)):
        if not filename.endswith(".txt"):
            continue
        with open(os.path.join(folder, filename), "r", encoding="utf-8") as f:
            content = f.read()
        video_id = os.path.splitext(filename)[0].replace("_transcription", "")
        youtube_link_base = f"https://www.youtube.com/watch?v={video_id}"
        for line in content.splitlines():
            match = TRANSCRIPT_LINE_RE.match(line)
            if not match:
                continue
            start_time = float(match.group(1))
            end_time = float(match.group(2))
            docs.append(
                Document(
                    page_content=match.group(3),
                    metadata={
                        "youtube_link": youtube_link_base,
                        "start_time": start_time,
                        "end_time": end_time,
                        "timestamp_link": f"{youtube_link_base}&t={int(start_time)}",
                    },
                )
            )
    return docs


folder_path = "transcriptions-local" if os.getenv("ENVIRONMENT") == "local" else "transcriptions"
documents = _load_transcript_documents(folder_path)
print(f"Number of documents loaded: {len(documents)}")
if not documents:
    # FAISS.from_documents would otherwise fail with an opaque IndexError.
    raise RuntimeError(
        f"No timestamped transcript lines found in {folder_path!r}; "
        "cannot build the vector store."
    )

# Re-chunk the per-line documents (chunk_size=500, chunk_overlap=50).
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = text_splitter.split_documents(documents)
print(f"Number of documents after splitting: {len(split_documents)}")

# Embed every chunk with OpenAI and index the vectors in FAISS for retrieval.
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
print("Generating embeddings...")
vector_store = FAISS.from_documents(split_documents, embeddings)
# Running chat transcript (OpenAI-style message dicts) that is sent back to the
# model on each turn. chatbot_response appends to it and rebinds it after
# truncation.
# NOTE(review): this is module-level state, so every caller in this process
# shares it. With several concurrent users, conversations will mix.
conversation_history: list = []
def count_tokens(messages):
    """Return the total gpt-3.5-turbo token count of the messages' contents.

    Only the ``"content"`` values are encoded. Messages without that key add
    nothing, and role names and chat-format framing overhead are not counted.
    """
    enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
    total = 0
    for msg in messages:
        if 'content' not in msg:
            continue
        total += len(enc.encode(msg['content']))
    return total
def truncate_messages(messages, max_tokens, token_counter=None):
    """Return the messages that fit within *max_tokens*, in their original order.

    Every ``system`` message and the most recent ``user`` message are always
    kept, even if they alone exceed the budget. The remaining messages are then
    added newest-first until the next one would not fit. The kept history is
    therefore a contiguous run of the most recent turns.

    Args:
        messages: Chat messages, each a dict with a ``"role"`` key.
        max_tokens: Token budget for the returned messages.
        token_counter: Callable mapping a list of messages to a token count.
            Defaults to ``count_tokens``.

    Returns:
        A new list containing the kept messages in their original order.
    """
    counter = count_tokens if token_counter is None else token_counter

    # Indices that are always kept. Indices rather than dict equality are used
    # so that a repeated, identical user message is not mistaken for the latest one.
    pinned = {i for i, msg in enumerate(messages) if msg.get("role") == "system"}
    latest_user = next(
        (i for i in reversed(range(len(messages))) if messages[i].get("role") == "user"),
        None,
    )
    if latest_user is not None:
        pinned.add(latest_user)

    kept = set(pinned)
    total_tokens = counter([messages[i] for i in sorted(pinned)])
    # Fill the remaining budget with the most recent messages first.
    for i in reversed(range(len(messages))):
        if i in pinned:
            continue
        message_tokens = counter([messages[i]])
        if total_tokens + message_tokens > max_tokens:
            break
        kept.add(i)
        total_tokens += message_tokens

    # Rebuild in original order so the conversation is never reordered.
    return [msg for i, msg in enumerate(messages) if i in kept]
def get_dynamic_threshold(scores, multiplier=0.8):
    """Return ``multiplier`` times the mean of *scores*, or 0 when *scores* is empty.

    Args:
        scores: Sized sequence of numeric scores.
        multiplier: Fraction of the mean used as the cut-off (default 0.8).

    Returns:
        The threshold value, or 0 for an empty input.
    """
    if not scores:
        return 0
    mean_score = sum(scores) / len(scores)
    return mean_score * multiplier
async def _synthesize_speech(text, timeout=120):
    """Convert *text* to speech with the ElevenLabs text-to-speech API.

    Args:
        text: The text to speak.
        timeout: Seconds ``requests`` waits on the ElevenLabs server.

    Returns:
        Path of a temporary ``.mp3`` file holding the audio (not deleted
        automatically), or ``None`` if the request failed. Failures are printed
        rather than raised so a TTS outage never hides the text answer.
    """
    headers = {
        "Accept": "audio/mpeg",
        "xi-api-key": elevenlabs_api_key,
        "Content-Type": "application/json",
    }
    data = {
        "text": text,
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.75,
        },
    }
    tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
    loop = asyncio.get_running_loop()
    try:
        # requests is synchronous; run it in the default thread pool so the
        # event loop keeps serving other requests while audio is generated.
        tts_response = await loop.run_in_executor(
            None,
            lambda: requests.post(tts_url, json=data, headers=headers, timeout=timeout),
        )
    except requests.RequestException as err:
        print(f"Error in ElevenLabs API: {err}")
        return None
    if tts_response.status_code != 200:
        print(f"Error in ElevenLabs API: {tts_response.status_code}, {tts_response.text}")
        return None
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio_file:
        temp_audio_file.write(tts_response.content)
    return temp_audio_file.name


async def chatbot_response(user_input, history):
    """Answer *user_input* in Ryan Fernando's persona, grounded in video transcripts.

    The function retrieves the transcript chunks most relevant to the question
    from ``vector_store``. It sends them, together with the module-level
    ``conversation_history``, to the OpenAI chat API. It then tries to voice
    the answer with ElevenLabs. Finally it records the exchange in
    ``conversation_history``.

    Args:
        user_input: The user's latest message.
        history: Chat history supplied by the UI. It is currently unused;
            ``conversation_history`` is used instead.

    Returns:
        A ``(text, audio_path)`` tuple. ``text`` is the model's answer plus a
        link to the most relevant video moment, or a "no relevant video" note.
        ``audio_path`` is a temporary MP3 file path, or ``None`` if speech
        synthesis failed.
    """
    global conversation_history
    max_context_length = 16000  # Leave some room for the response
    system_message = {"role": "system", "content": "You are Ryan Fernando, a celebrity nutritionist and celebrity himself with more than a million followers on social media. You are answering questions based on the transcription of your YouTube videos. You are also a helpful assistant."}
    user_message = {"role": "user", "content": user_input}

    # similarity_search_with_score on FAISS returns L2 distances, where LOWER
    # means more similar. The relevance-score variant maps them onto a
    # higher-is-better scale, which the threshold test and max() below rely on.
    relevant_docs = vector_store.similarity_search_with_relevance_scores(user_input, k=3)
    threshold = get_dynamic_threshold([score for _, score in relevant_docs])

    # Turn each sufficiently relevant chunk into a context message. Stop before
    # the prompt would exceed the context budget.
    base_tokens = count_tokens([system_message, user_message])
    context_messages = []
    used_docs = []
    for doc, score in relevant_docs:
        if score <= threshold:
            continue
        relevant_text = doc.page_content
        start_time = doc.metadata.get("start_time", 0)
        context_message = {
            "role": "system",
            "content": f"Consider this relevant information (relevance score: {score:.2f}, timestamp: {start_time}s): {relevant_text}",
        }
        if base_tokens + count_tokens(context_messages + [context_message]) > max_context_length:
            break
        context_messages.append(context_message)
        used_docs.append((doc, score))
    if used_docs:
        context_messages.append({"role": "system", "content": f"Based on the above information, answer the user's question: {user_input}"})

    # Trim only the persona, history and question. The retrieved context is
    # appended afterwards, so truncation can never discard it.
    history_budget = max_context_length - count_tokens(context_messages)
    messages = truncate_messages(
        [system_message] + conversation_history + [user_message], history_budget
    ) + context_messages

    completion = await client.chat.completions.create(model="gpt-3.5-turbo", messages=messages)
    answer = completion.choices[0].message.content

    if used_docs:
        # Higher relevance score = more relevant chunk.
        most_relevant_doc, _ = max(used_docs, key=lambda pair: pair[1])
        timestamp_link = most_relevant_doc.metadata.get("timestamp_link", "Link not found")
        answer_with_link_and_description = (
            f"{answer}\n\n"
            f"Watch the most relevant part of the video here: {timestamp_link}"
        )
    else:
        answer_with_link_and_description = f"{answer}\n\nNo relevant video found for this query."

    # Only the plain answer is spoken, not the appended link text.
    audio_file_path = await _synthesize_speech(answer)

    conversation_history.append(user_message)
    conversation_history.append({"role": "assistant", "content": answer_with_link_and_description})
    conversation_history = truncate_messages(conversation_history, max_context_length // 2)  # Use half the max length for history
    return answer_with_link_and_description, audio_file_path
async def main_chatbot(message, history):
    """Answer *message* and append the exchange to *history* in messages format.

    Args:
        message: The user's message.
        history: List of ``{"role": ..., "content": ...}`` dicts; mutated in place.

    Returns:
        ``(history, audio_path)``. ``audio_path`` is the reply's MP3 path
        from ``chatbot_response``, or ``None``.
    """
    print(f"Main chatbot function called. Message: {message}")
    response, audio_path = await chatbot_response(message, history)
    # Gradio's "messages" chat format uses the roles "user" and "assistant".
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": response})
    return history, audio_path
with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox()
    submit = gr.Button("Submit")
    clear = gr.Button("Clear")
    audio_output = gr.Audio(label="Assistant's Voice")

    async def user(user_message, history):
        """Add *user_message* as an unanswered ``[message, None]`` pair and clear the textbox."""
        print(f"User function called. Message: {user_message}")
        return "", history + [[user_message, None]]

    async def bot(history):
        """Answer the last pair in *history*; return the history and the reply's audio path."""
        print(f"Bot function called. History length: {len(history)}")
        if not history:
            print("History is empty")
            return history, None
        user_message = history[-1][0]
        print(f"User message: {user_message}")
        bot_response, audio_path = await chatbot_response(user_message, history[:-1])
        history[-1][1] = bot_response
        return history, audio_path

    def clear_chat():
        """Reset the visible chat, the voice output and the model-side memory."""
        # chatbot_response keeps its own module-level conversation_history.
        # Without clearing it, the model would still "remember" the cleared chat.
        conversation_history.clear()
        return None, None

    # Both the button and pressing Enter run the same two-step chain: echo the
    # user's message immediately, then fetch the bot reply and audio.
    submit.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, [chatbot, audio_output]
    )
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, [chatbot, audio_output]
    )
    clear.click(clear_chat, None, [chatbot, audio_output], queue=False)

# Launch only when run as a script, so importing this module (e.g. Gradio
# reload mode or tests) does not start and block on a server.
if __name__ == "__main__":
    demo.launch()
# Number of documents embedded and added to the FAISS index per batch by
# process_documents_in_chunks. Adjust as needed.
CHUNK_SIZE = 10_000
def save_faiss_index(index, filepath):
    """Serialize the FAISS *index* to *filepath* using ``faiss.write_index``."""
    faiss.write_index(index, filepath)
def load_faiss_index(filepath):
    """Read a FAISS index from *filepath* with ``faiss.read_index`` and return it."""
    loaded_index = faiss.read_index(filepath)
    return loaded_index
def process_documents_in_chunks(documents, embeddings, index_filepath, chunk_size=None):
    """Embed *documents* in batches into a FAISS ``IndexFlatL2`` stored at *index_filepath*.

    The index is saved after every batch. If an index already exists at
    *index_filepath*, it is loaded and its first ``index.ntotal`` documents are
    treated as already embedded. Embedding then resumes after them instead of
    adding duplicate vectors. Vectors are added in document order, so vector
    *i* corresponds to ``documents[i]``, provided the same list is passed on
    every run. Metadata is not stored.

    Args:
        documents: Sliceable sequence of objects with a ``page_content`` attribute.
        embeddings: Object providing ``embed_query`` and ``embed_documents``.
        index_filepath: Path the FAISS index is read from and written to.
        chunk_size: Documents per batch; defaults to ``CHUNK_SIZE``.

    Returns:
        The updated FAISS index.

    Raises:
        ValueError: If *chunk_size* is not positive.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    if os.path.exists(index_filepath):
        index = load_faiss_index(index_filepath)
        print("Loaded existing FAISS index.")
    else:
        # Probe the embedding model once to learn the vector dimensionality.
        embedding_size = len(embeddings.embed_query("test"))
        index = faiss.IndexFlatL2(embedding_size)
        print("Created new FAISS index.")

    # Skip documents a previous (possibly interrupted) run already indexed.
    already_indexed = index.ntotal
    if already_indexed:
        print(f"Index already holds {already_indexed} vectors; resuming from document {already_indexed}.")
    remaining = documents[already_indexed:]

    num_chunks = -(-len(remaining) // chunk_size)  # ceiling division
    for chunk_number, offset in enumerate(range(0, len(remaining), chunk_size), start=1):
        chunk_docs = remaining[offset:offset + chunk_size]
        texts = [doc.page_content for doc in chunk_docs]
        # FAISS requires float32 vectors.
        embedding_array = np.asarray(embeddings.embed_documents(texts), dtype="float32")
        index.add(embedding_array)
        # Checkpoint after every batch so an interrupted run can resume.
        save_faiss_index(index, index_filepath)
        print(f"Processed chunk {chunk_number}/{num_chunks} and updated FAISS index.")
    return index
def random_response(message, history):
    """Append *message* and a random "Yes"/"No" reply to *history* and return it.

    No external API is called. *history* is mutated in place and holds
    ``{"role": ..., "content": ...}`` dicts.
    """
    reply = random.choice(["Yes", "No"])
    # Gradio's "messages" chat format uses the roles "user" and "assistant".
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": reply})
    return history