# Docker_Deploy/src/python/test_pipeline.py
# Shaheryar Shah
# Add backend files for RAG Chatbot Docker deployment
# bec06d9
import asyncio
import tempfile
import os
from pathlib import Path
from embedding_pipeline import EmbeddingPipeline
async def create_test_documents():
    """Write five small sample text files into a new temporary directory.

    The directory is created with ``tempfile.mkdtemp`` and is not removed
    here; the caller is responsible for deleting it.

    Returns:
        str: Path of the temporary directory holding ``sample1.txt`` through
        ``sample5.txt``.
    """
    # File name -> file body; written in this order.
    documents = {
        "sample1.txt": "This is a sample document about Physical AI. It discusses the integration of artificial intelligence with physical systems, robotics, and control theory. This field combines machine learning with real-world applications in robotics and automation.",
        "sample2.txt": "ROS2 Basics: Robot Operating System 2 is a flexible framework for writing robot software. It is a collection of tools, libraries, and conventions that aim to simplify the task of creating complex and robust robot behavior across a wide variety of robot platforms.",
        "sample3.txt": "Simulation in robotics is crucial for testing algorithms before deploying them on real hardware. Gazebo and Webots are popular simulation environments that allow developers to create realistic virtual worlds for their robots.",
        "sample4.txt": "Isaac Platform by NVIDIA provides a comprehensive solution for developing, training, and deploying AI-based robotics applications. It includes tools for simulation, training, and deployment of robotics applications.",
        "sample5.txt": "Humanoid robotics involves the design and control of robots with human-like characteristics. This includes bipedal locomotion, manipulation, and interaction with human environments. Key challenges include balance, gait planning, and human-robot interaction.",
    }

    temp_dir = tempfile.mkdtemp()
    target = Path(temp_dir)
    for name, body in documents.items():
        (target / name).write_text(body, encoding="utf-8")

    return temp_dir
async def test_pipeline():
    """Run the embedding pipeline end to end against throwaway documents.

    Creates the sample documents with ``create_test_documents``, feeds the
    directory to ``EmbeddingPipeline.process_directory``, prints the document
    count reported by the vector store, and runs a fixed list of searches,
    printing up to two hits per query.

    Any exception raised after the documents are created is printed together
    with its traceback instead of being propagated. The temporary directory
    is removed whether or not the run succeeded.
    """
    import shutil
    import traceback

    print("Creating test documents...")
    docs_dir = await create_test_documents()
    print(f"Test documents created in: {docs_dir}")

    # One query per sample document topic.
    queries = (
        "Physical AI and robotics",
        "Robot Operating System",
        "Simulation environments for robotics",
        "Isaac Platform by NVIDIA",
        "Humanoid robotics and bipedal locomotion",
    )

    try:
        print("\nInitializing embedding pipeline...")
        pipeline = EmbeddingPipeline()

        print("Processing documents...")
        n_processed = await pipeline.process_directory(docs_dir)
        print(f"Successfully processed {n_processed} documents")

        # Report how many documents the collection holds.
        n_stored = pipeline.vector_store.get_all_documents_count()
        print(f"Total documents in vector store: {n_stored}")

        print("\nTesting search functionality...")
        for query in queries:
            print(f"\nSearching for: '{query}'")
            hits = pipeline.search(query, top_k=2)
            if not hits:
                print(" No results found.")
                continue
            for rank, hit in enumerate(hits, start=1):
                print(f" Result {rank} (Score: {hit['score']:.4f}):")
                print(f" Source: {hit['source']}")
                print(f" Content preview: {hit['content'][:100]}...")

        print("\nPipeline test completed successfully!")
    except Exception as exc:
        # Report and continue to cleanup; the failure is not re-raised.
        print(f"Error during pipeline test: {exc!s}")
        traceback.print_exc()
    finally:
        # Always drop the temporary directory.
        shutil.rmtree(docs_dir)
        print(f"\nCleaned up test directory: {docs_dir}")
def test_connection():
    """Check that Qdrant and the embedding API both respond.

    First lists the collections through ``VectorStore().client``, then asks
    ``Embedder`` for one embedding. Each step prints an ``[OK]`` or
    ``[ERROR]`` line.

    Returns:
        bool: ``True`` when both checks succeed, ``False`` as soon as either
        one raises.
    """
    print("Testing connections...")

    # Project modules are imported inside the function, in this order.
    # QDRANT_URL and COLLECTION_NAME are not referenced below; the import is
    # kept unchanged.
    from config import QDRANT_URL, COLLECTION_NAME
    from vector_store import VectorStore
    from embedder import Embedder

    # Step 1: Qdrant.
    try:
        store = VectorStore()
        names = [entry.name for entry in store.client.get_collections().collections]
        print(f"[OK] Qdrant connection successful. Collections: {names}")
    except Exception as exc:
        print(f"[ERROR] Qdrant connection failed: {exc!s}")
        return False

    # Step 2: embedding creation.
    try:
        client = Embedder()
        vector = asyncio.run(client.create_embedding("test connection"))
        print(f"[OK] Embedding API connection successful. Embedding dimension: {len(vector)}")
    except Exception as exc:
        print(f"[ERROR] Embedding API connection failed: {exc!s}")
        return False

    return True
if __name__ == "__main__":
    # Script entry point: check connectivity, then run the pipeline test.
    print("Starting comprehensive tests for the Physical AI embedding system...\n")

    # Test connections first; the pipeline run is skipped when they fail.
    if not test_connection():
        print("\nConnection tests failed. Please check your configuration.")
        # Raise SystemExit directly instead of calling the bare ``exit``
        # helper: that name is injected by the ``site`` module for interactive
        # use and is not available when ``site`` is not loaded (``python -S``).
        raise SystemExit(1)

    print()
    # Run the pipeline test
    asyncio.run(test_pipeline())