"""
Test script for the new /tenant-run endpoint.

1. Sets up tenant in data_sources API
2. Creates tenant API key
3. Registers PostgreSQL source
4. Tests the /tenant-run endpoint with streaming and non-streaming
"""

import json
import os
import sys
from datetime import datetime
from typing import Optional

import httpx

# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# ============================================================================
# CONFIGURATION
# ============================================================================

# NOTE(review): the fallback values below are live-looking credentials
# committed to source control. They are kept only so existing runs keep
# working; they should be rotated and supplied via the environment instead.

# Data Sources API
DATA_SOURCES_API_URL = "http://127.0.0.1:8000"
ADMIN_API_KEY = os.environ.get(
    "SIRUS_ADMIN_API_KEY",
    "426f27d994943c874c22d42e77596e33c455dac315c361d35215cae5f39941b3",
)

# Your AgentOS server
AGENT_OS_URL = "http://127.0.0.1:5559"
AGENT_ID = "sirus-sql-agent"

# Tenant configuration
TENANT_ID = "demo_tenant_001"
SOURCE_NAME = "scv_sample_db"
USER_ID = "test-user-007"
SESSION_ID = "my-test-session-123"

# PostgreSQL connection (overridable via SIRUS_TEST_DB_URI, mirroring how
# ADMIN_API_KEY is read from the environment)
DB_CONFIG = {
    "uri": os.environ.get(
        "SIRUS_TEST_DB_URI",
        "postgresql://neondb_owner:npg_dfWNsn2ZGk7c@ep-cool-poetry-a1puamly-pooler.ap-southeast-1.aws.neon.tech:5432/scv-sample?sslmode=require",
    )
}

# Will be set after tenant setup
TENANT_API_KEY = None


# ============================================================================
# TENANT SETUP FUNCTIONS
# ============================================================================

def _create_api_key(client: httpx.Client) -> Optional[str]:
    """Create an API key for TENANT_ID using the admin key.

    Returns the new key, or None if the request failed or did not answer 201.
    """
    print(f"\n📝 Creating API key for tenant '{TENANT_ID}'...")
    try:
        response = client.post(
            f"/api/v1/data-sources/tenants/{TENANT_ID}/api-keys",
            json={
                "description": f"Test key for {TENANT_ID} - {datetime.now().isoformat()}",
                "expires_in_days": 30,
            },
            headers={"X-Sirus-Admin-Key": ADMIN_API_KEY},
        )
        if response.status_code != 201:
            print(f"❌ Failed to create API key: {response.status_code}")
            print(f" Response: {response.text}")
            return None

        api_key = response.json()["api_key"]
        # Only a prefix is echoed so the full key does not end up in logs.
        print(f"✅ API key created: {api_key[:20]}...")
        return api_key
    except Exception as e:
        print(f"❌ Error creating API key: {e}")
        return None


def _register_source(client: httpx.Client, api_key: str) -> bool:
    """Register the PostgreSQL source for TENANT_ID and report the probe result.

    Returns False only when the request fails or does not answer 201; a
    failed connection probe is reported as a warning and still returns True.
    """
    print(f"\n📊 Registering PostgreSQL source '{SOURCE_NAME}'...")
    try:
        response = client.post(
            f"/api/v1/data-sources/tenants/{TENANT_ID}/sources",
            json={
                "sources": [
                    {
                        "source_name": SOURCE_NAME,
                        "source_type": "ibis",
                        "config": {"uri": DB_CONFIG["uri"]},
                    }
                ],
                "validate_connection": True,
            },
            headers={"X-Sirus-Api-Key": api_key},
        )
        if response.status_code != 201:
            print(f"❌ Failed to register source: {response.status_code}")
            print(f" Response: {response.text}")
            return False

        probe_result = response.json()["probe_results"][0]
        if probe_result["status"] == "success":
            print("✅ Source registered and validated")
            print(f" Tables found: {probe_result.get('tables_found', 'N/A')}")
        else:
            print(
                "⚠️ Source registered but validation failed: "
                f"{probe_result.get('message', 'Unknown error')}"
            )
        return True
    except Exception as e:
        print(f"❌ Error registering source: {e}")
        return False


