"""LLM-backed extraction of receipts, fee bills and medical claim forms.

Each public ``extract_*`` function sends one or more images to an OpenAI
vision-capable chat model together with a system prompt describing the JSON
schema to return, parses the model's answer, and either returns the parsed
result or writes a filled form (PDF or HTML) under ``outputs/``.
"""

import base64
import html
import json
import os
import re
import uuid
from datetime import datetime
from io import BytesIO
from typing import List

import openai
from dotenv import load_dotenv
from PIL import Image

from models import ReceiptData, ChildFeeForm
from form_fill import fill_child_fee_pdf, fill_medical_pdf
from fraud import process_receipt

load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY", "").strip()

# Chat model used for every vision request in this module.
_VISION_MODEL = "gpt-4o"
# Directory that receives every generated PDF / HTML file.
_OUTPUT_DIR = "outputs"
# HTML template that the medical form data is injected into.
_MEDICAL_TEMPLATE_PATH = os.path.join("templates", "medical_form.html")
# Pillow image modes that the JPEG encoder accepts without conversion.
_JPEG_SAFE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})
# NOTE(review): the body of the injected <script> was not visible in the
# reviewed source (the markup had been stripped). The JavaScript global below
# is a reconstruction -- confirm the name templates/medical_form.html reads.
_FORM_DATA_JS_GLOBAL = "formData"
# Output filename stems keyed by the form name passed by the caller.
_REIMBURSEMENT_OUTPUT_STEMS = {
    "Child Fee Reimbursement Form": "filled_child_fee_reimbursement_form",
    "Internet Charges Form": "filled_internet_charges_reimbursement_form",
    "Mobile Reimbursement Form": "filled_mobile_reimbursement_form",
}

