# NOTE: removed hosting-page banner text ("Spaces" / "Sleeping" status lines)
# that was scraped in with the source — it is not part of the program.
| import asyncio | |
| import hashlib | |
| import json | |
| import re | |
| from typing import List, Optional, Dict, Any | |
| from urllib.parse import urlparse, unquote | |
| import httpx | |
| from fastapi import FastAPI, HTTPException, Query, Body | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field, field_validator | |
| from pydantic_settings import BaseSettings | |
| # ========================= | |
| # Settings | |
| # ========================= | |
class Settings(BaseSettings):
    """Environment-driven configuration, loaded by pydantic-settings."""

    ZOTERO_API_KEY: str  # required: API key sent on every upstream Zotero write
    ZOTERO_DEFAULT_GROUP_ID: Optional[str] = None  # fallback group when a request omits groupId
    ZOTERO_API_VERSION: str = "3"  # sent as the Zotero-API-Version header
    HTTP_TIMEOUT_SECONDS: float = 20.0  # per-request httpx timeout
    MAX_RETRIES: int = 4  # retry budget for 429/5xx upstream responses
    CORS_ALLOW_ORIGINS: str = "*"  # comma-separated or "*"

    class Config:
        # Environment variable names must match field names' case exactly.
        case_sensitive = True


# Module-level singleton; construction fails at import time if
# ZOTERO_API_KEY is not set in the environment.
settings = Settings()
| # ========================= | |
| # Models | |
| # ========================= | |
class Creator(BaseModel):
    """A single Zotero creator (author, editor, ...) in first/last form."""

    creatorType: str = Field("author", examples=["author", "editor"])
    firstName: str = ""  # may stay empty for single-token names
    lastName: str
class Tag(BaseModel):
    """Wrapper matching Zotero's tag object shape: {"tag": "..."}."""

    tag: str
class ItemModel(BaseModel):
    """
    Canonical Zotero item payload.
    - conferencePaper uses 'proceedingsTitle'
    - journalArticle uses 'publicationTitle'
    """
    itemType: str = Field(..., examples=["journalArticle", "conferencePaper"])
    title: str
    creators: List[Creator] = []
    # Titles
    publicationTitle: Optional[str] = None  # journalArticle
    proceedingsTitle: Optional[str] = None  # conferencePaper
    conferenceName: Optional[str] = None
    abstractNote: Optional[str] = None
    date: Optional[str] = None  # free-form; BibTeX 'year' is mapped here
    pages: Optional[str] = None
    url: Optional[str] = None
    # common extras we'll map from BibTeX
    volume: Optional[str] = None
    issue: Optional[str] = None  # BibTeX 'number'
    publisher: Optional[str] = None
    place: Optional[str] = None  # BibTeX 'address'
    series: Optional[str] = None
    DOI: Optional[str] = None
    ISBN: Optional[str] = None
    tags: List[Tag] = []
    collections: List[str] = []  # Zotero collection keys to file the item under
    extra: Optional[str] = None
    relations: Dict[str, Any] = {}
class CreateRequest(BaseModel):
    """
    Public/ingress model. Accepts friendly aliases + raw BibTeX.

    Input precedence is resolved later by merge_request_to_item(); this
    model only collects the inputs and defaults groupId from settings.
    """
    # validate_default=True makes the validator below run even when the
    # field is omitted entirely (pydantic v2 skips defaults otherwise).
    groupId: Optional[str] = Field(None, validate_default=True)
    writeToken: Optional[str] = None  # Zotero-Write-Token for idempotent writes
    item: Optional[ItemModel] = None  # canonical payload (highest precedence)
    # Friendly fields
    title: Optional[str] = None
    authors: Optional[str] = None  # e.g. "Last, First; Other, A." or "A and B"
    editors: Optional[str] = None
    itemType: Optional[str] = None
    # Titles (aliases)
    publicationTitle: Optional[str] = None
    proceedingsTitle: Optional[str] = None
    bookTitle: Optional[str] = None  # alias → proceedingsTitle
    conferenceName: Optional[str] = None
    date: Optional[str] = None
    pages: Optional[str] = None
    url: Optional[str] = None
    abstractNote: Optional[str] = None
    tags: Optional[List[str]] = None
    collections: Optional[List[str]] = None
    # extra scholarly fields
    volume: Optional[str] = None
    issue: Optional[str] = None
    publisher: Optional[str] = None
    place: Optional[str] = None
    series: Optional[str] = None
    DOI: Optional[str] = None
    ISBN: Optional[str] = None
    # Optional raw BibTeX
    bibtex: Optional[str] = None

    # BUG FIX: this was a bare, undecorated method that pydantic never
    # invoked, so the default-group fallback only happened where callers
    # re-implemented it by hand. Registering it as a field validator makes
    # the fallback apply wherever a CreateRequest is constructed.
    @field_validator("groupId", mode="before")
    @classmethod
    def default_group(cls, v: Optional[str]) -> Optional[str]:
        return v or settings.ZOTERO_DEFAULT_GROUP_ID
