saemstunes committed on
Commit
a1f79e9
·
verified ·
1 Parent(s): f349495

Update src/utils.py

Browse files
Files changed (1) hide show
  1. src/utils.py +266 -10
src/utils.py CHANGED
@@ -1,28 +1,284 @@
1
  import json
 
 
 
2
  from datetime import datetime
3
- from typing import Any, Dict
 
 
 
4
 
def json_serializer(obj: Any) -> str:
    """Serialize values the standard ``json`` encoder cannot handle.

    Only ``datetime`` instances are supported; they are rendered through
    ``isoformat()``.

    Raises:
        TypeError: For any value that is not a ``datetime``.
    """
    # Guard clause: reject everything that is not a datetime up front.
    if not isinstance(obj, datetime):
        raise TypeError(f"Type {type(obj)} not serializable")
    return obj.isoformat()
 
 
 
 
 
10
 
def format_response(response: Dict) -> str:
    """Render a response dictionary as JSON text indented by two spaces.

    Values the default encoder rejects are handed to ``json_serializer``.
    """
    rendered = json.dumps(response, indent=2, default=json_serializer)
    return rendered
14
 
def get_timestamp() -> str:
    """Return the current time from ``datetime.now()`` as an ISO string."""
    current = datetime.now()
    return current.isoformat()
18
 
19
def validate_environment_variables(required_vars: list) -> bool:
    """Check that every named environment variable is set and non-empty.

    The names that fail the check are printed on one line.

    Returns:
        True when nothing is missing, False otherwise.
    """
    import os

    absent = []
    for name in required_vars:
        # Unset and empty-string values are both treated as missing.
        if not os.getenv(name):
            absent.append(name)

    if not absent:
        return True

    print(f"Missing environment variables: {', '.join(absent)}")
    return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
+ import logging
3
+ import os
4
+ import hashlib
5
  from datetime import datetime
6
+ from typing import Any, Dict, List, Optional
7
+ from urllib.parse import urlencode
8
+ import aiohttp
9
+ import asyncio
10
 
def json_serializer(obj: Any) -> Any:
    """
    JSON serializer for objects not serializable by default json code.

    Intended as the ``default=`` hook of ``json.dumps``: the value returned
    here is encoded by ``json`` in place of ``obj``.

    Args:
        obj: Object to serialize

    Returns:
        An ISO-format string for ``datetime``/``date``/``time`` values, a
        list for ``set``/``frozenset`` values, or the attribute dictionary
        of any other object that has a ``__dict__``

    Raises:
        TypeError: If object type is not supported
    """
    # Imported locally so this function does not depend on the module-level
    # import list, which only brings in ``datetime`` itself.
    from datetime import date, time

    # ``datetime`` is a subclass of ``date``, so this branch covers it too.
    if isinstance(obj, (date, time)):
        return obj.isoformat()

    # Checked before ``__dict__`` so that set subclasses carrying instance
    # attributes still serialize their elements rather than their attributes.
    if isinstance(obj, (set, frozenset)):
        return list(obj)

    if hasattr(obj, '__dict__'):
        return vars(obj)

    raise TypeError(f"Type {type(obj)} not serializable")
32
 
def format_response(response: Dict) -> str:
    """
    Render a response dictionary as indented JSON text.

    Args:
        response: Response dictionary to format

    Returns:
        JSON string indented by two spaces; values the default encoder
        rejects are handed to ``json_serializer``
    """
    rendered = json.dumps(response, indent=2, default=json_serializer)
    return rendered
44
 
def get_timestamp() -> str:
    """
    Return the current time as an ISO-format string.

    Returns:
        ``datetime.now()`` rendered through ``isoformat()``
    """
    current = datetime.now()
    return current.isoformat()
53
 
54
def validate_environment_variables(required_vars: List[str]) -> bool:
    """
    Check that every required environment variable is set and non-empty.

    Args:
        required_vars: List of required environment variable names

    Returns:
        True if all variables are set, False otherwise (the missing names
        are logged at ERROR level)
    """
    absent = []
    for name in required_vars:
        # Unset and empty-string values are both treated as missing.
        if not os.getenv(name):
            absent.append(name)

    if not absent:
        return True

    logging.error(f"Missing environment variables: {', '.join(absent)}")
    return False
