import os
import io
import traceback  # For detailed error logging of unexpected failures

import openai
import streamlit as st
import fitz  # PyMuPDF
import tiktoken  # For token counting
from pydub import AudioSegment
from youtube_transcript_api import (
    YouTubeTranscriptApi,
    TranscriptsDisabled,
    NoTranscriptFound,
)

# --- Configuration ---
# Models chosen for speed and capability balance
TRANSCRIPTION_MODEL = "whisper-1"
LANGUAGE_MODEL = "gpt-3.5-turbo"
# Approximate context window limit for the language model (input tokens)
MAX_TOKENS_FOR_SUMMARY_INPUT = 3500
MAX_TOKENS_FOR_CHAT_INPUT = 3500  # Context + Question
AUDIO_SIZE_LIMIT_MB = 25  # OpenAI API limit

# --- Helper Functions ---

# Initialize the tiktoken encoder once at module level so every helper shares
# a single instance; fall back to word counting if initialization fails.
try:
    encoding = tiktoken.encoding_for_model(LANGUAGE_MODEL)
except Exception as e:
    st.warning(
        f"Could not initialize token encoder for {LANGUAGE_MODEL}: {e}. "
        "Using word count fallback."
    )
    encoding = None


def count_tokens(text):
    """Count tokens in *text* using tiktoken, with a word-count fallback.

    Returns 0 for empty/None input. If the encoder is unavailable or fails
    on this text, the whitespace-separated word count is used as an estimate.
    """
    if not text:
        return 0
    if encoding:
        try:
            return len(encoding.encode(text))
        except Exception as e:
            st.warning(f"Token encoding failed: {e}. Falling back to word count.")
            return len(text.split())  # Fallback if encoding fails
    # Fallback estimate if tiktoken failed to initialize
    return len(text.split())


def truncate_text_by_tokens(text, max_tokens):
    """Truncate *text* so it fits within *max_tokens* tokens.

    Uses tiktoken when available; otherwise keeps roughly max_tokens * 0.7
    words (a rough tokens-per-word heuristic for English text).
    """
    if not text:
        return ""
    if encoding:
        try:
            tokens = encoding.encode(text)
            if len(tokens) > max_tokens:
                truncated_tokens = tokens[:max_tokens]
                return encoding.decode(truncated_tokens)
            return text
        except Exception as e:
            st.warning(
                f"Token encoding/decoding failed during truncation: {e}. "
                "Using word count fallback."
            )
            words = text.split()
            estimated_words = int(max_tokens * 0.7)
            return " ".join(words[:estimated_words])
    words = text.split()
    estimated_words = int(max_tokens * 0.7)
    return " ".join(words[:estimated_words])


# --- Core Functions ---

def initialize_openai():
    """Initialize the OpenAI API key from Streamlit secrets.

    Returns True on success; shows a Streamlit error and returns False when
    the key is missing, empty, or any other initialization error occurs.
    """
    try:
        api_key = st.secrets["OPENAI_API_KEY"]
        if not api_key:
            st.error(
                "OpenAI API Key not found in Secrets. Please add "
                "'OPENAI_API_KEY' to your Hugging Face Space secrets."
            )
            return False
        openai.api_key = api_key
        return True
    except KeyError:
        st.error(
            "OpenAI API Key not found in Secrets. Please add "
            "'OPENAI_API_KEY' to your Hugging Face Space secrets."
        )
        return False
    except Exception as e:
        st.error(f"Error initializing OpenAI: {e}")
        return False


def transcribe_audio(audio_file):
    """Transcribe an uploaded audio file using the OpenAI Whisper API.

    Re-encodes the upload to WAV via pydub, then requests a verbose_json
    response so segment timestamps can be embedded in the returned text.
    Returns the timestamped transcript string, or None on any failure.
    """
    if audio_file.size > AUDIO_SIZE_LIMIT_MB * 1024 * 1024:
        st.error(f"Audio file size exceeds {AUDIO_SIZE_LIMIT_MB}MB limit.")
        return None
    try:
        audio = AudioSegment.from_file(audio_file)
        buffer = io.BytesIO()
        audio.export(buffer, format="wav")
        buffer.seek(0)
        buffer.name = "audio.wav"  # Required by OpenAI API to infer format
        response = openai.Audio.transcribe(
            model=TRANSCRIPTION_MODEL,
            file=buffer,
            response_format="verbose_json",
        )
        # One "[start-end] text" line per Whisper segment.
        transcription_text = "\n".join(
            [
                f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}"
                for seg in response['segments']
            ]
        )
        return transcription_text
    except openai.error.AuthenticationError:
        st.error("Authentication Error: Invalid OpenAI API Key provided in Secrets.")
        return None
    except openai.error.RateLimitError:
        st.error("OpenAI API Rate Limit Exceeded. Please check your usage or wait.")
        return None
    except Exception as e:
        st.error(f"Error during audio transcription: {str(e)}")
        print(f"Transcription Error Traceback:\n{traceback.format_exc()}")
        return None


