| import os |
| |
| import os |
| import time |
| import uuid |
|
|
|
|
| import json |
| import base64 |
| import asyncio |
| import nest_asyncio |
| import random |
| import logging |
| import atexit |
| import pathlib |
| from threading import Thread |
| from datetime import datetime, timedelta, timezone |
| from typing import Optional, Tuple, List |
|
|
| |
| from flask import Flask, render_template, request, redirect, session |
|
|
| |
| from moviepy.editor import ( |
| VideoFileClip, |
| ImageClip, |
| ColorClip, |
| CompositeVideoClip, |
| concatenate_videoclips |
| ) |
| from moviepy.video.fx import resize |
|
|
| |
| from PIL import Image, ImageDraw, ImageFont |
|
|
| |
| import numpy as np |
|
|
| |
| import requests |
|
|
| |
| import emoji |
|
|
| |
| from pymongo import MongoClient |
|
|
| |
| from google.oauth2.credentials import Credentials |
| from google.auth.transport.requests import Request |
| from googleapiclient.discovery import build |
| from googleapiclient.http import MediaFileUpload |
|
|
| |
| from yt_dlp import YoutubeDL |
|
|
| |
| from telegram import Update |
| from telegram.ext import ( |
| Application, |
| CommandHandler, |
| MessageHandler, |
| filters, |
| ContextTypes, |
| JobQueue |
| ) |
|
|
# Timestamps of uploads completed today. NOTE(review): re-bound again further
# down (near CAPTIONS), so this first assignment is effectively dead — confirm.
UPLOAD_TIMES = []
# Next local midnight at which daily counters reset. NOTE(review): also
# re-bound to None later and recomputed in IST inside auto_loop() — confirm
# this module-level value is ever read.
NEXT_RESET = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
|
|
| |
def patch_moviepy():
    """Monkey-patch moviepy's resize effect to use Pillow's LANCZOS resampling.

    Newer Pillow releases removed ``Image.ANTIALIAS``, which older moviepy
    builds reference internally; routing resizes through
    ``Image.Resampling.LANCZOS`` avoids that failure.
    """
    original_resizer = resize.resize

    def patched_resizer(clip, *args, **kwargs):
        # Only intercepts when `newsize` is passed as a keyword; a positional
        # newsize (or width=/height=) still takes the original path below —
        # presumably intentional, TODO confirm.
        newsize = kwargs.get("newsize", None)
        if newsize:
            newsize = tuple(map(int, newsize))
            # NOTE(review): fl_image passes numpy frames, not PIL Images —
            # ndarray.resize has different semantics; verify this branch is
            # ever exercised with the intended effect.
            clip = clip.fl_image(lambda img: img.resize(newsize, Image.Resampling.LANCZOS))
        else:
            clip = original_resizer(clip, *args, **kwargs)
        return clip

    resize.resize = patched_resizer


