import os, io, base64, json, time, random
from typing import Optional, Dict, Any, List, Tuple
from urllib.parse import quote_plus
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import PlainTextResponse
import httpx
from bs4 import BeautifulSoup
from PIL import Image
from twilio.rest import Client as TwilioClient
from openai import OpenAI
# ---------------- App & Clients ----------------
# FastAPI application object that the route decorators below attach to.
app = FastAPI(title="TXTPRICE SMS Webhook (Async)")
# OpenAI client used for receipt parsing; the key is read from OPENAI_API_KEY.
oai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Twilio settings are read from the environment; each falls back to "" when unset.
TW_SID = os.getenv("TWILIO_ACCOUNT_SID", "")
TW_TOKEN = os.getenv("TWILIO_AUTH_TOKEN", "")
TW_FROM = os.getenv("TWILIO_FROM", "") # sender number, e.g. +15555550123
TW_STATUS_CB = os.getenv("TWILIO_STATUS_CALLBACK", "")
# True only when SID, auth token and sender number are all non-empty.
_twilio_ok = bool(TW_SID and TW_TOKEN and TW_FROM)
# Outbound Twilio REST client, or None when the credentials are incomplete.
twilio_client = TwilioClient(TW_SID, TW_TOKEN) if _twilio_ok else None
# ---------------- LangChain (offer extraction) ----------------
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
# Chat model behind the offer-extraction chain; temperature 0 is requested.
lc_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Retailer domains that price searches are limited to: passed to Tavily as
# include_domains and used to filter result URLs in _search_web.
MERCHANT_DOMAINS = (
"walmart.ca","realcanadiansuperstore.ca","amazon.ca","metro.ca",
"nofrills.ca","freshco.com","well.ca","costco.ca","iga.net","londondrugs.com"
)
class Offer(BaseModel):
    # Structured-output schema for a single retailer offer; all four fields are
    # required. Documented with comments rather than a docstring because a
    # docstring may be picked up as the schema description sent to the model.
    merchant: str  # retailer the offer comes from
    title: str  # product title of the offer
    price: float  # offer price; the extraction prompt asks for 0 when none is found
    url: str  # link to the offer
# Prompt that asks the model for one best offer from scraped page text.
# Template variables: {query}, {url}, {text}.
extract_prompt = ChatPromptTemplate.from_messages([
("system","Extract one best CAD offer for the queried item from the page text. "
"Return JSON: merchant,title,price(float),url. If none, return empty with price 0."),
("human","Query: {query}\nURL: {url}\n--- PAGE TEXT ---\n{text}\n--- END ---")
])
# Pipeline: fill the prompt, then call the chat model with its output
# structured as an Offer.
chain_extract = extract_prompt | lc_llm.with_structured_output(Offer)
# ---------------- Helpers ----------------
def _encode_as_jpeg(data: bytes) -> bytes:
    """Decode *data* with Pillow and re-encode it as an RGB JPEG (quality 90)."""
    with Image.open(io.BytesIO(data)) as img:
        rgb = img.convert("RGB")
    buf = io.BytesIO()
    rgb.save(buf, format="JPEG", quality=90)
    return buf.getvalue()


def img_or_pdf_to_image_bytes(data: bytes, filename: str) -> bytes:
    """Normalize an uploaded receipt to JPEG bytes where possible.

    Args:
        data: Raw bytes of the uploaded media.
        filename: Name used only as a hint for the media type; may be empty
            or None.

    Returns:
        JPEG-encoded bytes when the payload is converted, otherwise *data*
        unchanged (PDF payloads and unrecognized extensions).

    Raises:
        Exception: Whatever Pillow raises when a file named as an image
            (.jpg/.jpeg/.png/.webp) is not a PDF and cannot be decoded.
    """
    # Trust the payload over the filename: a PDF passes through untouched even
    # when the caller labelled it as an image. Previously such a payload
    # reached Image.open() and raised.
    if data[:4] == b"%PDF":
        return data

    name = (filename or "").lower()
    if name.endswith((".jpg", ".jpeg", ".png", ".webp")):
        return _encode_as_jpeg(data)
    if name.endswith(".pdf"):
        # Named .pdf but without the PDF magic: try it as an image and fall
        # back to the raw bytes (deliberate best effort).
        try:
            return _encode_as_jpeg(data)
        except Exception:
            return data
    return data
