Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from typing import List, Optional | |
| from PIL import Image | |
| import tempfile | |
| import os | |
| import shutil | |
| import json # Added json import | |
| # Corrected and consolidated imports from pipeline | |
| from pipeline import ( | |
| extract_info, | |
| # extract_info_batch, # This function in pipeline.py takes file paths, FastAPI will call extract_info individually | |
| extract_reimbursement_form_info, | |
| extract_medical_info, | |
| extract_medical_info_batch | |
| ) | |
| # Assuming models.py contains necessary Pydantic models, though not directly used in this file for request validation beyond FastAPI's | |
| # from models import ReceiptData, ChildFeeForm | |
# Make sure the "outputs" directory is present as soon as the module loads.
os.makedirs("outputs", exist_ok=True)

# FastAPI application object for this module.
app = FastAPI()
# NOTE(review): no route decorator is visible for this handler in this view —
# confirm how it is registered on `app`.
async def read_root():
    """Return the static welcome payload of the API."""
    welcome = {"message": "Welcome to the Document Processing API"}
    return welcome
def _strip_json_fence(raw_text):
    """Return *raw_text* without a surrounding Markdown ```json fence.

    Text that does not begin with the opening fence (ignoring surrounding
    whitespace) is returned unchanged. The closing fence is removed only if
    it is actually present, so a reply whose fence is glued to the JSON or
    followed by trailing whitespace no longer loses characters of its payload.
    """
    opening, closing = "```json", "```"
    text = raw_text.strip()
    if not text.startswith(opening):
        return raw_text
    text = text[len(opening):]
    if text.endswith(closing):
        text = text[:-len(closing)]
    return text.strip()


# NOTE(review): no route decorator is visible for this handler in this view —
# confirm how it is registered on `app`.
async def extract_receipt_batch_endpoint(files: List[UploadFile] = File(...)):
    """Run ``extract_info`` on each uploaded image and collect the results.

    Every upload yields one entry in the response list: either
    ``{"filename": ..., "data": <parsed JSON>}`` on success or
    ``{"filename": ..., "error": <message>}`` when the file is not an image
    or anything fails while opening, extracting or parsing it. A failure on
    one file never aborts the rest of the batch.

    Raises:
        HTTPException: 400 when no files were uploaded.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded.")

    results = []
    for file_upload in files:
        try:
            # content_type may be missing; treat that as "not an image"
            # instead of failing with an AttributeError message.
            content_type = file_upload.content_type or ""
            if not content_type.startswith("image/"):
                results.append({"filename": file_upload.filename, "error": "File is not an image."})
                continue

            pil_image = Image.open(file_upload.file)
            result_json_str = extract_info(pil_image)
            parsed = json.loads(_strip_json_fence(result_json_str))
            results.append({"filename": file_upload.filename, "data": parsed})
        except Exception as e:
            # Per-file boundary: report the problem for this file and move on.
            results.append({"filename": file_upload.filename, "error": str(e)})
        finally:
            file_upload.file.close()  # Always release the upload's file handle.

    return JSONResponse(content=results)
def _close_uploads(uploads):
    """Close the underlying file of every upload that is still open."""
    for upload in uploads:
        if hasattr(upload, "file") and not upload.file.closed:
            upload.file.close()


# NOTE(review): no route decorator is visible for this handler in this view —
# confirm how it is registered on `app`.
async def extract_reimbursement_form_batch_endpoint(
    files: List[UploadFile] = File(...),
    emp_name: str = Form(...),
    emp_code: str = Form(...),
    department: str = Form(...),
    form_name: str = Form(...)
):
    """Build a reimbursement-form PDF from a batch of uploaded images.

    All uploads are opened as PIL images and handed, together with the
    employee fields, to ``extract_reimbursement_form_info``. If that call
    returns a path to an existing file, the file is sent back as a PDF.

    Returns:
        A ``FileResponse`` with the PDF on success, or a 500 ``JSONResponse``
        when the pipeline call raises an unexpected exception.

    Raises:
        HTTPException: 400 when no files were uploaded, an upload is not an
            image, or an image cannot be opened; 500 when the pipeline does
            not yield an existing PDF path.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded for child fee processing.")

    pil_images = []
    for file_upload in files:
        try:
            # A missing content type is treated the same as a non-image type.
            if not (file_upload.content_type or "").startswith("image/"):
                # Mixed valid/invalid batches are rejected as a whole.
                raise HTTPException(status_code=400, detail=f"File '{file_upload.filename}' is not an image.")
            pil_images.append(Image.open(file_upload.file))
        except HTTPException:
            # Let the "not an image" error through untouched; wrapping it in
            # the generic handler below would mangle its detail message.
            _close_uploads(files)
            raise
        except Exception as e:
            _close_uploads(files)
            raise HTTPException(
                status_code=400,
                detail=f"Error processing file '{file_upload.filename}': {str(e)}"
            ) from e

    if not pil_images:  # Safeguard; the checks above should already cover this.
        raise HTTPException(status_code=400, detail="No valid images could be processed.")

    try:
        pdf_path = extract_reimbursement_form_info(
            img_inputs=pil_images,
            emp_name=emp_name,
            emp_code=emp_code,
            department=department,
            form_name=form_name
        )
        if pdf_path and os.path.exists(pdf_path):
            return FileResponse(pdf_path, media_type='application/pdf', filename=os.path.basename(pdf_path))
        # No usable path came back from the pipeline.
        raise HTTPException(status_code=500, detail="Failed to generate PDF. No items might have been extracted or an internal error occurred.")
    except HTTPException:
        raise
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": "Failed to process child fee form batch", "detail": str(e)})
    finally:
        # The uploads are no longer needed once the pipeline call has finished.
        _close_uploads(files)