def extract_text_from_pdf(pdf_file):
    """Extract all page text from an uploaded PDF using PyMuPDF.

    Returns the concatenated text, "" when the PDF yields no text (e.g. a
    scanned/image-based document), or None on a read error.
    """
    try:
        pdf_bytes = pdf_file.getvalue()
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        text = ""
        for page in doc:
            text += page.get_text() + "\n"
        doc.close()
        if not text.strip():
            st.warning(
                "No text could be extracted. The PDF might be image-based "
                "(scanned) or empty."
            )
            return ""
        return text
    except Exception as e:
        st.error(f"Error reading PDF: {str(e)}")
        print(f"PDF Extraction Error Traceback:\n{traceback.format_exc()}")
        return None


def get_youtube_transcript(url):
    """Fetch an English transcript for a YouTube video URL.

    Prefers a manually created English transcript and falls back to an
    auto-generated one. Returns a timestamped transcript string, or None
    when the ID cannot be determined or no English transcript exists.
    """
    try:
        video_id = None
        if "watch?v=" in url:
            video_id = url.split("watch?v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            # Fix: the original repeated this branch (and a "youtu.be//"
            # variant) two more times; those duplicates were unreachable
            # dead code and have been removed.
            video_id = url.split("youtu.be/")[1].split("?")[0]
        else:
            # Basic check for other potential valid IDs (e.g. bare-ID links)
            parts = url.split("/")
            potential_id = parts[-1].split("?")[0]
            if len(potential_id) == 11:  # Common length for YouTube IDs
                video_id = potential_id
            else:
                st.error(
                    "Could not automatically determine Video ID from URL. "
                    "Please use standard 'watch?v=' URL."
                )
                return None
        if not video_id:
            st.error("Failed to extract video ID.")
            return None

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            # Prioritize manual transcripts, fallback to generated
            transcript = transcript_list.find_manually_created_transcript(['en'])
        except NoTranscriptFound:
            try:
                transcript = transcript_list.find_generated_transcript(['en'])
                st.info("Using auto-generated English transcript.")
            except NoTranscriptFound:
                st.warning(
                    f"No English transcript (manual or generated) found for video: {url}"
                )
                return None

        transcript_data = transcript.fetch()
        # One "[start-end] text" line per caption entry.
        transcription_text = "\n".join(
            [
                f"[{entry['start']:.2f}-{entry['start']+entry['duration']:.2f}] {entry['text']}"
                for entry in transcript_data
            ]
        )
        return transcription_text
    except TranscriptsDisabled:
        st.error(f"Transcripts are disabled for video: {url}")
        return None
    except Exception as e:
        st.error(f"Error fetching YouTube transcript: {str(e)}")
        print(f"YouTube Transcript Error Traceback:\n{traceback.format_exc()}")
        return None


def generate_summary(text_to_summarize, max_output_tokens=800):
    """Generate a summary of *text_to_summarize* with the chat model.

    Input longer than MAX_TOKENS_FOR_SUMMARY_INPUT tokens is truncated
    first (with a visible warning). Returns the summary string or None
    on any API failure or empty input.
    """
    input_token_count = count_tokens(text_to_summarize)
    if input_token_count > MAX_TOKENS_FOR_SUMMARY_INPUT:
        st.warning(
            f"Input text ({input_token_count} tokens) exceeds the limit "
            f"({MAX_TOKENS_FOR_SUMMARY_INPUT} tokens) for the summarization "
            "model. Truncating input."
        )
        text_to_summarize = truncate_text_by_tokens(
            text_to_summarize, MAX_TOKENS_FOR_SUMMARY_INPUT
        )
        input_token_count = count_tokens(text_to_summarize)  # Recount

    if not text_to_summarize:
        st.error("Input text for summarization is empty.")
        return None

    prompt = (
        "Summarize the following text comprehensively, focusing on key "
        "points, concepts, and conclusions. Aim for a detailed summary but "
        f"keep it concise where possible:\n\n{text_to_summarize}"
    )
    try:
        response = openai.ChatCompletion.create(
            model=LANGUAGE_MODEL,
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=max_output_tokens,
            temperature=0.5,
        )
        return response.choices[0].message.content.strip()
    except openai.error.AuthenticationError:
        st.error("Authentication Error: Invalid OpenAI API Key provided in Secrets.")
        return None
    except openai.error.RateLimitError:
        st.error("OpenAI API Rate Limit Exceeded during summarization.")
        return None
    except openai.error.InvalidRequestError as e:
        st.error(f"Invalid Request during summarization: {e}.")
        return None
    except Exception as e:
        st.error(f"Error during summary generation: {str(e)}")
        print(f"Summarization Error Traceback:\n{traceback.format_exc()}")
        return None


def chat_with_ai(question, context, max_output_tokens=500):
    """Answer *question* grounded only in *context* using the chat model.

    Refuses (returns None with a UI message) when the question or context
    is missing, or when the combined prompt exceeds MAX_TOKENS_FOR_CHAT_INPUT.
    """
    if not question:
        st.warning("Please enter a question.")
        return None
    if not context:
        st.error("Cannot answer question: No context available.")
        return None

    prompt = (
        f"Based *only* on the following content:\n\n---\n{context}\n---\n\n"
        f"Answer the question: {question}"
    )
    prompt_token_count = count_tokens(prompt)
    if prompt_token_count > MAX_TOKENS_FOR_CHAT_INPUT:
        st.error(
            f"The question and context combined ({prompt_token_count} tokens) "
            f"exceed the model's input limit ({MAX_TOKENS_FOR_CHAT_INPUT} "
            "tokens). Try using the summary as context or ask a shorter "
            "question."
        )
        return None
    try:
        response = openai.ChatCompletion.create(
            model=LANGUAGE_MODEL,
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=max_output_tokens,
            temperature=0.3,  # Lower temperature for more grounded answers
        )
        return response.choices[0].message.content.strip()
    except openai.error.AuthenticationError:
        st.error("Authentication Error: Invalid OpenAI API Key provided in Secrets.")
        return None
    except openai.error.RateLimitError:
        st.error("OpenAI API Rate Limit Exceeded during chat.")
        return None
    except openai.error.InvalidRequestError as e:
        st.error(f"Invalid Request during chat: {e}.")
        return None
    except Exception as e:
        st.error(f"Error during AI chat: {str(e)}")
        print(f"Chat Error Traceback:\n{traceback.format_exc()}")
        return None


# --- Streamlit App Main Function ---

def main():
    """Render the Streamlit UI: input selection, processing, results, chat."""
    st.set_page_config(layout="wide", page_title="AI Summarization Bot")

    # --- Styling ---
    # NOTE(review): the original file injected custom CSS here but the
    # stylesheet content was lost; the empty markdown call is preserved as a
    # placeholder — restore the CSS rules if they can be recovered.
    st.markdown("""  """, unsafe_allow_html=True)

    # NOTE(review): the original header HTML markup was lost from the source;
    # reconstructed as a simple centered title carrying the same visible text.
    st.markdown(
        "<h1 style='text-align: center;'>AI Summarization Bot 🤖</h1>",
        unsafe_allow_html=True,
    )
    # Removed redundant description paragraph as title is descriptive

    # Initialize OpenAI API Key once per session.
    if 'openai_initialized' not in st.session_state:
        st.session_state['openai_initialized'] = initialize_openai()
    if not st.session_state.get('openai_initialized'):
        st.warning(
            "OpenAI initialization failed. Please ensure your API key is "
            "correctly set in Hugging Face secrets and refresh."
        )
        st.stop()

    # --- Sidebar for Inputs ---
    st.sidebar.header("Input Options")
    input_type = st.sidebar.selectbox(
        "Select Input Type",
        ["Audio File", "PDF Document", "YouTube URL"],
        key="input_type_select",
    )

    # Initialize session state variables used across reruns.
    if 'full_text' not in st.session_state:
        st.session_state['full_text'] = None
    if 'summary' not in st.session_state:
        st.session_state['summary'] = None
    if 'last_input_type' not in st.session_state:
        st.session_state['last_input_type'] = None
    if 'last_input_data_key' not in st.session_state:
        st.session_state['last_input_data_key'] = None
    if 'current_input_key' not in st.session_state:
        st.session_state['current_input_key'] = None

    # Clear results if input type changes
    if st.session_state['last_input_type'] != input_type:
        st.session_state['full_text'] = None
        st.session_state['summary'] = None
        st.session_state['last_input_data_key'] = None
        st.session_state['current_input_key'] = None  # Reset current key too
        st.session_state['last_input_type'] = input_type

    # --- Input Elements ---
    uploaded_file = None
    youtube_url = None
    process_button_pressed = False

    if input_type == "Audio File":
        uploaded_file = st.sidebar.file_uploader(
            "Upload audio file (Max 25MB)",
            type=["mp3", "wav", "m4a", "ogg", "webm"],
            key="audio_uploader",
        )
        if uploaded_file:
            # Use file name and size as the key instead of non-existent .id
            st.session_state['current_input_key'] = f"{uploaded_file.name}-{uploaded_file.size}"
    elif input_type == "PDF Document":
        uploaded_file = st.sidebar.file_uploader(
            "Upload PDF document", type=["pdf"], key="pdf_uploader"
        )
        if uploaded_file:
            # Use file name and size as the key
            st.session_state['current_input_key'] = f"{uploaded_file.name}-{uploaded_file.size}"
    elif input_type == "YouTube URL":
        youtube_url = st.sidebar.text_input(
            "Enter YouTube URL",
            key="youtube_input",
            placeholder="e.g., https://www.youtube.com/watch?v=...",
        )
        if youtube_url:
            st.session_state['current_input_key'] = youtube_url  # Use URL as key

    st.sidebar.markdown("---")  # Separator
    st.sidebar.markdown("### Steps:")
    st.sidebar.markdown("1. Select input type & provide source.")
    st.sidebar.markdown("2. Click 'Generate Summary & Notes'.")
    st.sidebar.markdown("3. Review results and use chat if needed.")

    # Single "Generate" button
    if st.sidebar.button("Generate Summary & Notes", key="generate_button",
                         use_container_width=True):  # Make button wider
        current_key = st.session_state.get('current_input_key')

        # Check if input is provided for the selected type
        valid_input_provided = False
        if input_type == "Audio File" and uploaded_file:
            valid_input_provided = True
        elif input_type == "PDF Document" and uploaded_file:
            valid_input_provided = True
        elif input_type == "YouTube URL" and youtube_url:
            valid_input_provided = True

        if valid_input_provided:
            # Check if it's a *new* input compared to the last processed one
            if current_key != st.session_state.get('last_input_data_key'):
                st.session_state['full_text'] = None
                st.session_state['summary'] = None
                st.session_state['last_input_data_key'] = current_key
                process_button_pressed = True
            else:
                # Input hasn't changed, check if results already exist
                if st.session_state.get('full_text') or st.session_state.get('summary'):
                    st.info(
                        "Results for the current input are already displayed. "
                        "Upload a new file or URL to generate again."
                    )
                else:
                    # Results don't exist for some reason, re-process
                    process_button_pressed = True
        else:
            st.warning("Please provide input (upload file or enter URL) before generating.")

    # --- Processing Logic ---
    if process_button_pressed:
        extracted_text = None
        input_valid = False

        # Re-check validity just before processing
        if input_type == "Audio File" and uploaded_file:
            input_valid = True
            with st.spinner('Transcribing audio... (this may take a while)'):
                extracted_text = transcribe_audio(uploaded_file)
        elif input_type == "PDF Document" and uploaded_file:
            input_valid = True
            with st.spinner('Extracting text from PDF...'):
                extracted_text = extract_text_from_pdf(uploaded_file)
        elif input_type == "YouTube URL" and youtube_url:
            input_valid = True
            with st.spinner('Fetching YouTube transcript...'):
                extracted_text = get_youtube_transcript(youtube_url)

        if input_valid and extracted_text is not None:
            st.session_state['full_text'] = extracted_text
            if extracted_text:
                # Only summarize if text extraction was successful
                with st.spinner('Generating summary...'):
                    summary_text = generate_summary(extracted_text)
                    st.session_state['summary'] = summary_text
                    if not summary_text:
                        # Keep error message if summary is None
                        st.error("Summary generation failed.")
            else:
                st.warning("Text extraction resulted in empty content. Cannot generate summary.")
                st.session_state['summary'] = None
        elif input_valid and extracted_text is None:
            # Error already shown in extraction func OR warning shown if text was empty
            st.session_state['full_text'] = None
            st.session_state['summary'] = None

    # --- Display Results ---
    # Use columns only if there's something to display to avoid empty columns
    if st.session_state.get('full_text') or st.session_state.get('summary'):
        st.markdown("---")  # Separator before results
        col1, col2 = st.columns([1, 1])

        with col1:
            # NOTE(review): original header markup lost; reconstructed.
            st.markdown("<h3>Full Text / Transcription</h3>", unsafe_allow_html=True)
            full_text_content = st.session_state.get('full_text')
            if full_text_content:
                display_text = full_text_content
                # Simple truncation for display performance, not affecting
                # summary/chat context
                if len(display_text) > 150000:
                    display_text = display_text[:150000] + \
                        "\n\n... (Text truncated for display performance)"
                st.text_area("Full Content:", display_text, height=400,
                             key="full_text_area", label_visibility="collapsed")
            else:
                # Show placeholder only if generation was attempted but failed/empty
                if st.session_state.get('last_input_data_key') and process_button_pressed:
                    st.info("No text extracted or transcribed.")

        with col2:
            # NOTE(review): original header markup lost; reconstructed.
            st.markdown("<h3>Generated Summary</h3>", unsafe_allow_html=True)
            summary_content = st.session_state.get('summary')
            if summary_content:
                st.text_area("Summary:", summary_content, height=400,
                             key="summary_area", label_visibility="collapsed")
            else:
                # Show placeholder only if generation was attempted but failed/empty
                if st.session_state.get('last_input_data_key') and process_button_pressed:
                    st.warning("Summary could not be generated.")

    # --- Chat Section ---
    st.markdown("---")
    # NOTE(review): original header markup lost; reconstructed.
    st.markdown("<h3>Chat with AI about the Content</h3>", unsafe_allow_html=True)

    context_option = st.radio(
        "Use as chat context:",
        ('Generated Summary', 'Full Text'),
        key='chat_context_option',
        horizontal=True,
        label_visibility="collapsed",  # Hide label for radio itself
    )

    chat_context = None
    context_name = ""
    if context_option == 'Generated Summary':
        if st.session_state.get('summary'):
            chat_context = st.session_state['summary']
            context_name = "Summary"
        else:
            st.warning("Summary not available for chat context.")
    else:  # Full Text option
        if st.session_state.get('full_text'):
            full_text_for_chat = st.session_state['full_text']
            # Truncate context *before* passing to chat if needed.
            # Estimate tokens needed for question + response buffer.
            max_context_tokens = MAX_TOKENS_FOR_CHAT_INPUT - 500
            chat_context = truncate_text_by_tokens(full_text_for_chat, max_context_tokens)
            if len(full_text_for_chat) > len(chat_context):
                context_name = "Full Text (Truncated for Chat)"
            else:
                context_name = "Full Text"
        else:
            st.warning("Full text not available for chat context.")

    if chat_context:
        # Display which context is being used subtly
        st.markdown(f"Chatting based on: **{context_name}**", unsafe_allow_html=True)
        question = st.text_input(
            "Ask a question:",
            key="chat_question",
            placeholder="Ask anything about the selected context...",
        )
        if st.button("Ask AI", key="ask_ai_button", use_container_width=True):
            if question:
                with st.spinner("AI is thinking..."):
                    answer = chat_with_ai(question, chat_context)
                    if answer:
                        st.markdown("**AI Answer:**")
                        # Use markdown for potentially better formatting of AI response
                        st.markdown(answer)
                    else:
                        st.error("Failed to get an answer from the AI.")
            else:
                st.warning("Please enter a question first.")
    else:
        # Only show message if processing was attempted for current input
        if st.session_state.get('last_input_data_key'):
            st.markdown("_(Generate content or summary first to enable chat)_")

    # Add footer or instructions if desired
    st.sidebar.markdown("---")
    st.sidebar.info("Powered by OpenAI Whisper & GPT models.")


if __name__ == "__main__":
    main()