# app.py — Historical Document OCR (Hugging Face Space, revision 2e11006)
import os
import json
import gradio as gr
import pandas as pd
from io import StringIO
import PIL.Image
# Ensure you have the new library: pip install google-genai
import google.genai as genai
from google.genai import types
import pdf2image
# Fallback preview image shown before any file is uploaded.
# Expected to be bundled next to app.py (e.g. in the Hugging Face Space repo).
DEFAULT_IMAGE = "default_image.jpg"
def extract_first_image(file_path):
    """Return a displayable image path for *file_path*.

    PDFs have their first page rasterized to a temporary JPEG; plain image
    files pass through untouched. ``None`` falls back to the bundled default
    image.

    Raises:
        gr.Error: If PDF rendering fails or the first page yields no image.
    """
    if file_path is None:
        return DEFAULT_IMAGE
    try:
        if file_path.lower().endswith('.pdf'):
            # Render only page 1 — that is all the UI previews and queries.
            pages = pdf2image.convert_from_path(file_path, first_page=1, last_page=1)
            if not pages:
                raise ValueError("PDF contains no images on the first page.")
            preview_path = "temp_image.jpg"
            pages[0].save(preview_path, 'JPEG')
            return preview_path
    except Exception as e:
        raise gr.Error(f"Error extracting image from PDF: {e}")
    # Not a PDF: assume it is already an image file and use it directly.
    return file_path
def convert_response_to_json(response_text):
    """Parse *response_text* as JSON, returning a stub dict on failure.

    Placeholder kept from an earlier flow; not used by the table pipeline.
    """
    try:
        parsed = json.loads(response_text)
    except json.JSONDecodeError:
        return {"pipe": ""}
    return parsed
def output_string_txt_to_df(string):
    """Parse a pipe-delimited (Markdown-style) table string into a DataFrame.

    Fixes over the naive ``read_csv`` approach:
    * GFM separator rows such as ``|---|---|`` are dropped instead of being
      parsed as a data row of dashes.
    * Leading/trailing pipes are stripped so they do not create empty
      "Unnamed" columns.
    * Header names are whitespace-trimmed.

    Args:
        string: Raw table text, rows separated by newlines, cells by ``|``.

    Returns:
        pandas.DataFrame with the parsed table, or a one-row DataFrame with
        an "Error" column if parsing fails (preserved fallback behavior).
    """
    # NOTE(review): '.' is removed from ALL cells (this also strips decimal
    # points) — kept from the original implementation; confirm it is intended.
    formatted_string = string.replace('.', '')
    rows = []
    for line in formatted_string.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        # Skip GFM header/separator rows (only dashes, pipes, colons, spaces).
        if set(stripped) <= {'-', '|', ' ', ':'}:
            continue
        # Strip edge pipes so they don't become empty columns.
        rows.append(stripped.strip('|'))
    data_io = StringIO('\n'.join(rows))
    try:
        df = pd.read_csv(data_io, sep='|', on_bad_lines='skip')
        df.columns = [str(col).strip() for col in df.columns]
        return df
    except Exception as e:
        return pd.DataFrame([{"Error": str(e)}])
