Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import hashlib | |
| from datetime import datetime, time | |
| from PIL import Image | |
| import pytz | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import google.generativeai as genai | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| import pandas as pd | |
| import requests # Needed for calling Firebase REST API | |
# Flask application with CORS applied to it.
app = Flask(__name__)
CORS(app)

# Firebase Admin SDK: service-account credentials plus the Realtime Database
# URL and Cloud Storage bucket used throughout this module.
cred = credentials.Certificate('mydateproject-e7994-firebase-adminsdk-mpa9a-2fd9c32a98.json')
firebase_options = {
    'databaseURL': 'https://mydateproject-e7994.firebaseio.com',
    'storageBucket': 'mydateproject-e7994.firebasestorage.app',
}
firebase_admin.initialize_app(cred, firebase_options)
bucket = storage.bucket()

# Secrets are read from the environment. 'Gemini' is mandatory (KeyError at
# import time when unset); FIREBASE_API_KEY is optional and may be None.
api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')  # For auth REST API calls
| # Helper functions | |
def configure_gemini():
    """Configure the Gemini SDK with the module-level API key and return a model handle."""
    genai.configure(api_key=api_key)
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    return gemini_model
def verify_token(token):
    """Return the ``uid`` claim of a Firebase ID token, or None when it cannot be verified.

    Args:
        token: ID token string handed to ``auth.verify_id_token``.

    Returns:
        The ``uid`` value from the decoded token, or None if verification
        (or the ``uid`` lookup) raises for any reason.
    """
    try:
        return auth.verify_id_token(token)['uid']
    except Exception:
        # Every failure is reported to callers as "no authenticated user".
        return None
def check_daily_reset(user_ref):
    """Restore the user's ``remaining_cash`` to ``daily_cash`` when a reset is due.

    A reset is due when the current UTC time is 08:00 or later and the stored
    ``last_reset`` falls on an earlier calendar date than today.

    Args:
        user_ref: Database reference exposing ``get()`` and ``update()``.

    Returns:
        True if the record was reset, False otherwise (including on any error,
        which is printed rather than raised).
    """
    try:
        profile = user_ref.get()
        current = datetime.now(pytz.UTC)
        # Records without a reset stamp are treated as reset long ago.
        previous = datetime.fromisoformat(
            profile.get('last_reset', '2000-01-01T00:00:00+00:00')
        )
        reset_due = previous.date() < current.date() and current.time() >= time(8, 0)
        if not reset_due:
            return False
        refreshed = {
            'remaining_cash': profile['daily_cash'],
            'last_reset': current.isoformat(),
        }
        user_ref.update(refreshed)
        return True
    except Exception as e:
        print(f"Reset error: {str(e)}")
        return False
| # Process receipt image | |
def process_receipt(model, image):
    """Ask the model to classify an image as a receipt and extract its fields.

    Args:
        model: Object exposing ``generate_content(parts)`` whose result has a
            ``text`` attribute (the value returned by ``configure_gemini``).
        image: Image object passed through to the model unchanged.

    Returns:
        The raw response text, which the prompt asks to be JSON, or the string
        "{}" when the model call fails.
    """
    prompt = """Analyze this image and determine if it's a receipt. If it is a receipt, extract:
- Total amount (as float)
- List of items purchased (array of strings)
- Date of transaction (DD/MM/YYYY format)
- Receipt number (as string)
Return JSON format with keys: is_receipt (boolean), total, items, date, receipt_number.
If not a receipt, return {"is_receipt": false}"""
    try:
        response = model.generate_content([prompt, image])
        return response.text
    except Exception as e:
        # The previous handler called an undefined ``st`` object, so the
        # fallback below was never reached; log with print like the rest of
        # this module does.
        print(f"Gemini error: {str(e)}")
        return "{}"
