Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import hashlib | |
| import uuid | |
| from datetime import datetime, time, timedelta | |
| from PIL import Image | |
| import pytz | |
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| import google.generativeai as genai | |
| import firebase_admin | |
| from firebase_admin import credentials, db, storage, auth | |
| import pandas as pd | |
| import numpy as np | |
| import requests | |
| from urllib.parse import urlparse, unquote | |
# ---------- Flask application ----------
# Create the WSGI application and enable cross-origin requests for it.
app = Flask(__name__)
CORS(app)

# ---------- Firebase initialization ----------
# Realtime Database URL and storage bucket name are read from the environment.
Firebase_DB = os.getenv("Firebase_DB")
Firebase_Storage = os.getenv("Firebase_Storage")

try:
    # The service-account credentials are expected as a JSON string.
    cred_json = os.environ.get("FIREBASE")
    if not cred_json:
        print("FIREBASE secret not set.")
    else:
        cred = credentials.Certificate(json.loads(cred_json))
        firebase_admin.initialize_app(
            cred,
            {'databaseURL': Firebase_DB, 'storageBucket': Firebase_Storage},
        )
except Exception as e:
    # Initialization problems are only reported; execution continues.
    print(f"Firebase init error: {e}")

# Handle to the default storage bucket used by the receipt upload code.
bucket = storage.bucket()
| # ---------- Helper functions ---------- | |
def configure_gemini():
    """Configure the Gemini SDK and return a generative model handle.

    The API key is read from the ``GEMINI_API_KEY`` environment variable.
    """
    api_key = os.getenv("GEMINI_API_KEY")
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    return model
def verify_token(token):
    """Return the Firebase uid encoded in an ID token.

    Args:
        token: Firebase ID token string taken from the request.

    Returns:
        The ``uid`` claim of the verified token, or ``None`` when the token
        is missing or verification fails for any reason.
    """
    if not token:
        return None
    try:
        return auth.verify_id_token(token)['uid']
    except Exception:
        # Any verification failure is treated as "not authenticated".
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return None
def verify_global_admin(header):
    """Authorize a request as coming from a global administrator.

    Args:
        header: Raw ``Authorization`` header value (``"Bearer <token>"``).

    Returns:
        The uid of the authenticated administrator.

    Raises:
        PermissionError: If the header is not a Bearer header, the token does
            not verify, the user has no database record, or the record's
            ``is_admin`` flag is not truthy.
    """
    if not header or not header.startswith('Bearer '):
        raise PermissionError('Invalid token')
    uid = verify_token(header.split(' ')[1])
    # A missing user record comes back as None; treat it like a non-admin
    # instead of failing with AttributeError on ``None.get``.
    record = db.reference(f'users/{uid}').get() if uid else None
    if not record or not record.get('is_admin', False):
        raise PermissionError('Global admin required')
    return uid
def verify_org_manager(uid, org_id):
    """Ensure ``uid`` is the owner or a manager of an organization.

    Args:
        uid: Authenticated user id, or ``None`` if the caller is anonymous.
        org_id: Key of the organization under ``organizations/``.

    Returns:
        The organization record (dict) read from the database.

    Raises:
        PermissionError: If the caller is not authenticated, the organization
            does not exist, or the caller is neither owner nor manager.
    """
    # Reject anonymous callers up front. Otherwise a None uid compared with
    # the None owner of a missing organization would pass the check below.
    if not uid:
        raise PermissionError('Manager access required')
    org = db.reference(f'organizations/{org_id}').get()
    if not org:
        raise PermissionError('Manager access required')
    if uid != org.get('owner') and uid not in org.get('managers', []):
        raise PermissionError('Manager access required')
    return org
def get_auth_uid():
    """Return the uid of the current request's caller, or None.

    The ``Authorization`` header must consist of exactly two space-separated
    parts, the first being ``Bearer``; the second part is verified as a
    Firebase ID token.
    """
    header = request.headers.get('Authorization', '')
    pieces = header.split(' ')
    if len(pieces) != 2:
        return None
    scheme, token = pieces
    if scheme != 'Bearer':
        return None
    return verify_token(token)
