# Seth0330's picture
# Update app.py
# 73f0dc8 verified
import io
import json
import os
import zipfile
from datetime import datetime, timezone

import pandas as pd
import requests
import streamlit as st
# ========== SANDBOX CREDENTIALS ==========
# SECURITY: OAuth secrets do not belong in source control. Each credential can
# be supplied through an environment variable of the same name; the literals
# are kept only as fallbacks so the existing sandbox setup keeps working.
# NOTE(review): rotate these sandbox credentials and remove the fallbacks once
# the deployment provides the environment variables.
QB_CLIENT_ID = os.environ.get("QB_CLIENT_ID", "ABHMR4u0r9zkbLQXoBgOgoHA1NYBA3rmxHTMY9qrZxHT0egddV")
QB_CLIENT_SECRET = os.environ.get("QB_CLIENT_SECRET", "L8ejkQtUz6JCLB4rsRYIkdbsctE8NK2MWCi3week")
QB_REFRESH_TOKEN = os.environ.get("QB_REFRESH_TOKEN", "RT1-1-H0-17619102608c5kt4csegkbhep7esgp")
QB_REALM_ID = os.environ.get("QB_REALM_ID", "9341455017117852")
TAX_CODE_ID = "5"  # TaxCode Id applied to every invoice line (sent as TaxCodeRef.value)
QB_BASE_URL = "https://sandbox-quickbooks.api.intuit.com/v3/company"
TOKEN_URL = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer"
# Page chrome: wide layout so the line-item tables have room, then the heading.
st.set_page_config(
    layout="wide",
    page_title="QuickBooks Line Items Table",
)
st.title("QuickBooks: Sales Receipt Line Items & Invoice Creation")
# ========== TOKEN REFRESH ==========
def refresh_access_token(client_id, client_secret, refresh_token):
    """Exchange a refresh token for a fresh access token at ``TOKEN_URL``.

    Args:
        client_id: OAuth client id, sent as the HTTP basic-auth user.
        client_secret: OAuth client secret, sent as the HTTP basic-auth password.
        refresh_token: Refresh token to redeem.

    Returns:
        ``(access_token, refresh_token, error)``. On success ``error`` is
        ``None`` and ``refresh_token`` is the one returned by the server (or
        the one passed in when the response carries none). On any failure the
        first two items are ``None`` and ``error`` is a readable message.
    """
    headers = {"Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    }
    # Callers only inspect the returned error string, so transport failures
    # must be reported through it rather than raised.
    try:
        resp = requests.post(TOKEN_URL, headers=headers, data=data, auth=(client_id, client_secret), timeout=20)
    except requests.RequestException as exc:
        return None, None, f"Token refresh failed: {exc}"
    if resp.status_code != 200:
        return None, None, f"Token refresh failed: {resp.status_code}: {resp.text}"
    try:
        tokens = resp.json()
    except ValueError:
        return None, None, "Token refresh failed: response was not valid JSON"
    access_token = tokens.get("access_token") if isinstance(tokens, dict) else None
    if not access_token:
        return None, None, "Token refresh failed: response did not contain an access_token"
    return access_token, tokens.get("refresh_token", refresh_token), None
# ========== API QUERY ==========
def get_new_sales_receipts(access_token, realm_id, last_checked_iso, max_results=1000):
    """Fetch every SalesReceipt created after ``last_checked_iso``, page by page.

    Args:
        access_token: OAuth bearer token.
        realm_id: QuickBooks company (realm) id.
        last_checked_iso: ISO-8601 timestamp; ``Z`` is appended when it carries
            no explicit UTC marker or offset.
        max_results: Requested page size; clamped to 1..1000.

    Returns:
        ``(receipts, error)``. ``error`` is ``None`` on success; on failure it
        is a message and ``receipts`` holds whatever pages were fetched so far.
    """
    if "Z" not in last_checked_iso and "+" not in last_checked_iso and "-" not in last_checked_iso[10:]:
        last_checked_iso += "Z"
    # The timestamp originates from a free-text field: escape it so a stray
    # quote cannot terminate the string literal inside the query.
    since_literal = last_checked_iso.replace("\\", "\\\\").replace("'", "\\'")
    # QuickBooks caps a page at 1000 rows. A larger request would make the
    # first (capped) page look like the last one; a size <= 0 would never end.
    page_size = max(1, min(int(max_results), 1000))

    url = f"{QB_BASE_URL}/{realm_id}/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }
    receipts = []
    start_position = 1
    while True:
        query = (
            f"SELECT * FROM SalesReceipt WHERE MetaData.CreateTime > '{since_literal}' "
            f"STARTPOSITION {start_position} MAXRESULTS {page_size}"
        )
        try:
            resp = requests.get(url, headers=headers, params={"query": query}, timeout=30)
        except requests.RequestException as exc:
            return receipts, f"QuickBooks API Error: {exc}"
        if resp.status_code != 200:
            return receipts, f"QuickBooks API Error: {resp.status_code}: {resp.text}"
        try:
            data = resp.json()
        except ValueError:
            return receipts, "QuickBooks API Error: response was not valid JSON"
        batch = data.get("QueryResponse", {}).get("SalesReceipt", [])
        receipts.extend(batch)
        # A short page means there is nothing left to fetch.
        if len(batch) < page_size:
            return receipts, None
        start_position += page_size
def get_all_sales_receipts(access_token, realm_id, start_position=1, max_results=1000):
    """Fetch every SalesReceipt of the company, page by page.

    Args:
        access_token: OAuth bearer token.
        realm_id: QuickBooks company (realm) id.
        start_position: 1-based position of the first row to fetch.
        max_results: Requested page size; clamped to 1..1000.

    Returns:
        ``(receipts, error)``. ``error`` is ``None`` on success; on failure it
        is a message and ``receipts`` holds whatever pages were fetched so far.
    """
    # QuickBooks caps a page at 1000 rows. A larger request would make the
    # first (capped) page look like the last one; a size <= 0 would never end.
    page_size = max(1, min(int(max_results), 1000))

    url = f"{QB_BASE_URL}/{realm_id}/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }
    receipts = []
    while True:
        query = f"SELECT * FROM SalesReceipt STARTPOSITION {start_position} MAXRESULTS {page_size}"
        try:
            resp = requests.get(url, headers=headers, params={"query": query}, timeout=30)
        except requests.RequestException as exc:
            return receipts, f"QuickBooks API Error: {exc}"
        if resp.status_code != 200:
            return receipts, f"QuickBooks API Error: {resp.status_code}: {resp.text}"
        try:
            data = resp.json()
        except ValueError:
            return receipts, "QuickBooks API Error: response was not valid JSON"
        batch = data.get("QueryResponse", {}).get("SalesReceipt", [])
        receipts.extend(batch)
        # A short page means there is nothing left to fetch.
        if len(batch) < page_size:
            return receipts, None
        start_position += page_size
def fetch_tax_codes(access_token, realm_id):
    """Query the company's TaxCode entities.

    Args:
        access_token: OAuth bearer token.
        realm_id: QuickBooks company (realm) id.

    Returns:
        ``(tax_codes, error)``: the list of TaxCode dicts and ``None`` on
        success, or ``None`` and a readable message on failure.
    """
    url = f"{QB_BASE_URL}/{realm_id}/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }
    params = {"query": "SELECT * FROM TaxCode"}
    # Same timeout as the receipt queries; without one a stalled connection
    # would block the app indefinitely.
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=30)
    except requests.RequestException as exc:
        return None, f"QuickBooks API Error: {exc}"
    if resp.status_code != 200:
        return None, f"QuickBooks API Error: {resp.status_code}: {resp.text}"
    try:
        data = resp.json()
    except ValueError:
        return None, "QuickBooks API Error: response was not valid JSON"
    tax_codes = data.get("QueryResponse", {}).get("TaxCode", [])
    return tax_codes, None
