# Spaces: Paused
# (Hugging Face Spaces status banner captured with the paste — not program code)
| import os | |
| import random | |
| import uuid | |
| import time | |
| import json | |
| import base64 | |
| import asyncio | |
| import logging | |
| import atexit | |
| import numpy as np | |
| import requests | |
| from flask import Flask, render_template, request, redirect | |
| from threading import Thread | |
| from pymongo import MongoClient | |
| from google.oauth2.credentials import Credentials | |
| from google.auth.transport.requests import Request | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaFileUpload | |
| from moviepy.editor import VideoFileClip, ColorClip, CompositeVideoClip, ImageClip | |
| from moviepy.video.fx import resize | |
| from moviepy.editor import VideoFileClip, CompositeVideoClip, ColorClip, ImageClip, TextClip | |
| from PIL import Image, ImageDraw, ImageFont | |
| from datetime import datetime, timedelta, timezone | |
| from moviepy.editor import * | |
| import builtins | |
| import logging | |
| import re | |
| import builtins | |
| import re | |
| import subprocess | |
# ====== INSTAGRAM COOKIES ======
# SECURITY: these are live Instagram session credentials committed in source.
# They should be rotated and supplied via the IG_COOKIES_JSON environment
# variable (a JSON array of {"name": ..., "value": ...} objects); the
# hardcoded list below is kept only as a backward-compatible fallback.
_DEFAULT_RAW_COOKIES = [
    {"name": "datr", "value": "ejpyaDZsc2mYbftSKZ3Sk2Lu"},
    {"name": "ds_user_id", "value": "75543047114"},
    {"name": "csrftoken", "value": "UCHoYMciHF3W2HynHCA3fBEZfJd1beB5"},
    {"name": "ig_did", "value": "E6AD4B02-7FC5-4C36-9570-7AE89F1A85D3"},
    {"name": "wd", "value": "468x935"},
    {"name": "mid", "value": "aHI6egABAAEk7DQNY7Nj1qArGgnq"},
    {"name": "sessionid", "value": "75543047114%3AmWa1OwxIcbgqMd%3A6%3AAYcaKZIghR6DJJsEZSZjsunYklzQ4jcTREcsdBkIrA"},
    {"name": "dpr", "value": "2.608695652173913"},
    {"name": "rur", "value": "\"NHA\\05475543047114\\0541783852609:01feb3e34e17329d37efbc1dfbc7d6c7cbdfd7496e0c5ea0f5e76a3125f0dda6e0b45e18\""}
]
_cookies_json = os.getenv("IG_COOKIES_JSON")
raw_cookies = json.loads(_cookies_json) if _cookies_json else _DEFAULT_RAW_COOKIES
# Convert the browser-export cookie list into the flat dict `requests` expects.
cookies = {c['name']: c['value'] for c in raw_cookies}
# ====== LOGGING SETUP ======
# Emit timestamped records both to app.log and to the console.
_log_handlers = [logging.FileHandler("app.log"), logging.StreamHandler()]
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
# ====== FLASK APP ======
app = Flask(__name__)
# ====== MONGO DB ======
# SECURITY: the fallback URI embeds live database credentials; rotate them
# and supply MONGO_URI via the environment only.
client = MongoClient(os.getenv("MONGO_URI", "mongodb+srv://kanhagarg930123:kanha@kanha.jhlzb3k.mongodb.net/?retryWrites=true&w=majority&appName=kanha"))
db1 = client.shortttt  # stores YouTube upload metadata (see save_to_db)
meta = db1.meta
db2 = client["instagram_bot"]  # tracks per-username reel fetch progress
reel_progress = db2["reel_progresss"]
# NOTE(review): the name `client` is rebound to a TelegramClient later in
# this file; db1/db2 keep their Mongo handles, but the shadowing is fragile.
# ====== CONSTANTS ======
# Caption candidates drawn at random for the top bar of each edited video.
CAPTIONS = [
    "Wait for it π", "Watch till end π", "Try not to laugh π€£",
    "Don't skip this π₯", "You won't expect this! π", "Keep watching π",
    "Stay till end! π₯", "Funniest one yet"
]
# Keyword blocklist — defined here but not referenced anywhere in this chunk.
BLOCKLIST = [
    "nsfw","18+", "xxx", "sexy", "adult", "porn", "onlyfans", "escort",
    "betting", "gambling", "iplwin", "1xbet", "winzo", "my11circle", "dream11",
    "rummy", "teenpatti", "fantasy", "casino", "promotion"
]
# Timestamps of today's uploads; cleared at the daily reset in auto_loop.
UPLOAD_TIMES = []
# Placeholder next-reset moment (naive local time). auto_loop immediately
# overwrites this with an IST-aware datetime before any comparison, so the
# naive value is never compared against an aware one in practice.
NEXT_RESET = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
| # ====== FUNCTIONS ====== | |
def get_next_part():
    """Return the next sequential part number for upload titles."""
    newest = meta.find_one(sort=[("part", -1)])
    if newest is None:
        return 1
    return newest["part"] + 1
