# backend-deploy / main.py
# Uploaded by sanilahmed2019
# Commit: "Initial FastAPI deploy" (d4f1687)
"""
Book Content Ingestion System
Extracts content from Docusaurus-based book websites, chunks and embeds it using Cohere,
and stores embeddings in Qdrant Cloud for RAG applications.
"""
import os
import re
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import cohere
from qdrant_client import QdrantClient
from qdrant_client.http import models
from typing import List, Dict, Any
import time
import logging
# Set up logging: configure the root logger at INFO level once, at import time,
# and create the module-level `logger` that every function below writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_all_urls(base_url: str, max_urls: int = 100, timeout: float = 10) -> List[str]:
    """
    Collect same-domain URLs from a Docusaurus-based book website.

    Performs a breadth-first crawl starting at ``base_url``. Every page that is
    visited is fetched and its ``<a href>`` links are followed when they point
    at the same network location as ``base_url``. URL fragments (``#section``)
    are stripped so in-page anchors do not produce duplicate entries.

    Args:
        base_url: The base URL of the Docusaurus book
        max_urls: Hard upper bound on the number of URLs returned
        timeout: Per-request timeout in seconds, so a stalled server cannot
            hang the crawl

    Returns:
        List of unique URLs in the order they were discovered (the base URL
        first). Pages that fail to download are still listed, but contribute
        no further links.
    """
    from collections import deque

    base_netloc = urlparse(base_url).netloc

    def strip_fragment(url: str) -> str:
        # Everything after the first '#' is a client-side anchor, not a new page.
        return url.partition('#')[0]

    def extract_urls_from_page(url: str) -> List[str]:
        """Return the same-domain links found on ``url`` (empty on any failure)."""
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            found_urls = []
            for link in soup.find_all('a', href=True):
                full_url = strip_fragment(urljoin(url, link['href']))
                # Only include URLs from the same domain
                if urlparse(full_url).netloc == base_netloc:
                    found_urls.append(full_url)
            return found_urls
        except Exception as e:
            # Best effort: one broken page must not abort the whole crawl.
            logger.error(f"Error extracting URLs from {url}: {e}")
            return []

    start_url = strip_fragment(base_url)
    to_visit = deque([start_url])
    seen = {start_url}  # URLs already queued or visited
    collected: List[str] = []

    while to_visit and len(collected) < max_urls:
        current_url = to_visit.popleft()
        collected.append(current_url)
        if len(collected) >= max_urls:
            # Limit reached: no point downloading a page whose links we would drop.
            break
        for url in extract_urls_from_page(current_url):
            if url not in seen:
                seen.add(url)
                to_visit.append(url)

    return collected
