# zotero-proxy / app.py
# iurbinah's picture
# Update app.py
# 333d35b verified
import asyncio
import hashlib
import json
import re
from typing import Any, Dict, List, Optional
from urllib.parse import unquote, urlparse

import httpx
from fastapi import Body, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# =========================
# Settings
# =========================
class Settings(BaseSettings):
    """Runtime configuration for the proxy.

    Values are populated by ``BaseSettings``; setting names are matched
    case-sensitively.
    """

    # Zotero API key sent with every upstream request (required, no default).
    ZOTERO_API_KEY: str
    # Group library used when a request does not name one.
    ZOTERO_DEFAULT_GROUP_ID: Optional[str] = None
    # Value of the ``Zotero-API-Version`` request header.
    ZOTERO_API_VERSION: str = "3"
    # Timeout, in seconds, given to the upstream HTTP client.
    HTTP_TIMEOUT_SECONDS: float = 20.0
    # Upper bound on the retry counter used when posting to Zotero.
    MAX_RETRIES: int = 4
    # Comma-separated list of allowed CORS origins, or "*" for any origin.
    CORS_ALLOW_ORIGINS: str = "*"

    # Pydantic v2 style configuration; replaces the deprecated nested
    # ``class Config`` while keeping the same case-sensitive matching.
    model_config = SettingsConfigDict(case_sensitive=True)


# Built at import time, so a missing required setting fails immediately.
settings = Settings()
# =========================
# Models
# =========================
# One contributor to an item, stored as a role plus first/last name.
# (Documented with comments rather than a docstring so the generated API
# schema stays exactly as it was.)
class Creator(BaseModel):
    # Contributor role; the examples shown are "author" and "editor".
    creatorType: str = Field(default="author", examples=["author", "editor"])
    # Given name(s); defaults to the empty string.
    firstName: str = Field(default="")
    # Family name (required).
    lastName: str
# A single tag, wrapped in an object with one ``tag`` key.
class Tag(BaseModel):
    # The tag text (required).
    tag: str
class ItemModel(BaseModel):
    """
    Canonical Zotero item payload.
    - conferencePaper uses 'proceedingsTitle'
    - journalArticle uses 'publicationTitle'
    """

    # --- required core fields -------------------------------------------
    itemType: str = Field(..., examples=["journalArticle", "conferencePaper"])
    title: str
    creators: List[Creator] = []

    # --- container titles (which one applies depends on itemType) -------
    publicationTitle: Optional[str] = None  # used for journalArticle
    proceedingsTitle: Optional[str] = None  # used for conferencePaper
    conferenceName: Optional[str] = None

    # --- general descriptive fields -------------------------------------
    abstractNote: Optional[str] = None
    date: Optional[str] = None
    pages: Optional[str] = None
    url: Optional[str] = None

    # --- extra scholarly fields, filled from BibTeX where available -----
    volume: Optional[str] = None
    issue: Optional[str] = None  # comes from BibTeX 'number'
    publisher: Optional[str] = None
    place: Optional[str] = None  # comes from BibTeX 'address'
    series: Optional[str] = None
    DOI: Optional[str] = None
    ISBN: Optional[str] = None

    # --- organisation and free-form data --------------------------------
    tags: List[Tag] = []
    collections: List[str] = []
    extra: Optional[str] = None
    relations: Dict[str, Any] = {}
