"""Small FastAPI proxy that creates items in a Zotero group library.

Items can be supplied as a canonical payload, as "friendly" flat fields,
as raw BibTeX, or as any combination of the three.
"""

import asyncio
import hashlib
import json
import re
from typing import Any, Dict, List, Optional
from urllib.parse import unquote, urlparse

import httpx
from fastapi import Body, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, ValidationError, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


# =========================
# Settings
# =========================
class Settings(BaseSettings):
    """Runtime configuration, read from the environment (case-sensitive names)."""

    # pydantic v2 style configuration; replaces the deprecated inner `Config`.
    model_config = SettingsConfigDict(case_sensitive=True)

    ZOTERO_API_KEY: str
    ZOTERO_DEFAULT_GROUP_ID: Optional[str] = None
    ZOTERO_API_VERSION: str = "3"
    HTTP_TIMEOUT_SECONDS: float = 20.0
    MAX_RETRIES: int = 4
    CORS_ALLOW_ORIGINS: str = "*"  # comma-separated or "*"


settings = Settings()


# =========================
# Models
# =========================
class Creator(BaseModel):
    """A single creator (author, editor, ...) of an item."""

    creatorType: str = Field("author", examples=["author", "editor"])
    firstName: str = ""
    lastName: str


class Tag(BaseModel):
    """A tag, in the object form used inside the item payload."""

    tag: str


class ItemModel(BaseModel):
    """
    Canonical Zotero item payload.
    - conferencePaper uses 'proceedingsTitle'
    - journalArticle uses 'publicationTitle'
    """

    itemType: str = Field(..., examples=["journalArticle", "conferencePaper"])
    title: str
    creators: List[Creator] = Field(default_factory=list)
    # Titles
    publicationTitle: Optional[str] = None  # journalArticle
    proceedingsTitle: Optional[str] = None  # conferencePaper
    conferenceName: Optional[str] = None
    abstractNote: Optional[str] = None
    date: Optional[str] = None
    pages: Optional[str] = None
    url: Optional[str] = None
    # common extras we'll map from BibTeX
    volume: Optional[str] = None
    issue: Optional[str] = None  # BibTeX 'number'
    publisher: Optional[str] = None
    place: Optional[str] = None  # BibTeX 'address'
    series: Optional[str] = None
    DOI: Optional[str] = None
    ISBN: Optional[str] = None
    tags: List[Tag] = Field(default_factory=list)
    collections: List[str] = Field(default_factory=list)
    extra: Optional[str] = None
    relations: Dict[str, Any] = Field(default_factory=dict)


class CreateRequest(BaseModel):
    """
    Public/ingress model. Accepts friendly aliases + raw BibTeX.
    """

    # validate_default=True is required for the validator below to run when
    # the field is omitted; pydantic v2 does not validate defaults otherwise.
    groupId: Optional[str] = Field(default=None, validate_default=True)
    writeToken: Optional[str] = None
    item: Optional[ItemModel] = None
    # Friendly fields
    title: Optional[str] = None
    authors: Optional[str] = None
    editors: Optional[str] = None
    itemType: Optional[str] = None
    # Titles (aliases)
    publicationTitle: Optional[str] = None
    proceedingsTitle: Optional[str] = None
    bookTitle: Optional[str] = None  # alias -> proceedingsTitle
    conferenceName: Optional[str] = None
    date: Optional[str] = None
    pages: Optional[str] = None
    url: Optional[str] = None
    abstractNote: Optional[str] = None
    tags: Optional[List[str]] = None
    collections: Optional[List[str]] = None
    # extra scholarly fields
    volume: Optional[str] = None
    issue: Optional[str] = None
    publisher: Optional[str] = None
    place: Optional[str] = None
    series: Optional[str] = None
    DOI: Optional[str] = None
    ISBN: Optional[str] = None
    # Optional raw BibTeX
    bibtex: Optional[str] = None

    @field_validator("groupId", mode="before")
    @classmethod
    def default_group(cls, v: Optional[str]) -> Optional[str]:
        """Fall back to the configured default group when none is given."""
        return v or settings.ZOTERO_DEFAULT_GROUP_ID


