Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import hashlib | |
| from datetime import datetime, time | |
| from PIL import Image | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import google.generativeai as genai | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| import pytz | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Firebase initialization | |
| cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json') | |
| firebase_admin.initialize_app(cred, { | |
| 'databaseURL': 'your-database-url', | |
| 'storageBucket': 'your-bucket-name.appspot.com' | |
| }) | |
| # Initialize Firebase storage bucket | |
| bucket = storage.bucket() | |
| # Gemini API configuration | |
| api_key = os.environ['Gemini'] | |
| def initialize_admin(): | |
| # Initialize Firebase Admin SDK | |
| cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json') | |
| firebase_admin.initialize_app(cred, { | |
| 'databaseURL': 'your-database-url' | |
| }) | |
| try: | |
| # Create admin user in Firebase Auth | |
| admin_email = "admin1@test.com" # Change this | |
| admin_password = "admin123" # Change this | |
| # Create the user | |
| admin_user = auth.create_user( | |
| email=admin_email, | |
| password=admin_password | |
| ) | |
| # Set admin custom claim | |
| auth.set_custom_user_claims(admin_user.uid, {'admin': True}) | |
| # Create admin data in Realtime Database | |
| admin_ref = db.reference(f'users/{admin_user.uid}') | |
| admin_ref.set({ | |
| 'email': admin_email, | |
| 'daily_cash': 0.0, # Admins typically don't need petty cash | |
| 'remaining_cash': 0.0, | |
| 'last_reset': datetime.now(pytz.UTC).isoformat(), | |
| 'is_admin': True | |
| }) | |
| print(f"Admin account created successfully!") | |
| print(f"Email: {admin_email}") | |
| print(f"UID: {admin_user.uid}") | |
| except Exception as e: | |
| print(f"Error creating admin account: {str(e)}") | |
| # admin create user. | |
| def create_user(): | |
| try: | |
| # Verify admin token | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({'error': 'No token provided'}), 401 | |
| token = auth_header.split('Bearer ')[1] | |
| admin_uid = verify_token(token) | |
| if not admin_uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| # Verify admin status | |
| admin = auth.get_user(admin_uid) | |
| if not admin.custom_claims or not admin.custom_claims.get('admin'): | |
| return jsonify({'error': 'Unauthorized'}), 403 | |
| # Get user creation data | |
| data = request.json | |
| email = data.get('email') | |
| password = data.get('password') | |
| daily_cash = data.get('daily_cash', 0.0) | |
| is_admin = data.get('is_admin', False) | |
| if not email or not password: | |
| return jsonify({'error': 'Email and password required'}), 400 | |
| # Create user in Firebase Auth | |
| user = auth.create_user( | |
| email=email, | |
| password=password | |
| ) | |
| # Set admin claim if requested | |
| if is_admin: | |
| auth.set_custom_user_claims(user.uid, {'admin': True}) | |
| # Create user data in Realtime Database | |
| user_ref = db.reference(f'users/{user.uid}') | |
| user_ref.set({ | |
| 'email': email, | |
| 'daily_cash': daily_cash, | |
| 'remaining_cash': daily_cash, | |
| 'last_reset': datetime.now(pytz.UTC).isoformat(), | |
| 'is_admin': is_admin | |
| }) | |
| return jsonify({ | |
| 'success': True, | |
| 'uid': user.uid, | |
| 'email': email | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def configure_gemini(api_key): | |
| genai.configure(api_key=api_key) | |
| return genai.GenerativeModel('gemini-2.0-flash-thinking-exp') | |
| def process_receipt(model, image): | |
| prompt = """Analyze this image and determine if it's a receipt. If it is a receipt, extract: | |
| - Total amount (as float) | |
| - List of items purchased (array of strings) | |
| - Date of transaction (DD/MM/YYYY format) | |
| - Receipt number (as string) | |
| Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number. | |
| If not a receipt, return {"is_receipt": false}""" | |
| response = model.generate_content([prompt, image]) | |
| return response.text | |
| def verify_token(token): | |
| try: | |
| decoded_token = auth.verify_id_token(token) | |
| return decoded_token['uid'] | |
| except Exception as e: | |
| return None | |
| def check_daily_reset(user_ref): | |
| try: | |
| user_data = user_ref.get() | |
| if not user_data: | |
| return False | |
| now = datetime.now(pytz.UTC) | |
| last_reset = datetime.fromisoformat(user_data.get('last_reset', '2000-01-01T00:00:00+00:00')) | |
| if now.time() >= time(8,0) and last_reset.date() < now.date(): | |
| user_ref.update({ | |
| 'remaining_cash': user_data['daily_cash'], | |
| 'last_reset': now.isoformat() | |
| }) | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Reset error: {str(e)}") | |
| return False | |
| def login(): | |
| try: | |
| data = request.json | |
| email = data.get('email') | |
| password = data.get('password') | |
| # Use Firebase Auth REST API to sign in | |
| user = auth.get_user_by_email(email) | |
| return jsonify({ | |
| 'uid': user.uid, | |
| 'email': user.email | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 401 | |
| def process_receipt_endpoint(): | |
| try: | |
| # Verify authentication | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({'error': 'No token provided'}), 401 | |
| token = auth_header.split('Bearer ')[1] | |
| uid = verify_token(token) | |
| if not uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| if 'receipt' not in request.files: | |
| return jsonify({'error': 'No file uploaded'}), 400 | |
| file = request.files['receipt'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No file selected'}), 400 | |
| # Get user reference | |
| user_ref = db.reference(f'users/{uid}') | |
| user_data = user_ref.get() | |
| if not user_data: | |
| return jsonify({'error': 'User not found'}), 404 | |
| # Check daily reset | |
| check_daily_reset(user_ref) | |
| # Read file and compute hash | |
| image_bytes = file.read() | |
| file_hash = hashlib.md5(image_bytes).hexdigest() | |
| # Check for duplicate receipt | |
| transactions_ref = db.reference('transactions') | |
| existing_transactions = transactions_ref.order_by_child('hash').equal_to(file_hash).get() | |
| if existing_transactions: | |
| return jsonify({'error': 'Receipt already processed'}), 400 | |
| # Process image with Gemini | |
| image = Image.open(io.BytesIO(image_bytes)) | |
| model = configure_gemini(api_key) | |
| result_text = process_receipt(model, image) | |
| # Parse Gemini response | |
| json_str = result_text[result_text.find('{'):result_text.rfind('}')+1] | |
| data = json.loads(json_str) | |
| if not data.get('is_receipt', False): | |
| return jsonify({'error': 'Not a valid receipt'}), 400 | |
| # Validate total against remaining cash | |
| total = float(data.get('total', 0)) | |
| if total > user_data['remaining_cash']: | |
| return jsonify({'error': 'Insufficient funds'}), 400 | |
| # Upload image to Firebase Storage | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg') | |
| blob.upload_from_string(image_bytes, content_type='image/jpeg') | |
| # Update user's remaining cash | |
| new_remaining = user_data['remaining_cash'] - total | |
| user_ref.update({'remaining_cash': new_remaining}) | |
| # Save transaction | |
| transaction_data = { | |
| 'uid': uid, | |
| 'total': total, | |
| 'items': data.get('items', []), | |
| 'date': data.get('date'), | |
| 'receipt_number': data.get('receipt_number'), | |
| 'timestamp': datetime.now().isoformat(), | |
| 'hash': file_hash, | |
| 'image_url': blob.public_url | |
| } | |
| new_transaction_ref = transactions_ref.push(transaction_data) | |
| return jsonify({ | |
| 'success': True, | |
| 'transaction_id': new_transaction_ref.key, | |
| 'remaining_cash': new_remaining, | |
| **transaction_data | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def get_user_transactions(): | |
| try: | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({'error': 'No token provided'}), 401 | |
| token = auth_header.split('Bearer ')[1] | |
| uid = verify_token(token) | |
| if not uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| transactions_ref = db.reference('transactions') | |
| user_transactions = transactions_ref.order_by_child('uid').equal_to(uid).get() | |
| return jsonify(user_transactions) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def get_all_users(): | |
| try: | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({'error': 'No token provided'}), 401 | |
| token = auth_header.split('Bearer ')[1] | |
| uid = verify_token(token) | |
| if not uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| # Verify admin status | |
| user = auth.get_user(uid) | |
| if not user.custom_claims or not user.custom_claims.get('admin'): | |
| return jsonify({'error': 'Unauthorized'}), 403 | |
| users_ref = db.reference('users') | |
| users = users_ref.get() | |
| return jsonify(users) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def update_user(): | |
| try: | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({'error': 'No token provided'}), 401 | |
| token = auth_header.split('Bearer ')[1] | |
| admin_uid = verify_token(token) | |
| if not admin_uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| # Verify admin status | |
| admin = auth.get_user(admin_uid) | |
| if not admin.custom_claims or not admin.custom_claims.get('admin'): | |
| return jsonify({'error': 'Unauthorized'}), 403 | |
| data = request.json | |
| user_id = data.get('uid') | |
| updates = data.get('updates', {}) | |
| user_ref = db.reference(f'users/{user_id}') | |
| user_ref.update(updates) | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def get_all_transactions(): | |
| try: | |
| auth_header = request.headers.get('Authorization') | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| return jsonify({'error': 'No token provided'}), 401 | |
| token = auth_header.split('Bearer ')[1] | |
| uid = verify_token(token) | |
| if not uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| # Verify admin status | |
| user = auth.get_user(uid) | |
| if not user.custom_claims or not user.custom_claims.get('admin'): | |
| return jsonify({'error': 'Unauthorized'}), 403 | |
| transactions_ref = db.reference('transactions') | |
| transactions = transactions_ref.get() | |
| return jsonify(transactions) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| if __name__ == '__main__': | |
| initialize_admin() | |
| app.run(debug=True, host="0.0.0.0", port=7860) |