"""Gradio app that extracts certificate details with GPT-4o and stores them.

An uploaded certificate (PDF or image) is converted to text, the key fields
are extracted as JSON by the model, and the file is saved under
``hf_dataset_upload/<ship>/<certificate>/`` and pushed to a Hugging Face
dataset repository.  When a certificate already exists in that folder, the
old and new versions are compared by the model and the old one is replaced.
"""

import base64
import io
import json
import mimetypes
import os
import re
import shutil
import tempfile
from typing import Optional, Tuple, Union

import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import HfApi
from openai import OpenAI
from pdf2image import convert_from_path  # Convert PDF pages to images
from pdfminer.high_level import extract_text
from PIL import Image

load_dotenv()
client = OpenAI()

# NOTE(review): the emoji prefixes in the log/UI strings below appear to be
# mis-encoded (mojibake).  They are reproduced unchanged; confirm the intended
# characters before repairing them.

# Hugging Face dataset repository that mirrors the local folder below.
HF_REPO_ID = "MikeMai/Certificates_Management"
HF_REPO_TYPE = "dataset"
# Local folder whose layout is <ship name>/<certificate name>/<file>.
LOCAL_DATASET_DIR = "hf_dataset_upload"

# File extensions (lower-case, without the dot) that are sent to the model
# as images.
IMAGE_EXTENSIONS = frozenset({"jpg", "jpeg", "png"})

# Image modes Pillow documents as writable to JPEG; anything else (alpha,
# palette, ...) is converted to RGB before encoding.
_JPEG_MODES = frozenset({"L", "RGB", "CMYK"})

# Anything that is not a word character, dot or hyphen is replaced when a
# model-supplied name is turned into a path component.
_UNSAFE_PATH_CHARS = re.compile(r"[^\w.\-]")

# (text, certificate name, certificate id, ship name, issue date, expiry date)
CertificateDetails = Tuple[str, str, str, str, str, str]


def encode_image(image_input: Union[str, Image.Image]) -> str:
    """Encode an image to Base64.

    Args:
        image_input: Either a file path (``str``), whose bytes are encoded
            unchanged, or an in-memory PIL image, which is serialised as
            JPEG first.

    Returns:
        The Base64 encoding as a UTF-8 string.

    Raises:
        ValueError: If ``image_input`` is neither a ``str`` nor a PIL image.
    """
    if isinstance(image_input, str):
        with open(image_input, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")

    if isinstance(image_input, Image.Image):
        pil_image = image_input
        if pil_image.mode not in _JPEG_MODES:
            # Saving e.g. RGBA or palette images as JPEG raises in Pillow.
            pil_image = pil_image.convert("RGB")
        buffered = io.BytesIO()
        pil_image.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")

    raise ValueError("Unsupported input type. Provide a file path or a PIL image.")


def process_image(image_path: Union[str, Image.Image]) -> str:
    """Ask GPT-4o to transcribe all text in an image.

    Args:
        image_path: A file path or a PIL image (see :func:`encode_image`).

    Returns:
        The stripped text returned by the model, or ``""`` when the model
        returns no content.
    """
    print(f"šŸ–¼ļø Processing image file: {image_path}")
    image_base64 = encode_image(image_path)

    # PIL images are always serialised as JPEG by encode_image(); for files
    # the declared MIME type must match the real format (e.g. PNG).
    mime_type = "image/jpeg"
    if isinstance(image_path, str):
        mime_type = mimetypes.guess_type(image_path)[0] or mime_type
    image_url = f"data:{mime_type};base64,{image_base64}"

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Extract all text from this image."},
                    {"type": "image_url", "image_url": {"url": image_url}},
                ],
            }
        ],
    )
    # The message content may be None; treat that as "no text".
    return (response.choices[0].message.content or "").strip()


def process_text_pdf(pdf_path: str) -> Optional[str]:
    """Extract the embedded text of a PDF.

    Returns:
        The stripped text, or ``None`` when the PDF has no extractable text
        so the caller can fall back to image processing.
    """
    text_content = extract_text(pdf_path).strip()
    if text_content:
        print(f"šŸ“„ Extracting text from PDF: {pdf_path}")
        return text_content
    return None


def process_image_pdf(pdf_path: str) -> str:
    """Transcribe a scanned (image-based) PDF page by page.

    Each page is rendered to an image and passed to :func:`process_image`;
    the page texts are joined with blank lines.
    """
    print(f"šŸ–¼ļø No text found! Processing as an image-based (scanned) PDF: {pdf_path}")
    pages = convert_from_path(pdf_path)
    return "\n\n".join(process_image(page) for page in pages)


def process_file(file_path: str) -> Optional[str]:
    """Detect the file type from its extension and extract its text.

    Returns:
        The extracted text, or ``None`` when the file does not exist or its
        extension is not ``jpg``/``jpeg``/``png``/``pdf``.
    """
    if not os.path.exists(file_path):
        print(f"āŒ Error: File not found: {file_path}")
        return None

    file_extension = file_path.lower().rsplit(".", 1)[-1]

    if file_extension in IMAGE_EXTENSIONS:
        return process_image(file_path)

    if file_extension == "pdf":
        # Prefer the embedded text; fall back to the model for scanned PDFs.
        return process_text_pdf(file_path) or process_image_pdf(file_path)

    print(f"āŒ Unsupported file type: {file_path}")
    return None


