Spaces:
Sleeping
Sleeping
| import requests | |
| from bs4 import BeautifulSoup | |
| from uuid import uuid4 | |
| from urllib.parse import urljoin, urlparse | |
| from collections import deque | |
| import tldextract | |
| from typing import List, Dict | |
| from app.config import qdrant_client, embedding_model, demo_chatbot_configs | |
| from qdrant_client.models import VectorParams, Distance | |
| from langchain_core.documents import Document | |
| from langchain_qdrant import QdrantVectorStore | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from qdrant_client import QdrantClient | |
| from app.ingestion.models import ChatbotIngest | |
| import re | |
def scrape_website(
    start_url: str,
    timeout: int = 10,
    max_pages: int = 150,
) -> List[Dict[str, str]]:
    """
    Crawl and extract cleaned text from all pages and subdomains
    under the same registered domain as ``start_url``.

    Args:
        start_url: Entry URL (e.g. https://example.com)
        timeout: Per-request timeout in seconds
        max_pages: Hard cap on fetched pages to prevent crawl explosion

    Returns:
        List of dicts: [{"url": str, "text": str}]

    Raises:
        ValueError: If no readable content was found on any crawled page.
    """
    print(f"Scraping website starting at {start_url}")

    def registered_domain(url: str) -> str:
        # e.g. "blog.example.co.uk" -> "example.co.uk"
        ext = tldextract.extract(url)
        return f"{ext.domain}.{ext.suffix}"

    base_domain = registered_domain(start_url)
    visited = set()
    queue = deque([start_url])
    results: List[Dict[str, str]] = []

    while queue and len(visited) < max_pages:
        url = queue.popleft()
        if url in visited:
            continue
        visited.add(url)

        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException:
            # Unreachable / error pages are skipped, not fatal to the crawl.
            continue

        soup = BeautifulSoup(response.text, "lxml")

        # Remove non-content elements before extraction.
        for elem in soup(["script", "style", "noscript", "header", "footer", "aside", "nav", "iframe"]):
            elem.decompose()

        # Discover internal links (same registered domain, incl. subdomains)
        # BEFORE the content check, so that pages with no extractable text
        # (e.g. link-only hub pages) still feed the crawl frontier.
        for link in soup.find_all("a", href=True):
            next_url = urljoin(url, link["href"])
            parsed = urlparse(next_url)
            if parsed.scheme not in ("http", "https"):
                continue
            # Strip the fragment so page#a and page#b are crawled only once.
            next_url = parsed._replace(fragment="").geturl()
            if registered_domain(next_url) != base_domain:
                continue
            if next_url not in visited:
                queue.append(next_url)

        # Semantic-aware content extraction.
        text_blocks = []
        content_tags = soup.find_all(["article", "main", "section"])
        if content_tags:
            for tag in content_tags:
                for elem in tag.find_all(["p", "li", "span"]):
                    txt = elem.get_text(strip=True)
                    if len(txt) > 20:  # skip short/noisy text
                        text_blocks.append(txt)
        else:
            # Fallback: headings plus the paragraphs that follow them.
            for h in soup.find_all(["h1", "h2", "h3"]):
                section_text = [
                    p.get_text(strip=True)
                    for p in h.find_all_next("p")
                    if len(p.get_text(strip=True)) > 20
                ]
                if section_text:
                    text_blocks.append(h.get_text(strip=True) + "\n" + " ".join(section_text))

        if not text_blocks:
            continue

        text = "\n".join(text_blocks)
        # Remove repeated/noisy e-commerce boilerplate.
        text = re.sub(r"(Out of stock|Add to cart|Select Title Default Title)", "", text, flags=re.I)
        text = re.sub(r"\n\s*\n", "\n", text)
        results.append({
            "url": url,
            "text": text
        })

    if not results:
        raise ValueError(
            f"""Website scraping failed for {start_url}. No readable content found.\n\n
            Possible reasons:\n
            1) The URL is incorrect or unreachable.\n
            2) The site requires login or JavaScript to display content.\n
            3) The page contains only images/media without text.\n\n
            Please check the URL and try again.
            """)
    return results
