# Source: Smartlab_tech / app.py (pavansuresh, "Update app.py", revision 26c0681, verified)
import gradio as gr
import cv2
import numpy as np
from fpdf import FPDF
import tempfile
import os
from paddleocr import PaddleOCR
import time
from simple_salesforce import Salesforce
from datetime import datetime
import base64
import io
import logging
from PIL import Image
# Configure logging at INFO level; this was originally added to help debug
# file-writing issues, and the module-level logger is used by the Salesforce
# upload helper to report failures.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Build the PaddleOCR model once at import time so every request reuses it.
# NOTE(review): `use_textline_orientation` looks like a PaddleOCR 3.x keyword —
# confirm it matches the installed PaddleOCR version.
ocr_model = PaddleOCR(use_textline_orientation=True, lang='en')
def analyze_uv_coverage(img, brightness_threshold=150, kernel_size=5, apply_blur=True, adaptive_thresh=False):
    """Estimate how much of an image is bright enough to count as sterilized.

    The image is treated as BGR, converted to grayscale, optionally blurred,
    thresholded into a binary mask, and cleaned with a morphological
    open followed by a close.

    Args:
        img: Image array in BGR channel order.
        brightness_threshold: Global threshold used when ``adaptive_thresh``
            is false.
        kernel_size: Side length of the square kernel for the morphology steps.
        apply_blur: When true, apply a 5x5 Gaussian blur before thresholding.
        adaptive_thresh: When true, use Gaussian adaptive thresholding
            (block size 11, constant 2) instead of the global threshold.

    Returns:
        A tuple ``(annotated_img, coverage_percent)`` where ``annotated_img``
        is the input blended 60/40 with a green/red mask overlay and
        ``coverage_percent`` is the share of non-zero mask pixels (0-100).
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    if apply_blur:
        grayscale = cv2.GaussianBlur(grayscale, (5, 5), 0)

    if adaptive_thresh:
        mask = cv2.adaptiveThreshold(
            grayscale,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY,
            11,
            2,
        )
    else:
        mask = cv2.threshold(grayscale, brightness_threshold, 255, cv2.THRESH_BINARY)[1]

    # Open removes small bright specks, close fills small dark holes.
    structuring_element = np.ones((kernel_size, kernel_size), np.uint8)
    for morph_op in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
        mask = cv2.morphologyEx(mask, morph_op, structuring_element, iterations=1)

    coverage_percent = (cv2.countNonZero(mask) / mask.size) * 100

    # Paint covered pixels green and uncovered pixels red (BGR order).
    tinted = img.copy()
    tinted[mask == 255] = [0, 255, 0]
    tinted[mask == 0] = [0, 0, 255]
    annotated_img = cv2.addWeighted(img, 0.6, tinted, 0.4, 0)

    return annotated_img, coverage_percent
def _latin1_safe(text):
    """Return *text* with every character outside Latin-1 replaced by ``?``."""
    return text.encode("latin-1", "replace").decode("latin-1")


def create_pdf_report(coverage_percent, extracted_texts, annotated_image_path, output_path):
    """Write a one-page PDF report with coverage, OCR text and the annotated image.

    Args:
        coverage_percent: Coverage value, printed with two decimals.
        extracted_texts: Iterable of OCR strings (may be empty or ``None``).
            Entries of one character or less after stripping are skipped.
        annotated_image_path: Path of the image file embedded below the text.
        output_path: Destination path of the generated PDF.
    """
    pdf = FPDF()
    pdf.add_page()

    pdf.set_font("Arial", 'B', 16)
    pdf.cell(200, 10, txt="UV Sterilization Report", ln=True, align='C')
    pdf.ln(10)

    pdf.set_font("Arial", size=12)
    pdf.cell(0, 10, f"Sterilization Coverage: {coverage_percent:.2f}%", ln=True)
    pdf.ln(5)
    pdf.cell(0, 10, "Extracted Text from Image (OCR):", ln=True)

    pdf.set_font("Arial", size=10)
    # The built-in "Arial" font only covers Latin-1; OCR output containing any
    # other character would otherwise make FPDF raise and abort the report.
    printable_lines = [
        _latin1_safe(text)
        for text in (extracted_texts or [])
        if len(text.strip()) > 1
    ]
    if printable_lines:
        for line in printable_lines:
            pdf.multi_cell(0, 8, f"- {line}")
    else:
        # Also reached when every OCR entry was too short to print, so the
        # section is never left blank.
        pdf.cell(0, 8, "No text detected.", ln=True)

    pdf.ln(10)
    pdf.cell(0, 10, "Annotated Image:", ln=True)
    # The image is placed at the current cursor position, spanning the page
    # width minus 10 units on each side.
    pdf.image(annotated_image_path, x=10, y=pdf.get_y(), w=pdf.w - 20)
    pdf.output(output_path)
def upload_image_to_salesforce(image_path, image_name, record_id=None):
    """Upload an image file to Salesforce as a ContentVersion and return its URL.

    Credentials are read from the ``SF_USERNAME``, ``SF_PASSWORD`` and
    ``SF_SECURITY_TOKEN`` environment variables; ``SF_DOMAIN`` is optional and
    defaults to ``'login'``.

    Args:
        image_path: Path of the file to upload.
        image_name: Value used for both ``Title`` and ``PathOnClient``.
        record_id: Optional record id stored as ``FirstPublishLocationId``.

    Returns:
        A download URL built from the Salesforce instance host and the id of
        the created ContentVersion.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    try:
        # Read credentials in a fixed order so a missing variable surfaces
        # as the same KeyError as before.
        credentials = {
            "username": os.environ['SF_USERNAME'],
            "password": os.environ['SF_PASSWORD'],
            "security_token": os.environ['SF_SECURITY_TOKEN'],
            "domain": os.environ.get('SF_DOMAIN', 'login'),
        }
        sf = Salesforce(**credentials)

        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
        payload_b64 = base64.b64encode(raw_bytes).decode('utf-8')

        fields = {
            "Title": image_name,
            "PathOnClient": image_name,
            "VersionData": payload_b64,
        }
        if record_id:
            fields["FirstPublishLocationId"] = record_id

        created = sf.ContentVersion.create(fields)
        version_id = created["id"]
        return f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{version_id}"
    except Exception as e:
        logger.error(f"Error uploading image to Salesforce: {str(e)}", exc_info=True)
        raise
