TrueSpend-api / main.py
rairo's picture
Update main.py
4e8d2d3 verified
raw
history blame
12.5 kB
import os
import io
import json
import hashlib
from datetime import datetime, time
from PIL import Image
from flask import Flask, request, jsonify
from flask_cors import CORS
import google.generativeai as genai
import firebase_admin
from firebase_admin import credentials, db, storage, auth
import pytz
app = Flask(__name__)
CORS(app)
# Firebase initialization
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
firebase_admin.initialize_app(cred, {
'databaseURL': 'your-database-url',
'storageBucket': 'your-bucket-name.appspot.com'
})
# Initialize Firebase storage bucket
bucket = storage.bucket()
# Gemini API configuration
api_key = os.environ['Gemini']
def initialize_admin():
# Initialize Firebase Admin SDK
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
firebase_admin.initialize_app(cred, {
'databaseURL': 'your-database-url'
})
try:
# Create admin user in Firebase Auth
admin_email = "admin1@test.com" # Change this
admin_password = "admin123" # Change this
# Create the user
admin_user = auth.create_user(
email=admin_email,
password=admin_password
)
# Set admin custom claim
auth.set_custom_user_claims(admin_user.uid, {'admin': True})
# Create admin data in Realtime Database
admin_ref = db.reference(f'users/{admin_user.uid}')
admin_ref.set({
'email': admin_email,
'daily_cash': 0.0, # Admins typically don't need petty cash
'remaining_cash': 0.0,
'last_reset': datetime.now(pytz.UTC).isoformat(),
'is_admin': True
})
print(f"Admin account created successfully!")
print(f"Email: {admin_email}")
print(f"UID: {admin_user.uid}")
except Exception as e:
print(f"Error creating admin account: {str(e)}")
# admin create user.
@app.route('/api/admin/create-user', methods=['POST'])
def create_user():
try:
# Verify admin token
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401
token = auth_header.split('Bearer ')[1]
admin_uid = verify_token(token)
if not admin_uid:
return jsonify({'error': 'Invalid token'}), 401
# Verify admin status
admin = auth.get_user(admin_uid)
if not admin.custom_claims or not admin.custom_claims.get('admin'):
return jsonify({'error': 'Unauthorized'}), 403
# Get user creation data
data = request.json
email = data.get('email')
password = data.get('password')
daily_cash = data.get('daily_cash', 0.0)
is_admin = data.get('is_admin', False)
if not email or not password:
return jsonify({'error': 'Email and password required'}), 400
# Create user in Firebase Auth
user = auth.create_user(
email=email,
password=password
)
# Set admin claim if requested
if is_admin:
auth.set_custom_user_claims(user.uid, {'admin': True})
# Create user data in Realtime Database
user_ref = db.reference(f'users/{user.uid}')
user_ref.set({
'email': email,
'daily_cash': daily_cash,
'remaining_cash': daily_cash,
'last_reset': datetime.now(pytz.UTC).isoformat(),
'is_admin': is_admin
})
return jsonify({
'success': True,
'uid': user.uid,
'email': email
})
except Exception as e:
return jsonify({'error': str(e)}), 500
def configure_gemini(api_key):
genai.configure(api_key=api_key)
return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
def process_receipt(model, image):
prompt = """Analyze this image and determine if it's a receipt. If it is a receipt, extract:
- Total amount (as float)
- List of items purchased (array of strings)
- Date of transaction (DD/MM/YYYY format)
- Receipt number (as string)
Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number.
If not a receipt, return {"is_receipt": false}"""
response = model.generate_content([prompt, image])
return response.text
def verify_token(token):
try:
decoded_token = auth.verify_id_token(token)
return decoded_token['uid']
except Exception as e:
return None
def check_daily_reset(user_ref):
try:
user_data = user_ref.get()
if not user_data:
return False
now = datetime.now(pytz.UTC)
last_reset = datetime.fromisoformat(user_data.get('last_reset', '2000-01-01T00:00:00+00:00'))
if now.time() >= time(8,0) and last_reset.date() < now.date():
user_ref.update({
'remaining_cash': user_data['daily_cash'],
'last_reset': now.isoformat()
})
return True
return False
except Exception as e:
print(f"Reset error: {str(e)}")
return False
@app.route('/api/login', methods=['POST'])
def login():
try:
data = request.json
email = data.get('email')
password = data.get('password')
# Use Firebase Auth REST API to sign in
user = auth.get_user_by_email(email)
return jsonify({
'uid': user.uid,
'email': user.email
})
except Exception as e:
return jsonify({'error': str(e)}), 401
@app.route('/api/process-receipt', methods=['POST'])
def process_receipt_endpoint():
try:
# Verify authentication
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401
token = auth_header.split('Bearer ')[1]
uid = verify_token(token)
if not uid:
return jsonify({'error': 'Invalid token'}), 401
if 'receipt' not in request.files:
return jsonify({'error': 'No file uploaded'}), 400
file = request.files['receipt']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
# Get user reference
user_ref = db.reference(f'users/{uid}')
user_data = user_ref.get()
if not user_data:
return jsonify({'error': 'User not found'}), 404
# Check daily reset
check_daily_reset(user_ref)
# Read file and compute hash
image_bytes = file.read()
file_hash = hashlib.md5(image_bytes).hexdigest()
# Check for duplicate receipt
transactions_ref = db.reference('transactions')
existing_transactions = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
if existing_transactions:
return jsonify({'error': 'Receipt already processed'}), 400
# Process image with Gemini
image = Image.open(io.BytesIO(image_bytes))
model = configure_gemini(api_key)
result_text = process_receipt(model, image)
# Parse Gemini response
json_str = result_text[result_text.find('{'):result_text.rfind('}')+1]
data = json.loads(json_str)
if not data.get('is_receipt', False):
return jsonify({'error': 'Not a valid receipt'}), 400
# Validate total against remaining cash
total = float(data.get('total', 0))
if total > user_data['remaining_cash']:
return jsonify({'error': 'Insufficient funds'}), 400
# Upload image to Firebase Storage
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
blob.upload_from_string(image_bytes, content_type='image/jpeg')
# Update user's remaining cash
new_remaining = user_data['remaining_cash'] - total
user_ref.update({'remaining_cash': new_remaining})
# Save transaction
transaction_data = {
'uid': uid,
'total': total,
'items': data.get('items', []),
'date': data.get('date'),
'receipt_number': data.get('receipt_number'),
'timestamp': datetime.now().isoformat(),
'hash': file_hash,
'image_url': blob.public_url
}
new_transaction_ref = transactions_ref.push(transaction_data)
return jsonify({
'success': True,
'transaction_id': new_transaction_ref.key,
'remaining_cash': new_remaining,
**transaction_data
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/user/transactions', methods=['GET'])
def get_user_transactions():
try:
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401
token = auth_header.split('Bearer ')[1]
uid = verify_token(token)
if not uid:
return jsonify({'error': 'Invalid token'}), 401
transactions_ref = db.reference('transactions')
user_transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
return jsonify(user_transactions)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/users', methods=['GET'])
def get_all_users():
try:
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401
token = auth_header.split('Bearer ')[1]
uid = verify_token(token)
if not uid:
return jsonify({'error': 'Invalid token'}), 401
# Verify admin status
user = auth.get_user(uid)
if not user.custom_claims or not user.custom_claims.get('admin'):
return jsonify({'error': 'Unauthorized'}), 403
users_ref = db.reference('users')
users = users_ref.get()
return jsonify(users)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/update-user', methods=['POST'])
def update_user():
try:
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401
token = auth_header.split('Bearer ')[1]
admin_uid = verify_token(token)
if not admin_uid:
return jsonify({'error': 'Invalid token'}), 401
# Verify admin status
admin = auth.get_user(admin_uid)
if not admin.custom_claims or not admin.custom_claims.get('admin'):
return jsonify({'error': 'Unauthorized'}), 403
data = request.json
user_id = data.get('uid')
updates = data.get('updates', {})
user_ref = db.reference(f'users/{user_id}')
user_ref.update(updates)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/transactions', methods=['GET'])
def get_all_transactions():
try:
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'No token provided'}), 401
token = auth_header.split('Bearer ')[1]
uid = verify_token(token)
if not uid:
return jsonify({'error': 'Invalid token'}), 401
# Verify admin status
user = auth.get_user(uid)
if not user.custom_claims or not user.custom_claims.get('admin'):
return jsonify({'error': 'Unauthorized'}), 403
transactions_ref = db.reference('transactions')
transactions = transactions_ref.get()
return jsonify(transactions)
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
initialize_admin()
app.run(debug=True, host="0.0.0.0", port=7860)