def b64_data_uri(data: bytes, mime: str) -> str:
    """Return *data* as a base64 ``data:`` URI labelled with the *mime* type."""
    encoded = base64.b64encode(data).decode("utf-8")
    return f"data:{mime};base64,{encoded}"
def _parse_receipt_json(raw: Optional[str]) -> Dict[str, Any]:
    """Turn the model's reply text into a dict, tolerating a Markdown code fence."""
    text = (raw or "").strip()
    if not text:
        raise ValueError("Receipt model returned an empty response")
    if text.startswith("```"):
        # Keep what sits between the opening fence and the next fence (or the
        # end of the text when the closing fence is missing).
        text = text.split("```", 2)[1]
        # Drop an optional "json" language tag whether or not a newline
        # follows it; json.loads ignores the leftover leading whitespace.
        if text[:4].lower() == "json":
            text = text[4:]
    parsed = json.loads(text)
    if not isinstance(parsed, dict):
        raise ValueError("Receipt model did not return a JSON object")
    return parsed


def call_openai_vision_for_receipt(image_bytes: bytes) -> Dict[str, Any]:
    """Ask the OpenAI chat model to parse a receipt and return it as a dict.

    Args:
        image_bytes: Receipt payload. Bytes starting with ``%PDF`` are labelled
            ``application/pdf``; anything else is labelled ``image/jpeg``.

    Returns:
        The parsed JSON object; the prompt requests the keys ``store``,
        ``items``, ``subtotal``, ``tax`` and ``total``.

    Raises:
        ValueError: If the reply is empty, is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``), or is not a JSON
            object.
        Exception: Any error raised by the OpenAI client call.
    """
    is_pdf = image_bytes[0:4] == b"%PDF"
    mime = "application/pdf" if is_pdf else "image/jpeg"
    # NOTE(review): a PDF payload is sent as a data:application/pdf URI inside
    # an image_url part - confirm the model accepts that input type.
    system = "You are a strict, no-chitchat receipt parser for Canadian grocery receipts. Return ONLY JSON; prices in CAD."
    # The prompt text is kept flush-left so the string sent to the model is
    # unchanged.
    user_prompt = """
{ "store":{"name":"string","address":"string|null","date":"YYYY-MM-DD|null"},
"items":[{"name":"string","size":"string|null","qty":1,"unit_price":0.00,"line_total":0.00}],
"subtotal":0.00,"tax":0.00,"total":0.00 }
Rules: shopper-friendly names; qty>=1; unit_price before tax; line_total=qty*unit_price; use null if missing.
Return ONLY JSON.
"""
    resp = oai_client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        messages=[
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    {"type": "image_url", "image_url": {"url": b64_data_uri(image_bytes, mime)}},
                ],
            },
        ],
    )
    # message.content can be None (e.g. a refusal); the parser turns that into
    # a clear ValueError instead of an AttributeError on .strip().
    return _parse_receipt_json(resp.choices[0].message.content)
# --- search helpers ---
from duckduckgo_search import DDGS
try:
from tavily import TavilyClient
_HAS_TAVILY = True
except Exception:
_HAS_TAVILY = False
def _fetch_text(url: str, timeout=15) -> str:
    """Download *url* and return its visible text, or "" on any failure.

    Args:
        url: Page to fetch; redirects are followed.
        timeout: Timeout passed to the httpx client.

    Returns:
        Whitespace-collapsed page text with script/style/noscript content
        removed, truncated to 12000 characters. Returns "" when the request
        fails, the server answers with an HTTP error status, or parsing fails.
    """
    headers = {"User-Agent": "Mozilla/5.0 (compatible; PriceAgent/1.0)"}
    try:
        with httpx.Client(follow_redirects=True, timeout=timeout) as client:
            r = client.get(url, headers=headers)
        # An error page (403 bot wall, 404, 5xx) is not product text; skip it
        # rather than pass it to the extractor.
        if r.status_code >= 400:
            return ""
        soup = BeautifulSoup(r.text, "html.parser")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        return " ".join(soup.get_text(separator=" ").split())[:12000]
    except Exception as exc:
        # Best effort by design: one bad page must not abort the whole lookup.
        print(f"[WARN] Fetch failed for {url}: {exc}")
        return ""
