| """ |
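FSD Simulator generic SQL middle layer (Hugging Face Space + Docker)
====================================================================
Two endpoints = two permission levels:
  POST /write   run write SQL (INSERT/UPDATE/DELETE/CREATE TABLE) with the [read-write] token; never cached
  POST /query   run read SQL (SELECT) with the [read-only] token; results cached for 12 hours

SQL is passed through directly, which is flexible, but be clear about the
consequence: anyone who can call /write can modify or delete data in the database.
Line of defence: signatures - only holders of APP_SECRET can call the endpoints.
Note: only one token (TURSO_TOKEN) is configured below, i.e. single-token mode,
in which /query has write permission as well. Never leak APP_SECRET.

Environment variables (set them in the HF Space under Settings -> Secrets):
  TURSO_URL    https://yourdb-yourorg.turso.io/v2/pipeline
  TURSO_TOKEN  Turso Bearer token; create one with: turso db tokens create <db>
  APP_SECRET   shared signing secret; must match FSDClient on the Roblox side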
| """ |
|
|
| import os |
| import time |
| import json |
| import random |
| import hashlib |
| import hmac |
| from typing import Any, Optional |
|
|
| import httpx |
| from fastapi import FastAPI, Header, HTTPException |
| from pydantic import BaseModel |
|
|
| |
# Turso endpoint, Turso bearer token and the shared signing secret, read from
# the environment once at import time; an unset variable becomes "".
TURSO_URL = os.getenv("TURSO_URL", "")
TURSO_TOKEN = os.getenv("TURSO_TOKEN", "")
APP_SECRET = os.getenv("APP_SECRET", "")

# Cache lifetime in seconds (12 hours) and the maximum random shift applied to
# each entry's expiry (30 minutes either way).
CACHE_TTL = 12 * 3600
CACHE_JITTER = 1800
# Largest accepted gap, in seconds, between X-Timestamp and the server clock.
SIGNATURE_WINDOW = 5 * 60
|
|
# Application object that the route decorators in this module register on.
app = FastAPI(title="FSD SQL Layer")

# In-process cache: key -> (cached value, absolute expiry as a time.time() value).
_cache: dict[str, tuple[Any, float]] = dict()
|
|
|
|
def cache_get(key: str):
    """Return the cached value for *key*, or None when absent or expired.

    An expired entry is removed from the cache as a side effect.
    """
    entry = _cache.get(key)
    if entry:
        cached_value, expires_at = entry
        if time.time() <= expires_at:
            return cached_value
        # Stale entry: drop it so later lookups are plain misses.
        _cache.pop(key, None)
    return None
|
|
|
|
def cache_set(key: str, value: Any):
    """Store *value* under *key* with a jittered expiry time.

    The entry expires CACHE_TTL seconds from now, shifted by a random integer
    in [-CACHE_JITTER, CACHE_JITTER] so entries written together do not all
    expire at the same moment.

    Before inserting, every already-expired entry is purged. Lookups only evict
    the key being read, so without this sweep entries for keys that are never
    requested again would stay in memory indefinitely.

    Args:
        key: Cache key.
        value: Value to cache.
    """
    now = time.time()
    # Collect keys first; the dict must not change size while it is iterated.
    expired_keys = [k for k, (_, expires_at) in _cache.items() if now > expires_at]
    for stale_key in expired_keys:
        del _cache[stale_key]

    jitter = random.randint(-CACHE_JITTER, CACHE_JITTER)
    _cache[key] = (value, now + CACHE_TTL + jitter)
|
|
|
|
| |
| def _to_arg(v: Any) -> dict: |
| """Python 值 -> Turso(Hrana)参数格式。整数用字符串传,防大整数精度丢失。""" |
| if v is None: |
| return {"type": "null", "value": None} |
| if isinstance(v, bool): |
| return {"type": "integer", "value": str(int(v))} |
| if isinstance(v, int): |
| return {"type": "integer", "value": str(v)} |
| if isinstance(v, float): |
| return {"type": "float", "value": v} |
| return {"type": "text", "value": str(v)} |
|
|
|
|
async def turso_exec(statements: list[tuple[str, list]], token: str) -> list:
    """Run a batch of SQL statements against Turso in one HTTP request.

    Args:
        statements: (sql, args) pairs; each becomes one "execute" request.
        token: Bearer token sent in the Authorization header.

    Returns:
        The "execute" responses in the order Turso returned them; responses of
        any other type (such as the one for the trailing "close") are skipped.

    Raises:
        HTTPException: 500 when TURSO_URL or *token* is empty; 502 when the
            request cannot be completed, Turso answers with an error status or
            a non-JSON body, or any result in the batch is an error.
    """
    if not TURSO_URL or not token:
        raise HTTPException(500, "服务端未配置 TURSO_URL 或对应 token")

    requests = [
        {
            "type": "execute",
            "stmt": {"sql": sql, "args": [_to_arg(a) for a in args]},
        }
        for sql, args in statements
    ]
    requests.append({"type": "close"})

    # Upstream failures are reported as 502, matching the per-statement error
    # handling below, instead of escaping as unhandled exceptions (HTTP 500).
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(
                TURSO_URL,
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json",
                },
                json={"requests": requests},
            )
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            502, f"Turso 请求失败: HTTP {exc.response.status_code}"
        ) from exc
    except httpx.HTTPError as exc:
        # Connection errors, timeouts and other transport-level failures.
        raise HTTPException(502, f"Turso 请求失败: {type(exc).__name__}") from exc
    except ValueError as exc:
        # resp.json() could not decode the body.
        raise HTTPException(502, "Turso 响应不是合法的 JSON") from exc

    results = []
    for item in data.get("results", []):
        if item.get("type") == "error":
            raise HTTPException(502, f"Turso 报错: {item.get('error')}")
        # "or {}" also covers an explicit null response.
        r = item.get("response") or {}
        if r.get("type") == "execute":
            results.append(r)
    return results
|
|
|
|
def parse_rows(execute_response: dict) -> list[dict]:
    """Turn one execute response into a list of {column name: value} dicts.

    Column names come from result.cols[*].name and each cell contributes its
    "value" entry (None when the cell has none). Missing "result", "cols" or
    "rows" keys are treated as empty.
    """
    payload = execute_response.get("result", {})
    names = [column.get("name") for column in payload.get("cols", [])]
    return [
        {name: cell.get("value") for name, cell in zip(names, record)}
        for record in payload.get("rows", [])
    ]
|
|
|
|
def affected(execute_response: dict) -> int:
    """Return the response's affected_row_count, or 0 when it is not present."""
    result = execute_response.get("result", {})
    return result.get("affected_row_count", 0)
|
|
|
|
| |
def check_signature(x_timestamp: Optional[str], x_signature: Optional[str]):
    """Validate the X-Timestamp / X-Signature header pair of a request.

    The expected signature is the SHA-256 hex digest of APP_SECRET followed by
    the raw timestamp string. The check is skipped entirely when APP_SECRET is
    empty.

    Args:
        x_timestamp: Value of the X-Timestamp header (integer seconds).
        x_signature: Value of the X-Signature header.

    Raises:
        HTTPException: 401 when a header is missing, the timestamp is not an
            integer, the timestamp is more than SIGNATURE_WINDOW seconds away
            from the server clock, or the signature does not match.
    """
    if not APP_SECRET:
        return
    if not x_timestamp or not x_signature:
        raise HTTPException(401, "缺少 X-Timestamp 或 X-Signature")
    try:
        t = int(x_timestamp)
    except ValueError:
        raise HTTPException(401, "X-Timestamp 格式错误") from None
    if abs(int(time.time()) - t) > SIGNATURE_WINDOW:
        raise HTTPException(401, "签名已过期(检查两端时钟)")
    expected = hashlib.sha256((APP_SECRET + x_timestamp).encode()).hexdigest()
    # Compare bytes, not str: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header into
    # an unhandled error instead of a 401.
    if not hmac.compare_digest(expected.encode(), x_signature.encode()):
        raise HTTPException(401, "签名无效")
|
|
|
|
| |
|
|
@app.get("/")
def health():
    """Keep-alive endpoint for an uptime pinger (e.g. UptimeRobot) so the Space
    does not go to sleep. No signature is required."""
    status = {"status": "ok"}
    return status
|
|
|
|
class Statement(BaseModel):
    """One SQL statement together with its argument list."""

    # SQL text, passed through unchanged.
    sql: str
    # Argument values for the statement; empty when omitted.
    args: list[Any] = []
|
|
|
|
class WritePayload(BaseModel):
    """Request body of POST /write."""

    # Statements to execute as one batch.
    statements: list[Statement]
|
|
|
|
@app.post("/write")
async def write(payload: WritePayload,
                x_timestamp: Optional[str] = Header(default=None),
                x_signature: Optional[str] = Header(default=None)):
    """Execute a batch of SQL statements with TURSO_TOKEN; nothing is cached.

    Several statements may be sent in one request so the client can batch
    frequent writes and use fewer HttpService requests.

    Returns:
        {"executed": number of statements sent,
         "affected": affected-row count of each execute response}.
        An empty statement list returns zero/empty without contacting Turso.
    """
    check_signature(x_timestamp, x_signature)
    batch = [(stmt.sql, stmt.args) for stmt in payload.statements]
    if not batch:
        return {"executed": 0, "affected": []}
    results = await turso_exec(batch, TURSO_TOKEN)
    counts = [affected(r) for r in results]
    return {"executed": len(batch), "affected": counts}
|
|
|
|
class QueryPayload(BaseModel):
    """Request body of POST /query."""

    # SQL text, passed through unchanged.
    sql: str
    # Argument values for the statement; empty when omitted.
    args: list[Any] = []
    # When true, the result cache is consulted and filled.
    cache: bool = True
|
|
|
|
@app.post("/query")
async def query(payload: QueryPayload,
                x_timestamp: Optional[str] = Header(default=None),
                x_signature: Optional[str] = Header(default=None)):
    """Execute one SQL statement and return its rows, cached by default.

    The statement runs with TURSO_TOKEN and is not checked to be a SELECT.
    The cache key is derived from sql + args, so an identical query is served
    from the cache without contacting Turso until its entry expires. Set
    payload.cache to false to bypass the cache for both lookup and store.

    Returns:
        {"rows": list of {column: value} dicts, "cached": whether the rows
        came from the cache}.

    Raises:
        HTTPException: 401 from the signature check; 500/502 from turso_exec;
            502 when Turso returns no execute result for the statement.
    """
    check_signature(x_timestamp, x_signature)

    key = "q:" + hashlib.sha256(
        (payload.sql + json.dumps(payload.args, ensure_ascii=False)).encode()
    ).hexdigest()

    if payload.cache:
        cached = cache_get(key)
        # "is not None" so that a cached empty result still counts as a hit.
        if cached is not None:
            return {"rows": cached, "cached": True}

    responses = await turso_exec([(payload.sql, payload.args)], TURSO_TOKEN)
    if not responses:
        # Without this guard responses[0] would raise IndexError (HTTP 500).
        raise HTTPException(502, "Turso 未返回查询结果")
    rows = parse_rows(responses[0])
    if payload.cache:
        cache_set(key, rows)
    return {"rows": rows, "cached": False}
|
|