Spaces:
Sleeping
Sleeping
| import ffmpeg | |
| import os | |
| import asyncio | |
| import edge_tts | |
| import uuid | |
| import time | |
| import math | |
| from google import genai | |
| from google.genai import types | |
| from groq import Groq | |
| # --- 1. GEMINI MANAGER (DYNAMIC FETCH & FLASH ONLY) --- | |
class GeminiManager:
    """Round-robin pool of Gemini API keys with per-key rate limiting.

    Keys come from GEMINI_API_KEY_1..5 (falling back to GEMINI_API_KEY).
    Each key is limited to RPM_LIMIT requests per rolling 60-second window
    and is put on a 60-second cooldown after a quota/permission error.
    """

    def __init__(self):
        self.keys = []
        for i in range(1, 6):
            k = os.getenv(f'GEMINI_API_KEY_{i}')
            if k:
                self.keys.append(k)
        if not self.keys:
            k = os.getenv('GEMINI_API_KEY')
            if k:
                self.keys.append(k)
        # key -> cached list of available "flash" model names
        self.key_models_cache = {}
        # key -> rolling-window usage stats and cooldown deadline
        self.key_status = {}
        for k in self.keys:
            self.key_status[k] = {"usage_count": 0, "window_start": time.time(), "cooldown_until": 0}
        self.RPM_LIMIT = 12

    def get_healthy_key(self, max_wait=60):
        """Return a key that is under its RPM limit and not cooling down.

        Polls once per second for up to *max_wait* seconds; returns None when
        no keys are configured or none becomes available in time.  (Fixes the
        original unbounded recursion, which never terminated when zero keys
        were configured and could overflow the stack.)
        """
        if not self.keys:
            return None
        deadline = time.time() + max_wait
        while True:
            current_time = time.time()
            for key in self.keys:
                stats = self.key_status[key]
                if current_time < stats["cooldown_until"]:
                    continue
                # Reset the rolling 60-second usage window once it expires.
                if current_time - stats["window_start"] > 60:
                    stats["usage_count"] = 0
                    stats["window_start"] = current_time
                if stats["usage_count"] < self.RPM_LIMIT:
                    stats["usage_count"] += 1
                    return key
            if time.time() >= deadline:
                return None
            time.sleep(1)

    def mark_key_error(self, key):
        """Put *key* on a 60-second cooldown after a quota/permission error."""
        print(f"β οΈ Key Error on ...{key[-4:]}. Cooling down.")
        self.key_status[key]["cooldown_until"] = time.time() + 60

    def fetch_flash_models(self, client, key):
        """Return (and cache) the 'flash' model names available to *key*.

        Falls back to ["gemini-1.5-flash"] when listing fails or none match.
        """
        if key in self.key_models_cache:
            return self.key_models_cache[key]
        try:
            flash_list = []
            for m in client.models.list():
                if 'flash' in m.name.lower():
                    flash_list.append(m.name.replace('models/', ''))
            # Newest-looking names first (lexicographically descending).
            flash_list.sort(reverse=True)
            if not flash_list:
                flash_list = ["gemini-1.5-flash"]
            self.key_models_cache[key] = flash_list
            return flash_list
        except Exception:
            # Listing is best-effort; a safe default keeps translation working.
            # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
            return ["gemini-1.5-flash"]

    def translate_text(self, text):
        """Translate English *text* to natural spoken Burmese.

        Tries up to 3 healthy keys, and every cached flash model per key.
        Keys hitting quota/permission errors are cooled down and skipped.
        Returns the translation, "System Busy" when no key is available,
        or a "Translation Failed: ..." message carrying the last error.
        """
        max_attempts = 3
        attempts = 0
        last_error = ""
        while attempts < max_attempts:
            key = self.get_healthy_key()
            if not key:
                return "System Busy"
            try:
                client = genai.Client(api_key=key, http_options={'api_version': 'v1beta'})
                available_models = self.fetch_flash_models(client, key)
                for model_name in available_models:
                    try:
                        response = client.models.generate_content(
                            model=model_name,
                            contents=text,
                            config=types.GenerateContentConfig(
                                temperature=0.3,
                                max_output_tokens=8192,
                                system_instruction=(
                                    "You are a Translator. Convert the input English text into 'Natural Spoken Burmese' (α‘ααΌα±α¬α αα¬αΈ). "
                                    "Output ONLY the Burmese translation. No Markdown."
                                )
                            )
                        )
                        return response.text.strip()
                    except Exception as e:
                        err = str(e).lower()
                        if "429" in err or "403" in err or "resource exhausted" in err:
                            # Quota/permission problem: cool this key down and
                            # fall through to the next attempt with a new key.
                            self.mark_key_error(key)
                            break
                        # Any other model error: try the next flash model.
                        continue
            except Exception as e:
                last_error = str(e)
            attempts += 1
        return f"Translation Failed: {last_error}"


