# openclaw / automation.py
# Author: Arshit Malik
# Commit message: fix: kaggle 60s poll delay, is_private string, openclaw password, sync_hub
# Commit: d5d612e
#!/usr/bin/env python3
import os, json, time, subprocess, pickle, sys, shutil, re, random, traceback, logging
from pathlib import Path
from datetime import datetime, timezone
# --- Filesystem layout: everything lives next to this script ---------------
BASE_DIR = Path(__file__).resolve().parent
RUNS_DIR = BASE_DIR / "runs"  # one timestamped sub-directory per pipeline run

# Kaggle kernel templates that the step_* functions fill in and push.
KAGGLE_TEMPLATE = BASE_DIR / "kaggle_template.py"
KAGGLE_TTS_TEMPLATE = BASE_DIR / "kaggle_tts_stt_template.py"
KAGGLE_LLM_TEMPLATE = BASE_DIR / "kaggle_llm_template.py"

# Persistent history, credential and trigger files.
TOPIC_HISTORY_FILE = BASE_DIR / "topic_history.json"
FACT_HISTORY_FILE = BASE_DIR / "fact_history.json"
KAGGLE_KEYS_FILE = BASE_DIR / "kaggle_keys.json"
CLIENT_SECRETS_FILE = BASE_DIR / "client_secrets.json"
TOKEN_FILE = BASE_DIR / "token.pickle"
TRIGGER_RUN_FILE = BASE_DIR / "trigger_run"
TRIGGER_SKIP_FILE = BASE_DIR / "trigger_skip"

# --- Tunables ----------------------------------------------------------------
NUM_FRAMES = 10    # number of image frames per video
VIDEO_SPEED = 1.1  # atempo factor applied to the narration
SCOPES = ["https://www.googleapis.com/auth/youtube.upload"]

RUNS_DIR.mkdir(parents=True, exist_ok=True)

# HF_TOKEN wins when set; otherwise HF_HUB_TOKEN; otherwise an empty string.
_hf_hub_token = os.environ.get("HF_HUB_TOKEN", "")
HF_TOKEN = os.environ.get("HF_TOKEN", _hf_hub_token)
# Everything at INFO and above goes to stdout with a compact time-only prefix.
_LOG_FORMAT = "[%(asctime)s] %(message)s"
_LOG_DATEFMT = "%H:%M:%S"
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATEFMT,
    handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger()  # the root logger, used throughout this module
def load_kaggle_accounts(keys_file=None):
    """Return the Kaggle accounts the pipeline may push kernels with.

    Accounts come from the ``"accounts"`` list in *keys_file* (default
    ``KAGGLE_KEYS_FILE``). Entries that are not dicts with a non-empty
    ``"username"`` and ``"key"`` are dropped, because callers index both keys
    directly. If the file is absent, unreadable, malformed, or yields no usable
    account, fall back to one account built from the ``KAGGLE_USERNAME`` /
    ``KAGGLE_KEY`` environment variables.

    Args:
        keys_file: optional path overriding ``KAGGLE_KEYS_FILE``.

    Returns:
        A list of account dicts; empty when nothing is configured.
    """
    path = KAGGLE_KEYS_FILE if keys_file is None else Path(keys_file)
    accounts = []
    if path.exists():
        try:
            raw = json.loads(path.read_text()).get("accounts", [])
        except (OSError, ValueError, AttributeError) as exc:
            # ValueError: bad JSON / bad encoding; AttributeError: top-level
            # JSON value is not an object.
            log.warning(f"Ignoring unreadable {path.name}: {exc}")
            raw = []
        if isinstance(raw, list):
            accounts = [a for a in raw
                        if isinstance(a, dict) and a.get("username") and a.get("key")]
    if accounts:
        return accounts
    u = os.environ.get("KAGGLE_USERNAME", "")
    k = os.environ.get("KAGGLE_KEY", "")
    return [{"username": u, "key": k}] if u and k else []
# Resolved once at import time; push_kaggle_kernel() tries these in shuffled order.
KAGGLE_ACCOUNTS = load_kaggle_accounts()
def set_kaggle_creds(account):
    """Make *account* the credentials used by subsequent ``kaggle`` CLI calls.

    Writes ``kaggle.json`` into the Kaggle config directory. That is
    ``$KAGGLE_CONFIG_DIR`` when set, otherwise ``~/.kaggle``. The file is
    owner-only from the start. The function also exports ``KAGGLE_USERNAME`` /
    ``KAGGLE_KEY`` into this process's environment, which child processes
    inherit.

    The export matters because the Kaggle client gives those environment
    variables precedence over ``kaggle.json``. Without it, credentials present
    in the environment at start-up would silently override every account
    switch made here.

    Args:
        account: dict with ``"username"`` and ``"key"`` strings.
    """
    config_dir = Path(os.environ.get("KAGGLE_CONFIG_DIR") or (Path.home() / ".kaggle"))
    config_dir.mkdir(parents=True, exist_ok=True)
    cred_file = config_dir / "kaggle.json"
    # Restrict permissions *before* writing the key, so the secret is never
    # briefly readable by other users.
    cred_file.touch(mode=0o600, exist_ok=True)
    cred_file.chmod(0o600)
    cred_file.write_text(json.dumps({"username": account["username"], "key": account["key"]}))
    os.environ["KAGGLE_USERNAME"] = account["username"]
    os.environ["KAGGLE_KEY"] = account["key"]