def get_blob_path(url):
    """Extract the storage object path from a download URL.

    Two URL shapes are recognised:

    * ``https://storage.googleapis.com/<bucket>/<object>``
    * any host whose path starts with ``/v0/b/<bucket.name>/o/<object>``

    Args:
        url: URL string of a stored object.

    Returns:
        The percent-decoded object path, or ``None`` when the URL is empty or
        matches neither shape.
    """
    if not url:
        return None
    p = urlparse(url)
    if p.netloc == "storage.googleapis.com":
        # Path is "/<bucket>/<object>"; drop the bucket segment. The object
        # name is percent-encoded in public URLs, so decode it just like the
        # other branch does.
        _, _, object_path = p.path.lstrip('/').partition('/')
        return unquote(object_path)
    prefix = f"/v0/b/{bucket.name}/o/"
    if p.path.startswith(prefix):
        return unquote(p.path[len(prefix):])
    return None
| # ======================================== | |
| # AUTH & REGISTRATION | |
| # ======================================== | |
def register():
    """Create a Firebase user and its initial database profile.

    Reads ``email`` and ``password`` from the JSON body. Responds with 201
    and the new uid/email on success, or 400 when a field is missing or any
    step raises.
    """
    payload = request.get_json() or {}
    email = payload.get('email')
    password = payload.get('password')
    if not (email and password):
        return jsonify(error='email and password required'), 400
    try:
        user = auth.create_user(email=email, password=password)
        # New accounts start with no cash allowance and without admin rights.
        profile = {
            'daily_cash': 0,
            'remaining_cash': 0,
            'last_reset': datetime.now(pytz.UTC).isoformat(),
            'is_admin': False,
        }
        db.reference(f'users/{user.uid}').set(profile)
        return jsonify(success=True, uid=user.uid, email=user.email), 201
    except Exception as e:
        return jsonify(error=str(e)), 400
| # ======================================== | |
| # ORGANIZATIONS & INVITES | |
| # ======================================== | |
def create_org():
    """Create an organization owned by the authenticated caller.

    Responds with 401 when the token is invalid, 400 when ``name`` is missing
    from the JSON body, and 201 with the new ``org_id`` otherwise.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    payload = request.get_json() or {}
    name = payload.get('name')
    if not name:
        return jsonify(error='Organization name required'), 400
    # The creator is the owner and the first member; no managers yet.
    record = {
        'name': name,
        'owner': uid,
        'managers': [],
        'members': [uid],
        'created_at': datetime.now(pytz.UTC).isoformat(),
    }
    new_ref = db.reference('organizations').push(record)
    return jsonify(success=True, org_id=new_ref.key), 201
def list_orgs():
    """List every organization whose ``members`` include the caller.

    Responds with 401 when the token is invalid; otherwise returns
    ``organizations``, each entry being the stored record plus ``org_id``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    every_org = db.reference('organizations').get() or {}
    mine = []
    for key, record in every_org.items():
        if uid not in record.get('members', []):
            continue
        entry = {'org_id': key}
        entry.update(record)
        mine.append(entry)
    return jsonify(organizations=mine)
def invite_user(org_id):
    """Create an invite for an email address to join an organization.

    The caller must be the owner or a manager of ``org_id``. The JSON body
    carries ``email`` (required) and ``role`` (``member`` by default, or
    ``manager``).

    Returns:
        401 for an invalid token, 403 when the caller lacks manager access,
        400 for a missing email or invalid role, otherwise the new
        ``invite_id``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    body = request.get_json() or {}
    email = (body.get('email') or '').strip()
    role = body.get('role', 'member')
    # An invite without an address can never be accepted, because acceptance
    # compares the invite's email with the accepting user's email.
    if not email:
        return jsonify(error='email required'), 400
    if role not in ('member', 'manager'):
        return jsonify(error='invalid role'), 400

    invite_id = str(uuid.uuid4())
    db.reference(f'invites/{invite_id}').set({
        'org_id': org_id,
        'email': email,
        'role': role,
        'sent_at': datetime.now(pytz.UTC).isoformat(),
    })
    return jsonify(success=True, invite_id=invite_id)
def accept_invite():
    """Accept an organization invite on behalf of the authenticated caller.

    The JSON body carries ``invite_id``. The caller's account email must
    match the invite's email (case-insensitive). On success the caller is
    added to the organization's members (and managers, for a manager invite)
    and the invite is deleted.

    Returns:
        401 for an invalid token, 404 when the invite or its organization
        does not exist, 403 on email mismatch, otherwise the ``org_id``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401

    invite_id = (request.get_json() or {}).get('invite_id')
    if not invite_id:
        return jsonify(error='Invite not found'), 404
    invite_ref = db.reference(f'invites/{invite_id}')
    inv = invite_ref.get()
    if not inv:
        return jsonify(error='Invite not found'), 404

    invited_email = (inv.get('email') or '').lower()
    caller_email = (auth.get_user(uid).email or '').lower()
    if not invited_email or caller_email != invited_email:
        return jsonify(error='Email mismatch'), 403

    org_ref = db.reference(f'organizations/{inv["org_id"]}')
    org = org_ref.get()
    if not org:
        # Updating a missing node would create a partial organization record.
        return jsonify(error='Organization not found'), 404

    members = set(org.get('members', []))
    managers = set(org.get('managers', []))
    members.add(uid)
    if inv.get('role') == 'manager':
        managers.add(uid)
    org_ref.update({'members': sorted(members), 'managers': sorted(managers)})

    # The invite record does not store its own id, so delete it through the
    # id supplied in the request rather than a key read back from the record.
    invite_ref.delete()
    return jsonify(success=True, org_id=inv['org_id'])
