File size: 5,099 Bytes
ebe8608
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from fastapi import FastAPI, HTTPException
import numpy as np
import matplotlib.pyplot as plt
from io import BytesIO
from google.cloud import storage
import os

app = FastAPI()

# Google Cloud Storage settings
# Both values may be supplied through the environment; the literals below are
# placeholders that only apply when nothing has been configured.
GCS_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME", "your-bucket-name")  # Replace with your GCS bucket name
# setdefault (not assignment) so credentials already exported by the
# deployment environment are not overwritten by the placeholder path.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "path/to/your/service-account-key.json")  # Replace with your service account key path

class FinancialAnalyzer:
    """Simulate, analyze and plot per-category daily spending against limits.

    Args:
        categories: Iterable of category names to simulate.
        hardcoded_limits: Mapping of category name to its spending limit for
            one period of ``PERIOD_DAYS`` days; the per-day limit is that
            value divided by ``PERIOD_DAYS``.
    """

    # Length of one budgeting period in days: period limits are divided by it
    # to get a daily limit, and simulated histories contain this many samples.
    PERIOD_DAYS = 100

    # Upper bound on the projected ("Continued") part of the plot, so that a
    # huge or infinite ``days_left`` cannot request an unbounded array.
    MAX_PROJECTION_DAYS = 3650

    def __init__(self, categories, hardcoded_limits):
        self.categories = categories
        self.hardcoded_limits = hardcoded_limits

    def generate_spending_data(self):
        """Return random daily spending for every configured category.

        Returns:
            dict mapping each category to a NumPy array of ``PERIOD_DAYS``
            values drawn uniformly from 80% to 120% of its daily limit.

        Raises:
            KeyError: if a category has no entry in ``hardcoded_limits``.
        """
        spending_data = {}
        for category in self.categories:
            avg_daily_limit = self.hardcoded_limits[category] / self.PERIOD_DAYS
            spending_data[category] = np.random.uniform(
                0.8 * avg_daily_limit, 1.2 * avg_daily_limit, self.PERIOD_DAYS
            )
        return spending_data

    def analyze_spending(self, spending_data):
        """Compare each category's mean daily spending with its daily limit.

        A category counts as overspent when its mean exceeds the daily limit
        by more than 10%. ``category_to_avoid`` is the overspent category with
        the largest absolute excess over its limit, or ``None`` if there is
        no overspent category.

        Args:
            spending_data: Mapping of category name to a sequence of daily
                spending amounts.

        Returns:
            dict with keys ``overspent_categories`` (list of names),
            ``category_to_avoid`` (name or ``None``) and ``details``
            (per-category average, daily limit and status string).

        Raises:
            KeyError: if a category has no entry in ``hardcoded_limits``.
        """
        overspent_categories = []
        category_to_avoid = None
        largest_excess = 0
        details = {}

        for category, daily_spending in spending_data.items():
            # Plain floats keep the result free of NumPy scalar types.
            avg_daily_spending = float(np.mean(daily_spending))
            daily_limit = self.hardcoded_limits[category] / self.PERIOD_DAYS
            overspending = avg_daily_spending > 1.10 * daily_limit

            details[category] = {
                "average_daily_spending": avg_daily_spending,
                "daily_limit": daily_limit,
                "status": "Overspending" if overspending else "Within limit",
            }

            if overspending:
                overspent_categories.append(category)
                excess = avg_daily_spending - daily_limit
                if excess > largest_excess:
                    largest_excess = excess
                    category_to_avoid = category

        return {
            "overspent_categories": overspent_categories,
            "category_to_avoid": category_to_avoid,
            "details": details,
        }

    def estimate_days_left(self, spending_data, current_balance):
        """Estimate how many days ``current_balance`` lasts at the current pace.

        The balance is drained by spending in all categories at once, so the
        daily burn rate is the sum of the per-category daily means (not their
        average, which would overstate the runway by the number of
        categories).

        Args:
            spending_data: Mapping of category name to a sequence of daily
                spending amounts.
            current_balance: Amount of money currently available.

        Returns:
            ``current_balance`` divided by the total mean daily spending, or
            ``float('inf')`` when that total is zero (including when
            ``spending_data`` is empty).
        """
        total_daily_spending = float(
            sum(np.mean(spending) for spending in spending_data.values())
        )
        if total_daily_spending == 0:
            return float('inf')
        return current_balance / total_daily_spending

    def _projection_days(self, days_left):
        """Convert ``days_left`` into a bounded, non-negative whole day count."""
        # ``not x > 0`` is also true for NaN, which a plain ``x <= 0`` misses.
        if not days_left > 0:
            return 0
        # min() runs before int() so an infinite value never reaches int().
        return int(min(np.ceil(days_left), self.MAX_PROJECTION_DAYS))

    def plot_spending_trends(self, spending_data, days_left):
        """Render the spending history plus a projection as a PNG image.

        For each category the recorded series is drawn as a solid line. The
        dashed "Continued" line replays the start of that series for the
        projected number of days, padding with the last recorded value when
        the projection is longer than the history.

        Args:
            spending_data: Mapping of category name to a sequence of daily
                spending amounts.
            days_left: Estimated days until the balance runs out. It is
                rounded up; NaN and non-positive values produce an empty
                projection, and the value is capped at
                ``MAX_PROJECTION_DAYS``.

        Returns:
            ``BytesIO`` positioned at offset 0 and holding the PNG data.
        """
        extended_days = self._projection_days(days_left)
        series = {
            category: np.asarray(values, dtype=float)
            for category, values in spending_data.items()
        }
        # The vertical marker sits at the end of the longest recorded series.
        history_days = max(
            (len(values) for values in series.values()), default=self.PERIOD_DAYS
        )

        fig, ax = plt.subplots(figsize=(12, 8))
        try:
            for category, daily_spending in series.items():
                n_days = len(daily_spending)
                ax.plot(range(1, n_days + 1), daily_spending, label=f"{category} (Actual)")
                if n_days == 0:
                    # No last value to pad with, so nothing can be projected.
                    continue

                extended_spending = daily_spending[:extended_days]
                missing = extended_days - len(extended_spending)
                if missing > 0:
                    extended_spending = np.concatenate(
                        [extended_spending, np.full(missing, daily_spending[-1])]
                    )
                ax.plot(
                    range(n_days + 1, n_days + 1 + extended_days),
                    extended_spending,
                    '--',
                    label=f"{category} (Continued)",
                )

            ax.axvline(x=history_days, color='red', linestyle='--', label=f"End of {history_days} Days")
            ax.set_xlabel("Days")
            ax.set_ylabel("Daily Spending ($)")
            ax.set_title("Spending Trends with Continued Spending until Balance Exhausts")
            ax.legend()
            ax.grid(True)

            # Save the plot to a BytesIO object
            buf = BytesIO()
            fig.savefig(buf, format="png")
        finally:
            # Always release the figure, even when plotting or saving fails.
            plt.close(fig)

        buf.seek(0)
        return buf

