testsss / reddit.py
alptangall's picture
Update reddit.py
7e523bf verified
import requests
import json
import time
from datetime import datetime
import os
import subprocess
from gtts import gTTS
import speech_recognition as sr
from bs4 import BeautifulSoup as BS4
import re
import emoji
from PIL import Image
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip
from pydub import AudioSegment
import logging
import pickle
# Configure the root logger once at import time: DEBUG level and above,
# with each record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def escape_ffmpeg_text(text):
    """Backslash-escape ``:``, ``,`` and ``=`` in *text*.

    Each occurrence of those three characters is prefixed with a single
    backslash; every other character is returned unchanged. The result is
    used in this file when embedding a title inside an ffmpeg filter string.
    """
    # Replace the special characters in one pass over the string.
    escape_table = str.maketrans({':': '\\:', ',': '\\,', '=': '\\='})
    return text.translate(escape_table)
def remove_special_characters(string):
    """Keep only ASCII letters, digits and whitespace from *string*.

    After the disallowed characters are dropped, runs of whitespace are
    collapsed to a single space and leading/trailing whitespace is removed.
    """
    kept = re.sub(r'[^a-zA-Z0-9\s]', '', string)
    tokens = kept.split()
    return ' '.join(tokens)
def remove_emojis(text):
    """Return *text* with emoji removed.

    Emoji are first rewritten to their ``:name:`` text form with
    ``emoji.demojize``; every ``:name:`` token made only of ASCII letters and
    underscores is then deleted. Any such token already present in the input
    is deleted as well.
    """
    shortcoded = emoji.demojize(text)
    return re.sub(r':[a-zA-Z_]+:', '', shortcoded)
def highlight_keywords(text, keywords):
    """Wrap every occurrence of each keyword in ANSI bright-red escape codes.

    Keywords are matched literally (regex metacharacters such as ``+`` or
    ``(`` have no special meaning) and all of them are applied in a single
    pass, so a keyword can never match inside the escape sequences inserted
    for another keyword.

    Args:
        text: The string to decorate.
        keywords: Iterable of literal strings to highlight. Empty strings
            are ignored.

    Returns:
        *text* with each match surrounded by ``\\033[91m`` ... ``\\033[0m``.
        When there is nothing to highlight, *text* is returned unchanged.
    """
    # Longest keywords first so that "foobar" wins over "foo" at the same
    # position; the secondary sort key only makes the pattern deterministic.
    literals = sorted({kw for kw in keywords if kw}, key=lambda kw: (-len(kw), kw))
    if not literals:
        return text

    pattern = re.compile('|'.join(re.escape(kw) for kw in literals))
    # A callable replacement avoids any template-escape processing of the
    # matched text.
    return pattern.sub(lambda m: f'\033[91m{m.group(0)}\033[0m', text)
def check_file_exists(file_path):
    """Report whether *file_path* exists.

    Returns True when the path exists; otherwise prints an error message to
    stdout and returns False.
    """
    if os.path.exists(file_path):
        return True
    print(f"Error: File not found: {file_path}")
    return False
def create_title_audio(title_text, output_audio):
    """Synthesize English speech for *title_text* and save it to *output_audio*.

    Args:
        title_text: The text to be spoken.
        output_audio: Destination path handed to ``gTTS.save``.
    """
    tts = gTTS(text=title_text, lang='en')
    tts.save(output_audio)
def convert_mp3_to_wav(mp3_file, wav_file):
    """Load *mp3_file* with pydub and export it to *wav_file* in WAV format."""
    AudioSegment.from_mp3(mp3_file).export(wav_file, format="wav")
