import io
import os
import traceback  # For detailed error logging

import fitz  # PyMuPDF
import openai
import streamlit as st
import tiktoken  # For token counting
from pydub import AudioSegment
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound

# --- Configuration ---
# Models chosen for speed and capability balance
TRANSCRIPTION_MODEL = "whisper-1"
LANGUAGE_MODEL = "gpt-3.5-turbo"
# Approximate context window limit for the language model (input tokens)
MAX_TOKENS_FOR_SUMMARY_INPUT = 3500
MAX_TOKENS_FOR_CHAT_INPUT = 3500  # Context + Question
AUDIO_SIZE_LIMIT_MB = 25  # OpenAI API limit

# Shown whenever the API key is absent or empty in Streamlit secrets.
_MISSING_API_KEY_MESSAGE = (
    "OpenAI API Key not found in Secrets. "
    "Please add 'OPENAI_API_KEY' to your Hugging Face Space secrets."
)

# --- Helper Functions ---
# Initialize tiktoken encoder globally
try:
    encoding = tiktoken.encoding_for_model(LANGUAGE_MODEL)
except Exception as e:
    st.warning(f"Could not initialize token encoder for {LANGUAGE_MODEL}: {e}. Using word count fallback.")
    encoding = None


def count_tokens(text):
    """Return the number of tokens in *text*.

    Uses the module-level tiktoken ``encoding`` when it is available.  If the
    encoder could not be initialized, or encoding raises, the whitespace word
    count is returned instead.  Empty or ``None`` input counts as 0.
    """
    if not text:
        return 0
    if encoding:
        try:
            return len(encoding.encode(text))
        except Exception as e:
            st.warning(f"Token encoding failed: {e}. Falling back to word count.")
    # Fallback estimate: encoder missing or encoding failed.
    return len(text.split())


def _truncate_by_word_estimate(text, max_tokens):
    """Keep roughly ``max_tokens * 0.7`` whitespace-separated words of *text*."""
    estimated_words = int(max_tokens * 0.7)
    return " ".join(text.split()[:estimated_words])


def truncate_text_by_tokens(text, max_tokens):
    """Truncate *text* so that it fits within *max_tokens* tokens.

    Returns ``""`` for empty input or a non-positive limit.  With a working
    encoder the text is cut on a token boundary (and returned unchanged when
    it already fits).  Without one, or if encoding/decoding raises, a
    word-count estimate is used; note that this path re-joins words with
    single spaces even when nothing is cut.
    """
    # A negative limit would otherwise slice from the end of the sequence.
    if not text or max_tokens <= 0:
        return ""
    if encoding:
        try:
            tokens = encoding.encode(text)
            if len(tokens) > max_tokens:
                return encoding.decode(tokens[:max_tokens])
            return text
        except Exception as e:
            st.warning(f"Token encoding/decoding failed during truncation: {e}. Using word count fallback.")
    return _truncate_by_word_estimate(text, max_tokens)


# --- Core Functions ---
def initialize_openai():
    """Load the OpenAI API key from Streamlit secrets into ``openai.api_key``.

    Returns ``True`` on success.  Returns ``False`` (after showing an error in
    the UI) when the key is missing, empty, or any other error occurs.
    """
    try:
        api_key = st.secrets["OPENAI_API_KEY"]
        if not api_key:
            st.error(_MISSING_API_KEY_MESSAGE)
            return False
        openai.api_key = api_key
        return True
    except KeyError:
        st.error(_MISSING_API_KEY_MESSAGE)
        return False
    except Exception as e:
        st.error(f"Error initializing OpenAI: {e}")
        return False


