# guide/src/agent/tools.py
# Last commit 31ce85c (Saravanakumar R):
#   Fix rate-limit storm and split draft from escalation generation
"""
CMA tool definitions — passed to the Anthropic API as the tools= parameter.

Tools:
    classify_domain   → calls DomainClassifier → returns DomainResult
    extract_entities  → calls EvidenceNER → returns list[Entity]
    process_document  → calls DocumentProcessor → returns {raw_text, entities}
    draft_complaint   → handled internally by Claude (returns {"status": "proceed"}
                        plus an echo of the submitted complaint_context)
    recommend_action  → calls NextActionPredictor → returns list[EscalationAction]
    store_memory      → calls SessionMemory.set
    get_memory        → calls SessionMemory.get

Each tool is defined as an Anthropic ToolParam dict (name, description, input_schema).
execute_tool() is the central dispatcher; all exceptions are caught and returned as
{"error": "<message>"} so the agent loop never crashes on tool failure.
"""
from __future__ import annotations
import dataclasses
import logging
from src.agent.memory import SessionMemory
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Tool JSON Schema definitions (Anthropic ToolParam format)
# ---------------------------------------------------------------------------
# Each tool schema lives in its own private constant so it can be read in
# isolation; TOOL_DEFINITIONS at the bottom assembles them in the order in
# which they are sent to the API.

# Domain classification — the description instructs the model to call this first.
_CLASSIFY_DOMAIN_TOOL: dict = {
    "name": "classify_domain",
    "description": (
        "Classify a consumer complaint into one of six domains: "
        "ecommerce, telecom, banking, cibil, insurance, or general. "
        "MUST be the very first tool called on every new complaint thread. "
        "The result includes a 'low_confidence' boolean field. "
        "When low_confidence is true (model confidence < 0.50, or keyword "
        "fallback was used), do NOT proceed with the suggested domain — "
        "instead ask the user one clarifying question to confirm the domain "
        "before continuing."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "complaint_text": {
                "type": "string",
                "description": (
                    "The complaint text to classify. "
                    "PII is already redacted — pass it as-is."
                ),
            },
        },
        "required": ["complaint_text"],
    },
}

# Evidence entity extraction from free text.
_EXTRACT_ENTITIES_TOOL: dict = {
    "name": "extract_entities",
    "description": (
        "Extract named evidence entities (ORG, AMOUNT, DATE, REF_ID, ACCOUNT, PERSON) "
        "from complaint text using the EvidenceNER model. "
        "Call this on the user's initial message right after classify_domain() "
        "to pre-fill as many required fields as possible."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "text": {
                "type": "string",
                "description": "The text to extract entities from.",
            },
        },
        "required": ["text"],
    },
}

# Uploaded-document pipeline (OCR + entity extraction).
_PROCESS_DOCUMENT_TOOL: dict = {
    "name": "process_document",
    "description": (
        "Process an uploaded document (PDF or image) through the full pipeline: "
        "Tesseract OCR → EvidenceNER → DocumentViT. "
        "Returns raw extracted text and a list of evidence entity spans. "
        "MUST be called before draft_complaint() when the user has uploaded a document."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "file_path": {
                "type": "string",
                "description": (
                    "Absolute path to the uploaded file on the server. "
                    "This path is provided in the [Document uploaded: <path>] "
                    "prefix that appears in the user's message."
                ),
            },
        },
        "required": ["file_path"],
    },
}

