Podcasts / app.py
smhoerner's picture
Update app.py
2d1b0ed verified
# app_0.0.68.py
# Version: 0.0.68
# Focus: UI polish, performance (no precompute ZIP, simple pagination), stray "None" label fix.
# - Move "Create New Podcast" button beneath "Your Podcasts" header (no icon).
# - Move "Feeling Lucky" button beneath "Create New Podcast" header (no icon).
# - Remove icons from all "Back to List" buttons.
# - Fix tuple-return causing stray "None" label in Edit/Details routing.
# - Performance: avoid exporting ZIP/Base64 for every item during list render; do it only on click.
# - Performance: add lightweight pagination to "Your Podcasts" to reduce per-render costs.
# - Keep links without target="_blank".
import os
import io
import json
import time
import hashlib
import shutil
import zipfile
import random
import base64
import datetime as dt
from pathlib import Path
import xml.etree.ElementTree as ET
import urllib.request
import urllib.error
import ssl
import re
import streamlit as st
try:
from openai import OpenAI
except Exception:
OpenAI = None
# Application-wide settings.
APP_TITLE = "🎙️ Whizbang Podcast Creator Thingee"
# Root folder holding index.json plus one sub-folder per podcast slug.
DATA_DIR = Path("podcast_data")
# Number of exchanges used when a definition does not specify one
# (each exchange is two turns, one per host).
DEFAULT_EXCHANGES = 5
MAX_UPLOAD_BYTES = 10 * 1024 * 1024 # 10 MB per file
# Initial page size for the "Your Podcasts" list.
PAGE_SIZE_DEFAULT = 25
HIDE_DELETE = True # temporary: hide Delete buttons UI
HIDE_PAGINATION = True # temporary: hide pager UI
# Selectable TTS voices. The UI shows each entry as "<voice> (<gender>)" and
# later recovers the voice name by taking the first space-separated token.
VOICES = [
    {"voice": "Alloy", "gender": "Female"},
    {"voice": "Ash", "gender": "Male"},
    {"voice": "Ballad", "gender": "Male"},
    {"voice": "Coral", "gender": "Female"},
    {"voice": "Echo", "gender": "Male"},
    {"voice": "Fable", "gender": "Female"},
    {"voice": "Onyx", "gender": "Male"},
    {"voice": "Nova", "gender": "Female"},
    {"voice": "Sage", "gender": "Female"},
    {"voice": "Shimmer", "gender": "Female"},
    {"voice": "Verse", "gender": "Male"},
]
# Delivery presets. "vibe" is the name shown in the host picker; "instructions"
# is the text looked up by vibe_instructions() and passed to the script prompt
# and to the TTS request.
VIBES = [
    {"vibe": "Calm","instructions": "Voice Affect: Calm, composed, reassuring. Tone sincere and empathetic. Moderate pacing."},
    {"vibe": "Mad Scientist","instructions": "Exaggerated, theatrical delivery with gleeful cackles; high energy."},
    {"vibe": "Chill Surfer","instructions": "Laid-back, mellow; stretched vowels; slow tempo."},
    {"vibe": "Dramatic","instructions": "Low, hushed, suspenseful; slow pacing; restrained intensity."},
    {"vibe": "Professional","instructions": "Clear, authoritative, composed; neutral and informative."},
    {"vibe": "Sincere","instructions": "Calm and empathetic; slower for apology, faster for solutions."},
    {"vibe": "Fitness Instructor","instructions": "High-energy, upbeat; punchy sentences; motivational."},
    {"vibe": "NYC Cabbie","instructions": "Gruff, quick, clipped rhythm; no-nonsense with dry humor."},
    {"vibe": "Connoisseur","instructions": "Slight French accent; refined, warm; deliberate pauses."},
    {"vibe": "Patient Teacher","instructions": "Warm, instructive; slow, articulate; supportive."},
    {"vibe": "Smooth Jazz DJ","instructions": "Deep, velvety; melodic phrasing; relaxed vibe."},
    {"vibe": "Medieval Knight","instructions": "Deep, formal; noble cadence with archaic flair."},
    {"vibe": "Friendly","instructions": "Cheerful, clear, reassuring; brief pauses after key steps."},
    {"vibe": "Gourmet Chef","instructions": "Exuberant Italian chef; passionate and persuasive."},
    {"vibe": "Old-Timey","instructions": "Refined, theatrical; crisp enunciation; steady cadence."},
    {"vibe": "Noir Detective","instructions": "Cool, detached; slow and deliberate with dramatic pauses."},
    {"vibe": "Emo Teenager","instructions": "Sarcastic, disinterested; monotone with sighs."},
    {"vibe": "Sports Coach","instructions": "Energetic, animated; rapid for action; positive energy."},
    {"vibe": "Sympathetic","instructions": "Warm, empathetic, professional; calm delivery."},
    {"vibe": "Robot","instructions": "Monotone, mechanical; efficient and precise."},
    {"vibe": "Eternal Optimist","instructions": "Warm, upbeat; solution-oriented; emphasize key words."},
    {"vibe": "Auctioneer","instructions": "Fast-paced, rhythmic; exciting and persuasive."},
    {"vibe": "Dramatic (Alt)","instructions": "Low, hushed; serious and mysterious; slow pacing."},
    {"vibe": "True Crime Buff","instructions": "Deep, hushed; short, rhythmic phrasing; ominous tone."},
]
# Script tones offered in the "Tone" selectbox (keys) with the description
# text that is appended to the chat prompts and TTS instructions (values).
TONE_DESCRIPTIONS = {
    "Informative": "Clear, factual, and helpful; prioritizes clarity over flair.",
    "Authoritative": "Confident, expert, and decisive; strong statements and tight structure.",
    "Inspirational": "Uplifting, aspirational, and future-focused; evokes possibility.",
    "Empathetic": "Warm, validating, and supportive; acknowledges feelings.",
    "Persuasive": "Benefit-led, objection-aware, and action-oriented.",
    "Educational": "Stepwise, scaffolded explanations with simple examples.",
    "Conversational": "Casual and back-and-forth; shorter sentences and light callbacks.",
    "Humorous": "Light wit, playful framing, and occasional punchlines.",
    "Storytelling": "Narrative-driven; uses scenes, stakes, and callbacks.",
    "Casual/Relaxed": "Easy-going, low-pressure pacing with informal phrasing.",
    "Charming": "Personable and a bit witty; smiles you can hear.",
    "Passionate": "High-energy enthusiasm and vivid phrasing.",
    "Dramatic": "Tension-building, cinematic rhythm; pauses for effect.",
    "Enthusiastic": "Upbeat momentum, positive framing, high engagement.",
    "Optimistic": "Hopeful outlook; solution-oriented framing.",
    "Mysterious": "Intriguing hints, restrained reveals, and suspense."
}
# ---------------- Helpers ----------------
def slugify(text: str) -> str:
    """Turn a free-form title into a lowercase, path-friendly slug.

    Only ASCII letters, digits, spaces and ``-_.()`` are kept; surrounding
    whitespace is trimmed and remaining spaces become hyphens.

    If nothing usable is left, a time-based ``podcast-<epoch>`` slug is
    returned. Slugs made only of dots (``.``, ``..``) are treated as unusable
    too: joined onto the data directory they would name the directory itself
    or its parent rather than a podcast folder.
    """
    keep = "-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    cleaned = "".join(c for c in text if c in keep).strip().replace(" ", "-").lower()
    if not cleaned.strip("."):
        return f"podcast-{int(time.time())}"
    return cleaned
def now_iso() -> str:
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    current = dt.datetime.now()
    return current.strftime(stamp_format)
def ensure_dirs(base: Path):
    """Create *base* and its ``parts`` sub-folder, tolerating existing ones."""
    for folder in (base, base / "parts"):
        folder.mkdir(parents=True, exist_ok=True)
