Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import tempfile | |
| import gradio as gr | |
| from paddleocr import PaddleOCR | |
| import fitz # PyMuPDF | |
| from simple_salesforce import Salesforce | |
| from dotenv import load_dotenv | |
| import logging | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import JSONResponse | |
| import time | |
| import base64 | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| from io import BytesIO | |
# --- Logging ---------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Salesforce credentials --------------------------------------------------
# load_dotenv() runs first so the lookups below can also see values that come
# from a .env file. Each value is None when the variable is not set.
load_dotenv()
SF_USERNAME = os.environ.get('SF_USERNAME')
SF_PASSWORD = os.environ.get('SF_PASSWORD')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN')

# --- OCR engine ----------------------------------------------------------------
# One module-level PaddleOCR instance shared by every request: English models,
# angle classification enabled, GPU disabled.
# NOTE(review): det_limit_side_len=2000 presumably raises the detector's image
# side-length limit for the 300-DPI page renders - confirm against PaddleOCR docs.
ocr = PaddleOCR(
    lang='en',
    use_angle_cls=True,
    use_gpu=False,
    det_limit_side_len=2000,
)

# Field labels a document is checked for (case-insensitive substring match in
# analyze_document).
required_values = [
    "Vendor Name",
    "Tax Identification Number (TIN)",
    "Address",
    "Certification Details",
    "Contract Terms",
    "Payment Terms",
    "Signature",
]

# Values used for the scorecard "flags" result.
VALID_FLAGS = ['Valid', 'Incomplete', 'Missing', 'Invalid']

# FastAPI application served by uvicorn from the __main__ block.
app = FastAPI()
def generate_pdf_from_text(text, vendor_name):
    """Render plain text into an in-memory, letter-sized PDF.

    The text is written line by line starting 40 points from the top-left
    corner. When the text cursor drops below the bottom margin a new page is
    started, so long documents are no longer drawn off the bottom of a single
    page.

    Args:
        text: The text to render; split on ``"\\n"`` into PDF text lines.
        vendor_name: Unused here; kept so existing callers keep working.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the PDF, or ``None``
        if rendering failed (the error is logged).
    """
    margin = 40  # points, applied on the left, top and bottom
    try:
        pdf_buffer = BytesIO()
        pdf = canvas.Canvas(pdf_buffer, pagesize=letter)
        _, page_height = letter
        top_of_page = page_height - margin

        text_object = pdf.beginText(margin, top_of_page)
        for line in text.split('\n'):
            if text_object.getY() < margin:
                # Page is full: flush what we have and continue on a new page.
                pdf.drawText(text_object)
                pdf.showPage()
                text_object = pdf.beginText(margin, top_of_page)
            text_object.textLine(line)

        pdf.drawText(text_object)
        pdf.showPage()
        pdf.save()
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error generating PDF: {e}")
        return None
def upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name):
    """Upload a PDF as a Salesforce ContentVersion and return its download URL.

    The vendor name typically comes from OCR output, so characters that are
    unsafe in file names (path separators, quotes, control characters, ...)
    are replaced with ``_`` and the vendor part is truncated so the complete
    file name stays within 255 characters.

    Args:
        sf: Connected client exposing ``ContentVersion.create`` and
            ``sf_instance``.
        pdf_buffer: Object with ``getvalue()`` returning the PDF bytes.
        vendor_name: Vendor name used as the file-name prefix.

    Returns:
        The download URL built from ``sf.sf_instance`` and the new record id,
        or ``None`` if anything failed (the error is logged).
    """
    try:
        encoded_pdf = base64.b64encode(pdf_buffer.getvalue()).decode('utf-8')

        safe_vendor = re.sub(r'[\\/:*?"<>|\x00-\x1f]+', '_', str(vendor_name)).strip()
        suffix = f"_ExtractedText_{int(time.time())}.pdf"
        # NOTE(review): 255 is assumed to be the ContentVersion Title limit - confirm.
        file_name = safe_vendor[:255 - len(suffix)] + suffix

        content_version = sf.ContentVersion.create({
            "Title": file_name,
            "PathOnClient": file_name,
            "VersionData": encoded_pdf,
        })
        return (
            f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/"
            f"{content_version['id']}"
        )
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {e}")
        return None
