import os import io import re import json import uuid import time import math # Added for Location Math import traceback from datetime import datetime, timedelta from flask import Flask, request, jsonify, Response from flask_cors import CORS import firebase_admin from firebase_admin import credentials, db, storage, auth from PIL import Image import requests # Google GenAI (Gemini) from google import genai from google.genai import types import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # 1. CONFIGURATION & INITIALIZATION # ----------------------------------------------------------------------------- app = Flask(__name__) CORS(app) # --- HARDCODED ADMINS (MVP STRATEGY) --- HARDCODED_ADMIN_EMAILS = [ "rairorr@gmail.com", "carolrue7@gmail.com" ] # --- Firebase Initialization --- try: credentials_json_string = os.environ.get("FIREBASE") if not credentials_json_string: raise ValueError("The FIREBASE environment variable is not set.") credentials_json = json.loads(credentials_json_string) firebase_db_url = os.environ.get("Firebase_DB") firebase_storage_bucket = os.environ.get("Firebase_Storage") if not firebase_db_url or not firebase_storage_bucket: raise ValueError("Firebase_DB and Firebase_Storage environment variables must be set.") cred = credentials.Certificate(credentials_json) firebase_admin.initialize_app(cred, { "databaseURL": firebase_db_url, "storageBucket": firebase_storage_bucket }) logger.info("Firebase Admin SDK initialized successfully.") except Exception as e: logger.error(f"FATAL: Error initializing Firebase: {e}") raise bucket = storage.bucket() db_ref = db.reference() # --- Google GenAI Client Initialization --- try: api_key = os.environ.get("Gemini") if not api_key: raise ValueError("The 'Gemini' environment variable is not set.") client = genai.Client(api_key=api_key) logger.info("Google GenAI Client initialized successfully.") 
# --- Model Constants ---
VISION_MODEL = "gemini-2.5-flash"  # Vision + text
TEXT_MODEL = "gemini-2.5-flash"  # text-only tasks

# -----------------------------------------------------------------------------
# 2. HELPER FUNCTIONS
# -----------------------------------------------------------------------------


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a trailing "Z"."""
    # datetime.utcnow() is deprecated since Python 3.12. Build the same naive
    # UTC value from an aware "now" so the emitted format does not change.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def verify_token(auth_header):
    """
    Verify the Firebase ID token carried in an Authorization header.

    Returns the token's "uid" claim, or None when the header is missing, is
    not a Bearer header, carries an empty token, or fails verification.
    """
    prefix = "Bearer "
    if not auth_header or not auth_header.startswith(prefix):
        return None

    # Slice instead of split(): splitting on "Bearer " would truncate a token
    # that happens to contain that marker again.
    token = auth_header[len(prefix):].strip()
    if not token:
        return None

    try:
        decoded = auth.verify_id_token(token)
        return decoded.get("uid")
    except Exception as e:
        logger.warning(f"Token verification failed: {e}")
        return None


def verify_admin(auth_header):
    """
    Verify the caller's token and require the `is_admin` flag on their profile.

    Returns the uid. Raises PermissionError when the token is invalid or the
    profile stored at /users/{uid} is not flagged as admin.
    """
    uid = verify_token(auth_header)
    if not uid:
        raise PermissionError("Invalid or missing user token")

    profile = db_ref.child(f"users/{uid}").get() or {}
    if not profile.get("is_admin", False):
        raise PermissionError("Admin access required")
    return uid


def upload_to_storage(data_bytes, destination_blob_name, content_type):
    """Upload bytes to Firebase Storage and return the blob's public URL."""
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_string(data_bytes, content_type=content_type)
    # NOTE(review): every upload is made world-readable, including the
    # verification ID documents uploaded by taskers - confirm this is intended.
    blob.make_public()
    return blob.public_url


def safe_float(x, default=None):
    """
    Convert `x` to a finite float.

    Returns `default` for None, the empty string, values float() rejects,
    and non-finite results (NaN / +-inf), which cannot be serialised as
    valid JSON and break distance maths downstream.
    """
    if x is None or x == "":
        return default
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return default
    return value if math.isfinite(value) else default


def safe_int(x, default=None):
    """Convert `x` with int(); return `default` when the conversion fails."""
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        return default


def normalize_text(s: str) -> str:
    """Collapse whitespace runs to one space, trim, and lowercase; falsy input gives ""."""
    return re.sub(r"\s+", " ", str(s or "")).strip().lower()


