"""
FastAPI application for Medical Document Validator.

To run this application:
1. Set LLM_API_KEY in .env file (get your API key from Anthropic)
2. Set APP_PASSWORD in .env file for access control
3. Install dependencies: pip install -r requirements.txt
4. Run the server: uvicorn app.main:app --reload
"""
import hashlib
import html
import logging
import os
import secrets
import tempfile
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from fastapi import FastAPI, File, UploadFile, HTTPException, Query, Form, Request, Response, Cookie
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Configure logging before the project imports below so that anything they
# log at import time already goes through this handler/format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()  # Console output
    ]
)
logger = logging.getLogger(__name__)

from app.validator import Validator, load_templates, get_template, extract_images_from_document
from app.database import db

# Password protection setup
# NOTE(review): this module reads the process environment only; nothing here
# loads a .env file. Presumably app.validator or the deployment does - confirm.
APP_PASSWORD = os.environ.get("APP_PASSWORD", "")
VALID_SESSIONS = set()  # Store valid session tokens

# File extensions accepted by the upload endpoints.
SUPPORTED_EXTENSIONS = (".pdf", ".docx", ".pptx")


def generate_session_token():
    """Generate a secure, URL-safe random session token."""
    return secrets.token_urlsafe(32)


def verify_session(session_token: str) -> bool:
    """Return True if the token is a known session.

    When APP_PASSWORD is empty, password protection is off and every token
    (including a missing one) is accepted.
    """
    if not APP_PASSWORD:
        return True
    return session_token in VALID_SESSIONS


app = FastAPI(
    title="Medical Document Validator API",
    description="API for validating medical documents against predefined templates using LLM",
    version="1.0.0"
)

# Mount static files directory (only when it exists next to this module)
static_path = Path(__file__).parent / "static"
if static_path.exists():
    app.mount("/static", StaticFiles(directory=str(static_path)), name="static")

# Initialize validator
validator = Validator()


# Pydantic Models

class TemplateSummary(BaseModel):
    """Template summary for listing available templates."""
    template_key: str
    friendly_name: str


class ElementReport(BaseModel):
    """Individual element validation report."""
    id: str
    label: str
    required: bool
    is_present: bool
    reason: str


class SpellCheckError(BaseModel):
    """Individual spelling error."""
    word: str
    context: str
    suggestions: List[str]
    error_type: str  # "spelling", "grammar", "formatting", "typo"
    confidence: float


class SpellCheckReport(BaseModel):
    """Spell check report."""
    total_errors: int
    errors: List[SpellCheckError]
    summary: str


class LinkReport(BaseModel):
    """Link validation report."""
    url: str
    status: str
    status_code: int
    message: str
    page: str


class ValidationReport(BaseModel):
    """Complete validation report response."""
    template_key: str
    status: str  # "PASS" or "FAIL"
    summary: str
    elements_report: List[ElementReport]
    spell_check: Optional[SpellCheckReport] = None  # Optional spell check results
    link_report: Optional[List[LinkReport]] = None  # Optional link validation results


class ComparisonChange(BaseModel):
    """Individual change detected in comparison."""
    type: str  # "addition", "deletion", "modification"
    section: Optional[str] = None  # Section/area where change occurred
    description: str  # Description of the change


class ComparisonReport(BaseModel):
    """Document comparison report."""
    summary: str  # Natural language summary of changes
    changes: List[ComparisonChange]  # Detailed list of changes
    file1_name: str
    file2_name: str


class BulkValidationDetail(BaseModel):
    """Individual validation result for bulk certificate validation."""
    name: str
    status: str  # "exact_match", "fuzzy_match", "missing", "extra"
    certificate_file: Optional[str] = None
    similarity: Optional[int] = None  # Percentage for fuzzy matches


class BulkValidationResult(BaseModel):
    """Bulk certificate validation result."""
    total_names: int
    total_certificates: int
    exact_matches: int
    fuzzy_matches: int
    missing: int
    extras: int
    details: List[BulkValidationDetail]


class Project(BaseModel):
    """Project model."""
    id: int
    name: str
    description: Optional[str] = ""
    created_at: str
    validation_count: int = 0


class ProjectCreate(BaseModel):
    """Project creation request."""
    name: str
    description: str = ""


class ValidationHistory(BaseModel):
    """Validation history item."""
    id: int
    project_id: Optional[int]
    project_name: Optional[str]
    validation_type: str
    template_key: Optional[str]
    filename: str
    status: str
    created_at: str