| # ========================= | |
| # Helpers | |
| # ========================= | |
# Splits an author/editor string on ";" or the word "and" (case-insensitive).
AUTHOR_SPLIT_RE = re.compile(r"\s*(?:;| and )\s*", re.IGNORECASE)


def parse_people(s: Optional[str], creator_type: str = "author") -> List[Creator]:
    """Parse a free-text name list into Creator entries.

    Accepts "Last, First" entries as well as plain "First Middle Last";
    a lone token is treated as a bare last name. Empty/None input yields
    an empty list.
    """
    if not s:
        return []

    def _split_name(raw: str):
        # "Last, First" form wins; otherwise the final word is the last name.
        if "," in raw:
            last, first = (piece.strip() for piece in raw.split(",", 1))
            return last, first
        words = raw.strip().split()
        if len(words) == 1:
            return words[0], ""
        return words[-1], " ".join(words[:-1])

    result: List[Creator] = []
    for chunk in AUTHOR_SPLIT_RE.split(s.strip()):
        if not chunk:
            continue
        last, first = _split_name(chunk)
        result.append(Creator(creatorType=creator_type, firstName=first, lastName=last))
    return result
| def _bt_get(bib: str, name: str) -> Optional[str]: | |
| mm = re.search(rf"{name}\s*=\s*\{{([^}}]+)\}}", bib, re.IGNORECASE) | |
| if mm: | |
| return mm.group(1).strip() | |
| # also support quotes "..." | |
| mm = re.search(rf"{name}\s*=\s*\"([^\"]+)\"", bib, re.IGNORECASE) | |
| return mm.group(1).strip() if mm else None | |
def parse_bibtex(bib: str) -> Dict[str, Any]:
    """
    Very light BibTeX parsing.

    Extracts the entry type plus a fixed set of fields via _bt_get;
    anything that cannot be found is simply omitted from the result.
    """
    entry_type_map = {
        "article": "journalArticle",
        "inproceedings": "conferencePaper",
        "proceedings": "conferencePaper",
        "book": "book",
    }
    # output key -> BibTeX field name
    field_sources = {
        "title": "title",
        "authors": "author",
        "editors": "editor",
        "publicationTitle": "journal",
        "proceedingsTitle": "booktitle",
        "pages": "pages",
        "date": "year",
        "url": "url",
        "abstractNote": "abstract",
        # extras
        "series": "series",
        "volume": "volume",
        "issue": "number",
        "publisher": "publisher",
        "place": "address",
        "DOI": "doi",
        "ISBN": "isbn",
    }

    parsed: Dict[str, str] = {}
    header = re.search(r"@\s*(\w+)\s*\{", bib)
    if header:
        raw_type = header.group(1)
        # Unknown entry types pass through unchanged (original case).
        parsed["itemType"] = entry_type_map.get(raw_type.lower(), raw_type)
    for out_key, bib_key in field_sources.items():
        value = _bt_get(bib, bib_key)
        if value:
            parsed[out_key] = value
    return parsed
