LogicGoInfotechSpaces commited on
Commit
7cf7d80
·
verified ·
1 Parent(s): fa10656

Update app/smart_recommendation.py

Browse files
Files changed (1) hide show
  1. app/smart_recommendation.py +373 -6
app/smart_recommendation.py CHANGED
@@ -109,8 +109,18 @@ class SmartBudgetRecommender:
109
  amount = expense.get("amount", 0)
110
  date = expense.get("date")
111
 
 
 
 
 
112
  if isinstance(date, str):
113
- date = datetime.fromisoformat(date.replace('Z', '+00:00'))
 
 
 
 
 
 
114
 
115
  category_data[category]["total"] += amount
116
  category_data[category]["count"] += 1
@@ -330,24 +340,381 @@ class SmartBudgetRecommender:
330
 
331
  try:
332
  response = requests.post(
333
- "https://api.openai.com/v1/responses",
334
  headers={
335
  "Authorization": f"Bearer {OPENAI_API_KEY}",
336
  "Content-Type": "application/json",
337
  },
338
  json={
339
- "model": "gpt-4.1-mini",
340
- "input": prompt,
 
 
341
  "temperature": 0.1,
342
  "response_format": {"type": "json_object"},
343
  },
344
  timeout=30,
345
  )
346
  response.raise_for_status()
347
- data = response.json()
348
- content = data["output"][0]["content"][0]["text"]
349
  return json.loads(content)
350
  except Exception as exc:
351
  print(f"OpenAI recommendation error for {category}: {exc}")
352
  return None
353
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
  amount = expense.get("amount", 0)
110
  date = expense.get("date")
111
 
112
+ # Handle date conversion - skip if date is None or invalid
113
+ if date is None:
114
+ continue
115
+
116
  if isinstance(date, str):
117
+ try:
118
+ date = datetime.fromisoformat(date.replace('Z', '+00:00'))
119
+ except (ValueError, AttributeError):
120
+ continue
121
+ elif not isinstance(date, datetime):
122
+ # If date is not a string or datetime, skip this expense
123
+ continue
124
 
125
  category_data[category]["total"] += amount
126
  category_data[category]["count"] += 1
 
340
 
341
  try:
342
  response = requests.post(
343
+ "https://api.openai.com/v1/chat/completions",
344
  headers={
345
  "Authorization": f"Bearer {OPENAI_API_KEY}",
346
  "Content-Type": "application/json",
347
  },
348
  json={
349
+ "model": "gpt-4o-mini",
350
+ "messages": [
351
+ {"role": "user", "content": prompt}
352
+ ],
353
  "temperature": 0.1,
354
  "response_format": {"type": "json_object"},
355
  },
356
  timeout=30,
357
  )
358
  response.raise_for_status()
359
+ response_data = response.json()
360
+ content = response_data["choices"][0]["message"]["content"]
361
  return json.loads(content)
362
  except Exception as exc:
363
  print(f"OpenAI recommendation error for {category}: {exc}")
364
  return None
365
 
