# Final_Assignment_Template / test_observability.py
# Humanlearning's picture
# updated agent
# f844f16
"""
Test script for Langfuse v3 observability integration.
This script tests the observability module and ensures that:
1. Observability can be initialized properly
2. Root spans are created correctly
3. Agent spans are nested properly
4. Tool spans work as expected
"""
import asyncio
import os
from dotenv import load_dotenv
# Load environment variables from the local "env.local" file at import time,
# before any test runs; main() later reads the LANGFUSE_* settings via os.getenv.
load_dotenv("env.local")
def test_observability_initialization():
    """Check that observability initializes and yields a callback handler.

    Calls ``initialize_observability()`` and then ``get_callback_handler()``
    from the project's ``observability`` module, printing a pass/fail line
    for each step.

    Returns:
        bool: ``True`` when both calls return a truthy value; ``False`` when
        either returns a falsy value or when anything raises (including a
        failed import of ``observability``).
    """
    print("🧪 Testing observability initialization...")
    try:
        # Imported inside the try so that a missing or broken module is
        # reported as a failed test instead of propagating to the caller.
        from observability import initialize_observability, get_callback_handler

        if not initialize_observability():
            print("❌ Observability initialization failed")
            return False
        print("✅ Observability initialized successfully")

        if not get_callback_handler():
            print("❌ Failed to get callback handler")
            return False
        print("✅ Callback handler obtained successfully")
        return True
    except Exception as e:
        print(f"❌ Observability initialization test failed: {e}")
        return False
def test_span_creation():
    """Verify that root, agent and tool spans can be opened and closed.

    Enters the three context managers provided by the ``observability``
    module, nested root -> agent -> tool, and prints a confirmation line as
    each one is entered.

    Returns:
        bool: ``True`` if every span was entered and exited without raising;
        ``False`` if the import or any of the spans raised an exception.
    """
    print("\n🧪 Testing span creation...")
    try:
        # Imported inside the try so an import failure is reported as a
        # failed test rather than escaping to the caller.
        from observability import start_root_span, agent_span, tool_span

        # The objects yielded by the context managers are not inspected;
        # only a clean enter/exit is being checked.
        with start_root_span(
            name="test-request",
            user_id="test_user",
            session_id="test_session",
            metadata={"test": True},
        ):
            print("✅ Root span created successfully")
            with agent_span("test_agent", metadata={"agent_type": "test"}):
                print("✅ Agent span created successfully")
                with tool_span("test_tool", metadata={"tool_type": "test"}):
                    print("✅ Tool span created successfully")
        print("✅ All spans created and closed successfully")
        return True
    except Exception as e:
        print(f"❌ Span creation failed: {e}")
        return False
async def test_agent_system_integration():
    """Run one short query through the agent system and check the result.

    Awaits ``run_agent_system`` from ``langgraph_agent_system`` with a simple
    arithmetic question and ``max_iterations=1``, then prints the first 100
    characters of the answer.

    Returns:
        bool: ``True`` when the call returns a non-empty string; ``False``
        when it returns anything else or when the import or call raises.
    """
    print("\n🧪 Testing agent system integration...")
    try:
        from langgraph_agent_system import run_agent_system

        # Test a simple question
        result = await run_agent_system(
            query="What is 2 + 2?",
            user_id="test_user_integration",
            session_id="test_session_integration",
            max_iterations=1,  # Keep it short for testing
        )
        if not (result and isinstance(result, str)):
            print("❌ Agent system returned invalid result")
            return False
        print("✅ Agent system ran successfully")
        print(f"📝 Result: {result[:100]}...")
        return True
    except Exception as e:
        print(f"❌ Agent system integration test failed: {e}")
        return False
def test_flush_and_cleanup():
    """Exercise the flush and shutdown helpers of the observability module.

    Calls ``flush_traces(background=False)`` followed by
    ``shutdown_observability()`` and prints a confirmation after each.

    Returns:
        bool: ``True`` if both calls complete without raising; ``False`` if
        the import or either call raises an exception.
    """
    print("\n🧪 Testing flush and cleanup...")
    try:
        from observability import flush_traces, shutdown_observability

        # Test flush
        flush_traces(background=False)
        print("✅ Traces flushed successfully")

        # Test shutdown
        shutdown_observability()
        print("✅ Observability shutdown successfully")
        return True
    except Exception as e:
        print(f"❌ Flush and cleanup test failed: {e}")
        return False
def _print_banner(title):
    """Print a blank line, then *title* framed by two 50-character '=' rules."""
    rule = "=" * 50
    print(f"\n{rule}")
    print(title)
    print(rule)


async def _run_test(test_name, test_func):
    """Call one test (sync or async) and return its result; exceptions count as False."""
    import inspect

    try:
        outcome = test_func()
        # Async tests hand back an awaitable when called; checking the result
        # avoids asyncio.iscoroutinefunction, which is deprecated in 3.14.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome
    except Exception as e:
        print(f"❌ Test {test_name} failed with exception: {e}")
        return False


def _print_environment_check():
    """Report which of the Langfuse environment variables are set, masking values."""
    required_env_vars = (
        "LANGFUSE_PUBLIC_KEY",
        "LANGFUSE_SECRET_KEY",
        "LANGFUSE_HOST",
    )
    for var in required_env_vars:
        value = os.getenv(var)
        if value:
            # Never echo the value itself: print at most 10 asterisks.
            print(f"✅ {var}: {'*' * min(len(value), 10)}...")
        else:
            print(f"❌ {var}: Not set")


async def main():
    """Run every test in order, then print a summary and an environment check.

    Each test is called (and awaited when it is asynchronous); a test that
    raises is recorded as failed. After the per-test PASSED/FAILED summary,
    the Langfuse environment variables are reported as set or missing.

    Returns:
        bool: ``True`` when every test returned a truthy result, otherwise
        ``False``.
    """
    print("🚀 Starting Langfuse v3 observability tests...\n")
    tests = [
        ("Observability Initialization", test_observability_initialization),
        ("Span Creation", test_span_creation),
        ("Agent System Integration", test_agent_system_integration),
        ("Flush and Cleanup", test_flush_and_cleanup),
    ]

    results = []
    for test_name, test_func in tests:
        _print_banner(f"Running: {test_name}")
        results.append((test_name, await _run_test(test_name, test_func)))

    # Summary
    _print_banner("TEST RESULTS SUMMARY")
    for test_name, result in results:
        status = "✅ PASSED" if result else "❌ FAILED"
        print(f"{test_name}: {status}")
    passed = sum(1 for _, result in results if result)
    total = len(results)
    all_passed = passed == total

    print(f"\nOverall: {passed}/{total} tests passed")
    if all_passed:
        print("🎉 All tests passed! Langfuse v3 observability is working correctly.")
    else:
        print("⚠️ Some tests failed. Check the output above for details.")

    # Check environment variables
    _print_banner("ENVIRONMENT CHECK")
    _print_environment_check()
    print(f"\n🔗 If tests passed, check your Langfuse dashboard at: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}")
    return all_passed


if __name__ == "__main__":
    # Exit with a non-zero status when any test failed so that shells and CI
    # jobs can detect the failure.
    raise SystemExit(0 if asyncio.run(main()) else 1)