Pepguy's picture
Update app.py
65cfd54 verified
# single_suggest_server.py
"""
Single-endpoint suggestion server using Google Gemini (genai) exclusively.
Environment variables:
- GEMINI_API_KEY (optional) : API key for Google genai SDK
- FIREBASE_ADMIN_JSON (optional) : service account JSON (string) to initialize firebase-admin
- PORT (optional) : port to run (default 7860)
Behavior:
- If GEMINI_API_KEY & genai SDK present: uses Gemini for candidate generation and final notes.
- Otherwise falls back to deterministic naive generator (no placeholders).
- Firestore persistence if FIREBASE_ADMIN_JSON provided.
"""
import os
import io
import json
import uuid
import time
import logging
import difflib
from typing import List, Dict, Any, Optional
from flask import Flask, request, jsonify
from flask_cors import CORS
# HTTP helper for generic fetches if needed
import requests
# Try to import Google GenAI SDK
try:
from google import genai
from google.genai import types
GENAI_AVAILABLE = True
except Exception:
genai = None
types = None
GENAI_AVAILABLE = False
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "").strip()
if GEMINI_API_KEY and GENAI_AVAILABLE:
client = genai.Client(api_key=GEMINI_API_KEY)
logging.info("Gemini client configured.")
else:
client = None
if GEMINI_API_KEY and not GENAI_AVAILABLE:
logging.warning("GEMINI_API_KEY provided but genai SDK not installed; Gemini disabled.")
else:
logging.info("GEMINI_API_KEY not provided; using deterministic fallback generator.")
# Firestore admin
try:
import firebase_admin
from firebase_admin import credentials as fb_credentials
from firebase_admin import firestore as fb_firestore_module
FIREBASE_AVAILABLE = True
except Exception:
firebase_admin = None
fb_credentials = None
fb_firestore_module = None
FIREBASE_AVAILABLE = False
FIREBASE_ADMIN_JSON = os.getenv("FIREBASE_ADMIN_JSON", "").strip()
_firestore_client = None
_firebase_app = None
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("single-suggest-gemini-server")
def init_firestore_if_needed():
global _firestore_client, _firebase_app
if _firestore_client is not None:
return _firestore_client
if not FIREBASE_ADMIN_JSON:
log.info("No FIREBASE_ADMIN_JSON set; Firestore not initialized.")
return None
if not FIREBASE_AVAILABLE:
log.warning("FIREBASE_ADMIN_JSON provided but firebase-admin SDK not available; skip Firestore init.")
return None
try:
sa_obj = json.loads(FIREBASE_ADMIN_JSON)
except Exception as e:
log.exception("Failed parsing FIREBASE_ADMIN_JSON: %s", e)
return None
try:
cred = fb_credentials.Certificate(sa_obj)
try:
_firebase_app = firebase_admin.get_app()
except Exception:
_firebase_app = firebase_admin.initialize_app(cred)
_firestore_client = fb_firestore_module.client()
log.info("Initialized Firestore client.")
return _firestore_client
except Exception as e:
log.exception("Failed to init Firestore: %s", e)
return None
# ---------- categories / heuristics ----------
CATEGORIES = [
"top", "shirt", "blouse", "tshirt", "sweater", "jacket", "coat", "dress", "skirt",
"pants", "trousers", "shorts", "jeans", "shoe", "heels", "sneaker", "boot", "sandals",
"bag", "belt", "hat", "accessory", "others",
]
def map_type_to_category(item_type: str) -> str:
if not item_type:
return "others"
t = item_type.strip().lower()
if t in CATEGORIES:
return t
t_clean = t.rstrip("s")
if t_clean in CATEGORIES:
return t_clean
matches = difflib.get_close_matches(t, CATEGORIES, n=1, cutoff=0.6)
if matches:
return matches[0]
for token in t.replace("_", " ").split():
if token in CATEGORIES:
return token
return "others"
def _safe_item_brand(itm: Dict[str, Any]) -> str:
analysis = itm.get("analysis") or {}
brand = analysis.get("brand") if isinstance(analysis, dict) else None
if not brand:
brand = itm.get("brand") or ""
return str(brand).strip()
TOP_LIKE_CATEGORIES = {"top", "shirt", "tshirt", "blouse", "sweater"}
def _item_title_for_map(it: Dict[str, Any]) -> str:
return str((it.get("title") or (it.get("analysis") or {}).get("type") or it.get("label") or "")).strip().lower()
def prioritize_top_item(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
if not items:
return items
top_idx = None
for i, it in enumerate(items):
try:
title = _item_title_for_map(it)
cat = map_type_to_category(title)
if cat in TOP_LIKE_CATEGORIES:
top_idx = i
break
except Exception:
continue
if top_idx is not None and top_idx != 0:
new_items = items[:]
item = new_items.pop(top_idx)
new_items.insert(0, item)
return new_items
try:
best_idx = max(range(len(items)), key=lambda i: float(items[i].get("confidence", 0.5)))
if best_idx != 0:
new_items = items[:]
item = new_items.pop(best_idx)
new_items.insert(0, item)
return new_items
except Exception:
pass
return items
# ---------- deterministic naive generator (real) ----------
def naive_generate_candidates(wardrobe_items: List[Dict[str, Any]],
user_inputs: Dict[str, Any],
user_profile: Dict[str, Any],
past_week_items: List[Dict[str, Any]],
max_candidates: int = 6) -> List[Dict[str, Any]]:
grouped = {}
for itm in wardrobe_items:
title = (itm.get("title") or (itm.get("analysis") or {}).get("type") or itm.get("label") or "")
cat = map_type_to_category(title)
grouped.setdefault(cat, []).append(itm)
def pick(cat, n=3):
arr = grouped.get(cat, [])[:]
arr.sort(key=lambda x: float(x.get("confidence", 0.5)), reverse=True)
return arr[:n]
tops = pick("top", 5) + pick("shirt", 3) + pick("tshirt", 3)
bottoms = pick("pants", 4) + pick("jeans", 3) + pick("skirt", 2)
outer = pick("jacket", 3) + pick("coat", 2)
shoes = pick("shoe", 4) + pick("sneaker", 3) + pick("boot", 2) + pick("heels", 2)
dresses = grouped.get("dress", [])[:4]
seeds = dresses + tops
if not seeds:
seeds = wardrobe_items[:6]
past_ids = {x.get("id") for x in (past_week_items or []) if x.get("id")}
candidates = []
used = set()
for seed in seeds:
for b in (bottoms[:3] or [None]):
for sh in (shoes[:3] or [None]):
items = [seed]
if b and b.get("id") != seed.get("id"):
items.append(b)
if sh and sh.get("id") not in {seed.get("id"), b.get("id") if b else None}:
items.append(sh)
items = prioritize_top_item(items)
ids = tuple(sorted([str(x.get("id")) for x in items if x.get("id")]))
if ids in used:
continue
used.add(ids)
score = sum(float(x.get("confidence", 0.5)) for x in items) / max(1, len(items))
if any(x.get("id") in past_ids for x in items if x.get("id")):
score -= 0.15
score = max(0, min(1, score + (0.02 * ((hash(ids) % 100) / 100.0))))
candidate = {
"id": str(uuid.uuid4()),
"items": [{"id": x.get("id"), "label": x.get("label"), "title": x.get("title"),
"thumbnailUrl": x.get("thumbnailUrl") or x.get("thumbnail_url"),
"analysis": x.get("analysis", {}), "confidence": x.get("confidence", 0.5)} for x in items],
"score": round(float(score), 3),
"reason": "Auto combo",
"notes": "",
}
candidates.append(candidate)
if len(candidates) >= max_candidates:
break
if len(candidates) >= max_candidates:
break
if len(candidates) >= max_candidates:
break
candidates.sort(key=lambda c: c.get("score", 0), reverse=True)
return candidates
# ---------- Gemini helpers ----------
def generate_candidates_with_gemini(wardrobe_items: List[Dict[str, Any]],
user_inputs: Dict[str, Any],
user_profile: Dict[str, Any],
past_week_items: List[Dict[str, Any]],
max_candidates: int = 6) -> List[Dict[str, Any]]:
if not client:
log.info("Gemini disabled; using naive generator.")
return naive_generate_candidates(wardrobe_items, user_inputs, user_profile, past_week_items, max_candidates)
try:
summarized = []
for it in wardrobe_items:
a = it.get("analysis") or {}
summarized.append({
"id": it.get("id"),
"type": a.get("type") or it.get("title") or it.get("label") or "",
"summary": (a.get("summary") or "")[:180],
"brand": (a.get("brand") or "")[:80],
"tags": a.get("tags") or [],
"thumbnailUrl": it.get("thumbnailUrl") or it.get("thumbnail_url") or ""
})
# Compose a prompt asking for JSON candidates
prompt = (
"You are a stylist assistant. Given WARDROBE array (id,type,summary,brand,tags,thumbnailUrl),\n"
"USER_INPUT (moods, appearances, events, activity, preferred/excluded colors, keyBrands, etc.),\n"
"and PAST_WEEK (recent item ids), produce up to {max} candidate outfits.\n\n"
"Return only valid JSON: {\"candidates\": [ {\"id\": \"..\", \"item_ids\": [..], \"score\": 0-1, \"notes\": \"one-line\", \"short_reason\": \"phrase\"}, ... ]}\n\n"
"WARDROBE = {wardrobe}\nUSER_INPUT = {u}\nPAST_WEEK = {p}\n".format(max=max_candidates, wardrobe=json.dumps(summarized), u=json.dumps(user_inputs), p=json.dumps([p.get("id") for p in (past_week_items or [])]))
)
contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])]
schema = {
"type": "object",
"properties": {
"candidates": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"item_ids": {"type": "array", "items": {"type": "string"}},
"score": {"type": "number"},
"notes": {"type": "string"},
"short_reason": {"type": "string"},
},
"required": ["id", "item_ids"],
},
}
},
"required": ["candidates"],
}
cfg = types.GenerateContentConfig(response_mime_type="application/json", response_schema=schema)
resp = client.models.generate_content(
model="gemini-2.5-flash", # choose an appropriate Gemini model
contents=contents,
config=cfg
)
raw = resp.text or ""
parsed = json.loads(raw) if raw else None
id_map = {str(it.get("id")): it for it in wardrobe_items}
out = []
for c in (parsed.get("candidates", []) if parsed else [])[:max_candidates]:
items = []
for iid in c.get("item_ids", []):
itm = id_map.get(str(iid))
if itm:
items.append({"id": itm.get("id"), "label": itm.get("label"), "title": itm.get("title"),
"thumbnailUrl": itm.get("thumbnailUrl") or itm.get("thumbnail_url"),
"analysis": itm.get("analysis", {}), "confidence": itm.get("confidence", 0.5)})
items = prioritize_top_item(items)
out.append({
"id": c.get("id") or str(uuid.uuid4()),
"items": items,
"score": float(c.get("score", 0.5)),
"reason": c.get("short_reason") or "",
"notes": (c.get("notes") or "")[:300],
})
if not out:
log.warning("Gemini returned no candidates; fallback to naive generator.")
return naive_generate_candidates(wardrobe_items, user_inputs, user_profile, past_week_items, max_candidates)
out.sort(key=lambda x: x.get("score", 0), reverse=True)
return out[:max_candidates]
except Exception as e:
log.exception("Gemini candidate generation failed: %s", e)
return naive_generate_candidates(wardrobe_items, user_inputs, user_profile, past_week_items, max_candidates)
def finalize_suggestion_note_with_gemini(candidate: Dict[str, Any], user_inputs: Dict[str, Any], user_profile: Dict[str, Any]) -> str:
if not client:
# heuristic fallback
moods = user_inputs.get("moods") or []
events = user_inputs.get("events") or []
mood = moods[0] if moods else "your mood"
ev = events[0] if events else "your event"
return f"A curated outfit selected for {mood} at {ev} — stylish and practical."
try:
candidate_items = []
for it in candidate.get("items", []):
desc = (it.get("analysis") or {}).get("summary") or it.get("label") or it.get("title") or ""
brand = (it.get("analysis") or {}).get("brand") or ""
candidate_items.append({"id": it.get("id"), "desc": desc[:160], "brand": brand[:60]})
prompt = (
"You are a concise stylist. Given CANDIDATE_ITEMS (list of short item descriptions) and USER_INPUT, "
"write a single short friendly sentence (<=18 words) explaining why this outfit was chosen. Return plain text.\n\n"
f"CANDIDATE_ITEMS: {json.dumps(candidate_items)}\nUSER_INPUT: {json.dumps(user_inputs or {})}\nReturn only a single short sentence."
)
contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])]
resp = client.models.generate_content(model="gemini-2.5-flash-lite", contents=contents)
text = (resp.text or "").strip()
return text.splitlines()[0] if text else "A curated outfit chosen for your preferences."
except Exception as e:
log.exception("Gemini finalize note failed: %s", e)
moods = user_inputs.get("moods") or []
events = user_inputs.get("events") or []
mood = moods[0] if moods else "your mood"
ev = events[0] if events else "your event"
return f"A curated outfit selected for {mood} at {ev} — stylish and practical."
# ---------- Firestore fetching ----------
def fetch_wardrobe_from_firestore(uid: str) -> List[Dict[str, Any]]:
fs = init_firestore_if_needed()
if not fs:
return []
try:
subcol = fs.collection("users").document(uid).collection("wardrobe")
docs = subcol.limit(1000).get()
items = []
for d in docs:
dd = d.to_dict() or {}
thumb = dd.get("thumbnailUrl") if dd.get("thumbnailUrl") is not None else dd.get("thumbnail_url")
items.append({
"id": dd.get("id") or d.id,
"label": dd.get("label") or dd.get("title") or "item",
"title": dd.get("title") or dd.get("label") or "",
"thumbnailUrl": thumb,
"analysis": dd.get("analysis", {}),
"confidence": dd.get("confidence", 0.8),
})
if items:
return items
except Exception as e:
log.warning("users/{uid}/wardrobe subcollection read failed: %s", e)
try:
q = fs.collection("wardrobe").where("uid", "==", uid).limit(500)
docs = q.get()
items = []
for d in docs:
dd = d.to_dict() or {}
thumb = dd.get("thumbnailUrl") if dd.get("thumbnailUrl") is not None else dd.get("thumbnail_url")
items.append({
"id": dd.get("id") or d.id,
"label": dd.get("label") or dd.get("title") or "item",
"title": dd.get("title") or dd.get("label") or "",
"thumbnailUrl": thumb,
"analysis": dd.get("analysis", {}),
"confidence": dd.get("confidence", 0.8),
})
return items
except Exception as e:
log.warning("wardrobe collection query failed: %s", e)
return []
# ---------- refinement ----------
def refine_candidates_with_constraints(candidates: List[Dict[str, Any]],
wardrobe_items: List[Dict[str, Any]],
constraints: Dict[str, Any]) -> Dict[str, Any]:
require_brands = set([b.lower() for b in (constraints.get("require_brands") or []) if b])
reject_brands = set([b.lower() for b in (constraints.get("reject_brands") or []) if b])
past_ids = set([x.get("id") for x in (constraints.get("past_week_items") or []) if x.get("id")])
allow_rerun = bool(constraints.get("allow_rerun", False))
id_map = {str(it.get("id")): it for it in wardrobe_items}
refined = []
removed = []
for cand in candidates:
items = cand.get("items") or []
resolved = []
for i in items:
iid = str(i.get("id"))
full = id_map.get(iid)
if full:
resolved.append(full)
else:
resolved.append(i)
if require_brands:
if not any((_safe_item_brand(it).lower() in require_brands) for it in resolved):
removed.append({"id": cand.get("id"), "reason": "missing required brand"})
continue
if reject_brands:
if any((_safe_item_brand(it).lower() in reject_brands) for it in resolved):
removed.append({"id": cand.get("id"), "reason": "contains rejected brand"})
continue
if past_ids and any((it.get("id") in past_ids) for it in resolved):
if not allow_rerun:
removed.append({"id": cand.get("id"), "reason": "uses recent items"})
continue
else:
cand["_conflict_with_schedule"] = True
cand["items"] = [
{
"id": it.get("id"),
"label": it.get("label"),
"title": it.get("title"),
"thumbnailUrl": it.get("thumbnailUrl") if it.get("thumbnailUrl") is not None else it.get("thumbnail_url"),
"analysis": it.get("analysis", {}),
"confidence": it.get("confidence", 0.5),
} for it in resolved
]
refined.append(cand)
if not refined:
hint = "All candidates filtered out. Consider loosening constraints or allow rerun."
return {"refined": [], "rerun_required": True, "rerun_hint": hint, "removed": removed}
refined.sort(key=lambda c: c.get("score", 0), reverse=True)
return {"refined": refined, "rerun_required": False, "rerun_hint": "", "removed": removed}
# ---------- Flask app ----------
app = Flask(__name__)
CORS(app)
@app.route("/suggest", methods=["POST"])
def suggest_all():
is_multipart = request.content_type and request.content_type.startswith("multipart/form-data")
try:
if is_multipart:
form = request.form
files = request.files
uid = (form.get("uid") or form.get("user_id") or "anon").strip() or "anon"
user_inputs_raw = form.get("user_inputs")
user_inputs = json.loads(user_inputs_raw) if user_inputs_raw else {}
max_c = int(form.get("max_candidates") or 6)
wardrobe_items_raw = form.get("wardrobe_items")
wardrobe_items = json.loads(wardrobe_items_raw) if wardrobe_items_raw else []
audio_file = files.get("audio")
audio_b64 = None
if audio_file:
audio_bytes = audio_file.read()
import base64
audio_b64 = base64.b64encode(audio_bytes).decode("ascii")
else:
body = request.get_json(force=True)
uid = (body.get("uid") or body.get("user_id") or "anon").strip() or "anon"
user_inputs = body.get("user_inputs") or {}
max_c = int(body.get("max_candidates") or 6)
wardrobe_items = body.get("wardrobe_items") or []
audio_b64 = body.get("audio_b64")
except Exception as e:
log.exception("Invalid request payload: %s", e)
return jsonify({"error": "invalid request payload"}), 400
# normalize wardrobe items -> ensure thumbnailUrl exists
try:
normalized_items = []
for it in wardrobe_items or []:
if not isinstance(it, dict):
normalized_items.append(it)
continue
thumb = it.get("thumbnailUrl") if it.get("thumbnailUrl") is not None else it.get("thumbnail_url")
new_it = dict(it)
new_it["thumbnailUrl"] = thumb
normalized_items.append(new_it)
wardrobe_items = normalized_items
except Exception:
pass
# Try fetching from Firestore if wardrobe empty
if not wardrobe_items:
try:
wardrobe_items = fetch_wardrobe_from_firestore(uid) or []
log.info("Fetched %d wardrobe items for uid=%s from Firestore", len(wardrobe_items), uid)
except Exception as e:
log.warning("Failed to fetch wardrobe from Firestore: %s", e)
wardrobe_items = []
if not isinstance(user_inputs, dict):
return jsonify({"error": "user_inputs must be an object"}), 400
if not wardrobe_items:
return jsonify({"error": "no wardrobe_items provided and none found in Firestore"}), 400
# build user summary (try Firestore, else heuristic)
try:
fs = init_firestore_if_needed()
user_summary = None
if fs:
try:
doc_ref = fs.collection("users").document(uid)
doc = doc_ref.get()
if doc.exists:
data = doc.to_dict() or {}
user_summary = data.get("summary")
except Exception:
pass
if not user_summary:
moods = user_inputs.get("moods") or []
brands = user_inputs.get("keyBrands") or []
events = user_inputs.get("events") or []
parts = []
if moods:
parts.append("moods: " + ", ".join(moods[:3]))
if brands:
parts.append("likes brands: " + ", ".join(brands[:3]))
if events:
parts.append("often for: " + ", ".join(events[:2]))
user_summary = " & ".join(parts) if parts else "A user who likes practical, simple outfits."
except Exception as e:
log.exception("user summary build failed: %s", e)
user_summary = "A user who likes practical, simple outfits."
# fetch recent suggestions for penalization
past_week_items = []
try:
fs = init_firestore_if_needed()
if fs:
cutoff = int(time.time()) - 7 * 86400
q = fs.collection("suggestions").where("uid", "==", uid).where("createdAtTs", ">=", cutoff).limit(200)
docs = q.get()
for d in docs:
dd = d.to_dict() or {}
for it in dd.get("items", []) or []:
past_week_items.append({"id": it.get("id"), "label": it.get("label")})
except Exception as e:
log.warning("Failed to fetch recent suggestions: %s", e)
# candidate generation
try:
if client:
candidates = generate_candidates_with_gemini(wardrobe_items, user_inputs, {"summary": user_summary}, past_week_items, max_candidates=max_c)
else:
candidates = naive_generate_candidates(wardrobe_items, user_inputs, {"summary": user_summary}, past_week_items, max_candidates=max_c)
except Exception as e:
log.exception("candidate generation failed: %s", e)
candidates = naive_generate_candidates(wardrobe_items, user_inputs, {"summary": user_summary}, past_week_items, max_candidates=max_c)
# refine
constraints = {
"require_brands": user_inputs.get("keyBrands") or [],
"reject_brands": user_inputs.get("reject_brands") or user_inputs.get("excluded_brands") or [],
"past_week_items": past_week_items,
"allow_rerun": bool(user_inputs.get("allow_rerun", True)),
}
refine_result = refine_candidates_with_constraints(candidates, wardrobe_items, constraints)
if refine_result.get("rerun_required") and constraints.get("allow_rerun"):
log.info("Refine requested rerun; performing deterministic rerun.")
alt_candidates = naive_generate_candidates(wardrobe_items, user_inputs, {"summary": user_summary}, past_week_items, max_candidates=max(8, max_c * 2))
refine_result = refine_candidates_with_constraints(alt_candidates, wardrobe_items, constraints)
refined = refine_result.get("refined", [])
# finalize suggestion notes and build suggestion objects
suggestions = []
for cand in refined:
try:
cand_items = cand.get("items", []) or []
cand_items = prioritize_top_item(cand_items)
cand["items"] = cand_items
note = finalize_suggestion_note_with_gemini(cand, user_inputs, {"summary": user_summary}) if client else finalize_suggestion_note_with_gemini(cand, user_inputs, {"summary": user_summary})
except Exception as e:
log.warning("Failed to produce final note for candidate %s: %s", cand.get("id"), e)
note = cand.get("notes") or cand.get("reason") or "A curated outfit."
thumb_urls = [it.get("thumbnailUrl") for it in cand.get("items", []) if it.get("thumbnailUrl")]
suggestion = {
"id": cand.get("id") or str(uuid.uuid4()),
"items": cand.get("items", []),
"thumbnailUrls": thumb_urls,
"primary_item_id": (cand.get("items", []) and cand.get("items", [])[0].get("id")) or None,
"note": note,
"score": cand.get("score"),
"meta": {
"generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"source": "single_suggest_pipeline_gemini" if client else "single_suggest_pipeline_naive",
"user_inputs": user_inputs,
},
"uid": uid,
"createdAtTs": int(time.time()),
}
suggestions.append(suggestion)
# persist suggestions to Firestore (best-effort)
persisted_ids = []
fs = init_firestore_if_needed()
if fs and suggestions:
try:
col = fs.collection("suggestions")
for s in suggestions:
try:
doc_id = s["id"]
col.document(doc_id).set(s)
persisted_ids.append(doc_id)
except Exception as se:
log.warning("Failed to persist suggestion %s: %s", s.get("id"), se)
except Exception as e:
log.warning("Failed to persist suggestions collection: %s", e)
debug = {
"candidates_count": len(candidates),
"refined_count": len(refined),
"persisted": persisted_ids,
"rerun_hint": refine_result.get("rerun_hint", ""),
}
return jsonify({"ok": True, "user_summary": user_summary, "suggestions": suggestions, "debug": debug}), 200
@app.route("/health", methods=["GET"])
def health():
return jsonify({
"ok": True,
"time": int(time.time()),
"gemini": bool(client),
"firestore": bool(init_firestore_if_needed())
}), 200
if __name__ == "__main__":
port = int(os.getenv("PORT", 7860))
log.info("Starting single-suggest server on 0.0.0.0:%d", port)
app.run(host="0.0.0.0", port=port, debug=False)