def _unique_temp_path(directory, filename):
    """Return a not-yet-existing path inside *directory* for an upload.

    Only the basename of *filename* is used. Names that are empty or resolve
    to the directory itself fall back to ``"unknown_file"``. If the name is
    already taken, a numeric prefix is added so that two uploads sharing a
    name do not overwrite each other.
    """
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name in ("", ".", ".."):
        safe_name = "unknown_file"
    candidate = os.path.join(directory, safe_name)
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{counter}_{safe_name}")
        counter += 1
    return candidate


# NOTE(review): no route decorator is visible for this handler in this view —
# confirm how it is registered on `app`.
async def extract_medical_batch_endpoint(
    files: List[UploadFile] = File(...),
    emp_name: str = Form(...),
    emp_code: str = Form(...),
    department: str = Form(...),
    designation: str = Form(...),
    company: str = Form(...),
    extension_no: str = Form(...)
):
    """Generate the consolidated medical-form HTML for a batch of images.

    Each upload is copied into a fresh temporary directory and wrapped in a
    small object exposing ``name`` (the temp path) and ``original_filename``.
    The list of wrappers plus the employee fields are passed to
    ``extract_medical_info_batch``; if it returns a path to an existing file,
    that file is returned as HTML. The temporary directory is removed before
    the handler returns, whatever the outcome.

    Returns:
        A ``FileResponse`` with the HTML (status 400 if the file name contains
        ``error_no_medical_form_images``, otherwise 200), or a 500
        ``JSONResponse`` when an unexpected exception occurs.

    Raises:
        HTTPException: 400 when no files were uploaded or an upload is not an
            image; 500 when no existing HTML path is produced.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded.")

    class MockFileObject:
        """Minimal stand-in carrying the temp path and the uploaded name."""

        def __init__(self, path, original_filename):
            self.name = path
            self.original_filename = original_filename

    temp_files_info = []
    temp_dir = tempfile.mkdtemp()
    try:
        for file_upload in files:
            # A missing content type is treated the same as a non-image type.
            if not (file_upload.content_type or "").startswith("image/"):
                raise HTTPException(status_code=400, detail=f"File '{file_upload.filename}' is not an image.")
            try:
                temp_file_path = _unique_temp_path(temp_dir, file_upload.filename)
                with open(temp_file_path, "wb") as f_temp:
                    shutil.copyfileobj(file_upload.file, f_temp)
                temp_files_info.append(MockFileObject(temp_file_path, file_upload.filename))
            finally:
                file_upload.file.close()

        if not temp_files_info:  # Safeguard; the loop either appends or raises.
            raise HTTPException(status_code=400, detail="No valid image files to process after filtering.")

        html_path = extract_medical_info_batch(
            image_file_list=temp_files_info,
            emp_name=emp_name,
            emp_code=emp_code,
            department=department,
            designation=designation,
            company=company,
            extension_no=extension_no
        )
        if html_path and os.path.exists(html_path):
            # The pipeline's "no images" error page is reported as a client error.
            is_error_page = "error_no_medical_form_images" in os.path.basename(html_path)
            status_code_to_return = 400 if is_error_page else 200
            # NOTE(review): temp_dir is deleted in the finally below, before
            # the response body is sent — presumably html_path lives outside
            # temp_dir; confirm against the pipeline.
            return FileResponse(html_path, media_type='text/html', filename=os.path.basename(html_path), status_code=status_code_to_return)
        raise HTTPException(status_code=500, detail="Failed to generate consolidated HTML medical form. This could be due to no images or an internal error during HTML generation.")
    except HTTPException:
        # Pass through unchanged so the generic handler below does not
        # convert it into a 500 JSON response.
        raise
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": "Failed to process batch medical forms", "detail": str(e)})
    finally:
        # Single cleanup point: removes the directory and every copied file.
        try:
            shutil.rmtree(temp_dir)
        except OSError as e_clean_dir:
            print(f"Error final cleanup of temp directory {temp_dir}: {e_clean_dir}")
| # Ensure no trailing comments like "# We will add more endpoints below" | |