Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| import os, json, time, subprocess, pickle, sys, shutil, re, random, traceback, logging | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
BASE_DIR = Path(__file__).resolve().parent
RUNS_DIR = BASE_DIR.joinpath("runs")

# Kaggle kernel source templates; the step_* stages fill in their placeholders.
KAGGLE_TEMPLATE = BASE_DIR.joinpath("kaggle_template.py")
KAGGLE_TTS_TEMPLATE = BASE_DIR.joinpath("kaggle_tts_stt_template.py")
KAGGLE_LLM_TEMPLATE = BASE_DIR.joinpath("kaggle_llm_template.py")

# JSON-list histories of previously used topics and facts.
TOPIC_HISTORY_FILE = BASE_DIR.joinpath("topic_history.json")
FACT_HISTORY_FILE = BASE_DIR.joinpath("fact_history.json")

# Credentials. CLIENT_SECRETS_FILE is not read in this file
# (presumably used by auth_youtube.py to produce TOKEN_FILE).
KAGGLE_KEYS_FILE = BASE_DIR.joinpath("kaggle_keys.json")
CLIENT_SECRETS_FILE = BASE_DIR.joinpath("client_secrets.json")
TOKEN_FILE = BASE_DIR.joinpath("token.pickle")

# Control files polled by main(): "run" starts a run now, "skip" postpones 24h.
TRIGGER_RUN_FILE = BASE_DIR.joinpath("trigger_run")
TRIGGER_SKIP_FILE = BASE_DIR.joinpath("trigger_skip")

NUM_FRAMES = 10    # minimum prompts/images required per video
VIDEO_SPEED = 1.1  # narration tempo factor (ffmpeg atempo) and timing divisor
SCOPES = ["https://www.googleapis.com/auth/youtube.upload"]

RUNS_DIR.mkdir(parents=True, exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN", os.getenv("HF_HUB_TOKEN", ""))

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(message)s",
    datefmt="%H:%M:%S",
    handlers=[logging.StreamHandler(stream=sys.stdout)],
)
log = logging.getLogger(name=None)
def load_kaggle_accounts():
    """Return the pool of Kaggle accounts as ``[{"username": ..., "key": ...}]``.

    Uses the "accounts" list from KAGGLE_KEYS_FILE when that file exists and
    holds a JSON object whose "accounts" value is a list. Entries lacking a
    username or key are dropped.

    If the file is absent, unreadable or malformed, falls back to a single
    account built from the KAGGLE_USERNAME / KAGGLE_KEY environment variables.
    Returns [] when those are not both set.
    """
    if KAGGLE_KEYS_FILE.exists():
        try:
            payload = json.loads(KAGGLE_KEYS_FILE.read_text())
        except (OSError, ValueError) as exc:  # ValueError covers JSONDecodeError
            log.warning(f"Ignoring unreadable {KAGGLE_KEYS_FILE.name}: {exc}")
        else:
            accounts = payload.get("accounts", []) if isinstance(payload, dict) else None
            if isinstance(accounts, list):
                # Drop incomplete entries so set_kaggle_creds() cannot raise
                # KeyError in the middle of account rotation.
                return [a for a in accounts
                        if isinstance(a, dict) and a.get("username") and a.get("key")]
            log.warning(f"Ignoring malformed {KAGGLE_KEYS_FILE.name}: "
                        "expected an object with an 'accounts' list")
    u = os.environ.get("KAGGLE_USERNAME", "")
    k = os.environ.get("KAGGLE_KEY", "")
    return [{"username": u, "key": k}] if u and k else []