def load_json_list(path):
    """Return the JSON array stored at *path*, or ``[]``.

    The following all yield an empty list, so callers can treat history files
    as optional:

    - a missing file;
    - a file that cannot be read or decoded (logged as a warning);
    - a document whose top-level value is not a list.
    """
    if not path.exists():
        return []
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log.warning(f"Ignoring unreadable JSON list {path}: {exc}")
        return []
    return data if isinstance(data, list) else []
def get_used_topics():
    """Return the recorded topic-history entries from TOPIC_HISTORY_FILE ([] if none)."""
    topics = load_json_list(TOPIC_HISTORY_FILE)
    return topics


def get_used_facts():
    """Return the recorded fact-history entries from FACT_HISTORY_FILE ([] if none)."""
    facts = load_json_list(FACT_HISTORY_FILE)
    return facts
def _append_history(path, entry, keep=200):
    """Append *entry* to the JSON history list at *path*, keep the newest *keep* items, then sync."""
    history = load_json_list(path)
    history.append(entry)
    # Temp-file + rename: a torn write would make load_json_list() return []
    # next time, and the following save would then wipe the whole history.
    atomic_write(path, history[-keep:])
    try:
        from sync_hub import push_file
    except ImportError:
        return  # sync_hub is optional; the history simply stays local
    try:
        push_file(path)
    except Exception as exc:  # remote sync is best-effort and must not fail the run
        log.warning(f"sync_hub push of {path.name} failed: {exc}")


def save_topic(topic):
    """Record *topic* with a timestamp in the topic history and push it via sync_hub."""
    _append_history(TOPIC_HISTORY_FILE, {"topic": topic, "date": datetime.now().isoformat()})


def save_fact(fact, topic=""):
    """Record *fact* (and its *topic*) with a timestamp in the fact history and push it via sync_hub."""
    _append_history(FACT_HISTORY_FILE,
                    {"fact": fact, "topic": topic, "date": datetime.now().isoformat()})
def load_state(run: Path) -> str:
    """Return the stage name stored in ``run/state.txt``, or "CONTENT" when no file exists yet."""
    state_file = run / "state.txt"
    if not state_file.exists():
        return "CONTENT"
    return state_file.read_text().strip()
def save_state(run: Path, state: str):
    """Overwrite ``run/state.txt`` with *state* and log the transition."""
    target = run / "state.txt"
    target.write_text(state)
    log.info(f"State -> {state}")
def save_error_log(run: Path, exc: Exception, stage: str):
    """Write a timestamped report of *exc* raised during *stage* to ``run/error.log``.

    The traceback comes from the exception object itself rather than from
    ``traceback.format_exc()``. It is therefore correct even when this is
    called outside the ``except`` block that caught *exc*; in that case
    format_exc would only report ``NoneType: None``.
    """
    trace = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    (run / "error.log").write_text(
        f"[{datetime.now().isoformat()}] Stage: {stage}\n{trace}", encoding="utf-8")
def atomic_write(path: Path, data):
    """Atomically replace *path* with *data*.

    Dicts and lists are written as pretty-printed JSON, with non-ASCII
    characters kept as-is. Anything else is written as ``str(data)``. The text
    goes to a sibling temp file that is then renamed over *path*, so readers
    never see a half-written file.
    """
    # Append ".tmp" to the full name rather than swapping the suffix, so that
    # e.g. "x.json" and "x.txt" in the same directory never share a temp file.
    tmp = path.with_name(path.name + ".tmp")
    txt = json.dumps(data, ensure_ascii=False, indent=2) if isinstance(data, (dict, list)) else str(data)
    try:
        tmp.write_text(txt)
        tmp.replace(path)
    except BaseException:
        tmp.unlink(missing_ok=True)  # don't leave a stray temp file behind
        raise
