File size: 6,197 Bytes
d4f1687
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""
Utility functions for the RAG Agent and API Layer system.

This module contains helper functions, logging setup, and common utilities.
"""
import logging
import uuid
from datetime import datetime
from typing import Dict, Any, Optional
from .models import ErrorResponse


def setup_logging(level: str = "INFO") -> None:
    """
    Set up logging configuration for the application.

    Args:
        level: Logging level as a string (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    """
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
        ]
    )


def generate_response_id() -> str:
    """
    Generate a unique identifier for API responses.

    Returns:
        String identifier in UUID format
    """
    return f"resp_{uuid.uuid4().hex[:8]}"


def format_timestamp() -> str:
    """
    Generate an ISO 8601 formatted timestamp.

    Returns:
        ISO 8601 formatted timestamp string
    """
    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")


def create_error_response(error_code: str, message: str, details: Optional[Dict[str, Any]] = None) -> ErrorResponse:
    """
    Create a standardized error response.

    Args:
        error_code: Error code string
        message: Human-readable error message
        details: Optional additional error details

    Returns:
        ErrorResponse instance with standardized format
    """
    error_info = {
        "code": error_code,
        "message": message
    }

    if details:
        error_info["details"] = details

    return ErrorResponse(
        error=error_info,
        timestamp=format_timestamp()
    )


def sanitize_input(text: str) -> str:
    """
    Sanitize user input to prevent injection attacks.

    Args:
        text: Input text to sanitize

    Returns:
        Sanitized text with potentially harmful characters escaped
    """
    # Basic sanitization - in a real implementation, you might want more sophisticated
    # sanitization depending on your specific security requirements
    if not isinstance(text, str):
        return ""

    # Remove or escape potentially dangerous characters
    sanitized = text.replace("<script", "&lt;script").replace("javascript:", "javascript_")
    return sanitized


def validate_url(url: str) -> bool:
    """
    Validate that a string is a properly formatted URL.

    Args:
        url: URL string to validate

    Returns:
        True if URL is valid, False otherwise
    """
    import re
    url_pattern = re.compile(
        r'^https?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return url_pattern.match(url) is not None


def format_confidence_score(score: float) -> float:
    """
    Format a confidence score to be within valid bounds.

    Args:
        score: Raw confidence score

    Returns:
        Confidence score normalized to 0.0-1.0 range
    """
    return max(0.0, min(1.0, score))


def extract_content_chunks(text: str, chunk_size: int = 1000, overlap: int = 100) -> list[str]:
    """
    Split text into overlapping chunks.

    Args:
        text: Text to split into chunks
        chunk_size: Size of each chunk in characters
        overlap: Overlap between chunks in characters

    Returns:
        List of text chunks
    """
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)

        start = end - overlap

        # Handle the last chunk properly
        if end >= len(text):
            break

    return chunks


def calculate_similarity_score(text1: str, text2: str) -> float:
    """
    Calculate a basic similarity score between two texts using a simple approach.
    NOTE: This is a simplified implementation. In a real system, you would use
    vector embeddings and cosine similarity or other advanced methods.

    Args:
        text1: First text for comparison
        text2: Second text for comparison

    Returns:
        Similarity score between 0.0 and 1.0
    """
    if not text1 or not text2:
        return 0.0

    # Simple word overlap approach (case-insensitive)
    words1 = set(text1.lower().split())
    words2 = set(text2.lower().split())

    intersection = words1.intersection(words2)
    union = words1.union(words2)

    if not union:
        return 0.0

    return len(intersection) / len(union)


class RateLimiter:
    """
    Simple rate limiter class to control API request frequency.
    """
    def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
        """
        Initialize the rate limiter.

        Args:
            max_requests: Maximum number of requests allowed per window
            window_seconds: Time window in seconds
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}  # Dictionary to track requests per identifier

    def is_allowed(self, identifier: str) -> bool:
        """
        Check if a request from the given identifier is allowed.

        Args:
            identifier: Identifier for the request (e.g., IP address, user ID)

        Returns:
            True if request is allowed, False if rate limit exceeded
        """
        current_time = datetime.utcnow().timestamp()

        if identifier not in self.requests:
            self.requests[identifier] = []

        # Clean old requests outside the window
        self.requests[identifier] = [
            req_time for req_time in self.requests[identifier]
            if current_time - req_time < self.window_seconds
        ]

        # Check if we're under the limit
        if len(self.requests[identifier]) < self.max_requests:
            self.requests[identifier].append(current_time)
            return True

        return False