| """ICSAC paper submission intake — FastAPI handler. |
| |
| Origin-trust is enforced by HMAC over body+timestamp using a shared |
| secret known only to the upstream proxy (CF Pages Function) and this |
| handler; the handler is bound to a private interface and only reachable |
| through that proxy. |
| |
| Endpoints: |
| POST /api/submit — multipart form + PDF, HMAC-gated |
| GET /api/submission/<id>/state — public read-only status |
| |
| The handler writes ~/icsac-submissions/<id>/{paper.pdf, submission.json, |
| state.json}, drops a marker in queue/, fires the confirmation email and |
| a curator alert via the configured notification channel, and returns |
| immediately. The pipeline runs in submission_worker.py triggered by a |
| systemd .path unit on the queue directory. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import hashlib |
| import hmac |
| import json |
| import os |
| import random |
| import re |
| import secrets |
| import sys |
| import time |
| import uuid |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from urllib.parse import unquote |
|
|
| from fastapi import BackgroundTasks, FastAPI, HTTPException, Request |
| from fastapi.responses import JSONResponse |
|
|
| |
| |
| _REPO_ROOT = Path(__file__).resolve().parent.parent |
| if str(_REPO_ROOT) not in sys.path: |
| sys.path.insert(0, str(_REPO_ROOT)) |
|
|
| import config |
|
|
| import submission_intake as ingest |
| import notify |
|
|
| from . import notify_author |
| from .time_fmt import to_et_display |
|
|
|
|
# Production storage layout: everything lives under ~/icsac-submissions.
# QUEUE_DIR holds queue markers, COUNTER_FILE the last allocated sequence
# number, and AUDIT_LOG is a JSONL file inside config.REVIEWS_DIR.
SUBMISSIONS_ROOT = Path.home() / "icsac-submissions"
QUEUE_DIR = SUBMISSIONS_ROOT / "queue"
COUNTER_FILE = SUBMISSIONS_ROOT / ".counter"
AUDIT_LOG = Path(config.REVIEWS_DIR) / "audit-log.jsonl"

# Test-mode counterparts: a test/ subtree with its own queue directory and a
# separate audit-log file next to the production one.
TEST_SUBMISSIONS_ROOT = SUBMISSIONS_ROOT / "test"
TEST_QUEUE_DIR = TEST_SUBMISSIONS_ROOT / "queue"
TEST_AUDIT_LOG = Path(config.REVIEWS_DIR) / "audit-log-test.jsonl"

# Upload size cap in bytes; INTAKE_MAX_PDF_BYTES overrides the
# 100 * 1024 * 1024 default.
MAX_PDF_BYTES = int(os.environ.get(
    "INTAKE_MAX_PDF_BYTES", str(100 * 1024 * 1024)
))
# Shared HMAC key as bytes. Empty when INTAKE_HMAC_SECRET is unset, in which
# case _verify_hmac answers 500 rather than accepting unsigned requests.
HMAC_SECRET = os.environ.get("INTAKE_HMAC_SECRET", "").encode()
# Largest tolerated difference, in seconds, between the signed timestamp and
# the local clock.
HMAC_MAX_SKEW_SEC = 300

# Licence identifiers accepted on the upload route (compared lowercased).
ALLOWED_LICENSES = {"cc-by-4.0", "cc-by-sa-4.0", "cc0-1.0"}
# Loose shape check only: something@something.something without whitespace.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Four dash-separated groups of four; the last character may be a digit or an
# uppercase X. The checksum itself is not verified by this pattern.
ORCID_RE = re.compile(r"^\d{4}-\d{4}-\d{4}-\d{3}[\dX]$")
# Production submission id with exactly five digits.
# NOTE(review): _allocate_sub_id zero-pads to five digits but would emit six
# once the counter passes 99999 — confirm consumers of this pattern.
SUB_ID_RE = re.compile(r"^ICSAC-SUB-\d{5}$")

# Test-mode submission id: the TEST- infix followed by at least ten digits.
TEST_SUB_ID_RE = re.compile(r"^ICSAC-SUB-TEST-\d{10,}$")
|
|
| |
| |
| |
| |
| |
| |
| |
| |
# ORCIDs that are always on the test-mode whitelist. Empty here, so the
# whitelist is populated only through the TEST_ORCIDS environment variable.
_DEFAULT_TEST_ORCIDS = frozenset()
|
|
|
|
def _normalize_orcid(s: str) -> str:
    """Return `s` without surrounding whitespace and in upper case.

    Upper-casing only ever changes a trailing ``x`` check character, since
    the rest of an ORCID iD is digits and dashes. A falsy argument
    (``None`` or ``""``) yields the empty string.
    """
    if not s:
        return ""
    trimmed = s.strip()
    return trimmed.upper()
|
|
|
|
def _load_test_orcids() -> frozenset[str]:
    """Build the test-mode ORCID whitelist from the environment.

    ``TEST_ORCIDS`` is read as a comma-separated list. Blank items are
    skipped, the rest are normalized with `_normalize_orcid`, and the result
    is merged with `_DEFAULT_TEST_ORCIDS`. When the variable is unset or
    blank, the defaults are returned unchanged.
    """
    configured = os.environ.get("TEST_ORCIDS", "").strip()
    if configured == "":
        return _DEFAULT_TEST_ORCIDS

    whitelist = set(_DEFAULT_TEST_ORCIDS)
    for piece in configured.split(","):
        if piece.strip():
            whitelist.add(_normalize_orcid(piece))
    return frozenset(whitelist)
|
|
|
|
# Whitelist of test-mode ORCIDs, evaluated once when the module is imported;
# later changes to TEST_ORCIDS are not reflected in this constant.
TEST_ORCID_WHITELIST = _load_test_orcids()
|
|
|
|
def is_test_submission(orcid: str) -> bool:
    """Tell whether `orcid`, once normalized, is a whitelisted test ORCID."""
    candidate = _normalize_orcid(orcid)
    return candidate in TEST_ORCID_WHITELIST
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| _TIER_LETTERS = {1: "azbycx", 2: "dwevfu", 3: "gthsir"} |
|
|
|
|
| def _test_token(tier: int, *, seed: str | None = None) -> str: |
| rng = secrets.SystemRandom() if seed is None else random.Random(seed) |
| letters = list(_TIER_LETTERS[tier]) |
| rng.shuffle(letters) |
| d = str(tier) |
| s1 = rng.choice(("0", d)) |
| s2 = rng.choice(("0", d)) |
| return f"TEST:{letters[0]}{letters[1]}{s1}{letters[2]}{letters[3]}{s2}{letters[4]}{letters[5]}{d}" |
|
|
|
|
| _TEST_TOKEN_RE = { |
| 1: re.compile(r"^TEST:[abcxyz]{2}[01][abcxyz]{2}[01][abcxyz]{2}1$"), |
| 2: re.compile(r"^TEST:[defuvw]{2}[02][defuvw]{2}[02][defuvw]{2}2$"), |
| 3: re.compile(r"^TEST:[ghirst]{2}[03][ghirst]{2}[03][ghirst]{2}3$"), |
| } |
|
|
|
|
| def _identify_test_token(token: str) -> int | None: |
| for tier, pattern in _TEST_TOKEN_RE.items(): |
| if pattern.match(token): |
| return tier |
| return None |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
def _tiers_disabled() -> bool:
    """Report whether the ``TEST_TIERS_DISABLED`` kill switch is on.

    The variable is read on every call and compared case-insensitively
    against ``true``, ``1`` and ``yes``; anything else (including unset)
    means the switch is off.
    """
    flag = os.environ.get("TEST_TIERS_DISABLED", "")
    normalized = flag.lower()
    return normalized == "true" or normalized == "1" or normalized == "yes"
|
|
|
|
def _resolve_tier(orcid: str, tier_header: str | None) -> int:
    """Decide which test tier (1, 2 or 3) applies to a request.

    Tier 1 is returned whenever `orcid` is not a whitelisted test ORCID or
    the ``TEST_TIERS_DISABLED`` switch is on; the header is ignored in both
    cases. Otherwise ``t2``/``T2`` selects 2, ``t3``/``T3`` selects 3, and
    any other header value (including ``None``) falls back to 1.
    """
    tiers_available = is_test_submission(orcid) and not _tiers_disabled()
    if not tiers_available:
        return 1

    header_to_tier = {"t2": 2, "T2": 2, "t3": 3, "T3": 3}
    return header_to_tier.get(tier_header, 1)
| |
| |
| |
# Zenodo DOI: 10.5281/zenodo.<digits>, matched case-insensitively.
ZENODO_DOI_RE = re.compile(r"^10\.5281/zenodo\.\d+$", re.IGNORECASE)
# arXiv reference: NNNN.NNNN or NNNN.NNNNN with an optional vN suffix, with or
# without the 10.48550/arXiv. DOI prefix.
ARXIV_REF_RE = re.compile(
    r"^(?:10\.48550/arXiv\.)?\d{4}\.\d{4,5}(?:v\d+)?$", re.IGNORECASE
)
# Generic DOI shape; _validate_doi_shape uses it to tell a well-formed but
# unsupported DOI from input that is not a DOI at all.
DOI_SHAPE_RE = re.compile(r"^10\.\d{4,9}/\S+$")


# ASGI application object; the interactive docs routes are switched off.
app = FastAPI(title="ICSAC Submission Intake", docs_url=None, redoc_url=None)
|
|
|
|
| |
|
|
def _now_iso() -> str:
    """Current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
