Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import gradio as gr | |
| import pandas as pd | |
| from io import StringIO | |
| import PIL.Image | |
| # Ensure you have the new library: pip install google-genai | |
| import google.genai as genai | |
| from google.genai import types | |
| import pdf2image | |
# Default image options (uploaded to Hugging Face Space)
# Fallback preview shown when the user has not uploaded a file;
# the file is expected to sit next to app.py in the Space repo.
DEFAULT_IMAGE = "default_image.jpg"
def extract_first_image(file_path):
    """Return a path to a displayable image for *file_path*.

    - ``None``          -> the bundled ``DEFAULT_IMAGE`` placeholder
    - ``*.pdf``         -> renders page 1 to a temporary JPEG and returns its path
    - anything else     -> assumed to already be an image; returned unchanged

    Raises:
        gr.Error: if the PDF cannot be rendered (missing poppler, corrupt
            file, empty first page, ...).
    """
    if file_path is None:
        return DEFAULT_IMAGE
    try:
        if file_path.lower().endswith('.pdf'):
            # pdf2image renders pages via poppler; only page 1 is needed.
            pages = pdf2image.convert_from_path(file_path, first_page=1, last_page=1)
            if not pages:
                raise ValueError("PDF contains no images on the first page.")
            # Unique temp file instead of a fixed "temp_image.jpg", so
            # concurrent requests in the same Space cannot clobber each other.
            import tempfile
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
                image_path = tmp.name
            pages[0].save(image_path, 'JPEG')
            return image_path
    except Exception as e:
        raise gr.Error(f"Error extracting image from PDF: {e}")
    return file_path  # Already an image file: hand it back untouched.
def convert_response_to_json(response_text):
    """Best-effort JSON parse of *response_text*.

    Placeholder kept from an earlier flow; malformed input falls back to
    ``{"pipe": ""}`` instead of raising.
    """
    try:
        parsed = json.loads(response_text)
    except json.JSONDecodeError:
        return {"pipe": ""}
    return parsed
def output_string_txt_to_df(string):
    """Parse a pipe-delimited (Markdown-style) table string into a DataFrame.

    GFM header/body separator rows (e.g. ``|---|---|``) are dropped so they
    do not appear as data rows.  On any parse failure a one-row DataFrame
    carrying the error text is returned instead of raising.
    """
    # NOTE(review): strips every '.', including decimal points in numbers —
    # presumably to clean OCR artifacts; confirm this is intended.
    formatted_string = string.replace('.', '')
    # Filter out GFM separator rows before handing the text to pandas.
    kept_lines = [
        line for line in formatted_string.splitlines()
        if not (line.strip()
                and set(line.strip()) <= set('-|: ')
                and '-' in line)
    ]
    data_io = StringIO("\n".join(kept_lines))
    try:
        df = pd.read_csv(data_io, sep='|', on_bad_lines='skip')
        return df
    except Exception as e:
        return pd.DataFrame([{"Error": str(e)}])
