# saifisvibin's picture
# Professional UI redesign + Itinerary Template + SharePoint integration
# 5c56bc9
"""
FastAPI application for Medical Document Validator.
To run this application:
1. Set LLM_API_KEY in .env file (get your API key from Anthropic)
2. Set APP_PASSWORD in .env file for access control
3. Install dependencies: pip install -r requirements.txt
4. Run the server: uvicorn app.main:app --reload
"""
from fastapi import FastAPI, File, UploadFile, HTTPException, Query, Form, Request, Response, Cookie
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import os
import tempfile
import logging
import hashlib
import secrets
from pathlib import Path
# Configure logging
# Root logger: INFO and above, timestamped, written to the console only.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler()],  # Console output
)
# Module-level logger used by the endpoints in this file.
logger = logging.getLogger(__name__)
from app.validator import Validator, load_templates, get_template, extract_images_from_document
from app.database import db
# Password protection setup
# APP_PASSWORD is read once, at import time. An empty value disables password
# protection: the auth helpers and routes in this file all branch on
# `if APP_PASSWORD`.
APP_PASSWORD = os.environ.get("APP_PASSWORD", "")
# Tokens issued by /verify-password. They live in process memory only, and
# nothing in the visible code ever removes or expires them.
VALID_SESSIONS = set() # Store valid session tokens
def generate_session_token():
    """Return a new random, URL-safe session token (32 bytes of entropy)."""
    token = secrets.token_urlsafe(32)
    return token
def verify_session(session_token: str) -> bool:
    """Report whether ``session_token`` grants access.

    When no APP_PASSWORD is configured, protection is off and every token
    (including a missing one) is accepted. Otherwise the token must be one
    that was previously added to VALID_SESSIONS.
    """
    if not APP_PASSWORD:
        return True
    return session_token in VALID_SESSIONS
# Create the FastAPI application. Nothing here loads environment variables;
# title/description/version are the metadata passed to FastAPI for the API docs.
app = FastAPI(
    title="Medical Document Validator API",
    description="API for validating medical documents against predefined templates using LLM",
    version="1.0.0"
)
# Serve the bundled front-end assets under /static, but only when the
# "static" directory next to this module is actually present.
static_path = Path(__file__).parent.joinpath("static")
if static_path.exists():
    app.mount(
        "/static",
        StaticFiles(directory=str(static_path)),
        name="static",
    )
# Initialize validator
# A single Validator instance is created at import time and shared by every
# endpoint in this module (validate, spell check, compare, bulk validate).
validator = Validator()
# Pydantic Models
class TemplateSummary(BaseModel):
    """Template summary for listing available templates.

    Returned by ``GET /templates``, one entry per template found under the
    "templates" key of ``load_templates()``.
    """
    template_key: str  # Copied from the template's "template_key" entry
    friendly_name: str  # Copied from the template's "friendly_name" entry
class ElementReport(BaseModel):
    """Individual element validation report.

    Built in ``POST /validate`` by unpacking each dict of the validator's
    "elements_report" list, so those dicts must carry exactly these keys.
    """
    id: str  # presumably the template element's id — confirm against validator output
    label: str
    required: bool
    is_present: bool
    reason: str
class SpellCheckError(BaseModel):
    """Individual spelling error.

    Built by unpacking each dict in the "errors" list returned by
    ``validator.check_spelling``.
    """
    word: str
    context: str
    suggestions: List[str]
    error_type: str # "spelling", "grammar", "formatting", "typo"
    confidence: float  # NOTE(review): scale (0-1 vs 0-100) is not visible here — confirm
class SpellCheckReport(BaseModel):
    """Spell check report.

    Populated from the "total_errors", "errors" and "summary" keys of the
    dict returned by ``validator.check_spelling``. When spell checking fails
    inside ``POST /validate``, an empty report whose summary carries the
    error text is returned instead.
    """
    total_errors: int
    errors: List[SpellCheckError]
    summary: str
class LinkReport(BaseModel):
    """Link validation report.

    Built in ``POST /validate`` by unpacking each dict of the validator's
    "link_report" list, so those dicts must carry exactly these keys.
    """
    url: str
    status: str
    status_code: int  # presumably the HTTP status of the checked link — confirm
    message: str
    page: str
class ValidationReport(BaseModel):
    """Complete validation report response.

    Response model of ``POST /validate``.
    """
    template_key: str
    status: str # "PASS" or "FAIL"
    summary: str
    elements_report: List[ElementReport]
    # Set only when the request asked for spell checking (check_spelling=true).
    spell_check: Optional[SpellCheckReport] = None # Optional spell check results
    link_report: Optional[List[LinkReport]] = None # Optional link validation results
