File size: 5,196 Bytes
a63c61f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# RAG API Design: Retrieval Architecture

This document focuses specifically on the API layer designed to **retrieve** data from our existing, highly optimized data pipeline. Because the heavy lifting of processing, vectorization (BGE-M3 Dense + Sparse), and indexing is already handled by the Kafka and Qdrant workers, this API is designed purely for **scalable, high-performance retrieval and generation**.

---

## 🎯 1. Core API Philosophy

The RAG API acts as the bridge between user queries (from the frontend) and the populated Qdrant vector database. 

1.  **Read-Only Operations:** This API does *not* write to Qdrant or ClickHouse. It assumes the databases are already hydrated by the Kafka workers.
2.  **Symmetry with Ingestion:** The API must use the exact same BGE-M3 model for hashing user queries that the Embedding Service uses to hash news articles.
3.  **Statelessness:** The API nodes hold no session state, allowing infinite horizontal scaling behind a Load Balancer.

---

## 🌐 2. Core API Endpoints

### 2.1 `POST /api/v1/search` (Hybrid Search Only)
*   **Purpose:** The fastest way to find relevant articles without generating an LLM response. Useful for standard "News Search" bars.
*   **Input Request:**
    ```json
    {
      "query": "Quantum computing breakthroughs in 2026",
      "limit": 10,
      "filters": {
        "source": ["TechCrunch", "Wired"],
        "date_range": { "start": "2026-01-01", "end": "2026-12-31" }
      }
    }
    ```
*   **Internal Flow:**
    1.  Passes the `query` text through the BGE-M3 Tokenizer & Model (synchronously or via lightweight async executor).
    2.  Extracts the `Dense` vector (1024-dim) and `Sparse` lexical weights.
    3.  Queries Qdrant using a `Prefetch` query (combining Dense + Sparse scoring).
    4.  Extracts the Qdrant `payload` (article metadata) and returns it.
*   **Response:** A JSON list of articles sorted by relevance score.

### 2.2 `POST /api/v1/rag/ask` (Full RAG Flow)
*   **Purpose:** The endpoint for natural language Q&A. This hits Qdrant first, then sends the context to the LLM.
*   **Input Request:**
    ```json
    {
      "question": "What did Google recently announce regarding quantum processors?",
      "stream": true, // Critical for UX
      "top_k": 5
    }
    ```
*   **Internal Flow:**
    1.  **Retrieve:** Performs the exact same Hybrid Search as `/api/v1/search` to get the top 5 article chunks.
    2.  **Prompt Assembly:** Constructs a structured prompt template:
        `"Use the following news articles to answer the question...\n\nCONTEXT:\n[Article 1 Text...]\n[Article 2 Text...]\n\nQUESTION: What did Google recently announce..."`
    3.  **Generate:** Sends the assembled prompt to the LLM (OpenAI, local Llama-3, etc.).
    4.  **Stream:** Uses Server-Sent Events (SSE) to yield tokens to the frontend as they are generated.

---

## 🧠 3. Query Vectorization Pipeline (Symmetry)

For Qdrant search to work perfectly, the API must emulate Step 4 of the *Data Flow Pipeline* exactly.

```python
# RAG API Vectorization Logic
def vectorize_query(query_text: str):
    # Uses the SAME FlagEmbedding configuration as the ingestor
    embeddings = model.encode(
        sentences=[query_text],
        batch_size=1,
        max_length=512, # Queries are shorter than articles
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=False
    )
    
    return {
        "dense": embeddings['dense_vecs'][0].tolist(),
        "sparse": {
            "indices": list(embeddings['lexical_weights'][0].keys()),
            "values": list(embeddings['lexical_weights'][0].values())
        }
    }
```

---

## ⚑ 4. Scalability at the Retrieval Layer

Since the Heavy ETL is done by the pipelines, the API's main bottleneck is **waiting** for Qdrant and the LLM.

### 4.1 Async FastAPI
*   The API is built purely on `async def` endpoints.
*   When the API queries Qdrant (`await qdrant_client.async_search(...)`), it yields the thread back to the event loop.
*   A single FastAPI container can handle thousands of concurrent searches while waiting for Qdrant to respond.

### 4.2 Semantic Query Caching (Redis)
To save LLM compute and Qdrant load:
*   We implement Redis **Semantic Caching**. 
*   If User A asks: *"What is Tesla's stock doing?"* and User B asks *"How is the Tesla stock performing?"*, the semantic cache recognizes the queries are identical in meaning (High Cosine Similarity) and instantly returns User A's cached LLM response to User B.

### 4.3 Streaming (SSE) for LLMs
*   Generating a 500-word RAG answer might take the LLM 3 seconds. Instead of a loading spinner for 3 seconds, the API uses `StreamingResponse`. The user sees the first word in 200ms, creating a "Real-Time" feel.

---

## πŸ“Š 5. Integration with Pipeline Analytics
If the RAG API needs to answer questions like *"How many articles mentioned AI today?"*, it should NOT query Qdrant. 
Qdrant is a Vector Search engine, not an Analytics database.

For structured analytics, the API connects directly to **ClickHouse** (which the Kafka `sink` worker hydrates), allowing real-time aggregations without disturbing the vector search performance.