Yash030 Claude Opus 4.7 commited on
Commit
f56589d
·
1 Parent(s): b5bd2a8

Track sessions by gateway client IP

Browse files

- Add _get_client_ip() to detect gateway/proxy requests
- Track sessions only when X-Forwarded-For, X-Real-IP, X-Client-IP, or Via headers present
- Each unique client IP behind a gateway = one session
- Direct connections are not tracked

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Files changed (3) hide show
  1. .claude/settings.local.json +2 -1
  2. api/routes.py +2 -1
  3. api/services.py +49 -18
.claude/settings.local.json CHANGED
@@ -7,7 +7,8 @@
7
  "Bash(.\\\\.venv\\\\Scripts\\\\python -m uvicorn server:app --host 0.0.0.0 --port 8082)",
8
  "Bash(git add *)",
9
  "Bash(git commit -m ' *)",
10
- "Bash(git push *)"
 
11
  ]
12
  },
13
  "enableAllProjectMcpServers": true,
 
7
  "Bash(.\\\\.venv\\\\Scripts\\\\python -m uvicorn server:app --host 0.0.0.0 --port 8082)",
8
  "Bash(git add *)",
9
  "Bash(git commit -m ' *)",
10
+ "Bash(git push *)",
11
+ "Bash(python -c \"import ast; ast.parse\\(open\\('api/services.py'\\).read\\(\\)\\); print\\('Syntax OK'\\)\")"
12
  ]
13
  },
14
  "enableAllProjectMcpServers": true,
api/routes.py CHANGED
@@ -166,12 +166,13 @@ def _build_models_list_response(
166
  # =============================================================================
167
  @router.post("/v1/messages")
168
  async def create_message(
 
169
  request_data: MessagesRequest,
170
  service: ClaudeProxyService = Depends(get_proxy_service),
171
  _auth=Depends(require_api_key),
172
  ):
173
  """Create a message (always streaming)."""
174
- return service.create_message(request_data)
175
 
176
 
177
  @router.api_route("/v1/messages", methods=["HEAD", "OPTIONS"])
 
166
  # =============================================================================
167
  @router.post("/v1/messages")
168
  async def create_message(
169
+ request: Request,
170
  request_data: MessagesRequest,
171
  service: ClaudeProxyService = Depends(get_proxy_service),
172
  _auth=Depends(require_api_key),
173
  ):
174
  """Create a message (always streaming)."""
175
+ return service.create_message(request, request_data)
176
 
177
 
178
  @router.api_route("/v1/messages", methods=["HEAD", "OPTIONS"])
api/services.py CHANGED
@@ -7,7 +7,7 @@ import uuid
7
  from collections.abc import AsyncIterator, Callable
8
  from typing import Any
9
 
10
- from fastapi import HTTPException
11
  from fastapi.responses import StreamingResponse
12
  from loguru import logger
13
 
@@ -23,7 +23,7 @@ from providers.exceptions import (
23
  RateLimitError,
24
  )
25
 
26
- from .model_router import ModelRouter
27
  from .models.anthropic import MessagesRequest, TokenCountRequest
28
  from .models.responses import TokenCountResponse
29
  from .optimization_handlers import try_optimizations
@@ -88,8 +88,26 @@ def _require_non_empty_messages(messages: list[Any]) -> None:
88
  raise InvalidRequestError("messages cannot be empty")
89
 
90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  class ClaudeProxyService:
92
- """Coordinate request optimization, model routing, token count, and providers."""
93
 
94
  def __init__(
95
  self,
@@ -104,32 +122,35 @@ class ClaudeProxyService:
104
  self._token_counter = token_counter
105
  self._session_tracker = SessionTracker.get_instance()
106
 
107
- def _get_session_id(self, request_data: MessagesRequest) -> str:
108
- """Extract or generate a session ID from the request."""
109
- # Try to extract session ID from messages metadata or generate one
110
- # This allows multiple Claude Code instances to share the proxy fairly
111
- if hasattr(request_data, 'custom_id'):
112
- return str(request_data.custom_id)
113
- return f"session_{uuid.uuid4().hex[:12]}"
114
 
115
- def create_message(self, request_data: MessagesRequest) -> object:
 
 
 
116
  """Create a message response or streaming response with optional failover."""
117
- from .web_tools.streaming import stream_web_server_tool_response
118
  try:
119
  _require_non_empty_messages(request_data.messages)
120
 
121
  candidates = self._model_router.resolve_candidates(request_data.model)
122
  if not candidates:
123
- raise InvalidRequestError(f"No configured models available for '{request_data.model}'")
 
 
124
 
125
  # For 'auto' requests with multiple candidates, we wrap the stream in a failover loop.
126
  if len(candidates) > 1:
127
  return anthropic_sse_streaming_response(
128
- self._stream_with_fallbacks(candidates, request_data)
129
  )
130
 
131
  # Standard path for single-model requests
132
- return self._create_single_message(candidates[0], request_data)
133
 
134
  except ProviderError:
135
  raise
@@ -143,7 +164,7 @@ class ClaudeProxyService:
143
  ) from e
144
 
145
  def _create_single_message(
146
- self, resolved: ResolvedModel, request_data: MessagesRequest
147
  ) -> object:
148
  """Create a single message response from a resolved model."""
149
  routed_request = request_data.model_copy(deep=True)
@@ -160,6 +181,8 @@ class ClaudeProxyService:
160
  if self._settings.enable_web_server_tools and is_web_server_tool_request(
161
  routed_request
162
  ):
 
 
163
  input_tokens = self._token_counter(
164
  routed_request.messages, routed_request.system, routed_request.tools
165
  )
@@ -187,7 +210,7 @@ class ClaudeProxyService:
187
  thinking_enabled=resolved.thinking_enabled,
188
  )