# Shared helpers

async def _read_upload(
    upload: UploadFile,
    *,
    empty_detail: str = "Uploaded file is empty",
    error_prefix: str = "Failed to read file",
) -> bytes:
    """Read an uploaded file fully and reject empty uploads.

    Args:
        upload: The uploaded file to read.
        empty_detail: Error detail used when the upload has no content.
        error_prefix: Prefix of the error detail used when reading fails.

    Returns:
        The raw bytes of the upload.

    Raises:
        HTTPException: 400 when reading fails or the upload is empty.
    """
    try:
        content = await upload.read()
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"{error_prefix}: {str(e)}"
        ) from e
    # The emptiness check lives outside the try on purpose: HTTPException is
    # itself an Exception, so raising it inside would be caught and re-wrapped
    # with the "failed to read" prefix.
    if not content:
        raise HTTPException(status_code=400, detail=empty_detail)
    return content


def _build_spell_check_report(spell_check_data: Dict[str, Any]) -> SpellCheckReport:
    """Convert the validator's spell-check dict into the response model."""
    return SpellCheckReport(
        total_errors=spell_check_data.get("total_errors", 0),
        errors=[
            SpellCheckError(**error)
            for error in spell_check_data.get("errors", [])
        ],
        summary=spell_check_data.get("summary", "")
    )


# API Endpoints

# Login page HTML with JavaScript-based auth
# NOTE(review): only the two text fragments below are visible for this
# literal; its markup and whatever code followed it (presumably the handlers
# behind the "/login" and "/app" redirects used by root()) are not present in
# this copy of the file. An unmatched ")" that trailed the closing quotes was
# dropped so the module parses - restore the original section from version
# control.
LOGIN_PAGE = """
Enter password to access the application
Static files not found. Please check installation.
"""


@app.get("/", response_class=HTMLResponse, tags=["Root"])
async def root():
    """Redirect to login or app.

    Sends the client to /login when APP_PASSWORD is set, otherwise to /app.
    """
    if APP_PASSWORD:
        return RedirectResponse(url="/login", status_code=302)
    return RedirectResponse(url="/app", status_code=302)


# ==================== SHAREPOINT ENDPOINTS ====================
from app.sharepoint import SharePointConnector

sharepoint = SharePointConnector()


def _sharepoint_redirect_uri(request: Request) -> str:
    """Build the OAuth redirect URI for the current request.

    The same value must be used when starting the flow and when exchanging
    the code, so both endpoints call this helper.
    """
    base_url = str(request.base_url)

    # Fix for localhost/dev
    # NOTE(review): the port here is 8001 while the __main__ runner below
    # uses 8000 - confirm which one the dev setup expects.
    if "localhost" in base_url or "127.0.0.1" in base_url:
        return "http://localhost:8001/auth/sharepoint/callback"

    redirect_uri = str(request.url_for('sharepoint_callback'))
    if 'huggingface.co' in base_url:
        # Force the https scheme when served from a huggingface.co host.
        redirect_uri = redirect_uri.replace('http:', 'https:')
    return redirect_uri


@app.get("/auth/sharepoint/login", tags=["SharePoint"])
async def sharepoint_login(request: Request):
    """Start SharePoint OAuth flow.

    Returns:
        A dict with the authorization URL under "auth_url".
    """
    auth_url = sharepoint.get_auth_url(_sharepoint_redirect_uri(request))
    return {"auth_url": auth_url}


@app.get("/auth/sharepoint/callback", tags=["SharePoint"])
async def sharepoint_callback(code: str, request: Request):
    """Handle OAuth callback.

    Exchanges the authorization code for a token and returns a small HTML
    page; on any failure returns a 400 HTML page containing the error text.
    """
    redirect_uri = _sharepoint_redirect_uri(request)

    try:
        # Get token
        result = sharepoint.acquire_token_by_code(code, redirect_uri)
        # NOTE(review): the page below presumably embedded this token (the
        # original comment mentions saving it to localStorage), but that
        # markup is not visible in this copy of the file.
        access_token = result.get("access_token")

        # Return simple HTML that saves token to localStorage and closes window
        html_content = f"""You can close this window now.
"""
        return HTMLResponse(content=html_content)
    except Exception as e:
        # Escape the message: it is rendered as HTML in the user's browser.
        return HTMLResponse(content=f"{html.escape(str(e))}\n", status_code=400)


