# server_gemini_seg.py
#
# Flask server that detects clothing items in an uploaded photo (Gemini with an
# OpenCV contour fallback), analyzes each crop, and stages thumbnails in
# Firebase Storage as session-scoped temporary files that can later be
# finalized (keep some, delete the rest) or cleared wholesale.
import os
import io
import json
import base64
import logging
import uuid
import time
import difflib
from typing import List, Dict, Any, Tuple, Optional

from flask import Flask, request, jsonify
from flask_cors import CORS
from PIL import Image, ImageOps
import numpy as np
import cv2

# genai client
from google import genai
from google.genai import types

# Firebase Admin (in-memory JSON init) — optional dependency; the server still
# works without it, it just skips server-side uploads.
try:
    import firebase_admin
    from firebase_admin import credentials as fb_credentials, storage as fb_storage
    FIREBASE_ADMIN_AVAILABLE = True
except Exception:
    firebase_admin = None
    fb_credentials = None
    fb_storage = None
    FIREBASE_ADMIN_AVAILABLE = False

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("wardrobe-server")

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    log.warning("GEMINI_API_KEY not set — gemini calls will fail (but fallback still works).")
client = genai.Client(api_key=GEMINI_API_KEY) if GEMINI_API_KEY else None

# Firebase config (read service account JSON from env)
FIREBASE_ADMIN_JSON = os.getenv("FIREBASE_ADMIN_JSON", "").strip()
FIREBASE_STORAGE_BUCKET = os.getenv("FIREBASE_STORAGE_BUCKET", "").strip()  # optional override
if FIREBASE_ADMIN_JSON and not FIREBASE_ADMIN_AVAILABLE:
    log.warning("FIREBASE_ADMIN_JSON provided but firebase-admin SDK is not installed. Install firebase-admin.")

app = Flask(__name__)
CORS(app)

# ---------- Categories mapping (map model 'type' to frontend categories) ----------
# NOTE: If frontend has a definitive categories array, replace this list with that array.
# We use difflib.get_close_matches to pick the closest category from CATEGORIES.
CATEGORIES = [
    "top", "shirt", "blouse", "tshirt", "sweater", "jacket", "coat",
    "dress", "skirt", "pants", "trousers", "shorts", "jeans",
    "shoe", "heels", "sneaker", "boot", "sandals",
    "bag", "belt", "hat", "accessory", "others",
]


def map_type_to_category(item_type: str) -> str:
    """Map a model-produced type string to the closest category from CATEGORIES.

    Matching order: exact hit, singularized hit, fuzzy match (difflib),
    then per-token hit. Falls back to 'others' if nothing matches.
    """
    if not item_type:
        return "others"
    t = item_type.strip().lower()
    # direct hit
    if t in CATEGORIES:
        return t
    # common plural handling (e.g. 'boots' -> 'boot')
    t_clean = t.rstrip("s")
    if t_clean in CATEGORIES:
        return t_clean
    # fuzzy match against the known categories
    matches = difflib.get_close_matches(t, CATEGORIES, n=1, cutoff=0.6)
    if matches:
        return matches[0]
    # attempt to match by token intersection (e.g. 'leather jacket' -> 'jacket')
    for token in t.replace("_", " ").split():
        if token in CATEGORIES:
            return token
    return "others"


# ---------- Firebase init helpers ----------
_firebase_app = None  # lazily-initialized firebase_admin.App singleton


