Notiflow / agent /extraction_agent.py
Dipan04's picture
sab kam khatam guys heehe
4b7573c
"""
extraction_agent.py
-------------------
Stage 3: Field Extraction Agent for Notiflow
Uses ModelRouter (Nova primary → Gemini fallback) to extract structured
business fields from Hinglish messages, given a pre-classified intent.
message → Intent Agent → intent → Extraction Agent → structured fields
Integration note (backend upgrade):
The private _call_model() function now delegates to agent/model_router.py.
All schema enforcement, parsing, and public API logic is unchanged.
"""
import json
import logging
import re
from pathlib import Path
logger = logging.getLogger(__name__)

# Prompt template location: two directories above this file, then
# prompts/extraction_prompt.txt.
PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "extraction_prompt.txt"

# Fields returned for each supported intent, in output order.
# Every schema starts with "intent".
INTENT_SCHEMA: dict[str, list[str]] = {
    "order": [
        "intent",
        "customer",
        "item",
        "quantity",
    ],
    "payment": [
        "intent",
        "customer",
        "amount",
        "payment_type",
    ],
    "credit": [
        "intent",
        "customer",
        "item",
        "quantity",
        "amount",
    ],
    "return": [
        "intent",
        "customer",
        "item",
        "reason",
    ],
    "preparation": [
        "intent",
        "item",
        "quantity",
    ],
    "other": [
        "intent",
        "note",
    ],
}

# Set of intent labels that have a schema above.
VALID_INTENTS = set(INTENT_SCHEMA)
# ---------------------------------------------------------------------------
# Prompt loader
# ---------------------------------------------------------------------------
def _load_prompt(message: str, intent: str) -> str:
    """Render the extraction prompt template for one message.

    Reads the template at ``PROMPT_PATH`` (UTF-8) and fills in the
    ``{message}`` and ``{intent}`` placeholders.

    Both placeholders are substituted in a single pass over the template, so
    text inserted for one placeholder is never re-scanned for the other.
    Chained ``str.replace`` calls would rewrite a literal ``{intent}`` that
    happens to occur inside the user's message.

    Args:
        message: Raw message text; surrounding whitespace is stripped.
        intent: Intent label; stripped and lower-cased before insertion.

    Returns:
        The template text with both placeholders substituted.
    """
    template = PROMPT_PATH.read_text(encoding="utf-8")
    substitutions = {
        "{message}": message.strip(),
        "{intent}": intent.strip().lower(),
    }
    # A callable replacement is used so backslashes in the message are
    # inserted literally instead of being read as regex escapes.
    return re.sub(
        r"\{message\}|\{intent\}",
        lambda match: substitutions[match.group(0)],
        template,
    )
# ---------------------------------------------------------------------------
# Model inference (now via ModelRouter — Nova primary, Gemini fallback)
# ---------------------------------------------------------------------------
def _call_model(prompt: str) -> str:
    """
    Send the prompt to ``agent.model_router.route`` and return its text.

    ``route`` is called with ``max_tokens=256`` and its result is unpacked as
    a ``(text, model_name)`` pair. The model name is only logged; the text is
    returned unchanged.
    """
    # Imported at call time, as in the original, rather than at module load.
    from agent.model_router import route

    response_text, serving_model = route(prompt, max_tokens=256)
    logger.info("Extraction inference served by: %s", serving_model)
    return response_text
# ---------------------------------------------------------------------------
# Response parser & normaliser
# ---------------------------------------------------------------------------
def _coerce_number(value) -> "int | float | None":
    """Return *value* as an int or float, or None if it is not a usable number."""
    import math  # local import: the module's top-level imports do not include math

    # JSON true/false would otherwise be silently coerced to 1/0.
    if value is None or isinstance(value, bool):
        return None
    # Keep ints as they are; a float round-trip loses precision on large values.
    if isinstance(value, int):
        return value
    try:
        number = float(value)
    except (ValueError, TypeError, OverflowError):
        return None
    # json.loads accepts NaN/Infinity; neither is a meaningful amount or quantity.
    if not math.isfinite(number):
        return None
    return int(number) if number.is_integer() else number


