Mutsynchub / app /tasks /ingest_worker.py
shaliz-kong
Initial commit: self-hosted Redis, DuckDB, Analytics Engine
98a466d
raw
history blame contribute delete
629 Bytes
import os
import asyncio, json, redis, duckdb
from app.db import get_conn, ensure_raw_table
from app.ingest import ingest_dict
r = redis.from_url(os.getenv("REDIS_URL"))
STREAM_KEY = "pos_stream:{org_id}" # one stream per tenant
async def stream_consumer(org_id: str):
conn = get_conn(org_id)
ensure_raw_table(conn)
while True:
msgs = r.xread({STREAM_KEY.format(org_id=org_id): '$'}, count=100, block=5000)
if msgs:
_, entries = msgs[0]
for _, data in entries:
ingest_dict(org_id, json.loads(data[b'row']))
await asyncio.sleep(1) # 1 s micro-batch