Spaces:
Runtime error
Runtime error
| from dotenv import load_dotenv | |
| import os | |
| import gradio as gr | |
| from PyPDF2 import PdfReader | |
| from google import genai | |
| #from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.prompts import PromptTemplate | |
| import shutil | |
| import tempfile | |
| from docx import Document | |
| from docx.shared import Inches | |
| from datetime import datetime | |
| # Load environment variables | |
| load_dotenv() | |
| # Delay reading API key: provide helper function, read only when needed | |
def _get_api_key() -> str:
    """Return the first non-blank API key found in the environment.

    The variables are checked in a fixed priority order and surrounding
    whitespace is ignored. When a key is found it is also written to
    ``GOOGLE_API_KEY`` so libraries that read that variable directly see it.

    Returns:
        The stripped key, or an empty string when none of the variables
        holds a non-blank value.
    """
    env_names = (
        "GOOGLE_API_KEY",
        "GEMINI_API_KEY",
        "GOOGLE_GENAI_API_KEY",
        "GENAI_API_KEY",
    )
    # Lazy generator: stops reading the environment at the first usable value.
    stripped_values = (os.getenv(name, "").strip() for name in env_names)
    found = next((value for value in stripped_values if value), "")
    if found:
        # Mirror into GOOGLE_API_KEY for compatibility with underlying libraries.
        os.environ["GOOGLE_API_KEY"] = found
    return found