class ComparisonChange(BaseModel):
    """Individual change detected in comparison.

    Built in ``POST /compare`` by unpacking each dict of the validator's
    "changes" list.
    """
    type: str # "addition", "deletion", "modification"
    section: Optional[str] = None # Section/area where change occurred
    description: str # Description of the change
class ComparisonReport(BaseModel):
    """Document comparison report.

    Response model of ``POST /compare``.
    """
    summary: str # Natural language summary of changes
    changes: List[ComparisonChange] # Detailed list of changes
    file1_name: str  # Upload filename of the first (original) document
    file2_name: str  # Upload filename of the second (modified) document
class BulkValidationDetail(BaseModel):
    """Individual validation result for bulk certificate validation.

    Built in ``POST /bulk-validate`` by unpacking each dict of the
    validator's "details" list.
    """
    name: str
    status: str # "exact_match", "fuzzy_match", "missing", "extra"
    certificate_file: Optional[str] = None
    similarity: Optional[int] = None # Percentage for fuzzy matches
class BulkValidationResult(BaseModel):
    """Bulk certificate validation result.

    Response model of ``POST /bulk-validate``. Each counter is read from the
    same-named key of the validator result and defaults to 0 when that key
    is absent.
    """
    total_names: int
    total_certificates: int
    exact_matches: int
    fuzzy_matches: int
    missing: int
    extras: int
    details: List[BulkValidationDetail]
class Project(BaseModel):
    """Project model.

    Response model of the ``/projects`` list, create and get endpoints. The
    values come from the ``db`` layer.
    """
    id: int
    name: str
    description: Optional[str] = ""
    created_at: str  # NOTE(review): timestamp format is decided by app.database — confirm
    validation_count: int = 0
class ProjectCreate(BaseModel):
    """Project creation request.

    Request body of ``POST /projects``. Both fields are passed straight to
    ``db.create_project(name, description)``.
    """
    name: str
    description: str = ""
class ValidationHistory(BaseModel):
    """Validation history item.

    NOTE(review): no endpoint in the visible part of this file uses this
    model as a response_model; the validation-history endpoints return the
    ``db`` results unvalidated. Confirm whether that is intended.
    """
    id: int
    project_id: Optional[int]
    project_name: Optional[str]
    validation_type: str
    template_key: Optional[str]
    filename: str
    status: str
    created_at: str
# API Endpoints
# Login page HTML with JavaScript-based auth
# Flow implemented by the inline script:
#   1. POST {"password": ...} as JSON to /verify-password.
#   2. If the response has `valid` set, store `token` in localStorage under
#      "auth_token" and navigate to /app?token=<token>.
#   3. Otherwise reveal the error banner; on a fetch/JSON failure the banner
#      text is replaced with a connection-error message.
# NOTE(review): the session token is placed in the URL query string, so it can
# end up in browser history and access logs — confirm this is acceptable.
LOGIN_PAGE = """
<!DOCTYPE html>
<html>
<head>
<title>Login - Medical Document Validator</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body { font-family: Arial, sans-serif; display: flex; justify-content: center; align-items: center; min-height: 100vh; margin: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); }
.login-box { background: white; padding: 40px; border-radius: 12px; box-shadow: 0 10px 40px rgba(0,0,0,0.2); max-width: 400px; width: 90%; }
h1 { margin: 0 0 30px 0; color: #333; text-align: center; font-size: 24px; }
.form-group { margin-bottom: 20px; }
label { display: block; margin-bottom: 8px; font-weight: 600; color: #555; }
input[type="password"] { width: 100%; padding: 12px; border: 1px solid #ddd; border-radius: 6px; font-size: 16px; box-sizing: border-box; }
button { width: 100%; padding: 14px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border: none; border-radius: 6px; font-size: 16px; font-weight: 600; cursor: pointer; transition: transform 0.2s; }
button:hover { transform: translateY(-2px); }
.error { color: #dc3545; text-align: center; margin-bottom: 20px; padding: 10px; background: #f8d7da; border-radius: 6px; display: none; }
.subtitle { color: #666; text-align: center; margin-bottom: 30px; }
</style>
</head>
<body>
<div class="login-box">
<h1>🔐 Medical Document Validator</h1>
<p class="subtitle">Enter password to access the application</p>
<div class="error" id="error">Invalid password. Please try again.</div>
<div class="form-group">
<label for="password">Password:</label>
<input type="password" id="password" placeholder="Enter password" autofocus>
</div>
<button id="loginBtn">Login</button>
</div>
<script>
async function attemptLogin() {
const password = document.getElementById('password').value;
const errorDiv = document.getElementById('error');
try {
const response = await fetch('/verify-password', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({password: password})
});
const data = await response.json();
if (data.valid) {
localStorage.setItem('auth_token', data.token);
window.location.href = '/app?token=' + data.token;
} else {
errorDiv.style.display = 'block';
}
} catch (e) {
errorDiv.textContent = 'Connection error. Please try again.';
errorDiv.style.display = 'block';
}
}
document.getElementById('loginBtn').addEventListener('click', attemptLogin);
document.getElementById('password').addEventListener('keypress', function(e) {
if (e.key === 'Enter') attemptLogin();
});
</script>
</body>
</html>
"""
@app.get("/login", response_class=HTMLResponse, tags=["Auth"])
async def login_page():
    """Serve the login form, or skip straight to the app when no password is set."""
    if APP_PASSWORD:
        return HTMLResponse(LOGIN_PAGE)
    # Protection disabled: there is nothing to log in to.
    return RedirectResponse(url="/app", status_code=302)
