File size: 7,344 Bytes
dbb6988
4c4c55e
dbb6988
 
4c4c55e
dbb6988
b249c92
 
dbb6988
3c6d6b3
dbb6988
 
3c6d6b3
dbb6988
 
 
 
 
 
 
3c6d6b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbb6988
 
 
 
 
 
 
 
 
 
ef2ce34
 
 
 
 
 
 
dbb6988
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88ec76e
dbb6988
 
 
 
 
 
 
 
5e2e440
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbb6988
b249c92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4c4c55e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e529ed6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import time
import random
from functools import wraps
from loguru import logger
from typing import Any, Callable, List
import os
import asyncio
import httpx

def timing_decorator_async(func: Callable) -> Callable:
    """
    Decorator đo thời gian thực thi của hàm async, log thời lượng xử lý.
    Dùng cho async def.
    """
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start_time = time.time()
        result = await func(*args, **kwargs)
        end_time = time.time()
        duration = end_time - start_time
        logger.info(f"[TIMING][async] {func.__name__} took {duration:.2f} seconds to execute")
        return result
    return wrapper

def timing_decorator_sync(func: Callable) -> Callable:
    """
    Decorator đo thời gian thực thi của hàm sync, log thời lượng xử lý.
    Dùng cho def thường.
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        duration = end_time - start_time
        logger.info(f"[TIMING][sync] {func.__name__} took {duration:.2f} seconds to execute")
        return result
    return wrapper

def setup_logging(log_level: str = "INFO") -> None:
    """
    Thiết lập logging với loguru, log ra file và console.
    Input: log_level (str) - mức log.
    Output: None.
    """
    logger.remove()  # Remove default handler
    # logger.add(
    #     "logs/webot.log",
    #     rotation="500 MB",
    #     retention="10 days",
    #     level=log_level,
    #     format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    # )
    logger.add(
        lambda msg: print(msg),
        level=log_level,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    )

def extract_command(text: str) -> tuple[str, str]:
    """
    Tách lệnh (bắt đầu bằng \) và phần còn lại từ message.
    Input: text (str) - message từ user.
    Output: (command, remaining_text) - tuple (str, str).
    """
    if not text.startswith("\\"):
        return "", text
    
    parts = text.split(maxsplit=1)
    command = parts[0][1:]  # Remove the backslash
    remaining = parts[1] if len(parts) > 1 else ""
    return command, remaining

def extract_keywords(text: str, keywords: list[str]) -> list[str]:
    """
    Tìm các từ khóa xuất hiện trong message.
    Input: text (str), keywords (list[str])
    Output: list[str] các từ khóa tìm thấy.
    """
    return [keyword for keyword in keywords if keyword.lower() in text.lower()]

def ensure_log_dir():
    """
    Đảm bảo thư mục logs tồn tại, tạo nếu chưa có.
    Input: None
    Output: None
    """
    # os.makedirs("logs", exist_ok=True)

def validate_config(settings) -> None:
    """
    Kiểm tra các biến môi trường/config bắt buộc, raise lỗi nếu thiếu.
    Input: settings (Settings)
    Output: None (raise RuntimeError nếu thiếu)
    """
    missing = []
    # Facebook
    if not getattr(settings, 'facebook_verify_token', None):
        missing.append('facebook_verify_token')
    if not getattr(settings, 'facebook_app_secret', None):
        missing.append('facebook_app_secret')
    # Google Sheets: chấp nhận 1 trong 2 biến
    if not (getattr(settings, 'google_sheets_credentials_file', None) or os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")):
        missing.append('google_sheets_credentials_file or GOOGLE_SHEETS_CREDENTIALS_JSON')
    if not getattr(settings, 'google_sheets_token_file', None):
        missing.append('google_sheets_token_file')
    if not getattr(settings, 'conversation_sheet_id', None):
        missing.append('conversation_sheet_id')
    # Supabase
    if not getattr(settings, 'supabase_url', None):
        missing.append('supabase_url')
    if not getattr(settings, 'supabase_key', None):
        missing.append('supabase_key')
    if missing:
        raise RuntimeError(f"Missing config: {', '.join(missing)}")

def get_logger():
    return logger

async def call_endpoint_with_retry(client, url, payload, max_retries=3, base_timeout=30, headers=None):
    logger = get_logger()
    timeout = base_timeout
    for attempt in range(1, max_retries + 1):
        try:
            response = await client.post(url, json=payload, timeout=timeout, headers=headers)
            response.raise_for_status()
            return response
        except httpx.TimeoutException as e:
            if attempt == max_retries:
                raise
            else:
                logger.warning(f"Timeout (attempt {attempt}/{max_retries}), retrying with timeout={timeout * 2}s...")
                timeout *= 2
                await asyncio.sleep(1)
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"Other error: {e}")
            raise 

def get_random_message(message_list: List[str]) -> str:
    """
    Lấy ngẫu nhiên một message từ danh sách messages.
    
    Args:
        message_list (List[str]): Danh sách các messages có sẵn
        
    Returns:
        str: Message được chọn ngẫu nhiên, hoặc message mặc định nếu danh sách rỗng
        
    Example:
        >>> messages = ["Message 1", "Message 2", "Message 3"]
        >>> get_random_message(messages)
        "Message 2"  # hoặc message khác ngẫu nhiên
    """
    if not message_list:
        return "Đang xử lý..."
    
    return random.choice(message_list)

def _safe_truncate(
    s: str,
    nguong_a: int = 100,
    do_dai_x: int = 50,
    do_dai_y: int = 100
) -> str:
    """
    Cắt chuỗi một cách thông minh dựa trên độ dài của nó.

    - Nếu độ dài chuỗi < nguong_a: chỉ hiển thị `do_dai_x` ký tự đầu tiên.
    - Nếu độ dài chuỗi >= nguong_a: hiển thị `do_dai_y` ký tự đầu và `do_dai_y` ký tự cuối.

    Args:
        s: Chuỗi đầu vào cần xử lý.
        nguong_a (A): Ngưỡng độ dài để quyết định logic cắt chuỗi.
        do_dai_x (X): Số ký tự đầu tiên cần hiển thị cho chuỗi ngắn.
        do_dai_y (Y): Số ký tự đầu/cuối cần hiển thị cho chuỗi dài.

    Returns:
        Chuỗi đã được cắt ngắn theo quy tắc.
    """
    if not isinstance(s, str):
        s = str(s)

    s_len = len(s)

    # --- Trường hợp 1: Độ dài chuỗi NGẮN HƠN ngưỡng A ---
    if s_len < nguong_a:
        # Nếu chuỗi đã ngắn hơn hoặc bằng X, trả về nguyên bản
        if s_len <= do_dai_x:
            return s
        # Nếu không, cắt lấy X ký tự đầu
        return f"{s[:do_dai_x]}... [đã cắt]"

    # --- Trường hợp 2: Độ dài chuỗi DÀI HƠN hoặc BẰNG ngưỡng A ---
    else:
        # Nếu việc lấy Y ký tự đầu và Y cuối sẽ bao trọn cả chuỗi (2*Y >= s_len)
        # thì không cần cắt để tránh hiển thị trùng lặp.
        if s_len <= do_dai_y * 2:
            return s
        # Ngược lại, lấy Y ký tự đầu và Y ký tự cuối
        return f"{s[:do_dai_y]}... [đã cắt] ...{s[-do_dai_y:]}"