def _scan_max_known_sub_n() -> int:
    """Return the largest ICSAC-SUB-<N> number found on disk or in the audit log.

    Two sources are consulted:

    * entries directly under SUBMISSIONS_ROOT whose name is exactly
      ``ICSAC-SUB-<digits>``;
    * ``"sub_id": "ICSAC-SUB-<digits>"`` occurrences in AUDIT_LOG (the first
      one on each line).

    The result is used as a floor for ID allocation, so the counter file can
    be lost or reset without an already-issued ID being handed out again.
    Returns 0 when nothing is found.

    Reading the audit log is best-effort: a missing file is normal, and any
    other failure is reported on stderr instead of being dropped silently,
    because a failed scan weakens the floor. Whatever was read before the
    failure still counts.
    """
    dir_name_re = re.compile(r"^ICSAC-SUB-(\d+)$")
    log_entry_re = re.compile(r'"sub_id":\s*"ICSAC-SUB-(\d+)"')

    max_n = 0
    if SUBMISSIONS_ROOT.is_dir():
        for entry in SUBMISSIONS_ROOT.iterdir():
            m = dir_name_re.match(entry.name)
            if m:
                max_n = max(max_n, int(m.group(1)))

    try:
        with AUDIT_LOG.open() as alog:
            for line in alog:
                am = log_entry_re.search(line)
                if am:
                    max_n = max(max_n, int(am.group(1)))
    except FileNotFoundError:
        # No audit log yet (fresh install) — nothing to learn from it.
        pass
    except Exception as exc:
        # Deliberately broad: this scan must never block a submission, but
        # the operator needs to know the floor may be incomplete.
        print(f"audit-log scan for max sub id failed: {exc}", file=sys.stderr)
    return max_n
