# Spaces: (Hugging Face Spaces page header — scrape residue, not program code)
# "Runtime error" — status banner captured with the page; the source follows.
# --- Third-party & stdlib imports ---------------------------------------
import streamlit as st  # type: ignore
import pandas as pd  # type: ignore
import asyncio
import uvloop  # type: ignore
import requests  # type: ignore
from io import StringIO, BytesIO
from urllib3.util.retry import Retry  # type: ignore
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter  # type: ignore
import collections.abc
import smtplib
import os
import ssl
from email.message import EmailMessage
from openai import OpenAI

# Compatibility for MutableSet
# `collections.MutableSet` was removed in Python 3.10; restore the alias for
# any dependency that still imports it from `collections` directly.
import collections
collections.MutableSet = collections.abc.MutableSet

# Use uvloop for a faster event loop on supported platforms
# NOTE(review): uvloop is Unix-only — this raises on Windows; confirm the
# deployment target. No async code is visible in this file, so the policy
# presumably benefits a dependency — verify it is still needed.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# ---------------------------
# Streamlit Page Config
# ---------------------------
st.set_page_config(page_title="π Trending Products", layout="wide")
st.title("π Trending Products Report")

# Credentials come from environment variables. Each is None when unset —
# the corresponding API/SMTP call will then fail at runtime.
ox_username = os.environ.get("OX_NAME")    # Oxylabs API username
ox_password = os.environ.get("OX_PSWRD")   # Oxylabs API password
xt_username = os.environ.get("XT_USER")    # Xtopus API username
xt_password = os.environ.get("XT_PSWRD")   # Xtopus API password
email_user = os.environ.get("EMAIL")       # SMTP sender account
email_password = os.environ.get("PSWRD")   # SMTP sender password
openai_api_key= os.environ.get("AI_API")   # OpenAI API key

# FX multipliers converting 1 unit of each currency into GBP.
# NOTE(review): static snapshot — these rates go stale; confirm how/when
# they are meant to be refreshed.
rates = {
    'USD': 0.75, 'EUR': 0.86, 'CAD': 0.54, 'AED': 0.21,
    'JPY': 0.0051, 'BRL': 0.134, 'MXN': 0.038, 'EGP': 0.015,
    'GBP': 1.0, 'AUD': 0.49, 'SGD': 0.58, 'PLN': 0.20
}
# Local currency requested from Oxylabs for each supported Amazon TLD.
DOMAIN_CURRENCY = {
    'com': 'USD', 'co.uk': 'GBP', 'de': 'EUR',
    'fr': 'EUR', 'es': 'EUR', 'it': 'EUR',
    'ca': 'CAD'
}
profit_rate = 15  # target profit, as a percent of the predicted Buy Box price

# ---------------------------
# HTTP Session with Retries
# ---------------------------
# Retry transient failures (429 rate-limit and 5xx) up to 3 times with
# exponential backoff. POST must be whitelisted explicitly because urllib3
# does not retry non-idempotent methods by default — every API call in this
# file is a POST.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=['POST']
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount('https://', adapter)
session.mount('http://', adapter)
# ---------------------------
# Cached API Calls
# ---------------------------
def get_xtopus_token():
    """Log in to the Xtopus API and return an access token.

    The token is memoized on the function object for the lifetime of the
    current script run. Previously every call performed a fresh login, and
    ``get_listing`` calls this once per ASIN — dozens of logins per fetch.
    Streamlit re-executes the module on each interaction, which redefines
    this function and therefore naturally resets the cache.

    Returns:
        The ``accessToken`` string from the login response.

    Raises:
        requests.HTTPError: if the login request returns a non-2xx status.
    """
    token = getattr(get_xtopus_token, '_cached_token', None)
    if token is None:
        resp = session.post(
            'https://xt.xtopus.io/api/user/login',
            json={'username': xt_username, 'password': xt_password},
            headers={'Content-Type': 'application/json'},
            timeout=(5, 60)  # (connect, read) seconds
        )
        resp.raise_for_status()
        token = resp.json()['data']['accessToken']
        get_xtopus_token._cached_token = token
    return token
