| """One-shot script to delete a specific chat cache key from Redis. | |
| Usage: | |
| uv run python playground_flush_cache.py | |
| """ | |
| import asyncio | |
| from src.db.redis.connection import get_redis | |
| from src.config.settings import settings | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONFIG | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| USER_ID = "8b6c18fd-8971-46e5-b106-35b7afb412e0" | |
| MESSAGE = "Berapa digital rate card untuk whatsapp?" | |
| # Set to True to wipe ALL chat cache keys for the user instead | |
| FLUSH_ALL_USER_CHAT = False | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def main(): | |
| redis = await get_redis() | |
| prefix = f"{settings.redis_prefix}chat:{USER_ID}:" | |
| if FLUSH_ALL_USER_CHAT: | |
| keys = await redis.keys(f"{prefix}*") | |
| if keys: | |
| await redis.delete(*keys) | |
| print(f"Deleted {len(keys)} key(s) matching {prefix}*") | |
| else: | |
| print("No keys found.") | |
| else: | |
| key = f"{prefix}{MESSAGE}" | |
| deleted = await redis.delete(key) | |
| print(f"Deleted {deleted} key: {key!r}") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |