"""Flask backend for receipt capture and spend tracking.

Receipts are analysed with Gemini, transactions and user balances are stored
in the Firebase Realtime Database, and receipt images are kept in the
Firebase storage bucket. Admin endpoints require the caller's database record
to carry ``is_admin``.
"""

import os
import io
import json
import math
import hashlib
from datetime import datetime, time, timedelta
from urllib.parse import urlparse, unquote

from PIL import Image
import pytz
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import google.generativeai as genai
import firebase_admin
from firebase_admin import credentials, db, storage, auth
import pandas as pd
import requests

app = Flask(__name__)
CORS(app)

# Firebase initialization
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
    'storageBucket': 'mydateproject-e7994.firebasestorage.app'
})
bucket = storage.bucket()
api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')

# Fields a transaction must carry to be included in the spending overview.
_OVERVIEW_REQUIRED_FIELDS = [
    'uid', 'total', 'items', 'date', 'receipt_number',
    'timestamp', 'hash', 'manual_entry',
]

# Seconds to wait when fetching a receipt image through its signed URL.
_STORAGE_FETCH_TIMEOUT = 30


# ========================================
# Helper functions
# ========================================
def configure_gemini():
    """Configure the Gemini client with the API key and return the model."""
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


def verify_token(token):
    """Return the uid encoded in a Firebase ID token, or None if it is invalid."""
    try:
        decoded_token = auth.verify_id_token(token)
        return decoded_token['uid']
    except Exception:
        return None


def _bearer_token(auth_header):
    """Return the token part of an ``Authorization: <scheme> <token>`` header.

    Returns None when the header is missing or has no token part, instead of
    raising IndexError the way a bare ``split(' ')[1]`` does.
    """
    parts = (auth_header or '').split()
    return parts[1] if len(parts) >= 2 else None


def _uid_from_request():
    """Return the authenticated uid for the current request, or None."""
    token = _bearer_token(request.headers.get('Authorization'))
    return verify_token(token) if token else None


def check_daily_reset(user_ref):
    """Reset the user's remaining cash once per day after 08:00 UTC.

    Returns True when a reset was written, False otherwise (including when
    reading or updating the record fails).
    """
    try:
        user_data = user_ref.get()
        now = datetime.now(pytz.UTC)
        last_reset = datetime.fromisoformat(
            user_data.get('last_reset', '2000-01-01T00:00:00+00:00')
        )
        if now.time() >= time(8, 0) and last_reset.date() < now.date():
            user_ref.update({
                'remaining_cash': user_data['daily_cash'],
                'last_reset': now.isoformat()
            })
            return True
        return False
    except Exception as e:
        print(f"Reset error: {str(e)}")
        return False


def process_receipt(model, image):
    """Ask Gemini to extract receipt fields from an image.

    Returns the raw model text (expected to contain a JSON object), or
    ``"{}"`` when the model call fails.
    """
    prompt = (
        "Analyze this image and determine if it's a receipt. "
        "If it is a receipt, extract: "
        "- Total amount (as float) "
        "- List of items purchased (array of strings) "
        "- Date of transaction (DD/MM/YYYY format) "
        "- Receipt number (as string) "
        "Return JSON format with keys: is_receipt (boolean), total, items, "
        "date, receipt_number. \n"
        'If not a receipt, return {"is_receipt": false}'
    )
    try:
        response = model.generate_content([prompt, image])
        return response.text
    except Exception as e:
        print(f"Gemini error: {str(e)}")
        return "{}"


# ========================================
# Report Endpoint
# ========================================
@app.route('/api/write-report', methods=['POST'])
def generate_report():
    """Generate a plain-text spending report from posted transaction data.

    Expects a JSON body with a ``transactions`` key. Returns 400 when the body
    is missing or malformed and 500 when report generation fails.
    """
    prompt = (
        "You are the TrueSpend AI analyst, analyze this transaction data and "
        "write an insightful business report on the spending habits of the "
        "employees. Make sure the report is in plain text"
    )
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'transactions' not in payload:
        return jsonify({'error': 'transactions is required'}), 400
    try:
        model = configure_gemini()
        transaction_json_string = json.dumps(payload['transactions'])
        response = model.generate_content([prompt, transaction_json_string])
        return jsonify({"report": response.text})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ========================================
