# baocaodulieu / lib/github_store.py
# hoangthiencm's picture
# Update lib/github_store.py
# 47bee34 verified
"""
GitHub-based JSON data store.
All data is stored as JSON files in a GitHub repository.
Uses the GitHub Contents API for read/write operations.
"""
import base64
import json
import asyncio
from typing import Any, Optional
import httpx
from config import (
GITHUB_OWNER, GITHUB_REPO, GITHUB_BRANCH, GITHUB_TOKEN,
DATA_PATHS, DEFAULT_FOLDER_NAME,
)
from lib.utils import normalize_text, make_id, now_iso, slugify
# ─── Low-level GitHub API ─────────────────────────────
def _check_env():
    """Ensure the GitHub owner, repo and token settings are all non-empty.

    Raises:
        RuntimeError: If any of the three settings is missing or empty.
    """
    if all((GITHUB_OWNER, GITHUB_REPO, GITHUB_TOKEN)):
        return
    raise RuntimeError("Missing GitHub env: GITHUB_OWNER, GITHUB_REPO, GITHUB_TOKEN")
def _api_url(path: str) -> str:
    """Return the Contents API URL for `path` in the configured repository.

    Leading slashes on `path` are dropped so the URL never contains `//`.
    """
    contents_root = f"https://api.github.com/repos/{GITHUB_OWNER}/{GITHUB_REPO}/contents"
    relative = path.lstrip("/")
    return f"{contents_root}/{relative}"
def _headers() -> dict:
    """Return the HTTP headers sent with every GitHub API request."""
    headers = {}
    headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
    headers["Accept"] = "application/vnd.github+json"
    headers["X-GitHub-Api-Version"] = "2022-11-28"
    return headers
async def read_json(path: str) -> dict:
    """Read a JSON file from the GitHub repo.

    Args:
        path: Repository-relative path of the file.

    Returns:
        A dict ``{"exists", "sha", "data"}``. A 404 response yields
        ``exists=False`` with ``sha`` and ``data`` set to None. A file that is
        empty or contains only whitespace yields ``data=None``.

    Raises:
        RuntimeError: If the GitHub settings are missing, the API responds
            with a status other than 200/404, or `path` is not a single file.
    """
    _check_env()
    url = f"{_api_url(path)}?ref={GITHUB_BRANCH}"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_headers())
        if resp.status_code == 404:
            return {"exists": False, "sha": None, "data": None}
        if resp.status_code != 200:
            raise RuntimeError(f"GitHub read failed ({resp.status_code}): {resp.text}")
        payload = resp.json()
        if not isinstance(payload, dict):
            # The Contents API answers with a JSON array for directories.
            raise RuntimeError(f"GitHub read failed: {path} is not a file")
        encoded = payload.get("content") or ""
        if encoded or not payload.get("size"):
            raw = base64.b64decode(encoded).decode("utf-8")
        else:
            # A non-empty file came back without inline content (GitHub does
            # this for files over 1 MB). Treating it as empty would make
            # callers overwrite real data with defaults, so fetch the raw body.
            raw_headers = {**_headers(), "Accept": "application/vnd.github.raw+json"}
            raw_resp = await client.get(url, headers=raw_headers)
            if raw_resp.status_code != 200:
                raise RuntimeError(
                    f"GitHub read failed ({raw_resp.status_code}): {raw_resp.text}"
                )
            raw = raw_resp.content.decode("utf-8")
    # Whitespace-only files are treated like empty ones instead of failing to parse.
    data = json.loads(raw) if raw.strip() else None
    return {"exists": True, "sha": payload.get("sha"), "data": data}
