File size: 5,866 Bytes
f844f16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
Test script for Langfuse v3 observability integration.

This script tests the observability module and ensures that:
1. Observability can be initialized properly
2. Root spans are created correctly
3. Agent spans are nested properly
4. Tool spans work as expected
"""

import asyncio
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv("env.local")

def test_observability_initialization():
    """Test that observability can be initialized"""
    print("πŸ§ͺ Testing observability initialization...")
    
    from observability import initialize_observability, get_callback_handler
    
    # Test initialization
    success = initialize_observability()
    if success:
        print("βœ… Observability initialized successfully")
    else:
        print("❌ Observability initialization failed")
        return False
    
    # Test callback handler
    handler = get_callback_handler()
    if handler:
        print("βœ… Callback handler obtained successfully")
    else:
        print("❌ Failed to get callback handler")
        return False
    
    return True

def test_span_creation():
    """Test that spans can be created properly"""
    print("\nπŸ§ͺ Testing span creation...")
    
    from observability import start_root_span, agent_span, tool_span
    
    try:
        # Test root span
        with start_root_span(
            name="test-request",
            user_id="test_user",
            session_id="test_session",
            metadata={"test": True}
        ) as root_span:
            print("βœ… Root span created successfully")
            
            # Test agent span
            with agent_span("test_agent", metadata={"agent_type": "test"}) as agent_span_ctx:
                print("βœ… Agent span created successfully")
                
                # Test tool span
                with tool_span("test_tool", metadata={"tool_type": "test"}) as tool_span_ctx:
                    print("βœ… Tool span created successfully")
        
        print("βœ… All spans created and closed successfully")
        return True
        
    except Exception as e:
        print(f"❌ Span creation failed: {e}")
        return False

async def test_agent_system_integration():
    """Test the full agent system with observability"""
    print("\nπŸ§ͺ Testing agent system integration...")
    
    try:
        from langgraph_agent_system import run_agent_system
        
        # Test a simple question
        result = await run_agent_system(
            query="What is 2 + 2?",
            user_id="test_user_integration",
            session_id="test_session_integration",
            max_iterations=1  # Keep it short for testing
        )
        
        if result and isinstance(result, str):
            print(f"βœ… Agent system ran successfully")
            print(f"πŸ“ Result: {result[:100]}...")
            return True
        else:
            print("❌ Agent system returned invalid result")
            return False
            
    except Exception as e:
        print(f"❌ Agent system integration test failed: {e}")
        return False

def test_flush_and_cleanup():
    """Test flushing and cleanup functions"""
    print("\nπŸ§ͺ Testing flush and cleanup...")
    
    try:
        from observability import flush_traces, shutdown_observability
        
        # Test flush
        flush_traces(background=False)
        print("βœ… Traces flushed successfully")
        
        # Test shutdown
        shutdown_observability()
        print("βœ… Observability shutdown successfully")
        
        return True
        
    except Exception as e:
        print(f"❌ Flush and cleanup test failed: {e}")
        return False

async def main():
    """Run all tests"""
    print("πŸš€ Starting Langfuse v3 observability tests...\n")
    
    tests = [
        ("Observability Initialization", test_observability_initialization),
        ("Span Creation", test_span_creation),
        ("Agent System Integration", test_agent_system_integration),
        ("Flush and Cleanup", test_flush_and_cleanup)
    ]
    
    results = []
    
    for test_name, test_func in tests:
        print(f"\n{'='*50}")
        print(f"Running: {test_name}")
        print(f"{'='*50}")
        
        try:
            if asyncio.iscoroutinefunction(test_func):
                result = await test_func()
            else:
                result = test_func()
            results.append((test_name, result))
        except Exception as e:
            print(f"❌ Test {test_name} failed with exception: {e}")
            results.append((test_name, False))
    
    # Summary
    print(f"\n{'='*50}")
    print("TEST RESULTS SUMMARY")
    print(f"{'='*50}")
    
    passed = 0
    total = len(results)
    
    for test_name, result in results:
        status = "βœ… PASSED" if result else "❌ FAILED"
        print(f"{test_name}: {status}")
        if result:
            passed += 1
    
    print(f"\nOverall: {passed}/{total} tests passed")
    
    if passed == total:
        print("πŸŽ‰ All tests passed! Langfuse v3 observability is working correctly.")
    else:
        print("⚠️  Some tests failed. Check the output above for details.")
        
    # Check environment variables
    print(f"\n{'='*50}")
    print("ENVIRONMENT CHECK")
    print(f"{'='*50}")
    
    required_env_vars = [
        "LANGFUSE_PUBLIC_KEY",
        "LANGFUSE_SECRET_KEY", 
        "LANGFUSE_HOST"
    ]
    
    for var in required_env_vars:
        value = os.getenv(var)
        if value:
            print(f"βœ… {var}: {'*' * min(len(value), 10)}...")
        else:
            print(f"❌ {var}: Not set")
    
    print(f"\nπŸ”— If tests passed, check your Langfuse dashboard at: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}")

if __name__ == "__main__":
    asyncio.run(main())