# System prompt sent with receipt images by extract_info(). It asks the model
# for a JSON object shaped like the ReceiptData Pydantic model.
reciept_system_prompt = (
    "You are an expert at extracting data from receipts. "
    "Read the provided image of a receipt and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class ReceiptItem(BaseModel):\n"
    " description: str\n"
    " amount: float\n\n"
    "class FraudData(BaseModel):\n"
    " fraud_detected: bool # either True or False\n"
    " fraud_type: Optional[str] = None # Type of fraud if detected, e.g., \"duplicate\", \"suspicious\" \n\n"
    "class ReceiptData(BaseModel):\n"
    " fraud_check: Optional[List[FraudData]] = [] # Optional field for fraud detection, always set to empty list\n"
    " merchant: str #Only extract the brand name, not the branch name - Only the brand\n"
    " date: str\n"
    " total_amount: float\n #Try your hardest to find the accurate total amount\n"
    " items: Optional[List[ReceiptItem]] = None\n"
    "- Extract only the above given information.\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the items field, provide a list of objects with description and amount.\n"
    "- For fraud_check, always set to an empty list [].\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)

# System prompt sent with fee-bill images by extract_reimbursement_form_info().
# bill_month is kept inside FeeItem (before the blank line that ends the class)
# because the caller reads it from the returned items.
fee_bill_system_prompt = (
    "You are an expert at extracting data from fee bills. "
    "Read the provided image of a fee bill and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class FeeItem(BaseModel):\n"
    " bill_date: Optional[str] = None # Bill Date Field, leave null if not found\n"
    " description: str\n"
    " amount: float\n"
    " bill_month: Optional[str] = None # Bill Month Field, leave null if not found\n\n"
    "class FeeBillData(BaseModel):\n"
    " items: List[FeeItem]\n"
    " total: float\n"
    "- Extract only the above given information to the best of your ability\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the items field, provide a list of objects with date, description, and amount.\n"
    "- The total field must be the sum of all amount values in items.\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)

# System prompt sent with medical-form images by extract_medical_info().
# Every bullet ends with "\n" so that adjacent string literals do not fuse two
# instructions into a single line.
medical_form_system_prompt = (
    "You are an expert at extracting structured data from tabular forms containing sample data. "
    "Your task is to read the provided form and return a JSON object that matches the following Pydantic model:\n"
    "class Item(BaseModel):\n"
    " name: str #the patient name\n"
    " relationship: # self, spouse, parent, child\n"
    " category: # in-patient, out-patient, maternity(cesarean), maternity(normal)\n"
    " detail: # doctor's fee, diagnostic tests, medicines, other hospitalization - only chose from these options, infer from the image which one it is\n"
    " bill_month: Optional[str] = None # Bill Month Field, if not directly stated, find the date and infer the month from that, format should be month - year (mm/yy), if not found return null\n"
    " amount: float - try your best to extract the exact amount present in the image, sometimes there will be discounts applied, look for the total amount paid\n"
    "class Form(BaseModel):\n"
    " claims: List[Item]\n"
    " total: float\n"
    "- Extract only the above information. If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the claims field, provide a list of objects with name, relationship, category, detail, and amount.\n"
    "- The total field must be the sum of all amount values in claims.\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON.\n"
    "- Try your very best to extract this information as it is very important that you do so\n"
    "- Only extract claim items that have an explicitly stated amount next to a discernible service or in an itemized list. Do not infer multiple items if only one amount is clearly listed as a charge.\n"
    "- If you are unable to extract information, return an empty json in the format requested above, never give a response other than a json"
)
def pil_to_bytes(pil_img, quality=70):
    """Encode an image as JPEG and return it in an in-memory buffer.

    Args:
        pil_img: Image object exposing ``mode``, ``convert`` and ``save``
            (a PIL image in this module).
        quality: JPEG quality setting passed to ``save``.

    Returns:
        A ``BytesIO`` positioned at offset 0 that holds the JPEG bytes.
    """
    # JPEG cannot store alpha or palette data; saving such an image (common
    # for PNG uploads) raises OSError, so normalise to RGB first.
    if pil_img.mode not in ("RGB", "L"):
        pil_img = pil_img.convert("RGB")
    buf = BytesIO()
    pil_img.save(buf, format="JPEG", quality=quality)
    buf.seek(0)
    return buf


def preprocess_image(pil_img, max_size=812):
    """Resize an image so that its longest side equals ``max_size``.

    The aspect ratio is preserved: forcing a tall receipt into a square
    squashes the text the vision model has to read.

    Args:
        pil_img: Image object exposing ``size`` and ``resize``.
        max_size: Target length in pixels of the longest side.

    Returns:
        The resized image, or ``pil_img`` unchanged if it has no pixels.
    """
    width, height = pil_img.size
    longest_side = max(width, height)
    if longest_side <= 0:
        return pil_img
    scale = max_size / longest_side
    # Clamp to 1px so extremely thin images never get a zero dimension.
    target_size = (max(1, round(width * scale)), max(1, round(height * scale)))
    return pil_img.resize(target_size, Image.LANCZOS)
def extract_info(pil_img):
    """Extract receipt data from an image and run the fraud check on it.

    The image is resized, JPEG-encoded and sent to the ``gpt-4o`` chat model
    together with ``reciept_system_prompt``. The reply is parsed as JSON,
    validated with ``ReceiptData`` and passed to ``process_receipt``.

    Args:
        pil_img: The receipt image (a PIL image).

    Returns:
        A Markdown ``json`` code fence containing the ``process_receipt``
        result, or an object with ``error`` and ``raw_output`` keys when the
        reply could not be parsed, validated or processed.

    Raises:
        Whatever the OpenAI client raises; the request itself is made outside
        the error handling.
    """
    processed_image = preprocess_image(pil_img)
    img_bytes = pil_to_bytes(processed_image)
    img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")

    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": reciept_system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Here is a receipt image:"},
                    # pil_to_bytes() produces JPEG data, so the data URL must
                    # declare image/jpeg rather than image/png.
                    {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                ],
            },
        ],
    )
    raw_output = response.choices[0].message.content

    try:
        # Trim first so a fence preceded by whitespace is still recognised.
        cleaned = (raw_output or "").strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.strip("`").strip()
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:].strip()
        data = json.loads(cleaned)
        validated = ReceiptData(**data)
        validated_dict = validated.dict()  # process_receipt expects a plain dict
        print(validated_dict)
        result = process_receipt(validated_dict)
        result_json = json.dumps(result, indent=2, ensure_ascii=True)
        print(result_json)
        return f"```json\n{result_json}\n```"
    except Exception as e:
        # Report the failure in the same fenced-JSON shape as a success.
        return f"```json\n{json.dumps({'error': str(e), 'raw_output': raw_output}, indent=2)}\n```"