def push_kaggle_kernel(build_dir: Path, main_py_text: str, label: str):
    """Push *main_py_text* as a new Kaggle script kernel, trying accounts in random order.

    Writes ``main.py`` and ``kernel-metadata.json`` into *build_dir* and runs
    ``kaggle kernels push`` under each configured account until one succeeds.
    The winning account and kernel slug are then recorded in
    ``build_dir/kaggle_account.json`` for the caller to read back.

    Args:
        build_dir: push folder. Its parent directory's name (the run id) is
            folded into the kernel slug.
        main_py_text: full source of the kernel script.
        label: short step name; up to its first five characters (spaces
            removed) become part of the slug.

    Raises:
        RuntimeError: if no account is configured or every push attempt fails.
    """
    (build_dir / "main.py").write_text(main_py_text)
    suffix = build_dir.parent.name.replace("_", "")[-10:]
    tag = label[:5].replace(" ", "")
    accounts = KAGGLE_ACCOUNTS.copy()
    if not accounts:
        raise RuntimeError(f"{label} kernel push failed: no Kaggle accounts configured")
    random.shuffle(accounts)
    for acc in accounts:
        nonce = random.randint(1000, 9999)
        slug = f"{acc['username']}/yt-{tag}-{suffix}-{nonce}"
        meta = {
            "id": slug,
            # Kaggle derives a kernel's URL slug from its title and warns that a
            # title not resolving to "id" can cause surprising behaviour. Build
            # the title from the same parts so it slugifies to exactly the id
            # that wait_kaggle() will poll.
            "title": f"YT {tag} {suffix} {nonce}",
            "code_file": "main.py",
            "language": "python",
            "kernel_type": "script",
            # NOTE(review): flags are kept as strings, as before; confirm the
            # installed kaggle CLI accepts JSON booleans before changing them.
            "enable_gpu": "true",
            "enable_internet": "true",
            "is_private": "false",
        }
        (build_dir / "kernel-metadata.json").write_text(json.dumps(meta, indent=2))
        set_kaggle_creds(acc)
        r = subprocess.run(["kaggle", "kernels", "push", "-p", str(build_dir)],
                           capture_output=True, text=True)
        output = f"{r.stdout}\n{r.stderr}".strip()
        # The kaggle CLI reports some rejected pushes (quota, validation, ...)
        # as "Kernel push error: ..." on stdout while still exiting with 0, so
        # the exit code alone is not a reliable success signal.
        if r.returncode == 0 and "push error" not in output.lower():
            (build_dir / "kaggle_account.json").write_text(
                json.dumps({"username": acc["username"], "slug": slug}))
            log.info(f"Pushed {label} ({acc['username']}): {slug}")
            return
        log.error(f"Push failed ({acc['username']}): {output[:200]}")
    raise RuntimeError(f"{label} kernel push failed on all accounts")
def wait_kaggle(slug: str, timeout: int = 3600, poll_interval: int = 30):
    """Block until Kaggle kernel *slug* finishes, polling ``kaggle kernels status``.

    Args:
        slug: ``owner/kernel-name`` of the kernel to watch.
        timeout: seconds to wait before giving up.
        poll_interval: seconds to sleep between polls.

    Raises:
        RuntimeError: if the reported status mentions an error or failure, if
            the kernel was cancelled, or if *timeout* expires first.
    """
    start = time.time()
    while time.time() - start < timeout:
        elapsed = int(time.time() - start)
        r = subprocess.run(["kaggle", "kernels", "status", slug],
                           capture_output=True, text=True)
        output = r.stdout.strip().lower()
        status_line = next((ln for ln in output.split("\n") if "has status" in ln), "")
        # Classify only the text after "has status", so words in the slug or
        # username, or other CLI chatter, cannot be mistaken for a status.
        # Without a status line, fall back to the whole output, as before.
        status = status_line.split("has status", 1)[1].strip() if status_line else output
        # With no stdout at all, show stderr so the reason is visible in the log.
        shown = status_line or output or r.stderr.strip().lower() or "(no status output)"
        log.info(f" {elapsed//60:02d}:{elapsed%60:02d} | {shown[:120]}")
        if "complete" in status:
            log.info(f"Kernel done ({elapsed//60}m {elapsed%60}s)")
            return
        if "error" in status or "failed" in status:
            raise RuntimeError(f"Kernel failed: {status}")
        # Cancel states (cancel requested / acknowledged) never reach
        # "complete"; without this check they would be polled until timeout.
        if "cancel" in status:
            raise RuntimeError(f"Kernel cancelled: {status}")
        time.sleep(poll_interval)
    raise RuntimeError(f"Kernel timed out after {timeout//60}min")
