Spaces:
Running
Running
| # ============================================================================= | |
| # IRISTRACK — BACKEND SERVER | |
| # ============================================================================= | |
| # SECTION INDEX | |
| # 1. Imports & Initialization | |
| # 2. Core Helper Functions | |
| # 3. Gemini Functions | |
| # 4. AssemblyAI Transcription | |
| # 5. Organization & Department Endpoints | |
| # 6. User Management Endpoints | |
| # 7. KPI Model & Task Endpoints | |
| # 8. Submission Endpoints | |
| # 9. Leave Management Endpoints | |
| # 10. Client Meeting Endpoints | |
| # 11. Expense Management Endpoints | |
| # 12. Budget Management Endpoints | |
| # 13. Notification Endpoints | |
| # 14. Analytics Engine | |
| # 15. Dashboard Endpoints | |
| # 16. WhatsApp / Twilio Relay Endpoints | |
| # 17. SuperAdmin Endpoints | |
| # 18. Audit Log | |
| # 19. Debug | |
| # 20. Main | |
| # | |
| # FIREBASE NODE CONTRACT | |
| # organizations/{orgId}/ | |
| # users/{orgId}/{uid}/ | |
| # departments/{orgId}/{deptId}/ | |
| # kpi_models/{orgId}/{roleId}/ | |
| # submissions/{orgId}/{uid}/{weekId}/ | |
| # tasks/{orgId}/{taskId}/ | |
| # expenses/{orgId}/{expenseId}/ | |
| # budgets/{orgId}/{deptId}/ | |
| # notifications/{orgId}/{uid}/ | |
| # leave_requests/{orgId}/{requestId}/ | |
| # client_meetings/{orgId}/{meetingId}/ | |
| # audit_log/{orgId}/{entryId}/ | |
| # platform_orgs/{orgId}/ <- SuperAdmin org index (list of all orgs) | |
| # superadmins/{uid}/ <- is_admin: True — set manually in Firebase console | |
| # | |
| # SUPERADMIN SAFETY CONTRACT | |
| # SuperAdmin = any Firebase user with is_admin: True in superadmins/{uid}. | |
| # Set this flag directly in the Firebase console — no endpoint required. | |
| # Pattern mirrors Pitchfy's is_admin flag. Single source of truth. | |
| # | |
| # WEBHOOK_SECRET explained | |
| # A shared secret string you generate once (e.g. openssl rand -hex 32) stored as a HF | |
| # Space secret. The cron scheduler sends it as "Authorization: Bearer <secret>" when | |
| # calling /api/whatsapp/send-weekly-prompts. Server compares strings — match = trusted. | |
| # It is a machine-to-machine API key for endpoints with no logged-in Firebase user. | |
| # | |
| # ENVIRONMENT VARIABLES (HuggingFace Secrets) | |
| # FIREBASE, Firebase_DB, FIREBASE_STORAGE_BUCKET, Gemini, ASSEMBLYAI_API_KEY, | |
| # TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_WHATSAPP_NUMBER, | |
| # WEBHOOK_SECRET | |
| # ============================================================================= | |
| # ============================================================================= | |
| # 1. IMPORTS & INITIALIZATION | |
| # ============================================================================= | |
| import os | |
| import uuid | |
| import json | |
| import traceback | |
| import logging | |
| import hmac | |
| import hashlib | |
| import tempfile | |
| import time | |
| import re | |
| from datetime import datetime, timezone, date, timedelta | |
| from collections import defaultdict | |
| import requests | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import firebase_admin | |
| from firebase_admin import credentials, db, auth, storage | |
# Load a local .env file when python-dotenv is installed; when the package is
# missing the process simply relies on the environment it was started with.
try:
    from dotenv import load_dotenv

    load_dotenv()
    print("dotenv loaded (local dev mode)")
except ImportError:
    print("dotenv not available — HF Spaces mode")

# Root logging at INFO plus a module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application object with CORS enabled using flask_cors defaults.
app = Flask(__name__)
CORS(app)
| # ============================================================================= | |
| # CONSTANTS | |
| # ============================================================================= | |
# Allowed leave types.
LEAVE_TYPES = [
    "annual",
    "sick",
    "maternity",
    "paternity",
    "unpaid",
    "study",
    "compassionate",
]

# Allowed leave-request statuses.
LEAVE_STATUSES = [
    "pending",
    "approved",
    "declined",
    "cancelled",
]

# Allowed client-meeting statuses.
MEETING_STATUSES = [
    "scheduled",
    "in_progress",
    "completed",
    "cancelled",
]

# Allowed expense statuses.
EXPENSE_STATUSES = [
    "pending",
    "approved",
    "flagged",
    "rejected",
]

# ISO 8601 week id: %G = ISO year, %V = ISO week number (Monday-based).
# Matches TypeScript's getISOWeek() output exactly, e.g. "2026-W21".
# The previous value used %W (Monday-based week-of-year), which diverges
# from the ISO week number by one on most dates.
SUBMISSION_WEEK_FORMAT = "%G-W%V"
| # ============================================================================= | |
| # FIREBASE & AI INIT | |
| # ============================================================================= | |
# --- Firebase -----------------------------------------------------------------
# Both the service-account JSON ("FIREBASE") and the Realtime Database URL
# ("Firebase_DB") are mandatory; the storage bucket is optional.
try:
    _fb_json = os.environ.get("FIREBASE")
    if not _fb_json:
        raise ValueError("'FIREBASE' env var not set.")
    _fb_url = os.environ.get("Firebase_DB")
    if not _fb_url:
        raise ValueError("'Firebase_DB' env var not set.")
    _fb_bucket = os.environ.get("FIREBASE_STORAGE_BUCKET", "")
    firebase_admin.initialize_app(
        credentials.Certificate(json.loads(_fb_json)),
        {
            "databaseURL": _fb_url,
            "storageBucket": _fb_bucket,  # e.g. your-project.appspot.com
        },
    )
    db_ref = db.reference()
    logger.info(f"Firebase initialized. Storage bucket: '{_fb_bucket or 'not set'}'")
except Exception as e:
    logger.critical(f"FATAL Firebase init: {e}")
    # `exit()` is an interactive helper injected by the `site` module and is
    # not guaranteed to exist; raising SystemExit is the portable equivalent.
    raise SystemExit(1) from e

# --- Gemini -------------------------------------------------------------------
try:
    _gemini_key = os.environ.get("Gemini")
    if not _gemini_key:
        raise ValueError("'Gemini' env var not set.")
    from google import genai

    gemini_client = genai.Client(api_key=_gemini_key)
    MODEL_NAME = "gemini-3.1-flash-lite"
    logger.info(f"Gemini initialized ({MODEL_NAME}).")
except Exception as e:
    logger.critical(f"FATAL Gemini init: {e}")
    raise SystemExit(1) from e

# --- Optional integrations ----------------------------------------------------
# A missing AssemblyAI key only disables transcription; it is not fatal.
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
if not ASSEMBLYAI_API_KEY:
    logger.warning("ASSEMBLYAI_API_KEY not set — audio transcription disabled.")

TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN")
TWILIO_WHATSAPP_NUMBER = os.environ.get("TWILIO_WHATSAPP_NUMBER", "whatsapp:+14155238886")
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET")
| # ============================================================================= | |
| # 2. CORE HELPER FUNCTIONS | |
| # ============================================================================= | |
def _now_iso():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.ffffff`` followed by ``Z``."""
    utc_now = datetime.now(timezone.utc)
    return f"{utc_now:%Y-%m-%dT%H:%M:%S.%f}Z"
def _current_week_id():
    """Return the week id of the current UTC moment, formatted with SUBMISSION_WEEK_FORMAT."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.strftime(SUBMISSION_WEEK_FORMAT)
def _week_id_for_date(d: date):
    """Return the week id of date *d*, formatted with SUBMISSION_WEEK_FORMAT."""
    week_id = d.strftime(SUBMISSION_WEEK_FORMAT)
    return week_id
def _candidate_week_keys(week_id: str) -> list:
    """
    List the storage keys under which a given ISO week may have been saved.

    The canonical ``week_id`` always comes first. It is followed by every
    distinct legacy ``%Y-W%W`` key produced by the seven days of that ISO
    week, because earlier builds stored submissions under
    ``strftime("%Y-W%W")``, which usually differs from ISO ``%G-W%V`` by one.

    If ``week_id`` is not of the form ``YYYY-Www`` or does not name a real
    ISO week, only ``[week_id]`` is returned.
    """
    parsed = re.match(r"^(\d{4})-W(\d{2})$", week_id or "")
    if parsed is None:
        return [week_id]

    year_part, week_part = (int(group) for group in parsed.groups())
    try:
        week_start = date.fromisocalendar(year_part, week_part, 1)
    except Exception:
        return [week_id]

    legacy_keys = [
        (week_start + timedelta(days=offset)).strftime("%Y-W%W")
        for offset in range(7)
    ]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys([week_id, *legacy_keys]))
def _find_week_submission(org_id: str, user_uid: str, week_id: str):
    """
    Look up a user's submission for a week, trying each candidate key in order.

    Returns the first truthy record found under
    ``submissions/{org_id}/{user_uid}/{key}``, or ``None`` when no candidate
    key holds data.
    """
    base_path = f"submissions/{org_id}/{user_uid}"
    for candidate in _candidate_week_keys(week_id):
        record = db_ref.child(f"{base_path}/{candidate}").get()
        if record:
            return record
    return None
def _parse_iso(ts: str) -> datetime:
    """
    Parse an ISO-8601 timestamp, accepting a trailing ``Z`` as UTC.

    A final ``Z`` is rewritten to ``+00:00`` before the string is handed to
    ``datetime.fromisoformat``; any other input is passed through unchanged.
    """
    normalized = f"{ts[:-1]}+00:00" if ts.endswith("Z") else ts
    return datetime.fromisoformat(normalized)
def verify_token(auth_header):
    """
    Resolve an ``Authorization: Bearer <token>`` header to a Firebase uid.

    Returns the ``uid`` claim of the verified ID token, or ``None`` when the
    header is missing, is not a Bearer header, or verification fails (the
    failure is logged as a warning).
    """
    if not (auth_header and auth_header.startswith("Bearer ")):
        return None
    try:
        id_token = auth_header.split("Bearer ")[1]
        decoded = auth.verify_id_token(id_token)
        return decoded["uid"]
    except Exception as e:
        logger.warning(f"Token verification failed: {e}")
        return None
def verify_superadmin(auth_header):
    """
    Authenticate the caller and require the SuperAdmin flag.

    The caller is a SuperAdmin when ``superadmins/{uid}`` exists and its
    ``is_admin`` field is truthy. The flag is set manually in the Firebase
    console; there is no endpoint for it.

    Returns the caller's uid.
    Raises PermissionError when the token is invalid/missing or the flag is absent.
    """
    uid = verify_token(auth_header)
    if not uid:
        raise PermissionError("Invalid or missing token.")

    admin_record = db_ref.child(f"superadmins/{uid}").get()
    is_superadmin = bool(admin_record) and admin_record.get("is_admin", False)
    if not is_superadmin:
        raise PermissionError("SuperAdmin access required.")
    return uid
def verify_org_admin(auth_header, org_id):
    """
    Authenticate the caller and require an admin role inside ``org_id``.

    The caller's record is read from ``users/{org_id}/{uid}``; its
    ``platform_role`` (default ``"employee"``) must be ``org_admin`` or
    ``dept_admin``.

    Returns ``(uid, role, user_record)``.
    Raises PermissionError for a bad token, a non-member, or a non-admin role.
    """
    uid = verify_token(auth_header)
    if not uid:
        raise PermissionError("Invalid or missing token.")

    member = db_ref.child(f"users/{org_id}/{uid}").get()
    if not member:
        raise PermissionError("User not found in organization.")

    role = member.get("platform_role", "employee")
    if role in ("org_admin", "dept_admin"):
        return uid, role, member
    raise PermissionError("Admin access required.")
def verify_org_member(auth_header, org_id):
    """
    Authenticate the caller and require a user record inside ``org_id``.

    Returns ``(uid, user_record)`` where the record comes from
    ``users/{org_id}/{uid}``.
    Raises PermissionError for a bad token or when no such record exists.
    """
    uid = verify_token(auth_header)
    if not uid:
        raise PermissionError("Invalid or missing token.")

    member = db_ref.child(f"users/{org_id}/{uid}").get()
    if member:
        return uid, member
    raise PermissionError("User not found in organization.")
def get_org(org_id):
    """Read and return whatever is stored at ``organizations/{org_id}``."""
    org_ref = db_ref.child(f"organizations/{org_id}")
    return org_ref.get()
def _write_audit(org_id, actor_uid, action, details=None):
    """
    Append an entry to ``audit_log/{org_id}``.

    Best-effort: any failure is logged and swallowed so that auditing never
    breaks the calling request.
    """
    try:
        entry_ref = db_ref.child(f"audit_log/{org_id}").push()
        entry = {
            "entryId": entry_ref.key,
            "actorUid": actor_uid,
            "action": action,
            "details": details or {},
            "timestamp": _now_iso(),
        }
        entry_ref.set(entry)
    except Exception as e:
        logger.error(f"Audit log write failed: {e}")
def _send_whatsapp(to_number: str, message: str):
    """
    Send a WhatsApp message through the Twilio Messages REST endpoint.

    ``to_number`` should be in E.164 format, e.g. +263771234567; the
    ``whatsapp:`` prefix is added here.

    Returns True when Twilio accepts the request, False when Twilio is not
    configured or the request fails (both cases are logged).
    """
    if not (TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN):
        logger.warning(f"Twilio not configured — skipping WhatsApp to {to_number}: {message[:60]}")
        return False

    try:
        endpoint = f"https://api.twilio.com/2010-04-01/Accounts/{TWILIO_ACCOUNT_SID}/Messages.json"
        form_fields = {
            "From": TWILIO_WHATSAPP_NUMBER,
            "To": f"whatsapp:{to_number}",
            "Body": message,
        }
        credentials_pair = (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
        response = requests.post(endpoint, data=form_fields, auth=credentials_pair, timeout=10)
        response.raise_for_status()
        logger.info(f"WhatsApp sent to {to_number}: {message[:60]}")
        return True
    except Exception as e:
        logger.error(f"WhatsApp send failed to {to_number}: {e}")
        return False
def _push_notification(org_id, uid, title, message, ntype="info", meta=None):
    """
    Store an unread notification under ``notifications/{org_id}/{uid}``.

    Best-effort: failures are logged and swallowed.
    """
    try:
        note_ref = db_ref.child(f"notifications/{org_id}/{uid}").push()
        payload = {
            "notificationId": note_ref.key,
            "title": title,
            "message": message,
            "type": ntype,
            "meta": meta or {},
            "read": False,
            "createdAt": _now_iso(),
        }
        note_ref.set(payload)
    except Exception as e:
        logger.error(f"Push notification failed for {uid}: {e}")
| # ============================================================================= | |
| # 3. GEMINI FUNCTIONS | |
| # ============================================================================= | |
def gemini_synthesize_kpi(natural_language_input: str, role_name: str) -> dict:
    """
    Ask Gemini to turn an admin's free-text description into a weighted KPI model.

    Args:
        natural_language_input: The admin's description of the role's KPIs.
        role_name: Name of the role the KPI model is for.

    Returns:
        The JSON object parsed from the model's reply.

    Raises:
        Exception: Any API or JSON-parsing failure is logged and re-raised.
    """
    prompt = f"""
You are configuring a KPI framework for an employee role.
Role: "{role_name}"
Admin input: "{natural_language_input}"
Extract KPI categories and their percentage weights. Weights must sum to 100.
Return ONLY valid JSON:
{{
"role": "{role_name}",
"kpi_categories": [
{{"name": "<category name>", "weight": <int>, "description": "<one sentence rationale>"}}
],
"synthesized_from": "{natural_language_input[:200]}"
}}
Rules: 3-7 categories. Weights are integers summing to exactly 100.
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"KPI synthesis failed: {e}")
        raise
def gemini_synthesize_task(natural_language_input: str, org_context: str, kpi_categories: list) -> dict:
    """
    Ask Gemini to turn a manager's free-text description into a structured task.

    Args:
        natural_language_input: The manager's description of the task.
        org_context: Free-text organization context placed in the prompt.
        kpi_categories: Category names the task may be mapped to; when empty
            the prompt lists "general" instead.

    Returns:
        The JSON object parsed from the model's reply.

    Raises:
        Exception: Any API or JSON-parsing failure is logged and re-raised.
    """
    cats = ", ".join(kpi_categories) if kpi_categories else "general"
    prompt = f"""
You are creating a structured task from a manager's natural language description.
Organization context: "{org_context}"
KPI categories available: {cats}
Manager input: "{natural_language_input}"
Return ONLY valid JSON:
{{
"title": "<concise task title>",
"description": "<full task description>",
"kpi_category": "<best matching category from the list>",
"priority": "<high|medium|low>",
"suggested_due_days": <int days from today>,
"estimated_hours": <float>
}}
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"Task synthesis failed: {e}")
        raise
