Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| from datetime import datetime | |
| import pandas as pd | |
| import json | |
| import io | |
| import zipfile | |
| # ========== HARDCODED SANDBOX CREDENTIALS ========== | |
| QB_CLIENT_ID = "ABHMR4u0r9zkbLQXoBgOgoHA1NYBA3rmxHTMY9qrZxHT0egddV" | |
| QB_CLIENT_SECRET = "L8ejkQtUz6JCLB4rsRYIkdbsctE8NK2MWCi3week" | |
| QB_REFRESH_TOKEN = "RT1-1-H0-17619102608c5kt4csegkbhep7esgp" | |
| QB_REALM_ID = "9341455017117852" | |
| TAX_CODE_ID = "5" # This is the TaxCode Id you want to use | |
| QB_BASE_URL = "https://sandbox-quickbooks.api.intuit.com/v3/company" | |
| TOKEN_URL = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer" | |
| st.set_page_config(page_title="QuickBooks Line Items Table", layout="wide") | |
| st.title("QuickBooks: Sales Receipt Line Items & Invoice Creation") | |
| # ========== TOKEN REFRESH ========== | |
| def refresh_access_token(client_id, client_secret, refresh_token): | |
| headers = {"Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded"} | |
| data = { | |
| "grant_type": "refresh_token", | |
| "refresh_token": refresh_token | |
| } | |
| resp = requests.post(TOKEN_URL, headers=headers, data=data, auth=(client_id, client_secret), timeout=20) | |
| if resp.status_code == 200: | |
| tokens = resp.json() | |
| return tokens["access_token"], tokens.get("refresh_token", refresh_token), None | |
| else: | |
| return None, None, f"Token refresh failed: {resp.status_code}: {resp.text}" | |
| # ========== API QUERY ========== | |
| def get_new_sales_receipts(access_token, realm_id, last_checked_iso, max_results=1000): | |
| receipts = [] | |
| more = True | |
| start_position = 1 | |
| if "Z" not in last_checked_iso and "+" not in last_checked_iso and "-" not in last_checked_iso[10:]: | |
| last_checked_iso += "Z" | |
| while more: | |
| query = f"SELECT * FROM SalesReceipt WHERE MetaData.CreateTime > '{last_checked_iso}' STARTPOSITION {start_position} MAXRESULTS {max_results}" | |
| url = f"{QB_BASE_URL}/{realm_id}/query" | |
| headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Accept": "application/json" | |
| } | |
| params = {"query": query} | |
| resp = requests.get(url, headers=headers, params=params, timeout=30) | |
| if resp.status_code != 200: | |
| return receipts, f"QuickBooks API Error: {resp.status_code}: {resp.text}" | |
| data = resp.json() | |
| batch = data.get("QueryResponse", {}).get("SalesReceipt", []) | |
| receipts.extend(batch) | |
| if len(batch) < max_results: | |
| more = False | |
| else: | |
| start_position += max_results | |
| return receipts, None | |
| def get_all_sales_receipts(access_token, realm_id, start_position=1, max_results=1000): | |
| receipts = [] | |
| more = True | |
| while more: | |
| query = f"SELECT * FROM SalesReceipt STARTPOSITION {start_position} MAXRESULTS {max_results}" | |
| url = f"{QB_BASE_URL}/{realm_id}/query" | |
| headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Accept": "application/json" | |
| } | |
| params = {"query": query} | |
| resp = requests.get(url, headers=headers, params=params, timeout=30) | |
| if resp.status_code != 200: | |
| return receipts, f"QuickBooks API Error: {resp.status_code}: {resp.text}" | |
| data = resp.json() | |
| batch = data.get("QueryResponse", {}).get("SalesReceipt", []) | |
| receipts.extend(batch) | |
| if len(batch) < max_results: | |
| more = False | |
| else: | |
| start_position += max_results | |
| return receipts, None | |
| def fetch_tax_codes(access_token, realm_id): | |
| url = f"{QB_BASE_URL}/{realm_id}/query" | |
| headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Accept": "application/json" | |
| } | |
| params = {"query": "SELECT * FROM TaxCode"} | |
| resp = requests.get(url, headers=headers, params=params) | |
| if resp.status_code != 200: | |
| return None, f"QuickBooks API Error: {resp.status_code}: {resp.text}" | |
| data = resp.json() | |
| tax_codes = data.get("QueryResponse", {}).get("TaxCode", []) | |
| return tax_codes, None | |
| # ========== INVOICE CREATION ========== | |
| def create_invoice_from_sales_receipt(access_token, realm_id, receipt, tax_code_id): | |
| url = f"{QB_BASE_URL}/{realm_id}/invoice" | |
| headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Accept": "application/json", | |
| "Content-Type": "application/json" | |
| } | |
| customer_ref = receipt.get("CustomerRef", {}) | |
| if not customer_ref or "value" not in customer_ref: | |
| return None, "Missing CustomerRef in SalesReceipt" | |
| lines = [] | |
| for line in receipt.get("Line", []): | |
| if line.get("DetailType") != "SalesItemLineDetail": | |
| continue | |
| detail = line.get("SalesItemLineDetail", {}) | |
| item_ref = detail.get("ItemRef", {}) | |
| if not item_ref or "value" not in item_ref: | |
| continue | |
| new_line = { | |
| "Amount": line.get("Amount"), | |
| "DetailType": "SalesItemLineDetail", | |
| "Description": line.get("Description", "-"), | |
| "SalesItemLineDetail": { | |
| "ItemRef": {"value": item_ref["value"]}, | |
| "Qty": detail.get("Qty"), | |
| "UnitPrice": detail.get("UnitPrice"), | |
| "TaxCodeRef": {"value": tax_code_id} | |
| } | |
| } | |
| lines.append(new_line) | |
| if not lines: | |
| return None, "No SalesItemLineDetail line items found in SalesReceipt" | |
| invoice_data = { | |
| "CustomerRef": {"value": customer_ref["value"]}, | |
| "Line": lines, | |
| "DocNumber": receipt.get("DocNumber"), | |
| } | |
| if receipt.get("BillAddr"): | |
| invoice_data["BillAddr"] = receipt["BillAddr"] | |
| if receipt.get("ShipAddr"): | |
| invoice_data["ShipAddr"] = receipt["ShipAddr"] | |
| resp = requests.post(url, headers=headers, data=json.dumps(invoice_data)) | |
| if resp.status_code not in (200, 201): | |
| return None, f"QuickBooks API Error: {resp.status_code}: {resp.text}" | |
| return resp.json(), None | |
| # ========== EXTRACT LINE ITEMS (with Bill/Ship Addr) ========== | |
| def flatten_address(addr): | |
| if not addr: | |
| return "-" | |
| parts = [addr.get(k, "") for k in ["Line1", "Line2", "Line3", "City", "CountrySubDivisionCode", "PostalCode", "Country"]] | |
| return ", ".join([p for p in parts if p]) | |
| def extract_line_items(receipts): | |
| rows = [] | |
| for r in receipts: | |
| txn_date = r.get("TxnDate", "") | |
| txn_type = "Sales Receipt" | |
| txn_number = r.get("DocNumber", r.get("Id", "")) | |
| customer_ref = r.get("CustomerRef", {}) | |
| customer_name = customer_ref.get("name", "") if isinstance(customer_ref, dict) else "" | |
| bill_addr_str = flatten_address(r.get("BillAddr", {})) | |
| ship_addr_str = flatten_address(r.get("ShipAddr", {})) | |
| lines = r.get("Line", []) | |
| for line in lines: | |
| if line.get("DetailType") != "SalesItemLineDetail": | |
| continue | |
| description = line.get("Description", "-") | |
| detail = line.get("SalesItemLineDetail", {}) | |
| item_name = detail.get("ItemRef", {}).get("name", "") if isinstance(detail.get("ItemRef", {}), dict) else "" | |
| quantity = detail.get("Qty", "") | |
| unit_price = detail.get("UnitPrice", "") | |
| amount = float(line.get("Amount", 0) or 0) | |
| debit = amount if amount > 0 else "" | |
| credit = -amount if amount < 0 else "" | |
| row = { | |
| "Transaction date": txn_date, | |
| "Transaction type": txn_type, | |
| "#": txn_number, | |
| "Name": customer_name, | |
| "Memo/Description": description, | |
| "Item Name": item_name, | |
| "Quantity": quantity if quantity != "" else "-", | |
| "Unit Price": unit_price if unit_price != "" else "-", | |
| "Debit": debit if debit != "" else "-", | |
| "Credit": credit if credit != "" else "-", | |
| "Billing Address": bill_addr_str, | |
| "Shipping Address": ship_addr_str | |
| } | |
| rows.append(row) | |
| return rows | |
| # ========== PER-RECEIPT JSON ZIP (include Bill/Ship Addr at top) ========== | |
| def build_zip_of_json(receipts): | |
| buf = io.BytesIO() | |
| with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| for r in receipts: | |
| number = r.get("DocNumber") or r.get("Id") or "unknown" | |
| out = r.copy() | |
| # Bill/Ship Addr at top level | |
| if "BillAddr" in r: | |
| out["BillAddr"] = r["BillAddr"] | |
| if "ShipAddr" in r: | |
| out["ShipAddr"] = r["ShipAddr"] | |
| fname = f"SalesReceipt-{number}.json" | |
| z.writestr(fname, json.dumps(out, indent=2)) | |
| buf.seek(0) | |
| return buf | |
| # ========== SESSION STATE ========== | |
| if "seen_receipt_ids" not in st.session_state: | |
| st.session_state["seen_receipt_ids"] = set() | |
| if "last_checked_iso" not in st.session_state: | |
| st.session_state["last_checked_iso"] = "2000-01-01T00:00:00Z" | |
| if "latest_new_receipts" not in st.session_state: | |
| st.session_state["latest_new_receipts"] = [] | |
| # ========== UI ========== | |
| with st.sidebar: | |
| st.header("QuickBooks Actions") | |
| max_results = st.slider("Max results per page (pagination)", min_value=10, max_value=1000, value=100, step=10) | |
| all_btn = st.button("Fetch All Sales Receipt Line Items", type="primary") | |
| st.markdown("---") | |
| st.markdown("**New Sales Receipts Only**") | |
| last_checked = st.text_input( | |
| "Last Checked Time (UTC, ISO)", | |
| value=st.session_state["last_checked_iso"], | |
| help="e.g., 2025-07-16T00:00:00Z" | |
| ) | |
| new_btn = st.button("Fetch New Sales Receipts") | |
| st.markdown("---") | |
| st.markdown("**Tax Codes**") | |
| fetch_tax_btn = st.button("Fetch Tax Codes") | |
| tab = st.empty() | |
| if fetch_tax_btn: | |
| with st.spinner("Refreshing QuickBooks access token..."): | |
| access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) | |
| if err or not access_token: | |
| st.error(f"Could not refresh access token: {err}") | |
| else: | |
| with st.spinner("Querying QuickBooks for Tax Codes..."): | |
| tax_codes, api_err = fetch_tax_codes(access_token, QB_REALM_ID) | |
| if api_err: | |
| st.error(api_err) | |
| elif not tax_codes: | |
| st.warning("No tax codes found.") | |
| else: | |
| df = pd.DataFrame([ | |
| { | |
| "TaxCode Id": tc.get("Id", "-"), | |
| "TaxCode Name": tc.get("Name", "-"), | |
| "Description": tc.get("Description", "-") | |
| } | |
| for tc in tax_codes | |
| ]) | |
| st.success(f"Found {len(tax_codes)} tax codes.") | |
| st.dataframe(df, use_container_width=True) | |
| st.markdown("> Use the **TaxCode Id** (not the name) in your API calls as `TaxCodeRef.value`.") | |
| if all_btn: | |
| with st.spinner("Refreshing QuickBooks access token..."): | |
| access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) | |
| if err or not access_token: | |
| st.error(f"Could not refresh access token: {err}") | |
| st.stop() | |
| with st.spinner("Querying QuickBooks for all Sales Receipts..."): | |
| receipts, api_err = get_all_sales_receipts(access_token, QB_REALM_ID, start_position=1, max_results=max_results) | |
| if api_err: | |
| st.error(api_err) | |
| elif not receipts: | |
| st.warning("No sales receipts found.") | |
| else: | |
| st.success(f"Found {len(receipts)} sales receipts.") | |
| line_items = extract_line_items(receipts) | |
| df = pd.DataFrame(line_items) | |
| with tab.container(): | |
| st.markdown("### Sales Receipt Line Items (All)") | |
| st.dataframe(df, use_container_width=True) | |
| csv = df.to_csv(index=False) | |
| st.download_button( | |
| label="Download Line Items as CSV", | |
| data=csv, | |
| file_name="quickbooks_sales_receipt_line_items.csv", | |
| mime="text/csv" | |
| ) | |
| zip_buf = build_zip_of_json(receipts) | |
| st.download_button( | |
| label="Download All Sales Receipts (ZIP of JSONs)", | |
| data=zip_buf, | |
| file_name="quickbooks_sales_receipts_json.zip", | |
| mime="application/zip" | |
| ) | |
| st.markdown("---") | |
| st.markdown("### View Full JSON of a Sales Receipt") | |
| idx = st.number_input("Select Sales Receipt #", min_value=1, max_value=len(receipts), value=1) | |
| st.json(receipts[idx-1]) | |
| st.session_state["seen_receipt_ids"] = {r.get("Id") for r in receipts} | |
| st.session_state["last_checked_iso"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | |
| if new_btn: | |
| with st.spinner("Refreshing QuickBooks access token..."): | |
| access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) | |
| if err or not access_token: | |
| st.error(f"Could not refresh access token: {err}") | |
| st.stop() | |
| new_since = last_checked.strip() | |
| if "Z" not in new_since and "+" not in new_since and "-" not in new_since[10:]: | |
| new_since += "Z" | |
| with st.spinner(f"Querying QuickBooks for new Sales Receipts since {new_since}..."): | |
| receipts, api_err = get_new_sales_receipts(access_token, QB_REALM_ID, new_since, max_results=max_results) | |
| if api_err: | |
| st.error(api_err) | |
| st.session_state["latest_new_receipts"] = [] | |
| elif not receipts: | |
| st.info("No new sales receipts found.") | |
| st.session_state["latest_new_receipts"] = [] | |
| else: | |
| already_seen = st.session_state.get("seen_receipt_ids", set()) | |
| new_receipts = [r for r in receipts if r.get("Id") not in already_seen] | |
| if not new_receipts: | |
| st.info("No new, unseen sales receipts since last check.") | |
| st.session_state["latest_new_receipts"] = [] | |
| else: | |
| st.success(f"Found {len(new_receipts)} NEW sales receipts.") | |
| line_items = extract_line_items(new_receipts) | |
| df = pd.DataFrame(line_items) | |
| with tab.container(): | |
| st.markdown(f"### Sales Receipt Line Items (New since {new_since})") | |
| st.dataframe(df, use_container_width=True) | |
| csv = df.to_csv(index=False) | |
| st.download_button( | |
| label="Download NEW Line Items as CSV", | |
| data=csv, | |
| file_name="quickbooks_new_sales_receipt_line_items.csv", | |
| mime="text/csv" | |
| ) | |
| zip_buf = build_zip_of_json(new_receipts) | |
| st.download_button( | |
| label="Download NEW Sales Receipts (ZIP of JSONs)", | |
| data=zip_buf, | |
| file_name="quickbooks_new_sales_receipts_json.zip", | |
| mime="application/zip" | |
| ) | |
| st.markdown("---") | |
| st.markdown("### View Full JSON of a NEW Sales Receipt") | |
| idx = st.number_input("Select New Sales Receipt #", min_value=1, max_value=len(new_receipts), value=1) | |
| st.json(new_receipts[idx-1]) | |
| st.session_state["seen_receipt_ids"].update({r.get("Id") for r in new_receipts}) | |
| st.session_state["last_checked_iso"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | |
| st.session_state["latest_new_receipts"] = new_receipts | |
| # ========== CREATE INVOICES FROM LATEST NEW RECEIPTS ========== | |
| if st.session_state.get("latest_new_receipts"): | |
| st.markdown("---") | |
| if st.button("Create Invoice(s) from New Sales Receipts"): | |
| with st.spinner("Refreshing QuickBooks access token..."): | |
| access_token, _, err = refresh_access_token(QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN) | |
| if err or not access_token: | |
| st.error(f"Could not refresh access token: {err}") | |
| else: | |
| created, failed = 0, 0 | |
| results = [] | |
| for receipt in st.session_state["latest_new_receipts"]: | |
| resp, err_msg = create_invoice_from_sales_receipt(access_token, QB_REALM_ID, receipt, TAX_CODE_ID) | |
| if resp: | |
| created += 1 | |
| results.append({"SalesReceipt": receipt.get("DocNumber", ""), "Status": "Created", "InvoiceId": resp.get("Invoice", {}).get("Id", "-")}) | |
| else: | |
| failed += 1 | |
| results.append({"SalesReceipt": receipt.get("DocNumber", ""), "Status": f"Error: {err_msg}", "InvoiceId": ""}) | |
| if created: | |
| st.success(f"{created} invoice(s) created in QuickBooks! ({failed} failed)") | |
| if failed: | |
| st.error(f"{failed} invoice(s) failed to create.") | |
| st.dataframe(pd.DataFrame(results)) | |
| st.markdown("---") | |
| st.markdown( | |
| """ | |
| <small> | |
| <b>Fetch All Sales Receipt Line Items</b> gets everything from your QuickBooks Sandbox.<br> | |
| <b>Fetch New Sales Receipts</b> only gets newly created receipts since your last fetch.<br> | |
| Billing/Shipping address included everywhere. Invoice creation is available after fetching new receipts.<br> | |
| </small> | |
| """, unsafe_allow_html=True | |
| ) | |