Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import time | |
| import re | |
| import logging | |
| from datetime import datetime | |
| import gradio as gr | |
| import google.generativeai as genai | |
| from PyPDF2 import PdfReader | |
| from tika import parser | |
# Logging setup: every record is sent both to a stream handler and to LOG_FILE.
LOG_FILE = "pdf_processor_log.txt"

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(), logging.FileHandler(LOG_FILE)],
)

# Named logger used by the functions in this module.
logger = logging.getLogger("pdf_processor")
# Unstructured.io is optional: record whether its PDF partitioner could be
# imported so later code can decide whether to offer that extraction method.
try:
    from unstructured.partition.pdf import partition_pdf
except ImportError:
    UNSTRUCTURED_AVAILABLE = False
    logger.warning("unstructured.partition.pdf not available; skipping that method")
else:
    UNSTRUCTURED_AVAILABLE = True
# The Gemini API key comes from the environment (set it in your Space Secrets).
# The client is configured only when a key is present; otherwise a warning is logged.
API_KEY = os.environ.get("GOOGLE_API_KEY")
if not API_KEY:
    logger.warning("GOOGLE_API_KEY not set in environment.")
else:
    genai.configure(api_key=API_KEY)

# Module-level state shared by the handlers below.
EXTRACTED_TEXT = ""       # combined text of the most recently processed PDF
PDF_SECTIONS = []         # section dicts of the most recently processed PDF
EXTRACTION_METHOD = ""    # name of the extraction method that produced them
| # --- Extraction Functions --- | |
def extract_text_with_unstructured(pdf_path):
    """Extract titled sections from a PDF using ``partition_pdf``.

    Elements are walked in order. A non-empty element text shorter than 80
    characters that is all upper-case, ends with ``:``, or starts with a
    number followed by whitespace is treated as a heading and starts a new
    section; any other text is appended to the current section. Text seen
    before the first heading goes into a section titled "Introduction".
    Sections with no content are not emitted.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts (possibly empty).

    Raises:
        Exception: Any error raised during extraction is logged with its
            traceback and re-raised unchanged.
    """
    try:
        logger.info("Extracting via Unstructured.io...")
        elements = partition_pdf(filename=pdf_path, extract_images_in_pdf=False)

        sections = []
        current = {"title": "Introduction", "content": ""}
        for element in elements:
            if not hasattr(element, "text"):
                continue
            text = element.text.strip()
            if not text:
                continue

            is_heading = len(text) < 80 and (
                text.isupper()
                or text.endswith(":")
                or re.match(r"^[0-9]+\.?\s+", text)
            )
            if not is_heading:
                current["content"] += text + "\n\n"
                continue

            # A heading closes the running section (only kept if it has text).
            if current["content"]:
                sections.append(current)
            current = {"title": text, "content": ""}

        if current["content"]:
            sections.append(current)
        return sections
    except Exception as exc:
        # Bubble up so process_pdf can catch & log
        logger.error(f"Unstructured extraction error: {exc}", exc_info=True)
        raise
