"""
ASR Audio Analysis API Server

Enterprise-grade REST API for audio processing:
- Diarization (stereo/mono)
- Whisper Transcription
- Professional Audio Analysis
"""

import os
import json
import uuid
import threading
from collections import Counter
from pathlib import Path
from datetime import datetime

from flask import Flask, jsonify, send_from_directory, request
from flask_cors import CORS
from werkzeug.utils import secure_filename

app = Flask(__name__)
CORS(app)

# Configuration
BASE_DIR = Path(os.environ.get("APP_DIR", "/app"))
OUTPUT_FOLDER = BASE_DIR / "output"
UPLOAD_FOLDER = BASE_DIR / "uploads"
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "ramalMr/whisper-small-az")
ALLOWED_EXTENSIONS = {'wav', 'mp3', 'm4a', 'flac', 'ogg', 'opus', 'webm'}

# Job tracking: job_id -> job record dict. Every access goes through job_lock
# because worker threads and request handlers touch it concurrently.
processing_jobs = {}
job_lock = threading.Lock()

# Create folders (parents=True so a missing BASE_DIR does not abort start-up)
OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)
UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def _update_job(job_id, **fields):
    """Merge *fields* into the job record for *job_id* while holding job_lock."""
    with job_lock:
        processing_jobs[job_id].update(fields)


def _call_dir(call_name):
    """Return the output directory for *call_name*, or None for an unsafe name.

    A name is rejected when it is empty, is '.' or '..', or contains a path
    separator, so a request can never address anything outside OUTPUT_FOLDER.
    """
    if call_name in ('', '.', '..') or Path(call_name).name != call_name:
        return None
    return OUTPUT_FOLDER / call_name


def _iter_analysis_files():
    """Yield (call_dir, analysis_file) for each call directory that has an analysis."""
    for item in OUTPUT_FOLDER.iterdir():
        if not item.is_dir():
            continue
        analysis_file = item / 'audio_analysis.json'
        if analysis_file.exists():
            yield item, analysis_file


def _mean(values):
    """Return the mean of *values* rounded to one decimal, or 0 when empty."""
    if not values:
        return 0
    return round(sum(values) / len(values), 1)


def process_audio_file(job_id, audio_path, output_dir):
    """Process audio: diarization + transcription + analysis.

    Runs in a background thread. Progress is published through the
    ``processing_jobs`` record for *job_id* ('status', 'stage', 'is_stereo',
    and finally 'result' or 'error').

    Args:
        job_id: Key of the job record in ``processing_jobs``.
        audio_path: Path of the uploaded audio file.
        output_dir: Path of the directory that receives all exported files;
            its name is used as the call id.
    """
    try:
        _update_job(job_id, status='processing', stage='initializing')

        # Imported lazily so the server can start (and serve existing
        # results) without loading the processing modules up front.
        from stereo_diarizer import StereoCallDiarizer
        from whisper_transcriber import WhisperTranscriber
        from audio_analyzer import AudioAnalyzer

        # Step 1: Diarization
        _update_job(job_id, stage='diarization')
        diarizer = StereoCallDiarizer(str(audio_path), verbose=False)
        diarizer.load_audio()
        _update_job(job_id, is_stereo=diarizer.is_stereo)

        left_seg, right_seg = diarizer.detect_speech_segments()
        diarizer.create_timeline(left_seg, right_seg)
        segment_files = diarizer.export_segments(str(output_dir))
        diarizer.export_full_speakers(str(output_dir))
        diarizer.export_transcript_txt(str(output_dir))
        diarizer.export_transcript_json(str(output_dir))

        # Step 2: Transcription
        _update_job(job_id, stage='transcription')
        whisper = WhisperTranscriber(WHISPER_MODEL, device="cpu", verbose=False)
        transcribed = whisper.transcribe_segments(segment_files, diarizer.timeline)
        whisper.export_transcription(transcribed, str(output_dir))

        # Step 3: Audio Analysis
        _update_job(job_id, stage='audio_analysis')
        analyzer = AudioAnalyzer(verbose=False)
        analysis = analyzer.analyze_call(
            segment_files=segment_files,
            timeline=diarizer.timeline,
            call_id=output_dir.name,
            is_stereo=diarizer.is_stereo
        )
        analyzer.export_analysis(analysis, str(output_dir))

        # Success
        _update_job(
            job_id,
            status='completed',
            stage='done',
            result={
                'call_name': output_dir.name,
                'is_stereo': diarizer.is_stereo,
                'quality_score': analysis.overall_quality_score
            }
        )
    except Exception as e:
        # Top-level boundary of the worker thread: keep the traceback in the
        # server log and expose only the message to API clients. 'stage' is
        # left untouched so it shows where processing stopped.
        app.logger.exception("Processing failed for job %s", job_id)
        _update_job(job_id, status='failed', error=str(e))


