#!/usr/bin/env python3
"""Scheduled YouTube Shorts pipeline.

Each run lives in ``runs/<timestamp>/`` and advances through the stages
CONTENT -> FRAMES -> TTS -> AUDIO -> VIDEO -> UPLOAD -> DONE.  The current
stage is persisted in ``state.txt`` inside the run directory so that an
interrupted run resumes at the stage it stopped in.  The heavy stages
(content generation, image generation, speech synthesis) are executed as
Kaggle kernels driven through the ``kaggle`` CLI; audio/video assembly uses
``ffmpeg``/``ffprobe`` locally.
"""
import os
import json
import time
import subprocess
import pickle
import sys
import shutil
import re
import random
import traceback
import logging
from pathlib import Path
from datetime import datetime, timezone

BASE_DIR = Path(__file__).resolve().parent
RUNS_DIR = BASE_DIR / "runs"
KAGGLE_TEMPLATE = BASE_DIR / "kaggle_template.py"
KAGGLE_TTS_TEMPLATE = BASE_DIR / "kaggle_tts_stt_template.py"
KAGGLE_LLM_TEMPLATE = BASE_DIR / "kaggle_llm_template.py"
TOPIC_HISTORY_FILE = BASE_DIR / "topic_history.json"
FACT_HISTORY_FILE = BASE_DIR / "fact_history.json"
KAGGLE_KEYS_FILE = BASE_DIR / "kaggle_keys.json"
CLIENT_SECRETS_FILE = BASE_DIR / "client_secrets.json"
TOKEN_FILE = BASE_DIR / "token.pickle"
TRIGGER_RUN_FILE = BASE_DIR / "trigger_run"
TRIGGER_SKIP_FILE = BASE_DIR / "trigger_skip"
NUM_FRAMES = 10
VIDEO_SPEED = 1.1
SCOPES = ["https://www.googleapis.com/auth/youtube.upload"]

# Number of most-recent entries kept in the topic/fact history files.
_HISTORY_LIMIT = 200

RUNS_DIR.mkdir(parents=True, exist_ok=True)
HF_TOKEN = os.environ.get("HF_TOKEN", os.environ.get("HF_HUB_TOKEN", ""))

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(message)s",
    datefmt="%H:%M:%S",
    handlers=[logging.StreamHandler(sys.stdout)]
)
log = logging.getLogger()


def _sync_hub_call(func_name, *args):
    """Best-effort call of ``sync_hub.<func_name>(*args)``.

    ``sync_hub`` is an optional project module; a missing module or a failing
    call must never abort the pipeline, so every ``Exception`` is logged and
    swallowed.  ``KeyboardInterrupt``/``SystemExit`` still propagate.
    """
    try:
        import sync_hub
    except ImportError as err:
        log.debug("sync_hub unavailable, skipping %s: %s", func_name, err)
        return
    try:
        getattr(sync_hub, func_name)(*args)
    except Exception as err:
        log.warning("sync_hub.%s failed: %s", func_name, err)


def load_kaggle_accounts():
    """Return the list of Kaggle accounts as ``{"username", "key"}`` dicts.

    The ``accounts`` list of ``kaggle_keys.json`` is used when that file is
    readable; otherwise a single account is built from the ``KAGGLE_USERNAME``
    and ``KAGGLE_KEY`` environment variables.  Returns ``[]`` when neither
    source yields an account.
    """
    if KAGGLE_KEYS_FILE.exists():
        try:
            return json.loads(KAGGLE_KEYS_FILE.read_text(encoding="utf-8")).get("accounts", [])
        except Exception as err:
            log.warning("Could not read %s (%s); falling back to environment",
                        KAGGLE_KEYS_FILE.name, err)
    u = os.environ.get("KAGGLE_USERNAME", "")
    k = os.environ.get("KAGGLE_KEY", "")
    return [{"username": u, "key": k}] if u and k else []


KAGGLE_ACCOUNTS = load_kaggle_accounts()