def id3v2_strip(b: bytes) -> bytes:
    """Return *b* without a leading ID3v2 tag, if one is present.

    The tag length is read from the 4-byte synchsafe size field (7 significant
    bits per byte) that follows the ``ID3`` magic, version and flags bytes.
    That size excludes the 10-byte header, and also the optional 10-byte
    footer announced by flag bit 0x10, so both are added when skipping.
    Data that does not start with an ID3v2 header is returned unchanged.
    """
    if len(b) >= 10 and b[0:3] == b"ID3":
        size = ((b[6] & 0x7f) << 21) | ((b[7] & 0x7f) << 14) | ((b[8] & 0x7f) << 7) | (b[9] & 0x7f)
        header_len = 10 + size
        if b[5] & 0x10:
            # "Footer present" flag: a copy of the header trails the tag body.
            header_len += 10
        return b[header_len:]
    return b
def id3v1_strip(b: bytes) -> bytes:
    """Return *b* without a trailing 128-byte ``TAG`` block, if present."""
    tag_len = 128
    has_trailer = len(b) >= tag_len and b[-tag_len:-tag_len + 3] == b"TAG"
    return b[:-tag_len] if has_trailer else b
def safe_concat_mp3(mp3_paths, out_path: Path):
    """Write the given MP3 files back-to-back into *out_path*.

    The first file is copied verbatim. Every later file has its leading ID3v2
    tag and trailing ID3v1 tag removed before being appended.
    """
    with open(out_path, "wb") as sink:
        for position, source in enumerate(mp3_paths):
            payload = Path(source).read_bytes()
            if position > 0:
                payload = id3v1_strip(id3v2_strip(payload))
            sink.write(payload)
def read_docx_text(docx_bytes: bytes) -> str:
    """Extract the plain text of a .docx file given as raw bytes.

    Reads ``word/document.xml`` from the archive and joins the text of every
    ``w:t`` element with a real newline (the previous separator was the
    two-character sequence backslash + "n"). Best-effort: any failure, such as
    a non-zip payload, a missing document part or malformed XML, yields "".
    """
    try:
        with zipfile.ZipFile(io.BytesIO(docx_bytes)) as z:
            with z.open("word/document.xml") as f:
                xml = f.read()
        root = ET.fromstring(xml)
        ns = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
        texts = [t.text for t in root.findall(".//w:t", ns) if t.text]
        return "\n".join(texts).strip()
    except Exception:
        # Deliberately lenient: an unreadable upload just contributes no text.
        return ""
def load_index():
    """Load the podcast index from ``DATA_DIR/index.json``.

    Creates the data directory if needed. Always returns a dict whose
    "podcasts" entry is a list: an unreadable or unparseable file, a top-level
    value that is not an object, or a missing/non-list "podcasts" entry all
    fall back to an empty list, so callers can index ``["podcasts"]`` safely.
    Other keys of a valid index object are preserved.
    """
    DATA_DIR.mkdir(exist_ok=True, parents=True)
    idx = DATA_DIR / "index.json"
    if idx.exists():
        try:
            loaded = json.loads(idx.read_text())
        except (OSError, ValueError):
            # ValueError covers both JSON and text-decoding errors.
            loaded = None
        if isinstance(loaded, dict):
            if not isinstance(loaded.get("podcasts"), list):
                loaded["podcasts"] = []
            return loaded
    return {"podcasts": []}
def save_index(index):
    """Serialize *index* as indented JSON into ``DATA_DIR/index.json``."""
    index_file = DATA_DIR / "index.json"
    serialized = json.dumps(index, indent=2)
    index_file.write_text(serialized)
def uniquify_title(title: str, index: dict) -> str:
    """Return a title that does not clash (case-insensitively) with the index.

    A trailing numeric suffix such as " (2)" is removed first, so that
    "Show (2)" is de-duplicated as "Show" and becomes "Show (3)" rather than
    "Show (2) (2)". If the base title is free it is returned as-is; otherwise
    the first free "<base> (n)" with n >= 2 is returned.

    The suffix pattern previously had doubled backslashes inside a raw string
    and therefore never matched.
    """
    stripped = title.strip()
    m = re.search(r"\s*\((\d+)\)\s*$", stripped)
    base = stripped[:m.start()].rstrip() if m else stripped
    if not base:
        # The whole title was just "(n)"; keep it rather than returning "".
        base = stripped
    existing = {(p.get("title", "")).strip().lower() for p in index.get("podcasts", [])}
    if base.lower() not in existing:
        return base
    n = 2
    while f"{base} ({n})".lower() in existing:
        n += 1
    return f"{base} ({n})"
def uniquify_slug(slug: str, index: dict) -> str:
    """Return *slug* (trimmed, lowercased) or the first free ``<slug>-<n>``.

    Slugs already present in ``index["podcasts"]`` are compared after the same
    trim/lowercase normalization; numbering starts at 2.
    """
    wanted = slug.strip().lower()
    taken = set()
    for entry in index.get("podcasts", []):
        taken.add((entry.get("slug", "")).strip().lower())
    if wanted not in taken:
        return wanted
    counter = 2
    candidate = f"{wanted}-{counter}"
    while candidate in taken:
        counter += 1
        candidate = f"{wanted}-{counter}"
    return candidate
def get_podcast_dir(slug: str) -> Path:
    """Return the folder under ``DATA_DIR`` that belongs to podcast *slug*."""
    return DATA_DIR.joinpath(slug)
def load_definition(slug: str):
    """Return the parsed ``definition.json`` of *slug*, or None if absent."""
    definition_file = get_podcast_dir(slug) / "definition.json"
    if not definition_file.exists():
        return None
    return json.loads(definition_file.read_text())
def save_definition(slug: str, definition: dict):
    """Write *definition* as indented JSON to the podcast's ``definition.json``.

    The podcast folder (and its ``parts`` sub-folder) is created first.
    """
    target_dir = get_podcast_dir(slug)
    ensure_dirs(target_dir)
    serialized = json.dumps(definition, indent=2)
    (target_dir / "definition.json").write_text(serialized)
def load_script(slug: str):
    """Return the parsed ``script.json`` of *slug*, or None if absent."""
    script_file = get_podcast_dir(slug) / "script.json"
    if not script_file.exists():
        return None
    return json.loads(script_file.read_text())
def save_script(slug: str, script: dict):
    """Write *script* as indented JSON to the podcast's ``script.json``.

    The podcast folder (and its ``parts`` sub-folder) is created first.
    """
    target_dir = get_podcast_dir(slug)
    ensure_dirs(target_dir)
    serialized = json.dumps(script, indent=2)
    (target_dir / "script.json").write_text(serialized)
def part_path(slug: str, i: int) -> Path:
    """Return the path of the *i*-th audio part (zero-padded to 3 digits)."""
    filename = f"{i:03d}.mp3"
    return get_podcast_dir(slug) / "parts" / filename
def full_mp3_path(slug: str) -> Path:
    """Return the path of the combined MP3, ``<podcast dir>/<slug>.mp3``."""
    filename = f"{slug}.mp3"
    return get_podcast_dir(slug) / filename
