"""TXTPRICE SMS webhook.

Receives a Twilio MMS webhook, acknowledges it immediately with TwiML, and
processes the attached receipt photo in a background task: the receipt is
parsed with an OpenAI vision model, prices for the parsed items are looked up
on a fixed list of Canadian merchant sites, and a short summary is texted back
through the Twilio REST API.
"""
import base64
import io
import json
import os
import random
import time
from typing import Optional, Dict, Any, List, Tuple
from urllib.parse import quote_plus, urlparse
from xml.sax.saxutils import escape as xml_escape

from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import PlainTextResponse
import httpx
from bs4 import BeautifulSoup
from PIL import Image
from twilio.rest import Client as TwilioClient
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from duckduckgo_search import DDGS

# Tavily is optional; DuckDuckGo is used when it is missing or unconfigured.
try:
    from tavily import TavilyClient
    _HAS_TAVILY = True
except Exception:
    _HAS_TAVILY = False

# ---------------- App & Clients ----------------
app = FastAPI(title="TXTPRICE SMS Webhook (Async)")
oai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

TW_SID = os.getenv("TWILIO_ACCOUNT_SID", "")
TW_TOKEN = os.getenv("TWILIO_AUTH_TOKEN", "")
TW_FROM = os.getenv("TWILIO_FROM", "")  # e.g., +12175898085
TW_STATUS_CB = os.getenv("TWILIO_STATUS_CALLBACK", "")
_twilio_ok = bool(TW_SID and TW_TOKEN and TW_FROM)
twilio_client = TwilioClient(TW_SID, TW_TOKEN) if _twilio_ok else None

# ---------------- LangChain (offer extraction) ----------------
lc_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

MERCHANT_DOMAINS = (
    "walmart.ca", "realcanadiansuperstore.ca", "amazon.ca", "metro.ca",
    "nofrills.ca", "freshco.com", "well.ca", "costco.ca", "iga.net", "londondrugs.com",
)

# Host that serves Twilio-hosted media and requires HTTP Basic Auth.
_TWILIO_API_HOST = "api.twilio.com"


class Offer(BaseModel):
    """Structured offer extracted from a merchant page by the LLM."""

    merchant: str = Field(...)
    title: str = Field(...)
    price: float = Field(...)
    url: str = Field(...)


extract_prompt = ChatPromptTemplate.from_messages([
    ("system", "Extract one best CAD offer for the queried item from the page text. "
               "Return JSON: merchant,title,price(float),url. "
               "If none, return empty with price 0."),
    ("human", "Query: {query}\nURL: {url}\n--- PAGE TEXT ---\n{text}\n--- END ---"),
])
chain_extract = extract_prompt | lc_llm.with_structured_output(Offer)


# ---------------- Helpers ----------------
def _to_jpeg_bytes(data: bytes) -> bytes:
    """Decode *data* with Pillow and re-encode it as an RGB JPEG (quality 90)."""
    img = Image.open(io.BytesIO(data)).convert("RGB")
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=90)
    return buf.getvalue()


def img_or_pdf_to_image_bytes(data: bytes, filename: str) -> bytes:
    """Normalize an uploaded attachment to JPEG bytes where possible.

    Args:
        data: Raw attachment bytes.
        filename: Attachment name; only its extension is inspected.

    Returns:
        JPEG bytes for .jpg/.jpeg/.png/.webp names. For .pdf names a
        conversion is attempted and the raw bytes are returned if it fails.
        Any other extension returns *data* unchanged.

    Raises:
        Whatever Pillow raises when an image-named attachment cannot be decoded.
    """
    name = (filename or "").lower()
    if name.endswith((".jpg", ".jpeg", ".png", ".webp")):
        return _to_jpeg_bytes(data)
    if name.endswith(".pdf"):
        try:
            return _to_jpeg_bytes(data)
        except Exception:
            # Best effort: hand the raw bytes on when they cannot be decoded.
            return data
    return data


def b64_data_uri(data: bytes, mime: str) -> str:
    """Return *data* encoded as a ``data:<mime>;base64,...`` URI."""
    return f"data:{mime};base64," + base64.b64encode(data).decode("utf-8")