def step_content(run: Path):
    """Pipeline step 1: generate the video's content on a Kaggle LLM kernel.

    1. Fill the LLM template with the recent topic and fact history,
       NUM_FRAMES and the HF token. The history is presumably passed so the
       model avoids repeats; confirm against the template.
    2. Push the kernel, wait for it, and download ``content.json`` into *run*.
    3. Validate the required fields.
    4. Record the new topic and fact in the history files and advance the run
       to FRAMES.

    Raises:
        RuntimeError: if the kernel output lacks ``content.json`` or a required
            field, or has too few image prompts. Push and wait failures also
            propagate.
    """
    log.info("=" * 56)
    log.info("STEP 1: CONTENT (Kaggle DeepSeek-R1-32B from Dataset)")
    log.info("=" * 56)
    build_dir = run / "kaggle_llm_build"
    if not build_dir.exists():
        build_dir.mkdir()
    # Skip malformed (non-dict) history entries instead of crashing on .get().
    used_topics = [e.get("topic", "") for e in get_used_topics()[-60:]
                   if isinstance(e, dict) and e.get("topic")]
    used_facts = get_used_facts()[-20:]
    tmpl = KAGGLE_LLM_TEMPLATE.read_text()
    tmpl = tmpl.replace("TOPICS_PLACEHOLDER", json.dumps(used_topics, indent=2))
    tmpl = tmpl.replace("FACTS_PLACEHOLDER", json.dumps(used_facts, indent=2))
    tmpl = tmpl.replace("FRAMES_PLACEHOLDER", str(NUM_FRAMES))
    tmpl = tmpl.replace("HFTOKEN_PLACEHOLDER", json.dumps(HF_TOKEN))
    push_kaggle_kernel(build_dir, tmpl, "llm")

    info = json.loads((build_dir / "kaggle_account.json").read_text())
    # Query and download with the same account that owns the kernel.
    for acc in KAGGLE_ACCOUNTS:
        if acc["username"] == info["username"]:
            set_kaggle_creds(acc)
            break
    log.info(f"Waiting for LLM kernel {info['slug']} ...")
    log.info(" Sleeping 60s for kernel to register...")
    time.sleep(60)
    log.info(" (Dataset mount ~0min + inference ~3-5min)")
    wait_kaggle(info["slug"])

    log.info("Downloading content.json ...")
    subprocess.run(["kaggle", "kernels", "output", info["slug"], "-p", str(run)],
                   check=True, capture_output=True)
    # Deterministic pick: prefer the shallowest match (glob order is unspecified).
    candidates = sorted(run.glob("**/content.json"), key=lambda p: len(p.parts))
    if not candidates:
        raise RuntimeError("content.json not found in kernel output")
    src = candidates[0]
    dst = run / "content.json"
    if src != dst:
        shutil.move(str(src), str(dst))

    data = json.loads(dst.read_text())
    if not isinstance(data, dict):
        raise RuntimeError(f"content.json must be an object, got {type(data).__name__}")
    missing = [k for k in ("topic", "fact", "hook", "title", "script", "prompts", "tags")
               if not data.get(k)]
    if missing:
        raise RuntimeError(f"content.json missing: {missing}")
    if not isinstance(data["prompts"], list):
        raise RuntimeError(f"content.json prompts must be a list, got {type(data['prompts']).__name__}")
    if len(data["prompts"]) < NUM_FRAMES:
        raise RuntimeError(f"Only {len(data['prompts'])}/{NUM_FRAMES} prompts")

    save_topic(data["topic"])
    save_fact(data["fact"], topic=data["topic"])
    # script_char_count is informational and not a required key. Indexing it
    # directly used to raise KeyError *after* the history was saved, failing an
    # otherwise valid run.
    char_count = data.get("script_char_count", len(data["script"]))
    log.info(f"Content OK: topic={data['topic']} | {char_count}c | {len(data['prompts'])} prompts")
    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "FRAMES")
def step_frames(run: Path):
    """Pipeline step 2: render the image frames on a Kaggle FLUX kernel.

    Fills the frames template with the prompts from ``content.json`` and the HF
    token, pushes it, and waits for completion under the owning account. It
    then downloads the output into *run* and moves every ``frame_*.png`` into
    the run root. Advances the run to TTS.

    Raises:
        RuntimeError: if fewer than NUM_FRAMES frames are produced. Push and
            wait failures also propagate.
    """
    banner = "=" * 56
    for line in (banner, "STEP 2: IMAGES (Kaggle FLUX.1-schnell)", banner):
        log.info(line)

    build_dir = run / "kaggle_frames_build"
    if not build_dir.exists():
        build_dir.mkdir()

    content = json.loads((run / "content.json").read_text())
    kernel_src = (
        KAGGLE_TEMPLATE.read_text()
        .replace("{{PROMPTS_PLACEHOLDER}}", json.dumps(content["prompts"], indent=2))
        .replace("{{HF_TOKEN_PLACEHOLDER}}", json.dumps(HF_TOKEN))
    )
    push_kaggle_kernel(build_dir, kernel_src, "frames")

    pushed = json.loads((build_dir / "kaggle_account.json").read_text())
    owner = next((a for a in KAGGLE_ACCOUNTS if a["username"] == pushed["username"]), None)
    if owner is not None:
        set_kaggle_creds(owner)
    wait_kaggle(pushed["slug"])

    subprocess.run(["kaggle", "kernels", "output", pushed["slug"], "-p", str(run)],
                   check=True, capture_output=True)
    found = sorted(run.glob("**/frame_*.png"))
    if len(found) < NUM_FRAMES:
        raise RuntimeError(f"Only {len(found)}/{NUM_FRAMES} frames")
    for png in found:
        if png.parent != run:
            shutil.move(str(png), str(run / png.name))
    log.info(f"Downloaded {len(found)} frames")

    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "TTS")