def extract_info_batch(file_list):
    """Run ``extract_info`` on each file and join the results.

    Args:
        file_list: Iterable of paths or file objects that ``Image.open``
            accepts.

    Returns:
        The per-file results joined by blank lines; an empty string when
        ``file_list`` is empty.
    """
    results = []
    for file in file_list:
        # The context manager closes the underlying file once it is processed.
        with Image.open(file) as img:
            results.append(extract_info(img))
    return "\n\n".join(results)
# Output filename stems for the form names handled below; any other form name
# falls back to "unknown".
_REIMBURSEMENT_FILE_STEMS = {
    "Child Fee Reimbursement Form": "child_fee",
    "Internet Charges Form": "internet_charges",
    "Mobile Reimbursement Form": "mobile",
}


def _summarize_fee_items(data):
    """Return ``(items, subtotal, bill_month)`` for one parsed fee-bill reply."""
    items = data.get("items") or []
    subtotal = sum(float(item.get("amount", 0) or 0) for item in items)
    bill_month = next(
        (item.get("bill_month") for item in items if item.get("bill_month")), ""
    )
    return items, subtotal, bill_month


def extract_reimbursement_form_info(img_inputs: "List[Image.Image]", emp_name: str, emp_code: str, department: str, form_name: str):
    """Extract fee-bill items from a batch of images and fill the reimbursement PDF.

    Each image is sent to the ``gpt-4o`` chat model with
    ``fee_bill_system_prompt``. Items from all images are consolidated, the
    total is recomputed from the item amounts, and the result is written with
    ``fill_child_fee_pdf``. An image that fails at any step is logged and
    skipped; it does not abort the batch.

    Args:
        img_inputs: PIL images, or paths / file-like objects that
            ``Image.open`` can read.
        emp_name: Employee name written to the form.
        emp_code: Employee code written to the form.
        department: Department written to the form.
        form_name: Form type; selects the output file name.

    Returns:
        The value returned by ``fill_child_fee_pdf``, or ``None`` when no
        items were extracted or PDF generation failed.
    """
    print(f"Processing child fee info for: {emp_name}, {emp_code}, {department}, Form: {form_name}")

    consolidated_items = []
    consolidated_total = 0.0
    first_bill_month_found = ""
    processed_image_count = 0

    for i, img_input_item in enumerate(img_inputs):
        print(f"Processing image {i+1} of {len(img_inputs)} for child fee form...")
        opened_here = None  # only images opened by this function get closed
        try:
            if isinstance(img_input_item, Image.Image):
                current_pil_img = img_input_item
            else:
                current_pil_img = opened_here = Image.open(img_input_item)

            processed_image = preprocess_image(current_pil_img)
            img_bytes = pil_to_bytes(processed_image)
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")

            response = openai.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": fee_bill_system_prompt},
                    {"role": "user", "content": [
                        {"type": "text", "text": f"Here is a child fee bill image (part {i+1} of a batch):"},
                        # pil_to_bytes() produces JPEG data, so declare image/jpeg.
                        {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                    ]},
                ],
            )
            raw_output = response.choices[0].message.content
            print(f"Raw output from LLM for image {i+1}: {raw_output}")

            # Trim first so a fence preceded by whitespace is still recognised.
            cleaned = (raw_output or "").strip()
            if cleaned.startswith("```"):
                cleaned = cleaned.strip("`").strip()
                if cleaned[:4].lower() == "json":
                    cleaned = cleaned[4:].strip()
            data = json.loads(cleaned)
            print(f"Parsed data from LLM for image {i+1}: {data}")

            # Validate the whole image before touching the running totals, so
            # a malformed amount cannot leave items added without their sum.
            items, subtotal, bill_month = _summarize_fee_items(data)
        except Exception as e:
            print(f"ERROR processing image {i+1} for child fee form: {e}")
            continue
        finally:
            if opened_here is not None:
                opened_here.close()

        consolidated_items.extend(items)
        consolidated_total += subtotal
        if not first_bill_month_found:
            first_bill_month_found = bill_month
        processed_image_count += 1

    if not consolidated_items:
        print("No items were extracted from any of the provided images for child fee form.")
        return None

    print(f"Consolidated {len(consolidated_items)} items from {processed_image_count} images.")
    print(f"Final total: {consolidated_total}, Bill month to use: {first_bill_month_found}")

    os.makedirs("outputs", exist_ok=True)
    file_suffix = uuid.uuid4().hex
    if len(img_inputs) > 1:
        # Mark files that consolidate more than one input image.
        file_suffix = f"batch_{file_suffix}"
    stem = _REIMBURSEMENT_FILE_STEMS.get(form_name, "unknown")
    output_pdf_path = f"outputs/filled_{stem}_reimbursement_form_{file_suffix}.pdf"

    try:
        return fill_child_fee_pdf(
            template_pdf_path="templates/REIMBURSEMENT FORM.pdf",
            output_pdf_path=output_pdf_path,
            emp_name=emp_name,
            emp_code=emp_code,
            department=department,
            bill_month=first_bill_month_found,
            items=consolidated_items,
            total=consolidated_total,
        )
    except Exception as e:
        print(f"ERROR during PDF generation for consolidated child fee form: {e}")
        return None
