R.C.M. commited on
Commit
b0d7359
·
1 Parent(s): e5c92d9

Fix circular

Browse files
Files changed (3) hide show
  1. app.py +12 -14
  2. helper/misc.py +0 -158
  3. helper/ratelimit.py +265 -6
app.py CHANGED
@@ -31,20 +31,6 @@ from helper.assets import (
31
  is_base64_image,
32
  asset_router,
33
  )
34
- from helper.misc import (
35
- extract_user_text,
36
- calculate_messages_size,
37
- is_long_context,
38
- contains_code,
39
- is_math_heavy,
40
- is_structured_task,
41
- multiple_questions,
42
- is_code_heavy,
43
- normalize_prompt_value,
44
- enforce_prompt_size,
45
- resolve_bound_subject,
46
- get_usage_snapshot_for_subject,
47
- )
48
 
49
  from helper.ratelimit import (
50
  enforce_rate_limit,
@@ -58,6 +44,18 @@ from helper.ratelimit import (
58
  MAX_GROQ_PROMPT_CHARS,
59
  MAX_MEDIA_PROMPT_BYTES,
60
  MAX_MEDIA_PROMPT_CHARS,
 
 
 
 
 
 
 
 
 
 
 
 
61
  )
62
 
63
  app = FastAPI()
 
31
  is_base64_image,
32
  asset_router,
33
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  from helper.ratelimit import (
36
  enforce_rate_limit,
 
44
  MAX_GROQ_PROMPT_CHARS,
45
  MAX_MEDIA_PROMPT_BYTES,
46
  MAX_MEDIA_PROMPT_CHARS,
47
+ extract_user_text,
48
+ calculate_messages_size,
49
+ is_long_context,
50
+ contains_code,
51
+ is_math_heavy,
52
+ is_structured_task,
53
+ multiple_questions,
54
+ is_code_heavy,
55
+ normalize_prompt_value,
56
+ enforce_prompt_size,
57
+ resolve_bound_subject,
58
+ get_usage_snapshot_for_subject,
59
  )
60
 
61
  app = FastAPI()
helper/misc.py DELETED
@@ -1,158 +0,0 @@
1
- import time
2
- import asyncio
3
-
4
- from helper.ratelimit import CLIENT_BIND_TTL_SECONDS, MAX_CLIENT_ID_LENGTH, client_subject_bindings
5
- from helper.subscriptions import USAGE_PERIODS, usage_locks, usage_store, TIER_CONFIG
6
- from fastapi import Request, HTTPException
7
- from typing import Optional, Dict, Any, List
8
- import re
9
- import hashlib
10
-
11
- def extract_user_text(messages: list) -> str:
12
- return " ".join(
13
- message_content_to_text(m.get("content"))
14
- for m in messages
15
- if isinstance(m, dict) and m.get("role") == "user"
16
- ).lower()
17
-
18
- def get_usage_period_key(metric: str) -> str:
19
- now = time.gmtime()
20
- period = USAGE_PERIODS.get(metric, "daily")
21
- if period == "weekly":
22
- iso_year, iso_week, _ = time.strftime("%G %V %u", now).split(" ")
23
- return f"{iso_year}-W{iso_week}"
24
- return time.strftime("%Y-%m-%d", now)
25
-
26
-
27
- def sanitize_client_id(raw_client_id: Optional[str]) -> Optional[str]:
28
- if not isinstance(raw_client_id, str):
29
- return None
30
- trimmed = raw_client_id.strip()
31
- if not trimmed or len(trimmed) > MAX_CLIENT_ID_LENGTH:
32
- return None
33
- if not re.match(r"^[A-Za-z0-9._:-]+$", trimmed):
34
- return None
35
- return trimmed
36
-
37
-
38
- def get_usage_lock(metric: str, subject: str) -> asyncio.Lock:
39
- metric_locks = usage_locks.get(metric)
40
- if metric_locks is None:
41
- metric_locks = {}
42
- usage_locks[metric] = metric_locks
43
- lock = metric_locks.get(subject)
44
- if lock is None:
45
- lock = asyncio.Lock()
46
- metric_locks[subject] = lock
47
- return lock
48
-
49
-
50
- def build_default_subject(request: Request, client_id: Optional[str]) -> str:
51
- if client_id:
52
- client_hash = hashlib.sha256(client_id.encode("utf-8")).hexdigest()[:24]
53
- return f"client:{client_hash}"
54
- host = request.client.host if request.client else "unknown"
55
- user_agent = request.headers.get("user-agent", "")
56
- ua_hash = (
57
- hashlib.sha256(user_agent.encode("utf-8")).hexdigest()[:12]
58
- if user_agent
59
- else "noua"
60
- )
61
- return f"anon:{host}:{ua_hash}"
62
-
63
-
64
- def bind_client_subject(client_id: Optional[str], subject: str, plan_key: str):
65
- if not client_id:
66
- return
67
- client_subject_bindings[client_id] = {
68
- "subject": subject,
69
- "plan_key": plan_key,
70
- "expires_at": time.time() + CLIENT_BIND_TTL_SECONDS,
71
- }
72
-
73
-
74
- def resolve_bound_subject(client_id: Optional[str], fallback_subject: str) -> str:
75
- if not client_id:
76
- return fallback_subject
77
- bound = client_subject_bindings.get(client_id)
78
- if not bound:
79
- return fallback_subject
80
- if bound.get("expires_at", 0) <= time.time():
81
- client_subject_bindings.pop(client_id, None)
82
- return fallback_subject
83
- return bound.get("subject", fallback_subject)
84
-
85
-
86
- def normalize_prompt_value(prompt: Optional[str], field_name: str = "prompt") -> str:
87
- if not isinstance(prompt, str):
88
- raise HTTPException(status_code=400, detail=f"{field_name} is required")
89
- normalized = prompt.strip()
90
- if not normalized:
91
- raise HTTPException(status_code=400, detail=f"{field_name} is required")
92
- return normalized
93
-
94
-
95
- def enforce_prompt_size(prompt: str, max_chars: int, max_bytes: int, context: str):
96
- char_len = len(prompt)
97
- byte_len = len(prompt.encode("utf-8"))
98
- if char_len > max_chars or byte_len > max_bytes:
99
- raise HTTPException(
100
- status_code=413,
101
- detail=(
102
- f"{context} is too large ({char_len} chars, {byte_len} bytes). "
103
- f"Max allowed is {max_chars} chars or {max_bytes} bytes."
104
- ),
105
- )
106
-
107
-
108
- def message_content_to_text(content: Any) -> str:
109
- if isinstance(content, str):
110
- return content
111
- if isinstance(content, list):
112
- parts: List[str] = []
113
- for item in content:
114
- if isinstance(item, str):
115
- parts.append(item)
116
- continue
117
- if isinstance(item, dict):
118
- text = item.get("text")
119
- if isinstance(text, str):
120
- parts.append(text)
121
- return " ".join(parts)
122
- return ""
123
-
124
-
125
- def calculate_messages_size(messages: list) -> tuple[int, int]:
126
- total_chars = 0
127
- total_bytes = 0
128
- for message in messages:
129
- if not isinstance(message, dict):
130
- continue
131
- text = message_content_to_text(message.get("content"))
132
- if not text:
133
- continue
134
- total_chars += len(text)
135
- total_bytes += len(text.encode("utf-8"))
136
- return total_chars, total_bytes
137
-
138
-
139
- def get_usage_snapshot_for_subject(plan_key: str, subject: str) -> Dict[str, Dict[str, Any]]:
140
- plan = TIER_CONFIG.get(plan_key) or TIER_CONFIG["free"]
141
- plan_limits = plan.get("limits", {})
142
- snapshot: Dict[str, Dict[str, Any]] = {}
143
- for metric in usage_store.keys():
144
- limit = plan_limits.get(metric)
145
- window_key = get_usage_period_key(metric)
146
- entry = usage_store[metric].get(subject)
147
- used = 0
148
- if entry and entry.get("window") == window_key:
149
- used = max(0, int(entry.get("count", 0)))
150
- remaining = None if limit is None else max(0, int(limit) - used)
151
- snapshot[metric] = {
152
- "limit": limit,
153
- "used": used,
154
- "remaining": remaining,
155
- "window": window_key,
156
- "period": USAGE_PERIODS.get(metric, "daily"),
157
- }
158
- return snapshot
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
helper/ratelimit.py CHANGED
@@ -1,32 +1,67 @@
 
 
 
 
1
  import time
2
- from typing import Optional, Dict
 
3
  from fastapi import HTTPException, Request
4
- from helper.misc import sanitize_client_id, get_usage_lock, get_usage_period_key, build_default_subject, bind_client_subject, resolve_bound_subject
5
- from helper.subscriptions import fetch_subscription, usage_store, normalize_plan_key, TIER_CONFIG
6
- import os
 
 
 
 
 
 
 
 
 
 
 
 
7
  IDENTITY_CACHE_TTL_SECONDS = 60
8
- identity_cache = {}
9
  CLIENT_BIND_TTL_SECONDS = int(
10
  os.getenv("CLIENT_BIND_TTL_SECONDS", str(8 * 24 * 60 * 60))
11
  )
 
12
  MAX_CLIENT_ID_LENGTH = 128
13
- client_subject_bindings = {}
14
 
15
  MAX_CHAT_PROMPT_CHARS = int(os.getenv("MAX_CHAT_PROMPT_CHARS", "120000"))
16
  MAX_CHAT_PROMPT_BYTES = int(os.getenv("MAX_CHAT_PROMPT_BYTES", "500000"))
 
17
  MAX_GROQ_PROMPT_CHARS = int(os.getenv("MAX_GROQ_PROMPT_CHARS", "90000"))
18
  MAX_GROQ_PROMPT_BYTES = int(os.getenv("MAX_GROQ_PROMPT_BYTES", "350000"))
 
19
  MAX_MEDIA_PROMPT_CHARS = int(os.getenv("MAX_MEDIA_PROMPT_CHARS", "4000"))
20
  MAX_MEDIA_PROMPT_BYTES = int(os.getenv("MAX_MEDIA_PROMPT_BYTES", "16000"))
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  async def resolve_rate_limit_identity(
23
  request: Request,
24
  authorization: Optional[str],
25
  client_id: Optional[str] = None,
26
  ) -> tuple[str, str]:
27
  now = time.time()
 
28
  normalized_client_id = sanitize_client_id(client_id)
29
  default_subject = build_default_subject(request, normalized_client_id)
 
30
  if not authorization or not authorization.startswith("Bearer "):
31
  return "free", resolve_bound_subject(normalized_client_id, default_subject)
32
 
@@ -38,6 +73,7 @@ async def resolve_rate_limit_identity(
38
  if cached and cached.get("expires_at", 0) > now:
39
  plan_key = cached.get("plan_key", "free")
40
  subject = cached.get("subject", default_subject)
 
41
  bind_client_subject(normalized_client_id, subject, plan_key)
42
  return plan_key, subject
43
 
@@ -56,12 +92,15 @@ async def resolve_rate_limit_identity(
56
  subject = default_subject
57
 
58
  plan_key = normalize_plan_key(sub.get("plan_key"))
 
59
  identity_cache[token] = {
60
  "plan_key": plan_key,
61
  "subject": subject,
62
  "expires_at": now + IDENTITY_CACHE_TTL_SECONDS,
63
  }
 
64
  bind_client_subject(normalized_client_id, subject, plan_key)
 
65
  return plan_key, subject
66
 
67
 
@@ -77,15 +116,20 @@ async def enforce_rate_limit(
77
  plan_key, subject = await resolve_rate_limit_identity(
78
  request, authorization, client_id
79
  )
 
80
  plan = TIER_CONFIG.get(plan_key) or TIER_CONFIG["free"]
81
  plan_limits = plan.get("limits", {})
82
  limit = plan_limits.get(metric)
83
 
84
  window_key = get_usage_period_key(metric)
 
85
  lock = get_usage_lock(metric, subject)
 
86
  async with lock:
87
  bucket = usage_store[metric]
 
88
  entry = bucket.get(subject)
 
89
  if not entry or entry.get("window") != window_key:
90
  entry = {"window": window_key, "count": 0}
91
  bucket[subject] = entry
@@ -97,7 +141,9 @@ async def enforce_rate_limit(
97
  )
98
 
99
  entry["count"] += 1
 
100
  remaining = None if limit is None else max(0, int(limit) - entry["count"])
 
101
  return {
102
  "plan_key": plan_key,
103
  "remaining": remaining,
@@ -129,3 +175,216 @@ async def check_video_rate_limit(
129
  ):
130
  await enforce_rate_limit(request, authorization, "videosDaily", client_id)
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import hashlib
3
+ import os
4
+ import re
5
  import time
6
+ from typing import Any, Dict, List, Optional
7
+
8
  from fastapi import HTTPException, Request
9
+
10
+ from helper.subscriptions import (
11
+ TIER_CONFIG,
12
+ USAGE_PERIODS,
13
+ fetch_subscription,
14
+ normalize_plan_key,
15
+ usage_locks,
16
+ usage_store,
17
+ )
18
+
19
+
20
+ # -------------------------------------------------------------------
21
+ # Configuration / Constants
22
+ # -------------------------------------------------------------------
23
+
24
  IDENTITY_CACHE_TTL_SECONDS = 60
25
+
26
  CLIENT_BIND_TTL_SECONDS = int(
27
  os.getenv("CLIENT_BIND_TTL_SECONDS", str(8 * 24 * 60 * 60))
28
  )
29
+
30
  MAX_CLIENT_ID_LENGTH = 128
 
31
 
32
  MAX_CHAT_PROMPT_CHARS = int(os.getenv("MAX_CHAT_PROMPT_CHARS", "120000"))
33
  MAX_CHAT_PROMPT_BYTES = int(os.getenv("MAX_CHAT_PROMPT_BYTES", "500000"))
34
+
35
  MAX_GROQ_PROMPT_CHARS = int(os.getenv("MAX_GROQ_PROMPT_CHARS", "90000"))
36
  MAX_GROQ_PROMPT_BYTES = int(os.getenv("MAX_GROQ_PROMPT_BYTES", "350000"))
37
+
38
  MAX_MEDIA_PROMPT_CHARS = int(os.getenv("MAX_MEDIA_PROMPT_CHARS", "4000"))
39
  MAX_MEDIA_PROMPT_BYTES = int(os.getenv("MAX_MEDIA_PROMPT_BYTES", "16000"))
40
 
41
+
42
+ # -------------------------------------------------------------------
43
+ # In-memory state
44
+ # -------------------------------------------------------------------
45
+
46
+ identity_cache: Dict[str, Dict[str, Any]] = {}
47
+
48
+ client_subject_bindings: Dict[str, Dict[str, Any]] = {}
49
+
50
+
51
+ # -------------------------------------------------------------------
52
+ # Public Rate Limit APIs
53
+ # -------------------------------------------------------------------
54
+
55
  async def resolve_rate_limit_identity(
56
  request: Request,
57
  authorization: Optional[str],
58
  client_id: Optional[str] = None,
59
  ) -> tuple[str, str]:
60
  now = time.time()
61
+
62
  normalized_client_id = sanitize_client_id(client_id)
63
  default_subject = build_default_subject(request, normalized_client_id)
64
+
65
  if not authorization or not authorization.startswith("Bearer "):
66
  return "free", resolve_bound_subject(normalized_client_id, default_subject)
67
 
 
73
  if cached and cached.get("expires_at", 0) > now:
74
  plan_key = cached.get("plan_key", "free")
75
  subject = cached.get("subject", default_subject)
76
+
77
  bind_client_subject(normalized_client_id, subject, plan_key)
78
  return plan_key, subject
79
 
 
92
  subject = default_subject
93
 
94
  plan_key = normalize_plan_key(sub.get("plan_key"))
95
+
96
  identity_cache[token] = {
97
  "plan_key": plan_key,
98
  "subject": subject,
99
  "expires_at": now + IDENTITY_CACHE_TTL_SECONDS,
100
  }
101
+
102
  bind_client_subject(normalized_client_id, subject, plan_key)
103
+
104
  return plan_key, subject
105
 
106
 
 
116
  plan_key, subject = await resolve_rate_limit_identity(
117
  request, authorization, client_id
118
  )
119
+
120
  plan = TIER_CONFIG.get(plan_key) or TIER_CONFIG["free"]
121
  plan_limits = plan.get("limits", {})
122
  limit = plan_limits.get(metric)
123
 
124
  window_key = get_usage_period_key(metric)
125
+
126
  lock = get_usage_lock(metric, subject)
127
+
128
  async with lock:
129
  bucket = usage_store[metric]
130
+
131
  entry = bucket.get(subject)
132
+
133
  if not entry or entry.get("window") != window_key:
134
  entry = {"window": window_key, "count": 0}
135
  bucket[subject] = entry
 
141
  )
142
 
143
  entry["count"] += 1
144
+
145
  remaining = None if limit is None else max(0, int(limit) - entry["count"])
146
+
147
  return {
148
  "plan_key": plan_key,
149
  "remaining": remaining,
 
175
  ):
176
  await enforce_rate_limit(request, authorization, "videosDaily", client_id)
177
 
178
+
179
+ # -------------------------------------------------------------------
180
+ # Prompt Utilities
181
+ # -------------------------------------------------------------------
182
+
183
+ def normalize_prompt_value(prompt: Optional[str], field_name: str = "prompt") -> str:
184
+ if not isinstance(prompt, str):
185
+ raise HTTPException(status_code=400, detail=f"{field_name} is required")
186
+
187
+ normalized = prompt.strip()
188
+
189
+ if not normalized:
190
+ raise HTTPException(status_code=400, detail=f"{field_name} is required")
191
+
192
+ return normalized
193
+
194
+
195
+ def enforce_prompt_size(prompt: str, max_chars: int, max_bytes: int, context: str):
196
+ char_len = len(prompt)
197
+ byte_len = len(prompt.encode("utf-8"))
198
+
199
+ if char_len > max_chars or byte_len > max_bytes:
200
+ raise HTTPException(
201
+ status_code=413,
202
+ detail=(
203
+ f"{context} is too large ({char_len} chars, {byte_len} bytes). "
204
+ f"Max allowed is {max_chars} chars or {max_bytes} bytes."
205
+ ),
206
+ )
207
+
208
+
209
+ def calculate_messages_size(messages: list) -> tuple[int, int]:
210
+ total_chars = 0
211
+ total_bytes = 0
212
+
213
+ for message in messages:
214
+ if not isinstance(message, dict):
215
+ continue
216
+
217
+ text = message_content_to_text(message.get("content"))
218
+
219
+ if not text:
220
+ continue
221
+
222
+ total_chars += len(text)
223
+ total_bytes += len(text.encode("utf-8"))
224
+
225
+ return total_chars, total_bytes
226
+
227
+
228
+ def extract_user_text(messages: list) -> str:
229
+ return " ".join(
230
+ message_content_to_text(m.get("content"))
231
+ for m in messages
232
+ if isinstance(m, dict) and m.get("role") == "user"
233
+ ).lower()
234
+
235
+
236
+ # -------------------------------------------------------------------
237
+ # Usage / Rate Limit Internals
238
+ # -------------------------------------------------------------------
239
+
240
+ def get_usage_period_key(metric: str) -> str:
241
+ now = time.gmtime()
242
+
243
+ period = USAGE_PERIODS.get(metric, "daily")
244
+
245
+ if period == "weekly":
246
+ iso_year, iso_week, _ = time.strftime("%G %V %u", now).split(" ")
247
+ return f"{iso_year}-W{iso_week}"
248
+
249
+ return time.strftime("%Y-%m-%d", now)
250
+
251
+
252
+ def get_usage_lock(metric: str, subject: str) -> asyncio.Lock:
253
+ metric_locks = usage_locks.get(metric)
254
+
255
+ if metric_locks is None:
256
+ metric_locks = {}
257
+ usage_locks[metric] = metric_locks
258
+
259
+ lock = metric_locks.get(subject)
260
+
261
+ if lock is None:
262
+ lock = asyncio.Lock()
263
+ metric_locks[subject] = lock
264
+
265
+ return lock
266
+
267
+
268
+ def get_usage_snapshot_for_subject(
269
+ plan_key: str, subject: str
270
+ ) -> Dict[str, Dict[str, Any]]:
271
+ plan = TIER_CONFIG.get(plan_key) or TIER_CONFIG["free"]
272
+
273
+ plan_limits = plan.get("limits", {})
274
+
275
+ snapshot: Dict[str, Dict[str, Any]] = {}
276
+
277
+ for metric in usage_store.keys():
278
+ limit = plan_limits.get(metric)
279
+
280
+ window_key = get_usage_period_key(metric)
281
+
282
+ entry = usage_store[metric].get(subject)
283
+
284
+ used = 0
285
+
286
+ if entry and entry.get("window") == window_key:
287
+ used = max(0, int(entry.get("count", 0)))
288
+
289
+ remaining = None if limit is None else max(0, int(limit) - used)
290
+
291
+ snapshot[metric] = {
292
+ "limit": limit,
293
+ "used": used,
294
+ "remaining": remaining,
295
+ "window": window_key,
296
+ "period": USAGE_PERIODS.get(metric, "daily"),
297
+ }
298
+
299
+ return snapshot
300
+
301
+
302
+ # -------------------------------------------------------------------
303
+ # Identity / Client helpers
304
+ # -------------------------------------------------------------------
305
+
306
+ def sanitize_client_id(raw_client_id: Optional[str]) -> Optional[str]:
307
+ if not isinstance(raw_client_id, str):
308
+ return None
309
+
310
+ trimmed = raw_client_id.strip()
311
+
312
+ if not trimmed or len(trimmed) > MAX_CLIENT_ID_LENGTH:
313
+ return None
314
+
315
+ if not re.match(r"^[A-Za-z0-9._:-]+$", trimmed):
316
+ return None
317
+
318
+ return trimmed
319
+
320
+
321
+ def build_default_subject(request: Request, client_id: Optional[str]) -> str:
322
+ if client_id:
323
+ client_hash = hashlib.sha256(client_id.encode("utf-8")).hexdigest()[:24]
324
+ return f"client:{client_hash}"
325
+
326
+ host = request.client.host if request.client else "unknown"
327
+
328
+ user_agent = request.headers.get("user-agent", "")
329
+
330
+ ua_hash = (
331
+ hashlib.sha256(user_agent.encode("utf-8")).hexdigest()[:12]
332
+ if user_agent
333
+ else "noua"
334
+ )
335
+
336
+ return f"anon:{host}:{ua_hash}"
337
+
338
+
339
+ def bind_client_subject(client_id: Optional[str], subject: str, plan_key: str):
340
+ if not client_id:
341
+ return
342
+
343
+ client_subject_bindings[client_id] = {
344
+ "subject": subject,
345
+ "plan_key": plan_key,
346
+ "expires_at": time.time() + CLIENT_BIND_TTL_SECONDS,
347
+ }
348
+
349
+
350
+ def resolve_bound_subject(client_id: Optional[str], fallback_subject: str) -> str:
351
+ if not client_id:
352
+ return fallback_subject
353
+
354
+ bound = client_subject_bindings.get(client_id)
355
+
356
+ if not bound:
357
+ return fallback_subject
358
+
359
+ if bound.get("expires_at", 0) <= time.time():
360
+ client_subject_bindings.pop(client_id, None)
361
+ return fallback_subject
362
+
363
+ return bound.get("subject", fallback_subject)
364
+
365
+
366
+ # -------------------------------------------------------------------
367
+ # Message parsing helpers
368
+ # -------------------------------------------------------------------
369
+
370
+ def message_content_to_text(content: Any) -> str:
371
+ if isinstance(content, str):
372
+ return content
373
+
374
+ if isinstance(content, list):
375
+ parts: List[str] = []
376
+
377
+ for item in content:
378
+ if isinstance(item, str):
379
+ parts.append(item)
380
+ continue
381
+
382
+ if isinstance(item, dict):
383
+ text = item.get("text")
384
+
385
+ if isinstance(text, str):
386
+ parts.append(text)
387
+
388
+ return " ".join(parts)
389
+
390
+ return ""