| """ |
| extraction_agent.py |
| ------------------- |
| Stage 3: Field Extraction Agent for Notiflow |
| |
| Uses ModelRouter (Nova primary → Gemini fallback) to extract structured |
| business fields from Hinglish messages, given a pre-classified intent. |
| |
| message → Intent Agent → intent → Extraction Agent → structured fields |
| |
| Integration note (backend upgrade): |
| The private _call_model() function now delegates to agent/model_router.py. |
| All schema enforcement, parsing, and public API logic is unchanged. |
| """ |
|
|
| import json |
| import logging |
| import re |
| from pathlib import Path |
|
|
# Prompt template file: <directory of this file>/../prompts/extraction_prompt.txt
PROMPT_PATH = Path(__file__).parent.parent.joinpath("prompts", "extraction_prompt.txt")

# Output fields, in order, for every supported intent. The parser keeps only
# these keys from the model's reply; anything absent is reported as None.
INTENT_SCHEMA: dict[str, list[str]] = {
    "order": [
        "intent",
        "customer",
        "item",
        "quantity",
    ],
    "payment": [
        "intent",
        "customer",
        "amount",
        "payment_type",
    ],
    "credit": [
        "intent",
        "customer",
        "item",
        "quantity",
        "amount",
    ],
    "return": [
        "intent",
        "customer",
        "item",
        "reason",
    ],
    "preparation": [
        "intent",
        "item",
        "quantity",
    ],
    "other": [
        "intent",
        "note",
    ],
}

# The set of intent names accepted by this module (exactly the schema keys).
VALID_INTENTS = set(INTENT_SCHEMA)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
| |
| |
| |
|
|
def _load_prompt(message: str, intent: str) -> str:
    """
    Build the extraction prompt by filling the template at PROMPT_PATH.

    The template is read from disk on every call. Each "{message}" token is
    replaced with the stripped message and each "{intent}" token with the
    stripped, lower-cased intent.

    Args:
        message: Raw business message to embed in the prompt.
        intent:  Intent label to embed in the prompt.

    Returns:
        The template text with both placeholders filled in.
    """
    template = PROMPT_PATH.read_text(encoding="utf-8")
    substitutions = {
        "{message}": message.strip(),
        "{intent}": intent.strip().lower(),
    }
    # Substitute in ONE pass over the template. Chaining two str.replace()
    # calls would rescan the already-inserted message, so a user message that
    # happens to contain the literal text "{intent}" would be rewritten.
    # A function replacement also keeps backslashes in the message verbatim.
    return re.sub(
        r"\{message\}|\{intent\}",
        lambda token: substitutions[token.group(0)],
        template,
    )
|
|
|
|
| |
| |
| |
|
|
def _call_model(prompt: str) -> str:
    """
    Send the prompt to the model router and hand back the reply text.

    The router's `route` function is imported inside this function and is
    called with a 256-token cap. It returns a (text, model name) pair; the
    model name is logged at INFO level and only the text is returned.
    """
    from agent.model_router import route

    response_text, serving_model = route(prompt, max_tokens=256)
    logger.info("Extraction inference served by: %s", serving_model)
    return response_text
