Spaces:
Running
Running
| """ | |
| GitHub-based JSON data store. | |
| All data is stored as JSON files in a GitHub repository. | |
| Uses the GitHub Contents API for read/write operations. | |
| """ | |
import asyncio
import base64
import copy
import json
from typing import Any, Optional

import httpx

from config import (
    GITHUB_OWNER, GITHUB_REPO, GITHUB_BRANCH, GITHUB_TOKEN,
    DATA_PATHS, DEFAULT_FOLDER_NAME,
)
from lib.utils import normalize_text, make_id, now_iso, slugify
# ─── Low-level GitHub API ─────────────────────────────


def _check_env():
    """Raise RuntimeError unless owner, repo and token are all configured (non-empty)."""
    required = (GITHUB_OWNER, GITHUB_REPO, GITHUB_TOKEN)
    if all(required):
        return
    raise RuntimeError("Missing GitHub env: GITHUB_OWNER, GITHUB_REPO, GITHUB_TOKEN")
def _api_url(path: str) -> str:
    """Build the Contents API URL for *path*; leading slashes on *path* are ignored."""
    repo_root = f"https://api.github.com/repos/{GITHUB_OWNER}/{GITHUB_REPO}"
    return f"{repo_root}/contents/{path.lstrip('/')}"
def _headers() -> dict:
    """Return bearer-auth headers pinned to the 2022-11-28 GitHub REST API version."""
    headers = {"Authorization": f"Bearer {GITHUB_TOKEN}"}
    headers["Accept"] = "application/vnd.github+json"
    headers["X-GitHub-Api-Version"] = "2022-11-28"
    return headers
async def read_json(path: str) -> dict:
    """Read a JSON file from the GitHub repo.

    Returns ``{"exists": bool, "sha": str | None, "data": Any}``:
    a 404 yields ``exists=False`` with ``sha``/``data`` set to None, and an
    empty file yields ``data=None``.

    When a file is larger than 1 MB, the Contents API does not inline it for the
    default JSON media type: ``content`` comes back empty (``encoding`` "none")
    while ``size`` is non-zero. Decoding that as-is would report ``data=None``,
    and callers that fall back to defaults would then overwrite the whole file.
    So in that case the body is fetched again with the raw media type.
    (The raw fetch may observe a newer version than the returned sha; a later
    write with that sha then fails with a conflict rather than clobbering.)

    Raises:
        RuntimeError: on missing configuration or any non-200/404 response.
    """
    _check_env()
    url = f"{_api_url(path)}?ref={GITHUB_BRANCH}"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_headers())
        if resp.status_code == 404:
            return {"exists": False, "sha": None, "data": None}
        if resp.status_code != 200:
            raise RuntimeError(f"GitHub read failed ({resp.status_code}): {resp.text}")
        payload = resp.json()
        encoded = payload.get("content") or ""
        if not encoded and payload.get("size"):
            # Large file: content not inlined, ask for the raw bytes instead.
            raw_headers = {**_headers(), "Accept": "application/vnd.github.raw+json"}
            raw_resp = await client.get(url, headers=raw_headers)
            if raw_resp.status_code != 200:
                raise RuntimeError(
                    f"GitHub read failed ({raw_resp.status_code}): {raw_resp.text}"
                )
            raw = raw_resp.content.decode("utf-8")
        else:
            raw = base64.b64decode(encoded).decode("utf-8")
    data = json.loads(raw) if raw else None
    return {"exists": True, "sha": payload.get("sha"), "data": data}