def _strip_code_fence(raw: str) -> str:
    """Remove a surrounding Markdown code fence (optionally tagged ``json``)."""
    text = (raw or "").strip()
    if not text.startswith("```"):
        return text
    inner = text.split("```", 2)[1]
    if inner.lower().startswith("json"):
        inner = inner[len("json"):]
    return inner.strip()


def call_openai_vision_for_receipt(image_bytes: bytes) -> Dict[str, Any]:
    """Ask the OpenAI chat model to parse a receipt image into a JSON dict.

    Args:
        image_bytes: Image bytes (or raw PDF bytes, detected by the ``%PDF``
            magic prefix) embedded in the request as a base64 data URI.

    Returns:
        The parsed JSON object, following the schema given in the prompt.

    Raises:
        json.JSONDecodeError: The model reply is empty or not valid JSON.
        ValueError: The reply is valid JSON but not an object.
    """
    is_pdf = image_bytes[0:4] == b"%PDF"
    mime = "application/pdf" if is_pdf else "image/jpeg"
    system = ("You are a strict, no-chitchat receipt parser for Canadian grocery receipts. "
              "Return ONLY JSON; prices in CAD.")
    user_prompt = """
{
  "store":{"name":"string","address":"string|null","date":"YYYY-MM-DD|null"},
  "items":[{"name":"string","size":"string|null","qty":1,"unit_price":0.00,"line_total":0.00}],
  "subtotal":0.00,"tax":0.00,"total":0.00
}
Rules: shopper-friendly names; qty>=1; unit_price before tax; line_total=qty*unit_price; use null if missing.
Return ONLY JSON.
"""
    resp = oai_client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": [
                {"type": "text", "text": user_prompt},
                {"type": "image_url", "image_url": {"url": b64_data_uri(image_bytes, mime)}},
            ]},
        ],
    )
    # The message content may be None; treat that as an empty reply.
    parsed = json.loads(_strip_code_fence(resp.choices[0].message.content or ""))
    if not isinstance(parsed, dict):
        raise ValueError("receipt parser did not return a JSON object")
    return parsed


# --- search helpers ---
def _is_merchant_url(url: str) -> bool:
    """Return True when the host of *url* is a listed merchant domain or a subdomain of one."""
    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        return False
    return any(host == dom or host.endswith("." + dom) for dom in MERCHANT_DOMAINS)


