Spaces:
Sleeping
Sleeping
File size: 6,197 Bytes
d4f1687 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
"""
Utility functions for the RAG Agent and API Layer system.
This module contains helper functions, logging setup, and common utilities.
"""
import logging
import math
import re
import time
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from .models import ErrorResponse
def setup_logging(level: str = "INFO") -> None:
    """
    Configure application logging through ``logging.basicConfig``.

    The call passes one ``StreamHandler`` and a message format made of the
    timestamp, logger name, level name and message.

    Args:
        level: Name of the logging level (DEBUG, INFO, WARNING, ERROR,
            CRITICAL). It is upper-cased and looked up as an attribute of
            the ``logging`` module.

    Raises:
        AttributeError: If the upper-cased ``level`` is not an attribute of
            the ``logging`` module.
    """
    # Resolve the level first so a bad name fails before any handler is built.
    resolved_level = getattr(logging, level.upper())
    stream_handler = logging.StreamHandler()
    logging.basicConfig(
        level=resolved_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[stream_handler],
    )
def generate_response_id() -> str:
    """
    Generate a short random identifier for API responses.

    Returns:
        A string of the form ``resp_xxxxxxxx`` where the suffix is the first
        eight hexadecimal characters of a freshly generated UUID4.
    """
    short_hex = uuid.uuid4().hex[:8]
    return "resp_" + short_hex
def format_timestamp() -> str:
    """
    Generate an ISO 8601 formatted timestamp for the current time in UTC.

    Returns:
        Timestamp string with second precision and a literal ``Z`` suffix,
        e.g. ``2024-01-31T12:34:56Z``.
    """
    # Use an aware UTC datetime: datetime.utcnow() is deprecated and returns
    # a naive value that is easy to misinterpret as local time.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def create_error_response(error_code: str, message: str, details: Optional[Dict[str, Any]] = None) -> ErrorResponse:
    """
    Build an ``ErrorResponse`` carrying an error payload and a timestamp.

    Args:
        error_code: Error code string, stored under the ``"code"`` key.
        message: Human-readable error message, stored under ``"message"``.
        details: Optional extra information; stored under ``"details"`` only
            when it is truthy (``None`` and an empty dict are both omitted).

    Returns:
        ErrorResponse whose ``error`` is the payload dict and whose
        ``timestamp`` comes from ``format_timestamp()``.
    """
    # Only attach the "details" key when there is something to report.
    optional_part = {"details": details} if details else {}
    payload = {"code": error_code, "message": message, **optional_part}
    return ErrorResponse(error=payload, timestamp=format_timestamp())
# "<" that opens a script tag, in any letter case.
_SCRIPT_TAG_OPEN = re.compile(r"<(?=script)", re.IGNORECASE)
# "javascript:" URI scheme prefix, in any letter case.
_JAVASCRIPT_SCHEME = re.compile(r"(javascript):", re.IGNORECASE)


def sanitize_input(text: str) -> str:
    """
    Sanitize user input by neutralizing script tags and javascript: URIs.

    This is a minimal denylist. It escapes the opening ``<`` of ``<script``
    to ``&lt;`` and rewrites ``javascript:`` to ``javascript_``; both are
    matched case-insensitively and the original letter case is kept. It is
    not a substitute for context-aware output encoding.

    Args:
        text: Input text to sanitize.

    Returns:
        The sanitized text, or an empty string when ``text`` is not a
        ``str``.
    """
    if not isinstance(text, str):
        return ""
    # The replacement must differ from the matched text; swapping "<script"
    # for itself would leave the tag intact.
    escaped = _SCRIPT_TAG_OPEN.sub("&lt;", text)
    return _JAVASCRIPT_SCHEME.sub(r"\1_", escaped)
# Compiled once at import time; used with fullmatch(), so no ^/$ anchors.
_URL_PATTERN = re.compile(
    r'https?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)',
    re.IGNORECASE,
)


def validate_url(url: str) -> bool:
    """
    Validate that a string is a properly formatted http(s) URL.

    The host may be a dotted domain name with an alphabetic top-level label
    of 2-63 characters, ``localhost``, or a dotted-quad address. Dotted-quad
    octets are only checked for being 1-3 digits, not for the 0-255 range.

    Args:
        url: URL string to validate.

    Returns:
        True if the whole string matches the URL pattern, False otherwise
        (including when ``url`` is not a ``str``).
    """
    if not isinstance(url, str):
        return False
    # fullmatch() rejects a trailing newline, which a "$" anchor would allow.
    return _URL_PATTERN.fullmatch(url) is not None
