"""Flask backend for receipt capture, spending tracking and admin management.

Receipts are analysed with Gemini, images are stored in Firebase Storage and
users/transactions live in the Firebase Realtime Database.
"""
import hashlib
import io
import json
import os
from datetime import datetime, time, timedelta
from urllib.parse import unquote, urlparse

import firebase_admin
import google.generativeai as genai
import numpy as np
import pandas as pd
import pytz
import requests
from firebase_admin import auth, credentials, db, storage
from flask import Flask, jsonify, request, send_file
from flask_cors import CORS
from PIL import Image

app = Flask(__name__)
CORS(app)

# Firebase initialization
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
    'storageBucket': 'mydateproject-e7994.firebasestorage.app'
})
bucket = storage.bucket()

api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')

# Seconds to wait for Firebase Storage when proxying a receipt image.
STORAGE_FETCH_TIMEOUT = 30


# ========================================
# Helper functions
# ========================================
def configure_gemini():
    """Configure the Gemini client with the API key and return the model."""
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


def verify_token(token):
    """Return the uid encoded in a Firebase ID token, or None if it is invalid."""
    try:
        decoded_token = auth.verify_id_token(token)
        return decoded_token['uid']
    except Exception:
        # Any verification failure is treated as "not authenticated".
        return None


def _get_bearer_token(auth_header):
    """Return the credential part of an ``Authorization`` header, or None.

    The header is expected to look like ``"Bearer <token>"``. A missing or
    malformed header yields None instead of raising, so callers can answer
    with 401 rather than crashing with an IndexError.
    """
    if not auth_header:
        return None
    parts = auth_header.split()
    if len(parts) < 2:
        return None
    return parts[1]


def _authenticate_request():
    """Return the uid of the caller of the current request, or None."""
    token = _get_bearer_token(request.headers.get('Authorization'))
    return verify_token(token) if token else None


def check_daily_reset(user_ref):
    """Reset the user's remaining cash once per day after 08:00 UTC.

    Returns True when a reset was written, False otherwise (including when
    reading or updating the user record fails).
    """
    try:
        user_data = user_ref.get()
        now = datetime.now(pytz.UTC)
        last_reset = datetime.fromisoformat(
            user_data.get('last_reset', '2000-01-01T00:00:00+00:00')
        )
        if now.time() >= time(8, 0) and last_reset.date() < now.date():
            user_ref.update({
                'remaining_cash': user_data['daily_cash'],
                'last_reset': now.isoformat()
            })
            return True
        return False
    except Exception as e:
        print(f"Reset error: {str(e)}")
        return False


def _transaction_from_form():
    """Build the transaction payload from the current request's form fields.

    Raises TypeError/ValueError when ``total`` is missing or not numeric;
    callers turn that into an error response.
    """
    return {
        'total': float(request.form.get('total')),
        'items': [item.strip() for item in request.form.get('items', '').split(',')],
        'date': request.form.get('date'),
        'receipt_number': request.form.get('receipt_number'),
        'is_receipt': True
    }


# Process receipt image
def process_receipt(model, image):
    """Ask Gemini to extract receipt fields from ``image``.

    Returns the raw model text (expected to contain a JSON object), or the
    string ``"{}"`` when the model call fails.
    """
    prompt = (
        "Analyze this image and determine if it's a receipt. "
        "If it is a receipt, extract: "
        "- Total amount (as float) "
        "- List of items purchased (array of strings) "
        "- Date of transaction (DD/MM/YYYY format) "
        "- Receipt number (as string) "
        "Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number. \n"
        'If not a receipt, return {"is_receipt": false}'
    )
    try:
        response = model.generate_content([prompt, image])
        return response.text
    except Exception as e:
        print(f"Gemini error: {str(e)}")
        return "{}"


# Write report
# NOTE(review): this endpoint performs no authentication; confirm whether it
# should require an admin token like the other reporting endpoints.
@app.route('/api/write-report', methods=['POST'])
def generate_report():
    """Generate a plain-text spending report from the posted transactions."""
    prompt = (
        "You are the TrueSpend AI analyst, analyze this transaction data and write an "
        "insightful business report on the spending habits of the employees. "
        "Make sure the report is in plain text"
    )
    model = configure_gemini()
    try:
        transaction_data = request.get_json()
        transaction_json_string = json.dumps(transaction_data['transactions'])
        response = model.generate_content([prompt, transaction_json_string])
        report = response.text
        return jsonify({"report": report})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ========================================