# Apply the patch at import time, before any clip is resized.
patch_moviepy()
|
|
| import emoji |
| from PIL import Image, ImageDraw, ImageFont |
| import emoji |
| import os |
| import requests |
| from PIL import Image, ImageDraw, ImageFont |
| import emoji |
| import os, requests |
|
|
| import os |
| import emoji |
| import requests |
| from PIL import Image, ImageDraw, ImageFont |
|
|
def create_text_image_2(
    text: str,
    width: int,
    height: int,
    *,
    font_size: int = 60,
    align: str = "center",
    bg_color=(255, 255, 255),
    text_color=(0, 0, 0)
):
    """Render *text* onto a ``width x height`` RGBA image, substituting
    Twemoji PNGs for any emoji characters.

    Plain text is drawn with DejaVuSans-Bold (falling back to Pillow's
    default bitmap font); each emoji is downloaded once into ``emoji_pngs/``
    and pasted after the text at ``font_size x font_size`` pixels.

    Args:
        text: caption to render; may contain emoji.
        width: canvas width in pixels.
        height: canvas height in pixels.
        font_size: point size for the text and the emoji box size.
        align: "left", "right" or "center" horizontal placement.
        bg_color: background fill colour.
        text_color: text fill colour.

    Returns:
        A ``PIL.Image.Image`` in RGBA mode.
    """
    img = Image.new("RGBA", (width, height), color=bg_color)
    draw = ImageDraw.Draw(img)

    try:
        font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=font_size)
    except OSError:
        font = ImageFont.load_default()

    all_emojis = emoji.emoji_list(text)
    plain_text = emoji.replace_emoji(text, replace='')

    # draw.textsize() was deprecated in Pillow 9.2 and removed in Pillow 10;
    # textbbox() is the supported replacement.
    bbox = draw.textbbox((0, 0), plain_text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]
    total_emoji_width = len(all_emojis) * font_size
    full_width = text_width + total_emoji_width + 5 * len(all_emojis)

    # Horizontal start, clamped to a 20px margin.
    if align == "left":
        x_start = 20
    elif align == "right":
        x_start = max(20, width - full_width - 20)
    else:
        x_start = max(20, (width - full_width) // 2)

    y_start = (height - text_height) // 2

    draw.text((x_start, y_start), plain_text, font=font, fill=text_color)

    # Paste each emoji to the right of the text, 5px apart.
    x = x_start + text_width + 5
    for em in all_emojis:
        char = em["emoji"]
        hexcode = "-".join(f"{ord(c):x}" for c in char)

        emoji_path = f"emoji_pngs/{hexcode}.png"
        if not os.path.exists(emoji_path):
            url = f"https://github.com/twitter/twemoji/raw/master/assets/72x72/{hexcode}.png"
            os.makedirs("emoji_pngs", exist_ok=True)
            try:
                response = requests.get(url, timeout=5)
                if response.ok:
                    with open(emoji_path, "wb") as f:
                        f.write(response.content)
            except Exception:
                # Best-effort: a missing emoji PNG is simply not pasted.
                continue

        if os.path.exists(emoji_path):
            em_img = Image.open(emoji_path).convert("RGBA").resize((font_size, font_size))
            img.paste(em_img, (x, y_start), em_img)
            x += font_size + 5

    return img
|
|
| from PIL import Image, ImageDraw, ImageFont |
|
|
def create_text_image_with_shadow(
    text: str,
    width: int,
    height: int,
    *,
    font_size: int = 60,
    align: str = "center",
    bg_color=(0, 0, 0, 0),
    text_color=(255, 255, 255),
    shadow_color=(0, 0, 0),
    font_name="DejaVuSans-Bold.ttf"
):
    """Render *text* with a 1px drop shadow on a (default transparent) RGBA canvas.

    Args:
        text: string to draw (rendered as-is; emoji are not substituted here).
        width: canvas width in pixels.
        height: canvas height in pixels.
        font_size: truetype point size.
        align: "left", "right" or "center" horizontal placement.
        bg_color: RGBA background (default fully transparent).
        text_color: main text colour.
        shadow_color: shadow colour, drawn 1px to the right.
        font_name: truetype font file to load.

    Returns:
        A ``PIL.Image.Image`` in RGBA mode.

    Raises:
        ValueError: if *font_name* cannot be loaded.
    """
    img = Image.new("RGBA", (width, height), bg_color)
    draw = ImageDraw.Draw(img)

    try:
        font = ImageFont.truetype(font_name, font_size)
    except OSError as err:
        # Chain the OSError so the underlying missing-font cause stays visible.
        # (Also fixes the mojibake'd apostrophe in the original message.)
        raise ValueError(
            f"Font '{font_name}' not found. Ensure it's installed system-wide or available."
        ) from err

    # Measure with textbbox (textsize was removed in Pillow 10).
    text_bbox = draw.textbbox((0, 0), text, font=font)
    text_width = text_bbox[2] - text_bbox[0]
    text_height = text_bbox[3] - text_bbox[1]

    if align == "left":
        x = 0
    elif align == "right":
        x = width - text_width
    else:
        x = (width - text_width) // 2
    y = (height - text_height) // 2

    # Shadow first (offset 1px right), then the text on top.
    draw.text((x + 1, y), text, font=font, fill=shadow_color)
    draw.text((x, y), text, font=font, fill=text_color)

    return img
| |
|
|
def create_text_image(text, width, height):
    """Render a caption bar: black text on white, with 60px Twemoji images.

    The font size starts at 70pt and shrinks in 2pt steps (30pt floor) until
    the text plus emoji fit within ``width`` minus margins.

    Args:
        text: caption to render; may contain emoji.
        width: bar width in pixels.
        height: bar height in pixels.

    Returns:
        A ``PIL.Image.Image`` in RGB mode.
    """
    img = Image.new("RGB", (width, height), color=(255, 255, 255))
    draw = ImageDraw.Draw(img)

    emojis = emoji.emoji_list(text)
    pure_text = emoji.replace_emoji(text, replace='')

    # Auto-fit loop. Fixes in this revision: the original called
    # ImageFont.truetype unconditionally here (crashing when DejaVu is
    # absent despite an earlier fallback) and used draw.textsize(), which
    # was removed in Pillow 10 — textbbox() is the replacement.
    max_font_size = 70
    while True:
        try:
            font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=max_font_size)
        except OSError:
            font = ImageFont.load_default()
        bbox = draw.textbbox((0, 0), pure_text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        total_width = text_width + (len(emojis) * 60) + 20
        if total_width <= width - 40 or max_font_size <= 30:
            break
        max_font_size -= 2

    start_x = (width - total_width) // 2
    y = (height - text_height) // 2

    draw.text((start_x, y), pure_text, font=font, fill=(0, 0, 0))

    # Emoji go to the right of the text: 60x60 images with 4px gaps.
    x = start_x + text_width + 10
    for e in emojis:
        hexcode = '-'.join(f"{ord(c):x}" for c in e['emoji'])
        emoji_path = f"emoji_pngs/{hexcode}.png"
        if not os.path.exists(emoji_path):
            download_emoji_png(e['emoji'])

        if os.path.exists(emoji_path):
            emoji_img = Image.open(emoji_path).convert("RGBA")
            emoji_img = emoji_img.resize((60, 60))
            img.paste(emoji_img, (x, y), emoji_img)
            x += 60 + 4

    return img
|
|
| from PIL import Image, ImageDraw, ImageFont |
| import numpy as np |
| from moviepy.editor import ImageClip |
|
|
def generate_watermark_img(text, width, height=50):
    """Render *text* near the bottom-left of a transparent RGBA strip.

    Drawn in white 35pt DejaVuSans-Bold with a 1px black outline; falls back
    to Pillow's default bitmap font if DejaVu is unavailable.

    Args:
        text: watermark string.
        width: strip width in pixels.
        height: strip height in pixels (default 50).

    Returns:
        A ``PIL.Image.Image`` in RGBA mode.
    """
    img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)

    try:
        font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=35)
    except OSError:  # was a bare except; only a missing font is expected here
        font = ImageFont.load_default()

    # textbbox replaces textsize (removed in Pillow 10); only the height is
    # needed to pin the baseline 2px above the bottom edge.
    bbox = draw.textbbox((0, 0), text, font=font)
    text_height = bbox[3] - bbox[1]
    draw.text((5, height - text_height - 2), text, fill="white", font=font, stroke_width=1, stroke_fill="black")

    return img