def transcribe_audio(audio_file):
    """Transcribe an uploaded audio file with the OpenAI Whisper API.

    *audio_file* must expose ``.size`` (bytes) and be readable by
    ``AudioSegment.from_file``.  The audio is re-encoded to WAV in memory and
    sent to ``openai.Audio.transcribe``.

    Returns one line per segment in the form ``[start-end] text``, or ``None``
    (after showing an error in the UI) on any failure.
    """
    limit_bytes = AUDIO_SIZE_LIMIT_MB * 1024 * 1024
    if audio_file.size > limit_bytes:
        st.error(f"Audio file size exceeds {AUDIO_SIZE_LIMIT_MB}MB limit.")
        return None
    try:
        audio = AudioSegment.from_file(audio_file)
        buffer = io.BytesIO()
        audio.export(buffer, format="wav")
        # WAV is uncompressed, so a small compressed upload can grow past the
        # API limit once converted; fail early with a clear message instead of
        # uploading a payload that the API will reject.
        if buffer.getbuffer().nbytes > limit_bytes:
            st.error(
                f"Converted audio exceeds the {AUDIO_SIZE_LIMIT_MB}MB upload limit "
                "after WAV conversion. Please use a shorter recording."
            )
            return None
        buffer.seek(0)
        buffer.name = "audio.wav"  # Required by OpenAI API
        response = openai.Audio.transcribe(
            model=TRANSCRIPTION_MODEL,
            file=buffer,
            response_format="verbose_json",
        )
        return "\n".join(
            f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}"
            for seg in response['segments']
        )
    except openai.error.AuthenticationError:
        st.error("Authentication Error: Invalid OpenAI API Key provided in Secrets.")
        return None
    except openai.error.RateLimitError:
        st.error("OpenAI API Rate Limit Exceeded. Please check your usage or wait.")
        return None
    except Exception as e:
        st.error(f"Error during audio transcription: {str(e)}")
        print(f"Transcription Error Traceback:\n{traceback.format_exc()}")
        return None


def extract_text_from_pdf(pdf_file):
    """Extract the text of every page of a PDF using PyMuPDF.

    *pdf_file* must provide ``getvalue()`` returning the PDF bytes.

    Returns the page texts, each followed by a newline.  Returns ``""`` (with
    a warning) when no text could be extracted, and ``None`` (with an error)
    when the PDF cannot be read.
    """
    try:
        pdf_bytes = pdf_file.getvalue()
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            text = "".join(page.get_text() + "\n" for page in doc)
        finally:
            # Release the document even if a page fails to parse.
            doc.close()
        if not text.strip():
            st.warning("No text could be extracted. The PDF might be image-based (scanned) or empty.")
            return ""
        return text
    except Exception as e:
        st.error(f"Error reading PDF: {str(e)}")
        print(f"PDF Extraction Error Traceback:\n{traceback.format_exc()}")
        return None


def _extract_video_id(url):
    """Parse a YouTube video ID out of *url*.

    Returns the ID string (possibly empty when the URL carries an empty ID),
    or ``None`` when no ID can be determined from the URL shape.
    """
    url = url.strip()
    if "watch?v=" in url:
        return url.split("watch?v=")[1].split("&")[0]
    if "youtu.be/" in url:
        return url.split("youtu.be/")[1].split("?")[0]
    # Last resort: accept the final path segment when it has the usual
    # 11-character length of a YouTube ID.
    potential_id = url.split("/")[-1].split("?")[0]
    if len(potential_id) == 11:
        return potential_id
    return None


def get_youtube_transcript(url):
    """Fetch the English transcript of a YouTube video.

    A manually created English transcript is preferred; an auto-generated one
    is used as a fallback (with an informational message).

    Returns one line per entry in the form ``[start-end] text``, or ``None``
    (after showing a message in the UI) when the ID cannot be parsed, no
    English transcript exists, transcripts are disabled, or any other error
    occurs.
    """
    try:
        video_id = _extract_video_id(url)
        if video_id is None:
            st.error("Could not automatically determine Video ID from URL. Please use standard 'watch?v=' URL.")
            return None
        if not video_id:
            st.error("Failed to extract video ID.")
            return None

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            # Prioritize manual transcripts, fallback to generated
            transcript = transcript_list.find_manually_created_transcript(['en'])
        except NoTranscriptFound:
            try:
                transcript = transcript_list.find_generated_transcript(['en'])
                st.info("Using auto-generated English transcript.")
            except NoTranscriptFound:
                st.warning(f"No English transcript (manual or generated) found for video: {url}")
                return None

        transcript_data = transcript.fetch()
        return "\n".join(
            f"[{entry['start']:.2f}-{entry['start']+entry['duration']:.2f}] {entry['text']}"
            for entry in transcript_data
        )
    except TranscriptsDisabled:
        st.error(f"Transcripts are disabled for video: {url}")
        return None
    except Exception as e:
        st.error(f"Error fetching YouTube transcript: {str(e)}")
        print(f"YouTube Transcript Error Traceback:\n{traceback.format_exc()}")
        return None


