| | import os, io, base64, json, time, random |
| | from typing import Optional, Dict, Any, List, Tuple |
| | from urllib.parse import quote_plus |
| |
|
| | from fastapi import FastAPI, Request, BackgroundTasks, Form |
| | from fastapi.responses import PlainTextResponse |
| | import httpx |
| | from bs4 import BeautifulSoup |
| | from PIL import Image |
| |
|
| | from twilio.rest import Client as TwilioClient |
| | from openai import OpenAI |
| |
|
| | |
# ASGI application exposing the SMS/MMS webhook routes defined in this file.
app = FastAPI(title="TXTPRICE SMS Webhook (Async)")

# OpenAI SDK client; the API key is read from the environment.
oai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Twilio settings, each read from the environment (empty string when unset).
TW_SID = os.getenv("TWILIO_ACCOUNT_SID", "")
TW_TOKEN = os.getenv("TWILIO_AUTH_TOKEN", "")
TW_FROM = os.getenv("TWILIO_FROM", "")
TW_STATUS_CB = os.getenv("TWILIO_STATUS_CALLBACK", "")

# A Twilio client is only built when SID, token and sender are all set.
_twilio_ok = all((TW_SID, TW_TOKEN, TW_FROM))
twilio_client = None
if _twilio_ok:
    twilio_client = TwilioClient(TW_SID, TW_TOKEN)
| |
|
| | |
| | from langchain_openai import ChatOpenAI |
| | from langchain_core.pydantic_v1 import BaseModel, Field |
| | from langchain_core.prompts import ChatPromptTemplate |
| |
|
# Chat model (temperature 0) used for structured offer extraction.
lc_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Retailer domains that web-search results are limited to.
MERCHANT_DOMAINS = (
    "walmart.ca",
    "realcanadiansuperstore.ca",
    "amazon.ca",
    "metro.ca",
    "nofrills.ca",
    "freshco.com",
    "well.ca",
    "costco.ca",
    "iga.net",
    "londondrugs.com",
)
| |
|
# Structured-output schema for a single price offer; all four fields are required.
# NOTE(review): documented with comments rather than a class docstring because a
# docstring may be picked up as the schema description sent to the model — confirm.
class Offer(BaseModel):
    merchant: str = Field(...)  # store/retailer name
    title: str = Field(...)  # product title
    price: float = Field(...)  # offer price; presumably CAD, verify against the prompt
    url: str = Field(...)  # link for the offer
| |
|
# System instruction: pick one offer and answer with the Offer fields.
_EXTRACT_SYSTEM_MSG = (
    "Extract one best CAD offer for the queried item from the page text. "
    "Return JSON: merchant,title,price(float),url. If none, return empty with price 0."
)
# Human turn: filled with the query, the page URL and the scraped page text.
_EXTRACT_HUMAN_MSG = "Query: {query}\nURL: {url}\n--- PAGE TEXT ---\n{text}\n--- END ---"

extract_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", _EXTRACT_SYSTEM_MSG),
        ("human", _EXTRACT_HUMAN_MSG),
    ]
)

# Prompt piped into the model, with output parsed into an Offer instance.
chain_extract = extract_prompt | lc_llm.with_structured_output(Offer)
| |
|
| | |
def img_or_pdf_to_image_bytes(data: bytes, filename: str) -> bytes:
    """Normalize an uploaded receipt to JPEG bytes where possible.

    Args:
        data: Raw payload bytes.
        filename: Name used to guess the payload type from its extension.
            May be empty or ``None``.

    Returns:
        JPEG-encoded bytes (RGB, quality 90) when the payload is a decodable
        image; otherwise ``data`` unchanged. This covers PDF payloads and
        unrecognized extensions.

    Raises:
        Exception: Whatever PIL raises when a payload named like an image
            (``.jpg``/``.jpeg``/``.png``/``.webp``) cannot be decoded.
    """
    # Trust the content over the name: a caller may label every payload with
    # a fixed image filename, and handing PDF bytes to PIL would raise.
    if data[:4] == b"%PDF":
        return data

    name = (filename or "").lower()
    if not name.endswith((".jpg", ".jpeg", ".png", ".webp", ".pdf")):
        return data

    try:
        # Context manager releases the decoder's resources promptly.
        with Image.open(io.BytesIO(data)) as img:
            buf = io.BytesIO()
            img.convert("RGB").save(buf, format="JPEG", quality=90)
            return buf.getvalue()
    except Exception:
        # A ".pdf"-named payload that PIL cannot open is passed through
        # untouched; image-named payloads surface the decoding error.
        if name.endswith(".pdf"):
            return data
        raise
