Spaces:
Sleeping
Sleeping
| from pdfminer.high_level import extract_text | |
| from pdf2image import convert_from_path # Convert PDF pages to images | |
| import base64 | |
| import io | |
| import os | |
| from PIL import Image | |
| import json | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi | |
| import shutil | |
| import gradio as gr | |
# Load environment configuration before any API client is created.
# NOTE(review): presumably this reads a local .env file — confirm against the deployment setup.
load_dotenv()
# Module-level OpenAI client shared by every GPT-4o call in this file.
# NOTE(review): no credentials are passed explicitly, so the key presumably comes from the environment — confirm.
client = OpenAI()
# from huggingface_hub import login
# login(token=os.getenv("HF_API_KEY"))
# Function to encode image to Base64
def encode_image(image_input):
    """
    Encode an image to a Base64 string.

    Args:
        image_input: Either a filesystem path (``str`` or ``os.PathLike``)
            pointing at an image file, or an in-memory PIL image.

    Returns:
        The Base64 text (``str``) of the image. For a path, the raw file bytes
        are encoded unchanged; for a PIL image, the image is serialized to
        JPEG first and those bytes are encoded.

    Raises:
        ValueError: If ``image_input`` is neither a path nor a PIL image.
    """
    # File on disk: encode the bytes exactly as stored.
    if isinstance(image_input, (str, os.PathLike)):
        with open(image_input, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")

    # In-memory PIL image: serialize to JPEG, then encode.
    if isinstance(image_input, Image.Image):
        # JPEG cannot store alpha or palette modes (RGBA, LA, P, ...); saving
        # such an image raises OSError, so normalize to RGB first.
        if image_input.mode not in ("L", "RGB", "CMYK"):
            image_input = image_input.convert("RGB")
        buffered = io.BytesIO()
        image_input.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")

    raise ValueError("Unsupported input type. Provide a file path or a PIL image.")
# Function to process image files
def process_image(image_path):
    """
    Extract all text from an image by sending it to GPT-4o.

    Args:
        image_path: A path to an image file, or an in-memory PIL image
            (anything ``encode_image`` accepts).

    Returns:
        The text returned by the model, stripped of surrounding whitespace.
        An empty string is returned when the model response has no content.
    """
    import mimetypes  # Local import: only this function needs it.

    print(f"πΌοΈ Processing image file: {image_path}")
    image_base64 = encode_image(image_path)

    # PIL images are serialized as JPEG by encode_image; files on disk keep
    # their own format, so label the data URL with the file's real MIME type
    # (e.g. image/png) instead of always claiming JPEG.
    mime_type = "image/jpeg"
    if isinstance(image_path, (str, os.PathLike)):
        guessed_type, _ = mimetypes.guess_type(os.fspath(image_path))
        if guessed_type and guessed_type.startswith("image/"):
            mime_type = guessed_type
    image_url = f"data:{mime_type};base64,{image_base64}"

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Extract all text from this image."},
                    {"type": "image_url", "image_url": {"url": image_url}},
                ],
            }
        ],
    )
    # message.content may be None (e.g. a refusal); treat that as "no text".
    extracted_text = (response.choices[0].message.content or "").strip()
    return extracted_text
# Function to process text-based PDFs
def process_text_pdf(pdf_path):
    """
    Return the text embedded in a PDF.

    Returns the stripped text when the PDF contains any, otherwise ``None`` so
    the caller can fall back to image-based processing.
    """
    stripped = extract_text(pdf_path).strip()
    if not stripped:
        return None  # No text found, fallback to image processing
    print(f"π Extracting text from PDF: {pdf_path}")
    return stripped
# Function to process scanned PDFs (image-based)
def process_image_pdf(pdf_path):
    """
    Extract text from a scanned PDF.

    Each page is rendered to an image and passed through ``process_image``;
    the per-page texts are joined with a blank line between pages.
    """
    print(f"πΌοΈ No text found! Processing as an image-based (scanned) PDF: {pdf_path}")
    pages = convert_from_path(pdf_path)
    page_texts = [process_image(page) for page in pages]
    return "\n\n".join(page_texts)