def fetch_bestseller(cat_id, domain, geo):
    """Fetch one Amazon bestseller category via the Oxylabs realtime API.

    Args:
        cat_id: Amazon browse-node (category) id to query.
        domain: Amazon TLD (e.g. 'com', 'co.uk'); also selects the currency.
        geo: Geo-location string (zip/postcode) forwarded to Oxylabs.

    Returns:
        The parsed Oxylabs JSON response on success, or ``{'error': <msg>}``
        on any request/parse failure. Returning the error as a value (rather
        than raising) matters because this runs inside a ThreadPoolExecutor:
        an exception would propagate out of ``future.result()`` and crash the
        page, while the caller already handles an ``'error'`` key gracefully.
    """
    currency = DOMAIN_CURRENCY.get(domain, 'GBP')  # fall back to GBP for unmapped TLDs
    payload = {
        'source': 'amazon_bestsellers',
        'domain': domain,
        'geo_location': geo,
        'query': cat_id,
        'render': 'html',
        'parse': True,
        'context': [{'key': 'currency', 'value': currency}]
    }
    try:
        resp = session.post(
            'https://realtime.oxylabs.io/v1/queries',
            auth=(ox_username, ox_password),
            json=payload,
            timeout=(5, 60)  # (connect, read) seconds
        )
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body; the UI renders this as a warning.
        return {'error': str(e)}
def get_listing(asin):
    """Look up Xtopus listing/stock information for a single ASIN.

    Args:
        asin: Amazon marketplace product id.

    Returns:
        The ``data`` dict from the Xtopus quantity endpoint, or ``{}`` when
        the response is non-200 or its body is not valid JSON. Best-effort
        by design: the caller treats a missing listing as "Not Selling".
    """
    resp = session.post(
        'https://xt.xtopus.io/api/merchant/product/quantity',
        json={'market_product_id': asin},
        headers={'Authorization': get_xtopus_token()},
        timeout=(5, 60)  # (connect, read) seconds
    )
    if resp.status_code != 200:
        return {}
    try:
        return resp.json().get('data', {})
    except ValueError:
        # 200 with a non-JSON body — treat as "no listing data" rather than crash.
        return {}
def get_trending_brands(titles: tuple, domain: str) -> str:
    """Ask OpenAI for the top 5 trending brands among the given titles.

    Args:
        titles: Product title strings (a tuple so callers can hash/cache it).
        domain: Amazon TLD, included in the prompt for context.

    Returns:
        The model's answer as stripped text; empty string if the model
        returned no content.
    """
    client = OpenAI(api_key=openai_api_key)
    prompt = (
        f"Given the following product titles for Amazon domain '{domain}':\n\n"
        + "\n".join(f"- {t}" for t in titles)
        + "\n\nThis is the latest data of 2025 just fetched now from amazon. Please suggest the TOP 5 trending brands among these products."
    )
    resp = client.chat.completions.create(
        model="gpt-4o-mini-search-preview-2025-03-11",
        messages=[{"role": "user", "content": prompt}]
    )
    # message.content may be None (e.g. refusal); avoid AttributeError on .strip().
    return (resp.choices[0].message.content or "").strip()
# Initialize token
# NOTE(review): xt_token is assigned here but never referenced again —
# get_listing() obtains its own token on demand. Confirm whether this is an
# intentional warm-up/credential check or dead code.
xt_token = get_xtopus_token()