189
 
190
- session_id = self._get_session_id(request_data)
191
  self._session_tracker.track_request_sync(session_id, resolved.provider_id)
192
 
193
  request_id = f"req_{uuid.uuid4().hex[:12]}"
@@ -211,7 +234,10 @@ class ClaudeProxyService:
211
  )
212
 
213
  async def _stream_with_fallbacks(
214
- self, candidates: list[ResolvedModel], request_data: MessagesRequest
 
 
 
215
  ) -> AsyncIterator[str]:
216
  """Iterate through candidates until one succeeds or all fail."""
217
  last_exc: Exception | None = None
@@ -227,6 +253,11 @@ class ClaudeProxyService:
227
  thinking_enabled=resolved.thinking_enabled,
228
  )
229
 
 
 
 
 
 
230
  request_id = f"req_{uuid.uuid4().hex[:12]}"
231
  logger.info(
232
  "API_REQUEST (auto fallback {}/{}): request_id={} provider={} model={}",
 
7
  from collections.abc import AsyncIterator, Callable
8
  from typing import Any
9
 
10
+ from fastapi import HTTPException, Request
11
  from fastapi.responses import StreamingResponse
12
  from loguru import logger
13
 
 
23
  RateLimitError,
24
  )
25
 
26
+ from .model_router import ModelRouter, ResolvedModel
27
  from .models.anthropic import MessagesRequest, TokenCountRequest
28
  from .models.responses import TokenCountResponse
29
  from .optimization_handlers import try_optimizations
 
88
  raise InvalidRequestError("messages cannot be empty")
89
 
90
 
91
+ def _get_client_ip(request: Request) -> str | None:
92
+ """Extract client IP from gateway headers or return None for direct connections."""
93
+ # Check for proxy/gateway headers
94
+ forwarded = request.headers.get("X-Forwarded-For")
95
+ if forwarded:
96
+ return forwarded.split(",")[0].strip()
97
+ real_ip = request.headers.get("X-Real-IP")
98
+ if real_ip:
99
+ return real_ip
100
+ client_ip = request.headers.get("X-Client-IP")
101
+ if client_ip:
102
+ return client_ip
103
+ via = request.headers.get("Via")
104
+ if via:
105
+ return request.client.host # Gateway/proxy IP
106
+ return None # Direct connection
107
+
108
+
109
  class ClaudeProxyService:
110
+ """Coordinate request optimization, model routing, and providers."""
111
 
112
  def __init__(
113
  self,
 
122
  self._token_counter = token_counter
123
  self._session_tracker = SessionTracker.get_instance()
124
 
125
+ def _get_session_id(self, request: Request, request_data: MessagesRequest) -> str:
126
+ """Extract or generate a session ID for gateway clients only."""
127
+ # Check if request came through a gateway/proxy
128
+ ip = _get_client_ip(request)
129
+ if ip is None:
130
+ return "direct" # Don't track direct connections
 
131
 
132
+ # Use gateway client IP as session identifier
133
+ return f"gateway_{ip}"
134
+
135
+ def create_message(self, request: Request, request_data: MessagesRequest) -> object:
136
  """Create a message response or streaming response with optional failover."""
 
137
  try:
138
  _require_non_empty_messages(request_data.messages)
139
 
140
  candidates = self._model_router.resolve_candidates(request_data.model)
141
  if not candidates:
142
+ raise InvalidRequestError(
143
+ f"No configured models available for '{request_data.model}'"
144
+ )
145
 
146
  # For 'auto' requests with multiple candidates, we wrap the stream in a failover loop.
147
  if len(candidates) > 1:
148
  return anthropic_sse_streaming_response(
149
+ self._stream_with_fallbacks(request, candidates, request_data)
150
  )
151
 
152
  # Standard path for single-model requests
153
+ return self._create_single_message(request, candidates[0], request_data)
154
 
155
  except ProviderError:
156
  raise
 
164
  ) from e
165
 
166
  def _create_single_message(
167
+ self, request: Request, resolved: ResolvedModel, request_data: MessagesRequest
168
  ) -> object:
169
  """Create a single message response from a resolved model."""
170
  routed_request = request_data.model_copy(deep=True)
 
181
  if self._settings.enable_web_server_tools and is_web_server_tool_request(
182
  routed_request
183
  ):
184
+ from .web_tools.streaming import stream_web_server_tool_response
185
+
186
  input_tokens = self._token_counter(
187
  routed_request.messages, routed_request.system, routed_request.tools
188
  )
 
210
  thinking_enabled=resolved.thinking_enabled,
211
  )
212
 
213
+ session_id = self._get_session_id(request, request_data)
214
  self._session_tracker.track_request_sync(session_id, resolved.provider_id)
215
 
216
  request_id = f"req_{uuid.uuid4().hex[:12]}"
 
234
  )
235
 
236
  async def _stream_with_fallbacks(
237
+ self,
238
+ request: Request,
239
+ candidates: list[ResolvedModel],
240
+ request_data: MessagesRequest,
241
  ) -> AsyncIterator[str]:
242
  """Iterate through candidates until one succeeds or all fail."""
243
  last_exc: Exception | None = None
 
253
  thinking_enabled=resolved.thinking_enabled,
254
  )
255
 
256
+ session_id = self._get_session_id(request, request_data)
257
+ self._session_tracker.track_request_sync(
258
+ session_id, resolved.provider_id
259
+ )
260
+
261
  request_id = f"req_{uuid.uuid4().hex[:12]}"
262
  logger.info(
263
  "API_REQUEST (auto fallback {}/{}): request_id={} provider={} model={}",