71
+
72
def calculate_md5_hash(text: str) -> str:
    """
    Compute the MD5 digest of a string.

    Args:
        text: Text to hash (encoded as UTF-8 before hashing)

    Returns:
        Hexadecimal MD5 digest
    """
    digest = hashlib.md5()
    digest.update(text.encode('utf-8'))
    return digest.hexdigest()
83
+
84
def sanitize_filename(filename: str) -> str:
    """
    Sanitize filename to remove potentially dangerous characters.

    Processing steps:
      1. ``../`` and ``..\\`` sequences are removed.
      2. Path separators, the characters ``: * ? " < > |`` and ASCII control
         characters (including NUL) are each replaced with ``_``.
      3. A result made up only of dots (``.``, ``..``, ...) is replaced by
         the same number of underscores, because such names refer to
         directories rather than files.

    Args:
        filename: Original filename

    Returns:
        Sanitized filename (an empty input yields an empty string)
    """
    # Remove directory traversal attempts
    cleaned = filename.replace('../', '').replace('..\\', '')

    # One translate() pass replaces every unsafe character.
    unsafe = {ord(char): '_' for char in '/\\:*?"<>|'}
    unsafe.update({code: '_' for code in range(32)})
    cleaned = cleaned.translate(unsafe)

    # "." and ".." survive the steps above but are still path components
    # that point at directories; neutralise any dots-only name.
    if cleaned and not cleaned.strip('.'):
        cleaned = '_' * len(cleaned)

    return cleaned
103
+
104
async def async_get_request(url: str, headers: Optional[Dict] = None, timeout: int = 30) -> Dict[str, Any]:
    """
    Make asynchronous GET request.

    A fresh ``aiohttp.ClientSession`` is opened for the call and closed
    before returning. Exceptions derived from ``Exception`` are not
    propagated; they are reported through the returned dictionary.

    Args:
        url: URL to request
        headers: Optional headers
        timeout: Total request timeout in seconds

    Returns:
        Response dictionary. On success: ``status``, ``headers``,
        ``content`` (body text) and ``url`` (final URL). On failure:
        ``status`` 0, ``error`` (message or exception class name), empty
        ``headers`` and ``content``, and the requested ``url``.
    """
    try:
        # aiohttp expects a ClientTimeout object rather than a bare number.
        request_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers, timeout=request_timeout) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'content': await response.text(),
                    'url': str(response.url)
                }
    except Exception as e:
        return {
            'status': 0,
            # Some exceptions (e.g. timeouts) have an empty message; fall
            # back to the class name so the error is never blank.
            'error': str(e) or type(e).__name__,
            # Same keys as the success case so callers can read
            # result['headers'] unconditionally.
            'headers': {},
            'content': '',
            'url': url
        }
132
+
133
async def async_post_request(url: str, data: Any, headers: Optional[Dict] = None, timeout: int = 30) -> Dict[str, Any]:
    """
    Make asynchronous POST request.

    ``data`` is sent as a JSON body. A fresh ``aiohttp.ClientSession`` is
    opened for the call and closed before returning. Exceptions derived
    from ``Exception`` are not propagated; they are reported through the
    returned dictionary.

    Args:
        url: URL to request
        data: Data to send (passed to aiohttp as ``json=``)
        headers: Optional headers
        timeout: Total request timeout in seconds

    Returns:
        Response dictionary. On success: ``status``, ``headers``,
        ``content`` (body text) and ``url`` (final URL). On failure:
        ``status`` 0, ``error`` (message or exception class name), empty
        ``headers`` and ``content``, and the requested ``url``.
    """
    try:
        # aiohttp expects a ClientTimeout object rather than a bare number.
        request_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data, headers=headers, timeout=request_timeout) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'content': await response.text(),
                    'url': str(response.url)
                }
    except Exception as e:
        return {
            'status': 0,
            # Some exceptions (e.g. timeouts) have an empty message; fall
            # back to the class name so the error is never blank.
            'error': str(e) or type(e).__name__,
            # Same keys as the success case so callers can read
            # result['headers'] unconditionally.
            'headers': {},
            'content': '',
            'url': url
        }