# NOTE: the misspelled name is kept because other modules may import it.
reciept_system_prompt = (
    "You are an expert at extracting data from receipts. "
    "Read the provided image of a receipt and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class ReceiptItem(BaseModel):\n"
    " description: str\n"
    " amount: float\n\n"
    "class FraudData(BaseModel):\n"
    " fraud_detected: bool # either True or False\n"
    " fraud_type: Optional[str] = None # Type of fraud if detected, e.g., \"duplicate\", \"suspicious\" \n\n"
    "class ReceiptData(BaseModel):\n"
    " fraud_check: Optional[List[FraudData]] = [] # Optional field for fraud detection, always set to empty list\n"
    " merchant: str #Only extract the brand name, not the branch name - Only the brand\n"
    " date: str\n"
    " total_amount: float\n #Try your hardest to find the accurate total amount\n"
    " items: Optional[List[ReceiptItem]] = None\n"
    "- Extract only the above given information.\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the items field, provide a list of objects with description and amount.\n"
    "- For fraud_check, always set to an empty list [].\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)

fee_bill_system_prompt = (
    "You are an expert at extracting data from fee bills. "
    "Read the provided image of a fee bill and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class FeeItem(BaseModel):\n"
    " bill_date: Optional[str] = None # Bill Date Field, leave null if not found\n"
    " description: str\n"
    " amount: float\n\n"
    " bill_month: Optional[str] = None # Bill Month Field, leave null if not found\n"
    "class FeeBillData(BaseModel):\n"
    " items: List[FeeItem]\n"
    " total: float\n"
    "- Extract only the above given information to the best of your ability\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the items field, provide a list of objects with date, description, and amount.\n"
    "- The total field must be the sum of all amount values in items.\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)

medical_form_system_prompt = (
    "You are an expert at extracting structured data from tabular forms containing sample data. "
    "Your task is to read the provided form and return a JSON object that matches the following Pydantic model:\n"
    "class Item(BaseModel):\n"
    " name: str #the patient name\n"
    " relationship: # self, spouse, parent, child\n"
    " category: # in-patient, out-patient, maternity(cesarean), maternity(normal)\n"
    " detail: # doctor's fee, diagnostic tests, medicines, other hospitalization - only chose from these options, infer from the image which one it is\n"
    " bill_month: Optional[str] = None # Bill Month Field, if not directly stated, find the date and infer the month from that, format should be month - year (mm/yy), if not found return null\n"
    " amount: float - try your best to extract the exact amount present in the image, sometimes there will be discounts applied, look for the total amount paid\n"
    "class Form(BaseModel):\n"
    " claims: List[Item]\n"
    " total: float\n"
    "- Extract only the above information. If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the claims field, provide a list of objects with name, relationship, category, detail, and amount.\n"
    "- The total field must be the sum of all amount values in claims.\n"
    "- Only return a valid JSON object matching the model above.\n"
    # The trailing newline was missing here, which fused this bullet with the next one.
    "- Do not add any explanation or extra text—only the JSON.\n"
    "- Try your very best to extract this information as it is very important that you do so\n"
    "- Only extract claim items that have an explicitly stated amount next to a discernible service or in an itemized list. Do not infer multiple items if only one amount is clearly listed as a charge.\n"
    "- If you are unable to extract information, return an empty json in the format requested above, never give a response other than a json"
)


def pil_to_bytes(pil_img, quality=70):
    """Encode *pil_img* as JPEG and return a ``BytesIO`` rewound to offset 0.

    Images in a mode the JPEG encoder rejects (e.g. ``RGBA`` or ``P`` from PNG
    uploads) are converted to ``RGB`` first instead of raising ``OSError``.
    """
    if pil_img.mode not in _JPEG_SAFE_MODES:
        pil_img = pil_img.convert("RGB")
    buf = BytesIO()
    pil_img.save(buf, format="JPEG", quality=quality)
    buf.seek(0)
    return buf


def preprocess_image(pil_img, max_size=812):
    """Return *pil_img* resized to a ``max_size`` x ``max_size`` square.

    NOTE(review): the resize ignores the source aspect ratio, so non-square
    images are stretched. Left unchanged because callers may rely on the
    current pixel budget -- confirm whether the distortion is intended.
    """
    return pil_img.resize((max_size, max_size), Image.LANCZOS)


def _strip_code_fence(raw_output):
    """Return the model answer without a surrounding Markdown code fence."""
    text = (raw_output or "").strip()
    if text.startswith("```"):
        text = text.strip("` \n")
        if text.startswith("json"):
            text = text[4:].strip()
    return text


def _load_image(source):
    """Return an in-memory PIL image for *source*.

    *source* may already be a PIL image (returned as is) or anything
    ``Image.open`` accepts; in the latter case the image is copied into memory
    inside a ``with`` block so the handle opened by Pillow is released.
    """
    if isinstance(source, Image.Image):
        return source
    with Image.open(source) as opened:
        return opened.copy()


def _ask_vision_model(system_prompt, user_text, pil_img):
    """Send *pil_img* with the given prompts and return the raw text answer."""
    encoded = base64.b64encode(
        pil_to_bytes(preprocess_image(pil_img)).getvalue()
    ).decode("utf-8")
    response = openai.chat.completions.create(
        model=_VISION_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_text},
                    # The payload is JPEG, so the media type must say so
                    # (it was previously declared as image/png).
                    {
                        "type": "image_url",
                        "image_url": {"url": "data:image/jpeg;base64," + encoded},
                    },
                ],
            },
        ],
    )
    return response.choices[0].message.content


def _first_bill_month(entries):
    """Return ``bill_month`` of the first entry, or ``""`` when unavailable."""
    if isinstance(entries, list) and entries and isinstance(entries[0], dict):
        return entries[0].get("bill_month") or ""
    return ""


def _write_output_html(filename, content):
    """Write *content* to ``outputs/<filename>`` and return that path."""
    os.makedirs(_OUTPUT_DIR, exist_ok=True)
    path = os.path.join(_OUTPUT_DIR, filename)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    return path