366
+
367
+
368
+ # import json
369
+ # import math
370
+ # import os
371
+ # from collections import defaultdict
372
+ # from datetime import datetime, timedelta
373
+ # from typing import Dict, List
374
+
375
+ # import requests
376
+ # from dotenv import load_dotenv
377
+ # from bson import ObjectId
378
+
379
+ # from app.models import BudgetRecommendation, CategoryExpense
380
+
381
+ # load_dotenv()
382
+ # OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
383
+
384
+ # class SmartBudgetRecommender:
385
+ # """
386
+ # Smart Budget Recommendation Engine
387
+
388
+ # Analyzes past spending behavior and recommends personalized budgets
389
+ # for each category based on historical data.
390
+ # """
391
+
392
+ # def __init__(self, db):
393
+ # self.db = db
394
+
395
+ # def get_recommendations(self, user_id: str, month: int, year: int) -> List[BudgetRecommendation]:
396
+ # """
397
+ # Get budget recommendations for all categories based on past behavior.
398
+
399
+ # Args:
400
+ # user_id: User identifier
401
+ # month: Target month (1-12)
402
+ # year: Target year
403
+
404
+ # Returns:
405
+ # List of budget recommendations for each category
406
+ # """
407
+ # # 1) Try to build stats from existing budgets for this user (createdBy)
408
+ # category_data = self._get_category_stats_from_budgets(user_id, month, year)
409
+
410
+ # # 2) If there are no budgets, fall back to expenses history
411
+ # if not category_data:
412
+ # end_date = datetime(year, month, 1) - timedelta(days=1)
413
+ # start_date = end_date - timedelta(days=180) # ~6 months
414
+
415
+ # expenses = list(
416
+ # self.db.expenses.find(
417
+ # {
418
+ # "user_id": user_id,
419
+ # "date": {"$gte": start_date, "$lte": end_date},
420
+ # "type": "expense",
421
+ # }
422
+ # )
423
+ # )
424
+
425
+ # if not expenses:
426
+ # return []
427
+
428
+ # # Group expenses by category and calculate monthly averages
429
+ # category_data = self._calculate_category_statistics(
430
+ # expenses, start_date, end_date
431
+ # )
432
+
433
+ # recommendations: List[BudgetRecommendation] = []
434
+
435
+ # for category, data in category_data.items():
436
+ # avg_expense = data["average_monthly"]
437
+ # confidence = self._calculate_confidence(data)
438
+
439
+ # # 1) Try OpenAI first (primary source of recommendation)
440
+ # ai_result = self._get_ai_recommendation(category, data, avg_expense)
441
+ # if ai_result:
442
+ # recommended_budget = ai_result.get("recommended_budget")
443
+ # reason = ai_result.get("reason")
444
+ # action = ai_result.get("action")
445
+ # else:
446
+ # # 2) Fallback to rule-based recommendation
447
+ # recommended_budget = self._calculate_recommended_budget(avg_expense, data)
448
+ # reason = self._generate_reason(category, avg_expense, recommended_budget)
449
+ # action = None
450
+
451
+ # recommendations.append(BudgetRecommendation(
452
+ # category=category,
453
+ # average_expense=round(avg_expense, 2),
454
+ # recommended_budget=round(recommended_budget or 0, 2),
455
+ # reason=reason,
456
+ # confidence=confidence,
457
+ # action=action
458
+ # ))
459
+
460
+ # # Sort by average expense (highest first)
461
+ # recommendations.sort(key=lambda x: x.average_expense, reverse=True)
462
+
463
+ # return recommendations
464
+
465
+ # def _calculate_category_statistics(self, expenses: List[Dict], start_date: datetime, end_date: datetime) -> Dict:
466
+ # """Calculate statistics for each category"""
467
+ # category_data = defaultdict(lambda: {
468
+ # "total": 0,
469
+ # "count": 0,
470
+ # "months": set(),
471
+ # "monthly_totals": defaultdict(float)
472
+ # })
473
+
474
+ # for expense in expenses:
475
+ # category = expense.get("category", "Uncategorized")
476
+ # amount = expense.get("amount", 0)
477
+ # date = expense.get("date")
478
+
479
+ # if isinstance(date, str):
480
+ # date = datetime.fromisoformat(date.replace('Z', '+00:00'))
481
+
482
+ # category_data[category]["total"] += amount
483
+ # category_data[category]["count"] += 1
484
+
485
+ # # Track monthly totals
486
+ # month_key = (date.year, date.month)
487
+ # category_data[category]["months"].add(month_key)
488
+ # category_data[category]["monthly_totals"][month_key] += amount
489
+
490
+ # # Calculate averages
491
+ # result = {}
492
+ # for category, data in category_data.items():
493
+ # num_months = len(data["months"]) or 1
494
+ # avg_monthly = data["total"] / num_months
495
+
496
+ # # Calculate standard deviation for variability
497
+ # monthly_values = list(data["monthly_totals"].values())
498
+ # if len(monthly_values) > 1:
499
+ # mean = sum(monthly_values) / len(monthly_values)
500
+ # variance = sum((x - mean) ** 2 for x in monthly_values) / len(monthly_values)
501
+ # std_dev = math.sqrt(variance)
502
+ # else:
503
+ # std_dev = 0
504
+
505
+ # result[category] = {
506
+ # "average_monthly": avg_monthly,
507
+ # "total": data["total"],
508
+ # "count": data["count"],
509
+ # "months_analyzed": num_months,
510
+ # "std_dev": std_dev,
511
+ # "monthly_values": monthly_values
512
+ # }
513
+
514
+ # return result
515
+
516
+ # def _calculate_recommended_budget(self, avg_expense: float, data: Dict) -> float:
517
+ # """
518
+ # Calculate recommended budget based on average expense.
519
+
520
+ # Strategy:
521
+ # - Base: Average monthly expense
522
+ # - Add 5% buffer for variability
523
+ # - Round to nearest 100 for cleaner numbers
524
+ # """
525
+ # # Add 5% buffer to handle variability
526
+ # buffer = avg_expense * 0.05
527
+
528
+ # # If there's high variability (std_dev > 20% of mean), add more buffer
529
+ # if data["std_dev"] > 0:
530
+ # coefficient_of_variation = data["std_dev"] / avg_expense if avg_expense > 0 else 0
531
+ # if coefficient_of_variation > 0.2:
532
+ # buffer = avg_expense * 0.10 # 10% buffer for high variability
533
+
534
+ # recommended = avg_expense + buffer
535
+
536
+ # # Round to nearest 100 for cleaner budget numbers
537
+ # recommended = round(recommended / 100) * 100
538
+
539
+ # # Ensure minimum of 100 if there was any expense
540
+ # if recommended < 100 and avg_expense > 0:
541
+ # recommended = 100
542
+
543
+ # return recommended
544
+
545
+ # def _calculate_confidence(self, data: Dict) -> float:
546
+ # """
547
+ # Calculate confidence score (0-1) based on data quality.
548
+
549
+ # Factors:
550
+ # - Number of months analyzed (more = higher confidence)
551
+ # - Number of transactions (more = higher confidence)
552
+ # - Consistency of spending (lower std_dev = higher confidence)
553
+ # """
554
+ # months_score = min(data["months_analyzed"] / 6, 1.0) # Max at 6 months
555
+ # count_score = min(data["count"] / 10, 1.0) # Max at 10 transactions
556
+
557
+ # # Consistency score (inverse of coefficient of variation)
558
+ # if data["average_monthly"] > 0:
559
+ # cv = data["std_dev"] / data["average_monthly"]
560
+ # consistency_score = max(0, 1 - min(cv, 1.0)) # Lower CV = higher score
561
+ # else:
562
+ # consistency_score = 0.5
563
+
564
+ # # Weighted average
565
+ # confidence = (months_score * 0.4 + count_score * 0.3 + consistency_score * 0.3)
566
+
567
+ # return round(confidence, 2)
568
+
569
+ # def _generate_reason(self, category: str, avg_expense: float, recommended_budget: float) -> str:
570
+ # """Generate human-readable reason for the recommendation"""
571
+ # # Format amounts with currency symbol
572
+ # avg_formatted = f"Rs.{avg_expense:,.0f}"
573
+ # budget_formatted = f"Rs.{recommended_budget:,.0f}"
574
+
575
+ # if recommended_budget > avg_expense:
576
+ # buffer = recommended_budget - avg_expense
577
+ # buffer_pct = (buffer / avg_expense * 100) if avg_expense > 0 else 0
578
+ # return (
579
+ # f"Your average monthly {category.lower()} expense is {avg_formatted}. "
580
+ # f"We suggest setting your budget to {budget_formatted} for next month "
581
+ # f"(includes a {buffer_pct:.0f}% buffer for variability)."
582
+ # )
583
+ # else:
584
+ # return (
585
+ # f"Your average monthly {category.lower()} expense is {avg_formatted}. "
586
+ # f"We recommend a budget of {budget_formatted} for next month."
587
+ # )
588
+
589
+ # def get_category_averages(self, user_id: str, months: int = 3) -> List[CategoryExpense]:
590
+ # """Get average expenses by category for the past N months"""
591
+ # end_date = datetime.now()
592
+ # start_date = end_date - timedelta(days=months * 30)
593
+
594
+ # expenses = list(self.db.expenses.find({
595
+ # "user_id": user_id,
596
+ # "date": {"$gte": start_date, "$lte": end_date},
597
+ # "type": "expense"
598
+ # }))
599
+
600
+ # if not expenses:
601
+ # return []
602
+
603
+ # category_data = self._calculate_category_statistics(expenses, start_date, end_date)
604
+
605
+ # result = []
606
+ # for category, data in category_data.items():
607
+ # result.append(CategoryExpense(
608
+ # category=category,
609
+ # average_monthly_expense=round(data["average_monthly"], 2),
610
+ # total_expenses=data["count"],
611
+ # months_analyzed=data["months_analyzed"]
612
+ # ))
613
+
614
+ # result.sort(key=lambda x: x.average_monthly_expense, reverse=True)
615
+ # return result
616
+
617
+ # def _get_category_stats_from_budgets(
618
+ # self, user_id: str, month: int, year: int
619
+ # ) -> Dict:
620
+ # """
621
+ # Build category stats from existing budgets for this user.
622
+
623
+ # We treat each budget document (e.g. \"Office Maintenance\", \"LOGICGO\")
624
+ # as a spending category and derive an \"average\" from its amounts.
625
+ # """
626
+ # # createdBy is stored as ObjectId in WalletSync, while user_id is a string.
627
+ # # Try to cast to ObjectId; if it fails, fall back to matching the raw string.
628
+ # query: Dict = {"status": "OPEN"}
629
+ # try:
630
+ # query["createdBy"] = ObjectId(user_id)
631
+ # except Exception:
632
+ # query["createdBy"] = user_id
633
+
634
+ # budgets = list(self.db.budgets.find(query))
635
+
636
+ # if not budgets:
637
+ # return {}
638
+
639
+ # result: Dict[str, Dict] = {}
640
+ # for b in budgets:
641
+ # # Use budget \"name\" as category label
642
+ # category = b.get("name", "Uncategorized")
643
+
644
+ # # Derive a base amount from WalletSync fields
645
+ # max_amount = float(b.get("maxAmount", 0) or 0)
646
+ # spend_amount = float(b.get("spendAmount", 0) or 0)
647
+
648
+ # # If there is recorded spend, use that as \"average\"; otherwise maxAmount
649
+ # base_amount = spend_amount if spend_amount > 0 else max_amount
650
+ # if base_amount <= 0:
651
+ # continue
652
+
653
+ # if category not in result:
654
+ # result[category] = {
655
+ # "average_monthly": base_amount,
656
+ # "total": base_amount,
657
+ # "count": 1,
658
+ # "months_analyzed": 1,
659
+ # "std_dev": 0.0,
660
+ # "monthly_values": [base_amount],
661
+ # }
662
+ # else:
663
+ # # If multiple budgets per category, average them
664
+ # result[category]["total"] += base_amount
665
+ # result[category]["count"] += 1
666
+ # result[category]["months_analyzed"] = result[category]["count"]
667
+ # result[category]["average_monthly"] = (
668
+ # result[category]["total"] / result[category]["count"]
669
+ # )
670
+ # result[category]["monthly_values"].append(base_amount)
671
+
672
+ # return result
673
+
674
+ # def _get_ai_recommendation(self, category: str, data: Dict, avg_expense: float):
675
+ # """Use OpenAI to refine the budget recommendation."""
676
+ # if not OPENAI_API_KEY:
677
+ # return None
678
+
679
+ # history = ", ".join(f"{value:.0f}" for value in data["monthly_values"])
680
+ # summary = (
681
+ # f"Category: {category}\n"
682
+ # f"Monthly totals: [{history}]\n"
683
+ # f"Average spend: {avg_expense:.2f}\n"
684
+ # f"Std deviation: {data['std_dev']:.2f}\n"
685
+ # f"Months observed: {data['months_analyzed']}\n"
686
+ # )
687
+
688
+ # prompt = (
689
+ # "You are an Indian personal finance coach. "
690
+ # "Given the user's spending history, decide whether to increase, decrease, "
691
+ # "or keep the upcoming month's budget and provide a short explanation. "
692
+ # "Respond strictly as JSON with the following keys:\n"
693
+ # '{ "recommended_budget": number, "action": "increase|decrease|keep", "reason": "string" }.\n'
694
+ # "Use rupees for all amounts.\n\n"
695
+ # f"{summary}"
696
+ # )
697
+
698
+ # try:
699
+ # response = requests.post(
700
+ # "https://api.openai.com/v1/responses",
701
+ # headers={
702
+ # "Authorization": f"Bearer {OPENAI_API_KEY}",
703
+ # "Content-Type": "application/json",
704
+ # },
705
+ # json={
706
+ # "model": "gpt-4.1-mini",
707
+ # "input": prompt,
708
+ # "temperature": 0.1,
709
+ # "response_format": {"type": "json_object"},
710
+ # },
711
+ # timeout=30,
712
+ # )
713
+ # response.raise_for_status()
714
+ # data = response.json()
715
+ # content = data["output"][0]["content"][0]["text"]
716
+ # return json.loads(content)
717
+ # except Exception as exc:
718
+ # print(f"OpenAI recommendation error for {category}: {exc}")
719
+ # return None
720
+