| |
def download_emoji_png(emoji_char):
    """Fetch the Twemoji 72x72 PNG for *emoji_char* into ``emoji_pngs/``.

    Best-effort: failures are logged to stdout and swallowed so a missing
    emoji never breaks caption rendering.
    """
    hexcode = '-'.join(f"{ord(c):x}" for c in emoji_char)
    url = f"https://github.com/twitter/twemoji/raw/master/assets/72x72/{hexcode}.png"
    os.makedirs("emoji_pngs", exist_ok=True)
    path = f"emoji_pngs/{hexcode}.png"
    try:
        # Fix: the original had no timeout, so a stalled CDN could hang the
        # whole caption pipeline indefinitely.
        r = requests.get(url, timeout=10)
        if r.status_code == 200:
            with open(path, "wb") as f:
                f.write(r.content)
            print(f"β Downloaded emoji: {emoji_char} β {path}")
        else:
            print(f"β Failed to download emoji: {emoji_char}")
    except Exception as e:
        print(f"β οΈ Error downloading emoji {emoji_char}: {e}")
|
|
|
|
def edit_video(video_path):
    """Compose the "reacted" layout and encode it to ``edited/<uuid>.mp4``.

    Layout: a 120px white caption bar (random CAPTIONS pick) above the
    original video, plus a faint white wash over the video area.

    Args:
        video_path: path of the source video file.

    Returns:
        Path of the encoded output file.
    """
    clip = VideoFileClip(video_path)

    video_width = clip.w
    video_height = clip.h
    bar_height = 120
    total_height = video_height + bar_height

    # White canvas tall enough for the caption bar plus the video.
    final_bg = ColorClip(size=(video_width, total_height), color=(255, 255, 255), duration=clip.duration)

    # Random caption rendered as an image strip pinned to the top.
    caption = random.choice(CAPTIONS)
    caption_img = create_text_image(caption, video_width, bar_height)
    caption_clip = ImageClip(np.array(caption_img)).set_duration(clip.duration).set_position((0, 0))

    # 10%-opacity white wash over the video area.
    eye_protection = ColorClip(size=(video_width, video_height), color=(255, 255, 255), duration=clip.duration)
    eye_protection = eye_protection.set_opacity(0.1).set_position((0, bar_height))

    video_clip = clip.set_position((0, bar_height))

    final = CompositeVideoClip(
        [final_bg, caption_clip, video_clip, eye_protection],
        size=(video_width, total_height)
    )

    os.makedirs("edited", exist_ok=True)
    output_path = f"edited/{uuid.uuid4().hex}.mp4"
    try:
        final.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            preset="veryslow",
            ffmpeg_params=[
                "-crf", "15",
                "-pix_fmt", "yuv420p",
                "-profile:v", "high",
                "-level", "4.2",
            ],
            threads=2,
            verbose=False,
            logger=None
        )
    finally:
        # Fix: release ffmpeg reader/writer handles — the original never
        # closed either clip, leaking file handles and subprocesses.
        final.close()
        clip.close()
    return output_path
