Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import os | |
| import requests | |
# Build identifier for log lines: prefer the HF Spaces revision, fall back to GIT_SHA.
# Empty string when neither env var is set (e.g. local runs).
GIT_SHA=os.getenv('HF_REVISION','') or os.getenv('GIT_SHA','')
| def _alias_env(primary: str, fallback: str) -> None: | |
| if (os.environ.get(primary) or "").strip(): | |
| return | |
| fb = (os.environ.get(fallback) or "").strip() | |
| if fb: | |
| os.environ[primary] = fb | |
| def _upload_pdf_to_api(pdf_id, pdf_path, pdf_name): | |
| base = (os.environ.get("PDF_TRAINER_API_BASE") or "").strip() | |
| if not base: | |
| print("[worker] PDF_TRAINER_API_BASE not set - skipping upload") | |
| return | |
| url = base.rstrip("/") + "/api/pdf/" + str(pdf_id) | |
| print(f"[worker] uploading pdf_id={pdf_id} to {url}") | |
| def _send(method: str) -> requests.Response: | |
| with open(pdf_path, "rb") as f: | |
| return requests.request( | |
| method, | |
| url, | |
| files={"file": (f"{pdf_id}.pdf", f, "application/pdf")}, | |
| data={"pdf_name": pdf_name}, | |
| timeout=30, | |
| ) | |
| # Prefer POST: the public API supports POST, and some deployments have a broken PUT alias. | |
| r = _send("POST") | |
| if r.status_code in (404, 405): | |
| r = _send("PUT") | |
| print(f"[worker] upload status={r.status_code}") | |
| if r.status_code >= 400: | |
| body = (r.text or "").strip() | |
| if body: | |
| print(f"[worker] upload error body: {body[:500]}") | |
| r.raise_for_status() | |
| return r | |
# Module-level snapshot of the trainer API base URL.
# NOTE(review): appears unused — _upload_pdf_to_api re-reads the env var on
# every call; verify no other module imports this name before removing.
PDF_TRAINER_API_BASE = (os.environ.get('PDF_TRAINER_API_BASE') or '').strip()
| import time | |
| import uuid | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from .hf_env_files import resolve_json_or_path | |
| from typing import List, Tuple | |
| from dotenv import load_dotenv | |
| from .gmail_client import GmailClient | |
| from .openai_classifier import classify_with_openai | |
| from .pdf_render import render_pdf_to_pngs | |
# Force load repo_root/backend/.env (single source of truth)
# parents[2]: this file lives two directories below the repo root.
REPO_ROOT = Path(__file__).resolve().parents[2]
# override=True: .env values win over anything already in the process env.
load_dotenv(REPO_ROOT / "backend" / ".env", override=True)
@dataclass
class Settings:
    """Runtime configuration for the worker, built by load_settings().

    BUGFIX: the @dataclass decorator was missing — the class carried only
    annotations and no generated __init__, so the keyword construction in
    load_settings() (Settings(creds_path=..., ...)) raised TypeError at
    runtime. The `dataclass` import was already present but unused.
    """

    # Gmail OAuth material
    creds_path: Path
    token_path: Path
    # Gmail labels watched/applied by the worker
    label_incoming: str
    label_known: str
    label_unknown: str
    label_train: str
    # Rep email for UNKNOWN detection
    rep_notify_to: str
    notify_from: str
    # OpenAI
    openai_api_key: str
    openai_model: str
    # Polling / rendering knobs
    poll_seconds: int
    max_messages_per_poll: int
    render_pages: int
    render_dpi: int
    trainer_base_url: str