# UNIVERSAL STEP: Generates full Markdown from the image
def generate_full_markdown_from_image(image_path, api_key):
    """Converts the entire document image to a comprehensive markdown string using Gemini AI.

    Args:
        image_path: Path to the image rendered from the uploaded document.
        api_key: Google Gemini API key supplied by the user.

    Returns:
        The model's markdown response text (narrative text plus one
        pipe-delimited table, per the system prompt below).

    Raises:
        gr.Error: if the image cannot be opened or the API call fails.
    """
    # google-genai (new SDK): the API key is passed per-client, not via a
    # global configure() call.
    client = genai.Client(api_key=api_key)
    model_name = "gemini-2.5-pro"
    system_prompt = """You are an expert in extracting and structuring all relevant information from historical documents into comprehensive markdown format, including both narrative text and tabular data. Your primary goal is to produce a single, comprehensive, and highly structured output that makes the document's content easily consumable.
Overall Output Structure:
The output must be a single string containing two main sections:
1. Textual Content: Extracted titles and paragraphs.
2. Tabular Data: A comprehensive, flattened tabular dataset.
Output Format Details:
* For Textual Content:
* Main Title: If present, identify the primary title of the document and format it
* Paragraphs: Extract all significant paragraphs. Each paragraph should be on its own line
* Ensure logical flow for paragraphs, maintaining their original order.
* use Markdown Formating
* For Tabular Data:
* The table must be clearly separated from the textual content (e.g., by a few blank lines).
* Columns must be delimited by pipes (|) and rows by newlines (\\n).
* Ensure no leading or trailing spaces around the pipe delimiters within the table.
* Remember pipes (|) at the start of rows and end of rows
Extraction Rules:
1. Tabular Data - Spanning Rows as Contextual Columns:
* Identify rows that appear to span across all columns (e.g., acting as section titles, categories, or group indicators for subsequent data).
* For each such 'spanning row', extract its content and add it as a new column (named 'Section' or 'Category' - choose whichever fits best, 'Section' is a good default) to all subsequent data rows.
* This new column's value should persist for all rows until another spanning row is encountered. This process effectively flattens hierarchical or grouped data into a single, continuous table, providing clear context for each record.
2. Tabular Data - Primary Headers:
* For tables with multi-level headers, use the most detailed header row (the one containing the maximum number of distinct data columns) as the primary header for your output table.
* Higher-level header information should be integrated into the 'Section' column if it provides a logical grouping, or combined with primary header names if it clarifies the column's meaning.
3. Data Integrity:
* Preserve data types (e.g., numbers, dates) where evident.
* Represent missing or unreadable data as empty cells.
4. Completeness:
* Extract all relevant text and tabular data from the document.
* Integrate all identified tables into the single, comprehensive tabular dataset using the rules above.
"""
    generation_config = types.GenerateContentConfig(
        temperature=0.7,
        top_p=0.95,
        top_k=40,
        max_output_tokens=8192,
        system_instruction=system_prompt,
        # "Deep thinking": thinking_budget=-1 lets the model choose its own
        # reasoning budget.
        thinking_config=genai.types.ThinkingConfig(
            thinking_budget=-1,
        ),
        response_mime_type="text/plain",
    )
    try:
        image = PIL.Image.open(image_path)
        gr.Info("Converting document to full markdown (with deep thinking)... This may take a moment.")
        # NOTE(review): contents carries the image plus two filler strings;
        # the empty trailing string looks redundant — confirm it is needed.
        response = client.models.generate_content(model=model_name,
                                                  contents=[image, "\n\n", ""],
                                                  config=generation_config)
        return response.text
    except Exception as e:
        raise gr.Error(f"Error converting document to full markdown with Gemini: {e}")
# Extracts tables FROM a markdown string (no direct Gemini call here, so no deep thinking)
def extract_tables_from_markdown(markdown_string):
    """Identify the first GitHub Flavored Markdown (GFM) table and parse it.

    Args:
        markdown_string: Full document markdown produced by the model.

    Returns:
        A pandas DataFrame with the first table's contents, or an EMPTY
        DataFrame when no valid table is found.  (Previously failure paths
        returned an ``("", None)`` tuple while success returned a DataFrame;
        the tuple crashed the Gradio Dataframe output component.)

    Raises:
        gr.Error: if an identified table cannot be converted to a DataFrame.
    """
    lines = markdown_string.strip().split('\n')
    table_lines = []
    in_potential_table = False
    # Simple state machine: collect consecutive pipe-delimited lines and
    # stop at the first complete table block.
    for line in lines:
        stripped_line = line.strip()
        if stripped_line.startswith('|') and '|' in stripped_line:
            table_lines.append(line)
            in_potential_table = True
        elif in_potential_table and all(c in ('-', '|', ' ') for c in stripped_line) and '|' in stripped_line:
            # Separator line (e.g. |---|---|) that doesn't start with '|'.
            table_lines.append(line)
        elif in_potential_table and not stripped_line:
            # Blank line ends the candidate block; accept it only if it has a
            # header row plus at least one separator-looking row.
            if len(table_lines) > 1 and any(all(c in ('-', '|', ' ') for c in l) for l in table_lines[1:]):
                break  # Found the first table block
            # Not a valid table block: reset and keep scanning.
            table_lines = []
            in_potential_table = False
        elif in_potential_table:
            # Non-pipe, non-blank line inside a candidate block: not a table.
            table_lines = []
            in_potential_table = False
    if not table_lines:
        gr.Warning("No valid GitHub Flavored Markdown (GFM) tables were identified in the document's content.")
        return pd.DataFrame()  # Empty DF keeps the Dataframe component happy.
    markdown_table_string = "\n".join(table_lines)
    try:
        df = output_string_txt_to_df(markdown_table_string)
        if df.empty:
            gr.Warning("Identified Markdown table was empty after parsing.")
            return pd.DataFrame()
        return df
    except gr.Error:
        # Propagate Gradio errors raised downstream unchanged.
        raise
    except Exception as e:
        raise gr.Error(f"Error converting identified Markdown table to DataFrame: {e}")
