| import os |
| import io |
| import json |
| import hashlib |
| from datetime import datetime, time, timedelta |
| from PIL import Image |
| import pytz |
| from flask import Flask, request, jsonify, send_file |
| from flask_cors import CORS |
| import google.generativeai as genai |
| import firebase_admin |
| from firebase_admin import credentials, db, storage, auth |
| import pandas as pd |
| import requests |
| from urllib.parse import urlparse, unquote |
|
|
# Flask application; CORS is enabled with flask_cors defaults.
app = Flask(__name__)
CORS(app)

# Service-account key for the Firebase Admin SDK. The path can be overridden
# with FIREBASE_CREDENTIALS_PATH; the default is the key file that used to be
# hard-coded here, so existing deployments behave as before.
cred = credentials.Certificate(
    os.environ.get(
        'FIREBASE_CREDENTIALS_PATH',
        'mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json',
    )
)
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
    'storageBucket': 'mydateproject-e7994.firebasestorage.app',
})

# Default Cloud Storage bucket configured above; used for receipt images.
bucket = storage.bucket()
# Gemini API key. A missing 'Gemini' variable raises KeyError at import time.
api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')
|
|
|
|
| |
def configure_gemini(model_name='gemini-2.0-flash-thinking-exp'):
    """Configure the Gemini client and return a generative model.

    Args:
        model_name: Identifier of the Gemini model to instantiate. The default
            is the model this module has always requested, so existing
            zero-argument calls are unaffected.

    Returns:
        A ``genai.GenerativeModel`` for ``model_name``.
    """
    genai.configure(api_key=api_key)
    return genai.GenerativeModel(model_name)
|
|
def verify_token(token):
    """Return the UID carried by a Firebase ID token, or ``None``.

    Args:
        token: The raw ID token string (without the ``Bearer`` prefix).

    Returns:
        The ``uid`` claim of the verified token, or ``None`` when the token is
        empty or verification fails for any reason.
    """
    # An empty token can never verify; skip the SDK call entirely.
    if not token:
        return None
    try:
        decoded_token = auth.verify_id_token(token)
        return decoded_token['uid']
    except Exception as exc:
        # Callers treat None as "unauthenticated"; log the reason (never the
        # token itself) instead of discarding it silently.
        print(f"Token verification failed: {exc}")
        return None
|
|
def check_daily_reset(user_ref):
    """Restore the user's daily allowance once per day after 08:00 UTC.

    Args:
        user_ref: Database reference to the user's record. The record is
            expected to hold ``daily_cash`` and, optionally, an ISO-8601
            ``last_reset`` timestamp.

    Returns:
        ``True`` when ``remaining_cash`` was reset to ``daily_cash`` and
        ``last_reset`` was stamped with the current time; ``False`` when no
        reset was due, the record is missing, or any error occurred.
    """
    try:
        user_data = user_ref.get()
        if not user_data:
            # Nothing to reset for a user without a record.
            return False

        now = datetime.now(pytz.UTC)
        last_reset = datetime.fromisoformat(
            user_data.get('last_reset', '2000-01-01T00:00:00+00:00')
        )
        # Compare calendar days in the same zone as ``now``: treat a naive
        # timestamp as UTC and convert an aware one to UTC.
        if last_reset.tzinfo is None:
            last_reset = last_reset.replace(tzinfo=pytz.UTC)
        else:
            last_reset = last_reset.astimezone(pytz.UTC)

        if now.time() >= time(8, 0) and last_reset.date() < now.date():
            user_ref.update({
                'remaining_cash': user_data['daily_cash'],
                'last_reset': now.isoformat(),
            })
            return True
        return False
    except Exception as e:
        print(f"Reset error: {str(e)}")
        return False
