rairo commited on
Commit
64e6cdb
·
1 Parent(s): 96c1c12

Update utility.py

Browse files
Files changed (1) hide show
  1. utility.py +275 -62
utility.py CHANGED
@@ -1,16 +1,21 @@
1
  import json
2
  import os
 
3
  from datetime import datetime
4
  from google.cloud import firestore
5
  import pandas as pd
6
  from langchain_google_genai import ChatGoogleGenerativeAI
7
  import google.generativeai as genai
8
  import logging
 
9
 
 
10
  logger = logging.getLogger(__name__)
11
 
 
12
  db = firestore.Client.from_service_account_json("firestore-key.json")
13
 
 
14
  llm = ChatGoogleGenerativeAI(
15
  model="gemini-2.0-flash-thinking-exp",
16
  max_output_tokens=1024,
@@ -19,8 +24,11 @@ llm = ChatGoogleGenerativeAI(
19
  top_p=0.01,
20
  )
21
 
22
- def generateResponse(prompt):
23
- system_prompt = """You MUST format responses EXACTLY like this:
 
 
 
24
 
25
  *Intent*: [create/read/update/delete]
26
  *Transaction Type*: [purchase/sale/inventory/etc]
@@ -29,7 +37,7 @@ def generateResponse(prompt):
29
  - Field2: Value2
30
  - Field3: Value3
31
 
32
- For multiple transactions repeat this pattern exactly."""
33
 
34
  genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
35
  model = genai.GenerativeModel(
@@ -46,27 +54,33 @@ For multiple transactions repeat this pattern exactly."""
46
  response = model.generate_content([system_prompt, prompt])
47
  return response.text
48
  except Exception as e:
49
- logger.error(f"Generation error: {str(e)}")
50
- return None
51
 
52
- def parse_multiple_transactions(response_text):
 
 
 
 
53
  transactions = []
54
  current_trans = {}
55
 
 
56
  try:
57
- # First try JSON parsing
58
  parsed = json.loads(response_text)
59
  if isinstance(parsed, dict):
60
  return [add_timestamp(parsed)]
61
- return [add_timestamp(t) for t in parsed]
 
62
  except json.JSONDecodeError:
63
- pass # Proceed to text parsing
64
 
 
65
  lines = [line.strip() for line in response_text.split('\n') if line.strip()]
66
 
67
  for line in lines:
68
  if line.startswith('*Intent*:'):
69
- if current_trans:
70
  transactions.append(add_timestamp(current_trans))
71
  current_trans = {}
72
  current_trans['intent'] = line.split(':', 1)[1].strip().lower()
@@ -81,16 +95,22 @@ def parse_multiple_transactions(response_text):
81
  value = key_value[1].strip()
82
  current_trans['details'][key] = value
83
 
 
84
  if current_trans:
85
  transactions.append(add_timestamp(current_trans))
86
 
87
  return transactions if transactions else []
88
 
89
- def add_timestamp(trans):
90
- trans['created_at'] = datetime.now().isoformat()
91
- return trans
 
92
 
93
- def create_inventory(user_phone, transaction_data):
 
 
 
 
94
  for transaction in transaction_data:
95
  item_name = transaction['details'].get('item')
96
  if not item_name:
@@ -98,74 +118,190 @@ def create_inventory(user_phone, transaction_data):
98
 
99
  quantity = int(transaction['details'].get('quantity', 0))
100
  doc_ref = db.collection("users").document(user_phone).collection("inventory").document(item_name)
101
- existing_item = doc_ref.get()
102
 
103
- if existing_item.exists:
104
- existing_data = existing_item.to_dict()
105
- current_qty = int(existing_data['details'].get('quantity', 0))
106
- existing_data['details']['quantity'] = current_qty + quantity
107
- doc_ref.set(existing_data)
108
- else:
109
- transaction['details']['quantity'] = quantity
110
- doc_ref.set(transaction)
 
 
 
 
 
 
 
 
111
  return True
112
 
113
- def create_sale(user_phone, transaction_data):
 
 
 
 
 
 
 
 
114
  for transaction in transaction_data:
115
  item_name = transaction['details'].get('item')
116
  if not item_name:
117
  continue
118
 
119
  quantity = int(transaction['details'].get('quantity', 0))
120
- inventory = fetch_transaction(user_phone, item_name)
121
 
122
- if not inventory or int(inventory['details'].get('quantity', 0)) < quantity:
 
 
 
 
 
 
123
  return False
124
 
125
- new_stock = int(inventory['details']['quantity']) - quantity
126
- inventory['details']['quantity'] = new_stock
127
- db.collection("users").document(user_phone).collection("inventory").document(item_name).set(inventory)
 
 
128
 
129
- sale_ref = db.collection("users").document(user_phone).collection("sales").document()
130
- sale_ref.set(transaction)
131
- return True
 
 
 
 
 
 
 
 
 
 
132
 
133
- def read_datalake(user_phone, user_question):
134
- inventory_ref = db.collection("users").document(user_phone).collection("inventory")
135
- sales_ref = db.collection("users").document(user_phone).collection("sales")
136
- inventory_list = [doc.to_dict() for doc in inventory_ref.stream()]
137
- sales_list = [doc.to_dict() for doc in sales_ref.stream()]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
- if not inventory_list and not sales_list:
140
- return "No data found"
 
 
 
 
 
141
 
142
- inventory_df = pd.DataFrame(inventory_list)
143
- sales_df = pd.DataFrame(sales_list)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
- from pandasai import SmartDatalake
146
- lake = SmartDatalake(
147
- [inventory_df, sales_df],
148
- config={"llm": llm}
149
- )
 
 
 
 
 
150
 
151
  try:
152
- return str(lake.chat(user_question))
 
153
  except Exception as e:
154
- logger.error(f"Datalake error: {str(e)}")
155
- return "Error generating response"
156
 
157
- def persist_temporary_transaction(transactions, mobile):
 
 
 
 
158
  try:
159
- db.collection("users").document(mobile).collection("temp_transactions").document("pending").set({
 
160
  "transactions": transactions,
161
  "timestamp": datetime.now().isoformat()
162
  })
163
  return True
164
  except Exception as e:
165
- logger.error(f"Temp save error: {str(e)}")
166
  return False
167
 
168
- def format_transaction_response(transactions):
 
 
 
 
169
  if not transactions:
170
  return "No transaction data"
171
 
@@ -173,20 +309,97 @@ def format_transaction_response(transactions):
173
  transactions = [transactions]
174
 
175
  output = []
176
- for trans in transactions:
177
- output.append(f"Intent: {trans.get('intent', 'unknown')}")
178
- output.append(f"Type: {trans.get('transaction_type', 'unknown')}")
 
 
179
  if 'details' in trans:
180
- output.append("Details:")
181
  for k, v in trans['details'].items():
182
- output.append(f"- {k}: {v}")
183
- output.append("")
 
 
 
 
 
 
184
 
185
  return "\n".join(output)
186
 
187
- def fetch_transaction(user_phone, item_name=None):
 
 
 
 
 
 
 
 
188
  if item_name:
189
- doc = db.collection("users").document(user_phone).collection("inventory").document(item_name).get()
190
  return doc.to_dict() if doc.exists else None
191
  else:
192
- return [doc.to_dict() for doc in db.collection("users").document(user_phone).collection("inventory").stream()]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
  import os
3
+ import re
4
  from datetime import datetime
5
  from google.cloud import firestore
6
  import pandas as pd
7
  from langchain_google_genai import ChatGoogleGenerativeAI
8
  import google.generativeai as genai
9
  import logging
10
+ from typing import List, Dict, Union, Optional
11
 
12
+ # Configure logging
13
  logger = logging.getLogger(__name__)
14
 
15
+ # Initialize Firestore
16
  db = firestore.Client.from_service_account_json("firestore-key.json")
17
 
18
+ # Initialize Gemini LLM
19
  llm = ChatGoogleGenerativeAI(
20
  model="gemini-2.0-flash-thinking-exp",
21
  max_output_tokens=1024,
 
24
  top_p=0.01,
25
  )
26
 
27
+ def generateResponse(prompt: str) -> str:
28
+ """
29
+ Generate structured transaction response from user input using Gemini
30
+ """
31
+ system_prompt = """You MUST format responses EXACTLY as follows:
32
 
33
  *Intent*: [create/read/update/delete]
34
  *Transaction Type*: [purchase/sale/inventory/etc]
 
37
  - Field2: Value2
38
  - Field3: Value3
39
 
40
+ For multiple transactions, repeat this pattern for each transaction."""
41
 
42
  genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
43
  model = genai.GenerativeModel(
 
54
  response = model.generate_content([system_prompt, prompt])
55
  return response.text
56
  except Exception as e:
57
+ logger.error(f"Response generation failed: {str(e)}")
58
+ return "Sorry, I couldn't process that request."
59
 
60
+ def parse_multiple_transactions(response_text: str) -> List[Dict]:
61
+ """
62
+ Parse Gemini response into structured transaction data
63
+ Returns list of transaction dictionaries
64
+ """
65
  transactions = []
66
  current_trans = {}
67
 
68
+ # First try JSON parsing if response is JSON
69
  try:
 
70
  parsed = json.loads(response_text)
71
  if isinstance(parsed, dict):
72
  return [add_timestamp(parsed)]
73
+ elif isinstance(parsed, list):
74
+ return [add_timestamp(t) for t in parsed]
75
  except json.JSONDecodeError:
76
+ pass # Proceed to text parsing if not JSON
77
 
78
+ # Text-based parsing
79
  lines = [line.strip() for line in response_text.split('\n') if line.strip()]
80
 
81
  for line in lines:
82
  if line.startswith('*Intent*:'):
83
+ if current_trans: # Save previous transaction if exists
84
  transactions.append(add_timestamp(current_trans))
85
  current_trans = {}
86
  current_trans['intent'] = line.split(':', 1)[1].strip().lower()
 
95
  value = key_value[1].strip()
96
  current_trans['details'][key] = value
97
 
98
+ # Add the last transaction if exists
99
  if current_trans:
100
  transactions.append(add_timestamp(current_trans))
101
 
102
  return transactions if transactions else []
103
 
104
+ def add_timestamp(transaction: Dict) -> Dict:
105
+ """Add timestamp to transaction"""
106
+ transaction['created_at'] = datetime.now().isoformat()
107
+ return transaction
108
 
109
+ def create_inventory(user_phone: str, transaction_data: List[Dict]) -> bool:
110
+ """
111
+ Create or update inventory items
112
+ Returns True if successful, False otherwise
113
+ """
114
  for transaction in transaction_data:
115
  item_name = transaction['details'].get('item')
116
  if not item_name:
 
118
 
119
  quantity = int(transaction['details'].get('quantity', 0))
120
  doc_ref = db.collection("users").document(user_phone).collection("inventory").document(item_name)
 
121
 
122
+ try:
123
+ # Update existing or create new
124
+ doc_ref.set({
125
+ 'intent': 'create',
126
+ 'transaction_type': 'inventory',
127
+ 'details': {
128
+ 'item': item_name,
129
+ 'quantity': firestore.Increment(quantity),
130
+ **{k:v for k,v in transaction['details'].items() if k not in ['item', 'quantity']}
131
+ },
132
+ 'last_updated': datetime.now().isoformat()
133
+ }, merge=True)
134
+ except Exception as e:
135
+ logger.error(f"Inventory update failed for {user_phone}: {str(e)}")
136
+ return False
137
+
138
  return True
139
 
140
+ def create_sale(user_phone: str, transaction_data: List[Dict]) -> bool:
141
+ """
142
+ Process sales transactions with inventory validation
143
+ Returns True if successful, False otherwise
144
+ """
145
+ batch = db.batch()
146
+ inventory_ref = db.collection("users").document(user_phone).collection("inventory")
147
+ sales_ref = db.collection("users").document(user_phone).collection("sales")
148
+
149
  for transaction in transaction_data:
150
  item_name = transaction['details'].get('item')
151
  if not item_name:
152
  continue
153
 
154
  quantity = int(transaction['details'].get('quantity', 0))
155
+ item_doc = inventory_ref.document(item_name).get()
156
 
157
+ if not item_doc.exists:
158
+ logger.error(f"Item {item_name} not found in inventory")
159
+ return False
160
+
161
+ current_stock = int(item_doc.to_dict().get('details', {}).get('quantity', 0))
162
+ if current_stock < quantity:
163
+ logger.error(f"Insufficient stock for {item_name}")
164
  return False
165
 
166
+ # Update inventory
167
+ batch.update(inventory_ref.document(item_name), {
168
+ 'details.quantity': firestore.Increment(-quantity),
169
+ 'last_updated': datetime.now().isoformat()
170
+ })
171
 
172
+ # Record sale
173
+ sale_doc = sales_ref.document()
174
+ batch.set(sale_doc, {
175
+ **transaction,
176
+ 'timestamp': datetime.now().isoformat()
177
+ })
178
+
179
+ try:
180
+ batch.commit()
181
+ return True
182
+ except Exception as e:
183
+ logger.error(f"Sales transaction failed: {str(e)}")
184
+ return False
185
 
186
+ def read_datalake(user_phone: str, query: str) -> Union[str, Dict]:
187
+ """
188
+ Query user's transaction data
189
+ Returns formatted response or file path for visualizations
190
+ """
191
+ from pandasai import SmartDatalake
192
+
193
+ try:
194
+ # Get all inventory and sales data
195
+ inventory = [doc.to_dict() for doc in
196
+ db.collection("users").document(user_phone).collection("inventory").stream()]
197
+ sales = [doc.to_dict() for doc in
198
+ db.collection("users").document(user_phone).collection("sales").stream()]
199
+
200
+ if not inventory and not sales:
201
+ return "No transaction data found"
202
+
203
+ # Create DataFrames
204
+ inventory_df = pd.DataFrame(inventory)
205
+ sales_df = pd.DataFrame(sales)
206
+
207
+ # Initialize SmartDatalake
208
+ lake = SmartDatalake(
209
+ [inventory_df, sales_df],
210
+ config={
211
+ "llm": llm,
212
+ "enable_cache": False,
213
+ "save_logs": False
214
+ }
215
+ )
216
+
217
+ # Process query
218
+ response = lake.chat(query)
219
+
220
+ # Handle different response types
221
+ if isinstance(response, str):
222
+ return response
223
+ elif hasattr(response, 'to_dict'):
224
+ return response.to_dict()
225
+ return str(response)
226
+
227
+ except Exception as e:
228
+ logger.error(f"Data query failed: {str(e)}")
229
+ return "Sorry, I couldn't retrieve that data."
230
+
231
+ def update_transaction(user_phone: str, transaction_id: str,
232
+ updates: Dict, collection: str = "inventory") -> bool:
233
+ """
234
+ Update existing transaction
235
+ Returns True if successful, False otherwise
236
+ """
237
+ doc_ref = db.collection("users").document(user_phone).collection(collection).document(transaction_id)
238
 
239
+ if not doc_ref.get().exists:
240
+ return False
241
+
242
+ update_data = {
243
+ f"details.{k}": v for k,v in updates.items()
244
+ }
245
+ update_data['last_updated'] = datetime.now().isoformat()
246
 
247
+ try:
248
+ doc_ref.update(update_data)
249
+ return True
250
+ except Exception as e:
251
+ logger.error(f"Update failed: {str(e)}")
252
+ return False
253
+
254
+ def delete_transaction(user_phone: str, transaction_data: List[Dict]) -> bool:
255
+ """
256
+ Delete specified transactions
257
+ Returns True if successful, False otherwise
258
+ """
259
+ batch = db.batch()
260
+ collection_map = {
261
+ 'purchase': 'inventory',
262
+ 'sale': 'sales',
263
+ 'inventory': 'inventory'
264
+ }
265
 
266
+ for transaction in transaction_data:
267
+ item_name = transaction['details'].get('item')
268
+ if not item_name:
269
+ continue
270
+
271
+ trans_type = transaction.get('transaction_type', 'inventory').lower()
272
+ collection = collection_map.get(trans_type, 'inventory')
273
+
274
+ doc_ref = db.collection("users").document(user_phone).collection(collection).document(item_name)
275
+ batch.delete(doc_ref)
276
 
277
  try:
278
+ batch.commit()
279
+ return True
280
  except Exception as e:
281
+ logger.error(f"Deletion failed: {str(e)}")
282
+ return False
283
 
284
+ def persist_temporary_transaction(transactions: List[Dict], mobile: str) -> bool:
285
+ """
286
+ Store transactions temporarily before confirmation
287
+ Returns True if successful, False otherwise
288
+ """
289
  try:
290
+ doc_ref = db.collection("users").document(mobile).collection("temp_transactions").document("pending")
291
+ doc_ref.set({
292
  "transactions": transactions,
293
  "timestamp": datetime.now().isoformat()
294
  })
295
  return True
296
  except Exception as e:
297
+ logger.error(f"Temp storage failed: {str(e)}")
298
  return False
299
 
300
+ def format_transaction_response(transactions: Union[List[Dict], Dict]) -> str:
301
+ """
302
+ Format transaction data for user display
303
+ Returns formatted string
304
+ """
305
  if not transactions:
306
  return "No transaction data"
307
 
 
309
  transactions = [transactions]
310
 
311
  output = []
312
+ for idx, trans in enumerate(transactions, 1):
313
+ output.append(f"Transaction {idx}:" if len(transactions) > 1 else "Transaction Details:")
314
+ output.append(f"• Intent: {trans.get('intent', 'unknown').title()}")
315
+ output.append(f"• Type: {trans.get('transaction_type', 'unknown').title()}")
316
+
317
  if 'details' in trans:
318
+ output.append("Details:")
319
  for k, v in trans['details'].items():
320
+ if k == 'currency':
321
+ continue
322
+ if 'price' in k or 'amount' in k or 'cost' in k:
323
+ currency = trans['details'].get('currency', '$')
324
+ output.append(f" - {k.title()}: {currency}{v}")
325
+ else:
326
+ output.append(f" - {k.title()}: {v}")
327
+ output.append("") # Blank line between transactions
328
 
329
  return "\n".join(output)
330
 
331
+ def fetch_transaction(user_phone: str,
332
+ item_name: Optional[str] = None,
333
+ collection: str = "inventory") -> Union[Dict, List[Dict]]:
334
+ """
335
+ Retrieve transaction(s) from Firestore
336
+ Returns single dict for specific item or list of all transactions
337
+ """
338
+ col_ref = db.collection("users").document(user_phone).collection(collection)
339
+
340
  if item_name:
341
+ doc = col_ref.document(item_name).get()
342
  return doc.to_dict() if doc.exists else None
343
  else:
344
+ return [doc.to_dict() for doc in col_ref.stream()]
345
+
346
+
347
+ def process_intent(parsed_trans_data: List[Dict], mobile: str) -> str:
348
+ """
349
+ Route transactions to appropriate CRUD operation based on intent
350
+ Returns formatted response message for user
351
+ """
352
+ if not parsed_trans_data:
353
+ return "Error: No transaction data provided"
354
+
355
+ try:
356
+ intent = parsed_trans_data[0].get('intent', '').lower()
357
+ trans_type = parsed_trans_data[0].get('transaction_type', '').lower()
358
+ transaction_summary = format_transaction_response(parsed_trans_data)
359
+
360
+ # Route to appropriate operation
361
+ if intent == 'create':
362
+ if trans_type in ('purchase', 'inventory'):
363
+ success = create_inventory(mobile, parsed_trans_data)
364
+ return ("Inventory updated successfully!\n\n" + transaction_summary) if success else \
365
+ ("Failed to update inventory!\n\n" + transaction_summary)
366
+ elif trans_type == 'sale':
367
+ success = create_sale(mobile, parsed_trans_data)
368
+ return ("Sale recorded successfully!\n\n" + transaction_summary) if success else \
369
+ ("Failed to record sale! Check inventory.\n\n" + transaction_summary)
370
+ else:
371
+ return f"Unsupported transaction type: {trans_type}\n\n{transaction_summary}"
372
+
373
+ elif intent == 'read':
374
+ if 'item' in parsed_trans_data[0].get('details', {}):
375
+ item = parsed_trans_data[0]['details']['item']
376
+ result = fetch_transaction(mobile, item)
377
+ if result:
378
+ return f"Item details:\n\n{format_transaction_response(result)}"
379
+ return f"Item '{item}' not found"
380
+ else:
381
+ results = fetch_transaction(mobile)
382
+ return f"All inventory items:\n\n{format_transaction_response(results)}" if results else "Inventory is empty"
383
+
384
+ elif intent == 'update':
385
+ item = parsed_trans_data[0]['details'].get('item')
386
+ if not item:
387
+ return "Error: No item specified for update"
388
+
389
+ updates = {k:v for k,v in parsed_trans_data[0]['details'].items() if k != 'item'}
390
+ success = update_transaction(mobile, item, updates, trans_type if trans_type else 'inventory')
391
+ return (f"Updated {item} successfully!\n\n{transaction_summary}") if success else \
392
+ (f"Failed to update {item}!\n\n{transaction_summary}")
393
+
394
+ elif intent == 'delete':
395
+ item = parsed_trans_data[0]['details'].get('item', 'item')
396
+ success = delete_transaction(mobile, parsed_trans_data)
397
+ return (f"Deleted {item} successfully!") if success else \
398
+ (f"Failed to delete {item}!")
399
+
400
+ else:
401
+ return f"Unsupported intent: {intent}\n\n{transaction_summary}"
402
+
403
+ except Exception as e:
404
+ logger.error(f"Intent processing failed: {str(e)}")
405
+ return "Sorry, something went wrong while processing your request."