| | import os |
| | import asyncio, json, redis, duckdb |
| | from app.db import get_conn, ensure_raw_table |
| | from app.ingest import ingest_dict |
| |
|
# Shared synchronous Redis client, built once at import time from the
# REDIS_URL environment variable.
r = redis.from_url(os.environ.get("REDIS_URL"))

# Per-organisation stream name template; fill in with
# STREAM_KEY.format(org_id=...).
STREAM_KEY = "pos_stream:{org_id}"
| |
|
async def stream_consumer(org_id: str):
    """Consume rows from one organisation's Redis stream, forever.

    Ensures the raw table exists for ``org_id``, then repeatedly reads up to
    100 entries at a time from the stream named by ``STREAM_KEY`` and passes
    the JSON-decoded ``b'row'`` field of each entry to ``ingest_dict``.

    Args:
        org_id: Organisation identifier; selects both the connection returned
            by ``get_conn`` and the Redis stream to read.

    This coroutine never returns normally. Exceptions raised by Redis, by
    ``json.loads`` (malformed payload), by a missing ``b'row'`` field
    (``KeyError``) or by ``ingest_dict`` propagate to the caller.
    """
    conn = get_conn(org_id)
    ensure_raw_table(conn)

    stream_name = STREAM_KEY.format(org_id=org_id)
    loop = asyncio.get_running_loop()

    # '$' means "only entries added after this call starts". It is valid for
    # the first read only: re-sending '$' on every iteration drops anything
    # that arrives while a batch is being ingested or while we sleep. Once an
    # entry has been seen, resume from its ID instead.
    last_id = '$'

    while True:
        # The module-level client is synchronous and this call can block for
        # up to 5 s; run it in the default executor so the event loop keeps
        # servicing other tasks meanwhile.
        msgs = await loop.run_in_executor(
            None,
            lambda: r.xread({stream_name: last_id}, count=100, block=5000),
        )
        if msgs:
            # Only one stream was requested, so only one result is expected.
            _, entries = msgs[0]
            for entry_id, data in entries:
                ingest_dict(org_id, json.loads(data[b'row']))
                # Advance only after a successful ingest so the cursor never
                # moves past an entry that was not processed.
                last_id = entry_id
        await asyncio.sleep(1)