Spaces:
Sleeping
Sleeping
| import openai | |
| from dotenv import load_dotenv | |
| from io import BytesIO | |
| import os, uuid | |
| from PIL import Image | |
| import base64 | |
| import json | |
| from models import ReceiptData, ChildFeeForm | |
| from form_fill import fill_child_fee_pdf, fill_medical_pdf | |
| from fraud import process_receipt | |
| from datetime import datetime | |
| import html | |
| from typing import List | |
# Load settings via python-dotenv before the API key is read from the environment.
load_dotenv()
# Configure the OpenAI client from OPENAI_API_KEY. When the variable is unset the
# key becomes an empty string (surrounding whitespace is always stripped).
openai.api_key = os.getenv("OPENAI_API_KEY", "").strip()
# System prompt for receipt extraction (used by extract_info).
# NOTE: the identifier keeps its original spelling ("reciept") because other code
# in this module references it by that name.
# The prompt embeds the target schema as Pydantic-style source so the model
# answers with a JSON object that ReceiptData can validate.
reciept_system_prompt = (
    "You are an expert at extracting data from receipts. "
    "Read the provided image of a receipt and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class ReceiptItem(BaseModel):\n"
    " description: str\n"
    " amount: float\n\n"
    "class FraudData(BaseModel):\n"
    " fraud_detected: bool # either True or False\n"
    " fraud_type: Optional[str] = None # Type of fraud if detected, e.g., \"duplicate\", \"suspicious\" \n\n"
    "class ReceiptData(BaseModel):\n"
    " fraud_check: Optional[List[FraudData]] = [] # Optional field for fraud detection, always set to empty list\n"
    " merchant: str #Only extract the brand name, not the branch name - Only the brand\n"
    " date: str\n"
    # The guidance comment stays on the same line as the field it describes;
    # previously a stray newline detached it from total_amount.
    " total_amount: float # Try your hardest to find the accurate total amount\n"
    " items: Optional[List[ReceiptItem]] = None\n"
    "- Extract only the above given information.\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the items field, provide a list of objects with description and amount.\n"
    "- For fraud_check, always set to an empty list [].\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)
# System prompt for fee-bill extraction (used by extract_reimbursement_form_info).
# The caller reads "items", each item's "amount", and the first item's
# "bill_month" from the returned JSON, so the instructions name those fields.
fee_bill_system_prompt = (
    "You are an expert at extracting data from fee bills. "
    "Read the provided image of a fee bill and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class FeeItem(BaseModel):\n"
    " bill_date: Optional[str] = None # Bill Date Field, leave null if not found\n"
    " description: str\n"
    " amount: float\n"
    # The blank line that ends FeeItem comes after its last field; it used to
    # sit before bill_month, visually detaching that field from the class.
    " bill_month: Optional[str] = None # Bill Month Field, leave null if not found\n\n"
    "class FeeBillData(BaseModel):\n"
    " items: List[FeeItem]\n"
    " total: float\n"
    "- Extract only the above given information to the best of your ability\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    # Field names match the model above (bill_date, bill_month) instead of "date".
    "- For the items field, provide a list of objects with bill_date, description, amount, and bill_month.\n"
    "- The total field must be the sum of all amount values in items.\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)
# System prompt for medical claim extraction (used by extract_medical_info and
# extract_medical_info_batch). Those callers read "claims", "total" and the
# first claim's "bill_month" from the returned JSON.
medical_form_system_prompt = (
    "You are an expert at extracting structured data from tabular forms containing sample data. "
    "Your task is to read the provided form and return a JSON object that matches the following Pydantic model:\n"
    "class Item(BaseModel):\n"
    " name: str #the patient name\n"
    # The three enumerated fields now carry an explicit str type so the embedded
    # model is well-formed.
    " relationship: str # self, spouse, parent, child\n"
    " category: str # in-patient, out-patient, maternity(cesarean), maternity(normal)\n"
    " detail: str # doctor's fee, diagnostic tests, medicines, other hospitalization - only chose from these options, infer from the image which one it is\n"
    " bill_month: Optional[str] = None # Bill Month Field, if not directly stated, find the date and infer the month from that, format should be month - year (mm/yy), if not found return null\n"
    " amount: float - try your best to extract the exact amount present in the image, sometimes there will be discounts applied, look for the total amount paid\n"
    "class Form(BaseModel):\n"
    " claims: List[Item]\n"
    " total: float\n"
    "- Extract only the above information. If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    # bill_month is listed because the callers read it back from the first claim.
    "- For the claims field, provide a list of objects with name, relationship, category, detail, bill_month, and amount.\n"
    "- The total field must be the sum of all amount values in claims.\n"
    "- Only return a valid JSON object matching the model above.\n"
    # Trailing newline added: without it this bullet ran straight into the next
    # one ("...only the JSON.- Try your very best...").
    "- Do not add any explanation or extra text—only the JSON.\n"
    "- Try your very best to extract this information as it is very important that you do so\n"
    "- Only extract claim items that have an explicitly stated amount next to a discernible service or in an itemized list. Do not infer multiple items if only one amount is clearly listed as a charge.\n"
    "- If you are unable to extract information, return an empty json in the format requested above, never give a response other than a json"
)
def pil_to_bytes(pil_img, quality=70):
    """Encode an image as JPEG into an in-memory buffer.

    Args:
        pil_img: Image object exposing ``mode``, ``convert`` and ``save``
            (a PIL image in this module).
        quality: JPEG quality setting forwarded to ``save``.

    Returns:
        A ``BytesIO`` positioned at offset 0 that holds the JPEG data.
    """
    # JPEG has no alpha channel or palette support; saving e.g. an RGBA or P
    # image (typical for PNG uploads) fails, so normalise those modes to RGB.
    if pil_img.mode not in ("RGB", "L", "CMYK"):
        pil_img = pil_img.convert("RGB")
    buf = BytesIO()
    pil_img.save(buf, format="JPEG", quality=quality)
    buf.seek(0)  # rewind so callers can read from the start
    return buf
def preprocess_image(pil_img, max_size=812):
    """Shrink an image so its longest side is at most ``max_size`` pixels.

    The aspect ratio is preserved. Forcing every image into a
    ``max_size`` x ``max_size`` square stretches or squashes the text on
    non-square documents such as tall receipts.

    Args:
        pil_img: Image object exposing ``size`` and ``resize``.
        max_size: Upper bound, in pixels, for both width and height.

    Returns:
        The resized image, or ``pil_img`` itself when it already fits
        (small images are not upsampled).

    Raises:
        ValueError: If ``max_size`` is not positive.
    """
    if max_size <= 0:
        raise ValueError("max_size must be a positive number of pixels")
    width, height = pil_img.size
    longest_side = max(width, height)
    if longest_side <= max_size:
        return pil_img
    scale = max_size / longest_side
    # Clamp to 1px so extreme aspect ratios never produce a zero dimension.
    target_size = (max(1, round(width * scale)), max(1, round(height * scale)))
    return pil_img.resize(target_size, Image.LANCZOS)
def extract_info(pil_img):
    """Extract receipt data from an image and run the fraud check on it.

    The image is preprocessed, JPEG-encoded and sent to the ``gpt-4o`` chat
    completion endpoint together with ``reciept_system_prompt``. The reply is
    parsed as JSON, validated with ``ReceiptData`` and passed to
    ``process_receipt``.

    Args:
        pil_img: The receipt image.

    Returns:
        A Markdown ``json`` code fence containing either the
        ``process_receipt`` result or, when parsing/validation/processing
        fails, an object with ``error`` and ``raw_output`` keys.

    Note:
        Errors raised by the API call itself are not caught here.
    """
    processed_image = preprocess_image(pil_img)
    img_bytes = pil_to_bytes(processed_image)
    img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": reciept_system_prompt,
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Here is a receipt image:"},
                    # pil_to_bytes produces JPEG data, so the data URL must say
                    # image/jpeg (it previously claimed image/png).
                    {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                ],
            },
        ],
    )
    # The message content can be None; fall back to "" so the parsing and the
    # error report below both operate on a string.
    raw_output = response.choices[0].message.content or ""
    try:
        # Tolerate leading whitespace before an optional ```json fence.
        cleaned_output = raw_output.strip()
        if cleaned_output.startswith("```"):
            cleaned_output = cleaned_output.strip("` \n")
        if cleaned_output.startswith("json"):
            cleaned_output = cleaned_output[4:].strip()
        data = json.loads(cleaned_output)
        validated = ReceiptData(**data)
        validated_dict = validated.dict()  # process_receipt expects a plain dict
        print(validated_dict)
        result = process_receipt(validated_dict)
        result_json = json.dumps(result, indent=2, ensure_ascii=True)  # for display
        print(result_json)
        return f"```json\n{result_json}\n```"
    except Exception as e:
        # Best-effort boundary: report the failure and the untouched model
        # output instead of raising.
        error_json = json.dumps({"error": str(e), "raw_output": raw_output}, indent=2)
        return f"```json\n{error_json}\n```"