# ---------------------------
# Domain & Query Setup
# ---------------------------
# Per-domain tuple: (geo_location sent to Oxylabs,
#                    ordered list of category browse-node ids,
#                    browse-node id -> human-readable category name)
domains = {
    'com': ('90210',
            ['11060451','11060521','11056591','11057241','11058281'],
            {'11060451':'Skincare','11060521':'Bath & Body','11056591':'Fragrance',
             '11057241':'Hair Care','11058281':'Make-up'}),
    'co.uk': ('G5 8AJ',
            ['118464031','74129031','118457031','66469031','118423031'],
            {'118464031':'Skincare','74129031':'Bath & Body','118457031':'Fragrance',
             '66469031':'Haircare','118423031':'Make-up'}),
    'fr': ('75001',
            ['211020031','212073031','210965031','211038031','210972031'],
            {'211020031':'Skincare','212073031':'Bath & Body','210965031':'Fragrance',
             '211038031':'Haircare','210972031':'Make-up'}),
    'de': ('10115',
            ['122878031','64272031','122877031','64486031','122880031'],
            {'122878031':'Skincare','64272031':'Bath & Body','122877031':'Fragrance',
             '64486031':'Haircare','122880031':'Make-up'}),
    'es': ('08001',
            ['6397934031','4347676031','6397798031','4347698031','6397823031'],
            {'6397934031':'Skincare','4347676031':'Bath & Body','6397798031':'Fragrance',
             '4347698031':'Haircare','6397823031':'Make-up'}),
    'it': ('20019',
            ['6306897031','4327880031','6306898031','4327902031','6306900031'],
            {'6306897031':'Skincare','4327880031':'Bath & Body','6306898031':'Fragrance',
             '4327902031':'Haircare','6306900031':'Make-up'}),
    'ca': ('M4C 1A1',
            ['6344740011','6344392011','6344431011','6344512011','6344635011'],
            {'6344740011':'Skincare','6344392011':'Bath & Body','6344431011':'Fragrance',
             '6344512011':'Haircare','6344635011':'Make-up'}),
}
chosen = st.selectbox('Select Domain:', list(domains.keys()))
geo, queries, names = domains[chosen]
# One placeholder per category so each section can render as soon as its
# worker thread completes, in page order rather than completion order.
placeholders = {q: st.empty() for q in queries}

# data_ready gates the export/email sidebar actions across reruns.
if 'data_ready' not in st.session_state:
    st.session_state['data_ready'] = False

# Sidebar: Export + Email + Trending Brands
st.sidebar.header("Data Export & Email")
if not st.session_state['data_ready']:
    st.sidebar.warning("Data not ready. Fetch Bestseller Data first.")
