# felix-framework / src/llm/huggingface_client.py
# Commit 8d60b1e (jkbennitt): FIX: Resolve 7 critical HF Spaces deployment
# issues for production readiness.
"""
HuggingFace ZeroGPU-optimized client for Felix Framework on HF Spaces.
This module provides advanced HuggingFace integration optimized for ZeroGPU acceleration,
HF Pro account features, and HF Spaces deployment while maintaining full API compatibility
with LMStudioClient.
ZeroGPU Features:
- Dynamic GPU allocation with @spaces.GPU decorator support
- GPU memory management and automatic cleanup
- Batch processing for multiple agents with GPU acceleration
- Model loading with torch.cuda optimization
- Efficient device allocation and deallocation
HF Pro Account Features:
- Higher rate limits and premium model access
- Priority inference queue for Pro accounts
- Advanced model configurations and fine-tuning support
- Extended quota management
Agent-Model Mapping (ZeroGPU Optimized):
- ResearchAgent: Fast 7B models (e.g., microsoft/DialoGPT-large, Qwen/Qwen2.5-7B-Instruct)
- AnalysisAgent: Reasoning 13B models (e.g., microsoft/DialoGPT-large, meta-llama/Llama-3.1-8B-Instruct)
- SynthesisAgent: High-quality 13B models (e.g., meta-llama/Llama-3.1-13B-Instruct)
- CriticAgent: Specialized validation models (e.g., microsoft/DialoGPT-medium)
LMStudioClient Compatibility:
- Drop-in replacement maintaining identical API
- Same method signatures and response objects
- Existing Felix agent system integration preserved
"""
import asyncio
import logging
import time
import os
import gc
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from enum import Enum
import aiohttp
import json
from datetime import datetime, timedelta
from collections import deque
# ZeroGPU and HF integration imports.
# Each optional dependency is guarded so the module stays importable in
# environments that lack GPU/Spaces support; feature flags record what is
# actually available at runtime.
try:
    import spaces
    ZEROGPU_AVAILABLE = True
except ImportError:
    ZEROGPU_AVAILABLE = False

    # Mock decorator for non-ZeroGPU environments: code can keep using
    # @spaces.GPU unchanged and the decorator becomes a no-op.
    class MockSpaces:
        @staticmethod
        def GPU(fn):
            return fn
    spaces = MockSpaces()

try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

# huggingface_hub is a hard dependency (no guard): the client cannot work
# at all without it.
from huggingface_hub import HfApi, InferenceClient

try:
    from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

# Project-local imports: token budgeting and the LMStudio-compatible
# response/priority types this client mirrors.
from .token_budget import TokenBudgetManager, TokenAllocation
from .lm_studio_client import RequestPriority, LLMResponse

# Module-level logger, per project convention.
logger = logging.getLogger(__name__)
class ModelType(Enum):
    """Agent-function specializations supported by the client.

    Each member selects the model configuration used for that agent role.
    """

    RESEARCH = "research"    # fast, exploratory generation
    ANALYSIS = "analysis"    # reasoning-oriented models
    SYNTHESIS = "synthesis"  # high-quality final-output models
    CRITIC = "critic"        # validation / critique models
    GENERAL = "general"      # default fallback for unknown roles
class GPUMemoryError(Exception):
    """Signals that a GPU memory allocation could not be satisfied."""
class ZeroGPUError(Exception):
    """Signals a failure in a ZeroGPU-accelerated operation."""
class HuggingFaceConnectionError(Exception):
    """Signals that HuggingFace services could not be reached."""
@dataclass
class HFModelConfig:
    """Configuration for a HuggingFace model with ZeroGPU optimization.

    Groups sampling parameters, ZeroGPU loading options and HF Pro account
    settings for one model. Field order is part of the public interface
    (positional construction) and must not change.
    """
    model_id: str                       # HF Hub model identifier (required)
    max_tokens: int = 512               # default generation length cap
    temperature: float = 0.7            # sampling temperature
    top_p: float = 0.9                  # nucleus-sampling cutoff
    repetition_penalty: float = 1.1     # >1.0 discourages repetition
    use_cache: bool = True              # reuse attention KV cache
    wait_for_model: bool = True         # block until model is loaded server-side
    # ZeroGPU specific settings
    use_zerogpu: bool = True            # attempt local GPU inference first
    gpu_memory_limit: Optional[float] = None  # GB, None for auto
    torch_dtype: str = "float16"        # torch dtype for GPU efficiency
    device_map: str = "auto"            # transformers device placement strategy
    batch_size: int = 1                 # prompts per GPU batch
    # HF Pro settings
    priority: str = "normal"            # normal, high for Pro accounts
    use_inference_api: bool = True      # Fallback to Inference API
    local_model_path: Optional[str] = None  # optional pre-downloaded weights
@dataclass
class HFResponse:
    """Response from HuggingFace inference API with GPU metrics.

    `success=False` responses carry an `error` message and empty `content`;
    GPU fields are populated only when ZeroGPU inference was used.
    """
    content: str                        # generated text ("" on failure)
    model_used: str                     # model id that served the request
    tokens_used: int                    # estimated prompt+completion tokens
    response_time: float                # wall-clock seconds for the request
    success: bool                       # True when generation succeeded
    error: Optional[str] = None         # failure description, if any
    metadata: Optional[Dict[str, Any]] = None  # method, params, allocation id
    # ZeroGPU specific metrics
    gpu_memory_used: Optional[float] = None  # GB allocated during inference
    gpu_time: Optional[float] = None         # seconds spent on GPU
    batch_processed: Optional[int] = None    # batch size if batched
    fallback_used: bool = False              # ZeroGPU failed -> Inference API
