US-aware Capture/Ledger + wire in the reasoning LLM agent (Qwen3-8B via vLLM/Modal) with router fallback
d103096 verified | """Choose how the agent's LLM runs — the 'both' strategy. | |
| One config switch selects the client behind the same Agent loop: | |
| * ``local`` — LlamaCppClient loads our GGUF and runs offline (🔌 Off the Grid). | |
| * ``openai`` — OpenAIToolClient calls an OpenAI-compatible endpoint (e.g. vLLM on a Modal GPU) serving the fine-tuned model. | |
| * ``router`` — a deterministic stand-in (no model) that routes a question to the | |
| right tools and composes a grounded answer. Lets the hosted free-CPU | |
| Space work with zero GPU, and is the always-on fallback. | |
| The tax math is deterministic and local in every mode — only the natural-language | |
| planning/explanation changes. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import unicodedata | |
| from typing import List, Optional | |
| from .llm import AssistantTurn, LLMClient, ToolCall | |
| def _norm(s: str) -> str: | |
| return "".join(c for c in unicodedata.normalize("NFD", s.lower()) | |
| if unicodedata.category(c) != "Mn") | |
| _PERIOD_RE = re.compile(r"\[(\d{4})-(\d{2})\]") | |
| _LANG_RE = re.compile(r"\[(en|es)\]") | |
| _COUNTRY_RE = re.compile(r"\[(us|mx)\]") | |
| _TAGS_RE = re.compile(r"\[(?:en|es|us|mx|\d{4}-\d{2})\]") | |
class RouterClient:
    """Deterministic planner+composer so the agent works without a model.

    Bilingual: the Ask tab tags the message with [en]/[es]; the router routes on
    English + Spanish keywords and composes the answer in the requested language.

    ``chat`` has two phases, chosen from the last message in the conversation:

    * last message is a tool result -> compose a text answer from the tool
      results of the current turn;
    * otherwise -> plan exactly one tool call from keywords in the latest user
      message.
    """

    def chat(self, messages: List[dict], tools: List[dict]) -> "AssistantTurn":
        """Return either a planned tool call or the composed final answer.

        ``tools`` is accepted for interface compatibility and is not consulted;
        routing is keyword based.
        """
        if messages and messages[-1].get("role") == "tool":
            return AssistantTurn(text=self._compose(messages))
        return AssistantTurn(tool_calls=self._plan(self._last_user_text(messages)))

    @staticmethod
    def _last_user_text(messages: List[dict]) -> str:
        """Return the content of the most recent user message, or ``""``."""
        for message in reversed(messages):
            if message.get("role") == "user":
                return message["content"]
        return ""

    # --- planning ---------------------------------------------------------
    def _period(self, text: str):
        """Return ``(year, month)`` from a ``[YYYY-MM]`` tag, else the demo default."""
        m = _PERIOD_RE.search(text)
        if m:
            return int(m.group(1)), int(m.group(2))
        return 2024, 5  # demo default

    def _lang(self, text: str) -> str:
        """Return the ``[en]``/``[es]`` tag value; defaults to ``"en"``."""
        m = _LANG_RE.search(text)
        return m.group(1) if m else "en"

    def _country(self, text: str) -> str:
        """Return the ``[us]``/``[mx]`` tag value; defaults to ``"mx"``."""
        m = _COUNTRY_RE.search(text)
        return m.group(1) if m else "mx"

    # Keyword lists are matched as plain substrings of the accent-stripped,
    # lowercased message, so stems such as "deduc" cover several word forms.
    _RULE_WORDS = ["deduc", "puedo", "requisit", "obligacion", "cuando declaro",
                   "plazo", "fecha limite", "regulacion", "ley ", "permitido",
                   "es legal", "tengo que", "debo declarar", "necesito", "debo cobrar",
                   "can i deduct", "deduct", "requirement", "obligation",
                   "when do i file", "when do i declare", "deadline", "regulation",
                   "allowed", "is it legal", "do i have to", "do i need", "need to",
                   "should i", "collect", "write off", "write-off"]
    # Explicit "compute my tax bill" intent (US), vs. an info/rule question.
    _US_COMPUTE_WORDS = ["how much", "owe", "estimate", "my tax", "se tax",
                         "self-employment", "self employment", "quarterly", "1040",
                         "what do i pay", "how much do i"]
    _STATEMENT_WORDS = ["utilidad", "ganancia", "perdida", "resultado", "rentab",
                        "profit", "income statement", "p&l", "earnings", "net profit"]
    _BALANCE_WORDS = ["balance", "activo", "pasivo", "capital",
                      "assets", "liabilit", "equity"]
    _SUMMARY_WORDS = ["resumen", "cuanto gane", "cuanto facture", "ingreso", "gasto",
                      "facture", "vendi", "summary", "how much did i", "income",
                      "expenses", "revenue", "sales"]
    _CLASSIFY_WORDS = ["clasific", "que cuenta", "categoriz", "classify",
                       "which account", "categorize", "what account"]

    def _plan(self, user: str) -> "List[ToolCall]":
        """Pick the single tool call that best matches the user's message.

        Branches are checked in priority order and the first match wins; when
        nothing matches, the question is sent to ``cite_regulation``.
        """
        q = _norm(user)
        year, month = self._period(user)
        country = self._country(user)
        question = _TAGS_RE.sub("", user).strip()
        ym = {"year": year, "month": month}

        def mentions(words) -> bool:
            return any(w in q for w in words)

        # A "how much"/"cuanto" question asks for a number, not a rule, even
        # if it also contains a rule keyword.
        is_rule = (mentions(self._RULE_WORDS)
                   and "cuanto" not in q and "how much" not in q)

        if country == "us":
            juris = {"query": question, "jurisdiction": "US"}
            if is_rule:
                return [ToolCall("cite_regulation", juris)]
            if mentions(self._US_COMPUTE_WORDS):
                return [ToolCall("us_tax_summary", {"year": year})]
            if mentions(self._STATEMENT_WORDS):
                return [ToolCall("income_statement", ym)]
            if mentions(self._BALANCE_WORDS):
                return [ToolCall("balance_sheet", {})]
            if mentions(self._SUMMARY_WORDS):
                return [ToolCall("income_statement", ym)]
            return [ToolCall("cite_regulation", juris)]

        # --- Mexico ---
        if is_rule:
            return [ToolCall("cite_regulation", {"query": question, "jurisdiction": "MX"})]
        if mentions(["regimen", "conviene", "resico", "regime", "suits", "which regime"]):
            return [ToolCall("compare_regimes", ym)]
        if "iva" in q or "vat" in q:
            return [ToolCall("compute_iva", ym)]
        if "isr" in q or "income tax" in q:
            return [ToolCall("compute_isr_resico", ym)]
        if mentions(self._STATEMENT_WORDS):
            return [ToolCall("income_statement", ym)]
        if mentions(self._BALANCE_WORDS):
            return [ToolCall("balance_sheet", {})]
        if mentions(self._SUMMARY_WORDS):
            return [ToolCall("month_summary", ym)]
        if mentions(self._CLASSIFY_WORDS):
            return [ToolCall("classify_transaction", {"description": question})]
        return [ToolCall("cite_regulation", {"query": question, "jurisdiction": "MX"})]

    # --- composing --------------------------------------------------------
    def _recent_tool_results(self, messages: List[dict]):
        """Collect ``(tool_name, result_dict)`` pairs for the current turn.

        Walks backwards from the end of the conversation and stops at the
        assistant message that issued the tool calls. Results are returned in
        conversation order. Tool messages whose content is missing, is not
        valid JSON, or does not decode to a JSON object are skipped, because
        ``_format`` can only render dict results.
        """
        out = []
        for m in reversed(messages):
            role = m.get("role")
            if role == "tool":
                try:
                    result = json.loads(m["content"])
                except (KeyError, TypeError, ValueError):
                    continue  # best effort: ignore unreadable tool output
                if isinstance(result, dict):
                    out.append((m.get("name", ""), result))
            elif role == "assistant" and m.get("tool_calls"):
                break
        out.reverse()
        return out

    def _compose(self, messages: List[dict]) -> str:
        """Render the current turn's tool results as one answer plus disclaimer."""
        lang = self._lang(self._last_user_text(messages))
        disclaimer = ("\n\n_Educational assistant — confirm with your accountant (CPA)._"
                      if lang == "en" else
                      "\n\n_Asistente educativo — confirma con tu contador (CPA)._")
        empty = "No data to answer that." if lang == "en" else "No encontré datos para responder."
        parts = [self._format(name, r, lang) for name, r in self._recent_tool_results(messages)]
        text = "\n".join(p for p in parts if p) or empty
        return text + disclaimer

    @staticmethod
    def _money(v):
        """Format *v* as ``$1,234.56``; fall back to ``str(v)`` if not numeric.

        Declared static because it takes no ``self`` but is always called as
        ``self._money(value)``.
        """
        try:
            return f"${float(v):,.2f}"
        except (TypeError, ValueError):
            return str(v)

    def _format(self, name: str, r: dict, lang: str = "en") -> str:
        """Turn one tool result into a single display line.

        Args:
            name: Tool name that produced ``r``.
            r: Decoded tool result.
            lang: ``"en"`` for English; any other value selects Spanish.

        Returns:
            The formatted line, or ``""`` for an unrecognised tool name.
        """
        en = lang == "en"
        if name == "cite_regulation":
            citations = r.get("citations") or []
            # A "grounded" result with no citations has nothing to quote, so
            # treat it like an ungrounded one instead of indexing an empty list.
            if r.get("grounded") and citations:
                cites = ", ".join(dict.fromkeys(  # unique, order-preserving
                    c["source"] for c in citations[:3]))
                top = citations[0].get("excerpt") or ""
                lead = "Per" if en else "Según"
                return f"📚 {lead} {cites}:\n{top[:260].rstrip()}…"
            fallback = "No source for that." if en else "No encontré una fuente para eso."
            return "⚠️ " + r.get("message", fallback)
        if name == "compare_regimes":
            if en:
                return (f"🧾 Recommended regime: **{r['recommended']}** — "
                        f"RESICO {self._money(r['resico_isr'])} vs General "
                        f"{self._money(r['general_isr'])} (saves {self._money(r['monthly_savings'])}).")
            return (f"🧾 Régimen recomendado: **{r['recommended']}** — "
                    f"RESICO {self._money(r['resico_isr'])} vs General "
                    f"{self._money(r['general_isr'])} (ahorro {self._money(r['monthly_savings'])}).")
        if name == "compute_iva":
            label = "VAT (IVA) for the month" if en else r.get("label", "IVA")
            return f"💧 {label}: **{self._money(r['amount'])}**."
        if name == "compute_isr_resico":
            if en:
                return f"📊 Income tax (RESICO): **{self._money(r['amount'])}** (income {self._money(r.get('income'))})."
            return f"📊 ISR RESICO: **{self._money(r['amount'])}** (ingresos {self._money(r.get('income'))})."
        if name == "income_statement":
            if en:
                return (f"📈 {r['period']}: revenue {self._money(r['revenue'])} − expenses "
                        f"{self._money(r['expenses'])} = net profit **{self._money(r['net_profit'])}**.")
            return (f"📈 {r['period']}: ingresos {self._money(r['revenue'])} − gastos "
                    f"{self._money(r['expenses'])} = utilidad **{self._money(r['net_profit'])}**.")
        if name == "balance_sheet":
            if en:
                return (f"⚖️ Assets {self._money(r['assets'])} = liabilities {self._money(r['liabilities'])} "
                        f"+ equity {self._money(r['equity'])}.")
            return (f"⚖️ Activos {self._money(r['assets'])} = pasivos {self._money(r['liabilities'])} "
                    f"+ capital {self._money(r['equity'])}.")
        if name == "month_summary":
            if en:
                return (f"🗂️ Income {self._money(r['income'])}, deductible expenses "
                        f"{self._money(r['deductible_expenses'])}, VAT collected "
                        f"{self._money(r['iva_trasladado'])}, VAT paid {self._money(r['iva_acreditable'])}.")
            return (f"🗂️ Ingresos {self._money(r['income'])}, gastos deducibles "
                    f"{self._money(r['deductible_expenses'])}, IVA cobrado "
                    f"{self._money(r['iva_trasladado'])}, IVA pagado {self._money(r['iva_acreditable'])}.")
        if name == "classify_transaction":
            if en:
                ded = "deductible" if r.get("deducible") else "non-deductible"
                return f"🏷️ Classified as **{r['cuenta']}** ({r['sat_code']}) — {ded}."
            ded = "deducible" if r.get("deducible") else "no deducible"
            return f"🏷️ Se clasifica como **{r['cuenta']}** ({r['sat_code']}) — {ded}."
        if name in ("us_tax_summary", "us_tax_estimate"):
            def amt(key):
                # A field may be a bare number or a dict carrying "amount".
                v = r.get(key, {})
                return self._money(v.get("amount") if isinstance(v, dict) else v)
            yr = f" {r['year']}" if r.get("year") else ""
            if en:
                return (f"🇺🇸 US self-employed estimate{yr}: net profit {amt('net_profit')}, "
                        f"self-employment tax {amt('self_employment_tax')}, federal income tax "
                        f"{amt('federal_income_tax')} (taxable income {self._money(r.get('taxable_income'))}), "
                        f"**total ~{self._money(r.get('total_annual_tax'))}/yr**. "
                        f"Quarterly estimate {amt('quarterly_estimated_tax')}.")
            return (f"🇺🇸 Estimación EE. UU.{yr}: utilidad neta {amt('net_profit')}, "
                    f"impuesto de autoempleo {amt('self_employment_tax')}, impuesto federal "
                    f"{amt('federal_income_tax')} (base gravable {self._money(r.get('taxable_income'))}), "
                    f"**total ~{self._money(r.get('total_annual_tax'))}/año**. "
                    f"Pago trimestral {amt('quarterly_estimated_tax')}.")
        return ""
