Demo_Build / pipeline.py
blessedpug's picture
Implemented FastAPI endpoints - Implemented batch processing for pdf forms
a1a13bb
import openai
from dotenv import load_dotenv
from io import BytesIO
import os, uuid
from PIL import Image
import base64
import json
from models import ReceiptData, ChildFeeForm
from form_fill import fill_child_fee_pdf, fill_medical_pdf
from fraud import process_receipt
from datetime import datetime
import html
from typing import List
# Load settings from a .env file (python-dotenv) before reading the key below.
load_dotenv()
# A missing variable falls back to ""; surrounding whitespace is trimmed so a
# stray newline in the .env value does not end up in the API key.
openai.api_key = os.environ.get("OPENAI_API_KEY", "").strip()
# System prompt used by extract_info(): describes the JSON shape expected back
# from the model for a single receipt image. The pieces are joined verbatim.
reciept_system_prompt = "".join([
    # Role and task.
    "You are an expert at extracting data from receipts. ",
    "Read the provided image of a receipt and return a JSON object that matches the following Pydantic model:\n",
    # Schema, spelled out as Pydantic source for the model to mirror.
    "from typing import List, Optional\n",
    "class ReceiptItem(BaseModel):\n",
    " description: str\n",
    " amount: float\n\n",
    "class FraudData(BaseModel):\n",
    " fraud_detected: bool # either True or False\n",
    " fraud_type: Optional[str] = None # Type of fraud if detected, e.g., \"duplicate\", \"suspicious\" \n\n",
    "class ReceiptData(BaseModel):\n",
    " fraud_check: Optional[List[FraudData]] = [] # Optional field for fraud detection, always set to empty list\n",
    " merchant: str #Only extract the brand name, not the branch name - Only the brand\n",
    " date: str\n",
    " total_amount: float\n #Try your hardest to find the accurate total amount\n",
    " items: Optional[List[ReceiptItem]] = None\n",
    # Output rules.
    "- Extract only the above given information.\n",
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n",
    "- For the items field, provide a list of objects with description and amount.\n",
    "- For fraud_check, always set to an empty list [].\n",
    "- Only return a valid JSON object matching the model above.\n",
    "- Do not add any explanation or extra text—only the JSON.",
])
# System prompt used by extract_reimbursement_form_info(): describes the JSON
# shape expected back from the model for one fee-bill image.
# The consuming code reads "items", each item's "amount", and the first item's
# "bill_month", so the schema and the bullet list must name those fields.
fee_bill_system_prompt = (
    "You are an expert at extracting data from fee bills. "
    "Read the provided image of a fee bill and return a JSON object that matches the following Pydantic model:\n"
    "from typing import List, Optional\n"
    "class FeeItem(BaseModel):\n"
    " bill_date: Optional[str] = None # Bill Date Field, leave null if not found\n"
    " description: str\n"
    " amount: float\n"
    # bill_month belongs to FeeItem; the blank line separating the two classes
    # goes after it, not before it.
    " bill_month: Optional[str] = None # Bill Month Field, leave null if not found\n\n"
    "class FeeBillData(BaseModel):\n"
    " items: List[FeeItem]\n"
    " total: float\n"
    "- Extract only the above given information to the best of your ability\n"
    "- If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    # Use the schema's real field names so the model does not emit a "date" key.
    "- For the items field, provide a list of objects with bill_date, description, amount, and bill_month.\n"
    "- The total field must be the sum of all amount values in items.\n"
    "- Only return a valid JSON object matching the model above.\n"
    "- Do not add any explanation or extra text—only the JSON."
)
# System prompt used by extract_medical_info() and extract_medical_info_batch():
# describes the JSON shape expected back from the model for one medical bill/form.
# The consuming code reads "claims", "total", and the first claim's "bill_month".
medical_form_system_prompt = (
    "You are an expert at extracting structured data from tabular forms containing sample data. "
    "Your task is to read the provided form and return a JSON object that matches the following Pydantic model:\n"
    "class Item(BaseModel):\n"
    " name: str #the patient name\n"
    " relationship: str # self, spouse, parent, child\n"
    " category: str # in-patient, out-patient, maternity(cesarean), maternity(normal)\n"
    " detail: str # doctor's fee, diagnostic tests, medicines, other hospitalization - only chose from these options, infer from the image which one it is\n"
    " bill_month: Optional[str] = None # Bill Month Field, if not directly stated, find the date and infer the month from that, format should be month - year (mm/yy), if not found return null\n"
    " amount: float # try your best to extract the exact amount present in the image, sometimes there will be discounts applied, look for the total amount paid\n"
    "class Form(BaseModel):\n"
    " claims: List[Item]\n"
    " total: float\n"
    "- Extract only the above information. If a value is missing, set it to null, \"\", or an empty list as appropriate.\n"
    "- For the claims field, provide a list of objects with name, relationship, category, detail, bill_month, and amount.\n"
    "- The total field must be the sum of all amount values in claims.\n"
    "- Only return a valid JSON object matching the model above.\n"
    # The trailing newline keeps this bullet from fusing with the next one.
    "- Do not add any explanation or extra text—only the JSON.\n"
    "- Try your very best to extract this information as it is very important that you do so\n"
    "- Only extract claim items that have an explicitly stated amount next to a discernible service or in an itemized list. Do not infer multiple items if only one amount is clearly listed as a charge.\n"
    "- If you are unable to extract information, return an empty json in the format requested above, never give a response other than a json"
)
def pil_to_bytes(pil_img, quality=70):
    """Encode an image as JPEG into an in-memory buffer.

    Args:
        pil_img: Image object exposing ``mode``, ``convert`` and ``save``
            (a PIL image in this module).
        quality: JPEG quality setting forwarded to ``save``.

    Returns:
        A ``BytesIO`` rewound to offset 0 that holds the encoded JPEG bytes.
    """
    # JPEG cannot store alpha channels or palettes; Pillow raises when asked to
    # save such modes (RGBA/LA/P are typical for PNG uploads), so convert first.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if pil_img.mode not in jpeg_modes:
        pil_img = pil_img.convert("RGB")
    buf = BytesIO()
    pil_img.save(buf, format="JPEG", quality=quality)
    # Rewind so callers can read the buffer from the start.
    buf.seek(0)
    return buf