def set_kaggle_creds(account):
    """Write ``~/.kaggle/kaggle.json`` for *account* so the CLI uses it."""
    kaggle_dir = Path.home() / ".kaggle"
    kaggle_dir.mkdir(parents=True, exist_ok=True)
    cred_file = kaggle_dir / "kaggle.json"
    payload = json.dumps({"username": account["username"], "key": account["key"]})
    # Create the file with 0600 from the start so the API key is never
    # briefly readable by other users between write and chmod.
    fd = os.open(str(cred_file), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(payload)
    # A pre-existing file keeps its old mode, so enforce it explicitly.
    cred_file.chmod(0o600)


def load_json_list(path):
    """Return the JSON list stored at *path*, or ``[]``.

    ``[]`` is returned when the file is missing, unreadable, not valid JSON,
    or does not contain a list at top level.
    """
    if path.exists():
        try:
            d = json.loads(path.read_text(encoding="utf-8"))
            return d if isinstance(d, list) else []
        except Exception as err:
            log.warning("Could not parse %s: %s", path, err)
    return []


def get_used_topics():
    """Return the stored topic history entries."""
    return load_json_list(TOPIC_HISTORY_FILE)


def get_used_facts():
    """Return the stored fact history entries."""
    return load_json_list(FACT_HISTORY_FILE)


def _append_history(path, entry):
    """Append *entry* to the history list at *path*, trim it, and sync it."""
    history = load_json_list(path)
    history.append(entry)
    atomic_write(path, history[-_HISTORY_LIMIT:])
    _sync_hub_call("push_file", path)


def save_topic(topic):
    """Record *topic* (with the current timestamp) in the topic history."""
    _append_history(TOPIC_HISTORY_FILE,
                    {"topic": topic, "date": datetime.now().isoformat()})


def save_fact(fact, topic=""):
    """Record *fact* and its *topic* (with a timestamp) in the fact history."""
    _append_history(FACT_HISTORY_FILE,
                    {"fact": fact, "topic": topic, "date": datetime.now().isoformat()})


def load_state(run: Path) -> str:
    """Return the stage stored in ``run/state.txt``; ``"CONTENT"`` if absent."""
    f = run / "state.txt"
    return f.read_text().strip() if f.exists() else "CONTENT"


def save_state(run: Path, state: str):
    """Persist *state* as the current stage of *run* and log the transition."""
    (run / "state.txt").write_text(state)
    log.info(f"State -> {state}")


def save_error_log(run: Path, exc: Exception, stage: str):
    """Write ``run/error.log`` with a timestamp, *stage*, and *exc*'s traceback.

    The traceback is taken from *exc* itself, so the function also works when
    it is called outside the ``except`` block that caught the exception.
    """
    trace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    (run / "error.log").write_text(
        f"[{datetime.now().isoformat()}] Stage: {stage}\n{trace}",
        encoding="utf-8")


def atomic_write(path: Path, data):
    """Write *data* to *path* via a temporary sibling file and a rename.

    ``dict``/``list`` values are serialised as indented JSON; anything else is
    written as ``str(data)``.
    """
    # Append ".tmp" to the full name so "x.json" and "x.txt" do not share a
    # temporary file.
    tmp = path.with_name(path.name + ".tmp")
    if isinstance(data, (dict, list)):
        txt = json.dumps(data, ensure_ascii=False, indent=2)
    else:
        txt = str(data)
    # ensure_ascii=False can emit non-ASCII text, so fix the encoding.
    tmp.write_text(txt, encoding="utf-8")
    tmp.replace(path)


def push_kaggle_kernel(build_dir: Path, main_py_text: str, label: str):
    """Push *main_py_text* as a Kaggle script kernel, trying accounts in random order.

    On success ``build_dir/kaggle_account.json`` records the username and
    kernel slug that were used, and the CLI credentials are left pointing at
    that account.

    Raises:
        RuntimeError: no account is configured, or the push failed on all.
    """
    (build_dir / "main.py").write_text(main_py_text, encoding="utf-8")
    suffix = build_dir.parent.name.replace("_", "")[-10:]
    accounts = KAGGLE_ACCOUNTS.copy()
    if not accounts:
        raise RuntimeError(f"{label} kernel push failed: no Kaggle accounts configured")
    random.shuffle(accounts)
    for acc in accounts:
        tag = label[:5].replace(" ", "")
        slug = f"{acc['username']}/yt-{tag}-{suffix}-{random.randint(1000, 9999)}"
        meta = {
            "id": slug,
            "title": f"YT {label} {suffix}",
            "code_file": "main.py",
            "language": "python",
            "kernel_type": "script",
            "enable_gpu": "true",
            "enable_internet": "true",
            "is_private": "false"
        }
        (build_dir / "kernel-metadata.json").write_text(json.dumps(meta, indent=2))
        set_kaggle_creds(acc)
        r = subprocess.run(["kaggle", "kernels", "push", "-p", str(build_dir)],
                           capture_output=True, text=True)
        if r.returncode == 0:
            (build_dir / "kaggle_account.json").write_text(
                json.dumps({"username": acc["username"], "slug": slug}))
            log.info(f"Pushed {label} ({acc['username']}): {slug}")
            return
        log.error(f"Push failed ({acc['username']}): {r.stderr.strip()[:200]}")
    raise RuntimeError(f"{label} kernel push failed on all accounts")


def wait_kaggle(slug: str, timeout: int = 3600):
    """Poll ``kaggle kernels status`` every 30 s until the kernel completes.

    Raises:
        RuntimeError: the status output mentions "error"/"failed", or
            *timeout* seconds elapse without completion.
    """
    start = time.time()
    while time.time() - start < timeout:
        elapsed = int(time.time() - start)
        r = subprocess.run(["kaggle", "kernels", "status", slug],
                           capture_output=True, text=True)
        status = r.stdout.strip().lower()
        status_lines = [line for line in status.split("\n") if "has status" in line]
        cur = status_lines[0] if status_lines else status[:120]
        log.info(f"  {elapsed//60:02d}:{elapsed%60:02d} | {cur}")
        if "complete" in status:
            log.info(f"Kernel done ({elapsed//60}m {elapsed%60}s)")
            return
        if "error" in status or "failed" in status:
            raise RuntimeError(f"Kernel failed: {status}")
        time.sleep(30)
    raise RuntimeError(f"Kernel timed out after {timeout//60}min")


def _push_and_select_account(build_dir: Path, source: str, label: str) -> str:
    """Push the kernel, activate the account that accepted it, return its slug."""
    push_kaggle_kernel(build_dir, source, label)
    info = json.loads((build_dir / "kaggle_account.json").read_text())
    for acc in KAGGLE_ACCOUNTS:
        if acc["username"] == info["username"]:
            set_kaggle_creds(acc)
            break
    return info["slug"]


def _download_kernel_output(slug: str, dest: Path):
    """Download the output files of kernel *slug* into *dest*.

    Raises:
        RuntimeError: the CLI exits non-zero (message carries its stderr).
    """
    r = subprocess.run(["kaggle", "kernels", "output", slug, "-p", str(dest)],
                       capture_output=True, text=True)
    if r.returncode != 0:
        raise RuntimeError(f"Kernel output download failed: {r.stderr.strip()[:300]}")


def step_content(run: Path):
    """Stage CONTENT: generate ``content.json`` with the LLM kernel.

    Fills the LLM template with recent topics/facts, runs it on Kaggle,
    validates the downloaded ``content.json``, records the topic and fact in
    the history files, and advances the run to ``FRAMES``.
    """
    log.info("=" * 56)
    log.info("STEP 1: CONTENT (Kaggle DeepSeek-R1-32B from Dataset)")
    log.info("=" * 56)
    build_dir = run / "kaggle_llm_build"
    build_dir.mkdir(exist_ok=True)
    used_topics = [e.get("topic", "") for e in get_used_topics()[-60:] if e.get("topic")]
    used_facts = get_used_facts()[-20:]
    tmpl = KAGGLE_LLM_TEMPLATE.read_text(encoding="utf-8")
    tmpl = tmpl.replace("TOPICS_PLACEHOLDER", json.dumps(used_topics, indent=2))
    tmpl = tmpl.replace("FACTS_PLACEHOLDER", json.dumps(used_facts, indent=2))
    tmpl = tmpl.replace("FRAMES_PLACEHOLDER", str(NUM_FRAMES))
    tmpl = tmpl.replace("HFTOKEN_PLACEHOLDER", json.dumps(HF_TOKEN))
    slug = _push_and_select_account(build_dir, tmpl, "llm")
    log.info(f"Waiting for LLM kernel {slug} ...")
    log.info("  Sleeping 60s for kernel to register...")
    time.sleep(60)
    log.info("  (Dataset mount ~0min + inference ~3-5min)")
    wait_kaggle(slug)
    log.info("Downloading content.json ...")
    _download_kernel_output(slug, run)
    candidates = list(run.glob("**/content.json"))
    if not candidates:
        raise RuntimeError("content.json not found in kernel output")
    src = candidates[0]
    dst = run / "content.json"
    if src != dst:
        shutil.move(str(src), str(dst))
    data = json.loads(dst.read_text(encoding="utf-8"))
    missing = [k for k in ("topic", "fact", "hook", "title", "script", "prompts", "tags")
               if not data.get(k)]
    if missing:
        raise RuntimeError(f"content.json missing: {missing}")
    if len(data["prompts"]) < NUM_FRAMES:
        raise RuntimeError(f"Only {len(data['prompts'])}/{NUM_FRAMES} prompts")
    save_topic(data["topic"])
    save_fact(data["fact"], topic=data["topic"])
    # "script_char_count" is informational and not among the validated keys;
    # fall back to the real length rather than failing after history was saved.
    char_count = data.get("script_char_count", len(data["script"]))
    log.info(f"Content OK: topic={data['topic']} | {char_count}c | {len(data['prompts'])} prompts")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "FRAMES")