def haversine_distance(lat1, lon1, lat2, lon2):
    """
    Calculate the great circle distance in kilometers between two points
    on the earth (specified in decimal degrees).

    Returns None when any coordinate is None, cannot be converted to a
    float, or is not a finite number.
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    try:
        # Convert decimal degrees to radians
        phi1, lam1, phi2, lam2 = (
            math.radians(float(v)) for v in (lat1, lon1, lat2, lon2)
        )
        # Haversine formula
        a = (
            math.sin((phi2 - phi1) / 2) ** 2
            + math.cos(phi1) * math.cos(phi2) * math.sin((lam2 - lam1) / 2) ** 2
        )
    except (TypeError, ValueError, OverflowError):
        # float() rejected the value, or math.sin/cos got an infinity.
        return None

    if not math.isfinite(a):  # NaN input propagates silently through sin/cos
        return None

    # Rounding can push `a` a hair outside [0, 1] for (near-)antipodal points,
    # which would make asin() raise a domain error.
    a = min(1.0, max(0.0, a))
    r = 6371  # Radius of earth in kilometers
    return r * 2 * math.asin(math.sqrt(a))


def send_text_request(model_name, prompt, image=None):
    """
    Send `prompt` (plus `image`, when given) to a fresh chat on `model_name`.

    Returns the concatenated text parts of the first candidate, stripped,
    or None when there is no text or the request fails (the error is logged).
    """
    try:
        chat = client.chats.create(model=model_name)
        contents = [prompt] if image is None else [prompt, image]
        resp = chat.send_message(contents)

        text_out = "".join(
            part.text
            for part in resp.candidates[0].content.parts
            if getattr(part, "text", None)
        )
        return text_out.strip() if text_out else None
    except Exception as e:
        logger.error(f"Error with model {model_name}: {e}")
        return None


def extract_json_from_text(text: str):
    """
    Robust-ish JSON extraction:
    - Try direct JSON parse
    - Else decode the JSON value that starts at the first "{" and ignore
      whatever follows it

    Returns the parsed value, or None when nothing could be parsed.
    """
    if not text:
        return None

    text = text.strip()

    # direct
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        pass

    # A greedy "{.*}" match spans from the first "{" to the LAST "}", so any
    # brace in trailing prose breaks it. raw_decode() stops at the end of the
    # first complete object instead.
    start = text.find("{")
    if start == -1:
        return None
    try:
        obj, _end = json.JSONDecoder().raw_decode(text, start)
    except (ValueError, RecursionError):
        return None
    return obj
def require_role(uid: str, allowed_roles: list[str]) -> dict:
    """
    Load /users/{uid} and make sure the caller may act in one of
    `allowed_roles`.

    Profiles flagged `is_admin` pass unconditionally. Returns the profile
    dict on success; raises PermissionError with an explanatory message when
    the profile is missing or the role is not allowed.
    """
    profile = db_ref.child(f"users/{uid}").get()
    if not profile:
        raise PermissionError(
            f"User profile missing in RTDB at /users/{uid}. "
            f"Call /api/auth/social-signin (or /api/auth/signup) once after login to bootstrap the profile."
        )

    # Admins skip the role comparison entirely.
    if profile.get("is_admin"):
        return profile

    role = (profile.get("role") or "").lower().strip()
    if role in allowed_roles:
        return profile
    raise PermissionError(f"Role '{role}' not allowed. Allowed roles: {allowed_roles}")


def get_or_create_profile(uid: str) -> dict:
    """
    Return the RTDB profile at /users/{uid}, creating it on first use.

    Users whose Firebase Auth email is listed in HARDCODED_ADMIN_EMAILS are
    forced to role "admin". An existing profile is patched in place when its
    admin flags, onboarding flags or display name are out of date.
    """
    profile_ref = db_ref.child(f"users/{uid}")
    existing = profile_ref.get()

    fb_user = auth.get_user(uid)
    email = (fb_user.email or "").lower()
    is_hardcoded_admin = any(email == admin.lower() for admin in HARDCODED_ADMIN_EMAILS)

    if not existing:
        # First visit: an empty role sends non-admins through onboarding.
        created = {
            "email": email,
            "displayName": fb_user.display_name or "",
            "phone_number": "",
            "city": "",
            "role": "admin" if is_hardcoded_admin else "",
            "is_admin": is_hardcoded_admin,
            "onboardingComplete": is_hardcoded_admin,
            "roleSetAt": now_iso() if is_hardcoded_admin else None,
            "verificationStatus": "unverified",  # unverified | pending | verified | rejected
            "createdAt": now_iso(),
        }
        profile_ref.set(created)
        return created

    changes = {}

    # Listed admins must carry both the flag and the role.
    admin_out_of_date = not existing.get("is_admin") or existing.get("role") != "admin"
    if is_hardcoded_admin and admin_out_of_date:
        changes.update({
            "is_admin": True,
            "role": "admin",
            "onboardingComplete": True,
            "roleSetAt": existing.get("roleSetAt") or now_iso(),
        })

    # A profile that already has a role counts as onboarded.
    if existing.get("role") and not existing.get("onboardingComplete"):
        changes["onboardingComplete"] = True
        if not existing.get("roleSetAt"):
            changes["roleSetAt"] = existing.get("createdAt") or now_iso()

    # Backfill the display name from the auth record (social sign-in).
    if fb_user.display_name and not existing.get("displayName"):
        changes["displayName"] = fb_user.display_name

    if not changes:
        return existing

    profile_ref.update(changes)
    return profile_ref.get()


def push_notification(to_uid: str, notif_type: str, title: str, body: str, meta: dict | None = None):
    """
    Store an in-app notification at /notifications/{to_uid}/{notifId}
    and return the stored payload.
    """
    notif_id = str(uuid.uuid4())
    record = dict(
        notifId=notif_id,
        type=notif_type,
        title=title,
        body=body,
        meta=meta or {},
        createdAt=now_iso(),
        read=False,
    )
    db_ref.child(f"notifications/{to_uid}/{notif_id}").set(record)
    return record


def task_access_check(uid: str, task: dict, user_role: str):
    """
    Decide whether `uid`, acting as `user_role`, may view `task`.

    - admin: always
    - customer: only tasks they created
    - tasker: open/bidding tasks created by someone else, tasks assigned to
      them, and tasks they have placed a bid on
    - any other role: never
    """
    if user_role == "admin":
        return True

    creator = task.get("createdBy")
    if user_role == "customer":
        return creator == uid
    if user_role != "tasker":
        return False

    if task.get("status") in ["open", "bidding"] and creator != uid:
        return True
    if task.get("assignedTaskerId") == uid:
        return True

    # Last resort: has this tasker bid on the task?
    task_bids = db_ref.child(f"bids/{task.get('taskId')}").get() or {}
    return any(entry.get("taskerId") == uid for entry in task_bids.values())


# -----------------------------------------------------------------------------
# 3. BASIC HEALTH
# -----------------------------------------------------------------------------

@app.route("/api/health", methods=["GET"])
def health():
    """Liveness probe: report the service name and the current server time."""
    status = {"ok": True, "service": "oneplus-server", "time": now_iso()}
    return jsonify(status), 200
# -----------------------------------------------------------------------------
# 4. AUTH & USER PROFILES
# -----------------------------------------------------------------------------

@app.route("/api/auth/signup", methods=["POST"])
def signup():
    """
    Email + password signup.
    Creates Firebase Auth user + RTDB user profile.

    Responds 201 with the new profile, 400 on invalid input or any other
    failure, and 409 when the email is already registered.
    """
    try:
        data = request.get_json() or {}
        email = data.get("email")
        password = data.get("password")
        display_name = data.get("displayName")
        phone = data.get("phone")
        city = data.get("city")
        # str() keeps a non-string JSON value (e.g. a number) from raising
        # AttributeError; it is simply rejected as an invalid role below.
        role = str(data.get("role") or "customer").lower().strip()  # customer|tasker|admin(not allowed here)

        if role not in ["customer", "tasker"]:
            return jsonify({"error": "Invalid role. Use customer or tasker."}), 400

        if not email or not password:
            return jsonify({"error": "Email and password are required"}), 400

        user = auth.create_user(email=email, password=password, display_name=display_name)

        # Admin injection: addresses on the hardcoded list become admins.
        # NOTE(review): nothing visible here proves the caller owns the
        # address before the admin role is granted - confirm that email
        # verification is enforced elsewhere.
        is_admin = email.lower() in [e.lower() for e in HARDCODED_ADMIN_EMAILS]
        if is_admin:
            role = "admin"

        user_data = {
            "email": email,
            "displayName": display_name,
            "phone_number": phone,
            "city": city,
            "role": role,
            "is_admin": is_admin,
            "onboardingComplete": True,
            "roleSetAt": now_iso(),
            "verificationStatus": "unverified",
            "createdAt": now_iso()
        }
        db_ref.child(f"users/{user.uid}").set(user_data)
        return jsonify({"success": True, "uid": user.uid, **user_data}), 201

    except Exception as e:
        logger.error(f"Signup failed: {e}")
        if "EMAIL_EXISTS" in str(e):
            return jsonify({"error": "An account with this email already exists."}), 409
        return jsonify({"error": str(e)}), 400


@app.route("/api/auth/social-signin", methods=["POST"])
def social_signin():
    """
    Ensures RTDB user record exists. Social login happens on client.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Invalid or expired token"}), 401

    try:
        # get_or_create_profile handles admin injection
        user_data = get_or_create_profile(uid)
        return jsonify({"success": True, "uid": uid, **user_data}), 200
    except Exception as e:
        logger.error(f"social_signin failed: {e}")
        return jsonify({"error": f"Failed to create user profile: {str(e)}"}), 500


