Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| from fpdf import FPDF | |
| import tempfile | |
| import os | |
| from paddleocr import PaddleOCR | |
| import time | |
| from simple_salesforce import Salesforce | |
| from datetime import datetime | |
| import base64 | |
| import io | |
| import logging | |
| from PIL import Image | |
# Send INFO-and-above records to the root handler; `logger` is the
# module-level logger the rest of this file uses for error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Build the English OCR engine a single time at import so that every
# request reuses the same instance instead of re-initialising it.
ocr_model = PaddleOCR(lang='en', use_textline_orientation=True)
def analyze_uv_coverage(img, brightness_threshold=150, kernel_size=5, apply_blur=True, adaptive_thresh=False):
    """Measure the bright fraction of a BGR image and build a colour overlay.

    The image is converted to grayscale, optionally blurred with a 5x5
    Gaussian, binarised, and cleaned with a morphological open followed by
    a close.

    Args:
        img: Image array that ``cv2.cvtColor`` can read as BGR.
        brightness_threshold: Cut-off for the fixed threshold; ignored when
            ``adaptive_thresh`` is true.
        kernel_size: Side length of the square all-ones morphology kernel.
        apply_blur: Whether to blur the grayscale image before thresholding.
        adaptive_thresh: Use Gaussian adaptive thresholding (block size 11,
            constant 2) instead of the fixed threshold.

    Returns:
        ``(annotated_img, coverage_percent)``: the input blended 60/40 with
        a green (mask set) / red (mask clear) overlay, and the percentage of
        pixels whose mask value is non-zero.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    if apply_blur:
        grayscale = cv2.GaussianBlur(grayscale, (5, 5), 0)

    if not adaptive_thresh:
        # cv2.threshold returns (retval, image); only the image is needed.
        mask = cv2.threshold(grayscale, brightness_threshold, 255, cv2.THRESH_BINARY)[1]
    else:
        mask = cv2.adaptiveThreshold(
            grayscale,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY,
            11,
            2,
        )

    # Open removes small bright specks, close then fills small dark holes.
    structuring_element = np.ones((kernel_size, kernel_size), np.uint8)
    for morph_op in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
        mask = cv2.morphologyEx(mask, morph_op, structuring_element, iterations=1)

    lit_pixel_count = cv2.countNonZero(mask)
    coverage_percent = (lit_pixel_count / mask.size) * 100

    # Channel order is BGR, so [0, 255, 0] is green and [0, 0, 255] is red.
    colour_layer = img.copy()
    colour_layer[mask == 255] = [0, 255, 0]
    colour_layer[mask == 0] = [0, 0, 255]
    blended = cv2.addWeighted(img, 0.6, colour_layer, 0.4, 0)
    return blended, coverage_percent
def create_pdf_report(coverage_percent, extracted_texts, annotated_image_path, output_path):
    """Write a one-document PDF report for a UV coverage analysis.

    The report contains a title, the coverage percentage, the OCR text
    lines, and the annotated image.

    Args:
        coverage_percent: Coverage value, rendered with two decimals.
        extracted_texts: Iterable of OCR strings (may be empty or ``None``).
            Entries whose stripped length is 1 or less are skipped.
        annotated_image_path: Path of the image file embedded in the report.
        output_path: Destination path of the generated PDF.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", 'B', 16)
    pdf.cell(200, 10, txt="UV Sterilization Report", ln=True, align='C')
    pdf.ln(10)
    pdf.set_font("Arial", size=12)
    pdf.cell(0, 10, f"Sterilization Coverage: {coverage_percent:.2f}%", ln=True)
    pdf.ln(5)
    pdf.cell(0, 10, "Extracted Text from Image (OCR):", ln=True)
    pdf.set_font("Arial", size=10)

    # Filter first so that a list made only of 1-character noise still
    # produces the "No text detected." line instead of an empty section.
    printable_texts = [t for t in (extracted_texts or []) if len(t.strip()) > 1]
    if printable_texts:
        for text in printable_texts:
            # The built-in Arial font is Latin-1 only; replace anything else
            # with '?' so unusual OCR output cannot abort PDF generation.
            safe_text = text.encode("latin-1", "replace").decode("latin-1")
            pdf.multi_cell(0, 8, f"- {safe_text}")
    else:
        pdf.cell(0, 8, "No text detected.", ln=True)

    pdf.ln(10)
    pdf.cell(0, 10, "Annotated Image:", ln=True)
    # Leaving ``y`` unset puts the image in flow mode at the current cursor
    # position, which lets FPDF start a new page when the image would not
    # fit; an explicit ``y`` disables that and can push it off the page.
    pdf.image(annotated_image_path, x=10, w=pdf.w - 20)
    pdf.output(output_path)
