Spaces:
Running
Running
| """ | |
| pipeline/intent_parser.py | |
| Qwen2.5-7B-Instruct (Q4_K_M GGUF) intent & entity extractor. | |
| Responsibilities | |
| ──────────────── | |
| Takes a raw transcript string from Moonshine ASR and returns a | |
| validated SchedulingIntent object — structured data the evaluator and | |
| DB layer can act on directly. | |
| Why GGUF + llama-cpp-python | |
| ─────────────────────────── | |
| - Qwen2.5-7B-Instruct in Q4_K_M needs ~4.5 GB total: | |
| 20 layers on RTX 2050 GPU → ~0.8 GB VRAM | |
| remaining ~15 layers on CPU RAM → ~3.7 GB RAM | |
| - llama-cpp-python's grammar feature forces output to be valid JSON | |
| with no post-processing hacks — zero hallucinated keys. | |
| GBNF Grammar | |
| ──────────── | |
| llama.cpp supports GBNF (Generative BNF) grammars that constrain the | |
| token sampling so the model physically cannot produce invalid JSON. | |
| We define a tight grammar matching exactly our SchedulingIntent schema. | |
| Download the GGUF before running (one-time, ~4.5 GB): | |
| huggingface-cli download Qwen/Qwen2.5-7B-Instruct-GGUF \ | |
| --include "qwen2.5-7b-instruct-q4_k_m.gguf" \ | |
| --local-dir ./models | |
| """ | |
| import json | |
| import logging | |
| import re | |
| import threading | |
| import time | |
| from datetime import date, timedelta | |
| from typing import Optional | |
| from pydantic import BaseModel, Field, field_validator | |
| from config import ( | |
| QWEN_GGUF_PATH, | |
| QWEN_N_GPU_LAYERS, | |
| QWEN_N_CTX, | |
| QWEN_MAX_TOKENS, | |
| QWEN_TEMPERATURE, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # ── Output schema ───────────────────────────────────────────────────────────── | |
class SchedulingIntent(BaseModel):
    """
    Structured output from Qwen2.5.

    Every field except ``intent`` is optional (or has a default) so partial
    information (e.g. caller gave name but no date yet) is represented
    cleanly rather than hallucinated.

    Validators coerce bad values instead of raising: an unknown intent
    becomes "unclear", a malformed date/time becomes None, and confidence
    is clamped into [0.0, 1.0].
    """

    intent: str = Field(
        description=(
            "One of: book_meeting | reschedule | cancel | "
            "check_availability | provide_info | end_call | unclear"
        )
    )
    caller_name: Optional[str] = Field(None, description="Full name of the caller")
    preferred_date: Optional[str] = Field(None, description="YYYY-MM-DD or null")
    preferred_time: Optional[str] = Field(None, description="HH:MM 24-hour or null")
    duration_minutes: Optional[int] = Field(None, description="Meeting length in minutes")
    participants: list[str] = Field(default_factory=list)
    meeting_type: Optional[str] = Field(None, description="phone | video | in_person")
    notes: Optional[str] = Field(None, description="Any extra context")
    confidence: float = Field(0.0, description="0.0–1.0 extraction confidence")
    missing_fields: list[str] = Field(
        default_factory=list,
        description="Fields still needed to complete booking",
    )

    # The validators below must be registered with @field_validator; as plain
    # methods they would never be invoked by Pydantic.

    @field_validator("intent")
    @classmethod
    def validate_intent(cls, v: str) -> str:
        """Map any intent outside the known set to "unclear"."""
        valid = {
            "book_meeting", "reschedule", "cancel",
            "check_availability", "provide_info", "end_call", "unclear",
        }
        return v if v in valid else "unclear"

    @field_validator("preferred_date")
    @classmethod
    def validate_date(cls, v: Optional[str]) -> Optional[str]:
        """Keep only real calendar dates written as YYYY-MM-DD; else None."""
        if v is None:
            return None
        # fullmatch (not match + "$") so a trailing newline is rejected too.
        if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", v):
            return None  # reject malformed dates
        try:
            date.fromisoformat(v)
        except ValueError:
            return None  # right shape, impossible date (e.g. 2026-13-45)
        return v

    @field_validator("preferred_time")
    @classmethod
    def validate_time(cls, v: Optional[str]) -> Optional[str]:
        """Keep only valid 24-hour HH:MM times (00:00-23:59); else None."""
        if v is None:
            return None
        if re.fullmatch(r"(?:[01]\d|2[0-3]):[0-5]\d", v):
            return v
        return None

    @field_validator("confidence")
    @classmethod
    def clamp_confidence(cls, v: float) -> float:
        """Clamp confidence into [0.0, 1.0]; NaN is treated as 0.0."""
        value = float(v)
        if value != value:  # NaN is the only float not equal to itself
            return 0.0
        return max(0.0, min(1.0, value))

    def compute_missing(self) -> "SchedulingIntent":
        """
        Populate missing_fields based on what a booking actually needs.

        Overwrites whatever ``missing_fields`` currently holds. Only the
        "book_meeting" intent has required fields (name, date, time).
        Returns self for chaining.
        """
        needed = []
        if self.intent == "book_meeting":
            if not self.caller_name:
                needed.append("caller_name")
            if not self.preferred_date:
                needed.append("preferred_date")
            if not self.preferred_time:
                needed.append("preferred_time")
        self.missing_fields = needed
        return self
| # ── GBNF grammar ────────────────────────────────────────────────────────────── | |
# GBNF grammar that constrains sampling to exactly one SchedulingIntent JSON
# object, with the keys in a fixed order.
#
# This is a *raw* string, so every backslash below reaches the GBNF parser
# unchanged: "\\" is the GBNF literal for ONE backslash character.
#
# Rules chosen so the output always survives json.loads():
#   - confidence-val never ends in a bare "." ("1." is not valid JSON)
#   - integer has no leading zeros ("007" is not valid JSON)
#   - string escapes use a single backslash and raw control characters
#     (U+0000-U+001F, U+007F) are excluded
#   - ws is at most one whitespace character, so the model cannot burn the
#     whole token budget emitting blank space
#
# NOTE(review): ``root`` is kept on one line as in the original; GBNF appears
# to allow line breaks only inside parenthesised groups - confirm before
# reflowing it.
_SCHEDULING_GRAMMAR = r"""
root ::= ws "{" ws "\"intent\"" ws ":" ws intent-val ws "," ws "\"caller_name\"" ws ":" ws str-or-null ws "," ws "\"preferred_date\"" ws ":" ws str-or-null ws "," ws "\"preferred_time\"" ws ":" ws str-or-null ws "," ws "\"duration_minutes\"" ws ":" ws int-or-null ws "," ws "\"participants\"" ws ":" ws str-array ws "," ws "\"meeting_type\"" ws ":" ws meeting-type-val ws "," ws "\"notes\"" ws ":" ws str-or-null ws "," ws "\"confidence\"" ws ":" ws confidence-val ws "," ws "\"missing_fields\"" ws ":" ws str-array ws "}" ws
intent-val ::= "\"book_meeting\"" | "\"reschedule\"" | "\"cancel\"" | "\"check_availability\"" | "\"provide_info\"" | "\"end_call\"" | "\"unclear\""
meeting-type-val ::= "\"phone\"" | "\"video\"" | "\"in_person\"" | "null"
str-or-null ::= string | "null"
int-or-null ::= integer | "null"
confidence-val ::= ("0" | "1") | ("0" "." [0-9]+) | ("1" "." [0]+)
str-array ::= "[" ws "]" | "[" ws string (ws "," ws string)* ws "]"
string ::= "\"" ([^"\\\x7F\x00-\x1F] | "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]))* "\""
integer ::= "-"? ("0" | [1-9] [0-9]*)
ws ::= [ \t\n\r]?
"""
| # ── System prompt ───────────────────────────────────────────────────────────── | |
# System prompt template. ``{today}`` is the only placeholder and is filled
# with str.format(), so any literal brace added to this text must be doubled.
# The arrows and the en dash below were mis-encoded as a stray Greek "β" in
# the scraped copy; they are restored here because this text is sent to the
# model verbatim.
_SYSTEM_PROMPT = """You are a scheduling assistant that extracts structured information from call transcripts.
Extract ONLY what the caller explicitly said. Never invent information.
Return a single JSON object with these exact fields:
- intent: what the caller wants (book_meeting/reschedule/cancel/check_availability/provide_info/end_call/unclear)
- caller_name: their name if given, else null
- preferred_date: YYYY-MM-DD if mentioned, else null. Resolve relative dates (e.g. "tomorrow", "next Monday") to absolute dates based on today being {today}.
- preferred_time: HH:MM 24-hour if mentioned, else null. Convert "3pm" → "15:00", "9:30 in the morning" → "09:30".
- duration_minutes: integer if mentioned, else null. Default 30 if they say "quick meeting".
- participants: list of names mentioned besides the caller
- meeting_type: "phone", "video", "in_person", or null
- notes: any other relevant context, else null
- confidence: 0.0–1.0 reflecting how complete the information is
- missing_fields: list of field names still needed for a complete booking
"""

# User-turn template. ``{transcript}`` is substituted with the ASR text; the
# trailing "JSON:" cues the model to begin the object immediately.
_USER_PROMPT_TEMPLATE = """Transcript:
\"\"\"{transcript}\"\"\"
JSON:"""
| # ── Parser class ────────────────────────────────────────────────────────────── | |
class IntentParser:
    """
    Lazy-loading Qwen2.5-7B-Instruct GGUF intent extractor.

    The model is loaded on the first ``parse`` call; the load is guarded by
    a lock so only one thread performs it. If llama-cpp-python, the GGUF
    file, or the load itself is unavailable, parsing degrades to a
    regex/keyword heuristic instead of raising.

    NOTE: only loading/unloading is serialized by the lock; the inference
    call itself is not.
    """

    # Keyword groups for the heuristic intent classifier. Groups are checked
    # in order and the first one with a substring hit wins.
    _INTENT_KEYWORDS = (
        ("end_call", ("thank you", "thanks", "goodbye", "bye")),
        ("cancel", ("cancel", "drop", "call off")),
        ("reschedule", ("reschedule", "move", "change", "shift")),
        ("check_availability", ("available", "availability", "free slot", "when can", "when is")),
        ("provide_info", ("information", "info", "details", "tell me about")),
        ("book_meeting", ("book", "schedule", "set up", "arrange", "make an appointment", "confirm")),
    )

    # Weekday name -> date.weekday() index (Monday == 0).
    _WEEKDAYS = (
        ("monday", 0),
        ("tuesday", 1),
        ("wednesday", 2),
        ("thursday", 3),
        ("friday", 4),
        ("saturday", 5),
        ("sunday", 6),
    )

    def __init__(self):
        self._llm = None
        self._grammar = None
        self._lock = threading.Lock()
        self._loaded = False

    # ── Public ────────────────────────────────────────────────────────────────

    def parse(self, transcript: str) -> "SchedulingIntent":
        """
        Extract scheduling intent from a transcript string.

        Parameters
        ----------
        transcript : str
            Raw text from Moonshine ASR (one or more utterances joined).

        Returns
        -------
        SchedulingIntent
            Validated Pydantic model. Falls back to intent="unclear" on
            any parse failure so the pipeline never crashes.
        """
        if not transcript or not transcript.strip():
            return self._fallback("Empty transcript")

        self._ensure_loaded()
        # Take a local reference so a concurrent unload() cannot swap the
        # attribute to None between the check and the call.
        llm = self._llm
        if llm is None:
            return self._heuristic_parse(transcript)

        prompt = self._build_prompt(transcript)
        try:
            t0 = time.perf_counter()
            response = llm(
                prompt,
                max_tokens=QWEN_MAX_TOKENS,
                temperature=QWEN_TEMPERATURE,
                # Do NOT stop on "\n}\n": llama-cpp-python removes the matched
                # stop string from the returned text, which would cut off the
                # closing brace of pretty-printed JSON and force a fallback.
                stop=["```"],
                grammar=self._grammar,
                echo=False,
            )
            elapsed = time.perf_counter() - t0
            raw_text = response["choices"][0]["text"].strip()
            logger.info("Qwen inference in %.2fs — raw: %s…", elapsed, raw_text[:120])
            return self._parse_response(raw_text, transcript)
        except Exception as exc:
            # Top-level boundary: never let an inference error reach the caller.
            logger.error("IntentParser.parse failed: %s", exc, exc_info=True)
            return self._fallback(str(exc))

    def parse_accumulated(self, utterances: list[str]) -> "SchedulingIntent":
        """
        Parse the full conversation so far (list of utterance strings).

        Joins the non-blank utterances with newlines and runs a single
        inference pass. Use this after each new utterance to get an updated
        intent state.
        """
        full_transcript = "\n".join(u for u in utterances if u.strip())
        return self.parse(full_transcript)

    def unload(self):
        """Free memory — model reloads lazily on next call."""
        with self._lock:
            if self._loaded:
                self._llm = None
                self._grammar = None
                self._loaded = False
                logger.info("IntentParser unloaded.")

    def is_loaded(self) -> bool:
        """
        Return True once a load attempt has completed.

        This is also True when the parser is running in heuristic mode
        (no model available), because the load attempt is not repeated.
        """
        return self._loaded

    # ── Internal ──────────────────────────────────────────────────────────────

    def _ensure_loaded(self):
        """Run ``_load`` at most once, even with concurrent callers."""
        if self._loaded:
            return
        with self._lock:
            if self._loaded:
                return
            self._load()

    def _load(self):
        """
        Load the GGUF model and compile the grammar.

        Every failure path (missing package, missing file, load error)
        leaves ``_llm`` as None and marks the parser loaded, which routes
        ``parse`` to the heuristic path until ``unload()`` is called.
        """
        try:
            from llama_cpp import Llama, LlamaGrammar
        except ImportError:
            logger.warning(
                "llama-cpp-python is unavailable; falling back to heuristic intent parsing."
            )
            self._llm = None
            self._loaded = True
            return

        if not QWEN_GGUF_PATH.exists():
            logger.warning(
                "GGUF not found at %s; falling back to heuristic intent parsing.",
                QWEN_GGUF_PATH,
            )
            self._llm = None
            self._loaded = True
            return

        logger.info(
            "Loading Qwen2.5-7B Q4_K_M — %s layers on GPU, rest on CPU…",
            QWEN_N_GPU_LAYERS,
        )
        t0 = time.perf_counter()
        try:
            llm = Llama(
                model_path=str(QWEN_GGUF_PATH),
                n_gpu_layers=QWEN_N_GPU_LAYERS,
                n_ctx=QWEN_N_CTX,
                n_threads=6,  # leave 2 cores for Gradio
                n_batch=512,
                verbose=False,
            )
            grammar = LlamaGrammar.from_string(_SCHEDULING_GRAMMAR)
        except Exception:
            # A failed load (corrupt file, out of memory, bad grammar) must
            # not crash parse() nor be retried on every single call.
            logger.exception(
                "Failed to load Qwen2.5 GGUF; falling back to heuristic intent parsing."
            )
            self._llm = None
            self._grammar = None
            self._loaded = True
            return

        # Publish model and grammar together only after both succeeded.
        self._llm = llm
        self._grammar = grammar
        elapsed = time.perf_counter() - t0
        logger.info("Qwen2.5 ready in %.1fs", elapsed)
        self._loaded = True

    def _build_prompt(self, transcript: str) -> str:
        """
        Build the ChatML prompt used by Qwen2.5-Instruct:

            <|im_start|>system\\n…<|im_end|>\\n
            <|im_start|>user\\n…<|im_end|>\\n
            <|im_start|>assistant\\n

        The grammar then forces the assistant turn to be valid JSON.
        Today's date is injected so the model can resolve relative dates.
        """
        today = date.today().isoformat()
        system = _SYSTEM_PROMPT.format(today=today)
        user = _USER_PROMPT_TEMPLATE.format(transcript=transcript.strip())
        return (
            f"<|im_start|>system\n{system}<|im_end|>\n"
            f"<|im_start|>user\n{user}<|im_end|>\n"
            f"<|im_start|>assistant\n"
        )

    def _parse_response(self, raw: str, original_transcript: str) -> "SchedulingIntent":
        """
        Parse and validate Qwen's JSON output.

        The grammar guarantees structural validity; Pydantic validates
        values. ``original_transcript`` is accepted for interface
        compatibility and is currently unused.
        """
        # Strip any accidental markdown fences. removeprefix/removesuffix
        # remove the exact marker; str.lstrip would strip a *set* of chars.
        cleaned = raw.strip()
        cleaned = cleaned.removeprefix("```json").removeprefix("```")
        cleaned = cleaned.removesuffix("```").strip()
        try:
            data = json.loads(cleaned)
            intent = SchedulingIntent(**data).compute_missing()
        except Exception as exc:
            logger.warning(
                "JSON parse/validation failed: %s — raw was: %s", exc, cleaned[:200]
            )
            return self._fallback(str(exc))

        logger.info(
            "Parsed intent=%s name=%s date=%s time=%s confidence=%.2f missing=%s",
            intent.intent,
            intent.caller_name,
            intent.preferred_date,
            intent.preferred_time,
            intent.confidence,
            intent.missing_fields,
        )
        return intent

    @staticmethod
    def _fallback(reason: str) -> "SchedulingIntent":
        """Return an "unclear" intent with zero confidence, logging why."""
        logger.warning("Returning fallback intent — reason: %s", reason)
        return SchedulingIntent(
            intent="unclear",
            confidence=0.0,
        ).compute_missing()

    def _heuristic_parse(self, transcript: str) -> "SchedulingIntent":
        """Model-free extraction built from keyword and regex heuristics."""
        text = transcript.strip()
        if not text:
            return self._fallback("Empty transcript")

        lower = text.lower()
        intent = self._infer_intent(lower)
        caller_name = self._extract_name(text)
        preferred_date = self._extract_date(lower)
        preferred_time = self._extract_time(lower)
        duration_minutes = self._extract_duration(lower)
        meeting_type = self._extract_meeting_type(lower)

        # Keep the raw text as notes only when something was extracted.
        extracted = (caller_name, preferred_date, preferred_time, duration_minutes, meeting_type)
        notes = text if any(extracted) else None

        confidence = 0.65 if intent != "unclear" else 0.20
        if preferred_date or preferred_time:
            confidence = max(confidence, 0.45)

        return SchedulingIntent(
            intent=intent,
            caller_name=caller_name,
            preferred_date=preferred_date,
            preferred_time=preferred_time,
            duration_minutes=duration_minutes,
            participants=[],
            meeting_type=meeting_type,
            notes=notes,
            confidence=confidence,
        ).compute_missing()

    def _infer_intent(self, lower: str) -> str:
        """Return the first intent whose keyword occurs in ``lower``."""
        for intent, keywords in self._INTENT_KEYWORDS:
            if any(keyword in lower for keyword in keywords):
                return intent
        return "unclear"

    def _extract_name(self, text: str) -> Optional[str]:
        """Return up to three words following a self-introduction phrase."""
        match = re.search(
            r"\b(?:my name is|this is|i am|i'm|im|it's|its)\s+([A-Za-z]+(?:\s+[A-Za-z]+){0,2})",
            text,
            flags=re.I,
        )
        if match:
            return match.group(1).strip().title()
        return None

    def _extract_date(self, lower: str) -> Optional[str]:
        """
        Find a date in lower-cased text and return it as YYYY-MM-DD.

        Tried in order: ISO date, numeric M/D[/Y] (month first),
        "tomorrow", "today", then weekday names (optionally "next <day>").
        """
        today = date.today()

        iso = re.search(r"\b(\d{4}-\d{2}-\d{2})\b", lower)
        if iso:
            try:
                return date.fromisoformat(iso.group(1)).isoformat()
            except ValueError:
                pass  # right shape but not a real date; try other patterns

        numeric = re.search(r"\b(\d{1,2})[/-](\d{1,2})(?:[/-](\d{2,4}))?\b", lower)
        if numeric:
            month = int(numeric.group(1))
            day = int(numeric.group(2))
            year = int(numeric.group(3)) if numeric.group(3) else today.year
            if year < 100:
                year += 2000  # two-digit years are read as 20xx
            try:
                # isoformat() always zero-pads the year to four digits.
                return date(year, month, day).isoformat()
            except ValueError:
                pass

        if "tomorrow" in lower:
            return (today + timedelta(days=1)).isoformat()
        if "today" in lower:
            return today.isoformat()

        for name, idx in self._WEEKDAYS:
            if f"next {name}" in lower:
                return self._next_weekday(today, idx, next_week=True)
            if name in lower:
                return self._next_weekday(today, idx, next_week=False)
        return None

    def _next_weekday(self, today: date, weekday: int, next_week: bool = False) -> str:
        """
        Return the next occurrence of ``weekday`` (Monday == 0) on or after
        ``today`` as YYYY-MM-DD. With ``next_week`` a further 7 days are added.
        """
        days_ahead = (weekday - today.weekday()) % 7
        if next_week:
            days_ahead += 7
        return (today + timedelta(days=days_ahead)).isoformat()

    def _extract_time(self, lower: str) -> Optional[str]:
        """
        Find a clock time in lower-cased text and return it as HH:MM (24h).

        "noon"/"midnight" are matched as whole words so "afternoon" does not
        count. A 12-hour time with am/pm is tried before a bare HH:MM so
        that "3:30 pm" yields 15:30 rather than 03:30.
        """
        if re.search(r"\bnoon\b", lower):
            return "12:00"
        if re.search(r"\bmidnight\b", lower):
            return "00:00"

        twelve = re.search(
            r"\b(1[0-2]|0?[1-9])(?::([0-5]\d))?\s*([ap])\.?m\b", lower
        )
        if twelve:
            hour = int(twelve.group(1))
            minute = int(twelve.group(2)) if twelve.group(2) else 0
            meridiem = twelve.group(3)
            if meridiem == "p" and hour != 12:
                hour += 12
            elif meridiem == "a" and hour == 12:
                hour = 0
            return f"{hour:02d}:{minute:02d}"

        twenty_four = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", lower)
        if twenty_four:
            hour = int(twenty_four.group(1))
            minute = int(twenty_four.group(2))
            return f"{hour:02d}:{minute:02d}"
        return None

    def _extract_duration(self, lower: str) -> Optional[int]:
        """
        Return the meeting length in minutes, or None if none is mentioned.

        Hours (decimals allowed, e.g. "1.5 hours") and minutes are added
        together, so "1 hour 30 minutes" is 90. "quick meeting" and
        "short meeting" default to 30.
        """
        # The lookbehind stops "1.5 hours" from being read as "5 hours".
        hours = re.search(r"(?<![\d.])(\d+(?:\.\d+)?)\s*(?:hours|hour|hrs|hr)\b", lower)
        minutes = re.search(r"(?<![\d.])(\d+)\s*(?:minutes|minute|mins|min)\b", lower)
        if hours or minutes:
            total = 0
            if hours:
                total += int(round(float(hours.group(1)) * 60))
            if minutes:
                total += int(minutes.group(1))
            return total
        if "quick meeting" in lower or "short meeting" in lower:
            return 30
        return None

    def _extract_meeting_type(self, lower: str) -> Optional[str]:
        """
        Classify the meeting as "video", "in_person" or "phone".

        In-person is checked before phone, and "call" must be a whole word,
        so "I'm calling to book an in person meeting" is not read as phone.
        """
        if "video" in lower:
            return "video"
        if "in person" in lower or "in-person" in lower or "in_person" in lower:
            return "in_person"
        if "phone" in lower or re.search(r"\bcall\b", lower):
            return "phone"
        return None
| # ── Module singleton ────────────────────────────────────────────────────────── | |
_parser: Optional[IntentParser] = None
# Guards creation of the module-level parser so concurrent first callers
# cannot each build (and later each load a model into) their own instance.
_parser_lock = threading.Lock()


def get_intent_parser() -> IntentParser:
    """
    Return the process-wide IntentParser, creating it on first use.

    Creation is cheap (the model itself loads lazily on the first parse),
    but it is still done under a lock so every caller gets the same object.
    """
    global _parser
    if _parser is None:
        with _parser_lock:
            # Re-check inside the lock: another thread may have created the
            # instance while this one was waiting.
            if _parser is None:
                _parser = IntentParser()
    return _parser
| # ── Offline smoke test ──────────────────────────────────────────────────────── | |
def _smoke_test_offline():
    """Tests schema, validators, and prompt building without loading the model."""
    logging.basicConfig(level=logging.INFO)
    logger.info("Running IntentParser offline smoke test…")

    # 1. Valid full intent
    intent = SchedulingIntent(
        intent="book_meeting",
        caller_name="Priya Sharma",
        preferred_date="2026-06-10",
        preferred_time="14:00",
        duration_minutes=30,
        participants=["Priya Sharma"],
        meeting_type="video",
        notes=None,
        confidence=0.95,
    ).compute_missing()
    assert intent.missing_fields == [], f"Expected no missing fields, got {intent.missing_fields}"
    logger.info(" ✓ Full booking intent — no missing fields")

    # 2. Partial intent — date and time missing
    partial = SchedulingIntent(
        intent="book_meeting",
        caller_name="Raj",
        confidence=0.4,
    ).compute_missing()
    assert "preferred_date" in partial.missing_fields
    assert "preferred_time" in partial.missing_fields
    logger.info(f" ✓ Partial intent missing fields: {partial.missing_fields}")

    # 3. Invalid intent string → coerced to "unclear"
    coerced = SchedulingIntent(intent="nonsense", confidence=0.1)
    assert coerced.intent == "unclear"
    logger.info(" ✓ Invalid intent string coerced to 'unclear'")

    # 4. Malformed date → None
    bad_date = SchedulingIntent(intent="book_meeting", preferred_date="June 10th")
    assert bad_date.preferred_date is None
    logger.info(" ✓ Malformed date rejected → None")

    # 5. Confidence clamping
    clamped = SchedulingIntent(intent="unclear", confidence=999.0)
    assert clamped.confidence == 1.0
    logger.info(" ✓ Confidence clamped to 1.0")

    # 6. Prompt build: all three ChatML turns present, transcript embedded,
    #    and the prompt ends on an open assistant turn.
    parser = IntentParser()
    transcript = "Hi I want to book a meeting tomorrow at 3pm"
    prompt = parser._build_prompt(transcript)
    assert "<|im_start|>system" in prompt
    assert "<|im_start|>user" in prompt
    assert transcript in prompt
    assert prompt.endswith("<|im_start|>assistant\n")
    logger.info(" ✓ Prompt structure correct")

    # 7. Singleton
    p1 = get_intent_parser()
    p2 = get_intent_parser()
    assert p1 is p2
    logger.info(" ✓ module singleton")

    logger.info("\nOffline smoke test PASSED ✓")


if __name__ == "__main__":
    _smoke_test_offline()