def _verify_source_access(client: httpx.Client, api_key: str) -> None:
    """List the tenant's sources as a best-effort check; failures only warn."""
    print("\n🔍 Verifying source access...")
    try:
        response = client.get(
            "/api/v1/data-sources/list",
            params={"tenant_id": TENANT_ID},
            headers={"X-Sirus-Api-Key": api_key},
        )
        if response.status_code == 200:
            sources = response.json()["available_sources"]
            print(f"✅ Tenant sources accessible: {sources}")
        else:
            print(f"⚠️ Failed to list sources: {response.text}")
    except Exception as e:
        print(f"⚠️ Error verifying sources: {e}")


def setup_tenant():
    """Register tenant and source in data_sources API.

    On success the module-level TENANT_API_KEY is set to the newly created
    key and True is returned. Returns False if the key could not be created
    or the source could not be registered.
    """
    global TENANT_API_KEY

    print("\n" + "=" * 80)
    print("🔧 SETTING UP TENANT IN DATA SOURCES API")
    print("=" * 80)

    # The client is used as a context manager so its connection pool is
    # released on every exit path, including the early failure returns.
    with httpx.Client(base_url=DATA_SOURCES_API_URL, timeout=120) as client:
        # Step 1: Create API key for tenant
        api_key = _create_api_key(client)
        if api_key is None:
            return False
        TENANT_API_KEY = api_key

        # Step 2: Register PostgreSQL source
        if not _register_source(client, api_key):
            return False

        # Step 3: Verify source is accessible
        _verify_source_access(client, api_key)

    print("\n" + "=" * 80)
    print("✅ TENANT SETUP COMPLETE")
    print("=" * 80)
    return True


# ============================================================================
# TEST QUERIES
# ============================================================================

test_queries = [
    "What tables are available in this database?",
    "How many records are in each table?",
    "Show me the first 3 rows from the action_items table",
]


# ============================================================================
# TEST FUNCTIONS
# ============================================================================

def _print_stream_line(line: str) -> None:
    """Print one line of the event stream; lines that are neither
    ``event:`` nor ``data:`` are ignored."""
    if line.startswith("event:"):
        event_type = line[6:].strip()
        print(f"\n[EVENT: {event_type}]")
        return
    if not line.startswith("data:"):
        return

    raw = line[5:]
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # Raw text output
        print(f" {raw}")
        return

    if not isinstance(data, dict):
        print(f" {data}")
    elif data.get("event") == "RunCompleted":
        print("\n[✅ Run Completed]")
    elif "content" in data:
        print(f" {data['content']}")
    else:
        # Pretty print structured data
        print(f" {json.dumps(data, indent=2)}")


def test_streaming_request(question: str):
    """Test the endpoint with streaming enabled (real-time output)"""
    print(f"\n{'=' * 80}")
    print(f"🚀 STREAMING TEST: {question}")
    print(f"{'=' * 80}\n")

    payload = {
        "message": question,
        "tenant_id": TENANT_ID,
        "source_name": SOURCE_NAME,
        "api_key": TENANT_API_KEY,
        "session_id": SESSION_ID,
        "user_id": USER_ID,
        "stream": True,  # Enable streaming
    }

    try:
        with httpx.stream(
            "POST",
            f"{AGENT_OS_URL}/tenant-run/{AGENT_ID}",
            json=payload,
            timeout=60.0,
        ) as response:
            print(f"✅ Connected with status: {response.status_code}")
            if response.status_code >= 400:
                # A streamed body must be read before `.text` is accessed;
                # otherwise the handler below raises ResponseNotRead and the
                # server's error message is lost.
                response.read()
            response.raise_for_status()

            print("\n--- Agent Stream Output ---")
            for line in response.iter_lines():
                _print_stream_line(line)
            print("--- End of Stream ---\n")

    except httpx.HTTPStatusError as e:
        print(f"❌ HTTP Error: {e.response.status_code}")
        print(f" Response: {e.response.text}")
    except httpx.RequestError as e:
        print(f"❌ Connection Error: Failed to connect to {e.request.url}")
    except Exception as e:
        print(f"❌ Unexpected Error: {e}")