def init_firebase_admin_if_needed():
    """Initialize firebase-admin once from FIREBASE_ADMIN_JSON.

    Returns the app instance, or None when no service-account JSON is
    configured. Raises when the SDK is missing, the JSON is invalid, or the
    storage bucket cannot be determined.
    """
    global _firebase_app
    if _firebase_app is not None:
        return _firebase_app
    if not FIREBASE_ADMIN_JSON:
        log.info("No FIREBASE_ADMIN_JSON env var set; skipping Firebase admin init.")
        return None
    if not FIREBASE_ADMIN_AVAILABLE:
        raise RuntimeError("firebase-admin not installed (pip install firebase-admin)")
    try:
        sa_obj = json.loads(FIREBASE_ADMIN_JSON)
    except Exception as e:
        log.exception("Failed parsing FIREBASE_ADMIN_JSON: %s", e)
        raise
    # Explicit env override wins; otherwise derive the default bucket name
    # from the service account's project_id.
    bucket_name = FIREBASE_STORAGE_BUCKET or (
        sa_obj.get("project_id") and f"{sa_obj.get('project_id')}.appspot.com"
    )
    if not bucket_name:
        raise RuntimeError(
            "Could not determine storage bucket. Set FIREBASE_STORAGE_BUCKET or include project_id in service account JSON."
        )
    try:
        cred = fb_credentials.Certificate(sa_obj)
        _firebase_app = firebase_admin.initialize_app(cred, {"storageBucket": bucket_name})
        log.info("Initialized firebase admin with bucket: %s", bucket_name)
        return _firebase_app
    except Exception as e:
        log.exception("Failed to initialize firebase admin: %s", e)
        raise


def upload_b64_to_firebase(base64_str: str, path: str, content_type: str = "image/jpeg",
                           metadata: Optional[dict] = None) -> str:
    """Upload a base64 string to Firebase Storage at `path`.

    Accepts raw base64 or a data: URL. Optionally attaches custom metadata
    (non-string values are JSON-encoded). Returns a public URL when the blob
    can be made public, otherwise a gs://bucket/path URI.
    """
    if not FIREBASE_ADMIN_JSON:
        raise RuntimeError("FIREBASE_ADMIN_JSON not set")
    init_firebase_admin_if_needed()
    if not FIREBASE_ADMIN_AVAILABLE:
        raise RuntimeError("firebase-admin not available")
    raw = base64_str
    if raw.startswith("data:"):
        raw = raw.split(",", 1)[1]
    raw = raw.replace("\n", "").replace("\r", "")
    data = base64.b64decode(raw)
    try:
        bucket = fb_storage.bucket()
        blob = bucket.blob(path)
        blob.upload_from_string(data, content_type=content_type)
        if metadata:
            try:
                # Storage custom metadata values must be strings.
                blob.metadata = {k: (json.dumps(v) if not isinstance(v, str) else v)
                                 for k, v in metadata.items()}
                blob.patch()
            except Exception as me:
                log.warning("Failed to patch metadata for %s: %s", path, me)
        try:
            blob.make_public()
            return blob.public_url
        except Exception as e:
            # Bucket may have uniform access control; fall back to a gs:// URI.
            log.warning("Could not make blob public: %s", e)
            return f"gs://{bucket.name}/{path}"
    except Exception as e:
        log.exception("Firebase upload error for path %s: %s", path, e)
        raise


# ---------- Image helpers (with EXIF transpose) ----------
def read_image_bytes(file_storage) -> Tuple[np.ndarray, int, int, bytes]:
    """Read an uploaded image and return (BGR array, width, height, jpeg bytes).

    The image is EXIF-transposed first, and the returned bytes are a JPEG
    re-encoding of the *orientation-corrected* frame. This matters: the bytes
    are sent to the detection model, whose normalized bboxes are then scaled
    against the corrected width/height — returning the original upload bytes
    would mis-place every crop on rotated phone photos (and the mime could be
    PNG despite being declared image/jpeg).
    """
    data = file_storage.read()
    img = Image.open(io.BytesIO(data))
    try:
        img = ImageOps.exif_transpose(img)
    except Exception:
        pass  # best-effort: keep original orientation if EXIF handling fails
    img = img.convert("RGB")
    w, h = img.size
    arr = np.array(img)[:, :, ::-1]  # RGB -> BGR for OpenCV
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=90)
    return arr, w, h, buf.getvalue()


