CPS-API / api /routes /pdf.py
Ali2206's picture
Update api/routes/pdf.py
97e2e8f verified
raw
history blame
15.7 kB
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from db.mongo import patients_collection
from core.security import get_current_user
from utils.helpers import calculate_age, escape_latex_special_chars, hyphenate_long_strings, format_timestamp
from datetime import datetime
from bson import ObjectId
from bson.errors import InvalidId
import os
import subprocess
from tempfile import TemporaryDirectory
from string import Template
import logging
import asyncio
import aiohttp
import json
from typing import List, Dict, Optional
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from pymongo.change_stream import CollectionChangeStream
# Logging setup: INFO and above, formatted as "time - level - logger - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Router that every endpoint in this module registers on.
router = APIRouter()

# file.io upload settings
FILE_IO_API_URL = "https://file.io"
FILE_IO_EXPIRATION = "1w"  # 1 week; sent as the upload's `expires` form field
PDF_METADATA_COLLECTION = "pdf_metadata"  # MongoDB collection storing the file.io links
async def upload_to_fileio(file_bytes: bytes, filename: str) -> Optional[Dict]:
    """Send a file to file.io and hand back the service's JSON reply.

    Args:
        file_bytes: Raw content of the file to upload.
        filename: Name attached to the uploaded ``file`` form part.

    Returns:
        The decoded JSON body when the service answers HTTP 200 and the body
        has a truthy ``success`` field. ``None`` in every other case; failures
        (including exceptions) are logged rather than raised.
    """
    try:
        # Multipart payload: the file itself plus the requested expiration.
        payload = aiohttp.FormData()
        payload.add_field('file', file_bytes, filename=filename)
        payload.add_field('expires', FILE_IO_EXPIRATION)

        async with aiohttp.ClientSession() as session:
            async with session.post(FILE_IO_API_URL, data=payload) as response:
                if response.status != 200:
                    logger.error(f"File.io upload failed with status {response.status}")
                    return None

                body = await response.json()
                if not body.get('success'):
                    logger.error(f"File.io upload failed: {body.get('message')}")
                    return None

                return body
    except Exception as exc:
        logger.error(f"Error uploading to file.io: {str(exc)}")
        return None
# LaTeX source of the patient report. `$name` placeholders are filled by
# string.Template; built once at import time instead of on every call.
_PATIENT_REPORT_TEMPLATE = Template(r"""
\documentclass[a4paper,12pt]{article}
\usepackage[utf8]{inputenc}
\usepackage[T1]{fontenc}
\usepackage{geometry}
\geometry{margin=1in}
\usepackage{booktabs,longtable,fancyhdr}
\usepackage{array}
\usepackage{microtype}
\microtypesetup{expansion=false}
\setlength{\headheight}{14.5pt}
\pagestyle{fancy}
\fancyhf{}
\fancyhead[L]{Patient Report}
\fancyhead[R]{Generated: \today}
\fancyfoot[C]{\thepage}
\begin{document}
\begin{center}
\Large\textbf{Patient Medical Report} \\
\vspace{0.2cm}
\textit{Generated on $generated_on}
\end{center}
\section*{Demographics}
\begin{itemize}
\item \textbf{FHIR ID:} $fhir_id
\item \textbf{Full Name:} $full_name
\item \textbf{Gender:} $gender
\item \textbf{Date of Birth:} $dob
\item \textbf{Age:} $age
\item \textbf{Address:} $address
\item \textbf{Marital Status:} $marital_status
\item \textbf{Language:} $language
\end{itemize}
\section*{Clinical Notes}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{6.5cm}}
\caption{Clinical Notes} \\
\toprule
\textbf{Date} & \textbf{Type} & \textbf{Text} \\
\midrule
$notes
\bottomrule
\end{longtable}
\section*{Conditions}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}}
\caption{Conditions} \\
\toprule
\textbf{ID} & \textbf{Code} & \textbf{Status} & \textbf{Onset} & \textbf{Verification} \\
\midrule
$conditions
\bottomrule
\end{longtable}
\section*{Medications}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{4cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}}
\caption{Medications} \\
\toprule
\textbf{ID} & \textbf{Name} & \textbf{Status} & \textbf{Date} & \textbf{Dosage} \\
\midrule
$medications
\bottomrule
\end{longtable}
\section*{Encounters}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{3.5cm}}
\caption{Encounters} \\
\toprule
\textbf{ID} & \textbf{Type} & \textbf{Status} & \textbf{Start} & \textbf{Provider} \\
\midrule
$encounters
\bottomrule
\end{longtable}
\end{document}
""")