def step_tts(run: Path):
    """Pipeline step 3: synthesise narration and word timings on a Kaggle kernel.

    Fills the TTS template with the script from ``content.json`` and
    VIDEO_SPEED, pushes it, and waits for completion under the owning account.
    It then downloads the output and moves ``voice.wav`` and
    ``word_timings.json`` into the run root. Advances the run to AUDIO.

    Raises:
        RuntimeError: if either output file is missing. Push and wait failures
            also propagate.
    """
    separator = "=" * 56
    log.info(separator)
    log.info("STEP 3: TTS (Kaggle StyleTTS2 + Whisper)")
    log.info(separator)

    build_dir = run / "kaggle_tts_build"
    if not build_dir.exists():
        build_dir.mkdir()

    content = json.loads((run / "content.json").read_text())
    template = KAGGLE_TTS_TEMPLATE.read_text()
    kernel_src = (template
                  .replace("{{SCRIPT_PLACEHOLDER}}", json.dumps(content["script"]))
                  .replace("{{VIDEO_SPEED_PLACEHOLDER}}", str(VIDEO_SPEED)))
    push_kaggle_kernel(build_dir, kernel_src, "tts")

    pushed = json.loads((build_dir / "kaggle_account.json").read_text())
    owner = next((a for a in KAGGLE_ACCOUNTS if a["username"] == pushed["username"]), None)
    if owner is not None:
        set_kaggle_creds(owner)
    wait_kaggle(pushed["slug"])

    subprocess.run(["kaggle", "kernels", "output", pushed["slug"], "-p", str(run)],
                   check=True, capture_output=True)

    def first_match(pattern):
        return next(iter(run.glob(pattern)), None)

    voice = first_match("**/voice.wav")
    timings = first_match("**/word_timings.json")
    if voice is None:
        raise RuntimeError("voice.wav not found in TTS output")
    if timings is None:
        raise RuntimeError("word_timings.json not found in TTS output")
    for found, name in ((voice, "voice.wav"), (timings, "word_timings.json")):
        if found.parent != run:
            shutil.move(str(found), str(run / name))
    log.info("TTS done: voice.wav + word_timings.json")

    shutil.rmtree(build_dir, ignore_errors=True)
    save_state(run, "AUDIO")
def step_audio(run: Path):
    """Pipeline step 4: speed up the narration and rescale the word timings to match.

    Applies ffmpeg's ``atempo=VIDEO_SPEED`` to ``voice.wav`` and measures the
    result with ffprobe. Divides every word's start/end by VIDEO_SPEED and
    writes ``voice_fast.wav``, ``adjusted_timings.json`` and
    ``audio_duration.txt``. Advances the run to VIDEO.

    Raises:
        RuntimeError: if the ffmpeg speed-up fails.
        subprocess.CalledProcessError: if ffprobe fails.
    """
    log.info("STEP 4: AUDIO PROCESSING")
    source = run / "voice.wav"
    sped_up = run / "voice_fast.wav"

    speed_cmd = ["ffmpeg", "-y", "-i", str(source),
                 "-filter:a", f"atempo={VIDEO_SPEED}", str(sped_up)]
    result = subprocess.run(speed_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg speed failed: {result.stderr[:300]}")

    probe_cmd = ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
                 "-of", "csv=p=0", str(sped_up)]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    duration = float(probe.stdout.strip())

    scaled = []
    for entry in json.loads((run / "word_timings.json").read_text()):
        scaled.append({
            "word": entry["word"],
            "start": entry["start"] / VIDEO_SPEED,
            "end": entry["end"] / VIDEO_SPEED,
        })
    atomic_write(run / "adjusted_timings.json", scaled)
    atomic_write(run / "audio_duration.txt", str(duration))
    log.info(f"Audio: {duration:.2f}s at {VIDEO_SPEED}x")
    save_state(run, "VIDEO")