| |
|
def b64_data_uri(data: bytes, mime: str) -> str:
    """Return *data* as a ``data:<mime>;base64,<payload>`` URI string."""
    payload = base64.b64encode(data).decode("utf-8")
    return f"data:{mime};base64,{payload}"
| |
|
def _strip_code_fence(raw: str) -> str:
    """Return *raw* without a surrounding Markdown code fence, if it has one."""
    text = raw.strip()
    if not text.startswith("```"):
        return text
    # "```json\n{...}\n```" splits into ["", "json\n{...}\n", ""].
    inner = text.split("```", 2)[1].strip()
    # Drop an optional language tag whether or not a newline follows it.
    if inner[:4].lower() == "json":
        inner = inner[4:]
    return inner.strip()


def call_openai_vision_for_receipt(image_bytes: bytes) -> Dict[str, Any]:
    """Ask the vision model to parse a receipt into a JSON object.

    Args:
        image_bytes: Receipt payload. Bytes starting with ``%PDF`` are sent
            with an ``application/pdf`` MIME type, everything else as
            ``image/jpeg``.

    Returns:
        The parsed JSON object (store, items, subtotal, tax, total as
        requested in the prompt).

    Raises:
        ValueError: If the model returns no text, invalid JSON
            (``json.JSONDecodeError`` is a ``ValueError``), or JSON that is
            not an object.
    """
    is_pdf = image_bytes[0:4] == b"%PDF"
    # NOTE(review): the image_url content part may not accept application/pdf
    # payloads — confirm against the OpenAI API documentation.
    mime = "application/pdf" if is_pdf else "image/jpeg"
    system = "You are a strict, no-chitchat receipt parser for Canadian grocery receipts. Return ONLY JSON; prices in CAD."
    user_prompt = """
{ "store":{"name":"string","address":"string|null","date":"YYYY-MM-DD|null"},
"items":[{"name":"string","size":"string|null","qty":1,"unit_price":0.00,"line_total":0.00}],
"subtotal":0.00,"tax":0.00,"total":0.00 }
Rules: shopper-friendly names; qty>=1; unit_price before tax; line_total=qty*unit_price; use null if missing.
Return ONLY JSON.
"""
    resp = oai_client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        messages=[
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    {"type": "image_url", "image_url": {"url": b64_data_uri(image_bytes, mime)}},
                ],
            },
        ],
    )
    content = resp.choices[0].message.content
    if not content:
        raise ValueError("Vision model returned an empty response")

    parsed = json.loads(_strip_code_fence(content))
    # Callers use dict access on the result, so reject arrays and scalars here.
    if not isinstance(parsed, dict):
        raise ValueError("Vision model did not return a JSON object")
    return parsed
| |
|
| | |
| | from duckduckgo_search import DDGS |
| | try: |
| | from tavily import TavilyClient |
| | _HAS_TAVILY = True |
| | except Exception: |
| | _HAS_TAVILY = False |
| |
|
def _fetch_text(url: str, timeout: float = 15) -> str:
    """Fetch *url* and return its visible text, whitespace-collapsed.

    Args:
        url: Page to download (redirects are followed).
        timeout: HTTP timeout in seconds.

    Returns:
        Up to 12000 characters of page text with script, style and noscript
        content removed. Returns ``""`` on any failure, including HTTP
        4xx/5xx responses.
    """
    headers = {"User-Agent": "Mozilla/5.0 (compatible; PriceAgent/1.0)"}
    try:
        with httpx.Client(follow_redirects=True, timeout=timeout) as client:
            resp = client.get(url, headers=headers)
            # Error pages (blocked, not found) hold no offer; treat them as a
            # failed fetch instead of passing them on as page text.
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        return " ".join(soup.get_text(separator=" ").split())[:12000]
    except Exception:
        # Best-effort scraping: one bad page must not abort the price lookup.
        return ""
