Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from backend.worker.gmail_client import GmailClient | |
# ASGI application object for the PDF Trainer API.
app = FastAPI(version="1.0", title="PDF Trainer API")

# Storage root: the DATA_DIR environment variable when set, otherwise
# /data/uploads, resolved to an absolute path.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data/uploads")).resolve()
PDF_DIR = DATA_DIR.joinpath("pdfs")
CFG_DIR = DATA_DIR.joinpath("configs")

# Both storage directories are created at import time so request handlers
# can write into them without checking first.
for _storage_dir in (PDF_DIR, CFG_DIR):
    _storage_dir.mkdir(parents=True, exist_ok=True)
del _storage_dir

# Largest PDF (in bytes) that will be attached to a notification email (~20MB).
_MAX_EMAIL_ATTACHMENT_BYTES = 20 * 1024 * 1024
def _looks_like_json(s: str) -> bool:
    """Return True when *s*, ignoring surrounding whitespace, is wrapped in braces.

    This is only a shape check (starts with ``{`` and ends with ``}``); the
    text is not parsed. ``None`` and empty strings yield False.
    """
    trimmed = s.strip() if s else ""
    if not trimmed.startswith("{"):
        return False
    return trimmed.endswith("}")
def _alias_env(primary: str, fallback: str) -> None:
    """Populate env var *primary* from *fallback* when *primary* is unset or blank.

    A non-blank *primary* is left untouched. Otherwise, if *fallback* holds a
    non-blank value, its whitespace-stripped value is copied into *primary*.
    """

    def _stripped(name: str) -> str:
        return (os.environ.get(name) or "").strip()

    if _stripped(primary):
        return

    alias_value = _stripped(fallback)
    if alias_value:
        os.environ[primary] = alias_value
