import re
from collections import deque
from typing import Dict, List
from urllib.parse import urldefrag, urljoin, urlparse
from uuid import uuid4

import requests
import tldextract
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import QdrantVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

from app.config import demo_chatbot_configs, embedding_model, qdrant_client
from app.ingestion.models import ChatbotIngest


def scrape_website(
    start_url: str,
    timeout: int = 10,
    max_pages: int = 150,
) -> List[Dict[str, str]]:
    """
    Crawls and extracts cleaned text from all pages and subdomains
    under the same registered domain.

    Args:
        start_url: Entry URL (e.g. https://example.com)
        timeout: Request timeout in seconds
        max_pages: Hard cap to prevent crawl explosion

    Returns:
        List of dicts: [{ "url": str, "text": str }]

    Raises:
        ValueError: If no readable content was found on any crawled page.
    """
    print(f"Scraping website starting at {start_url}")

    def registered_domain(url: str) -> str:
        # "shop.example.co.uk" and "example.co.uk" both map to "example.co.uk",
        # so subdomains of the start URL are crawled as well.
        ext = tldextract.extract(url)
        return f"{ext.domain}.{ext.suffix}"

    base_domain = registered_domain(start_url)
    visited = set()
    queue = deque([start_url])
    results: List[Dict[str, str]] = []

    while queue and len(visited) < max_pages:
        url = queue.popleft()
        if url in visited:
            continue
        visited.add(url)

        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException:
            # Best-effort crawl: unreachable pages are skipped, not fatal.
            continue

        # Skip non-HTML payloads (PDFs, images, feeds) — parsing them as HTML
        # would produce garbage text blocks.
        content_type = response.headers.get("Content-Type", "")
        if "html" not in content_type.lower():
            continue

        soup = BeautifulSoup(response.text, "lxml")

        # Discover internal links + subdomains BEFORE stripping boilerplate:
        # navigation links usually live inside <nav>/<header>/<footer>, and
        # decomposing those first would stall the crawl on the landing page.
        for link in soup.find_all("a", href=True):
            next_url = urljoin(url, link["href"])
            # Drop the #fragment so "page#a" and "page#b" count as one page.
            next_url, _ = urldefrag(next_url)
            parsed = urlparse(next_url)
            if parsed.scheme not in ("http", "https"):
                continue
            if registered_domain(next_url) != base_domain:
                continue
            if next_url not in visited:
                queue.append(next_url)

        # Remove non-content elements before text extraction.
        for elem in soup(["script", "style", "noscript", "header", "footer",
                          "aside", "nav", "iframe"]):
            elem.decompose()

        # Semantic-aware content extraction: prefer landmark containers.
        content_tags = soup.find_all(["article", "main", "section"])
        text_blocks = []
        if content_tags:
            for tag in content_tags:
                for elem in tag.find_all(["p", "li", "span"]):
                    txt = elem.get_text(strip=True)
                    if len(txt) > 20:  # skip short/noisy text
                        text_blocks.append(txt)
        else:
            # fallback: headings + the paragraphs that follow them
            for h in soup.find_all(["h1", "h2", "h3"]):
                section_text = [
                    p.get_text(strip=True)
                    for p in h.find_all_next("p")
                    if len(p.get_text(strip=True)) > 20
                ]
                if section_text:
                    text_blocks.append(
                        h.get_text(strip=True) + "\n" + " ".join(section_text)
                    )

        if not text_blocks:
            continue

        text = "\n".join(text_blocks)
        # Remove repeated/noisy e-commerce boilerplate.
        text = re.sub(
            r"(Out of stock|Add to cart|Select Title Default Title)",
            "",
            text,
            flags=re.I,
        )
        text = re.sub(r"\n\s*\n", "\n", text)

        results.append({"url": url, "text": text})

    if not results:
        raise ValueError(
            f"""Website scraping failed for {start_url}. No readable content found.\n\n
                Possible reasons:\n
                1) The URL is incorrect or unreachable.\n
                2) The site requires login or JavaScript to display content.\n
                3) The page contains only images/media without text.\n\n
                Please check the URL and try again.
            """)

    return results