| |
| |
def process_receipt(model, image):
    """Ask the model whether ``image`` is a receipt and to extract its fields.

    Args:
        model: Object exposing ``generate_content``.
        image: The image passed to the model alongside the prompt.

    Returns:
        The text of the model's response. If the call or reading the response
        text raises, the error is printed and the string ``"{}"`` is returned.
    """
    instructions = (
        "Analyze this image and determine if it's a receipt. If it is a receipt, extract:\n"
        "- Total amount (as float)\n"
        "- List of items purchased (array of strings)\n"
        "- Date of transaction (DD/MM/YYYY format)\n"
        "- Receipt number (as string)\n"
        "Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number.\n"
        'If not a receipt, return {"is_receipt": false}'
    )

    try:
        return model.generate_content([instructions, image]).text
    except Exception as err:
        print(f"Gemini error: {str(err)}")
        return "{}"
|
|
|
|
| |
@app.route('/api/write-report', methods=['POST'])
def generate_report():
    """Generate a plain-text spending report from posted transaction data.

    Expects a JSON body with a ``transactions`` key; its value is serialised
    and sent to Gemini together with a fixed analyst prompt.

    Returns:
        ``{"report": <text>}`` on success, a 400 JSON error when the body is
        missing, not JSON, or lacks ``transactions``, and a 500 JSON error
        when the model call fails.
    """
    # NOTE(review): this route performs no authentication, unlike the
    # /api/admin/* routes in this file - confirm that is intended.
    prompt = (
        "You are the TrueSpend AI analyst, analyze this transaction data\n"
        "and write an insightful business report on the spending habits of the employees.\n"
        "Make sure the report is in plain text"
    )

    # silent=True yields None instead of raising on a bad or absent body, so
    # client mistakes are reported as 400 rather than falling through to 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'transactions' not in payload:
        return jsonify({"error": "transactions is required"}), 400

    try:
        model = configure_gemini()
        transaction_json_string = json.dumps(payload['transactions'])
        response = model.generate_content([prompt, transaction_json_string])
        return jsonify({"report": response.text})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
@app.route('/api/process-receipt', methods=['POST'])
def process_receipt_endpoint():
    """Record a purchase for the authenticated user.

    The caller is identified by the Firebase ID token in the
    ``Authorization: Bearer <token>`` header. When the form field
    ``manual_entry`` equals ``'true'`` the request is delegated to
    ``handle_manual_entry``. Otherwise the uploaded ``receipt`` file is
    de-duplicated by MD5 hash, analysed by Gemini, and the parsed result is
    stored through ``validate_and_save_transaction``.

    Returns:
        A Flask JSON response: 401 for a missing or invalid token, 404 when
        the user has no database record, 400 for a missing, duplicate,
        unreadable, unparseable or non-receipt upload, 500 for unexpected
        failures; otherwise the response produced by the helper that stored
        the transaction.
    """
    try:
        # partition() never raises, so a header without "Bearer " yields an
        # empty token (401) instead of an IndexError (500).
        auth_header = request.headers.get('Authorization') or ''
        token = auth_header.partition('Bearer ')[2].strip()
        uid = verify_token(token) if token else None

        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            # Without a record there is no balance to charge; fail before any
            # image is analysed or uploaded.
            return jsonify({'error': 'User not found'}), 404

        # NOTE(review): check_daily_reset() is defined in this module but is
        # not called on this path - confirm whether the daily allowance reset
        # is meant to run here.

        if request.form.get('manual_entry') == 'true':
            return handle_manual_entry(uid, user_ref, user_data)

        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400

        image_bytes = file.read()
        file_hash = hashlib.md5(image_bytes).hexdigest()

        # Reject an image whose exact bytes were already stored.
        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        try:
            image = Image.open(io.BytesIO(image_bytes))
        except OSError:
            # Pillow raises an OSError subclass for data it cannot identify.
            return jsonify({'error': 'Uploaded file is not a valid image'}), 400

        model = configure_gemini()
        result_text = process_receipt(model, image)

        try:
            # The model may wrap the JSON in prose or code fences; keep only
            # the outermost {...} span.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400

        if not isinstance(data, dict) or not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            file_hash=file_hash,
            image_bytes=image_bytes,
            manual=False,
        )

    except Exception as e:
        print(e)
        return jsonify({'error': str(e)}), 500