def upload_image_and_get_url(image_path):
    """Upload *image_path* to Salesforce under a unique ``.jpg`` name.

    The file name combines a random UUID hex string with the current UTC
    time formatted as ``YYYYMMDD_HHMMSS``.

    Args:
        image_path: Path of the image file to upload.

    Returns:
        The URL returned by ``upload_image_to_salesforce``.
    """
    import uuid
    from datetime import datetime, timezone

    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    unique_filename = f"{uuid.uuid4().hex}_{timestamp}.jpg"
    return upload_image_to_salesforce(image_path, unique_filename)
def save_record_to_salesforce(annotated_image_url, coverage_percent, original_image_pil, compliance_threshold=80):
    """Upload the original image and create a ``UV_Verification__c`` record.

    Credentials are read from the ``SF_USERNAME``, ``SF_PASSWORD`` and
    ``SF_SECURITY_TOKEN`` environment variables (``SF_DOMAIN`` defaults to
    ``'login'``). The technician id comes from ``SF_TECHNICIAN_ID`` and is
    ``None`` when that variable is unset.

    Args:
        annotated_image_url: URL stored in ``Annotated_Image__c``.
        coverage_percent: Coverage value; stored rounded to two decimals.
        original_image_pil: Image object saved as JPEG and uploaded; its URL
            is stored in ``Original_Image__c``.
        compliance_threshold: Minimum coverage for a ``'Pass'`` status.
    """
    from datetime import timezone

    sf = Salesforce(
        username=os.environ['SF_USERNAME'],
        password=os.environ['SF_PASSWORD'],
        security_token=os.environ['SF_SECURITY_TOKEN'],
        domain=os.environ.get('SF_DOMAIN', 'login')
    )

    # JPEG cannot store alpha or palette data; convert such images so the
    # save below does not fail.
    jpeg_ready = original_image_pil
    if jpeg_ready.mode not in ("RGB", "L", "CMYK"):
        jpeg_ready = jpeg_ready.convert("RGB")

    # Reserve a temp path, then close the handle before writing to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_orig_img_file:
        temp_orig_img_path = temp_orig_img_file.name
    try:
        jpeg_ready.save(temp_orig_img_path, format="JPEG")
        original_image_url = upload_image_and_get_url(temp_orig_img_path)
    finally:
        # Remove the temp file even when saving or uploading fails.
        os.unlink(temp_orig_img_path)

    compliance_status = 'Pass' if coverage_percent >= compliance_threshold else 'Fail'
    technician_id = os.environ.get('SF_TECHNICIAN_ID')

    # Take one aware UTC timestamp so the record name and the verification
    # time agree (and to avoid the deprecated datetime.utcnow()).
    verified_at = datetime.now(timezone.utc)
    record_name = f"UV Verification - {verified_at.strftime('%Y-%m-%d %H:%M:%S')}"

    sf.UV_Verification__c.create({
        'Name': record_name,
        'Annotated_Image__c': annotated_image_url,
        'Coverage_Percentage__c': round(coverage_percent, 2),
        'Original_Image__c': original_image_url,
        'Compliance_Status__c': compliance_status,
        'Technician_ID__c': technician_id,
        'Verified_On__c': verified_at.isoformat()
    })
