Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks | |
| from db.mongo import patients_collection | |
| from core.security import get_current_user | |
| from utils.helpers import calculate_age, escape_latex_special_chars, hyphenate_long_strings, format_timestamp | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| import os | |
| import subprocess | |
| from tempfile import TemporaryDirectory | |
| from string import Template | |
| import logging | |
| import asyncio | |
| import aiohttp | |
| import json | |
| from typing import List, Dict, Optional | |
| from pymongo import MongoClient | |
| from pymongo.errors import PyMongoError | |
| from pymongo.change_stream import CollectionChangeStream | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(name)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # Configuration | |
| FILE_IO_API_URL = "https://file.io" | |
| FILE_IO_EXPIRATION = "1w" # 1 week expiration | |
| PDF_METADATA_COLLECTION = "pdf_metadata" # Collection to store file.io links | |
| async def upload_to_fileio(file_bytes: bytes, filename: str) -> Optional[Dict]: | |
| """Upload a file to file.io and return the response""" | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| form_data = aiohttp.FormData() | |
| form_data.add_field('file', file_bytes, filename=filename) | |
| form_data.add_field('expires', FILE_IO_EXPIRATION) | |
| async with session.post(FILE_IO_API_URL, data=form_data) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| if data.get('success'): | |
| return data | |
| logger.error(f"File.io upload failed: {data.get('message')}") | |
| else: | |
| logger.error(f"File.io upload failed with status {response.status}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error uploading to file.io: {str(e)}") | |
| return None | |
| async def generate_pdf_bytes(patient: dict) -> Optional[bytes]: | |
| """Generate PDF bytes for a patient""" | |
| try: | |
| # Prepare table content with proper LaTeX formatting | |
| def prepare_table_content(items, columns, default_message): | |
| if not items: | |
| return f"\\multicolumn{{{columns}}}{{l}}{{{default_message}}} \\\\" | |
| content = [] | |
| for item in items: | |
| row = [] | |
| for field in item: | |
| value = item.get(field, "") or "" | |
| row.append(escape_latex_special_chars(hyphenate_long_strings(value))) | |
| content.append(" & ".join(row) + " \\\\") | |
| return "\n".join(content) | |
| # Prepare all table contents | |
| notes_content = prepare_table_content( | |
| [{ | |
| "date": format_timestamp(n.get("date", "")), | |
| "type": n.get("type", ""), | |
| "text": n.get("text", "") | |
| } for n in patient.get("notes", [])], | |
| 3, | |
| "No notes available" | |
| ) | |
| conditions_content = prepare_table_content( | |
| [{ | |
| "id": c.get("id", ""), | |
| "code": c.get("code", ""), | |
| "status": c.get("status", ""), | |
| "onset": format_timestamp(c.get("onset_date", "")), | |
| "verification": c.get("verification_status", "") | |
| } for c in patient.get("conditions", [])], | |
| 5, | |
| "No conditions available" | |
| ) | |
| medications_content = prepare_table_content( | |
| [{ | |
| "id": m.get("id", ""), | |
| "name": m.get("name", ""), | |
| "status": m.get("status", ""), | |
| "date": format_timestamp(m.get("prescribed_date", "")), | |
| "dosage": m.get("dosage", "") | |
| } for m in patient.get("medications", [])], | |
| 5, | |
| "No medications available" | |
| ) | |
| encounters_content = prepare_table_content( | |
| [{ | |
| "id": e.get("id", ""), | |
| "type": e.get("type", ""), | |
| "status": e.get("status", ""), | |
| "start": format_timestamp(e.get("period", {}).get("start", "")), | |
| "provider": e.get("service_provider", "") | |
| } for e in patient.get("encounters", [])], | |
| 5, | |
| "No encounters available" | |
| ) | |
| # LaTeX template | |
| latex_template = Template(r""" | |
| \documentclass[a4paper,12pt]{article} | |
| \usepackage[utf8]{inputenc} | |
| \usepackage[T1]{fontenc} | |
| \usepackage{geometry} | |
| \geometry{margin=1in} | |
| \usepackage{booktabs,longtable,fancyhdr} | |
| \usepackage{array} | |
| \usepackage{microtype} | |
| \microtypesetup{expansion=false} | |
| \setlength{\headheight}{14.5pt} | |
| \pagestyle{fancy} | |
| \fancyhf{} | |
| \fancyhead[L]{Patient Report} | |
| \fancyhead[R]{Generated: \today} | |
| \fancyfoot[C]{\thepage} | |
| \begin{document} | |
| \begin{center} | |
| \Large\textbf{Patient Medical Report} \\ | |
| \vspace{0.2cm} | |
| \textit{Generated on $generated_on} | |
| \end{center} | |
| \section*{Demographics} | |
| \begin{itemize} | |
| \item \textbf{FHIR ID:} $fhir_id | |
| \item \textbf{Full Name:} $full_name | |
| \item \textbf{Gender:} $gender | |
| \item \textbf{Date of Birth:} $dob | |
| \item \textbf{Age:} $age | |
| \item \textbf{Address:} $address | |
| \item \textbf{Marital Status:} $marital_status | |
| \item \textbf{Language:} $language | |
| \end{itemize} | |
| \section*{Clinical Notes} | |
| \begin{longtable}[l]{>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{6.5cm}} | |
| \caption{Clinical Notes} \\ | |
| \toprule | |
| \textbf{Date} & \textbf{Type} & \textbf{Text} \\ | |
| \midrule | |
| $notes | |
| \bottomrule | |
| \end{longtable} | |
| \section*{Conditions} | |
| \begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}} | |
| \caption{Conditions} \\ | |
| \toprule | |
| \textbf{ID} & \textbf{Code} & \textbf{Status} & \textbf{Onset} & \textbf{Verification} \\ | |
| \midrule | |
| $conditions | |
| \bottomrule | |
| \end{longtable} | |
| \section*{Medications} | |
| \begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{4cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}} | |
| \caption{Medications} \\ | |
| \toprule | |
| \textbf{ID} & \textbf{Name} & \textbf{Status} & \textbf{Date} & \textbf{Dosage} \\ | |
| \midrule | |
| $medications | |
| \bottomrule | |
| \end{longtable} | |
| \section*{Encounters} | |
| \begin{longtable}[l]{>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{3.5cm}} | |
| \caption{Encounters} \\ | |
| \toprule | |
| \textbf{ID} & \textbf{Type} & \textbf{Status} & \textbf{Start} & \textbf{Provider} \\ | |
| \midrule | |
| $encounters | |
| \bottomrule | |
| \end{longtable} | |
| \end{document} | |
| """) | |
| # Fill template with patient data | |
| latex_filled = latex_template.substitute( | |
| generated_on=datetime.now().strftime("%A, %B %d, %Y at %I:%M %p %Z"), | |
| fhir_id=escape_latex_special_chars(hyphenate_long_strings(patient.get("fhir_id", "") or ""), | |
| full_name=escape_latex_special_chars(patient.get("full_name", "") or ""), | |
| gender=escape_latex_special_chars(patient.get("gender", "") or ""), | |
| dob=escape_latex_special_chars(patient.get("date_of_birth", "") or ""), | |
| age=escape_latex_special_chars(str(calculate_age(patient.get("date_of_birth", "")) or "N/A"), | |
| address=escape_latex_special_chars(", ".join(filter(None, [ | |
| patient.get("address", ""), | |
| patient.get("city", ""), | |
| patient.get("state", ""), | |
| patient.get("postal_code", ""), | |
| patient.get("country", "") | |
| ]))), | |
| marital_status=escape_latex_special_chars(patient.get("marital_status", "") or ""), | |
| language=escape_latex_special_chars(patient.get("language", "") or ""), | |
| notes=notes_content, | |
| conditions=conditions_content, | |
| medications=medications_content, | |
| encounters=encounters_content | |
| ) | |
| # Compile LaTeX to PDF | |
| with TemporaryDirectory() as tmpdir: | |
| tex_path = os.path.join(tmpdir, "report.tex") | |
| pdf_path = os.path.join(tmpdir, "report.pdf") | |
| with open(tex_path, "w", encoding="utf-8") as f: | |
| f.write(latex_filled) | |
| # Run latexmk twice to ensure proper table rendering | |
| for _ in range(2): | |
| result = subprocess.run( | |
| ["latexmk", "-pdf", "-interaction=nonstopmode", tex_path], | |
| cwd=tmpdir, | |
| check=False, | |
| capture_output=True, | |
| text=True | |
| ) | |
| if result.returncode != 0: | |
| logger.error(f"LaTeX compilation failed: {result.stderr}") | |
| return None | |
| if os.path.exists(pdf_path): | |
| with open(pdf_path, "rb") as f: | |
| return f.read() | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error generating PDF bytes: {str(e)}") | |
| return None | |
| async def generate_and_upload_pdf(patient: dict) -> Optional[Dict]: | |
| """Generate PDF and upload to file.io, returning metadata""" | |
| try: | |
| # Generate PDF bytes | |
| pdf_bytes = await generate_pdf_bytes(patient) | |
| if not pdf_bytes: | |
| logger.error(f"Failed to generate PDF for patient {patient.get('fhir_id')}") | |
| return None | |
| # Create filename | |
| patient_name = patient.get("full_name", "unknown").replace(" ", "_").lower() | |
| patient_id = patient.get("fhir_id", "unknown") | |
| filename = f"patient_{patient_id}_{patient_name}_report.pdf" | |
| # Upload to file.io | |
| upload_response = await upload_to_fileio(pdf_bytes, filename) | |
| if not upload_response: | |
| logger.error(f"Failed to upload PDF for patient {patient.get('fhir_id')}") | |
| return None | |
| # Prepare metadata | |
| metadata = { | |
| "patient_id": patient.get("fhir_id"), | |
| "patient_name": patient.get("full_name"), | |
| "file_key": upload_response.get('key'), | |
| "file_url": upload_response.get('link'), | |
| "expires_at": upload_response.get('expires'), | |
| "generated_at": datetime.utcnow(), | |
| "filename": filename | |
| } | |
| # Store metadata in MongoDB | |
| db = patients_collection.database | |
| await db[PDF_METADATA_COLLECTION].update_one( | |
| {"patient_id": patient.get("fhir_id")}, | |
| {"$set": metadata}, | |
| upsert=True | |
| ) | |
| logger.info(f"Successfully uploaded PDF for patient {patient.get('fhir_id')}") | |
| return metadata | |
| except Exception as e: | |
| logger.error(f"Error in generate_and_upload_pdf: {str(e)}") | |
| return None | |
| async def generate_all_patient_pdfs() -> List[Dict]: | |
| """Generate and upload PDFs for all patients""" | |
| generated_files = [] | |
| try: | |
| cursor = patients_collection.find({}) | |
| patients = await cursor.to_list(length=None) | |
| if not patients: | |
| logger.warning("No patients found in the database") | |
| return [] | |
| logger.info(f"Starting PDF generation for {len(patients)} patients") | |
| # Process patients in batches | |
| batch_size = 5 # Smaller batch size for API rate limiting | |
| for i in range(0, len(patients), batch_size): | |
| batch = patients[i:i + batch_size] | |
| tasks = [generate_and_upload_pdf(patient) for patient in batch] | |
| results = await asyncio.gather(*tasks) | |
| for result in results: | |
| if result: | |
| generated_files.append(result) | |
| logger.info(f"Processed batch {i//batch_size + 1}/{(len(patients)-1)//batch_size + 1}") | |
| await asyncio.sleep(1) # Brief pause between batches | |
| logger.info(f"Successfully processed {len(generated_files)} patients") | |
| return generated_files | |
| except Exception as e: | |
| logger.error(f"Error in generate_all_patient_pdfs: {str(e)}") | |
| return generated_files | |
| async def watch_for_new_patients(): | |
| """Watch MongoDB change stream for new patients and generate PDFs""" | |
| try: | |
| logger.info("Starting MongoDB change stream watcher for new patients") | |
| # Get the database from the collection | |
| db = patients_collection.database | |
| # Open a change stream on the patients collection | |
| pipeline = [{'$match': {'operationType': 'insert'}}] | |
| while True: | |
| try: | |
| async with patients_collection.watch(pipeline) as stream: | |
| async for change in stream: | |
| try: | |
| patient = change['fullDocument'] | |
| logger.info(f"New patient detected: {patient.get('fhir_id')}") | |
| # Generate and upload PDF | |
| result = await generate_and_upload_pdf(patient) | |
| if result: | |
| logger.info(f"Generated PDF for new patient {patient.get('fhir_id')}") | |
| else: | |
| logger.error(f"Failed to generate PDF for new patient {patient.get('fhir_id')}") | |
| except Exception as e: | |
| logger.error(f"Error processing change stream event: {str(e)}") | |
| except PyMongoError as e: | |
| logger.error(f"MongoDB change stream error: {str(e)}") | |
| await asyncio.sleep(5) # Wait before reconnecting | |
| except Exception as e: | |
| logger.error(f"Fatal error in watch_for_new_patients: {str(e)}") | |
| async def startup_event(): | |
| """Start background tasks on application startup""" | |
| # Start the change stream watcher | |
| asyncio.create_task(watch_for_new_patients()) | |
| async def trigger_pdf_generation( | |
| background_tasks: BackgroundTasks, | |
| current_user: dict = Depends(get_current_user) | |
| ): | |
| """Trigger background task to generate PDFs for all patients""" | |
| if current_user.get('role') not in ['admin', 'doctor']: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="Only administrators and doctors can generate PDFs" | |
| ) | |
| background_tasks.add_task(generate_all_patient_pdfs) | |
| return {"status": "accepted", "message": "PDF generation started in the background"} | |
| async def list_pdf_links(current_user: dict = Depends(get_current_user)): | |
| """List all stored PDF metadata""" | |
| if current_user.get('role') not in ['admin', 'doctor']: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="Only administrators and doctors can list PDFs" | |
| ) | |
| try: | |
| db = patients_collection.database | |
| cursor = db[PDF_METADATA_COLLECTION].find({}) | |
| pdfs = await cursor.to_list(length=None) | |
| return pdfs | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Error listing PDF files: {str(e)}" | |
| ) | |
| # Export the router as 'pdf' for api.__init__.py | |
| pdf = router |