|
|
def edit_video_raw(video_path: str) -> str:
    """Compose the "raw" 1080x2100 portrait meme layout and encode it.

    Layout top-to-bottom: caption bar (170px), laugh-reaction clip (500px),
    mid caption strip (120px), then the cropped main video filling the rest.
    The laugh clip plays for 2s, freezes on its 2s frame for the middle of
    the video, then plays its 2-4s tail over the final 2 seconds.

    Args:
        video_path: path of the source video file.

    Returns:
        Path of the encoded output under ``edited/``.

    Raises:
        ValueError: if CAPTIONS is empty or the clip is under 6 seconds.
        FileNotFoundError: if the chosen laugh meme file is missing.
    """
    # Local imports shadow the module-level ones; kept as-is.
    import os, uuid, random
    import numpy as np
    from moviepy.editor import (
        VideoFileClip, ImageClip, ColorClip, CompositeVideoClip
    )

    if not CAPTIONS:
        raise ValueError("CAPTIONS list is empty!")

    clip = VideoFileClip(video_path)
    vw, vh = 1080, 2100  # output canvas (portrait)

    # Band heights; MAIN_H is whatever of the canvas remains.
    CAPTION_H = 170
    LAUGH_H = 500
    MID_H = 120
    MAIN_H = vh - (CAPTION_H + LAUGH_H + MID_H)

    if clip.duration < 6:
        raise ValueError("Main video must be at least 6 seconds.")
    if clip.duration > 180:
        clip = clip.subclip(0, 180)  # hard cap at 3 minutes

    # Top caption: random pick, black-on-white.
    caption_text = random.choice(CAPTIONS)
    caption_img = create_text_image_2(
        caption_text, vw, CAPTION_H,
        font_size=72, align="center",
        bg_color=(255, 255, 255), text_color=(0, 0, 0)
    )
    caption_clip = ImageClip(np.array(caption_img)) \
        .set_duration(clip.duration) \
        .set_position((0, 0))

    # Mid strip: white-on-black teaser line.
    mid_text = random.choice([
        "Pura 1 din laga tab ye reel mili π€£",
        "Ye miss mat kr dena π",
        "Kha thi ye reel ab tak π€¨π€",
        "Wait, ye dekh kr hi janna π₯π₯",
    ])
    mid_img = create_text_image_2(
        mid_text, vw, MID_H,
        font_size=64, align="center",
        bg_color=(0, 0, 0), text_color=(255, 255, 255)
    )
    mid_caption_clip = ImageClip(np.array(mid_img)) \
        .set_duration(clip.duration) \
        .set_position((0, CAPTION_H + LAUGH_H))

    # Main video: scale to canvas width, then crop a fixed vertical window.
    clip = clip.resize(width=vw)
    crop_y1 = 200  # NOTE(review): fixed 200px top crop assumes tall input — verify
    crop_y2 = crop_y1 + MAIN_H
    main_video = clip.crop(y1=crop_y1, y2=crop_y2) \
        .set_position((0, CAPTION_H + LAUGH_H + MID_H))

    # Faint white wash over the main video area.
    overlay = ColorClip(size=(vw, MAIN_H), color=(255, 255, 255), duration=clip.duration) \
        .set_opacity(0.1).set_position((0, CAPTION_H + LAUGH_H + MID_H))

    # Laugh reaction: random meme clip, first 4s, center-cropped to LAUGH_H.
    laugh_files = ["laugh/laugh_one.mp4", "laugh/laugh_two.mp4"]
    laugh_path = random.choice(laugh_files)
    if not os.path.exists(laugh_path):
        raise FileNotFoundError(f"β Laugh meme not found: {laugh_path}")

    laugh_clip = VideoFileClip(laugh_path).resize(width=vw).subclip(0, 4)
    y1 = (laugh_clip.h - LAUGH_H) // 2
    y2 = y1 + LAUGH_H
    laugh_crop = laugh_clip.crop(y1=y1, y2=y2).set_position((0, CAPTION_H))

    # Laugh timeline: play 0-2s, hold the 2s frame, replay 2-4s at the end.
    laugh_part1 = laugh_crop.subclip(0, 2).set_start(0)
    laugh_frozen = ImageClip(laugh_crop.get_frame(2)) \
        .set_duration(clip.duration - 4).set_start(2) \
        .set_position((0, CAPTION_H))
    laugh_part3 = laugh_crop.subclip(2, 4).set_start(clip.duration - 2)

    final = CompositeVideoClip([
        caption_clip,
        laugh_part1,
        laugh_frozen,
        laugh_part3,
        mid_caption_clip,
        main_video,
        overlay
    ], size=(vw, vh)).set_duration(clip.duration)

    os.makedirs("edited", exist_ok=True)
    out_path = f"edited/{uuid.uuid4().hex}.mp4"
    final.write_videofile(
        out_path,
        codec="libx264",
        audio_codec="aac",
        preset="veryslow",
        ffmpeg_params=[
            "-crf", "15",
            "-pix_fmt", "yuv420p",
            "-profile:v", "high",
            "-level", "4.2",
        ],
        threads=2,
        logger=None,
        fps=clip.fps
    )

    # Release ffmpeg handles (tuple expression simply calls all three).
    clip.close(), laugh_clip.close(), final.close()
    return out_path
