Spaces:
Running
Running
| """ | |
| Test script for the new /tenant-run endpoint. | |
| 1. Sets up tenant in data_sources API | |
| 2. Creates tenant API key | |
| 3. Registers PostgreSQL source | |
| 4. Tests the /tenant-run endpoint with streaming and non-streaming | |
| """ | |
| import httpx | |
| import json | |
| import sys | |
| import os | |
| from datetime import datetime | |
| # Add parent directory to path | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| # ============================================================================ | |
| # CONFIGURATION | |
| # ============================================================================ | |
# NOTE(security): the fallback values for ADMIN_API_KEY and the database URI
# below are live-looking credentials committed to source. They are kept only
# so existing invocations keep working; rotate them and supply the real values
# through the environment, then delete the fallbacks.

# Data Sources API (overridable via the environment)
DATA_SOURCES_API_URL = os.environ.get("SIRUS_DATA_SOURCES_API_URL", "http://127.0.0.1:8000")
ADMIN_API_KEY = os.environ.get("SIRUS_ADMIN_API_KEY", "426f27d994943c874c22d42e77596e33c455dac315c361d35215cae5f39941b3")

# Your AgentOS server (overridable via the environment)
AGENT_OS_URL = os.environ.get("SIRUS_AGENT_OS_URL", "http://127.0.0.1:5559")
AGENT_ID = "sirus-sql-agent"

# Tenant configuration
TENANT_ID = "demo_tenant_001"
SOURCE_NAME = "scv_sample_db"
USER_ID = "test-user-007"
SESSION_ID = "my-test-session-123"

# PostgreSQL connection; set SIRUS_TEST_DB_URI to point at another database
# without editing this file.
DB_CONFIG = {
    "uri": os.environ.get(
        "SIRUS_TEST_DB_URI",
        "postgresql://neondb_owner:npg_dfWNsn2ZGk7c@ep-cool-poetry-a1puamly-pooler.ap-southeast-1.aws.neon.tech:5432/scv-sample?sslmode=require",
    )
}

# Will be set after tenant setup
TENANT_API_KEY = None
| # ============================================================================ | |
| # TENANT SETUP FUNCTIONS | |
| # ============================================================================ | |
def setup_tenant():
    """Create a tenant API key and register the PostgreSQL source.

    Talks to the data-sources API at ``DATA_SOURCES_API_URL`` in three steps:

    1. POST an API-key request for ``TENANT_ID`` (authenticated with the
       admin key) and store the returned key in the module-level
       ``TENANT_API_KEY``.
    2. POST ``SOURCE_NAME`` as an ``ibis`` source using ``DB_CONFIG["uri"]``
       with connection validation requested.
    3. GET the tenant's source list as a sanity check.

    Returns:
        bool: ``False`` when step 1 or step 2 gets a non-201 status or raises
        any exception. ``True`` otherwise -- a probe that reports failure in
        step 2, or any problem in step 3, only prints a warning.
    """
    global TENANT_API_KEY

    # NOTE(review): the glyphs at the start of the messages look mis-encoded
    # in this copy of the file; they are reproduced unchanged.
    print("\n" + "="*80)
    print("π§ SETTING UP TENANT IN DATA SOURCES API")
    print("="*80)

    # Using the client as a context manager closes its connection pool on
    # every return path; previously the client was never closed.
    with httpx.Client(base_url=DATA_SOURCES_API_URL, timeout=120) as client:
        # Step 1: Create API key for tenant
        print(f"\nπ Creating API key for tenant '{TENANT_ID}'...")
        try:
            api_key_response = client.post(
                f"/api/v1/data-sources/tenants/{TENANT_ID}/api-keys",
                json={
                    "description": f"Test key for {TENANT_ID} - {datetime.now().isoformat()}",
                    "expires_in_days": 30
                },
                headers={"X-Sirus-Admin-Key": ADMIN_API_KEY}
            )
            if api_key_response.status_code != 201:
                print(f"β Failed to create API key: {api_key_response.status_code}")
                print(f" Response: {api_key_response.text}")
                return False
            api_key_data = api_key_response.json()
            TENANT_API_KEY = api_key_data["api_key"]
            # Only a prefix of the key is echoed.
            print(f"β API key created: {TENANT_API_KEY[:20]}...")
        except Exception as e:
            print(f"β Error creating API key: {e}")
            return False

        # Step 2: Register PostgreSQL source
        print(f"\nπ Registering PostgreSQL source '{SOURCE_NAME}'...")
        try:
            source_response = client.post(
                f"/api/v1/data-sources/tenants/{TENANT_ID}/sources",
                json={
                    "sources": [
                        {
                            "source_name": SOURCE_NAME,
                            "source_type": "ibis",
                            "config": {
                                "uri": DB_CONFIG["uri"]
                            }
                        }
                    ],
                    "validate_connection": True
                },
                headers={"X-Sirus-Api-Key": TENANT_API_KEY}
            )
            if source_response.status_code != 201:
                print(f"β Failed to register source: {source_response.status_code}")
                print(f" Response: {source_response.text}")
                return False
            source_data = source_response.json()
            # Exactly one source was submitted, so only the first probe
            # result is inspected.
            probe_result = source_data["probe_results"][0]
            if probe_result["status"] == "success":
                print("β Source registered and validated")
                print(f" Tables found: {probe_result.get('tables_found', 'N/A')}")
            else:
                # Registration itself succeeded (201); a failed probe is
                # reported but does not abort the setup.
                print(f"β οΈ Source registered but validation failed: {probe_result.get('message', 'Unknown error')}")
        except Exception as e:
            print(f"β Error registering source: {e}")
            return False

        # Step 3: Verify source is accessible (best effort, never fatal)
        print("\nπ Verifying source access...")
        try:
            list_response = client.get(
                "/api/v1/data-sources/list",
                params={"tenant_id": TENANT_ID},
                headers={"X-Sirus-Api-Key": TENANT_API_KEY}
            )
            if list_response.status_code == 200:
                sources = list_response.json()["available_sources"]
                print(f"β Tenant sources accessible: {sources}")
            else:
                print(f"β οΈ Failed to list sources: {list_response.text}")
        except Exception as e:
            print(f"β οΈ Error verifying sources: {e}")

    print("\n" + "="*80)
    print("β TENANT SETUP COMPLETE")
    print("="*80)
    return True