def extract_info_batch(file_list):
    """Run ``extract_info`` on every file and join the results.

    Args:
        file_list: Iterable of paths or file objects that ``Image.open``
            accepts.

    Returns:
        The per-file result strings joined by a blank line; an empty string
        when ``file_list`` is empty.
    """
    results = []
    for file in file_list:
        # The context manager closes the image once it has been processed;
        # previously every opened image was left open.
        with Image.open(file) as img:
            results.append(extract_info(img))
    return "\n\n".join(results)
def extract_reimbursement_form_info(img_inputs: List[object], emp_name: str, emp_code: str, department: str, form_name: str):
    """Extract fee items from a batch of bill images and fill one reimbursement PDF.

    Each image is sent to ``gpt-4o`` with ``fee_bill_system_prompt``. The
    returned items are merged across images, their amounts summed, and the
    result handed to ``fill_child_fee_pdf``. An image that fails is logged and
    skipped.

    Args:
        img_inputs: PIL images, or anything ``Image.open`` accepts (paths,
            file-like objects).
        emp_name: Employee name written to the form.
        emp_code: Employee code written to the form.
        department: Department written to the form.
        form_name: Selects the output file-name prefix; unrecognised names
            use the ``unknown`` prefix.

    Returns:
        The value returned by ``fill_child_fee_pdf``, or ``None`` when no
        items were extracted or PDF generation failed.
    """
    print(f"Processing child fee info for: {emp_name}, {emp_code}, {department}, Form: {form_name}")
    consolidated_items = []
    consolidated_total = 0.0
    first_bill_month_found = ""
    processed_image_count = 0
    for i, img_input_item in enumerate(img_inputs):
        print(f"Processing image {i+1} of {len(img_inputs)} for child fee form...")
        opened_img = None  # set only for images opened here, so they can be closed
        try:
            if isinstance(img_input_item, Image.Image):
                current_pil_img = img_input_item
            else:
                # Assume a path, filename, or file-like object Image.open() can handle.
                opened_img = Image.open(img_input_item)
                current_pil_img = opened_img
            processed_image = preprocess_image(current_pil_img)
            img_bytes = pil_to_bytes(processed_image)
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
            response = openai.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": fee_bill_system_prompt},
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": f"Here is a child fee bill image (part {i+1} of a batch):"},
                            # The payload is JPEG (see pil_to_bytes), so declare image/jpeg.
                            {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                        ],
                    },
                ],
            )
            raw_output = response.choices[0].message.content or ""
            print(f"Raw output from LLM for image {i+1}: {raw_output}")
            # Tolerate leading whitespace before an optional ```json fence.
            cleaned_output = raw_output.strip()
            if cleaned_output.startswith("```"):
                cleaned_output = cleaned_output.strip("` \n")
            if cleaned_output.startswith("json"):
                cleaned_output = cleaned_output[4:].strip()
            data = json.loads(cleaned_output)
            print(f"Parsed data from LLM for image {i+1}: {data}")
            current_items = data.get("items") or []
            # Sum this image's amounts before touching the shared accumulators:
            # a malformed item then skips the whole image instead of leaving
            # its items added with only part of their total.
            image_total = sum(float(item.get("amount", 0) or 0) for item in current_items)
            consolidated_items.extend(current_items)
            consolidated_total += image_total
            if not first_bill_month_found and current_items:
                # A null bill_month must not replace the "" default.
                first_bill_month_found = current_items[0].get("bill_month") or ""
            processed_image_count += 1
        except Exception as e:
            # One bad image must not abort the batch: log it and move on.
            print(f"ERROR processing image {i+1} for child fee form: {e}")
        finally:
            if opened_img is not None:
                opened_img.close()
    if not consolidated_items:
        print("No items were extracted from any of the provided images for child fee form.")
        # Returning None lets the calling endpoint decide how to report this.
        return None
    print(f"Consolidated {len(consolidated_items)} items from {processed_image_count} images.")
    print(f"Final total: {consolidated_total}, Bill month to use: {first_bill_month_found}")
    os.makedirs("outputs", exist_ok=True)
    # Mark the file name when several images were consolidated.
    file_suffix = f"{uuid.uuid4().hex}"
    if len(img_inputs) > 1:
        file_suffix = f"batch_{file_suffix}"
    form_prefixes = {
        "Child Fee Reimbursement Form": "child_fee",
        "Internet Charges Form": "internet_charges",
        "Mobile Reimbursement Form": "mobile",
    }
    form_prefix = form_prefixes.get(form_name, "unknown")  # default / error case
    output_pdf_path = f"outputs/filled_{form_prefix}_reimbursement_form_{file_suffix}.pdf"
    try:
        filled_pdf_path = fill_child_fee_pdf(
            template_pdf_path="templates/REIMBURSEMENT FORM.pdf",
            output_pdf_path=output_pdf_path,
            emp_name=emp_name,
            emp_code=emp_code,
            department=department,
            bill_month=first_bill_month_found,
            items=consolidated_items,
            total=consolidated_total,
        )
        return filled_pdf_path
    except Exception as e:
        print(f"ERROR during PDF generation for consolidated child fee form: {e}")
        return None