| |
|
def _is_merchant_url(url: str) -> bool:
    """Return True when *url*'s host is a listed merchant domain or a subdomain of one."""
    from urllib.parse import urlparse

    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        return False
    return any(host == dom or host.endswith("." + dom) for dom in MERCHANT_DOMAINS)


def _search_web(query: str, k: int = 5) -> List[str]:
    """Search the web for price pages for *query* on known merchant sites.

    Tavily is tried first when its package and ``TAVILY_API_KEY`` are
    available. DuckDuckGo is used when Tavily yields nothing.

    Args:
        query: Item name to search for (" price" is appended).
        k: Maximum number of results requested and returned.

    Returns:
        At most *k* distinct URLs whose host belongs to ``MERCHANT_DOMAINS``,
        in search-result order. Empty when both providers fail.
    """
    urls: List[str] = []

    tavily_key = os.getenv("TAVILY_API_KEY")
    if _HAS_TAVILY and tavily_key:
        try:
            tv = TavilyClient(tavily_key)
            res = tv.search(
                query=f"{query} price",
                max_results=k,
                include_domains=list(MERCHANT_DOMAINS),
            )
            for r in res.get("results", []):
                if r.get("url"):
                    urls.append(r["url"])
        except Exception as e:
            # Best-effort: fall through to DuckDuckGo, but leave a trace.
            print(f"[WARN] Tavily search failed: {e}")

    if not urls:
        try:
            with DDGS() as ddgs:
                for r in ddgs.text(f"{query} price", region="ca-en", max_results=k):
                    u = r.get("href") or r.get("url")
                    if u:
                        urls.append(u)
        except Exception as e:
            print(f"[WARN] DuckDuckGo search failed: {e}")

    # Match on the parsed hostname: a substring test on the whole URL would
    # accept e.g. "https://example.com/?ref=walmart.ca".
    # dict.fromkeys drops duplicates while keeping result order.
    merchant_urls = [u for u in dict.fromkeys(urls) if _is_merchant_url(u)]
    return merchant_urls[:k]
| |
|
def langchain_price_lookup(item_name: str) -> Optional[Dict[str, Any]]:
    """Find the lowest-priced offer for *item_name* across merchant pages.

    Each URL returned by the web search is fetched and passed through the
    extraction chain. Pages that cannot be fetched or parsed are skipped.

    Args:
        item_name: Product name to look up.

    Returns:
        ``{"title", "price", "source", "link"}`` for the cheapest offer with
        a positive price, or ``None`` when no usable offer was found.
    """
    best = None
    best_url = None
    for u in _search_web(item_name, k=5):
        text = _fetch_text(u)
        if not text:
            continue
        try:
            offer = chain_extract.invoke({"query": item_name, "url": u, "text": text})
        except Exception:
            # Extraction is best-effort per page; move on to the next URL.
            continue
        price = getattr(offer, "price", None)
        # The prompt uses price 0 for "no offer"; negative or NaN values are
        # rejected too so they cannot win as the cheapest.
        if not isinstance(price, (int, float)) or not price > 0:
            continue
        if best is None or price < best.price:
            best = offer
            best_url = u
    if best is None:
        return None
    return {
        "title": best.title or item_name,
        "price": float(best.price),
        "source": best.merchant or "Other store",
        # Fall back to the page this offer was extracted from, not to the
        # first search result.
        "link": best.url or best_url,
    }
| |
|
def research_prices(items: List[Dict[str, Any]], max_items=6):
    """Look up an online offer for each of the first *max_items* receipt items.

    Items without a name, or for which no offer is found, are left out.

    Returns:
        A list of ``{"item", "store", "price", "is_cheaper"}`` dicts.
        ``is_cheaper`` is true only when the receipt's ``unit_price`` is
        numeric and the offer beats it by more than one cent.
    """
    results = []
    for entry in items[:max_items]:
        name = entry.get("name")
        offer = langchain_price_lookup(name) if name else None
        if not offer:
            continue
        paid = entry.get("unit_price")
        is_cheaper = isinstance(paid, (int, float)) and offer["price"] < float(paid) - 0.01
        results.append(
            {
                "item": name,
                "store": offer["source"],
                "price": offer["price"],
                "is_cheaper": is_cheaper,
            }
        )
        # Short pause after each successful lookup.
        time.sleep(0.25)
    return results
