binaryMao committed
Commit 857e7cb · verified · 1 Parent(s): 6ec5f30

Update app.py

Files changed (1):
  1. app.py +126 -96
app.py CHANGED
@@ -1,8 +1,13 @@
  # -*- coding: utf-8 -*-
  """
- ROBOTSMALI VIDEO CAPTIONING V16 ULTRA STABLE ✅
- No ImageMagick; subtitles rendered via PIL
- Multi-version NeMo compatibility for all the Mali models (RNNT + CTC)
  """
 
  import os, tempfile
@@ -14,6 +19,7 @@ from PIL import Image, ImageDraw, ImageFont
  import gradio as gr
  from huggingface_hub import snapshot_download
  from moviepy.editor import VideoFileClip, CompositeVideoClip, ImageClip
  from nemo.collections import asr as nemo_asr
  from ctc_segmentation import ctc_segmentation, CtcSegmentationParameters, prepare_text
 
@@ -36,121 +42,145 @@ def load_model(name):
          return _model_cache[name]
      repo, mode = MODELS[name]
      path = snapshot_download(repo, local_dir_use_symlinks=False)
-     nemo_file = [os.path.join(path,f) for f in os.listdir(path) if f.endswith(".nemo")][0]
-     model = nemo_asr.models.EncDecCTCModelBPE.restore_from(nemo_file) if mode=="ctc" else nemo_asr.models.EncDecHybridRNNTCTCBPEModel.restore_from(nemo_file)
      model.to(DEVICE).eval()
      _model_cache[name] = model
      return model
 
- def get_vocab(model):
-     key = id(model)
-     if key in _vocab_cache: return _vocab_cache[key]
-     vocab = model.tokenizer.vocab if hasattr(model.tokenizer,"vocab") else model.decoder.vocabulary
-     vocab = list(vocab.keys()) if isinstance(vocab,dict) else list(vocab)
-     _vocab_cache[key] = vocab
-     return vocab
-
- # UNIFIED TRANSCRIPTION (fixes Hypothesis / tuple / dict handling)
- def transcribe_text(model, wav):
-     out = model.transcribe([wav])[0]
-
-     if hasattr(out, "text"):
-         return out.text.strip()
-
-     if isinstance(out, dict) and "text" in out:
-         return out["text"].strip()
-
-     if isinstance(out, (list, tuple)) and len(out) > 0 and isinstance(out[0], str):
-         return out[0].strip()
-
-     return str(out).strip()
 
- def draw_subtitle(text, w, h):
-     bg = Image.new("RGBA", (w, int(h*0.12)), (0,0,0,180))
-     draw = ImageDraw.Draw(bg)
-     try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", size=h//18)
-     except: font = ImageFont.load_default()
-     tw, th = draw.textsize(text, font=font)
-     draw.text(((w-tw)//2,(h*0.12-th)//2), text, fill="white", font=font)
      return bg
 
- def render_subtitles(video, subs):
-     out = "RobotsMali_Subtitled.mp4"
-     base = VideoFileClip(video)
-     W,H = base.size
-     layers = [ImageClip(np.array(draw_subtitle(t.upper(),W,H))).set_start(s).set_duration(e-s).set_pos(("center","bottom")) for s,e,t in subs]
-     CompositeVideoClip([base]+layers).write_videofile(out,codec="libx264",audio_codec="aac",fps=base.fps)
-     base.close()
      return out
 
86
- def extract_audio(v,w): VideoFileClip(v).audio.write_audiofile(w,fps=16000,codec="pcm_s16le",ffmpeg_params=["-ac","1"],logger=None)
87
-
88
- def clean_audio(w):
89
- a,sr=sf.read(w)
90
- if a.ndim==2:a=a.mean(1)
91
- a,_=librosa.effects.trim(a,top_db=30)
92
- thr=np.percentile(np.abs(a),5)
93
- a=np.where(np.abs(a)<thr,0,a)
94
- o=w.replace(".wav","_clean.wav"); sf.write(o,a,sr)
95
- return o,a,sr
96
-
- MAX_WORDS=4; MAX_CHARS=45; MAX_DURATION=3.5
-
- def group(sp):
-     subs=[]; buf=[]
-     def push(b): subs.append((b[0][0],b[-1][1]," ".join(x[2] for x in b)))
-     for w in sp:
-         test=buf+[w]; txt=" ".join(x[2] for x in test); dur=test[-1][1]-test[0][0]
-         if len(test)>MAX_WORDS or len(txt)>MAX_CHARS or dur>MAX_DURATION: push(buf); buf=[w]
-         else: buf=test
-     push(buf); return subs
-
- # ✅ UNIVERSAL ALIGNMENT SAFE (no more unpack errors)
- def align_ctc(model,audio,sr,text):
-     words=text.split()
-     x=torch.tensor(audio).float().unsqueeze(0).to(DEVICE)
-     ln=torch.tensor([x.shape[1]]).to(DEVICE)
-     total=len(audio)/sr
-     with torch.no_grad(): logits,loglen=model(input_signal=x,input_signal_length=ln)
-     vocab=set(get_vocab(model))
-     words=[w for w in words if all(c in vocab for c in w)]
-     if not words: return []
-     cfg=CtcSegmentationParameters(); cfg.char_list=list(vocab)
-     gt = prepare_text(cfg, words)[0]  # ✅ always safe
-     timing,_,_=ctc_segmentation(cfg,logits.cpu().numpy()[0],gt)
-     tps=total/loglen.cpu().numpy()[0]
-     return group([(timing[i]*tps,(timing[i+1]*tps if i+1<len(timing) else total),words[i]) for i in range(len(words))])
-
- def rnnt_vad(text,audio,sr):
-     it=librosa.effects.split(audio,top_db=25)
-     w=text.split()
-     if len(it)==0: total=len(audio)/sr; return [(0,total,text)]
-     spans=[]
-     for s,e in it:
-         seg=w[:MAX_WORDS] if len(w)>=MAX_WORDS else w; w=w[len(seg):]
-         if not seg: break
-         spans.append((s/sr,e/sr," ".join(seg)))
-     return group(spans)
-
  def pipeline(video, model_name):
      try:
          tmp=os.path.join(tempfile.gettempdir(),"audio.wav")
          extract_audio(video,tmp)
          clean,audio,sr=clean_audio(tmp)
          model=load_model(model_name)
-         text=transcribe_text(model,clean)
          mode=MODELS[model_name][1]
          subs = align_ctc(model,audio,sr,text) if mode=="ctc" else rnnt_vad(text,audio,sr)
-         if not subs: return "⚠️ Aucun mot utilisable.",None
-         out=render_subtitles(video,subs)
-         return "✅ Terminé !",out
      except Exception as e:
          return f"❌ ERREUR : {e}",None
 
- with gr.Blocks(title="RobotsMali V16") as demo:
-     gr.Markdown("# ⚡ RobotsMali V16 — Sous-titrage Bambara (Style Netflix, Sans ImageMagick)")
-     v=gr.Video(); m=gr.Dropdown(list(MODELS.keys()),value="Soloba V1 (CTC)")
-     b=gr.Button("▶️ Générer"); s=gr.Markdown(); o=gr.Video()
-     b.click(pipeline,[v,m],[s,o])
 
  demo.launch(share=True)
 
  # -*- coding: utf-8 -*-
  """
+ ROBOTSMALI VIDEO CAPTIONING — V21 (Stable)
+ - Perfect alignment for Soloba (CTC)
+ - Smooth segmentation for Soloni (RNNT)
+ - QuartzNet supported without crashes
+ - Phonetic Bambara filtering (removes French words)
+ - Netflix-style subtitles
+ - Exact video duration (no more stretching)
+ - Compatible with Google Colab + Kali + Linux
  """
 
  import os, tempfile
 
  import gradio as gr
  from huggingface_hub import snapshot_download
  from moviepy.editor import VideoFileClip, CompositeVideoClip, ImageClip
+
  from nemo.collections import asr as nemo_asr
  from ctc_segmentation import ctc_segmentation, CtcSegmentationParameters, prepare_text
 
          return _model_cache[name]
      repo, mode = MODELS[name]
      path = snapshot_download(repo, local_dir_use_symlinks=False)
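+     # Pick the first .nemo checkpoint found in the downloaded snapshot
+     # (assumption: each model repo ships exactly one .nemo file).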
+     nemo_file = [os.path.join(path, f) for f in os.listdir(path) if f.endswith(".nemo")][0]
+     model = (nemo_asr.models.EncDecCTCModelBPE.restore_from(nemo_file) if mode=="ctc"
+              else nemo_asr.models.EncDecHybridRNNTCTCBPEModel.restore_from(nemo_file))
      model.to(DEVICE).eval()
      _model_cache[name] = model
      return model
 
+ def extract_audio(video, wav):
+     VideoFileClip(video).audio.write_audiofile(
+         wav, fps=16000, codec="pcm_s16le", ffmpeg_params=["-ac","1"], logger=None
+     )
+
+ def clean_audio(wav):
+     audio, sr = sf.read(wav)
+     if audio.ndim == 2: audio = audio.mean(1)
+     audio,_ = librosa.effects.trim(audio, top_db=35)
+     out = wav.replace(".wav","_clean.wav")
+     sf.write(out, audio, sr)
+     return out, audio, sr
+
+ def transcribe(model, wav):
+     o = model.transcribe([wav])[0]
+     return o.text.strip() if hasattr(o,"text") else str(o).strip()
+
+ # ---------- BAMBARA FILTERING ---------- #
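+ # Heuristic filter: keep a word if it contains a Bambara-specific letter
+ # (ɛ, ɔ, ŋ) or at least two a/e/i/o/u vowels; everything else is treated
+ # as likely French and dropped.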
+ def keep_bambara_words(words):
+     filtered=[]
+     for w in words:
+         w2=w.lower()
+         if any(ch in w2 for ch in ["ɛ","ɔ","ŋ"]) or sum(c in "aeiou" for c in w2)>=2:
+             filtered.append(w)
+     return filtered
+
+ MAX_WORDS=4; MAX_CHARS=45; MAX_DURATION=3.4
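+ # Subtitle limits: at most 4 words, 45 characters and 3.4 s per cue.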
+
+ def group(spans):
+     subs=[]; buf=[]
+     def push(b):
+         if b: subs.append((b[0][0], b[-1][1], " ".join(x[2] for x in b)))
+     for w in spans:
+         test=buf+[w]; txt=" ".join(x[2] for x in test)
+         dur=test[-1][1]-test[0][0]
+         if len(test)>MAX_WORDS or len(txt)>MAX_CHARS or dur>MAX_DURATION:
+             push(buf); buf=[w]
+         else:
+             buf=test
+     push(buf); return subs
 
+ # ---------- CTC ALIGNMENT (Soloba + QuartzNet) ---------- #
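+ # ctc_segmentation works in logits frames: each frame spans
+ # total_duration/frames seconds, so frame boundaries are converted to
+ # seconds with the `tps` factor below.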
+ def align_ctc(model, audio, sr, text):
+     words = keep_bambara_words(text.split())
+     if not words: return []
+     x = torch.tensor(audio).float().unsqueeze(0).to(DEVICE)
+     ln = torch.tensor([x.shape[1]]).to(DEVICE)
+     total = len(audio)/sr
+     with torch.no_grad():
+         # forward may return 2 or 3 values depending on the NeMo version
+         logits, *_ = model(input_signal=x, input_signal_length=ln)
+     frames = logits.shape[1]
+     if frames <= 2: return []
+     tok_vocab = model.tokenizer.vocab  # dict or list depending on the tokenizer
+     vocab = list(tok_vocab.keys()) if isinstance(tok_vocab, dict) else list(tok_vocab)
+     cfg = CtcSegmentationParameters(); cfg.char_list=vocab
+
+     out = prepare_text(cfg, words)
+     gt = out[0] if isinstance(out, (list,tuple)) else out
+
+     timing, _, _ = ctc_segmentation(cfg, logits.cpu().numpy()[0], gt)
+     tps = total / float(frames)
+
+     spans=[]
+     for i in range(len(words)):
+         st=float(timing[i])*tps
+         en=float(timing[i+1])*tps if i+1<len(timing) else total
+         spans.append((st,en,words[i]))
+     return group(spans)
 
+ # ---------- RNNT ALIGNMENT (Soloni) ---------- #
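+ # RNNT decoding here yields no word timestamps, so words are spread over
+ # the VAD speech intervals in proportion to each interval's duration.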
+ def rnnt_vad(text, audio, sr):
+     intervals = librosa.effects.split(audio, top_db=28)
+     words = keep_bambara_words(text.split())
+     if len(intervals) == 0 or not words:  # `not intervals` raises on a numpy array
+         return [(0, len(audio)/sr, text)]
+     spans=[]; idx=0
+     total_audio=sum(e-s for s,e in intervals)
+     for n,(s,e) in enumerate(intervals):
+         seg_d=(e-s)/sr
+         # the last interval takes all remaining words so none are dropped
+         k = len(words)-idx if n==len(intervals)-1 else max(1,int(len(words)*((e-s)/total_audio)))
+         chunk=words[idx:idx+k]; idx+=k
+         if not chunk: continue
+         parts=[chunk[i:i+MAX_WORDS] for i in range(0,len(chunk),MAX_WORDS)]
+         step=seg_d/len(parts); base=s/sr
+         for j,p in enumerate(parts):
+             st=base+j*step; en=base+(j+1)*step
+             spans.append((st,en," ".join(p)))
+     return group(spans)
 
+ # ---------- RENDER SUBTITLES ---------- #
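+ # Subtitles are drawn with PIL on a semi-transparent band (12% of the
+ # frame height), so no ImageMagick dependency is required.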
+ def draw_sub(text,W,H):
+     bg=Image.new("RGBA",(W,int(H*0.12)),(0,0,0,180))
+     d=ImageDraw.Draw(bg)
+     try: font=ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",size=max(18,H//18))
+     except Exception: font=ImageFont.load_default()
+     box=d.textbbox((0,0),text,font=font)
+     tw=box[2]-box[0]; th=box[3]-box[1]
+     d.text(((W-tw)//2,(H*0.12-th)//2),text,font=font,fill="white")
      return bg
 
+ def burn(video,subs):
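+     # Clamp every cue to [0, video duration] and pin the composite to the
+     # source duration so the output never runs longer than the input.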
+     out="RobotsMali_Subtitled.mp4"
+     base=VideoFileClip(video); W,H=base.size; dur=base.duration
+     layers=[]
+     for s,e,t in subs:
+         s=max(0,min(s,dur)); e=max(0,min(e,dur))
+         if e<=s: continue
+         img=draw_sub(t.upper(),W,H)
+         layers.append(ImageClip(np.array(img)).set_start(s).set_duration(e-s).set_pos(("center","bottom")))
+     CompositeVideoClip([base]+layers).set_duration(dur).write_videofile(out,codec="libx264",audio_codec="aac",fps=base.fps)
+     base.close()
      return out
 
+ # ---------- PIPELINE ---------- #
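+ # End-to-end flow: extract 16 kHz mono audio, trim silence, transcribe,
+ # align (CTC) or VAD-split (RNNT), then burn the subtitles into the video.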
  def pipeline(video, model_name):
      try:
          tmp=os.path.join(tempfile.gettempdir(),"audio.wav")
          extract_audio(video,tmp)
          clean,audio,sr=clean_audio(tmp)
          model=load_model(model_name)
+         text=transcribe(model,clean)
          mode=MODELS[model_name][1]
          subs = align_ctc(model,audio,sr,text) if mode=="ctc" else rnnt_vad(text,audio,sr)
+         if not subs: return "⚠️ Aucun sous-titre utilisable.",None
+         return "✅ Terminé !", burn(video,subs)
      except Exception as e:
          return f"❌ ERREUR : {e}",None
 
+ with gr.Blocks(title="RobotsMali V21 — Bambara Aligné") as demo:
+     gr.Markdown("# ⚡ RobotsMali V21 — Sous-titrage Bambara Stable")
+     video=gr.Video()
+     model=gr.Dropdown(list(MODELS.keys()),value="Soloba V1 (CTC)")
+     run=gr.Button("▶️ Générer")
+     status=gr.Markdown(); out=gr.Video()
+     run.click(pipeline,[video,model],[status,out])
 
  demo.launch(share=True)