class HuggingFaceClient:
"""
HuggingFace Inference API client for Felix Framework.
Provides model inference capabilities with token budget management,
rate limiting, and agent specialization support.
"""
# ZeroGPU optimized models for different agent types
DEFAULT_MODELS = {
ModelType.RESEARCH: HFModelConfig(
model_id="microsoft/DialoGPT-large", # Upgraded for ZeroGPU
temperature=0.9,
max_tokens=384,
use_zerogpu=True,
batch_size=2, # Can process multiple research queries
torch_dtype="float16"
),
ModelType.ANALYSIS: HFModelConfig(
model_id="meta-llama/Llama-3.1-8B-Instruct", # Better reasoning
temperature=0.5,
max_tokens=512,
use_zerogpu=True,
batch_size=1,
torch_dtype="float16",
priority="high" # Pro account priority for analysis
),
ModelType.SYNTHESIS: HFModelConfig(
model_id="Qwen/Qwen2.5-7B-Instruct", # ZeroGPU-compatible synthesis (fits in 24GB)
temperature=0.1,
max_tokens=768,
use_zerogpu=True,
batch_size=1,
torch_dtype="float16",
gpu_memory_limit=8.0, # 7B model fits comfortably
priority="high"
),
ModelType.CRITIC: HFModelConfig(
model_id="microsoft/DialoGPT-large",
temperature=0.3,
max_tokens=384,
use_zerogpu=True,
batch_size=2,
torch_dtype="float16"
),
ModelType.GENERAL: HFModelConfig(
model_id="Qwen/Qwen2.5-7B-Instruct", # Good general purpose ZeroGPU model
temperature=0.7,
max_tokens=512,
use_zerogpu=True,
batch_size=1,
torch_dtype="float16"
)
}
    def __init__(self,
                 hf_token: Optional[str] = None,
                 model_configs: Optional[Dict[ModelType, HFModelConfig]] = None,
                 token_budget_manager: Optional[TokenBudgetManager] = None,
                 max_concurrent_requests: int = 10,
                 request_timeout: float = 30.0,
                 # ZeroGPU specific parameters
                 enable_zerogpu: bool = True,
                 gpu_memory_threshold: float = 0.9,  # Trigger cleanup at 90% memory
                 batch_timeout: float = 5.0,  # Max wait time for batching
                 # LMStudioClient compatibility
                 base_url: Optional[str] = None,  # For API compatibility
                 timeout: Optional[float] = None,  # Alternative name for request_timeout
                 debug_mode: bool = False):
        """
        Initialize HuggingFace ZeroGPU-optimized client.

        Args:
            hf_token: HuggingFace API token (uses HF_TOKEN env var if None)
            model_configs: Custom model configurations by agent type
            token_budget_manager: Token budget manager for rate limiting
            max_concurrent_requests: Maximum concurrent API requests
            request_timeout: Request timeout in seconds
            enable_zerogpu: Enable ZeroGPU acceleration when available
            gpu_memory_threshold: GPU memory usage threshold for cleanup
            batch_timeout: Maximum wait time for request batching
            base_url: API base URL (for LMStudioClient compatibility)
            timeout: Request timeout (alternative parameter name; takes
                precedence over request_timeout when given)
            debug_mode: Enable verbose debug output
        """
        # API compatibility with LMStudioClient
        self.hf_token = hf_token or os.getenv('HF_TOKEN')
        self.base_url = base_url  # For compatibility (not used)
        # `timeout` (LMStudio-style name) wins over `request_timeout`.
        self.timeout = timeout or request_timeout
        self.request_timeout = self.timeout
        self.debug_mode = debug_mode
        # Core configuration
        self.model_configs = model_configs or self.DEFAULT_MODELS
        self.token_budget_manager = token_budget_manager or TokenBudgetManager()
        self.max_concurrent_requests = max_concurrent_requests
        # ZeroGPU configuration: only enabled when the `spaces` package
        # imported successfully at module load.
        self.enable_zerogpu = enable_zerogpu and ZEROGPU_AVAILABLE
        self.gpu_memory_threshold = gpu_memory_threshold
        self.batch_timeout = batch_timeout
        # Initialize HF clients (one InferenceClient per model type).
        self.hf_api = HfApi(token=self.hf_token)
        self.inference_clients = {}
        self._init_inference_clients()
        # ZeroGPU model management
        self.loaded_models = {}  # Cache for loaded GPU models: id -> (model, tokenizer)
        self.model_pipelines = {}  # Transformers pipelines
        self.gpu_memory_usage = 0.0  # bytes allocated, tracked after loads
        # Rate limiting and performance tracking.
        # NOTE(review): the Semaphore is created outside any running event
        # loop; on older Python versions it binds to the loop lazily —
        # confirm the target runtime before reusing across loops.
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
        self.request_counts = {}
        self.response_times = []
        self.error_counts = {}
        # Batch processing for ZeroGPU efficiency
        self.batch_queue = deque()
        self.batch_processor_task = None
        # LMStudioClient compatibility tracking
        self.total_tokens = 0
        self.total_requests = 0
        self.total_response_time = 0.0
        self.concurrent_requests = 0
        self._connection_verified = False
        # Session management (opened lazily by the async context manager)
        self.session: Optional[aiohttp.ClientSession] = None
        # Initialize if ZeroGPU available
        if self.enable_zerogpu:
            self._initialize_zerogpu()
