File size: 5,866 Bytes
f844f16 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
"""
Test script for Langfuse v3 observability integration.
This script tests the observability module and ensures that:
1. Observability can be initialized properly
2. Root spans are created correctly
3. Agent spans are nested properly
4. Tool spans work as expected
"""
import asyncio
import os
from dotenv import load_dotenv
# Load environment variables from the "env.local" file at import time,
# before any of the test functions below run.
load_dotenv("env.local")
def test_observability_initialization():
    """Check that observability initializes and exposes a callback handler.

    Progress is reported on stdout.

    Returns:
        bool: True when ``initialize_observability()`` returns a truthy value
        and ``get_callback_handler()`` returns a truthy handler; False as
        soon as either check fails.

    Raises:
        ImportError: If the ``observability`` module cannot be imported (the
            import is not guarded by a ``try``).
    """
    print("🧪 Testing observability initialization...")

    from observability import initialize_observability, get_callback_handler

    # Test initialization
    if not initialize_observability():
        print("❌ Observability initialization failed")
        return False
    print("✅ Observability initialized successfully")

    # Test callback handler
    if not get_callback_handler():
        print("❌ Failed to get callback handler")
        return False
    print("✅ Callback handler obtained successfully")

    return True
def test_span_creation():
    """Check that root, agent and tool spans can be opened and closed.

    A root span is opened, an agent span is opened inside it, and a tool
    span is opened inside the agent span. Progress is reported on stdout.

    Returns:
        bool: True when all three context managers were entered and exited
        without raising; False when any exception was raised while doing so
        (the exception text is printed).

    Raises:
        ImportError: If the ``observability`` module cannot be imported (the
            import sits outside the ``try``).
    """
    print("\n🧪 Testing span creation...")

    from observability import start_root_span, agent_span, tool_span

    try:
        # Test root span
        with start_root_span(
            name="test-request",
            user_id="test_user",
            session_id="test_session",
            metadata={"test": True},
        ):
            print("✅ Root span created successfully")

            # Test agent span (nested inside the root span)
            with agent_span("test_agent", metadata={"agent_type": "test"}):
                print("✅ Agent span created successfully")

                # Test tool span (nested inside the agent span)
                with tool_span("test_tool", metadata={"tool_type": "test"}):
                    print("✅ Tool span created successfully")

        # Reached only after every ``with`` block above has exited cleanly.
        print("✅ All spans created and closed successfully")
        return True

    except Exception as e:
        print(f"❌ Span creation failed: {e}")
        return False
async def test_agent_system_integration():
    """Run one short query through the agent system and validate the result.

    ``run_agent_system`` is awaited with a fixed arithmetic question and
    ``max_iterations=1``. Progress is reported on stdout.

    Returns:
        bool: True when the awaited result is a non-empty ``str``; False when
        the result is empty or not a string, or when importing or running the
        agent system raises any exception (the exception text is printed).
    """
    print("\n🧪 Testing agent system integration...")

    try:
        from langgraph_agent_system import run_agent_system

        # Test a simple question
        result = await run_agent_system(
            query="What is 2 + 2?",
            user_id="test_user_integration",
            session_id="test_session_integration",
            max_iterations=1,  # Keep it short for testing
        )

        if not (result and isinstance(result, str)):
            print("❌ Agent system returned invalid result")
            return False

        print("✅ Agent system ran successfully")
        # Only the first 100 characters are shown to keep the output short.
        print(f"📝 Result: {result[:100]}...")
        return True

    except Exception as e:
        print(f"❌ Agent system integration test failed: {e}")
        return False
def test_flush_and_cleanup():
    """Check that traces can be flushed and observability shut down.

    Calls ``flush_traces(background=False)`` followed by
    ``shutdown_observability()``. Progress is reported on stdout.

    Returns:
        bool: True when both calls return without raising; False when the
        import or either call raises any exception (the exception text is
        printed).
    """
    print("\n🧪 Testing flush and cleanup...")

    try:
        from observability import flush_traces, shutdown_observability

        # Test flush
        flush_traces(background=False)
        print("✅ Traces flushed successfully")

        # Test shutdown
        shutdown_observability()
        print("✅ Observability shutdown successfully")

        return True

    except Exception as e:
        print(f"❌ Flush and cleanup test failed: {e}")
        return False
async def main():
    """Run all observability tests, then print a summary and an env check.

    Each test is run in order; coroutine functions are awaited and plain
    functions are called directly. A test that raises is recorded as failed
    instead of aborting the run. After the summary, the Langfuse environment
    variables are listed with their values masked.

    Returns:
        None. All results are reported on stdout.
    """
    # Function-scope import: only this block needs ``inspect``.
    import inspect

    print("🚀 Starting Langfuse v3 observability tests...\n")

    tests = [
        ("Observability Initialization", test_observability_initialization),
        ("Span Creation", test_span_creation),
        ("Agent System Integration", test_agent_system_integration),
        ("Flush and Cleanup", test_flush_and_cleanup),
    ]

    separator = "=" * 50
    results = []

    for test_name, test_func in tests:
        print(f"\n{separator}")
        print(f"Running: {test_name}")
        print(separator)

        try:
            if inspect.iscoroutinefunction(test_func):
                result = await test_func()
            else:
                result = test_func()
        except Exception as e:
            print(f"❌ Test {test_name} failed with exception: {e}")
            result = False
        results.append((test_name, result))

    # Summary
    print(f"\n{separator}")
    print("TEST RESULTS SUMMARY")
    print(separator)

    total = len(results)
    for test_name, result in results:
        status = "✅ PASSED" if result else "❌ FAILED"
        print(f"{test_name}: {status}")
    passed = sum(1 for _, result in results if result)

    print(f"\nOverall: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 All tests passed! Langfuse v3 observability is working correctly.")
    else:
        print("⚠️ Some tests failed. Check the output above for details.")

    # Check environment variables
    print(f"\n{separator}")
    print("ENVIRONMENT CHECK")
    print(separator)

    required_env_vars = [
        "LANGFUSE_PUBLIC_KEY",
        "LANGFUSE_SECRET_KEY",
        "LANGFUSE_HOST",
    ]

    for var in required_env_vars:
        value = os.getenv(var)
        if value:
            # Never echo the value itself: print at most ten asterisks.
            print(f"✅ {var}: {'*' * min(len(value), 10)}...")
        else:
            print(f"❌ {var}: Not set")

    print(f"\n🔗 If tests passed, check your Langfuse dashboard at: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}")
if __name__ == "__main__":
    # Drive the async test runner only when executed as a script.
    asyncio.run(main())