File size: 4,676 Bytes
f589dab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
producers/consumer.py β€” Aggregate consumer for all platform topics.

Reads from raw.twitter, raw.instagram, raw.youtube concurrently.
Deduplicates by content hash (xxhash), upserts into Qdrant + Memgraph.

Architecture:
  Redpanda topics β†’ AIOKafkaConsumer β†’ dedup Set β†’ embed (BGE-M3)
                                                  β†’ Qdrant upsert
                                                  β†’ Memgraph MERGE
"""
from __future__ import annotations
import asyncio, json, os, time, xxhash
from typing import Any
from aiokafka import AIOKafkaConsumer

BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")
TOPICS = ["raw.twitter", "raw.instagram", "raw.youtube"]
CONSUMER_GROUP = "fact-engine-ingest"

# In-process dedup set (resets on restart β€” Redis Bloom filter for production)
_seen_hashes: set[str] = set()


async def upsert_to_qdrant(doc: dict[str, Any], vector: list[float]) -> None:
    """Upsert a claim document plus its embedding into the Qdrant 'claims' collection.

    Best-effort: any failure (missing client library, unreachable server,
    malformed doc) is printed and swallowed so the consumer loop keeps running.

    Args:
        doc: Raw platform message; must contain "text"; "source_url",
            "domain", "verified" and "platform" are read with defaults.
        vector: Dense embedding for ``doc["text"]``.
    """
    try:
        from qdrant_client.models import PointStruct

        content_hash = xxhash.xxh64(doc["text"].encode()).hexdigest()
        point = PointStruct(
            # The point ID must be deterministic across process restarts so
            # re-ingesting the same text overwrites instead of duplicating.
            # Python's built-in hash() of a str is salted per process
            # (PYTHONHASHSEED), so derive the ID from the xxhash integer
            # digest, folded into Qdrant's unsigned-64 ID range.
            id=xxhash.xxh64_intdigest(doc["text"].encode()) % (2**63),
            vector=vector,
            payload={
                "text": doc["text"][:1000],
                "source_url": doc.get("source_url", ""),
                "domain": doc.get("domain", "unknown"),
                "author_verified": doc.get("verified", False),
                "platform": doc.get("platform", "web"),
                "ingested_at": time.time(),
                "hash": content_hash,
            },
        )
        _qdrant_client().upsert(collection_name="claims", points=[point])
    except Exception as exc:
        print(f"[consumer] Qdrant upsert error: {exc}")


def _qdrant_client():
    """Return a process-wide QdrantClient, created lazily and reused.

    The original code built a fresh client (and HTTP connection pool) on every
    message; one shared client is sufficient and much cheaper.
    """
    client = getattr(_qdrant_client, "_client", None)
    if client is None:
        from qdrant_client import QdrantClient

        client = QdrantClient(
            url=os.getenv("QDRANT_URL", "http://localhost:6333"), timeout=5
        )
        _qdrant_client._client = client
    return client


async def upsert_to_memgraph(doc: dict[str, Any]) -> None:
    """MERGE an Author -> Claim REPORTED edge into the Memgraph trust graph.

    Best-effort: any failure (missing driver, unreachable server, malformed
    doc) is printed and swallowed so the consumer loop keeps running.

    Args:
        doc: Raw platform message; must contain "text"; "author", "verified"
            and "account_type" are read with defaults if absent.
    """
    try:
        import neo4j

        content_hash = xxhash.xxh64(doc["text"].encode()).hexdigest()
        cypher = """
        MERGE (a:Author {handle: $handle})
        SET a.verified = $verified, a.account_type = $account_type
        MERGE (c:Claim {hash: $hash})
        SET c.text = $text, c.ingested_at = $ts
        MERGE (a)-[:REPORTED {timestamp: $ts}]->(c)
        """
        # Context-manage the driver so it is always closed: the original
        # called driver.close() only on the success path, so an error inside
        # session.run() leaked the bolt connection pool.
        with neo4j.GraphDatabase.driver(
            f"bolt://{os.getenv('MEMGRAPH_HOST','localhost')}:{os.getenv('MEMGRAPH_PORT','7687')}",
            auth=("", ""),
        ) as driver:
            with driver.session() as session:
                session.run(cypher, {
                    "handle": doc.get("author", "unknown"),
                    "verified": doc.get("verified", False),
                    "account_type": doc.get("account_type", "user"),
                    "hash": content_hash,
                    "text": doc["text"][:500],
                    "ts": time.time(),
                })
    except Exception as exc:
        print(f"[consumer] Memgraph upsert error: {exc}")


async def process_message(msg_value: dict[str, Any]) -> None:
    """Deduplicate -> embed -> upsert one raw message into both stores.

    Messages with no "text", or with fewer than 5 words, are dropped.
    Duplicates (by xxhash64 of the text) within this process are dropped.

    Args:
        msg_value: Decoded JSON payload from a raw.* topic.
    """
    text = msg_value.get("text", "").strip()
    if not text or len(text.split()) < 5:
        return

    content_hash = xxhash.xxh64(text.encode()).hexdigest()
    if content_hash in _seen_hashes:
        return
    _seen_hashes.add(content_hash)
    # Crude memory cap: wholesale clear re-admits old duplicates, but keeps
    # the set bounded (Redis Bloom filter is the production plan, see module
    # docstring).
    if len(_seen_hashes) > 100_000:
        _seen_hashes.clear()

    # Embed with the shared model; fall back to a zero vector (BGE-M3 dim =
    # 1024) so the Qdrant upsert still records the payload.
    try:
        vector = list(next(_get_embedder().embed([text])))
    except Exception:
        vector = [0.0] * 1024

    # The two upserts are independent; run them concurrently.
    await asyncio.gather(
        upsert_to_qdrant(msg_value, vector),
        upsert_to_memgraph(msg_value),
    )
    print(f"[consumer] Ingested: {text[:60]}... (hash={content_hash[:8]})")


def _get_embedder():
    """Return a process-wide TextEmbedding model, loaded lazily and reused.

    The original constructed a TextEmbedding per message, re-loading the
    BGE-M3 model on every call -- by far the most expensive step in the
    pipeline. Load once and cache on the function object.
    """
    model = getattr(_get_embedder, "_model", None)
    if model is None:
        from fastembed import TextEmbedding

        model = TextEmbedding(
            model_name="BAAI/bge-m3",
            max_length=512,
            cache_dir=os.getenv("EMBED_CACHE_DIR", "/tmp/fastembed_cache"),
        )
        _get_embedder._model = model
    return model


async def run_consumer() -> None:
    """Consume all raw.* topics forever, feeding each message to process_message.

    Notes:
        * enable_auto_commit=True means offsets can be committed before a
          message is fully processed (at-most-once delivery) -- acceptable
          for this best-effort ingest path.
        * auto_offset_reset="latest": a brand-new consumer group starts at
          the tail and skips any backlog.
    """
    consumer = AIOKafkaConsumer(
        *TOPICS,
        bootstrap_servers=BROKERS,
        group_id=CONSUMER_GROUP,
        value_deserializer=lambda v: json.loads(v.decode()),
        auto_offset_reset="latest",
        enable_auto_commit=True,
    )
    await consumer.start()
    print(f"[consumer] Subscribed to {TOPICS}")
    try:
        async for msg in consumer:
            # A single bad message (unexpected schema, transient store error
            # escaping process_message) must not kill the whole consumer loop.
            try:
                await process_message(msg.value)
            except Exception as exc:
                print(f"[consumer] Failed to process message from {msg.topic}: {exc}")
    finally:
        await consumer.stop()


if __name__ == "__main__":
    # Script entry point: run the consumer loop until interrupted (Ctrl-C).
    asyncio.run(run_consumer())