# ========== INVOICE CREATION ==========
def _build_invoice_lines(receipt, tax_code_id):
    """Copy the receipt's usable SalesItemLineDetail lines into invoice line dicts."""
    invoice_lines = []
    # ``or`` guards against keys that are present but null.
    for line in receipt.get("Line") or []:
        if line.get("DetailType") != "SalesItemLineDetail":
            continue
        detail = line.get("SalesItemLineDetail") or {}
        item_ref = detail.get("ItemRef") or {}
        if "value" not in item_ref:
            continue
        invoice_lines.append({
            "Amount": line.get("Amount"),
            "DetailType": "SalesItemLineDetail",
            "Description": line.get("Description", "-"),
            "SalesItemLineDetail": {
                "ItemRef": {"value": item_ref["value"]},
                "Qty": detail.get("Qty"),
                "UnitPrice": detail.get("UnitPrice"),
                "TaxCodeRef": {"value": tax_code_id},
            },
        })
    return invoice_lines


def create_invoice_from_sales_receipt(access_token, realm_id, receipt, tax_code_id):
    """Create a QuickBooks invoice that mirrors a sales receipt.

    Args:
        access_token: OAuth bearer token.
        realm_id: QuickBooks company (realm) id.
        receipt: SalesReceipt dict as returned by the query endpoint.
        tax_code_id: TaxCode id applied to every copied line.

    Returns:
        ``(response_json, error)``: the decoded API response and ``None`` on
        success, or ``None`` and a readable message on failure.
    """
    # Validate the receipt before any network setup so bad input fails fast.
    customer_ref = receipt.get("CustomerRef") or {}
    if "value" not in customer_ref:
        return None, "Missing CustomerRef in SalesReceipt"
    lines = _build_invoice_lines(receipt, tax_code_id)
    if not lines:
        return None, "No SalesItemLineDetail line items found in SalesReceipt"

    invoice_data = {
        "CustomerRef": {"value": customer_ref["value"]},
        "Line": lines,
    }
    # Optional fields are only sent when the receipt actually carries them.
    for optional_key in ("DocNumber", "BillAddr", "ShipAddr"):
        if receipt.get(optional_key):
            invoice_data[optional_key] = receipt[optional_key]

    url = f"{QB_BASE_URL}/{realm_id}/invoice"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    try:
        resp = requests.post(url, headers=headers, data=json.dumps(invoice_data), timeout=30)
    except requests.RequestException as exc:
        return None, f"QuickBooks API Error: {exc}"
    if resp.status_code not in (200, 201):
        return None, f"QuickBooks API Error: {resp.status_code}: {resp.text}"
    try:
        return resp.json(), None
    except ValueError:
        return None, "QuickBooks API Error: response was not valid JSON (the invoice may have been created)"
