sirus / backend /SQL_Agent /test_tenant_run_endpoint.py
ranilmukesh's picture
Deploy SiRUS SQL Agent backend
b8277c4
"""
Test script for the new /tenant-run endpoint.
1. Sets up tenant in data_sources API
2. Creates tenant API key
3. Registers PostgreSQL source
4. Tests the /tenant-run endpoint with streaming and non-streaming
"""
import httpx
import json
import sys
import os
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# ============================================================================
# CONFIGURATION
# ============================================================================
# Data Sources API
DATA_SOURCES_API_URL = "http://127.0.0.1:8000"
ADMIN_API_KEY = os.environ.get("SIRUS_ADMIN_API_KEY", "426f27d994943c874c22d42e77596e33c455dac315c361d35215cae5f39941b3")
# Your AgentOS server
AGENT_OS_URL = "http://127.0.0.1:5559"
AGENT_ID = "sirus-sql-agent"
# Tenant configuration
TENANT_ID = "demo_tenant_001"
SOURCE_NAME = "scv_sample_db"
USER_ID = "test-user-007"
SESSION_ID = "my-test-session-123"
# PostgreSQL connection
DB_CONFIG = {
"uri": "postgresql://neondb_owner:npg_dfWNsn2ZGk7c@ep-cool-poetry-a1puamly-pooler.ap-southeast-1.aws.neon.tech:5432/scv-sample?sslmode=require"
}
# Will be set after tenant setup
TENANT_API_KEY = None
# ============================================================================
# TENANT SETUP FUNCTIONS
# ============================================================================
def setup_tenant():
"""Register tenant and source in data_sources API, return API key."""
global TENANT_API_KEY
print("\n" + "="*80)
print("πŸ”§ SETTING UP TENANT IN DATA SOURCES API")
print("="*80)
client = httpx.Client(base_url=DATA_SOURCES_API_URL, timeout=120)
# Step 1: Create API key for tenant
print(f"\nπŸ“ Creating API key for tenant '{TENANT_ID}'...")
try:
api_key_response = client.post(
f"/api/v1/data-sources/tenants/{TENANT_ID}/api-keys",
json={
"description": f"Test key for {TENANT_ID} - {datetime.now().isoformat()}",
"expires_in_days": 30
},
headers={"X-Sirus-Admin-Key": ADMIN_API_KEY}
)
if api_key_response.status_code != 201:
print(f"❌ Failed to create API key: {api_key_response.status_code}")
print(f" Response: {api_key_response.text}")
return False
api_key_data = api_key_response.json()
TENANT_API_KEY = api_key_data["api_key"]
print(f"βœ… API key created: {TENANT_API_KEY[:20]}...")
except Exception as e:
print(f"❌ Error creating API key: {e}")
return False
# Step 2: Register PostgreSQL source
print(f"\nπŸ“Š Registering PostgreSQL source '{SOURCE_NAME}'...")
try:
source_response = client.post(
f"/api/v1/data-sources/tenants/{TENANT_ID}/sources",
json={
"sources": [
{
"source_name": SOURCE_NAME,
"source_type": "ibis",
"config": {
"uri": DB_CONFIG["uri"]
}
}
],
"validate_connection": True
},
headers={"X-Sirus-Api-Key": TENANT_API_KEY}
)
if source_response.status_code != 201:
print(f"❌ Failed to register source: {source_response.status_code}")
print(f" Response: {source_response.text}")
return False
source_data = source_response.json()
probe_result = source_data["probe_results"][0]
if probe_result["status"] == "success":
print(f"βœ… Source registered and validated")
print(f" Tables found: {probe_result.get('tables_found', 'N/A')}")
else:
print(f"⚠️ Source registered but validation failed: {probe_result.get('message', 'Unknown error')}")
except Exception as e:
print(f"❌ Error registering source: {e}")
return False
# Step 3: Verify source is accessible
print(f"\nπŸ” Verifying source access...")
try:
list_response = client.get(
"/api/v1/data-sources/list",
params={"tenant_id": TENANT_ID},
headers={"X-Sirus-Api-Key": TENANT_API_KEY}
)
if list_response.status_code == 200:
sources = list_response.json()["available_sources"]
print(f"βœ… Tenant sources accessible: {sources}")
else:
print(f"⚠️ Failed to list sources: {list_response.text}")
except Exception as e:
print(f"⚠️ Error verifying sources: {e}")
print("\n" + "="*80)
print("βœ… TENANT SETUP COMPLETE")
print("="*80)
return True
# ============================================================================
# TEST QUERIES
# ============================================================================
test_queries = [
"What tables are available in this database?",
"How many records are in each table?",
"Show me the first 3 rows from the action_items table",
]
# ============================================================================
# TEST FUNCTIONS
# ============================================================================
def test_streaming_request(question: str):
"""Test the endpoint with streaming enabled (real-time output)"""
print(f"\n{'='*80}")
print(f"πŸš€ STREAMING TEST: {question}")
print(f"{'='*80}\n")
payload = {
"message": question,
"tenant_id": TENANT_ID,
"source_name": SOURCE_NAME,
"api_key": TENANT_API_KEY,
"session_id": SESSION_ID,
"user_id": USER_ID,
"stream": True # Enable streaming
}
try:
with httpx.stream(
"POST",
f"{AGENT_OS_URL}/tenant-run/{AGENT_ID}",
json=payload,
timeout=60.0
) as response:
print(f"βœ… Connected with status: {response.status_code}")
response.raise_for_status()
print("\n--- Agent Stream Output ---")
for line in response.iter_lines():
if line.startswith("event:"):
event_type = line[6:].strip()
print(f"\n[EVENT: {event_type}]")
elif line.startswith("data:"):
try:
data = json.loads(line[5:])
if isinstance(data, dict):
# Pretty print structured data
if data.get('event') == 'RunCompleted':
print("\n[βœ… Run Completed]")
elif 'content' in data:
print(f" {data['content']}")
else:
print(f" {json.dumps(data, indent=2)}")
else:
print(f" {data}")
except json.JSONDecodeError:
# Raw text output
print(f" {line[5:]}")
print("--- End of Stream ---\n")
except httpx.HTTPStatusError as e:
print(f"❌ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
except httpx.RequestError as e:
print(f"❌ Connection Error: Failed to connect to {e.request.url}")
except Exception as e:
print(f"❌ Unexpected Error: {e}")
def test_non_streaming_request(question: str):
"""Test the endpoint with streaming disabled (get full response at once)"""
print(f"\n{'='*80}")
print(f"πŸš€ NON-STREAMING TEST: {question}")
print(f"{'='*80}\n")
payload = {
"message": question,
"tenant_id": TENANT_ID,
"source_name": SOURCE_NAME,
"api_key": TENANT_API_KEY,
"session_id": SESSION_ID + "-non-streaming", # Different session
"user_id": USER_ID,
"stream": False # Disable streaming
}
try:
response = httpx.post(
f"{AGENT_OS_URL}/tenant-run/{AGENT_ID}",
json=payload,
timeout=60.0
)
response.raise_for_status()
result = response.json()
print(f"βœ… Response received (status: {response.status_code})")
print(f"\nSession ID: {result.get('session_id')}")
print(f"Tenant ID: {result.get('tenant_id')}")
print(f"\n--- Agent Response ---")
print(result.get('response', 'No response content'))
print("--- End of Response ---\n")
except httpx.HTTPStatusError as e:
print(f"❌ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
except httpx.RequestError as e:
print(f"❌ Connection Error: Failed to connect to {e.request.url}")
except Exception as e:
print(f"❌ Unexpected Error: {e}")
def test_health_check():
"""Test that the server is running and accessible"""
print(f"\n{'='*80}")
print(f"πŸ₯ HEALTH CHECK")
print(f"{'='*80}\n")
try:
# Test the standard AgentOS config endpoint
response = httpx.get(f"{AGENT_OS_URL}/config", timeout=5.0)
response.raise_for_status()
print(f"βœ… Server is running at {AGENT_OS_URL}")
print(f" Status: {response.status_code}")
# Try to get agents list
agents_response = httpx.get(f"{AGENT_OS_URL}/agents", timeout=5.0)
agents_response.raise_for_status()
agents = agents_response.json()
print(f" Available agents: {[agent.get('id') for agent in agents]}")
except httpx.HTTPStatusError as e:
print(f"❌ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
except httpx.RequestError as e:
print(f"❌ Connection Error: Failed to connect to {AGENT_OS_URL}")
print(f" Is the agent server running? Start it with: python agent.py")
except Exception as e:
print(f"❌ Unexpected Error: {e}")
# ============================================================================
# MAIN TEST RUNNER
# ============================================================================
def main():
# Step 1: Check if agent server is running
test_health_check()
# Step 2: Setup tenant in data_sources API
if not setup_tenant():
print("\n❌ Tenant setup failed. Cannot proceed with tests.")
print(" Make sure the data_sources API is running at:", DATA_SOURCES_API_URL)
return
# Step 3: Test with the first query using streaming
if test_queries:
print("\n\n" + "="*80)
print("πŸ“‘ STREAMING MODE TESTS")
print("="*80)
test_streaming_request(test_queries[0])
# Step 4: Test with the second query using non-streaming
if len(test_queries) > 1:
print("\n\n" + "="*80)
print("πŸ“¦ NON-STREAMING MODE TESTS")
print("="*80)
test_non_streaming_request(test_queries[1])
if __name__ == "__main__":
main()