async def write_json(path: str, data: Any, message: str, sha: Optional[str] = None):
    """Write `data` as pretty-printed JSON to `path` in the GitHub repo.

    Makes up to four PUT attempts. After a 409 response, or a 403 whose body
    mentions "restricts updates to workflow files", it sleeps ``0.5 * attempt``
    seconds, re-reads the file's sha and sends the SAME `data` again, so a
    conflicting remote change is overwritten rather than merged.

    Args:
        path: Repository-relative path of the file.
        data: JSON-serialisable value to store.
        message: Commit message.
        sha: Blob sha of the version being replaced; omitted from the request
            when falsy.

    Returns:
        The parsed JSON body of the successful (200/201) response.

    Raises:
        RuntimeError: If the GitHub settings are missing or the write fails.
    """
    _check_env()
    attempts_allowed = 4
    attempt = 0
    while attempt < attempts_allowed:
        attempt += 1
        serialized = json.dumps(data, ensure_ascii=False, indent=2)
        body: dict = {
            "message": message,
            "content": base64.b64encode(serialized.encode("utf-8")).decode("ascii"),
            "branch": GITHUB_BRANCH,
        }
        if sha:
            body["sha"] = sha
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.put(_api_url(path), headers=_headers(), json=body)
        status = resp.status_code
        if status in (200, 201):
            return resp.json()
        retryable = status == 409 or (
            status == 403
            and "restricts updates to workflow files" in resp.text.lower()
        )
        if not retryable or attempt >= attempts_allowed:
            raise RuntimeError(f"GitHub write failed ({resp.status_code}): {resp.text}")
        await asyncio.sleep(0.5 * attempt)
        # Pick up the current sha before the next attempt.
        sha = (await read_json(path)).get("sha")
    raise RuntimeError("GitHub write failed: exhausted retries")
# ─── Defaults ─────────────────────────────────────────
# Fallback content per data file path: a settings object for the settings
# file and an empty list for every list-shaped data file.
DEFAULTS = {
    DATA_PATHS["settings"]: {"defaultFolderName": DEFAULT_FOLDER_NAME},
    **{
        DATA_PATHS[list_key]: []
        for list_key in ("people", "list_schemas", "campaigns", "submissions")
    },
}
# ─── High-level data helpers ──────────────────────────
async def get_data(path: str) -> dict:
    """Read a data file, falling back to its default content.

    If the file does not exist it is created with the default from
    ``DEFAULTS`` (an empty list when `path` has no entry) and ``sha`` is
    reported as None. If the file exists but its content is empty or falsy,
    the default is returned without writing.

    The default is always handed out as a deep copy, so callers that mutate
    the returned data cannot alter the module-level ``DEFAULTS``.

    Args:
        path: Repository-relative path of the data file.

    Returns:
        A dict ``{"sha", "data"}``.
    """
    import copy  # function-scope import keeps this fix self-contained

    result = await read_json(path)
    if not result["exists"]:
        init = copy.deepcopy(DEFAULTS.get(path, []))
        await write_json(path, init, f"chore(data): initialize {path}")
        return {"sha": None, "data": init}
    data = result.get("data")
    if not data:
        data = copy.deepcopy(DEFAULTS.get(path, []))
    return {"sha": result["sha"], "data": data}
async def save_data(path: str, data: Any, message: str):
    """Overwrite a data file, retrying once if the write reports a 409.

    The current sha is read first (None when the file does not exist). If the
    write raises a RuntimeError whose text contains "409", the sha is read
    again and the same `data` is written a second time. Any other error
    propagates unchanged.
    """
    current = await read_json(path)
    known_sha = current["sha"] if current["exists"] else None
    try:
        await write_json(path, data, message, known_sha)
    except RuntimeError as exc:
        if "409" in str(exc):
            latest = await read_json(path)
            await write_json(path, data, message, latest.get("sha"))
        else:
            raise
async def _put_json_once(path: str, data: Any, message: str, sha: Optional[str]):
    """Send one Contents API PUT for `data` and return the raw response (no retry)."""
    body: dict = {
        "message": message,
        "content": base64.b64encode(
            json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
        ).decode("ascii"),
        "branch": GITHUB_BRANCH,
    }
    if sha:
        body["sha"] = sha
    async with httpx.AsyncClient(timeout=30) as client:
        return await client.put(_api_url(path), headers=_headers(), json=body)