|
|
|
|
| |
| |
| |
|
|
def _parse_extraction_response(raw: str, intent: str) -> dict:
    """
    Turn the model's raw reply into a dict limited to the intent's schema.

    Markdown code fences are stripped and the text is parsed as JSON. If that
    fails, the span from the first "{" to the last "}" is tried instead. Any
    reply that does not yield a JSON object (unparseable text, or valid JSON
    such as a list, string, number or null) produces all-null fields and a
    warning in the log.

    Args:
        raw:    Raw text returned by the model; empty/None is treated as "".
        intent: Intent label. Labels missing from INTENT_SCHEMA use the
                "other" field list but are still echoed back under "intent".

    Returns:
        Dict with one key per schema field. "intent" is always the value
        passed in (never the model's), string "customer" values are stripped
        and title-cased, and "amount" / "quantity" are int or float, or None
        when the model's value is not numeric.
    """
    # Models frequently wrap JSON in ``` or ```json fences; drop them.
    cleaned = re.sub(r"```(?:json)?|```", "", raw or "").strip()

    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        # The JSON may be surrounded by prose: retry on the outermost {...}.
        embedded = re.search(r"\{.*\}", cleaned, re.DOTALL)
        try:
            parsed = json.loads(embedded.group(0)) if embedded else None
        except json.JSONDecodeError:
            parsed = None

    # json.loads can legitimately return a list, str, number or None; only a
    # dict supports the field lookups below, so everything else means "nulls".
    if not isinstance(parsed, dict):
        logger.warning("Could not parse model response as JSON; returning nulls")
        parsed = {}

    schema_fields = INTENT_SCHEMA.get(intent, INTENT_SCHEMA["other"])
    result = {field: parsed.get(field) for field in schema_fields}
    # Trust the upstream classifier over whatever intent the model echoed.
    result["intent"] = intent

    customer = result.get("customer")
    if isinstance(customer, str):
        result["customer"] = customer.strip().title()

    for num_field in ("amount", "quantity"):
        value = result.get(num_field)
        if value is None:
            continue
        if isinstance(value, bool):
            # bool is an int subclass; true/false is not a quantity or amount.
            result[num_field] = None
        elif isinstance(value, int):
            # Already exact; a float round-trip would lose precision (or
            # overflow) for very large integers.
            continue
        else:
            try:
                number = float(value)
            except (ValueError, TypeError):
                result[num_field] = None
            else:
                # Collapse whole-number floats ("15000.0", 3.0) to ints.
                result[num_field] = int(number) if number.is_integer() else number

    return result
|
|
|
|
| |
| |
| |
|
|
def extract_fields(message: str, intent: str) -> dict:
    """
    Extract structured business fields from a Hinglish message.

    The intent is normalised (lower-cased, stripped) before anything else, so
    the blank-message shortcut and the normal path agree on which schema to
    use. A blank message returns an all-null result without calling the
    model; on that path an unrecognised intent falls back to "other" instead
    of raising.

    Args:
        message: Raw business message (Hinglish or English).
        intent:  Intent string from the Intent Agent.

    Returns:
        Dict with "intent" + extracted fields. Missing fields are None.

    Raises:
        ValueError:   Unsupported (or non-string) intent with a non-blank message.
        RuntimeError: Presumably propagated from the model router when no
                      model is available (raised outside this module).

    Examples (illustrative; needs a live model backend):
        >>> extract_fields("rahul ne 15000 bheja", "payment")
        {'intent': 'payment', 'customer': 'Rahul', 'amount': 15000, 'payment_type': None}
    """
    if isinstance(intent, str):
        intent = intent.lower().strip()

    # Nothing to extract: answer with nulls and skip the model call entirely.
    if not message or not message.strip():
        return _null_result(intent)

    if not isinstance(intent, str) or intent not in VALID_INTENTS:
        raise ValueError(f"Unsupported intent: '{intent}'.")

    logger.info("Extracting | intent=%s | message=%r", intent, message)
    prompt = _load_prompt(message, intent)
    raw = _call_model(prompt)
    result = _parse_extraction_response(raw, intent)
    logger.info("Extracted: %s", result)
    return result


def _null_result(intent: str) -> dict:
    """
    Build an all-null result for the given intent.

    The intent is normalised first and only then checked against
    VALID_INTENTS; anything unrecognised (including non-strings) is reported
    as "other". Every schema field is None except "intent" itself.
    """
    # Normalise BEFORE the membership test; testing first would send
    # " Order " or "PAYMENT" to the "other" schema.
    normalized = intent.lower().strip() if isinstance(intent, str) else ""
    if normalized not in VALID_INTENTS:
        normalized = "other"

    schema_fields = INTENT_SCHEMA.get(normalized, INTENT_SCHEMA["other"])
    result = dict.fromkeys(schema_fields)
    result["intent"] = normalized
    return result