# Commit f7867c7 (vip11017): added pricing_plan into demo rag
import requests
from bs4 import BeautifulSoup
from uuid import uuid4
from urllib.parse import urljoin, urlparse
from collections import deque
import tldextract
from typing import List, Dict
from app.config import qdrant_client, embedding_model, demo_chatbot_configs
from qdrant_client.models import VectorParams, Distance
from langchain_core.documents import Document
from langchain_qdrant import QdrantVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from qdrant_client import QdrantClient
from app.ingestion.models import ChatbotIngest
import re
def scrape_website(
    start_url: str,
    timeout: int = 10,
    max_pages: int = 150,
) -> List[Dict[str, str]]:
    """
    Crawl and extract cleaned text from all pages and subdomains
    under the same registered domain as ``start_url``.

    Args:
        start_url: Entry URL (e.g. https://example.com)
        timeout: Per-request timeout in seconds
        max_pages: Hard cap on fetched pages to prevent crawl explosion

    Returns:
        List of dicts: [{ "url": str, "text": str }]

    Raises:
        ValueError: If no readable content was extracted from any page.
    """
    print(f"Scraping website starting at {start_url}")

    def registered_domain(url: str) -> str:
        # e.g. "shop.example.co.uk" -> "example.co.uk"
        ext = tldextract.extract(url)
        return f"{ext.domain}.{ext.suffix}"

    base_domain = registered_domain(start_url)
    # `seen` tracks every URL that has ever been enqueued, so the same page
    # cannot be queued twice (previously duplicates were only filtered at
    # pop time, which let the queue grow unboundedly on link-dense sites).
    seen = {start_url}
    visited = set()
    queue = deque([start_url])
    results: List[Dict[str, str]] = []

    while queue and len(visited) < max_pages:
        url = queue.popleft()
        visited.add(url)

        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException:
            # Unreachable/broken page: skip it but keep crawling.
            continue

        soup = BeautifulSoup(response.text, "lxml")

        # Remove non-content elements before extracting text.
        for elem in soup(["script", "style", "noscript", "header", "footer", "aside", "nav", "iframe"]):
            elem.decompose()

        # Semantic-aware content extraction: prefer landmark containers.
        content_tags = soup.find_all(["article", "main", "section"])
        text_blocks = []
        if content_tags:
            for tag in content_tags:
                for elem in tag.find_all(["p", "li", "span"]):
                    txt = elem.get_text(strip=True)
                    if len(txt) > 20:  # skip short/noisy text
                        text_blocks.append(txt)
        else:
            # Fallback: headings plus the paragraphs that follow them.
            for h in soup.find_all(["h1", "h2", "h3"]):
                section_text = [
                    p.get_text(strip=True)
                    for p in h.find_all_next("p")
                    if len(p.get_text(strip=True)) > 20
                ]
                if section_text:
                    text_blocks.append(h.get_text(strip=True) + "\n" + " ".join(section_text))

        if text_blocks:
            text = "\n".join(text_blocks)
            # Remove repeated/noisy e-commerce boilerplate.
            text = re.sub(r"(Out of stock|Add to cart|Select Title Default Title)", "", text, flags=re.I)
            text = re.sub(r"\n\s*\n", "\n", text)
            results.append({
                "url": url,
                "text": text
            })

        # Discover internal links + subdomains. This runs even when the page
        # yielded no text (e.g. an image-only landing page), so the crawl is
        # not cut short — previously such pages ended link discovery early.
        for link in soup.find_all("a", href=True):
            # Drop #fragments so "page#a" and "page#b" aren't fetched twice.
            next_url = urljoin(url, link["href"]).split("#")[0]
            parsed = urlparse(next_url)
            if parsed.scheme not in ("http", "https"):
                continue
            if registered_domain(next_url) != base_domain:
                continue
            if next_url not in seen:
                seen.add(next_url)
                queue.append(next_url)

    if not results:
        raise ValueError(
            f"""Website scraping failed for {start_url}. No readable content found.\n\n
            Possible reasons:\n
            1) The URL is incorrect or unreachable.\n
            2) The site requires login or JavaScript to display content.\n
            3) The page contains only images/media without text.\n\n
            Please check the URL and try again.
            """)
    return results
