therickglenn's picture
Update app.py
b57cbdc verified
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import yfinance as yf
from datetime import datetime, timedelta
import random
from dateutil.relativedelta import relativedelta
from math import ceil
import sqlite3
import pdfplumber
import re
from fuzzywuzzy import fuzz
import time
import logging
# -------------------------------------------------
# 1) Database Utilities
# -------------------------------------------------
DB_PATH = "bank_data.db"
def get_connection(db_path=DB_PATH, encryption_key="default_secret"):
try:
from pysqlcipher3 import dbapi2 as sqlite
conn = sqlite.connect(db_path)
conn.execute(f"PRAGMA key='{encryption_key}'")
except ImportError:
import sqlite3
conn = sqlite3.connect(db_path)
return conn
def initialize_db():
"""Create tables if they don't exist."""
conn = get_connection()
cursor = conn.cursor()
# Debts Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS debts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
amount REAL,
rate REAL,
min_payment REAL,
category TEXT,
due_date TEXT
)
""")
# Income Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS income (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT,
amount REAL,
frequency TEXT
)
""")
# Expenses Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS expenses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT,
amount REAL,
is_recurring INTEGER,
start_date TEXT,
end_date TEXT,
frequency TEXT,
single_date TEXT
)
""")
# Goals Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS goals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
category TEXT,
target REAL,
date TEXT,
current REAL
)
""")
# Savings History Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS savings_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT,
amount REAL
)
""")
# Watchlist Table (for user tickers)
cursor.execute("""
CREATE TABLE IF NOT EXISTS watchlist (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT
)
""")
# Add 'quantity' column if it doesn't exist
cursor.execute("PRAGMA table_info(watchlist)")
columns = [info[1] for info in cursor.fetchall()]
if 'quantity' not in columns:
cursor.execute(
"ALTER TABLE watchlist ADD COLUMN quantity INTEGER DEFAULT 0")
# Bank Keywords Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS bank_keywords (
id INTEGER PRIMARY KEY AUTOINCREMENT,
bank_name TEXT NOT NULL,
keyword TEXT NOT NULL,
category TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Populate with default mappings if empty
cursor.execute("SELECT COUNT(*) FROM bank_keywords")
if cursor.fetchone()[0] == 0:
default_mappings = [
# Chase Bank
("Chase", "EMPLOYER", "Income"),
("Chase", "DIRECT DEP", "Income"),
("Chase", "MORTGAGE", "Housing"),
("Chase", "RENT", "Housing"),
("Chase", "NETFLIX", "Entertainment"),
# Bank of America
("Bank of America", "DIR DEP", "Income"),
("Bank of America", "PAYROLL", "Income"),
("Bank of America", "AMZN", "Shopping"),
# Wells Fargo
("Wells Fargo", "SALARY", "Income"),
("Wells Fargo", "UBER", "Transportation"),
]
cursor.executemany("""
INSERT INTO bank_keywords (bank_name, keyword, category)
VALUES (?, ?, ?)
""", default_mappings)
conn.commit()
conn.close()
# --------------- HELPER CRUD FUNCTIONS ---------------
# Debts
def fetch_debts():
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT id, name, amount, rate, min_payment, category, due_date FROM debts")
rows = cursor.fetchall()
conn.close()
debts = []
for row in rows:
debts.append({
"id": row[0],
"name": row[1],
"amount": row[2],
"rate": row[3],
"min_payment": row[4],
"category": row[5],
"due_date": row[6]
})
return debts
def insert_debt(name, amount, rate, min_payment, category, due_date):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO debts (name, amount, rate, min_payment, category, due_date)
VALUES (?, ?, ?, ?, ?, ?)
""", (name, amount, rate, min_payment, category, due_date))
conn.commit()
conn.close()
# Income
def fetch_income():
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, source, amount, frequency FROM income")
rows = cursor.fetchall()
conn.close()
incomes = []
for row in rows:
incomes.append({
"id": row[0],
"Source": row[1],
"Amount": row[2],
"Frequency": row[3]
})
return incomes
def insert_income(source, amount, frequency):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO income (source, amount, frequency)
VALUES (?, ?, ?)
""", (source, amount, frequency))
conn.commit()
conn.close()
# Expenses
def fetch_expenses():
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT id, category, amount, is_recurring, start_date, end_date, frequency, single_date FROM expenses")
rows = cursor.fetchall()
conn.close()
expenses = []
for row in rows:
expenses.append({
"id": row[0],
"category": row[1],
"amount": row[2],
"is_recurring": bool(row[3]),
"start_date": row[4],
"end_date": row[5],
"frequency": row[6],
"single_date": row[7]
})
return expenses
def insert_expense(category, amount, is_recurring, start_date, end_date, frequency, single_date):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO expenses (category, amount, is_recurring, start_date, end_date, frequency, single_date)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (category, amount, int(is_recurring), start_date, end_date, frequency, single_date))
conn.commit()
conn.close()
# Goals
def fetch_goals():
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT id, name, category, target, date, current FROM goals")
rows = cursor.fetchall()
conn.close()
goals = []
for row in rows:
goals.append({
"id": row[0],
"name": row[1],
"category": row[2],
"target": row[3],
"date": row[4],
"current": row[5]
})
return goals
def insert_goal(name, category, target, date, current):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO goals (name, category, target, date, current)
VALUES (?, ?, ?, ?, ?)
""", (name, category, target, date, current))
conn.commit()
conn.close()
def update_goal_current(goal_id, new_current):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE goals
SET current = ?
WHERE id = ?
""", (new_current, goal_id))
conn.commit()
conn.close()
def delete_goal(goal_id):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM goals WHERE id = ?", (goal_id,))
conn.commit()
conn.close()
# Savings History
def fetch_savings_history():
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, date, amount FROM savings_history")
rows = cursor.fetchall()
conn.close()
history = []
for row in rows:
history.append({
"id": row[0],
"date": row[1],
"amount": row[2]
})
return history
def insert_savings_entry(date_str, amount):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO savings_history (date, amount)
VALUES (?, ?)
""", (date_str, amount))
conn.commit()
conn.close()
# Watchlist
def fetch_watchlist():
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT id, ticker, quantity FROM watchlist")
rows = cursor.fetchall()
conn.close()
watchlist = []
for row in rows:
watchlist.append({"id": row[0], "ticker": row[1], "quantity": row[2]})
return watchlist
def insert_watchlist_ticker(ticker, quantity=0):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO watchlist (ticker, quantity)
VALUES (?, ?)
""", (ticker, quantity))
conn.commit()
conn.close()
def delete_watchlist_ticker(ticker_id):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM watchlist WHERE id = ?", (ticker_id,))
conn.commit()
conn.close()
# Function to identify and group expense transactions
def identify_and_group_expenses(transactions):
"""Identify and group expense transactions based on description keywords.
Parameters:
transactions (list of dict): List of transaction records, each containing at least a 'description' key.
Returns:
dict: A dictionary where keys are expense categories and values are lists of transactions that belong to each category.
"""
# Define income keywords for identifying income-related transactions
income_keywords = ["direct deposit", "deposit", "refund", "rebate"]
# Define expense keywords for various categories
expense_keywords = {
"Food": ["restaurant", "cafe", "lunch", "breakfast", "dinner"],
"Transport": ["uber", "taxi", "bus", "train", "subway"],
"Utilities": ["electricity", "water", "gas", "internet", "phone"],
"Entertainment": ["movie", "concert", "game", "netflix"],
"Groceries": ["supermarket", "grocery", "market"]
}
# Initialize grouped dictionary with an Income category and expense categories
grouped = {"Income": []}
for category in expense_keywords:
grouped[category] = []
grouped["Other"] = []
for txn in transactions:
description = txn.get("description", "").lower()
# First, check if the transaction looks like income
if any(income_kw in description for income_kw in income_keywords):
grouped["Income"].append(txn)
continue
# Next, check for expense categories
assigned = False
for category, keywords in expense_keywords.items():
if any(keyword in description for keyword in keywords):
grouped[category].append(txn)
assigned = True
break
if not assigned:
grouped["Other"].append(txn)
return grouped
# -------------------------------------------------
# 2) Debt Payoff Calculation (unchanged)
# -------------------------------------------------
def calculate_payoff(debts, extra_payment, strategy="snowball"):
"""Calculate debt payoff timeline and returns detailed payment information.
'debts' is a list of dicts with keys: name, amount, rate, min_payment
"""
debts = [d.copy() for d in debts]
total_interest = 0
months = 0
monthly_balance = []
payment_schedule = []
milestones = []
# Get initial total debt
initial_total = sum(d["amount"] for d in debts)
# If there's no debt, return early
if initial_total == 0:
return {
"months": 0,
"total_interest": 0,
"monthly_balance": [0],
"payment_schedule": [],
"milestones": []
}
# Sort debts based on strategy
if strategy == "snowball":
debts.sort(key=lambda x: (x["amount"], -x["rate"]))
elif strategy == "avalanche":
debts.sort(key=lambda x: (-x["rate"], x["amount"]))
elif strategy == "hybrid":
debts.sort(key=lambda x: (-1 if x["rate"] > 20 else 1, x["amount"]))
while any(debt["amount"] > 0 for debt in debts):
months += 1
current_balance = 0
month_payments = []
remaining_extra = extra_payment
# Calculate interest and minimum payments first
for debt in debts:
if debt["amount"] <= 0:
continue
# Calculate monthly interest
monthly_rate = debt["rate"] / 100 / 12
interest = debt["amount"] * monthly_rate
debt["amount"] += interest
total_interest += interest
# Apply minimum payment
payment = min(debt["min_payment"], debt["amount"])
debt["amount"] -= payment
remaining_extra = remaining_extra - \
(payment - debt["min_payment"]
) if payment > debt["min_payment"] else remaining_extra
current_balance += debt["amount"]
# Apply extra payment to highest priority debt
for debt in debts:
if debt["amount"] <= 0 or remaining_extra <= 0:
continue
extra = min(remaining_extra, debt["amount"])
debt["amount"] -= extra
remaining_extra -= extra
# Record payment details (basic)
month_payments.append({
"name": debt["name"],
"payment": extra,
})
current_balance = sum(d["amount"] for d in debts if d["amount"] > 0)
monthly_balance.append(current_balance)
payment_schedule.append(month_payments)
# Calculate milestones only if initial_total is not zero
if initial_total > 0:
progress = ((initial_total - current_balance) /
initial_total) * 100
if progress >= 25 and not any(m["percentage"] == 25 for m in milestones):
milestones.append({"percentage": 25, "month": months})
if progress >= 50 and not any(m["percentage"] == 50 for m in milestones):
milestones.append({"percentage": 50, "month": months})
if progress >= 75 and not any(m["percentage"] == 75 for m in milestones):
milestones.append({"percentage": 75, "month": months})
return {
"months": months,
"total_interest": total_interest,
"monthly_balance": monthly_balance,
"payment_schedule": payment_schedule,
"milestones": milestones
}
# -------------------------------------------------
# 3) Initialize DB + Streamlit Config
# -------------------------------------------------
initialize_db() # <-- Make sure to call once at app start to create tables
st.set_page_config(
page_title="Warren - Personal Wealth Companion",
layout="wide",
page_icon="🤑"
)
# -------------------------------------------------
# Custom CSS / JS
# -------------------------------------------------
st.markdown(r"""
<style>
/* ... Your existing CSS ... */
</style>
""", unsafe_allow_html=True)
# -------------------------------------------------
# Helper to Save Data Locally (if you still want CSV)
# -------------------------------------------------
def save_data(data, file_path):
data.to_csv(file_path, index=False)
# -------------------------------------------------
# Sidebar Navigation State
# -------------------------------------------------
if "selected_panel" not in st.session_state:
st.session_state.selected_panel = "Welcome"
def change_panel(panel_name):
st.session_state.selected_panel = panel_name
# -------------------------------------------------
# Sidebar Layout
# -------------------------------------------------
st.sidebar.title("🤑 Warren")
st.sidebar.write("No subscriptions. No data hacks. Just results.")
st.sidebar.markdown("---")
st.sidebar.title("📖 Getting Started")
st.sidebar.button("🔑 Introduction", on_click=lambda: change_panel("Welcome"))
st.sidebar.button("📖 How It Works",
on_click=lambda: change_panel("How It Works"))
st.sidebar.markdown("---")
st.sidebar.button("📋 Overview", on_click=lambda: change_panel(
"Overview"), key="overview_button")
st.sidebar.markdown("---")
st.sidebar.title("🛠️ Financial Tools")
st.sidebar.button("💼 Income Sources",
on_click=lambda: change_panel("Income Sources"))
st.sidebar.button("💸 Expense Breakdown",
on_click=lambda: change_panel("Expense Breakdown"))
st.sidebar.button("📊 Cash Flow Summary",
on_click=lambda: change_panel("Cash Flow Summary"))
st.sidebar.button(
"💹 Investments", on_click=lambda: change_panel("Investments"))
st.sidebar.button("📉 Debt Reduction",
on_click=lambda: change_panel("Debt Reduction"))
st.sidebar.button("🗺️ Wealth Roadmap",
on_click=lambda: change_panel("Wealth Roadmap"))
st.sidebar.button("📝 Feedback", on_click=lambda: change_panel("Feedback"))
st.sidebar.button("🧪 Run Basic Tests",
on_click=lambda: change_panel("Basic Unit Tests"))
# -------------------------------------------------
# Sidebar Quote Section
# -------------------------------------------------
st.sidebar.markdown("---")
st.sidebar.title("💡 Wealth Wisdom")
quotes = [
"The stock market is filled with individuals who know the price of everything but the value of nothing. – Philip Fisher",
"It's not whether you're right or wrong that's important, but how much money you make when you're right and how much you lose when you're wrong. – George Soros",
"The goal of a successful investor is to make money consistently, not to be right all the time. – Benjamin Graham",
"Do not save what is left after spending, but spend what is left after saving. – Warren Buffett",
"The more your money works for you, the less you have to work for money. – Idowu Koyenikan",
"An investment in knowledge pays the best interest. – Benjamin Franklin",
"The stock market is designed to transfer money from the Active to the Patient. – Warren Buffett",
"The four most dangerous words in investing are: 'this time it's different.' – Sir John Templeton",
"In investing, what is comfortable is rarely profitable. – Robert Arnott",
"The individual investor should act consistently as an investor and not as a speculator. – Ben Graham",
"The secret to investing is to figure out the value of something – and then pay a lot less. – Joel Greenblatt",
"The best investment you can make is in yourself. – Warren Buffett",
"Risk comes from not knowing what you're doing. – Warren Buffett",
"The most important quality for an investor is temperament, not intellect. – Warren Buffett",
"Investing should be more like watching paint dry or watching grass grow. If you want excitement, take $800 and go to Las Vegas. – Paul Samuelson",
"The stock market is a device for transferring money from the impatient to the patient. – Warren Buffett",
"Price is what you pay. Value is what you get. – Warren Buffett",
"Know what you own, and know why you own it. – Peter Lynch",
"The biggest risk of all is not taking one. – Mellody Hobson",
"The only way to do great work is to love what you do. – Steve Jobs",
"The best way to predict the future is to create it. – Peter Drucker",
"The real measure of your wealth is how much you'd be worth if you lost all your money. – Anonymous",
"The goal of retirement is to live off your assets-not on them. – Frank Eberhart",
"The stock market is a giant distraction to the business of investing. – John Bogle",
"The best time to plant a tree was 20 years ago. The second best time is now. – Chinese Proverb",
"The intelligent investor is a realist who sells to optimists and buys from pessimists. – Benjamin Graham",
"In the short run, the market is a voting machine but in the long run, it is a weighing machine. – Benjamin Graham",
"The stock market is filled with individuals who know the price of everything, but the value of nothing. – Philip Fisher",
"The four most dangerous words in investing are: 'this time it's different.' – Sir John Templeton",
"The individual investor should act consistently as an investor and not as a speculator. – Ben Graham",
"The secret to investing is to figure out the value of something – and then pay a lot less. – Joel Greenblatt",
"The best investment you can make is in yourself. – Warren Buffett",
"Risk comes from not knowing what you're doing. – Warren Buffett",
"The most important quality for an investor is temperament, not intellect. – Warren Buffett",
"Investing should be more like watching paint dry or watching grass grow. If you want excitement, take $800 and go to Las Vegas. – Paul Samuelson",
"The stock market is a device for transferring money from the impatient to the patient. – Warren Buffett",
"Price is what you pay. Value is what you get. – Warren Buffett",
"Know what you own, and know why you own it. – Peter Lynch",
"The biggest risk of all is not taking one. – Mellody Hobson",
"The only way to do great work is to love what you do. – Steve Jobs",
"The best way to predict the future is to create it. – Peter Drucker",
"The real measure of your wealth is how much you'd be worth if you lost all your money. – Anonymous",
"The goal of retirement is to live off your assets-not on them. – Frank Eberhart",
"The stock market is a giant distraction to the business of investing. – John Bogle",
"The best time to plant a tree was 20 years ago. The second best time is now. – Chinese Proverb"
]
selected_quote = random.choice(quotes)
st.sidebar.markdown(f"> *{selected_quote}*")
# -------------------------------------------------
# Page Panels
# -------------------------------------------------
# Initialize session state variables for Overview panel
if "financial_data" not in st.session_state:
st.session_state.financial_data = {
"cash_on_hand": 5000.00,
"investments_value": 20000.00,
"total_debt": 10000.00,
"monthly_income": 4000.00,
"monthly_expenses": 3000.00,
"savings_goal": 1000.00,
"current_savings": 300.00,
"emergency_fund_target": 10000.00,
"emergency_fund_balance": 2500.00,
"upcoming_bills": [
{"name": "Credit Card Payment", "amount": 150.00,
"due_date": datetime.now() + timedelta(days=3)},
{"name": "Utility Bill", "amount": 120.00,
"due_date": datetime.now() + timedelta(days=7)},
],
"top_investments": [
{"name": "AAPL", "value": 5000.00, "change": 5.2},
{"name": "MSFT", "value": 4000.00, "change": 3.8},
{"name": "GOOGL", "value": 3000.00, "change": 2.7},
],
"portfolio_growth": 8.0, # Monthly portfolio growth percentage
"benchmark_growth": 6.0, # Monthly benchmark growth percentage
"spending_breakdown": {
"Housing": 1500.00,
"Transportation": 500.00,
"Food": 800.00,
"Utilities": 200.00,
"Entertainment": 300.00,
"Groceries": 600.00,
},
"income_expense_trends": {
"months": ["Jan", "Feb", "Mar", "Apr", "May", "Jun"],
"income": [4000.00, 4200.00, 3800.00, 4100.00, 4000.00, 4200.00],
"expenses": [3000.00, 3100.00, 3200.00, 3000.00, 3100.00, 3200.00],
}
}
# ----------------- Welcome -----------------
if 'selected_panel' not in st.session_state:
st.session_state.selected_panel = "Welcome"
# ----------------- Overview -----------------
if st.session_state.selected_panel == "Overview":
# Extract data from the 'data' dictionary or other sources
cash_on_hand = st.session_state.financial_data.get('cash_on_hand', 0)
total_debt = st.session_state.financial_data.get('total_debt', 0)
investments_value = st.session_state.financial_data.get(
'investments_value', 0)
monthly_income = st.session_state.financial_data.get('monthly_income', 0)
upcoming_bills = st.session_state.financial_data.get('upcoming_bills', [])
savings_rate = st.session_state.financial_data.get('savings_rate', 0)
# Start building the Overview Tab
st.title("🧾 Overview")
# Create three columns
col1, col2, col3 = st.columns(3)
# Column 1: Financial Summary and Key Metrics
with col1:
st.subheader("📊 Financial Summary")
financial_summary = [
("Cash on Hand", f"${cash_on_hand:,.2f}"),
("Total Debt", f"${total_debt:,.2f}"),
("Investments", f"${investments_value:,.2f}")
]
for label, value in financial_summary:
st.metric(label=label, value=value)
st.markdown("---") # Divider
st.subheader("📈 Key Metrics and Progress")
key_metrics = [
("Monthly Income", f"${monthly_income:,.2f}"),
("Savings Rate", f"{savings_rate:.2%}"),
("Debt-to-Income Ratio",
f"{(total_debt / max(monthly_income, 1)):.2%}")
]
for label, value in key_metrics:
st.metric(label=label, value=value)
# Column 2: Key Charts
with col2:
st.subheader("📊 Key Charts")
# Spending Breakdown Pie Chart
st.subheader("Spending Breakdown")
spending_data = st.session_state.financial_data.get(
'spending_breakdown', {})
if spending_data:
pie_chart_data = {
"Categories": list(spending_data.keys()),
"Values": list(spending_data.values())
}
fig = go.Figure(data=[go.Pie(
labels=pie_chart_data["Categories"], values=pie_chart_data["Values"], hole=.4)])
fig.update_layout(title="Spending Breakdown")
st.plotly_chart(fig, use_container_width=True)
st.markdown("---") # Divider
# Monthly Income vs. Expenses Bar Chart
st.subheader("Income vs. Expenses")
income_expense_data = st.session_state.financial_data.get(
'income_expense_trends', {})
if income_expense_data:
fig = go.Figure()
fig.add_trace(go.Bar(
x=income_expense_data['months'], y=income_expense_data['income'], name='Income'))
fig.add_trace(go.Bar(
x=income_expense_data['months'], y=income_expense_data['expenses'], name='Expenses'))
fig.update_layout(barmode='group', title="Monthly Income vs. Expenses",
xaxis_title="Month", yaxis_title="Amount ($)")
st.plotly_chart(fig, use_container_width=True)
# Column 3: Upcoming Events and Alerts
with col3:
st.subheader("📅 Upcoming Alerts and Events")
upcoming_events = st.session_state.financial_data.get(
'upcoming_events', [])
if upcoming_events:
for event in upcoming_events:
st.info(event)
else:
st.success("No upcoming events! You're all set.")
# Example of adding specific events
paydays = st.session_state.financial_data.get('paydays', [])
recurring_bills = st.session_state.financial_data.get(
'recurring_bills', [])
tax_filings = st.session_state.financial_data.get('tax_filings', [])
# Display paydays
if paydays:
st.subheader("💰 Paydays")
for payday in paydays:
st.info(f"Payday on {payday}")
# Display recurring bills
if recurring_bills:
st.subheader("💸 Recurring Bills")
for bill in recurring_bills:
st.info(f"Bill due on {bill}")
# Display tax filings
if tax_filings:
st.subheader("📝 Tax Filings")
for filing in tax_filings:
st.info(f"Tax filing due on {filing}")
# ----------------- Welcome -----------------
if st.session_state.selected_panel == "Welcome":
st.markdown("<h1 style='font-size:32px;'>🤑 Welcome to Warren!</h1>",
unsafe_allow_html=True)
st.write("Warren is a privacy-focused personal finance dashboard inspired by Warren Buffett's principles.")
st.markdown("### 🔑 What is Warren?")
st.markdown("""
* A personal wealth building companion.
* Designed to give you **full control over your financial data**.
* No cloud storage or external data syncing – **100% local data privacy**.
""")
st.markdown("### 🛡️ Privacy-Centric Philosophy")
st.markdown("""
* **Offline Data Management:** All financial data is stored and processed locally on your device.
* **No Third-Party Integration:** No automatic bank API connections to ensure maximum data security.
* **Transparency First:** You control the data visibility and sharing.
""")
st.markdown("### 📈 Inspired by Warren Buffett's Investment Principles")
st.markdown("""
* **Data-Driven Decisions:** Clear, objective insights from your financial data.
* **Long-Term Wealth Building:** Emphasis on **sustainable** financial growth.
* **Simplicity & Transparency:** Clear visuals and reporting for easier decision-making.
""")
st.markdown("""
Use the sidebar to explore features like **Expense Breakdown**, **Cash Flow Summary**, and **Wealth Roadmap**.
""")
# ----------------- How It Works -----------------
if st.session_state.selected_panel == "How It Works":
st.markdown("<h1 style='font-size:32px;'>📖 How It Works</h1>",
unsafe_allow_html=True)
st.markdown(
"""
<p><strong>Warren</strong> is designed to help you manage your finances with clarity and control. Follow these steps to get started:</p>
<h3>Step 1: Set Up Your Data</h3>
📤 <strong>Import Your Data:</strong> Upload CSV or Excel files containing your income, expenses, and budget data below.<br>
📝 <strong>Manual Entry:</strong> Alternatively, you can manually enter data in the <strong>Income Sources</strong> and <strong>Expense Breakdown</strong> sections.<br>
🛡️ <strong>Data Privacy:</strong> All data is stored locally on your device, with no external services used for syncing.<br>
""",
unsafe_allow_html=True
)
# CSV/Excel upload
uploaded_file = st.file_uploader(
"📂 Upload your CSV or Excel file here:", type=["csv", "xlsx"])
if uploaded_file:
st.success("✅ File uploaded successfully!")
# You might parse the file here and insert rows into the DB:
# df = pd.read_csv(uploaded_file) or pd.read_excel(uploaded_file)
# for row in df.itertuples():
# insert_income(...), etc.
st.markdown(
"""
<h3>Step 2: Explore Your Finances</h3>
📊 <strong>Income Sources:</strong> Track all your streams of income.<br>
💸 <strong>Expense Breakdown:</strong> Record and categorize your expenses.<br>
📈 <strong>Cash Flow Summary:</strong> Visualize your financial health with charts and summaries.<br>
📅 <strong>Wealth Roadmap:</strong> Set goals and monitor your progress toward financial milestones.<br>
<h3>Step 3: Stay in Control</h3>
🧩 <strong>Export Your Data:</strong> Download your financial records for backup or external analysis.<br>
🔒 <strong>Privacy Matters:</strong> No data leaves your device. <strong>You own your financial insights.</strong><br>
""",
unsafe_allow_html=True
)
# ----------------- Income Sources -----------------
if st.session_state.selected_panel == "Income Sources":
st.markdown("<h1 style='font-size:32px;'>💰 Income Sources</h1>",
unsafe_allow_html=True)
col1, col2 = st.columns(2)
with col1:
st.subheader("Add a New Income Source")
with st.form("add_income_form_tab1"):
source = st.text_input("Income Source Name:")
amount = st.number_input(
"Income Amount ($)", min_value=0.01, step=0.01)
frequency = st.selectbox(
"Frequency", ["Weekly", "Monthly", "Quarterly", "Annually"])
submitted = st.form_submit_button("Add Income Source")
if submitted and source:
insert_income(source, amount, frequency)
st.success(
f"Added income source: {source} (${amount} - {frequency})")
st.rerun()
with col2:
st.subheader("Income Breakdown")
# Fetch from DB
incomes = fetch_income()
if incomes:
# Convert to DataFrame for chart
df_income = pd.DataFrame(incomes)
fig_income = px.pie(df_income, names="Source", values="Amount")
st.plotly_chart(fig_income)
total_income = df_income["Amount"].sum()
st.write(f"**Total Income Sources:** {len(incomes)}")
st.write(
f"**Projected Annual Income (rough):** ${total_income * 12:,.2f}")
else:
st.info("No income sources found. Add one above.")
st.subheader("Income Insights")
# Optional date range or preset controls
date_range = st.date_input("Select Date Range:", [])
time_preset = st.selectbox("Time Preset:", ["Daily", "Weekly", "Monthly"])
# ----------------- Expense Breakdown -----------------
if st.session_state.selected_panel == "Expense Breakdown":
st.markdown("<h1 style='font-size:32px;'>💸 Expense Breakdown</h1>",
unsafe_allow_html=True)
col1, col2 = st.columns(2)
with col1:
st.subheader("Add an Expense")
# Create container for dynamic fields
with st.form("add_expense_form_tab2"):
category = st.text_input("Expense Category:")
amount = st.number_input(
"Expense Amount ($)", min_value=0.01, step=0.01)
is_recurring = st.checkbox("Recurring Expense")
start_date = None
end_date = None
frequency = None
single_date = None
if is_recurring:
col_start, col_end = st.columns(2)
with col_start:
start_date = st.date_input("Start Date")
with col_end:
end_date = st.date_input("End Date")
frequency = st.selectbox(
"Frequency", ["Daily", "Weekly", "Bi-weekly", "Monthly", "Quarterly", "Annually"])
else:
single_date = st.date_input("Expense Date")
submitted = st.form_submit_button("Add Expense")
if submitted and category:
insert_expense(
category,
amount,
is_recurring,
str(start_date) if start_date else "",
str(end_date) if end_date else "",
frequency if frequency else "",
str(single_date) if single_date else ""
)
st.success(f"Added expense to category: {category} (${amount})")
st.rerun()
with col2:
st.subheader("Expense Breakdown Chart")
expenses = fetch_expenses()
if expenses:
df_exp = pd.DataFrame(expenses)
# Basic grouping by category
grouped = df_exp.groupby("category")["amount"].sum().reset_index()
fig_expense = px.pie(grouped, names="category", values="amount")
st.plotly_chart(fig_expense)
else:
st.info("No expenses found. Add one above.")
col3, col4 = st.columns(2)
with col3:
st.subheader("Insights & Filters")
date_range = st.date_input("Select Date Range:", [])
time_preset = st.selectbox(
"Time Preset:", ["Daily", "Weekly", "Monthly"])
st.write("Use the filters to customize your insights.")
with col4:
# Just an example summary
st.markdown(
"""
<div style="text-align: center;">
<h3>Expense Summary</h3>
</div>
""", unsafe_allow_html=True
)
if expenses:
total_exp = sum(e["amount"] for e in expenses)
st.write(f"**Total Recorded Expenses:** {len(expenses)}")
st.write(
f"**Projected Annual Expenses (rough):** ${total_exp * 12:,.2f}")
else:
st.write("No expense data yet.")
# Add bank statement processing section
with st.expander("Import Bank Statement"):
bank_options = ["Chase", "Bank of America",
"Wells Fargo", "Citibank", "Other"]
selected_bank = st.selectbox("Select Your Bank", bank_options)
uploaded_file = st.file_uploader(
"Upload Bank Statement (PDF)", type="pdf")
if uploaded_file and st.button("Process Statement"):
with st.spinner("Processing bank statement..."):
transactions = process_bank_statement(
uploaded_file, selected_bank)
if transactions:
st.write("### Detected Transactions")
df_trans = pd.DataFrame(transactions)
# Allow category editing
edited_df = st.data_editor(
df_trans,
column_config={
"category": st.column_config.SelectboxColumn(
"Category",
options=[
"Housing", "Transportation", "Food", "Utilities",
"Insurance", "Healthcare", "Savings", "Personal",
"Entertainment", "Shopping", "Income", "Other"
]
)
}
)
if st.button("Save Transactions"):
for _, row in edited_df.iterrows():
if row["type"] == "expense":
insert_expense(
category=row["category"],
amount=abs(float(row["amount"])),
is_recurring=False,
start_date=None,
end_date=None,
frequency=None,
single_date=row["date"]
)
elif row["type"] == "income":
insert_income(
source=row["description"],
amount=float(row["amount"]),
frequency="One-time"
)
st.success("Transactions imported successfully!")
st.rerun()
# Add custom categorization rules
with st.expander("Manage Transaction Categories"):
st.write("### Custom Category Rules")
# Add new rule
col1, col2, col3 = st.columns(3)
with col1:
rule_bank = st.selectbox("Bank", bank_options, key="new_rule_bank")
with col2:
rule_keyword = st.text_input("Transaction Keyword")
with col3:
rule_category = st.selectbox("Category", [
"Housing", "Transportation", "Food", "Utilities",
"Insurance", "Healthcare", "Savings", "Personal",
"Entertainment", "Shopping", "Income", "Other"
])
if st.button("Add Rule"):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO bank_keywords (bank_name, keyword, category)
VALUES (?, ?, ?)
""", (rule_bank, rule_keyword, rule_category))
conn.commit()
conn.close()
st.success("Rule added successfully!")
st.rerun()
# Show existing rules
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT id, bank_name, keyword, category FROM bank_keywords")
rules = cursor.fetchall()
conn.close()
if rules:
rules_df = pd.DataFrame(
rules, columns=["id", "Bank", "Keyword", "Category"])
edited_rules = st.data_editor(rules_df)
if st.button("Update Rules"):
# Update rules in database
conn = get_connection()
cursor = conn.cursor()
for _, row in edited_rules.iterrows():
cursor.execute("""
UPDATE bank_keywords
SET bank_name = ?, keyword = ?, category = ?
WHERE id = ?
""", (row["Bank"], row["Keyword"], row["Category"], row["id"]))
conn.commit()
conn.close()
st.success("Rules updated successfully!")
# ----------------- Cash Flow Summary -----------------
if st.session_state.selected_panel == "Cash Flow Summary":
st.markdown("<h1 style='font-size:32px;'>📊 Cash Flow Summary</h1>",
unsafe_allow_html=True)
st.subheader("Customize Your Insights")
date_range = st.date_input("Select Date Range:", [])
time_interval = st.radio("Select Time Interval:",
("Daily", "Weekly", "Monthly"))
# Quick sample data for demonstration
data = {
"Month": ["Jan", "Feb", "Mar"],
"Income": [5000, 5200, 4800],
"Expenses": [3000, 3100, 3200],
}
df = pd.DataFrame(data)
df["Net Cash Flow"] = df["Income"] - df["Expenses"]
col1, col2, col3 = st.columns(3)
with col1:
st.subheader("Income vs. Expense Breakdown")
pie_data = pd.DataFrame({"Type": ["Income", "Expenses"], "Amount": [
df["Income"].sum(), df["Expenses"].sum()]})
fig_pie = px.pie(pie_data, names="Type", values="Amount", height=300)
st.plotly_chart(fig_pie, use_container_width=True)
with col2:
st.subheader("Expense Categories Breakdown")
fig_bar = px.bar(df, x="Month", y="Expenses",
text="Expenses", height=300)
st.plotly_chart(fig_bar, use_container_width=True)
with col3:
st.subheader("Monthly Net Cash Flow")
fig_bar_net = px.bar(
df, x="Month", y=["Income", "Expenses", "Net Cash Flow"], barmode="group", height=300)
st.plotly_chart(fig_bar_net, use_container_width=True)
# ----------------- Investments (Includes Watchlist) -----------------
def fetch_data(tickers, start_date, end_date, time_increment):
data = {}
# Instead of using specific dates, use relative time periods
# This is more reliable with yfinance
if (end_date - start_date).days <= 30:
period = "1mo"
elif (end_date - start_date).days <= 90:
period = "3mo"
elif (end_date - start_date).days <= 180:
period = "6mo"
elif (end_date - start_date).days <= 365:
period = "1y"
else:
period = "2y"
for ticker in tickers:
try:
stock = yf.Ticker(ticker)
# First verify the ticker is valid
try:
info = stock.info
except Exception as e:
st.warning(f"Could not verify ticker {ticker}: {str(e)}")
continue
# Get historical data using period instead of specific dates
hist = stock.history(period=period, interval=time_increment)
if not hist.empty:
# Add calculated fields needed for plotting
hist = hist.copy()
hist['Daily_Return'] = hist['Close'].pct_change()
hist['SMA_50'] = hist['Close'].rolling(window=50).mean()
hist['ROI'] = (hist['Close'] / hist['Close'].iloc[0] - 1) * 100
data[ticker] = hist
else:
st.warning(f"No data available for {ticker}")
except Exception as e:
st.warning(f"Error fetching data for {ticker}: {str(e)}")
continue
return data
def calculate_performance_metrics(data, ticker):
df = data[ticker]
latest = df.iloc[-1]
earliest = df.iloc[0]
metrics = {
'Daily_Change': float((latest['Close'] / df.iloc[-2]['Close'] - 1) * 100),
'Total_ROI': float((latest['Close'] / earliest['Close'] - 1) * 100),
'Highest_Price': float(df['High'].max()),
'Lowest_Price': float(df['Low'].min()),
'Current_Price': float(latest['Close'])
}
return metrics
def plot_combined_data(data, title, show_sma=False, benchmark_compare=False):
if not data:
return None
# Create figure
fig = go.Figure()
# Add traces for each ticker
for ticker, df in data.items():
if df is None or df.empty:
continue
try:
# Add price line
fig.add_trace(
go.Scatter(
x=df.index,
y=df['Close'],
name=ticker,
mode='lines'
)
)
# Add SMA if requested
if show_sma and 'SMA_50' in df.columns:
fig.add_trace(
go.Scatter(
x=df.index,
y=df['SMA_50'],
name=f"{ticker} 50-day SMA",
line=dict(dash='dash'),
opacity=0.7
)
)
except Exception as e:
st.warning(f"Error plotting {ticker}: {str(e)}")
continue
# Update layout
fig.update_layout(
title=title,
xaxis_title="Date",
yaxis_title="Price",
hovermode='x unified',
showlegend=True,
template="plotly_dark",
height=600,
margin=dict(t=30, b=30) # Reduce margins
)
# Add range slider
fig.update_xaxes(rangeslider_visible=True)
return fig
def plot_individual_data(data, ticker, show_sma=False):
if ticker not in data or data[ticker] is None or data[ticker].empty:
return None
df = data[ticker]
# Create figure
fig = go.Figure()
try:
# Add price line
fig.add_trace(
go.Scatter(
x=df.index,
y=df['Close'],
name=ticker,
mode='lines'
)
)
# Add SMA if requested
if show_sma and 'SMA_50' in df.columns:
fig.add_trace(
go.Scatter(
x=df.index,
y=df['SMA_50'],
name="50-day SMA",
line=dict(dash='dash'),
opacity=0.7
)
)
# Update layout
fig.update_layout(
title=f"{ticker} Price History",
xaxis_title="Date",
yaxis_title="Price",
hovermode='x unified',
showlegend=True,
template="plotly_dark",
height=500,
margin=dict(t=30, b=30) # Reduce margins
)
# Add range slider but make it smaller
fig.update_xaxes(rangeslider=dict(visible=True, thickness=0.05))
return fig
except Exception as e:
st.warning(f"Error creating plot for {ticker}: {str(e)}")
return None
if st.session_state.selected_panel == "Investments":
st.markdown("<h1 style='font-size:32px;'>💹 Investments</h1>",
unsafe_allow_html=True)
# Initialize session state variables if they don't exist
if 'watchlist' not in st.session_state:
st.session_state.watchlist = []
# Watchlist from DB
watchlist_entries = fetch_watchlist()
watchlist_tickers = [w["ticker"] for w in watchlist_entries]
with st.expander("Manage Watchlist", expanded=False):
col1, col2 = st.columns([2, 1])
with col1:
# Show current watchlist
if watchlist_tickers:
st.write("Current Watchlist:")
for ticker in watchlist_tickers:
st.code(ticker, language=None)
else:
st.info("Your watchlist is empty. Add tickers below.")
with col2:
# Add to watchlist
new_ticker = st.text_input(
"Add Ticker", key="new_ticker").strip().upper()
new_quantity = st.number_input(
"Quantity", min_value=0, step=1, key="new_quantity")
if st.button("Add to Watchlist"):
if new_ticker:
if new_ticker not in watchlist_tickers:
insert_watchlist_ticker(new_ticker, new_quantity)
st.success(f"Added {new_ticker} to watchlist!")
st.rerun()
else:
st.warning(
f"{new_ticker} is already in your watchlist.")
else:
st.warning("Please enter a ticker symbol.")
# Remove from watchlist
if watchlist_tickers:
ticker_to_remove = st.selectbox(
"Remove Ticker", [""] + watchlist_tickers, key="remove_ticker")
if st.button("Remove from Watchlist"):
if ticker_to_remove:
delete_watchlist_ticker(
watchlist_entries[watchlist_tickers.index(ticker_to_remove)]["id"])
st.success(
f"Removed {ticker_to_remove} from watchlist!")
st.rerun()
else:
st.warning("Please select a ticker to remove.")
st.write("") # Add spacing
# Date range selection with elegant validation
current_market_date = pd.Timestamp(
'2024-01-29') # Using last known market date
max_lookback = current_market_date - pd.DateOffset(years=5)
# Initialize default dates if not in session state
if 'investment_start_date' not in st.session_state:
st.session_state.investment_start_date = current_market_date - \
pd.DateOffset(months=6)
if 'investment_end_date' not in st.session_state:
st.session_state.investment_end_date = current_market_date
# Create three columns for date inputs and time interval
col1, col2, col3 = st.columns(3)
with col1:
start_date = st.date_input(
"Start Date",
value=st.session_state.investment_start_date,
min_value=max_lookback.date(),
max_value=current_market_date.date(),
help="Select a start date (up to 5 years ago)"
)
with col2:
end_date = st.date_input(
"End Date",
value=st.session_state.investment_end_date,
min_value=start_date,
max_value=current_market_date.date(),
help="Select an end date (up to current market date)"
)
with col3:
time_increment = st.selectbox(
"Time Interval",
options=["1d", "1wk", "1mo"],
help="Select the time interval for data points"
)
# Update session state
st.session_state.investment_start_date = pd.Timestamp(start_date)
st.session_state.investment_end_date = pd.Timestamp(end_date)
# Show date range info if needed
date_range_days = (end_date - start_date).days
if date_range_days > 365 * 2:
st.info(f"📅 Viewing {date_range_days // 7} weeks of data")
elif date_range_days > 90:
st.info(f"📅 Viewing {date_range_days // 30} months and {date_range_days % 30} days of data")
# Investment type selection using tabs
investment_types = ["Stocks", "Forex", "Crypto",
"Commodities", "Markets", "ETFs/Mutual Funds"]
investment_tabs = st.tabs(investment_types)
# Controls in a container above the tabs
with st.container():
col1, col2, col3 = st.columns([1, 1, 2])
with col1:
show_sma = st.checkbox(
"Show 50-day MA", help="Display the 50-day simple moving average on the charts")
with col2:
compare_benchmark = st.checkbox(
"Compare S&P 500", help="Add S&P 500 benchmark comparison")
# Define all ticker groups
ticker_groups = {
"Stocks": ["AAPL", "MSFT", "AMZN", "GOOGL", "NVDA", "TSLA", "META"],
"Forex": ["EURUSD=X", "USDJPY=X", "GBPUSD=X", "AUDUSD=X", "USDCAD=X", "NZDUSD=X", "USDCHF=X"],
"Crypto": ["BTC-USD", "ETH-USD", "USDT-USD", "BNB-USD", "XRP-USD", "ADA-USD", "DOGE-USD"],
"Commodities": ["GLD", "SLV", "USO", "UNG", "PPLT", "COPX", "GDX"],
"Markets": ["^GSPC", "^DJI", "^IXIC", "^RUT", "^TNX", "^FVX", "^TYX"],
"ETFs/Mutual Funds": ["SPY", "VTI", "VOO", "QQQ", "ARKK", "IWM", "XLF"]
}
# Process each investment type in its tab
for idx, investment_type in enumerate(investment_types):
with investment_tabs[idx]:
# Get default tickers for current category
default_tickers = ticker_groups[investment_type]
# Filter watchlist to only include tickers from all categories
watchlist_tickers = [t for t in st.session_state.watchlist if any(
t in tickers for tickers in ticker_groups.values())]
current_watchlist = [
t for t in watchlist_tickers if t in default_tickers]
# Combine category tickers with watchlist
user_tickers = list(set(default_tickers + current_watchlist))
if not user_tickers:
st.info(f"No tickers to display in {investment_type}.")
continue
with st.spinner(f"Loading {investment_type} data..."):
data = fetch_data(user_tickers, start_date,
end_date, time_increment)
if not data:
st.warning(f"No data available for any specified period.")
continue
# Get tickers with valid data
valid_tickers = [ticker for ticker, df in data.items(
) if df is not None and not df.empty]
if not valid_tickers:
st.warning("No data available for any specified period.")
continue
# Performance Summary
st.subheader("Performance Summary")
# Calculate number of columns needed (max 4 per row)
num_tickers = len(valid_tickers)
tickers_per_row = min(4, num_tickers)
num_rows = (num_tickers + tickers_per_row - 1) // tickers_per_row
# Create rows of metrics
for row in range(num_rows):
start_idx = row * tickers_per_row
end_idx = min(start_idx + tickers_per_row, num_tickers)
row_tickers = valid_tickers[start_idx:end_idx]
# Create columns for this row
cols = st.columns(tickers_per_row)
for idx, ticker in enumerate(row_tickers):
try:
df = data[ticker]
latest = df.iloc[-1]
earliest = df.iloc[0]
metrics = {
'Daily_Change': float((latest['Close'] / df.iloc[-2]['Close'] - 1) * 100),
'Total_ROI': float((latest['Close'] / earliest['Close'] - 1) * 100),
'Current_Price': float(latest['Close'])
}
with cols[idx]:
st.metric(
label=ticker,
value=f"${metrics['Current_Price']:.2f}",
delta=f"{metrics['Daily_Change']:+.2f}%"
)
st.caption(f"Total Return: {total_return}")
except Exception as e:
with cols[idx]:
st.warning(f"Error loading {ticker}")
# Add some spacing
st.write("")
# Create plot tabs
plot_tabs = st.tabs(["Combined View"] + valid_tickers)
# Combined plot
with plot_tabs[0]:
try:
if compare_benchmark and investment_type != "Markets":
data["^GSPC"] = fetch_data(
["^GSPC"], start_date, end_date, time_increment)["^GSPC"]
fig = plot_combined_data(
data, f"{investment_type} Overview", show_sma, compare_benchmark)
if fig:
st.plotly_chart(fig, use_container_width=True)
else:
st.warning("Could not create combined plot.")
except Exception as e:
st.error(f"Error creating combined view: {str(e)}")
# Individual plots
for i, ticker in enumerate(valid_tickers, 1):
with plot_tabs[i]:
try:
fig = plot_individual_data(
data, ticker, show_sma)
if fig:
# Main plot
st.plotly_chart(fig, use_container_width=True)
# Statistics in a cleaner layout
df = data[ticker]
col1, col2 = st.columns(2)
with col1:
stats_cols = st.columns(2)
with stats_cols[0]:
st.metric(
"High", f"${df['High'].max():.2f}")
with stats_cols[1]:
st.metric("Low", f"${df['Low'].min():.2f}")
with col2:
stats_cols = st.columns(2)
with stats_cols[0]:
st.metric("Avg Volume", f"{avg_volume}")
st.metric("Avg Volume", f"{df['Volume'].mean():,.0f}")
with stats_cols[1]:
volatility = df['Close'].pct_change(
).std() * 100
st.metric("Volatility", f"{volatility:.2f}%")
else:
st.warning(f"Could not create plot for {ticker}")
except Exception as e:
st.error(f"Error plotting {ticker}: {str(e)}")
# Add spacing between categories
st.write("---")
# ----------------- Debt Reduction -----------------
if st.session_state.selected_panel == "Debt Reduction":
st.markdown("<h1 style='font-size:32px;'>📉 Debt Reduction</h1>",
unsafe_allow_html=True)
debt_col1, debt_col2, debt_col3 = st.columns(3)
# Column 1: Debt Input & Summary
with debt_col1:
st.subheader("💳 Add Debt")
with st.expander("Add New Debt", expanded=True):
debt_name = st.text_input("Debt Name")
debt_amount = st.number_input(
"Amount ($)", min_value=0.0, step=100.0)
debt_min_payment = st.number_input(
"Minimum Monthly Payment ($)", min_value=0.0, step=10.0)
debt_rate = st.number_input(
"Interest Rate (%)", min_value=0.0, max_value=100.0, step=0.1)
debt_category = st.selectbox("Debt Category", [
"Credit Card", "Student Loan", "Mortgage", "Personal Loan", "Auto Loan", "Other"])
debt_due_date = st.date_input("Payment Due Date")
if st.button("Add Debt"):
if debt_name and debt_amount > 0 and debt_min_payment > 0:
insert_debt(debt_name, debt_amount, debt_rate,
debt_min_payment, debt_category, str(debt_due_date))
st.success(f"Added {debt_name} to your debt list!")
st.rerun()
else:
st.error("Please fill in all required fields")
# Show summary from DB
debts = fetch_debts()
if debts:
st.subheader("Debt Summary")
total_debt = sum(d["amount"] for d in debts)
total_min_monthly = sum(d["min_payment"] for d in debts)
st.metric("Total Debt", f"${total_debt:,.2f}")
st.metric("Monthly Minimum", f"${total_min_monthly:,.2f}")
else:
st.info("No debts recorded yet.")
# Column 2: Strategy Explanation & Comparison
with debt_col2:
st.subheader("📊 Payment Strategies")
plan_type = st.selectbox(
"Choose Your Debt Management Strategy",
[
"I want to pay off small debts first (Snowball Method)",
"I want to minimize interest payments (Avalanche Method)",
"I want a balanced approach (Hybrid Method)",
"I'm not sure - help me choose"
],
index=3
)
if plan_type == "I'm not sure - help me choose":
st.info("Let's figure out the best strategy for you!")
motivation_type = st.radio("What motivates you more?", [
"Quick wins and visible progress", "Saving money on interest", "A balance of both"])
financial_discipline = st.radio("How would you describe your financial discipline?", [
"I need to see progress to stay motivated", "I can stick to a long-term plan", "Somewhere in between"])
if motivation_type == "Quick wins and visible progress" or financial_discipline == "I need to see progress to stay motivated":
st.success("💡 Snowball Method might work best for you!")
elif motivation_type == "Saving money on interest" and financial_discipline == "I can stick to a long-term plan":
st.success("💡 Avalanche Method might work best for you!")
else:
st.success("💡 Hybrid Method might be best for you!")
# If we have debts, we can compare strategies
if debts:
extra_payment = st.number_input(
"Additional Monthly Payment ($)", min_value=0.0, step=10.0)
# Compare Snowball vs Avalanche
snowball_result = calculate_payoff(
debts, extra_payment, "snowball")
avalanche_result = calculate_payoff(
debts, extra_payment, "avalanche")
strategy_cols = st.columns(2)
with strategy_cols[0]:
st.write("**🏂 Debt Snowball**")
st.write(
f"Time to pay off: {snowball_result['months']} months")
st.write(
f"Total interest: ${snowball_result['total_interest']:,.2f}")
with strategy_cols[1]:
st.write("**🏔️ Debt Avalanche**")
st.write(
f"Time to pay off: {avalanche_result['months']} months")
st.write(
f"Total interest: ${avalanche_result['total_interest']:,.2f}")
interest_savings = snowball_result['total_interest'] - \
avalanche_result['total_interest']
time_savings = snowball_result['months'] - \
avalanche_result['months']
if interest_savings > 0:
st.success(
f"Using Avalanche saves you ${interest_savings:,.2f} in interest!")
if time_savings > 0:
st.success(
f"You'll also finish {time_savings} months sooner!")
# Plot payoff progress
fig = go.Figure()
fig.add_trace(go.Scatter(
x=list(range(len(snowball_result['monthly_balance']))),
y=snowball_result['monthly_balance'],
name="Snowball Method",
line=dict(color="blue")
))
fig.add_trace(go.Scatter(
x=list(range(len(avalanche_result['monthly_balance']))),
y=avalanche_result['monthly_balance'],
name="Avalanche Method",
line=dict(color="red")
))
fig.update_layout(
title="Debt Balance Over Time",
xaxis_title="Months",
yaxis_title="Remaining Balance ($)",
hovermode="x unified"
)
st.plotly_chart(fig, use_container_width=True)
# Column 3: What-If Analysis
with debt_col3:
st.subheader("🔮 What-If Analysis")
# Demo quick-add
with st.expander("➕ Quick Add Debt"):
if st.button("Add Credit Card ($5,000, 18%)"):
insert_debt("Credit Card", 5000, 18, 150, "Credit Card", "")
st.rerun()
if st.button("Add Student Loan ($15,000, 5%)"):
insert_debt("Student Loan", 15000, 5, 300, "Student Loan", "")
st.rerun()
if st.button("Add Auto Loan ($10,000, 7%)"):
insert_debt("Auto Loan", 10000, 7, 250, "Auto Loan", "")
st.rerun()
if not debts:
st.info("Add debts first to see the What-If Analysis.")
else:
extra_payment = st.slider(
"Additional Monthly Payment ($)", min_value=0.0, max_value=1000.0, step=50.0)
result = calculate_payoff(
debts, extra_payment, strategy="snowball")
st.metric("Debt-Free Timeline", f"{result['months']} months")
st.metric("Total Interest Paid",
f"${result['total_interest']:,.2f}")
fig_whatif = go.Figure()
fig_whatif.add_trace(go.Scatter(
y=result["monthly_balance"],
x=list(range(1, len(result["monthly_balance"]) + 1)),
mode="lines",
name="Debt Balance",
line=dict(color="green")
))
fig_whatif.update_layout(
title="Debt Balance Over Time",
xaxis_title="Months",
yaxis_title="Remaining Balance ($)",
hovermode="x unified"
)
st.plotly_chart(fig_whatif, use_container_width=True)
# ----------------- Wealth Roadmap -----------------
if st.session_state.selected_panel == "Wealth Roadmap":
st.markdown("<h1 style='font-size:32px;'>🗺️ Wealth Roadmap</h1>",
unsafe_allow_html=True)
col1, col2, col3 = st.columns(3)
# Savings Plan (col1)
with col1:
st.subheader("💰 Savings Plan")
savings_goal = st.number_input(
"Savings Goal ($)", min_value=0.01, step=0.01)
freq_options = {"Weekly": 52, "Bi-weekly": 26, "Monthly": 12}
contribution_freq = st.selectbox(
"Contribution Frequency",
list(freq_options.keys())
)
contribution_amount = st.number_input(
f"{contribution_freq} Contribution ($)",
min_value=0.01,
step=0.01
)
# Calculate projections
annual_contribution = contribution_amount * \
freq_options[contribution_freq]
monthly_contribution = annual_contribution / 12
savings_history = fetch_savings_history()
current_savings = sum(entry["amount"] for entry in savings_history)
remaining_amount = savings_goal - current_savings
months_required = 0
if monthly_contribution > 0:
months_required = ceil(
remaining_amount / monthly_contribution) if remaining_amount > 0 else 0
if months_required > 0:
completion_date = datetime.now() + relativedelta(months=months_required)
st.write(
f"Projected Completion: {completion_date.strftime('%B %Y')}")
# Build projection chart
months_list = range(months_required + 1)
projected_savings = [current_savings +
(monthly_contribution * i) for i in months_list]
fig = go.Figure()
fig.add_trace(go.Scatter(
x=list(months_list),
y=projected_savings,
name='Projected Savings',
line=dict(color='blue', dash='dash')
))
fig.add_trace(go.Scatter(
x=[0, months_required],
y=[savings_goal, savings_goal],
name='Savings Goal',
line=dict(color='green', dash='dot')
))
if savings_history:
actual_months = range(len(savings_history))
actual_savings_list = np.cumsum(
[entry["amount"] for entry in savings_history])
fig.add_trace(go.Scatter(
x=list(actual_months),
y=actual_savings_list,
name='Actual Savings',
line=dict(color='red'),
mode='lines+markers'
))
fig.update_layout(
title='Savings Projection vs. Actual',
xaxis_title='Months',
yaxis_title='Amount ($)',
height=300
)
st.plotly_chart(fig, use_container_width=True)
# Financial Goals (col2)
with col2:
st.subheader("🎯 Financial Goals")
goals = fetch_goals()
with st.expander("Add New Goal", expanded=(len(goals) == 0)):
goal_name = st.text_input("Goal Name")
goal_category = st.selectbox("Category", [
"Savings", "Investment", "Debt Repayment", "Emergency Fund", "Education", "Custom"])
goal_amount = st.number_input("Target Amount ($)", min_value=0.01)
goal_date = st.date_input("Target Date")
current_amount = st.number_input(
"Current Amount ($)", min_value=0.0)
if st.button("Add Goal"):
if goal_name and goal_amount > 0:
insert_goal(goal_name, goal_category,
goal_amount, str(goal_date), current_amount)
st.success("Goal added successfully!")
st.rerun()
else:
st.error("Please fill in all required fields")
if goals:
st.write("### Your Goals")
for g in goals:
progress = (g["current"] / g["target"]) * \
100 if g["target"] else 0
days_left = (pd.to_datetime(g["date"]) - datetime.now()).days
# Rough time progress
time_progress = 0
if days_left < 365:
time_progress = max(
0, min(100, (1 - (days_left / 365)) * 100))
st.write(f"**{g['name']}** ({g['category']})")
st.write(
f"Target: ${g['target']:,.2f} | Current: ${g['current']:,.2f}")
st.write(f"Progress: {progress:.1f}%")
st.progress(progress / 100)
st.write("Time Progress:")
st.progress(time_progress / 100)
colA, colB = st.columns([3, 1])
with colA:
new_amount = st.number_input("Update current amount:", value=float(
g["current"]), key=f"update_goal_{g['id']}")
if st.button("Update", key=f"btn_update_goal_{g['id']}"):
update_goal_current(g["id"], new_amount)
st.rerun()
with colB:
if st.button("🗑️", key=f"delete_goal_{g['id']}"):
delete_goal(g["id"])
st.rerun()
st.divider()
# Log new savings
with st.expander("Log Savings"):
new_savings = st.number_input(
"Amount Saved ($)", min_value=0.0, step=0.01)
if st.button("Log Savings"):
insert_savings_entry(str(datetime.now()), new_savings)
st.success("Savings logged successfully!")
st.rerun()
# Scenario Planning (col3)
with col3:
st.subheader("📈 Scenario Planning")
st.markdown("""
<style>
[data-testid="stMetricLabel"] {
font-size: 16px !important;
font-weight: normal !important;
}
[data-testid="stMetricValue"] {
font-size: 16px !important;
font-weight: normal !important;
}
[data-testid="stMetricDelta"] {
font-size: 14px !important;
font-weight: normal !important;
}
</style>
""", unsafe_allow_html=True)
ASSET_CLASSES = {
"Bonds": {"low": 2, "medium": 4, "high": 6},
"Stocks": {"low": 5, "medium": 8, "high": 12},
"Real Estate": {"low": 4, "medium": 7, "high": 10},
"Crypto": {"low": 10, "medium": 20, "high": 30}
}
risk_tolerance = st.select_slider("Risk Tolerance", options=[
"Low", "Medium", "High"], value="Medium")
selected_asset = st.selectbox(
"Asset Class",
list(ASSET_CLASSES.keys())
)
colA, colB = st.columns(2)
with colA:
initial_investment = st.number_input(
"Initial Investment ($)",
min_value=0.01,
step=100.0
)
with colB:
monthly_contribution = st.number_input(
"Monthly Contribution ($)",
min_value=0.0,
step=100.0
)
years = st.slider("Investment Timeline (Years)", 1, 30, 10)
return_rate = ASSET_CLASSES[selected_asset][risk_tolerance.lower()]
scenarios = {
"Conservative": return_rate - 2,
"Expected": return_rate,
"Optimistic": return_rate + 2
}
projection_data = []
for scenario_name, rate in scenarios.items():
monthly_rate = rate / 100 / 12
balance = initial_investment
for month in range(years * 12):
balance = balance * (1 + monthly_rate) + monthly_contribution
projection_data.append({
"Month": month,
"Balance": balance,
"Scenario": scenario_name
})
df_projections = pd.DataFrame(projection_data)
fig_scenario = go.Figure()
for scenario_name in scenarios.keys():
scenario_df = df_projections[df_projections["Scenario"]
== scenario_name]
fig_scenario.add_trace(go.Scatter(
x=scenario_df["Month"] / 12,
y=scenario_df["Balance"],
name=f"{scenario_name} ({scenarios[scenario_name]}%)",
hovertemplate="Year %{x:.1f}<br>Balance: $%{y:,.2f}"
))
fig_scenario.update_layout(
title=f"{selected_asset} Investment Projection",
xaxis_title="Years",
yaxis_title="Portfolio Value ($)",
hovermode="x unified",
height=400
)
st.plotly_chart(fig_scenario, use_container_width=True)
st.write("### Investment Summary")
summary_cols = st.columns(len(scenarios))
for idx, (scenario_name, rate) in enumerate(scenarios.items()):
with summary_cols[idx]:
st.write(scenario_name)
scenario_df = df_projections[df_projections["Scenario"]
== scenario_name]
final_data = scenario_df[scenario_df["Month"] == (
years * 12 - 1)]
final_value = final_data["Balance"].values[0] if not final_data.empty else initial_investment
total_invested = initial_investment + \
(monthly_contribution * years * 12)
total_return = final_value - total_invested
roi_percent = (total_return / total_invested) * \
100 if total_invested else 0
st.metric("Final Value", f"${final_value:,.2f}")
st.metric("Total Return", f"${total_return:,.2f}")
st.metric("ROI", f"{roi_percent:.1f}%")
# ----------------- Feedback Panel -----------------
if st.session_state.get('selected_panel', 'Welcome') == "Feedback":
st.markdown("<h2>Feedback</h2>", unsafe_allow_html=True)
feedback = st.text_area("Please enter your feedback below:")
if st.button("Submit Feedback"):
try:
logging.info("User feedback received: %s", feedback)
st.success("Thank you for your feedback!")
except Exception as e:
logging.error("Error submitting feedback: %s", str(e))
st.error(
"There was an error submitting your feedback. Please try again.")
# ----------------- Basic Unit Tests -----------------
if st.session_state.selected_panel == "Basic Unit Tests":
st.markdown("<h1 style='font-size:32px;'>🧪 Basic Unit Tests</h1>",
unsafe_allow_html=True)
# Test 1: Testing identify_and_group_expenses
test_transactions = [
{"description": "Direct Deposit Salary", "amount": 1000},
{"description": "Uber ride", "amount": 20},
{"description": "Dinner at restaurant", "amount": 50},
{"description": "Grocery shopping", "amount": 80}
]
try:
result = identify_and_group_expenses(test_transactions)
st.write("identify_and_group_expenses output:", result)
st.success("Basic unit tests completed!")
except Exception as e:
st.error(f"Error during basic tests: {e}")
logging.error("Error in basic unit tests: %s", str(e))
# -------------------------------------------------
# Bank Format Rules Configuration
# This configuration maps bank names to their specific transaction format rules.
# Each entry includes:
# - required_headers: A list of headers expected in the transaction file.
# - income_keywords: A list of keywords that indicate income transactions.
# - expense_keywords: A dictionary mapping expense categories to keywords.
BANK_FORMAT_RULES = {
"CapitalOne": {
"required_headers": ["description", "amount", "date"],
"income_keywords": ["direct deposit", "deposit", "refund", "rebate"],
"expense_keywords": {
"Food": ["restaurant", "cafe", "lunch", "breakfast", "dinner"],
"Transport": ["uber", "taxi", "bus", "train", "subway"],
"Utilities": ["electricity", "water", "gas", "internet", "phone"],
"Entertainment": ["movie", "concert", "game", "netflix"],
"Groceries": ["supermarket", "grocery", "market"]
}
},
"Chase": {
"required_headers": ["description", "amount", "date"],
"income_keywords": ["direct deposit", "deposit", "refund"],
"expense_keywords": {
"Food": ["restaurant", "cafe", "eatery"],
"Transport": ["uber", "lyft", "taxi"],
"Utilities": ["electric bill", "water bill"],
"Entertainment": ["theater", "concert"],
"Groceries": ["grocery store", "supermarket"]
}
}
}
# -------------------------------------------------
# Bank Statement Processing Functions
# -------------------------------------------------
def parse_chase_format(text):
"""Parse Chase bank statement format."""
transactions = []
lines = text.split('\n')
for line in lines:
if not line.strip() or any(skip in line.upper() for skip in ['PAGE', 'BALANCE', 'STATEMENT PERIOD']):
continue
date_pattern = r'(\d{2}/\d{2}/\d{2,4})'
amount_pattern = r'[-+]?\d*\.\d{2}'
date_match = re.search(date_pattern, line)
amount_match = re.search(amount_pattern, line)
if date_match and amount_match:
date = date_match.group(1)
amount = float(amount_match.group())
description = line[date_match.end():amount_match.start()].strip()
transactions.append({
"date": date,
"description": description,
"amount": amount,
"type": "expense" if amount < 0 else "income"
})
return transactions
def parse_boa_format(text):
"""Parse Bank of America statement format."""
transactions = []
lines = text.split('\n')
for line in lines:
if not line.strip() or any(skip in line.upper() for skip in ['PAGE', 'BALANCE', 'STATEMENT PERIOD']):
continue
date_pattern = r'(\d{2}/\d{2}/\d{4})'
amount_pattern = r'[-+]?[\d,]*\.\d{2}'
date_match = re.search(date_pattern, line)
amount_match = re.search(amount_pattern, line)
if date_match and amount_match:
date = date_match.group(1)
amount = float(amount_match.group().replace(',', ''))
description = line[date_match.end():amount_match.start()].strip()
transactions.append({
"date": date,
"description": description,
"amount": amount,
"type": "expense" if amount < 0 else "income"
})
return transactions
def parse_wells_fargo_format(text):
"""Parse Wells Fargo statement format."""
transactions = []
lines = text.split('\n')
for line in lines:
if not line.strip() or any(skip in line.upper() for skip in ['PAGE', 'BALANCE', 'STATEMENT PERIOD']):
continue
date_pattern = r'(\d{2}/\d{2})'
amount_pattern = r'[-+]?[\d,]*\.\d{2}'
date_match = re.search(date_pattern, line)
amount_match = re.search(amount_pattern, line)
if date_match and amount_match:
date = date_match.group(1)
date = f"{date}/{datetime.now().year}"
amount = float(amount_match.group().replace(',', ''))
description = line[date_match.end():amount_match.start()].strip()
transactions.append({
"date": date,
"description": description,
"amount": amount,
"type": "expense" if amount < 0 else "income"
})
return transactions
def parse_citibank_format(text):
"""Parse Citibank statement format."""
transactions = []
lines = text.split('\n')
for line in lines:
if not line.strip() or any(skip in line.upper() for skip in ['PAGE', 'BALANCE', 'STATEMENT PERIOD']):
continue
date_pattern = r'(\d{2}/\d{2})'
amount_pattern = r'[-+]?[\d,]*\.\d{2}'
date_match = re.search(date_pattern, line)
amount_matches = list(re.finditer(amount_pattern, line))
if date_match and amount_matches:
date = date_match.group(1)
date = f"{date}/{datetime.now().year}"
amounts = [float(m.group().replace(',', ''))
for m in amount_matches]
amount = next((amt for amt in amounts if amt != 0), 0)
description = line[date_match.end(
):amount_matches[0].start()].strip()
transactions.append({
"date": date,
"description": description,
"amount": amount,
"type": "expense" if amount < 0 else "income"
})
return transactions
def parse_generic_format(text):
"""Parse generic bank statement format for banks not specifically supported."""
transactions = []
lines = text.split('\n')
date_patterns = [
r'(\d{2}/\d{2}/\d{2,4})', # MM/DD/YY or MM/DD/YYYY
r'(\d{2}-\d{2}-\d{2,4})', # MM-DD-YY or MM-DD-YYYY
# DD MMM YYYY
r'(\d{2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{2,4})'
]
amount_pattern = r'[-+]?[\d,]*\.\d{2}'
for line in lines:
if not line.strip() or any(skip in line.upper() for skip in ['PAGE', 'BALANCE', 'STATEMENT PERIOD']):
continue
date_match = None
for pattern in date_patterns:
date_match = re.search(pattern, line)
if date_match:
break
amount_match = re.search(amount_pattern, line)
if date_match and amount_match:
date = date_match.group(1)
amount = float(amount_match.group().replace(',', ''))
description = line[date_match.end():amount_match.start()].strip()
transactions.append({
"date": date,
"description": description,
"amount": amount,
"type": "expense" if amount < 0 else "income"
})
return transactions
def process_bank_statement(pdf_file, bank_name):
"""Process a bank statement PDF and categorize transactions."""
transactions = []
try:
with pdfplumber.open(pdf_file) as pdf:
text = ""
for page in pdf.pages:
text += page.extract_text() + "\n"
# Apply bank-specific parsing
if bank_name == "Chase":
transactions = parse_chase_format(text)
elif bank_name == "Bank of America":
transactions = parse_boa_format(text)
elif bank_name == "Wells Fargo":
transactions = parse_wells_fargo_format(text)
elif bank_name == "Citibank":
transactions = parse_citibank_format(text)
else:
transactions = parse_generic_format(text)
if not transactions:
st.warning(
"No transactions found. The statement format might not be supported.")
except Exception as e:
st.error(f"Error processing PDF: {str(e)}")
return []
return categorize_transactions(transactions, bank_name)
def categorize_transactions(transactions, bank_name):
"""Categorize transactions using keyword matching."""
conn = get_connection()
cursor = conn.cursor()
# Get all keywords for this bank
cursor.execute(
"SELECT keyword, category FROM bank_keywords WHERE bank_name = ?", (bank_name,))
keywords = cursor.fetchall()
categorized = []
for transaction in transactions:
matched_category = None
max_ratio = 0
for keyword, category in keywords:
# Use fuzzy matching to find the best category match
ratio = fuzz.partial_ratio(
keyword.lower(), transaction['description'].lower())
if ratio > max_ratio and ratio > 80: # 80% threshold for match
max_ratio = ratio
matched_category = category
transaction['category'] = matched_category or 'Uncategorized'
categorized.append(transaction)
conn.close()
return categorized