@app.route("/api/auth/set-role", methods=["POST"])
def set_role_after_social_signin():
    """
    Set role after first social sign-in.

    Accepts role "customer" or "tasker"; switching between the two is always
    allowed. Admin profiles keep their role: the call only refreshes their
    onboarding flags and reports that the role is locked.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Invalid or expired token"}), 401

    data = request.get_json() or {}
    # str() so a non-string "role" yields the 400 below instead of an
    # unhandled AttributeError outside the try block.
    requested_role = str(data.get("role") or "").lower().strip()

    if requested_role not in ["customer", "tasker"]:
        return jsonify({"error": "Invalid role. Use customer or tasker."}), 400

    try:
        user_ref = db_ref.child(f"users/{uid}")
        user_data = get_or_create_profile(uid)  # ensure exists

        # IF ADMIN via injection, LOCK role changes
        if user_data.get("is_admin"):
            user_ref.update({
                "onboardingComplete": True,
                "roleSetAt": user_data.get("roleSetAt") or now_iso(),
                "updatedAt": now_iso()
            })
            updated = user_ref.get()
            return jsonify({"success": True, "uid": uid, "profile": updated, "note": "User is Admin, role locked."}), 200

        current_role = (user_data.get("role") or "").lower().strip()

        # requested_role was validated above, so only "customer" and "tasker"
        # reach this point; both share the same patch.
        patch = {
            "role": requested_role,
            "onboardingComplete": True,
            "updatedAt": now_iso(),
        }
        if not user_data.get("roleSetAt"):
            patch["roleSetAt"] = now_iso()

        # Record when they first became/requested to be a tasker. Unverified
        # taskers may still switch; the UI shows them as pending/restricted.
        first_upgrade = (
            requested_role == "tasker"
            and current_role != "tasker"
            and not user_data.get("roleUpgradedAt")
        )
        if first_upgrade:
            patch["roleUpgradedAt"] = now_iso()

        user_ref.update(patch)
        updated = user_ref.get() or {}
        return jsonify({"success": True, "uid": uid, "profile": updated}), 200

    except Exception as e:
        logger.error(f"[SET ROLE] failed: {e}")
        return jsonify({"error": "Internal server error"}), 500
@app.route("/api/user/profile", methods=["GET"])
def get_user_profile():
    """Return the caller's profile, creating it on first access."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Invalid or expired token"}), 401

    try:
        user_data = get_or_create_profile(uid)
        return jsonify({"uid": uid, **user_data}), 200
    except Exception as e:
        logger.error(f"get_user_profile failed: {e}")
        return jsonify({"error": "Failed to load profile"}), 500


