smhs16's picture
Upload 8 files
f651dd8 verified
"""
PriceOye AI Phone Advisor β€” FastAPI Backend
============================================
Hugging Face Spaces deployment on port 7860.
Endpoints:
GET / β†’ serves index.html
POST /chat β†’ conversation
POST /compare β†’ compare two phones
POST /detect β†’ detect phone from image (uses Claude vision)
GET /phones β†’ full catalog JSON
GET /health β†’ health check
Set ANTHROPIC_API_KEY in HF Space Secrets before deploying.
"""
import os
import base64
import uuid
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
import anthropic
from src.conversation import (
ConversationState, handle_message, get_greeting,
format_phone_summary, PHONE_DB
)
from src.scoring_engine import (
get_category_scores, get_sub_scores, recommend,
UserPreferences, score_phone
)
# ── Init ──────────────────────────────────────────────────
app = FastAPI(title="PriceOye AI", version="3.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
SESSIONS: dict[str, ConversationState] = {}
HTML_PATH = Path("index.html")
ANTHROPIC_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# ── Schemas ───────────────────────────────────────────────
class ChatRequest(BaseModel):
session_id: Optional[str] = None
message: str
class CompareRequest(BaseModel):
phone_id_a: str
phone_id_b: str
priority: Optional[str] = "balanced"
# ── Routes ────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
def serve_frontend():
if not HTML_PATH.exists():
raise HTTPException(status_code=404, detail="index.html not found")
return HTMLResponse(content=HTML_PATH.read_text(encoding="utf-8"))
@app.get("/health")
def health():
return {
"status": "ok",
"phones": len(PHONE_DB),
"vision_enabled": bool(ANTHROPIC_KEY),
"version": "3.0.0"
}
@app.get("/phones")
def list_phones():
return [
{
"id": p.id, "name": p.name, "brand": p.brand,
"os": p.os, "price_pkr": p.price_pkr,
"price_label": p.price_label, "emoji": p.emoji,
"tags": p.tags,
"priceoye_url": p.priceoye_url,
"whatmobile_url": p.whatmobile_url,
"category_scores": {k: round(v, 1) for k, v in get_category_scores(p).items()},
}
for p in PHONE_DB
]
@app.post("/chat")
def chat(req: ChatRequest):
session_id = req.session_id or str(uuid.uuid4())
if session_id not in SESSIONS or req.session_id is None:
greeting = get_greeting()
SESSIONS[session_id] = greeting["state"]
return {**greeting, "session_id": session_id}
state = SESSIONS[session_id]
response = handle_message(req.message, state)
SESSIONS[session_id] = response["state"]
return {**response, "session_id": session_id}
@app.post("/compare")
def compare_phones(req: CompareRequest):
phone_a = next((p for p in PHONE_DB if p.id == req.phone_id_a), None)
phone_b = next((p for p in PHONE_DB if p.id == req.phone_id_b), None)
if not phone_a:
raise HTTPException(400, f"Phone '{req.phone_id_a}' not found")
if not phone_b:
raise HTTPException(400, f"Phone '{req.phone_id_b}' not found")
cats_a = get_category_scores(phone_a)
cats_b = get_category_scores(phone_b)
cats = list(cats_a.keys())
winner_map = {}
for cat in cats:
sa, sb = cats_a[cat], cats_b[cat]
if sa > sb + 0.2:
winner_map[cat] = "a"
elif sb > sa + 0.2:
winner_map[cat] = "b"
else:
winner_map[cat] = "tie"
overall_a = round(sum(cats_a.values()) / len(cats_a), 2)
overall_b = round(sum(cats_b.values()) / len(cats_b), 2)
overall_winner = "a" if overall_a > overall_b else "b" if overall_b > overall_a else "tie"
# Priority-weighted score
prefs = UserPreferences(
budget=max(phone_a.price_pkr, phone_b.price_pkr) + 50000,
os_preference="any",
priority=req.priority,
)
score_a = score_phone(phone_a, prefs, PHONE_DB)
score_b = score_phone(phone_b, prefs, PHONE_DB)
priority_winner = "a" if score_a > score_b else "b"
return {
"phone_a": {
"id": phone_a.id, "name": phone_a.name,
"emoji": phone_a.emoji, "price_label": phone_a.price_label,
"priceoye_url": phone_a.priceoye_url,
"whatmobile_url": phone_a.whatmobile_url,
"category_scores": {k: round(v, 1) for k, v in cats_a.items()},
"highlights": phone_a.highlights,
"overall": overall_a,
"priority_score": round(score_a, 1),
},
"phone_b": {
"id": phone_b.id, "name": phone_b.name,
"emoji": phone_b.emoji, "price_label": phone_b.price_label,
"priceoye_url": phone_b.priceoye_url,
"whatmobile_url": phone_b.whatmobile_url,
"category_scores": {k: round(v, 1) for k, v in cats_b.items()},
"highlights": phone_b.highlights,
"overall": overall_b,
"priority_score": round(score_b, 1),
},
"winners": winner_map,
"overall_winner": overall_winner,
"priority_winner": priority_winner,
"priority": req.priority,
}
@app.post("/detect")
async def detect_phone(file: UploadFile = File(...)):
"""
Detect which phone is in an uploaded image using Claude Vision.
Requires ANTHROPIC_API_KEY in environment.
"""
if not ANTHROPIC_KEY:
raise HTTPException(
status_code=503,
detail="Image detection unavailable. ANTHROPIC_API_KEY not set."
)
# Read and encode image
image_bytes = await file.read()
if len(image_bytes) > 10 * 1024 * 1024: # 10MB limit
raise HTTPException(400, "Image too large. Please use under 10MB.")
b64_image = base64.standard_b64encode(image_bytes).decode("utf-8")
media_type = file.content_type or "image/jpeg"
if media_type not in ("image/jpeg", "image/png", "image/webp", "image/gif"):
media_type = "image/jpeg"
try:
client = anthropic.Anthropic(api_key=ANTHROPIC_KEY)
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=300,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": b64_image,
}
},
{
"type": "text",
"text": (
"Identify the smartphone/mobile phone in this image. "
"Respond in this exact JSON format only:\n"
'{"brand": "Samsung", "model": "Galaxy S24", "confidence": "high"}\n'
"If no phone is visible, respond:\n"
'{"brand": null, "model": null, "confidence": "none"}\n'
"Confidence levels: high, medium, low, none. "
"Give the exact marketed name."
)
}
]
}]
)
import json
raw = response.content[0].text.strip()
# Strip markdown fences if present
raw = raw.replace("```json", "").replace("```", "").strip()
result = json.loads(raw)
except Exception as e:
return {
"brand": None, "model": None,
"confidence": "none",
"error": str(e)
}
# Try to match detected phone to DB
matched_db_phone = None
if result.get("model"):
detected_name = f"{result.get('brand', '')} {result.get('model', '')}".lower()
for p in PHONE_DB:
if any(word in p.name.lower() for word in detected_name.split() if len(word) > 3):
matched_db_phone = {
"id": p.id, "name": p.name,
"emoji": p.emoji, "price_label": p.price_label,
"priceoye_url": p.priceoye_url,
}
break
return {
**result,
"matched_in_db": matched_db_phone,
"search_url": f"https://priceoye.pk/search?q={result.get('brand', '')}+{result.get('model', '')}".replace(" ", "+"),
"whatmobile_url": f"https://www.whatmobile.com.pk/search?search={result.get('brand', '')}+{result.get('model', '')}".replace(" ", "+"),
}
# ── Entry point ───────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)