KAGGLE_ACCOUNTS = load_kaggle_accounts()
def set_kaggle_creds(account):
    """Write *account*'s username and key to ``~/.kaggle/kaggle.json``.

    The file is left with owner-only (0600) permissions. Only the "username"
    and "key" fields of *account* are written. The kaggle CLI subprocesses
    launched after this call presumably authenticate with the file written
    last.
    """
    kaggle_dir = Path.home() / ".kaggle"
    kaggle_dir.mkdir(exist_ok=True)
    cred_file = kaggle_dir / "kaggle.json"
    payload = json.dumps({"username": account["username"], "key": account["key"]})
    # Create the file as 0600 up front: writing first and chmod-ing afterwards
    # leaves the API key briefly readable under a permissive umask.
    fd = os.open(cred_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(payload)
    # os.open's mode only applies on creation; also tighten a pre-existing file.
    cred_file.chmod(0o600)
def load_json_list(path):
    """Return the JSON array stored at *path*, or [] if it is unavailable.

    A missing file, an unreadable or undecodable file, invalid JSON, or a
    top-level value that is not a list all yield an empty list. The decode and
    read failures are logged, so a corrupt history file does not vanish
    silently.
    """
    if not path.exists():
        return []
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning(f"Ignoring unreadable JSON list {path.name}: {exc}")
        return []
    return data if isinstance(data, list) else []
def get_used_topics():
    """Return the topic history entries, oldest first (see save_topic)."""
    entries = load_json_list(TOPIC_HISTORY_FILE)
    return entries


def get_used_facts():
    """Return the fact history entries, oldest first (see save_fact)."""
    entries = load_json_list(FACT_HISTORY_FILE)
    return entries
def _append_history(path, entry):
    """Append *entry* to the JSON history list at *path*, then sync it if possible.

    Only the newest 200 entries are kept. The file is replaced via
    atomic_write, so an interrupted write cannot truncate the history. Syncing
    through the optional ``sync_hub.push_file`` is best-effort: failures are
    logged, never raised.
    """
    history = load_json_list(path)
    history.append(entry)
    atomic_write(path, history[-200:])
    try:
        from sync_hub import push_file
    except ImportError:
        return  # sync_hub is optional; the local history is already saved
    try:
        push_file(path)
    except Exception as exc:  # best-effort: a sync failure must not fail the run
        log.warning(f"Could not sync {path.name}: {exc}")


def save_topic(topic):
    """Record *topic*, stamped with the current local time, in the topic history."""
    _append_history(TOPIC_HISTORY_FILE,
                    {"topic": topic, "date": datetime.now().isoformat()})


def save_fact(fact, topic=""):
    """Record *fact* and its *topic*, stamped with local time, in the fact history."""
    _append_history(FACT_HISTORY_FILE,
                    {"fact": fact, "topic": topic, "date": datetime.now().isoformat()})
def load_state(run: Path) -> str:
    """Return the stage recorded in ``run/state.txt``, or "CONTENT" if none is recorded."""
    state_file = run / "state.txt"
    if not state_file.exists():
        return "CONTENT"
    return state_file.read_text().strip()


def save_state(run: Path, state: str):
    """Record *state* as the current stage of *run* and log the transition."""
    state_file = run / "state.txt"
    state_file.write_text(state)
    log.info(f"State -> {state}")
def save_error_log(run: Path, exc: Exception, stage: str):
    """Write a timestamped traceback of *exc* raised during *stage* to ``run/error.log``.

    The traceback is rendered from *exc* and its ``__traceback__``, so this works
    even when called outside the ``except`` block that caught it.
    (``traceback.format_exc()`` would then report only "NoneType: None".)
    Any existing error.log is overwritten.
    """
    trace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    (run / "error.log").write_text(
        f"[{datetime.now().isoformat()}] Stage: {stage}\n{trace}")
def atomic_write(path: Path, data):
    """Replace *path* with *data* without exposing a partially written file.

    dict/list values are serialized as indented JSON with non-ASCII characters
    kept as-is. Anything else is written as ``str(data)``.

    The text goes to a sibling temp file, which is then moved over *path* with
    ``Path.replace``. That is an atomic rename on POSIX when both paths share a
    directory. If writing fails, the temp file is removed and the error is
    re-raised.
    """
    # Append ".tmp" to the whole name rather than using with_suffix(): otherwise
    # "x.json" and "x.txt" share one temp file, and a path already ending in
    # ".tmp" would be its own temp file.
    tmp = path.with_name(path.name + ".tmp")
    if isinstance(data, (dict, list)):
        text = json.dumps(data, ensure_ascii=False, indent=2)
    else:
        text = str(data)
    try:
        tmp.write_text(text)
        tmp.replace(path)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise
def push_kaggle_kernel(build_dir: Path, main_py_text: str, label: str):
    """Push *main_py_text* as a private GPU script kernel, trying each Kaggle account.

    Accounts from KAGGLE_ACCOUNTS are tried in random order. For each attempt
    the account's credentials are installed, a fresh kernel-metadata.json is
    written, and ``kaggle kernels push`` runs.

    On the first success, the account name and kernel slug are written to
    ``build_dir/kaggle_account.json``. That account's credentials remain the
    active ones.

    Raises:
        RuntimeError: if the push fails for every account, or none are configured.
    """
    (build_dir / "main.py").write_text(main_py_text)
    suffix = build_dir.parent.name.replace("_", "")[-10:]
    tag = label[:5].replace(" ", "")  # loop-invariant part of the slug
    accounts = KAGGLE_ACCOUNTS.copy()
    random.shuffle(accounts)
    for acc in accounts:
        # The random tail makes each push's slug (very likely) distinct.
        slug = f"{acc['username']}/yt-{tag}-{suffix}-{random.randint(1000, 9999)}"
        meta = {
            "id": slug,
            "title": f"YT {label} {suffix}",
            "code_file": "main.py",
            "language": "python",
            "kernel_type": "script",
            "enable_gpu": "true",
            "enable_internet": "true",
            # Must stay private: callers render secrets such as HF_TOKEN directly
            # into main.py, and a public kernel publishes its source.
            "is_private": "true",
        }
        (build_dir / "kernel-metadata.json").write_text(json.dumps(meta, indent=2))
        set_kaggle_creds(acc)
        r = subprocess.run(["kaggle", "kernels", "push", "-p", str(build_dir)],
                           capture_output=True, text=True)
        # NOTE(review): some kaggle CLI versions print "Kernel push error: ..."
        # but still exit 0, so treat that output as a failure as well.
        if r.returncode == 0 and "push error" not in r.stdout.lower():
            (build_dir / "kaggle_account.json").write_text(
                json.dumps({"username": acc["username"], "slug": slug}))
            log.info(f"Pushed {label} ({acc['username']}): {slug}")
            return
        detail = (r.stderr.strip() or r.stdout.strip())[:200]
        log.error(f"Push failed ({acc['username']}): {detail}")
    raise RuntimeError(f"{label} kernel push failed on all accounts")
def wait_kaggle(slug: str, timeout: int = 3600, poll_interval: int = 30):
    """Poll ``kaggle kernels status`` for *slug* until the kernel finishes.

    Returns once the lower-cased status output contains "complete".

    Each status call is itself capped at 120s. A call that times out or exits
    non-zero is logged and simply retried on the next poll.

    Raises:
        RuntimeError: if the status mentions an error, failure or cancellation,
            or if *timeout* seconds elapse without completion.
    """
    start = time.time()
    while time.time() - start < timeout:
        elapsed = int(time.time() - start)
        try:
            r = subprocess.run(["kaggle", "kernels", "status", slug],
                               capture_output=True, text=True, timeout=120)
        except subprocess.TimeoutExpired:
            log.warning(" kaggle status call timed out; retrying")
            time.sleep(poll_interval)
            continue
        if r.returncode != 0 and r.stderr.strip():
            log.warning(f" kaggle status exited {r.returncode}: {r.stderr.strip()[:200]}")
        status = r.stdout.strip().lower()
        status_lines = [line for line in status.split("\n") if "has status" in line]
        cur = status_lines[0] if status_lines else status[:120]
        log.info(f" {elapsed//60:02d}:{elapsed%60:02d} | {cur}")
        if "complete" in status:
            log.info(f"Kernel done ({elapsed//60}m {elapsed%60}s)")
            return
        # "cancel" matches cancel-requested/acknowledged states, which would
        # otherwise never complete and just burn the whole timeout.
        if any(word in status for word in ("error", "failed", "cancel")):
            raise RuntimeError(f"Kernel failed: {status}")
        time.sleep(poll_interval)
    raise RuntimeError(f"Kernel timed out after {timeout//60}min")
def step_content(run: Path):
    """Stage CONTENT: generate the video's text content on a Kaggle LLM kernel.

    Steps:
    1. Render KAGGLE_LLM_TEMPLATE with recent topic/fact history, NUM_FRAMES
       and HF_TOKEN.
    2. Push and run the kernel, then download its output into *run*.
    3. Validate ``content.json`` and record its topic and fact in the history
       files.
    4. Advance the state to FRAMES.

    Raises:
        RuntimeError: if the kernel fails, or content.json is missing,
            malformed or lacks required fields / enough prompts.
    """
    log.info("=" * 56)
    log.info("STEP 1: CONTENT (Kaggle DeepSeek-R1-32B from Dataset)")
    log.info("=" * 56)
    build_dir = run / "kaggle_llm_build"
    build_dir.mkdir(exist_ok=True)

    # Recent history is rendered into the prompt template (presumably so the
    # model avoids repeating topics/facts).
    used_topics = [e["topic"] for e in get_used_topics()[-60:]
                   if isinstance(e, dict) and e.get("topic")]
    used_facts = get_used_facts()[-20:]
    tmpl = KAGGLE_LLM_TEMPLATE.read_text()
    tmpl = tmpl.replace("TOPICS_PLACEHOLDER", json.dumps(used_topics, indent=2))
    tmpl = tmpl.replace("FACTS_PLACEHOLDER", json.dumps(used_facts, indent=2))
    tmpl = tmpl.replace("FRAMES_PLACEHOLDER", str(NUM_FRAMES))
    tmpl = tmpl.replace("HFTOKEN_PLACEHOLDER", json.dumps(HF_TOKEN))
    push_kaggle_kernel(build_dir, tmpl, "llm")

    info = json.loads((build_dir / "kaggle_account.json").read_text())
    # Re-select the pushing account for the status/output calls below
    # (push_kaggle_kernel normally leaves its credentials active already).
    owner = next((a for a in KAGGLE_ACCOUNTS if a["username"] == info["username"]), None)
    if owner is not None:
        set_kaggle_creds(owner)
    log.info(f"Waiting for LLM kernel {info['slug']} ...")
    log.info(" Sleeping 60s for kernel to register...")
    time.sleep(60)
    log.info(" (Dataset mount ~0min + inference ~3-5min)")
    wait_kaggle(info["slug"])

    log.info("Downloading content.json ...")
    subprocess.run(["kaggle", "kernels", "output", info["slug"], "-p", str(run)],
                   check=True, capture_output=True)
    candidates = list(run.glob("**/content.json"))
    if not candidates:
        raise RuntimeError("content.json not found in kernel output")
    src = candidates[0]
    dst = run / "content.json"
    if src != dst:
        shutil.move(str(src), str(dst))

    data = json.loads(dst.read_text())
    if not isinstance(data, dict):
        raise RuntimeError("content.json is not a JSON object")
    required = ("topic", "fact", "hook", "title", "script", "prompts", "tags")
    missing = [k for k in required if not data.get(k)]
    if missing:
        raise RuntimeError(f"content.json missing: {missing}")
    if len(data["prompts"]) < NUM_FRAMES:
        raise RuntimeError(f"Only {len(data['prompts'])}/{NUM_FRAMES} prompts")

    save_topic(data["topic"])
    save_fact(data["fact"], topic=data["topic"])
    # script_char_count is not among the validated keys. Fall back to
    # measuring the script so a missing field cannot raise KeyError here, after
    # the history has already been written.
    char_count = data.get("script_char_count", len(data["script"]))
    log.info(f"Content OK: topic={data['topic']} | {char_count}c | {len(data['prompts'])} prompts")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "FRAMES")