| # ======================================== | |
| # PROJECTS & ALLOCATIONS | |
| # ======================================== | |
def create_project(org_id):
    """Create a project with a budget inside an organization.

    The caller must be the owner or a manager of ``org_id``. The JSON body
    carries ``name`` and ``budget`` (required) and optionally ``recurring``,
    ``interval`` and ``due_date``.

    Returns:
        401 for an invalid token, 403 without manager access, 400 when the
        name is missing or the budget is not a positive finite number,
        otherwise 201 with the new ``project_id``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    body = request.get_json() or {}
    name = body.get('name')
    try:
        budget = float(body.get('budget', 0))
    except (TypeError, ValueError):
        budget = 0.0
    # The chained comparison also rejects NaN and infinity, which a plain
    # "budget <= 0" test would let through.
    if not name or not (0 < budget < float('inf')):
        return jsonify(error='name and positive budget required'), 400

    ref = db.reference('projects').push({
        'org_id': org_id,
        'name': name,
        'budget': budget,
        'spent': 0.0,
        'recurring': bool(body.get('recurring', False)),
        'interval': body.get('interval'),
        'due_date': body.get('due_date'),
        'allocations': {},
        'created_at': datetime.now(pytz.UTC).isoformat(),
    })
    return jsonify(success=True, project_id=ref.key), 201
def list_projects(org_id):
    """List the projects of an organization for one of its members.

    Responds with 403 when the caller is not listed in the organization's
    ``members``; otherwise returns ``projects``, each entry being the stored
    record plus ``project_id``.
    """
    uid = get_auth_uid()
    org = db.reference(f'organizations/{org_id}').get() or {}
    member_ids = org.get('members', [])
    if uid not in member_ids:
        return jsonify(error='Access denied'), 403
    query = db.reference('projects').order_by_child('org_id').equal_to(org_id)
    found = query.get() or {}
    listing = []
    for key, record in found.items():
        entry = {'project_id': key}
        entry.update(record)
        listing.append(entry)
    return jsonify(projects=listing)
def modify_project(org_id, pid):
    """Update (PUT) or delete (any other method) a project.

    The caller must be the owner or a manager of ``org_id`` and the project
    must belong to that organization.

    Returns:
        401 for an invalid token, 403 without manager access, 404 when the
        project does not exist in this organization, 400 for an empty update
        or an attempt to change ``org_id``, otherwise ``success``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    ref = db.reference(f'projects/{pid}')
    project = ref.get()
    # Manager rights are per organization: without this check a manager of
    # one organization could edit or delete another organization's project.
    if not project or project.get('org_id') != org_id:
        return jsonify(error='Project not found'), 404

    if request.method == 'PUT':
        updates = request.get_json() or {}
        # The database client rejects empty or non-dict updates.
        if not isinstance(updates, dict) or not updates:
            return jsonify(error='nothing to update'), 400
        if updates.get('org_id', org_id) != org_id:
            return jsonify(error='org_id cannot be changed'), 400
        ref.update(updates)
    else:
        ref.delete()
    return jsonify(success=True)
