VietCat commited on
Commit
9c21ccb
·
1 Parent(s): 67751a7

quick fix timestamp

Browse files
Files changed (1) hide show
  1. app/sheets.py +126 -148
app/sheets.py CHANGED
@@ -1,25 +1,44 @@
1
- from typing import Any, Dict, List, Optional
2
- from google.oauth2.credentials import Credentials
3
- from google_auth_oauthlib.flow import InstalledAppFlow
4
- from google.auth.transport.requests import Request
5
- from googleapiclient.discovery import build
6
  import os
7
  import pickle
8
- from datetime import datetime
9
- from loguru import logger
10
  import json
11
  import hashlib
 
 
 
12
  from google.oauth2.service_account import Credentials
 
 
 
 
13
 
14
  from .utils import timing_decorator_sync
15
  from .constants import SHEET_RANGE
16
 
17
  SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
18
 
19
- def generate_conversation_id(user_id: str, page_id: str, timestamp: str) -> str:
 
20
  hash_input = f"{user_id}:{page_id}:{timestamp}"
21
  return hashlib.sha256(hash_input.encode()).hexdigest()[:32]
22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  class SheetsClient:
24
  def __init__(self, credentials_file: str, token_file: str, sheet_id: str):
25
  self.credentials_file = credentials_file
@@ -30,21 +49,20 @@ class SheetsClient:
30
 
31
  @timing_decorator_sync
32
  def authenticate(self) -> None:
 
33
  credentials_json = os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")
34
  if credentials_json:
35
  info = json.loads(credentials_json)
36
- creds = Credentials.from_service_account_info(info, scopes=SCOPES)
37
- self.creds = creds
38
  else:
39
  if os.path.exists(self.token_file):
40
  with open(self.token_file, 'rb') as token:
41
  self.creds = pickle.load(token)
42
  if not self.creds or not self.creds.valid:
43
- if self.creds and self.creds.expired:
44
  self.creds.refresh(Request())
45
  else:
46
- flow = InstalledAppFlow.from_client_secrets_file(
47
- self.credentials_file, SCOPES)
48
  self.creds = flow.run_local_server(port=0)
49
  with open(self.token_file, 'wb') as token:
50
  pickle.dump(self.creds, token)
@@ -52,158 +70,118 @@ class SheetsClient:
52
 
53
  @timing_decorator_sync
54
  def get_conversation_history(self, user_id: str, page_id: str) -> List[Dict[str, Any]]:
 
55
  try:
56
  if not self.service:
57
  self.authenticate()
58
- range_name = SHEET_RANGE
59
  result = self.service.spreadsheets().values().get(
60
  spreadsheetId=self.sheet_id,
61
- range=range_name
62
  ).execute()
63
  values = result.get('values', [])
 
 
 
 
64
  history = []
65
- for row in values:
66
- row = row + [""] * (14 - len(row))
67
- try:
68
- timestamps = json.loads(row[12]) if row[12] else []
69
- if not isinstance(timestamps, list):
70
- timestamps = [str(timestamps)]
71
- except Exception:
72
- timestamps = []
73
- if row[4] == user_id and row[5] == page_id:
74
- history.append({
75
- 'conversation_id': row[0],
76
- 'originalcommand': row[1],
77
- 'originalcontent': row[2],
78
- 'originalattachments': json.loads(row[3]) if row[3] else [],
79
- 'recipient_id': row[4],
80
- 'page_id': row[5],
81
- 'originaltext': row[6],
82
- 'originalvehicle': row[7],
83
- 'originalaction': row[8],
84
- 'originalpurpose': row[9],
85
- 'originalquestion': row[10],
86
- 'systemresponse': row[11],
87
- 'timestamp': timestamps,
88
- 'isdone': row[13].lower() == 'true'
89
- })
90
  return history
91
  except Exception as e:
92
- logger.error(f"Error getting conversation history: {e}")
93
  return []
94
 
95
  @timing_decorator_sync