def normalize_titles_for_type(item: Dict[str, Any]) -> None:
    """
    Normalize title fields in place based on itemType.

    conferencePaper: keep only 'proceedingsTitle', filling it from
    'bookTitle' or 'publicationTitle' when missing. journalArticle: keep
    only 'publicationTitle'. Anything else: coalesce a stray
    'proceedingsTitle' into 'publicationTitle' and drop 'bookTitle'.
    """
    kind = (item.get("itemType") or "").lower()
    if kind == "conferencepaper":
        if not item.get("proceedingsTitle"):
            fallback = item.get("bookTitle") or item.get("publicationTitle")
            if fallback:
                item["proceedingsTitle"] = fallback
        item.pop("bookTitle", None)
        item.pop("publicationTitle", None)
        return
    if kind == "journalarticle":
        # prefer publicationTitle; drop conference fields
        item.pop("proceedingsTitle", None)
        item.pop("bookTitle", None)
        return
    # Coalesce: if publicationTitle missing, reuse proceedingsTitle
    if not item.get("publicationTitle") and item.get("proceedingsTitle"):
        item["publicationTitle"] = item.pop("proceedingsTitle")
    item.pop("bookTitle", None)
def guess_title_from_url(u: Optional[str]) -> Optional[str]:
    """Derive a human-readable title from the last path segment of a URL.

    Strips one trailing extension, decodes percent-escapes, turns runs of
    dashes/underscores into spaces, and capitalizes the first character.
    Returns None whenever nothing usable can be extracted.
    """
    if not u:
        return None
    try:
        path = urlparse(u).path
        if not path:
            return None
        segments = [seg for seg in path.split("/") if seg]
        if not segments:
            return None
        slug = segments[-1]
        # strip one trailing extension, e.g. ".pdf" / ".html"
        dot = slug.rfind(".")
        if dot != -1:
            slug = slug[:dot]
        words = re.sub(r"[-_]+", " ", unquote(slug)).strip()
        if not words:
            return None
        return words[0].upper() + words[1:]
    except Exception:
        # Best-effort helper: any parse failure just means "no guess".
        return None
def merge_request_to_item(req: CreateRequest) -> ItemModel:
    """
    Merge a CreateRequest into the canonical ItemModel.

    Merge precedence:
      1) req.item (canonical)
      2) parsed bibtex (fills missing)
      3) friendly fields on req (override/add)
      4) normalize titles
      5) ensure title (fallback from URL slug if still missing)

    Raises:
        HTTPException(422): if the merged payload fails ItemModel validation.
    """
    # (1) Canonical item as a plain dict we can merge into. model_dump()
    # replaces the deprecated pydantic-v1 json.loads(req.item.json()) round-trip.
    item: Dict[str, Any] = req.item.model_dump() if req.item else {}

    # (2) From BibTeX — only fills fields the canonical item left empty.
    if req.bibtex:
        bt = parse_bibtex(req.bibtex)
        item.setdefault("itemType", bt.get("itemType"))
        for k in [
            "title", "publicationTitle", "proceedingsTitle", "conferenceName",
            "pages", "date", "url", "abstractNote", "series", "volume", "issue",
            "publisher", "place", "DOI", "ISBN",
        ]:
            if bt.get(k) and not item.get(k):
                item[k] = bt[k]
        # creators
        # BUG FIX: was `"creators" not in item`, which is always false when
        # req.item is present (its dump includes creators=[]), so BibTeX
        # authors could never fill an empty creator list.
        if bt.get("authors") and not item.get("creators"):
            item["creators"] = parse_people(bt["authors"], "author")
        if bt.get("editors"):
            item.setdefault("creators", [])
            item["creators"].extend(parse_people(bt["editors"], "editor"))

    # (3) Friendly fields on the request override/add (truthy values only,
    # matching the original field-by-field behavior).
    for field in [
        "itemType", "title", "publicationTitle", "proceedingsTitle",
        "bookTitle", "conferenceName", "pages", "date", "url", "abstractNote",
        "volume", "issue", "publisher", "place", "series", "DOI", "ISBN",
    ]:
        value = getattr(req, field)
        if value:
            item[field] = value

    # creators (authors/editors free text) — replaces any earlier creators.
    creators: List[Creator] = []
    if req.authors:
        creators.extend(parse_people(req.authors, "author"))
    if req.editors:
        creators.extend(parse_people(req.editors, "editor"))
    if creators:
        item["creators"] = creators

    # tags/collections: explicit empty lists are honored (clear semantics).
    if req.tags is not None:
        item["tags"] = [Tag(tag=t) for t in req.tags]
    if req.collections is not None:
        item["collections"] = req.collections

    # (4) Normalize titles by type
    normalize_titles_for_type(item)

    # (5) Ensure title: fallback from URL slug if still missing
    if not item.get("title"):
        slug_title = guess_title_from_url(item.get("url"))
        if slug_title:
            item["title"] = slug_title

    # Final validation — surface pydantic errors as a clean 422.
    try:
        return ItemModel(**item)
    except Exception as e:
        raise HTTPException(status_code=422, detail=f"Invalid item payload: {e}")