def build_ass(timings, out_path: Path, W=1080, H=1920, words_per_line=5):
    """Write an ASS subtitle file that shows *timings* as consecutive word groups.

    Args:
        timings: sequence of dicts with ``"word"``, ``"start"`` and ``"end"``
            (seconds).
        out_path: destination ``.ass`` file, written as UTF-8.
        W, H: script resolution (PlayResX / PlayResY).
        words_per_line: words per subtitle event (values below 1 are treated as 1).
    """
    def ts(s):
        # ASS timestamps are H:MM:SS.cc. Round to whole centiseconds *before*
        # splitting into fields, so 59.996 becomes 0:01:00.00 and not the
        # invalid 0:00:60.00.
        total_cs = int(round(max(s, 0) * 100))
        h, rem = divmod(total_cs, 360000)
        m, rem = divmod(rem, 6000)
        sec, cs = divmod(rem, 100)
        return f"{h}:{m:02d}:{sec:02d}.{cs:02d}"

    header = (
        "[Script Info]\nScriptType: v4.00+\n"
        f"PlayResX: {W}\nPlayResY: {H}\nScaledBorderAndShadow: yes\n\n"
        "[V4+ Styles]\nFormat: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, "
        "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, "
        "Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\n"
        "Style: Default,Arial,72,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,"
        "-1,0,0,0,100,100,0,0,1,3,2,2,50,50,80,1\n\n"
        "[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n")
    step = max(1, int(words_per_line))
    lines = [header]
    for i in range(0, len(timings), step):
        chunk = timings[i:i + step]
        # Normalise whitespace: a newline inside a word would split the Dialogue
        # line and corrupt the file, and leading spaces would double the gaps.
        text = " ".join(tok for w in chunk for tok in str(w["word"]).split())
        lines.append(f"Dialogue: 0,{ts(chunk[0]['start'])},{ts(chunk[-1]['end'])},Default,,0,0,0,,{text}\n")
    out_path.write_text("".join(lines), encoding="utf-8")
def step_video(run: Path):
    """Pipeline step 5: assemble ``final.mp4``.

    Shows every ``frame_*.png`` for an equal share of the narration length
    (from ``audio_duration.txt``) and scales/crops the slideshow to 1080x1920
    at 30 fps. It then muxes in ``voice_fast.wav`` and burns in subtitles built
    from ``adjusted_timings.json``. Advances the run to UPLOAD.

    Raises:
        RuntimeError: if there are no frames or either ffmpeg pass fails.
    """
    log.info("STEP 5: VIDEO ASSEMBLY")
    # Absolute paths throughout: the final ffmpeg pass runs with cwd=run.
    run = Path(run).resolve()
    adj = json.loads((run / "adjusted_timings.json").read_text())
    dur = float((run / "audio_duration.txt").read_text().strip())

    def frame_key(path):
        # Natural order, so frame_10.png follows frame_9.png even without
        # zero-padding (plain sorted() would place it right after frame_1.png).
        return [int(part) if part.isdigit() else part for part in re.split(r"(\d+)", path.name)]

    frames = sorted(run.glob("frame_*.png"), key=frame_key)
    if not frames:
        raise RuntimeError("No frames found")
    W, H = 1080, 1920
    frame_dur = dur / len(frames)

    def concat_entry(path):
        # concat-demuxer quoting: wrap in single quotes; a literal ' becomes '\''.
        return "'" + str(path).replace("'", "'\\''") + "'"

    concat_file = run / "frames.txt"
    with open(concat_file, "w") as f:
        for fr in frames:
            f.write(f"file {concat_entry(fr)}\nduration {frame_dur:.6f}\n")
        # Repeat the last file without a duration: the concat demuxer otherwise
        # does not honour the final entry's duration.
        f.write(f"file {concat_entry(frames[-1])}\n")

    slides = run / "slides.mp4"
    r = subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file),
        "-vf", f"scale={W}:{H}:force_original_aspect_ratio=increase,crop={W}:{H}",
        "-c:v", "libx264", "-preset", "fast", "-pix_fmt", "yuv420p", "-r", "30", str(slides)
    ], capture_output=True, text=True)
    if r.returncode != 0:
        raise RuntimeError(f"Slides failed: {r.stderr[:400]}")

    ass = run / "subtitles.ass"
    build_ass(adj, ass, W, H)
    final = run / "final.mp4"
    # Pass the subtitle file by bare name from inside the run directory. In a
    # filtergraph, ':' '\\' ',' or quotes in a full path would need extra escaping.
    r = subprocess.run([
        "ffmpeg", "-y", "-i", str(slides), "-i", str(run / "voice_fast.wav"),
        "-vf", f"ass={ass.name}",
        "-c:v", "libx264", "-preset", "fast", "-c:a", "aac", "-b:a", "192k",
        "-shortest", str(final)
    ], capture_output=True, text=True, cwd=str(run))
    if r.returncode != 0:
        raise RuntimeError(f"Final video failed: {r.stderr[:400]}")
    log.info(f"Final video: {final.stat().st_size / 1024 / 1024:.1f} MB")
    save_state(run, "UPLOAD")