def _latex_table_rows(items, columns, default_message):
    """Render a list of dicts as LaTeX table rows (one dict per row).

    Cell order follows each dict's insertion order. Falsy cell values become
    empty strings; every cell is hyphenated and LaTeX-escaped. When `items`
    is empty, a single row spanning `columns` columns carries
    `default_message` instead.
    """
    if not items:
        return f"\\multicolumn{{{columns}}}{{l}}{{{default_message}}} \\\\"
    rows = []
    for item in items:
        cells = [
            escape_latex_special_chars(hyphenate_long_strings(value or ""))
            for value in item.values()
        ]
        rows.append(" & ".join(cells) + " \\\\")
    return "\n".join(rows)


def _compile_latex_to_pdf(latex_source: str) -> Optional[bytes]:
    """Compile LaTeX source with latexmk in a temp directory (blocking).

    Returns the bytes of the produced PDF, or None when latexmk exits with a
    non-zero status or no PDF file is found afterwards.
    """
    with TemporaryDirectory() as tmpdir:
        tex_path = os.path.join(tmpdir, "report.tex")
        pdf_path = os.path.join(tmpdir, "report.pdf")
        with open(tex_path, "w", encoding="utf-8") as f:
            f.write(latex_source)
        # Run latexmk twice to ensure proper table rendering
        for _ in range(2):
            result = subprocess.run(
                ["latexmk", "-pdf", "-interaction=nonstopmode", tex_path],
                cwd=tmpdir,
                check=False,
                capture_output=True,
                text=True,
                # Tool output is not guaranteed to be valid UTF-8; a decode
                # error here must not discard an otherwise successful build.
                errors="replace",
            )
            if result.returncode != 0:
                logger.error(f"LaTeX compilation failed: {result.stderr}")
                return None
        if os.path.exists(pdf_path):
            with open(pdf_path, "rb") as f:
                return f.read()
        return None


async def generate_pdf_bytes(patient: dict) -> Optional[bytes]:
    """Generate PDF bytes for a patient.

    Builds the notes / conditions / medications / encounters tables and the
    demographics section from `patient`, fills the LaTeX report template and
    compiles it with latexmk.

    Args:
        patient: Patient document. Read keys: fhir_id, full_name, gender,
            date_of_birth, address, city, state, postal_code, country,
            marital_status, language, notes, conditions, medications,
            encounters.

    Returns:
        The compiled PDF as bytes, or None when compilation fails or any
        exception occurs (the error is logged, never raised).
    """
    try:
        # Prepare all table contents
        notes_content = _latex_table_rows(
            [{
                "date": format_timestamp(n.get("date", "")),
                "type": n.get("type", ""),
                "text": n.get("text", "")
            } for n in patient.get("notes", [])],
            3,
            "No notes available"
        )
        conditions_content = _latex_table_rows(
            [{
                "id": c.get("id", ""),
                "code": c.get("code", ""),
                "status": c.get("status", ""),
                "onset": format_timestamp(c.get("onset_date", "")),
                "verification": c.get("verification_status", "")
            } for c in patient.get("conditions", [])],
            5,
            "No conditions available"
        )
        medications_content = _latex_table_rows(
            [{
                "id": m.get("id", ""),
                "name": m.get("name", ""),
                "status": m.get("status", ""),
                "date": format_timestamp(m.get("prescribed_date", "")),
                "dosage": m.get("dosage", "")
            } for m in patient.get("medications", [])],
            5,
            "No medications available"
        )
        encounters_content = _latex_table_rows(
            [{
                "id": e.get("id", ""),
                "type": e.get("type", ""),
                "status": e.get("status", ""),
                "start": format_timestamp(e.get("period", {}).get("start", "")),
                "provider": e.get("service_provider", "")
            } for e in patient.get("encounters", [])],
            5,
            "No encounters available"
        )

        def tex(value):
            """Escape a demographic field, mapping None/empty to ''."""
            return escape_latex_special_chars(value or "")

        # A plain `age or "N/A"` would also turn a legitimate age of 0 into
        # "N/A". NOTE(review): assumes calculate_age signals "unknown" with
        # None or another non-numeric falsy value - confirm against the helper.
        age = calculate_age(patient.get("date_of_birth", ""))
        age_text = str(age) if (age or age == 0) else "N/A"

        address_parts = [
            patient.get("address", ""),
            patient.get("city", ""),
            patient.get("state", ""),
            patient.get("postal_code", ""),
            patient.get("country", "")
        ]

        # Fill template with patient data
        latex_filled = _PATIENT_REPORT_TEMPLATE.substitute(
            # %Z is empty for naive datetimes; astimezone() attaches the
            # local zone so the zone name actually appears in the report.
            generated_on=datetime.now().astimezone().strftime("%A, %B %d, %Y at %I:%M %p %Z"),
            fhir_id=escape_latex_special_chars(
                hyphenate_long_strings(patient.get("fhir_id", "") or "")
            ),
            full_name=tex(patient.get("full_name", "")),
            gender=tex(patient.get("gender", "")),
            dob=tex(patient.get("date_of_birth", "")),
            age=escape_latex_special_chars(age_text),
            address=escape_latex_special_chars(", ".join(filter(None, address_parts))),
            marital_status=tex(patient.get("marital_status", "")),
            language=tex(patient.get("language", "")),
            notes=notes_content,
            conditions=conditions_content,
            medications=medications_content,
            encounters=encounters_content
        )

        # latexmk is a blocking subprocess; run it in the default executor so
        # the event loop keeps serving requests (and batched callers really
        # overlap) while LaTeX compiles.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _compile_latex_to_pdf, latex_filled)
    except Exception as e:
        logger.error(f"Error generating PDF bytes: {str(e)}")
        return None