# Function to detect file type and extract text accordingly
def process_file(file_path):
    """
    Extract text from a certificate file, choosing the strategy by extension.

    Args:
        file_path: Path to a ``.jpg``/``.jpeg``/``.png`` image or a ``.pdf``.

    Returns:
        The extracted text, or ``None`` when the path is not an existing
        regular file or its extension is unsupported. PDFs are first read as
        text and fall back to per-page image processing when they contain no
        embedded text.
    """
    # isfile (not exists) so that a directory is rejected here instead of
    # crashing inside the PDF/image readers.
    if not os.path.isfile(file_path):
        print(f"β Error: File not found: {file_path}")
        return None

    # splitext only looks at the final path component, unlike splitting the
    # whole path on "." which misreads names such as a bare "pdf".
    file_extension = os.path.splitext(file_path)[1].lower().lstrip(".")

    if file_extension in ("jpg", "jpeg", "png"):
        return process_image(file_path)  # Process images

    if file_extension == "pdf":
        text_data = process_text_pdf(file_path)
        if text_data:  # If text extraction succeeds, return it
            return text_data
        return process_image_pdf(file_path)  # Otherwise, process as image

    print(f"β Unsupported file type: {file_path}")
    return None
def extract_certificate_details(certificate_path):
    """
    Extract the text of a certificate and its key fields via GPT-4o.

    Args:
        certificate_path: Path to the certificate file (image or PDF).

    Returns:
        A 6-tuple ``(certificate_text, certificate_name, certificate_id,
        ship_name, date_of_issue, expiration_date)`` where every field is a
        string (empty when the model did not find it), or ``None`` when the
        text could not be extracted or the model reply is not a JSON object.
    """
    certificate_text = process_file(certificate_path)
    print(f"πΌοΈ Extracting details from certificate: {certificate_path}")
    if not certificate_text:
        print(f"β Error: Certificate text could not be extracted from {certificate_path}")
        return None

    # Ask GPT-4o to pull the structured fields out of the certificate text.
    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={"type": "json_object"},
        seed=123,
        temperature=0,
        messages=[
            {
                "role": "developer",
                "content": f"""Extract the following details from the certificate text in JSON format, leave blank if not found:
{{
"Certificate Name": "",
"Certificate ID": "",
"Ship Name": "",
"Date of Issue": "",
"Expiration Date": ""
}}
Certificate Text:
{certificate_text}
""",
            }
        ],
    )
    result = response.choices[0].message.content

    # The reply may be empty or malformed; report it the same way as the other
    # failures of this function (return None) instead of raising.
    try:
        result_json = json.loads(result or "")
    except json.JSONDecodeError as exc:
        print(f"Error: Could not parse certificate details as JSON: {exc}")
        return None
    if not isinstance(result_json, dict):
        print("Error: Certificate details response is not a JSON object.")
        return None

    def field(key):
        # JSON null or non-string values (e.g. a numeric ID) are normalized to
        # text so callers can safely use string methods on every field.
        value = result_json.get(key)
        return "" if value is None else str(value).strip()

    certificate_name = field("Certificate Name")
    certificate_id = field("Certificate ID")
    ship_name = field("Ship Name")
    date_of_issue = field("Date of Issue")
    expiration_date = field("Expiration Date")
    print(f"β Extracted details:\n- Certificate Name: {certificate_name}\n- Certificate ID: {certificate_id}\n- Ship Name: {ship_name}\n- Date of Issue: {date_of_issue}\n- Expiration Date: {expiration_date}")
    return certificate_text, certificate_name, certificate_id, ship_name, date_of_issue, expiration_date
# Function to compare two certificates using AI
def compare_certificates(new_cert_details, old_cert_details):
    """
    Ask GPT-4o for a structured summary of the differences between two
    certificates.

    Both arguments are interpolated into the prompt as-is; the model's reply
    is returned with surrounding whitespace removed.
    """
    prompt = f"""Compare the two certificates below and provide a structured summary highlighting key differences in the format below:
### Comparison Summary:
- Identify differences in terms of:
- Certificate ID
- Date of Issue
- Expiration Date
- Highlight any changes in other key details, if applicable.
### Take Note:
- Clearly structure the output for easy reading
- Do not include any structural changes in the text, only content changes
### Old Certificate:
{old_cert_details}
### New Certificate:
{new_cert_details}"""
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return completion.choices[0].message.content.strip()
def _safe_path_component(value):
    """Turn an extracted name into a single directory/file name component."""
    # Spaces become underscores, as in the stored layout; path separators and
    # NUL are neutralized so a model-extracted name cannot escape the dataset
    # folder or create unintended nested directories.
    component = str(value).replace(" ", "_")
    for unsafe in ("/", "\\", "\x00"):
        component = component.replace(unsafe, "_")
    if component in ("", ".", ".."):
        component = "_"
    return component