@app.route("/api/user/profile", methods=["PUT"])
def update_user_profile():
    """
    Updates user profile.

    Only whitelisted keys from the JSON body are written. 'lat', 'lng',
    'baseRate' and 'serviceRadiusKm' are coerced with safe_float (values
    that cannot be converted are stored as None). A non-empty displayName
    is also pushed to the Firebase Auth record.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Invalid or expired token"}), 401

    try:
        get_or_create_profile(uid)
    except Exception as e:
        # Log the cause; it used to be dropped silently.
        logger.error(f"update_user_profile bootstrap failed: {e}")
        return jsonify({"error": "Failed to bootstrap profile"}), 500

    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields to pick from.
        data = {}

    # Common fields, then role-specific (tasker) + location fields.
    writable_keys = [
        "displayName", "phone_number", "city",
        "skills", "categories", "bio", "serviceRadiusKm", "baseRate",
        "profilePhotoUrl", "availability", "lat", "lng",
    ]
    numeric_keys = {"lat", "lng", "baseRate", "serviceRadiusKm"}

    allowed = {
        key: safe_float(data.get(key)) if key in numeric_keys else data.get(key)
        for key in writable_keys
        if key in data
    }

    if not allowed:
        return jsonify({"error": "No valid fields provided"}), 400

    try:
        # If displayName changes, also update Auth profile
        if allowed.get("displayName"):
            auth.update_user(uid, display_name=str(allowed["displayName"]))

        db_ref.child(f"users/{uid}").update(allowed)
        return jsonify({"success": True, "updated": allowed}), 200
    except Exception as e:
        logger.error(f"update_user_profile failed: {e}")
        return jsonify({"error": f"Failed to update profile: {str(e)}"}), 500


# -----------------------------------------------------------------------------
# 5. AI & SMART CAPTURE
# -----------------------------------------------------------------------------

@app.route("/api/ai/smart-capture", methods=["POST"])
def smart_capture():
    """
    Customer uploads image -> Gemini Vision -> Structured Task Data.

    Expects multipart form data with an 'image' file and an optional
    'contextText' field. The parsed result is also stored on the user at
    /users/{uid}/lastSmartCapture.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        if "image" not in request.files:
            return jsonify({"error": "Image file is required (field name: image)"}), 400

        image_file = request.files["image"]
        context_text = request.form.get("contextText", "") or ""

        image_bytes = image_file.read()
        try:
            pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        except Exception as e:
            # An unreadable upload is a client error, not a server fault.
            logger.warning(f"[SMART CAPTURE] Invalid image upload: {e}")
            return jsonify({"error": "Invalid image file"}), 400

        # NOTE(review): context_text is user-controlled and is interpolated
        # straight into the prompt.
        prompt = f"""
You are One Plus Smart Capture (task diagnosis).
Analyze the image + optional context and return ONLY valid JSON.

Context (may be empty): "{context_text}"

Return this schema:
{{
  "category": "plumbing|electrical|cleaning|moving|assembly|beauty|event_setup|handyman|painting|gardening|appliance_repair|other",
  "problemSummary": "plain language summary (1-2 sentences)",
  "difficulty": "easy|moderate|complex",
  "timeEstimate": "e.g. 30-60 minutes, 1-2 hours, 1 day",
  "priceBand": "low|medium|high",
  "suggestedBudgetRange": "e.g. $20-$40 (rough)",
  "suggestedMaterials": ["..."],
  "suggestedTitle": "short task title",
  "suggestedDescription": "professional task description, include key constraints + what to check"
}}

Rules:
- Be realistic and safe.
- If unsure, pick "other" category and state uncertainty in problemSummary.
- Output must be JSON only (no markdown).
"""

        raw = send_text_request(VISION_MODEL, prompt, pil_image)
        result = extract_json_from_text(raw)
        # The model may answer with valid JSON that is not an object.
        if not result or not isinstance(result, dict):
            logger.error(f"[SMART CAPTURE] Could not parse JSON. Raw: {raw}")
            return jsonify({"error": "AI response format error"}), 500

        # minimal cleanup defaults; str() tolerates non-string model output
        result["category"] = str(result.get("category") or "other").strip()
        result["difficulty"] = str(result.get("difficulty") or "moderate").strip()
        result["priceBand"] = str(result.get("priceBand") or "medium").strip()
        if not isinstance(result.get("suggestedMaterials"), list):
            result["suggestedMaterials"] = []

        # Store last smart capture on user (handy for UI)
        db_ref.child(f"users/{uid}/lastSmartCapture").set({
            "createdAt": now_iso(),
            "contextText": context_text,
            "result": result
        })

        return jsonify({"success": True, "smartCapture": result}), 200

    except Exception as e:
        logger.error(f"[SMART CAPTURE] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500
@app.route("/api/ai/improve-description", methods=["POST"])
def improve_description():
    """
    (RESTORED) User provides a short/vague description; AI rewrites professionally.

    JSON body: 'text' (required, at least 3 characters) and 'category'
    (optional, defaults to "other"). Responds with the suggested title,
    description and a list of questions for the tasker.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        data = request.get_json() or {}
        # str() so non-string JSON values cannot raise on .strip()
        short_desc = str(data.get("text") or "").strip()
        category = str(data.get("category") or "other").strip()

        if len(short_desc) < 3:
            return jsonify({"error": "text is required"}), 400

        prompt = f"""
Rewrite this task description professionally for a services marketplace.
Category: {category}

User text: "{short_desc}"

Return ONLY JSON:
{{
  "suggestedTitle": "...",
  "suggestedDescription": "...",
  "questionsForTasker": ["...", "...", "..."]
}}

JSON only, no markdown.
"""
        raw = send_text_request(TEXT_MODEL, prompt, None)
        result = extract_json_from_text(raw)
        # Valid JSON that is not an object (list, number, ...) is unusable here.
        if not result or not isinstance(result, dict):
            return jsonify({"error": "AI response format error"}), 500

        if not isinstance(result.get("questionsForTasker"), list):
            result["questionsForTasker"] = []

        return jsonify({"success": True, "result": result}), 200

    except Exception as e:
        # Same failure contract as the other AI endpoint.
        logger.error(f"[IMPROVE DESCRIPTION] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 6. TASKER VERIFICATION (NEW)
# -----------------------------------------------------------------------------

@app.route("/api/tasker/verify-docs", methods=["POST"])
def upload_verification_docs():
    """
    Tasker uploads ID (required) and Certificate (optional).
    Status becomes 'pending'.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        # Only Tasker or Admin (raises PermissionError otherwise)
        require_role(uid, ["tasker", "admin"])

        id_file = request.files.get("idDocument")
        cert_file = request.files.get("certificate")  # optional

        if not id_file:
            return jsonify({"error": "ID Document is required"}), 400

        verification_data = {
            "status": "pending",
            "submittedAt": now_iso()
        }

        # (result key, storage name prefix, uploaded file); ID first.
        uploads = [("idDocUrl", "id", id_file)]
        if cert_file:
            uploads.append(("certDocUrl", "cert", cert_file))

        for url_key, prefix, upload in uploads:
            # File extension is taken from the MIME subtype, e.g. image/png -> png.
            ext = (upload.mimetype or "").split("/")[-1] or "jpg"
            path = f"users/{uid}/verification/{prefix}_{int(time.time())}.{ext}"
            verification_data[url_key] = upload_to_storage(upload.read(), path, upload.mimetype)

        # Update user profile
        db_ref.child(f"users/{uid}").update({
            "verificationStatus": "pending",
            "verificationDocs": verification_data
        })

        return jsonify({"success": True, "status": "pending", "docs": verification_data}), 200

    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"[VERIFICATION UPLOAD] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500


# -----------------------------------------------------------------------------
# 7. TASKS (UPDATED WITH LOCATION)
# -----------------------------------------------------------------------------

@app.route("/api/tasks", methods=["POST"])
def create_task():
    """
    Customer creates a task.

    Multipart form fields: category, city, description (all required),
    title, address, budget, scheduleAt, smartCapture (JSON string) and
    'lat' / 'lng' for location matching. Files under 'media' (or a single
    'image') are uploaded and their URLs stored on the task. Matching
    taskers are notified afterwards.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        profile = get_or_create_profile(uid)
        role = (profile.get("role") or "").lower().strip()
        if role not in ["customer", "admin"]:
            return jsonify({
                "error": "Forbidden",
                "reason": f"Role '{role}' not allowed to create tasks.",
            }), 403

        # multipart
        form = request.form
        category = form.get("category", "").strip()
        title = form.get("title", "").strip()
        description = form.get("description", "").strip()
        city = form.get("city", "").strip()
        address = form.get("address", "").strip()
        budget = form.get("budget", "").strip()
        schedule_at = form.get("scheduleAt", "").strip()
        smart_capture_json = form.get("smartCapture", "").strip()

        # Location Data
        lat = safe_float(form.get("lat"))
        lng = safe_float(form.get("lng"))

        if not category or not city or not description:
            return jsonify({"error": "category, city, and description are required"}), 400

        task_id = str(uuid.uuid4())
        created_at = now_iso()

        # Upload media (media[] or image)
        if "media" in request.files:
            files = request.files.getlist("media")
        elif "image" in request.files:
            files = [request.files["image"]]
        else:
            files = []

        media_urls = []
        for i, f in enumerate(files):
            data_bytes = f.read()
            if not data_bytes:
                continue  # skip empty parts
            mimetype = f.mimetype or "application/octet-stream"
            ext = mimetype.split("/")[-1]
            path = f"tasks/{task_id}/media/{i+1}_{int(time.time())}.{ext}"
            media_urls.append(upload_to_storage(data_bytes, path, mimetype))

        smart_capture = {}
        if smart_capture_json:
            try:
                parsed = json.loads(smart_capture_json)
            except ValueError:
                parsed = None
            # Only a JSON object is usable; a list or scalar would break the
            # .get() lookups below.
            if isinstance(parsed, dict):
                smart_capture = parsed

        task_payload = {
            "taskId": task_id,
            "createdBy": uid,
            "createdByName": profile.get("displayName") or "",
            "createdAt": created_at,

            "category": category,
            "title": title or smart_capture.get("suggestedTitle") or "Task Request",
            "description": description or smart_capture.get("suggestedDescription") or "",
            "city": city,
            "address": address,

            # Geo-location
            "lat": lat,
            "lng": lng,

            "budget": budget,
            "scheduleAt": schedule_at,

            "mediaUrls": media_urls,
            "smartCapture": smart_capture,

            "status": "open",
            "assignedTaskerId": "",
            "selectedBidId": "",
            "completedAt": "",
            "cancelledAt": ""
        }

        db_ref.child(f"tasks/{task_id}").set(task_payload)

        # Best-effort: notification failures are logged inside, never raised.
        notify_taskers_for_new_task(task_payload)

        return jsonify({"success": True, "task": task_payload}), 201

    except PermissionError as e:
        return jsonify({"error": "Forbidden", "reason": str(e)}), 403
    except Exception as e:
        logger.error(f"[CREATE TASK] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500


def notify_taskers_for_new_task(task: dict):
    """
    Notify every tasker that matches a newly created task.

    Matching:
    - Location: when a distance can be computed from both coordinate pairs,
      it must be within the tasker's serviceRadiusKm (default 50 km).
      Otherwise fall back to the city: a tasker with no city matches
      anything, a tasker with a city must equal the task's city.
    - Category: a tasker with no categories matches anything; otherwise one
      of their categories must equal the task's category.

    Never raises: any failure is logged as a warning.
    """
    try:
        users = db_ref.child("users").get() or {}

        t_lat = safe_float(task.get("lat"))
        t_lng = safe_float(task.get("lng"))
        # Explicit None test: 0.0 is a valid coordinate (equator / prime
        # meridian) and must not be treated as "missing".
        task_has_geo = t_lat is not None and t_lng is not None
        t_cat = normalize_text(task.get("category"))
        t_city = normalize_text(task.get("city"))

        for tasker_id, u in users.items():
            if not isinstance(u, dict):
                continue
            if (u.get("role") or "").lower().strip() != "tasker":
                continue

            # 1. Location (precise distance, else city fallback).
            #    haversine_distance returns None for missing/unusable values.
            dist = None
            if task_has_geo:
                dist = haversine_distance(t_lat, t_lng, u.get("lat"), u.get("lng"))

            if dist is not None:
                radius = safe_float(u.get("serviceRadiusKm"), 50.0)
                if dist > radius:
                    continue
            else:
                ucity = normalize_text(u.get("city"))
                if ucity and ucity != t_city:
                    continue

            # 2. Category match
            cats = u.get("categories") or []
            if isinstance(cats, str):
                cats = [c.strip() for c in cats.split(",") if c.strip()]
            if cats and not any(normalize_text(c) == t_cat for c in cats):
                continue

            push_notification(
                to_uid=tasker_id,
                notif_type="new_task",
                title="New task in your area",
                body=f"{task.get('category')} • {task.get('city')}",
                meta={"taskId": task.get("taskId")}
            )
    except Exception as e:
        logger.warning(f"[NOTIFY TASKERS] Failed: {e}")
@app.route("/api/tasks", methods=["GET"])
def list_tasks():
    """
    Role-aware list.

    - customer: tasks they created
    - tasker: open/bidding tasks, or with ?mine=true the tasks assigned to them
    - admin: everything
    - any other role: nothing

    Optional query filters: status, category, city. Newest tasks first.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = db_ref.child(f"users/{uid}").get() or {}
        role = (user.get("role") or "customer").lower().strip()
        is_admin = role == "admin" or bool(user.get("is_admin"))

        args = request.args
        status_f = (args.get("status") or "").strip()
        category_f = normalize_text(args.get("category"))
        city_f = normalize_text(args.get("city"))
        mine = (args.get("mine") or "").lower().strip() == "true"

        def visible(t) -> bool:
            """Role gating for a single task."""
            if is_admin:
                return True
            if role == "customer":
                return t.get("createdBy") == uid
            if role == "tasker":
                if mine:
                    return t.get("assignedTaskerId") == uid
                return t.get("status") in ["open", "bidding"]
            # Unknown roles used to fall through and see every task.
            return False

        tasks = db_ref.child("tasks").get() or {}
        out = []
        for t in tasks.values():
            if not t or not visible(t):
                continue

            # filters
            if status_f and t.get("status") != status_f:
                continue
            if category_f and normalize_text(t.get("category")) != category_f:
                continue
            if city_f and normalize_text(t.get("city")) != city_f:
                continue

            out.append(t)

        out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
        return jsonify(out), 200

    except Exception as e:
        logger.error(f"[LIST TASKS] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasker/recommended", methods=["GET"])
def recommended_tasks():
    """
    Smart Recommender for Taskers.

    Ranks open/bidding tasks, highest score first. Every task starts at 100:
    1. Distance (Haversine): minus 1 point per km, when both the tasker and
       the task have coordinates
    2. Category match: plus 50 when one of the tasker's categories occurs in
       the task's category

    Each returned task carries '_debug_score' and '_distance_km'.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = db_ref.child(f"users/{uid}").get() or {}
        # Normalise like the other handlers so "Tasker " is not rejected.
        if (user.get("role") or "").lower().strip() != "tasker":
            return jsonify({"error": "Only taskers can view recommended feed"}), 403

        u_lat = safe_float(user.get("lat"))
        u_lng = safe_float(user.get("lng"))
        # Explicit None test: 0.0 is a valid coordinate.
        user_has_geo = u_lat is not None and u_lng is not None

        raw_cats = user.get("categories") or []
        if isinstance(raw_cats, str):
            raw_cats = raw_cats.split(",")
        # Drop empty entries: "" is a substring of every category and would
        # hand the bonus to every task.
        u_cats = [c for c in (normalize_text(x) for x in raw_cats) if c]

        all_tasks = db_ref.child("tasks").get() or {}
        scored_tasks = []

        for t in all_tasks.values():
            if not t or t.get("status") not in ["open", "bidding"]:
                continue

            score = 100

            # Distance logic: haversine_distance returns None when the task
            # has no usable coordinates.
            dist_km = None
            if user_has_geo:
                dist_km = haversine_distance(
                    u_lat, u_lng, safe_float(t.get("lat")), safe_float(t.get("lng"))
                )
            if dist_km is not None:
                score -= dist_km  # Deduct 1 point per km

            # Category match
            t_cat = normalize_text(t.get("category"))
            if any(c in t_cat for c in u_cats):
                score += 50  # Big bonus for matching skill

            # Annotate a copy rather than the dict read from the database.
            entry = dict(t)
            entry["_debug_score"] = score
            entry["_distance_km"] = dist_km
            scored_tasks.append(entry)

        # Sort: Higher score first
        scored_tasks.sort(key=lambda x: x["_debug_score"], reverse=True)
        return jsonify(scored_tasks), 200

    except Exception as e:
        logger.error(f"[RECOMMENDER] {e}")
        return jsonify({"error": "Internal server error"}), 500
# The URL rule must declare <task_id>; without it Flask calls the view with
# no arguments and the request fails with a TypeError.
@app.route("/api/tasks/<task_id>", methods=["GET"])
def get_task(task_id):
    """Return one task (plus its 'bidsCount') if the caller may access it."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = db_ref.child(f"users/{uid}").get() or {}
        role = (user.get("role") or "customer").lower().strip()

        task = db_ref.child(f"tasks/{task_id}").get()
        if not task:
            return jsonify({"error": "Task not found"}), 404

        if not task_access_check(uid, task, role):
            return jsonify({"error": "Access denied"}), 403

        # attach bids count (cheap)
        bids = db_ref.child(f"bids/{task_id}").get() or {}
        task["bidsCount"] = len(bids)

        return jsonify(task), 200

    except Exception as e:
        logger.error(f"[GET TASK] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/tasks/<task_id>", methods=["PUT"])
def update_task(task_id):
    """
    Customer edit task.

    Only the task's creator (or an admin) may edit, and only while the task
    is open or bidding. Whitelisted JSON fields are written; 'lat' / 'lng'
    are coerced with safe_float, as on task creation.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = require_role(uid, ["customer", "admin"])
        # require_role lets is_admin profiles through, so honour the flag here too.
        is_admin = user.get("role") == "admin" or bool(user.get("is_admin"))

        task_ref = db_ref.child(f"tasks/{task_id}")
        task = task_ref.get()
        if not task:
            return jsonify({"error": "Task not found"}), 404

        if not is_admin and task.get("createdBy") != uid:
            return jsonify({"error": "Access denied"}), 403

        if task.get("status") not in ["open", "bidding"]:
            return jsonify({"error": "Task cannot be edited at this stage"}), 400

        data = request.get_json() or {}
        if not isinstance(data, dict):
            data = {}

        editable = ["category", "title", "description", "city", "address",
                    "budget", "scheduleAt", "lat", "lng"]
        allowed = {
            key: safe_float(data.get(key)) if key in ("lat", "lng") else data.get(key)
            for key in editable
            if key in data
        }

        if not allowed:
            return jsonify({"error": "No valid fields provided"}), 400

        allowed["updatedAt"] = now_iso()
        task_ref.update(allowed)

        return jsonify({"success": True, "updated": allowed, "task": task_ref.get()}), 200

    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"[UPDATE TASK] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500
# The URL rules below must declare <task_id>; without it Flask calls the view
# with no arguments and the request fails with a TypeError.
@app.route("/api/tasks/<task_id>/status", methods=["PUT"])
def update_task_status(task_id):
    """
    Change a task's status according to the caller's role.

    - admin: may set any status
    - customer (task creator only): "cancelled" unless the task is already
      closed, or "completed" while it is assigned / in progress
    - tasker (assigned tasker only): "on_the_way", "in_progress" or
      "completed", unless the task is already closed

    The other party is notified of customer and tasker changes.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = db_ref.child(f"users/{uid}").get() or {}
        role = (user.get("role") or "customer").lower().strip()

        task_ref = db_ref.child(f"tasks/{task_id}")
        task = task_ref.get()
        if not task:
            return jsonify({"error": "Task not found"}), 404

        data = request.get_json() or {}
        new_status = str(data.get("status") or "").strip()
        if not new_status:
            return jsonify({"error": "status is required"}), 400

        current = task.get("status")
        closed = current in ["completed", "cancelled"]

        # Admin override
        if role == "admin":
            task_ref.update({"status": new_status, "updatedAt": now_iso()})
            return jsonify({"success": True, "task": task_ref.get()}), 200

        # Customer rules
        if role == "customer":
            if task.get("createdBy") != uid:
                return jsonify({"error": "Access denied"}), 403

            if new_status == "cancelled":
                if closed:
                    return jsonify({"error": "Task already closed"}), 400
                task_ref.update({"status": "cancelled", "cancelledAt": now_iso()})
                if task.get("assignedTaskerId"):
                    push_notification(task["assignedTaskerId"], "task_cancelled", "Task cancelled", "Customer cancelled the task.", {"taskId": task_id})
                return jsonify({"success": True, "task": task_ref.get()}), 200

            if new_status == "completed":
                if current not in ["assigned", "in_progress"]:
                    return jsonify({"error": "Task not in a completable state"}), 400
                task_ref.update({"status": "completed", "completedAt": now_iso()})
                if task.get("assignedTaskerId"):
                    push_notification(task["assignedTaskerId"], "task_completed", "Task marked complete", "Customer marked the task completed.", {"taskId": task_id})
                return jsonify({"success": True, "task": task_ref.get()}), 200

            return jsonify({"error": "Invalid customer status update"}), 400

        # Tasker rules
        if role == "tasker":
            if task.get("assignedTaskerId") != uid:
                return jsonify({"error": "Only assigned tasker can update status"}), 403

            if new_status not in ["on_the_way", "in_progress", "completed"]:
                return jsonify({"error": "Invalid tasker status update"}), 400

            # A finished or cancelled task must not be reopened by the tasker.
            if closed:
                return jsonify({"error": "Task already closed"}), 400

            task_ref.update({"status": new_status, "updatedAt": now_iso()})
            push_notification(task["createdBy"], "task_update", "Task update", f"Task status: {new_status}", {"taskId": task_id})
            return jsonify({"success": True, "task": task_ref.get()}), 200

        return jsonify({"error": "Role not supported"}), 400

    except Exception as e:
        logger.error(f"[TASK STATUS] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/tasks/<task_id>", methods=["DELETE"])
def delete_task(task_id):
    """
    (RESTORED) Customer can delete only their own task, and only while it is
    open/bidding. Admins can delete any task.

    Removes the task, its bids, chats and reviews from RTDB, then the task's
    media from storage (best effort).
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = require_role(uid, ["customer", "admin"])
        role = (user.get("role") or "customer").lower().strip()

        task_ref = db_ref.child(f"tasks/{task_id}")
        task = task_ref.get()
        if not task:
            return jsonify({"error": "Task not found"}), 404

        if role != "admin":
            if task.get("createdBy") != uid:
                return jsonify({"error": "Access denied"}), 403
            if task.get("status") not in ["open", "bidding"]:
                return jsonify({"error": "Task cannot be deleted at this stage"}), 400

        # delete RTDB nodes
        task_ref.delete()
        for node in ("bids", "chats", "reviews"):
            db_ref.child(f"{node}/{task_id}").delete()

        # delete storage media; a failed blob must not fail the request,
        # but it should leave a trace.
        for blob in bucket.list_blobs(prefix=f"tasks/{task_id}/"):
            try:
                blob.delete()
            except Exception as e:
                logger.warning(f"[DELETE TASK] Could not delete blob {blob.name}: {e}")

        return jsonify({"success": True, "message": f"Task {task_id} deleted"}), 200

    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"[DELETE TASK] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500


# -----------------------------------------------------------------------------
# 8. BIDDING
# -----------------------------------------------------------------------------

@app.route("/api/tasks/<task_id>/bids", methods=["POST"])
def submit_bid(task_id):
    """
    Tasker submits bid.

    JSON body: 'price' and 'timeline' (required), 'message' (optional).
    The bid snapshots the tasker's name, photo and 'taskerVerified' status
    for the badge UI. The first bid moves the task from "open" to "bidding",
    and the task's creator is notified.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        user = require_role(uid, ["tasker", "admin"])

        task = db_ref.child(f"tasks/{task_id}").get()
        if not task:
            return jsonify({"error": "Task not found"}), 404

        if task.get("status") not in ["open", "bidding"]:
            return jsonify({"error": "Task not open for bids"}), 400

        data = request.get_json() or {}
        price = str(data.get("price") or "").strip()
        timeline = str(data.get("timeline") or "").strip()
        message = str(data.get("message") or "").strip()

        if not price or not timeline:
            return jsonify({"error": "price and timeline are required"}), 400

        bid_id = str(uuid.uuid4())
        bid = {
            "bidId": bid_id,
            "taskId": task_id,
            "taskerId": uid,
            "taskerName": user.get("displayName"),
            "taskerPhoto": user.get("profilePhotoUrl"),
            "taskerVerified": (user.get("verificationStatus") == "verified"),  # THE BADGE
            "price": price,
            "timeline": timeline,
            "message": message,
            "status": "submitted",
            "createdAt": now_iso()
        }

        db_ref.child(f"bids/{task_id}/{bid_id}").set(bid)

        if task.get("status") == "open":
            db_ref.child(f"tasks/{task_id}").update({"status": "bidding", "updatedAt": now_iso()})

        push_notification(
            to_uid=task["createdBy"],
            notif_type="new_bid",
            title="New bid received",
            body=f"A tasker submitted a bid for {task.get('category')}",
            meta={"taskId": task_id, "bidId": bid_id}
        )

        return jsonify({"success": True, "bid": bid}), 201

    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        logger.error(f"[SUBMIT BID] Error: {e}")
        return jsonify({"error": "Internal server error"}), 500
jsonify({"error": "Task not found"}), 404 bids = db_ref.child(f"bids/{task_id}").get() or {} out = list(bids.values()) if role == "admin": out.sort(key=lambda x: x.get("createdAt") or "", reverse=True) return jsonify(out), 200 if role == "customer": if task.get("createdBy") != uid: return jsonify({"error": "Access denied"}), 403 out.sort(key=lambda x: x.get("createdAt") or "", reverse=True) return jsonify(out), 200 if role == "tasker": if task.get("assignedTaskerId") == uid: out.sort(key=lambda x: x.get("createdAt") or "", reverse=True) return jsonify(out), 200 mine = [b for b in out if b.get("taskerId") == uid] mine.sort(key=lambda x: x.get("createdAt") or "", reverse=True) return jsonify(mine), 200 return jsonify({"error": "Role not supported"}), 400 except Exception as e: logger.error(f"[LIST BIDS] Error: {e}") return jsonify({"error": "Internal server error"}), 500 @app.route("/api/tasks//select-bid", methods=["PUT"]) def select_bid(task_id): uid = verify_token(request.headers.get("Authorization")) if not uid: return jsonify({"error": "Unauthorized"}), 401 try: require_role(uid, ["customer", "admin"]) task_ref = db_ref.child(f"tasks/{task_id}") task = task_ref.get() if not task: return jsonify({"error": "Task not found"}), 404 if task.get("createdBy") != uid and not (db_ref.child(f"users/{uid}").get() or {}).get("is_admin"): return jsonify({"error": "Access denied"}), 403 data = request.get_json() or {} bid_id = (data.get("bidId") or "").strip() if not bid_id: return jsonify({"error": "bidId is required"}), 400 bid = db_ref.child(f"bids/{task_id}/{bid_id}").get() if not bid: return jsonify({"error": "Bid not found"}), 404 tasker_id = bid.get("taskerId") task_ref.update({ "assignedTaskerId": tasker_id, "selectedBidId": bid_id, "status": "assigned", "updatedAt": now_iso() }) # mark bid as accepted, others as rejected bids = db_ref.child(f"bids/{task_id}").get() or {} for bkey, b in bids.items(): st = "accepted" if bkey == bid_id else "rejected" 
db_ref.child(f"bids/{task_id}/{bkey}").update({"status": st}) push_notification( to_uid=tasker_id, notif_type="bid_accepted", title="Bid accepted 🎉", body="Your bid was accepted. You’ve been assigned the job.", meta={"taskId": task_id, "bidId": bid_id} ) push_notification( to_uid=task["createdBy"], notif_type="task_assigned", title="Task assigned", body="You assigned the task to a tasker.", meta={"taskId": task_id, "assignedTaskerId": tasker_id} ) return jsonify({"success": True, "task": task_ref.get()}), 200 except PermissionError as e: return jsonify({"error": str(e)}), 403 except Exception as e: logger.error(f"[SELECT BID] Error: {e}") return jsonify({"error": "Internal server error"}), 500 # ----------------------------------------------------------------------------- # 9. CHAT # ----------------------------------------------------------------------------- @app.route("/api/chats//messages", methods=["GET"]) def list_messages(task_id): uid = verify_token(request.headers.get("Authorization")) if not uid: return jsonify({"error": "Unauthorized"}), 401 try: user = db_ref.child(f"users/{uid}").get() or {} role = (user.get("role") or "customer").lower().strip() task = db_ref.child(f"tasks/{task_id}").get() if not task: return jsonify({"error": "Task not found"}), 404 if not task_access_check(uid, task, role): return jsonify({"error": "Access denied"}), 403 msgs = db_ref.child(f"chats/{task_id}").get() or {} out = list(msgs.values()) out.sort(key=lambda x: x.get("createdAt") or "", reverse=False) return jsonify(out), 200 except Exception as e: logger.error(f"[LIST MSGS] Error: {e}") return jsonify({"error": "Internal server error"}), 500 @app.route("/api/chats//messages", methods=["POST"]) def send_message(task_id): """ Send message with optional file. 
""" uid = verify_token(request.headers.get("Authorization")) if not uid: return jsonify({"error": "Unauthorized"}), 401 try: user = db_ref.child(f"users/{uid}").get() or {} role = (user.get("role") or "customer").lower().strip() task = db_ref.child(f"tasks/{task_id}").get() if not task: return jsonify({"error": "Task not found"}), 404 if not task_access_check(uid, task, role): return jsonify({"error": "Access denied"}), 403 text = "" attachment_url = "" attachment_type = "" if request.content_type and "multipart/form-data" in request.content_type: text = (request.form.get("text") or "").strip() if "file" in request.files: f = request.files["file"] b = f.read() if b: ext = (f.mimetype or "application/octet-stream").split("/")[-1] path = f"tasks/{task_id}/chat/{uid}_{int(time.time())}.{ext}" attachment_url = upload_to_storage(b, path, f.mimetype or "application/octet-stream") attachment_type = f.mimetype or "" else: data = request.get_json() or {} text = (data.get("text") or "").strip() if not text and not attachment_url: return jsonify({"error": "Message text or file is required"}), 400 msg_id = str(uuid.uuid4()) msg = { "messageId": msg_id, "taskId": task_id, "senderId": uid, "senderRole": role, "text": text, "attachmentUrl": attachment_url, "attachmentType": attachment_type, "createdAt": now_iso() } db_ref.child(f"chats/{task_id}/{msg_id}").set(msg) other_uid = None if uid == task.get("createdBy"): other_uid = task.get("assignedTaskerId") or None else: other_uid = task.get("createdBy") if other_uid: push_notification( to_uid=other_uid, notif_type="chat_message", title="New message", body="You have a new message on a task.", meta={"taskId": task_id} ) return jsonify({"success": True, "message": msg}), 201 except Exception as e: logger.error(f"[SEND MSG] Error: {e}") return jsonify({"error": "Internal server error"}), 500 # ----------------------------------------------------------------------------- # 10. 
# NOTIFICATIONS
# -----------------------------------------------------------------------------

@app.route("/api/notifications", methods=["GET"])
def list_notifications():
    """
    Return the caller's notifications, newest first.

    Reads everything stored under notifications/<uid> and responds with a
    JSON list (empty when the user has none).
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        notifs = db_ref.child(f"notifications/{uid}").get() or {}
        ordered = sorted(
            notifs.values(),
            key=lambda n: n.get("createdAt") or "",
            reverse=True,
        )
        return jsonify(ordered), 200
    except Exception:
        logger.exception("[LIST NOTIFS] Error")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/notifications/<notif_id>/read", methods=["PUT"])
def mark_notification_read(notif_id):
    """
    (RESTORED) Flag one of the caller's notifications as read.

    Only notifications stored under the caller's own uid can be addressed;
    an unknown id yields 404. Sets `read` and `readAt` on the record.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        ref = db_ref.child(f"notifications/{uid}/{notif_id}")
        if not ref.get():
            return jsonify({"error": "Notification not found"}), 404

        ref.update({"read": True, "readAt": now_iso()})
        return jsonify({"success": True}), 200
    except Exception:
        logger.exception("[READ NOTIF] Error")
        return jsonify({"error": "Internal server error"}), 500


# -----------------------------------------------------------------------------
# 11.
# REVIEWS
# -----------------------------------------------------------------------------

@app.route("/api/tasks/<task_id>/review", methods=["POST"])
def leave_review(task_id):
    """
    Store a customer's review of the tasker assigned to a completed task.

    The caller must be the task's creator or a user flagged `is_admin`; the
    task must have status "completed" and an assigned tasker. The JSON body
    needs an integer `rating` from 1 to 5 and may carry a `comment`. The
    review is written to reviews/<task_id> (one record per task) and the
    tasker is notified. Returns 201 with the stored review.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    try:
        require_role(uid, ["customer", "admin"])

        task = db_ref.child(f"tasks/{task_id}").get()
        if not task:
            return jsonify({"error": "Task not found"}), 404

        if task.get("createdBy") != uid:
            caller = db_ref.child(f"users/{uid}").get() or {}
            if not caller.get("is_admin"):
                return jsonify({"error": "Access denied"}), 403

        if task.get("status") != "completed":
            return jsonify({"error": "Task must be completed before review"}), 400

        assigned = task.get("assignedTaskerId")
        if not assigned:
            return jsonify({"error": "No assigned tasker to review"}), 400

        # silent=True: a missing/invalid JSON body ends in the rating
        # validation error below rather than a 500.
        data = request.get_json(silent=True) or {}
        rating = safe_int(data.get("rating"), None)
        comment = str(data.get("comment") or "").strip()

        if rating is None or not 1 <= rating <= 5:
            return jsonify({"error": "rating must be 1-5"}), 400

        review = {
            "taskId": task_id,
            "customerId": uid,
            "taskerId": assigned,
            "rating": rating,
            "comment": comment,
            "createdAt": now_iso(),
        }
        db_ref.child(f"reviews/{task_id}").set(review)

        push_notification(
            assigned,
            "review_received",
            "New review",
            "A customer left you a review.",
            {"taskId": task_id},
        )
        return jsonify({"success": True, "review": review}), 201
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception:
        logger.exception("[REVIEW] Error")
        return jsonify({"error": "Internal server error"}), 500


# -----------------------------------------------------------------------------
# 12.
# ADMIN
# -----------------------------------------------------------------------------

@app.route("/api/admin/overview", methods=["GET"])
def admin_overview():
    """
    Return dashboard counters for admins.

    Counts users (total / customers / taskers / pending verifications) and
    tasks (total and per status). `dashboardStats` repeats the totals in the
    legacy shape. Responds 403 when verify_admin rejects the caller.
    """
    try:
        admin_uid = verify_admin(request.headers.get("Authorization"))

        users = db_ref.child("users").get() or {}
        tasks = db_ref.child("tasks").get() or {}

        roles = [(u.get("role") or "").lower().strip() for u in users.values()]
        total_users = len(users)
        total_taskers = roles.count("tasker")
        total_customers = roles.count("customer")

        # New: Pending verifications
        pending_verifications = sum(
            1 for u in users.values() if u.get("verificationStatus") == "pending"
        )

        tasks_by_status = {}
        for t in tasks.values():
            status = t.get("status") or "unknown"
            tasks_by_status[status] = tasks_by_status.get(status, 0) + 1

        return jsonify({
            "uid": admin_uid,
            "stats": {  # Unified stats block
                "users": total_users,
                "customers": total_customers,
                "taskers": total_taskers,
                "tasks": len(tasks),
                "pendingVerifications": pending_verifications,
                # Additive key: the per-status breakdown was computed before
                # but never returned.
                "tasksByStatus": tasks_by_status,
            },
            "dashboardStats": {  # Legacy/Duplicate block just in case FE uses it
                "users": {"total": total_users},
                "tasks": {"total": len(tasks)},
            },
        }), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception:
        logger.exception("[ADMIN OVERVIEW] Error")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/admin/users", methods=["GET"])
def admin_list_users():
    """Return every user record (with its uid merged in), newest first. Admin only."""
    try:
        verify_admin(request.headers.get("Authorization"))
        users = db_ref.child("users").get() or {}
        out = [{"uid": uid, **data} for uid, data in users.items()]
        out.sort(key=lambda u: u.get("createdAt") or "", reverse=True)
        return jsonify(out), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception:
        logger.exception("[ADMIN USERS] Error")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/admin/tasks", methods=["GET"])
def admin_list_tasks():
    """Return every task, newest first. Admin only."""
    try:
        verify_admin(request.headers.get("Authorization"))
        tasks = db_ref.child("tasks").get() or {}
        out = sorted(tasks.values(), key=lambda t: t.get("createdAt") or "", reverse=True)
        return jsonify(out), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception:
        logger.exception("[ADMIN TASKS] Error")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/admin/users/<uid>/deactivate", methods=["PUT"])
def admin_deactivate_user(uid):
    """
    (RESTORED) Mark a user record as disabled. Admin only.

    Sets `disabled` and `disabledAt` on users/<uid>; responds 404 when no
    such record exists.
    """
    try:
        verify_admin(request.headers.get("Authorization"))
        ref = db_ref.child(f"users/{uid}")
        if not ref.get():
            return jsonify({"error": "User not found"}), 404

        ref.update({"disabled": True, "disabledAt": now_iso()})
        return jsonify({"success": True}), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception:
        logger.exception("[ADMIN DEACTIVATE] Error")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/admin/task/<task_id>/chat", methods=["GET"])
def admin_view_chat(task_id):
    """(RESTORED) Return a task's chat messages, oldest first. Admin only."""
    try:
        verify_admin(request.headers.get("Authorization"))
        msgs = db_ref.child(f"chats/{task_id}").get() or {}
        out = sorted(msgs.values(), key=lambda m: m.get("createdAt") or "")
        return jsonify(out), 200
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403
    except Exception:
        logger.exception("[ADMIN VIEW CHAT] Error")
        return jsonify({"error": "Internal server error"}), 500


@app.route("/api/admin/users/<target_uid>/verify", methods=["PUT"])
def admin_verify_user(target_uid):
    """
    **NEW**: Admin approves or rejects tasker documents.

    Payload: { "status": "verified" | "rejected" }

    Writes `verificationStatus` to users/<target_uid>, stamps `verifiedAt`
    for "verified" (and writes None for it otherwise), then notifies the
    user. Responds 404 when the user record does not exist.
    """
    try:
        verify_admin(request.headers.get("Authorization"))
        data = request.get_json(silent=True) or {}
        status = data.get("status")

        if status not in ["verified", "rejected"]:
            return jsonify({"error": "Invalid status"}), 400

        target_ref = db_ref.child(f"users/{target_uid}")
        # update() on a missing node would create a user record holding only
        # the verification fields, so reject unknown users first.
        if not target_ref.get():
            return jsonify({"error": "User not found"}), 404

        target_ref.update({
            "verificationStatus": status,
            "verifiedAt": now_iso() if status == "verified" else None,
        })

        push_notification(
            target_uid,
            "verification_update",
            "Account Update",
            f"Your verification status is now: {status}",
        )
        return jsonify({"success": True}), 200
    except PermissionError:
        return jsonify({"error": "Admin required"}), 403
    except Exception:
        logger.exception("[ADMIN VERIFY] Error")
        return jsonify({"error": "Internal server error"}), 500


# -----------------------------------------------------------------------------
# 13. MAIN EXECUTION
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, and this server
    # binds to all interfaces, so debug mode is opt-in via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes"}
    app.run(debug=debug_enabled, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))