def compute_text_hash(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of *text* (UTF-8)."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()[:16]
# Label stripping for TTS
# A label is up to 40 characters of letters, digits, space, ".", "_" or "-",
# optionally wrapped in [brackets], followed by a separator. A colon always
# counts as a separator; a hyphen or em-dash only counts when it has whitespace
# on both sides, so hyphenated words at the start of a line ("Well-known ...")
# are no longer mistaken for "label - text".
_LABEL_RE = re.compile(
    r'^\s*\[?\s*([A-Za-z0-9 ._\-]{1,40})\s*\]?(?:\s*:\s*|\s+[\-—]\s+)(.*)$'
)


def strip_speaker_label(text: str) -> str:
    """Remove a leading speaker label ("Alex: ", "[Host 0] - ") from *text*.

    Returns the remaining text, trimmed. Text without a recognizable label is
    returned trimmed but otherwise unchanged; ``None`` is treated as "".
    """
    m = _LABEL_RE.match(text or "")
    return (m.group(2).strip() if m else (text or "").strip())
# -------- Voice helpers (fix selectbox issues) --------
def voice_label_for(voice: str) -> str:
    """Return the "<voice> (<gender>)" select-box label for a voice name.

    Matching against ``VOICES`` ignores case and surrounding whitespace. An
    unknown name is labelled "(Female)"; an empty name falls back to "Alloy".
    """
    wanted = (voice or "").strip()
    wanted_key = wanted.lower()
    known = next((item for item in VOICES if item["voice"].lower() == wanted_key), None)
    if known is not None:
        return f"{known['voice']} ({known['gender']})"
    return f"{wanted or 'Alloy'} (Female)"
def ensure_select_value(key: str, options: list, preferred_label: str, default_index: int):
    """Store a valid option for a select box under ``st.session_state[key]``.

    Uses *preferred_label* when it is one of *options*. Otherwise the first
    option whose leading word matches the label's leading word (ignoring case)
    is chosen, falling back to ``options[default_index]``.
    """
    if preferred_label in options:
        st.session_state[key] = preferred_label
        return
    try:
        first_word = (preferred_label or "").split(" ")[0]
    except Exception:
        first_word = ""
    position = default_index
    for i, option in enumerate(options):
        if option.split(" ")[0].lower() == first_word.lower():
            position = i
            break
    st.session_state[key] = options[position]
# ---------------- OpenAI HTTP helpers ----------------
# Base URL for the HTTP fallback requests; can be overridden through the
# OPENAI_API_BASE environment variable.
_OPENAI_API_BASE = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1")
# Default SSL context shared by all urllib requests made in http_post_json().
_SSL_CTX = ssl.create_default_context()
def _bearer_headers(extra=None):
key = os.environ.get("OPENAI_API_KEY", "").strip()
if not key:
raise RuntimeError("OPENAI_API_KEY environment variable is not set.")
base = {"Authorization": f"Bearer {key}"}
if extra:
base.update(extra)
return base
def http_post_json(path: str, payload: dict, expect_json=True, as_bytes=False):
    """POST *payload* as JSON to ``<_OPENAI_API_BASE>/<path>``.

    Returns the raw response bytes when *as_bytes* is true or *expect_json* is
    false; otherwise the response body decoded as UTF-8 and parsed as JSON.
    The request uses bearer authentication and a 120 second timeout; errors
    raised by urllib propagate to the caller.
    """
    endpoint = f"{_OPENAI_API_BASE.rstrip('/')}/{path.lstrip('/')}"
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(endpoint, data=body, method="POST")
    headers = _bearer_headers({"Content-Type": "application/json"})
    for header_name, header_value in headers.items():
        request.add_header(header_name, header_value)
    with urllib.request.urlopen(request, context=_SSL_CTX, timeout=120) as response:
        raw = response.read()
    wants_parsed = expect_json and not as_bytes
    if wants_parsed:
        return json.loads(raw.decode("utf-8"))
    return raw
def select_voice_name(v: str) -> str:
    """Return the lowercase, trimmed voice name, defaulting to "alloy"."""
    chosen = v if v else "alloy"
    return chosen.strip().lower()
def vibe_instructions(vibe_name: str) -> str:
    """Return the trimmed instruction text of the named vibe, or "" if unknown."""
    matching = (entry["instructions"].strip() for entry in VIBES if entry["vibe"] == vibe_name)
    return next(matching, "")
# ---------------- Script generation (2-pass) ----------------
# No-op: rebinding the name to itself changes nothing; the mapping itself is
# defined earlier in this module.
TONE_DESCRIPTIONS = TONE_DESCRIPTIONS # already defined above
def _build_prompts(defn: dict):
    """Build the (system_prompt, user_prompt) pair for the first script pass.

    *defn* is the podcast definition; the keys read here are title, episode,
    topic, tone, exchanges, host1, host2 and supporting_texts.

    Fixes relative to the previous version:
    - line breaks and quotes in the system prompt are real characters (they
      were double-escaped, so the model saw literal backslash sequences);
    - supporting texts are separated by blank lines instead of a literal
      backslash-n sequence;
    - the tone name and its description are separated by " — " instead of
      being run together.
    """
    title = defn.get("title", "")
    episode = defn.get("episode", "")
    topic = defn.get("topic", "")
    tone = defn.get("tone", "")
    h1 = defn.get("host1", {})
    h2 = defn.get("host2", {})
    vibe1 = h1.get("vibe", "")
    vibe2 = h2.get("vibe", "")
    vibe1_instr = vibe_instructions(vibe1)
    vibe2_instr = vibe_instructions(vibe2)
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    tone_line = f"{tone} — {tone_desc}" if tone_desc else tone
    system_prompt = (
        "You are a seasoned podcast scriptwriter. Return ONLY JSON.\n"
        'JSON shape MUST be: {"turns":[{"speaker":0|1, "text":"..."}, ...]}.\n'
        "Requirements:\n"
        "- EXACTLY N exchanges (2*N turns), strictly alternating speakers 0 then 1, then 0 then 1, ...\n"
        "- Keep each turn ~1–3 sentences; compress ideas if needed, DO NOT reduce turn count.\n"
        '- Dialog may optionally begin with a speaker label (e.g., "Alex:" or "Host 0:"), which will be preserved.\n'
        "- The 'speaker' numeric field indicates who is speaking."
    )
    support = "\n\n".join(defn.get("supporting_texts") or [])
    exchange_count = int(defn.get("exchanges", DEFAULT_EXCHANGES))
    # Vibe instructions are capped at 800 characters each to bound prompt size.
    user_prompt = f"""Podcast: {title}
Episode: {episode}
Topic (paraphrase allowed for natural delivery): {topic}
Script Tone: {tone_line}
Host 0
- Name: {h1.get('name','Host 0')}
- Voice (TTS target): {h1.get('voice','Alloy')}
- Vibe: {vibe1}
- Vibe instructions: {vibe1_instr[:800]}
Host 1
- Name: {h2.get('name','Host 1')}
- Voice (TTS target): {h2.get('voice','Echo')}
- Vibe: {vibe2}
- Vibe instructions: {vibe2_instr[:800]}
Intro/Outro directives:
- FIRST LINE (speaker 0) should INTRODUCE the podcast title ("{title}"), the episode ("{episode}"), and the subject ("{topic}") — feel free to paraphrase the subject for naturalness — and greet Host 1 by name.
- Host 1 (their first line) acknowledges Host 0 by name.
- FINAL LINES provide a succinct OUTRO that sums things up.
N = {exchange_count}
If helpful, incorporate relevant information from the supporting materials (optional). Keep tone aligned with the Script Tone.
Supporting materials (optional):
{support}
OUTPUT SCHEMA (return ONLY JSON):
{{"turns":[{{"speaker":0,"text":"(You may start with an optional speaker label like 'Alex: ' before the text)"}}, ...]}}"""
    return system_prompt, user_prompt
def _chat_json(system_prompt: str, user_prompt: str):
    """Ask the chat model for a JSON object and return it parsed.

    The OpenAI SDK is tried first when it could be imported; any failure on
    that path is ignored and the same request is sent over plain HTTP instead.
    Errors from the HTTP path propagate to the caller.
    """
    request = {
        "model": "gpt-4o-mini",
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.6,
    }
    if OpenAI is not None:
        try:
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "").strip())
            completion = client.chat.completions.create(**request)
            return json.loads(completion.choices[0].message.content)
        except Exception:
            # Fall through to the HTTP request below.
            pass
    reply = http_post_json("/chat/completions", request, expect_json=True)
    return json.loads(reply["choices"][0]["message"]["content"])
