import streamlit as st import requests from datetime import datetime import pandas as pd import json import io import zipfile # ========== HARDCODED SANDBOX CREDENTIALS ========== QB_CLIENT_ID = "ABHMR4u0r9zkbLQXoBgOgoHA1NYBA3rmxHTMY9qrZxHT0egddV" QB_CLIENT_SECRET = "L8ejkQtUz6JCLB4rsRYIkdbsctE8NK2MWCi3week" QB_REFRESH_TOKEN = "RT1-1-H0-17619102608c5kt4csegkbhep7esgp" QB_REALM_ID = "9341455017117852" TAX_CODE_ID = "5" # This is the TaxCode Id you want to use QB_BASE_URL = "https://sandbox-quickbooks.api.intuit.com/v3/company" TOKEN_URL = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer" st.set_page_config(page_title="QuickBooks Line Items Table", layout="wide") st.title("QuickBooks: Sales Receipt Line Items & Invoice Creation") # ========== TOKEN REFRESH ========== def refresh_access_token(client_id, client_secret, refresh_token): headers = {"Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded"} data = { "grant_type": "refresh_token", "refresh_token": refresh_token } resp = requests.post(TOKEN_URL, headers=headers, data=data, auth=(client_id, client_secret), timeout=20) if resp.status_code == 200: tokens = resp.json() return tokens["access_token"], tokens.get("refresh_token", refresh_token), None else: return None, None, f"Token refresh failed: {resp.status_code}: {resp.text}" # ========== API QUERY ========== def get_new_sales_receipts(access_token, realm_id, last_checked_iso, max_results=1000): receipts = [] more = True start_position = 1 if "Z" not in last_checked_iso and "+" not in last_checked_iso and "-" not in last_checked_iso[10:]: last_checked_iso += "Z" while more: query = f"SELECT * FROM SalesReceipt WHERE MetaData.CreateTime > '{last_checked_iso}' STARTPOSITION {start_position} MAXRESULTS {max_results}" url = f"{QB_BASE_URL}/{realm_id}/query" headers = { "Authorization": f"Bearer {access_token}", "Accept": "application/json" } params = {"query": query} resp = requests.get(url, headers=headers, params=params, timeout=30) if resp.status_code != 200: return receipts, f"QuickBooks API Error: {resp.status_code}: {resp.text}" data = resp.json() batch = data.get("QueryResponse", {}).get("SalesReceipt", []) receipts.extend(batch) if len(batch) < max_results: more = False else: start_position += max_results return receipts, None def get_all_sales_receipts(access_token, realm_id, start_position=1, max_results=1000): receipts = [] more = True while more: query = f"SELECT * FROM SalesReceipt STARTPOSITION {start_position} MAXRESULTS {max_results}" url = f"{QB_BASE_URL}/{realm_id}/query" headers = { "Authorization": f"Bearer {access_token}", "Accept": "application/json" } params = {"query": query} resp = requests.get(url, headers=headers, params=params, timeout=30) if resp.status_code != 200: return receipts, f"QuickBooks API Error: {resp.status_code}: {resp.text}" data = resp.json() batch = data.get("QueryResponse", {}).get("SalesReceipt", []) receipts.extend(batch) if len(batch) < max_results: more = False else: start_position += max_results return receipts, None def fetch_tax_codes(access_token, realm_id): url = f"{QB_BASE_URL}/{realm_id}/query" headers = { "Authorization": f"Bearer {access_token}", "Accept": "application/json" } params = {"query": "SELECT * FROM TaxCode"} resp = requests.get(url, headers=headers, params=params) if resp.status_code != 200: return None, f"QuickBooks API Error: {resp.status_code}: {resp.text}" data = resp.json() tax_codes = data.get("QueryResponse", {}).get("TaxCode", []) return tax_codes, None # ========== INVOICE CREATION ========== def create_invoice_from_sales_receipt(access_token, realm_id, receipt, tax_code_id): url = f"{QB_BASE_URL}/{realm_id}/invoice" headers = { "Authorization": f"Bearer {access_token}", "Accept": "application/json", "Content-Type": "application/json" } customer_ref = receipt.get("CustomerRef", {}) if not customer_ref or "value" not in customer_ref: return None, "Missing CustomerRef in SalesReceipt" lines = [] for line in receipt.get("Line", []): if line.get("DetailType") != "SalesItemLineDetail": continue detail = line.get("SalesItemLineDetail", {}) item_ref = detail.get("ItemRef", {}) if not item_ref or "value" not in item_ref: continue new_line = { "Amount": line.get("Amount"), "DetailType": "SalesItemLineDetail", "Description": line.get("Description", "-"), "SalesItemLineDetail": { "ItemRef": {"value": item_ref["value"]}, "Qty": detail.get("Qty"), "UnitPrice": detail.get("UnitPrice"), "TaxCodeRef": {"value": tax_code_id} } } lines.append(new_line) if not lines: return None, "No SalesItemLineDetail line items found in SalesReceipt" invoice_data = { "CustomerRef": {"value": customer_ref["value"]}, "Line": lines, "DocNumber": receipt.get("DocNumber"), } if receipt.get("BillAddr"): invoice_data["BillAddr"] = receipt["BillAddr"] if receipt.get("ShipAddr"): invoice_data["ShipAddr"] = receipt["ShipAddr"] resp = requests.post(url, headers=headers, data=json.dumps(invoice_data)) if resp.status_code not in (200, 201): return None, f"QuickBooks API Error: {resp.status_code}: {resp.text}" return resp.json(), None # ========== EXTRACT LINE ITEMS (with Bill/Ship Addr) ========== def flatten_address(addr): if not addr: return "-" parts = [addr.get(k, "") for k in ["Line1", "Line2", "Line3", "City", "CountrySubDivisionCode", "PostalCode", "Country"]] return ", ".join([p for p in parts if p]) def extract_line_items(receipts): rows = [] for r in receipts: txn_date = r.get("TxnDate", "") txn_type = "Sales Receipt" txn_number = r.get("DocNumber", r.get("Id", "")) customer_ref = r.get("CustomerRef", {}) customer_name = customer_ref.get("name", "") if isinstance(customer_ref, dict) else "" bill_addr_str = flatten_address(r.get("BillAddr", {})) ship_addr_str = flatten_address(r.get("ShipAddr", {})) lines = r.get("Line", []) for line in lines: if line.get("DetailType") != "SalesItemLineDetail": continue description = line.get("Description", "-") detail = line.get("SalesItemLineDetail", {}) item_name = detail.get("ItemRef", {}).get("name", "") if isinstance(detail.get("ItemRef", {}), dict) else "" quantity = detail.get("Qty", "") unit_price = detail.get("UnitPrice", "") amount = float(line.get("Amount", 0) or 0) debit = amount if amount > 0 else "" credit = -amount if amount < 0 else "" row = { "Transaction date": txn_date, "Transaction type": txn_type, "#": txn_number, "Name": customer_name, "Memo/Description": description, "Item Name": item_name, "Quantity": quantity if quantity != "" else "-", "Unit Price": unit_price if unit_price != "" else "-", "Debit": debit if debit != "" else "-", "Credit": credit if credit != "" else "-", "Billing Address": bill_addr_str, "Shipping Address": ship_addr_str } rows.append(row) return rows # ========== PER-RECEIPT JSON ZIP (include Bill/Ship Addr at top) ========== def build_zip_of_json(receipts): buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as z: for r in receipts: number = r.get("DocNumber") or r.get("Id") or "unknown" out = r.copy() # Bill/Ship Addr at top level if "BillAddr" in r: out["BillAddr"] = r["BillAddr"] if "ShipAddr" in r: out["ShipAddr"] = r["ShipAddr"] fname = f"SalesReceipt-{number}.json" z.writestr(fname, json.dumps(out, indent=2)) buf.seek(0) return buf # ========== SESSION STATE ========== if "seen_receipt_ids" not in st.session_state: st.session_state["seen_receipt_ids"] = set() if "last_checked_iso" not in st.session_state: st.session_state["last_checked_iso"] = "2000-01-01T00:00:00Z" if "latest_new_receipts" not in st.session_state: st.session_state["latest_new_receipts"] = [] # ========== UI ========== with st.sidebar: st.header("QuickBooks Actions") max_results = st.slider("Max results per page (pagination)", min_value=10, max_value=1000, value=100, step=10) all_btn = st.button("Fetch All Sales Receipt Line Items", type="primary") st.markdown("---") st.markdown("**New Sales Receipts Only**") last_checked = st.text_input( "Last Checked Time (UTC, ISO)", value=st.session_state["last_checked_iso"], help="e.g., 2025-07-16T00:00:00Z" ) new_btn = st.button("Fetch New Sales Receipts") st.markdown("---") st.markdown("**Tax Codes**") fetch_tax_btn = st.button("Fetch Tax Codes") tab = st.empty() if fetch_tax_btn: with st.spinner("Refreshing QuickBooks access token..."): access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) if err or not access_token: st.error(f"Could not refresh access token: {err}") else: with st.spinner("Querying QuickBooks for Tax Codes..."): tax_codes, api_err = fetch_tax_codes(access_token, QB_REALM_ID) if api_err: st.error(api_err) elif not tax_codes: st.warning("No tax codes found.") else: df = pd.DataFrame([ { "TaxCode Id": tc.get("Id", "-"), "TaxCode Name": tc.get("Name", "-"), "Description": tc.get("Description", "-") } for tc in tax_codes ]) st.success(f"Found {len(tax_codes)} tax codes.") st.dataframe(df, use_container_width=True) st.markdown("> Use the **TaxCode Id** (not the name) in your API calls as `TaxCodeRef.value`.") if all_btn: with st.spinner("Refreshing QuickBooks access token..."): access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) if err or not access_token: st.error(f"Could not refresh access token: {err}") st.stop() with st.spinner("Querying QuickBooks for all Sales Receipts..."): receipts, api_err = get_all_sales_receipts(access_token, QB_REALM_ID, start_position=1, max_results=max_results) if api_err: st.error(api_err) elif not receipts: st.warning("No sales receipts found.") else: st.success(f"Found {len(receipts)} sales receipts.") line_items = extract_line_items(receipts) df = pd.DataFrame(line_items) with tab.container(): st.markdown("### Sales Receipt Line Items (All)") st.dataframe(df, use_container_width=True) csv = df.to_csv(index=False) st.download_button( label="Download Line Items as CSV", data=csv, file_name="quickbooks_sales_receipt_line_items.csv", mime="text/csv" ) zip_buf = build_zip_of_json(receipts) st.download_button( label="Download All Sales Receipts (ZIP of JSONs)", data=zip_buf, file_name="quickbooks_sales_receipts_json.zip", mime="application/zip" ) st.markdown("---") st.markdown("### View Full JSON of a Sales Receipt") idx = st.number_input("Select Sales Receipt #", min_value=1, max_value=len(receipts), value=1) st.json(receipts[idx-1]) st.session_state["seen_receipt_ids"] = {r.get("Id") for r in receipts} st.session_state["last_checked_iso"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") if new_btn: with st.spinner("Refreshing QuickBooks access token..."): access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) if err or not access_token: st.error(f"Could not refresh access token: {err}") st.stop() new_since = last_checked.strip() if "Z" not in new_since and "+" not in new_since and "-" not in new_since[10:]: new_since += "Z" with st.spinner(f"Querying QuickBooks for new Sales Receipts since {new_since}..."): receipts, api_err = get_new_sales_receipts(access_token, QB_REALM_ID, new_since, max_results=max_results) if api_err: st.error(api_err) st.session_state["latest_new_receipts"] = [] elif not receipts: st.info("No new sales receipts found.") st.session_state["latest_new_receipts"] = [] else: already_seen = st.session_state.get("seen_receipt_ids", set()) new_receipts = [r for r in receipts if r.get("Id") not in already_seen] if not new_receipts: st.info("No new, unseen sales receipts since last check.") st.session_state["latest_new_receipts"] = [] else: st.success(f"Found {len(new_receipts)} NEW sales receipts.") line_items = extract_line_items(new_receipts) df = pd.DataFrame(line_items) with tab.container(): st.markdown(f"### Sales Receipt Line Items (New since {new_since})") st.dataframe(df, use_container_width=True) csv = df.to_csv(index=False) st.download_button( label="Download NEW Line Items as CSV", data=csv, file_name="quickbooks_new_sales_receipt_line_items.csv", mime="text/csv" ) zip_buf = build_zip_of_json(new_receipts) st.download_button( label="Download NEW Sales Receipts (ZIP of JSONs)", data=zip_buf, file_name="quickbooks_new_sales_receipts_json.zip", mime="application/zip" ) st.markdown("---") st.markdown("### View Full JSON of a NEW Sales Receipt") idx = st.number_input("Select New Sales Receipt #", min_value=1, max_value=len(new_receipts), value=1) st.json(new_receipts[idx-1]) st.session_state["seen_receipt_ids"].update({r.get("Id") for r in new_receipts}) st.session_state["last_checked_iso"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") st.session_state["latest_new_receipts"] = new_receipts # ========== CREATE INVOICES FROM LATEST NEW RECEIPTS ========== if st.session_state.get("latest_new_receipts"): st.markdown("---") if st.button("Create Invoice(s) from New Sales Receipts"): with st.spinner("Refreshing QuickBooks access token..."): access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) if err or not access_token: st.error(f"Could not refresh access token: {err}") else: created, failed = 0, 0 results = [] for receipt in st.session_state["latest_new_receipts"]: resp, err_msg = create_invoice_from_sales_receipt(access_token, QB_REALM_ID, receipt, TAX_CODE_ID) if resp: created += 1 results.append({"SalesReceipt": receipt.get("DocNumber", ""), "Status": "Created", "InvoiceId": resp.get("Invoice", {}).get("Id", "-")}) else: failed += 1 results.append({"SalesReceipt": receipt.get("DocNumber", ""), "Status": f"Error: {err_msg}", "InvoiceId": ""}) if created: st.success(f"{created} invoice(s) created in QuickBooks! ({failed} failed)") if failed: st.error(f"{failed} invoice(s) failed to create.") st.dataframe(pd.DataFrame(results)) st.markdown("---") st.markdown( """ Fetch All Sales Receipt Line Items gets everything from your QuickBooks Sandbox.
Fetch New Sales Receipts only gets newly created receipts since your last fetch.
Billing/Shipping address included everywhere. Invoice creation is available after fetching new receipts.
""", unsafe_allow_html=True )