def _build_medical_form_data(emp_name, emp_code, department, designation,
                             company, extension_no, billing_month, claims,
                             total_amount):
    """Assemble the dictionary that is serialised into the medical form."""
    return {
        "company": company,
        "employee_name": emp_name,
        "department": department,
        "designation": designation,
        "extension_no": extension_no,
        "employee_code": emp_code,
        "date": datetime.now().strftime("%Y-%m-%d"),
        "billing_month": billing_month,
        "claims": claims,
        "total_amount": total_amount,
    }


def _render_medical_form_html(form_data):
    """Return the medical form template with *form_data* injected as a script.

    The JSON is made safe for inline ``<script>`` embedding by escaping every
    ``<``; otherwise model- or user-supplied text containing ``</script>``
    could terminate the script element early.
    """
    json_data_for_script = json.dumps(form_data).replace("<", "\\u003c")
    with open(_MEDICAL_TEMPLATE_PATH, "r", encoding="utf-8") as f:
        html_content = f.read()

    # NOTE(review): reconstructed script body -- see _FORM_DATA_JS_GLOBAL.
    script_to_inject = (
        "\n<script>\n"
        f"    window.{_FORM_DATA_JS_GLOBAL} = {json_data_for_script};\n"
        "</script>\n"
    )
    # NOTE(review): the "</body>" marker is reconstructed; the reviewed source
    # showed an empty string where the tag had been stripped.
    if "</body>" in html_content:
        return html_content.replace("</body>", script_to_inject + "</body>", 1)
    return html_content + script_to_inject


def extract_info(pil_img):
    """Extract receipt data from *pil_img* and run the fraud check on it.

    Returns a Markdown ``json`` code block: the output of ``process_receipt``
    on success, or an object with ``error`` and ``raw_output`` keys when
    parsing, validation or the fraud check fails.
    """
    raw_output = _ask_vision_model(
        reciept_system_prompt, "Here is a receipt image:", pil_img
    )
    try:
        data = json.loads(_strip_code_fence(raw_output))
        validated_dict = ReceiptData(**data).dict()  # process_receipt expects a plain dict
        print(validated_dict)
        result_json = json.dumps(
            process_receipt(validated_dict), indent=2, ensure_ascii=True
        )
        print(result_json)
        return f"```json\n{result_json}\n```"
    except Exception as e:
        # Boundary handler: the caller displays the failure instead of crashing.
        return f"```json\n{json.dumps({'error': str(e), 'raw_output': raw_output}, indent=2)}\n```"


def extract_info_batch(file_list):
    """Run ``extract_info`` on every file and join the results with blank lines.

    Accepts a list of file objects/paths that ``Image.open`` can read.
    """
    results = []
    for file in file_list:
        # The context manager closes the handle Pillow opened for this file.
        with Image.open(file) as img:
            results.append(extract_info(img))
    return "\n\n".join(results)