| |
# Log INFO and above to both app.log and the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
|
|
| |
# Caption texts randomly stamped onto uploaded videos.
CAPTIONS = [
    "Wait for it π", "Watch till end π", "Try not to laugh π€£",
    "Don't skip this π₯", "You won't expect this! π", "Keep watching π",
    "Stay till end! π₯", "Funniest one yet"
]
# Keywords for filtering adult/gambling/promo content.
# NOTE(review): not referenced anywhere in this file — confirm it is used elsewhere.
BLOCKLIST = [
    "nsfw", "18+", "xxx", "sexy", "adult", "porn", "onlyfans", "escort",
    "betting", "gambling", "iplwin", "1xbet", "winzo", "my11circle", "dream11",
    "rummy", "teenpatti", "fantasy", "casino", "promotion"
]

# Daily bookkeeping shared with auto_loop(). These re-bind names already
# assigned near the top of the file; the `datetime | None` union syntax
# requires Python 3.10+.
UPLOAD_TIMES: List[datetime] = []
NEXT_RESET: datetime | None = None
first_run = True  # NOTE(review): never read in this file — confirm before removing
|
|
| |
# Mongo connection (URI from env). "shortttt.meta" tracks uploaded parts;
# "teleg4am_reelssss" holds scraped links split into raw/reacted pools.
client = MongoClient(os.getenv("MONGO_URI"))
db1 = client.shortttt
meta = db1.meta


botdb = client.teleg4am_reelssss
a_raw = botdb.raw_links
a_reacted = botdb.reacted_links
|
|
| |
| from flask import Flask |
|
|
app = Flask(__name__)


@app.route("/")
def home():
    """Liveness endpoint: hosting platforms ping this to keep the app alive."""
    # Fix: the return literal was split across two lines by a mis-encoded
    # emoji (a syntax error as extracted); rejoined into one string.
    return "β Code is running!"
|
|
|
|
| |
def get_random_link() -> Tuple[Optional[str], Optional[str]]:
    """Pick an unused link from the raw or reacted pool and mark it used.

    Returns:
        ``(link, pool_name)`` where pool_name is "raw" or "reacted", or
        ``(None, None)`` when both pools are exhausted.
    """
    raw_candidates = list(a_raw.find({"used": False}))
    reacted_candidates = list(a_reacted.find({"used": False}))
    if not (raw_candidates or reacted_candidates):
        return None, None

    # ~40% bias toward raw links, falling back to whichever pool has entries.
    pool_name = "raw" if random.random() < 0.4 else "reacted"
    if pool_name == "raw" and not raw_candidates:
        pool_name = "reacted"
    if pool_name == "reacted" and not reacted_candidates:
        pool_name = "raw"

    if pool_name == "raw":
        collection, candidates = a_raw, raw_candidates
    else:
        collection, candidates = a_reacted, reacted_candidates

    picked = random.choice(candidates)
    collection.update_one({"_id": picked["_id"]}, {"$set": {"used": True}})
    return picked["link"], pool_name
| import os |
| import re |
| import uuid |
| import asyncio |
| import pathlib |
| import logging |
| from typing import Optional, Tuple |
|
|
| from telethon import TelegramClient |
| from telethon.sessions import StringSession |
| from telethon.tl.types import DocumentAttributeVideo |
| from moviepy.editor import VideoFileClip |
|
|
# Telegram API credentials. NOTE(review): the API id/hash fall back to
# hard-coded literals checked into source — treat them as leaked and rotate;
# prefer failing fast when the environment variables are absent.
API_ID = int(os.getenv("TG_API_ID", "3704772"))
API_HASH = os.getenv("TG_API_HASH", "b8e50a035abb851c0dd424e14cac4c06")
SESSION_STR = os.getenv("SESSION")  # Telethon StringSession payload
TARGET_BOT = "instasavegrambot"  # bot that resolves Instagram links to videos