def generate_word_timings(audio_file):
    """Return ``(word, start_seconds, end_seconds)`` tuples for *audio_file*.

    The audio is converted to a temporary WAV file, transcribed with
    ``Recognizer.recognize_google`` and the temporary file is removed again.
    If the best alternative carries a ``'words'`` list, its timings are used
    as-is; otherwise the transcript is split on whitespace and the clip
    duration is divided evenly between the words.

    Args:
        audio_file: Path to the MP3 file to analyse.

    Returns:
        A list of ``(word, start, end)`` tuples, or an empty list when
        nothing was recognised or recognition failed (the error is printed).
    """
    recognizer = sr.Recognizer()

    # Derive the temporary WAV path from the extension only. A plain
    # str.replace('.mp3', '.wav') leaves the name unchanged for inputs that
    # do not contain ".mp3", and the cleanup below would then delete the
    # caller's input file.
    stem, ext = os.path.splitext(audio_file)
    wav_file = stem + ('_tmp.wav' if ext.lower() == '.wav' else '.wav')

    try:
        convert_mp3_to_wav(audio_file, wav_file)
        with sr.AudioFile(wav_file) as source:
            audio = recognizer.record(source)

        # Recognition is best-effort: any failure falls back to "no timings"
        # so the caller can still emit a basic subtitle.
        try:
            result = recognizer.recognize_google(audio, language="en-US", show_all=True)
            if not result:
                return []

            best = result['alternative'][0]
            if 'words' in best:
                return [(w['word'], w['start_time'], w['end_time']) for w in best['words']]

            words = best['transcript'].split()
            if not words:
                return []

            # frame_data is a byte string, so the byte count must be divided
            # by bytes-per-sample as well as by the sample rate.
            # NOTE(review): assumes single-channel data, which is what
            # sr.AudioFile is documented to deliver - confirm.
            duration = len(audio.frame_data) / (audio.sample_rate * audio.sample_width)
            time_per_word = duration / len(words)
            return [
                (word, i * time_per_word, (i + 1) * time_per_word)
                for i, word in enumerate(words)
            ]
        except Exception as e:
            print(f"Error in speech recognition: {str(e)}")
            return []
    finally:
        # Always remove the temporary WAV, including when conversion or
        # reading raised before recognition started.
        if os.path.exists(wav_file):
            os.remove(wav_file)
def create_synchronized_subtitles(title_text, audio_file, output_srt):
    """Write an SRT file for *audio_file* to *output_srt*.

    When ``generate_word_timings`` yields timings, one numbered cue is
    written per word. When it yields nothing, a single cue showing
    *title_text* from 0 to 5 seconds is written instead.
    """
    logging.info(f"Creating synchronized subtitles for: {audio_file}")
    timings = generate_word_timings(audio_file)

    if timings:
        with open(output_srt, 'w', encoding='utf-8') as srt:
            # SRT cue numbers start at 1.
            for cue_number, (word, start_time, end_time) in enumerate(timings, start=1):
                srt.write(f"{cue_number}\n")
                srt.write(f"{format_time(start_time)} --> {format_time(end_time)}\n")
                srt.write(f"{word}\n\n")
        logging.info(f"Subtitle file created: {output_srt}")
        return

    # Fallback: no per-word timings available.
    logging.warning(f"No word timings generated for {audio_file}. Creating basic subtitle.")
    with open(output_srt, 'w', encoding='utf-8') as srt:
        srt.write("1\n00:00:00,000 --> 00:00:05,000\n" + title_text + "\n")