def step_frames(run: Path):
    """Stage FRAMES: render one image per prompt on a Kaggle kernel.

    Steps:
    1. Fill KAGGLE_TEMPLATE with the prompts from content.json and HF_TOKEN.
    2. Run the kernel and download its output into *run*.
    3. Require at least NUM_FRAMES ``frame_*.png`` files and move them to the
       top of *run*.
    4. Advance the state to TTS.
    """
    banner = "=" * 56
    log.info(banner)
    log.info("STEP 2: IMAGES (Kaggle FLUX.1-schnell)")
    log.info(banner)

    build_dir = run / "kaggle_frames_build"
    if not build_dir.exists():
        build_dir.mkdir()

    content = json.loads((run / "content.json").read_text())
    replacements = {
        "{{PROMPTS_PLACEHOLDER}}": json.dumps(content["prompts"], indent=2),
        "{{HF_TOKEN_PLACEHOLDER}}": json.dumps(HF_TOKEN),
    }
    source = KAGGLE_TEMPLATE.read_text()
    for placeholder, value in replacements.items():
        source = source.replace(placeholder, value)
    push_kaggle_kernel(build_dir, source, "frames")

    pushed = json.loads((build_dir / "kaggle_account.json").read_text())
    owner = next((a for a in KAGGLE_ACCOUNTS if a["username"] == pushed["username"]), None)
    if owner is not None:
        set_kaggle_creds(owner)
    wait_kaggle(pushed["slug"])
    subprocess.run(["kaggle", "kernels", "output", pushed["slug"], "-p", str(run)],
                   check=True, capture_output=True)

    images = sorted(run.glob("**/frame_*.png"))
    if len(images) < NUM_FRAMES:
        raise RuntimeError(f"Only {len(images)}/{NUM_FRAMES} frames")
    for image in images:
        if image.parent != run:
            shutil.move(str(image), str(run / image.name))
    log.info(f"Downloaded {len(images)} frames")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "TTS")