162
+
163
def format_duration(seconds: float) -> str:
    """
    Format duration in seconds to human-readable string.

    The unit is chosen from the value as it will be displayed, so a
    duration never renders as ``1000ms``, ``60.0s`` or ``60.0m``; it moves
    up to the next unit instead. Negative durations are formatted by
    magnitude with a leading minus sign.

    Args:
        seconds: Duration in seconds

    Returns:
        Formatted duration string, e.g. ``500ms``, ``1.5s``, ``1.5m``, ``2.0h``
    """
    if seconds < 0:
        return "-" + format_duration(-seconds)

    # round() is always called with ndigits so NaN/inf inputs do not raise
    # and simply fall through to the last branch.
    if round(seconds * 1000, 0) < 1000:
        return f"{seconds * 1000:.0f}ms"
    if round(seconds, 1) < 60:
        return f"{seconds:.1f}s"

    minutes = seconds / 60
    if round(minutes, 1) < 60:
        return f"{minutes:.1f}m"

    hours = seconds / 3600
    return f"{hours:.1f}h"
183
+
184
def get_file_size(filepath: str) -> Optional[int]:
    """
    Look up the size of a file.

    Args:
        filepath: Path to file

    Returns:
        Size in bytes, or None when ``os.path.getsize`` raises ``OSError``
        (for example because the file does not exist)
    """
    try:
        size_in_bytes = os.path.getsize(filepath)
    except OSError:
        return None
    else:
        return size_in_bytes
198
+
199
def create_directory_if_not_exists(directory: str):
    """
    Ensure a directory exists, creating missing parents as needed.

    Nothing is done when the path already exists.

    Args:
        directory: Directory path to create
    """
    if os.path.exists(directory):
        return
    os.makedirs(directory, exist_ok=True)
208
+
209
def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None):
    """
    Configure logging through ``logging.basicConfig``.

    A stream handler is always installed; a file handler is added when
    ``log_file`` is given.

    Args:
        log_level: Name of a level attribute on the ``logging`` module
            (looked up after upper-casing)
        log_file: Optional log file path
    """
    # Handlers are built before the level is resolved, so the order of side
    # effects stays the same as a plain top-to-bottom read suggests.
    sinks = [logging.StreamHandler()]
    if log_file:
        sinks += [logging.FileHandler(log_file)]

    settings = {
        'level': getattr(logging, log_level.upper()),
        'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        'handlers': sinks,
    }
    logging.basicConfig(**settings)
228
+
229
def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to maximum length.

    Text longer than ``max_length`` is cut and finished with ``...`` so the
    result is exactly ``max_length`` characters. When ``max_length`` is too
    small to hold the ellipsis, the text is cut without one.

    Args:
        text: Text to truncate
        max_length: Maximum length of the returned string; values below
            zero are treated as zero

    Returns:
        Truncated text, never longer than ``max(max_length, 0)``
    """
    if len(text) <= max_length:
        return text

    ellipsis = "..."
    if max_length < len(ellipsis):
        # No room for the ellipsis. A negative slice end here would keep
        # most of the text instead of shortening it, so clamp at zero.
        return text[:max(max_length, 0)]

    return text[:max_length - len(ellipsis)] + ellipsis
243
+
244
def is_valid_url(url: str) -> bool:
    """
    Check if string is a valid URL.

    A URL counts as valid here when ``urlparse`` yields both a scheme and a
    network location; no further validation is performed.

    Args:
        url: URL to validate

    Returns:
        True if valid URL, False otherwise (including inputs that
        ``urlparse`` cannot process)
    """
    from urllib.parse import urlparse

    try:
        parts = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed input such as an unterminated IPv6 host.
        # TypeError/AttributeError: non-string input.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return False

    return bool(parts.scheme and parts.netloc)
260
+
261
def get_memory_usage_mb() -> float:
    """
    Report the resident set size of the current process.

    Returns:
        ``psutil`` RSS value for this process divided by 1024 twice
        (i.e. expressed in MB)
    """
    import psutil

    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    rss_kb = rss_bytes / 1024
    return rss_kb / 1024
271
+
272
+ def retry_on_exception(max_retries: int = 3, delay: float = 1.0, exceptions: tuple = (Exception,)):
273
+ """
274
+ Decorator for retrying function on exception.
275
+
276
+ Args:
277
+ max_retries: Maximum number of retries
278
+ delay: Delay between retries in seconds
279
+ exceptions: Exceptions to catch
280
+
281
+ Returns:
282
+ Decorated function
283
+ """
284
+ def