File size: 10,924 Bytes
0fc94bc 141e99e 0fc94bc 93ebdf2 141e99e 0fc94bc 93ebdf2 141e99e 0fc94bc 93ebdf2 141e99e 0fc94bc 93ebdf2 0fc94bc 141e99e 0fc94bc 93ebdf2 0fc94bc 93ebdf2 0fc94bc 93ebdf2 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 141e99e 0fc94bc 93ebdf2 0fc94bc 141e99e 0fc94bc 141e99e 93ebdf2 0fc94bc 141e99e 0fc94bc 93ebdf2 0fc94bc 93ebdf2 0fc94bc 93ebdf2 0fc94bc 93ebdf2 0fc94bc 141e99e 93ebdf2 141e99e 93ebdf2 141e99e 0fc94bc 93ebdf2 0fc94bc 93ebdf2 0fc94bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | import os, io, base64, json, time, random
from typing import Optional, Dict, Any, List, Tuple
from urllib.parse import quote_plus
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import PlainTextResponse
import httpx
from bs4 import BeautifulSoup
from PIL import Image
from twilio.rest import Client as TwilioClient
from openai import OpenAI
# ---------------- App & Clients ----------------
# ASGI application object; the route handlers further down register on it.
app = FastAPI(title="TXTPRICE SMS Webhook (Async)")

# OpenAI SDK client used by the receipt-parsing call.
oai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Twilio settings are read from the environment; an unset variable becomes "".
TW_SID = os.environ.get("TWILIO_ACCOUNT_SID", "")
TW_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN", "")
TW_FROM = os.environ.get("TWILIO_FROM", "")  # e.g., +12175898085
TW_STATUS_CB = os.environ.get("TWILIO_STATUS_CALLBACK", "")

# Outbound SMS is only possible when SID, token and sender are all non-empty.
_twilio_ok = all((TW_SID, TW_TOKEN, TW_FROM))
if _twilio_ok:
    twilio_client = TwilioClient(TW_SID, TW_TOKEN)
else:
    twilio_client = None
# ---------------- LangChain (offer extraction) ----------------
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
# Chat model used by the offer-extraction chain; temperature 0 is requested.
lc_llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")

# Retailer domains that search results are restricted to.
MERCHANT_DOMAINS = (
    "walmart.ca",
    "realcanadiansuperstore.ca",
    "amazon.ca",
    "metro.ca",
    "nofrills.ca",
    "freshco.com",
    "well.ca",
    "costco.ca",
    "iga.net",
    "londondrugs.com",
)
class Offer(BaseModel):
    # Structured result the extraction chain is asked to fill in for one page.
    # All four fields are required (no defaults).
    # NOTE: documented with comments rather than a docstring on purpose; a
    # class docstring may be picked up by the structured-output schema.
    merchant: str  # store the offer belongs to
    title: str  # product title as found on the page
    price: float  # offer price; the prompt asks for 0 when nothing is found
    url: str  # page the offer was taken from
# System instruction for the offer extractor.
_EXTRACT_SYSTEM_MSG = (
    "Extract one best CAD offer for the queried item from the page text. "
    "Return JSON: merchant,title,price(float),url. If none, return empty with price 0."
)

# Human turn; {query}, {url} and {text} are filled in per invocation.
_EXTRACT_HUMAN_MSG = "Query: {query}\nURL: {url}\n--- PAGE TEXT ---\n{text}\n--- END ---"

extract_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", _EXTRACT_SYSTEM_MSG),
        ("human", _EXTRACT_HUMAN_MSG),
    ]
)

# Prompt piped into the chat model, with output coerced to an ``Offer``.
chain_extract = extract_prompt | lc_llm.with_structured_output(Offer)
# ---------------- Helpers ----------------
def _reencode_as_jpeg(data: bytes, quality: int = 90) -> bytes:
    """Decode *data* as an image and return it re-encoded as an RGB JPEG."""
    with Image.open(io.BytesIO(data)) as src:
        rgb = src.convert("RGB")
    out = io.BytesIO()
    rgb.save(out, format="JPEG", quality=quality)
    return out.getvalue()


