TrueSpend-api / main.py
rairo's picture
Update main.py
c19b014 verified
raw
history blame
21.6 kB
import os
import io
import json
import hashlib
from datetime import datetime, time, timedelta
from PIL import Image
import pytz
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import google.generativeai as genai
import firebase_admin
from firebase_admin import credentials, db, storage, auth
import pandas as pd
import requests
from urllib.parse import urlparse, unquote
# Flask application; CORS is enabled for every route.
app = Flask(__name__)
CORS(app)

# Firebase initialization: the Admin SDK is configured once at import time
# from a service-account file that must sit next to this module.
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
_firebase_options = {
    'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
    'storageBucket': 'mydateproject-e7994.firebasestorage.app',
}
firebase_admin.initialize_app(cred, _firebase_options)

# Default storage bucket handle shared by the receipt upload/download code.
bucket = storage.bucket()

# The Gemini key is mandatory (KeyError at import when unset); the Firebase
# web API key is optional and defaults to None.
api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')
# Helper functions
def configure_gemini():
    """Configure the Gemini SDK with the module-level API key.

    Returns:
        A ``genai.GenerativeModel`` bound to 'gemini-2.0-flash-thinking-exp'.
    """
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    return model
def verify_token(token):
    """Resolve a Firebase ID token to its user id.

    Args:
        token: The ID token string sent by the client.

    Returns:
        The ``uid`` claim of the verified token, or ``None`` when verification
        (or reading the claim) fails for any reason.
    """
    try:
        return auth.verify_id_token(token)['uid']
    except Exception:
        # Every failure is deliberately collapsed to "not authenticated".
        return None
def check_daily_reset(user_ref):
    """Refill a user's remaining cash once per day after 08:00 UTC.

    Args:
        user_ref: Database reference exposing ``get()`` and ``update()`` for
            the user's record.

    Returns:
        True when the balance was reset to ``daily_cash`` and ``last_reset``
        was stamped with the current time; False when no reset was due or any
        error occurred (the error is printed).
    """
    try:
        profile = user_ref.get()
        current = datetime.now(pytz.UTC)
        # Users without a recorded reset fall back to a date far in the past.
        previous_reset = datetime.fromisoformat(
            profile.get('last_reset', '2000-01-01T00:00:00+00:00')
        )

        before_reset_hour = current.time() < time(8, 0)
        already_reset_today = previous_reset.date() >= current.date()
        if before_reset_hour or already_reset_today:
            return False

        user_ref.update({
            'remaining_cash': profile['daily_cash'],
            'last_reset': current.isoformat()
        })
        return True
    except Exception as err:
        print(f"Reset error: {str(err)}")
        return False
# Process receipt image
def process_receipt(model, image):
    """Ask the model to classify an image as a receipt and extract its fields.

    Args:
        model: Object exposing ``generate_content(parts)`` whose result has a
            ``text`` attribute.
        image: The image passed to the model alongside the prompt.

    Returns:
        The raw response text, or the string ``"{}"`` when the call fails
        (the error is printed).
    """
    instructions = """Analyze this image and determine if it's a receipt. If it is a receipt, extract:
- Total amount (as float)
- List of items purchased (array of strings)
- Date of transaction (DD/MM/YYYY format)
- Receipt number (as string)
Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number.
If not a receipt, return {"is_receipt": false}"""
    try:
        reply = model.generate_content([instructions, image])
        extracted = reply.text
        return extracted
    except Exception as err:
        print(f"Gemini error: {str(err)}")
        return "{}"
# Write report
@app.route('/api/write-report', methods=['POST'])
def generate_report():
    """Generate a plain-text spending report from posted transaction data.

    Expects a JSON body containing a ``transactions`` field, which is
    serialised and sent to Gemini together with a fixed analyst prompt.

    Returns:
        ``{"report": <text>}`` on success; 400 when the body is not JSON or
        lacks ``transactions``; 500 with the error message when model setup
        or generation fails.
    """
    prompt = """You are the TrueSpend AI analyst, analyze this transaction data
and write an insightful business report on the spending habits of the employees.
Make sure the report is in plain text"""
    # silent=True yields None for a missing/invalid JSON body, so bad input
    # is reported as a client error rather than an internal server error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'transactions' not in payload:
        return jsonify({"error": "Request body must be JSON with a 'transactions' field"}), 400
    try:
        model = configure_gemini()
        transaction_json_string = json.dumps(payload['transactions'])
        response = model.generate_content([prompt, transaction_json_string])
        return jsonify({"report": response.text})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# ========================================
