# Extraction residue from the hosting site's file viewer (not part of the script):
# Spaces status "Sleeping"; file size 5,635 bytes; revision 54fe70d;
# line-number gutter 1-157.
import os
import requests
import json
from pathlib import Path
from dotenv import load_dotenv
import random
# Load environment variables via python-dotenv's load_dotenv().
# NOTE(review): nothing in the visible part of this script reads an environment
# variable afterwards -- confirm whether this call is still needed.
load_dotenv()
# API endpoint for incident classification. Hard-coded to a server on
# localhost:8000; report_incident_with_image() POSTs the multipart form here.
API_URL = "http://localhost:8000/api/incidents/classify"
def get_token():
    """Authenticate against the local API and return a JWT access token.

    Posts the hard-coded test credentials as JSON to the login endpoint on
    localhost:8000 and prints the outcome.

    Returns:
        The ``access_token`` string from the login response, or ``None`` when
        the request raises, the server answers with a non-200 status, or the
        response body carries no token. This function never raises.
    """
    login_url = "http://localhost:8000/api/auth/login"
    login_data = {
        "email": "testuser@example.com",
        "password": "Password123!",
    }
    try:
        # json= serialises the payload and sets the JSON Content-Type header;
        # the timeout keeps the script from hanging on an unresponsive server.
        response = requests.post(login_url, json=login_data, timeout=30)
        if response.status_code != 200:
            print(f"Authentication failed with status code {response.status_code}")
            print(response.text)
            return None

        token = response.json().get("access_token")
        if not token:
            # Without this guard, slicing None below would surface only as an
            # opaque "'NoneType' object is not subscriptable" message.
            print("Authentication response did not contain an access token")
            return None

        # Only a short prefix is echoed so the full token stays out of logs.
        print(f"Authentication successful. Token: {token[:15]}...")
        return token
    except Exception as e:
        # Deliberately broad: callers rely on a None return, not an exception,
        # for every failure mode (connection error, bad JSON, timeout, ...).
        print(f"Error getting token: {e}")
        return None
def create_test_image():
    """Write a placeholder "image" under ./test_files and return its path.

    The file carries a .jpg name but holds plain text; per the original
    author's note, the extension is there so that Cloudinary attempts to
    process the upload as an image.

    Returns:
        pathlib.Path: relative path of the newly written file.
    """
    # Ensure the scratch directory is available before writing into it.
    output_dir = Path("test_files")
    output_dir.mkdir(exist_ok=True)

    # A random four-digit suffix keeps repeated runs from reusing one name.
    file_name = f"test_image_{random.randint(1000, 9999)}.jpg"
    test_image_path = output_dir / file_name

    # Text payload only -- this is intentionally not valid JPEG data.
    test_image_path.write_text(
        "This is a fake image file for testing Cloudinary uploads"
    )

    print(f"Created test file at {test_image_path}")
    return test_image_path
def report_incident_with_image():
    """Report an incident with an attached image and verify the upload.

    Creates a temporary test file, authenticates, POSTs the incident as a
    multipart form to ``API_URL`` and, when the response contains an
    ``incident_id``, delegates to ``check_incident_image`` to confirm that
    the stored image path points at Cloudinary.

    Returns:
        bool: ``True`` when the report succeeded (and, if an incident id was
        returned, the Cloudinary check passed). ``True`` is also returned when
        the API answers 200 without an ``incident_id``, in which case the
        Cloudinary check is skipped. ``False`` on authentication failure, a
        non-200 response, or any exception.
    """
    image_path = create_test_image()
    try:
        # Token acquisition lives inside the try so that the finally clause
        # removes the test file even when authentication fails.
        token = get_token()
        if not token:
            print("No token available, cannot proceed with test")
            return False

        form_data = {
            "description": "Testing Cloudinary integration with improved logging",
            "name": "Cloudinary Test",
            "latitude": "37.7749",
            "longitude": "-122.4194",
        }

        # The file handle must stay open until the request has been sent.
        with open(image_path, "rb") as image_file:
            files = {"image": (image_path.name, image_file, "image/jpeg")}
            print(f"Sending request to {API_URL}")
            response = requests.post(
                API_URL,
                data=form_data,
                files=files,
                headers={"Authorization": f"Bearer {token}"},
                # Generous limit for the upload; prevents an indefinite hang.
                timeout=60,
            )

        if response.status_code != 200:
            print(f"API request failed with status code {response.status_code}")
            print(f"Response: {response.text}")
            return False

        print("Incident report successful!")
        payload = response.json()  # parse the body once and reuse it
        print(f"Response: {payload}")

        incident_id = payload.get("incident_id")
        print(f"Created incident with ID: {incident_id}")

        # Check the stored incident to see whether a Cloudinary URL was used.
        if incident_id:
            return check_incident_image(token, incident_id)
        return True
    except Exception as e:
        # Broad on purpose: this is the script's top-level test boundary.
        print(f"Error testing incident report API: {e}")
        return False
    finally:
        # Best-effort cleanup of the temporary test file.
        try:
            if image_path.exists():
                image_path.unlink()
                print(f"Removed test file: {image_path}")
        except Exception as e:
            print(f"Error cleaning up test file: {e}")
def check_incident_image(token, incident_id):
    """Check whether a reported incident's image path points at Cloudinary.

    Fetches the incident list from the local API, locates the entry whose
    ``id`` equals ``incident_id`` and tests whether its ``image_path``
    contains the substring ``"cloudinary.com"``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        incident_id: Identifier to look for in the returned incidents.

    Returns:
        bool: ``True`` only when the incident is found and its image path
        contains ``"cloudinary.com"``. ``False`` when the list request fails,
        the incident is missing, the path does not match, or any exception
        occurs.
    """
    list_url = "http://localhost:8000/api/incidents/list"
    try:
        print("Fetching incidents to check for Cloudinary URL...")
        response = requests.get(
            list_url,
            headers={"Authorization": f"Bearer {token}"},
            # Keeps the check from hanging on an unresponsive server.
            timeout=30,
        )
        if response.status_code != 200:
            print(f"Failed to fetch incidents list: {response.status_code}")
            print(response.text)
            return False

        # "or []" treats a null "incidents" value like an empty list.
        incidents = response.json().get("incidents") or []
        incident = next(
            (item for item in incidents if item.get("id") == incident_id),
            None,
        )
        if incident is None:
            print(f"Couldn't find incident with ID: {incident_id}")
            return False

        image_path = incident.get("image_path")
        print(f"Found incident. Image path: {image_path}")
        if image_path and "cloudinary.com" in image_path:
            print("✅ SUCCESS: Image was uploaded to Cloudinary!")
            return True

        print("❌ FAILED: Image was not uploaded to Cloudinary")
        return False
    except Exception as e:
        # Broad on purpose: any failure counts as a failed check.
        print(f"Error checking incident: {e}")
        return False
if __name__ == "__main__":
    # Run the end-to-end check and print the verdict.
    print("Testing Marine Guard incident reporting with Cloudinary integration")
    success = report_incident_with_image()
    print(f"Test {'PASSED' if success else 'FAILED'}")
    # Exit non-zero on failure so shells and CI can detect a failed run.
    raise SystemExit(0 if success else 1)