|
|
| |
def handle_manual_entry(uid, user_ref, user_data):
    """Store a transaction typed in by the user instead of scanned.

    Reads ``total``, ``items`` (comma-separated), ``date`` and
    ``receipt_number`` from the submitted form and forwards them to
    ``validate_and_save_transaction`` with ``manual=True`` and no image.

    Args:
        uid: UID of the authenticated user.
        user_ref: Database reference to the user's record (not used here;
            kept for the existing call signature).
        user_data: The user's record as read by the caller.

    Returns:
        The response from ``validate_and_save_transaction``, or a 400 JSON
        error when the total is missing or invalid or saving raises.
    """
    import math

    try:
        raw_total = request.form.get('total')
        if raw_total is None or not raw_total.strip():
            return jsonify({'error': 'total is required'}), 400

        total = float(raw_total)
        # float() accepts "nan", "inf" and negative numbers; a negative total
        # would increase the user's remaining cash, so reject all three.
        if not math.isfinite(total) or total < 0:
            return jsonify({'error': 'total must be a non-negative number'}), 400

        data = {
            'total': total,
            'items': [item.strip() for item in request.form.get('items', '').split(',')],
            'date': request.form.get('date'),
            'receipt_number': request.form.get('receipt_number'),
            'is_receipt': True,
        }

        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            # There is no file to hash, so derive a placeholder hash from the
            # current time.
            file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(),
            image_bytes=None,
            manual=True,
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 400
|
|
def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Validate a parsed receipt, charge the user's balance, and store it.

    Args:
        uid: UID of the user the transaction belongs to.
        user_data: The user's database record; must contain ``remaining_cash``.
        data: Parsed receipt fields (``total``, ``items``, ``date``,
            ``receipt_number``).
        file_hash: Hash stored with the transaction for de-duplication.
        image_bytes: Raw receipt image, or ``None`` for manual entries.
        manual: ``True`` when the data was typed in by the user; no image is
            uploaded in that case.

    Returns:
        A Flask JSON response: the stored transaction and the new balance on
        success, or a 400 error for an invalid total, a user record without a
        balance, or a duplicate receipt number with identical items.
    """
    import math

    # Validate everything before any side effect (upload, balance update).
    if not user_data or user_data.get('remaining_cash') is None:
        return jsonify({'error': 'User has no remaining_cash balance'}), 400

    try:
        # A missing or null total is treated as 0.
        total = float(data.get('total') or 0)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid total amount'}), 400
    # NaN/inf would corrupt the balance and a negative total would credit it.
    if not math.isfinite(total) or total < 0:
        return jsonify({'error': 'Invalid total amount'}), 400

    transactions_ref = db.reference('transactions')
    receipt_number = data.get('receipt_number')
    items = data.get('items') or []

    # Duplicate = same receipt number AND same set of items. The query is
    # skipped when there is no receipt number because equal_to() rejects None.
    if receipt_number is not None:
        matches = transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get()
        if matches:
            current_items = sorted(items)
            for existing_transaction_data in matches.values():
                if sorted(existing_transaction_data.get('items', [])) == current_items:
                    return jsonify({'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"}), 400

    image_url = None
    if image_bytes and not manual:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    # NOTE(review): the balance is computed from the record the caller read
    # earlier in the request, so two concurrent uploads can overwrite each
    # other's deduction - consider a database transaction.
    new_remaining = user_data['remaining_cash'] - total
    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    transaction_data = {
        'uid': uid,
        'total': total,
        'items': items,
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'image_url': image_url,
        'manual_entry': manual,
    }

    new_transaction_ref = transactions_ref.push(transaction_data)
    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining,
    })
|
|
| |
| |
| |
|
|
@app.route('/api/user/spending-overview', methods=['GET'])
def get_spending_overview():
    """Return per-day spending totals and the ten latest transactions.

    The caller is identified by the ID token in the ``Authorization`` header.
    Transactions missing any required field, or whose ``date`` is not in
    ``DD/MM/YYYY`` form, are excluded.

    Returns:
        JSON with ``daily_spending`` (records of ``date`` and ``total``) and
        ``recent_transactions`` (up to ten records, newest ``timestamp``
        first). Both lists are empty when nothing qualifies. 401 for a
        missing or invalid token, 500 for unexpected failures.
    """
    empty_overview = {'daily_spending': [], 'recent_transactions': []}
    required = ['uid', 'total', 'items', 'date', 'receipt_number',
                'timestamp', 'hash', 'manual_entry']
    try:
        # partition() tolerates a missing or malformed header, which then
        # results in a 401 rather than an IndexError-driven 500.
        token = request.headers.get('Authorization', '').partition(' ')[2].strip()
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        transactions = db.reference('transactions').order_by_child('uid').equal_to(uid).get()
        if not transactions:
            return jsonify(empty_overview)

        df = pd.DataFrame.from_dict(transactions, orient='index')
        # A required column that no transaction carries is absent from the
        # frame; dropna(subset=...) would raise KeyError, and every row would
        # be dropped anyway.
        if df.empty or any(column not in df.columns for column in required):
            return jsonify(empty_overview)

        df = df.dropna(subset=required)
        if df.empty:
            return jsonify(empty_overview)

        df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y', errors='coerce')
        df = df.dropna(subset=['date'])
        if df.empty:
            return jsonify(empty_overview)

        daily_spending = df.groupby(df['date'].dt.date)['total'].sum().reset_index()
        recent = df.sort_values(by='timestamp', ascending=False).head(10)

        # Optional fields (e.g. image_url) are NaN for rows that lack them;
        # NaN is not valid JSON, so emit null instead.
        recent_records = [
            {
                key: (None if isinstance(value, float) and pd.isna(value) else value)
                for key, value in record.items()
            }
            for record in recent.to_dict(orient='records')
        ]

        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': recent_records,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
| |
| |
| |
def verify_admin(auth_header):
    """Authenticate the caller and require the ``is_admin`` flag.

    After the database check succeeds, the ``admin`` custom claim is set on
    the user's auth account.

    Args:
        auth_header: Value of the ``Authorization`` header; must start with
            ``"Bearer "``.

    Returns:
        The UID of the verified administrator.

    Raises:
        ValueError: The header is missing or is not a Bearer header.
        PermissionError: The token is invalid, the user is not flagged
            ``is_admin`` in the database, or setting the custom claim failed.
    """
    if not auth_header or not auth_header.startswith('Bearer '):
        raise ValueError('Invalid token')

    # Everything after the scheme is the token; strip() tolerates extra
    # whitespace around it.
    token = auth_header.partition(' ')[2].strip()
    uid = verify_token(token)
    if not uid:
        raise PermissionError('Invalid user')

    user_data = db.reference(f'users/{uid}').get()
    if not user_data or not user_data.get('is_admin', False):
        raise PermissionError('Admin access required')

    try:
        auth.set_custom_user_claims(uid, {"admin": True})
        print(f"Custom admin claim set for user {uid}")
    except Exception as e:
        print(f"Error setting custom admin claim: {e}")
        # Chain the cause so the underlying SDK failure stays visible.
        raise PermissionError('Error setting admin claim, but admin verified') from e

    return uid
|
|
| |
| |
| |
@app.route('/api/admin/overview', methods=['GET'])
def get_admin_overview():
    """Return all users, all transactions, and aggregate counts for admins.

    Returns:
        JSON with ``users``, ``transactions`` and ``analytics``
        (``total_users``, ``total_transactions``, ``total_spent``). 401 when
        the Authorization header is not a Bearer header, 403 when the caller
        is not a verified admin, 500 for unexpected failures.
    """
    # Authenticate in its own try block so ValueError/PermissionError raised
    # by verify_admin map to auth statuses and cannot be confused with
    # errors from the data handling below.
    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    try:
        all_users = db.reference('users').get() or {}
        users_list = []
        for uid, user_data in all_users.items():
            try:
                email = auth.get_user(uid).email
            except Exception:
                # A database record may outlive its auth account.
                email = "Deleted User"
            users_list.append({
                'uid': uid,
                'email': email,
                'daily_cash': user_data.get('daily_cash', 0),
                'remaining_cash': user_data.get('remaining_cash', 0),
                'last_reset': user_data.get('last_reset'),
                'is_admin': user_data.get('is_admin', False),
            })

        all_transactions = db.reference('transactions').get() or {}
        transactions_list = [{'id': tid, **data} for tid, data in all_transactions.items()]

        return jsonify({
            'users': users_list,
            'transactions': transactions_list,
            'analytics': {
                'total_users': len(users_list),
                'total_transactions': len(transactions_list),
                # A transaction without a total counts as 0 instead of
                # failing the whole overview.
                'total_spent': sum(t.get('total') or 0 for t in transactions_list),
            },
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/api/admin/users', methods=['POST'])
def create_user():
    """Create an auth account and its database record (admin only).

    Expects a JSON body with ``email`` and ``password`` and, optionally,
    ``daily_cash`` (default 0) and ``is_admin`` (default ``False``).

    Returns:
        201 with the created user on success. 401/403 when admin
        verification fails, 400 for a missing or invalid body or when
        creation fails.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 400

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('email') or not data.get('password'):
        return jsonify({'error': 'email and password are required'}), 400

    try:
        # Store the allowance as a number so later balance arithmetic on
        # remaining_cash cannot hit a string.
        daily_cash = float(data.get('daily_cash', 0))

        user = auth.create_user(
            email=data['email'],
            password=data['password'],
        )

        user_data = {
            'daily_cash': daily_cash,
            'remaining_cash': daily_cash,
            'last_reset': '2025-01-01T00:00:00+00:00',
            'is_admin': data.get('is_admin', False),
        }
        try:
            db.reference(f'users/{user.uid}').set(user_data)
        except Exception:
            # Do not leave an auth account behind without a database record.
            auth.delete_user(user.uid)
            raise

        return jsonify({
            'success': True,
            'user': {
                'uid': user.uid,
                'email': user.email,
                **user_data,
            },
        }), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 400