| # Allocations | |
def allocations(pid, org_id):
    """Create (POST) or list (other methods) the allocations of a project.

    POST requires the caller to be owner/manager of ``org_id`` and a JSON
    body with ``name`` and a positive ``budget``. Listing requires the caller
    to be a member of ``org_id``. In both cases the project must belong to
    that organization.

    Returns:
        401 for an invalid token, 403 when access is denied, 404 when the
        project is not part of the organization, 400 for invalid input,
        201 with ``allocation_id`` on creation, or the ``allocations`` list.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401

    if request.method == 'POST':
        try:
            verify_org_manager(uid, org_id)
        except PermissionError as e:
            return jsonify(error=str(e)), 403
    else:
        org = db.reference(f'organizations/{org_id}').get() or {}
        if uid not in org.get('members', []):
            return jsonify(error='Access denied'), 403

    project = db.reference(f'projects/{pid}').get()
    # Rights were checked against org_id, so the project must live there.
    if not project or project.get('org_id') != org_id:
        return jsonify(error='Project not found'), 404

    if request.method == 'POST':
        body = request.get_json() or {}
        name = body.get('name')
        try:
            amt = float(body.get('budget', 0))
        except (TypeError, ValueError):
            amt = 0.0
        # The chained comparison also rejects NaN and infinity.
        if not name or not (0 < amt < float('inf')):
            return jsonify(error='name and positive budget required'), 400
        aid = str(uuid.uuid4())
        db.reference(f'projects/{pid}/allocations/{aid}').set({
            'name': name, 'budget': amt, 'spent': 0.0
        })
        return jsonify(success=True, allocation_id=aid), 201

    # GET
    allocs = project.get('allocations') or {}
    return jsonify(allocations=[{'allocation_id': k, **v} for k, v in allocs.items()])
def modify_allocation(org_id, pid, aid):
    """Update (PUT) or delete (any other method) a project allocation.

    The caller must be owner/manager of ``org_id``, the project must belong
    to that organization and the allocation must exist. PUT accepts ``name``
    and/or ``budget`` in the JSON body.

    Returns:
        401 for an invalid token, 403 without manager access, 404 when the
        project or allocation is not found, 400 for invalid or empty input,
        otherwise ``success``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    project = db.reference(f'projects/{pid}').get()
    # Rights were checked against org_id, so the project must live there.
    if not project or project.get('org_id') != org_id:
        return jsonify(error='Project not found'), 404
    if aid not in (project.get('allocations') or {}):
        # Updating a missing node would create a partial allocation record.
        return jsonify(error='Allocation not found'), 404

    ref = db.reference(f'projects/{pid}/allocations/{aid}')
    if request.method == 'PUT':
        body = request.get_json() or {}
        updates = {}
        if 'name' in body:
            updates['name'] = body['name']
        if 'budget' in body:
            try:
                updates['budget'] = float(body['budget'])
            except (TypeError, ValueError):
                return jsonify(error='budget must be a number'), 400
        if not updates:
            return jsonify(error='nothing to update'), 400
        ref.update(updates)
    else:
        ref.delete()
    return jsonify(success=True)
| # ======================================== | |
| # RECEIPTS, MANUAL ENTRY, AI REPORT | |
| # ======================================== | |
def process_receipt(model, image):
    """Ask the generative model to extract receipt data from an image.

    Args:
        model: Object exposing ``generate_content(parts)`` whose result has
            a ``text`` attribute.
        image: Image object passed to the model together with the prompt.

    Returns:
        The model's text response, or the string ``"{}"`` if the call fails.
    """
    prompt = """Analyze receipt... return JSON with keys: is_receipt,total,items,date,receipt_number"""
    try:
        return model.generate_content([prompt, image]).text
    except Exception:
        # Best effort: callers get an empty JSON object on any model error.
        # ``Exception`` lets KeyboardInterrupt and SystemExit propagate.
        return "{}"