# Receipt Processing Endpoint
# ========================================
@app.route('/api/process-receipt', methods=['POST'])
def process_receipt_endpoint():
    """Record a transaction from an uploaded receipt image or a manual entry.

    Requires a valid ID token. With form field ``manual_entry == 'true'`` the
    transaction is built from form fields; otherwise the ``receipt`` file is
    analysed with Gemini. Identical image uploads are rejected.
    """
    try:
        uid = _uid_from_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            # Without a record there is no balance to charge against.
            return jsonify({'error': 'User not found'}), 404

        # Handle manual entry
        if request.form.get('manual_entry') == 'true':
            return handle_manual_entry(uid, user_ref, user_data)

        # Handle image processing
        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400

        image_bytes = file.read()
        # MD5 is only a content fingerprint for duplicate detection here.
        file_hash = hashlib.md5(image_bytes).hexdigest()

        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        try:
            image = Image.open(io.BytesIO(image_bytes))
        except OSError:
            # PIL raises an OSError subclass for unreadable image data.
            return jsonify({'error': 'Uploaded file is not a valid image'}), 400

        model = configure_gemini()
        result_text = process_receipt(model, image)

        try:
            # The model may wrap the JSON in prose or code fences; keep only
            # the outermost {...} span.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({
                'error': 'Failed to parse receipt data',
                'raw_response': result_text
            }), 400

        if not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            file_hash=file_hash,
            image_bytes=image_bytes,
            manual=False
        )
    except Exception as e:
        print(e)
        return jsonify({'error': str(e)}), 500


def handle_manual_entry(uid, user_ref, user_data):
    """Build a transaction from the manual-entry form fields and save it.

    ``user_ref`` is accepted for interface compatibility and is not used.
    Returns a 400 response when the form data cannot be interpreted.
    """
    try:
        raw_items = request.form.get('items', '')
        data = {
            'total': float(request.form.get('total')),
            # Drop empty fragments so an empty field yields [] rather than [''].
            'items': [part.strip() for part in raw_items.split(',') if part.strip()],
            'date': request.form.get('date'),
            'receipt_number': request.form.get('receipt_number'),
            'is_receipt': True
        }
        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            # Manual entries have no image; derive a fingerprint from the time.
            file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(),
            image_bytes=None,
            manual=True
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 400