def img_or_pdf_to_image_bytes(data: bytes, filename: str) -> bytes:
    """Normalise an uploaded receipt to JPEG bytes where possible.

    Args:
        data: Raw bytes of the upload.
        filename: Name associated with the upload; only its extension is
            used. ``None`` or an empty string is treated as "no extension".

    Returns:
        JPEG bytes when the payload is re-encoded, otherwise ``data``
        unchanged (PDF payloads, unknown extensions, or a ``.pdf``-named
        payload that cannot be decoded as an image).

    Raises:
        Exception: whatever the image decoder raises when the name has an
            image extension but the (non-PDF) payload cannot be decoded.
    """
    # Trust the payload over the name: a PDF that arrives under an image
    # filename must not be handed to the image decoder.
    if data and data[:4] == b"%PDF":
        return data

    name = (filename or "").lower()
    if name.endswith((".jpg", ".jpeg", ".png", ".webp")):
        return _reencode_as_jpeg(data)
    if name.endswith(".pdf"):
        # Named .pdf but without the PDF signature: it may still be an image,
        # so try once and fall back to the raw bytes.
        try:
            return _reencode_as_jpeg(data)
        except Exception:
            return data
    return data
def b64_data_uri(data: bytes, mime: str) -> str:
    """Return a ``data:`` URI that embeds *data* as base64 with media type *mime*."""
    encoded = base64.b64encode(data).decode("utf-8")
    return "data:{};base64,{}".format(mime, encoded)
def _strip_code_fence(raw: str) -> str:
    """Return *raw* without a surrounding Markdown code fence and ``json`` tag."""
    text = raw.strip()
    if not text.startswith("```"):
        return text
    # Element 1 is the text between the opening fence and the next fence,
    # or everything after the opening fence if it is never closed.
    inner = text.split("```", 2)[1].lstrip()
    # Drop a leading language tag whether or not a newline follows it; valid
    # JSON cannot itself start with the letters "json".
    if inner[:4].lower() == "json":
        inner = inner[4:]
    return inner.strip()


def call_openai_vision_for_receipt(image_bytes: bytes) -> Dict[str, Any]:
    """Send a receipt to the ``gpt-4o-mini`` chat model and return parsed JSON.

    The payload is embedded as a base64 data URI in an ``image_url`` message
    part, together with a prompt describing the expected JSON layout.

    Args:
        image_bytes: Receipt payload; tagged ``application/pdf`` when it
            starts with ``%PDF``, otherwise ``image/jpeg``.

    Returns:
        The JSON value decoded from the model's reply.

    Raises:
        ValueError: if the reply has no text content.
        json.JSONDecodeError: if the reply is not valid JSON.
    """
    is_pdf = image_bytes[0:4] == b"%PDF"
    # NOTE(review): whether the endpoint accepts a PDF data URI inside an
    # image_url part is not established in this file; confirm.
    mime = "application/pdf" if is_pdf else "image/jpeg"
    system = "You are a strict, no-chitchat receipt parser for Canadian grocery receipts. Return ONLY JSON; prices in CAD."
    user_prompt = """
{ "store":{"name":"string","address":"string|null","date":"YYYY-MM-DD|null"},
"items":[{"name":"string","size":"string|null","qty":1,"unit_price":0.00,"line_total":0.00}],
"subtotal":0.00,"tax":0.00,"total":0.00 }
Rules: shopper-friendly names; qty>=1; unit_price before tax; line_total=qty*unit_price; use null if missing.
Return ONLY JSON.
"""
    resp = oai_client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        messages=[
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    {"type": "image_url", "image_url": {"url": b64_data_uri(image_bytes, mime)}},
                ],
            },
        ],
    )
    content = resp.choices[0].message.content
    if not content:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError("OpenAI returned no content for the receipt")
    return json.loads(_strip_code_fence(content))