def process_receipt_endpoint():
    """Extract data from an uploaded receipt, or save a confirmed receipt.

    Two modes, selected by the form field ``confirmed``:

    * ``confirmed == 'true'``: the form carries ``date``, ``receipt_number``,
      ``image_url``, ``total``, ``items`` (comma separated) and ``file_hash``;
      the transaction is saved via ``validate_and_save_transaction``.
    * otherwise: the ``receipt`` file is hashed, checked against already
      stored transactions, analysed by the model, uploaded to storage and the
      extracted data is returned for confirmation.

    Returns:
        401 for an invalid token, 400 for missing/invalid input, duplicate
        receipts, unreadable images, unparsable model output or non-receipts;
        otherwise the saved transaction or the extracted data.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401

    # Confirmation branch
    if request.form.get('confirmed') == 'true':
        form = request.form
        required = ('date', 'receipt_number', 'image_url', 'total', 'items', 'file_hash')
        missing = [k for k in required if k not in form]
        if missing:
            return jsonify(error=f"missing fields: {', '.join(missing)}"), 400
        try:
            total = float(form['total'])
        except ValueError:
            return jsonify(error='total must be a number'), 400
        data = {
            'date': form['date'],
            'receipt_number': form['receipt_number'],
            'image_url': form['image_url'],
            'total': total,
            'items': [i.strip() for i in form['items'].split(',')],
        }
        return validate_and_save_transaction(
            uid, db.reference(f'users/{uid}').get(), data,
            form['file_hash'], None, False)

    file = request.files.get('receipt')
    if not file:
        return jsonify(error='No file uploaded'), 400
    img_bytes = file.read()
    file_hash = hashlib.md5(img_bytes).hexdigest()
    if db.reference('transactions').order_by_child('hash').equal_to(file_hash).get():
        return jsonify(error='Receipt already processed'), 400

    # Normalise the upload to an RGB JPEG for the model. Images with alpha
    # or a palette cannot be written as JPEG without converting first.
    try:
        img = Image.open(io.BytesIO(img_bytes))
        if img.mode != 'RGB':
            img = img.convert('RGB')
        buf = io.BytesIO()
        img.save(buf, 'JPEG', optimize=True, quality=90)
    except (OSError, ValueError):
        return jsonify(error='Invalid image file'), 400

    text = process_receipt(configure_gemini(), Image.open(io.BytesIO(buf.getvalue())))
    try:
        js = json.loads(text[text.find('{'):text.rfind('}') + 1])
    except (TypeError, ValueError):
        return jsonify(error='Parse error', raw=text), 400
    if not js.get('is_receipt'):
        return jsonify(error='Not a valid receipt'), 400

    # Upload only after the image was accepted, so rejected uploads do not
    # leave orphaned objects in the bucket.
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    blob = bucket.blob(f'receipts/{uid}/{ts}_{file_hash}.jpg')
    blob.upload_from_string(img_bytes, content_type='image/jpeg')

    js.update(file_hash=file_hash, image_url=blob.public_url)
    return jsonify(success=True, extracted=True, data=js, message='Confirm to save')
def manual_entry():
    """Save a transaction entered by hand (no receipt image).

    Form fields: ``total``, ``items`` (comma separated), ``date`` and
    ``receipt_number``.

    Returns:
        401 for an invalid token, 400 when ``total`` is not a number,
        otherwise the response of ``validate_and_save_transaction``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        total = float(request.form.get('total', 0))
    except ValueError:
        return jsonify(error='total must be a number'), 400

    user = db.reference(f'users/{uid}')
    raw_items = request.form.get('items', '')
    data = {
        'total': total,
        # Skip blank entries so an empty field yields [] instead of [''].
        'items': [i.strip() for i in raw_items.split(',') if i.strip()],
        'date': request.form.get('date'),
        'receipt_number': request.form.get('receipt_number'),
    }
    # There is no file to hash, so the current time is hashed to give the
    # transaction a value for its ``hash`` field.
    entry_hash = hashlib.md5(str(datetime.now()).encode()).hexdigest()
    return validate_and_save_transaction(uid, user.get(), data, entry_hash, None, True)
