# Hugging Face Spaces page residue ("Spaces: Sleeping") — scrape artifact, not code.
| import fitz | |
| import faiss | |
| import torch | |
| import requests | |
| import json | |
| import time | |
| import gradio as gr | |
| import datetime | |
| import os | |
| from sentence_transformers import SentenceTransformer | |
# Embedding model shared by documents and queries (multilingual E5,
# instruction-tuned).  Loaded once at import time.
embed_model = SentenceTransformer("intfloat/multilingual-e5-large-instruct")

# Module-level mutable state shared across the Gradio callbacks below:
chunks = []              # list[str]: word-chunks of the currently loaded PDF
index = None             # FAISS inner-product index over chunk embeddings, or None
qa_history = []          # list[tuple[str, str]]: (question, answer) pairs
uploaded_filename = ""   # name of the last uploaded PDF file
def split_into_chunks(text, chunk_size=512, overlap=64):
    """Split *text* into whitespace-token chunks of at most ``chunk_size`` words.

    Consecutive chunks share ``overlap`` words so sentences straddling a
    chunk boundary remain retrievable from at least one chunk.

    Args:
        text: Raw document text.
        chunk_size: Maximum number of words per chunk.
        overlap: Words repeated between consecutive chunks; must be smaller
            than ``chunk_size``.

    Returns:
        List of space-joined chunks (empty for empty/whitespace-only text).

    Raises:
        ValueError: If ``overlap >= chunk_size`` — the stride would be zero
            (range() raises a cryptic error) or negative (silently yields
            no chunks), both of which hide caller bugs.
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), step)]
def get_embeddings(texts):
    """Encode *texts* with the E5 ``query:`` prompt, L2-normalized.

    Normalized embeddings make FAISS inner-product search equivalent to
    cosine similarity.

    NOTE(review): the same ``query:`` prefix is used for both documents and
    queries; E5 models conventionally prefix documents with ``passage:`` —
    confirm this is intentional before changing retrieval behaviour.
    """
    prefixed = [f"query: {text}" for text in texts]
    return embed_model.encode(prefixed, normalize_embeddings=True)
def ask_question_stream(query, history):
    """Stream an answer to *query*, grounded in the currently indexed PDF.

    Retrieves the 4 most similar chunks from the FAISS index, builds a
    context-restricted prompt, queries the OpenRouter chat-completions API,
    and yields the answer word by word to simulate streaming in the chat UI.

    Args:
        query: The user's question.
        history: Chat history supplied by gr.ChatInterface (unused).

    Yields:
        Progressively longer prefixes of the answer, or an error message.
    """
    # NOTE(review): the leading "β" in user-facing strings looks like mojibake
    # of an emoji; kept byte-identical to avoid changing UI text.
    if index is None:
        yield "β Please upload and process a PDF first."
        return
    query_vec = get_embeddings([query])[0].reshape(1, -1)
    _, I = index.search(query_vec, 4)
    context = "\n".join(chunks[i] for i in I[0])
    prompt = f"""Answer the question using only the below context.
