# PettyCash-api / main.py
# rairo's picture
# Update main.py
# 46e7291 verified
import os
import io
import json
import hashlib
from datetime import datetime, time, timedelta
from PIL import Image
import pytz
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import google.generativeai as genai
import firebase_admin
from firebase_admin import credentials, db, storage, auth
import pandas as pd
import requests
from urllib.parse import urlparse, unquote
import numpy as np
# Flask application object; CORS is enabled with no explicit options.
app = Flask(__name__)
CORS(app)
# Firebase initialization
# The Admin SDK is initialised once, at import time, from a service-account key
# file given as a relative path (resolved against the working directory).
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
firebase_admin.initialize_app(cred, {
'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
'storageBucket': 'mydateproject-e7994.firebasestorage.app'
})
# Handle to the storage bucket; the endpoints below store receipt images in it.
bucket = storage.bucket()
# Gemini API key. A missing 'Gemini' environment variable raises KeyError at import time.
api_key = os.environ['Gemini']
# Optional setting (None when unset); not referenced elsewhere in the visible source.
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')
# Helper functions
def configure_gemini():
    """Configure the Gemini SDK with the module-level API key and build a model.

    Returns:
        A ``genai.GenerativeModel`` for 'gemini-2.0-flash-thinking-exp'.
    """
    model_name = 'gemini-2.0-flash-thinking-exp'
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel(model_name)
    return model
def verify_token(token):
    """Resolve a Firebase ID token to the UID stored in its 'uid' claim.

    Returns:
        The UID, or None when verification (or the claim lookup) fails for any
        reason. Callers treat None as "not authenticated".
    """
    try:
        claims = auth.verify_id_token(token)
        uid = claims['uid']
    except Exception:
        # Every failure mode is deliberately collapsed into "no user".
        uid = None
    return uid
def check_daily_reset(user_ref):
    """Refill a user's remaining cash once per day, from 08:00 UTC onwards.

    Args:
        user_ref: database reference to the user's record; must support
            ``get()`` and ``update()``.

    Returns:
        True when the record was reset during this call, False otherwise
        (including when any error occurs; the error is printed, not raised).
    """
    never_reset = '2000-01-01T00:00:00+00:00'
    try:
        user_data = user_ref.get()
        now = datetime.now(pytz.UTC)
        last_reset = datetime.fromisoformat(user_data.get('last_reset', never_reset))
        # Guard clause: too early in the day, or a reset already happened today.
        if now.time() < time(8, 0) or last_reset.date() >= now.date():
            return False
        refreshed = {
            'remaining_cash': user_data['daily_cash'],
            'last_reset': now.isoformat()
        }
        user_ref.update(refreshed)
        return True
    except Exception as e:
        print(f"Reset error: {str(e)}")
        return False
# Process receipt image
def process_receipt(model, image):
    """Ask the model whether *image* is a receipt and to extract its fields.

    Args:
        model: object exposing ``generate_content`` (see ``configure_gemini``).
        image: image object sent to the model together with the prompt.

    Returns:
        The raw response text (expected to contain JSON), or the string "{}"
        when the model call or reading its text raises.
    """
    instructions = """Analyze this image and determine if it's a receipt. If it is a receipt, extract:
- Total amount (as float)
- List of items purchased (array of strings)
- Date of transaction (DD/MM/YYYY format)
- Receipt number (as string)
Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number.
If not a receipt, return {"is_receipt": false}"""
    try:
        reply = model.generate_content([instructions, image])
        text = reply.text
    except Exception as e:
        print(f"Gemini error: {str(e)}")
        text = "{}"
    return text