def extract_text_with_pypdf(pdf_path):
    """Extract titled sections from a PDF using PyPDF2's ``PdfReader``.

    The text of every page that yields non-blank text is concatenated, each
    page prefixed with a ``--- Page N ---`` marker, and the result is split on
    lines that look like headings (an all-caps line, optionally ending in
    ``:``, or a numbered line such as ``1. Title``).

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts:

        * ``[]`` when no page produced any text, so the caller can fall back
          to another extraction method instead of receiving an empty section;
        * a single "Document" section when no heading was detected;
        * otherwise one section per heading, preceded by an "Introduction"
          section holding any real text found before the first heading.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)

    page_chunks = []
    for page_no, page in enumerate(reader.pages, start=1):
        txt = page.extract_text()
        # Skip pages with no text or whitespace only (e.g. scanned images).
        if txt and txt.strip():
            page_chunks.append(f"\n\n--- Page {page_no} ---\n\n{txt}")
    if not page_chunks:
        return []
    full_text = "".join(page_chunks)

    # With one capturing group, re.split returns
    # [preamble, title1, body1, title2, body2, ...].
    parts = re.split(r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n", full_text)
    if len(parts) == 1:
        return [{"title": "Document", "content": full_text}]

    sections = []
    preamble = parts[0].strip()
    # Keep text that precedes the first heading, unless it is nothing but the
    # page markers inserted above.
    if re.sub(r"--- Page [0-9]+ ---", "", preamble).strip():
        sections.append({"title": "Introduction", "content": preamble})
    sections.extend(
        {"title": parts[i].strip(), "content": parts[i + 1].strip()}
        for i in range(1, len(parts), 2)
    )
    return sections
def extract_text_with_tika(pdf_path):
    """Extract titled sections from a PDF using Tika's ``parser.from_file``.

    The parsed ``"content"`` (treated as empty when missing or ``None``) is
    processed line by line; blank lines are ignored. A line shorter than 80
    characters that is all upper-case, ends with ``:``, or starts with a
    number, whitespace and a capital letter starts a new section. All other
    lines are appended to the current section. Text before the first heading
    goes into a section titled "Introduction". Sections with no content are
    not emitted.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts (possibly empty).
    """
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    raw_text = parsed.get("content") or ""

    sections = []
    current = {"title": "Introduction", "content": ""}
    for raw_line in raw_text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue

        is_heading = len(line) < 80 and (
            line.isupper()
            or line.endswith(":")
            or re.match(r"^[0-9]+\.?\s+[A-Z]", line)
        )
        if is_heading:
            # Close the running section, keeping it only if it has text.
            if current["content"]:
                sections.append(current)
            current = {"title": line, "content": ""}
        else:
            current["content"] += line + "\n\n"

    if current["content"]:
        sections.append(current)
    return sections
| # --- Gemini calls --- | |
def generate_greg_brockman_summary(content):
    """Ask the Gemini model for a template-structured summary of ``content``.

    Args:
        content: Document text embedded at the end of the prompt.

    Returns:
        ``(summary_text, None)`` on success, or ``(None, error_message)`` when
        generating the content or reading the response text raises.
    """
    model = genai.GenerativeModel("gemini-1.5-pro")
    prompt = f"""
You are an expert document analyst specializing in proposal evaluation.
# GREG BROCKMAN TEMPLATE STRUCTURE
1. GOAL: ...
(rest of template) ...
CONTENT:
{content}
"""
    try:
        response = model.generate_content(prompt)
        summary_text = response.text
    except Exception as exc:
        logger.error(f"Summary error: {exc}")
        return None, str(exc)
    return summary_text, None
def answer_question_about_pdf(content, question):
    """Ask the Gemini model to answer ``question`` about ``content``.

    Args:
        content: Document text embedded in the prompt.
        question: The user's question, appended after the document text.

    Returns:
        ``(answer_text, None)`` on success, or ``(None, error_message)`` when
        generating the content or reading the response text raises.
    """
    model = genai.GenerativeModel("gemini-1.5-pro")
    prompt = f"""