def _is_merchant_url(url: str) -> bool:
    """Return True when *url*'s hostname is a merchant domain or a subdomain of one."""
    # Function-scope import: the file's top-level import only brings in quote_plus.
    from urllib.parse import urlparse

    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        return False
    return any(host == dom or host.endswith("." + dom) for dom in MERCHANT_DOMAINS)


def _search_web(query: str, k: int = 5) -> List[str]:
    """Search for price pages for *query* on the known merchant domains.

    Tavily is tried first when the package and TAVILY_API_KEY are available;
    DuckDuckGo is used when Tavily yields no URLs. Search errors are logged
    and treated as "no results".

    Args:
        query: Item name; " price" is appended for the search.
        k: Maximum number of results requested and returned.

    Returns:
        Up to *k* unique URLs whose hostname belongs to MERCHANT_DOMAINS, in
        search-result order.
    """
    urls: List[str] = []
    api_key = os.getenv("TAVILY_API_KEY")
    if _HAS_TAVILY and api_key:
        try:
            tv = TavilyClient(api_key)
            res = tv.search(
                query=f"{query} price",
                max_results=k,
                include_domains=list(MERCHANT_DOMAINS),
            )
            urls.extend(r["url"] for r in res.get("results", []) if r.get("url"))
        except Exception as exc:
            print(f"[WARN] Tavily search failed: {exc}")
    if not urls:
        try:
            with DDGS() as ddgs:
                for r in ddgs.text(f"{query} price", region="ca-en", max_results=k):
                    u = r.get("href") or r.get("url")
                    if u:
                        urls.append(u)
        except Exception as exc:
            print(f"[WARN] DuckDuckGo search failed: {exc}")
    # Match on the parsed hostname: a substring test on the whole URL would
    # also accept e.g. https://evil.example/?q=metro.ca.
    unique = dict.fromkeys(urls)  # drops duplicates, keeps first-seen order
    return [u for u in unique if _is_merchant_url(u)][:k]
def langchain_price_lookup(item_name: str) -> Optional[Dict[str, Any]]:
    """Find the cheapest offer for *item_name* across the searched merchant pages.

    Each URL from ``_search_web`` is fetched and run through ``chain_extract``;
    the offer with the lowest positive price wins.

    Args:
        item_name: Product name to search for.

    Returns:
        A dict with ``title``, ``price``, ``source`` and ``link``, or None when
        no page yields an offer with a positive price.
    """
    urls = _search_web(item_name, k=5)
    best = None
    best_page = None  # page the winning offer was extracted from
    for u in urls:
        text = _fetch_text(u)
        if not text:
            continue
        try:
            offer = chain_extract.invoke({"query": item_name, "url": u, "text": text})
        except Exception as exc:
            print(f"[WARN] Offer extraction failed for {u}: {exc}")
            continue
        price = getattr(offer, "price", None)
        # The prompt uses price 0 for "no offer"; non-numeric or negative
        # prices are rejected as well so they cannot win the comparison.
        if not (isinstance(price, (int, float)) and price > 0):
            continue
        if best is None or price < best.price:
            best, best_page = offer, u
    if best is None:
        return None
    return {
        "title": best.title or item_name,
        "price": float(best.price),
        "source": best.merchant or "Other store",
        # Fall back to the page that produced this offer, not the first
        # search hit, which may belong to a different store.
        "link": best.url or best_page,
    }
def research_prices(items: List[Dict[str, Any]], max_items=6):
    """Look up an online offer for each of the first *max_items* receipt items.

    Items without a name, or for which no offer is found, are skipped. Each
    result is a dict with ``item``, ``store``, ``price`` and ``is_cheaper``;
    ``is_cheaper`` is True only when the item has a numeric ``unit_price`` and
    the offer undercuts it by more than one cent. The function pauses 0.25 s
    after each recorded result.
    """
    results = []
    for entry in items[:max_items]:
        label = entry.get("name")
        match = langchain_price_lookup(label) if label else None
        if match:
            paid = entry.get("unit_price")
            if isinstance(paid, (int, float)):
                beats_receipt = match["price"] < float(paid) - 0.01
            else:
                beats_receipt = False
            results.append(
                {
                    "item": label,
                    "store": match["source"],
                    "price": match["price"],
                    "is_cheaper": beats_receipt,
                }
            )
            time.sleep(0.25)
    return results