@app.post("/verify-password", tags=["Auth"])
async def verify_password(request: Request):
    """Verify password and return token.

    Expects a JSON object body with a "password" key.

    Returns:
        ``{"valid": True, "token": <token>}`` when the password matches
        APP_PASSWORD; the token is also recorded in VALID_SESSIONS.
        ``{"valid": False}`` for a wrong password, a non-string password,
        or a body that cannot be parsed (the failure is logged).
    """
    try:
        data = await request.json()
        password = data.get("password", "")
        # Only strings can match; numbers, null or nested JSON never do.
        if not isinstance(password, str):
            return {"valid": False}
        # Constant-time comparison so response timing does not leak how much
        # of the password matched. Compare bytes because compare_digest
        # rejects non-ASCII str arguments.
        if secrets.compare_digest(
            password.encode("utf-8"), APP_PASSWORD.encode("utf-8")
        ):
            token = generate_session_token()
            VALID_SESSIONS.add(token)
            return {"valid": True, "token": token}
        return {"valid": False}
    except Exception as e:
        # Malformed JSON or a non-object body lands here; treat it as a
        # failed login rather than a server error.
        logger.error("Password verification error: %s", e)
        return {"valid": False}
@app.get("/app", response_class=HTMLResponse, tags=["Root"])
async def app_page(token: str = None):
    """Serve the main HTML interface (password protected if APP_PASSWORD is set).

    With protection enabled, a missing or unknown ``token`` query parameter
    redirects to /login. Otherwise static/index.html is returned, or a short
    placeholder page when that file does not exist.
    """
    # Gate access only when a password has been configured.
    authorised = bool(token) and verify_session(token) if APP_PASSWORD else True
    if not authorised:
        return RedirectResponse(url="/login", status_code=302)

    index_file = Path(__file__).parent / "static" / "index.html"
    if not index_file.exists():
        return HTMLResponse("""
<h1>Medical Document Validator</h1>
<p>Static files not found. Please check installation.</p>
""")
    return FileResponse(index_file)
@app.get("/", response_class=HTMLResponse, tags=["Root"])
async def root():
    """Send visitors to /login when a password is configured, else to /app."""
    destination = "/login" if APP_PASSWORD else "/app"
    return RedirectResponse(url=destination, status_code=302)
# ==================== SHAREPOINT ENDPOINTS ====================
from app.sharepoint import SharePointConnector
# Single connector instance shared by all SharePoint endpoints below.
sharepoint = SharePointConnector()
@app.get("/auth/sharepoint/login", tags=["SharePoint"])
async def sharepoint_login(request: Request):
    """Start SharePoint OAuth flow.

    Returns a JSON object whose "auth_url" is the URL the client should open
    to authenticate.
    """
    base_url = str(request.base_url)
    redirect_uri = str(request.url_for('sharepoint_callback'))
    # On huggingface.co hosts the callback URL is advertised as https.
    if 'huggingface.co' in base_url:
        redirect_uri = redirect_uri.replace('http:', 'https:')
    # Fix for localhost/dev: always use the fixed local callback URL.
    if "localhost" in base_url or "127.0.0.1" in base_url:
        redirect_uri = "http://localhost:8001/auth/sharepoint/callback"
    return {"auth_url": sharepoint.get_auth_url(redirect_uri)}