def validate_and_save_transaction(uid, user_data, data, file_hash, img_bytes, manual):
    """Record a transaction and update the balances it affects.

    Args:
        uid: Id of the user the transaction belongs to.
        user_data: The user's database record (may be ``None`` or lack
            ``remaining_cash``; the balance is then taken as 0).
        data: Dict with ``total``, ``items``, ``date``, ``receipt_number``
            and optionally ``project_id``, ``allocation_id``, ``image_url``.
        file_hash: Value stored in the transaction's ``hash`` field.
        img_bytes: Raw image bytes to upload, or a falsy value for none.
        manual: Whether the transaction was entered manually.

    Returns:
        A JSON response with ``success`` and the stored transaction
        including its generated ``id``.
    """
    total = float(data.get('total', 0))

    # NOTE(review): the balance updates here are read-modify-write and not
    # atomic; concurrent requests for the same user or project may race.
    balance = (user_data or {}).get('remaining_cash', 0) or 0
    db.reference(f'users/{uid}').update({
        'remaining_cash': balance - total
    })

    # project/allocation spend
    pid = data.get('project_id')
    aid = data.get('allocation_id')
    if pid:
        proj_ref = db.reference(f'projects/{pid}')
        p = proj_ref.get() or {}
        proj_ref.update({'spent': p.get('spent', 0) + total})
        if aid:
            alloc_ref = proj_ref.child(f'allocations/{aid}')
            a = alloc_ref.get() or {}
            alloc_ref.update({'spent': a.get('spent', 0) + total})

    tx = {
        'uid': uid, 'total': total, 'items': data.get('items', []),
        'date': data.get('date'), 'receipt_number': data.get('receipt_number'),
        'timestamp': datetime.now(pytz.UTC).isoformat(),
        'hash': file_hash, 'manual_entry': manual,
        'project_id': pid, 'allocation_id': aid
    }
    if img_bytes:
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        blob = bucket.blob(f'receipts/{uid}/{ts}_{file_hash}.jpg')
        blob.upload_from_string(img_bytes, content_type='image/jpeg')
        tx['image_url'] = blob.public_url
    elif data.get('image_url'):
        # The image was uploaded earlier (receipt confirmation flow); keep
        # its URL on the transaction instead of dropping it.
        tx['image_url'] = data['image_url']

    new = db.reference('transactions').push(tx)
    return jsonify(success=True, transaction={**tx, 'id': new.key})
| # ======================================== | |
| # PERSONAL OVERVIEW & PROFILE | |
| # ======================================== | |
def spending_overview():
    """Summarise the caller's transactions.

    Returns:
        401 for an invalid token; otherwise ``daily_spending`` (total per
        calendar date, unparsable or missing dates grouped under 2000-01-01)
        and ``recent_transactions`` (the ten newest by ``timestamp``).
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401

    ref = db.reference('transactions').order_by_child('uid').equal_to(uid)
    items = [{**v, 'id': k} for k, v in (ref.get() or {}).items()]
    df = pd.DataFrame(items)
    if df.empty:
        return jsonify(daily_spending=[], recent_transactions=[])

    # Stored records may lack a field entirely when its value was null, so
    # make sure the columns used below exist.
    for col in ('date', 'total', 'timestamp'):
        if col not in df.columns:
            df[col] = None
    df['total'] = pd.to_numeric(df['total'], errors='coerce').fillna(0)

    df['parsed'] = pd.to_datetime(df['date'], errors='coerce').fillna(pd.Timestamp('2000-01-01'))
    df['date_only'] = df['parsed'].dt.date.astype(str)
    daily = df.groupby('date_only')['total'].sum().reset_index().rename(columns={'date_only': 'date'})
    recent = df.sort_values('timestamp', ascending=False).head(10).drop(columns=['parsed'])
    # NaN is not valid JSON; emit null for missing cells instead.
    recent = recent.astype(object).where(recent.notna(), None)
    return jsonify(
        daily_spending=daily.to_dict(orient='records'),
        recent_transactions=recent.to_dict(orient='records')
    )
def user_profile():
    """Return the caller's account email and stored profile fields.

    Returns:
        401 for an invalid token; otherwise uid, email, ``daily_cash``,
        ``remaining_cash``, ``last_reset`` and ``is_admin`` (defaults are
        used for fields missing from the database record).
    """
    uid = get_auth_uid()
    if not uid:
        # Without this guard the user lookup below would be called with None.
        return jsonify(error='Invalid token'), 401
    account = auth.get_user(uid)
    profile = db.reference(f'users/{uid}').get() or {}
    return jsonify(
        uid=uid, email=account.email,
        daily_cash=profile.get('daily_cash', 0),
        remaining_cash=profile.get('remaining_cash', 0),
        last_reset=profile.get('last_reset'),
        is_admin=profile.get('is_admin', False)
    )
| # ======================================== | |
| # ORG-LEVEL ADMIN | |
| # ======================================== | |
def org_admin_overview(org_id):
    """Return members, transactions and totals for an organization.

    The caller must be the owner or a manager of ``org_id``. A transaction
    counts as the organization's when its ``project_id`` refers to one of
    the organization's projects.

    Returns:
        401 for an invalid token, 403 without manager access, otherwise
        ``members`` (uid, email, role), ``transactions`` and ``analytics``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        org = verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    # ``managers`` can be absent from the record (e.g. when it is empty), so
    # read it with a default instead of indexing.
    owner = org.get('owner')
    managers = set(org.get('managers', []))
    members = []
    for m in org.get('members', []):
        try:
            em = auth.get_user(m).email
        except Exception:
            # Best effort: a member without a resolvable account is still
            # listed, just without an email.
            em = None
        if m == owner:
            role = 'owner'
        elif m in managers:
            role = 'manager'
        else:
            role = 'member'
        members.append(dict(uid=m, email=em, role=role))

    # Fetch the organization's projects once (same query the project listing
    # uses) instead of reading one project per transaction.
    projects = db.reference('projects').order_by_child('org_id').equal_to(org_id).get() or {}
    project_ids = set(projects)
    all_t = db.reference('transactions').get() or {}
    txs = [
        {**td, 'id': tid}
        for tid, td in all_t.items()
        if td.get('project_id') in project_ids
    ]

    return jsonify(
        members=members,
        transactions=txs,
        analytics=dict(
            total_members=len(members),
            total_transactions=len(txs),
            total_spent=sum(t.get('total', 0) for t in txs)
        )
    )
