""" CMA tool definitions — passed to the Anthropic API as the tools= parameter. Tools: classify_domain → calls DomainClassifier → returns DomainResult extract_entities → calls EvidenceNER → returns list[Entity] process_document → calls DocumentProcessor → returns {raw_text, entities} draft_complaint → handled internally by Claude (returns {"status": "proceed"}) recommend_action → calls NextActionPredictor → returns list[EscalationAction] store_memory → calls SessionMemory.set get_memory → calls SessionMemory.get Each tool is defined as an Anthropic ToolParam dict (name, description, input_schema). execute_tool() is the central dispatcher; all exceptions are caught and returned as {"error": ""} so the agent loop never crashes on tool failure. """ from __future__ import annotations import dataclasses import logging from src.agent.memory import SessionMemory logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tool JSON Schema definitions (Anthropic ToolParam format) # --------------------------------------------------------------------------- TOOL_DEFINITIONS: list[dict] = [ { "name": "classify_domain", "description": ( "Classify a consumer complaint into one of six domains: " "ecommerce, telecom, banking, cibil, insurance, or general. " "MUST be the very first tool called on every new complaint thread. " "The result includes a 'low_confidence' boolean field. " "When low_confidence is true (model confidence < 0.50, or keyword " "fallback was used), do NOT proceed with the suggested domain — " "instead ask the user one clarifying question to confirm the domain " "before continuing." ), "input_schema": { "type": "object", "properties": { "complaint_text": { "type": "string", "description": ( "The complaint text to classify. " "PII is already redacted — pass it as-is." ), }, }, "required": ["complaint_text"], }, }, { "name": "extract_entities", "description": ( "Extract named evidence entities (ORG, AMOUNT, DATE, REF_ID, ACCOUNT, PERSON) " "from complaint text using the EvidenceNER model. " "Call this on the user's initial message right after classify_domain() " "to pre-fill as many required fields as possible." ), "input_schema": { "type": "object", "properties": { "text": { "type": "string", "description": "The text to extract entities from.", }, }, "required": ["text"], }, }, { "name": "process_document", "description": ( "Process an uploaded document (PDF or image) through the full pipeline: " "Tesseract OCR → EvidenceNER → DocumentViT. " "Returns raw extracted text and a list of evidence entity spans. " "MUST be called before draft_complaint() when the user has uploaded a document." ), "input_schema": { "type": "object", "properties": { "file_path": { "type": "string", "description": ( "Absolute path to the uploaded file on the server. " "This path is provided in the [Document uploaded: ] " "prefix that appears in the user's message." ), }, }, "required": ["file_path"], }, }, { "name": "draft_complaint", "description": ( "Signal that all preconditions for drafting have been met. " "Returns {\"status\": \"proceed\"} which confirms you may now generate " "the formal complaint letter as your next text response. " "ONLY call this after receiving a [USER CONFIRMED] message AND " "after all six required fields (provider, date, amount, reference ID, " "prior_contact, desired_resolution) are confirmed." ), "input_schema": { "type": "object", "properties": { "complaint_context": { "type": "object", "description": "All collected and user-confirmed complaint details.", "properties": { "domain": {"type": "string"}, "provider": {"type": "string"}, "incident_date": {"type": "string"}, "amount": {"type": "string"}, "reference_id": {"type": "string"}, "prior_contact": {"type": "boolean"}, "desired_resolution": {"type": "string"}, "additional_entities": {"type": "object"}, }, "required": [ "domain", "provider", "incident_date", "desired_resolution" ], }, }, "required": ["complaint_context"], }, }, { "name": "recommend_action", "description": ( "Get a ranked list of escalation authorities for the complaint domain. " "Always call this immediately after presenting the draft letter. " "Returns authorities in recommended order with portal URLs and submission guidance." ), "input_schema": { "type": "object", "properties": { "domain": { "type": "string", "enum": [ "ecommerce", "telecom", "banking", "cibil", "insurance", "general" ], "description": "The classified complaint domain.", }, "entities": { "type": "object", "description": ( "Confirmed entity dict, e.g. " "{\"ORG\": \"HDFC Bank\", \"AMOUNT\": \"₹5000\"}." ), }, "prior_contact": { "type": "boolean", "description": ( "True if the user has already contacted the company. " "Affects whether company support appears as the first step." ), }, }, "required": ["domain"], }, }, { "name": "store_memory", "description": ( "Store a key-value pair in the session memory for this complaint. " "Use to persist domain, extracted entities, prior_contact, draft text, " "and any other state that must survive across conversation turns." ), "input_schema": { "type": "object", "properties": { "key": { "type": "string", "description": ( "Memory key. Suggested keys: 'domain', 'entities', " "'document_entities', 'provider', 'incident_date', " "'amount', 'reference_id', 'prior_contact', " "'desired_resolution', 'draft'." ), }, "value": { "description": "Value to store — any JSON-serialisable type.", }, }, "required": ["key", "value"], }, }, { "name": "get_memory", "description": ( "Retrieve a previously stored value from session memory. " "Use instead of asking the user to repeat information." ), "input_schema": { "type": "object", "properties": { "key": { "type": "string", "description": "The memory key to retrieve.", }, }, "required": ["key"], }, # cache_control on the last tool caches the entire tools list for 5 min. "cache_control": {"type": "ephemeral"}, }, ] # --------------------------------------------------------------------------- # Tool dispatcher # --------------------------------------------------------------------------- def execute_tool(name: str, tool_input: dict, memory: SessionMemory) -> dict | list: """ Dispatch *name* with *tool_input* and return a JSON-serialisable result. All exceptions are caught; callers receive {"error": ""} on any failure. This ensures the agent loop never crashes on a tool error — the agent receives the error description and can report it to the user or retry. """ try: return _dispatch(name, tool_input, memory) except Exception as exc: logger.exception("Tool %r raised %s: %s", name, type(exc).__name__, exc) return {"error": f"{type(exc).__name__}: {exc}"} def _dispatch(name: str, tool_input: dict, memory: SessionMemory) -> dict | list: """Inner dispatcher — may raise; execute_tool() wraps it in try/except.""" if name == "classify_domain": from src.classifier.predict import classify result = classify(tool_input["complaint_text"]) return dataclasses.asdict(result) if name == "extract_entities": from src.ner.predict import extract_entities entities = extract_entities(tool_input["text"]) return [dataclasses.asdict(e) for e in entities] if name == "process_document": from src.document_processor.processor import get_processor result = get_processor().process(tool_input["file_path"]) return { "raw_text": result["raw_text"], "entities": [dataclasses.asdict(e) for e in result["entities"]], } if name == "draft_complaint": # draft_complaint is handled internally by Claude (Rule 6). # The Python side simply confirms that preconditions passed and returns # a "proceed" signal. Claude generates the actual letter as text in # the assistant turn that follows. return { "status": "proceed", "complaint_context": tool_input.get("complaint_context", {}), } if name == "recommend_action": from src.next_action.predict import recommend_action actions = recommend_action( domain=tool_input["domain"], entities=tool_input.get("entities") or {}, prior_contact=bool(tool_input.get("prior_contact", False)), ) return [dataclasses.asdict(a) for a in actions] if name == "store_memory": memory.set(tool_input["key"], tool_input["value"]) return {"status": "stored", "key": tool_input["key"]} if name == "get_memory": value = memory.get(tool_input["key"]) return {"key": tool_input["key"], "value": value} return {"error": f"Unknown tool: {name!r}"} # --------------------------------------------------------------------------- # Backward-compat helper (used by older stubs that called build_tool_handlers) # --------------------------------------------------------------------------- def build_tool_handlers(memory: SessionMemory) -> dict: """Return a name→callable mapping, each bound to *memory*.""" def _make(n: str): return lambda inp: execute_tool(n, inp, memory) return {t["name"]: _make(t["name"]) for t in TOOL_DEFINITIONS}