Spaces:
Paused
Paused
| import os | |
| import asyncio, json, redis, duckdb | |
| from app.db import get_conn, ensure_raw_table | |
| from app.ingest import ingest_dict | |
| r = redis.from_url(os.getenv("REDIS_URL")) | |
| STREAM_KEY = "pos_stream:{org_id}" # one stream per tenant | |
| async def stream_consumer(org_id: str): | |
| conn = get_conn(org_id) | |
| ensure_raw_table(conn) | |
| while True: | |
| msgs = r.xread({STREAM_KEY.format(org_id=org_id): '$'}, count=100, block=5000) | |
| if msgs: | |
| _, entries = msgs[0] | |
| for _, data in entries: | |
| ingest_dict(org_id, json.loads(data[b'row'])) | |
| await asyncio.sleep(1) # 1 s micro-batch |