Tahasaif3 commited on
Commit
1466ee6
·
1 Parent(s): 800595f
Files changed (1) hide show
  1. ageents/ledgerAgent.py +86 -72
ageents/ledgerAgent.py CHANGED
@@ -8,7 +8,7 @@ import os
8
  import uuid
9
  import json
10
  from datetime import datetime
11
- from typing import Optional, Dict, Any
12
  from pydantic import BaseModel, Field
13
  from dotenv import load_dotenv
14
  from openai import AsyncOpenAI
@@ -62,32 +62,54 @@ class ManualEntryData(BaseModel):
62
 
63
  class ProcessedEntry(BaseModel):
64
  """Validated and processed entry"""
65
- entry_type: str # "expense" or "income"
66
  category: str
67
  amount: float
68
  currency: str = "PKR"
69
  payment_method: str
70
  notes: str
71
  user_id: str
72
- recorded_by: str # "voice" or "manual"
73
  device_id: str
74
  confidence: float = 1.0
75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  # ================== FUNCTION TOOLS ==================
77
 
78
  @function_tool()
79
- def validate_entry_data(entry: Dict[str, Any]) -> Dict[str, Any]:
 
 
 
 
 
80
  """
81
- Validate entry fields against business rules
82
- Returns validation result with any corrections
83
  """
84
  errors = []
85
  warnings = []
 
 
86
 
87
  # Validate amount
88
- if entry.get("amount", 0) <= 0:
89
  errors.append("Amount must be greater than 0")
90
- if entry.get("amount", 0) > 1000000:
91
  warnings.append("Large amount detected - verify correctness")
92
 
93
  # Validate category
@@ -95,41 +117,43 @@ def validate_entry_data(entry: Dict[str, Any]) -> Dict[str, Any]:
95
  "fertilizer", "seeds", "fuel", "labour", "pesticide",
96
  "equipment", "irrigation", "transport", "other"
97
  ]
98
- if entry.get("category", "").lower() not in valid_categories:
99
- errors.append(f"Invalid category. Valid options: {', '.join(valid_categories)}")
 
100
 
101
  # Validate payment method
102
  valid_methods = ["cash", "bank", "mobile", "credit"]
103
- if entry.get("payment_method", "").lower() not in valid_methods:
104
  warnings.append("Unknown payment method - setting to cash")
105
- entry["payment_method"] = "cash"
106
 
107
  # Validate user_id
108
- if not entry.get("user_id"):
109
  errors.append("user_id is required")
110
 
111
- return {
112
- "valid": len(errors) == 0,
113
- "errors": errors,
114
- "warnings": warnings,
115
- "corrected_entry": entry,
116
- }
 
117
 
118
  @function_tool()
119
- def determine_entry_type(category: str, amount: float, notes: str) -> str:
 
 
 
 
120
  """
121
- Intelligent determination of entry type (income/expense)
122
- Based on category and contextual clues from notes
123
  """
124
- # Default is expense
125
  entry_type = "expense"
126
 
127
- # Income indicators in notes
128
- income_keywords = ["sale", "sold", "income", "earned", "received", "بیچا", "آمدن"]
129
  if any(keyword in notes.lower() for keyword in income_keywords):
130
  entry_type = "income"
131
 
132
- # Categories typically associated with income
133
  income_categories = ["sale", "income"]
134
  if category.lower() in income_categories:
135
  entry_type = "income"
@@ -148,10 +172,10 @@ def add_ledger_entry(
148
  recorded_by: str,
149
  device_id: str,
150
  confidence: float = 1.0,
151
- ) -> EntryResponse:
152
  """
153
  Add validated ledger entry to Firebase.
154
- Automatically syncs to all connected clients.
155
  """
156
  try:
157
  entry_id = str(uuid.uuid4())
@@ -177,30 +201,19 @@ def add_ledger_entry(
177
  },
178
  }
179
 
180
- # Save to Firebase under "entries" collection
181
  ref = get_db_ref("entries")
182
  ref.child(entry_id).set(entry_data)
183
 
184
  print(f"✅ Entry saved to Firebase: {entry_id}")
 
185
 
186
- return EntryResponse(
187
- id=entry_id,
188
- entryType=entry_type,
189
- category=category,
190
- amount=amount,
191
- currency=currency,
192
- paymentMethod=payment_method,
193
- notes=notes,
194
- createdAt=created_at,
195
- recordedBy=recorded_by,
196
- deviceId=device_id,
197
- )
198
  except Exception as e:
