Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import pytesseract | |
| import numpy as np | |
| import re | |
| import requests | |
| # Align image using OpenCV | |
def align_form_from_image(image, output_size=(800, 1000)):
    """Perspective-correct a photographed form and run OCR on it.

    The largest four-cornered contour found in the edge map is treated as
    the document outline and warped to an upright rectangle. If no such
    contour is found, the image is OCR'd as-is.

    Args:
        image: RGB image convertible with ``np.array`` (the UI passes a PIL
            image), or ``None`` when nothing was uploaded.
        output_size: ``(width, height)`` in pixels of the warped output.
            Defaults to the original hard-coded 800x1000.

    Returns:
        A ``(rgb_image, text)`` tuple: the aligned (or original) image in
        RGB channel order and the string returned by
        ``pytesseract.image_to_string``. Returns ``(None, "")`` when
        ``image`` is ``None``.
    """
    if image is None:
        # Without this guard np.array(None) is handed to OpenCV, which
        # fails with an opaque conversion error.
        return None, ""

    img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edged = cv2.Canny(blurred, 75, 200)

    # findContours returns (contours, hierarchy) on OpenCV 2/4 but
    # (image, contours, hierarchy) on OpenCV 3; [-2] selects the contour
    # list in every case.
    contours = cv2.findContours(
        edged.copy(), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE
    )[-2]
    # Only the five largest contours are considered as document candidates.
    contours = sorted(contours, key=cv2.contourArea, reverse=True)[:5]

    doc_cnts = None
    for c in contours:
        peri = cv2.arcLength(c, True)
        approx = cv2.approxPolyDP(c, 0.02 * peri, True)
        if len(approx) == 4:
            doc_cnts = approx
            break

    if doc_cnts is not None:
        width, height = output_size
        rect = order_points(doc_cnts.reshape(4, 2))
        # Destination corners in the same order order_points() produces:
        # top-left, top-right, bottom-right, bottom-left.
        dst = np.array(
            [[0, 0], [width, 0], [width, height], [0, height]],
            dtype="float32",
        )
        M = cv2.getPerspectiveTransform(rect, dst)
        aligned = cv2.warpPerspective(img, M, (width, height))
    else:
        aligned = img

    rgb = cv2.cvtColor(aligned, cv2.COLOR_BGR2RGB)
    text = pytesseract.image_to_string(rgb)
    return rgb, text
def order_points(pts):
    """Return the four points ordered TL, TR, BR, BL as a float32 array.

    The top-left corner has the smallest x + y and the bottom-right the
    largest; the top-right has the smallest y - x and the bottom-left the
    largest.
    """
    coord_sum = pts.sum(axis=1)
    coord_diff = np.diff(pts, axis=1)  # y - x per point, shape (4, 1)

    top_left = np.argmin(coord_sum)
    bottom_right = np.argmax(coord_sum)
    top_right = np.argmin(coord_diff)
    bottom_left = np.argmax(coord_diff)

    corner_order = [top_left, top_right, bottom_right, bottom_left]
    return pts[corner_order].astype("float32")
| # Extract fields from driver payout form | |
def parse_driver_payout_custom(text: str, *, checked_mark: str = "β"):
    """Extract the driver payout form fields from OCR text.

    Args:
        text: Raw OCR output of the form.
        checked_mark: Glyph the OCR emits for a ticked checkbox.
            NOTE(review): the default is kept exactly as found in the
            source, but it looks like a mis-encoded check-mark character;
            confirm against real OCR output.

    Returns:
        A dict with the keys ``date``, ``time``, ``name``, ``email``,
        ``phone``, ``service_type``, ``w9_filled_out``,
        ``payment_received``, ``payout``, ``team_member`` and
        ``uploaded_to_drive`` (in that order). Values are stripped strings,
        or ``None`` when a field was not found. ``w9_filled_out`` is always
        one of ``"Yes"``, ``"No"``, ``"Both Checked"`` or ``"Unchecked"``.
    """
    mark_pattern = re.escape(checked_mark)

    def find_checkbox(label_yes, label_no):
        # \b keeps "No" from matching inside words such as "Now" in
        # "Payout Now", which previously produced a false "No" tick.
        def ticked(label):
            pattern = r"\b" + label + r"\b\s*[:]?[\s\S]{0,20}?" + mark_pattern
            return re.search(pattern, text) is not None

        yes = ticked(label_yes)
        no = ticked(label_no)
        if yes and no:
            return "Both Checked"
        if yes:
            return "Yes"
        if no:
            return "No"
        return "Unchecked"

    def is_checked(label):
        # A label counts as ticked when the mark appears within the 10
        # characters that follow its first occurrence.
        _, found, tail = text.partition(label)
        return bool(found) and checked_mark in tail[:10]

    def first_group(pattern, flags=0):
        # Captured text of group 1, or None when the pattern does not
        # match or the (optional) group did not participate in the match.
        match = re.search(pattern, text, flags)
        if match is None or match.group(1) is None:
            return None
        return match.group(1).strip()

    # No early break: if several service labels are ticked, the last one
    # in this list wins, as in the original loop.
    service_type = None
    for service in ("Taxi", "Limo", "Uber", "Lyft", "Other"):
        if is_checked(service):
            service_type = service

    if is_checked("Payout Now"):
        payout = "Payout Now"
    elif is_checked("Payout Later"):
        payout = "Payout Later"
    else:
        payout = None

    # Regex results are resolved to strings here, field by field. The
    # original post-processed the whole dict with .group(1), which raised
    # AttributeError on the values that were already plain strings.
    return {
        "date": first_group(r"Date[:\s]*([\d/]+)"),
        "time": first_group(r"Time[:\s]*([\d:]+)"),
        "name": first_group(r"Name[:\s]*([A-Za-z ]+)"),
        "email": first_group(r"Email[:\s]*(\S+@\S+)?"),
        "phone": first_group(r"Phone Number[:\s]*(\d{3}[- ]\d{3}[- ]\d{4})"),
        "service_type": service_type,
        "w9_filled_out": find_checkbox("Yes", "No"),
        "payment_received": first_group(r"\$\s?([\d.]+)"),
        "payout": payout,
        "team_member": first_group(r"Team Member's Name[:\s]*([A-Za-z]+)"),
        "uploaded_to_drive": first_group(
            r"Uploaded to the Drive\?\s*(Yes|No)", re.IGNORECASE
        ),
    }