# =========================
# Helpers
# =========================
# Names are separated by ";" or by the word "and" surrounded by ANY
# whitespace, so a BibTeX author list wrapped across lines still splits.
AUTHOR_SPLIT_RE = re.compile(r"\s*;\s*|\s+and\s+", re.IGNORECASE)

# An unbraced, unquoted BibTeX value; only plain numbers (e.g. `year = 2020`).
_BT_BARE_RE = re.compile(r"[0-9]+")

# BibTeX entry type (lower-case) -> Zotero itemType. Unknown types pass through.
_BIBTEX_TYPE_MAP: Dict[str, str] = {
    "article": "journalArticle",
    "inproceedings": "conferencePaper",
    "conference": "conferencePaper",  # standard BibTeX alias of inproceedings
    "proceedings": "conferencePaper",
    "book": "book",
}

# (output key, BibTeX field name), in the order they are extracted.
_BIBTEX_FIELD_MAP = (
    ("title", "title"),
    ("authors", "author"),
    ("editors", "editor"),
    ("publicationTitle", "journal"),
    ("proceedingsTitle", "booktitle"),
    ("pages", "pages"),
    ("date", "year"),
    ("url", "url"),
    ("abstractNote", "abstract"),
    # extras
    ("series", "series"),
    ("volume", "volume"),
    ("issue", "number"),
    ("publisher", "publisher"),
    ("place", "address"),
    ("DOI", "doi"),
    ("ISBN", "isbn"),
)

# Item keys that parsed BibTeX may fill in when they are still missing.
_BIBTEX_FILL_FIELDS = (
    "itemType", "title", "publicationTitle", "proceedingsTitle",
    "conferenceName", "pages", "date", "url", "abstractNote", "series",
    "volume", "issue", "publisher", "place", "DOI", "ISBN",
)

# Flat request fields that override the item when they are non-empty.
_FRIENDLY_FIELDS = (
    "itemType", "title", "publicationTitle", "proceedingsTitle", "bookTitle",
    "conferenceName", "pages", "date", "url", "abstractNote",
    # extras
    "volume", "issue", "publisher", "place", "series", "DOI", "ISBN",
)

# Upstream statuses that are worth retrying.
_RETRYABLE_STATUSES = (429, 500, 502, 503, 504)


def parse_people(s: Optional[str], creator_type: str = "author") -> List[Creator]:
    """Parse a free-text list of names into ``Creator`` objects.

    Names are separated by ";" or "and". Each name may be written as
    "Last, First" or "First Middle Last"; a single word becomes the last name.

    Args:
        s: The raw name list; ``None`` or empty yields an empty list.
        creator_type: The ``creatorType`` assigned to every parsed name.

    Returns:
        One ``Creator`` per non-empty name, in input order.
    """
    if not s:
        return []
    people: List[Creator] = []
    for part in AUTHOR_SPLIT_RE.split(s.strip()):
        # Collapse internal runs of whitespace (including line breaks).
        name = " ".join(part.split())
        if not name:
            continue
        if "," in name:
            last, first = (piece.strip() for piece in name.split(",", 1))
        else:
            *given, last = name.split()
            first = " ".join(given)
        if not (last or first):
            continue  # e.g. a lone ","
        people.append(Creator(creatorType=creator_type, firstName=first, lastName=last))
    return people


def _bt_read_value(bib: str, start: int) -> Optional[str]:
    """Read one BibTeX field value beginning at ``bib[start]``.

    Supports ``{...}`` with balanced nested braces, ``"..."`` and bare numbers.
    Returns the stripped inner text, or ``None`` when no value can be read.
    """
    if start >= len(bib):
        return None
    opener = bib[start]
    if opener == "{":
        depth = 0
        for idx in range(start, len(bib)):
            ch = bib[idx]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return bib[start + 1:idx].strip()
        return None  # unbalanced braces
    if opener == '"':
        end = bib.find('"', start + 1)
        return bib[start + 1:end].strip() if end != -1 else None
    bare = _BT_BARE_RE.match(bib, start)
    return bare.group(0) if bare else None


