"""Gemini-backed analysis of transcribed Egyptian Arabic real-estate calls.

The module offers:

* ``clean_transcript`` - a cheap, deterministic pre-processing pass.
* ``identify_agent_speaker`` - a regex scan for the brand greeting line.
* ``CallAnalyzer`` - a thin wrapper around the Gemini client that returns
  either a full ``AnalysisOutput`` or a spelling-corrected transcript.
"""

import json
import os
import re
from typing import List, Optional

import httpx
from google import genai
from google.genai import errors as genai_errors
from pydantic import BaseModel, Field
from tenacity import (
    retry,
    retry_if_exception,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


# ---------------------------------------------------------------------------
# Retry policy
# ---------------------------------------------------------------------------

# Status codes that a ClientError may carry and that are still worth retrying:
# 429 = rate limited, 503 = UNAVAILABLE (sometimes surfaced as ClientError).
_RETRYABLE_CLIENT_CODES = frozenset({429, 503})


class _UnparsedResponseError(ValueError):
    """Gemini replied, but ``response.parsed`` was empty.

    Subclasses ``ValueError`` so callers that catch ``ValueError`` keep
    working, while the retry predicate can still single this case out.
    """


def _is_retryable(exc: BaseException) -> bool:
    """Return True for transient Gemini / network errors that are worth retrying."""
    if isinstance(
        exc,
        (httpx.HTTPError, genai_errors.ServerError, _UnparsedResponseError),
    ):
        return True
    if isinstance(exc, genai_errors.ClientError):
        if getattr(exc, "code", None) in _RETRYABLE_CLIENT_CODES:
            return True
        # Fallback for a 503 that is only visible in the error text.
        return "503" in str(exc)
    return False


# Shared by every Gemini call: up to 5 attempts, exponential wait clamped to
# 5-60 s, and only for errors that `_is_retryable` accepts. The last error is
# re-raised unchanged.
_gemini_retry = retry(
    wait=wait_exponential(multiplier=2, min=5, max=60),
    stop=stop_after_attempt(5),
    retry=retry_if_exception(_is_retryable),
    reraise=True,
)


# ---------------------------------------------------------------------------
# Pydantic output schema
# ---------------------------------------------------------------------------

class AnalysisOutput(BaseModel):
    """Structured result requested from Gemini for one call transcript.

    The field descriptions are sent to the model as part of the response
    schema, so they double as per-field instructions.
    """

    cleaned_transcript: str = Field(
        description=(
            "The phonetically corrected Arabic transcript. "
            "SPEAKER_01 = Agent (first to say the brand greeting), "
            "SPEAKER_00 = Customer. "
            "Never add words not present in the raw audio."
        )
    )
    agent_name: Optional[str] = Field(
        description="Agent's name extracted from the brand greeting line only.",
        default=None,
    )
    customer_name: Optional[str] = Field(
        description="Customer's name as spoken. Do not guess.", default=None
    )
    unit_number: List[str] = Field(
        description="Unit numbers parsed from the audio exactly as spoken.",
        default_factory=list,
    )
    project_name: Optional[str] = Field(
        description="Project name from the approved list only.", default=None
    )
    department_mentioned: Optional[str] = Field(
        description="Department explicitly named in the call.", default=None
    )
    call_type: str = Field(description="'Inbound' or 'Outbound'")
    customer_satisfaction: int = Field(description="1–5 integer. Infer from tone only.")
    is_urgent: bool = Field(
        description="True if satisfaction ≤ 2 or customer expresses critical frustration."
    )
    pain_points: List[str] = Field(
        description="Specific issues mentioned verbatim.", default_factory=list
    )
    action_items_promised: List[str] = Field(
        description="Commitments made by the agent.", default_factory=list
    )
    next_steps: List[str] = Field(
        description="Follow-up actions that should happen.", default_factory=list
    )


# ---------------------------------------------------------------------------
# Phonetic literal-protection pre-processor
# ---------------------------------------------------------------------------

# Terms that must survive LLM post-processing exactly as written.
# Maps what Whisper produces → what the LLM must keep (same value = preserve).
_LITERAL_TERMS: dict[str, str] = {
    "هاوس كيبنج": "هاوس كيبنج",  # Housekeeping — NEVER → مقايسة
    "شاليه": "شاليه",  # Chalet — NEVER → شقة
    "جبسون بورد": "جبسون بورد",  # Gypsum board — preserve spelling
    "إل بوسكو": "IL BOSCO",
    "ايل بوسكو": "IL BOSCO",
}

# Greeting patterns that identify the AGENT speaker line
_BRAND_GREETING_PATTERNS: list[str] = [
    r"مصر\s+إيطاليا",
    r"موسى\s+كوست",
    r"IL\s+BOSCO",
    r"ايل\s+بوسكو",
    r"إل\s+بوسكو",
    r"La\s+Nuova",
    r"KAI\s+Sokhna",
    r"Mousa\s+Coast",
    r"مع\s+حضرتك",  # "معك حضرتك" / "مع حضرتك" — agent self-intro
]

# Latin comma / semicolon → their Arabic counterparts, applied in one pass.
_PUNCTUATION_MAP = str.maketrans({",": "،", ";": "؛"})

# One compiled, case-insensitive matcher per literal term.
_LITERAL_SUBSTITUTIONS: list[tuple[re.Pattern, str]] = [
    (re.compile(re.escape(wrong), re.IGNORECASE), right)
    for wrong, right in _LITERAL_TERMS.items()
]

# A line is a greeting line if any brand pattern occurs anywhere in it.
_GREETING_RE = re.compile(
    "|".join(f"(?:{pattern})" for pattern in _BRAND_GREETING_PATTERNS),
    re.IGNORECASE,
)

# Speaker label at the start of a line (leading whitespace tolerated).
_SPEAKER_LABEL_RE = re.compile(r"\s*(SPEAKER_\d+)")

# Start timestamp of a diarised line: "SPEAKER_XX [00.0 - 05.2]: ..."
_SPEAKER_START_RE = re.compile(r"\s*SPEAKER_\d+\s*\[\s*(\d+(?:\.\d+)?)")


def clean_transcript(raw: str) -> str:
    """Lightweight pre-processing pass BEFORE the LLM sees the transcript.

    1. Normalises punctuation: every Latin ``,`` / ``;`` becomes the Arabic
       ``،`` / ``؛`` so the text uses one consistent set.
    2. Rewrites the terms in ``_LITERAL_TERMS`` to their protected spelling.
    3. Does NOT re-label speakers — that is the LLM's job.

    Args:
        raw: Transcript text as produced by the speech-to-text step.

    Returns:
        The normalised transcript.
    """
    text = raw.translate(_PUNCTUATION_MAP)

    for pattern, right in _LITERAL_SUBSTITUTIONS:
        # A callable replacement inserts `right` literally; a plain string
        # would be treated as a template and mis-handle any backslash.
        text = pattern.sub(lambda _match, literal=right: literal, text)

    return text


def identify_agent_speaker(
    raw_transcript: str, max_lines: int = 20, max_seconds: float = 60.0
) -> Optional[str]:
    """Scan the opening of a diarised transcript for a brand greeting.

    Two passes:
      1. First `max_lines` lines (catches normal calls quickly).
      2. Remaining lines whose start timestamp is <= `max_seconds` (catches
         calls with long silence / hold music before the agent picks up).

    Args:
        raw_transcript: Diarised transcript, one ``SPEAKER_XX ...`` turn per line.
        max_lines: Number of leading lines examined in the first pass.
        max_seconds: Latest start timestamp considered in the second pass.

    Returns:
        The SPEAKER_XX label of the first greeting line found, or None.
    """
    lines = raw_transcript.strip().splitlines()

    def _search(candidate_lines: list[str]) -> Optional[str]:
        for line in candidate_lines:
            if not _GREETING_RE.search(line):
                continue
            label = _SPEAKER_LABEL_RE.match(line)
            if label:
                return label.group(1)
        return None

    # Pass 1 — first N lines
    found = _search(lines[:max_lines])
    if found:
        return found

    # Pass 2 — time-based. Lines covered by pass 1 are known misses, so only
    # the remainder needs scanning.
    timed_candidates = []
    for line in lines[max_lines:]:
        stamp = _SPEAKER_START_RE.match(line)
        if stamp and float(stamp.group(1)) <= max_seconds:
            timed_candidates.append(line)
    return _search(timed_candidates)


# ---------------------------------------------------------------------------
# Main analyser
# ---------------------------------------------------------------------------

_SYSTEM_INSTRUCTION = """\
You are an expert Real Estate Call Analyst for "Misr Italia Properties".
You receive a raw, automatically-transcribed Egyptian Arabic phone call
(single stream — no speaker labels) and must:
  (a) separate the speakers and produce a labelled, phonetically-corrected transcript, and
  (b) extract structured business data.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
STATELESS MODE — TREAT EVERY CALL INDEPENDENTLY
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
• Do NOT carry over context, vocabulary biases, or assumptions from any previous call.
• The domain of each call (Maintenance, Housekeeping, Sales, …) is determined solely by
  what is said in THIS transcript.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
SPEAKER IDENTIFICATION — LINGUISTIC DIARIZATION
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
The input is a single-stream transcript. Your primary task is to SEPARATE this text
into a dialogue between AGENT (SPEAKER_01) and CUSTOMER (SPEAKER_00).

1. IDENTIFY THE AGENT:
   - The party who gives the brand greeting (e.g., "Misr Italia properties", "مع حضرتك من مصر إيطاليا") is the AGENT.
   - The party who explains project availability, offers appointments, or asks for the customer's budget is the AGENT.
   - Map this speaker to SPEAKER_01.

2. IDENTIFY THE CUSTOMER:
   - The party who asks about prices, location, or expresses a problem/enquiry is the CUSTOMER.
   - Map this speaker to SPEAKER_00.

3. CONSTRUCT THE DIALOGUE:
   - Partition the raw text into logical turns based on shifts in tone and intent.
   - Label every turn in the `cleaned_transcript` field:
       SPEAKER_01: [Agent's Arabic text]
       SPEAKER_00: [Customer's Arabic text]
   - Ensure the final output is a coherent, chronological dialogue without timestamps.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
APPROVED PROJECTS (use exact spelling)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
IL BOSCO · IL BOSCO City · La Nuova Vista · KAI Sokhna · Vinci · Solare ·
Mousa Coast · Street 31 Mall · Cairo Business Park · Garden 8 · Italian SQ ·
ElGoom Italian Hotel · HQ

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
PHONETIC FIDELITY — CRITICAL RULES
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
You MUST prioritise literal transcription over semantic guessing.

1. PRESERVE these terms exactly if they appear — do NOT remap:
   • "هاوس كيبنج" → keep as "هاوس كيبنج" (Housekeeping dept). NEVER → "مقايسة".
   • "شاليه" → keep as "شاليه" (Chalet). NEVER → "شقة".
   • "جبسون بورد" → keep as "جبسون بورد".
   • "IL BOSCO" → keep exact Latin spelling.

2. PHONETIC CORRECTIONS you ARE allowed to make:
   • "جيمزن بورد" / "جبسن بورد" → "جبسون بورد"
   • "التريشن" / "الترشن" → "Alteration" (NOT "Operations")
   • "عيسى" / "مايسة" / "مويسة" in context of utilities/electricity → "مقايسة"
   • "معاينه" / "معاينا" → "معاينة"
   • Agent name phonetic typos in the greeting line only.

3. NO HALLUCINATIONS:
   • Do NOT add greetings, filler phrases, or any word absent from the raw transcript.
   • If a field value cannot be determined, return null/empty.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
EGYPTIAN NUMBER PARSING
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Unit numbers are spoken in digit-pairs:
  "تلاتين اتنين واربعين" → "3042" (NOT "30242")
  "تلاتة واربعين" → "343" (NOT "30243")
Parse only numbers explicitly spoken as unit identifiers.
"""

_CORRECTION_ONLY_INSTRUCTION = """\
You are a phonetic spell-checker for Egyptian Arabic speech-to-text output.

Your ONLY job is to fix errors introduced by the ASR system:
  • Correct phonetic misspellings (e.g. "جيمزن بورد" → "جبسون بورد").
  • Fix obvious ASR word-boundary errors.
  • Normalise Arabic punctuation (، ؛ …).
  • Apply the literal-term protections below — never remap these:
      "هاوس كيبنج" stays "هاوس كيبنج" (NEVER → "مقايسة")
      "شاليه" stays "شاليه" (NEVER → "شقة")
      "جبسون بورد" stays "جبسون بورد"
      "IL BOSCO" stays "IL BOSCO" (exact Latin spelling)

Rules you MUST follow:
  1. Do NOT add speaker labels (SPEAKER_XX, أ:, ب:, or any prefix).
  2. Do NOT restructure the text into dialogue format.
  3. Do NOT add, remove, or paraphrase any words — only fix spelling.
  4. Return a single continuous corrected Arabic text string.
  5. If a passage is already correct, return it unchanged.
"""


class _CorrectionResult(BaseModel):
    """Response schema for the correction-only pass."""

    corrected_transcript: str


class CallAnalyzer:
    """Sends call transcripts to Gemini and returns schema-validated results."""

    def __init__(self, api_key: Optional[str] = None):
        """Create the Gemini client.

        Args:
            api_key: Gemini API key. When omitted, the ``GEMINI_API_KEY``
                environment variable is used.

        Raises:
            ValueError: If no key is passed and the variable is unset or empty.
        """
        self.api_key = api_key or os.environ.get("GEMINI_API_KEY")
        if not self.api_key:
            raise ValueError("GEMINI_API_KEY environment variable is required.")

        self.client = genai.Client(api_key=self.api_key)
        self.system_instruction = _SYSTEM_INSTRUCTION
        # Model is overridable through GEMINI_MODEL; otherwise the default below.
        self.model_name = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
        print(f"INFO: CallAnalyzer initialized with Gemini model: {self.model_name}")

    def _generate(self, prompt: str, schema: type[BaseModel]):
        """Send one user-role prompt and return the raw SDK response.

        The request asks for JSON output constrained to `schema`, at
        temperature 0.1.
        """
        return self.client.models.generate_content(
            model=self.model_name,
            contents=[{"role": "user", "parts": [{"text": prompt}]}],
            config={
                "response_mime_type": "application/json",
                "response_schema": schema,
                "temperature": 0.1,
            },
        )

    @_gemini_retry
    def analyze(self, transcript: str) -> AnalysisOutput:
        """Run the full analysis (labelled transcript + extracted fields).

        Args:
            transcript: Raw transcript text; it is passed through
                ``clean_transcript`` before being sent.

        Returns:
            The parsed ``AnalysisOutput``.

        Raises:
            ValueError: If the response carries no parsed output (for example
                when parsing failed or a safety filter was hit). Raised after
                the retry budget is exhausted.
        """
        cleaned_input = clean_transcript(transcript)
        response = self._generate(
            f"SYSTEM INSTRUCTION: {self.system_instruction}\n\nTRANSCRIPT TO ANALYZE:\n{cleaned_input}",
            AnalysisOutput,
        )

        if not response.parsed:
            raise _UnparsedResponseError(
                f"Gemini failed to return parsed output. Response: {response.text}"
            )

        return response.parsed

    @_gemini_retry
    def correct_only(self, transcript: str) -> str:
        """Lightweight Gemini pass: phonetic/spelling correction only.

        No speaker diarisation, no entity extraction, no restructuring.

        Args:
            transcript: Raw transcript text; it is passed through
                ``clean_transcript`` before being sent.

        Returns:
            A single corrected Arabic string.

        Raises:
            ValueError: If the response carries no parsed output. Raised after
                the retry budget is exhausted.
        """
        cleaned_input = clean_transcript(transcript)
        response = self._generate(
            f"INSTRUCTION:\n{_CORRECTION_ONLY_INSTRUCTION}\n\n"
            f"TRANSCRIPT TO CORRECT:\n{cleaned_input}",
            _CorrectionResult,
        )

        if not response.parsed:
            raise _UnparsedResponseError(
                f"Gemini failed to return a corrected transcript. Response: {response.text}"
            )

        return response.parsed.corrected_transcript


def main() -> None:
    """Quick smoke test: analyse one hard-coded two-turn transcript."""
    import dotenv

    dotenv.load_dotenv()

    analyzer = CallAnalyzer()
    test_transcript = "SPEAKER_01: أهلاً بك في مصر إيطاليا، معك أحمد المحمدي. SPEAKER_00: أهلاً بك، كنت أريد الاستفسار عن مشروع إل بوسكو."

    try:
        result = analyzer.analyze(test_transcript)
        print("Analysis Result:")
        print(result.model_dump_json(indent=2))
    except Exception as e:  # top-level boundary of the smoke test: report and exit
        print(f"Test failed: {e}")


if __name__ == "__main__":
    main()