199
  print(f"❌ Error saving entry: {str(e)}")
200
- raise
201
 
202
  @function_tool()
203
- def sync_entry_to_frontend(entry_id: str, user_id: str) -> Dict[str, Any]:
204
  """
205
  Trigger frontend sync for new entry.
206
  Frontend listens for "expense-added" event.
@@ -210,23 +223,21 @@ def sync_entry_to_frontend(entry_id: str, user_id: str) -> Dict[str, Any]:
210
  entry = ref.get()
211
 
212
  if not entry:
213
- return {"status": "error", "message": "Entry not found"}
214
 
215
- # Frontend will receive this via real-time listener
216
- sync_data = {
217
- "status": "synced",
218
- "entry_id": entry_id,
219
- "user_id": user_id,
220
- "timestamp": datetime.utcnow().isoformat(),
221
- "event": "expense-added",
222
- }
223
 
224
  print(f"🔄 Entry {entry_id} marked for frontend sync")
225
- return {"status": "success", "data": sync_data}
226
 
227
  except Exception as e:
228
  print(f"❌ Sync error: {str(e)}")
229
- return {"status": "error", "message": str(e)}
230
 
231
  # ================== AGENT DEFINITION ==================
232
 
@@ -248,6 +259,7 @@ When processing entries:
248
  - Extract clear, concise notes from the original input
249
  - Determine entry type (expense vs income) based on category and context
250
  - Use the add_ledger_entry tool to persist to database
 
251
 
252
  For Urdu/Roman Urdu entries:
253
  - "khad" = fertilizer, "beej" = seeds, "dawa" = pesticide, "mazdoor" = labour
@@ -264,10 +276,10 @@ Always confirm successful entry addition and provide user feedback.""",
264
 
265
  # ================== PROCESSING FUNCTIONS ==================
266
 
267
- async def process_voice_entry(voice_data: VoiceEntryData) -> EntryResponse:
268
  """
269
  Process a voice-parsed entry through the agent.
270
- Called when user confirms voice entry from frontend.
271
  """
272
  print(f"\n🎙️ Processing voice entry: {voice_data.transcript}")
273
 
@@ -288,25 +300,26 @@ Device: {voice_data.device_id}
288
  Recorded: {voice_data.recorded_at}
289
 
290
  Steps:
291
- 1. Validate the entry data
292
- 2. Determine if this is income or expense
293
  3. Add to ledger via add_ledger_entry
294
- 4. Confirm successful addition
 
295
 
296
- Respond in Urdu/English mix as appropriate."""
297
 
298
  try:
299
  result = await runner.run(prompt)
300
  print(f"✅ Voice entry processed successfully")
301
- return result
302
  except Exception as e:
303
  print(f"❌ Error processing voice entry: {str(e)}")
304
  raise
305
 
306
- async def process_manual_entry(manual_data: ManualEntryData) -> EntryResponse:
307
  """
308
  Process a manual form entry through the agent.
309
- Called when user submits the manual expense form.
310
  """
311
  print(f"\n📝 Processing manual entry: {manual_data.category}")
312
 
@@ -326,24 +339,25 @@ User ID: {manual_data.user_id}
326
  Device: {manual_data.device_id}
327
 
328
  Steps:
329
- 1. Validate all entry fields
330
- 2. Determine entry type (should be "expense" for manual entries)
331
- 3. Add to ledger via add_ledger_entry tool
332
- 4. Confirm and sync to frontend
 
333
 
334
- Respond in Urdu/English mix as appropriate."""
335
 
336
  try:
337
  result = await runner.run(prompt)
338
  print(f"✅ Manual entry processed successfully")
339
- return result
340
  except Exception as e:
341
  print(f"❌ Error processing manual entry: {str(e)}")
342
  raise
343
 
344
- async def process_batch_entries(entries: list[Dict[str, Any]], user_id: str) -> list[EntryResponse]:
345
  """
346
- Process multiple entries in batch (e.g., from offline sync).
347
  """
348
  print(f"\n📦 Processing batch of {len(entries)} entries for user {user_id}")
349
 
