Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI application for Medical Document Validator. | |
| To run this application: | |
| 1. Set LLM_API_KEY in .env file (get your API key from Anthropic) | |
| 2. Set APP_PASSWORD in .env file for access control | |
| 3. Install dependencies: pip install -r requirements.txt | |
| 4. Run the server: uvicorn app.main:app --reload | |
| """ | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, Query, Form, Request, Response, Cookie | |
| from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, RedirectResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict, Any | |
| import os | |
| import tempfile | |
| import logging | |
| import hashlib | |
| import secrets | |
| from pathlib import Path | |
# Configure logging
# Root logger emits INFO and above to the console; the format carries
# timestamp, logger name, and level so multi-module output stays traceable.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()  # Console output
    ]
)
logger = logging.getLogger(__name__)  # module-level logger (stdlib convention)
| from app.validator import Validator, load_templates, get_template, extract_images_from_document | |
| from app.database import db | |
# Password protection setup
# An empty APP_PASSWORD disables authentication entirely (see verify_session).
APP_PASSWORD = os.environ.get("APP_PASSWORD", "")
# Store valid session tokens. In-memory only: all sessions are invalidated
# on process restart, and this does not work across multiple workers.
VALID_SESSIONS = set()
def generate_session_token():
    """Create a fresh, cryptographically secure, URL-safe session token."""
    token = secrets.token_urlsafe(32)
    return token
def verify_session(session_token: str) -> bool:
    """Return True when the token belongs to an active session.

    When no APP_PASSWORD is configured, authentication is disabled and any
    token (including None or empty) is accepted.
    """
    if not APP_PASSWORD:
        return True
    return session_token in VALID_SESSIONS
# Application setup.
# Environment is read lazily via os.environ / os.getenv (APP_PASSWORD above,
# LLM_API_KEY inside the validator).
app = FastAPI(
    title="Medical Document Validator API",
    description="API for validating medical documents against predefined templates using LLM",
    version="1.0.0"
)
# Mount static files directory (serves the single-page frontend assets)
static_path = Path(__file__).parent / "static"
if static_path.exists():
    app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
# Initialize validator — a single shared instance used by every endpoint.
validator = Validator()
# Pydantic Models
class TemplateSummary(BaseModel):
    """Template summary for listing available templates."""
    template_key: str    # stable identifier accepted by the validate endpoint
    friendly_name: str   # human-readable display name


class ElementReport(BaseModel):
    """Individual element validation report."""
    id: str
    label: str
    required: bool
    is_present: bool
    reason: str          # explanation for the presence/absence verdict


class SpellCheckError(BaseModel):
    """Individual spelling error."""
    word: str
    context: str                   # surrounding text where the error was found
    suggestions: List[str]
    error_type: str  # "spelling", "grammar", "formatting", "typo"
    confidence: float              # presumably 0.0-1.0 — TODO confirm against validator output


class SpellCheckReport(BaseModel):
    """Spell check report."""
    total_errors: int
    errors: List[SpellCheckError]
    summary: str


class LinkReport(BaseModel):
    """Link validation report."""
    url: str
    status: str
    status_code: int     # HTTP status code observed when checking the link
    message: str
    page: str            # page/slide in the document where the link appears


class ValidationReport(BaseModel):
    """Complete validation report response."""
    template_key: str
    status: str  # "PASS" or "FAIL"
    summary: str
    elements_report: List[ElementReport]
    spell_check: Optional[SpellCheckReport] = None  # Optional spell check results
    link_report: Optional[List[LinkReport]] = None  # Optional link validation results
class ComparisonChange(BaseModel):
    """Individual change detected in comparison."""
    type: str  # "addition", "deletion", "modification"
    section: Optional[str] = None  # Section/area where change occurred
    description: str  # Description of the change


class ComparisonReport(BaseModel):
    """Document comparison report."""
    summary: str  # Natural language summary of changes
    changes: List[ComparisonChange]  # Detailed list of changes
    file1_name: str
    file2_name: str


class BulkValidationDetail(BaseModel):
    """Individual validation result for bulk certificate validation."""
    name: str
    status: str  # "exact_match", "fuzzy_match", "missing", "extra"
    certificate_file: Optional[str] = None  # matched certificate filename, if any
    similarity: Optional[int] = None  # Percentage for fuzzy matches


class BulkValidationResult(BaseModel):
    """Bulk certificate validation result."""
    total_names: int          # names read from the Excel sheet
    total_certificates: int   # certificate files uploaded
    exact_matches: int
    fuzzy_matches: int
    missing: int              # names with no matching certificate
    extras: int               # certificates matching no listed name
    details: List[BulkValidationDetail]
class Project(BaseModel):
    """Project model."""
    id: int
    name: str
    description: Optional[str] = ""
    created_at: str            # timestamp string as produced by the database layer
    validation_count: int = 0  # number of validations recorded for this project


class ProjectCreate(BaseModel):
    """Project creation request."""
    name: str
    description: str = ""


class ValidationHistory(BaseModel):
    """Validation history item."""
    id: int
    project_id: Optional[int]    # None for validations not tied to a project
    project_name: Optional[str]
    validation_type: str
    template_key: Optional[str]  # None for non-template runs (e.g. comparisons)
    filename: str
    status: str
    created_at: str
# API Endpoints
# Login page HTML with JavaScript-based auth.
# Served verbatim by login_page(); the embedded script POSTs the password to
# /verify-password and, on success, stores the returned token in localStorage
# and navigates to /app?token=<token>.
LOGIN_PAGE = """
<!DOCTYPE html>
<html>
<head>
    <title>Login - Medical Document Validator</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <style>
        body { font-family: Arial, sans-serif; display: flex; justify-content: center; align-items: center; min-height: 100vh; margin: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); }
        .login-box { background: white; padding: 40px; border-radius: 12px; box-shadow: 0 10px 40px rgba(0,0,0,0.2); max-width: 400px; width: 90%; }
        h1 { margin: 0 0 30px 0; color: #333; text-align: center; font-size: 24px; }
        .form-group { margin-bottom: 20px; }
        label { display: block; margin-bottom: 8px; font-weight: 600; color: #555; }
        input[type="password"] { width: 100%; padding: 12px; border: 1px solid #ddd; border-radius: 6px; font-size: 16px; box-sizing: border-box; }
        button { width: 100%; padding: 14px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border: none; border-radius: 6px; font-size: 16px; font-weight: 600; cursor: pointer; transition: transform 0.2s; }
        button:hover { transform: translateY(-2px); }
        .error { color: #dc3545; text-align: center; margin-bottom: 20px; padding: 10px; background: #f8d7da; border-radius: 6px; display: none; }
        .subtitle { color: #666; text-align: center; margin-bottom: 30px; }
    </style>
</head>
<body>
    <div class="login-box">
        <h1>🔐 Medical Document Validator</h1>
        <p class="subtitle">Enter password to access the application</p>
        <div class="error" id="error">Invalid password. Please try again.</div>
        <div class="form-group">
            <label for="password">Password:</label>
            <input type="password" id="password" placeholder="Enter password" autofocus>
        </div>
        <button id="loginBtn">Login</button>
    </div>
    <script>
        async function attemptLogin() {
            const password = document.getElementById('password').value;
            const errorDiv = document.getElementById('error');
            try {
                const response = await fetch('/verify-password', {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({password: password})
                });
                const data = await response.json();
                if (data.valid) {
                    localStorage.setItem('auth_token', data.token);
                    window.location.href = '/app?token=' + data.token;
                } else {
                    errorDiv.style.display = 'block';
                }
            } catch (e) {
                errorDiv.textContent = 'Connection error. Please try again.';
                errorDiv.style.display = 'block';
            }
        }
        document.getElementById('loginBtn').addEventListener('click', attemptLogin);
        document.getElementById('password').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') attemptLogin();
        });
    </script>
</body>
</html>
"""
async def login_page():
    """Show login page, or skip straight to the app when auth is disabled."""
    if APP_PASSWORD:
        return HTMLResponse(LOGIN_PAGE)
    return RedirectResponse(url="/app", status_code=302)
async def verify_password(request: Request):
    """Verify a submitted password and mint a session token.

    Expects a JSON body of the form ``{"password": "..."}``. On success the
    new token is registered in VALID_SESSIONS and returned so the client can
    attach it to subsequent /app requests.

    Returns:
        {"valid": True, "token": <str>} on success, {"valid": False} otherwise
        (including on malformed bodies — any exception is logged and mapped to
        a failed verification rather than a 5xx).
    """
    try:
        data = await request.json()
        password = data.get("password", "")
        # compare_digest is constant-time, so response latency does not leak
        # how much of the password prefix matched (the original `==` did).
        # Non-string payloads raise TypeError here and fall through to the
        # except-branch, matching the original "not valid" outcome.
        if secrets.compare_digest(password, APP_PASSWORD):
            token = generate_session_token()
            VALID_SESSIONS.add(token)
            return {"valid": True, "token": token}
        else:
            return {"valid": False}
    except Exception as e:
        logger.error(f"Password verification error: {e}")
        return {"valid": False}
async def app_page(token: Optional[str] = None):
    """Serve the main HTML interface (password protected if APP_PASSWORD is set).

    Args:
        token: Session token issued by verify_password; only consulted when
            password protection is enabled. (Annotation fixed from the
            implicit-Optional ``token: str = None``.)

    Returns:
        The bundled index.html, a redirect to /login when unauthenticated,
        or a minimal fallback page when the static bundle is missing.
    """
    # Check if password protection is enabled; unknown/absent tokens bounce
    # back to the login page.
    if APP_PASSWORD:
        if not token or not verify_session(token):
            return RedirectResponse(url="/login", status_code=302)
    html_path = Path(__file__).parent / "static" / "index.html"
    if html_path.exists():
        return FileResponse(html_path)
    # Fallback shown when the frontend assets were not installed.
    return HTMLResponse("""
    <h1>Medical Document Validator</h1>
    <p>Static files not found. Please check installation.</p>
    """)
async def root():
    """Redirect to login or app."""
    target = "/login" if APP_PASSWORD else "/app"
    return RedirectResponse(url=target, status_code=302)
# ==================== SHAREPOINT ENDPOINTS ====================
from app.sharepoint import SharePointConnector
# Single module-level connector shared by all SharePoint endpoints.
sharepoint = SharePointConnector()
async def sharepoint_login(request: Request):
    """Start SharePoint OAuth flow."""
    base = str(request.base_url)
    callback = str(request.url_for('sharepoint_callback'))
    # Behind the Hugging Face proxy the app sees http, but the registered
    # OAuth redirect must be https — rewrite the scheme in that case.
    if 'huggingface.co' in base:
        redirect_uri = callback.replace('http:', 'https:')
    else:
        redirect_uri = callback
    # Fix for localhost/dev
    if "localhost" in base or "127.0.0.1" in base:
        redirect_uri = "http://localhost:8001/auth/sharepoint/callback"
    auth_url = sharepoint.get_auth_url(redirect_uri)
    return {"auth_url": auth_url}
async def sharepoint_callback(code: str, request: Request):
    """Handle OAuth callback.

    Exchanges the authorization ``code`` for an access token, then returns a
    small HTML page that hands the token to the opener window (popup flow)
    or stores it in localStorage and navigates to /app (fallback).
    """
    # Reconstruct redirect_uri logic — must match sharepoint_login exactly,
    # or the identity provider rejects the code exchange.
    redirect_uri = str(request.url_for('sharepoint_callback')).replace('http:', 'https:') if 'huggingface.co' in str(request.base_url) else str(request.url_for('sharepoint_callback'))
    if "localhost" in str(request.base_url) or "127.0.0.1" in str(request.base_url):
        redirect_uri = "http://localhost:8001/auth/sharepoint/callback"
    try:
        # Get token
        result = sharepoint.acquire_token_by_code(code, redirect_uri)
        access_token = result.get("access_token")
        # Return simple HTML that saves token to localStorage and closes window.
        # NOTE(review): the raw access token is interpolated into the page and
        # postMessage uses targetOrigin '*' — consider tightening both.
        html_content = f"""
        <html>
            <body>
                <h1>Authentication Successful!</h1>
                <p>You can close this window now.</p>
                <script>
                    // Send token back to parent window if opened as popup
                    if (window.opener) {{
                        window.opener.postMessage({{ type: 'SHAREPOINT_AUTH', token: '{access_token}' }}, '*');
                        window.close();
                    }} else {{
                        // Fallback if not a popup
                        localStorage.setItem('sharepoint_token', '{access_token}');
                        window.location.href = '/app';
                    }}
                </script>
            </body>
        </html>
        """
        return HTMLResponse(content=html_content)
    except Exception as e:
        return HTMLResponse(content=f"<h1>Authentication Failed</h1><p>{str(e)}</p>", status_code=400)
async def list_drives(token: str = Query(..., description="SharePoint Access Token")):
    """List available drives (OneDrive + SharePoint sites)."""
    try:
        return sharepoint.get_drives(token)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def list_items(
    drive_id: str,
    folder_id: Optional[str] = None,
    token: str = Query(..., description="SharePoint Access Token")
):
    """List items in a specific drive/folder (drive root when folder_id is None)."""
    try:
        return sharepoint.list_items(token, drive_id, folder_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
class SharePointDownload(BaseModel):
    """Request body for the SharePoint download-and-validate endpoint."""
    drive_id: str
    file_ids: List[str]               # item IDs to download from the drive
    token: str                        # SharePoint access token
    project_id: Optional[int] = None  # optional project to associate results with
async def download_and_validate(data: SharePointDownload):
    """Download files from SharePoint and validate them.

    NOTE(review): currently a stub — each file is downloaded but the bytes
    are discarded and no validation runs; the response is a fixed
    placeholder payload.
    """
    try:
        results = []  # unused until validation is wired in
        for file_id in data.file_ids:
            # Download file content
            content = sharepoint.download_file(data.token, data.drive_id, file_id)
            # Since we don't know the exact filename easily here without relisting or passing it,
            # we might need to assume or fetch metadata.
            # For simplicity, let's assume specific operations or just return success for now.
            # Ideally, we should integrate this with validator.
            # TODO: Integrate with existing validator logic
            # This requires converting bytes to UploadFile-like object or modifying validator to accept bytes
            pass
        return {"features": "Download validated (stub)"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def get_templates():
    """
    Get list of all available templates.

    Returns:
        List of templates with template_key and friendly_name
    """
    try:
        templates_data = load_templates()
        return [
            TemplateSummary(
                template_key=entry.get("template_key"),
                friendly_name=entry.get("friendly_name"),
            )
            for entry in templates_data.get("templates", [])
        ]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to load templates: {str(e)}"
        )
async def validate_document(
    file: UploadFile = File(..., description="Document file to validate (PDF, DOCX, or PPTX)"),
    template_key: str = Query(..., description="Template key to validate against"),
    check_spelling: bool = Query(False, description="Enable spell checking (ignores proper names)"),
    custom_prompt: Optional[str] = Query(None, description="Optional custom instructions to adapt validation")
):
    """
    Validate a document against a specified template.

    Args:
        file: Uploaded document file (PDF, DOCX, or PPTX)
        template_key: Template key to validate against
        check_spelling: When True, additionally spell-check the extracted text.
            Spell-check failures are reported inside the response, never raised.
        custom_prompt: Optional custom instructions forwarded to the validator.

    Returns:
        Validation report with status and element-by-element results

    Raises:
        400: Bad request (unsupported format, empty or unreadable file)
        404: Template not found
        422: Unprocessable entity (extraction or validation failure)
        500: Internal server error
    """
    # Validate template exists
    template = get_template(template_key)
    if not template:
        raise HTTPException(
            status_code=404,
            detail=f"Template not found: {template_key}. Use GET /templates to see available templates."
        )
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if file_extension not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}. Supported formats: {', '.join(supported_extensions)}"
        )
    # Read file content
    try:
        file_content = await file.read()
        if not file_content:
            raise HTTPException(
                status_code=400,
                detail="Uploaded file is empty"
            )
    except HTTPException:
        # Fix: HTTPException is an Exception subclass, so the empty-file 400
        # above was being caught below and re-wrapped as "Failed to read
        # file: ..." — propagate it untouched instead.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to read file: {str(e)}"
        )
    # Perform validation
    try:
        validation_report = await validator.validate_document(
            file_content=file_content,
            file_extension=file_extension,
            template_key=template_key,
            custom_prompt=custom_prompt
        )
        # Convert raw dicts to Pydantic models so the response is validated.
        elements_report = [
            ElementReport(**elem) for elem in validation_report.get("elements_report", [])
        ]
        # Convert link report to Pydantic models
        link_report = [
            LinkReport(**link) for link in validation_report.get("link_report", [])
        ]
        # Perform spell checking if requested. Best-effort: any failure yields
        # an empty report carrying the error text instead of a 5xx response.
        spell_check_result = None
        if check_spelling:
            # Extract text from the document for spell checking
            from app.validator import extract_document_text
            try:
                document_text = extract_document_text(file_content, file_extension)
                spell_check_data = validator.check_spelling(document_text)
                spell_errors = [
                    SpellCheckError(**error) for error in spell_check_data.get("errors", [])
                ]
                spell_check_result = SpellCheckReport(
                    total_errors=spell_check_data.get("total_errors", 0),
                    errors=spell_errors,
                    summary=spell_check_data.get("summary", "")
                )
            except Exception as e:
                logger.error(f"Spell check failed: {str(e)}")
                # Return empty spell check on error
                spell_check_result = SpellCheckReport(
                    total_errors=0,
                    errors=[],
                    summary=f"Spell check error: {str(e)}"
                )
        return ValidationReport(
            template_key=validation_report.get("template_key", template_key),
            status=validation_report.get("status", "FAIL"),
            summary=validation_report.get("summary", ""),
            elements_report=elements_report,
            spell_check=spell_check_result,
            link_report=link_report
        )
    except ValueError as e:
        raise HTTPException(
            status_code=422,
            detail=f"Validation error: {str(e)}"
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during validation: {str(e)}"
        )
async def validate_spelling_only(
    file: UploadFile = File(..., description="Document file to check spelling (PDF, DOCX, or PPTX)")
):
    """
    Check spelling in a document without template validation.

    Args:
        file: Uploaded document file (PDF, DOCX, or PPTX)

    Returns:
        Spell check report only

    Raises:
        400: Bad request (unsupported format, empty/unreadable file)
        422: Unprocessable entity (extraction failure)
        500: Internal server error
    """
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if file_extension not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}. Supported formats: {', '.join(supported_extensions)}"
        )
    # Read file content
    # NOTE(review): the empty-file HTTPException below is caught by the
    # generic handler and re-wrapped as "Failed to read file: ..." — the
    # status stays 400 but the message is misleading.
    try:
        file_content = await file.read()
        if not file_content:
            raise HTTPException(
                status_code=400,
                detail="Uploaded file is empty"
            )
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to read file: {str(e)}"
        )
    # Extract text and perform spell checking only
    try:
        from app.validator import extract_document_text
        # Extract text from document
        try:
            document_text = extract_document_text(file_content, file_extension)
        except Exception as e:
            raise HTTPException(
                status_code=422,
                detail=f"Failed to extract text from document: {str(e)}"
            )
        # Perform spell checking
        spell_check_data = validator.check_spelling(document_text)
        # Convert to Pydantic model
        spell_errors = [
            SpellCheckError(**error) for error in spell_check_data.get("errors", [])
        ]
        spell_check_result = SpellCheckReport(
            total_errors=spell_check_data.get("total_errors", 0),
            errors=spell_errors,
            summary=spell_check_data.get("summary", "")
        )
        # Return spelling-only response
        return {
            "mode": "spelling_only",
            "spell_check": spell_check_result
        }
    except HTTPException:
        # Preserve the 422 from extraction instead of converting it to 500.
        raise
    except Exception as e:
        logger.error(f"Spell check failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during spell checking: {str(e)}"
        )
async def health_check():
    """Health check endpoint: reports liveness and whether the LLM key is set."""
    api_key_present = bool(os.getenv("LLM_API_KEY"))
    return {
        "status": "healthy",
        "llm_api_key_configured": api_key_present,
    }
async def debug_extract_images(
    file: UploadFile = File(..., description="Document file to extract images from"),
    template_key: str = Query(..., description="Template key to identify visual elements")
):
    """
    Debug endpoint to extract and inspect images from a document.

    Returns detailed information about extracted images (size, dimensions,
    colour mode, page number, role hints) plus the template's visual
    elements, without performing validation — useful for diagnosing why a
    logo/signature/QR element was or wasn't matched.
    """
    # Validate template exists
    template = get_template(template_key)
    if not template:
        raise HTTPException(
            status_code=404,
            detail=f"Template not found: {template_key}"
        )
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if file_extension not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}"
        )
    # Read file content
    # NOTE(review): the empty-file HTTPException is caught by the generic
    # handler and re-wrapped as "Failed to read file: ..." (status stays 400).
    try:
        file_content = await file.read()
        if not file_content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to read file: {str(e)}")
    # Extract images into a throwaway directory that is removed on exit.
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            extracted_text, extracted_images = extract_images_from_document(
                file_content,
                file_extension,
                template.get("elements", []),
                temp_path
            )
            # Get image details. Two sources: in-memory bytes (preferred,
            # newer extraction path) or the file written under temp_path.
            image_details = []
            for img in extracted_images:
                img_path = Path(img.file_path)
                # Check if image data is in memory (new approach)
                if hasattr(img, '_image_bytes') and img._image_bytes:
                    # Use image data from memory
                    file_size = len(img._image_bytes)
                    dimensions = "unknown"
                    mode = "unknown"
                    try:
                        from PIL import Image as PILImage
                        from io import BytesIO
                        img_io = BytesIO(img._image_bytes)
                        pil_img = PILImage.open(img_io)
                        pil_img.load()
                        dimensions = f"{pil_img.size[0]}x{pil_img.size[1]}"
                        mode = pil_img.mode
                        pil_img.close()
                        img_io.close()
                    except Exception as e:
                        logger.warning(f"Could not read image from memory: {str(e)}")
                    file_exists = True  # Data exists in memory
                elif img_path.exists():
                    # Fallback: read from file
                    try:
                        file_size = img_path.stat().st_size
                    except Exception:
                        file_size = 0
                    dimensions = "unknown"
                    mode = "unknown"
                    # Try to get image dimensions, but handle file locking gracefully
                    try:
                        from PIL import Image as PILImage
                        pil_img = None
                        try:
                            pil_img = PILImage.open(img_path)
                            pil_img.load()  # Load into memory
                            dimensions = f"{pil_img.size[0]}x{pil_img.size[1]}"
                            mode = pil_img.mode
                            pil_img.close()  # Close immediately
                            pil_img = None
                        except Exception as e:
                            logger.warning(f"Could not read image {img_path}: {str(e)}")
                            if pil_img:
                                try:
                                    pil_img.close()
                                except:
                                    pass
                    except Exception:
                        pass  # PIL not available or other error
                    file_exists = True
                else:
                    # Neither memory bytes nor an on-disk file — report it missing.
                    file_size = 0
                    dimensions = "file not found"
                    mode = "unknown"
                    file_exists = False
                image_details.append({
                    "id": img.id,
                    "file_path": img.file_path,
                    "file_exists": file_exists,
                    "file_size_bytes": file_size,
                    "dimensions": dimensions,
                    "image_mode": mode,
                    "page_number": img.page_number,
                    "role_hint": img.role_hint,
                    "element_type": img.element_type,
                    "stored_in_memory": hasattr(img, '_image_bytes') and img._image_bytes is not None
                })
            # Get visual elements from template (the kinds the extractor targets).
            visual_elements = [
                {
                    "id": e.get("id"),
                    "label": e.get("label"),
                    "type": e.get("type"),
                    "required": e.get("required", False),
                    "logo_role": e.get("logo_role") if e.get("type") == "logo" else None
                }
                for e in template.get("elements", [])
                if e.get("type") in ["logo", "signature_block", "qr_code_or_image"]
            ]
            return {
                "file_name": filename,
                "file_extension": file_extension,
                "file_size_bytes": len(file_content),
                "text_extracted": len(extracted_text) > 0,
                "text_length": len(extracted_text),
                "images_found": len(extracted_images),
                "images": image_details,
                "template_visual_elements": visual_elements,
                "template_requires_visual_elements": len(visual_elements) > 0
            }
    except Exception as e:
        logger.error(f"Debug image extraction failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Image extraction failed: {str(e)}"
        )
async def compare_documents(
    file1: UploadFile = File(..., description="First document (original version)"),
    file2: UploadFile = File(..., description="Second document (modified version)")
):
    """
    Compare two document versions using LLM to identify changes.

    Args:
        file1: Original document
        file2: Modified document

    Returns:
        Comparison report with summary and detailed changes

    Raises:
        400: Unsupported format, empty or unreadable files
        500: Comparison failure
    """
    # Validate file extensions
    filename1 = file1.filename or ""
    filename2 = file2.filename or ""
    ext1 = Path(filename1).suffix.lower()
    ext2 = Path(filename2).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if ext1 not in supported_extensions or ext2 not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format. Supported: {', '.join(supported_extensions)}"
        )
    # Read file contents
    # NOTE(review): the empty-file HTTPException is caught by the generic
    # handler and re-wrapped as "Failed to read files: ..." (status stays 400).
    try:
        content1 = await file1.read()
        content2 = await file2.read()
        if not content1 or not content2:
            raise HTTPException(status_code=400, detail="One or both files are empty")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to read files: {str(e)}")
    # Perform comparison using validator
    try:
        comparison_result = await validator.compare_documents(
            file1_content=content1,
            file1_extension=ext1,
            file1_name=filename1,
            file2_content=content2,
            file2_extension=ext2,
            file2_name=filename2
        )
        # Convert to Pydantic models
        changes = [
            ComparisonChange(**change) for change in comparison_result.get("changes", [])
        ]
        return ComparisonReport(
            summary=comparison_result.get("summary", "No summary available"),
            changes=changes,
            file1_name=filename1,
            file2_name=filename2
        )
    except Exception as e:
        logger.error(f"Comparison failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Comparison failed: {str(e)}"
        )
async def get_excel_columns(file: UploadFile = File(...)):
    """
    Extract column headers from an Excel file.

    Args:
        file: Excel file (.xlsx)

    Returns:
        Dict with "columns" (non-empty first-row header strings) and
        "row_count" (data rows excluding the header row, never negative).

    Raises:
        400: When the upload cannot be parsed as an Excel workbook.
    """
    try:
        import openpyxl
        from io import BytesIO
        content = await file.read()
        wb = openpyxl.load_workbook(BytesIO(content))
        try:
            ws = wb.active
            # First row is treated as the header row; skip empty cells.
            headers = [str(cell.value) for cell in ws[1] if cell.value]
            # max_row is at least 1 even for an empty sheet; clamp at 0 so we
            # never report a misleading count for header-only/empty sheets.
            row_count = max(ws.max_row - 1, 0)
        finally:
            # Fix: the original leaked the workbook handle on every request.
            wb.close()
        return {
            "columns": headers,
            "row_count": row_count
        }
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to parse Excel file: {str(e)}"
        )
async def bulk_validate_certificates(
    excel_file: UploadFile = File(..., description="Excel file with names"),
    name_column: str = Form(..., description="Column name containing names"),
    certificate_files: List[UploadFile] = File(..., description="Certificate files (max 150)")
):
    """
    Validate multiple certificates against an Excel list of names.

    Args:
        excel_file: Excel file with attendee names
        name_column: Column containing the names
        certificate_files: List of certificate files to validate (capped at 150)

    Returns:
        Bulk validation results with matches, missing, and extras

    Raises:
        400: More than 150 certificate files uploaded
        500: Any failure while reading files or matching names
    """
    # Hard cap bounds per-request memory (all files are buffered in RAM below).
    if len(certificate_files) > 150:
        raise HTTPException(
            status_code=400,
            detail="Maximum 150 certificates allowed"
        )
    try:
        # Read Excel file
        excel_content = await excel_file.read()
        # Read all certificate files into memory as (filename, bytes, extension).
        cert_data = []
        for cert_file in certificate_files:
            content = await cert_file.read()
            filename = cert_file.filename or "unknown"
            ext = Path(filename).suffix.lower()
            cert_data.append((filename, content, ext))
        # Call validator
        result = await validator.bulk_validate_certificates(
            excel_content=excel_content,
            name_column=name_column,
            certificate_data=cert_data
        )
        # Convert to Pydantic models
        details = [
            BulkValidationDetail(**detail) for detail in result.get("details", [])
        ]
        return BulkValidationResult(
            total_names=result.get("total_names", 0),
            total_certificates=result.get("total_certificates", 0),
            exact_matches=result.get("exact_matches", 0),
            fuzzy_matches=result.get("fuzzy_matches", 0),
            missing=result.get("missing", 0),
            extras=result.get("extras", 0),
            details=details
        )
    except Exception as e:
        logger.error(f"Bulk validation failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Bulk validation failed: {str(e)}"
        )
# ==================== PROJECTS ENDPOINTS ====================
async def list_projects():
    """List all projects."""
    try:
        return db.list_projects()
    except Exception as e:
        logger.error(f"Failed to list projects: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
async def create_project(project: ProjectCreate):
    """Create a new project and return the stored record."""
    try:
        new_id = db.create_project(project.name, project.description)
        return db.get_project(new_id)
    except ValueError as e:
        # Invalid input (e.g. duplicate/empty name) maps to a client error.
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to create project: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
async def get_project(project_id: int):
    """Get a specific project."""
    try:
        record = db.get_project(project_id)
    except Exception as e:
        logger.error(f"Failed to get project: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    if not record:
        raise HTTPException(status_code=404, detail="Project not found")
    return record
async def delete_project(project_id: int):
    """Delete a project."""
    try:
        removed = db.delete_project(project_id)
    except Exception as e:
        logger.error(f"Failed to delete project: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    if not removed:
        raise HTTPException(status_code=404, detail="Project not found")
    return {"message": "Project deleted successfully"}
async def get_project_validations(project_id: int):
    """Get all validations for a project."""
    try:
        return db.get_project_validations(project_id)
    except Exception as e:
        logger.error(f"Failed to get project validations: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
async def get_recent_validations(limit: int = Query(50, ge=1, le=200)):
    """Get recent validations across all projects (newest first, capped by limit)."""
    try:
        return db.get_recent_validations(limit)
    except Exception as e:
        logger.error(f"Failed to get recent validations: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# Dev convenience: `python app/main.py` serves on 0.0.0.0:8000 with autoreload.
# Production uses the uvicorn CLI per the module docstring.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)