class OpenAIToolClient:
    """Talks to an OpenAI-compatible endpoint (our vLLM-served reasoning model).

    This is the real agent brain: it does native function-calling, so the Agent loop
    works unchanged. Stdlib-only (urllib) so the Space needs no extra dependency.
    """

    def __init__(self, base_url: str, model: str = "pa-agent",
                 api_key: str = "EMPTY", timeout: float = 180.0):
        """Store connection settings.

        Args:
            base_url: API root; ``/chat/completions`` is appended to it.
                Trailing slashes are stripped.
            model: Model name sent in each request.
            api_key: Sent as an ``Authorization: Bearer`` header.
            timeout: Timeout in seconds passed to ``urlopen``.
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.api_key = api_key
        self.timeout = timeout

    def _payload(self, messages: List[dict], tools: List[dict]) -> dict:
        """Build the JSON-serialisable request body for one chat completion."""
        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.2,
            "max_tokens": 1200,
        }
        # Only advertise tools when there are some: an empty ``tools`` array,
        # or ``tool_choice`` without ``tools``, is rejected by OpenAI-style APIs.
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"
        return payload

    @staticmethod
    def _parse_arguments(raw) -> dict:
        """Decode a tool call's ``arguments`` field into a dict.

        The field may be a JSON string or an already-decoded object. Anything
        missing, malformed, or not a JSON object becomes ``{}``.
        """
        args = raw or "{}"
        if isinstance(args, str):
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}
        return args if isinstance(args, dict) else {}

    def chat(self, messages: List[dict], tools: List[dict]) -> "AssistantTurn":
        """POST the conversation and return the model's text and tool calls.

        Network and HTTP errors from ``urllib`` are not caught here, nor are
        errors from a response that lacks ``choices[0].message``.
        """
        import urllib.request

        body = json.dumps(self._payload(messages, tools)).encode()
        req = urllib.request.Request(
            f"{self.base_url}/chat/completions", data=body,
            headers={"Content-Type": "application/json",
                     "Authorization": f"Bearer {self.api_key}"})
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            data = json.loads(resp.read())
        msg = data["choices"][0]["message"]
        calls = []
        for tc in msg.get("tool_calls") or []:
            fn = tc.get("function") or {}
            calls.append(ToolCall(name=fn.get("name", ""),
                                  arguments=self._parse_arguments(fn.get("arguments")),
                                  id=tc.get("id", "")))
        return AssistantTurn(text=msg.get("content"), tool_calls=calls)
def get_client(mode: Optional[str] = None) -> "LLMClient":
    """Return the configured LLM client. Defaults to the deterministic router.

    PA_LLM_MODE:
      "openai" + PA_LLM_ENDPOINT → vLLM-served reasoning model (the real agent)
      "local"                    → llama.cpp + our GGUF (off-grid)
      "router" (default)         → deterministic fallback, no model

    Args:
        mode: Overrides ``PA_LLM_MODE`` when given. Matching ignores case and
            surrounding whitespace.

    Optional environment for "openai" mode: ``PA_LLM_MODEL`` (default
    "pa-agent"), ``PA_LLM_API_KEY`` (default "EMPTY") and ``PA_LLM_TIMEOUT`` in
    seconds (default 180; an unparsable value falls back to the default).

    "openai" without a non-empty ``PA_LLM_ENDPOINT``, and any unrecognised
    mode, fall back to the router.
    """
    import os

    # Normalise so values such as "OpenAI" or " local " do not silently
    # select the router.
    mode = (mode or os.environ.get("PA_LLM_MODE", "router")).strip().lower()
    if mode == "openai":
        endpoint = os.environ.get("PA_LLM_ENDPOINT", "").strip()
        if endpoint:
            try:
                timeout = float(os.environ.get("PA_LLM_TIMEOUT", "180"))
            except ValueError:
                timeout = 180.0  # bad config value: keep the default
            return OpenAIToolClient(
                endpoint,
                model=os.environ.get("PA_LLM_MODEL", "pa-agent"),
                api_key=os.environ.get("PA_LLM_API_KEY") or "EMPTY",
                timeout=timeout)
    if mode == "local":
        # Imported lazily so the other modes do not need these packages.
        from .. import config
        from .llm import LlamaCppClient
        from huggingface_hub import hf_hub_download
        path = hf_hub_download(config.GGUF_REPO, config.MODEL_GGUF_FILE)
        return LlamaCppClient(path)
    return RouterClient()