def extract_text_from_url(url: str) -> Dict[str, Any]:
    """
    Extract and clean textual content from a single URL, optimized for Docusaurus sites.

    The page is downloaded, navigation/footer/script-like elements are removed,
    and the first element matching a list of known content selectors is used as
    the content area. Its text is whitespace-normalized into a single line.

    Args:
        url: The URL to extract content from

    Returns:
        Dictionary with the keys 'url', 'title', 'text' and 'length'.
        'text' is '' and 'length' is 0 when the download fails, no content
        area is found, or fewer than 200 characters of text were extracted.
        'title' is the page's <title> text, falling back to the last
        non-empty URL path segment, then to 'Untitled' (or 'Error' on failure).
    """
    # Tried in order; the first selector that matches wins.
    content_selectors = (
        'main',                  # Most common for Docusaurus
        '.main-wrapper',         # Common Docusaurus wrapper
        '.container',            # General container
        '.doc-content',          # Docusaurus documentation content
        '.theme-doc-markdown',   # Docusaurus theme content
        '.markdown',             # Markdown content
        'article',               # Article tag as fallback
        # Looser fallbacks, only reached when none of the above match
        '[class*="docItem"]',
        '[class*="doc-content"]',
        '[class*="content"]',
        '[role="main"]',
        '.content',
    )

    def url_slug(default: str) -> str:
        # Use the last NON-empty path segment so ".../intro/" yields "intro"
        # instead of always falling through to the default.
        segments = [part for part in urlparse(url).path.split('/') if part]
        return segments[-1] if segments else default

    def empty_result(title: str) -> Dict[str, Any]:
        return {'url': url, 'title': title, 'text': '', 'length': 0}

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove unwanted elements
        for element in soup(['nav', 'footer', 'aside', 'header', 'script', 'style']):
            element.decompose()

        content_element = None
        for selector in content_selectors:
            content_element = soup.select_one(selector)
            if content_element is not None:
                break

        if content_element is None:
            logger.warning(f"No content area found at {url}")
            return empty_result(url_slug('Untitled'))

        # Collapse every run of whitespace (newlines, tabs, repeated spaces)
        # into a single space.
        text = ' '.join(content_element.get_text(separator=' ').split())

        # Get page title; an absent OR empty <title> falls back to the URL.
        title_tag = soup.find('title')
        title = title_tag.get_text().strip() if title_tag else ''
        if not title:
            title = url_slug('Untitled')

        # Check if the extracted text is meaningful
        if len(text) < 200:
            logger.warning(f"Insufficient content extracted from {url} (only {len(text)} characters)")
            return empty_result(title)

        return {
            'url': url,
            'title': title,
            'text': text,
            'length': len(text)
        }
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error extracting text from {url}: {e}")
        return empty_result(url_slug('Error'))
    except Exception as e:
        logger.error(f"Error extracting text from {url}: {e}")
        return empty_result(url_slug('Error'))
def _last_sentence_end(text: str, start: int, end: int) -> int:
    """Return the index just past the last sentence-ending '.' in text[start:end], or -1.

    A period only counts as a sentence end when it is followed by whitespace
    (or is the final character of ``text``), so periods inside tokens such as
    "3.14" or "file.py" are never used as split points. A period located
    exactly at ``start`` is ignored.
    """
    pos = text.rfind('.', start, end)
    while pos > start:
        if pos + 1 >= len(text) or text[pos + 1].isspace():
            return pos + 1
        pos = text.rfind('.', start, pos)
    return -1


def chunk_text(text: str, chunk_size: int = 1000) -> List[str]:
    """
    Chunk text into segments of at most ``chunk_size`` characters for embedding.

    Within each window of ``chunk_size`` characters the split point is chosen
    by preference: the last sentence end, then the last blank line ("\\n\\n"),
    then the last space, and finally a hard cut at the window limit.

    Args:
        text: The text to chunk
        chunk_size: The maximum size of each chunk, in characters (not tokens)

    Returns:
        List of non-empty, whitespace-stripped text chunks ([] for empty text)

    Raises:
        ValueError: If ``chunk_size`` is not positive (a non-positive size
            could never make progress through the text)
    """
    if not text:
        return []
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    pieces = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = start + chunk_size
        if end >= text_length:
            # Last chunk: take whatever remains.
            pieces.append(text[start:])
            break

        cut = _last_sentence_end(text, start, end)
        if cut != -1:
            # Keep the period with its sentence; resume right after it.
            resume = cut
        else:
            para_break = text.rfind('\n\n', start, end)
            word_break = text.rfind(' ', start, end)
            if para_break > start:
                cut, resume = para_break, para_break + 2  # Skip the \n\n
            elif word_break > start:
                cut, resume = word_break, word_break + 1  # Skip the space
            else:
                # No usable boundary in the window: hard cut at the limit.
                cut = resume = end

        pieces.append(text[start:cut])
        start = resume

    # Strip surrounding whitespace and drop chunks that end up empty.
    stripped = (piece.strip() for piece in pieces)
    return [piece for piece in stripped if piece]
