Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import traceback | |
| import streamlit as st | |
| from dotenv import load_dotenv | |
| import numpy as np | |
| import requests | |
| import asyncio | |
| import edge_tts | |
| from langchain_community.document_loaders import DirectoryLoader, TextLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.embeddings.base import Embeddings | |
| from langchain_core.documents import Document | |
| from huggingface_hub import InferenceClient | |
| import io | |
| from PyPDF2 import PdfReader | |
| from docx import Document as DocxDocument | |
# Load environment variables if .env file exists
load_dotenv()
# Configure logging
# Logs go both to app.log and to the console (StreamHandler).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# NOTE: must be the first Streamlit call in the script.
st.set_page_config(page_title="Voice Verse", layout="wide")
class HuggingFaceAPIEmbeddings(Embeddings):
    """Custom embeddings class using HuggingFace Hub InferenceClient.

    Calls the hosted feature-extraction endpoint per text, so no local model
    weights are required.
    """

    def __init__(self, api_key: str, model_name: str):
        # InferenceClient handles auth and routing to the hosted endpoint.
        self.client = InferenceClient(token=api_key)
        self.model_name = model_name

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed a list of documents, returning one vector per input text.

        Raises: re-raises any API error after surfacing it via st.error.
        """
        embeddings: list[list[float]] = []
        for text in texts:
            try:
                result = self.client.feature_extraction(text, model=self.model_name)
                if isinstance(result, np.ndarray):
                    embeddings.append(result.tolist())
                else:
                    # BUG FIX: was `embedments.append(result)` — a NameError
                    # typo that crashed whenever the API returned a plain list.
                    embeddings.append(result)
            except Exception as e:
                st.error(f"Embedding error for text: {text[:50]}... | Error: {e}")
                raise
        return embeddings

    def embed_query(self, text: str) -> list[float]:
        """Embed a single query by delegating to embed_documents."""
        return self.embed_documents([text])[0]
# Page title rendered at the top of the app.
st.title("ποΈ Voice Verse")
def process_uploaded_files(uploaded_files, _hf_token, _embedding_model, _chunk_size):
    """Process uploaded files and create vector store.

    Args:
        uploaded_files: Streamlit UploadedFile objects (pdf / docx / txt).
        _hf_token: HuggingFace API token used for the remote embedding calls.
        _embedding_model: Embedding model id on the HuggingFace Hub.
        _chunk_size: Character chunk size for the text splitter.

    Returns:
        Tuple of (FAISS vectorstore, number of documents, number of chunks),
        or (None, 0, 0) when no readable text was extracted.
    """
    all_docs = []
    for uploaded_file in uploaded_files:
        file_extension = uploaded_file.name.split(".")[-1].lower()
        text = ""
        if file_extension == "pdf":
            pdf_reader = PdfReader(uploaded_file)
            # BUG FIX: extract_text() may return None for image-only pages,
            # which crashed the original `text += page.extract_text()`.
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        elif file_extension == "docx":
            doc = DocxDocument(uploaded_file)
            text = "\n".join(para.text for para in doc.paragraphs)
        elif file_extension == "txt":
            # Replace undecodable bytes instead of crashing on bad UTF-8.
            text = uploaded_file.read().decode("utf-8", errors="replace")
        else:
            # Unsupported extension: skip silently, as before.
            continue
        # Skip files that yielded no text — embedding empty docs wastes
        # API calls and can error downstream.
        if text.strip():
            all_docs.append(Document(page_content=text, metadata={"source": uploaded_file.name}))
    if not all_docs:
        return None, 0, 0
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=_chunk_size,
        chunk_overlap=200,
        separators=["\n\n", "\n", ". ", " ", ""]
    )
    chunks = text_splitter.split_documents(all_docs)
    embeddings = HuggingFaceAPIEmbeddings(
        api_key=_hf_token,
        model_name=_embedding_model
    )
    vectorstore = FAISS.from_documents(
        documents=chunks,
        embedding=embeddings
    )
    return vectorstore, len(all_docs), len(chunks)
def generate_answer(query: str, context: str, token: str, model: str) -> str:
    """Use HuggingFace Inference API to generate an answer.

    Builds a system+user chat from the query and retrieved context, calls
    chat_completion, and maps known failure modes to user-facing hints.
    """
    client = InferenceClient(token=token)
    system_message = "You are a helpful AI assistant. Answer questions based ONLY on the provided context. If the answer is not in the context, say 'I cannot find this information in the provided documents'."
    user_message = f"Context:\n{context}\n\nQuestion: {query}"
    try:
        reply = client.chat_completion(
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_message},
            ],
            model=model,
            max_tokens=512,
            temperature=0.2,
            top_p=0.9,
        )
        if hasattr(reply, 'choices') and len(reply.choices) > 0:
            text_out = reply.choices[0].message.content.strip()
            if text_out:
                return text_out
            return "β οΈ Model returned empty response"
        return "β οΈ Unexpected response format"
    except Exception as exc:
        lowered = str(exc).lower()
        trace = traceback.format_exc()
        logger.error(f"Generate answer error: {trace}")
        # Known failure signatures mapped to actionable hints.
        hints = [
            (("503", "loading"), "β οΈ Model is currently loading. Please wait 20-30 seconds and try again."),
            (("401", "unauthorized"), "β οΈ Authentication failed. Please check your HuggingFace token."),
            (("403", "forbidden"), "β οΈ Access forbidden. Make sure 'Make calls to Inference Providers' is enabled."),
            (("timeout",), "β οΈ Request timed out. Please try again."),
            (("not supported",), "β οΈ This model doesn't support chat completion. Try selecting a different model from the sidebar."),
        ]
        for needles, hint in hints:
            if any(needle in lowered for needle in needles):
                return f"{hint} [Details: {str(exc)}]"
        return f"β οΈ Error: {str(exc)}\n\n{trace}"
def generate_creative_content(prompt_type, context, token, model):
    """Generate specialized creative scripts based on context.

    Unknown prompt types fall back to the "Script" template.
    """
    templates = {
        "Script": "Explain the contents of the document in form of an interesting story:",
        "Podcast": "Generate a 2-person Management case study podcast dialogue (Pranjal and Harnur) discussing this content:",
        "Story": "Turn this content into a compelling narrative story:",
        "Debate": "Generate a 2 person debate script (Pranjal and Harnur) with a Pro and Con perspective on the following content:",
        "Song": "Write rap song lyrics based on this content:",
    }
    lead = templates.get(prompt_type, templates["Script"])
    return generate_answer(f"{lead}\n{context}", context, token, model)
async def generate_voice_edge_async(text: str, voice: str) -> bytes | None:
    """Stream TTS audio from edge-tts and return the concatenated MP3 bytes.

    Returns None when the edge-tts stream fails for the given voice.
    """
    try:
        stream = edge_tts.Communicate(text, voice).stream()
        parts: list[bytes] = []
        async for item in stream:
            # Non-audio chunks (word boundaries etc.) are ignored.
            if item["type"] == "audio":
                parts.append(item["data"])
        return b"".join(parts)
    except Exception as e:
        logger.error(f"edge-tts error for voice {voice}: {e}")
        return None
def generate_voice_edge(text: str, voice: str = "en-US-AndrewNeural") -> bytes | None:
    """Synchronous wrapper that drives the async edge-tts generator.

    Returns MP3 bytes, or None when the event loop or TTS call fails.
    """
    try:
        audio = asyncio.run(generate_voice_edge_async(text, voice))
    except Exception as e:
        logger.error(f"Failed to run edge-tts wrapper: {e}")
        return None
    return audio
def parse_podcast_script(script: str) -> list[dict]:
    """Parse dialogue labels like 'Pranjal:' and 'Harnur:' into fragments.

    Args:
        script: Multi-line dialogue text; lines without a speaker label are
            appended to the current speaker's running text.

    Returns:
        List of {"speaker": str, "text": str} segments in script order.
    """
    segments = []
    lines = script.split("\n")
    current_speaker = "Pranjal"
    current_text = ""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # Check for speaker labels
        if "Pranjal" in line:
            if current_text:
                segments.append({"speaker": current_speaker, "text": current_text.strip()})
            current_speaker = "Pranjal"
            # BUG FIX: original replaced lowercase "pranjal", which never
            # matched, so the label stayed in the spoken text. Also strip the
            # leftover ':' so TTS does not read it aloud.
            current_text = line.replace("Pranjal", "").lstrip(":").strip()
        elif "Harnur" in line:
            if current_text:
                segments.append({"speaker": current_speaker, "text": current_text.strip()})
            current_speaker = "Harnur"
            # Strip the leftover ':' after removing the name (same fix).
            current_text = line.replace("Harnur", "").lstrip(":").strip()
        elif ":" in line[:15] and not line.startswith("http"):  # Generic speaker detection
            if current_text:
                segments.append({"speaker": current_speaker, "text": current_text.strip()})
            parts = line.split(":", 1)
            current_speaker = parts[0].strip()
            current_text = parts[1].strip()
        else:
            # Continuation line: belongs to the current speaker.
            current_text += " " + line
    if current_text:
        segments.append({"speaker": current_speaker, "text": current_text.strip()})
    return segments
def generate_multi_voice_podcast(script: str) -> bytes | None:
    """
    Generate a multi-voice podcast by concatenating raw MP3 bytes.
    No FFmpeg or pydub required β works on Windows and HuggingFace Spaces.
    """
    segments = parse_podcast_script(script)
    if not segments:
        return None
    # dict.fromkeys preserves insertion order, giving first-seen speaker order.
    ordered_speakers = list(dict.fromkeys(seg["speaker"] for seg in segments))
    voice_pool = [
        "en-US-AndrewNeural",
        "en-US-EmmaNeural",
        "en-GB-SoniaNeural",
        "en-GB-RyanNeural"
    ]
    # Round-robin assignment of voices to speakers.
    voice_for = {
        name: voice_pool[idx % len(voice_pool)]
        for idx, name in enumerate(ordered_speakers)
    }
    rendered = []
    progress_bar = st.progress(0, text="Generating voices...")
    total = len(segments)
    for idx, seg in enumerate(segments):
        voice = voice_for.get(seg["speaker"], "en-US-AndrewNeural")
        logger.info(f"Segment {idx+1}/{total} | Speaker: {seg['speaker']} | Voice: {voice}")
        progress_bar.progress(
            (idx + 1) / total,
            text=f"Generating voice for {seg['speaker']}..."
        )
        audio_bytes = generate_voice_edge(seg["text"], voice)
        if audio_bytes:
            rendered.append(audio_bytes)
    progress_bar.empty()
    if rendered:
        # Concatenate raw MP3 bytes β no FFmpeg needed
        return b"".join(rendered)
    return None
def generate_voice(text: str, token: str = None) -> bytes | None:
    """
    Generate audio using gTTS (Google Text-to-Speech).
    Free, no API key required, works on Windows and HuggingFace Spaces.
    Returns MP3 bytes, or None on failure.

    The `token` parameter is accepted for interface parity but unused by gTTS.
    """
    try:
        from gtts import gTTS
    except ImportError:
        st.error("β οΈ gTTS not installed. Run: pip install gtts")
        logger.error("gTTS not installed.")
        return None
    # Remove markdown markers so they are not read out loud.
    clean_text = text
    for marker in ("**", "*", "#", "`", "---"):
        clean_text = clean_text.replace(marker, "")
    # Cap length for reasonable audio duration.
    clean_text = clean_text.strip()[:2000]
    if not clean_text:
        st.warning("β οΈ No text to convert to speech.")
        return None
    logger.info(f"Generating TTS via gTTS | snippet: {clean_text[:60]}...")
    try:
        speech = gTTS(text=clean_text, lang="en", slow=False)
        buffer = io.BytesIO()
        speech.write_to_fp(buffer)
        audio_bytes = buffer.getvalue()
        logger.info(f"gTTS success. Bytes: {len(audio_bytes)}")
        return audio_bytes
    except Exception as e:
        full_error = traceback.format_exc()
        logger.error(f"gTTS error: {full_error}")
        st.error(f"β οΈ TTS generation failed: {str(e)}")
        with st.expander("Show Traceback"):
            st.code(full_error)
        return None
# Sidebar
with st.sidebar:
    st.header("Configuration")
    # BUG FIX: the setup instructions below say "paste token above" and the
    # missing-token warning says "enter your token in the sidebar", but there
    # was no input widget — the token could only come from the environment.
    # Provide a password field pre-filled from HF_TOKEN when available.
    hf_token = st.text_input(
        "HuggingFace Token",
        value=os.getenv("HF_TOKEN", ""),
        type="password",
        help="Fine-grained token with 'Make calls to Inference Providers' enabled"
    )
    embedding_model = st.selectbox(
        "Embedding Model",
        [
            "sentence-transformers/all-MiniLM-L6-v2",
            "BAAI/bge-small-en-v1.5",
            "sentence-transformers/all-mpnet-base-v2"
        ],
        help="Lightweight models that run on HuggingFace's servers"
    )
    llm_model = st.selectbox(
        "LLM Model",
        [
            "meta-llama/Llama-3.2-3B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "HuggingFaceH4/zephyr-7b-beta",
            "microsoft/Phi-3-mini-4k-instruct",
            "google/gemma-2-2b-it"
        ],
        help="Language model for generating answers (chat-optimized models)"
    )
    # Retrieval tuning knobs used by the main app below.
    chunk_size = st.slider("Chunk Size", 500, 2000, 1000, 100)
    num_results = st.slider("Number of Retrieved Documents", 1, 5, 3)
    st.markdown("---")
    st.markdown("### π Setup Instructions")
    st.markdown(
        "1. Go to [HuggingFace](https://huggingface.co/settings/tokens)\n"
        "2. Create **Fine-grained** token\n"
        "3. β Enable **'Make calls to Inference Providers'**\n"
        "4. Copy and paste token above"
    )
# Initialize session state for chat history
if "messages" not in st.session_state:
    st.session_state.messages = []
# Main Application Logic
# Hard gate: everything below needs a HuggingFace token.
if not hf_token:
    st.warning("β οΈ Please enter your HuggingFace token in the sidebar.")
    st.stop()
# --- Document Upload Section ---
# Expander starts open until a vector store exists in session state.
with st.expander("π€ Upload Documents", expanded=not st.session_state.get("vector_store")):
    uploaded_files = st.file_uploader(
        "Upload Documents",
        type=["pdf", "docx", "txt"],
        accept_multiple_files=True
    )
    if st.button("π Upload the Data"):
        if not uploaded_files:
            st.error("Please upload some files first!")
        else:
            with st.spinner("Processing documents..."):
                vector_store, num_docs, num_chunks = process_uploaded_files(
                    uploaded_files, hf_token, embedding_model, chunk_size
                )
            if vector_store:
                # Persist results so they survive Streamlit reruns.
                st.session_state.vector_store = vector_store
                st.session_state.num_docs = num_docs
                st.session_state.num_chunks = num_chunks
                st.success(f"β Processed {num_docs} documents.")
                # --- AUTO GENERATE SUMMARY & AUDIO ---
                # Best-effort: a failure here only warns, the vector store
                # is already stored above.
                with st.spinner("Generating summary and audio..."):
                    try:
                        retriever = vector_store.as_retriever(search_kwargs={"k": num_results})
                        relevant_docs = retriever.invoke("Summarize the main points of the document.")
                        context = "\n\n".join([doc.page_content for doc in relevant_docs])
                        summary = generate_answer("You are a professor at an MBA College, and are tasked with summarising and explaining the concepts manegarially in detail ", context, hf_token, llm_model)
                        st.session_state.last_summary = summary
                        audio = generate_voice(summary, hf_token)
                        if audio:
                            st.session_state.last_summary_audio = audio
                    except Exception as e:
                        st.warning(f"Auto-summary failed: {e}")
            else:
                st.error("Failed to process documents.")
# Initialize tabs
tab1, tab2, tab3 = st.tabs(["π Summary", "π¬ Chatbot", "π§ Voice Studio"])
# Tabs only become functional once documents have been processed.
if "vector_store" in st.session_state:
    vector_store = st.session_state.vector_store
    retriever = vector_store.as_retriever(search_kwargs={"k": num_results})
    with tab1:
        st.header("Document Summary")
        if st.button("Re-generate Summary"):
            with st.spinner("Summarizing..."):
                relevant_docs = retriever.invoke("Summarize the main points of the document.")
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                summary = generate_answer("You are a professor at an MBA College, and are tasked with summarising and explaining the concepts manegarially in detail ", context, hf_token, llm_model)
                st.session_state.last_summary = summary
            # Update audio too
            with st.spinner("Updating audio..."):
                audio = generate_voice(summary, hf_token)
                if audio:
                    st.session_state.last_summary_audio = audio
        # Display summary if available in session state
        if "last_summary" in st.session_state:
            st.markdown(st.session_state.last_summary)
            st.markdown("---")
            if "last_summary_audio" in st.session_state:
                st.audio(st.session_state.last_summary_audio, format="audio/mpeg")
            if st.button("π Force Re-generate Audio"):
                with st.spinner("Generating audio for summary..."):
                    audio = generate_voice(st.session_state.last_summary, hf_token)
                    if audio:
                        st.session_state.last_summary_audio = audio
                        st.success("β Audio ready!")
                        st.audio(audio, format="audio/mpeg")
                    else:
                        st.error("β Could not generate audio. Check your token and try again.")
    with tab2:
        st.header("Chat with Documents")
        # Replay prior conversation from session state on each rerun.
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])
        if user_input := st.chat_input("Ask something about your documents..."):
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)
            with st.chat_message("assistant"):
                # Retrieve context for the question, then answer from it.
                relevant_docs = retriever.invoke(user_input)
                if relevant_docs:
                    context = "\n\n".join([doc.page_content for doc in relevant_docs])
                    response = generate_answer(user_input, context, hf_token, llm_model)
                    st.markdown(response)
                else:
                    response = "β No relevant context found."
                    st.markdown(response)
            st.session_state.messages.append({"role": "assistant", "content": response})
    with tab3:
        st.header("Voice Studio")
        st.info("Generate creative scripts and listen to them!")
        col1, col2, col3, col4 = st.columns(4)
        task = None
        if col1.button("π Script"): task = "Script"
        if col2.button("ποΈ Podcast"): task = "Podcast"
        if col3.button("π Story"): task = "Story"
        if col4.button("βοΈ Debate"): task = "Debate"
        # if col5.button("π΅ Song"): task = "Song"
        if task:
            with st.spinner(f"Generating {task}..."):
                rel_docs = retriever.invoke(f"Key information for {task}")
                context = "\n\n".join([doc.page_content for doc in rel_docs])
                content = generate_creative_content(task, context, hf_token, llm_model)
                st.subheader(f"Generated {task}")
                st.markdown(content)
            with st.spinner("Generating Audio..."):
                # Two-speaker formats get per-speaker voices; others get one voice.
                if task == "Podcast" or task == "Debate":
                    audio = generate_multi_voice_podcast(content)
                else:
                    audio = generate_voice_edge(content)
                if audio:
                    st.audio(audio, format="audio/mp3")
                else:
                    st.error("β Audio generation failed.")
else:
    st.info("Please upload and process documents in the sidebar to get started.")
# Footer
st.markdown("---")