def step_tts(run: Path):
    """Stage TTS: synthesize narration and word timings on a Kaggle kernel.

    Steps:
    1. Fill KAGGLE_TTS_TEMPLATE with the script from content.json and
       VIDEO_SPEED.
    2. Run the kernel and download its output into *run*.
    3. Move ``voice.wav`` and ``word_timings.json`` to the top of *run*.
    4. Advance the state to AUDIO.
    """
    banner = "=" * 56
    log.info(banner)
    log.info("STEP 3: TTS (Kaggle StyleTTS2 + Whisper)")
    log.info(banner)

    build_dir = run / "kaggle_tts_build"
    if not build_dir.exists():
        build_dir.mkdir()

    content = json.loads((run / "content.json").read_text())
    source = (KAGGLE_TTS_TEMPLATE.read_text()
              .replace("{{SCRIPT_PLACEHOLDER}}", json.dumps(content["script"]))
              .replace("{{VIDEO_SPEED_PLACEHOLDER}}", str(VIDEO_SPEED)))
    push_kaggle_kernel(build_dir, source, "tts")

    pushed = json.loads((build_dir / "kaggle_account.json").read_text())
    owner = next((a for a in KAGGLE_ACCOUNTS if a["username"] == pushed["username"]), None)
    if owner is not None:
        set_kaggle_creds(owner)
    wait_kaggle(pushed["slug"])
    subprocess.run(["kaggle", "kernels", "output", pushed["slug"], "-p", str(run)],
                   check=True, capture_output=True)

    voice = next(run.glob("**/voice.wav"), None)
    timings = next(run.glob("**/word_timings.json"), None)
    if voice is None:
        raise RuntimeError("voice.wav not found in TTS output")
    if timings is None:
        raise RuntimeError("word_timings.json not found in TTS output")
    if voice.parent != run:
        shutil.move(str(voice), str(run / "voice.wav"))
    if timings.parent != run:
        shutil.move(str(timings), str(run / "word_timings.json"))
    log.info("TTS done: voice.wav + word_timings.json")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "AUDIO")