logger = logging.getLogger(__name__)  # same logger object as configured above
|
|
def tg_duration_seconds(message) -> Optional[int]:
    """Return the duration (seconds) of a Telegram video message, else None.

    Scans the message document's attributes for the video attribute that
    carries the duration; non-media or document-less messages yield None.
    """
    if not (message and message.media and message.media.document):
        return None
    for attribute in message.media.document.attributes:
        if isinstance(attribute, DocumentAttributeVideo):
            return attribute.duration
    return None
| import asyncio |
| import logging |
| import shutil |
|
|
async def download_url_mp4(url: str, filename: str, timeout: int = 30) -> bool:
    """Download *url* to *filename* via wget run as an async subprocess.

    Browser-like headers are sent because Instagram CDNs reject wget's
    default user agent.

    Args:
        url: source URL (expected to serve an MP4).
        filename: local path to write.
        timeout: wget network timeout in seconds.

    Returns:
        True on success; False when wget is missing, exits non-zero, or
        fails to launch.
    """
    if not shutil.which("wget"):
        logger.error("β wget is not installed or not in PATH.")
        return False

    wget_cmd = [
        "wget",
        "--quiet",
        f"--timeout={timeout}",
        "--header=User-Agent: Mozilla/5.0 (Linux; Android 10)",
        "--header=Referer: https://www.instagram.com/",
        "-O", filename,
        url
    ]

    try:
        proc = await asyncio.create_subprocess_exec(
            *wget_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await proc.communicate()

        if proc.returncode == 0:
            # Fix: the success log printed the literal "(unknown)" instead of
            # the actual target file.
            logger.info(f"π₯ Downloaded MP4 β {filename}")
            return True
        else:
            logger.error(
                f"β wget exited with {proc.returncode}\nSTDERR: {stderr.decode().strip()}"
            )
            return False

    except Exception as e:
        logger.error(f"β wget launch failed: {e}")
        return False
|
|
|
|
async def send_to_bot_and_get_video(link: str) -> Tuple[Optional[str], Optional[int]]:
    """Forward *link* to the downloader bot and retrieve the resulting video.

    Handles three reply shapes: a direct video attachment, a CDN URL in the
    reply text (fetched with wget and validated via moviepy), and transient
    error messages (retried up to twice).

    Args:
        link: Instagram reel URL to resolve.

    Returns:
        ``(file_path, duration_seconds)`` on success, else ``(None, None)``.
        Only videos between 20 and 180 seconds are accepted.
    """
    async with TelegramClient(StringSession(SESSION_STR), API_ID, API_HASH) as client:
        bot = await client.get_entity(TARGET_BOT)

        async def attempt(send_link: str, depth=0) -> Tuple[Optional[str], Optional[int]]:
            # Bounded recursion: at most two retries on transient bot errors.
            if depth > 2:
                logger.warning("π Retry limit reached.")
                return None, None

            async with client.conversation(bot, timeout=30) as conv:
                await conv.send_message(send_link)
                logger.info(f"π€ Sent to {TARGET_BOT}: {send_link}")

                try:
                    reply = await conv.get_response()
                except asyncio.TimeoutError:
                    logger.warning("β° No reply from bot in 30 seconds.")
                    return None, None

                msg = reply
                text = msg.message or ""
                logger.info(f"π¬ Bot replied: {text[:80]}")

                # Case 1: the bot attached the video directly.
                duration = tg_duration_seconds(msg)
                if duration:
                    if 20 <= duration <= 180:
                        pathlib.Path("reels").mkdir(exist_ok=True)
                        file_path = await msg.download_media(file="reels/")
                        logger.info(f"β Downloaded video β {file_path}")
                        return file_path, duration
                    logger.info(f"β© Skipped due to invalid duration = {duration}s")
                    return None, None

                # Case 2: the bot replied with a CDN URL; fetch with wget and
                # validate the duration by opening the file with moviepy.
                urls = re.findall(r"https://[^\s]+", text)
                if urls:
                    cdn_url = urls[0].strip()
                    pathlib.Path("reels").mkdir(exist_ok=True)
                    fname = f"reels/{uuid.uuid4().hex}.mp4"
                    success = await download_url_mp4(cdn_url, fname)
                    if not success:
                        return None, None

                    try:
                        clip = VideoFileClip(fname)
                        duration = int(clip.duration)
                        clip.close()
                        if 20 <= duration <= 180:
                            return fname, duration
                        else:
                            logger.info(f"β© CDN duration = {duration}s β Skipped")
                            os.remove(fname)
                            return None, None
                    except Exception as e:
                        logger.warning(f"ποΈ Duration read failed: {e}")
                        return None, None

                # Case 3: transient bot errors β retry with a delay.
                if "Request failed" in text:
                    logger.warning("π Bot said request failed, retrying onceβ¦")
                    await asyncio.sleep(5)
                    return await attempt(send_link, depth + 1)
                if "We are experiencing high" in text:
                    # Bot overloaded: back off a full hour before retrying.
                    await asyncio.sleep(3600)
                    return await attempt(send_link, depth + 1)

                # Heuristic ad/promo filter: a link plus a photo or any prose.
                if "http" in text and (msg.photo or text.count(" ") > 0):
                    logger.info("β Detected ad/promo. Ignored.")
                    return None, None

                logger.info("β No usable video received from bot.")
                return None, None

        return await attempt(link)
| |
async def fetch_valid_reel() -> Tuple[Optional[str], Optional[str]]:
    """Try up to 10 stored links; return the first successfully downloaded reel.

    Returns:
        ``(video_path, pool_name)`` on success; ``(None, None)`` when the
        link pools are exhausted or all attempts fail.
    """
    attempts_remaining = 10
    while attempts_remaining > 0:
        attempts_remaining -= 1
        link, pool = get_random_link()
        if not link:
            # Both pools empty — nothing left to try.
            return None, None
        logger.info(f"Trying {pool} link: {link}")
        video_path, duration = await send_to_bot_and_get_video(link)
        if video_path:
            return video_path, pool
        # Brief pause between failed attempts to avoid hammering the bot.
        await asyncio.sleep(15)
    return None, None
|
|
# YouTube OAuth token cache lives in its own "youtube" database.
db = client["youtube"]
tokens = db["tokens"]
|
|
| import os |
| from google.auth.transport.requests import Request |
| from google.oauth2.credentials import Credentials |
| from googleapiclient.discovery import build |
| from googleapiclient.http import MediaFileUpload |
|
|
| |
|
|
def get_stored_token():
    """Return the cached YouTube OAuth token document, or None if absent."""
    return tokens.find_one({"_id": "youtube_token"})
|
|
def save_token(creds: Credentials):
    """Persist the refreshed access token (and expiry) for reuse across runs."""
    tokens.update_one(
        {"_id": "youtube_token"},
        {"$set": {
            "access_token": creds.token,
            # google-auth expiries are naive-UTC datetimes; stored as ISO text
            "expiry": creds.expiry.isoformat() if creds.expiry else None,
        }},
        upsert=True
    )
|
|
def upload_to_youtube(video_path, title, desc):
    """Upload *video_path* to YouTube and return its watch URL.

    Reuses the Mongo-cached access token while still valid; otherwise
    refreshes via the env-provided refresh token and re-caches the result.

    Args:
        video_path: local path of the encoded video.
        title: video title.
        desc: video description.

    Returns:
        The ``https://youtube.com/watch?v=...`` URL of the new upload.
    """
    stored = get_stored_token()
    token = stored["access_token"] if stored else None
    expiry = stored["expiry"] if stored else None

    # Stored expiry is ISO text produced from google-auth's naive-UTC expiry,
    # so the freshness comparison deliberately uses naive UTC "now" as well.
    # (datetime.utcnow() is deprecated in 3.12 — keep naive semantics if
    # migrating to datetime.now(timezone.utc).replace(tzinfo=None).)
    if expiry:
        expiry = datetime.fromisoformat(expiry)
    now = datetime.utcnow()

    # Pass the cached token only while unexpired; a None token forces refresh.
    creds = Credentials(
        token=token if expiry and now < expiry else None,
        refresh_token=os.getenv("YT_REFRESH_TOKEN"),
        token_uri="https://oauth2.googleapis.com/token",
        client_id=os.getenv("YT_CLIENT_ID"),
        client_secret=os.getenv("YT_CLIENT_SECRET"),
        scopes=["https://www.googleapis.com/auth/youtube.upload"]
    )

    if not creds.valid or creds.expired:
        creds.refresh(Request())
        save_token(creds)
        logger.info("π Refreshed access token")

    youtube = build("youtube", "v3", credentials=creds)

    request = youtube.videos().insert(
        part="snippet,status",
        body={
            "snippet": {
                "title": title,
                "description": desc,
                "tags": ["funny", "memes", "comedy", "shorts"],
                "categoryId": "23"  # YouTube category 23 = Comedy
            },
            "status": {
                "privacyStatus": "public",
                "madeForKids": False
            }
        },
        media_body=MediaFileUpload(video_path)
    )

    res = request.execute()
    video_url = f"https://youtube.com/watch?v={res['id']}"
    # Fix: this log literal was split mid-string by a mis-encoded emoji
    # (a syntax error as extracted); rejoined into one f-string.
    logger.info(f"β Uploaded: {video_url}")
    return video_url
|
|
|
|
def get_next_part():
    """Return the next sequential part number (1 when no parts exist yet)."""
    latest = meta.find_one(sort=[("part", -1)])
    if not latest:
        return 1
    return latest["part"] + 1
|
|
def generate_description(title):
    """Build the YouTube description text for an upload titled *title*."""
    return "Watch this hilarious clip: {}".format(title)
|
|
def save_to_db(part, title, desc, link):
    """Record an uploaded part in Mongo, stamped with epoch-seconds time."""
    meta.insert_one({"part": part, "title": title, "description": desc, "link": link, "uploaded": time.time()})
|
|
| |
|
|
def auto_loop():
    """Blocking scheduler thread: one reel upload per fixed IST slot each day."""
    # Runs in its own thread, so it needs a dedicated event loop for the
    # asyncio.run() call below.
    asyncio.set_event_loop(asyncio.new_event_loop())
    global UPLOAD_TIMES, NEXT_RESET

    # Local imports shadow the module-level ones; kept as-is.
    from datetime import datetime, timedelta, timezone
    import time, os

    ist = timezone(timedelta(hours=5, minutes=30))

    # Fixed daily upload slots as (hour, minute) in IST.
    DAILY_SLOTS = [
        (11, 30),
        (18, 30),
        (20, 0)
    ]

    uploads_done_today = 0
    NEXT_RESET = datetime.now(ist).replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)

    logger.info("[π] Using FIXED GOLDEN SLOTS: 11:30 AM, 6:30 PM, 8:00 PM IST")

    while True:
        try:
            now = datetime.now(ist)

            # Midnight rollover: clear the day's bookkeeping.
            if now >= NEXT_RESET:
                UPLOAD_TIMES.clear()
                uploads_done_today = 0
                NEXT_RESET = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
                logger.info("[π] Daily reset completed.")

            # All slots used: idle until the reset above fires.
            if uploads_done_today >= len(DAILY_SLOTS):
                time.sleep(60)
                continue

            # Next pending slot for today.
            next_slot_hour, next_slot_min = DAILY_SLOTS[uploads_done_today]
            next_upload_time = now.replace(hour=next_slot_hour, minute=next_slot_min, second=0, microsecond=0)

            # Slot already passed (e.g. process started late) β skip it.
            if now > next_upload_time:
                uploads_done_today += 1
                continue

            # Sleep right up to the slot time.
            sleep_seconds = (next_upload_time - now).total_seconds()
            if sleep_seconds > 0:
                logger.info(f"[β³] Sleeping until next slot: {next_upload_time.strftime('%I:%M %p')}")
                time.sleep(sleep_seconds)

            logger.info(f"[π] Uploading at golden time: {next_upload_time.strftime('%I:%M %p')}")

            # Fetch a candidate reel (blocks this thread's event loop).
            video_path, reel_type = asyncio.run(fetch_valid_reel())

            if not video_path:
                logger.warning("[β οΈ] No valid reel found. Retrying in 5 mins...")
                time.sleep(300)
                continue

            # "raw" links get the full meme layout; others the simple bar.
            edited = edit_video_raw(video_path) if reel_type == "raw" else edit_video(video_path)

            part = get_next_part()
            title = f"Try not to laugh || #{part} #funny #memes #comedy #shorts"
            desc = generate_description(title)

            link = upload_to_youtube(edited, title, desc)
            save_to_db(part, title, desc, link)

            logger.info(f"[π€] Uploaded #{part}: {link}")

            UPLOAD_TIMES.append(datetime.now(ist))
            uploads_done_today += 1

            # Clean up both the source and the edited files.
            if os.path.exists(video_path):
                os.remove(video_path)
            if os.path.exists(edited):
                os.remove(edited)

            # Small cool-down before evaluating the next slot.
            time.sleep(120)

        except Exception as e:
            # Broad catch keeps the scheduler thread alive; error is logged.
            logger.error(f"[β] Loop error: {e}")
            time.sleep(60)
|
|
| |
if __name__ == "__main__":
    # Re-imports are redundant (already imported at module level); kept as-is.
    import asyncio
    from threading import Thread

    # Flask health-check server runs in a non-daemon thread, which is what
    # keeps the process alive after the main thread falls off the end.
    Thread(target=lambda: app.run(host="0.0.0.0", port=7860, debug=False, use_reloader=False)).start()

    # The upload scheduler is a daemon thread: it dies with the process.
    Thread(target=auto_loop, daemon=True).start()