File size: 14,859 Bytes
0db822c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
import os
import re
from typing import List, Optional
import json

from pydantic import BaseModel, Field
from google import genai
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from google.genai import errors as genai_errors
import httpx


def _is_retryable(exc: BaseException) -> bool:
    """Return True for transient Gemini / network errors that are worth retrying."""
    if isinstance(exc, (httpx.HTTPError, genai_errors.ServerError)):
        return True
    # google-genai sometimes surfaces a 503 UNAVAILABLE as ClientError
    if isinstance(exc, genai_errors.ClientError) and "503" in str(exc):
        return True
    return False


# ---------------------------------------------------------------------------
# Pydantic output schema
# ---------------------------------------------------------------------------

class AnalysisOutput(BaseModel):
    cleaned_transcript: str = Field(
        description=(
            "The phonetically corrected Arabic transcript. "
            "SPEAKER_01 = Agent (first to say the brand greeting), "
            "SPEAKER_00 = Customer. "
            "Never add words not present in the raw audio."
        )
    )
    agent_name: Optional[str] = Field(
        description="Agent's name extracted from the brand greeting line only.",
        default=None,
    )
    customer_name: Optional[str] = Field(
        description="Customer's name as spoken. Do not guess.", default=None
    )
    unit_number: List[str] = Field(
        description="Unit numbers parsed from the audio exactly as spoken.",
        default_factory=list,
    )
    project_name: Optional[str] = Field(
        description="Project name from the approved list only.", default=None
    )
    department_mentioned: Optional[str] = Field(
        description="Department explicitly named in the call.", default=None
    )
    call_type: str = Field(description="'Inbound' or 'Outbound'")
    customer_satisfaction: int = Field(description="1–5 integer. Infer from tone only.")
    is_urgent: bool = Field(
        description="True if satisfaction ≀ 2 or customer expresses critical frustration."
    )
    pain_points: List[str] = Field(
        description="Specific issues mentioned verbatim.", default_factory=list
    )
    action_items_promised: List[str] = Field(
        description="Commitments made by the agent.", default_factory=list
    )
    next_steps: List[str] = Field(
        description="Follow-up actions that should happen.", default_factory=list
    )


# ---------------------------------------------------------------------------
# Phonetic literal-protection pre-processor
# ---------------------------------------------------------------------------

# Terms that must survive LLM post-processing exactly as written.
# Maps what Whisper produces β†’ what the LLM must keep (same value = preserve).
_LITERAL_TERMS: dict[str, str] = {
    "Ω‡Ψ§ΩˆΨ³ ΩƒΩŠΨ¨Ω†Ψ¬": "Ω‡Ψ§ΩˆΨ³ ΩƒΩŠΨ¨Ω†Ψ¬",   # Housekeeping β€” NEVER β†’ Ω…Ω‚Ψ§ΩŠΨ³Ψ©
    "Ψ΄Ψ§Ω„ΩŠΩ‡": "Ψ΄Ψ§Ω„ΩŠΩ‡",              # Chalet β€” NEVER β†’ Ψ΄Ω‚Ψ©
    "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد": "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد",    # Gypsum board β€” preserve spelling
    "Ψ₯Ω„ Ψ¨ΩˆΨ³ΩƒΩˆ": "IL BOSCO",
    "Ψ§ΩŠΩ„ Ψ¨ΩˆΨ³ΩƒΩˆ": "IL BOSCO",
}

# Greeting patterns that identify the AGENT speaker line
_BRAND_GREETING_PATTERNS: list[str] = [
    r"Ω…Ψ΅Ψ±\s+Ψ₯ΩŠΨ·Ψ§Ω„ΩŠΨ§",
    r"Ω…ΩˆΨ³Ω‰\s+ΩƒΩˆΨ³Ψͺ",
    r"IL\s+BOSCO",
    r"Ψ§ΩŠΩ„\s+Ψ¨ΩˆΨ³ΩƒΩˆ",
    r"Ψ₯Ω„\s+Ψ¨ΩˆΨ³ΩƒΩˆ",
    r"La\s+Nuova",
    r"KAI\s+Sokhna",
    r"Mousa\s+Coast",
    r"Ω…ΨΉ\s+Ψ­ΨΆΨ±ΨͺΩƒ",   # "Ω…ΨΉΩƒ Ψ­ΨΆΨ±ΨͺΩƒ" / "Ω…ΨΉ Ψ­ΨΆΨ±ΨͺΩƒ" β€” agent self-intro
]


