# voice_verse/src/streamlit_app.py
# (HuggingFace Spaces page residue removed: "Update src/streamlit_app.py",
#  commit 0b3f7a0, author Kumaria — kept here as a comment so the file parses.)
import os
import logging
import traceback
import streamlit as st
from dotenv import load_dotenv
import numpy as np
import requests
import asyncio
import edge_tts
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.embeddings.base import Embeddings
from langchain_core.documents import Document
from huggingface_hub import InferenceClient
import io
from PyPDF2 import PdfReader
from docx import Document as DocxDocument
# Load environment variables if .env file exists
load_dotenv()
# Configure logging: records go both to app.log and to stderr so local runs
# and hosted (HF Spaces) logs capture the same output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function below.
logger = logging.getLogger(__name__)
# Must be the first Streamlit call executed in the script.
st.set_page_config(page_title="Voice Verse", layout="wide")
class HuggingFaceAPIEmbeddings(Embeddings):
    """Custom embeddings class using HuggingFace Hub InferenceClient.

    Implements the LangChain ``Embeddings`` interface by delegating each
    text to the hosted feature-extraction endpoint for ``model_name``.
    """

    def __init__(self, api_key: str, model_name: str):
        self.client = InferenceClient(token=api_key)
        self.model_name = model_name

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed a list of documents.

        Returns one embedding vector (list of floats) per input text.
        Surfaces any API error in the UI, then re-raises it.
        """
        embeddings = []
        for text in texts:
            try:
                result = self.client.feature_extraction(text, model=self.model_name)
                if isinstance(result, np.ndarray):
                    embeddings.append(result.tolist())
                else:
                    # BUG FIX: was `embedments.append(result)` — a NameError
                    # typo that crashed whenever the API returned a plain list.
                    embeddings.append(result)
            except Exception as e:
                st.error(f"Embedding error for text: {text[:50]}... | Error: {e}")
                raise
        return embeddings

    def embed_query(self, text: str) -> list[float]:
        """Embed a single query."""
        return self.embed_documents([text])[0]
# Page heading shown above all tabs.
st.title("πŸŽ™οΈ Voice Verse")
def process_uploaded_files(uploaded_files, _hf_token, _embedding_model, _chunk_size):
    """Process uploaded files and create vector store.

    Args:
        uploaded_files: Streamlit UploadedFile objects (pdf, docx or txt).
        _hf_token: HuggingFace API token used for the embedding calls.
        _embedding_model: Model id passed to the feature-extraction endpoint.
        _chunk_size: Character chunk size for the recursive text splitter.

    Returns:
        Tuple ``(vectorstore, num_docs, num_chunks)``; ``(None, 0, 0)`` when
        no supported file was uploaded.
    """
    all_docs = []
    for uploaded_file in uploaded_files:
        file_extension = uploaded_file.name.split(".")[-1].lower()
        if file_extension == "pdf":
            pdf_reader = PdfReader(uploaded_file)
            text = ""
            for page in pdf_reader.pages:
                # BUG FIX: extract_text() can yield None/empty for image-only
                # pages; `text += None` previously raised a TypeError.
                text += page.extract_text() or ""
            all_docs.append(Document(page_content=text, metadata={"source": uploaded_file.name}))
        elif file_extension == "docx":
            doc = DocxDocument(uploaded_file)
            text = "\n".join([para.text for para in doc.paragraphs])
            all_docs.append(Document(page_content=text, metadata={"source": uploaded_file.name}))
        elif file_extension == "txt":
            text = uploaded_file.read().decode("utf-8")
            all_docs.append(Document(page_content=text, metadata={"source": uploaded_file.name}))
        # Other extensions are silently ignored (the uploader restricts types).
    if not all_docs:
        return None, 0, 0
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=_chunk_size,
        chunk_overlap=200,
        separators=["\n\n", "\n", ". ", " ", ""]
    )
    chunks = text_splitter.split_documents(all_docs)
    embeddings = HuggingFaceAPIEmbeddings(
        api_key=_hf_token,
        model_name=_embedding_model
    )
    vectorstore = FAISS.from_documents(
        documents=chunks,
        embedding=embeddings
    )
    return vectorstore, len(all_docs), len(chunks)
def generate_answer(query: str, context: str, token: str, model: str) -> str:
    """Use HuggingFace Inference API to generate an answer.

    Sends a grounded system+user chat to ``model`` and returns the reply
    text, or a ⚠️-prefixed diagnostic string on any failure.
    """
    client = InferenceClient(token=token)
    system_message = "You are a helpful AI assistant. Answer questions based ONLY on the provided context. If the answer is not in the context, say 'I cannot find this information in the provided documents'."
    user_message = f"Context:\n{context}\n\nQuestion: {query}"
    try:
        response = client.chat_completion(
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_message},
            ],
            model=model,
            max_tokens=512,
            temperature=0.2,
            top_p=0.9,
        )
        choices = getattr(response, 'choices', None)
        if not choices:
            return "⚠️ Unexpected response format"
        answer = choices[0].message.content.strip()
        return answer if answer else "⚠️ Model returned empty response"
    except Exception as e:
        lowered = str(e).lower()
        full_error = traceback.format_exc()
        logger.error(f"Generate answer error: {full_error}")
        # Map well-known API failure signatures to actionable user messages.
        hints = (
            (("503", "loading"), "⚠️ Model is currently loading. Please wait 20-30 seconds and try again."),
            (("401", "unauthorized"), "⚠️ Authentication failed. Please check your HuggingFace token."),
            (("403", "forbidden"), "⚠️ Access forbidden. Make sure 'Make calls to Inference Providers' is enabled."),
            (("timeout",), "⚠️ Request timed out. Please try again."),
            (("not supported",), "⚠️ This model doesn't support chat completion. Try selecting a different model from the sidebar."),
        )
        for needles, hint in hints:
            if any(needle in lowered for needle in needles):
                return f"{hint} [Details: {str(e)}]"
        return f"⚠️ Error: {str(e)}\n\n{full_error}"
def generate_creative_content(prompt_type, context, token, model):
    """Generate specialized creative scripts based on context.

    ``prompt_type`` selects one of the studio formats below; anything
    unrecognized falls back to the generic "Script" treatment.
    """
    # One full LLM prompt per studio button (f-strings keep any braces in
    # the document context literal).
    prompt_by_type = {
        "Script": f"Explain the contents of the document in form of an interesting story:\n{context}",
        "Podcast": f"Generate a 2-person Management case study podcast dialogue (Pranjal and Harnur) discussing this content:\n{context}",
        "Story": f"Turn this content into a compelling narrative story:\n{context}",
        "Debate": f"Generate a 2 person debate script (Pranjal and Harnur) with a Pro and Con perspective on the following content:\n{context}",
        "Song": f"Write rap song lyrics based on this content:\n{context}",
    }
    selected_prompt = prompt_by_type.get(prompt_type, prompt_by_type["Script"])
    return generate_answer(selected_prompt, context, token, model)
async def generate_voice_edge_async(text: str, voice: str) -> bytes | None:
    """Internal async function to generate voice using edge-tts.

    Collects the audio packets from the edge-tts stream and returns them
    joined as raw MP3 bytes, or None when synthesis fails.
    """
    try:
        communicate = edge_tts.Communicate(text, voice)
        pieces: list[bytes] = []
        async for packet in communicate.stream():
            # Stream also emits non-audio metadata packets; keep audio only.
            if packet["type"] == "audio":
                pieces.append(packet["data"])
        return b"".join(pieces)
    except Exception as e:
        logger.error(f"edge-tts error for voice {voice}: {e}")
        return None
def generate_voice_edge(text: str, voice: str = "en-US-AndrewNeural") -> bytes | None:
    """Synchronous wrapper around the async edge-tts generator.

    Returns MP3 bytes, or None when synthesis fails for any reason.
    """
    try:
        audio = asyncio.run(generate_voice_edge_async(text, voice))
    except Exception as e:
        logger.error(f"Failed to run edge-tts wrapper: {e}")
        return None
    return audio
def parse_podcast_script(script: str) -> list[dict]:
    """Parse dialogue labels like 'Pranjal:' and 'Harnur:' into fragments.

    Returns a list of ``{"speaker": str, "text": str}`` segments in script
    order. Unlabeled lines are appended to the current speaker's text;
    the default speaker (before any label appears) is "Pranjal".
    """
    segments = []
    lines = script.split("\n")
    current_speaker = "Pranjal"
    current_text = ""
    for line in lines:
        line = line.strip()
        if not line: continue
        # Check for speaker labels
        if "Pranjal" in line:
            if current_text:
                segments.append({"speaker": current_speaker, "text": current_text.strip()})
            current_speaker = "Pranjal"
            # BUG FIX: was replace("pranjal", ...) which never matched the
            # capitalized label; also drop the leftover "name:" separator so
            # the TTS engine does not read a stray colon aloud.
            current_text = line.replace("Pranjal", "").lstrip(" :").strip()
        elif "Harnur" in line:
            if current_text:
                segments.append({"speaker": current_speaker, "text": current_text.strip()})
            current_speaker = "Harnur"
            # Same colon-stripping as above (the generic branch already did this).
            current_text = line.replace("Harnur", "").lstrip(" :").strip()
        elif ":" in line[:15] and not line.startswith("http"):  # Generic speaker detection
            if current_text:
                segments.append({"speaker": current_speaker, "text": current_text.strip()})
            parts = line.split(":", 1)
            current_speaker = parts[0].strip()
            current_text = parts[1].strip()
        else:
            current_text += " " + line
    # Flush the trailing segment.
    if current_text:
        segments.append({"speaker": current_speaker, "text": current_text.strip()})
    return segments
def generate_multi_voice_podcast(script: str) -> bytes | None:
    """
    Generate a multi-voice podcast by concatenating raw MP3 bytes.
    No FFmpeg or pydub required β€” works on Windows and HuggingFace Spaces.
    """
    segments = parse_podcast_script(script)
    if not segments:
        return None
    # Deduplicate speakers while preserving first-appearance order.
    unique_speakers = list(dict.fromkeys(seg["speaker"] for seg in segments))
    available_voices = [
        "en-US-AndrewNeural",
        "en-US-EmmaNeural",
        "en-GB-SoniaNeural",
        "en-GB-RyanNeural"
    ]
    # Round-robin voice assignment covers more speakers than voices.
    speaker_voice_map = {}
    for idx, speaker in enumerate(unique_speakers):
        speaker_voice_map[speaker] = available_voices[idx % len(available_voices)]
    progress_bar = st.progress(0, text="Generating voices...")
    all_audio_chunks = []
    total = len(segments)
    for idx, seg in enumerate(segments):
        voice = speaker_voice_map.get(seg["speaker"], "en-US-AndrewNeural")
        logger.info(f"Segment {idx+1}/{total} | Speaker: {seg['speaker']} | Voice: {voice}")
        progress_bar.progress(
            (idx + 1) / total,
            text=f"Generating voice for {seg['speaker']}..."
        )
        chunk_bytes = generate_voice_edge(seg["text"], voice)
        # Failed segments are skipped rather than aborting the whole podcast.
        if chunk_bytes:
            all_audio_chunks.append(chunk_bytes)
    progress_bar.empty()
    if all_audio_chunks:
        # Concatenate raw MP3 bytes β€” no FFmpeg needed
        return b"".join(all_audio_chunks)
    return None
def generate_voice(text: str, token: str = None) -> bytes | None:
    """
    Generate audio using gTTS (Google Text-to-Speech).
    Free, no API key required, works on Windows and HuggingFace Spaces.
    Returns MP3 bytes, or None on failure.

    ``token`` is accepted for signature compatibility with callers but is
    unused β€” gTTS needs no API key.
    """
    try:
        from gtts import gTTS
    except ImportError:
        st.error("⚠️ gTTS not installed. Run: pip install gtts")
        logger.error("gTTS not installed.")
        return None
    # Strip markdown symbols so they aren't read aloud ("**" before "*"
    # so bold markers are removed cleanly).
    clean_text = text
    for markdown_token in ("**", "*", "#", "`", "---"):
        clean_text = clean_text.replace(markdown_token, "")
    clean_text = clean_text.strip()[:2000]  # Cap length for reasonable audio duration
    if not clean_text:
        st.warning("⚠️ No text to convert to speech.")
        return None
    logger.info(f"Generating TTS via gTTS | snippet: {clean_text[:60]}...")
    try:
        buffer = io.BytesIO()
        gTTS(text=clean_text, lang="en", slow=False).write_to_fp(buffer)
        audio_bytes = buffer.getvalue()
        logger.info(f"gTTS success. Bytes: {len(audio_bytes)}")
        return audio_bytes
    except Exception as e:
        full_error = traceback.format_exc()
        logger.error(f"gTTS error: {full_error}")
        st.error(f"⚠️ TTS generation failed: {str(e)}")
        with st.expander("Show Traceback"):
            st.code(full_error)
        return None
# Sidebar
with st.sidebar:
    st.header("Configuration")
    # Token comes from the environment (e.g. an HF Spaces secret), not a
    # sidebar input widget.
    hf_token = os.getenv("HF_TOKEN", "")
    embedding_model = st.selectbox(
        "Embedding Model",
        [
            "sentence-transformers/all-MiniLM-L6-v2",
            "BAAI/bge-small-en-v1.5",
            "sentence-transformers/all-mpnet-base-v2"
        ],
        help="Lightweight models that run on HuggingFace's servers"
    )
    llm_model = st.selectbox(
        "LLM Model",
        [
            "meta-llama/Llama-3.2-3B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "HuggingFaceH4/zephyr-7b-beta",
            "microsoft/Phi-3-mini-4k-instruct",
            "google/gemma-2-2b-it"
        ],
        help="Language model for generating answers (chat-optimized models)"
    )
    # Retrieval tuning knobs used by the splitter and retriever below.
    chunk_size = st.slider("Chunk Size", 500, 2000, 1000, 100)
    num_results = st.slider("Number of Retrieved Documents", 1, 5, 3)
    st.markdown("---")
    st.markdown("### πŸ“‹ Setup Instructions")
    # NOTE(review): step 4 says "paste token above" but the token is read
    # from the HF_TOKEN env var, not a text input β€” confirm intended UX.
    st.markdown(
        "1. Go to [HuggingFace](https://huggingface.co/settings/tokens)\n"
        "2. Create **Fine-grained** token\n"
        "3. βœ… Enable **'Make calls to Inference Providers'**\n"
        "4. Copy and paste token above"
    )
# Initialize session state for chat history
if "messages" not in st.session_state:
    st.session_state.messages = []
# Main Application Logic
# Without a token every Inference API call would fail, so stop the script early.
if not hf_token:
    st.warning("⚠️ Please enter your HuggingFace token in the sidebar.")
    st.stop()
# --- Document Upload Section ---
# Expanded by default only until a vector store exists in the session.
with st.expander("πŸ“€ Upload Documents", expanded=not st.session_state.get("vector_store")):
    uploaded_files = st.file_uploader(
        "Upload Documents",
        type=["pdf", "docx", "txt"],
        accept_multiple_files=True
    )
    if st.button("πŸ”„ Upload the Data"):
        if not uploaded_files:
            st.error("Please upload some files first!")
        else:
            with st.spinner("Processing documents..."):
                vector_store, num_docs, num_chunks = process_uploaded_files(
                    uploaded_files, hf_token, embedding_model, chunk_size
                )
                if vector_store:
                    # Persist the index in session state so later reruns keep it.
                    st.session_state.vector_store = vector_store
                    st.session_state.num_docs = num_docs
                    st.session_state.num_chunks = num_chunks
                    st.success(f"βœ… Processed {num_docs} documents.")
                    # --- AUTO GENERATE SUMMARY & AUDIO ---
                    with st.spinner("Generating summary and audio..."):
                        try:
                            retriever = vector_store.as_retriever(search_kwargs={"k": num_results})
                            relevant_docs = retriever.invoke("Summarize the main points of the document.")
                            context = "\n\n".join([doc.page_content for doc in relevant_docs])
                            summary = generate_answer("You are a professor at an MBA College, and are tasked with summarising and explaining the concepts manegarially in detail ", context, hf_token, llm_model)
                            st.session_state.last_summary = summary
                            audio = generate_voice(summary, hf_token)
                            if audio:
                                st.session_state.last_summary_audio = audio
                        except Exception as e:
                            # Best-effort: auto-summary failure must not undo the upload.
                            st.warning(f"Auto-summary failed: {e}")
                else:
                    st.error("Failed to process documents.")
# Initialize tabs
tab1, tab2, tab3 = st.tabs(["πŸ“„ Summary", "πŸ’¬ Chatbot", "🎧 Voice Studio"])
# The whole tab UI requires a processed vector store in session state.
if "vector_store" in st.session_state:
    vector_store = st.session_state.vector_store
    retriever = vector_store.as_retriever(search_kwargs={"k": num_results})
    with tab1:
        st.header("Document Summary")
        if st.button("Re-generate Summary"):
            with st.spinner("Summarizing..."):
                relevant_docs = retriever.invoke("Summarize the main points of the document.")
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                summary = generate_answer("You are a professor at an MBA College, and are tasked with summarising and explaining the concepts manegarially in detail ", context, hf_token, llm_model)
                st.session_state.last_summary = summary
                # Update audio too
                with st.spinner("Updating audio..."):
                    audio = generate_voice(summary, hf_token)
                    if audio:
                        st.session_state.last_summary_audio = audio
        # Display summary if available in session state
        if "last_summary" in st.session_state:
            st.markdown(st.session_state.last_summary)
            st.markdown("---")
            if "last_summary_audio" in st.session_state:
                st.audio(st.session_state.last_summary_audio, format="audio/mpeg")
            if st.button("πŸ”Š Force Re-generate Audio"):
                with st.spinner("Generating audio for summary..."):
                    audio = generate_voice(st.session_state.last_summary, hf_token)
                    if audio:
                        st.session_state.last_summary_audio = audio
                        st.success("βœ… Audio ready!")
                        st.audio(audio, format="audio/mpeg")
                    else:
                        st.error("❌ Could not generate audio. Check your token and try again.")
    with tab2:
        st.header("Chat with Documents")
        # Replay the stored conversation on every rerun.
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])
        if user_input := st.chat_input("Ask something about your documents..."):
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)
            with st.chat_message("assistant"):
                relevant_docs = retriever.invoke(user_input)
                if relevant_docs:
                    context = "\n\n".join([doc.page_content for doc in relevant_docs])
                    response = generate_answer(user_input, context, hf_token, llm_model)
                    st.markdown(response)
                else:
                    response = "❌ No relevant context found."
                    st.markdown(response)
            st.session_state.messages.append({"role": "assistant", "content": response})
    with tab3:
        st.header("Voice Studio")
        st.info("Generate creative scripts and listen to them!")
        col1, col2, col3, col4 = st.columns(4)
        task = None
        if col1.button("πŸ“œ Script"): task = "Script"
        if col2.button("πŸŽ™οΈ Podcast"): task = "Podcast"
        if col3.button("πŸ“– Story"): task = "Story"
        if col4.button("βš–οΈ Debate"): task = "Debate"
        # if col5.button("🎡 Song"): task = "Song"
        if task:
            with st.spinner(f"Generating {task}..."):
                rel_docs = retriever.invoke(f"Key information for {task}")
                context = "\n\n".join([doc.page_content for doc in rel_docs])
                content = generate_creative_content(task, context, hf_token, llm_model)
                st.subheader(f"Generated {task}")
                st.markdown(content)
                with st.spinner("Generating Audio..."):
                    # Dialogue formats get one voice per speaker; the rest a single narrator.
                    if task == "Podcast" or task == "Debate":
                        audio = generate_multi_voice_podcast(content)
                    else:
                        audio = generate_voice_edge(content)
                    if audio:
                        st.audio(audio, format="audio/mp3")
                    else:
                        st.error("❌ Audio generation failed.")
else:
    st.info("Please upload and process documents in the sidebar to get started.")
# Footer
st.markdown("---")