# sema-chat/app/utils/helpers.py
# Initial Commit (639f3bb) by kamau1
"""
Utility functions and helpers
"""
import re
import uuid
import hashlib
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
def generate_session_id(user_id: Optional[str] = None) -> str:
    """
    Generate a unique session ID.

    Args:
        user_id: Optional user identifier to include in the session ID.

    Returns:
        Unique session identifier of the form
        ``<user-hash>-<UTC timestamp>-<random>`` for known users, or
        ``anon-<UTC timestamp>-<random>`` for anonymous sessions.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    random_part = str(uuid.uuid4())[:8]
    if user_id:
        # Hash the user_id for privacy. SHA-256 instead of MD5: MD5 is
        # cryptographically broken and flagged by security linters; the
        # output shape (first 8 hex chars) is unchanged.
        user_hash = hashlib.sha256(user_id.encode()).hexdigest()[:8]
        return f"{user_hash}-{timestamp}-{random_part}"
    else:
        return f"anon-{timestamp}-{random_part}"
def generate_message_id() -> str:
    """Return a unique message identifier of the form ``msg-<uuid4>``."""
    unique_part = uuid.uuid4()
    return "msg-" + str(unique_part)
def sanitize_text(text: str, max_length: int = 4000) -> str:
    """
    Sanitize and clean text input.

    Collapses all runs of whitespace to single spaces and truncates at a
    word boundary (with a ``...`` suffix) when the text is too long.

    Args:
        text: Input text to sanitize.
        max_length: Maximum allowed length of the returned string.

    Returns:
        Sanitized text, never longer than ``max_length`` (for any
        ``max_length`` >= 4).
    """
    if not text:
        return ""
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text.strip())
    if len(text) > max_length:
        # Reserve room for the ellipsis so the result stays within
        # max_length (the old code sliced to max_length and then appended
        # "...", overshooting the limit — always, for space-free input).
        cutoff = max(max_length - 3, 1)
        truncated = text[:cutoff]
        # Prefer breaking at the last word boundary if one exists.
        head, sep, _ = truncated.rpartition(' ')
        text = (head if sep else truncated) + "..."
    return text
def format_timestamp(dt: datetime) -> str:
    """
    Format datetime for consistent display.

    Args:
        dt: Datetime object. Timezone-aware datetimes are converted to
            UTC; naive datetimes are assumed to already be UTC.

    Returns:
        Formatted timestamp string, e.g. ``2024-01-02 03:04:05 UTC``.
    """
    # The label says "UTC", so make it true: convert aware datetimes
    # instead of mislabeling local/offset times as UTC.
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
def estimate_tokens(text: str) -> int:
    """
    Rough estimation of token count for text.

    Args:
        text: Input text.

    Returns:
        Estimated token count (always at least 1).
    """
    # Heuristic: roughly four characters per token on average.
    approx = len(text) // 4
    return approx if approx >= 1 else 1
def truncate_conversation_history(
    messages: List[Dict[str, Any]],
    max_tokens: int = 2000
) -> List[Dict[str, Any]]:
    """
    Truncate conversation history to fit within a token limit.

    System messages are always preserved; the most recent non-system
    messages are then kept, newest first, until the budget is exhausted.

    Args:
        messages: List of message dictionaries (each with a ``role`` key).
        max_tokens: Maximum token limit for the whole history.

    Returns:
        Truncated list of messages in original relative order.
    """
    if not messages:
        return messages
    # Split system messages (always kept) from the rest in one pass.
    system_msgs: List[Dict[str, Any]] = []
    chat_msgs: List[Dict[str, Any]] = []
    for entry in messages:
        (system_msgs if entry.get("role") == "system" else chat_msgs).append(entry)
    # Whatever budget remains after the system messages goes to the chat.
    budget = max_tokens - sum(
        estimate_tokens(m.get("content", "")) for m in system_msgs
    )
    if budget <= 0:
        return system_msgs
    # Walk backwards (newest first), keeping messages while they fit.
    kept: List[Dict[str, Any]] = []
    used = 0
    for entry in reversed(chat_msgs):
        cost = estimate_tokens(entry.get("content", ""))
        if used + cost > budget:
            break
        kept.append(entry)
        used += cost
    kept.reverse()
    return system_msgs + kept
def validate_session_id(session_id: str) -> bool:
    """
    Validate session ID format.

    Args:
        session_id: Session identifier to validate.

    Returns:
        True if the ID is 5-100 characters of ``[A-Za-z0-9_-]``,
        False otherwise.
    """
    if not session_id or len(session_id) < 5 or len(session_id) > 100:
        return False
    # fullmatch instead of match with '$': '$' also matches just before a
    # trailing newline, so "abcde\n" used to pass validation.
    return re.fullmatch(r'[a-zA-Z0-9_-]+', session_id) is not None
def extract_model_name_from_path(model_path: str) -> str:
    """
    Extract clean model name from a HuggingFace model path.

    Args:
        model_path: Full model path (e.g. ``microsoft/DialoGPT-medium``).

    Returns:
        The final path component (e.g. ``DialoGPT-medium``); paths with
        no ``/`` are returned unchanged.
    """
    # rsplit handles both cases: with no "/" it returns the whole string.
    return model_path.rsplit("/", 1)[-1]
def format_model_info(model_info: Dict[str, Any]) -> Dict[str, Any]:
    """
    Format model information for API responses.

    Args:
        model_info: Raw model information.

    Returns:
        Formatted model information with defaulted core fields plus any
        backend-specific fields that are present.
    """
    defaults: Dict[str, Any] = {
        "name": "unknown",
        "type": "unknown",
        "loaded": False,
        "capabilities": [],
    }
    formatted = {key: model_info.get(key, fallback) for key, fallback in defaults.items()}
    # Copy over backend-specific fields only when the backend supplied them.
    for optional_key in ("device", "provider", "parameters"):
        if optional_key in model_info:
            formatted[optional_key] = model_info[optional_key]
    return formatted
def create_error_response(
    error_type: str,
    message: str,
    details: Optional[Dict[str, Any]] = None,
    request_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Create standardized error response.

    Args:
        error_type: Type of error.
        message: Error message.
        details: Optional additional details (defaults to an empty dict).
        request_id: Optional request identifier (a message ID is
            generated when omitted).

    Returns:
        Formatted error response dict with a timezone-aware ISO-8601
        UTC timestamp.
    """
    return {
        "error": error_type,
        "message": message,
        "details": details or {},
        # datetime.utcnow() is deprecated and returns a naive datetime;
        # use an aware UTC timestamp, consistent with generate_session_id.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "request_id": request_id or generate_message_id()
    }
