# (Hugging Face Spaces page-header residue removed: "Spaces: Sleeping")
import gradio as gr
import fitz  # PyMuPDF
import os
import tempfile
import requests
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from datetime import datetime

# === CONFIG CHECK ===
# Warn at import time so a missing key is obvious before the first request.
if not os.getenv("GROQ_API_KEY"):
    print("WARNING: GROQ_API_KEY environment variable not set. API calls will fail.")

# === Globals ===
# Shared TF-IDF vectorizer; retrieve_context re-fits it on every query.
vectorizer = TfidfVectorizer(stop_words='english')
# === UTILITY FUNCTIONS ===
# NOTE(review): a commented-out earlier draft of call_groq_api (targeting the
# retired "llama-3.1-70b-versatile" model) previously sat here as a top-level
# triple-quoted string. It was dead code duplicating the live implementation
# below and has been removed.
def call_groq_api(prompt):
    """Send *prompt* to the Groq chat-completions endpoint and return the reply.

    Args:
        prompt: User message forwarded as a single-turn chat request.

    Returns:
        The model's reply text, or a human-readable error string (this
        function never raises) so the UI can display failures directly.
    """
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        return "Error: GROQ_API_KEY environment variable not set."
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    data = {
        "model": "llama-3.3-70b-versatile",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
    }
    try:
        # timeout= prevents the Gradio handler from hanging forever on a
        # stalled connection (the original call had no timeout).
        response = requests.post(
            "https://api.groq.com/openai/v1/chat/completions",
            json=data,
            headers=headers,
            timeout=60,
        )
        if response.status_code != 200:
            return f"API Error {response.status_code}: {response.text}"
        result = response.json()
        return result["choices"][0]["message"]["content"]
    except requests.exceptions.RequestException as e:
        return f"Network Error: {e}"
    except (KeyError, IndexError, ValueError) as e:
        # Malformed/unexpected response payload; ValueError covers bad JSON.
        # Narrowed from a bare `except Exception` that hid programming errors.
        return f"Unexpected Error: {e}"
def extract_text_from_pdfs(pdf_files):
    """Extract per-page text from uploaded PDF files.

    Args:
        pdf_files: Iterable of objects with a ``.name`` filesystem path
            (as produced by gradio's File component).

    Returns:
        Three parallel lists: non-empty page texts, their 1-based page
        numbers, and the source file's basename for each chunk. A file
        that fails to open/parse is skipped with a console warning.
    """
    chunks, pages, file_names = [], [], []
    for file in pdf_files:
        try:
            # Context manager closes the document handle even on error;
            # the original leaked the fitz.Document (never closed).
            with fitz.open(file.name) as doc:
                for page_num, page in enumerate(doc, start=1):
                    text = page.get_text().strip()
                    if text:  # skip blank / image-only pages
                        chunks.append(text)
                        pages.append(page_num)
                        file_names.append(os.path.basename(file.name))
        except Exception as e:
            # Deliberate best-effort: one bad PDF must not abort the batch.
            print(f"Error processing {file.name}: {e}")
    return chunks, pages, file_names
def retrieve_context(query, chunks, pages, file_names, top_k=3):
    """Rank *chunks* against *query* by TF-IDF cosine similarity.

    Returns a tuple ``(context_text, selected_chunks, references)``.
    When no chunk scores at least 0.2, returns the sentinel string
    "Ask a relevant question." with two empty lists (answer_question
    checks for that exact string).
    """
    corpus = list(chunks) + [query]
    matrix = vectorizer.fit_transform(corpus)
    # The query is the last row; compare it against every chunk row.
    scores = cosine_similarity(matrix[-1], matrix[:-1]).flatten()
    if max(scores) < 0.2:
        return "Ask a relevant question.", [], []
    # Indices of the top_k highest scores, best first.
    best = scores.argsort()[-top_k:][::-1]
    picked = [chunks[i] for i in best]
    refs = [f"{file_names[i]} (p.{pages[i]})" for i in best]
    return "\n".join(picked), picked, refs
def download_chat(chat_history):
    """Write the (question, answer) history to a timestamped .txt file.

    Returns the path of the file created in the system temp directory,
    or None when the history is empty/None.
    """
    if not chat_history:
        return None
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    path = os.path.join(tempfile.gettempdir(), f"chat_{stamp}.txt")
    entries = [f"Q: {question}\nA: {answer}\n\n" for question, answer in chat_history]
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("".join(entries))
    return path