# Authentication Endpoints
# ========================================
# (Any existing authentication endpoints remain unchanged)

# ========================================
# Image Receipt Processing Endpoint
# ========================================
@app.route('/api/process-receipt', methods=['POST'])
def process_receipt_endpoint():
    """Two-step receipt submission.

    First call (file upload): store the image, extract the fields with Gemini
    and return them for review. Second call (``confirmed=true``): save the
    reviewed fields as a transaction.
    """
    try:
        uid = _authenticate_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        # If the confirmation flag is set, use the form fields (and the hidden
        # file_hash/image_url returned by the first submission) to create the
        # transaction. The image was already uploaded, so image_bytes is None.
        if request.form.get('confirmed') == 'true':
            user_data = db.reference(f'users/{uid}').get()
            data = _transaction_from_form()
            data['image_url'] = request.form.get('image_url')
            file_hash = request.form.get('file_hash', '')
            return validate_and_save_transaction(
                uid=uid,
                user_data=user_data,
                data=data,
                file_hash=file_hash,
                image_bytes=None,
                manual=False
            )

        # Handle image processing (first submission)
        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400

        image_bytes = file.read()
        file_hash = hashlib.md5(image_bytes).hexdigest()

        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        # Immediately upload the image so that it is stored regardless of
        # review outcome.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

        # Process the image with Gemini
        image = Image.open(io.BytesIO(image_bytes))
        # JPEG has no alpha/palette support; convert such images (e.g. PNG
        # uploads) first, otherwise the save below raises.
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')

        # --- Image Compression ---
        compressed_buffer = io.BytesIO()
        image.save(compressed_buffer, format="JPEG", optimize=True, quality=90)
        compressed_image = Image.open(io.BytesIO(compressed_buffer.getvalue()))

        model = configure_gemini()
        result_text = process_receipt(model, compressed_image)

        try:
            # The model may wrap the JSON in extra text; keep the outermost {...}.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({
                'error': 'Failed to parse receipt data',
                'raw_response': result_text
            }), 400

        if not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        # Instead of saving immediately, return the extracted data along with
        # the image info for review.
        data['file_hash'] = file_hash
        data['image_url'] = image_url
        return jsonify({
            'success': True,
            'extracted': True,
            'data': data,
            'message': 'Review extracted information before confirming.'
        })
    except Exception as e:
        print(e)
        return jsonify({'error': str(e)}), 500