class PDFChatBot:
    """Index uploaded PDFs in a FAISS store and build a Gemini QA chain.

    Attributes:
        vector_store: FAISS store built or loaded by this instance, else None.
        embeddings: Embedding client, created lazily on first use.
        processed_files: Base names of the PDFs that yielded text.
        chat_history: Conversation messages kept on the instance.
    """

    # Directory used by save_local / load_local / clear_data.
    INDEX_DIR = "faiss_index"
    EMBEDDING_MODEL = "models/text-embedding-004"
    CHAT_MODEL = "gemini-2.0-flash-exp"

    def __init__(self):
        self.vector_store = None
        # Delay embedding model initialization until actually needed
        self.embeddings = None
        self.processed_files = []
        self.chat_history = []  # Store chat history

    @staticmethod
    def _resolve_path(pdf_file):
        """Return ``pdf_file.name`` for upload objects, else the value itself."""
        return getattr(pdf_file, "name", pdf_file)

    def _ensure_embeddings(self):
        """Create the embedding client on first use.

        Returns:
            True when ``self.embeddings`` is ready, False when no API key is
            configured.
        """
        if self.embeddings is None:
            api_key = _get_api_key()
            if not api_key:
                return False
            self.embeddings = GoogleGenerativeAIEmbeddings(
                model=self.EMBEDDING_MODEL,
                google_api_key=api_key,
            )
        return True

    def get_pdf_text(self, pdf_files):
        """Extract text from one or more PDF files with PyPDF2.

        Args:
            pdf_files: A single path/upload object, or a list/tuple of them.

        Returns:
            ``(raw_text, processed_count)``: the concatenated text of every
            file that produced any, and how many files that was. Files that
            fail to parse are reported on stdout and skipped.
        """
        if not pdf_files:
            return "", 0
        # Handle single file and multiple files
        if not isinstance(pdf_files, (list, tuple)):
            pdf_files = [pdf_files]

        collected = []
        processed_count = 0
        for pdf_file in pdf_files:
            try:
                pdf_path = self._resolve_path(pdf_file)
                reader = PdfReader(pdf_path)
                page_texts = (page.extract_text() for page in reader.pages)
                # join instead of repeated += to avoid quadratic copying
                file_text = "".join(f"{text}\n" for text in page_texts if text)
                if file_text.strip():
                    collected.append(file_text)
                    processed_count += 1
                    self.processed_files.append(os.path.basename(pdf_path))
            except Exception as e:
                # Best effort: one unreadable file must not abort the batch.
                print(f"Error while reading PDF: {str(e)}")
                continue
        return "".join(collected), processed_count

    def get_pdf_text_via_gemini(self, pdf_files):
        """Use Gemini to parse PDF text directly (via the Files API).

        The module imports ``from google import genai``, whose entry point
        is ``genai.Client``; each PDF is uploaded and the model is asked to
        return its text.

        Args:
            pdf_files: A single path/upload object, or a list/tuple of them.

        Returns:
            ``(raw_text, processed_count)`` with the same meaning as
            ``get_pdf_text``; ``("", 0)`` when no API key is configured, no
            files were given, or the client cannot be created.
        """
        api_key = _get_api_key()
        if not api_key or not pdf_files:
            return "", 0
        if not isinstance(pdf_files, (list, tuple)):
            pdf_files = [pdf_files]

        try:
            client = genai.Client(api_key=api_key)
        except Exception as e:
            print(f"Error while creating Gemini client: {str(e)}")
            return "", 0

        extraction_prompt = (
            "Extract all of the text from this PDF document. "
            "Return only the extracted text in reading order, "
            "without any commentary."
        )
        collected = []
        processed_count = 0
        for pdf_file in pdf_files:
            try:
                pdf_path = self._resolve_path(pdf_file)
                uploaded = client.files.upload(file=pdf_path)
                response = client.models.generate_content(
                    model=self.CHAT_MODEL,
                    contents=[uploaded, extraction_prompt],
                )
                # response.text may be None when the model returns no text part
                file_text = response.text or ""
                if file_text.strip():
                    collected.append(file_text + "\n")
                    processed_count += 1
                    self.processed_files.append(os.path.basename(pdf_path))
            except Exception as e:
                # Same best-effort policy as get_pdf_text.
                print(f"Error while parsing PDF with Gemini: {str(e)}")
                continue
        return "".join(collected), processed_count

    def get_text_chunks(self, text, chunk_size=10000, chunk_overlap=1000):
        """Split text into newline-separated chunks for embedding.

        Args:
            text: The text to split.
            chunk_size: Target chunk length in characters.
            chunk_overlap: Characters shared between consecutive chunks.

        Returns:
            The list of chunks produced by ``CharacterTextSplitter``.
        """
        text_splitter = CharacterTextSplitter(
            separator="\n",
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )
        return text_splitter.split_text(text)

    def create_vector_store(self, chunks):
        """Create a FAISS vector store from text chunks and save it locally.

        Returns:
            True on success; False when there are no chunks, no API key is
            configured, or building/saving the index raises.
        """
        if not chunks:
            return False
        try:
            if not self._ensure_embeddings():
                return False
            self.vector_store = FAISS.from_texts(chunks, self.embeddings)
            self.vector_store.save_local(self.INDEX_DIR)
            return True
        except Exception as e:
            print(f"Error while creating vector store: {str(e)}")
            return False

    def load_vector_store(self):
        """Load the vector store previously saved under ``INDEX_DIR``.

        Returns:
            True when the store was loaded; False when the directory is
            missing, no API key is configured, or loading raises.
        """
        try:
            if not os.path.exists(self.INDEX_DIR):
                return False
            if not self._ensure_embeddings():
                return False
            # NOTE(security): this flag lets load_local unpickle data from
            # INDEX_DIR. Only load an index this application wrote itself;
            # never point INDEX_DIR at untrusted files.
            self.vector_store = FAISS.load_local(
                self.INDEX_DIR,
                embeddings=self.embeddings,
                allow_dangerous_deserialization=True,
            )
            return True
        except Exception as e:
            print(f"Error while loading vector store: {str(e)}")
            return False

    def get_conversational_chain(self, temperature=0.3, max_tokens=4096):
        """Create the "stuff" question-answering chain backed by Gemini.

        Args:
            temperature: Passed to the chat model.
            max_tokens: Passed to the chat model.

        Returns:
            The chain returned by ``load_qa_chain``.

        Raises:
            RuntimeError: If no API key is configured.
        """
        prompt_template = (
            "\n"
            "Answer the question in as much detail as possible based on the provided context.\n"
            "If you need more information to answer perfectly, ask for the missing details.\n"
            "If the answer cannot be found in the provided content, simply say:\n"
            '"The answer cannot be found in the provided content."\n'
            "Context:\n"
            "{context}\n"
            "Question:\n"
            "{question}\n"
            "Answer:\n"
        )
        api_key = _get_api_key()
        if not api_key:
            raise RuntimeError(
                "API key not set. Please configure GOOGLE_API_KEY after deployment."
            )
        model = ChatGoogleGenerativeAI(
            model=self.CHAT_MODEL,
            google_api_key=api_key,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=0.8,
        )
        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"],
        )
        return load_qa_chain(
            model,
            chain_type="stuff",
            prompt=prompt,
        )

    def process_pdfs(self, pdf_files, progress=gr.Progress(), use_gemini=False):
        """Extract, chunk and index the given PDF files.

        Args:
            pdf_files: Uploaded file(s) to process.
            progress: Gradio progress tracker used for status updates.
            use_gemini: Extract text with Gemini instead of PyPDF2.

        Returns:
            ``(status_message, file_list_text)``; the second item is empty
            unless processing succeeded.
        """
        if not pdf_files:
            return "Please upload at least one PDF file.", ""
        self.processed_files = []
        progress(0, desc="Starting PDF processing...")

        # Extract text
        progress(0.2, desc="Extracting PDF text...")
        if use_gemini:
            raw_text, processed_count = self.get_pdf_text_via_gemini(pdf_files)
        else:
            raw_text, processed_count = self.get_pdf_text(pdf_files)
        if not raw_text.strip():
            return "Unable to extract text from the PDF files.", ""

        # Split text
        progress(0.4, desc="Splitting text...")
        text_chunks = self.get_text_chunks(raw_text)

        # Create vector store
        progress(0.6, desc="Creating vector store...")
        if not self.create_vector_store(text_chunks):
            return "❌ PDF processing failed. Please try again.", ""

        # Only announce completion once indexing has actually succeeded.
        progress(1.0, desc="Processing completed!")
        file_list = "Processed files:\n" + "\n".join(
            f"• {file}" for file in self.processed_files
        )
        return (
            f"✅ Successfully processed {processed_count} PDF files!\n"
            f"Total text chunks: {len(text_chunks)}\n"
            "You can now start asking questions.",
            file_list,
        )

    def clear_data(self):
        """Delete the saved index and reset the in-memory state.

        Returns:
            ``(status_message, "")``; the message reports the error if
            removal fails.
        """
        try:
            if os.path.exists(self.INDEX_DIR):
                shutil.rmtree(self.INDEX_DIR)
            self.vector_store = None
            self.processed_files = []
            self.chat_history = []
            return "✅ All processed data has been cleared!", ""
        except Exception as e:
            return f"❌ Error while clearing data: {str(e)}", ""

    def create_docx_report(self, chat_history):
        """Create a DOCX report containing chat history.

        Args:
            chat_history: Sequence of message dicts with a ``"content"`` key,
                read here as alternating question/answer entries.

        Raises:
            RuntimeError: If building the document fails; the original
                exception is chained as the cause.
        """
        try:
            doc = Document()
            # Title
            title = doc.add_heading("PDF Chatbot - Q&A Report", 0)
            title.alignment = 1  # Center alignment
            # Generation time
            doc.add_paragraph(
                f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            )
            # Processed files
            if self.processed_files:
                doc.add_heading("Processed PDF files:", level=2)
                for i, file in enumerate(self.processed_files, 1):
                    doc.add_paragraph(f"{i}. {file}", style="List Number")
                doc.add_paragraph("")
            # Chat history
            doc.add_heading("Q&A History:", level=2)
            if not chat_history:
                doc.add_paragraph("There is currently no chat history.")
            else:
                # Pair entries (0, 1), (2, 3), ...; a trailing unpaired entry
                # is ignored, as in the index-based original.
                pairs = zip(chat_history[::2], chat_history[1::2])
                for number, (q_msg, a_msg) in enumerate(pairs, 1):
                    question = q_msg["content"]
                    answer = a_msg["content"]
                    # Question
                    q_paragraph = doc.add_paragraph()
                    q_run = q_paragraph.add_run(f"Question {number}: ")
                    q_run.bold = True
                    q_run.font.size = Inches(0.14)
                    # ⚠️ Answer handling & saving likely continues in PART 4
                    # NOTE(review): as visible here, `question`/`answer` are
                    # not yet written to the document and nothing is saved
                    # or returned — confirm against the rest of the file.
        except Exception as e:
            raise RuntimeError(f"Error while creating DOCX report: {str(e)}") from e