def gemini_extract_submission(
    submission_text: str,
    kpi_categories: list,
    role_name: str,
    kpi_weights: dict,
) -> dict:
    """
    Ask Gemini to analyse an employee's weekly report against the role's KPI targets.

    Args:
        submission_text: The employee's free-text weekly report.
        kpi_categories: KPI categories for the role (JSON-encoded into the prompt).
        role_name: Name of the employee's role.
        kpi_weights: Target weight per category (JSON-encoded into the prompt).

    Returns:
        The JSON object parsed from the model's reply.

    Raises:
        Exception: Any API or JSON-parsing failure is logged and re-raised.
    """
    cats_json = json.dumps(kpi_categories)
    weights_json = json.dumps(kpi_weights)
    prompt = f"""
You are an AI performance analyst processing an employee's weekly activity report.
Role: "{role_name}"
KPI categories: {cats_json}
Target KPI weights (%): {weights_json}
Employee submission:
"{submission_text}"
Analyze and return ONLY valid JSON:
{{
"structured_activities": [
{{"activity": "<description>", "kpi_category": "<category>", "estimated_percent": <int>}}
],
"actual_allocation": {{ "<category>": <percent int> }},
"alignment_score": <0-100 int>,
"alignment_notes": "<2-3 sentence analysis of how well time allocation matches targets>",
"success_events": ["<activity with likely revenue/strategic impact>"],
"role_drift_detected": <true|false>,
"role_drift_detail": "<null or brief description of drift>",
"summary": "<one paragraph summary of the week>"
}}
Rules:
- actual_allocation values must sum to 100
- alignment_score: 100 = perfect match to target weights, 0 = complete misalignment
- role_drift: true if >20% of time on activities outside the role's defined KPI categories
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"Submission extraction failed: {e}")
        raise
def gemini_check_submission_alignment_with_goals(
    submission_text: str,
    org_goals: str,
    kpi_categories: list,
) -> bool:
    """
    Ask Gemini whether a message counts as a meaningful weekly work update.

    Used by the WhatsApp weekly prompt scheduler to determine if an employee
    has already reported in a meaningful way this week.

    Args:
        submission_text: The employee message or activity description.
        org_goals: The organization's goals/KPIs as free text.
        kpi_categories: KPI categories (JSON-encoded into the prompt).

    Returns:
        The ``is_meaningful_update`` value from the model's JSON reply, or
        False when the key is missing or the call/parsing fails (the failure
        is logged, never raised).
    """
    prompt = f"""
Organization goals/KPIs: "{org_goals}"
KPI categories: {json.dumps(kpi_categories)}
Employee message or activity description:
"{submission_text}"
Does this message constitute a meaningful weekly work update that addresses the organization's
goals or KPI categories? Answer with ONLY a JSON object:
{{"is_meaningful_update": <true|false>, "confidence": "<high|medium|low>", "reason": "<one sentence>"}}
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        result = json.loads(raw)
        return result.get("is_meaningful_update", False)
    except Exception as e:
        logger.error(f"Goal alignment check failed: {e}")
        return False
def gemini_extract_expense(description_text: str, receipt_url: str = None) -> dict:
    """
    Ask Gemini to extract structured expense fields from an employee's description.

    Args:
        description_text: The employee's free-text expense description.
        receipt_url: Optional receipt image URL; when given it is included in
            the prompt as a text line.

    Returns:
        The JSON object parsed from the model's reply.

    Raises:
        Exception: Any API or JSON-parsing failure is logged and re-raised.
    """
    # Built outside the f-string so the template stays free of nested quotes.
    receipt_line = "Receipt image URL: " + receipt_url if receipt_url else ""
    prompt = f"""
Extract expense details from this employee description.
Description: "{description_text}"
{receipt_line}
Return ONLY valid JSON:
{{
"vendor": "<vendor or payee name>",
"amount": <float>,
"currency": "<USD|ZWL|ZAR|other>",
"category": "<meals|transport|accommodation|office_supplies|client_entertainment|other>",
"date": "<YYYY-MM-DD or null>",
"description": "<clean expense description>",
"is_client_related": <true|false>
}}
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"Expense extraction failed: {e}")
        raise
def gemini_process_leave_request(natural_language_input: str, employee_name: str) -> dict:
    """
    Ask Gemini to extract structured leave-request fields from free text.

    Today's date (``date.today()``) is placed in the prompt so the model can
    resolve relative dates such as "tomorrow".

    Args:
        natural_language_input: The employee's leave request text.
        employee_name: The employee's display name.

    Returns:
        The JSON object parsed from the model's reply.

    Raises:
        Exception: Any API or JSON-parsing failure is logged and re-raised.
    """
    today = date.today().isoformat()
    prompt = f"""
Today's date: {today}
Employee name: "{employee_name}"
Employee leave request: "{natural_language_input}"
Extract leave request details and return ONLY valid JSON:
{{
"leave_type": "<annual|sick|maternity|paternity|unpaid|study|compassionate>",
"start_date": "<YYYY-MM-DD>",
"end_date": "<YYYY-MM-DD>",
"days_requested": <int>,
"reason": "<employee's stated reason>",
"is_urgent": <true|false>,
"notes": "<any additional notes extracted>"
}}
Rules:
- If dates are relative ("next Monday", "tomorrow") resolve from today's date.
- If leave type is ambiguous, infer from context (sick = illness, annual = vacation etc).
- days_requested should exclude weekends unless specifically stated.
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"Leave request extraction failed: {e}")
        raise
def gemini_process_meeting_recording(
    transcript: str,
    meeting_title: str,
    attendees: list,
    org_context: str,
) -> dict:
    """
    Ask Gemini to turn a client-meeting transcript into a structured report.

    Args:
        transcript: Full transcript text of the meeting.
        meeting_title: Title of the meeting.
        attendees: Attendee list (JSON-encoded into the prompt).
        org_context: Free-text organization context placed in the prompt.

    Returns:
        The JSON object parsed from the model's reply.

    Raises:
        Exception: Any API or JSON-parsing failure is logged and re-raised.
    """
    prompt = f"""
You are processing a client meeting recording transcript.
Meeting: "{meeting_title}"
Organization context: "{org_context}"
Attendees: {json.dumps(attendees)}
Transcript:
"{transcript}"
Produce a structured meeting intelligence report. Return ONLY valid JSON:
{{
"executive_summary": "<3-4 sentence overview of what was discussed and decided>",
"key_decisions": ["<decision>"],
"action_items": [
{{"action": "<what needs to happen>", "owner": "<name or role>", "due_date": "<YYYY-MM-DD or null>", "priority": "<high|medium|low>"}}
],
"client_sentiment": "<positive|neutral|negative|mixed>",
"follow_up_required": <true|false>,
"risks_identified": ["<risk>"],
"opportunities_identified": ["<opportunity>"],
"next_meeting_suggested": <true|false>,
"employee_summary": "<paragraph for the employee who attended — personal context>",
"manager_summary": "<paragraph for the department head — strategic context>"
}}
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"Meeting processing failed: {e}")
        raise
def gemini_process_general_whatsapp_message(
    message: str,
    employee_name: str,
    org_context: str,
    conversation_state: str,
) -> dict:
    """
    Classify an incoming WhatsApp message into an intent and extract its data.

    Args:
        message: The employee's WhatsApp message.
        employee_name: The employee's display name.
        org_context: Organization name/context placed in the prompt.
        conversation_state: The current conversation state string.

    Returns:
        The JSON object parsed from the model's reply. If the call or the
        parsing fails, the failure is logged and a fixed fallback dict with
        intent ``"other"`` is returned instead of raising.
    """
    prompt = f"""