def preprocess_image(pil_img, max_size=812):
    """Resize an image so that its longer side is ``max_size`` pixels.

    The aspect ratio is preserved; forcing every image into a square would
    squash tall receipts and wide forms and distort the text on them. Square
    inputs still come out as ``max_size`` x ``max_size``.

    Args:
        pil_img: PIL image to resize (not modified).
        max_size: Target length in pixels of the longer side.

    Returns:
        A new image produced by ``pil_img.resize`` with LANCZOS resampling.
    """
    width, height = pil_img.size
    longest = max(width, height)
    if longest <= 0:
        # Degenerate image with no extent: nothing to scale proportionally,
        # fall back to the plain square target.
        return pil_img.resize((max_size, max_size), Image.LANCZOS)
    scale = max_size / longest
    # Clamp to 1 px so an extreme aspect ratio never rounds a side down to 0.
    new_size = (max(1, round(width * scale)), max(1, round(height * scale)))
    return pil_img.resize(new_size, Image.LANCZOS)
def extract_info(pil_img):
    """Extract receipt data from one image and run it through process_receipt.

    The image is resized, JPEG-encoded and sent to the chat-completions API
    together with ``reciept_system_prompt``. The reply is parsed as JSON,
    validated with ``ReceiptData`` and handed to ``process_receipt`` as a dict.

    Args:
        pil_img: PIL image of a receipt.

    Returns:
        A Markdown ```json fenced string. On success it contains the value
        returned by ``process_receipt``; if parsing, validation or processing
        fails it contains ``{"error": ..., "raw_output": ...}`` instead.

    Raises:
        Whatever the image helpers or the OpenAI client raise; only the
        post-processing of the reply is guarded.
    """
    processed_image = preprocess_image(pil_img)
    img_bytes = pil_to_bytes(processed_image)
    img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": reciept_system_prompt},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Here is a receipt image:"},
                    # pil_to_bytes() encodes JPEG, so the data URL must say so.
                    {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}},
                ],
            },
        ],
    )
    raw_output = response.choices[0].message.content
    try:
        # The message content may be None, and a fenced reply may be preceded
        # by whitespace; normalise before looking for the ``` fence.
        cleaned = (raw_output or "").strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.strip("` \n")
            if cleaned.startswith("json"):
                cleaned = cleaned[4:].strip()
        data = json.loads(cleaned)
        validated = ReceiptData(**data)
        validated_dict = validated.dict()  # plain dict, as process_receipt expects
        print(validated_dict)
        result = process_receipt(validated_dict)
        result_json = json.dumps(result, indent=2, ensure_ascii=True)  # for display
        print(result_json)
        return f"```json\n{result_json}\n```"
    except Exception as e:
        # Best effort: report the failure in the same fenced-JSON format,
        # including the model's untouched reply for debugging.
        error_json = json.dumps({"error": str(e), "raw_output": raw_output}, indent=2)
        return f"```json\n{error_json}\n```"