def _sync_hub_call(func_name, *args):
    """Best-effort ``sync_hub.<func_name>(*args)``: a missing module is ignored, other failures are logged."""
    try:
        import sync_hub
    except ImportError:
        return
    try:
        getattr(sync_hub, func_name)(*args)
    except Exception as exc:
        log.warning(f"sync_hub.{func_name} failed: {exc}")


def _load_youtube_credentials():
    """Return valid YouTube OAuth credentials from TOKEN_FILE, refreshing them if expired.

    A refreshed token is written back to TOKEN_FILE and pushed via sync_hub.

    Raises:
        RuntimeError: if the token file is missing, or its credentials are
            invalid and cannot be refreshed.
    """
    creds = None
    if TOKEN_FILE.exists():
        # SECURITY NOTE(review): unpickling executes arbitrary code embedded in
        # the file. token.pickle appears to be distributed via the HF dataset
        # named in the error below, so that dataset must be private and
        # trusted. Consider google-auth's JSON round-trip instead
        # (Credentials.to_json / from_authorized_user_info).
        with open(TOKEN_FILE, "rb") as f:
            creds = pickle.load(f)
    if creds and creds.valid:
        return creds
    if not (creds and creds.expired and creds.refresh_token):
        raise RuntimeError(
            "token.pickle missing or invalid. Run auth_youtube.py on your Mac, "
            "then upload token.pickle to HF Dataset arshitmalik/yt-pipeline-data")
    from google.auth.transport.requests import Request
    creds.refresh(Request())
    with open(TOKEN_FILE, "wb") as f:
        pickle.dump(creds, f)
    _sync_hub_call("push_file", TOKEN_FILE)
    return creds


def _youtube_upload_body(data):
    """Build the ``videos.insert`` request body from content.json *data*.

    Enforces YouTube Data API metadata rules that would otherwise reject the
    upload at the very end of the pipeline:

    - no ``<`` or ``>`` in the title or description;
    - title at most 100 characters;
    - description at most 5000 bytes;
    - tags at most 500 characters in total.
    """
    def no_angle_brackets(text):
        return str(text).replace("<", "").replace(">", "")

    title = no_angle_brackets(data.get("title", "Science Fact")).strip()[:100] or "Science Fact"
    all_tags = [str(t) for t in data.get("tags", [])]
    description = no_angle_brackets(str(data.get("fact", "")) + "\n\n" + "\n".join(all_tags))
    description = description.encode("utf-8")[:5000].decode("utf-8", "ignore")

    tags, used = [], 0
    for tag in all_tags:
        tag = no_angle_brackets(tag).strip()
        # The 500-character budget counts separating commas, and tags that
        # contain spaces are counted as if wrapped in quotes.
        cost = len(tag) + (2 if " " in tag else 0) + (1 if tags else 0)
        if tag and used + cost <= 500:
            tags.append(tag)
            used += cost
    return {
        "snippet": {
            "title": title,
            "description": description,
            "tags": tags,
            "categoryId": "28",
            "defaultLanguage": "en",
        },
        "status": {"privacyStatus": "public", "selfDeclaredMadeForKids": False},
    }


def step_upload(run: Path):
    """Pipeline step 6: upload ``final.mp4`` to YouTube as a public video.

    If ``youtube_id.txt`` already exists, the upload is skipped: the video was
    already published, and the process presumably stopped before DONE was
    saved. This prevents duplicates on resume. Otherwise the function uploads
    in resumable 4 MiB chunks and records the new video id. In both cases it
    syncs state via sync_hub (best effort) and advances the run to DONE.

    Raises:
        RuntimeError: if ``final.mp4`` is missing or no usable YouTube token exists.
    """
    log.info("STEP 6: YOUTUBE UPLOAD")
    id_file = run / "youtube_id.txt"
    if id_file.exists():
        log.info(f"Already published: https://youtube.com/watch?v={id_file.read_text().strip()}")
        _sync_hub_call("push_all_state", BASE_DIR)
        save_state(run, "DONE")
        return

    data = json.loads((run / "content.json").read_text())
    final = run / "final.mp4"
    if not final.exists():
        raise RuntimeError("final.mp4 not found")
    _sync_hub_call("pull_state", BASE_DIR)
    creds = _load_youtube_credentials()

    from googleapiclient.discovery import build
    from googleapiclient.http import MediaFileUpload
    yt = build("youtube", "v3", credentials=creds)
    media = MediaFileUpload(str(final), chunksize=4 * 1024 * 1024, resumable=True)
    req = yt.videos().insert(part="snippet,status", body=_youtube_upload_body(data),
                             media_body=media)
    log.info("Uploading to YouTube...")
    resp = None
    while resp is None:
        status, resp = req.next_chunk()
        if status:
            log.info(f"Upload progress: {int(status.progress() * 100)}%")
    vid_id = resp.get("id", "unknown")
    log.info(f"Published: https://youtube.com/watch?v={vid_id}")
    atomic_write(id_file, vid_id)
    _sync_hub_call("push_all_state", BASE_DIR)
    save_state(run, "DONE")