def extract_reimbursement_form_info(img_inputs: List[Image.Image], emp_name: str, emp_code: str, department: str, form_name: str):
    """Consolidate fee-bill images into one filled reimbursement PDF.

    Each entry of *img_inputs* may be a PIL image or anything ``Image.open``
    accepts. Images that fail to process are logged and skipped. Returns the
    path returned by ``fill_child_fee_pdf``, or ``None`` when no items were
    extracted or PDF generation failed.
    """
    print(f"Processing child fee info for: {emp_name}, {emp_code}, {department}, Form: {form_name}")

    consolidated_items = []
    consolidated_total = 0.0
    first_bill_month_found = ""
    processed_image_count = 0
    image_count = len(img_inputs)

    for position, img_input_item in enumerate(img_inputs, start=1):
        print(f"Processing image {position} of {image_count} for child fee form...")
        try:
            raw_output = _ask_vision_model(
                fee_bill_system_prompt,
                f"Here is a child fee bill image (part {position} of a batch):",
                _load_image(img_input_item),
            )
            print(f"Raw output from LLM for image {position}: {raw_output}")
            data = json.loads(_strip_code_fence(raw_output))
            print(f"Parsed data from LLM for image {position}: {data}")

            current_items = data.get("items") or []
            # Validate every amount before touching the running totals so a bad
            # item cannot leave items and total out of sync.
            page_total = sum(
                float(item.get("amount", 0) or 0) for item in current_items
            )
            page_month = _first_bill_month(current_items)

            consolidated_items.extend(current_items)
            consolidated_total += page_total
            if not first_bill_month_found and page_month:
                first_bill_month_found = page_month
            processed_image_count += 1
        except Exception as e:
            # Best effort: one unreadable image must not abort the whole batch.
            print(f"ERROR processing image {position} for child fee form: {e}")

    if not consolidated_items:
        print("No items were extracted from any of the provided images for child fee form.")
        return None

    print(f"Consolidated {len(consolidated_items)} items from {processed_image_count} images.")
    print(f"Final total: {consolidated_total}, Bill month to use: {first_bill_month_found}")

    os.makedirs(_OUTPUT_DIR, exist_ok=True)

    # Mark the filename when several images were merged into one form.
    file_suffix = uuid.uuid4().hex
    if image_count > 1:
        file_suffix = f"batch_{file_suffix}"
    stem = _REIMBURSEMENT_OUTPUT_STEMS.get(
        form_name, "filled_unknown_reimbursement_form"
    )
    output_pdf_path = f"outputs/{stem}_{file_suffix}.pdf"

    try:
        return fill_child_fee_pdf(
            template_pdf_path="templates/REIMBURSEMENT FORM.pdf",
            output_pdf_path=output_pdf_path,
            emp_name=emp_name,
            emp_code=emp_code,
            department=department,
            bill_month=first_bill_month_found,
            items=consolidated_items,
            total=consolidated_total,
        )
    except Exception as e:
        print(f"ERROR during PDF generation for consolidated child fee form: {e}")
        return None


def extract_medical_info(pil_img, emp_name, emp_code, department, designation, company, extension_no,):
    """Extract claims from one medical form image into a populated HTML form.

    Returns the path of the generated HTML file. When parsing or rendering
    fails, an error page containing the escaped exception text and raw model
    output is written instead and its path is returned.
    """
    raw_output = _ask_vision_model(
        medical_form_system_prompt, "Here is a medical form image:", pil_img
    )
    print(raw_output)
    try:
        data_from_llm = json.loads(_strip_code_fence(raw_output))
        print("Data from LLM:", data_from_llm)

        claims_from_llm = data_from_llm.get("claims", [])
        bill_month_from_llm = _first_bill_month(claims_from_llm)
        print(f"Extracted billing month from LLM: '{bill_month_from_llm}'")

        total_from_llm = data_from_llm.get("total", 0)
        print(f"Extracted total from LLM: {total_from_llm}")

        form_header_data = _build_medical_form_data(
            emp_name, emp_code, department, designation, company, extension_no,
            billing_month=bill_month_from_llm,
            claims=claims_from_llm,
            total_amount=total_from_llm,
        )
        output_html_path = _write_output_html(
            f"filled_medical_form_{uuid.uuid4().hex}.html",
            _render_medical_form_html(form_header_data),
        )
        print(f"Populated HTML form saved to: {output_html_path}")
        return output_html_path
    except Exception as e:
        print(f"ERROR in extract_medical_info: {e}")
        # The model may return no content at all; escaping None would raise
        # from inside this handler.
        raw_output_escaped = html.escape(raw_output or "")
        # NOTE(review): markup reconstructed; only the text was visible.
        error_html_content = (
            f"<html><body><h1>Error</h1><p>{html.escape(str(e))}</p>"
            f"<h2>Raw LLM Output:</h2><pre>{raw_output_escaped}</pre></body></html>"
        )
        return _write_output_html(
            f"error_medical_form_{uuid.uuid4().hex}.html", error_html_content
        )