def _continue_json(defn: dict, existing_turns: list, need_total: int):
    """Second pass: ask the model for the turns still missing from a script.

    The prompt carries the last six existing turns as context and states which
    speaker goes next. Returns the list of additional turns, or [] when the
    request fails or the reply does not contain a list under "turns".

    The tone name and its description are now separated by " — " (they were
    previously concatenated without any separator).
    """
    title = defn.get("title", "")
    episode = defn.get("episode", "")
    topic = defn.get("topic", "")
    tone = defn.get("tone", "")
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    tone_line = f"{tone} — {tone_desc}" if tone_desc else tone
    n_have = len(existing_turns)
    next_speaker = 0 if n_have % 2 == 0 else 1
    cont_system = ("Return ONLY JSON {\"turns\":[...]}. Continue until total turns == NeedTotal. "
                   "Strict alternation; ~1–3 sentences each; keep Script Tone consistent. Labels allowed.")
    context_tail = json.dumps(existing_turns[-6:], ensure_ascii=False)
    cont_user = f"""Podcast: {title} — Episode: {episode} — Topic: {topic}
Script Tone: {tone_line}
We currently have {n_have} turns. NeedTotal = {need_total}. Next speaker should be {next_speaker}.
Context (last turns):
{context_tail}
Return ONLY the additional turns needed to reach NeedTotal.
"""
    try:
        data = _chat_json(cont_system, cont_user)
        extra = data.get("turns", [])
    except Exception:
        # Best-effort: the caller pads the script itself when nothing comes back.
        return []
    # Anything other than a list would corrupt the caller's turn list.
    return extra if isinstance(extra, list) else []
def generate_script(definition: dict) -> dict:
    """Generate a script with exactly ``2 * exchanges`` turns.

    Steps: one chat request for the full script, one continuation request if
    it came back short, then locally generated filler turns for anything still
    missing. The result is the model's JSON object with "turns" truncated to
    the required count and the prompts stored under "prompt".
    """
    need_total = int(definition.get("exchanges", DEFAULT_EXCHANGES)) * 2
    system_prompt, user_prompt = _build_prompts(definition)
    try:
        data = _chat_json(system_prompt, user_prompt)
    except Exception:
        data = {"turns": []}
    turns = data.get("turns") or []

    # Second pass: request only the missing turns.
    if len(turns) < need_total:
        extra = _continue_json(definition, turns, need_total)
        if extra:
            turns.extend(extra)

    # Last resort: pad with placeholder lines, alternating hosts.
    for position in range(len(turns), need_total):
        who = position % 2
        host = definition.get("host1") if who == 0 else definition.get("host2")
        name = (host or {}).get("name", f"Host {who}")
        topic = definition.get("topic", "")
        tone = definition.get("tone", "")
        filler = f"{name}: Quick takeaway on {topic.lower()}—in a {tone.lower()} style."
        turns.append({"speaker": who, "text": filler})

    data["turns"] = turns[:need_total]
    data["prompt"] = {"system": system_prompt, "user": user_prompt}
    return data
def validate_and_fix_script(definition: dict, script: dict) -> dict:
    """Normalize ``script["turns"]`` to exactly ``2 * exchanges`` turns.

    Speakers are reassigned by position (0, 1, 0, 1, ...) regardless of what
    the script claims; texts are trimmed and blank texts become "…". Missing
    turns are filled with a placeholder line for the expected host, surplus
    turns are dropped. *script* is modified in place and returned.

    Changes from the previous version: the incoming "speaker" value is no
    longer parsed (the result was discarded, but ``int()`` could raise on
    values like "A" or None); a None text becomes "…" instead of the literal
    word "None"; non-dict turns are tolerated; and an "exchanges" value of
    None falls back to the default.
    """
    exchanges = definition.get("exchanges")
    if exchanges is None:
        exchanges = DEFAULT_EXCHANGES
    total_needed = int(exchanges) * 2

    texts = []
    for turn in script.get("turns") or []:
        raw = turn.get("text") if isinstance(turn, dict) else turn
        texts.append(str("" if raw is None else raw).strip() or "…")

    topic = str(definition.get("topic") or "")
    tone = str(definition.get("tone") or "")
    out = []
    for i in range(total_needed):
        expected = i % 2
        if i < len(texts):
            text = texts[i]
        else:
            host = definition.get("host1") if expected == 0 else definition.get("host2")
            name = (host or {}).get("name", f"Host {expected}")
            text = f"{name}: Another point on {topic.lower()}—kept {tone.lower()}."
        out.append({"speaker": expected, "text": text})
    script["turns"] = out
    return script
# ---------------- TTS ----------------
def tts_synthesize(text: str, voice: str, instructions: str) -> bytes:
    """Synthesize *text* to MP3 bytes with the given voice and instructions.

    The OpenAI SDK is tried first when it could be imported; any failure there
    is ignored and the same request is sent over plain HTTP, whose errors
    propagate to the caller.

    The output format is requested through ``response_format``. The previous
    ``format`` keyword is not a parameter of the speech endpoint or of the SDK
    method, so the SDK attempt could never succeed. ``instructions`` is only
    included when non-empty, on both paths.
    """
    voice_name = select_voice_name(voice)
    request = {
        "model": "gpt-4o-mini-tts",
        "voice": voice_name,
        "input": text,
        "response_format": "mp3",
    }
    if instructions:
        request["instructions"] = instructions
    if OpenAI is not None:
        try:
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "").strip())
            with client.audio.speech.with_streaming_response.create(**request) as response:
                return response.read()
        except Exception:
            # Fall through to the HTTP request below.
            pass
    return http_post_json("/audio/speech", request, expect_json=False, as_bytes=True)
def synthesize_all_parts(slug: str, script: dict, definition: dict, progress_cb=None):
    """Synthesize one MP3 per script turn and concatenate them.

    A turn is skipped when its part file exists and the hash stored in
    ``part_hashes.json`` matches the hash of (speaker, voice, instructions,
    label-stripped text). *progress_cb*, if given, is called as
    ``progress_cb(done, total, message)``.

    Returns ``(combined_mp3_path, per_turn_ms, total_ms)`` where skipped turns
    are reported as 0.0 ms.

    The hash cache is now written in a ``finally`` block. Previously a failure
    part-way through left the cache file out of sync with the part files
    already overwritten during that run.
    """
    pdir = get_podcast_dir(slug)
    ensure_dirs(pdir)
    turns = script.get("turns", [])
    total = len(turns)

    cache_map = {}
    cache_file = pdir / "part_hashes.json"
    if cache_file.exists():
        try:
            cache_map = json.loads(cache_file.read_text())
        except Exception:
            cache_map = {}

    h1 = definition.get("host1", {})
    h2 = definition.get("host2", {})
    v1 = vibe_instructions(h1.get("vibe", ""))
    v2 = vibe_instructions(h2.get("vibe", ""))
    tone = definition.get("tone", "")
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    # Kept byte-identical to the earlier text: this string feeds the cache
    # hash, so changing it would force every cached part to be re-synthesized.
    extra_tone = f" | Global Script Tone: {tone}{tone_desc}"
    voice1 = h1.get("voice", "Alloy")
    voice2 = h2.get("voice", "Alloy")

    per_turn_ms = []
    t0 = time.time()
    try:
        for i, turn in enumerate(turns):
            who = int(turn.get("speaker", 0))
            raw_text = (turn.get("text") or "").strip()
            tts_text = strip_speaker_label(raw_text)
            voice = voice1 if who == 0 else voice2
            instr = (v1 if who == 0 else v2) + extra_tone
            h = compute_text_hash(f"{who}:{voice}:{instr}:{tts_text}")
            partfile = part_path(slug, i)
            if cache_map.get(f"{i}") == h and partfile.exists():
                if progress_cb:
                    progress_cb(i + 1, total, f"Skipped (unchanged) part {i+1}/{total}")
                per_turn_ms.append(0.0)
                continue
            if progress_cb:
                progress_cb(i + 1, total, f"Synthesizing part {i+1}/{total} (voice: {voice})…")
            s = time.time()
            audio = tts_synthesize(tts_text, voice, instr)
            e = time.time()
            partfile.write_bytes(audio)
            cache_map[f"{i}"] = h
            per_turn_ms.append((e - s) * 1000.0)
    finally:
        # Persist whatever was completed, even if a later turn raised.
        cache_file.write_text(json.dumps(cache_map, indent=2))

    parts = [part_path(slug, i) for i in range(total)]
    out = full_mp3_path(slug)
    if progress_cb:
        progress_cb(total, total, "Concatenating MP3…")
    safe_concat_mp3(parts, out)
    total_ms = (time.time() - t0) * 1000.0
    return out, per_turn_ms, total_ms