async def write_json(path: str, data: Any, message: str, sha: Optional[str] = None):
    """Write a JSON file to the GitHub repo via the Contents API (PUT).

    Args:
        path: repository-relative file path.
        data: JSON-serialisable value, stored pretty-printed (indent=2, UTF-8).
        message: commit message.
        sha: blob sha of the version being replaced. None creates the file,
            or overwrites whatever is there after a conflict retry.

    Returns:
        The decoded JSON body of GitHub's 200/201 response.

    Up to 4 attempts are made, with a growing delay. A retry happens on HTTP 409
    and on a 403 whose body mentions "restricts updates to workflow files".
    Before each retry, the file's current sha is re-read.

    If the caller supplied ``sha`` and the re-read shows the file has changed
    since that version, the write is NOT retried. Retrying with the fresh sha
    would silently discard the concurrent change. Instead, a RuntimeError
    containing "(409)" is raised, so read-modify-write callers can re-read and
    re-apply their edit. If the file is unchanged (e.g. the branch head moved
    because another file was committed), the retry proceeds with the same sha.

    Raises:
        RuntimeError: on missing configuration, a detected concurrent
            modification, or any other failed write.
    """
    _check_env()
    expected_sha = sha
    # The encoded payload does not depend on the attempt; build it once.
    encoded = base64.b64encode(
        json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
    ).decode("ascii")
    max_attempts = 4
    for attempt in range(1, max_attempts + 1):
        body: dict = {"message": message, "content": encoded, "branch": GITHUB_BRANCH}
        if sha:
            body["sha"] = sha
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.put(_api_url(path), headers=_headers(), json=body)
        if resp.status_code in (200, 201):
            return resp.json()
        text = resp.text.lower()
        is_conflict = resp.status_code == 409
        is_workflow_timeout = (
            resp.status_code == 403
            and "restricts updates to workflow files" in text
        )
        if (is_conflict or is_workflow_timeout) and attempt < max_attempts:
            await asyncio.sleep(0.5 * attempt)
            fresh_sha = (await read_json(path)).get("sha")
            if expected_sha and fresh_sha != expected_sha:
                # The file changed under us: surface it instead of clobbering it.
                raise RuntimeError(
                    f"GitHub write failed (409): {path} was modified concurrently "
                    f"(expected sha {expected_sha}, found {fresh_sha})"
                )
            sha = fresh_sha
            continue
        raise RuntimeError(f"GitHub write failed ({resp.status_code}): {resp.text}")
    raise RuntimeError("GitHub write failed: exhausted retries")
# ─── Defaults ─────────────────────────────────────────
# Initial content for each data file, keyed by repository path. The settings
# file holds an object; every other data file holds a list (a fresh [] each).
DEFAULTS = {
    DATA_PATHS["settings"]: {"defaultFolderName": DEFAULT_FOLDER_NAME},
    **{
        DATA_PATHS[kind]: []
        for kind in ("people", "list_schemas", "campaigns", "submissions")
    },
}
# ─── High-level data helpers ──────────────────────────


async def get_data(path: str) -> dict:
    """Read a data file, creating it from DEFAULTS when it does not exist.

    Returns ``{"sha": str | None, "data": Any}``. When the stored value is
    missing or falsy (None, [], {}), the DEFAULTS entry for *path* (``[]`` for
    unknown paths) is returned as ``data``.

    Defaults are deep-copied before being returned. Callers modify the returned
    data (e.g. append to a list), and handing out the module-level DEFAULTS
    object would leak those edits into every later default.

    Raises:
        RuntimeError: propagated from the underlying read/write.
    """
    result = await read_json(path)
    if not result["exists"]:
        init = copy.deepcopy(DEFAULTS.get(path, []))
        written = await write_json(path, init, f"chore(data): initialize {path}")
        # The PUT response reports the new blob under "content"; expose its sha
        # so callers see the version that now exists rather than None.
        content = (written or {}).get("content") or {}
        return {"sha": content.get("sha"), "data": init}
    data = result.get("data") or copy.deepcopy(DEFAULTS.get(path, []))
    return {"sha": result["sha"], "data": data}
