| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import os |
| | import io |
| | import json |
| | import time |
| | import hashlib |
| | import shutil |
| | import zipfile |
| | import random |
| | import base64 |
| | import datetime as dt |
| | from pathlib import Path |
| | import xml.etree.ElementTree as ET |
| | import urllib.request |
| | import urllib.error |
| | import ssl |
| | import re |
| |
|
| | import streamlit as st |
| | try: |
| | from openai import OpenAI |
| | except Exception: |
| | OpenAI = None |
| |
|
# ---- Application-wide settings ----

# Title string for the app.
APP_TITLE = "🎙️ Whizbang Podcast Creator Thingee"

# Root folder holding index.json plus one sub-folder per podcast slug.
DATA_DIR = Path("podcast_data")

# Fallback number of exchanges when a definition does not specify one.
DEFAULT_EXCHANGES = 5

# 10 MiB. NOTE(review): presumably the upload size cap; no use is visible in
# this part of the file.
MAX_UPLOAD_BYTES = 10 * 1024 ** 2

# Initial page size for the podcast list.
PAGE_SIZE_DEFAULT = 25

# Feature switches for the podcast list: when True the corresponding
# controls are not rendered.
HIDE_DELETE = True
HIDE_PAGINATION = True
| |
|
# Selectable voices, each tagged with the gender label shown next to it in
# the voice drop-down ("<voice> (<gender>)").
VOICES = [
    {"voice": voice_name, "gender": gender_label}
    for voice_name, gender_label in (
        ("Alloy", "Female"),
        ("Ash", "Male"),
        ("Ballad", "Male"),
        ("Coral", "Female"),
        ("Echo", "Male"),
        ("Fable", "Female"),
        ("Onyx", "Male"),
        ("Nova", "Female"),
        ("Sage", "Female"),
        ("Shimmer", "Female"),
        ("Verse", "Male"),
    )
]
| |
|
# Named delivery styles. "instructions" is the free-text guidance that is
# looked up by vibe name and passed along with prompts / speech requests.
VIBES = [
    {"vibe": vibe_name, "instructions": guidance}
    for vibe_name, guidance in (
        ("Calm", "Voice Affect: Calm, composed, reassuring. Tone sincere and empathetic. Moderate pacing."),
        ("Mad Scientist", "Exaggerated, theatrical delivery with gleeful cackles; high energy."),
        ("Chill Surfer", "Laid-back, mellow; stretched vowels; slow tempo."),
        ("Dramatic", "Low, hushed, suspenseful; slow pacing; restrained intensity."),
        ("Professional", "Clear, authoritative, composed; neutral and informative."),
        ("Sincere", "Calm and empathetic; slower for apology, faster for solutions."),
        ("Fitness Instructor", "High-energy, upbeat; punchy sentences; motivational."),
        ("NYC Cabbie", "Gruff, quick, clipped rhythm; no-nonsense with dry humor."),
        ("Connoisseur", "Slight French accent; refined, warm; deliberate pauses."),
        ("Patient Teacher", "Warm, instructive; slow, articulate; supportive."),
        ("Smooth Jazz DJ", "Deep, velvety; melodic phrasing; relaxed vibe."),
        ("Medieval Knight", "Deep, formal; noble cadence with archaic flair."),
        ("Friendly", "Cheerful, clear, reassuring; brief pauses after key steps."),
        ("Gourmet Chef", "Exuberant Italian chef; passionate and persuasive."),
        ("Old-Timey", "Refined, theatrical; crisp enunciation; steady cadence."),
        ("Noir Detective", "Cool, detached; slow and deliberate with dramatic pauses."),
        ("Emo Teenager", "Sarcastic, disinterested; monotone with sighs."),
        ("Sports Coach", "Energetic, animated; rapid for action; positive energy."),
        ("Sympathetic", "Warm, empathetic, professional; calm delivery."),
        ("Robot", "Monotone, mechanical; efficient and precise."),
        ("Eternal Optimist", "Warm, upbeat; solution-oriented; emphasize key words."),
        ("Auctioneer", "Fast-paced, rhythmic; exciting and persuasive."),
        ("Dramatic (Alt)", "Low, hushed; serious and mysterious; slow pacing."),
        ("True Crime Buff", "Deep, hushed; short, rhythmic phrasing; ominous tone."),
    )
]
| |
|
# Script tone name -> one-line description. The keys populate the "Tone"
# selector and the description is appended to prompts for the chosen tone.
TONE_DESCRIPTIONS = {
    tone_name: description
    for tone_name, description in (
        ("Informative", "Clear, factual, and helpful; prioritizes clarity over flair."),
        ("Authoritative", "Confident, expert, and decisive; strong statements and tight structure."),
        ("Inspirational", "Uplifting, aspirational, and future-focused; evokes possibility."),
        ("Empathetic", "Warm, validating, and supportive; acknowledges feelings."),
        ("Persuasive", "Benefit-led, objection-aware, and action-oriented."),
        ("Educational", "Stepwise, scaffolded explanations with simple examples."),
        ("Conversational", "Casual and back-and-forth; shorter sentences and light callbacks."),
        ("Humorous", "Light wit, playful framing, and occasional punchlines."),
        ("Storytelling", "Narrative-driven; uses scenes, stakes, and callbacks."),
        ("Casual/Relaxed", "Easy-going, low-pressure pacing with informal phrasing."),
        ("Charming", "Personable and a bit witty; smiles you can hear."),
        ("Passionate", "High-energy enthusiasm and vivid phrasing."),
        ("Dramatic", "Tension-building, cinematic rhythm; pauses for effect."),
        ("Enthusiastic", "Upbeat momentum, positive framing, high engagement."),
        ("Optimistic", "Hopeful outlook; solution-oriented framing."),
        ("Mysterious", "Intriguing hints, restrained reveals, and suspense."),
    )
}
| |
|
| | |
def slugify(text: str) -> str:
    """Turn free text into a lowercase, hyphen-separated slug.

    Only ASCII letters, digits and ``-_.()`` survive; spaces become hyphens.
    When nothing usable is left, a time-based ``podcast-<epoch>`` slug is
    returned instead.

    Slugs are used as directory names under the data folder, so a result made
    up solely of dots (``"."``, ``".."``) is rejected as well: it would alias
    the data folder itself or its parent.
    """
    allowed = frozenset(
        "-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    )
    cleaned = "".join(ch for ch in text if ch in allowed).strip()
    slug = cleaned.replace(" ", "-").lower()
    if not slug.strip("."):
        # Empty or dots-only: fall back to a generated, filesystem-safe name.
        return f"podcast-{int(time.time())}"
    return slug
| |
|
def now_iso() -> str:
    """Return the current naive local time as ``YYYY-MM-DD HH:MM:SS``.

    Despite the name, date and time are separated by a space, not ``T``.
    """
    return f"{dt.datetime.now():%Y-%m-%d %H:%M:%S}"
| |
|
def ensure_dirs(base: Path):
    """Create ``base`` and its ``parts`` sub-folder, tolerating existing ones."""
    for folder in (base, base / "parts"):
        folder.mkdir(parents=True, exist_ok=True)
| |
|
def id3v2_strip(b: bytes) -> bytes:
    """Remove a leading ID3v2 tag from MP3 data, if one is present.

    The tag length is read from the 10-byte header: bytes 6-9 hold a
    "syncsafe" integer (7 significant bits per byte) that counts everything
    after the header. When the footer flag (bit 4 of the flags byte) is set,
    a further 10-byte footer follows the tag body and is skipped as well.

    Returns ``b`` unchanged when it does not start with an ID3v2 header.
    """
    if len(b) < 10 or b[:3] != b"ID3":
        return b
    flags = b[5]
    body_len = (
        ((b[6] & 0x7F) << 21)
        | ((b[7] & 0x7F) << 14)
        | ((b[8] & 0x7F) << 7)
        | (b[9] & 0x7F)
    )
    tag_len = 10 + body_len
    if flags & 0x10:
        # Footer present: the declared size excludes both header and footer.
        tag_len += 10
    return b[tag_len:]
| |
|
def id3v1_strip(b: bytes) -> bytes:
    """Drop a trailing 128-byte ID3v1 block (one starting with ``TAG``).

    Data that is shorter than 128 bytes, or whose last 128 bytes do not start
    with ``TAG``, is returned unchanged.
    """
    has_trailer = len(b) >= 128 and b[-128:-125] == b"TAG"
    return b[:-128] if has_trailer else b
| |
|
def safe_concat_mp3(mp3_paths, out_path: Path):
    """Concatenate MP3 files into ``out_path`` without embedding tag blocks.

    Args:
        mp3_paths: Iterable of paths to the parts, in playback order.
        out_path: Destination file; it is overwritten.

    The first part keeps its leading ID3v2 tag. Every later part has both its
    ID3v2 header and ID3v1 trailer removed. When more parts follow, the first
    part's ID3v1 trailer is removed too, because otherwise that 128-byte block
    would sit in the middle of the joined audio stream. A single-part input is
    copied byte-for-byte.
    """
    paths = list(mp3_paths)
    with open(out_path, "wb") as out:
        for position, part in enumerate(paths):
            data = Path(part).read_bytes()
            if position == 0:
                if len(paths) > 1:
                    data = id3v1_strip(data)
            else:
                data = id3v1_strip(id3v2_strip(data))
            out.write(data)
| |
|
def read_docx_text(docx_bytes: bytes) -> str:
    """Extract plain text from the bytes of a ``.docx`` file.

    Reads ``word/document.xml`` from the archive and collects the text of its
    ``w:t`` elements. The runs of one paragraph are concatenated, and
    paragraphs are separated by a real newline. Paragraphs without text are
    skipped.

    Returns:
        The extracted text, stripped; ``""`` when the bytes cannot be read as
        a docx. Extraction is deliberately best-effort, so failures are not
        raised.
    """
    w_ns = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    try:
        with zipfile.ZipFile(io.BytesIO(docx_bytes)) as archive:
            xml = archive.read("word/document.xml")
        root = ET.fromstring(xml)
    except Exception:
        # Covers corrupt archives, a missing document part and malformed XML.
        return ""

    paragraphs = []
    current = []
    # iter() walks in document order, so each new <w:p> closes the previous one.
    for element in root.iter():
        if element.tag == w_ns + "p":
            if current:
                paragraphs.append("".join(current))
                current = []
        elif element.tag == w_ns + "t" and element.text:
            current.append(element.text)
    if current:
        paragraphs.append("".join(current))
    return "\n".join(paragraphs).strip()
| |
|
def load_index():
    """Load the podcast index from ``DATA_DIR/index.json``.

    The data directory is created if needed. A missing, unreadable or
    malformed index yields a fresh ``{"podcasts": []}``. The result always
    has a list under ``"podcasts"`` so callers can index it directly.
    """
    DATA_DIR.mkdir(exist_ok=True, parents=True)
    idx = DATA_DIR / "index.json"
    try:
        data = json.loads(idx.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, I/O error, bad encoding or invalid JSON.
        return {"podcasts": []}
    if not isinstance(data, dict):
        return {"podcasts": []}
    if not isinstance(data.get("podcasts"), list):
        # Keep any other keys, but repair the one callers rely on.
        data["podcasts"] = []
    return data
| |
|
def save_index(index):
    """Write ``index`` to ``DATA_DIR/index.json`` as 2-space indented JSON."""
    target = DATA_DIR / "index.json"
    serialized = json.dumps(index, indent=2)
    target.write_text(serialized)
| |
|
def uniquify_title(title: str, index: dict) -> str:
    """Return a title that does not collide with any title in ``index``.

    A trailing ``" (n)"`` counter on ``title`` is removed first, so cloning
    ``"Show (2)"`` yields ``"Show (3)"`` rather than ``"Show (2) (2)"``.
    Comparison is case-insensitive. When the bare title is taken, the first
    free ``"<title> (n)"`` with ``n >= 2`` is returned.

    Args:
        title: Requested title.
        index: Index dict; titles are read from ``index["podcasts"]``.
    """
    import re as _re

    base = title.strip()
    # Single backslashes: in a raw string "\\s" would match a literal backslash.
    m = _re.search(r"\s*\((\d+)\)\s*$", base)
    if m and base[:m.start()].strip():
        # Only strip the counter when something is left to serve as the title.
        base = base[:m.start()].rstrip()

    existing = {
        (p.get("title") or "").strip().lower()
        for p in index.get("podcasts", [])
    }
    if base.lower() not in existing:
        return base
    n = 2
    while f"{base} ({n})".lower() in existing:
        n += 1
    return f"{base} ({n})"
| |
|
def uniquify_slug(slug: str, index: dict) -> str:
    """Return a slug not yet present in ``index``.

    The slug is trimmed and lower-cased. If it is already used by an entry of
    ``index["podcasts"]``, ``-2``, ``-3``, ... is appended until a free name is
    found.
    """
    root = slug.strip().lower()
    taken = set()
    for entry in index.get("podcasts", []):
        taken.add((entry.get("slug", "")).strip().lower())

    candidate = root
    counter = 2
    while candidate in taken:
        candidate = f"{root}-{counter}"
        counter += 1
    return candidate
| |
|
def get_podcast_dir(slug: str) -> Path:
    """Return the folder of the podcast ``slug`` inside ``DATA_DIR``."""
    return DATA_DIR.joinpath(slug)
| |
|
def load_definition(slug: str):
    """Return the parsed ``definition.json`` of a podcast, or ``None`` if absent."""
    path = get_podcast_dir(slug) / "definition.json"
    if not path.exists():
        return None
    return json.loads(path.read_text())
| |
|
def save_definition(slug: str, definition: dict):
    """Store ``definition`` as ``definition.json`` in the podcast's folder.

    The folder (and its ``parts`` sub-folder) is created first if needed.
    """
    folder = get_podcast_dir(slug)
    ensure_dirs(folder)
    serialized = json.dumps(definition, indent=2)
    (folder / "definition.json").write_text(serialized)
| |
|
def load_script(slug: str):
    """Return the parsed ``script.json`` of a podcast, or ``None`` if absent."""
    path = get_podcast_dir(slug) / "script.json"
    if not path.exists():
        return None
    return json.loads(path.read_text())
| |
|
def save_script(slug: str, script: dict):
    """Store ``script`` as ``script.json`` in the podcast's folder.

    The folder (and its ``parts`` sub-folder) is created first if needed.
    """
    folder = get_podcast_dir(slug)
    ensure_dirs(folder)
    serialized = json.dumps(script, indent=2)
    (folder / "script.json").write_text(serialized)
| |
|
def part_path(slug: str, i: int) -> Path:
    """Return the path of audio part ``i``: ``<podcast dir>/parts/NNN.mp3``.

    The index is zero-padded to three digits.
    """
    filename = f"{i:03d}.mp3"
    return get_podcast_dir(slug).joinpath("parts", filename)
| |
|
def full_mp3_path(slug: str) -> Path:
    """Return the path of the joined episode audio: ``<podcast dir>/<slug>.mp3``."""
    filename = f"{slug}.mp3"
    return get_podcast_dir(slug).joinpath(filename)
| |
|
def compute_text_hash(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of ``text`` (UTF-8)."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()[:16]
| |
|
| | |
# Optional leading speaker label: "Alex: ...", "[Host 0] - ...", "Sam — ...".
# Group 1 is the label (at most 40 characters), group 2 the spoken text.
# A colon always separates; a hyphen or em dash only counts as a separator
# when it has whitespace on both sides, so hyphenated words at the start of a
# sentence ("Well-known facts ...") are not mistaken for labels. The label is
# matched lazily so only the first separator is consumed, and DOTALL lets the
# spoken text span several lines.
_LABEL_RE = re.compile(
    r'^\s*\[?\s*([A-Za-z0-9 ._\-]{1,40}?)\s*\]?\s*(?::|\s[\-—]\s)\s*(.*)$',
    re.DOTALL,
)


def strip_speaker_label(text: str) -> str:
    """Return ``text`` without a leading speaker label, trimmed.

    Text with no recognizable label is returned trimmed; ``None`` or empty
    input gives ``""``.
    """
    source = text or ""
    m = _LABEL_RE.match(source)
    if m is None:
        return source.strip()
    return m.group(2).strip()
| |
|
| | |
def voice_label_for(voice: str) -> str:
    """Return the drop-down label ``"<Voice> (<Gender>)"`` for a voice name.

    The lookup in ``VOICES`` ignores case and surrounding whitespace. Unknown
    names are labelled ``(Female)``; an empty name becomes ``Alloy (Female)``.
    """
    wanted = (voice or "").strip()
    known = next(
        (entry for entry in VOICES if entry["voice"].lower() == wanted.lower()),
        None,
    )
    if known is not None:
        return f"{known['voice']} ({known['gender']})"
    return f"{wanted or 'Alloy'} (Female)"
| |
|
def ensure_select_value(key: str, options: list, preferred_label: str, default_index: int):
    """Put a valid option for a selectbox into ``st.session_state[key]``.

    ``preferred_label`` is stored when it is one of ``options``. Otherwise its
    first space-separated word is compared, case-insensitively, with the first
    word of every option; the first hit wins and ``options[default_index]`` is
    the fallback.
    """
    if preferred_label in options:
        st.session_state[key] = preferred_label
        return

    try:
        token = (preferred_label or "").split(" ")[0]
    except Exception:
        # A truthy non-string label cannot be split; treat it as "no hint".
        token = ""

    chosen = default_index
    for position, option in enumerate(options):
        if option.split(" ")[0].lower() == token.lower():
            chosen = position
            break
    st.session_state[key] = options[chosen]
| |
|
| | |
# Base URL for raw HTTP calls; can be overridden with the OPENAI_API_BASE
# environment variable.
_OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1")

# Default TLS context used for those raw HTTP calls.
_SSL_CTX = ssl.create_default_context()
| |
|
def _bearer_headers(extra=None):
    """Build request headers carrying the API key as a bearer token.

    Args:
        extra: Optional mapping of additional headers merged over the
            ``Authorization`` entry.

    Raises:
        RuntimeError: If ``OPENAI_API_KEY`` is unset or blank.
    """
    token = os.environ.get("OPENAI_API_KEY", "").strip()
    if token == "":
        raise RuntimeError("OPENAI_API_KEY environment variable is not set.")
    headers = {"Authorization": f"Bearer {token}"}
    headers.update(extra or {})
    return headers
| |
|
def http_post_json(path: str, payload: dict, expect_json=True, as_bytes=False):
    """POST ``payload`` as JSON to ``path`` under the configured API base URL.

    Args:
        path: Endpoint path; leading slashes are ignored.
        payload: JSON-serializable request body.
        expect_json: Decode the response body as UTF-8 JSON.
        as_bytes: Return the raw response bytes; takes precedence over
            ``expect_json``.

    Returns:
        The parsed JSON, or the raw bytes when ``as_bytes`` is true or
        ``expect_json`` is false.

    The request uses a 120 second timeout. Errors from ``urllib`` and a
    missing API key (``RuntimeError``) propagate to the caller.
    """
    endpoint = f"{_OPENAI_API_BASE.rstrip('/')}/{path.lstrip('/')}"
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(endpoint, data=body, method="POST")
    for header_name, header_value in _bearer_headers({"Content-Type": "application/json"}).items():
        request.add_header(header_name, header_value)

    with urllib.request.urlopen(request, context=_SSL_CTX, timeout=120) as resp:
        raw = resp.read()

    if expect_json and not as_bytes:
        return json.loads(raw.decode("utf-8"))
    return raw
| |
|
def select_voice_name(v: str) -> str:
    """Normalize a voice name: trimmed and lower-cased.

    A falsy value (``None`` or ``""``) becomes ``"alloy"``.
    """
    chosen = v if v else "alloy"
    return chosen.strip().lower()
| |
|
def vibe_instructions(vibe_name: str) -> str:
    """Return the trimmed instruction text of the vibe called ``vibe_name``.

    The name must match a ``VIBES`` entry exactly; unknown names give ``""``.
    """
    found = next((entry for entry in VIBES if entry["vibe"] == vibe_name), None)
    if found is None:
        return ""
    return found["instructions"].strip()
| |
|
| | |
# NOTE(review): self-assignment; it rebinds the name to the object it already
# refers to and has no effect.
TONE_DESCRIPTIONS = TONE_DESCRIPTIONS
| |
|
def _build_prompts(defn: dict):
    """Build the ``(system_prompt, user_prompt)`` pair for script generation.

    Args:
        defn: Podcast definition. The keys read are ``title``, ``episode``,
            ``topic``, ``tone``, ``host1``, ``host2``, ``exchanges`` and
            ``supporting_texts``; missing hosts are treated as empty.

    Returns:
        Tuple of the fixed system prompt and a user prompt describing the
        episode, both hosts, the required number of exchanges and any
        supporting material.
    """
    title = defn.get("title", "")
    episode = defn.get("episode", "")
    topic = defn.get("topic", "")
    tone = defn.get("tone", "")
    # "or {}" also covers a host key that is present but set to None.
    h1 = defn.get("host1") or {}
    h2 = defn.get("host2") or {}
    vibe1 = h1.get("vibe", "")
    vibe2 = h2.get("vibe", "")
    vibe1_instr = vibe_instructions(vibe1)
    vibe2_instr = vibe_instructions(vibe2)
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    n_exchanges = int(defn.get("exchanges", DEFAULT_EXCHANGES))

    # Real newlines and plain quotes: doubled escapes here would send literal
    # backslashes to the model.
    system_prompt = (
        "You are a seasoned podcast scriptwriter. Return ONLY JSON.\n"
        'JSON shape MUST be: {"turns":[{"speaker":0|1, "text":"..."}, ...]}.\n'
        "Requirements:\n"
        "- EXACTLY N exchanges (2*N turns), strictly alternating speakers 0 then 1, then 0 then 1, ...\n"
        "- Keep each turn ~1–3 sentences; compress ideas if needed, DO NOT reduce turn count.\n"
        '- Dialog may optionally begin with a speaker label (e.g., "Alex:" or "Host 0:"), which will be preserved.\n'
        "- The 'speaker' numeric field indicates who is speaking."
    )

    # Supporting documents are separated by a blank line.
    support = "\n\n".join(defn.get("supporting_texts") or [])

    user_prompt = f"""Podcast: {title}
Episode: {episode}
Topic (paraphrase allowed for natural delivery): {topic}
Script Tone: {tone} — {tone_desc}

Host 0
- Name: {h1.get('name','Host 0')}
- Voice (TTS target): {h1.get('voice','Alloy')}
- Vibe: {vibe1}
- Vibe instructions: {vibe1_instr[:800]}

Host 1
- Name: {h2.get('name','Host 1')}
- Voice (TTS target): {h2.get('voice','Echo')}
- Vibe: {vibe2}
- Vibe instructions: {vibe2_instr[:800]}

Intro/Outro directives:
- FIRST LINE (speaker 0) should INTRODUCE the podcast title ("{title}"), the episode ("{episode}"), and the subject ("{topic}") — feel free to paraphrase the subject for naturalness — and greet Host 1 by name.
- Host 1 (their first line) acknowledges Host 0 by name.
- FINAL LINES provide a succinct OUTRO that sums things up.

N = {n_exchanges}

If helpful, incorporate relevant information from the supporting materials (optional). Keep tone aligned with the Script Tone.
Supporting materials (optional):
{support}

OUTPUT SCHEMA (return ONLY JSON):
{{"turns":[{{"speaker":0,"text":"(You may start with an optional speaker label like 'Alex: ' before the text)"}}, ...]}}"""
    return system_prompt, user_prompt
| |
|
def _chat_json(system_prompt: str, user_prompt: str):
    """Request a JSON-object chat completion and return the parsed content.

    The ``openai`` client is tried first when it was importable. Any failure
    on that path is swallowed and the same request is sent through
    ``http_post_json``; errors on the fallback path propagate.
    """
    model_name = "gpt-4o-mini"

    def build_messages():
        # A fresh list per request so the two paths never share objects.
        return [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

    if OpenAI is not None:
        try:
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "").strip())
            completion = client.chat.completions.create(
                model=model_name,
                response_format={"type": "json_object"},
                messages=build_messages(),
                temperature=0.6,
            )
            return json.loads(completion.choices[0].message.content)
        except Exception:
            # Deliberate best-effort: fall through to the raw HTTP request.
            pass

    payload = {
        "model": model_name,
        "response_format": {"type": "json_object"},
        "messages": build_messages(),
        "temperature": 0.6,
    }
    resp = http_post_json("/chat/completions", payload, expect_json=True)
    return json.loads(resp["choices"][0]["message"]["content"])
| |
|
def _continue_json(defn: dict, existing_turns: list, need_total: int):
    """Ask the model for the turns still missing from a short script.

    Args:
        defn: Podcast definition (``title``, ``episode``, ``topic``, ``tone``).
        existing_turns: Turns generated so far; the last six are sent as
            context.
        need_total: Total number of turns the script must reach.

    Returns:
        The ``"turns"`` list of the model's reply, or ``[]`` if the request or
        parsing fails.
    """
    title = defn.get("title", "")
    episode = defn.get("episode", "")
    topic = defn.get("topic", "")
    tone = defn.get("tone", "")
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")

    n_have = len(existing_turns)
    # Speakers alternate starting with 0, so parity gives the next speaker.
    next_speaker = n_have % 2

    cont_system = (
        "Return ONLY JSON {\"turns\":[...]}. Continue until total turns == NeedTotal. "
        "Strict alternation; ~1–3 sentences each; keep Script Tone consistent. Labels allowed."
    )
    context_tail = json.dumps(existing_turns[-6:], ensure_ascii=False)
    prompt_lines = [
        f"Podcast: {title} — Episode: {episode} — Topic: {topic}",
        f"Script Tone: {tone} — {tone_desc}",
        f"We currently have {n_have} turns. NeedTotal = {need_total}. Next speaker should be {next_speaker}.",
        "Context (last turns):",
        f"{context_tail}",
        "Return ONLY the additional turns needed to reach NeedTotal.",
    ]
    cont_user = "\n".join(prompt_lines) + "\n"

    try:
        reply = _chat_json(cont_system, cont_user)
        return reply.get("turns", [])
    except Exception:
        return []
| |
|
def generate_script(definition: dict) -> dict:
    """Generate a script with exactly ``2 * exchanges`` turns.

    Steps: one model request for the whole script; if it comes back short, one
    continuation request; any turns still missing are filled with placeholder
    lines that alternate between the two hosts.

    Returns:
        The model's reply dict with ``"turns"`` cut to the required length and
        a ``"prompt"`` entry recording the system and user prompts.
    """
    need_total = 2 * int(definition.get("exchanges", DEFAULT_EXCHANGES))
    system_prompt, user_prompt = _build_prompts(definition)

    try:
        data = _chat_json(system_prompt, user_prompt)
    except Exception:
        data = {"turns": []}
    turns = data.get("turns") or []

    if len(turns) < need_total:
        extra = _continue_json(definition, turns, need_total)
        if extra:
            turns += extra

    # Pad with placeholders so the turn count is always met.
    for position in range(len(turns), need_total):
        who = position % 2
        host = definition.get("host1") if who == 0 else definition.get("host2")
        name = (host or {}).get("name", f"Host {who}")
        topic = definition.get("topic", "")
        tone = definition.get("tone", "")
        turns.append({
            "speaker": who,
            "text": f"{name}: Quick takeaway on {topic.lower()}—in a {tone.lower()} style.",
        })

    data["turns"] = turns[:need_total]
    data["prompt"] = {"system": system_prompt, "user": user_prompt}
    return data
| |
|
def validate_and_fix_script(definition: dict, script: dict) -> dict:
    """Normalize ``script["turns"]`` to exactly ``2 * exchanges`` turns.

    Speakers are assigned purely by position (0, 1, 0, 1, ...), so the
    ``speaker`` value of an incoming turn is not parsed at all. Blank or
    missing text becomes ``"…"``. Surplus turns are dropped and missing ones
    are filled with a placeholder line for the expected host.

    Args:
        definition: Podcast definition; ``exchanges`` falls back to
            ``DEFAULT_EXCHANGES`` when absent or ``None``.
        script: Script dict; it is modified in place.

    Returns:
        The same ``script`` object with its ``"turns"`` replaced.
    """
    raw_exchanges = definition.get("exchanges")
    if raw_exchanges is None:
        raw_exchanges = DEFAULT_EXCHANGES
    total_needed = int(raw_exchanges) * 2

    texts = []
    for turn in script.get("turns") or []:
        # Tolerate a bare string in place of a {"speaker", "text"} dict.
        raw_text = turn.get("text") if isinstance(turn, dict) else turn
        text = "" if raw_text is None else str(raw_text).strip()
        texts.append(text or "…")

    topic = definition.get("topic") or ""
    tone = definition.get("tone") or ""
    out = []
    for i in range(total_needed):
        expected = i % 2
        if i < len(texts):
            out.append({"speaker": expected, "text": texts[i]})
            continue
        host = definition.get("host1") if expected == 0 else definition.get("host2")
        name = (host or {}).get("name", f"Host {expected}")
        out.append({
            "speaker": expected,
            "text": f"{name}: Another point on {topic.lower()}—kept {tone.lower()}.",
        })

    script["turns"] = out
    return script
| |
|
| | |
def tts_synthesize(text: str, voice: str, instructions: str) -> bytes:
    """Synthesize ``text`` to MP3 bytes with the given voice and style.

    Args:
        text: Text to speak.
        voice: Voice name; normalized with ``select_voice_name``.
        instructions: Optional delivery instructions; omitted when empty.

    Returns:
        The MP3 audio as bytes.

    The ``openai`` client is tried first when it was importable. Any failure
    on that path is swallowed and the request is repeated through
    ``http_post_json``; errors on the fallback path propagate.
    """
    voice_name = select_voice_name(voice)
    if OpenAI is not None:
        try:
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "").strip())
            # The output format parameter of the speech endpoint is
            # "response_format"; "format" is not an accepted argument.
            with client.audio.speech.with_streaming_response.create(
                model="gpt-4o-mini-tts",
                voice=voice_name,
                input=text,
                response_format="mp3",
                instructions=instructions or None,
            ) as response:
                return response.read()
        except Exception:
            # Deliberate best-effort: fall through to the raw HTTP request.
            pass

    payload = {
        "model": "gpt-4o-mini-tts",
        "voice": voice_name,
        "input": text,
        "response_format": "mp3",
    }
    if instructions:
        payload["instructions"] = instructions
    return http_post_json("/audio/speech", payload, expect_json=False, as_bytes=True)
