Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import List, Dict, Any | |
| from openai import OpenAI | |
| from .utils import HybridIndex, Reranker, Doc, select_fewshots | |
| from .ingest import build_docs_from_paths, build_docs_from_urls | |
| from prompts import SYSTEM_PROMPT, FEWSHOTS | |
class CryptoRAGPipeline:
    """Question-answering pipeline that combines retrieval, reranking and an LLM.

    The pipeline owns a ``HybridIndex`` (``self.index``), a ``Reranker``
    (``self.reranker``) and an optional OpenAI client (``self.client``).
    Typical use: add documents, call :meth:`build`, call :meth:`ask` to get
    the contexts for a query, then :meth:`answer_stream` to stream an answer.
    """

    # Keywords matched as plain substrings of the lower-cased query.
    _TOOL_SUBSTRINGS = (
        "price",
        "market cap",
        "marketcap",
        "all-time high",
        "24h",
        "fear greed",
        "greed index",
    )
    # Short keywords matched as whole words only. A substring test for "ath"
    # would also fire on "math", "path", "weather", "rather", ...
    _TOOL_WORDS = frozenset({"ath", "aths"})

    def __init__(
        self,
        dense_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
    ):
        """Create the index and the reranker; the LLM client starts unset.

        Args:
            dense_model: Model name passed to ``HybridIndex`` as
                ``dense_model_name``.
            reranker_model: Model name passed to ``Reranker``.
        """
        self.index = HybridIndex(dense_model_name=dense_model)
        self.reranker = Reranker(reranker_model)
        # Populated by set_openai(); answer_stream() refuses to run without it.
        self.client: OpenAI | None = None

    def set_openai(self, api_key: str):
        """Create the OpenAI client used by :meth:`answer_stream`."""
        self.client = OpenAI(api_key=api_key)

    def add_local_files(self, paths: List[str]):
        """Turn local files into docs (labelled ``"local"``) and add them to the index."""
        self.index.add(build_docs_from_paths(paths, source_label="local"))

    def add_urls(self, urls: List[str]):
        """Turn URLs into docs (labelled ``"web"``) and add them to the index."""
        self.index.add(build_docs_from_urls(urls, source_label="web"))

    def build(self):
        """Build the underlying index."""
        self.index.build()

    def route(self, query: str) -> str:
        """Decide whether a query goes to live-data tools or to retrieval.

        Args:
            query: The raw user question.

        Returns:
            ``"tools"`` if the query mentions one of the market-data keywords
            (case-insensitive), otherwise ``"rag"``.
        """
        lowered = query.lower()
        if any(keyword in lowered for keyword in self._TOOL_SUBSTRINGS):
            return "tools"

        # Split on every non-alphanumeric character so punctuation does not
        # glue onto a word ("ATH?" -> "ath").
        words = "".join(ch if ch.isalnum() else " " for ch in lowered).split()
        if not self._TOOL_WORDS.isdisjoint(words):
            return "tools"
        return "rag"

    def build_prompt(self, query: str, contexts: List[Doc]) -> str:
        """Assemble the full text prompt for the LLM.

        Args:
            query: The user question.
            contexts: Retrieved documents; each one's ``text`` is truncated to
                1200 characters and numbered from ``[1]``.

        Returns:
            The system prompt, two selected few-shot examples, the numbered
            contexts and the question, joined by newlines.
        """
        shots = select_fewshots(query, FEWSHOTS, self.index.embedder, n=2)
        few = "\n\n".join(f"Q: {shot['q']}\nA: {shot['a']}" for shot in shots)
        ctx = "\n\n".join(
            f"[{number}] {doc.text[:1200]}"
            for number, doc in enumerate(contexts, start=1)
        )
        # Built from explicit pieces so source indentation can never leak
        # into the prompt text.
        return (
            f"{SYSTEM_PROMPT}\n"
            "Few-shot examples:\n"
            f"{few}\n"
            "Context (use to answer if relevant; cite [#]):\n"
            f"{ctx}\n"
            f"User question: {query}\n"
            "Answer:"
        )

    def answer_stream(self, query: str, contexts: List[Doc], model: str = "gpt-4o-mini"):
        """Stream the LLM answer for ``query`` as text chunks.

        This is a generator: nothing runs (including the client check) until
        the first chunk is requested.

        Args:
            query: The user question.
            contexts: Documents to include in the prompt.
            model: Chat-completions model name.

        Yields:
            Each non-empty ``delta.content`` string from the streamed response.

        Raises:
            AssertionError: If :meth:`set_openai` has not been called.
        """
        if self.client is None:
            # Raised explicitly (rather than via ``assert``) so the guard is
            # not stripped under ``python -O``; the type is unchanged.
            raise AssertionError("LLM client not set")

        prompt = self.build_prompt(query, contexts)
        # NOTE(review): build_prompt() already embeds SYSTEM_PROMPT, so it is
        # sent twice (system message + user message). Left as-is so model
        # output does not change.
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]
        with self.client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
            temperature=0.3,
            max_tokens=400,
        ) as stream:
            for event in stream:
                choices = getattr(event, "choices", None)
                if not choices:
                    continue
                delta = choices[0].delta
                if delta and delta.content:
                    yield delta.content

    def ask(
        self,
        query: str,
        k: int = 8,
        alpha: float = 0.5,
        top_k_rerank: int = 5,
        filters: Dict[str, Any] | None = None,
        stream: bool = True,
    ) -> Dict[str, Any]:
        """Route a query and, for the RAG route, retrieve and rerank contexts.

        Args:
            query: The user question.
            k: Number of hits requested from ``self.index.search``.
            alpha: Forwarded unchanged to ``self.index.search``.
            top_k_rerank: Number of hits kept by the reranker.
            filters: Forwarded unchanged to ``self.index.search``.
            stream: Currently unused; kept so existing callers keep working.

        Returns:
            A dict with a ``"route"`` key and a ``"contexts"`` list:

            * ``{"route": "tools", "contexts": []}`` for market-data queries.
            * ``{"route": "not_ready", "contexts": [], "reason": ...}`` where
              ``reason`` is ``"index_empty"``, ``"build_failed"`` or
              ``"no_results"``.
            * ``{"route": "rag", "contexts": [...]}`` with the reranked docs.
        """
        if self.route(query) == "tools":
            return {"route": "tools", "contexts": []}

        # Try an automatic build once if the index is not ready yet.
        if not self.index.ready():
            self.index.build()
        if not self.index.ready():
            reason = "index_empty" if len(self.index.docs) == 0 else "build_failed"
            return {"route": "not_ready", "contexts": [], "reason": reason}

        hits = self.index.search(query, k=k, alpha=alpha, filters=filters)
        if not hits:
            return {"route": "not_ready", "contexts": [], "reason": "no_results"}

        reranked = self.reranker.rerank(query, hits, top_k=top_k_rerank)
        # The reranker returns (doc, score) pairs; only the docs are exposed.
        return {"route": "rag", "contexts": [doc for doc, _ in reranked]}