def extract_medical_info_batch(image_file_list, emp_name, emp_code, department, designation, company, extension_no):
    """
    Processes a batch of medical form images, consolidates all claims,
    generates a single populated HTML file, and returns its path.

    Entries of *image_file_list* may be paths or objects exposing a ``name``
    attribute holding the path. Images that fail are logged and skipped. An
    error page path is returned when the list is empty.
    """
    if not image_file_list:
        # NOTE(review): markup reconstructed; only the text was visible.
        error_content = (
            "<html><body><h1>No Images Provided</h1>"
            "<p>Please upload at least one medical form image.</p></body></html>"
        )
        return _write_output_html(
            f"error_no_medical_form_images_{uuid.uuid4().hex[:8]}.html",
            error_content,
        )

    consolidated_claims = []
    first_billing_month_found = ""
    grand_total_from_llm = 0.0
    processed_file_names = []

    print(f"DEBUG: Starting batch processing for {len(image_file_list)} images.")

    for i, image_file_path_obj in enumerate(image_file_list):
        image_source = getattr(image_file_path_obj, "name", image_file_path_obj)
        image_name_for_log = (
            image_file_path_obj.name
            if hasattr(image_file_path_obj, "name")
            else str(image_file_path_obj)
        )
        processed_file_names.append(os.path.basename(image_name_for_log))
        print(f"DEBUG: --- Iteration {i+1} for image: {image_name_for_log} ---")

        try:
            print(f"Processing medical form for consolidation: {image_name_for_log}")
            raw_output = _ask_vision_model(
                medical_form_system_prompt,
                f"Extract claims from this medical form image ({image_name_for_log}):",
                _load_image(image_source),
            )
            print(f"DEBUG: Raw LLM output for {image_name_for_log}:\n{raw_output}\n")

            data_from_llm = json.loads(_strip_code_fence(raw_output))
            print(f"DEBUG: Parsed LLM data for {image_name_for_log}:\n{json.dumps(data_from_llm, indent=2)}\n")

            current_claims = data_from_llm.get("claims") or []
            if not isinstance(current_claims, list):
                current_claims = []
            print(f"DEBUG: Current claims extracted for {image_name_for_log}: {len(current_claims)} items")

            # Convert the total first so a malformed value skips the whole
            # image rather than keeping its claims without its total.
            image_total = float(data_from_llm.get("total", 0) or 0)

            consolidated_claims.extend(current_claims)
            print(f"DEBUG: Consolidated claims after {image_name_for_log}: {len(consolidated_claims)} items total")

            if not first_billing_month_found:
                first_billing_month_found = _first_bill_month(current_claims)
            grand_total_from_llm += image_total
        except Exception as e:
            # Best effort: skip this image's claims and continue with the rest.
            print(f"ERROR processing medical form image '{image_name_for_log}' for consolidation: {e}")

    print(f"DEBUG: --- End of loop for consolidating claims ---")
    print(f"DEBUG: Final consolidated_claims before HTML generation: {len(consolidated_claims)} items")

    form_header_data = _build_medical_form_data(
        emp_name, emp_code, department, designation, company, extension_no,
        billing_month=first_billing_month_found,
        claims=consolidated_claims,
        total_amount=grand_total_from_llm,
    )
    current_html_content = _render_medical_form_html(form_header_data)

    # Uploaded file names are untrusted: keep only filename-safe characters.
    consolidated_name_part = "_-".join(filter(None, processed_file_names)) or "batch"
    consolidated_name_part = re.sub(r"[^A-Za-z0-9._-]", "_", consolidated_name_part)
    output_html_path = _write_output_html(
        f"consolidated_medical_form_{consolidated_name_part[:50]}_{uuid.uuid4().hex[:8]}.html",
        current_html_content,
    )
    print(f"Consolidated HTML form saved to: {output_html_path}")
    return output_html_path