import re
import sys
from dataclasses import dataclass
from pathlib import Path
import bs4
import numpy as np
import requests
from sambanova import SambaNova
import yaml
from langchain_huggingface import HuggingFaceEmbeddings
@dataclass
class DocumentChunk:
    """One embedded chunk of scraped website text."""
    text: str  # the chunk's plain-text content
    source: str  # URL the text was scraped from
    vector: np.ndarray  # embedding of `text` (populated by build_rag_corpus)
def load_config(path: Path) -> dict:
    """Parse the YAML file at *path* and return its contents."""
    return yaml.safe_load(path.read_text(encoding="utf-8"))
def scrape_website(url: str) -> str:
    """Fetch *url* and return its visible text, one block per line.

    Raises requests.HTTPError on a non-2xx response.
    """
    resp = requests.get(url, timeout=15)
    resp.raise_for_status()
    page = bs4.BeautifulSoup(resp.text, "html.parser")
    # Strip boilerplate containers before extracting the readable text.
    for element in page(["script", "style", "header", "footer", "nav", "aside"]):
        element.decompose()
    raw = page.get_text(separator="\n")
    # Collapse runs of blank lines down to single newlines.
    return re.sub(r"\n{2,}", "\n", raw).strip()
def split_into_chunks(text: str, chunk_size: int = 400, overlap: int = 100) -> list[str]:
    """Split *text* into roughly chunk_size-character pieces on sentence ends.

    Consecutive chunks share up to *overlap* trailing characters of the
    previous chunk so context is not lost at the boundaries.
    """
    pieces = (part.strip() for part in re.split(r"(?<=[\.\?\!])\s+", text))
    sentences = [part for part in pieces if part]
    chunks: list[str] = []
    buffer = ""
    for sentence in sentences:
        if buffer and len(buffer) + len(sentence) + 1 > chunk_size:
            chunks.append(buffer.strip())
            # Carry the tail of the finished chunk forward as overlap; when the
            # buffer is shorter than `overlap` it is carried over whole.
            if overlap < len(buffer):
                buffer = buffer[-overlap:]
        buffer = f"{buffer} {sentence}"
    if buffer.strip():
        chunks.append(buffer.strip())
    return chunks
def embed_texts(texts: list[str], embed_model: "HuggingFaceEmbeddings | None" = None) -> list[np.ndarray]:
    """Embed *texts* with *embed_model*, returning one vector per text.

    Returns [] for empty input OR when no model is supplied.  The original
    fell through and implicitly returned None when embed_model was None,
    which would crash the zip() in build_rag_corpus; an empty list keeps
    callers' falsy checks (e.g. retrieve_relevant_chunks) working unchanged.
    """
    if not texts:
        return []
    if embed_model is None:
        # No embedding backend available — signal "no vectors" explicitly.
        return []
    return embed_model.embed_documents(texts)
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two vectors as a Python float.

    Returns 0.0 when either vector has zero magnitude (cosine undefined).
    """
    # Compute each norm once; the original recomputed both in the guard
    # and again in the division.
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))
def build_rag_corpus(config: dict, embed_model: HuggingFaceEmbeddings, url: str) -> list[DocumentChunk]:
    """Scrape *url*, split it into chunks, and embed each chunk.

    NOTE(review): *config* is currently unused here; kept for interface
    stability.
    """
    print(f"Scraping website: {url}")
    page_body = scrape_website(url)
    pieces = split_into_chunks(page_body)
    print(f"Split content into {len(pieces)} chunks")
    vectors = embed_texts(pieces, embed_model)
    corpus = []
    for piece, vec in zip(pieces, vectors):
        corpus.append(DocumentChunk(text=piece, source=url, vector=np.array(vec)))
    return corpus
def retrieve_relevant_chunks(chunks: list[DocumentChunk], question: str, embed_model: HuggingFaceEmbeddings, top_k: int = 4) -> list[DocumentChunk]:
    """Return the *top_k* chunks most cosine-similar to *question*.

    Falls back to the first *top_k* chunks when no question embedding is
    produced (e.g. no embedding model available).
    """
    q_vectors = embed_texts([question], embed_model)
    if not q_vectors:
        return chunks[:top_k]
    q_vec = np.array(q_vectors[0])
    # sorted() is stable, so ties keep their original corpus order.
    ranked = sorted(
        chunks,
        key=lambda chunk: cosine_similarity(q_vec, chunk.vector),
        reverse=True,
    )
    return ranked[:top_k]
def build_prompt(system_prompt: str, question: str, context_chunks: list[DocumentChunk]) -> str:
    """Assemble the LLM prompt: system text, retrieved context, question."""
    joined_context = "\n---\n".join(chunk.text for chunk in context_chunks)
    segments = [
        system_prompt,
        "",
        "Use the following extracted website text to answer the question clearly.",
        f"Context:\n{joined_context}",
        "",
        f"Question: {question}",
        "",
    ]
    return "\n".join(segments)
def create_llm_client(config: dict) -> SambaNova:
    """Build a SambaNova client from the API key in *config*."""
    api_key = config.get("sambanova_api_key")
    client = SambaNova(
        api_key=api_key,
        base_url="https://api.sambanova.ai/v1",
        timeout=30,
    )
    return client
def ask_model(prompt: str, client: SambaNova) -> str:
    """Send *prompt* as a single user message and return the trimmed reply."""
    request_kwargs = {
        "model": "DeepSeek-V3.1",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 1056,
        "temperature": 0.2,
    }
    completion = client.chat.completions.create(**request_kwargs)
    return completion.choices[0].message.content.strip()
def format_answer(raw: str, chunks: list[DocumentChunk]) -> str:
    """Post-process the model's raw answer before display.

    Currently a pass-through; *chunks* (the retrieved context) is unused but
    kept in the signature so source/citation formatting could be added later.
    """
    return raw
def main() -> int:
    """Interactive RAG loop: load config, build the corpus, answer questions.

    Returns a process exit code (0 on normal exit, 1 on configuration errors).
    """
    cfg_file = Path(__file__).parent / "config.yaml"
    if not cfg_file.exists():
        print(f"Missing config file: {cfg_file}")
        return 1
    settings = load_config(cfg_file)
    api_key = settings.get("sambanova_api_key")
    site_url = settings.get("website")
    sys_prompt = settings.get("system_prompt", "You are a helpful assistant.")
    if not (api_key and site_url):
        print("Please set sambanova_api_key and website in config.yaml")
        return 1
    embedder = HuggingFaceEmbeddings(model_name=settings.get("embedding_model"))
    corpus = build_rag_corpus(settings, embedder, site_url)
    llm = create_llm_client(settings)
    print("RAG corpus ready. Ask a question or type 'exit'.")
    while True:
        try:
            query = input("Question> ").strip()
        except EOFError:
            # Ctrl-D / closed stdin ends the session cleanly.
            break
        if not query:
            continue
        if query.lower() in {"exit", "quit"}:
            break
        hits = retrieve_relevant_chunks(corpus, query, embedder)
        full_prompt = build_prompt(sys_prompt, query, hits)
        answer = ask_model(full_prompt, llm)
        print(format_answer(answer, hits))
        print()
    return 0
if __name__ == "__main__":
    # Run the interactive loop and propagate its exit code to the shell.
    sys.exit(main())