# Vendor name extraction: labelled "Vendor Name: ..." field first, then a
# loose line-based fallback.
def extract_vendor_name(text):
    """Pull the vendor name out of OCR text.

    Strategy:
      1. Look for a ``Vendor Name`` label (case-insensitive, optional ``:`` or
         ``-``) and take the rest of that line, or the next line when the
         label stands alone. Candidates without any letter or digit (for
         example a lone ``:``) are rejected and the next label is tried.
      2. Otherwise return the first short line (at most five words) that
         mentions "vendor", skipping lines that are just the label itself.

    Args:
        text: OCR text of the document; may be empty or ``None``.

    Returns:
        The extracted vendor name, or ``"Unknown Vendor"`` when none is found.
    """
    if not text or text.isspace():
        logger.warning("Extracted text is empty or whitespace.")
        return "Unknown Vendor"

    # Log only the size: the text itself may contain tax IDs and addresses and
    # should not be dumped to stdout.
    logger.debug("Extracting vendor name from %d characters of OCR text", len(text))

    for match in re.finditer(r"(?i)vendor\s*name\s*[:\-]?\s*(.+?)(?:\n|$)", text):
        # Backtracking can leave the separator inside the capture; strip it.
        candidate = match.group(1).strip().lstrip(":- \t").strip()
        if any(ch.isalnum() for ch in candidate):
            return candidate

    # Fallback: look for any short line that might contain a vendor name.
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if "vendor" not in line.lower() or len(line.split()) > 5 or len(line) <= 3:
            continue
        # A bare "Vendor Name:" label carries no value, so it is not a name.
        remainder = re.sub(r"(?i)vendor\s*name", "", line)
        if any(ch.isalnum() for ch in remainder):
            return line

    logger.warning("Could not extract a valid vendor name from the text.")
    return "Unknown Vendor"
def analyze_document(document_text, required=None):
    """Return the required field labels that do not appear in the document.

    Matching is a case-insensitive substring test of each label against the
    whole text.

    Args:
        document_text: Text to inspect. ``None`` or an empty string is treated
            as a document in which every label is missing.
        required: Optional iterable of labels to check. Defaults to the
            module-level ``required_values`` list.

    Returns:
        A list of the missing labels, in the order they were given.
    """
    labels = required_values if required is None else required
    # Lower-case the document once instead of once per label.
    haystack = (document_text or "").lower()
    return [label for label in labels if label.lower() not in haystack]
def insert_into_salesforce(vendor_name, extracted_text, category, score, comments, flags):
    """Create a ``Vendor_Scorecard__c`` record for a processed document.

    Steps: validate the vendor name, log in to Salesforce, look the vendor up
    in ``Vendor__c`` (only logged; the scorecard is created either way),
    upload the extracted text as a PDF, then create the scorecard.

    Args:
        vendor_name: Vendor name extracted from the document.
        extracted_text: Full OCR text; rendered to a PDF and uploaded.
        category: Value for ``Category_Match__c``.
        score: Value for ``Score__c``.
        comments: Value for ``Comments__c``.
        flags: Value for ``Flags__c``.

    Returns:
        The result of the create call on success, or a string starting with
        ``"Error:"`` when the vendor name is invalid or any step fails.
    """
    try:
        vendor_name_clean = (vendor_name or "").strip()
        # Validate before logging in, so a bad name costs no Salesforce round trip.
        if not vendor_name_clean or vendor_name_clean.lower() == "unknown vendor":
            logger.warning("Vendor name is invalid or empty. Skipping Salesforce query.")
            return "Error: Invalid vendor name"

        sf = Salesforce(username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN)

        # Escape for a SOQL string literal. Backslashes must be escaped before
        # quotes, otherwise an input containing \' would still close the
        # literal. The escaped form is used only inside the query text.
        soql_name = vendor_name_clean.replace("\\", "\\\\").replace("'", "\\'")
        vendor_record = sf.query(f"SELECT Id FROM Vendor__c WHERE Name = '{soql_name}' LIMIT 1")
        if vendor_record['totalSize'] == 0:
            logger.warning(f"Vendor '{vendor_name_clean}' not found in Vendor__c object!")
        else:
            vendor_id = vendor_record['records'][0]['Id']
            logger.info(f"Vendor found with ID: {vendor_id}")

        pdf_buffer = generate_pdf_from_text(extracted_text, vendor_name_clean)
        pdf_url = upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name_clean) if pdf_buffer else None

        result = sf.Vendor_Scorecard__c.create({
            'Vendor_Name__c': vendor_name_clean,
            'Extracted_Text_URL__c': pdf_url or "",
            'Score__c': score,
            'Category_Match__c': category,
            'Comments__c': comments,
            'Flags__c': flags,
        })
        logger.info(f"Record inserted successfully with ID: {result.get('id')}")
        return result
    except Exception as e:
        logger.error(f"Error inserting into Salesforce: {e}")
        return f"Error: {e}"
def _classify_missing(missing_count):
    """Map the number of missing required values to (category, score, comments, flags)."""
    if missing_count == 0:
        return 'Compliant', 100, 'All values present.', 'Valid'
    if missing_count == 1:
        return 'Partially Compliant', 85, 'One value missing.', 'Incomplete'
    if missing_count == 2:
        return 'Non-Compliant', 60, 'Two values missing.', 'Missing'
    return 'Not Applicable', 40, 'Three or more values missing.', 'Invalid'