def extract_info_batch(file_list):
    """Run extract_info() on every file and join the results.

    Args:
        file_list: Iterable of paths or file objects that ``Image.open`` accepts.

    Returns:
        The per-file result strings from ``extract_info`` separated by a blank
        line (an empty string when ``file_list`` is empty).
    """
    results = []
    for file in file_list:
        # The context manager releases the file handle Pillow opened once the
        # image has been processed, instead of leaking one handle per file.
        with Image.open(file) as img:
            results.append(extract_info(img))
    return "\n\n".join(results)
def extract_reimbursement_form_info(img_inputs: List[Image.Image], emp_name: str, emp_code: str, department: str, form_name: str):
    """Extract fee items from a batch of bill images and fill one reimbursement PDF.

    Each image is sent to the chat-completions API with
    ``fee_bill_system_prompt``. The items of all successfully parsed images are
    merged, their amounts summed, and the result passed to
    ``fill_child_fee_pdf``.

    Args:
        img_inputs: PIL images, or anything ``Image.open`` accepts (paths,
            file-like objects).
        emp_name: Employee name written to the form.
        emp_code: Employee code written to the form.
        department: Department written to the form.
        form_name: Form title; selects the output file name. Unrecognised
            titles use an "unknown" file name.

    Returns:
        The value returned by ``fill_child_fee_pdf`` (used as the output path
        here), or ``None`` when no items could be extracted or PDF generation
        failed.
    """
    print(f"Processing child fee info for: {emp_name}, {emp_code}, {department}, Form: {form_name}")
    consolidated_items = []
    consolidated_total = 0.0
    first_bill_month_found = ""
    processed_image_count = 0

    for i, img_input_item in enumerate(img_inputs):
        print(f"Processing image {i+1} of {len(img_inputs)} for child fee form...")
        try:
            if isinstance(img_input_item, Image.Image):
                processed_image = preprocess_image(img_input_item)
            else:
                # Paths / file-like objects: open here and release the handle
                # as soon as the resized copy exists.
                with Image.open(img_input_item) as opened_img:
                    processed_image = preprocess_image(opened_img)
            img_bytes = pil_to_bytes(processed_image)
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")

            response = openai.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": fee_bill_system_prompt},
                    {"role": "user",
                     "content": [
                         {"type": "text", "text": f"Here is a child fee bill image (part {i+1} of a batch):"},
                         # pil_to_bytes() encodes JPEG, so the data URL must say so.
                         {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}}
                     ]}
                ]
            )
            raw_output = response.choices[0].message.content
            print(f"Raw output from LLM for image {i+1}: {raw_output}")

            # Content may be None or have leading whitespace before the fence.
            cleaned = (raw_output or "").strip()
            if cleaned.startswith("```"):
                cleaned = cleaned.strip("` \n")
                if cleaned.startswith("json"):
                    cleaned = cleaned[4:].strip()
            data = json.loads(cleaned)
            print(f"Parsed data from LLM for image {i+1}: {data}")

            current_items = data.get("items", [])
            if current_items:
                # Convert every amount BEFORE touching the consolidated state:
                # a malformed amount then rejects this image as a whole rather
                # than leaving its items in the list without their total.
                amounts = [float(item.get("amount", 0) or 0) for item in current_items]
                consolidated_items.extend(current_items)
                for amount in amounts:
                    consolidated_total += amount
                if not first_bill_month_found:
                    # The prompt allows null here; keep the "" default then.
                    first_bill_month_found = current_items[0].get("bill_month") or ""
            processed_image_count += 1
        except Exception as e:
            # One bad image must not abort the batch: log it and move on.
            print(f"ERROR processing image {i+1} for child fee form: {e}")

    if not consolidated_items:
        print("No items were extracted from any of the provided images for child fee form.")
        # Let the caller (API endpoint) decide how to report this.
        return None

    print(f"Consolidated {len(consolidated_items)} items from {processed_image_count} images.")
    print(f"Final total: {consolidated_total}, Bill month to use: {first_bill_month_found}")

    os.makedirs("outputs", exist_ok=True)
    # Mark the file name when several images were consolidated into one form.
    file_suffix = f"{uuid.uuid4().hex}"
    if len(img_inputs) > 1:
        file_suffix = f"batch_{file_suffix}"

    # Form title -> file-name fragment; anything else is filed as "unknown".
    form_slugs = {
        "Child Fee Reimbursement Form": "child_fee",
        "Internet Charges Form": "internet_charges",
        "Mobile Reimbursement Form": "mobile",
    }
    form_slug = form_slugs.get(form_name, "unknown")
    output_pdf_path = f"outputs/filled_{form_slug}_reimbursement_form_{file_suffix}.pdf"

    try:
        filled_pdf_path = fill_child_fee_pdf(
            template_pdf_path="templates/REIMBURSEMENT FORM.pdf",
            output_pdf_path=output_pdf_path,
            emp_name=emp_name,
            emp_code=emp_code,
            department=department,
            bill_month=first_bill_month_found,
            items=consolidated_items,
            total=consolidated_total
        )
        return filled_pdf_path
    except Exception as e:
        print(f"ERROR during PDF generation for consolidated child fee form: {e}")
        return None
