"""
Experiment: Local Table QA with TAPAS/TAPEX models on financial data.

Runs a table QA model locally on CPU to answer questions about data
from the PostgreSQL financial database.

Supports two architectures:
  - TAPAS (google/tapas-*): cell selection + aggregation
  - TAPEX (microsoft/tapex-*): seq2seq text generation (BART-based)

Run:
    uv run --extra experiment python scripts/experiment_table_qa.py
    uv run --extra experiment python scripts/experiment_table_qa.py --model google/tapas-small-finetuned-wtq
    uv run --extra experiment python scripts/experiment_table_qa.py --model microsoft/tapex-base-finetuned-wtq
"""

import argparse
import json
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any

# Add project root to path so we can import src.*
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

import pandas as pd  # noqa: E402  (must come after the sys.path tweak)

from src.db.connection import get_connection  # noqa: E402

DEFAULT_MODEL = "google/tapas-mini-finetuned-wtq"

QUERY = """
SELECT
    transaction_date::text AS date,
    transaction_description AS description,
    category_name AS category,
    entry_amount::text AS amount,
    account_name AS account
FROM v_transaction_details
WHERE category_name IS NOT NULL
ORDER BY transaction_date DESC
LIMIT 15
"""

QUESTIONS = [
    "What is the total amount?",
    "Which category has the highest amount?",
    "How many transactions are there?",
]

RESULTS_FILE = Path(__file__).resolve().parent.parent / "eval_cases" / "table_qa_results.jsonl"


def is_tapex(model_name: str) -> bool:
    """Return True when *model_name* refers to a TAPEX checkpoint.

    The check is a case-insensitive substring match on ``"tapex"``; every
    other model name is handled as TAPAS by the caller.
    """
    return "tapex" in model_name.lower()


def fetch_table() -> pd.DataFrame:
    """Run ``QUERY`` against PostgreSQL and return the result as a DataFrame.

    Every cell is converted to ``str`` and every column is forced to
    ``object`` dtype. SQL NULLs become empty strings rather than the
    literal text ``"None"``.

    Returns:
        A DataFrame with one column per selected field. It may be empty
        when the query matches no rows.
    """
    print("[STEP 1] Connecting to PostgreSQL...")
    with get_connection() as conn:
        print(f" -> Connected to: {conn.dsn}")
        print(f" -> Executing query:\n{QUERY.strip()}")
        with conn.cursor() as cur:
            cur.execute(QUERY)
            columns = [desc[0] for desc in cur.description]
            rows = cur.fetchall()

    print(f" -> Raw result: {len(rows)} rows, {len(columns)} columns")
    print(f" -> Columns: {columns}")

    # Map NULL to "" so a missing value is an empty cell, not the word "None".
    data = {
        col: ["" if row[i] is None else str(row[i]) for row in rows]
        for i, col in enumerate(columns)
    }
    # Force object dtype explicitly. The TAPAS tokenizer mutates cells via
    # iloc with its internal Cell namedtuple, which requires object dtype
    # (incompatible with the pandas 3.0 StringDtype), so do not rely on the
    # pandas-version-dependent default inference.
    df = pd.DataFrame.from_dict(data, dtype=object)
    print(f" -> Dtypes (should be object):\n{df.dtypes.to_string()}")
    if df.empty:
        # iloc[0] would raise IndexError on an empty result set.
        print(" -> Sample row: <none, query returned no rows>")
    else:
        print(f" -> Sample row: {dict(df.iloc[0])}")
    print()
    return df


def load_tapas(model_name: str) -> tuple[Any, int]:
    """Load a TAPAS table-question-answering pipeline on CPU.

    Returns:
        ``(pipeline, param_count)`` where ``param_count`` is the total
        number of elements across the model's parameters.
    """
    # Imported lazily so the heavy dependency is only loaded when needed.
    from transformers import pipeline

    tqa = pipeline("table-question-answering", model=model_name, device=-1)
    param_count = sum(p.numel() for p in tqa.model.parameters())
    print(" -> Architecture: TAPAS (cell selection + aggregation)")
    print(f" -> Model class: {type(tqa.model).__name__}")
    print(f" -> Tokenizer: {type(tqa.tokenizer).__name__}")
    print(f" -> Model params: {param_count:,}")
    return tqa, param_count


def load_tapex(model_name: str) -> tuple[tuple[Any, Any], int]:
    """Load a TAPEX tokenizer and BART generation model.

    Returns:
        ``((tokenizer, model), param_count)`` where ``param_count`` is the
        total number of elements across the model's parameters.
    """
    # Imported lazily so the heavy dependency is only loaded when needed.
    from transformers import BartForConditionalGeneration, TapexTokenizer

    tokenizer = TapexTokenizer.from_pretrained(model_name)
    model = BartForConditionalGeneration.from_pretrained(model_name)
    param_count = sum(p.numel() for p in model.parameters())
    print(" -> Architecture: TAPEX (seq2seq text generation, BART-based)")
    print(f" -> Model class: {type(model).__name__}")
    print(f" -> Tokenizer: {type(tokenizer).__name__}")
    print(f" -> Model params: {param_count:,}")
    return (tokenizer, model), param_count


