# Uni-Prod / app.py
# Vnmrsharma's picture
# Update app.py
# 0e2181a verified
import streamlit as st # type: ignore
import pandas as pd # type: ignore
import asyncio
import uvloop # type: ignore
import requests # type: ignore
from io import StringIO, BytesIO
from urllib3.util.retry import Retry # type: ignore
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter # type: ignore
import collections.abc
import smtplib
import os
import ssl
from email.message import EmailMessage
from openai import OpenAI
# Compatibility for MutableSet
# Re-exposes the ABC under its old ``collections.MutableSet`` name (the alias
# was removed from ``collections`` in Python 3.10). Presumably a dependency
# still looks it up there -- TODO confirm which one needs it.
import collections
collections.MutableSet = collections.abc.MutableSet
# Use uvloop for a faster event loop on supported platforms
# NOTE(review): the policy is installed unconditionally, so this module only
# loads where the ``uvloop`` import above succeeded.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# ---------------------------
# Streamlit Page Config
# ---------------------------
# The cart emoji is written as the real character: the previous literals held
# a mis-decoded (mojibake) byte sequence that rendered as garbage in the UI.
st.set_page_config(page_title="🛒 Trending Products", layout="wide")
st.title("🛒 Trending Products Report")

# Credentials come from the environment; each value is None when unset.
ox_username = os.environ.get("OX_NAME")
ox_password = os.environ.get("OX_PSWRD")
xt_username = os.environ.get("XT_USER")
xt_password = os.environ.get("XT_PSWRD")
email_user = os.environ.get("EMAIL")
email_password = os.environ.get("PSWRD")
openai_api_key = os.environ.get("AI_API")

# Hard-coded multipliers that turn a price in the given currency into GBP
# (GBP itself is 1.0).
rates = {
    'USD': 0.75, 'EUR': 0.86, 'CAD': 0.54, 'AED': 0.21,
    'JPY': 0.0051, 'BRL': 0.134, 'MXN': 0.038, 'EGP': 0.015,
    'GBP': 1.0, 'AUD': 0.49, 'SGD': 0.58, 'PLN': 0.20,
}

# Amazon domain suffix -> currency code requested from the bestseller API.
DOMAIN_CURRENCY = {
    'com': 'USD', 'co.uk': 'GBP', 'de': 'EUR',
    'fr': 'EUR', 'es': 'EUR', 'it': 'EUR',
    'ca': 'CAD',
}

# Percentage of the predicted price held back as margin when the maximum
# source price is computed.
profit_rate = 15
# ---------------------------
# HTTP Session with Retries
# ---------------------------
# One shared session for every API call in this file. Requests are retried up
# to three times with backoff on throttling / server-error statuses; POST is
# listed explicitly because urllib3 does not retry it by default.
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=['POST'],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
# Same adapter for both schemes, HTTPS first as before.
for _scheme in ('https://', 'http://'):
    session.mount(_scheme, adapter)
# ---------------------------
# Cached API Calls
# ---------------------------
@st.cache_data
def get_xtopus_token():
    """Log in to the Xtopus API and return the access token.

    The result is memoised by ``st.cache_data`` (no TTL is configured), so
    the login request is only sent when the cache has no entry.

    Returns:
        The value found at ``data.accessToken`` in the JSON response.

    Raises:
        requests.HTTPError: if the login endpoint answers with an error status.
        KeyError: if the response body lacks ``data`` / ``accessToken``.
    """
    login_url = 'https://xt.xtopus.io/api/user/login'
    credentials = {'username': xt_username, 'password': xt_password}
    response = session.post(
        login_url,
        json=credentials,
        headers={'Content-Type': 'application/json'},
        timeout=(5, 60),  # (connect, read) seconds
    )
    response.raise_for_status()
    body = response.json()
    return body['data']['accessToken']