# ---------------- UI helpers ----------------
def on_title_change(slug: str, key: str):
    """Rename podcast *slug* to the text held in ``st.session_state[key]``.

    Blank text is ignored; a toast confirms a successful rename.
    """
    new_title = st.session_state.get(key, "").strip()
    if not new_title:
        return
    do_rename(slug, new_title)
    st.toast('Title updated')
def export_zip(slug: str) -> bytes:
    """Return a deflate-compressed ZIP of the podcast folder as bytes.

    Archive names are relative to the parent of the podcast folder, so every
    entry is prefixed with the folder's own name.
    """
    pdir = get_podcast_dir(slug)
    archive_root = pdir.parent
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for folder, _subdirs, filenames in os.walk(pdir):
            folder_path = Path(folder)
            for filename in filenames:
                file_path = folder_path / filename
                archive.write(file_path, arcname=str(file_path.relative_to(archive_root)))
    return buffer.getvalue()
def host_picker(label_prefix: str, default_name="", key_prefix="h"):
    """Render name, voice and vibe inputs for one host in three columns.

    Widget state lives in ``st.session_state`` under ``<key_prefix>_name``,
    ``<key_prefix>_voice`` and ``<key_prefix>_vibe``. Returns a dict with the
    keys "name", "voice" (voice name without the gender suffix) and "vibe".
    """
    cols = st.columns(3)
    name_key = f"{key_prefix}_name"
    name = cols[0].text_input(f"{label_prefix} Name", value=st.session_state.get(name_key, default_name), key=name_key)
    voice_options = [f"{v['voice']} ({v['gender']})" for v in VOICES]
    # Host 1 defaults to Alloy, any other prefix to Echo.
    default_voice = "Alloy" if key_prefix == "h1" else "Echo"
    default_voice_index = next((i for i, opt in enumerate(voice_options) if opt.split(" ")[0].lower()==default_voice.lower()), 0)
    voice_key = f"{key_prefix}_voice"
    desired_label = st.session_state.get(voice_key, voice_options[default_voice_index])
    # Make sure the stored value is one of the options before the widget is created.
    ensure_select_value(voice_key, voice_options, desired_label, default_voice_index)
    voice_choice = cols[1].selectbox(f"{label_prefix} Voice", options=voice_options, key=voice_key)
    # Labels look like "Alloy (Female)"; the first word is the voice name.
    voice = voice_choice.split(" ")[0]
    vibe_names = [v["vibe"] for v in VIBES]
    # Host 1 defaults to Calm, any other prefix to Dramatic.
    default_vibe = "Calm" if key_prefix == "h1" else "Dramatic"
    default_vibe_index = vibe_names.index(default_vibe) if default_vibe in vibe_names else 0
    vibe_key = f"{key_prefix}_vibe"
    # Replace a missing or unknown stored vibe with the default.
    if st.session_state.get(vibe_key) not in vibe_names:
        st.session_state[vibe_key] = vibe_names[default_vibe_index]
    vibe = cols[2].selectbox(f"{label_prefix} Vibe", options=vibe_names, key=vibe_key)
    return {"name": name, "voice": voice, "vibe": vibe}
def open_details(slug: str):
    """Switch the app to "details" mode for podcast *slug*."""
    state = st.session_state
    state["mode"] = "details"
    state["selected_slug"] = slug
def open_edit(slug: str):
    """Switch the app to "edit" mode for podcast *slug*."""
    state = st.session_state
    state['mode'] = 'edit'
    state['selected_slug'] = slug
def set_mode(mode: str):
    """Store *mode* as the current UI mode in the session state."""
    state = st.session_state
    state["mode"] = mode
def do_rename(slug: str, new_title: str):
    """Set the index title of podcast *slug* and save the index.

    A blank *new_title* keeps the current title. Nothing is written when the
    slug is not in the index.
    """
    index = load_index()
    target = next((entry for entry in index["podcasts"] if entry["slug"] == slug), None)
    if target is None:
        return
    target["title"] = new_title.strip() or target["title"]
    save_index(index)
def do_delete(slug: str):
    """Remove podcast *slug* from the index and delete its folder.

    The slug may originate from a URL query parameter, so the folder is only
    removed when the slug is a single plain path component. Values such as
    "", ".", "..", or anything containing a path separator would otherwise
    make ``rmtree`` operate on the data directory itself or on a location
    outside it.
    """
    index = load_index()
    index["podcasts"] = [p for p in index["podcasts"] if p["slug"] != slug]
    save_index(index)
    if not slug or slug in {".", ".."} or Path(slug).name != slug:
        return
    pdir = get_podcast_dir(slug)
    if pdir.exists():
        shutil.rmtree(pdir, ignore_errors=True)
def clone_podcast(slug: str):
    """Pre-fill the create form from an existing podcast and open it.

    The title is made unique against the index. A trailing number in the
    episode label is incremented ("Episode 3" -> "Episode 4"); a label
    without one becomes "Episode 1". Shows a warning toast and does nothing
    else when the podcast has no stored definition.

    The trailing-number pattern previously had doubled backslashes inside a
    raw string, so it never matched and every clone was reset to "Episode 1".
    """
    definition = load_definition(slug)
    if not definition:
        st.toast("Unable to clone: missing definition", icon="⚠️")
        return
    idx = load_index()
    new_title = uniquify_title(definition.get("title", "Untitled"), idx)

    ep = (definition.get("episode") or "").strip()
    trailing_number = re.compile(r'(\d+)\s*$')
    if trailing_number.search(ep):
        new_episode = trailing_number.sub(lambda mm: str(int(mm.group(1)) + 1), ep)
    else:
        new_episode = "Episode 1"

    # These keys are the widget keys read by the create form.
    st.session_state["new_title"] = new_title
    st.session_state["new_episode"] = new_episode
    st.session_state["new_topic"] = definition.get("topic", "")
    st.session_state["new_exchanges"] = int(definition.get("exchanges", DEFAULT_EXCHANGES))
    st.session_state["new_tone"] = definition.get("tone", "Informative")
    h1 = definition.get("host1", {})
    h2 = definition.get("host2", {})
    st.session_state["h1_name"] = h1.get("name", "Host One")
    st.session_state["h2_name"] = h2.get("name", "Host Two")
    st.session_state["h1_voice"] = voice_label_for(h1.get("voice", "Alloy"))
    st.session_state["h2_voice"] = voice_label_for(h2.get("voice", "Echo"))
    st.session_state["h1_vibe"] = h1.get("vibe", "Calm")
    st.session_state["h2_vibe"] = h2.get("vibe", "Dramatic")
    set_mode("create")