async def generate_and_upload_pdf(patient: dict) -> Optional[Dict]:
    """Generate PDF and upload to file.io, returning metadata.

    Steps: render the patient's PDF, upload it to file.io, then upsert a
    metadata document (keyed by ``patient_id``) into the PDF metadata
    collection of the patients database.

    Args:
        patient: Patient document; ``fhir_id`` and ``full_name`` are read here.

    Returns:
        The stored metadata dict (patient_id, patient_name, file_key,
        file_url, expires_at, generated_at, filename), or None when PDF
        generation, upload, or the database write fails. Errors are logged,
        never raised.
    """
    try:
        patient_id = patient.get("fhir_id")

        # Generate PDF bytes
        pdf_bytes = await generate_pdf_bytes(patient)
        if not pdf_bytes:
            logger.error(f"Failed to generate PDF for patient {patient_id}")
            return None

        # Create filename. `dict.get(key, default)` only covers a *missing*
        # key: a stored None would crash on `.replace` and waste the PDF that
        # was just rendered, so fall back with `or` instead.
        patient_name = (patient.get("full_name") or "unknown").replace(" ", "_").lower()
        filename = f"patient_{patient_id or 'unknown'}_{patient_name}_report.pdf"

        # Upload to file.io
        upload_response = await upload_to_fileio(pdf_bytes, filename)
        if not upload_response:
            logger.error(f"Failed to upload PDF for patient {patient_id}")
            return None

        # Prepare metadata
        metadata = {
            "patient_id": patient_id,
            "patient_name": patient.get("full_name"),
            "file_key": upload_response.get('key'),
            "file_url": upload_response.get('link'),
            "expires_at": upload_response.get('expires'),
            "generated_at": datetime.utcnow(),
            "filename": filename
        }

        # Store metadata in MongoDB; upsert keeps one document per patient.
        db = patients_collection.database
        await db[PDF_METADATA_COLLECTION].update_one(
            {"patient_id": patient_id},
            {"$set": metadata},
            upsert=True
        )
        logger.info(f"Successfully uploaded PDF for patient {patient_id}")
        return metadata
    except Exception as e:
        logger.error(f"Error in generate_and_upload_pdf: {str(e)}")
        return None
async def generate_all_patient_pdfs() -> List[Dict]:
    """Build and upload a PDF report for every patient in the collection.

    All patient documents are loaded up front and handled in batches of five;
    the members of a batch run concurrently and a one-second pause follows
    each batch.

    Returns:
        Metadata dicts of the patients whose PDF was generated and uploaded.
        If an exception interrupts the run, the results gathered so far are
        returned; an empty list is returned when there are no patients.
    """
    generated_files = []
    try:
        patients = await patients_collection.find({}).to_list(length=None)
        if not patients:
            logger.warning("No patients found in the database")
            return []

        total = len(patients)
        logger.info(f"Starting PDF generation for {total} patients")

        # Process patients in batches
        batch_size = 5  # Smaller batch size for API rate limiting
        batch_count = (total - 1) // batch_size + 1
        for batch_number, start in enumerate(range(0, total, batch_size), start=1):
            chunk = patients[start:start + batch_size]
            outcomes = await asyncio.gather(
                *[generate_and_upload_pdf(patient) for patient in chunk]
            )
            # Failed patients come back as None and are left out.
            generated_files.extend(outcome for outcome in outcomes if outcome)
            logger.info(f"Processed batch {batch_number}/{batch_count}")
            await asyncio.sleep(1)  # Brief pause between batches

        logger.info(f"Successfully processed {len(generated_files)} patients")
        return generated_files
    except Exception as exc:
        logger.error(f"Error in generate_all_patient_pdfs: {str(exc)}")
        return generated_files