# Initialize chatbot
# Single module-level instance; the handler functions below read and mutate it.
bot = PDFChatBot()
def clear_chat():
    """Drop the bot's stored chat history.

    Returns:
        A pair ``([], "")``: an empty message list and an empty string.
    """
    bot.chat_history = []
    emptied_messages, emptied_text = [], ""
    return emptied_messages, emptied_text
def clear_all_data():
    """Run ``bot.clear_data()`` and hand back whatever it returns."""
    outcome = bot.clear_data()
    return outcome
def load_existing_data():
    """Try to load the saved vector store through the shared bot.

    Returns:
        ``(status_message, "")`` where the message says whether
        ``bot.load_vector_store()`` succeeded.
    """
    loaded = bot.load_vector_store()
    if not loaded:
        return "❌ No processed data found.", ""
    return "✅ Successfully loaded processed data!", ""
def set_api_key(api_key: str):
    """Set / update the Google Gemini API key for the running process.

    The key is stored only in the ``GOOGLE_API_KEY`` environment variable of
    this process; it is not written to disk.

    Args:
        api_key: The key as entered by the user; ``None`` and blank strings
            are rejected.

    Returns:
        A status message describing the outcome.
    """
    key = (api_key or "").strip()
    if not key:
        return "❌ No API key provided. Please paste a valid GOOGLE_API_KEY."
    os.environ["GOOGLE_API_KEY"] = key
    # Reset embeddings to ensure re-initialization with new key. A plain
    # attribute assignment needs no try/except; swallowing errors here would
    # only hide a genuinely broken `bot`.
    bot.embeddings = None
    return "✅ API key set (valid for this session only)."