# UNIVERSAL STEP: Generates full Markdown from the image
def generate_full_markdown_from_image(image_path, api_key):
    """Convert the entire document image to a comprehensive markdown string.

    Args:
        image_path: Path to the page image to transcribe.
        api_key: User-supplied Google Gemini API key.

    Returns:
        The model's text response: markdown narrative plus a pipe-delimited
        table section, as mandated by the system prompt below.

    Raises:
        gr.Error: If the image cannot be opened or the API call fails.
    """
    # A fresh client per call, so the user-supplied key is always in effect.
    client = genai.Client(api_key=api_key)
    model_name = "gemini-2.5-pro"
    system_prompt = """You are an expert in extracting and structuring all relevant information from historical documents into comprehensive markdown format, including both narrative text and tabular data. Your primary goal is to produce a single, comprehensive, and highly structured output that makes the document's content easily consumable.
Overall Output Structure:
The output must be a single string containing two main sections:
1. Textual Content: Extracted titles and paragraphs.
2. Tabular Data: A comprehensive, flattened tabular dataset.
Output Format Details:
* For Textual Content:
* Main Title: If present, identify the primary title of the document and format it
* Paragraphs: Extract all significant paragraphs. Each paragraph should be on its own line
* Ensure logical flow for paragraphs, maintaining their original order.
* use Markdown Formating
* For Tabular Data:
* The table must be clearly separated from the textual content (e.g., by a few blank lines).
* Columns must be delimited by pipes (|) and rows by newlines (\\n).
* Ensure no leading or trailing spaces around the pipe delimiters within the table.
* Remember pipes (|) at the start of rows and end of rows
Extraction Rules:
1. Tabular Data - Spanning Rows as Contextual Columns:
* Identify rows that appear to span across all columns (e.g., acting as section titles, categories, or group indicators for subsequent data).
* For each such 'spanning row', extract its content and add it as a new column (named 'Section' or 'Category' - choose whichever fits best, 'Section' is a good default) to all subsequent data rows.
* This new column's value should persist for all rows until another spanning row is encountered. This process effectively flattens hierarchical or grouped data into a single, continuous table, providing clear context for each record.
2. Tabular Data - Primary Headers:
* For tables with multi-level headers, use the most detailed header row (the one containing the maximum number of distinct data columns) as the primary header for your output table.
* Higher-level header information should be integrated into the 'Section' column if it provides a logical grouping, or combined with primary header names if it clarifies the column's meaning.
3. Data Integrity:
* Preserve data types (e.g., numbers, dates) where evident.
* Represent missing or unreadable data as empty cells.
4. Completeness:
* Extract all relevant text and tabular data from the document.
* Integrate all identified tables into the single, comprehensive tabular dataset using the rules above.
"""
    # NOTE: this is types.GenerateContentConfig from the new google-genai SDK
    # (not GenerationConfig from the legacy google-generativeai package).
    generation_config = types.GenerateContentConfig(
        temperature=0.7,
        top_p=0.95,
        top_k=40,
        max_output_tokens=8192,
        system_instruction=system_prompt,
        # "Deep thinking": -1 presumably requests a dynamic, model-chosen
        # thinking budget — TODO confirm against google-genai docs.
        thinking_config=genai.types.ThinkingConfig(
            thinking_budget=-1,
        ),
        response_mime_type="text/plain",
    )
    try:
        image = PIL.Image.open(image_path)
        gr.Info("Converting document to full markdown (with deep thinking)... This may take a moment.")
        # Multimodal call: the image plus an (empty) trailing text part.
        response =client.models.generate_content(model = model_name,
            contents = [image, "\n\n", ""],
            config = generation_config)
        return response.text
    except Exception as e:
        raise gr.Error(f"Error converting document to full markdown with Gemini: {e}")
# Extracts tables FROM a markdown string (pure text processing, no Gemini call)
def extract_tables_from_markdown(markdown_string):
    """Locate the first GitHub Flavored Markdown (GFM) table and parse it.

    Scans line by line for a contiguous block of pipe-delimited rows and
    converts it to a DataFrame via output_string_txt_to_df.

    Args:
        markdown_string: Full markdown text produced for the document.

    Returns:
        pandas.DataFrame with the table contents. Returns an EMPTY DataFrame
        (never a tuple, so the gr.Dataframe output downstream always receives
        a consistent type) when no valid table is found or the parsed table
        is empty; a gr.Warning is shown in those cases.

    Raises:
        gr.Error: If a located table block cannot be converted to a DataFrame.
    """
    lines = markdown_string.strip().split('\n')
    table_lines = []
    in_potential_table = False
    # Simple state machine: collect consecutive pipe rows; a candidate block
    # counts as a table once it has a header plus a separator row, and ends
    # at the first blank line (or at end of input).
    for line in lines:
        stripped_line = line.strip()
        if stripped_line.startswith('|'):
            table_lines.append(line)
            in_potential_table = True
        elif in_potential_table and stripped_line and all(c in ('-', '|', ' ') for c in stripped_line) and '|' in stripped_line:
            # Separator row that doesn't start with a pipe (e.g. ---|---).
            table_lines.append(line)
        elif in_potential_table and not stripped_line:
            # Blank line: accept the block only if it has a header and at
            # least one separator row after it; otherwise reset and keep going.
            if len(table_lines) > 1 and any(all(c in ('-', '|', ' ') for c in l) for l in table_lines[1:]):
                break  # First complete table block found.
            table_lines = []
            in_potential_table = False
        elif in_potential_table:
            # Non-pipe, non-blank line: the collected block was not a table.
            table_lines = []
            in_potential_table = False
    if not table_lines:
        gr.Warning("No valid GitHub Flavored Markdown (GFM) tables were identified in the document's content.")
        return pd.DataFrame()  # Fixed: previously returned a ("", None) tuple.
    markdown_table_string = "\n".join(table_lines)
    try:
        df = output_string_txt_to_df(markdown_table_string)
        if df.empty:
            gr.Warning("Identified Markdown table was empty after parsing.")
            return pd.DataFrame()
        return df
    except gr.Error:
        # Propagate errors raised by the parsing helper unchanged.
        raise
    except Exception as e:
        raise gr.Error(f"Error converting identified Markdown table to DataFrame: {e}")