class CreateRequest(BaseModel):
    """
    Public/ingress model. Accepts friendly aliases + raw BibTeX.
    """

    # Target group library. ``validate_default=True`` makes the validator
    # below run even when the field is omitted, so the configured default
    # group is applied in that case too.
    groupId: Optional[str] = Field(default=None, validate_default=True)
    # Caller-supplied write token; one is derived from the item when absent.
    writeToken: Optional[str] = None
    # Fully-formed canonical item; the friendly fields below are merged on top.
    item: Optional[ItemModel] = None

    # Friendly fields
    title: Optional[str] = None
    authors: Optional[str] = None  # free-text list, parsed by parse_people
    editors: Optional[str] = None  # free-text list, parsed by parse_people
    itemType: Optional[str] = None

    # Titles (aliases)
    publicationTitle: Optional[str] = None
    proceedingsTitle: Optional[str] = None
    bookTitle: Optional[str] = None  # alias → proceedingsTitle
    conferenceName: Optional[str] = None

    date: Optional[str] = None
    pages: Optional[str] = None
    url: Optional[str] = None
    abstractNote: Optional[str] = None
    tags: Optional[List[str]] = None
    collections: Optional[List[str]] = None

    # extra scholarly fields
    volume: Optional[str] = None
    issue: Optional[str] = None
    publisher: Optional[str] = None
    place: Optional[str] = None
    series: Optional[str] = None
    DOI: Optional[str] = None
    ISBN: Optional[str] = None

    # Optional raw BibTeX
    bibtex: Optional[str] = None

    @field_validator("groupId", mode="before")
    @classmethod
    def default_group(cls, v: Any) -> Optional[str]:
        """Fall back to the configured default group and normalise the type.

        Any falsy input (missing, ``None``, empty string) yields
        ``settings.ZOTERO_DEFAULT_GROUP_ID``. A JSON number is converted to
        its string form so that numeric group ids are accepted.
        """
        if not v:
            return settings.ZOTERO_DEFAULT_GROUP_ID
        # bool is a subclass of int; leave it alone so it is still rejected.
        if isinstance(v, int) and not isinstance(v, bool):
            return str(v)
        return v
# =========================
# Helpers
# =========================
# Separators between people: a semicolon, or the word "and" surrounded by any
# whitespace. Allowing arbitrary whitespace (not just single spaces) lets
# lists that are wrapped across lines split correctly.
AUTHOR_SPLIT_RE = re.compile(r"\s*;\s*|\s+and\s+", re.IGNORECASE)


def parse_people(s: Optional[str], creator_type: str = "author") -> List[Creator]:
    """Parse a free-text list of names into ``Creator`` objects.

    Names are separated by ``;`` or the word ``and``. Each name is either
    ``"Last, First"`` or ``"First Middle Last"``; a single word is treated as
    a last name with an empty first name.

    Args:
        s: The raw name list, or ``None``/empty for no people.
        creator_type: Value stored in each creator's ``creatorType``.

    Returns:
        One ``Creator`` per non-empty name, in input order.
    """
    if not s:
        return []

    people: List[Creator] = []
    for raw in AUTHOR_SPLIT_RE.split(s.strip()):
        part = raw.strip()
        if not part:
            continue
        if "," in part:
            last, first = (piece.strip() for piece in part.split(",", 1))
            # Collapse line breaks / repeated spaces inside the given names.
            first = " ".join(first.split())
        else:
            *given, last = part.split()
            first = " ".join(given)
        if not last and not first:
            # A lone comma carries no name at all.
            continue
        people.append(Creator(creatorType=creator_type, firstName=first, lastName=last))
    return people
def _bt_get(bib: str, name: str) -> Optional[str]:
mm = re.search(rf"{name}\s*=\s*\{{([^}}]+)\}}", bib, re.IGNORECASE)
if mm:
return mm.group(1).strip()
# also support quotes "..."
mm = re.search(rf"{name}\s*=\s*\"([^\"]+)\"", bib, re.IGNORECASE)
return mm.group(1).strip() if mm else None
def parse_bibtex(bib: str) -> Dict[str, Any]:
    """
    Very light BibTeX parsing.

    Reads the entry type and a fixed set of fields out of ``bib`` and returns
    them under this service's field names. Fields that are absent or empty
    are left out of the result.
    """
    # BibTeX entry type (lower-cased) -> item type; unknown types pass through.
    entry_type_map = {
        "article": "journalArticle",
        "inproceedings": "conferencePaper",
        "proceedings": "conferencePaper",
        "book": "book",
    }
    # (output key, BibTeX field name), in the order the keys are emitted.
    field_map = (
        ("title", "title"),
        ("authors", "author"),
        ("editors", "editor"),
        ("publicationTitle", "journal"),
        ("proceedingsTitle", "booktitle"),
        ("pages", "pages"),
        ("date", "year"),
        ("url", "url"),
        ("abstractNote", "abstract"),
        # extras
        ("series", "series"),
        ("volume", "volume"),
        ("issue", "number"),
        ("publisher", "publisher"),
        ("place", "address"),
        ("DOI", "doi"),
        ("ISBN", "isbn"),
    )

    parsed: Dict[str, Any] = {}
    header = re.search(r"@\s*(\w+)\s*\{", bib)
    if header:
        raw_type = header.group(1)
        parsed["itemType"] = entry_type_map.get(raw_type.lower(), raw_type)

    for key, bib_name in field_map:
        parsed[key] = _bt_get(bib, bib_name)

    # Drop everything that was not found.
    return {key: value for key, value in parsed.items() if value}
