# cloudflare_client.py - Cloudflare Workers AI API Client
import json
from datetime import datetime

import requests


class CloudflareAIClient:
    """Client for calling Cloudflare Workers AI endpoints.

    The client POSTs ``{"prompt": ...}`` as JSON to a single endpoint URL and
    keeps simple counters about the calls it made.  Every call to
    :meth:`call_ai` ends up counted exactly once as either a success or an
    error, so ``requests == successes + errors`` always holds.

    Attributes:
        logger: Logger-like object providing ``info``, ``debug``, ``warning``
            and ``error`` methods.
        endpoint_url: URL of the Worker endpoint.
        timeout: Timeout in seconds passed to ``requests.post``.
        request_count: Number of :meth:`call_ai` invocations.
        success_count: Number of invocations that returned ``success: True``.
        error_count: Number of invocations that returned ``success: False``.
        last_error: Message of the most recent failure, or ``None``.
    """

    # Default timeout (seconds) for AI requests.
    DEFAULT_TIMEOUT = 60

    # Timeout (seconds) for the lightweight reachability probe.
    CONNECTION_TEST_TIMEOUT = 10

    # HTTP status code -> (issue name, suggestions) used by _diagnose_error.
    _STATUS_DIAGNOSES = {
        400: (
            "BAD_REQUEST",
            (
                "Check the request payload format",
                "Ensure 'prompt' field is present",
            ),
        ),
        401: (
            "AUTHENTICATION_ERROR",
            (
                "Check Cloudflare API token",
                "Verify Worker permissions",
            ),
        ),
        403: (
            "AUTHENTICATION_ERROR",
            (
                "Check Cloudflare API token",
                "Verify Worker permissions",
            ),
        ),
        404: (
            "NOT_FOUND",
            (
                "Worker endpoint does not exist",
                "Check the URL is correct",
            ),
        ),
        500: (
            "SERVER_ERROR",
            (
                "Check Worker logs in Cloudflare dashboard",
                "Verify AI binding is configured",
            ),
        ),
    }

    def __init__(self, logger, endpoint_url, timeout=DEFAULT_TIMEOUT):
        """Create a client bound to one endpoint.

        Args:
            logger: Logger-like object used for all diagnostics.
            endpoint_url: URL of the Worker endpoint to call.
            timeout: Timeout in seconds for AI requests (default 60).
        """
        self.logger = logger
        self.endpoint_url = endpoint_url
        self.timeout = timeout
        self.request_count = 0
        self.success_count = 0
        self.error_count = 0
        self.last_error = None
        self.logger.info("CloudflareAIClient initialized")
        self.logger.info(f"Endpoint: {endpoint_url}")

    def get_stats(self):
        """Return client statistics.

        Returns:
            dict with the keys ``requests``, ``successes``, ``errors`` and
            ``last_error``.
        """
        return {
            "requests": self.request_count,
            "successes": self.success_count,
            "errors": self.error_count,
            "last_error": self.last_error,
        }

    def test_connection(self):
        """Test if the endpoint is reachable.

        Sends a GET request to the endpoint.  The counters reported by
        :meth:`get_stats` are not affected.

        Returns:
            True when the endpoint answers with status 200 or 405, False for
            any other status or when the request fails.
        """
        self.logger.info("Testing connection to Cloudflare endpoint...")
        try:
            response = requests.get(
                self.endpoint_url, timeout=self.CONNECTION_TEST_TIMEOUT
            )
            self.logger.info(f"Connection test: Status {response.status_code}")
            self.logger.debug(f"Response: {response.text[:200]}")
            # 405 = method not allowed but reachable
            return response.status_code in (200, 405)
        except Exception as e:
            # Best-effort probe: any failure simply means "not reachable".
            self.logger.error(f"Connection test failed: {e}")
            return False

    def call_ai(self, prompt):
        """Call the Cloudflare Workers AI with a prompt.

        This method never raises; every failure is reported through the
        returned dict and recorded in the statistics.

        Args:
            prompt: The user's message/prompt (must be a ``str``).

        Returns:
            dict with 'success', 'response' or 'error', and 'debug_info'.
            HTTP error responses additionally carry a 'diagnosis' entry.
        """
        self.request_count += 1
        request_id = f"REQ-{self.request_count:04d}"

        self.logger.info("-" * 50)
        self.logger.info(f"[{request_id}] NEW AI REQUEST")

        debug_info = {
            "request_id": request_id,
            "timestamp": datetime.now().isoformat(),
            "endpoint": self.endpoint_url,
            "prompt_length": 0,
        }

        # Reject non-string prompts up front: slicing/len() below would raise
        # outside the try block and leave the statistics inconsistent.
        if not isinstance(prompt, str):
            error_msg = f"Prompt must be a string, got {type(prompt).__name__}"
            self._record_error(error_msg)
            self.logger.error(f"[{request_id}] INVALID PROMPT: {error_msg}")
            debug_info["error"] = error_msg
            return {"success": False, "error": error_msg, "debug_info": debug_info}

        debug_info["prompt_length"] = len(prompt)
        self.logger.info(f"[{request_id}] Prompt: {prompt[:100]}...")
        self.logger.info(f"[{request_id}] Endpoint: {self.endpoint_url}")

        payload = {"prompt": prompt}
        self.logger.debug(f"[{request_id}] Payload: {json.dumps(payload)}")

        try:
            self.logger.info(f"[{request_id}] Sending POST request...")
            response = requests.post(
                self.endpoint_url,
                headers={"Content-Type": "application/json"},
                json=payload,
                timeout=self.timeout,
            )

            debug_info["status_code"] = response.status_code
            debug_info["response_time_estimate"] = f"< {self.timeout}s"
            # Measured time between sending the request and parsing headers.
            debug_info["response_time_seconds"] = response.elapsed.total_seconds()

            self.logger.info(
                f"[{request_id}] Response Status: {response.status_code}"
            )
            self.logger.debug(
                f"[{request_id}] Response Headers: {dict(response.headers)}"
            )

            if response.status_code == 200:
                return self._handle_ok_response(request_id, response, debug_info)
            return self._handle_http_error(request_id, response, debug_info)

        except requests.exceptions.Timeout:
            error_msg = f"Request timed out after {self.timeout} seconds"
            self._record_error("Request timeout")
            self.logger.error(
                f"[{request_id}] TIMEOUT after {self.timeout} seconds"
            )
        except requests.exceptions.ConnectionError as e:
            error_msg = f"Connection error: {e}"
            self._record_error(str(e))
            self.logger.error(f"[{request_id}] CONNECTION ERROR: {e}")
        except Exception as e:
            # Boundary handler: callers rely on call_ai never raising.
            error_msg = str(e)
            self._record_error(error_msg)
            self.logger.error(f"[{request_id}] UNEXPECTED ERROR: {e}")

        debug_info["error"] = error_msg
        return {"success": False, "error": error_msg, "debug_info": debug_info}

    def _record_error(self, message):
        """Count one failed request and remember its message."""
        self.error_count += 1
        self.last_error = message

    def _handle_ok_response(self, request_id, response, debug_info):
        """Turn an HTTP 200 response into the call_ai result dict."""
        try:
            result = response.json()
        except ValueError as e:
            # json.JSONDecodeError and the decode errors raised by requests
            # are ValueError subclasses.
            detail = f"JSON parse error: {e}"
            error_msg = f"Invalid JSON response: {response.text[:200]}"
            self._record_error(error_msg)
            self.logger.error(f"[{request_id}] {detail}")
            debug_info["error"] = detail
            return {"success": False, "error": error_msg, "debug_info": debug_info}

        self.logger.info(f"[{request_id}] SUCCESS: Got JSON response")
        self.logger.debug(f"[{request_id}] Response: {json.dumps(result)[:300]}")

        # Extract the reply from the response; only JSON objects can carry one.
        reply = None
        if isinstance(result, dict):
            reply = (
                result.get("reply")
                or result.get("response")
                or result.get("message")
            )

        if reply:
            # str() keeps the preview safe when the reply is not a string.
            self.logger.info(f"[{request_id}] AI Reply: {str(reply)[:100]}...")
            answer = reply
        else:
            self.logger.warning(f"[{request_id}] No reply field in response")
            answer = str(result)

        # Count last, so a failure above can never be counted as both a
        # success and an error.
        self.success_count += 1
        return {"success": True, "response": answer, "debug_info": debug_info}

    def _handle_http_error(self, request_id, response, debug_info):
        """Turn a non-200 response into the call_ai failure dict."""
        error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
        self._record_error(error_msg)
        self.logger.error(f"[{request_id}] API ERROR: {error_msg}")
        debug_info["error"] = error_msg

        # Provide helpful diagnostics based on error code
        diagnosis = self._diagnose_error(response.status_code, response.text)
        debug_info["diagnosis"] = diagnosis

        return {
            "success": False,
            "error": error_msg,
            "diagnosis": diagnosis,
            "debug_info": debug_info,
        }

    def _diagnose_error(self, status_code, response_text):
        """Diagnose common errors and provide helpful suggestions.

        Each suggestion is also logged as a warning.

        Args:
            status_code: HTTP status code of the failed response.
            response_text: Body of the failed response (may be None).

        Returns:
            dict with 'code', 'issue' and 'suggestions'.
        """
        # A body mentioning "1042" takes precedence over the status code.
        if "1042" in (response_text or ""):
            issue = "WORKER_NOT_DEPLOYED"
            suggestions = (
                "The Cloudflare Worker is not deployed yet",
                "Push the worker code to GitHub to trigger deployment",
                "Check the Cloudflare dashboard for build errors",
            )
        elif status_code in self._STATUS_DIAGNOSES:
            issue, suggestions = self._STATUS_DIAGNOSES[status_code]
        else:
            issue = f"UNKNOWN_ERROR_{status_code}"
            suggestions = ("Check Cloudflare dashboard for details",)

        # Copy into a fresh list so callers cannot mutate the shared table.
        diagnosis = {
            "code": status_code,
            "suggestions": list(suggestions),
            "issue": issue,
        }

        for suggestion in diagnosis["suggestions"]:
            self.logger.warning(f"SUGGESTION: {suggestion}")

        return diagnosis