|
|
@app.route('/api/admin/users/<string:uid>/limit', methods=['PUT'])
def update_user_limit(uid):
    """Set a user's daily allowance (admin only).

    Expects a JSON body with ``daily_cash``. When the user's current
    ``remaining_cash`` exceeds the new limit it is lowered to the limit.

    Returns:
        JSON with ``new_daily_cash`` and ``updated_remaining`` on success.
        401/403 when admin verification fails, 404 for an unknown user,
        400 for a missing or invalid ``daily_cash`` or any other failure.
    """
    import math

    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 400

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'daily_cash' not in data:
        return jsonify({'error': 'daily_cash is required'}), 400

    try:
        new_limit = float(data['daily_cash'])
        # float() accepts NaN/inf, which would corrupt the stored balance.
        if not math.isfinite(new_limit) or new_limit < 0:
            return jsonify({'error': 'daily_cash must be a non-negative number'}), 400

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        updates = {'daily_cash': new_limit}
        current_remaining = user_data.get('remaining_cash', new_limit)
        if current_remaining > new_limit:
            # Never leave more spendable cash than the new allowance.
            updates['remaining_cash'] = new_limit

        user_ref.update(updates)

        return jsonify({
            'success': True,
            'new_daily_cash': new_limit,
            'updated_remaining': updates.get('remaining_cash', current_remaining),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400
|
|
| |
| |
| |
|
|
@app.route('/api/admin/users/<string:uid>/remaining-cash', methods=['PUT'])
def update_remaining_cash(uid):
    """Overwrite a user's remaining cash (admin only).

    Expects a JSON body with ``remaining_cash``.

    Returns:
        JSON with the stored ``remaining_cash`` on success. 401/403 when
        admin verification fails, 404 for an unknown user, 400 for a missing
        or invalid value or any other failure.
    """
    import math

    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 400

    # silent=True returns None for a missing or non-JSON body, so the
    # membership test below cannot raise on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'remaining_cash' not in data:
        return jsonify({'error': 'remaining_cash is required'}), 400

    try:
        new_remaining_cash = float(data['remaining_cash'])
        # NaN/inf are accepted by float() but are not usable balances.
        if not math.isfinite(new_remaining_cash):
            return jsonify({'error': 'remaining_cash must be a finite number'}), 400

        user_ref = db.reference(f'users/{uid}')
        if not user_ref.get():
            return jsonify({'error': 'User not found'}), 404

        user_ref.update({'remaining_cash': new_remaining_cash})
        return jsonify({'success': True, 'remaining_cash': new_remaining_cash})
    except Exception as e:
        return jsonify({'error': str(e)}), 400
|
|
|
|
@app.route('/api/admin/users/<string:uid>/reset-password', methods=['PUT'])
def admin_reset_password(uid):
    """Set a new password for a user's auth account (admin only).

    Expects a JSON body with ``new_password``.

    Returns:
        A success message on completion. 401/403 when admin verification
        fails, 404 when the auth account does not exist, 400 for a missing
        password or any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 400

    # A missing or non-JSON body is handled like a missing field.
    data = request.get_json(silent=True)
    new_password = data.get('new_password') if isinstance(data, dict) else None
    if not new_password:
        return jsonify({'error': 'new_password is required'}), 400

    try:
        auth.update_user(uid, password=new_password)
    except auth.UserNotFoundError:
        return jsonify({'error': 'User not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 400

    return jsonify({'success': True, 'message': 'Password reset successfully'})
|
|
| |
| |
| |
|
|
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's email and allowance details.

    Returns:
        JSON with ``uid``, ``email``, ``daily_cash``, ``remaining_cash``,
        ``last_reset`` and ``is_admin``. 401 for a missing or invalid token,
        404 when the user has no database record, 500 for unexpected
        failures.
    """
    try:
        # partition() tolerates a missing or malformed header, which then
        # results in a 401 rather than an IndexError.
        token = request.headers.get('Authorization', '').partition(' ')[2].strip()
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        user_data = db.reference(f'users/{uid}').get()
        if not user_data:
            return jsonify({'error': 'User profile not found'}), 404

        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 0),
            'remaining_cash': user_data.get('remaining_cash', 0),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False),
        })
    except Exception as e:
        # Report only the error. The handler must not repeat the lookups that
        # just failed, nor echo the stored user record back to the client.
        return jsonify({'error': str(e)}), 500