@app.get("/sharepoint/drives", tags=["SharePoint"])
async def list_drives(token: str = Query(..., description="SharePoint Access Token")):
    """List available drives (OneDrive + SharePoint sites).

    Raises:
        HTTPException: 500 with the connector's error text on any failure.
    """
    try:
        return sharepoint.get_drives(token)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/sharepoint/items", tags=["SharePoint"])
async def list_items(
    drive_id: str,
    folder_id: Optional[str] = None,
    token: str = Query(..., description="SharePoint Access Token")
):
    """List items in a specific drive/folder.

    Raises:
        HTTPException: 500 with the connector's error text on any failure.
    """
    try:
        return sharepoint.list_items(token, drive_id, folder_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


class SharePointDownload(BaseModel):
    """Request body for downloading SharePoint files for validation."""
    drive_id: str
    file_ids: List[str]
    token: str
    project_id: Optional[int] = None


@app.post("/sharepoint/download-and-validate", tags=["SharePoint"])
async def download_and_validate(data: SharePointDownload):
    """Download files from SharePoint and validate them.

    Currently a stub: each file is downloaded (so download errors surface as
    a 500) but the content is discarded and no validation is performed.
    """
    try:
        for file_id in data.file_ids:
            # Download file content
            sharepoint.download_file(data.token, data.drive_id, file_id)

            # Since we don't know the exact filename easily here without relisting or passing it,
            # we might need to assume or fetch metadata.
            # TODO: Integrate with existing validator logic
            # This requires converting bytes to UploadFile-like object or modifying validator to accept bytes

        return {"features": "Download validated (stub)"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/templates", response_model=List[TemplateSummary], tags=["Templates"])
async def get_templates():
    """
    Get list of all available templates.

    Returns:
        List of templates with template_key and friendly_name

    Raises:
        500: Templates could not be loaded or converted
    """
    try:
        templates_data = load_templates()
        return [
            TemplateSummary(
                template_key=template.get("template_key"),
                friendly_name=template.get("friendly_name")
            )
            for template in templates_data.get("templates", [])
        ]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to load templates: {str(e)}"
        )


@app.post("/validate", response_model=ValidationReport, tags=["Validation"])
async def validate_document(
    file: UploadFile = File(..., description="Document file to validate (PDF, DOCX, or PPTX)"),
    template_key: str = Query(..., description="Template key to validate against"),
    check_spelling: bool = Query(False, description="Enable spell checking (ignores proper names)"),
    custom_prompt: Optional[str] = Query(None, description="Optional custom instructions to adapt validation")
):
    """
    Validate a document against a specified template.

    Args:
        file: Uploaded document file (PDF, DOCX, or PPTX)
        template_key: Template key to validate against
        check_spelling: When true, also run the spell checker on the document text
        custom_prompt: Optional extra instructions passed through to the validator

    Returns:
        Validation report with status and element-by-element results

    Raises:
        404: Template not found
        400: Bad request (unsupported format, empty or unreadable upload)
        422: Unprocessable entity (validator raised ValueError)
        500: Internal server error
    """
    # Validate template exists
    template = get_template(template_key)
    if not template:
        raise HTTPException(
            status_code=404,
            detail=f"Template not found: {template_key}. Use GET /templates to see available templates."
        )

    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    if file_extension not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}. Supported formats: {', '.join(SUPPORTED_EXTENSIONS)}"
        )

    # Read file content
    file_content = await _read_upload(file)

    # Perform validation
    try:
        validation_report = await validator.validate_document(
            file_content=file_content,
            file_extension=file_extension,
            template_key=template_key,
            custom_prompt=custom_prompt
        )

        # Convert to Pydantic model for response validation
        elements_report = [
            ElementReport(**elem)
            for elem in validation_report.get("elements_report", [])
        ]

        # Convert link report to Pydantic models
        link_report = [
            LinkReport(**link)
            for link in validation_report.get("link_report", [])
        ]

        # Perform spell checking if requested
        spell_check_result = None
        if check_spelling:
            # Extract text from the document for spell checking
            from app.validator import extract_document_text
            try:
                document_text = extract_document_text(file_content, file_extension)
                spell_check_result = _build_spell_check_report(
                    validator.check_spelling(document_text)
                )
            except Exception as e:
                # Spell checking is best-effort: report the failure in the
                # summary instead of failing the whole validation.
                logger.error(f"Spell check failed: {str(e)}")
                spell_check_result = SpellCheckReport(
                    total_errors=0,
                    errors=[],
                    summary=f"Spell check error: {str(e)}"
                )

        return ValidationReport(
            template_key=validation_report.get("template_key", template_key),
            status=validation_report.get("status", "FAIL"),
            summary=validation_report.get("summary", ""),
            elements_report=elements_report,
            spell_check=spell_check_result,
            link_report=link_report
        )
    except ValueError as e:
        raise HTTPException(
            status_code=422,
            detail=f"Validation error: {str(e)}"
        )
    except Exception as e:
        logger.error(f"Validation failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during validation: {str(e)}"
        )


