CryptoRAG / rag /pipeline.py
Nullpointer-KK's picture
add RAG
8bd9348
from __future__ import annotations
from typing import List, Dict, Any
from openai import OpenAI
from .utils import HybridIndex, Reranker, Doc, select_fewshots
from .ingest import build_docs_from_paths, build_docs_from_urls
from prompts import SYSTEM_PROMPT, FEWSHOTS
class CryptoRAGPipeline:
def __init__(self, dense_model: str = "sentence-transformers/all-MiniLM-L6-v2", reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
self.index = HybridIndex(dense_model_name=dense_model)
self.reranker = Reranker(reranker_model)
self.client: OpenAI | None = None
def set_openai(self, api_key: str):
self.client = OpenAI(api_key=api_key)
def add_local_files(self, paths: List[str]):
docs = build_docs_from_paths(paths, source_label="local")
self.index.add(docs)
def add_urls(self, urls: List[str]):
docs = build_docs_from_urls(urls, source_label="web")
self.index.add(docs)
def build(self):
self.index.build()
def route(self, query: str) -> str:
q = query.lower()
if any(k in q for k in ["price", "market cap", "marketcap", "ath", "all-time high", "24h", "fear greed", "greed index"]):
return "tools"
return "rag"
def build_prompt(self, query: str, contexts: List[Doc]) -> str:
fs = select_fewshots(query, FEWSHOTS, self.index.embedder, n=2)
few = "\n\n".join([f"Q: {x['q']}\nA: {x['a']}" for x in fs])
ctx = "\n\n".join([f"[{i+1}] {c.text[:1200]}" for i, c in enumerate(contexts)])
prompt = f"""{SYSTEM_PROMPT}
Few-shot examples:
{few}
Context (use to answer if relevant; cite [#]):
{ctx}
User question: {query}
Answer:"""
return prompt
def answer_stream(self, query: str, contexts: List[Doc], model: str = "gpt-4o-mini"):
assert self.client is not None, "LLM client not set"
prompt = self.build_prompt(query, contexts)
with self.client.chat.completions.create(
model=model,
messages=[{"role":"system","content":SYSTEM_PROMPT},
{"role":"user","content":prompt}],
stream=True,
temperature=0.3,
max_tokens=400
) as stream:
for event in stream:
if hasattr(event, "choices") and event.choices:
delta = event.choices[0].delta
if delta and delta.content:
yield delta.content
def ask(self, query: str, k: int = 8, alpha: float = 0.5, top_k_rerank: int = 5, filters: Dict[str, Any] | None = None, stream: bool = True):
route = self.route(query)
if route == "tools":
return {"route": "tools", "contexts": []}
# Try auto-build if needed
if not self.index.ready():
self.index.build()
if not self.index.ready():
return {"route": "not_ready", "contexts": [], "reason": "index_empty" if len(self.index.docs)==0 else "build_failed"}
hits = self.index.search(query, k=k, alpha=alpha, filters=filters)
if not hits:
return {"route": "not_ready", "contexts": [], "reason": "no_results"}
reranked = self.reranker.rerank(query, hits, top_k=top_k_rerank)
top_contexts = [d for d,_ in reranked]
return {"route": "rag", "contexts": top_contexts}