File size: 4,683 Bytes
e38de99 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
"""
Test script for DCRM API
========================
Simple script to test the API endpoint with the sample CSV file.
"""
import requests
import json
import sys
# Configuration
API_URL = "http://localhost:5000"
BREAKER_ID = "6926e63d4614721a79b7b24e"
CSV_FILE = "df3_final (1).csv"
def test_health_check():
"""Test the health check endpoint"""
print("π Testing health check endpoint...")
try:
response = requests.get(f"{API_URL}/api/health")
if response.status_code == 200:
print("β
Health check passed!")
print(json.dumps(response.json(), indent=2))
return True
else:
print(f"β Health check failed with status {response.status_code}")
return False
except requests.exceptions.ConnectionError:
print("β Cannot connect to API. Make sure the server is running!")
print(" Run: python dcrm_api.py")
return False
except Exception as e:
print(f"β Error: {e}")
return False
def test_analysis():
"""Test the main analysis endpoint"""
print(f"\nπ Testing analysis endpoint with {CSV_FILE}...")
try:
# Open and upload the CSV file
with open(CSV_FILE, 'rb') as f:
files = {'file': (CSV_FILE, f, 'text/csv')}
url = f"{API_URL}/api/circuit-breakers/{BREAKER_ID}/tests/upload"
print(f"π€ Uploading to: {url}")
print("β³ Processing (this may take 30-60 seconds)...")
response = requests.post(url, files=files, timeout=120)
if response.status_code == 200:
print("β
Analysis completed successfully!")
# Parse response
result = response.json()
# Display key results
print("\nπ Key Results:")
print(f" Breaker ID: {result.get('breakerId')}")
print(f" Health Score: {result.get('healthScore')}/100")
print(f" CBHI Score: {result.get('cbhi', {}).get('score')}/100")
print(f" Findings: {result.get('findings')}")
# Save full response
output_file = "test_response.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"\nπΎ Full response saved to: {output_file}")
return True
else:
print(f"β Analysis failed with status {response.status_code}")
print(f" Error: {response.text}")
return False
except FileNotFoundError:
print(f"β CSV file not found: {CSV_FILE}")
print(" Make sure the file exists in the current directory")
return False
except requests.exceptions.Timeout:
print("β Request timeout. Analysis took too long.")
return False
except Exception as e:
print(f"β Error: {e}")
return False
def test_invalid_file():
"""Test error handling with invalid file"""
print("\nπ Testing error handling with invalid file...")
try:
# Create a temporary invalid file
invalid_content = b"This is not a valid CSV"
files = {'file': ('invalid.txt', invalid_content, 'text/plain')}
url = f"{API_URL}/api/circuit-breakers/{BREAKER_ID}/tests/upload"
response = requests.post(url, files=files)
if response.status_code == 400:
print("β
Error handling works correctly!")
print(f" Error response: {response.json()}")
return True
else:
print(f"β οΈ Unexpected status code: {response.status_code}")
return False
except Exception as e:
print(f"β Error: {e}")
return False
def main():
"""Run all tests"""
print("=" * 60)
print("DCRM API Test Suite")
print("=" * 60)
# Test 1: Health check
if not test_health_check():
print("\nβ Health check failed. Stopping tests.")
sys.exit(1)
# Test 2: Main analysis
if not test_analysis():
print("\nβ Analysis test failed.")
sys.exit(1)
# Test 3: Error handling
test_invalid_file()
print("\n" + "=" * 60)
print("β
All tests completed!")
print("=" * 60)
if __name__ == "__main__":
main()
|