# server_gemini_seg.py
# Flask server: clothing detection via Gemini, crop thumbnails stored in Firebase Storage.
# --- Standard library ---
import os
import io
import json
import base64
import logging
import uuid
import time
import difflib
from typing import List, Dict, Any, Tuple, Optional

# --- Third-party ---
from flask import Flask, request, jsonify
from flask_cors import CORS
from PIL import Image, ImageOps
import numpy as np
import cv2

# genai client
from google import genai
from google.genai import types

# Firebase Admin (in-memory JSON init) — optional: the server still runs without
# it, but server-side thumbnail upload and the finalize/clear endpoints are disabled.
try:
    import firebase_admin
    from firebase_admin import credentials as fb_credentials, storage as fb_storage
    FIREBASE_ADMIN_AVAILABLE = True
except Exception:
    firebase_admin = None
    fb_credentials = None
    fb_storage = None
    FIREBASE_ADMIN_AVAILABLE = False

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("wardrobe-server")

# Gemini API key; when missing, `client` stays None and the contour fallback is used.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    log.warning("GEMINI_API_KEY not set — gemini calls will fail (but fallback still works).")
client = genai.Client(api_key=GEMINI_API_KEY) if GEMINI_API_KEY else None

# Firebase config (read service account JSON from env)
FIREBASE_ADMIN_JSON = os.getenv("FIREBASE_ADMIN_JSON", "").strip()
FIREBASE_STORAGE_BUCKET = os.getenv("FIREBASE_STORAGE_BUCKET", "").strip()  # optional override
if FIREBASE_ADMIN_JSON and not FIREBASE_ADMIN_AVAILABLE:
    log.warning("FIREBASE_ADMIN_JSON provided but firebase-admin SDK is not installed. Install firebase-admin.")

app = Flask(__name__)
CORS(app)
# ---------- Categories mapping (map model 'type' to frontend categories) ----------
# NOTE: If frontend has a definitive categories array, replace this list with that array.
# We use difflib.get_close_matches to pick the closest category from CATEGORIES.
CATEGORIES = [
    "top",
    "shirt",
    "blouse",
    "tshirt",
    "sweater",
    "jacket",
    "coat",
    "dress",
    "skirt",
    "pants",
    "trousers",
    "shorts",
    "jeans",
    "shoe",
    "heels",
    "sneaker",
    "boot",
    "sandals",
    "bag",
    "belt",
    "hat",
    "accessory",
    "others",
]


def map_type_to_category(item_type: str) -> str:
    """Map a model-produced type string to the closest category from CATEGORIES.

    Resolution order: exact match, naive singular form (trailing 's' chars
    stripped), difflib fuzzy match (cutoff 0.6), then per-token exact match.
    Returns "others" when the input is empty or nothing matches.
    """
    if not item_type:
        return "others"
    t = item_type.strip().lower()
    # direct hit
    if t in CATEGORIES:
        return t
    # naive plural handling; rstrip removes *all* trailing 's' chars
    # ("dress" -> "dre"), which is harmless because multi-s categories
    # already matched directly above
    t_clean = t.rstrip("s")
    if t_clean in CATEGORIES:
        return t_clean
    # fuzzy match against the whole string
    matches = difflib.get_close_matches(t, CATEGORIES, n=1, cutoff=0.6)
    if matches:
        return matches[0]
    # last resort: any individual token that is itself a category
    for token in t.replace("_", " ").split():
        if token in CATEGORIES:
            return token
    return "others"
# ---------- Firebase init helpers ----------
# Process-wide singleton holding the initialized firebase_admin App.
_firebase_app = None