def _resolve_json_or_path(env_name: str, default_path: Path, out_path: Path) -> Path:
    """Turn env var *env_name* into a filesystem path.

    The variable may hold either a path or inline JSON:

    * unset or blank -> *default_path* is returned unchanged;
    * looks like JSON (per ``_looks_like_json``) -> the text is written to
      *out_path* (parent directories are created) and *out_path* is returned;
    * anything else -> the value is treated as a path and returned as ``Path``.

    Callers in this file pass credential/token material, so the file is
    created readable and writable by the owner only instead of relying on the
    process umask.

    Raises:
        OSError: if *out_path* cannot be created or written.
    """
    raw = (os.environ.get(env_name) or "").strip()
    if not raw:
        return default_path
    if not _looks_like_json(raw):
        return Path(raw)

    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Open with an explicit 0o600 mode so a newly created file is never
    # group/world readable, not even briefly.
    fd = os.open(str(out_path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(raw)

    # The mode argument above only applies when the file is created; tighten
    # a pre-existing file too. Best effort: the content is already written,
    # so a failure to chmod must not break the caller.
    try:
        os.chmod(str(out_path), 0o600)
    except OSError:
        pass

    return out_path
def _gmail_client() -> GmailClient:
    """Build a ``GmailClient`` from environment-provided credentials and token.

    For each of GMAIL_CREDENTIALS_JSON and GMAIL_TOKEN_JSON the
    ``PDF_PIPELINE_``-prefixed variable is first copied in as an alias when
    the plain one is blank; the value is then resolved to a path (inline JSON
    is written under /tmp, otherwise the ``backend/`` default or the given
    path is used).
    """
    # (env var, alias env var, default path, path used for inline JSON)
    sources = (
        (
            "GMAIL_CREDENTIALS_JSON",
            "PDF_PIPELINE_GMAIL_CREDENTIALS_JSON",
            Path("backend/credentials.json"),
            Path("/tmp/credentials.json"),
        ),
        (
            "GMAIL_TOKEN_JSON",
            "PDF_PIPELINE_GMAIL_TOKEN_JSON",
            Path("backend/token.json"),
            Path("/tmp/token.json"),
        ),
    )

    # Apply both aliases before resolving anything, then resolve credentials
    # first and token second.
    for env_name, alias_name, _default, _materialised in sources:
        _alias_env(env_name, alias_name)

    resolved = [
        _resolve_json_or_path(env_name, default, materialised)
        for env_name, _alias_name, default, materialised in sources
    ]
    creds_path, token_path = resolved
    return GmailClient(creds_path, token_path)
def _send_config_confirmation_email(
    *,
    pdf_id: str,
    template_id: str,
    config_obj: Any,
    notify_to: str,
) -> None:
    """Email the saved configuration, plus the PDF when available, to *notify_to*.

    The configuration is always attached as pretty-printed JSON. The stored
    PDF for *pdf_id* is attached as well when it exists, is non-empty and is
    no larger than ``_MAX_EMAIL_ATTACHMENT_BYTES``; its attachment name is the
    one recorded in ``<pdf_id>.name.txt`` when that file is readable, else
    ``<pdf_id>.pdf``. Problems reading the PDF or its name are not fatal: the
    mail is still sent and the body reports ``PDF attached: no``.

    Args:
        pdf_id: Identifier of the stored PDF.
        template_id: Identifier of the template the config belongs to.
        config_obj: JSON-serialisable object to attach.
        notify_to: Recipient address.

    Raises:
        RuntimeError: if PDF_PIPELINE_NOTIFY_FROM is unset or blank.
        Exception: whatever ``json.dumps``, ``_gmail_client()`` or
            ``send_email`` raise is propagated to the caller.
    """
    notify_from = (os.environ.get("PDF_PIPELINE_NOTIFY_FROM") or "").strip()
    if not notify_from:
        raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_FROM env var")

    cfg_filename = f"trainer_config_{pdf_id}__{template_id}.json"
    cfg_bytes = json.dumps(config_obj, indent=2).encode("utf-8")
    attachments = [(cfg_filename, cfg_bytes)]

    pdf_path = PDF_DIR / f"{pdf_id}.pdf"
    pdf_name_path = PDF_DIR / f"{pdf_id}.name.txt"
    default_pdf_filename = f"{pdf_id}.pdf"

    # Prefer the recorded upload name; a missing or unreadable name file
    # simply falls back to "<pdf_id>.pdf".
    try:
        pdf_filename = (
            pdf_name_path.read_text(encoding="utf-8").strip() or default_pdf_filename
        )
    except (OSError, UnicodeError):
        pdf_filename = default_pdf_filename

    # Check the size on disk before reading so an oversized PDF is never
    # loaded into memory only to be discarded.
    pdf_bytes = b""
    try:
        if 0 < pdf_path.stat().st_size <= _MAX_EMAIL_ATTACHMENT_BYTES:
            pdf_bytes = pdf_path.read_bytes()
    except OSError:
        # Missing or unreadable PDF: send the mail without it.
        pdf_bytes = b""

    # Re-check after reading in case the file changed between stat and read.
    pdf_attached = bool(pdf_bytes) and len(pdf_bytes) <= _MAX_EMAIL_ATTACHMENT_BYTES
    if pdf_attached:
        attachments.append((pdf_filename, pdf_bytes))

    subject = f"PDF Trainer: configuration updated ({template_id})"
    body_lines = [
        "Configuration has been updated.",
        "",
        f"template_id: {template_id}",
        f"pdf_id: {pdf_id}",
        "",
        f"PDF attached: {'yes' if pdf_attached else 'no'}",
    ]

    gmail = _gmail_client()
    gmail.send_email(
        to_email=notify_to,
        from_email=notify_from,
        subject=subject,
        body_text="\n".join(body_lines) + "\n",
        attachments=attachments,
    )
async def health() -> Dict[str, Any]:
    """Liveness check: always reports ``{"ok": True}``."""
    # NOTE(review): no route decorator is visible for this handler in the
    # reviewed text -- confirm it is registered on `app`.
    status: Dict[str, Any] = {"ok": True}
    return status
async def put_pdf(
    pdf_id: str,
    file: UploadFile = File(...),
    pdf_name: Optional[str] = Form(None),
) -> Dict[str, Any]:
    """Store an uploaded PDF and its display name under *pdf_id*.

    The bytes are written to ``PDF_DIR/<pdf_id>.pdf`` and the display name to
    ``PDF_DIR/<pdf_id>.name.txt``. The name is *pdf_name*, else the upload's
    filename, else ``<pdf_id>.pdf``.

    Args:
        pdf_id: Identifier used to build the storage file names.
        file: The uploaded PDF.
        pdf_name: Optional display name overriding the upload's filename.

    Returns:
        ``{"ok": True, "pdf_id": ..., "bytes": <size of the upload>}``.

    Raises:
        HTTPException: 400 for an empty upload or a *pdf_id* that is empty or
            contains a path separator or NUL; 500 when writing fails.
    """
    # pdf_id is interpolated into a filesystem path, so it must be a single
    # path component; otherwise a crafted id could write outside PDF_DIR.
    if not pdf_id or any(sep in pdf_id for sep in ("/", "\\", "\x00")):
        raise HTTPException(status_code=400, detail="invalid pdf_id")

    data = await file.read()
    if not data:
        raise HTTPException(status_code=400, detail="empty upload")

    fallback_name = f"{pdf_id}.pdf"
    final_name = (pdf_name or file.filename or fallback_name).strip() or fallback_name

    pdf_path = PDF_DIR / f"{pdf_id}.pdf"
    name_path = PDF_DIR / f"{pdf_id}.name.txt"
    try:
        pdf_path.write_bytes(data)
        name_path.write_text(final_name, encoding="utf-8")
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=f"failed to store pdf: {e}") from e

    return {"ok": True, "pdf_id": pdf_id, "bytes": len(data)}
async def get_pdf(pdf_id: str):
    """Return the stored PDF for *pdf_id* as a file download.

    The download name is the one recorded in ``<pdf_id>.name.txt`` when that
    file is readable and non-blank, otherwise ``<pdf_id>.pdf``. The same name
    is also exposed in the ``x-pdf-name`` response header. It is sent verbatim
    when it is plain latin-1 text without control characters, and
    percent-encoded (UTF-8) otherwise.

    Raises:
        HTTPException: 404 when no PDF is stored for *pdf_id*, or when
            *pdf_id* contains a path separator or NUL.
    """
    from urllib.parse import quote

    # pdf_id is interpolated into a filesystem path; an id containing a
    # separator can never name a stored PDF, so report it as not found.
    if any(sep in pdf_id for sep in ("/", "\\", "\x00")):
        raise HTTPException(status_code=404, detail="pdf not found")

    pdf_path = PDF_DIR / f"{pdf_id}.pdf"
    if not pdf_path.exists():
        raise HTTPException(status_code=404, detail="pdf not found")

    fallback_name = f"{pdf_id}.pdf"
    filename = fallback_name
    name_path = PDF_DIR / f"{pdf_id}.name.txt"
    try:
        if name_path.exists():
            filename = name_path.read_text(encoding="utf-8").strip() or fallback_name
    except Exception:
        # An unreadable name file is not fatal; serve under the default name.
        filename = fallback_name

    # The stored name originates from the uploader. Header values are encoded
    # as latin-1 by the framework, so a name outside latin-1 would make
    # building the response fail, and control characters (CR/LF) must never
    # reach a raw header. Percent-encode only in those cases so names that
    # already worked are sent unchanged.
    needs_escape = any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in filename)
    if not needs_escape:
        try:
            filename.encode("latin-1")
        except UnicodeEncodeError:
            needs_escape = True
    header_name = quote(filename) if needs_escape else filename

    return FileResponse(
        path=str(pdf_path),
        media_type="application/pdf",
        filename=filename,
        headers={"x-pdf-name": header_name},
    )