async def update_json_list(path: str, modifier, message: str, max_attempts: int = 4):
    """Read-modify-write for list data with optimistic concurrency.

    `modifier` is called with the current list and must return the new list.
    When the write is rejected (409, or the 403 "restricts updates to workflow
    files" response), the LATEST data is re-read and `modifier` is applied
    again, so a concurrent write is never overwritten with stale data.

    Each attempt issues a single PUT of its own. Delegating to ``write_json``
    would hide the conflict, because it retries a 409 by re-sending the same
    payload under a fresh sha.

    Args:
        path: Repository-relative path of the data file.
        modifier: Callable taking the current list and returning the new one.
            It may run more than once.
        message: Commit message.
        max_attempts: Maximum number of read-modify-write rounds.

    Returns:
        The list that was written.

    Raises:
        RuntimeError: If the GitHub settings are missing, a write fails with a
            non-retryable status, or the attempts are exhausted.
    """
    import copy  # function-scope import keeps this fix self-contained

    _check_env()
    for attempt in range(1, max_attempts + 1):
        current = await read_json(path)
        sha = current["sha"] if current["exists"] else None
        data = current.get("data")
        if not isinstance(data, list):
            # Copy so an in-place modifier cannot alter the shared DEFAULTS entry.
            data = copy.deepcopy(DEFAULTS.get(path, []))
        new_data = modifier(data)
        resp = await _put_json_once(path, new_data, message, sha)
        if resp.status_code in (200, 201):
            return new_data
        retryable = resp.status_code == 409 or (
            resp.status_code == 403
            and "restricts updates to workflow files" in resp.text.lower()
        )
        if not retryable or attempt >= max_attempts:
            raise RuntimeError(f"GitHub write failed ({resp.status_code}): {resp.text}")
        await asyncio.sleep(0.3 * attempt)
    raise RuntimeError("update_json_list: exhausted retries")
# ─── Settings ─────────────────────────────────────────
async def get_settings() -> dict:
    """Return the stored settings, defaulting the folder name when unset."""
    stored = (await get_data(DATA_PATHS["settings"]))["data"] or {}
    folder = stored.get("defaultFolderName")
    if not folder:
        folder = DEFAULT_FOLDER_NAME
    return {"defaultFolderName": folder, "updatedAt": stored.get("updatedAt")}
async def save_settings(folder_name: str) -> dict:
    """Persist the default folder name and return the stored payload.

    A blank or missing `folder_name` is replaced by ``DEFAULT_FOLDER_NAME``.
    """
    folder = (folder_name or "").strip()
    if not folder:
        folder = DEFAULT_FOLDER_NAME
    payload = {"defaultFolderName": folder, "updatedAt": now_iso()}
    await save_data(
        DATA_PATHS["settings"],
        payload,
        f"chore(data): update settings folder {folder}",
    )
    return payload
# ─── People ───────────────────────────────────────────
async def get_people() -> list:
    """Return the stored people list, or an empty list if the data is not a list."""
    people = (await get_data(DATA_PATHS["people"]))["data"]
    if isinstance(people, list):
        return people
    return []
async def replace_people(people: list) -> list:
    """Replace the stored people list and return what was saved.

    A non-list argument is stored as an empty list.
    """
    if not isinstance(people, list):
        people = []
    commit_message = f"chore(data): import people ({len(people)})"
    await save_data(DATA_PATHS["people"], people, commit_message)
    return people
# ─── List Schemas ─────────────────────────────────────
async def get_list_schemas() -> list:
    """Return the stored list schemas, or an empty list if the data is not a list."""
    schemas = (await get_data(DATA_PATHS["list_schemas"]))["data"]
    if isinstance(schemas, list):
        return schemas
    return []
