Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from pydantic import BaseModel | |
| from pathlib import Path | |
| import uuid | |
| import shutil | |
| import subprocess | |
| from validator import validate_rmd, render_rmd_format | |
# ASGI application object; the metadata below is what FastAPI is given for
# the service title, description and version.
app = FastAPI(
    version="2.0.0",
    title="RMD Validator API",
    description="API per renderizzare file R Markdown in HTML",
)
# Request body shared by the endpoints in this module: the R Markdown text
# and the file name it should be stored under.
class RmdContent(BaseModel):
    # Raw R Markdown source to be written to disk.
    content: str
    # Name for the stored file; falls back to "document.rmd" when omitted.
    filename: str = "document.rmd"
async def root():
    """Return a fixed payload identifying the service and its version."""
    info = dict(
        status="online",
        service="RMD Validator API",
        version="2.0.0",
    )
    return info
async def health():
    """Return a fixed health payload.

    All three values are constants; nothing is probed at call time.
    """
    report = dict(
        status="healthy",
        r_available=True,
        packages_loaded=True,
    )
    return report
async def validate_rmd_endpoint(rmd_data: RmdContent):
    """Validate an R Markdown document and report which outputs were produced.

    The submitted text is written into a fresh per-request directory under
    ``/app/temp`` and handed to ``validate_rmd``. When a response is returned
    the directory is left in place (its path and the request id are part of
    the response); it is removed here only when the request fails with an
    exception, because in that case the caller never learns the request id.

    Args:
        rmd_data: Document text plus the file name to store it under. A
            ``.rmd`` suffix is appended when missing, and any directory
            components in the name are dropped.

    Returns:
        A dict with ``status``, ``message``, ``filename`` and ``request_id``.
        When ``status`` is 200 it also contains ``files`` (the ``.html`` /
        ``.tex`` outputs found in the directory) and ``output_directory``;
        otherwise ``error_details`` is added if the error file exists.

    Raises:
        HTTPException: 404 on ``FileNotFoundError``, 500 on any other error.
    """
    request_id = str(uuid.uuid4())
    temp_dir = Path(f"/app/temp/{request_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    try:
        if not rmd_data.filename.endswith('.rmd'):
            rmd_data.filename += '.rmd'
        # The name comes from the client. Keep only its final component so
        # "../x" or an absolute path cannot place the file outside temp_dir.
        rmd_data.filename = Path(rmd_data.filename).name
        rmd_file = temp_dir / rmd_data.filename
        rmd_file.write_text(rmd_data.content, encoding='utf-8')

        status, message = validate_rmd(str(rmd_file))
        response = {
            "status": status,
            "message": message,
            "filename": rmd_data.filename,
            "request_id": request_id
        }
        if status == 200:
            base_name = rmd_file.stem
            response["files"] = [
                f"{base_name}{ext}"
                for ext in (".html", ".tex")
                if (temp_dir / f"{base_name}{ext}").exists()
            ]
            response["output_directory"] = str(temp_dir)
        else:
            # NOTE(review): a file literally named "404" is read as the error
            # report; presumably validate_rmd writes it on failure - confirm.
            error_file = temp_dir / "404"
            if error_file.exists():
                response["error_details"] = error_file.read_text(encoding='utf-8')
        return response
    except FileNotFoundError as e:
        # No response will carry the request id, so nothing could clean this
        # directory up later; remove it now.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}") from e
async def render_rmd_endpoint(rmd_data: RmdContent):
    """Render the Rmd document and return the resulting HTML directly.

    The text is written into a per-request directory under ``/app/temp``,
    passed to ``validate_rmd``, and the ``.html`` file found next to it is
    returned as the response body. The directory is always removed before
    the function exits.

    Args:
        rmd_data: Document text plus the file name to store it under. A
            ``.rmd`` suffix is appended when missing, and any directory
            components in the name are dropped.

    Returns:
        An ``HTMLResponse`` containing the generated HTML.

    Raises:
        HTTPException: 422 when ``validate_rmd`` reports a status other than
            200, 500 when no HTML file was produced or on any other error.
    """
    request_id = str(uuid.uuid4())
    temp_dir = Path(f"/app/temp/{request_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    try:
        if not rmd_data.filename.endswith('.rmd'):
            rmd_data.filename += '.rmd'
        # The name comes from the client. Keep only its final component so
        # "../x" or an absolute path cannot place the file outside temp_dir.
        rmd_data.filename = Path(rmd_data.filename).name
        rmd_file = temp_dir / rmd_data.filename
        rmd_file.write_text(rmd_data.content, encoding='utf-8')

        status, message = validate_rmd(str(rmd_file))
        if status != 200:
            raise HTTPException(status_code=422, detail=message)

        html_file = temp_dir / f"{rmd_file.stem}.html"
        if not html_file.exists():
            raise HTTPException(status_code=500, detail="HTML non generato")
        # Read the file fully before the finally-block deletes the directory.
        html_content = html_file.read_text(encoding='utf-8')
        return HTMLResponse(content=html_content)
    except HTTPException:
        # Already the intended HTTP error; do not wrap it in a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}") from e
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
async def validate_rmd_and_cleanup(rmd_data: RmdContent):
    """Validate an R Markdown document and discard every file afterwards.

    The text is written into a per-request directory under ``/app/temp`` and
    passed to ``validate_rmd``. The directory is always removed before the
    function exits, so only the validation outcome is reported.

    Args:
        rmd_data: Document text plus the file name to store it under. A
            ``.rmd`` suffix is appended when missing, and any directory
            components in the name are dropped.

    Returns:
        A dict with ``status``, ``message``, ``filename`` and ``request_id``;
        when ``status`` is not 200, ``error_details`` is added if the error
        file exists.

    Raises:
        HTTPException: 404 on ``FileNotFoundError``, 500 on any other error.
    """
    request_id = str(uuid.uuid4())
    temp_dir = Path(f"/app/temp/{request_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    try:
        if not rmd_data.filename.endswith('.rmd'):
            rmd_data.filename += '.rmd'
        # The name comes from the client. Keep only its final component so
        # "../x" or an absolute path cannot place the file outside temp_dir.
        rmd_data.filename = Path(rmd_data.filename).name
        rmd_file = temp_dir / rmd_data.filename
        rmd_file.write_text(rmd_data.content, encoding='utf-8')

        status, message = validate_rmd(str(rmd_file))
        response = {
            "status": status,
            "message": message,
            "filename": rmd_data.filename,
            "request_id": request_id
        }
        if status != 200:
            # NOTE(review): a file literally named "404" is read as the error
            # report; presumably validate_rmd writes it on failure - confirm.
            error_file = temp_dir / "404"
            if error_file.exists():
                response["error_details"] = error_file.read_text(encoding='utf-8')
        return response
    except FileNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}") from e
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
| def _prepare_rmd(rmd_data, temp_dir): | |
| if not rmd_data.filename.endswith('.rmd'): | |
| rmd_data.filename += '.rmd' | |
| rmd_file = temp_dir / rmd_data.filename | |
| rmd_file.write_text(rmd_data.content, encoding='utf-8') | |
| return rmd_file | |
async def download_html(rmd_data: RmdContent, background_tasks: BackgroundTasks):
    """Render the document as ``html_document`` and return it as a download.

    The work happens in a per-request directory under ``/app/temp``. On
    success its removal is scheduled as a background task, since the file
    must still exist while the response is being sent; on every failure
    path it is removed immediately.

    Args:
        rmd_data: Document text and file name, stored via ``_prepare_rmd``.
        background_tasks: Task queue that receives the directory cleanup.

    Returns:
        A ``FileResponse`` for the generated ``.html`` file.

    Raises:
        HTTPException: 422 on ``subprocess.CalledProcessError`` (presumably
            raised by ``render_rmd_format`` - confirm), 500 when no HTML
            file was produced or on any other error.
    """
    request_id = str(uuid.uuid4())
    temp_dir = Path(f"/app/temp/{request_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    try:
        rmd_file = _prepare_rmd(rmd_data, temp_dir)
        render_rmd_format(str(rmd_file), 'html_document')
        html_file = temp_dir / f"{rmd_file.stem}.html"
        if not html_file.exists():
            raise HTTPException(status_code=500, detail="HTML non generato")
        background_tasks.add_task(shutil.rmtree, temp_dir, True)
        return FileResponse(
            path=str(html_file),
            filename=f"{rmd_file.stem}.html",
            media_type="text/html",
        )
    except subprocess.CalledProcessError as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=422, detail=f"Errore rendering: {e.stderr}") from e
    except HTTPException:
        # Raised above before the cleanup task was scheduled, so the
        # directory would otherwise be left behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_pdf(rmd_data: RmdContent, background_tasks: BackgroundTasks):
    """Render the document as ``pdf_document`` and return it as a download.

    The work happens in a per-request directory under ``/app/temp``. On
    success its removal is scheduled as a background task, since the file
    must still exist while the response is being sent; on every failure
    path it is removed immediately.

    Args:
        rmd_data: Document text and file name, stored via ``_prepare_rmd``.
        background_tasks: Task queue that receives the directory cleanup.

    Returns:
        A ``FileResponse`` for the generated ``.pdf`` file.

    Raises:
        HTTPException: 422 on ``subprocess.CalledProcessError`` (presumably
            raised by ``render_rmd_format`` - confirm), 500 when no PDF
            file was produced or on any other error.
    """
    request_id = str(uuid.uuid4())
    temp_dir = Path(f"/app/temp/{request_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    try:
        rmd_file = _prepare_rmd(rmd_data, temp_dir)
        render_rmd_format(str(rmd_file), 'pdf_document')
        pdf_file = temp_dir / f"{rmd_file.stem}.pdf"
        if not pdf_file.exists():
            raise HTTPException(status_code=500, detail="PDF non generato")
        background_tasks.add_task(shutil.rmtree, temp_dir, True)
        return FileResponse(
            path=str(pdf_file),
            filename=f"{rmd_file.stem}.pdf",
            media_type="application/pdf",
        )
    except subprocess.CalledProcessError as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=422, detail=f"Errore rendering: {e.stderr}") from e
    except HTTPException:
        # Raised above before the cleanup task was scheduled, so the
        # directory would otherwise be left behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_tex(rmd_data: RmdContent, background_tasks: BackgroundTasks):
    """Render the document as ``latex_document`` and return it as a download.

    The work happens in a per-request directory under ``/app/temp``. On
    success its removal is scheduled as a background task, since the file
    must still exist while the response is being sent; on every failure
    path it is removed immediately.

    Args:
        rmd_data: Document text and file name, stored via ``_prepare_rmd``.
        background_tasks: Task queue that receives the directory cleanup.

    Returns:
        A ``FileResponse`` for the generated ``.tex`` file.

    Raises:
        HTTPException: 422 on ``subprocess.CalledProcessError`` (presumably
            raised by ``render_rmd_format`` - confirm), 500 when no LaTeX
            file was produced or on any other error.
    """
    request_id = str(uuid.uuid4())
    temp_dir = Path(f"/app/temp/{request_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    try:
        rmd_file = _prepare_rmd(rmd_data, temp_dir)
        render_rmd_format(str(rmd_file), 'latex_document')
        tex_file = temp_dir / f"{rmd_file.stem}.tex"
        if not tex_file.exists():
            raise HTTPException(status_code=500, detail="LaTeX non generato")
        background_tasks.add_task(shutil.rmtree, temp_dir, True)
        return FileResponse(
            path=str(tex_file),
            filename=f"{rmd_file.stem}.tex",
            media_type="application/x-tex",
        )
    except subprocess.CalledProcessError as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=422, detail=f"Errore rendering: {e.stderr}") from e
    except HTTPException:
        # Raised above before the cleanup task was scheduled, so the
        # directory would otherwise be left behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
async def cleanup_request(request_id: str):
    """Delete the temporary directory that belongs to ``request_id``.

    Args:
        request_id: Identifier of an earlier request. Only the canonical
            form produced by ``str(uuid.uuid4())`` is accepted.

    Returns:
        A dict with ``status`` and ``message`` confirming the deletion.

    Raises:
        HTTPException: 404 when the id is not a canonical UUID string or no
            directory exists for it, 500 when the deletion itself fails.
    """
    # request_id is caller-supplied and ends up in a path handed to rmtree.
    # Without this check a value such as ".." would resolve to the parent of
    # the temp area and delete it. Comparing against the round-tripped value
    # also rejects the alternative spellings uuid.UUID tolerates (braces,
    # "urn:uuid:" prefix, upper case), none of which name a real directory.
    try:
        is_canonical = str(uuid.UUID(request_id)) == request_id
    except ValueError:
        is_canonical = False

    temp_dir = Path(f"/app/temp/{request_id}")
    if not is_canonical or not temp_dir.exists():
        raise HTTPException(status_code=404, detail="Request ID non trovato")
    try:
        shutil.rmtree(temp_dir)
        return {"status": "success", "message": f"File per request {request_id} eliminati"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Errore durante la pulizia: {str(e)}") from e
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 7860. The import stays local so that importing this module does
    # not require uvicorn.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)