| # ======================================== | |
| # Authentication Endpoints | |
| # ======================================== | |
| # ======================================== | |
| # Receipt Processing Endpoint (unchanged) | |
| # ======================================== | |
def process_receipt_endpoint():
    """Handle a receipt submission (uploaded image or manual entry).

    Authenticates the caller from the ``Authorization: Bearer <token>`` header,
    applies the daily allowance reset, then either delegates to
    ``handle_manual_entry`` or sends the uploaded ``receipt`` file to Gemini
    and passes the parsed result to ``validate_and_save_transaction``.

    Returns:
        A Flask JSON response, paired with a status code on failure:
        401 (missing/invalid token), 404 (no user record), 400 (no file,
        duplicate image, unreadable image, unparsable or non-receipt result),
        500 (unexpected error).

    NOTE(review): no ``@app.route`` decorator is visible on this function;
    confirm how it is registered with the Flask app.
    """
    try:
        auth_header = request.headers.get('Authorization', '')
        # partition() never raises, unlike indexing the result of split()
        # when the header does not contain the "Bearer " scheme.
        _, scheme, token = auth_header.partition('Bearer ')
        token = token.strip()
        uid = verify_token(token) if scheme and token else None
        if not uid:
            return jsonify({'error': 'Invalid token'}), 401

        user_ref = db.reference(f'users/{uid}')
        # Reset first so the snapshot read below reflects a refreshed balance.
        check_daily_reset(user_ref)
        user_data = user_ref.get()
        if not user_data:
            return jsonify({'error': 'User not found'}), 404

        # Handle manual entry
        if request.form.get('manual_entry') == 'true':
            return handle_manual_entry(uid, user_ref, user_data)

        # Handle image processing
        file = request.files.get('receipt')
        if not file:
            return jsonify({'error': 'No file uploaded'}), 400
        image_bytes = file.read()

        # The MD5 digest is only a duplicate-upload fingerprint, not a security control.
        file_hash = hashlib.md5(image_bytes).hexdigest()
        transactions_ref = db.reference('transactions')
        existing = transactions_ref.order_by_child('hash').equal_to(file_hash).get()
        if existing:
            return jsonify({'error': 'Receipt already processed'}), 400

        try:
            image = Image.open(io.BytesIO(image_bytes))
        except OSError:
            # Pillow's "cannot identify image file" error is an OSError subclass.
            return jsonify({'error': 'Invalid image file'}), 400

        model = configure_gemini()
        result_text = process_receipt(model, image)
        try:
            # Keep only the outermost {...} span; the model may wrap the JSON in other text.
            json_str = result_text[result_text.find('{'):result_text.rfind('}') + 1]
            data = json.loads(json_str)
        except json.JSONDecodeError:
            return jsonify({'error': 'Failed to parse receipt data', 'raw_response': result_text}), 400

        if not data.get('is_receipt', False):
            return jsonify({'error': 'Not a valid receipt'}), 400

        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            file_hash=file_hash,
            image_bytes=image_bytes,
            manual=False,
        )
    except Exception as e:
        # Report only the message: locals may be unbound at this point and
        # must not be echoed to the client (they include image bytes and
        # user data).
        print(f"Receipt processing error: {str(e)}")
        return jsonify({'error': str(e)}), 500