async def save_data(path: str, data: Any, message: str):
    """Overwrite *path* with *data* (last writer wins).

    The current sha is looked up just before writing. If the write fails with
    an error mentioning "409", the sha is looked up again and the write is
    repeated once. Other errors propagate.
    """
    snapshot = await read_json(path)
    base_sha = snapshot["sha"] if snapshot["exists"] else None
    try:
        await write_json(path, data, message, base_sha)
    except RuntimeError as err:
        if "409" in str(err):
            refreshed = await read_json(path)
            await write_json(path, data, message, refreshed.get("sha"))
        else:
            raise
async def update_json_list(path: str, modifier, message: str, max_attempts: int = 4):
    """Read-modify-write a JSON list file, retrying on write conflicts.

    Args:
        path: repository path of the list file.
        modifier: callable receiving the current list and returning the new
            list. It is invoked once per attempt, each time with freshly read
            data.
        message: commit message.
        max_attempts: total number of read/modify/write attempts.

    Returns:
        The list that was written.

    If the file is missing, or its content is not a list, the modifier receives
    a private deep copy of the DEFAULTS entry for *path* (``[]`` if none).
    Without the copy, a modifier that appends in place would grow the shared
    module-level default, and on a retry would re-apply its change on top of
    its own previous attempt.

    When ``write_json`` raises a RuntimeError mentioning "409", the file is
    re-read and *modifier* is re-applied to the latest data after a short
    delay. NOTE: this prevents lost updates only insofar as ``write_json``
    surfaces conflicts rather than resolving them by overwriting.

    Raises:
        RuntimeError: non-conflict write failures, or a conflict on the last attempt.
        Anything raised by *modifier* propagates unchanged.
    """
    _check_env()
    for attempt in range(1, max_attempts + 1):
        current = await read_json(path)
        sha = current["sha"] if current["exists"] else None
        data = current.get("data")
        if not isinstance(data, list):
            data = copy.deepcopy(DEFAULTS.get(path, []))
        new_data = modifier(data)
        try:
            await write_json(path, new_data, message, sha)
        except RuntimeError as exc:
            if "409" not in str(exc) or attempt >= max_attempts:
                raise
            await asyncio.sleep(0.3 * attempt)
            continue
        return new_data
    raise RuntimeError("update_json_list: exhausted retries")
# ─── Settings ─────────────────────────────────────────


async def get_settings() -> dict:
    """Return the settings, substituting DEFAULT_FOLDER_NAME for a blank folder name."""
    stored = (await get_data(DATA_PATHS["settings"]))["data"] or {}
    folder = stored.get("defaultFolderName") or DEFAULT_FOLDER_NAME
    return {"defaultFolderName": folder, "updatedAt": stored.get("updatedAt")}
async def save_settings(folder_name: str) -> dict:
    """Persist the default folder name (blank -> DEFAULT_FOLDER_NAME) and return the saved settings."""
    trimmed = (folder_name or "").strip()
    folder = trimmed if trimmed else DEFAULT_FOLDER_NAME
    settings = {"defaultFolderName": folder, "updatedAt": now_iso()}
    await save_data(
        DATA_PATHS["settings"],
        settings,
        f"chore(data): update settings folder {folder}",
    )
    return settings
# ─── People ───────────────────────────────────────────


async def get_people() -> list:
    """Return the stored people list, or [] if the file does not hold a list."""
    people = (await get_data(DATA_PATHS["people"]))["data"]
    if isinstance(people, list):
        return people
    return []
async def replace_people(people: list) -> list:
    """Overwrite the people file with *people* (non-lists are stored as []) and return it."""
    if not isinstance(people, list):
        people = []
    commit_message = f"chore(data): import people ({len(people)})"
    await save_data(DATA_PATHS["people"], people, commit_message)
    return people
# ─── List Schemas ─────────────────────────────────────


async def get_list_schemas() -> list:
    """Return the stored list schemas, or [] if the file does not hold a list."""
    payload = await get_data(DATA_PATHS["list_schemas"])
    schemas = payload["data"]
    if not isinstance(schemas, list):
        return []
    return schemas
