File size: 4,451 Bytes
"""Request detection utilities for API optimizations.

Detects quota checks, title generation, prefix detection, suggestion mode,
and filepath extraction requests to enable fast-path responses.
"""
from providers.common.text import extract_text_from_content
from .models.anthropic import MessagesRequest
def is_quota_check_request(request_data: MessagesRequest) -> bool:
    """Report whether the request looks like a quota probe.

    A request is treated as a probe when ``max_tokens`` equals 1, it holds
    exactly one message, that message has the ``user`` role, and the text
    extracted from it contains "quota" (compared case-insensitively).
    """
    if request_data.max_tokens != 1:
        return False

    messages = request_data.messages
    if len(messages) != 1:
        return False

    only_message = messages[0]
    if only_message.role != "user":
        return False

    lowered = extract_text_from_content(only_message.content).lower()
    return "quota" in lowered
def is_title_generation_request(request_data: MessagesRequest) -> bool:
    """Report whether the request asks for a conversation title.

    The request matches when it carries a non-empty system prompt, declares
    no tools, and the lower-cased system text contains both
    "new conversation topic" and "title". The message list itself is not
    inspected here.
    """
    if not request_data.system:
        return False
    if request_data.tools:
        return False

    lowered_system = extract_text_from_content(request_data.system).lower()
    required_phrases = ("new conversation topic", "title")
    return all(phrase in lowered_system for phrase in required_phrases)
def is_prefix_detection_request(request_data: MessagesRequest) -> tuple[bool, str]:
    """Check if this is a fast prefix detection request.

    A prefix detection request is a single ``user`` message whose extracted
    text contains both a ``<policy_spec>`` tag and a ``Command:`` marker.

    Returns:
        Tuple of (is_prefix_request, command_string). ``command_string`` is
        the whitespace-stripped text following the last ``Command:`` marker,
        or ``""`` when the request does not match.
    """
    messages = request_data.messages
    if len(messages) != 1 or messages[0].role != "user":
        return False, ""

    content = extract_text_from_content(messages[0].content)
    if "<policy_spec>" not in content:
        return False, ""

    # Split on the LAST "Command:" so that earlier occurrences (e.g. inside
    # the policy text) are skipped. An empty separator means the marker is
    # absent. Plain string operations cannot raise here, so no try/except
    # is needed.
    _, marker, command = content.rpartition("Command:")
    if not marker:
        return False, ""
    return True, command.strip()
def is_suggestion_mode_request(request_data: MessagesRequest) -> bool:
    """Report whether any user message carries the suggestion-mode marker.

    Every message with the ``user`` role is scanned in order; the request
    matches as soon as one of them has "[SUGGESTION MODE:" in its extracted
    text. Messages with other roles are ignored.
    """
    return any(
        "[SUGGESTION MODE:" in extract_text_from_content(message.content)
        for message in request_data.messages
        if message.role == "user"
    )
def is_filepath_extraction_request(
    request_data: MessagesRequest,
) -> tuple[bool, str, str]:
    """Check if this is a filepath extraction request.

    A filepath extraction request is a single ``user`` message, sent without
    tools, whose text has a ``Command:`` section followed by an ``Output:``
    section. In addition, either the user text mentions "filepaths" or the
    system prompt contains one of the known extraction phrases.

    Returns:
        Tuple of (is_filepath_request, command, output). ``command`` is the
        stripped text between the first ``Command:`` and the following
        ``Output:``; ``output`` is the stripped text after that ``Output:``,
        cut at the first ``<`` and then at the first blank line. Both are
        ``""`` when the request does not match.
    """
    not_a_match = (False, "", "")

    messages = request_data.messages
    if len(messages) != 1 or messages[0].role != "user":
        return not_a_match
    if request_data.tools:
        return not_a_match

    content = extract_text_from_content(messages[0].content)
    if "Command:" not in content or "Output:" not in content:
        return not_a_match

    # Match if user content OR system block indicates filepath extraction.
    # A literal "<filepaths>" tag is already covered by the plain
    # "filepaths" substring test, so one check is enough.
    user_has_filepaths = "filepaths" in content.lower()
    system_text = (
        extract_text_from_content(request_data.system).lower()
        if request_data.system
        else ""
    )
    system_has_extract = (
        "extract any file paths" in system_text
        or "file paths that this command" in system_text
    )
    if not (user_has_filepaths or system_has_extract):
        return not_a_match

    cmd_start = content.find("Command:") + len("Command:")
    # "Output:" may exist only BEFORE the first "Command:"; searching from
    # cmd_start then fails even though the membership test above passed.
    output_marker = content.find("Output:", cmd_start)
    if output_marker == -1:
        return not_a_match

    command = content[cmd_start:output_marker].strip()
    output = content[output_marker + len("Output:"):].strip()
    # Keep only the output proper: drop anything from the first tag-like "<"
    # onward, then anything after the first blank line.
    for marker in ("<", "\n\n"):
        output = output.partition(marker)[0].strip()
    return True, command, output
|