def step_frames(run: Path):
    """Stage FRAMES: render one image per prompt with the image kernel.

    Downloads the kernel output, moves every ``frame_*.png`` into the run
    directory, and advances the run to ``TTS``.
    """
    log.info("=" * 56)
    log.info("STEP 2: IMAGES (Kaggle FLUX.1-schnell)")
    log.info("=" * 56)
    build_dir = run / "kaggle_frames_build"
    build_dir.mkdir(exist_ok=True)
    data = json.loads((run / "content.json").read_text(encoding="utf-8"))
    tmpl = KAGGLE_TEMPLATE.read_text(encoding="utf-8")
    tmpl = tmpl.replace("{{PROMPTS_PLACEHOLDER}}", json.dumps(data["prompts"], indent=2))
    tmpl = tmpl.replace("{{HF_TOKEN_PLACEHOLDER}}", json.dumps(HF_TOKEN))
    slug = _push_and_select_account(build_dir, tmpl, "frames")
    wait_kaggle(slug)
    _download_kernel_output(slug, run)
    frames = sorted(run.glob("**/frame_*.png"))
    if len(frames) < NUM_FRAMES:
        raise RuntimeError(f"Only {len(frames)}/{NUM_FRAMES} frames")
    for f in frames:
        if f.parent != run:
            shutil.move(str(f), str(run / f.name))
    log.info(f"Downloaded {len(frames)} frames")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "TTS")


