Spaces:
Sleeping
Sleeping
charSLee013
feat: complete Hugging Face Spaces deployment with production-ready CognitiveKernel-Launchpad
1ea26af
| #!/usr/bin/env python3 | |
| """ | |
| Pure HTTP LLM Client - Linus style: simple, direct, fail fast | |
| No provider abstraction, no defensive programming, no technical debt | |
| """ | |
| import requests | |
| from .utils import wrapped_trying, KwargsInitializable | |
class RateLimitError(Exception):
    """Raised on HTTP 429 responses so retry logic can back off and retry."""
# Optional dependency: accurate token counting needs tiktoken.
try:
    import tiktoken
except ImportError:
    # Sentinel so the module can select the char-based fallback truncator.
    tiktoken = None
class TikTokenMessageTruncator:
    """Token-budget message truncator backed by tiktoken encodings."""

    def __init__(self, model_name="gpt-4"):
        # Fail fast when the optional dependency is absent; the module-level
        # MessageTruncator alias picks the char-based fallback in that case.
        if tiktoken is None:
            raise ImportError("tiktoken is required but not installed")
        self.encoding = tiktoken.encoding_for_model(model_name)

    def _count_text_tokens(self, content):
        """Return the token count of the text found in *content*.

        Strings are encoded directly; lists count only their "text" parts;
        anything else contributes zero tokens.
        """
        if isinstance(content, str):
            return len(self.encoding.encode(content))
        if isinstance(content, list):
            return sum(
                len(self.encoding.encode(part.get("text", "")))
                for part in content
                if part.get("type") == "text"
            )
        return 0

    def _truncate_text_content(self, content, max_tokens):
        """Trim the text in *content* so it fits within *max_tokens* tokens."""
        if isinstance(content, str):
            return self.encoding.decode(self.encoding.encode(content)[:max_tokens])
        if not isinstance(content, list):
            # Unknown content shape: pass through untouched.
            return content
        kept = []
        budget_used = 0
        for part in content:
            if part.get("type") != "text":
                # Non-text parts pass through and do not consume the budget.
                kept.append(part)
                continue
            piece = self.encoding.encode(part.get("text", ""))
            if budget_used + len(piece) > max_tokens:
                room = max_tokens - budget_used
                if room > 0:
                    clipped = self.encoding.decode(piece[:room])
                    if clipped:
                        kept.append({"type": "text", "text": clipped})
                break
            kept.append(part)
            budget_used += len(piece)
        return kept

    def truncate_message_list(self, messages, max_length):
        """Keep the newest messages whose combined text tokens fit *max_length*.

        Walks the history from newest to oldest; when even the newest message
        alone exceeds the budget, that single message is clipped instead.
        """
        kept = []
        used = 0
        for msg in reversed(messages):
            body = msg.get("content", "")
            cost = self._count_text_tokens(body)
            if used + cost > max_length:
                if not kept:
                    # Newest message alone is too big: clip it in place.
                    clipped_msg = msg.copy()
                    clipped_msg["content"] = self._truncate_text_content(body, max_length)
                    kept.insert(0, clipped_msg)
                break
            kept.insert(0, msg)
            used += cost
        return kept
