from __future__ import annotations """FastAPI application factory and configuration.""" import logging import os import socket # --- HUGGING FACE SPACES IPV6 FIX --- # Force IPv4 only to prevent networking issues on broken IPv6 routes old_getaddrinfo = socket.getaddrinfo def new_getaddrinfo(*args, **kwargs): responses = old_getaddrinfo(*args, **kwargs) return [response for response in responses if response[0] == socket.AF_INET] socket.getaddrinfo = new_getaddrinfo # ------------------------------------ import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from loguru import logger from config.logging_config import configure_logging from config.settings import get_settings from providers.exceptions import ProviderError from .dependencies import cleanup_provider from .routes import router # Opt-in to future behavior for python-telegram-bot os.environ["PTB_TIMEDELTA"] = "1" # Set tiktoken cache to /tmp on Vercel (read-only filesystem workaround) if os.environ.get("VERCEL"): os.environ["TIKTOKEN_CACHE_DIR"] = "/tmp" # Configure logging first (before any module logs) _settings = get_settings() configure_logging(_settings.log_file) _SHUTDOWN_TIMEOUT_S = 5.0 async def _best_effort( name: str, awaitable, timeout_s: float = _SHUTDOWN_TIMEOUT_S ) -> None: """Run a shutdown step with timeout; never raise to callers.""" try: await asyncio.wait_for(awaitable, timeout=timeout_s) except TimeoutError: logger.warning(f"Shutdown step timed out: {name} ({timeout_s}s)") except Exception as e: logger.warning(f"Shutdown step failed: {name}: {type(e).__name__}: {e}") def _warn_if_process_auth_token(settings) -> None: """Warn when server auth was implicitly inherited from the shell.""" uses_process_token = getattr(settings, "uses_process_anthropic_auth_token", None) if callable(uses_process_token) and uses_process_token(): logger.warning( "ANTHROPIC_AUTH_TOKEN is set in the process environment but not in " "a configured .env file. The proxy will require that token. Add " "ANTHROPIC_AUTH_TOKEN= to .env to disable proxy auth, or set the " "same token in .env to make server auth explicit." ) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" settings = get_settings() # On Vercel, we still initialize the platform but skip polling (handled in platform start) if os.environ.get("VERCEL"): logger.info("Running on Vercel: Initializing platform for webhook support.") logger.info("Starting Claude Code Proxy...") _warn_if_process_auth_token(settings) # Initialize messaging platform if configured messaging_platform = None message_handler = None cli_manager = None try: # Use the messaging factory to create the right platform from messaging.platforms.factory import create_messaging_platform logger.info(f"Platform type: {settings.messaging_platform}") token_preview = f"{settings.telegram_bot_token[:5]}...{settings.telegram_bot_token[-5:]}" if settings.telegram_bot_token else "MISSING" logger.info(f"Telegram Token: {token_preview}") messaging_platform = create_messaging_platform( platform_type=settings.messaging_platform, bot_token=settings.telegram_bot_token, allowed_user_id=settings.allowed_telegram_user_id, discord_bot_token=settings.discord_bot_token, allowed_discord_channels=settings.allowed_discord_channels, ) if messaging_platform: from cli.manager import CLISessionManager from messaging.handler import ClaudeMessageHandler from messaging.session import SessionStore # Setup workspace - CLI runs in allowed_dir if set (e.g. project root) workspace = ( os.path.abspath(settings.allowed_dir) if settings.allowed_dir else os.getcwd() ) # Use /tmp on Vercel for any writeable operations if os.environ.get("VERCEL"): workspace = "/tmp" logger.info(f"Vercel: Using {workspace} as workspace") else: os.makedirs(workspace, exist_ok=True) # Session data stored in agent_workspace data_path = ( "/tmp/agent_workspace" if os.environ.get("VERCEL") else os.path.abspath(settings.claude_workspace) ) os.makedirs(data_path, exist_ok=True) # Hugging Face Spaces typically use port 7860, and our Dockerfile forces 7860. actual_port = 7860 if os.environ.get("SPACE_ID") else settings.port api_url = f"http://{settings.host}:{actual_port}/v1" allowed_dirs = [workspace] if settings.allowed_dir else [] plans_dir_abs = os.path.abspath( os.path.join(settings.claude_workspace, "plans") ) plans_directory = os.path.relpath(plans_dir_abs, workspace) cli_manager = CLISessionManager( workspace_path=workspace, api_url=api_url, allowed_dirs=allowed_dirs, plans_directory=plans_directory, ) # Initialize session store session_store = SessionStore( storage_path=os.path.join(data_path, "sessions.json") ) # Create and register message handler message_handler = ClaudeMessageHandler( platform=messaging_platform, cli_manager=cli_manager, session_store=session_store, ) # Restore tree state if available saved_trees = session_store.get_all_trees() if saved_trees: logger.info(f"Restoring {len(saved_trees)} conversation trees...") from messaging.trees.queue_manager import TreeQueueManager message_handler.replace_tree_queue( TreeQueueManager.from_dict( { "trees": saved_trees, "node_to_tree": session_store.get_node_mapping(), }, queue_update_callback=message_handler.update_queue_positions, node_started_callback=message_handler.mark_node_processing, ) ) # Reconcile restored state - anything PENDING/IN_PROGRESS is lost across restart if message_handler.tree_queue.cleanup_stale_nodes() > 0: # Sync back and save tree_data = message_handler.tree_queue.to_dict() session_store.sync_from_tree_data( tree_data["trees"], tree_data["node_to_tree"] ) # Wire up the handler messaging_platform.on_message(message_handler.handle_message) # Start the platform await messaging_platform.start() logger.info( f"{messaging_platform.name} platform started with message handler" ) except ImportError as e: logger.warning(f"Messaging module import error: {e}") except Exception as e: logger.error(f"Failed to start messaging platform: {e}") import traceback logger.error(traceback.format_exc()) # Store in app state for access in routes app.state.messaging_platform = messaging_platform app.state.message_handler = message_handler app.state.cli_manager = cli_manager yield # Cleanup if message_handler and hasattr(message_handler, "session_store"): try: message_handler.session_store.flush_pending_save() except Exception as e: logger.warning(f"Session store flush on shutdown: {e}") logger.info("Shutdown requested, cleaning up...") if messaging_platform: await _best_effort("messaging_platform.stop", messaging_platform.stop()) if cli_manager: await _best_effort("cli_manager.stop_all", cli_manager.stop_all()) await _best_effort("cleanup_provider", cleanup_provider()) # Ensure background limiter worker doesn't keep the loop alive. try: from messaging.limiter import MessagingRateLimiter await _best_effort( "MessagingRateLimiter.shutdown_instance", MessagingRateLimiter.shutdown_instance(), timeout_s=2.0, ) except Exception: # Limiter may never have been imported/initialized. pass logger.info("Server shut down cleanly") def create_app() -> FastAPI: """Create and configure the FastAPI application.""" app = FastAPI( title="Claude Code Proxy", version="2.0.0", lifespan=lifespan, ) # Register routes app.include_router(router) # Exception handlers @app.exception_handler(ProviderError) async def provider_error_handler(request: Request, exc: ProviderError): """Handle provider-specific errors and return Anthropic format.""" logger.error(f"Provider Error: {exc.error_type} - {exc.message}") return JSONResponse( status_code=exc.status_code, content=exc.to_anthropic_format(), ) @app.exception_handler(Exception) async def general_error_handler(request: Request, exc: Exception): """Handle general errors and return Anthropic format.""" logger.error(f"General Error: {exc!s}") import traceback logger.error(traceback.format_exc()) return JSONResponse( status_code=500, content={ "type": "error", "error": { "type": "api_error", "message": "An unexpected error occurred.", }, }, ) return app # Default app instance for uvicorn app = create_app()