# Create custom theme
# Built on gr.themes.Soft with explicit hue choices; both the UI font and the
# monospace font are requested by name through gr.themes.GoogleFont.
custom_theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="gray",
    neutral_hue="slate",
    font=gr.themes.GoogleFont("Noto Sans TC"),
    font_mono=gr.themes.GoogleFont("JetBrains Mono"),
)
| # Create Gradio interface | |
| with gr.Blocks( | |
| title="PDF Intelligent Q&A System", | |
| theme=custom_theme, | |
| css=""" | |
| .gradio-container { | |
| max-width: 1200px !important; | |
| margin: auto !important; | |
| } | |
| .main-header { | |
| text-align: center; | |
| padding: 20px; | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| color: white; | |
| border-radius: 10px; | |
| margin-bottom: 20px; | |
| } | |
| .status-box { | |
| background-color: #f8f9fa; | |
| border-left: 4px solid #007bff; | |
| padding: 15px; | |
| border-radius: 5px; | |
| } | |
| .file-info { | |
| background-color: #e8f5e8; | |
| border-left: 4px solid #28a745; | |
| padding: 10px; | |
| border-radius: 5px; | |
| } | |
| """, | |
| ): | |
| # Main header section | |
| with gr.Row(): | |
| gr.HTML(""" | |
| <div class="main-header"> | |
| <h1>🤖 PDF Intelligent Q&A System</h1> | |
| <p>Based on Gemini 2.0 Flash RAG technology | Supports multilingual Q&A</p> | |
| </div> | |
| """) | |
| # Main feature area | |
| with gr.Tab("📁 File Management", id="file_tab"): | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| # File upload section | |
| with gr.Group(): | |
| gr.Markdown("### 📤 Upload PDF Files") | |
| api_key_box = gr.Textbox( | |
| label="Google API Key (optional – paste after deployment)", | |
| placeholder="Key starting with sk- or AIza (not saved to disk)", | |
| type="password", | |
| ) | |
| set_key_btn = gr.Button("🔑 Set API Key") | |
| file_upload = gr.File( | |
| file_count="multiple", | |
| file_types=[".pdf"], | |
| label="Select PDF files", | |
| height=150, | |
| ) | |
| use_gemini_toggle = gr.Checkbox( | |
| label="Use Gemini to parse PDF (supports scanned images)", | |
| value=False, | |
| ) | |
| # Processing options | |
| with gr.Row(): | |
| process_btn = gr.Button( | |
| "🚀 Start Processing", | |
| variant="primary", | |
| size="lg", | |
| scale=2, | |
| ) | |
| load_btn = gr.Button( | |
| "📂 Load processed data", | |
| variant="secondary", | |
| scale=1, | |
| ) | |
| clear_btn = gr.Button( | |
| "🗑️ Clear all data", | |
| variant="stop", | |
| scale=1, | |
| ) | |
| with gr.Column(scale=2): | |
| # Status display section | |
| with gr.Group(): | |
| gr.Markdown("### 📊 Processing Status") | |
| status_text = gr.Textbox( | |
| label="Progress", | |
| lines=6, | |
| interactive=False, | |
| elem_classes=["status-box"], | |
| ) | |
| # File list | |
| gr.Markdown("### 📋 Processed Files") | |
| file_list = gr.Textbox( | |
| label="File list", | |
| lines=8, | |
| interactive=False, | |
| elem_classes=["file-info"], | |
| ) | |
| # Chat tab | |
| with gr.Tab("💬 Intelligent Chat", id="chat_tab"): | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| chatbot = gr.Chatbot( | |
| label="💬 Chat History", | |
| height=600, | |
| show_copy_button=True, | |
| type="messages", | |
| avatar_images=["👤", "🤖"], | |
| ) | |
| with gr.Column(scale=1): | |
| # Sidebar features | |
| with gr.Group(): | |
| gr.Markdown("### ⚙️ Q&A Settings") | |
| temperature = gr.Slider( | |
| minimum=0.1, | |
| maximum=1.0, | |
| value=0.3, | |
| step=0.05, | |
| label="Temperature", | |
| ) | |
| # Input area | |
| with gr.Row(): | |
| question_input = gr.Textbox( | |
| placeholder="Please enter your question... (supports multiple languages)", | |
| label="💭 Question Input", | |
| lines=3, | |
| scale=4, | |
| max_lines=5, | |
| ) | |
| ask_btn = gr.Button( | |
| "📤 Send Question", | |
| variant="primary", | |
| scale=1, | |
| size="lg", | |
| ) | |
| # Quick actions | |
| with gr.Row(): | |
| clear_chat_btn = gr.Button( | |
| "🧹 Clear Chat", | |
| variant="secondary", | |
| scale=1, | |
| ) | |
| download_btn = gr.Button( | |
| "📥 Download Chat History", | |
| variant="primary", | |
| scale=1, | |
| ) | |
| export_btn = gr.Button( | |
| "📄 Export to Word", | |
| variant="secondary", | |
| scale=1, | |
| ) | |
| # Example questions | |
| with gr.Group(): | |
| gr.Markdown("### 💡 Example Questions") | |
| gr.Examples( | |
| examples=[ | |
| "What is the main content of this document?", | |
| "Please summarize the key points and concepts.", | |
| "What important data or statistics are mentioned?", | |
| "Can you explain a specific topic in detail?", | |
| "What is the conclusion of the document?", | |
| "What important recommendations are provided?", | |
| "What risks or challenges are mentioned?", | |
| "Compare the different viewpoints discussed.", | |
| ], | |
| inputs=question_input, | |
| label="Click an example to autofill", | |
| ) | |
| # Hidden file download component | |
| download_file = gr.File(visible=False) | |
| # Download handler | |
def handle_download():
    """Produce a component update for the chat-history download.

    Calls ``download_chat_history()``; a truthy result is used as the file
    value and the component is made visible. Otherwise a warning is issued
    and the component is hidden.

    Returns:
        The ``gr.update(...)`` result for the download component.
    """
    file_path = download_chat_history()  # ⚠️ must exist elsewhere
    if not file_path:
        gr.Warning("No chat history available for download!")
        return gr.update(visible=False)
    return gr.update(value=file_path, visible=True)
| # Event handlers | |
| process_btn.click( | |
| fn=upload_and_process, # ⚠️ must exist | |
| inputs=[file_upload, use_gemini_toggle], | |
| outputs=[status_text, file_list], | |
| show_progress=True, | |
| ) | |
| set_key_btn.click( | |
| fn=set_api_key, | |
| inputs=[api_key_box], | |
| outputs=[status_text], | |
| ) | |
| load_btn.click( | |
| fn=load_existing_data, | |
| outputs=[status_text, file_list], | |
| ) | |
| clear_btn.click( | |
| fn=clear_all_data, | |
| outputs=[status_text, file_list], | |
| ) | |
| ask_btn.click( | |
| fn=ask_question, # ⚠️ must exist | |
| inputs=[question_input, chatbot, temperature, max_tokens, search_k], | |
| outputs=[chatbot, question_input], | |
| ) | |