def crop_and_b64(bgr_img: np.ndarray, x: int, y: int, w: int, h: int, max_side=512) -> str:
    """Crop a bbox from a BGR image, cap the longest side at `max_side`,
    and return a base64-encoded JPEG. Returns '' for empty crops or
    encode failures."""
    h_img, w_img = bgr_img.shape[:2]
    x = max(0, int(x))
    y = max(0, int(y))
    x2 = min(w_img, int(x + w))
    y2 = min(h_img, int(y + h))
    crop = bgr_img[y:y2, x:x2]
    if crop.size == 0:
        return ""
    max_dim = max(crop.shape[0], crop.shape[1])
    if max_dim > max_side:
        scale = max_side / max_dim
        crop = cv2.resize(crop, (int(crop.shape[1] * scale), int(crop.shape[0] * scale)),
                          interpolation=cv2.INTER_AREA)
    ok, jpeg = cv2.imencode(".jpg", crop, [int(cv2.IMWRITE_JPEG_QUALITY), 82])
    if not ok:
        return ""
    return base64.b64encode(jpeg.tobytes()).decode("ascii")


def fallback_contour_crops(bgr_img, max_items=8) -> List[Dict[str, Any]]:
    """Heuristic detection used when Gemini is unavailable or returns nothing.

    Finds large external contours via adaptive thresholding + morphological
    close, pads each bounding box by ~7%, and emits item dicts shaped like the
    Gemini path's output (label 'unknown', source 'fallback'). If no contour
    passes the area filter, falls back to a fixed 2x2 grid of quadrant crops.
    """
    gray = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (7, 7), 0)
    thresh = cv2.adaptiveThreshold(blur, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY_INV, 15, 6)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (9, 9))
    closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    h_img, w_img = bgr_img.shape[:2]
    min_area = (w_img * h_img) * 0.005  # ignore contours below 0.5% of the image
    items = []
    for cnt in sorted(contours, key=cv2.contourArea, reverse=True):
        if len(items) >= max_items:
            break
        area = cv2.contourArea(cnt)
        if area < min_area:
            continue
        x, y, w, h = cv2.boundingRect(cnt)
        # pad the box by ~7% on each side, clamped to the image
        pad_x, pad_y = int(w * 0.07), int(h * 0.07)
        x = max(0, x - pad_x)
        y = max(0, y - pad_y)
        w = min(w_img - x, w + pad_x * 2)
        h = min(h_img - y, h + pad_y * 2)
        b64 = crop_and_b64(bgr_img, x, y, w, h)
        if not b64:
            continue
        items.append(
            {
                "id": str(uuid.uuid4()),
                "label": "unknown",
                # crude confidence proxy: relative area, clamped to [0.25, 0.95]
                "confidence": min(0.95, max(0.25, area / (w_img * h_img))),
                "bbox": {"x": x, "y": y, "w": w, "h": h},
                "thumbnail_b64": b64,
                "source": "fallback",
            }
        )
    if not items:
        # last resort: split the image into four quadrants
        h_half, w_half = h_img // 2, w_img // 2
        rects = [(0, 0, w_half, h_half), (w_half, 0, w_half, h_half),
                 (0, h_half, w_half, h_half), (w_half, h_half, w_half, h_half)]
        for r in rects:
            b64 = crop_and_b64(bgr_img, r[0], r[1], r[2], r[3])
            if b64:
                items.append(
                    {
                        "id": str(uuid.uuid4()),
                        "label": "unknown",
                        "confidence": 0.3,
                        "bbox": {"x": r[0], "y": r[1], "w": r[2], "h": r[3]},
                        "thumbnail_b64": b64,
                        "source": "fallback-grid",
                    }
                )
    return items