def format_time(seconds):
    """Format a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds *before* it is split into
    fields, so rounding can carry into the seconds/minutes/hours fields
    instead of producing an invalid "60,000" seconds field. Negative input
    is clamped to zero.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The timestamp string, e.g. ``"01:01:01,500"`` for ``3661.5``.
    """
    total_ms = max(0, round(seconds * 1000))
    hours, remainder = divmod(total_ms, 3600000)
    minutes, remainder = divmod(remainder, 60000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def add_subtitles(input_video, subtitles_file, output_video):
    """Burn *subtitles_file* into *input_video* with ffmpeg, writing *output_video*.

    Returns True when ffmpeg exits successfully. Returns False, after logging
    an error, when the subtitle file does not exist or ffmpeg exits with a
    non-zero status.

    NOTE(review): this file defines ``add_subtitles`` a second time further
    down; because the later ``def`` rebinds the name, that later definition
    is the one callers actually get.
    """
    logging.info(f"Adding subtitles to video: {input_video}")

    if not os.path.exists(subtitles_file):
        logging.error(f"Subtitle file not found: {subtitles_file}")
        return False

    video_filter = f"subtitles='{subtitles_file}':force_style='FontName=Arial,FontSize=24,PrimaryColour=&HFFFFFF&,OutlineColour=&H000000&,BackColour=&H000000&,BorderStyle=3,Outline=1,Shadow=0,MarginV=20'"
    ffmpeg_args = ['ffmpeg', '-y', '-i', input_video, '-vf', video_filter, '-c:a', 'copy', output_video]

    try:
        # Output is captured so stderr can be included in the error log.
        subprocess.run(ffmpeg_args, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        logging.error(f"Error adding subtitles: {e.stderr}")
        return False

    logging.info(f"Subtitles added successfully: {output_video}")
    return True
def check_audio_stream(video_file):
    """Return True when ffprobe reports at least one audio stream in *video_file*.

    ffprobe is run with JSON output and its ``streams`` list is scanned for
    an entry whose ``codec_type`` is ``'audio'``.

    Args:
        video_file: Path of the media file to probe.

    Returns:
        True if an audio stream is listed. False if none is listed, or if
        ffprobe produced no usable report (for example because the file is
        missing or unreadable); that case is logged as a warning.
    """
    cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams',
        video_file
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        logging.warning(f"ffprobe returned no parseable output for: {video_file}")
        return False

    # A failed probe can yield a report without a "streams" key, so read
    # both levels defensively instead of indexing.
    streams = data.get('streams', []) if isinstance(data, dict) else []
    return any(stream.get('codec_type') == 'audio' for stream in streams)
def merge_audio(video_file, title_audio, output_video):
    """Combine *title_audio* with *video_file* into *output_video* using ffmpeg.

    If the video already has an audio stream, that stream and the title
    audio are joined with ffmpeg's ``concat`` filter (video audio first).
    Otherwise the title audio is mapped as the only audio stream. In both
    cases the video stream is copied and the audio is encoded as AAC.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    if check_audio_stream(video_file):
        audio_routing = [
            '-filter_complex', '[0:a][1:a]concat=n=2:v=0:a=1[outa]',
            '-map', '0:v',
            '-map', '[outa]',
        ]
    else:
        audio_routing = [
            '-map', '0:v',
            '-map', '1:a',
        ]

    ffmpeg_inputs = ['ffmpeg', '-y', '-i', video_file, '-i', title_audio]
    codec_and_output = ['-c:v', 'copy', '-c:a', 'aac', output_video]
    subprocess.run(ffmpeg_inputs + audio_routing + codec_and_output, check=True)
def add_rainbow_title(input_video, output_video, title_text):
    """Overlay *title_text* on *input_video* in a cycling set of seven colours.

    Single and double quotes are stripped from the title, the remainder is
    escaped with ``escape_ffmpeg_text`` and printed. One ``drawtext`` filter
    is generated per colour; filter *i* is enabled while ``mod(t, 7)`` lies
    between ``i`` and ``i + 1``. Audio is copied unchanged.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    title_text = escape_ffmpeg_text(title_text.replace("'", "").replace('"', ''))
    print(title_text)

    colors = ['red', 'orange', 'yellow', 'green', 'blue', 'indigo', 'violet']
    cycle_length = len(colors)
    drawtext_filters = [
        (
            f"drawtext=text='{title_text}':fontsize=32:fontcolor={color}:"
            f"x=(w-tw)/2:y=10:box=1:boxcolor=black@0.5:boxborderw=5:"
            f"enable='between(mod(t,{cycle_length}),{slot},{slot + 1})'"
        )
        for slot, color in enumerate(colors)
    ]

    ffmpeg_args = [
        'ffmpeg',
        '-y',
        '-i', input_video,
        '-vf', ','.join(drawtext_filters),
        '-c:a', 'copy',
        output_video,
    ]
    subprocess.run(ffmpeg_args, check=True)
def add_subtitles(input_video, subtitles_file, output_video):
    """Burn *subtitles_file* into *input_video* with ffmpeg, writing *output_video*.

    This is the definition that is in effect at runtime (it is the last
    ``def add_subtitles`` in the file), so it carries the full behaviour:
    it verifies the subtitle file, quotes the path for the filter string,
    logs progress and reports the outcome instead of raising.

    Args:
        input_video: Path of the source video.
        subtitles_file: Path of the SRT file to render.
        output_video: Path of the video to create (overwritten if present).

    Returns:
        True when ffmpeg exits successfully. False, after logging an error,
        when the subtitle file does not exist or ffmpeg exits with a
        non-zero status.
    """
    logging.info(f"Adding subtitles to video: {input_video}")

    if not os.path.exists(subtitles_file):
        logging.error(f"Subtitle file not found: {subtitles_file}")
        return False

    # Inside an ffmpeg filter option ':' separates options and '\' starts an
    # escape, so normalise Windows separators and escape drive-letter colons
    # before quoting the path.
    filter_path = subtitles_file.replace('\\', '/').replace(':', '\\:')
    cmd = [
        'ffmpeg',
        '-y',
        '-i', input_video,
        '-vf', f"subtitles='{filter_path}':force_style='FontName=Arial,FontSize=24,PrimaryColour=&HFFFFFF&,OutlineColour=&H000000&,BackColour=&H000000&,BorderStyle=3,Outline=1,Shadow=0,MarginV=20'",
        '-c:a', 'copy',
        output_video
    ]

    try:
        # Output is captured so ffmpeg's stderr can be logged on failure.
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        logging.error(f"Error adding subtitles: {e.stderr}")
        return False

    logging.info(f"Subtitles added successfully: {output_video}")
    return True
# Main script execution: collect up to `lenVideos` video posts from the
# subreddit, skipping titles that already appear in the TikTok item list.
video_clips = []
subName = 'funny'
post_data = []
lenVideos = 10
url = f'https://www.reddit.com/r/{subName}/'
nextUrl = url  # page to fetch next; None once pagination is exhausted
REQUEST_TIMEOUT = 30  # seconds, applied to every HTTP request below

session = requests.Session()

# Fetch the list of already-published items so they can be skipped. This
# step is best-effort: without it every Reddit post is treated as new.
oldList = None
try:
    # NOTE(security): pickle.load can execute arbitrary code contained in
    # the file. Only load a cookies.pkl that you created yourself.
    with open('cookies.pkl', 'rb') as file:
        cookies = pickle.load(file)
except FileNotFoundError:
    logging.warning("cookies.pkl not found; skipping the already-posted check.")
else:
    # Attach the saved cookies to the requests session.
    for cookie in cookies:
        session.cookies.set(cookie['name'], cookie['value'], domain=cookie['domain'])
    data = {"cursor": 0, "size": 50, "query": {"conditions": [], "sort_orders": [{"field_name": "post_time", "order": 2}]}}
    try:
        response = session.post(
            'https://www.tiktok.com/tiktok/creator/manage/item_list/v1/',
            json=data,
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code < 400:
            oldList = response.json().get('item_list')
    except (requests.RequestException, ValueError) as e:
        logging.warning(f"Could not fetch the existing item list: {e}")

# Titles are compared against the textual form of the item list. Built once
# here; left empty when no list is available so str(None) never matches.
already_posted = str(oldList) if oldList else ''

headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0',
}
# Session credentials must not live in source control. Supply the Reddit
# cookie header through the REDDIT_COOKIE environment variable; when it is
# unset, the request is sent without a cookie.
reddit_cookie = os.environ.get('REDDIT_COOKIE')
if reddit_cookie:
    headers['cookie'] = reddit_cookie

visited_pages = set()
while nextUrl and len(post_data) < lenVideos:
    # Guard against a pagination link that points back to a page already
    # processed, which would otherwise loop forever.
    if nextUrl in visited_pages:
        break
    visited_pages.add(nextUrl)

    try:
        req = requests.get(nextUrl, headers=headers, timeout=REQUEST_TIMEOUT)
        req.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch {nextUrl}: {e}")
        break

    soup = BS4(req.text, 'html.parser')
    for post in soup.find_all('shreddit-post'):
        title = post.get('post-title')
        if not title or title in already_posted:
            continue

        for player in post.find_all('shreddit-player-2'):
            if len(post_data) >= lenVideos:
                break
            js = player.get('packaged-media-json')
            if not js:
                continue
            try:
                # The last permutation is the one the original script used.
                video_url = json.loads(js)['playbackMp4s']['permutations'][-1]['source']['url']
            except (ValueError, KeyError, IndexError, TypeError):
                logging.warning(f"Unexpected media description for post: {title}")
                continue

            # Microsecond resolution keeps two downloads that finish within
            # the same second from overwriting each other.
            new_video_name = datetime.now().strftime("video_%Y%m%d_%H%M%S_%f.mp4")
            try:
                with requests.get(video_url, stream=True, timeout=REQUEST_TIMEOUT) as video_response:
                    video_response.raise_for_status()
                    with open(new_video_name, 'wb') as f:
                        for chunk in video_response.iter_content(chunk_size=8192):
                            f.write(chunk)
            except (requests.RequestException, OSError) as e:
                logging.error(f"Failed to download {video_url}: {e}")
                # Do not leave a truncated file behind.
                if os.path.exists(new_video_name):
                    os.remove(new_video_name)
                continue

            print("Video downloaded successfully.")
            post_data.append({'title': title, 'url': post.get('content-href'), 'video': new_video_name})

    # Follow the "more posts" partial if the page has one; when it does not,
    # nextUrl stays None and the loop ends instead of refetching the same page.
    nextUrl = None
    for el in soup.find_all('faceplate-partial'):
        src = el.get('src', '')
        if '/svc/shreddit/community-more-posts/hot/?after=' in src:
            nextUrl = 'https://www.reddit.com' + src

print(post_data)
# Build one narrated, titled and subtitled clip per collected post, then
# concatenate the clips into final_video.mp4.
titles = ''.join(post['title'] + '\n' for post in post_data)

for i, post in enumerate(post_data, start=1):
    input_video = post['video']
    title_text = post['title']

    # Intermediate artefacts for this post; all are removed afterwards.
    title_audio = f"title_audio_{i}.mp3"
    subtitles_file = f"subtitles_{i}.srt"
    video_with_title_audio = f"video_with_title_audio_{i}.mp4"
    output_video = f"output_with_rainbow_title_{i}.mp4"
    final_output_video = os.path.abspath(f"post_{i}_final.mp4")

    try:
        # Create audio for the title
        create_title_audio(title_text, title_audio)
        # Generate synchronized subtitles
        create_synchronized_subtitles(title_text, title_audio, subtitles_file)
        # Merge title audio with video
        merge_audio(input_video, title_audio, video_with_title_audio)
        # Add rainbow title
        add_rainbow_title(video_with_title_audio, output_video, title_text)
        # Add synchronized subtitles to video
        add_subtitles(output_video, subtitles_file, final_output_video)
        if check_file_exists(final_output_video):
            video_clips.append(final_output_video)
    except subprocess.CalledProcessError as e:
        # One failing ffmpeg step should not abort the remaining posts.
        logging.error(f"Skipping post {i} ({title_text!r}): {e}")
    finally:
        # Clean up temporary files, including after a failure part-way.
        for temp_file in [title_audio, subtitles_file, video_with_title_audio, output_video]:
            if os.path.exists(temp_file):
                os.remove(temp_file)

# Titles can contain any Unicode character, so do not depend on the
# platform's default encoding.
with open('titles.txt', 'w', encoding='utf-8') as f:
    f.write(titles)

# Defined before the branch so the cleanup loop at the end can always
# refer to it, even when no clip was produced.
concat_file = os.path.abspath("concat.txt")

# Concatenate all processed videos
if video_clips:
    existing_clips = [clip for clip in video_clips if os.path.exists(clip)]
    with open(concat_file, 'w', encoding='utf-8') as f:
        for clip in existing_clips:
            # In a concat list a single quote inside a quoted path is
            # written by closing the quote, escaping it, and reopening.
            escaped_clip = clip.replace("'", "'\\''")
            f.write(f"file '{escaped_clip}'\n")

    if existing_clips:
        final_video_path = os.path.abspath("final_video.mp4")
        ffmpeg_concat_command = [
            "ffmpeg",
            "-y",
            "-f", "concat",
            "-safe", "0",
            "-i", concat_file,
            "-c", "copy",
            final_video_path
        ]
        try:
            subprocess.run(ffmpeg_concat_command, check=True)
            print(f"Final video created: {final_video_path}")
        except subprocess.CalledProcessError as e:
            print(f"Error concatenating videos: {e}")
    else:
        print("No valid video clips to concat.")
else:
    print("No video clips were created.")

# Clean up temporary files
for file in [concat_file] + video_clips:
    if os.path.exists(file):
        os.remove(file)
print("Video creation process completed.")