@app.route('/')
def index():
    """Serve the dashboard page (dashboard.html)."""
    return send_from_directory('.', 'dashboard.html')


@app.route('/api/calls')
def get_calls():
    """Return the names of all analysed calls, sorted in descending order."""
    try:
        if not OUTPUT_FOLDER.exists():
            return jsonify([])

        calls = sorted(
            (call_dir.name for call_dir, _ in _iter_analysis_files()),
            reverse=True
        )
        return jsonify(calls)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/analysis/<call_name>')
def get_analysis(call_name):
    """Return analysis, transcription and diarization metadata for one call.

    Responds 404 when the call directory or its audio_analysis.json is
    missing; 'transcription' and 'statistics' are null when their files are
    absent.
    """
    try:
        call_path = _call_dir(call_name)
        if call_path is None or not call_path.exists():
            return jsonify({'error': 'Call not found'}), 404

        # Load audio analysis
        analysis_file = call_path / 'audio_analysis.json'
        if not analysis_file.exists():
            return jsonify({'error': 'Analysis not found'}), 404

        with open(analysis_file, 'r', encoding='utf-8') as f:
            analysis = json.load(f)

        # Load transcription
        transcription = None
        trans_file = call_path / 'transcription.json'
        if trans_file.exists():
            with open(trans_file, 'r', encoding='utf-8') as f:
                transcription = json.load(f)

        # Load metadata
        stats = None
        stats_file = call_path / 'transcript.json'
        if stats_file.exists():
            with open(stats_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            stats = data.get('metadata')

        return jsonify({
            'call_name': call_name,
            'analysis': analysis,
            'transcription': transcription,
            'statistics': stats
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/audio/<call_name>/<path:filename>')
def get_audio(call_name, filename):
    """Serve an exported file from the output directory of *call_name*."""
    call_path = _call_dir(call_name)
    if call_path is None:
        return jsonify({'error': 'Call not found'}), 404
    try:
        return send_from_directory(call_path, filename)
    except Exception as e:
        return jsonify({'error': str(e)}), 404


@app.route('/api/statistics')
def get_statistics():
    """Return aggregate statistics over every analysed call.

    Averages are rounded to one decimal and are 0 when no values were found.
    Missing or zero-valued scores are not included in the averages.
    """
    try:
        if not OUTPUT_FOLDER.exists():
            return jsonify({'error': 'Output folder not found'}), 404

        stats = {
            'total_calls': 0,
            'stereo_calls': 0,
            'mono_calls': 0,
            'avg_quality_score': 0,
            'avg_duration': 0,
            'avg_clarity': 0,
            'avg_confidence': 0,
            'total_segments': 0,
            'emotion_distribution': {},
            'communication_styles': {}
        }

        quality_scores = []
        durations = []
        clarities = []
        confidences = []
        emotions = Counter()
        styles = Counter()

        for _, analysis_file in _iter_analysis_files():
            with open(analysis_file, 'r', encoding='utf-8') as f:
                analysis = json.load(f)

            stats['total_calls'] += 1
            if analysis.get('audio_type') == 'stereo':
                stats['stereo_calls'] += 1
            else:
                stats['mono_calls'] += 1

            if analysis.get('overall_quality_score'):
                quality_scores.append(float(analysis['overall_quality_score']))
            if analysis.get('audio_duration'):
                durations.append(float(analysis['audio_duration']))

            # "or" fallbacks tolerate keys that are present but null.
            segments = analysis.get('segments') or []
            stats['total_segments'] += len(segments)

            for seg in segments:
                voice_quality = seg.get('voice_quality') or {}
                emotion = seg.get('emotion') or {}
                if voice_quality.get('clarity_score'):
                    clarities.append(float(voice_quality['clarity_score']))
                if emotion.get('confidence_score'):
                    confidences.append(float(emotion['confidence_score']))
                if emotion.get('primary_emotion'):
                    emotions[emotion['primary_emotion']] += 1

            for profile in (analysis.get('speaker_profiles') or {}).values():
                if profile.get('communication_style'):
                    styles[profile['communication_style']] += 1

        stats['avg_quality_score'] = _mean(quality_scores)
        stats['avg_duration'] = _mean(durations)
        stats['avg_clarity'] = _mean(clarities)
        stats['avg_confidence'] = _mean(confidences)
        stats['emotion_distribution'] = dict(emotions)
        stats['communication_styles'] = dict(styles)

        return jsonify(stats)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an audio upload and start processing it in a background thread.

    Expects a multipart form field named 'file'. Responds 400 for a missing
    file, an empty filename or a disallowed extension; otherwise returns the
    id of the queued job.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        if not file.filename:
            return jsonify({'error': 'No file selected'}), 400

        if not allowed_file(file.filename):
            allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
            return jsonify({'error': f'Invalid file type. Allowed: {allowed}'}), 400

        job_id = str(uuid.uuid4())
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            # secure_filename drops non-ASCII characters, which can swallow
            # the whole stem (e.g. "ə.wav" -> "wav") and with it the
            # extension. Fall back to a neutral name that keeps the
            # extension already validated above.
            extension = file.filename.rsplit('.', 1)[1].lower()
            filename = f"audio.{extension}"

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique_filename = f"{timestamp}_{filename}"
        audio_path = UPLOAD_FOLDER / unique_filename
        file.save(str(audio_path))

        output_dir = OUTPUT_FOLDER / audio_path.stem
        output_dir.mkdir(parents=True, exist_ok=True)

        with job_lock:
            processing_jobs[job_id] = {
                'job_id': job_id,
                'filename': filename,
                'status': 'queued',
                'stage': 'pending',
                'created_at': datetime.now().isoformat(),
                'audio_path': str(audio_path),
                'output_dir': str(output_dir),
                'is_stereo': None
            }

        thread = threading.Thread(
            target=process_audio_file,
            args=(job_id, audio_path, output_dir)
        )
        thread.daemon = True
        thread.start()

        return jsonify({
            'job_id': job_id,
            'filename': filename,
            'status': 'queued',
            'message': 'File uploaded. Processing started.'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/jobs/<job_id>')
def get_job_status(job_id):
    """Return the record of one job, or 404 if the id is unknown."""
    with job_lock:
        if job_id not in processing_jobs:
            return jsonify({'error': 'Job not found'}), 404
        job = processing_jobs[job_id].copy()
    return jsonify(job)


@app.route('/api/jobs')
def get_all_jobs():
    """Return the records of all jobs known to this process."""
    with job_lock:
        # Copy under the lock: worker threads keep mutating the live records
        # and serialisation happens after the lock is released.
        jobs = [job.copy() for job in processing_jobs.values()]
    return jsonify(jobs)


@app.route('/health')
def health():
    """Return a static health/identity payload."""
    return jsonify({'status': 'healthy', 'service': 'ASR Audio Intelligence Platform', 'version': '2.0'})


if __name__ == '__main__':
    OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)
    print("="*60)
    print("ASR Audio Intelligence Platform")
    print("="*60)
    print(f"Output: {OUTPUT_FOLDER}")
    print(f"Whisper: {WHISPER_MODEL}")
    print("Server: http://localhost:7860")
    print("="*60)
    app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)