def _fetch_text(url: str, timeout=15) -> str:
    """Fetch *url* and return its visible text, capped at 12000 characters.

    Returns an empty string on any failure, including non-2xx responses.
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0 (compatible; PriceAgent/1.0)"}
        with httpx.Client(follow_redirects=True, timeout=timeout) as client:
            r = client.get(url, headers=headers)
            # Error pages carry no offers; do not spend an LLM call on them.
            r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        return " ".join(soup.get_text(separator=" ").split())[:12000]
    except Exception:
        return ""


def _search_web(query: str, k: int = 5) -> List[str]:
    """Return up to *k* merchant URLs for ``"<query> price"``.

    Tavily is tried first when it is importable and TAVILY_API_KEY is set;
    DuckDuckGo is used when Tavily yields nothing. Results are filtered to
    hosts in MERCHANT_DOMAINS.
    """
    urls: List[str] = []
    tavily_key = os.getenv("TAVILY_API_KEY")
    if _HAS_TAVILY and tavily_key:
        try:
            tv = TavilyClient(tavily_key)
            res = tv.search(query=f"{query} price", max_results=k,
                            include_domains=list(MERCHANT_DOMAINS))
            urls.extend(r["url"] for r in res.get("results", []) if r.get("url"))
        except Exception as e:
            print(f"[WARN] Tavily search failed: {e}")
    if not urls:
        try:
            with DDGS() as ddgs:
                for r in ddgs.text(f"{query} price", region="ca-en", max_results=k):
                    u = r.get("href") or r.get("url")
                    if u:
                        urls.append(u)
        except Exception as e:
            print(f"[WARN] DuckDuckGo search failed: {e}")
    # Match on the hostname, not a substring, so look-alike hosts are rejected.
    return [u for u in urls if _is_merchant_url(u)][:k]


def langchain_price_lookup(item_name: str) -> Optional[Dict[str, Any]]:
    """Find the cheapest positive-priced offer for *item_name* across merchant pages.

    Returns:
        A dict with keys ``title``, ``price``, ``source`` and ``link``, or
        None when no page produced a usable offer.
    """
    urls = _search_web(item_name, k=5)
    best = None
    for u in urls:
        text = _fetch_text(u)
        if not text:
            continue
        try:
            offer = chain_extract.invoke({"query": item_name, "url": u, "text": text})
        except Exception:
            continue
        price = getattr(offer, "price", None)
        # The prompt uses price 0 for "no offer"; negative values are bogus too.
        if not isinstance(price, (int, float)) or price <= 0:
            continue
        if best is None or price < best.price:
            best = offer
    if not best:
        return None
    return {
        "title": best.title or item_name,
        "price": float(best.price),
        "source": best.merchant or "Other store",
        "link": best.url or urls[0],
    }


def research_prices(items: List[Dict[str, Any]], max_items=6):
    """Look up offers for the first *max_items* receipt items.

    Returns:
        A list of dicts with keys ``item``, ``store``, ``price`` and
        ``is_cheaper``. ``is_cheaper`` is True only when the receipt item has
        a numeric ``unit_price`` and the offer beats it by more than one cent.
    """
    out = []
    for it in items[:max_items]:
        name = it.get("name")
        if not name:
            continue
        offer = langchain_price_lookup(name)
        if not offer:
            continue
        unit = it.get("unit_price")
        cheaper = isinstance(unit, (int, float)) and offer["price"] < float(unit) - 0.01
        out.append({"item": name, "store": offer["source"],
                    "price": offer["price"], "is_cheaper": cheaper})
        time.sleep(0.25)  # short pause between successive lookups
    return out


def compute_savings(receipt, found):
    """Sum per-unit savings over the offers flagged ``is_cheaper``.

    Quantity is not taken into account. An offer whose receipt item has no
    numeric ``unit_price`` contributes a flat 1.0 heuristic credit.

    Returns:
        ``(savings rounded to 2 decimals, list of cheaper offers)``.
    """
    cheaper = [f for f in found if f.get("is_cheaper")]
    # First occurrence wins, matching a front-to-back scan of the items.
    unit_by_name: Dict[Any, Any] = {}
    for item in receipt.get("items") or []:
        if isinstance(item, dict):
            unit_by_name.setdefault(item.get("name"), item.get("unit_price"))
    total = 0.0
    for f in cheaper:
        unit = unit_by_name.get(f.get("item"))
        if not isinstance(unit, (int, float)):
            total += 1.0
            continue
        try:
            total += max(0.0, float(unit) - float(f["price"]))
        except (KeyError, TypeError, ValueError):
            # An offer without a usable price contributes nothing.
            continue
    return round(total, 2), cheaper


def format_five_lines(receipt, savings, cheaper_list):
    """Build the newline-separated SMS summary (at most five lines)."""
    store = (receipt.get("store") or {}).get("name") or "your store"
    total = receipt.get("total") or receipt.get("subtotal")
    try:
        total_txt = f"${float(str(total).replace('$', '').strip()):.2f}"
    except Exception:
        total_txt = "N/A"
    lines = [
        f"Receipt read: {store}, total {total_txt}.",
        f"I found potential savings of ${savings:.2f} by checking other stores.",
    ]
    if cheaper_list:
        items = "; ".join(
            f"{f['item']} @ {f['store']} for ${f['price']:.2f}" for f in cheaper_list[:3]
        )
        lines.append(f"Cheaper picks: {items}.")
    else:
        lines.append("No clearly cheaper matches found right now for your items.")
    lines.append("Reply 'DEALS' anytime to get weekly picks tailored to your receipts.")
    return "\n".join(lines[:5])


# --- NEW: authenticated media download (fixes 401) ---
def download_twilio_media(url: str, timeout: int = 20) -> bytes:
    """Download an MMS attachment and return its bytes.

    Twilio MediaUrl0 requires HTTP Basic Auth with Account SID/Token.
    Handles both Twilio-hosted and public URLs.

    Raises:
        httpx.HTTPStatusError: The final response is not 2xx.
    """
    headers = {"User-Agent": "TXTPRICE/1.0"}
    parsed = urlparse(url)
    # Compare the exact host: a prefix test would also match
    # "https://api.twilio.com.evil.example" and send it the credentials.
    is_twilio = (parsed.scheme == "https"
                 and (parsed.hostname or "").lower() == _TWILIO_API_HOST)
    auth = (TW_SID, TW_TOKEN) if is_twilio else None
    with httpx.Client(follow_redirects=True, timeout=timeout,
                      headers=headers, auth=auth) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.content


# ---------------- Background Task ----------------
def process_and_reply(media_url: str, to_number: str):
    """Download and analyze the receipt at *media_url*, then text the result to *to_number*.

    Does nothing when Twilio credentials are not configured. Failures are
    logged and the user receives a generic error message.
    """
    if not _twilio_ok:
        return
    try:
        content = download_twilio_media(media_url)  # <-- Authenticated download
        img_bytes = img_or_pdf_to_image_bytes(content, "mms.jpg")
        receipt = call_openai_vision_for_receipt(img_bytes)
        items = receipt.get("items") or []
        if not items:
            msg = "I couldn't read items. Send a clearer photo."
        else:
            found = research_prices(items)
            savings, cheaper = compute_savings(receipt, found)
            msg = format_five_lines(receipt, savings, cheaper)
    except Exception as e:
        # Exception text can contain URLs or key fragments; keep it in the log only.
        print(f"[ERROR] Receipt processing failed: {e!r}")
        msg = "Sorry, something went wrong while processing your receipt. Please try again."
    try:
        twilio_client.messages.create(
            to=to_number,
            from_=TW_FROM,
            body=msg,
            status_callback=TW_STATUS_CB or None,  # logs to /status
        )
    except Exception as e:
        print(f"[ERROR] Twilio send failed: {e}")


def _twiml_message(text: str) -> PlainTextResponse:
    """Wrap *text* in a TwiML ``<Response><Message>`` document served as XML."""
    body = ('<?xml version="1.0" encoding="UTF-8"?>'
            f"<Response><Message>{xml_escape(text)}</Message></Response>")
    return PlainTextResponse(body, media_type="application/xml")


# ---------------- Routes ----------------
@app.get("/sms")
async def sms_health():
    """Health check for the webhook URL."""
    return PlainTextResponse("SMS webhook is up (POST only).", media_type="text/plain")


@app.post("/status")
async def status_cb(MessageSid: str = Form(None), MessageStatus: str = Form(None)):
    """Log a Twilio message status callback."""
    print(f"[STATUS] MessageSid={MessageSid} MessageStatus={MessageStatus}")
    return PlainTextResponse("OK", media_type="text/plain")


@app.post("/sms")
async def sms_webhook(request: Request, background_tasks: BackgroundTasks):
    """Acknowledge an inbound SMS/MMS and queue receipt processing.

    The reply is valid TwiML because the response is served as
    ``application/xml``.
    """
    # NOTE(review): the request signature is not validated here, so any caller
    # can post a MediaUrl0 — consider verifying X-Twilio-Signature.
    form = dict(await request.form())
    from_number = form.get("From", "")
    try:
        num_media = int(form.get("NumMedia", "0") or "0")
    except (TypeError, ValueError):
        num_media = 0  # malformed count: treat as "no attachment"
    media_url = form.get("MediaUrl0") if num_media > 0 else None

    # Immediate ack to beat 15s timeout
    if not media_url:
        ack = "Please MMS a clear photo of your grocery receipt to analyze savings."
        return _twiml_message(ack)

    if _twilio_ok and from_number:
        background_tasks.add_task(process_and_reply, media_url, from_number)
        reply = "Got it—processing your receipt now. You’ll get a follow-up text shortly."
    else:
        reply = ("Got your image, but Twilio credentials are missing. "
                 "Set TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM.")
    return _twiml_message(reply)