File size: 8,311 Bytes
e54f4cc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# cloudflare_client.py - Cloudflare Workers AI API Client
import requests
import json
from datetime import datetime
class CloudflareAIClient:
"""Client for calling Cloudflare Workers AI endpoints."""
def __init__(self, logger, endpoint_url):
self.logger = logger
self.endpoint_url = endpoint_url
self.request_count = 0
self.success_count = 0
self.error_count = 0
self.last_error = None
self.logger.info(f"CloudflareAIClient initialized")
self.logger.info(f"Endpoint: {endpoint_url}")
def get_stats(self):
"""Return client statistics."""
return {
"requests": self.request_count,
"successes": self.success_count,
"errors": self.error_count,
"last_error": self.last_error
}
def test_connection(self):
"""Test if the endpoint is reachable."""
self.logger.info("Testing connection to Cloudflare endpoint...")
try:
response = requests.get(self.endpoint_url, timeout=10)
self.logger.info(f"Connection test: Status {response.status_code}")
self.logger.debug(f"Response: {response.text[:200]}")
return response.status_code in [200, 405] # 405 = method not allowed but reachable
except Exception as e:
self.logger.error(f"Connection test failed: {e}")
return False
def call_ai(self, prompt):
"""
Call the Cloudflare Workers AI with a prompt.
Args:
prompt: The user's message/prompt
Returns:
dict with 'success', 'response' or 'error', and 'debug_info'
"""
self.request_count += 1
request_id = f"REQ-{self.request_count:04d}"
self.logger.info("-" * 50)
self.logger.info(f"[{request_id}] NEW AI REQUEST")
self.logger.info(f"[{request_id}] Prompt: {prompt[:100]}...")
self.logger.info(f"[{request_id}] Endpoint: {self.endpoint_url}")
debug_info = {
"request_id": request_id,
"timestamp": datetime.now().isoformat(),
"endpoint": self.endpoint_url,
"prompt_length": len(prompt)
}
payload = {"prompt": prompt}
self.logger.debug(f"[{request_id}] Payload: {json.dumps(payload)}")
try:
self.logger.info(f"[{request_id}] Sending POST request...")
response = requests.post(
self.endpoint_url,
headers={"Content-Type": "application/json"},
json=payload,
timeout=60
)
debug_info["status_code"] = response.status_code
debug_info["response_time_estimate"] = "< 60s"
self.logger.info(f"[{request_id}] Response Status: {response.status_code}")
self.logger.debug(f"[{request_id}] Response Headers: {dict(response.headers)}")
if response.status_code == 200:
try:
result = response.json()
self.logger.info(f"[{request_id}] SUCCESS: Got JSON response")
self.logger.debug(f"[{request_id}] Response: {json.dumps(result)[:300]}")
# Extract the reply from the response
reply = result.get("reply") or result.get("response") or result.get("message")
if reply:
self.success_count += 1
self.logger.info(f"[{request_id}] AI Reply: {reply[:100]}...")
return {
"success": True,
"response": reply,
"debug_info": debug_info
}
else:
self.logger.warning(f"[{request_id}] No reply field in response")
return {
"success": True,
"response": str(result),
"debug_info": debug_info
}
except json.JSONDecodeError as e:
self.logger.error(f"[{request_id}] JSON parse error: {e}")
debug_info["error"] = f"JSON parse error: {e}"
return {
"success": False,
"error": f"Invalid JSON response: {response.text[:200]}",
"debug_info": debug_info
}
else:
self.error_count += 1
error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
self.last_error = error_msg
self.logger.error(f"[{request_id}] API ERROR: {error_msg}")
debug_info["error"] = error_msg
# Provide helpful diagnostics based on error code
diagnosis = self._diagnose_error(response.status_code, response.text)
debug_info["diagnosis"] = diagnosis
return {
"success": False,
"error": error_msg,
"diagnosis": diagnosis,
"debug_info": debug_info
}
except requests.exceptions.Timeout:
self.error_count += 1
self.last_error = "Request timeout"
self.logger.error(f"[{request_id}] TIMEOUT after 60 seconds")
return {
"success": False,
"error": "Request timed out after 60 seconds",
"debug_info": debug_info
}
except requests.exceptions.ConnectionError as e:
self.error_count += 1
self.last_error = str(e)
self.logger.error(f"[{request_id}] CONNECTION ERROR: {e}")
return {
"success": False,
"error": f"Connection error: {e}",
"debug_info": debug_info
}
except Exception as e:
self.error_count += 1
self.last_error = str(e)
self.logger.error(f"[{request_id}] UNEXPECTED ERROR: {e}")
return {
"success": False,
"error": str(e),
"debug_info": debug_info
}
def _diagnose_error(self, status_code, response_text):
"""Diagnose common errors and provide helpful suggestions."""
diagnosis = {"code": status_code, "suggestions": []}
if "1042" in response_text:
diagnosis["issue"] = "WORKER_NOT_DEPLOYED"
diagnosis["suggestions"] = [
"The Cloudflare Worker is not deployed yet",
"Push the worker code to GitHub to trigger deployment",
"Check the Cloudflare dashboard for build errors"
]
elif status_code == 400:
diagnosis["issue"] = "BAD_REQUEST"
diagnosis["suggestions"] = [
"Check the request payload format",
"Ensure 'prompt' field is present"
]
elif status_code == 401 or status_code == 403:
diagnosis["issue"] = "AUTHENTICATION_ERROR"
diagnosis["suggestions"] = [
"Check Cloudflare API token",
"Verify Worker permissions"
]
elif status_code == 404:
diagnosis["issue"] = "NOT_FOUND"
diagnosis["suggestions"] = [
"Worker endpoint does not exist",
"Check the URL is correct"
]
elif status_code == 500:
diagnosis["issue"] = "SERVER_ERROR"
diagnosis["suggestions"] = [
"Check Worker logs in Cloudflare dashboard",
"Verify AI binding is configured"
]
else:
diagnosis["issue"] = f"UNKNOWN_ERROR_{status_code}"
diagnosis["suggestions"] = ["Check Cloudflare dashboard for details"]
for suggestion in diagnosis["suggestions"]:
self.logger.warning(f"SUGGESTION: {suggestion}")
return diagnosis
|