def step_tts(run: Path):
    """Stage TTS: synthesise ``voice.wav`` and ``word_timings.json`` on Kaggle.

    Moves both files into the run directory and advances the run to ``AUDIO``.
    """
    log.info("=" * 56)
    log.info("STEP 3: TTS (Kaggle StyleTTS2 + Whisper)")
    log.info("=" * 56)
    build_dir = run / "kaggle_tts_build"
    build_dir.mkdir(exist_ok=True)
    data = json.loads((run / "content.json").read_text(encoding="utf-8"))
    tmpl = KAGGLE_TTS_TEMPLATE.read_text(encoding="utf-8")
    tmpl = tmpl.replace("{{SCRIPT_PLACEHOLDER}}", json.dumps(data["script"]))
    tmpl = tmpl.replace("{{VIDEO_SPEED_PLACEHOLDER}}", str(VIDEO_SPEED))
    slug = _push_and_select_account(build_dir, tmpl, "tts")
    wait_kaggle(slug)
    _download_kernel_output(slug, run)
    voice = next(iter(run.glob("**/voice.wav")), None)
    timings = next(iter(run.glob("**/word_timings.json")), None)
    if not voice:
        raise RuntimeError("voice.wav not found in TTS output")
    if not timings:
        raise RuntimeError("word_timings.json not found in TTS output")
    if voice.parent != run:
        shutil.move(str(voice), str(run / "voice.wav"))
    if timings.parent != run:
        shutil.move(str(timings), str(run / "word_timings.json"))
    log.info("TTS done: voice.wav + word_timings.json")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "AUDIO")