def upload_to_gcs(bucket_name, file_name, file_data, content_type=None):
    """Upload a file object to Google Cloud Storage and make it public.

    Args:
        bucket_name: Name of the destination bucket.
        file_name: Object name to create inside the bucket.
        file_data: Readable binary file object; it is rewound to the start
            before the upload.
        content_type: MIME type to store with the object. When omitted it is
            guessed from ``file_name`` (e.g. ``image/png`` for ``.png``); if
            no guess is possible the client library's default applies.

    Returns:
        The public URL of the uploaded object.

    Raises:
        Whatever the storage client raises when the upload or the ACL change
        fails; nothing is caught here.
    """
    import mimetypes  # local import: only needed for the content-type guess

    if content_type is None:
        content_type, _ = mimetypes.guess_type(file_name)

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    # Passing the content type stores the object with its real MIME type
    # rather than the library's generic default.
    blob.upload_from_file(file_data, rewind=True, content_type=content_type)

    # Make the file publicly accessible
    # NOTE(review): anyone holding the URL can then read the object; confirm
    # this is intended for the data being uploaded.
    blob.make_public()
    return blob.public_url

# Budget configuration: one limit value per spending category.
hardcoded_limits = dict(
    Food=10000,
    Entertainment=1000,
    Shopping=4000,
    Rent=2600,
    Travel=3000,
)
# Category names, taken from the limits mapping in declaration order.
categories = list(hardcoded_limits)
# Module-level analyzer instance built from the configuration above.
analyzer = FinancialAnalyzer(categories, hardcoded_limits)

@app.get("/analyze")
async def analyze(current_balance: float):
    """Run a simulated spending analysis and publish its trend chart.

    Args:
        current_balance: Money currently available; must be finite and
            non-negative.

    Returns:
        dict with ``analysis_result`` (the analyzer output plus ``days_left``,
        which is ``None`` when the estimate is not a finite number) and
        ``image_url`` (public URL of the uploaded PNG chart).

    Raises:
        HTTPException: 422 when ``current_balance`` is NaN, infinite or
            negative; 500 when uploading the chart fails.
    """
    import uuid  # local import: only needed for the object name below

    # "nan", "inf" and negative numbers parse as floats but make the
    # days-left estimate and the chart projection meaningless.
    if not np.isfinite(current_balance) or current_balance < 0:
        raise HTTPException(
            status_code=422,
            detail="current_balance must be a finite, non-negative number",
        )

    # Generate spending data
    spending_data = analyzer.generate_spending_data()

    # Analyze spending
    analysis_result = analyzer.analyze_spending(spending_data)

    # Estimate days left
    days_left = analyzer.estimate_days_left(spending_data, current_balance)
    # JSON cannot represent infinity/NaN, so report those as null instead of
    # failing while serializing the response.
    analysis_result["days_left"] = float(days_left) if np.isfinite(days_left) else None

    # Generate the plot
    # NOTE(review): plotting and the upload below are blocking calls made
    # from an async handler; confirm this is acceptable for expected load.
    plot_buffer = analyzer.plot_spending_trends(spending_data, days_left)

    # Upload the plot to Google Cloud Storage
    # A UUID keeps object names unique; a 4-digit random suffix would collide
    # and let one request overwrite another request's chart.
    file_name = f"spending_trends_{uuid.uuid4().hex}.png"
    try:
        image_url = upload_to_gcs(GCS_BUCKET_NAME, file_name, plot_buffer)
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=f"Failed to upload image to GCS: {str(e)}") from e

    # Return the GCS URL
    return {"analysis_result": analysis_result, "image_url": image_url}