def load_settings() -> Settings:
    """Assemble a Settings object from environment variables.

    Applies legacy env-var aliases for the Gmail secrets, resolves
    credential/token files (inline JSON or path), and fills every other
    field from PDF_PIPELINE_* variables with sensible defaults.
    """
    backend_dir = Path(__file__).resolve().parents[1]  # backend/
    env = os.environ.get

    # Accept legacy PDF_PIPELINE_-prefixed Gmail secrets as fallbacks.
    _alias_env("GMAIL_CREDENTIALS_JSON", "PDF_PIPELINE_GMAIL_CREDENTIALS_JSON")
    _alias_env("GMAIL_TOKEN_JSON", "PDF_PIPELINE_GMAIL_TOKEN_JSON")

    creds_file = resolve_json_or_path(
        "GMAIL_CREDENTIALS_JSON", backend_dir / "credentials.json", Path("/tmp/credentials.json")
    )
    token_file = resolve_json_or_path(
        "GMAIL_TOKEN_JSON", backend_dir / "token.json", Path("/tmp/token.json")
    )

    # Test key takes priority over the regular key when both are present.
    api_key = (env("OPENAI_API_KEY_TEST") or env("OPENAI_API_KEY") or "").strip()
    model_name = (env("OPENAI_MODEL") or "gpt-4o-mini").strip()

    return Settings(
        creds_path=creds_file,
        token_path=token_file,
        label_incoming=env("PDF_PIPELINE_LABEL_INCOMING", "PDF_PIPELINE/INCOMING"),
        label_known=env("PDF_PIPELINE_LABEL_KNOWN", "PDF_PIPELINE/KNOWN"),
        label_unknown=env("PDF_PIPELINE_LABEL_UNKNOWN", "PDF_PIPELINE/UNKNOWN"),
        label_train=env("PDF_PIPELINE_LABEL_TRAIN", "PDF_PIPELINE/TRAIN"),
        notify_from=(env("PDF_PIPELINE_NOTIFY_FROM") or "").strip(),
        rep_notify_to=(env("PDF_PIPELINE_NOTIFY_TO") or "").strip(),
        openai_api_key=api_key,
        openai_model=model_name,
        poll_seconds=int(env("PDF_PIPELINE_POLL_SECONDS", "20")),
        max_messages_per_poll=int(env("PDF_PIPELINE_MAX_PER_POLL", "5")),
        render_pages=int(env("PDF_PIPELINE_RENDER_PAGES", "2")),
        render_dpi=int(env("PDF_PIPELINE_RENDER_DPI", "200")),
        trainer_base_url=(env("PDF_TRAINER_BASE_URL") or "http://localhost:5173").strip(),
    )
| def _safe_name(s: str) -> str: | |
| return "".join(c if c.isalnum() or c in ("-", "_", ".", " ") else "_" for c in s).strip() | |
| def _write_pipeline_pdf(root_worker_dir: Path, filename: str, pdf_bytes: bytes) -> Tuple[str, Path]: | |
| """ | |
| Persist PDF for the trainer to fetch later. | |
| Returns (pdf_id, pdf_path_on_disk). | |
| """ | |
| uploads_dir = root_worker_dir / "uploads" | |
| uploads_dir.mkdir(parents=True, exist_ok=True) | |
| pdf_id = uuid.uuid4().hex | |
| pdf_path = uploads_dir / f"{pdf_id}.pdf" | |
| name_path = uploads_dir / f"{pdf_id}.name.txt" | |
| pdf_path.write_bytes(pdf_bytes) | |
| name_path.write_text(filename, encoding="utf-8") | |
| return pdf_id, pdf_path | |
def _process_train_label(gmail: GmailClient, s: Settings, root: Path) -> None:
    """
    TRAIN behavior:
    - Pull unread PDFs from TRAIN label
    - Store into uploads/ and print trainer link
    - Mark read
    - Do NOT classify
    - Do NOT move labels
    """
    msgs = gmail.search_unread_pdf_messages(s.label_train, max_results=s.max_messages_per_poll)
    if not msgs:
        return
    for m in msgs:
        msg_full = gmail.get_message_full(m.msg_id)
        pdf_atts = gmail.list_pdf_attachments(msg_full)
        if not pdf_atts:
            # Nothing to store: mark read so the message stops matching the unread query.
            gmail.move_message(m.msg_id, add_labels=[], remove_labels=[], mark_read=True)
            continue
        for (filename, att_id) in pdf_atts:
            filename = _safe_name(filename or "attachment.pdf")
            pdf_bytes = gmail.download_attachment(m.msg_id, att_id)
            pdf_id, stored_pdf_path = _write_pipeline_pdf(root, filename, pdf_bytes)
            trainer_link = f"{s.trainer_base_url.rstrip('/')}/?pdf_id={pdf_id}"
            gmail.move_message(m.msg_id, add_labels=[], remove_labels=[], mark_read=True)
            # BUGFIX: this log line previously printed the literal placeholder
            # "(unknown)" instead of interpolating the sanitized filename.
            print(
                f"[worker][TRAIN] stored PDF msg={m.msg_id} file={filename} "
                f"pdf_id={pdf_id} stored={stored_pdf_path}"
            )
            # Best-effort upload to the trainer API; the stored file on disk is
            # the fallback, so a failed upload is only a warning.
            try:
                r = _upload_pdf_to_api(pdf_id, stored_pdf_path, filename)
                if r is not None:
                    print(f"[worker] uploaded pdf_id={pdf_id} to API")
            except Exception as e:
                print(f"[worker] WARN: failed to upload PDF to API: {e}")
            print(f"[worker][TRAIN] open: {trainer_link}")