| |
|
def synthesize_all_parts(slug: str, script: dict, definition: dict, progress_cb=None):
    """Synthesize every turn to its own MP3 part and join them into one file.

    A turn is skipped when its part file exists and the stored hash of
    (speaker, voice, instructions, spoken text) is unchanged. The hash cache
    (``part_hashes.json``) is written even when synthesis fails part-way, so
    it always matches the part files that were actually rewritten.

    Args:
        slug: Podcast slug; selects the working directory.
        script: Script dict with a ``"turns"`` list.
        definition: Podcast definition (hosts' voice and vibe, tone).
        progress_cb: Optional ``callback(done, total, message)``.

    Returns:
        ``(path of the joined MP3, per-turn synthesis times in ms, total ms)``.
        Skipped turns report ``0.0``.
    """
    pdir = get_podcast_dir(slug)
    ensure_dirs(pdir)
    turns = script.get("turns", [])
    total = len(turns)

    cache_file = pdir / "part_hashes.json"
    cache_map = {}
    if cache_file.exists():
        try:
            cache_map = json.loads(cache_file.read_text())
        except Exception:
            cache_map = {}
    if not isinstance(cache_map, dict):
        # A cache file of the wrong shape is treated like a missing one.
        cache_map = {}

    h1 = definition.get("host1", {})
    h2 = definition.get("host2", {})
    v1 = vibe_instructions(h1.get("vibe", ""))
    v2 = vibe_instructions(h2.get("vibe", ""))
    tone = definition.get("tone", "")
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    if tone_desc:
        extra_tone = f" | Global Script Tone: {tone} — {tone_desc}"
    else:
        extra_tone = f" | Global Script Tone: {tone}"
    voice1 = h1.get("voice", "Alloy")
    voice2 = h2.get("voice", "Alloy")

    per_turn_ms = []
    t0 = time.time()
    try:
        for i, turn in enumerate(turns):
            who = int(turn.get("speaker", 0))
            raw_text = (turn.get("text") or "").strip()
            # The label ("Alex: ...") is for reading only; it is not spoken.
            tts_text = strip_speaker_label(raw_text)
            voice = voice1 if who == 0 else voice2
            instr = (v1 if who == 0 else v2) + extra_tone

            h = compute_text_hash(f"{who}:{voice}:{instr}:{tts_text}")
            partfile = part_path(slug, i)
            if cache_map.get(f"{i}") == h and partfile.exists():
                if progress_cb:
                    progress_cb(i + 1, total, f"Skipped (unchanged) part {i+1}/{total}")
                per_turn_ms.append(0.0)
                continue

            if progress_cb:
                progress_cb(i + 1, total, f"Synthesizing part {i+1}/{total} (voice: {voice})…")
            started = time.time()
            audio = tts_synthesize(tts_text, voice, instr)
            finished = time.time()
            partfile.write_bytes(audio)
            cache_map[f"{i}"] = h
            per_turn_ms.append((finished - started) * 1000.0)
    finally:
        # Persist hashes for the parts written so far, even on failure.
        cache_file.write_text(json.dumps(cache_map, indent=2))

    parts = [part_path(slug, i) for i in range(total)]
    out = full_mp3_path(slug)
    if progress_cb:
        progress_cb(total, total, "Concatenating MP3…")
    safe_concat_mp3(parts, out)
    total_ms = (time.time() - t0) * 1000.0
    return out, per_turn_ms, total_ms