def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Validate a transaction, charge the user's balance and store it.

    Rejects transactions whose receipt number and item list match an existing
    one. Negative balances are allowed. All validation happens before the
    image upload so a rejected request leaves nothing behind in storage.
    """
    if not user_data or 'remaining_cash' not in user_data:
        return jsonify({'error': 'User not found'}), 404

    try:
        total = float(data.get('total', 0))
    except (TypeError, ValueError):
        return jsonify({'error': 'Receipt total is missing or not a number'}), 400
    if not math.isfinite(total):
        # NaN/inf would permanently corrupt the stored balance.
        return jsonify({'error': 'Receipt total is missing or not a number'}), 400

    transactions_ref = db.reference('transactions')
    receipt_number = data.get('receipt_number')
    items = data.get('items') or []

    # Check for duplicate transactions based on receipt number and items.
    # The query cannot be run with None, so receipts without a number skip it.
    if receipt_number is not None:
        matches = (
            transactions_ref.order_by_child('receipt_number')
            .equal_to(receipt_number)
            .get()
        )
        if matches:
            current_items = sorted(items)
            for existing_transaction in matches.values():
                if sorted(existing_transaction.get('items', [])) == current_items:
                    return jsonify({
                        'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"
                    }), 400

    # Upload image if available
    image_url = None
    if image_bytes and not manual:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    # Update user cash - negative values are allowed.
    # NOTE(review): this is a read-modify-write on a value read earlier in the
    # request; concurrent submissions for one user can lose an update.
    new_remaining = user_data['remaining_cash'] - total
    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    transaction_data = {
        'uid': uid,
        'total': total,
        'items': items,
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'image_url': image_url,
        'manual_entry': manual
    }
    new_transaction_ref = transactions_ref.push(transaction_data)

    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining
    })


# ========================================
# Data Endpoints for Visualizations
# ========================================
def _empty_spending_overview():
    """Return the overview response used when there is nothing to report."""
    return jsonify({'daily_spending': [], 'recent_transactions': []})


def _json_safe_records(frame):
    """Convert a DataFrame to records with missing values as None.

    NaN would otherwise be serialized as a bare ``NaN`` token, which is not
    valid JSON and is rejected by strict parsers.
    """
    return frame.astype(object).where(frame.notna(), None).to_dict(orient='records')


@app.route('/api/user/spending-overview', methods=['GET'])
def get_spending_overview():
    """Return per-day spending totals and the ten most recent transactions.

    Only transactions carrying every required field and a ``DD/MM/YYYY`` date
    are included. Returns 401 when the caller is not authenticated.
    """
    try:
        uid = _uid_from_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        transactions_ref = db.reference('transactions')
        transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if not transactions:
            return _empty_spending_overview()

        df = pd.DataFrame.from_dict(transactions, orient='index')
        # A column missing from the frame means every row lacks that field, so
        # every row would be dropped; return early instead of letting dropna
        # raise KeyError on the unknown column.
        if df.empty or any(col not in df.columns for col in _OVERVIEW_REQUIRED_FIELDS):
            return _empty_spending_overview()

        df = df.dropna(subset=_OVERVIEW_REQUIRED_FIELDS)
        if df.empty:
            return _empty_spending_overview()

        df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y', errors='coerce')
        df = df.dropna(subset=['date'])
        if df.empty:
            return _empty_spending_overview()

        daily_spending = df.groupby(df['date'].dt.date)['total'].sum().reset_index()
        recent = df.sort_values(by='timestamp', ascending=False).head(10)

        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': _json_safe_records(recent)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


# ========================================
# Admin verification (checks the database is_admin flag)
# ========================================
def verify_admin(auth_header):
    """Return the caller's uid if their database record has ``is_admin`` set.

    Also sets the ``admin`` custom claim on the Firebase user.

    Raises:
        ValueError: the header is missing or is not a Bearer token.
        PermissionError: the token is invalid, the user is not an admin, or
            the custom claim could not be set.
    """
    if not auth_header or not auth_header.startswith('Bearer '):
        raise ValueError('Invalid token')

    token = auth_header.split(' ')[1]
    uid = verify_token(token)
    if not uid:
        raise PermissionError('Invalid user')

    user_data = db.reference(f'users/{uid}').get()
    if not user_data or not user_data.get('is_admin', False):
        raise PermissionError('Admin access required')

    try:
        auth.set_custom_user_claims(uid, {"admin": True})
        print(f"Custom admin claim set for user {uid}")
    except Exception as e:
        print(f"Error setting custom admin claim: {e}")
        raise PermissionError('Error setting admin claim, but admin verified')

    return uid


# ========================================
# Admin Endpoints
# ========================================
@app.route('/api/admin/overview', methods=['GET'])
def get_admin_overview():
    """Return all users, all transactions and summary analytics."""
    try:
        verify_admin(request.headers.get('Authorization', ''))

        all_users = db.reference('users').get() or {}
        users_list = []
        for uid, user_data in all_users.items():
            try:
                email = auth.get_user(uid).email
            except Exception:
                # The database record can outlive the auth account.
                email = "Deleted User"
            users_list.append({
                'uid': uid,
                'email': email,
                'daily_cash': user_data.get('daily_cash', 0),
                'remaining_cash': user_data.get('remaining_cash', 0),
                'last_reset': user_data.get('last_reset'),
                'is_admin': user_data.get('is_admin', False)
            })

        all_transactions = db.reference('transactions').get() or {}
        transactions_list = [
            {'id': tid, **data} for tid, data in all_transactions.items()
        ]

        return jsonify({
            'users': users_list,
            'transactions': transactions_list,
            'analytics': {
                'total_users': len(users_list),
                'total_transactions': len(transactions_list),
                # Tolerate records without a total instead of failing the
                # whole overview.
                'total_spent': sum(t.get('total', 0) or 0 for t in transactions_list)
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/admin/users', methods=['POST'])
def create_user():
    """Create a Firebase auth user and the matching database record."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()

        user = auth.create_user(
            email=data['email'],
            password=data['password']
        )

        user_data = {
            'daily_cash': data.get('daily_cash', 0),
            'remaining_cash': data.get('daily_cash', 0),
            'last_reset': '2025-01-01T00:00:00+00:00',
            'is_admin': data.get('is_admin', False)
        }
        db.reference(f'users/{user.uid}').set(user_data)

        return jsonify({
            'success': True,
            'user': {
                'uid': user.uid,
                'email': user.email,
                **user_data
            }
        }), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 400