def extract_certificate_details(certificate_path: str) -> Optional[CertificateDetails]:
    """Extract the text and key fields of a certificate file.

    Args:
        certificate_path: Path to a PDF or image certificate.

    Returns:
        A tuple ``(certificate_text, certificate_name, certificate_id,
        ship_name, date_of_issue, expiration_date)``; fields the model could
        not find are empty strings.  ``None`` is returned when no text could
        be extracted or the model reply is not a JSON object.
    """
    certificate_text = process_file(certificate_path)
    print(f"šŸ–¼ļø Extracting details from certificate: {certificate_path}")

    if not certificate_text:
        print(f"āŒ Error: Certificate text could not be extracted from {certificate_path}")
        return None

    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={"type": "json_object"},
        seed=123,
        temperature=0,
        messages=[
            {
                "role": "developer",
                "content": f"""Extract the following details from the certificate text in JSON format, leave blank if not found:
{{
    "Certificate Name": "",
    "Certificate ID": "",
    "Ship Name": "",
    "Date of Issue": "",
    "Expiration Date": ""
}}

Certificate Text:
{certificate_text}
""",
            }
        ],
    )

    try:
        result_json = json.loads(response.choices[0].message.content)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a reply whose content is None.
        result_json = None
    if not isinstance(result_json, dict):
        print(f"Error: Certificate details could not be parsed for {certificate_path}")
        return None

    # "or ''" also normalises explicit JSON nulls to empty strings.
    certificate_name = result_json.get("Certificate Name") or ""
    certificate_id = result_json.get("Certificate ID") or ""
    ship_name = result_json.get("Ship Name") or ""
    date_of_issue = result_json.get("Date of Issue") or ""
    expiration_date = result_json.get("Expiration Date") or ""

    print(
        f"āœ… Extracted details:\n- Certificate Name: {certificate_name}\n"
        f"- Certificate ID: {certificate_id}\n- Ship Name: {ship_name}\n"
        f"- Date of Issue: {date_of_issue}\n- Expiration Date: {expiration_date}"
    )

    return (
        certificate_text,
        certificate_name,
        certificate_id,
        ship_name,
        date_of_issue,
        expiration_date,
    )


def compare_certificates(new_cert_details, old_cert_details) -> str:
    """Ask GPT-4o for a structured summary of the differences.

    Args:
        new_cert_details: Details of the new certificate; interpolated into
            the prompt with ``str()`` formatting.
        old_cert_details: Details of the old certificate; same handling.

    Returns:
        The stripped model reply, or ``""`` when the model returns no content.
    """
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": f"""Compare the two certificates below and provide a structured summary highlighting key differences in the format below:

### Comparison Summary:
- Identify differences in terms of:
    - Certificate ID
    - Date of Issue
    - Expiration Date
- Highlight any changes in other key details, if applicable.

### Take Note:
- Clearly structure the output for easy reading
- Do not include any structural changes in the text, only content changes

### Old Certificate:
{old_cert_details}

### New Certificate:
{new_cert_details}""",
            }
        ],
    )
    return (response.choices[0].message.content or "").strip()


def _safe_path_component(name) -> str:
    """Turn a model-supplied name into a single, safe path component.

    Spaces and any other character outside ``[\\w.-]`` become ``_`` one for
    one, so names made of word characters and spaces map exactly as a plain
    space-to-underscore replacement would.  Names consisting only of dots
    (``.``, ``..``) yield ``""`` because they would escape the target folder.
    """
    cleaned = _UNSAFE_PATH_CHARS.sub("_", str(name))
    return cleaned if cleaned.strip(".") else ""


def _format_details(details: dict) -> str:
    """Render a certificate's summary fields as Markdown, one per line."""
    lines = [
        f"**Certificate Name**: {details['Certificate Name']}",
        f"**Certificate ID**: {details['Certificate ID']}",
        f"**Ship Name**: {details['Ship Name']}",
        f"**Date of Issue**: {details['Date of Issue'] or 'N/A'}",
        f"**Expiration Date**: {details['Expiration Date'] or 'N/A'}",
    ]
    # Two trailing spaces force a Markdown line break between the fields.
    return "  \n".join(lines)