def extract_medical_info(pil_img, emp_name, emp_code, department, designation, company, extension_no,):
    """Extract medical claims from one image and write a pre-populated HTML form.

    The image is sent to the chat-completions API with
    ``medical_form_system_prompt``. The parsed claims plus the employee header
    fields are embedded as JSON in a ``<script>`` that calls
    ``populateMedicalForm`` and is injected into ``templates/medical_form.html``.

    Args:
        pil_img: PIL image of the medical bill/form.
        emp_name, emp_code, department, designation, company, extension_no:
            Header values copied into the form data.

    Returns:
        Path of the generated HTML file under ``outputs/``. If parsing or file
        generation fails, the path of an error HTML page is returned instead.

    Raises:
        Whatever the image helpers or the OpenAI client raise; only the
        post-processing of the reply is guarded.
    """
    processed_image = preprocess_image(pil_img)
    img_bytes = pil_to_bytes(processed_image)
    img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")

    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": medical_form_system_prompt},
            {"role": "user",
             "content": [
                 {"type": "text", "text": "Here is a medical form image:"},
                 # pil_to_bytes() encodes JPEG, so the data URL must say so.
                 {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}}
             ]}
        ]
    )
    raw_output = response.choices[0].message.content
    print(raw_output)

    try:
        # Content may be None or have leading whitespace before the fence.
        cleaned = (raw_output or "").strip()
        if cleaned.startswith("```"):
            # Strip backticks, spaces and real newlines (a doubled backslash
            # here would strip '\' and 'n' characters instead).
            cleaned = cleaned.strip("` \n")
            if cleaned.startswith("json"):
                cleaned = cleaned[4:].strip()
        data_from_llm = json.loads(cleaned)
        print("Data from LLM:", data_from_llm)

        claims_from_llm = data_from_llm.get("claims", [])
        if not isinstance(claims_from_llm, list):
            # e.g. "claims": null — give the page an empty list to iterate.
            claims_from_llm = []

        # Billing month comes from the first claim; null/missing -> "".
        bill_month_from_llm = ""
        if claims_from_llm and isinstance(claims_from_llm[0], dict):
            bill_month_from_llm = claims_from_llm[0].get("bill_month") or ""
        print(f"Extracted billing month from LLM: '{bill_month_from_llm}'")

        total_from_llm = data_from_llm.get("total", 0)
        print(f"Extracted total from LLM: {total_from_llm}")

        form_header_data = {
            "company": company,
            "employee_name": emp_name,
            "department": department,
            "designation": designation,
            "extension_no": extension_no,
            "employee_code": emp_code,
            "date": datetime.now().strftime("%Y-%m-%d"),
            "billing_month": bill_month_from_llm,
            "claims": claims_from_llm,
            "total_amount": total_from_llm  # the page script may recompute this
        }

        # The JSON is embedded inside <script>. Values come from user input and
        # from model output, so escape the characters that could close the
        # script element or start markup ("</script>", "<!--"). These
        # characters only occur inside JSON strings, where \uXXXX is valid.
        json_data_for_script = (
            json.dumps(form_header_data)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )

        html_template_path = os.path.join("templates", "medical_form.html")
        with open(html_template_path, "r", encoding="utf-8") as f:
            html_content = f.read()

        script_to_inject = (
            "\n<script>\n"
            "document.addEventListener('DOMContentLoaded', function() {\n"
            f"const dataToLoad = {json_data_for_script};\n"
            "if (typeof populateMedicalForm === 'function') {\n"
            "populateMedicalForm(dataToLoad);\n"
            "} else {\n"
            "console.error('populateMedicalForm function not defined when trying to load data.');\n"
            "}\n"
            "});\n"
            "</script>\n"
            "</body>"
        )

        if "</body>" in html_content:
            html_content = html_content.replace("</body>", script_to_inject, 1)
        else:
            # No closing body tag in the template: append the script as-is.
            html_content += script_to_inject.replace("</body>", "")

        output_dir = "outputs"
        os.makedirs(output_dir, exist_ok=True)
        output_html_filename = f"filled_medical_form_{uuid.uuid4().hex}.html"
        output_html_path = os.path.join(output_dir, output_html_filename)

        with open(output_html_path, "w", encoding="utf-8") as f:
            f.write(html_content)

        print(f"Populated HTML form saved to: {output_html_path}")
        return output_html_path

    except Exception as e:
        print(f"ERROR in extract_medical_info: {e}")
        # raw_output can be None; html.escape() would raise on it and mask the
        # original error.
        raw_output_escaped = html.escape(raw_output or "")
        error_html_content = f"<html><body><h1>Error</h1><p>{html.escape(str(e))}</p><p>Raw LLM Output:</p><pre>{raw_output_escaped}</pre></body></html>"
        output_dir = "outputs"
        os.makedirs(output_dir, exist_ok=True)
        error_html_filename = f"error_medical_form_{uuid.uuid4().hex}.html"
        error_html_path = os.path.join(output_dir, error_html_filename)
        with open(error_html_path, "w", encoding="utf-8") as f:
            f.write(error_html_content)
        return error_html_path