def chunk_and_embed(chatbot_id: str, pages: List[Dict[str, str]]):
    """
    Convert scraped website pages into embedded chunks and store them in a
    chatbot-scoped Qdrant collection named ``chatbot_<chatbot_id>``.

    Raises:
        ValueError: If ``pages`` is empty or no page carries any text.
    """
    if not pages:
        raise ValueError("No pages to chunk and embed")

    collection_name = f"chatbot_{chatbot_id}"

    # Create the collection on first use.
    # NOTE(review): size=768 assumes the configured embedding model emits
    # 768-dim vectors — confirm against `embedding_model` in app.config.
    if not qdrant_client.collection_exists(collection_name):
        qdrant_client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=768, distance=Distance.COSINE),
        )

    # Wrap each non-empty page in a LangChain Document tagged with its origin.
    documents: List[Document] = []
    for page in pages:
        if not page.get("text"):
            continue
        documents.append(
            Document(
                page_content=page["text"],
                metadata={
                    "chatbot_id": chatbot_id,
                    "source": "website",
                    "url": page["url"],
                },
            )
        )
    if not documents:
        raise ValueError("No valid documents extracted from pages")

    # Split into overlapping chunks sized for the embedding model.
    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
    chunks = splitter.split_documents(documents)

    # Embed each chunk and persist it under a fresh UUID.
    embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
    store = QdrantVectorStore(
        client=qdrant_client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    store.add_documents(chunks, ids=[str(uuid4()) for _ in chunks])
    print(f"Stored {len(chunks)} chunks in Qdrant collection {collection_name}")
def build_demo_prompt(ingest: "ChatbotIngest") -> str:
    """
    Build the system-prompt template for a demo chatbot from its ingest config.

    Args:
        ingest: Ingest payload; reads chatbot_name, company_name,
            chatbot_purpose, sensitive_topics and tone_style.

    Returns:
        The filled-in prompt template string.
    """
    chatbot_name = ingest.chatbot_name
    company_name = ingest.company_name
    # `or []` guards against chatbot_purpose being None (join would raise
    # TypeError); sibling build_welcome_message already guards the same way.
    allowed_topics = ", ".join(ingest.chatbot_purpose or []) or "general questions"
    banned_topics = ingest.sensitive_topics or "sensitive topics"
    response_style = ", ".join(ingest.tone_style) if ingest.tone_style else "clear and concise"
    fallback_message = (
        "Sorry, I cannot answer that question. Please call or email for "
        "further assistance. Information can be found on the website"
    )
    template = f"""
You are {chatbot_name}, an assistant for {company_name}.
Answer ONLY using the provided context from {company_name}'s approved content.
STRICT RULES:
1. If the Contextual Knowledge section is empty, say: "{fallback_message}"
2. Do NOT use your own general knowledge. Only reference the Contextual Knowledge.
3. Only reference topics explicitly allowed: {allowed_topics}.
4. Do NOT discuss banned topics: {banned_topics}.
5. Keep responses {response_style}.
6. Keep the answers clear and concise in 1-3 sentences
"""
    return template
def build_welcome_message(ingest: ChatbotIngest) -> str:
    """
    Compose the greeting shown when the demo chatbot first loads.

    Reads the chatbot/company names and the advertised purposes from the
    ingest config; the lead-capture purpose is kept out of the greeting.
    """
    # Fall back to "<company> Assistant" when no explicit name was given.
    chatbot_name = ingest.chatbot_name or f"{ingest.company_name} Assistant"

    # Lead capture is internal — never advertise it to the visitor.
    advertised = [
        purpose
        for purpose in (ingest.chatbot_purpose or [])
        if purpose.lower() != "capture leads (email, phone)"
    ]

    parts = [f"Hello! 👋 I'm {chatbot_name}, your virtual assistant for {ingest.company_name}.\n"]
    if advertised:
        bullets = "\n".join(f"- {purpose}" for purpose in advertised)
        parts.append("I can help you with:\n" + bullets + "\n")
    parts.append("Just type your question below and I'll do my best to help!")
    return "".join(parts)
def store_demo_rag_config(chatbot_id, company_id, ingest: ChatbotIngest) -> None:
    """
    Persist the demo chatbot's RAG configuration document to MongoDB.

    The document bundles the identifiers, the rendered prompt template, the
    welcome message, and the retriever settings pointing at the chatbot's
    Qdrant collection.
    """
    config_doc = {
        "submission_id": ingest.submission_id,
        "chatbot_id": chatbot_id,
        "company_id": company_id,
        "chatbot_name": ingest.chatbot_name,
        "company_name": ingest.company_name,
        "pricing_plan": ingest.pricing_plan,
        "prompt_template": build_demo_prompt(ingest),
        "welcome_message": build_welcome_message(ingest),
        # Single retriever over the chatbot-scoped collection.
        "retrievers": [
            {
                "name": "all",
                "collection": f"chatbot_{chatbot_id}",
                "top_k": 25,
                "filter_score": 0.7,
            }
        ],
    }
    inserted = demo_chatbot_configs.insert_one(config_doc)
    print(f"Inserted RAG config for {ingest.company_name}, _id={inserted.inserted_id}")