def embed(texts: List[str]) -> List[List[float]]:
    """
    Generate embeddings using Cohere with retry logic.

    Texts are sent in batches of at most 96 (Cohere's documented per-request
    limit for the embed endpoint). Each batch is attempted up to 3 times with
    exponential backoff plus random jitter between attempts.

    Args:
        texts: List of text chunks to embed

    Returns:
        List of embeddings (each embedding is a list of floats), in the same
        order as ``texts``. An empty input yields an empty list without
        calling the API.

    Raises:
        ValueError: If the COHERE_API_KEY environment variable is not set
        Exception: Whatever the Cohere client raised on the final failed attempt
    """
    import random

    cohere_api_key = os.getenv('COHERE_API_KEY')
    if not cohere_api_key:
        raise ValueError("COHERE_API_KEY environment variable not set")
    if not texts:
        # Nothing to embed; avoid a pointless (and failing) API round trip.
        return []

    co = cohere.Client(cohere_api_key)
    max_attempts = 3
    batch_size = 96
    embeddings: List[List[float]] = []

    for offset in range(0, len(texts), batch_size):
        batch = texts[offset:offset + batch_size]
        for attempt in range(max_attempts):
            try:
                response = co.embed(
                    texts=batch,
                    model='embed-english-v3.0',   # Using a valid Cohere embedding model
                    input_type='search_document'  # Specify the input type
                )
                embeddings.extend(response.embeddings)
                break
            except Exception as e:
                if attempt == max_attempts - 1:  # Last attempt
                    logger.error(f"Error generating embeddings after {max_attempts} attempts: {e}")
                    raise
                wait_time = (2 ** attempt) + random.uniform(0, 1)  # Exponential backoff
                logger.warning(f"Embedding attempt {attempt + 1} failed: {e}. Retrying in {wait_time:.2f}s...")
                time.sleep(wait_time)

    return embeddings
def create_collection(client: QdrantClient, collection_name: str = "rag_embedding"):
    """
    Ensure a Qdrant collection for the embeddings exists.

    Lists the collections known to ``client``; when ``collection_name`` is
    already among them nothing is created. Otherwise a collection with
    1024-dimensional vectors and cosine distance is created.

    Args:
        client: Qdrant client instance
        collection_name: Name of the collection to create

    Raises:
        Exception: Any error raised by the Qdrant client is logged and re-raised
    """
    try:
        listing = client.get_collections()
        already_present = any(entry.name == collection_name for entry in listing.collections)

        if not already_present:
            # 1024 is the vector size this pipeline assumes for its Cohere embeddings
            vector_params = models.VectorParams(
                size=1024,
                distance=models.Distance.COSINE
            )
            client.create_collection(
                collection_name=collection_name,
                vectors_config=vector_params
            )
            logger.info(f"Collection '{collection_name}' created successfully")
        else:
            logger.info(f"Collection '{collection_name}' already exists")
    except Exception as exc:
        logger.error(f"Error creating collection: {exc}")
        raise
def save_chunk_to_qdrant(client: QdrantClient, collection_name: str, chunk: str,
                         embedding: List[float], metadata: Dict[str, Any]):
    """
    Save a text chunk with its embedding to Qdrant.

    The point ID is the MD5 hex digest of the source URL plus the FULL chunk
    text, so two different chunks from the same page never share an ID (hashing
    only a prefix of the chunk let chunks with a common opening overwrite each
    other). Re-saving an identical chunk from the same URL maps to the same ID
    and therefore overwrites the existing point. NOTE: IDs differ from those
    produced by a prefix-based scheme, so a collection populated that way
    should be recreated before re-ingesting.

    Args:
        client: Qdrant client instance
        collection_name: Name of the collection to save to
        chunk: The text chunk
        embedding: The embedding vector
        metadata: Additional metadata to store with the chunk. Recognized keys:
            'url', 'title', 'source', and optionally 'chunk_index' and
            'total_chunks' (stored in the payload when present)

    Raises:
        Exception: Any error raised by the Qdrant client is logged and re-raised
    """
    import hashlib

    source_url = metadata.get('url', '')
    # 32-hex-character digest; Qdrant is expected to accept it as a UUID-formatted ID.
    chunk_id = hashlib.md5(f"{source_url}_{chunk}".encode()).hexdigest()

    payload = {
        "text": chunk,
        "url": source_url,
        "title": metadata.get('title', ''),
        "source": metadata.get('source', 'unknown'),
        "timestamp": time.time()
    }
    # Preserve the chunk's position within its page when the caller supplies it.
    for key in ('chunk_index', 'total_chunks'):
        if key in metadata:
            payload[key] = metadata[key]

    try:
        # Upsert the point to Qdrant
        client.upsert(
            collection_name=collection_name,
            points=[
                models.PointStruct(
                    id=chunk_id,
                    vector=embedding,
                    payload=payload
                )
            ]
        )
        logger.info(f"Saved chunk to Qdrant: {chunk_id[:8]}...")
    except Exception as e:
        logger.error(f"Error saving chunk to Qdrant: {e}")
        raise