| |
|
| | |
def on_title_change(slug: str, key: str):
    """Rename a podcast from the text stored under ``key`` in session state.

    A blank value is ignored; otherwise the podcast is renamed and a toast is
    shown.
    """
    new_title = st.session_state.get(key, "").strip()
    if not new_title:
        return
    do_rename(slug, new_title)
    st.toast('Title updated')
| |
|
def export_zip(slug: str) -> bytes:
    """Return a deflate-compressed zip of the podcast's folder as bytes.

    Archive member names are relative to the parent of the podcast folder, so
    each one starts with the folder's own name.
    """
    pdir = get_podcast_dir(slug)
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for folder, _subdirs, filenames in os.walk(pdir):
            for filename in filenames:
                member = Path(folder) / filename
                archive.write(member, arcname=str(member.relative_to(pdir.parent)))
    return buffer.getvalue()
| |
|
def host_picker(label_prefix: str, default_name="", key_prefix="h"):
    """Render name / voice / vibe inputs for one host in three columns.

    Args:
        label_prefix: Text put in front of each widget label.
        default_name: Name used when session state holds none yet.
        key_prefix: Prefix of the session-state keys (``<prefix>_name``,
            ``<prefix>_voice``, ``<prefix>_vibe``). ``"h1"`` defaults to
            Alloy / Calm; any other prefix to Echo / Dramatic.

    Returns:
        ``{"name": ..., "voice": ..., "vibe": ...}`` where ``voice`` is the
        bare voice name without the gender suffix.
    """
    name_col, voice_col, vibe_col = st.columns(3)
    is_first_host = key_prefix == "h1"

    # Name
    name_key = f"{key_prefix}_name"
    name = name_col.text_input(
        f"{label_prefix} Name",
        value=st.session_state.get(name_key, default_name),
        key=name_key,
    )

    # Voice: make sure session state holds a valid label before rendering.
    voice_options = [f"{v['voice']} ({v['gender']})" for v in VOICES]
    default_voice = "Alloy" if is_first_host else "Echo"
    default_voice_index = 0
    for position, option in enumerate(voice_options):
        if option.split(" ")[0].lower() == default_voice.lower():
            default_voice_index = position
            break
    voice_key = f"{key_prefix}_voice"
    desired_label = st.session_state.get(voice_key, voice_options[default_voice_index])
    ensure_select_value(voice_key, voice_options, desired_label, default_voice_index)
    voice_choice = voice_col.selectbox(f"{label_prefix} Voice", options=voice_options, key=voice_key)
    voice = voice_choice.split(" ")[0]

    # Vibe: reset an unknown stored value to the default.
    vibe_names = [v["vibe"] for v in VIBES]
    default_vibe = "Calm" if is_first_host else "Dramatic"
    if default_vibe in vibe_names:
        default_vibe_index = vibe_names.index(default_vibe)
    else:
        default_vibe_index = 0
    vibe_key = f"{key_prefix}_vibe"
    if st.session_state.get(vibe_key) not in vibe_names:
        st.session_state[vibe_key] = vibe_names[default_vibe_index]
    vibe = vibe_col.selectbox(f"{label_prefix} Vibe", options=vibe_names, key=vibe_key)

    return {"name": name, "voice": voice, "vibe": vibe}
