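"""Quran recitation analysis app: segmentation + ASR + comparison.

Pipeline:
  1. Split an uploaded recitation into speech segments with
     obadx/recitation-segmenter-v2.
  2. Transcribe each segment through the remote aboalaa1472/Quran_ASR
     Gradio Space.
  3. Align the combined transcript against a user-supplied, fully
     diacritized reference text with ArabicAligner and report errors.
"""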
import io
import os
import tempfile
import zipfile

import gradio as gr
import torch
import soundfile as sf
import librosa
from PIL import Image
from transformers import AutoFeatureExtractor, AutoModelForAudioFrameClassification
from gradio_client import Client, handle_file

from recitations_segmenter import segment_recitations, clean_speech_intervals
from arabic_aligner import ArabicAligner

device = 'cuda' if torch.cuda.is_available() else 'cpu'
dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32

print(f"Loading model on {device}...")
processor = AutoFeatureExtractor.from_pretrained("obadx/recitation-segmenter-v2")
model = AutoModelForAudioFrameClassification.from_pretrained(
    "obadx/recitation-segmenter-v2",
    torch_dtype=dtype,
    device_map=device,
)
print("Model loaded successfully!")

asr_client = Client("aboalaa1472/Quran_ASR")
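# NOTE: each asr_client.predict() call below uploads one audio segment to the
# remote Space and blocks until its transcript comes back, so end-to-end
# latency grows linearly with the number of detected segments.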

def read_audio(path, sampling_rate=16000):
    """Load an audio file as a mono float32 torch tensor at `sampling_rate`."""
    audio, sr = sf.read(path)
    if len(audio.shape) > 1:
        audio = audio.mean(axis=1)  # downmix multi-channel to mono
    if sr != sampling_rate:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)
    return torch.tensor(audio).float()


def get_interval(x, intervals, idx, sr=16000):
    """Slice the waveform for interval `idx` (interval bounds are in seconds)."""
    start = int(intervals[idx][0] * sr)
    end = int(intervals[idx][1] * sr)
    return x[start:end]
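# Example: with sr=16000, an interval of (1.25, 3.50) seconds selects
# samples x[20000:56000], i.e. 2.25 s of audio.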

def plot_signal(x, intervals, sr=16000):
    """Render the waveform with red markers at segment boundaries."""
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots(figsize=(20, 4))
    if isinstance(x, torch.Tensor):
        x = x.numpy()
    ax.plot(x, linewidth=0.5)
    for s, e in intervals:
        ax.axvline(x=s * sr, color='red', alpha=0.4)
        ax.axvline(x=e * sr, color='red', alpha=0.4)
    plt.tight_layout()
    buf = io.BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)
    img = Image.open(buf)
    plt.close(fig)  # close this specific figure to avoid leaking memory
    return img

def process_audio_and_compare(audio_file, reference_text, min_silence_ms, min_speech_ms, pad_ms):
    if audio_file is None:
        return None, "⚠️ ارفع ملف صوتي أولاً", None  # "upload an audio file first"

    try:
        wav = read_audio(audio_file)

        # Frame-level segmentation over the whole recitation.
        sampled_outputs = segment_recitations(
            [wav],
            model,
            processor,
            device=device,
            dtype=dtype,
            batch_size=4,
        )

        # Merge/drop intervals according to the UI thresholds.
        clean_out = clean_speech_intervals(
            sampled_outputs[0].speech_intervals,
            sampled_outputs[0].is_complete,
            min_silence_duration_ms=min_silence_ms,
            min_speech_duration_ms=min_speech_ms,
            pad_duration_ms=pad_ms,
            return_seconds=True,
        )

        intervals = clean_out.clean_speech_intervals
        plot_img = plot_signal(wav, intervals)

        temp_dir = tempfile.mkdtemp()
        segment_files = []
        full_asr_text = []

        result_text = f"✅ عدد المقاطع: {len(intervals)}\n\n"

        # Write each segment to a WAV file and transcribe it remotely.
        for i in range(len(intervals)):
            seg = get_interval(wav, intervals, i)
            if isinstance(seg, torch.Tensor):
                seg = seg.cpu().numpy()

            seg_path = os.path.join(temp_dir, f"segment_{i+1:03d}.wav")
            sf.write(seg_path, seg, 16000)
            segment_files.append(seg_path)

            # The Space's /run endpoint takes both inputs; the same segment
            # file is passed to each.
            asr_text = asr_client.predict(
                uploaded_audio=handle_file(seg_path),
                mic_audio=handle_file(seg_path),
                api_name="/run"
            )
            full_asr_text.append(asr_text)
            result_text += f"🎵 مقطع {i+1} ({intervals[i][0]:.2f}s → {intervals[i][1]:.2f}s)\n📜 {asr_text}\n\n"

        full_asr_text_str = " ".join(full_asr_text)
        result_text += f"\n🧾 النص الكامل:\n{full_asr_text_str}\n\n"

        # Align the combined transcript against the diacritized reference.
        aligner = ArabicAligner()
        align_results = aligner.align_and_compare(full_asr_text_str, reference_text)

        stats = align_results['statistics']
        result_text += (
            f"📊 إحصائيات المقارنة:\n"
            f"- إجمالي كلمات المرجع: {stats['total_reference_words']}\n"
            f"- إجمالي كلمات ASR: {stats['total_user_words']}\n"
            f"- إجمالي الأخطاء: {stats['total_errors']}\n"
            f" - أخطاء الكلمات: {stats['word_level_errors']}\n"
            f" - أخطاء الحركات: {stats['diacritic_errors']}\n"
            f"- الدقة: {stats['accuracy']:.2f}%\n\n"
            f"✏️ تفاصيل الأخطاء:\n"
        )

        for i, error in enumerate(align_results['errors'], 1):
            result_text += (
                f"[{i}] Type: {error.error_type.value.upper()} | "
                f"User: '{error.user_word}' | "
                f"Expected: '{error.reference_word}' | "
                f"Details: {error.details}\n"
            )

        # Bundle the segment WAVs so the user can download them all at once.
        zip_path = os.path.join(temp_dir, "segments.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for f in segment_files:
                zipf.write(f, os.path.basename(f))

        return plot_img, result_text, zip_path

    except Exception as e:
        return None, f"❌ خطأ: {str(e)}", None  # "Error: ..."

with gr.Blocks(title="Quran Segmentation + ASR + Comparison") as demo:
    gr.Markdown("## 🕌 تقطيع التلاوات + التعرف على النص القرآني + المقارنة بالنص المشكول")

    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(type="filepath", label="📤 ارفع التلاوة")
            reference_text_input = gr.Textbox(label="📖 أدخل نص القرآن المشكول للمقارنة", lines=10)
            min_silence = gr.Slider(10, 500, 30, step=10, label="Min Silence (ms)")
            min_speech = gr.Slider(10, 500, 30, step=10, label="Min Speech (ms)")
            padding = gr.Slider(0, 200, 30, step=10, label="Padding (ms)")
            btn = gr.Button("🚀 ابدأ")

        with gr.Column():
            plot_out = gr.Image(label="📈 الإشارة")
            text_out = gr.Textbox(lines=30, label="📜 النتائج")

    zip_out = gr.File(label="📦 تحميل المقاطع")

    btn.click(
        fn=process_audio_and_compare,
        inputs=[audio_input, reference_text_input, min_silence, min_speech, padding],
        outputs=[plot_out, text_out, zip_out]
    )


if __name__ == "__main__":
    demo.launch()
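# For a temporary public URL (e.g. when running outside a hosted Space),
# launch with: demo.launch(share=True)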