Spaces:
Sleeping
Sleeping
import csv
import io
import os
import re
import tempfile

import fitz  # PyMuPDF
import google.generativeai as genai
import pandas as pd
from PIL import Image
from PyPDF2 import PdfReader

# SECURITY: credentials must never be committed to source control. The API key
# is taken from the GOOGLE_API_KEY environment variable; callers can also pass
# a key explicitly through configure_gemini().
_GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if _GOOGLE_API_KEY:
    genai.configure(api_key=_GOOGLE_API_KEY)
def configure_gemini(api_key: str):
    """Hand the given API key to the Gemini client library.

    Args:
        api_key: Key forwarded unchanged to ``genai.configure``.
    """
    genai.configure(api_key=api_key)
| # def pdf_to_images(pdf_bytes: bytes) -> list: | |
| # """Convert PDF bytes to list of PIL Images""" | |
| # return convert_from_bytes(pdf_bytes) | |
def pdf_to_images(pdf_bytes: bytes) -> list[Image.Image]:
    """Render every page of a PDF to a PIL image using PyMuPDF (no poppler needed).

    Args:
        pdf_bytes: Raw contents of the PDF file.

    Returns:
        One ``PIL.Image.Image`` in mode "RGB" per page, in page order.
    """
    images = []
    # The context manager closes the document even when rendering a page
    # raises, so the underlying PyMuPDF resources are always released.
    with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
        for page in doc:
            pix = page.get_pixmap()
            images.append(
                Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            )
    return images
# Prompt sent with every page image; the single "{}" placeholder receives the
# 1-based page number via str.format().
_PAGE_ANALYSIS_PROMPT = """Please analyze the provided images of the real estate document set and perform the following actions:
1. *Identify Parties:* Determine and list Seller 1, Seller 2 (if applicable), Buyer 1, and Buyer 2.
2. *Identify Missing Items:* Locate and list all instances of missing signatures and missing initials for all parties across all documents.
3. *Identify Checked Boxes:* Locate and list all checkboxes that have been marked or checked.
4. *Generate Secondary Questions:* For checkboxes that indicate significant waivers (e.g., home warranty, inspection rights, lead paint assessment), specific conditions (e.g., cash sale, contingency status), potential conflicts, or reference other documents, formulate a relevant 'Secondary Question' designed to prompt confirmation or clarification from the user/parties involved.
5. *Check for Required Paperwork:* Based only on the checkboxes identified in step 3 that explicitly state or strongly imply a specific addendum or disclosure document should be attached (e.g., "Lead Based Paint Disclosure Addendum attached", "See Counter Offer Addendum", "Seller's Disclosure...Addendum attached", "Retainer Addendum attached", etc.), check if a document matching that description appears to be present within the provided image set. Note whether this implied paperwork is 'Found', 'Missing', or 'Potentially Missing/Ambiguous' within the provided images.
6. *Identify Conflicts:* Specifically look for and note any directly contradictory information or conflicting checked boxes (like the conflicting inspection clauses found previously).
7. *Provide Location:* For every identified item (missing signature/initial, checked box, required paperwork status, party identification, conflict), specify the approximate line number(s) or clear location on the page (e.g., Bottom Right Initials, Seller Signature Block).
8. *Format Output:* Present all findings comprehensively in CSV format. The CSV columns should be:
* Category (e.g., Parties, Missing Item, Checked Box, Required Paperwork, Conflict)
* Location (Document Name/Page, e.g., Sale Contract Pg 2)
* Image number (just make image number {} done)
* Item Type (e.g., Seller Initials, Home Warranty Waiver, Lead Paint Addendum Check, Lead Paint Addendum Document)
* Status (e.g., Identified, Missing, Checked, Found, Potentially Missing, Conflict Detected)
* Details (Specifics like names, text of the checkbox, description of the issue or document status)
* Secondary Question (if applicable) (The question generated in step 4)
Please apply this analysis to the entire set of documents provided.
"""