def normalize_titles_for_type(item: Dict[str, Any]) -> None:
    """
    Normalize title fields based on itemType.

    Mutates ``item`` in place:
    - conferencePaper: keeps only ``proceedingsTitle``, filling it from
      ``bookTitle`` or else ``publicationTitle`` when it is empty.
    - journalArticle: drops ``proceedingsTitle``.
    - anything else: moves ``proceedingsTitle`` into ``publicationTitle``
      when the latter is empty.
    ``bookTitle`` is removed in every case.
    """
    kind = (item.get("itemType") or "").lower()

    if kind == "journalarticle":
        # prefer publicationTitle; drop conference fields
        item.pop("proceedingsTitle", None)
    elif kind == "conferencepaper":
        if not item.get("proceedingsTitle"):
            fallback = item.get("bookTitle") or item.get("publicationTitle")
            if fallback:
                item["proceedingsTitle"] = fallback
        item.pop("publicationTitle", None)
    elif item.get("proceedingsTitle") and not item.get("publicationTitle"):
        # Coalesce: if publicationTitle missing, reuse proceedingsTitle
        item["publicationTitle"] = item.pop("proceedingsTitle")

    # The alias never survives normalisation, whatever the item type.
    item.pop("bookTitle", None)
def guess_title_from_url(u: Optional[str]) -> Optional[str]:
    """Derive a readable title from the last path segment of a URL.

    The segment loses everything after its final ``.``, is percent-decoded,
    has runs of ``-``/``_`` turned into single spaces, and gets its first
    character upper-cased. Returns ``None`` when there is no URL, no usable
    path segment, or any step raises.
    """
    if not u:
        return None
    try:
        segments = list(filter(None, urlparse(u).path.split("/")))
        if not segments:
            return None

        # strip extension
        stem, dot, _ext = segments[-1].rpartition(".")
        slug = stem if dot else segments[-1]

        words = re.sub(r"[-_]+", " ", unquote(slug)).strip()
        if words:
            return words[:1].upper() + words[1:]
        return None
    except Exception:
        # Best effort only: a URL that cannot be processed yields no title.
        return None
