"""Automated reel pipeline.

Picks an unused reel link from MongoDB, asks a Telegram bot for the video,
re-edits it with MoviePy (caption bar, overlay, watermark), uploads the result
to YouTube and records the upload. A tiny Flask app exposes a health page.
"""

# ───────────── Standard Library
import os
import re
import time
import uuid
import json
import base64
import shutil
import asyncio
import random
import logging
import atexit
import pathlib
from threading import Thread
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple, List

# ───────────── Third-party helpers
import nest_asyncio

# ───────────── Flask
from flask import Flask, render_template, request, redirect, session

# ───────────── MoviePy
from moviepy.editor import (
    VideoFileClip,
    ImageClip,
    ColorClip,
    TextClip,
    CompositeVideoClip,
    concatenate_videoclips,
)
from moviepy.video.fx import resize

# ───────────── Pillow (PIL)
from PIL import Image, ImageDraw, ImageFont

# ───────────── NumPy
import numpy as np

# ───────────── Requests
import requests

# ───────────── Emoji Handling
import emoji

# ───────────── MongoDB
from pymongo import MongoClient

# ───────────── YouTube API (Google)
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

# ───────────── yt-dlp
from yt_dlp import YoutubeDL

# ───────────── Telegram Bot
from telegram import Update
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    filters,
    ContextTypes,
    JobQueue,
)

# ───────────── Telethon (user session talking to the downloader bot)
from telethon import TelegramClient
from telethon.sessions import StringSession
from telethon.tl.types import DocumentAttributeVideo

# ───────────────────────────── LOGGING
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# ───────────────────────────── CONSTANTS & GLOBALS
CAPTIONS = [
    "Wait for it 😜",
    "Watch till end 😂",
    "Try not to laugh 🤣",
    "Don't skip this 🔥",
    "You won't expect this! 😀",
    "Keep watching 😆",
    "Stay till end! 💥",
    "Funniest one yet",
]

BLOCKLIST = [
    "nsfw", "18+", "xxx", "sexy", "adult", "porn", "onlyfans", "escort",
    "betting", "gambling", "iplwin", "1xbet", "winzo", "my11circle",
    "dream11", "rummy", "teenpatti", "fantasy", "casino", "promotion",
]

# Timestamps of uploads made today; cleared by auto_loop() at the daily reset.
UPLOAD_TIMES: List[datetime] = []
# Next daily reset instant; auto_loop() sets it to the coming IST midnight.
NEXT_RESET: Optional[datetime] = None
first_run = True

FONT_PATH = "DejaVuSans-Bold.ttf"
EMOJI_DIR = "emoji_pngs"

# NOTE(review): the fallback API id/hash below are credentials committed to
# source. They are kept so existing deployments keep working, but they should
# be rotated and supplied through the environment only.
API_ID = int(os.getenv("TG_API_ID", "3704772"))
API_HASH = os.getenv("TG_API_HASH", "b8e50a035abb851c0dd424e14cac4c06")
SESSION_STR = os.getenv("SESSION")
TARGET_BOT = "SaveMedia_bot"

# ───────────────────────────── DATABASE
client = MongoClient(os.getenv("MONGO_URI"))
db1 = client.shortttt  # meta for YouTube uploads
meta = db1.meta

botdb = client.teleg4am_reelssss
a_raw = botdb.raw_links  # {link:str, used:bool}
a_reacted = botdb.reacted_links

# ───────────────────────────── FLASK UI
app = Flask(__name__)


@app.route("/")
def home():
    """Health-check page."""
    return "✅ Code is running!"