96
- def log_conversation(
97
- self,
98
- conversation_id: str,
99
- recipient_id: str,
100
- page_id: str,
101
- originaltext: str = "",
102
- originalcommand: str = "",
103
- originalcontent: str = "",
104
- originalattachments: Optional[List[str]] = None,
105
- originalvehicle: str = "",
106
- originalaction: str = "",
107
- originalpurpose: str = "",
108
- originalquestion: str = "",
109
- systemresponse: str = "",
110
- timestamp: Optional[str] = None,
111
- isdone: bool = False
112
- ) -> Optional[Dict[str, Any]]:
113
  try:
114
  if not self.service:
115
  self.authenticate()
116
 
117
- result = self.service.spreadsheets().values().get(
118
- spreadsheetId=self.sheet_id,
119
- range=SHEET_RANGE
120
- ).execute()
121
- values = result.get('values', [])
122
-
123
- current_ts = timestamp or datetime.now().isoformat()
124
- timestamps_list = [current_ts]
125
-
126
- # Check duplicate
127
- for row in values:
128
- row = row + [""] * 14
129
- try:
130
- row_timestamps = json.loads(row[12]) if row[12] else []
131
- if not isinstance(row_timestamps, list):
132
- row_timestamps = [row_timestamps]
133
- except Exception:
134
- row_timestamps = []
135
-
136
- row_recipient_id = row[4]
137
- row_page_id = row[5]
138
-
139
- if current_ts in [str(ts) for ts in row_timestamps] and \
140
- str(row_recipient_id) == str(recipient_id) and \
141
- str(row_page_id) == str(page_id):
142
- logger.info(f"Found duplicate conversation for user {recipient_id}, page {page_id}, timestamp {current_ts}")
143
- return {
144
- 'conversation_id': row[0],
145
- 'originalcommand': row[1],
146
- 'originalcontent': row[2],
147
- 'originalattachments': json.loads(row[3]) if row[3] else [],
148
- 'recipient_id': row[4],
149
- 'page_id': row[5],
150
- 'originaltext': row[6],
151
- 'originalvehicle': row[7],
152
- 'originalaction': row[8],
153
- 'originalpurpose': row[9],
154
- 'originalquestion': row[10],
155
- 'systemresponse': row[11],
156
- 'timestamp': row_timestamps,
157
- 'isdone': row[13].lower() == 'true' if len(row) > 13 else False
158
- }
159
-
160
- conversation_id = generate_conversation_id(recipient_id, page_id, current_ts)
161
- new_row = [
162
- conversation_id,
163
- originalcommand,
164
- originalcontent,
165
- json.dumps(originalattachments or []),
166
- recipient_id,
167
- page_id,
168
- originaltext,
169
- originalvehicle,
170
- originalaction,
171
- originalpurpose,
172
- originalquestion,
173
- systemresponse,
174
- json.dumps(timestamps_list),
175
- str(isdone).lower()
176
- ]
177
-
178
- body = {
179
- 'values': [new_row]
180
- }
181
-
182
- self.service.spreadsheets().values().append(
183
- spreadsheetId=self.sheet_id,
184
- range=SHEET_RANGE,
185
- valueInputOption='RAW',
186
- body=body
187
- ).execute()
188
 
189
- logger.info(f"Thêm mới conversation: {conversation_id}")
190
-
191
- return {
192
- 'conversation_id': conversation_id,
193
- 'originalcommand': originalcommand,
194
- 'originalcontent': originalcontent,
195
- 'originalattachments': originalattachments or [],
196
- 'recipient_id': recipient_id,
197
- 'page_id': page_id,
198
- 'originaltext': originaltext,
199
- 'originalvehicle': originalvehicle,
200
- 'originalaction': originalaction,
201
- 'originalpurpose': originalpurpose,
202
- 'originalquestion': originalquestion,
203
- 'systemresponse': systemresponse,
204
- 'timestamp': timestamps_list,
205
- 'isdone': isdone
206
- }
207
  except Exception as e:
208
- logger.error(f"Error logging conversation: {e}")
209
  return None
 
 
 
 
 
 
1
  import os
2
  import pickle
 
 
3
  import json
4
  import hashlib
5
+ from datetime import datetime
6
+ from typing import Any, Dict, List, Optional
7
+
8
  from google.oauth2.service_account import Credentials
