File size: 5,091 Bytes
aa8691d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import re
import sys
from dataclasses import dataclass
from pathlib import Path

import bs4
import numpy as np
import requests
from sambanova import SambaNova
import yaml
from langchain_huggingface import HuggingFaceEmbeddings

@dataclass
class DocumentChunk:
    """A single embedded slice of scraped page text, used for retrieval."""
    text: str  # chunk text produced by split_into_chunks()
    source: str  # URL the text was scraped from
    vector: np.ndarray  # embedding of `text` (np.array of the model output)


def load_config(path: Path) -> dict:
    """Parse the YAML file at *path* and return its contents as a dict."""
    return yaml.safe_load(path.read_text(encoding="utf-8"))


def scrape_website(url: str) -> str:
    """Fetch *url* and return its visible text with blank-line runs collapsed.

    Boilerplate containers (scripts, styles, headers, footers, nav, asides)
    are removed before text extraction.
    """
    resp = requests.get(url, timeout=15)
    resp.raise_for_status()
    soup = bs4.BeautifulSoup(resp.text, "html.parser")
    # Drop non-content elements so get_text() only sees the page body.
    boilerplate = ["script", "style", "header", "footer", "nav", "aside"]
    for element in soup(boilerplate):
        element.decompose()
    raw = soup.get_text(separator="\n")
    return re.sub(r"\n{2,}", "\n", raw).strip()


def split_into_chunks(text: str, chunk_size: int = 400, overlap: int = 100) -> list[str]:
    """Split *text* into roughly chunk_size-character chunks on sentence boundaries.

    Consecutive chunks share up to *overlap* trailing characters of context so
    information straddling a boundary stays retrievable.

    Args:
        text: Raw text to split.
        chunk_size: Soft upper bound on chunk length in characters; a single
            sentence longer than this still becomes one oversized chunk.
        overlap: Trailing characters carried over into the next chunk;
            0 (or negative) means no carry-over.

    Returns:
        List of non-empty, stripped chunk strings ([] for empty input).
    """
    # Split on whitespace that follows sentence-ending punctuation.
    sentences = [s.strip() for s in re.split(r"(?<=[\.\?\!])\s+", text) if s.strip()]
    chunks: list[str] = []
    current = ""
    for sentence in sentences:
        if current and len(current) + len(sentence) + 1 > chunk_size:
            chunks.append(current.strip())
            # BUG FIX: the original used `current[-overlap:]` for the carried
            # tail, but `current[-0:]` is the WHOLE string, so overlap=0
            # duplicated every chunk into the next. Keep a tail only when
            # overlap is positive (slicing already caps it at len(current)).
            current = current[-overlap:] if overlap > 0 else ""
        current += " " + sentence
    if current.strip():
        chunks.append(current.strip())
    return chunks


