Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import base64 | |
| import hmac | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional, List | |
# ------------------------------------------------------------
# Automatic selection of a writable directory
# (/data -> /cache -> /tmp -> finally ./data)
# ------------------------------------------------------------
# Set to True once the Hugging Face environment defaults have been applied.
_HF_INIT_DONE: bool = False
# Both paths stay None until a writable base directory has been resolved.
_DATA_DIR: Optional[Path] = None
_EXPORT_DIR: Optional[Path] = None
def _is_writable(p: Path) -> bool:
    """Report whether directory ``p`` can be created (if missing) and written to.

    The directory is created with its parents, a small probe file named
    ``.w_test`` is written inside it and then removed again.

    Args:
        p: Directory to probe.

    Returns:
        True when every step succeeded; False when any step raised.
    """
    try:
        p.mkdir(parents=True, exist_ok=True)
        probe = p / ".w_test"
        probe.write_text("ok", encoding="utf-8")
        probe.unlink(missing_ok=True)
    except Exception:
        # Deliberately broad: any failure simply means "do not use this directory".
        return False
    else:
        return True
def _pick_writable_dir(candidates: List[Path]) -> Path:
    """Return the first candidate directory that passes ``_is_writable``.

    Args:
        candidates: Directories to probe, in priority order.

    Returns:
        The first writable candidate. When none qualifies,
        ``/tmp/agent_studio`` is created and returned as a last resort.
    """
    winner = next((path for path in candidates if _is_writable(path)), None)
    if winner is not None:
        return winner
    # Every candidate failed: fall back to /tmp as the last resort.
    last_resort = Path("/tmp/agent_studio")
    last_resort.mkdir(parents=True, exist_ok=True)
    return last_resort
def _init_hf_env(base: Path) -> None:
    """Point Hugging Face related homes and caches at directories under ``base``.

    Creates ``base/hf_home`` and ``base/hf_cache`` and registers them as the
    values of the Hugging Face / Transformers / Sentence-Transformers
    environment variables; the stated intent is to avoid PermissionError.

    Every variable is applied with ``os.environ.setdefault``, so a value that
    already exists in the environment is kept and NOT overridden.

    The work is done at most once per process: after the first successful
    call ``_HF_INIT_DONE`` is True and later calls return immediately.

    Args:
        base: Writable directory under which the two directories are created.
    """
    global _HF_INIT_DONE
    if _HF_INIT_DONE:
        return

    home_dir = base / "hf_home"
    cache_dir = base / "hf_cache"
    for directory in (home_dir, cache_dir):
        directory.mkdir(parents=True, exist_ok=True)

    cache_value = str(cache_dir)
    defaults = {
        # Home and cache locations.
        "HF_HOME": str(home_dir),
        "HUGGINGFACE_HUB_CACHE": cache_value,
        "TRANSFORMERS_CACHE": cache_value,
        "SENTENCE_TRANSFORMERS_HOME": cache_value,
        # Suppress telemetry and progress bars.
        "HF_HUB_DISABLE_TELEMETRY": "1",
        "HF_HUB_DISABLE_PROGRESS_BARS": "1",
        # Empty token: anonymous download of public models is assumed.
        "HF_TOKEN": "",
    }
    for name, value in defaults.items():
        os.environ.setdefault(name, value)

    _HF_INIT_DONE = True
def ensure_dirs() -> None:
    """Resolve and create the writable data directory and its ``exports`` subdirectory.

    Candidates are tried in this order: the ``DATA_DIR`` environment variable
    (when set and non-empty), ``/data/agent_studio``, ``/cache/agent_studio``,
    ``/tmp/agent_studio`` and finally the relative path ``data``.

    The result is stored in the module-level ``_DATA_DIR`` / ``_EXPORT_DIR``;
    once both are set, later calls return immediately. The Hugging Face
    environment is then initialised against the chosen base directory.
    """
    global _DATA_DIR, _EXPORT_DIR
    if not (_DATA_DIR is None or _EXPORT_DIR is None):
        return  # Already resolved.

    builtin_candidates = [
        Path("/data/agent_studio"),
        Path("/cache/agent_studio"),
        Path("/tmp/agent_studio"),
        Path("data"),  # Relative path comes last.
    ]
    override = os.getenv("DATA_DIR")
    candidates = ([Path(override)] if override else []) + builtin_candidates

    base = _pick_writable_dir(candidates)
    exports = base / "exports"
    for directory in (base, exports):
        directory.mkdir(parents=True, exist_ok=True)

    _DATA_DIR, _EXPORT_DIR = base, exports

    # Anchor the Hugging Face home/cache defaults to this writable base.
    _init_hf_env(base)
def data_dir() -> Path:
    """Return the resolved data directory, resolving it first if necessary."""
    ensure_dirs()
    resolved = _DATA_DIR
    return resolved  # type: ignore
def export_dir() -> Path:
    """Return the resolved exports directory, resolving it first if necessary."""
    ensure_dirs()
    resolved = _EXPORT_DIR
    return resolved  # type: ignore