email_to = st.sidebar.text_input("Send reports to:")
send_email = st.sidebar.button("Send Reports")
# ---------------------------
# Fetch & Render Data
# ---------------------------
if st.button('Fetch Bestseller Data'):
    all_tables = []  # one filtered DataFrame per category, for aggregation/export
    with st.spinner("Fetching bestseller data..."):
        # One worker per category: all bestseller pages are fetched concurrently.
        with ThreadPoolExecutor(max_workers=len(queries)) as executor:
            future_map = {
                executor.submit(fetch_bestseller, cid, chosen, geo): cid
                for cid in queries
            }
            for future in as_completed(future_map):
                cid = future_map[future]
                cat_name = names[cid]
                ph = placeholders[cid]  # render into this category's reserved slot
                ph.header(cat_name)
                with st.spinner(f"Processing {cat_name}..."):
                    res = future.result()
                    if not res or 'error' in res:
                        # NOTE(review): if res is None/empty, res.get() raises
                        # AttributeError here — confirm fetch_bestseller can
                        # never return a falsy non-dict value.
                        ph.warning(f"β {cat_name}: {res.get('error')}")
                        continue
                    items = res['results'][0]['content']['results']
                    asins = [i['asin'] for i in items if i.get('asin')]
                    # Fetch Xtopus stock info for every ASIN in parallel;
                    # failures become {} (treated as "Not Selling" below).
                    lm = {}
                    with ThreadPoolExecutor() as ex2:
                        f2map = {ex2.submit(get_listing, a): a for a in asins}
                        for f2 in as_completed(f2map):
                            lm[f2map[f2]] = f2.result() or {}
                    rows = []
                    rate = rates[DOMAIN_CURRENCY[chosen]]  # local-currency -> GBP factor
                    for it in items:
                        asin = it.get('asin')
                        title = it.get('title', '')
                        price = it.get('price')
                        qty = lm.get(asin, {}).get('product_qty')
                        qty_disp = qty if qty is not None else 'N/A'
                        # Convert to GBP; to_numeric yields NaN for unparsable prices.
                        gbp = round(pd.to_numeric(price, errors='coerce') * rate, 2)
                        # Predicted Buy Box price: 2.5% under the converted price
                        predicted = round(gbp * 0.975, 2)
                        refund = 0.01 * predicted              # 1% refund allowance
                        mod = (profit_rate * predicted) / 100  # required profit amount
                        # Domain-specific rules: VAT divisor, flat shipping, and a
                        # two-tier commission (8% below the threshold, else 15%).
                        if chosen == 'co.uk':
                            tax, ship = 1.2, (2.10 if predicted <= 30 else 2.50)
                            comm = predicted * (0.08 if predicted <= 10 else 0.15)
                        elif chosen == 'com':
                            tax, ship = 1, 7.6
                            comm = predicted * (0.08 if predicted <= 7.5 else 0.15)
                        elif chosen == 'es':
                            tax, ship = 1.21, 4
                            comm = predicted * (0.08 if predicted <= 8.53 else 0.15)
                        elif chosen == 'de':
                            tax, ship = 1.19, 3.3
                            comm = predicted * (0.08 if predicted <= 8.53 else 0.15)
                        elif chosen == 'fr':
                            tax, ship = 1.20, 4
                            comm = predicted * (0.08 if predicted <= 8.53 else 0.15)
                        elif chosen == 'it':
                            tax, ship = 1.22, 5
                            comm = predicted * (0.08 if predicted <= 8.53 else 0.15)
                        elif chosen == 'ca':
                            tax, ship = 1, 7
                            comm = predicted * (0.08 if predicted <= 8.10 else 0.15)
                        else:
                            tax, ship, comm = 1, 0, predicted * 0.15
                        # Maximum we can pay a supplier and still hit profit_rate.
                        max_cost = round((predicted / tax) - mod - comm - ship - refund, 2)
                        rows.append({
                            'ASIN': asin,
                            'Title': title,
                            'Price': price,
                            'GBP Price': gbp,
                            'Predicted BB Price': predicted,
                            'Maximum Source Price': max_cost,
                            'Quantity Available': qty_disp,
                            'View on Amazon': f"https://www.amazon.{chosen}/dp/{asin}"
                        })
                    df = pd.DataFrame(rows)
                    df = df[df['Maximum Source Price'] > 0]  # drop unprofitable items
                    all_tables.append(df)
                    # Display splits: not listed / listed but out of stock / selling.
                    not_s = df[df['Quantity Available'] == 'N/A']
                    no_s = df[df['Quantity Available'] == 0]
                    yes_s = df[pd.to_numeric(df['Quantity Available'], errors='coerce') > 0]
                    c1, c2, c3 = ph.columns(3)
                    for col, subset, hdr in [
                        (c1, not_s, 'Not Selling'),
                        (c2, no_s, 'No Stock'),
                        (c3, yes_s, 'Selling'),
                    ]:
                        with col:
                            st.subheader(f"{cat_name} β {hdr}")
                            st.dataframe(
                                subset.reset_index(drop=True),
                                height=300,
                                use_container_width=True,
                                column_config={
                                    'View on Amazon': st.column_config.LinkColumn(
                                        label='View on Amazon',
                                        display_text='View Now'
                                    )
                                }
                            )
    # Hot Products: ASINs that appeared in more than one category table.
    combined = pd.concat(all_tables, ignore_index=True)
    hot = combined.groupby(['ASIN','Title','View on Amazon']).filter(lambda x: len(x) > 1)
    if not hot.empty:
        st.subheader('π₯ Hot Products')
        st.dataframe(
            hot.drop_duplicates().reset_index(drop=True),
            height=200,
            use_container_width=True,
            column_config={
                'View on Amazon': st.column_config.LinkColumn(
                    label='View on Amazon',
                    display_text='View Now'
                )
            }
        )
    st.balloons()
    st.session_state['data_ready'] = True
    # Prepare export buffers — stored in session_state so the email handler
    # (a separate rerun) can reuse them.
    export_df = pd.concat(all_tables, ignore_index=True)
    # CSV
    buf_csv = StringIO()
    export_df.to_csv(buf_csv, index=False)
    st.session_state['buf_csv'] = buf_csv.getvalue()
    st.sidebar.download_button('Export CSV', st.session_state['buf_csv'],
                               f'{chosen}.csv', 'text/csv')
    # Excel: three sheets (NotSell/NoStock/Selling) per category table.
    buf_xl = BytesIO()
    with pd.ExcelWriter(buf_xl, engine='openpyxl') as writer:
        for df in all_tables:
            # NOTE(review): sheet prefix is the table's first ASIN, not the
            # category name, and iloc[0] raises IndexError when the filtered
            # table is empty — confirm both are acceptable.
            qn = df['ASIN'].iloc[0]
            df[df['Quantity Available']=='N/A'] \
                .to_excel(writer, sheet_name=f'{qn}_NotSell', index=False)
            df[df['Quantity Available']==0] \
                .to_excel(writer, sheet_name=f'{qn}_NoStock', index=False)
            df[pd.to_numeric(df['Quantity Available'], errors='coerce')>0] \
                .to_excel(writer, sheet_name=f'{qn}_Selling', index=False)
    buf_xl.seek(0)
    st.session_state['buf_xl'] = buf_xl.getvalue()
    st.sidebar.download_button('Export Excel', st.session_state['buf_xl'],
                               f'{chosen}.xlsx',
                               'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
    # HTML
    buf_html = StringIO()
    buf_html.write('<html><body>')
    for df in all_tables:
        buf_html.write(df.to_html(index=False, escape=False))
    buf_html.write('</body></html>')
    st.session_state['buf_html'] = buf_html.getvalue()
    st.sidebar.download_button('Export HTML', st.session_state['buf_html'],
                               f'{chosen}.html', 'text/html')
    # Cached GPT-4 Trending Brands
    try:
        titles = tuple(export_df['Title'].dropna().unique().tolist())
        brands = get_trending_brands(titles, chosen)
        st.sidebar.markdown("### π·οΈ Top 5 Trending Brands")
        st.sidebar.markdown(brands)
    except Exception as e:
        st.sidebar.error(f"Failed to fetch trending brands: {e}")
else:
    st.info("Click 'Fetch Bestseller Data' to begin.")
# ---------------------------
# Send Email if requested
# ---------------------------
if send_email:
    if not st.session_state['data_ready']:
        st.sidebar.error("Run 'Fetch Bestseller Data' before sending email.")
    elif not email_to:
        st.sidebar.error("Enter a valid recipient email address.")
    else:
        try:
            msg = EmailMessage()
            msg['Subject'] = f"Trending Products Reports β {chosen}"
            msg['From'] = email_user
            msg['To'] = email_to
            msg.set_content("Attached: CSV, Excel & HTML reports of trending products.")
            # Attach CSV — stored as str in session_state, so encode to bytes.
            csv_b = st.session_state['buf_csv'].encode('utf-8')
            msg.add_attachment(csv_b, maintype='text', subtype='csv',
                               filename=f'{chosen}.csv')
            # Attach Excel — buf_xl is already bytes (BytesIO.getvalue()).
            msg.add_attachment(
                st.session_state['buf_xl'],
                maintype='application',
                subtype='vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                filename=f'{chosen}.xlsx'
            )
            # Attach HTML
            html_b = st.session_state['buf_html'].encode('utf-8')
            msg.add_attachment(html_b, maintype='text', subtype='html',
                               filename=f'{chosen}.html')
            # Implicit-TLS SMTP (port 465) with a default, verifying SSL context.
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL('smtp.hostinger.com', 465, context=context) as server:
                server.login(email_user, email_password)
                server.send_message(msg)
            st.sidebar.success(f"Reports sent to {email_to}")
        except Exception as e:
            # Report any failure (missing buffers, auth, network) in the sidebar.
            st.sidebar.error(f"Failed to send email: {e}")