You are IrisTrack, an AI employee assistant for "{org_context}".
Employee: "{employee_name}"
Current conversation state: "{conversation_state}"
Employee message: "{message}"
Classify the intent and extract data. Return ONLY valid JSON:
{{
"intent": "<weekly_update|leave_request|expense_submission|meeting_update|general_query|greeting|other>",
"confidence": "<high|medium|low>",
"extracted_data": {{<relevant data or empty object>}},
"reply_to_employee": "<friendly, concise reply to send back via WhatsApp — max 2 sentences>",
"needs_followup": <true|false>,
"followup_question": "<null or question to ask employee>"
}}
Intents:
- weekly_update: employee describing their work activities
- leave_request: requesting time off
- expense_submission: submitting an expense claim
- meeting_update: updating on a client meeting
- general_query: asking a question about policies, leave balance, tasks etc
- greeting: hi, hello, hey etc
- other: anything else
"""
    try:
        r = gemini_client.models.generate_content(model=MODEL_NAME, contents=prompt)
        # Remove an optional Markdown code fence. str.lstrip/rstrip take a
        # *character set*, not a prefix, so an anchored regex is used instead.
        reply = (r.text or "").strip()
        raw = re.sub(r"^```(?:json)?\s*|\s*```$", "", reply, flags=re.IGNORECASE).strip()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"WhatsApp intent classification failed: {e}")
        return {
            "intent": "other",
            "confidence": "low",
            "extracted_data": {},
            "reply_to_employee": "I received your message. I'll make sure it's noted.",
            "needs_followup": False,
            "followup_question": None,
        }
| # ============================================================================= | |
| # 4. ASSEMBLYAI TRANSCRIPTION | |
| # ============================================================================= | |
def transcribe_audio_url(audio_url: str, language_code: str = "en") -> str:
    """
    Transcribe the audio at ``audio_url`` with AssemblyAI and return the text.

    A transcript job is submitted, then polled every 3 seconds for at most
    60 attempts.

    Raises:
        RuntimeError: The API key is not configured, or AssemblyAI reports
            an ``error`` status.
        TimeoutError: The job did not complete within the polling budget.
        requests.HTTPError: A submit or poll request returned an error status.
    """
    if not ASSEMBLYAI_API_KEY:
        raise RuntimeError("AssemblyAI API key not configured.")

    request_headers = {"authorization": ASSEMBLYAI_API_KEY, "content-type": "application/json"}
    job_options = {
        "audio_url": audio_url,
        "language_code": language_code,
        "punctuate": True,
        "format_text": True,
        "speaker_labels": True,
    }

    # Submit the transcription job.
    submit_resp = requests.post(
        "https://api.assemblyai.com/v2/transcript",
        json=job_options,
        headers=request_headers,
        timeout=30,
    )
    submit_resp.raise_for_status()
    transcript_id = submit_resp.json()["id"]
    logger.info(f"AssemblyAI transcript submitted: {transcript_id}")

    # Poll until the job finishes, fails, or the attempt budget runs out.
    polling_url = f"https://api.assemblyai.com/v2/transcript/{transcript_id}"
    max_attempts = 60
    attempt_no = 0
    while attempt_no < max_attempts:
        attempt_no += 1
        time.sleep(3)
        poll_resp = requests.get(polling_url, headers=request_headers, timeout=15)
        poll_resp.raise_for_status()
        job = poll_resp.json()
        status = job.get("status")
        if status == "completed":
            logger.info(f"AssemblyAI transcript completed: {transcript_id}")
            return job.get("text", "")
        if status == "error":
            raise RuntimeError(f"AssemblyAI error: {job.get('error')}")
        logger.info(f"AssemblyAI poll {attempt_no}/{max_attempts}: {status}")

    raise TimeoutError("AssemblyAI transcription timed out.")
def transcribe_audio_file(file_bytes: bytes, filename: str = "audio.ogg") -> str:
    """
    Upload raw audio bytes to AssemblyAI, then transcribe the uploaded file.

    Used for web uploads where we have raw file data. ``filename`` is
    accepted but not used by this function.

    Raises:
        RuntimeError: The AssemblyAI API key is not configured.
        requests.HTTPError: The upload request returned an error status.
    """
    if not ASSEMBLYAI_API_KEY:
        raise RuntimeError("AssemblyAI API key not configured.")

    upload_headers = {
        "authorization": ASSEMBLYAI_API_KEY,
        "content-type": "application/octet-stream",
    }
    upload_resp = requests.post(
        "https://api.assemblyai.com/v2/upload",
        headers=upload_headers,
        data=file_bytes,
        timeout=60,
    )
    upload_resp.raise_for_status()

    hosted_url = upload_resp.json()["upload_url"]
    logger.info(f"AssemblyAI upload complete: {hosted_url[:60]}")
    return transcribe_audio_url(hosted_url)
| # ============================================================================= | |
| # 5. ORGANIZATION & DEPARTMENT ENDPOINTS | |
| # ============================================================================= | |
def register_organization():
    """
    Create a new organization and make the newly created user its org_admin.

    Body: { name, industry, country, email, password, displayName, phone }

    Writes ``organizations/{orgId}``, ``platform_orgs/{orgId}`` and
    ``users/{orgId}/{uid}``, then records an audit entry.

    Responses:
        201: organization and admin user created.
        400: email, password or organization name missing.
        409: the email already exists in Firebase Auth.
        500: unexpected failure. Anything created so far is rolled back on a
            best-effort basis so the caller can retry with the same email.
    """
    fb_user = None
    org_id = None
    committed = False  # becomes True once every required record is written
    try:
        # silent=True: a missing/malformed JSON body becomes {} and is reported
        # as a 400 below, instead of being caught as a generic 500.
        data = request.get_json(silent=True) or {}
        email = data.get("email")
        password = data.get("password")
        display_name = data.get("displayName", "")
        # `name` may be null or a non-string in the payload; never call
        # .strip() on it directly.
        org_name = str(data.get("name") or "").strip()
        industry = data.get("industry", "")
        country = data.get("country", "Zimbabwe")
        if not email or not password or not org_name:
            return jsonify({"error": "email, password, and organization name required."}), 400

        # Create Firebase Auth user
        try:
            fb_user = auth.create_user(email=email, password=password, display_name=display_name)
        except Exception as e:
            if "EMAIL_EXISTS" in str(e):
                return jsonify({"error": "Email already registered."}), 409
            raise

        org_id = str(uuid.uuid4())

        # Write organization record
        org_data = {
            "orgId": org_id,
            "name": org_name,
            "industry": industry,
            "country": country,
            "createdAt": _now_iso(),
            "reporting_day": "friday",
            "reporting_time": "08:00",
            "org_goals": "",
            "active": True,
        }
        db_ref.child(f"organizations/{org_id}").set(org_data)

        # Write platform_orgs index for SuperAdmin
        db_ref.child(f"platform_orgs/{org_id}").set({
            "orgId": org_id,
            "name": org_name,
            "adminUid": fb_user.uid,
            "createdAt": _now_iso(),
        })

        # Write user record in org
        user_data = {
            "uid": fb_user.uid,
            "email": email,
            "displayName": display_name,
            "phone": data.get("phone", ""),
            "platform_role": "org_admin",
            "department_id": None,
            "position": "Organization Admin",
            "createdAt": _now_iso(),
            "active": True,
            "leave_balance": {"annual": 20, "sick": 10, "study": 5},
            "whatsapp_state": "idle",
        }
        db_ref.child(f"users/{org_id}/{fb_user.uid}").set(user_data)
        committed = True

        _write_audit(org_id, fb_user.uid, "org_registered", {"org_name": org_name})
        logger.info(f"Organization registered: {org_id} by {fb_user.uid}")
        return jsonify({"success": True, "orgId": org_id, "uid": fb_user.uid, **org_data}), 201
    except Exception as e:
        logger.error(f"org_register failed: {e}\n{traceback.format_exc()}")
        if fb_user is not None and not committed:
            # Best-effort rollback: without it the Auth user would linger and
            # every retry with the same email would be rejected with 409.
            try:
                if org_id:
                    for path in (
                        f"users/{org_id}/{fb_user.uid}",
                        f"platform_orgs/{org_id}",
                        f"organizations/{org_id}",
                    ):
                        db_ref.child(path).delete()
                auth.delete_user(fb_user.uid)
            except Exception as cleanup_err:
                logger.error(f"org_register rollback failed: {cleanup_err}")
        return jsonify({"error": str(e)}), 500
def get_organization(org_id):
    """
    Return the organization record to any member of that organization.

    Responses: 200 with the record, 404 when it does not exist, 403 when the
    caller fails the membership check.
    """
    try:
        verify_org_member(request.headers.get("Authorization"), org_id)
        org_record = get_org(org_id)
        if org_record:
            return jsonify(org_record), 200
        return jsonify({"error": "Organization not found."}), 404
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def update_organization(org_id):
    """
    Update organization settings; restricted to the ``org_admin`` role.

    Only whitelisted fields from the JSON body are applied: name, industry,
    country, reporting_day, reporting_time, org_goals, logo_url.

    Responses: 200 on success, 400 when no whitelisted field is present,
    403 when the caller is not an org_admin.
    """
    allowed_fields = (
        "name",
        "industry",
        "country",
        "reporting_day",
        "reporting_time",
        "org_goals",
        "logo_url",
    )
    try:
        uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        if role != "org_admin":
            return jsonify({"error": "Only org_admin can update organization settings."}), 403

        payload = request.get_json() or {}
        changes = {field: value for field, value in payload.items() if field in allowed_fields}
        if not changes:
            return jsonify({"error": "No valid fields to update."}), 400

        db_ref.child(f"organizations/{org_id}").update(changes)
        _write_audit(org_id, uid, "org_updated", changes)
        return jsonify({"success": True}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def create_department(org_id):
    """
    Create a department inside the organization (admin roles only).

    Body: { name, description }

    Responses: 201 with the new department record, 400 when ``name`` is
    missing, blank or not a string, 403 when the caller is not an admin.
    """
    try:
        uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        # `name` may arrive as null or a non-string; treat that as missing
        # rather than crashing on .strip().
        raw_name = data.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        if not name:
            return jsonify({"error": "Department name required."}), 400

        dept_id = str(uuid.uuid4())
        dept_data = {
            "deptId": dept_id,
            "name": name,
            "description": data.get("description", ""),
            "createdAt": _now_iso(),
            "createdBy": uid,
        }
        db_ref.child(f"departments/{org_id}/{dept_id}").set(dept_data)
        _write_audit(org_id, uid, "department_created", {"dept_name": name, "dept_id": dept_id})
        return jsonify(dept_data), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def list_departments(org_id):
    """
    Return every department record of the organization as a JSON list.

    Any organization member may call this; a failed membership check yields 403.
    """
    try:
        verify_org_member(request.headers.get("Authorization"), org_id)
        dept_map = db_ref.child(f"departments/{org_id}").get() or {}
        return jsonify([*dept_map.values()]), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
| # ============================================================================= | |
| # 6. USER MANAGEMENT ENDPOINTS | |
| # ============================================================================= | |
def add_user(org_id):
    """
    Admin adds a user to the org.

    Body: { email, password, displayName, phone, position, department_id, platform_role }
    platform_role: employee | dept_admin (anything else falls back to employee)

    If the email already exists in Firebase Auth, the existing account is
    enrolled into this organization, unless it is already a member, in which
    case the request is rejected so the existing record (role, leave balance,
    WhatsApp state) cannot be overwritten.

    Responses:
        201: user record created.
        400: email or password missing.
        403: caller is not an admin of this organization.
        409: email exists but could not be looked up, or the account is
            already a member of this organization.
        500: unexpected failure.
    """
    try:
        uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        # silent=True: a missing/malformed body becomes {} and yields the 400
        # below instead of being caught as a generic 500.
        data = request.get_json(silent=True) or {}
        email = data.get("email")
        password = data.get("password")
        display_name = data.get("displayName", "")
        phone = data.get("phone", "")
        position = data.get("position", "")
        dept_id = data.get("department_id")
        member_role = data.get("platform_role", "employee")
        if not email or not password:
            return jsonify({"error": "email and password required."}), 400
        # NOTE(review): any admin role, including dept_admin, can grant
        # dept_admin here — confirm that is intended.
        if member_role not in ("employee", "dept_admin"):
            member_role = "employee"

        try:
            fb_user = auth.create_user(email=email, password=password, display_name=display_name)
        except Exception as e:
            if "EMAIL_EXISTS" not in str(e):
                raise
            # User already exists in Firebase Auth — look them up and add to this org.
            # This is intentional: an employee may have an account from another org
            # or a pre-existing Firebase account. We enrol them here explicitly.
            try:
                fb_user = auth.get_user_by_email(email)
            except Exception as lookup_err:
                logger.error(f"add_user | Could not look up existing user {email}: {lookup_err}")
                return jsonify({"error": "Email already registered and could not be retrieved."}), 409
            # Never overwrite an existing membership: a blind .set() would let
            # any admin reset another member's role (e.g. demote the org_admin)
            # simply by re-adding their email.
            if db_ref.child(f"users/{org_id}/{fb_user.uid}").get():
                return jsonify({"error": "User is already a member of this organization."}), 409
            logger.info(f"add_user | Email {email} exists in Auth — enrolling uid={fb_user.uid} into org={org_id}")

        user_data = {
            "uid": fb_user.uid,
            "email": email,
            "displayName": display_name,
            "phone": phone,
            "platform_role": member_role,
            "department_id": dept_id,
            "position": position,
            "createdAt": _now_iso(),
            "addedBy": uid,
            "active": True,
            "leave_balance": {"annual": 20, "sick": 10, "study": 5},
            "whatsapp_state": "idle",
        }
        db_ref.child(f"users/{org_id}/{fb_user.uid}").set(user_data)

        # Welcome via WhatsApp if phone provided
        if phone:
            org = get_org(org_id)
            org_name = org.get("name", "your organization") if org else "your organization"
            _send_whatsapp(
                phone,
                f"Welcome to {org_name} on IrisTrack! 👋\n\n"
                f"You can update your weekly activities, request leave, submit expenses, "
                f"and more — just reply to this message.\n\n"
                f"I'll remind you on reporting day. Talk soon!"
            )

        _write_audit(org_id, uid, "user_added", {"new_uid": fb_user.uid, "email": email})
        return jsonify({"success": True, "uid": fb_user.uid, **user_data}), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"add_user failed: {e}\n{traceback.format_exc()}")
        return jsonify({"error": str(e)}), 500
def list_users(org_id):
    """
    List the users of an org (admin callers only).

    A caller whose role is dept_admin only receives users sharing their own
    department_id. The optional ``department_id`` query parameter narrows the
    result further. Returns 403 when the admin check raises PermissionError.
    """
    try:
        caller_uid, caller_role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        org_users = db_ref.child(f"users/{org_id}").get() or {}
        members = list(org_users.values())

        # Dept admins only see their department
        if caller_role == "dept_admin":
            caller_record = db_ref.child(f"users/{org_id}/{caller_uid}").get() or {}
            own_dept = caller_record.get("department_id")
            members = [m for m in members if m.get("department_id") == own_dept]

        requested_dept = request.args.get("department_id")
        if requested_dept:
            members = [m for m in members if m.get("department_id") == requested_dept]

        return jsonify(members), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def get_user(org_id, target_uid):
    """
    Fetch a single user record.

    Any org member may read their own record; reading someone else's requires
    a platform_role of org_admin or dept_admin. Returns 403 on denial, 404 when
    no record exists, otherwise the stored record with 200.
    """
    admin_roles = ("org_admin", "dept_admin")
    try:
        caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id)
        is_self = caller_uid == target_uid
        # Users can only fetch their own record unless they are admin
        if not (is_self or caller_data.get("platform_role") in admin_roles):
            return jsonify({"error": "Access denied."}), 403

        record = db_ref.child(f"users/{org_id}/{target_uid}").get()
        if record:
            return jsonify(record), 200
        return jsonify({"error": "User not found."}), 404
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def update_user(org_id, target_uid):
    """
    Admin updates selected fields of a user record.

    Body: any of { displayName, phone, position, department_id, platform_role,
    active, leave_balance }; other keys are ignored.

    Responses:
        200 - fields written and an audit entry recorded.
        400 - no updatable field supplied, or platform_role is not a known role.
        403 - caller failed the admin check, or a non-org_admin caller tried to
              grant org_admin or edit an org_admin's record.
        404 - target user has no record in this org.
    """
    known_roles = ("employee", "dept_admin", "org_admin")
    try:
        caller_uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        allowed = ["displayName", "phone", "position", "department_id", "platform_role", "active", "leave_balance"]
        update = {k: v for k, v in data.items() if k in allowed}
        if not update:
            return jsonify({"error": "No valid fields to update."}), 400
        # platform_role drives every authorization check in this file, so never
        # store an arbitrary client-supplied value.
        if "platform_role" in update and update["platform_role"] not in known_roles:
            return jsonify({"error": "Invalid platform_role."}), 400
        user_ref = db_ref.child(f"users/{org_id}/{target_uid}")
        existing = user_ref.get()
        if not existing:
            return jsonify({"error": "User not found."}), 404
        if role != "org_admin":
            # Prevent privilege escalation: only an org_admin may hand out the
            # org_admin role or modify (demote/deactivate) an existing org_admin.
            target_is_org_admin = isinstance(existing, dict) and existing.get("platform_role") == "org_admin"
            if update.get("platform_role") == "org_admin" or target_is_org_admin:
                return jsonify({"error": "Only org_admin can grant or modify the org_admin role."}), 403
        user_ref.update(update)
        _write_audit(org_id, caller_uid, "user_updated", {"target_uid": target_uid, "fields": list(update.keys())})
        return jsonify({"success": True}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def get_my_profile(org_id):
    """Return the caller's own user record with their uid added (403 if not an org member)."""
    try:
        caller_uid, record = verify_org_member(request.headers.get("Authorization"), org_id)
        profile = {"uid": caller_uid}
        # Stored fields are merged last, exactly as a dict-unpack would do.
        profile.update(record)
        return jsonify(profile), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
| # ============================================================================= | |
| # 7. KPI MODEL & TASK ENDPOINTS | |
| # ============================================================================= | |
def create_kpi_model(org_id):
    """
    Admin defines a KPI model for a role.

    Body: { role_name, natural_language OR kpi_categories (manual override) }
    natural_language is passed to gemini_synthesize_kpi; when it is absent,
    kpi_categories is stored as given. The model is written under
    kpi_models/{org_id}/{model_id} and returned with 201.
    """
    try:
        uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        payload = request.get_json() or {}

        role_name = payload.get("role_name", "").strip()
        if not role_name:
            return jsonify({"error": "role_name required."}), 400

        description = payload.get("natural_language")
        manual_categories = payload.get("kpi_categories")
        if description:
            synthesized = gemini_synthesize_kpi(payload["natural_language"], role_name)
        elif manual_categories:
            # Manual override — admin provides structured categories
            synthesized = dict(
                role=role_name,
                kpi_categories=payload["kpi_categories"],
                synthesized_from="manual",
            )
        else:
            return jsonify({"error": "natural_language or kpi_categories required."}), 400

        model_id = str(uuid.uuid4())
        model_data = {
            "modelId": model_id,
            "orgId": org_id,
            "createdBy": uid,
            "createdAt": _now_iso(),
        }
        # Synthesized keys are merged last, so they win on any collision.
        model_data.update(synthesized)

        db_ref.child(f"kpi_models/{org_id}/{model_id}").set(model_data)
        _write_audit(org_id, uid, "kpi_model_created", {"role_name": role_name, "model_id": model_id})
        return jsonify(model_data), 201
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"create_kpi_model failed: {exc}")
        return jsonify({"error": str(exc)}), 500
def list_kpi_models(org_id):
    """Return every KPI model stored for the org to any org member (403 otherwise)."""
    try:
        verify_org_member(request.headers.get("Authorization"), org_id)
        stored = db_ref.child(f"kpi_models/{org_id}").get()
        if not stored:
            stored = {}
        return jsonify([*stored.values()]), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def preview_kpi_model(org_id):
    """
    Synthesise a KPI model from natural language and return it for admin review
    WITHOUT saving it. Nothing is written to the database by this endpoint.
    Body: { role_name, natural_language }
    Response: { preview: <synthesised model>, confirmed: False }
    """
    try:
        verify_org_admin(request.headers.get("Authorization"), org_id)
        payload = request.get_json() or {}
        role_name = payload.get("role_name", "").strip()
        description = payload.get("natural_language", "").strip()
        if not (role_name and description):
            return jsonify({"error": "role_name and natural_language required."}), 400
        preview = gemini_synthesize_kpi(description, role_name)
        return jsonify({"preview": preview, "confirmed": False}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"preview_kpi_model failed: {exc}")
        return jsonify({"error": str(exc)}), 500
def update_kpi_model(org_id, model_id):
    """
    Update an existing KPI model.

    Body: { role_name?, kpi_categories?, natural_language? }
    natural_language takes precedence: it is re-synthesised via
    gemini_synthesize_kpi. Otherwise kpi_categories is stored as given and
    marked "manual". Returns the stored model merged with the applied changes.
    """
    try:
        uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        model_ref = db_ref.child(f"kpi_models/{org_id}/{model_id}")
        model_data = model_ref.get()
        if not model_data:
            return jsonify({"error": "KPI model not found."}), 404

        payload = request.get_json() or {}
        changes = {"updatedAt": _now_iso(), "updatedBy": uid}

        if payload.get("role_name"):
            changes["role"] = payload["role_name"]

        if payload.get("natural_language"):
            # Role used for synthesis: the submitted role_name if the key is
            # present, else the role already stored on the model.
            role_for_prompt = payload.get("role_name", model_data.get("role", ""))
            resynthesized = gemini_synthesize_kpi(payload["natural_language"], role_for_prompt)
            changes["kpi_categories"] = resynthesized["kpi_categories"]
            changes["synthesized_from"] = payload["natural_language"]
        elif payload.get("kpi_categories"):
            changes["kpi_categories"] = payload["kpi_categories"]
            changes["synthesized_from"] = "manual"

        model_ref.update(changes)
        _write_audit(org_id, uid, "kpi_model_updated", {"model_id": model_id})

        merged = dict(model_data)
        merged.update(changes)
        return jsonify(merged), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"update_kpi_model failed: {exc}")
        return jsonify({"error": str(exc)}), 500