def generate_description(title):
    """Build the YouTube description line for a given video title."""
    return "Watch this hilarious clip: " + title
| # MongoDB integration assumed via reel_progress collection | |
def get_next_fetch_index(username):
    """Return the saved pagination index for a username (0 if unknown)."""
    doc = reel_progress.find_one({"username": username})
    if doc is None:
        return 0
    return doc.get("last_index", 0)
def update_fetch_index(username, new_index):
    """Persist the pagination index for a username (upsert)."""
    selector = {"username": username}
    change = {"$set": {"last_index": new_index}}
    reel_progress.update_one(selector, change, upsert=True)
def is_reel_fetched_or_skipped(username, reel_id):
    """True if the reel was already downloaded or deliberately skipped."""
    doc = reel_progress.find_one({"username": username})
    if not doc:
        return False
    seen = doc.get("fetched_ids", []) + doc.get("skipped_ids", [])
    return reel_id in seen
def mark_reel_fetched(username, reel_id):
    """Remember that this reel was successfully downloaded for `username`."""
    selector = {"username": username}
    change = {"$addToSet": {"fetched_ids": reel_id}}
    reel_progress.update_one(selector, change, upsert=True)
def mark_reel_skipped(username, reel_id):
    """Remember that this reel was skipped (e.g. download failed)."""
    selector = {"username": username}
    change = {"$addToSet": {"skipped_ids": reel_id}}
    reel_progress.update_one(selector, change, upsert=True)
# ====== TELEGRAM CLIENT ======
# (duplicate imports below preserved as-is from the original file)
import os
import base64
import tempfile
from telethon.sync import TelegramClient
import os
from telethon.sync import TelegramClient
from telethon.sessions import StringSession
# SECURITY: the API hash and a live StringSession are committed as fallback
# defaults; they should be revoked and supplied only via the environment.
TG_API_ID = int(os.getenv("API_ID", 3704772))
TG_API_HASH = os.getenv("API_HASH", "b8e50a035abb851c0dd424e14cac4c06")
SESSION_B64 = os.getenv("SESSION", "1BVtsOIEBu5jNa-E88vbC710wzyAKnteiQK7WQNvDK6wCeD3Q_c33uSaqvlvCUMglbhuVtugstOwIuE7WV2EuLGUEHH3AFJ84MZ93WVs010bx4N9nQOaQJNjIZDe4Nllq9r5PKRXxjgwuSqN-B7_TfpjQJT_ztOqQNZTQV3o9EqPBXfpiMzVF5U638wuRDVkInjbgAkI9ao36KDmcvBJzAW91l27loIsUL-Zst9H6XbAbVgqfs1fTOI6xQqPEyrFA-gB7lHbKrkqILwAmK88tTRQFHOvvXkGkJpDrXb1MZQssaVilGfXQ7sXWPltMpHklRy8HaPdfOUJ8t105DHXf-CN6SvzombM=")
# NOTE(review): with the hardcoded default above, SESSION_B64 can never be
# empty, so this guard is effectively dead unless the default is removed.
if not SESSION_B64:
    raise ValueError("SESSION is not set")