def _store_certificate(temp_path, file_ext):
    """Extract, compare against any stored certificate, save and upload; return Markdown."""
    repo_id = "MikeMai/Certificates_Management"
    upload_root = "hf_dataset_upload"

    extracted = extract_certificate_details(temp_path)
    if not extracted:
        return "β Failed to extract certificate details."
    (
        certificate_text,
        certificate_name,
        certificate_id,
        ship_name,
        date_of_issue,
        expiration_date,
    ) = extracted
    if not all([certificate_name, ship_name]):
        return "β Missing key fields, unable to rename or upload."

    safe_cert_name = _safe_path_component(certificate_name)
    safe_ship_name = _safe_path_component(ship_name)
    save_dir = os.path.join(upload_root, safe_ship_name, safe_cert_name)
    os.makedirs(save_dir, exist_ok=True)

    new_filename = f"{safe_ship_name}_{safe_cert_name}{file_ext}"
    new_path = os.path.join(save_dir, new_filename)
    hf_path = f"https://huggingface.co/datasets/{repo_id}/blob/main/{safe_ship_name}/{safe_cert_name}/{new_filename}"
    api = HfApi(token=os.getenv("HF_API_KEY"))

    # Check for existing certificates in the directory (sorted so the file
    # used for comparison is deterministic).
    existing_files = sorted(
        f for f in os.listdir(save_dir) if os.path.isfile(os.path.join(save_dir, f))
    )

    if not existing_files:
        # First certificate of this kind for this ship: just store and upload.
        shutil.copy(temp_path, new_path)
        api.upload_folder(
            folder_path=upload_root,
            repo_id=repo_id,
            repo_type="dataset",
        )
        return f"""
β **Certificate Uploaded Successfully!**
**Certificate Name**: {certificate_name}
**Certificate ID**: {certificate_id}
**Ship Name**: {ship_name}
**Date of Issue**: {date_of_issue or "N/A"}
**Expiration Date**: {expiration_date or "N/A"}
π [View on Hugging Face Hub]({hf_path})
"""

    old_cert_path = os.path.join(save_dir, existing_files[0])
    print(f"π Existing certificate found: {old_cert_path}")
    # Check the result before unpacking: extraction returns None on failure.
    old_extracted = extract_certificate_details(old_cert_path)
    if not old_extracted or not old_extracted[0]:
        return "β Failed to process the existing certificate for comparison."
    (
        old_text,
        old_name,
        old_id,
        old_ship_name,
        old_date_of_issue,
        old_expiration_date,
    ) = old_extracted

    new_cert_details = {
        "Certificate Name": certificate_name,
        "Certificate ID": certificate_id,
        "Ship Name": ship_name,
        "Date of Issue": date_of_issue,
        "Expiration Date": expiration_date,
        "Certificate Text": certificate_text,
    }
    old_cert_details = {
        "Certificate Name": old_name,
        "Certificate ID": old_id,
        "Ship Name": old_ship_name,
        "Date of Issue": old_date_of_issue,
        "Expiration Date": old_expiration_date,
        "Certificate Text": old_text,
    }
    # Compare the old and new certificates
    comparison_result = compare_certificates(new_cert_details, old_cert_details)

    # Always delete the existing files before saving the new one. The hub copy
    # is removed first so a failed hub call does not leave the local folder
    # already emptied.
    for existing_file in existing_files:
        api.delete_file(
            path_in_repo=f"{safe_ship_name}/{safe_cert_name}/{existing_file}",
            repo_id=repo_id,
            repo_type="dataset",
        )
        os.remove(os.path.join(save_dir, existing_file))

    # Replace the existing file with the uploaded file
    shutil.copy(temp_path, new_path)
    print(f"β Replaced the existing file with the uploaded file: {new_path}")
    api.upload_folder(
        folder_path=upload_root,
        repo_id=repo_id,
        repo_type="dataset",
    )
    return f"""
β **Certificate Uploaded Successfully! Existing Certificate**
π [View on Hugging Face Hub]({hf_path})
**New Certificate Details**:
**Certificate Name**: {new_cert_details['Certificate Name']}
**Certificate ID**: {new_cert_details['Certificate ID']}
**Ship Name**: {new_cert_details['Ship Name']}
**Date of Issue**: {new_cert_details['Date of Issue'] or "N/A"}
**Expiration Date**: {new_cert_details['Expiration Date'] or "N/A"}
**Old Certificate Details**:
**Certificate Name**: {old_cert_details['Certificate Name']}
**Certificate ID**: {old_cert_details['Certificate ID']}
**Ship Name**: {old_cert_details['Ship Name']}
**Date of Issue**: {old_cert_details['Date of Issue'] or "N/A"}
**Expiration Date**: {old_cert_details['Expiration Date'] or "N/A"}
{comparison_result}
"""