def extract_medical_info(pil_img, emp_name, emp_code, department, designation, company, extension_no):
    """Extract medical claims from one image and write a populated HTML form.

    The image is sent to ``gpt-4o`` with ``medical_form_system_prompt``. The
    parsed claims plus the employee header fields are serialised to JSON and
    injected into ``templates/medical_form.html`` as a script that calls
    ``populateMedicalForm`` on ``DOMContentLoaded``.

    Args:
        pil_img: The medical form / bill image.
        emp_name: Employee name for the form header.
        emp_code: Employee code for the form header.
        department: Department for the form header.
        designation: Designation for the form header.
        company: Company for the form header.
        extension_no: Extension number for the form header.

    Returns:
        Path of the generated HTML file under ``outputs``. If parsing or
        rendering fails, the path of an error HTML page showing the exception
        and the raw model output.

    Note:
        Errors raised by the API call itself are not caught here.
    """
    processed_image = preprocess_image(pil_img)
    img_bytes = pil_to_bytes(processed_image)
    img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": medical_form_system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Here is a medical form image:"},
                    # The payload is JPEG (see pil_to_bytes), so declare image/jpeg.
                    {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                ],
            },
        ],
    )
    # The content can be None; "" keeps the parsing and the error page
    # (html.escape) working on a string.
    raw_output = response.choices[0].message.content or ""
    print(raw_output)
    try:
        cleaned_output = raw_output.strip()
        if cleaned_output.startswith("```"):
            # Strip backticks, spaces and real newlines. The previous "\\n"
            # stripped a literal backslash and the letter "n" instead.
            cleaned_output = cleaned_output.strip("` \n")
        if cleaned_output.startswith("json"):
            cleaned_output = cleaned_output[4:].strip()
        data_from_llm = json.loads(cleaned_output)
        print("Data from LLM:", data_from_llm)
        # The billing month is taken from the first claim, when there is one.
        claims_from_llm = data_from_llm.get("claims", [])
        bill_month_from_llm = ""
        if claims_from_llm and isinstance(claims_from_llm, list):
            first_claim = claims_from_llm[0]
            if isinstance(first_claim, dict):
                # A null bill_month must not replace the "" default.
                bill_month_from_llm = first_claim.get("bill_month") or ""
        print(f"Extracted billing month from LLM: '{bill_month_from_llm}'")
        total_from_llm = data_from_llm.get("total", 0)
        print(f"Extracted total from LLM: {total_from_llm}")
        form_header_data = {
            "company": company,
            "employee_name": emp_name,
            "department": department,
            "designation": designation,
            "extension_no": extension_no,
            "employee_code": emp_code,
            "date": datetime.now().strftime("%Y-%m-%d"),
            "billing_month": bill_month_from_llm,
            "claims": claims_from_llm,  # full claims array
            "total_amount": total_from_llm,  # LLM's total (JS will also calculate)
        }
        # The JSON is placed inside a <script> element. Escaping <, > and & as
        # \uXXXX keeps a value such as "</script>" (from user input or from
        # text in the image) from terminating the script block.
        json_data_for_script = (
            json.dumps(form_header_data)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )
        html_template_path = os.path.join("templates", "medical_form.html")
        with open(html_template_path, "r", encoding="utf-8") as f:
            html_content = f.read()
        script_to_inject = f'''
<script>
document.addEventListener('DOMContentLoaded', function() {{
    const dataToLoad = {json_data_for_script};
    if (typeof populateMedicalForm === 'function') {{
        populateMedicalForm(dataToLoad);
    }} else {{
        console.error('populateMedicalForm function not defined when trying to load data.');
    }}
}});
</script>
</body>'''
        if "</body>" in html_content:
            # Insert the script just before the first closing body tag.
            html_content = html_content.replace("</body>", script_to_inject, 1)
        else:
            # No body tag in the template: append the script on its own.
            html_content += script_to_inject.replace("</body>", "")
        output_dir = "outputs"
        os.makedirs(output_dir, exist_ok=True)
        output_html_filename = f"filled_medical_form_{uuid.uuid4().hex}.html"
        output_html_path = os.path.join(output_dir, output_html_filename)
        with open(output_html_path, "w", encoding="utf-8") as f:
            f.write(html_content)
        print(f"Populated HTML form saved to: {output_html_path}")
        return output_html_path
    except Exception as e:
        # Best-effort boundary: produce an error page instead of raising.
        print(f"ERROR in extract_medical_info: {e}")
        raw_output_escaped = html.escape(raw_output)  # safe for HTML display
        error_html_content = f"<html><body><h1>Error</h1><p>{html.escape(str(e))}</p><p>Raw LLM Output:</p><pre>{raw_output_escaped}</pre></body></html>"
        output_dir = "outputs"
        os.makedirs(output_dir, exist_ok=True)
        error_html_filename = f"error_medical_form_{uuid.uuid4().hex}.html"
        error_html_path = os.path.join(output_dir, error_html_filename)
        with open(error_html_path, "w", encoding="utf-8") as f:
            f.write(error_html_content)
        return error_html_path