# Drafting gate: the tool result is only a "proceed" signal.
# NOTE(review): the description names six required fields, while the schema's
# "required" list enforces four (amount, reference_id and prior_contact are
# optional here) — confirm whether that difference is intentional.
_DRAFT_COMPLAINT_TOOL: dict = {
    "name": "draft_complaint",
    "description": (
        "Signal that all preconditions for drafting have been met. "
        "Returns {\"status\": \"proceed\"} which confirms you may now generate "
        "the formal complaint letter as your next text response. "
        "ONLY call this after receiving a [USER CONFIRMED] message AND "
        "after all six required fields (provider, date, amount, reference ID, "
        "prior_contact, desired_resolution) are confirmed."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "complaint_context": {
                "type": "object",
                "description": "All collected and user-confirmed complaint details.",
                "properties": {
                    "domain": {"type": "string"},
                    "provider": {"type": "string"},
                    "incident_date": {"type": "string"},
                    "amount": {"type": "string"},
                    "reference_id": {"type": "string"},
                    "prior_contact": {"type": "boolean"},
                    "desired_resolution": {"type": "string"},
                    "additional_entities": {"type": "object"},
                },
                "required": [
                    "domain",
                    "provider",
                    "incident_date",
                    "desired_resolution",
                ],
            },
        },
        "required": ["complaint_context"],
    },
}

# Escalation recommendations for a classified domain.
_RECOMMEND_ACTION_TOOL: dict = {
    "name": "recommend_action",
    "description": (
        "Get a ranked list of escalation authorities for the complaint domain. "
        "Always call this immediately after presenting the draft letter. "
        "Returns authorities in recommended order with portal URLs and submission guidance."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "domain": {
                "type": "string",
                "enum": [
                    "ecommerce",
                    "telecom",
                    "banking",
                    "cibil",
                    "insurance",
                    "general",
                ],
                "description": "The classified complaint domain.",
            },
            "entities": {
                "type": "object",
                "description": (
                    "Confirmed entity dict, e.g. "
                    "{\"ORG\": \"HDFC Bank\", \"AMOUNT\": \"₹5000\"}."
                ),
            },
            "prior_contact": {
                "type": "boolean",
                "description": (
                    "True if the user has already contacted the company. "
                    "Affects whether company support appears as the first step."
                ),
            },
        },
        "required": ["domain"],
    },
}

# Session memory write. "value" carries no "type" key, so any JSON value is accepted.
_STORE_MEMORY_TOOL: dict = {
    "name": "store_memory",
    "description": (
        "Store a key-value pair in the session memory for this complaint. "
        "Use to persist domain, extracted entities, prior_contact, draft text, "
        "and any other state that must survive across conversation turns."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "key": {
                "type": "string",
                "description": (
                    "Memory key. Suggested keys: 'domain', 'entities', "
                    "'document_entities', 'provider', 'incident_date', "
                    "'amount', 'reference_id', 'prior_contact', "
                    "'desired_resolution', 'draft'."
                ),
            },
            "value": {
                "description": "Value to store — any JSON-serialisable type.",
            },
        },
        "required": ["key", "value"],
    },
}

# Session memory read.
_GET_MEMORY_TOOL: dict = {
    "name": "get_memory",
    "description": (
        "Retrieve a previously stored value from session memory. "
        "Use instead of asking the user to repeat information."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "key": {
                "type": "string",
                "description": "The memory key to retrieve.",
            },
        },
        "required": ["key"],
    },
}

TOOL_DEFINITIONS: list[dict] = [
    _CLASSIFY_DOMAIN_TOOL,
    _EXTRACT_ENTITIES_TOOL,
    _PROCESS_DOCUMENT_TOOL,
    _DRAFT_COMPLAINT_TOOL,
    _RECOMMEND_ACTION_TOOL,
    _STORE_MEMORY_TOOL,
    # cache_control on the last tool caches the entire tools list for 5 min.
    {**_GET_MEMORY_TOOL, "cache_control": {"type": "ephemeral"}},
]
# ---------------------------------------------------------------------------
# Tool dispatcher
# ---------------------------------------------------------------------------
def execute_tool(name: str, tool_input: dict, memory: SessionMemory) -> dict | list:
    """Run the tool called *name* and always hand back a JSON-serialisable result.

    This is the single error boundary between the agent loop and the tool
    implementations: whatever _dispatch() raises is logged with its traceback
    and converted into an error payload, so the agent can relay the failure to
    the user or retry instead of crashing.

    Args:
        name: Tool name as requested by the model.
        tool_input: Arguments supplied for the tool call.
        memory: Session memory used by the store_memory / get_memory tools.

    Returns:
        The tool's own result on success, or
        {"error": "<ExceptionType>: <message>"} when the tool raised.
    """
    try:
        outcome = _dispatch(name, tool_input, memory)
    except Exception as exc:
        # Deliberately broad: a tool failure must never propagate to the caller.
        exc_kind = type(exc).__name__
        logger.exception("Tool %r raised %s: %s", name, exc_kind, exc)
        return {"error": f"{exc_kind}: {exc}"}
    else:
        return outcome