| |
|
def open_details(slug: str):
    """Switch the app to ``details`` mode for the podcast ``slug``."""
    st.session_state["mode"] = "details"
    st.session_state["selected_slug"] = slug
| |
|
def open_edit(slug: str):
    """Switch the app to ``edit`` mode for the podcast ``slug``."""
    st.session_state["mode"] = "edit"
    st.session_state["selected_slug"] = slug
| |
|
def set_mode(mode: str):
    """Store ``mode`` as the current app mode in session state."""
    st.session_state.update({"mode": mode})
| |
|
def do_rename(slug: str, new_title: str):
    """Set the title of the first index entry with ``slug`` and save the index.

    A blank ``new_title`` keeps the current title. Nothing is written when no
    entry matches.
    """
    index = load_index()
    entry = next((p for p in index["podcasts"] if p["slug"] == slug), None)
    if entry is None:
        return
    entry["title"] = new_title.strip() or entry["title"]
    save_index(index)
| |
|
def do_delete(slug: str):
    """Remove a podcast from the index and delete its folder.

    The folder is deleted only when it resolves to a direct child of
    ``DATA_DIR``. An empty slug, ``"."``, ``".."`` or a slug containing path
    separators would otherwise make ``rmtree`` hit the data root or a location
    outside it.
    """
    index = load_index()
    index["podcasts"] = [p for p in index["podcasts"] if p.get("slug") != slug]
    save_index(index)

    pdir = get_podcast_dir(slug)
    try:
        is_direct_child = pdir.resolve().parent == DATA_DIR.resolve()
    except OSError:
        # Unresolvable path: err on the side of not deleting anything.
        is_direct_child = False
    if is_direct_child and pdir.exists():
        shutil.rmtree(pdir, ignore_errors=True)