def compute_savings(receipt, found):
    """Sum the estimated savings over the offers flagged ``is_cheaper``.

    For each cheaper offer, the unit price of the first receipt item with the
    same name is looked up. A numeric unit price contributes
    ``max(0, unit_price - offer price)``; otherwise a flat 1.0 credit is
    added. An offer that raises while being processed is skipped.

    Returns:
        ``(savings rounded to 2 decimals, list of cheaper offers)``.
    """
    cheaper_offers = list(filter(lambda offer: offer.get("is_cheaper"), found))
    total = 0.0
    for offer in cheaper_offers:
        try:
            paid = None
            for line in receipt.get("items", []):
                if line.get("name") == offer["item"]:
                    paid = line.get("unit_price")
                    break
            if not isinstance(paid, (int, float)):
                # No usable receipt price: heuristic small credit.
                total += 1.0
            else:
                total += max(0.0, float(paid) - float(offer["price"]))
        except Exception:
            pass
    return round(total, 2), cheaper_offers
def format_five_lines(receipt, savings, cheaper_list):
    """Build the newline-separated SMS summary for a parsed receipt.

    The message names the store and total (``total``, else ``subtotal``, else
    "N/A"), states the savings figure, lists up to three cheaper picks or a
    "no matches" line, and ends with the 'DEALS' prompt. At most five lines
    are kept.
    """
    store_info = receipt.get("store") or {}
    store_name = store_info.get("name") or "your store"
    raw_total = receipt.get("total") or receipt.get("subtotal")
    try:
        amount = float(str(raw_total).replace("$", "").strip())
        total_txt = f"${amount:.2f}"
    except Exception:
        total_txt = "N/A"

    header = f"Receipt read: {store_name}, total {total_txt}."
    savings_line = f"I found potential savings of ${savings:.2f} by checking other stores."

    if not cheaper_list:
        picks_line = "No clearly cheaper matches found right now for your items."
    else:
        picks = []
        for offer in cheaper_list[:3]:
            picks.append(f"{offer['item']} @ {offer['store']} for ${offer['price']:.2f}")
        picks_line = "Cheaper picks: " + "; ".join(picks) + "."

    message_lines = [
        header,
        savings_line,
        picks_line,
        "Reply 'DEALS' anytime to get weekly picks tailored to your receipts.",
    ]
    return "\n".join(message_lines[:5])
# --- NEW: authenticated media download (fixes 401) ---
def download_twilio_media(url: str, timeout: int = 20) -> bytes:
    """Download an MMS media file and return its raw bytes.

    Twilio-hosted media needs HTTP Basic Auth with the Account SID and auth
    token. The credentials are attached only when *url* is an ``https`` URL
    whose hostname is exactly ``api.twilio.com``; any other URL is fetched
    without authentication.

    Args:
        url: Media URL taken from the webhook payload.
        timeout: Timeout in seconds for the HTTP client.

    Returns:
        The response body.

    Raises:
        httpx.HTTPStatusError: If the final response has an error status.
        httpx.HTTPError: On transport-level failures.
    """
    # Function-scope import: the file's top-level import only brings in quote_plus.
    from urllib.parse import urlparse

    # Compare the parsed hostname rather than a string prefix: a prefix test
    # also matches https://api.twilio.com.evil.example and
    # https://api.twilio.com@evil.example, which would send the Twilio
    # credentials to another host.
    parts = urlparse(url)
    host = (parts.hostname or "").lower()
    is_twilio = parts.scheme == "https" and host == "api.twilio.com"
    auth = (TW_SID, TW_TOKEN) if is_twilio else None

    headers = {"User-Agent": "TXTPRICE/1.0"}
    with httpx.Client(follow_redirects=True, timeout=timeout, headers=headers, auth=auth) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.content