9
+ from google.auth.transport.requests import Request
10
+ from google_auth_oauthlib.flow import InstalledAppFlow
11
+ from googleapiclient.discovery import build
12
+ from loguru import logger
13
 
14
  from .utils import timing_decorator_sync
15
  from .constants import SHEET_RANGE
16
 
17
  SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
18
 
19
+ def generate_conversation_id(user_id: str, page_id: str, timestamp: Any) -> str:
20
+ """Tạo ID hội thoại duy nhất."""
21
  hash_input = f"{user_id}:{page_id}:{timestamp}"
22
  return hashlib.sha256(hash_input.encode()).hexdigest()[:32]
23
 
24
+ def _flatten_and_unique_timestamps(items: List[Any]) -> List[Any]:
25
+ """
26
+ Hàm tiện ích để làm phẳng danh sách timestamp và loại bỏ các giá trị trùng lặp.
27
+ Xử lý được cả list lồng nhau.
28
+ """
29
+ flat_list = []
30
+ if not isinstance(items, list):
31
+ return [items]
32
+
33
+ for item in items:
34
+ if isinstance(item, list):
35
+ flat_list.extend(_flatten_and_unique_timestamps(item))
36
+ else:
37
+ flat_list.append(item)
38
+ # Dùng dict để giữ lại thứ tự và loại bỏ trùng lặp
39
+ return list(dict.fromkeys(flat_list))
40
+
41
+
42
  class SheetsClient:
43
  def __init__(self, credentials_file: str, token_file: str, sheet_id: str):
44
  self.credentials_file = credentials_file
 
49
 
50
  @timing_decorator_sync
51
  def authenticate(self) -> None:
52
+ """Xác thực với Google Sheets API."""
53
  credentials_json = os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON")
54
  if credentials_json:
55
  info = json.loads(credentials_json)
56
+ self.creds = Credentials.from_service_account_info(info, scopes=SCOPES)
 
57
  else:
58
  if os.path.exists(self.token_file):
59
  with open(self.token_file, 'rb') as token:
60
  self.creds = pickle.load(token)
61
  if not self.creds or not self.creds.valid:
62
+ if self.creds and self.creds.expired and self.creds.refresh_token:
63
  self.creds.refresh(Request())
64
  else:
65
+ flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, SCOPES)
 
66
  self.creds = flow.run_local_server(port=0)
67
  with open(self.token_file, 'wb') as token:
68
  pickle.dump(self.creds, token)
 
70
 
71
  @timing_decorator_sync
72
  def get_conversation_history(self, user_id: str, page_id: str) -> List[Dict[str, Any]]:
73
+ """Lấy lịch sử hội thoại từ sheet, xử lý timestamp một cách an toàn."""
74
  try:
75
  if not self.service:
76
  self.authenticate()
77
+
78
  result = self.service.spreadsheets().values().get(
79
  spreadsheetId=self.sheet_id,
80
+ range=SHEET_RANGE
81
  ).execute()
82
  values = result.get('values', [])
83
+ if not values:
84
+ return []
85
+
86
+ header = values[0]
87
  history = []
88
+ for row in values[1:]: # Bỏ qua dòng header
89
+ # Đảm bảo row đủ cột
90
+ row_data = dict(zip(header, row + [""] * (len(header) - len(row))))
91
+
92
+ if row_data.get('recipient_id') == user_id and row_data.get('page_id') == page_id:
93
+ # Cải tiến: Xử lý an toàn việc đọc timestamp
94
+ try:
95
+ timestamps_raw = json.loads(row_data.get('timestamp', '[]'))
96
+ timestamps = _flatten_and_unique_timestamps(timestamps_raw)
97
+ except (json.JSONDecodeError, TypeError):
98
+ timestamps = []
99
+
100
+ row_data['timestamp'] = timestamps
101
+ row_data['originalattachments'] = json.loads(row_data.get('originalattachments', '[]'))
102
+ row_data['isdone'] = str(row_data.get('isdone', 'false')).lower() == 'true'
103
+ history.append(row_data)
 
 
 
 
 
 
 
 
 