| |
|
def clone_podcast(slug: str):
    """Pre-fill the create form from an existing podcast and open it.

    The title is made unique against the index. A trailing number in the
    episode label is incremented ("Episode 3" -> "Episode 4"); a label without
    one becomes "Episode 1". Topic, exchanges, tone and both hosts' name,
    voice and vibe are copied into session state, then the app switches to
    ``create`` mode. A toast is shown and nothing changes when the source
    definition is missing.
    """
    definition = load_definition(slug)
    if not definition:
        st.toast("Unable to clone: missing definition", icon="⚠️")
        return

    idx = load_index()
    new_title = uniquify_title(definition.get("title", "Untitled"), idx)

    ep = (definition.get("episode") or "").strip()
    # Single backslashes: in a raw string "\\d" would match a literal backslash.
    m = re.search(r"(\d+)\s*$", ep)
    if m:
        new_episode = ep[:m.start(1)] + str(int(m.group(1)) + 1)
    else:
        new_episode = "Episode 1"

    st.session_state["new_title"] = new_title
    st.session_state["new_episode"] = new_episode
    st.session_state["new_topic"] = definition.get("topic", "")
    st.session_state["new_exchanges"] = int(definition.get("exchanges", DEFAULT_EXCHANGES))
    st.session_state["new_tone"] = definition.get("tone", "Informative")

    h1 = definition.get("host1") or {}
    h2 = definition.get("host2") or {}
    st.session_state["h1_name"] = h1.get("name", "Host One")
    st.session_state["h2_name"] = h2.get("name", "Host Two")
    # Voice widgets store "<Voice> (<Gender>)" labels, not bare names.
    st.session_state["h1_voice"] = voice_label_for(h1.get("voice", "Alloy"))
    st.session_state["h2_voice"] = voice_label_for(h2.get("voice", "Echo"))
    st.session_state["h1_vibe"] = h1.get("vibe", "Calm")
    st.session_state["h2_vibe"] = h2.get("vibe", "Dramatic")

    set_mode("create")
