# humanoid-robotics-rag / scripts / embed_book_content.py
# Uploaded by GrowWithTalha ("Upload 62 files", commit a83c934, verified)
#!/usr/bin/env python3
"""
Book content embedding script
Reads markdown files from docs/ (including all nested subdirectories), chunks content by headings or word count,
generates embeddings with OpenAI, and uploads to Qdrant vector database.
Usage:
python backend/scripts/embed_book_content.py --book-path docs/ --collection-name humanoid-robotics-book-v1
"""
import argparse
import asyncio
import os
import re
import sys
from pathlib import Path
from typing import List, Dict, Any
from uuid import uuid4
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from src.config.settings import settings
from src.utils.logger import setup_logging, get_logger
setup_logging(level="INFO")
logger = get_logger(__name__)
class BookContentChunker:
    """Chunks markdown content intelligently by headings and word limits"""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        Initialize chunker

        Args:
            chunk_size: Target chunk size in words (must be positive)
            overlap: Word overlap between chunks (must satisfy 0 <= overlap < chunk_size)

        Raises:
            ValueError: If chunk_size is not positive, or overlap is not in
                [0, chunk_size). An overlap >= chunk_size would make the
                sliding window in _chunk_section never advance, causing an
                infinite loop.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= overlap < chunk_size:
            raise ValueError(
                f"overlap must be in [0, chunk_size); "
                f"got overlap={overlap}, chunk_size={chunk_size}"
            )
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_markdown(self, content: str, file_path: str) -> List[Dict[str, Any]]:
        """
        Chunk markdown content by headings and word limits

        Args:
            content: Markdown file content
            file_path: Path to markdown file (for metadata)

        Returns:
            List of chunk dictionaries with content and metadata
        """
        chunks: List[Dict[str, Any]] = []

        # Extract chapter/module name from file path
        chapter = self._extract_chapter_name(Path(file_path))

        # Split by headings (## and ###). The capturing group keeps the
        # heading lines themselves as separate elements of the result list.
        sections = re.split(r'(^#{2,3}\s+.+$)', content, flags=re.MULTILINE)

        current_section_heading = "Introduction"
        current_content: List[str] = []

        for section in sections:
            # Check if this element is a heading line
            heading_match = re.match(r'^(#{2,3})\s+(.+)$', section.strip())
            if heading_match:
                # Flush the previous section before starting a new one
                if current_content:
                    chunks.extend(self._chunk_section(
                        "\n".join(current_content),
                        chapter,
                        current_section_heading,
                    ))
                current_section_heading = heading_match.group(2).strip()
                current_content = []
            elif section.strip():
                # Accumulate non-empty content under the current heading
                current_content.append(section.strip())

        # Flush the final section
        if current_content:
            chunks.extend(self._chunk_section(
                "\n".join(current_content),
                chapter,
                current_section_heading,
            ))
        return chunks

    def _chunk_section(self, content: str, chapter: str, section: str) -> List[Dict[str, Any]]:
        """Chunk a section by word count, with word overlap between adjacent chunks."""
        words = content.split()
        chunks: List[Dict[str, Any]] = []

        if len(words) <= self.chunk_size:
            # Section fits in one chunk
            chunks.append({
                "content": content,
                "chapter": chapter,
                "section": section,
                "heading": section,
                "chunk_index": 0,
                "word_count": len(words),
            })
        else:
            # Split into multiple chunks with overlap
            chunk_index = 0
            start = 0
            while start < len(words):
                end = start + self.chunk_size
                chunk_words = words[start:end]
                chunks.append({
                    "content": " ".join(chunk_words),
                    "chapter": chapter,
                    "section": section,
                    "heading": section,
                    "chunk_index": chunk_index,
                    "word_count": len(chunk_words),
                })
                chunk_index += 1
                # Always advances, because __init__ enforces overlap < chunk_size.
                start = end - self.overlap  # Overlap for context
        return chunks

    def _extract_chapter_name(self, path: Path) -> str:
        """Extract chapter/module name from file path"""
        # Look for directory/file components like "module1-ros2", "Module 1", etc.
        for part in reversed(path.parts):
            if re.match(r'module[-\s]*\d+', part, re.IGNORECASE):
                return part.replace('-', ' ').title()
        # Fallback to filename without extension
        return path.stem.replace('-', ' ').replace('_', ' ').title()
