Spaces:
Runtime error
Runtime error
| """ | |
| Reusable helper to fetch Swiggy order e-mails and return a list[dict]. | |
| Usage: | |
| from swiggy_scraper import fetch_swiggy_orders | |
| orders = fetch_swiggy_orders("17-May-2025", "20-May-2025") | |
| """ | |
| import os, imaplib, json | |
| from email import message_from_bytes | |
| from bs4 import BeautifulSoup | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| from datetime import datetime, timedelta | |
| from email.utils import parsedate_to_datetime | |
| from zoneinfo import ZoneInfo | |
| from db_schema import init_db, get_orders_by_date_from_db, save_orders_to_db | |
# Load variables from a .env file into the process environment before
# reading the credentials below.
load_dotenv()

# Mailbox credentials used for the IMAP login.
EMAIL_ID = os.environ.get("EMAIL_ID")
APP_PASSWORD = os.environ.get("APP_PASSWORD")

# Module-wide OpenAI client shared by the LLM extraction helper.
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_KEY)
def _imap_connect():
    """Open an authenticated IMAP session on Gmail's "All Mail" mailbox.

    Logs in to ``imap.gmail.com`` with the module-level ``EMAIL_ID`` and
    ``APP_PASSWORD`` and selects ``[Gmail]/All Mail`` in read-only mode.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` connection. The caller is
        responsible for calling ``logout()`` on it.

    Raises:
        imaplib.IMAP4.error: if the login is rejected or the mailbox cannot
            be selected.
    """
    m = imaplib.IMAP4_SSL("imap.gmail.com")
    try:
        m.login(EMAIL_ID, APP_PASSWORD)
        # Read-only selection: fetching full messages later must not flip
        # their \Seen flag in the user's mailbox.
        status, detail = m.select('"[Gmail]/All Mail"', readonly=True)
        if status != "OK":
            raise imaplib.IMAP4.error(
                f"Could not select [Gmail]/All Mail: {detail!r}"
            )
    except Exception:
        # Do not leak the socket when login/select fails; the original
        # error is what the caller needs to see, so cleanup errors are
        # deliberately dropped.
        try:
            m.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
        raise
    return m
def _email_to_clean_text(msg):
    """Return the visible text of the first HTML part of an e-mail.

    Args:
        msg: a parsed ``email.message.Message``.

    Returns:
        The text of the first decodable ``text/html`` part with
        script/style/head/meta/link elements removed, one non-empty,
        stripped line per row. Returns ``""`` when the message has no
        usable HTML part.
    """
    html = None
    for part in msg.walk():
        if part.get_content_type() != "text/html":
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            # No decodable bytes in this part; try the next HTML part
            # instead of crashing on ``None.decode``.
            continue
        # Honour the charset declared by the part; fall back to UTF-8 when
        # it is missing or names a codec Python does not know.
        charset = part.get_content_charset() or "utf-8"
        try:
            html = payload.decode(charset, errors="ignore")
        except LookupError:
            html = payload.decode("utf-8", errors="ignore")
        break

    if not html:
        return ""

    soup = BeautifulSoup(html, "html.parser")
    # Remove elements whose content is never shown to the reader.
    for t in soup(["script", "style", "head", "meta", "link"]):
        t.decompose()
    return "\n".join(
        line.strip() for line in soup.get_text("\n").splitlines() if line.strip()
    )
def _get_all_dates(start_date: str, end_date: str):
    """List every calendar day from start_date to end_date, inclusive.

    Both arguments are parsed as ``%d-%b-%Y`` (e.g. "17-May-2025"); the
    result is a list of ``%Y-%m-%d`` strings in ascending order. An end date
    earlier than the start date yields an empty list.
    """
    first_day = datetime.strptime(start_date, "%d-%b-%Y")
    last_day = datetime.strptime(end_date, "%d-%b-%Y")

    days = []
    cursor = first_day
    one_day = timedelta(days=1)
    # Walk forward one day at a time until the end date has been emitted.
    while cursor <= last_day:
        days.append(cursor.strftime("%Y-%m-%d"))
        cursor += one_day
    return days
def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown code fence, if present."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        # Drop an optional language tag such as ```json.
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
    return cleaned.strip()