async def watch_for_new_patients():
    """Watch MongoDB change stream for new patients and generate PDFs.

    Opens a change stream on the patients collection filtered to insert
    events and, for each inserted document, generates and uploads its PDF.
    A failure while handling one event is logged and the stream keeps going.
    On a PyMongoError the stream is reopened after a 5-second pause. Any
    other exception raised outside event handling is logged as fatal and
    ends the watcher.
    """
    try:
        logger.info("Starting MongoDB change stream watcher for new patients")
        # Only insert events are delivered by the stream.
        pipeline = [{'$match': {'operationType': 'insert'}}]
        while True:
            try:
                async with patients_collection.watch(pipeline) as stream:
                    async for change in stream:
                        try:
                            patient = change['fullDocument']
                            logger.info(f"New patient detected: {patient.get('fhir_id')}")
                            # Generate and upload PDF
                            result = await generate_and_upload_pdf(patient)
                            if result:
                                logger.info(f"Generated PDF for new patient {patient.get('fhir_id')}")
                            else:
                                logger.error(f"Failed to generate PDF for new patient {patient.get('fhir_id')}")
                        except Exception as e:
                            # One bad event must not tear down the stream.
                            logger.error(f"Error processing change stream event: {str(e)}")
            except PyMongoError as e:
                logger.error(f"MongoDB change stream error: {str(e)}")
                await asyncio.sleep(5)  # Wait before reconnecting
    except Exception as e:
        logger.error(f"Fatal error in watch_for_new_patients: {str(e)}")
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else holds can be garbage-collected
# before it finishes.
_background_tasks = set()


@router.on_event("startup")
async def startup_event():
    """Start background tasks on application startup"""
    # Start the change stream watcher and keep it referenced until it ends.
    watcher = asyncio.create_task(watch_for_new_patients())
    _background_tasks.add(watcher)
    watcher.add_done_callback(_background_tasks.discard)
@router.post("/generate-all-pdfs", status_code=202)
async def trigger_pdf_generation(
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user)
):
    """Queue PDF generation for every patient as a background task.

    Only users whose ``role`` is ``admin`` or ``doctor`` may call this; anyone
    else gets HTTP 403. The response is returned immediately (202) while the
    generation runs in the background.
    """
    role = current_user.get('role')
    if role not in ('admin', 'doctor'):
        raise HTTPException(
            detail="Only administrators and doctors can generate PDFs",
            status_code=403,
        )

    background_tasks.add_task(generate_all_patient_pdfs)

    acknowledgement = {
        "status": "accepted",
        "message": "PDF generation started in the background",
    }
    return acknowledgement
@router.get("/list-pdf-links", response_model=List[Dict])
async def list_pdf_links(current_user: dict = Depends(get_current_user)):
    """List all stored PDF metadata.

    Only users whose ``role`` is ``admin`` or ``doctor`` may call this.

    Returns:
        Every document of the PDF metadata collection, with ``_id`` rendered
        as a string.

    Raises:
        HTTPException: 403 for other roles; 500 when the database query fails.
    """
    if current_user.get('role') not in ['admin', 'doctor']:
        raise HTTPException(
            status_code=403,
            detail="Only administrators and doctors can list PDFs"
        )
    try:
        db = patients_collection.database
        cursor = db[PDF_METADATA_COLLECTION].find({})
        pdfs = await cursor.to_list(length=None)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error listing PDF files: {str(e)}"
        ) from e

    # MongoDB adds an ObjectId `_id` to every document. ObjectId cannot be
    # JSON-encoded, and that failure happens after `return`, outside the
    # try block above, so convert it here.
    for doc in pdfs:
        if "_id" in doc:
            doc["_id"] = str(doc["_id"])
    return pdfs
# Export the router under the name 'pdf' for api.__init__.py.
# `pdf` is an alias for the same APIRouter object that the endpoints above
# are registered on.
pdf = router