async def save_list_schemas(schemas: list) -> list:
    """Overwrite the list-schema file with *schemas* (non-lists are stored as []) and return it."""
    to_store = schemas if isinstance(schemas, list) else []
    commit_message = f"chore(data): update list schemas ({len(to_store)})"
    await save_data(DATA_PATHS["list_schemas"], to_store, commit_message)
    return to_store
async def upsert_list_schema(schema_input: dict) -> dict:
    """Create or update a list schema and return the stored schema dict.

    Matching: an existing schema is matched by ``id`` (when one is given) and
    otherwise by name, compared through ``normalize_text``. On a match, its
    ``name``, ``fields`` and ``updatedAt`` are replaced and all other keys are
    kept. Without a match, a new schema with a ``make_id("ls")`` id is appended.

    Fields: each field is normalised to ``{"key", "label", "type",
    "required"}``. Entries whose key is blank are dropped. If no field remains,
    a single required "ho_ten" text field is used.

    The change goes through ``update_json_list``, so a concurrent edit of the
    schema file is re-read and the upsert re-applied instead of being
    overwritten. The modifier builds a new list and never mutates its input.

    Raises:
        ValueError: if the name is blank or a field entry is not a dict.
    """
    schema_id = str(schema_input.get("id") or "").strip()
    name = str(schema_input.get("name") or "").strip()
    if not name:
        raise ValueError("Missing list schema name")

    raw_fields = schema_input.get("fields") or []
    fields = []
    for idx, f in enumerate(raw_fields if isinstance(raw_fields, list) else []):
        if not isinstance(f, dict):
            raise ValueError(f"Invalid list schema field at index {idx}")
        key = str(f.get("key") or f"field_{idx}").strip()
        label = str(f.get("label") or f.get("key") or f"Field {idx + 1}").strip()
        ftype = str(f.get("type") or "text").strip()
        required = bool(f.get("required"))
        if key:
            fields.append({"key": key, "label": label, "type": ftype, "required": required})
    if not fields:
        fields = [{"key": "ho_ten", "label": "Họ tên", "type": "text", "required": True}]

    key_norm = normalize_text(name)
    now = now_iso()
    # Generated once so every retry of the modifier assigns the same id.
    new_id = make_id("ls")
    saved = [None]

    def modifier(current):
        idx_by_id = -1
        idx_by_name = -1
        for i, s in enumerate(current):
            if schema_id and str(s.get("id") or "").strip() == schema_id:
                idx_by_id = i
            if normalize_text(s.get("name") or "") == key_norm:
                idx_by_name = i
        idx = idx_by_id if idx_by_id >= 0 else idx_by_name
        updated = list(current)
        if idx >= 0:
            item = {**current[idx], "name": name, "fields": fields, "updatedAt": now}
            updated[idx] = item
        else:
            item = {"id": new_id, "name": name, "fields": fields, "createdAt": now, "updatedAt": now}
            updated.append(item)
        saved[0] = item
        return updated

    await update_json_list(
        DATA_PATHS["list_schemas"], modifier,
        f"chore(data): upsert list schema {name}",
    )
    return saved[0]
async def delete_list_schema(schema_id: str = "", schema_name: str = ""):
    """Remove every list schema whose id equals *schema_id* or whose name matches *schema_name*.

    Names are compared through ``normalize_text``. Returns ``{"deleted": n}``.
    The removal goes through ``update_json_list``, so a concurrent edit of the
    schema file is re-read rather than overwritten. Nothing is written when no
    schema matches.

    Raises:
        ValueError: if both identifiers are blank, or no schema matches.
    """
    sid = (schema_id or "").strip()
    sname = (schema_name or "").strip()
    if not sid and not sname:
        raise ValueError("Missing schema id or name")
    sname_norm = normalize_text(sname) if sname else ""
    removed = [0]

    def modifier(current):
        kept = []
        for s in current:
            if sid and str(s.get("id") or "").strip() == sid:
                continue
            if sname and normalize_text(s.get("name") or "") == sname_norm:
                continue
            kept.append(s)
        removed[0] = len(current) - len(kept)
        if not removed[0]:
            # Raised outside update_json_list's write/retry handling, so it
            # propagates to the caller and no commit is made.
            raise ValueError("Schema not found")
        return kept

    await update_json_list(
        DATA_PATHS["list_schemas"], modifier,
        f"chore(data): delete list schema {sid or sname}",
    )
    return {"deleted": removed[0]}