def process_local_pdf(pdf_bytes: bytes):
    """Run the built-in contract-review prompt on every page of a PDF.

    Each page is rendered to an image and sent to Gemini in its own request,
    together with the prompt (with the 1-based page number substituted in).
    The CSV found in each reply is parsed and the per-page tables are stacked
    into one DataFrame. A page whose request or parsing fails is reported on
    stdout and skipped, so one bad page does not abort the whole document.

    Args:
        pdf_bytes: Raw contents of the PDF file.

    Returns:
        A ``pandas.DataFrame`` with the rows from all pages that produced a
        non-empty table; an empty DataFrame if none did.
    """
    # Convert to images
    images = pdf_to_images(pdf_bytes)

    # The model object does not depend on the page, so build it once.
    model = genai.GenerativeModel('gemini-2.5-pro-exp-03-25')

    combined_df = pd.DataFrame()
    for page_number, img in enumerate(images, start=1):
        try:
            local_prompt = _PAGE_ANALYSIS_PROMPT.format(page_number)
            # Send both the prompt and image to Gemini
            response = model.generate_content([local_prompt, img])

            # Extract and parse the CSV part of the reply
            answer_csv = extract_csv_from_response(response)
            answer_df = csv_to_dataframe(answer_csv)

            # Concatenation stays inside the try so that a malformed table
            # only costs this page, not the pages already collected.
            if not answer_df.empty:
                combined_df = pd.concat([combined_df, answer_df], ignore_index=True)

            print(f"Processed page {page_number}")
            print("Response:")
            print(answer_csv)
            print("\n" + "="*50 + "\n")
        except Exception as e:
            # Deliberately best-effort: log and continue with the next page.
            print(f"Error processing page {page_number}: {str(e)}")
    return combined_df
def analyze_single_document(images: list, prompt: str) -> str:
    """Send a prompt plus all page images to Gemini in a single request.

    Args:
        images: Page images of the document; they follow the prompt in the
            request, in the given order.
        prompt: Instruction text placed first in the request.

    Returns:
        The ``text`` of the model response.
    """
    model = genai.GenerativeModel('gemini-2.5-pro-exp-03-25')
    # Unpacking accepts any iterable of images, not only a list.
    response = model.generate_content([prompt, *images])
    return response.text
def analyze_pdf_directly(pdf_bytes: bytes, prompt: str, model_name: str = "gemini-1.5-pro"):
    """Analyze a PDF directly using Gemini's PDF support.

    The bytes are written to a temporary ``.pdf`` file, uploaded with
    ``genai.upload_file`` and sent to the model together with the prompt.

    Args:
        pdf_bytes: Raw contents of the PDF file.
        prompt: Instruction text sent ahead of the uploaded file.
        model_name: Name of the Gemini model to use.

    Returns:
        The ``text`` of the model response.
    """
    model = genai.GenerativeModel(model_name)
    tmp_file_path = None
    try:
        # delete=False so the file can be closed and then re-read by path for
        # the upload. The path is recorded before writing so the finally
        # clause can remove the file even if the write itself fails.
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_file:
            tmp_file_path = tmp_file.name
            tmp_file.write(pdf_bytes)

        # Use the file upload feature
        # NOTE(review): the uploaded copy is not deleted from the remote side
        # here - confirm whether explicit cleanup is wanted.
        response = model.generate_content(
            [prompt, genai.upload_file(tmp_file_path)]
        )
        print(f"Response: {response}")
        return response.text
    finally:
        # Clean up temporary file
        if tmp_file_path is not None and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
def extract_response_text(response) -> str:
    """Extract text content from a Gemini response object.

    Lookup order:
      1. ``response.text`` when it is readable and not ``None``.
      2. The text parts of the first candidate that has any, looked up under
         ``response.result.candidates`` and then ``response.candidates``;
         several parts are concatenated.
      3. ``str(response)`` as a last resort.

    Args:
        response: Response object (duck-typed; only attributes are inspected).

    Returns:
        The extracted text, or the string form of ``response`` if no text
        could be found. This function does not raise.
    """
    # Property-style accessors may raise instead of returning (for example
    # when a reply has no single usable text part), and getattr() only
    # swallows AttributeError - so guard the access and fall through to the
    # candidates instead of giving up.
    try:
        text = getattr(response, 'text', None)
    except Exception as e:
        print(f"Error extracting response text: {str(e)}")
        text = None
    if text is not None:
        return text

    try:
        for container in (getattr(response, 'result', None), response):
            for candidate in getattr(container, 'candidates', None) or []:
                content = getattr(candidate, 'content', None)
                parts = getattr(content, 'parts', None) or []
                texts = [part.text for part in parts if hasattr(part, 'text')]
                if texts:
                    return ''.join(texts)
    except Exception as e:
        print(f"Error extracting response text: {str(e)}")
    return str(response)