# ========================================
# Manual Entry Endpoint
# ========================================
@app.route('/api/manual-entry', methods=['POST'])
def manual_entry_endpoint():
    """Create a transaction from manually entered form fields."""
    try:
        uid = _authenticate_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401
        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        return handle_manual_entry(uid, user_ref, user_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


def handle_manual_entry(uid, user_ref, user_data):
    """Validate and save a manual entry; any failure is reported as 400."""
    try:
        data = _transaction_from_form()
        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            # Manual entries have no image, so derive a hash from the clock.
            file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(),
            image_bytes=None,
            manual=True
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 400


def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Reject duplicates, debit the user's remaining cash and store the transaction.

    Returns a Flask response (optionally paired with a status code).
    """
    if not user_data:
        # Without a user record there is no balance to debit.
        return jsonify({'error': 'User not found'}), 404

    transactions_ref = db.reference('transactions')
    receipt_number = data.get('receipt_number')
    items = data.get('items', [])

    if receipt_number is None:
        # The duplicate lookup below cannot filter on a None value.
        return jsonify({'error': 'receipt_number is required'}), 400

    # Check for duplicate transactions based on receipt number and items
    same_receipt = (
        transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get()
    )
    if same_receipt:
        current_items = sorted(items)
        for existing_transaction_data in same_receipt.values():
            if sorted(existing_transaction_data.get('items', [])) == current_items:
                return jsonify({
                    'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"
                }), 400

    total = float(data.get('total', 0))

    # Initialize image_url from data (could be None)
    image_url = data.get('image_url')

    # Upload image if image_bytes are provided and not manual
    if image_bytes and not manual:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    # Update user cash - allowing negative values
    new_remaining = user_data['remaining_cash'] - total
    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    # Build transaction data without image_url initially
    transaction_data = {
        'uid': uid,
        'total': total,
        'items': items,
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'manual_entry': manual
    }
    # Only add image_url if it is not None
    if image_url is not None:
        transaction_data['image_url'] = image_url

    # Save the transaction
    new_transaction_ref = transactions_ref.push(transaction_data)
    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining
    })


# ========================================
# Data Endpoints for Visualizations
# ========================================
@app.route('/api/user/spending-overview', methods=['GET'])
def get_spending_overview():
    """Return per-day spending totals and the 10 most recent transactions."""
    try:
        uid = _authenticate_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        # Get transactions for the user.
        transactions_ref = db.reference('transactions')
        transactions = transactions_ref.order_by_child('uid').equal_to(uid).get() or {}

        # Convert to a list of records, adding the database key as 'id'.
        transactions_list = [
            {**tx_data, 'id': tx_id} for tx_id, tx_data in transactions.items()
        ]

        df = pd.DataFrame(transactions_list) if transactions_list else pd.DataFrame()
        if df.empty:
            return jsonify({'daily_spending': [], 'recent_transactions': []})

        # Handle date parsing without dropping invalid dates
        if 'date' in df.columns:
            df['parsed_date'] = pd.to_datetime(df['date'], errors='coerce')

            # Rows with unparseable dates are bucketed under a default date;
            # the original string stays in the 'date' column.
            default_date = pd.Timestamp('2000-01-01')
            mask = df['parsed_date'].isna()
            if mask.any():
                print(f"Found {mask.sum()} transactions with invalid dates")
                df.loc[mask, 'parsed_date'] = default_date

            df['date_iso'] = df['parsed_date'].apply(
                lambda d: d.isoformat() if pd.notnull(d) else '2000-01-01T00:00:00'
            )
            # Extract date part for daily spending calculations
            df['date_only'] = df['date_iso'].apply(lambda d: d.split('T')[0])
        else:
            # If no date column exists, create default values
            df['date_only'] = '2000-01-01'
            df['date_iso'] = '2000-01-01T00:00:00'

        # For daily spending, group by date
        daily_spending = df.groupby('date_only')['total'].sum().reset_index()
        daily_spending.rename(columns={'date_only': 'date'}, inplace=True)

        # Sort transactions by timestamp if available
        if 'timestamp' in df.columns:
            df = df.sort_values(by='timestamp', ascending=False)

        # Select the most recent transactions
        recent_transactions = df.head(10)

        # Replace NaN with None for JSON serialization
        daily_spending = daily_spending.replace({np.nan: None})
        recent_transactions = recent_transactions.replace({np.nan: None})

        recent_tx_dict = recent_transactions.to_dict(orient='records')
        for tx in recent_tx_dict:
            # Drop the temporary columns; fall back to the ISO date only when
            # the record has no original date string.
            tx.pop('parsed_date', None)
            tx.pop('date_only', None)
            if 'date_iso' in tx:
                date_iso = tx.pop('date_iso')
                if 'date' not in tx:
                    tx['date'] = date_iso

        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': recent_tx_dict
        })
    except Exception as e:
        print(f"Error in spending overview: {e}")
        return jsonify({'error': str(e)}), 500


# ========================================
# Modified verify_admin function (now checks database is_admin flag)
# ========================================
def verify_admin(auth_header):
    """Return the caller's uid if the database marks them as admin.

    Raises:
        ValueError: the header is missing or is not a Bearer header.
        PermissionError: the token is invalid, the user is not an admin, or
            setting the custom admin claim failed.
    """
    if not auth_header or not auth_header.startswith('Bearer '):
        raise ValueError('Invalid token')

    token = auth_header.split(' ')[1]
    uid = verify_token(token)
    if not uid:
        raise PermissionError('Invalid user')

    user_data = db.reference(f'users/{uid}').get()
    if not user_data or not user_data.get('is_admin', False):
        raise PermissionError('Admin access required')

    try:
        auth.set_custom_user_claims(uid, {"admin": True})
        print(f"Custom admin claim set for user {uid}")
    except Exception as e:
        print(f"Error setting custom admin claim: {e}")
        # NOTE(review): access is denied here even though the admin flag was
        # verified above - confirm this is intended.
        raise PermissionError('Error setting admin claim, but admin verified')

    return uid


# ========================================
# Existing Admin Endpoints
# ========================================
@app.route('/api/admin/overview', methods=['GET'])
def get_admin_overview():
    """Return all users, all transactions and aggregate counts for admins."""
    try:
        verify_admin(request.headers.get('Authorization', ''))

        all_users = db.reference('users').get() or {}
        users_list = []
        for uid, user_data in all_users.items():
            try:
                email = auth.get_user(uid).email
            except Exception:
                # The database record can outlive the auth account.
                email = "Deleted User"
            users_list.append({
                'uid': uid,
                'email': email,
                'daily_cash': user_data.get('daily_cash', 0),
                'remaining_cash': user_data.get('remaining_cash', 0),
                'last_reset': user_data.get('last_reset'),
                'is_admin': user_data.get('is_admin', False)
            })

        all_transactions = db.reference('transactions').get() or {}
        transactions_list = [{'id': tid, **data} for tid, data in all_transactions.items()]

        return jsonify({
            'users': users_list,
            'transactions': transactions_list,
            'analytics': {
                'total_users': len(users_list),
                'total_transactions': len(transactions_list),
                # Tolerate records without a total (e.g. after an admin edit).
                'total_spent': sum(t.get('total') or 0 for t in transactions_list)
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/admin/users', methods=['POST'])
def create_user():
    """Create an auth account plus its database record."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()

        user = auth.create_user(
            email=data['email'],
            password=data['password']
        )

        user_data = {
            'daily_cash': data.get('daily_cash', 0),
            'remaining_cash': data.get('daily_cash', 0),
            'last_reset': '2025-01-01T00:00:00+00:00',
            'is_admin': data.get('is_admin', False)
        }
        db.reference(f'users/{user.uid}').set(user_data)

        return jsonify({
            'success': True,
            'user': {
                'uid': user.uid,
                'email': user.email,
                **user_data
            }
        }), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 400


@app.route('/api/admin/users/<uid>/limit', methods=['PUT'])
def update_user_limit(uid):
    """Set a user's daily limit, capping remaining cash at the new limit."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        new_limit = float(data['daily_cash'])

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        updates = {'daily_cash': new_limit}
        current_remaining = user_data.get('remaining_cash', new_limit)
        if current_remaining > new_limit:
            updates['remaining_cash'] = new_limit
        user_ref.update(updates)

        return jsonify({
            'success': True,
            'new_daily_cash': new_limit,
            'updated_remaining': updates.get('remaining_cash', current_remaining)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400


# ========================================
# New Admin Endpoints for Updating Remaining Cash, Setting Cash Limits,
# and Resetting Password
# ========================================
@app.route('/api/admin/users/<uid>/remaining-cash', methods=['PUT'])
def update_remaining_cash(uid):
    """Overwrite a user's remaining cash."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        if 'remaining_cash' not in data:
            return jsonify({'error': 'remaining_cash is required'}), 400
        new_remaining_cash = float(data['remaining_cash'])

        user_ref = db.reference(f'users/{uid}')
        if not user_ref.get():
            return jsonify({'error': 'User not found'}), 404

        user_ref.update({'remaining_cash': new_remaining_cash})
        return jsonify({'success': True, 'remaining_cash': new_remaining_cash})
    except Exception as e:
        return jsonify({'error': str(e)}), 400


@app.route('/api/admin/users/<uid>/reset-password', methods=['PUT'])
def admin_reset_password(uid):
    """Set a new password for the given user."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        new_password = data.get('new_password')
        if not new_password:
            return jsonify({'error': 'new_password is required'}), 400
        auth.update_user(uid, password=new_password)
        return jsonify({'success': True, 'message': 'Password reset successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 400


@app.route('/api/admin/transactions/<transaction_id>', methods=['DELETE'])
def delete_transaction(transaction_id):
    """Allow admin users to delete transactions."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        transaction_ref = db.reference(f'transactions/{transaction_id}')
        if not transaction_ref.get():
            return jsonify({'error': 'Transaction not found'}), 404
        transaction_ref.delete()
        return jsonify({'success': True, 'message': 'Transaction deleted successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/admin/transactions/<transaction_id>', methods=['PUT'])
def edit_transaction(transaction_id):
    """Allow admin users to edit transactions."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        transaction_ref = db.reference(f'transactions/{transaction_id}')
        if not transaction_ref.get():
            return jsonify({'error': 'Transaction not found'}), 404

        update_data = request.get_json()
        if not isinstance(update_data, dict) or not update_data:
            return jsonify({'error': 'A JSON object with fields to update is required'}), 400

        transaction_ref.update(update_data)
        return jsonify({'success': True, 'updated_transaction': update_data})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ========================================
# User management endpoint for profile
# ========================================
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the caller's own account and cash information."""
    try:
        uid = _authenticate_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        user_data = db.reference(f'users/{uid}').get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 0),
            'remaining_cash': user_data.get('remaining_cash', 0),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False)
        })
    except Exception as e:
        # Report the failure only; do not repeat the failing calls here.
        return jsonify({'error': str(e)}), 500


