""" PriceOye AI Phone Advisor — FastAPI Backend ============================================ Hugging Face Spaces deployment on port 7860. Endpoints: GET / → serves index.html POST /chat → conversation POST /compare → compare two phones POST /detect → detect phone from image (uses Claude vision) GET /phones → full catalog JSON GET /health → health check Set ANTHROPIC_API_KEY in HF Space Secrets before deploying. """ import os import base64 import uuid from pathlib import Path from typing import Optional from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel import anthropic from src.conversation import ( ConversationState, handle_message, get_greeting, format_phone_summary, PHONE_DB ) from src.scoring_engine import ( get_category_scores, get_sub_scores, recommend, UserPreferences, score_phone ) # ── Init ────────────────────────────────────────────────── app = FastAPI(title="PriceOye AI", version="3.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] ) SESSIONS: dict[str, ConversationState] = {} HTML_PATH = Path("index.html") ANTHROPIC_KEY = os.environ.get("ANTHROPIC_API_KEY", "") # ── Schemas ─────────────────────────────────────────────── class ChatRequest(BaseModel): session_id: Optional[str] = None message: str class CompareRequest(BaseModel): phone_id_a: str phone_id_b: str priority: Optional[str] = "balanced" # ── Routes ──────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) def serve_frontend(): if not HTML_PATH.exists(): raise HTTPException(status_code=404, detail="index.html not found") return HTMLResponse(content=HTML_PATH.read_text(encoding="utf-8")) @app.get("/health") def health(): return { "status": "ok", "phones": len(PHONE_DB), "vision_enabled": bool(ANTHROPIC_KEY), "version": "3.0.0" } @app.get("/phones") def list_phones(): return [ { "id": p.id, "name": p.name, "brand": p.brand, "os": p.os, "price_pkr": p.price_pkr, "price_label": p.price_label, "emoji": p.emoji, "tags": p.tags, "priceoye_url": p.priceoye_url, "whatmobile_url": p.whatmobile_url, "category_scores": {k: round(v, 1) for k, v in get_category_scores(p).items()}, } for p in PHONE_DB ] @app.post("/chat") def chat(req: ChatRequest): session_id = req.session_id or str(uuid.uuid4()) if session_id not in SESSIONS or req.session_id is None: greeting = get_greeting() SESSIONS[session_id] = greeting["state"] return {**greeting, "session_id": session_id} state = SESSIONS[session_id] response = handle_message(req.message, state) SESSIONS[session_id] = response["state"] return {**response, "session_id": session_id} @app.post("/compare") def compare_phones(req: CompareRequest): phone_a = next((p for p in PHONE_DB if p.id == req.phone_id_a), None) phone_b = next((p for p in PHONE_DB if p.id == req.phone_id_b), None) if not phone_a: raise HTTPException(400, f"Phone '{req.phone_id_a}' not found") if not phone_b: raise HTTPException(400, f"Phone '{req.phone_id_b}' not found") cats_a = get_category_scores(phone_a) cats_b = get_category_scores(phone_b) cats = list(cats_a.keys()) winner_map = {} for cat in cats: sa, sb = cats_a[cat], cats_b[cat] if sa > sb + 0.2: winner_map[cat] = "a" elif sb > sa + 0.2: winner_map[cat] = "b" else: winner_map[cat] = "tie" overall_a = round(sum(cats_a.values()) / len(cats_a), 2) overall_b = round(sum(cats_b.values()) / len(cats_b), 2) overall_winner = "a" if overall_a > overall_b else "b" if overall_b > overall_a else "tie" # Priority-weighted score prefs = UserPreferences( budget=max(phone_a.price_pkr, phone_b.price_pkr) + 50000, os_preference="any", priority=req.priority, ) score_a = score_phone(phone_a, prefs, PHONE_DB) score_b = score_phone(phone_b, prefs, PHONE_DB) priority_winner = "a" if score_a > score_b else "b" return { "phone_a": { "id": phone_a.id, "name": phone_a.name, "emoji": phone_a.emoji, "price_label": phone_a.price_label, "priceoye_url": phone_a.priceoye_url, "whatmobile_url": phone_a.whatmobile_url, "category_scores": {k: round(v, 1) for k, v in cats_a.items()}, "highlights": phone_a.highlights, "overall": overall_a, "priority_score": round(score_a, 1), }, "phone_b": { "id": phone_b.id, "name": phone_b.name, "emoji": phone_b.emoji, "price_label": phone_b.price_label, "priceoye_url": phone_b.priceoye_url, "whatmobile_url": phone_b.whatmobile_url, "category_scores": {k: round(v, 1) for k, v in cats_b.items()}, "highlights": phone_b.highlights, "overall": overall_b, "priority_score": round(score_b, 1), }, "winners": winner_map, "overall_winner": overall_winner, "priority_winner": priority_winner, "priority": req.priority, } @app.post("/detect") async def detect_phone(file: UploadFile = File(...)): """ Detect which phone is in an uploaded image using Claude Vision. Requires ANTHROPIC_API_KEY in environment. """ if not ANTHROPIC_KEY: raise HTTPException( status_code=503, detail="Image detection unavailable. ANTHROPIC_API_KEY not set." ) # Read and encode image image_bytes = await file.read() if len(image_bytes) > 10 * 1024 * 1024: # 10MB limit raise HTTPException(400, "Image too large. Please use under 10MB.") b64_image = base64.standard_b64encode(image_bytes).decode("utf-8") media_type = file.content_type or "image/jpeg" if media_type not in ("image/jpeg", "image/png", "image/webp", "image/gif"): media_type = "image/jpeg" try: client = anthropic.Anthropic(api_key=ANTHROPIC_KEY) response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=300, messages=[{ "role": "user", "content": [ { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": b64_image, } }, { "type": "text", "text": ( "Identify the smartphone/mobile phone in this image. " "Respond in this exact JSON format only:\n" '{"brand": "Samsung", "model": "Galaxy S24", "confidence": "high"}\n' "If no phone is visible, respond:\n" '{"brand": null, "model": null, "confidence": "none"}\n' "Confidence levels: high, medium, low, none. " "Give the exact marketed name." ) } ] }] ) import json raw = response.content[0].text.strip() # Strip markdown fences if present raw = raw.replace("```json", "").replace("```", "").strip() result = json.loads(raw) except Exception as e: return { "brand": None, "model": None, "confidence": "none", "error": str(e) } # Try to match detected phone to DB matched_db_phone = None if result.get("model"): detected_name = f"{result.get('brand', '')} {result.get('model', '')}".lower() for p in PHONE_DB: if any(word in p.name.lower() for word in detected_name.split() if len(word) > 3): matched_db_phone = { "id": p.id, "name": p.name, "emoji": p.emoji, "price_label": p.price_label, "priceoye_url": p.priceoye_url, } break return { **result, "matched_in_db": matched_db_phone, "search_url": f"https://priceoye.pk/search?q={result.get('brand', '')}+{result.get('model', '')}".replace(" ", "+"), "whatmobile_url": f"https://www.whatmobile.com.pk/search?search={result.get('brand', '')}+{result.get('model', '')}".replace(" ", "+"), } # ── Entry point ─────────────────────────────────────────── if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)