# State name (as stored in state.txt) -> step function that handles it.
# Each step saves the next state name when it succeeds.
STEPS = dict(
    CONTENT=step_content,
    FRAMES=step_frames,
    TTS=step_tts,
    AUDIO=step_audio,
    VIDEO=step_video,
    UPLOAD=step_upload,
)
def _find_resumable_run():
    """Return the most recently modified run directory that is neither DONE nor FAILED, else None."""
    if not RUNS_DIR.exists():
        return None
    newest_first = sorted(RUNS_DIR.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True)
    for d in newest_first:
        if d.is_dir() and load_state(d) not in ("DONE", "FAILED"):
            return d
    return None


def _log_completion(run: Path):
    """Log the title and YouTube URL of a finished *run*; unreadable files degrade to placeholders."""
    try:
        title = json.loads((run / "content.json").read_text()).get("title", "")
    except (OSError, ValueError, AttributeError) as exc:
        log.warning(f"Could not read content.json for the summary: {exc}")
        title = ""
    id_file = run / "youtube_id.txt"
    try:
        yt_id = id_file.read_text().strip() if id_file.exists() else "?"
    except OSError:
        yt_id = "?"
    log.info(f"\nCOMPLETE: {title} — https://youtube.com/watch?v={yt_id}")


def main():
    """Run the pipeline daemon forever.

    Each loop iteration does the following:

    - Honour the trigger files: TRIGGER_SKIP_FILE postpones the next run by
      24 h; TRIGGER_RUN_FILE starts one immediately.
    - Otherwise sleep until the scheduled time.
    - Resume the newest unfinished run, or start a new one, and execute
      exactly one step.
    - After DONE, delete the run directory and schedule the next run in 24 h.
      After FAILED, retry with a fresh run in 5 minutes.

    Ctrl-C exits cleanly at any point.
    """
    log.info("=" * 56)
    log.info("YouTube Shorts Pipeline — STARTING")
    log.info("DeepSeek-R1-32B (Kaggle) | FLUX (Kaggle) | StyleTTS2 (Kaggle)")
    log.info(f"Frames: {NUM_FRAMES} | Speed: {VIDEO_SPEED}x | Accounts: {len(KAGGLE_ACCOUNTS)}")
    log.info("=" * 56)
    try:
        from sync_hub import pull_state
        pull_state(BASE_DIR)
    except ImportError:
        pass  # sync_hub is optional
    except Exception as exc:
        log.warning(f"sync_hub pull_state failed: {exc}")

    next_run_after = time.time() + 15  # short grace period after start-up
    try:
        while True:
            if TRIGGER_SKIP_FILE.exists():
                TRIGGER_SKIP_FILE.unlink(missing_ok=True)
                next_run_after = time.time() + 86400
                log.info("Skipped. Next run in 24h.")
                time.sleep(60)
                continue
            if TRIGGER_RUN_FILE.exists():
                TRIGGER_RUN_FILE.unlink(missing_ok=True)
                next_run_after = 0
                log.info("Triggered immediate run!")
            if time.time() < next_run_after:
                left = next_run_after - time.time()
                # The loop ticks every 30 s, so this logs roughly every 5 minutes.
                if int(left) % 300 < 30:
                    log.info(f"Next run in {left/3600:.1f}h ...")
                time.sleep(30)
                continue

            run = _find_resumable_run()
            if run is None:
                run = RUNS_DIR / datetime.now().strftime("%Y%m%d_%H%M%S")
                run.mkdir(parents=True)
                save_state(run, "CONTENT")
            state = load_state(run)
            log.info(f"\nRun: {run.name} Stage: {state}")
            try:
                step = STEPS.get(state)
                if step is None:
                    log.error(f"Unknown state {state!r}; marking run as failed")
                    save_state(run, "FAILED")
                else:
                    step(run)
            except Exception as e:
                # log.exception keeps the traceback in the same (stdout) log stream.
                log.exception(f"Error in {state}: {e}")
                save_error_log(run, e, state)
                save_state(run, "FAILED")

            cur = load_state(run)
            if cur == "DONE":
                _log_completion(run)
                shutil.rmtree(run, ignore_errors=True)
                next_run_after = time.time() + 86400
                log.info("Next run in 24h.")
            elif cur == "FAILED":
                log.error(f"FAILED — check: {run}")
                next_run_after = time.time() + 300
                log.info("Retrying in 5 min.")
    except KeyboardInterrupt:
        log.info("Interrupted")
        sys.exit(0)
# Start the daemon loop only when executed as a script, not when imported.
if __name__ == "__main__":
    main()