# ========================================
# Receipt media endpoints
# ========================================
def get_blob_from_image_url(image_url):
    """Return the blob path encoded in a storage URL, or None if unrecognised.

    Two URL shapes are handled: ``storage.googleapis.com/<bucket>/<path>`` and
    paths starting with ``/v0/b/<bucket>/o/`` followed by the URL-encoded path.
    """
    parsed = urlparse(image_url)
    if parsed.netloc == "storage.googleapis.com":
        parts = parsed.path.strip("/").split("/", 1)
        if len(parts) == 2:
            return parts[1]

    prefix = f"/v0/b/{bucket.name}/o/"
    if parsed.path.startswith(prefix):
        return unquote(parsed.path[len(prefix):])
    return None


def _fetch_receipt_image(transaction_id, action):
    """Load the receipt image bytes for a transaction.

    ``action`` ("view" or "download") is only used in the log line.
    Returns ``(image_bytes, None)`` on success, or ``(None, error)`` where
    ``error`` is a ``(response, status)`` pair ready to return from a view.
    """
    transaction_data = db.reference(f'transactions/{transaction_id}').get()
    if not transaction_data:
        return None, (jsonify({'error': 'Transaction not found'}), 404)

    image_url = transaction_data.get('image_url')
    if not image_url:
        return None, (jsonify({'error': 'No receipt image found for this transaction'}), 404)

    blob_path = get_blob_from_image_url(image_url)
    if not blob_path:
        return None, (jsonify({'error': 'Could not determine blob path from URL'}), 500)

    print(f"Blob path for {action}: {blob_path}")
    blob = bucket.blob(blob_path)
    if not blob.exists():
        print("Blob does not exist at path:", blob_path)
        return None, (jsonify({'error': 'Blob not found'}), 404)

    signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
    # Bound the wait so a stalled storage request cannot hang the worker.
    r = requests.get(signed_url, timeout=STORAGE_FETCH_TIMEOUT)
    if r.status_code != 200:
        return None, (jsonify({'error': 'Unable to fetch image from storage'}), 500)
    return r.content, None


