| import os |
| import io |
| import json |
| import hashlib |
| from datetime import datetime, time |
| from PIL import Image |
| import pytz |
| from flask import Flask, request, jsonify |
| from flask_cors import CORS |
| import google.generativeai as genai |
| import firebase_admin |
| from firebase_admin import credentials, db, storage, auth |
| import pandas as pd |
| import requests |
|
|
| app = Flask(__name__) |
| CORS(app) |
|
|
| |
| cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json') |
| firebase_admin.initialize_app(cred, { |
| 'databaseURL': 'https://mydateproject-e7994.firebaseio.com', |
| 'storageBucket': 'mydateproject-e7994.firebasestorage.app' |
| }) |
|
|
| bucket = storage.bucket() |
| api_key = os.environ['Gemini'] |
| FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY') |
|
|
| |
| def configure_gemini(): |
| genai.configure(api_key=api_key) |
| return genai.GenerativeModel('gemini-2.0-flash-thinking-exp') |
|
|
| def verify_token(token): |
| try: |
| decoded_token = auth.verify_id_token(token) |
| return decoded_token['uid'] |
| except Exception as e: |
| return None |
|
|
| def check_daily_reset(user_ref): |
| try: |
| user_data = user_ref.get() |
| now = datetime.now(pytz.UTC) |
| last_reset = datetime.fromisoformat(user_data.get('last_reset', '2000-01-01T00:00:00+00:00')) |
| |
| if now.time() >= time(8,0) and last_reset.date() < now.date(): |
| user_ref.update({ |
| 'remaining_cash': user_data['daily_cash'], |
| 'last_reset': now.isoformat() |
| }) |
| return True |
| return False |
| except Exception as e: |
| print(f"Reset error: {str(e)}") |
| return False |
| |
| |
| def process_receipt(model, image): |
| prompt = """Analyze this image and determine if it's a receipt. If it is a receipt, extract: |
| - Total amount (as float) |
| - List of items purchased (array of strings) |
| - Date of transaction (DD/MM/YYYY format) |
| - Receipt number (as string) |
| Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number. |
| If not a receipt, return {"is_receipt": false}""" |
| |
| try: |
| response = model.generate_content([prompt, image]) |
| return response.text |
| except Exception as e: |
| st.error(f"Gemini error: {str(e)}") |
| return "{}" |
| |
| |
| |
|
|
|
|
| |
| |
| |
|
|
| @app.route('/api/process-receipt', methods=['POST']) |
| def process_receipt_endpoint(): |
| try: |
| auth_header = request.headers.get('Authorization') |
| token = auth_header.split('Bearer ')[1] if auth_header else None |
| uid = verify_token(token) if token else None |
| |
| if not uid: |
| return jsonify({'error': 'Invalid token'}), 401 |
|
|
| user_ref = db.reference(f'users/{uid}') |
| user_data = user_ref.get() |
| check_daily_reset(user_ref) |
|
|
| |
| if request.form.get('manual_entry') == 'true': |
| return handle_manual_entry(uid, user_ref, user_data) |
|
|
| |
| file = request.files.get('receipt') |
| if not file: |
| return jsonify({'error': 'No file uploaded'}), 400 |
|
|
| image_bytes = file.read() |
| file_hash = hashlib.md5(image_bytes).hexdigest() |
|
|
| transactions_ref = db.reference('transactions') |
| existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get() |
| if existing: |
| return jsonify({'error': 'Receipt already processed'}), 400 |
|
|
| image = Image.open(io.BytesIO(image_bytes)) |
| model = configure_gemini() |
| result_text = process_receipt(model, image) |
|
|
| try: |
| json_str = result_text[result_text.find('{'):result_text.rfind('}')+1] |
| data = json.loads(json_str) |
| except json.JSONDecodeError: |
| return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400 |
|
|
| if not data.get('is_receipt', False): |
| return jsonify({'error': 'Not a valid receipt'}), 400 |
|
|
| return validate_and_save_transaction( |
| uid=uid, |
| user_data=user_data, |
| data=data, |
| file_hash=file_hash, |
| image_bytes=image_bytes, |
| manual=False |
| ) |
|
|
| except Exception as e: |
| print(image) |
| return jsonify({'error': str(e)+f'uid: {uid}, user_data: {user_dta}, data:{data}, file_hash: {file_hash},image_bytes: {image_bytes}'}), 500 |
|
|
| def handle_manual_entry(uid, user_ref, user_data): |
| try: |
| data = { |
| 'total': float(request.form.get('total')), |
| 'items': [item.strip() for item in request.form.get('items', '').split(',')], |
| 'date': request.form.get('date'), |
| 'receipt_number': request.form.get('receipt_number'), |
| 'is_receipt': True |
| } |
|
|
| return validate_and_save_transaction( |
| uid=uid, |
| user_data=user_data, |
| data=data, |
| file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(), |
| image_bytes=None, |
| manual=True |
| ) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 400 |
|
|
| def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False): |
| transactions_ref = db.reference('transactions') |
| existing = transactions_ref.order_by_child('receipt_number').equal_to(data['receipt_number']).get() |
| |
| if existing: |
| return jsonify({'error': f"Receipt #{data['receipt_number']} already exists"}), 400 |
|
|
| total = float(data.get('total', 0)) |
| if total > user_data['remaining_cash']: |
| return jsonify({'error': 'Insufficient funds'}), 400 |
|
|
| |
| image_url = None |
| if image_bytes and not manual: |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg') |
| blob.upload_from_string(image_bytes, content_type='image/jpeg') |
| image_url = blob.public_url |
|
|
| |
| new_remaining = user_data['remaining_cash'] - total |
| db.reference(f'users/{uid}').update({'remaining_cash': new_remaining}) |
|
|
| |
| transaction_data = { |
| 'uid': uid, |
| 'total': total, |
| 'items': data.get('items', []), |
| 'date': data.get('date'), |
| 'receipt_number': data.get('receipt_number'), |
| 'timestamp': datetime.now(pytz.UTC).isoformat(), |
| 'hash': file_hash, |
| 'image_url': image_url, |
| 'manual_entry': manual |
| } |
| |
| new_transaction_ref = transactions_ref.push(transaction_data) |
| return jsonify({ |
| 'success': True, |
| 'transaction': {**transaction_data, 'id': new_transaction_ref.key}, |
| 'remaining_cash': new_remaining |
| }) |
|
|
| |
| |
| |
|
|
| @app.route('/api/user/spending-overview', methods=['GET']) |
| def get_spending_overview(): |
| try: |
| uid = verify_token(request.headers.get('Authorization', '').split(' ')[1]) |
| transactions_ref = db.reference('transactions') |
| transactions = transactions_ref.order_by_child('uid').equal_to(uid).get() |
| |
| df = pd.DataFrame(transactions.values()) |
| if df.empty: |
| return jsonify({'error': 'No transactions found'}), 404 |
|
|
| df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y', errors='coerce') |
| daily_spending = df.groupby(df['date'].dt.date)['total'].sum().reset_index() |
| |
| return jsonify({ |
| 'daily_spending': daily_spending.to_dict(orient='records'), |
| 'recent_transactions': df.sort_values(by='timestamp', ascending=False) |
| .head(10) |
| .to_dict(orient='records') |
| }) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/admin/global-overview', methods=['GET']) |
| def get_global_overview(): |
| try: |
| verify_admin(request.headers.get('Authorization', '')) |
| |
| transactions_ref = db.reference('transactions') |
| users_ref = db.reference('users') |
| |
| transactions = pd.DataFrame(transactions_ref.get().values()) |
| users = pd.DataFrame(users_ref.get().values()) |
| |
| merged = pd.merge(transactions, users, left_on='uid', right_on='uid') |
| |
| return jsonify({ |
| 'user_spending': merged.groupby('uid')['total'].sum().to_dict(), |
| 'all_transactions': transactions.sort_values(by='timestamp', ascending=False) |
| .to_dict(orient='records') |
| }) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| |
| def verify_admin(auth_header): |
| if not auth_header.startswith('Bearer '): |
| raise ValueError('Invalid token') |
| |
| token = auth_header.split(' ')[1] |
| uid = verify_token(token) |
| user = auth.get_user(uid) |
| |
| if not user.custom_claims or not user.custom_claims.get('admin'): |
| raise PermissionError('Admin access required') |
|
|
| |
| @app.route('/api/user/profile', methods=['GET']) |
| def get_user_profile(): |
| try: |
| uid = verify_token(request.headers.get('Authorization', '').split(' ')[1]) |
| user = auth.get_user(uid) |
| user_data = db.reference(f'users/{uid}').get() |
| |
| return jsonify({ |
| 'uid': uid, |
| 'email': user.email, |
| 'daily_cash': user_data.get('daily_cash', 100), |
| 'remaining_cash': user_data.get('remaining_cash', 100), |
| 'last_reset': user_data.get('last_reset'), |
| 'is_admin': user_data.get('is_admin', False) |
| }) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| if __name__ == '__main__': |
| app.run(debug=True, host="0.0.0.0", port=7860) |
|
|