# ---------- AI analysis helper ----------
def analyze_crop_with_gemini(jpeg_b64: str) -> Dict[str, Any]:
    """Run Gemini on the cropped image bytes to extract:
       type (one-word category like 'shoe', 'jacket', 'dress'),
       summary (detailed one-sentence description),
       brand (string or empty),
       tags (array of short descriptors)
    Returns dict, falls back to empty/defaults on error or missing key.
    """
    if not client:
        return {"type": "unknown", "summary": "", "brand": "", "tags": []}
    try:
        # prepare prompt
        prompt = (
            "You are an assistant that identifies clothing item characteristics from an image. "
            "Return only a JSON object with keys: type (single word like 'shoe','top','jacket'), "
            "summary (a very detailed sentence, with details like if its collar or round-neck, explain it in good detail), brand (brand name if visible else empty string), "
            "tags (an array of short single-word tags describing visible attributes, e.g. ['striped','leather','white']). "
            "Keep values short and concise except in summary which requires expressiveness."
        )
        contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])]
        # attach the image bytes
        image_bytes = base64.b64decode(jpeg_b64)
        contents.append(types.Content(role="user",
                                      parts=[types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg")]))
        schema = {
            "type": "object",
            "properties": {
                "type": {"type": "string"},
                "summary": {"type": "string"},
                "brand": {"type": "string"},
                "tags": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["type", "summary"],
        }
        cfg = types.GenerateContentConfig(response_mime_type="application/json", response_schema=schema)
        # call model (use the same model family you used before)
        resp = client.models.generate_content(model="gemini-2.5-flash-lite", contents=contents, config=cfg)
        text = resp.text or ""
        parsed = {}
        try:
            parsed = json.loads(text)
            # coerce expected shapes
            parsed["type"] = str(parsed.get("type", "")).strip()
            parsed["summary"] = str(parsed.get("summary", "")).strip()
            parsed["brand"] = str(parsed.get("brand", "")).strip()
            tags = parsed.get("tags", [])
            if not isinstance(tags, list):
                tags = []
            parsed["tags"] = [str(t).strip() for t in tags if str(t).strip()]
        except Exception as e:
            log.warning("Failed parsing Gemini analysis JSON: %s — raw: %s", e, (text[:300] if text else ""))
            parsed = {"type": "unknown", "summary": "", "brand": "", "tags": []}
        return {
            "type": parsed.get("type", "unknown") or "unknown",
            "summary": parsed.get("summary", "") or "",
            "brand": parsed.get("brand", "") or "",
            "tags": parsed.get("tags", []) or [],
        }
    except Exception as e:
        log.exception("analyze_crop_with_gemini failure: %s", e)
        return {"type": "unknown", "summary": "", "brand": "", "tags": []}


# ---------- Main / processing ----------
@app.route("/", methods=["POST", "GET"])
def index_route():
    """Health-check endpoint."""
    return jsonify({"ok": True}), 200


@app.route("/process", methods=["POST"])
def process_image():
    """Detect clothing items in the uploaded 'photo' multipart field.

    Pipeline: Gemini detection (normalized bboxes) -> pixel crops -> per-crop
    Gemini analysis -> optional auto-upload to Firebase Storage as tmp files
    tagged with a fresh session_id. Any failure downgrades to the contour
    fallback so the endpoint still returns usable items.
    """
    if "photo" not in request.files:
        return jsonify({"error": "missing photo"}), 400
    file = request.files["photo"]
    uid = (request.form.get("uid") or request.args.get("uid") or "anon").strip() or "anon"
    try:
        bgr_img, img_w, img_h, raw_bytes = read_image_bytes(file)
    except Exception as e:
        log.error("invalid image: %s", e)
        return jsonify({"error": "invalid image"}), 400
    session_id = str(uuid.uuid4())
    # Detection prompt (same as before)
    user_prompt = (
        "You are an assistant that extracts clothing detections from a single image. "
        "Return a JSON object with a single key 'items' which is an array. Each item must have: "
        "label (string, short like 'top','skirt','sneakers'), "
        "bbox with normalized coordinates between 0 and 1: {x, y, w, h} where x,y are top-left relative to width/height, "
        "confidence (0-1). Example output: {\"items\":[{\"label\":\"top\",\"bbox\":{\"x\":0.1,\"y\":0.2,\"w\":0.3,\"h\":0.4},\"confidence\":0.95}]} "
        "Output ONLY valid JSON. If you cannot detect any clothing confidently, return {\"items\":[]}."
    )
    try:
        contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
        contents.append(types.Content(role="user",
                                      parts=[types.Part.from_bytes(data=raw_bytes, mime_type="image/jpeg")]))
        schema = {
            "type": "object",
            "properties": {
                "items": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "label": {"type": "string"},
                            "bbox": {
                                "type": "object",
                                "properties": {
                                    "x": {"type": "number"},
                                    "y": {"type": "number"},
                                    "w": {"type": "number"},
                                    "h": {"type": "number"},
                                },
                                "required": ["x", "y", "w", "h"],
                            },
                            "confidence": {"type": "number"},
                        },
                        "required": ["label", "bbox", "confidence"],
                    },
                }
            },
            "required": ["items"],
        }
        cfg = types.GenerateContentConfig(response_mime_type="application/json", response_schema=schema)
        log.info("Calling Gemini model for detection (gemini-2.5-flash-lite)...")
        model_resp = client.models.generate_content(
            model="gemini-2.5-flash-lite", contents=contents, config=cfg) if client else None
        raw_text = (model_resp.text or "") if model_resp else ""
        log.info("Gemini raw response length: %d", len(raw_text))
        parsed = None
        try:
            parsed = json.loads(raw_text) if raw_text else None
        except Exception as e:
            log.warning("Could not parse Gemini JSON: %s", e)
            parsed = None

        items_out: List[Dict[str, Any]] = []
        if parsed and isinstance(parsed.get("items"), list) and len(parsed["items"]) > 0:
            for it in parsed["items"]:
                try:
                    label = str(it.get("label", "unknown"))[:48]
                    bbox = it.get("bbox", {})
                    # clamp normalized coords to [0, 1] before scaling to pixels
                    nx = max(0.0, min(1.0, float(bbox.get("x", 0))))
                    ny = max(0.0, min(1.0, float(bbox.get("y", 0))))
                    nw = max(0.0, min(1.0, float(bbox.get("w", 0))))
                    nh = max(0.0, min(1.0, float(bbox.get("h", 0))))
                    px = int(nx * img_w)
                    py = int(ny * img_h)
                    pw = int(nw * img_w)
                    ph = int(nh * img_h)
                    if pw <= 8 or ph <= 8:  # skip degenerate boxes
                        continue
                    b64 = crop_and_b64(bgr_img, px, py, pw, ph)
                    if not b64:
                        continue
                    item_obj = {
                        "id": str(uuid.uuid4()),
                        "label": label,
                        "confidence": float(it.get("confidence", 0.5)),
                        "bbox": {"x": px, "y": py, "w": pw, "h": ph},
                        "thumbnail_b64": b64,
                        "source": "gemini",
                    }
                    # Add placeholder analysis/title; will be filled later if analysis runs
                    item_obj["analysis"] = {"type": "unknown", "summary": "", "brand": "", "tags": []}
                    item_obj["title"] = "unknown"
                    items_out.append(item_obj)
                except Exception as e:
                    log.warning("skipping item due to error: %s", e)
        else:
            log.info("Gemini returned no items or parse failed — using fallback contour crops.")
            items_out = fallback_contour_crops(bgr_img, max_items=8)
            # ensure analysis/title placeholders
            for itm in items_out:
                itm.setdefault("analysis", {"type": "unknown", "summary": "", "brand": "", "tags": []})
                itm.setdefault("title", "unknown")

        # Perform AI analysis per crop (if possible) and auto-upload to firebase
        # with metadata (tmp + session)
        if FIREBASE_ADMIN_JSON and FIREBASE_ADMIN_AVAILABLE:
            try:
                init_firebase_admin_if_needed()
            except Exception as e:
                log.exception("Firebase admin init for upload failed: %s", e)
            safe_uid = "".join(ch for ch in uid if ch.isalnum() or ch in ("-", "_")) or "anon"
            for itm in items_out:
                b64 = itm.get("thumbnail_b64")
                if not b64:
                    continue
                # analyze
                try:
                    analysis = analyze_crop_with_gemini(b64) if client else \
                        {"type": "unknown", "summary": "", "brand": "", "tags": []}
                except Exception as ae:
                    log.warning("analysis failed: %s", ae)
                    analysis = {"type": "unknown", "summary": "", "brand": "", "tags": []}
                # attach analysis and map to frontend category/title
                itm["analysis"] = analysis
                mapped_title = map_type_to_category(analysis.get("type", "") or itm.get("label", ""))
                itm["title"] = mapped_title
                item_id = itm.get("id") or str(uuid.uuid4())
                path = f"detected/{safe_uid}/{item_id}.jpg"
                try:
                    metadata = {
                        "tmp": "true",
                        "session_id": session_id,
                        "uploaded_by": safe_uid,
                        "uploaded_at": str(int(time.time())),
                        # store AI fields as JSON strings for later inspection
                        "ai_type": analysis.get("type", ""),
                        "ai_brand": analysis.get("brand", ""),
                        "ai_summary": analysis.get("summary", ""),
                        "ai_tags": json.dumps(analysis.get("tags", [])),
                    }
                    url = upload_b64_to_firebase(b64, path, content_type="image/jpeg", metadata=metadata)
                    itm["thumbnail_url"] = url
                    itm["thumbnail_path"] = path
                    itm.pop("thumbnail_b64", None)
                    itm["_session_id"] = session_id
                    log.debug("Auto-uploaded thumbnail for %s -> %s (session=%s)", item_id, url, session_id)
                except Exception as up_e:
                    log.warning("Auto-upload failed for %s: %s", item_id, up_e)
                    # keep thumbnail_b64 and analysis for client fallback
        else:
            if not FIREBASE_ADMIN_JSON:
                log.info("FIREBASE_ADMIN_JSON not set; skipping server-side thumbnail upload.")
            else:
                log.info("Firebase admin SDK not available; skipping server-side thumbnail upload.")
            # For items without firebase upload, still attempt local analysis mapping
            for itm in items_out:
                if "analysis" not in itm or not itm["analysis"]:
                    # attempt lightweight analysis mapping using label
                    itm.setdefault("analysis",
                                   {"type": itm.get("label", "unknown"), "summary": "", "brand": "", "tags": []})
                mapped_title = map_type_to_category(itm["analysis"].get("type", "") or itm.get("label", ""))
                itm["title"] = mapped_title

        return jsonify({"ok": True, "items": items_out, "session_id": session_id,
                        "debug": {"raw_model_text": (raw_text or "")[:1600]}}), 200
    except Exception as ex:
        log.exception("Processing error: %s", ex)
        # degrade gracefully: serve contour crops even when the model path blew up
        try:
            items_out = fallback_contour_crops(bgr_img, max_items=8)
            for itm in items_out:
                itm.setdefault("analysis", {"type": "unknown", "summary": "", "brand": "", "tags": []})
                itm["title"] = map_type_to_category(itm["analysis"].get("type", "") or itm.get("label", ""))
            return jsonify({"ok": True, "items": items_out, "session_id": session_id,
                            "debug": {"error": str(ex)}}), 200
        except Exception as e2:
            log.exception("Fallback also failed: %s", e2)
            return jsonify({"error": "internal failure", "detail": str(e2)}), 500


# ---------- Finalize endpoint: keep selected and delete only session's temp files ----------
@app.route("/finalize_detections", methods=["POST"])
def finalize_detections():
    """
    Body JSON: { "uid": "user123", "keep_ids": ["id1","id2",...], "session_id": "" }
    Server will delete only detected/<uid>/* files whose:
      - metadata.tmp == "true"
      - metadata.session_id == session_id
      - item_id NOT in keep_ids
    Returns: { ok: True, kept: [...], deleted: [...], errors: [...] }
    """
    try:
        body = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "invalid json"}), 400
    uid = (body.get("uid") or request.args.get("uid") or "anon").strip() or "anon"
    keep_ids = set(body.get("keep_ids") or [])
    session_id = (body.get("session_id") or request.args.get("session_id") or "").strip()
    if not session_id:
        # refuse to operate without a session scope — deleting across sessions is unsafe
        return jsonify({"error": "session_id required for finalize to avoid unsafe deletes"}), 400
    if not FIREBASE_ADMIN_JSON or not FIREBASE_ADMIN_AVAILABLE:
        return jsonify({"error": "firebase admin not configured"}), 500
    try:
        init_firebase_admin_if_needed()
        bucket = fb_storage.bucket()
    except Exception as e:
        log.exception("Firebase init error in finalize: %s", e)
        return jsonify({"error": "firebase admin init failed", "detail": str(e)}), 500
    safe_uid = "".join(ch for ch in uid if ch.isalnum() or ch in ("-", "_")) or "anon"
    prefix = f"detected/{safe_uid}/"
    kept = []
    deleted = []
    errors = []
    try:
        blobs = list(bucket.list_blobs(prefix=prefix))
        for blob in blobs:
            try:
                name = blob.name
                fname = name.split("/")[-1]
                if "." not in fname:
                    continue
                item_id = fname.rsplit(".", 1)[0]
                md = blob.metadata or {}
                # only consider temporary files matching this session id
                if str(md.get("session_id", "")) != session_id or \
                        str(md.get("tmp", "")).lower() not in ("true", "1", "yes"):
                    continue
                if item_id in keep_ids:
                    # ensure public URL available if possible
                    try:
                        blob.make_public()
                        url = blob.public_url
                    except Exception:
                        url = f"gs://{bucket.name}/{name}"
                    # extract AI metadata (if present)
                    ai_type = md.get("ai_type") or ""
                    ai_brand = md.get("ai_brand") or ""
                    ai_summary = md.get("ai_summary") or ""
                    ai_tags_raw = md.get("ai_tags") or "[]"
                    try:
                        ai_tags = json.loads(ai_tags_raw) if isinstance(ai_tags_raw, str) else ai_tags_raw
                    except Exception:
                        ai_tags = []
                    kept.append(
                        {
                            "id": item_id,
                            "thumbnail_url": url,
                            "thumbnail_path": name,
                            "analysis": {"type": ai_type, "brand": ai_brand,
                                         "summary": ai_summary, "tags": ai_tags},
                        }
                    )
                else:
                    try:
                        blob.delete()
                        deleted.append(item_id)
                    except Exception as de:
                        errors.append({"id": item_id, "error": str(de)})
            except Exception as e:
                errors.append({"blob": getattr(blob, "name", None), "error": str(e)})
        return jsonify({"ok": True, "kept": kept, "deleted": deleted, "errors": errors}), 200
    except Exception as e:
        log.exception("finalize_detections error: %s", e)
        return jsonify({"error": "internal", "detail": str(e)}), 500