@app.post("/validate/spelling-only", tags=["Validation"])
async def validate_spelling_only(
    file: UploadFile = File(..., description="Document file to check spelling (PDF, DOCX, or PPTX)")
):
    """
    Check spelling in a document without template validation.

    Args:
        file: Uploaded document file (PDF, DOCX, or PPTX)

    Returns:
        Dict with "mode" set to "spelling_only" and the spell check report

    Raises:
        400: Bad request (unsupported format, empty or unreadable upload)
        422: Unprocessable entity (extraction failure)
        500: Internal server error
    """
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    if file_extension not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}. Supported formats: {', '.join(SUPPORTED_EXTENSIONS)}"
        )

    # Read file content
    file_content = await _read_upload(file)

    # Extract text and perform spell checking only
    try:
        from app.validator import extract_document_text

        # Extract text from document
        try:
            document_text = extract_document_text(file_content, file_extension)
        except Exception as e:
            raise HTTPException(
                status_code=422,
                detail=f"Failed to extract text from document: {str(e)}"
            )

        # Perform spell checking and convert to Pydantic model
        spell_check_result = _build_spell_check_report(
            validator.check_spelling(document_text)
        )

        # Return spelling-only response
        return {
            "mode": "spelling_only",
            "spell_check": spell_check_result
        }
    except HTTPException:
        # Let the 422 above through untouched.
        raise
    except Exception as e:
        logger.error(f"Spell check failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during spell checking: {str(e)}"
        )


@app.get("/health", tags=["Health"])
async def health_check():
    """Health check endpoint.

    Also reports whether LLM_API_KEY is set (non-empty) in the environment.
    """
    return {
        "status": "healthy",
        "llm_api_key_configured": bool(os.getenv("LLM_API_KEY"))
    }


def _image_dimensions_and_mode(source: Any) -> Tuple[str, str]:
    """Open an image with Pillow and return ("<width>x<height>", mode).

    Args:
        source: Anything Pillow's Image.open accepts (path or binary stream).

    Raises:
        ImportError: Pillow is not installed.
        Exception: Whatever Pillow raises for unreadable image data.
    """
    from PIL import Image as PILImage

    # The context manager closes the image even if load() fails.
    with PILImage.open(source) as pil_img:
        pil_img.load()  # Load into memory
        return f"{pil_img.size[0]}x{pil_img.size[1]}", pil_img.mode


def _image_detail(img: Any) -> Dict[str, Any]:
    """Describe one extracted image for the debug endpoint.

    Prefers in-memory image bytes (img._image_bytes) and falls back to the
    file at img.file_path; reports "file not found" when neither exists.
    """
    img_path = Path(img.file_path)
    image_bytes = getattr(img, '_image_bytes', None)
    dimensions = "unknown"
    mode = "unknown"

    if image_bytes:
        # Use image data from memory
        file_size = len(image_bytes)
        try:
            dimensions, mode = _image_dimensions_and_mode(BytesIO(image_bytes))
        except Exception as e:
            logger.warning(f"Could not read image from memory: {str(e)}")
        file_exists = True  # Data exists in memory
    elif img_path.exists():
        # Fallback: read from file
        try:
            file_size = img_path.stat().st_size
        except Exception:
            file_size = 0
        try:
            dimensions, mode = _image_dimensions_and_mode(img_path)
        except ImportError:
            pass  # PIL not available: keep "unknown" without logging
        except Exception as e:
            logger.warning(f"Could not read image {img_path}: {str(e)}")
        file_exists = True
    else:
        file_size = 0
        dimensions = "file not found"
        file_exists = False

    return {
        "id": img.id,
        "file_path": img.file_path,
        "file_exists": file_exists,
        "file_size_bytes": file_size,
        "dimensions": dimensions,
        "image_mode": mode,
        "page_number": img.page_number,
        "role_hint": img.role_hint,
        "element_type": img.element_type,
        "stored_in_memory": image_bytes is not None
    }