def step_audio(run: Path):
    """Stage AUDIO: speed up the narration and rescale the word timings to match.

    Writes ``voice_fast.wav`` (ffmpeg atempo=VIDEO_SPEED) into *run*. Also
    writes ``adjusted_timings.json`` (start/end divided by VIDEO_SPEED) and
    ``audio_duration.txt`` (ffprobe duration of the sped-up file). Then
    advances the state to VIDEO.
    """
    log.info("STEP 4: AUDIO PROCESSING")
    sped_up = run / "voice_fast.wav"
    tempo_cmd = ["ffmpeg", "-y", "-i", str(run / "voice.wav"),
                 "-filter:a", f"atempo={VIDEO_SPEED}", str(sped_up)]
    proc = subprocess.run(tempo_cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"ffmpeg speed failed: {proc.stderr[:300]}")

    probe_cmd = ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
                 "-of", "csv=p=0", str(sped_up)]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    audio_dur = float(probe.stdout.strip())

    def rescale(entry):
        return {"word": entry["word"],
                "start": entry["start"] / VIDEO_SPEED,
                "end": entry["end"] / VIDEO_SPEED}

    raw_timings = json.loads((run / "word_timings.json").read_text())
    adjusted = list(map(rescale, raw_timings))
    atomic_write(run / "adjusted_timings.json", adjusted)
    atomic_write(run / "audio_duration.txt", str(audio_dur))
    log.info(f"Audio: {audio_dur:.2f}s at {VIDEO_SPEED}x")
    save_state(run, "VIDEO")
