File size: 9,396 Bytes
db4323e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
de17b7d
db4323e
de17b7d
 
db4323e
de17b7d
db4323e
 
de17b7d
db4323e
 
de17b7d
db4323e
 
 
 
 
 
 
 
de17b7d
db4323e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
de17b7d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
Voice Analysis API for Salesforce
==================================
Endpoints:
  /analyze - Full analysis (diarization + overlap + voice metrics)
  
Returns JSON that Salesforce can parse.

Models used:
  - pyannote/speaker-diarization-3.1 (who spoke when)
  - pyannote/overlapped-speech-detection (coaching detection)
"""

import gradio as gr
import os
import json
import torch
from pyannote.audio import Pipeline
import numpy as np

# ============================================================
# CONFIGURATION
# ============================================================

HF_TOKEN = os.environ.get("HF_TOKEN")

if not HF_TOKEN:
    print("WARNING: HF_TOKEN not set. Gated models will fail.")

# ============================================================
# LOAD MODELS (runs once at startup)
# ============================================================

print("Loading diarization model...")
try:
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=HF_TOKEN
    )
    print("βœ… Diarization model loaded")
except Exception as e:
    print(f"❌ Diarization model failed: {e}")
    diarization_pipeline = None

print("Loading overlap detection model...")
try:
    overlap_pipeline = Pipeline.from_pretrained(
        "pyannote/overlapped-speech-detection",
        use_auth_token=HF_TOKEN
    )
    print("βœ… Overlap detection model loaded")
except Exception as e:
    print(f"❌ Overlap detection failed: {e}")
    overlap_pipeline = None


# ============================================================
# ANALYSIS FUNCTIONS
# ============================================================

def analyze_diarization(audio_path):
    """
    Identifies different speakers and their timestamps.
    Returns list of segments with speaker labels.
    """
    if diarization_pipeline is None:
        return {"error": "Diarization model not loaded"}
    
    try:
        diarization = diarization_pipeline(audio_path)
        
        segments = []
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            segments.append({
                "speaker": speaker,
                "start": round(turn.start, 2),
                "end": round(turn.end, 2),
                "duration": round(turn.end - turn.start, 2)
            })
        
        # Identify borrower (assumes agent speaks first)
        speakers = list(set([s["speaker"] for s in segments]))
        agent_speaker = segments[0]["speaker"] if segments else None
        borrower_speaker = None
        for s in speakers:
            if s != agent_speaker:
                borrower_speaker = s
                break
        
        return {
            "segments": segments,
            "speaker_count": len(speakers),
            "agent_speaker": agent_speaker,
            "borrower_speaker": borrower_speaker,
            "total_segments": len(segments)
        }
    
    except Exception as e:
        return {"error": str(e)}


def analyze_overlap(audio_path):
    """
    Detects overlapping speech (multiple people talking at once).
    Used for coaching detection.
    """
    if overlap_pipeline is None:
        return {"error": "Overlap detection model not loaded"}
    
    try:
        overlap = overlap_pipeline(audio_path)
        
        overlap_segments = []
        for segment, _, label in overlap.itertracks(yield_label=True):
            overlap_segments.append({
                "start": round(segment.start, 2),
                "end": round(segment.end, 2),
                "duration": round(segment.end - segment.start, 2)
            })
        
        total_overlap_duration = sum([s["duration"] for s in overlap_segments])
        
        return {
            "overlap_segments": overlap_segments,
            "overlap_count": len(overlap_segments),
            "total_overlap_duration": round(total_overlap_duration, 2)
        }
    
    except Exception as e:
        return {"error": str(e)}


def detect_coaching(diarization_result, overlap_result):
    """
    Cross-references overlap with borrower segments.
    Overlap during borrower's speech = potential coaching.
    """
    coaching_flags = []
    
    if "error" in diarization_result or "error" in overlap_result:
        return {
            "coaching_detected": False,
            "error": "Could not analyze - model error"
        }
    
    borrower_speaker = diarization_result.get("borrower_speaker")
    
    if not borrower_speaker:
        return {
            "coaching_detected": False,
            "reason": "Could not identify borrower"
        }
    
    # Get borrower segments
    borrower_segments = [
        s for s in diarization_result["segments"] 
        if s["speaker"] == borrower_speaker
    ]
    
    # Get overlap segments
    overlap_segments = overlap_result.get("overlap_segments", [])
    
    # Check if any overlap falls within borrower's speaking time
    for overlap in overlap_segments:
        for borrower_seg in borrower_segments:
            # Check if overlap is during borrower's speech
            if (overlap["start"] >= borrower_seg["start"] and 
                overlap["start"] <= borrower_seg["end"]):
                coaching_flags.append({
                    "overlap_time": f"{overlap['start']}-{overlap['end']}",
                    "during_borrower_segment": f"{borrower_seg['start']}-{borrower_seg['end']}",
                    "duration": overlap["duration"]
                })
    
    return {
        "coaching_detected": len(coaching_flags) > 0,
        "coaching_instances": len(coaching_flags),
        "coaching_flags": coaching_flags,
        "borrower_segments_analyzed": len(borrower_segments)
    }


def analyze_voice_metrics(audio_path):
    """
    Basic voice analysis - pause detection, speaking rate.
    For hesitation indicators.
    """
    try:
        import librosa
        
        # Load audio
        y, sr = librosa.load(audio_path, sr=16000)
        
        duration = len(y) / sr
        
        # Simple energy-based silence detection
        energy = np.abs(y)
        threshold = np.mean(energy) * 0.1
        silence_samples = np.sum(energy < threshold)
        silence_ratio = silence_samples / len(y)
        
        return {
            "duration_seconds": round(duration, 2),
            "silence_ratio": round(silence_ratio, 3),
            "has_long_pauses": silence_ratio > 0.3
        }
    
    except Exception as e:
        return {"error": str(e), "duration_seconds": 0, "silence_ratio": 0, "has_long_pauses": False}


# ============================================================
# MAIN ANALYSIS FUNCTION
# ============================================================

def full_analysis(audio_file):
    """
    Complete audio analysis - called by Gradio/API.
    Returns JSON with all results.
    """
    if audio_file is None:
        return json.dumps({"error": "No audio file provided"}, indent=2)
    
    results = {
        "status": "success",
        "analysis": {}
    }
    
    try:
        # Run all analyses
        print(f"Analyzing: {audio_file}")
        
        # 1. Diarization
        print("Running diarization...")
        diarization_result = analyze_diarization(audio_file)
        results["analysis"]["diarization"] = diarization_result
        
        # 2. Overlap detection
        print("Running overlap detection...")
        overlap_result = analyze_overlap(audio_file)
        results["analysis"]["overlap"] = overlap_result
        
        # 3. Coaching detection (cross-reference)
        print("Analyzing coaching...")
        coaching_result = detect_coaching(diarization_result, overlap_result)
        results["analysis"]["coaching"] = coaching_result
        
        # 4. Voice metrics
        print("Analyzing voice metrics...")
        voice_result = analyze_voice_metrics(audio_file)
        results["analysis"]["voice_metrics"] = voice_result
        
        # 5. Summary
        results["summary"] = {
            "speaker_count": diarization_result.get("speaker_count", 0),
            "coaching_detected": coaching_result.get("coaching_detected", False),
            "coaching_instances": coaching_result.get("coaching_instances", 0),
            "has_long_pauses": voice_result.get("has_long_pauses", False),
            "total_overlap_duration": overlap_result.get("total_overlap_duration", 0)
        }
        
        print("Analysis complete!")
        
    except Exception as e:
        results["status"] = "error"
        results["error"] = str(e)
    
    return json.dumps(results, indent=2)


# ============================================================
# GRADIO INTERFACE
# ============================================================

demo = gr.Interface(
    fn=full_analysis,
    inputs=gr.Audio(type="filepath", label="Upload Audio (MP3, WAV, M4A)"),
    outputs=gr.JSON(label="Analysis Results"),
    title="πŸŽ™οΈ Voice Analysis API for Salesforce",
    description="""
    Upload a call recording to analyze:
    - **Speaker Diarization**: Who spoke when
    - **Coaching Detection**: Overlapping speech during borrower's responses  
    - **Voice Metrics**: Pause detection, silence ratio
    
    Returns JSON that Salesforce can parse via Apex callout.
    """,
    examples=[],
    allow_flagging="never"
)

# Launch with API enabled
demo.launch()