Corin1998 commited on
Commit
796cdb2
·
verified ·
1 Parent(s): 7bfce56

Create utils.py

Browse files
Files changed (1) hide show
  1. modules/utils.py +41 -0
modules/utils.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os, re, json, base64, hmac, hashlib
2
+ from pathlib import Path
3
+ from typing import Iterable, List
4
+
5
+ DATA_DIR = Path("data")
6
+ LOG_PATH = DATA_DIR / "events.jsonl"
7
+
8
+ def ensure_dirs():
9
+ DATA_DIR.mkdir(parents=True, exist_ok=True)
10
+
11
+ def chunk_text(text: str, max_len: int=600) -> Iterable[str]:
12
+ lines = re.split(r"(\n+)", text)
13
+ buff, count = [], 0
14
+ for ln in lines:
15
+ if count + len(ln) > max_len and buff:
16
+ yield "".join(buff).strip()
17
+ buff, count = [], 0
18
+ buff.append(ln)
19
+ if buff:
20
+ yield "".join(buff).strip()
21
+
22
+ def truncate(text: str, n: int) -> str:
23
+ return text if len(text) <= n else text[:n] + "..."
24
+
25
+ def log_event(event_type: str, payload: dict, meta: dict | None=None):
26
+ ensure_dirs()
27
+ rec = {"type": event_type, "payload": payload, "meta": meta or {}}
28
+ with open(LOG_PATH, "a", encoding="utf-8") as f:
29
+ f.write(json.dumps(rec, ensure_ascii=False) + "\n")
30
+
31
+ def verify_tracking_token(token: str):
32
+ try:
33
+ raw = base64.urlsafe_b64decode(token.encode())
34
+ data, sig = raw.rsplit(b".",1)
35
+ secret = os.getenv("TRACKING_SECRET", "dev").encode()
36
+ expected = hmac.new(secret, data, hashlib.sha256).digest()
37
+ if not hmac.compare_digest(sig, expected):
38
+ return None
39
+ return json.loads(data.decode())
40
+ except Exception:
41
+ return None