from __future__ import annotations import os import requests GIT_SHA=os.getenv('HF_REVISION','') or os.getenv('GIT_SHA','') def _alias_env(primary: str, fallback: str) -> None: if (os.environ.get(primary) or "").strip(): return fb = (os.environ.get(fallback) or "").strip() if fb: os.environ[primary] = fb def _upload_pdf_to_api(pdf_id, pdf_path, pdf_name): base = (os.environ.get("PDF_TRAINER_API_BASE") or "").strip() if not base: print("[worker] PDF_TRAINER_API_BASE not set - skipping upload") return url = base.rstrip("/") + "/api/pdf/" + str(pdf_id) print(f"[worker] uploading pdf_id={pdf_id} to {url}") def _send(method: str) -> requests.Response: with open(pdf_path, "rb") as f: return requests.request( method, url, files={"file": (f"{pdf_id}.pdf", f, "application/pdf")}, data={"pdf_name": pdf_name}, timeout=30, ) # Prefer POST: the public API supports POST, and some deployments have a broken PUT alias. r = _send("POST") if r.status_code in (404, 405): r = _send("PUT") print(f"[worker] upload status={r.status_code}") if r.status_code >= 400: body = (r.text or "").strip() if body: print(f"[worker] upload error body: {body[:500]}") r.raise_for_status() return r PDF_TRAINER_API_BASE = (os.environ.get('PDF_TRAINER_API_BASE') or '').strip() import time import uuid from dataclasses import dataclass from pathlib import Path from .hf_env_files import resolve_json_or_path from typing import List, Tuple from dotenv import load_dotenv from .gmail_client import GmailClient from .openai_classifier import classify_with_openai from .pdf_render import render_pdf_to_pngs # Force load repo_root/backend/.env (single source of truth) REPO_ROOT = Path(__file__).resolve().parents[2] load_dotenv(REPO_ROOT / "backend" / ".env", override=True) @dataclass class Settings: creds_path: Path token_path: Path label_incoming: str label_known: str label_unknown: str label_train: str # Rep email for UNKNOWN detection rep_notify_to: str notify_from: str # OpenAI openai_api_key: str openai_model: str poll_seconds: int max_messages_per_poll: int render_pages: int render_dpi: int trainer_base_url: str def load_settings() -> Settings: base = Path(__file__).resolve().parents[1] # backend/ _alias_env("GMAIL_CREDENTIALS_JSON", "PDF_PIPELINE_GMAIL_CREDENTIALS_JSON") _alias_env("GMAIL_TOKEN_JSON", "PDF_PIPELINE_GMAIL_TOKEN_JSON") creds = resolve_json_or_path("GMAIL_CREDENTIALS_JSON", base / "credentials.json", Path("/tmp/credentials.json")) token = resolve_json_or_path("GMAIL_TOKEN_JSON", base / "token.json", Path("/tmp/token.json")) openai_api_key = (os.environ.get("OPENAI_API_KEY_TEST") or os.environ.get("OPENAI_API_KEY") or "").strip() openai_model = (os.environ.get("OPENAI_MODEL") or "gpt-4o-mini").strip() return Settings( creds_path=creds, token_path=token, label_incoming=os.environ.get("PDF_PIPELINE_LABEL_INCOMING", "PDF_PIPELINE/INCOMING"), label_known=os.environ.get("PDF_PIPELINE_LABEL_KNOWN", "PDF_PIPELINE/KNOWN"), label_unknown=os.environ.get("PDF_PIPELINE_LABEL_UNKNOWN", "PDF_PIPELINE/UNKNOWN"), label_train=os.environ.get("PDF_PIPELINE_LABEL_TRAIN", "PDF_PIPELINE/TRAIN"), notify_from=(os.environ.get("PDF_PIPELINE_NOTIFY_FROM") or "").strip(), rep_notify_to=(os.environ.get("PDF_PIPELINE_NOTIFY_TO") or "").strip(), openai_api_key=openai_api_key, openai_model=openai_model, poll_seconds=int(os.environ.get("PDF_PIPELINE_POLL_SECONDS", "20")), max_messages_per_poll=int(os.environ.get("PDF_PIPELINE_MAX_PER_POLL", "5")), render_pages=int(os.environ.get("PDF_PIPELINE_RENDER_PAGES", "2")), render_dpi=int(os.environ.get("PDF_PIPELINE_RENDER_DPI", "200")), trainer_base_url=(os.environ.get("PDF_TRAINER_BASE_URL") or "http://localhost:5173").strip(), ) def _safe_name(s: str) -> str: return "".join(c if c.isalnum() or c in ("-", "_", ".", " ") else "_" for c in s).strip() def _write_pipeline_pdf(root_worker_dir: Path, filename: str, pdf_bytes: bytes) -> Tuple[str, Path]: """ Persist PDF for the trainer to fetch later. Returns (pdf_id, pdf_path_on_disk). """ uploads_dir = root_worker_dir / "uploads" uploads_dir.mkdir(parents=True, exist_ok=True) pdf_id = uuid.uuid4().hex pdf_path = uploads_dir / f"{pdf_id}.pdf" name_path = uploads_dir / f"{pdf_id}.name.txt" pdf_path.write_bytes(pdf_bytes) name_path.write_text(filename, encoding="utf-8") return pdf_id, pdf_path def _process_train_label(gmail: GmailClient, s: Settings, root: Path) -> None: """ TRAIN behavior: - Pull unread PDFs from TRAIN label - Store into uploads/ and print trainer link - Mark read - Do NOT classify - Do NOT move labels """ msgs = gmail.search_unread_pdf_messages(s.label_train, max_results=s.max_messages_per_poll) if not msgs: return for m in msgs: msg_full = gmail.get_message_full(m.msg_id) pdf_atts = gmail.list_pdf_attachments(msg_full) if not pdf_atts: gmail.move_message(m.msg_id, add_labels=[], remove_labels=[], mark_read=True) continue for (filename, att_id) in pdf_atts: filename = _safe_name(filename or "attachment.pdf") pdf_bytes = gmail.download_attachment(m.msg_id, att_id) pdf_id, stored_pdf_path = _write_pipeline_pdf(root, filename, pdf_bytes) trainer_link = f"{s.trainer_base_url.rstrip('/')}/?pdf_id={pdf_id}" gmail.move_message(m.msg_id, add_labels=[], remove_labels=[], mark_read=True) print( f"[worker][TRAIN] stored PDF msg={m.msg_id} file={filename} " f"pdf_id={pdf_id} stored={stored_pdf_path}" ) try: r = _upload_pdf_to_api(pdf_id, stored_pdf_path, filename) if r is not None: print(f"[worker] uploaded pdf_id={pdf_id} to API") except Exception as e: print(f"[worker] WARN: failed to upload PDF to API: {e}") print(f"[worker][TRAIN] open: {trainer_link}") def main(): s = load_settings() # Validate settings if not s.rep_notify_to: raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_TO (rep email for UNKNOWN detection)") if not s.notify_from: raise RuntimeError("Missing PDF_PIPELINE_NOTIFY_FROM (OAuth Gmail account email)") if not s.trainer_base_url: raise RuntimeError("Missing PDF_TRAINER_BASE_URL (base URL for trainer link)") if not s.openai_api_key: raise RuntimeError("Missing OPENAI_API_KEY_TEST (or OPENAI_API_KEY) in backend/.env") gmail = GmailClient(s.creds_path, s.token_path) # Ensure labels exist gmail.ensure_label(s.label_incoming) gmail.ensure_label(s.label_known) gmail.ensure_label(s.label_unknown) gmail.ensure_label(s.label_train) gmail.ensure_label("PDF_PIPELINE/TRAINER_DONE") root = Path(__file__).resolve().parents[0] # backend/worker tmp_dir = root / "tmp" tmp_dir.mkdir(parents=True, exist_ok=True) print(f"[worker] build={GIT_SHA}\n[worker] Watching label: {s.label_incoming}") print(f"[worker] Known label: {s.label_known}") print(f"[worker] Unknown label: {s.label_unknown}") print(f"[worker] Train label: {s.label_train}") print(f"[worker] Rep notify to: {s.rep_notify_to}") print(f"[worker] OpenAI model: {s.openai_model}") while True: try: # 1) TRAIN lane _process_train_label(gmail, s, root) # 2) Main pipeline (INCOMING -> KNOWN/UNKNOWN) msgs = gmail.search_unread_pdf_messages(s.label_incoming, max_results=s.max_messages_per_poll) if not msgs: time.sleep(s.poll_seconds) continue for m in msgs: msg_full = gmail.get_message_full(m.msg_id) pdf_atts = gmail.list_pdf_attachments(msg_full) if not pdf_atts: # Remove INCOMING + mark read so it doesn't loop forever gmail.move_message(m.msg_id, add_labels=[], remove_labels=[], mark_read=True) continue any_unknown = False unknown_payloads: List[Tuple[str, bytes]] = [] # Classify all PDF attachments for this message for (filename, att_id) in pdf_atts: filename = _safe_name(filename or "attachment.pdf") pdf_bytes = gmail.download_attachment(m.msg_id, att_id) stamp = str(int(time.time())) pdf_path = tmp_dir / f"{stamp}_{m.msg_id}_{filename}" pdf_path.write_bytes(pdf_bytes) img_dir = tmp_dir / f"{stamp}_{m.msg_id}_{pdf_path.stem}" rendered = render_pdf_to_pngs(pdf_path, img_dir, pages=s.render_pages, dpi=s.render_dpi) image_paths = [str(r.path) for r in rendered] result = classify_with_openai( image_paths, api_key=s.openai_api_key, model=s.openai_model, ) template_id = (result.get("template_id") or "UNKNOWN").strip() conf = float(result.get("confidence") or 0.0) if template_id == "UNKNOWN": any_unknown = True unknown_payloads.append((filename, pdf_bytes)) print(f"[worker] UNKNOWN attachment conf={conf:.3f} msg={m.msg_id} file={filename}") else: print( f"[worker] KNOWN attachment template={template_id} conf={conf:.3f} " f"msg={m.msg_id} file={filename}" ) # Apply Gmail label ONCE per message if any_unknown: gmail.move_message( m.msg_id, add_labels=[s.label_unknown, "PDF_PIPELINE/TRAINER_DONE"], remove_labels=[], mark_read=True, ) else: gmail.move_message( m.msg_id, add_labels=[s.label_known, "PDF_PIPELINE/TRAINER_DONE"], remove_labels=[], mark_read=True, ) # Notify rep for each unknown PDF attachment if any_unknown: for (filename, pdf_bytes) in unknown_payloads: pdf_id, stored_pdf_path = _write_pipeline_pdf(root, filename, pdf_bytes) try: _upload_pdf_to_api(pdf_id, stored_pdf_path, filename) except Exception as e: print(f"[worker] WARN: failed to upload PDF to API: {e}") trainer_link = f"{s.trainer_base_url.rstrip('/')}/?pdf_id={pdf_id}" subject = "Action required: Unknown PDF format (template not found)" body = ( "Hi,\n\n" "We received a PDF that does not match any existing templates in the system.\n\n" "Please open the PDF Trainer using the link below and create or update the template configuration:\n" f"{trainer_link}\n\n" "The original PDF is attached for reference.\n\n" "Thank you,\n" "Inserio Automation\n" ) attachments: List[Tuple[str, bytes]] = [] if len(pdf_bytes) < 20 * 1024 * 1024: attachments.append((filename, pdf_bytes)) else: body += "\nNote: The PDF was too large to attach.\n" gmail.send_email( to_email=s.rep_notify_to, from_email=s.notify_from, subject=subject, body_text=body, attachments=attachments, ) print( f"[worker] UNKNOWN: emailed rep {s.rep_notify_to} msg={m.msg_id} file={filename} " f"pdf_id={pdf_id} stored={stored_pdf_path}" ) except Exception as e: print(f"[worker] ERROR: {e}") time.sleep(s.poll_seconds) if __name__ == "__main__": main()