# ========== EXTRACT LINE ITEMS (with Bill/Ship Addr) ==========
def flatten_address(addr):
    """Render an address dict as one comma-separated line.

    Args:
        addr: Address dict (``Line1`` … ``Country``), or a falsy value.

    Returns:
        The non-empty fields joined with ``", "`` in a fixed order, or ``"-"``
        when ``addr`` is falsy or none of the known fields has a value.
    """
    if not addr:
        return "-"
    field_order = ("Line1", "Line2", "Line3", "City", "CountrySubDivisionCode", "PostalCode", "Country")
    # str() keeps the join from failing on non-string values such as a numeric
    # postal code.
    parts = [str(addr[field]) for field in field_order if addr.get(field)]
    # An address holding only non-printable keys gets the same placeholder as a
    # missing one.
    return ", ".join(parts) or "-"
def extract_line_items(receipts):
    """Flatten sales receipts into one table row per SalesItemLineDetail line.

    Args:
        receipts: Iterable of SalesReceipt dicts.

    Returns:
        A list of row dicts. Missing quantity, unit price, debit and credit
        values are rendered as ``"-"``; positive amounts are reported as debit
        and negative amounts (sign flipped) as credit.
    """
    def dash_if_blank(value):
        return "-" if value == "" else value

    table = []
    for receipt in receipts:
        # Receipt-level values are shared by all of the receipt's rows.
        date = receipt.get("TxnDate", "")
        number = receipt.get("DocNumber", receipt.get("Id", ""))
        customer = receipt.get("CustomerRef", {})
        name = customer.get("name", "") if isinstance(customer, dict) else ""
        billing = flatten_address(receipt.get("BillAddr", {}))
        shipping = flatten_address(receipt.get("ShipAddr", {}))

        sales_lines = (
            entry
            for entry in receipt.get("Line", [])
            if entry.get("DetailType") == "SalesItemLineDetail"
        )
        for entry in sales_lines:
            detail = entry.get("SalesItemLineDetail", {})
            item_ref = detail.get("ItemRef", {})
            value = float(entry.get("Amount", 0) or 0)
            table.append({
                "Transaction date": date,
                "Transaction type": "Sales Receipt",
                "#": number,
                "Name": name,
                "Memo/Description": entry.get("Description", "-"),
                "Item Name": item_ref.get("name", "") if isinstance(item_ref, dict) else "",
                "Quantity": dash_if_blank(detail.get("Qty", "")),
                "Unit Price": dash_if_blank(detail.get("UnitPrice", "")),
                "Debit": value if value > 0 else "-",
                "Credit": -value if value < 0 else "-",
                "Billing Address": billing,
                "Shipping Address": shipping,
            })
    return table