def _dispatch(name: str, tool_input: dict, memory: SessionMemory) -> dict | list:
    """Route a single tool call to its implementation.

    Nothing is caught here: a missing input key or a backend failure propagates
    to the caller (execute_tool() wraps this function in try/except). Backends
    are imported inside their own branch, so a backend module is only imported
    when its tool is actually invoked.

    Returns:
        The tool result as plain dicts/lists, or {"error": ...} when *name*
        does not match any known tool.
    """
    if name == "classify_domain":
        from src.classifier.predict import classify

        domain_result = classify(tool_input["complaint_text"])
        return dataclasses.asdict(domain_result)

    if name == "extract_entities":
        from src.ner.predict import extract_entities

        found = extract_entities(tool_input["text"])
        return [dataclasses.asdict(entity) for entity in found]

    if name == "process_document":
        from src.document_processor.processor import get_processor

        # NOTE(review): file_path arrives via the model's tool input and is
        # handed to the processor unchanged — confirm the processor (or the
        # upload layer) restricts it to the upload directory.
        processed = get_processor().process(tool_input["file_path"])
        return {
            "raw_text": processed["raw_text"],
            "entities": [dataclasses.asdict(span) for span in processed["entities"]],
        }

    if name == "draft_complaint":
        # Handled internally by Claude (Rule 6): no letter is produced here.
        # The tool result only signals "proceed" and echoes the submitted
        # context; Claude writes the actual letter as text in the assistant
        # turn that follows.
        submitted_context = tool_input.get("complaint_context", {})
        return {"status": "proceed", "complaint_context": submitted_context}

    if name == "recommend_action":
        from src.next_action.predict import recommend_action

        ranked_actions = recommend_action(
            domain=tool_input["domain"],
            # A missing or null "entities" value is normalised to an empty dict.
            entities=tool_input.get("entities") or {},
            prior_contact=bool(tool_input.get("prior_contact", False)),
        )
        return [dataclasses.asdict(action) for action in ranked_actions]

    if name == "store_memory":
        memory_key = tool_input["key"]
        memory.set(memory_key, tool_input["value"])
        return {"status": "stored", "key": memory_key}

    if name == "get_memory":
        memory_key = tool_input["key"]
        return {"key": memory_key, "value": memory.get(memory_key)}

    # Reported as data rather than raised, matching the error payload shape.
    return {"error": f"Unknown tool: {name!r}"}
# ---------------------------------------------------------------------------
# Backward-compat helper (used by older stubs that called build_tool_handlers)
# ---------------------------------------------------------------------------
def build_tool_handlers(memory: SessionMemory) -> dict:
    """Build a {tool name: handler} mapping with every handler bound to *memory*.

    Each handler takes the tool input as its only argument and forwards it to
    execute_tool() together with its own tool name, so a handler returns the
    same result (including error payloads) as a direct execute_tool() call.
    """

    def _bind(tool_name: str):
        # One factory call per tool pins tool_name for that handler; creating
        # the lambdas directly inside the loop would make them all share the
        # loop variable's final value.
        return lambda inp: execute_tool(tool_name, inp, memory)

    handlers = {}
    for definition in TOOL_DEFINITIONS:
        tool_name = definition["name"]
        handlers[tool_name] = _bind(tool_name)
    return handlers