| # ====== 文字列分割ユーティリティ(rag_indexer が利用) ====== | |
def chunk_text(text: str, max_chars: int = 1200, overlap: int = 200) -> List[str]:
    """Split ``text`` into chunks of at most ``max_chars`` characters.

    Each window of ``max_chars`` characters is cut at the last sentence or
    line boundary found in it, provided that boundary lies at or beyond 60%
    of ``max_chars``; otherwise the window is taken as a raw slice.
    Consecutive chunks share ``overlap`` characters.

    Args:
        text: Text to split. Falsy input yields an empty list; other values
            are converted with ``str()``.
        max_chars: Maximum chunk length; values below 1 are raised to 1.
        overlap: Characters shared between consecutive chunks; clamped to
            the range ``0 .. max_chars - 1``.

    Returns:
        The whitespace-stripped, non-empty chunks in document order.
    """
    if not text:
        return []
    text = str(text)
    max_chars = max(1, int(max_chars))
    overlap = max(0, min(int(overlap), max_chars - 1))

    # Boundary markers in priority order. Non-ASCII ones are written as
    # escapes: U+3002 is the ideographic full stop, U+FF01 / U+FF1F are the
    # fullwidth exclamation and question marks.
    separators = (
        "\n\n",
        "\u3002\n",
        "\u3002",
        "\n",
        "\uff01",
        "\uff1f",
        ".",
        "!",
        "?",
    )
    # A boundary is only honoured when it keeps the chunk reasonably full.
    min_cut = max_chars * 0.6

    chunks: List[str] = []
    start = 0
    total = len(text)
    while start < total:
        window = text[start:start + max_chars]

        # Default to the raw slice; prefer the first separator type whose
        # last occurrence lies far enough into the window.
        cut = len(window)
        for sep in separators:
            pos = window.rfind(sep)
            if pos != -1 and pos + len(sep) >= min_cut:
                cut = pos + len(sep)
                break

        piece = window[:cut].strip()
        if piece:
            chunks.append(piece)
        if start + cut >= total:
            break

        # Step back by the overlap for the next window. When the cut is not
        # larger than the overlap, that step would not move forward (and the
        # loop would never end), so the overlap is dropped for this step.
        advance = cut - overlap
        if advance <= 0:
            advance = cut
        start += advance
    return chunks
# ====== Simple tracking tokens (HMAC based, no extra dependencies) ======
# Signing key, read once at import time from TRACKING_SECRET.
# NOTE(review): when the variable is unset the key is the hard-coded
# "dev-secret", so anyone who knows that default can forge valid tokens;
# make sure TRACKING_SECRET is set in real deployments.
_SECRET = os.environ.get("TRACKING_SECRET", "dev-secret").encode("utf-8")
def _b64url_encode(b: bytes) -> str:
    """Encode ``b`` as URL-safe Base64 text with the trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(b).decode("ascii")
    return encoded.rstrip("=")
def _b64url_decode(s: str) -> bytes:
    """Decode URL-safe Base64 text whose ``=`` padding may have been stripped."""
    # Restore the padding needed to reach a multiple of four characters.
    missing = -len(s) % 4
    padded = s + "=" * missing
    return base64.urlsafe_b64decode(padded.encode("ascii"))
def _sign(payload_bytes: bytes) -> str:
    """Return the HMAC-SHA256 of ``payload_bytes`` under ``_SECRET``, Base64url-encoded."""
    digest = hmac.digest(_SECRET, payload_bytes, "sha256")
    return _b64url_encode(digest)
def make_tracking_token(payload: Dict[str, Any]) -> str:
    """Serialise ``payload`` to JSON, sign it, and return ``'<b64json>.<sig>'``.

    The payload is copied, so the caller's dict is not modified. A ``"ts"``
    entry holding the current Unix time (whole seconds) is added when the
    payload does not already contain one.

    Example payload: {"company":"Test Inc.","ts":1690000000,"redirect":"/"}

    Args:
        payload: JSON-serialisable mapping; a falsy value is treated as empty.

    Returns:
        The Base64url-encoded compact JSON, a dot, and its signature.
    """
    ensure_dirs()
    data = dict(payload) if payload else {}
    data.setdefault("ts", int(time.time()))
    # Compact separators keep the token short.
    body = json.dumps(data, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    return ".".join((_b64url_encode(body), _sign(body)))
def verify_tracking_token(token: str) -> Optional[Dict[str, Any]]:
    """Return the decoded payload when the token's signature is valid, else None.

    Any failure — malformed token, bad Base64, signature mismatch, invalid
    JSON — results in None rather than an exception.

    Args:
        token: A string in ``'<b64json>.<sig>'`` form.
    """
    try:
        encoded_payload, signature = token.split(".", 1)
        raw = _b64url_decode(encoded_payload)
        # Constant-time comparison against the freshly computed signature.
        if hmac.compare_digest(signature, _sign(raw)):
            return json.loads(raw.decode("utf-8"))
    except Exception:
        # Deliberately broad: every failure mode maps to "invalid token".
        pass
    return None
| # ====== クリック/イベントの簡易ログ ====== | |
def _events_path() -> Path:
    """Return the path of the ``events.jsonl`` log inside the data directory."""
    base = data_dir()
    return base / "events.jsonl"
def log_event(event_type: str, payload: Dict[str, Any], meta: Optional[Dict[str, Any]] = None) -> None:
    """Append one JSON line describing an event to ``events.jsonl``.

    The record holds the current Unix time (``ts``), the event ``type``, the
    ``payload`` and the ``meta`` mapping; falsy ``payload`` / ``meta`` are
    stored as empty objects. The file lives in the directory chosen by
    ``ensure_dirs()``.

    Args:
        event_type: Label stored under ``"type"``.
        payload: JSON-serialisable event data.
        meta: Optional JSON-serialisable extra information.
    """
    ensure_dirs()
    record = dict(
        ts=int(time.time()),
        type=event_type,
        payload=payload or {},
        meta=meta or {},
    )
    with _events_path().open("a", encoding="utf-8") as sink:
        sink.write(json.dumps(record, ensure_ascii=False) + "\n")