@app.post("/debug/extract-images", tags=["Debug"])
async def debug_extract_images(
    file: UploadFile = File(..., description="Document file to extract images from"),
    template_key: str = Query(..., description="Template key to identify visual elements")
):
    """
    Debug endpoint to extract and inspect images from a document.

    Returns detailed information about extracted images without performing validation.

    Raises:
        404: Template not found
        400: Unsupported format, empty or unreadable upload
        500: Image extraction failed
    """
    # Validate template exists
    template = get_template(template_key)
    if not template:
        raise HTTPException(
            status_code=404,
            detail=f"Template not found: {template_key}"
        )

    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    if file_extension not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}"
        )

    # Read file content
    file_content = await _read_upload(file)

    # Extract images
    try:
        # Images may be written into the temp dir, so they are inspected
        # before the directory is removed at the end of the with-block.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            extracted_text, extracted_images = extract_images_from_document(
                file_content,
                file_extension,
                template.get("elements", []),
                temp_path
            )

            # Get image details
            image_details = [_image_detail(img) for img in extracted_images]

        # Get visual elements from template
        visual_elements = [
            {
                "id": e.get("id"),
                "label": e.get("label"),
                "type": e.get("type"),
                "required": e.get("required", False),
                "logo_role": e.get("logo_role") if e.get("type") == "logo" else None
            }
            for e in template.get("elements", [])
            if e.get("type") in ["logo", "signature_block", "qr_code_or_image"]
        ]

        return {
            "file_name": filename,
            "file_extension": file_extension,
            "file_size_bytes": len(file_content),
            "text_extracted": len(extracted_text) > 0,
            "text_length": len(extracted_text),
            "images_found": len(extracted_images),
            "images": image_details,
            "template_visual_elements": visual_elements,
            "template_requires_visual_elements": len(visual_elements) > 0
        }
    except Exception as e:
        logger.error(f"Debug image extraction failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Image extraction failed: {str(e)}"
        )


@app.post("/compare", response_model=ComparisonReport, tags=["Comparison"])
async def compare_documents(
    file1: UploadFile = File(..., description="First document (original version)"),
    file2: UploadFile = File(..., description="Second document (modified version)")
):
    """
    Compare two document versions using LLM to identify changes.

    Args:
        file1: Original document
        file2: Modified document

    Returns:
        Comparison report with summary and detailed changes

    Raises:
        400: Unsupported format, empty or unreadable upload
        500: Comparison failed
    """
    # Validate file extensions
    filename1 = file1.filename or ""
    filename2 = file2.filename or ""
    ext1 = Path(filename1).suffix.lower()
    ext2 = Path(filename2).suffix.lower()
    if ext1 not in SUPPORTED_EXTENSIONS or ext2 not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format. Supported: {', '.join(SUPPORTED_EXTENSIONS)}"
        )

    # Read file contents
    content1 = await _read_upload(
        file1,
        empty_detail="One or both files are empty",
        error_prefix="Failed to read files",
    )
    content2 = await _read_upload(
        file2,
        empty_detail="One or both files are empty",
        error_prefix="Failed to read files",
    )

    # Perform comparison using validator
    try:
        comparison_result = await validator.compare_documents(
            file1_content=content1,
            file1_extension=ext1,
            file1_name=filename1,
            file2_content=content2,
            file2_extension=ext2,
            file2_name=filename2
        )

        # Convert to Pydantic models
        changes = [
            ComparisonChange(**change)
            for change in comparison_result.get("changes", [])
        ]

        return ComparisonReport(
            summary=comparison_result.get("summary", "No summary available"),
            changes=changes,
            file1_name=filename1,
            file2_name=filename2
        )
    except Exception as e:
        logger.error(f"Comparison failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Comparison failed: {str(e)}"
        )


@app.post("/excel-columns", tags=["Bulk Validation"])
async def get_excel_columns(file: UploadFile = File(...)):
    """
    Extract column headers from an Excel file.

    Args:
        file: Excel file (.xlsx)

    Returns:
        List of column names and row count

    Raises:
        400: The file could not be read or parsed as a workbook
    """
    try:
        import openpyxl

        content = await file.read()
        wb = openpyxl.load_workbook(BytesIO(content))
        ws = wb.active

        # Get first row as headers; cells with an empty/falsy value are skipped
        headers = [str(cell.value) for cell in ws[1] if cell.value]

        row_count = ws.max_row - 1  # Exclude header row

        return {
            "columns": headers,
            "row_count": row_count
        }
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to parse Excel file: {str(e)}"
        )