You are a precise document analysis assistant.
DOCUMENT CONTENT:
{content}
QUESTION: {question}
"""
    try:
        response = model.generate_content(prompt)
        answer_text = response.text
    except Exception as exc:
        logger.error(f"Q&A error: {exc}")
        return None, str(exc)
    return answer_text, None
| # --- Handlers --- | |
def _save_upload_to_temp(data):
    """Write uploaded bytes to a fresh, uniquely named temp ``.pdf`` and return its path."""
    fd, tmp_path = tempfile.mkstemp(suffix=".pdf", prefix="upload_")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
    except Exception:
        os.remove(tmp_path)
        raise
    return tmp_path


def _resolve_upload_path(pdf_file):
    """Return ``(path, is_temp)`` for an upload, or ``(None, False)`` if its type is unrecognized."""
    # Case 1: in-memory object exposing .name and .data
    if hasattr(pdf_file, "name") and hasattr(pdf_file, "data"):
        return _save_upload_to_temp(pdf_file.data), True
    # Case 2: direct filepath (str)
    if isinstance(pdf_file, str):
        return pdf_file, False
    # Case 3: file-like with .read(). The bytes are copied to a *new* file:
    # reusing the object's own name as the destination could reopen (and
    # truncate) the very file being read.
    if hasattr(pdf_file, "read"):
        return _save_upload_to_temp(pdf_file.read()), True
    return None, False


def _combine_sections(sections, limit=30000):
    """Build ``(combined_text, structure)`` from sections, keeping the text near ``limit`` characters."""
    pieces, outline = [], []
    used = 0
    for i, sec in enumerate(sections, 1):
        title = sec["title"]
        outline.append(f"{i}. {title}\n")
        header = f"## {title}\n"
        piece = f"{header}{sec['content']}\n\n"
        if used + len(piece) >= limit:
            # Keep whatever part of this section still fits instead of
            # dropping the whole section, then mark it as truncated.
            room = limit - used - len(header) - len("\n[Truncated]\n\n")
            kept = sec["content"][:room] if room > 0 else ""
            if kept:
                piece = f"{header}{kept}\n[Truncated]\n\n"
            else:
                piece = f"{header}[Truncated]\n\n"
        pieces.append(piece)
        used += len(piece)
    return "".join(pieces), "".join(outline)


def process_pdf(pdf_file, progress=gr.Progress()):
    """Extract text from an uploaded PDF and summarize it with Gemini.

    Extraction methods are tried in order (Unstructured when available, then
    PyPDF2, then Tika) until one returns a non-empty list of sections. The
    sections are combined into one text (sections that do not fit the size
    budget are cut and marked ``[Truncated]``), stored in the module-level
    ``EXTRACTED_TEXT`` / ``PDF_SECTIONS`` / ``EXTRACTION_METHOD``, and sent to
    ``generate_greg_brockman_summary``.

    Args:
        pdf_file: The upload: a filepath string, an object with ``.name`` and
            ``.data``, or a file-like object with ``.read()``.
        progress: Gradio progress tracker (accepted but not used here).

    Returns:
        A 4-tuple ``(summary, structure, status_message, log_info)``.
        ``summary`` is ``None`` on any failure; ``structure`` is ``None``
        when no sections could be extracted.
    """
    global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD

    if not API_KEY:
        return None, None, "β Set GOOGLE_API_KEY in Secrets.", ""
    if pdf_file is None:
        return None, None, "β No file uploaded.", ""

    # Determine path & write bytes if needed
    path, is_temp = _resolve_upload_path(pdf_file)
    if path is None:
        return None, None, "β Unrecognized upload type", ""

    # Try methods in order
    methods = []
    if UNSTRUCTURED_AVAILABLE:
        methods.append(("unstructured", extract_text_with_unstructured))
    methods += [
        ("pypdf", extract_text_with_pypdf),
        ("tika", extract_text_with_tika),
    ]

    sections = None
    last_err = ""
    try:
        for name, fn in methods:
            try:
                secs = fn(path)
            except Exception as e:
                last_err = f"{name} failed: {e}"
                logger.warning(last_err)
                continue
            if secs:
                sections = secs
                EXTRACTION_METHOD = name
                break
    finally:
        # Remove the temporary copy made for this upload (best effort).
        if is_temp:
            try:
                os.remove(path)
            except OSError as e:
                logger.warning(f"Could not remove temporary file {path}: {e}")

    if not sections:
        return (
            None,
            None,
            "β Extraction failed",
            last_err or "No extraction method returned any text.",
        )

    # Combine & summarize
    combined, structure = _combine_sections(sections)
    EXTRACTED_TEXT = combined
    PDF_SECTIONS = sections

    summary, err = generate_greg_brockman_summary(combined)
    if err:
        return None, structure, f"β {err}", combined
    return summary, structure, "β PDF processed", f"Used {EXTRACTION_METHOD}"
def ask_question(question):
    """Answer a question about the most recently processed PDF.

    Args:
        question: The user's question; ``None``, empty and whitespace-only
            values are all rejected with a prompt to enter a question.

    Returns:
        The model's answer, or a status message when the API key is missing,
        no PDF has been processed yet, the question is blank, or the model
        call reported an error.
    """
    if not API_KEY:
        return "β Set GOOGLE_API_KEY in Secrets."
    if not EXTRACTED_TEXT:
        return "β Process a PDF first."
    # Guard against None as well as blank input before calling .strip().
    if not question or not question.strip():
        return "β Enter a question."
    ans, err = answer_question_about_pdf(EXTRACTED_TEXT, question)
    return ans if not err else f"β {err}"
def view_log():
    """Return the full contents of ``LOG_FILE``.

    Returns:
        The log text, or ``"Error reading log: <reason>"`` if the file cannot
        be read.
    """
    try:
        # The context manager closes the handle even if read() fails.
        with open(LOG_FILE) as f:
            return f.read()
    except Exception as e:
        return f"Error reading log: {e}"
def save_summary(summary):
    """Write ``summary`` to a timestamped text file in the working directory.

    Args:
        summary: Text to save; a falsy value saves nothing.

    Returns:
        A status message naming the file written, or saying there was
        nothing to save.
    """
    if summary:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        out_name = f"summary_{stamp}.txt"
        with open(out_name, "w", encoding="utf-8") as out:
            out.write(summary)
        return f"β Saved to {out_name}"
    return "β No summary to save."
def save_qa(question, answer):
    """Write a question/answer pair to a timestamped text file in the working directory.

    Args:
        question: The question text.
        answer: The answer text.

    Returns:
        A status message naming the file written, or saying there was
        nothing to save when either argument is falsy.
    """
    if question and answer:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        out_name = f"qa_{stamp}.txt"
        with open(out_name, "w", encoding="utf-8") as out:
            out.write(f"Q: {question}\n\nA: {answer}")
        return f"β Saved to {out_name}"
    return "β Nothing to save."
| # --- Gradio UI --- | |
# NOTE(review): the original indentation of this layout was lost; the nesting
# below (file input and button inside the Row, the remaining widgets directly
# in their tab) is assumed — confirm against the deployed app.
with gr.Blocks(title="PDF Analyzer with Gemini API") as app:
    gr.Markdown("# π PDF Analyzer with Gemini API")
    gr.Markdown("Upload a PDF, get a Greg Brockman style summary, and ask questions.")

    # Tab 1: reminder about the required secret.
    with gr.Tab("Setup"):
        gr.Markdown("β οΈ Make sure `GOOGLE_API_KEY` is set in your Space's Secrets.")

    # Tab 2: upload, process and save the summary.
    with gr.Tab("PDF Processing"):
        with gr.Row():
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            proc_btn = gr.Button("Process PDF", variant="primary")
        status = gr.Markdown("Awaiting uploadβ¦")
        summary_out = gr.Textbox(label="Summary", lines=15)
        structure_out = gr.Textbox(label="Structure", lines=8)
        log_info = gr.Textbox(label="Internal Log", lines=5)
        # process_pdf returns (summary, structure, status, log info) in this order.
        proc_btn.click(
            fn=process_pdf,
            inputs=[pdf_file],
            outputs=[summary_out, structure_out, status, log_info],
        )

        save_sum_btn = gr.Button("Save Summary")
        save_sum_status = gr.Markdown()
        save_sum_btn.click(
            fn=save_summary,
            inputs=[summary_out],
            outputs=[save_sum_status],
        )

    # Tab 3: question answering over the processed PDF.
    with gr.Tab("Ask Questions"):
        question_in = gr.Textbox(label="Your Question", lines=2)
        ask_btn = gr.Button("Ask", variant="primary")
        answer_out = gr.Textbox(label="Answer", lines=10)
        ask_btn.click(
            fn=ask_question,
            inputs=[question_in],
            outputs=[answer_out],
        )

        save_qa_btn = gr.Button("Save Q&A")
        save_qa_status = gr.Markdown()
        save_qa_btn.click(
            fn=save_qa,
            inputs=[question_in, answer_out],
            outputs=[save_qa_status],
        )

    # Tab 4: on-demand view of the log file.
    with gr.Tab("System Log"):
        refresh_btn = gr.Button("Refresh Log")
        sys_log = gr.Textbox(label="System Log", lines=20)
        refresh_btn.click(fn=view_log, inputs=None, outputs=[sys_log])

# Start the server on all interfaces when run as a script.
if __name__ == "__main__":
    app.launch(server_name="0.0.0.0")