def delete_kpi_model(org_id, model_id):
    """
    Delete a KPI model. Org admin only.

    If any stored submission carries this model's id in kpiModelId, the model is
    soft-deleted (active=False plus deletedAt/deletedBy) instead of removed, so
    historical alignment scores remain meaningful. Otherwise it is deleted.
    """
    try:
        uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        if role != "org_admin":
            return jsonify({"error": "Only org_admin can delete KPI models."}), 403

        model_ref = db_ref.child(f"kpi_models/{org_id}/{model_id}")
        if not model_ref.get():
            return jsonify({"error": "KPI model not found."}), 404

        # Check if any submission references this model
        submissions_by_user = db_ref.child(f"submissions/{org_id}").get() or {}
        referenced = False
        for per_user in submissions_by_user.values():
            if not isinstance(per_user, dict):
                continue
            for weekly in per_user.values():
                if isinstance(weekly, dict) and weekly.get("kpiModelId") == model_id:
                    referenced = True
                    break
            if referenced:
                break

        if not referenced:
            model_ref.delete()
            _write_audit(org_id, uid, "kpi_model_deleted", {"model_id": model_id})
            return jsonify({"success": True, "soft_deleted": False}), 200

        # Soft delete — keep data, mark inactive
        model_ref.update({"active": False, "deletedAt": _now_iso(), "deletedBy": uid})
        _write_audit(org_id, uid, "kpi_model_soft_deleted", {"model_id": model_id})
        return jsonify({"success": True, "soft_deleted": True, "reason": "Model has historical submissions — marked inactive rather than removed."}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"delete_kpi_model failed: {exc}")
        return jsonify({"error": str(exc)}), 500
def preview_task(org_id):
    """
    Structure a task from natural language and return it for admin review
    WITHOUT saving. Nothing is written to the database by this endpoint.

    Body: { natural_language, kpi_model_id? }
    Response: { preview: <structured task + resolved_due_date>, confirmed: False }
    resolved_due_date is today plus suggested_due_days (7 when the key is absent).
    """
    try:
        verify_org_admin(request.headers.get("Authorization"), org_id)
        payload = request.get_json() or {}
        description = payload.get("natural_language", "").strip()
        if not description:
            return jsonify({"error": "natural_language required."}), 400

        org = get_org(org_id)
        if org:
            org_context = org.get("name", "") + " — " + org.get("industry", "")
        else:
            org_context = org_id

        # Category names of the referenced KPI model, if it exists.
        category_names = []
        if payload.get("kpi_model_id"):
            model = db_ref.child(f"kpi_models/{org_id}/{payload['kpi_model_id']}").get()
            if model:
                category_names = [cat["name"] for cat in model.get("kpi_categories", [])]

        structured = gemini_synthesize_task(description, org_context, category_names)
        offset = timedelta(days=structured.get("suggested_due_days", 7))
        resolved = (date.today() + offset).isoformat()

        preview = {**structured, "resolved_due_date": resolved}
        return jsonify({"preview": preview, "confirmed": False}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"preview_task failed: {exc}")
        return jsonify({"error": str(exc)}), 500
def create_task(org_id):
    """
    Admin creates a task via natural language. Gemini structures it.
    Body: { natural_language, assigned_to_uid, kpi_model_id (optional), due_date (optional override) }

    Responses:
        201 - task stored under tasks/{org_id}/{task_id}; assignee notified.
        400 - natural_language missing.
        403 - caller failed the admin check.
        404 - assigned_to_uid was supplied but has no user record in this org.
        500 - any other failure.
    """
    try:
        uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        nl = data.get("natural_language", "").strip()
        if not nl:
            return jsonify({"error": "natural_language required."}), 400
        # Validate the assignee before any model call or write, so a typo or a
        # uid from another org cannot create an orphan task and notification.
        assigned_uid = data.get("assigned_to_uid")
        assigned_user = {}
        if assigned_uid:
            assigned_user = db_ref.child(f"users/{org_id}/{assigned_uid}").get()
            if not isinstance(assigned_user, dict) or not assigned_user:
                return jsonify({"error": "Assigned user not found in this organization."}), 404
        org = get_org(org_id)
        org_context = (org.get("name", "") + " — " + org.get("industry", "")) if org else org_id
        # Get KPI categories for context
        kpi_cats = []
        if data.get("kpi_model_id"):
            model = db_ref.child(f"kpi_models/{org_id}/{data['kpi_model_id']}").get()
            if model:
                kpi_cats = [c["name"] for c in model.get("kpi_categories", [])]
        structured = gemini_synthesize_task(nl, org_context, kpi_cats)
        # Resolve due date
        due_date = data.get("due_date")
        if not due_date and structured.get("suggested_due_days"):
            due_date = (date.today() + timedelta(days=structured["suggested_due_days"])).isoformat()
        task_id = str(uuid.uuid4())
        # Model output goes first so the system-controlled fields below always
        # win if the model happens to emit a colliding key (status, taskId, ...).
        task_data = {
            **structured,
            "taskId": task_id,
            "orgId": org_id,
            "assignedTo": assigned_uid,  # canonical GET field
            "assigned_to_uid": assigned_uid,  # mirrors POST input contract
            "assignedBy": uid,
            "kpiModelId": data.get("kpi_model_id"),
            "dueDate": due_date,
            "status": "open",
            "createdAt": _now_iso(),
            "synthesized_from": nl,
        }
        db_ref.child(f"tasks/{org_id}/{task_id}").set(task_data)
        # Notify assigned user
        if assigned_uid:
            _push_notification(org_id, assigned_uid, "New Task Assigned", structured.get("title", "New task"), "info",
                               {"task_id": task_id})
            if assigned_user.get("phone"):
                _send_whatsapp(
                    assigned_user["phone"],
                    f"📋 New task assigned to you:\n\n"
                    f"*{structured.get('title', '')}*\n"
                    f"{structured.get('description', '')}\n\n"
                    f"Priority: {structured.get('priority', 'medium').upper()}\n"
                    f"Due: {due_date or 'No due date'}"
                )
        _write_audit(org_id, uid, "task_created", {"task_id": task_id, "title": structured.get("title")})
        return jsonify(task_data), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"create_task failed: {e}")
        return jsonify({"error": str(e)}), 500
def list_tasks(org_id):
    """
    List tasks visible to the caller.

    Callers with role employee get only tasks whose assignedTo is their uid;
    dept_admin callers get tasks assigned to users sharing their department_id;
    any other role gets every task in the org. The optional ``status`` query
    parameter filters the result by exact status.
    """
    try:
        uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        stored = db_ref.child(f"tasks/{org_id}").get() or {}
        visible = list(stored.values())
        role = user_data.get("platform_role", "employee")

        if role == "employee":
            # Employees see only their assigned tasks
            visible = [task for task in visible if task.get("assignedTo") == uid]
        elif role == "dept_admin":
            # Dept admins see tasks for their department members
            own_dept = user_data.get("department_id")
            org_users = db_ref.child(f"users/{org_id}").get() or {}
            dept_members = set()
            for member_uid, member in org_users.items():
                if isinstance(member, dict) and member.get("department_id") == own_dept:
                    dept_members.add(member_uid)
            visible = [task for task in visible if task.get("assignedTo") in dept_members]

        wanted_status = request.args.get("status")
        if wanted_status:
            visible = [task for task in visible if task.get("status") == wanted_status]
        return jsonify(visible), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def update_task_status(org_id, task_id):
    """
    Update a task's status. Body: { status: open|in_progress|completed|blocked, notes }

    Callers with role employee may only update tasks assigned to them. notes,
    when present, is stored as completion_notes; completing a task also stamps
    completedAt. Returns 404 for an unknown task and 400 for an invalid status.
    """
    valid_statuses = ("open", "in_progress", "completed", "blocked")
    try:
        uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        payload = request.get_json() or {}
        task_ref = db_ref.child(f"tasks/{org_id}/{task_id}")
        task = task_ref.get()
        if not task:
            return jsonify({"error": "Task not found."}), 404

        caller_is_employee = user_data.get("platform_role", "employee") == "employee"
        if caller_is_employee and task.get("assignedTo") != uid:
            return jsonify({"error": "You can only update tasks assigned to you."}), 403

        requested = payload.get("status")
        if requested not in valid_statuses:
            return jsonify({"error": "Invalid status."}), 400

        changes = {"status": requested, "updatedAt": _now_iso()}
        notes = payload.get("notes")
        if notes:
            changes["completion_notes"] = payload["notes"]
        if requested == "completed":
            changes["completedAt"] = _now_iso()

        task_ref.update(changes)
        _write_audit(org_id, uid, "task_updated", {"task_id": task_id, "new_status": requested})
        return jsonify({"success": True}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
| # ============================================================================= | |
| # 8. SUBMISSION ENDPOINTS | |
| # ============================================================================= | |
def _process_submission_text(org_id: str, user_uid: str, text: str, week_id: str = None, source: str = "web"):
    """
    Core submission processing logic. Used by both web and WhatsApp paths.

    Resolves a KPI model for the user, runs gemini_extract_submission, stores
    the result under submissions/{org_id}/{user_uid}/{week_id}, updates the
    user's last_submission_at / last_alignment_score, and notifies the dept
    head(s) and org admin(s). It does NOT send any WhatsApp reply to the
    submitting employee.

    Args:
        org_id: organization the submission belongs to.
        user_uid: uid of the submitting employee.
        text: raw (typed or transcribed) update text.
        week_id: storage key for the week; defaults to _current_week_id().
        source: value stored in the record's "source" field. Defaults to "web"
            (the previous hard-coded value) so existing callers are unaffected.

    Returns:
        The submission dict that was written.
    """
    user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {}
    if not week_id:
        week_id = _current_week_id()
    # Resolve KPI model for this user's role/position
    position = user_data.get("position") or ""
    position_lc = position.lower()
    kpi_categories, kpi_weights, kpi_model_id = [], {}, None
    all_models = db_ref.child(f"kpi_models/{org_id}").get() or {}
    # Ignore malformed entries and models that were soft-deleted (active=False).
    usable_models = {
        mid: model for mid, model in all_models.items()
        if isinstance(model, dict) and model.get("active", True) is not False
    }
    for mid, model in usable_models.items():
        model_role = (model.get("role") or "").strip().lower()
        # An empty role must not match: "" is a substring of every position.
        if model_role and model_role in position_lc:
            kpi_model_id = mid
            cats = model.get("kpi_categories", [])
            kpi_categories = [c["name"] for c in cats]
            kpi_weights = {c["name"]: c["weight"] for c in cats}
            break
    # Fall back to the first usable model when no role matched (or the matched
    # model had no categories). kpi_model_id is left as resolved above.
    if not kpi_categories and usable_models:
        first_model = next(iter(usable_models.values()))
        cats = first_model.get("kpi_categories", [])
        kpi_categories = [c["name"] for c in cats]
        kpi_weights = {c["name"]: c["weight"] for c in cats}
    extracted = gemini_extract_submission(text, kpi_categories, position, kpi_weights)
    submission_id = str(uuid.uuid4())
    submission_data = {
        "submissionId": submission_id,
        "orgId": org_id,
        "uid": user_uid,
        "weekId": week_id,
        "rawText": text,
        "kpiModelId": kpi_model_id,
        "submittedAt": _now_iso(),
        "source": source,
        **extracted,
    }
    db_ref.child(f"submissions/{org_id}/{user_uid}/{week_id}").set(submission_data)
    db_ref.child(f"users/{org_id}/{user_uid}").update({
        "last_submission_at": _now_iso(),
        "last_alignment_score": extracted.get("alignment_score"),
    })
    # Notify dept head / org admin — NOT the employee
    dept_id = user_data.get("department_id")
    all_users = db_ref.child(f"users/{org_id}").get() or {}
    employee_name = user_data.get("displayName", user_uid)
    # "or 0" also covers an explicit null score, which would otherwise break
    # the "< 50" comparison below after the submission is already stored.
    alignment_score = extracted.get("alignment_score") or 0
    drift_detected = extracted.get("role_drift_detected", False)
    drift_line = f"⚠️ Role drift detected: {extracted.get('role_drift_detail', '')}" if drift_detected else "✅ No role drift"
    admin_message = (
        f"📊 Weekly update from *{employee_name}*\n\n"
        f"Alignment score: *{alignment_score}/100*\n"
        f"{drift_line}\n\n"
        f"Summary: {(extracted.get('summary') or '')[:200]}"
    )
    for admin_uid, admin_data in all_users.items():
        if not isinstance(admin_data, dict):
            continue
        admin_role = admin_data.get("platform_role", "employee")
        is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id
        is_org_admin = admin_role == "org_admin"
        if is_dept_head or is_org_admin:
            _push_notification(
                org_id, admin_uid,
                f"Update from {employee_name}",
                f"Alignment: {alignment_score}/100. {'⚠️ Drift detected.' if drift_detected else ''}",
                "warning" if drift_detected else "info",
                {"submission_id": submission_id, "uid": user_uid, "week_id": week_id},
            )
            # WhatsApp is reserved for submissions that need attention.
            if admin_data.get("phone") and (drift_detected or alignment_score < 50):
                _send_whatsapp(admin_data["phone"], admin_message)
    return submission_data
def submit_weekly_update(org_id):
    """
    Web submission endpoint.
    Accepts text OR audio file, as multipart form data or a JSON body.
    Body: { text?, audio_file?, week_id? }

    Responses:
        201 - the stored submission.
        400 - neither usable text nor an audio file was supplied.
        403 - caller is not a member of the org.
        500 - any other failure (logged with traceback).
    """
    try:
        uid, _user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        # silent=True: a multipart/form request has no JSON body, and a plain
        # get_json() would raise instead of letting us fall back to form fields.
        json_body = request.get_json(silent=True)
        if not isinstance(json_body, dict):
            json_body = {}
        # Form value wins, then JSON. (The previous one-liner parsed as
        # "(form or json) if is_json else None" and dropped the form week_id
        # on every non-JSON request.)
        week_id = request.form.get("week_id") or json_body.get("week_id") or None
        # Resolve text — audio takes priority, transcribed via AssemblyAI
        text = None
        audio_file = request.files.get("audio_file")
        if audio_file:
            audio_bytes = audio_file.read()
            text = transcribe_audio_file(audio_bytes, audio_file.filename or "audio.ogg")
        else:
            raw_text = request.form.get("text") or json_body.get("text") or ""
            text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text:
            return jsonify({"error": "text or audio_file required."}), 400
        submission = _process_submission_text(org_id, uid, text, week_id)
        return jsonify(submission), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"submit_weekly_update failed: {e}\n{traceback.format_exc()}")
        return jsonify({"error": str(e)}), 500
def get_user_submissions(org_id, target_uid):
    """Return all stored submissions for target_uid; non-admin callers may only read their own."""
    try:
        caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id)
        caller_role = caller_data.get("platform_role", "employee")
        reading_own = caller_uid == target_uid
        if not reading_own and caller_role not in ("org_admin", "dept_admin"):
            return jsonify({"error": "Access denied."}), 403
        by_week = db_ref.child(f"submissions/{org_id}/{target_uid}").get() or {}
        return jsonify([*by_week.values()]), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def get_week_submissions(org_id, week_id):
    """
    Admin: get all submissions for a given week across the org.

    dept_admin callers only receive submissions from users sharing their
    department_id. Each returned record has its weekId set to the requested
    week_id.
    """
    try:
        _uid, role, caller_data = verify_org_admin(request.headers.get("Authorization"), org_id)
        org_users = db_ref.child(f"users/{org_id}").get() or {}

        if role != "dept_admin":
            allowed_uids = set(org_users)
        else:
            # Department admins are scoped to their own department.
            own_dept = caller_data.get("department_id")
            allowed_uids = set()
            for member_uid, member in org_users.items():
                if isinstance(member, dict) and member.get("department_id") == own_dept:
                    allowed_uids.add(member_uid)

        collected = []
        for member_uid in allowed_uids:
            found = _find_week_submission(org_id, member_uid, week_id)
            if not found:
                continue
            # Ensure the requested weekId is surfaced even if stored under
            # a legacy key.
            stamped = dict(found)
            stamped["weekId"] = week_id
            collected.append(stamped)
        return jsonify(collected), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