def init_firebase_admin_if_needed():
    """Lazily initialize firebase_admin once per process.

    Returns the cached App on subsequent calls, or None when
    FIREBASE_ADMIN_JSON is unset. Raises when the SDK is missing, the service
    account JSON cannot be parsed, the bucket name cannot be determined, or
    initialization itself fails.
    """
    global _firebase_app
    if _firebase_app is not None:
        return _firebase_app
    if not FIREBASE_ADMIN_JSON:
        log.info("No FIREBASE_ADMIN_JSON env var set; skipping Firebase admin init.")
        return None
    if not FIREBASE_ADMIN_AVAILABLE:
        raise RuntimeError("firebase-admin not installed (pip install firebase-admin)")
    try:
        sa_obj = json.loads(FIREBASE_ADMIN_JSON)
    except Exception as e:
        log.exception("Failed parsing FIREBASE_ADMIN_JSON: %s", e)
        raise
    # Default bucket name follows the "<project_id>.appspot.com" convention unless overridden.
    bucket_name = FIREBASE_STORAGE_BUCKET or (sa_obj.get("project_id") and f"{sa_obj.get('project_id')}.appspot.com")
    if not bucket_name:
        raise RuntimeError(
            "Could not determine storage bucket. Set FIREBASE_STORAGE_BUCKET or include project_id in service account JSON."
        )
    try:
        cred = fb_credentials.Certificate(sa_obj)
        _firebase_app = firebase_admin.initialize_app(cred, {"storageBucket": bucket_name})
        log.info("Initialized firebase admin with bucket: %s", bucket_name)
        return _firebase_app
    except Exception as e:
        log.exception("Failed to initialize firebase admin: %s", e)
        raise
def upload_b64_to_firebase(base64_str: str, path: str, content_type="image/jpeg", metadata: dict = None) -> str:
    """Upload a base64-encoded payload to Firebase Storage at `path`.

    Accepts raw base64 or a full data URI ("data:<mime>;base64,<payload>").
    Optionally attaches `metadata` as the blob's custom metadata (non-string
    values are JSON-encoded; a metadata failure is logged but not fatal).
    Returns the public URL when the blob can be made public, otherwise a
    "gs://<bucket>/<path>" URI. Raises on configuration or upload errors.
    """
    if not FIREBASE_ADMIN_JSON:
        raise RuntimeError("FIREBASE_ADMIN_JSON not set")
    init_firebase_admin_if_needed()
    if not FIREBASE_ADMIN_AVAILABLE:
        raise RuntimeError("firebase-admin not available")
    raw = base64_str
    # Strip a "data:<mime>;base64," prefix if the client sent a data URI.
    if raw.startswith("data:"):
        raw = raw.split(",", 1)[1]
    raw = raw.replace("\n", "").replace("\r", "")
    data = base64.b64decode(raw)
    try:
        bucket = fb_storage.bucket()
        blob = bucket.blob(path)
        blob.upload_from_string(data, content_type=content_type)
        if metadata:
            try:
                # Custom metadata values must be strings; JSON-encode anything else.
                blob.metadata = {k: (json.dumps(v) if not isinstance(v, str) else v) for k, v in metadata.items()}
                blob.patch()
            except Exception as me:
                log.warning("Failed to patch metadata for %s: %s", path, me)
        try:
            blob.make_public()
            return blob.public_url
        except Exception as e:
            # Bucket may disallow public ACLs (e.g. uniform bucket-level access);
            # fall back to a gs:// URI.
            log.warning("Could not make blob public: %s", e)
            return f"gs://{bucket.name}/{path}"
    except Exception as e:
        log.exception("Firebase upload error for path %s: %s", path, e)
        raise
# ---------- Image helpers (with EXIF transpose) ----------
def read_image_bytes(file_storage) -> Tuple[np.ndarray, int, int, bytes]:
    """Read bytes, apply EXIF orientation, return BGR numpy, width, height and raw bytes."""
    raw = file_storage.read()
    pil_img = Image.open(io.BytesIO(raw))
    try:
        pil_img = ImageOps.exif_transpose(pil_img)
    except Exception:
        # Best-effort: some images carry malformed EXIF; use the image as-is.
        pass
    pil_img = pil_img.convert("RGB")
    width, height = pil_img.size
    # Reverse the channel axis: RGB -> BGR for the OpenCV helpers downstream.
    bgr = np.array(pil_img)[:, :, ::-1]
    return bgr, width, height, raw