104
  return history
105
  except Exception as e:
106
+ logger.error(f"Lỗi khi lấy lịch sử hội thoại: {e}")
107
  return []
108
 
109
  @timing_decorator_sync
110
+ def log_conversation(self, **kwargs: Any) -> Optional[Dict[str, Any]]:
111
+ """
112
+ Ghi lại hoặc cập nhật một hội thoại.
113
+ - Nếu có 'conversation_id', sẽ tìm và CẬP NHẬT dòng đó.
114
+ - Nếu không, sẽ THÊM MỚI một dòng.
115
+ """
 
 
 
 
 
 
 
 
 
 
 
116
  try:
117
  if not self.service:
118
  self.authenticate()
119
 
120
+ # Lấy toàn bộ dữ liệu từ sheet một lần
121
+ sheet = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=SHEET_RANGE).execute()
122
+ values = sheet.get('values', [])
123
+ header = values[0] if values else []
124
+ if not header:
125
+ logger.error("Sheet rỗng hoặc không có header.")
126
+ return None
127
+
128
+ conversation_id = kwargs.get('conversation_id')
129
+ row_index_to_update = -1
130
+
131
+ # Tìm dòng cần cập nhật nếu có conversation_id
132
+ if conversation_id:
133
+ for i, row in enumerate(values[1:], start=2): # start=2 sheet index bắt đầu từ 1 và bỏ qua header
134
+ if row and row[0] == conversation_id:
135
+ row_index_to_update = i
136
+ break
137
+
138
+ # Làm phẳng và đảm bảo timestamp là một list duy nhất
139
+ timestamps_list = _flatten_and_unique_timestamps(kwargs.get('timestamp', []))
140
+ kwargs['timestamp'] = timestamps_list # Cập nhật lại kwargs
141
+
142
+ # Chuẩn bị dữ liệu cho dòng mới/cập nhật
143
+ row_data = [str(kwargs.get(h, '')) for h in header]
144
+ # Chuyển đổi các trường đặc biệt sang JSON string
145
+ row_data[header.index('originalattachments')] = json.dumps(kwargs.get('originalattachments', []))
146
+ row_data[header.index('timestamp')] = json.dumps(timestamps_list)
147
+ row_data[header.index('isdone')] = str(kwargs.get('isdone', False)).lower()
148
+
149
+ if row_index_to_update != -1:
150
+ # --- CẬP NHẬT DÒNG HIỆN CÓ ---
151
+ logger.info(f"Đang cập nhật conversation: {conversation_id}")
152
+ range_to_update = f"A{row_index_to_update}"
153
+ body = {'values': [row_data]}
154
+ self.service.spreadsheets().values().update(
155
+ spreadsheetId=self.sheet_id,
156
+ range=range_to_update,
157
+ valueInputOption='RAW',
158
+ body=body
159
+ ).execute()
160
+ else:
161
+ # --- THÊM DÒNG MỚI ---
162
+ if not conversation_id:
163
+ # Tạo ID mới nếu chưa có
164
+ new_id = generate_conversation_id(
165
+ kwargs.get('recipient_id', ''),
166
+ kwargs.get('page_id', ''),
167
+ timestamps_list[0] if timestamps_list else datetime.now().isoformat()
168
+ )
169
+ kwargs['conversation_id'] = new_id
170
+ row_data[header.index('conversation_id')] = new_id
171
+
172
+ logger.info(f"Đang thêm mới conversation: {kwargs['conversation_id']}")
173
+ body = {'values': [row_data]}
174
+ self.service.spreadsheets().values().append(
175
+ spreadsheetId=self.sheet_id,
176
+ range=SHEET_RANGE,
177
+ valueInputOption='RAW',
178
+ insertDataOption='INSERT_ROWS',
179
+ body=body
180
+ ).execute()
181
+
182
+ # Trả về dữ liệu đã được xử lý
183
+ return kwargs
 
 
 
 
 
 
 
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  except Exception as e:
186
+ logger.error(f"Lỗi khi ghi/cập nhật conversation: {e}", exc_info=True)
187
  return None