def build_ass(timings, out_path: Path, W=1080, H=1920, words_per_line=5):
    """Write an ASS subtitle file that shows *timings* in groups of words.

    Args:
        timings: list of dicts with "word", "start" and "end" (seconds).
        out_path: destination ``.ass`` file, written as UTF-8.
        W, H: script resolution (PlayResX / PlayResY).
        words_per_line: number of consecutive words per Dialogue event. Each
            event runs from its first word's start to its last word's end.
    """
    def ts(seconds):
        # ASS timestamps are H:MM:SS.cc. Round to whole centiseconds *before*
        # splitting into fields so 59.996s becomes 0:01:00.00 rather than the
        # invalid 0:00:60.00. Negative times are clamped to zero.
        cs = int(round(max(seconds, 0) * 100))
        h, cs = divmod(cs, 360000)
        m, cs = divmod(cs, 6000)
        s, cs = divmod(cs, 100)
        return f"{h}:{m:02d}:{s:02d}.{cs:02d}"

    header = (
        "[Script Info]\nScriptType: v4.00+\n"
        f"PlayResX: {W}\nPlayResY: {H}\nScaledBorderAndShadow: yes\n\n"
        "[V4+ Styles]\nFormat: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, "
        "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, "
        "Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\n"
        "Style: Default,Arial,72,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,"
        "-1,0,0,0,100,100,0,0,1,3,2,2,50,50,80,1\n\n"
        "[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n")
    lines = [header]
    for i in range(0, len(timings), words_per_line):
        chunk = timings[i:i + words_per_line]  # never empty inside this range
        text = " ".join(w["word"] for w in chunk)
        lines.append(f"Dialogue: 0,{ts(chunk[0]['start'])},{ts(chunk[-1]['end'])},Default,,0,0,0,,{text}\n")
    out_path.write_text("".join(lines), encoding="utf-8")
def step_video(run: Path):
    """Stage VIDEO: assemble frames, narration and subtitles into ``final.mp4``.

    Each ``frame_*.png`` in *run* is shown, in numeric order, for an equal share
    of the narration length read from audio_duration.txt. Frames are scaled and
    cropped to 1080x1920 at 30 fps.

    Word-timed subtitles from adjusted_timings.json are burned in, and
    voice_fast.wav is muxed as 192k AAC. Advances the state to UPLOAD.

    Raises:
        RuntimeError: if there are no frames or either ffmpeg pass fails.
    """
    log.info("STEP 5: VIDEO ASSEMBLY")
    adj = json.loads((run / "adjusted_timings.json").read_text())
    dur = float((run / "audio_duration.txt").read_text().strip())

    def frame_order(path):
        # Numeric order, so frame_10 follows frame_9 even without zero padding
        # (plain sorting would place it right after frame_1).
        match = re.search(r"\d+", path.stem)
        return (int(match.group()) if match else -1, path.name)

    frames = sorted(run.glob("frame_*.png"), key=frame_order)
    if not frames:
        raise RuntimeError("No frames found")
    W, H = 1080, 1920
    frame_dur = dur / len(frames)

    def concat_line(path):
        # concat-demuxer quoting: a literal ' becomes '\'' (close, escape, reopen).
        quoted = str(path.resolve()).replace("'", "'\\''")
        return f"file '{quoted}'\n"

    concat_file = run / "frames.txt"
    with open(concat_file, "w", encoding="utf-8") as f:
        for fr in frames:
            f.write(concat_line(fr))
            f.write(f"duration {frame_dur:.6f}\n")
        # Repeating the last file is the usual workaround so the concat demuxer
        # honours the final frame's duration.
        f.write(concat_line(frames[-1]))

    slides = run / "slides.mp4"
    r = subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file),
        "-vf", f"scale={W}:{H}:force_original_aspect_ratio=increase,crop={W}:{H}",
        "-c:v", "libx264", "-preset", "fast", "-pix_fmt", "yuv420p", "-r", "30", str(slides)
    ], capture_output=True, text=True)
    if r.returncode != 0:
        raise RuntimeError(f"Slides failed: {r.stderr[:400]}")

    ass = run / "subtitles.ass"
    build_ass(adj, ass, W, H)
    final = run / "final.mp4"
    # The ass filter argument is parsed by ffmpeg's filtergraph syntax, where
    # ':', ',', '\\' and quotes are special. Run inside *run* and pass the bare
    # file name so an unusual directory path cannot break it; the other paths
    # are resolved to absolute so the cwd change does not affect them.
    r = subprocess.run([
        "ffmpeg", "-y", "-i", str(slides.resolve()), "-i", str((run / "voice_fast.wav").resolve()),
        "-vf", f"ass={ass.name}",
        "-c:v", "libx264", "-preset", "fast", "-c:a", "aac", "-b:a", "192k",
        "-shortest", str(final.resolve())
    ], capture_output=True, text=True, cwd=str(run))
    if r.returncode != 0:
        raise RuntimeError(f"Final video failed: {r.stderr[:400]}")
    log.info(f"Final video: {final.stat().st_size / 1024 / 1024:.1f} MB")
    save_state(run, "UPLOAD")