def upload_image_to_salesforce(image_path, image_name, record_id=None):
    """Upload a file to Salesforce as a ContentVersion and return its URL.

    Credentials come from the ``SF_USERNAME``, ``SF_PASSWORD`` and
    ``SF_SECURITY_TOKEN`` environment variables; ``SF_DOMAIN`` is optional
    and defaults to ``login``.

    Args:
        image_path: Path of the file to upload.
        image_name: Used for both ``Title`` and ``PathOnClient``.
        record_id: Optional record id set as ``FirstPublishLocationId``.

    Returns:
        Download URL built from the Salesforce instance host and the id of
        the created ContentVersion.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    try:
        client = Salesforce(
            username=os.environ['SF_USERNAME'],
            password=os.environ['SF_PASSWORD'],
            security_token=os.environ['SF_SECURITY_TOKEN'],
            domain=os.environ.get('SF_DOMAIN', 'login'),
        )

        # The file content is sent base64-encoded in the VersionData field.
        with open(image_path, "rb") as image_file:
            payload_b64 = base64.b64encode(image_file.read()).decode('utf-8')

        version_fields = {
            "Title": image_name,
            "PathOnClient": image_name,
            "VersionData": payload_b64,
        }
        if record_id:
            version_fields["FirstPublishLocationId"] = record_id

        created = client.ContentVersion.create(version_fields)
        version_id = created["id"]
        return f"https://{client.sf_instance}/sfc/servlet.shepherd/version/download/{version_id}"
    except Exception as e:
        logger.error(f"Error uploading image to Salesforce: {str(e)}", exc_info=True)
        raise
def upload_image_and_get_url(image_path):
    """Upload ``image_path`` to Salesforce under a generated unique name.

    The name is a random UUID hex string, an underscore, the current UTC
    time formatted as ``YYYYMMDD_HHMMSS``, and a ``.jpg`` extension.

    Returns:
        Whatever ``upload_image_to_salesforce`` returns for that upload.
    """
    from datetime import datetime
    import uuid

    random_part = uuid.uuid4().hex
    utc_stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    generated_name = f"{random_part}_{utc_stamp}.jpg"
    return upload_image_to_salesforce(image_path, generated_name)
def save_record_to_salesforce(annotated_image_url, coverage_percent, original_image_pil, compliance_threshold=80):
    """Upload the original image and create a ``UV_Verification__c`` record.

    Credentials come from the ``SF_USERNAME``, ``SF_PASSWORD`` and
    ``SF_SECURITY_TOKEN`` environment variables (``SF_DOMAIN`` optional,
    default ``login``); the technician id is read from ``SF_TECHNICIAN_ID``.

    Args:
        annotated_image_url: URL stored in ``Annotated_Image__c``.
        coverage_percent: Coverage value; stored rounded to two decimals.
        original_image_pil: PIL image saved as JPEG and uploaded; its URL is
            stored in ``Original_Image__c``.
        compliance_threshold: Minimum coverage for a ``'Pass'`` status.

    Raises:
        Exception: Errors from Salesforce or from saving/uploading the image
            propagate to the caller; the temporary JPEG is removed either way.
    """
    sf = Salesforce(
        username=os.environ['SF_USERNAME'],
        password=os.environ['SF_PASSWORD'],
        security_token=os.environ['SF_SECURITY_TOKEN'],
        domain=os.environ.get('SF_DOMAIN', 'login')
    )

    # Reserve a path, then close the handle before writing to it by name.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_orig_img_file:
        temp_orig_img_path = temp_orig_img_file.name
    try:
        image_to_save = original_image_pil
        # JPEG cannot store alpha or palette modes (e.g. RGBA, LA, P);
        # convert those to RGB instead of failing inside ``save``.
        if image_to_save.mode not in ("L", "RGB", "CMYK"):
            image_to_save = image_to_save.convert("RGB")
        image_to_save.save(temp_orig_img_path, format="JPEG")
        original_image_url = upload_image_and_get_url(temp_orig_img_path)
    finally:
        # Always remove the temp file, including when the upload fails.
        os.unlink(temp_orig_img_path)

    compliance_status = 'Pass' if coverage_percent >= compliance_threshold else 'Fail'
    technician_id = os.environ.get('SF_TECHNICIAN_ID')

    # One timestamp so the record name and Verified_On__c always agree.
    verified_at = datetime.utcnow()
    record_name = f"UV Verification - {verified_at.strftime('%Y-%m-%d %H:%M:%S')}"
    sf.UV_Verification__c.create({
        'Name': record_name,
        'Annotated_Image__c': annotated_image_url,
        'Coverage_Percentage__c': round(coverage_percent, 2),
        'Original_Image__c': original_image_url,
        'Compliance_Status__c': compliance_status,
        'Technician_ID__c': technician_id,
        'Verified_On__c': verified_at.isoformat()
    })
def _extract_ocr_texts(ocr_result):
    """Collect stripped text strings longer than one character from OCR output.

    Two result layouts are accepted: the legacy one, where each page is a
    list of ``[box, (text, score)]`` entries, and the dict-like per-image
    result of newer PaddleOCR releases, which lists strings under
    ``"rec_texts"``. Empty or ``None`` pages are skipped.
    """
    texts = []
    for page in ocr_result or []:
        if not page:
            continue
        if hasattr(page, "get"):
            candidates = page.get("rec_texts") or []
        else:
            candidates = [entry[1][0] for entry in page]
        for candidate in candidates:
            cleaned = candidate.strip()
            if len(cleaned) > 1:
                texts.append(cleaned)
    return texts


def process_image(input_img, brightness_threshold=150, slider_state=gr.State()):
    """Run the full analysis pipeline for one uploaded image.

    Steps: convert to BGR, downscale so the longest side is at most 640 px,
    OCR the image, compute UV coverage, write a PDF report, upload the
    annotated image and save a Salesforce record.

    Args:
        input_img: Uploaded image (the UI supplies a PIL image), or ``None``
            when nothing was uploaded.
        brightness_threshold: Threshold forwarded to ``analyze_uv_coverage``.
        slider_state: Not read; kept only so the signature stays unchanged.

    Returns:
        ``(annotated_rgb, report_text, pdf_path, brightness_threshold)``
        matching the four Gradio outputs wired to the Submit button.

    Raises:
        gr.Error: If no image was provided.
        Exception: Failures from PDF creation or Salesforce propagate after
            the temporary files have been cleaned up.
    """
    if input_img is None:
        # Without this guard the colour conversion fails with an opaque error.
        raise gr.Error("Please upload an image before submitting.")

    img = cv2.cvtColor(np.array(input_img), cv2.COLOR_RGB2BGR)

    max_dim = 640
    h, w = img.shape[:2]
    if max(h, w) > max_dim:
        scale = max_dim / max(h, w)
        # Clamp to 1 px so extreme aspect ratios never yield a zero size.
        img = cv2.resize(img, (max(1, int(w * scale)), max(1, int(h * scale))))

    extracted_texts = _extract_ocr_texts(ocr_model.ocr(img))
    annotated_img, coverage_percent = analyze_uv_coverage(img, brightness_threshold)

    annotated_img_path = None
    pdf_path = None
    try:
        # Reserve both paths with closed handles, then write to them by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_img_file:
            annotated_img_path = temp_img_file.name
        if not cv2.imwrite(annotated_img_path, annotated_img):
            raise RuntimeError("Failed to write the annotated image to disk.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf_file:
            pdf_path = temp_pdf_file.name
        create_pdf_report(coverage_percent, extracted_texts, annotated_img_path, pdf_path)

        annotated_image_url = upload_image_and_get_url(annotated_img_path)
        save_record_to_salesforce(annotated_image_url, coverage_percent, input_img)
    except Exception:
        # The PDF is only useful on success; do not leave it behind on failure.
        if pdf_path and os.path.exists(pdf_path):
            os.unlink(pdf_path)
        raise
    finally:
        # The annotated JPEG is an intermediate artefact in every case.
        if annotated_img_path and os.path.exists(annotated_img_path):
            os.unlink(annotated_img_path)

    annotated_img_rgb = cv2.cvtColor(annotated_img, cv2.COLOR_BGR2RGB)
    report_text = f"UV Sterilization Coverage: {coverage_percent:.2f}%"
    # The slider output simply echoes the threshold that was used.
    return annotated_img_rgb, report_text, pdf_path, brightness_threshold
def reset_slider(_: Image.Image):
    """Return an update that puts the brightness slider back to 150."""
    return gr.update(value=150)


def clear_all():
    """Return reset values for, in order: the input image, the slider,
    the annotated image, the report text and the PDF file."""
    return None, 150, None, "", None


with gr.Blocks(title="UV Sterilization Coverage Analyzer") as demo:
    gr.Markdown("## UV Sterilization Coverage Analyzer")
    gr.Markdown("Upload a post-UV sterilization image to analyze surface coverage and generate a compliance report.")

    with gr.Row():
        # Left column: inputs and action buttons.
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Post-UV Sterilization Image")
            brightness_slider = gr.Slider(
                minimum=50,
                maximum=255,
                value=150,
                step=1,
                label="Brightness Threshold",
            )
            with gr.Row():
                clear_btn = gr.Button("Clear")
                submit_btn = gr.Button("Submit", variant="primary")

        # Right column: analysis results.
        with gr.Column(scale=1):
            annotated_output = gr.Image(type="numpy", label="Annotated Image")
            report_text = gr.Textbox(label="UV Sterilization Report", lines=5)
            pdf_output = gr.File(label="Download PDF Report")

    # Any change of the input image resets the slider to its default.
    image_input.change(fn=reset_slider, inputs=image_input, outputs=brightness_slider)

    submit_btn.click(
        fn=process_image,
        inputs=[image_input, brightness_slider],
        outputs=[annotated_output, report_text, pdf_output, brightness_slider],
    )

    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[image_input, brightness_slider, annotated_output, report_text, pdf_output],
    )

demo.queue()
demo.launch()