def step_audio(run: Path):
    """Stage AUDIO: speed up the voice track and rescale the word timings.

    Produces ``voice_fast.wav``, ``adjusted_timings.json`` (timestamps divided
    by ``VIDEO_SPEED``) and ``audio_duration.txt``; advances to ``VIDEO``.
    """
    log.info("STEP 4: AUDIO PROCESSING")
    voice_fast = run / "voice_fast.wav"
    r = subprocess.run(
        ["ffmpeg", "-y", "-i", str(run / "voice.wav"),
         "-filter:a", f"atempo={VIDEO_SPEED}", str(voice_fast)],
        capture_output=True, text=True)
    if r.returncode != 0:
        raise RuntimeError(f"ffmpeg speed failed: {r.stderr[:300]}")
    r2 = subprocess.run(
        ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
         "-of", "csv=p=0", str(voice_fast)],
        capture_output=True, text=True)
    if r2.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {r2.stderr[:300]}")
    audio_dur = float(r2.stdout.strip())
    timings = json.loads((run / "word_timings.json").read_text(encoding="utf-8"))
    adj = [{"word": w["word"],
            "start": w["start"] / VIDEO_SPEED,
            "end": w["end"] / VIDEO_SPEED} for w in timings]
    atomic_write(run / "adjusted_timings.json", adj)
    atomic_write(run / "audio_duration.txt", str(audio_dur))
    log.info(f"Audio: {audio_dur:.2f}s at {VIDEO_SPEED}x")
    save_state(run, "VIDEO")


def build_ass(timings, out_path: Path, W=1080, H=1920):
    """Write an ASS subtitle file showing *timings* five words at a time.

    Args:
        timings: list of ``{"word", "start", "end"}`` dicts, times in seconds.
        out_path: destination ``.ass`` file (written as UTF-8).
        W, H: script resolution written to ``PlayResX``/``PlayResY``.
    """
    def ts(s):
        # Work in whole centiseconds so rounding carries into the minutes and
        # hours fields instead of producing a seconds value of "60.00".
        total_cs = max(0, int(round(s * 100)))
        h, rem = divmod(total_cs, 360000)
        m, rem = divmod(rem, 6000)
        sec, cs = divmod(rem, 100)
        return f"{h}:{m:02d}:{sec:02d}.{cs:02d}"

    header = (
        "[Script Info]\nScriptType: v4.00+\n"
        f"PlayResX: {W}\nPlayResY: {H}\nScaledBorderAndShadow: yes\n\n"
        "[V4+ Styles]\nFormat: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, "
        "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, "
        "Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\n"
        "Style: Default,Arial,72,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,"
        "-1,0,0,0,100,100,0,0,1,3,2,2,50,50,80,1\n\n"
        "[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n")
    lines = [header]
    for i in range(0, len(timings), 5):
        chunk = timings[i:i + 5]
        text = " ".join(w["word"] for w in chunk)
        # A raw line break inside the text would terminate the Dialogue line.
        text = text.replace("\r", " ").replace("\n", " ")
        lines.append(
            f"Dialogue: 0,{ts(chunk[0]['start'])},{ts(chunk[-1]['end'])},Default,,0,0,0,,{text}\n")
    out_path.write_text("".join(lines), encoding="utf-8")


def _frame_order(path: Path):
    """Sort key ordering frames by their numeric index (frame_2 before frame_10)."""
    match = re.search(r"(\d+)", path.stem)
    return (int(match.group(1)) if match else -1, path.name)


def _concat_quote(path) -> str:
    """Escape *path* for use inside single quotes in an ffmpeg concat list."""
    return str(path).replace("'", "'\\''")