def handle_manual_entry(uid, user_ref, user_data):
    """Build a transaction from manually entered form fields and save it.

    Reads ``total``, ``items`` (comma-separated), ``date`` and
    ``receipt_number`` from ``request.form``.

    Args:
        uid: UID of the authenticated user.
        user_ref: Not used in this function; kept so the call signature is unchanged.
        user_data: User record passed through to ``validate_and_save_transaction``.

    Returns:
        The response from ``validate_and_save_transaction``, or a 400 JSON
        error when the form is invalid or saving raises.
    """
    try:
        raw_total = request.form.get('total')
        if raw_total is None or not raw_total.strip():
            return jsonify({'error': 'Total is required'}), 400

        raw_items = request.form.get('items', '')
        data = {
            'total': float(raw_total),
            # Skip blanks so an empty "items" field yields [] rather than [''].
            'items': [item.strip() for item in raw_items.split(',') if item.strip()],
            'date': request.form.get('date'),
            'receipt_number': request.form.get('receipt_number'),
            'is_receipt': True,
        }
        return validate_and_save_transaction(
            uid=uid,
            user_data=user_data,
            data=data,
            # No image to fingerprint, so derive a value from the current time instead.
            file_hash=hashlib.md5(str(datetime.now()).encode()).hexdigest(),
            image_bytes=None,
            manual=True,
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 400
def validate_and_save_transaction(uid, user_data, data, file_hash, image_bytes, manual=False):
    """Validate a parsed receipt, charge the user's balance and store the transaction.

    Args:
        uid: UID of the user being charged.
        user_data: User record; ``user_data['remaining_cash']`` is the balance checked.
        data: Receipt fields (``total``, ``items``, ``date``, ``receipt_number``).
        file_hash: Fingerprint stored with the transaction and used in the blob name.
        image_bytes: Raw image to upload, or None.
        manual: True for manual entries; no image is uploaded for those.

    Returns:
        A JSON success response containing the stored transaction and the new
        balance, or a ``(response, 400)`` pair when validation fails.

    NOTE(review): the balance is computed from the ``user_data`` snapshot and
    written back without a database transaction, so concurrent submissions
    could overwrite each other - confirm whether that matters here.
    """
    receipt_number = data.get('receipt_number')
    if receipt_number is None:
        # Indexing data['receipt_number'] raised KeyError when the field was absent.
        return jsonify({'error': 'Receipt number is required'}), 400

    try:
        total = float(data.get('total', 0))
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid total amount'}), 400
    # NaN fails every comparison, so this single test rejects NaN, infinities
    # and negative totals (a negative total would increase the balance).
    if not 0 <= total < float('inf'):
        return jsonify({'error': 'Invalid total amount'}), 400

    transactions_ref = db.reference('transactions')
    existing = transactions_ref.order_by_child('receipt_number').equal_to(receipt_number).get()
    if existing:
        return jsonify({'error': f"Receipt #{receipt_number} already exists"}), 400

    if total > user_data['remaining_cash']:
        return jsonify({'error': 'Insufficient funds'}), 400

    # Upload image if available
    image_url = None
    if image_bytes and not manual:
        # UTC, matching the timestamps stored elsewhere in this module.
        timestamp = datetime.now(pytz.UTC).strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{timestamp}_{file_hash}.jpg')
        blob.upload_from_string(image_bytes, content_type='image/jpeg')
        image_url = blob.public_url

    # Update user cash
    new_remaining = user_data['remaining_cash'] - total
    db.reference(f'users/{uid}').update({'remaining_cash': new_remaining})

    # Save transaction
    transaction_data = {
        'uid': uid,
        'total': total,
        'items': data.get('items', []),
        'date': data.get('date'),
        'receipt_number': receipt_number,
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash,
        'image_url': image_url,
        'manual_entry': manual,
    }
    new_transaction_ref = transactions_ref.push(transaction_data)
    return jsonify({
        'success': True,
        'transaction': {**transaction_data, 'id': new_transaction_ref.key},
        'remaining_cash': new_remaining,
    })
| # ======================================== | |
| # Data Endpoints for Visualizations | |
| # ======================================== | |
def get_spending_overview():
    """Return the caller's per-day spending totals and ten most recent transactions.

    The token is taken from the second space-separated part of the
    ``Authorization`` header.

    Returns:
        JSON with ``daily_spending`` (records of ``date`` and ``total``) and
        ``recent_transactions`` (up to 10 records, newest ``timestamp`` first),
        or a JSON error with status 401 (missing/invalid token), 404 (no
        transactions) or 500 (unexpected error).

    NOTE(review): no ``@app.route`` decorator is visible on this function;
    confirm how it is registered with the Flask app.
    """
    try:
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            # Previously a missing header raised IndexError and surfaced as a 500.
            return jsonify({'error': 'Invalid token'}), 401

        transactions_ref = db.reference('transactions')
        transactions = transactions_ref.order_by_child('uid').equal_to(uid).get()
        if not transactions:
            return jsonify({'error': 'No transactions found'}), 404

        df = pd.DataFrame(list(transactions.values()))
        if df.empty:
            return jsonify({'error': 'No transactions found'}), 404

        # Unparsable dates become NaT instead of raising.
        df['date'] = pd.to_datetime(df['date'], format='%d/%m/%Y', errors='coerce')
        daily_spending = df.groupby(df['date'].dt.date)['total'].sum().reset_index()

        recent = df.sort_values(by='timestamp', ascending=False).head(10)
        # Fields missing from some records show up as NaN/NaT, which are not
        # valid JSON; emit them as null instead.
        recent = recent.astype(object).where(recent.notna(), None)

        return jsonify({
            'daily_spending': daily_spending.to_dict(orient='records'),
            'recent_transactions': recent.to_dict(orient='records'),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def get_global_overview():
    """Return total spending per user and every transaction (admin only).

    Returns:
        JSON with ``user_spending`` (uid -> summed ``total``, limited to uids
        present under ``users``) and ``all_transactions`` (newest ``timestamp``
        first), or a JSON error with status 401 (bad token), 403 (not an
        admin), 404 (no transactions) or 500 (unexpected error).

    NOTE(review): no ``@app.route`` decorator is visible on this function;
    confirm how it is registered with the Flask app.
    """
    try:
        try:
            verify_admin(request.headers.get('Authorization', ''))
        except PermissionError as e:
            return jsonify({'error': str(e)}), 403
        except ValueError as e:
            return jsonify({'error': str(e)}), 401

        transactions_raw = db.reference('transactions').get()
        users_raw = db.reference('users').get()
        if not transactions_raw:
            return jsonify({'error': 'No transactions found'}), 404

        transactions = pd.DataFrame(list(transactions_raw.values()))

        # User records are stored under users/<uid>, so the keys are the uids.
        # Filtering on them gives the same per-user sums as an inner join,
        # without needing a 'uid' field inside each user record.
        known_uids = set(users_raw or {})
        from_known_users = transactions[transactions['uid'].isin(known_uids)]
        user_spending = from_known_users.groupby('uid')['total'].sum().to_dict()

        ordered = transactions.sort_values(by='timestamp', ascending=False)
        # Fields missing from some records show up as NaN, which is not valid
        # JSON; emit them as null instead.
        ordered = ordered.astype(object).where(ordered.notna(), None)

        return jsonify({
            'user_spending': user_spending,
            'all_transactions': ordered.to_dict(orient='records'),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
| # Helper function to verify admin status | |
def verify_admin(auth_header):
    """Ensure the bearer token belongs to a user carrying the ``admin`` custom claim.

    Args:
        auth_header: Raw ``Authorization`` header value ("Bearer <token>").

    Returns:
        The verified user's UID. Callers that only need the check may ignore it.

    Raises:
        ValueError: The header is missing, not a Bearer header, or the token
            cannot be verified.
        PermissionError: The user has no truthy ``admin`` custom claim.
    """
    if not auth_header or not auth_header.startswith('Bearer '):
        raise ValueError('Invalid token')
    token = auth_header.split(' ')[1]
    uid = verify_token(token) if token else None
    if not uid:
        # verify_token signals failure with None; stop before looking up a user with it.
        raise ValueError('Invalid token')
    user = auth.get_user(uid)
    claims = user.custom_claims or {}
    if not claims.get('admin'):
        raise PermissionError('Admin access required')
    return uid
| # User management endpoint for profile | |
def get_user_profile():
    """Return the authenticated user's profile and allowance figures.

    The token is taken from the second space-separated part of the
    ``Authorization`` header.

    Returns:
        JSON with ``uid``, ``email``, ``daily_cash``, ``remaining_cash``,
        ``last_reset`` and ``is_admin``, or a JSON error with status 401
        (missing/invalid token) or 500 (unexpected error).

    NOTE(review): no ``@app.route`` decorator is visible on this function;
    confirm how it is registered with the Flask app.
    """
    try:
        header_parts = request.headers.get('Authorization', '').split(' ')
        token = header_parts[1] if len(header_parts) > 1 else None
        uid = verify_token(token) if token else None
        if not uid:
            # Previously a missing header raised IndexError and surfaced as a 500.
            return jsonify({'error': 'Invalid token'}), 401

        user = auth.get_user(uid)
        # A user without a database record falls back to the defaults below.
        user_data = db.reference(f'users/{uid}').get() or {}
        return jsonify({
            'uid': uid,
            'email': user.email,
            'daily_cash': user_data.get('daily_cash', 100),
            'remaining_cash': user_data.get('remaining_cash', 100),
            'last_reset': user_data.get('last_reset'),
            'is_admin': user_data.get('is_admin', False),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python, so it must not be on by default for a server bound to
    # all interfaces. Opt in explicitly with FLASK_DEBUG=1 during development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)