def step_upload(run: Path):
    """Stage UPLOAD: publish ``final.mp4`` from *run* as a public YouTube video.

    Title, description and tags come from ``content.json``. OAuth credentials
    are loaded from TOKEN_FILE after a best-effort ``sync_hub.pull_state``. An
    expired token with a refresh token is refreshed, saved and (best-effort)
    pushed back via ``sync_hub.push_file``.

    The video id is written to ``youtube_id.txt``, state is pushed via
    ``sync_hub.push_all_state`` (best-effort), and the stage advances to DONE.

    Raises:
        RuntimeError: if final.mp4 is missing or no usable token is available.
    """
    log.info("STEP 6: YOUTUBE UPLOAD")
    data = json.loads((run / "content.json").read_text())
    final = run / "final.mp4"
    if not final.exists():
        raise RuntimeError("final.mp4 not found")

    # Best-effort sync; on failure we fall back to the local token file.
    try:
        from sync_hub import pull_state
        pull_state(BASE_DIR)
    except Exception as exc:
        log.warning(f"pull_state skipped: {exc}")

    creds = None
    if TOKEN_FILE.exists():
        # NOTE(review): pickle.load executes arbitrary code embedded in the file,
        # and this token may have just been pulled from a remote store by
        # pull_state above. Consider storing the token as JSON instead
        # (Credentials.to_json / Credentials.from_authorized_user_file).
        with open(TOKEN_FILE, "rb") as f:
            creds = pickle.load(f)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            from google.auth.transport.requests import Request
            creds.refresh(Request())
            with open(TOKEN_FILE, "wb") as f:
                pickle.dump(creds, f)
            # Share the refreshed token (best-effort).
            try:
                from sync_hub import push_file
                push_file(TOKEN_FILE)
            except Exception as exc:
                log.warning(f"Token sync skipped: {exc}")
        else:
            raise RuntimeError(
                "token.pickle missing or invalid. Run auth_youtube.py on your Mac, "
                "then upload token.pickle to HF Dataset arshitmalik/yt-pipeline-data")

    def fit_tags(tags, limit=500):
        # YouTube rejects a tag list longer than 500 characters. The separating
        # commas count, and a tag containing a space is counted with quotes.
        kept, used = [], 0
        for tag in tags:
            cost = len(tag) + (2 if " " in tag else 0) + (1 if kept else 0)
            if used + cost > limit:
                break
            kept.append(tag)
            used += cost
        return kept

    from googleapiclient.discovery import build
    from googleapiclient.http import MediaFileUpload
    yt = build("youtube", "v3", credentials=creds)
    tags = [str(t) for t in data.get("tags", [])]
    body = {
        "snippet": {
            "title": data.get("title", "Science Fact")[:100],
            "description": data.get("fact", "") + "\n\n" + "\n".join(tags),
            "tags": fit_tags(tags),
            "categoryId": "28",
            "defaultLanguage": "en"
        },
        "status": {"privacyStatus": "public", "selfDeclaredMadeForKids": False}
    }
    media = MediaFileUpload(str(final), chunksize=4 * 1024 * 1024, resumable=True)
    req = yt.videos().insert(part="snippet,status", body=body, media_body=media)
    log.info("Uploading to YouTube...")
    resp = None
    while resp is None:
        status, resp = req.next_chunk()
        if status:
            log.info(f"Upload progress: {int(status.progress() * 100)}%")
    vid_id = resp.get("id", "unknown")
    log.info(f"Published: https://youtube.com/watch?v={vid_id}")
    atomic_write(run / "youtube_id.txt", vid_id)
    try:
        from sync_hub import push_all_state
        push_all_state(BASE_DIR)
    except Exception as exc:  # the video is already live; never fail the run here
        log.warning(f"push_all_state skipped: {exc}")
    save_state(run, "DONE")