class BookEmbedder:
    """Handles embedding generation and Qdrant upload"""

    def __init__(self, collection_name: str = "book_content"):
        """
        Initialize embedder with async OpenAI and Qdrant clients.

        Args:
            collection_name: Qdrant collection name
        """
        self.collection_name = collection_name
        self.openai_client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.qdrant_client = AsyncQdrantClient(
            url=settings.qdrant_url,
            api_key=settings.qdrant_api_key,
            timeout=30,  # Set a higher timeout (seconds)
        )

    async def create_collection(self):
        """Create Qdrant collection if it doesn't exist, with improved connection error handling.

        Exits the process with status 1 if Qdrant is unreachable, since the
        rest of the script cannot proceed without it.
        """
        try:
            collections = await self.qdrant_client.get_collections()
        except Exception as e:
            logger.error(
                "\nCannot connect to Qdrant. "
                f"Error: {type(e).__name__}: {e}\n"
                "-> Please make sure your Qdrant server is running and accessible at the configured URL.\n"
                f"-> Current Qdrant URL: {settings.qdrant_url}"
            )
            logger.error("Exiting due to Qdrant connection failure.")
            # sys is already imported at module level; no local import needed.
            sys.exit(1)

        collection_names = [col.name for col in collections.collections]
        if self.collection_name not in collection_names:
            await self.qdrant_client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=settings.vector_size,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(f"Created collection: {self.collection_name}")
        else:
            logger.info(f"Collection already exists: {self.collection_name}")

    async def embed_text(self, text: str) -> List[float]:
        """
        Generate embedding for text using OpenAI

        Args:
            text: Text to embed

        Returns:
            Embedding vector
        """
        response = await self.openai_client.embeddings.create(
            model=settings.openai_embedding_model,
            input=text
        )
        return response.data[0].embedding

    async def upload_chunks(self, chunks: List[Dict[str, Any]], doc_version: str = "v1.0.0"):
        """
        Upload chunks with embeddings to Qdrant

        Args:
            chunks: List of chunk dictionaries
            doc_version: Document version identifier
        """
        logger.info(f"Uploading {len(chunks)} chunks to Qdrant...")
        points = []
        for i, chunk in enumerate(chunks):
            # Generate embedding (one API call per chunk, sequentially)
            embedding = await self.embed_text(chunk["content"])

            # Create point with a random UUID id and full chunk metadata payload
            point = PointStruct(
                id=str(uuid4()),
                vector=embedding,
                payload={
                    "content": chunk["content"],
                    "chapter": chunk["chapter"],
                    "section": chunk["section"],
                    "heading": chunk["heading"],
                    "chunk_index": chunk["chunk_index"],
                    "word_count": chunk["word_count"],
                    "doc_version": doc_version,
                }
            )
            points.append(point)

            # Upload in batches of 100 to keep request sizes bounded
            if len(points) >= 100:
                await self.qdrant_client.upsert(
                    collection_name=self.collection_name,
                    points=points
                )
                logger.info(f"Uploaded batch {i // 100 + 1} ({len(points)} points)")
                points = []

        # Upload remaining points
        if points:
            await self.qdrant_client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            logger.info(f"Uploaded final batch ({len(points)} points)")

    async def close(self):
        """Close connections"""
        await self.qdrant_client.close()
def get_all_markdown_files_recursively(root_path: Path) -> List[Path]:
    """
    Find all markdown files recursively (as deep as needed) in the given root_path.

    This function will walk all subdirectories and return both *.md and *.mdx files,
    skipping anything inside a 'node_modules' directory at any depth.

    Args:
        root_path: Path to the root directory

    Returns:
        List[Path]: List of all markdown file Paths
    """
    md_files = list(root_path.rglob("*.md"))
    mdx_files = list(root_path.rglob("*.mdx"))
    all_files = md_files + mdx_files
    # Match 'node_modules' as a whole path component rather than a substring
    # of the full path, so files whose names merely contain "node_modules"
    # (e.g. notes_node_modules.md) are not wrongly excluded.
    return [
        file for file in all_files
        if file.is_file() and "node_modules" not in file.parts
    ]
async def main():
    """Entry point: parse CLI args, chunk all markdown files, embed and upload them."""
    parser = argparse.ArgumentParser(description="Embed book content into Qdrant")
    parser.add_argument(
        "--book-path",
        type=str,
        required=True,
        help="Path to book content directory (e.g., docs/)"
    )
    parser.add_argument(
        "--collection-name",
        type=str,
        default="humanoid-robotics-book-v1",
        help="Qdrant collection name"
    )
    parser.add_argument(
        "--doc-version",
        type=str,
        default="v1.0.0",
        help="Document version identifier"
    )
    args = parser.parse_args()

    # Wire up the chunker (fixed chunking parameters) and the embedder
    chunker = BookContentChunker(chunk_size=500, overlap=50)
    embedder = BookEmbedder(collection_name=args.collection_name)
    try:
        # Ensure the target collection exists before any upload
        await embedder.create_collection()

        # Discover every markdown file under the book path, however deep
        markdown_files = get_all_markdown_files_recursively(Path(args.book_path))
        logger.info(f"Found {len(markdown_files)} markdown files (.md and .mdx) recursively in all subdirectories")

        # Chunk every file, accumulating all chunks before a single upload pass
        all_chunks: List[Dict[str, Any]] = []
        for md_path in markdown_files:
            logger.info(f"Processing: {md_path}")
            text = md_path.read_text(encoding='utf-8')
            file_chunks = chunker.chunk_markdown(text, str(md_path))
            all_chunks.extend(file_chunks)
            logger.info(f" -> Generated {len(file_chunks)} chunks")

        logger.info(f"Total chunks: {len(all_chunks)}")

        # Embed and upload everything to Qdrant
        await embedder.upload_chunks(all_chunks, doc_version=args.doc_version)
        logger.info("✅ Embedding complete!")
    finally:
        # Always release client connections, even on failure
        await embedder.close()
if __name__ == "__main__":
    # Run main in asyncio loop, but trap connection errors globally as a last resort
    try:
        asyncio.run(main())
    except Exception as e:
        logger.error(f"FATAL: Exception occurred: {type(e).__name__}: {e}")
        logger.error("Please check if Qdrant is running, accessible, and credentials are set correctly.")
        # sys is already imported at module level; the local re-import was redundant.
        sys.exit(1)