def format_confidence_score(score: float) -> float:
    """
    Clamp a confidence score into the valid 0.0-1.0 range.

    Args:
        score: Raw confidence score.

    Returns:
        ``score`` limited to the range 0.0-1.0. A NaN score yields 0.0.
    """
    # NaN compares false against everything, so min()/max() alone would turn
    # it into 1.0; an undefined score must not read as full confidence.
    if math.isnan(score):
        return 0.0
    return max(0.0, min(1.0, score))
def extract_content_chunks(text: str, chunk_size: int = 1000, overlap: int = 100) -> list[str]:
    """
    Split text into overlapping chunks.

    Each chunk starts ``chunk_size - overlap`` characters after the previous
    one. Chunking stops with the first chunk that reaches the end of the
    text, so the final chunk may be shorter than ``chunk_size``.

    Args:
        text: Text to split into chunks.
        chunk_size: Size of each chunk in characters; must be positive.
        overlap: Overlap between consecutive chunks in characters; must be
            non-negative and smaller than ``chunk_size``.

    Returns:
        List of text chunks. Text no longer than ``chunk_size`` is returned
        as a single-element list.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # An overlap >= chunk_size would never advance the start position and
    # would loop forever; a negative overlap would silently skip text.
    if overlap < 0 or overlap >= chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    if len(text) <= chunk_size:
        return [text]

    step = chunk_size - overlap
    # A start position is needed only while the previous chunk, which ends at
    # start + overlap, has not yet reached the end of the text.
    return [
        text[start:start + chunk_size]
        for start in range(0, len(text) - overlap, step)
    ]
def calculate_similarity_score(text1: str, text2: str) -> float:
    """
    Score the similarity of two texts by their shared vocabulary.

    The score is the number of distinct lower-cased, whitespace-separated
    words common to both texts divided by the number of distinct words
    appearing in either text.

    NOTE: This is a simplified implementation. In a real system, you would
    use vector embeddings and cosine similarity or other advanced methods.

    Args:
        text1: First text for comparison.
        text2: Second text for comparison.

    Returns:
        Similarity score between 0.0 and 1.0; 0.0 when either text is empty
        or neither text contains any words.
    """
    if not (text1 and text2):
        return 0.0

    vocab_a = set(text1.lower().split())
    vocab_b = set(text2.lower().split())
    combined = vocab_a | vocab_b
    # Whitespace-only inputs yield no words at all; avoid dividing by zero.
    if len(combined) == 0:
        return 0.0
    shared = vocab_a & vocab_b
    return len(shared) / len(combined)
class RateLimiter:
    """
    Simple in-memory rate limiter to control API request frequency.

    Each identifier has a list of timestamps for its accepted requests. A
    request is allowed while fewer than ``max_requests`` of those timestamps
    fall inside the trailing ``window_seconds`` window. This class does no
    locking of its own.
    """

    def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
        """
        Initialize the rate limiter.

        Args:
            max_requests: Maximum number of requests allowed per window.
            window_seconds: Time window in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps identifier -> epoch timestamps (seconds) of accepted requests.
        self.requests: dict[str, list[float]] = {}

    def is_allowed(self, identifier: str) -> bool:
        """
        Check if a request from the given identifier is allowed.

        Timestamps older than the window are discarded on every call. When
        the request is allowed, the current time is recorded against the
        identifier; rejected requests are not recorded.

        Args:
            identifier: Identifier for the request (e.g., IP address, user ID).

        Returns:
            True if request is allowed, False if rate limit exceeded.
        """
        # time.time() is a true epoch timestamp. datetime.utcnow().timestamp()
        # treats the naive UTC value as local time, skewing it by the UTC
        # offset and making it jump across DST transitions.
        current_time = time.time()

        # Keep only the requests that are still inside the window.
        recent = [
            req_time
            for req_time in self.requests.get(identifier, [])
            if current_time - req_time < self.window_seconds
        ]

        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(current_time)
        self.requests[identifier] = recent
        return allowed