def _init_inference_clients(self):
"""Initialize inference clients for each model type."""
for model_type, config in self.model_configs.items():
try:
client = InferenceClient(
model=config.model_id,
token=self.hf_token
)
self.inference_clients[model_type] = client
logger.info(f"Initialized inference client for {model_type.value}: {config.model_id}")
except Exception as e:
logger.error(f"Failed to initialize client for {model_type.value}: {e}")
# Fall back to general model
if model_type != ModelType.GENERAL:
self.inference_clients[model_type] = self.inference_clients.get(ModelType.GENERAL)
    def _initialize_zerogpu(self):
        """Initialize ZeroGPU environment and check availability.

        Logs the visible GPUs when CUDA is usable; otherwise disables
        ZeroGPU on this instance so later code paths skip GPU work.
        """
        if not ZEROGPU_AVAILABLE:
            logger.warning("ZeroGPU not available, falling back to Inference API")
            return
        if TORCH_AVAILABLE and torch.cuda.is_available():
            # Record the GPU inventory for diagnostics.
            logger.info(f"ZeroGPU initialized with {torch.cuda.device_count()} GPUs")
            for i in range(torch.cuda.device_count()):
                logger.info(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        else:
            # spaces imported but no usable CUDA: turn the feature off.
            logger.warning("CUDA not available, ZeroGPU features disabled")
            self.enable_zerogpu = False
# LMStudioClient Compatibility Methods
def test_connection(self) -> bool:
"""
Test connection to HuggingFace services.
Returns:
True if connection successful, False otherwise
"""
try:
# Test with a simple API call
models = self.hf_api.list_models(limit=1)
self._connection_verified = True
return True
except Exception as e:
logger.warning(f"HuggingFace connection test failed: {e}")
self._connection_verified = False
return False
def ensure_connection(self) -> None:
"""Ensure connection to HuggingFace or raise exception."""
if not self._connection_verified and not self.test_connection():
raise HuggingFaceConnectionError(
"Cannot connect to HuggingFace services. "
"Check your internet connection and HF_TOKEN."
)
def complete(self, agent_id: str, system_prompt: str, user_prompt: str,
temperature: float = 0.7, max_tokens: Optional[int] = 500,
model: str = "local-model") -> LLMResponse:
"""
Synchronous completion request (LMStudioClient compatibility).
Args:
agent_id: Identifier for the requesting agent
system_prompt: System/context prompt
user_prompt: User query/task
temperature: Sampling temperature (0.0-1.0)
max_tokens: Maximum tokens in response
model: Model identifier (mapped to agent type)
Returns:
LLMResponse with content and metadata
Raises:
HuggingFaceConnectionError: If cannot connect to HuggingFace
"""
# Run async method synchronously (check for existing loop)
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Map model to agent type
agent_type = self._map_model_to_agent_type(model, agent_id)
# Create combined prompt
combined_prompt = f"System: {system_prompt}\n\nUser: {user_prompt}"
result = loop.run_until_complete(
self.generate_text(
prompt=combined_prompt,
agent_type=agent_type,
temperature=temperature,
max_tokens=max_tokens
)
)
# Convert HFResponse to LLMResponse for compatibility
return LLMResponse(
content=result.content,
tokens_used=result.tokens_used,
response_time=result.response_time,
model=result.model_used,
temperature=temperature,
agent_id=agent_id,
timestamp=time.time()
)
finally:
loop.close()
async def complete_async(self, agent_id: str, system_prompt: str,
user_prompt: str, temperature: float = 0.7,
max_tokens: Optional[int] = None,
model: str = "local-model",
priority: RequestPriority = RequestPriority.NORMAL) -> LLMResponse:
"""
Asynchronous completion request (LMStudioClient compatibility).
Args:
agent_id: Identifier for the requesting agent
system_prompt: System/context prompt
user_prompt: User query/task
temperature: Sampling temperature (0.0-1.0)
max_tokens: Maximum tokens in response
model: Model identifier
priority: Request priority level
Returns:
LLMResponse with content and metadata
"""
# Map model to agent type
agent_type = self._map_model_to_agent_type(model, agent_id)
# Create combined prompt
combined_prompt = f"System: {system_prompt}\n\nUser: {user_prompt}"
result = await self.generate_text(
prompt=combined_prompt,
agent_type=agent_type,
temperature=temperature,
max_tokens=max_tokens
)
# Convert HFResponse to LLMResponse for compatibility
return LLMResponse(
content=result.content,
tokens_used=result.tokens_used,
response_time=result.response_time,
model=result.model_used,
temperature=temperature,
agent_id=agent_id,
timestamp=time.time()
)
def _map_model_to_agent_type(self, model: str, agent_id: str) -> ModelType:
"""Map model identifier to agent type for compatibility."""
# Try to infer from agent_id first
agent_id_lower = agent_id.lower()
if "research" in agent_id_lower:
return ModelType.RESEARCH
elif "analysis" in agent_id_lower or "analyze" in agent_id_lower:
return ModelType.ANALYSIS
elif "synthesis" in agent_id_lower or "synthesize" in agent_id_lower:
return ModelType.SYNTHESIS
elif "critic" in agent_id_lower or "critique" in agent_id_lower:
return ModelType.CRITIC
# Try to infer from model name
model_lower = model.lower()
if "research" in model_lower:
return ModelType.RESEARCH
elif "analysis" in model_lower or "thinking" in model_lower:
return ModelType.ANALYSIS
elif "synthesis" in model_lower or "quality" in model_lower:
return ModelType.SYNTHESIS
elif "critic" in model_lower:
return ModelType.CRITIC
return ModelType.GENERAL
def get_usage_stats(self) -> Dict[str, Any]:
"""
Get client usage statistics (LMStudioClient compatibility).
Returns:
Dictionary with usage metrics
"""
avg_response_time = (self.total_response_time / self.total_requests
if self.total_requests > 0 else 0.0)
return {
"total_requests": self.total_requests,
"total_tokens": self.total_tokens,
"total_response_time": self.total_response_time,
"average_response_time": avg_response_time,
"average_tokens_per_request": (self.total_tokens / self.total_requests
if self.total_requests > 0 else 0.0),
"connection_verified": self._connection_verified,
"max_concurrent_requests": self.max_concurrent_requests,
"current_concurrent_requests": self.concurrent_requests,
"queue_size": len(self.batch_queue),
# ZeroGPU specific stats
"zerogpu_enabled": self.enable_zerogpu,
"gpu_memory_usage": self.gpu_memory_usage,
"loaded_models": list(self.loaded_models.keys())
}
def reset_stats(self) -> None:
"""Reset usage statistics (LMStudioClient compatibility)."""
self.total_tokens = 0
self.total_requests = 0
self.total_response_time = 0.0
self.reset_performance_stats()
    def create_agent_system_prompt(self, agent_type: str, position_info: Dict[str, float],
                                   task_context: str = "") -> str:
        """
        Create system prompt for Felix agent based on position and type (LMStudioClient compatibility).

        Args:
            agent_type: Type of agent (research, analysis, synthesis, critic)
            position_info: Agent's position on helix (x, y, z, radius, depth_ratio)
            task_context: Additional context about the current task

        Returns:
            Formatted system prompt
        """
        # Use the same implementation as LMStudioClient but optimized for ZeroGPU models.
        # Missing keys default to 0.0 (start of the helix).
        depth_ratio = position_info.get("depth_ratio", 0.0)
        radius = position_info.get("radius", 0.0)
        base_prompt = f"""🚨 IMPORTANT: You are a {agent_type} agent in the Felix multi-agent system optimized for ZeroGPU inference.
⚡ ZeroGPU OPTIMIZATION: This response will be processed on GPU-accelerated infrastructure for optimal performance.
Current Position:
- Depth: {depth_ratio:.2f} (0.0 = top/start, 1.0 = bottom/end)
- Radius: {radius:.2f} (decreasing as you progress)
- Processing Stage: {"Early/Broad" if depth_ratio < 0.3 else "Middle/Focused" if depth_ratio < 0.7 else "Final/Precise"}
Your Role Based on Position:
"""
        # Role-specific instructions; research agents additionally vary by
        # helix depth (broad early, specific later).
        if agent_type == "research":
            if depth_ratio < 0.3:
                base_prompt += "- MAXIMUM 5 bullet points with key facts ONLY\n"
                base_prompt += "- NO explanations, NO introductions, NO conclusions\n"
                base_prompt += "- Raw findings only - be direct\n"
            else:
                base_prompt += "- MAXIMUM 3 specific facts with numbers/dates/quotes\n"
                base_prompt += "- NO background context or elaboration\n"
                base_prompt += "- Prepare key points for analysis (concise)\n"
        elif agent_type == "analysis":
            base_prompt += "- MAXIMUM 2 numbered insights/patterns ONLY\n"
            base_prompt += "- NO background explanation or context\n"
            base_prompt += "- Direct analytical findings only\n"
        elif agent_type == "synthesis":
            base_prompt += "- FINAL output ONLY - NO process description\n"
            base_prompt += "- MAXIMUM 3 short paragraphs\n"
            base_prompt += "- Direct, actionable content without fluff\n"
        elif agent_type == "critic":
            base_prompt += "- MAXIMUM 3 specific issues/fixes ONLY\n"
            base_prompt += "- NO praise, NO general comments\n"
            base_prompt += "- Direct problems and solutions only\n"
        if task_context:
            base_prompt += f"\nTask Context: {task_context}\n"
        base_prompt += "\n⚡ ZeroGPU REMINDER: Response optimized for GPU acceleration. "
        base_prompt += "Early positions focus on breadth, later positions focus on depth and precision. BE CONCISE!"
        return base_prompt
# ZeroGPU-specific methods
    async def _zerogpu_inference(self, model_id: str, prompt: str,
                                 generation_params: Dict[str, Any]) -> Dict[str, Any]:
        """
        ZeroGPU-accelerated inference using direct model loading.

        Args:
            model_id: HuggingFace model identifier
            prompt: Input text prompt
            generation_params: Generation parameters

        Returns:
            Generation result with GPU metrics

        Raises:
            ZeroGPUError: If torch/transformers are missing or generation fails.
        """
        if not (TORCH_AVAILABLE and TRANSFORMERS_AVAILABLE):
            raise ZeroGPUError("PyTorch and Transformers required for ZeroGPU inference")
        gpu_start_time = time.time()
        # Snapshot allocated memory so we can report the delta after generation.
        initial_memory = torch.cuda.memory_allocated() if torch.cuda.is_available() else 0
        try:
            # Load or get cached model
            if model_id not in self.loaded_models:
                await self._load_model_to_gpu(model_id, generation_params)
            model, tokenizer = self.loaded_models[model_id]
            # Tokenize input (truncated to 2048 tokens to bound memory).
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048
            ).to(model.device)
            # Generate with GPU acceleration; no_grad avoids building autograd state.
            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=generation_params.get("max_new_tokens", 512),
                    temperature=generation_params.get("temperature", 0.7),
                    top_p=generation_params.get("top_p", 0.9),
                    do_sample=generation_params.get("do_sample", True),
                    pad_token_id=tokenizer.eos_token_id,
                    repetition_penalty=generation_params.get("repetition_penalty", 1.1)
                )
            # Decode only the newly generated tokens (strip the prompt echo).
            input_length = inputs['input_ids'].shape[1]
            generated_tokens = outputs[0][input_length:]
            response_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
            # Calculate metrics
            gpu_end_time = time.time()
            final_memory = torch.cuda.memory_allocated() if torch.cuda.is_available() else 0
            return {
                "generated_text": response_text,
                "gpu_time": gpu_end_time - gpu_start_time,
                "gpu_memory_used": (final_memory - initial_memory) / 1024**3,  # GB
                "tokens_generated": len(generated_tokens)
            }
        except Exception as e:
            logger.error(f"ZeroGPU inference failed for {model_id}: {e}")
            # Cleanup on error so a failed load/generate does not leak VRAM.
            await self._cleanup_gpu_memory()
            raise ZeroGPUError(f"GPU inference failed: {e}")
    async def _load_model_to_gpu(self, model_id: str, generation_params: Dict[str, Any]):
        """Load model to GPU with memory management.

        Caches the (model, tokenizer) pair in self.loaded_models and
        triggers a cleanup first when tracked usage exceeds the configured
        threshold fraction of total device memory.

        Raises:
            ZeroGPUError: If CUDA is unavailable or loading fails.
        """
        if not torch.cuda.is_available():
            raise ZeroGPUError("CUDA not available for model loading")
        try:
            # Check available memory (total capacity of device 0) and free
            # cached models if we are over the cleanup threshold.
            available_memory = torch.cuda.get_device_properties(0).total_memory
            if self.gpu_memory_usage > self.gpu_memory_threshold * available_memory:
                await self._cleanup_gpu_memory()
            # Load tokenizer; ensure a pad token exists for generation.
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token
            # Load model with optimal settings (dtype from generation params,
            # defaulting to float16 for VRAM efficiency).
            torch_dtype = getattr(torch, generation_params.get("torch_dtype", "float16"))
            model = AutoModelForCausalLM.from_pretrained(
                model_id,
                torch_dtype=torch_dtype,
                device_map="auto",
                trust_remote_code=True,
                low_cpu_mem_usage=True
            )
            # Cache the loaded model
            self.loaded_models[model_id] = (model, tokenizer)
            # Update memory usage tracking (bytes allocated on the device).
            current_memory = torch.cuda.memory_allocated()
            self.gpu_memory_usage = current_memory
            logger.info(f"Loaded {model_id} to GPU, memory usage: {current_memory / 1024**3:.2f} GB")
        except Exception as e:
            logger.error(f"Failed to load {model_id} to GPU: {e}")
            raise ZeroGPUError(f"Model loading failed: {e}")
