| #!/usr/bin/env python3 | |
| """ | |
| Book content embedding script | |
| Reads markdown files from docs/ (including all nested subdirectories), chunks content by headings or word count, | |
| generates embeddings with OpenAI, and uploads to Qdrant vector database. | |
| Usage: | |
| python backend/scripts/embed_book_content.py --book-path docs/ --collection-name humanoid-robotics-book-v1 | |
| """ | |
| import argparse | |
| import asyncio | |
| import os | |
| import re | |
| import sys | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| from uuid import uuid4 | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from openai import AsyncOpenAI | |
| from qdrant_client import AsyncQdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
| from src.config.settings import settings | |
| from src.utils.logger import setup_logging, get_logger | |
| setup_logging(level="INFO") | |
| logger = get_logger(__name__) | |
class BookContentChunker:
    """Chunks markdown content intelligently by headings and word limits."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        Initialize chunker.

        Args:
            chunk_size: Target chunk size in words (must be positive).
            overlap: Word overlap between consecutive chunks (must be
                non-negative and strictly smaller than chunk_size).

        Raises:
            ValueError: If chunk_size is not positive or overlap is not in
                [0, chunk_size). An overlap >= chunk_size would make the
                sliding window in _chunk_section never advance (infinite loop).
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= overlap < chunk_size:
            raise ValueError(
                f"overlap must be in [0, chunk_size); got overlap={overlap}, "
                f"chunk_size={chunk_size}"
            )
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_markdown(self, content: str, file_path: str) -> List[Dict[str, Any]]:
        """
        Chunk markdown content by headings and word limits.

        Content before the first ##/### heading is attributed to an
        "Introduction" section.

        Args:
            content: Markdown file content.
            file_path: Path to markdown file (used only for chapter metadata).

        Returns:
            List of chunk dictionaries with content and metadata.
        """
        chunks: List[Dict[str, Any]] = []

        # Extract chapter/module name from file path
        chapter = self._extract_chapter_name(Path(file_path))

        # Split by headings (## and ###); the capture group keeps the heading
        # lines in the resulting list so they can be detected below.
        sections = re.split(r'(^#{2,3}\s+.+$)', content, flags=re.MULTILINE)

        current_section_heading = "Introduction"
        current_content: List[str] = []

        for section in sections:
            heading_match = re.match(r'^(#{2,3})\s+(.+)$', section.strip())
            if heading_match:
                # Flush the previous section before starting a new one.
                if current_content:
                    chunks.extend(self._chunk_section(
                        "\n".join(current_content),
                        chapter,
                        current_section_heading,
                    ))
                current_section_heading = heading_match.group(2).strip()
                current_content = []
            elif section.strip():
                # Accumulate non-empty body text for the current section.
                current_content.append(section.strip())

        # Flush the final section.
        if current_content:
            chunks.extend(self._chunk_section(
                "\n".join(current_content),
                chapter,
                current_section_heading,
            ))

        return chunks

    def _chunk_section(self, content: str, chapter: str, section: str) -> List[Dict[str, Any]]:
        """Chunk a single section by word count, with overlap between chunks."""
        words = content.split()
        chunks: List[Dict[str, Any]] = []

        if len(words) <= self.chunk_size:
            # Section fits in one chunk; keep the original text verbatim.
            chunks.append({
                "content": content,
                "chapter": chapter,
                "section": section,
                "heading": section,
                "chunk_index": 0,
                "word_count": len(words),
            })
        else:
            # Slide a chunk_size window forward, backing up by `overlap`
            # words each step so neighbouring chunks share context.
            chunk_index = 0
            start = 0
            while start < len(words):
                end = start + self.chunk_size
                chunk_words = words[start:end]
                chunks.append({
                    "content": " ".join(chunk_words),
                    "chapter": chapter,
                    "section": section,
                    "heading": section,
                    "chunk_index": chunk_index,
                    "word_count": len(chunk_words),
                })
                chunk_index += 1
                start = end - self.overlap  # overlap for context

        return chunks

    def _extract_chapter_name(self, path: Path) -> str:
        """Extract a chapter/module name from the file path.

        Looks for a directory or file component like "module1-ros2" or
        "Module 1"; falls back to the title-cased filename stem.
        """
        for part in reversed(path.parts):
            if re.match(r'module[-\s]*\d+', part, re.IGNORECASE):
                return part.replace('-', ' ').title()
        return path.stem.replace('-', ' ').replace('_', ' ').title()
class BookEmbedder:
    """Generates OpenAI embeddings for chunks and uploads them to Qdrant."""

    def __init__(self, collection_name: str = "book_content"):
        """
        Initialize embedder.

        Args:
            collection_name: Qdrant collection name.
        """
        self.collection_name = collection_name
        self.openai_client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.qdrant_client = AsyncQdrantClient(
            url=settings.qdrant_url,
            api_key=settings.qdrant_api_key,
            timeout=30,  # higher timeout (seconds) for slow/remote servers
        )

    async def create_collection(self):
        """Create the Qdrant collection if it doesn't already exist.

        Exits the process with status 1 (after logging an actionable message)
        when the Qdrant server is unreachable.
        """
        try:
            collections = await self.qdrant_client.get_collections()
        except Exception as e:
            logger.error(
                "\nCannot connect to Qdrant. "
                f"Error: {type(e).__name__}: {e}\n"
                "-> Please make sure your Qdrant server is running and accessible at the configured URL.\n"
                f"-> Current Qdrant URL: {settings.qdrant_url}"
            )
            logger.error("Exiting due to Qdrant connection failure.")
            # sys is already imported at module level; no local re-import needed.
            sys.exit(1)

        collection_names = [col.name for col in collections.collections]

        if self.collection_name not in collection_names:
            await self.qdrant_client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=settings.vector_size,
                    distance=Distance.COSINE,
                ),
            )
            logger.info(f"Created collection: {self.collection_name}")
        else:
            logger.info(f"Collection already exists: {self.collection_name}")

    async def embed_text(self, text: str) -> List[float]:
        """
        Generate an embedding for text using OpenAI.

        Args:
            text: Text to embed.

        Returns:
            Embedding vector.
        """
        response = await self.openai_client.embeddings.create(
            model=settings.openai_embedding_model,
            input=text
        )
        return response.data[0].embedding

    async def upload_chunks(self, chunks: List[Dict[str, Any]], doc_version: str = "v1.0.0"):
        """
        Upload chunks with embeddings to Qdrant in batches of 100.

        Embeddings are generated sequentially (one OpenAI call per chunk).

        Args:
            chunks: List of chunk dictionaries (as produced by the chunker).
            doc_version: Document version identifier stored in each payload.
        """
        logger.info(f"Uploading {len(chunks)} chunks to Qdrant...")

        points = []
        for i, chunk in enumerate(chunks):
            embedding = await self.embed_text(chunk["content"])

            points.append(PointStruct(
                id=str(uuid4()),
                vector=embedding,
                payload={
                    "content": chunk["content"],
                    "chapter": chunk["chapter"],
                    "section": chunk["section"],
                    "heading": chunk["heading"],
                    "chunk_index": chunk["chunk_index"],
                    "word_count": chunk["word_count"],
                    "doc_version": doc_version,
                },
            ))

            # Upload in batches of 100 to bound memory and request size.
            if len(points) >= 100:
                await self.qdrant_client.upsert(
                    collection_name=self.collection_name,
                    points=points
                )
                logger.info(f"Uploaded batch {i // 100 + 1} ({len(points)} points)")
                points = []

        # Upload any remaining points.
        if points:
            await self.qdrant_client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            logger.info(f"Uploaded final batch ({len(points)} points)")

    async def close(self):
        """Close the Qdrant client connection."""
        await self.qdrant_client.close()
def get_all_markdown_files_recursively(root_path: Path) -> List[Path]:
    """
    Find all markdown files (*.md and *.mdx) recursively under root_path.

    Anything under a 'node_modules' directory is skipped. The check is done
    on exact path components (Path.parts), not on a substring of the whole
    path, so a file merely *named* something containing "node_modules" is
    not excluded by accident.

    Args:
        root_path: Path to the root directory.

    Returns:
        List[Path]: Sorted list of all markdown file paths (sorted for
        deterministic processing order).
    """
    candidates = list(root_path.rglob("*.md")) + list(root_path.rglob("*.mdx"))
    return sorted(
        f for f in candidates
        if f.is_file() and "node_modules" not in f.parts
    )
async def main():
    """Parse CLI arguments, chunk all markdown files, embed, and upload.

    Exits with status 1 if the given --book-path is not an existing
    directory (previously this silently processed zero files).
    """
    parser = argparse.ArgumentParser(description="Embed book content into Qdrant")
    parser.add_argument(
        "--book-path",
        type=str,
        required=True,
        help="Path to book content directory (e.g., docs/)"
    )
    parser.add_argument(
        "--collection-name",
        type=str,
        default="humanoid-robotics-book-v1",
        help="Qdrant collection name"
    )
    parser.add_argument(
        "--doc-version",
        type=str,
        default="v1.0.0",
        help="Document version identifier"
    )
    args = parser.parse_args()

    # Validate input path early so misconfiguration fails loudly.
    book_path = Path(args.book_path)
    if not book_path.is_dir():
        logger.error(f"Book path does not exist or is not a directory: {book_path}")
        sys.exit(1)

    # Initialize components
    chunker = BookContentChunker(chunk_size=500, overlap=50)
    embedder = BookEmbedder(collection_name=args.collection_name)

    try:
        # Create collection (exits if Qdrant is unreachable).
        await embedder.create_collection()

        # Find all markdown files as deep as needed.
        md_files = get_all_markdown_files_recursively(book_path)
        logger.info(f"Found {len(md_files)} markdown files (.md and .mdx) recursively in all subdirectories")

        # Chunk each file and collect everything before uploading.
        all_chunks = []
        for md_file in md_files:
            logger.info(f"Processing: {md_file}")
            content = md_file.read_text(encoding="utf-8")
            chunks = chunker.chunk_markdown(content, str(md_file))
            all_chunks.extend(chunks)
            logger.info(f"  -> Generated {len(chunks)} chunks")

        logger.info(f"Total chunks: {len(all_chunks)}")

        # Upload to Qdrant
        await embedder.upload_chunks(all_chunks, doc_version=args.doc_version)
        logger.info("✅ Embedding complete!")
    finally:
        # Always release the Qdrant connection, even on failure.
        await embedder.close()
| if __name__ == "__main__": | |
| # Run main in asyncio loop, but trap connection errors globally as a last resort | |
| try: | |
| asyncio.run(main()) | |
| except Exception as e: | |
| logger.error(f"FATAL: Exception occurred: {type(e).__name__}: {e}") | |
| logger.error("Please check if Qdrant is running, accessible, and credentials are set correctly.") | |
| import sys | |
| sys.exit(1) | |