def _extract_with_llm(email_number, subject, body, email_date, email_time):
    """Ask the chat model to extract order details from one e-mail.

    Args:
        email_number: identifier copied into the result as "email_number".
        subject: e-mail subject line sent to the model.
        body: cleaned e-mail text sent to the model.
        email_date: copied into the result as "order_date".
        email_time: copied into the result as "order_time".

    Returns:
        On success, a dict with keys "email_number", "order_date",
        "order_time", "restaurant_name", "delivery_address", "items" and
        "total_price"; fields missing from the model's reply default to
        ``""``, ``[]`` or ``0``.
        On failure, a dict with "email_number" and "error" (plus
        "raw_response" when the reply was not valid JSON). This function
        never raises for API or parsing errors.
    """
    current_email = {
        "subject": subject,
        "body": body
    }
    prompt = f"""
You are given a Swiggy order confirmation email with a subject and body.
Extract and return only the following:
- "restaurant_name": name of the restaurant
- "delivery_address": the delivery address
- "items": a list of ordered items, each with "name", "quantity", and "price" (number)
- "total_price": the total bill paid including taxes, charges, etc.
Example output format:
{{
"restaurant_name": "Dominos Pizza",
"delivery_address": "123 Main St, City",
"total_price": 567,
"items": [
{{ "name": "Veg Pizza", "quantity": 2, "price": 199 }},
{{ "name": "Coke", "quantity": 1, "price": 45 }}
]
}}
Return only valid JSON. No extra text or comments.
{json.dumps(current_email, indent=2)}
"""
    # Kept outside the try so the JSON error handler can report it without
    # probing locals().
    raw_content = None
    try:
        rsp = client.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            messages=[
                {"role": "system", "content": "You are a precise JSON extractor."},
                {"role": "user", "content": prompt},
            ],
        )
        raw_content = rsp.choices[0].message.content
        if raw_content is None:
            raise ValueError("model returned no content")
        # Models sometimes wrap JSON in ```json fences despite the prompt;
        # strip them before parsing.
        parsed_data = json.loads(_strip_code_fence(raw_content))
        if not isinstance(parsed_data, dict):
            raise ValueError(
                f"expected a JSON object, got {type(parsed_data).__name__}"
            )
    except json.JSONDecodeError as json_err:
        return {
            "email_number": email_number,
            "error": f"JSON decoding failed: {str(json_err)}",
            "raw_response": raw_content,
        }
    except Exception as e:
        # Best-effort boundary: any API/transport problem is reported as an
        # error record so one bad e-mail does not abort the whole batch.
        return {
            "email_number": email_number,
            "error": f"Unexpected error: {str(e)}"
        }

    # Wrap the model's fields together with the metadata we already know.
    return {
        "email_number": email_number,
        "order_date": email_date,
        "order_time": email_time,
        "restaurant_name": parsed_data.get("restaurant_name", ""),
        "delivery_address": parsed_data.get("delivery_address", ""),
        "items": parsed_data.get("items", []),
        "total_price": parsed_data.get("total_price", 0),
    }
def fetch_swiggy_orders(start_date: str, end_date: str) -> list[dict]:
    """Collect Swiggy orders for every day in an inclusive date range.

    For each day, orders already stored in the database are reused;
    otherwise e-mails from ``noreply@swiggy.in`` for that day are fetched
    over IMAP, passed through the LLM extractor, saved to the database and
    added to the result.

    Args:
        start_date: first day, formatted like "17-May-2025".
        end_date: last day (inclusive), same format.

    Returns:
        A list of order dicts. E-mails that could not be fetched or parsed
        are represented by dicts with "email_number" and "error" keys.

    Raises:
        ValueError: if either date string is malformed.
        imaplib.IMAP4.error: if the IMAP login or a search fails.
    """
    # Validate the range before opening a network connection so a bad date
    # cannot leave an IMAP session dangling.
    all_dates = _get_all_dates(start_date, end_date)
    mail = _imap_connect()
    orders = []
    try:
        for date_str in all_dates:
            # 1) Try loading from DB
            day_orders = get_orders_by_date_from_db(date_str)
            if day_orders:
                print(f"{date_str} loaded from DB")
                orders.extend(day_orders)
                continue

            # 2) Otherwise scrape emails for that date
            print(f"Fetching Swiggy emails for {date_str}")
            day = datetime.strptime(date_str, "%Y-%m-%d")
            this_day = day.strftime("%d-%b-%Y")
            next_day = (day + timedelta(days=1)).strftime("%d-%b-%Y")
            crit = f'(FROM "noreply@swiggy.in") SINCE "{this_day}" BEFORE "{next_day}"'
            status, data = mail.search(None, crit)
            if status != "OK" or not data or data[0] is None:
                # A "NO" reply carries error text in data; splitting it
                # would produce bogus message ids.
                raise imaplib.IMAP4.error(
                    f"IMAP search failed for {date_str}: {status} {data!r}"
                )
            ids = data[0].split()

            scraped_orders = []
            for idx, eid in enumerate(ids, 1):
                try:
                    # Fetch and parse inside the try so a single broken
                    # message becomes an error record, not a crash.
                    _, msg_data = mail.fetch(eid, "(RFC822)")
                    msg = message_from_bytes(msg_data[0][1])
                    subject = msg.get("Subject", "")
                    body_text = _email_to_clean_text(msg)
                    sent_at = parsedate_to_datetime(msg["Date"]).astimezone(
                        ZoneInfo("Asia/Kolkata")
                    )
                    email_date = sent_at.strftime("%d-%b-%Y")
                    email_time = sent_at.strftime("%H:%M:%S")
                    order = _extract_with_llm(
                        idx, subject, body_text, email_date, email_time
                    )
                    scraped_orders.append(order)
                except Exception as exc:
                    scraped_orders.append({"email_number": idx, "error": str(exc)})

            # 3) Save newly scraped data to DB
            save_orders_to_db(date_str, scraped_orders)
            orders.extend(scraped_orders)
    finally:
        # Always release the IMAP session; a failing logout must not mask
        # the real error or discard results that were already collected.
        try:
            mail.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
    return orders