# Authentication Endpoints
# ========================================
# (Any existing authentication endpoints remain unchanged)
# ========================================
# Receipt Processing Endpoint (unchanged)
# ========================================
@app.route('/api/process-receipt', methods=['POST'])
def process_receipt_endpoint():
    """Record a spend from an uploaded receipt image or a manual form entry.

    Reads the ID token from the ``Authorization`` header (the text after
    ``'Bearer '``). When the form field ``manual_entry`` equals ``'true'`` the
    request is delegated to ``handle_manual_entry``. Otherwise the ``receipt``
    file is hashed, rejected if a transaction with the same hash exists,
    analysed by Gemini and stored via ``validate_and_save_transaction``.

    Returns:
        A Flask JSON response: 401 for a missing/invalid token, 404 when the
        user has no database record, 400 for a missing, unreadable, duplicate,
        unparseable or non-receipt upload, 500 for any other failure.
    """
    try:
        auth_header = request.headers.get('Authorization') or ''
        # A header without 'Bearer ' splits into one element; treat that as
        # "no token" instead of letting an IndexError turn into a 500.
        header_parts = auth_header.split('Bearer ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        # Fail early: the balance update needs this record, and discovering
        # its absence after the model call and upload wastes both.
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        # Handle manual entry
        if request.form.get('manual_entry') == 'true':
            return handle_manual_entry(uid, user_ref, user_data)

        # Handle image processing
        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400

        image_bytes = file.read()
        # The content hash is the duplicate-upload key stored on each transaction.
        file_hash = hashlib.md5(image_bytes).hexdigest()
        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        try:
            image = Image.open(io.BytesIO(image_bytes))
        except OSError:
            # Pillow reports undecodable data with OSError subclasses.
            return jsonify({'error': 'Uploaded file is not a readable image'}), 400

        model = configure_gemini()
        result_text = process_receipt(model, image)

        try:
            # Keep only the outermost {...} span; the model may wrap the JSON
            # in extra text.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400

        if not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            file_hash=file_hash,
            image_bytes=image_bytes,
            manual=False
        )
    except Exception as e:
        print(e)
        return jsonify({'error': str(e)}), 500
def handle_manual_entry(uid, user_ref, user_data):
    """Build a transaction from the submitted form fields and save it.

    Args:
        uid: Id of the authenticated user.
        user_ref: Database reference for the user (not used here).
        user_data: The user's record, forwarded to the save step.

    Returns:
        The response from ``validate_and_save_transaction``, or a 400 JSON
        error carrying the exception message when anything fails.
    """
    try:
        form = request.form
        entry = {
            'total': float(form.get('total')),
            'items': [piece.strip() for piece in form.get('items', '').split(',')],
            'date': form.get('date'),
            'receipt_number': form.get('receipt_number'),
            'is_receipt': True
        }
        # Manual entries have no image, so the current time is hashed to give
        # the record a hash value.
        synthetic_hash = hashlib.md5(str(datetime.now()).encode()).hexdigest()
        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=entry,
            file_hash=synthetic_hash,
            image_bytes=None,
            manual=True
        )
    except Exception as err:
        return jsonify({'error': str(err)}), 400