# Answers specific queries about the document using Gemini AI (on the original image)
def query_document(image_path, query, api_key):
    """Answers specific queries about the document using Gemini AI.

    Args:
        image_path: Path to the document image to query against.
        query: The user's natural-language question; empty -> no-op.
        api_key: Google Gemini API key supplied by the user.

    Returns:
        The model's answer text, or "" when no query was given.

    Raises:
        gr.Error: if the image cannot be opened or the API call fails.
    """
    if not query:
        gr.Warning("Please enter a query to get a response.")
        return ""  # Return empty response if no query
    # google-genai (new SDK): the API key is passed per-client.
    client = genai.Client(api_key=api_key)
    model_name = "gemini-2.5-pro"
    system_prompt = f"Answer the following question based on the content of the historical document: '{query}'. Provide a concise and accurate answer."
    generation_config = types.GenerateContentConfig(
        temperature=0.5,
        top_p=0.95,
        top_k=40,
        max_output_tokens=8192,
        # FIX: system_prompt was built but never passed to the config, so the
        # model never saw the instruction (the sibling markdown function does
        # pass it). Wire it in, matching generate_full_markdown_from_image.
        system_instruction=system_prompt,
        # "Deep thinking": thinking_budget=-1 lets the model choose its budget.
        thinking_config=genai.types.ThinkingConfig(
            thinking_budget=-1,
        ),
        response_mime_type="text/plain",
    )
    try:
        image = PIL.Image.open(image_path)
        gr.Info(f"Processing query: '{query}' (with deep thinking)...")
        response = client.models.generate_content(model=model_name,
                                                  contents=[image, f"\n\nUser query: {query}"],
                                                  config=generation_config)
        return response.text
    except Exception as e:
        raise gr.Error(f"Error processing query with Gemini: {e}")
