Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import hashlib | |
| from datetime import datetime, time, timedelta | |
| from PIL import Image | |
| import pytz | |
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| import google.generativeai as genai | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| import pandas as pd | |
| import requests | |
| from urllib.parse import urlparse, unquote | |
# Flask application; CORS is enabled for every route.
app = Flask(__name__)
CORS(app)

# Firebase Admin SDK: service-account credentials plus the Realtime Database
# URL and the default Cloud Storage bucket used for receipt images.
cred = credentials.Certificate(
    'mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json'
)
firebase_admin.initialize_app(
    cred,
    {
        'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
        'storageBucket': 'mydateproject-e7994.firebasestorage.app',
    },
)
bucket = storage.bucket()

# The 'Gemini' variable is mandatory (KeyError at import time when unset);
# FIREBASE_API_KEY is optional and is None when unset.
api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')
| # Helper functions | |
def configure_gemini():
    """Configure the Gemini SDK with the module-level key and return a model handle."""
    genai.configure(api_key=api_key)
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    return gemini_model
def verify_token(token):
    """Return the uid carried by a Firebase ID token, or None if verification fails.

    Any exception raised while verifying or reading the ``uid`` claim is
    swallowed and reported as ``None``.
    """
    try:
        claims = auth.verify_id_token(token)
        uid = claims['uid']
    except Exception:
        return None
    return uid
def check_daily_reset(user_ref):
    """Restore the user's remaining cash to the daily allowance when a reset is due.

    A reset is due when the current UTC time is 08:00 or later and the stored
    ``last_reset`` falls on an earlier calendar date than now.

    Args:
        user_ref: Database reference exposing ``get()`` and ``update()``.

    Returns:
        True when the record was reset, False otherwise (including on any
        error, which is printed rather than raised).
    """
    try:
        record = user_ref.get()
        current = datetime.now(pytz.UTC)
        previous = datetime.fromisoformat(
            record.get('last_reset', '2000-01-01T00:00:00+00:00')
        )

        past_cutoff = current.time() >= time(8, 0)
        new_day = previous.date() < current.date()
        if not (past_cutoff and new_day):
            return False

        user_ref.update({
            'remaining_cash': record['daily_cash'],
            'last_reset': current.isoformat(),
        })
        return True
    except Exception as e:
        print(f"Reset error: {str(e)}")
        return False
| # Process receipt image | |
def process_receipt(model, image):
    """Ask the model to classify an image and extract receipt fields.

    Args:
        model: Object exposing ``generate_content(parts)`` whose result has a
            ``text`` attribute.
        image: The image passed to the model alongside the prompt.

    Returns:
        The raw response text, or the string ``"{}"`` when the call fails
        (the error is printed, not raised).
    """
    instructions = """Analyze this image and determine if it's a receipt. If it is a receipt, extract:
- Total amount (as float)
- List of items purchased (array of strings)
- Date of transaction (DD/MM/YYYY format)
- Receipt number (as string)
Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number.
If not a receipt, return {"is_receipt": false}"""
    try:
        reply = model.generate_content([instructions, image])
        reply_text = reply.text
    except Exception as e:
        print(f"Gemini error: {str(e)}")
        return "{}"
    return reply_text
| # Write report | |
def generate_report():
    """Generate a plain-text spending report from posted transaction data.

    Reads the global Flask ``request``; the JSON body must be an object with
    a ``transactions`` key, which is serialized and sent to Gemini together
    with the analyst prompt.

    Returns:
        ``{"report": <text>}`` on success, a 400 JSON error when the body is
        missing or malformed, and a 500 JSON error when model setup or
        generation fails.
    """
    # NOTE(review): no authentication check is visible in this function —
    # confirm the route is protected elsewhere.
    prompt = """You are the TrueSpend AI analyst, analyze this transaction data
and write an insightful business report on the spending habits of the employees.
Make sure the report is in plain text"""

    # silent=True returns None instead of raising on a bad or missing body,
    # so client mistakes become a 400 rather than a generic 500.
    transaction_data = request.get_json(silent=True)
    if not isinstance(transaction_data, dict) or 'transactions' not in transaction_data:
        return jsonify({"error": "Request body must be JSON with a 'transactions' key"}), 400

    try:
        # Model setup is inside the try so configuration failures also
        # produce the JSON error payload.
        model = configure_gemini()
        transaction_json_string = json.dumps(transaction_data['transactions'])
        response = model.generate_content([prompt, transaction_json_string])
        return jsonify({"report": response.text})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