# ─── Campaigns ────────────────────────────────────────


async def get_campaigns() -> list:
    """Return the stored campaigns, or [] if the file does not hold a list."""
    campaigns = (await get_data(DATA_PATHS["campaigns"]))["data"]
    if isinstance(campaigns, list):
        return campaigns
    return []
async def save_campaigns(campaigns: list) -> list:
    """Overwrite the campaigns file with *campaigns* (non-lists are stored as []) and return it."""
    if not isinstance(campaigns, list):
        campaigns = []
    commit_message = f"chore(data): update campaigns ({len(campaigns)})"
    await save_data(DATA_PATHS["campaigns"], campaigns, commit_message)
    return campaigns
async def atomic_add_campaign(item: dict) -> dict:
    """Atomically append *item* to the campaigns list and return *item*.

    The modifier builds a new list instead of appending in place. The list it
    receives is never mutated, so no state shared with the caller (such as a
    module-level default list) can accumulate the item across calls or retries.
    """
    def modifier(current):
        return [*current, item]

    await update_json_list(
        DATA_PATHS["campaigns"], modifier,
        f"chore(data): add campaign {item.get('ten', '')}"
    )
    return item
async def atomic_update_campaign(ten: str, updater) -> bool:
    """Atomically replace the first campaign whose "ten" equals *ten* with ``updater(campaign)``.

    Returns True if such a campaign existed in the version that was written,
    otherwise False (the list is still written back unchanged).

    ``update_json_list`` may invoke the modifier more than once: after a
    conflict it re-reads and re-applies. The found flag is therefore reset on
    every invocation. Otherwise a match in an earlier attempt would still be
    reported after the campaign had been deleted concurrently. The modifier
    builds a new list rather than mutating the one it receives.
    """
    found = [False]

    def modifier(current):
        found[0] = False
        updated = list(current)
        for i, c in enumerate(updated):
            if c.get("ten") == ten:
                updated[i] = updater(c)
                found[0] = True
                break
        return updated

    await update_json_list(
        DATA_PATHS["campaigns"], modifier,
        f"chore(data): update campaign {ten}"
    )
    return found[0]
async def atomic_delete_campaign(ten: str) -> bool:
    """Atomically drop every campaign whose "ten" equals *ten*; return True if any was dropped."""
    removed = [False]

    def modifier(current):
        kept = [campaign for campaign in current if campaign.get("ten") != ten]
        # Recomputed on each (re)try, so it reflects the version actually written.
        removed[0] = len(current) > len(kept)
        return kept

    commit_message = f"chore(data): delete campaign {ten}"
    await update_json_list(DATA_PATHS["campaigns"], modifier, commit_message)
    return removed[0]
# ─── Submissions ──────────────────────────────────────


async def get_submissions() -> list:
    """Return all stored submissions, or [] if the file does not hold a list."""
    payload = await get_data(DATA_PATHS["submissions"])
    submissions = payload["data"]
    if not isinstance(submissions, list):
        return []
    return submissions
async def add_submission(submission: dict) -> dict:
    """Atomically append *submission* to the submissions list and return it.

    The commit message includes the submission's "id" when present. The
    modifier returns a new list and never mutates the one it receives, so
    shared state (such as a module-level default list) cannot accumulate the
    submission across calls or conflict retries.
    """
    sub_id = submission.get("id") or ""

    def modifier(current):
        return [*current, submission]

    await update_json_list(
        DATA_PATHS["submissions"], modifier,
        f"chore(data): add submission {sub_id}".strip()
    )
    return submission