# ========== PER-RECEIPT JSON ZIP (include Bill/Ship Addr at top) ==========
def build_zip_of_json(receipts):
    """Pack each receipt into an in-memory ZIP as ``SalesReceipt-<number>.json``.

    ``<number>`` is the receipt's ``DocNumber``, else its ``Id``, else
    ``"unknown"``. Characters that are unsafe in file names are replaced with
    ``_`` and repeated names get a numeric suffix, so no entry shadows another.
    ``BillAddr`` and ``ShipAddr``, when present, are written as the first keys
    of the JSON document.

    Args:
        receipts: Iterable of SalesReceipt dicts.

    Returns:
        An ``io.BytesIO`` positioned at the start of the archive.
    """
    unsafe_chars = '/\\:*?"<>|'
    buf = io.BytesIO()
    used_names = set()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for receipt in receipts:
            number = receipt.get("DocNumber") or receipt.get("Id") or "unknown"
            safe_number = "".join(
                "_" if ch in unsafe_chars or not ch.isprintable() else ch
                for ch in str(number)
            )
            fname = f"SalesReceipt-{safe_number}.json"
            suffix = 2
            # Receipts can share a DocNumber; keep every one of them.
            while fname in used_names:
                fname = f"SalesReceipt-{safe_number}-{suffix}.json"
                suffix += 1
            used_names.add(fname)

            # Address keys first; update() then appends the remaining keys
            # without moving the ones already placed.
            ordered = {key: receipt[key] for key in ("BillAddr", "ShipAddr") if key in receipt}
            ordered.update(receipt)
            archive.writestr(fname, json.dumps(ordered, indent=2))
    buf.seek(0)
    return buf
# ========== SESSION STATE ==========
# Seed the per-session keys on the first run only; later reruns keep whatever
# the handlers stored. Factories give every session its own mutable objects.
_SESSION_DEFAULT_FACTORIES = (
    ("seen_receipt_ids", set),
    ("last_checked_iso", lambda: "2000-01-01T00:00:00Z"),
    ("latest_new_receipts", list),
)
for _state_key, _make_default in _SESSION_DEFAULT_FACTORIES:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# ========== UI ==========
# Sidebar controls, addressed through the sidebar object instead of a
# ``with`` block. Widget order on the page is unchanged.
sidebar = st.sidebar
sidebar.header("QuickBooks Actions")
max_results = sidebar.slider(
    "Max results per page (pagination)",
    min_value=10,
    max_value=1000,
    value=100,
    step=10,
)
all_btn = sidebar.button("Fetch All Sales Receipt Line Items", type="primary")

sidebar.markdown("---")
sidebar.markdown("**New Sales Receipts Only**")
last_checked = sidebar.text_input(
    "Last Checked Time (UTC, ISO)",
    value=st.session_state["last_checked_iso"],
    help="e.g., 2025-07-16T00:00:00Z",
)
new_btn = sidebar.button("Fetch New Sales Receipts")

sidebar.markdown("---")
sidebar.markdown("**Tax Codes**")
fetch_tax_btn = sidebar.button("Fetch Tax Codes")

# Placeholder in the main area that the fetch handlers fill with their tables.
tab = st.empty()
# ----- "Fetch Tax Codes" handler -----
if fetch_tax_btn:
    with st.spinner("Refreshing QuickBooks access token..."):
        # Redeem the most recent refresh token seen in this session; the token
        # endpoint may rotate it, so the returned value is kept for next time.
        # NOTE(review): this only survives the browser session - persist it
        # server-side if the token must outlive the session.
        access_token, new_refresh_token, err = refresh_access_token(
            QB_CLIENT_ID,
            QB_CLIENT_SECRET,
            st.session_state.get("qb_refresh_token", QB_REFRESH_TOKEN),
        )
    if new_refresh_token:
        st.session_state["qb_refresh_token"] = new_refresh_token

    if err or not access_token:
        st.error(f"Could not refresh access token: {err}")
    else:
        with st.spinner("Querying QuickBooks for Tax Codes..."):
            tax_codes, api_err = fetch_tax_codes(access_token, QB_REALM_ID)
        if api_err:
            st.error(api_err)
        elif not tax_codes:
            st.warning("No tax codes found.")
        else:
            df = pd.DataFrame([
                {
                    "TaxCode Id": tc.get("Id", "-"),
                    "TaxCode Name": tc.get("Name", "-"),
                    "Description": tc.get("Description", "-"),
                }
                for tc in tax_codes
            ])
            st.success(f"Found {len(tax_codes)} tax codes.")
            st.dataframe(df, use_container_width=True)
            st.markdown("> Use the **TaxCode Id** (not the name) in your API calls as `TaxCodeRef.value`.")
