Spaces:
Paused
Paused
| import requests | |
| import json | |
| import time | |
| from datetime import datetime | |
| import os | |
| import subprocess | |
| from gtts import gTTS | |
| import speech_recognition as sr | |
| from bs4 import BeautifulSoup as BS4 | |
| import re | |
| import emoji | |
| from PIL import Image | |
| from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip | |
| from pydub import AudioSegment | |
| import logging | |
| import pickle | |
# Configure root logging once for the whole script: DEBUG and up, timestamped lines.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def escape_ffmpeg_text(text):
    """Escape characters that are special inside ffmpeg filter arguments.

    Prefixes ':', ',' and '=' with a backslash so the text can be embedded
    safely in a drawtext/filter expression.
    """
    escaped = text
    for special in (':', ',', '='):
        escaped = escaped.replace(special, '\\' + special)
    return escaped
def remove_special_characters(string):
    """Strip everything except ASCII letters, digits and whitespace, then
    collapse all whitespace runs into single spaces."""
    kept = re.sub(r'[^a-zA-Z0-9\s]', '', string)
    return ' '.join(kept.split())
def remove_emojis(text):
    """Remove emoji from *text* by demojizing them to ``:name:`` tokens and
    deleting those tokens."""
    demojized = emoji.demojize(text)
    return re.sub(r':[a-zA-Z_]+:', '', demojized)
def highlight_keywords(text, keywords):
    """Wrap every occurrence of each keyword in ANSI red escape codes.

    Args:
        text: String to annotate.
        keywords: Iterable of literal keyword strings.

    Returns:
        ``text`` with each keyword occurrence wrapped in ``\\033[91m…\\033[0m``.
    """
    for keyword in keywords:
        # re.escape: keywords are literal text, not regex patterns — without it
        # a keyword such as "c++" or "a.b" would crash re.sub or match wrongly.
        text = re.sub(rf'({re.escape(keyword)})', r'\033[91m\1\033[0m', text)
    return text
def check_file_exists(file_path):
    """Return True when *file_path* exists; otherwise print an error and return False."""
    if os.path.exists(file_path):
        return True
    print(f"Error: File not found: {file_path}")
    return False
def create_title_audio(title_text, output_audio, lang='en'):
    """Synthesize *title_text* to speech with gTTS and save it as an MP3 file.

    NOTE(review): this function was defined twice back to back in the original
    file; the identical duplicate has been removed.

    Args:
        title_text: Text to speak.
        output_audio: Path of the MP3 file to write.
        lang: gTTS language code (default ``'en'``, matching the original behavior).
    """
    tts = gTTS(text=title_text, lang=lang)
    tts.save(output_audio)
def convert_mp3_to_wav(mp3_file, wav_file):
    """Decode an MP3 file with pydub and re-export it as WAV."""
    AudioSegment.from_mp3(mp3_file).export(wav_file, format="wav")
def generate_word_timings(audio_file):
    """Transcribe an MP3 and return per-word timings.

    Converts the MP3 to a temporary WAV, runs Google speech recognition on it,
    and returns a list of ``(word, start_seconds, end_seconds)`` tuples.  When
    the API response carries no word-level timestamps, timings are estimated
    by spreading the transcript's words evenly over the audio duration.

    Returns:
        List of ``(word, start, end)`` tuples, or ``[]`` on any failure.
    """
    recognizer = sr.Recognizer()
    wav_file = audio_file.replace('.mp3', '.wav')
    convert_mp3_to_wav(audio_file, wav_file)
    try:
        with sr.AudioFile(wav_file) as source:
            audio = recognizer.record(source)
        result = recognizer.recognize_google(audio, language="en-US", show_all=True)
        if not result:
            return []
        alternative = result['alternative'][0]
        if 'words' not in alternative:
            words = alternative['transcript'].split()
            if not words:
                # Guard: empty transcript would otherwise divide by zero below.
                return []
            # frame_data is raw PCM bytes, so duration must divide by bytes/sec
            # = sample_rate * sample_width; the original omitted sample_width,
            # overestimating the duration (2x for 16-bit audio).
            duration = len(audio.frame_data) / (audio.sample_rate * audio.sample_width)
            time_per_word = duration / len(words)
            return [(word, i * time_per_word, (i + 1) * time_per_word)
                    for i, word in enumerate(words)]
        return [(w['word'], w['start_time'], w['end_time']) for w in alternative['words']]
    except Exception as e:
        print(f"Error in speech recognition: {str(e)}")
        return []
    finally:
        # Always remove the temporary WAV — on success and on failure alike
        # (the original leaked it when recognition raised before cleanup).
        if os.path.exists(wav_file):
            os.remove(wav_file)