def crop_and_b64(bgr_img: np.ndarray, x: int, y: int, w: int, h: int, max_side=512) -> str:
    """Crop a bbox out of a BGR image, cap the longest side at `max_side`,
    and return the crop as base64-encoded JPEG ('' when the crop is empty)."""
    img_h, img_w = bgr_img.shape[:2]
    left = max(0, int(x))
    top = max(0, int(y))
    right = min(img_w, int(x + w))
    bottom = min(img_h, int(y + h))
    crop = bgr_img[top:bottom, left:right]
    if crop.size == 0:
        return ""
    longest = max(crop.shape[0], crop.shape[1])
    if longest > max_side:
        factor = max_side / longest
        new_size = (int(crop.shape[1] * factor), int(crop.shape[0] * factor))
        crop = cv2.resize(crop, new_size, interpolation=cv2.INTER_AREA)
    _, jpeg = cv2.imencode(".jpg", crop, [int(cv2.IMWRITE_JPEG_QUALITY), 82])
    return base64.b64encode(jpeg.tobytes()).decode("ascii")
def fallback_contour_crops(bgr_img, max_items=8) -> List[Dict[str, Any]]:
    """Heuristic (non-AI) detection: find large contours and return crop items.

    Used when Gemini is unavailable or returns nothing. Each item mirrors the
    AI path's shape: id, label ("unknown"), confidence, pixel bbox, base64
    thumbnail, plus a "source" marker. When no contour is large enough, falls
    back to the four image quadrants at low confidence.
    """
    gray = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (7, 7), 0)
    # Inverted adaptive threshold marks locally-dark regions as foreground.
    # NOTE(review): implicitly assumes items are darker than a light background — confirm.
    thresh = cv2.adaptiveThreshold(blur, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 15, 6)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (9, 9))
    # Morphological close fills small gaps so each item yields one contour.
    closed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    h_img, w_img = bgr_img.shape[:2]
    # Ignore contours covering less than 0.5% of the image area.
    min_area = (w_img * h_img) * 0.005
    items = []
    for cnt in sorted(contours, key=cv2.contourArea, reverse=True):
        if len(items) >= max_items:
            break
        area = cv2.contourArea(cnt)
        if area < min_area:
            continue
        x, y, w, h = cv2.boundingRect(cnt)
        # Pad the bbox by 7% per side, clamped to image bounds.
        pad_x, pad_y = int(w * 0.07), int(h * 0.07)
        x = max(0, x - pad_x)
        y = max(0, y - pad_y)
        w = min(w_img - x, w + pad_x * 2)
        h = min(h_img - y, h + pad_y * 2)
        b64 = crop_and_b64(bgr_img, x, y, w, h)
        if not b64:
            continue
        items.append(
            {
                "id": str(uuid.uuid4()),
                "label": "unknown",
                # Confidence proxy: fraction of image area, clamped to [0.25, 0.95].
                "confidence": min(0.95, max(0.25, area / (w_img * h_img))),
                "bbox": {"x": x, "y": y, "w": w, "h": h},
                "thumbnail_b64": b64,
                "source": "fallback",
            }
        )
    if not items:
        # Nothing detected: return the four quadrants at fixed low confidence.
        h_half, w_half = h_img // 2, w_img // 2
        rects = [(0, 0, w_half, h_half), (w_half, 0, w_half, h_half), (0, h_half, w_half, h_half), (w_half, h_half, w_half, h_half)]
        for r in rects:
            b64 = crop_and_b64(bgr_img, r[0], r[1], r[2], r[3])
            if b64:
                items.append(
                    {
                        "id": str(uuid.uuid4()),
                        "label": "unknown",
                        "confidence": 0.3,
                        "bbox": {"x": r[0], "y": r[1], "w": r[2], "h": r[3]},
                        "thumbnail_b64": b64,
                        "source": "fallback-grid",
                    }
                )
    return items