def org_admin_set_role(org_id, mid):
    """Promote a member to manager or demote a manager to member.

    The caller must be the owner or a manager of ``org_id``; the JSON body
    carries ``role`` (``member`` or ``manager``).

    Returns:
        401 for an invalid token, 403 without manager access, 400 for an
        invalid role, 404 when ``mid`` is not a member of the organization,
        otherwise ``success``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    role = (request.get_json() or {}).get('role')
    if role not in ('member', 'manager'):
        return jsonify(error='invalid role'), 400

    ref = db.reference(f'organizations/{org_id}')
    org = ref.get() or {}
    if mid not in org.get('members', []):
        return jsonify(error='User is not a member'), 404

    managers = set(org.get('managers', []))
    if role == 'manager':
        managers.add(mid)
    else:
        managers.discard(mid)
    # update() takes a single dict of child keys, not keyword arguments.
    ref.update({'managers': sorted(managers)})
    return jsonify(success=True)
def org_admin_remove_user(org_id, mid):
    """Remove a user from an organization's members and managers.

    The caller must be the owner or a manager of ``org_id``. The owner
    cannot be removed.

    Returns:
        401 for an invalid token, 403 without manager access, 400 when
        ``mid`` is the owner, otherwise ``success``.
    """
    uid = get_auth_uid()
    if not uid:
        return jsonify(error='Invalid token'), 401
    try:
        verify_org_manager(uid, org_id)
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    ref = db.reference(f'organizations/{org_id}')
    org = ref.get() or {}
    if mid == org.get('owner'):
        return jsonify(error='Cannot remove owner'), 400

    members = set(org.get('members', []))
    managers = set(org.get('managers', []))
    members.discard(mid)
    managers.discard(mid)
    # update() takes a single dict of child keys, not keyword arguments.
    ref.update({'members': sorted(members), 'managers': sorted(managers)})
    return jsonify(success=True)
def modify_transaction(tid):
    """Update (PUT) or delete (any other method) a stored transaction.

    Authorization: when the transaction's project belongs to an
    organization, the caller must be owner/manager of that organization;
    otherwise the caller must be a global admin.
    NOTE(review): the original code called ``verify_org_manager`` with only
    the Authorization header, which always failed; the policy above is the
    closest working interpretation - confirm it matches the intent.

    Returns:
        401 for an invalid token, 404 for an unknown transaction, 403 when
        access is denied, 400 for an empty update, 500 on unexpected errors,
        otherwise ``success``.
    """
    try:
        uid = get_auth_uid()
        if not uid:
            return jsonify(error='Invalid token'), 401

        ref = db.reference(f'transactions/{tid}')
        tx = ref.get()
        if not tx:
            return jsonify(error='Transaction not found'), 404

        # Resolve the owning organization through the transaction's project.
        pid = tx.get('project_id')
        project = db.reference(f'projects/{pid}').get() if pid else None
        org_id = (project or {}).get('org_id')
        try:
            if org_id:
                verify_org_manager(uid, org_id)
            else:
                verify_global_admin(request.headers.get('Authorization', ''))
        except PermissionError as e:
            return jsonify(error=str(e)), 403

        if request.method == 'PUT':
            updates = request.get_json() or {}
            # The database client rejects empty or non-dict updates.
            if not isinstance(updates, dict) or not updates:
                return jsonify(error='nothing to update'), 400
            ref.update(updates)
        else:
            ref.delete()
        return jsonify(success=True)
    except Exception as e:
        return jsonify(error=str(e)), 500
| # ======================================== | |
| # GLOBAL ADMIN (developer) | |
| # ======================================== | |
def get_admin_overview():
    """Return all users, all transactions and global totals.

    Requires a global admin Bearer token.

    Returns:
        403 when the caller is not a global admin, 500 on unexpected errors,
        otherwise ``users`` (uid, email, is_admin), ``transactions`` and
        ``analytics``.
    """
    try:
        verify_global_admin(request.headers.get('Authorization', ''))
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    try:
        users = db.reference('users').get() or {}
        ulist = []
        for u, d in users.items():
            # Look each account up once. A database record without a matching
            # auth account is listed with no email rather than failing the
            # whole overview.
            try:
                email = auth.get_user(u).email
            except Exception:
                email = None
            ulist.append({'uid': u, 'email': email,
                          'is_admin': d.get('is_admin', False)})

        txs = db.reference('transactions').get() or {}
        tlist = [{**v, 'id': k} for k, v in txs.items()]
        return jsonify(
            users=ulist, transactions=tlist,
            analytics=dict(
                total_users=len(ulist),
                total_transactions=len(tlist),
                total_spent=sum(t.get('total', 0) for t in tlist)
            )
        )
    except Exception as e:
        return jsonify(error=str(e)), 500
def create_user_admin():
    """Create a user account with an initial cash allowance (admin only).

    Requires a global admin Bearer token. The JSON body carries ``email``
    and ``password`` (required) and optionally ``daily_cash`` and
    ``is_admin``.

    Returns:
        403 when the caller is not a global admin, 400 for missing fields or
        when account creation fails, otherwise 201 with the new user.
    """
    try:
        verify_global_admin(request.headers.get('Authorization', ''))
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    try:
        d = request.get_json() or {}
        email, password = d.get('email'), d.get('password')
        if not email or not password:
            return jsonify(error='email and password required'), 400
        user = auth.create_user(email=email, password=password)
        daily_cash = d.get('daily_cash', 0)
        db.reference(f'users/{user.uid}').set({
            'daily_cash': daily_cash,
            # A new account starts with its full daily allowance available.
            'remaining_cash': daily_cash,
            'last_reset': datetime.now(pytz.UTC).isoformat(),
            'is_admin': d.get('is_admin', False)
        })
        return jsonify(success=True, user=dict(uid=user.uid, email=user.email)), 201
    except Exception as e:
        return jsonify(error=str(e)), 400
def admin_reset_password(uid):
    """Set a new password for a user (admin only).

    Requires a global admin Bearer token; the JSON body carries
    ``new_password``.

    Returns:
        403 when the caller is not a global admin, 400 when ``new_password``
        is missing or the update fails, otherwise ``success``.
    """
    try:
        verify_global_admin(request.headers.get('Authorization', ''))
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    try:
        npw = (request.get_json() or {}).get('new_password')
        if not npw:
            return jsonify(error='new_password required'), 400
        auth.update_user(uid, password=npw)
        return jsonify(success=True)
    except Exception as e:
        return jsonify(error=str(e)), 400
def delete_user(uid):
    """Delete a user account, its profile and its transactions (admin only).

    Requires a global admin Bearer token.

    Returns:
        403 when the caller is not a global admin, 500 when any deletion
        step fails, otherwise ``success``.
    """
    try:
        verify_global_admin(request.headers.get('Authorization', ''))
    except PermissionError as e:
        return jsonify(error=str(e)), 403

    try:
        auth.delete_user(uid)
        db.reference(f'users/{uid}').delete()
        txs = db.reference('transactions').order_by_child('uid').equal_to(uid).get() or {}
        for key in txs:
            db.reference(f'transactions/{key}').delete()
        return jsonify(success=True)
    except Exception as e:
        return jsonify(error=str(e)), 500
| # ======================================== | |
if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must
    # not be on by default while listening on all interfaces. Enable it
    # explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)