gemini_manager = GeminiManager()
| # --- 2. GROQ LOGIC --- | |
def transcribe_audio_groq(audio_path):
    """Transcribe *audio_path* to English text with Groq's Whisper model.

    Returns the transcript string, or None when no GROQ key is configured
    or any step fails (the error is logged, never raised).
    """
    try:
        api_key = os.getenv('GROQ_API_KEY_1') or os.getenv('GROQ_API_KEY')
        if not api_key:
            return None
        client = Groq(api_key=api_key)
        with open(audio_path, "rb") as audio_file:
            payload = (os.path.basename(audio_path), audio_file.read())
        transcription = client.audio.transcriptions.create(
            file=payload,
            model="whisper-large-v3-turbo",
            response_format="text",
        )
        return str(transcription).strip()
    except Exception as e:
        print(f"β Groq Transcription Error: {e}")
        return None
| # --- 3. MAIN ANALYSIS --- | |
def analyze_script_with_ai(video_path):
    """Extract audio from *video_path*, transcribe it (Groq Whisper), then
    translate the transcript to spoken Burmese (Gemini).

    Returns the Burmese text on success, or a human-readable error string;
    never raises.  The temporary MP3 is removed on every exit path via
    try/finally (the original duplicated the cleanup at three return sites,
    which leaked the file on unexpected exceptions raised between them).
    """
    unique_id = str(uuid.uuid4())[:8]
    audio_path = f"temp_{unique_id}.mp3"
    try:
        # Fresh extraction target (uuid makes a collision very unlikely).
        if os.path.exists(audio_path):
            os.remove(audio_path)
        (
            ffmpeg.input(video_path)
            .output(audio_path, format='mp3', acodec='libmp3lame', ab='64k')
            .run(quiet=True, overwrite_output=True)
        )
        print("π Step 1: Transcribing with Groq...")
        english_text = transcribe_audio_groq(audio_path)
        if not english_text:
            return "Transcription Failed (Check Groq Key)"
        print(f"π§ Step 2: Translating {len(english_text)} chars with Gemini...")
        return gemini_manager.translate_text(english_text)
    except Exception as e:
        return f"Error: {str(e)}"
    finally:
        # Guaranteed temp-file cleanup, success or failure.
        if os.path.exists(audio_path):
            os.remove(audio_path)
| # --- 4. VIDEO PROCESSING (ULTRA SAFE AUTO SUBTITLES) --- | |
async def generate_voice(text, output_file, voice, srt_file=None):
    """Synthesize *text* to *output_file* with edge-tts using *voice*.

    When *srt_file* is given, the audio is streamed so WordBoundary events
    can be collected into an SRT subtitle file; on any subtitle error the
    function falls back to a plain (subtitle-less) save of the audio.
    """
    communicate = edge_tts.Communicate(text, voice)
    if srt_file:
        try:
            submaker = edge_tts.SubMaker()
            with open(output_file, "wb") as file:
                async for chunk in communicate.stream():
                    if chunk["type"] == "audio":
                        file.write(chunk["data"])
                    elif chunk["type"] == "WordBoundary":
                        submaker.create_sub((chunk["offset"], chunk["duration"]), chunk["text"])
            # Write SRT file
            subs = submaker.generate_subs()
            # If the AI voice produced no word boundaries the SRT would be
            # empty and burning it would error, so write a dummy cue instead.
            if not subs.strip():
                subs = "1\n00:00:00,000 --> 00:00:02,000\n[Audio]\n"
            with open(srt_file, "w", encoding="utf-8") as file:
                file.write(subs)
        except Exception as e:
            print(f"β οΈ Auto Subtitle Generation Error: {e}")
            await communicate.save(output_file)  # on error, regenerate audio once without subtitles
    else:
        await communicate.save(output_file)
def create_ai_audio(text, output_path, gender='male', srt_path=None):
    """Synchronously run edge-tts synthesis; returns True on success.

    Picks a Burmese neural voice by *gender* and optionally emits an SRT
    file alongside the audio.  Failures are logged and reported as False.
    """
    if gender == 'male':
        voice = "my-MM-ThihaNeural"
    else:
        voice = "my-MM-NilarNeural"
    try:
        asyncio.run(generate_voice(text, output_path, voice, srt_path))
    except Exception as e:
        print(f"β οΈ TTS Error: {e}")
        return False
    return True