| |
|
def compute_savings(receipt, found):
    """Total the per-unit savings of offers flagged as cheaper.

    Args:
        receipt: Parsed receipt dict; ``receipt["items"]`` entries supply
            ``name`` and ``unit_price``.
        found: Offer dicts with ``item``, ``price`` and ``is_cheaper`` keys.

    Returns:
        ``(savings, cheaper)``. ``savings`` is the sum of
        ``unit_price - offer price`` (never negative per item), rounded to
        two decimals. ``cheaper`` is the subset of *found* with a truthy
        ``is_cheaper``.
    """
    cheaper = [f for f in found if f.get("is_cheaper")]
    items = (receipt or {}).get("items") or []
    total = 0.0
    for offer in cheaper:
        try:
            # Use the first same-named receipt line that actually has a
            # numeric price; duplicates without one are skipped.
            unit = next(
                (
                    it.get("unit_price")
                    for it in items
                    if it.get("name") == offer["item"]
                    and isinstance(it.get("unit_price"), (int, float))
                ),
                None,
            )
            # With no known paid price nothing is added: a made-up amount
            # would misstate the savings reported to the user.
            if unit is not None:
                total += max(0.0, float(unit) - float(offer["price"]))
        except (AttributeError, KeyError, TypeError, ValueError):
            # Malformed receipt line or offer: skip it, keep totalling.
            continue
    # NOTE(review): savings are per unit; item quantity is not applied —
    # confirm this is intended.
    return round(total, 2), cheaper
| |
|
def format_five_lines(receipt, savings, cheaper_list):
    """Build the short SMS summary for a processed receipt.

    Args:
        receipt: Parsed receipt dict. ``store`` may be an object with a
            ``name`` key or a plain string. ``total`` falls back to
            ``subtotal``.
        savings: Estimated savings amount.
        cheaper_list: Offer dicts with ``item``, ``store`` and ``price``.
            At most the first three are shown.

    Returns:
        Four newline-separated lines: receipt summary, savings, cheaper
        picks (or a "none found" line), and the DEALS call to action.
    """
    store_info = receipt.get("store")
    if isinstance(store_info, dict):
        store = store_info.get("name") or "your store"
    elif isinstance(store_info, str) and store_info.strip():
        # The parser may return the store as a bare name instead of an object.
        store = store_info.strip()
    else:
        store = "your store"

    total = receipt.get("total") or receipt.get("subtotal")
    try:
        total_txt = f"${float(str(total).replace('$', '').strip()):.2f}"
    except Exception:
        total_txt = "N/A"

    lines = [
        f"Receipt read: {store}, total {total_txt}.",
        f"I found potential savings of ${savings:.2f} by checking other stores.",
    ]
    if cheaper_list:
        items = "; ".join(
            f"{f['item']} @ {f['store']} for ${f['price']:.2f}" for f in cheaper_list[:3]
        )
        lines.append(f"Cheaper picks: {items}.")
    else:
        lines.append("No clearly cheaper matches found right now for your items.")
    lines.append("Reply 'DEALS' anytime to get weekly picks tailored to your receipts.")
    return "\n".join(lines)
| |
|
| | |
def _is_twilio_api_url(url: str) -> bool:
    """Return True only for https URLs whose host is exactly api.twilio.com."""
    from urllib.parse import urlparse

    try:
        parts = urlparse(url)
    except ValueError:
        return False
    return parts.scheme == "https" and (parts.hostname or "").lower() == "api.twilio.com"


