shreyask Claude Opus 4.6 commited on
Commit
a1d52ca
Β·
verified Β·
1 Parent(s): 997db9a

add pipeline orchestrator as AsyncGenerator yielding stage events

Browse files

Ties together query expansion, BM25/vector search, RRF fusion,
reranking, and score blending into a single async pipeline that
React can subscribe to for progressive UI updates.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (1) hide show
  1. src/pipeline/orchestrator.ts +118 -0
src/pipeline/orchestrator.ts ADDED
@@ -0,0 +1,118 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import type {
2
+ Chunk,
3
+ EmbeddedChunk,
4
+ ScoredChunk,
5
+ ExpandedQuery,
6
+ PipelineEvent,
7
+ } from "../types";
8
+ import { expandQuery } from "./expansion";
9
+ import { embedQuery } from "./embeddings";
10
+ import { vectorSearch } from "./vectorSearch";
11
+ import { BM25Index } from "./bm25";
12
+ import { reciprocalRankFusion } from "./rrf";
13
+ import { scoreDocument } from "./rerank";
14
+ import { blendScores } from "./blend";
15
+
16
+ export interface PipelineInput {
17
+ query: string;
18
+ chunks: Chunk[];
19
+ embeddedChunks: EmbeddedChunk[];
20
+ bm25Index: BM25Index;
21
+ }
22
+
23
+ export async function* runPipeline(
24
+ input: PipelineInput,
25
+ ): AsyncGenerator<PipelineEvent> {
26
+ const { query, chunks, embeddedChunks, bm25Index } = input;
27
+
28
+ // ── Stage 1: Query Expansion ──
29
+ yield { stage: "expansion", status: "running" };
30
+ let expanded: ExpandedQuery;
31
+ try {
32
+ expanded = await expandQuery(query);
33
+ yield { stage: "expansion", status: "done", data: expanded };
34
+ } catch (err) {
35
+ yield { stage: "expansion", status: "error", error: String(err) };
36
+ return;
37
+ }
38
+
39
+ // ── Stage 2: Parallel Search ──
40
+ yield { stage: "search", status: "running" };
41
+
42
+ // BM25 searches:
43
+ // 1. Original query -> BM25 (dashed line in architecture diagram)
44
+ // 2. Lex keywords -> BM25
45
+ // 3. Vec sentences -> BM25
46
+ const bm25Original = bm25Index.search(query, 20);
47
+ const bm25Lex = bm25Index.search(expanded.lex, 20);
48
+ const bm25Vec = expanded.vec.flatMap((v) => bm25Index.search(v, 20));
49
+
50
+ // Vector searches:
51
+ // 1. HyDE -> Vector Search
52
+ // 2. Vec sentences -> Vector Search
53
+ const hydeEmbedding = await embedQuery(expanded.hyde);
54
+ const vecEmbeddings = await Promise.all(
55
+ expanded.vec.map((v) => embedQuery(v)),
56
+ );
57
+
58
+ const vecHyde = vectorSearch(hydeEmbedding, embeddedChunks, 20);
59
+ const vecVec = vecEmbeddings.flatMap((emb) =>
60
+ vectorSearch(emb, embeddedChunks, 20),
61
+ );
62
+
63
+ // Combine all hits for UI display
64
+ const allBm25: ScoredChunk[] = [...bm25Original, ...bm25Lex, ...bm25Vec];
65
+ const allVector: ScoredChunk[] = [...vecHyde, ...vecVec];
66
+
67
+ yield {
68
+ stage: "search",
69
+ status: "done",
70
+ data: { bm25Hits: allBm25, vectorHits: allVector },
71
+ };
72
+
73
+ // ── Stage 3: RRF Fusion ──
74
+ // Build ranked lists for RRF (order matters for weights: first 2 get 2x)
75
+ const rrfLists = [
76
+ { results: bm25Original, queryType: "original" as const, query },
77
+ { results: vecHyde, queryType: "hyde" as const, query: expanded.hyde },
78
+ { results: bm25Lex, queryType: "lex" as const, query: expanded.lex },
79
+ ...expanded.vec.map((v, i) => ({
80
+ results: vecVec.slice(i * 20, (i + 1) * 20),
81
+ queryType: "vec" as const,
82
+ query: v,
83
+ })),
84
+ ...expanded.vec.map((v, i) => ({
85
+ results: bm25Vec.slice(i * 20, (i + 1) * 20),
86
+ queryType: "vec" as const,
87
+ query: v,
88
+ })),
89
+ ];
90
+
91
+ const rrfResults = reciprocalRankFusion(rrfLists);
92
+ yield { stage: "rrf", status: "done", data: { merged: rrfResults } };
93
+
94
+ // ── Stage 4: Reranking ──
95
+ yield { stage: "rerank", status: "running" };
96
+ const rerankScores = new Map<string, number>();
97
+ for (const result of rrfResults) {
98
+ const score = await scoreDocument(query, result.bestChunk);
99
+ rerankScores.set(result.docId, score);
100
+ }
101
+
102
+ // Build reranked view for "before/after" visualization
103
+ const rerankedResults = rrfResults.map((r) => ({
104
+ ...r,
105
+ rerankScore: rerankScores.get(r.docId) ?? 0,
106
+ blendedScore: 0, // computed in blend step
107
+ }));
108
+
109
+ yield {
110
+ stage: "rerank",
111
+ status: "done",
112
+ data: { before: rrfResults, after: rerankedResults },
113
+ };
114
+
115
+ // ── Stage 5: Score Blending ──
116
+ const finalResults = blendScores(rrfResults, rerankScores);
117
+ yield { stage: "blend", status: "done", data: { finalResults } };
118
+ }