| # ============================================================================= | |
| # 9. LEAVE MANAGEMENT ENDPOINTS | |
| # ============================================================================= | |
def _process_leave_request_text(org_id: str, user_uid: str, text: str) -> dict:
    """
    Turn a free-text leave request into a stored, pending leave record.

    Runs gemini_process_leave_request on the text, writes the result under
    leave_requests/{org_id}/{request_id}, pushes a notification to each org
    admin and to dept admins of the employee's department, then pushes a
    confirmation to the employee. Returns the stored record.
    """
    requester = db_ref.child(f"users/{org_id}/{user_uid}").get() or {}
    employee_name = requester.get("displayName", user_uid)
    extracted = gemini_process_leave_request(text, employee_name)

    request_id = str(uuid.uuid4())
    leave_data = {
        "requestId": request_id,
        "orgId": org_id,
        "uid": user_uid,
        "employeeName": employee_name,
        "status": "pending",
        "submittedAt": _now_iso(),
        "raw_request": text,
    }
    # Extracted keys are merged last, so they win on any collision.
    leave_data.update(extracted)
    db_ref.child(f"leave_requests/{org_id}/{request_id}").set(leave_data)

    # Notify admins
    requester_dept = requester.get("department_id")
    org_users = db_ref.child(f"users/{org_id}").get() or {}
    for candidate_uid, candidate in org_users.items():
        if not isinstance(candidate, dict):
            continue
        candidate_role = candidate.get("platform_role", "employee")
        if candidate_role == "org_admin":
            should_notify = True
        elif candidate_role == "dept_admin":
            should_notify = candidate.get("department_id") == requester_dept
        else:
            should_notify = False
        if not should_notify:
            continue
        severity = "warning" if extracted.get("is_urgent") else "info"
        _push_notification(
            org_id, candidate_uid,
            f"Leave Request — {employee_name}",
            f"{extracted.get('leave_type', '').title()} leave: {extracted.get('start_date')} to {extracted.get('end_date')} ({extracted.get('days_requested', '?')} days)",
            severity,
            {"request_id": request_id},
        )

    # Confirm to employee via push notification only (no WhatsApp response from this path)
    _push_notification(
        org_id, user_uid,
        "Leave Request Received",
        f"Your {extracted.get('leave_type', 'leave')} request has been submitted and is pending approval.",
        "info",
        {"request_id": request_id},
    )
    return leave_data
def submit_leave_request(org_id):
    """
    Web leave request. Body: { text (natural language) } OR { audio_file }
    Text may arrive as a form field or in a JSON body; audio takes priority.

    Responses:
        201 - the stored leave request.
        400 - neither usable text nor an audio file was supplied.
        403 - caller is not a member of the org.
        500 - any other failure.
    """
    try:
        uid, _user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        audio_file = request.files.get("audio_file")
        if audio_file:
            text = transcribe_audio_file(audio_file.read(), audio_file.filename or "audio.ogg")
        else:
            # silent=True: a form/multipart request has no JSON body, and a plain
            # get_json() would raise, turning a missing-text 400 into a 500.
            json_body = request.get_json(silent=True)
            if not isinstance(json_body, dict):
                json_body = {}
            raw_text = request.form.get("text") or json_body.get("text") or ""
            text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text:
            return jsonify({"error": "text or audio_file required."}), 400
        leave_data = _process_leave_request_text(org_id, uid, text)
        return jsonify(leave_data), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"submit_leave_request failed: {e}")
        return jsonify({"error": str(e)}), 500
def list_leave_requests(org_id):
    """
    List leave requests visible to the caller, newest first.

    Callers with role employee see their own requests; dept_admin callers see
    requests from users sharing their department_id; any other role sees all.
    The optional ``status`` query parameter filters by exact status.
    """
    try:
        caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id)
        caller_role = caller_data.get("platform_role", "employee")
        stored = db_ref.child(f"leave_requests/{org_id}").get() or {}
        visible = list(stored.values())

        if caller_role == "employee":
            visible = [req for req in visible if req.get("uid") == caller_uid]
        elif caller_role == "dept_admin":
            own_dept = caller_data.get("department_id")
            org_users = db_ref.child(f"users/{org_id}").get() or {}
            dept_uids = set()
            for member_uid, member in org_users.items():
                if isinstance(member, dict) and member.get("department_id") == own_dept:
                    dept_uids.add(member_uid)
            visible = [req for req in visible if req.get("uid") in dept_uids]

        wanted_status = request.args.get("status")
        if wanted_status:
            visible = [req for req in visible if req.get("status") == wanted_status]

        # Newest first; records without submittedAt sort as the empty string.
        ordered = sorted(visible, key=lambda req: req.get("submittedAt", ""), reverse=True)
        return jsonify(ordered), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def decide_leave_request(org_id, request_id):
    """
    Admin approves or declines a leave request.
    Body: { decision: approved|declined, note?: str }
    On approval, the employee's leave balance for the request's leave_type is
    decremented by days_requested and clamped at zero.

    Responses:
        200 - decision stored; employee notified (push, plus WhatsApp if a
              phone number is on record).
        400 - decision is not "approved" or "declined".
        403 - caller failed the admin check.
        404 - leave request not found.
        409 - request is no longer pending.
        500 - any other failure.
    """
    try:
        admin_uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        decision = data.get("decision")
        if decision not in ("approved", "declined"):
            return jsonify({"error": "decision must be approved or declined."}), 400
        leave_ref = db_ref.child(f"leave_requests/{org_id}/{request_id}")
        leave_data = leave_ref.get()
        if not leave_data:
            return jsonify({"error": "Leave request not found."}), 404
        if leave_data.get("status") != "pending":
            return jsonify({"error": "This request has already been decided."}), 409
        employee_uid = leave_data.get("uid")
        # Resolve everything that could be malformed BEFORE the first write, so a
        # bad stored value cannot leave the request approved with the balance
        # untouched. Negative day counts are clamped so they never add balance.
        leave_type = leave_data.get("leave_type") or "annual"
        raw_days = leave_data.get("days_requested", 0)
        days_taken = max(0, _leave_days_as_number(raw_days))
        if decision == "approved" and raw_days not in (0, None) and days_taken == 0:
            logger.warning(f"decide_leave_request | request={request_id} has unusable days_requested={raw_days!r}; no balance deducted")
        update = {
            "status": decision,
            "decidedBy": admin_uid,
            "decidedAt": _now_iso(),
            "decision_note": data.get("note", ""),
        }
        leave_ref.update(update)
        employee_data = {}
        if employee_uid:
            user_ref = db_ref.child(f"users/{org_id}/{employee_uid}")
            employee_data = user_ref.get() or {}
            # Decrement leave balance on approval
            if decision == "approved":
                balance = employee_data.get("leave_balance")
                if not isinstance(balance, dict):
                    balance = {}
                current_days = _leave_days_as_number(balance.get(leave_type, 0))
                balance[leave_type] = max(0, current_days - days_taken)
                user_ref.update({"leave_balance": balance})
        # Notify employee
        emoji = "✅" if decision == "approved" else "❌"
        note = data.get("note")
        notif_msg = (
            f"Your {leave_data.get('leave_type', 'leave')} request "
            f"({leave_data.get('start_date')} to {leave_data.get('end_date')}) "
            f"has been {decision}."
            f"{f' Note: {note}' if note else ''}"
        )
        if employee_uid:
            _push_notification(org_id, employee_uid, f"{emoji} Leave {decision.title()}", notif_msg, "success" if decision == "approved" else "warning")
            if employee_data.get("phone"):
                _send_whatsapp(employee_data["phone"], f"{emoji} {notif_msg}")
        _write_audit(org_id, admin_uid, f"leave_{decision}", {"request_id": request_id, "employee_uid": employee_uid})
        return jsonify({"success": True, "decision": decision}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"decide_leave_request failed: {e}")
        return jsonify({"error": str(e)}), 500


def _leave_days_as_number(value):
    """Coerce a stored day count to int/float; anything non-numeric or non-finite becomes 0."""
    import math

    if isinstance(value, bool):
        return 0
    if isinstance(value, (int, float)):
        return value if math.isfinite(value) else 0
    if isinstance(value, str):
        try:
            parsed = float(value.strip())
        except ValueError:
            return 0
        if not math.isfinite(parsed):
            return 0
        # Keep whole numbers as ints so stored balances do not turn into floats.
        return int(parsed) if parsed.is_integer() else parsed
    return 0
| # ============================================================================= | |
| # 10. CLIENT MEETING ENDPOINTS | |
| # ============================================================================= | |
def create_meeting(org_id):
    """
    Schedule a client meeting.
    Body: { title, client_name, scheduled_at (ISO), attendee_uids[], notes? }

    The caller is always included as an attendee. Duplicate and empty attendee
    entries are dropped so nobody is notified twice.

    Responses:
        201 - the stored meeting record.
        400 - title/client_name missing, or attendee_uids is not a list.
        403 - caller is not a member of the org.
        500 - any other failure.
    """
    try:
        uid, _user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        # "or ''" also covers an explicit JSON null for these fields.
        title = str(data.get("title") or "").strip()
        client_name = str(data.get("client_name") or "").strip()
        scheduled_at = data.get("scheduled_at")
        if not title or not client_name:
            return jsonify({"error": "title and client_name required."}), 400
        raw_attendees = data.get("attendee_uids")
        if raw_attendees is None:
            raw_attendees = []
        if not isinstance(raw_attendees, list):
            # A bare string would otherwise be substring-tested and then fail on append.
            return jsonify({"error": "attendee_uids must be a list."}), 400
        # dict.fromkeys de-duplicates while preserving the caller's ordering.
        attendee_uids = list(dict.fromkeys(a for a in raw_attendees if isinstance(a, str) and a))
        if uid not in attendee_uids:
            attendee_uids.append(uid)
        meeting_id = str(uuid.uuid4())
        meeting_data = {
            "meetingId": meeting_id,
            "orgId": org_id,
            "title": title,
            "clientName": client_name,
            "scheduledAt": scheduled_at,
            "attendeeUids": attendee_uids,
            "notes": data.get("notes", ""),
            "status": "scheduled",
            "createdBy": uid,
            "createdAt": _now_iso(),
        }
        db_ref.child(f"client_meetings/{org_id}/{meeting_id}").set(meeting_data)
        # Notify all attendees
        for attendee_uid in attendee_uids:
            attendee = db_ref.child(f"users/{org_id}/{attendee_uid}").get() or {}
            _push_notification(org_id, attendee_uid, f"Meeting Scheduled: {title}", f"Client: {client_name} | {scheduled_at or 'TBD'}", "info", {"meeting_id": meeting_id})
            # The creator gets the push notification only, not a WhatsApp message.
            if attendee.get("phone") and attendee_uid != uid:
                _send_whatsapp(
                    attendee["phone"],
                    f"📅 Meeting scheduled: *{title}*\n"
                    f"Client: {client_name}\n"
                    f"When: {scheduled_at or 'TBD'}\n\n"
                    f"After the meeting, send the recording audio here for automatic summarization."
                )
        _write_audit(org_id, uid, "meeting_created", {"meeting_id": meeting_id, "title": title})
        return jsonify(meeting_data), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"create_meeting failed: {e}")
        return jsonify({"error": str(e)}), 500
def upload_meeting_recording(org_id, meeting_id):
    """
    Upload a meeting recording for AI processing.
    Accepts: audio_file (multipart) OR audio_url (JSON body).
    Gemini produces transcript summary, action items, and dual summaries
    (one for the employee, one for the manager/dept head).

    Only attendees of the meeting may upload. On success the meeting is marked
    "completed", attendees receive the employee summary, and org admins plus
    dept admins of the uploader's department (who were not attendees) receive
    the manager summary.

    Returns 200 with the stored update, 400 when no audio is supplied, 403 for
    non-attendees / PermissionError, 404 for an unknown meeting, 422 when the
    transcript is shorter than 20 characters, 500 otherwise.
    """
    try:
        uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        meeting_ref = db_ref.child(f"client_meetings/{org_id}/{meeting_id}")
        meeting_data = meeting_ref.get()
        if not meeting_data:
            return jsonify({"error": "Meeting not found."}), 404
        attendee_uids = meeting_data.get("attendeeUids", [])
        if uid not in attendee_uids:
            return jsonify({"error": "You are not an attendee of this meeting."}), 403
        title = meeting_data["title"]
        # Transcribe
        audio_file = request.files.get("audio_file")
        # silent=True: a multipart or malformed body yields None instead of raising.
        payload = request.get_json(silent=True)
        audio_url = payload.get("audio_url") if isinstance(payload, dict) else None
        if audio_file:
            transcript = transcribe_audio_file(audio_file.read(), audio_file.filename or "audio.ogg")
        elif audio_url:
            transcript = transcribe_audio_url(audio_url)
        else:
            return jsonify({"error": "audio_file or audio_url required."}), 400
        if not transcript or len(transcript.strip()) < 20:
            return jsonify({"error": "Transcript too short or empty. Check audio quality."}), 422
        # Load each attendee profile once; reused for names and for delivery below.
        attendees = {
            attendee_uid: db_ref.child(f"users/{org_id}/{attendee_uid}").get() or {}
            for attendee_uid in attendee_uids
        }
        attendee_names = [
            profile["displayName"] for profile in attendees.values() if profile.get("displayName")
        ]
        org = get_org(org_id)
        org_context = org.get("name", org_id) if org else org_id
        intelligence = gemini_process_meeting_recording(
            transcript,
            title,
            attendee_names,
            org_context,
        )
        update = {
            "status": "completed",
            "completedAt": _now_iso(),
            "transcript": transcript,
            "intelligence": intelligence,
        }
        meeting_ref.update(update)
        # Normalise model output once so a missing/null field cannot abort
        # delivery after the meeting has already been marked completed.
        executive_summary = intelligence.get("executive_summary") or ""
        employee_summary = intelligence.get("employee_summary", executive_summary) or ""
        manager_summary = intelligence.get("manager_summary") or ""
        sentiment = str(intelligence.get("client_sentiment") or "N/A").upper()
        follow_up = "Yes" if intelligence.get("follow_up_required") else "No"
        action_items_text = "\n".join(
            f"• {item.get('action', '?')} ({item.get('owner', '?')}) — {item.get('due_date', 'TBD')}"
            for item in (intelligence.get("action_items") or [])[:5]
            if isinstance(item, dict)
        )
        # Distribute summaries — each attendee gets their personal summary
        for attendee_uid, attendee in attendees.items():
            _push_notification(
                org_id, attendee_uid,
                f"Meeting Summary Ready: {title}",
                executive_summary[:120],
                "success",
                {"meeting_id": meeting_id},
            )
            if attendee.get("phone"):
                _send_whatsapp(
                    attendee["phone"],
                    f"📋 *{title} — Meeting Summary*\n\n"
                    f"{employee_summary[:400]}\n\n"
                    f"*Action Items:*\n{action_items_text or 'None identified'}"
                )
        # Department head / org admin gets the manager summary
        dept_id = user_data.get("department_id")
        all_users = db_ref.child(f"users/{org_id}").get() or {}
        for admin_uid, admin_data in all_users.items():
            if not isinstance(admin_data, dict):
                continue
            admin_role = admin_data.get("platform_role", "employee")
            is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id
            is_org_admin = admin_role == "org_admin"
            # Admins who attended already received the attendee summary above.
            if (is_dept_head or is_org_admin) and admin_uid not in attendee_uids:
                _push_notification(
                    org_id, admin_uid,
                    f"Meeting Intelligence: {title}",
                    manager_summary[:120],
                    "info",
                    {"meeting_id": meeting_id},
                )
                if admin_data.get("phone"):
                    _send_whatsapp(
                        admin_data["phone"],
                        f"🤝 *{title} — Manager Brief*\n\n"
                        f"{manager_summary[:500]}\n\n"
                        f"Sentiment: {sentiment}\n"
                        f"Follow-up required: {follow_up}"
                    )
        _write_audit(org_id, uid, "meeting_recording_processed", {"meeting_id": meeting_id})
        return jsonify({"meetingId": meeting_id, **update}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"upload_meeting_recording failed: {e}\n{traceback.format_exc()}")
        return jsonify({"error": str(e)}), 500
