File size: 3,367 Bytes
8bd9348
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import annotations
from typing import List, Dict, Any

from openai import OpenAI
from .utils import HybridIndex, Reranker, Doc, select_fewshots
from .ingest import build_docs_from_paths, build_docs_from_urls
from prompts import SYSTEM_PROMPT, FEWSHOTS

class CryptoRAGPipeline:
    """Retrieval-augmented generation pipeline for crypto Q&A.

    Combines a hybrid (dense + sparse) index with a cross-encoder reranker.
    Queries about live market data (price, market cap, ...) are routed to
    external tools instead of RAG, since the document index cannot answer them.
    """

    # Keywords that indicate a live market-data question; hoisted to class level
    # so the tuple is built once, not on every route() call.
    _TOOL_KEYWORDS = ("price", "market cap", "marketcap", "ath", "all-time high",
                      "24h", "fear greed", "greed index")

    def __init__(self, dense_model: str = "sentence-transformers/all-MiniLM-L6-v2", reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        """Create the index and reranker; the LLM client starts unset."""
        self.index = HybridIndex(dense_model_name=dense_model)
        self.reranker = Reranker(reranker_model)
        self.client: OpenAI | None = None

    def set_openai(self, api_key: str):
        """Configure the OpenAI client used for answer generation."""
        self.client = OpenAI(api_key=api_key)

    def add_local_files(self, paths: List[str]):
        """Ingest local files into the index (documents labeled "local")."""
        docs = build_docs_from_paths(paths, source_label="local")
        self.index.add(docs)

    def add_urls(self, urls: List[str]):
        """Ingest web pages into the index (documents labeled "web")."""
        docs = build_docs_from_urls(urls, source_label="web")
        self.index.add(docs)

    def build(self):
        """(Re)build the underlying hybrid index from ingested documents."""
        self.index.build()

    def route(self, query: str) -> str:
        """Return "tools" for live market-data questions, else "rag".

        Matching is a case-insensitive substring check against
        ``_TOOL_KEYWORDS``.
        """
        q = query.lower()
        if any(k in q for k in self._TOOL_KEYWORDS):
            return "tools"
        return "rag"

    def build_prompt(self, query: str, contexts: List[Doc]) -> str:
        """Assemble the generation prompt: system text, 2 few-shots, contexts.

        Each context is truncated to 1200 chars and numbered [1], [2], ... so
        the model can cite sources by index.
        """
        fs = select_fewshots(query, FEWSHOTS, self.index.embedder, n=2)
        few = "\n\n".join([f"Q: {x['q']}\nA: {x['a']}" for x in fs])
        ctx = "\n\n".join([f"[{i+1}] {c.text[:1200]}" for i, c in enumerate(contexts)])
        prompt = f"""{SYSTEM_PROMPT}

Few-shot examples:
{few}

Context (use to answer if relevant; cite [#]):
{ctx}

User question: {query}

Answer:"""
        return prompt

    def answer_stream(self, query: str, contexts: List[Doc], model: str = "gpt-4o-mini"):
        """Yield answer text chunks streamed from the LLM.

        Raises:
            RuntimeError: if no client was configured via ``set_openai``.
                (Previously a bare ``assert``, which is stripped under -O.)
        """
        if self.client is None:
            raise RuntimeError("LLM client not set")
        prompt = self.build_prompt(query, contexts)
        # The openai>=1.x Stream object is a context manager; closing it
        # releases the HTTP connection even if the consumer stops early.
        with self.client.chat.completions.create(
            model=model,
            messages=[{"role":"system","content":SYSTEM_PROMPT},
                      {"role":"user","content":prompt}],
            stream=True,
            temperature=0.3,
            max_tokens=400
        ) as stream:
            for event in stream:
                if hasattr(event, "choices") and event.choices:
                    delta = event.choices[0].delta
                    if delta and delta.content:
                        yield delta.content

    def ask(self, query: str, k: int = 8, alpha: float = 0.5, top_k_rerank: int = 5, filters: Dict[str, Any] | None = None, stream: bool = True):
        """Route the query and, for RAG, retrieve + rerank supporting contexts.

        Returns a dict with a "route" key:
          - "tools": market-data question; caller should use external tools.
          - "not_ready": index empty / unbuilt / no hits ("reason" explains).
          - "rag": retrieval succeeded; "contexts" holds reranked Docs.
        """
        route = self.route(query)
        if route == "tools":
            return {"route": "tools", "contexts": []}

        # Lazily build the index on first use; report why if that fails.
        if not self.index.ready():
            self.index.build()
            if not self.index.ready():
                return {"route": "not_ready", "contexts": [], "reason": "index_empty" if not self.index.docs else "build_failed"}

        hits = self.index.search(query, k=k, alpha=alpha, filters=filters)
        if not hits:
            return {"route": "not_ready", "contexts": [], "reason": "no_results"}

        reranked = self.reranker.rerank(query, hits, top_k=top_k_rerank)
        top_contexts = [d for d, _ in reranked]
        return {"route": "rag", "contexts": top_contexts}