Spaces:
Sleeping
Sleeping
File size: 5,045 Bytes
3ebd172 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | import gradio as gr
import time
import math
from collections import deque, defaultdict
# --- 設定參數 ---
MAX_RECENT_EVENTS = 1000 # 暫存區大小 (達到此數量觸發壓縮)
GRID_PRECISION = 4 # 網格精度 (4小數點約=11公尺)
MAX_GRID_INTENSITY = 20.0 # 單一網格最大亮度 (避免單點過曝)
# --- 資料結構 ---
# 1. 近期事件 (Delta Queue): 用於快速同步
# 格式: {'t': time, 'u': user_id, 'l': [lat, lng], 's': speed}
recent_events = deque()
# 2. 歷史網格 (Snapshot Grid): 儲存壓縮後的長期記憶
# Key: (lat_round, lng_round), Value: intensity
archived_grid = defaultdict(float)
# 3. 狀態版本控制
snapshot_version = 0
last_snapshot_time = time.time()
def get_intensity_by_speed(speed_kmh):
"""
根據速度決定迷霧驅散強度 (亮度)
< 5 km/h (步行/停留): 強度 1.0
< 20 km/h (跑步/腳踏車): 強度 0.5
> 20 km/h (車輛): 強度 0.1
"""
if speed_kmh is None: return 0.5
if speed_kmh < 5: return 1.0
if speed_kmh < 20: return 0.5
return 0.1
def compress_events_to_grid(events_list):
"""將事件列表壓縮進網格"""
global archived_grid
for event in events_list:
lat, lng = event['l']
speed = event['s']
# 網格化座標
grid_key = (round(lat, GRID_PRECISION), round(lng, GRID_PRECISION))
# 累加亮度
intensity = get_intensity_by_speed(speed)
archived_grid[grid_key] += intensity
# 限制最大亮度 (防止單點無限疊加)
if archived_grid[grid_key] > MAX_GRID_INTENSITY:
archived_grid[grid_key] = MAX_GRID_INTENSITY
def trigger_snapshot():
"""檢查並執行快照壓縮"""
global recent_events, snapshot_version, last_snapshot_time
# 如果暫存區超過閾值,將最舊的一半資料壓縮進歷史網格
if len(recent_events) > MAX_RECENT_EVENTS:
events_to_archive = []
count_to_remove = int(MAX_RECENT_EVENTS / 2)
for _ in range(count_to_remove):
events_to_archive.append(recent_events.popleft())
compress_events_to_grid(events_to_archive)
snapshot_version += 1
last_snapshot_time = time.time()
print(f"[Server] Snapshot Updated. Ver: {snapshot_version}, Grid Points: {len(archived_grid)}")
def sync_game_state(client_last_time, client_ver, user_id, lat, lng, speed):
"""
主 API: 處理客戶端同步請求
"""
current_time = time.time()
# 1. 記錄用戶上傳的位置 (如果有)
if lat is not None and lng is not None:
# 過濾異常數據 (例如 GPS 飄移瞬間移動 > 150km/h)
if speed is None or speed < 150:
new_event = {
"t": current_time,
"u": user_id,
"l": [lat, lng],
"s": speed
}
recent_events.append(new_event)
# 2. 檢查是否需要觸發壓縮
trigger_snapshot()
# 3. 準備回傳資料
response = {
"server_time": current_time,
"snapshot_version": snapshot_version,
"sync_type": "delta",
"payload": []
}
# 判斷同步策略
# 如果客戶端版本落後 (需要全量更新) 或 客戶端剛加入 (ver = -1)
# if client_ver < snapshot_version:
if client_ver < 0:
response["sync_type"] = "snapshot"
# 回傳壓縮後的網格數據: [[lat, lng, intensity], ...]
response["payload"] = [[k[0], k[1], v] for k, v in archived_grid.items()]
# 同時附上還沒被壓縮的近期事件,確保數據不出現斷層
response["recent_events"] = list(recent_events)
else:
# 增量更新: 只回傳 client_last_time 之後發生的事件
# 注意: 這裡回傳的是未壓縮的原始事件
delta = [e for e in recent_events if e['t'] > client_last_time]
response["payload"] = delta
return response
# --- Gradio 介面設定 ---
with gr.Blocks() as demo:
gr.Markdown("## City Scout Game API Server")
gr.Markdown("此 Space 為遊戲後端,請使用前端 HTML 連接。")
with gr.Row(visible=False): # 隱藏 UI,僅作為 API 使用
# Inputs
in_time = gr.Number(label="Client Last Time")
in_ver = gr.Number(label="Client Snapshot Version")
in_uid = gr.Textbox(label="User ID")
in_lat = gr.Number(label="Lat")
in_lng = gr.Number(label="Lng")
in_speed = gr.Number(label="Speed (km/h)")
# Outputs
out_json = gr.JSON(label="Sync Response")
# API 按鈕
btn_sync = gr.Button("Sync")
btn_sync.click(
fn=sync_game_state,
inputs=[in_time, in_ver, in_uid, in_lat, in_lng, in_speed],
outputs=out_json,
api_name="sync" # 重要: 前端 client.predict("/sync") 用這個名字
)
# 啟動並允許跨域 (CORS)
demo.queue(max_size=50).launch(cors_allowed_origins=["*"]) |