| |
|
| | |
def list_podcasts_ui(index):
    """Render the podcast list: one row of controls per podcast.

    Each row offers an inline-editable title, an audio player, MP3 download,
    zip export, Edit and Clone. Delete and pagination controls are rendered
    only when ``HIDE_DELETE`` / ``HIDE_PAGINATION`` are false.

    Args:
        index: Index dict; rows come from ``index["podcasts"]``, newest
            ``created_at`` first. Title edits mutate the entries in place and
            save the index.

    NOTE(review): the indentation of this function was lost in the source
    this was recovered from; the nesting below is reconstructed from syntax
    and widget keys and should be confirmed against the original file.
    """
    st.header("Your Podcasts")

    st.button("Create New Podcast", key="create_new_top", on_click=lambda: set_mode("create"))

    # Query-parameter action handler. It only reacts to the action value
    # "x_delete_disabled", while the delete link rendered further down uses
    # "?action=delete", so that link does not reach this branch.
    qp = st.experimental_get_query_params()
    act = qp.get("action", [])
    slug_vals = qp.get("slug", [])
    # Values may arrive as lists; take the first element when they do.
    act_val = (act[0] if isinstance(act, list) and act else act) or ""
    slug_val = (slug_vals[0] if isinstance(slug_vals, list) and slug_vals else slug_vals) or ""
    if act_val == "x_delete_disabled" and slug_val:
        do_delete(slug_val)
        try:
            # Clear the query string so the action is not repeated on rerun.
            st.experimental_set_query_params()
        except Exception:
            pass
        st.success("Podcast deleted.")
        st.rerun()

    # Work on a copy so sorting does not reorder the stored index.
    pods = list(index.get("podcasts", []))
    pods.sort(key=lambda p: p.get("created_at",""), reverse=True)

    if "list_page" not in st.session_state:
        st.session_state["list_page"] = 0
    if "list_page_size" not in st.session_state:
        st.session_state["list_page_size"] = PAGE_SIZE_DEFAULT

    if not pods:
        st.info("No podcasts yet. Click **Create New Podcast** to get started."); return

    total = len(pods)
    page_size = st.session_state["list_page_size"]
    # Zero-based index of the last page.
    max_page = (total - 1) // page_size

    start = st.session_state["list_page"] * page_size
    end = min(start + page_size, total)
    view = pods[start:end]

    # Nothing in this function assigns a slug to these keys; they are only
    # initialized here and reset to None below.
    if "dl_zip_slug" not in st.session_state:
        st.session_state["dl_zip_slug"] = None
    if "confirm_delete_slug" not in st.session_state:
        st.session_state["confirm_delete_slug"] = None

    for p in view:
        slug = p["slug"]; mp3 = full_mp3_path(slug)
        cols = st.columns([3,6,1,1,1,1,1], gap="small")
        with cols[0]:
            # Inline title editor; a non-blank change is saved immediately.
            title_key = f"title_{slug}"
            if title_key not in st.session_state: st.session_state[title_key] = p["title"]
            new_title = st.text_input("", key=title_key, label_visibility="collapsed")
            if new_title != p["title"] and new_title.strip():
                p["title"] = new_title.strip(); save_index(index)
        with cols[1]:
            if mp3.exists():
                with open(mp3, "rb") as f: st.audio(f.read(), format="audio/mp3")
            else: st.caption("No audio yet")
        with cols[2]:
            if mp3.exists():
                with open(mp3, "rb") as _f:
                    st.download_button("MP3", data=_f.read(), file_name=f"{slug}.mp3", key=f"dlmp3_{slug}", help="Download MP3")
            else:
                st.caption("No audio yet")
        with cols[3]:
            # The zip is built for every visible row on every render.
            zbytes = export_zip(slug)
            st.download_button("Export", data=zbytes, file_name=f"{slug}.zip", key=f"save_zip_{slug}", help="Export Podcast (.zip)")
        with cols[4]:
            # s=slug binds the current row's slug at definition time.
            st.button("Edit", key=f"edit_{slug}", help="Edit Script", on_click=lambda s=slug: open_edit(s))
        with cols[5]:
            st.button("Clone", key=f"clone_{slug}", help="Clone", on_click=lambda s=slug: clone_podcast(s))
        with cols[6]:
            if not HIDE_DELETE:
                st.markdown(f'<a href="?action=delete&slug={slug}" onclick="return confirm(\'Delete this podcast? This cannot be undone.\')" class="delbtn">Delete</a>', unsafe_allow_html=True)

    # Confirmation prompt for a pending delete.
    if st.session_state.get("confirm_delete_slug"):
        slug_c = st.session_state.get("confirm_delete_slug")
        st.warning(f"Delete podcast '{slug_c}'? This cannot be undone.")
        c1, c2 = st.columns([1,1])
        with c1:
            if st.button("Confirm Delete", key="confirm_delete_now"):
                do_delete(slug_c)
                st.session_state["confirm_delete_slug"] = None
                st.success("Podcast deleted.")
                st.rerun()
        with c2:
            if st.button("Cancel", key="cancel_delete"):
                st.session_state["confirm_delete_slug"] = None
                st.rerun()

    if not HIDE_PAGINATION:
        colp1, colp2, colp3 = st.columns([2,6,2])
        with colp1:
            if st.button("◀ Prev", disabled=st.session_state["list_page"] <= 0):
                st.session_state["list_page"] = max(0, st.session_state["list_page"] - 1); st.rerun()
        with colp2:
            st.caption(f"Showing {page_size} per page — Page {st.session_state['list_page']+1} of {max_page+1} (Total {total})")
        with colp3:
            if st.button("Next ▶", disabled=st.session_state["list_page"] >= max_page):
                st.session_state["list_page"] = min(max_page, st.session_state["list_page"] + 1); st.rerun()

    # CSS for the compact row buttons and the red delete link.
    st.markdown('<style>.stButton>button {padding:4px 6px; margin:0 2px; border:1px solid rgba(255,255,255,0.25);background:transparent; border-radius:8px; font-size:16px;} a.delbtn{display:inline-block;padding:4px 6px;margin:0 2px;border:1px solid #8a0000;border-radius:8px;text-decoration:none;color:#fff;background:#c00000;} a.delbtn:hover{background:#c00000;} .delwrap button{padding:4px 6px;margin:0 2px;border:1px solid #8a0000;border-radius:8px;background:#c00000;color:#fff;} .delwrap button:hover{background:#c00000;}</style>', unsafe_allow_html=True)