# NOTE(review): rebinds the module-level name `client` (previously the
# MongoClient). db1/db2 keep their Mongo handles, but the shadowing is fragile.
client = TelegramClient(StringSession(SESSION_B64), TG_API_ID, TG_API_HASH)
| import json | |
| import time | |
| import random | |
| import pathlib | |
| import requests | |
| from typing import Optional, Tuple | |
| from telethon.sync import TelegramClient | |
| from telethon.tl.functions.messages import GetHistoryRequest | |
# Assumes MongoDB functions are available:
# get_next_fetch_index, update_fetch_index, is_reel_fetched_or_skipped,
# mark_reel_fetched, mark_reel_skipped
# Source accounts to scrape. "raw" reels later get the meme-intro edit and
# "reacted" ones the lighter caption-only edit (see fetch_valid_reel/auto_loop).
# NOTE: these lists are shuffled in place by fetch_valid_reel.
RAW_USERNAMES = os.getenv("RAW_USERNAMES", "nickyteja06,videshi__indian,_notyourtype_yt_,sisterfunnyreel").split(",")
REACTED_USERNAMES = os.getenv("REACTED_USERNAMES", "biggestreact").split(",")
| import os | |
| import json | |
| import time | |
| import random | |
| import pathlib | |
| import requests | |
| from typing import Optional, Tuple | |
| from telethon.tl.functions.messages import GetHistoryRequest | |
# Constants
# Minimal headers for unauthenticated Instagram GraphQL requests; the
# session-cookie header can be enabled if anonymous access gets rate-limited.
IG_HEADERS = {
    "User-Agent": "Mozilla/5.0",
    # Optional: Uncomment if using sessionid
    # "Cookie": f"sessionid={os.getenv('IG_SESSION_ID')}"
}
| import requests | |
def get_user_id(username: str) -> Optional[str]:
    """Resolve an Instagram username to its numeric user ID.

    Queries the public web_profile_info endpoint using the module-level
    session cookies. Returns None on any HTTP or parsing failure.

    BUG FIX: the original body was indented with non-breaking spaces
    (U+00A0), which is a SyntaxError in CPython; it also used print()
    where the rest of the file logs through `logger`.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Mobile)",
        "X-IG-App-ID": "936619743392459",
        "Referer": f"https://www.instagram.com/{username}/",
        "X-Requested-With": "XMLHttpRequest"
    }
    try:
        res = requests.get(
            f"https://www.instagram.com/api/v1/users/web_profile_info/?username={username}",
            headers=headers,
            cookies=cookies,  # module-level session cookies
            timeout=10
        )
        if res.status_code == 200:
            return res.json()["data"]["user"]["id"]
        logger.warning(f"Failed to get user ID: {res.status_code}")
    except Exception as e:
        logger.warning(f"Error fetching user ID for @{username}: {e}")
    return None
def get_links(user_id: str, limit: int = 5) -> list[Tuple[str, str]]:
    """Collect up to `limit` (reel_id, post_url) pairs for an IG user.

    Pages through the GraphQL timeline endpoint until `limit` links are
    gathered, pagination ends, or a request fails.
    """
    links = []
    query_hash = "58b6785bea111c67129decbe6a448951"  # edge_owner_to_timeline_media
    end_cursor = ""
    while len(links) < limit:
        variables = {
            "id": user_id,
            "first": 12,
            "after": end_cursor
        }
        params = {
            "query_hash": query_hash,
            "variables": json.dumps(variables)
        }
        try:
            res = requests.get("https://www.instagram.com/graphql/query/", params=params, headers=IG_HEADERS, timeout=10)
            if res.status_code != 200:
                logger.warning(f"[β οΈ] Failed to get links for user {user_id}, status {res.status_code}")
                break
            media = res.json()["data"]["user"]["edge_owner_to_timeline_media"]
            # BUG FIX: guard against an empty page that still reports
            # has_next_page, which previously could loop forever.
            if not media["edges"]:
                break
            for edge in media["edges"]:
                reel_id = edge["node"]["id"]
                shortcode = edge["node"]["shortcode"]
                link = f"https://www.instagram.com/p/{shortcode}/"
                links.append((reel_id, link))
                if len(links) >= limit:
                    break
            if not media["page_info"]["has_next_page"]:
                break
            end_cursor = media["page_info"]["end_cursor"]
        except Exception as e:
            logger.warning(f"[β] get_links() error: {e}")
            break
    logger.info(f"[π] Found {len(links)} links for user ID {user_id}")
    return links
def send_to_bot_and_get_video(link: str) -> Optional[str]:
    """Send an Instagram post link to @instasavegrambot on Telegram and
    download the video the bot replies with.

    Polls the bot chat every 2s, up to 15 attempts (~30s). Only accepts
    videos between 20 and 180 seconds long. Returns the local file path
    under reels/, or None on timeout/failure.
    """
    try:
        entity = client.loop.run_until_complete(client.get_entity("instasavegrambot"))
        client.loop.run_until_complete(client.send_message(entity, link))
        for _ in range(15): # Wait up to 30s
            time.sleep(2)
            # Fetch the two most recent messages in the bot chat.
            history = client.loop.run_until_complete(
                client(GetHistoryRequest(
                    peer=entity,
                    limit=2,
                    offset_id=0,
                    offset_date=None,
                    add_offset=0,
                    max_id=0,
                    min_id=0,
                    hash=0
                ))
            )
            for msg in history.messages:
                logger.info(f"[π¨] Bot message: {msg.message or '[media]'}")
                # NOTE(review): assumes the installed Telethon exposes a
                # `duration` attribute directly on msg.video — confirm.
                if msg.video and 20 <= msg.video.duration <= 180:
                    path = f"reels/{msg.id}.mp4"
                    pathlib.Path("reels").mkdir(exist_ok=True)
                    client.loop.run_until_complete(msg.download_media(file=path))
                    logger.info(f"[β ] Downloaded reel to {path}")
                    return path
    except Exception as e:
        logger.warning(f"[β] send_to_bot_and_get_video error: {e}")
    return None
def fetch_valid_reel() -> Tuple[Optional[str], Optional[str]]:
    """Find and download one not-yet-processed reel.

    Tries the "raw" and "reacted" username pools in random order and
    returns (local_video_path, pool_type), or (None, None) if nothing
    usable was found.
    """
    with client:
        pools = [(RAW_USERNAMES, "raw"), (REACTED_USERNAMES, "reacted")]
        pools = [(p, t) for p, t in pools if p]  # drop empty pools
        if not pools:
            logger.warning("[β οΈ] No usernames in either RAW or REACTED pools.")
            return None, None
        random.shuffle(pools)
        for usernames, reel_type in pools:
            # NOTE: shuffles the module-level username list in place.
            random.shuffle(usernames)
            for username in usernames:
                logger.info(f"[π] Trying @{username} ({reel_type})")
                uid = get_user_id(username)
                if not uid:
                    logger.warning(f"[π«] Skipping @{username}, failed to get user ID.")
                    continue
                fetch_idx = get_next_fetch_index(username)
                links = get_links(uid, limit=5)
                # Resume from the stored per-user index within this page of links.
                for idx, (reel_id, link) in enumerate(links[fetch_idx:], start=fetch_idx):
                    if is_reel_fetched_or_skipped(username, reel_id):
                        logger.info(f"[β©] Already processed: @{username}/{reel_id}")
                        continue
                    logger.info(f"[π₯] Sending link to bot: {link}")
                    video_path = send_to_bot_and_get_video(link)
                    if not video_path:
                        logger.warning(f"[β οΈ] Failed to fetch: {link}")
                        mark_reel_skipped(username, reel_id)
                        continue
                    mark_reel_fetched(username, reel_id)
                    update_fetch_index(username, idx + 1)
                    return video_path, reel_type
                # NOTE(review): get_links always returns links from the start of
                # the timeline, so advancing by fetch_idx + len(links) can grow
                # past the number of links ever seen once fetch_idx >= limit —
                # confirm this index semantics is intended.
                update_fetch_index(username, fetch_idx + len(links))
        logger.warning("[π«] No valid reels found after checking all usernames.")
        return None, None
def patch_moviepy():
    """Monkey-patch moviepy's resize fx to resample via Pillow LANCZOS.

    BUG FIX: the original patch called `img.resize(newsize, LANCZOS)` inside
    fl_image, but fl_image hands the callback a numpy ndarray, and
    `ndarray.resize` neither accepts a resampling filter nor resamples —
    it would raise TypeError at runtime. Frames are now converted to a PIL
    image, resized, and converted back to an array.
    """
    original_resizer = resize.resize

    def patched_resizer(clip, *args, **kwargs):
        newsize = kwargs.get("newsize", None)
        if newsize:
            newsize = tuple(map(int, newsize))

            def _resize_frame(frame):
                # ndarray -> PIL -> resampled resize -> ndarray
                return np.array(Image.fromarray(frame).resize(newsize, Image.Resampling.LANCZOS))

            clip = clip.fl_image(_resize_frame)
        else:
            clip = original_resizer(clip, *args, **kwargs)
        return clip

    resize.resize = patched_resizer

patch_moviepy()
| import emoji | |
| from PIL import Image, ImageDraw, ImageFont | |
| import emoji | |
| import os | |
| import requests | |
def create_text_image(text, width, height, font_size=70, align="center",
                      bg_color=(255, 255, 255), text_color=(0, 0, 0)):
    """Render a caption strip with inline Twemoji images.

    The text (minus emoji) is drawn centered; each emoji is pasted as a
    60x60 PNG to the right of the text. The font shrinks from `font_size`
    down to 30 until the content fits within `width - 40`.

    BUG FIXES / generalization:
    - edit_video_raw calls this with font_size/align/bg_color/text_color
      keyword arguments the original 3-parameter signature rejected
      (TypeError); they are now accepted, with defaults that preserve the
      original 3-argument behavior. Only centered layout is implemented;
      other `align` values currently fall back to centered.
    - Draw.textsize was removed in Pillow 10; measurement now uses textbbox.
    """
    img = Image.new("RGB", (width, height), color=bg_color)
    draw = ImageDraw.Draw(img)

    def _measure(txt, fnt):
        # Pillow >= 10 removed Draw.textsize; textbbox is the replacement.
        left, top, right, bottom = draw.textbbox((0, 0), txt, font=fnt)
        return right - left, bottom - top

    # Split emoji out of the text; they are rendered as images, not glyphs.
    emojis = emoji.emoji_list(text)
    pure_text = emoji.replace_emoji(text, replace='')

    # Shrink the font until text + emoji row fits the strip.
    max_font_size = font_size
    while True:
        try:
            font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=max_font_size)
        except OSError:
            font = ImageFont.load_default()
        text_width, text_height = _measure(pure_text, font)
        total_width = text_width + (len(emojis) * 60) + 20
        if total_width <= width - 40 or max_font_size <= 30:
            break
        max_font_size -= 2

    # Centered layout.
    start_x = (width - total_width) // 2
    y = (height - text_height) // 2
    draw.text((start_x, y), pure_text, font=font, fill=text_color)

    # Emoji are pasted to the right of the text, 60px each plus 4px gap.
    x = start_x + text_width + 10
    for e in emojis:
        hexcode = '-'.join(f"{ord(c):x}" for c in e['emoji'])
        emoji_path = f"emoji_pngs/{hexcode}.png"
        if not os.path.exists(emoji_path):
            download_emoji_png(e['emoji'])
        if os.path.exists(emoji_path):
            emoji_img = Image.open(emoji_path).convert("RGBA")
            emoji_img = emoji_img.resize((60, 60))
            img.paste(emoji_img, (x, y), emoji_img)
            x += 60 + 4
    return img
| from PIL import Image, ImageDraw, ImageFont | |
| import numpy as np | |
| from moviepy.editor import ImageClip | |
def generate_watermark_img(text, width, height=50):
    """Render a transparent RGBA strip with white, black-stroked watermark text.

    BUG FIX: Draw.textsize was removed in Pillow 10; the text height is now
    measured with textbbox instead.
    """
    img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("DejaVuSans-Bold.ttf", size=35)
    except OSError:
        font = ImageFont.load_default()
    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    text_height = bottom - top
    # Anchor the text near the bottom-left corner of the strip.
    draw.text((5, height - text_height - 2), text, fill="white", font=font,
              stroke_width=1, stroke_fill="black")
    return img
def download_emoji_png(emoji_char):
    """Fetch the Twemoji 72x72 PNG for an emoji into emoji_pngs/.

    Best-effort: failures are printed and swallowed so caption rendering
    can continue without the emoji image.

    BUG FIX: the request had no timeout and could hang the upload loop
    indefinitely; a 10s timeout is now applied.
    NOTE(review): the twitter/twemoji repository is archived — confirm the
    raw asset URL still resolves, or switch to a maintained fork.
    """
    hexcode = '-'.join(f"{ord(c):x}" for c in emoji_char)
    url = f"https://github.com/twitter/twemoji/raw/master/assets/72x72/{hexcode}.png"
    os.makedirs("emoji_pngs", exist_ok=True)
    path = f"emoji_pngs/{hexcode}.png"
    try:
        r = requests.get(url, timeout=10)
        if r.status_code == 200:
            with open(path, "wb") as f:
                f.write(r.content)
            print(f"β Downloaded emoji: {emoji_char} β {path}")
        else:
            print(f"β Failed to download emoji: {emoji_char}")
    except Exception as e:
        print(f"β οΈ Error downloading emoji {emoji_char}: {e}")
def edit_video(video_path):
    """Light edit for "reacted" reels: white caption bar on top, subtle white
    overlay, and a bottom-left watermark. Returns the exported file path."""
    clip = VideoFileClip(video_path)
    video_width = clip.w
    video_height = clip.h
    bar_height = 120  # height of the white caption strip added above the video
    total_height = video_height + bar_height
    # === 1. Background Canvas (white, full composite size)
    final_bg = ColorClip(size=(video_width, total_height), color=(255, 255, 255), duration=clip.duration)
    # === 2. Caption Bar (Top) — random text from CAPTIONS
    caption = random.choice(CAPTIONS)
    caption_img = create_text_image(caption, video_width, bar_height)
    caption_clip = ImageClip(np.array(caption_img)).set_duration(clip.duration).set_position((0, 0))
    # === 3. Eye Protection Overlay — the original comment said 6%, but the
    # code applies 10% white opacity over the video area.
    eye_protection = ColorClip(size=(video_width, video_height), color=(255, 255, 255), duration=clip.duration)
    eye_protection = eye_protection.set_opacity(0.1).set_position((0, bar_height))
    # === 4. Watermark (bottom-left, rendered with Pillow then wrapped in ImageClip)
    watermark_img = generate_watermark_img("@fulltosscomedy4u", video_width, height=50)
    watermark_clip = ImageClip(np.array(watermark_img)).set_duration(clip.duration).set_position(("left", bar_height + video_height - 50))
    # === 5. Position original video below top bar
    video_clip = clip.set_position((0, bar_height))
    # === 6. Combine everything
    final = CompositeVideoClip(
        [final_bg, caption_clip, video_clip, eye_protection, watermark_clip],
        size=(video_width, total_height)
    )
    os.makedirs("edited", exist_ok=True)
    output_path = f"edited/{uuid.uuid4().hex}.mp4"
    # High-bitrate H.264 export; moviepy's progress output is silenced.
    final.write_videofile(
        output_path,
        codec="libx264",
        audio_codec="aac",
        preset="slow",
        bitrate="12000k",
        verbose=False,
        logger=None
    )
    return output_path
def edit_video_raw(video_path):
    """Heavy edit for "raw" reels: caption bars, a laugh/freeze/laugh meme
    intro concatenated before the reel, overlay, and watermark. Returns the
    exported file path."""
    clip = VideoFileClip(video_path)
    video_width = clip.w
    video_height = clip.h
    top_bar_height = 120   # white caption strip
    mid_bar_height = 80    # black caption strip below it
    total_height = video_height + top_bar_height + mid_bar_height
    # === 1. Background Canvas
    # NOTE(review): overlays last clip.duration + 6s, but the concatenated
    # video below (meme intro + reel) runs roughly 2x clip.duration when the
    # reel is >= 5s — confirm the intended final duration.
    final_bg = ColorClip(size=(video_width, total_height), color=(255, 255, 255), duration=clip.duration + 6)
    # === 2. Top Caption (Black on White)
    caption = random.choice(CAPTIONS)
    # NOTE(review): confirm create_text_image accepts these keyword
    # parameters — as defined earlier in this file it takes (text, width,
    # height) only, which would raise TypeError here.
    caption_img = create_text_image(
        caption,
        width=video_width,
        height=top_bar_height,
        font_size=min(max(int(video_width * 0.08), 48), 80), # Auto-resize
        align="center",
        bg_color=(255, 255, 255),
        text_color=(0, 0, 0)
    )
    caption_clip = ImageClip(np.array(caption_img)).set_duration(clip.duration + 6).set_position((0, 0))
    # === 3. Middle Caption (White on Black) — random Hinglish teaser line
    mid_text = random.choice([
        "Pura 1 din laga tab ye reel mili π€£",
        "Ye miss mat kr dena π",
        "Kha thi ye reel ab tak π€¨π€",
        "Wait, ye dekh kr hi janna π₯π₯"
    ])
    mid_img = create_text_image(
        mid_text,
        width=video_width,
        height=mid_bar_height,
        font_size=min(max(int(video_width * 0.06), 40), 64),
        align="center",
        bg_color=(0, 0, 0),
        text_color=(255, 255, 255)
    )
    mid_caption_clip = ImageClip(np.array(mid_img)).set_duration(clip.duration + 6).set_position((0, top_bar_height))
    # === 4. Eye Protection Overlay (10% white over the video area)
    eye_protection = ColorClip(size=(video_width, video_height), color=(255, 255, 255), duration=clip.duration + 6)
    eye_protection = eye_protection.set_opacity(0.1).set_position((0, top_bar_height + mid_bar_height))
    # === 5. Watermark (Bottom Left)
    watermark_img = generate_watermark_img("@fulltosscomedy4u", video_width, height=50)
    watermark_clip = ImageClip(np.array(watermark_img)).set_duration(clip.duration + 6).set_position(("left", total_height - 50))
    # === 6. Meme Section: Laugh -> Freeze -> Laugh
    # Requires laugh/1.mp4 and laugh/2.mp4 to exist next to the script.
    laugh_index = random.choice([1, 2])
    laugh_clip = VideoFileClip(f"laugh/{laugh_index}.mp4").resize(width=video_width).set_duration(2)
    # Freeze the laugh clip's frame for roughly the reel's length minus 4s.
    freeze_frame = laugh_clip.to_ImageClip().set_duration(max(1, clip.duration - 4))
    meme_part = concatenate_videoclips([laugh_clip, freeze_frame, laugh_clip])
    # === 7. Combine All Video Parts (meme intro, then the reel)
    full_video = concatenate_videoclips([meme_part, clip])
    full_video = full_video.set_position((0, top_bar_height + mid_bar_height))
    # === 8. Composite Final Video
    final = CompositeVideoClip(
        [final_bg, caption_clip, mid_caption_clip, full_video, eye_protection, watermark_clip],
        size=(video_width, total_height)
    )
    # === 9. Export (high-bitrate H.264, progress output silenced)
    os.makedirs("edited", exist_ok=True)
    output_path = f"edited/{uuid.uuid4().hex}.mp4"
    final.write_videofile(
        output_path,
        codec="libx264",
        audio_codec="aac",
        preset="slow",
        bitrate="12000k",
        threads=4,
        verbose=False,
        logger=None
    )
    return output_path
def upload_to_youtube(video_path, title, desc):
    """Upload a video to YouTube as a public Comedy-category video.

    Uses a refresh-token OAuth flow (no cached access token). Returns the
    public watch URL; raises on OAuth or API failure.
    """
    # SECURITY: the client_id/client_secret fallbacks below are committed
    # secrets — revoke them and supply values via environment only.
    creds = Credentials(
        token=None,  # no access token cached; refresh() obtains one below
        refresh_token=os.getenv("YT_REFRESH_TOKEN"),
        token_uri="https://oauth2.googleapis.com/token",
        client_id=os.getenv("YT_CLIENT_ID", "387804137131-sjih05p3329n5n72gsgglv1tb9t62882.apps.googleusercontent.com"),
        client_secret=os.getenv("YT_CLIENT_SECRET", "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"),
        scopes=["https://www.googleapis.com/auth/youtube.upload"]
    )
    creds.refresh(Request())
    youtube = build("youtube", "v3", credentials=creds)
    request = youtube.videos().insert(
        part="snippet,status",
        body={
            "snippet": {
                "title": title,
                "description": desc,
                "tags": ["funny", "memes", "comedy", "shorts"],
                "categoryId": "23"  # presumably "Comedy" in YouTube's category list — confirm
            },
            "status": {
                "privacyStatus": "public",
                "madeForKids": False
            }
        },
        media_body=MediaFileUpload(video_path)
    )
    res = request.execute()
    logger.info(f"Uploaded: https://youtube.com/watch?v={res['id']}")
    return f"https://youtube.com/watch?v={res['id']}"
# NOTE(review): declared global in auto_loop but never read or written there.
first_run = True
def save_to_db(part, title, desc, link):
    """Record an uploaded video's metadata in the `meta` collection."""
    record = {
        "part": part,
        "title": title,
        "description": desc,
        "link": link,
        "uploaded": time.time(),
    }
    meta.insert_one(record)