@app.route('/api/admin/users/<uid>/limit', methods=['PUT'])
def update_user_limit(uid):
    """Set a user's daily cash limit, capping remaining cash at the new limit."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        new_limit = float(data['daily_cash'])

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        updates = {'daily_cash': new_limit}
        current_remaining = user_data.get('remaining_cash', new_limit)
        if current_remaining > new_limit:
            updates['remaining_cash'] = new_limit
        user_ref.update(updates)

        return jsonify({
            'success': True,
            'new_daily_cash': new_limit,
            'updated_remaining': updates.get('remaining_cash', current_remaining)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400


@app.route('/api/admin/users/<uid>/remaining-cash', methods=['PUT'])
def update_remaining_cash(uid):
    """Overwrite a user's remaining cash with the posted value."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        if 'remaining_cash' not in data:
            return jsonify({'error': 'remaining_cash is required'}), 400
        new_remaining_cash = float(data['remaining_cash'])

        user_ref = db.reference(f'users/{uid}')
        if not user_ref.get():
            return jsonify({'error': 'User not found'}), 404

        user_ref.update({'remaining_cash': new_remaining_cash})
        return jsonify({'success': True, 'remaining_cash': new_remaining_cash})
    except Exception as e:
        return jsonify({'error': str(e)}), 400


@app.route('/api/admin/users/<uid>/reset-password', methods=['PUT'])
def admin_reset_password(uid):
    """Set a new password for the given user."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        data = request.get_json()
        new_password = data.get('new_password')
        if not new_password:
            return jsonify({'error': 'new_password is required'}), 400

        auth.update_user(uid, password=new_password)
        return jsonify({'success': True, 'message': 'Password reset successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 400


# ========================================
# User management endpoint for profile
# ========================================
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's email, limits and admin flag."""
    try:
        uid = _uid_from_request()
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        user_data = db.reference(f'users/{uid}').get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 0),
            'remaining_cash': user_data.get('remaining_cash', 0),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False)
        })
    except Exception as e:
        # Report only the error; the handler must not repeat the failing
        # calls or echo the stored user record back to the client.
        return jsonify({'error': str(e)}), 500


# ========================================
# Receipt media endpoints
# ========================================
def get_blob_from_image_url(image_url):
    """Return the storage blob path encoded in an image URL, or None.

    Handles ``storage.googleapis.com/<bucket>/<path>`` URLs and
    ``/v0/b/<bucket>/o/<encoded path>`` URLs for this app's bucket. The path
    is URL-decoded in both cases.
    """
    parsed = urlparse(image_url)

    if parsed.netloc == "storage.googleapis.com":
        parts = parsed.path.strip("/").split("/", 1)
        if len(parts) == 2:
            return unquote(parts[1])

    prefix = f"/v0/b/{bucket.name}/o/"
    if parsed.path.startswith(prefix):
        return unquote(parsed.path[len(prefix):])

    return None


