Spaces:
Running
Running
File size: 10,186 Bytes
f81adc4 1fe0b69 f81adc4 1fe0b69 a37d5b9 1fe0b69 a37d5b9 1fe0b69 f81adc4 5550d94 f81adc4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 | from __future__ import annotations
"""FastAPI application factory and configuration."""
import logging
import os
import socket
# --- HUGGING FACE SPACES IPV6 FIX ---
# Force IPv4 only to prevent networking issues on broken IPv6 routes
old_getaddrinfo = socket.getaddrinfo
def new_getaddrinfo(*args, **kwargs):
responses = old_getaddrinfo(*args, **kwargs)
return [response for response in responses if response[0] == socket.AF_INET]
socket.getaddrinfo = new_getaddrinfo
# ------------------------------------
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from loguru import logger
from config.logging_config import configure_logging
from config.settings import get_settings
from providers.exceptions import ProviderError
from .dependencies import cleanup_provider
from .routes import router
# Opt-in to future behavior for python-telegram-bot
os.environ["PTB_TIMEDELTA"] = "1"
# Set tiktoken cache to /tmp on Vercel (read-only filesystem workaround)
if os.environ.get("VERCEL"):
os.environ["TIKTOKEN_CACHE_DIR"] = "/tmp"
# Configure logging first (before any module logs)
_settings = get_settings()
configure_logging(_settings.log_file)
_SHUTDOWN_TIMEOUT_S = 5.0
async def _best_effort(
name: str, awaitable, timeout_s: float = _SHUTDOWN_TIMEOUT_S
) -> None:
"""Run a shutdown step with timeout; never raise to callers."""
try:
await asyncio.wait_for(awaitable, timeout=timeout_s)
except TimeoutError:
logger.warning(f"Shutdown step timed out: {name} ({timeout_s}s)")
except Exception as e:
logger.warning(f"Shutdown step failed: {name}: {type(e).__name__}: {e}")
def _warn_if_process_auth_token(settings) -> None:
"""Warn when server auth was implicitly inherited from the shell."""
uses_process_token = getattr(settings, "uses_process_anthropic_auth_token", None)
if callable(uses_process_token) and uses_process_token():
logger.warning(
"ANTHROPIC_AUTH_TOKEN is set in the process environment but not in "
"a configured .env file. The proxy will require that token. Add "
"ANTHROPIC_AUTH_TOKEN= to .env to disable proxy auth, or set the "
"same token in .env to make server auth explicit."
)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager."""
settings = get_settings()
# On Vercel, we still initialize the platform but skip polling (handled in platform start)
if os.environ.get("VERCEL"):
logger.info("Running on Vercel: Initializing platform for webhook support.")
logger.info("Starting Claude Code Proxy...")
_warn_if_process_auth_token(settings)
# Initialize messaging platform if configured
messaging_platform = None
message_handler = None
cli_manager = None
try:
# Use the messaging factory to create the right platform
from messaging.platforms.factory import create_messaging_platform
logger.info(f"Platform type: {settings.messaging_platform}")
token_preview = f"{settings.telegram_bot_token[:5]}...{settings.telegram_bot_token[-5:]}" if settings.telegram_bot_token else "MISSING"
logger.info(f"Telegram Token: {token_preview}")
messaging_platform = create_messaging_platform(
platform_type=settings.messaging_platform,
bot_token=settings.telegram_bot_token,
allowed_user_id=settings.allowed_telegram_user_id,
discord_bot_token=settings.discord_bot_token,
allowed_discord_channels=settings.allowed_discord_channels,
)
if messaging_platform:
from cli.manager import CLISessionManager
from messaging.handler import ClaudeMessageHandler
from messaging.session import SessionStore
# Setup workspace - CLI runs in allowed_dir if set (e.g. project root)
workspace = (
os.path.abspath(settings.allowed_dir)
if settings.allowed_dir
else os.getcwd()
)
# Use /tmp on Vercel for any writeable operations
if os.environ.get("VERCEL"):
workspace = "/tmp"
logger.info(f"Vercel: Using {workspace} as workspace")
else:
os.makedirs(workspace, exist_ok=True)
# Session data stored in agent_workspace
data_path = (
"/tmp/agent_workspace"
if os.environ.get("VERCEL")
else os.path.abspath(settings.claude_workspace)
)
os.makedirs(data_path, exist_ok=True)
# Hugging Face Spaces typically use port 7860, and our Dockerfile forces 7860.
actual_port = 7860 if os.environ.get("SPACE_ID") else settings.port
api_url = f"http://{settings.host}:{actual_port}/v1"
allowed_dirs = [workspace] if settings.allowed_dir else []
plans_dir_abs = os.path.abspath(
os.path.join(settings.claude_workspace, "plans")
)
plans_directory = os.path.relpath(plans_dir_abs, workspace)
cli_manager = CLISessionManager(
workspace_path=workspace,
api_url=api_url,
allowed_dirs=allowed_dirs,
plans_directory=plans_directory,
)
# Initialize session store
session_store = SessionStore(
storage_path=os.path.join(data_path, "sessions.json")
)
# Create and register message handler
message_handler = ClaudeMessageHandler(
platform=messaging_platform,
cli_manager=cli_manager,
session_store=session_store,
)
# Restore tree state if available
saved_trees = session_store.get_all_trees()
if saved_trees:
logger.info(f"Restoring {len(saved_trees)} conversation trees...")
from messaging.trees.queue_manager import TreeQueueManager
message_handler.replace_tree_queue(
TreeQueueManager.from_dict(
{
"trees": saved_trees,
"node_to_tree": session_store.get_node_mapping(),
},
queue_update_callback=message_handler.update_queue_positions,
node_started_callback=message_handler.mark_node_processing,
)
)
# Reconcile restored state - anything PENDING/IN_PROGRESS is lost across restart
if message_handler.tree_queue.cleanup_stale_nodes() > 0:
# Sync back and save
tree_data = message_handler.tree_queue.to_dict()
session_store.sync_from_tree_data(
tree_data["trees"], tree_data["node_to_tree"]
)
# Wire up the handler
messaging_platform.on_message(message_handler.handle_message)
# Start the platform
await messaging_platform.start()
logger.info(
f"{messaging_platform.name} platform started with message handler"
)
except ImportError as e:
logger.warning(f"Messaging module import error: {e}")
except Exception as e:
logger.error(f"Failed to start messaging platform: {e}")
import traceback
logger.error(traceback.format_exc())
# Store in app state for access in routes
app.state.messaging_platform = messaging_platform
app.state.message_handler = message_handler
app.state.cli_manager = cli_manager
yield
# Cleanup
if message_handler and hasattr(message_handler, "session_store"):
try:
message_handler.session_store.flush_pending_save()
except Exception as e:
logger.warning(f"Session store flush on shutdown: {e}")
logger.info("Shutdown requested, cleaning up...")
if messaging_platform:
await _best_effort("messaging_platform.stop", messaging_platform.stop())
if cli_manager:
await _best_effort("cli_manager.stop_all", cli_manager.stop_all())
await _best_effort("cleanup_provider", cleanup_provider())
# Ensure background limiter worker doesn't keep the loop alive.
try:
from messaging.limiter import MessagingRateLimiter
await _best_effort(
"MessagingRateLimiter.shutdown_instance",
MessagingRateLimiter.shutdown_instance(),
timeout_s=2.0,
)
except Exception:
# Limiter may never have been imported/initialized.
pass
logger.info("Server shut down cleanly")
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
app = FastAPI(
title="Claude Code Proxy",
version="2.0.0",
lifespan=lifespan,
)
# Register routes
app.include_router(router)
# Exception handlers
@app.exception_handler(ProviderError)
async def provider_error_handler(request: Request, exc: ProviderError):
"""Handle provider-specific errors and return Anthropic format."""
logger.error(f"Provider Error: {exc.error_type} - {exc.message}")
return JSONResponse(
status_code=exc.status_code,
content=exc.to_anthropic_format(),
)
@app.exception_handler(Exception)
async def general_error_handler(request: Request, exc: Exception):
"""Handle general errors and return Anthropic format."""
logger.error(f"General Error: {exc!s}")
import traceback
logger.error(traceback.format_exc())
return JSONResponse(
status_code=500,
content={
"type": "error",
"error": {
"type": "api_error",
"message": "An unexpected error occurred.",
},
},
)
return app
# Default app instance for uvicorn
app = create_app()
|