# Spaces status: Sleeping
| import os | |
| import shutil | |
| import gradio as gr | |
| from typing import List | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from urllib.parse import urlparse, parse_qs | |
| import requests | |
| from PyPDF2 import PdfReader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores.faiss import FAISS | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.prompts import PromptTemplate | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI | |
| import google.generativeai as genai | |
| from dotenv import load_dotenv | |
# Load environment variables
load_dotenv()

# Configure Google API
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Optional outbound proxy settings, read from the environment
proxy_host = os.getenv('PROXY_HOST')
proxy_port = os.getenv('PROXY_PORT')
proxy_username = os.getenv('PROXY_USERNAME')
proxy_password = os.getenv('PROXY_PASSWORD')

# Format the proxy URL.
# Only build it from values that are actually set: interpolating missing
# variables would yield the literal string "http://None:None@None:None",
# which silently routes every transcript request through a bogus proxy.
if proxy_host and proxy_port:
    if proxy_username and proxy_password:
        proxy_url = f'http://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}'
    else:
        # Unauthenticated proxy: omit the credentials part entirely.
        proxy_url = f'http://{proxy_host}:{proxy_port}'
else:
    # No proxy configured. NOTE(review): a None proxy value is expected to be
    # ignored by requests' proxy merging -- confirm against the pinned versions.
    proxy_url = None
def extract_pdf_text(pdf_files):
    """Concatenate the extracted text of every page of every given PDF.

    Args:
        pdf_files: Iterable of PDF sources (anything ``PdfReader`` accepts).
            ``None`` or an empty iterable yields an empty string.

    Returns:
        All page texts joined in reading order, with no separator added.
    """
    return "".join(
        # A page without extractable text (e.g. a scanned image) may give an
        # empty/None result; treat it as "" instead of failing the whole upload.
        page.extract_text() or ""
        for pdf in (pdf_files or ())
        for page in PdfReader(pdf).pages
    )
def extract_video_id(url):
    """Extract the video id from a YouTube URL.

    Supported forms:
        * ``youtu.be/<id>`` short links
        * ``youtube.com/watch?v=<id>`` (also ``www.``, ``m.`` and ``music.`` hosts)
        * ``youtube.com/embed/<id>``, ``/shorts/<id>``, ``/live/<id>``, ``/v/<id>``
        * any of the above without an explicit ``https://`` scheme

    Args:
        url: The URL text entered by the user; may be empty or ``None``.

    Returns:
        The video id string, or ``None`` when no id can be found.
    """
    if not url or not isinstance(url, str):
        return None

    candidate = url.strip()
    parsed_url = urlparse(candidate)
    if not parsed_url.netloc:
        # Without a scheme urlparse puts the host into the path; retry with one.
        parsed_url = urlparse('https://' + candidate)

    host = parsed_url.hostname or ''
    if host.startswith('www.'):
        host = host[len('www.'):]

    # Non-empty path components, so trailing slashes do not leak into the id.
    segments = [part for part in parsed_url.path.split('/') if part]

    if host == 'youtu.be':
        return segments[0] if segments else None

    if host in ('youtube.com', 'm.youtube.com', 'music.youtube.com'):
        video_id = parse_qs(parsed_url.query).get('v', [None])[0]
        if video_id:
            return video_id
        # Path-style URLs carry the id as the second path component.
        if len(segments) >= 2 and segments[0] in ('embed', 'shorts', 'live', 'v'):
            return segments[1]

    return None
def extract_youtube_transcript(video_id):
    """Fetch the transcript of a YouTube video as one plain-text string.

    Args:
        video_id: The YouTube video id.

    Returns:
        The transcript entries' ``text`` fields, each followed by a space.
        Returns an empty string when the transcript cannot be fetched, so that
        an error message is never mistaken for transcript content and indexed.
    """
    try:
        # Only route through the proxy when one is actually configured.
        proxies = {'https': proxy_url} if proxy_url else None
        srt = YouTubeTranscriptApi.get_transcript(video_id, proxies=proxies)
        return "".join(entry['text'] + ' ' for entry in srt)
    except Exception as e:
        # Best-effort: any failure (no captions, network, proxy) is logged and
        # reported to the caller as "no transcript".
        print(f"Error extracting YouTube transcript: {e}")
        return ""
