# JIS-ASR / api_server.py
# ramalMr's picture
# Update api_server.py
# 3812759 verified
"""
ASR Audio Analysis API Server
Enterprise-grade REST API for audio processing:
- Diarization (stereo/mono)
- Whisper Transcription
- Professional Audio Analysis
"""
import os
import json
import uuid
import threading
from pathlib import Path
from datetime import datetime
from flask import Flask, jsonify, send_from_directory, request
from flask_cors import CORS
from werkzeug.utils import secure_filename
app = Flask(__name__)
CORS(app)

# Configuration. APP_DIR and WHISPER_MODEL can be overridden through the
# environment; the folders below are derived from APP_DIR.
BASE_DIR = Path(os.environ.get("APP_DIR", "/app"))
OUTPUT_FOLDER = BASE_DIR / "output"
UPLOAD_FOLDER = BASE_DIR / "uploads"
WHISPER_MODEL = os.environ.get("WHISPER_MODEL", "ramalMr/whisper-small-az")
ALLOWED_EXTENSIONS = {'wav', 'mp3', 'm4a', 'flac', 'ogg', 'opus', 'webm'}

# Job tracking: maps job_id -> job-state dict. Entries are read by request
# handlers and mutated by worker threads, so every access goes through job_lock.
processing_jobs = {}
job_lock = threading.Lock()

# Create folders. parents=True so that an APP_DIR which does not exist yet
# is created instead of failing the import with FileNotFoundError.
OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)
UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared
    case-insensitively. A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def process_audio_file(job_id, audio_path, output_dir):
    """Process audio: diarization + transcription + analysis.

    Runs the three pipeline steps for one audio file and records progress in
    ``processing_jobs[job_id]`` ('status', 'stage', 'is_stereo', and finally
    'result' or 'error'). Intended to be used as a thread target.

    Args:
        job_id: Key of an already registered entry in ``processing_jobs``.
        audio_path: Location of the audio file to process.
        output_dir: Directory that receives the exported files. Its ``.name``
            is used as the call id, so it must be a ``pathlib.Path``.

    Returns:
        None. Pipeline errors are not raised: the exception is logged with
        its traceback and its message is stored under the job's 'error' key.
    """

    def _update_job(**fields):
        # Apply all fields in one critical section so readers never observe
        # a half-updated job entry.
        with job_lock:
            processing_jobs[job_id].update(fields)

    try:
        _update_job(status='processing', stage='initializing')

        # Imported here rather than at module level, as in the original code,
        # so the pipeline modules are only loaded when a job actually runs.
        from stereo_diarizer import StereoCallDiarizer
        from whisper_transcriber import WhisperTranscriber
        from audio_analyzer import AudioAnalyzer

        # Step 1: Diarization
        _update_job(stage='diarization')
        diarizer = StereoCallDiarizer(str(audio_path), verbose=False)
        diarizer.load_audio()
        _update_job(is_stereo=diarizer.is_stereo)

        left_seg, right_seg = diarizer.detect_speech_segments()
        diarizer.create_timeline(left_seg, right_seg)
        segment_files = diarizer.export_segments(str(output_dir))
        diarizer.export_full_speakers(str(output_dir))
        diarizer.export_transcript_txt(str(output_dir))
        diarizer.export_transcript_json(str(output_dir))

        # Step 2: Transcription
        _update_job(stage='transcription')
        whisper = WhisperTranscriber(WHISPER_MODEL, device="cpu", verbose=False)
        transcribed = whisper.transcribe_segments(segment_files, diarizer.timeline)
        whisper.export_transcription(transcribed, str(output_dir))

        # Step 3: Audio Analysis
        _update_job(stage='audio_analysis')
        analyzer = AudioAnalyzer(verbose=False)
        analysis = analyzer.analyze_call(
            segment_files=segment_files,
            timeline=diarizer.timeline,
            call_id=output_dir.name,
            is_stereo=diarizer.is_stereo,
        )
        analyzer.export_analysis(analysis, str(output_dir))

        # Success
        _update_job(
            status='completed',
            stage='done',
            result={
                'call_name': output_dir.name,
                'is_stereo': diarizer.is_stereo,
                'quality_score': analysis.overall_quality_score,
            },
        )
    except Exception as e:
        # This runs in a background thread: without logging here the traceback
        # would be lost and only the bare message would survive in the job.
        app.logger.exception("Processing failed for job %s", job_id)
        # 'stage' is deliberately left untouched so it shows where it failed.
        _update_job(status='failed', error=str(e))