| # ======================================== | |
| # Authentication Endpoints | |
| # ======================================== | |
| # (Any existing authentication endpoints remain unchanged) | |
| # ======================================== | |
| # Receipt Processing Endpoint | |
| # ======================================== | |
def process_receipt_endpoint():
    """Handle a receipt submission (image upload or manual entry) for the caller.

    Flow: authenticate the bearer token, load the caller's ``users/<uid>``
    record, then either delegate to ``handle_manual_entry`` (form field
    ``manual_entry == 'true'``) or send the uploaded ``receipt`` file through
    Gemini and ``validate_and_save_transaction``.

    Returns:
        A Flask JSON response: 401 for a missing/invalid token, 404 when the
        caller has no database record, 400 for a missing file, an unreadable
        image, a duplicate upload (same MD5), unparseable model output or a
        non-receipt image, and 500 for any other failure.
    """
    try:
        # partition() never raises, so a header without 'Bearer ' ends in the
        # 401 below instead of an IndexError reported as a 500.
        auth_header = request.headers.get('Authorization') or ''
        _, marker, token = auth_header.partition('Bearer ')
        uid = verify_token(token) if marker and token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            # Without a record there is no balance to debit; fail before any
            # model call or upload happens.
            return jsonify({'error': 'User not found'}), 404

        # Handle manual entry
        if request.form.get('manual_entry') == 'true':
            return handle_manual_entry(uid, user_ref, user_data)

        # Handle image processing
        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400

        image_bytes = file.read()
        # The MD5 digest is only a fingerprint for duplicate detection.
        file_hash = hashlib.md5(image_bytes).hexdigest()

        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        try:
            image = Image.open(io.BytesIO(image_bytes))
        except OSError:
            # PIL signals unreadable/unsupported image data with OSError
            # subclasses; that is a client error, not a server fault.
            return jsonify({'error': 'Uploaded file is not a valid image'}), 400

        model = configure_gemini()
        result_text = process_receipt(model, image)

        try:
            # Keep only the outermost {...} span; the model may wrap the JSON
            # in extra text. An empty slice fails json.loads and is handled.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400

        if not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            file_hash=file_hash,
            image_bytes=image_bytes,
            manual=False
        )
    except Exception as e:
        print(e)
        return jsonify({'error': str(e)}), 500
