File size: 5,265 Bytes
bec06d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import tempfile
import os
from pathlib import Path
from embedding_pipeline import EmbeddingPipeline

async def create_test_documents():
    """Write five sample text files into a fresh temporary directory.

    Each file holds a short English paragraph on a robotics-related topic
    and is written as UTF-8.

    Returns:
        The path of the newly created directory, as a string. The directory
        is not removed here; the caller is responsible for deleting it.
    """
    # File name -> file body. Insertion order is the order files are written.
    corpus = {
        "sample1.txt": "This is a sample document about Physical AI. It discusses the integration of artificial intelligence with physical systems, robotics, and control theory. This field combines machine learning with real-world applications in robotics and automation.",
        "sample2.txt": "ROS2 Basics: Robot Operating System 2 is a flexible framework for writing robot software. It is a collection of tools, libraries, and conventions that aim to simplify the task of creating complex and robust robot behavior across a wide variety of robot platforms.",
        "sample3.txt": "Simulation in robotics is crucial for testing algorithms before deploying them on real hardware. Gazebo and Webots are popular simulation environments that allow developers to create realistic virtual worlds for their robots.",
        "sample4.txt": "Isaac Platform by NVIDIA provides a comprehensive solution for developing, training, and deploying AI-based robotics applications. It includes tools for simulation, training, and deployment of robotics applications.",
        "sample5.txt": "Humanoid robotics involves the design and control of robots with human-like characteristics. This includes bipedal locomotion, manipulation, and interaction with human environments. Key challenges include balance, gait planning, and human-robot interaction.",
    }

    # mkdtemp() returns a plain string path and never cleans up after itself.
    target_dir = tempfile.mkdtemp()

    for name, text in corpus.items():
        Path(target_dir, name).write_text(text, encoding='utf-8')

    return target_dir

async def test_pipeline():
    """Run an end-to-end smoke test of the embedding pipeline.

    Steps, each reported on stdout:
      1. Create the sample documents in a temporary directory.
      2. Build an ``EmbeddingPipeline`` and process that directory.
      3. Print the processed count and the vector store's document count.
      4. Run a fixed set of queries (``top_k=2``) and print, for every hit,
         its ``score``, ``source`` and the first 100 characters of ``content``.

    Any ``Exception`` raised in steps 2-4 is caught, printed together with
    its traceback, and not re-raised. The temporary directory is removed in
    all cases. Returns ``None``.
    """
    import shutil
    import traceback

    print("Creating test documents...")
    docs_dir = await create_test_documents()
    print(f"Test documents created in: {docs_dir}")

    # One query per sample document topic.
    queries = (
        "Physical AI and robotics",
        "Robot Operating System",
        "Simulation environments for robotics",
        "Isaac Platform by NVIDIA",
        "Humanoid robotics and bipedal locomotion",
    )

    try:
        print("\nInitializing embedding pipeline...")
        pipeline = EmbeddingPipeline()

        print("Processing documents...")
        n_processed = await pipeline.process_directory(docs_dir)
        print(f"Successfully processed {n_processed} documents")

        # Report how many documents the collection holds after processing.
        n_stored = pipeline.vector_store.get_all_documents_count()
        print(f"Total documents in vector store: {n_stored}")

        print("\nTesting search functionality...")

        for text in queries:
            print(f"\nSearching for: '{text}'")
            hits = pipeline.search(text, top_k=2)

            if not hits:
                print("  No results found.")
                continue

            # Ranks are shown 1-based.
            for rank, hit in enumerate(hits, start=1):
                print(f"  Result {rank} (Score: {hit['score']:.4f}):")
                print(f"    Source: {hit['source']}")
                print(f"    Content preview: {hit['content'][:100]}...")

        print("\nPipeline test completed successfully!")

    except Exception as exc:
        # Failures are reported, not propagated.
        print(f"Error during pipeline test: {exc!s}")
        traceback.print_exc()

    finally:
        # Always remove the temporary documents, success or failure.
        shutil.rmtree(docs_dir)
        print(f"\nCleaned up test directory: {docs_dir}")

def test_connection():
    """Check that the Qdrant vector store and the embedding API are reachable.

    First lists the collections through ``VectorStore().client``; then asks
    ``Embedder`` for an embedding of a short test string. Each check prints
    an ``[OK]`` or ``[ERROR]`` line.

    Returns:
        ``True`` when both checks succeed; ``False`` as soon as one fails
        (the second check is skipped if the first one fails).
    """
    print("Testing connections...")

    # Project modules are imported lazily, in this order. The two config
    # names are not referenced below; the import is kept as in the original.
    from config import QDRANT_URL, COLLECTION_NAME
    from vector_store import VectorStore
    from embedder import Embedder

    # --- Check 1: Qdrant ---
    try:
        store = VectorStore()
        listing = store.client.get_collections()
        names = [entry.name for entry in listing.collections]
        print(f"[OK] Qdrant connection successful. Collections: {names}")
    except Exception as exc:
        print(f"[ERROR] Qdrant connection failed: {exc!s}")
        return False

    # --- Check 2: embedding creation ---
    try:
        client = Embedder()
        # create_embedding is awaited via a fresh event loop.
        vector = asyncio.run(client.create_embedding("test connection"))
        print(f"[OK] Embedding API connection successful. Embedding dimension: {len(vector)}")
    except Exception as exc:
        print(f"[ERROR] Embedding API connection failed: {exc!s}")
        return False

    return True

if __name__ == "__main__":
    # Script entry point: check connectivity, then run the pipeline test.
    print("Starting comprehensive tests for the Physical AI embedding system...\n")

    # Test connections first; the pipeline test is skipped when they fail.
    if not test_connection():
        print("\nConnection tests failed. Please check your configuration.")
        # Raise SystemExit directly instead of calling the `exit` builtin:
        # `exit` is injected by the `site` module for interactive use and is
        # missing when the interpreter runs without it (e.g. `python -S`).
        # The process exit status is still 1.
        raise SystemExit(1)

    print()

    # Run the pipeline test on a fresh event loop.
    asyncio.run(test_pipeline())