@app.get("/auth/sharepoint/callback", tags=["SharePoint"])
async def sharepoint_callback(code: str, request: Request):
    """Handle OAuth callback.

    Exchanges the authorization ``code`` for an access token and returns a
    small HTML page that hands the token to the front end: via
    ``postMessage`` when opened as a popup, otherwise via localStorage
    followed by a redirect to /app.

    Returns:
        200 HTML page on success; 400 HTML page describing the failure when
        the exchange raises or yields no access token.
    """
    # Function-scope imports, matching how this module imports other
    # endpoint-only helpers.
    import html
    import json

    # Reconstruct the same redirect_uri that was used to start the flow.
    base_url = str(request.base_url)
    redirect_uri = str(request.url_for('sharepoint_callback'))
    if 'huggingface.co' in base_url:
        redirect_uri = redirect_uri.replace('http:', 'https:')
    if "localhost" in base_url or "127.0.0.1" in base_url:
        redirect_uri = "http://localhost:8001/auth/sharepoint/callback"
    try:
        # Get token
        result = sharepoint.acquire_token_by_code(code, redirect_uri)
        access_token = result.get("access_token")
        if not access_token:
            # Without this check the page would report success and store the
            # literal string 'None' as the token.
            # "error_description"/"error" are the standard OAuth2 error
            # fields — presumably what the connector returns; the generic
            # message covers any other shape.
            raise ValueError(
                result.get("error_description")
                or result.get("error")
                or "No access token was returned"
            )
        # Serialise the token as a JS string literal, and neutralise the
        # characters that could close the surrounding <script> element.
        token_js = (
            json.dumps(access_token)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )
        # NOTE(review): postMessage still targets '*', so any opener origin
        # receives the token. Restrict to the app's origin once it is known.
        html_content = f"""
<html>
<body>
<h1>Authentication Successful!</h1>
<p>You can close this window now.</p>
<script>
// Send token back to parent window if opened as popup
if (window.opener) {{
window.opener.postMessage({{ type: 'SHAREPOINT_AUTH', token: {token_js} }}, '*');
window.close();
}} else {{
// Fallback if not a popup
localStorage.setItem('sharepoint_token', {token_js});
window.location.href = '/app';
}}
</script>
</body>
</html>
"""
        return HTMLResponse(content=html_content)
    except Exception as e:
        logger.error("SharePoint authentication failed: %s", e, exc_info=True)
        # Escape the message: it may echo provider- or request-supplied text.
        return HTMLResponse(
            content=f"<h1>Authentication Failed</h1><p>{html.escape(str(e))}</p>",
            status_code=400,
        )
@app.get("/sharepoint/drives", tags=["SharePoint"])
async def list_drives(token: str = Query(..., description="SharePoint Access Token")):
    """List available drives (OneDrive + SharePoint sites).

    Any connector failure is reported as a 500 with the exception text.
    """
    try:
        return sharepoint.get_drives(token)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/sharepoint/items", tags=["SharePoint"])
async def list_items(
    drive_id: str,
    folder_id: Optional[str] = None,
    token: str = Query(..., description="SharePoint Access Token")
):
    """List items in a specific drive/folder.

    ``folder_id`` is optional and is passed to the connector as-is (None
    when omitted). Any connector failure is reported as a 500 with the
    exception text.
    """
    try:
        return sharepoint.list_items(token, drive_id, folder_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
class SharePointDownload(BaseModel):
    # Request body of POST /sharepoint/download-and-validate.
    drive_id: str  # Drive that contains the files
    file_ids: List[str]  # Files to download, one connector call per id
    token: str  # SharePoint access token forwarded to the connector
    project_id: Optional[int] = None  # Accepted but not read by the visible endpoint code
@app.post("/sharepoint/download-and-validate", tags=["SharePoint"])
async def download_and_validate(data: SharePointDownload):
    """Download files from SharePoint and validate them.

    Currently a stub: every requested file is downloaded, but the bytes are
    discarded and no validation runs. A download failure becomes a 500.
    """
    try:
        for file_id in data.file_ids:
            # Download file content (result intentionally unused for now).
            # The filename is not known here without relisting or passing it
            # in, so metadata would have to be fetched separately.
            # TODO: Integrate with existing validator logic
            # This requires converting bytes to UploadFile-like object or modifying validator to accept bytes
            sharepoint.download_file(data.token, data.drive_id, file_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"features": "Download validated (stub)"}
@app.get("/templates", response_model=List[TemplateSummary], tags=["Templates"])
async def get_templates():
    """
    Get list of all available templates.

    Returns:
        List of templates with template_key and friendly_name

    Raises:
        500: Templates could not be loaded or converted
    """
    try:
        available = load_templates().get("templates", [])
        return [
            TemplateSummary(
                template_key=entry.get("template_key"),
                friendly_name=entry.get("friendly_name"),
            )
            for entry in available
        ]
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to load templates: {str(exc)}"
        )