def chunk_and_embed(chatbot_id: str, pages: List[Dict[str, str]]):
    """
    Convert scraped website pages into embedded chunks and store them in a
    chatbot-scoped Qdrant collection named ``chatbot_<chatbot_id>``.
    """
    if not pages:
        raise ValueError("No pages to chunk and embed")

    collection_name = f"chatbot_{chatbot_id}"

    # Create the chatbot's collection on first ingest.
    if not qdrant_client.collection_exists(collection_name):
        qdrant_client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=768, distance=Distance.COSINE),
        )

    # Pages -> LangChain Documents; pages without text are dropped.
    documents: List[Document] = []
    for page in pages:
        page_text = page.get("text")
        if not page_text:
            continue
        documents.append(
            Document(
                page_content=page_text,
                metadata={
                    "chatbot_id": chatbot_id,
                    "source": "website",
                    "url": page["url"],
                },
            )
        )
    if not documents:
        raise ValueError("No valid documents extracted from pages")

    # Chunk the documents.
    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
    chunks = splitter.split_documents(documents)

    # Embed and store.
    embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
    vector_store = QdrantVectorStore(
        client=qdrant_client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    point_ids = [str(uuid4()) for _ in chunks]
    vector_store.add_documents(chunks, ids=point_ids)
    print(f"Stored {len(chunks)} chunks in Qdrant collection {collection_name}")
def build_demo_prompt(ingest: ChatbotIngest) -> str:
    """
    Build the system-prompt template for a demo chatbot from its ingest config.

    Args:
        ingest: Ingest payload carrying the chatbot/company names, allowed
            purposes, banned topics and tone preferences.

    Returns:
        The filled-in prompt template string.
    """
    chatbot_name = ingest.chatbot_name
    company_name = ingest.company_name
    # Guard against a missing purpose list (consistent with build_welcome_message).
    allowed_topics = ", ".join(ingest.chatbot_purpose or []) or "general questions"
    # NOTE(review): sensitive_topics is interpolated as-is; if it is a list it
    # will render with brackets — confirm its type against the ingest model.
    banned_topics = ingest.sensitive_topics or "sensitive topics"
    response_style = ", ".join(ingest.tone_style) if ingest.tone_style else "clear and concise"
    fallback_message = (
        "Sorry, I cannot answer that question. Please call or email for "
        "further assistance. Information can be found on the website"
    )
    template = f"""
    You are {chatbot_name}, an assistant for {company_name}.
    Answer ONLY using the provided context from {company_name}'s approved content.
    STRICT RULES:
    1. If the Contextual Knowledge section is empty, say: "{fallback_message}"
    2. Do NOT use your own general knowledge. Only reference the Contextual Knowledge.
    3. Only reference topics explicitly allowed: {allowed_topics}.
    4. Do NOT discuss banned topics: {banned_topics}.
    5. Keep responses {response_style}.
    6. Keep the answers clear and concise in 1-3 sentences
    """
    return template
def build_welcome_message(ingest: ChatbotIngest) -> str:
    """
    Compose a flexible, user-friendly greeting for the chatbot from its
    ingest config.
    """
    # Fall back to "<Company> Assistant" when no explicit name was given.
    bot_name = ingest.chatbot_name or f"{ingest.company_name} Assistant"

    # Lead capture is an internal capability — don't advertise it to users.
    visible_purposes = [
        purpose
        for purpose in (ingest.chatbot_purpose or [])
        if purpose.lower() != "capture leads (email, phone)"
    ]

    parts = [f"Hello! 👋 I'm {bot_name}, your virtual assistant for {ingest.company_name}.\n"]
    if visible_purposes:
        bullet_list = "\n".join(f"- {p}" for p in visible_purposes)
        parts.append("I can help you with:\n" + bullet_list + "\n")
    parts.append("Just type your question below and I'll do my best to help!")
    return "".join(parts)
def store_demo_rag_config(chatbot_id, company_id, ingest: ChatbotIngest) -> None:
    """
    Persist the demo chatbot's RAG configuration document in MongoDB.
    """
    config_doc = {
        "submission_id": ingest.submission_id,
        "chatbot_id": chatbot_id,
        "company_id": company_id,
        "chatbot_name": ingest.chatbot_name,
        "company_name": ingest.company_name,
        "pricing_plan": ingest.pricing_plan,
        "prompt_template": build_demo_prompt(ingest),
        "welcome_message": build_welcome_message(ingest),
        # Single retriever over the chatbot's own Qdrant collection.
        "retrievers": [
            {
                "name": "all",
                "collection": f"chatbot_{chatbot_id}",
                "top_k": 25,
                "filter_score": 0.7,
            }
        ],
    }
    result = demo_chatbot_configs.insert_one(config_doc)
    print(f"Inserted RAG config for {ingest.company_name}, _id={result.inserted_id}")