def auto_loop():
    """Background worker: fetch, edit, and upload 3-5 reels per day.

    Runs forever in its own thread with a dedicated asyncio event loop
    (required for the Telethon calls made off the main thread). Uploads
    are restricted to 08:00-20:00 IST, spaced 2-4 hours apart, with daily
    counters reset at midnight IST.
    """
    asyncio.set_event_loop(asyncio.new_event_loop())
    global UPLOAD_TIMES, NEXT_RESET, first_run
    ist = timezone(timedelta(hours=5, minutes=30))  # IST timezone
    daily_upload_count = random.randint(3, 5)  # randomized daily target
    uploads_done_today = 0
    NEXT_RESET = datetime.now(ist).replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
    logger.info(f"[π ] Today's upload target: {daily_upload_count} reels.")
    def wait_until(hour: int, minute: int = 0):
        # Sleep until the given IST wall-clock time today; no-op if already past.
        now = datetime.now(ist)
        target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if target < now:
            return
        logger.info(f"[π] Waiting until {target.strftime('%H:%M')} IST...")
        while datetime.now(ist) < target:
            time.sleep(10)
    wait_until(8)
    while True:
        try:
            now = datetime.now(ist)
            # Midnight rollover: clear history and pick a fresh target.
            if now >= NEXT_RESET:
                UPLOAD_TIMES.clear()
                NEXT_RESET = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
                daily_upload_count = random.randint(3, 5)
                uploads_done_today = 0
                logger.info(f"[π] Reset for new day. New target: {daily_upload_count} uploads.")
            if uploads_done_today >= daily_upload_count:
                logger.info("[β ] Daily upload target reached.")
                time.sleep(60)
                continue
            # Only operate inside the 08:00-20:00 IST window.
            if now.hour < 8 or now.hour >= 20:
                time.sleep(60)
                continue
            # Upload if nothing uploaded yet or at least 2-4h have elapsed.
            if not UPLOAD_TIMES or (now - UPLOAD_TIMES[-1]).total_seconds() >= random.randint(7200, 14400):
                video_path, reel_type = fetch_valid_reel()
                if not video_path:
                    logger.warning("[β οΈ] No valid reel found. Retrying...")
                    time.sleep(60)
                    continue
                # "reacted" reels get the light edit; raw ones the meme intro.
                if reel_type == "reacted":
                    edited = edit_video(video_path)
                else:
                    edited = edit_video_raw(video_path)
                part = get_next_part()
                title = f"Try not to laugh || #{part} #funny #memes #comedy #shorts"
                desc = generate_description(title)
                link = upload_to_youtube(edited, title, desc)
                save_to_db(part, title, desc, link)
                logger.info(f"[π€] Uploaded #{part}: {link}")
                UPLOAD_TIMES.append(now)
                uploads_done_today += 1
                # BUG FIX: the original called os.remove(video) with an
                # undefined name, raising NameError after every upload.
                os.remove(video_path)
                os.remove(edited)
                if uploads_done_today < daily_upload_count:
                    gap_seconds = random.randint(7200, 14400)
                    next_time = datetime.now(ist) + timedelta(seconds=gap_seconds)
                    if next_time.hour >= 20:
                        logger.info("[π] Next upload would exceed 8PM. Skipping.")
                        continue
                    logger.info(f"[β³] Waiting ~{gap_seconds // 60} minutes before next upload.")
                    time.sleep(gap_seconds)
            else:
                time.sleep(60)
        except Exception as e:
            logger.error(f"Loop error: {e}")
            time.sleep(60)
@app.route("/")
def home():
    """Render the dashboard page with the most recently uploaded video embed.

    BUG FIX: the route decorator was missing in the pasted source; without
    it Flask never serves this view. "/" matches the redirect target used
    by run_now.
    """
    last = meta.find_one(sort=[("uploaded", -1)])
    video_id = last["link"].split("v=")[-1] if last and "link" in last else None
    return render_template("index.html", time=time.ctime(), video_id=video_id)
@app.route("/run")
def run_now():
    """Manually kick off the upload loop in a background thread, then
    redirect back to the dashboard.

    BUG FIX: the route decorator was missing in the pasted source, leaving
    this view unreachable. NOTE(review): "/run" is an assumed path —
    confirm the original endpoint.
    """
    Thread(target=auto_loop, daemon=True).start()
    return redirect("/")
| if __name__ == "__main__": | |
| Thread(target=auto_loop, daemon=True).start() | |
| app.run(host="0.0.0.0", port=7860) |