def _bt_get(bib: str, name: str) -> Optional[str]:
    """Return the value of BibTeX field ``name`` in ``bib``, or ``None``.

    The field name is matched case-insensitively and only as a whole name, so
    looking up ``title`` does not pick up ``booktitle``. Nested braces inside
    a braced value are kept verbatim rather than truncating the value.
    """
    field_re = re.compile(rf"(?<![\w-]){re.escape(name)}\s*=\s*", re.IGNORECASE)
    for match in field_re.finditer(bib):
        value = _bt_read_value(bib, match.end())
        if value:
            return value
    return None


def parse_bibtex(bib: str) -> Dict[str, Any]:
    """
    Very light BibTeX parsing.

    Returns a dict holding only the fields that were found and non-empty.
    Keys follow the request's naming (``authors``/``editors`` stay raw
    strings); ``itemType`` is mapped from the entry type when one is present.
    """
    out: Dict[str, Any] = {}
    entry = re.search(r"@\s*(\w+)\s*\{", bib)
    if entry:
        entry_type = entry.group(1)
        out["itemType"] = _BIBTEX_TYPE_MAP.get(entry_type.lower(), entry_type)
    for key, field in _BIBTEX_FIELD_MAP:
        value = _bt_get(bib, field)
        if value:
            out[key] = value
    return out


def normalize_titles_for_type(item: Dict[str, Any]) -> None:
    """
    Normalize title fields based on itemType.

    Mutates ``item`` in place:
    - conferencePaper: fill ``proceedingsTitle`` from ``bookTitle`` or
      ``publicationTitle`` when missing, then drop both of those keys.
    - journalArticle: drop ``proceedingsTitle`` and ``bookTitle``.
    - anything else: move ``proceedingsTitle`` into ``publicationTitle`` when
      the latter is missing, then drop ``bookTitle``.
    """
    itype = (item.get("itemType") or "").lower()
    if itype == "conferencepaper":
        if not item.get("proceedingsTitle"):
            if item.get("bookTitle"):
                item["proceedingsTitle"] = item.pop("bookTitle")
            elif item.get("publicationTitle"):
                item["proceedingsTitle"] = item["publicationTitle"]
        item.pop("bookTitle", None)
        item.pop("publicationTitle", None)
    elif itype == "journalarticle":
        # prefer publicationTitle; drop conference fields
        item.pop("proceedingsTitle", None)
        item.pop("bookTitle", None)
    else:
        # Coalesce: if publicationTitle missing, reuse proceedingsTitle
        if not item.get("publicationTitle") and item.get("proceedingsTitle"):
            item["publicationTitle"] = item.pop("proceedingsTitle")
        item.pop("bookTitle", None)


def guess_title_from_url(u: Optional[str]) -> Optional[str]:
    """Derive a human-readable title from the last path segment of a URL.

    The segment's extension is removed, percent-escapes are decoded, runs of
    "-" and "_" become single spaces and the first letter is upper-cased.
    Returns ``None`` when there is no usable path segment or the URL cannot
    be parsed.
    """
    if not u:
        return None
    try:
        path = urlparse(u).path
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 host).
        return None
    segments = [seg for seg in path.split("/") if seg]
    if not segments:
        return None
    slug = segments[-1]
    # strip extension
    if "." in slug:
        slug = slug.rsplit(".", 1)[0]
    text = re.sub(r"[-_]+", " ", unquote(slug)).strip()
    if not text:
        return None
    return text[:1].upper() + text[1:]