# ---------------- UI components ----------------
def _request_zip_export(slug: str):
    """Remember which podcast the user asked to export (button callback)."""
    st.session_state["dl_zip_slug"] = slug


def list_podcasts_ui(index):
    """Render the "Your Podcasts" list with per-row actions.

    Each row offers inline title editing, an audio player, MP3 download,
    ZIP export, Edit and Clone (plus Delete unless ``HIDE_DELETE`` is set).

    Changes from the previous version:
    - ZIP export is two-step: "Export" marks the podcast, and only that
      podcast is zipped and offered as "Download ZIP" on the following run.
      Previously every visible podcast was zipped on every rerun.
    - The combined MP3 is read once per row and shared by the player and the
      download button.
    - The stored page number is clamped to the valid range, so a shrinking
      list cannot leave the view on an empty page.
    """
    st.header("Your Podcasts")
    # Create button directly under header (no icon)
    st.button("Create New Podcast", key="create_new_top", on_click=lambda: set_mode("create"))

    # Handle delete via query params (from JS confirm link)
    # NOTE(review): the experimental query-param functions are deprecated in
    # recent Streamlit releases; left as-is here because other parts of the
    # app may rely on the same API family - confirm before migrating.
    qp = st.experimental_get_query_params()
    act = qp.get("action", [])
    slug_vals = qp.get("slug", [])
    act_val = (act[0] if isinstance(act, list) and act else act) or ""
    slug_val = (slug_vals[0] if isinstance(slug_vals, list) and slug_vals else slug_vals) or ""
    # The action name does not match the "delete" used in the row link below,
    # so this branch is effectively switched off.
    if act_val == "x_delete_disabled" and slug_val:
        do_delete(slug_val)
        try:
            st.experimental_set_query_params()  # clear all params
        except Exception:
            pass
        st.success("Podcast deleted.")
        st.rerun()

    # Pagination state
    pods = list(index.get("podcasts", []))
    pods.sort(key=lambda p: p.get("created_at", ""), reverse=True)
    if "list_page" not in st.session_state:
        st.session_state["list_page"] = 0
    if "list_page_size" not in st.session_state:
        st.session_state["list_page_size"] = PAGE_SIZE_DEFAULT
    if not pods:
        st.info("No podcasts yet. Click **Create New Podcast** to get started.")
        return
    total = len(pods)
    page_size = max(1, int(st.session_state["list_page_size"]))
    max_page = (total - 1) // page_size
    st.session_state["list_page"] = min(max(0, st.session_state["list_page"]), max_page)
    start = st.session_state["list_page"] * page_size
    end = min(start + page_size, total)
    view = pods[start:end]

    # Lightweight actions state
    if "dl_zip_slug" not in st.session_state:
        st.session_state["dl_zip_slug"] = None
    if "confirm_delete_slug" not in st.session_state:
        st.session_state["confirm_delete_slug"] = None

    for p in view:
        slug = p["slug"]
        mp3 = full_mp3_path(slug)
        audio_bytes = mp3.read_bytes() if mp3.exists() else None
        cols = st.columns([3, 6, 1, 1, 1, 1, 1], gap="small")
        with cols[0]:
            title_key = f"title_{slug}"
            if title_key not in st.session_state:
                st.session_state[title_key] = p["title"]
            new_title = st.text_input("", key=title_key, label_visibility="collapsed")
            if new_title != p["title"] and new_title.strip():
                # p is the same dict object held by index, so saving index persists it.
                p["title"] = new_title.strip()
                save_index(index)
        with cols[1]:
            if audio_bytes is not None:
                st.audio(audio_bytes, format="audio/mp3")
            else:
                st.caption("No audio yet")
        with cols[2]:
            if audio_bytes is not None:
                st.download_button("MP3", data=audio_bytes, file_name=f"{slug}.mp3", key=f"dlmp3_{slug}", help="Download MP3")
            else:
                st.caption("No audio yet")
        with cols[3]:
            if st.session_state.get("dl_zip_slug") == slug:
                # Only the podcast the user asked for is zipped.
                st.download_button("Download ZIP", data=export_zip(slug), file_name=f"{slug}.zip", key=f"save_zip_{slug}", help="Export Podcast (.zip)")
            else:
                st.button("Export", key=f"prep_zip_{slug}", help="Export Podcast (.zip)", on_click=_request_zip_export, args=(slug,))
        with cols[4]:
            st.button("Edit", key=f"edit_{slug}", help="Edit Script", on_click=lambda s=slug: open_edit(s))
        with cols[5]:
            st.button("Clone", key=f"clone_{slug}", help="Clone", on_click=lambda s=slug: clone_podcast(s))
        with cols[6]:
            if not HIDE_DELETE:
                st.markdown(f'<a href="?action=delete&slug={slug}" onclick="return confirm(\'Delete this podcast? This cannot be undone.\')" class="delbtn">Delete</a>', unsafe_allow_html=True)

    # Delete confirmation UI (global, appears below list)
    if st.session_state.get("confirm_delete_slug"):
        slug_c = st.session_state.get("confirm_delete_slug")
        st.warning(f"Delete podcast '{slug_c}'? This cannot be undone.")
        c1, c2 = st.columns([1, 1])
        with c1:
            if st.button("Confirm Delete", key="confirm_delete_now"):
                do_delete(slug_c)
                st.session_state["confirm_delete_slug"] = None
                st.success("Podcast deleted.")
                st.rerun()
        with c2:
            if st.button("Cancel", key="cancel_delete"):
                st.session_state["confirm_delete_slug"] = None
                st.rerun()

    # Bottom pagination controls with icons
    if not HIDE_PAGINATION:
        colp1, colp2, colp3 = st.columns([2, 6, 2])
        with colp1:
            if st.button("◀ Prev", disabled=st.session_state["list_page"] <= 0):
                st.session_state["list_page"] = max(0, st.session_state["list_page"] - 1)
                st.rerun()
        with colp2:
            st.caption(f"Showing {page_size} per page — Page {st.session_state['list_page']+1} of {max_page+1} (Total {total})")
        with colp3:
            if st.button("Next ▶", disabled=st.session_state["list_page"] >= max_page):
                st.session_state["list_page"] = min(max_page, st.session_state["list_page"] + 1)
                st.rerun()

    st.markdown('<style>.stButton>button {padding:4px 6px; margin:0 2px; border:1px solid rgba(255,255,255,0.25);background:transparent; border-radius:8px; font-size:16px;} a.delbtn{display:inline-block;padding:4px 6px;margin:0 2px;border:1px solid #8a0000;border-radius:8px;text-decoration:none;color:#fff;background:#c00000;} a.delbtn:hover{background:#c00000;} .delwrap button{padding:4px 6px;margin:0 2px;border:1px solid #8a0000;border-radius:8px;background:#c00000;color:#fff;} .delwrap button:hover{background:#c00000;}</style>', unsafe_allow_html=True)