# Write report
@app.route('/api/write-report', methods=['POST'])
def generate_report():
    """Generate a plain-text spending report from posted transaction data.

    Expects a JSON object with a 'transactions' key. Its value is serialised
    with ``json.dumps`` and sent to the Gemini model together with a fixed prompt.

    Returns:
        200 ``{"report": <text>}`` on success;
        400 ``{"error": ...}`` when the body is not a JSON object or has no
        'transactions' key;
        500 ``{"error": ...}`` when model setup or generation fails.
    """
    # NOTE(review): unlike the other endpoints in this file, no token is checked
    # here - confirm that this route is meant to be public.
    prompt = """You are the TrueSpend AI analyst, analyze this transaction data
and write an insightful business report on the spending habits of the employees.
Make sure the report is in plain text"""
    # silent=True returns None for a missing/malformed body instead of raising,
    # so client mistakes are answered with 400 rather than ending up as a 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'transactions' not in payload:
        return jsonify({"error": "Request body must be a JSON object with a 'transactions' key"}), 400
    try:
        # Model setup lives inside the try so a configuration failure is
        # reported as JSON like every other server-side error.
        model = configure_gemini()
        transaction_json_string = json.dumps(payload['transactions'])
        response = model.generate_content([prompt, transaction_json_string])
        return jsonify({"report": response.text})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# ========================================
# Authentication Endpoints
# ========================================
# (Any existing authentication endpoints remain unchanged)
# ========================================
# Image Receipt Processing Endpoint
# ========================================
@app.route('/api/process-receipt', methods=['POST'])
def process_receipt_endpoint():
    """Two-step receipt submission for the authenticated user.

    Step 1 - upload (multipart file field 'receipt'): the image is checked
    against existing transactions by MD5 hash, uploaded to storage, analysed by
    Gemini, and the extracted fields are returned for review. Nothing is written
    to 'transactions' in this step.

    Step 2 - confirmation (form field ``confirmed == 'true'``): the reviewed
    form fields, plus the 'file_hash' and 'image_url' returned by step 1, are
    saved through ``validate_and_save_transaction``.

    Returns:
        401 for a missing/invalid bearer token; 400 for client errors (no file,
        duplicate receipt, non-numeric total, unparseable model output, not a
        receipt); 500 for unexpected failures; otherwise a JSON success payload.
    """
    try:
        # partition() never raises, so a header without the "Bearer" scheme is
        # answered with the 401 below instead of an IndexError becoming a 500.
        scheme, _, token = request.headers.get('Authorization', '').partition(' ')
        token = token.strip()
        uid = verify_token(token) if scheme == 'Bearer' and token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401
        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()

        # --- Step 2: the user confirmed the extracted data -------------------
        if request.form.get('confirmed') == 'true':
            try:
                total = float(request.form.get('total', ''))
            except ValueError:
                # A missing or non-numeric total is a client error, not a 500.
                return jsonify({'error': 'total must be a number'}), 400
            # NOTE(review): 'image_url' and 'file_hash' come from the client and
            # are not checked against what step 1 actually produced.
            data = {
                'total': total,
                'items': [item.strip() for item in request.form.get('items', '').split(',')],
                'date': request.form.get('date'),
                'receipt_number': request.form.get('receipt_number'),
                'is_receipt': True,
                'image_url': request.form.get('image_url')
            }
            file_hash = request.form.get('file_hash', '')
            # The image was already uploaded in step 1, hence image_bytes=None.
            return validate_and_save_transaction(
                uid=uid,
                user_data=user_data,
                data=data,
                file_hash=file_hash,
                image_bytes=None,
                manual=False
            )

        # --- Step 1: first submission of the image ---------------------------
        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400
        image_bytes = file.read()
        # MD5 is used only as a duplicate-detection fingerprint here.
        file_hash = hashlib.md5(image_bytes).hexdigest()
        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        # Upload right away so the image is stored regardless of the review outcome.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

        image = Image.open(io.BytesIO(image_bytes))
        # JPEG cannot store alpha or palette modes (typical for PNG uploads);
        # without this conversion image.save() below raises for such images.
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')
        # Re-encode as JPEG before handing the image to the model.
        compressed_buffer = io.BytesIO()
        image.save(compressed_buffer, format="JPEG", optimize=True, quality=90)
        compressed_image = Image.open(io.BytesIO(compressed_buffer.getvalue()))

        model = configure_gemini()
        result_text = process_receipt(model, compressed_image)
        try:
            # The model may wrap the JSON in extra text; keep the outermost {...}.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400
        if not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        # Return the extracted data together with the image info for review;
        # the client sends both back in step 2.
        data['file_hash'] = file_hash
        data['image_url'] = image_url
        return jsonify({
            'success': True,
            'extracted': True,
            'data': data,
            'message': 'Review extracted information before confirming.'
        })
    except Exception as e:
        print(e)
        return jsonify({'error': str(e)}), 500