def run_tapas(tqa: Any, table: pd.DataFrame, query: str) -> dict[str, Any]:
    """Answer *query* over *table* with a TAPAS pipeline.

    Returns:
        A dict with keys ``answer``, ``cells`` and ``aggregator``; the
        latter two fall back to ``[]`` and ``"NONE"`` when the pipeline
        result does not contain them.
    """
    result = tqa(table=table, query=query)
    return {
        "answer": result["answer"],
        "cells": result.get("cells", []),
        "aggregator": result.get("aggregator", "NONE"),
    }


def run_tapex(tapex_pair: tuple[Any, Any], table: pd.DataFrame, query: str) -> dict[str, Any]:
    """Answer *query* over *table* with a TAPEX (tokenizer, model) pair.

    Returns:
        A dict shaped like the one from ``run_tapas``: ``answer`` is the
        first decoded sequence (or ``""`` if nothing was decoded),
        ``cells`` is always empty and ``aggregator`` is ``"seq2seq"``.
    """
    tokenizer, model = tapex_pair
    encoding = tokenizer(table=table, query=query, return_tensors="pt", truncation=True)
    print(f" -> Input token count: {encoding['input_ids'].shape[1]}")
    outputs = model.generate(**encoding, max_new_tokens=50)
    decoded = tokenizer.batch_decode(outputs, skip_special_tokens=True)
    answer = decoded[0] if decoded else ""
    return {
        "answer": answer,
        "cells": [],
        "aggregator": "seq2seq",
    }


def main() -> None:
    """Fetch the table, load the model, run all questions and log results.

    One JSON object per run is appended to ``RESULTS_FILE``. The process
    exits with an error message if the query returns no rows.
    """
    parser = argparse.ArgumentParser(description="Table QA experiment (TAPAS/TAPEX)")
    parser.add_argument("--model", default=DEFAULT_MODEL, help="HuggingFace model name")
    args = parser.parse_args()
    model_name = args.model
    use_tapex = is_tapex(model_name)

    run_results = {
        "timestamp": datetime.now().isoformat(),
        "model": model_name,
        "architecture": "tapex" if use_tapex else "tapas",
        "questions": [],
    }

    # --- Data fetching ---
    # Done before model loading so the step labels are in order and a
    # database problem surfaces before the slow model load.
    print("=" * 60)
    print("[STEP 1] Fetching data from PostgreSQL")
    print("=" * 60)
    table = fetch_table()
    if table.empty:
        sys.exit("No rows returned by the query; nothing to run Table QA on.")

    # --- Display table ---
    print("=" * 60)
    print(f"TABLE ({len(table)} most recent transactions)")
    print("=" * 60)
    print(table.to_string(index=False))
    print()

    # --- Model loading ---
    print("=" * 60)
    print("[STEP 2] Loading model")
    print("=" * 60)
    print(f" -> Model: {model_name}")
    print(" -> Device: CPU")
    print()

    t0 = time.perf_counter()
    if use_tapex:
        model_obj, param_count = load_tapex(model_name)
    else:
        model_obj, param_count = load_tapas(model_name)
    load_time = time.perf_counter() - t0
    print(f" -> Load time: {load_time:.2f}s")
    print()

    run_results["params"] = param_count
    run_results["load_time_s"] = round(load_time, 2)

    # --- Q&A ---
    run_fn = run_tapex if use_tapex else run_tapas
    print("=" * 60)
    print("[STEP 3] Running Table QA inference")
    print("=" * 60)

    for i, q in enumerate(QUESTIONS, 1):
        print(f"\n--- Question {i}/{len(QUESTIONS)} ---")
        print(f" -> Input query: {q!r}")
        print(f" -> Input table shape: {table.shape}")

        t0 = time.perf_counter()
        result = run_fn(model_obj, table, q)
        inference_time = time.perf_counter() - t0

        print(f" -> Answer: {result['answer']}")
        print(f" -> Cells: {result.get('cells', [])}")
        print(f" -> Aggregator: {result.get('aggregator', 'N/A')}")
        print(f" -> Inference time: {inference_time:.3f}s")

        run_results["questions"].append({
            "query": q,
            "answer": result["answer"],
            "cells": result.get("cells", []),
            "aggregator": result.get("aggregator", "N/A"),
            "inference_time_s": round(inference_time, 3),
        })

    print()

    # --- Save results ---
    RESULTS_FILE.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False can emit non-ASCII text, so pin the file encoding
    # instead of depending on the platform default.
    with open(RESULTS_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(run_results, ensure_ascii=False) + "\n")
    print(f"Results appended to: {RESULTS_FILE}")

    print("=" * 60)
    print("Experiment complete.")
    print("=" * 60)


if __name__ == "__main__":
    main()