| # Send to webhook | |
def send_to_webhook(webhook, *field_values, timeout=10):
    """POST the field values to a webhook as JSON and report the outcome.

    Args:
        webhook: Target URL.
        *field_values: Values to send; they are keyed ``field_0``,
            ``field_1``, ... in the order given.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, which would hang
            the UI handler on an unresponsive endpoint.

    Returns:
        A human-readable status string: the HTTP status code on a completed
        request, or a failure message when the URL is empty or the request
        could not be completed.
    """
    if not webhook:
        return "β Failed: webhook URL is empty"

    data = {f"field_{i}": val for i, val in enumerate(field_values)}
    try:
        resp = requests.post(webhook, json=data, timeout=timeout)
    except requests.RequestException as e:
        # Covers connection errors, timeouts and malformed URLs.
        return f"β Failed: {e}"
    return f"β Sent! Status: {resp.status_code}"
| # Launch Gradio app | |
# Number of editable textboxes shown after parsing; parsed values are padded
# or truncated to this length.
MAX_FIELDS = 12

with gr.Blocks() as demo:
    gr.Markdown("# π Driver Payout Form OCR β Webhook")
    # Per-session webhook target; overwritten from the admin accordion below.
    webhook_url = gr.State("https://example.com/webhook")

    with gr.Row():
        image_input = gr.Image(type="pil", label="Upload or Take Photo", source="upload")
        aligned_image = gr.Image(type="numpy", label="Aligned Image")

    form_type = gr.Radio(["Driver Payout"], label="Form Type", value="Driver Payout")
    raw_text_output = gr.Textbox(label="OCR Text", lines=8)
    parsed_json = gr.JSON(label="Parsed Fields")

    # The textboxes are created inside the group's context so they are
    # really its children and follow its visibility, instead of being
    # attached afterwards by assigning to `.children`.
    with gr.Group(visible=False) as editable_fields_group:
        editable_fields = [
            gr.Textbox(label=f"Field {i + 1}") for i in range(MAX_FIELDS)
        ]

    status_output = gr.Textbox(label="Webhook Response")

    def process_image(image, form_type):
        """Align and OCR the image, parse it, and fill the editable fields.

        Returns one value per output component: aligned image, OCR text,
        parsed dict, a visibility update for the field group, then one
        value for each of the MAX_FIELDS textboxes.
        """
        if image is None:
            # Nothing uploaded: clear the outputs and keep the group hidden.
            return (None, "", {}, gr.update(visible=False), *([""] * MAX_FIELDS))

        aligned, text = align_form_from_image(image)
        parsed = parse_driver_payout_custom(text)
        values = ["" if v is None else v for v in parsed.values()]
        values += [""] * (MAX_FIELDS - len(values))
        # The textbox values must be unpacked: Gradio expects one return
        # value per output component, not a single list for all of them.
        return (aligned, text, parsed, gr.update(visible=True), *values[:MAX_FIELDS])

    process_btn = gr.Button("OCR + Parse")
    process_btn.click(
        fn=process_image,
        inputs=[image_input, form_type],
        outputs=[aligned_image, raw_text_output, parsed_json, editable_fields_group]
        + editable_fields,
    )

    send_btn = gr.Button("Send to Webhook")
    send_btn.click(
        fn=send_to_webhook,
        inputs=[webhook_url] + editable_fields,
        outputs=status_output,
    )

    with gr.Accordion("Admin Settings", open=False):
        webhook_input = gr.Textbox(label="Set Webhook URL")
        set_webhook_btn = gr.Button("Save Webhook")
        # Copies the textbox content into the session state used by send_btn.
        set_webhook_btn.click(lambda url: url, inputs=webhook_input, outputs=webhook_url)

if __name__ == "__main__":
    demo.launch()