def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Reject duplicate receipts, then persist a transaction and debit the user.

    Args:
        uid: Id of the user the transaction belongs to.
        user_data: The user's database record; must contain ``remaining_cash``.
        data: Extracted or entered fields (``total``, ``items``, ``date``,
            ``receipt_number``).
        file_hash: Hash stored on the transaction for duplicate detection.
        image_bytes: Raw receipt image, or None when there is no image.
        manual: True for form-entered transactions (no image is uploaded).

    Returns:
        A Flask JSON response: the saved transaction and new balance on
        success, or a 400 error for a duplicate receipt or a user record
        without a balance.
    """
    transactions_ref = db.reference('transactions')
    receipt_number = data.get('receipt_number')
    # A null "items"/"total" in the input is treated like a missing one.
    items = data.get('items') or []
    total = float(data.get('total') or 0)

    # Check for duplicate transactions based on receipt number and items.
    # The lookup is skipped without a receipt number because the query API
    # rejects None as an equal_to() value.
    if receipt_number is not None:
        matches = transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get()
        if matches:
            current_items = sorted(items)
            for existing in matches.values():
                if sorted(existing.get('items', [])) == current_items:
                    return jsonify({'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"}), 400

    # Resolve the balance before uploading anything so a broken user record
    # cannot leave an orphaned image in storage.
    if not user_data or user_data.get('remaining_cash') is None:
        return jsonify({'error': 'User has no remaining_cash balance'}), 400
    # Negative balances are allowed on purpose (no insufficient-funds check).
    # NOTE: this is a read-modify-write on the snapshot passed in by the
    # caller; concurrent requests for the same user can overwrite each other.
    new_remaining = user_data['remaining_cash'] - total

    # Upload image if available
    image_url = None
    if image_bytes and not manual:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    transaction_data = {
        'uid': uid,
        'total': total,
        'items': items,
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'image_url': image_url,
        'manual_entry': manual
    }
    new_transaction_ref = transactions_ref.push(transaction_data)
    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining
    })
# ========================================
# Data Endpoints for Visualizations
# ========================================
@app.route('/api/user/spending-overview', methods=['GET'])
def get_spending_overview():
    """Return per-day spending totals and the ten most recent transactions.

    The caller is identified by the token in the ``Authorization`` header
    (second space-separated field). Transactions missing any required field,
    or whose ``date`` is not in DD/MM/YYYY form, are ignored.

    Returns:
        JSON with ``daily_spending`` and ``recent_transactions`` lists (both
        empty when nothing usable exists); 401 for a missing/invalid token;
        500 with the error message on unexpected failures.
    """
    required = ['uid', 'total', 'items', 'date', 'receipt_number', 'timestamp', 'hash', 'manual_entry']
    empty_overview = {'daily_spending': [], 'recent_transactions': []}
    try:
        # Guard the split: a missing or malformed header must yield 401, not
        # an IndexError (or a None uid sent to the database query) -> 500.
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        transactions = db.reference('transactions').order_by_child('uid').equal_to(uid).get()
        if not transactions:
            return jsonify(empty_overview)

        df = pd.DataFrame.from_dict(transactions, orient='index')
        # A required column missing from the frame means every row lacks it,
        # so nothing would survive the dropna below; return early instead of
        # letting dropna raise KeyError.
        if df.empty or any(column not in df.columns for column in required):
            return jsonify(empty_overview)

        df.dropna(subset=required, inplace=True)
        if df.empty:
            return jsonify(empty_overview)

        df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y', errors='coerce')
        df.dropna(subset=['date'], inplace=True)
        if df.empty:
            return jsonify(empty_overview)

        daily_spending = df.groupby(df['date'].dt.date)['total'].sum().reset_index()
        recent = (
            df.sort_values(by='timestamp', ascending=False)
            .head(10)
            .to_dict(orient='records')
        )
        # Optional fields absent on some rows (e.g. image_url) come back as
        # NaN, which would be serialised as the invalid JSON token NaN.
        recent = [
            {
                key: (None if isinstance(value, float) and value != value else value)
                for key, value in row.items()
            }
            for row in recent
        ]
        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': recent
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ========================================
# Modified verify_admin function (now checks database is_admin flag)
# ========================================
def verify_admin(auth_header):
    """Authenticate a request and require the caller to be an administrator.

    Args:
        auth_header: Value of the ``Authorization`` header; must start with
            ``'Bearer '``.

    Returns:
        The uid of the verified administrator.

    Raises:
        ValueError: The header is empty or does not start with ``'Bearer '``.
        PermissionError: The token is invalid, the user's database record
            lacks a truthy ``is_admin`` flag, or setting the ``admin`` custom
            claim failed.
    """
    has_bearer = bool(auth_header) and auth_header.startswith('Bearer ')
    if not has_bearer:
        raise ValueError('Invalid token')

    uid = verify_token(auth_header.split(' ')[1])
    if not uid:
        raise PermissionError('Invalid user')

    # Admin status comes from the database flag on the user's record.
    profile = db.reference(f'users/{uid}').get()
    is_admin = bool(profile) and profile.get('is_admin', False)
    if not is_admin:
        raise PermissionError('Admin access required')

    try:
        # Mirror the database flag into the user's auth custom claims.
        auth.set_custom_user_claims(uid, {"admin": True})
        print(f"Custom admin claim set for user {uid}")
    except Exception as err:
        print(f"Error setting custom admin claim: {err}")
        raise PermissionError('Error setting admin claim, but admin verified')
    return uid
# ========================================
# Existing Admin Endpoints
# ========================================
@app.route('/api/admin/overview', methods=['GET'])
def get_admin_overview():
    """Return every user, every transaction and summary analytics (admin only).

    Returns:
        JSON with ``users`` (database fields plus the auth e-mail, or
        "Deleted User" when the auth lookup fails), ``transactions`` (each
        with its ``id``) and ``analytics`` totals; 500 with the error message
        on any failure, including a failed admin check.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        users_ref = db.reference('users')
        all_users = users_ref.get() or {}
        users_list = []
        for uid, user_data in all_users.items():
            try:
                auth_user = auth.get_user(uid)
                email = auth_user.email
            except Exception:
                # Narrowed from a bare except so KeyboardInterrupt/SystemExit
                # are no longer swallowed; a database record without a
                # resolvable auth account is shown as deleted.
                email = "Deleted User"
            users_list.append({
                'uid': uid,
                'email': email,
                'daily_cash': user_data.get('daily_cash', 0),
                'remaining_cash': user_data.get('remaining_cash', 0),
                'last_reset': user_data.get('last_reset'),
                'is_admin': user_data.get('is_admin', False)
            })

        transactions_ref = db.reference('transactions')
        all_transactions = transactions_ref.get() or {}
        transactions_list = [{'id': tid, **data} for tid, data in all_transactions.items()]

        return jsonify({
            'users': users_list,
            'transactions': transactions_list,
            'analytics': {
                'total_users': len(users_list),
                'total_transactions': len(transactions_list),
                # A record without a total counts as zero instead of failing
                # the whole overview with a KeyError.
                'total_spent': sum(t.get('total') or 0 for t in transactions_list)
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/users', methods=['POST'])
def create_user():
    """Create an auth account plus its database record (admin only).

    Reads ``email`` and ``password`` (required) and ``daily_cash`` and
    ``is_admin`` (optional) from the JSON body.

    Returns:
        201 with the new user's uid, e-mail and stored fields, or 400 with
        the error message on any failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        new_user = auth.create_user(
            email=payload['email'],
            password=payload['password']
        )

        # A new user starts with a full allowance.
        allowance = payload.get('daily_cash', 0)
        profile = {
            'daily_cash': allowance,
            'remaining_cash': allowance,
            'last_reset': '2025-01-01T00:00:00+00:00',
            'is_admin': payload.get('is_admin', False)
        }
        db.reference(f'users/{new_user.uid}').set(profile)

        body = {
            'success': True,
            'user': {
                'uid': new_user.uid,
                'email': new_user.email,
                **profile
            }
        }
        return jsonify(body), 201
    except Exception as err:
        return jsonify({'error': str(err)}), 400
@app.route('/api/admin/users/<string:uid>/limit', methods=['PUT'])
def update_user_limit(uid):
    """Set a user's daily cash limit, capping the remaining cash (admin only).

    Args:
        uid: Id of the user to update (from the URL).

    Returns:
        JSON with the new limit and the resulting remaining cash; 404 when
        the user has no record; 400 with the error message on any failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        limit = float(request.get_json()['daily_cash'])

        target_ref = db.reference(f'users/{uid}')
        profile = target_ref.get()
        if not profile:
            return jsonify({'error': 'User not found'}), 404

        remaining = profile.get('remaining_cash', limit)
        changes = {'daily_cash': limit}
        # The remaining balance is only ever lowered to the new limit here.
        if remaining > limit:
            changes['remaining_cash'] = limit
        target_ref.update(changes)

        return jsonify({
            'success': True,
            'new_daily_cash': limit,
            'updated_remaining': changes.get('remaining_cash', remaining)
        })
    except Exception as err:
        return jsonify({'error': str(err)}), 400
# ========================================
# New Admin Endpoints for Updating Remaining Cash, Setting Cash Limits, and Resetting Password
# ========================================
@app.route('/api/admin/users/<string:uid>/remaining-cash', methods=['PUT'])
def update_remaining_cash(uid):
    """Overwrite a user's remaining cash with the posted value (admin only).

    Args:
        uid: Id of the user to update (from the URL).

    Returns:
        JSON with the stored value; 404 when the user has no record; 400
        when ``remaining_cash`` is absent or any other failure occurs.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        if 'remaining_cash' not in payload:
            return jsonify({'error': 'remaining_cash is required'}), 400
        balance = float(payload['remaining_cash'])

        target_ref = db.reference(f'users/{uid}')
        if not target_ref.get():
            return jsonify({'error': 'User not found'}), 404

        target_ref.update({'remaining_cash': balance})
        return jsonify({'success': True, 'remaining_cash': balance})
    except Exception as err:
        return jsonify({'error': str(err)}), 400
@app.route('/api/admin/users/<string:uid>/reset-password', methods=['PUT'])
def admin_reset_password(uid):
    """Set a new password on a user's auth account (admin only).

    Args:
        uid: Id of the user whose password is replaced (from the URL).

    Returns:
        A success message, or 400 when ``new_password`` is missing/empty or
        any other failure occurs.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        replacement = request.get_json().get('new_password')
        if not replacement:
            return jsonify({'error': 'new_password is required'}), 400
        auth.update_user(uid, password=replacement)
        return jsonify({'success': True, 'message': 'Password reset successfully'})
    except Exception as err:
        return jsonify({'error': str(err)}), 400
# ========================================
# User management endpoint for profile
# ========================================
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's e-mail and cash settings.

    The caller is identified by the token in the ``Authorization`` header
    (second space-separated field).

    Returns:
        JSON with uid, email, daily_cash, remaining_cash, last_reset and
        is_admin; 401 for a missing/invalid token; 404 when the user has no
        database record; 500 with the error message on unexpected failures.
    """
    try:
        # Guard the split so a missing or malformed header yields 401.
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        user_data = db.reference(f'users/{uid}').get()
        if not user_data:
            return jsonify({'error': 'User profile not found'}), 404

        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 0),
            'remaining_cash': user_data.get('remaining_cash', 0),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False)
        })
    except Exception as e:
        # The handler must not repeat the calls that just failed (they would
        # raise again from inside the handler), and it must not echo the
        # stored user record back in the error message.
        return jsonify({'error': str(e)}), 500
# ========================================
# Receipt media endpoints
# ========================================
def get_blob_from_image_url(image_url):
    """Extract the storage object path from a stored receipt image URL.

    Two URL shapes are recognised:

    * ``https://storage.googleapis.com/<bucket>/<object path>``
    * any host with the path ``/v0/b/<this app's bucket>/o/<encoded path>``

    Args:
        image_url: URL saved on the transaction record.

    Returns:
        The URL-decoded object path, or None when the URL matches neither
        shape.
    """
    parsed = urlparse(image_url)
    if parsed.netloc == "storage.googleapis.com":
        # Path is "<bucket>/<object>"; only the object part is needed.
        parts = parsed.path.strip("/").split("/", 1)
        if len(parts) == 2:
            # The object name may be percent-encoded in the URL, so decode it
            # the same way the other branch does before using it as a name.
            return unquote(parts[1])
    prefix = f"/v0/b/{bucket.name}/o/"
    if parsed.path.startswith(prefix):
        return unquote(parsed.path[len(prefix):])
    return None
@app.route('/api/admin/receipt/<string:transaction_id>/view', methods=['GET'])
def view_receipt(transaction_id):
    """Stream the receipt image of a transaction inline (admin only).

    Args:
        transaction_id: Key of the transaction (from the URL).

    Returns:
        The image as ``image/jpeg``; 404 when the transaction, its image URL
        or the stored blob is missing; 500 when the blob path cannot be
        derived, the fetch fails or times out, or any other error occurs.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        transaction_data = db.reference(f'transactions/{transaction_id}').get()
        if not transaction_data:
            return jsonify({'error': 'Transaction not found'}), 404

        image_url = transaction_data.get('image_url')
        if not image_url:
            return jsonify({'error': 'No receipt image found for this transaction'}), 404

        blob_path = get_blob_from_image_url(image_url)
        if not blob_path:
            return jsonify({'error': 'Could not determine blob path from URL'}), 500
        print(f"Blob path for view: {blob_path}")

        blob = bucket.blob(blob_path)
        if not blob.exists():
            print("Blob does not exist at path:", blob_path)
            return jsonify({'error': 'Blob not found'}), 404

        # The image is fetched server-side through a short-lived signed URL.
        signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
        # Without a timeout a stalled storage connection would block this
        # worker indefinitely; a timeout surfaces as a 500 via the handler.
        r = requests.get(signed_url, timeout=30)
        if r.status_code != 200:
            return jsonify({'error': 'Unable to fetch image from storage'}), 500
        return send_file(io.BytesIO(r.content), mimetype='image/jpeg')
    except Exception as e:
        print(f"View receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/receipt/<string:transaction_id>/download', methods=['GET'])
def download_receipt(transaction_id):
    """Send the receipt image of a transaction as a download (admin only).

    Args:
        transaction_id: Key of the transaction (from the URL).

    Returns:
        The image as an ``image/jpeg`` attachment named ``receipt.jpg``; 404
        when the transaction, its image URL or the stored blob is missing;
        500 when the blob path cannot be derived, the fetch fails or times
        out, or any other error occurs.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        transaction_data = db.reference(f'transactions/{transaction_id}').get()
        if not transaction_data:
            return jsonify({'error': 'Transaction not found'}), 404

        image_url = transaction_data.get('image_url')
        if not image_url:
            return jsonify({'error': 'No receipt image found for this transaction'}), 404

        blob_path = get_blob_from_image_url(image_url)
        if not blob_path:
            return jsonify({'error': 'Could not determine blob path from URL'}), 500
        print(f"Blob path for download: {blob_path}")

        blob = bucket.blob(blob_path)
        if not blob.exists():
            print("Blob does not exist at path:", blob_path)
            return jsonify({'error': 'Blob not found'}), 404

        # The image is fetched server-side through a short-lived signed URL.
        signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
        # Without a timeout a stalled storage connection would block this
        # worker indefinitely; a timeout surfaces as a 500 via the handler.
        r = requests.get(signed_url, timeout=30)
        if r.status_code != 200:
            return jsonify({'error': 'Unable to fetch image from storage'}), 500

        # `download_name` is the Flask >= 2.0 name of this argument; the old
        # `attachment_filename` spelling was removed in Flask 2.2 and raises
        # TypeError there.
        return send_file(
            io.BytesIO(r.content),
            mimetype='image/jpeg',
            as_attachment=True,
            download_name='receipt.jpg'
        )
    except Exception as e:
        print(f"Download receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
# ========================================
# Delete user endpoint
# ========================================
@app.route('/api/admin/users/<string:uid>', methods=['DELETE'])
def delete_user(uid):
    """Delete a user's auth account, database record and transactions (admin only).

    Args:
        uid: Id of the user to remove (from the URL).

    Returns:
        A success message; 404 when no auth account exists for ``uid``; 500
        with the error message on any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        # Existence probe only: the returned record itself is not needed.
        try:
            auth.get_user(uid)
        except auth.UserNotFoundError:
            return jsonify({'error': 'User not found'}), 404

        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()

        transactions_ref = db.reference('transactions')
        owned = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if owned:
            for transaction_id in owned.keys():
                transactions_ref.child(transaction_id).delete()

        confirmation = {
            'success': True,
            'message': f'User {uid} and all associated data deleted successfully'
        }
        return jsonify(confirmation)
    except Exception as err:
        return jsonify({'error': str(err)}), 500
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # never be on by default: it allows arbitrary code execution to anyone
    # who can reach the port. Set FLASK_DEBUG=1 to opt in for local work.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)