| # Lightweight fallback truncator | |
| class _LightweightMessageTruncator: | |
| def truncate_message_list(self, messages, max_length): | |
| # Very simple char-based truncation as a fallback | |
| total = 0 | |
| out = [] | |
| for msg in reversed(messages): | |
| content = msg.get("content", "") | |
| size = len(str(content)) | |
| if total + size > max_length: | |
| if not out: | |
| # truncate this one | |
| truncated_msg = msg.copy() | |
| text = str(content) | |
| truncated_msg["content"] = text[: max(0, max_length - total)] | |
| out.insert(0, truncated_msg) | |
| break | |
| out.insert(0, msg) | |
| total += size | |
| return out | |
# Single, deterministic MessageTruncator alias - fail fast, no confusion
MessageTruncator = (
    TikTokenMessageTruncator if tiktoken is not None else _LightweightMessageTruncator
)
class LLM(KwargsInitializable):
    """
    Pure HTTP LLM Client - Linus style: simple, direct, fail fast

    Design principles:
    1. HTTP-only endpoints - no provider abstraction
    2. Fail fast validation - no defensive programming
    3. extract_body for request parameters
    4. Auto base64 for images

    Required fields: call_target (HTTP URL), api_key, model
    """

    def __init__(self, **kwargs):
        """Configure the client; validates call_target/api_key/model unless
        kwargs carries a truthy '_default_init' (bare-object mode)."""
        # Pure HTTP config - no provider abstraction
        self.call_target = None  # Must be full HTTP URL
        self.api_key = None
        self.api_base_url = None  # Optional for provider-style targets
        self.model = None  # Model ID - separate from extract_body
        self.extract_body = {}  # Pure request parameters (no model!)
        self.max_retry_times = 3
        self.request_timeout = 600  # seconds, forwarded to requests.post
        self.max_token_num = 20000  # token budget for message truncation
        # Backward compatibility attributes (ignored in pure HTTP mode)
        self.thinking = False
        self.seed = 1377
        self.print_call_in = None
        self.print_call_out = None
        self.call_kwargs = {}  # Legacy attribute
        # Initialize attributes from kwargs
        super().__init__(**kwargs)
        # Handle _default_init case (skip validation)
        if kwargs.get('_default_init'):
            self.headers = None
            self.call_stat = {}
            # BUGFIX: use the MessageTruncator alias so the char-based
            # fallback is selected when tiktoken is not installed;
            # instantiating TikTokenMessageTruncator directly raised
            # ImportError and defeated the module's fallback machinery.
            self.message_truncator = MessageTruncator()
            return
        # HTTP-only validation - fail fast, no provider abstraction
        if not self.call_target:
            raise ValueError("call_target (HTTP URL) is required")
        if not isinstance(self.call_target, str) or not self.call_target.startswith("http"):
            raise ValueError(f"call_target must be HTTP URL starting with 'http', got: {self.call_target}")
        if not self.api_key:
            raise ValueError("api_key is required")
        if not self.model:
            raise ValueError("model is required")
        # Setup HTTP headers - simple and direct
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        # Stats and truncator (same fallback-aware alias as above)
        self.call_stat = {}
        self.message_truncator = MessageTruncator()

    def __repr__(self):
        return f"LLM(target={self.call_target})"

    def __call__(self, messages, extract_body=None, **kwargs):
        """Pure HTTP call interface; RateLimitError (HTTP 429) is retried
        with waits by wrapped_trying, up to self.max_retry_times attempts."""
        func = lambda: self._call_with_messages(messages, extract_body, **kwargs)
        return wrapped_trying(func, max_times=self.max_retry_times, wait_error_names=('RateLimitError',))

    def _call_with_messages(self, messages, extract_body=None, **kwargs):
        """Execute one HTTP LLM call: preprocess, POST, parse - fail fast.

        Raises RateLimitError on HTTP 429 (so __call__ can retry) and
        RuntimeError on any other HTTP error, malformed body, or empty reply.
        """
        # Handle uninitialized case (e.g. constructed with _default_init)
        if not self.headers or not self.call_target:
            raise RuntimeError("LLM not properly initialized - use proper call_target and api_key")
        # Process images to base64, then fit history into the token budget
        messages = self._process_images(messages)
        messages = self.message_truncator.truncate_message_list(messages, self.max_token_num)
        payload = self._build_payload(messages, extract_body, kwargs)
        # Execute HTTP call - direct to call_target
        response = requests.post(
            self.call_target,
            headers=self.headers,
            json=payload,
            timeout=self.request_timeout
        )
        # Handle different HTTP status codes appropriately
        if response.status_code == 429:
            # Rate limit exceeded - special exception for retry logic
            raise RateLimitError(f"HTTP {response.status_code}: {response.text}")
        elif response.status_code != 200:
            # Other HTTP errors - fail fast
            raise RuntimeError(f"HTTP {response.status_code}: {response.text}")
        # Parse response body - a non-JSON 200 is a hard error too
        try:
            result = response.json()
        except ValueError as exc:
            raise RuntimeError(f"Invalid response format: {response.text}") from exc
        content = self._extract_content(result)
        # Fail fast - empty response
        if not content or content.strip() == "":
            raise RuntimeError(f"Empty response: {result}")
        # Update stats
        self._update_stats(result)
        return content

    def _build_payload(self, messages, extract_body, kwargs):
        """Assemble the JSON payload: model + messages first, then defaults
        from self.extract_body, then per-call extract_body, then extra kwargs
        (later sources override earlier ones)."""
        payload = {
            "model": self.model,  # Model is separate, not in extract_body
            "messages": messages
        }
        if self.extract_body:
            payload.update(self.extract_body)
        if extract_body:
            payload.update(extract_body)
        payload.update(kwargs)
        return payload

    def _extract_content(self, result):
        """Pull the reply text (or function-call arguments) from a parsed
        chat-completions response; RuntimeError on unexpected shape."""
        try:
            message = result["choices"][0]["message"]
        except (KeyError, IndexError) as exc:
            # Chain the original error for easier debugging
            raise RuntimeError(f"Invalid response format: {result}") from exc
        tool_calls = message.get("tool_calls")
        if tool_calls:
            tool_call = tool_calls[0]
            if tool_call.get("type") == "function":
                # Surface the function-call arguments (a JSON string) as content
                return tool_call.get("function", {}).get("arguments", "{}")
        # Regular text response (also used when the tool_call type is unknown)
        return message.get("content", "")

    def _process_images(self, messages):
        """Process images in messages - auto convert to base64 if needed.

        Currently a structural pass-through: multimodal parts are copied and
        non-data: URLs are assumed already acceptable downstream.
        """
        processed_messages = []
        for message in messages:
            content = message.get("content", "")
            if isinstance(content, list):
                # Multi-modal content - process each part
                processed_content = []
                for part in content:
                    if part.get("type") == "image_url":
                        image_url = part["image_url"]["url"]
                        if image_url.startswith("data:image/"):
                            # Already base64 - keep as is
                            processed_content.append(part)
                        else:
                            # TODO: convert local files/URLs to base64; for now
                            # assume it is already properly formatted
                            processed_content.append(part)
                    else:
                        # Text or other content
                        processed_content.append(part)
                processed_message = message.copy()
                processed_message["content"] = processed_content
                processed_messages.append(processed_message)
            else:
                # Simple text content passes through untouched
                processed_messages.append(message)
        return processed_messages

    def _update_stats(self, result):
        """Accumulate call count and token usage into self.call_stat."""
        usage = result.get("usage", {})
        if usage:
            self.call_stat["llm_call"] = self.call_stat.get("llm_call", 0) + 1
            for key in ["prompt_tokens", "completion_tokens", "total_tokens"]:
                self.call_stat[key] = self.call_stat.get(key, 0) + usage.get(key, 0)