# ----- "Fetch All Sales Receipt Line Items" handler -----
if all_btn:
    with st.spinner("Refreshing QuickBooks access token..."):
        # Redeem the most recent refresh token seen in this session; the token
        # endpoint may rotate it, so the returned value is kept for next time.
        access_token, new_refresh_token, err = refresh_access_token(
            QB_CLIENT_ID,
            QB_CLIENT_SECRET,
            st.session_state.get("qb_refresh_token", QB_REFRESH_TOKEN),
        )
    if new_refresh_token:
        st.session_state["qb_refresh_token"] = new_refresh_token
    if err or not access_token:
        st.error(f"Could not refresh access token: {err}")
        st.stop()

    with st.spinner("Querying QuickBooks for all Sales Receipts..."):
        receipts, api_err = get_all_sales_receipts(access_token, QB_REALM_ID, start_position=1, max_results=max_results)

    if api_err:
        st.error(api_err)
    elif not receipts:
        st.warning("No sales receipts found.")
    else:
        st.success(f"Found {len(receipts)} sales receipts.")
        line_items = extract_line_items(receipts)
        df = pd.DataFrame(line_items)
        # NOTE(review): st.button is True only on the rerun caused by the
        # click, so using the widgets below (which triggers another rerun)
        # hides this view again - consider rendering from session state.
        with tab.container():
            st.markdown("### Sales Receipt Line Items (All)")
            st.dataframe(df, use_container_width=True)
            csv = df.to_csv(index=False)
            st.download_button(
                label="Download Line Items as CSV",
                data=csv,
                file_name="quickbooks_sales_receipt_line_items.csv",
                mime="text/csv",
            )
            zip_buf = build_zip_of_json(receipts)
            st.download_button(
                label="Download All Sales Receipts (ZIP of JSONs)",
                data=zip_buf,
                file_name="quickbooks_sales_receipts_json.zip",
                mime="application/zip",
            )
            st.markdown("---")
            st.markdown("### View Full JSON of a Sales Receipt")
            idx = st.number_input("Select Sales Receipt #", min_value=1, max_value=len(receipts), value=1)
            st.json(receipts[idx - 1])
        # Everything fetched now counts as seen for the "new receipts" check.
        st.session_state["seen_receipt_ids"] = {r.get("Id") for r in receipts}
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted string is the same.
        st.session_state["last_checked_iso"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# ----- "Fetch New Sales Receipts" handler -----
if new_btn:
    with st.spinner("Refreshing QuickBooks access token..."):
        # Redeem the most recent refresh token seen in this session; the token
        # endpoint may rotate it, so the returned value is kept for next time.
        access_token, new_refresh_token, err = refresh_access_token(
            QB_CLIENT_ID,
            QB_CLIENT_SECRET,
            st.session_state.get("qb_refresh_token", QB_REFRESH_TOKEN),
        )
    if new_refresh_token:
        st.session_state["qb_refresh_token"] = new_refresh_token
    if err or not access_token:
        st.error(f"Could not refresh access token: {err}")
        st.stop()

    # Normalised here as well so the spinner and heading show the exact
    # timestamp that is queried.
    new_since = last_checked.strip()
    if "Z" not in new_since and "+" not in new_since and "-" not in new_since[10:]:
        new_since += "Z"

    with st.spinner(f"Querying QuickBooks for new Sales Receipts since {new_since}..."):
        receipts, api_err = get_new_sales_receipts(access_token, QB_REALM_ID, new_since, max_results=max_results)

    if api_err:
        st.error(api_err)
        st.session_state["latest_new_receipts"] = []
    elif not receipts:
        st.info("No new sales receipts found.")
        st.session_state["latest_new_receipts"] = []
    else:
        # Drop receipts already shown earlier in this session.
        already_seen = st.session_state.get("seen_receipt_ids", set())
        new_receipts = [r for r in receipts if r.get("Id") not in already_seen]
        if not new_receipts:
            st.info("No new, unseen sales receipts since last check.")
            st.session_state["latest_new_receipts"] = []
        else:
            st.success(f"Found {len(new_receipts)} NEW sales receipts.")
            line_items = extract_line_items(new_receipts)
            df = pd.DataFrame(line_items)
            # NOTE(review): st.button is True only on the rerun caused by the
            # click, so using the widgets below (which triggers another rerun)
            # hides this view again - consider rendering from session state.
            with tab.container():
                st.markdown(f"### Sales Receipt Line Items (New since {new_since})")
                st.dataframe(df, use_container_width=True)
                csv = df.to_csv(index=False)
                st.download_button(
                    label="Download NEW Line Items as CSV",
                    data=csv,
                    file_name="quickbooks_new_sales_receipt_line_items.csv",
                    mime="text/csv",
                )
                zip_buf = build_zip_of_json(new_receipts)
                st.download_button(
                    label="Download NEW Sales Receipts (ZIP of JSONs)",
                    data=zip_buf,
                    file_name="quickbooks_new_sales_receipts_json.zip",
                    mime="application/zip",
                )
                st.markdown("---")
                st.markdown("### View Full JSON of a NEW Sales Receipt")
                idx = st.number_input("Select New Sales Receipt #", min_value=1, max_value=len(new_receipts), value=1)
                st.json(new_receipts[idx - 1])
            st.session_state["seen_receipt_ids"].update({r.get("Id") for r in new_receipts})
            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # the formatted string is the same.
            st.session_state["last_checked_iso"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            # Kept in session state so the invoice button survives the rerun.
            st.session_state["latest_new_receipts"] = new_receipts
# ========== CREATE INVOICES FROM LATEST NEW RECEIPTS ==========
# Offered only while receipts from the latest "new receipts" fetch are pending.
if st.session_state.get("latest_new_receipts"):
    st.markdown("---")
    if st.button("Create Invoice(s) from New Sales Receipts"):
        with st.spinner("Refreshing QuickBooks access token..."):
            # Redeem the most recent refresh token seen in this session; the
            # token endpoint may rotate it, so the returned value is kept.
            access_token, new_refresh_token, err = refresh_access_token(
                QB_CLIENT_ID,
                QB_CLIENT_SECRET,
                st.session_state.get("qb_refresh_token", QB_REFRESH_TOKEN),
            )
        if new_refresh_token:
            st.session_state["qb_refresh_token"] = new_refresh_token

        if err or not access_token:
            st.error(f"Could not refresh access token: {err}")
        else:
            results = []
            failed_receipts = []
            for receipt in st.session_state["latest_new_receipts"]:
                resp, err_msg = create_invoice_from_sales_receipt(access_token, QB_REALM_ID, receipt, TAX_CODE_ID)
                if resp:
                    results.append({
                        "SalesReceipt": receipt.get("DocNumber", ""),
                        "Status": "Created",
                        "InvoiceId": resp.get("Invoice", {}).get("Id", "-"),
                    })
                else:
                    failed_receipts.append(receipt)
                    results.append({
                        "SalesReceipt": receipt.get("DocNumber", ""),
                        "Status": f"Error: {err_msg}",
                        "InvoiceId": "",
                    })
            failed = len(failed_receipts)
            created = len(results) - failed
            # Keep only the receipts that still need an invoice, so a second
            # click retries the failures instead of duplicating the invoices
            # that were already created.
            st.session_state["latest_new_receipts"] = failed_receipts
            if created:
                st.success(f"{created} invoice(s) created in QuickBooks! ({failed} failed)")
            if failed:
                st.error(f"{failed} invoice(s) failed to create.")
            st.dataframe(pd.DataFrame(results))
# Footer: divider plus a short legend for the sidebar actions. The legend is
# raw HTML, hence unsafe_allow_html.
FOOTER_HTML = """
<small>
<b>Fetch All Sales Receipt Line Items</b> gets everything from your QuickBooks Sandbox.<br>
<b>Fetch New Sales Receipts</b> only gets newly created receipts since your last fetch.<br>
Billing/Shipping address included everywhere. Invoice creation is available after fetching new receipts.<br>
</small>
"""
st.markdown("---")
st.markdown(FOOTER_HTML, unsafe_allow_html=True)