@app.post("/validate", response_model=ValidationReport, tags=["Validation"])
async def validate_document(
    file: UploadFile = File(..., description="Document file to validate (PDF, DOCX, or PPTX)"),
    template_key: str = Query(..., description="Template key to validate against"),
    check_spelling: bool = Query(False, description="Enable spell checking (ignores proper names)"),
    custom_prompt: Optional[str] = Query(None, description="Optional custom instructions to adapt validation")
):
    """
    Validate a document against a specified template.

    Args:
        file: Uploaded document file (PDF, DOCX, or PPTX)
        template_key: Template key to validate against
        check_spelling: When true, also run a spell check on the extracted text
        custom_prompt: Optional extra instructions forwarded to the validator

    Returns:
        Validation report with status and element-by-element results

    Raises:
        400: Bad request (unsupported format, unreadable or empty file)
        404: Template not found
        422: Unprocessable entity (validator raised ValueError)
        500: Internal server error
    """
    # Validate template exists
    template = get_template(template_key)
    if not template:
        raise HTTPException(
            status_code=404,
            detail=f"Template not found: {template_key}. Use GET /templates to see available templates."
        )
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if file_extension not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}. Supported formats: {', '.join(supported_extensions)}"
        )
    # Read file content
    try:
        file_content = await file.read()
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to read file: {str(e)}"
        )
    # The emptiness check sits outside the try block: HTTPException is an
    # Exception, so raising it inside would be caught and re-wrapped as a
    # misleading "Failed to read file" error.
    if not file_content:
        raise HTTPException(
            status_code=400,
            detail="Uploaded file is empty"
        )
    # Perform validation
    try:
        validation_report = await validator.validate_document(
            file_content=file_content,
            file_extension=file_extension,
            template_key=template_key,
            custom_prompt=custom_prompt
        )
        # Convert to Pydantic models for response validation. `or []` also
        # covers a key that is present but set to None.
        elements_report = [
            ElementReport(**elem)
            for elem in validation_report.get("elements_report") or []
        ]
        link_report = [
            LinkReport(**link)
            for link in validation_report.get("link_report") or []
        ]
        # Perform spell checking if requested
        spell_check_result = None
        if check_spelling:
            # Extract text from the document for spell checking
            from app.validator import extract_document_text
            try:
                document_text = extract_document_text(file_content, file_extension)
                spell_check_data = validator.check_spelling(document_text)
                spell_errors = [
                    SpellCheckError(**error) for error in spell_check_data.get("errors", [])
                ]
                spell_check_result = SpellCheckReport(
                    total_errors=spell_check_data.get("total_errors", 0),
                    errors=spell_errors,
                    summary=spell_check_data.get("summary", "")
                )
            except Exception as e:
                # Spell checking is best-effort: report the failure in the
                # summary instead of failing the whole validation.
                logger.error(f"Spell check failed: {str(e)}")
                spell_check_result = SpellCheckReport(
                    total_errors=0,
                    errors=[],
                    summary=f"Spell check error: {str(e)}"
                )
        return ValidationReport(
            template_key=validation_report.get("template_key", template_key),
            status=validation_report.get("status", "FAIL"),
            summary=validation_report.get("summary", ""),
            elements_report=elements_report,
            spell_check=spell_check_result,
            link_report=link_report
        )
    except ValueError as e:
        raise HTTPException(
            status_code=422,
            detail=f"Validation error: {str(e)}"
        )
    except Exception as e:
        # Log with traceback, consistent with the other endpoints' 500 paths.
        logger.error(f"Validation failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during validation: {str(e)}"
        )