# ========================================
# Manual Entry Endpoint
# ========================================
@app.route('/api/manual-entry', methods=['POST'])
def manual_entry_endpoint():
    """Record a manually entered transaction for the authenticated user.

    Authenticates the bearer token, loads the user's record and delegates the
    form handling to ``handle_manual_entry``.

    Returns:
        401 for a missing/invalid bearer token, 500 for unexpected failures,
        otherwise whatever ``handle_manual_entry`` returns.
    """
    try:
        # partition() never raises, so a header without the "Bearer" scheme is
        # answered with the 401 below instead of an IndexError becoming a 500.
        scheme, _, token = request.headers.get('Authorization', '').partition(' ')
        token = token.strip()
        uid = verify_token(token) if scheme == 'Bearer' and token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401
        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        return handle_manual_entry(uid, user_ref, user_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def handle_manual_entry(uid, user_ref, user_data):
    """Build a transaction from the current request's form fields and save it.

    Args:
        uid: authenticated user's UID.
        user_ref: database reference to the user (not used in this function).
        user_data: the user's record, forwarded to the save routine.

    Returns:
        The response of ``validate_and_save_transaction``; any exception raised
        while reading the form or saving is reported as a 400 JSON error.
    """
    try:
        form = request.form
        amount = float(form.get('total'))
        item_names = [part.strip() for part in form.get('items', '').split(',')]
        entry = {
            'total': amount,
            'items': item_names,
            'date': form.get('date'),
            'receipt_number': form.get('receipt_number'),
            'is_receipt': True
        }
        # There is no image to fingerprint, so the hash is derived from the
        # current time instead.
        synthetic_hash = hashlib.md5(str(datetime.now()).encode()).hexdigest()
        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=entry,
            file_hash=synthetic_hash,
            image_bytes=None,
            manual=True
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 400
def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Reject duplicates, deduct the total from the user's cash and store the transaction.

    Args:
        uid: UID of the user the transaction belongs to.
        user_data: the user's record as read by the caller.
        data: dict with 'total', 'items', 'date', 'receipt_number' and
            optionally 'image_url'.
        file_hash: fingerprint stored in the transaction's 'hash' field.
        image_bytes: raw image to upload, or None when nothing needs uploading.
        manual: True for manually entered transactions (never uploads an image).

    Returns:
        A Flask JSON response: 404 when the user record is missing, 400 when a
        transaction with the same receipt number and identical items exists,
        otherwise a success payload with the stored transaction and the new
        remaining cash (which may be negative).
    """
    if not user_data:
        # Previously this surfaced later as a TypeError on user_data[...].
        return jsonify({'error': 'User profile not found'}), 404

    transactions_ref = db.reference('transactions')
    receipt_number = data.get('receipt_number')
    items = data.get('items') or []

    # Duplicate check: same receipt number AND same set of items.
    # equal_to(None) raises ValueError in the Admin SDK, so the query is only
    # issued when a receipt number was actually supplied.
    if receipt_number is not None:
        matches = transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get()
        current_items = sorted(items)
        for existing_transaction_data in (matches or {}).values():
            if sorted(existing_transaction_data.get('items', [])) == current_items:
                return jsonify({'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"}), 400

    total = float(data.get('total', 0))

    # image_url may already be present in data (could be None).
    image_url = data.get('image_url')
    if image_bytes and not manual:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    # Deduct from the user's cash; negative balances are allowed.
    # NOTE(review): this is a read-modify-write on the caller's snapshot, so two
    # concurrent submissions can overwrite each other's deduction.
    new_remaining = user_data.get('remaining_cash', 0) - total
    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    transaction_data = {
        'uid': uid,
        'total': total,
        'items': items,
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'manual_entry': manual
    }
    # Only store image_url when there is one.
    if image_url is not None:
        transaction_data['image_url'] = image_url

    new_transaction_ref = transactions_ref.push(transaction_data)
    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining
    })
# ========================================
# Data Endpoints for Visualizations
# ========================================
@app.route('/api/user/spending-overview', methods=['GET'])
def get_spending_overview():
    """Return the caller's spending per day and their ten most recent transactions.

    Returns:
        200 ``{'daily_spending': [{'date', 'total'}, ...],
        'recent_transactions': [...]}`` (both lists empty when the user has no
        transactions); 401 for a missing/invalid token; 500 on unexpected errors.
    """
    try:
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else ''
        uid = verify_token(token) if token else None
        if not uid:
            # Without this check a None uid reached equal_to(), which raises,
            # and the caller got a 500 instead of a 401.
            return jsonify({'error': 'Invalid token'}), 401

        transactions = db.reference('transactions').order_by_child('uid').equal_to(uid).get() or {}
        # Each record carries its database key as 'id'.
        transactions_list = [{**tx_data, 'id': tx_id} for tx_id, tx_data in transactions.items()]
        if not transactions_list:
            return jsonify({
                'daily_spending': [],
                'recent_transactions': []
            })
        df = pd.DataFrame(transactions_list)

        if 'date' in df.columns:
            # Receipt dates are requested from the model as DD/MM/YYYY, so
            # ambiguous values such as 03/04/2024 must be read day-first.
            parsed = pd.to_datetime(df['date'], errors='coerce', dayfirst=True)
            invalid = parsed.isna()
            if invalid.any():
                print(f"Found {invalid.sum()} transactions with invalid dates")
                # Unparseable dates are grouped under a fixed placeholder day;
                # the original string stays untouched in the 'date' column.
                parsed.loc[invalid] = pd.Timestamp('2000-01-01')
            df['date_only'] = parsed.apply(lambda d: d.isoformat().split('T')[0])
        else:
            # No transaction has a date at all: use the placeholder everywhere.
            df['date_only'] = '2000-01-01'
            df['date'] = '2000-01-01T00:00:00'

        daily_spending = df.groupby('date_only')['total'].sum().reset_index()
        daily_spending = daily_spending.rename(columns={'date_only': 'date'})

        # Newest first when a timestamp is available.
        if 'timestamp' in df.columns:
            df = df.sort_values(by='timestamp', ascending=False)
        # 'date_only' is a helper column and is not part of the response.
        recent_transactions = df.head(10).drop(columns=['date_only'])

        # NaN is not valid JSON; send null instead.
        daily_spending = daily_spending.replace({np.nan: None})
        recent_transactions = recent_transactions.replace({np.nan: None})

        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': recent_transactions.to_dict(orient='records')
        })
    except Exception as e:
        print(f"Error in spending overview: {e}")
        return jsonify({'error': str(e)}), 500
# ========================================
# Modified verify_admin function (now checks database is_admin flag)
# ========================================
def verify_admin(auth_header):
    """Authenticate the caller and require the database 'is_admin' flag.

    Args:
        auth_header: value of the Authorization header ("Bearer <token>").

    Returns:
        The caller's UID.

    Raises:
        ValueError: the header is empty or does not start with "Bearer ".
        PermissionError: the token is invalid, the user is not flagged as
            admin, or setting the custom admin claim fails.
    """
    if not (auth_header and auth_header.startswith('Bearer ')):
        raise ValueError('Invalid token')

    uid = verify_token(auth_header.split(' ')[1])
    if not uid:
        raise PermissionError('Invalid user')

    profile = db.reference(f'users/{uid}').get()
    flagged_admin = bool(profile) and profile.get('is_admin', False)
    if not flagged_admin:
        raise PermissionError('Admin access required')

    # The claim is (re)applied on every successful check.
    try:
        auth.set_custom_user_claims(uid, {"admin": True})
        print(f"Custom admin claim set for user {uid}")
    except Exception as e:
        print(f"Error setting custom admin claim: {e}")
        raise PermissionError('Error setting admin claim, but admin verified')
    return uid
# ========================================
# Existing Admin Endpoints
# ========================================
@app.route('/api/admin/overview', methods=['GET'])
def get_admin_overview():
    """Return all users, all transactions and summary analytics (admin only).

    Returns:
        200 with 'users', 'transactions' and 'analytics' (user count,
        transaction count, total spent); 500 with an error message on any
        failure, including a failed admin check.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        all_users = db.reference('users').get() or {}
        users_list = []
        for uid, user_data in all_users.items():
            try:
                email = auth.get_user(uid).email
            except Exception:
                # Narrowed from a bare `except:` so that KeyboardInterrupt and
                # SystemExit are no longer swallowed. A failed lookup is shown
                # as a deleted account.
                email = "Deleted User"
            users_list.append({
                'uid': uid,
                'email': email,
                'daily_cash': user_data.get('daily_cash', 0),
                'remaining_cash': user_data.get('remaining_cash', 0),
                'last_reset': user_data.get('last_reset'),
                'is_admin': user_data.get('is_admin', False)
            })

        all_transactions = db.reference('transactions').get() or {}
        transactions_list = [{'id': tid, **data} for tid, data in all_transactions.items()]

        # Transactions can be edited freely through the admin edit endpoint, so
        # a total may be missing or non-numeric; such records are left out of
        # the sum instead of failing the whole overview.
        amounts = []
        for t in transactions_list:
            try:
                amounts.append(float(t.get('total') or 0))
            except (TypeError, ValueError):
                continue

        return jsonify({
            'users': users_list,
            'transactions': transactions_list,
            'analytics': {
                'total_users': len(users_list),
                'total_transactions': len(transactions_list),
                'total_spent': sum(amounts)
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/users', methods=['POST'])
def create_user():
    """Create an auth account plus its database record (admin only).

    Reads 'email' and 'password' (required) and 'daily_cash' / 'is_admin'
    (optional) from the JSON body. The new user's remaining cash starts at the
    daily amount.

    Returns:
        201 with the new user's data, or 400 with an error message on any failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        created = auth.create_user(
            email=payload['email'],
            password=payload['password']
        )
        allowance = payload.get('daily_cash', 0)
        profile = {
            'daily_cash': allowance,
            'remaining_cash': allowance,
            'last_reset': '2025-01-01T00:00:00+00:00',
            'is_admin': payload.get('is_admin', False)
        }
        db.reference(f'users/{created.uid}').set(profile)
        body = {
            'success': True,
            'user': {'uid': created.uid, 'email': created.email, **profile}
        }
        return jsonify(body), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 400
@app.route('/api/admin/users/<string:uid>/limit', methods=['PUT'])
def update_user_limit(uid):
    """Set a user's daily cash limit (admin only).

    When the user's remaining cash exceeds the new limit it is lowered to the
    limit; otherwise it is left as it is.

    Returns:
        200 with the new limit and the resulting remaining cash, 404 when the
        user record does not exist, 400 with an error message on any failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        new_limit = float(payload['daily_cash'])

        user_ref = db.reference(f'users/{uid}')
        profile = user_ref.get()
        if not profile:
            return jsonify({'error': 'User not found'}), 404

        current_remaining = profile.get('remaining_cash', new_limit)
        updates = {'daily_cash': new_limit}
        over_limit = current_remaining > new_limit
        if over_limit:
            updates['remaining_cash'] = new_limit
        user_ref.update(updates)

        resulting_remaining = new_limit if over_limit else current_remaining
        return jsonify({
            'success': True,
            'new_daily_cash': new_limit,
            'updated_remaining': resulting_remaining
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400
# ========================================
# New Admin Endpoints for Updating Remaining Cash, Setting Cash Limits, and Resetting Password
# ========================================
@app.route('/api/admin/users/<string:uid>/remaining-cash', methods=['PUT'])
def update_remaining_cash(uid):
    """Overwrite a user's remaining cash with the posted value (admin only).

    Returns:
        200 with the stored value, 404 when the user record does not exist,
        400 when 'remaining_cash' is missing or on any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        if 'remaining_cash' not in payload:
            return jsonify({'error': 'remaining_cash is required'}), 400
        amount = float(payload['remaining_cash'])

        target = db.reference(f'users/{uid}')
        if not target.get():
            return jsonify({'error': 'User not found'}), 404

        target.update({'remaining_cash': amount})
        return jsonify({'success': True, 'remaining_cash': amount})
    except Exception as e:
        return jsonify({'error': str(e)}), 400
@app.route('/api/admin/users/<string:uid>/reset-password', methods=['PUT'])
def admin_reset_password(uid):
    """Set a new password for the given user (admin only).

    Returns:
        200 on success, 400 when 'new_password' is missing/empty or on any
        other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        body = request.get_json()
        new_password = body.get('new_password')
        if new_password:
            auth.update_user(uid, password=new_password)
            return jsonify({'success': True, 'message': 'Password reset successfully'})
        return jsonify({'error': 'new_password is required'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 400
@app.route('/api/admin/transactions/<string:transaction_id>', methods=['DELETE'])
def delete_transaction(transaction_id):
    """Delete one transaction record (admin only).

    Only the transaction node is removed; the owner's remaining cash is not
    touched by this endpoint.

    Returns:
        200 on success, 404 when the transaction does not exist, 500 with an
        error message on any failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        node = db.reference(f'transactions/{transaction_id}')
        if not node.get():
            return jsonify({'error': 'Transaction not found'}), 404
        node.delete()
        return jsonify({'success': True, 'message': 'Transaction deleted successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/transactions/<string:transaction_id>', methods=['PUT'])
def edit_transaction(transaction_id):
    """Apply a partial update to one transaction record (admin only).

    The JSON body is merged into the stored transaction as-is.

    Returns:
        200 with the applied fields, 404 when the transaction does not exist,
        400 when the body is not a non-empty JSON object, 500 with an error
        message on any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        transactions_ref = db.reference(f'transactions/{transaction_id}')
        if not transactions_ref.get():
            return jsonify({'error': 'Transaction not found'}), 404

        # Reference.update() rejects empty or non-dict values; validating here
        # turns that client mistake into a 400 instead of a 500.
        update_data = request.get_json(silent=True)
        if not isinstance(update_data, dict) or not update_data:
            return jsonify({'error': 'Request body must be a non-empty JSON object'}), 400

        # NOTE(review): any field can be overwritten, including 'uid' and
        # 'total'; the owner's remaining cash is not adjusted.
        transactions_ref.update(update_data)
        return jsonify({'success': True, 'updated_transaction': update_data})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ========================================
# User management endpoint for profile
# ========================================
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's email and cash settings.

    Returns:
        200 with uid, email, daily_cash, remaining_cash, last_reset and
        is_admin; 401 for a missing/invalid token; 404 when the user has no
        database record; 500 with an error message on unexpected failures.
    """
    try:
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else ''
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        user_data = db.reference(f'users/{uid}').get()
        if not user_data:
            return jsonify({'error': 'User profile not found'}), 404

        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 0),
            'remaining_cash': user_data.get('remaining_cash', 0),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False)
        })
    except Exception as e:
        # The handler used to repeat the failing lookups (raising again, this
        # time unhandled) and echoed the raw user record to the client. It now
        # logs the error and returns only the message.
        print(f"Profile error: {e}")
        return jsonify({'error': str(e)}), 500
# ========================================
# Receipt media endpoints
# ========================================
def get_blob_from_image_url(image_url):
    """Extract the storage object path from a stored image URL.

    Two URL layouts are recognised:

    * ``https://storage.googleapis.com/<bucket>/<object path>``
    * any host with a path of ``/v0/b/<this bucket>/o/<encoded object path>``

    Args:
        image_url: URL string saved with a transaction; may be None or empty.

    Returns:
        The decoded object path, or None when the URL is empty or matches
        neither layout.
    """
    if not image_url:
        return None
    parsed = urlparse(image_url)
    if parsed.netloc == "storage.googleapis.com":
        parts = parsed.path.strip("/").split("/", 1)
        if len(parts) == 2:
            # parts[0] is the bucket name.
            # NOTE(review): it is not compared with this app's bucket - confirm
            # that URLs pointing at other buckets cannot be stored.
            # Object names are percent-encoded in these URLs, so decode them
            # back to the real object name.
            return unquote(parts[1])
    prefix = f"/v0/b/{bucket.name}/o/"
    if parsed.path.startswith(prefix):
        return unquote(parsed.path[len(prefix):])
    return None
@app.route('/api/admin/receipt/<string:transaction_id>/view', methods=['GET'])
def view_receipt(transaction_id):
    """Stream a transaction's receipt image inline (admin only).

    The object path is derived from the transaction's 'image_url'; the image is
    fetched through a 10-minute signed URL and returned as image/jpeg.

    Returns:
        The image on success; 404 when the transaction, its image URL or the
        stored object is missing; 500 with an error message otherwise.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        transaction_data = db.reference(f'transactions/{transaction_id}').get()
        if not transaction_data:
            return jsonify({'error': 'Transaction not found'}), 404
        image_url = transaction_data.get('image_url')
        if not image_url:
            return jsonify({'error': 'No receipt image found for this transaction'}), 404

        blob_path = get_blob_from_image_url(image_url)
        if not blob_path:
            return jsonify({'error': 'Could not determine blob path from URL'}), 500
        print(f"Blob path for view: {blob_path}")
        blob = bucket.blob(blob_path)
        if not blob.exists():
            print("Blob does not exist at path:", blob_path)
            return jsonify({'error': 'Blob not found'}), 404

        signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
        # Without a timeout a stalled storage connection would block this
        # worker indefinitely; a timeout error is reported as a 500 below.
        r = requests.get(signed_url, timeout=30)
        if r.status_code != 200:
            return jsonify({'error': 'Unable to fetch image from storage'}), 500
        return send_file(io.BytesIO(r.content), mimetype='image/jpeg')
    except Exception as e:
        print(f"View receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/receipt/<string:transaction_id>/download', methods=['GET'])
def download_receipt(transaction_id):
    """Send a transaction's receipt image as a file download (admin only).

    Same lookup as the view endpoint, but the response is an attachment named
    'receipt.jpg'.

    Returns:
        The image on success; 404 when the transaction, its image URL or the
        stored object is missing; 500 with an error message otherwise.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        transaction_data = db.reference(f'transactions/{transaction_id}').get()
        if not transaction_data:
            return jsonify({'error': 'Transaction not found'}), 404
        image_url = transaction_data.get('image_url')
        if not image_url:
            return jsonify({'error': 'No receipt image found for this transaction'}), 404

        blob_path = get_blob_from_image_url(image_url)
        if not blob_path:
            return jsonify({'error': 'Could not determine blob path from URL'}), 500
        print(f"Blob path for download: {blob_path}")
        blob = bucket.blob(blob_path)
        if not blob.exists():
            print("Blob does not exist at path:", blob_path)
            return jsonify({'error': 'Blob not found'}), 404

        signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
        # Without a timeout a stalled storage connection would block this
        # worker indefinitely; a timeout error is reported as a 500 below.
        r = requests.get(signed_url, timeout=30)
        if r.status_code != 200:
            return jsonify({'error': 'Unable to fetch image from storage'}), 500
        # `attachment_filename` was removed from send_file in Flask 2.2;
        # `download_name` (Flask >= 2.0) is its replacement.
        return send_file(
            io.BytesIO(r.content),
            mimetype='image/jpeg',
            as_attachment=True,
            download_name='receipt.jpg'
        )
    except Exception as e:
        print(f"Download receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
# ========================================
# Delete user endpoint
# ========================================
@app.route('/api/admin/users/<string:uid>', methods=['DELETE'])
def delete_user(uid):
    """Delete a user's auth account, database record and transactions (admin only).

    Deletion order: auth account, then the 'users/<uid>' node, then every
    transaction whose 'uid' field matches.

    Returns:
        200 on success, 404 when the auth account does not exist, 500 with an
        error message on any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        # Existence probe only; the returned record itself is not needed.
        try:
            auth.get_user(uid)
        except auth.UserNotFoundError:
            return jsonify({'error': 'User not found'}), 404

        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()

        tx_root = db.reference('transactions')
        owned = tx_root.order_by_child('uid').equal_to(uid).get()
        for tx_id in (owned or {}):
            tx_root.child(tx_id).delete()

        return jsonify({
            'success': True,
            'message': f'User {uid} and all associated data deleted successfully'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it run arbitrary code. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (or "true") for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)