Context:
{context}
Question: {query}
Answer:"""
    headers = {
        "Authorization": f"Bearer {os.getenv('OPENROUTER_API_KEY')}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "deepseek/deepseek-chat-v3-0324:free",
        "messages": [{"role": "user", "content": prompt}],
    }
    try:
        # json= serializes the payload in one step; the timeout keeps the UI
        # from hanging forever on a stalled request (there was none before).
        res = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        # Surface HTTP errors explicitly instead of the confusing KeyError
        # that indexing an error body would otherwise raise below.
        res.raise_for_status()
        response = res.json()["choices"][0]["message"]["content"]
        qa_history.append((query, response))
        words = response.strip().split()
        for i in range(len(words)):
            yield " ".join(words[:i + 1])
            time.sleep(0.02)  # small delay to make the "streaming" visible
    except Exception as e:
        yield f"β Error: {str(e)}"
def process_pdf(pdf_file):
    """Extract text from the uploaded PDF, chunk it, and build a FAISS index.

    Populates the module-level ``chunks``, ``index`` and ``uploaded_filename``
    that ask_question_stream reads.

    Args:
        pdf_file: Gradio file object (exposes a ``.name`` path), or None.

    Returns:
        A human-readable status string for the UI.
    """
    global chunks, index, uploaded_filename
    if pdf_file is None:
        return "β No file selected."
    # basename handles both "/" and OS-specific separators (split("/") did not).
    uploaded_filename = os.path.basename(pdf_file.name)
    # Context manager closes the document instead of leaking the handle.
    with fitz.open(pdf_file.name) as doc:
        full_text = "\n".join(page.get_text() for page in doc)
    chunks = split_into_chunks(full_text)
    # Bail out before embedding: the original only detected empty input after
    # running the (expensive) encoder via embeddings.any().
    if not chunks:
        return "β No text extracted."
    embeddings = get_embeddings(chunks)
    if not embeddings.any():
        return "β No text extracted."
    dim = embeddings[0].shape[0]
    index = faiss.IndexFlatIP(dim)  # inner product == cosine (embeddings are normalized)
    index.add(embeddings)
    return "β Processed. Ready for Q&A."
| def clear_cache(): | |
| global chunks, index, qa_history, uploaded_filename | |
| chunks, index, qa_history, uploaded_filename = [], None, [], "" | |
| return "ποΈ Cache cleared." | |
def export_history():
    """Write the accumulated Q&A pairs to a timestamped text file.

    Returns:
        The created filename (in the working directory), or None when there
        is no history to export.
    """
    if not qa_history:
        return None
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    out_name = f"qa_history_{stamp}.txt"
    with open(out_name, "w", encoding="utf-8") as out:
        out.writelines(
            f"Q: {question}\nA: {answer}\n\n" for question, answer in qa_history
        )
    return out_name
# CSS for the dismissable warning banner shown above the app
# (#popup-alert and its inline close button).
custom_css = """
#popup-alert {
background-color: #fef3c7;
color: #92400e;
padding: 12px 20px;
border-radius: 8px;
border: 1px solid #fcd34d;
font-weight: bold;
position: relative;
margin-bottom: 12px;
}
#popup-alert button {
position: absolute;
top: 4px;
right: 8px;
background: none;
color: #92400e;
border: none;
font-size: 18px;
cursor: pointer;
}
"""
# --- Gradio UI -------------------------------------------------------------
# Layout: a header row with a dismissable banner, then two columns —
# left: upload / status / cache / export controls; right: the chat interface.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as app:
    with gr.Row():
        # Banner's close button hides it via inline JS (no Python callback).
        gr.HTML(
            """<div style='text-align:center'>
<h2>π€ Chat with Your Research Paper</h2>
<div id='popup-alert' style="display: inline-block;">
β οΈ Please click βClear Cacheβ before uploading a new PDF.
<button onclick="this.parentElement.style.display='none';">×</button>
</div>
</div>"""
        )
    with gr.Row(equal_height=False):
        with gr.Column(scale=1, min_width=250):
            pdf_upload = gr.File(label="π Upload PDF", file_types=[".pdf"])
            upload_status = gr.Textbox(label="Status", interactive=False)
            clear_button = gr.Button("π§Ή Clear Cache")
            export_button = gr.Button("π€ Export Q&A History")
            # Hidden until export_history produces a file to download.
            download_box = gr.File(visible=False)
            # Callback wiring: upload re-indexes, clear resets module state,
            # export writes a file and the change handler reveals the widget.
            pdf_upload.change(fn=process_pdf, inputs=pdf_upload, outputs=upload_status)
            clear_button.click(fn=clear_cache, outputs=upload_status)
            export_button.click(fn=export_history, inputs=[], outputs=download_box)
            download_box.change(lambda x: gr.update(visible=True) if x else gr.update(visible=False), inputs=download_box, outputs=download_box)
        with gr.Column(scale=4, min_width=600):
            # Streaming chat over the indexed document (generator callback).
            gr.ChatInterface(
                fn=ask_question_stream,
                chatbot=gr.Chatbot(label="π PDF Chatbot", show_copy_button=True),
                textbox=gr.Textbox(placeholder="Ask about the uploaded paper...", container=False, scale=7),
                examples=["What is the conclusion?", "Who are the authors?", "What are the key findings?"]
            )
app.launch()