def clean_transcript(raw: str) -> str:
    """
    Lightweight pre-processing pass BEFORE the LLM sees the transcript.

    1. Normalises Unicode punctuation so Arabic commas/semicolons are consistent.
    2. Protects literal terms from semantic re-mapping.
    3. Does NOT re-label speakers β€” that is the LLM's job.
    """
    text = raw

    # Normalise Arabic punctuation
    text = text.replace("،", "،").replace(";", "Ψ›")

    # Apply the literal-term substitutions that Whisper frequently gets wrong
    for wrong, right in _LITERAL_TERMS.items():
        text = re.sub(re.escape(wrong), right, text, flags=re.IGNORECASE)

    return text


def identify_agent_speaker(raw_transcript: str, max_lines: int = 20, max_seconds: float = 60.0) -> Optional[str]:
    """
    Scan the opening of a diarised transcript for a brand greeting.

    Two passes:
      1. First `max_lines` lines (catches normal calls quickly).
      2. All lines whose timestamp start <= max_seconds (catches calls with
         long silence / hold music before the agent picks up).

    Returns the SPEAKER_XX label of the greeting line, or None.
    """
    lines = raw_transcript.strip().splitlines()

    def _search(candidate_lines: list[str]) -> Optional[str]:
        for line in candidate_lines:
            for pattern in _BRAND_GREETING_PATTERNS:
                if re.search(pattern, line, re.IGNORECASE):
                    m = re.match(r"(SPEAKER_\d+)", line)
                    if m:
                        return m.group(1)
        return None

    # Pass 1 β€” first N lines
    result = _search(lines[:max_lines])
    if result:
        return result

    # Pass 2 β€” time-based: "SPEAKER_XX [00.0 - 05.2]: ..."
    time_candidates = []
    for line in lines:
        m = re.match(r"SPEAKER_\d+\s*\[([\d.]+)", line)
        if m and float(m.group(1)) <= max_seconds:
            time_candidates.append(line)
    return _search(time_candidates)


# ---------------------------------------------------------------------------
# Main analyser
# ---------------------------------------------------------------------------