def embed_texts(texts: list[str], embed_model: "HuggingFaceEmbeddings | None" = None) -> list[np.ndarray]:
    """Embed *texts* with *embed_model*.

    Args:
        texts: Strings to embed.
        embed_model: Embedding backend; when None, no embedding is possible.

    Returns:
        One vector per input text (as returned by embed_documents), or an
        empty list when *texts* is empty or no model was supplied.
    """
    if not texts:
        return []
    if embed_model is not None:
        return embed_model.embed_documents(texts)
    # BUG FIX: the original fell off the end here and implicitly returned
    # None, which crashed callers that zip() the result (build_rag_corpus).
    return []


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of vectors *a* and *b*, in [-1.0, 1.0].

    A zero vector has no direction, so similarity involving one is defined
    as 0.0 (this also avoids division by zero).
    """
    # Hoist the norms: the original recomputed np.linalg.norm up to twice
    # per vector (once in the guard, again in the denominator).
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))


def build_rag_corpus(config: dict, embed_model: HuggingFaceEmbeddings, url: str) -> list[DocumentChunk]:
    """Scrape *url*, chunk its text, embed each chunk, and wrap the results.

    *config* is accepted for interface parity but is not read here.
    """
    print(f"Scraping website: {url}")
    page_text = scrape_website(url)
    pieces = split_into_chunks(page_text)
    print(f"Split content into {len(pieces)} chunks")
    vectors = embed_texts(pieces, embed_model)
    corpus = []
    for piece, vec in zip(pieces, vectors):
        corpus.append(DocumentChunk(text=piece, source=url, vector=np.array(vec)))
    return corpus


def retrieve_relevant_chunks(chunks: list[DocumentChunk], question: str, embed_model: HuggingFaceEmbeddings, top_k: int = 4) -> list[DocumentChunk]:
    """Return the *top_k* chunks most cosine-similar to *question*.

    Falls back to the first *top_k* chunks when the question could not be
    embedded (e.g. no vectors came back from the model).
    """
    q_vectors = embed_texts([question], embed_model)
    if not q_vectors:
        return chunks[:top_k]
    q_vec = np.array(q_vectors[0])
    # sorted() is stable, so ties keep their original corpus order — same
    # ordering as sorting (chunk, score) pairs by score.
    ranked = sorted(
        chunks,
        key=lambda chunk: cosine_similarity(q_vec, chunk.vector),
        reverse=True,
    )
    return ranked[:top_k]


def build_prompt(system_prompt: str, question: str, context_chunks: list[DocumentChunk]) -> str:
    """Assemble the final LLM prompt: system text, retrieved context, question."""
    context_text = "\n---\n".join(chunk.text for chunk in context_chunks)
    parts = [
        system_prompt,
        "",
        "Use the following extracted website text to answer the question clearly.",
        f"Context:\n{context_text}",
        "",
        f"Question: {question}",
        "",
    ]
    return "\n".join(parts)


def create_llm_client(config: dict) -> SambaNova:
    """Build a SambaNova API client using the key from *config*."""
    api_key = config.get("sambanova_api_key")
    client = SambaNova(
        api_key=api_key,
        base_url="https://api.sambanova.ai/v1",
        timeout=30,
    )
    return client


def ask_model(prompt: str, client: SambaNova) -> str:
    """Send *prompt* as a single user message and return the stripped reply."""
    messages = [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(
        model="DeepSeek-V3.1",
        messages=messages,
        max_tokens=1056,
        temperature=0.2,
    )
    answer = completion.choices[0].message.content
    return answer.strip()


def format_answer(raw: str, chunks: list[DocumentChunk]) -> str:
    """Return the model's answer unchanged.

    Placeholder hook for post-processing; *chunks* (the retrieved context)
    is accepted so source citations could be appended here later, but is
    currently unused.
    """
    return raw


def main() -> int:
    """Interactive RAG loop: scrape the configured site, then answer questions.

    Reads config.yaml next to this script, builds the chunk corpus and LLM
    client, then answers questions from stdin until EOF, Ctrl-C, or
    "exit"/"quit".

    Returns:
        Process exit code: 0 on normal exit, 1 on configuration errors.
    """
    config_path = Path(__file__).parent / "config.yaml"
    if not config_path.exists():
        print(f"Missing config file: {config_path}")
        return 1

    config = load_config(config_path)
    llm_api_key = config.get("sambanova_api_key")
    website = config.get("website")
    system_prompt = config.get("system_prompt", "You are a helpful assistant.")

    if not llm_api_key or not website:
        print("Please set sambanova_api_key and website in config.yaml")
        return 1

    embed_model = HuggingFaceEmbeddings(model_name=config.get("embedding_model"))
    chunks = build_rag_corpus(config, embed_model, website)
    client = create_llm_client(config)
    print("RAG corpus ready. Ask a question or type 'exit'.")

    while True:
        try:
            question = input("Question> ").strip()
        # BUG FIX: Ctrl-C previously escaped as an unhandled KeyboardInterrupt
        # traceback; treat it like EOF and leave the loop cleanly.
        except (EOFError, KeyboardInterrupt):
            break
        if not question:
            continue
        if question.lower() in {"exit", "quit"}:
            break

        selected = retrieve_relevant_chunks(chunks, question, embed_model)
        prompt = build_prompt(system_prompt, question, selected)
        raw_answer = ask_model(prompt, client)
        response = format_answer(raw_answer, selected)

        print(response)
        print()

    return 0


if __name__ == "__main__":
    # SystemExit carries main()'s return value out as the process exit code,
    # exactly like sys.exit(main()).
    raise SystemExit(main())