|
|
| |
| |
| |
|
|
def get_blob_from_image_url(image_url):
    """Derive the storage object path from a stored image URL.

    Two URL shapes are recognised:

    * ``https://storage.googleapis.com/<bucket>/<object path>``
    * any host with the path ``/v0/b/<this bucket>/o/<encoded object path>``

    Args:
        image_url: URL saved with a transaction.

    Returns:
        The percent-decoded object path, or ``None`` when the URL matches
        neither shape.
    """
    parsed = urlparse(image_url)

    if parsed.netloc == "storage.googleapis.com":
        parts = parsed.path.strip("/").split("/", 1)
        if len(parts) == 2:
            # parts[0] is the bucket name. Public URLs percent-encode the
            # object name, so decode it to get the real blob path.
            return unquote(parts[1])

    prefix = f"/v0/b/{bucket.name}/o/"
    if parsed.path.startswith(prefix):
        return unquote(parsed.path[len(prefix):])

    return None
|
|
def _fetch_receipt_image(transaction_id, action):
    """Load the receipt image bytes stored for a transaction.

    Args:
        transaction_id: Key of the transaction under ``transactions/``.
        action: Word used in the log line (``'view'`` or ``'download'``).

    Returns:
        ``(image_bytes, None)`` on success, or ``(None, error)`` where
        ``error`` is a ``(response, status)`` pair ready to be returned from
        a route.
    """
    transaction_data = db.reference(f'transactions/{transaction_id}').get()
    if not transaction_data:
        return None, (jsonify({'error': 'Transaction not found'}), 404)

    image_url = transaction_data.get('image_url')
    if not image_url:
        return None, (jsonify({'error': 'No receipt image found for this transaction'}), 404)

    blob_path = get_blob_from_image_url(image_url)
    if not blob_path:
        return None, (jsonify({'error': 'Could not determine blob path from URL'}), 500)

    print(f"Blob path for {action}: {blob_path}")
    blob = bucket.blob(blob_path)
    if not blob.exists():
        print("Blob does not exist at path:", blob_path)
        return None, (jsonify({'error': 'Blob not found'}), 404)

    signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
    # A timeout keeps a stalled storage request from blocking the worker.
    r = requests.get(signed_url, timeout=30)
    if r.status_code != 200:
        return None, (jsonify({'error': 'Unable to fetch image from storage'}), 500)

    return r.content, None