def _extract_ocr_texts(ocr_result):
    """Collect recognized strings longer than one character from an OCR result.

    Two result layouts are handled:
    - dict-like page results carrying a ``"rec_texts"`` list
      (NOTE(review): this is the layout PaddleOCR 3.x is documented to
      return — confirm against the installed version);
    - the nested-list layout where each entry is ``[box, (text, score)]``.
    """
    texts = []
    for page in ocr_result or []:
        if not page:
            continue
        if isinstance(page, dict):
            candidates = page.get("rec_texts") or []
        else:
            candidates = (word_info[1][0] for word_info in page)
        for candidate in candidates:
            text = candidate.strip()
            if len(text) > 1:
                texts.append(text)
    return texts


def process_image(input_img, brightness_threshold=150, slider_state=None):
    """Run the full analysis pipeline for one uploaded image.

    Steps: downscale so the longest side is at most 640 px, run OCR, compute
    UV coverage, build the PDF report, upload the annotated image and store a
    Salesforce record.

    Args:
        input_img: Uploaded image (the UI passes a PIL image in RGB order).
        brightness_threshold: Threshold forwarded to ``analyze_uv_coverage``.
        slider_state: Unused; kept only so existing callers keep working.

    Returns:
        ``(annotated_img_rgb, report_text, pdf_path, brightness_threshold)``.

    Raises:
        gr.Error: If no image was provided.
    """
    if input_img is None:
        # Submit pressed without an upload: show a clear message instead of
        # failing inside OpenCV.
        raise gr.Error("Please upload an image before submitting.")

    img = cv2.cvtColor(np.array(input_img), cv2.COLOR_RGB2BGR)

    max_dim = 640
    h, w = img.shape[:2]
    if max(h, w) > max_dim:
        scale = max_dim / max(h, w)
        # Clamp to 1 px so extreme aspect ratios never yield a zero dimension.
        img = cv2.resize(img, (max(1, int(w * scale)), max(1, int(h * scale))))

    extracted_texts = _extract_ocr_texts(ocr_model.ocr(img))
    annotated_img, coverage_percent = analyze_uv_coverage(img, brightness_threshold)

    # Reserve both temp paths up front; handles are closed before writing.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_img_file:
        annotated_img_path = temp_img_file.name
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf_file:
        pdf_path = temp_pdf_file.name

    try:
        cv2.imwrite(annotated_img_path, annotated_img)
        create_pdf_report(coverage_percent, extracted_texts, annotated_img_path, pdf_path)
        annotated_image_url = upload_image_and_get_url(annotated_img_path)
        save_record_to_salesforce(annotated_image_url, coverage_percent, input_img)
    except Exception:
        # The PDF is only useful when it is returned to the UI.
        os.unlink(pdf_path)
        raise
    finally:
        # The annotated JPEG is an intermediate artifact in every case.
        os.unlink(annotated_img_path)

    annotated_img_rgb = cv2.cvtColor(annotated_img, cv2.COLOR_BGR2RGB)
    report_text = f"UV Sterilization Coverage: {coverage_percent:.2f}%"
    return annotated_img_rgb, report_text, pdf_path, brightness_threshold
with gr.Blocks(title="UV Sterilization Coverage Analyzer") as demo:
    # Page heading and a one-line usage hint.
    gr.Markdown("## UV Sterilization Coverage Analyzer")
    gr.Markdown("Upload a post-UV sterilization image to analyze surface coverage and generate a compliance report.")

    with gr.Row():
        # Left column: image upload, threshold slider and action buttons.
        with gr.Column(scale=1):
            image_input = gr.Image(label="Upload Post-UV Sterilization Image", type="pil")
            brightness_slider = gr.Slider(
                minimum=50,
                maximum=255,
                value=150,
                step=1,
                label="Brightness Threshold",
            )
            with gr.Row():
                clear_btn = gr.Button("Clear")
                submit_btn = gr.Button("Submit", variant="primary")

        # Right column: analysis results.
        with gr.Column(scale=1):
            annotated_output = gr.Image(label="Annotated Image", type="numpy")
            report_text = gr.Textbox(label="UV Sterilization Report", lines=5)
            pdf_output = gr.File(label="Download PDF Report")

    def reset_slider(_: Image.Image):
        """Put the brightness slider back to 150 whenever the image changes."""
        return gr.update(value=150)

    def clear_all():
        """Return reset values for image, slider, annotated image, text and PDF."""
        cleared_image = None
        default_threshold = 150
        cleared_annotation = None
        cleared_report = ""
        cleared_pdf = None
        return cleared_image, default_threshold, cleared_annotation, cleared_report, cleared_pdf

    # Event wiring; listeners are registered in the same order as before.
    image_input.change(fn=reset_slider, inputs=image_input, outputs=brightness_slider)

    analysis_outputs = [annotated_output, report_text, pdf_output, brightness_slider]
    submit_btn.click(
        fn=process_image,
        inputs=[image_input, brightness_slider],
        outputs=analysis_outputs,
    )

    resettable_components = [image_input, brightness_slider, annotated_output, report_text, pdf_output]
    clear_btn.click(fn=clear_all, inputs=[], outputs=resettable_components)

# Enable request queuing, then start the app.
demo.queue()
demo.launch()