smartspend / app.py
annmarysruthy's picture
Update app.py
609da79 verified
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import altair as alt
import google.generativeai as genai
from datetime import datetime, timedelta
import os
import re
import json
import sqlite3
from PIL import Image
import io
# App title and configuration
st.set_page_config(page_title="Smart Spend Tracker", layout="wide")
# Persistent storage path in Hugging Face Spaces
DATA_DIR = "/data" if os.path.exists("/data") else "data" # Fallback to local 'data' folder for testing
DB_PATH = os.path.join(DATA_DIR, "transactions.db")
# Ensure the data directory exists
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
# Initialize SQLite database
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS transactions
(id INTEGER PRIMARY KEY AUTOINCREMENT,
Date TEXT,
Category TEXT,
Amount REAL,
Type TEXT,
Description TEXT,
Balance REAL)''')
conn.commit()
conn.close()
# Load data from SQLite into DataFrame with capitalized column names
def load_data():
conn = sqlite3.connect(DB_PATH)
df = pd.read_sql_query("SELECT * FROM transactions", conn)
conn.close()
# Drop the 'id' column and rename columns to match expected capitalization
if not df.empty and 'id' in df.columns:
df = df.drop(columns=['id'])
if not df.empty:
df.columns = ['Date', 'Category', 'Amount', 'Type', 'Description', 'Balance']
else:
df = pd.DataFrame(columns=['Date', 'Category', 'Amount', 'Type', 'Description', 'Balance'])
return df
# Save DataFrame to SQLite
def save_data(df):
conn = sqlite3.connect(DB_PATH)
# Drop any existing table and recreate with new data
conn.execute("DROP TABLE IF EXISTS transactions")
conn.execute('''CREATE TABLE transactions
(id INTEGER PRIMARY KEY AUTOINCREMENT,
Date TEXT,
Category TEXT,
Amount REAL,
Type TEXT,
Description TEXT,
Balance REAL)''')
# Save with capitalized column names
df_to_save = df[['Date', 'Category', 'Amount', 'Type', 'Description', 'Balance']]
df_to_save.to_sql('transactions', conn, if_exists='append', index=False)
conn.commit()
conn.close()
# Initialize session state and database
if 'df' not in st.session_state:
init_db() # Create the database if it doesn't exist
st.session_state.df = load_data()
if 'chat_history' not in st.session_state:
st.session_state.chat_history = []
# Load Gemini API key from secrets
def configure_genai():
if 'GEMINI_API_KEY' in st.secrets:
api_key = st.secrets['GEMINI_API_KEY']
else:
api_key = os.environ.get('GEMINI_API_KEY')
if not api_key:
st.error("Gemini API key not found. Please add it to secrets or environment variables.")
st.stop()
genai.configure(api_key=api_key)
return genai.GenerativeModel('gemini-2.0-flash')
# Get multimodal model for image processing
def get_multimodal_model():
if 'GEMINI_API_KEY' in st.secrets:
api_key = st.secrets['GEMINI_API_KEY']
else:
api_key = os.environ.get('GEMINI_API_KEY')
if not api_key:
st.error("Gemini API key not found. Please add it to secrets or environment variables.")
st.stop()
genai.configure(api_key=api_key)
return genai.GenerativeModel('gemini-1.5-flash')
model = configure_genai()
# Function to extract transaction data using Gemini
def extract_expense_data(text):
prompt = f"""
Extract financial transaction information from the following text.
Return a JSON object or array with these fields:
- Date: in YYYY-MM-DD format (use today's date if not specified)
- Category: the transaction category (e.g., food, transport, salary)
- Amount: the numerical amount (no currency symbol)
- Type: 'Income' or 'Expense' (infer from context, e.g., 'spent' implies Expense, 'earned' implies Income)
- Description: brief description
Example:
{{
"Date": "2025-03-19",
"Category": "salary",
"Amount": 1000,
"Type": "Income",
"Description": "monthly salary"
}}
Text: {text}
"""
try:
response = model.generate_content(prompt)
response_text = response.text
json_match = re.search(r'```json\n(.*?)```', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
json_str = response_text
data = json.loads(json_str)
return data
except Exception as e:
st.error(f"Error extracting transaction data: {e}")
return None
# Function to extract transaction data from receipt image
def extract_data_from_receipt(image):
try:
# Ensure image is a PIL Image
if not isinstance(image, Image.Image):
st.error("Invalid image format")
return None
# Convert PIL image to bytes using BytesIO
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
img_byte_arr.seek(0) # Reset pointer to beginning of bytes
# Use Gemini's multimodal capabilities
multimodal_model = get_multimodal_model()
# Create proper image part for the Gemini API
image_part = {"mime_type": "image/png", "data": img_byte_arr.getvalue()}
prompt = """
Extract financial transaction information from this receipt image.
Return a JSON object with these fields:
- Date: in YYYY-MM-DD format (if visible on the receipt, otherwise use today's date)
- Category: the transaction category (e.g., groceries, restaurant, retail)
- Amount: the total amount as a number (no currency symbol)
- Type: 'Expense' (receipts are typically expenses)
- Description: store name or main item purchased
Example:
{
"Date": "2025-03-19",
"Category": "groceries",
"Amount": 45.67,
"Type": "Expense",
"Description": "Whole Foods Market"
}
Return ONLY the JSON object, properly formatted.
"""
# Use the correct format for multimodal input
response = multimodal_model.generate_content([prompt, image_part])
response_text = response.text
# Try to extract JSON from the response
json_match = re.search(r'```json\n(.*?)```', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# If not in code block, try to find JSON structure directly
json_str = response_text
# Clean up the string to ensure it's valid JSON
json_str = re.sub(r'[^\x00-\x7F]+', '', json_str) # Remove non-ASCII characters
# Parse the JSON data
data = json.loads(json_str)
return data
except Exception as e:
st.error(f"Error extracting data from receipt: {e}")
st.error(f"Response text: {response_text if 'response_text' in locals() else 'No response received'}")
st.error(f"Image info: {type(image)}")
return None
# Function to add transactions to the dataframe and database
def add_expense_to_df(expense_data):
if isinstance(expense_data, list):
for expense in expense_data:
add_single_expense(expense)
else:
add_single_expense(expense_data)
# Update balance and save to database
st.session_state.df = st.session_state.df.sort_values(by='Date')
st.session_state.df['Balance'] = st.session_state.df['Amount'].cumsum()
save_data(st.session_state.df)
def add_single_expense(expense):
try:
amount = float(expense['Amount'])
except:
amount = 0.0
transaction_type = expense.get('Type', 'Expense')
amount = amount if transaction_type == 'Income' else -amount
new_row = pd.DataFrame({
'Date': [expense.get('Date', datetime.now().strftime('%Y-%m-%d'))],
'Category': [expense.get('Category', 'Other')],
'Amount': [amount],
'Type': [transaction_type],
'Description': [expense.get('Description', '')],
'Balance': [0.0] # Temporary balance, will be recalculated
})
st.session_state.df = pd.concat([st.session_state.df, new_row], ignore_index=True)
# Function to get AI insights
def get_expense_insights(query):
if st.session_state.df.empty:
return "No transaction data available yet."
df_str = st.session_state.df.to_string()
prompt = f"""
Here is a dataset of financial transactions (positive amounts are income, negative are expenses):
{df_str}
User query: {query}
Analyze this data and answer the query clearly and concisely.
If the query is about visualizations, describe what chart would be helpful.
"""
try:
response = model.generate_content(prompt)
return response.text
except Exception as e:
return f"Error getting insights: {e}"
# Function to get date range for month comparison
def get_month_date_range(date, months_back=1):
if isinstance(date, str):
date = pd.to_datetime(date)
current_month_start = date.replace(day=1)
next_month_start = (current_month_start + pd.DateOffset(months=1))
current_month_end = next_month_start - pd.DateOffset(days=1)
prev_month_start = current_month_start - pd.DateOffset(months=months_back)
prev_month_end = current_month_start - pd.DateOffset(days=1)
return {
'current': (current_month_start, current_month_end),
'previous': (prev_month_start, prev_month_end)
}
# Function to create visualizations
def create_visualizations(df, date_range=None, selected_categories=None):
if df.empty:
st.info("Add some transactions to see visualizations")
return
viz_df = df.copy()
viz_df['Date'] = pd.to_datetime(viz_df['Date'])
if date_range:
viz_df = viz_df[(viz_df['Date'] >= date_range[0]) & (viz_df['Date'] <= date_range[1])]
if selected_categories and len(selected_categories) > 0:
viz_df = viz_df[viz_df['Category'].isin(selected_categories)]
if viz_df.empty:
st.info("No data available for the selected filters")
return
tab1, tab2, tab3 = st.tabs(["By Category", "Over Time", "Recent Transactions"])
with tab1:
st.subheader("Transactions by Category")
category_totals = viz_df.groupby('Category')['Amount'].sum().reset_index()
fig, ax = plt.subplots(figsize=(8, 8))
ax.pie(category_totals['Amount'].abs(), labels=category_totals['Category'], autopct='%1.1f%%')
ax.set_title('Transactions by Category')
st.pyplot(fig)
category_chart = alt.Chart(category_totals).mark_bar().encode(
x=alt.X('Category:N', sort='-y'),
y=alt.Y('Amount:Q'),
color=alt.condition(
alt.datum.Amount > 0,
alt.value('green'), # Income
alt.value('red') # Expenses
)
).properties(
title='Total by Category'
)
st.altair_chart(category_chart, use_container_width=True)
with tab2:
st.subheader("Balance Over Time")
daily_totals = viz_df.groupby(viz_df['Date'].dt.date)['Amount'].sum().cumsum().reset_index()
daily_totals.columns = ['Date', 'Balance']
time_chart = alt.Chart(daily_totals).mark_line(point=True).encode(
x='Date:T',
y='Balance:Q',
tooltip=['Date:T', 'Balance:Q']
).properties(
title='Balance Over Time'
)
st.altair_chart(time_chart, use_container_width=True)
with tab3:
st.subheader("Recent Transactions")
recent = viz_df.sort_values('Date', ascending=False).head(10)
recent_chart = alt.Chart(recent).mark_bar().encode(
x=alt.X('Description:N', sort='-y'),
y='Amount:Q',
color=alt.condition(
alt.datum.Amount > 0,
alt.value('green'),
alt.value('red')
),
tooltip=['Date:T', 'Category:N', 'Amount:Q', 'Description:N']
).properties(
title='Most Recent Transactions'
)
st.altair_chart(recent_chart, use_container_width=True)
# Function to compare expenses
def compare_expenses(df, date_range1, date_range2, selected_categories=None):
if df.empty:
st.info("Add some transactions to see comparisons")
return
analysis_df = df.copy()
analysis_df['Date'] = pd.to_datetime(analysis_df['Date'])
current_df = analysis_df[(analysis_df['Date'] >= date_range1[0]) & (analysis_df['Date'] <= date_range1[1])]
previous_df = analysis_df[(analysis_df['Date'] >= date_range2[0]) & (analysis_df['Date'] <= date_range2[1])]
if selected_categories and len(selected_categories) > 0:
current_df = current_df[current_df['Category'].isin(selected_categories)]
previous_df = previous_df[previous_df['Category'].isin(selected_categories)]
if current_df.empty and previous_df.empty:
st.info("No data for selected periods")
return
current_income = current_df[current_df['Amount'] > 0]['Amount'].sum()
current_expense = abs(current_df[current_df['Amount'] < 0]['Amount'].sum())
previous_income = previous_df[previous_df['Amount'] > 0]['Amount'].sum()
previous_expense = abs(previous_df[previous_df['Amount'] < 0]['Amount'].sum())
st.subheader("Period Comparison")
col1, col2 = st.columns(2)
with col1:
st.metric(f"{date_range1[0].strftime('%b %Y')} Income", f"${current_income:.2f}")
st.metric(f"{date_range1[0].strftime('%b %Y')} Expenses", f"${current_expense:.2f}")
with col2:
st.metric(f"{date_range2[0].strftime('%b %Y')} Income", f"${previous_income:.2f}")
st.metric(f"{date_range2[0].strftime('%b %Y')} Expenses", f"${previous_expense:.2f}")
# App layout
st.title("💰 Smart Spend Tracker")
# Simplified sidebar navigation
page = st.sidebar.radio("Navigation", ["Add Transactions", "Upload Receipt", "Edit Transactions", "View & Analyze", "Chat with Data"])
if page == "Add Transactions":
st.header("Add Your Transactions")
st.write("Describe your income or expenses in natural language.")
with st.form("transaction_form"):
user_input = st.text_area(
"Enter your transactions:",
height=100,
placeholder="Example: Earned $1000 from salary today, spent $25 on lunch yesterday"
)
submit_button = st.form_submit_button("Add Transactions")
if submit_button and user_input:
with st.spinner("Processing..."):
transaction_data = extract_expense_data(user_input)
if transaction_data:
add_expense_to_df(transaction_data)
st.success("Transactions added!")
st.json(transaction_data)
else:
st.error("Failed to extract data. Try a clearer description.")
if not st.session_state.df.empty:
st.subheader("Recent Transactions")
display_df = st.session_state.df.copy()
display_df['Credit'] = display_df['Amount'].apply(lambda x: x if x > 0 else 0)
display_df['Debit'] = display_df['Amount'].apply(lambda x: abs(x) if x < 0 else 0)
st.dataframe(
display_df[['Date', 'Description', 'Category', 'Credit', 'Debit', 'Balance']]
.sort_values(by='Date', ascending=False),
use_container_width=True
)
elif page == "Upload Receipt":
st.header("Upload Receipt Images")
st.write("Upload images of your receipts to automatically extract transaction data.")
uploaded_file = st.file_uploader("Upload a receipt image", type=["jpg", "jpeg", "png"])
# Create session state variables if they don't exist
if 'receipt_data' not in st.session_state:
st.session_state.receipt_data = None
if 'receipt_processed' not in st.session_state:
st.session_state.receipt_processed = False
if uploaded_file is not None:
# Display the uploaded image
image = Image.open(uploaded_file)
st.image(image, caption="Uploaded Receipt", width=400)
# Process button
if st.button("Extract Data from Receipt") or st.session_state.receipt_processed:
if not st.session_state.receipt_processed:
with st.spinner("Processing receipt image..."):
transaction_data = extract_data_from_receipt(image)
st.session_state.receipt_data = transaction_data
st.session_state.receipt_processed = True
if st.session_state.receipt_data:
# Show extracted data
st.success("Successfully extracted data from receipt!")
st.json(st.session_state.receipt_data)
# Ask for confirmation
if st.button("Confirm and Add Transaction"):
add_expense_to_df(st.session_state.receipt_data)
st.success("Transaction added to your records!")
# Reset the receipt processing state
st.session_state.receipt_processed = False
st.session_state.receipt_data = None
# Show updated recent transactions
st.subheader("Recent Transactions")
display_df = st.session_state.df.copy()
display_df['Credit'] = display_df['Amount'].apply(lambda x: x if x > 0 else 0)
display_df['Debit'] = display_df['Amount'].apply(lambda x: abs(x) if x < 0 else 0)
st.dataframe(
display_df[['Date', 'Description', 'Category', 'Credit', 'Debit', 'Balance']]
.sort_values(by='Date', ascending=False),
use_container_width=True
)
else:
st.error("Could not extract transaction data from image. Please try a clearer image or enter the details manually.")
st.session_state.receipt_processed = False
# Manual entry option if image processing fails
st.subheader("Manual Receipt Entry")
st.write("If image processing fails, you can enter the receipt details manually:")
with st.form("manual_receipt_form"):
col1, col2 = st.columns(2)
with col1:
receipt_date = st.date_input("Receipt Date", value=datetime.now())
receipt_amount = st.number_input("Amount ($)", min_value=0.01, step=0.01)
with col2:
receipt_category = st.text_input("Category", placeholder="e.g., Groceries, Restaurant")
receipt_description = st.text_input("Description", placeholder="Store name or items purchased")
manual_submit = st.form_submit_button("Add Manual Receipt")
if manual_submit:
manual_data = {
"Date": receipt_date.strftime('%Y-%m-%d'),
"Category": receipt_category,
"Amount": receipt_amount,
"Type": "Expense",
"Description": receipt_description
}
add_expense_to_df(manual_data)
st.success("Receipt transaction added manually!")
elif page == "Edit Transactions":
st.header("Edit Transactions")
if st.session_state.df.empty:
st.info("No transactions available to edit.")
else:
st.write("Select a transaction to edit:")
display_df = st.session_state.df.copy()
display_df['Credit'] = display_df['Amount'].apply(lambda x: x if x > 0 else 0)
display_df['Debit'] = display_df['Amount'].apply(lambda x: abs(x) if x < 0 else 0)
# Display the current transactions
st.dataframe(
display_df[['Date', 'Description', 'Category', 'Credit', 'Debit', 'Balance']]
.sort_values(by='Date', ascending=False),
use_container_width=True
)
# Select transaction to edit
transaction_index = st.selectbox(
"Select transaction to edit (by index):",
options=range(len(st.session_state.df)),
format_func=lambda x: f"{st.session_state.df.iloc[x]['Date']} - {st.session_state.df.iloc[x]['Description']} (${abs(st.session_state.df.iloc[x]['Amount']):.2f})"
)
# Show current transaction details and allow editing
with st.form("edit_form"):
selected_transaction = st.session_state.df.iloc[transaction_index]
edit_date = st.date_input("Date", value=pd.to_datetime(selected_transaction['Date']))
edit_category = st.text_input("Category", value=selected_transaction['Category'])
edit_amount = st.number_input("Amount ($)", value=abs(selected_transaction['Amount']), min_value=0.0, step=0.01)
edit_type = st.selectbox("Type", ["Income", "Expense"], index=0 if selected_transaction['Type'] == "Income" else 1)
edit_description = st.text_input("Description", value=selected_transaction['Description'])
submit_edit = st.form_submit_button("Update Transaction")
if submit_edit:
# Update the transaction using .loc
updated_amount = edit_amount if edit_type == "Income" else -edit_amount
st.session_state.df.loc[transaction_index, ['Date', 'Category', 'Amount', 'Type', 'Description']] = [
edit_date.strftime('%Y-%m-%d'),
edit_category,
updated_amount,
edit_type,
edit_description
]
# Recalculate balance and save to database
st.session_state.df = st.session_state.df.sort_values(by='Date')
st.session_state.df['Balance'] = st.session_state.df['Amount'].cumsum()
save_data(st.session_state.df)
st.success("Transaction updated successfully!")
# Show updated table
updated_display_df = st.session_state.df.copy()
updated_display_df['Credit'] = updated_display_df['Amount'].apply(lambda x: x if x > 0 else 0)
updated_display_df['Debit'] = updated_display_df['Amount'].apply(lambda x: abs(x) if x < 0 else 0)
st.dataframe(
updated_display_df[['Date', 'Description', 'Category', 'Credit', 'Debit', 'Balance']]
.sort_values(by='Date', ascending=False),
use_container_width=True
)
elif page == "View & Analyze":
st.header("Your Financial Data")
if not st.session_state.df.empty:
df = st.session_state.df.copy()
df['Date'] = pd.to_datetime(df['Date'])
all_categories = sorted(df['Category'].unique())
st.sidebar.header("Filter Options")
filter_method = st.sidebar.radio("Filter by", ["Custom Date Range", "Month", "No Filter"])
if filter_method == "Custom Date Range":
min_date = df['Date'].min().date()
max_date = df['Date'].max().date()
start_date = st.sidebar.date_input("Start Date", min_date)
end_date = st.sidebar.date_input("End Date", max_date)
date_filter = (pd.to_datetime(start_date), pd.to_datetime(end_date))
elif filter_method == "Month":
df['Month'] = df['Date'].dt.strftime('%b %Y')
months = sorted(df['Month'].unique(), key=lambda x: pd.to_datetime(x, format='%b %Y'), reverse=True)
selected_month = st.sidebar.selectbox("Select Month", months)
month_dt = pd.to_datetime(selected_month, format='%b %Y')
date_ranges = get_month_date_range(month_dt)
date_filter = date_ranges['current']
else:
date_filter = None
selected_categories = st.sidebar.multiselect("Select Categories", all_categories)
filtered_df = df.copy()
if date_filter:
filtered_df = filtered_df[(filtered_df['Date'] >= date_filter[0]) & (filtered_df['Date'] <= date_filter[1])]
if selected_categories:
filtered_df = filtered_df[filtered_df['Category'].isin(selected_categories)]
st.subheader("Transaction History")
display_df = filtered_df.copy()
display_df['Credit'] = display_df['Amount'].apply(lambda x: x if x > 0 else 0)
display_df['Debit'] = display_df['Amount'].apply(lambda x: abs(x) if x < 0 else 0)
st.dataframe(
display_df[['Date', 'Description', 'Category', 'Credit', 'Debit', 'Balance']]
.sort_values(by='Date', ascending=False),
use_container_width=True
)
csv = filtered_df.to_csv(index=False)
st.download_button(
label="Download CSV",
data=csv,
file_name="transactions.csv",
mime="text/csv"
)
st.subheader("Summary Statistics")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Income", f"${display_df['Credit'].sum():.2f}")
with col2:
st.metric("Total Expenses", f"${display_df['Debit'].sum():.2f}")
with col3:
st.metric("Current Balance", f"${display_df['Balance'].iloc[-1]:.2f}")
st.subheader("Visualizations")
create_visualizations(df, date_filter, selected_categories)
st.header("Month-to-Month Comparison")
df['Month'] = df['Date'].dt.strftime('%b %Y')
months = sorted(df['Month'].unique(), key=lambda x: pd.to_datetime(x, format='%b %Y'), reverse=True)
if len(months) >= 2:
col1, col2 = st.columns(2)
with col1:
current_month = st.selectbox("Current Month", months, index=0)
with col2:
previous_month = st.selectbox("Previous Month", months, index=min(1, len(months)-1))
current_date = pd.to_datetime(current_month, format='%b %Y')
previous_date = pd.to_datetime(previous_month, format='%b %Y')
current_range = get_month_date_range(current_date)['current']
previous_range = get_month_date_range(previous_date)['current']
compare_expenses(df, current_range, previous_range, selected_categories)
else:
st.info("Need at least two months of data for comparison")
else:
st.info("No transaction data available yet.")
elif page == "Chat with Data":
st.header("Chat with Your Financial Data")
if st.session_state.df.empty:
st.info("No data available yet.")
else:
st.write("Ask questions about your transactions.")
for message in st.session_state.chat_history:
with st.chat_message(message["role"]):
st.write(message["content"])
user_query = st.chat_input("Ask about your finances...")
if user_query:
st.session_state.chat_history.append({"role": "user", "content": user_query})
with st.chat_message("user"):
st.write(user_query)
with st.spinner("Thinking..."):
response = get_expense_insights(user_query)
st.session_state.chat_history.append({"role": "assistant", "content": response})
with st.chat_message("assistant"):
st.write(response)