def merge_request_to_item(req: CreateRequest) -> ItemModel:
    """
    Merge precedence:
      1) req.item (canonical)
      2) parsed bibtex (fills missing)
      3) friendly fields on req (override/add)
      4) normalize titles
      5) ensure title (fallback from URL slug if still missing)

    Raises:
        HTTPException: 422 when the merged payload fails ``ItemModel``
            validation (for example no ``itemType`` or no ``title``).
    """
    # (1) Canonical item, round-tripped through JSON into plain dicts/lists.
    item: Dict[str, Any] = json.loads(req.item.model_dump_json()) if req.item else {}

    # (2) From BibTeX: only fill what is still missing or empty.
    if req.bibtex:
        bt = parse_bibtex(req.bibtex)
        for key in _BIBTEX_FILL_FIELDS:
            if bt.get(key) and not item.get(key):
                item[key] = bt[key]
        # creators: a canonical item always carries a (possibly empty)
        # "creators" list, so test for emptiness rather than key presence.
        if bt.get("authors") and not item.get("creators"):
            item["creators"] = parse_people(bt["authors"], "author")
        if bt.get("editors"):
            item["creators"] = list(item.get("creators") or []) + parse_people(
                bt["editors"], "editor"
            )

    # (3) Friendly fields (override/add)
    for name in _FRIENDLY_FIELDS:
        value = getattr(req, name)
        if value:
            item[name] = value

    # creators (authors/editors text) replace whatever was collected so far
    creators: List[Creator] = []
    creators.extend(parse_people(req.authors, "author"))
    creators.extend(parse_people(req.editors, "editor"))
    if creators:
        item["creators"] = creators

    # tags/collections: an explicit empty list is honoured, None is not
    if req.tags is not None:
        item["tags"] = [Tag(tag=t) for t in req.tags]
    if req.collections is not None:
        item["collections"] = req.collections

    # (4) Normalize titles by type
    normalize_titles_for_type(item)

    # (5) Ensure title: fallback from URL slug if still missing
    if not item.get("title"):
        guessed = guess_title_from_url(item.get("url"))
        if guessed:
            item["title"] = guessed

    # Final validation
    try:
        return ItemModel(**item)
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=f"Invalid item payload: {e}") from e


def default_write_token(item: ItemModel) -> str:
    """Build a deterministic 32-hex-character token from type, title and date.

    The same (itemType, title, date) triple always yields the same token.
    """
    base = f"{item.itemType}|{item.title}|{item.date or ''}"
    return hashlib.sha256(base.encode("utf-8")).hexdigest()[:32]


async def zotero_post_items(group_id: str, items: List[ItemModel], write_token: str) -> Dict[str, Any]:
    """POST ``items`` to the Zotero group library, retrying transient failures.

    Transport errors and the statuses 429/500/502/503/504 are retried up to
    ``settings.MAX_RETRIES`` times with exponential backoff (1s, 2s, 4s, ...),
    or the ``Retry-After`` delay when that is longer.

    Args:
        group_id: Numeric Zotero group ID.
        items: Items to create; ``None`` fields are omitted from the payload.
        write_token: Value sent in the ``Zotero-Write-Token`` header.

    Returns:
        ``{"status": <http status>, "zotero": <decoded body or {}>}``.

    Raises:
        HTTPException: 400 for a non-numeric ``group_id``; the upstream status
            for non-retryable responses; 502 once retries are exhausted.
    """
    # group_id comes from the client and is placed in the URL path; accept
    # digits only so it cannot redirect the request to another API path.
    if not re.fullmatch(r"[0-9]+", group_id or ""):
        raise HTTPException(status_code=400, detail="groupId must be a numeric Zotero group ID.")

    url = f"https://api.zotero.org/groups/{group_id}/items"
    headers = {
        "Zotero-API-Key": settings.ZOTERO_API_KEY,
        "Zotero-API-Version": settings.ZOTERO_API_VERSION,
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Zotero-Write-Token": write_token,
    }
    data = [json.loads(i.model_dump_json(exclude_none=True)) for i in items]
    timeout = httpx.Timeout(settings.HTTP_TIMEOUT_SECONDS)
    last_error: Optional[str] = None

    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(settings.MAX_RETRIES + 1):
            delay = 2 ** attempt
            try:
                resp = await client.post(url, headers=headers, json=data)
            except httpx.HTTPError as e:
                last_error = str(e) or type(e).__name__
            else:
                if resp.status_code in (200, 201, 204):
                    return {"status": resp.status_code, "zotero": resp.json() if resp.text else {}}
                if resp.status_code not in _RETRYABLE_STATUSES:
                    raise HTTPException(status_code=resp.status_code, detail=resp.text)
                last_error = f"HTTP {resp.status_code}"
                retry_after = resp.headers.get("Retry-After")
                if retry_after:
                    try:
                        delay = max(int(retry_after), delay)
                    except ValueError:
                        pass  # not an integer number of seconds: keep backoff
            # Do not sleep after the final attempt; just report the failure.
            if attempt < settings.MAX_RETRIES:
                await asyncio.sleep(delay)

    raise HTTPException(status_code=502, detail=f"Upstream Zotero error or timeout: {last_error}")