async def _cleanup_gpu_memory(self):
"""Clean up GPU memory by unloading models."""
if not torch.cuda.is_available():
return
# Clear model cache
for model_id in list(self.loaded_models.keys()):
model, tokenizer = self.loaded_models.pop(model_id)
del model, tokenizer
# Force garbage collection
gc.collect()
torch.cuda.empty_cache()
self.gpu_memory_usage = 0.0
logger.info("GPU memory cleaned up")
async def __aenter__(self):
"""Async context manager entry."""
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.request_timeout))
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
# Cleanup GPU resources
if self.enable_zerogpu:
await self._cleanup_gpu_memory()
async def close_async(self) -> None:
"""Close async client and cleanup resources (LMStudioClient compatibility)."""
if self.session:
await self.session.close()
if self.enable_zerogpu:
await self._cleanup_gpu_memory()
if self.batch_processor_task and not self.batch_processor_task.done():
self.batch_processor_task.cancel()
try:
await self.batch_processor_task
except asyncio.CancelledError:
pass
async def generate_text(self,
prompt: str,
agent_type: ModelType = ModelType.GENERAL,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
use_zerogpu: Optional[bool] = None,
priority: RequestPriority = RequestPriority.NORMAL,
**kwargs) -> HFResponse:
"""
Generate text using HuggingFace inference with ZeroGPU optimization.
Args:
prompt: Input prompt for text generation
agent_type: Type of agent requesting generation
temperature: Override temperature for this request
max_tokens: Override max tokens for this request
use_zerogpu: Force ZeroGPU usage (None for auto-detect)
priority: Request priority for processing order
**kwargs: Additional generation parameters
Returns:
HFResponse with generated text and metadata
"""
async with self.semaphore:
start_time = time.time()
self.concurrent_requests += 1
try:
# Get model configuration
config = self.model_configs.get(agent_type, self.model_configs[ModelType.GENERAL])
client = self.inference_clients.get(agent_type, self.inference_clients[ModelType.GENERAL])
# Determine if we should use ZeroGPU
should_use_zerogpu = (
use_zerogpu if use_zerogpu is not None
else (self.enable_zerogpu and config.use_zerogpu)
)
if not client and not should_use_zerogpu:
return HFResponse(
content="",
model_used=config.model_id,
tokens_used=0,
response_time=0.0,
success=False,
error=f"No inference client available for {agent_type.value}",
fallback_used=False
)
# Check token budget
estimated_tokens = max_tokens or config.max_tokens
if hasattr(self.token_budget_manager, 'can_allocate') and not self.token_budget_manager.can_allocate(estimated_tokens):
return HFResponse(
content="",
model_used=config.model_id,
tokens_used=0,
response_time=time.time() - start_time,
success=False,
error="Insufficient token budget",
fallback_used=False
)
# Prepare generation parameters
generation_params = {
"max_new_tokens": max_tokens or config.max_tokens,
"temperature": temperature or config.temperature,
"top_p": config.top_p,
"repetition_penalty": config.repetition_penalty,
"do_sample": True,
"return_full_text": False,
"torch_dtype": config.torch_dtype,
**kwargs
}
response_data = None
gpu_metrics = {}
fallback_used = False
# Try ZeroGPU first if enabled and available
if should_use_zerogpu:
try:
if self.debug_mode:
logger.info(f"Using ZeroGPU inference for {agent_type.value} with {config.model_id}")
gpu_result = await self._zerogpu_inference(
config.model_id, prompt, generation_params
)
response_data = [{
"generated_text": gpu_result["generated_text"]
}]
gpu_metrics = {
"gpu_time": gpu_result["gpu_time"],
"gpu_memory_used": gpu_result["gpu_memory_used"],
"tokens_generated": gpu_result["tokens_generated"]
}
except (ZeroGPUError, GPUMemoryError) as e:
logger.warning(f"ZeroGPU failed, falling back to Inference API: {e}")
fallback_used = True
should_use_zerogpu = False
# Fallback to Inference API if ZeroGPU failed or not enabled
if not response_data:
if not client:
raise Exception("No inference method available")
response_data = await self._make_inference_request(
client=client,
prompt=prompt,
parameters=generation_params
)
fallback_used = not should_use_zerogpu
# Process response
if response_data and isinstance(response_data, list) and len(response_data) > 0:
generated_text = response_data[0].get("generated_text", "")
tokens_used = self._estimate_tokens(prompt + generated_text)
# Allocate tokens if budget manager supports it
allocation = None
if hasattr(self.token_budget_manager, 'allocate_tokens'):
allocation = self.token_budget_manager.allocate_tokens(tokens_used)
# Track performance
response_time = time.time() - start_time
self._track_performance(agent_type, response_time, success=True)
# Update compatibility stats
self.total_tokens += tokens_used
self.total_requests += 1
self.total_response_time += response_time
if self.debug_mode:
method = "ZeroGPU" if (should_use_zerogpu and not fallback_used) else "Inference API"
logger.info(f"✅ {method} response for {agent_type.value}: {len(generated_text)} chars, {tokens_used} tokens, {response_time:.2f}s")
return HFResponse(
content=generated_text,
model_used=config.model_id,
tokens_used=tokens_used,
response_time=response_time,
success=True,
metadata={
"agent_type": agent_type.value,
"allocation_id": allocation.allocation_id if allocation else None,
"parameters": generation_params,
"method": "ZeroGPU" if (should_use_zerogpu and not fallback_used) else "Inference API"
},
gpu_memory_used=gpu_metrics.get("gpu_memory_used"),
gpu_time=gpu_metrics.get("gpu_time"),
fallback_used=fallback_used
)
else:
return HFResponse(
content="",
model_used=config.model_id,
tokens_used=0,
response_time=time.time() - start_time,
success=False,
error="Empty or invalid response from API",
fallback_used=fallback_used
)
except Exception as e:
self._track_performance(agent_type, time.time() - start_time, success=False)
logger.error(f"HF API request failed for {agent_type.value}: {e}")
return HFResponse(
content="",
model_used=config.model_id,
tokens_used=0,
response_time=time.time() - start_time,
success=False,
error=str(e),
fallback_used=False
)
finally:
self.concurrent_requests -= 1
async def _make_inference_request(self, client: InferenceClient, prompt: str, parameters: Dict[str, Any]):
"""Make inference request with proper error handling and Pro account optimizations."""
try:
# Remove ZeroGPU-specific parameters for Inference API
api_params = parameters.copy()
api_params.pop('torch_dtype', None)
# Use text generation task with Pro account optimizations
response = await asyncio.wait_for(
asyncio.create_task(
client.text_generation(
prompt=prompt,
**api_params
)
),
timeout=self.request_timeout
)
return [{"generated_text": response}] if isinstance(response, str) else response
except asyncio.TimeoutError:
raise Exception(f"Request timeout after {self.request_timeout}s")
except Exception as e:
raise Exception(f"Inference request failed: {e}")
def _estimate_tokens(self, text: str) -> int:
"""Estimate token count for text (rough approximation)."""
# Simple approximation: ~4 characters per token on average
return max(1, len(text) // 4)
def _track_performance(self, agent_type: ModelType, response_time: float, success: bool):
"""Track performance metrics for monitoring."""
# Track request counts
self.request_counts[agent_type] = self.request_counts.get(agent_type, 0) + 1
# Track response times
self.response_times.append(response_time)
if len(self.response_times) > 1000: # Keep last 1000 responses
self.response_times = self.response_times[-1000:]
# Track errors
if not success:
self.error_counts[agent_type] = self.error_counts.get(agent_type, 0) + 1
def get_performance_stats(self) -> Dict[str, Any]:
"""Get performance statistics with ZeroGPU metrics."""
avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0
total_requests = sum(self.request_counts.values())
total_errors = sum(self.error_counts.values())
error_rate = (total_errors / total_requests) if total_requests > 0 else 0
# ZeroGPU specific stats
zerogpu_stats = {}
if self.enable_zerogpu and torch.cuda.is_available():
zerogpu_stats = {
"gpu_available": True,
"gpu_count": torch.cuda.device_count(),
"gpu_memory_allocated": torch.cuda.memory_allocated() / 1024**3, # GB
"gpu_memory_cached": torch.cuda.memory_reserved() / 1024**3, # GB
"loaded_models": list(self.loaded_models.keys()),
"current_gpu_memory_usage": self.gpu_memory_usage / 1024**3 if self.gpu_memory_usage else 0.0
}
base_stats = {
"total_requests": total_requests,
"total_errors": total_errors,
"error_rate": error_rate,
"avg_response_time": avg_response_time,
"requests_by_type": dict(self.request_counts),
"errors_by_type": dict(self.error_counts),
"zerogpu_enabled": self.enable_zerogpu,
"zerogpu_available": ZEROGPU_AVAILABLE,
}
if hasattr(self.token_budget_manager, 'get_status'):
base_stats["token_budget_status"] = self.token_budget_manager.get_status()
base_stats.update(zerogpu_stats)
return base_stats
def reset_performance_stats(self):
"""Reset performance tracking."""
self.request_counts.clear()
self.response_times.clear()
self.error_counts.clear()
async def health_check(self) -> Dict[str, bool]:
"""Check health of all configured models."""
health_status = {}
for model_type, config in self.model_configs.items():
try:
# Simple test request
response = await self.generate_text(
prompt="Hello",
agent_type=model_type,
max_tokens=10
)
health_status[model_type.value] = response.success
except Exception as e:
logger.error(f"Health check failed for {model_type.value}: {e}")
health_status[model_type.value] = False
return health_status
def get_available_models(self) -> Dict[str, str]:
"""Get list of available models by type."""
return {
model_type.value: config.model_id
for model_type, config in self.model_configs.items()
}
    async def batch_generate(self,
                             prompts: List[str],
                             agent_types: List[ModelType],
                             use_zerogpu_batching: bool = True,
                             **kwargs) -> List[HFResponse]:
        """
        Generate text for multiple prompts with ZeroGPU batching optimization.

        Args:
            prompts: List of input prompts
            agent_types: List of agent types (must match prompts length)
            use_zerogpu_batching: Enable ZeroGPU batch processing
            **kwargs: Additional generation parameters

        Returns:
            List of HFResponse objects, in the same order as `prompts`

        Raises:
            ValueError: If the two input lists differ in length.
        """
        if len(prompts) != len(agent_types):
            raise ValueError("Prompts and agent_types lists must have same length")
        # Use ZeroGPU batching for same model types if enabled; any failure
        # falls through to per-prompt concurrent requests below.
        if use_zerogpu_batching and self.enable_zerogpu:
            try:
                return await self._zerogpu_batch_process(prompts, agent_types, **kwargs)
            except Exception as e:
                logger.warning(f"ZeroGPU batching failed, falling back to individual requests: {e}")
        # Create tasks for concurrent execution (one generate_text per prompt).
        tasks = [
            self.generate_text(prompt=prompt, agent_type=agent_type, **kwargs)
            for prompt, agent_type in zip(prompts, agent_types)
        ]
        # Execute concurrently; the shared semaphore still limits parallelism.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Convert exceptions to error responses so callers get one
        # HFResponse per prompt regardless of individual failures.
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(HFResponse(
                    content="",
                    model_used=self.model_configs[agent_types[i]].model_id,
                    tokens_used=0,
                    response_time=0.0,
                    success=False,
                    error=str(result)
                ))
            else:
                processed_results.append(result)
        return processed_results
    async def _zerogpu_batch_process(self, prompts: List[str], agent_types: List[ModelType], **kwargs) -> List[HFResponse]:
        """
        Process multiple prompts using ZeroGPU batching for efficiency.

        Prompts are grouped by resolved model ID so each model only needs one
        GPU session per call. If batch inference fails for a group, that
        group's prompts are retried one-by-one via generate_text with the
        Inference API fallback forced.

        Args:
            prompts: List of input prompts
            agent_types: List of agent types (parallel to prompts)
            **kwargs: Additional parameters; 'max_tokens' and 'temperature'
                override the per-agent model config defaults

        Returns:
            List of HFResponse objects, aligned with the input prompt order
        """
        # Group by model type for efficient batching.
        # Unknown agent types fall back to the GENERAL model configuration.
        model_groups = {}
        for i, (prompt, agent_type) in enumerate(zip(prompts, agent_types)):
            config = self.model_configs.get(agent_type, self.model_configs[ModelType.GENERAL])
            model_id = config.model_id
            if model_id not in model_groups:
                model_groups[model_id] = []
            # Carry the original index so results can be re-ordered afterwards.
            model_groups[model_id].append((i, prompt, agent_type, config))
        # Process each model group with GPU batching.
        # Pre-size the result list so positional assignment below works.
        results = [None] * len(prompts)
        start_time = time.time()
        for model_id, group_items in model_groups.items():
            batch_prompts = [item[1] for item in group_items]
            batch_configs = [item[3] for item in group_items]
            try:
                # Use first config as representative for the whole group
                # (all items in a group share the same model_id, but other
                # per-agent settings may differ — the first one wins).
                base_config = batch_configs[0]
                generation_params = {
                    "max_new_tokens": kwargs.get('max_tokens', base_config.max_tokens),
                    "temperature": kwargs.get('temperature', base_config.temperature),
                    "top_p": base_config.top_p,
                    "repetition_penalty": base_config.repetition_penalty,
                    "do_sample": True,
                    "torch_dtype": base_config.torch_dtype,
                }
                # Process the whole group in one GPU session
                batch_results = await self._zerogpu_batch_inference(
                    model_id, batch_prompts, generation_params
                )
                # Map results back to original positions
                for (orig_idx, prompt, agent_type, config), batch_result in zip(group_items, batch_results):
                    tokens_used = self._estimate_tokens(prompt + batch_result["generated_text"])
                    # Per-prompt timing if the inference reported it; otherwise
                    # fall back to elapsed time since this call started.
                    response_time = batch_result.get("response_time", time.time() - start_time)
                    results[orig_idx] = HFResponse(
                        content=batch_result["generated_text"],
                        model_used=model_id,
                        tokens_used=tokens_used,
                        response_time=response_time,
                        success=True,
                        metadata={
                            "agent_type": agent_type.value,
                            "method": "ZeroGPU-Batch",
                            "batch_size": len(batch_prompts)
                        },
                        gpu_memory_used=batch_result.get("gpu_memory_used"),
                        gpu_time=batch_result.get("gpu_time"),
                        batch_processed=len(batch_prompts),
                        fallback_used=False
                    )
            except Exception as e:
                # Fall back to individual processing for this model group only;
                # other groups are unaffected.
                logger.warning(f"Batch processing failed for {model_id}, using individual requests: {e}")
                for orig_idx, prompt, agent_type, config in group_items:
                    try:
                        individual_result = await self.generate_text(
                            prompt=prompt,
                            agent_type=agent_type,
                            use_zerogpu=False,  # Force Inference API fallback
                            **kwargs
                        )
                        results[orig_idx] = individual_result
                    except Exception as individual_e:
                        # Even the fallback failed: record an error response so
                        # the output list stays aligned with the input prompts.
                        results[orig_idx] = HFResponse(
                            content="",
                            model_used=config.model_id,
                            tokens_used=0,
                            response_time=0.0,
                            success=False,
                            error=str(individual_e),
                            fallback_used=True
                        )
        return results
async def _zerogpu_batch_inference(self, model_id: str, prompts: List[str], generation_params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Process multiple prompts in a single ZeroGPU session for efficiency.
Args:
model_id: HuggingFace model identifier
prompts: List of input prompts
generation_params: Generation parameters
Returns:
List of generation results
"""
if not (TORCH_AVAILABLE and TRANSFORMERS_AVAILABLE):
raise ZeroGPUError("PyTorch and Transformers required for batch ZeroGPU inference")
gpu_start_time = time.time()
initial_memory = torch.cuda.memory_allocated() if torch.cuda.is_available() else 0
try:
# Load or get cached model
if model_id not in self.loaded_models:
await self._load_model_to_gpu(model_id, generation_params)
model, tokenizer = self.loaded_models[model_id]
results = []
# Process prompts individually but in the same GPU session
for i, prompt in enumerate(prompts):
prompt_start = time.time()
# Tokenize input
inputs = tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=2048
).to(model.device)
# Generate with GPU acceleration
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=generation_params.get("max_new_tokens", 512),
temperature=generation_params.get("temperature", 0.7),
top_p=generation_params.get("top_p", 0.9),
do_sample=generation_params.get("do_sample", True),
pad_token_id=tokenizer.eos_token_id,
repetition_penalty=generation_params.get("repetition_penalty", 1.1)
)
# Decode response
input_length = inputs['input_ids'].shape[1]
generated_tokens = outputs[0][input_length:]
response_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
prompt_end = time.time()
results.append({
"generated_text": response_text,
"response_time": prompt_end - prompt_start,
"tokens_generated": len(generated_tokens)
})
# Calculate overall GPU metrics
gpu_end_time = time.time()
final_memory = torch.cuda.memory_allocated() if torch.cuda.is_available() else 0
# Add GPU metrics to all results
total_gpu_time = gpu_end_time - gpu_start_time
gpu_memory_used = (final_memory - initial_memory) / 1024**3 # GB
for result in results:
result["gpu_time"] = total_gpu_time / len(results) # Distribute GPU time
result["gpu_memory_used"] = gpu_memory_used / len(results) # Distribute memory usage
return results
except Exception as e:
logger.error(f"ZeroGPU batch inference failed for {model_id}: {e}")
# Cleanup on error
await self._cleanup_gpu_memory()
raise ZeroGPUError(f"GPU batch inference failed: {e}")
# Utility functions for Felix Framework integration
def create_felix_hf_client(token_budget: int = 50000,
                           concurrent_requests: int = 5,
                           enable_zerogpu: bool = True,
                           debug_mode: bool = False) -> HuggingFaceClient:
    """
    Build the ZeroGPU-optimized HuggingFace client for Felix Framework on HF Spaces.

    Args:
        token_budget: Total token budget for the session
        concurrent_requests: Maximum number of in-flight requests
        enable_zerogpu: Enable ZeroGPU acceleration
        debug_mode: Enable debug logging

    Returns:
        HuggingFaceClient instance configured for ZeroGPU and HF Pro usage
    """
    # Distribute the overall budget evenly across the four typical agent roles.
    budget_manager = TokenBudgetManager(
        base_budget=token_budget // 4,
        strict_mode=True  # Strict accounting keeps ZeroGPU sessions efficient
    )

    # Per-agent model choices tuned for the ZeroGPU memory envelope.
    research_cfg = HFModelConfig(
        model_id="microsoft/DialoGPT-large",  # Upgraded for better performance
        temperature=0.9,
        max_tokens=256,
        top_p=0.95,
        use_zerogpu=True,
        batch_size=2,  # Research queries batch efficiently
        torch_dtype="float16",
        priority="normal"
    )
    analysis_cfg = HFModelConfig(
        model_id="meta-llama/Llama-3.1-8B-Instruct",  # Better reasoning capability
        temperature=0.5,
        max_tokens=384,
        top_p=0.9,
        use_zerogpu=True,
        batch_size=1,
        torch_dtype="float16",
        priority="high"  # Pro account priority
    )
    synthesis_cfg = HFModelConfig(
        model_id="Qwen/Qwen2.5-7B-Instruct",  # ZeroGPU-compatible synthesis (fits in 24GB)
        temperature=0.1,
        max_tokens=512,
        top_p=0.85,
        use_zerogpu=True,
        batch_size=1,
        torch_dtype="float16",
        gpu_memory_limit=8.0,  # 7B model fits comfortably
        priority="high"
    )
    critic_cfg = HFModelConfig(
        model_id="microsoft/DialoGPT-large",
        temperature=0.3,
        max_tokens=256,
        top_p=0.9,
        use_zerogpu=True,
        batch_size=2,
        torch_dtype="float16",
        priority="normal"
    )

    return HuggingFaceClient(
        model_configs={
            ModelType.RESEARCH: research_cfg,
            ModelType.ANALYSIS: analysis_cfg,
            ModelType.SYNTHESIS: synthesis_cfg,
            ModelType.CRITIC: critic_cfg,
        },
        token_budget_manager=budget_manager,
        max_concurrent_requests=concurrent_requests,
        request_timeout=45.0,  # Longer timeout for ZeroGPU model loading
        enable_zerogpu=enable_zerogpu,
        gpu_memory_threshold=0.85,  # Conservative memory management
        batch_timeout=3.0,  # Short batching window keeps things responsive
        debug_mode=debug_mode
    )
def create_default_client(max_concurrent_requests: int = 4,
                          enable_zerogpu: bool = True) -> HuggingFaceClient:
    """Create ZeroGPU HuggingFace client with default settings (LMStudioClient compatibility)."""
    # Thin convenience wrapper over the fully-configured Felix factory.
    options = {
        "concurrent_requests": max_concurrent_requests,
        "enable_zerogpu": enable_zerogpu,
    }
    return create_felix_hf_client(**options)
# Pro account specific optimizations
def get_pro_account_models() -> Dict[ModelType, HFModelConfig]:
    """
    Get model configurations optimized for HF Pro accounts with access to premium models.

    All entries run with "high" priority and use ZeroGPU acceleration.

    Returns:
        Dictionary of premium model configurations keyed by agent type
    """
    configs: Dict[ModelType, HFModelConfig] = {}
    configs[ModelType.RESEARCH] = HFModelConfig(
        model_id="meta-llama/Llama-3.1-8B-Instruct",  # Premium access
        temperature=0.9,
        max_tokens=384,
        use_zerogpu=True,
        batch_size=3,
        priority="high"
    )
    configs[ModelType.ANALYSIS] = HFModelConfig(
        model_id="meta-llama/Llama-3.1-8B-Instruct",  # ZeroGPU-compatible analysis (fits in 24GB)
        temperature=0.5,
        max_tokens=512,
        use_zerogpu=True,
        batch_size=1,
        gpu_memory_limit=10.0,  # 8B model fits in ZeroGPU
        priority="high"
    )
    configs[ModelType.SYNTHESIS] = HFModelConfig(
        model_id="Qwen/Qwen2.5-7B-Instruct",  # ZeroGPU-compatible synthesis (fits in 24GB)
        temperature=0.1,
        max_tokens=768,
        use_zerogpu=True,
        batch_size=1,
        gpu_memory_limit=8.0,  # 7B model fits in ZeroGPU
        priority="high"
    )
    configs[ModelType.CRITIC] = HFModelConfig(
        model_id="meta-llama/Llama-3.1-8B-Instruct",
        temperature=0.3,
        max_tokens=384,
        use_zerogpu=True,
        batch_size=2,
        priority="high"
    )
    return configs
# ZeroGPU deployment helpers
def estimate_gpu_requirements(model_configs: Dict[ModelType, HFModelConfig]) -> Dict[str, float]:
    """
    Estimate GPU memory requirements for given model configurations.

    Args:
        model_configs: Model configurations to analyze

    Returns:
        Dictionary with per-agent memory estimates plus aggregate figures,
        all in GB
    """
    # Approximate on-GPU footprints (GB) for known model IDs.
    known_sizes = {
        "microsoft/DialoGPT-medium": 1.5,
        "microsoft/DialoGPT-large": 3.0,
        "meta-llama/Llama-3.1-8B-Instruct": 16.0,
        "meta-llama/Llama-3.1-13B-Instruct": 26.0,
        "meta-llama/Llama-3.1-70B-Instruct": 140.0,
        "Qwen/Qwen2.5-7B-Instruct": 14.0
    }
    # Resolve each configured model's footprint; unknown models default to 8GB.
    per_model = [
        (agent_type, known_sizes.get(config.model_id, 8.0))
        for agent_type, config in model_configs.items()
    ]
    estimates: Dict[str, float] = {
        f"{agent_type.value}_memory": mem for agent_type, mem in per_model
    }
    footprints = [mem for _, mem in per_model]
    largest = max(footprints, default=0.0)
    estimates.update({
        "total_memory_if_all_loaded": sum(footprints),
        "max_single_model_memory": largest,
        "recommended_gpu_memory": largest * 1.5,  # 50% buffer
        "minimum_gpu_memory": largest * 1.2  # 20% buffer
    })
    return estimates
# Export main classes and functions.
# The trailing *_AVAILABLE flags advertise which optional dependencies
# (spaces / torch / transformers) were detected at import time.
__all__ = [
    'HuggingFaceClient',
    'HFResponse',
    'HFModelConfig',
    'ModelType',
    'GPUMemoryError',
    'ZeroGPUError',
    'HuggingFaceConnectionError',
    'create_felix_hf_client',
    'create_default_client',
    'get_pro_account_models',
    'estimate_gpu_requirements',
    'ZEROGPU_AVAILABLE',
    'TORCH_AVAILABLE',
    'TRANSFORMERS_AVAILABLE'
]