TonyD365's picture
Update app.py
4d8a0c4 verified
Raw
History Blame Contribute Delete
7.13 kB
"""
FSD Simulator 通用 SQL 中间层 (Hugging Face Space + Docker)
========================================================
两个接口 = 两种权限:
POST /write 执行写类 SQL(INSERT/UPDATE/DELETE/建表),用【读写】token,不缓存
POST /query 执行读类 SQL(SELECT),用【只读】token,结果缓存 12 小时
直接传 SQL,灵活;但要清楚:能调 /write 的人就能改/删库里的数据。
防线:签名——只有持 APP_SECRET 的人能调接口。
注意:单 token 模式下 /query 也具备写权限,请勿泄露 APP_SECRET。
环境变量(填进 HF Space 的 Settings -> Secrets):
TURSO_URL https://yourdb-yourorg.turso.io/v2/pipeline
TURSO_TOKEN Turso 的 Bearer Token 生成: turso db tokens create <db>
APP_SECRET 签名共享密钥,和 Roblox 端 FSDClient 一致
"""
import os
import time
import json
import random
import hashlib
import hmac
from typing import Any, Optional
import httpx
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel
# ---------- 从环境变量读配置(绝不硬编码密钥!)----------
TURSO_URL = os.environ.get("TURSO_URL", "")
TURSO_TOKEN = os.environ.get("TURSO_TOKEN", "")
APP_SECRET = os.environ.get("APP_SECRET", "")
CACHE_TTL = 12 * 60 * 60 # 读缓存 12 小时
CACHE_JITTER = 30 * 60 # ±30 分钟抖动,防缓存雪崩
SIGNATURE_WINDOW = 300 # 签名时间窗 ±5 分钟
app = FastAPI(title="FSD SQL Layer")
# ---------- 内存缓存(进程重启即清空,只当加速层)----------
_cache: dict[str, tuple[Any, float]] = {}
def cache_get(key: str):
item = _cache.get(key)
if not item:
return None
value, expiry = item
if time.time() > expiry:
_cache.pop(key, None)
return None
return value
def cache_set(key: str, value: Any):
jitter = random.randint(-CACHE_JITTER, CACHE_JITTER)
_cache[key] = (value, time.time() + CACHE_TTL + jitter)
# ---------- Turso HTTP 封装 ----------
def _to_arg(v: Any) -> dict:
"""Python 值 -> Turso(Hrana)参数格式。整数用字符串传,防大整数精度丢失。"""
if v is None:
return {"type": "null", "value": None}
if isinstance(v, bool):
return {"type": "integer", "value": str(int(v))}
if isinstance(v, int):
return {"type": "integer", "value": str(v)}
if isinstance(v, float):
return {"type": "float", "value": v}
return {"type": "text", "value": str(v)}
async def turso_exec(statements: list[tuple[str, list]], token: str) -> list:
"""用指定 token 执行一批 SQL,返回每条 execute 语句的 response(已过滤掉 close)。"""
if not TURSO_URL or not token:
raise HTTPException(500, "服务端未配置 TURSO_URL 或对应 token")
requests = []
for sql, args in statements:
requests.append({
"type": "execute",
"stmt": {"sql": sql, "args": [_to_arg(a) for a in args]},
})
requests.append({"type": "close"})
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
TURSO_URL,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
json={"requests": requests},
)
resp.raise_for_status()
data = resp.json()
results = []
for item in data.get("results", []):
if item.get("type") == "error":
raise HTTPException(502, f"Turso 报错: {item.get('error')}")
r = item.get("response", {})
if r.get("type") == "execute": # 只收集 execute,忽略 close 的空响应
results.append(r)
return results
def parse_rows(execute_response: dict) -> list[dict]:
"""execute 结果 -> [{列名: 值}, ...]"""
result = execute_response.get("result", {})
cols = [c.get("name") for c in result.get("cols", [])]
out = []
for row in result.get("rows", []):
out.append({col: cell.get("value") for col, cell in zip(cols, row)})
return out
def affected(execute_response: dict) -> int:
return execute_response.get("result", {}).get("affected_row_count", 0)
# ---------- 鉴权:时间戳签名校验 ----------
def check_signature(x_timestamp: Optional[str], x_signature: Optional[str]):
if not APP_SECRET:
return # 没设就跳过(本地测试);线上务必设置
if not x_timestamp or not x_signature:
raise HTTPException(401, "缺少 X-Timestamp 或 X-Signature")
try:
t = int(x_timestamp)
except ValueError:
raise HTTPException(401, "X-Timestamp 格式错误")
if abs(int(time.time()) - t) > SIGNATURE_WINDOW:
raise HTTPException(401, "签名已过期(检查两端时钟)")
expected = hashlib.sha256((APP_SECRET + x_timestamp).encode()).hexdigest()
if not hmac.compare_digest(expected, x_signature):
raise HTTPException(401, "签名无效")
# ==================== 路由 ====================
@app.get("/")
def health():
"""保活端点:给 UptimeRobot ping,保持 Space 不休眠。不需要签名。"""
return {"status": "ok"}
class Statement(BaseModel):
sql: str
args: list = [] # 可选;建议用 ? 占位 + args,既灵活又防注入
class WritePayload(BaseModel):
statements: list[Statement]
@app.post("/write")
async def write(payload: WritePayload,
x_timestamp: Optional[str] = Header(default=None),
x_signature: Optional[str] = Header(default=None)):
"""【写】执行一批写类 SQL,用读写 token,不缓存。
支持一次发多条(写勤攒批),减少 HttpService 请求次数。"""
check_signature(x_timestamp, x_signature)
if not payload.statements:
return {"executed": 0, "affected": []}
stmts = [(s.sql, s.args) for s in payload.statements]
responses = await turso_exec(stmts, TURSO_TOKEN)
return {"executed": len(stmts), "affected": [affected(r) for r in responses]}
class QueryPayload(BaseModel):
sql: str
args: list = []
cache: bool = True # 想拿实时结果就传 false
@app.post("/query")
async def query(payload: QueryPayload,
x_timestamp: Optional[str] = Header(default=None),
x_signature: Optional[str] = Header(default=None)):
"""【读】执行一条 SELECT,用只读 token,结果默认缓存 12 小时。
缓存键由 sql + args 决定:相同查询命中缓存,0 次回源。"""
check_signature(x_timestamp, x_signature)
key = "q:" + hashlib.sha256(
(payload.sql + json.dumps(payload.args, ensure_ascii=False)).encode()
).hexdigest()
if payload.cache:
cached = cache_get(key)
if cached is not None:
return {"rows": cached, "cached": True}
responses = await turso_exec([(payload.sql, payload.args)], TURSO_TOKEN)
rows = parse_rows(responses[0])
if payload.cache:
cache_set(key, rows)
return {"rows": rows, "cached": False}