Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Error handling utilities for HuggingClaw - Cain. | |
| Provides specific exception handlers for common operations. | |
| NOTE: This module is intentionally decoupled from app.py to avoid circular imports. | |
| Exception handlers are registered via register_error_handlers(app) function. | |
| """ | |
| import json | |
| import os | |
| import sys | |
| import logging | |
| import traceback | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import JSONResponse | |
| # Dedicated error logger for exception tracking | |
| _error_log_path = "/app/logs/cain_errors.log" # Writable in Docker container | |
| try: | |
| error_handler = logging.FileHandler(_error_log_path) | |
| error_handler.setFormatter(logging.Formatter( | |
| "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s" | |
| )) | |
| except (PermissionError, FileNotFoundError): | |
| error_handler = logging.StreamHandler(sys.stdout) | |
| error_handler.setFormatter(logging.Formatter( | |
| "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s" | |
| )) | |
| error_logger = logging.getLogger("cain.errors") | |
| error_logger.setLevel(logging.ERROR) | |
| error_logger.addHandler(error_handler) | |
| def log_exception(exc: Exception, request: Request = None) -> None: | |
| """ | |
| Log exception with timestamp and stack trace to error log. | |
| Args: | |
| exc: The exception to log. | |
| request: Optional request context for additional info. | |
| """ | |
| timestamp = datetime.utcnow().isoformat() + "+00:00" | |
| exc_type = type(exc).__name__ | |
| exc_msg = str(exc) | |
| exc_tb = traceback.format_exc() | |
| log_entry = { | |
| "timestamp": timestamp, | |
| "exception_type": exc_type, | |
| "message": exc_msg, | |
| "stack_trace": exc_tb, | |
| } | |
| if request: | |
| log_entry.update({ | |
| "request_method": request.method, | |
| "request_path": request.url.path, | |
| "client_ip": request.client.host if request.client else "unknown", | |
| }) | |
| error_logger.error(f"Exception logged: {json.dumps(log_entry, indent=2)}") | |
| async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: | |
| """ | |
| FastAPI exception handler for HTTPException. | |
| Args: | |
| request: The incoming request. | |
| exc: The HTTPException that was raised. | |
| Returns: | |
| JSONResponse with error details. | |
| """ | |
| log_exception(exc, request) | |
| return JSONResponse( | |
| status_code=exc.status_code, | |
| content={ | |
| "error": True, | |
| "message": exc.detail, | |
| "status_code": exc.status_code, | |
| "agent": "cain", | |
| "timestamp": datetime.utcnow().isoformat() + "+00:00", | |
| }, | |
| ) | |
| async def value_error_handler(request: Request, exc: ValueError) -> JSONResponse: | |
| """ | |
| FastAPI exception handler for ValueError. | |
| Args: | |
| request: The incoming request. | |
| exc: The ValueError that was raised. | |
| Returns: | |
| JSONResponse with error details. | |
| """ | |
| log_exception(exc, request) | |
| return JSONResponse( | |
| status_code=400, | |
| content={ | |
| "error": True, | |
| "message": "Invalid value provided", | |
| "detail": str(exc), | |
| "type": "ValueError", | |
| "agent": "cain", | |
| "timestamp": datetime.utcnow().isoformat() + "+00:00", | |
| }, | |
| ) | |
| async def generic_exception_handler(request: Request, exc: Exception) -> JSONResponse: | |
| """ | |
| FastAPI exception handler for generic exceptions. | |
| Args: | |
| request: The incoming request. | |
| exc: The Exception that was raised. | |
| Returns: | |
| JSONResponse with error details. | |
| """ | |
| log_exception(exc, request) | |
| return JSONResponse( | |
| status_code=500, | |
| content={ | |
| "error": True, | |
| "message": "Internal server error", | |
| "detail": str(exc), | |
| "type": type(exc).__name__, | |
| "agent": "cain", | |
| "timestamp": datetime.utcnow().isoformat() + "+00:00", | |
| }, | |
| ) | |
| # Configuration path - cain_status.json location | |
| def _get_base_dir() -> Path: | |
| """Determine the correct base directory dynamically.""" | |
| # Priority 1: OPENCLAW_DATA_DIR (set by Docker environment) | |
| data_dir = os.environ.get('OPENCLAW_DATA_DIR') | |
| if data_dir: | |
| return Path(data_dir) | |
| # Priority 2: /data (standard Docker volume mount) | |
| data_path = Path("/data") | |
| if data_path.exists(): | |
| return data_path | |
| # Priority 3: Use this script's directory as fallback | |
| return Path(os.path.dirname(__file__)) | |
| def _resolve_status_file() -> Path: | |
| """Find cain_status.json by searching common locations.""" | |
| # Priority 1: CAIN_STATUS_PATH env var (set by app.py from OPENCLAW_DATA_DIR) | |
| env_path = os.environ.get('CAIN_STATUS_PATH') | |
| if env_path: | |
| return Path(env_path) | |
| # Priority 2: CAIN_STATUS_FILE env var (legacy) | |
| env_path = os.environ.get('CAIN_STATUS_FILE') | |
| if env_path: | |
| return Path(env_path) | |
| # Priority 3: OPENCLAW_DATA_DIR + cain_status.json | |
| data_dir = os.environ.get('OPENCLAW_DATA_DIR') | |
| if data_dir: | |
| data_path = Path(data_dir) / "cain_status.json" | |
| if data_path.exists(): | |
| return data_path | |
| # Priority 4: /data/cain_status.json (Docker volume mount, default) | |
| data_path = Path("/data/cain_status.json") | |
| if data_path.exists(): | |
| return data_path | |
| # Priority 5: /app/.openclaw/agents/cain_status.json OR /app/openclaw/.openclaw/agents/cain_status.json | |
| _script_dir = Path(os.path.abspath(os.path.dirname(__file__))) | |
| _possible_status_paths = [ | |
| Path("/app/.openclaw/agents/cain_status.json"), # Flat structure | |
| Path("/app/openclaw/.openclaw/agents/cain_status.json"), # Nested structure | |
| _script_dir / ".openclaw" / "agents" / "cain_status.json", | |
| _script_dir / "openclaw" / ".openclaw" / "agents" / "cain_status.json", | |
| ] | |
| for openclaw_status in _possible_status_paths: | |
| if openclaw_status.exists(): | |
| return openclaw_status | |
| # Priority 6: /app/cain_status.json (legacy location) | |
| app_path = Path("/app/cain_status.json") | |
| if app_path.parent.exists(): | |
| return app_path | |
| # Priority 6.5: Memory directory paths (CRITICAL: can have stale 'unknown' errors) | |
| _script_dir = Path(os.path.abspath(os.path.dirname(__file__))) | |
| _memory_paths = [ | |
| Path("/app/memory/cain_status.json"), | |
| Path("/data/memory/cain_status.json"), | |
| _script_dir / "memory" / "cain_status.json", | |
| ] | |
| for mem_path in _memory_paths: | |
| if mem_path.exists(): | |
| return mem_path | |
| # Priority 7: Script directory as final fallback | |
| script_dir = Path(os.path.dirname(__file__)) | |
| return script_dir / "cain_status.json" | |
| STATUS_FILE = _resolve_status_file() | |
| def handle_status_file_read() -> Dict[str, Any]: | |
| """ | |
| Handle reading Cain's status file with specific exception handling. | |
| Returns: | |
| Status dictionary with current_state, last_updated, and agent fields. | |
| """ | |
| try: | |
| with open(str(STATUS_FILE), "r") as f: | |
| data = json.load(f) | |
| # CRITICAL: Clean stale "unknown" error strings immediately after reading | |
| # This prevents "Error: unknown" display issues | |
| error = data.get('error') | |
| if isinstance(error, str) and error.strip().lower() in ('unknown', 'none', 'null', ''): | |
| data['error'] = None | |
| data['_cleaned_at'] = 'handle_status_file_read' | |
| # Write back the cleaned data | |
| try: | |
| with open(str(STATUS_FILE), "w") as f: | |
| json.dump(data, f, indent=2) | |
| except Exception: | |
| pass # Don't fail if we can't write back | |
| return data | |
| except FileNotFoundError: | |
| # Status file not found is not an error - return healthy default | |
| return { | |
| "current_state": "idle", | |
| "stage": "STATUS_FILE_NOT_FOUND", | |
| "last_updated": datetime.utcnow().isoformat() + "+00:00", | |
| "agent": "cain", | |
| "error": None, | |
| "status_file": str(STATUS_FILE), | |
| "note": "Status file not found - using default" | |
| } | |
| except PermissionError as e: | |
| return { | |
| "current_state": "error", | |
| "stage": "PERMISSION_ERROR", | |
| "last_updated": datetime.utcnow().isoformat() + "+00:00", | |
| "agent": "cain", | |
| "error": f"Permission denied reading status file: {str(e)}", | |
| "status_file": str(STATUS_FILE) | |
| } | |
| except json.JSONDecodeError as e: | |
| return { | |
| "current_state": "error", | |
| "stage": "JSON_DECODE_ERROR", | |
| "last_updated": datetime.utcnow().isoformat() + "+00:00", | |
| "agent": "cain", | |
| "error": f"Invalid JSON in status file: {str(e)}", | |
| "status_file": str(STATUS_FILE) | |
| } | |
| def handle_brain_response(message: str) -> str: | |
| """ | |
| Handle brain processing with specific exception handling. | |
| Args: | |
| message: The user message to process. | |
| Returns: | |
| Response string from the brain or error message. | |
| Raises: | |
| ImportError: If brain module cannot be imported. | |
| AttributeError: If required brain methods are missing. | |
| """ | |
| try: | |
| # sys.path is already set up at module level (see top of file) | |
| # Import agents directly - no need to import openclaw first | |
| from agents import brain_minimal | |
| # Verify brain has required method before calling | |
| if not hasattr(brain_minimal, 'get_brain'): | |
| return "Brain interface error: get_brain method not found" | |
| brain = brain_minimal.get_brain(agent_name="cain", legacy_mode=True) | |
| result = brain._conversation_process(message) | |
| if result.get("success"): | |
| return result.get("response", f"Processed: {message}") | |
| else: | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except ImportError as e: | |
| return f"Brain module import error: {str(e)}" | |
| except AttributeError as e: | |
| return f"Brain interface error: {str(e)}" | |
| except KeyError as e: | |
| return f"Brain response format error: missing key {str(e)}" | |
| except Exception as e: | |
| return f"Brain processing error: {type(e).__name__}: {str(e)}" | |
| async def handle_websocket_send(websocket, status_data: dict) -> bool: | |
| """ | |
| Handle websocket send with specific exception handling. | |
| Args: | |
| websocket: The WebSocket connection object. | |
| status_data: Status data to send. | |
| Returns: | |
| True if send succeeded, False if connection should close. | |
| """ | |
| try: | |
| await websocket.send_json({ | |
| "type": "heartbeat", | |
| "status": status_data | |
| }) | |
| return True | |
| except (ConnectionError, RuntimeError, Exception): | |
| return False | |
| def write_cain_status(status_data: Dict[str, Any]) -> bool: | |
| """ | |
| Write Cain's status to the status file. | |
| Args: | |
| status_data: Dictionary with status information to write. | |
| Returns: | |
| True if write succeeded, False otherwise. | |
| """ | |
| try: | |
| # Ensure directory exists | |
| STATUS_FILE.parent.mkdir(parents=True, exist_ok=True) | |
| # Add timestamp if not present | |
| if "last_updated" not in status_data: | |
| status_data["last_updated"] = datetime.utcnow().isoformat() + "+00:00" | |
| # Write to file | |
| with open(str(STATUS_FILE), "w") as f: | |
| json.dump(status_data, f, indent=2) | |
| return True | |
| except (PermissionError, OSError, json.JSONDecodeError) as e: | |
| print(f"Error writing status file: {e}") | |
| return False | |
| def register_error_handlers(app: FastAPI) -> None: | |
| """ | |
| Register exception handlers with the FastAPI app. | |
| This function-based registration avoids circular imports by not importing | |
| app at module level. Call this after creating the FastAPI app instance. | |
| Args: | |
| app: The FastAPI application instance. | |
| """ | |
| app.add_exception_handler(HTTPException, http_exception_handler) | |
| app.add_exception_handler(ValueError, value_error_handler) | |
| app.add_exception_handler(Exception, generic_exception_handler) | |