import gradio as gr import cv2 import numpy as np from fpdf import FPDF import tempfile import os from paddleocr import PaddleOCR import time from simple_salesforce import Salesforce from datetime import datetime import base64 import io import logging from PIL import Image # Set up logging to debug file writing issues logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize PaddleOCR once with updated parameters ocr_model = PaddleOCR(use_textline_orientation=True, lang='en') def analyze_uv_coverage(img, brightness_threshold=150, kernel_size=5, apply_blur=True, adaptive_thresh=False): gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if apply_blur: gray = cv2.GaussianBlur(gray, (5, 5), 0) if adaptive_thresh: binary_mask = cv2.adaptiveThreshold( gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) else: _, binary_mask = cv2.threshold(gray, brightness_threshold, 255, cv2.THRESH_BINARY) kernel = np.ones((kernel_size, kernel_size), np.uint8) binary_mask = cv2.morphologyEx(binary_mask, cv2.MORPH_OPEN, kernel, iterations=1) binary_mask = cv2.morphologyEx(binary_mask, cv2.MORPH_CLOSE, kernel, iterations=1) total_pixels = binary_mask.size sterilized_pixels = cv2.countNonZero(binary_mask) coverage_percent = (sterilized_pixels / total_pixels) * 100 overlay = img.copy() overlay[binary_mask == 255] = [0, 255, 0] overlay[binary_mask == 0] = [0, 0, 255] annotated_img = cv2.addWeighted(img, 0.6, overlay, 0.4, 0) return annotated_img, coverage_percent def create_pdf_report(coverage_percent, extracted_texts, annotated_image_path, output_path): pdf = FPDF() pdf.add_page() pdf.set_font("Arial", 'B', 16) pdf.cell(200, 10, txt="UV Sterilization Report", ln=True, align='C') pdf.ln(10) pdf.set_font("Arial", size=12) pdf.cell(0, 10, f"Sterilization Coverage: {coverage_percent:.2f}%", ln=True) pdf.ln(5) pdf.cell(0, 10, "Extracted Text from Image (OCR):", ln=True) pdf.set_font("Arial", size=10) if extracted_texts: for text in extracted_texts: if len(text.strip()) > 1: pdf.multi_cell(0, 8, f"- {text}") else: pdf.cell(0, 8, "No text detected.", ln=True) pdf.ln(10) pdf.cell(0, 10, "Annotated Image:", ln=True) pdf.image(annotated_image_path, x=10, y=pdf.get_y(), w=pdf.w - 20) pdf.output(output_path) def upload_image_to_salesforce(image_path, image_name, record_id=None): try: sf = Salesforce( username=os.environ['SF_USERNAME'], password=os.environ['SF_PASSWORD'], security_token=os.environ['SF_SECURITY_TOKEN'], domain=os.environ.get('SF_DOMAIN', 'login') ) with open(image_path, "rb") as f: image_data = f.read() encoded_image_data = base64.b64encode(image_data).decode('utf-8') content_version_data = { "Title": image_name, "PathOnClient": image_name, "VersionData": encoded_image_data, } if record_id: content_version_data["FirstPublishLocationId"] = record_id content_version = sf.ContentVersion.create(content_version_data) content_version_id = content_version["id"] image_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{content_version_id}" return image_url except Exception as e: logger.error(f"Error uploading image to Salesforce: {str(e)}", exc_info=True) raise def upload_image_and_get_url(image_path): from datetime import datetime import uuid unique_filename = f"{uuid.uuid4().hex}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.jpg" return upload_image_to_salesforce(image_path, unique_filename) def save_record_to_salesforce(annotated_image_url, coverage_percent, original_image_pil, compliance_threshold=80): sf = Salesforce( username=os.environ['SF_USERNAME'], password=os.environ['SF_PASSWORD'], security_token=os.environ['SF_SECURITY_TOKEN'], domain=os.environ.get('SF_DOMAIN', 'login') ) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_orig_img_file: original_image_pil.save(temp_orig_img_file.name, format="JPEG") temp_orig_img_path = temp_orig_img_file.name original_image_url = upload_image_and_get_url(temp_orig_img_path) os.unlink(temp_orig_img_path) compliance_status = 'Pass' if coverage_percent >= compliance_threshold else 'Fail' technician_id = os.environ.get('SF_TECHNICIAN_ID') record_name = f"UV Verification - {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}" sf.UV_Verification__c.create({ 'Name': record_name, 'Annotated_Image__c': annotated_image_url, 'Coverage_Percentage__c': round(coverage_percent, 2), 'Original_Image__c': original_image_url, 'Compliance_Status__c': compliance_status, 'Technician_ID__c': technician_id, 'Verified_On__c': datetime.utcnow().isoformat() }) def process_image(input_img, brightness_threshold=150, slider_state=gr.State()): img = cv2.cvtColor(np.array(input_img), cv2.COLOR_RGB2BGR) max_dim = 640 h, w = img.shape[:2] if max(h, w) > max_dim: scale = max_dim / max(h, w) img = cv2.resize(img, (int(w * scale), int(h * scale))) start_time = time.time() ocr_result = ocr_model.ocr(img) extracted_texts = [] for line in ocr_result: if line: for word_info in line: text = word_info[1][0].strip() if len(text) > 1: extracted_texts.append(text) annotated_img, coverage_percent = analyze_uv_coverage(img, brightness_threshold) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_img_file: cv2.imwrite(temp_img_file.name, annotated_img) annotated_img_path = temp_img_file.name temp_pdf_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") temp_pdf_file.close() create_pdf_report(coverage_percent, extracted_texts, annotated_img_path, temp_pdf_file.name) annotated_image_url = upload_image_and_get_url(annotated_img_path) save_record_to_salesforce(annotated_image_url, coverage_percent, input_img) annotated_img_rgb = cv2.cvtColor(annotated_img, cv2.COLOR_BGR2RGB) report_text = f"UV Sterilization Coverage: {coverage_percent:.2f}%" os.unlink(annotated_img_path) slider_state = brightness_threshold return annotated_img_rgb, report_text, temp_pdf_file.name, slider_state with gr.Blocks(title="UV Sterilization Coverage Analyzer") as demo: gr.Markdown("## UV Sterilization Coverage Analyzer") gr.Markdown("Upload a post-UV sterilization image to analyze surface coverage and generate a compliance report.") with gr.Row(): with gr.Column(scale=1): image_input = gr.Image(type="pil", label="Upload Post-UV Sterilization Image") brightness_slider = gr.Slider(50, 255, value=150, step=1, label="Brightness Threshold") with gr.Row(): clear_btn = gr.Button("Clear") submit_btn = gr.Button("Submit", variant="primary") with gr.Column(scale=1): annotated_output = gr.Image(type="numpy", label="Annotated Image") report_text = gr.Textbox(label="UV Sterilization Report", lines=5) pdf_output = gr.File(label="Download PDF Report") def reset_slider(_: Image.Image): return gr.update(value=150) def clear_all(): return None, 150, None, "", None image_input.change(fn=reset_slider, inputs=image_input, outputs=brightness_slider) submit_btn.click(fn=process_image, inputs=[image_input, brightness_slider], outputs=[annotated_output, report_text, pdf_output, brightness_slider]) clear_btn.click(fn=clear_all, inputs=[], outputs=[image_input, brightness_slider, annotated_output, report_text, pdf_output]) demo.queue() demo.launch()