# ---------- AI analysis helper ----------
def analyze_crop_with_gemini(jpeg_b64: str) -> Dict[str, Any]:
    """Run Gemini on the cropped image bytes to extract item attributes.

    Extracts: type (one-word category like 'shoe', 'jacket', 'dress'),
    summary (single detailed description line), brand (string or empty),
    tags (array of short descriptors).
    Returns a dict with those keys; falls back to empty/default values on any
    error or when no API key/client is configured — this function never raises.
    """
    if not client:
        return {"type": "unknown", "summary": "", "brand": "", "tags": []}
    try:
        # prepare prompt
        prompt = (
            "You are an assistant that identifies clothing item characteristics from an image. "
            "Return only a JSON object with keys: type (single word like 'shoe','top','jacket'), "
            "summary (a very detailed sentence, with details like if its collar or round-neck, explain it in good detail), brand (brand name if visible else empty string), "
            "tags (an array of short single-word tags describing visible attributes, e.g. ['striped','leather','white']). "
            "Keep values short and concise except in summary which requires expressiveness."
        )
        contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])]
        # attach the image bytes
        image_bytes = base64.b64decode(jpeg_b64)
        contents.append(types.Content(role="user", parts=[types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg")]))
        # Structured-output schema: forces the model to return parseable JSON.
        schema = {
            "type": "object",
            "properties": {
                "type": {"type": "string"},
                "summary": {"type": "string"},
                "brand": {"type": "string"},
                "tags": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["type", "summary"],
        }
        cfg = types.GenerateContentConfig(response_mime_type="application/json", response_schema=schema)
        # call model (use the same model family you used before)
        resp = client.models.generate_content(model="gemini-2.5-flash-lite", contents=contents, config=cfg)
        text = resp.text or ""
        parsed = {}
        try:
            parsed = json.loads(text)
            # coerce expected shapes
            parsed["type"] = str(parsed.get("type", "")).strip()
            parsed["summary"] = str(parsed.get("summary", "")).strip()
            parsed["brand"] = str(parsed.get("brand", "")).strip()
            tags = parsed.get("tags", [])
            if not isinstance(tags, list):
                tags = []
            parsed["tags"] = [str(t).strip() for t in tags if str(t).strip()]
        except Exception as e:
            log.warning("Failed parsing Gemini analysis JSON: %s — raw: %s", e, (text[:300] if text else ""))
            parsed = {"type": "unknown", "summary": "", "brand": "", "tags": []}
        # Normalize falsy values to stable defaults for the caller.
        return {
            "type": parsed.get("type", "unknown") or "unknown",
            "summary": parsed.get("summary", "") or "",
            "brand": parsed.get("brand", "") or "",
            "tags": parsed.get("tags", []) or [],
        }
    except Exception as e:
        log.exception("analyze_crop_with_gemini failure: %s", e)
        return {"type": "unknown", "summary": "", "brand": "", "tags": []}
# ---------- Main / processing ----------
# NOTE(review): no @app.route decorator was present, so this handler was never
# registered with Flask. Reconstructed as the root health check — confirm path.
@app.route("/", methods=["GET"])
def index_route():
    """Health-check endpoint: returns {"ok": true}."""
    return jsonify({"ok": True}), 200
# NOTE(review): decorator was missing, so this handler was never registered.
# "/process" is inferred from finalize_detections' docstring — confirm.
@app.route("/process", methods=["POST"])
def process_image():
    """Detect and analyze clothing items in an uploaded photo.

    Form fields: "photo" (required image file), "uid" (optional user id).
    Pipeline:
      1. Gemini detection with a JSON schema (normalized bboxes -> pixel crops).
      2. Fallback to contour-based crops when Gemini is unavailable or empty.
      3. Per-crop Gemini attribute analysis and category/title mapping.
      4. Optional server-side thumbnail upload to Firebase Storage, tagged
         tmp=true + session_id so finalize/clear can later clean up.
    Returns 200 with {ok, items, session_id, debug}; 400 on bad input.
    """
    if "photo" not in request.files:
        return jsonify({"error": "missing photo"}), 400
    file = request.files["photo"]
    uid = (request.form.get("uid") or request.args.get("uid") or "anon").strip() or "anon"
    try:
        bgr_img, img_w, img_h, raw_bytes = read_image_bytes(file)
    except Exception as e:
        log.error("invalid image: %s", e)
        return jsonify({"error": "invalid image"}), 400
    # Groups this request's temporary uploads for later finalize/clear.
    session_id = str(uuid.uuid4())
    # Detection prompt (same as before)
    user_prompt = (
        "You are an assistant that extracts clothing detections from a single image. "
        "Return a JSON object with a single key 'items' which is an array. Each item must have: "
        "label (string, short like 'top','skirt','sneakers'), "
        "bbox with normalized coordinates between 0 and 1: {x, y, w, h} where x,y are top-left relative to width/height, "
        "confidence (0-1). Example output: {\"items\":[{\"label\":\"top\",\"bbox\":{\"x\":0.1,\"y\":0.2,\"w\":0.3,\"h\":0.4},\"confidence\":0.95}]} "
        "Output ONLY valid JSON. If you cannot detect any clothing confidently, return {\"items\":[]}."
    )
    try:
        contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
        contents.append(types.Content(role="user", parts=[types.Part.from_bytes(data=raw_bytes, mime_type="image/jpeg")]))
        # Structured-output schema for the detection call.
        schema = {
            "type": "object",
            "properties": {
                "items": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "label": {"type": "string"},
                            "bbox": {
                                "type": "object",
                                "properties": {
                                    "x": {"type": "number"},
                                    "y": {"type": "number"},
                                    "w": {"type": "number"},
                                    "h": {"type": "number"},
                                },
                                "required": ["x", "y", "w", "h"],
                            },
                            "confidence": {"type": "number"},
                        },
                        "required": ["label", "bbox", "confidence"],
                    },
                }
            },
            "required": ["items"],
        }
        cfg = types.GenerateContentConfig(response_mime_type="application/json", response_schema=schema)
        log.info("Calling Gemini model for detection (gemini-2.5-flash-lite)...")
        model_resp = client.models.generate_content(model="gemini-2.5-flash-lite", contents=contents, config=cfg) if client else None
        raw_text = (model_resp.text or "") if model_resp else ""
        log.info("Gemini raw response length: %d", len(raw_text))
        parsed = None
        try:
            parsed = json.loads(raw_text) if raw_text else None
        except Exception as e:
            log.warning("Could not parse Gemini JSON: %s", e)
            parsed = None
        items_out: List[Dict[str, Any]] = []
        if parsed and isinstance(parsed.get("items"), list) and len(parsed["items"]) > 0:
            for it in parsed["items"]:
                try:
                    label = str(it.get("label", "unknown"))[:48]
                    bbox = it.get("bbox", {})
                    # Clamp normalized coords into [0, 1] before scaling to pixels.
                    nx = max(0.0, min(1.0, float(bbox.get("x", 0))))
                    ny = max(0.0, min(1.0, float(bbox.get("y", 0))))
                    nw = max(0.0, min(1.0, float(bbox.get("w", 0))))
                    nh = max(0.0, min(1.0, float(bbox.get("h", 0))))
                    px = int(nx * img_w)
                    py = int(ny * img_h)
                    pw = int(nw * img_w)
                    ph = int(nh * img_h)
                    # Discard degenerate boxes (<= 8 px per side).
                    if pw <= 8 or ph <= 8:
                        continue
                    b64 = crop_and_b64(bgr_img, px, py, pw, ph)
                    if not b64:
                        continue
                    item_obj = {
                        "id": str(uuid.uuid4()),
                        "label": label,
                        "confidence": float(it.get("confidence", 0.5)),
                        "bbox": {"x": px, "y": py, "w": pw, "h": ph},
                        "thumbnail_b64": b64,
                        "source": "gemini",
                    }
                    # Add placeholder analysis/title; will be filled later if analysis runs
                    item_obj["analysis"] = {"type": "unknown", "summary": "", "brand": "", "tags": []}
                    item_obj["title"] = "unknown"
                    items_out.append(item_obj)
                except Exception as e:
                    log.warning("skipping item due to error: %s", e)
        else:
            log.info("Gemini returned no items or parse failed — using fallback contour crops.")
            items_out = fallback_contour_crops(bgr_img, max_items=8)
            # ensure analysis/title placeholders
            for itm in items_out:
                itm.setdefault("analysis", {"type": "unknown", "summary": "", "brand": "", "tags": []})
                itm.setdefault("title", "unknown")
        # Perform AI analysis per crop (if possible) and auto-upload to firebase with metadata (tmp + session)
        if FIREBASE_ADMIN_JSON and FIREBASE_ADMIN_AVAILABLE:
            try:
                # Eagerly validate Firebase connectivity before per-item uploads.
                init_firebase_admin_if_needed()
                bucket = fb_storage.bucket()
            except Exception as e:
                log.exception("Firebase admin init for upload failed: %s", e)
                bucket = None
            # Sanitize uid so it cannot escape its storage folder.
            safe_uid = "".join(ch for ch in uid if ch.isalnum() or ch in ("-", "_")) or "anon"
            for itm in items_out:
                b64 = itm.get("thumbnail_b64")
                if not b64:
                    continue
                # analyze
                try:
                    analysis = analyze_crop_with_gemini(b64) if client else {"type": "unknown", "summary": "", "brand": "", "tags": []}
                except Exception as ae:
                    log.warning("analysis failed: %s", ae)
                    analysis = {"type": "unknown", "summary": "", "brand": "", "tags": []}
                # attach analysis and map to frontend category/title
                itm["analysis"] = analysis
                mapped_title = map_type_to_category(analysis.get("type", "") or itm.get("label", ""))
                itm["title"] = mapped_title
                item_id = itm.get("id") or str(uuid.uuid4())
                path = f"detected/{safe_uid}/{item_id}.jpg"
                try:
                    metadata = {
                        "tmp": "true",
                        "session_id": session_id,
                        "uploaded_by": safe_uid,
                        "uploaded_at": str(int(time.time())),
                        # store AI fields as JSON strings for later inspection
                        "ai_type": analysis.get("type", ""),
                        "ai_brand": analysis.get("brand", ""),
                        "ai_summary": analysis.get("summary", ""),
                        "ai_tags": json.dumps(analysis.get("tags", [])),
                    }
                    url = upload_b64_to_firebase(b64, path, content_type="image/jpeg", metadata=metadata)
                    itm["thumbnail_url"] = url
                    itm["thumbnail_path"] = path
                    itm.pop("thumbnail_b64", None)
                    itm["_session_id"] = session_id
                    log.debug("Auto-uploaded thumbnail for %s -> %s (session=%s)", item_id, url, session_id)
                except Exception as up_e:
                    log.warning("Auto-upload failed for %s: %s", item_id, up_e)
                    # keep thumbnail_b64 and analysis for client fallback
        else:
            if not FIREBASE_ADMIN_JSON:
                log.info("FIREBASE_ADMIN_JSON not set; skipping server-side thumbnail upload.")
            else:
                log.info("Firebase admin SDK not available; skipping server-side thumbnail upload.")
            # For items without firebase upload, still attempt local analysis mapping
            for itm in items_out:
                if "analysis" not in itm or not itm["analysis"]:
                    # attempt lightweight analysis mapping using label
                    itm.setdefault("analysis", {"type": itm.get("label", "unknown"), "summary": "", "brand": "", "tags": []})
                mapped_title = map_type_to_category(itm["analysis"].get("type", "") or itm.get("label", ""))
                itm["title"] = mapped_title
        return jsonify({"ok": True, "items": items_out, "session_id": session_id, "debug": {"raw_model_text": (raw_text or "")[:1600]}}), 200
    except Exception as ex:
        log.exception("Processing error: %s", ex)
        # Best-effort recovery: serve contour-based crops instead of failing the request.
        try:
            items_out = fallback_contour_crops(bgr_img, max_items=8)
            for itm in items_out:
                itm.setdefault("analysis", {"type": "unknown", "summary": "", "brand": "", "tags": []})
                itm["title"] = map_type_to_category(itm["analysis"].get("type", "") or itm.get("label", ""))
            return jsonify({"ok": True, "items": items_out, "session_id": session_id, "debug": {"error": str(ex)}}), 200
        except Exception as e2:
            log.exception("Fallback also failed: %s", e2)
            return jsonify({"error": "internal failure", "detail": str(e2)}), 500
# ---------- Finalize endpoint: keep selected and delete only session's temp files ----------
# NOTE(review): decorator was missing, so this handler was never registered.
# "/finalize" is an assumed path — confirm against the frontend.
@app.route("/finalize", methods=["POST"])
def finalize_detections():
    """
    Body JSON: { "uid": "user123", "keep_ids": ["id1","id2",...], "session_id": "<session id from /process>" }
    Server will delete only detected/<uid>/* files whose:
      - metadata.tmp == "true"
      - metadata.session_id == session_id
      - item_id NOT in keep_ids
    Returns:
      { ok: True, kept: [...], deleted: [...], errors: [...] }
    """
    try:
        body = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "invalid json"}), 400
    uid = (body.get("uid") or request.args.get("uid") or "anon").strip() or "anon"
    keep_ids = set(body.get("keep_ids") or [])
    session_id = (body.get("session_id") or request.args.get("session_id") or "").strip()
    if not session_id:
        return jsonify({"error": "session_id required for finalize to avoid unsafe deletes"}), 400
    if not FIREBASE_ADMIN_JSON or not FIREBASE_ADMIN_AVAILABLE:
        return jsonify({"error": "firebase admin not configured"}), 500
    try:
        init_firebase_admin_if_needed()
        bucket = fb_storage.bucket()
    except Exception as e:
        log.exception("Firebase init error in finalize: %s", e)
        return jsonify({"error": "firebase admin init failed", "detail": str(e)}), 500
    # Restrict all operations to this user's folder; uid is sanitized against path tricks.
    safe_uid = "".join(ch for ch in uid if ch.isalnum() or ch in ("-", "_")) or "anon"
    prefix = f"detected/{safe_uid}/"
    kept = []
    deleted = []
    errors = []
    try:
        blobs = list(bucket.list_blobs(prefix=prefix))
        for blob in blobs:
            try:
                name = blob.name
                fname = name.split("/")[-1]
                if "." not in fname:
                    continue
                # item_id is the filename without extension (matches the upload path format).
                item_id = fname.rsplit(".", 1)[0]
                md = blob.metadata or {}
                # only consider temporary files matching this session id
                if str(md.get("session_id", "")) != session_id or str(md.get("tmp", "")).lower() not in ("true", "1", "yes"):
                    continue
                if item_id in keep_ids:
                    # ensure public URL available if possible
                    try:
                        blob.make_public()
                        url = blob.public_url
                    except Exception:
                        url = f"gs://{bucket.name}/{name}"
                    # extract AI metadata (if present)
                    ai_type = md.get("ai_type") or ""
                    ai_brand = md.get("ai_brand") or ""
                    ai_summary = md.get("ai_summary") or ""
                    ai_tags_raw = md.get("ai_tags") or "[]"
                    try:
                        # ai_tags was stored as a JSON string at upload time.
                        ai_tags = json.loads(ai_tags_raw) if isinstance(ai_tags_raw, str) else ai_tags_raw
                    except Exception:
                        ai_tags = []
                    kept.append(
                        {
                            "id": item_id,
                            "thumbnail_url": url,
                            "thumbnail_path": name,
                            "analysis": {"type": ai_type, "brand": ai_brand, "summary": ai_summary, "tags": ai_tags},
                        }
                    )
                else:
                    try:
                        blob.delete()
                        deleted.append(item_id)
                    except Exception as de:
                        errors.append({"id": item_id, "error": str(de)})
            except Exception as e:
                # Per-blob failures are collected; the loop continues.
                errors.append({"blob": getattr(blob, "name", None), "error": str(e)})
        return jsonify({"ok": True, "kept": kept, "deleted": deleted, "errors": errors}), 200
    except Exception as e:
        log.exception("finalize_detections error: %s", e)
        return jsonify({"error": "internal", "detail": str(e)}), 500