_SYSTEM_INSTRUCTION = """\
You are an expert Real Estate Call Analyst for "Misr Italia Properties".
You receive a raw, automatically-transcribed Egyptian Arabic phone call (single
stream β€” no speaker labels) and must:
  (a) separate the speakers and produce a labelled, phonetically-corrected transcript, and
  (b) extract structured business data.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
STATELESS MODE β€” TREAT EVERY CALL INDEPENDENTLY
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
β€’ Do NOT carry over context, vocabulary biases, or assumptions from any previous call.
β€’ The domain of each call (Maintenance, Housekeeping, Sales, …) is determined solely
  by what is said in THIS transcript.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
SPEAKER IDENTIFICATION β€” LINGUISTIC DIARIZATION
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
The input is a single-stream transcript. Your primary task is to SEPARATE this text 
into a dialogue between AGENT (SPEAKER_01) and CUSTOMER (SPEAKER_00).

1. IDENTIFY THE AGENT:
   - The party who gives the brand greeting (e.g., "Misr Italia properties", "Ω…ΨΉ Ψ­ΨΆΨ±ΨͺΩƒ Ω…Ω† Ω…Ψ΅Ψ± Ψ₯ΩŠΨ·Ψ§Ω„ΩŠΨ§") is the AGENT.
   - The party who explains project availability, offers appointments, or asks for the customer's budget is the AGENT.
   - Map this speaker to SPEAKER_01.

2. IDENTIFY THE CUSTOMER:
   - The party who asks about prices, location, or expresses a problem/enquiry is the CUSTOMER.
   - Map this speaker to SPEAKER_00.

3. CONSTRUCT THE DIALOGUE:
   - Partition the raw text into logical turns based on shifts in tone and intent. 
   - Label every turn in the `cleaned_transcript` field: 
     SPEAKER_01: [Agent's Arabic text]
     SPEAKER_00: [Customer's Arabic text]
   - Ensure the final output is a coherent, chronological dialogue without timestamps.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
APPROVED PROJECTS (use exact spelling)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
IL BOSCO Β· IL BOSCO City Β· La Nuova Vista Β· KAI Sokhna Β· Vinci Β· Solare Β·
Mousa Coast Β· Street 31 Mall Β· Cairo Business Park Β· Garden 8 Β· Italian SQ Β·
ElGoom Italian Hotel Β· HQ

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
PHONETIC FIDELITY β€” CRITICAL RULES
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
You MUST prioritise literal transcription over semantic guessing.

1. PRESERVE these terms exactly if they appear β€” do NOT remap:
   β€’ "Ω‡Ψ§ΩˆΨ³ ΩƒΩŠΨ¨Ω†Ψ¬"  β†’ keep as "Ω‡Ψ§ΩˆΨ³ ΩƒΩŠΨ¨Ω†Ψ¬"  (Housekeeping dept). NEVER β†’ "Ω…Ω‚Ψ§ΩŠΨ³Ψ©".
   β€’ "Ψ΄Ψ§Ω„ΩŠΩ‡"       β†’ keep as "Ψ΄Ψ§Ω„ΩŠΩ‡"        (Chalet). NEVER β†’ "Ψ΄Ω‚Ψ©".
   β€’ "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد"  β†’ keep as "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد".
   β€’ "IL BOSCO"    β†’ keep exact Latin spelling.

2. PHONETIC CORRECTIONS you ARE allowed to make:
   β€’ "Ψ¬ΩŠΩ…Ψ²Ω† بورد" / "Ψ¬Ψ¨Ψ³Ω† بورد"    β†’ "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد"
   β€’ "Ψ§Ω„ΨͺΨ±ΩŠΨ΄Ω†" / "Ψ§Ω„ΨͺΨ±Ψ΄Ω†"           β†’ "Alteration"  (NOT "Operations")
   β€’ "ΨΉΩŠΨ³Ω‰" / "Ω…Ψ§ΩŠΨ³Ψ©" / "Ω…ΩˆΩŠΨ³Ψ©" in context of utilities/electricity β†’ "Ω…Ω‚Ψ§ΩŠΨ³Ψ©"
   β€’ "Ω…ΨΉΨ§ΩŠΩ†Ω‡" / "Ω…ΨΉΨ§ΩŠΩ†Ψ§"            β†’ "Ω…ΨΉΨ§ΩŠΩ†Ψ©"
   β€’ Agent name phonetic typos in the greeting line only.

3. NO HALLUCINATIONS:
   β€’ Do NOT add greetings, filler phrases, or any word absent from the raw transcript.
   β€’ If a field value cannot be determined, return null/empty.

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
EGYPTIAN NUMBER PARSING
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Unit numbers are spoken in digit-pairs:
  "ΨͺΩ„Ψ§ΨͺΩŠΩ† Ψ§ΨͺΩ†ΩŠΩ† ΩˆΨ§Ψ±Ψ¨ΨΉΩŠΩ†"  β†’  "3042"   (NOT "30242")
  "ΨͺΩ„Ψ§ΨͺΨ© ΩˆΨ§Ψ±Ψ¨ΨΉΩŠΩ†"         β†’  "343"    (NOT "30243")
Parse only numbers explicitly spoken as unit identifiers.
"""


_CORRECTION_ONLY_INSTRUCTION = """\
You are a phonetic spell-checker for Egyptian Arabic speech-to-text output.

Your ONLY job is to fix errors introduced by the ASR system:
  β€’ Correct phonetic misspellings (e.g. "Ψ¬ΩŠΩ…Ψ²Ω† بورد" β†’ "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد").
  β€’ Fix obvious ASR word-boundary errors.
  β€’ Normalise Arabic punctuation (، Ψ› …).
  β€’ Apply the literal-term protections below β€” never remap these:
      "Ω‡Ψ§ΩˆΨ³ ΩƒΩŠΨ¨Ω†Ψ¬" stays "Ω‡Ψ§ΩˆΨ³ ΩƒΩŠΨ¨Ω†Ψ¬"  (NEVER β†’ "Ω…Ω‚Ψ§ΩŠΨ³Ψ©")
      "Ψ΄Ψ§Ω„ΩŠΩ‡"       stays "Ψ΄Ψ§Ω„ΩŠΩ‡"        (NEVER β†’ "Ψ΄Ω‚Ψ©")
      "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد"  stays "Ψ¬Ψ¨Ψ³ΩˆΩ† بورد"
      "IL BOSCO"    stays "IL BOSCO"     (exact Latin spelling)

Rules you MUST follow:
  1. Do NOT add speaker labels (SPEAKER_XX, Ψ£:, Ψ¨:, or any prefix).
  2. Do NOT restructure the text into dialogue format.
  3. Do NOT add, remove, or paraphrase any words β€” only fix spelling.
  4. Return a single continuous corrected Arabic text string.
  5. If a passage is already correct, return it unchanged.
"""