async def save_list_schemas(schemas: list) -> list:
    """Replace the stored list schemas and return what was saved.

    A non-list argument is stored as an empty list.
    """
    if not isinstance(schemas, list):
        schemas = []
    commit_message = f"chore(data): update list schemas ({len(schemas)})"
    await save_data(DATA_PATHS["list_schemas"], schemas, commit_message)
    return schemas
async def upsert_list_schema(schema_input: dict) -> dict:
    """Create or update a list schema and return the stored item.

    An existing schema is matched by ``id`` first, then by normalized name.
    When a match exists its ``name``, ``fields`` and ``updatedAt`` are
    replaced; otherwise a new schema with a generated id is appended.

    Field entries are normalized to ``{key, label, type, required}``. Entries
    that are not dicts are skipped, as are entries whose key is blank after
    stripping. If no usable field remains, a single required text field
    ``ho_ten`` is used.

    Args:
        schema_input: Dict with ``name`` and optional ``id`` and ``fields``.

    Returns:
        The schema dict that was saved.

    Raises:
        ValueError: If the name is missing or blank.
    """
    schema_id = str(schema_input.get("id") or "").strip()
    name = str(schema_input.get("name") or "").strip()
    if not name:
        raise ValueError("Missing list schema name")

    raw_fields = schema_input.get("fields") or []
    fields = []
    for idx, f in enumerate(raw_fields if isinstance(raw_fields, list) else []):
        if not isinstance(f, dict):
            # Malformed entry (e.g. a bare string): skip it instead of
            # failing with AttributeError on `.get`.
            continue
        key = str(f.get("key") or f"field_{idx}").strip()
        label = str(f.get("label") or f.get("key") or f"Field {idx + 1}").strip()
        ftype = str(f.get("type") or "text").strip()
        required = bool(f.get("required"))
        if key:
            fields.append({"key": key, "label": label, "type": ftype, "required": required})
    if not fields:
        fields = [{"key": "ho_ten", "label": "Họ tên", "type": "text", "required": True}]

    # Work on a copy: the list handed back by get_list_schemas may be the
    # shared default object and must not be mutated in place.
    all_schemas = list(await get_list_schemas())
    key_norm = normalize_text(name)
    now = now_iso()

    idx_by_id = -1
    idx_by_name = -1
    for i, s in enumerate(all_schemas):
        if schema_id and str(s.get("id") or "").strip() == schema_id:
            idx_by_id = i
        if normalize_text(s.get("name") or "") == key_norm:
            idx_by_name = i

    target = idx_by_id if idx_by_id >= 0 else idx_by_name
    if target >= 0:
        item = {**all_schemas[target], "name": name, "fields": fields, "updatedAt": now}
        all_schemas[target] = item
    else:
        item = {"id": make_id("ls"), "name": name, "fields": fields, "createdAt": now, "updatedAt": now}
        all_schemas.append(item)
    await save_list_schemas(all_schemas)
    return item
async def delete_list_schema(schema_id: str = "", schema_name: str = ""):
    """Delete every schema matching the given id or (normalized) name.

    Args:
        schema_id: Id to match exactly after stripping; ignored when blank.
        schema_name: Name to match after normalization; ignored when blank.

    Returns:
        ``{"deleted": <number of schemas removed>}``.

    Raises:
        ValueError: If both arguments are blank, or nothing matched.
    """
    sid = (schema_id or "").strip()
    sname = (schema_name or "").strip()
    if not (sid or sname):
        raise ValueError("Missing schema id or name")

    def _is_target(schema) -> bool:
        if sid and str(schema.get("id") or "").strip() == sid:
            return True
        if sname and normalize_text(schema.get("name") or "") == normalize_text(sname):
            return True
        return False

    existing = await get_list_schemas()
    remaining = [schema for schema in existing if not _is_target(schema)]
    removed = len(existing) - len(remaining)
    if removed == 0:
        raise ValueError("Schema not found")
    await save_list_schemas(remaining)
    return {"deleted": removed}