def default_write_token(item: ItemModel) -> str:
    """Deterministic 32-hex-char idempotency token for an item.

    Derived from (itemType, title, date) so that retrying the same
    logical create reuses the same Zotero-Write-Token.
    """
    fingerprint = "|".join((item.itemType, item.title, item.date or ""))
    digest = hashlib.sha256(fingerprint.encode("utf-8"))
    return digest.hexdigest()[:32]
async def zotero_post_items(group_id: str, items: List[ItemModel], write_token: str) -> Dict[str, Any]:
    """POST items to the Zotero group-items endpoint with retry/backoff.

    Retries on 429/500/502/503/504 (honoring an integer Retry-After header
    when present) and on httpx transport errors, up to settings.MAX_RETRIES
    additional attempts with exponential backoff.

    Returns {"status": <code>, "zotero": <parsed body or {}>} on success.
    Raises HTTPException mirroring any non-retryable upstream status, or a
    502 once the retry budget is exhausted.
    """
    url = f"https://api.zotero.org/groups/{group_id}/items"
    headers = {
        "Zotero-API-Key": settings.ZOTERO_API_KEY,
        "Zotero-API-Version": settings.ZOTERO_API_VERSION,
        "Content-Type": "application/json",
        "Accept": "application/json",
        # Idempotency token: identical writes with the same token are deduped.
        "Zotero-Write-Token": write_token,
    }
    # Serialize via the models' JSON view, dropping null fields.
    data = [json.loads(i.json(exclude_none=True)) for i in items]
    timeout = httpx.Timeout(settings.HTTP_TIMEOUT_SECONDS)
    attempt = 0
    last_exc: Optional[Exception] = None
    async with httpx.AsyncClient(timeout=timeout) as client:
        while attempt <= settings.MAX_RETRIES:
            try:
                resp = await client.post(url, headers=headers, json=data)
                if resp.status_code in (200, 201, 204):
                    # 204 has no body; guard resp.json() on empty text.
                    return {"status": resp.status_code, "zotero": resp.json() if resp.text else {}}
                if resp.status_code in (429, 500, 502, 503, 504):
                    # Retryable: honor integer Retry-After, else exponential backoff.
                    retry_after = 0
                    ra = resp.headers.get("Retry-After")
                    if ra:
                        try:
                            retry_after = int(ra)
                        except ValueError:
                            # HTTP-date form of Retry-After is not supported.
                            retry_after = 0
                    delay = max(retry_after, 2 ** attempt)
                    attempt += 1
                    await asyncio.sleep(delay)
                    continue
                # Non-retryable status: surface Zotero's response verbatim.
                # (This fastapi HTTPException is NOT caught by the httpx handler below.)
                raise HTTPException(status_code=resp.status_code, detail=resp.text)
            except httpx.HTTPError as e:
                # Transport-level failure (timeout, connect error, ...): retry.
                last_exc = e
                attempt += 1
                await asyncio.sleep(2 ** (attempt - 1))
                continue
    # NOTE(review): if retries were exhausted by HTTP-status retries (not
    # transport errors), last_exc is None and the detail reads "... None".
    raise HTTPException(status_code=502, detail=f"Upstream Zotero error or timeout: {last_exc}")
| # ========================= | |
| # FastAPI app | |
| # ========================= | |
app = FastAPI(title="Zotero Proxy", version="1.2.0")