def extract_medical_info_batch(image_file_list, emp_name, emp_code, department, designation, company, extension_no):
    """Consolidate claims from several medical form images into one HTML form.

    Every image is sent to the chat-completions API with
    ``medical_form_system_prompt``. Claims and totals of the images that parse
    successfully are merged; images that fail are logged and skipped. The
    merged data is embedded as JSON in a ``<script>`` that calls
    ``populateMedicalForm`` and is injected into ``templates/medical_form.html``.

    Args:
        image_file_list: Paths, or objects with a ``name`` attribute holding a
            path, for the images to process.
        emp_name, emp_code, department, designation, company, extension_no:
            Header values copied into the form data.

    Returns:
        Path of the single consolidated HTML file under ``outputs/``, or the
        path of an error page when ``image_file_list`` is empty.
    """
    if not image_file_list:
        error_content = "<html><body><h1>No Images Provided</h1><p>Please upload at least one medical form image.</p></body></html>"
        output_dir = "outputs"
        os.makedirs(output_dir, exist_ok=True)
        error_filename = f"error_no_medical_form_images_{uuid.uuid4().hex[:8]}.html"
        error_path = os.path.join(output_dir, error_filename)
        with open(error_path, "w", encoding="utf-8") as f:
            f.write(error_content)
        # NOTE(review): a single path is returned; confirm the caller does not
        # expect a list here.
        return error_path

    consolidated_claims = []
    first_billing_month_found = ""  # billing month of the first image that provides one
    grand_total_from_llm = 0.0
    processed_file_names = []

    print(f"DEBUG: Starting batch processing for {len(image_file_list)} images.")

    for i, image_file_path_obj in enumerate(image_file_list):
        image_source = image_file_path_obj.name if hasattr(image_file_path_obj, 'name') else image_file_path_obj
        image_name_for_log = str(image_source) if not hasattr(image_file_path_obj, 'name') else image_source
        processed_file_names.append(os.path.basename(image_name_for_log))
        print(f"DEBUG: --- Iteration {i+1} for image: {image_name_for_log} ---")
        try:
            print(f"Processing medical form for consolidation: {image_name_for_log}")
            # Release the file handle as soon as the resized copy exists.
            with Image.open(image_source) as pil_img:
                processed_image = preprocess_image(pil_img)
            img_bytes = pil_to_bytes(processed_image)
            img_base64 = base64.b64encode(img_bytes.getvalue()).decode("utf-8")

            response = openai.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": medical_form_system_prompt},
                    {"role": "user",
                     "content": [
                         {"type": "text", "text": f"Extract claims from this medical form image ({image_name_for_log}):"},
                         # pil_to_bytes() encodes JPEG, so the data URL must say so.
                         {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_base64}}
                     ]}
                ]
            )
            raw_output = response.choices[0].message.content
            print(f"DEBUG: Raw LLM output for {image_name_for_log}:\n{raw_output}\n")

            # Content may be None or have leading whitespace before the fence.
            cleaned = (raw_output or "").strip()
            if cleaned.startswith("```"):
                # Strip backticks, spaces and real newlines (a doubled
                # backslash here would strip '\' and 'n' characters instead).
                cleaned = cleaned.strip("` \n")
                if cleaned.startswith("json"):
                    cleaned = cleaned[4:].strip()

            data_from_llm = json.loads(cleaned)
            print(f"DEBUG: Parsed LLM data for {image_name_for_log}:\n{json.dumps(data_from_llm, indent=2)}\n")

            current_claims = data_from_llm.get("claims", [])
            if not isinstance(current_claims, list):
                # e.g. "claims": null — treat as "no claims in this image".
                current_claims = []
            # Convert the total BEFORE touching the consolidated state so a
            # malformed total rejects this image as a whole instead of keeping
            # its claims without their amount.
            image_total = float(data_from_llm.get("total", 0) or 0)
            print(f"DEBUG: Current claims extracted for {image_name_for_log}: {len(current_claims)} items")

            if current_claims:
                consolidated_claims.extend(current_claims)
                print(f"DEBUG: Consolidated claims after {image_name_for_log}: {len(consolidated_claims)} items total")
                if not first_billing_month_found and isinstance(current_claims[0], dict):
                    # The prompt allows null here; keep the "" default then.
                    first_billing_month_found = current_claims[0].get("bill_month") or ""

            grand_total_from_llm += image_total

        except Exception as e:
            # Log and skip: the final form is built from the images that worked.
            print(f"ERROR processing medical form image '{image_name_for_log}' for consolidation: {e}")

    print("DEBUG: --- End of loop for consolidating claims ---")
    print(f"DEBUG: Final consolidated_claims before HTML generation: {len(consolidated_claims)} items")

    form_header_data = {
        "company": company,
        "employee_name": emp_name,
        "department": department,
        "designation": designation,
        "extension_no": extension_no,
        "employee_code": emp_code,
        "date": datetime.now().strftime("%Y-%m-%d"),
        "billing_month": first_billing_month_found,
        "claims": consolidated_claims,        # all claims from all images
        "total_amount": grand_total_from_llm  # sum of per-image totals; the page script may recompute
    }
    # The JSON is embedded inside <script>. Values come from user input and
    # from model output, so escape the characters that could close the script
    # element or start markup ("</script>", "<!--"). These characters only
    # occur inside JSON strings, where \uXXXX is valid.
    json_data_for_script = (
        json.dumps(form_header_data)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )

    html_template_path = os.path.join("templates", "medical_form.html")
    with open(html_template_path, "r", encoding="utf-8") as f:
        current_html_content = f.read()

    script_to_inject = (
        "\n<script>\n"
        "document.addEventListener('DOMContentLoaded', function() {\n"
        f"const dataToLoad = {json_data_for_script};\n"
        "if (typeof populateMedicalForm === 'function') {\n"
        "populateMedicalForm(dataToLoad);\n"
        "} else {\n"
        "console.error('populateMedicalForm function not defined when trying to load data.');\n"
        "}\n"
        "});\n"
        "</script>\n"
        "</body>"
    )
    if "</body>" in current_html_content:
        current_html_content = current_html_content.replace("</body>", script_to_inject, 1)
    else:
        # No closing body tag in the template: append the script as-is.
        current_html_content += script_to_inject.replace("</body>", "")

    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)

    # File name: the (truncated) joined input base names plus a random suffix.
    consolidated_filename_part = "_-".join(filter(None, processed_file_names))
    if not consolidated_filename_part:
        consolidated_filename_part = "batch"
    output_html_filename = f"consolidated_medical_form_{consolidated_filename_part[:50]}_{uuid.uuid4().hex[:8]}.html"
    output_html_path = os.path.join(output_dir, output_html_filename)

    with open(output_html_path, "w", encoding="utf-8") as f:
        f.write(current_html_content)

    print(f"Consolidated HTML form saved to: {output_html_path}")
    return output_html_path