def chunk_and_embed(chatbot_id: str, pages: List[Dict[str, str]]):
    """
    Converts scraped website pages into embedded chunks and stores them
    in a chatbot-scoped Qdrant collection.

    Args:
        chatbot_id: Identifier used to derive the collection name.
        pages: Output of scrape_website — [{ "url": str, "text": str }].

    Raises:
        ValueError: If there are no pages or no page has non-empty text.
    """
    if not pages:
        raise ValueError("No pages to chunk and embed")

    collection_name = f"chatbot_{chatbot_id}"

    # 768 matches the output dimension of the configured embedding model.
    if not qdrant_client.collection_exists(collection_name):
        qdrant_client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(
                size=768,
                distance=Distance.COSINE,
            ),
        )

    # Convert pages → LangChain Documents, keeping provenance in metadata.
    documents: List[Document] = [
        Document(
            page_content=page["text"],
            metadata={
                "chatbot_id": chatbot_id,
                "source": "website",
                "url": page["url"],
            },
        )
        for page in pages
        if page.get("text")
    ]

    if not documents:
        raise ValueError("No valid documents extracted from pages")

    # Chunk
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=64,
    )
    chunks = splitter.split_documents(documents)

    # Embed + store
    embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
    vector_store = QdrantVectorStore(
        client=qdrant_client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    ids = [str(uuid4()) for _ in chunks]
    vector_store.add_documents(chunks, ids=ids)

    print(f"Stored {len(chunks)} chunks in Qdrant collection {collection_name}")


def build_demo_prompt(ingest: ChatbotIngest) -> str:
    """
    Build the system prompt template for a demo chatbot from its ingest config.

    Falls back to generic wording when optional config fields are empty.
    """
    chatbot_name = ingest.chatbot_name
    company_name = ingest.company_name
    # Guard with `or []`: join raises TypeError on None (mirrors the guard
    # used in build_welcome_message).
    allowed_topics = ", ".join(ingest.chatbot_purpose or []) or "general questions"
    banned_topics = ingest.sensitive_topics or "sensitive topics"
    response_style = ", ".join(ingest.tone_style) if ingest.tone_style else "clear and concise"
    fallback_message = f"Sorry, I cannot answer that question. Please call or email for further assistance. Information can be found on the website"

    template = f"""
    You are {chatbot_name}, an assistant for {company_name}.
    Answer ONLY using the provided context from {company_name}'s approved content.

    STRICT RULES:
    1. If the Contextual Knowledge section is empty, say: "{fallback_message}"
    2. Do NOT use your own general knowledge. Only reference the Contextual Knowledge.
    3. Only reference topics explicitly allowed: {allowed_topics}.
    4. Do NOT discuss banned topics: {banned_topics}.
    5. Keep responses {response_style}.
    6. Keep the answers clear and concise in 1-3 sentences
    """
    return template


def build_welcome_message(ingest: ChatbotIngest) -> str:
    """
    Build a flexible and user-friendly welcome message for the chatbot
    using its ingest config.
    """
    # Determine chatbot name
    chatbot_name = ingest.chatbot_name or f"{ingest.company_name} Assistant"

    # Filter out "capture leads (email, phone)" from purposes — it is an
    # internal capability, not something to advertise to the visitor.
    purposes = [
        p for p in ingest.chatbot_purpose or []
        if p.lower() != "capture leads (email, phone)"
    ]

    # Start message
    intro_msg = f"Hello! 👋 I'm {chatbot_name}, your virtual assistant for {ingest.company_name}.\n"

    if purposes:
        intro_msg += "I can help you with:\n" + "\n".join(f"- {p}" for p in purposes) + "\n"

    # Add flexible closing
    intro_msg += (
        "Just type your question below and I'll do my best to help!"
    )

    return intro_msg


def store_demo_rag_config(chatbot_id, company_id, ingest: ChatbotIngest) -> None:
    """
    Stores the RAG configuration prompt for the demo chatbot in MongoDB.
    """
    demo_rag_dict = {
        "submission_id": ingest.submission_id,
        "chatbot_id": chatbot_id,
        "company_id": company_id,
        "chatbot_name": ingest.chatbot_name,
        "company_name": ingest.company_name,
        "pricing_plan": ingest.pricing_plan,
        "prompt_template": build_demo_prompt(ingest),
        "welcome_message": build_welcome_message(ingest),
        "retrievers": [
            {
                "name": "all",
                # Must match the collection created by chunk_and_embed.
                "collection": f"chatbot_{chatbot_id}",
                "top_k": 25,
                "filter_score": 0.7,
            }
        ],
    }

    result = demo_chatbot_configs.insert_one(demo_rag_dict)
    print(f"Inserted RAG config for {ingest.company_name}, _id={result.inserted_id}")