def list_meetings(org_id):
    """
    List client meetings for the organization, newest scheduled first.

    Callers whose platform_role is "employee" only see meetings they attend.
    Optional query parameter `status` restricts the result to that status.
    Returns 200 with a JSON list, or 403 when verify_org_member raises
    PermissionError.
    """
    try:
        uid, user_data = verify_org_member(request.headers.get("Authorization"), org_id)
        all_meetings = db_ref.child(f"client_meetings/{org_id}").get() or {}
        # Skip malformed (non-dict) nodes instead of crashing on .get below.
        meetings = [m for m in all_meetings.values() if isinstance(m, dict)]
        role = user_data.get("platform_role", "employee")
        if role == "employee":
            meetings = [m for m in meetings if uid in m.get("attendeeUids", [])]
        status_filter = request.args.get("status")
        if status_filter:
            meetings = [m for m in meetings if m.get("status") == status_filter]
        # scheduledAt is optional; normalise to str so None / mixed types still sort.
        meetings.sort(key=lambda m: str(m.get("scheduledAt") or ""), reverse=True)
        return jsonify(meetings), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def get_meeting(org_id, meeting_id):
    """
    Fetch one client meeting by id.

    Callers with platform_role "employee" may only read meetings whose
    attendeeUids include them. Returns 200 with the record, 404 when it does
    not exist, 403 on access denial or PermissionError.
    """
    try:
        caller_uid, caller = verify_org_member(request.headers.get("Authorization"), org_id)
        record = db_ref.child(f"client_meetings/{org_id}/{meeting_id}").get()
        if not record:
            return jsonify({"error": "Meeting not found."}), 404
        is_employee = caller.get("platform_role", "employee") == "employee"
        if is_employee and caller_uid not in record.get("attendeeUids", []):
            return jsonify({"error": "Access denied."}), 403
        return jsonify(record), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
| # ============================================================================= | |
| # 11. EXPENSE MANAGEMENT ENDPOINTS | |
| # ============================================================================= | |
def _upload_receipt_image(org_id: str, expense_id: str, image_bytes: bytes, content_type: str) -> "str | None":
    """
    Uploads a receipt image to Firebase Storage and returns its public URL.
    Path: receipts/{orgId}/{expenseId}/receipt.{ext}
    Requires FIREBASE_STORAGE_BUCKET to be set.
    Falls back gracefully — if Storage is not configured or the upload fails,
    logs the problem and returns None.

    The file extension is derived from the MIME subtype of `content_type`
    ("jpeg" becomes "jpg"); MIME parameters and non-alphanumeric characters
    are dropped because the value comes from the client.
    """
    if not _fb_bucket:
        logger.warning("_upload_receipt_image | FIREBASE_STORAGE_BUCKET not set — skipping upload.")
        return None
    try:
        content_type = content_type or "application/octet-stream"
        # "image/png; name=x" -> "png": strip parameters before taking the subtype.
        subtype = content_type.split(";", 1)[0].strip().lower().split("/")[-1]
        ext = "".join(ch for ch in subtype.replace("jpeg", "jpg") if ch.isalnum()) or "bin"
        blob_path = f"receipts/{org_id}/{expense_id}/receipt.{ext}"
        bucket = storage.bucket()
        blob = bucket.blob(blob_path)
        blob.upload_from_string(image_bytes, content_type=content_type)
        # NOTE(review): make_public() exposes the receipt to anyone holding the
        # URL — confirm this is acceptable, or switch to signed URLs.
        blob.make_public()
        url = blob.public_url
        logger.info(f"Receipt uploaded: {url}")
        return url
    except Exception as e:
        logger.error(f"_upload_receipt_image failed for expense {expense_id}: {e}")
        return None
def _process_expense_text(org_id: str, user_uid: str, text: str, receipt_url: str = None, expense_id: str = None) -> dict:
    """
    Extract expense fields from free text via Gemini, persist the expense as
    "pending", and notify org admins and the submitter's department admins.

    Args:
        org_id: Organization the expense belongs to.
        user_uid: Submitting user's uid.
        text: Natural-language description of the expense.
        receipt_url: Optional receipt URL stored on the record.
        expense_id: Pre-generated id (e.g. used for the Storage upload path);
            a new UUID is minted when omitted.

    Returns:
        The expense record. When the department has a budget entry for the
        extracted category, the returned dict also carries
        `budget_utilization_pct`; that field is computed after the DB write
        and is therefore not persisted.
    """
    extracted = gemini_extract_expense(text, receipt_url) or {}
    expense_id = expense_id or str(uuid.uuid4())  # use pre-generated id (Storage upload) or mint new one
    user_data = db_ref.child(f"users/{org_id}/{user_uid}").get() or {}
    dept_id = user_data.get("department_id")
    employee_name = user_data.get("displayName", user_uid)
    expense_data = {
        # Model output goes first so it can never overwrite the server-owned
        # fields below (status, uid, orgId, ...).
        **extracted,
        "expenseId": expense_id,
        "orgId": org_id,
        "uid": user_uid,
        "employeeName": employee_name,
        "department_id": dept_id,
        "receiptUrl": receipt_url,
        "rawText": text,
        "status": "pending",
        "submittedAt": _now_iso(),
    }
    db_ref.child(f"expenses/{org_id}/{expense_id}").set(expense_data)
    # Check budget utilization
    if dept_id and extracted.get("category"):
        budget_data = db_ref.child(f"budgets/{org_id}/{dept_id}").get() or {}
        cat_budget = budget_data.get(extracted["category"], {})
        if isinstance(cat_budget, dict):
            try:
                allocated = float(cat_budget.get("allocated", 0))
                spent = float(cat_budget.get("spent", 0))
                new_spent = spent + float(extracted.get("amount") or 0)
            except (TypeError, ValueError):
                # The expense is already saved; a non-numeric amount or budget
                # value must not turn the submission into an error.
                logger.warning(f"_process_expense_text | non-numeric budget/amount for expense {expense_id}")
            else:
                utilization = (new_spent / allocated * 100) if allocated > 0 else 0
                expense_data["budget_utilization_pct"] = round(utilization, 1)
    # Notify admin
    all_users = db_ref.child(f"users/{org_id}").get() or {}
    for admin_uid, admin_data in all_users.items():
        if not isinstance(admin_data, dict):
            continue
        admin_role = admin_data.get("platform_role", "employee")
        is_dept_head = admin_role == "dept_admin" and admin_data.get("department_id") == dept_id
        is_org_admin = admin_role == "org_admin"
        if is_dept_head or is_org_admin:
            _push_notification(
                org_id, admin_uid,
                f"Expense: {employee_name}",
                f"{extracted.get('currency', '')} {extracted.get('amount', '?')} — {extracted.get('category', '?')} — {extracted.get('vendor', '?')}",
                "info",
                {"expense_id": expense_id},
            )
    return expense_data
def submit_expense(org_id):
    """
    Accepts multipart/form-data or JSON.
    Fields:
    text — natural language description (required unless audio_file provided)
    audio_file — voice description (multipart); transcribed via AssemblyAI
    receipt_image — receipt photo (multipart); uploaded to Firebase Storage → sets receiptUrl
    receipt_url — fallback string URL if client handles its own upload
    Priority: receipt_image (uploaded here) > receipt_url (client-provided string)

    Returns 201 with the expense record, 400 when no description could be
    resolved, 403 on PermissionError, 500 on any other failure.
    """
    try:
        uid, _ = verify_org_member(request.headers.get("Authorization"), org_id)
        # Parse JSON once and silently: on multipart/form requests a plain
        # get_json() raises, which used to surface as a 500 for valid
        # form submissions that carried no receipt.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        # ── Resolve description text ──────────────────────────────────────────
        audio_file = request.files.get("audio_file")
        if audio_file:
            text = transcribe_audio_file(audio_file.read(), audio_file.filename or "audio.ogg")
        else:
            text = str(request.form.get("text") or payload.get("text") or "").strip()
        if not text:
            return jsonify({"error": "text or audio_file required."}), 400
        # ── Resolve receipt URL ───────────────────────────────────────────────
        # Generate expense_id early so the Storage path is stable before DB write
        expense_id = str(uuid.uuid4())
        receipt_url = None
        receipt_image = request.files.get("receipt_image")
        if receipt_image and receipt_image.filename:
            content_type = receipt_image.content_type or "image/jpeg"
            receipt_url = _upload_receipt_image(org_id, expense_id, receipt_image.read(), content_type)
            if not receipt_url:
                logger.warning(f"submit_expense | Receipt upload failed for {expense_id} — continuing without image.")
        if not receipt_url:
            # Fall back to client-provided URL
            receipt_url = request.form.get("receipt_url") or payload.get("receipt_url")
        expense_data = _process_expense_text(org_id, uid, text, receipt_url, expense_id=expense_id)
        return jsonify(expense_data), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"submit_expense failed: {e}")
        return jsonify({"error": str(e)}), 500
def list_expenses(org_id):
    """
    List expenses visible to the caller, most recently submitted first.

    Visibility by platform_role: "employee" sees only their own expenses,
    "dept_admin" sees expenses of users in their department, any other role
    sees all. Optional query parameter `status` filters by status.
    Returns 200 with a JSON list, or 403 on PermissionError.
    """
    try:
        caller_uid, caller_data = verify_org_member(request.headers.get("Authorization"), org_id)
        role = caller_data.get("platform_role", "employee")
        all_expenses = db_ref.child(f"expenses/{org_id}").get() or {}
        # Ignore malformed (non-dict) nodes, matching the analytics code.
        expenses = [e for e in all_expenses.values() if isinstance(e, dict)]
        if role == "employee":
            expenses = [e for e in expenses if e.get("uid") == caller_uid]
        elif role == "dept_admin":
            my_dept = caller_data.get("department_id")
            all_users = db_ref.child(f"users/{org_id}").get() or {}
            dept_uids = {u for u, ud in all_users.items() if isinstance(ud, dict) and ud.get("department_id") == my_dept}
            expenses = [e for e in expenses if e.get("uid") in dept_uids]
        status_filter = request.args.get("status")
        if status_filter:
            expenses = [e for e in expenses if e.get("status") == status_filter]
        # Normalise the key so a missing/null timestamp cannot break the sort.
        expenses.sort(key=lambda e: str(e.get("submittedAt") or ""), reverse=True)
        return jsonify(expenses), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def decide_expense(org_id, expense_id):
    """Body: { decision: approved|flagged|rejected, note? }

    Records an admin decision on an expense. On a first-time approval the
    amount is added to the department's spent total for the expense category,
    and the deciding admin is alerted once that category reaches 80% of its
    allocation. The submitting employee is notified in-app and, when a phone
    is on record, via WhatsApp.

    Returns 200 on success, 400 for an invalid decision, 404 for an unknown
    expense, 403 on PermissionError, 500 on any other failure.
    """
    try:
        admin_uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        decision = data.get("decision")
        if decision not in ("approved", "flagged", "rejected"):
            return jsonify({"error": "decision must be approved, flagged, or rejected."}), 400
        expense_ref = db_ref.child(f"expenses/{org_id}/{expense_id}")
        expense_data = expense_ref.get()
        if not expense_data:
            return jsonify({"error": "Expense not found."}), 404
        # Captured before the update so a repeated approval is detectable.
        already_approved = expense_data.get("status") == "approved"
        expense_ref.update({
            "status": decision,
            "decidedBy": admin_uid,
            "decidedAt": _now_iso(),
            "decision_note": data.get("note", ""),
        })
        # Update budget spent on approval — but only once per expense, otherwise
        # re-approving would count the same amount against the budget again.
        if decision == "approved" and not already_approved:
            dept_id = expense_data.get("department_id")
            cat = expense_data.get("category")
            try:
                amount = float(expense_data.get("amount") or 0)
            except (TypeError, ValueError):
                # Decision is already stored; don't fail on an unparsable amount.
                logger.warning(f"decide_expense | non-numeric amount on expense {expense_id}")
                amount = 0.0
            if dept_id and cat:
                budget_ref = db_ref.child(f"budgets/{org_id}/{dept_id}")
                budget_data = budget_ref.get() or {}
                cat_budget = budget_data.get(cat, {"allocated": 0, "spent": 0})
                if isinstance(cat_budget, dict):
                    cat_budget["spent"] = float(cat_budget.get("spent", 0)) + amount
                    # Write only this category so concurrent changes to other
                    # categories of the department are not overwritten.
                    budget_ref.update({cat: cat_budget})
                    # Threshold alert at 80%
                    allocated = float(cat_budget.get("allocated", 0))
                    if allocated > 0 and cat_budget["spent"] / allocated >= 0.8:
                        _push_notification(
                            org_id, admin_uid,
                            f"Budget Alert: {cat}",
                            f"Department {dept_id} has used {round(cat_budget['spent']/allocated*100)}% of {cat} budget.",
                            "warning",
                        )
        # Notify employee
        employee_uid = expense_data.get("uid")
        if employee_uid:
            employee_data = db_ref.child(f"users/{org_id}/{employee_uid}").get() or {}
            emoji_map = {"approved": "✅", "flagged": "⚠️", "rejected": "❌"}
            notif_msg = f"Your expense ({expense_data.get('currency', '')} {expense_data.get('amount', '?')} — {expense_data.get('vendor', '?')}) has been {decision}."
            _push_notification(org_id, employee_uid, f"{emoji_map.get(decision, '')} Expense {decision.title()}", notif_msg, "success" if decision == "approved" else "warning")
            if employee_data.get("phone"):
                _send_whatsapp(employee_data["phone"], f"{emoji_map.get(decision, '')} {notif_msg}")
        _write_audit(org_id, admin_uid, f"expense_{decision}", {"expense_id": expense_id})
        return jsonify({"success": True}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"decide_expense failed: {e}")
        return jsonify({"error": str(e)}), 500
