Dmitry Beresnev committed
Commit 130d9e3 · 1 Parent(s): 6381e7f

Add “slow request” logging; log when a request exceeds the budgeted prompt and gets compacted

Files changed (1)
  1. app.py +57 -0
app.py CHANGED
@@ -179,6 +179,11 @@ def _compact_messages(messages: list[dict], max_tokens: int) -> list[dict]:
     return compacted
 
 
+def _estimate_messages_tokens(messages: list[dict]) -> int:
+    """Estimate total tokens for a list of messages."""
+    return sum(_estimate_tokens(str(m.get("content", ""))) for m in messages)
+
+
 @dataclass
 class CachedModel:
     """Represents a cached model with its process and connection info."""
@@ -953,6 +958,7 @@ async def chat_completions(request: ChatCompletionRequest, raw_request: Request)
     - Request metrics tracking
     """
     request_id = getattr(raw_request.state, "request_id", "-")
+    slow_task: Optional[asyncio.Task] = None
     try:
         request_start = time.time()
 
@@ -965,7 +971,30 @@ async def chat_completions(request: ChatCompletionRequest, raw_request: Request)
             raise HTTPException(status_code=500, detail="Current model not loaded")
 
         # Forward to llama-server using aiohttp
+        prompt_budget = CONTEXT_SIZE - request.max_tokens - PROMPT_MARGIN_TOKENS
+        original_tokens = _estimate_messages_tokens(request.messages)
+        if prompt_budget > 0 and original_tokens > prompt_budget:
+            logger.warning(
+                f"request_id={request_id} prompt_compaction "
+                f"original_tokens≈{original_tokens} budget≈{prompt_budget}"
+            )
+
         compacted_messages = _compact_messages(request.messages, request.max_tokens)
+
+        compacted_tokens = _estimate_messages_tokens(compacted_messages)
+        if compacted_tokens < original_tokens:
+            logger.info(
+                f"request_id={request_id} prompt_compacted "
+                f"tokens≈{original_tokens}->{compacted_tokens}"
+            )
+
+        async def _slow_request_logger():
+            await asyncio.sleep(30)
+            elapsed = time.time() - request_start
+            logger.warning(f"request_id={request_id} slow_request {elapsed:.1f}s")
+
+        slow_task = asyncio.create_task(_slow_request_logger())
+
         payload = {
             "messages": compacted_messages,
             "max_tokens": request.max_tokens,
@@ -1000,6 +1029,9 @@ async def chat_completions(request: ChatCompletionRequest, raw_request: Request)
     except Exception:
         logger.exception(f"request_id={request_id} chat_completions error")
         raise
+    finally:
+        if slow_task and not slow_task.done():
+            slow_task.cancel()
 
 
 async def search_web_async(query: str, max_results: int = 5) -> list[dict]:
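
The slow_task created before the upstream call and cancelled in this finally block is a standard asyncio watchdog: it logs only if the request outlives the 30 s threshold. A self-contained sketch of the same pattern, with a placeholder standing in for the real llama-server forwarding call:

import asyncio
import logging
import time

logger = logging.getLogger(__name__)

async def handle(request_id: str) -> str:
    request_start = time.time()

    async def _slow_request_logger() -> None:
        # Fires only if the request is still in flight after 30 seconds.
        await asyncio.sleep(30)
        elapsed = time.time() - request_start
        logger.warning(f"request_id={request_id} slow_request {elapsed:.1f}s")

    slow_task = asyncio.create_task(_slow_request_logger())
    try:
        # Placeholder for the real upstream call (forwarding to llama-server).
        await asyncio.sleep(0.1)
        return "ok"
    finally:
        # Fast path: the watchdog is cancelled before it ever logs.
        if not slow_task.done():
            slow_task.cancel()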
@@ -1074,6 +1106,7 @@ async def web_chat_completions(request: WebChatRequest, raw_request: Request):
     - Parallel execution where possible
     """
     request_id = getattr(raw_request.state, "request_id", "-")
+    slow_task: Optional[asyncio.Task] = None
     try:
         # Get the last user message as search query
         user_messages = [msg for msg in request.messages if msg.get("role") == "user"]
@@ -1109,8 +1142,29 @@ Always cite sources when using information from the search results."""
             raise HTTPException(status_code=500, detail="HTTP session not initialized")
 
         # Compact messages to fit within context
+        prompt_budget = CONTEXT_SIZE - request.max_tokens - PROMPT_MARGIN_TOKENS
+        original_tokens = _estimate_messages_tokens(augmented_messages)
+        if prompt_budget > 0 and original_tokens > prompt_budget:
+            logger.warning(
+                f"request_id={request_id} prompt_compaction "
+                f"original_tokens≈{original_tokens} budget≈{prompt_budget}"
+            )
+
         augmented_messages = _compact_messages(augmented_messages, request.max_tokens)
 
+        compacted_tokens = _estimate_messages_tokens(augmented_messages)
+        if compacted_tokens < original_tokens:
+            logger.info(
+                f"request_id={request_id} prompt_compacted "
+                f"tokens≈{original_tokens}->{compacted_tokens}"
+            )
+
+        async def _slow_request_logger():
+            await asyncio.sleep(30)
+            logger.warning(f"request_id={request_id} slow_request 30.0s")
+
+        slow_task = asyncio.create_task(_slow_request_logger())
+
         # Get current model from cache
         cached_model = model_cache.get(current_model)
         if not cached_model:
@@ -1146,6 +1200,9 @@ Always cite sources when using information from the search results."""
     except Exception as e:
         logger.exception(f"request_id={request_id} web_chat_completions error")
         raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
+    finally:
+        if slow_task and not slow_task.done():
+            slow_task.cancel()
 
 
 @app.get(
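
With illustrative values, the new log lines on the chat_completions path would look like this (exact prefixes depend on the app's logging configuration; the request id and token counts here are made up):

request_id=abc123 prompt_compaction original_tokens≈9100 budget≈6912
request_id=abc123 prompt_compacted tokens≈9100->6700
request_id=abc123 slow_request 42.3s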
 