Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import hashlib | |
| from datetime import datetime, time, timedelta | |
| from PIL import Image | |
| import pytz | |
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| import google.generativeai as genai | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| import pandas as pd | |
| import requests | |
| from urllib.parse import urlparse, unquote | |
| import numpy as np | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Firebase initialization | |
| cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json') | |
| firebase_admin.initialize_app(cred, { | |
| 'databaseURL': 'https://mydateproject-e7994.firebaseio.com', | |
| 'storageBucket': 'mydateproject-e7994.firebasestorage.app' | |
| }) | |
| bucket = storage.bucket() | |
| api_key = os.environ['Gemini'] | |
| FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY') | |
| # Helper functions | |
| def configure_gemini(): | |
| genai.configure(api_key=api_key) | |
| return genai.GenerativeModel('gemini-2.0-flash-thinking-exp') | |
| def verify_token(token): | |
| try: | |
| decoded_token = auth.verify_id_token(token) | |
| return decoded_token['uid'] | |
| except Exception as e: | |
| return None | |
| def check_daily_reset(user_ref): | |
| try: | |
| user_data = user_ref.get() | |
| now = datetime.now(pytz.UTC) | |
| last_reset = datetime.fromisoformat(user_data.get('last_reset', '2000-01-01T00:00:00+00:00')) | |
| if now.time() >= time(8,0) and last_reset.date() < now.date(): | |
| user_ref.update({ | |
| 'remaining_cash': user_data['daily_cash'], | |
| 'last_reset': now.isoformat() | |
| }) | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Reset error: {str(e)}") | |
| return False | |
| # Process receipt image | |
| def process_receipt(model, image): | |
| prompt = """Analyze this image and determine if it's a receipt. If it is a receipt, extract: | |
| - Total amount (as float) | |
| - List of items purchased (array of strings) | |
| - Date of transaction (DD/MM/YYYY format) | |
| - Receipt number (as string) | |
| Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number. | |
| If not a receipt, return {"is_receipt": false}""" | |
| try: | |
| response = model.generate_content([prompt, image]) | |
| return response.text | |
| except Exception as e: | |
| print(f"Gemini error: {str(e)}") | |
| return "{}" | |
| # Write report | |
| def generate_report(): | |
| prompt = """You are the TrueSpend AI analyst, analyze this transaction data | |
| and write an insightful business report on the spending habits of the employees. | |
| Make sure the report is in plain text""" | |
| model = configure_gemini() | |
| try: | |
| transaction_data = request.get_json() | |
| transaction_json_string = json.dumps(transaction_data['transactions']) | |
| response = model.generate_content([prompt, transaction_json_string]) | |
| report = response.text | |
| return jsonify({"report": report}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| # ======================================== | |
| # Authentication Endpoints | |
| # ======================================== | |
| # (Any existing authentication endpoints remain unchanged) | |
| # ======================================== | |
| # Image Receipt Processing Endpoint | |
| # ======================================== | |
| def process_receipt_endpoint(): | |
| try: | |
| auth_header = request.headers.get('Authorization') | |
| token = auth_header.split('Bearer ')[1] if auth_header else None | |
| uid = verify_token(token) if token else None | |
| if not uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| user_ref = db.reference(f'users/{uid}') | |
| user_data = user_ref.get() | |
| # If the confirmation flag is set, then use the form fields (and hidden file_hash/image_url) | |
| # to create the transaction. (Note: image_bytes is None because the image was already uploaded.) | |
| if request.form.get('confirmed') == 'true': | |
| data = { | |
| 'total': float(request.form.get('total')), | |
| 'items': [item.strip() for item in request.form.get('items', '').split(',')], | |
| 'date': request.form.get('date'), | |
| 'receipt_number': request.form.get('receipt_number'), | |
| 'is_receipt': True, | |
| 'image_url': request.form.get('image_url') # provided by the first submission | |
| } | |
| file_hash = request.form.get('file_hash', '') | |
| # In this confirmation branch, we pass image_bytes as None. | |
| return validate_and_save_transaction( | |
| uid=uid, | |
| user_data=user_data, | |
| data=data, | |
| file_hash=file_hash, | |
| image_bytes=None, | |
| manual=False | |
| ) | |
| # Handle image processing (first submission) | |
| file = request.files.get('receipt') | |
| if not file: | |
| return jsonify({'error': 'No file uploaded'}), 400 | |
| image_bytes = file.read() | |
| file_hash = hashlib.md5(image_bytes).hexdigest() | |
| transactions_ref = db.reference('transactions') | |
| existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get() | |
| if existing: | |
| return jsonify({'error': 'Receipt already processed'}), 400 | |
| # Immediately upload the image so that it is stored regardless of review outcome. | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg') | |
| blob.upload_from_string(image_bytes, content_type='image/jpeg') | |
| image_url = blob.public_url | |
| # Process the image with Gemini | |
| image = Image.open(io.BytesIO(image_bytes)) | |
| # --- Image Compression --- | |
| compressed_buffer = io.BytesIO() | |
| image.save(compressed_buffer, format="JPEG", optimize=True, quality=90) # Adjust quality as needed | |
| compressed_image_bytes = compressed_buffer.getvalue() | |
| compressed_image = Image.open(io.BytesIO(compressed_image_bytes)) | |
| model = configure_gemini() | |
| result_text = process_receipt(model, compressed_image) | |
| try: | |
| json_str = result_text[result_text.find('{'):result_text.rfind('}')+1] | |
| data = json.loads(json_str) | |
| except json.JSONDecodeError: | |
| return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400 | |
| if not data.get('is_receipt', False): | |
| return jsonify({'error': 'Not a valid receipt'}), 400 | |
| # Instead of saving immediately, return the extracted data along with the image info for review. | |
| data['file_hash'] = file_hash | |
| data['image_url'] = image_url | |
| return jsonify({ | |
| 'success': True, | |
| 'extracted': True, | |
| 'data': data, | |
| 'message': 'Review extracted information before confirming.' | |
| }) | |
| except Exception as e: | |
| print(e) | |
| return jsonify({'error': str(e)}), 500 | |
| # ======================================== | |
| # Manual Entry Endpoint | |
| # ======================================== | |
| def manual_entry_endpoint(): | |
| try: | |
| auth_header = request.headers.get('Authorization') | |
| token = auth_header.split('Bearer ')[1] if auth_header else None | |
| uid = verify_token(token) if token else None | |
| if not uid: | |
| return jsonify({'error': 'Invalid token'}), 401 | |
| user_ref = db.reference(f'users/{uid}') | |
| user_data = user_ref.get() | |
| return handle_manual_entry(uid, user_ref, user_data) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def handle_manual_entry(uid, user_ref, user_data): | |
| try: | |
| data = { | |
| 'total': float(request.form.get('total')), | |
| 'items': [item.strip() for item in request.form.get('items', '').split(',')], | |
| 'date': request.form.get('date'), | |
| 'receipt_number': request.form.get('receipt_number'), | |
| 'is_receipt': True | |
| } | |
| return validate_and_save_transaction( | |
| uid=uid, | |
| user_data=user_data, | |
| data=data, | |
| file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(), | |
| image_bytes=None, | |
| manual=True | |
| ) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 400 | |
| def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False): | |
| transactions_ref = db.reference('transactions') | |
| receipt_number = data.get('receipt_number') | |
| items = data.get('items', []) | |
| # Check for duplicate transactions based on receipt number and items | |
| existing_transactions_with_receipt = transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get() | |
| if existing_transactions_with_receipt: | |
| for transaction_id, existing_transaction_data in existing_transactions_with_receipt.items(): | |
| existing_items = sorted(existing_transaction_data.get('items', [])) | |
| current_items = sorted(items) | |
| if existing_items == current_items: | |
| return jsonify({'error': f"Transaction with Receipt #{receipt_number} and identical items already exists"}), 400 | |
| total = float(data.get('total', 0)) | |
| # Initialize image_url from data (could be None) | |
| image_url = data.get('image_url') | |
| # Upload image if image_bytes are provided and not manual | |
| if image_bytes and not manual: | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg') | |
| blob.upload_from_string(image_bytes, content_type='image/jpeg') | |
| image_url = blob.public_url | |
| # Update user cash - allowing negative values | |
| new_remaining = user_data['remaining_cash'] - total | |
| db.reference(f'users/{uid}').update({'remaining_cash': new_remaining}) | |
| # Build transaction data without image_url initially | |
| transaction_data = { | |
| 'uid': uid, | |
| 'total': total, | |
| 'items': items, | |
| 'date': data.get('date'), | |
| 'receipt_number': receipt_number, | |
| 'timestamp': datetime.now(pytz.UTC).isoformat(), | |
| 'hash': file_hash, | |
| 'manual_entry': manual | |
| } | |
| # Only add image_url if it is not None | |
| if image_url is not None: | |
| transaction_data['image_url'] = image_url | |
| # Save the transaction | |
| new_transaction_ref = transactions_ref.push(transaction_data) | |
| return jsonify({ | |
| 'success': True, | |
| 'transaction': {**transaction_data, 'id': new_transaction_ref.key}, | |
| 'remaining_cash': new_remaining | |
| }) | |
| # ======================================== | |
| # Data Endpoints for Visualizations | |
| # ======================================== | |
| def get_spending_overview(): | |
| try: | |
| # Extract the token and verify it. | |
| auth_header = request.headers.get('Authorization', '') | |
| token = auth_header.split(' ')[1] if len(auth_header.split(' ')) > 1 else '' | |
| uid = verify_token(token) | |
| # Get transactions for the user. | |
| transactions_ref = db.reference('transactions') | |
| transactions = transactions_ref.order_by_child('uid').equal_to(uid).get() or {} | |
| # Convert to list for easier processing | |
| transactions_list = [] | |
| for tx_id, tx_data in transactions.items(): | |
| # Add ID to the transaction data | |
| tx_item = tx_data.copy() | |
| tx_item['id'] = tx_id | |
| transactions_list.append(tx_item) | |
| # Create a DataFrame | |
| df = pd.DataFrame(transactions_list) if transactions_list else pd.DataFrame() | |
| if df.empty: | |
| return jsonify({ | |
| 'daily_spending': [], | |
| 'recent_transactions': [] | |
| }) | |
| # Handle date parsing without dropping invalid dates | |
| if 'date' in df.columns: | |
| # Create a temporary column for parsed dates | |
| df['parsed_date'] = pd.to_datetime(df['date'], errors='coerce') | |
| # For rows with invalid dates, set a default date | |
| default_date = pd.Timestamp('2000-01-01') | |
| mask = df['parsed_date'].isna() | |
| if mask.any(): | |
| print(f"Found {mask.sum()} transactions with invalid dates") | |
| # Keep the original string in 'date' column, but use default for calculations | |
| df.loc[mask, 'parsed_date'] = default_date | |
| # Create ISO format dates for valid dates | |
| df['date_iso'] = df['parsed_date'].apply(lambda d: d.isoformat() if pd.notnull(d) else '2000-01-01T00:00:00') | |
| # Extract date part for daily spending calculations | |
| df['date_only'] = df['date_iso'].apply(lambda d: d.split('T')[0]) | |
| else: | |
| # If no date column exists, create default values | |
| df['date_only'] = '2000-01-01' | |
| df['date_iso'] = '2000-01-01T00:00:00' | |
| # For daily spending, group by date | |
| daily_spending = df.groupby('date_only')['total'].sum().reset_index() | |
| daily_spending.rename(columns={'date_only': 'date'}, inplace=True) | |
| # Sort transactions by timestamp if available | |
| if 'timestamp' in df.columns: | |
| df = df.sort_values(by='timestamp', ascending=False) | |
| # Select the most recent transactions | |
| recent_transactions = df.head(10) | |
| # Replace NaN with None for JSON serialization | |
| daily_spending = daily_spending.replace({np.nan: None}) | |
| recent_transactions = recent_transactions.replace({np.nan: None}) | |
| # Ensure proper column selections for the response | |
| recent_tx_dict = recent_transactions.to_dict(orient='records') | |
| # Make sure each transaction has the original date string | |
| for tx in recent_tx_dict: | |
| # Remove temporary columns | |
| if 'parsed_date' in tx: | |
| del tx['parsed_date'] | |
| if 'date_only' in tx: | |
| del tx['date_only'] | |
| if 'date_iso' in tx and 'date' not in tx: | |
| tx['date'] = tx['date_iso'] | |
| del tx['date_iso'] | |
| elif 'date_iso' in tx: | |
| del tx['date_iso'] | |
| return jsonify({ | |
| 'daily_spending': daily_spending.to_dict(orient='records'), | |
| 'recent_transactions': recent_tx_dict | |
| }) | |
| except Exception as e: | |
| print(f"Error in spending overview: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| # ======================================== | |
| # Modified verify_admin function (now checks database is_admin flag) | |
| # ======================================== | |
| def verify_admin(auth_header): | |
| if not auth_header or not auth_header.startswith('Bearer '): | |
| raise ValueError('Invalid token') | |
| token = auth_header.split(' ')[1] | |
| uid = verify_token(token) | |
| if not uid: | |
| raise PermissionError('Invalid user') | |
| user_ref = db.reference(f'users/{uid}') | |
| user_data = user_ref.get() | |
| if not user_data or not user_data.get('is_admin', False): | |
| raise PermissionError('Admin access required') | |
| try: | |
| auth.set_custom_user_claims(uid, {"admin": True}) | |
| print(f"Custom admin claim set for user {uid}") | |
| except Exception as e: | |
| print(f"Error setting custom admin claim: {e}") | |
| raise PermissionError('Error setting admin claim, but admin verified') | |
| return uid | |
| # ======================================== | |
| # Existing Admin Endpoints | |
| # ======================================== | |
| def get_admin_overview(): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| users_ref = db.reference('users') | |
| all_users = users_ref.get() or {} | |
| users_list = [] | |
| for uid, user_data in all_users.items(): | |
| try: | |
| auth_user = auth.get_user(uid) | |
| email = auth_user.email | |
| except: | |
| email = "Deleted User" | |
| users_list.append({ | |
| 'uid': uid, | |
| 'email': email, | |
| 'daily_cash': user_data.get('daily_cash', 0), | |
| 'remaining_cash': user_data.get('remaining_cash', 0), | |
| 'last_reset': user_data.get('last_reset'), | |
| 'is_admin': user_data.get('is_admin', False) | |
| }) | |
| transactions_ref = db.reference('transactions') | |
| all_transactions = transactions_ref.get() or {} | |
| transactions_list = [{'id': tid, **data} for tid, data in all_transactions.items()] | |
| return jsonify({ | |
| 'users': users_list, | |
| 'transactions': transactions_list, | |
| 'analytics': { | |
| 'total_users': len(users_list), | |
| 'total_transactions': len(transactions_list), | |
| 'total_spent': sum(t['total'] for t in transactions_list) | |
| } | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def create_user(): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| data = request.get_json() | |
| user = auth.create_user( | |
| email=data['email'], | |
| password=data['password'] | |
| ) | |
| user_ref = db.reference(f'users/{user.uid}') | |
| user_data = { | |
| 'daily_cash': data.get('daily_cash', 0), | |
| 'remaining_cash': data.get('daily_cash', 0), | |
| 'last_reset': '2025-01-01T00:00:00+00:00', | |
| 'is_admin': data.get('is_admin', False) | |
| } | |
| user_ref.set(user_data) | |
| return jsonify({ | |
| 'success': True, | |
| 'user': { | |
| 'uid': user.uid, | |
| 'email': user.email, | |
| **user_data | |
| } | |
| }), 201 | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 400 | |
| def update_user_limit(uid): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| data = request.get_json() | |
| new_limit = float(data['daily_cash']) | |
| user_ref = db.reference(f'users/{uid}') | |
| user_data = user_ref.get() | |
| if not user_data: | |
| return jsonify({'error': 'User not found'}), 404 | |
| updates = {'daily_cash': new_limit} | |
| current_remaining = user_data.get('remaining_cash', new_limit) | |
| if current_remaining > new_limit: | |
| updates['remaining_cash'] = new_limit | |
| user_ref.update(updates) | |
| return jsonify({ | |
| 'success': True, | |
| 'new_daily_cash': new_limit, | |
| 'updated_remaining': updates.get('remaining_cash', current_remaining) | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 400 | |
| # ======================================== | |
| # New Admin Endpoints for Updating Remaining Cash, Setting Cash Limits, and Resetting Password | |
| # ======================================== | |
| def update_remaining_cash(uid): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| data = request.get_json() | |
| if 'remaining_cash' not in data: | |
| return jsonify({'error': 'remaining_cash is required'}), 400 | |
| new_remaining_cash = float(data['remaining_cash']) | |
| user_ref = db.reference(f'users/{uid}') | |
| user_data = user_ref.get() | |
| if not user_data: | |
| return jsonify({'error': 'User not found'}), 404 | |
| user_ref.update({'remaining_cash': new_remaining_cash}) | |
| return jsonify({'success': True, 'remaining_cash': new_remaining_cash}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 400 | |
| def admin_reset_password(uid): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| data = request.get_json() | |
| new_password = data.get('new_password') | |
| if not new_password: | |
| return jsonify({'error': 'new_password is required'}), 400 | |
| auth.update_user(uid, password=new_password) | |
| return jsonify({'success': True, 'message': 'Password reset successfully'}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 400 | |
| def delete_transaction(transaction_id): | |
| """Allow admin users to delete transactions.""" | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| transactions_ref = db.reference(f'transactions/{transaction_id}') | |
| transaction_data = transactions_ref.get() | |
| if not transaction_data: | |
| return jsonify({'error': 'Transaction not found'}), 404 | |
| transactions_ref.delete() | |
| return jsonify({'success': True, 'message': 'Transaction deleted successfully'}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def edit_transaction(transaction_id): | |
| """Allow admin users to edit transactions.""" | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| transactions_ref = db.reference(f'transactions/{transaction_id}') | |
| transaction_data = transactions_ref.get() | |
| if not transaction_data: | |
| return jsonify({'error': 'Transaction not found'}), 404 | |
| update_data = request.get_json() | |
| transactions_ref.update(update_data) | |
| return jsonify({'success': True, 'updated_transaction': update_data}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| # ======================================== | |
| # User management endpoint for profile | |
| # ======================================== | |
| def get_user_profile(): | |
| try: | |
| uid = verify_token(request.headers.get('Authorization', '').split(' ')[1]) | |
| user = auth.get_user(uid) | |
| user_data = db.reference(f'users/{uid}').get() | |
| return jsonify({ | |
| 'uid': uid, | |
| 'email': user.email, | |
| 'daily_cash': user_data.get('daily_cash', 0), | |
| 'remaining_cash': user_data.get('remaining_cash', 0), | |
| 'last_reset': user_data.get('last_reset'), | |
| 'is_admin': user_data.get('is_admin', False) | |
| }) | |
| except Exception as e: | |
| uid = verify_token(request.headers.get('Authorization', '').split(' ')[1]) | |
| user = auth.get_user(uid) | |
| user_data = db.reference(f'users/{uid}').get() | |
| return jsonify({'error': str(e)+ f'user data: {user_data}'}), 500 | |
| # ======================================== | |
| # Receipt media endpoints | |
| # ======================================== | |
| def get_blob_from_image_url(image_url): | |
| parsed = urlparse(image_url) | |
| if parsed.netloc == "storage.googleapis.com": | |
| parts = parsed.path.strip("/").split("/", 1) | |
| if len(parts) == 2: | |
| bucket_name, blob_path = parts | |
| return blob_path | |
| prefix = f"/v0/b/{bucket.name}/o/" | |
| if parsed.path.startswith(prefix): | |
| encoded_blob_path = parsed.path[len(prefix):] | |
| blob_path = unquote(encoded_blob_path) | |
| return blob_path | |
| return None | |
| def view_receipt(transaction_id): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| transaction_ref = db.reference(f'transactions/{transaction_id}') | |
| transaction_data = transaction_ref.get() | |
| if not transaction_data: | |
| return jsonify({'error': 'Transaction not found'}), 404 | |
| image_url = transaction_data.get('image_url') | |
| if not image_url: | |
| return jsonify({'error': 'No receipt image found for this transaction'}), 404 | |
| blob_path = get_blob_from_image_url(image_url) | |
| if not blob_path: | |
| return jsonify({'error': 'Could not determine blob path from URL'}), 500 | |
| print(f"Blob path for view: {blob_path}") | |
| blob = bucket.blob(blob_path) | |
| if not blob.exists(): | |
| print("Blob does not exist at path:", blob_path) | |
| return jsonify({'error': 'Blob not found'}), 404 | |
| signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10)) | |
| r = requests.get(signed_url) | |
| if r.status_code != 200: | |
| return jsonify({'error': 'Unable to fetch image from storage'}), 500 | |
| return send_file(io.BytesIO(r.content), mimetype='image/jpeg') | |
| except Exception as e: | |
| print(f"View receipt error: {str(e)}") | |
| return jsonify({'error': str(e)}), 500 | |
| def download_receipt(transaction_id): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| transaction_ref = db.reference(f'transactions/{transaction_id}') | |
| transaction_data = transaction_ref.get() | |
| if not transaction_data: | |
| return jsonify({'error': 'Transaction not found'}), 404 | |
| image_url = transaction_data.get('image_url') | |
| if not image_url: | |
| return jsonify({'error': 'No receipt image found for this transaction'}), 404 | |
| blob_path = get_blob_from_image_url(image_url) | |
| if not blob_path: | |
| return jsonify({'error': 'Could not determine blob path from URL'}), 500 | |
| print(f"Blob path for download: {blob_path}") | |
| blob = bucket.blob(blob_path) | |
| if not blob.exists(): | |
| print("Blob does not exist at path:", blob_path) | |
| return jsonify({'error': 'Blob not found'}), 404 | |
| signed_url = blob.generate_signed_url(expiration=timedelta(minutes=10)) | |
| r = requests.get(signed_url) | |
| if r.status_code != 200: | |
| return jsonify({'error': 'Unable to fetch image from storage'}), 500 | |
| return send_file( | |
| io.BytesIO(r.content), | |
| mimetype='image/jpeg', | |
| as_attachment=True, | |
| attachment_filename='receipt.jpg' | |
| ) | |
| except Exception as e: | |
| print(f"Download receipt error: {str(e)}") | |
| return jsonify({'error': str(e)}), 500 | |
| # ======================================== | |
| # Delete user endpoint | |
| # ======================================== | |
| def delete_user(uid): | |
| try: | |
| verify_admin(request.headers.get('Authorization', '')) | |
| try: | |
| user = auth.get_user(uid) | |
| except auth.UserNotFoundError: | |
| return jsonify({'error': 'User not found'}), 404 | |
| auth.delete_user(uid) | |
| db.reference(f'users/{uid}').delete() | |
| transactions_ref = db.reference('transactions') | |
| user_transactions = transactions_ref.order_by_child('uid').equal_to(uid).get() | |
| if user_transactions: | |
| for transaction_id in user_transactions.keys(): | |
| transactions_ref.child(transaction_id).delete() | |
| return jsonify({ | |
| 'success': True, | |
| 'message': f'User {uid} and all associated data deleted successfully' | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| if __name__ == '__main__': | |
| app.run(debug=True, host="0.0.0.0", port=7860) | |