@app.route('/api/admin/receipt/<transaction_id>/view', methods=['GET'])
def view_receipt(transaction_id):
    """Stream a transaction's receipt image inline (admin only)."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        image_bytes, error = _fetch_receipt_image(transaction_id, 'view')
        if error is not None:
            return error
        return send_file(io.BytesIO(image_bytes), mimetype='image/jpeg')
    except Exception as e:
        print(f"View receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/admin/receipt/<transaction_id>/download', methods=['GET'])
def download_receipt(transaction_id):
    """Send a transaction's receipt image as an attachment (admin only)."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        image_bytes, error = _fetch_receipt_image(transaction_id, 'download')
        if error is not None:
            return error
        return send_file(
            io.BytesIO(image_bytes),
            mimetype='image/jpeg',
            as_attachment=True,
            # 'attachment_filename' was renamed to 'download_name' in Flask 2.0
            # and the old name was removed in 2.2.
            download_name='receipt.jpg'
        )
    except Exception as e:
        print(f"Download receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500


# ========================================
# Delete user endpoint
# ========================================
@app.route('/api/admin/users/<uid>', methods=['DELETE'])
def delete_user(uid):
    """Delete a user's auth account, database record and transactions."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        try:
            auth.get_user(uid)
        except auth.UserNotFoundError:
            return jsonify({'error': 'User not found'}), 404

        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()

        transactions_ref = db.reference('transactions')
        user_transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if user_transactions:
            for transaction_id in user_transactions:
                transactions_ref.child(transaction_id).delete()

        return jsonify({
            'success': True,
            'message': f'User {uid} and all associated data deleted successfully'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must not be exposed
    # on a publicly bound host; it is opt-in via FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)