| # ============================================================================ | |
| # TEST QUERIES | |
| # ============================================================================ | |
# Questions sent to the agent. main() uses the first entry for the streaming
# test and the second for the non-streaming test; further entries are unused.
test_queries = [
    "What tables are available in this database?",
    "How many records are in each table?",
    "Show me the first 3 rows from the action_items table",
]
| # ============================================================================ | |
| # TEST FUNCTIONS | |
| # ============================================================================ | |
def test_streaming_request(question: str):
    """Send *question* to the /tenant-run endpoint with streaming enabled.

    Posts to ``{AGENT_OS_URL}/tenant-run/{AGENT_ID}`` with ``"stream": True``
    and prints the response line by line as it arrives:

    * ``event:`` lines are printed as ``[EVENT: <name>]``.
    * ``data:`` lines are decoded as JSON when possible (a ``RunCompleted``
      event, a ``content`` field, or the whole object pretty-printed);
      otherwise the raw text after the prefix is printed.
    * any other line is ignored.

    Args:
        question: The message placed in the request payload.

    Returns:
        None. HTTP, connection and unexpected errors are printed, not raised.
    """
    print(f"\n{'='*80}")
    print(f"π STREAMING TEST: {question}")
    print(f"{'='*80}\n")
    payload = {
        "message": question,
        "tenant_id": TENANT_ID,
        "source_name": SOURCE_NAME,
        "api_key": TENANT_API_KEY,
        "session_id": SESSION_ID,
        "user_id": USER_ID,
        "stream": True  # Enable streaming
    }
    try:
        with httpx.stream(
            "POST",
            f"{AGENT_OS_URL}/tenant-run/{AGENT_ID}",
            json=payload,
            timeout=60.0
        ) as response:
            print(f"β Connected with status: {response.status_code}")
            try:
                response.raise_for_status()
            except httpx.HTTPStatusError:
                # A streamed body is not loaded automatically. Read it while
                # the stream is still open so the handler below can print
                # e.response.text; otherwise that access raises
                # httpx.ResponseNotRead and the server's error text is lost.
                response.read()
                raise
            print("\n--- Agent Stream Output ---")
            for line in response.iter_lines():
                if line.startswith("event:"):
                    event_type = line[6:].strip()
                    print(f"\n[EVENT: {event_type}]")
                elif line.startswith("data:"):
                    try:
                        data = json.loads(line[5:])
                    except json.JSONDecodeError:
                        # Raw text output
                        print(f" {line[5:]}")
                        continue
                    if not isinstance(data, dict):
                        print(f" {data}")
                    elif data.get('event') == 'RunCompleted':
                        print("\n[β Run Completed]")
                    elif 'content' in data:
                        print(f" {data['content']}")
                    else:
                        # Pretty print structured data
                        print(f" {json.dumps(data, indent=2)}")
            print("--- End of Stream ---\n")
    except httpx.HTTPStatusError as e:
        print(f"β HTTP Error: {e.response.status_code}")
        print(f" Response: {e.response.text}")
    except httpx.RequestError as e:
        print(f"β Connection Error: Failed to connect to {e.request.url}")
    except Exception as e:
        print(f"β Unexpected Error: {e}")
