"""
producers/consumer.py — Aggregate consumer for all platform topics.
Reads from raw.twitter, raw.instagram, raw.youtube concurrently.
Deduplicates by content hash (xxhash), upserts into Qdrant + Memgraph.
Architecture:
Redpanda topics → AIOKafkaConsumer → dedup Set → embed (BGE-M3)
    → Qdrant upsert
    → Memgraph MERGE
"""
from __future__ import annotations
import asyncio, json, os, time, xxhash
from typing import Any
from aiokafka import AIOKafkaConsumer
BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")  # Kafka/Redpanda bootstrap servers
TOPICS = ["raw.twitter", "raw.instagram", "raw.youtube"]  # one raw.* topic per source platform
CONSUMER_GROUP = "fact-engine-ingest"  # shared group id: partitions balance across replicas
# In-process dedup set (resets on restart -> Redis Bloom filter for production)
_seen_hashes: set[str] = set()  # xxhash64 hex digests of message texts already ingested
async def upsert_to_qdrant(doc: dict[str, Any], vector: list[float]) -> None:
    """Upsert a claim document + its embedding into the Qdrant 'claims' collection.

    The synchronous Qdrant client is executed in a worker thread
    (``asyncio.to_thread``) so the event loop is never blocked by network I/O.
    This is a best-effort sink: any failure is logged and swallowed.

    Args:
        doc: Raw message payload; must contain ``"text"``. Optional keys
            (``source_url``, ``domain``, ``verified``, ``platform``) default
            to neutral values.
        vector: Embedding for ``doc["text"]`` (BGE-M3, 1024 dims expected —
            not validated here).
    """
    def _sync_upsert() -> None:
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct

        digest = xxhash.xxh64(doc["text"].encode())
        content_hash = digest.hexdigest()
        client = QdrantClient(url=os.getenv("QDRANT_URL", "http://localhost:6333"), timeout=5)
        try:
            point = PointStruct(
                # Deterministic uint64 id derived from the content hash, so the
                # same text always maps to the same point and re-ingestion
                # overwrites rather than duplicates. (The previous
                # abs(hash(...)) is randomized per process by PYTHONHASHSEED,
                # which silently defeated cross-restart dedup.)
                id=digest.intdigest(),
                vector=vector,
                payload={
                    "text": doc["text"][:1000],
                    "source_url": doc.get("source_url", ""),
                    "domain": doc.get("domain", "unknown"),
                    "author_verified": doc.get("verified", False),
                    "platform": doc.get("platform", "web"),
                    "ingested_at": time.time(),
                    "hash": content_hash,
                },
            )
            client.upsert(collection_name="claims", points=[point])
        finally:
            # Release the connection even when the upsert fails.
            client.close()

    try:
        await asyncio.to_thread(_sync_upsert)
    except Exception as exc:
        print(f"[consumer] Qdrant upsert error: {exc}")
async def upsert_to_memgraph(doc: dict[str, Any]) -> None:
    """MERGE the Author -> Claim relationship into the Memgraph trust graph.

    The synchronous neo4j bolt driver runs in a worker thread
    (``asyncio.to_thread``) so the event loop is never blocked. Best-effort:
    failures are logged and swallowed.

    Args:
        doc: Raw message payload; must contain ``"text"``. Optional keys
            (``author``, ``verified``, ``account_type``) default to neutral
            values.
    """
    def _sync_merge() -> None:
        import neo4j

        content_hash = xxhash.xxh64(doc["text"].encode()).hexdigest()
        driver = neo4j.GraphDatabase.driver(
            f"bolt://{os.getenv('MEMGRAPH_HOST','localhost')}:{os.getenv('MEMGRAPH_PORT','7687')}",
            auth=("", ""),
        )
        try:
            cypher = """
            MERGE (a:Author {handle: $handle})
            SET a.verified = $verified, a.account_type = $account_type
            MERGE (c:Claim {hash: $hash})
            SET c.text = $text, c.ingested_at = $ts
            MERGE (a)-[:REPORTED {timestamp: $ts}]->(c)
            """
            with driver.session() as session:
                session.run(cypher, {
                    "handle": doc.get("author", "unknown"),
                    "verified": doc.get("verified", False),
                    "account_type": doc.get("account_type", "user"),
                    "hash": content_hash,
                    "text": doc["text"][:500],
                    "ts": time.time(),
                })
        finally:
            # Always release the driver — the original skipped close() when
            # the query raised, leaking bolt connections on every failure.
            driver.close()

    try:
        await asyncio.to_thread(_sync_merge)
    except Exception as exc:
        print(f"[consumer] Memgraph upsert error: {exc}")
