Spaces:
Sleeping
Sleeping
| """ | |
| CMA tool definitions — passed to the Anthropic API as the tools= parameter. | |
| Tools: | |
| classify_domain → calls DomainClassifier → returns DomainResult | |
| extract_entities → calls EvidenceNER → returns list[Entity] | |
| process_document → calls DocumentProcessor → returns {raw_text, entities} | |
| draft_complaint → handled internally by Claude (returns {"status": "proceed"}) | |
| recommend_action → calls NextActionPredictor → returns list[EscalationAction] | |
| store_memory → calls SessionMemory.set | |
| get_memory → calls SessionMemory.get | |
| Each tool is defined as an Anthropic ToolParam dict (name, description, input_schema). | |
| execute_tool() is the central dispatcher; all exceptions are caught and returned as | |
| {"error": "<message>"} so the agent loop never crashes on tool failure. | |
| """ | |
| from __future__ import annotations | |
| import dataclasses | |
| import logging | |
| from src.agent.memory import SessionMemory | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Tool JSON Schema definitions (Anthropic ToolParam format) | |
| # --------------------------------------------------------------------------- | |
| TOOL_DEFINITIONS: list[dict] = [ | |
| { | |
| "name": "classify_domain", | |
| "description": ( | |
| "Classify a consumer complaint into one of six domains: " | |
| "ecommerce, telecom, banking, cibil, insurance, or general. " | |
| "MUST be the very first tool called on every new complaint thread. " | |
| "The result includes a 'low_confidence' boolean field. " | |
| "When low_confidence is true (model confidence < 0.50, or keyword " | |
| "fallback was used), do NOT proceed with the suggested domain — " | |
| "instead ask the user one clarifying question to confirm the domain " | |
| "before continuing." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "complaint_text": { | |
| "type": "string", | |
| "description": ( | |
| "The complaint text to classify. " | |
| "PII is already redacted — pass it as-is." | |
| ), | |
| }, | |
| }, | |
| "required": ["complaint_text"], | |
| }, | |
| }, | |
| { | |
| "name": "extract_entities", | |
| "description": ( | |
| "Extract named evidence entities (ORG, AMOUNT, DATE, REF_ID, ACCOUNT, PERSON) " | |
| "from complaint text using the EvidenceNER model. " | |
| "Call this on the user's initial message right after classify_domain() " | |
| "to pre-fill as many required fields as possible." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "text": { | |
| "type": "string", | |
| "description": "The text to extract entities from.", | |
| }, | |
| }, | |
| "required": ["text"], | |
| }, | |
| }, | |
| { | |
| "name": "process_document", | |
| "description": ( | |
| "Process an uploaded document (PDF or image) through the full pipeline: " | |
| "Tesseract OCR → EvidenceNER → DocumentViT. " | |
| "Returns raw extracted text and a list of evidence entity spans. " | |
| "MUST be called before draft_complaint() when the user has uploaded a document." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "file_path": { | |
| "type": "string", | |
| "description": ( | |
| "Absolute path to the uploaded file on the server. " | |
| "This path is provided in the [Document uploaded: <path>] " | |
| "prefix that appears in the user's message." | |
| ), | |
| }, | |
| }, | |
| "required": ["file_path"], | |
| }, | |
| }, | |
| { | |
| "name": "draft_complaint", | |
| "description": ( | |
| "Signal that all preconditions for drafting have been met. " | |
| "Returns {\"status\": \"proceed\"} which confirms you may now generate " | |
| "the formal complaint letter as your next text response. " | |
| "ONLY call this after receiving a [USER CONFIRMED] message AND " | |
| "after all six required fields (provider, date, amount, reference ID, " | |
| "prior_contact, desired_resolution) are confirmed." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "complaint_context": { | |
| "type": "object", | |
| "description": "All collected and user-confirmed complaint details.", | |
| "properties": { | |
| "domain": {"type": "string"}, | |
| "provider": {"type": "string"}, | |
| "incident_date": {"type": "string"}, | |
| "amount": {"type": "string"}, | |
| "reference_id": {"type": "string"}, | |
| "prior_contact": {"type": "boolean"}, | |
| "desired_resolution": {"type": "string"}, | |
| "additional_entities": {"type": "object"}, | |
| }, | |
| "required": [ | |
| "domain", "provider", "incident_date", "desired_resolution" | |
| ], | |
| }, | |
| }, | |
| "required": ["complaint_context"], | |
| }, | |
| }, | |
| { | |
| "name": "recommend_action", | |
| "description": ( | |
| "Get a ranked list of escalation authorities for the complaint domain. " | |
| "Always call this immediately after presenting the draft letter. " | |
| "Returns authorities in recommended order with portal URLs and submission guidance." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "domain": { | |
| "type": "string", | |
| "enum": [ | |
| "ecommerce", "telecom", "banking", | |
| "cibil", "insurance", "general" | |
| ], | |
| "description": "The classified complaint domain.", | |
| }, | |
| "entities": { | |
| "type": "object", | |
| "description": ( | |
| "Confirmed entity dict, e.g. " | |
| "{\"ORG\": \"HDFC Bank\", \"AMOUNT\": \"₹5000\"}." | |
| ), | |
| }, | |
| "prior_contact": { | |
| "type": "boolean", | |
| "description": ( | |
| "True if the user has already contacted the company. " | |
| "Affects whether company support appears as the first step." | |
| ), | |
| }, | |
| }, | |
| "required": ["domain"], | |
| }, | |
| }, | |
| { | |
| "name": "store_memory", | |
| "description": ( | |
| "Store a key-value pair in the session memory for this complaint. " | |
| "Use to persist domain, extracted entities, prior_contact, draft text, " | |
| "and any other state that must survive across conversation turns." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "key": { | |
| "type": "string", | |
| "description": ( | |
| "Memory key. Suggested keys: 'domain', 'entities', " | |
| "'document_entities', 'provider', 'incident_date', " | |
| "'amount', 'reference_id', 'prior_contact', " | |
| "'desired_resolution', 'draft'." | |
| ), | |
| }, | |
| "value": { | |
| "description": "Value to store — any JSON-serialisable type.", | |
| }, | |
| }, | |
| "required": ["key", "value"], | |
| }, | |
| }, | |
| { | |
| "name": "get_memory", | |
| "description": ( | |
| "Retrieve a previously stored value from session memory. " | |
| "Use instead of asking the user to repeat information." | |
| ), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "key": { | |
| "type": "string", | |
| "description": "The memory key to retrieve.", | |
| }, | |
| }, | |
| "required": ["key"], | |
| }, | |
| # cache_control on the last tool caches the entire tools list for 5 min. | |
| "cache_control": {"type": "ephemeral"}, | |
| }, | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Tool dispatcher | |
| # --------------------------------------------------------------------------- | |
| def execute_tool(name: str, tool_input: dict, memory: SessionMemory) -> dict | list: | |
| """ | |
| Dispatch *name* with *tool_input* and return a JSON-serialisable result. | |
| All exceptions are caught; callers receive {"error": "<message>"} on any failure. | |
| This ensures the agent loop never crashes on a tool error — the agent receives | |
| the error description and can report it to the user or retry. | |
| """ | |
| try: | |
| return _dispatch(name, tool_input, memory) | |
| except Exception as exc: | |
| logger.exception("Tool %r raised %s: %s", name, type(exc).__name__, exc) | |
| return {"error": f"{type(exc).__name__}: {exc}"} | |
| def _dispatch(name: str, tool_input: dict, memory: SessionMemory) -> dict | list: | |
| """Inner dispatcher — may raise; execute_tool() wraps it in try/except.""" | |
| if name == "classify_domain": | |
| from src.classifier.predict import classify | |
| result = classify(tool_input["complaint_text"]) | |
| return dataclasses.asdict(result) | |
| if name == "extract_entities": | |
| from src.ner.predict import extract_entities | |
| entities = extract_entities(tool_input["text"]) | |
| return [dataclasses.asdict(e) for e in entities] | |
| if name == "process_document": | |
| from src.document_processor.processor import get_processor | |
| result = get_processor().process(tool_input["file_path"]) | |
| return { | |
| "raw_text": result["raw_text"], | |
| "entities": [dataclasses.asdict(e) for e in result["entities"]], | |
| } | |
| if name == "draft_complaint": | |
| # draft_complaint is handled internally by Claude (Rule 6). | |
| # The Python side simply confirms that preconditions passed and returns | |
| # a "proceed" signal. Claude generates the actual letter as text in | |
| # the assistant turn that follows. | |
| return { | |
| "status": "proceed", | |
| "complaint_context": tool_input.get("complaint_context", {}), | |
| } | |
| if name == "recommend_action": | |
| from src.next_action.predict import recommend_action | |
| actions = recommend_action( | |
| domain=tool_input["domain"], | |
| entities=tool_input.get("entities") or {}, | |
| prior_contact=bool(tool_input.get("prior_contact", False)), | |
| ) | |
| return [dataclasses.asdict(a) for a in actions] | |
| if name == "store_memory": | |
| memory.set(tool_input["key"], tool_input["value"]) | |
| return {"status": "stored", "key": tool_input["key"]} | |
| if name == "get_memory": | |
| value = memory.get(tool_input["key"]) | |
| return {"key": tool_input["key"], "value": value} | |
| return {"error": f"Unknown tool: {name!r}"} | |
| # --------------------------------------------------------------------------- | |
| # Backward-compat helper (used by older stubs that called build_tool_handlers) | |
| # --------------------------------------------------------------------------- | |
| def build_tool_handlers(memory: SessionMemory) -> dict: | |
| """Return a name→callable mapping, each bound to *memory*.""" | |
| def _make(n: str): | |
| return lambda inp: execute_tool(n, inp, memory) | |
| return {t["name"]: _make(t["name"]) for t in TOOL_DEFINITIONS} | |