# "*" allows every origin; otherwise parse a comma-separated allow-list.
allow_origins = ["*"] if settings.CORS_ALLOW_ORIGINS.strip() == "*" else [
    o.strip() for o in settings.CORS_ALLOW_ORIGINS.split(",") if o.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    # NOTE(review): browsers reject credentials combined with wildcard
    # origins — confirm the intended origin list before relying on cookies.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# BUG FIX: no route decorator was attached anywhere in the file, so this
# handler was never registered with the app. Path chosen from the function
# name — confirm against the intended API surface.
@app.get("/ping")
async def ping():
    """Liveness probe: always returns {"ok": true}."""
    return {"ok": True}
# BUG FIX: handler had no route decorator and was never registered; path
# assumed from its purpose — confirm against the intended API surface.
@app.post("/items")
async def create_item(req: CreateRequest = Body(...)):
    """Create a single Zotero item in the given (or default) group.

    Resolves the group id, merges all request inputs into a canonical
    item, derives a deterministic write token when none is supplied, and
    forwards the item to Zotero.

    Raises:
        HTTPException(400): no group id available.
        HTTPException(422): no title after all fallbacks, or invalid payload.
    """
    group_id = req.groupId or settings.ZOTERO_DEFAULT_GROUP_ID
    if not group_id:
        raise HTTPException(status_code=400, detail="groupId is required (or set ZOTERO_DEFAULT_GROUP_ID).")
    item = merge_request_to_item(req)
    # If still no title after all fallbacks, fail clearly:
    if not item.title:
        raise HTTPException(status_code=422, detail="Missing 'title'. Provide it directly, via BibTeX, or a URL I can slug.")
    # Deterministic token → retries of the same logical create are deduped upstream.
    write_token = req.writeToken or default_write_token(item)
    result = await zotero_post_items(group_id, [item], write_token)
    return {"groupId": group_id, "writeToken": write_token, **result}
# BUG FIX: handler had no route decorator and was never registered; GET
# variant of the create endpoint — path assumed, confirm as with the others.
@app.get("/items")
async def create_item_from_query(
    groupId: Optional[str] = Query(None),
    title: Optional[str] = Query(None),
    authors: Optional[str] = Query(None),
    editors: Optional[str] = Query(None),
    itemType: Optional[str] = Query(None),
    # Titles
    publicationTitle: Optional[str] = Query(None),
    proceedingsTitle: Optional[str] = Query(None),
    bookTitle: Optional[str] = Query(None),
    conferenceName: Optional[str] = Query(None),
    date: Optional[str] = Query(None),
    pages: Optional[str] = Query(None),
    url: Optional[str] = Query(None),
    abstractNote: Optional[str] = Query(None),
    tags: Optional[str] = Query(None),  # comma-separated
    collections: Optional[str] = Query(None),  # comma-separated
    # extras
    volume: Optional[str] = Query(None),
    issue: Optional[str] = Query(None),
    publisher: Optional[str] = Query(None),
    place: Optional[str] = Query(None),
    series: Optional[str] = Query(None),
    DOI: Optional[str] = Query(None),
    ISBN: Optional[str] = Query(None),
    writeToken: Optional[str] = Query(None),
    bibtex: Optional[str] = Query(None),
):
    """Query-string convenience wrapper around create_item.

    Builds a CreateRequest from individual query parameters (splitting
    comma-separated tags/collections) and delegates to create_item.
    """
    req = CreateRequest(
        groupId=groupId or settings.ZOTERO_DEFAULT_GROUP_ID,
        writeToken=writeToken,
        title=title,
        authors=authors,
        editors=editors,
        itemType=itemType,
        publicationTitle=publicationTitle,
        proceedingsTitle=proceedingsTitle,
        bookTitle=bookTitle,
        conferenceName=conferenceName,
        date=date,
        pages=pages,
        url=url,
        abstractNote=abstractNote,
        tags=[t.strip() for t in tags.split(",")] if tags else None,
        collections=[c.strip() for c in collections.split(",")] if collections else None,
        volume=volume,
        issue=issue,
        publisher=publisher,
        place=place,
        series=series,
        DOI=DOI,
        ISBN=ISBN,
        bibtex=bibtex,
    )
    return await create_item(req)