Spaces:
Sleeping
Sleeping
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| import pandas as pd | |
| import os | |
| import io | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS, cross_origin | |
| import firebase_admin | |
| import logging | |
| from firebase_admin import credentials, firestore | |
| from dotenv import load_dotenv | |
| from pandasai import SmartDatalake | |
| from pandasai.responses.response_parser import ResponseParser | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import LLMChain | |
| from datetime import datetime | |
| import matplotlib.pyplot as plt | |
| from statsmodels.tsa.holtwinters import ExponentialSmoothing | |
| from prophet import Prophet | |
# Load variables from a local .env file into the process environment.
load_dotenv()

# Flask application with CORS enabled for all of its routes.
app = Flask(__name__)
cors = CORS(app)

# Initialize the default Firebase app exactly once. get_app() is the public
# way to probe for it: it raises ValueError while no default app exists.
try:
    firebase_admin.get_app()
except ValueError:
    cred = credentials.Certificate("quant-app-99d09-firebase-adminsdk-6prb1-37f34e1c91.json")
    firebase_admin.initialize_app(cred)

# Firestore client shared by the request handlers below.
db = firestore.client()
class FlaskResponse(ResponseParser):
    """PandasAI response parser whose outputs can be returned from a web handler."""

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Render the DataFrame stored in ``result['value']`` as an HTML table string."""
        return result['value'].to_html()

    def format_plot(self, result):
        """Return ``result['value']`` unchanged, echoing it to stdout.

        Presumably the value is the path of the chart image PandasAI saved --
        TODO confirm against the PandasAI version in use.
        """
        # A plain subscript cannot raise ValueError, so no fallback is needed here.
        img_path = result['value']
        print("response_class_path:", img_path)
        return img_path

    def format_other(self, result):
        """Return any other result value converted with ``str``."""
        return str(result['value'])
# Gemini API key, read from the 'Gemini' environment variable (None when unset).
gemini_api_key = os.environ.get('Gemini')

# Chat model instance shared by the handlers in this file.
llm = ChatGoogleGenerativeAI(
    model='gemini-1.5-flash-001',
    temperature=0.1,
    api_key=gemini_api_key,
)
| # Endpoint for handling questions to the bot using transaction data | |
def bot():
    """Answer a free-form question about a user's stored data via PandasAI.

    Reads ``user_id`` and ``user_question`` from the JSON request body, loads
    the user's ``inventory``, ``transactions`` and ``tasks`` Firestore
    sub-collections into DataFrames, and asks a ``SmartDatalake`` built over
    them (in that order) the question.

    Returns:
        The chat response converted with ``str`` and serialised as JSON, or
        ``({"error": ...}, 400)`` when a required field is missing.
    """
    # NOTE(review): no route registration for this handler is visible in this
    # file -- confirm where it is bound to a URL.
    payload = request.json or {}
    user_id = payload.get("user_id")
    user_question = payload.get("user_question")
    # Firestore's document() invents a random id when given None, which would
    # silently query empty collections instead of failing, so reject it here.
    if not user_id or not user_question:
        return jsonify({"error": "user_id and user_question are required"}), 400

    user_doc = db.collection("system_users").document(user_id)
    inventory_df, transactions_df, tasks_df = (
        pd.DataFrame([doc.to_dict() for doc in user_doc.collection(name).stream()])
        for name in ("inventory", "transactions", "tasks")
    )

    lake = SmartDatalake(
        [inventory_df, transactions_df, tasks_df],
        config={
            "llm": llm,
            "response_parser": FlaskResponse,
            "enable_cache": False,
            "save_logs": False,
        },
    )
    response = lake.chat(user_question)
    print(user_question)
    return jsonify(str(response))
| # Marketing recommendations endpoint | |
def marketing_rec():
    """Generate a brief analysis and marketing tips from a user's transactions.

    Reads ``user_id`` from the JSON request body, loads the user's
    ``transactions`` Firestore sub-collection into a DataFrame and feeds it to
    the LLM through a single-variable prompt template.

    Returns:
        The chain's ``text`` output converted with ``str`` and serialised as
        JSON, or ``({"error": ...}, 400)`` when ``user_id`` is missing.
    """
    # NOTE(review): no route registration for this handler is visible in this
    # file -- confirm where it is bound to a URL.
    payload = request.json or {}
    user_id = payload.get("user_id")
    # Firestore's document() invents a random id when given None, which would
    # silently analyse an empty data set, so reject it here.
    if not user_id:
        return jsonify({"error": "user_id is required"}), 400

    transactions_ref = db.collection("system_users").document(user_id).collection('transactions')
    transactions_df = pd.DataFrame([doc.to_dict() for doc in transactions_ref.stream()])

    prompt = PromptTemplate.from_template('You are a business analyst. Write a brief analysis and marketing tips for a small business using this transactions data {data_frame}')
    chain = LLMChain(llm=llm, prompt=prompt, verbose=True)
    # Bind the frame to the template variable explicitly instead of relying on
    # the chain's implicit single-input coercion. The template formats it with
    # str(), so pandas' default repr truncation applies to large frames.
    response = chain.invoke({"data_frame": transactions_df})
    print(response)
    return jsonify(str(response['text']))
| # Profit/Customer Engagement Prediction endpoint | |
def predict_metric():
    """Forecast a user's daily profit or customer engagement with Prophet.

    JSON request body:
        user_id: id of the ``system_users`` document whose ``transactions``
            sub-collection is used.
        interval: number of future days to forecast (default 30).
        metric_type: ``"Profit"`` (default) -- Income minus Expense
            ``amountDue`` per date -- or ``"Customer Engagement"`` -- number
            of Income transactions per date.

    Returns:
        ``{"predictedData": [{"date": "YYYY-MM-DD", "value": float}, ...]}``
        with one entry per forecast day; ``{"error": ...}`` when fewer than 10
        daily rows are available; ``({"error": ...}, 400)`` for a missing
        ``user_id`` or an invalid ``interval`` / ``metric_type``.
    """
    # NOTE(review): no route registration for this handler is visible in this
    # file -- confirm where it is bound to a URL.
    request_data = request.json or {}
    user_id = request_data.get("user_id")
    metric_type = request_data.get("metric_type", "Profit")

    # Firestore's document() invents a random id when given None, so a missing
    # user_id must be rejected rather than silently yielding no data.
    if not user_id:
        return jsonify({"error": "user_id is required"}), 400
    try:
        interval = int(request_data.get("interval", 30))
    except (TypeError, ValueError):
        return jsonify({"error": "interval must be a positive integer"}), 400
    if interval < 1:
        return jsonify({"error": "interval must be a positive integer"}), 400
    if metric_type not in ("Profit", "Customer Engagement"):
        return jsonify({"error": "metric_type must be 'Profit' or 'Customer Engagement'"}), 400

    transactions_ref = db.collection("system_users").document(user_id).collection("transactions")
    income_docs = transactions_ref.where("transactionType", "==", "Income").stream()
    if metric_type == "Profit":
        expense_docs = transactions_ref.where("transactionType", "==", "Expense").stream()
        daily_values = _net_profit_by_date(income_docs, expense_docs)
    else:
        daily_values = _count_by_date(income_docs)

    # Without any rows the frame has no 'date' column, so bail out before
    # touching it instead of failing with a KeyError.
    if not daily_values:
        return jsonify({"error": "Not enough data for prediction"})

    df = pd.DataFrame(
        [{"date": date, "amountDue": value} for date, value in daily_values.items()]
    )
    # Prophet needs timezone-naive datetimes.
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)
    # Resample to one row per calendar day; days without transactions sum to 0.
    df = df.sort_values("date").set_index("date")
    df = df.resample("D").sum().reset_index()
    df.columns = ["ds", "y"]  # ds: date, y: target

    # Check if there's enough data to train the model
    if df.shape[0] < 10:
        return jsonify({"error": "Not enough data for prediction"})

    model = Prophet(daily_seasonality=True, yearly_seasonality=True)
    model.fit(df)
    future_dates = model.make_future_dataframe(periods=interval)
    forecast = model.predict(future_dates)

    # The last `interval` rows are the future dates appended above.
    forecast_data = forecast[['ds', 'yhat']].tail(interval)
    predictions = [
        {"date": ds.strftime('%Y-%m-%d'), "value": float(yhat)}
        for ds, yhat in zip(forecast_data['ds'], forecast_data['yhat'])
    ]
    return jsonify({"predictedData": predictions})


def _sum_amount_by_date(docs):
    """Sum ``amountDue`` per ``date`` value over Firestore document snapshots."""
    totals = {}
    for doc in docs:
        transaction = doc.to_dict()
        date_key = transaction["date"]
        totals[date_key] = totals.get(date_key, 0) + transaction["amountDue"]
    return totals


def _net_profit_by_date(income_docs, expense_docs):
    """Return income minus expense per date, including expense-only dates."""
    income = _sum_amount_by_date(income_docs)
    expense = _sum_amount_by_date(expense_docs)
    # Iterate the union of dates so a day with only expenses is a loss, not a gap.
    return {
        date: income.get(date, 0) - expense.get(date, 0)
        for date in {**income, **expense}
    }


def _count_by_date(docs):
    """Count Firestore document snapshots per ``date`` value."""
    counts = {}
    for doc in docs:
        date_key = doc.to_dict()["date"]
        counts[date_key] = counts.get(date_key, 0) + 1
    return counts
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled unconditionally while listening on all interfaces. Opt in
    # explicitly with FLASK_DEBUG=1 (or "true") for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true")
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)