def extract_medical_info_batch(image_file_list, emp_name, emp_code, department, designation, company, extension_no):
    """Consolidate claims from several medical form images into one HTML file.

    Each image is sent to ``gpt-4o`` with ``medical_form_system_prompt``. The
    claims from every successfully processed image are merged, the per-image
    totals summed, and the result injected into
    ``templates/medical_form.html``. An image that fails is logged and skipped.

    Args:
        image_file_list: Paths, or objects with a ``name`` attribute holding a
            path, for the images to process.
        emp_name: Employee name for the form header.
        emp_code: Employee code for the form header.
        department: Department for the form header.
        designation: Designation for the form header.
        company: Company for the form header.
        extension_no: Extension number for the form header.

    Returns:
        Path of the consolidated HTML file under ``outputs``; when
        ``image_file_list`` is empty, the path of an error page instead.
    """
    if not image_file_list:
        # Nothing to process: write a small error page and return its path.
        error_content = "<html><body><h1>No Images Provided</h1><p>Please upload at least one medical form image.</p></body></html>"
        output_dir = "outputs"
        os.makedirs(output_dir, exist_ok=True)
        error_filename = f"error_no_medical_form_images_{uuid.uuid4().hex[:8]}.html"
        error_path = os.path.join(output_dir, error_filename)
        with open(error_path, "w", encoding="utf-8") as f:
            f.write(error_content)
        # NOTE(review): the original comment says Gradio may expect a list here;
        # confirm how the caller handles a single path.
        return error_path
    consolidated_claims = []
    first_billing_month_found = ""  # from the first processed image that has one
    grand_total_from_llm = 0.0
    processed_file_names = []
    print(f"DEBUG: Starting batch processing for {len(image_file_list)} images.")
    for i, image_file_path_obj in enumerate(image_file_list):
        # Upload objects carry the path in .name; plain paths are used as-is.
        image_source = getattr(image_file_path_obj, "name", image_file_path_obj)
        image_name_for_log = str(image_source)
        processed_file_names.append(os.path.basename(image_name_for_log))
        print(f"DEBUG: --- Iteration {i+1} for image: {image_name_for_log} ---")
        try:
            print(f"Processing medical form for consolidation: {image_name_for_log}")
            # Close the image as soon as it has been encoded; previously it
            # was left open.
            with Image.open(image_source) as pil_img:
                processed_image = preprocess_image(pil_img)
                img_bytes = pil_to_bytes(processed_image)
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
            response = openai.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": medical_form_system_prompt},
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": f"Extract claims from this medical form image ({image_name_for_log}):"},
                            # The payload is JPEG (see pil_to_bytes), so declare image/jpeg.
                            {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                        ],
                    },
                ],
            )
            raw_output = response.choices[0].message.content or ""
            print(f"DEBUG: Raw LLM output for {image_name_for_log}:\n{raw_output}\n")
            cleaned_output = raw_output.strip()
            if cleaned_output.startswith("```"):
                # Strip backticks, spaces and real newlines. The previous "\\n"
                # stripped a literal backslash and the letter "n" instead.
                cleaned_output = cleaned_output.strip("` \n")
            if cleaned_output.startswith("json"):
                cleaned_output = cleaned_output[4:].strip()
            data_from_llm = json.loads(cleaned_output)
            print(f"DEBUG: Parsed LLM data for {image_name_for_log}:\n{json.dumps(data_from_llm, indent=2)}\n")
            # A null or non-list "claims" value is treated as "no claims" so
            # len() below cannot raise and drop this file's total.
            current_claims = data_from_llm.get("claims") or []
            if not isinstance(current_claims, list):
                current_claims = []
            print(f"DEBUG: Current claims extracted for {image_name_for_log}: {len(current_claims)} items")
            if current_claims:
                consolidated_claims.extend(current_claims)
                print(f"DEBUG: Consolidated claims after {image_name_for_log}: {len(consolidated_claims)} items total")
            # Billing month comes from the first claim of the first image that has one.
            if not first_billing_month_found and current_claims:
                first_claim_current_img = current_claims[0]
                if isinstance(first_claim_current_img, dict):
                    first_billing_month_found = first_claim_current_img.get("bill_month") or ""
            grand_total_from_llm += float(data_from_llm.get("total", 0) or 0)  # handles None
        except Exception as e:
            # Log and continue: the final HTML is built from the images that succeeded.
            print(f"ERROR processing medical form image '{image_name_for_log}' for consolidation: {e}")
    print(f"DEBUG: --- End of loop for consolidating claims ---")
    print(f"DEBUG: Final consolidated_claims before HTML generation: {len(consolidated_claims)} items")
    # Prepare and save the single consolidated HTML file.
    form_header_data = {
        "company": company,
        "employee_name": emp_name,
        "department": department,
        "designation": designation,
        "extension_no": extension_no,
        "employee_code": emp_code,
        "date": datetime.now().strftime("%Y-%m-%d"),
        "billing_month": first_billing_month_found,
        "claims": consolidated_claims,  # all claims from all images
        "total_amount": grand_total_from_llm,  # sum of LLM totals (JS will also recalc)
    }
    # The JSON is placed inside a <script> element. Escaping <, > and & as
    # \uXXXX keeps a value such as "</script>" (from user input or from text
    # in an image) from terminating the script block.
    json_data_for_script = (
        json.dumps(form_header_data)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )
    html_template_path = os.path.join("templates", "medical_form.html")
    with open(html_template_path, "r", encoding="utf-8") as f:
        current_html_content = f.read()
    script_to_inject = f'''
<script>
document.addEventListener('DOMContentLoaded', function() {{
    const dataToLoad = {json_data_for_script};
    if (typeof populateMedicalForm === 'function') {{
        populateMedicalForm(dataToLoad);
    }} else {{
        console.error('populateMedicalForm function not defined when trying to load data.');
    }}
}});
</script>
</body>'''
    if "</body>" in current_html_content:
        # Insert the script just before the first closing body tag.
        current_html_content = current_html_content.replace("</body>", script_to_inject, 1)
    else:
        # No body tag in the template: append the script on its own.
        current_html_content += script_to_inject.replace("</body>", "")
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    # The file name combines the processed file names (capped at 50 characters)
    # with a short random suffix.
    consolidated_filename_part = "_-".join(filter(None, processed_file_names))
    if not consolidated_filename_part:
        consolidated_filename_part = "batch"
    output_html_filename = f"consolidated_medical_form_{consolidated_filename_part[:50]}_{uuid.uuid4().hex[:8]}.html"
    output_html_path = os.path.join(output_dir, output_html_filename)
    with open(output_html_path, "w", encoding="utf-8") as f:
        f.write(current_html_content)
    print(f"Consolidated HTML form saved to: {output_html_path}")
    return output_html_path