import gradio as gr
import numpy as np
import torch
import soundfile as sf
import librosa
from matplotlib import pyplot as plt
from transformers import AutoFeatureExtractor, AutoModelForAudioFrameClassification
from recitations_segmenter import segment_recitations, clean_speech_intervals
import io
from PIL import Image
import tempfile
import os
import zipfile

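# Prefer GPU when available; bfloat16 roughly halves activation memory on GPU,
# while CPU inference falls back to float32 (bfloat16 support on CPUs is spotty).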
device = 'cuda' if torch.cuda.is_available() else 'cpu'
dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32

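# The segmenter is loaded as an audio *frame classification* model: it scores
# each audio frame (speech vs. silence), and the recitations_segmenter helpers
# below turn those frame scores into speech intervals.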
print(f"Loading model on {device}...") |
|
|
processor = AutoFeatureExtractor.from_pretrained("obadx/recitation-segmenter-v2") |
|
|
model = AutoModelForAudioFrameClassification.from_pretrained( |
|
|
"obadx/recitation-segmenter-v2", |
|
|
torch_dtype=dtype, |
|
|
device_map=device |
|
|
) |
|
|
print("Model loaded successfully!") |
|
|
|
|

def read_audio(path, sampling_rate=16000):
    """Read an audio file, downmix it to mono, and resample it to the target rate."""
    audio, sr = sf.read(path)
    if len(audio.shape) > 1:
        audio = audio.mean(axis=1)  # downmix multi-channel audio to mono
    if sr != sampling_rate:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)
    return torch.tensor(audio).float()
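
# Example (hypothetical path): read_audio("recitation.mp3") returns a 1-D
# float32 tensor at 16 kHz, which is what segment_recitations consumes below.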


def get_interval(x: np.ndarray, intervals: list[list[float]], idx: int, sr=16000, delta=0.3, exact_boundaries=False):
    """Extract a single audio segment from the detected intervals."""
    # Exact mode: cut at the interval itself, starting delta seconds early
    # (clamped so the first segment cannot start before sample 0).
    start = max(0, int((intervals[idx][0] - delta) * sr))
    end = int(intervals[idx][1] * sr)
    if not exact_boundaries:
        # Inexact mode: segments tile the whole signal, so each one keeps the
        # silence that follows it, up to delta seconds before the next segment.
        start = 0 if idx == 0 else max(0, int((intervals[idx][0] - delta) * sr))
        end = len(x) if idx == len(intervals) - 1 else int((intervals[idx + 1][0] - delta) * sr)
    return x[start:end]
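
# Worked example (seconds), intervals = [[1.0, 2.5], [4.0, 6.0]], delta = 0.3:
#   exact_boundaries=True,  idx=0 -> x[int(0.7 * sr) : int(2.5 * sr)]
#   exact_boundaries=False, idx=0 -> x[0 : int(3.7 * sr)]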


def plot_signal(x: np.ndarray, intervals: list[list[float]], log_min_count=5, sr=16000):
    """Plot the audio waveform with the detected interval boundaries."""
    fig, ax = plt.subplots(figsize=(20, 4))
    if isinstance(x, torch.Tensor):
        x = x.numpy()
    ax.plot(x, linewidth=0.5)

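    # Flattened intervals alternate start/end times, so consecutive diffs
    # alternate too: even-indexed diffs are speech durations and odd-indexed
    # diffs are the silence gaps between speech intervals.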
    intervals_flat = np.array(intervals).reshape(-1)
    diffs = np.diff(intervals_flat)

    # Sentinel: stays -inf (nothing highlighted) unless real silences are found.
    min_silence_diffs_idx = float('-inf')
    info_text = ""

    if len(intervals_flat) > 2:
        # Silence gaps exist only between intervals, hence the guard above.
        silence_diffs = diffs[1::2]
        min_silence_diffs_ids = silence_diffs.argsort()[:log_min_count]
        # Map the shortest gap back to its position in the flattened array.
        min_silence_diffs_idx = min_silence_diffs_ids[0] * 2 + 1

        info_text += f'Minimum Silence Interval IDs: {min_silence_diffs_ids}\n'
        info_text += f'Minimum Silence Intervals: {silence_diffs[min_silence_diffs_ids]}\n'

    speech_diffs = diffs[0::2]
    min_speech_diffs_ids = speech_diffs.argsort()[:log_min_count]
    info_text += f'Minimum Speech Interval IDs: {min_speech_diffs_ids}\n'
    info_text += f'Minimum Speech Intervals: {speech_diffs[min_speech_diffs_ids]}\n'

    # Draw a vertical line at every boundary; the boundaries of the shortest
    # silence gap are highlighted in green, all others in red.
    for idx, val in enumerate(intervals_flat):
        color = 'red'
        if idx in [min_silence_diffs_idx, min_silence_diffs_idx + 1]:
            color = 'green'
        ax.axvline(x=val * sr, ymin=0, ymax=1, color=color, alpha=0.6, linewidth=1)

    ax.set_xlabel('Samples')
    ax.set_ylabel('Amplitude')
    ax.set_title('Audio Signal with Detected Intervals')
    ax.grid(True, alpha=0.3)
    plt.tight_layout()

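    # Render the figure to an in-memory PNG and return it as a PIL image,
    # which gr.Image accepts directly.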
    buf = io.BytesIO()
    plt.savefig(buf, format='png', dpi=100, bbox_inches='tight')
    buf.seek(0)
    img = Image.open(buf)
    plt.close(fig)

    return img, info_text


def process_audio(audio_file, min_silence_ms, min_speech_ms, pad_ms):
    """Segment the uploaded audio file and prepare all UI outputs."""
    if audio_file is None:
        return None, "⚠️ Please upload an audio file", None, []

    try:
        # Load the upload as 16 kHz mono.
        wav = read_audio(audio_file)

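        # segment_recitations takes a batch (a list) of waveforms and returns
        # one result per input; each result carries the raw speech_intervals
        # and an is_complete flag that clean_speech_intervals consumes below.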
        sampled_outputs = segment_recitations(
            [wav],
            model,
            processor,
            device=device,
            dtype=dtype,
            batch_size=4,
        )

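        # Judging by the parameter names: silences shorter than
        # min_silence_duration_ms are merged into speech, speech runs shorter
        # than min_speech_duration_ms are dropped, each interval is padded by
        # pad_duration_ms, and return_seconds=True yields seconds, not samples.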
        clean_out = clean_speech_intervals(
            sampled_outputs[0].speech_intervals,
            sampled_outputs[0].is_complete,
            min_silence_duration_ms=min_silence_ms,
            min_speech_duration_ms=min_speech_ms,
            pad_duration_ms=pad_ms,
            return_seconds=True,
        )

        intervals = clean_out.clean_speech_intervals

        plot_img, stats_text = plot_signal(wav, intervals)

        num_segments = len(intervals)

        result_text = "✅ Segmentation completed successfully!\n\n"
        result_text += f"📊 Number of segments: {num_segments}\n"
        result_text += f"⏱️ Original file length: {len(wav) / 16000:.2f} seconds\n\n"
        result_text += "=" * 50 + "\n"
        result_text += stats_text
        result_text += "=" * 50 + "\n\n"

        # Write each segment to its own WAV file in a temporary directory.
        temp_dir = tempfile.mkdtemp()
        segment_files = []

        for idx in range(num_segments):
            audio_seg = get_interval(
                x=wav,
                intervals=intervals,
                idx=idx,
                delta=0.050,
                exact_boundaries=True,
            )

            if isinstance(audio_seg, torch.Tensor):
                audio_seg = audio_seg.cpu().numpy()

            duration = len(audio_seg) / 16000
            result_text += (
                f"Segment {idx + 1}: from {intervals[idx][0]:.2f}s "
                f"to {intervals[idx][1]:.2f}s (duration: {duration:.2f}s)\n"
            )

            segment_path = os.path.join(temp_dir, f"segment_{idx + 1:03d}.wav")
            sf.write(segment_path, audio_seg, 16000)
            segment_files.append(segment_path)

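        # Store only the basename as the arcname so the ZIP has a flat layout.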
        zip_path = os.path.join(temp_dir, "segments.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for seg_file in segment_files:
                zipf.write(seg_file, os.path.basename(seg_file))

        return plot_img, result_text, zip_path, segment_files

    except Exception as e:
        return None, f"❌ An error occurred: {e}", None, []


with gr.Blocks(title="Quranic Recitation Segmentation") as demo:

    gr.Markdown("""
    # 🕌 Quranic Recitation Segmentation

    A tool that automatically segments Quranic recitation audio files using AI.

    **Model used:** `obadx/recitation-segmenter-v2`
    """)

    with gr.Row():
        with gr.Column(scale=1):
            audio_input = gr.Audio(
                label="📤 Upload a recitation file",
                type="filepath",
            )

            with gr.Accordion("⚙️ Segmentation settings", open=True):
                min_silence = gr.Slider(
                    minimum=10,
                    maximum=500,
                    value=30,
                    step=10,
                    label="Minimum silence duration (ms)",
                )

                min_speech = gr.Slider(
                    minimum=10,
                    maximum=500,
                    value=30,
                    step=10,
                    label="Minimum speech duration (ms)",
                )

                padding = gr.Slider(
                    minimum=0,
                    maximum=200,
                    value=30,
                    step=10,
                    label="Padding (ms)",
                )

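            # These three sliders feed min_silence_duration_ms,
            # min_speech_duration_ms, and pad_duration_ms of clean_speech_intervals.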
            process_btn = gr.Button("🚀 Start segmentation", variant="primary", size="lg")

        with gr.Column(scale=2):
            plot_output = gr.Image(label="📈 Audio signal")
            result_text = gr.Textbox(
                label="📋 Results",
                lines=15,
                max_lines=20,
            )

    gr.Markdown("### 💾 Download segments")

    zip_download = gr.File(label="📦 Download all segments (ZIP)")

    gr.Markdown("### 🎵 Listen to segments")

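    # Gradio event handlers need a fixed set of output components, so 50 hidden
    # audio players are created up front and toggled visible per run (recitations
    # with more than 50 segments will only show the first 50).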
    segment_outputs = []
    for i in range(50):
        audio_out = gr.Audio(label=f"Segment {i + 1}", visible=False)
        segment_outputs.append(audio_out)

    def process_and_show(audio, min_sil, min_sp, pad):
        plot, text, zip_file, segments = process_audio(audio, min_sil, min_sp, pad)

        outputs = [plot, text, zip_file]

        # Pad the return list to exactly 3 + 50 values so it lines up with the
        # components wired into process_btn.click below.
        for i in range(50):
            if i < len(segments):
                outputs.append(gr.Audio(value=segments[i], visible=True, label=f"Segment {i + 1}"))
            else:
                outputs.append(gr.Audio(visible=False))

        return outputs

    process_btn.click(
        fn=process_and_show,
        inputs=[audio_input, min_silence, min_speech, padding],
        outputs=[plot_output, result_text, zip_download] + segment_outputs,
    )

    gr.Markdown("""
    ---
    ### 💡 Info

    - The tool uses an AI model trained specifically to segment Quranic recitations
    - Speech and silence intervals are detected automatically
    - You can download all segments at once as a ZIP file
    - Or listen to each segment individually
    """)


if __name__ == "__main__":
    demo.launch()