Chatopus / app /utils.py
VietCat's picture
fix duplicate message
b249c92
raw
history blame
4.37 kB
import time
from functools import wraps
from loguru import logger
from typing import Any, Callable
import os
import asyncio
import httpx
def timing_decorator_async(func: Callable) -> Callable:
"""
Decorator đo thời gian thực thi của hàm async, log thời lượng xử lý.
Dùng cho async def.
"""
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
start_time = time.time()
result = await func(*args, **kwargs)
end_time = time.time()
duration = end_time - start_time
logger.info(f"[TIMING][async] {func.__name__} took {duration:.2f} seconds to execute")
return result
return wrapper
def timing_decorator_sync(func: Callable) -> Callable:
"""
Decorator đo thời gian thực thi của hàm sync, log thời lượng xử lý.
Dùng cho def thường.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
duration = end_time - start_time
logger.info(f"[TIMING][sync] {func.__name__} took {duration:.2f} seconds to execute")
return result
return wrapper
def setup_logging(log_level: str = "INFO") -> None:
"""
Thiết lập logging với loguru, log ra file và console.
Input: log_level (str) - mức log.
Output: None.
"""
logger.remove() # Remove default handler
# logger.add(
# "logs/webot.log",
# rotation="500 MB",
# retention="10 days",
# level=log_level,
# format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
# )
logger.add(
lambda msg: print(msg),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
)
def extract_command(text: str) -> tuple[str, str]:
"""
Tách lệnh (bắt đầu bằng \) và phần còn lại từ message.
Input: text (str) - message từ user.
Output: (command, remaining_text) - tuple (str, str).
"""
if not text.startswith("\\"):
return "", text
parts = text.split(maxsplit=1)
command = parts[0][1:] # Remove the backslash
remaining = parts[1] if len(parts) > 1 else ""
return command, remaining
def extract_keywords(text: str, keywords: list[str]) -> list[str]:
"""
Tìm các từ khóa xuất hiện trong message.
Input: text (str), keywords (list[str])
Output: list[str] các từ khóa tìm thấy.
"""
return [keyword for keyword in keywords if keyword.lower() in text.lower()]
def ensure_log_dir():
"""
Đảm bảo thư mục logs tồn tại, tạo nếu chưa có.
Input: None
Output: None
"""
# os.makedirs("logs", exist_ok=True)
def validate_config(settings) -> None:
"""
Kiểm tra các biến môi trường/config bắt buộc, raise lỗi nếu thiếu.
Input: settings (Settings)
Output: None (raise RuntimeError nếu thiếu)
"""
missing = []
for field in [
'facebook_verify_token', 'facebook_app_secret',
'google_sheets_credentials_file', 'google_sheets_token_file', 'conversation_sheet_id',
'supabase_url', 'supabase_key']:
if not getattr(settings, field, None):
missing.append(field)
if missing:
raise RuntimeError(f"Missing config: {', '.join(missing)}")
def get_logger():
return logger
async def call_endpoint_with_retry(client, url, payload, max_retries=3, base_timeout=30, headers=None):
logger = get_logger()
timeout = base_timeout
for attempt in range(1, max_retries + 1):
try:
response = await client.post(url, json=payload, timeout=timeout, headers=headers)
response.raise_for_status()
return response
except httpx.TimeoutException as e:
if attempt == max_retries:
raise
else:
logger.warning(f"Timeout (attempt {attempt}/{max_retries}), retrying with timeout={timeout * 2}s...")
timeout *= 2
await asyncio.sleep(1)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}")
raise
except Exception as e:
logger.error(f"Other error: {e}")
raise