# =========================
# FastAPI app
# =========================
app = FastAPI(title="Zotero Proxy", version="1.2.0")

allow_origins = ["*"] if settings.CORS_ALLOW_ORIGINS.strip() == "*" else [
    o.strip() for o in settings.CORS_ALLOW_ORIGINS.split(",") if o.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    # Credentialed CORS must not be combined with a wildcard origin; only
    # enable it when an explicit origin list has been configured.
    allow_credentials=allow_origins != ["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


def _split_csv(value: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated query value, dropping blank entries."""
    if not value:
        return None
    parts = [piece.strip() for piece in value.split(",")]
    return [piece for piece in parts if piece] or None


@app.get("/ping")
async def ping():
    """Liveness check."""
    return {"ok": True}


@app.post("/items")
async def create_item(req: CreateRequest = Body(...)):
    """Create one item in the target group from a JSON request body.

    Responds with the group ID, the write token used and the upstream result.
    """
    group_id = req.groupId or settings.ZOTERO_DEFAULT_GROUP_ID
    if not group_id:
        raise HTTPException(status_code=400, detail="groupId is required (or set ZOTERO_DEFAULT_GROUP_ID).")

    item = merge_request_to_item(req)

    # If still no title after all fallbacks, fail clearly:
    if not item.title:
        raise HTTPException(status_code=422, detail="Missing 'title'. Provide it directly, via BibTeX, or a URL I can slug.")

    write_token = req.writeToken or default_write_token(item)
    result = await zotero_post_items(group_id, [item], write_token)
    return {"groupId": group_id, "writeToken": write_token, **result}


@app.get("/items")
async def create_item_from_query(
    groupId: Optional[str] = Query(None),
    title: Optional[str] = Query(None),
    authors: Optional[str] = Query(None),
    editors: Optional[str] = Query(None),
    itemType: Optional[str] = Query(None),
    # Titles
    publicationTitle: Optional[str] = Query(None),
    proceedingsTitle: Optional[str] = Query(None),
    bookTitle: Optional[str] = Query(None),
    conferenceName: Optional[str] = Query(None),
    date: Optional[str] = Query(None),
    pages: Optional[str] = Query(None),
    url: Optional[str] = Query(None),
    abstractNote: Optional[str] = Query(None),
    tags: Optional[str] = Query(None),  # comma-separated
    collections: Optional[str] = Query(None),  # comma-separated
    # extras
    volume: Optional[str] = Query(None),
    issue: Optional[str] = Query(None),
    publisher: Optional[str] = Query(None),
    place: Optional[str] = Query(None),
    series: Optional[str] = Query(None),
    DOI: Optional[str] = Query(None),
    ISBN: Optional[str] = Query(None),
    writeToken: Optional[str] = Query(None),
    bibtex: Optional[str] = Query(None),
):
    """Query-string variant of ``POST /items``.

    NOTE: this GET route creates an item, exactly like the POST route.
    ``tags`` and ``collections`` are comma-separated; blank entries are ignored.
    """
    req = CreateRequest(
        groupId=groupId or settings.ZOTERO_DEFAULT_GROUP_ID,
        writeToken=writeToken,
        title=title,
        authors=authors,
        editors=editors,
        itemType=itemType,
        publicationTitle=publicationTitle,
        proceedingsTitle=proceedingsTitle,
        bookTitle=bookTitle,
        conferenceName=conferenceName,
        date=date,
        pages=pages,
        url=url,
        abstractNote=abstractNote,
        tags=_split_csv(tags),
        collections=_split_csv(collections),
        volume=volume,
        issue=issue,
        publisher=publisher,
        place=place,
        series=series,
        DOI=DOI,
        ISBN=ISBN,
        bibtex=bibtex,
    )
    return await create_item(req)