Spaces:
Sleeping
Sleeping
"""
Simple session/room -> user mapping registry.
- Backed by process memory (reset when the service restarts)
- Used to reconcile user_id mismatches between upload and generation
"""
| from typing import Optional, Dict, Any | |
| import time | |
| import os | |
# Ownership lookups: room id / session id -> user id (all stored as strings).
_room_to_user: Dict[str, str] = dict()
_session_to_user: Dict[str, str] = dict()

# Most recently recorded user id and the time.time() value when it was set.
_last_user: Optional[str] = None
_last_updated_at: float = 0.0

# Per-room flags: room id -> flag key -> {"value": ..., "expires_at": ...}.
_room_flags: Dict[str, Dict[str, Dict[str, Any]]] = dict()

# Per-room last document: room id -> document id.
_room_last_document: Dict[str, str] = dict()
def set_user_for_room(room_id: Optional[str], user_id: Optional[str]) -> None:
    """Remember which user a room belongs to and record that user as the latest.

    Nothing happens when either ``room_id`` or ``user_id`` is empty or
    ``None``. Both ids are stored in their ``str()`` form.
    """
    if room_id and user_id:
        _room_to_user[str(room_id)] = str(user_id)
        set_last_user(user_id)
def get_user_for_room(room_id: Optional[str]) -> Optional[str]:
    """Return the user id recorded for ``room_id``.

    Yields ``None`` when ``room_id`` is empty/``None`` or no user has been
    recorded for it.
    """
    return _room_to_user.get(str(room_id)) if room_id else None
def set_user_for_session(session_id: Optional[str], user_id: Optional[str]) -> None:
    """Remember which user a session belongs to and record that user as the latest.

    Nothing happens when either ``session_id`` or ``user_id`` is empty or
    ``None``. Both ids are stored in their ``str()`` form.
    """
    if session_id and user_id:
        _session_to_user[str(session_id)] = str(user_id)
        set_last_user(user_id)
def get_user_for_session(session_id: Optional[str]) -> Optional[str]:
    """Return the user id recorded for ``session_id``.

    Yields ``None`` when ``session_id`` is empty/``None`` or no user has been
    recorded for it.
    """
    return _session_to_user.get(str(session_id)) if session_id else None
def set_last_user(user_id: Optional[str]) -> None:
    """Record ``user_id`` as the most recent user, stamped with ``time.time()``.

    An empty or ``None`` ``user_id`` leaves the current values untouched.
    """
    global _last_user, _last_updated_at
    if user_id:
        _last_user = str(user_id)
        _last_updated_at = time.time()
def get_last_user() -> Optional[str]:
    """Return the most recently recorded user id, or ``None`` if there is none."""
    return _last_user
def clear() -> None:
    """Empty every registry in this module and forget the last recorded user."""
    global _last_user, _last_updated_at
    for registry in (
        _room_to_user,
        _session_to_user,
        _room_flags,
        _room_last_document,
    ):
        registry.clear()
    _last_user = None
    _last_updated_at = 0.0
| # ---- room-scoped one-shot flags ---- | |
def set_flag_for_room(
    room_id: Optional[str],
    key: str,
    value: bool = True,
    ttl_seconds: Optional[float] = None,
) -> None:
    """Set a boolean flag on a room, optionally expiring after a TTL.

    Args:
        room_id: Room the flag belongs to; an empty/``None`` id is a no-op.
        key: Flag name; an empty key is a no-op.
        value: Flag value, coerced with ``bool()``.
        ttl_seconds: Lifetime in seconds. When ``None``, the
            ``LILY_RAG_IMAGE_FLAG_TTL_SECONDS`` environment variable is used,
            falling back to 120 seconds if it is unset or not a number.
            A zero or negative TTL stores the flag with no expiry.
    """
    if not room_id or not key:
        return
    default_ttl = 120.0
    # TTL precedence: explicit argument > environment variable > default.
    if ttl_seconds is None:
        env_ttl = os.getenv('LILY_RAG_IMAGE_FLAG_TTL_SECONDS')
        try:
            ttl_seconds = float(env_ttl) if env_ttl is not None else default_ttl
        except ValueError:
            # Malformed environment value: fall back instead of failing.
            ttl_seconds = default_ttl
    expires_at = time.time() + float(ttl_seconds) if ttl_seconds > 0 else None
    # Touch the registry only once the entry is fully computed, so a failure
    # above cannot leave an empty per-room dict behind.
    _room_flags.setdefault(str(room_id), {})[str(key)] = {
        "value": bool(value),
        "expires_at": expires_at,
    }
def get_flag_for_room(room_id: Optional[str], key: str) -> Optional[bool]:
    """Read a room flag without consuming it.

    Args:
        room_id: Room to look in.
        key: Flag name.

    Returns:
        The stored value as a ``bool``, or ``None`` when ``room_id``/``key``
        is empty, the flag is not set, or the flag has expired. An expired
        flag is removed as a side effect.
    """
    if not room_id or not key:
        return None
    rid, name = str(room_id), str(key)
    flags = _room_flags.get(rid)
    if not flags:
        return None
    entry = flags.get(name)
    if not entry:
        return None
    expires_at = entry.get("expires_at")
    if expires_at is not None and time.time() > expires_at:
        # Expired: drop the entry, and the room's dict too once it is empty,
        # so rooms that are never touched again do not pile up in memory.
        flags.pop(name, None)
        if not flags:
            _room_flags.pop(rid, None)
        return None
    return bool(entry.get("value"))
def pop_flag_for_room(room_id: Optional[str], key: str) -> Optional[bool]:
    """Read a room flag and remove it (one-shot consumption).

    Args:
        room_id: Room to look in.
        key: Flag name.

    Returns:
        The stored value as a ``bool``, or ``None`` when ``room_id``/``key``
        is empty, the flag is not set, or the flag has expired. The flag is
        removed whether or not it had expired.
    """
    if not room_id or not key:
        return None
    rid, name = str(room_id), str(key)
    flags = _room_flags.get(rid)
    if not flags:
        return None
    # Live and expired flags are both removed, so pop first and decide what
    # to report afterwards.
    entry = flags.pop(name, None)
    if not flags:
        # Last flag of the room is gone: drop the empty dict so consumed
        # rooms do not pile up in memory.
        _room_flags.pop(rid, None)
    if not entry:
        return None
    expires_at = entry.get("expires_at")
    if expires_at is not None and time.time() > expires_at:
        # Expired: already removed above, but not reported as consumed.
        return None
    return bool(entry.get("value"))
| # ---- room-scoped last document id ---- | |
def set_last_document_for_room(room_id: Optional[str], document_id: Optional[str]) -> None:
    """Remember ``document_id`` as the last document of ``room_id``.

    Nothing happens when either argument is empty or ``None``. Both ids are
    stored in their ``str()`` form.
    """
    if room_id and document_id:
        _room_last_document[str(room_id)] = str(document_id)
def get_last_document_for_room(room_id: Optional[str]) -> Optional[str]:
    """Return the last document id recorded for ``room_id``.

    Yields ``None`` when ``room_id`` is empty/``None`` or nothing has been
    recorded for it.
    """
    return _room_last_document.get(str(room_id)) if room_id else None