def _store_certificate(temp_path: str, file_ext: str) -> str:
    """Extract, compare, save and upload the certificate at ``temp_path``.

    Returns the Markdown message shown to the user.
    """
    extracted = extract_certificate_details(temp_path)
    if not extracted:
        return "āŒ Failed to extract certificate details."

    (
        certificate_text,
        certificate_name,
        certificate_id,
        ship_name,
        date_of_issue,
        expiration_date,
    ) = extracted

    if not all([certificate_name, ship_name]):
        return "āŒ Missing key fields, unable to rename or upload."

    # The names come from model output over an uploaded document, so they
    # must not be able to introduce extra path segments.
    safe_cert_name = _safe_path_component(certificate_name)
    safe_ship_name = _safe_path_component(ship_name)
    if not (safe_cert_name and safe_ship_name):
        return "āŒ Missing key fields, unable to rename or upload."

    save_dir = os.path.join(LOCAL_DATASET_DIR, safe_ship_name, safe_cert_name)
    os.makedirs(save_dir, exist_ok=True)

    # Sorted so the certificate used for comparison is deterministic.
    existing_files = sorted(
        entry
        for entry in os.listdir(save_dir)
        if os.path.isfile(os.path.join(save_dir, entry))
    )

    api = HfApi(token=os.getenv("HF_API_KEY"))
    new_cert_details = {
        "Certificate Name": certificate_name,
        "Certificate ID": certificate_id,
        "Ship Name": ship_name,
        "Date of Issue": date_of_issue,
        "Expiration Date": expiration_date,
        "Certificate Text": certificate_text,
    }
    old_cert_details = None
    comparison_result = ""

    if existing_files:
        old_cert_path = os.path.join(save_dir, existing_files[0])
        print(f"šŸ“‚ Existing certificate found: {old_cert_path}")

        # Check for None before unpacking: extraction returns None on failure.
        old_extracted = extract_certificate_details(old_cert_path)
        if not old_extracted:
            return "āŒ Failed to process the existing certificate for comparison."
        (
            old_text,
            old_name,
            old_id,
            old_ship_name,
            old_date_of_issue,
            old_expiration_date,
        ) = old_extracted

        old_cert_details = {
            "Certificate Name": old_name,
            "Certificate ID": old_id,
            "Ship Name": old_ship_name,
            "Date of Issue": old_date_of_issue,
            "Expiration Date": old_expiration_date,
            "Certificate Text": old_text,
        }
        comparison_result = compare_certificates(new_cert_details, old_cert_details)

        # Remove every previous file locally and from the Hub before saving
        # the new one (the new file may reuse an old file's name).
        for existing_file in existing_files:
            os.remove(os.path.join(save_dir, existing_file))
            api.delete_file(
                path_in_repo=f"{safe_ship_name}/{safe_cert_name}/{existing_file}",
                repo_id=HF_REPO_ID,
                repo_type=HF_REPO_TYPE,
            )

    new_filename = f"{safe_ship_name}_{safe_cert_name}{file_ext}"
    new_path = os.path.join(save_dir, new_filename)
    shutil.copy(temp_path, new_path)
    if existing_files:
        print(f"āœ… Replaced the existing file with the uploaded file: {new_path}")

    api.upload_folder(
        folder_path=LOCAL_DATASET_DIR,
        repo_id=HF_REPO_ID,
        repo_type=HF_REPO_TYPE,
    )

    hf_path = (
        f"https://huggingface.co/datasets/{HF_REPO_ID}/blob/main/"
        f"{safe_ship_name}/{safe_cert_name}/{new_filename}"
    )

    if old_cert_details is None:
        return f"""
āœ… **Certificate Uploaded Successfully!**

{_format_details(new_cert_details)}

šŸ”— [View on Hugging Face Hub]({hf_path})
"""

    return f"""
āœ… **Certificate Uploaded Successfully! Existing Certificate**

šŸ”— [View on Hugging Face Hub]({hf_path})

**New Certificate Details**:

{_format_details(new_cert_details)}

**Old Certificate Details**:

{_format_details(old_cert_details)}

{comparison_result}
"""


def gradio_upload_certificate(uploaded_file) -> str:
    """Gradio handler: process an uploaded certificate and report the result.

    Args:
        uploaded_file: The value Gradio passes for the ``gr.File`` input:
            either a path string or an object exposing the path as ``.name``.
            ``None`` (nothing uploaded) is reported as an error.

    Returns:
        A Markdown string describing the outcome (success details, optional
        comparison with the previous certificate, or an error message).
    """
    if uploaded_file is None:
        return "Error: no file was uploaded."

    source_path = getattr(uploaded_file, "name", uploaded_file)
    file_ext = os.path.splitext(source_path)[-1]

    # Work on a private copy: a per-request directory avoids clashes between
    # concurrent uploads and is removed again when processing finishes.
    with tempfile.TemporaryDirectory() as work_dir:
        temp_path = os.path.join(work_dir, f"temp_uploaded_file{file_ext}")
        shutil.copy(source_path, temp_path)
        return _store_certificate(temp_path, file_ext)


# Gradio UI definition; launched only when the file is run as a script.
demo = gr.Interface(
    fn=gradio_upload_certificate,
    inputs=gr.File(label="Upload Certificate (PDF or Image)"),
    outputs=gr.Markdown(label="Upload Result"),
    title="šŸ“œ Certificate Manager",
    description="Upload a certificate to extract certificate details, rename, and store in respective folders.",
    show_progress='full',
    allow_flagging="never",
)

if __name__ == "__main__":
    demo.launch()