@app.post("/validate/spelling-only", tags=["Validation"])
async def validate_spelling_only(
    file: UploadFile = File(..., description="Document file to check spelling (PDF, DOCX, or PPTX)")
):
    """
    Check spelling in a document without template validation.

    Args:
        file: Uploaded document file (PDF, DOCX, or PPTX)

    Returns:
        ``{"mode": "spelling_only", "spell_check": <SpellCheckReport>}``

    Raises:
        400: Bad request (unsupported format, unreadable or empty file)
        422: Unprocessable entity (extraction failure)
        500: Internal server error
    """
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if file_extension not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}. Supported formats: {', '.join(supported_extensions)}"
        )
    # Read file content
    try:
        file_content = await file.read()
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to read file: {str(e)}"
        )
    # Checked outside the try block so the HTTPException is not caught and
    # re-wrapped as a "Failed to read file" error.
    if not file_content:
        raise HTTPException(
            status_code=400,
            detail="Uploaded file is empty"
        )
    # Extract text and perform spell checking only
    try:
        from app.validator import extract_document_text
        # Extract text from document
        try:
            document_text = extract_document_text(file_content, file_extension)
        except Exception as e:
            raise HTTPException(
                status_code=422,
                detail=f"Failed to extract text from document: {str(e)}"
            )
        # Perform spell checking
        spell_check_data = validator.check_spelling(document_text)
        # Convert to Pydantic model
        spell_errors = [
            SpellCheckError(**error) for error in spell_check_data.get("errors", [])
        ]
        spell_check_result = SpellCheckReport(
            total_errors=spell_check_data.get("total_errors", 0),
            errors=spell_errors,
            summary=spell_check_data.get("summary", "")
        )
        # Return spelling-only response
        return {
            "mode": "spelling_only",
            "spell_check": spell_check_result
        }
    except HTTPException:
        # Let the 422 raised above through unchanged.
        raise
    except Exception as e:
        logger.error(f"Spell check failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during spell checking: {str(e)}"
        )
@app.get("/health", tags=["Health"])
async def health_check():
    """Health check endpoint.

    Always reports "healthy", plus whether LLM_API_KEY is set to a non-empty
    value in the environment.
    """
    api_key_present = bool(os.getenv("LLM_API_KEY"))
    return {
        "status": "healthy",
        "llm_api_key_configured": api_key_present,
    }
def _probe_image(source, description):
    """Return ``(dimensions, mode)`` for an image, or unknowns when it cannot be read.

    ``source`` is anything ``PIL.Image.open`` accepts (a path or a binary
    file object). ``description`` is only used in the warning message.
    """
    try:
        from PIL import Image as PILImage
        # The context manager closes the image even if load() fails, so no
        # handle is left open on a file in the temporary directory.
        with PILImage.open(source) as pil_img:
            pil_img.load()
            return f"{pil_img.size[0]}x{pil_img.size[1]}", pil_img.mode
    except Exception as e:
        # Covers a missing PIL as well as corrupt or unsupported image data.
        logger.warning(f"Could not read image {description}: {str(e)}")
        return "unknown", "unknown"


def _describe_extracted_image(img):
    """Build the debug record for one extracted image.

    In-memory bytes (``img._image_bytes``) are preferred; otherwise the file
    at ``img.file_path`` is inspected, if it exists.
    """
    image_bytes = getattr(img, '_image_bytes', None)
    if image_bytes:
        from io import BytesIO
        file_size = len(image_bytes)
        dimensions, mode = _probe_image(BytesIO(image_bytes), "from memory")
        file_exists = True  # Data exists in memory
    else:
        # Resolve the path only when it is needed. An empty or missing path
        # counts as "not found" rather than resolving to the current directory.
        img_path = Path(img.file_path) if img.file_path else None
        if img_path is not None and img_path.exists():
            try:
                file_size = img_path.stat().st_size
            except OSError:
                file_size = 0
            dimensions, mode = _probe_image(img_path, img_path)
            file_exists = True
        else:
            file_size = 0
            dimensions = "file not found"
            mode = "unknown"
            file_exists = False
    return {
        "id": img.id,
        "file_path": img.file_path,
        "file_exists": file_exists,
        "file_size_bytes": file_size,
        "dimensions": dimensions,
        "image_mode": mode,
        "page_number": img.page_number,
        "role_hint": img.role_hint,
        "element_type": img.element_type,
        "stored_in_memory": image_bytes is not None
    }