def step_video(run: Path):
    """Stage VIDEO: build the slideshow, burn in subtitles, mux the audio.

    Each frame is shown for an equal share of the audio duration.  Produces
    ``final.mp4`` and advances the run to ``UPLOAD``.
    """
    log.info("STEP 5: VIDEO ASSEMBLY")
    adj = json.loads((run / "adjusted_timings.json").read_text(encoding="utf-8"))
    dur = float((run / "audio_duration.txt").read_text().strip())
    frames = sorted(run.glob("frame_*.png"), key=_frame_order)
    if not frames:
        raise RuntimeError("No frames found")
    W, H = 1080, 1920
    frame_dur = dur / len(frames)
    concat_file = run / "frames.txt"
    with open(concat_file, "w", encoding="utf-8") as f:
        for fr in frames:
            f.write(f"file '{_concat_quote(fr)}'\nduration {frame_dur:.6f}\n")
        # The last entry is listed once more without a duration, as in the
        # original script, so the final frame's duration is applied.
        f.write(f"file '{_concat_quote(frames[-1])}'\n")
    slides = run / "slides.mp4"
    r = subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file),
        "-vf", f"scale={W}:{H}:force_original_aspect_ratio=increase,crop={W}:{H}",
        "-c:v", "libx264", "-preset", "fast", "-pix_fmt", "yuv420p", "-r", "30",
        str(slides)
    ], capture_output=True, text=True)
    if r.returncode != 0:
        raise RuntimeError(f"Slides failed: {r.stderr[:400]}")
    ass = run / "subtitles.ass"
    build_ass(adj, ass, W, H)
    final = run / "final.mp4"
    # Run inside the run directory and reference the subtitle file by its bare
    # name: an absolute path would need filter-graph escaping (":" "'" "\").
    r = subprocess.run([
        "ffmpeg", "-y", "-i", str(slides), "-i", str(run / "voice_fast.wav"),
        "-vf", f"ass={ass.name}",
        "-c:v", "libx264", "-preset", "fast", "-c:a", "aac", "-b:a", "192k",
        "-shortest", str(final)
    ], capture_output=True, text=True, cwd=str(run))
    if r.returncode != 0:
        raise RuntimeError(f"Final video failed: {r.stderr[:400]}")
    log.info(f"Final video: {final.stat().st_size / 1024 / 1024:.1f} MB")
    save_state(run, "UPLOAD")


def _load_youtube_credentials():
    """Load (and refresh if expired) the OAuth credentials in ``token.pickle``.

    Raises:
        RuntimeError: the token file is missing, or invalid and not refreshable.
    """
    creds = None
    if TOKEN_FILE.exists():
        # NOTE(security): pickle.load can execute arbitrary code. The token
        # file may have been pulled from remote storage by sync_hub; only use
        # it with storage you fully control.
        with open(TOKEN_FILE, "rb") as f:
            creds = pickle.load(f)
    if creds and creds.valid:
        return creds
    if creds and creds.expired and creds.refresh_token:
        from google.auth.transport.requests import Request
        creds.refresh(Request())
        with open(TOKEN_FILE, "wb") as f:
            pickle.dump(creds, f)
        # Share the refreshed token so other hosts do not reuse the stale one.
        _sync_hub_call("push_file", TOKEN_FILE)
        return creds
    raise RuntimeError(
        "token.pickle missing or invalid. Run auth_youtube.py on your Mac, "
        "then upload token.pickle to HF Dataset arshitmalik/yt-pipeline-data")


def step_upload(run: Path):
    """Stage UPLOAD: publish ``final.mp4`` to YouTube as a public video.

    Writes the resulting video id to ``youtube_id.txt`` and advances the run
    to ``DONE``.

    Raises:
        RuntimeError: ``final.mp4`` is missing or no usable credentials exist.
    """
    log.info("STEP 6: YOUTUBE UPLOAD")
    data = json.loads((run / "content.json").read_text(encoding="utf-8"))
    final = run / "final.mp4"
    if not final.exists():
        raise RuntimeError("final.mp4 not found")
    _sync_hub_call("pull_state", BASE_DIR)
    creds = _load_youtube_credentials()
    from googleapiclient.discovery import build
    from googleapiclient.http import MediaFileUpload
    yt = build("youtube", "v3", credentials=creds)
    body = {
        "snippet": {
            "title": data.get("title", "Science Fact")[:100],
            "description": data.get("fact", "") + "\n\n" + "\n".join(data.get("tags", [])),
            "tags": data.get("tags", []),
            "categoryId": "28",
            "defaultLanguage": "en"
        },
        "status": {"privacyStatus": "public", "selfDeclaredMadeForKids": False}
    }
    media = MediaFileUpload(str(final), chunksize=4 * 1024 * 1024, resumable=True)
    req = yt.videos().insert(part="snippet,status", body=body, media_body=media)
    log.info("Uploading to YouTube...")
    resp = None
    while resp is None:
        status, resp = req.next_chunk()
        if status:
            log.info(f"Upload progress: {int(status.progress() * 100)}%")
    vid_id = resp.get("id", "unknown")
    log.info(f"Published: https://youtube.com/watch?v={vid_id}")
    atomic_write(run / "youtube_id.txt", vid_id)
    _sync_hub_call("push_all_state", BASE_DIR)
    save_state(run, "DONE")