print(raw_output) try: if raw_output.startswith("```"): raw_output = raw_output.strip("` \\n") if raw_output.startswith("json"): raw_output = raw_output[4:].strip() data_from_llm = json.loads(raw_output) print("Data from LLM:", data_from_llm) # Extract bill_month from LLM data claims_from_llm = data_from_llm.get("claims", []) bill_month_from_llm = "" # Default to empty string if claims_from_llm and isinstance(claims_from_llm, list) and len(claims_from_llm) > 0: first_claim = claims_from_llm[0] if isinstance(first_claim, dict) and "bill_month" in first_claim: bill_month_from_llm = first_claim.get("bill_month", "") print(f"Extracted billing month from LLM: '{bill_month_from_llm}'") # Get total from LLM as well total_from_llm = data_from_llm.get("total", 0) # Default to 0 if not found print(f"Extracted total from LLM: {total_from_llm}") form_header_data = { "company": company, "employee_name": emp_name, "department": department, "designation": designation, "extension_no": extension_no, "employee_code": emp_code, "date": datetime.now().strftime("%Y-%m-%d"), "billing_month": bill_month_from_llm, "claims": claims_from_llm, # Pass the full claims array "total_amount": total_from_llm # Pass the LLM's total (JS will also calculate) } json_data_for_script = json.dumps(form_header_data) html_template_path = os.path.join("templates", "medical_form.html") with open(html_template_path, "r", encoding="utf-8") as f: html_content = f.read() # Correctly formatted script to inject script_to_inject = f'''
{html.escape(str(e))}
Raw LLM Output:
{raw_output_escaped}