Spaces:
Sleeping
Sleeping
| """ | |
| PriceOye AI Phone Advisor β FastAPI Backend | |
| ============================================ | |
| Hugging Face Spaces deployment on port 7860. | |
| Endpoints: | |
| GET / β serves index.html | |
| POST /chat β conversation | |
| POST /compare β compare two phones | |
| POST /detect β detect phone from image (uses Claude vision) | |
| GET /phones β full catalog JSON | |
| GET /health β health check | |
| Set ANTHROPIC_API_KEY in HF Space Secrets before deploying. | |
| """ | |
| import os | |
| import base64 | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import HTMLResponse | |
| from pydantic import BaseModel | |
| import anthropic | |
| from src.conversation import ( | |
| ConversationState, handle_message, get_greeting, | |
| format_phone_summary, PHONE_DB | |
| ) | |
| from src.scoring_engine import ( | |
| get_category_scores, get_sub_scores, recommend, | |
| UserPreferences, score_phone | |
| ) | |
| # ββ Init ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| app = FastAPI(title="PriceOye AI", version="3.0.0") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] | |
| ) | |
| SESSIONS: dict[str, ConversationState] = {} | |
| HTML_PATH = Path("index.html") | |
| ANTHROPIC_KEY = os.environ.get("ANTHROPIC_API_KEY", "") | |
| # ββ Schemas βββββββββββββββββββββββββββββββββββββββββββββββ | |
| class ChatRequest(BaseModel): | |
| session_id: Optional[str] = None | |
| message: str | |
| class CompareRequest(BaseModel): | |
| phone_id_a: str | |
| phone_id_b: str | |
| priority: Optional[str] = "balanced" | |
| # ββ Routes ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def serve_frontend(): | |
| if not HTML_PATH.exists(): | |
| raise HTTPException(status_code=404, detail="index.html not found") | |
| return HTMLResponse(content=HTML_PATH.read_text(encoding="utf-8")) | |
| def health(): | |
| return { | |
| "status": "ok", | |
| "phones": len(PHONE_DB), | |
| "vision_enabled": bool(ANTHROPIC_KEY), | |
| "version": "3.0.0" | |
| } | |
| def list_phones(): | |
| return [ | |
| { | |
| "id": p.id, "name": p.name, "brand": p.brand, | |
| "os": p.os, "price_pkr": p.price_pkr, | |
| "price_label": p.price_label, "emoji": p.emoji, | |
| "tags": p.tags, | |
| "priceoye_url": p.priceoye_url, | |
| "whatmobile_url": p.whatmobile_url, | |
| "category_scores": {k: round(v, 1) for k, v in get_category_scores(p).items()}, | |
| } | |
| for p in PHONE_DB | |
| ] | |
| def chat(req: ChatRequest): | |
| session_id = req.session_id or str(uuid.uuid4()) | |
| if session_id not in SESSIONS or req.session_id is None: | |
| greeting = get_greeting() | |
| SESSIONS[session_id] = greeting["state"] | |
| return {**greeting, "session_id": session_id} | |
| state = SESSIONS[session_id] | |
| response = handle_message(req.message, state) | |
| SESSIONS[session_id] = response["state"] | |
| return {**response, "session_id": session_id} | |
| def compare_phones(req: CompareRequest): | |
| phone_a = next((p for p in PHONE_DB if p.id == req.phone_id_a), None) | |
| phone_b = next((p for p in PHONE_DB if p.id == req.phone_id_b), None) | |
| if not phone_a: | |
| raise HTTPException(400, f"Phone '{req.phone_id_a}' not found") | |
| if not phone_b: | |
| raise HTTPException(400, f"Phone '{req.phone_id_b}' not found") | |
| cats_a = get_category_scores(phone_a) | |
| cats_b = get_category_scores(phone_b) | |
| cats = list(cats_a.keys()) | |
| winner_map = {} | |
| for cat in cats: | |
| sa, sb = cats_a[cat], cats_b[cat] | |
| if sa > sb + 0.2: | |
| winner_map[cat] = "a" | |
| elif sb > sa + 0.2: | |
| winner_map[cat] = "b" | |
| else: | |
| winner_map[cat] = "tie" | |
| overall_a = round(sum(cats_a.values()) / len(cats_a), 2) | |
| overall_b = round(sum(cats_b.values()) / len(cats_b), 2) | |
| overall_winner = "a" if overall_a > overall_b else "b" if overall_b > overall_a else "tie" | |
| # Priority-weighted score | |
| prefs = UserPreferences( | |
| budget=max(phone_a.price_pkr, phone_b.price_pkr) + 50000, | |
| os_preference="any", | |
| priority=req.priority, | |
| ) | |
| score_a = score_phone(phone_a, prefs, PHONE_DB) | |
| score_b = score_phone(phone_b, prefs, PHONE_DB) | |
| priority_winner = "a" if score_a > score_b else "b" | |
| return { | |
| "phone_a": { | |
| "id": phone_a.id, "name": phone_a.name, | |
| "emoji": phone_a.emoji, "price_label": phone_a.price_label, | |
| "priceoye_url": phone_a.priceoye_url, | |
| "whatmobile_url": phone_a.whatmobile_url, | |
| "category_scores": {k: round(v, 1) for k, v in cats_a.items()}, | |
| "highlights": phone_a.highlights, | |
| "overall": overall_a, | |
| "priority_score": round(score_a, 1), | |
| }, | |
| "phone_b": { | |
| "id": phone_b.id, "name": phone_b.name, | |
| "emoji": phone_b.emoji, "price_label": phone_b.price_label, | |
| "priceoye_url": phone_b.priceoye_url, | |
| "whatmobile_url": phone_b.whatmobile_url, | |
| "category_scores": {k: round(v, 1) for k, v in cats_b.items()}, | |
| "highlights": phone_b.highlights, | |
| "overall": overall_b, | |
| "priority_score": round(score_b, 1), | |
| }, | |
| "winners": winner_map, | |
| "overall_winner": overall_winner, | |
| "priority_winner": priority_winner, | |
| "priority": req.priority, | |
| } | |
| async def detect_phone(file: UploadFile = File(...)): | |
| """ | |
| Detect which phone is in an uploaded image using Claude Vision. | |
| Requires ANTHROPIC_API_KEY in environment. | |
| """ | |
| if not ANTHROPIC_KEY: | |
| raise HTTPException( | |
| status_code=503, | |
| detail="Image detection unavailable. ANTHROPIC_API_KEY not set." | |
| ) | |
| # Read and encode image | |
| image_bytes = await file.read() | |
| if len(image_bytes) > 10 * 1024 * 1024: # 10MB limit | |
| raise HTTPException(400, "Image too large. Please use under 10MB.") | |
| b64_image = base64.standard_b64encode(image_bytes).decode("utf-8") | |
| media_type = file.content_type or "image/jpeg" | |
| if media_type not in ("image/jpeg", "image/png", "image/webp", "image/gif"): | |
| media_type = "image/jpeg" | |
| try: | |
| client = anthropic.Anthropic(api_key=ANTHROPIC_KEY) | |
| response = client.messages.create( | |
| model="claude-sonnet-4-20250514", | |
| max_tokens=300, | |
| messages=[{ | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "image", | |
| "source": { | |
| "type": "base64", | |
| "media_type": media_type, | |
| "data": b64_image, | |
| } | |
| }, | |
| { | |
| "type": "text", | |
| "text": ( | |
| "Identify the smartphone/mobile phone in this image. " | |
| "Respond in this exact JSON format only:\n" | |
| '{"brand": "Samsung", "model": "Galaxy S24", "confidence": "high"}\n' | |
| "If no phone is visible, respond:\n" | |
| '{"brand": null, "model": null, "confidence": "none"}\n' | |
| "Confidence levels: high, medium, low, none. " | |
| "Give the exact marketed name." | |
| ) | |
| } | |
| ] | |
| }] | |
| ) | |
| import json | |
| raw = response.content[0].text.strip() | |
| # Strip markdown fences if present | |
| raw = raw.replace("```json", "").replace("```", "").strip() | |
| result = json.loads(raw) | |
| except Exception as e: | |
| return { | |
| "brand": None, "model": None, | |
| "confidence": "none", | |
| "error": str(e) | |
| } | |
| # Try to match detected phone to DB | |
| matched_db_phone = None | |
| if result.get("model"): | |
| detected_name = f"{result.get('brand', '')} {result.get('model', '')}".lower() | |
| for p in PHONE_DB: | |
| if any(word in p.name.lower() for word in detected_name.split() if len(word) > 3): | |
| matched_db_phone = { | |
| "id": p.id, "name": p.name, | |
| "emoji": p.emoji, "price_label": p.price_label, | |
| "priceoye_url": p.priceoye_url, | |
| } | |
| break | |
| return { | |
| **result, | |
| "matched_in_db": matched_db_phone, | |
| "search_url": f"https://priceoye.pk/search?q={result.get('brand', '')}+{result.get('model', '')}".replace(" ", "+"), | |
| "whatmobile_url": f"https://www.whatmobile.com.pk/search?search={result.get('brand', '')}+{result.get('model', '')}".replace(" ", "+"), | |
| } | |
| # ββ Entry point βββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False) | |