def test_non_streaming_request(question: str):
    """Send *question* to the /tenant-run endpoint and print the full reply.

    The request is posted with ``"stream": False`` and a session id derived
    from ``SESSION_ID`` with a ``-non-streaming`` suffix. The JSON reply's
    ``session_id``, ``tenant_id`` and ``response`` fields are printed.

    Args:
        question: The message placed in the request payload.

    Returns:
        None. HTTP, connection and unexpected errors are printed, not raised.
    """
    rule = '=' * 80
    print(f"\n{rule}")
    print(f"π NON-STREAMING TEST: {question}")
    print(f"{rule}\n")

    request_body = {
        "message": question,
        "tenant_id": TENANT_ID,
        "source_name": SOURCE_NAME,
        "api_key": TENANT_API_KEY,
        # Separate session from the streaming test
        "session_id": SESSION_ID + "-non-streaming",
        "user_id": USER_ID,
        # Ask for the whole answer in a single response
        "stream": False,
    }

    try:
        endpoint = f"{AGENT_OS_URL}/tenant-run/{AGENT_ID}"
        reply = httpx.post(endpoint, json=request_body, timeout=60.0)
        reply.raise_for_status()
        body = reply.json()

        print(f"β Response received (status: {reply.status_code})")
        print(f"\nSession ID: {body.get('session_id')}")
        print(f"Tenant ID: {body.get('tenant_id')}")
        print("\n--- Agent Response ---")
        answer = body.get('response', 'No response content')
        print(answer)
        print("--- End of Response ---\n")
    except httpx.HTTPStatusError as err:
        failed = err.response
        print(f"β HTTP Error: {failed.status_code}")
        print(f" Response: {failed.text}")
    except httpx.RequestError as err:
        print(f"β Connection Error: Failed to connect to {err.request.url}")
    except Exception as err:
        print(f"β Unexpected Error: {err}")
def test_health_check():
    """Check that the AgentOS server answers and list its agent ids.

    Requests ``/config`` and then ``/agents`` under ``AGENT_OS_URL`` (5 second
    timeout each) and prints the outcome.

    Returns:
        None. HTTP, connection and unexpected errors are printed, not raised.
    """
    rule = '=' * 80
    print(f"\n{rule}")
    print("π₯ HEALTH CHECK")
    print(f"{rule}\n")

    try:
        # The standard AgentOS config endpoint doubles as a liveness probe
        config_reply = httpx.get(AGENT_OS_URL + "/config", timeout=5.0)
        config_reply.raise_for_status()
        print(f"β Server is running at {AGENT_OS_URL}")
        print(f" Status: {config_reply.status_code}")

        # Then fetch the agents list
        agents_reply = httpx.get(AGENT_OS_URL + "/agents", timeout=5.0)
        agents_reply.raise_for_status()
        agent_ids = [entry.get('id') for entry in agents_reply.json()]
        print(f" Available agents: {agent_ids}")
    except httpx.HTTPStatusError as err:
        print(f"β HTTP Error: {err.response.status_code}")
        print(f" Response: {err.response.text}")
    except httpx.RequestError:
        print(f"β Connection Error: Failed to connect to {AGENT_OS_URL}")
        print(" Is the agent server running? Start it with: python agent.py")
    except Exception as err:
        print(f"β Unexpected Error: {err}")
| # ============================================================================ | |
| # MAIN TEST RUNNER | |
| # ============================================================================ | |
def main():
    """Run the health check, the tenant setup, then one test per mode.

    The health check result is informational only. If the tenant setup fails
    the function prints a hint and returns without running the endpoint tests.
    Otherwise the first entry of ``test_queries`` is sent in streaming mode
    and the second (when present) in non-streaming mode.
    """
    # Step 1: Check if agent server is running
    test_health_check()

    # Step 2: Setup tenant in data_sources API
    tenant_ready = setup_tenant()
    if not tenant_ready:
        print("\nβ Tenant setup failed. Cannot proceed with tests.")
        print(" Make sure the data_sources API is running at:", DATA_SOURCES_API_URL)
        return

    # Steps 3 and 4: pair each mode with the query at the same position;
    # zip() stops early when there are fewer queries than modes.
    rule = "=" * 80
    modes = (
        ("π‘ STREAMING MODE TESTS", test_streaming_request),
        ("π¦ NON-STREAMING MODE TESTS", test_non_streaming_request),
    )
    for (title, run_test), question in zip(modes, test_queries):
        print("\n\n" + rule)
        print(title)
        print(rule)
        run_test(question)


if __name__ == "__main__":
    main()