def create_synchronized_subtitles(title_text, audio_file, output_srt):
    """Write an SRT subtitle file whose cues follow the spoken words in *audio_file*.

    Falls back to a single five-second cue containing *title_text* when no
    word timings could be recovered from the audio.
    """
    logging.info(f"Creating synchronized subtitles for: {audio_file}")
    timings = generate_word_timings(audio_file)
    if not timings:
        logging.warning(f"No word timings generated for {audio_file}. Creating basic subtitle.")
        with open(output_srt, 'w', encoding='utf-8') as srt:
            srt.write("1\n00:00:00,000 --> 00:00:05,000\n" + title_text + "\n")
        return
    with open(output_srt, 'w', encoding='utf-8') as srt:
        for index, (word, begin, finish) in enumerate(timings, start=1):
            srt.write(f"{index}\n{format_time(begin)} --> {format_time(finish)}\n{word}\n\n")
    logging.info(f"Subtitle file created: {output_srt}")
def format_time(seconds):
    """Render a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
    total_minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(int(total_minutes), 60)
    return f"{hours:02d}:{minutes:02d}:{secs:06.3f}".replace('.', ',')
def add_subtitles(input_video, subtitles_file, output_video):
    """Burn the subtitles from an SRT file into *input_video* with ffmpeg.

    The audio stream is copied unchanged.  Returns True on success, False
    when the subtitle file is missing or ffmpeg fails.
    """
    logging.info(f"Adding subtitles to video: {input_video}")
    if not os.path.exists(subtitles_file):
        logging.error(f"Subtitle file not found: {subtitles_file}")
        return False
    command = [
        'ffmpeg', '-y',
        '-i', input_video,
        '-vf', f"subtitles='{subtitles_file}':force_style='FontName=Arial,FontSize=24,PrimaryColour=&HFFFFFF&,OutlineColour=&H000000&,BackColour=&H000000&,BorderStyle=3,Outline=1,Shadow=0,MarginV=20'",
        '-c:a', 'copy',
        output_video,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        logging.error(f"Error adding subtitles: {err.stderr}")
        return False
    logging.info(f"Subtitles added successfully: {output_video}")
    return True
def check_audio_stream(video_file):
    """Return True if *video_file* contains at least one audio stream (per ffprobe)."""
    probe = subprocess.run(
        ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_streams', video_file],
        capture_output=True,
        text=True,
    )
    streams = json.loads(probe.stdout)['streams']
    return any(stream['codec_type'] == 'audio' for stream in streams)
def merge_audio(video_file, title_audio, output_video):
    """Attach *title_audio* to *video_file* with ffmpeg.

    When the video already has an audio track, the title audio is concatenated
    after it; otherwise the title audio becomes the only track.  Video is
    stream-copied, audio is re-encoded to AAC.
    """
    head = ['ffmpeg', '-y', '-i', video_file, '-i', title_audio]
    if check_audio_stream(video_file):
        mapping = [
            '-filter_complex', '[0:a][1:a]concat=n=2:v=0:a=1[outa]',
            '-map', '0:v',
            '-map', '[outa]',
        ]
    else:
        mapping = ['-map', '0:v', '-map', '1:a']
    tail = ['-c:v', 'copy', '-c:a', 'aac', output_video]
    subprocess.run(head + mapping + tail, check=True)
def add_rainbow_title(input_video, output_video, title_text):
    """Overlay *title_text* as a title that cycles through rainbow colors.

    Quote characters are stripped and ffmpeg-special characters escaped; one
    drawtext filter per color is enabled during a one-second window inside a
    repeating cycle of length ``len(colors)`` seconds.
    """
    cleaned = escape_ffmpeg_text(title_text.replace("'", "").replace('"', ''))
    print(cleaned)
    colors = ['red', 'orange', 'yellow', 'green', 'blue', 'indigo', 'violet']
    cycle = len(colors)
    filters = [
        f"drawtext=text='{cleaned}':fontsize=32:fontcolor={color}:"
        f"x=(w-tw)/2:y=10:box=1:boxcolor=black@0.5:boxborderw=5:"
        f"enable='between(mod(t,{cycle}),{idx},{idx + 1})'"
        for idx, color in enumerate(colors)
    ]
    subprocess.run(
        ['ffmpeg', '-y', '-i', input_video, '-vf', ','.join(filters), '-c:a', 'copy', output_video],
        check=True,
    )
def add_subtitles(input_video, subtitles_file, output_video):
    """Burn *subtitles_file* into *input_video* with ffmpeg (audio stream copied).

    NOTE(review): this is a duplicate definition — a more robust add_subtitles
    exists earlier in this file and is silently shadowed by this one.  This
    version has been brought up to the same standard (subtitle-file existence
    check, logged ffmpeg errors, boolean result) so the safeguards are not
    lost at runtime; one of the two definitions should eventually be deleted.

    Returns:
        True on success, False when the subtitle file is missing or ffmpeg fails.
    """
    if not os.path.exists(subtitles_file):
        logging.error(f"Subtitle file not found: {subtitles_file}")
        return False
    cmd = [
        'ffmpeg',
        '-y',
        '-i', input_video,
        '-vf', f"subtitles={subtitles_file}:force_style='FontName=Arial,FontSize=24,PrimaryColour=&HFFFFFF&,OutlineColour=&H000000&,BackColour=&H000000&,BorderStyle=3,Outline=1,Shadow=0,MarginV=20'",
        '-c:a', 'copy',
        output_video
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return True
    except subprocess.CalledProcessError as e:
        logging.error(f"Error adding subtitles: {e.stderr}")
        return False
# Main script execution: scrape videos from a subreddit, narrate their titles,
# subtitle and merge them, then concatenate everything into one final video.
video_clips = []    # absolute paths of the fully processed per-post videos
subName = 'funny'   # subreddit to scrape
post_data = []      # one dict per downloaded post: {'title', 'url', 'video'}
prev = 0            # NOTE(review): written later but never read — appears unused
stop = False        # pagination flag: set True when no "more posts" link is found
nextUrl = None      # NOTE(review): never used after initialization
lenVideos = 10      # maximum number of videos to collect
url = f'https://www.reddit.com/r/{subName}/'
session = requests.Session()
# Load cookies from the pickle file.
# NOTE(review): pickle.load executes arbitrary code if the file is tampered
# with — only acceptable because cookies.pkl is a local, trusted artifact.
with open('cookies.pkl', 'rb') as file:
    cookies = pickle.load(file)
# Attach the loaded cookies to the requests session.
for cookie in cookies:
    session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
# Fetch the list of items already uploaded to TikTok, so previously posted
# Reddit titles can be skipped in the scraping loop below.
data = {"cursor": 0, "size": 50, "query": {"conditions": [], "sort_orders": [{"field_name": "post_time", "order": 2}]}}
response = session.post('https://www.tiktok.com/tiktok/creator/manage/item_list/v1/', json=data)
oldList = None
if response.status_code < 400:
    # presumably a <400 response always carries 'item_list' — TODO confirm
    oldList = response.json()['item_list']
# Scrape subreddit pages until enough new (not-yet-posted) videos are collected
# or no further pagination link is found.
while not stop:
    if len(post_data) < lenVideos:
        # NOTE(review): hard-coded session cookies/JWTs are embedded below —
        # they expire and are a credential-leak risk; they should be loaded
        # from a secure store like cookies.pkl above instead.
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0',
            'cookie': 'rdt=584a8f145972aa8519e21262efa13a80; edgebucket=DiqgaUXmcKOcAVSDTg; loid=0000000000etak4zv9.2.1632583709000.Z0FBQUFBQm5ETmlpMkRCWV9vT1h4TXBXS25CR1laVVVXdWhqcG9FYVFFZmdNUV83WUVDd2dNeE15WEVaNS0yYWlkOXZVZ2VvWTU5UmR5Y2dQbzJBMkR1N19TcDZOS1dHZVFJQUV2TUxWZ1dXUlF3WTFJdW9iQ1JyYUhmc2tRRFFjUk9qWjBjQk41VHM; session_tracker=gfklojgnfnqgmiajjh.0.1728895158137.Z0FBQUFBQm5ETmkyX0dZMEZSck1UMGUtVTAwcEtacURhR1dLVmN5b0djZlU3MG5SeHdvUEZUN2tOSmF4SDl4aGlMOEFncnFhQ3lzdDdEMlJhTG9FTWREbmhuWjRBd0Zqd1k0dGkxOXNUd0hndkEyVm80bXpTcy1ucUhrUWc4T0FqZ0NMRzJQSjkxcjE; csrf_token=5e250d04e45837bf23115c3752887cd9; token_v2=eyJhbGciOiJSUzI1NiIsImtpZCI6IlNIQTI1NjpzS3dsMnlsV0VtMjVmcXhwTU40cWY4MXE2OWFFdWFyMnpLMUdhVGxjdWNZIiwidHlwIjoiSldUIn0.eyJzdWIiOiJ1c2VyIiwiZXhwIjoxNzI4OTgxNTM4LjEyNDg3NSwiaWF0IjoxNzI4ODk1MTM4LjEyNDg3NCwianRpIjoiSFRPeUpBbFhwd0NkU3p4QWNyd0I0ckRMc1E2SGNRIiwiY2lkIjoiMFItV0FNaHVvby1NeVEiLCJsaWQiOiJ0Ml9ldGFrNHp2OSIsImFpZCI6InQyX2V0YWs0enY5IiwibGNhIjoxNjMyNTgzNzA5MDAwLCJzY3AiOiJlSnhra2RHT3REQUloZC1sMXo3Ql95cF9OaHRzY1lhc0xRYW9rN243RFZvY2s3MDdjTDRpSFA4bktJcUZMRTJ1QktHa0tXRUZXdE9VTmlMdjU4eTlPWkVGU3lGVFI4NDN5d29rYVVwUFVtTjVweWxSd1daa0xsZmFzVUtEQjZZcFZTNloyMEtQUzV2UTNJMUZ6MDZNcWx4V0h0VFlvM0pwYkdNSzJ4UGpzY1pxUXlxdXk2bE1ZRmtvbjhXTGZ2eUctdFktZjdiZmhIWXdyS2dLRF9UT3VGeHdZX0hERkhiX25wcjBiRjJ3cUwzWGc5US0xLU4yN2JObW9kbTVfVnpQdnphU2NUbUc1aWZZdjd0LUNSMTQ1SG1aVVFjd1lnMF95ckFqNl9Ddk9vREtCUVdNSlloUEk1QXJsMl9fSmRpdVRmOGF0eWQtLUdiRVRXXzRyUm1vNXhMRW9VX2o2emNBQVBfX1hEX2U0dyIsInJjaWQiOiJMajhMd21qME5wNWg3eXRuM0R3eGo0UU5XUGZtRmFVWE1taDl3ZWI0N1ZnIiwiZmxvIjoyfQ.gaoJAdTk8XZ43NbyYB07TMoQsRSf0p5WDFyC8ZX0FPhWpghdReRrjyaJ0GwFeGmc5wMK3bTFVkUj177MktQrFZnEr27svwxA3ilpZ85NJXy4Z-nNQ7Do2PhfOzrk83Wk2bKCqm34EAHFVmU6hEfDLHowAmfIlxKzQBq5dhTQM06ZHKX7XIC_Gsbbsd8T3G1Ey2cmsThQiyZ0hqk6_0d1X7Knh1dCvEgb19zk5_UiJUduEi98B99B9tIvWcHgVuiFBY_PA4BHzuapZJdAHntlO2smXI-qZNH6IBsoQMa-irkSBTPsy3cHBycyuCahX9KwB37RaBb_TzFfZhqsgMr3aA; csv=2; g_state={"i_p":1728909182533,"i_l":1}; reddit_session=eyJhbGciOiJSUzI1NiIsImtpZCI6IlNIQTI1NjpsVFdYNlFVUEloWktaRG1rR0pVd1gvdWNFK01BSjBYRE12RU1kNzVxTXQ4IiwidHlwIjoiSldUIn0.eyJzdWIiOiJ0Ml9ldGFrNHp2OSIsImV4cCI6MTc0NDUzMzUzMi42MDgwMzcsImlhdCI6MTcyODg5NTEzMi42MDgwMzcsImp0aSI6IjVSdTdVM2JQNm5FeHM1bGhka2xwaUthNG1fTG1QdyIsImNpZCI6ImNvb2tpZSIsImxjYSI6MTYzMjU4MzcwOTAwMCwic2NwIjoiZUp5S2pnVUVBQURfX3dFVkFMayIsInYxIjoiMTE2MDg2MzQ3MjI3NywyMDI0LTEwLTE0VDA4OjM4OjUyLDUwNGM4NmZmYzBlOTQ1N2UzM2Y0NWI3NWY0YzYwNDVmZDU3M2FlNzgiLCJmbG8iOjJ9.EVFIwbgCVHWuiOHJjAi3MT0LbAeRzT-w0_N8QiHsReK1hDeswQQYyfgBh1u_wPkT2UpGqj__YQrXE6O-tvLnPdWGUktdLM_RT-BzsAjWnDBYS0cEZ-WrCQlwDNYUkORKgaeOQaX4g1bWE84UmfSla9zonJ4WhhgynebClHdbFz0pd3H047UwXwVjPMexSRh8zXuiUu98wWcPI2cFv-PT8lQUbJBAMkjvoJok6rQ0rmOviAZvh1HzKccw9RP7jPx6JJIYbSrJNmAaA-hML4f0mHHgM9dQswGUkmFWcjCJ3kuSfaaHjbYlQwdrsDF1iBCzlfH_9uhbsVSwDUECMGSnPQ'
        }
        req = requests.get(url, headers=headers)
        soup = BS4(req.text, 'html.parser')
        posts = soup.find_all('shreddit-post')
        for post in posts:
            # Skip posts whose title already appears in the TikTok upload list.
            if post['post-title'] not in str(oldList):
                players = post.find_all('shreddit-player-2')
                for player in players:
                    # Only players exposing packaged-media JSON carry direct MP4 URLs.
                    if 'packaged-media-json' in str(player) and len(post_data) < lenVideos:
                        js = player['packaged-media-json']
                        # presumably the last permutation is the highest quality rendition — TODO confirm
                        video_url = json.loads(js)['playbackMp4s']['permutations'][-1]['source']['url']
                        response = requests.get(video_url, stream=True)
                        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                        new_video_name = f"video_{timestamp}.mp4"
                        # Stream the download to disk in 8 KiB chunks.
                        with open(new_video_name, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=8192):
                                f.write(chunk)
                        print("Video downloaded successfully.")
                        post_data.append({'title': post['post-title'], 'url': post['content-href'], 'video': new_video_name})
        # NOTE(review): 'prev' is never read; also raises NameError if the page
        # had no matching posts (players unbound) — confirm intent.
        prev = len(players)
        els = soup.find_all('faceplate-partial')
        for el in els:
            # Follow the "more posts" pagination link when present; any element
            # without one flips the stop flag and ends the loop.
            if '/svc/shreddit/community-more-posts/hot/?after=' in el['src']:
                url = 'https://www.reddit.com' + el['src']
            else:
                stop = True
print(post_data)
# Process each downloaded post in sequence: narrate the title, build
# word-synchronized subtitles, merge the narration, overlay the rainbow
# title, and finally burn in the subtitles.
titles = ''
for i, post in enumerate(post_data, start=1):
    input_video = post['video']
    title_text = post['title']
    titles += title_text + '\n'  # accumulated for titles.txt at the end
    # Create audio for the title
    title_audio = f"title_audio_{i}.mp3"
    create_title_audio(title_text, title_audio)
    # Generate synchronized subtitles
    subtitles_file = f"subtitles_{i}.srt"
    create_synchronized_subtitles(title_text, title_audio, subtitles_file)
    # Merge title audio with video
    video_with_title_audio = f"video_with_title_audio_{i}.mp4"
    merge_audio(input_video, title_audio, video_with_title_audio)
    # Add rainbow title
    output_video = f"output_with_rainbow_title_{i}.mp4"
    add_rainbow_title(video_with_title_audio, output_video, title_text)
    # Add synchronized subtitles to video
    final_output_video = os.path.abspath(f"post_{i}_final.mp4")
    add_subtitles(output_video, subtitles_file, final_output_video)
    if check_file_exists(final_output_video):
        video_clips.append(final_output_video)
    # Clean up temporary files
    # NOTE(review): the downloaded source video (post['video']) is never
    # removed — confirm whether keeping it is intentional.
    for temp_file in [title_audio, subtitles_file, video_with_title_audio, output_video]:
        if os.path.exists(temp_file):
            os.remove(temp_file)
# Final stage: persist the collected titles, concatenate all processed clips
# into one video with the ffmpeg concat demuxer, and clean up intermediates.
with open('titles.txt', 'w') as f:
    f.write(titles)
# Define the concat-list path unconditionally so the cleanup loop at the
# bottom can always reference it (previously this raised NameError when
# video_clips was empty, because concat_file was only bound inside the if).
concat_file = os.path.abspath("concat.txt")
if video_clips:
    with open(concat_file, 'w') as f:
        for clip in video_clips:
            if os.path.exists(clip):
                # One concat-demuxer entry per line, absolute path.
                f.write(f"file '{clip}'\n")
    if os.path.getsize(concat_file) > 0:
        final_video_path = os.path.abspath("final_video.mp4")
        ffmpeg_concat_command = [
            "ffmpeg",
            "-y",
            "-f", "concat",
            "-safe", "0",      # allow absolute paths in the concat list
            "-i", concat_file,
            "-c", "copy",      # stream copy: no re-encode
            final_video_path
        ]
        try:
            subprocess.run(ffmpeg_concat_command, check=True)
            print(f"Final video created: {final_video_path}")
        except subprocess.CalledProcessError as e:
            print(f"Error concatenating videos: {e}")
    else:
        print("No valid video clips to concat.")
else:
    print("No video clips were created.")
# Clean up temporary files (concat list and the per-post final clips).
for file in [concat_file] + video_clips:
    if os.path.exists(file):
        os.remove(file)
print("Video creation process completed.")