# ─── Campaigns ────────────────────────────────────────
async def get_campaigns() -> list:
    """Return the stored campaigns, or an empty list if the data is not a list."""
    campaigns = (await get_data(DATA_PATHS["campaigns"]))["data"]
    if isinstance(campaigns, list):
        return campaigns
    return []
async def save_campaigns(campaigns: list) -> list:
    """Replace the stored campaigns and return what was saved.

    A non-list argument is stored as an empty list.
    """
    if not isinstance(campaigns, list):
        campaigns = []
    commit_message = f"chore(data): update campaigns ({len(campaigns)})"
    await save_data(DATA_PATHS["campaigns"], campaigns, commit_message)
    return campaigns
async def atomic_add_campaign(item: dict) -> dict:
    """Append a campaign through ``update_json_list`` and return it.

    The modifier builds a new list instead of appending in place. The list it
    receives may be the shared ``DEFAULTS`` entry (used when the file is
    missing or not a list), and mutating that would leak this item into
    later reads.

    Args:
        item: Campaign dict; its ``ten`` value is used in the commit message.

    Returns:
        The same `item`.
    """
    def modifier(current):
        return [*current, item]

    await update_json_list(
        DATA_PATHS["campaigns"], modifier,
        f"chore(data): add campaign {item.get('ten', '')}"
    )
    return item
async def atomic_update_campaign(ten: str, updater) -> bool:
    """Update the first campaign whose ``ten`` equals `ten`.

    Args:
        ten: Campaign name to match exactly.
        updater: Callable taking the campaign dict and returning the updated one.

    Returns:
        True if a matching campaign was found in the data that was written,
        False otherwise.
    """
    found = False

    def modifier(current):
        nonlocal found
        # update_json_list may call the modifier again after a conflict, so
        # the flag must describe only the latest run.
        found = False
        # Copy so the input list (possibly the shared default) is untouched.
        updated = list(current)
        for i, c in enumerate(updated):
            if c.get("ten") == ten:
                updated[i] = updater(c)
                found = True
                break
        return updated

    await update_json_list(
        DATA_PATHS["campaigns"], modifier,
        f"chore(data): update campaign {ten}"
    )
    return found
async def atomic_delete_campaign(ten: str) -> bool:
    """Remove every campaign whose ``ten`` equals `ten`.

    Returns:
        True if at least one campaign was removed, False otherwise.
    """
    removed = False

    def _drop_matching(campaigns):
        nonlocal removed
        kept = [entry for entry in campaigns if entry.get("ten") != ten]
        removed = len(kept) < len(campaigns)
        return kept

    await update_json_list(
        DATA_PATHS["campaigns"],
        _drop_matching,
        f"chore(data): delete campaign {ten}",
    )
    return removed
# ─── Submissions ──────────────────────────────────────
async def get_submissions() -> list:
    """Return the stored submissions, or an empty list if the data is not a list."""
    submissions = (await get_data(DATA_PATHS["submissions"]))["data"]
    if isinstance(submissions, list):
        return submissions
    return []
async def add_submission(submission: dict) -> dict:
    """Append a submission through ``update_json_list`` and return it.

    The modifier builds a new list instead of appending in place. The list it
    receives may be the shared ``DEFAULTS`` entry (used when the file is
    missing or not a list), and mutating that would leak this submission into
    later reads.

    Args:
        submission: Submission dict; its ``id`` (if any) is used in the
            commit message.

    Returns:
        The same `submission`.
    """
    sub_id = submission.get("id") or ""

    def modifier(current):
        return [*current, submission]

    await update_json_list(
        DATA_PATHS["submissions"], modifier,
        f"chore(data): add submission {sub_id}".strip()
    )
    return submission