def handle_manual_entry(uid, user_ref, user_data):
    """Build a transaction from manually entered form fields and save it.

    Reads ``total``, ``items`` (comma-separated), ``date`` and
    ``receipt_number`` from the global Flask ``request.form``.

    Args:
        uid: Id of the submitting user.
        user_ref: Database reference for the user (not used here; kept for
            the existing call signature).
        user_data: The user's database record, forwarded to
            ``validate_and_save_transaction``.

    Returns:
        The response from ``validate_and_save_transaction``, or a 400 JSON
        error when a field is missing/invalid or saving raises.
    """
    try:
        raw_items = request.form.get('items', '')
        data = {
            'total': float(request.form.get('total')),
            # Drop blank fragments so an empty field or a trailing comma does
            # not produce '' entries in the stored item list.
            'items': [
                name
                for name in (part.strip() for part in raw_items.split(','))
                if name
            ],
            'date': request.form.get('date'),
            'receipt_number': request.form.get('receipt_number'),
            'is_receipt': True
        }
        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            # There is no file to fingerprint, so the hash is derived from
            # the current timestamp instead.
            file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(),
            image_bytes=None,
            manual=True
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 400
def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Reject duplicate receipts, debit the user's balance and store the transaction.

    Args:
        uid: Id of the user the transaction belongs to.
        user_data: The user's record; ``remaining_cash`` is read from it.
        data: Extracted receipt fields (``total``, ``items``, ``date``,
            ``receipt_number``).
        file_hash: Fingerprint stored with the transaction.
        image_bytes: Raw receipt image, or None for manual entries.
        manual: True when the transaction was entered by hand.

    Returns:
        A Flask JSON response: the saved transaction and new balance on
        success, or a 400 error for an invalid total or a duplicate receipt.
    """
    transactions_ref = db.reference('transactions')
    receipt_number = data.get('receipt_number')
    # A null 'items' value is treated as an empty list so sorting cannot fail.
    items = data.get('items') or []

    # Validate the amount before any side effect (upload, balance update).
    try:
        total = float(data.get('total') or 0)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid total amount'}), 400

    # Check for duplicate transactions based on receipt number and items.
    # The query is skipped when there is no receipt number because
    # equal_to(None) raises ValueError.
    if receipt_number is not None:
        matches = transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get()
        if matches:
            current_items = sorted(items)
            for existing in matches.values():
                if sorted(existing.get('items', [])) == current_items:
                    return jsonify({'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"}), 400

    # Upload image if available
    image_url = None
    if image_bytes and not manual:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    # Update user cash - negative balances are allowed.
    # NOTE(review): the balance comes from the snapshot passed in, so two
    # concurrent submissions could overwrite each other — confirm whether a
    # database transaction is needed here.
    new_remaining = user_data['remaining_cash'] - total
    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    transaction_data = {
        'uid': uid,
        'total': total,
        'items': items,
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'image_url': image_url,
        'manual_entry': manual
    }
    new_transaction_ref = transactions_ref.push(transaction_data)
    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining
    })
| # ======================================== | |
| # Data Endpoints for Visualizations | |
| # ======================================== | |
def get_spending_overview():
    """Return the caller's per-day spending totals and ten latest transactions.

    Authenticates with the token in the ``Authorization`` header, loads the
    caller's transactions, discards rows missing any required field or whose
    ``date`` is not in DD/MM/YYYY form, then aggregates ``total`` by date.

    Returns:
        JSON with ``daily_spending`` and ``recent_transactions`` (both empty
        lists when there is nothing usable), a 401 JSON error for a missing
        or invalid token, or a 500 JSON error on unexpected failures.
    """
    empty_overview = {'daily_spending': [], 'recent_transactions': []}
    required = ['uid', 'total', 'items', 'date', 'receipt_number',
                'timestamp', 'hash', 'manual_entry']
    try:
        # Guard the split: a missing header must yield 401, not IndexError.
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        transactions_ref = db.reference('transactions')
        transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if not transactions:
            return jsonify(empty_overview)

        df = pd.DataFrame.from_dict(transactions, orient='index')
        # A required column absent from every record means no row is
        # complete; checking first avoids the KeyError dropna would raise.
        if df.empty or any(column not in df.columns for column in required):
            return jsonify(empty_overview)

        df.dropna(subset=required, inplace=True)
        if df.empty:
            return jsonify(empty_overview)

        df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y', errors='coerce')
        df.dropna(subset=['date'], inplace=True)
        if df.empty:
            return jsonify(empty_overview)

        daily_spending = df.groupby(df['date'].dt.date)['total'].sum().reset_index()

        recent = df.sort_values(by='timestamp', ascending=False).head(10)
        # Optional fields (e.g. image_url) are NaN for rows that lack them;
        # NaN is not valid JSON, so emit null instead.
        recent = recent.astype(object).where(recent.notna(), None)

        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': recent.to_dict(orient='records')
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # ======================================== | |
| # Admin verification (checks the is_admin flag stored in the database) | |
| # ======================================== | |
def verify_admin(auth_header):
    """Authenticate a bearer token and require the database ``is_admin`` flag.

    On success the ``admin`` custom claim is also set on the Firebase user.

    Args:
        auth_header: Value of the ``Authorization`` header.

    Returns:
        The uid of the verified admin.

    Raises:
        ValueError: The header is empty or does not start with ``'Bearer '``.
        PermissionError: The token is invalid, the user is not flagged as
            admin, or setting the custom claim failed.
    """
    if not (auth_header and auth_header.startswith('Bearer ')):
        raise ValueError('Invalid token')

    bearer_token = auth_header.split(' ')[1]
    uid = verify_token(bearer_token)
    if not uid:
        raise PermissionError('Invalid user')

    profile = db.reference(f'users/{uid}').get()
    flagged_admin = bool(profile) and profile.get('is_admin', False)
    if not flagged_admin:
        raise PermissionError('Admin access required')

    try:
        auth.set_custom_user_claims(uid, {"admin": True})
        print(f"Custom admin claim set for user {uid}")
    except Exception as e:
        print(f"Error setting custom admin claim: {e}")
        raise PermissionError('Error setting admin claim, but admin verified')
    return uid
| # ======================================== | |
| # Existing Admin Endpoints | |
| # ======================================== | |
def get_admin_overview():
    """Return every user, every transaction and summary analytics for admins.

    Requires an admin bearer token (checked by ``verify_admin``). Each user
    record is paired with the e-mail address from Firebase Auth; users whose
    auth lookup fails are labelled ``"Deleted User"``.

    Returns:
        JSON with ``users``, ``transactions`` and ``analytics`` (user count,
        transaction count, summed totals), or a 500 JSON error on failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        users_ref = db.reference('users')
        all_users = users_ref.get() or {}
        users_list = []
        for uid, user_data in all_users.items():
            try:
                email = auth.get_user(uid).email
            except Exception:
                # Narrowed from a bare except so KeyboardInterrupt/SystemExit
                # are no longer swallowed; lookup failures keep the label.
                email = "Deleted User"
            users_list.append({
                'uid': uid,
                'email': email,
                'daily_cash': user_data.get('daily_cash', 0),
                'remaining_cash': user_data.get('remaining_cash', 0),
                'last_reset': user_data.get('last_reset'),
                'is_admin': user_data.get('is_admin', False)
            })

        transactions_ref = db.reference('transactions')
        all_transactions = transactions_ref.get() or {}
        transactions_list = [{'id': tid, **data} for tid, data in all_transactions.items()]

        # A record without a total counts as 0 instead of failing the whole
        # overview with a KeyError.
        total_spent = sum(t.get('total') or 0 for t in transactions_list)

        return jsonify({
            'users': users_list,
            'transactions': transactions_list,
            'analytics': {
                'total_users': len(users_list),
                'total_transactions': len(transactions_list),
                'total_spent': total_spent
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def create_user():
    """Create a Firebase Auth user and its database record (admin only).

    Reads ``email``, ``password`` and the optional ``daily_cash`` /
    ``is_admin`` fields from the JSON body. The new record starts with
    ``remaining_cash`` equal to ``daily_cash``.

    Returns:
        A 201 JSON response describing the new user, or a 400 JSON error if
        anything fails (including admin verification).
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()

        created = auth.create_user(
            email=payload['email'],
            password=payload['password'],
        )

        starting_cash = payload.get('daily_cash', 0)
        profile = {
            'daily_cash': starting_cash,
            'remaining_cash': starting_cash,
            'last_reset': '2025-01-01T00:00:00+00:00',
            'is_admin': payload.get('is_admin', False),
        }
        db.reference(f'users/{created.uid}').set(profile)

        body = {
            'success': True,
            'user': {'uid': created.uid, 'email': created.email, **profile},
        }
        return jsonify(body), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 400
def update_user_limit(uid):
    """Set a user's daily cash limit, capping the remaining cash at the new limit (admin only).

    Args:
        uid: Id of the user whose limit is changed.

    Returns:
        JSON with the new limit and the resulting remaining cash, a 404 JSON
        error when the user record does not exist, or a 400 JSON error on
        any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        new_limit = float(payload['daily_cash'])

        user_ref = db.reference(f'users/{uid}')
        record = user_ref.get()
        if not record:
            return jsonify({'error': 'User not found'}), 404

        # A record without remaining_cash is treated as sitting at the limit.
        current_remaining = record.get('remaining_cash', new_limit)
        updates = {'daily_cash': new_limit}
        if current_remaining > new_limit:
            updates['remaining_cash'] = new_limit
        user_ref.update(updates)

        result = {
            'success': True,
            'new_daily_cash': new_limit,
            'updated_remaining': updates.get('remaining_cash', current_remaining),
        }
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 400
| # ======================================== | |
| # New Admin Endpoints for Updating Remaining Cash, Setting Cash Limits, and Resetting Password | |
| # ======================================== | |
def update_remaining_cash(uid):
    """Overwrite a user's remaining cash with the posted value (admin only).

    Args:
        uid: Id of the user to update.

    Returns:
        JSON with the stored value, a 400 JSON error when ``remaining_cash``
        is missing or anything else fails, or a 404 JSON error when the user
        record does not exist.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        if 'remaining_cash' not in payload:
            return jsonify({'error': 'remaining_cash is required'}), 400

        amount = float(payload['remaining_cash'])
        user_ref = db.reference(f'users/{uid}')
        if not user_ref.get():
            return jsonify({'error': 'User not found'}), 404

        user_ref.update({'remaining_cash': amount})
        return jsonify({'success': True, 'remaining_cash': amount})
    except Exception as e:
        return jsonify({'error': str(e)}), 400
def admin_reset_password(uid):
    """Set a new password for the given Firebase Auth user (admin only).

    Args:
        uid: Id of the user whose password is replaced.

    Returns:
        A success JSON message, or a 400 JSON error when ``new_password`` is
        missing or any step fails.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))
        payload = request.get_json()
        replacement = payload.get('new_password')
        if replacement:
            auth.update_user(uid, password=replacement)
            return jsonify({'success': True, 'message': 'Password reset successfully'})
        return jsonify({'error': 'new_password is required'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 400
| # ======================================== | |
| # User management endpoint for profile | |
| # ======================================== | |
def get_user_profile():
    """Return the authenticated caller's profile and cash balances.

    Combines the e-mail address from Firebase Auth with the ``users/<uid>``
    database record.

    Returns:
        JSON with uid, email, daily/remaining cash, last reset and admin
        flag; a 401 JSON error for a missing or invalid token; a 404 JSON
        error when the database record is missing; a 500 JSON error on any
        other failure.
    """
    try:
        # Guard the split so a missing header yields 401 instead of raising.
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        user_data = db.reference(f'users/{uid}').get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 0),
            'remaining_cash': user_data.get('remaining_cash', 0),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False)
        })
    except Exception as e:
        # The handler must not repeat the calls that just failed (they would
        # raise again, unhandled) and must not echo the user record back.
        return jsonify({'error': str(e)}), 500
