Spaces:
Runtime error
Runtime error
File size: 2,263 Bytes
4390951 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import redis
# Connect to Redis
r = redis.Redis(host='ubuntu', port=6379, decode_responses=True)
# Set a value
r.set('foo', 'bar')
# Get a value
value = r.get('foo')
print(value) # Output: bar
print("Redis client is running...", r.ping())
import asyncio
from redis.asyncio import Redis
from functools import wraps
from typing import Callable, Dict
class AsyncRedisPubSub:
def __init__(self, host="ubuntu", port=6379, db=0):
self.redis = Redis(host=host, port=port, db=db)
self.channel_handlers: Dict[str, Callable[[str], None]] = {}
self._subscriber_started = False
self._lock = asyncio.Lock()
def subscribe(self, channel: str):
def decorator(func: Callable[[str], None]):
self.channel_handlers[channel] = func
asyncio.create_task(self._ensure_subscriber())
return func
return decorator
async def publish(self, channel: str, message: str):
await self.redis.publish(channel, message)
async def _ensure_subscriber(self):
async with self._lock:
if self._subscriber_started:
return
self._subscriber_started = True
await self._start_subscriber()
async def _start_subscriber(self):
pubsub = self.redis.pubsub()
await pubsub.subscribe(*self.channel_handlers.keys())
print(f"Subscribed to: {', '.join(self.channel_handlers.keys())}")
async def listen():
async for message in pubsub.listen():
if message["type"] != "message":
continue
channel = message["channel"].decode()
data = message["data"].decode()
handler = self.channel_handlers.get(channel)
if handler:
await self._maybe_async(handler, data)
asyncio.create_task(listen())
async def _maybe_async(self, func: Callable, *args, **kwargs):
result = func(*args, **kwargs)
if asyncio.iscoroutine(result):
await result
pubsub = AsyncRedisPubSub()
@pubsub.subscribe("chat")
def handle_chat(msg):
print(f"[chat redis] Received: {msg}")
@pubsub.subscribe("news")
def handle_news(msg):
print(f"[news] Breaking: {msg}")
|