def gradio_upload_certificate(uploaded_file):
    """
    Gradio handler: store an uploaded certificate and report the result.

    The upload is copied to a private temporary directory, its details are
    extracted, and the file is saved under
    ``hf_dataset_upload/<ship>/<certificate>/`` and pushed to the
    ``MikeMai/Certificates_Management`` dataset. When a certificate already
    exists in that folder it is compared with the new one, removed locally and
    on the hub, and replaced.

    Args:
        uploaded_file: The value supplied by the ``gr.File`` input: a path
            string or an object exposing the path as ``.name``; ``None`` when
            nothing was uploaded.

    Returns:
        A Markdown string describing success (with extracted details and, for
        replacements, the comparison summary) or the failure reason.
    """
    import tempfile  # Local import: only this handler needs it.

    if uploaded_file is None:
        return "No file uploaded. Please choose a certificate (PDF or image)."

    # Accept both a plain path and a file wrapper carrying the path in .name.
    source_path = getattr(uploaded_file, "name", uploaded_file)
    file_ext = os.path.splitext(source_path)[-1]

    # Save uploaded file to a local path immediately. A per-request temporary
    # directory avoids collisions between concurrent uploads and is cleaned up
    # automatically, unlike a fixed file name in the working directory.
    with tempfile.TemporaryDirectory() as work_dir:
        temp_path = os.path.join(work_dir, f"temp_uploaded_file{file_ext}")
        shutil.copy(source_path, temp_path)
        return _store_certificate(temp_path, file_ext)
# Build the Gradio UI around the upload handler, then start serving it.
certificate_app = gr.Interface(
    fn=gradio_upload_certificate,
    inputs=gr.File(label="Upload Certificate (PDF or Image)"),
    outputs=gr.Markdown(label="Upload Result"),
    title="π Certificate Manager",
    description="Upload a certificate to extract certificate details, rename, and store in respective folders.",
    show_progress='full',
    allow_flagging="never",
)
certificate_app.launch()
| # # Run the script with your files | |
| # old_cert = "load_line_cert_old.jpg" # Change to your old cert file | |
| # new_cert = "load_line_cert_new.pdf" # Change to your new cert file | |
| # extract_certificate_details(new_cert) | |
| # comparison_result = compare_certificates(old_cert, new_cert, True) | |
| # print("\nπ AI-Based Structured Comparison:\n") | |
| # print(comparison_result) | |
| # Gradio Interface ------------------------------ | |
| # import gradio as gr | |
| # from gradio.themes.base import Base | |
| # interface = gr.Interface( | |
| # fn=compare_certificates, | |
| # title="Certificate Comparison Summarizer", | |
| # inputs=[gr.File(label="Old Certificate"), gr.File(label="New Certificate")], | |
| # outputs=[gr.Textbox(label="Comparison Summary")], | |
| # allow_flagging="never", | |
| # theme=Base() | |
| # ) | |
| # interface.launch() |