def create_form_ui(index):
    """Render the "Create New Podcast" form and start generation on submit.

    While a generation is pending (``generating`` and ``pending_definition``
    set in the session state) the form is not drawn; the generation flow runs
    instead. On submit the definition is stored in the session state and the
    script is rerun so that the next run enters that flow.
    """
    if st.session_state.get("generating") and st.session_state.get("pending_definition"):
        run_generation_flow(st.session_state["pending_definition"]); return
    st.header("Create New Podcast")
    # Feeling Lucky button directly beneath header (no icon)
    lucky = st.button("Feeling Lucky", key="lucky_btn")
    if lucky:
        # Fill every form field with a random sample, then rerun so the
        # widgets pick the values up from the session state.
        sample_titles = ["Hot Takes Live","Acme Insider","Two Brains, One Mic","Curious Minds","Deep Dive Daily","Chatter Box",
        "Morning Buzz","Brainstorm Café","Weekend Roundtable","Idea Exchange","Think Tank Talks",
        "Pulse Check","Off the Record","The Daily Spark","Future Shock","Culture Chat","Signal & Noise",
        "The Conversation Lab","Trending Now","Mind Meld","Coffee & Code","Beyond the Hype","Lightning Round",
        "The Big Why","The Briefing Room","Micro Chats"]
        sample_topics = ["The future of AI in everyday life","How remote work changed cities","Five technologies shaping the next decade",
        "Lessons from famous product flops","What makes a great brand story","AI in healthcare","Space tourism prospects",
        "The psychology of habits","Virtual reality in classrooms","Cybersecurity for everyone","Quantum computing explained",
        "Designing ethical AI","Smart cities and sensors","Robotics at home"]
        sample_names = ["Alex","Sam","Jordan","Taylor","Casey","Riley","Morgan","Avery","Jamie","Quinn","Chris","Pat","Devon","Skyler","Cameron","Drew","Kendall","Peyton","Blake","Hayden","Logan","Shawn","Noel","Emerson","Harper","Reese","Rowan","Sage"]
        st.session_state["new_title"] = random.choice(sample_titles)
        st.session_state["new_topic"] = random.choice(sample_topics)
        # The two hosts never share a name, voice or vibe.
        n1 = random.choice(sample_names); n2 = random.choice([n for n in sample_names if n != n1])
        st.session_state["h1_name"] = n1; st.session_state["h2_name"] = n2
        v1 = random.choice(VOICES); v2 = random.choice([v for v in VOICES if v["voice"] != v1["voice"]])
        st.session_state["h1_voice"] = f"{v1['voice']} ({v1['gender']})"; st.session_state["h2_voice"] = f"{v2['voice']} ({v2['gender']})"
        vibe_names = [v["vibe"] for v in VIBES]; vb1 = random.choice(vibe_names); vb2 = random.choice([v for v in vibe_names if v != vb1])
        st.session_state["h1_vibe"] = vb1; st.session_state["h2_vibe"] = vb2
        st.session_state["new_tone"] = random.choice(list(TONE_DESCRIPTIONS.keys())); st.session_state["new_episode"]="Episode 1"
        st.rerun()
    with st.form("create_form"):
        col_title, col_episode = st.columns(2)
        with col_title:
            title = st.text_input("Title", key="new_title")
        with col_episode:
            episode = st.text_input("Episode", value=st.session_state.get("new_episode", "Episode 1"), key="new_episode")
        col_topic, col_files = st.columns(2)
        with col_topic:
            topic = st.text_area("Topic", key="new_topic", height=98, placeholder="What should they talk about?")
        with col_files:
            files = st.file_uploader("Supporting file(s)", accept_multiple_files=True, type=["txt","docx"], key="new_support")
        col_exch, col_tone = st.columns(2)
        with col_exch:
            exchanges = st.number_input("Exchanges", min_value=1, max_value=20, value=st.session_state.get("new_exchanges", DEFAULT_EXCHANGES), step=1, key="new_exchanges")
        with col_tone:
            tone = st.selectbox("Tone", options=list(TONE_DESCRIPTIONS.keys()),
            index=list(TONE_DESCRIPTIONS.keys()).index(st.session_state.get("new_tone","Informative")) if st.session_state.get("new_tone") in TONE_DESCRIPTIONS else 0,
            key="new_tone")
        st.markdown("**Hosts**")
        host1 = host_picker("Host 1", default_name=st.session_state.get("h1_name","Host One"), key_prefix="h1")
        host2 = host_picker("Host 2", default_name=st.session_state.get("h2_name","Host Two"), key_prefix="h2")
        submitted = st.form_submit_button("Generate Podcast", use_container_width=True)
    if submitted:
        if not (title or "").strip(): st.error("Please provide a Title."); return
        # Host and episode values are re-read from the session state, which
        # holds what the widgets submitted.
        episode_val = (st.session_state.get("new_episode") or "").strip()
        host1["name"] = st.session_state.get("h1_name", host1.get("name","Host One"))
        host2["name"] = st.session_state.get("h2_name", host2.get("name","Host Two"))
        host1["voice"] = (st.session_state.get("h1_voice") or "Alloy").split(" ")[0]
        host2["voice"] = (st.session_state.get("h2_voice") or "Echo").split(" ")[0]
        host1["vibe"] = st.session_state.get("h1_vibe", host1.get("vibe","Calm"))
        host2["vibe"] = st.session_state.get("h2_vibe", host2.get("vibe","Dramatic"))
        # Title and slug are both made unique against a freshly loaded index.
        idx = load_index(); title_u = uniquify_title(title, idx); slug = uniquify_slug(slugify(title_u), idx)
        support_texts = []
        for f in files or []:
            try: raw = f.getvalue()
            except Exception: raw = b""
            if len(raw) > MAX_UPLOAD_BYTES:
                st.error(f'File "{getattr(f, "name", "(unknown)")}" exceeds 10 MB and was skipped.'); continue
            name_lower = (getattr(f, 'name', '') or '').lower()
            # Plain text is decoded leniently; .docx goes through read_docx_text.
            if f.type.endswith('text/plain') or name_lower.endswith('.txt'):
                try: support_texts.append(raw.decode('utf-8', errors='ignore'))
                except Exception: pass
            elif name_lower.endswith('.docx'): support_texts.append(read_docx_text(raw))
        definition = {"title": title_u.strip(),"episode": episode_val,"slug": slug,"topic": (topic or "").strip(),
        "exchanges": int(exchanges),"host1": host1,"host2": host2,"tone": tone,
        "supporting_texts": support_texts,"created_at": now_iso()}
        # Hand over to run_generation_flow() on the next script run.
        st.session_state["pending_definition"] = definition; st.session_state["generating"] = True; st.rerun()
# -------- Generation flow --------
def run_generation_flow(definition: dict):
    """Generate script and audio for ``definition`` and render progress on the page.

    Steps, in order: persist the definition, (re)register the podcast in the
    index, generate and validate the script, synthesize the audio, then show
    timing information and an audio player for the resulting MP3.

    Args:
        definition: Podcast definition. Must contain the keys ``"slug"``,
            ``"title"`` and ``"created_at"``.

    The ``"pending_definition"`` and ``"generating"`` session-state entries are
    cleared when this function exits, whether it succeeded or raised. Without
    that, a failure part-way through would leave the session flagged as
    "generating" and every rerun would retry the same failing definition.
    """
    try:
        index = load_index()
        slug = definition["slug"]
        pdir = get_podcast_dir(slug)
        ensure_dirs(pdir)
        save_definition(slug, definition)

        # Drop any existing entry with this slug before appending, so that
        # re-generating a podcast does not produce a duplicate index row.
        index["podcasts"] = [p for p in index["podcasts"] if p["slug"] != slug]
        index["podcasts"].append(
            {
                "title": definition["title"],
                "slug": slug,
                "created_at": definition["created_at"],
            }
        )
        save_index(index)

        st.subheader("Processing")
        prog = st.progress(0.0, text="Starting…")
        with st.status("Generating script via Chat API…", state="running", expanded=False) as box:

            def progress_cb(done, total, msg):
                # Clamp to [0, 1]; max(total, 1) guards against division by zero.
                frac = min(max(done / max(total, 1), 0.0), 1.0)
                prog.progress(frac, text=msg)
                box.write(msg)

            script = generate_script(definition)
            script = validate_and_fix_script(definition, script)
            save_script(slug, script)
            out_mp3, per_turn_ms, total_ms = synthesize_all_parts(
                slug, script, definition, progress_cb=progress_cb
            )
            box.write("Concatenation complete.")

            # Average only over turns with a positive duration. The timing line
            # is purely informational, so any failure here falls back to 0.
            try:
                avg_ms = sum(per_turn_ms) / max(1, sum(1 for x in per_turn_ms if x > 0))
            except Exception:
                avg_ms = 0.0
            box.write(f"Timing: total {total_ms:.0f} ms • avg per synthesized turn {avg_ms:.0f} ms")
            box.update(label="Done!", state="complete", expanded=False)

        # Only the wide left column is used; the other two act as spacers so
        # the page layout stays the same.
        left, _, _ = st.columns([8, 2, 2])
        with left:
            with open(out_mp3, "rb") as f:
                audio_bytes = f.read()
            st.audio(audio_bytes, format="audio/mp3")
    finally:
        st.session_state.pop("pending_definition", None)
        st.session_state["generating"] = False