async def get_config(pdf_id: str, template_id: str) -> Dict[str, Any]:
    """Return the stored configuration for (*pdf_id*, *template_id*).

    Reads ``CFG_DIR/<pdf_id>__<template_id>.json`` and returns the parsed
    JSON.

    Raises:
        HTTPException: 404 when the config file does not exist, or when either
            id contains a path separator or NUL; 500 when the file cannot be
            read or parsed.
    """
    # Both ids are interpolated into a filesystem path; an id containing a
    # separator can never name a stored config, so report it as not found.
    unsafe = ("/", "\\", "\x00")
    if any(sep in ident for ident in (pdf_id, template_id) for sep in unsafe):
        raise HTTPException(status_code=404, detail="config not found")

    cfg_path = CFG_DIR / f"{pdf_id}__{template_id}.json"
    if not cfg_path.exists():
        raise HTTPException(status_code=404, detail="config not found")

    try:
        return json.loads(cfg_path.read_text(encoding="utf-8"))
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=f"bad config json: {e}") from e
async def send_config(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Persist a trainer configuration and optionally email a confirmation.

    Expected *payload* keys: ``pdf_id``, ``template_id``, ``config`` (all
    required) and an optional recipient override under ``notify_to`` or
    ``notifyTo``. The configuration is written to
    ``CFG_DIR/<pdf_id>__<template_id>.json``. When a recipient is available
    (the override, else the PDF_PIPELINE_PIPELINE_NOTIFY_TO environment
    variable) a confirmation email is attempted. An email failure does not
    fail the request; it is reported in the returned ``message``.

    Returns:
        A dict with ``ok``, ``message``, ``stored`` (config path),
        ``pdf_exists``, ``emailed`` and ``notify_to``.

    Raises:
        HTTPException: 400 when a required field is missing, blank, not a
            string (ids), or when an id contains a path separator or NUL;
            500 when the configuration cannot be written.
    """

    def _text(key: str) -> str:
        # Non-string values (numbers, lists, null) are treated as absent
        # rather than crashing on .strip().
        value = payload.get(key)
        return value.strip() if isinstance(value, str) else ""

    pdf_id = _text("pdf_id")
    template_id = _text("template_id")
    config = payload.get("config")
    notify_to_override = _text("notify_to") or _text("notifyTo")

    if not pdf_id:
        raise HTTPException(status_code=400, detail="Missing pdf_id")
    if not template_id:
        raise HTTPException(status_code=400, detail="Missing template_id")
    if config is None:
        raise HTTPException(status_code=400, detail="Missing config")

    # Both ids come from the request body and are interpolated into paths
    # that are written (config) and read (PDF attachment). Require them to be
    # single path components so a crafted id cannot escape the storage dirs.
    for field, ident in (("pdf_id", pdf_id), ("template_id", template_id)):
        if any(sep in ident for sep in ("/", "\\", "\x00")):
            raise HTTPException(status_code=400, detail=f"Invalid {field}")

    out = {"pdf_id": pdf_id, "template_id": template_id, "config": config}
    cfg_path = CFG_DIR / f"{pdf_id}__{template_id}.json"
    try:
        cfg_path.write_text(json.dumps(out, indent=2, sort_keys=True), encoding="utf-8")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"failed to store config: {e}") from e

    # NOTE(review): the doubled "PIPELINE" differs from PDF_PIPELINE_NOTIFY_FROM
    # used for the sender -- confirm the variable name is intentional.
    notify_to_default = (os.environ.get("PDF_PIPELINE_PIPELINE_NOTIFY_TO") or "").strip()
    notify_to = (notify_to_override or notify_to_default).strip()

    emailed = False
    email_error: Optional[str] = None
    if notify_to:
        try:
            _send_config_confirmation_email(
                pdf_id=pdf_id,
                template_id=template_id,
                config_obj=out,
                notify_to=notify_to,
            )
            emailed = True
        except Exception as e:
            # Deliberately non-fatal: the config is already stored, so the
            # failure is surfaced in the response message instead.
            email_error = str(e)

    pdf_exists = (PDF_DIR / f"{pdf_id}.pdf").exists()

    message = "Configuration has been updated."
    if emailed:
        message += f" Confirmation email sent to {notify_to}."
    elif notify_to:
        message += f" Email FAILED: {email_error}"
    else:
        message += " (No confirmation email: set PDF_PIPELINE_PIPELINE_NOTIFY_TO.)"

    return {
        "ok": True,
        "message": message,
        "stored": str(cfg_path),
        "pdf_exists": pdf_exists,
        "emailed": emailed,
        "notify_to": notify_to or None,
    }
async def put_pdf_alias(
    pdf_id: str,
    file: UploadFile = File(...),
    pdf_name: Optional[str] = Form(None),
):
    """Backwards-compatible alias for clients that still use PUT.

    Forwards all arguments to ``put_pdf`` and returns its result unchanged.
    """
    stored = await put_pdf(pdf_id=pdf_id, file=file, pdf_name=pdf_name)
    return stored