Spaces:
Sleeping
Sleeping
File size: 12,005 Bytes
cbb1b1a dc0c45b cbb1b1a dc0c45b cbb1b1a dc0c45b cbb1b1a dc0c45b cbb1b1a dc0c45b 06254f4 dc0c45b 31ce85c dc0c45b cbb1b1a dc0c45b cbb1b1a dc0c45b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 | """
CMA tool definitions β passed to the Anthropic API as the tools= parameter.
Tools:
classify_domain β calls DomainClassifier β returns DomainResult
extract_entities β calls EvidenceNER β returns list[Entity]
process_document β calls DocumentProcessor β returns {raw_text, entities}
draft_complaint β handled internally by Claude (returns {"status": "proceed"})
recommend_action β calls NextActionPredictor β returns list[EscalationAction]
store_memory β calls SessionMemory.set
get_memory β calls SessionMemory.get
Each tool is defined as an Anthropic ToolParam dict (name, description, input_schema).
execute_tool() is the central dispatcher; all exceptions are caught and returned as
{"error": "<message>"} so the agent loop never crashes on tool failure.
"""
from __future__ import annotations
import dataclasses
import logging
from src.agent.memory import SessionMemory
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Tool JSON Schema definitions (Anthropic ToolParam format)
# ---------------------------------------------------------------------------
TOOL_DEFINITIONS: list[dict] = [
{
"name": "classify_domain",
"description": (
"Classify a consumer complaint into one of six domains: "
"ecommerce, telecom, banking, cibil, insurance, or general. "
"MUST be the very first tool called on every new complaint thread. "
"The result includes a 'low_confidence' boolean field. "
"When low_confidence is true (model confidence < 0.50, or keyword "
"fallback was used), do NOT proceed with the suggested domain β "
"instead ask the user one clarifying question to confirm the domain "
"before continuing."
),
"input_schema": {
"type": "object",
"properties": {
"complaint_text": {
"type": "string",
"description": (
"The complaint text to classify. "
"PII is already redacted β pass it as-is."
),
},
},
"required": ["complaint_text"],
},
},
{
"name": "extract_entities",
"description": (
"Extract named evidence entities (ORG, AMOUNT, DATE, REF_ID, ACCOUNT, PERSON) "
"from complaint text using the EvidenceNER model. "
"Call this on the user's initial message right after classify_domain() "
"to pre-fill as many required fields as possible."
),
"input_schema": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "The text to extract entities from.",
},
},
"required": ["text"],
},
},
{
"name": "process_document",
"description": (
"Process an uploaded document (PDF or image) through the full pipeline: "
"Tesseract OCR β EvidenceNER β DocumentViT. "
"Returns raw extracted text and a list of evidence entity spans. "
"MUST be called before draft_complaint() when the user has uploaded a document."
),
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": (
"Absolute path to the uploaded file on the server. "
"This path is provided in the [Document uploaded: <path>] "
"prefix that appears in the user's message."
),
},
},
"required": ["file_path"],
},
},
{
"name": "draft_complaint",
"description": (
"Signal that all preconditions for drafting have been met. "
"Returns {\"status\": \"proceed\"} which confirms you may now generate "
"the formal complaint letter as your next text response. "
"ONLY call this after receiving a [USER CONFIRMED] message AND "
"after all six required fields (provider, date, amount, reference ID, "
"prior_contact, desired_resolution) are confirmed."
),
"input_schema": {
"type": "object",
"properties": {
"complaint_context": {
"type": "object",
"description": "All collected and user-confirmed complaint details.",
"properties": {
"domain": {"type": "string"},
"provider": {"type": "string"},
"incident_date": {"type": "string"},
"amount": {"type": "string"},
"reference_id": {"type": "string"},
"prior_contact": {"type": "boolean"},
"desired_resolution": {"type": "string"},
"additional_entities": {"type": "object"},
},
"required": [
"domain", "provider", "incident_date", "desired_resolution"
],
},
},
"required": ["complaint_context"],
},
},
{
"name": "recommend_action",
"description": (
"Get a ranked list of escalation authorities for the complaint domain. "
"Always call this immediately after presenting the draft letter. "
"Returns authorities in recommended order with portal URLs and submission guidance."
),
"input_schema": {
"type": "object",
"properties": {
"domain": {
"type": "string",
"enum": [
"ecommerce", "telecom", "banking",
"cibil", "insurance", "general"
],
"description": "The classified complaint domain.",
},
"entities": {
"type": "object",
"description": (
"Confirmed entity dict, e.g. "
"{\"ORG\": \"HDFC Bank\", \"AMOUNT\": \"βΉ5000\"}."
),
},
"prior_contact": {
"type": "boolean",
"description": (
"True if the user has already contacted the company. "
"Affects whether company support appears as the first step."
),
},
},
"required": ["domain"],
},
},
{
"name": "store_memory",
"description": (
"Store a key-value pair in the session memory for this complaint. "
"Use to persist domain, extracted entities, prior_contact, draft text, "
"and any other state that must survive across conversation turns."
),
"input_schema": {
"type": "object",
"properties": {
"key": {
"type": "string",
"description": (
"Memory key. Suggested keys: 'domain', 'entities', "
"'document_entities', 'provider', 'incident_date', "
"'amount', 'reference_id', 'prior_contact', "
"'desired_resolution', 'draft'."
),
},
"value": {
"description": "Value to store β any JSON-serialisable type.",
},
},
"required": ["key", "value"],
},
},
{
"name": "get_memory",
"description": (
"Retrieve a previously stored value from session memory. "
"Use instead of asking the user to repeat information."
),
"input_schema": {
"type": "object",
"properties": {
"key": {
"type": "string",
"description": "The memory key to retrieve.",
},
},
"required": ["key"],
},
# cache_control on the last tool caches the entire tools list for 5 min.
"cache_control": {"type": "ephemeral"},
},
]
# ---------------------------------------------------------------------------
# Tool dispatcher
# ---------------------------------------------------------------------------
def execute_tool(name: str, tool_input: dict, memory: SessionMemory) -> dict | list:
"""
Dispatch *name* with *tool_input* and return a JSON-serialisable result.
All exceptions are caught; callers receive {"error": "<message>"} on any failure.
This ensures the agent loop never crashes on a tool error β the agent receives
the error description and can report it to the user or retry.
"""
try:
return _dispatch(name, tool_input, memory)
except Exception as exc:
logger.exception("Tool %r raised %s: %s", name, type(exc).__name__, exc)
return {"error": f"{type(exc).__name__}: {exc}"}
def _dispatch(name: str, tool_input: dict, memory: SessionMemory) -> dict | list:
"""Inner dispatcher β may raise; execute_tool() wraps it in try/except."""
if name == "classify_domain":
from src.classifier.predict import classify
result = classify(tool_input["complaint_text"])
return dataclasses.asdict(result)
if name == "extract_entities":
from src.ner.predict import extract_entities
entities = extract_entities(tool_input["text"])
return [dataclasses.asdict(e) for e in entities]
if name == "process_document":
from src.document_processor.processor import get_processor
result = get_processor().process(tool_input["file_path"])
return {
"raw_text": result["raw_text"],
"entities": [dataclasses.asdict(e) for e in result["entities"]],
}
if name == "draft_complaint":
# draft_complaint is handled internally by Claude (Rule 6).
# The Python side simply confirms that preconditions passed and returns
# a "proceed" signal. Claude generates the actual letter as text in
# the assistant turn that follows.
return {
"status": "proceed",
"complaint_context": tool_input.get("complaint_context", {}),
}
if name == "recommend_action":
from src.next_action.predict import recommend_action
actions = recommend_action(
domain=tool_input["domain"],
entities=tool_input.get("entities") or {},
prior_contact=bool(tool_input.get("prior_contact", False)),
)
return [dataclasses.asdict(a) for a in actions]
if name == "store_memory":
memory.set(tool_input["key"], tool_input["value"])
return {"status": "stored", "key": tool_input["key"]}
if name == "get_memory":
value = memory.get(tool_input["key"])
return {"key": tool_input["key"], "value": value}
return {"error": f"Unknown tool: {name!r}"}
# ---------------------------------------------------------------------------
# Backward-compat helper (used by older stubs that called build_tool_handlers)
# ---------------------------------------------------------------------------
def build_tool_handlers(memory: SessionMemory) -> dict:
"""Return a nameβcallable mapping, each bound to *memory*."""
def _make(n: str):
return lambda inp: execute_tool(n, inp, memory)
return {t["name"]: _make(t["name"]) for t in TOOL_DEFINITIONS}
|