import os import pickle import logging from typing import Optional from google.auth.transport.requests import Request from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build logger = logging.getLogger(__name__) SCOPES = [ "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive.readonly" ] _PROJECT_ROOT = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "..", "..") ) _CREDENTIALS_PATH = os.path.join(_PROJECT_ROOT, "credentials.json") _TOKEN_PATH = os.path.join(_PROJECT_ROOT, "token_sheets.pkl") _LEDGER_ID_PATH = os.path.join(_PROJECT_ROOT, ".ledger_id") # We will use these columns for the ledger HEADERS = ["Date", "Vendor", "Category", "Total", "Payment Method", "Order ID", "Notes", "Message ID"] def get_sheets_service(): """Authenticate and return the Google Sheets API service.""" import base64 # Reconstruct files from Hugging Face Secrets if available # Support both naming conventions: SHEETS_TOKEN_B64 and TOKEN_SHEETS_B64 sheets_token_b64 = os.environ.get("SHEETS_TOKEN_B64") or os.environ.get("TOKEN_SHEETS_B64") if sheets_token_b64 and not os.path.exists(_TOKEN_PATH): try: with open(_TOKEN_PATH, "wb") as f: f.write(base64.b64decode(sheets_token_b64)) logger.info(f"[SHEETS] Reconstructed token_sheets.pkl from HF Secret") except Exception as e: logger.error(f"Failed to decode Sheets token secret: {e}") gcp_creds_b64 = os.environ.get("GCP_CREDENTIALS_B64") if gcp_creds_b64 and not os.path.exists(_CREDENTIALS_PATH): try: with open(_CREDENTIALS_PATH, "wb") as f: f.write(base64.b64decode(gcp_creds_b64)) logger.info(f"[SHEETS] Reconstructed credentials.json from HF Secret") except Exception as e: logger.error(f"Failed to decode GCP_CREDENTIALS_B64: {e}") creds = None if os.path.exists(_TOKEN_PATH): with open(_TOKEN_PATH, "rb") as f: creds = pickle.load(f) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: try: creds.refresh(Request()) except Exception: os.remove(_TOKEN_PATH) creds = None if not creds: if not os.path.exists(_CREDENTIALS_PATH): raise FileNotFoundError( f"Missing {_CREDENTIALS_PATH}. Please download it from Google Cloud Console." ) flow = InstalledAppFlow.from_client_secrets_file(_CREDENTIALS_PATH, SCOPES) creds = flow.run_local_server(port=0) with open(_TOKEN_PATH, "wb") as f: pickle.dump(creds, f) return build("sheets", "v4", credentials=creds) def find_spreadsheet_by_name(service, name: str) -> Optional[str]: """Search for a spreadsheet by name and return its ID.""" try: from googleapiclient.discovery import build drive_service = build('drive', 'v3', credentials=service._http.credentials) # Search for both native sheets and also handle dummy/xlsx naming query = f"mimeType='application/vnd.google-apps.spreadsheet' and name contains '{name}' and trashed = false" results = drive_service.files().list(q=query, fields="files(id, name)").execute() files = results.get('files', []) if files: return files[0]['id'] except Exception as e: logger.warning(f"[SHEETS] Auto-discovery failed: {e}") return None def get_or_create_ledger(service) -> str: """Return the existing ledger ID or create a new one with headers.""" if os.environ.get("SHEETS_LEDGER_ID"): return os.environ.get("SHEETS_LEDGER_ID") if os.path.exists(_LEDGER_ID_PATH): with open(_LEDGER_ID_PATH, "r") as f: spreadsheet_id = f.read().strip() if spreadsheet_id: return spreadsheet_id # Auto-discovery attempt before creating new discovered_id = find_spreadsheet_by_name(service, "Trace_Financial_Audit_Dummy_Sheet") if not discovered_id: discovered_id = find_spreadsheet_by_name(service, "Trace Financial Ledger") if discovered_id: with open(_LEDGER_ID_PATH, "w") as f: f.write(discovered_id) return discovered_id # Create a new spreadsheet spreadsheet = { 'properties': { 'title': 'Trace Financial Ledger' } } spreadsheet = service.spreadsheets().create(body=spreadsheet, fields='spreadsheetId').execute() spreadsheet_id = spreadsheet.get('spreadsheetId') # Save ID with open(_LEDGER_ID_PATH, "w") as f: f.write(spreadsheet_id) # Write headers body = { 'values': [HEADERS] } service.spreadsheets().values().update( spreadsheetId=spreadsheet_id, range=f"A1:{chr(64 + len(HEADERS))}1", valueInputOption="RAW", body=body ).execute() # Optional: Make headers bold requests = [ { "repeatCell": { "range": { "sheetId": 0, "startRowIndex": 0, "endRowIndex": 1, "startColumnIndex": 0, "endColumnIndex": len(HEADERS) }, "cell": { "userEnteredFormat": { "textFormat": { "bold": True } } }, "fields": "userEnteredFormat.textFormat.bold" } } ] service.spreadsheets().batchUpdate( spreadsheetId=spreadsheet_id, body={'requests': requests} ).execute() logger.info(f"[SHEETS] Created new ledger. ID: {spreadsheet_id}") return spreadsheet_id def append_transactions(transactions: list[dict]) -> str: """ Append new transactions to the Google Sheet ledger with deduplication. We return the URL of the Google Sheet. """ if not transactions: return "" try: service = get_sheets_service() spreadsheet_id = get_or_create_ledger(service) # ── Step 1: Fetch existing Message IDs from all tabs to prevent duplicates ──────── spreadsheet = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() sheets = spreadsheet.get('sheets', []) existing_ids = set() for sheet in sheets: sheet_title = sheet.get('properties', {}).get('title') try: result = service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=f"'{sheet_title}'!H:H" # Message ID is column H ).execute() for row in result.get('values', []): if row: existing_ids.add(row[0]) except Exception as e: logger.warning(f"[SHEETS] Failed to fetch IDs from tab '{sheet_title}': {e}") # ── Step 2: Prepare new rows ──────────────────────────────────────── rows = [] for t in transactions: msg_id = t.get("id") if not msg_id or msg_id in existing_ids: continue # Skip duplicates or missing IDs import re amount_str = re.sub(r'[^\d.]', '', t.get("total") or "") amount = float(amount_str) if amount_str else 0.0 # If no amount was found AND no useful metadata exists, skip has_useful_metadata = ( t.get("vendor", "").lower() not in ("", "unknown") or t.get("subject", "").strip() or t.get("body_preview", "").strip() ) if amount == 0 and not has_useful_metadata: continue # Extract notes from details details = [] if t.get("details"): for k, v in t["details"].items(): if isinstance(v, list): details.append(f"{k}: {len(v)} items") else: details.append(f"{k}: {v}") # Fallback: include subject/body preview for unparsed entries if not details: subj = t.get("subject", "").strip() if subj: details.append(subj[:120]) elif t.get("body_preview", "").strip(): details.append(t["body_preview"][:120]) notes_str = " | ".join(details) row = [ t.get("date", ""), t.get("vendor", ""), t.get("category", ""), amount, t.get("payment_method", "Unknown"), t.get("order_id", ""), notes_str, msg_id ] rows.append(row) if not rows: logger.info("[SHEETS] No new transactions to append.") return f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}" body = { 'values': rows } result = service.spreadsheets().values().append( spreadsheetId=spreadsheet_id, range="A:H", valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body ).execute() logger.info(f"[SHEETS] Appended {result.get('updates', {}).get('updatedRows', 0)} new rows.") return f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}" except Exception as e: logger.error(f"[SHEETS] Error appending to Sheets: {e}") return "" def fetch_and_summarize() -> dict: """ Fetch all rows from the Google Sheet and generate a summary. """ try: service = get_sheets_service() spreadsheet_id = get_or_create_ledger(service) # Log metadata to help debugging spreadsheet = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute() title = spreadsheet.get('properties', {}).get('title') sheets = spreadsheet.get('sheets', []) logger.info(f"[SHEETS] Fetching from '{title}' (ID: {spreadsheet_id}) across {len(sheets)} tabs") all_rows = [] for sheet in sheets: sheet_title = sheet.get('properties', {}).get('title') try: # Read the full range for each sheet result = service.spreadsheets().values().get( spreadsheetId=spreadsheet_id, range=f"'{sheet_title}'!A:H" ).execute() values = result.get('values', []) if not values: continue # Basic header detection: skip if first row, first column says "Date" if len(values[0]) > 0 and str(values[0][0]).strip().lower() == "date": all_rows.extend(values[1:]) else: all_rows.extend(values) except Exception as e: logger.warning(f"[SHEETS] Failed to fetch tab '{sheet_title}': {e}") logger.info(f"[SHEETS] Found {len(all_rows)} total transaction rows across all tabs.") if not all_rows: logger.info("[SHEETS] Ledger is empty.") return { "total_spend": 0.0, "count": 0, "by_category": {}, "by_vendor": {} } rows = all_rows total_spend = 0.0 by_category = {} by_vendor = {} count = 0 transactions = [] # Added to return the actual data import re for row in rows: # Basic validation: must have at least Date, Vendor, Category, Total if len(row) < 4: continue vendor = row[1] or "Unknown" category = row[2] or "unknown" # Extract amount from Column D (index 3) amount_val = row[3] if not amount_val: continue try: # Remove everything except digits and decimal point cleaned_amt = re.sub(r'[^\d.]', '', str(amount_val)) if not cleaned_amt: amount = 0.0 else: amount = float(cleaned_amt) except (ValueError, IndexError) as e: logger.warning(f"[SHEETS] Failed to parse amount '{amount_val}': {e}") amount = 0.0 # Get Message ID from Column H (index 7) msg_id = row[7] if len(row) > 7 else None if amount > 0: total_spend += amount by_category[category] = by_category.get(category, 0) + amount by_vendor[vendor] = by_vendor.get(vendor, 0) + amount count += 1 # Add to transactions list transactions.append({ "date": row[0], "vendor": vendor, "category": category, "total": str(amount), "payment_method": row[4] if len(row) > 4 else "Unknown", "order_id": row[5] if len(row) > 5 else "", "notes": row[6] if len(row) > 6 else "", "id": msg_id, "_source": "sheets" }) summary = { "total_spend": round(total_spend, 2), "count": count, "by_category": {k: round(v, 2) for k, v in sorted(by_category.items(), key=lambda x: -x[1])}, "by_vendor": {k: round(v, 2) for k, v in sorted(by_vendor.items(), key=lambda x: -x[1])}, "transactions": transactions, # Include the list "sheet_url": f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}" } logger.info(f"[SHEETS] Fetched summary: ₹{total_spend:,.2f} across {count} items.") return summary except Exception as e: logger.error(f"[SHEETS] Error fetching from Sheets: {e}") return {"error": str(e), "total_spend": 0.0}