# ---------- Clear session: delete all temporary files for a session ----------
@app.route("/clear_session", methods=["POST"])
def clear_session():
    """
    Body JSON: { "session_id": "", "uid": "" }
    Deletes all detected/<uid>/* blobs where metadata.session_id == session_id
    and metadata.tmp == "true".
    """
    try:
        body = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "invalid json"}), 400
    session_id = (body.get("session_id") or request.args.get("session_id") or "").strip()
    uid = (body.get("uid") or request.args.get("uid") or "anon").strip() or "anon"
    if not session_id:
        return jsonify({"error": "session_id required"}), 400
    if not FIREBASE_ADMIN_JSON or not FIREBASE_ADMIN_AVAILABLE:
        return jsonify({"error": "firebase admin not configured"}), 500
    try:
        init_firebase_admin_if_needed()
        bucket = fb_storage.bucket()
    except Exception as e:
        log.exception("Firebase init error in clear_session: %s", e)
        return jsonify({"error": "firebase admin init failed", "detail": str(e)}), 500
    safe_uid = "".join(ch for ch in uid if ch.isalnum() or ch in ("-", "_")) or "anon"
    prefix = f"detected/{safe_uid}/"
    deleted = []
    errors = []
    try:
        blobs = list(bucket.list_blobs(prefix=prefix))
        for blob in blobs:
            try:
                md = blob.metadata or {}
                if str(md.get("session_id", "")) == session_id and \
                        str(md.get("tmp", "")).lower() in ("true", "1", "yes"):
                    try:
                        blob.delete()
                        deleted.append(blob.name.split("/")[-1].rsplit(".", 1)[0])
                    except Exception as de:
                        errors.append({"blob": blob.name, "error": str(de)})
            except Exception as e:
                errors.append({"blob": getattr(blob, "name", None), "error": str(e)})
        return jsonify({"ok": True, "deleted": deleted, "errors": errors}), 200
    except Exception as e:
        log.exception("clear_session error: %s", e)
        return jsonify({"error": "internal", "detail": str(e)}), 500


if __name__ == "__main__":
    port = int(os.getenv("PORT", 7860))
    log.info("Starting server on 0.0.0.0:%d", port)
    # Debug mode exposes the Werkzeug interactive debugger (remote code
    # execution) — never enable it unconditionally on 0.0.0.0. Opt in via env.
    debug = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=port, debug=debug)