def merge_request_to_item(req: CreateRequest) -> ItemModel:
    """
    Build the canonical item for a create request.

    Merge precedence:
    1) req.item (canonical)
    2) parsed bibtex (fills missing)
    3) friendly fields on req (override/add)
    4) normalize titles
    5) ensure title (fallback from URL slug if still missing)

    Raises:
        HTTPException: 422 when the merged data does not validate as an
            ``ItemModel``.
    """
    # (1) Canonical item as a plain, JSON-compatible dict.
    item: Dict[str, Any] = req.item.model_dump(mode="json") if req.item else {}

    # (2) From BibTeX
    if req.bibtex:
        bt = parse_bibtex(req.bibtex)
        if bt.get("itemType") and not item.get("itemType"):
            item["itemType"] = bt["itemType"]
        for key in (
            "title", "publicationTitle", "proceedingsTitle", "conferenceName",
            "pages", "date", "url", "abstractNote", "series", "volume", "issue",
            "publisher", "place", "DOI", "ISBN",
        ):
            if bt.get(key) and not item.get(key):
                item[key] = bt[key]
        # creators: a dumped canonical item always has a "creators" key
        # (possibly an empty list), so test for emptiness, not key presence.
        if bt.get("authors") and not item.get("creators"):
            item["creators"] = parse_people(bt["authors"], "author")
        if bt.get("editors"):
            item.setdefault("creators", [])
            item["creators"].extend(parse_people(bt["editors"], "editor"))

    # (3) Friendly fields (override/add); only non-empty values are applied.
    for key in (
        "itemType", "title", "publicationTitle", "proceedingsTitle",
        "bookTitle", "conferenceName", "pages", "date", "url", "abstractNote",
        # extras
        "volume", "issue", "publisher", "place", "series", "DOI", "ISBN",
    ):
        value = getattr(req, key)
        if value:
            item[key] = value

    # creators (authors/editors text) replace whatever was collected so far
    creators: List[Creator] = [
        *parse_people(req.authors, "author"),
        *parse_people(req.editors, "editor"),
    ]
    if creators:
        item["creators"] = creators

    # tags/collections: an explicit empty list is honoured, None is ignored
    if req.tags is not None:
        item["tags"] = [Tag(tag=t) for t in req.tags]
    if req.collections is not None:
        item["collections"] = req.collections

    # (4) Normalize titles by type
    normalize_titles_for_type(item)

    # (5) Ensure title: fallback from URL slug if still missing
    if not item.get("title"):
        guessed = guess_title_from_url(item.get("url"))
        if guessed:
            item["title"] = guessed

    # Final validation
    try:
        return ItemModel(**item)
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"Invalid item payload: {e}") from e
def default_write_token(item: ItemModel) -> str:
    """Derive a 32-character write token from the item's type, title and date.

    The token is the first 32 hex digits of the SHA-256 of
    ``"<itemType>|<title>|<date>"`` (a missing date counts as empty), so the
    same three values always give the same token.
    """
    fingerprint = "|".join((item.itemType, item.title, item.date or ""))
    digest = hashlib.sha256(fingerprint.encode("utf-8"))
    return digest.hexdigest()[:32]
async def zotero_post_items(group_id: str, items: List[ItemModel], write_token: str) -> Dict[str, Any]:
    """POST ``items`` to a Zotero group library, retrying transient failures.

    Responses with status 429/500/502/503/504 and ``httpx.HTTPError``
    transport failures are retried up to ``settings.MAX_RETRIES`` times with
    exponential backoff (1s, 2s, 4s, ...). A numeric ``Retry-After`` header
    lengthens the wait when it is larger than the backoff.

    Args:
        group_id: Group library id used in the request URL.
        items: Items to create; ``None``-valued fields are omitted.
        write_token: Sent as the ``Zotero-Write-Token`` header.

    Returns:
        ``{"status": <http status>, "zotero": <decoded body or {}>}`` for a
        200/201/204 response.

    Raises:
        HTTPException: with the upstream status and body for any other
            non-retryable response, or 502 once all attempts are used up.
    """
    url = f"https://api.zotero.org/groups/{group_id}/items"
    headers = {
        "Zotero-API-Key": settings.ZOTERO_API_KEY,
        "Zotero-API-Version": settings.ZOTERO_API_VERSION,
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Zotero-Write-Token": write_token,
    }
    payload = [entry.model_dump(mode="json", exclude_none=True) for entry in items]
    retryable = (429, 500, 502, 503, 504)
    timeout = httpx.Timeout(settings.HTTP_TIMEOUT_SECONDS)
    # Description of the most recent failure, reported if every attempt fails.
    last_error: Optional[str] = None

    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(settings.MAX_RETRIES + 1):
            delay = 2 ** attempt
            try:
                resp = await client.post(url, headers=headers, json=payload)
            except httpx.HTTPError as exc:
                # Some transport errors carry no message; fall back to the type.
                last_error = str(exc) or type(exc).__name__
            else:
                if resp.status_code in (200, 201, 204):
                    return {"status": resp.status_code, "zotero": resp.json() if resp.text else {}}
                if resp.status_code not in retryable:
                    raise HTTPException(status_code=resp.status_code, detail=resp.text)
                last_error = f"HTTP {resp.status_code}: {resp.text}"
                retry_after = resp.headers.get("Retry-After")
                if retry_after:
                    try:
                        delay = max(int(retry_after), delay)
                    except ValueError:
                        # Non-numeric Retry-After: keep the plain backoff.
                        pass
            # Waiting only makes sense when another attempt will follow.
            if attempt < settings.MAX_RETRIES:
                await asyncio.sleep(delay)

    raise HTTPException(status_code=502, detail=f"Upstream Zotero error or timeout: {last_error}")