def test_non_streaming_request(question: str):
    """Test the endpoint with streaming disabled (get full response at once)"""
    print(f"\n{'=' * 80}")
    print(f"🚀 NON-STREAMING TEST: {question}")
    print(f"{'=' * 80}\n")

    payload = {
        "message": question,
        "tenant_id": TENANT_ID,
        "source_name": SOURCE_NAME,
        "api_key": TENANT_API_KEY,
        "session_id": SESSION_ID + "-non-streaming",  # Different session
        "user_id": USER_ID,
        "stream": False,  # Disable streaming
    }

    try:
        response = httpx.post(
            f"{AGENT_OS_URL}/tenant-run/{AGENT_ID}",
            json=payload,
            timeout=60.0,
        )
        response.raise_for_status()

        result = response.json()
        print(f"✅ Response received (status: {response.status_code})")
        print(f"\nSession ID: {result.get('session_id')}")
        print(f"Tenant ID: {result.get('tenant_id')}")
        print("\n--- Agent Response ---")
        print(result.get("response", "No response content"))
        print("--- End of Response ---\n")

    except httpx.HTTPStatusError as e:
        print(f"❌ HTTP Error: {e.response.status_code}")
        print(f" Response: {e.response.text}")
    except httpx.RequestError as e:
        print(f"❌ Connection Error: Failed to connect to {e.request.url}")
    except Exception as e:
        print(f"❌ Unexpected Error: {e}")


def test_health_check():
    """Test that the server is running and accessible"""
    print(f"\n{'=' * 80}")
    print("🏥 HEALTH CHECK")
    print(f"{'=' * 80}\n")

    try:
        # Test the standard AgentOS config endpoint
        response = httpx.get(f"{AGENT_OS_URL}/config", timeout=5.0)
        response.raise_for_status()
        print(f"✅ Server is running at {AGENT_OS_URL}")
        print(f" Status: {response.status_code}")

        # Try to get agents list
        agents_response = httpx.get(f"{AGENT_OS_URL}/agents", timeout=5.0)
        agents_response.raise_for_status()
        agents = agents_response.json()
        print(f" Available agents: {[agent.get('id') for agent in agents]}")

    except httpx.HTTPStatusError as e:
        print(f"❌ HTTP Error: {e.response.status_code}")
        print(f" Response: {e.response.text}")
    except httpx.RequestError:
        print(f"❌ Connection Error: Failed to connect to {AGENT_OS_URL}")
        print(" Is the agent server running? Start it with: python agent.py")
    except Exception as e:
        print(f"❌ Unexpected Error: {e}")


# ============================================================================
# MAIN TEST RUNNER
# ============================================================================

def main():
    """Run the health check, tenant setup, then one streaming and one
    non-streaming query."""
    # Step 1: Check if agent server is running.
    # The outcome is informational only; the run continues either way.
    test_health_check()

    # Step 2: Setup tenant in data_sources API
    if not setup_tenant():
        print("\n❌ Tenant setup failed. Cannot proceed with tests.")
        print(" Make sure the data_sources API is running at:", DATA_SOURCES_API_URL)
        return

    # Step 3: Test with the first query using streaming
    if test_queries:
        print("\n\n" + "=" * 80)
        print("📡 STREAMING MODE TESTS")
        print("=" * 80)
        test_streaming_request(test_queries[0])

    # Step 4: Test with the second query using non-streaming
    if len(test_queries) > 1:
        print("\n\n" + "=" * 80)
        print("📦 NON-STREAMING MODE TESTS")
        print("=" * 80)
        test_non_streaming_request(test_queries[1])


if __name__ == "__main__":
    main()