class _CorrectionResult(BaseModel):
    corrected_transcript: str


class CallAnalyzer:
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("GEMINI_API_KEY")
        if not self.api_key:
            raise ValueError("GEMINI_API_KEY environment variable is required.")

        self.client = genai.Client(api_key=self.api_key)
        self.system_instruction = _SYSTEM_INSTRUCTION
        # Default to the user-specified stable model
        self.model_name = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
        print(f"INFO: CallAnalyzer initialized with Gemini model: {self.model_name}")

    @retry(
        wait=wait_exponential(multiplier=2, min=5, max=60),
        stop=stop_after_attempt(5),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    )
    def analyze(self, transcript: str) -> AnalysisOutput:
        # Pre-process before sending to Gemini
        cleaned_input = clean_transcript(transcript)

        response = self.client.models.generate_content(
            model=self.model_name,
            contents=[
                {"role": "user", "parts": [{"text": f"SYSTEM INSTRUCTION: {self.system_instruction}\n\nTRANSCRIPT TO ANALYZE:\n{cleaned_input}"}]}
            ],
            config={
                "response_mime_type": "application/json",
                "response_schema": AnalysisOutput,
                "temperature": 0.1,
            },
        )
        
        if not response.parsed:
            # Fallback if parsing failed or reached safety filters
            raise ValueError(f"Gemini failed to return parsed output. Response: {response.text}")

        return response.parsed

    @retry(
        wait=wait_exponential(multiplier=2, min=5, max=60),
        stop=stop_after_attempt(5),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    )
    def correct_only(self, transcript: str) -> str:
        """
        Lightweight Gemini pass: phonetic/spelling correction only.
        No speaker diarisation, no entity extraction, no restructuring.
        Returns a single corrected Arabic string.
        """
        cleaned_input = clean_transcript(transcript)

        response = self.client.models.generate_content(
            model=self.model_name,
            contents=[
                {
                    "role": "user",
                    "parts": [
                        {
                            "text": (
                                f"INSTRUCTION:\n{_CORRECTION_ONLY_INSTRUCTION}\n\n"
                                f"TRANSCRIPT TO CORRECT:\n{cleaned_input}"
                            )
                        }
                    ],
                }
            ],
            config={
                "response_mime_type": "application/json",
                "response_schema": _CorrectionResult,
                "temperature": 0.1,
            },
        )

        if not response.parsed:
            raise ValueError(
                f"Gemini failed to return a corrected transcript. Response: {response.text}"
            )

        return response.parsed.corrected_transcript

if __name__ == "__main__":
    # Quick smoke test
    import dotenv
    dotenv.load_dotenv()
    
    analyzer = CallAnalyzer()
    test_transcript = "SPEAKER_01: Ψ£Ω‡Ω„Ψ§Ω‹ Ψ¨Ωƒ في Ω…Ψ΅Ψ± Ψ₯ΩŠΨ·Ψ§Ω„ΩŠΨ§ΨŒ Ω…ΨΉΩƒ Ψ£Ψ­Ω…Ψ― Ψ§Ω„Ω…Ψ­Ω…Ψ―ΩŠ. SPEAKER_00: Ψ£Ω‡Ω„Ψ§Ω‹ Ψ¨ΩƒΨŒ ΩƒΩ†Ψͺ أريد Ψ§Ω„Ψ§Ψ³Ψͺفسار ΨΉΩ† Ω…Ψ΄Ψ±ΩˆΨΉ Ψ₯Ω„ Ψ¨ΩˆΨ³ΩƒΩˆ."
    try:
        result = analyzer.analyze(test_transcript)
        print("Analysis Result:")
        print(result.model_dump_json(indent=2))
    except Exception as e:
        print(f"Test failed: {e}")