def process_video_edit(input_path, output_path, options):
    """Render *input_path* to *output_path*, applying the edits in *options*.

    Recognised option keys (all optional):
        ai_audio_path                replace the audio track, tempo-matched
        monezlation                  loop clips shorter than 70s up to ~70s
        bypass_flip / bypass_speed / bypass_zoom / bypass_color
                                     light transforms (flip, 1.05x, crop-zoom, eq)
        blur_enabled, blur_x/y/w/h   delogo rectangle
        logo_path, logo_w/h/x/y      image overlay (best effort)
        text_watermark, text_x/y     drawtext overlay (best effort)
        auto_subtitles, srt_path     burn subtitles from an SRT file

    Returns (True, "Success") or (False, message); never raises.
    """
    try:
        probe = ffmpeg.probe(input_path)
        vid_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
        width = int(vid_info['width'])
        height = int(vid_info['height'])
        duration = float(probe['format']['duration'])
        input_stream = ffmpeg.input(input_path)
        v = input_stream.video
        a = input_stream.audio
        # --- A. AUDIO SYNC FIRST ---
        if options.get('ai_audio_path') and os.path.exists(options['ai_audio_path']):
            ai_a = ffmpeg.input(options['ai_audio_path']).audio
            ai_probe = ffmpeg.probe(options['ai_audio_path'])
            ai_duration = float(ai_probe['format']['duration'])
            if duration > 0 and ai_duration > 0:
                # Stretch/squeeze narration to the video length; atempo only
                # supports 0.5x..2.0x, so clamp the ratio.
                tempo = ai_duration / duration
                if tempo < 0.5:
                    tempo = 0.5
                elif tempo > 2.0:
                    tempo = 2.0
                a = ai_a.filter('atempo', tempo).filter('atrim', duration=duration)
            else:
                a = ai_a
        # --- B. SMART MONEZLATION LOOP ---
        # Loop short clips so the final cut reaches ~70 seconds.
        if options.get('monezlation') and duration < 70:
            target_duration = 70.0
            total_plays = math.ceil(target_duration / duration)
            loop_count = total_plays - 1
            if loop_count > 0:
                v = v.filter('loop', loop=loop_count, size=32767)
                a = a.filter('aloop', loop=loop_count, size=2147483647)
                duration = duration * total_plays
        # Filters
        if options.get('bypass_flip'):
            v = v.hflip()
        if options.get('bypass_speed'):
            v = v.filter('setpts', 'PTS/1.05')
            a = a.filter('atempo', '1.05')
        if options.get('bypass_zoom'):
            # Crop 5% then scale back up: a subtle zoom.
            crop_w = int(width * 0.95)
            crop_h = int(height * 0.95)
            v = v.filter('crop', crop_w, crop_h)
            v = v.filter('scale', width, height)
        if options.get('bypass_color'):
            v = v.filter('eq', contrast=1.1, brightness=0.05, saturation=1.2)
        # Coordinate Fix for Blur
        if options.get('blur_enabled'):
            bx = int(options.get('blur_x', 0))
            by = int(options.get('blur_y', 0))
            bw = int(options.get('blur_w', 0))
            bh = int(options.get('blur_h', 0))
            if bw > 0 and bh > 0:
                v = v.filter('delogo', x=bx, y=by, w=bw, h=bh)
        # Logo Overlay (best effort: a bad logo must not kill the render)
        if options.get('logo_path'):
            try:
                lw = int(options.get('logo_w', 100))
                lh = int(options.get('logo_h', 100))
                lx = int(options.get('logo_x', 10))
                ly = int(options.get('logo_y', 10))
                logo = ffmpeg.input(options['logo_path']).filter('scale', lw, lh)
                v = v.overlay(logo, x=lx, y=ly)
            except Exception as e:
                # Was a bare `except: pass` (also swallowed KeyboardInterrupt);
                # narrowed and logged so failures are visible.
                print(f"β οΈ Logo overlay skipped: {e}")
        # --- TEXT WATERMARK --- (best effort, same policy as the logo)
        if options.get('text_watermark'):
            try:
                txt_content = options['text_watermark']
                tx = int(options.get('text_x', 10))
                ty = int(options.get('text_y', 10))
                v = v.drawtext(
                    text=txt_content, x=tx, y=ty, fontsize=35, fontcolor='white',
                    borderw=2, bordercolor='black', shadowx=2, shadowy=2, shadowcolor='black@0.5'
                )
            except Exception as e:
                print(f"β οΈ Text watermark skipped: {e}")
        # --- AUTO SUBTITLES (FIXED PATH & SAFETY) ---
        if options.get('auto_subtitles') and options.get('srt_path') and os.path.exists(options['srt_path']):
            try:
                # Relative, forward-slash path avoids FFmpeg filter-argument
                # escaping errors (especially with Windows-style paths).
                srt_p = os.path.relpath(options['srt_path']).replace('\\', '/')
                if os.path.getsize(options['srt_path']) > 0:
                    # Subtitle styling: readable size, bottom-centred.
                    style = "FontSize=24,PrimaryColour=&H0000FFFF,OutlineColour=&H00000000,BorderStyle=1,Outline=2,Alignment=2,MarginV=25"
                    v = v.filter('subtitles', srt_p, force_style=style)
                else:
                    print("β οΈ SRT File is empty, skipping subtitle burning.")
            except Exception as e:
                print(f"β οΈ Subtitle Filter Application Error: {e}")
        # shortest=None emits the bare `-shortest` flag (stop at shortest stream).
        output = ffmpeg.output(v, a, output_path, vcodec='libx264', acodec='aac', preset='veryfast', shortest=None)
        output.run(overwrite_output=True, quiet=True)
        return True, "Success"
    except ffmpeg.Error as e:
        # Surface the full FFmpeg stderr for debugging.
        err_msg = e.stderr.decode('utf8') if e.stderr else str(e)
        print(f"β FFmpeg Critical Error:\n{err_msg}")
        return False, f"FFmpeg Render Error. (Check Terminal for details)"
    except Exception as e:
        return False, str(e)