@app.post("/debug/extract-images", tags=["Debug"])
async def debug_extract_images(
    file: UploadFile = File(..., description="Document file to extract images from"),
    template_key: str = Query(..., description="Template key to identify visual elements")
):
    """
    Debug endpoint to extract and inspect images from a document.
    Returns detailed information about extracted images without performing validation.

    Raises:
        400: Unsupported format, unreadable or empty file
        404: Template not found
        500: Image extraction failed
    """
    # Validate template exists
    template = get_template(template_key)
    if not template:
        raise HTTPException(
            status_code=404,
            detail=f"Template not found: {template_key}"
        )
    # Validate file extension
    filename = file.filename or ""
    file_extension = Path(filename).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if file_extension not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_extension}"
        )
    # Read file content
    try:
        file_content = await file.read()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to read file: {str(e)}")
    # Checked outside the try block so the HTTPException is not caught and
    # re-wrapped as a "Failed to read file" error.
    if not file_content:
        raise HTTPException(status_code=400, detail="Uploaded file is empty")
    # Extract images
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            extracted_text, extracted_images = extract_images_from_document(
                file_content,
                file_extension,
                template.get("elements", []),
                Path(temp_dir)
            )
            # Inspect the images while the temporary directory still exists.
            image_details = [
                _describe_extracted_image(img) for img in extracted_images
            ]
        # Get visual elements from template
        visual_elements = [
            {
                "id": e.get("id"),
                "label": e.get("label"),
                "type": e.get("type"),
                "required": e.get("required", False),
                "logo_role": e.get("logo_role") if e.get("type") == "logo" else None
            }
            for e in template.get("elements", [])
            if e.get("type") in ["logo", "signature_block", "qr_code_or_image"]
        ]
        return {
            "file_name": filename,
            "file_extension": file_extension,
            "file_size_bytes": len(file_content),
            "text_extracted": len(extracted_text) > 0,
            "text_length": len(extracted_text),
            "images_found": len(extracted_images),
            "images": image_details,
            "template_visual_elements": visual_elements,
            "template_requires_visual_elements": len(visual_elements) > 0
        }
    except Exception as e:
        logger.error(f"Debug image extraction failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Image extraction failed: {str(e)}"
        )
@app.post("/compare", response_model=ComparisonReport, tags=["Comparison"])
async def compare_documents(
    file1: UploadFile = File(..., description="First document (original version)"),
    file2: UploadFile = File(..., description="Second document (modified version)")
):
    """
    Compare two document versions using LLM to identify changes.

    Args:
        file1: Original document
        file2: Modified document

    Returns:
        Comparison report with summary and detailed changes

    Raises:
        400: Unsupported format, unreadable or empty file(s)
        500: Comparison failed
    """
    # Validate file extensions
    filename1 = file1.filename or ""
    filename2 = file2.filename or ""
    ext1 = Path(filename1).suffix.lower()
    ext2 = Path(filename2).suffix.lower()
    supported_extensions = [".pdf", ".docx", ".pptx"]
    if ext1 not in supported_extensions or ext2 not in supported_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format. Supported: {', '.join(supported_extensions)}"
        )
    # Read file contents
    try:
        content1 = await file1.read()
        content2 = await file2.read()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to read files: {str(e)}")
    # Checked outside the try block so the HTTPException is not caught and
    # re-wrapped as a "Failed to read files" error.
    if not content1 or not content2:
        raise HTTPException(status_code=400, detail="One or both files are empty")
    # Perform comparison using validator
    try:
        comparison_result = await validator.compare_documents(
            file1_content=content1,
            file1_extension=ext1,
            file1_name=filename1,
            file2_content=content2,
            file2_extension=ext2,
            file2_name=filename2
        )
        # Convert to Pydantic models
        changes = [
            ComparisonChange(**change) for change in comparison_result.get("changes", [])
        ]
        return ComparisonReport(
            summary=comparison_result.get("summary", "No summary available"),
            changes=changes,
            file1_name=filename1,
            file2_name=filename2
        )
    except Exception as e:
        logger.error(f"Comparison failed: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Comparison failed: {str(e)}"
        )
@app.post("/excel-columns", tags=["Bulk Validation"])
async def get_excel_columns(file: UploadFile = File(...)):
    """
    Extract column headers from an Excel file.

    Args:
        file: Excel file (.xlsx)

    Returns:
        List of column names and row count

    Raises:
        400: The upload could not be parsed as a workbook
    """
    try:
        import openpyxl
        from io import BytesIO

        raw_bytes = await file.read()
        sheet = openpyxl.load_workbook(BytesIO(raw_bytes)).active
        # Headers are the truthy cells of the first row, as strings.
        header_names = [str(cell.value) for cell in sheet[1] if cell.value]
        return {
            "columns": header_names,
            "row_count": sheet.max_row - 1,  # Exclude header row
        }
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to parse Excel file: {str(exc)}"
        )