def _parse_extraction_response(raw: str, intent: str) -> dict:
    """Parse the model's raw text into a dict that matches the intent schema.

    Steps:
      1. Strip Markdown code fences and try to decode the text as JSON.
      2. If that fails, decode the outermost ``{...}`` span found in the text.
      3. Anything that does not yield a JSON *object* (undecodable text, or a
         valid JSON list/string/number) is treated as "no fields extracted"
         and a warning is logged.
      4. Keep only the fields listed in ``INTENT_SCHEMA`` for ``intent``
         (falling back to the "other" schema); missing fields become None.
      5. Normalise ``customer`` (strip + title-case) and coerce ``amount`` /
         ``quantity`` to int or float, or None when they are not numeric.

    Args:
        raw: Raw text returned by the model.
        intent: Intent label; always written to the result's "intent" key.

    Returns:
        Dict with exactly the schema fields for ``intent``.
    """
    # Models often wrap JSON in ``` or ```json fences.
    cleaned = re.sub(r"```(?:json)?", "", raw or "").strip()

    parsed = None
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        # The model may have added prose around the object; try the widest
        # brace-delimited span instead.
        match = re.search(r"\{.*\}", cleaned, re.DOTALL)
        if match:
            try:
                parsed = json.loads(match.group(0))
            except json.JSONDecodeError:
                parsed = None

    # A decoded list/str/number has no .get(); treat it like a parse failure.
    if not isinstance(parsed, dict):
        logger.warning("Could not parse model response as JSON; returning nulls")
        parsed = {}

    schema_fields = INTENT_SCHEMA.get(intent, INTENT_SCHEMA["other"])
    result = {field: parsed.get(field) for field in schema_fields}
    # The caller's intent is authoritative, whatever the model echoed back.
    result["intent"] = intent

    customer = result.get("customer")
    if isinstance(customer, str):
        result["customer"] = customer.strip().title()

    for num_field in ("amount", "quantity"):
        if num_field in result:
            result[num_field] = _coerce_number(result[num_field])

    return result
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def extract_fields(message: str, intent: str) -> dict:
    """
    Extract structured business fields from a Hinglish message.

    The intent is normalised (stripped and lower-cased) before anything else,
    so the blank-message shortcut and the validation step see the same label.
    Previously the shortcut received the raw intent, so "Payment" with a
    blank message produced the "other" schema instead of the payment schema.

    Args:
        message: Raw business message (Hinglish or English).
        intent: Intent string from the Intent Agent.

    Returns:
        Dict with "intent" + extracted fields. Missing fields are null.
        A blank message returns an all-null result from ``_null_result``
        without calling the model.

    Raises:
        ValueError: Unsupported intent (including a non-string intent) when
            the message is not blank.
        RuntimeError: Both Nova and Gemini unavailable (per the original
            contract; raised by the model router, which is not visible here).

    Examples:
        >>> extract_fields("rahul ne 15000 bheja", "payment")
        {'intent': 'payment', 'customer': 'Rahul', 'amount': 15000, 'payment_type': None}
    """
    # Non-string intents are left as-is so they fail validation below with a
    # ValueError instead of an AttributeError on .strip().
    normalized = intent.strip().lower() if isinstance(intent, str) else intent

    # Nothing to extract: skip the model call entirely.
    if not message or not message.strip():
        return _null_result(normalized)

    if not isinstance(normalized, str) or normalized not in VALID_INTENTS:
        raise ValueError(f"Unsupported intent: '{normalized}'.")

    logger.info("Extracting | intent=%s | message=%r", normalized, message)
    prompt = _load_prompt(message, normalized)
    raw = _call_model(prompt)
    result = _parse_extraction_response(raw, normalized)
    logger.info("Extracted: %s", result)
    return result
def _null_result(intent: str) -> dict:
    """Build an all-null result for *intent*.

    The intent is stripped and lower-cased *before* it is checked against
    ``VALID_INTENTS``, so inputs such as " PAYMENT " select the payment
    schema. Checking first and normalising afterwards made the normalisation
    dead code and sent such inputs to the "other" schema. Unknown or
    non-string intents fall back to "other".

    Args:
        intent: Intent label in any casing, with optional surrounding spaces.

    Returns:
        Dict holding every schema field for the resolved intent set to None,
        except "intent", which holds the resolved intent label.
    """
    normalized = intent.strip().lower() if isinstance(intent, str) else ""
    if normalized not in VALID_INTENTS:
        normalized = "other"

    schema_fields = INTENT_SCHEMA.get(normalized, INTENT_SCHEMA["other"])
    result = dict.fromkeys(schema_fields)
    result["intent"] = normalized
    return result