| # ============================================================================= | |
| # 12. BUDGET MANAGEMENT ENDPOINTS | |
| # ============================================================================= | |
def set_department_budget(org_id, dept_id):
    """
    Store budget allocations for one department (admin only).
    Body: { budgets: { "meals": { "allocated": 500, "spent": 0 }, ... } }

    The supplied categories are merged into budgets/{orgId}/{deptId} via an
    update, and the change is written to the audit log.
    Returns 200 on success, 400 when `budgets` is missing or not an object,
    403 on PermissionError.
    """
    try:
        admin_uid, _role, _profile = verify_org_admin(request.headers.get("Authorization"), org_id)
        payload = request.get_json() or {}
        allocations = payload.get("budgets")
        if not (allocations and isinstance(allocations, dict)):
            return jsonify({"error": "budgets object required."}), 400
        budget_node = db_ref.child(f"budgets/{org_id}/{dept_id}")
        budget_node.update(allocations)
        audit_details = {"dept_id": dept_id, "categories": list(allocations)}
        _write_audit(org_id, admin_uid, "budget_set", audit_details)
        return jsonify({"success": True}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def get_department_budget(org_id, dept_id):
    """
    Return the stored budget map for a department (admin only).

    Responds with 200 and the budget object (empty when none is stored), or
    403 when verify_org_admin raises PermissionError.
    """
    try:
        verify_org_admin(request.headers.get("Authorization"), org_id)
        stored = db_ref.child(f"budgets/{org_id}/{dept_id}").get()
        return jsonify(stored or {}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
| # ============================================================================= | |
| # 13. NOTIFICATION ENDPOINTS | |
| # ============================================================================= | |
def get_notifications(org_id):
    """
    Return the caller's 50 most recent notifications, newest first.

    Responds with 200 and {"notifications": [...]}, or 403 when
    verify_org_member raises PermissionError.
    """
    try:
        caller_uid, _profile = verify_org_member(request.headers.get("Authorization"), org_id)
        inbox = db_ref.child(f"notifications/{org_id}/{caller_uid}").get() or {}
        ordered = list(inbox.values())
        ordered.sort(key=lambda entry: entry.get("createdAt", ""), reverse=True)
        latest = ordered[:50]
        return jsonify({"notifications": latest}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
def mark_notification_read(org_id, notif_id):
    """
    Mark one of the caller's notifications as read.

    Returns 200 on success, 404 when the notification does not exist for the
    caller, 403 when verify_org_member raises PermissionError.
    """
    try:
        uid, _ = verify_org_member(request.headers.get("Authorization"), org_id)
        notif_ref = db_ref.child(f"notifications/{org_id}/{uid}/{notif_id}")
        # update() on a missing node would create a stub {"read": True} entry
        # that later shows up in the notification list; check existence first.
        if not notif_ref.get():
            return jsonify({"error": "Notification not found."}), 404
        notif_ref.update({"read": True})
        return jsonify({"success": True}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def mark_all_notifications_read(org_id):
    """
    Flip every unread notification of the calling user to read using one
    update call, instead of the frontend issuing one request per notification.

    Returns 200 with the number of notifications marked, 403 on
    PermissionError, 500 on any other failure.
    """
    try:
        caller_uid, _profile = verify_org_member(request.headers.get("Authorization"), org_id)
        inbox = db_ref.child(f"notifications/{org_id}/{caller_uid}").get() or {}
        pending = {}
        for key, entry in inbox.items():
            if not isinstance(entry, dict):
                continue
            if entry.get("read", False):
                continue
            # Re-send the full entry with the read flag switched on.
            pending[key] = {**entry, "read": True}
        if pending:
            db_ref.child(f"notifications/{org_id}/{caller_uid}").update(pending)
        return jsonify({"success": True, "marked": len(pending)}), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"mark_all_notifications_read failed: {exc}")
        return jsonify({"error": str(exc)}), 500
def broadcast_notification(org_id):
    """
    Admin sends a broadcast to all org users or a department.
    Body: { title, message, target: all|department, department_id?, send_whatsapp: bool }

    Only users flagged `active` receive the broadcast. When `send_whatsapp`
    is truthy, recipients with a phone on record also get a WhatsApp message.
    Returns 200 with the recipient count, 400 on invalid input, 403 on
    PermissionError.
    """
    try:
        uid, _role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        data = request.get_json() or {}
        # Coerce to str so null / numeric JSON values cannot crash .strip().
        title = str(data.get("title") or "").strip()
        message = str(data.get("message") or "").strip()
        target = data.get("target", "all")
        dept_filter = data.get("department_id")
        via_wa = data.get("send_whatsapp", False)
        if not title or not message:
            return jsonify({"error": "title and message required."}), 400
        # Without this check a department broadcast that omitted department_id
        # skipped the filter entirely and went to the whole organization.
        if target == "department" and not dept_filter:
            return jsonify({"error": "department_id required when target is department."}), 400
        all_users = db_ref.child(f"users/{org_id}").get() or {}
        sent_to = 0
        for target_uid, target_data in all_users.items():
            if not isinstance(target_data, dict) or not target_data.get("active"):
                continue
            if target == "department" and target_data.get("department_id") != dept_filter:
                continue
            _push_notification(org_id, target_uid, title, message, data.get("type", "info"))
            if via_wa and target_data.get("phone"):
                _send_whatsapp(target_data["phone"], f"📢 *{title}*\n\n{message}")
            sent_to += 1
        _write_audit(org_id, uid, "broadcast_sent", {"title": title, "target": target, "sent_to": sent_to})
        return jsonify({"success": True, "sent_to": sent_to}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
| # ============================================================================= | |
| # 14. ANALYTICS ENGINE | |
| # ============================================================================= | |
def _compute_org_analytics(org_id: str) -> dict:
    """
    Build the org-wide metrics shown on the admin dashboard.

    Reads users, submissions, expenses and leave requests for the org and
    derives: active-user count, this-week submission compliance, alignment
    score average / distribution / per-department averages, drift flag count,
    expense and leave summaries, and the uids of active users with no
    submission under any candidate key of the current week.
    """
    users = db_ref.child(f"users/{org_id}").get() or {}
    submissions = db_ref.child(f"submissions/{org_id}").get() or {}
    expenses = db_ref.child(f"expenses/{org_id}").get() or {}
    leave_requests = db_ref.child(f"leave_requests/{org_id}").get() or {}
    current_week = _current_week_id()
    week_keys = set(_candidate_week_keys(current_week))

    def _bucket(value):
        # Upper bounds are inclusive: 40 -> "0-40", 60 -> "41-60", 80 -> "61-80".
        if value <= 40:
            return "0-40"
        if value <= 60:
            return "41-60"
        if value <= 80:
            return "61-80"
        return "81-100"

    active_uids = [
        member_uid for member_uid, profile in users.items()
        if isinstance(profile, dict) and profile.get("active")
    ]
    total_users = len(active_uids)

    submitted_this_week = set()
    all_scores = []
    drift_flags = 0
    score_distribution = {"0-40": 0, "41-60": 0, "61-80": 0, "81-100": 0}
    scores_by_dept = defaultdict(list)

    for member_uid, weekly in submissions.items():
        if not isinstance(weekly, dict):
            continue
        profile = users.get(member_uid, {})
        dept_key = (profile.get("department_id") if isinstance(profile, dict) else None) or "unassigned"
        for week_id, entry in weekly.items():
            if not isinstance(entry, dict):
                continue
            score = entry.get("alignment_score")
            if score is not None:
                all_scores.append(score)
                scores_by_dept[dept_key].append(score)
                score_distribution[_bucket(score)] += 1
            if entry.get("role_drift_detected"):
                drift_flags += 1
            if week_id in week_keys:
                submitted_this_week.add(member_uid)

    submissions_this_week = len(submitted_this_week)
    if total_users:
        compliance_rate = round(submissions_this_week / total_users * 100, 1)
    else:
        compliance_rate = 0
    if all_scores:
        avg_alignment = round(sum(all_scores) / len(all_scores), 1)
    else:
        avg_alignment = None
    dept_avg_alignment = {}
    for dept, dept_scores in scores_by_dept.items():
        if dept_scores:
            dept_avg_alignment[dept] = round(sum(dept_scores) / len(dept_scores), 1)

    # Expense summary
    expense_rows = [row for row in expenses.values() if isinstance(row, dict)]
    expenses_pending = len([row for row in expense_rows if row.get("status") == "pending"])
    approved_spend = sum(
        float(row.get("amount", 0)) for row in expense_rows if row.get("status") == "approved"
    )

    # Leave summary
    leave_rows = [row for row in leave_requests.values() if isinstance(row, dict)]
    leave_pending = len([row for row in leave_rows if row.get("status") == "pending"])
    leave_approved = len([row for row in leave_rows if row.get("status") == "approved"])

    # Active users with nothing submitted under any candidate key for this week
    missing_this_week = [
        member_uid for member_uid in active_uids if member_uid not in submitted_this_week
    ]

    return {
        "totalActiveUsers": total_users,
        "submissionsThisWeek": submissions_this_week,
        "complianceRate": compliance_rate,
        "missingSubmissionsUids": missing_this_week,
        "averageAlignmentScore": avg_alignment,
        "scoreDistribution": score_distribution,
        "driftFlagsTotal": drift_flags,
        "deptAlignmentAverages": dept_avg_alignment,
        "expensesPendingCount": expenses_pending,
        "totalApprovedSpend": round(approved_spend, 2),
        "leavePendingCount": leave_pending,
        "leaveApprovedCount": leave_approved,
        "weekId": current_week,
    }
def _compute_employee_analytics(org_id: str, user_uid: str) -> dict:
    """
    Build the personal metrics shown on an employee's dashboard.

    Walks the user's submissions in week-id order and returns the score
    trend, average score, improvement velocity (least-squares slope of score
    against submission index, 0.0 with fewer than two scored submissions),
    drift flag count, the last five success events, leave balance, the
    user's leave history, and the number of scored submissions.
    """
    submissions = db_ref.child(f"submissions/{org_id}/{user_uid}").get() or {}
    trend = []
    drift_total = 0
    wins = []
    for week_id in sorted(submissions):
        entry = submissions[week_id]
        if not isinstance(entry, dict):
            continue
        score = entry.get("alignment_score")
        if score is not None:
            trend.append({"weekId": week_id, "score": score, "submittedAt": entry.get("submittedAt")})
        if entry.get("role_drift_detected"):
            drift_total += 1
        wins.extend({"weekId": week_id, "event": item} for item in entry.get("success_events", []))

    values = [point["score"] for point in trend]
    count = len(values)
    average = round(sum(values) / count, 1) if values else None

    velocity = 0.0
    if count >= 2:
        # Ordinary least-squares slope with x = 1..count.
        positions = list(range(1, count + 1))
        mean_x = sum(positions) / count
        mean_y = sum(values) / count
        numerator = sum((x - mean_x) * (y - mean_y) for x, y in zip(positions, values))
        denominator = sum((x - mean_x) ** 2 for x in positions)
        if denominator:
            velocity = round(numerator / denominator, 2)
        else:
            velocity = 0.0

    profile = db_ref.child(f"users/{org_id}/{user_uid}").get() or {}
    org_leave = db_ref.child(f"leave_requests/{org_id}").get() or {}
    own_leave = []
    for record in org_leave.values():
        if isinstance(record, dict) and record.get("uid") == user_uid:
            own_leave.append(record)

    return {
        "scoreTrend": trend,
        "averageScore": average,
        "improvementVelocity": velocity,
        "driftFlagsTotal": drift_total,
        "recentSuccessEvents": wins[-5:],
        "leaveBalance": profile.get("leave_balance", {}),
        "leaveHistory": own_leave,
        "totalSubmissions": count,
    }
| # ============================================================================= | |
| # 15. DASHBOARD ENDPOINTS | |
| # ============================================================================= | |
def admin_dashboard(org_id):
    """
    Admin dashboard payload: org record, org analytics, department list and
    this week's submissions.

    A user's submission for the current week is looked up under each
    candidate week key in order; the first dict found is used, and it is
    reported under the current week id.
    Returns 200 with the payload, 403 on PermissionError, 500 otherwise.
    """
    try:
        verify_org_admin(request.headers.get("Authorization"), org_id)
        analytics = _compute_org_analytics(org_id)
        org = get_org(org_id)
        departments = db_ref.child(f"departments/{org_id}").get() or {}
        members = db_ref.child(f"users/{org_id}").get() or {}
        # Recent submissions — match canonical and legacy week keys
        current_week = _current_week_id()
        candidate_keys = _candidate_week_keys(current_week)
        submissions = db_ref.child(f"submissions/{org_id}").get() or {}
        recent_activity = []
        for member_uid, weekly in submissions.items():
            if not isinstance(weekly, dict):
                continue
            this_week = next(
                (weekly[key] for key in candidate_keys if isinstance(weekly.get(key), dict)),
                None,
            )
            if not this_week:
                continue
            member = members.get(member_uid, {})
            drift = this_week.get("role_drift_detected")
            recent_activity.append({
                "submissionId": this_week.get("submissionId"),
                "uid": member_uid,
                "displayName": member.get("displayName", member_uid),
                "department": member.get("department_id"),
                "weekId": current_week,
                "alignment_score": this_week.get("alignment_score"),
                "role_drift_detected": drift,
                "role_drift": drift,
                "rawText": this_week.get("rawText"),
                "summary": this_week.get("summary"),
                "submittedAt": this_week.get("submittedAt"),
            })
        response_body = {
            "org": org,
            "analytics": analytics,
            "departments": list(departments.values()),
            "recentActivity": recent_activity,
        }
        return jsonify(response_body), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
    except Exception as exc:
        logger.error(f"admin_dashboard failed: {exc}\n{traceback.format_exc()}")
        return jsonify({"error": "Dashboard compute failed."}), 500
def employee_dashboard(org_id):
    """
    Employee dashboard payload: the caller's profile, personal analytics,
    tasks assigned to them that are open or in progress, and their unread
    notification count.

    Returns 200 with the payload, or 403 when verify_org_member raises
    PermissionError.
    """
    try:
        caller_uid, profile = verify_org_member(request.headers.get("Authorization"), org_id)
        analytics = _compute_employee_analytics(org_id, caller_uid)
        # Tasks
        org_tasks = db_ref.child(f"tasks/{org_id}").get() or {}
        open_tasks = []
        for task in org_tasks.values():
            if not isinstance(task, dict) or task.get("assignedTo") != caller_uid:
                continue
            if task.get("status") in ("open", "in_progress"):
                open_tasks.append(task)
        # Notifications
        inbox = db_ref.child(f"notifications/{org_id}/{caller_uid}").get() or {}
        unread = len([
            entry for entry in inbox.values()
            if isinstance(entry, dict) and not entry.get("read")
        ])
        payload = {
            "user": profile,
            "analytics": analytics,
            "openTasks": open_tasks,
            "unreadNotifications": unread,
        }
        return jsonify(payload), 200
    except PermissionError as exc:
        return jsonify({"error": str(exc)}), 403
| # ============================================================================= | |
| # 16. WHATSAPP / TWILIO RELAY ENDPOINTS | |
| # ============================================================================= | |
| # | |
| # All WhatsApp interactions are routed through /api/whatsapp/inbound. | |
| # Twilio sends a POST to this endpoint on every incoming message. | |
| # The server maintains a per-user conversation state in Firebase to handle | |
| # multi-turn interactions (e.g. confirming a leave request). | |
| # | |
| # WEBHOOK_SECRET is used to authenticate the weekly prompt scheduler call. | |
| # | |
| # SMART PROMPT LOGIC: | |
| # The weekly prompt scheduler checks each employee's submission history. | |
| # If Gemini confirms a meaningful submission already exists this week, the | |
| # employee is NOT prompted again. Only those with no qualifying submission | |
| # receive the Friday reminder. | |
| # | |
| # ============================================================================= | |
def whatsapp_inbound():
    """
    Receives incoming WhatsApp messages from Twilio.

    Identifies the org and user by the sender's phone number, transcribes an
    attached voice note, asks Gemini to classify the message and routes it to
    the matching handler (weekly update, leave request, expense, meeting
    recording, general query or greeting).

    A captionless image is treated as an expense receipt, because there is no
    text for the intent classifier to work with.

    Returns:
        A Flask response tuple. A request without a "From" field gets a JSON
        400 error; every other outcome (including internal failures) returns
        HTTP 200 with a TwiML body so Twilio does not retry.

    NOTE(review): nothing in this function validates Twilio's
    X-Twilio-Signature header — confirm that requests are authenticated
    somewhere upstream.
    """
    xml_headers = {"Content-Type": "text/xml"}
    empty_twiml = "<?xml version='1.0' encoding='UTF-8'?><Response></Response>"
    try:
        from_number = request.form.get("From", "").replace("whatsapp:", "").strip()
        body = request.form.get("Body", "").strip()
        media_url = request.form.get("MediaUrl0", "")
        media_type = request.form.get("MediaContentType0", "") or ""
        if not from_number:
            return jsonify({"error": "No sender."}), 400

        # Find user by phone number across all orgs
        org_id, user_uid, user_data = _wa_find_user_by_phone(from_number)
        if not org_id or not user_uid:
            logger.warning(f"WhatsApp inbound from unregistered number: {from_number}")
            return ("<?xml version='1.0' encoding='UTF-8'?>"
                    "<Response><Message>Sorry, your number is not registered on IrisTrack. "
                    "Please contact your HR admin.</Message></Response>"), 200, xml_headers
        wa_state = user_data.get("whatsapp_state", "idle")

        has_audio = bool(media_url) and "audio" in media_type
        has_image = bool(media_url) and "image" in media_type

        # If audio message — transcribe via AssemblyAI first
        if has_audio:
            try:
                # Download audio from Twilio (requires auth)
                audio_resp = requests.get(
                    media_url,
                    auth=(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN),
                    timeout=30,
                )
                audio_resp.raise_for_status()
                body = transcribe_audio_file(audio_resp.content, "voice_note.ogg")
                logger.info(f"Transcribed WhatsApp audio for {user_uid}: {body[:100]}")
            except Exception as e:
                logger.error(f"WhatsApp audio transcription failed for {user_uid}: {e}")
                _send_whatsapp(from_number, "Sorry, I couldn't process your voice note. Please try again or type your message.")
                return empty_twiml, 200, xml_headers

        # Nothing to act on: no text, no transcript and no receipt image.
        if not body and not has_image:
            return empty_twiml, 200, xml_headers

        org = get_org(org_id)
        org_name = org.get("name", org_id) if org else org_id

        # Route by intent
        if body:
            intent_result = gemini_process_general_whatsapp_message(
                body,
                user_data.get("displayName", "Employee"),
                org_name,
                wa_state,
            )
            if not isinstance(intent_result, dict):
                intent_result = {}
            intent = intent_result.get("intent", "other")
        else:
            # Image without caption: there is no text to classify, so handle
            # it as a receipt upload.
            intent = "expense_submission"

        reply_to_user = None  # We generally do NOT reply immediately to the employee
        if intent == "weekly_update":
            # No reply to employee — admins are notified internally
            _process_submission_text(org_id, user_uid, body)
        elif intent == "leave_request":
            _process_leave_request_text(org_id, user_uid, body)
            reply_to_user = "Your leave request has been submitted and is pending approval. You'll be notified once a decision is made."
        elif intent == "expense_submission":
            receipt_url = media_url if has_image else None
            expense_text = body or "Expense via WhatsApp receipt upload"
            _process_expense_text(org_id, user_uid, expense_text, receipt_url)
            reply_to_user = "Expense submitted for approval. You'll be notified once it's reviewed."
        elif intent == "meeting_update":
            # Employee is confirming a meeting outcome or uploading recording.
            # For audio messages `body` already holds the transcript produced
            # above, so the recording is not downloaded or transcribed again.
            if has_audio:
                _wa_process_meeting_recording(org_id, org_name, user_uid, body)
        elif intent == "general_query":
            # Surface leave balance, task status etc
            leave_balance = user_data.get("leave_balance") or {}
            balance_text = ""
            if isinstance(leave_balance, dict):
                balance_text = ", ".join(f"{k}: {v} days" for k, v in leave_balance.items())
            balance_text = balance_text or "not available"
            reply_to_user = f"Here's a quick summary for you:\n\nLeave balance: {balance_text}\n\nFor more details, log in to the IrisTrack portal."
        elif intent == "greeting":
            reply_to_user = f"Hi {user_data.get('displayName', 'there')}! 👋 You can send me your weekly update, request leave, submit an expense, or ask about your balances."

        if reply_to_user:
            _send_whatsapp(from_number, reply_to_user)
        return empty_twiml, 200, xml_headers
    except Exception as e:
        logger.error(f"whatsapp_inbound failed: {e}\n{traceback.format_exc()}")
        return empty_twiml, 200, xml_headers


def _wa_find_user_by_phone(from_number):
    """Scan every org's users for a phone equal to *from_number* (ignoring "+" and spaces).

    Returns (org_id, user_uid, user_data) for the first match, or
    (None, None, {}) when no user matches.
    """
    def _normalize(raw):
        # str(... or "") tolerates a missing / null phone field.
        return str(raw or "").replace("+", "").replace(" ", "")

    wanted = _normalize(from_number)
    if not wanted:
        # An empty sender must never match users whose phone is blank.
        return None, None, {}
    platform_orgs = db_ref.child("platform_orgs").get() or {}
    for oid in platform_orgs:
        org_users = db_ref.child(f"users/{oid}").get() or {}
        for u_uid, u_data in org_users.items():
            if isinstance(u_data, dict) and _normalize(u_data.get("phone")) == wanted:
                return oid, u_uid, u_data
    return None, None, {}


def _wa_process_meeting_recording(org_id, org_name, user_uid, transcript):
    """Attach an already-transcribed recording to the sender's open meeting and notify its attendees."""
    meetings = db_ref.child(f"client_meetings/{org_id}").get() or {}
    open_meeting = None
    # NOTE(review): this takes the first open meeting in iteration order, not
    # necessarily the most recent one.
    for mid, m in meetings.items():
        if (
            isinstance(m, dict)
            and user_uid in (m.get("attendeeUids") or [])
            and m.get("status") in ("scheduled", "in_progress")
        ):
            open_meeting = (mid, m)
            break
    if not open_meeting:
        return

    meeting_id, meeting_data = open_meeting
    meeting_path = f"client_meetings/{org_id}/{meeting_id}"
    attendee_uids = meeting_data.get("attendeeUids") or []
    title = meeting_data.get("title", "")
    db_ref.child(meeting_path).update({"status": "in_progress"})
    # Processing is best-effort: a failure is logged and the meeting stays
    # "in_progress", so a later recording can still match it.
    try:
        attendee_names = []
        for a_uid in attendee_uids:
            a = db_ref.child(f"users/{org_id}/{a_uid}").get() or {}
            if a.get("displayName"):
                attendee_names.append(a["displayName"])
        intelligence = gemini_process_meeting_recording(transcript, title, attendee_names, org_name)
        db_ref.child(meeting_path).update({
            "status": "completed", "completedAt": _now_iso(),
            "transcript": transcript, "intelligence": intelligence,
        })
        summary = ""
        if isinstance(intelligence, dict):
            summary = (intelligence.get("executive_summary") or "")[:120]
        # Notify chain
        for a_uid in attendee_uids:
            _push_notification(org_id, a_uid, f"Meeting Summary Ready: {title}", summary, "success", {"meeting_id": meeting_id})
    except Exception as e:
        logger.error(f"WhatsApp meeting recording processing failed: {e}")
def send_weekly_prompts():
    """
    Triggered by an external cron scheduler on the org's configured reporting day.

    For every active org (or only the org named by "org_id" in the optional
    JSON body) each active, non-admin user with a phone number is checked for
    a submission record in the current week. When that record carries
    "rawText", Gemini decides whether it is a meaningful update; users without
    a qualifying submission receive the WhatsApp prompt.

    Secured by WEBHOOK_SECRET ("Authorization: Bearer <secret>").

    Returns:
        JSON {"success", "prompts_sent", "skipped", "failed"} with HTTP 200,
        or a JSON error with HTTP 401 when the secret is missing or wrong.
    """
    import hmac  # function-scope import: only this endpoint needs it

    auth_header = request.headers.get("Authorization", "")
    expected = f"Bearer {WEBHOOK_SECRET}" if WEBHOOK_SECRET else ""
    # Constant-time comparison so the secret cannot be probed via timing.
    if not expected or not hmac.compare_digest(
        auth_header.encode("utf-8"), expected.encode("utf-8")
    ):
        return jsonify({"error": "Unauthorized."}), 401

    # silent=True: the scheduler may call without a JSON body / content type.
    data = request.get_json(silent=True) or {}
    org_id = data.get("org_id") if isinstance(data, dict) else None
    if org_id:
        orgs_to_process = [org_id]
    else:
        orgs_to_process = list((db_ref.child("platform_orgs").get() or {}).keys())

    current_week = _current_week_id()
    total_sent = 0
    total_skipped = 0
    total_failed = 0
    for oid in orgs_to_process:
        org = get_org(oid)
        if not org or not org.get("active"):
            continue
        org_name = org.get("name", oid)
        org_goals = org.get("org_goals", "")
        all_users = db_ref.child(f"users/{oid}").get() or {}
        all_subs = db_ref.child(f"submissions/{oid}").get() or {}
        # KPI categories for this org (combined from all models)
        all_models = db_ref.child(f"kpi_models/{oid}").get() or {}
        all_kpi_cats = list({
            cat["name"]
            for model in all_models.values()
            if isinstance(model, dict)
            for cat in (model.get("kpi_categories") or [])
            if isinstance(cat, dict) and cat.get("name")
        })
        for user_uid, user_data in all_users.items():
            if not isinstance(user_data, dict) or not user_data.get("active"):
                continue
            if user_data.get("platform_role") in ("org_admin",):
                continue
            phone = user_data.get("phone", "")
            if not phone:
                continue

            # Check if employee already submitted meaningfully this week
            user_subs = all_subs.get(user_uid) or {}
            week_sub = user_subs.get(current_week) if isinstance(user_subs, dict) else None
            already_submitted = False
            if week_sub and isinstance(week_sub, dict):
                raw_text = week_sub.get("rawText", "")
                if raw_text:
                    try:
                        already_submitted = bool(
                            gemini_check_submission_alignment_with_goals(
                                raw_text, org_goals, all_kpi_cats
                            )
                        )
                    except Exception as e:
                        # If the evaluator is unavailable, trust the existing
                        # record rather than nagging someone who did submit.
                        logger.error(f"CRON | Alignment check failed for {user_uid}: {e}")
                        already_submitted = True
                else:
                    # Record exists but has no text to evaluate.
                    already_submitted = True
            if already_submitted:
                logger.info(f"CRON | Skipping prompt for {user_uid} — already submitted this week.")
                total_skipped += 1
                continue

            # Send prompt
            display_name = user_data.get("displayName", "there")
            try:
                _send_whatsapp(
                    phone,
                    f"Hi {display_name}! 👋\n\n"
                    f"It's your weekly check-in for *{org_name}*.\n\n"
                    f"What did you work on this week? You can reply with a voice note or text — "
                    f"no specific format needed, just tell me what you got done."
                )
            except Exception as e:
                # One undeliverable number must not abort the whole run.
                logger.error(f"CRON | Failed to send weekly prompt to {user_uid}: {e}")
                total_failed += 1
                continue
            total_sent += 1
            logger.info(f"CRON | Sent weekly prompt to {user_uid} ({phone})")
    return jsonify({
        "success": True,
        "prompts_sent": total_sent,
        "skipped": total_skipped,
        "failed": total_failed,
    }), 200
| # ============================================================================= | |
| # 17. SUPERADMIN ENDPOINTS | |
| # ============================================================================= | |
def superadmin_list_orgs():
    """
    List every org in the platform index, merged with its organization record.

    Each item combines the platform_orgs/{orgId} index entry with the
    organizations/{orgId} record (the latter wins on key clashes) and adds
    "user_count", the number of active users in that org.

    Returns:
        (JSON list, 200), or (JSON error, 403) when verify_superadmin raises
        PermissionError.
    """
    try:
        verify_superadmin(request.headers.get("Authorization"))
        platform_orgs = db_ref.child("platform_orgs").get() or {}
        result = []
        for org_id, org_index in platform_orgs.items():
            org_data = db_ref.child(f"organizations/{org_id}").get() or {}
            all_users = db_ref.child(f"users/{org_id}").get() or {}
            summary = {}
            # An index entry may be a bare flag (e.g. True) instead of a dict;
            # unpacking that with ** would raise TypeError.
            if isinstance(org_index, dict):
                summary.update(org_index)
            if isinstance(org_data, dict):
                summary.update(org_data)
            user_records = all_users.values() if isinstance(all_users, dict) else []
            summary["user_count"] = sum(
                1 for u in user_records if isinstance(u, dict) and u.get("active")
            )
            result.append(summary)
        return jsonify(result), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
def superadmin_org_analytics(org_id):
    """
    Return the computed analytics for one organization (SuperAdmin only).

    Responds with the output of _compute_org_analytics(org_id) and HTTP 200,
    or a JSON error with HTTP 403 when a PermissionError is raised.
    """
    try:
        bearer = request.headers.get("Authorization")
        verify_superadmin(bearer)
        return jsonify(_compute_org_analytics(org_id)), 200
    except PermissionError as denied:
        message = str(denied)
        return jsonify({"error": message}), 403
def superadmin_deactivate_org(org_id):
    """
    Mark an organization inactive in both its record and the platform index.

    Returns:
        ({"success": True}, 200) on success, a JSON error with 404 when no
        organizations/{org_id} record exists, or a JSON error with 403 when
        verify_superadmin raises PermissionError.
    """
    try:
        verify_superadmin(request.headers.get("Authorization"))
        org_path = f"organizations/{org_id}"
        # update() on a missing path would create a phantom org record and
        # index entry, so refuse unknown ids up front.
        if not db_ref.child(org_path).get():
            return jsonify({"error": "Organization not found."}), 404
        db_ref.child(org_path).update({"active": False, "deactivatedAt": _now_iso()})
        db_ref.child(f"platform_orgs/{org_id}").update({"active": False})
        logger.warning(f"SuperAdmin deactivated org: {org_id}")
        return jsonify({"success": True}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
| # ============================================================================= | |
| # 18. AUDIT LOG | |
| # ============================================================================= | |
def get_audit_log(org_id):
    """
    Return the org's audit log entries, newest first.

    Query parameters:
        limit: maximum number of entries to return. Defaults to 100, is
            clamped to the range 0-500, and falls back to 100 when it is
            not an integer.

    Returns:
        ({"entries": [...], "total": <count of valid entries>}, 200), or a
        JSON error with 403 when the caller is not an org_admin or
        verify_org_admin raises PermissionError.
    """
    try:
        _uid, role, _ = verify_org_admin(request.headers.get("Authorization"), org_id)
        if role != "org_admin":
            return jsonify({"error": "Only org_admin can access audit log."}), 403

        try:
            limit = int(request.args.get("limit", 100))
        except (TypeError, ValueError):
            limit = 100
        # A negative limit would slice from the end and return almost
        # everything, so clamp at both ends.
        limit = max(0, min(limit, 500))

        log_entries = db_ref.child(f"audit_log/{org_id}").get() or {}
        raw_entries = log_entries.values() if isinstance(log_entries, dict) else log_entries
        # Skip malformed (non-dict) nodes and tolerate null timestamps so one
        # bad record cannot break the sort.
        entries = sorted(
            (entry for entry in raw_entries if isinstance(entry, dict)),
            key=lambda entry: str(entry.get("timestamp") or ""),
            reverse=True,
        )
        return jsonify({"entries": entries[:limit], "total": len(entries)}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
| # ============================================================================= | |
| # 19. DEBUG | |
| # ============================================================================= | |
def health_check():
    """Report that the service is up, plus the Gemini model name and whether the AssemblyAI and Twilio credentials are set."""

    def _presence(credential):
        # Only presence is reported; the credential value itself is never returned.
        return "configured" if credential else "missing"

    payload = {
        "status": "ok",
        "service": "IrisTrack",
        "version": "1.0.0",
        "timestamp": _now_iso(),
        "gemini": MODEL_NAME,
        "assemblyai": _presence(ASSEMBLYAI_API_KEY),
        "twilio": _presence(TWILIO_ACCOUNT_SID),
    }
    return jsonify(payload), 200
def debug_transcribe():
    """
    Test AssemblyAI with an uploaded audio file.

    Expects a multipart upload under the "audio_file" field and returns
    {"transcript": ...} with 200, a 400 error when the file is absent, or a
    500 error carrying the exception text on any failure.

    NOTE(review): no authentication check is visible in this function —
    confirm the route is protected or disabled in production.
    """
    try:
        upload = request.files.get("audio_file")
        if upload:
            audio_bytes = upload.read()
            # Fall back to a generic name when the client sent none.
            file_name = upload.filename or "test.ogg"
            text = transcribe_audio_file(audio_bytes, file_name)
            return jsonify({"transcript": text}), 200
        return jsonify({"error": "audio_file required."}), 400
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
| # ============================================================================= | |
| # 20. MAIN | |
| # ============================================================================= | |
if __name__ == "__main__":
    # Listen on the port given by the PORT environment variable, or 7860 when
    # it is unset, on all interfaces with the debugger disabled.
    port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=port, debug=False)