@app.post("/bulk-validate", response_model=BulkValidationResult, tags=["Bulk Validation"])
async def bulk_validate_certificates(
    excel_file: UploadFile = File(..., description="Excel file with names"),
    name_column: str = Form(..., description="Column name containing names"),
    certificate_files: List[UploadFile] = File(..., description="Certificate files (max 150)")
):
    """
    Validate multiple certificates against an Excel list of names.

    Args:
        excel_file: Excel file with attendee names
        name_column: Column containing the names
        certificate_files: List of certificate files to validate

    Returns:
        Bulk validation results with matches, missing, and extras

    Raises:
        400: More than 150 certificate files were uploaded
        500: Reading the uploads or the bulk validation failed
    """
    if len(certificate_files) > 150:
        raise HTTPException(
            status_code=400,
            detail="Maximum 150 certificates allowed"
        )

    try:
        # Read Excel file
        excel_content = await excel_file.read()

        # Read all certificate files as (filename, content, extension) tuples
        cert_data = []
        for cert_file in certificate_files:
            content = await cert_file.read()
            filename = cert_file.filename or "unknown"
            ext = Path(filename).suffix.lower()
            cert_data.append((filename, content, ext))

        # Call validator
        result = await validator.bulk_validate_certificates(
            excel_content=excel_content,
            name_column=name_column,
            certificate_data=cert_data
        )

        # Convert to Pydantic models
        details = [
            BulkValidationDetail(**detail)
            for detail in result.get("details", [])
        ]

        return BulkValidationResult(
            total_names=result.get("total_names", 0),
            total_certificates=result.get("total_certificates", 0),
            exact_matches=result.get("exact_matches", 0),
            fuzzy_matches=result.get("fuzzy_matches", 0),
            missing=result.get("missing", 0),
            extras=result.get("extras", 0),
            details=details
        )
    except Exception as e:
        logger.error(f"Bulk validation failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Bulk validation failed: {str(e)}"
        )


# ==================== PROJECTS ENDPOINTS ====================

@app.get("/projects", response_model=List[Project], tags=["Projects"])
async def list_projects():
    """List all projects."""
    try:
        return db.list_projects()
    except Exception as e:
        logger.error(f"Failed to list projects: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/projects", response_model=Project, tags=["Projects"])
async def create_project(project: ProjectCreate):
    """Create a new project and return it as stored.

    Raises:
        400: The database layer rejected the input with ValueError
        500: Any other failure
    """
    try:
        project_id = db.create_project(project.name, project.description)
        return db.get_project(project_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to create project: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/projects/{project_id}", response_model=Project, tags=["Projects"])
async def get_project(project_id: int):
    """Get a specific project.

    Raises:
        404: No project with this id
        500: Any other failure
    """
    try:
        project = db.get_project(project_id)
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")
        return project
    except HTTPException:
        # Keep the 404 from being turned into a 500 below.
        raise
    except Exception as e:
        logger.error(f"Failed to get project: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/projects/{project_id}", tags=["Projects"])
async def delete_project(project_id: int):
    """Delete a project.

    Raises:
        404: No project with this id
        500: Any other failure
    """
    try:
        deleted = db.delete_project(project_id)
        if not deleted:
            raise HTTPException(status_code=404, detail="Project not found")
        return {"message": "Project deleted successfully"}
    except HTTPException:
        # Keep the 404 from being turned into a 500 below.
        raise
    except Exception as e:
        logger.error(f"Failed to delete project: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/projects/{project_id}/validations", tags=["Projects"])
async def get_project_validations(project_id: int):
    """Get all validations for a project."""
    try:
        return db.get_project_validations(project_id)
    except Exception as e:
        logger.error(f"Failed to get project validations: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/validations/recent", tags=["Projects"])
async def get_recent_validations(limit: int = Query(50, ge=1, le=200)):
    """Get recent validations across all projects (limit between 1 and 200)."""
    try:
        return db.get_recent_validations(limit)
    except Exception as e:
        logger.error(f"Failed to get recent validations: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)