# ───────────────────────────── MOVIEPY PATCH
def patch_moviepy():
    """Replace ``resize.resize`` with a wrapper that resizes via Pillow LANCZOS.

    Only calls that pass an explicit ``newsize=(w, h)`` keyword take the Pillow
    path; everything else is delegated to the original resizer.

    NOTE(review): this rebinds the attribute on the ``resize`` module only;
    whether ``clip.resize(...)`` picks it up depends on how MoviePy bound that
    method at import time - confirm.
    """
    original_resizer = resize.resize

    def patched_resizer(clip, *args, **kwargs):
        newsize = kwargs.get("newsize")
        # A callable or missing newsize is left to MoviePy itself.
        if not newsize or not isinstance(newsize, (tuple, list)):
            return original_resizer(clip, *args, **kwargs)

        target = tuple(map(int, newsize))

        def _resize_frame(frame):
            # Frames arrive as NumPy arrays, so round-trip through PIL;
            # calling .resize() on the array itself would be ndarray.resize.
            resized = Image.fromarray(frame).resize(target, Image.Resampling.LANCZOS)
            return np.array(resized)

        return clip.fl_image(_resize_frame)

    resize.resize = patched_resizer


patch_moviepy()


# ───────────────────────────── TEXT / EMOJI RENDERING
def _load_font(size: int):
    """Return the bold DejaVu font at *size*, or Pillow's default if missing."""
    try:
        return ImageFont.truetype(FONT_PATH, size=size)
    except OSError:
        return ImageFont.load_default()


def _measure_text(draw, text: str, font) -> Tuple[int, int]:
    """Return ``(width, height)`` of *text*, compatible with old ``textsize``.

    ``ImageDraw.textsize`` was removed in Pillow 10. The right/bottom edges of
    ``textbbox`` anchored at the origin give the same extent; older Pillow
    versions that lack ``textbbox`` (or reject bitmap fonts) fall back to
    ``textsize``.
    """
    if hasattr(draw, "textbbox"):
        try:
            _, _, right, bottom = draw.textbbox((0, 0), text, font=font)
            return int(right), int(bottom)
        except ValueError:
            pass
    return draw.textsize(text, font=font)


def _emoji_png_path(emoji_char: str) -> str:
    """Return the local cache path of the Twemoji PNG for *emoji_char*."""
    hexcode = "-".join(f"{ord(c):x}" for c in emoji_char)
    return f"{EMOJI_DIR}/{hexcode}.png"


def download_emoji_png(emoji_char):
    """Download the 72x72 Twemoji PNG for *emoji_char* into ``emoji_pngs/``.

    Best effort: failures are logged and swallowed so that rendering can
    continue without the emoji.
    """
    hexcode = "-".join(f"{ord(c):x}" for c in emoji_char)
    url = f"https://github.com/twitter/twemoji/raw/master/assets/72x72/{hexcode}.png"
    os.makedirs(EMOJI_DIR, exist_ok=True)
    path = f"{EMOJI_DIR}/{hexcode}.png"
    try:
        # A timeout keeps a stalled download from blocking the whole pipeline.
        r = requests.get(url, timeout=15)
        if r.status_code == 200:
            with open(path, "wb") as f:
                f.write(r.content)
            logger.info(f"✅ Downloaded emoji: {emoji_char} → {path}")
        else:
            logger.warning(f"❌ Failed to download emoji: {emoji_char}")
    except (requests.RequestException, OSError) as e:
        logger.warning(f"⚠️ Error downloading emoji {emoji_char}: {e}")


def _paste_emojis(img, emojis, x: int, y: int, size: int, gap: int = 0) -> None:
    """Paste each emoji left-to-right starting at ``(x, y)``.

    Emojis whose PNG cannot be obtained are skipped without advancing ``x``.
    """
    for entry in emojis:
        path = _emoji_png_path(entry["emoji"])
        if not os.path.exists(path):
            download_emoji_png(entry["emoji"])
        if not os.path.exists(path):
            continue
        with Image.open(path) as raw:
            glyph = raw.convert("RGBA").resize((size, size))
        img.paste(glyph, (x, y), glyph)
        x += size + gap