|
|
|
|
def _allocate_sub_id() -> str:
    """Allocate the next production submission ID (``ICSAC-SUB-NNNNN``).

    The next number is ``max(counter file value, _scan_max_known_sub_n()) + 1``,
    so a deleted or corrupted counter file cannot make allocation fall back
    to an ID that already exists on disk or in the audit log. The
    read-modify-write of the counter file is done under an exclusive
    ``flock`` so concurrent processes are serialized.

    Returns:
        The new ID, zero-padded to at least five digits.
    """
    import fcntl

    SUBMISSIONS_ROOT.mkdir(parents=True, exist_ok=True)
    COUNTER_FILE.touch(exist_ok=True)

    with COUNTER_FILE.open("r+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            try:
                file_value = int((f.read().strip() or "0"))
            except ValueError:
                # Unparseable counter: rely on the on-disk / audit-log floor.
                file_value = 0
            current = max(file_value, _scan_max_known_sub_n())
            nxt = current + 1
            f.seek(0)
            f.truncate()
            f.write(f"{nxt}\n")
            # The write above only reaches Python's buffer. Flush while the
            # lock is still held; otherwise the next lock holder can read the
            # just-truncated (empty) file and hand out the same ID again.
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
    return f"ICSAC-SUB-{nxt:05d}"
|
|
|
|
def _audit_append(entry: dict, *, test_mode: bool = False) -> None:
    """Write one JSON line to the audit log.

    Production entries go to AUDIT_LOG; with ``test_mode=True`` the entry is
    written to TEST_AUDIT_LOG instead and carries ``"test": true``. Every
    record starts with a ``ts`` timestamp; keys supplied in `entry` override
    the generated ones. Serialization and write failures are reported on
    stderr and otherwise ignored.
    """
    if test_mode:
        log_path = TEST_AUDIT_LOG
    else:
        log_path = AUDIT_LOG
    log_path.parent.mkdir(parents=True, exist_ok=True)

    record = {"ts": _now_iso()}
    if test_mode:
        record["test"] = True
    record.update(entry)

    try:
        with log_path.open("a") as sink:
            sink.write(json.dumps(record) + "\n")
    except Exception as exc:
        print(f"audit-log append failed: {exc}", file=sys.stderr)
|
|
|
|
def _tlog(msg: str) -> None:
    """Print `msg` to stdout with a ``[TEST] `` prefix, flushing immediately
    so test-mode activity shows up in the log stream without delay."""
    line = "[TEST] {}".format(msg)
    print(line, flush=True)
|
|
|
|
def _verify_hmac(request: Request, body: bytes) -> None:
    """Authenticate a request via its HMAC signature headers.

    The expected signature is the hex HMAC-SHA256 of ``"<timestamp>." + body``
    keyed with HMAC_SECRET. It must arrive as ``X-ICSAC-Signature:
    sha256=<hex>`` together with ``X-ICSAC-Timestamp`` (integer seconds)
    within HMAC_MAX_SKEW_SEC of the local clock.

    Args:
        request: Incoming request; only its headers are read.
        body: The exact raw request body the signature was computed over.

    Raises:
        HTTPException: 500 if no secret is configured; 401 for a missing,
            malformed, stale or non-matching signature.
    """
    if not HMAC_SECRET:
        raise HTTPException(500, "intake misconfigured: HMAC secret missing")

    sig_header = request.headers.get("x-icsac-signature", "")
    ts_header = request.headers.get("x-icsac-timestamp", "")
    if not sig_header.startswith("sha256=") or not ts_header:
        raise HTTPException(401, "missing signature")

    try:
        ts = int(ts_header)
        # An absurdly large integer cannot be converted to float; treat that
        # as a malformed timestamp rather than letting OverflowError escape.
        skew = abs(time.time() - ts)
    except (ValueError, OverflowError):
        raise HTTPException(401, "bad timestamp") from None
    if skew > HMAC_MAX_SKEW_SEC:
        raise HTTPException(401, "stale signature")

    expected = hmac.new(
        HMAC_SECRET, f"{ts}.".encode() + body, hashlib.sha256
    ).hexdigest()
    provided = sig_header[len("sha256="):]
    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a bogus header into
    # a 500 instead of a 401.
    if not hmac.compare_digest(
        expected.encode("ascii"), provided.encode("utf-8", "replace")
    ):
        raise HTTPException(401, "bad signature")
|
|
|
|
def _validate_submitter(d: dict) -> dict:
    """Check the submitter fields that every submission must carry.

    Reads ``name``, ``email``, ``orcid`` and ``coi`` from `d`, plus the
    optional ``exclusivity`` flag. Name must be 2–200 characters, email must
    look like an address (at most 200 characters), ORCID is mandatory and
    must match ORCID_RE, and the conflict-of-interest box must be ticked.
    ``exclusivity`` is recorded when present but never enforced; it stays
    ``None`` when the key is absent.

    Returns:
        The cleaned submitter record (email lowercased, values stripped).

    Raises:
        HTTPException: 400 ``validation_failed`` listing every failed check.
    """
    affirmative = ("on", "true", "1", "yes")

    name = (d.get("name") or "").strip()
    email = (d.get("email") or "").strip().lower()
    orcid = (d.get("orcid") or "").strip()
    coi_ticked = str(d.get("coi") or "").lower() in affirmative

    exclusivity = d.get("exclusivity")
    if exclusivity is not None:
        exclusivity = str(exclusivity).lower() in affirmative

    problems = []
    if not (2 <= len(name) <= 200):
        problems.append("name must be 2–200 chars")
    if len(email) > 200 or EMAIL_RE.match(email) is None:
        problems.append("email is not a valid address")
    if orcid == "":
        problems.append("ORCID is required")
    elif ORCID_RE.match(orcid) is None:
        problems.append("ORCID must be of the form 0000-0000-0000-0000")
    if not coi_ticked:
        problems.append("conflict-of-interest acknowledgement required")

    if problems:
        raise HTTPException(
            400, {"error": "validation_failed", "details": problems}
        )

    return {
        "name": name,
        "email": email,
        "orcid": orcid,
        "coi_acknowledged": True,
        "exclusivity_acknowledged": exclusivity,
    }
|
|
|
|
# Accepted values for the upload form's resource_type field (compared lowercased).
RESOURCE_TYPES = {"preprint", "article", "report", "dataset", "software", "other"}
# Shape check only (YYYY-MM-DD digits); it does not verify the date exists.
ISO_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Accepted `relation` values for related_identifiers entries (case-sensitive).
RELATION_TYPES = {
    "isSupplementTo", "isPreviousVersionOf", "isNewVersionOf",
    "isDerivedFrom", "isPartOf", "cites", "references", "isDocumentedBy",
}
|
|
|
|
def _parse_creators(raw: str) -> list[dict]:
    """Parse and validate the JSON-encoded ``creators`` form field.

    `raw` must decode to a JSON array of objects. Each object needs a
    non-empty ``name`` of at most 200 characters; ``orcid`` (must match
    ORCID_RE when given) and ``affiliation`` (truncated to 300 characters)
    are optional. Missing, null or otherwise falsy values count as absent;
    any other non-string value is rejected.

    Returns:
        One dict per creator with ``name`` and, when supplied, ``orcid`` and
        ``affiliation``. An empty `raw` yields ``[]`` so the caller can
        decide whether that is acceptable.

    Raises:
        HTTPException: 400 ``validation_failed`` on the first problem found
            (errors are not accumulated across entries).
    """

    def _bad(detail):
        return HTTPException(
            400, {"error": "validation_failed", "details": [detail]}
        )

    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers JSONDecodeError; TypeError covers a non-text form
        # part; RecursionError covers pathologically nested input.
        raise _bad("creators must be JSON") from None
    if not isinstance(parsed, list):
        raise _bad("creators must be a JSON array")

    out = []
    for i, entry in enumerate(parsed):
        if not isinstance(entry, dict):
            raise _bad(f"creators[{i}] must be an object")

        # JSON values may be numbers, lists or objects; calling .strip() on
        # those would crash with AttributeError instead of producing a 400.
        fields = {}
        for key in ("name", "orcid", "affiliation"):
            value = entry.get(key) or ""
            if not isinstance(value, str):
                raise _bad(f"creators[{i}].{key} must be a string")
            fields[key] = value.strip()

        name = fields["name"]
        orcid = fields["orcid"]
        affiliation = fields["affiliation"]
        if not name or len(name) > 200:
            raise _bad(f"creators[{i}].name required (≤200 chars)")
        if orcid and not ORCID_RE.match(orcid):
            raise _bad(f"creators[{i}].orcid must be 0000-0000-0000-0000 form")

        rec: dict[str, str] = {"name": name}
        if orcid:
            rec["orcid"] = orcid
        if affiliation:
            rec["affiliation"] = affiliation[:300]
        out.append(rec)
    return out
|
|
|
|
def _parse_related_identifiers(raw: str) -> list[dict]:
    """Parse and validate the optional JSON-encoded ``related_identifiers`` field.

    `raw` may be empty (returns ``[]``) or a JSON array of objects. Each
    object needs a non-empty string ``identifier`` of at most 500 characters
    and a ``relation`` that is one of RELATION_TYPES. Missing, null or
    otherwise falsy values count as absent; any other non-string value is
    rejected.

    Returns:
        A list of ``{"identifier": ..., "relation": ...}`` dicts.

    Raises:
        HTTPException: 400 ``validation_failed`` on the first problem found.
    """

    def _bad(detail):
        return HTTPException(
            400, {"error": "validation_failed", "details": [detail]}
        )

    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers JSONDecodeError; TypeError covers a non-text form
        # part; RecursionError covers pathologically nested input.
        raise _bad("related_identifiers must be JSON") from None
    if not isinstance(parsed, list):
        raise _bad("related_identifiers must be a JSON array")

    out = []
    for i, entry in enumerate(parsed):
        if not isinstance(entry, dict):
            raise _bad(f"related_identifiers[{i}] must be an object")

        # Guard against numbers/lists/objects, which have no .strip() and
        # would otherwise surface as a 500.
        fields = {}
        for key in ("identifier", "relation"):
            value = entry.get(key) or ""
            if not isinstance(value, str):
                raise _bad(f"related_identifiers[{i}].{key} must be a string")
            fields[key] = value.strip()

        ident = fields["identifier"]
        rel = fields["relation"]
        if not ident or len(ident) > 500:
            raise _bad(
                f"related_identifiers[{i}].identifier required (≤500 chars)"
            )
        if rel not in RELATION_TYPES:
            raise _bad(
                f"related_identifiers[{i}].relation must be one of {sorted(RELATION_TYPES)}"
            )
        out.append({"identifier": ident, "relation": rel})
    return out
|
|
|
|
def _validate_upload_metadata(d: dict) -> dict:
    """Validate the metadata that accompanies a PDF upload.

    Reads title, abstract, keywords, license, resource_type (defaults to
    ``preprint``), publication_date, subject, funding, creators,
    related_identifiers and deposit_consent from `d`. ``creators`` and
    ``related_identifiers`` are JSON strings handled by `_parse_creators`
    and `_parse_related_identifiers`, which raise on their own first error
    before the checks below are run.

    Keywords are split on commas/semicolons and at least one non-blank
    keyword must remain. The publication date must be ``YYYY-MM-DD`` and an
    actual calendar date.

    Returns:
        The cleaned metadata; ``subject`` and ``funding`` are ``None`` when
        blank and ``deposit_consent`` is always ``True`` on success.

    Raises:
        HTTPException: 400 ``validation_failed`` listing every failed check.
    """
    title = (d.get("title") or "").strip()
    abstract = (d.get("abstract") or "").strip()
    keywords = (d.get("keywords") or "").strip()
    license_id = (d.get("license") or "").strip().lower()
    resource_type = (d.get("resource_type") or "preprint").strip().lower()
    publication_date = (d.get("publication_date") or "").strip()
    subject = (d.get("subject") or "").strip()
    funding = (d.get("funding") or "").strip()
    deposit_consent = str(d.get("deposit_consent") or "").lower() in ("on", "true", "1", "yes")

    creators = _parse_creators(d.get("creators") or "")
    related = _parse_related_identifiers(d.get("related_identifiers") or "")

    # Split first so that separator-only input such as ",;" is recognised as
    # "no keywords" instead of slipping through as an empty list.
    kw_list = [k.strip() for k in re.split(r"[,;]+", keywords) if k.strip()]

    errs = []
    if len(title) < 3 or len(title) > 400:
        errs.append("title must be 3–400 chars")
    if len(abstract) < 50 or len(abstract) > 5000:
        errs.append("abstract must be 50–5000 chars")
    if not kw_list:
        errs.append("at least one keyword required")
    if license_id not in ALLOWED_LICENSES:
        errs.append(f"license must be one of {sorted(ALLOWED_LICENSES)}")
    if resource_type not in RESOURCE_TYPES:
        errs.append(f"resource_type must be one of {sorted(RESOURCE_TYPES)}")
    if not publication_date:
        errs.append("publication_date required (YYYY-MM-DD)")
    elif not ISO_DATE_RE.match(publication_date):
        errs.append("publication_date must be YYYY-MM-DD format")
    else:
        # The regex only checks the shape; reject dates such as 2026-13-45.
        try:
            datetime.strptime(publication_date, "%Y-%m-%d")
        except ValueError:
            errs.append("publication_date is not a valid calendar date")
    if not creators:
        errs.append("at least one creator required")
    if not deposit_consent:
        errs.append("deposit_consent required for upload route")
    if errs:
        raise HTTPException(400, {"error": "validation_failed", "details": errs})

    return {
        "title": title, "abstract": abstract,
        "keywords": kw_list, "license": license_id,
        "resource_type": resource_type,
        "publication_date": publication_date,
        "subject": subject or None,
        "funding": funding or None,
        "creators": creators,
        "related_identifiers": related,
        "deposit_consent": True,
    }
|
|
|
|
def _normalize_doi(raw: str) -> str:
    """Reduce a user-supplied DOI reference to its bare form.

    Removes, case-insensitively and in this order, a leading
    ``http(s)://doi.org/`` or ``http(s)://dx.doi.org/`` and then a leading
    ``doi:`` (with any whitespace after it). Surrounding whitespace is
    trimmed; a falsy input gives ``""``.
    """
    cleaned = raw.strip() if raw else ""
    for prefix_pattern in (r"^https?://(dx\.)?doi\.org/", r"^doi:\s*"):
        cleaned = re.sub(prefix_pattern, "", cleaned, flags=re.IGNORECASE)
    return cleaned.strip()
|
|
|
|
| |
|
|
def _allocate_test_sub_id() -> str:
    """Return an unused test submission ID of the form ``ICSAC-SUB-TEST-<unix-ts>``.

    The TEST- infix keeps test submissions out of the production
    ICSAC-SUB-NNNNN numbering. The number starts at the current Unix time
    in seconds; if a directory with that ID already exists under
    TEST_SUBMISSIONS_ROOT (another test submission in the same second), it
    is bumped until a free ID is found, so the caller's
    ``mkdir(exist_ok=False)`` does not fail on a same-second collision.

    NOTE(review): the existence check is not atomic. It relies on callers
    creating the directory right after allocation without yielding to the
    event loop, and does not protect against other processes.
    """
    ts = int(time.time())
    while (TEST_SUBMISSIONS_ROOT / f"ICSAC-SUB-TEST-{ts}").exists():
        ts += 1
    return f"ICSAC-SUB-TEST-{ts}"
|
|
|
|
# Strong references to in-flight progression tasks. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_TEST_PROGRESSION_TASKS: set = set()


def _test_state_progression(sub_dir: Path, sub_id: str) -> None:
    """Schedule the canned state walk for a tier-1 test submission.

    Starting from the ``received`` state written by the caller, state.json
    in `sub_dir` is advanced to ``in_review``, then ``review_complete``
    (with a canned ACCEPT panel result), then ``completed``, with a
    5-second pause before each step. Every step also appends a test-mode
    audit entry. Failures inside the walk are reported on stderr and
    recorded as a ``test_progression_failed`` audit event.

    The walk runs as a task on the currently running event loop. When there
    is no running loop in the calling thread, it runs on a daemon thread
    with its own loop instead. The function returns immediately either way.

    Args:
        sub_dir: Directory of the test submission (holds state.json).
        sub_id: Test submission ID, used in audit entries and log messages.
    """

    def _write(state: str, **extra) -> None:
        # Read-modify-write so fields set by earlier steps are preserved.
        state_path = sub_dir / "state.json"
        data = json.loads(state_path.read_text()) if state_path.exists() else {}
        data["state"] = state
        data["test_mode"] = True
        data.update(extra)
        state_path.write_text(json.dumps(data, indent=2))

    async def _run() -> None:
        try:
            await asyncio.sleep(5)
            _write("in_review", review_started_at=_now_iso())
            _audit_append(
                {"sub_id": sub_id, "event": "review_started"},
                test_mode=True,
            )

            await asyncio.sleep(5)
            canned_panel = {
                "recommendation": "ACCEPT",
                "scores": {
                    "domain_fit": 7,
                    "methodological_transparency": 7,
                    "internal_consistency": 7,
                    "citation_integrity": 7,
                    "novelty_signal": 7,
                    "ai_provenance_signal": 7,
                },
                "comments": "[TEST MODE — canned response]",
                "scale_note": "scores are 7/10 (test scale, NOT production 1-5)",
            }
            _write("review_complete",
                   panel_completed_at=_now_iso(),
                   panel_result=canned_panel)
            _audit_append(
                {"sub_id": sub_id, "event": "review_completed",
                 "recommendation": "ACCEPT",
                 "panel_result": canned_panel},
                test_mode=True,
            )

            await asyncio.sleep(5)
            _write("completed",
                   completed_at=_now_iso(),
                   decision="accept",
                   panel_recommendation="ACCEPT")
            _audit_append(
                {"sub_id": sub_id, "event": "decision_finalized",
                 "verdict": "accept", "by": "test_mode"},
                test_mode=True,
            )
        except Exception as exc:
            print(f"test-mode progression failed for {sub_id}: {exc}",
                  file=sys.stderr)
            _audit_append(
                {"sub_id": sub_id, "event": "test_progression_failed",
                 "error": str(exc)[:200]},
                test_mode=True,
            )

    try:
        # get_running_loop() raises RuntimeError when no loop is running.
        # get_event_loop() could instead return an idle loop, on which the
        # task would be scheduled but never executed.
        loop = asyncio.get_running_loop()
    except RuntimeError:
        import threading
        threading.Thread(target=lambda: asyncio.run(_run()),
                         daemon=True).start()
        return

    task = loop.create_task(_run())
    _TEST_PROGRESSION_TASKS.add(task)
    task.add_done_callback(_TEST_PROGRESSION_TASKS.discard)
|
|
|
|
def handle_test_submission(submitter: dict, auth_orcid: str,
                           auth_name: str, source: str,
                           source_ref: str, title: str,
                           received_at: str) -> JSONResponse:
    """Handle a tier-1 test submission without touching the real pipeline.

    Allocates a TEST-prefixed ID, creates its directory under
    TEST_SUBMISSIONS_ROOT, and writes submission.json and state.json there
    with the ORCID replaced by a token derived from the ID. It then appends
    a test-mode ``submission_received`` audit entry and schedules the canned
    state progression. No queue marker is written by this function.

    Returns:
        A JSONResponse with ``sub_id``, ``status_url``, ``state``,
        ``received_at``, ``test_mode`` and ``tier``.
    """
    sub_id = _allocate_test_sub_id()
    token = _test_token(1, seed=sub_id)

    TEST_SUBMISSIONS_ROOT.mkdir(parents=True, exist_ok=True)
    sub_dir = TEST_SUBMISSIONS_ROOT / sub_id
    sub_dir.mkdir(parents=True, exist_ok=False)
    _tlog(f"T1 submission received: {sub_id} (auth_token={token})")

    # Copy of the form with the ORCID swapped for the token; the caller's
    # dict is left untouched.
    form_copy = dict(submitter)
    form_copy["orcid"] = token

    auth_block = {
        "orcid": token,
        "name_on_record": auth_name or None,
        "verified": bool(auth_orcid),
    }
    submission_record = {
        "sub_id": sub_id,
        "test_mode": True,
        "tier": 1,
        "received_at": received_at,
        "source": source,
        "source_ref": source_ref,
        "form": form_copy,
        "auth": auth_block,
        "title": title,
        "abstract": "[TEST MODE — canned response]",
    }
    submission_path = sub_dir / "submission.json"
    submission_path.write_text(json.dumps(submission_record, indent=2))

    initial_state = {
        "state": "received",
        "test_mode": True,
        "tier": 1,
        "received_at": received_at,
    }
    state_path = sub_dir / "state.json"
    state_path.write_text(json.dumps(initial_state, indent=2))

    audit_entry = {
        "sub_id": sub_id,
        "event": "submission_received",
        "tier": 1,
        "source": source,
        "source_ref": source_ref,
        "title": title[:200],
        "auth_orcid": token,
        "auth_verified": bool(auth_orcid),
    }
    _audit_append(audit_entry, test_mode=True)

    _test_state_progression(sub_dir, sub_id)

    payload = {
        "sub_id": sub_id,
        "status_url": f"/submission/{sub_id}",
        "state": "received",
        "received_at": received_at,
        "test_mode": True,
        "tier": 1,
    }
    return JSONResponse(payload)
|
|
|
|
| |
|
|
async def handle_test_pipeline_submission(
    *, tier: int, request: Request, form, submitter: dict,
    auth_orcid: str, auth_name: str,
) -> JSONResponse:
    """Accept a tier-2/3 test submission and queue it under the test tree.

    Applies the DOI-xor-PDF rule, validates the DOI shape or the upload
    metadata, and only then allocates a TEST-prefixed ID and creates its
    directory under TEST_SUBMISSIONS_ROOT. A rejected request therefore
    leaves no empty directory behind. For uploads the PDF is streamed to
    ``paper.pdf`` while its size and SHA-256 are accumulated, then checked
    for the ``%PDF-`` magic and an extractable text layer.

    The authenticated ORCID is replaced by a token from
    `_test_token(tier, seed=sub_id)` in the stored form, the auth block and
    any creator entry carrying that ORCID. submission.json and state.json
    are written, a marker is dropped in TEST_QUEUE_DIR, and two test-mode
    audit entries are appended (``submission_received``, ``tier_elevated``).
    On tier 3 a curator alert is sent through ``notify.send_to_curator``; a
    failure there is logged and does not fail the request.

    Args:
        tier: 2 or 3.
        request: The incoming request (currently unused here).
        form: Parsed multipart form.
        submitter: Validated submitter record; ``deposit_consent`` is added
            to it on the upload route.
        auth_orcid: ORCID asserted by the upstream proxy.
        auth_name: Display name asserted by the upstream proxy.

    Raises:
        ValueError: if `tier` is not 2 or 3.
        HTTPException: 400/413/415/422 for invalid input.
    """
    if tier not in (2, 3):
        raise ValueError(f"handle_test_pipeline_submission tier must be 2 or 3, got {tier!r}")

    pdf = form.get("pdf")
    has_pdf = pdf is not None and hasattr(pdf, "read") and getattr(pdf, "filename", "")
    raw_doi = form.get("doi", "") or ""
    doi = _normalize_doi(raw_doi if isinstance(raw_doi, str) else "")
    has_doi = bool(doi)

    if has_doi and has_pdf:
        raise HTTPException(400, {
            "error": "doi_xor_pdf",
            "message": "Provide a DOI OR a PDF upload, not both.",
        })
    if not has_doi and not has_pdf:
        raise HTTPException(400, {
            "error": "doi_or_pdf_required",
            "message": "Provide a DOI or upload a PDF.",
        })

    # All input validation that can fail with a 4xx happens before anything
    # is created on disk.
    upload_meta = None
    if has_doi:
        _validate_doi_shape(doi)
    else:
        upload_meta = _validate_upload_metadata({
            "title": form.get("title"),
            "abstract": form.get("abstract"),
            "keywords": form.get("keywords"),
            "license": form.get("license"),
            "resource_type": form.get("resource_type"),
            "publication_date": form.get("publication_date"),
            "subject": form.get("subject"),
            "funding": form.get("funding"),
            "creators": form.get("creators"),
            "related_identifiers": form.get("related_identifiers"),
            "deposit_consent": form.get("deposit_consent"),
        })

    sub_id = _allocate_test_sub_id()
    token = _test_token(tier, seed=sub_id)
    TEST_SUBMISSIONS_ROOT.mkdir(parents=True, exist_ok=True)
    sub_dir = TEST_SUBMISSIONS_ROOT / sub_id
    sub_dir.mkdir(parents=True, exist_ok=False)
    pdf_path = sub_dir / "paper.pdf"
    _tlog(f"T{tier} submission received: {sub_id} (auth_token={token})")

    def _discard_upload() -> None:
        # Remove the partial PDF and the (then empty) submission directory.
        pdf_path.unlink(missing_ok=True)
        try:
            sub_dir.rmdir()
        except OSError:
            pass

    if has_doi:
        # Metadata is filled with placeholders; pending_pdf_fetch below marks
        # the record as still needing its PDF.
        title = "(deferred — resolving from DOI)"
        abstract = ""
        keywords: list = []
        license_id = ""
        creators = [{"name": submitter["name"], "orcid": token}]
        publication_date = _now_iso()[:10]
        resource_type = None
        subject = None
        funding = None
        related_identifiers: list = []
        source = "doi"
        source_ref = doi
        pdf_size = 0
        pdf_sha = ""
    else:
        total = 0
        head = b""
        hasher = hashlib.sha256()
        too_large = False
        with pdf_path.open("wb") as out:
            while chunk := await pdf.read(64 * 1024):
                total += len(chunk)
                if total > MAX_PDF_BYTES:
                    too_large = True
                    break
                if not head:
                    head = chunk[:5]
                # Hash while streaming so the file never has to be read back
                # into memory in one piece.
                hasher.update(chunk)
                out.write(chunk)
        if too_large:
            _discard_upload()
            raise HTTPException(413,
                                f"PDF exceeds {MAX_PDF_BYTES // (1024*1024)} MB limit")
        if not head.startswith(b"%PDF-"):
            _discard_upload()
            raise HTTPException(415, "Uploaded file is not a PDF")

        text = ingest.extract_pdf_text(str(pdf_path))
        if len(text) < ingest.PDF_TEXT_MIN_CHARS:
            _discard_upload()
            raise HTTPException(422, {
                "error": "no_text_layer",
                "message": (
                    "The PDF has no extractable text layer "
                    f"({len(text)} chars). ICSAC reviews text-layer PDFs only — "
                    "image-only scans cannot be evaluated by the panel."
                ),
            })

        title = upload_meta["title"]
        abstract = upload_meta["abstract"]
        keywords = upload_meta["keywords"]
        license_id = upload_meta["license"]
        creators = upload_meta["creators"]
        # Redact the authenticated ORCID wherever it appears among creators.
        normalized_auth_orcid = _normalize_orcid(auth_orcid)
        for c in creators:
            if c.get("orcid") == normalized_auth_orcid:
                c["orcid"] = token
        publication_date = upload_meta["publication_date"]
        resource_type = upload_meta["resource_type"]
        subject = upload_meta["subject"]
        funding = upload_meta["funding"]
        related_identifiers = upload_meta["related_identifiers"]
        submitter["deposit_consent"] = upload_meta["deposit_consent"]
        source = "upload"
        source_ref = pdf.filename or "paper.pdf"
        pdf_size = total
        pdf_sha = hasher.hexdigest()

    redacted_form = {**submitter, "orcid": token}
    auth_block = {
        "orcid": token,
        "name_on_record": auth_name or None,
        "verified": bool(auth_orcid),
    }

    received_at = _now_iso()
    submission_record = {
        "sub_id": sub_id,
        "test_mode": True,
        "tier": tier,
        "received_at": received_at,
        "source": source,
        "source_ref": source_ref,
        "form": redacted_form,
        "auth": auth_block,
        "doi": doi if has_doi else None,
        "pending_pdf_fetch": has_doi,
        "title": title,
        "abstract": abstract,
        "keywords": keywords,
        "license": license_id,
        "creators": creators,
        "publication_date": publication_date,
        "resource_type": resource_type,
        "subject": subject,
        "funding": funding,
        "related_identifiers": related_identifiers,
        "pdf": {
            "filename": "paper.pdf",
            "size_bytes": pdf_size,
            "sha256": pdf_sha,
        } if not has_doi else None,
    }
    (sub_dir / "submission.json").write_text(
        json.dumps(submission_record, indent=2)
    )
    (sub_dir / "state.json").write_text(json.dumps({
        "state": "received",
        "test_mode": True,
        "tier": tier,
        "received_at": received_at,
    }, indent=2))

    # Queue marker goes to the test queue, never the production one.
    TEST_QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    (TEST_QUEUE_DIR / sub_id).write_text(received_at)

    _audit_append({
        "sub_id": sub_id,
        "event": "submission_received",
        "tier": tier,
        "source": source,
        "source_ref": source_ref,
        "title": title[:200],
        "license": license_id,
        "pdf_sha256": pdf_sha,
        "pdf_size_bytes": pdf_size,
        "auth_orcid": token,
        "auth_verified": bool(auth_orcid),
    }, test_mode=True)

    _audit_append({
        "sub_id": sub_id,
        "event": "tier_elevated",
        "tier": tier,
        "auth_orcid": token,
    }, test_mode=True)

    if tier == 3:
        # Best-effort curator alert; a notification failure must not fail
        # the submission.
        try:
            notify.send_to_curator(
                f"⚠️ T3 invoked: {token}, at {received_at}. "
                f"Pipeline running. Kill switch: TEST_TIERS_DISABLED=true.",
                parse_mode=None,
            )
        except Exception as exc:
            print(f"T3 tripwire alert failed: {exc}", file=sys.stderr)

    return JSONResponse({
        "sub_id": sub_id,
        "status_url": f"/submission/{sub_id}",
        "state": "received",
        "received_at": received_at,
        "test_mode": True,
        "tier": tier,
    })
|
|
|
|
| |
|
|
@app.get("/healthz")
def healthz():
    """Liveness probe: reports ``ok`` together with the current UTC timestamp."""
    payload = {"ok": True, "ts": _now_iso()}
    return payload
|
|
|
|
def _validate_doi_shape(doi: str) -> None:
    """Reject references that are not a supported DOI or arXiv identifier.

    Only the textual shape is checked; nothing is resolved or downloaded
    here. Accepted forms:

        Zenodo DOI : 10.5281/zenodo.NNNNN
        arXiv DOI  : 10.48550/arXiv.YYMM.NNNNN[vN]
        arXiv ID   : YYMM.NNNNN[vN]

    Raises:
        HTTPException: 422 ``doi_unsupported`` for a well-formed DOI from
            another registrant, 422 ``doi_invalid`` for anything else.
    """
    supported = ZENODO_DOI_RE.match(doi) or ARXIV_REF_RE.match(doi)
    if supported:
        return

    if DOI_SHAPE_RE.match(doi):
        error_code = "doi_unsupported"
        message = (
            "DOI mode supports Zenodo (10.5281/zenodo.NNNNN) and arXiv "
            "(10.48550/arXiv.YYMM.NNNNN, or a bare arXiv ID like "
            "2103.12345). Other DOI sources (Crossref, DataCite, "
            "publisher DOIs) are a known gap — for those, upload the "
            "PDF directly using the 'Upload PDF' tab."
        )
    else:
        error_code = "doi_invalid"
        message = (
            "That doesn't look like a Zenodo DOI, an arXiv DOI, or an "
            "arXiv ID. Expected forms: 10.5281/zenodo.NNNNN, "
            "10.48550/arXiv.YYMM.NNNNN, or YYMM.NNNNN. If you don't "
            "have any of those, switch to the 'Upload PDF' tab."
        )
    raise HTTPException(422, {"error": error_code, "message": message})
|
|
|
|
def _discard_upload_dir(sub_dir: Path, pdf_path: Path) -> None:
    """Best-effort removal of a rejected upload's PDF and its directory.

    Used on every upload rejection path so that a cleanup failure can never
    replace the 4xx the handler is about to raise with a 500.
    """
    pdf_path.unlink(missing_ok=True)
    try:
        sub_dir.rmdir()
    except OSError:
        # Leftover directory is harmless compared with masking the real
        # rejection reason; nothing else to do here.
        pass


@app.post("/api/submit")
async def api_submit(request: Request):
    """Accept one submission (DOI or PDF upload) and queue it for review.

    Flow, in order:
      1. Read the raw body, enforce the size cap, verify the HMAC.
      2. Parse the form and validate the submitter fields.
      3. If the authenticated ORCID marks this as a test submission,
         dispatch to the test handlers and return their result.
      4. Require exactly one of DOI / PDF and validate that input
         *before* any submission id or directory is allocated.
      5. Persist paper.pdf (uploads only), submission.json, state.json,
         a queue marker and an audit-log entry.
      6. Fire best-effort notifications and return the receipt.

    Raises:
        HTTPException: 413 for an oversized body or PDF, 400 when DOI and
            PDF are both present or both missing, 415 when the upload does
            not start with ``%PDF-``, 422 when the PDF has no text layer.
            The ``_verify_hmac`` / ``_validate_*`` helpers may raise their
            own errors.

    Returns:
        JSONResponse with ``sub_id``, ``status_url``, ``state`` and
        ``received_at``.
    """
    # The whole body is needed for the HMAC check, so the cap is enforced on
    # the buffered body (PDF cap plus 64 KiB of headroom for form fields).
    raw_body = await request.body()
    if len(raw_body) > MAX_PDF_BYTES + 64 * 1024:
        raise HTTPException(413,
            f"request body exceeds {MAX_PDF_BYTES // (1024*1024)} MB cap")
    _verify_hmac(request, raw_body)

    form = await request.form()

    submitter = _validate_submitter({
        "name": form.get("name"),
        "email": form.get("email"),
        "orcid": form.get("orcid", ""),
        "coi": form.get("coi", ""),
    })

    # Identity headers; presumably injected by the upstream proxy after ORCID
    # sign-in (verify against the proxy). The name arrives URL-encoded.
    auth_orcid = (request.headers.get("x-icsac-auth-orcid") or "").strip()
    auth_name_enc = (request.headers.get("x-icsac-auth-name") or "").strip()
    auth_name = unquote(auth_name_enc) if auth_name_enc else ""

    tier_header = (request.headers.get("x-icsac-test-tier") or "").strip()
    tier = _resolve_tier(auth_orcid, tier_header)

    if is_test_submission(auth_orcid):
        if tier == 1:
            raw_doi = form.get("doi", "") or ""
            doi_for_test = _normalize_doi(raw_doi if isinstance(raw_doi, str) else "")
            pdf_field = form.get("pdf")
            title = (form.get("title") or "").strip() or (
                "(test mode — DOI submission)"
                if doi_for_test
                else "(test mode — PDF submission)"
            )
            if doi_for_test:
                test_source, test_source_ref = "doi", doi_for_test
            elif pdf_field is not None and getattr(pdf_field, "filename", ""):
                test_source = "upload"
                test_source_ref = pdf_field.filename or "paper.pdf"
            else:
                test_source, test_source_ref = "test", "synthetic"
            if tier_header and not _tiers_disabled():
                # A tier was requested but the request resolved to tier 1:
                # record the downgrade in the test audit log.
                _audit_append({
                    "event": "tier_downgrade",
                    "tier_requested": tier_header[:8],
                    "tier_resolved": 1,
                    "auth_orcid": _test_token(1),
                }, test_mode=True)
            return handle_test_submission(
                submitter=submitter, auth_orcid=auth_orcid,
                auth_name=auth_name, source=test_source,
                source_ref=test_source_ref, title=title[:400],
                received_at=_now_iso(),
            )
        # Any other tier goes through the test pipeline handler.
        return await handle_test_pipeline_submission(
            tier=tier, request=request, form=form, submitter=submitter,
            auth_orcid=auth_orcid, auth_name=auth_name,
        )

    pdf = form.get("pdf")
    has_pdf = bool(
        pdf is not None and hasattr(pdf, "read") and getattr(pdf, "filename", "")
    )
    raw_doi = form.get("doi", "") or ""
    doi = _normalize_doi(raw_doi if isinstance(raw_doi, str) else "")
    has_doi = bool(doi)

    if has_doi and has_pdf:
        raise HTTPException(400, {
            "error": "doi_xor_pdf",
            "message": "Provide a DOI OR a PDF upload, not both.",
        })
    if not has_doi and not has_pdf:
        raise HTTPException(400, {
            "error": "doi_or_pdf_required",
            "message": "Provide a DOI or upload a PDF.",
        })

    # Validate the caller's input BEFORE allocating an id / creating the
    # directory, so a rejected form cannot leave an empty submission
    # directory behind or burn an id (assumes the validators reject by
    # raising, like the other _validate_* helpers — confirm).
    upload_meta = None
    if has_doi:
        _validate_doi_shape(doi)
    else:
        upload_meta = _validate_upload_metadata({
            "title": form.get("title"),
            "abstract": form.get("abstract"),
            "keywords": form.get("keywords"),
            "license": form.get("license"),
            "resource_type": form.get("resource_type"),
            "publication_date": form.get("publication_date"),
            "subject": form.get("subject"),
            "funding": form.get("funding"),
            "creators": form.get("creators"),
            "related_identifiers": form.get("related_identifiers"),
            "deposit_consent": form.get("deposit_consent"),
        })

    sub_id = _allocate_sub_id()
    sub_dir = SUBMISSIONS_ROOT / sub_id
    sub_dir.mkdir(parents=True, exist_ok=False)
    pdf_path = sub_dir / "paper.pdf"

    if has_doi:
        # No PDF yet: record placeholders and flag the record with
        # pending_pdf_fetch below; metadata is filled in from the DOI later
        # (outside this handler).
        title = "(deferred — resolving from DOI)"
        abstract = ""
        keywords: list = []
        license_id = ""
        creators = [{"name": submitter["name"], "orcid": submitter["orcid"]}]
        publication_date = _now_iso()[:10]
        resource_type = None
        subject = None
        funding = None
        related_identifiers: list = []
        source = "doi"
        source_ref = doi
        pdf_size = 0
        pdf_sha = ""
    else:
        # Stream the upload to disk in 64 KiB chunks, hashing as we go so
        # the file never has to be re-read into memory for its SHA-256.
        digest = hashlib.sha256()
        total = 0
        head = b""
        too_large = False
        with pdf_path.open("wb") as out:
            while chunk := await pdf.read(64 * 1024):
                total += len(chunk)
                if total > MAX_PDF_BYTES:
                    too_large = True
                    break
                if not head:
                    head = chunk[:5]
                digest.update(chunk)
                out.write(chunk)
        # Cleanup happens only after the file handle is closed.
        if too_large:
            _discard_upload_dir(sub_dir, pdf_path)
            raise HTTPException(413,
                f"PDF exceeds {MAX_PDF_BYTES // (1024*1024)} MB limit")
        if not head.startswith(b"%PDF-"):
            _discard_upload_dir(sub_dir, pdf_path)
            raise HTTPException(415, "Uploaded file is not a PDF")

        title = upload_meta["title"]
        abstract = upload_meta["abstract"]
        keywords = upload_meta["keywords"]
        license_id = upload_meta["license"]
        creators = upload_meta["creators"]
        publication_date = upload_meta["publication_date"]
        resource_type = upload_meta["resource_type"]
        subject = upload_meta["subject"]
        funding = upload_meta["funding"]
        related_identifiers = upload_meta["related_identifiers"]
        # Consent is stored on the submitter block, which is what gets
        # written under "form" in submission.json.
        submitter["deposit_consent"] = upload_meta["deposit_consent"]
        source = "upload"
        source_ref = pdf.filename or "paper.pdf"

        # Reject PDFs without enough extractable text before queueing.
        text = ingest.extract_pdf_text(str(pdf_path))
        if len(text) < ingest.PDF_TEXT_MIN_CHARS:
            _discard_upload_dir(sub_dir, pdf_path)
            raise HTTPException(422, {
                "error": "no_text_layer",
                "message": (
                    "The PDF has no extractable text layer "
                    f"({len(text)} chars). ICSAC reviews text-layer PDFs only — "
                    "image-only scans cannot be evaluated by the panel."
                ),
            })
        pdf_size = pdf_path.stat().st_size
        pdf_sha = digest.hexdigest()

    # "verified" simply reflects whether an auth ORCID header was present.
    auth_block = {
        "orcid": auth_orcid or None,
        "name_on_record": auth_name or None,
        "verified": bool(auth_orcid),
    }

    received_at = _now_iso()
    submission_record = {
        "sub_id": sub_id,
        "received_at": received_at,
        "source": source,
        "source_ref": source_ref,
        "form": submitter,
        "auth": auth_block,
        "doi": doi if has_doi else None,
        "pending_pdf_fetch": has_doi,
        "title": title,
        "abstract": abstract,
        "keywords": keywords,
        "license": license_id,
        "creators": creators,
        "publication_date": publication_date,
        "resource_type": resource_type,
        "subject": subject,
        "funding": funding,
        "related_identifiers": related_identifiers,
        "pdf": {
            "filename": "paper.pdf",
            "size_bytes": pdf_size,
            "sha256": pdf_sha,
        } if not has_doi else None,
    }
    (sub_dir / "submission.json").write_text(
        json.dumps(submission_record, indent=2)
    )
    (sub_dir / "state.json").write_text(
        json.dumps({"state": "received", "received_at": received_at}, indent=2)
    )

    # Queue marker: a file named after the submission id in queue/.
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    (QUEUE_DIR / sub_id).write_text(received_at)

    _audit_append({
        "sub_id": sub_id,
        "event": "submission_received",
        "source": source,
        "source_ref": source_ref,
        "title": title[:200],
        "license": license_id,
        "pdf_sha256": pdf_sha,
        "pdf_size_bytes": pdf_size,
        "auth_orcid": auth_orcid or None,
        "auth_verified": bool(auth_orcid),
    })

    # The "received" email is only sent for uploads here; DOI submissions
    # still carry the placeholder title at this point (presumably they are
    # emailed once the DOI has been resolved — confirm in the worker).
    if not has_doi:
        try:
            notify_author.send_received(
                to=submitter["email"], sub_id=sub_id,
                title=title, author_name=submitter["name"],
            )
        except Exception as exc:
            print(f"received email failed (non-fatal): {exc}", file=sys.stderr)
            _audit_append({"sub_id": sub_id, "event": "received_email_failed",
                           "error": str(exc)[:200]})

    # Curator notifications are best-effort: a failure must not fail the
    # submission, but it is logged instead of being silently dropped.
    try:
        notify.send_to_curator(
            f"ICSAC submission received\n\n"
            f"ID: {sub_id}\n"
            f"Source: {source} ({source_ref})\n"
            f"Title: {title[:120]}\n"
            f"Submitter: {submitter['name']} <{submitter['email']}>\n"
            f"License: {license_id or '(from DOI)'}\n"
            f"Status: queued for panel review",
            parse_mode=None,
        )
    except Exception as exc:
        print(f"curator alert failed (non-fatal): {exc}", file=sys.stderr)

    try:
        notify.send_ntfy(f"{sub_id}: {title[:120]}",
                         title="ICSAC submission received")
    except Exception as exc:
        print(f"ntfy push failed (non-fatal): {exc}", file=sys.stderr)

    return JSONResponse({
        "sub_id": sub_id,
        "status_url": f"/submission/{sub_id}",
        "state": "received",
        "received_at": received_at,
    })
|
|
|
|
@app.get("/api/submission/{sub_id}/state")
def api_submission_state(sub_id: str):
    """Return the public, read-only status of one submission.

    The id must match ``SUB_ID_RE`` or ``TEST_SUB_ID_RE``; ids matching the
    test pattern are looked up under ``TEST_SUBMISSIONS_ROOT``, all others
    under ``SUBMISSIONS_ROOT``.

    Raises:
        HTTPException: 400 for a malformed id, 404 when no ``state.json``
            exists for it, 503 when ``state.json`` cannot be parsed as a
            JSON object (e.g. read while it is being rewritten).

    Returns:
        dict with the state, UTC timestamps plus their display forms, the
        decision, and test-mode / tier information.
    """
    is_test = TEST_SUB_ID_RE.match(sub_id) is not None
    if not is_test and not SUB_ID_RE.match(sub_id):
        raise HTTPException(400, "bad submission id")

    root = TEST_SUBMISSIONS_ROOT if is_test else SUBMISSIONS_ROOT
    state_path = root / sub_id / "state.json"

    # EAFP: read directly instead of exists()-then-read, so a file that
    # vanishes between the two calls still yields a clean 404.
    try:
        raw_state = state_path.read_text()
    except (FileNotFoundError, NotADirectoryError):
        raise HTTPException(404, "no such submission") from None

    # state.json is written with a plain write_text elsewhere in this file,
    # so a concurrent poll may observe an empty or partial document. Report
    # that as a retryable 503 rather than an unhandled 500.
    try:
        data = json.loads(raw_state)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        raise HTTPException(503, "submission state temporarily unavailable")

    received_utc = data.get("received_at")
    completed_utc = data.get("completed_at")
    test_mode = bool(data.get("test_mode", False))
    return {
        "sub_id": sub_id,
        "state": data.get("state", "unknown"),
        "received_at": received_utc,
        "received_at_display": to_et_display(received_utc),
        "completed_at": completed_utc,
        "completed_at_display": to_et_display(completed_utc),
        "decision": data.get("decision"),
        "test_mode": test_mode,
        # The tier is only exposed for test-mode submissions.
        "tier": data.get("tier") if test_mode else None,
    }
|
|
|
|
@app.exception_handler(HTTPException)
async def _http_exc_handler(request: Request, exc: HTTPException):
    """Render an HTTPException as a flat JSON body.

    A dict ``detail`` is sent as the response body unchanged; any other
    ``detail`` is wrapped as ``{"error": detail}``. The exception's status
    code is used for the response, and any headers attached to the
    exception are forwarded rather than dropped.
    """
    detail = exc.detail
    payload = detail if isinstance(detail, dict) else {"error": detail}
    return JSONResponse(
        payload,
        status_code=exc.status_code,
        # None when the exception carries no headers, which JSONResponse
        # treats the same as omitting the argument.
        headers=getattr(exc, "headers", None),
    )
|
|