def main():
    """Worker entry point: poll Gmail and route PDF attachments forever.

    Each cycle:
      1) TRAIN lane — archive PDFs for the trainer (no classification).
      2) INCOMING lane — classify each PDF attachment via OpenAI, label the
         message KNOWN/UNKNOWN once, and email the rep a trainer link for
         every UNKNOWN PDF.

    Raises RuntimeError at startup for missing required configuration;
    after that, per-cycle errors are logged and the loop continues.
    """
    s = load_settings()

    # Validate settings — fail fast before touching Gmail.
    if not s.rep_notify_to:
        raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_TO (rep email for UNKNOWN detection)")
    if not s.notify_from:
        raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_FROM (OAuth Gmail account email)")
    if not s.trainer_base_url:
        raise RuntimeError("Missing PDF_TRAINER_BASE_URL (base URL for trainer link)")
    if not s.openai_api_key:
        raise RuntimeError("Missing OPENAI_API_KEY_TEST (or OPENAI_API_KEY) in backend/.env")

    gmail = GmailClient(s.creds_path, s.token_path)

    # Ensure labels exist
    gmail.ensure_label(s.label_incoming)
    gmail.ensure_label(s.label_known)
    gmail.ensure_label(s.label_unknown)
    gmail.ensure_label(s.label_train)
    gmail.ensure_label("PDF_PIPELINE/TRAINER_DONE")

    root = Path(__file__).resolve().parents[0]  # backend/worker
    tmp_dir = root / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    print(f"[worker] build={GIT_SHA}\n[worker] Watching label: {s.label_incoming}")
    print(f"[worker] Known label: {s.label_known}")
    print(f"[worker] Unknown label: {s.label_unknown}")
    print(f"[worker] Train label: {s.label_train}")
    print(f"[worker] Rep notify to: {s.rep_notify_to}")
    print(f"[worker] OpenAI model: {s.openai_model}")

    while True:
        try:
            # 1) TRAIN lane
            _process_train_label(gmail, s, root)

            # 2) Main pipeline (INCOMING -> KNOWN/UNKNOWN)
            msgs = gmail.search_unread_pdf_messages(s.label_incoming, max_results=s.max_messages_per_poll)
            if not msgs:
                time.sleep(s.poll_seconds)
                continue

            for m in msgs:
                msg_full = gmail.get_message_full(m.msg_id)
                pdf_atts = gmail.list_pdf_attachments(msg_full)
                if not pdf_atts:
                    # Remove INCOMING + mark read so it doesn't loop forever
                    gmail.move_message(m.msg_id, add_labels=[], remove_labels=[], mark_read=True)
                    continue

                any_unknown = False
                unknown_payloads: List[Tuple[str, bytes]] = []

                # Classify all PDF attachments for this message
                for (filename, att_id) in pdf_atts:
                    filename = _safe_name(filename or "attachment.pdf")
                    pdf_bytes = gmail.download_attachment(m.msg_id, att_id)
                    stamp = str(int(time.time()))
                    # BUGFIX: the scratch path previously embedded the literal
                    # placeholder "(unknown)" — no .pdf suffix, meaningless stem.
                    # Use the sanitized filename, matching the img_dir pattern below.
                    pdf_path = tmp_dir / f"{stamp}_{m.msg_id}_{filename}"
                    pdf_path.write_bytes(pdf_bytes)
                    img_dir = tmp_dir / f"{stamp}_{m.msg_id}_{pdf_path.stem}"
                    rendered = render_pdf_to_pngs(pdf_path, img_dir, pages=s.render_pages, dpi=s.render_dpi)
                    image_paths = [str(r.path) for r in rendered]
                    result = classify_with_openai(
                        image_paths,
                        api_key=s.openai_api_key,
                        model=s.openai_model,
                    )
                    template_id = (result.get("template_id") or "UNKNOWN").strip()
                    conf = float(result.get("confidence") or 0.0)
                    if template_id == "UNKNOWN":
                        any_unknown = True
                        unknown_payloads.append((filename, pdf_bytes))
                        # BUGFIX: log lines previously printed "file=(unknown)" literally.
                        print(f"[worker] UNKNOWN attachment conf={conf:.3f} msg={m.msg_id} file={filename}")
                    else:
                        print(
                            f"[worker] KNOWN attachment template={template_id} conf={conf:.3f} "
                            f"msg={m.msg_id} file={filename}"
                        )

                # Apply Gmail label ONCE per message (UNKNOWN wins if any attachment is unknown).
                target_label = s.label_unknown if any_unknown else s.label_known
                gmail.move_message(
                    m.msg_id,
                    add_labels=[target_label, "PDF_PIPELINE/TRAINER_DONE"],
                    remove_labels=[],
                    mark_read=True,
                )

                # Notify rep for each unknown PDF attachment
                if any_unknown:
                    for (filename, pdf_bytes) in unknown_payloads:
                        pdf_id, stored_pdf_path = _write_pipeline_pdf(root, filename, pdf_bytes)
                        # Best-effort upload; the on-disk copy is the fallback.
                        try:
                            _upload_pdf_to_api(pdf_id, stored_pdf_path, filename)
                        except Exception as e:
                            print(f"[worker] WARN: failed to upload PDF to API: {e}")
                        trainer_link = f"{s.trainer_base_url.rstrip('/')}/?pdf_id={pdf_id}"
                        subject = "Action required: Unknown PDF format (template not found)"
                        body = (
                            "Hi,\n\n"
                            "We received a PDF that does not match any existing templates in the system.\n\n"
                            "Please open the PDF Trainer using the link below and create or update the template configuration:\n"
                            f"{trainer_link}\n\n"
                            "The original PDF is attached for reference.\n\n"
                            "Thank you,\n"
                            "Inserio Automation\n"
                        )
                        attachments: List[Tuple[str, bytes]] = []
                        # Skip attaching very large PDFs (20 MB cap); note it in the body instead.
                        if len(pdf_bytes) < 20 * 1024 * 1024:
                            attachments.append((filename, pdf_bytes))
                        else:
                            body += "\nNote: The PDF was too large to attach.\n"
                        gmail.send_email(
                            to_email=s.rep_notify_to,
                            from_email=s.notify_from,
                            subject=subject,
                            body_text=body,
                            attachments=attachments,
                        )
                        print(
                            f"[worker] UNKNOWN: emailed rep {s.rep_notify_to} msg={m.msg_id} file={filename} "
                            f"pdf_id={pdf_id} stored={stored_pdf_path}"
                        )
        except Exception as e:
            # Never let one bad poll kill the worker; log and retry next cycle.
            print(f"[worker] ERROR: {e}")
        time.sleep(s.poll_seconds)
# Script entry point: run the polling loop when executed directly.
if __name__ == "__main__":
    main()