def create_text_image_2(
    text: str,
    width: int,
    height: int,
    *,
    font_size: int = 60,
    align: str = "center",  # "left" | "center" | "right"
    bg_color=(255, 255, 255),
    text_color=(0, 0, 0),
):
    """Render *text* on a solid bar, with its emojis drawn after the text.

    Args:
        text: Caption, possibly containing emoji characters.
        width: Image width in pixels.
        height: Image height in pixels.
        font_size: Font size; also used as the emoji edge length.
        align: ``"left"``, ``"right"``, or anything else for centered.
        bg_color: RGB background colour.
        text_color: RGB text colour.

    Returns:
        A ``PIL.Image.Image`` in RGB mode.
    """
    img = Image.new("RGB", (width, height), color=bg_color)
    draw = ImageDraw.Draw(img)
    font = _load_font(font_size)

    # Emojis are stripped from the text and re-added as PNGs after it.
    emojis = emoji.emoji_list(text)
    pure_text = emoji.replace_emoji(text, replace="")

    txt_w, txt_h = _measure_text(draw, pure_text, font)
    full_w = txt_w + len(emojis) * font_size  # rough emoji width = font_size

    if align == "left":
        x_start = 20
    elif align == "right":
        x_start = max(20, width - full_w - 20)
    else:  # center
        x_start = max(20, (width - full_w) // 2)
    y_start = (height - txt_h) // 2

    draw.text((x_start, y_start), pure_text, font=font, fill=text_color)
    _paste_emojis(img, emojis, x_start + txt_w + 5, y_start, font_size)
    return img


def create_text_image(text, width, height):
    """Render a centered black-on-white caption, shrinking the font to fit.

    The font starts at 70 px and drops in 2 px steps until text plus emojis
    fit within ``width - 40`` or the size reaches 30 px.

    Returns:
        A ``PIL.Image.Image`` in RGB mode.
    """
    img = Image.new("RGB", (width, height), color=(255, 255, 255))
    draw = ImageDraw.Draw(img)

    emojis = emoji.emoji_list(text)
    pure_text = emoji.replace_emoji(text, replace="")

    font_size = 70
    while True:
        # _load_font never raises, so a missing TTF degrades to the default font.
        font = _load_font(font_size)
        text_width, text_height = _measure_text(draw, pure_text, font)
        total_width = text_width + (len(emojis) * 60) + 20
        if total_width <= width - 40 or font_size <= 30:
            break
        font_size -= 2

    start_x = (width - total_width) // 2
    y = (height - text_height) // 2

    draw.text((start_x, y), pure_text, font=font, fill=(0, 0, 0))
    _paste_emojis(img, emojis, start_x + text_width + 10, y, 60, gap=4)
    return img


def generate_watermark_img(text, width, height=50):
    """Return a transparent RGBA strip with *text* drawn bottom-left in white."""
    img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    font = _load_font(35)
    _, text_height = _measure_text(draw, text, font)
    draw.text(
        (5, height - text_height - 2),
        text,
        fill="white",
        font=font,
        stroke_width=1,
        stroke_fill="black",
    )
    return img


# ───────────────────────────── VIDEO EDITING
def edit_video(video_path):
    """Add a caption bar, a light overlay and a watermark to a video.

    Args:
        video_path: Path of the source video.

    Returns:
        Path of the rendered file inside ``edited/``.
    """
    clip = VideoFileClip(video_path)
    final = None
    try:
        video_width = clip.w
        video_height = clip.h
        bar_height = 120
        total_height = video_height + bar_height

        # === 1. Background Canvas
        final_bg = ColorClip(
            size=(video_width, total_height),
            color=(255, 255, 255),
            duration=clip.duration,
        )

        # === 2. Caption Bar (Top)
        caption = random.choice(CAPTIONS)
        caption_img = create_text_image(caption, video_width, bar_height)
        caption_clip = (
            ImageClip(np.array(caption_img))
            .set_duration(clip.duration)
            .set_position((0, 0))
        )

        # === 3. Eye Protection Overlay (10% white)
        eye_protection = ColorClip(
            size=(video_width, video_height),
            color=(255, 255, 255),
            duration=clip.duration,
        )
        eye_protection = eye_protection.set_opacity(0.1).set_position((0, bar_height))

        # === 4. Watermark (Bottom-left using Pillow + ImageClip)
        watermark_img = generate_watermark_img("@fulltosscomedy4u", video_width, height=50)
        watermark_clip = (
            ImageClip(np.array(watermark_img))
            .set_duration(clip.duration)
            .set_position(("left", bar_height + video_height - 50))
        )

        # === 5. Position original video below top bar
        video_clip = clip.set_position((0, bar_height))

        # === 6. Combine everything
        final = CompositeVideoClip(
            [final_bg, caption_clip, video_clip, eye_protection, watermark_clip],
            size=(video_width, total_height),
        )

        os.makedirs("edited", exist_ok=True)
        output_path = f"edited/{uuid.uuid4().hex}.mp4"
        final.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            preset="veryslow",
            bitrate="12000k",
            verbose=False,
            logger=None,
        )
        return output_path
    finally:
        # Release clip resources even when rendering fails.
        if final is not None:
            final.close()
        clip.close()