def generate_summary(text_to_summarize, max_output_tokens=800):
    """Summarize *text_to_summarize* with the chat model.

    Input longer than ``MAX_TOKENS_FOR_SUMMARY_INPUT`` tokens is truncated
    (with a warning) before being sent.  *max_output_tokens* caps the length
    of the generated summary.

    Returns the summary text, or ``None`` (after showing an error in the UI)
    when the input is empty or the API call fails.
    """
    if not text_to_summarize:
        st.error("Input text for summarization is empty.")
        return None

    input_token_count = count_tokens(text_to_summarize)
    if input_token_count > MAX_TOKENS_FOR_SUMMARY_INPUT:
        st.warning(
            f"Input text ({input_token_count} tokens) exceeds the limit "
            f"({MAX_TOKENS_FOR_SUMMARY_INPUT} tokens) for the summarization model. Truncating input."
        )
        text_to_summarize = truncate_text_by_tokens(text_to_summarize, MAX_TOKENS_FOR_SUMMARY_INPUT)
        if not text_to_summarize:
            st.error("Input text for summarization is empty.")
            return None

    prompt = (
        "Summarize the following text comprehensively, focusing on key points, concepts, and conclusions. "
        f"Aim for a detailed summary but keep it concise where possible:\n\n{text_to_summarize}"
    )
    try:
        response = openai.ChatCompletion.create(
            model=LANGUAGE_MODEL,
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=max_output_tokens,
            temperature=0.5,
        )
        return response.choices[0].message.content.strip()
    except openai.error.AuthenticationError:
        st.error("Authentication Error: Invalid OpenAI API Key provided in Secrets.")
        return None
    except openai.error.RateLimitError:
        st.error("OpenAI API Rate Limit Exceeded during summarization.")
        return None
    except openai.error.InvalidRequestError as e:
        st.error(f"Invalid Request during summarization: {e}.")
        return None
    except Exception as e:
        st.error(f"Error during summary generation: {str(e)}")
        print(f"Summarization Error Traceback:\n{traceback.format_exc()}")
        return None


def chat_with_ai(question, context, max_output_tokens=500):
    """Answer *question* using only *context* via the chat model.

    The combined prompt must not exceed ``MAX_TOKENS_FOR_CHAT_INPUT`` tokens;
    unlike summarization, an over-long prompt is rejected rather than
    truncated.  *max_output_tokens* caps the length of the answer.

    Returns the answer text, or ``None`` (after showing a message in the UI)
    when the question or context is empty, the prompt is too long, or the API
    call fails.
    """
    if not question:
        st.warning("Please enter a question.")
        return None
    if not context:
        st.error("Cannot answer question: No context available.")
        return None

    prompt = f"Based *only* on the following content:\n\n---\n{context}\n---\n\nAnswer the question: {question}"
    prompt_token_count = count_tokens(prompt)
    if prompt_token_count > MAX_TOKENS_FOR_CHAT_INPUT:
        st.error(
            f"The question and context combined ({prompt_token_count} tokens) exceed the model's input limit "
            f"({MAX_TOKENS_FOR_CHAT_INPUT} tokens). Try using the summary as context or ask a shorter question."
        )
        return None

    try:
        response = openai.ChatCompletion.create(
            model=LANGUAGE_MODEL,
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=max_output_tokens,
            temperature=0.3,
        )
        return response.choices[0].message.content.strip()
    except openai.error.AuthenticationError:
        st.error("Authentication Error: Invalid OpenAI API Key provided in Secrets.")
        return None
    except openai.error.RateLimitError:
        st.error("OpenAI API Rate Limit Exceeded during chat.")
        return None
    except openai.error.InvalidRequestError as e:
        st.error(f"Invalid Request during chat: {e}.")
        return None
    except Exception as e:
        st.error(f"Error during AI chat: {str(e)}")
        print(f"Chat Error Traceback:\n{traceback.format_exc()}")
        return None


# --- Streamlit App Main Function ---
# NOTE(review): main() is cut off in the reviewed excerpt (it ends inside an
# open string literal), so the line below is left exactly as found.
def main(): st.set_page_config(layout="wide", page_title="AI Summarization Bot") # --- Styling (Restored Original CSS) --- st.markdown(""" """, unsafe_allow_html=True) st.markdown("