def main():
    """
    Run the complete book content ingestion pipeline.

    Loads configuration from the environment (optionally via a .env file),
    crawls the target book for URLs, makes sure the Qdrant collection exists,
    then extracts, chunks, embeds and stores the text of every page. A failure
    on a single chunk is logged and skipped; any other failure is logged and
    re-raised.

    Raises:
        ValueError: If QDRANT_URL, QDRANT_API_KEY or COHERE_API_KEY is missing
    """
    # Load environment variables from .env file
    from dotenv import load_dotenv
    load_dotenv()

    qdrant_url = os.getenv('QDRANT_URL')
    qdrant_api_key = os.getenv('QDRANT_API_KEY')
    cohere_api_key = os.getenv('COHERE_API_KEY')
    if not (qdrant_url and qdrant_api_key and cohere_api_key):
        raise ValueError("Missing required environment variables: QDRANT_URL, QDRANT_API_KEY, COHERE_API_KEY")

    # Target book URL and destination collection
    book_url = "https://sanilahmed.github.io/hackathon-ai-book/"
    collection_name = "rag_embedding"
    logger.info(f"Starting ingestion pipeline for: {book_url}")

    try:
        qdrant_client = QdrantClient(
            url=qdrant_url,
            api_key=qdrant_api_key,
            timeout=30  # 30 second timeout for requests
        )

        # Step 1: discover the pages of the book
        logger.info("Step 1: Collecting all URLs from the book...")
        urls = get_all_urls(book_url)
        total_urls = len(urls)
        logger.info(f"Found {total_urls} URLs")
        if total_urls == 0:
            logger.warning("No URLs found, exiting.")
            return

        # Step 2: make sure the destination collection exists
        logger.info("Step 2: Creating Qdrant collection...")
        create_collection(qdrant_client, collection_name)

        # Step 3: extract, chunk, embed and store each page
        logger.info("Step 3: Processing URLs...")
        processed_count = 0
        for position, url in enumerate(urls, start=1):
            logger.info(f"Processing URL {position}/{total_urls}: {url}")

            page = extract_text_from_url(url)
            if not page['text']:
                logger.warning(f"No content extracted from {url}, skipping...")
                continue

            # chunk_size is passed straight to chunk_text
            chunks = chunk_text(page['text'], chunk_size=1000)
            if not chunks:
                logger.warning(f"No chunks created from {url}, skipping...")
                continue

            chunk_total = len(chunks)
            for j, chunk in enumerate(chunks):
                try:
                    # One text in, one embedding out
                    embedding = embed([chunk])[0]
                    metadata = {
                        'url': url,
                        'title': page['title'],
                        'source': 'book_ingestion',
                        'chunk_index': j,
                        'total_chunks': chunk_total
                    }
                    save_chunk_to_qdrant(
                        qdrant_client,
                        collection_name,
                        chunk,
                        embedding,
                        metadata
                    )
                    processed_count += 1
                    # Add a small delay to respect rate limits
                    time.sleep(0.1)
                except Exception as exc:
                    # Best effort: a failed chunk must not stop the remaining ones
                    logger.error(f"Error processing chunk {j} from {url}: {exc}")

        logger.info(f"Ingestion pipeline completed! Processed {processed_count} chunks from {total_urls} URLs.")
    except Exception as exc:
        logger.error(f"Error in main pipeline: {exc}")
        raise
# Run the ingestion pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()