KManager / llm /codewhisperer_client.py
StarrySkyWorld's picture
Initial commit
494c89b
#!/usr/bin/env python3
"""
CodeWhisperer Client for Kiro LLM API
Based on AnyAI-2-Open-API implementation.
Endpoint: https://codewhisperer.{region}.amazonaws.com/generateAssistantResponse
"""
import json
import uuid
import hashlib
import platform
from typing import AsyncGenerator, Dict, Any, List, Optional
import httpx
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from llm.token_pool import TokenPool, PoolToken
from core.kiro_config import get_machine_id, get_kiro_version
# ============================================================================
# Configuration
# ============================================================================
DEFAULT_REGION = "us-east-1"
# Model mapping (from Kiro UI)
MODEL_MAPPING = {
# Opus 4.5 (2.2x credit)
"claude-opus-4-5": "CLAUDE_OPUS_4_5_V1_0",
"claude-opus-4.5": "CLAUDE_OPUS_4_5_V1_0",
"claude-4-opus": "CLAUDE_OPUS_4_5_V1_0",
"opus": "CLAUDE_OPUS_4_5_V1_0",
# Sonnet 4.5 (1.3x credit)
"claude-sonnet-4-5": "CLAUDE_SONNET_4_5_V1_0",
"claude-sonnet-4.5": "CLAUDE_SONNET_4_5_V1_0",
"claude-4-sonnet": "CLAUDE_SONNET_4_5_V1_0",
# Sonnet 4 (1.3x credit)
"claude-sonnet-4": "CLAUDE_SONNET_4_20250514_V1_0",
"claude-sonnet-4-20250514": "CLAUDE_SONNET_4_20250514_V1_0",
"sonnet": "CLAUDE_SONNET_4_20250514_V1_0",
# Haiku 4.5 (0.4x credit)
"claude-haiku-4-5": "CLAUDE_HAIKU_4_5_V1_0",
"claude-haiku-4.5": "CLAUDE_HAIKU_4_5_V1_0",
"haiku": "CLAUDE_HAIKU_4_5_V1_0",
# Auto (default)
"auto": "AUTO",
}
def get_endpoint(region: str) -> str:
"""Get CodeWhisperer endpoint for region."""
return f"https://codewhisperer.{region}.amazonaws.com/generateAssistantResponse"
def get_user_agent(machine_id: str) -> Dict[str, str]:
"""Generate Kiro-compatible user agent headers."""
version = get_kiro_version()
os_info = f"win32#{platform.version()}" if platform.system() == "Windows" else f"{platform.system().lower()}#{platform.release()}"
return {
"x-amz-user-agent": f"aws-sdk-js/1.0.7 KiroIDE-{version}-{machine_id}",
"user-agent": f"aws-sdk-js/1.0.7 ua/2.1 os/{os_info} lang/js md/nodejs#20.16.0 api/codewhispererstreaming#1.0.7 m/E KiroIDE-{version}-{machine_id}",
}
# ============================================================================
# CodeWhisperer Client
# ============================================================================
class CodeWhispererClient:
"""Client for CodeWhisperer API (Kiro backend)."""
def __init__(self, token_pool: TokenPool):
self.token_pool = token_pool
self.current_token: Optional[PoolToken] = None
self.machine_id = get_machine_id()
async def generate(
self,
messages: List[Any],
model: str = "claude-sonnet-4-20250514",
max_tokens: int = 4096,
temperature: float = 0.7
) -> str:
"""Generate non-streaming response."""
content = ""
async for chunk in self.generate_stream(messages, model, max_tokens, temperature):
content += chunk
return content
async def generate_stream(
self,
messages: List[Any],
model: str = "claude-sonnet-4-20250514",
max_tokens: int = 4096,
temperature: float = 0.7
) -> AsyncGenerator[str, None]:
"""Generate streaming response."""
# Get token from pool
token = await self.token_pool.get_token()
if not token:
raise Exception("No available tokens in pool")
self.current_token = token
# Build request
url = get_endpoint(token.region)
request_body = self._build_request(messages, model)
# Headers
ua_headers = get_user_agent(self.machine_id)
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {token.access_token}",
"amz-sdk-invocation-id": str(uuid.uuid4()),
"amz-sdk-request": "attempt=1; max=1",
"x-amzn-kiro-agent-mode": "vibe",
**ua_headers
}
body_str = json.dumps(request_body)
try:
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
url,
headers=headers,
content=body_str
)
if response.status_code == 401:
await self.token_pool.mark_error(token.filename, "Unauthorized")
raise Exception("Token unauthorized - may be expired or banned")
if response.status_code == 403:
error_text = response.text[:200]
await self.token_pool.mark_error(token.filename, f"Forbidden: {error_text}")
raise Exception(f"Access forbidden - {error_text}")
if response.status_code == 429:
await self.token_pool.mark_quota_exceeded(token.filename)
raise Exception("Rate limited - quota exceeded")
if response.status_code != 200:
error_body = response.text[:500]
await self.token_pool.mark_error(
token.filename,
f"HTTP {response.status_code}: {error_body}"
)
raise Exception(f"API error {response.status_code}: {error_body}")
# Parse response
content = self._parse_response(response.text)
if content:
yield content
# Success
await self.token_pool.mark_success(token.filename)
except httpx.TimeoutException:
await self.token_pool.mark_error(token.filename, "Request timeout")
raise Exception("Request timed out")
except Exception as e:
error_str = str(e).lower()
if "unauthorized" in error_str or "forbidden" in error_str:
await self.token_pool.mark_error(token.filename, str(e))
raise
def _build_request(self, messages: List[Any], model: str) -> Dict[str, Any]:
"""Build CodeWhisperer request from OpenAI-style messages."""
conversation_id = str(uuid.uuid4())
# Get model ID
cw_model = MODEL_MAPPING.get(model, MODEL_MAPPING.get("claude-sonnet-4-20250514"))
# Process messages
history = []
system_prompt = ""
current_message = None
for msg in messages:
role = msg.role if hasattr(msg, 'role') else msg.get('role', '')
content = msg.content if hasattr(msg, 'content') else msg.get('content', '')
if role == "system":
system_prompt = content
elif role == "user":
if current_message:
# Add previous exchange to history
history.append(current_message)
user_content = content
if system_prompt and not history:
# Prepend system prompt to first user message
user_content = f"{system_prompt}\n\n{content}"
system_prompt = ""
current_message = {
"userInputMessage": {
"content": user_content,
"modelId": cw_model,
"origin": "AI_EDITOR",
"userInputMessageContext": {}
}
}
elif role == "assistant":
if current_message:
current_message["assistantResponseMessage"] = {
"content": content
}
history.append(current_message)
current_message = None
# Build final request
if current_message:
request = {
"conversationState": {
"chatTriggerType": "MANUAL",
"conversationId": conversation_id,
"currentMessage": current_message,
"history": history[:-1] if history else [] # Exclude last if it's current
}
}
else:
# Fallback - use last message
last_content = ""
if messages:
last_msg = messages[-1]
last_content = last_msg.content if hasattr(last_msg, 'content') else last_msg.get('content', '')
if system_prompt:
last_content = f"{system_prompt}\n\n{last_content}"
request = {
"conversationState": {
"chatTriggerType": "MANUAL",
"conversationId": conversation_id,
"currentMessage": {
"userInputMessage": {
"content": last_content,
"modelId": cw_model,
"origin": "AI_EDITOR",
"userInputMessageContext": {}
}
},
"history": history
}
}
return request
def _parse_response(self, raw_response: str) -> str:
"""Parse CodeWhisperer response and extract content."""
content_parts = []
# Response format: event{json}event{json}...
# Look for content fields in the response
import re
# Try to find all JSON objects with content
# Pattern: look for "content":"..."
content_pattern = re.compile(r'"content"\s*:\s*"((?:[^"\\]|\\.)*)?"', re.DOTALL)
for match in content_pattern.finditer(raw_response):
content = match.group(1)
if content:
# Unescape
content = content.replace('\\n', '\n')
content = content.replace('\\t', '\t')
content = content.replace('\\"', '"')
content = content.replace('\\\\', '\\')
content_parts.append(content)
# Also try parsing as JSON events
# Format: :message-typeevent{...}
event_pattern = re.compile(r':message-typeevent(\{[^}]+\})', re.DOTALL)
for match in event_pattern.finditer(raw_response):
try:
event = json.loads(match.group(1))
if 'content' in event and event['content']:
content_parts.append(event['content'])
except json.JSONDecodeError:
pass
# Deduplicate while preserving order
seen = set()
unique_parts = []
for part in content_parts:
if part not in seen:
seen.add(part)
unique_parts.append(part)
return ''.join(unique_parts)