# Answers specific queries about the document using Gemini AI (on the original image)
def query_document(image_path, query, api_key):
    """Answer a specific free-text question about the document image.

    Args:
        image_path: Path to the page image to query against.
        query: The user's question; an empty query short-circuits with a
            warning and an empty string.
        api_key: User-supplied Google Gemini API key.

    Returns:
        The model's text answer, or "" when no query was provided.

    Raises:
        gr.Error: If the image cannot be opened or the API call fails.
    """
    if not query:
        gr.Warning("Please enter a query to get a response.")
        return ""  # Nothing to ask — keep the output cleared.
    # A fresh client per call, so the user-supplied key is always in effect.
    client = genai.Client(api_key=api_key)
    model_name = "gemini-2.5-pro"
    # NOTE(review): this prompt is built but never passed to the request —
    # the query is only sent inside `contents` below; confirm intent.
    system_prompt = f"Answer the following question based on the content of the historical document: '{query}'. Provide a concise and accurate answer."
    # types.GenerateContentConfig from the new google-genai SDK.
    generation_config = types.GenerateContentConfig(
        temperature=0.5,
        top_p=0.95,
        top_k=40,
        max_output_tokens=8192,
        # "Deep thinking": -1 presumably requests a dynamic, model-chosen
        # thinking budget — TODO confirm against google-genai docs.
        thinking_config=genai.types.ThinkingConfig(
            thinking_budget=-1,
        ),
        response_mime_type="text/plain",
    )
    try:
        image = PIL.Image.open(image_path)
        gr.Info(f"Processing query: '{query}' (with deep thinking)...")
        # Multimodal call: original image plus the user's question as text.
        response =client.models.generate_content(model = model_name,
            contents = [image, f"\n\nUser query: {query}"],
            config = generation_config)
        return response.text
    except Exception as e:
        raise gr.Error(f"Error processing query with Gemini: {e}")
