cpu / cloudflare_client.py
Nhughes09
Modular architecture: logging_config.py, cloudflare_client.py, app.py for Cloudflare AI
e54f4cc
# cloudflare_client.py - Cloudflare Workers AI API Client
import requests
import json
from datetime import datetime
class CloudflareAIClient:
    """Client for calling Cloudflare Workers AI endpoints.

    Every request is logged through the supplied logger, and simple counters
    (requests, successes, errors, last error message) are kept on the
    instance and exposed via :meth:`get_stats`.
    """

    # Keys probed, in this order, for the model's text in a JSON object body.
    _REPLY_KEYS = ("reply", "response", "message")

    # HTTP status -> (issue label, suggestions). Stored as tuples so the
    # shared table cannot be mutated through a returned diagnosis.
    _AUTH_DIAGNOSIS = (
        "AUTHENTICATION_ERROR",
        ("Check Cloudflare API token", "Verify Worker permissions"),
    )
    _STATUS_DIAGNOSES = {
        400: (
            "BAD_REQUEST",
            (
                "Check the request payload format",
                "Ensure 'prompt' field is present",
            ),
        ),
        401: _AUTH_DIAGNOSIS,
        403: _AUTH_DIAGNOSIS,
        404: (
            "NOT_FOUND",
            ("Worker endpoint does not exist", "Check the URL is correct"),
        ),
        500: (
            "SERVER_ERROR",
            (
                "Check Worker logs in Cloudflare dashboard",
                "Verify AI binding is configured",
            ),
        ),
    }

    def __init__(self, logger, endpoint_url, *, timeout=60):
        """Create a client bound to one endpoint.

        Args:
            logger: Object with ``info``/``debug``/``warning``/``error``
                methods (e.g. a ``logging.Logger``).
            endpoint_url: URL that requests are sent to.
            timeout: Timeout in seconds passed to ``requests.post`` in
                :meth:`call_ai`. Defaults to 60.
        """
        self.logger = logger
        self.endpoint_url = endpoint_url
        self.timeout = timeout
        self.request_count = 0
        self.success_count = 0
        self.error_count = 0
        self.last_error = None
        self.logger.info("CloudflareAIClient initialized")
        self.logger.info(f"Endpoint: {endpoint_url}")

    def get_stats(self):
        """Return client statistics as a dict of counters and the last error."""
        return {
            "requests": self.request_count,
            "successes": self.success_count,
            "errors": self.error_count,
            "last_error": self.last_error
        }

    def test_connection(self):
        """Test if the endpoint is reachable.

        Sends a GET with a 10 second timeout. Returns True when the status
        code is 200 or 405, False for any other status or on any exception.
        """
        self.logger.info("Testing connection to Cloudflare endpoint...")
        try:
            response = requests.get(self.endpoint_url, timeout=10)
            self.logger.info(f"Connection test: Status {response.status_code}")
            self.logger.debug(f"Response: {response.text[:200]}")
            # 405 = method not allowed but reachable
            return response.status_code in (200, 405)
        except Exception as e:
            # Deliberately broad: this probe reports failure, it never raises.
            self.logger.error(f"Connection test failed: {e}")
            return False

    def call_ai(self, prompt):
        """Call the Cloudflare Workers AI with a prompt.

        Args:
            prompt: The user's message/prompt (a string).

        Returns:
            dict with 'success', 'response' or 'error', and 'debug_info'.
            Non-200 responses additionally carry a 'diagnosis' dict.
            Exceptions raised while sending or processing the request are
            caught and reported as ``success: False`` results.
        """
        self.request_count += 1
        request_id = f"REQ-{self.request_count:04d}"
        self.logger.info("-" * 50)
        self.logger.info(f"[{request_id}] NEW AI REQUEST")
        self.logger.info(f"[{request_id}] Prompt: {prompt[:100]}...")
        self.logger.info(f"[{request_id}] Endpoint: {self.endpoint_url}")

        debug_info = {
            "request_id": request_id,
            "timestamp": datetime.now().isoformat(),
            "endpoint": self.endpoint_url,
            "prompt_length": len(prompt)
        }
        payload = {"prompt": prompt}
        self.logger.debug(f"[{request_id}] Payload: {json.dumps(payload)}")

        try:
            self.logger.info(f"[{request_id}] Sending POST request...")
            response = requests.post(
                self.endpoint_url,
                headers={"Content-Type": "application/json"},
                json=payload,
                timeout=self.timeout,
            )
            debug_info["status_code"] = response.status_code
            debug_info["response_time_estimate"] = f"< {self.timeout}s"
            self.logger.info(f"[{request_id}] Response Status: {response.status_code}")
            self.logger.debug(f"[{request_id}] Response Headers: {dict(response.headers)}")

            if response.status_code != 200:
                error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
                self._record_error(error_msg)
                self.logger.error(f"[{request_id}] API ERROR: {error_msg}")
                debug_info["error"] = error_msg
                # Provide helpful diagnostics based on error code
                diagnosis = self._diagnose_error(response.status_code, response.text)
                debug_info["diagnosis"] = diagnosis
                return {
                    "success": False,
                    "error": error_msg,
                    "diagnosis": diagnosis,
                    "debug_info": debug_info
                }

            try:
                result = response.json()
            except ValueError as e:
                # ValueError is the common base of json.JSONDecodeError and of
                # the decode errors requests/simplejson may raise instead.
                parse_error = f"JSON parse error: {e}"
                self._record_error(parse_error)
                self.logger.error(f"[{request_id}] JSON parse error: {e}")
                debug_info["error"] = parse_error
                return {
                    "success": False,
                    "error": f"Invalid JSON response: {response.text[:200]}",
                    "debug_info": debug_info
                }

            self.logger.info(f"[{request_id}] SUCCESS: Got JSON response")
            self.logger.debug(f"[{request_id}] Response: {json.dumps(result)[:300]}")

            # Extract the reply from the response
            reply = self._extract_reply(result)
            if reply:
                # str() so a non-string reply (dict, number) can still be previewed.
                self.logger.info(f"[{request_id}] AI Reply: {str(reply)[:100]}...")
                self.success_count += 1
                return {
                    "success": True,
                    "response": reply,
                    "debug_info": debug_info
                }

            # Still a successful call: hand back the raw body as text.
            self.logger.warning(f"[{request_id}] No reply field in response")
            self.success_count += 1
            return {
                "success": True,
                "response": str(result),
                "debug_info": debug_info
            }
        except requests.exceptions.Timeout:
            self._record_error("Request timeout")
            self.logger.error(f"[{request_id}] TIMEOUT after {self.timeout} seconds")
            error_msg = f"Request timed out after {self.timeout} seconds"
            debug_info["error"] = error_msg
            return {
                "success": False,
                "error": error_msg,
                "debug_info": debug_info
            }
        except requests.exceptions.ConnectionError as e:
            self._record_error(str(e))
            self.logger.error(f"[{request_id}] CONNECTION ERROR: {e}")
            error_msg = f"Connection error: {e}"
            debug_info["error"] = error_msg
            return {
                "success": False,
                "error": error_msg,
                "debug_info": debug_info
            }
        except Exception as e:
            # Boundary catch-all: callers rely on a result dict, not a raise.
            self._record_error(str(e))
            self.logger.error(f"[{request_id}] UNEXPECTED ERROR: {e}")
            debug_info["error"] = str(e)
            return {
                "success": False,
                "error": str(e),
                "debug_info": debug_info
            }

    def _record_error(self, message):
        """Count one failed request and remember its message as last_error."""
        self.error_count += 1
        self.last_error = message

    @classmethod
    def _extract_reply(cls, result):
        """Return the first truthy reply field of a parsed JSON body, else None.

        Bodies that are not JSON objects (lists, strings, numbers) have no
        fields to probe and yield None.
        """
        if not isinstance(result, dict):
            return None
        for key in cls._REPLY_KEYS:
            value = result.get(key)
            if value:
                return value
        return None

    def _diagnose_error(self, status_code, response_text):
        """Diagnose common errors and provide helpful suggestions.

        Args:
            status_code: HTTP status code of the failed response.
            response_text: Response body text; None is treated as empty.

        Returns:
            dict with 'code', 'suggestions' (a fresh list) and 'issue'.
            Each suggestion is also logged at warning level.
        """
        if "1042" in (response_text or ""):
            # A body mentioning 1042 takes precedence over the status code.
            issue = "WORKER_NOT_DEPLOYED"
            suggestions = [
                "The Cloudflare Worker is not deployed yet",
                "Push the worker code to GitHub to trigger deployment",
                "Check the Cloudflare dashboard for build errors"
            ]
        elif status_code in self._STATUS_DIAGNOSES:
            issue, known = self._STATUS_DIAGNOSES[status_code]
            suggestions = list(known)
        else:
            issue = f"UNKNOWN_ERROR_{status_code}"
            suggestions = ["Check Cloudflare dashboard for details"]

        diagnosis = {"code": status_code, "suggestions": suggestions, "issue": issue}
        for suggestion in suggestions:
            self.logger.warning(f"SUGGESTION: {suggestion}")
        return diagnosis