# ============================================================================= # IRISTRACK — BACKEND SERVER # ============================================================================= # SECTION INDEX # 1. Imports & Initialization # 2. Core Helper Functions # 3. Gemini Functions # 4. AssemblyAI Transcription # 5. Organization & Department Endpoints # 6. User Management Endpoints # 7. KPI Model & Task Endpoints # 8. Submission Endpoints # 9. Leave Management Endpoints # 10. Client Meeting Endpoints # 11. Expense Management Endpoints # 12. Budget Management Endpoints # 13. Notification Endpoints # 14. Analytics Engine # 15. Dashboard Endpoints # 16. WhatsApp / Twilio Relay Endpoints # 17. SuperAdmin Endpoints # 18. Audit Log # 19. Debug # 20. Main # # FIREBASE NODE CONTRACT # organizations/{orgId}/ # users/{orgId}/{uid}/ # departments/{orgId}/{deptId}/ # kpi_models/{orgId}/{roleId}/ # submissions/{orgId}/{uid}/{weekId}/ # tasks/{orgId}/{taskId}/ # expenses/{orgId}/{expenseId}/ # budgets/{orgId}/{deptId}/ # notifications/{orgId}/{uid}/ # leave_requests/{orgId}/{requestId}/ # client_meetings/{orgId}/{meetingId}/ # audit_log/{orgId}/{entryId}/ # platform_orgs/{orgId}/ <- SuperAdmin org index (list of all orgs) # superadmins/{uid}/ <- is_admin: True — set manually in Firebase console # # SUPERADMIN SAFETY CONTRACT # SuperAdmin = any Firebase user with is_admin: True in superadmins/{uid}. # Set this flag directly in the Firebase console — no endpoint required. # Pattern mirrors Pitchfy's is_admin flag. Single source of truth. # # WEBHOOK_SECRET explained # A shared secret string you generate once (e.g. openssl rand -hex 32) stored as a HF # Space secret. The cron scheduler sends it as "Authorization: Bearer " when # calling /api/whatsapp/send-weekly-prompts. Server compares strings — match = trusted. # It is a machine-to-machine API key for endpoints with no logged-in Firebase user. # # ENVIRONMENT VARIABLES (HuggingFace Secrets) # FIREBASE, Firebase_DB, FIREBASE_STORAGE_BUCKET, Gemini, ASSEMBLYAI_API_KEY, # TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_WHATSAPP_NUMBER, # WEBHOOK_SECRET # ============================================================================= # ============================================================================= # 1. IMPORTS & INITIALIZATION # ============================================================================= import os import uuid import json import traceback import logging import hmac import hashlib import tempfile import time import re from datetime import datetime, timezone, date, timedelta from collections import defaultdict import requests from flask import Flask, request, jsonify from flask_cors import CORS import firebase_admin from firebase_admin import credentials, db, auth, storage try: from dotenv import load_dotenv load_dotenv() print("dotenv loaded (local dev mode)") except ImportError: print("dotenv not available — HF Spaces mode") logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) # ============================================================================= # CONSTANTS # ============================================================================= LEAVE_TYPES = ["annual", "sick", "maternity", "paternity", "unpaid", "study", "compassionate"] LEAVE_STATUSES = ["pending", "approved", "declined", "cancelled"] MEETING_STATUSES = ["scheduled", "in_progress", "completed", "cancelled"] EXPENSE_STATUSES = ["pending", "approved", "flagged", "rejected"] SUBMISSION_WEEK_FORMAT = "%G-W%V" # ISO 8601 week: %G = ISO year, %V = ISO week number (Mon-based) # Matches TypeScript's getISOWeek() output exactly e.g. "2026-W21" # %W (old value) was Monday-based week-of-year per Python locale, # which diverges from ISO by 1 on most dates — fixed here. # ============================================================================= # FIREBASE & AI INIT # ============================================================================= try: _fb_json = os.environ.get("FIREBASE") if not _fb_json: raise ValueError("'FIREBASE' env var not set.") _fb_url = os.environ.get("Firebase_DB") if not _fb_url: raise ValueError("'Firebase_DB' env var not set.") _fb_bucket = os.environ.get("FIREBASE_STORAGE_BUCKET", "") firebase_admin.initialize_app( credentials.Certificate(json.loads(_fb_json)), { "databaseURL": _fb_url, "storageBucket": _fb_bucket, # e.g. your-project.appspot.com }, ) db_ref = db.reference() logger.info(f"Firebase initialized. Storage bucket: '{_fb_bucket or 'not set'}'") except Exception as e: logger.critical(f"FATAL Firebase init: {e}") exit(1) try: _gemini_key = os.environ.get("Gemini") if not _gemini_key: raise ValueError("'Gemini' env var not set.") from google import genai gemini_client = genai.Client(api_key=_gemini_key) MODEL_NAME = "gemini-3.1-flash-lite" logger.info(f"Gemini initialized ({MODEL_NAME}).") except Exception as e: logger.critical(f"FATAL Gemini init: {e}") exit(1) ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") if not ASSEMBLYAI_API_KEY: logger.warning("ASSEMBLYAI_API_KEY not set — audio transcription disabled.") TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID") TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN") TWILIO_WHATSAPP_NUMBER = os.environ.get("TWILIO_WHATSAPP_NUMBER", "whatsapp:+14155238886") WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET") # ============================================================================= # 2. CORE HELPER FUNCTIONS # ============================================================================= def _now_iso(): return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z" def _current_week_id(): return datetime.now(timezone.utc).strftime(SUBMISSION_WEEK_FORMAT) def _week_id_for_date(d: date): return d.strftime(SUBMISSION_WEEK_FORMAT) def _candidate_week_keys(week_id: str) -> list: """ Return candidate storage keys for an ISO week id, including legacy `%Y-W%W` keys for the same calendar week. The first item is the canonical (current-format) key. Earlier builds stored submissions under Python's `strftime("%Y-W%W")`, which usually differs from ISO `%G-W%V` by one. Old records remain under those keys, so reads must check both. """ keys = [week_id] m = re.match(r"^(\d{4})-W(\d{2})$", week_id or "") if not m: return keys iso_year = int(m.group(1)) iso_week = int(m.group(2)) try: monday = date.fromisocalendar(iso_year, iso_week, 1) except Exception: return keys for i in range(7): d = monday + timedelta(days=i) legacy = d.strftime("%Y-W%W") if legacy not in keys: keys.append(legacy) return keys def _find_week_submission(org_id: str, user_uid: str, week_id: str): """Return the first submission for a user across candidate week keys.""" for key in _candidate_week_keys(week_id): sub = db_ref.child(f"submissions/{org_id}/{user_uid}/{key}").get() if sub: return sub return None def _parse_iso(ts: str) -> datetime: if ts.endswith("Z"): ts = ts[:-1] + "+00:00" return datetime.fromisoformat(ts) def verify_token(auth_header): if not auth_header or not auth_header.startswith("Bearer "): return None try: return auth.verify_id_token(auth_header.split("Bearer ")[1])["uid"] except Exception as e: logger.warning(f"Token verification failed: {e}") return None def verify_superadmin(auth_header): """ SuperAdmin check — mirrors Pitchfy's is_admin pattern. Set is_admin: True on superadmins/{uid} directly in the Firebase console for rairorr@gmail.com's uid. No endpoint needed. """ uid = verify_token(auth_header) if not uid: raise PermissionError("Invalid or missing token.") user_data = db_ref.child(f"superadmins/{uid}").get() if not user_data or not user_data.get("is_admin", False): raise PermissionError("SuperAdmin access required.") return uid def verify_org_admin(auth_header, org_id): uid = verify_token(auth_header) if not uid: raise PermissionError("Invalid or missing token.") user_data = db_ref.child(f"users/{org_id}/{uid}").get() if not user_data: raise PermissionError("User not found in organization.") role = user_data.get("platform_role", "employee") if role not in ("org_admin", "dept_admin"): raise PermissionError("Admin access required.") return uid, role, user_data def verify_org_member(auth_header, org_id): uid = verify_token(auth_header) if not uid: raise PermissionError("Invalid or missing token.") user_data = db_ref.child(f"users/{org_id}/{uid}").get() if not user_data: raise PermissionError("User not found in organization.") return uid, user_data def get_org(org_id): return db_ref.child(f"organizations/{org_id}").get() def _write_audit(org_id, actor_uid, action, details=None): try: ref = db_ref.child(f"audit_log/{org_id}").push() ref.set({ "entryId": ref.key, "actorUid": actor_uid, "action": action, "details": details or {}, "timestamp": _now_iso(), }) except Exception as e: logger.error(f"Audit log write failed: {e}") def _send_whatsapp(to_number: str, message: str): """ Send a WhatsApp message via Twilio. to_number should be in E.164 format e.g. +263771234567 """ if not TWILIO_ACCOUNT_SID or not TWILIO_AUTH_TOKEN: logger.warning(f"Twilio not configured — skipping WhatsApp to {to_number}: {message[:60]}") return False try: url = f"https://api.twilio.com/2010-04-01/Accounts/{TWILIO_ACCOUNT_SID}/Messages.json" data = { "From": TWILIO_WHATSAPP_NUMBER, "To": f"whatsapp:{to_number}", "Body": message, } r = requests.post(url, data=data, auth=(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN), timeout=10) r.raise_for_status() logger.info(f"WhatsApp sent to {to_number}: {message[:60]}") return True except Exception as e: logger.error(f"WhatsApp send failed to {to_number}: {e}") return False def _push_notification(org_id, uid, title, message, ntype="info", meta=None): try: ref = db_ref.child(f"notifications/{org_id}/{uid}").push() ref.set({ "notificationId": ref.key, "title": title, "message": message, "type": ntype, "meta": meta or {}, "read": False, "createdAt": _now_iso(), }) except Exception as e: logger.error(f"Push notification failed for {uid}: {e}") # ============================================================================= # 3. GEMINI FUNCTIONS # ============================================================================= def gemini_synthesize_kpi(natural_language_input: str, role_name: str) -> dict: prompt = f""" You are configuring a KPI framework for an employee role. Role: "{role_name}" Admin input: "{natural_language_input}" Extract KPI categories and their percentage weights. Weights must sum to 100. Return ONLY valid JSON: {{ "role": "{role_name}", "kpi_categories": [ {{"name": "", "weight": , "description": ""}} ], "synthesized_from": "{natural_language_input[:200]}" }} Rules: 3-7 categories. Weights are integers summing to exactly 100. """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"KPI synthesis failed: {e}") raise def gemini_synthesize_task(natural_language_input: str, org_context: str, kpi_categories: list) -> dict: cats = ", ".join(kpi_categories) if kpi_categories else "general" prompt = f""" You are creating a structured task from a manager's natural language description. Organization context: "{org_context}" KPI categories available: {cats} Manager input: "{natural_language_input}" Return ONLY valid JSON: {{ "title": "", "description": "", "kpi_category": "", "priority": "", "suggested_due_days": , "estimated_hours": }} """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"Task synthesis failed: {e}") raise def gemini_extract_submission( submission_text: str, kpi_categories: list, role_name: str, kpi_weights: dict, ) -> dict: cats_json = json.dumps(kpi_categories) weights_json = json.dumps(kpi_weights) prompt = f""" You are an AI performance analyst processing an employee's weekly activity report. Role: "{role_name}" KPI categories: {cats_json} Target KPI weights (%): {weights_json} Employee submission: "{submission_text}" Analyze and return ONLY valid JSON: {{ "structured_activities": [ {{"activity": "", "kpi_category": "", "estimated_percent": }} ], "actual_allocation": {{ "": }}, "alignment_score": <0-100 int>, "alignment_notes": "<2-3 sentence analysis of how well time allocation matches targets>", "success_events": [""], "role_drift_detected": , "role_drift_detail": "", "summary": "" }} Rules: - actual_allocation values must sum to 100 - alignment_score: 100 = perfect match to target weights, 0 = complete misalignment - role_drift: true if >20% of time on activities outside the role's defined KPI categories """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"Submission extraction failed: {e}") raise def gemini_check_submission_alignment_with_goals( submission_text: str, org_goals: str, kpi_categories: list, ) -> bool: """ Returns True if the submission meaningfully addresses work aligned with org goals/KPIs. Used by the WhatsApp weekly prompt scheduler to determine if an employee has already reported in a meaningful way this week. """ prompt = f""" Organization goals/KPIs: "{org_goals}" KPI categories: {json.dumps(kpi_categories)} Employee message or activity description: "{submission_text}" Does this message constitute a meaningful weekly work update that addresses the organization's goals or KPI categories? Answer with ONLY a JSON object: {{"is_meaningful_update": , "confidence": "", "reason": ""}} """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() result = json.loads(raw) return result.get("is_meaningful_update", False) except Exception as e: logger.error(f"Goal alignment check failed: {e}") return False def gemini_extract_expense(description_text: str, receipt_url: str = None) -> dict: prompt = f""" Extract expense details from this employee description. Description: "{description_text}" {"Receipt image URL: " + receipt_url if receipt_url else ""} Return ONLY valid JSON: {{ "vendor": "", "amount": , "currency": "", "category": "", "date": "", "description": "", "is_client_related": }} """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"Expense extraction failed: {e}") raise def gemini_process_leave_request(natural_language_input: str, employee_name: str) -> dict: today = date.today().isoformat() prompt = f""" Today's date: {today} Employee name: "{employee_name}" Employee leave request: "{natural_language_input}" Extract leave request details and return ONLY valid JSON: {{ "leave_type": "", "start_date": "", "end_date": "", "days_requested": , "reason": "", "is_urgent": , "notes": "" }} Rules: - If dates are relative ("next Monday", "tomorrow") resolve from today's date. - If leave type is ambiguous, infer from context (sick = illness, annual = vacation etc). - days_requested should exclude weekends unless specifically stated. """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"Leave request extraction failed: {e}") raise def gemini_process_meeting_recording( transcript: str, meeting_title: str, attendees: list, org_context: str, ) -> dict: prompt = f""" You are processing a client meeting recording transcript. Meeting: "{meeting_title}" Organization context: "{org_context}" Attendees: {json.dumps(attendees)} Transcript: "{transcript}" Produce a structured meeting intelligence report. Return ONLY valid JSON: {{ "executive_summary": "<3-4 sentence overview of what was discussed and decided>", "key_decisions": [""], "action_items": [ {{"action": "", "owner": "", "due_date": "", "priority": ""}} ], "client_sentiment": "", "follow_up_required": , "risks_identified": [""], "opportunities_identified": [""], "next_meeting_suggested": , "employee_summary": "", "manager_summary": "" }} """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"Meeting processing failed: {e}") raise def gemini_process_general_whatsapp_message( message: str, employee_name: str, org_context: str, conversation_state: str, ) -> dict: """ Routes an incoming WhatsApp message to the correct intent. Returns intent + extracted data. """ prompt = f""" You are IrisTrack, an AI employee assistant for "{org_context}". Employee: "{employee_name}" Current conversation state: "{conversation_state}" Employee message: "{message}" Classify the intent and extract data. Return ONLY valid JSON: {{ "intent": "", "confidence": "", "extracted_data": {{}}, "reply_to_employee": "", "needs_followup": , "followup_question": "" }} Intents: - weekly_update: employee describing their work activities - leave_request: requesting time off - expense_submission: submitting an expense claim - meeting_update: updating on a client meeting - general_query: asking a question about policies, leave balance, tasks etc - greeting: hi, hello, hey etc - other: anything else """ try: r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt) raw = r.text.strip().lstrip("```json").lstrip("```").rstrip("```").strip() return json.loads(raw) except Exception as e: logger.error(f"WhatsApp intent classification failed: {e}") return { "intent": "other", "confidence": "low", "extracted_data": {}, "reply_to_employee": "I received your message. I'll make sure it's noted.", "needs_followup": False, "followup_question": None, } # ============================================================================= # 4. ASSEMBLYAI TRANSCRIPTION # ============================================================================= def transcribe_audio_url(audio_url: str, language_code: str = "en") -> str: """ Transcribes an audio file from a URL using AssemblyAI. Returns the transcript text. Supports: voice notes from WhatsApp, web uploads. """ if not ASSEMBLYAI_API_KEY: raise RuntimeError("AssemblyAI API key not configured.") headers = {"authorization": ASSEMBLYAI_API_KEY, "content-type": "application/json"} # Submit transcription job submit_resp = requests.post( "https://api.assemblyai.com/v2/transcript", json={ "audio_url": audio_url, "language_code": language_code, "punctuate": True, "format_text": True, "speaker_labels": True, }, headers=headers, timeout=30, ) submit_resp.raise_for_status() transcript_id = submit_resp.json()["id"] logger.info(f"AssemblyAI transcript submitted: {transcript_id}") # Poll for completion polling_url = f"https://api.assemblyai.com/v2/transcript/{transcript_id}" max_attempts = 60 for attempt in range(max_attempts): time.sleep(3) poll_resp = requests.get(polling_url, headers=headers, timeout=15) poll_resp.raise_for_status() result = poll_resp.json() status = result.get("status") if status == "completed": logger.info(f"AssemblyAI transcript completed: {transcript_id}") return result.get("text", "") elif status == "error": raise RuntimeError(f"AssemblyAI error: {result.get('error')}") logger.info(f"AssemblyAI poll {attempt + 1}/{max_attempts}: {status}") raise TimeoutError("AssemblyAI transcription timed out.") def transcribe_audio_file(file_bytes: bytes, filename: str = "audio.ogg") -> str: """ Uploads audio bytes to AssemblyAI then transcribes. Used for web uploads where we have raw file data. """ if not ASSEMBLYAI_API_KEY: raise RuntimeError("AssemblyAI API key not configured.") headers = {"authorization": ASSEMBLYAI_API_KEY} # Upload upload_resp = requests.post( "https://api.assemblyai.com/v2/upload", headers={**headers, "content-type": "application/octet-stream"}, data=file_bytes, timeout=60, ) upload_resp.raise_for_status() upload_url = upload_resp.json()["upload_url"] logger.info(f"AssemblyAI upload complete: {upload_url[:60]}") return transcribe_audio_url(upload_url) # ============================================================================= # 5. ORGANIZATION & DEPARTMENT ENDPOINTS # ============================================================================= @app.route("/api/org/register", methods=["POST"]) def register_organization(): """ Creates a new organization and sets the caller as org_admin. Body: { name, industry, country, email, password, displayName } """ try: data = request.get_json() or {} email = data.get("email") password = data.get("password") display_name = data.get("displayName", "") org_name = data.get("name", "").strip() industry = data.get("industry", "") country = data.get("country", "Zimbabwe") if not email or not password or not org_name: return jsonify({"error": "email, password, and organization name required."}), 400 # Create Firebase Auth user try: fb_user = auth.create_user(email=email, password=password, display_name=display_name) except Exception as e: if "EMAIL_EXISTS" in str(e): return jsonify({"error": "Email already registered."}), 409 raise org_id = str(uuid.uuid4()) # Write organization record org_data = { "orgId": org_id, "name": org_name, "industry": industry, "country": country, "createdAt": _now_iso(), "reporting_day": "friday", "reporting_time": "08:00", "org_goals": "", "active": True, } db_ref.child(f"organizations/{org_id}").set(org_data) # Write platform_orgs index for SuperAdmin db_ref.child(f"platform_orgs/{org_id}").set({ "orgId": org_id, "name": org_name, "adminUid": fb_user.uid, "createdAt": _now_iso(), }) # Write user record in org user_data = { "uid": fb_user.uid, "email": email, "displayName": display_name, "phone": data.get("phone", ""), "platform_role": "org_admin", "department_id": None, "position": "Organization Admin", "createdAt": _now_iso(), "active": True, "leave_balance": {"annual": 20, "sick": 10, "study": 5}, "whatsapp_state": "idle", } db_ref.child(f"users/{org_id}/{fb_user.uid}").set(user_data) _write_audit(org_id, fb_user.uid, "org_registered", {"org_name": org_name}) logger.info(f"Organization registered: {org_id} by {fb_user.uid}") return jsonify({"success": True, "orgId": org_id, "uid": fb_user.uid, **org_data}), 201 except Exception as e: logger.error(f"org_register failed: {e}\n{traceback.format_exc()}") return jsonify({"error": str(e)}), 500 @app.route("/api/org/", methods=["GET"]) def get_organization(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) org = get_org(org_id) if not org: return jsonify({"error": "Organization not found."}), 404 return jsonify(org), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org/", methods=["PUT"]) def update_organization(org_id): """ Update org settings: name, industry, reporting_day, reporting_time, org_goals. """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) if role != "org_admin": return jsonify({"error": "Only org_admin can update organization settings."}), 403 data = request.get_json() or {} allowed = ["name", "industry", "country", "reporting_day", "reporting_time", "org_goals", "logo_url"] update = {k: v for k, v in data.items() if k in allowed} if not update: return jsonify({"error": "No valid fields to update."}), 400 db_ref.child(f"organizations/{org_id}").update(update) _write_audit(org_id, uid, "org_updated", update) return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//departments", methods=["POST"]) def create_department(org_id): try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} name = data.get("name", "").strip() if not name: return jsonify({"error": "Department name required."}), 400 dept_id = str(uuid.uuid4()) dept_data = { "deptId": dept_id, "name": name, "description": data.get("description", ""), "createdAt": _now_iso(), "createdBy": uid, } db_ref.child(f"departments/{org_id}/{dept_id}").set(dept_data) _write_audit(org_id, uid, "department_created", {"dept_name": name, "dept_id": dept_id}) return jsonify(dept_data), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//departments", methods=["GET"]) def list_departments(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) depts = db_ref.child(f"departments/{org_id}").get() or {} return jsonify(list(depts.values())), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 6. USER MANAGEMENT ENDPOINTS # ============================================================================= @app.route("/api/org//users", methods=["POST"]) def add_user(org_id): """ Admin adds a user to the org. Body: { email, password, displayName, phone, position, department_id, platform_role } platform_role: employee | dept_admin """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} email = data.get("email") password = data.get("password") display_name = data.get("displayName", "") phone = data.get("phone", "") position = data.get("position", "") dept_id = data.get("department_id") member_role = data.get("platform_role", "employee") if not email or not password: return jsonify({"error": "email and password required."}), 400 if member_role not in ("employee", "dept_admin"): member_role = "employee" try: fb_user = auth.create_user(email=email, password=password, display_name=display_name) except Exception as e: if "EMAIL_EXISTS" in str(e): # User already exists in Firebase Auth — look them up and add to this org. # This is intentional: an employee may have an account from another org # or a pre-existing Firebase account. We enrol them here explicitly. try: fb_user = auth.get_user_by_email(email) logger.info(f"add_user | Email {email} exists in Auth — enrolling uid={fb_user.uid} into org={org_id}") except Exception as lookup_err: logger.error(f"add_user | Could not look up existing user {email}: {lookup_err}") return jsonify({"error": "Email already registered and could not be retrieved."}), 409 else: raise user_data = { "uid": fb_user.uid, "email": email, "displayName": display_name, "phone": phone, "platform_role": member_role, "department_id": dept_id, "position": position, "createdAt": _now_iso(), "addedBy": uid, "active": True, "leave_balance": {"annual": 20, "sick": 10, "study": 5}, "whatsapp_state": "idle", } db_ref.child(f"users/{org_id}/{fb_user.uid}").set(user_data) # Welcome via WhatsApp if phone provided if phone: org = get_org(org_id) org_name = org.get("name", "your organization") if org else "your organization" _send_whatsapp( phone, f"Welcome to {org_name} on IrisTrack! 👋\n\n" f"You can update your weekly activities, request leave, submit expenses, " f"and more — just reply to this message.\n\n" f"I'll remind you on reporting day. Talk soon!" ) _write_audit(org_id, uid, "user_added", {"new_uid": fb_user.uid, "email": email}) return jsonify({"success": True, "uid": fb_user.uid, **user_data}), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"add_user failed: {e}\n{traceback.format_exc()}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//users", methods=["GET"]) def list_users(org_id): try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) users = db_ref.child(f"users/{org_id}").get() or {} result = list(users.values()) # Dept admins only see their department if role == "dept_admin": caller_data = db_ref.child(f"users/{org_id}/{uid}").get() or {} my_dept = caller_data.get("department_id") result = [u for u in result if u.get("department_id") == my_dept] dept_filter = request.args.get("department_id") if dept_filter: result = [u for u in result if u.get("department_id") == dept_filter] return jsonify(result), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//users/", methods=["GET"]) def get_user(org_id, target_uid): try: caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id) # Users can only fetch their own record unless they are admin if caller_uid != target_uid and caller_data.get("platform_role") not in ("org_admin", "dept_admin"): return jsonify({"error": "Access denied."}), 403 user_data = db_ref.child(f"users/{org_id}/{target_uid}").get() if not user_data: return jsonify({"error": "User not found."}), 404 return jsonify(user_data), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//users/", methods=["PUT"]) def update_user(org_id, target_uid): try: caller_uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} allowed = ["displayName", "phone", "position", "department_id", "platform_role", "active", "leave_balance"] update = {k: v for k, v in data.items() if k in allowed} if not update: return jsonify({"error": "No valid fields to update."}), 400 user_ref = db_ref.child(f"users/{org_id}/{target_uid}") if not user_ref.get(): return jsonify({"error": "User not found."}), 404 user_ref.update(update) _write_audit(org_id, caller_uid, "user_updated", {"target_uid": target_uid, "fields": list(update.keys())}) return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//users/me", methods=["GET"]) def get_my_profile(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) return jsonify({"uid": uid, **user_data}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 7. KPI MODEL & TASK ENDPOINTS # ============================================================================= @app.route("/api/org//kpi-models", methods=["POST"]) def create_kpi_model(org_id): """ Admin defines KPI model for a role via natural language. Body: { role_name, natural_language OR kpi_categories (manual override) } """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} role_name = data.get("role_name", "").strip() if not role_name: return jsonify({"error": "role_name required."}), 400 if data.get("natural_language"): synthesized = gemini_synthesize_kpi(data["natural_language"], role_name) elif data.get("kpi_categories"): # Manual override — admin provides structured categories synthesized = { "role": role_name, "kpi_categories": data["kpi_categories"], "synthesized_from": "manual", } else: return jsonify({"error": "natural_language or kpi_categories required."}), 400 model_id = str(uuid.uuid4()) model_data = { "modelId": model_id, "orgId": org_id, "createdBy": uid, "createdAt": _now_iso(), **synthesized, } db_ref.child(f"kpi_models/{org_id}/{model_id}").set(model_data) _write_audit(org_id, uid, "kpi_model_created", {"role_name": role_name, "model_id": model_id}) return jsonify(model_data), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"create_kpi_model failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//kpi-models", methods=["GET"]) def list_kpi_models(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) models = db_ref.child(f"kpi_models/{org_id}").get() or {} return jsonify(list(models.values())), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//kpi-models/preview", methods=["POST"]) def preview_kpi_model(org_id): """ Gemini synthesises a KPI model from natural language and returns it for admin review WITHOUT saving it. Admin confirms by calling POST /kpi-models with the same payload (or the returned kpi_categories directly). Body: { role_name, natural_language } """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} role_name = data.get("role_name", "").strip() nl = data.get("natural_language", "").strip() if not role_name or not nl: return jsonify({"error": "role_name and natural_language required."}), 400 synthesized = gemini_synthesize_kpi(nl, role_name) return jsonify({"preview": synthesized, "confirmed": False}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"preview_kpi_model failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//kpi-models/", methods=["PUT"]) def update_kpi_model(org_id, model_id): """ Update an existing KPI model. Body: { role_name?, kpi_categories?, natural_language? } If natural_language provided, Gemini re-synthesises. Otherwise accepts kpi_categories directly. """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) model_ref = db_ref.child(f"kpi_models/{org_id}/{model_id}") model_data = model_ref.get() if not model_data: return jsonify({"error": "KPI model not found."}), 404 data = request.get_json() or {} update = {"updatedAt": _now_iso(), "updatedBy": uid} if data.get("role_name"): update["role"] = data["role_name"] if data.get("natural_language"): synthesized = gemini_synthesize_kpi(data["natural_language"], data.get("role_name", model_data.get("role", ""))) update["kpi_categories"] = synthesized["kpi_categories"] update["synthesized_from"] = data["natural_language"] elif data.get("kpi_categories"): update["kpi_categories"] = data["kpi_categories"] update["synthesized_from"] = "manual" model_ref.update(update) _write_audit(org_id, uid, "kpi_model_updated", {"model_id": model_id}) return jsonify({**model_data, **update}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"update_kpi_model failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//kpi-models/", methods=["DELETE"]) def delete_kpi_model(org_id, model_id): """ Delete a KPI model. Org admin only. Models in active use (referenced by submissions) are soft-deleted — marked inactive rather than removed, so historical alignment scores remain meaningful. """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) if role != "org_admin": return jsonify({"error": "Only org_admin can delete KPI models."}), 403 model_ref = db_ref.child(f"kpi_models/{org_id}/{model_id}") model_data = model_ref.get() if not model_data: return jsonify({"error": "KPI model not found."}), 404 # Check if any submission references this model all_subs = db_ref.child(f"submissions/{org_id}").get() or {} in_use = any( isinstance(week_sub, dict) and week_sub.get("kpiModelId") == model_id for user_subs in all_subs.values() if isinstance(user_subs, dict) for week_sub in user_subs.values() if isinstance(week_sub, dict) ) if in_use: # Soft delete — keep data, mark inactive model_ref.update({"active": False, "deletedAt": _now_iso(), "deletedBy": uid}) _write_audit(org_id, uid, "kpi_model_soft_deleted", {"model_id": model_id}) return jsonify({"success": True, "soft_deleted": True, "reason": "Model has historical submissions — marked inactive rather than removed."}), 200 model_ref.delete() _write_audit(org_id, uid, "kpi_model_deleted", {"model_id": model_id}) return jsonify({"success": True, "soft_deleted": False}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"delete_kpi_model failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//tasks/preview", methods=["POST"]) def preview_task(org_id): """ Gemini structures a task from natural language and returns it for admin review WITHOUT saving. Admin confirms by calling POST /tasks with the same payload. Body: { natural_language, kpi_model_id? } """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} nl = data.get("natural_language", "").strip() if not nl: return jsonify({"error": "natural_language required."}), 400 org = get_org(org_id) org_context = (org.get("name", "") + " — " + org.get("industry", "")) if org else org_id kpi_cats = [] if data.get("kpi_model_id"): model = db_ref.child(f"kpi_models/{org_id}/{data['kpi_model_id']}").get() if model: kpi_cats = [c["name"] for c in model.get("kpi_categories", [])] structured = gemini_synthesize_task(nl, org_context, kpi_cats) due_date = (date.today() + timedelta(days=structured.get("suggested_due_days", 7))).isoformat() return jsonify({"preview": {**structured, "resolved_due_date": due_date}, "confirmed": False}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"preview_task failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//tasks", methods=["POST"]) def create_task(org_id): """ Admin creates a task via natural language. Gemini structures it. Body: { natural_language, assigned_to_uid, kpi_model_id (optional), due_date (optional override) } """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} nl = data.get("natural_language", "").strip() if not nl: return jsonify({"error": "natural_language required."}), 400 org = get_org(org_id) org_context = (org.get("name", "") + " — " + org.get("industry", "")) if org else org_id # Get KPI categories for context kpi_cats = [] if data.get("kpi_model_id"): model = db_ref.child(f"kpi_models/{org_id}/{data['kpi_model_id']}").get() if model: kpi_cats = [c["name"] for c in model.get("kpi_categories", [])] structured = gemini_synthesize_task(nl, org_context, kpi_cats) # Resolve due date due_date = data.get("due_date") if not due_date and structured.get("suggested_due_days"): due_date = (date.today() + timedelta(days=structured["suggested_due_days"])).isoformat() task_id = str(uuid.uuid4()) task_data = { "taskId": task_id, "orgId": org_id, "assignedTo": data.get("assigned_to_uid"), # canonical GET field "assigned_to_uid": data.get("assigned_to_uid"), # mirrors POST input contract "assignedBy": uid, "kpiModelId": data.get("kpi_model_id"), "dueDate": due_date, "status": "open", "createdAt": _now_iso(), "synthesized_from": nl, **structured, } db_ref.child(f"tasks/{org_id}/{task_id}").set(task_data) # Notify assigned user assigned_uid = data.get("assigned_to_uid") if assigned_uid: assigned_user = db_ref.child(f"users/{org_id}/{assigned_uid}").get() or {} _push_notification(org_id, assigned_uid, "New Task Assigned", structured.get("title", "New task"), "info", {"task_id": task_id}) if assigned_user.get("phone"): _send_whatsapp( assigned_user["phone"], f"📋 New task assigned to you:\n\n" f"*{structured.get('title', '')}*\n" f"{structured.get('description', '')}\n\n" f"Priority: {structured.get('priority', 'medium').upper()}\n" f"Due: {due_date or 'No due date'}" ) _write_audit(org_id, uid, "task_created", {"task_id": task_id, "title": structured.get("title")}) return jsonify(task_data), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"create_task failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//tasks", methods=["GET"]) def list_tasks(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) all_tasks = db_ref.child(f"tasks/{org_id}").get() or {} tasks = list(all_tasks.values()) role = user_data.get("platform_role", "employee") if role == "employee": # Employees see only their assigned tasks tasks = [t for t in tasks if t.get("assignedTo") == uid] elif role == "dept_admin": # Dept admins see tasks for their department members my_dept = user_data.get("department_id") dept_users = { u_uid for u_uid, u_data in (db_ref.child(f"users/{org_id}").get() or {}).items() if isinstance(u_data, dict) and u_data.get("department_id") == my_dept } tasks = [t for t in tasks if t.get("assignedTo") in dept_users] status_filter = request.args.get("status") if status_filter: tasks = [t for t in tasks if t.get("status") == status_filter] return jsonify(tasks), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//tasks/", methods=["PUT"]) def update_task_status(org_id, task_id): """Employee updates their task status. Body: { status: open|in_progress|completed|blocked, notes }""" try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) data = request.get_json() or {} task_ref = db_ref.child(f"tasks/{org_id}/{task_id}") task_data = task_ref.get() if not task_data: return jsonify({"error": "Task not found."}), 404 role = user_data.get("platform_role", "employee") if role == "employee" and task_data.get("assignedTo") != uid: return jsonify({"error": "You can only update tasks assigned to you."}), 403 new_status = data.get("status") if new_status not in ("open", "in_progress", "completed", "blocked"): return jsonify({"error": "Invalid status."}), 400 update = {"status": new_status, "updatedAt": _now_iso()} if data.get("notes"): update["completion_notes"] = data["notes"] if new_status == "completed": update["completedAt"] = _now_iso() task_ref.update(update) _write_audit(org_id, uid, "task_updated", {"task_id": task_id, "new_status": new_status}) return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 8. SUBMISSION ENDPOINTS # ============================================================================= def _process_submission_text(org_id: str, user_uid: str, text: str, week_id: str = None): """ Core submission processing logic. Used by both web and WhatsApp paths. Returns structured submission data — does NOT send any WhatsApp reply to the submitting employee. Notifies dept head / org admin instead. """ user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {} if not week_id: week_id = _current_week_id() # Resolve KPI model for this user's role/position position = user_data.get("position", "") kpi_categories, kpi_weights, kpi_model_id = [], {}, None all_models = db_ref.child(f"kpi_models/{org_id}").get() or {} for mid, model in all_models.items(): if isinstance(model, dict) and model.get("role", "").lower() in position.lower(): kpi_model_id = mid cats = model.get("kpi_categories", []) kpi_categories = [c["name"] for c in cats] kpi_weights = {c["name"]: c["weight"] for c in cats} break # Fall back to all models combined if no role match if not kpi_categories and all_models: first_model = list(all_models.values())[0] cats = first_model.get("kpi_categories", []) kpi_categories = [c["name"] for c in cats] kpi_weights = {c["name"]: c["weight"] for c in cats} extracted = gemini_extract_submission(text, kpi_categories, position, kpi_weights) submission_id = str(uuid.uuid4()) submission_data = { "submissionId": submission_id, "orgId": org_id, "uid": user_uid, "weekId": week_id, "rawText": text, "kpiModelId": kpi_model_id, "submittedAt": _now_iso(), "source": "web", **extracted, } db_ref.child(f"submissions/{org_id}/{user_uid}/{week_id}").set(submission_data) db_ref.child(f"users/{org_id}/{user_uid}").update({ "last_submission_at": _now_iso(), "last_alignment_score": extracted.get("alignment_score"), }) # Notify dept head / org admin — NOT the employee dept_id = user_data.get("department_id") all_users = db_ref.child(f"users/{org_id}").get() or {} employee_name = user_data.get("displayName", user_uid) alignment_score = extracted.get("alignment_score", 0) drift_detected = extracted.get("role_drift_detected", False) drift_line = f"⚠️ Role drift detected: {extracted.get('role_drift_detail', '')}" if drift_detected else "✅ No role drift" admin_message = ( f"📊 Weekly update from *{employee_name}*\n\n" f"Alignment score: *{alignment_score}/100*\n" f"{drift_line}\n\n" f"Summary: {extracted.get('summary', '')[:200]}" ) for admin_uid, admin_data in all_users.items(): if not isinstance(admin_data, dict): continue admin_role = admin_data.get("platform_role", "employee") is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id is_org_admin = admin_role == "org_admin" if is_dept_head or is_org_admin: _push_notification( org_id, admin_uid, f"Update from {employee_name}", f"Alignment: {alignment_score}/100. {'⚠️ Drift detected.' if drift_detected else ''}", "warning" if drift_detected else "info", {"submission_id": submission_id, "uid": user_uid, "week_id": week_id}, ) if admin_data.get("phone") and (drift_detected or alignment_score < 50): _send_whatsapp(admin_data["phone"], admin_message) return submission_data @app.route("/api/org//submissions", methods=["POST"]) def submit_weekly_update(org_id): """ Web submission endpoint. Accepts text OR audio file. Body (multipart): { text?, audio_file?, week_id? } """ try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) week_id = request.form.get("week_id") or request.json.get("week_id") if request.is_json else None # Resolve text — audio takes priority, transcribed via AssemblyAI text = None audio_file = request.files.get("audio_file") if audio_file: audio_bytes = audio_file.read() text = transcribe_audio_file(audio_bytes, audio_file.filename or "audio.ogg") else: text = (request.form.get("text") or (request.get_json() or {}).get("text", "")).strip() if not text: return jsonify({"error": "text or audio_file required."}), 400 submission = _process_submission_text(org_id, uid, text, week_id) return jsonify(submission), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"submit_weekly_update failed: {e}\n{traceback.format_exc()}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//submissions/", methods=["GET"]) def get_user_submissions(org_id, target_uid): """Get all submissions for a user. Employees can only fetch their own.""" try: caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id) role = caller_data.get("platform_role", "employee") if caller_uid != target_uid and role not in ("org_admin", "dept_admin"): return jsonify({"error": "Access denied."}), 403 subs = db_ref.child(f"submissions/{org_id}/{target_uid}").get() or {} return jsonify(list(subs.values())), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//submissions/week/", methods=["GET"]) def get_week_submissions(org_id, week_id): """Admin: get all submissions for a given week across the org.""" try: uid, role, caller_data = verify_org_admin(request.headers.get("Authorization"), org_id) all_users = db_ref.child(f"users/{org_id}").get() or {} # Department admins are scoped to their own department. if role == "dept_admin": my_dept = caller_data.get("department_id") allowed_uids = { u for u, ud in all_users.items() if isinstance(ud, dict) and ud.get("department_id") == my_dept } else: allowed_uids = set(all_users.keys()) result = [] for user_uid in allowed_uids: sub = _find_week_submission(org_id, user_uid, week_id) if sub: # Ensure the requested weekId is surfaced even if stored under # a legacy key. sub = {**sub, "weekId": week_id} result.append(sub) return jsonify(result), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 9. LEAVE MANAGEMENT ENDPOINTS # ============================================================================= def _process_leave_request_text(org_id: str, user_uid: str, text: str) -> dict: user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {} employee_name = user_data.get("displayName", user_uid) extracted = gemini_process_leave_request(text, employee_name) request_id = str(uuid.uuid4()) leave_data = { "requestId": request_id, "orgId": org_id, "uid": user_uid, "employeeName": employee_name, "status": "pending", "submittedAt": _now_iso(), "raw_request": text, **extracted, } db_ref.child(f"leave_requests/{org_id}/{request_id}").set(leave_data) # Notify admins dept_id = user_data.get("department_id") all_users = db_ref.child(f"users/{org_id}").get() or {} for admin_uid, admin_data in all_users.items(): if not isinstance(admin_data, dict): continue admin_role = admin_data.get("platform_role", "employee") is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id is_org_admin = admin_role == "org_admin" if is_dept_head or is_org_admin: _push_notification( org_id, admin_uid, f"Leave Request — {employee_name}", f"{extracted.get('leave_type', '').title()} leave: {extracted.get('start_date')} to {extracted.get('end_date')} ({extracted.get('days_requested', '?')} days)", "warning" if extracted.get("is_urgent") else "info", {"request_id": request_id}, ) # Confirm to employee via push notification only (no WhatsApp response from this path) _push_notification( org_id, user_uid, "Leave Request Received", f"Your {extracted.get('leave_type', 'leave')} request has been submitted and is pending approval.", "info", {"request_id": request_id}, ) return leave_data @app.route("/api/org//leave", methods=["POST"]) def submit_leave_request(org_id): """ Web leave request. Body: { text (natural language) } OR { audio_file } """ try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) audio_file = request.files.get("audio_file") if audio_file: text = transcribe_audio_file(audio_file.read(), audio_file.filename or "audio.ogg") else: text = (request.form.get("text") or (request.get_json() or {}).get("text", "")).strip() if not text: return jsonify({"error": "text or audio_file required."}), 400 leave_data = _process_leave_request_text(org_id, uid, text) return jsonify(leave_data), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"submit_leave_request failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//leave", methods=["GET"]) def list_leave_requests(org_id): """Admins see all requests; employees see their own.""" try: caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id) role = caller_data.get("platform_role", "employee") all_requests = db_ref.child(f"leave_requests/{org_id}").get() or {} requests_list = list(all_requests.values()) if role == "employee": requests_list = [r for r in requests_list if r.get("uid") == caller_uid] elif role == "dept_admin": my_dept = caller_data.get("department_id") all_users = db_ref.child(f"users/{org_id}").get() or {} dept_uids = {u for u, ud in all_users.items() if isinstance(ud, dict) and ud.get("department_id") == my_dept} requests_list = [r for r in requests_list if r.get("uid") in dept_uids] status_filter = request.args.get("status") if status_filter: requests_list = [r for r in requests_list if r.get("status") == status_filter] requests_list.sort(key=lambda r: r.get("submittedAt", ""), reverse=True) return jsonify(requests_list), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//leave//decision", methods=["PUT"]) def decide_leave_request(org_id, request_id): """ Admin approves or declines a leave request. Body: { decision: approved|declined, note?: str } On approval, leave balance is automatically decremented. """ try: admin_uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} decision = data.get("decision") if decision not in ("approved", "declined"): return jsonify({"error": "decision must be approved or declined."}), 400 leave_ref = db_ref.child(f"leave_requests/{org_id}/{request_id}") leave_data = leave_ref.get() if not leave_data: return jsonify({"error": "Leave request not found."}), 404 if leave_data.get("status") != "pending": return jsonify({"error": "This request has already been decided."}), 409 update = { "status": decision, "decidedBy": admin_uid, "decidedAt": _now_iso(), "decision_note": data.get("note", ""), } leave_ref.update(update) employee_uid = leave_data.get("uid") # Decrement leave balance on approval if decision == "approved": leave_type = leave_data.get("leave_type", "annual") days_taken = leave_data.get("days_requested", 0) user_ref = db_ref.child(f"users/{org_id}/{employee_uid}") user_data = user_ref.get() or {} balance = user_data.get("leave_balance", {}) current_days = balance.get(leave_type, 0) new_balance = max(0, current_days - days_taken) balance[leave_type] = new_balance user_ref.update({"leave_balance": balance}) # Notify employee employee_data = db_ref.child(f"users/{org_id}/{employee_uid}").get() or {} emoji = "✅" if decision == "approved" else "❌" decision_text = "approved" if decision == "approved" else "declined" notif_msg = ( f"Your {leave_data.get('leave_type', 'leave')} request " f"({leave_data.get('start_date')} to {leave_data.get('end_date')}) " f"has been {decision_text}." f"{' Note: ' + data['note'] if data.get('note') else ''}" ) _push_notification(org_id, employee_uid, f"{emoji} Leave {decision_text.title()}", notif_msg, "success" if decision == "approved" else "warning") if employee_data.get("phone"): _send_whatsapp(employee_data["phone"], f"{emoji} {notif_msg}") _write_audit(org_id, admin_uid, f"leave_{decision}", {"request_id": request_id, "employee_uid": employee_uid}) return jsonify({"success": True, "decision": decision}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"decide_leave_request failed: {e}") return jsonify({"error": str(e)}), 500 # ============================================================================= # 10. CLIENT MEETING ENDPOINTS # ============================================================================= @app.route("/api/org//meetings", methods=["POST"]) def create_meeting(org_id): """ Schedule a client meeting. Body: { title, client_name, scheduled_at (ISO), attendee_uids[], notes? } """ try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) data = request.get_json() or {} title = data.get("title", "").strip() client_name = data.get("client_name", "").strip() scheduled_at = data.get("scheduled_at") attendee_uids = data.get("attendee_uids", [uid]) if not title or not client_name: return jsonify({"error": "title and client_name required."}), 400 if uid not in attendee_uids: attendee_uids.append(uid) meeting_id = str(uuid.uuid4()) meeting_data = { "meetingId": meeting_id, "orgId": org_id, "title": title, "clientName": client_name, "scheduledAt": scheduled_at, "attendeeUids": attendee_uids, "notes": data.get("notes", ""), "status": "scheduled", "createdBy": uid, "createdAt": _now_iso(), } db_ref.child(f"client_meetings/{org_id}/{meeting_id}").set(meeting_data) # Notify all attendees for attendee_uid in attendee_uids: attendee = db_ref.child(f"users/{org_id}/{attendee_uid}").get() or {} _push_notification(org_id, attendee_uid, f"Meeting Scheduled: {title}", f"Client: {client_name} | {scheduled_at or 'TBD'}", "info", {"meeting_id": meeting_id}) if attendee.get("phone") and attendee_uid != uid: _send_whatsapp( attendee["phone"], f"📅 Meeting scheduled: *{title}*\n" f"Client: {client_name}\n" f"When: {scheduled_at or 'TBD'}\n\n" f"After the meeting, send the recording audio here for automatic summarization." ) _write_audit(org_id, uid, "meeting_created", {"meeting_id": meeting_id, "title": title}) return jsonify(meeting_data), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"create_meeting failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//meetings//recording", methods=["POST"]) def upload_meeting_recording(org_id, meeting_id): """ Upload a meeting recording for AI processing. Accepts: audio_file (multipart) OR audio_url (JSON body). Gemini produces transcript summary, action items, and dual summaries (one for the employee, one for the manager/dept head). """ try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) meeting_ref = db_ref.child(f"client_meetings/{org_id}/{meeting_id}") meeting_data = meeting_ref.get() if not meeting_data: return jsonify({"error": "Meeting not found."}), 404 if uid not in meeting_data.get("attendeeUids", []): return jsonify({"error": "You are not an attendee of this meeting."}), 403 # Transcribe audio_file = request.files.get("audio_file") if audio_file: transcript = transcribe_audio_file(audio_file.read(), audio_file.filename or "audio.ogg") elif request.is_json and request.get_json().get("audio_url"): transcript = transcribe_audio_url(request.get_json()["audio_url"]) else: return jsonify({"error": "audio_file or audio_url required."}), 400 if not transcript or len(transcript.strip()) < 20: return jsonify({"error": "Transcript too short or empty. Check audio quality."}), 422 # Resolve attendee names attendee_names = [] for attendee_uid in meeting_data.get("attendeeUids", []): attendee = db_ref.child(f"users/{org_id}/{attendee_uid}").get() or {} if attendee.get("displayName"): attendee_names.append(attendee["displayName"]) org = get_org(org_id) org_context = org.get("name", org_id) if org else org_id intelligence = gemini_process_meeting_recording( transcript, meeting_data["title"], attendee_names, org_context, ) update = { "status": "completed", "completedAt": _now_iso(), "transcript": transcript, "intelligence": intelligence, } meeting_ref.update(update) # Distribute summaries — each attendee gets their personal summary for attendee_uid in meeting_data.get("attendeeUids", []): attendee = db_ref.child(f"users/{org_id}/{attendee_uid}").get() or {} _push_notification( org_id, attendee_uid, f"Meeting Summary Ready: {meeting_data['title']}", intelligence.get("executive_summary", "")[:120], "success", {"meeting_id": meeting_id}, ) if attendee.get("phone"): action_items_text = "\n".join( f"• {ai['action']} ({ai.get('owner', '?')}) — {ai.get('due_date', 'TBD')}" for ai in intelligence.get("action_items", [])[:5] ) _send_whatsapp( attendee["phone"], f"📋 *{meeting_data['title']} — Meeting Summary*\n\n" f"{intelligence.get('employee_summary', intelligence.get('executive_summary', ''))[:400]}\n\n" f"*Action Items:*\n{action_items_text or 'None identified'}" ) # Department head / org admin gets the manager summary dept_id = user_data.get("department_id") all_users = db_ref.child(f"users/{org_id}").get() or {} for admin_uid, admin_data in all_users.items(): if not isinstance(admin_data, dict): continue admin_role = admin_data.get("platform_role", "employee") is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id is_org_admin = admin_role == "org_admin" if (is_dept_head or is_org_admin) and admin_uid not in meeting_data.get("attendeeUids", []): _push_notification( org_id, admin_uid, f"Meeting Intelligence: {meeting_data['title']}", intelligence.get("manager_summary", "")[:120], "info", {"meeting_id": meeting_id}, ) if admin_data.get("phone"): _send_whatsapp( admin_data["phone"], f"🤝 *{meeting_data['title']} — Manager Brief*\n\n" f"{intelligence.get('manager_summary', '')[:500]}\n\n" f"Sentiment: {intelligence.get('client_sentiment', 'N/A').upper()}\n" f"Follow-up required: {'Yes' if intelligence.get('follow_up_required') else 'No'}" ) _write_audit(org_id, uid, "meeting_recording_processed", {"meeting_id": meeting_id}) return jsonify({"meetingId": meeting_id, **update}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"upload_meeting_recording failed: {e}\n{traceback.format_exc()}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//meetings", methods=["GET"]) def list_meetings(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) all_meetings = db_ref.child(f"client_meetings/{org_id}").get() or {} meetings = list(all_meetings.values()) role = user_data.get("platform_role", "employee") if role == "employee": meetings = [m for m in meetings if uid in m.get("attendeeUids", [])] status_filter = request.args.get("status") if status_filter: meetings = [m for m in meetings if m.get("status") == status_filter] meetings.sort(key=lambda m: m.get("scheduledAt", ""), reverse=True) return jsonify(meetings), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//meetings/", methods=["GET"]) def get_meeting(org_id, meeting_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) meeting_data = db_ref.child(f"client_meetings/{org_id}/{meeting_id}").get() if not meeting_data: return jsonify({"error": "Meeting not found."}), 404 role = user_data.get("platform_role", "employee") if role == "employee" and uid not in meeting_data.get("attendeeUids", []): return jsonify({"error": "Access denied."}), 403 return jsonify(meeting_data), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 11. EXPENSE MANAGEMENT ENDPOINTS # ============================================================================= def _upload_receipt_image(org_id: str, expense_id: str, image_bytes: bytes, content_type: str) -> str: """ Uploads a receipt image to Firebase Storage and returns its public URL. Path: receipts/{orgId}/{expenseId}/{filename} Requires FIREBASE_STORAGE_BUCKET to be set. Falls back gracefully — if Storage is not configured, logs a warning and returns None. """ if not _fb_bucket: logger.warning("_upload_receipt_image | FIREBASE_STORAGE_BUCKET not set — skipping upload.") return None try: ext = content_type.split("/")[-1].replace("jpeg", "jpg") blob_path = f"receipts/{org_id}/{expense_id}/receipt.{ext}" bucket = storage.bucket() blob = bucket.blob(blob_path) blob.upload_from_string(image_bytes, content_type=content_type) blob.make_public() url = blob.public_url logger.info(f"Receipt uploaded: {url}") return url except Exception as e: logger.error(f"_upload_receipt_image failed for expense {expense_id}: {e}") return None def _process_expense_text(org_id: str, user_uid: str, text: str, receipt_url: str = None, expense_id: str = None) -> dict: extracted = gemini_extract_expense(text, receipt_url) expense_id = expense_id or str(uuid.uuid4()) # use pre-generated id (Storage upload) or mint new one user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {} expense_data = { "expenseId": expense_id, "orgId": org_id, "uid": user_uid, "employeeName": user_data.get("displayName", user_uid), "department_id": user_data.get("department_id"), "receiptUrl": receipt_url, "rawText": text, "status": "pending", "submittedAt": _now_iso(), **extracted, } db_ref.child(f"expenses/{org_id}/{expense_id}").set(expense_data) # Check budget utilization dept_id = user_data.get("department_id") if dept_id and extracted.get("category"): budget_ref = db_ref.child(f"budgets/{org_id}/{dept_id}") budget_data = budget_ref.get() or {} cat_budget = budget_data.get(extracted["category"], {}) if isinstance(cat_budget, dict): allocated = float(cat_budget.get("allocated", 0)) spent = float(cat_budget.get("spent", 0)) new_spent = spent + float(extracted.get("amount", 0)) utilization = (new_spent / allocated * 100) if allocated > 0 else 0 expense_data["budget_utilization_pct"] = round(utilization, 1) # Notify admin all_users = db_ref.child(f"users/{org_id}").get() or {} employee_name = user_data.get("displayName", user_uid) for admin_uid, admin_data in all_users.items(): if not isinstance(admin_data, dict): continue admin_role = admin_data.get("platform_role", "employee") is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id is_org_admin = admin_role == "org_admin" if is_dept_head or is_org_admin: _push_notification( org_id, admin_uid, f"Expense: {employee_name}", f"{extracted.get('currency', '')} {extracted.get('amount', '?')} — {extracted.get('category', '?')} — {extracted.get('vendor', '?')}", "info", {"expense_id": expense_id}, ) return expense_data @app.route("/api/org//expenses", methods=["POST"]) def submit_expense(org_id): """ Accepts multipart/form-data or JSON. Fields: text — natural language description (required unless audio_file provided) audio_file — voice description (multipart); transcribed via AssemblyAI receipt_image — receipt photo (multipart); uploaded to Firebase Storage → sets receiptUrl receipt_url — fallback string URL if client handles its own upload Priority: receipt_image (uploaded here) > receipt_url (client-provided string) """ try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) # ── Resolve description text ────────────────────────────────────────── audio_file = request.files.get("audio_file") if audio_file: text = transcribe_audio_file(audio_file.read(), audio_file.filename or "audio.ogg") else: text = (request.form.get("text") or (request.get_json() or {}).get("text", "")).strip() if not text: return jsonify({"error": "text or audio_file required."}), 400 # ── Resolve receipt URL ─────────────────────────────────────────────── # Generate expense_id early so the Storage path is stable before DB write expense_id = str(uuid.uuid4()) receipt_url = None receipt_image = request.files.get("receipt_image") if receipt_image and receipt_image.filename: content_type = receipt_image.content_type or "image/jpeg" receipt_url = _upload_receipt_image(org_id, expense_id, receipt_image.read(), content_type) if not receipt_url: logger.warning(f"submit_expense | Receipt upload failed for {expense_id} — continuing without image.") if not receipt_url: # Fall back to client-provided URL receipt_url = request.form.get("receipt_url") or (request.get_json() or {}).get("receipt_url") expense_data = _process_expense_text(org_id, uid, text, receipt_url, expense_id=expense_id) return jsonify(expense_data), 201 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"submit_expense failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//expenses", methods=["GET"]) def list_expenses(org_id): try: caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id) role = caller_data.get("platform_role", "employee") all_expenses = db_ref.child(f"expenses/{org_id}").get() or {} expenses = list(all_expenses.values()) if role == "employee": expenses = [e for e in expenses if e.get("uid") == caller_uid] elif role == "dept_admin": my_dept = caller_data.get("department_id") all_users = db_ref.child(f"users/{org_id}").get() or {} dept_uids = {u for u, ud in all_users.items() if isinstance(ud, dict) and ud.get("department_id") == my_dept} expenses = [e for e in expenses if e.get("uid") in dept_uids] status_filter = request.args.get("status") if status_filter: expenses = [e for e in expenses if e.get("status") == status_filter] expenses.sort(key=lambda e: e.get("submittedAt", ""), reverse=True) return jsonify(expenses), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//expenses//decision", methods=["PUT"]) def decide_expense(org_id, expense_id): """Body: { decision: approved|flagged|rejected, note? }""" try: admin_uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} decision = data.get("decision") if decision not in ("approved", "flagged", "rejected"): return jsonify({"error": "decision must be approved, flagged, or rejected."}), 400 expense_ref = db_ref.child(f"expenses/{org_id}/{expense_id}") expense_data = expense_ref.get() if not expense_data: return jsonify({"error": "Expense not found."}), 404 expense_ref.update({ "status": decision, "decidedBy": admin_uid, "decidedAt": _now_iso(), "decision_note": data.get("note", ""), }) # Update budget spent on approval if decision == "approved": dept_id = expense_data.get("department_id") cat = expense_data.get("category") amount = float(expense_data.get("amount", 0)) if dept_id and cat: budget_ref = db_ref.child(f"budgets/{org_id}/{dept_id}") budget_data = budget_ref.get() or {} cat_budget = budget_data.get(cat, {"allocated": 0, "spent": 0}) if isinstance(cat_budget, dict): cat_budget["spent"] = float(cat_budget.get("spent", 0)) + amount budget_data[cat] = cat_budget budget_ref.set(budget_data) # Threshold alert at 80% allocated = float(cat_budget.get("allocated", 0)) if allocated > 0 and cat_budget["spent"] / allocated >= 0.8: _push_notification( org_id, admin_uid, f"Budget Alert: {cat}", f"Department {dept_id} has used {round(cat_budget['spent']/allocated*100)}% of {cat} budget.", "warning", ) # Notify employee employee_uid = expense_data.get("uid") employee_data = db_ref.child(f"users/{org_id}/{employee_uid}").get() or {} emoji_map = {"approved": "✅", "flagged": "⚠️", "rejected": "❌"} notif_msg = f"Your expense ({expense_data.get('currency', '')} {expense_data.get('amount', '?')} — {expense_data.get('vendor', '?')}) has been {decision}." _push_notification(org_id, employee_uid, f"{emoji_map.get(decision, '')} Expense {decision.title()}", notif_msg, "success" if decision == "approved" else "warning") if employee_data.get("phone"): _send_whatsapp(employee_data["phone"], f"{emoji_map.get(decision, '')} {notif_msg}") _write_audit(org_id, admin_uid, f"expense_{decision}", {"expense_id": expense_id}) return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"decide_expense failed: {e}") return jsonify({"error": str(e)}), 500 # ============================================================================= # 12. BUDGET MANAGEMENT ENDPOINTS # ============================================================================= @app.route("/api/org//budgets/", methods=["PUT"]) def set_department_budget(org_id, dept_id): """ Admin sets budget allocations for a department. Body: { budgets: { "meals": { "allocated": 500, "spent": 0 }, ... } } """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} budgets = data.get("budgets") if not budgets or not isinstance(budgets, dict): return jsonify({"error": "budgets object required."}), 400 db_ref.child(f"budgets/{org_id}/{dept_id}").update(budgets) _write_audit(org_id, uid, "budget_set", {"dept_id": dept_id, "categories": list(budgets.keys())}) return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//budgets/", methods=["GET"]) def get_department_budget(org_id, dept_id): try: uid, role, caller_data = verify_org_admin(request.headers.get("Authorization"), org_id) budget_data = db_ref.child(f"budgets/{org_id}/{dept_id}").get() or {} return jsonify(budget_data), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 13. NOTIFICATION ENDPOINTS # ============================================================================= @app.route("/api/org//notifications", methods=["GET"]) def get_notifications(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) raw = db_ref.child(f"notifications/{org_id}/{uid}").get() or {} items = sorted(raw.values(), key=lambda x: x.get("createdAt", ""), reverse=True) return jsonify({"notifications": items[:50]}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//notifications//read", methods=["PUT"]) def mark_notification_read(org_id, notif_id): try: uid, _ = verify_org_member(request.headers.get("Authorization"), org_id) db_ref.child(f"notifications/{org_id}/{uid}/{notif_id}").update({"read": True}) return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/org//notifications/mark-all-read", methods=["PUT"]) def mark_all_notifications_read(org_id): """ Marks all unread notifications for the calling user as read in a single write. Far cheaper than fanning out individual PUTs from the frontend. """ try: uid, _ = verify_org_member(request.headers.get("Authorization"), org_id) notifs = db_ref.child(f"notifications/{org_id}/{uid}").get() or {} updates = { notif_id: {**notif_data, "read": True} for notif_id, notif_data in notifs.items() if isinstance(notif_data, dict) and not notif_data.get("read", False) } if updates: db_ref.child(f"notifications/{org_id}/{uid}").update(updates) return jsonify({"success": True, "marked": len(updates)}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"mark_all_notifications_read failed: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/org//notifications/broadcast", methods=["POST"]) def broadcast_notification(org_id): """ Admin sends a broadcast to all org users or a department. Body: { title, message, target: all|department, department_id?, send_whatsapp: bool } """ try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) data = request.get_json() or {} title = data.get("title", "").strip() message = data.get("message", "").strip() target = data.get("target", "all") dept_filter = data.get("department_id") via_wa = data.get("send_whatsapp", False) if not title or not message: return jsonify({"error": "title and message required."}), 400 all_users = db_ref.child(f"users/{org_id}").get() or {} sent_to = 0 for target_uid, target_data in all_users.items(): if not isinstance(target_data, dict) or not target_data.get("active"): continue if target == "department" and dept_filter and target_data.get("department_id") != dept_filter: continue _push_notification(org_id, target_uid, title, message, data.get("type", "info")) if via_wa and target_data.get("phone"): _send_whatsapp(target_data["phone"], f"📢 *{title}*\n\n{message}") sent_to += 1 _write_audit(org_id, uid, "broadcast_sent", {"title": title, "target": target, "sent_to": sent_to}) return jsonify({"success": True, "sent_to": sent_to}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 14. ANALYTICS ENGINE # ============================================================================= def _compute_org_analytics(org_id: str) -> dict: """ Computes org-wide analytics for the admin dashboard. """ all_users = db_ref.child(f"users/{org_id}").get() or {} all_subs = db_ref.child(f"submissions/{org_id}").get() or {} all_expenses = db_ref.child(f"expenses/{org_id}").get() or {} all_leave = db_ref.child(f"leave_requests/{org_id}").get() or {} current_week = _current_week_id() week_keys = set(_candidate_week_keys(current_week)) total_users = len([u for u in all_users.values() if isinstance(u, dict) and u.get("active")]) submitted_uids_this_week = set() total_alignment = 0 alignment_count = 0 drift_flags = 0 score_distribution = {"0-40": 0, "41-60": 0, "61-80": 0, "81-100": 0} dept_alignment = defaultdict(list) for user_uid, user_sub_map in all_subs.items(): if not isinstance(user_sub_map, dict): continue user_data = all_users.get(user_uid, {}) dept_id = user_data.get("department_id") if isinstance(user_data, dict) else None for week_id, sub in user_sub_map.items(): if not isinstance(sub, dict): continue score = sub.get("alignment_score") if score is not None: total_alignment += score alignment_count += 1 dept_alignment[dept_id or "unassigned"].append(score) if score <= 40: score_distribution["0-40"] += 1 elif score <= 60: score_distribution["41-60"] += 1 elif score <= 80: score_distribution["61-80"] += 1 else: score_distribution["81-100"] += 1 if sub.get("role_drift_detected"): drift_flags += 1 if week_id in week_keys: submitted_uids_this_week.add(user_uid) submissions_this_week = len(submitted_uids_this_week) compliance_rate = round(submissions_this_week / total_users * 100, 1) if total_users else 0 avg_alignment = round(total_alignment / alignment_count, 1) if alignment_count else None dept_avg_alignment = { dept: round(sum(scores) / len(scores), 1) for dept, scores in dept_alignment.items() if scores } # Expense summary total_expenses_pending = sum(1 for e in all_expenses.values() if isinstance(e, dict) and e.get("status") == "pending") total_expenses_approved = sum(float(e.get("amount", 0)) for e in all_expenses.values() if isinstance(e, dict) and e.get("status") == "approved") # Leave summary leave_pending = sum(1 for l in all_leave.values() if isinstance(l, dict) and l.get("status") == "pending") leave_approved = sum(1 for l in all_leave.values() if isinstance(l, dict) and l.get("status") == "approved") # Missing submissions this week (only active users; checks all candidate week keys) missing_this_week = [ user_uid for user_uid, user_data in all_users.items() if isinstance(user_data, dict) and user_data.get("active") and user_uid not in submitted_uids_this_week ] return { "totalActiveUsers": total_users, "submissionsThisWeek": submissions_this_week, "complianceRate": compliance_rate, "missingSubmissionsUids": missing_this_week, "averageAlignmentScore": avg_alignment, "scoreDistribution": score_distribution, "driftFlagsTotal": drift_flags, "deptAlignmentAverages": dept_avg_alignment, "expensesPendingCount": total_expenses_pending, "totalApprovedSpend": round(total_expenses_approved, 2), "leavePendingCount": leave_pending, "leaveApprovedCount": leave_approved, "weekId": current_week, } def _compute_employee_analytics(org_id: str, user_uid: str) -> dict: all_subs = db_ref.child(f"submissions/{org_id}/{user_uid}").get() or {} scores = [] drift_count = 0 success_events = [] for week_id, sub in sorted(all_subs.items()): if not isinstance(sub, dict): continue score = sub.get("alignment_score") if score is not None: scores.append({"weekId": week_id, "score": score, "submittedAt": sub.get("submittedAt")}) if sub.get("role_drift_detected"): drift_count += 1 for se in sub.get("success_events", []): success_events.append({"weekId": week_id, "event": se}) avg_score = round(sum(s["score"] for s in scores) / len(scores), 1) if scores else None velocity = 0.0 if len(scores) >= 2: n = len(scores) xs = list(range(1, n + 1)) ys = [s["score"] for s in scores] mx, my = sum(xs) / n, sum(ys) / n num = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) den = sum((x - mx) ** 2 for x in xs) velocity = round(num / den, 2) if den else 0.0 user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {} leave_requests = db_ref.child(f"leave_requests/{org_id}").get() or {} my_leave = [l for l in leave_requests.values() if isinstance(l, dict) and l.get("uid") == user_uid] return { "scoreTrend": scores, "averageScore": avg_score, "improvementVelocity": velocity, "driftFlagsTotal": drift_count, "recentSuccessEvents": success_events[-5:], "leaveBalance": user_data.get("leave_balance", {}), "leaveHistory": my_leave, "totalSubmissions": len(scores), } # ============================================================================= # 15. DASHBOARD ENDPOINTS # ============================================================================= @app.route("/api/org//dashboard/admin", methods=["GET"]) def admin_dashboard(org_id): try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) analytics = _compute_org_analytics(org_id) org = get_org(org_id) depts = db_ref.child(f"departments/{org_id}").get() or {} users = db_ref.child(f"users/{org_id}").get() or {} # Recent submissions — match canonical and legacy week keys current_week = _current_week_id() week_keys = _candidate_week_keys(current_week) recent_subs = [] for user_uid, sub_map in (db_ref.child(f"submissions/{org_id}").get() or {}).items(): if not isinstance(sub_map, dict): continue week_sub = None for key in week_keys: if isinstance(sub_map.get(key), dict): week_sub = sub_map[key] break if week_sub: user_info = users.get(user_uid, {}) recent_subs.append({ "submissionId": week_sub.get("submissionId"), "uid": user_uid, "displayName": user_info.get("displayName", user_uid), "department": user_info.get("department_id"), "weekId": current_week, "alignment_score": week_sub.get("alignment_score"), "role_drift_detected": week_sub.get("role_drift_detected"), "role_drift": week_sub.get("role_drift_detected"), "rawText": week_sub.get("rawText"), "summary": week_sub.get("summary"), "submittedAt": week_sub.get("submittedAt"), }) return jsonify({ "org": org, "analytics": analytics, "departments": list(depts.values()), "recentActivity": recent_subs, }), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"admin_dashboard failed: {e}\n{traceback.format_exc()}") return jsonify({"error": "Dashboard compute failed."}), 500 @app.route("/api/org//dashboard/employee", methods=["GET"]) def employee_dashboard(org_id): try: uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id) analytics = _compute_employee_analytics(org_id, uid) # Tasks all_tasks = db_ref.child(f"tasks/{org_id}").get() or {} my_tasks = [t for t in all_tasks.values() if isinstance(t, dict) and t.get("assignedTo") == uid] open_tasks = [t for t in my_tasks if t.get("status") in ("open", "in_progress")] # Notifications notifs_raw = db_ref.child(f"notifications/{org_id}/{uid}").get() or {} unread_count = sum(1 for n in notifs_raw.values() if isinstance(n, dict) and not n.get("read")) return jsonify({ "user": user_data, "analytics": analytics, "openTasks": open_tasks, "unreadNotifications": unread_count, }), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 16. WHATSAPP / TWILIO RELAY ENDPOINTS # ============================================================================= # # All WhatsApp interactions are routed through /api/whatsapp/inbound. # Twilio sends a POST to this endpoint on every incoming message. # The server maintains a per-user conversation state in Firebase to handle # multi-turn interactions (e.g. confirming a leave request). # # WEBHOOK_SECRET is used to authenticate the weekly prompt scheduler call. # # SMART PROMPT LOGIC: # The weekly prompt scheduler checks each employee's submission history. # If Gemini confirms a meaningful submission already exists this week, the # employee is NOT prompted again. Only those with no qualifying submission # receive the Friday reminder. # # ============================================================================= @app.route("/api/whatsapp/inbound", methods=["POST"]) def whatsapp_inbound(): """ Receives incoming WhatsApp messages from Twilio. Identifies the org and user by phone number, routes the message. """ try: from_number = request.form.get("From", "").replace("whatsapp:", "").strip() body = request.form.get("Body", "").strip() media_url = request.form.get("MediaUrl0", "") media_type = request.form.get("MediaContentType0", "") if not from_number: return jsonify({"error": "No sender."}), 400 # Find user by phone number across all orgs org_id = user_uid = None platform_orgs = db_ref.child("platform_orgs").get() or {} for oid in platform_orgs: org_users = db_ref.child(f"users/{oid}").get() or {} for u_uid, u_data in org_users.items(): if isinstance(u_data, dict) and u_data.get("phone", "").replace("+", "").replace(" ", "") == from_number.replace("+", "").replace(" ", ""): org_id = oid user_uid = u_uid break if org_id: break if not org_id or not user_uid: logger.warning(f"WhatsApp inbound from unregistered number: {from_number}") return ("" "Sorry, your number is not registered on IrisTrack. " "Please contact your HR admin."), 200, {"Content-Type": "text/xml"} user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {} wa_state = user_data.get("whatsapp_state", "idle") # If audio message — transcribe via AssemblyAI first if media_url and media_type and "audio" in media_type: try: # Download audio from Twilio (requires auth) audio_resp = requests.get( media_url, auth=(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN), timeout=30, ) audio_resp.raise_for_status() body = transcribe_audio_file(audio_resp.content, "voice_note.ogg") logger.info(f"Transcribed WhatsApp audio for {user_uid}: {body[:100]}") except Exception as e: logger.error(f"WhatsApp audio transcription failed for {user_uid}: {e}") _send_whatsapp(from_number, "Sorry, I couldn't process your voice note. Please try again or type your message.") return (""), 200, {"Content-Type": "text/xml"} if not body: return (""), 200, {"Content-Type": "text/xml"} org = get_org(org_id) org_name = org.get("name", org_id) if org else org_id # Route by intent intent_result = gemini_process_general_whatsapp_message( body, user_data.get("displayName", "Employee"), org_name, wa_state, ) intent = intent_result.get("intent", "other") reply_to_user = None # We generally do NOT reply immediately to the employee extracted = intent_result.get("extracted_data", {}) if intent == "weekly_update": _process_submission_text(org_id, user_uid, body) # No reply to employee — admins are notified internally reply_to_user = None elif intent == "leave_request": _process_leave_request_text(org_id, user_uid, body) reply_to_user = "Your leave request has been submitted and is pending approval. You'll be notified once a decision is made." elif intent == "expense_submission": receipt_url = media_url if media_url and "image" in (media_type or "") else None expense_text = body if not expense_text and receipt_url: expense_text = "Expense via WhatsApp receipt upload" _process_expense_text(org_id, user_uid, expense_text, receipt_url) reply_to_user = "Expense submitted for approval. You'll be notified once it's reviewed." elif intent == "meeting_update": # Employee is confirming a meeting outcome or uploading recording if media_url and "audio" in (media_type or ""): # Find their most recent scheduled meeting meetings = db_ref.child(f"client_meetings/{org_id}").get() or {} recent_meeting = None for mid, m in meetings.items(): if isinstance(m, dict) and user_uid in m.get("attendeeUids", []) and m.get("status") in ("scheduled", "in_progress"): recent_meeting = (mid, m) break if recent_meeting: meeting_id, meeting_data = recent_meeting db_ref.child(f"client_meetings/{org_id}/{meeting_id}").update({"status": "in_progress"}) # Transcription + processing happens internally try: audio_resp = requests.get(media_url, auth=(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN), timeout=30) transcript = transcribe_audio_file(audio_resp.content, "meeting_audio.ogg") attendee_names = [] for a_uid in meeting_data.get("attendeeUids", []): a = db_ref.child(f"users/{org_id}/{a_uid}").get() or {} if a.get("displayName"): attendee_names.append(a["displayName"]) intelligence = gemini_process_meeting_recording(transcript, meeting_data["title"], attendee_names, org_name) db_ref.child(f"client_meetings/{org_id}/{meeting_id}").update({ "status": "completed", "completedAt": _now_iso(), "transcript": transcript, "intelligence": intelligence, }) # Notify chain for a_uid in meeting_data.get("attendeeUids", []): a_data = db_ref.child(f"users/{org_id}/{a_uid}").get() or {} _push_notification(org_id, a_uid, f"Meeting Summary Ready: {meeting_data['title']}", intelligence.get("executive_summary", "")[:120], "success", {"meeting_id": meeting_id}) except Exception as e: logger.error(f"WhatsApp meeting recording processing failed: {e}") reply_to_user = None else: reply_to_user = None elif intent == "general_query": # Surface leave balance, task status etc leave_balance = user_data.get("leave_balance", {}) balance_text = ", ".join(f"{k}: {v} days" for k, v in leave_balance.items()) reply_to_user = f"Here's a quick summary for you:\n\nLeave balance: {balance_text}\n\nFor more details, log in to the IrisTrack portal." elif intent == "greeting": reply_to_user = f"Hi {user_data.get('displayName', 'there')}! 👋 You can send me your weekly update, request leave, submit an expense, or ask about your balances." if reply_to_user: _send_whatsapp(from_number, reply_to_user) return (""), 200, {"Content-Type": "text/xml"} except Exception as e: logger.error(f"whatsapp_inbound failed: {e}\n{traceback.format_exc()}") return (""), 200, {"Content-Type": "text/xml"} @app.route("/api/whatsapp/send-weekly-prompts", methods=["POST"]) def send_weekly_prompts(): """ Triggered by an external cron scheduler on the org's configured reporting day. Checks each employee's submission history — only sends prompts to those who have NOT made a meaningful update this week. Gemini evaluates alignment. Secured by WEBHOOK_SECRET. """ auth_header = request.headers.get("Authorization", "") if not WEBHOOK_SECRET or auth_header != f"Bearer {WEBHOOK_SECRET}": return jsonify({"error": "Unauthorized."}), 401 data = request.get_json() or {} org_id = data.get("org_id") platform_orgs = db_ref.child("platform_orgs").get() or {} orgs_to_process = [org_id] if org_id else list(platform_orgs.keys()) total_sent = 0 total_skipped = 0 for oid in orgs_to_process: org = get_org(oid) if not org or not org.get("active"): continue org_name = org.get("name", oid) org_goals = org.get("org_goals", "") current_week = _current_week_id() all_users = db_ref.child(f"users/{oid}").get() or {} all_subs = db_ref.child(f"submissions/{oid}").get() or {} # KPI categories for this org (combined from all models) all_models = db_ref.child(f"kpi_models/{oid}").get() or {} all_kpi_cats = list({ cat["name"] for model in all_models.values() if isinstance(model, dict) for cat in model.get("kpi_categories", []) if isinstance(cat, dict) }) for user_uid, user_data in all_users.items(): if not isinstance(user_data, dict) or not user_data.get("active"): continue if user_data.get("platform_role") in ("org_admin",): continue phone = user_data.get("phone", "") if not phone: continue # Check if employee already submitted meaningfully this week user_subs = all_subs.get(user_uid, {}) or {} week_sub = user_subs.get(current_week) already_submitted = False if week_sub and isinstance(week_sub, dict): # Direct submission record exists already_submitted = True else: # Check last_submission_at loosely via the user record last_sub_at = user_data.get("last_submission_at", "") if last_sub_at: try: last_sub_date = _parse_iso(last_sub_at).date() today = date.today() week_start = today - timedelta(days=today.weekday()) if last_sub_date >= week_start: # There's a recent submission — verify with Gemini if meaningful raw_text = week_sub.get("rawText", "") if week_sub else "" if raw_text: already_submitted = gemini_check_submission_alignment_with_goals( raw_text, org_goals, all_kpi_cats ) except Exception: pass if already_submitted: logger.info(f"CRON | Skipping prompt for {user_uid} — already submitted this week.") total_skipped += 1 continue # Send prompt display_name = user_data.get("displayName", "there") _send_whatsapp( phone, f"Hi {display_name}! 👋\n\n" f"It's your weekly check-in for *{org_name}*.\n\n" f"What did you work on this week? You can reply with a voice note or text — " f"no specific format needed, just tell me what you got done." ) total_sent += 1 logger.info(f"CRON | Sent weekly prompt to {user_uid} ({phone})") return jsonify({"success": True, "prompts_sent": total_sent, "skipped": total_skipped}), 200 # ============================================================================= # 17. SUPERADMIN ENDPOINTS # ============================================================================= @app.route("/api/superadmin/orgs", methods=["GET"]) def superadmin_list_orgs(): try: verify_superadmin(request.headers.get("Authorization")) platform_orgs = db_ref.child("platform_orgs").get() or {} result = [] for org_id, org_index in platform_orgs.items(): org_data = db_ref.child(f"organizations/{org_id}").get() or {} all_users = db_ref.child(f"users/{org_id}").get() or {} result.append({ **org_index, **org_data, "user_count": len([u for u in all_users.values() if isinstance(u, dict) and u.get("active")]), }) return jsonify(result), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/superadmin/orgs//analytics", methods=["GET"]) def superadmin_org_analytics(org_id): try: verify_superadmin(request.headers.get("Authorization")) analytics = _compute_org_analytics(org_id) return jsonify(analytics), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 @app.route("/api/superadmin/orgs//deactivate", methods=["POST"]) def superadmin_deactivate_org(org_id): try: verify_superadmin(request.headers.get("Authorization")) db_ref.child(f"organizations/{org_id}").update({"active": False, "deactivatedAt": _now_iso()}) db_ref.child(f"platform_orgs/{org_id}").update({"active": False}) logger.warning(f"SuperAdmin deactivated org: {org_id}") return jsonify({"success": True}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 18. AUDIT LOG # ============================================================================= @app.route("/api/org//audit-log", methods=["GET"]) def get_audit_log(org_id): try: uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id) if role != "org_admin": return jsonify({"error": "Only org_admin can access audit log."}), 403 log_entries = db_ref.child(f"audit_log/{org_id}").get() or {} entries = sorted(log_entries.values(), key=lambda e: e.get("timestamp", ""), reverse=True) limit = min(int(request.args.get("limit", 100)), 500) return jsonify({"entries": entries[:limit], "total": len(entries)}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 # ============================================================================= # 19. DEBUG # ============================================================================= @app.route("/api/health", methods=["GET"]) def health_check(): return jsonify({ "status": "ok", "service": "IrisTrack", "version": "1.0.0", "timestamp": _now_iso(), "gemini": MODEL_NAME, "assemblyai": "configured" if ASSEMBLYAI_API_KEY else "missing", "twilio": "configured" if TWILIO_ACCOUNT_SID else "missing", }), 200 @app.route("/api/debug/transcribe-test", methods=["POST"]) def debug_transcribe(): """Test AssemblyAI with an uploaded audio file.""" try: audio_file = request.files.get("audio_file") if not audio_file: return jsonify({"error": "audio_file required."}), 400 transcript = transcribe_audio_file(audio_file.read(), audio_file.filename or "test.ogg") return jsonify({"transcript": transcript}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 # ============================================================================= # 20. MAIN # ============================================================================= if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) app.run(debug=False, host="0.0.0.0", port=port)