OnePlus-API / main.py
rairo's picture
Update main.py
ffd6cda verified
raw
history blame
50.1 kB
import os
import io
import re
import json
import uuid
import time
import traceback
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
import firebase_admin
from firebase_admin import credentials, db, storage, auth
from PIL import Image
import requests
# Google GenAI (Gemini)
from google import genai
from google.genai import types
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# 1. CONFIGURATION & INITIALIZATION
# -----------------------------------------------------------------------------
app = Flask(__name__)
CORS(app)
# --- Firebase Initialization ---
try:
credentials_json_string = os.environ.get("FIREBASE")
if not credentials_json_string:
raise ValueError("The FIREBASE environment variable is not set.")
credentials_json = json.loads(credentials_json_string)
firebase_db_url = os.environ.get("Firebase_DB")
firebase_storage_bucket = os.environ.get("Firebase_Storage")
if not firebase_db_url or not firebase_storage_bucket:
raise ValueError("Firebase_DB and Firebase_Storage environment variables must be set.")
cred = credentials.Certificate(credentials_json)
firebase_admin.initialize_app(cred, {
"databaseURL": firebase_db_url,
"storageBucket": firebase_storage_bucket
})
logger.info("Firebase Admin SDK initialized successfully.")
except Exception as e:
logger.error(f"FATAL: Error initializing Firebase: {e}")
raise
bucket = storage.bucket()
db_ref = db.reference()
# --- Google GenAI Client Initialization ---
try:
api_key = os.environ.get("Gemini")
if not api_key:
raise ValueError("The 'Gemini' environment variable is not set.")
client = genai.Client(api_key=api_key)
logger.info("Google GenAI Client initialized successfully.")
except Exception as e:
logger.error(f"FATAL: Error initializing GenAI Client: {e}")
raise
# --- Model Constants ---
VISION_MODEL = "gemini-2.5-flash" # Vision + text
TEXT_MODEL = "gemini-2.5-flash" # text-only tasks
# -----------------------------------------------------------------------------
# 2. HELPER FUNCTIONS
# -----------------------------------------------------------------------------
def now_iso() -> str:
return datetime.utcnow().isoformat() + "Z"
def verify_token(auth_header):
"""Verifies the Firebase ID token from the Authorization header."""
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split("Bearer ")[1]
try:
decoded = auth.verify_id_token(token)
return decoded.get("uid")
except Exception as e:
logger.warning(f"Token verification failed: {e}")
return None
def verify_admin(auth_header):
"""Verifies if the user is an admin."""
uid = verify_token(auth_header)
if not uid:
raise PermissionError("Invalid or missing user token")
user = db_ref.child(f"users/{uid}").get() or {}
if not user.get("is_admin", False):
raise PermissionError("Admin access required")
return uid
def upload_to_storage(data_bytes, destination_blob_name, content_type):
"""Uploads bytes to Firebase Storage and returns its public URL."""
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(data_bytes, content_type=content_type)
blob.make_public()
return blob.public_url
def safe_float(x, default=None):
try:
return float(x)
except Exception:
return default
def safe_int(x, default=None):
try:
return int(x)
except Exception:
return default
def normalize_text(s: str) -> str:
return re.sub(r"\s+", " ", str(s or "")).strip().lower()
def send_text_request(model_name, prompt, image=None):
"""
Helper: if image is provided, send [prompt, image].
If no image, send prompt only.
Returns response text or None.
"""
try:
chat = client.chats.create(model=model_name)
if image is None:
resp = chat.send_message([prompt])
else:
resp = chat.send_message([prompt, image])
text_out = ""
for part in resp.candidates[0].content.parts:
if hasattr(part, "text") and part.text:
text_out += part.text
return text_out.strip() if text_out else None
except Exception as e:
logger.error(f"Error with model {model_name}: {e}")
return None
def extract_json_from_text(text: str):
"""
Robust-ish JSON extraction:
- Try direct JSON parse
- Else find first {...} block and parse that
"""
if not text:
return None
text = text.strip()
# direct
try:
return json.loads(text)
except Exception:
pass
# find first JSON object
m = re.search(r"\{.*\}", text, re.DOTALL)
if not m:
return None
try:
return json.loads(m.group(0))
except Exception:
return None
def require_role(uid: str, allowed_roles: list[str]) -> dict:
"""
Reads user profile and checks role.
Returns user_data if ok, else raises PermissionError.
"""
user_data = db_ref.child(f"users/{uid}").get() or {}
role = (user_data.get("role") or "").lower().strip()
if role not in allowed_roles:
raise PermissionError(f"Role '{role}' not allowed.")
return user_data
def push_notification(to_uid: str, notif_type: str, title: str, body: str, meta: dict | None = None):
"""
In-app notification stored in RTDB:
/notifications/{uid}/{notifId}
"""
notif_id = str(uuid.uuid4())
payload = {
"notifId": notif_id,
"type": notif_type,
"title": title,
"body": body,
"meta": meta or {},
"createdAt": now_iso(),
"read": False
}
db_ref.child(f"notifications/{to_uid}/{notif_id}").set(payload)
return payload
def task_access_check(uid: str, task: dict, user_role: str):
"""
Role-aware access:
- customer can access own tasks
- tasker can access open tasks + tasks they are assigned to + tasks they bid on
- admin can access all
"""
if user_role == "admin":
return True
owner = task.get("createdBy")
assigned = task.get("assignedTaskerId")
if user_role == "customer":
return owner == uid
if user_role == "tasker":
if task.get("status") in ["open", "bidding"] and owner != uid:
return True
if assigned == uid:
return True
# bid check
bids = db_ref.child(f"bids/{task.get('taskId')}").get() or {}
for b in bids.values():
if b.get("taskerId") == uid:
return True
return False
return False
# -----------------------------------------------------------------------------
# 3. BASIC HEALTH
# -----------------------------------------------------------------------------
@app.route("/api/health", methods=["GET"])
def health():
return jsonify({"ok": True, "service": "oneplus-server", "time": now_iso()}), 200
# -----------------------------------------------------------------------------
# 4. AUTH & USER PROFILES (MVP)
# -----------------------------------------------------------------------------
@app.route("/api/auth/signup", methods=["POST"])
def signup():
"""
Email + password signup.
Creates Firebase Auth user + RTDB user profile.
"""
try:
data = request.get_json() or {}
email = data.get("email")
password = data.get("password")
display_name = data.get("displayName")
phone = data.get("phone")
city = data.get("city")
role = (data.get("role") or "customer").lower().strip() # customer|tasker|admin(not allowed here)
if role not in ["customer", "tasker"]:
return jsonify({"error": "Invalid role. Use customer or tasker."}), 400
if not email or not password:
return jsonify({"error": "Email and password are required"}), 400
user = auth.create_user(email=email, password=password, display_name=display_name)
user_data = {
"email": email,
"displayName": display_name,
"phone": phone,
"city": city,
"role": role,
"is_admin": False,
"createdAt": now_iso()
}
db_ref.child(f"users/{user.uid}").set(user_data)
return jsonify({"success": True, "uid": user.uid, **user_data}), 201
except Exception as e:
logger.error(f"Signup failed: {e}")
if "EMAIL_EXISTS" in str(e):
return jsonify({"error": "An account with this email already exists."}), 409
return jsonify({"error": str(e)}), 400
@app.route("/api/auth/social-signin", methods=["POST"])
def social_signin():
"""
Ensures RTDB user record exists. Social login happens on client,
we just bootstrap profile.
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Invalid or expired token"}), 401
user_ref = db_ref.child(f"users/{uid}")
user_data = user_ref.get()
try:
fb_user = auth.get_user(uid)
if user_data:
# backfill displayName if missing
if not user_data.get("displayName") and fb_user.display_name:
user_ref.update({"displayName": fb_user.display_name})
user_data = user_ref.get()
return jsonify({"uid": uid, **(user_data or {})}), 200
# create profile
new_user_data = {
"email": fb_user.email,
"displayName": fb_user.display_name,
"phone": "",
"city": "",
"role": "customer",
"is_admin": False,
"createdAt": now_iso()
}
user_ref.set(new_user_data)
return jsonify({"success": True, "uid": uid, **new_user_data}), 201
except Exception as e:
logger.error(f"social_signin failed: {e}")
return jsonify({"error": f"Failed to create user profile: {str(e)}"}), 500
@app.route("/api/user/profile", methods=["GET"])
def get_user_profile():
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Invalid or expired token"}), 401
user_data = db_ref.child(f"users/{uid}").get()
if not user_data:
return jsonify({"error": "User not found"}), 404
return jsonify({"uid": uid, **user_data}), 200
@app.route("/api/user/profile", methods=["PUT"])
def update_user_profile():
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Invalid or expired token"}), 401
data = request.get_json() or {}
allowed = {}
# Common fields
for key in ["displayName", "phone", "city"]:
if key in data:
allowed[key] = data.get(key)
# Role-specific (tasker)
# (customer can send them too, but we’ll store; frontend decides)
for key in ["skills", "categories", "bio", "serviceRadiusKm", "baseRate", "profilePhotoUrl", "availability"]:
if key in data:
allowed[key] = data.get(key)
if not allowed:
return jsonify({"error": "No valid fields provided"}), 400
try:
# If displayName changes, also update Auth profile
if "displayName" in allowed and allowed["displayName"]:
auth.update_user(uid, display_name=str(allowed["displayName"]))
db_ref.child(f"users/{uid}").update(allowed)
return jsonify({"success": True, "updated": allowed}), 200
except Exception as e:
logger.error(f"update_user_profile failed: {e}")
return jsonify({"error": f"Failed to update profile: {str(e)}"}), 500
# -----------------------------------------------------------------------------
# 5. AI (CUSTOMER) — SMART CAPTURE (MVP Critical)
# -----------------------------------------------------------------------------
@app.route("/api/ai/smart-capture", methods=["POST"])
def smart_capture():
"""
Customer uploads image (or video thumbnail) + optional context text.
Server sends to Gemini Vision and returns structured output for prefill:
- category
- problemSummary
- difficulty
- timeEstimate
- priceBand
- suggestedMaterials[]
- suggestedTitle
- suggestedDescription
- suggestedBudgetRange
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
# Accept multipart like SozoFix
if "image" not in request.files:
return jsonify({"error": "Image file is required (field name: image)"}), 400
image_file = request.files["image"]
context_text = request.form.get("contextText", "") or ""
image_bytes = image_file.read()
pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
prompt = f"""
You are One Plus Smart Capture (task diagnosis).
Analyze the image + optional context and return ONLY valid JSON.
Context (may be empty): "{context_text}"
Return this schema:
{{
"category": "plumbing|electrical|cleaning|moving|handyman|painting|gardening|appliance_repair|other",
"problemSummary": "plain language summary (1-2 sentences)",
"difficulty": "easy|moderate|complex",
"timeEstimate": "e.g. 30-60 minutes, 1-2 hours, 1 day",
"priceBand": "low|medium|high",
"suggestedBudgetRange": "e.g. $20-$40 (rough)",
"suggestedMaterials": ["..."],
"suggestedTitle": "short task title",
"suggestedDescription": "professional task description, include key constraints + what to check"
}}
Rules:
- Be realistic and safe.
- If unsure, pick "other" category and state uncertainty in problemSummary.
- Output must be JSON only (no markdown).
"""
raw = send_text_request(VISION_MODEL, prompt, pil_image)
result = extract_json_from_text(raw)
if not result:
logger.error(f"[SMART CAPTURE] Could not parse JSON. Raw: {raw}")
return jsonify({"error": "AI response format error"}), 500
# minimal cleanup defaults
result["category"] = (result.get("category") or "other").strip()
result["difficulty"] = (result.get("difficulty") or "moderate").strip()
result["priceBand"] = (result.get("priceBand") or "medium").strip()
if not isinstance(result.get("suggestedMaterials"), list):
result["suggestedMaterials"] = []
# Store last smart capture on user (handy for UI)
db_ref.child(f"users/{uid}/lastSmartCapture").set({
"createdAt": now_iso(),
"contextText": context_text,
"result": result
})
return jsonify({"success": True, "smartCapture": result}), 200
except Exception as e:
logger.error(f"[SMART CAPTURE] Error: {e}")
logger.error(traceback.format_exc())
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/ai/improve-description", methods=["POST"])
def improve_description():
"""
MVP nice-to-have:
User provides a short/vague description; AI rewrites professionally and adds questions.
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
data = request.get_json() or {}
short_desc = (data.get("text") or "").strip()
category = (data.get("category") or "other").strip()
if len(short_desc) < 3:
return jsonify({"error": "text is required"}), 400
prompt = f"""
Rewrite this task description professionally for a services marketplace.
Category: {category}
User text:
"{short_desc}"
Return ONLY JSON:
{{
"suggestedTitle": "...",
"suggestedDescription": "...",
"questionsForTasker": ["...", "...", "..."]
}}
JSON only, no markdown.
"""
raw = send_text_request(TEXT_MODEL, prompt, None)
result = extract_json_from_text(raw)
if not result:
return jsonify({"error": "AI response format error"}), 500
if not isinstance(result.get("questionsForTasker"), list):
result["questionsForTasker"] = []
return jsonify({"success": True, "result": result}), 200
# -----------------------------------------------------------------------------
# 6. TASKS (CUSTOMER POSTS, TASKER BROWSES) + MEDIA UPLOAD (MVP)
# -----------------------------------------------------------------------------
@app.route("/api/tasks", methods=["POST"])
def create_task():
"""
Customer creates a task.
Upload media like SozoFix:
- multipart/form-data
- fields: category, title, description, city, address(optional), budget, scheduleAt(optional ISO), contextText(optional)
- file fields: media (can send multiple) OR image (single)
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = require_role(uid, ["customer", "admin"]) # customers create tasks (admin can for testing)
# multipart
category = request.form.get("category", "").strip()
title = request.form.get("title", "").strip()
description = request.form.get("description", "").strip()
city = request.form.get("city", "").strip()
address = request.form.get("address", "").strip()
budget = request.form.get("budget", "").strip()
schedule_at = request.form.get("scheduleAt", "").strip() # ISO string from UI
smart_capture_json = request.form.get("smartCapture", "").strip() # optional JSON string from UI
if not category or not city or not description:
return jsonify({"error": "category, city, and description are required"}), 400
task_id = str(uuid.uuid4())
created_at = now_iso()
# Upload media (media[] or image)
media_urls = []
files = []
if "media" in request.files:
files = request.files.getlist("media")
elif "image" in request.files:
files = [request.files["image"]]
for i, f in enumerate(files):
data_bytes = f.read()
if not data_bytes:
continue
ext = (f.mimetype or "application/octet-stream").split("/")[-1]
path = f"tasks/{task_id}/media/{i+1}_{int(time.time())}.{ext}"
url = upload_to_storage(data_bytes, path, f.mimetype or "application/octet-stream")
media_urls.append(url)
# optional smartCapture object
smart_capture = None
if smart_capture_json:
try:
smart_capture = json.loads(smart_capture_json)
except Exception:
smart_capture = None
task_payload = {
"taskId": task_id,
"createdBy": uid,
"createdByName": user.get("displayName") or "",
"createdAt": created_at,
"category": category,
"title": title or (smart_capture or {}).get("suggestedTitle") or "Task Request",
"description": description or (smart_capture or {}).get("suggestedDescription") or "",
"city": city,
"address": address,
"budget": budget, # keep as string to avoid currency assumptions
"scheduleAt": schedule_at, # ISO string
"mediaUrls": media_urls,
"smartCapture": smart_capture or {},
"status": "open",
"assignedTaskerId": "",
"selectedBidId": "",
"completedAt": "",
"cancelledAt": ""
}
db_ref.child(f"tasks/{task_id}").set(task_payload)
# Notify taskers (basic broadcast by category + city)
notify_taskers_for_new_task(task_payload)
return jsonify({"success": True, "task": task_payload}), 201
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[CREATE TASK] Error: {e}")
logger.error(traceback.format_exc())
return jsonify({"error": "Internal server error"}), 500
def notify_taskers_for_new_task(task: dict):
"""
MVP matching:
- loop through users with role=tasker
- match category overlap + city match (or empty city)
- push in-app notification
"""
try:
users = db_ref.child("users").get() or {}
tcat = normalize_text(task.get("category"))
tcity = normalize_text(task.get("city"))
for tasker_id, u in users.items():
if (u.get("role") or "").lower().strip() != "tasker":
continue
# City match: if tasker has city set, match it; else allow.
ucity = normalize_text(u.get("city"))
if ucity and tcity and ucity != tcity:
continue
# Category match: if tasker categories list exists, try overlap; else allow.
cats = u.get("categories") or []
if isinstance(cats, str):
cats = [c.strip() for c in cats.split(",") if c.strip()]
if cats:
ok = any(normalize_text(c) == tcat for c in cats)
if not ok:
continue
push_notification(
to_uid=tasker_id,
notif_type="new_task",
title="New task in your area",
body=f"{task.get('category')}{task.get('city')}",
meta={"taskId": task.get("taskId")}
)
except Exception as e:
logger.warning(f"[NOTIFY TASKERS] Failed: {e}")
@app.route("/api/tasks", methods=["GET"])
def list_tasks():
"""
Role-aware list:
- customer: list my tasks
- tasker: list open/bidding tasks (with filters)
- admin: list all tasks (optional filters)
Query params:
status, category, city, mine=true
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = db_ref.child(f"users/{uid}").get() or {}
role = (user.get("role") or "customer").lower().strip()
status_f = (request.args.get("status") or "").strip()
category_f = (request.args.get("category") or "").strip()
city_f = (request.args.get("city") or "").strip()
mine = (request.args.get("mine") or "").lower().strip() == "true"
tasks = db_ref.child("tasks").get() or {}
out = []
for t in tasks.values():
if not t:
continue
# role gating
if role == "customer":
if t.get("createdBy") != uid:
continue
elif role == "tasker":
# default: show open/bidding, or mine jobs if mine=true
if mine:
if t.get("assignedTaskerId") != uid:
continue
else:
if t.get("status") not in ["open", "bidding"]:
continue
# admin sees all
# filters
if status_f and t.get("status") != status_f:
continue
if category_f and normalize_text(t.get("category")) != normalize_text(category_f):
continue
if city_f and normalize_text(t.get("city")) != normalize_text(city_f):
continue
out.append(t)
# sort newest first
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
except Exception as e:
logger.error(f"[LIST TASKS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasks/<string:task_id>", methods=["GET"])
def get_task(task_id):
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = db_ref.child(f"users/{uid}").get() or {}
role = (user.get("role") or "customer").lower().strip()
task = db_ref.child(f"tasks/{task_id}").get()
if not task:
return jsonify({"error": "Task not found"}), 404
if not task_access_check(uid, task, role):
return jsonify({"error": "Access denied"}), 403
# attach bids count (cheap)
bids = db_ref.child(f"bids/{task_id}").get() or {}
task["bidsCount"] = len(bids)
return jsonify(task), 200
except Exception as e:
logger.error(f"[GET TASK] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasks/<string:task_id>", methods=["PUT"])
def update_task(task_id):
"""
Customer can edit task only if not assigned/in_progress/completed.
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = require_role(uid, ["customer", "admin"])
task_ref = db_ref.child(f"tasks/{task_id}")
task = task_ref.get()
if not task:
return jsonify({"error": "Task not found"}), 404
if user.get("role") != "admin" and task.get("createdBy") != uid:
return jsonify({"error": "Access denied"}), 403
if task.get("status") not in ["open", "bidding"]:
return jsonify({"error": "Task cannot be edited at this stage"}), 400
data = request.get_json() or {}
allowed = {}
for key in ["category", "title", "description", "city", "address", "budget", "scheduleAt"]:
if key in data:
allowed[key] = data.get(key)
if not allowed:
return jsonify({"error": "No valid fields provided"}), 400
allowed["updatedAt"] = now_iso()
task_ref.update(allowed)
return jsonify({"success": True, "updated": allowed, "task": task_ref.get()}), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[UPDATE TASK] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasks/<string:task_id>/status", methods=["PUT"])
def update_task_status(task_id):
"""
Status transitions (MVP):
customer: cancel, mark_completed
tasker: on_the_way, in_progress, mark_completed (if assigned)
admin: any
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = db_ref.child(f"users/{uid}").get() or {}
role = (user.get("role") or "customer").lower().strip()
task_ref = db_ref.child(f"tasks/{task_id}")
task = task_ref.get()
if not task:
return jsonify({"error": "Task not found"}), 404
data = request.get_json() or {}
new_status = (data.get("status") or "").strip()
if not new_status:
return jsonify({"error": "status is required"}), 400
# Admin override
if role == "admin":
task_ref.update({"status": new_status, "updatedAt": now_iso()})
return jsonify({"success": True, "task": task_ref.get()}), 200
# Customer rules
if role == "customer":
if task.get("createdBy") != uid:
return jsonify({"error": "Access denied"}), 403
if new_status == "cancelled":
if task.get("status") in ["completed", "cancelled"]:
return jsonify({"error": "Task already closed"}), 400
task_ref.update({"status": "cancelled", "cancelledAt": now_iso()})
# notify assigned tasker (if any)
if task.get("assignedTaskerId"):
push_notification(task["assignedTaskerId"], "task_cancelled", "Task cancelled", "Customer cancelled the task.", {"taskId": task_id})
return jsonify({"success": True, "task": task_ref.get()}), 200
if new_status == "completed":
# allow completion only if was assigned/in_progress
if task.get("status") not in ["assigned", "in_progress"]:
return jsonify({"error": "Task not in a completable state"}), 400
task_ref.update({"status": "completed", "completedAt": now_iso()})
if task.get("assignedTaskerId"):
push_notification(task["assignedTaskerId"], "task_completed", "Task marked complete", "Customer marked the task completed.", {"taskId": task_id})
return jsonify({"success": True, "task": task_ref.get()}), 200
return jsonify({"error": "Invalid customer status update"}), 400
# Tasker rules
if role == "tasker":
if task.get("assignedTaskerId") != uid:
return jsonify({"error": "Only assigned tasker can update status"}), 403
if new_status not in ["on_the_way", "in_progress", "completed"]:
return jsonify({"error": "Invalid tasker status update"}), 400
# map on_the_way as in_progress-ish, but keep it if you want
task_ref.update({"status": new_status, "updatedAt": now_iso()})
# notify customer
push_notification(task["createdBy"], "task_update", "Task update", f"Task status: {new_status}", {"taskId": task_id})
return jsonify({"success": True, "task": task_ref.get()}), 200
return jsonify({"error": "Role not supported"}), 400
except Exception as e:
logger.error(f"[TASK STATUS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasks/<string:task_id>", methods=["DELETE"])
def delete_task(task_id):
"""
Customer can delete only if open/bidding and no assignment.
Also removes task media folder.
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = require_role(uid, ["customer", "admin"])
role = (user.get("role") or "customer").lower().strip()
task_ref = db_ref.child(f"tasks/{task_id}")
task = task_ref.get()
if not task:
return jsonify({"error": "Task not found"}), 404
if role != "admin" and task.get("createdBy") != uid:
return jsonify({"error": "Access denied"}), 403
if role != "admin" and task.get("status") not in ["open", "bidding"]:
return jsonify({"error": "Task cannot be deleted at this stage"}), 400
# delete RTDB nodes
task_ref.delete()
db_ref.child(f"bids/{task_id}").delete()
db_ref.child(f"chats/{task_id}").delete()
db_ref.child(f"reviews/{task_id}").delete()
# delete storage media
for blob in bucket.list_blobs(prefix=f"tasks/{task_id}/"):
try:
blob.delete()
except Exception:
pass
return jsonify({"success": True, "message": f"Task {task_id} deleted"}), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[DELETE TASK] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 7. BIDDING (TASKERS SUBMIT OFFERS) (MVP)
# -----------------------------------------------------------------------------
@app.route("/api/tasks/<string:task_id>/bids", methods=["POST"])
def submit_bid(task_id):
"""
Tasker submits bid: price + timeline + message.
Stored under /bids/{taskId}/{bidId}
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
require_role(uid, ["tasker", "admin"])
task = db_ref.child(f"tasks/{task_id}").get()
if not task:
return jsonify({"error": "Task not found"}), 404
if task.get("status") not in ["open", "bidding"]:
return jsonify({"error": "Task not open for bids"}), 400
data = request.get_json() or {}
price = (data.get("price") or "").strip()
timeline = (data.get("timeline") or "").strip()
message = (data.get("message") or "").strip()
if not price or not timeline:
return jsonify({"error": "price and timeline are required"}), 400
bid_id = str(uuid.uuid4())
bid = {
"bidId": bid_id,
"taskId": task_id,
"taskerId": uid,
"price": price,
"timeline": timeline,
"message": message,
"status": "submitted",
"createdAt": now_iso()
}
db_ref.child(f"bids/{task_id}/{bid_id}").set(bid)
# flip task to bidding
if task.get("status") == "open":
db_ref.child(f"tasks/{task_id}").update({"status": "bidding", "updatedAt": now_iso()})
# notify customer
push_notification(
to_uid=task["createdBy"],
notif_type="new_bid",
title="New bid received",
body=f"A tasker submitted a bid for {task.get('category')}",
meta={"taskId": task_id, "bidId": bid_id}
)
return jsonify({"success": True, "bid": bid}), 201
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[SUBMIT BID] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasks/<string:task_id>/bids", methods=["GET"])
def list_bids(task_id):
"""
Customer: can see bids for own task
Tasker: can see bids if they bid
Admin: all
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = db_ref.child(f"users/{uid}").get() or {}
role = (user.get("role") or "customer").lower().strip()
task = db_ref.child(f"tasks/{task_id}").get()
if not task:
return jsonify({"error": "Task not found"}), 404
bids = db_ref.child(f"bids/{task_id}").get() or {}
out = list(bids.values())
if role == "admin":
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
if role == "customer":
if task.get("createdBy") != uid:
return jsonify({"error": "Access denied"}), 403
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
if role == "tasker":
# only show their own bids unless task assigned to them
if task.get("assignedTaskerId") == uid:
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
mine = [b for b in out if b.get("taskerId") == uid]
mine.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(mine), 200
return jsonify({"error": "Role not supported"}), 400
except Exception as e:
logger.error(f"[LIST BIDS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/tasks/<string:task_id>/select-bid", methods=["PUT"])
def select_bid(task_id):
"""
Customer selects a bid:
- set task.assignedTaskerId
- set task.selectedBidId
- set task.status = assigned
- notify tasker + customer
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
require_role(uid, ["customer", "admin"])
task_ref = db_ref.child(f"tasks/{task_id}")
task = task_ref.get()
if not task:
return jsonify({"error": "Task not found"}), 404
if task.get("createdBy") != uid and not (db_ref.child(f"users/{uid}").get() or {}).get("is_admin"):
return jsonify({"error": "Access denied"}), 403
data = request.get_json() or {}
bid_id = (data.get("bidId") or "").strip()
if not bid_id:
return jsonify({"error": "bidId is required"}), 400
bid = db_ref.child(f"bids/{task_id}/{bid_id}").get()
if not bid:
return jsonify({"error": "Bid not found"}), 404
tasker_id = bid.get("taskerId")
task_ref.update({
"assignedTaskerId": tasker_id,
"selectedBidId": bid_id,
"status": "assigned",
"updatedAt": now_iso()
})
# mark bid as accepted, others as rejected (MVP)
bids = db_ref.child(f"bids/{task_id}").get() or {}
for bkey, b in bids.items():
st = "accepted" if bkey == bid_id else "rejected"
db_ref.child(f"bids/{task_id}/{bkey}").update({"status": st})
# notify tasker
push_notification(
to_uid=tasker_id,
notif_type="bid_accepted",
title="Bid accepted 🎉",
body="Your bid was accepted. You’ve been assigned the job.",
meta={"taskId": task_id, "bidId": bid_id}
)
# notify customer
push_notification(
to_uid=task["createdBy"],
notif_type="task_assigned",
title="Task assigned",
body="You assigned the task to a tasker. You can now chat.",
meta={"taskId": task_id, "assignedTaskerId": tasker_id}
)
return jsonify({"success": True, "task": task_ref.get()}), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[SELECT BID] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 8. CHAT (REAL-TIME DB STORED) (MVP)
# -----------------------------------------------------------------------------
@app.route("/api/chats/<string:task_id>/messages", methods=["GET"])
def list_messages(task_id):
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = db_ref.child(f"users/{uid}").get() or {}
role = (user.get("role") or "customer").lower().strip()
task = db_ref.child(f"tasks/{task_id}").get()
if not task:
return jsonify({"error": "Task not found"}), 404
if not task_access_check(uid, task, role):
return jsonify({"error": "Access denied"}), 403
msgs = db_ref.child(f"chats/{task_id}").get() or {}
out = list(msgs.values())
out.sort(key=lambda x: x.get("createdAt") or "", reverse=False)
return jsonify(out), 200
except Exception as e:
logger.error(f"[LIST MSGS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/chats/<string:task_id>/messages", methods=["POST"])
def send_message(task_id):
"""
Send message. Optional attachment upload (single file) via multipart:
- text in form field "text"
- file in field "file"
Or JSON body: {"text": "..."} for text-only
"""
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
user = db_ref.child(f"users/{uid}").get() or {}
role = (user.get("role") or "customer").lower().strip()
task = db_ref.child(f"tasks/{task_id}").get()
if not task:
return jsonify({"error": "Task not found"}), 404
if not task_access_check(uid, task, role):
return jsonify({"error": "Access denied"}), 403
text = ""
attachment_url = ""
attachment_type = ""
if request.content_type and "multipart/form-data" in request.content_type:
text = (request.form.get("text") or "").strip()
if "file" in request.files:
f = request.files["file"]
b = f.read()
if b:
ext = (f.mimetype or "application/octet-stream").split("/")[-1]
path = f"tasks/{task_id}/chat/{uid}_{int(time.time())}.{ext}"
attachment_url = upload_to_storage(b, path, f.mimetype or "application/octet-stream")
attachment_type = f.mimetype or ""
else:
data = request.get_json() or {}
text = (data.get("text") or "").strip()
if not text and not attachment_url:
return jsonify({"error": "Message text or file is required"}), 400
msg_id = str(uuid.uuid4())
msg = {
"messageId": msg_id,
"taskId": task_id,
"senderId": uid,
"senderRole": role,
"text": text,
"attachmentUrl": attachment_url,
"attachmentType": attachment_type,
"createdAt": now_iso()
}
db_ref.child(f"chats/{task_id}/{msg_id}").set(msg)
# notify the other party
other_uid = None
if uid == task.get("createdBy"):
other_uid = task.get("assignedTaskerId") or None
else:
other_uid = task.get("createdBy")
if other_uid:
push_notification(
to_uid=other_uid,
notif_type="chat_message",
title="New message",
body="You have a new message on a task.",
meta={"taskId": task_id}
)
return jsonify({"success": True, "message": msg}), 201
except Exception as e:
logger.error(f"[SEND MSG] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 9. NOTIFICATIONS (IN-APP) (MVP)
# -----------------------------------------------------------------------------
@app.route("/api/notifications", methods=["GET"])
def list_notifications():
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
notifs = db_ref.child(f"notifications/{uid}").get() or {}
out = list(notifs.values())
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
except Exception as e:
logger.error(f"[LIST NOTIFS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/notifications/<string:notif_id>/read", methods=["PUT"])
def mark_notification_read(notif_id):
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
ref = db_ref.child(f"notifications/{uid}/{notif_id}")
n = ref.get()
if not n:
return jsonify({"error": "Notification not found"}), 404
ref.update({"read": True, "readAt": now_iso()})
return jsonify({"success": True}), 200
except Exception as e:
logger.error(f"[READ NOTIF] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 10. REVIEWS (CUSTOMER RATES TASKER AFTER COMPLETION) (MVP)
# -----------------------------------------------------------------------------
@app.route("/api/tasks/<string:task_id>/review", methods=["POST"])
def leave_review(task_id):
uid = verify_token(request.headers.get("Authorization"))
if not uid:
return jsonify({"error": "Unauthorized"}), 401
try:
require_role(uid, ["customer", "admin"])
task = db_ref.child(f"tasks/{task_id}").get()
if not task:
return jsonify({"error": "Task not found"}), 404
if task.get("createdBy") != uid and not (db_ref.child(f"users/{uid}").get() or {}).get("is_admin"):
return jsonify({"error": "Access denied"}), 403
if task.get("status") != "completed":
return jsonify({"error": "Task must be completed before review"}), 400
assigned = task.get("assignedTaskerId")
if not assigned:
return jsonify({"error": "No assigned tasker to review"}), 400
data = request.get_json() or {}
rating = safe_int(data.get("rating"), None)
comment = (data.get("comment") or "").strip()
if rating is None or rating < 1 or rating > 5:
return jsonify({"error": "rating must be 1-5"}), 400
review = {
"taskId": task_id,
"customerId": uid,
"taskerId": assigned,
"rating": rating,
"comment": comment,
"createdAt": now_iso()
}
db_ref.child(f"reviews/{task_id}").set(review)
push_notification(assigned, "review_received", "New review", "A customer left you a review.", {"taskId": task_id})
return jsonify({"success": True, "review": review}), 201
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[REVIEW] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 11. ADMIN (MVP OVERVIEW + MANAGEMENT)
# -----------------------------------------------------------------------------
@app.route("/api/admin/overview", methods=["GET"])
def admin_overview():
try:
admin_uid = verify_admin(request.headers.get("Authorization"))
users = db_ref.child("users").get() or {}
tasks = db_ref.child("tasks").get() or {}
# lightweight counts only
total_users = len(users)
total_taskers = sum(1 for u in users.values() if (u.get("role") or "").lower().strip() == "tasker")
total_customers = sum(1 for u in users.values() if (u.get("role") or "").lower().strip() == "customer")
by_status = {}
for t in tasks.values():
s = t.get("status") or "unknown"
by_status[s] = by_status.get(s, 0) + 1
# bids count (shallow)
bids_root = db_ref.child("bids").get() or {}
total_bids = 0
for task_bids in bids_root.values():
if isinstance(task_bids, dict):
total_bids += len(task_bids)
return jsonify({
"uid": admin_uid,
"dashboardStats": {
"users": {
"total": total_users,
"customers": total_customers,
"taskers": total_taskers
},
"tasks": {
"total": len(tasks),
"byStatus": by_status
},
"bids": {
"total": total_bids
}
}
}), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[ADMIN OVERVIEW] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/admin/users", methods=["GET"])
def admin_list_users():
try:
verify_admin(request.headers.get("Authorization"))
users = db_ref.child("users").get() or {}
out = [{"uid": uid, **data} for uid, data in users.items()]
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[ADMIN USERS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/admin/tasks", methods=["GET"])
def admin_list_tasks():
try:
verify_admin(request.headers.get("Authorization"))
tasks = db_ref.child("tasks").get() or {}
out = list(tasks.values())
out.sort(key=lambda x: x.get("createdAt") or "", reverse=True)
return jsonify(out), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[ADMIN TASKS] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/admin/users/<string:uid>/deactivate", methods=["PUT"])
def admin_deactivate_user(uid):
"""
MVP deactivate:
- sets users/{uid}/disabled=true
(Client should enforce; Auth disable is optional for later)
"""
try:
verify_admin(request.headers.get("Authorization"))
ref = db_ref.child(f"users/{uid}")
user = ref.get()
if not user:
return jsonify({"error": "User not found"}), 404
ref.update({"disabled": True, "disabledAt": now_iso()})
return jsonify({"success": True}), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[ADMIN DEACTIVATE] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route("/api/admin/task/<string:task_id>/chat", methods=["GET"])
def admin_view_chat(task_id):
try:
verify_admin(request.headers.get("Authorization"))
msgs = db_ref.child(f"chats/{task_id}").get() or {}
out = list(msgs.values())
out.sort(key=lambda x: x.get("createdAt") or "", reverse=False)
return jsonify(out), 200
except PermissionError as e:
return jsonify({"error": str(e)}), 403
except Exception as e:
logger.error(f"[ADMIN VIEW CHAT] Error: {e}")
return jsonify({"error": "Internal server error"}), 500
# -----------------------------------------------------------------------------
# 12. MAIN EXECUTION (HF Spaces)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))