Spaces:
Configuration error
Configuration error
| import asyncio | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| from embedding_pipeline import EmbeddingPipeline | |
async def create_test_documents():
    """Create a temporary directory populated with sample text documents.

    Five small UTF-8 text files (``sample1.txt`` .. ``sample5.txt``) are
    written into a fresh directory obtained from ``tempfile.mkdtemp()``.

    The function is a coroutine so callers can ``await`` it, but it contains
    no ``await`` itself: the file writes are ordinary blocking I/O.

    Returns:
        str: Path of the created directory. The caller owns the directory
        and is responsible for deleting it.

    Raises:
        OSError: If a file cannot be written. The partially populated
            directory is removed before the error propagates.
    """
    import shutil

    sample_docs = [
        ("sample1.txt", "This is a sample document about Physical AI. It discusses the integration of artificial intelligence with physical systems, robotics, and control theory. This field combines machine learning with real-world applications in robotics and automation."),
        ("sample2.txt", "ROS2 Basics: Robot Operating System 2 is a flexible framework for writing robot software. It is a collection of tools, libraries, and conventions that aim to simplify the task of creating complex and robust robot behavior across a wide variety of robot platforms."),
        ("sample3.txt", "Simulation in robotics is crucial for testing algorithms before deploying them on real hardware. Gazebo and Webots are popular simulation environments that allow developers to create realistic virtual worlds for their robots."),
        ("sample4.txt", "Isaac Platform by NVIDIA provides a comprehensive solution for developing, training, and deploying AI-based robotics applications. It includes tools for simulation, training, and deployment of robotics applications."),
        ("sample5.txt", "Humanoid robotics involves the design and control of robots with human-like characteristics. This includes bipedal locomotion, manipulation, and interaction with human environments. Key challenges include balance, gait planning, and human-robot interaction."),
    ]

    temp_dir = tempfile.mkdtemp()
    try:
        for filename, content in sample_docs:
            Path(temp_dir, filename).write_text(content, encoding="utf-8")
    except BaseException:
        # mkdtemp() never cleans up after itself; without this a failed
        # write would leave an orphaned directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    return temp_dir
async def test_pipeline():
    """Run an end-to-end smoke test of the embedding pipeline.

    Steps:
      1. Create sample documents in a temporary directory.
      2. Await ``EmbeddingPipeline.process_directory`` on that directory.
      3. Print the document count reported by the pipeline's vector store.
      4. Run a fixed list of queries through ``pipeline.search`` with
         ``top_k=2`` and print score, source and a 100-character content
         preview for each hit.

    Exceptions raised in steps 2-4 are printed together with their traceback
    instead of propagating. An exception while creating the sample
    documents (step 1) is not caught. The temporary directory is removed
    in every case once it has been created.

    Returns:
        bool: ``True`` if steps 2-4 completed without an exception,
        ``False`` if one was caught and reported.
    """
    import shutil
    import traceback

    print("Creating test documents...")
    test_dir = await create_test_documents()
    print(f"Test documents created in: {test_dir}")

    test_queries = [
        "Physical AI and robotics",
        "Robot Operating System",
        "Simulation environments for robotics",
        "Isaac Platform by NVIDIA",
        "Humanoid robotics and bipedal locomotion",
    ]

    succeeded = False
    try:
        print("\nInitializing embedding pipeline...")
        pipeline = EmbeddingPipeline()

        print("Processing documents...")
        processed_count = await pipeline.process_directory(test_dir)
        print(f"Successfully processed {processed_count} documents")

        # Check document count in the collection
        doc_count = pipeline.vector_store.get_all_documents_count()
        print(f"Total documents in vector store: {doc_count}")

        # Test search functionality
        print("\nTesting search functionality...")
        for query in test_queries:
            print(f"\nSearching for: '{query}'")
            results = pipeline.search(query, top_k=2)
            if not results:
                print(" No results found.")
                continue
            for rank, result in enumerate(results, start=1):
                print(f" Result {rank} (Score: {result['score']:.4f}):")
                print(f" Source: {result['source']}")
                print(f" Content preview: {result['content'][:100]}...")

        print("\nPipeline test completed successfully!")
        succeeded = True
    except Exception as e:
        # Top-level boundary of the smoke test: report the failure in full
        # and signal it through the return value.
        print(f"Error during pipeline test: {e!s}")
        traceback.print_exc()
    finally:
        # An error raised from inside ``finally`` would replace the test
        # outcome with an unrelated cleanup failure, so report it instead.
        try:
            shutil.rmtree(test_dir)
        except OSError as cleanup_error:
            print(f"\nWarning: could not remove test directory {test_dir}: {cleanup_error}")
        else:
            print(f"\nCleaned up test directory: {test_dir}")

    return succeeded
def test_connection():
    """Check that Qdrant and the embedding backend are both reachable.

    The Qdrant check builds a ``VectorStore`` and lists its collections; the
    embedding check builds an ``Embedder`` and requests one embedding for a
    short probe string. The embedding check only runs if the Qdrant check
    passed. Each outcome is printed with an ``[OK]`` / ``[ERROR]`` prefix.

    NOTE(review): the previous docstring named OpenAI as the embedding
    provider; that is not visible from this function - confirm in ``embedder``.

    Returns:
        bool: ``True`` if both checks pass, ``False`` as soon as one fails.
    """
    print("Testing connections...")
    # QDRANT_URL and COLLECTION_NAME are not used below; the import is kept
    # exactly as it was so a missing or broken ``config`` still fails here.
    from config import QDRANT_URL, COLLECTION_NAME
    from vector_store import VectorStore
    from embedder import Embedder

    def _qdrant_reachable():
        # Listing collections forces a round trip to the Qdrant server.
        try:
            store = VectorStore()
            listing = store.client.get_collections()
            collection_names = [entry.name for entry in listing.collections]
            print(f"[OK] Qdrant connection successful. Collections: {collection_names}")
        except Exception as exc:
            print(f"[ERROR] Qdrant connection failed: {exc!s}")
            return False
        return True

    def _embedder_reachable():
        # ``create_embedding`` is a coroutine, so drive it with its own
        # event loop from this synchronous function.
        try:
            client = Embedder()
            probe = asyncio.run(client.create_embedding("test connection"))
            print(f"[OK] Embedding API connection successful. Embedding dimension: {len(probe)}")
        except Exception as exc:
            print(f"[ERROR] Embedding API connection failed: {exc!s}")
            return False
        return True

    # ``and`` short-circuits: the embedder is not probed when Qdrant fails.
    return _qdrant_reachable() and _embedder_reachable()
if __name__ == "__main__":
    print("Starting comprehensive tests for the Physical AI embedding system...\n")

    # Test connections first and bail out early if either check fails.
    if not test_connection():
        print("\nConnection tests failed. Please check your configuration.")
        # The bare ``exit`` helper is injected by the ``site`` module for
        # interactive sessions and is missing under ``python -S`` or in
        # frozen/embedded interpreters; raising SystemExit always works.
        raise SystemExit(1)

    print()

    # Run the pipeline test
    asyncio.run(test_pipeline())