# =========================
# FastAPI app
# =========================
app = FastAPI(title="Zotero Proxy", version="1.2.0")

# Allowed CORS origins: a lone "*" means any origin; otherwise the setting is
# a comma-separated list whose blank entries are ignored.
if settings.CORS_ALLOW_ORIGINS.strip() == "*":
    allow_origins = ["*"]
else:
    allow_origins = list(
        filter(None, map(str.strip, settings.CORS_ALLOW_ORIGINS.split(",")))
    )

# NOTE(review): credentials are enabled together with wildcard methods and
# headers (and, by default, wildcard origins) — confirm this combination is
# intended against the CORS middleware documentation.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Trivial health check: always responds with {"ok": true}.
@app.get("/ping")
async def ping():
    return dict(ok=True)
# Create one item from a JSON body.
# Resolves the target group, merges the request into a canonical item, picks a
# write token (the caller's, or one derived from the item) and forwards the
# item to Zotero. The response echoes the group id and write token alongside
# the upstream result.
@app.post("/items")
async def create_item(req: CreateRequest = Body(...)):
    group_id = req.groupId if req.groupId else settings.ZOTERO_DEFAULT_GROUP_ID
    if not group_id:
        raise HTTPException(status_code=400, detail="groupId is required (or set ZOTERO_DEFAULT_GROUP_ID).")

    item = merge_request_to_item(req)
    # If still no title after all fallbacks, fail clearly:
    if not item.title:
        raise HTTPException(status_code=422, detail="Missing 'title'. Provide it directly, via BibTeX, or a URL I can slug.")

    write_token = req.writeToken if req.writeToken else default_write_token(item)
    upstream = await zotero_post_items(group_id, [item], write_token)

    response = {"groupId": group_id, "writeToken": write_token}
    response.update(upstream)
    return response
@app.get("/items")
async def create_item_from_query(
    groupId: Optional[str] = Query(None),
    title: Optional[str] = Query(None),
    authors: Optional[str] = Query(None),
    editors: Optional[str] = Query(None),
    itemType: Optional[str] = Query(None),
    # Titles
    publicationTitle: Optional[str] = Query(None),
    proceedingsTitle: Optional[str] = Query(None),
    bookTitle: Optional[str] = Query(None),
    conferenceName: Optional[str] = Query(None),
    date: Optional[str] = Query(None),
    pages: Optional[str] = Query(None),
    url: Optional[str] = Query(None),
    abstractNote: Optional[str] = Query(None),
    tags: Optional[str] = Query(None),  # comma-separated
    collections: Optional[str] = Query(None),  # comma-separated
    # extras
    volume: Optional[str] = Query(None),
    issue: Optional[str] = Query(None),
    publisher: Optional[str] = Query(None),
    place: Optional[str] = Query(None),
    series: Optional[str] = Query(None),
    DOI: Optional[str] = Query(None),
    ISBN: Optional[str] = Query(None),
    writeToken: Optional[str] = Query(None),
    bibtex: Optional[str] = Query(None),
):
    """Create one item from query-string parameters.

    Builds the same request object as the JSON endpoint and delegates to it.
    ``tags`` and ``collections`` are comma-separated lists; surrounding
    whitespace is trimmed and blank entries (e.g. from ``a,,b`` or a trailing
    comma) are dropped.
    """
    req = CreateRequest(
        groupId=groupId or settings.ZOTERO_DEFAULT_GROUP_ID,
        writeToken=writeToken,
        title=title,
        authors=authors,
        editors=editors,
        itemType=itemType,
        publicationTitle=publicationTitle,
        proceedingsTitle=proceedingsTitle,
        bookTitle=bookTitle,
        conferenceName=conferenceName,
        date=date,
        pages=pages,
        url=url,
        abstractNote=abstractNote,
        tags=[t.strip() for t in tags.split(",") if t.strip()] if tags else None,
        collections=[c.strip() for c in collections.split(",") if c.strip()] if collections else None,
        volume=volume,
        issue=issue,
        publisher=publisher,
        place=place,
        series=series,
        DOI=DOI,
        ISBN=ISBN,
        bibtex=bibtex,
    )
    return await create_item(req)