# Lazily-initialized BGE-M3 embedder, shared across all messages.
_embedding_model = None


def _get_embedding_model():
    """Load the BGE-M3 embedder once and reuse it (model load is expensive)."""
    global _embedding_model
    if _embedding_model is None:
        from fastembed import TextEmbedding
        _embedding_model = TextEmbedding(
            model_name="BAAI/bge-m3",
            max_length=512,
            cache_dir=os.getenv("EMBED_CACHE_DIR", "/tmp/fastembed_cache"),
        )
    return _embedding_model


def _embed(text: str) -> list[float]:
    """Embed *text*; fall back to a zero vector (1024 = BGE-M3 dims) on failure."""
    try:
        model = _get_embedding_model()
        return list(next(model.embed([text])))
    except Exception:
        return [0.0] * 1024


async def process_message(msg_value: dict[str, Any]) -> None:
    """Deduplicate -> embed -> upsert one message into both stores.

    Args:
        msg_value: Decoded JSON payload from any raw.* topic; only ``"text"``
            is required — messages without it (or shorter than 5 words) are
            silently dropped.
    """
    text = msg_value.get("text", "").strip()
    if not text or len(text.split()) < 5:
        return
    content_hash = xxhash.xxh64(text.encode()).hexdigest()
    if content_hash in _seen_hashes:
        return
    # Cap memory: reset the dedup set BEFORE recording the current hash, so
    # the message being processed right now is not forgotten by the reset.
    # (The original added first and then cleared, dropping its own hash.)
    if len(_seen_hashes) >= 100_000:
        _seen_hashes.clear()
    _seen_hashes.add(content_hash)
    # Model inference is CPU-bound and blocking — keep it off the event loop.
    vector = await asyncio.to_thread(_embed, text)
    await asyncio.gather(
        upsert_to_qdrant(msg_value, vector),
        upsert_to_memgraph(msg_value),
    )
    print(f"[consumer] Ingested: {text[:60]}... (hash={content_hash[:8]})")
def _decode_json(raw: bytes) -> dict[str, Any] | None:
    """Best-effort JSON decode for a Kafka record value.

    Returns the parsed dict, or None for undecodable / non-dict payloads —
    one malformed message must not raise inside the consumer and kill the
    whole ingest loop (which the original lambda deserializer did).
    """
    try:
        value = json.loads(raw.decode())
    except (UnicodeDecodeError, ValueError):
        return None
    return value if isinstance(value, dict) else None


async def run_consumer() -> None:
    """Consume all raw.* topics forever, feeding each valid message through
    process_message(). The consumer is always stopped cleanly on exit."""
    consumer = AIOKafkaConsumer(
        *TOPICS,
        bootstrap_servers=BROKERS,
        group_id=CONSUMER_GROUP,
        value_deserializer=_decode_json,
        auto_offset_reset="latest",
        enable_auto_commit=True,
    )
    await consumer.start()
    print(f"[consumer] Subscribed to {TOPICS}")
    try:
        async for msg in consumer:
            # None means the payload was malformed — skip it.
            if msg.value is not None:
                await process_message(msg.value)
    finally:
        await consumer.stop()
if __name__ == "__main__":
    # Script entrypoint: run the consumer loop until interrupted.
    asyncio.run(run_consumer())