def parse_model_backend_from_name(model_name: str) -> str:
    """
    Guess the appropriate backend type from a model name.

    Args:
        model_name: Model name or path.

    Returns:
        Suggested backend type: ``openai``, ``anthropic``, ``hf_api``,
        or ``local`` (the default).
    """
    lowered = model_name.lower()
    # Checks are ordered: OpenAI GPT variants first, then Claude, then
    # well-known HF-hosted vendors, falling back to a local backend.
    if "gpt" in lowered and ("3.5" in lowered or "4" in lowered):
        return "openai"
    if "claude" in lowered:
        return "anthropic"
    for vendor in ("microsoft", "google", "meta", "huggingface"):
        if vendor in lowered:
            return "hf_api"  # Likely available via HF API
    return "local"
def get_supported_model_examples() -> Dict[str, List[str]]:
    """
    Get examples of supported models for each backend type.

    Returns:
        Dictionary mapping backend types to example model identifiers.
    """
    local_models = [
        "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
        "microsoft/DialoGPT-medium",
        "Qwen/Qwen2.5-0.5B-Instruct",
        "microsoft/phi-2",
    ]
    hf_api_models = [
        "microsoft/DialoGPT-large",
        "google/gemma-2b-it",
        "microsoft/phi-2",
        "meta-llama/Llama-2-7b-chat-hf",
    ]
    openai_models = [
        "gpt-3.5-turbo",
        "gpt-4",
        "gpt-4-turbo",
        "gpt-4o",
    ]
    anthropic_models = [
        "claude-3-haiku-20240307",
        "claude-3-sonnet-20240229",
        "claude-3-opus-20240229",
        "claude-3-5-sonnet-20241022",
    ]
    return {
        "local": local_models,
        "hf_api": hf_api_models,
        "openai": openai_models,
        "anthropic": anthropic_models,
    }
def calculate_response_metrics(
    start_time: float,
    response_text: str,
    token_count: Optional[int] = None
) -> Dict[str, Any]:
    """
    Calculate response metrics for monitoring.

    Args:
        start_time: Request start time (epoch seconds, as from ``time.time()``).
        response_text: Generated response text.
        token_count: Actual token count if available; estimated otherwise.

    Returns:
        Dictionary of timing, size, and throughput metrics.
    """
    import time
    elapsed = time.time() - start_time
    # Fall back to the heuristic estimate when no real count was supplied.
    tokens = token_count or estimate_tokens(response_text)
    throughput = tokens / elapsed if elapsed > 0 else 0
    return {
        "total_time": elapsed,
        "character_count": len(response_text),
        "estimated_tokens": tokens,
        "actual_tokens": token_count,
        "tokens_per_second": throughput,
        "words_count": len(response_text.split()),
    }