@@ -356,7 +370,7 @@ async def process_batch_entries(entries: list[Dict[str, Any]], user_id: str) ->
356
  result = await process_manual_entry(ManualEntryData(**entry))
357
  results.append(result)
358
  except Exception as e:
359
- print(f"⚠️ Error in batch processing: {str(e)}")
360
  continue
361
 
362
  return results
 
8
  import uuid
9
  import json
10
  from datetime import datetime
11
+ from typing import Optional
12
  from pydantic import BaseModel, Field
13
  from dotenv import load_dotenv
14
  from openai import AsyncOpenAI
 
62
 
63
  class ProcessedEntry(BaseModel):
64
  """Validated and processed entry"""
65
+ entry_type: str
66
  category: str
67
  amount: float
68
  currency: str = "PKR"
69
  payment_method: str
70
  notes: str
71
  user_id: str
72
+ recorded_by: str
73
  device_id: str
74
  confidence: float = 1.0
75
 
76
+ class ValidationResult(BaseModel):
77
+ """Validation result response"""
78
+ valid: bool
79
+ errors: list[str] = Field(default_factory=list)
80
+ warnings: list[str] = Field(default_factory=list)
81
+ corrected_category: Optional[str] = None
82
+ corrected_amount: Optional[float] = None
83
+
84
+ class SyncResult(BaseModel):
85
+ """Sync result response"""
86
+ status: str
87
+ entry_id: str
88
+ message: str
89
+ timestamp: str
90
+
91
  # ================== FUNCTION TOOLS ==================
92
 
93
  @function_tool()
94
+ def validate_entry_data(
95
+ amount: float,
96
+ category: str,
97
+ payment_method: str,
98
+ user_id: str,
99
+ ) -> ValidationResult:
100
  """
101
+ Validate entry fields against business rules.
102
+ Returns validation result with corrections if needed.
103
  """
104
  errors = []
105
  warnings = []
106
+ corrected_category = category
107
+ corrected_amount = amount
108
 
109
  # Validate amount
110
+ if amount <= 0:
111
  errors.append("Amount must be greater than 0")
112
+ if amount > 1000000:
113
  warnings.append("Large amount detected - verify correctness")
114
 
115
  # Validate category
 
117
  "fertilizer", "seeds", "fuel", "labour", "pesticide",
118
  "equipment", "irrigation", "transport", "other"
119
  ]
120
+ if category.lower() not in valid_categories:
121
+ errors.append(f"Invalid category. Valid: fertilizer, seeds, fuel, labour, pesticide, equipment, irrigation, transport, other")
122
+ corrected_category = "other"
123
 
124
  # Validate payment method
125
  valid_methods = ["cash", "bank", "mobile", "credit"]
126
+ if payment_method.lower() not in valid_methods:
127
  warnings.append("Unknown payment method - setting to cash")
 
128
 
129
  # Validate user_id
130
+ if not user_id or len(user_id) == 0:
131
  errors.append("user_id is required")
132
 
133
+ return ValidationResult(
134
+ valid=len(errors) == 0,
135
+ errors=errors,
136
+ warnings=warnings,
137
+ corrected_category=corrected_category,
138
+ corrected_amount=corrected_amount,
139
+ )
140
 
141
  @function_tool()
142
+ def determine_entry_type(
143
+ category: str,
144
+ amount: float,
145
+ notes: str,
146
+ ) -> str:
147
  """
148
+ Intelligent determination of entry type (income/expense).
149
+ Based on category and contextual clues from notes.
150
  """
 
151
  entry_type = "expense"
152
 
153
+ income_keywords = ["sale", "sold", "income", "earned", "received", "بیچا", "آمدن", "harvest"]
 
154
  if any(keyword in notes.lower() for keyword in income_keywords):
155
  entry_type = "income"
156
 
 
157
  income_categories = ["sale", "income"]
158
  if category.lower() in income_categories:
159
  entry_type = "income"
 
172
  recorded_by: str,
173
  device_id: str,
174
  confidence: float = 1.0,
175
+ ) -> str:
176
  """
177
  Add validated ledger entry to Firebase.
178
+ Returns the entry ID.
179
  """
180
  try:
181
  entry_id = str(uuid.uuid4())
 
201
  },
202
  }
203
 
204
+ # Save to Firebase
205
  ref = get_db_ref("entries")