# --- Orchestrates the full pipeline for one uploaded file ---
def process_file(file, api_key, processing_mode, query_text):
    """Run the selected processing mode and return values for the 3 outputs.

    Args:
        file: Uploaded file path (PDF or image), or None.
        api_key: Google Gemini API key (required).
        processing_mode: One of the two radio options.
        query_text: Free-text question (used only in "Specific Query" mode).

    Returns:
        Tuple of (dataframe, markdown, query_response); outputs unused by the
        selected mode are returned in their cleared state.

    Raises:
        gr.Error: On missing API key, extraction/model failures, or an
            unrecognized processing mode.
    """
    # Cleared states. An empty DataFrame (not "") keeps this consistent with
    # the clear value used by the mode-change handler and with gr.Dataframe.
    cleared_df = pd.DataFrame()
    cleared_markdown = ""
    cleared_query_response = ""
    # --- Step 1: Validate API Key ---
    if not api_key:
        raise gr.Error("Please enter your Google API key to proceed.")
    # --- Step 2: Extract First Image ---
    try:
        image_path = extract_first_image(file)
    except gr.Error:
        raise  # Already user-facing; re-raise for display.
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during image extraction: {e}")
    # --- Step 3: Convert Image to Full Markdown (UNIVERSAL STEP) ---
    try:
        full_document_markdown = generate_full_markdown_from_image(image_path, api_key)
    except gr.Error:
        raise
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during initial markdown conversion: {e}")
    # --- Step 4: Branch based on Processing Mode ---
    try:
        if processing_mode == "Generate Markdown & Extract Tables":
            df_result = extract_tables_from_markdown(full_document_markdown)
            # Defensive: older helper versions signalled "no table" with a
            # tuple; normalize anything non-DataFrame to the cleared state.
            if not isinstance(df_result, pd.DataFrame):
                df_result = cleared_df
            # This mode shows BOTH the full markdown AND the extracted table.
            return df_result, full_document_markdown, cleared_query_response
        elif processing_mode == "Specific Query":
            # Query multimodally against the original image.
            query_result = query_document(image_path, query_text, api_key)
            return cleared_df, cleared_markdown, query_result
        else:
            # Fixed: previously fell through and implicitly returned None.
            raise gr.Error(f"Unknown processing mode: {processing_mode!r}")
    except gr.Error:
        raise
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during the selected processing mode: {e}")
# Create the Gradio interface: two-column layout, inputs left, results right.
with gr.Blocks(title="Historical Document OCR") as app:
    gr.Markdown("# Historical Document OCR")
    gr.Markdown("This app processes documents by generating full markdown, extracting tables, or answering specific queries.")
    with gr.Row():
        # Left column: API key, upload, mode selection, and image preview.
        with gr.Column():
            api_key_input = gr.Textbox(label="Google API Key", placeholder="Enter your Gemini API key here", type="password")
            file_input = gr.File(label="Upload PDF or image")
            # Determines which branch process_file() takes.
            processing_mode_radio = gr.Radio(
                ["Generate Markdown & Extract Tables", "Specific Query"],
                label="Choose Processing Mode",
                value="Generate Markdown & Extract Tables"  # Default selection
            )
            # Starts disabled; enabled by the radio-change handler when
            # "Specific Query" mode is selected.
            query_input = gr.Textbox(label="Specific Query (if 'Specific Query' mode selected)", placeholder="e.g., What is the total revenue in 1850?", interactive=False)
            process_button = gr.Button("Process File")
            image_display = gr.Image(value=DEFAULT_IMAGE, label="Uploaded Image")
        # Right column: extracted table, CSV export, markdown, query answer.
        with gr.Column():
            output_df = gr.Dataframe(label="Extracted Table",interactive=True,
                show_copy_button=True,
                show_fullscreen_button=True)
            # Triggers a CSV export of the current (possibly edited) table.
            process_dataframe_changes_button = gr.Button("Process dataframe changes")
            output_csv = gr.File(label="Download CSV")
            output_markdown = gr.Markdown(label="Document in Markdown")
            output_query_response = gr.Markdown(label="Query Response")
def update_image(file):
return extract_first_image(file) if file else DEFAULT_IMAGE
def update_query_input_interactivity(mode):
# Clear all outputs when the mode changes to provide a clean slate
return gr.update(interactive=(mode == "Specific Query")), pd.DataFrame(), "", ""
def export_csv(df_to_export): # Changed parameter name for clarity
if df_to_export is None or df_to_export.empty:
gr.Warning("No data to export.")
return None # Return None to clear the download component
csv_file_path = "output.csv"
df_to_export.to_csv(csv_file_path, index=False)
return gr.update(value=csv_file_path, visible=True)
    # Update the preview whenever a new file is uploaded (first page of PDFs).
    file_input.change(fn=update_image, inputs=[file_input], outputs=image_display)
    # The current DataFrame (including any user edits) is passed as input so
    # the exported CSV reflects exactly what is on screen.
    process_dataframe_changes_button.click(
        fn=export_csv,
        inputs=[output_df],
        outputs=output_csv
    )
    # Mode change: toggle query-box interactivity and clear all outputs.
    processing_mode_radio.change(
        fn=update_query_input_interactivity,
        inputs=[processing_mode_radio],
        outputs=[query_input, output_df, output_markdown, output_query_response]
    )
    # Main entry point: run the selected processing mode on the uploaded file.
    process_button.click(
        fn=process_file,
        inputs=[file_input, api_key_input, processing_mode_radio, query_input],
        outputs=[output_df, output_markdown, output_query_response]
    )

if __name__ == "__main__":
    app.launch()