def extract_csv_from_response(response) -> str:
    """Extract CSV data from a Gemini response.

    Strategy:
      1. Return the content of the first csv-tagged fenced code block
         (tag matched case-insensitively), stripped.
      2. Otherwise start at the first line that contains a comma and either
         ``Category,`` or ``Location,`` and collect lines up to (not
         including) the next code-fence line.
      3. Otherwise return the whole response text.

    Args:
        response: Response object passed on to ``extract_response_text``.

    Returns:
        The CSV text if one was located, else the full response text.
    """
    try:
        # Get the text content from the response
        response_text = extract_response_text(response)

        # Extract CSV content between ```csv markers
        fenced = re.search(r'```csv(.*?)```', response_text,
                           re.DOTALL | re.IGNORECASE)
        if fenced:
            return fenced.group(1).strip()

        # Fallback: Try to find any CSV-like content
        lines = []
        in_csv = False
        for line in response_text.split('\n'):
            if not in_csv:
                in_csv = ',' in line and ('Category,' in line or 'Location,' in line)
            elif line.strip().startswith('```'):
                # A fence after the header closes the table; anything beyond
                # it is commentary and must not become CSV rows.
                break
            if in_csv:
                lines.append(line)
        if lines:
            return '\n'.join(lines).strip()
        return response_text  # Return full response if no CSV found
    except Exception as e:
        print(f"Error extracting CSV: {str(e)}")
        # The text accessor may itself raise, so guard it too.
        try:
            return response.text if hasattr(response, 'text') else str(response)
        except Exception:
            return str(response)
def csv_to_dataframe(csv_data: str) -> pd.DataFrame:
    """Convert a CSV string to a pandas DataFrame with error handling.

    The first non-blank line is the header. Each following row is fitted to
    the header width: surplus fields are merged back into the last column
    (joined with ","), and missing trailing fields are filled with ``None``.
    If that parse fails, ``pandas.read_csv`` with lenient settings is tried.

    Args:
        csv_data: CSV text; ``None``, empty or whitespace-only input is
            accepted.

    Returns:
        The parsed DataFrame, or an empty DataFrame for empty input or when
        both parsing attempts fail.
    """
    if not csv_data or not csv_data.strip():
        return pd.DataFrame()

    # Clean line breaks and extra spaces. Built outside the try block so the
    # fallback parser below can always rely on it.
    cleaned_data = "\n".join(
        line.strip() for line in csv_data.split('\n') if line.strip()
    )

    try:
        # Use CSV reader to handle irregular fields
        reader = csv.reader(io.StringIO(cleaned_data),
                            delimiter=',',
                            quotechar='"',
                            skipinitialspace=True)
        header = next(reader)
        width = len(header)
        rows = []
        for row in reader:
            if len(row) > width:
                # Unquoted commas in free text spill into extra fields;
                # combine them into the last column.
                row = row[:width - 1] + [','.join(row[width - 1:])]
            elif len(row) < width:
                # Pad short rows so the DataFrame constructor never sees a
                # table narrower than its header.
                row = row + [None] * (width - len(row))
            rows.append(row)
        return pd.DataFrame(rows, columns=header)
    except Exception as e:
        print(f"CSV conversion error: {str(e)}")
        try:
            # Fallback to pandas with flexible parsing
            return pd.read_csv(io.StringIO(cleaned_data),
                               on_bad_lines='warn',
                               engine='python',
                               quotechar='"',
                               skipinitialspace=True)
        except Exception as fallback_error:
            print(f"Fallback conversion failed: {str(fallback_error)}")
            return pd.DataFrame()
def save_csv(csv_data: str, filename: str) -> str:
    """Write CSV text to ``filename`` and return that path.

    Leading and trailing whitespace of ``csv_data`` is removed before writing.
    The file is written as UTF-8 with newline translation disabled, so line
    endings in the data are stored unchanged.

    Args:
        csv_data: CSV text to store.
        filename: Destination path; an existing file is overwritten.

    Returns:
        ``filename``, unchanged.
    """
    with open(filename, mode='w', encoding='utf-8', newline='') as out:
        out.write(csv_data.strip())
    return filename
def get_pdf_metadata(pdf_bytes: bytes) -> dict:
    """Read basic metadata from a PDF held in memory.

    Args:
        pdf_bytes: Raw contents of the PDF file.

    Returns:
        A dict with the keys ``'page_count'``, ``'author'`` and ``'title'``.
        ``'author'`` and ``'title'`` are ``None`` when the reader exposes no
        (or empty) metadata.
    """
    reader = PdfReader(io.BytesIO(pdf_bytes))
    page_count = len(reader.pages)
    info = reader.metadata
    if info:
        author, title = info.author, info.title
    else:
        author = title = None
    return {
        'page_count': page_count,
        'author': author,
        'title': title
    }