@app.post("/bulk-validate", response_model=BulkValidationResult, tags=["Bulk Validation"])
async def bulk_validate_certificates(
    excel_file: UploadFile = File(..., description="Excel file with names"),
    name_column: str = Form(..., description="Column name containing names"),
    certificate_files: List[UploadFile] = File(..., description="Certificate files (max 150)")
):
    """
    Validate multiple certificates against an Excel list of names.

    Args:
        excel_file: Excel file with attendee names
        name_column: Column containing the names
        certificate_files: List of certificate files to validate

    Returns:
        Bulk validation results with matches, missing, and extras

    Raises:
        400: More than 150 certificates were uploaded
        500: Reading the uploads or the validation itself failed
    """
    if len(certificate_files) > 150:
        raise HTTPException(
            status_code=400,
            detail="Maximum 150 certificates allowed"
        )
    try:
        # Read Excel file
        workbook_bytes = await excel_file.read()
        # Read every certificate as a (filename, bytes, extension) triple.
        certificates = []
        for upload in certificate_files:
            payload = await upload.read()
            upload_name = upload.filename or "unknown"
            certificates.append(
                (upload_name, payload, Path(upload_name).suffix.lower())
            )
        # Call validator
        outcome = await validator.bulk_validate_certificates(
            excel_content=workbook_bytes,
            name_column=name_column,
            certificate_data=certificates
        )
        # Counters default to 0 when the validator omits them.
        counters = {
            key: outcome.get(key, 0)
            for key in (
                "total_names",
                "total_certificates",
                "exact_matches",
                "fuzzy_matches",
                "missing",
                "extras",
            )
        }
        detail_models = [
            BulkValidationDetail(**entry) for entry in outcome.get("details", [])
        ]
        return BulkValidationResult(details=detail_models, **counters)
    except Exception as exc:
        logger.error(f"Bulk validation failed: {str(exc)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Bulk validation failed: {str(exc)}"
        )
# ==================== PROJECTS ENDPOINTS ====================
@app.get("/projects", response_model=List[Project], tags=["Projects"])
async def list_projects():
    """List all projects.

    A database failure is logged and returned as a 500.
    """
    try:
        return db.list_projects()
    except Exception as exc:
        logger.error(f"Failed to list projects: {str(exc)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/projects", response_model=Project, tags=["Projects"])
async def create_project(project: ProjectCreate):
    """Create a new project.

    Returns the stored project as re-read from the database. A ValueError
    from the database layer becomes a 400; any other failure is logged and
    becomes a 500.
    """
    try:
        new_id = db.create_project(project.name, project.description)
        return db.get_project(new_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        logger.error(f"Failed to create project: {str(exc)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/projects/{project_id}", response_model=Project, tags=["Projects"])
async def get_project(project_id: int):
    """Get a specific project.

    Raises 404 when the database returns nothing for ``project_id``, and 500
    (after logging) when the lookup itself fails.
    """
    try:
        found = db.get_project(project_id)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to get project: {str(exc)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
    if not found:
        raise HTTPException(status_code=404, detail="Project not found")
    return found
@app.delete("/projects/{project_id}", tags=["Projects"])
async def delete_project(project_id: int):
    """Delete a project.

    Raises 404 when the database reports that nothing was deleted, and 500
    (after logging) when the delete itself fails.
    """
    try:
        was_deleted = db.delete_project(project_id)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Failed to delete project: {str(exc)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Project not found")
    return {"message": "Project deleted successfully"}
@app.get("/projects/{project_id}/validations", tags=["Projects"])
async def get_project_validations(project_id: int):
    """Get all validations for a project.

    The database result is returned as-is; a failure is logged and returned
    as a 500.
    """
    try:
        return db.get_project_validations(project_id)
    except Exception as exc:
        logger.error(f"Failed to get project validations: {str(exc)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/validations/recent", tags=["Projects"])
async def get_recent_validations(limit: int = Query(50, ge=1, le=200)):
    """Get recent validations across all projects.

    ``limit`` defaults to 50 and must be between 1 and 200. A database
    failure is logged and returned as a 500.
    """
    try:
        return db.get_recent_validations(limit)
    except Exception as exc:
        logger.error(f"Failed to get recent validations: {str(exc)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
# Development entry point: run the app directly with auto-reload, listening
# on all interfaces.
# NOTE(review): this binds port 8000, while the SharePoint localhost redirect
# URI elsewhere in this file is hard-coded to port 8001 — confirm which is
# intended.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)