| # ======================================== | |
| # Receipt media endpoints | |
| # ======================================== | |
def get_blob_from_image_url(image_url):
    """Extract the storage object path from a receipt image URL.

    Two URL shapes are recognised:

    * ``https://storage.googleapis.com/<bucket>/<object path>``
    * any URL whose path starts with ``/v0/b/<this bucket>/o/`` followed by
      the URL-encoded object path

    Args:
        image_url: URL stored on the transaction.

    Returns:
        The decoded object path, or None when the URL matches neither shape.
    """
    parsed = urlparse(image_url)

    if parsed.netloc == "storage.googleapis.com":
        _bucket_name, separator, encoded_path = parsed.path.strip("/").partition("/")
        if separator:
            # The path is percent-encoded in the URL; decode it so object
            # names with spaces or other reserved characters resolve.
            return unquote(encoded_path)

    prefix = f"/v0/b/{bucket.name}/o/"
    if parsed.path.startswith(prefix):
        return unquote(parsed.path[len(prefix):])

    return None
def view_receipt(transaction_id):
    """Stream a transaction's receipt image inline (admin only).

    Looks up the transaction, resolves its ``image_url`` to a storage object,
    fetches the object through a 10-minute signed URL and returns the bytes
    as ``image/jpeg``.

    Args:
        transaction_id: Key of the transaction under ``transactions/``.

    Returns:
        The image response, a 404 JSON error when the transaction, its image
        URL or the stored object is missing, or a 500 JSON error otherwise.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        transaction_ref = db.reference(f'transactions/{transaction_id}')
        transaction_data = transaction_ref.get()
        if not transaction_data:
            return jsonify({'error': 'Transaction not found'}), 404

        image_url = transaction_data.get('image_url')
        if not image_url:
            return jsonify({'error': 'No receipt image found for this transaction'}), 404

        blob_path = get_blob_from_image_url(image_url)
        if not blob_path:
            return jsonify({'error': 'Could not determine blob path from URL'}), 500

        print(f"Blob path for view: {blob_path}")
        blob = bucket.blob(blob_path)
        if not blob.exists():
            print("Blob does not exist at path:", blob_path)
            return jsonify({'error': 'Blob not found'}), 404

        signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
        # Bound the fetch so a stalled storage connection cannot hang the
        # request worker indefinitely.
        r = requests.get(signed_url, timeout=30)
        if r.status_code != 200:
            return jsonify({'error': 'Unable to fetch image from storage'}), 500

        return send_file(io.BytesIO(r.content), mimetype='image/jpeg')
    except Exception as e:
        print(f"View receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def download_receipt(transaction_id):
    """Send a transaction's receipt image as a ``receipt.jpg`` download (admin only).

    Looks up the transaction, resolves its ``image_url`` to a storage object,
    fetches the object through a 10-minute signed URL and returns the bytes
    as an attachment.

    Args:
        transaction_id: Key of the transaction under ``transactions/``.

    Returns:
        The attachment response, a 404 JSON error when the transaction, its
        image URL or the stored object is missing, or a 500 JSON error
        otherwise.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        transaction_ref = db.reference(f'transactions/{transaction_id}')
        transaction_data = transaction_ref.get()
        if not transaction_data:
            return jsonify({'error': 'Transaction not found'}), 404

        image_url = transaction_data.get('image_url')
        if not image_url:
            return jsonify({'error': 'No receipt image found for this transaction'}), 404

        blob_path = get_blob_from_image_url(image_url)
        if not blob_path:
            return jsonify({'error': 'Could not determine blob path from URL'}), 500

        print(f"Blob path for download: {blob_path}")
        blob = bucket.blob(blob_path)
        if not blob.exists():
            print("Blob does not exist at path:", blob_path)
            return jsonify({'error': 'Blob not found'}), 404

        signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
        # Bound the fetch so a stalled storage connection cannot hang the
        # request worker indefinitely.
        r = requests.get(signed_url, timeout=30)
        if r.status_code != 200:
            return jsonify({'error': 'Unable to fetch image from storage'}), 500

        try:
            # Flask 2.0 renamed attachment_filename to download_name and
            # Flask 2.2 removed the old keyword.
            return send_file(
                io.BytesIO(r.content),
                mimetype='image/jpeg',
                as_attachment=True,
                download_name='receipt.jpg'
            )
        except TypeError:
            # Flask releases before 2.0 only know the old keyword.
            return send_file(
                io.BytesIO(r.content),
                mimetype='image/jpeg',
                as_attachment=True,
                attachment_filename='receipt.jpg'
            )
    except Exception as e:
        print(f"Download receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500
| # ======================================== | |
| # Delete user endpoint | |
| # ======================================== | |
def delete_user(uid):
    """Delete a user's auth account, database record and transactions (admin only).

    Args:
        uid: Id of the user to remove.

    Returns:
        A success JSON message, a 404 JSON error when the auth user does not
        exist, or a 500 JSON error on any other failure.
    """
    try:
        verify_admin(request.headers.get('Authorization', ''))

        # Existence probe only; the returned user object is not needed.
        try:
            auth.get_user(uid)
        except auth.UserNotFoundError:
            return jsonify({'error': 'User not found'}), 404

        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()

        transactions_ref = db.reference('transactions')
        owned = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if owned:
            for transaction_id in owned.keys():
                transactions_ref.child(transaction_id).delete()

        confirmation = {
            'success': True,
            'message': f'User {uid} and all associated data deleted successfully',
        }
        return jsonify(confirmation)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution from the browser,
    # so it must not be on by default while listening on all interfaces.
    # Opt in explicitly with FLASK_DEBUG=1 (or "true") for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)