def _ocr_pdf_pages(pdf_path):
    """OCR every page of the PDF at ``pdf_path`` and return the combined text."""
    zoom = 300 / 72  # render at 300 DPI instead of the 72 DPI default
    page_texts = []
    # The temporary directory holds the per-page images. It and the document
    # are both released on exit, including when an exception propagates.
    with tempfile.TemporaryDirectory() as work_dir, \
            fitz.open(pdf_path, filetype="pdf") as pdf_doc:
        for page in pdf_doc:
            page_path = os.path.join(work_dir, f"page_{page.number}.png")
            try:
                pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
                pix.save(page_path)
                result = ocr.ocr(page_path)
                if result and result[0]:
                    page_texts.append("\n".join(line[1][0] for line in result[0]))
                else:
                    logger.warning(f"No text extracted from page {page.number}.")
            except Exception as e:
                # Best effort: one bad page must not abort the whole document.
                logger.error(f"Error processing page {page.number}: {e}")
    return "".join(f"{page_text}\n" for page_text in page_texts)


def process_pdf(pdf_file):
    """OCR a PDF, score it against the required fields and record the result.

    Args:
        pdf_file: The uploaded PDF, given either as a path (``str`` or
            ``os.PathLike``) or as an object whose ``.name`` attribute is the
            path. A falsy value means nothing was uploaded.

    Returns:
        A 5-tuple ``(extracted_text, category, score, comments, flags)``. On
        failure the first element is an error message, the score is ``0`` and
        the other elements are ``"Error"``.
    """
    start_time = time.time()
    try:
        if not pdf_file:
            return "No file uploaded", "Error", 0, "Error", "Error"

        if isinstance(pdf_file, (str, os.PathLike)):
            pdf_path = pdf_file
        else:
            pdf_path = pdf_file.name

        extracted_text = _ocr_pdf_pages(pdf_path)
        if not extracted_text.strip():
            logger.error("No text extracted from the PDF.")
            return "Error: No text extracted", "Error", 0, "Error", "Error"

        vendor_name = extract_vendor_name(extracted_text)
        missing = analyze_document(extracted_text)
        category, score, comments, flags = _classify_missing(len(missing))

        # Failures are logged inside insert_into_salesforce and deliberately do
        # not change what is shown to the user.
        insert_into_salesforce(vendor_name, extracted_text, category, score, comments, flags)

        duration = time.time() - start_time
        logger.info(f"Processing time: {duration:.2f} seconds")
        return extracted_text, category, score, comments, flags
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        return f"Error: {e}", "Error", 0, "Error", "Error"
# NOTE(review): the original coroutine was never registered on `app`, so the
# FastAPI server exposed no endpoint. The path below was chosen during review;
# rename it if clients expect a different URL.
@app.post("/process-pdf")
async def process_pdf_api(file: UploadFile = File(...)):
    """HTTP endpoint: accept an uploaded PDF and return the scoring result as JSON.

    The upload is spooled to a temporary ``.pdf`` file, handed to
    ``process_pdf``, and the temporary file is always removed afterwards.

    Args:
        file: The uploaded PDF.

    Returns:
        A ``JSONResponse`` with ``extracted_text``, ``category``, ``score``,
        ``comments`` and ``flags``; or ``{"error": ...}`` with status 500 if
        an exception escapes.
    """
    temp_path = None
    try:
        contents = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_file.write(contents)
            temp_path = temp_file.name
        # Leaving the with-block closed the file, so every byte is on disk
        # before process_pdf re-opens it via temp_file.name.
        extracted_text, category, score, comments, flags = process_pdf(temp_file)
        return JSONResponse(content={
            "extracted_text": extracted_text,
            "category": category,
            "score": score,
            "comments": comments,
            "flags": flags,
        })
    except Exception as e:
        logger.error(f"Error processing the file via API: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # delete=False means cleanup is our job; do it on every path.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def gradio_interface(pdf_file):
    """Gradio callback: hand the uploaded file to ``process_pdf``.

    Args:
        pdf_file: The value Gradio supplies for the file input.

    Returns:
        Whatever ``process_pdf`` returns for that file, unchanged.
    """
    outputs = process_pdf(pdf_file)
    return outputs
# Gradio front end: one PDF upload in, five result fields out. The outputs are
# listed in the same order as the values gradio_interface returns.
_upload_input = gr.File(label="Upload PDF Document")
_result_outputs = [
    gr.Textbox(label="Extracted Text"),
    gr.Textbox(label="Category Match"),
    gr.Number(label="Score"),
    gr.Textbox(label="Comments"),
    gr.Textbox(label="Flags"),
]
gr_interface = gr.Interface(
    fn=gradio_interface,
    inputs=_upload_input,
    outputs=_result_outputs,
    live=True,
)
if __name__ == "__main__":
    import threading

    import uvicorn

    def run_gradio():
        """Serve the Gradio UI from the background thread."""
        gr_interface.launch()

    # daemon=True: a non-daemon thread still running inside launch() would keep
    # the interpreter alive after uvicorn has shut down.
    threading.Thread(target=run_gradio, daemon=True).start()

    # Serve the FastAPI app on all interfaces, port 8000, in the main thread.
    uvicorn.run(app, host="0.0.0.0", port=8000)