def _load_receipt_image(transaction_id, action):
    """Fetch the stored receipt image for a transaction.

    ``action`` is only used to label the log line. Returns
    ``(image_bytes, None)`` on success or ``(None, (response, status))`` when
    the image cannot be served.
    """
    transaction_data = db.reference(f'transactions/{transaction_id}').get()
    if not transaction_data:
        return None, (jsonify({'error': 'Transaction not found'}), 404)

    image_url = transaction_data.get('image_url')
    if not image_url:
        return None, (jsonify({'error': 'No receipt image found for this transaction'}), 404)

    blob_path = get_blob_from_image_url(image_url)
    if not blob_path:
        return None, (jsonify({'error': 'Could not determine blob path from URL'}), 500)
    print(f"Blob path for {action}: {blob_path}")

    blob = bucket.blob(blob_path)
    if not blob.exists():
        print("Blob does not exist at path:", blob_path)
        return None, (jsonify({'error': 'Blob not found'}), 404)

    signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10))
    # Bound the wait so a stalled storage request cannot hang the worker.
    storage_response = requests.get(signed_url, timeout=_STORAGE_FETCH_TIMEOUT)
    if storage_response.status_code != 200:
        return None, (jsonify({'error': 'Unable to fetch image from storage'}), 500)

    return storage_response.content, None


def _send_jpeg_attachment(image_bytes, filename):
    """Send JPEG bytes as a download on both current and older Flask."""
    try:
        return send_file(
            io.BytesIO(image_bytes),
            mimetype='image/jpeg',
            as_attachment=True,
            download_name=filename
        )
    except TypeError:
        # Flask < 2.0 only knows the older keyword name.
        return send_file(
            io.BytesIO(image_bytes),
            mimetype='image/jpeg',
            as_attachment=True,
            attachment_filename=filename
        )


@app.route('/api/admin/receipt/<transaction_id>/view', methods=['GET'])
def view_receipt(transaction_id):
    """Return the receipt image of a transaction inline."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        image_bytes, error = _load_receipt_image(transaction_id, 'view')
        if error is not None:
            return error
        return send_file(io.BytesIO(image_bytes), mimetype='image/jpeg')
    except Exception as e:
        print(f"View receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/admin/receipt/<transaction_id>/download', methods=['GET'])
def download_receipt(transaction_id):
    """Return the receipt image of a transaction as ``receipt.jpg``."""
    try:
        verify_admin(request.headers.get('Authorization', ''))
        image_bytes, error = _load_receipt_image(transaction_id, 'download')
        if error is not None:
            return error
        return _send_jpeg_attachment(image_bytes, 'receipt.jpg')
    except Exception as e:
        print(f"Download receipt error: {str(e)}")
        return jsonify({'error': str(e)}), 500


# ========================================
# Delete user endpoint
# ========================================
@app.route('/api/admin/users/<uid>', methods=['DELETE'])
def delete_user(uid):
    """Delete a user's auth account, database record and transactions."""
    try:
        verify_admin(request.headers.get('Authorization', ''))

        try:
            auth.get_user(uid)
        except auth.UserNotFoundError:
            return jsonify({'error': 'User not found'}), 404

        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()

        transactions_ref = db.reference('transactions')
        user_transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if user_transactions:
            for transaction_id in user_transactions:
                transactions_ref.child(transaction_id).delete()

        return jsonify({
            'success': True,
            'message': f'User {uid} and all associated data deleted successfully'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # The Werkzeug debugger allows code execution from the browser, so it is
    # opt-in rather than always on for a server bound to all interfaces.
    app.run(
        debug=os.environ.get('FLASK_DEBUG') == '1',
        host="0.0.0.0",
        port=7860
    )