Spaces:
Sleeping
Sleeping
Update modules/utils.py
Browse files- modules/utils.py +64 -31
modules/utils.py
CHANGED
|
@@ -1,42 +1,75 @@
|
|
| 1 |
-
import os
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 2 |
from pathlib import Path
|
| 3 |
-
from typing import
|
| 4 |
|
| 5 |
-
|
| 6 |
-
|
|
|
|
| 7 |
|
| 8 |
-
def ensure_dirs():
|
|
|
|
| 9 |
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
| 10 |
|
| 11 |
-
|
| 12 |
-
|
| 13 |
-
|
| 14 |
-
|
| 15 |
-
|
| 16 |
-
|
| 17 |
-
|
| 18 |
-
|
| 19 |
-
|
| 20 |
-
|
| 21 |
-
|
| 22 |
-
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
|
| 26 |
-
|
|
|
|
|
|
|
|
|
|
| 27 |
ensure_dirs()
|
| 28 |
-
|
| 29 |
-
|
| 30 |
-
|
|
|
|
|
|
|
| 31 |
|
| 32 |
-
def verify_tracking_token(token: str)
|
|
|
|
|
|
|
|
|
|
| 33 |
try:
|
| 34 |
-
|
| 35 |
-
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
if not hmac.compare_digest(
|
| 39 |
return None
|
| 40 |
-
return json.
|
| 41 |
except Exception:
|
| 42 |
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import os
|
| 2 |
+
import json
|
| 3 |
+
import time
|
| 4 |
+
import base64
|
| 5 |
+
import hmac
|
| 6 |
+
import hashlib
|
| 7 |
from pathlib import Path
|
| 8 |
+
from typing import Any, Dict, Optional
|
| 9 |
|
| 10 |
+
# ====== 共通ディレクトリの用意 ======
|
| 11 |
+
DATA_DIR = Path(os.getenv("DATA_DIR", "data"))
|
| 12 |
+
EXPORT_DIR = DATA_DIR / "exports"
|
| 13 |
|
| 14 |
+
def ensure_dirs() -> None:
|
| 15 |
+
"""必要なディレクトリを作成"""
|
| 16 |
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
| 17 |
+
EXPORT_DIR.mkdir(parents=True, exist_ok=True)
|
| 18 |
|
| 19 |
+
# ====== 簡易トラッキングトークン(追加依存なしのHMAC方式) ======
|
| 20 |
+
_SECRET = os.getenv("TRACKING_SECRET", "dev-secret").encode("utf-8")
|
| 21 |
+
|
| 22 |
+
def _b64url_encode(b: bytes) -> str:
|
| 23 |
+
return base64.urlsafe_b64encode(b).rstrip(b"=").decode("ascii")
|
| 24 |
+
|
| 25 |
+
def _b64url_decode(s: str) -> bytes:
|
| 26 |
+
pad = "=" * (-len(s) % 4)
|
| 27 |
+
return base64.urlsafe_b64decode((s + pad).encode("ascii"))
|
| 28 |
+
|
| 29 |
+
def _sign(payload_bytes: bytes) -> str:
|
| 30 |
+
mac = hmac.new(_SECRET, payload_bytes, hashlib.sha256).digest()
|
| 31 |
+
return _b64url_encode(mac)
|
| 32 |
+
|
| 33 |
+
def make_tracking_token(payload: Dict[str, Any]) -> str:
|
| 34 |
+
"""
|
| 35 |
+
payload を JSON にして HMAC 署名し、'<b64json>.<sig>' 形式で返す。
|
| 36 |
+
例: {"company":"Test Inc.","ts":1690000000,"redirect":"/"}
|
| 37 |
+
"""
|
| 38 |
ensure_dirs()
|
| 39 |
+
payload = dict(payload or {})
|
| 40 |
+
if "ts" not in payload:
|
| 41 |
+
payload["ts"] = int(time.time())
|
| 42 |
+
b = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
|
| 43 |
+
return f"{_b64url_encode(b)}.{_sign(b)}"
|
| 44 |
|
| 45 |
+
def verify_tracking_token(token: str) -> Optional[Dict[str, Any]]:
|
| 46 |
+
"""
|
| 47 |
+
トークンの署名検証に成功したら payload を返す。失敗したら None。
|
| 48 |
+
"""
|
| 49 |
try:
|
| 50 |
+
part_json, part_sig = token.split(".", 1)
|
| 51 |
+
b = _b64url_decode(part_json)
|
| 52 |
+
expected = _sign(b)
|
| 53 |
+
# 時間差攻撃対策の恒等比較
|
| 54 |
+
if not hmac.compare_digest(part_sig, expected):
|
| 55 |
return None
|
| 56 |
+
return json.loads(b.decode("utf-8"))
|
| 57 |
except Exception:
|
| 58 |
return None
|
| 59 |
+
|
| 60 |
+
# ====== クリック/イベントの簡易ログ ======
|
| 61 |
+
EVENTS_PATH = DATA_DIR / "events.jsonl"
|
| 62 |
+
|
| 63 |
+
def log_event(event_type: str, payload: Dict[str, Any], meta: Optional[Dict[str, Any]] = None) -> None:
|
| 64 |
+
"""
|
| 65 |
+
data/events.jsonl に1行追記。Spaceの Files タブから確認可能。
|
| 66 |
+
"""
|
| 67 |
+
ensure_dirs()
|
| 68 |
+
rec = {
|
| 69 |
+
"ts": int(time.time()),
|
| 70 |
+
"type": event_type,
|
| 71 |
+
"payload": payload or {},
|
| 72 |
+
"meta": meta or {},
|
| 73 |
+
}
|
| 74 |
+
with open(EVENTS_PATH, "a", encoding="utf-8") as f:
|
| 75 |
+
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
|