# --- search helpers ---
from duckduckgo_search import DDGS
try:
from tavily import TavilyClient
_HAS_TAVILY = True
except Exception:
_HAS_TAVILY = False
def _fetch_text(url: str, timeout=15) -> str:
try:
headers = {"User-Agent":"Mozilla/5.0 (compatible; PriceAgent/1.0)"}
with httpx.Client(follow_redirects=True, timeout=timeout) as client:
r = client.get(url, headers=headers)
soup = BeautifulSoup(r.text,"html.parser")
for t in soup(["script","style","noscript"]): t.decompose()
return " ".join(soup.get_text(separator=" ").split())[:12000]
except Exception:
return ""
def _is_merchant_url(url: str) -> bool:
    """Return True when *url*'s host is a listed merchant domain or a subdomain of one."""
    from urllib.parse import urlparse

    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        return False
    return any(host == dom or host.endswith("." + dom) for dom in MERCHANT_DOMAINS)


def _search_web(query: str, k: int = 5) -> List[str]:
    """Search for "<query> price" and return up to *k* merchant URLs.

    Tavily is used when the package imported and ``TAVILY_API_KEY`` is set;
    DuckDuckGo is used when Tavily is unavailable or produced no URLs.

    Args:
        query: Item name to search for.
        k: Maximum number of results requested and returned.

    Returns:
        De-duplicated URLs, in search order, whose hostname belongs to
        ``MERCHANT_DOMAINS``. Empty when both searches fail.
    """
    urls: List[str] = []
    api_key = os.getenv("TAVILY_API_KEY")
    if _HAS_TAVILY and api_key:
        try:
            tv = TavilyClient(api_key)
            res = tv.search(
                query=f"{query} price",
                max_results=k,
                include_domains=list(MERCHANT_DOMAINS),
            )
            urls.extend(r["url"] for r in res.get("results", []) if r.get("url"))
        except Exception as e:
            # Best-effort: fall through to the DuckDuckGo search below.
            print(f"[WARN] Tavily search failed: {e}")
    if not urls:
        try:
            with DDGS() as ddgs:
                for r in ddgs.text(f"{query} price", region="ca-en", max_results=k):
                    u = r.get("href") or r.get("url")
                    if u:
                        urls.append(u)
        except Exception as e:
            print(f"[WARN] DuckDuckGo search failed: {e}")

    # Filter on the hostname, not the whole URL: a merchant name appearing in
    # a path or query string of some other site must not pass.
    seen = set()
    kept: List[str] = []
    for u in urls:
        if u not in seen and _is_merchant_url(u):
            seen.add(u)
            kept.append(u)
    return kept[:k]
def langchain_price_lookup(item_name: str) -> Optional[Dict[str, Any]]:
    """Find the lowest-priced offer for *item_name* across merchant pages.

    Each URL from ``_search_web`` is fetched and passed through
    ``chain_extract``. Pages that cannot be fetched or extracted are skipped.

    Args:
        item_name: Product name to look up.

    Returns:
        ``{"title", "price", "source", "link"}`` for the cheapest offer with
        a positive price, or ``None`` when no page yields one.
    """
    best = None
    best_url = None
    for u in _search_web(item_name, k=5):
        text = _fetch_text(u)
        if not text:
            continue
        try:
            offer = chain_extract.invoke({"query": item_name, "url": u, "text": text})
        except Exception:
            # One bad page must not abort the whole lookup.
            continue
        # The prompt asks for price 0 when nothing is found; a negative
        # price is treated as no offer as well.
        if not offer or not offer.price or offer.price <= 0:
            continue
        if best is None or offer.price < best.price:
            best, best_url = offer, u
    if best is None:
        return None
    return {
        "title": best.title or item_name,
        "price": float(best.price),
        "source": best.merchant or "Other store",
        # Fall back to the page the offer was actually extracted from, not
        # to the first search hit.
        "link": best.url or best_url,
    }
def research_prices(items: List[Dict[str, Any]], max_items=6):
    """Look up an online offer for each of the first *max_items* receipt items.

    Items without a name, or for which no offer is found, are skipped. After
    each recorded offer the function pauses for 0.25 seconds.

    Returns:
        A list of ``{"item", "store", "price", "is_cheaper"}`` dicts, where
        ``is_cheaper`` is true only when the receipt's ``unit_price`` is
        numeric and the offer undercuts it by more than one cent.
    """
    results = []
    for entry in items[:max_items]:
        label = entry.get("name")
        if label:
            match = langchain_price_lookup(label)
            if match:
                paid = entry.get("unit_price")
                if isinstance(paid, (int, float)):
                    beats_receipt = match["price"] < float(paid) - 0.01
                else:
                    beats_receipt = False
                results.append(
                    {
                        "item": label,
                        "store": match["source"],
                        "price": match["price"],
                        "is_cheaper": beats_receipt,
                    }
                )
                time.sleep(0.25)
    return results