def edit_video_raw(video_path: str) -> str:
    """Compose a 1220x2460 layout: caption, laugh meme, mid caption, main video.

    Args:
        video_path: Path of the source video (at least 6 s; cut to 180 s).

    Returns:
        Path of the rendered file inside ``edited/``.

    Raises:
        ValueError: The main video is shorter than 6 s or the laugh meme is
            shorter than 4 s.
        FileNotFoundError: The chosen laugh meme file does not exist.
    """
    vw, vh = 1220, 2460

    # Pixel-perfect heights (they sum to vh)
    CAPTION_H = 230
    LAUGH_H = 508
    MID_H = 210
    MAIN_H = 1512

    clip = VideoFileClip(video_path).resize(width=1220)
    laugh_raw = None
    final = None
    try:
        if clip.duration < 6:
            raise ValueError("Main video must be at least 6 seconds.")
        if clip.duration > 180:
            clip = clip.subclip(0, 180)

        # Top caption
        caption = random.choice(CAPTIONS)
        caption_img = create_text_image_2(
            caption,
            vw,
            CAPTION_H,
            font_size=72,
            align="center",
            bg_color=(0, 0, 0),
            text_color=(255, 255, 255),
        )
        caption_clip = (
            ImageClip(np.array(caption_img))
            .set_duration(clip.duration)
            .set_position((0, 0))
        )

        # Laugh meme (2s + freeze + 2s)
        laugh_files = ["laugh/laugh_one.mp4", "laugh/laugh_two.mp4"]
        laugh_path = random.choice(laugh_files)
        if not os.path.exists(laugh_path):
            raise FileNotFoundError(f"❌ Laugh meme not found: {laugh_path}")

        laugh_raw = VideoFileClip(laugh_path).resize(width=1220)
        if laugh_raw.duration < 4:
            raise ValueError("Laugh meme must be at least 4 seconds.")

        laugh_start = (
            laugh_raw.subclip(0, 2)
            .resize(height=LAUGH_H)
            .set_start(0)
            .set_position((0, CAPTION_H))
        )
        # A still of the meme covers the time between the two 2 s segments.
        freeze_frame = (
            laugh_start.to_ImageClip()
            .set_start(2)
            .set_duration(max(0, clip.duration - 4))
            .set_position((0, CAPTION_H))
        )
        laugh_end = (
            laugh_raw.subclip(laugh_raw.duration - 2)
            .resize(height=LAUGH_H)
            .set_start(clip.duration - 2)
            .set_position((0, CAPTION_H))
        )

        # Mid caption
        mid_text = random.choice([
            "Pura 1 din laga tab ye reel mili 🤣",
            "Ye miss mat kr dena 😜",
            "Kha thi ye reel ab tak 🤨🤗",
            "Wait, ye dekh kr hi janna 💥💥",
        ])
        mid_img = create_text_image_2(
            mid_text,
            vw,
            MID_H,
            font_size=64,
            align="center",
            bg_color=(0, 0, 0),
            text_color=(255, 255, 255),
        )
        mid_caption_clip = (
            ImageClip(np.array(mid_img))
            .set_duration(clip.duration)
            .set_start(0)
            .set_position((0, CAPTION_H + LAUGH_H))
        )

        # Main video
        main_video_y = CAPTION_H + LAUGH_H + MID_H
        main_video = (
            clip.resize(height=MAIN_H)
            .set_start(0)
            .set_position((0, main_video_y))
        )

        # 6% white overlay on top of the main video
        overlay = (
            ColorClip(size=(vw, MAIN_H), color=(255, 255, 255), duration=clip.duration)
            .set_opacity(0.06)
            .set_position((0, main_video_y))
        )

        # Watermark: right edge, vertically centered inside the main video
        watermark_text = TextClip(
            "@FullTossComedy4U",
            fontsize=48,
            font="Roboto-Bold",  # Use a modern-looking font if available
            color="white",
            stroke_color="black",
            stroke_width=2,
        ).set_duration(clip.duration)
        watermark_position = (
            vw - watermark_text.w - 40,
            main_video_y + (MAIN_H // 2) - (watermark_text.h // 2),
        )
        watermark = watermark_text.set_position(watermark_position)

        # Final composition
        final = CompositeVideoClip(
            [
                caption_clip,
                laugh_start,
                freeze_frame,
                laugh_end,
                mid_caption_clip,
                main_video,
                overlay,
                watermark,
            ],
            size=(vw, vh),
        )

        # Export
        os.makedirs("edited", exist_ok=True)
        out_path = f"edited/{uuid.uuid4().hex}.mp4"
        final.write_videofile(
            out_path,
            codec="libx264",
            audio_codec="aac",
            preset="veryslow",
            logger=None,
            bitrate="15000k",
            threads=4,
            fps=clip.fps,
        )
        return out_path
    finally:
        # Close clips on both success and failure.
        for opened in (clip, laugh_raw, final):
            if opened is not None:
                opened.close()


# ───── Function 1: pick random link ──────────────────────────────────
def get_random_link() -> Tuple[Optional[str], Optional[str]]:
    """Pick a random unused link and mark it as used.

    The "raw" pool is preferred with probability 0.7; if the preferred pool
    is empty the other one is used.

    Returns:
        ``(link, pool)`` where pool is ``"raw"`` or ``"reacted"``, or
        ``(None, None)`` when both pools are exhausted.
    """
    raw_left = list(a_raw.find({"used": False}))
    reacted_left = list(a_reacted.find({"used": False}))
    if not raw_left and not reacted_left:
        return None, None

    choice_pool = "raw" if random.random() < 0.7 else "reacted"
    if choice_pool == "raw" and not raw_left:
        choice_pool = "reacted"
    if choice_pool == "reacted" and not reacted_left:
        choice_pool = "raw"

    if choice_pool == "raw":
        col, pool_list = a_raw, raw_left
    else:
        col, pool_list = a_reacted, reacted_left

    doc = random.choice(pool_list)
    col.update_one({"_id": doc["_id"]}, {"$set": {"used": True}})
    return doc["link"], choice_pool


# ───────────────────────────── TELEGRAM DOWNLOAD
def _remove_quietly(path: Optional[str]) -> None:
    """Delete *path* if it exists; log (never raise) on other OS errors."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning(f"Could not remove {path}: {exc}")


def tg_duration_seconds(message) -> Optional[int]:
    """Return the video duration of a Telegram message, or ``None``.

    ``None`` is returned when the message has no media, the media carries no
    document, or the document has no video attribute.
    """
    media = getattr(message, "media", None) if message else None
    # Not every media object has a .document attribute, so use getattr.
    document = getattr(media, "document", None) if media else None
    if not document:
        return None
    for attr in document.attributes:
        if isinstance(attr, DocumentAttributeVideo):
            return attr.duration
    return None


async def download_url_mp4(url: str, filename: str, timeout: int = 30) -> bool:
    """Download *url* to *filename* with ``wget`` run as an async subprocess.

    Args:
        url: Direct media URL.
        filename: Output path.
        timeout: Value passed to wget's ``--timeout`` option, in seconds.

    Returns:
        ``True`` when wget exits with status 0, ``False`` otherwise. A partial
        output file is removed on failure.
    """
    # 1. Ensure wget exists
    if not shutil.which("wget"):
        logger.error("❌ wget is not installed or not in PATH.")
        return False

    # 2. Build wget command
    wget_cmd = [
        "wget",
        "--quiet",  # minimal output
        f"--timeout={timeout}",  # seconds
        "--header=User-Agent: Mozilla/5.0 (Linux; Android 10)",
        "--header=Referer: https://www.instagram.com/",
        "-O", filename,  # output path
        url,
    ]

    try:
        # 3. Launch wget as an async subprocess
        proc = await asyncio.create_subprocess_exec(
            *wget_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # 4. Wait for it to finish
        _stdout, stderr = await proc.communicate()
    except Exception as e:
        logger.error(f"❌ wget launch failed: {e}")
        _remove_quietly(filename)
        return False

    if proc.returncode == 0:
        logger.info(f"📥 Downloaded MP4 → {filename}")
        return True

    logger.error(
        f"❌ wget exited with {proc.returncode}\nSTDERR: {stderr.decode().strip()}"
    )
    # wget -O creates the output file even when the transfer fails.
    _remove_quietly(filename)
    return False


async def send_to_bot_and_get_video(link: str) -> Tuple[Optional[str], Optional[int]]:
    """Send *link* to the downloader bot and fetch the video it returns.

    The bot may answer with a Telegram video file or with text containing an
    ``https://`` URL; either is accepted when the video lasts 20-180 seconds.
    A reply containing "Request failed" triggers one retry.

    Returns:
        ``(file_path, duration_seconds)`` or ``(None, None)``.
    """
    async with TelegramClient(StringSession(SESSION_STR), API_ID, API_HASH) as tg_client:
        bot = await tg_client.get_entity(TARGET_BOT)

        async def attempt(send_link: str, depth=0) -> Tuple[Optional[str], Optional[int]]:
            while True:
                if depth > 1:
                    logger.warning("🔁 Retry limit reached.")
                    return None, None

                # The conversation closes before any retry; opening a second
                # one in the same chat while it is open is rejected by Telethon.
                async with tg_client.conversation(bot, timeout=30) as conv:
                    await conv.send_message(send_link)
                    logger.info(f"📤 Sent to {TARGET_BOT}: {send_link}")
                    try:
                        msg = await conv.get_response()
                    except asyncio.TimeoutError:
                        logger.warning("⏰ No reply from bot in 30 seconds.")
                        return None, None

                text = msg.message or ""
                logger.info(f"📬 Bot replied: {text[:80]}")

                # ✅ Case 1: Telegram video file
                duration = tg_duration_seconds(msg)
                if duration:
                    if 20 <= duration <= 180:
                        pathlib.Path("reels").mkdir(exist_ok=True)
                        file_path = await msg.download_media(file="reels/")
                        logger.info(f"✅ Downloaded video → {file_path}")
                        return file_path, duration
                    logger.info(f"⏩ Skipped due to invalid duration = {duration}s")
                    return None, None

                # ✅ Case 2: CDN link in full text (even if surrounded by message)
                urls = re.findall(r"https://[^\s]+", text)
                if urls:
                    cdn_url = urls[0].strip()
                    pathlib.Path("reels").mkdir(exist_ok=True)
                    fname = f"reels/{uuid.uuid4().hex}.mp4"
                    if not await download_url_mp4(cdn_url, fname):
                        return None, None

                    # Try to get duration using MoviePy
                    try:
                        probe = VideoFileClip(fname)
                        try:
                            duration = int(probe.duration)
                        finally:
                            probe.close()
                    except Exception as e:
                        logger.warning(f"🎞️ Duration read failed: {e}")
                        _remove_quietly(fname)  # do not keep unreadable files
                        return None, None

                    if 20 <= duration <= 180:
                        return fname, duration
                    logger.info(f"⏩ CDN duration = {duration}s → Skipped")
                    _remove_quietly(fname)
                    return None, None

                # ❌ Case 3: Ad or error
                if "Request failed" in text:
                    logger.warning("🔁 Bot said request failed, retrying once…")
                    depth += 1
                    continue

                if "http" in text and (msg.photo or text.count(" ") > 0):
                    logger.info("❌ Detected ad/promo. Ignored.")
                    return None, None

                logger.info("❌ No usable video received from bot.")
                return None, None

        return await attempt(link)


# ────────── Loop until we get a valid reel ────────
async def fetch_valid_reel() -> Tuple[Optional[str], Optional[str]]:
    """Try up to 10 random links until one yields a usable video.

    Returns:
        ``(video_path, pool)`` or ``(None, None)`` when no links are left or
        every attempt failed. Failed attempts are 15 seconds apart.
    """
    for _ in range(10):
        link, pool = get_random_link()
        if not link:
            return None, None
        logger.info(f"Trying {pool} link: {link}")
        video_path, _duration = await send_to_bot_and_get_video(link)
        if video_path:
            return video_path, pool
        await asyncio.sleep(15)
    return None, None


# ───────────────────────────── YOUTUBE UPLOAD & META
def upload_to_youtube(video_path, title, desc):
    """Upload *video_path* as a public video and return its watch URL.

    OAuth credentials are read from ``YT_REFRESH_TOKEN``, ``YT_CLIENT_ID`` and
    ``YT_CLIENT_SECRET``.
    """
    creds = Credentials(
        token=None,
        refresh_token=os.getenv("YT_REFRESH_TOKEN"),
        token_uri="https://oauth2.googleapis.com/token",
        client_id=os.getenv("YT_CLIENT_ID"),
        client_secret=os.getenv("YT_CLIENT_SECRET"),
        scopes=["https://www.googleapis.com/auth/youtube.upload"],
    )
    creds.refresh(Request())
    youtube = build("youtube", "v3", credentials=creds)

    insert_request = youtube.videos().insert(
        part="snippet,status",
        body={
            "snippet": {
                "title": title,
                "description": desc,
                "tags": ["funny", "memes", "comedy", "shorts"],
                "categoryId": "23",
            },
            "status": {
                "privacyStatus": "public",
                "madeForKids": False,
            },
        },
        media_body=MediaFileUpload(video_path),
    )
    res = insert_request.execute()
    watch_url = f"https://youtube.com/watch?v={res['id']}"
    logger.info(f"Uploaded: https://youtube.com/watch?v={res['id']}")
    return watch_url


def get_next_part():
    """Return the next part number: 1, or one more than the highest stored."""
    last = meta.find_one(sort=[("part", -1)])
    return 1 if not last else last["part"] + 1


def generate_description(title):
    """Build the video description from *title*."""
    return f"Watch this hilarious clip: {title}"


def save_to_db(part, title, desc, link):
    """Record an upload in the ``meta`` collection, stamped with ``time.time()``."""
    meta.insert_one({
        "part": part,
        "title": title,
        "description": desc,
        "link": link,
        "uploaded": time.time(),
    })


# ───────────────────────────── MAIN AUTO LOOP
def auto_loop():
    """Run the fetch -> edit -> upload cycle forever.

    Schedule (IST): 3-5 uploads per day, only between 08:00 and 20:00, with a
    random 2-4 hour gap between uploads. Counters reset at IST midnight. Any
    exception inside one iteration is logged and the loop resumes after 60 s.
    """
    global UPLOAD_TIMES, NEXT_RESET

    # Give this worker thread its own event loop.
    asyncio.set_event_loop(asyncio.new_event_loop())

    ist = timezone(timedelta(hours=5, minutes=30))
    daily_upload_count = random.randint(3, 5)
    uploads_done_today = 0
    NEXT_RESET = (
        datetime.now(ist).replace(hour=0, minute=0, second=0, microsecond=0)
        + timedelta(days=1)
    )

    logger.info(f"[📅] Today's upload target: {daily_upload_count} reels.")

    def wait_until(hour: int, minute: int = 0):
        """Block until today's hour:minute IST; return at once if it has passed."""
        now = datetime.now(ist)
        target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if target < now:
            return
        logger.info(f"[🕗] Waiting until {target.strftime('%H:%M')} IST...")
        while datetime.now(ist) < target:
            time.sleep(10)

    wait_until(8)

    while True:
        try:
            now = datetime.now(ist)

            if now >= NEXT_RESET:
                UPLOAD_TIMES.clear()
                NEXT_RESET = (
                    now.replace(hour=0, minute=0, second=0, microsecond=0)
                    + timedelta(days=1)
                )
                daily_upload_count = random.randint(3, 5)
                uploads_done_today = 0
                logger.info(f"[🔁] Reset for new day. New target: {daily_upload_count} uploads.")

            if uploads_done_today >= daily_upload_count:
                logger.info("[✅] Daily upload target reached.")
                time.sleep(60)
                continue

            if now.hour < 8 or now.hour >= 20:
                time.sleep(60)
                continue

            # Too soon after the previous upload: poll again in a minute.
            if UPLOAD_TIMES and (now - UPLOAD_TIMES[-1]).total_seconds() < random.randint(7200, 14400):
                time.sleep(60)
                continue

            video_path, reel_type = asyncio.run(fetch_valid_reel())
            if not video_path:
                logger.warning("[⚠️] No valid reel found. Retrying...")
                time.sleep(60)
                continue

            edited = None
            try:
                edited = edit_video_raw(video_path) if reel_type == "raw" else edit_video(video_path)
                part = get_next_part()
                title = f"Try not to laugh || #{part} #funny #memes #comedy #shorts"
                desc = generate_description(title)

                link = upload_to_youtube(edited, title, desc)
                save_to_db(part, title, desc, link)
                logger.info(f"[📤] Uploaded #{part}: {link}")

                UPLOAD_TIMES.append(now)
                uploads_done_today += 1
            finally:
                # Always drop the working files, even if edit/upload failed,
                # so failed runs do not fill the disk.
                _remove_quietly(video_path)
                _remove_quietly(edited)

            if uploads_done_today < daily_upload_count:
                gap_seconds = random.randint(7200, 14400)
                next_time = datetime.now(ist) + timedelta(seconds=gap_seconds)
                if next_time.hour >= 20:
                    logger.info("[🌙] Next upload would exceed 8PM. Skipping.")
                    continue
                logger.info(f"[⏳] Waiting ~{gap_seconds // 60} minutes before next upload.")
                time.sleep(gap_seconds)

        except Exception as e:
            # Top-level boundary: log with traceback and keep the loop alive.
            logger.exception(f"Loop error: {e}")
            time.sleep(60)


if __name__ == "__main__":
    # Flask runs in a regular thread and keeps the process alive.
    Thread(
        target=lambda: app.run(host="0.0.0.0", port=7860, debug=False, use_reloader=False)
    ).start()

    # ✅ Run uploader loop in background
    Thread(target=auto_loop, daemon=True).start()