# === Main Q&A Logic ===
def answer_question(text_input, pdf_files, chat_history):
    """Answer *text_input* from the uploaded PDFs (TF-IDF retrieval + Groq).

    Args:
        text_input: The user's question.
        pdf_files: Uploaded PDF file objects from gradio, or None.
        chat_history: Running list of [question, answer] pairs, or None.

    Returns:
        ``(answer_text, chat_history, chat_history)`` — the history is
        returned twice to feed both the chatbox display and the gr.State.
    """
    if chat_history is None:
        chat_history = []
    # Reject empty AND whitespace-only questions (the original let "   "
    # through and ran a pointless retrieval/API round-trip).
    if not text_input or not text_input.strip():
        return "β Please type a question.", chat_history, chat_history
    if not pdf_files:
        return "β Please upload PDF files first.", chat_history, chat_history
    chunks, pages, file_names = extract_text_from_pdfs(pdf_files)
    if not chunks:
        return "β Could not extract text from PDFs.", chat_history, chat_history
    context, matched_chunks, references = retrieve_context(text_input, chunks, pages, file_names)
    # Sentinel string returned by retrieve_context when nothing scores >= 0.2.
    if context == "Ask a relevant question.":
        response = "β οΈ Ask a relevant question based on the PDFs."
        chat_history.append([text_input, response])
        return response, chat_history, chat_history
    prompt = f"Answer the question using this context:\n\n{context}\n\nQuestion: {text_input}\n\nAnswer:"
    answer = call_groq_api(prompt)
    full_answer = f"{answer}\n\nπ Sources: {', '.join(references)}"
    chat_history.append([text_input, full_answer])
    return full_answer, chat_history, chat_history
# === Custom CSS ===
# Injected into gr.Blocks below: centers the app, caps its width, and
# normalizes the font across inputs/buttons. Uses gradio theme variables
# so it tracks light/dark mode.
custom_css = """
.gradio-container {
max-width: 900px !important;
margin: auto;
font-family: 'Segoe UI', sans-serif;
}
body {
background-color: var(--background-primary);
color: var(--body-text-color);
}
textarea, input, button {
font-family: 'Segoe UI', sans-serif !important;
}
"""
# === Launch UI ===
# Three-tab Gradio app: upload PDFs, ask questions, export the chat log.
with gr.Blocks(css=custom_css, theme=gr.themes.Base()) as demo:
    gr.Markdown("""
# π§ **SmartPDF Q&A Bot**
_Ask questions from your PDFs. Get answers with page references. Download chat history._
""", elem_id="title")
    # Server-side state: running list of [question, answer] pairs,
    # threaded through answer_question and read by download_chat.
    chat_state = gr.State([])
    with gr.Tabs():
        with gr.Tab("π Upload PDFs"):
            gr.Markdown("### Step 1: Upload one or more PDF documents.")
            pdf_input = gr.File(label="π Upload PDF Files", file_types=[".pdf"], file_count="multiple")
        with gr.Tab("π¬ Ask Questions"):
            gr.Markdown("### Step 2: Ask a question about the uploaded documents.")
            with gr.Row():
                text_input = gr.Textbox(label="β Type your question here", placeholder="e.g. What is the main idea of the first document?", lines=2)
                ask_btn = gr.Button("π Ask")
            answer_output = gr.Textbox(label="π§ Answer", lines=6)
            chatbox = gr.Dataframe(headers=["User", "Bot"], label="π¬ Chat History", interactive=False)
        with gr.Tab("π₯ Export Chat History"):
            gr.Markdown("### Step 3: Download your chat session.")
            download_btn = gr.Button("β¬οΈ Download Chat History")
            # Hidden until a chat file exists; revealed by the .then() below.
            download_file = gr.File(label="π Your Chat File", visible=False)
    # === Button Event Binding ===
    # answer_question returns (answer, history, history): the first feeds the
    # answer textbox, the second the visible chat table, the third gr.State.
    ask_btn.click(
        answer_question,
        inputs=[text_input, pdf_input, chat_state],
        outputs=[answer_output, chatbox, chat_state]
    )
    # download_chat writes the history to a temp file; the chained .then()
    # makes the (initially hidden) file component visible afterwards.
    download_btn.click(
        download_chat,
        inputs=[chat_state],
        outputs=download_file
    ).then(lambda: gr.update(visible=True), None, [download_file])
if __name__ == "__main__":
    # share=True additionally requests a public gradio.live tunnel URL.
    demo.launch(share=True)