| |
|
| | def create_form_ui(index): |
| | if st.session_state.get("generating") and st.session_state.get("pending_definition"): |
| | run_generation_flow(st.session_state["pending_definition"]); return |
| |
|
| | st.header("Create New Podcast") |
| | |
| | lucky = st.button("Feeling Lucky", key="lucky_btn") |
| | if lucky: |
| | sample_titles = ["Hot Takes Live","Acme Insider","Two Brains, One Mic","Curious Minds","Deep Dive Daily","Chatter Box", |
| | "Morning Buzz","Brainstorm Café","Weekend Roundtable","Idea Exchange","Think Tank Talks", |
| | "Pulse Check","Off the Record","The Daily Spark","Future Shock","Culture Chat","Signal & Noise", |
| | "The Conversation Lab","Trending Now","Mind Meld","Coffee & Code","Beyond the Hype","Lightning Round", |
| | "The Big Why","The Briefing Room","Micro Chats"] |
| | sample_topics = ["The future of AI in everyday life","How remote work changed cities","Five technologies shaping the next decade", |
| | "Lessons from famous product flops","What makes a great brand story","AI in healthcare","Space tourism prospects", |
| | "The psychology of habits","Virtual reality in classrooms","Cybersecurity for everyone","Quantum computing explained", |
| | "Designing ethical AI","Smart cities and sensors","Robotics at home"] |
| | sample_names = ["Alex","Sam","Jordan","Taylor","Casey","Riley","Morgan","Avery","Jamie","Quinn","Chris","Pat","Devon","Skyler","Cameron","Drew","Kendall","Peyton","Blake","Hayden","Logan","Shawn","Noel","Emerson","Harper","Reese","Rowan","Sage"] |
| | st.session_state["new_title"] = random.choice(sample_titles) |
| | st.session_state["new_topic"] = random.choice(sample_topics) |
| | n1 = random.choice(sample_names); n2 = random.choice([n for n in sample_names if n != n1]) |
| | st.session_state["h1_name"] = n1; st.session_state["h2_name"] = n2 |
| | v1 = random.choice(VOICES); v2 = random.choice([v for v in VOICES if v["voice"] != v1["voice"]]) |
| | st.session_state["h1_voice"] = f"{v1['voice']} ({v1['gender']})"; st.session_state["h2_voice"] = f"{v2['voice']} ({v2['gender']})" |
| | vibe_names = [v["vibe"] for v in VIBES]; vb1 = random.choice(vibe_names); vb2 = random.choice([v for v in vibe_names if v != vb1]) |
| | st.session_state["h1_vibe"] = vb1; st.session_state["h2_vibe"] = vb2 |
| | st.session_state["new_tone"] = random.choice(list(TONE_DESCRIPTIONS.keys())); st.session_state["new_episode"]="Episode 1" |
| | st.rerun() |
| |
|
| | with st.form("create_form"): |
| | col_title, col_episode = st.columns(2) |
| | with col_title: |
| | title = st.text_input("Title", key="new_title") |
| | with col_episode: |
| | episode = st.text_input("Episode", value=st.session_state.get("new_episode", "Episode 1"), key="new_episode") |
| |
|
| | col_topic, col_files = st.columns(2) |
| | with col_topic: |
| | topic = st.text_area("Topic", key="new_topic", height=98, placeholder="What should they talk about?") |
| | with col_files: |
| | files = st.file_uploader("Supporting file(s)", accept_multiple_files=True, type=["txt","docx"], key="new_support") |
| |
|
| | col_exch, col_tone = st.columns(2) |
| | with col_exch: |
| | exchanges = st.number_input("Exchanges", min_value=1, max_value=20, value=st.session_state.get("new_exchanges", DEFAULT_EXCHANGES), step=1, key="new_exchanges") |
| | with col_tone: |
| | tone = st.selectbox("Tone", options=list(TONE_DESCRIPTIONS.keys()), |
| | index=list(TONE_DESCRIPTIONS.keys()).index(st.session_state.get("new_tone","Informative")) if st.session_state.get("new_tone") in TONE_DESCRIPTIONS else 0, |
| | key="new_tone") |
| |
|
| | st.markdown("**Hosts**") |
| | host1 = host_picker("Host 1", default_name=st.session_state.get("h1_name","Host One"), key_prefix="h1") |
| | host2 = host_picker("Host 2", default_name=st.session_state.get("h2_name","Host Two"), key_prefix="h2") |
| |
|
| | submitted = st.form_submit_button("Generate Podcast", use_container_width=True) |
| |
|
| | if submitted: |
| | if not (title or "").strip(): st.error("Please provide a Title."); return |
| | episode_val = (st.session_state.get("new_episode") or "").strip() |
| | host1["name"] = st.session_state.get("h1_name", host1.get("name","Host One")) |
| | host2["name"] = st.session_state.get("h2_name", host2.get("name","Host Two")) |
| | host1["voice"] = (st.session_state.get("h1_voice") or "Alloy").split(" ")[0] |
| | host2["voice"] = (st.session_state.get("h2_voice") or "Echo").split(" ")[0] |
| | host1["vibe"] = st.session_state.get("h1_vibe", host1.get("vibe","Calm")) |
| | host2["vibe"] = st.session_state.get("h2_vibe", host2.get("vibe","Dramatic")) |
| |
|
| | idx = load_index(); title_u = uniquify_title(title, idx); slug = uniquify_slug(slugify(title_u), idx) |
| |
|
| | support_texts = [] |
| | for f in files or []: |
| | try: raw = f.getvalue() |
| | except Exception: raw = b"" |
| | if len(raw) > MAX_UPLOAD_BYTES: |
| | st.error(f'File "{getattr(f, "name", "(unknown)")}" exceeds 10 MB and was skipped.'); continue |
| | name_lower = (getattr(f, 'name', '') or '').lower() |
| | if f.type.endswith('text/plain') or name_lower.endswith('.txt'): |
| | try: support_texts.append(raw.decode('utf-8', errors='ignore')) |
| | except Exception: pass |
| | elif name_lower.endswith('.docx'): support_texts.append(read_docx_text(raw)) |
| |
|
| | definition = {"title": title_u.strip(),"episode": episode_val,"slug": slug,"topic": (topic or "").strip(), |
| | "exchanges": int(exchanges),"host1": host1,"host2": host2,"tone": tone, |
| | "supporting_texts": support_texts,"created_at": now_iso()} |
| | st.session_state["pending_definition"] = definition; st.session_state["generating"] = True; st.rerun() |
| |
|
| | |
def _clear_generation_flags():
    """Reset the session flags that mark a generation as pending/in progress."""
    st.session_state.pop("pending_definition", None)
    st.session_state["generating"] = False


def run_generation_flow(definition: dict):
    """Generate script and audio for one podcast while reporting progress.

    Steps, in order: persist the definition, (re)register the podcast in the
    index, generate and validate the script, synthesize all turns into a
    single MP3, then render an audio player for the result.

    Args:
        definition: Podcast definition. This function reads its "slug",
            "title" and "created_at" keys and passes the whole dict on to the
            script-generation and synthesis helpers.

    Raises:
        Exception: Anything raised by the helpers is re-raised unchanged,
            after the "pending_definition"/"generating" session flags have
            been cleared. Without that reset the same failing generation
            would be started again every time those flags are checked.
    """
    try:
        index = load_index()
        slug = definition["slug"]
        ensure_dirs(get_podcast_dir(slug))
        save_definition(slug, definition)

        # Replace any existing index entry for this slug instead of duplicating it.
        index["podcasts"] = [p for p in index["podcasts"] if p["slug"] != slug]
        index["podcasts"].append(
            {"title": definition["title"], "slug": slug, "created_at": definition["created_at"]}
        )
        save_index(index)

        st.subheader("Processing")
        prog = st.progress(0.0, text="Starting…")
        with st.status("Generating script via Chat API…", state="running", expanded=False) as box:

            def progress_cb(done, total, msg):
                # Clamp to [0, 1]; max(total, 1) guards against a zero total.
                frac = min(max(done / max(total, 1), 0.0), 1.0)
                prog.progress(frac, text=msg)
                box.write(msg)

            script = generate_script(definition)
            script = validate_and_fix_script(definition, script)
            save_script(slug, script)
            out_mp3, per_turn_ms, total_ms = synthesize_all_parts(
                slug, script, definition, progress_cb=progress_cb
            )
            box.write("Concatenation complete.")

            # Average only over turns with a positive duration; best-effort,
            # so malformed timing data degrades to 0 instead of failing the run.
            try:
                avg_ms = sum(per_turn_ms) / max(1, sum(1 for x in per_turn_ms if x > 0))
            except Exception:
                avg_ms = 0.0
            box.write(f"Timing: total {total_ms:.0f} ms • avg per synthesized turn {avg_ms:.0f} ms")
            box.update(label="Done!", state="complete", expanded=False)

        # Only the wide left column is used; the other two act as spacing.
        left, _, _ = st.columns([8, 2, 2])
        with left:
            with open(out_mp3, "rb") as f:
                audio_bytes = f.read()
            st.audio(audio_bytes, format="audio/mp3")
    except Exception:
        # Deliberately `except Exception` rather than `finally`: ordinary
        # failures must release the flags, while BaseException-based control
        # flow keeps the original behaviour (flags left untouched).
        # NOTE(review): presumably Streamlit's rerun/stop signals fall in the
        # latter group — confirm against the installed Streamlit version.
        _clear_generation_flags()
        raise
    _clear_generation_flags()