# -------- Details / Edit --------
def details_ui(slug: str):
    """Render the details page for the podcast identified by ``slug``.

    Shows the title and creation timestamp, an audio player plus MP3 download
    when the full MP3 exists (a warning otherwise), action buttons for editing,
    re-generating and exporting, and read-only views of the definition and
    script.

    Args:
        slug: Identifier of the podcast to display.
    """
    definition = load_definition(slug)
    # Fall back to an empty script so the page still renders without one.
    script = load_script(slug) or {"turns": []}
    if "prompt" not in script:
        script["prompt"] = {}

    st.header(definition.get("title", slug))
    st.caption(definition.get("created_at", ""))

    mp3 = full_mp3_path(slug)
    if mp3.exists():
        # Read the file once and share the bytes between player and download.
        with open(mp3, "rb") as f:
            mp3_bytes = f.read()
        st.audio(mp3_bytes, format="audio/mp3")
        st.download_button(
            "MP3",
            data=mp3_bytes,
            file_name=f"{slug}.mp3",
            key=f"dlmp3_details_{slug}",
            help="Download MP3",
        )
    else:
        st.warning("Audio not found. You can regenerate by editing the script and saving.")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.button("Edit Script", on_click=lambda: open_edit(slug), use_container_width=True)
    with col2:
        if st.button("Re-generate Script", use_container_width=True):
            # Queue the existing definition for generation and switch to the
            # "create" mode, then rerun so the change takes effect.
            st.session_state["pending_definition"] = definition
            st.session_state["generating"] = True
            set_mode("create")
            st.rerun()
    with col3:
        zbytes = export_zip(slug)
        st.download_button(
            "Export",
            data=zbytes,
            file_name=f"{slug}.zip",
            key=f"dlzip_details_{slug}",
            help="Export Podcast (.zip)",
        )

    with st.expander("Definition (read-only)"):
        st.json(definition, expanded=False)
    with st.expander("Script (read-only)"):
        st.json(script, expanded=False)
def edit_ui(slug: str):
    """Render the script editor for the podcast identified by ``slug``.

    Shows read-only views of the definition and script, then a form with one
    text area per turn. Submitting the form validates and saves the edited
    script, sets the ``"edit_processing"`` flag and reruns; on that next run
    the audio is re-synthesized and an audio player is shown.

    Args:
        slug: Identifier of the podcast to edit.
    """
    definition = load_definition(slug)
    script = load_script(slug)
    if not script:
        st.error("No script to edit.")
        return

    st.header(f"Details: {definition.get('title', slug)}")
    with st.expander("Definition (read-only)"):
        st.json(definition, expanded=False)
    with st.expander("Script (read-only)"):
        # Loaded separately so the displayed copy is not the object whose
        # turns are mutated by the edit form below.
        script_ro = load_script(slug) or {"turns": []}
        if "prompt" not in script_ro:
            script_ro["prompt"] = {}
        st.json(script_ro, expanded=False)

    if "edit_expanded" not in st.session_state:
        st.session_state["edit_expanded"] = False
    if "edit_processing" not in st.session_state:
        st.session_state["edit_processing"] = False

    turns = script.get("turns", [])
    h1n = (definition.get("host1") or {}).get("name", "Speaker 0")
    h2n = (definition.get("host2") or {}).get("name", "Speaker 1")

    saved = False
    with st.expander("Edit Script", expanded=st.session_state["edit_expanded"]):
        with st.form("edit_script_form"):
            for i, turn in enumerate(turns):
                who = int(turn.get("speaker", 0))
                host_name = h1n if who == 0 else h2n
                # Separator added so the heading reads "Turn 1 — Host One"
                # instead of the run-together "Turn 1Host One".
                st.markdown(f"**Turn {i+1} — {host_name}**")
                orig = turn.get("text", "")
                # A non-empty label is supplied (and kept collapsed) because
                # Streamlit discourages empty widget labels.
                new_txt = st.text_area(
                    f"Turn {i+1} ({host_name})",
                    value=orig,
                    key=f"edit_line_{i}",
                    height=100,
                    label_visibility="collapsed",
                )
                if new_txt != orig:
                    turns[i]["text"] = new_txt
            saved = st.form_submit_button(
                "Save Changes & Regenerate Changed Audio",
                type="primary",
                use_container_width=True,
            )

    if saved:
        script = validate_and_fix_script(definition, {"turns": turns})
        save_script(slug, script)
        st.session_state["edit_expanded"] = False
        st.session_state["edit_processing"] = True
        st.rerun()

    if st.session_state.get("edit_processing"):
        try:
            st.subheader("Processing")
            prog = st.progress(0.0, text="Starting…")
            with st.status("Regenerating updated audio…", state="running", expanded=False) as box:

                def progress_cb(done, total, msg):
                    # Clamp to [0, 1]; max(total, 1) avoids division by zero.
                    prog.progress(min(max(done / max(total, 1), 0.0), 1.0), text=msg)
                    box.write(msg)

                out_mp3, per_turn_ms, total_ms = synthesize_all_parts(
                    slug, script, definition, progress_cb=progress_cb
                )
                box.write("Concatenation complete.")
                box.update(label="Done!", state="complete", expanded=False)

            # Only the wide left column is used; the narrow one is a spacer.
            left, _ = st.columns([8, 1])
            with left:
                with open(out_mp3, "rb") as f:
                    audio_bytes = f.read()
                st.audio(audio_bytes, format="audio/mp3")
        finally:
            # Always clear the flag so a failed synthesis is not retried on
            # every subsequent rerun of the page.
            st.session_state["edit_processing"] = False
        st.info("You can reopen **Edit Script** to make further changes.")
# -------- App entry --------
def main():
    """Configure the page and render the view for the current mode.

    The mode is read from ``st.session_state["mode"]`` (defaulting to
    ``"list"``). The ``"details"`` and ``"edit"`` modes need a
    ``"selected_slug"`` in session state; without one an info message is shown
    and the mode is set back to ``"list"``. Any unrecognised mode is also set
    back to ``"list"``.
    """
    st.set_page_config(page_title="Podcast Creator", layout="wide")
    st.markdown('<style>audio{height:40px !important}</style>', unsafe_allow_html=True)
    st.title(APP_TITLE)

    # Back to List (no icon) -- offered on every page except the list itself.
    on_list_page = st.session_state.get("mode", "list") == "list"
    if not on_list_page:
        st.button("Back to List", on_click=set_mode, args=("list",))

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if "mode" not in st.session_state:
        st.session_state["mode"] = "list"

    index = load_index()
    mode = st.session_state.get("mode", "list")

    # Views that operate on the currently selected podcast.
    slug_views = {"details": details_ui, "edit": edit_ui}

    if mode == "list":
        list_podcasts_ui(index)
        return
    if mode == "create":
        create_form_ui(index)
        return
    if mode not in slug_views:
        set_mode("list")
        return

    slug = st.session_state.get("selected_slug")
    if not slug:
        st.info("No podcast selected.")
        set_mode("list")
        return
    slug_views[mode](slug)


if __name__ == "__main__":
    main()