def compute_savings(receipt, found):
    """Total the per-unit savings of the offers flagged as cheaper.

    Args:
        receipt: Parsed receipt dict; its ``"items"`` entries supply
            ``name`` and ``unit_price``. May be empty or lack ``"items"``.
        found: Offer dicts carrying ``item``, ``price`` and ``is_cheaper``.

    Returns:
        ``(savings, cheaper)``: savings rounded to 2 decimals, and the
        sub-list of *found* whose ``is_cheaper`` flag is truthy. An offer
        whose receipt line has no numeric ``unit_price`` contributes a flat
        1.0; an offer with a missing or unparsable price contributes nothing.
    """
    cheaper = [f for f in found if f.get("is_cheaper")]

    # Index the receipt once instead of re-scanning it for every offer. The
    # first line with a given name wins.
    unit_by_name = {}
    for line in (receipt or {}).get("items") or []:
        if not isinstance(line, dict):
            continue
        try:
            unit_by_name.setdefault(line.get("name"), line.get("unit_price"))
        except TypeError:
            # Unhashable name (e.g. a list); it cannot be matched by name.
            continue

    total = 0.0
    for offer in cheaper:
        try:
            unit = unit_by_name.get(offer["item"])
            if isinstance(unit, (int, float)):
                total += max(0.0, float(unit) - float(offer["price"]))
            else:
                # No usable receipt price: heuristic small credit.
                total += 1.0
        except (KeyError, TypeError, ValueError):
            # Malformed offer (missing key, non-numeric price): skip it.
            continue
    return round(total, 2), cheaper
def format_five_lines(receipt, savings, cheaper_list):
    """Render the SMS summary for a parsed receipt.

    The message names the store and receipt total (falling back to the
    subtotal, or "N/A" when neither parses as a number), states the savings
    figure, lists up to three cheaper picks, and ends with the DEALS prompt.
    At most five newline-joined lines are returned.
    """
    store_info = receipt.get("store") or {}
    store = store_info.get("name") or "your store"

    amount = receipt.get("total") or receipt.get("subtotal")
    try:
        numeric = float(str(amount).replace("$", "").strip())
        total_txt = "${:.2f}".format(numeric)
    except Exception:
        total_txt = "N/A"

    message = []
    message.append(f"Receipt read: {store}, total {total_txt}.")
    message.append(f"I found potential savings of ${savings:.2f} by checking other stores.")

    if not cheaper_list:
        message.append("No clearly cheaper matches found right now for your items.")
    else:
        picks = []
        for pick in cheaper_list[:3]:
            picks.append(f"{pick['item']} @ {pick['store']} for ${pick['price']:.2f}")
        message.append("Cheaper picks: " + "; ".join(picks) + ".")

    message.append("Reply 'DEALS' anytime to get weekly picks tailored to your receipts.")
    return "\n".join(message[:5])
# --- NEW: authenticated media download (fixes 401) ---
def download_twilio_media(url: str, timeout: int = 20) -> bytes:
    """Download the media behind *url* and return the raw bytes.

    HTTP Basic Auth with the Twilio Account SID/Token is attached only when
    the URL is ``https`` and its hostname is exactly ``api.twilio.com``; any
    other URL is fetched without credentials.

    Args:
        url: Media address (e.g. the webhook's ``MediaUrl0`` value).
        timeout: Timeout passed to the HTTP client.

    Returns:
        The response body.

    Raises:
        httpx.HTTPStatusError: on a non-success status.
        httpx.HTTPError: on transport failures.
    """
    from urllib.parse import urlparse

    # Compare the parsed hostname rather than a string prefix: a prefix test
    # also matches "api.twilio.com.evil.example" and "api.twilio.com@evil.example"
    # and would send the account credentials to that host.
    parsed = urlparse(url)
    is_twilio = parsed.scheme == "https" and (parsed.hostname or "").lower() == "api.twilio.com"
    auth = (TW_SID, TW_TOKEN) if is_twilio else None

    headers = {"User-Agent": "TXTPRICE/1.0"}
    with httpx.Client(follow_redirects=True, timeout=timeout, headers=headers, auth=auth) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.content