# --- Refactored process_file to implement the new workflow ---
def process_file(file, api_key, processing_mode, query_text):
    """Top-level pipeline behind the "Process File" button.

    Steps: validate API key -> extract first image -> convert the image to
    full markdown (always) -> either extract the table from that markdown or
    answer a specific query against the original image.

    Returns:
        (dataframe, markdown, query_response) matching the three Gradio
        output components; unused outputs are returned cleared.

    Raises:
        gr.Error: on any validation or processing failure.
    """
    # "Cleared" values for the three outputs.  An empty DataFrame (not "")
    # is what the Dataframe component expects, and matches what
    # update_query_input_interactivity uses to reset it.
    cleared_df = pd.DataFrame()
    cleared_markdown = ""
    cleared_query_response = ""
    # --- Step 1: Validate API Key ---
    if not api_key:
        raise gr.Error("Please enter your Google API key to proceed.")
    # --- Step 2: Extract First Image ---
    try:
        image_path = extract_first_image(file)
    except gr.Error:
        raise  # Re-raise Gradio errors for display
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during image extraction: {e}")
    # --- Step 3: Convert Image to Full Markdown (UNIVERSAL STEP) ---
    try:
        full_document_markdown = generate_full_markdown_from_image(image_path, api_key)
    except gr.Error:
        raise  # Re-raise Gradio errors for display
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during initial markdown conversion: {e}")
    # --- Step 4: Branch based on Processing Mode ---
    try:
        if processing_mode == "Generate Markdown & Extract Tables":
            df_result = extract_tables_from_markdown(full_document_markdown)
            # In this mode, show BOTH the full markdown AND the extracted table.
            return df_result, full_document_markdown, cleared_query_response
        elif processing_mode == "Specific Query":
            # Multimodal query runs against the original image.
            query_result = query_document(image_path, query_text, api_key)
            return cleared_df, cleared_markdown, query_result
        else:
            # FIX: previously an unrecognized mode fell through the if/elif
            # and implicitly returned None, breaking all three outputs.
            raise gr.Error(f"Unknown processing mode: {processing_mode}")
    except gr.Error:
        raise  # Propagate Gradio errors for display
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during the selected processing mode: {e}")
# Create the Gradio interface
with gr.Blocks(title="Historical Document OCR") as app:
    gr.Markdown("# Historical Document OCR")
    gr.Markdown("This app processes documents by generating full markdown, extracting tables, or answering specific queries.")
    with gr.Row():
        with gr.Column():
            # Input side: API key, file upload, mode selector, optional query.
            api_key_input = gr.Textbox(label="Google API Key", placeholder="Enter your Gemini API key here", type="password")
            file_input = gr.File(label="Upload PDF or image")
            processing_mode_radio = gr.Radio(
                ["Generate Markdown & Extract Tables", "Specific Query"],
                label="Choose Processing Mode",
                value="Generate Markdown & Extract Tables"  # Default selection
            )
            # Starts disabled; enabled only when "Specific Query" is chosen.
            query_input = gr.Textbox(label="Specific Query (if 'Specific Query' mode selected)", placeholder="e.g., What is the total revenue in 1850?", interactive=False)
            process_button = gr.Button("Process File")
            image_display = gr.Image(value=DEFAULT_IMAGE, label="Uploaded Image")
        with gr.Column():
            # Output side: editable table, CSV download, markdown, answer.
            output_df = gr.Dataframe(label="Extracted Table", interactive=True,
                                     show_copy_button=True,
                                     show_fullscreen_button=True)
            # Exports the current (possibly user-edited) table to CSV.
            process_dataframe_changes_button = gr.Button("Process dataframe changes")
            output_csv = gr.File(label="Download CSV")
            output_markdown = gr.Markdown(label="Document in Markdown")
            output_query_response = gr.Markdown(label="Query Response")

    def update_image(file):
        # Preview the first page/image of the uploaded file.
        return extract_first_image(file) if file else DEFAULT_IMAGE

    def update_query_input_interactivity(mode):
        # Toggle query-box interactivity and clear all outputs when the
        # mode changes, giving the user a clean slate.
        return gr.update(interactive=(mode == "Specific Query")), pd.DataFrame(), "", ""

    def export_csv(df_to_export):
        # Write the current table to output.csv and expose it for download.
        if df_to_export is None or df_to_export.empty:
            gr.Warning("No data to export.")
            return None  # Return None to clear the download component
        csv_file_path = "output.csv"
        df_to_export.to_csv(csv_file_path, index=False)
        return gr.update(value=csv_file_path, visible=True)

    file_input.change(fn=update_image, inputs=[file_input], outputs=image_display)
    # 'output_df' must be passed as an input so export_csv receives the
    # current (edited) state of the table.
    process_dataframe_changes_button.click(
        fn=export_csv,
        inputs=[output_df],
        outputs=output_csv
    )
    # Radio change: update query input interactivity and clear outputs.
    processing_mode_radio.change(
        fn=update_query_input_interactivity,
        inputs=[processing_mode_radio],
        outputs=[query_input, output_df, output_markdown, output_query_response]
    )
    # Main button click: process the file based on the selected mode.
    process_button.click(
        fn=process_file,
        inputs=[file_input, api_key_input, processing_mode_radio, query_input],
        outputs=[output_df, output_markdown, output_query_response]
    )

if __name__ == "__main__":
    app.launch()