def get_youtube_video_title(video_id):
    """Look up a video's title through YouTube's oEmbed endpoint.

    Args:
        video_id: The YouTube video id.

    Returns:
        The video title, or ``"Untitled YouTube Video"`` when the lookup fails
        for any reason (network error, non-2xx response, unexpected payload).
    """
    try:
        response = requests.get(
            "https://www.youtube.com/oembed",
            # Passing params lets requests URL-encode the nested watch URL
            # instead of embedding a raw "?"/"&" inside the query string.
            params={
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "format": "json",
            },
            # Without a timeout a stalled connection would hang the upload handler.
            timeout=10,
        )
        response.raise_for_status()
        return response.json()['title']
    except Exception:
        # The title is purely cosmetic, so fall back rather than fail the upload.
        return "Untitled YouTube Video"
def split_text_into_chunks(text):
    """Split ``text`` into overlapping chunks.

    Uses a ``RecursiveCharacterTextSplitter`` configured with
    ``chunk_size=12000`` and ``chunk_overlap=1200`` and returns the result of
    its ``split_text`` call.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=12000,
        chunk_overlap=1200,
    ).split_text(text)
def create_vector_store(chunks):
    """Embed ``chunks`` and save the resulting FAISS index to ``faiss_index``.

    The index is built with the ``models/embedding-001`` Google embedding
    model and written with ``save_local``; nothing is returned.
    """
    embedder = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    FAISS.from_texts(chunks, embedding=embedder).save_local("faiss_index")
def setup_conversation_chain(template):
    """Build a "stuff" question-answering chain around the given prompt text.

    Args:
        template: Prompt template text using the ``context`` and ``question``
            input variables.

    Returns:
        The chain produced by ``load_qa_chain`` for a ``gemini-pro`` chat
        model with temperature 0.3.
    """
    llm = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    qa_prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"],
    )
    return load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)
def process_files(files, youtube_url):
    """Ingest uploaded PDFs and/or a YouTube transcript and rebuild the index.

    Args:
        files: Uploaded files from the UI, or ``None``. Each item is either a
            path string or an object whose ``name`` attribute holds the path.
        youtube_url: A YouTube video URL; may be empty, blank or ``None``.

    Returns:
        A ``(status_message, file_list_markdown)`` tuple. The second element is
        an empty string whenever nothing was processed.
    """
    all_text = ""
    uploaded_files = []
    staged_paths = []  # copies created by this call; always removed below
    youtube_url = (youtube_url or "").strip()

    try:
        # Process PDF files
        if files:
            os.makedirs("uploads", exist_ok=True)
            for file in files:
                # NOTE(review): depending on the Gradio version/config the item
                # is a plain path or a tempfile-like wrapper -- accept both.
                source_path = file if isinstance(file, str) else file.name
                # Extract just the filename from the full path
                filename = os.path.basename(source_path)
                file_path = os.path.join("uploads", filename)
                # Copy the file from the temporary location to our uploads directory
                shutil.copy(source_path, file_path)
                staged_paths.append(file_path)
                all_text += extract_pdf_text([file_path])
                uploaded_files.append({"name": filename, "type": "pdf"})

        # Process YouTube URL
        if youtube_url:
            video_id = extract_video_id(youtube_url)
            if not video_id:
                return "Invalid YouTube URL", ""
            all_text += extract_youtube_transcript(video_id)
            video_title = get_youtube_video_title(video_id)
            uploaded_files.append({"name": video_title, "url": youtube_url})

        if not all_text:
            return "No content to process", ""

        chunks = split_text_into_chunks(all_text)
        create_vector_store(chunks)
    finally:
        # Remove only the copies made by this call. Running in ``finally``
        # covers the early returns and failures above, and leaving other files
        # alone avoids deleting another request's uploads mid-processing.
        for staged in staged_paths:
            if os.path.exists(staged):
                os.remove(staged)

    # Format the file list for display
    file_list_text = "\n".join(
        f"- **{file['name']}**" + (f" ([Link]({file['url']}))" if 'url' in file else "")
        for file in uploaded_files
    )
    return "Content uploaded and processed successfully", file_list_text
def ask_question(question):
    """Answer a question from the content stored in the local FAISS index.

    Args:
        question: The user's question text.

    Returns:
        The model's answer, or a human-readable message when the question is
        blank, nothing has been indexed yet, or an error occurs.
    """
    # Avoid spending embedding/LLM calls on an empty prompt.
    if not question or not question.strip():
        return "Please enter a question."

    # Without this check the user would see a raw index-loading error when
    # asking before any content has been processed.
    if not os.path.isdir("faiss_index"):
        return "No content has been processed yet. Please upload a PDF or YouTube URL first."

    try:
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        # SECURITY NOTE: allow_dangerous_deserialization enables pickle loading.
        # This is acceptable only because "faiss_index" is written by this
        # application itself; never point it at an index from an untrusted source.
        indexed_data = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
        docs = indexed_data.similarity_search(question)
        prompt_template = """
    Your alias is NeuralChat. Your task is to provide a thorough response based on the given context, ensuring all relevant details are included.
    If the requested information isn't available, simply state, "answer not available in context," then answer based on your understanding, connecting with the context.
    Don't provide incorrect information.\n\n
    Context: \n {context}?\n
    Question: \n {question}\n
    Answer:
    """
        chain = setup_conversation_chain(prompt_template)
        response = chain({"input_documents": docs, "question": question}, return_only_outputs=True)
        return response["output_text"]
    except Exception as e:
        # UI boundary: surface the failure as chat text instead of crashing.
        return f"An error occurred: {str(e)}"
def chat(message, history):
    """Handle one chat turn for the Gradio chatbot.

    Args:
        message: The text submitted by the user.
        history: List of ``(user_message, bot_response)`` tuples, or ``None``
            when the conversation has not started yet.

    Returns:
        A ``(history, "")`` tuple: the updated history and an empty string
        that clears the input textbox.
    """
    if history is None:
        history = []
    # Ignore blank submissions instead of recording an empty exchange.
    if not message or not message.strip():
        return history, ""
    response = ask_question(message)
    history.append((message, response))
    return history, ""
# Monochrome theme whose primary buttons use the same fill colour in both the
# normal and the hover state.
_PRIMARY_BUTTON_FILL = "#FF0000"
theme = gr.themes.Monochrome().set(
    button_primary_background_fill=_PRIMARY_BUTTON_FILL,
    button_primary_background_fill_hover=_PRIMARY_BUTTON_FILL,
)
# Gradio interface: upload controls in a narrow column, chat in a wide one.
with gr.Blocks(theme=theme) as demo:
    gr.Markdown("# NeuralChat", elem_id="header")

    with gr.Row():
        # Upload column: inputs, trigger button and processing feedback.
        with gr.Column(scale=2):
            files = gr.File(label="Upload PDF Files", file_count="multiple")
            youtube_url = gr.Textbox(label="YouTube URL")
            upload_button = gr.Button("Upload and Process")
            upload_output = gr.Textbox(label="Upload Status")
            file_list = gr.Markdown(label="Uploaded Files")

        # Chat column: conversation view plus the question box.
        with gr.Column(scale=5):
            chatbot = gr.Chatbot(show_copy_button=True, scale=1.5)
            msg = gr.Textbox(label="Ask a question", lines=1)

    # Event wiring: the button runs ingestion, pressing Enter asks a question.
    upload_button.click(
        fn=process_files,
        inputs=[files, youtube_url],
        outputs=[upload_output, file_list],
    )
    msg.submit(
        fn=chat,
        inputs=[msg, chatbot],
        outputs=[chatbot, msg],
    )

if __name__ == "__main__":
    demo.launch()