# ---------------- Background Task ----------------
def process_and_reply(media_url: str, to_number: str) -> None:
    """Background job: analyze a receipt photo and text the result back.

    Downloads the media, parses it into a receipt, researches cheaper prices
    and sends one SMS to *to_number*. Does nothing when Twilio is not
    configured. Errors are logged with ``print`` and never propagate.

    Args:
        media_url: URL of the MMS media to analyze.
        to_number: Phone number that receives the follow-up message.
    """
    if not _twilio_ok:
        return
    try:
        content = download_twilio_media(media_url)  # authenticated download
        img_bytes = img_or_pdf_to_image_bytes(content, "mms.jpg")
        receipt = call_openai_vision_for_receipt(img_bytes)
        items = receipt.get("items") or []
        if not items:
            msg = "I couldn't read items. Send a clearer photo."
        else:
            found = research_prices(items)
            savings, cheaper = compute_savings(receipt, found)
            msg = format_five_lines(receipt, savings, cheaper)
    except Exception as e:
        # Keep the details in the server log. Raw exception text can leak
        # internals (URLs, provider error bodies) and may be too long for an
        # SMS body, so the user gets a short generic message.
        print(f"[ERROR] Receipt processing failed: {e!r}")
        msg = "Sorry, something went wrong while processing your receipt. Please try again."
    try:
        twilio_client.messages.create(
            to=to_number,
            from_=TW_FROM,
            body=msg,
            status_callback=TW_STATUS_CB or None,  # delivery updates go to /status
        )
    except Exception as e:
        print(f"[ERROR] Twilio send failed: {e}")
# ---------------- Routes ----------------
@app.get("/sms")
async def sms_health():
    """Answer GET /sms with a plain-text note that the webhook accepts POST only."""
    body = "SMS webhook is up (POST only)."
    return PlainTextResponse(content=body, media_type="text/plain")
@app.post("/status")
async def status_cb(MessageSid: str = Form(None), MessageStatus: str = Form(None)):
    """Log the message SID and status from a status-callback POST and reply "OK"."""
    log_line = f"[STATUS] MessageSid={MessageSid} MessageStatus={MessageStatus}"
    print(log_line)
    return PlainTextResponse(content="OK", media_type="text/plain")
def _twiml_message(text: str) -> str:
    """Wrap *text* in a minimal TwiML ``<Response><Message>`` document."""
    # Function-scope import keeps this block self-contained.
    from xml.sax.saxutils import escape

    return (
        '<?xml version="1.0" encoding="UTF-8"?>'
        f"<Response><Message>{escape(text)}</Message></Response>"
    )


@app.post("/sms")
async def sms_webhook(request: Request, background_tasks: BackgroundTasks):
    """Handle an inbound SMS/MMS webhook POST.

    Replies immediately with a TwiML message. When the form carries a media
    URL and Twilio is configured, receipt processing is scheduled as a
    background task, which sends the real answer in a follow-up text.

    Args:
        request: Incoming request; the form fields ``From``, ``NumMedia`` and
            ``MediaUrl0`` are read.
        background_tasks: FastAPI task queue used to defer the processing.

    Returns:
        An ``application/xml`` response containing a TwiML ``<Message>``.
    """
    # NOTE(review): nothing here verifies that the request came from Twilio
    # (no X-Twilio-Signature check); consider validating it before trusting
    # MediaUrl0.
    form = dict(await request.form())
    from_number = form.get("From", "")
    try:
        num_media = int(form.get("NumMedia", "0") or "0")
    except (TypeError, ValueError):
        # A malformed count is treated as "no media" instead of a 500.
        num_media = 0
    media_url = form.get("MediaUrl0") if num_media > 0 else None

    # Immediate ack to beat the 15s timeout. The body must be valid TwiML
    # because it is served as application/xml; bare text is not a parseable
    # XML document.
    if not media_url:
        ack = "Please MMS a clear photo of your grocery receipt to analyze savings."
        return PlainTextResponse(_twiml_message(ack), media_type="application/xml")

    if _twilio_ok and from_number:
        background_tasks.add_task(process_and_reply, media_url, from_number)
        reply = "Got it—processing your receipt now. You’ll get a follow-up text shortly."
    else:
        reply = "Got your image, but Twilio credentials are missing. Set TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM."
    return PlainTextResponse(_twiml_message(reply), media_type="application/xml")