""" extraction_agent.py ------------------- Stage 3: Field Extraction Agent for Notiflow Uses ModelRouter (Nova primary → Gemini fallback) to extract structured business fields from Hinglish messages, given a pre-classified intent. message → Intent Agent → intent → Extraction Agent → structured fields Integration note (backend upgrade): The private _call_model() function now delegates to agent/model_router.py. All schema enforcement, parsing, and public API logic is unchanged. """ import json import logging import re from pathlib import Path PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "extraction_prompt.txt" INTENT_SCHEMA: dict[str, list[str]] = { "order": ["intent", "customer", "item", "quantity"], "payment": ["intent", "customer", "amount", "payment_type"], "credit": ["intent", "customer", "item", "quantity", "amount"], "return": ["intent", "customer", "item", "reason"], "preparation": ["intent", "item", "quantity"], "other": ["intent", "note"], } VALID_INTENTS = set(INTENT_SCHEMA.keys()) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Prompt loader # --------------------------------------------------------------------------- def _load_prompt(message: str, intent: str) -> str: template = PROMPT_PATH.read_text(encoding="utf-8") prompt = template.replace("{message}", message.strip()) prompt = prompt.replace("{intent}", intent.strip().lower()) return prompt # --------------------------------------------------------------------------- # Model inference (now via ModelRouter — Nova primary, Gemini fallback) # --------------------------------------------------------------------------- def _call_model(prompt: str) -> str: """ Route the prompt through ModelRouter. Returns raw text response from whichever model was available. """ from agent.model_router import route raw, model_used = route(prompt, max_tokens=256) logger.info("Extraction inference served by: %s", model_used) return raw # --------------------------------------------------------------------------- # Response parser & normaliser # --------------------------------------------------------------------------- def _parse_extraction_response(raw: str, intent: str) -> dict: cleaned = re.sub(r"```(?:json)?|```", "", raw).strip() try: parsed = json.loads(cleaned) except json.JSONDecodeError: match = re.search(r"\{.*\}", cleaned, re.DOTALL) if match: try: parsed = json.loads(match.group(0)) except json.JSONDecodeError: logger.warning("Could not parse model response as JSON; returning nulls") parsed = {} else: parsed = {} schema_fields = INTENT_SCHEMA.get(intent, INTENT_SCHEMA["other"]) result = {field: parsed.get(field, None) for field in schema_fields} result["intent"] = intent if "customer" in result and isinstance(result["customer"], str): result["customer"] = result["customer"].strip().title() for num_field in ("amount", "quantity"): if num_field in result and result[num_field] is not None: try: val = float(result[num_field]) result[num_field] = int(val) if val.is_integer() else val except (ValueError, TypeError): result[num_field] = None return result # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def extract_fields(message: str, intent: str) -> dict: """ Extract structured business fields from a Hinglish message. Args: message: Raw business message (Hinglish or English). intent: Intent string from the Intent Agent. Returns: Dict with "intent" + extracted fields. Missing fields are null. Raises: ValueError: Unsupported intent. RuntimeError: Both Nova and Gemini unavailable. Examples: >>> extract_fields("rahul ne 15000 bheja", "payment") {'intent': 'payment', 'customer': 'Rahul', 'amount': 15000, 'payment_type': None} """ if not message or not message.strip(): return _null_result(intent) intent = intent.lower().strip() if intent not in VALID_INTENTS: raise ValueError(f"Unsupported intent: '{intent}'.") logger.info("Extracting | intent=%s | message=%r", intent, message) prompt = _load_prompt(message, intent) raw = _call_model(prompt) result = _parse_extraction_response(raw, intent) logger.info("Extracted: %s", result) return result def _null_result(intent: str) -> dict: intent = intent.lower().strip() if intent in VALID_INTENTS else "other" schema_fields = INTENT_SCHEMA.get(intent, INTENT_SCHEMA["other"]) result = {field: None for field in schema_fields} result["intent"] = intent return result