|
|
""" |
|
|
Test script for Langfuse v3 observability integration. |
|
|
|
|
|
This script tests the observability module and ensures that: |
|
|
1. Observability can be initialized properly |
|
|
2. Root spans are created correctly |
|
|
3. Agent spans are nested properly |
|
|
4. Tool spans work as expected |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
# Populate os.environ from the local "env.local" file at import time, before
# any test runs. Presumably this supplies the LANGFUSE_* variables that main()
# reports on -- confirm against the deployment's env file.
load_dotenv("env.local")
|
|
|
|
|
def test_observability_initialization() -> bool:
    """Check that observability initializes and exposes a callback handler.

    Calls ``initialize_observability()`` and then ``get_callback_handler()``
    from the project's ``observability`` module, printing a status line for
    each step.

    Returns:
        bool: ``True`` when initialization reports success and a truthy
        handler is returned; ``False`` as soon as either step fails.

    Raises:
        ImportError: If the ``observability`` module cannot be imported (the
            import is not guarded here, so the error reaches the caller).
    """
    print("🧪 Testing observability initialization...")

    from observability import initialize_observability, get_callback_handler

    # Step 1: the module must report a successful initialization.
    if not initialize_observability():
        print("❌ Observability initialization failed")
        return False
    print("✅ Observability initialized successfully")

    # Step 2: a usable callback handler must be available afterwards.
    if not get_callback_handler():
        print("❌ Failed to get callback handler")
        return False
    print("✅ Callback handler obtained successfully")

    return True
|
|
|
|
|
def test_span_creation() -> bool:
    """Check that root, agent and tool spans can be opened and closed.

    Opens a root span, an agent span inside it, and a tool span inside that,
    printing a status line as each context manager is entered and once more
    after all of them have exited.

    Returns:
        bool: ``True`` if every span was entered and exited without raising;
        ``False`` if any exception occurred while working with the spans.

    Raises:
        ImportError: If the ``observability`` module cannot be imported (the
            import sits outside the ``try`` block).
    """
    print("\n🧪 Testing span creation...")

    from observability import start_root_span, agent_span, tool_span

    try:
        # The span objects themselves are not inspected, so nothing is bound
        # with ``as``; only successful enter/exit is being verified.
        with start_root_span(
            name="test-request",
            user_id="test_user",
            session_id="test_session",
            metadata={"test": True},
        ):
            print("✅ Root span created successfully")

            with agent_span("test_agent", metadata={"agent_type": "test"}):
                print("✅ Agent span created successfully")

                with tool_span("test_tool", metadata={"tool_type": "test"}):
                    print("✅ Tool span created successfully")

        print("✅ All spans created and closed successfully")
        return True

    except Exception as e:
        print(f"❌ Span creation failed: {e}")
        return False
|
|
|
|
|
async def test_agent_system_integration() -> bool:
    """Run the agent system end to end with a trivial query.

    Imports ``run_agent_system`` from ``langgraph_agent_system`` and awaits it
    with a fixed query, user id, session id and ``max_iterations=1``.

    Returns:
        bool: ``True`` when the call returns a non-empty string; ``False``
        when it returns anything else or when any exception (including a
        failed import) is raised.
    """
    print("\n🧪 Testing agent system integration...")

    try:
        from langgraph_agent_system import run_agent_system

        result = await run_agent_system(
            query="What is 2 + 2?",
            user_id="test_user_integration",
            session_id="test_session_integration",
            max_iterations=1,
        )

        # Only a non-empty string counts as a valid answer.
        if not (result and isinstance(result, str)):
            print("❌ Agent system returned invalid result")
            return False

        print("✅ Agent system ran successfully")
        # Show at most the first 100 characters of the answer.
        print(f"📝 Result: {result[:100]}...")
        return True

    except Exception as e:
        print(f"❌ Agent system integration test failed: {e}")
        return False
|
|
|
|
|
def test_flush_and_cleanup() -> bool:
    """Check that traces can be flushed and observability shut down.

    Calls ``flush_traces(background=False)`` followed by
    ``shutdown_observability()`` from the ``observability`` module.

    Returns:
        bool: ``True`` if both calls complete without raising; ``False`` if
        any exception (including a failed import) occurs.
    """
    print("\n🧪 Testing flush and cleanup...")

    try:
        from observability import flush_traces, shutdown_observability

        flush_traces(background=False)
        print("✅ Traces flushed successfully")

        shutdown_observability()
        print("✅ Observability shutdown successfully")

        return True

    except Exception as e:
        print(f"❌ Flush and cleanup test failed: {e}")
        return False
|
|
|
|
|
async def main() -> bool:
    """Run all observability tests, print a summary and an environment check.

    Tests run in a fixed order. Each test function is called directly and,
    if the call returns an awaitable, that awaitable is awaited. An exception
    escaping a test is printed and recorded as a failure, so the remaining
    tests still run. After the summary, the Langfuse-related environment
    variables are listed with their values masked.

    Returns:
        bool: ``True`` if every test returned a truthy result, otherwise
        ``False``.
    """
    # Local import: only needed here to detect awaitable test results.
    import inspect

    separator = "=" * 50

    print("🚀 Starting Langfuse v3 observability tests...\n")

    tests = [
        ("Observability Initialization", test_observability_initialization),
        ("Span Creation", test_span_creation),
        ("Agent System Integration", test_agent_system_integration),
        ("Flush and Cleanup", test_flush_and_cleanup),
    ]

    results = []

    for test_name, test_func in tests:
        print(f"\n{separator}")
        print(f"Running: {test_name}")
        print(separator)

        try:
            outcome = test_func()
            # Await whatever awaitable the call produced, so wrapped or
            # partial async callables are handled as well as plain
            # ``async def`` functions.
            if inspect.isawaitable(outcome):
                outcome = await outcome
        except Exception as e:
            print(f"❌ Test {test_name} failed with exception: {e}")
            outcome = False
        results.append((test_name, outcome))

    print(f"\n{separator}")
    print("TEST RESULTS SUMMARY")
    print(separator)

    total = len(results)
    passed = sum(1 for _, outcome in results if outcome)

    for test_name, outcome in results:
        status = "✅ PASSED" if outcome else "❌ FAILED"
        print(f"{test_name}: {status}")

    print(f"\nOverall: {passed}/{total} tests passed")

    all_passed = passed == total
    if all_passed:
        print("🎉 All tests passed! Langfuse v3 observability is working correctly.")
    else:
        print("⚠️ Some tests failed. Check the output above for details.")

    print(f"\n{separator}")
    print("ENVIRONMENT CHECK")
    print(separator)

    required_env_vars = [
        "LANGFUSE_PUBLIC_KEY",
        "LANGFUSE_SECRET_KEY",
        "LANGFUSE_HOST",
    ]

    for var in required_env_vars:
        value = os.getenv(var)
        if value:
            # Never echo the value itself: print at most ten asterisks.
            print(f"✅ {var}: {'*' * min(len(value), 10)}...")
        else:
            print(f"❌ {var}: Not set")

    dashboard_url = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
    print(f"\n🔍 If tests passed, check your Langfuse dashboard at: {dashboard_url}")

    return all_passed
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # Run the async entry point to completion on a fresh event loop.
    outcome = asyncio.run(main())
    # Exit non-zero only when main() explicitly returns False; any other
    # return value (including None) keeps the exit status at 0.
    sys.exit(1 if outcome is False else 0)