206
  ref.child(entry_id).set(entry_data)
207
 
208
  print(f"✅ Entry saved to Firebase: {entry_id}")
209
+ return entry_id
210
 
 
 
 
 
 
 
 
 
 
 
 
 
211
  except Exception as e:
212
  print(f"❌ Error saving entry: {str(e)}")
213
+ raise ValueError(f"Failed to save entry: {str(e)}")
214
 
215
  @function_tool()
216
+ def sync_entry_to_frontend(entry_id: str, user_id: str) -> SyncResult:
217
  """
218
  Trigger frontend sync for new entry.
219
  Frontend listens for "expense-added" event.
 
223
  entry = ref.get()
224
 
225
  if not entry:
226
+ raise ValueError("Entry not found after save")
227
 
228
+ sync_data = SyncResult(
229
+ status="synced",
230
+ entry_id=entry_id,
231
+ message=f"Entry {entry_id} synced successfully",
232
+ timestamp=datetime.utcnow().isoformat(),
233
+ )
 
 
234
 
235
  print(f"🔄 Entry {entry_id} marked for frontend sync")
236
+ return sync_data
237
 
238
  except Exception as e:
239
  print(f"❌ Sync error: {str(e)}")
240
+ raise ValueError(f"Sync failed: {str(e)}")
241
 
242
  # ================== AGENT DEFINITION ==================
243
 
 
259
  - Extract clear, concise notes from the original input
260
  - Determine entry type (expense vs income) based on category and context
261
  - Use the add_ledger_entry tool to persist to database
262
+ - Use sync_entry_to_frontend to trigger frontend updates
263
 
264
  For Urdu/Roman Urdu entries:
265
  - "khad" = fertilizer, "beej" = seeds, "dawa" = pesticide, "mazdoor" = labour
 
276
 
277
  # ================== PROCESSING FUNCTIONS ==================
278
 
279
+ async def process_voice_entry(voice_data: VoiceEntryData) -> str:
280
  """
281
  Process a voice-parsed entry through the agent.
282
+ Returns entry ID on success.
283
  """
284
  print(f"\n🎙️ Processing voice entry: {voice_data.transcript}")
285
 
 
300
  Recorded: {voice_data.recorded_at}
301
 
302
  Steps:
303
+ 1. Validate the entry data using validate_entry_data
304
+ 2. Determine if this is income or expense using determine_entry_type
305
  3. Add to ledger via add_ledger_entry
306
+ 4. Sync to frontend using sync_entry_to_frontend
307
+ 5. Confirm successful addition
308
 
309
+ Respond in Urdu/English mix as appropriate. Be brief and direct."""
310
 
311
  try:
312
  result = await runner.run(prompt)
313
  print(f"✅ Voice entry processed successfully")
314
+ return str(result)
315
  except Exception as e:
316
  print(f"❌ Error processing voice entry: {str(e)}")
317
  raise
318
 
319
+ async def process_manual_entry(manual_data: ManualEntryData) -> str:
320
  """
321
  Process a manual form entry through the agent.
322
+ Returns entry ID on success.
323
  """
324
  print(f"\n📝 Processing manual entry: {manual_data.category}")
325
 
 
339
  Device: {manual_data.device_id}
340
 
341
  Steps:
342
+ 1. Validate all entry fields using validate_entry_data
343
+ 2. Determine entry type using determine_entry_type (should be "expense")
344
+ 3. Add to ledger via add_ledger_entry
345
+ 4. Sync to frontend using sync_entry_to_frontend
346
+ 5. Confirm successful addition
347
 
348
+ Respond briefly and directly."""
349
 
350
  try:
351
  result = await runner.run(prompt)
352
  print(f"✅ Manual entry processed successfully")
353
+ return str(result)
354
  except Exception as e:
355
  print(f"❌ Error processing manual entry: {str(e)}")
356
  raise
357
 
358
+ async def process_batch_entries(entries: list, user_id: str) -> list:
359
  """
360
+ Process multiple entries in batch.
361
  """
362
  print(f"\n📦 Processing batch of {len(entries)} entries for user {user_id}")
363
 
 
370
  result = await process_manual_entry(ManualEntryData(**entry))
371
  results.append(result)
372
  except Exception as e:
373
+ print(f"⚠️ Error in batch: {str(e)}")
374
  continue
375
 
376
  return results