| | from typing import Any, Dict, List, Optional |
| | from google.oauth2.credentials import Credentials |
| | from google_auth_oauthlib.flow import InstalledAppFlow |
| | from google.auth.transport.requests import Request |
| | from googleapiclient.discovery import build |
| | import os |
| | import pickle |
| | from datetime import datetime |
| | from loguru import logger |
| | import json |
| | import hashlib |
| | from google.oauth2.service_account import Credentials |
| |
|
| | from .utils import timing_decorator_sync |
| | from .constants import SHEET_RANGE |
| |
|
# OAuth scope requested for the Sheets API. This client both reads and
# writes rows (values.get / append / update), so it needs full spreadsheet
# access rather than a read-only scope.
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
| |
|
def generate_conversation_id(user_id: str, page_id: str, timestamp: str) -> str:
    """Derive a deterministic 32-character conversation id.

    The three inputs are joined as ``"user_id:page_id:timestamp"``, encoded
    as UTF-8 and hashed with SHA-256. The first 32 hex digits of the digest
    are returned, so identical inputs always yield the same id.
    """
    parts = (f"{user_id}", f"{page_id}", f"{timestamp}")
    digest = hashlib.sha256(":".join(parts).encode())
    return digest.hexdigest()[:32]
| |
|
class SheetsClient:
    """Read and write conversation rows in a Google Sheet.

    Each conversation is stored as one row of twelve cells laid out in the
    order of ``_COLUMNS``. Attachments and timestamps are stored as JSON text,
    ``isdone`` as the text "true"/"false". Every read fetches the whole
    ``SHEET_RANGE`` and scans it in memory.

    NOTE(review): writes are read-modify-write cycles with no locking, so two
    concurrent callers can append duplicate rows or overwrite each other's
    updates.
    """

    # Cell order of a conversation row (list index == offset within the row).
    _COLUMNS = (
        'conversation_id',
        'originalcommand',
        'originalcontent',
        'originalattachments',
        'recipient_id',
        'page_id',
        'originaltext',
        'originalvehicle',
        'originalaction',
        'originalpurpose',
        'timestamp',
        'isdone',
    )

    def __init__(self, credentials_file: str, token_file: str, sheet_id: str):
        """Store configuration; no network call is made until first use.

        Args:
            credentials_file: OAuth client-secrets file for the interactive
                installed-app flow (used only when no service-account JSON is
                provided through the environment).
            token_file: File where OAuth user credentials are cached with pickle.
            sheet_id: ID of the spreadsheet to read and write.
        """
        self.credentials_file = credentials_file
        self.token_file = token_file
        self.sheet_id = sheet_id
        self.creds = None
        self.service = None

    @timing_decorator_sync
    def authenticate(self) -> None:
        """Obtain credentials and build ``self.service`` (Sheets API v4).

        If the environment variable GOOGLE_SHEETS_CREDENTIALS_JSON is set, it is
        parsed as a service-account key and used as-is. Otherwise OAuth user
        credentials are loaded from ``token_file``. When they are missing or
        invalid, they are refreshed or obtained interactively, then cached back
        to ``token_file``.

        Raises:
            Any error from JSON parsing, file I/O or the google-auth calls; none
            are caught here.
        """
        credentials_json = os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")
        if credentials_json:
            info = json.loads(credentials_json)
            # Service-account credentials start without an access token, so
            # their ``valid`` is False. Keep them out of the user-credential
            # checks, which would otherwise launch the interactive flow and
            # pickle the private key to disk.
            self.creds = Credentials.from_service_account_info(info, scopes=SCOPES)
        else:
            self._load_user_credentials()
        self.service = build('sheets', 'v4', credentials=self.creds)

    def _load_user_credentials(self) -> None:
        """Set ``self.creds`` from the token cache, refreshing or re-authorizing if needed."""
        if os.path.exists(self.token_file):
            # NOTE(review): pickle.load can execute arbitrary code; this is only
            # safe while token_file is written exclusively by this client.
            with open(self.token_file, 'rb') as token:
                self.creds = pickle.load(token)

        if self.creds and self.creds.valid:
            return

        # Only refresh when a refresh token exists. Without one, refresh()
        # cannot succeed and the interactive flow is the only way to recover.
        if self.creds and self.creds.expired and getattr(self.creds, 'refresh_token', None):
            self.creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, SCOPES)
            self.creds = flow.run_local_server(port=0)

        with open(self.token_file, 'wb') as token:
            pickle.dump(self.creds, token)

    def _ensure_service(self) -> None:
        """Authenticate lazily; raise RuntimeError if no service could be built."""
        if not self.service:
            self.authenticate()
        if not self.service:
            raise RuntimeError("Google Sheets service not initialized")

    def _fetch_rows(self) -> List[List[Any]]:
        """Return the cell values of ``SHEET_RANGE`` as a list of (possibly short) rows."""
        result = self.service.spreadsheets().values().get(
            spreadsheetId=self.sheet_id,
            range=SHEET_RANGE,
        ).execute()
        return result.get('values', [])

    @staticmethod
    def _parse_json_list(cell: Any) -> List[Any]:
        """Decode a JSON cell into a list.

        Empty or undecodable cells give ``[]``; a decoded non-list value is
        wrapped in a one-element list.
        """
        if not cell:
            return []
        try:
            value = json.loads(cell)
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            return []
        return value if isinstance(value, list) else [value]

    @classmethod
    def _pad_row(cls, row: List[Any]) -> List[Any]:
        """Return a copy of ``row`` padded with "" up to the number of columns."""
        return list(row) + [""] * (len(cls._COLUMNS) - len(row))

    @classmethod
    def _row_to_dict(cls, row: List[Any]) -> Dict[str, Any]:
        """Convert a raw sheet row into the conversation dict returned to callers."""
        cells = cls._pad_row(row)
        record = dict(zip(cls._COLUMNS, cells))
        record['originalattachments'] = cls._parse_json_list(cells[3])
        record['timestamp'] = cls._parse_json_list(cells[10])
        record['isdone'] = str(cells[11]).lower() == 'true'
        return record

    @timing_decorator_sync
    def get_conversation_history(self, user_id: str, page_id: str) -> List[Dict[str, Any]]:
        """Return the unfinished conversations of ``user_id`` on ``page_id``.

        A row matches when all three conditions hold:
        its recipient_id cell equals ``user_id``, its page_id cell equals
        ``page_id``, and its isdone cell reads "false" (case-insensitive).
        A blank isdone cell does not match.

        Returns:
            Matching rows converted by ``_row_to_dict``, in sheet order. An
            empty list when nothing matches or when any error occurs (the
            error is logged, not raised).
        """
        try:
            self._ensure_service()
            history = []
            for row in self._fetch_rows():
                cells = self._pad_row(row)
                is_open = str(cells[11]).lower() == 'false'
                if cells[4] == user_id and cells[5] == page_id and is_open:
                    history.append(self._row_to_dict(cells))
            return history
        except Exception as e:
            logger.error(f"Error getting conversation history: {e}")
            return []

    @timing_decorator_sync
    def log_conversation(
        self,
        conversation_id: str,
        recipient_id: str,
        page_id: str,
        originaltext: str = "",
        originalcommand: str = "",
        originalcontent: str = "",
        originalattachments: Optional[List[str]] = None,
        originalvehicle: str = "",
        originalaction: str = "",
        originalpurpose: str = "",
        timestamp: Any = None,
        isdone: Optional[bool] = False
    ) -> Optional[Dict[str, Any]]:
        """Insert a new conversation row or update an existing one.

        Without a ``conversation_id``, the method first looks for a duplicate.
        It scans for a row with the same recipient_id and page_id that already
        holds any of the given timestamps. If one is found, that row is
        returned unchanged. Otherwise a new row is appended; its id is derived
        from recipient_id, page_id and the current time.

        With a ``conversation_id``, the matching row is updated:
        each truthy argument overwrites its cell (attachments whenever not
        None), and the given timestamps are merged into the stored list.
        ``isdone`` is written unless it is None. Because it defaults to False,
        an update that omits it marks the row as not done.

        Args:
            timestamp: A single value, a list of values, or None (no timestamps).

        Returns:
            The row as stored, as a dict. Returns None when an update target
            does not exist, when the sheet is empty on update, or when any
            error occurs (errors are logged, not raised).
        """
        try:
            self._ensure_service()
            values = self._fetch_rows()
            # Dumps the whole sheet, so keep it at debug level.
            logger.debug(f"[DEBUG] Gsheet values {values}")

            if timestamp is None:
                timestamps = []
            elif isinstance(timestamp, list):
                timestamps = timestamp
            else:
                timestamps = [timestamp]

            # Caller-supplied cells 1..9, keyed and ordered like _COLUMNS.
            fields = {
                'originalcommand': originalcommand,
                'originalcontent': originalcontent,
                'originalattachments': originalattachments,
                'recipient_id': recipient_id,
                'page_id': page_id,
                'originaltext': originaltext,
                'originalvehicle': originalvehicle,
                'originalaction': originalaction,
                'originalpurpose': originalpurpose,
            }

            if conversation_id:
                return self._update_conversation(values, conversation_id, fields, timestamps, isdone)

            duplicate = self._find_duplicate(values, recipient_id, page_id, timestamps)
            if duplicate is not None:
                logger.info(f"Found duplicate conversation for user {recipient_id}, page {page_id}, timestamp {timestamps}")
                return duplicate
            return self._append_conversation(fields, timestamps, isdone)
        except Exception as e:
            logger.error(f"Error logging conversation: {e}")
            return None

    @classmethod
    def _find_duplicate(
        cls,
        values: List[List[Any]],
        recipient_id: str,
        page_id: str,
        timestamps: List[Any],
    ) -> Optional[Dict[str, Any]]:
        """Return the first row for this recipient/page that already holds any of ``timestamps``.

        Each incoming timestamp is compared individually, as a string, against
        the row's stored timestamps. The list as a whole is never compared.
        """
        wanted = {str(ts) for ts in timestamps}
        if not wanted:
            return None
        for row in values:
            if len(row) < 11:  # no timestamp cell, so it cannot match
                continue
            record = cls._row_to_dict(row)
            if str(record['recipient_id']) != str(recipient_id) or str(record['page_id']) != str(page_id):
                continue
            if wanted.intersection(str(ts) for ts in record['timestamp']):
                return record
        return None

    def _append_conversation(
        self,
        fields: Dict[str, Any],
        timestamps: List[Any],
        isdone: Optional[bool],
    ) -> Dict[str, Any]:
        """Append a new row with a freshly generated conversation_id and return it as a dict."""
        created_at = datetime.now().isoformat()
        conversation_id = generate_conversation_id(fields['recipient_id'], fields['page_id'], created_at)
        attachments = fields['originalattachments'] or []
        done = bool(isdone)  # an explicit None must not be written as "none"
        new_row = [
            conversation_id,
            fields['originalcommand'],
            fields['originalcontent'],
            json.dumps(attachments),
            fields['recipient_id'],
            fields['page_id'],
            fields['originaltext'],
            fields['originalvehicle'],
            fields['originalaction'],
            fields['originalpurpose'],
            json.dumps(timestamps),
            str(done).lower(),
        ]
        self.service.spreadsheets().values().append(
            spreadsheetId=self.sheet_id,
            range=SHEET_RANGE,
            valueInputOption='RAW',
            body={'values': [new_row]},
        ).execute()
        logger.info(f"Thêm mới conversation: {conversation_id} | Giá trị: {dict(zip(self._COLUMNS, new_row))}")
        return {
            'conversation_id': conversation_id,
            **fields,
            'originalattachments': attachments,
            'timestamp': timestamps,
            'isdone': done,
        }

    def _update_conversation(
        self,
        values: List[List[Any]],
        conversation_id: str,
        fields: Dict[str, Any],
        timestamps: List[Any],
        isdone: Optional[bool],
    ) -> Optional[Dict[str, Any]]:
        """Merge the given fields into the row of ``conversation_id`` and write it back.

        Returns the updated row as a dict. Returns None when the sheet is empty
        or no row has that conversation_id.
        """
        if not values:
            logger.error("No data in sheet, cannot update conversation.")
            return None
        # Blank rows come back as []; skip them instead of failing on row[0].
        row_index = next(
            (i for i, row in enumerate(values) if row and row[0] == conversation_id),
            None,
        )
        logger.debug(f"[DEBUG] Gsheet row index {row_index}")
        if row_index is None:
            logger.warning(f"Conversation {conversation_id} not found, nothing updated.")
            return None

        # NOTE(review): assumes values[0] is sheet row 2, i.e. SHEET_RANGE starts
        # on row 2 below a header row. TODO confirm against SHEET_RANGE.
        sheet_row_number = row_index + 2
        current_row = self._pad_row(values[row_index])
        logger.debug(f"[DEBUG] Gsheet current row {current_row}")

        # Merge timestamps, comparing as strings like _find_duplicate does.
        merged_timestamps = self._parse_json_list(current_row[10])
        seen = {str(ts) for ts in merged_timestamps}
        for ts in timestamps:
            if str(ts) not in seen:
                merged_timestamps.append(ts)
                seen.add(str(ts))

        attachments = fields['originalattachments']
        # Falsy arguments (e.g. "") keep the stored cell value.
        new_row = [
            conversation_id,
            fields['originalcommand'] or current_row[1],
            fields['originalcontent'] or current_row[2],
            json.dumps(attachments) if attachments is not None else current_row[3],
            fields['recipient_id'] or current_row[4],
            fields['page_id'] or current_row[5],
            fields['originaltext'] or current_row[6],
            fields['originalvehicle'] or current_row[7],
            fields['originalaction'] or current_row[8],
            fields['originalpurpose'] or current_row[9],
            json.dumps(merged_timestamps),
            str(isdone).lower() if isdone is not None else current_row[11],
        ]
        update_range = f"{SHEET_RANGE.split('!')[0]}!A{sheet_row_number}"
        logger.debug(f"[DEBUG] Gsheet update range {update_range}")
        self.service.spreadsheets().values().update(
            spreadsheetId=self.sheet_id,
            range=update_range,
            valueInputOption='RAW',
            body={'values': [new_row]},
        ).execute()

        # Only the cells whose value actually changed, with their new values.
        changes = {
            name: new
            for name, old, new in zip(self._COLUMNS, current_row, new_row)
            if old != new
        }
        logger.info(f"Cập nhật conversation: {conversation_id} tại dòng {sheet_row_number} | Cột cập nhật: {list(changes)} | Giá trị mới: {changes}")

        record = self._row_to_dict(new_row)
        record['timestamp'] = merged_timestamps
        return record