@app.route('/api/admin/receipt/<string:transaction_id>/view', methods=['GET'])
def view_receipt(transaction_id):
    """Stream a transaction's receipt image inline (admin only).

    Returns:
        The image as ``image/jpeg``. 401/403 when admin verification fails,
        404 when the transaction, its image URL or the blob is missing,
        500 for storage or unexpected failures.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    try:
        content, error = _fetch_receipt_image(transaction_id, 'view')
        if error is not None:
            return error
        return send_file(io.BytesIO(content), mimetype='image/jpeg')
    except Exception as e:
        print(f"View receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/admin/receipt/<string:transaction_id>/download', methods=['GET'])
def download_receipt(transaction_id):
    """Send a transaction's receipt image as an attachment (admin only).

    Returns:
        The image as an ``image/jpeg`` attachment named ``receipt.jpg``.
        401/403 when admin verification fails, 404 when the transaction, its
        image URL or the blob is missing, 500 for storage or unexpected
        failures.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    try:
        content, error = _fetch_receipt_image(transaction_id, 'download')
        if error is not None:
            return error
        return send_file(
            io.BytesIO(content),
            mimetype='image/jpeg',
            as_attachment=True,
            # send_file's attachment_filename argument was renamed to
            # download_name in Flask 2.0 and later removed.
            download_name='receipt.jpg',
        )
    except Exception as e:
        print(f"Download receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
| |
| |
| |
|
|
@app.route('/api/admin/users/<string:uid>', methods=['DELETE'])
def delete_user(uid):
    """Delete a user's auth account, database record and transactions (admin only).

    Returns:
        A success message on completion. 401/403 when admin verification
        fails, 404 when the auth account does not exist, 500 for unexpected
        failures.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
    except ValueError as e:
        return jsonify({'error': str(e)}), 401
    except PermissionError as e:
        return jsonify({'error': str(e)}), 403
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    try:
        # Existence check only; the returned record is not needed.
        try:
            auth.get_user(uid)
        except auth.UserNotFoundError:
            return jsonify({'error': 'User not found'}), 404

        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()

        transactions_ref = db.reference('transactions')
        user_transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if user_transactions:
            for transaction_id in user_transactions:
                transactions_ref.child(transaction_id).delete()

        return jsonify({
            'success': True,
            'message': f'User {uid} and all associated data deleted successfully',
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default; enable it explicitly with FLASK_DEBUG=1 during
    # local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)
|
|