STEPS = {
    "CONTENT": step_content,
    "FRAMES": step_frames,
    "TTS": step_tts,
    "AUDIO": step_audio,
    "VIDEO": step_video,
    "UPLOAD": step_upload,
}


def _find_active_run():
    """Return the most recently modified run that is neither DONE nor FAILED."""
    if not RUNS_DIR.exists():
        return None
    for d in sorted(RUNS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
        if d.is_dir() and load_state(d) not in ("DONE", "FAILED"):
            return d
    return None


def main():
    """Run the scheduler loop forever.

    Each iteration executes one stage of the active run (creating a new run if
    none is in progress).  A finished run schedules the next one 24 h later, a
    failed one 5 min later.  Creating the ``trigger_run`` file forces an
    immediate run; ``trigger_skip`` postpones the next run by 24 h.
    """
    log.info("=" * 56)
    log.info("YouTube Shorts Pipeline — STARTING")
    log.info(f"DeepSeek-R1-32B (Kaggle) | FLUX (Kaggle) | StyleTTS2 (Kaggle)")
    log.info(f"Frames: {NUM_FRAMES} | Speed: {VIDEO_SPEED}x | Accounts: {len(KAGGLE_ACCOUNTS)}")
    log.info("=" * 56)
    _sync_hub_call("pull_state", BASE_DIR)
    next_run_after = time.time() + 15
    while True:
        if TRIGGER_SKIP_FILE.exists():
            TRIGGER_SKIP_FILE.unlink(missing_ok=True)
            next_run_after = time.time() + 86400
            log.info("Skipped. Next run in 24h.")
            time.sleep(60)
            continue
        if TRIGGER_RUN_FILE.exists():
            TRIGGER_RUN_FILE.unlink(missing_ok=True)
            next_run_after = 0
            log.info("Triggered immediate run!")
        if time.time() < next_run_after:
            left = next_run_after - time.time()
            # Log roughly once every five minutes while waiting.
            if int(left) % 300 < 30:
                log.info(f"Next run in {left/3600:.1f}h ...")
            time.sleep(30)
            continue

        run = _find_active_run()
        if not run:
            run = RUNS_DIR / datetime.now().strftime("%Y%m%d_%H%M%S")
            run.mkdir(parents=True)
            save_state(run, "CONTENT")
        state = load_state(run)
        log.info(f"\nRun: {run.name}  Stage: {state}")
        try:
            if state in STEPS:
                STEPS[state](run)
            else:
                # Unknown stage name: mark the run failed so it is not retried.
                save_state(run, "FAILED")
        except KeyboardInterrupt:
            log.info("Interrupted")
            sys.exit(0)
        except Exception as e:
            log.error(f"Error in {state}: {e}")
            traceback.print_exc()
            save_error_log(run, e, state)
            save_state(run, "FAILED")

        cur = load_state(run)
        if cur == "DONE":
            # Reporting only: an unreadable content.json must not crash the loop.
            try:
                content = json.loads((run / "content.json").read_text(encoding="utf-8"))
            except (OSError, ValueError) as err:
                log.warning("Could not read content.json for summary: %s", err)
                content = {}
            id_file = run / "youtube_id.txt"
            yt_id = id_file.read_text().strip() if id_file.exists() else "?"
            log.info(f"\nCOMPLETE: {content.get('title', '')} — https://youtube.com/watch?v={yt_id}")
            shutil.rmtree(run, ignore_errors=True)
            next_run_after = time.time() + 86400
            log.info("Next run in 24h.")
        elif cur == "FAILED":
            log.error(f"FAILED — check: {run}")
            next_run_after = time.time() + 300
            log.info("Retrying in 5 min.")


if __name__ == "__main__":
    main()