| |
|
| | |
def details_ui(slug: str):
    """Render the details page for one podcast.

    Shows the title and creation timestamp, an audio player plus MP3 download
    when the combined MP3 exists (a warning otherwise), the edit /
    re-generate / export actions, and read-only views of the stored
    definition and script.

    Args:
        slug: Identifier of the podcast to display.
    """
    definition = load_definition(slug)
    script = load_script(slug) or {"turns": []}
    # The read-only view always shows a "prompt" key, even if none was stored.
    script.setdefault("prompt", {})

    st.header(definition.get("title", slug))
    st.caption(definition.get("created_at", ""))

    mp3 = full_mp3_path(slug)
    if mp3.exists():
        # Read the file once and reuse the bytes for both player and download.
        with open(mp3, "rb") as f:
            mp3_bytes = f.read()
        st.audio(mp3_bytes, format="audio/mp3")
        st.download_button(
            "MP3",
            data=mp3_bytes,
            file_name=f"{slug}.mp3",
            key=f"dlmp3_details_{slug}",
            help="Download MP3",
        )
    else:
        st.warning("Audio not found. You can regenerate by editing the script and saving.")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.button("Edit Script", on_click=lambda: open_edit(slug), use_container_width=True)
    with col2:
        if st.button("Re-generate Script", use_container_width=True):
            # Queue the stored definition for generation and switch to the
            # create view, then rerun so the new mode takes effect.
            st.session_state["pending_definition"] = definition
            st.session_state["generating"] = True
            set_mode("create")
            st.rerun()
    with col3:
        # The archive is built on every render because the download button
        # needs its data up front.
        zbytes = export_zip(slug)
        st.download_button(
            "Export",
            data=zbytes,
            file_name=f"{slug}.zip",
            key=f"dlzip_details_{slug}",
            help="Export Podcast (.zip)",
        )

    with st.expander("Definition (read-only)"):
        st.json(definition, expanded=False)
    with st.expander("Script (read-only)"):
        st.json(script, expanded=False)
| |
|
def edit_ui(slug: str):
    """Render the script editor for one podcast and regenerate its audio.

    Flow across reruns: the form collects per-turn text edits; on submit the
    script is validated and saved, "edit_processing" is set and the app
    reruns; the next run re-synthesizes the audio and plays the result.

    Args:
        slug: Identifier of the podcast whose script is edited.

    Raises:
        Exception: Errors from audio synthesis are re-raised after
            "edit_processing" has been reset, so a failing synthesis is not
            started again on every later render of this page.
    """
    definition = load_definition(slug)
    script = load_script(slug)
    if not script:
        st.error("No script to edit.")
        return

    st.header(f"Details: {definition.get('title', slug)}")
    with st.expander("Definition (read-only)"):
        st.json(definition, expanded=False)
    with st.expander("Script (read-only)"):
        # Display a shallow copy so the placeholder "prompt" key is not added
        # to the script that is later sent to synthesis.
        script_ro = dict(script)
        script_ro.setdefault("prompt", {})
        st.json(script_ro, expanded=False)

    if "edit_expanded" not in st.session_state:
        st.session_state["edit_expanded"] = False
    if "edit_processing" not in st.session_state:
        st.session_state["edit_processing"] = False

    turns = script.get("turns", [])
    h1n = (definition.get("host1") or {}).get("name", "Speaker 0")
    h2n = (definition.get("host2") or {}).get("name", "Speaker 1")

    with st.expander("Edit Script", expanded=st.session_state["edit_expanded"]):
        with st.form("edit_script_form"):
            for i, turn in enumerate(turns):
                # Speaker 0 is host 1; any other value is shown as host 2.
                who = int(turn.get("speaker", 0))
                host_name = h1n if who == 0 else h2n
                st.markdown(f"**Turn {i+1} — {host_name}**")
                orig = turn.get("text", "")
                # A real label is supplied (and hidden) instead of an empty
                # one, as Streamlit recommends for accessibility.
                new_txt = st.text_area(
                    f"Turn {i+1} text",
                    value=orig,
                    key=f"edit_line_{i}",
                    height=100,
                    label_visibility="collapsed",
                )
                if new_txt != orig:
                    turn["text"] = new_txt
            saved = st.form_submit_button(
                "Save Changes & Regenerate Changed Audio",
                type="primary",
                use_container_width=True,
            )

    if saved:
        # NOTE(review): only "turns" is handed to validation; other script
        # keys (e.g. "prompt") are not forwarded — confirm that
        # validate_and_fix_script restores whatever must be kept.
        script = validate_and_fix_script(definition, {"turns": turns})
        save_script(slug, script)
        st.session_state["edit_expanded"] = False
        st.session_state["edit_processing"] = True
        st.rerun()

    if st.session_state.get("edit_processing"):
        st.subheader("Processing")
        prog = st.progress(0.0, text="Starting…")
        try:
            with st.status("Regenerating updated audio…", state="running", expanded=False) as box:

                def progress_cb(done, total, msg):
                    # Clamp to [0, 1]; max(total, 1) guards against a zero total.
                    prog.progress(min(max(done / max(total, 1), 0.0), 1.0), text=msg)
                    box.write(msg)

                out_mp3, _per_turn_ms, _total_ms = synthesize_all_parts(
                    slug, script, definition, progress_cb=progress_cb
                )
                box.write("Concatenation complete.")
                box.update(label="Done!", state="complete", expanded=False)

            left, _ = st.columns([8, 1])
            with left:
                with open(out_mp3, "rb") as f:
                    audio_bytes = f.read()
                st.audio(audio_bytes, format="audio/mp3")
        except Exception:
            # Release the flag on failure; otherwise every later render of
            # this page would start the same synthesis again.
            st.session_state["edit_processing"] = False
            raise
        st.session_state["edit_processing"] = False
        st.info("You can reopen **Edit Script** to make further changes.")
| |
|
| | |
def main():
    """Set up the page and dispatch to the view named by the session "mode".

    Modes handled: "list", "create", "details" and "edit". The last two need
    a "selected_slug" in the session; without one an info message is shown
    and the mode is reset to "list". Any other mode is reset to "list".
    """
    st.set_page_config(page_title="Podcast Creator", layout="wide")
    st.markdown('<style>audio{height:40px !important}</style>', unsafe_allow_html=True)
    st.title(APP_TITLE)

    # Every view other than the list offers a way back to it.
    on_list_view = st.session_state.get("mode", "list") == "list"
    if not on_list_view:
        st.button("Back to List", on_click=set_mode, args=("list",))

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if "mode" not in st.session_state:
        st.session_state["mode"] = "list"
    index = load_index()
    mode = st.session_state.get("mode", "list")

    if mode == "list":
        list_podcasts_ui(index)
    elif mode == "create":
        create_form_ui(index)
    elif mode in ("details", "edit"):
        # Both views operate on the currently selected podcast.
        render_view = details_ui if mode == "details" else edit_ui
        selected = st.session_state.get("selected_slug")
        if selected:
            render_view(selected)
        else:
            st.info("No podcast selected.")
            set_mode("list")
    else:
        set_mode("list")
| |
|
# Entry point: start the app only when this file is executed directly.
if __name__ == "__main__":
    main()