@st.cache_data
def fetch_bestseller(cat_id, domain, geo):
    """Fetch the parsed Amazon bestseller list for one category.

    Results are memoised by ``st.cache_data`` per argument combination
    (no TTL is configured).

    Args:
        cat_id: Category id sent as the ``query`` field.
        domain: Amazon domain suffix (e.g. ``'co.uk'``); also selects the
            currency via ``DOMAIN_CURRENCY``, falling back to ``'GBP'``.
        geo: Value sent as ``geo_location``.

    Returns:
        The decoded JSON body of the Oxylabs response.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    currency_context = {
        'key': 'currency',
        'value': DOMAIN_CURRENCY.get(domain, 'GBP'),
    }
    request_body = {
        'source': 'amazon_bestsellers',
        'domain': domain,
        'geo_location': geo,
        'query': cat_id,
        'render': 'html',
        'parse': True,
        'context': [currency_context],
    }
    response = session.post(
        'https://realtime.oxylabs.io/v1/queries',
        auth=(ox_username, ox_password),
        json=request_body,
        timeout=(5, 60),  # (connect, read) seconds
    )
    response.raise_for_status()
    return response.json()
class _ListingUnavailable(Exception):
    """Raised inside the cached fetch so a failed lookup is never memoised."""


@st.cache_data
def _fetch_listing(asin):
    """Return the ``data`` object of the Xtopus quantity lookup for *asin*.

    Raises ``_ListingUnavailable`` on any non-200 answer. ``st.cache_data``
    only stores return values, so raising keeps the failure out of the cache
    and the next call retries the request.
    """
    resp = session.post(
        'https://xt.xtopus.io/api/merchant/product/quantity',
        json={'market_product_id': asin},
        headers={'Authorization': get_xtopus_token()},
        timeout=(5, 60),  # (connect, read) seconds
    )
    if resp.status_code != 200:
        raise _ListingUnavailable(resp.status_code)
    return resp.json().get('data', {})


def get_listing(asin):
    """Look up the merchant listing for *asin*.

    Successful lookups are cached (see ``_fetch_listing``; use
    ``_fetch_listing.clear()`` to reset). A non-200 response yields ``{}``
    exactly as before, but is no longer cached, so a transient failure does
    not permanently mark the product as unavailable.

    Args:
        asin: Product identifier sent as ``market_product_id``.

    Returns:
        The ``data`` value of the response, or ``{}`` when the API did not
        answer with HTTP 200.

    Raises:
        requests.RequestException: network-level failures propagate unchanged.
    """
    # NOTE(review): neither cache has a TTL, so stock quantities stay frozen
    # until the cache is cleared -- confirm that is intended.
    try:
        return _fetch_listing(asin)
    except _ListingUnavailable:
        return {}
@st.cache_data
def get_trending_brands(titles: tuple, domain: str) -> str:
    """Ask the OpenAI chat API for the top five trending brands.

    Results are memoised by ``st.cache_data`` per ``(titles, domain)``.

    Args:
        titles: Product titles, listed one per bullet in the prompt.
        domain: Amazon domain suffix quoted in the prompt.

    Returns:
        The stripped text of the first completion choice.
    """
    header = f"Given the following product titles for Amazon domain '{domain}':\n\n"
    bullets = "\n".join(f"- {t}" for t in titles)
    footer = "\n\nThis is the latest data of 2025 just fetched now from amazon. Please suggest the TOP 5 trending brands among these products."
    prompt = header + bullets + footer

    client = OpenAI(api_key=openai_api_key)
    completion = client.chat.completions.create(
        model="gpt-4o-mini-search-preview-2025-03-11",
        messages=[{"role": "user", "content": prompt}],
    )
    answer = completion.choices[0].message.content
    return answer.strip()
# Initialize token
# Logging in here populates the cached token before get_listing() asks for it.
xt_token = get_xtopus_token()

# ---------------------------
# Domain & Query Setup
# ---------------------------
# domain suffix -> (geo location, category ids to query, category id -> label)
domains = {
    'com': (
        '90210',
        ['11060451', '11060521', '11056591', '11057241', '11058281'],
        {
            '11060451': 'Skincare',
            '11060521': 'Bath & Body',
            '11056591': 'Fragrance',
            '11057241': 'Hair Care',
            '11058281': 'Make-up',
        },
    ),
    'co.uk': (
        'G5 8AJ',
        ['118464031', '74129031', '118457031', '66469031', '118423031'],
        {
            '118464031': 'Skincare',
            '74129031': 'Bath & Body',
            '118457031': 'Fragrance',
            '66469031': 'Haircare',
            '118423031': 'Make-up',
        },
    ),
    'fr': (
        '75001',
        ['211020031', '212073031', '210965031', '211038031', '210972031'],
        {
            '211020031': 'Skincare',
            '212073031': 'Bath & Body',
            '210965031': 'Fragrance',
            '211038031': 'Haircare',
            '210972031': 'Make-up',
        },
    ),
    'de': (
        '10115',
        ['122878031', '64272031', '122877031', '64486031', '122880031'],
        {
            '122878031': 'Skincare',
            '64272031': 'Bath & Body',
            '122877031': 'Fragrance',
            '64486031': 'Haircare',
            '122880031': 'Make-up',
        },
    ),
    'es': (
        '08001',
        ['6397934031', '4347676031', '6397798031', '4347698031', '6397823031'],
        {
            '6397934031': 'Skincare',
            '4347676031': 'Bath & Body',
            '6397798031': 'Fragrance',
            '4347698031': 'Haircare',
            '6397823031': 'Make-up',
        },
    ),
    'it': (
        '20019',
        ['6306897031', '4327880031', '6306898031', '4327902031', '6306900031'],
        {
            '6306897031': 'Skincare',
            '4327880031': 'Bath & Body',
            '6306898031': 'Fragrance',
            '4327902031': 'Haircare',
            '6306900031': 'Make-up',
        },
    ),
    'ca': (
        'M4C 1A1',
        ['6344740011', '6344392011', '6344431011', '6344512011', '6344635011'],
        {
            '6344740011': 'Skincare',
            '6344392011': 'Bath & Body',
            '6344431011': 'Fragrance',
            '6344512011': 'Haircare',
            '6344635011': 'Make-up',
        },
    ),
}

chosen = st.selectbox('Select Domain:', list(domains))
geo, queries, names = domains[chosen]
# One empty slot per category, created up front so the page order follows
# ``queries`` no matter which request finishes first.
placeholders = {category_id: st.empty() for category_id in queries}

if 'data_ready' not in st.session_state:
    st.session_state['data_ready'] = False

# Sidebar: Export + Email + Trending Brands
st.sidebar.header("Data Export & Email")
if not st.session_state['data_ready']:
    st.sidebar.warning("Data not ready. Fetch Bestseller Data first.")
email_to = st.sidebar.text_input("Send reports to:")
send_email = st.sidebar.button("Send Reports")
# ---------------------------
# Fetch & Render Data
# ---------------------------
# Column order of every per-category table. Passing it to DataFrame keeps the
# columns present even when a category yields no rows.
_REPORT_COLUMNS = [
    'ASIN', 'Title', 'Price', 'GBP Price', 'Predicted BB Price',
    'Maximum Source Price', 'Quantity Available', 'View on Amazon',
]

# domain -> (tax divisor, flat shipping, highest price charged 8% commission).
# co.uk shipping is tiered by price and resolved in _domain_fees().
_FEE_RULES = {
    'co.uk': (1.2, None, 10),
    'com': (1, 7.6, 7.5),
    'es': (1.21, 4, 8.53),
    'de': (1.19, 3.3, 8.53),
    'fr': (1.20, 4, 8.53),
    'it': (1.22, 5, 8.53),
    'ca': (1, 7, 8.10),
}

# (on-screen heading, Excel sheet suffix) in the order _split_by_stock returns.
_STOCK_VIEWS = (
    ('Not Selling', 'NotSell'),
    ('No Stock', 'NoStock'),
    ('Selling', 'Selling'),
)


def _domain_fees(domain, predicted):
    """Return ``(tax divisor, shipping, commission)`` for a predicted price."""
    rule = _FEE_RULES.get(domain)
    if rule is None:
        return 1, 0, predicted * 0.15
    tax, ship, low_tier_cap = rule
    if domain == 'co.uk':
        ship = 2.10 if predicted <= 30 else 2.50
    comm = predicted * (0.08 if predicted <= low_tier_cap else 0.15)
    return tax, ship, comm


def _price_row(item, listing, domain, rate):
    """Build one report row for a bestseller item and its listing data."""
    asin = item.get('asin')
    price = item.get('price')
    qty = listing.get('product_qty')
    # A missing or unparsable price becomes NaN and is dropped later by the
    # "Maximum Source Price > 0" filter.
    gbp = round(pd.to_numeric(price, errors='coerce') * rate, 2)
    # Predicted Buy Box price
    predicted = round(gbp * 0.975, 2)
    refund = 0.01 * predicted
    margin = (profit_rate * predicted) / 100
    tax, ship, comm = _domain_fees(domain, predicted)
    max_cost = round((predicted / tax) - margin - comm - ship - refund, 2)
    return {
        'ASIN': asin,
        'Title': item.get('title', ''),
        'Price': price,
        'GBP Price': gbp,
        'Predicted BB Price': predicted,
        'Maximum Source Price': max_cost,
        'Quantity Available': qty if qty is not None else 'N/A',
        'View on Amazon': f"https://www.amazon.{domain}/dp/{asin}",
    }


def _split_by_stock(df):
    """Split a table into (no listing, listed with zero stock, in stock)."""
    quantity = df['Quantity Available']
    return (
        df[quantity == 'N/A'],
        df[quantity == 0],
        df[pd.to_numeric(quantity, errors='coerce') > 0],
    )


def _lookup_listings(asins):
    """Fetch listings concurrently; return ``(asin -> listing, failure count)``.

    A lookup that raises a network error is recorded as ``{}`` instead of
    aborting the whole report; the caller reports the count.
    """
    listings, failures = {}, 0
    with ThreadPoolExecutor() as pool:
        pending = {pool.submit(get_listing, asin): asin for asin in asins}
        for done in as_completed(pending):
            try:
                listings[pending[done]] = done.result() or {}
            except (requests.RequestException, ValueError):
                listings[pending[done]] = {}
                failures += 1
    return listings, failures


def _link_column_config():
    """Column config that renders the Amazon URL as a 'View Now' link."""
    return {
        'View on Amazon': st.column_config.LinkColumn(
            label='View on Amazon',
            display_text='View Now',
        )
    }


def _publish_reports(category_tables, domain):
    """Show hot products, build the export buffers and list trending brands.

    ``category_tables`` is a non-empty list of ``(category name, DataFrame)``.
    """
    all_tables = [table for _, table in category_tables]
    combined = pd.concat(all_tables, ignore_index=True)

    # Hot Products: items that appear in more than one category table.
    hot = combined.groupby(['ASIN', 'Title', 'View on Amazon']).filter(
        lambda group: len(group) > 1
    )
    if not hot.empty:
        st.subheader('🔥 Hot Products')
        st.dataframe(
            hot.drop_duplicates().reset_index(drop=True),
            height=200,
            use_container_width=True,
            column_config=_link_column_config(),
        )
    st.balloons()
    st.session_state['data_ready'] = True

    # NOTE(review): the download buttons are only drawn during the run in
    # which the fetch button was pressed; any later rerun removes them.
    # CSV
    st.session_state['buf_csv'] = combined.to_csv(index=False)
    st.sidebar.download_button('Export CSV', st.session_state['buf_csv'],
                               f'{domain}.csv', 'text/csv')

    # Excel: sheets are named after the category. The previous prefix was the
    # first ASIN of each table, which crashed on an empty table and could
    # collide when two categories shared their first product.
    buf_xl = BytesIO()
    with pd.ExcelWriter(buf_xl, engine='openpyxl') as writer:
        for cat_name, table in category_tables:
            for (_, suffix), subset in zip(_STOCK_VIEWS, _split_by_stock(table)):
                subset.to_excel(writer, sheet_name=f'{cat_name}_{suffix}',
                                index=False)
    st.session_state['buf_xl'] = buf_xl.getvalue()
    st.sidebar.download_button(
        'Export Excel', st.session_state['buf_xl'],
        f'{domain}.xlsx',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')

    # HTML: titles come from Amazon, so cell contents are escaped (pandas
    # default) rather than written raw into the markup.
    html_tables = ''.join(table.to_html(index=False) for table in all_tables)
    st.session_state['buf_html'] = f'<html><body>{html_tables}</body></html>'
    st.sidebar.download_button('Export HTML', st.session_state['buf_html'],
                               f'{domain}.html', 'text/html')

    # Cached GPT-4 Trending Brands
    try:
        titles = tuple(combined['Title'].dropna().unique().tolist())
        brands = get_trending_brands(titles, domain)
        st.sidebar.markdown("### 🏷️ Top 5 Trending Brands")
        st.sidebar.markdown(brands)
    except Exception as e:
        st.sidebar.error(f"Failed to fetch trending brands: {e}")


if st.button('Fetch Bestseller Data'):
    category_tables = []  # (category name, DataFrame) in completion order
    rate = rates[DOMAIN_CURRENCY[chosen]]
    with st.spinner("Fetching bestseller data..."):
        with ThreadPoolExecutor(max_workers=len(queries)) as executor:
            future_map = {
                executor.submit(fetch_bestseller, cid, chosen, geo): cid
                for cid in queries
            }
            for future in as_completed(future_map):
                cid = future_map[future]
                cat_name = names[cid]
                ph = placeholders[cid]
                ph.header(cat_name)
                with st.spinner(f"Processing {cat_name}..."):
                    # A failed category is reported in its own slot and the
                    # remaining categories still render.
                    try:
                        res = future.result()
                    except (requests.RequestException, ValueError) as exc:
                        ph.warning(f"❌ {cat_name}: {exc}")
                        continue
                    if not res or 'error' in res:
                        detail = res.get('error') if isinstance(res, dict) else None
                        ph.warning(f"❌ {cat_name}: {detail or 'empty response'}")
                        continue
                    try:
                        items = res['results'][0]['content']['results']
                    except (KeyError, IndexError, TypeError):
                        ph.warning(f"❌ {cat_name}: unexpected response format")
                        continue

                    asins = [i['asin'] for i in items if i.get('asin')]
                    listings, failed = _lookup_listings(asins)
                    rows = [
                        _price_row(it, listings.get(it.get('asin'), {}), chosen, rate)
                        for it in items
                    ]
                    df = pd.DataFrame(rows, columns=_REPORT_COLUMNS)
                    df = df[df['Maximum Source Price'] > 0]
                category_tables.append((cat_name, df))

                # Display splits
                for col, subset, (hdr, _) in zip(
                        ph.columns(3), _split_by_stock(df), _STOCK_VIEWS):
                    with col:
                        st.subheader(f"{cat_name} – {hdr}")
                        st.dataframe(
                            subset.reset_index(drop=True),
                            height=300,
                            use_container_width=True,
                            column_config=_link_column_config(),
                        )
                if failed:
                    st.warning(
                        f"{cat_name}: {failed} listing lookup(s) failed and "
                        "are shown as N/A."
                    )

    if category_tables:
        _publish_reports(category_tables, chosen)
    else:
        # Nothing to concatenate or export when every category failed.
        st.warning("No bestseller data could be fetched for this domain.")
else:
    st.info("Click 'Fetch Bestseller Data' to begin.")
# ---------------------------
# Send Email if requested
# ---------------------------
# Mails the CSV / Excel / HTML buffers stored in session state by the last
# successful fetch.
# NOTE(review): attachment names use the domain currently selected, which may
# differ from the domain the stored buffers were fetched for.
if send_email:
    recipient = email_to.strip()
    if not st.session_state['data_ready']:
        st.sidebar.error("Run 'Fetch Bestseller Data' before sending email.")
    elif '@' not in recipient:
        # Also covers the empty field; previously only emptiness was checked.
        st.sidebar.error("Enter a valid recipient email address.")
    else:
        try:
            msg = EmailMessage()
            # En dash written as the real character (was mojibake).
            msg['Subject'] = f"Trending Products Reports – {chosen}"
            msg['From'] = email_user
            msg['To'] = recipient
            msg.set_content("Attached: CSV, Excel & HTML reports of trending products.")
            # Attach CSV
            csv_b = st.session_state['buf_csv'].encode('utf-8')
            msg.add_attachment(csv_b, maintype='text', subtype='csv',
                               filename=f'{chosen}.csv')
            # Attach Excel
            msg.add_attachment(
                st.session_state['buf_xl'],
                maintype='application',
                subtype='vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                filename=f'{chosen}.xlsx',
            )
            # Attach HTML
            html_b = st.session_state['buf_html'].encode('utf-8')
            msg.add_attachment(html_b, maintype='text', subtype='html',
                               filename=f'{chosen}.html')
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL('smtp.hostinger.com', 465, context=context) as server:
                server.login(email_user, email_password)
                server.send_message(msg)
            st.sidebar.success(f"Reports sent to {recipient}")
        except Exception as e:
            # UI boundary: surface any build/send failure in the sidebar
            # instead of crashing the page.
            st.sidebar.error(f"Failed to send email: {e}")