# ---------- Clear session: delete all temporary files for a session ----------
# NOTE(review): decorator was missing, so this handler was never registered.
# "/clear_session" is an assumed path — confirm against the frontend.
@app.route("/clear_session", methods=["POST"])
def clear_session():
    """
    Body JSON: { "session_id": "<session id>", "uid": "<user id>" }
    Deletes all detected/<uid>/* blobs where metadata.session_id == session_id
    and metadata.tmp == "true".
    """
    try:
        body = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "invalid json"}), 400
    session_id = (body.get("session_id") or request.args.get("session_id") or "").strip()
    uid = (body.get("uid") or request.args.get("uid") or "anon").strip() or "anon"
    if not session_id:
        return jsonify({"error": "session_id required"}), 400
    if not FIREBASE_ADMIN_JSON or not FIREBASE_ADMIN_AVAILABLE:
        return jsonify({"error": "firebase admin not configured"}), 500
    try:
        init_firebase_admin_if_needed()
        bucket = fb_storage.bucket()
    except Exception as e:
        log.exception("Firebase init error in clear_session: %s", e)
        return jsonify({"error": "firebase admin init failed", "detail": str(e)}), 500
    # Restrict deletes to this user's folder; uid is sanitized against path tricks.
    safe_uid = "".join(ch for ch in uid if ch.isalnum() or ch in ("-", "_")) or "anon"
    prefix = f"detected/{safe_uid}/"
    deleted = []
    errors = []
    try:
        blobs = list(bucket.list_blobs(prefix=prefix))
        for blob in blobs:
            try:
                md = blob.metadata or {}
                # Delete only blobs that belong to this session AND are marked temporary.
                if str(md.get("session_id", "")) == session_id and str(md.get("tmp", "")).lower() in ("true", "1", "yes"):
                    try:
                        blob.delete()
                        # Report the item_id (filename without extension).
                        deleted.append(blob.name.split("/")[-1].rsplit(".", 1)[0])
                    except Exception as de:
                        errors.append({"blob": blob.name, "error": str(de)})
            except Exception as e:
                errors.append({"blob": getattr(blob, "name", None), "error": str(e)})
        return jsonify({"ok": True, "deleted": deleted, "errors": errors}), 200
    except Exception as e:
        log.exception("clear_session error: %s", e)
        return jsonify({"error": "internal", "detail": str(e)}), 500
if __name__ == "__main__":
    # Entry point for local runs; production should use a WSGI server instead.
    port = int(os.getenv("PORT", 7860))
    # Debug mode only when explicitly requested: Werkzeug's interactive debugger
    # allows arbitrary code execution and must never run on a public host.
    debug = os.getenv("FLASK_DEBUG", "0").strip().lower() in ("1", "true", "yes")
    log.info("Starting server on 0.0.0.0:%d", port)
    app.run(host="0.0.0.0", port=port, debug=debug)