def download_twilio_media(url: str, timeout: int = 20) -> bytes:
    """Download an MMS media file and return its raw bytes.

    Twilio-hosted media is fetched with HTTP Basic Auth (Account SID/Token).
    Any other URL is fetched without credentials.

    Args:
        url: Media URL (e.g. the webhook's ``MediaUrl0`` value).
        timeout: HTTP timeout in seconds.

    Returns:
        The response body.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
        httpx.HTTPError: On transport-level failures.
    """
    headers = {"User-Agent": "TXTPRICE/1.0"}
    # Credentials go out only on an exact host match. A prefix test would also
    # match "https://api.twilio.com.evil.example/" or
    # "https://api.twilio.com@evil.example/" and leak the auth token.
    auth = (TW_SID, TW_TOKEN) if _is_twilio_api_url(url) else None
    with httpx.Client(follow_redirects=True, timeout=timeout, headers=headers, auth=auth) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.content
| |
|
| | |
def process_and_reply(media_url: str, to_number: str):
    """Analyze a receipt image and text the result back to the sender.

    Downloads the media, parses the receipt, researches prices, and sends one
    SMS with the summary. Does nothing when Twilio is not configured.

    Args:
        media_url: URL of the MMS media to analyze.
        to_number: Phone number that receives the follow-up SMS.
    """
    if not _twilio_ok:
        return
    try:
        content = download_twilio_media(media_url)
        img_bytes = img_or_pdf_to_image_bytes(content, "mms.jpg")
        receipt = call_openai_vision_for_receipt(img_bytes)
        items = receipt.get("items") or []
        if not items:
            msg = "I couldn't read items. Send a clearer photo."
        else:
            found = research_prices(items)
            savings, cheaper = compute_savings(receipt, found)
            msg = format_five_lines(receipt, savings, cheaper)
    except Exception as e:
        # Keep internal details (URLs, API errors) in the server log and send
        # the user a generic message instead of the raw exception text.
        print(f"[ERROR] Receipt processing failed: {e}")
        msg = "Sorry, I couldn't process that receipt. Please try again with a clear photo."

    try:
        twilio_client.messages.create(
            to=to_number,
            from_=TW_FROM,
            body=msg,
            status_callback=TW_STATUS_CB or None,
        )
    except Exception as e:
        print(f"[ERROR] Twilio send failed: {e}")
| |
|
| | |
@app.get("/sms")
async def sms_health():
    """Answer GET /sms with a plain-text liveness message."""
    body = "SMS webhook is up (POST only)."
    return PlainTextResponse(body, media_type="text/plain")
| |
|
@app.post("/status")
async def status_cb(MessageSid: str = Form(None), MessageStatus: str = Form(None)):
    """Log the posted message SID and status, then reply with plain-text OK."""
    log_line = f"[STATUS] MessageSid={MessageSid} MessageStatus={MessageStatus}"
    print(log_line)
    return PlainTextResponse("OK", media_type="text/plain")
| |
|
@app.post("/sms")
async def sms_webhook(request: Request, background_tasks: BackgroundTasks):
    """Handle an inbound SMS/MMS webhook and reply with TwiML.

    With no media attached, asks the sender for a receipt photo. With media,
    queues background processing and acknowledges at once. If Twilio is not
    configured or the sender is missing, reports the missing configuration.

    Returns:
        An ``application/xml`` response containing a TwiML ``<Message>``.
    """
    # NOTE(review): no request-signature check is visible in this handler, so
    # the form fields (including MediaUrl0) should be treated as untrusted —
    # confirm whether validation happens elsewhere.
    form = dict(await request.form())
    from_number = form.get("From", "")
    try:
        num_media = int(form.get("NumMedia") or 0)
    except (TypeError, ValueError):
        # A malformed count is treated as "no media" instead of a 500.
        num_media = 0
    media_url = form.get("MediaUrl0") if num_media > 0 else None

    if not media_url:
        ack = "<Response><Message>Please MMS a clear photo of your grocery receipt to analyze savings.</Message></Response>"
        return PlainTextResponse(ack, media_type="application/xml")

    if _twilio_ok and from_number:
        background_tasks.add_task(process_and_reply, media_url, from_number)
        reply = "<Response><Message>Got it—processing your receipt now. You’ll get a follow-up text shortly.</Message></Response>"
    else:
        reply = "<Response><Message>Got your image, but Twilio credentials are missing. Set TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM.</Message></Response>"

    return PlainTextResponse(reply, media_type="application/xml")
| |
|