@app.route('/')
def index():
    """Serve ``dashboard.html`` from the directory ``'.'`` for the site root."""
    dashboard_dir = '.'
    dashboard_page = 'dashboard.html'
    return send_from_directory(dashboard_dir, dashboard_page)
@app.route('/api/calls')
def get_calls():
    """Return the names of all analysed calls as a JSON list.

    A call is a sub-directory of OUTPUT_FOLDER that contains an
    ``audio_analysis.json`` file. Names are sorted in descending order.
    An empty list is returned when the output folder does not exist; any
    unexpected error yields ``{'error': ...}`` with status 500.
    """
    try:
        root = Path(OUTPUT_FOLDER)
        if not root.exists():
            return jsonify([])

        names = [
            entry.name
            for entry in root.iterdir()
            if entry.is_dir() and (entry / 'audio_analysis.json').exists()
        ]
        return jsonify(sorted(names, reverse=True))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/api/analysis/<call_name>')
def get_analysis(call_name):
    """Return the stored analysis, transcription and metadata of one call.

    Args:
        call_name: Name of a call directory directly inside OUTPUT_FOLDER
            (taken from the URL, therefore untrusted).

    Returns:
        JSON object with the keys 'call_name', 'analysis' (content of
        ``audio_analysis.json``), 'transcription' (content of
        ``transcription.json`` or null) and 'statistics' (the 'metadata'
        entry of ``transcript.json`` or null).
        404 when the call directory or its analysis file is missing,
        500 with ``{'error': ...}`` on any other failure.
    """
    try:
        output_root = Path(OUTPUT_FOLDER).resolve()
        call_path = (output_root / call_name).resolve()

        # call_name comes straight from the URL: a value such as ".." would
        # otherwise let the request read JSON files outside OUTPUT_FOLDER.
        # Only direct children of the output root are accepted (this also
        # rejects call directories that are symlinks to other locations).
        if call_path.parent != output_root or not call_path.exists():
            return jsonify({'error': 'Call not found'}), 404

        # Load audio analysis
        analysis_file = call_path / 'audio_analysis.json'
        if not analysis_file.exists():
            return jsonify({'error': 'Analysis not found'}), 404
        with open(analysis_file, 'r', encoding='utf-8') as f:
            analysis = json.load(f)

        # Load transcription (optional)
        transcription = None
        trans_file = call_path / 'transcription.json'
        if trans_file.exists():
            with open(trans_file, 'r', encoding='utf-8') as f:
                transcription = json.load(f)

        # Load metadata (optional)
        stats = None
        stats_file = call_path / 'transcript.json'
        if stats_file.exists():
            with open(stats_file, 'r', encoding='utf-8') as f:
                stats = json.load(f).get('metadata')

        return jsonify({
            'call_name': call_name,
            'analysis': analysis,
            'transcription': transcription,
            'statistics': stats,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/audio/<call_name>/<filename>')
def get_audio(call_name, filename):
    """Send one file from a call's output directory.

    Args:
        call_name: Name of the call directory inside OUTPUT_FOLDER.
        filename: Name of the file inside that directory.

    Returns:
        The file response, or ``{'error': ...}`` with status 404 when the
        file cannot be served.
    """
    try:
        # Both URL parts are untrusted. Previously call_name was joined onto
        # OUTPUT_FOLDER unchecked, so ".." turned the parent directory into
        # the served directory. Passing the combined relative path lets
        # send_from_directory validate all of it against OUTPUT_FOLDER.
        return send_from_directory(OUTPUT_FOLDER, f"{call_name}/{filename}")
    except Exception as e:
        return jsonify({'error': str(e)}), 404
@app.route('/api/statistics')
def get_statistics():
    """Aggregate statistics over every analysed call in OUTPUT_FOLDER.

    Each sub-directory containing ``audio_analysis.json`` counts as one call.
    Analysis files that cannot be read or parsed are logged and skipped, so a
    single damaged file does not take the whole endpoint down.

    Returns:
        JSON object with call counts (total/stereo/mono), averages rounded to
        one decimal (quality score, duration, clarity, confidence; 0 when no
        value was found), the total segment count and frequency tables for
        primary emotions and communication styles.
        404 when the output folder is missing, 500 on unexpected errors.
    """
    from collections import Counter

    def _rounded_mean(values):
        # 0 is the "no data" value that clients of this endpoint already receive.
        return round(sum(values) / len(values), 1) if values else 0

    try:
        output_path = Path(OUTPUT_FOLDER)
        if not output_path.exists():
            return jsonify({'error': 'Output folder not found'}), 404

        total_calls = 0
        stereo_calls = 0
        total_segments = 0
        quality_scores = []
        durations = []
        clarities = []
        confidences = []
        emotions = Counter()
        styles = Counter()

        for item in output_path.iterdir():
            analysis_file = item / 'audio_analysis.json'
            if not (item.is_dir() and analysis_file.exists()):
                continue

            try:
                with open(analysis_file, 'r', encoding='utf-8') as f:
                    analysis = json.load(f)
            except (OSError, ValueError) as exc:
                # ValueError covers json.JSONDecodeError and decoding errors.
                app.logger.warning(
                    "Skipping unreadable analysis file %s: %s", analysis_file, exc
                )
                continue

            total_calls += 1
            if analysis.get('audio_type') == 'stereo':
                stereo_calls += 1

            # Falsy values (missing, null, 0) are treated as "not available",
            # exactly as before.
            if analysis.get('overall_quality_score'):
                quality_scores.append(float(analysis['overall_quality_score']))
            if analysis.get('audio_duration'):
                durations.append(float(analysis['audio_duration']))

            segments = analysis.get('segments') or []
            total_segments += len(segments)
            for seg in segments:
                # "or {}" tolerates an explicit null for the nested object.
                voice_quality = seg.get('voice_quality') or {}
                emotion = seg.get('emotion') or {}
                if voice_quality.get('clarity_score'):
                    clarities.append(float(voice_quality['clarity_score']))
                if emotion.get('confidence_score'):
                    confidences.append(float(emotion['confidence_score']))
                if emotion.get('primary_emotion'):
                    emotions[emotion['primary_emotion']] += 1

            for profile in (analysis.get('speaker_profiles') or {}).values():
                if profile.get('communication_style'):
                    styles[profile['communication_style']] += 1

        stats = {
            'total_calls': total_calls,
            'stereo_calls': stereo_calls,
            # Every call that is not flagged 'stereo' is counted as mono.
            'mono_calls': total_calls - stereo_calls,
            'avg_quality_score': _rounded_mean(quality_scores),
            'avg_duration': _rounded_mean(durations),
            'avg_clarity': _rounded_mean(clarities),
            'avg_confidence': _rounded_mean(confidences),
            'total_segments': total_segments,
            'emotion_distribution': dict(emotions),
            'communication_styles': dict(styles),
        }
        return jsonify(stats)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an audio upload and start processing it in a background thread.

    Expects a multipart form field named 'file' whose extension is listed in
    ALLOWED_EXTENSIONS. The file is stored in UPLOAD_FOLDER under a
    timestamp-prefixed name, a matching directory is created in
    OUTPUT_FOLDER, a job entry is registered in ``processing_jobs`` and
    ``process_audio_file`` is started on a daemon thread.

    Returns:
        JSON with 'job_id', 'filename', 'status' and 'message' on success,
        400 with ``{'error': ...}`` for a missing, unnamed or disallowed
        file, 500 with ``{'error': ...}`` on unexpected errors.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        # "not" also covers a filename of None, which would otherwise raise.
        if not file.filename:
            return jsonify({'error': 'No file selected'}), 400

        if not allowed_file(file.filename):
            # sorted() keeps the message stable; set order is arbitrary.
            return jsonify({'error': f'Invalid file type. Allowed: {", ".join(sorted(ALLOWED_EXTENSIONS))}'}), 400

        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = secure_filename(file.filename)
        if not allowed_file(filename):
            # Sanitising can strip a name down to nothing or to the bare
            # extension text (e.g. names made only of non-ASCII characters).
            # Fall back to a neutral name that keeps the validated extension
            # so the stored file still has a usable suffix.
            filename = f"audio.{extension}"

        job_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique_filename = f"{timestamp}_{filename}"
        audio_path = UPLOAD_FOLDER / unique_filename
        output_dir = OUTPUT_FOLDER / audio_path.stem

        try:
            # exist_ok=False makes directory creation the atomic claim on
            # this call name.
            output_dir.mkdir(exist_ok=False)
        except FileExistsError:
            # Same name uploaded within the same second: disambiguate with
            # part of the job id instead of overwriting the earlier upload.
            unique_filename = f"{timestamp}_{job_id[:8]}_{filename}"
            audio_path = UPLOAD_FOLDER / unique_filename
            output_dir = OUTPUT_FOLDER / audio_path.stem
            output_dir.mkdir(exist_ok=True)

        file.save(str(audio_path))

        with job_lock:
            processing_jobs[job_id] = {
                'job_id': job_id,
                'filename': filename,
                'status': 'queued',
                'stage': 'pending',
                'created_at': datetime.now().isoformat(),
                'audio_path': str(audio_path),
                'output_dir': str(output_dir),
                'is_stereo': None,
            }

        thread = threading.Thread(
            target=process_audio_file,
            args=(job_id, audio_path, output_dir),
        )
        thread.daemon = True
        thread.start()

        return jsonify({
            'job_id': job_id,
            'filename': filename,
            'status': 'queued',
            'message': 'File uploaded. Processing started.',
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/jobs/<job_id>')
def get_job_status(job_id):
    """Return a snapshot of one job's state, or 404 if the id is unknown.

    The job dict is shallow-copied while ``job_lock`` is held; the response
    is built from that copy.
    """
    with job_lock:
        entry = processing_jobs.get(job_id)
        snapshot = None if entry is None else entry.copy()

    if snapshot is None:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify(snapshot)
@app.route('/api/jobs')
def get_all_jobs():
    """Return a JSON list with a snapshot of every tracked job."""
    with job_lock:
        # Copy each entry while holding the lock: worker threads keep adding
        # keys to the live dicts, and serialising those outside the lock can
        # fail with "dictionary changed size during iteration".
        jobs = [job.copy() for job in processing_jobs.values()]
    return jsonify(jobs)
@app.route('/health')
def health():
    """Report a static health payload with service name and version."""
    payload = {
        'status': 'healthy',
        'service': 'ASR Audio Intelligence Platform',
        'version': '2.0',
    }
    return jsonify(payload)
if __name__ == '__main__':
    # Script entry point: make sure the output folder exists, print a short
    # start-up banner, then serve on all interfaces on port 7860.
    OUTPUT_FOLDER.mkdir(exist_ok=True)

    banner_rule = "=" * 60
    banner_lines = (
        banner_rule,
        "ASR Audio Intelligence Platform",
        banner_rule,
        f"Output: {OUTPUT_FOLDER}",
        f"Whisper: {WHISPER_MODEL}",
        "Server: http://localhost:7860",
        banner_rule,
    )
    for banner_line in banner_lines:
        print(banner_line)

    app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)