# ---------------- Background Task ----------------
def process_and_reply(media_url: str, to_number: str):
    """Analyse the receipt at *media_url* and text the result to *to_number*.

    Runs as a background task: downloads the media, parses it into a receipt,
    researches prices and sends one SMS through Twilio. Does nothing when
    Twilio is not configured. Never raises; failures are printed.

    Args:
        media_url: Address of the MMS attachment.
        to_number: Phone number the follow-up SMS is sent to.
    """
    if not _twilio_ok:
        return
    try:
        content = download_twilio_media(media_url)  # authenticated download
        img_bytes = img_or_pdf_to_image_bytes(content, "mms.jpg")
        receipt = call_openai_vision_for_receipt(img_bytes)
        items = receipt.get("items") or []
        if not items:
            msg = "I couldn't read items. Send a clearer photo."
        else:
            found = research_prices(items)
            savings, cheaper = compute_savings(receipt, found)
            msg = format_five_lines(receipt, savings, cheaper)
    except Exception as e:
        # Keep the details in the server log only: exception text can contain
        # URLs and upstream error payloads that do not belong in a user SMS.
        print(f"[ERROR] Receipt processing failed: {e!r}")
        msg = "Sorry, something went wrong while processing your receipt. Please try again."
    try:
        twilio_client.messages.create(
            to=to_number,
            from_=TW_FROM,
            body=msg,
            status_callback=TW_STATUS_CB or None,  # delivery updates go to /status
        )
    except Exception as e:
        print(f"[ERROR] Twilio send failed: {e}")
# ---------------- Routes ----------------
@app.get("/sms")
async def sms_health():
    """Answer GET /sms with a plain-text notice that the webhook is running."""
    body = "SMS webhook is up (POST only)."
    return PlainTextResponse(body, media_type="text/plain")
@app.post("/status")
async def status_cb(MessageSid: str = Form(None), MessageStatus: str = Form(None)):
    """Log a message status callback and acknowledge it with plain-text "OK"."""
    log_line = "[STATUS] MessageSid={} MessageStatus={}".format(MessageSid, MessageStatus)
    print(log_line)
    return PlainTextResponse("OK", media_type="text/plain")
@app.post("/sms")
async def sms_webhook(request: Request, background_tasks: BackgroundTasks):
    """Handle an inbound SMS/MMS webhook and reply immediately with TwiML.

    With no media attached the sender is asked for a receipt photo. With
    media, the analysis is queued as a background task and an acknowledgement
    is returned straight away; the result follows in a separate SMS.

    NOTE(review): the request is not verified against Twilio's request
    signature here, so the form fields are untrusted input.
    """
    form = dict(await request.form())
    from_number = form.get("From", "")
    try:
        num_media = int(form.get("NumMedia", "0") or "0")
    except (TypeError, ValueError):
        # A malformed count is treated as "no media" instead of a 500.
        num_media = 0
    media_url = form.get("MediaUrl0") if num_media > 0 else None

    # Immediate ack to beat 15s timeout
    if not media_url:
        ack = "<Response><Message>Please MMS a clear photo of your grocery receipt to analyze savings.</Message></Response>"
        return PlainTextResponse(ack, media_type="application/xml")

    if _twilio_ok and from_number:
        background_tasks.add_task(process_and_reply, media_url, from_number)
        reply = "<Response><Message>Got it—processing your receipt now. You’ll get a follow-up text shortly.</Message></Response>"
    else:
        reply = "<Response><Message>Got your image, but Twilio credentials are missing. Set TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM.</Message></Response>"
    return PlainTextResponse(reply, media_type="application/xml")
|