# Dispatch table: stage name (as stored in state.txt) -> handler.
# Each handler records the following stage itself via save_state() on success.
STEPS = dict(
    CONTENT=step_content,
    FRAMES=step_frames,
    TTS=step_tts,
    AUDIO=step_audio,
    VIDEO=step_video,
    UPLOAD=step_upload,
)
def _find_or_create_run():
    """Return the newest run directory not yet DONE/FAILED, or start a new one.

    "Newest" means latest directory mtime. A new run is named after the current
    local time (YYYYmmdd_HHMMSS) and starts at the CONTENT stage.
    """
    if RUNS_DIR.exists():
        for d in sorted(RUNS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
            if d.is_dir() and load_state(d) not in ("DONE", "FAILED"):
                return d
    run = RUNS_DIR / datetime.now().strftime("%Y%m%d_%H%M%S")
    run.mkdir(parents=True)
    save_state(run, "CONTENT")
    return run


def _report_done(run):
    """Log the published video of a finished *run*, then delete the run directory.

    A missing or unreadable content.json / youtube_id.txt only degrades the log
    line; it must not take down the scheduler loop.
    """
    try:
        content = json.loads((run / "content.json").read_text())
    except (OSError, ValueError):
        content = {}
    id_file = run / "youtube_id.txt"
    yt_id = id_file.read_text().strip() if id_file.exists() else "?"
    log.info(f"\nCOMPLETE: {content.get('title', '')} — https://youtube.com/watch?v={yt_id}")
    shutil.rmtree(run, ignore_errors=True)


def main():
    """Run the pipeline forever as a polling scheduler.

    Trigger files are checked on every pass:
    - TRIGGER_SKIP_FILE postpones the next run by 24h.
    - TRIGGER_RUN_FILE makes the next run start immediately.
    While waiting, the loop polls every 30s.

    Once the scheduled time has passed, one stage of the newest unfinished run
    is executed per iteration, so stages run back to back.
    - A completed run is logged and deleted, and the next run is scheduled 24h
      later.
    - A failed run keeps its directory (with error.log), and a fresh run is
      attempted after 5 minutes.
    Ctrl-C exits with status 0.
    """
    log.info("=" * 56)
    log.info("YouTube Shorts Pipeline — STARTING")
    log.info("DeepSeek-R1-32B (Kaggle) | FLUX (Kaggle) | StyleTTS2 (Kaggle)")
    log.info(f"Frames: {NUM_FRAMES} | Speed: {VIDEO_SPEED}x | Accounts: {len(KAGGLE_ACCOUNTS)}")
    log.info("=" * 56)
    try:
        from sync_hub import pull_state
        pull_state(BASE_DIR)
    except Exception as exc:  # best-effort: start from local state
        log.warning(f"pull_state skipped: {exc}")

    next_run_after = time.time() + 15
    try:
        while True:
            if TRIGGER_SKIP_FILE.exists():
                TRIGGER_SKIP_FILE.unlink(missing_ok=True)
                next_run_after = time.time() + 86400
                log.info("Skipped. Next run in 24h.")
                time.sleep(60)
                continue
            if TRIGGER_RUN_FILE.exists():
                TRIGGER_RUN_FILE.unlink(missing_ok=True)
                next_run_after = 0
                log.info("Triggered immediate run!")
            if time.time() < next_run_after:
                left = next_run_after - time.time()
                if int(left) % 300 < 30:  # log roughly every 5 minutes
                    log.info(f"Next run in {left/3600:.1f}h ...")
                time.sleep(30)
                continue

            run = _find_or_create_run()
            state = load_state(run)
            log.info(f"\nRun: {run.name} Stage: {state}")
            try:
                if state in STEPS:
                    STEPS[state](run)
                else:
                    log.error(f"Unknown stage {state!r} in {run}")
                    save_state(run, "FAILED")
            except Exception as e:
                log.exception(f"Error in {state}: {e}")
                save_error_log(run, e, state)
                save_state(run, "FAILED")

            cur = load_state(run)
            if cur == "DONE":
                _report_done(run)
                next_run_after = time.time() + 86400
                log.info("Next run in 24h.")
            elif cur == "FAILED":
                log.error(f"FAILED — check: {run}")
                next_run_after = time.time() + 300
                log.info("Retrying in 5 min.")
    except KeyboardInterrupt:
        # Covers Ctrl-C during a step and during the idle sleeps alike.
        log.info("Interrupted")
        sys.exit(0)


if __name__ == "__main__":
    main()