# app_0.0.68.py # Version: 0.0.68 # Focus: UI polish, performance (no precompute ZIP, simple pagination), stray "None" label fix. # - Move "Create New Podcast" button beneath "Your Podcasts" header (no icon). # - Move "Feeling Lucky" button beneath "Create New Podcast" header (no icon). # - Remove icons from all "Back to List" buttons. # - Fix tuple-return causing stray "None" label in Edit/Details routing. # - Performance: avoid exporting ZIP/Base64 for every item during list render; do it only on click. # - Performance: add lightweight pagination to "Your Podcasts" to reduce per-render costs. # - Keep links without target="_blank". import os import io import json import time import hashlib import shutil import zipfile import random import base64 import datetime as dt from pathlib import Path import xml.etree.ElementTree as ET import urllib.request import urllib.error import ssl import re import streamlit as st try: from openai import OpenAI except Exception: OpenAI = None APP_TITLE = "🎙️ Whizbang Podcast Creator Thingee" DATA_DIR = Path("podcast_data") DEFAULT_EXCHANGES = 5 MAX_UPLOAD_BYTES = 10 * 1024 * 1024 # 10 MB per file PAGE_SIZE_DEFAULT = 25 HIDE_DELETE = True # temporary: hide Delete buttons UI HIDE_PAGINATION = True # temporary: hide pager UI VOICES = [ {"voice": "Alloy", "gender": "Female"}, {"voice": "Ash", "gender": "Male"}, {"voice": "Ballad", "gender": "Male"}, {"voice": "Coral", "gender": "Female"}, {"voice": "Echo", "gender": "Male"}, {"voice": "Fable", "gender": "Female"}, {"voice": "Onyx", "gender": "Male"}, {"voice": "Nova", "gender": "Female"}, {"voice": "Sage", "gender": "Female"}, {"voice": "Shimmer", "gender": "Female"}, {"voice": "Verse", "gender": "Male"}, ] VIBES = [ {"vibe": "Calm","instructions": "Voice Affect: Calm, composed, reassuring. Tone sincere and empathetic. 
Moderate pacing."}, {"vibe": "Mad Scientist","instructions": "Exaggerated, theatrical delivery with gleeful cackles; high energy."}, {"vibe": "Chill Surfer","instructions": "Laid-back, mellow; stretched vowels; slow tempo."}, {"vibe": "Dramatic","instructions": "Low, hushed, suspenseful; slow pacing; restrained intensity."}, {"vibe": "Professional","instructions": "Clear, authoritative, composed; neutral and informative."}, {"vibe": "Sincere","instructions": "Calm and empathetic; slower for apology, faster for solutions."}, {"vibe": "Fitness Instructor","instructions": "High-energy, upbeat; punchy sentences; motivational."}, {"vibe": "NYC Cabbie","instructions": "Gruff, quick, clipped rhythm; no-nonsense with dry humor."}, {"vibe": "Connoisseur","instructions": "Slight French accent; refined, warm; deliberate pauses."}, {"vibe": "Patient Teacher","instructions": "Warm, instructive; slow, articulate; supportive."}, {"vibe": "Smooth Jazz DJ","instructions": "Deep, velvety; melodic phrasing; relaxed vibe."}, {"vibe": "Medieval Knight","instructions": "Deep, formal; noble cadence with archaic flair."}, {"vibe": "Friendly","instructions": "Cheerful, clear, reassuring; brief pauses after key steps."}, {"vibe": "Gourmet Chef","instructions": "Exuberant Italian chef; passionate and persuasive."}, {"vibe": "Old-Timey","instructions": "Refined, theatrical; crisp enunciation; steady cadence."}, {"vibe": "Noir Detective","instructions": "Cool, detached; slow and deliberate with dramatic pauses."}, {"vibe": "Emo Teenager","instructions": "Sarcastic, disinterested; monotone with sighs."}, {"vibe": "Sports Coach","instructions": "Energetic, animated; rapid for action; positive energy."}, {"vibe": "Sympathetic","instructions": "Warm, empathetic, professional; calm delivery."}, {"vibe": "Robot","instructions": "Monotone, mechanical; efficient and precise."}, {"vibe": "Eternal Optimist","instructions": "Warm, upbeat; solution-oriented; emphasize key words."}, {"vibe": 
"Auctioneer","instructions": "Fast-paced, rhythmic; exciting and persuasive."}, {"vibe": "Dramatic (Alt)","instructions": "Low, hushed; serious and mysterious; slow pacing."}, {"vibe": "True Crime Buff","instructions": "Deep, hushed; short, rhythmic phrasing; ominous tone."}, ] TONE_DESCRIPTIONS = { "Informative": "Clear, factual, and helpful; prioritizes clarity over flair.", "Authoritative": "Confident, expert, and decisive; strong statements and tight structure.", "Inspirational": "Uplifting, aspirational, and future-focused; evokes possibility.", "Empathetic": "Warm, validating, and supportive; acknowledges feelings.", "Persuasive": "Benefit-led, objection-aware, and action-oriented.", "Educational": "Stepwise, scaffolded explanations with simple examples.", "Conversational": "Casual and back-and-forth; shorter sentences and light callbacks.", "Humorous": "Light wit, playful framing, and occasional punchlines.", "Storytelling": "Narrative-driven; uses scenes, stakes, and callbacks.", "Casual/Relaxed": "Easy-going, low-pressure pacing with informal phrasing.", "Charming": "Personable and a bit witty; smiles you can hear.", "Passionate": "High-energy enthusiasm and vivid phrasing.", "Dramatic": "Tension-building, cinematic rhythm; pauses for effect.", "Enthusiastic": "Upbeat momentum, positive framing, high engagement.", "Optimistic": "Hopeful outlook; solution-oriented framing.", "Mysterious": "Intriguing hints, restrained reveals, and suspense." 
# ---------------- Helpers ----------------
def slugify(text: str) -> str:
    """Turn a podcast title into a lowercase, filesystem-friendly slug.

    Characters outside a small whitelist are dropped and spaces become
    hyphens. Leading/trailing dots are removed so that a title such as ".."
    cannot produce a slug that resolves to a parent directory. When nothing
    usable remains, a timestamp-based fallback slug is returned.
    """
    keep = "-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    cleaned = "".join(c for c in (text or "") if c in keep).strip().replace(" ", "-")
    cleaned = cleaned.strip(".").lower()
    return cleaned or f"podcast-{int(time.time())}"


def now_iso() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def ensure_dirs(base: Path):
    """Create *base* and its ``parts`` sub-directory if they do not exist."""
    (base / "parts").mkdir(parents=True, exist_ok=True)


def id3v2_strip(b: bytes) -> bytes:
    """Return *b* without a leading ID3v2 tag (unchanged if there is none).

    The tag size is a 28-bit "syncsafe" integer stored in bytes 6-9. When the
    footer flag (0x10 in the flags byte) is set, a 10-byte footer follows the
    tag body and is skipped as well.
    """
    if len(b) >= 10 and b[:3] == b"ID3":
        size = ((b[6] & 0x7F) << 21) | ((b[7] & 0x7F) << 14) | ((b[8] & 0x7F) << 7) | (b[9] & 0x7F)
        header_len = 10 + size
        if b[5] & 0x10:
            header_len += 10
        return b[header_len:]
    return b


def id3v1_strip(b: bytes) -> bytes:
    """Return *b* without a trailing 128-byte ID3v1 tag (unchanged if none)."""
    if len(b) >= 128 and b[-128:-125] == b"TAG":
        return b[:-128]
    return b


def safe_concat_mp3(mp3_paths, out_path: Path):
    """Concatenate MP3 files into *out_path*.

    The first part keeps its ID3v2 header; later parts lose both tag kinds.
    When more than one part is written, the first part's ID3v1 trailer is
    removed too, so that no tag ends up in the middle of the audio stream.
    A single part is copied byte-for-byte.
    """
    paths = list(mp3_paths)
    multiple = len(paths) > 1
    with open(out_path, "wb") as out:
        for i, p in enumerate(paths):
            data = Path(p).read_bytes()
            if i > 0:
                data = id3v2_strip(data)
            if i > 0 or multiple:
                data = id3v1_strip(data)
            out.write(data)


def read_docx_text(docx_bytes: bytes) -> str:
    """Extract the text runs of a .docx file, one per line.

    Returns an empty string when the bytes cannot be read as a .docx
    (deliberately best-effort: uploads are untrusted).
    """
    try:
        with zipfile.ZipFile(io.BytesIO(docx_bytes)) as z:
            with z.open("word/document.xml") as f:
                xml = f.read()
        root = ET.fromstring(xml)
        ns = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
        texts = [t.text for t in root.findall(".//w:t", ns) if t.text]
        # A real newline; the previous "\\n" joined runs with a literal backslash-n.
        return "\n".join(texts).strip()
    except Exception:
        return ""


def load_index():
    """Load ``index.json`` from DATA_DIR.

    Always returns a dict with a ``podcasts`` list, even when the file is
    missing, unreadable, or holds JSON of an unexpected shape.
    """
    DATA_DIR.mkdir(exist_ok=True, parents=True)
    idx = DATA_DIR / "index.json"
    if idx.exists():
        try:
            data = json.loads(idx.read_text())
        except (OSError, ValueError):
            data = None
        if isinstance(data, dict):
            if not isinstance(data.get("podcasts"), list):
                data["podcasts"] = []
            return data
    return {"podcasts": []}


def save_index(index):
    """Write *index* to ``index.json`` in DATA_DIR as indented JSON."""
    (DATA_DIR / "index.json").write_text(json.dumps(index, indent=2))


def uniquify_title(title: str, index: dict) -> str:
    """Return a title that does not collide (case-insensitively) with the index.

    A trailing counter such as " (3)" is removed first; if the bare title is
    taken, the first free " (n)" suffix with n >= 2 is appended.
    """
    base = title.strip()
    m = re.search(r"\s*\((\d+)\)\s*$", base)
    # Keep the original when the title consists only of a counter, e.g. "(2024)".
    if m and base[:m.start()].strip():
        base = base[:m.start()].rstrip()
    existing = {(p.get("title") or "").strip().lower() for p in index.get("podcasts", [])}
    if base.lower() not in existing:
        return base
    n = 2
    while f"{base} ({n})".lower() in existing:
        n += 1
    return f"{base} ({n})"
def uniquify_slug(slug: str, index: dict) -> str:
    """Return *slug* (lower-cased) or the first free ``slug-n`` variant, n >= 2."""
    base = slug.strip().lower()
    existing = {(p.get("slug") or "").strip().lower() for p in index.get("podcasts", [])}
    if base not in existing:
        return base
    n = 2
    while f"{base}-{n}" in existing:
        n += 1
    return f"{base}-{n}"


def get_podcast_dir(slug: str) -> Path:
    """Return the directory that holds all files of podcast *slug*."""
    return DATA_DIR / slug


def _read_json_or_none(path: Path):
    """Return the parsed JSON at *path*, or None if missing or unreadable."""
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text())
    except (OSError, ValueError):
        # Same tolerance as load_index(): a corrupt file reads as "absent".
        return None


def load_definition(slug: str):
    """Load ``definition.json`` for *slug*; None when absent or corrupt."""
    return _read_json_or_none(get_podcast_dir(slug) / "definition.json")


def save_definition(slug: str, definition: dict):
    """Write *definition* to ``definition.json``, creating directories first."""
    pdir = get_podcast_dir(slug)
    ensure_dirs(pdir)
    (pdir / "definition.json").write_text(json.dumps(definition, indent=2))


def load_script(slug: str):
    """Load ``script.json`` for *slug*; None when absent or corrupt."""
    return _read_json_or_none(get_podcast_dir(slug) / "script.json")


def save_script(slug: str, script: dict):
    """Write *script* to ``script.json``, creating directories first."""
    pdir = get_podcast_dir(slug)
    ensure_dirs(pdir)
    (pdir / "script.json").write_text(json.dumps(script, indent=2))


def part_path(slug: str, i: int) -> Path:
    """Return the path of the zero-padded per-turn MP3 number *i*."""
    return get_podcast_dir(slug) / "parts" / f"{i:03d}.mp3"


def full_mp3_path(slug: str) -> Path:
    """Return the path of the concatenated episode MP3."""
    return get_podcast_dir(slug) / f"{slug}.mp3"


def compute_text_hash(text: str) -> str:
    """Return the first 16 hex digits of the SHA-256 of *text* (UTF-8)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]


# Label stripping for TTS.
# A label is up to 40 name-like characters (optionally bracketed) followed by
# ":" or an em dash, or by a hyphen that has whitespace on both sides. A bare
# hyphen is NOT a separator, so "Well-known brands..." keeps its first word.
# DOTALL lets the remainder span several lines (edited turns may contain
# newlines).
_LABEL_RE = re.compile(
    r'^\s*\[?\s*([A-Za-z0-9 ._\-]{1,40}?)\s*\]?\s*(?::|—|\s-\s)\s*(.*)$',
    re.DOTALL,
)


def strip_speaker_label(text: str) -> str:
    """Remove a leading speaker label ("Alex: ", "[Host 0] - ") from *text*.

    Returns the stripped text unchanged when no label is found; None is
    treated as an empty string.
    """
    m = _LABEL_RE.match(text or "")
    return (m.group(2).strip() if m else (text or "").strip())


# -------- Voice helpers (fix selectbox issues) --------
def voice_label_for(voice: str) -> str:
    """Return the "Voice (Gender)" select-box label for a voice name.

    Matching against VOICES is case-insensitive; an unknown or empty name
    falls back to "<name or Alloy> (Female)".
    """
    v = (voice or "").strip()
    for item in VOICES:
        if item["voice"].lower() == v.lower():
            return f"{item['voice']} ({item['gender']})"
    return f"{v or 'Alloy'} (Female)"


def ensure_select_value(key: str, options: list, preferred_label: str, default_index: int):
    """Store a valid option under ``st.session_state[key]``.

    Uses *preferred_label* when it is an option; otherwise the option whose
    first word matches the label's first word (case-insensitive); otherwise
    ``options[default_index]``. Does nothing when *options* is empty.
    """
    if not options:
        return
    if preferred_label in options:
        st.session_state[key] = preferred_label
        return
    token = str(preferred_label or "").split(" ")[0].lower()
    fallback = default_index if 0 <= default_index < len(options) else 0
    idx = next(
        (i for i, opt in enumerate(options) if opt.split(" ")[0].lower() == token),
        fallback,
    )
    st.session_state[key] = options[idx]
# ---------------- OpenAI HTTP helpers ----------------
_OPENAI_API_BASE = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1")
_SSL_CTX = ssl.create_default_context()


def _bearer_headers(extra=None):
    """Return HTTP headers carrying the API key as a Bearer token.

    Raises:
        RuntimeError: if OPENAI_API_KEY is unset or blank.
    """
    key = os.environ.get("OPENAI_API_KEY", "").strip()
    if not key:
        raise RuntimeError("OPENAI_API_KEY environment variable is not set.")
    base = {"Authorization": f"Bearer {key}"}
    if extra:
        base.update(extra)
    return base


def http_post_json(path: str, payload: dict, expect_json=True, as_bytes=False):
    """POST *payload* as JSON to the API and return the response.

    Returns the raw bytes when *as_bytes* is true, the decoded JSON when
    *expect_json* is true, and the raw bytes otherwise. Errors raised by
    urllib propagate to the caller.
    """
    # Resolve the credentials first so a missing key fails before any I/O.
    headers = _bearer_headers({"Content-Type": "application/json"})
    url = f"{_OPENAI_API_BASE.rstrip('/')}/{path.lstrip('/')}"
    body = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(req, context=_SSL_CTX, timeout=120) as resp:
        raw = resp.read()
    if as_bytes or not expect_json:
        return raw
    return json.loads(raw.decode("utf-8"))


def select_voice_name(v: str) -> str:
    """Return the lower-cased voice identifier, defaulting to "alloy"."""
    return (v or "alloy").strip().lower()


def vibe_instructions(vibe_name: str) -> str:
    """Return the instruction text of the named vibe, or "" if unknown."""
    for v in VIBES:
        if v["vibe"] == vibe_name:
            return v["instructions"].strip()
    return ""


# ---------------- Script generation (2-pass) ----------------
def _build_prompts(defn: dict):
    """Build the (system_prompt, user_prompt) pair for the first script pass.

    Reads title, episode, topic, tone, both hosts, the exchange count and the
    optional supporting texts from *defn*.
    """
    title = defn.get("title", "")
    episode = defn.get("episode", "")
    topic = defn.get("topic", "")
    tone = defn.get("tone", "")
    h1 = defn.get("host1", {})
    h2 = defn.get("host2", {})
    vibe1 = h1.get("vibe", "")
    vibe2 = h2.get("vibe", "")
    vibe1_instr = vibe_instructions(vibe1)
    vibe2_instr = vibe_instructions(vibe2)
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    exchanges = int(defn.get("exchanges", DEFAULT_EXCHANGES))

    # Real newlines and plain quotes: the earlier double-escaped literals put
    # the characters "\n" and "\"" into the prompt text itself.
    system_prompt = (
        "You are a seasoned podcast scriptwriter. Return ONLY JSON.\n"
        'JSON shape MUST be: {"turns":[{"speaker":0|1, "text":"..."}, ...]}.\n'
        "Requirements:\n"
        "- EXACTLY N exchanges (2*N turns), strictly alternating speakers 0 then 1, then 0 then 1, ...\n"
        "- Keep each turn ~1–3 sentences; compress ideas if needed, DO NOT reduce turn count.\n"
        '- Dialog may optionally begin with a speaker label (e.g., "Alex:" or "Host 0:"), which will be preserved.\n'
        "- The 'speaker' numeric field indicates who is speaking."
    )
    support = "\n\n".join(defn.get("supporting_texts") or [])
    user_prompt = f"""Podcast: {title}
Episode: {episode}
Topic (paraphrase allowed for natural delivery): {topic}
Script Tone: {tone} — {tone_desc}

Host 0
- Name: {h1.get('name','Host 0')}
- Voice (TTS target): {h1.get('voice','Alloy')}
- Vibe: {vibe1}
- Vibe instructions: {vibe1_instr[:800]}

Host 1
- Name: {h2.get('name','Host 1')}
- Voice (TTS target): {h2.get('voice','Echo')}
- Vibe: {vibe2}
- Vibe instructions: {vibe2_instr[:800]}

Intro/Outro directives:
- FIRST LINE (speaker 0) should INTRODUCE the podcast title ("{title}"), the episode ("{episode}"), and the subject ("{topic}") — feel free to paraphrase the subject for naturalness — and greet Host 1 by name.
- Host 1 (their first line) acknowledges Host 0 by name.
- FINAL LINES provide a succinct OUTRO that sums things up.

N = {exchanges}

If helpful, incorporate relevant information from the supporting materials (optional). Keep tone aligned with the Script Tone.

Supporting materials (optional):
{support}

OUTPUT SCHEMA (return ONLY JSON):
{{"turns":[{{"speaker":0,"text":"(You may start with an optional speaker label like 'Alex: ' before the text)"}}, ...]}}"""
    return system_prompt, user_prompt
def _chat_json(system_prompt: str, user_prompt: str):
    """Run one JSON-mode chat completion and return the parsed JSON reply.

    The OpenAI SDK is tried first when it is importable; any failure there
    falls back (deliberately, best-effort) to a plain HTTP request, whose
    errors propagate to the caller.
    """
    request = {
        "model": "gpt-4o-mini",
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.6,
    }
    if OpenAI is not None:
        try:
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "").strip())
            completion = client.chat.completions.create(**request)
            return json.loads(completion.choices[0].message.content)
        except Exception:
            pass  # fall through to the raw HTTP path
    resp = http_post_json("/chat/completions", request, expect_json=True)
    return json.loads(resp["choices"][0]["message"]["content"])


def _continue_json(defn: dict, existing_turns: list, need_total: int):
    """Ask the model for the turns still missing to reach *need_total*.

    Returns a list of additional turns; an empty list when the request fails
    or the reply does not contain a ``turns`` list.
    """
    title = defn.get("title", "")
    episode = defn.get("episode", "")
    topic = defn.get("topic", "")
    tone = defn.get("tone", "")
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    n_have = len(existing_turns)
    next_speaker = n_have % 2
    cont_system = (
        'Return ONLY JSON {"turns":[...]}. Continue until total turns == NeedTotal. '
        "Strict alternation; ~1–3 sentences each; keep Script Tone consistent. Labels allowed."
    )
    context_tail = json.dumps(existing_turns[-6:], ensure_ascii=False)
    cont_user = f"""Podcast: {title} — Episode: {episode} — Topic: {topic}
Script Tone: {tone} — {tone_desc}
We currently have {n_have} turns. NeedTotal = {need_total}. Next speaker should be {next_speaker}.
Context (last turns): {context_tail}
Return ONLY the additional turns needed to reach NeedTotal.
"""
    try:
        data = _chat_json(cont_system, cont_user)
    except Exception:
        return []
    turns = data.get("turns") if isinstance(data, dict) else None
    # Anything but a list (null, dict, string) would corrupt the caller's list.
    return turns if isinstance(turns, list) else []


def _exchange_count(definition: dict) -> int:
    """Return the requested number of exchanges as a non-negative int.

    Falls back to DEFAULT_EXCHANGES when the value is missing or not numeric.
    """
    try:
        count = int(definition.get("exchanges"))
    except (TypeError, ValueError):
        count = DEFAULT_EXCHANGES
    return max(count, 0)


def generate_script(definition: dict) -> dict:
    """Generate a script with exactly 2 * exchanges turns.

    Pass 1 asks for the whole script, pass 2 asks for missing turns, and any
    remaining gap is filled with short placeholder lines. The prompts used
    are stored under ``prompt`` in the returned dict.
    """
    need_total = _exchange_count(definition) * 2
    system_prompt, user_prompt = _build_prompts(definition)
    try:
        data = _chat_json(system_prompt, user_prompt)
    except Exception:
        data = {"turns": []}
    if not isinstance(data, dict):
        data = {}
    turns = data.get("turns")
    turns = list(turns) if isinstance(turns, list) else []

    if len(turns) < need_total:
        extra = _continue_json(definition, turns, need_total)
        if isinstance(extra, list):
            turns.extend(extra)

    topic = (definition.get("topic") or "").lower()
    tone = (definition.get("tone") or "").lower()
    while len(turns) < need_total:
        who = len(turns) % 2
        host = definition.get("host1") if who == 0 else definition.get("host2")
        name = (host or {}).get("name", f"Host {who}")
        turns.append({"speaker": who, "text": f"{name}: Quick takeaway on {topic}—in a {tone} style."})

    data["turns"] = turns[:need_total]
    data["prompt"] = {"system": system_prompt, "user": user_prompt}
    return data


def validate_and_fix_script(definition: dict, script: dict) -> dict:
    """Normalise ``script["turns"]`` in place and return *script*.

    The result has exactly 2 * exchanges turns with speakers forced to
    alternate 0, 1, 0, 1, ... Existing texts are kept in order (blank ones
    become "…"); missing turns are padded with placeholder lines. Malformed
    entries no longer raise: a plain string counts as turn text, any other
    non-dict entry is ignored, and the incoming ``speaker`` value is not
    consulted because alternation is enforced by position.
    """
    total_needed = _exchange_count(definition) * 2

    texts = []
    for t in script.get("turns") or []:
        if isinstance(t, dict):
            raw = t.get("text")
        elif isinstance(t, str):
            raw = t
        else:
            continue
        text = "" if raw is None else str(raw).strip()
        texts.append(text or "…")

    topic = (definition.get("topic") or "").lower()
    tone = (definition.get("tone") or "").lower()
    out = []
    for i in range(total_needed):
        expected = i % 2
        if i < len(texts):
            out.append({"speaker": expected, "text": texts[i]})
            continue
        host = definition.get("host1") if expected == 0 else definition.get("host2")
        name = (host or {}).get("name", f"Host {expected}")
        out.append({"speaker": expected, "text": f"{name}: Another point on {topic}—kept {tone}."})

    script["turns"] = out
    return script


# ---------------- TTS ----------------
def tts_synthesize(text: str, voice: str, instructions: str) -> bytes:
    """Synthesize *text* to MP3 bytes with the given voice and instructions.

    Tries the OpenAI SDK first and falls back to a raw HTTP request. The
    audio format option is named ``response_format`` in the speech API; the
    earlier ``format=`` keyword made the SDK call fail on every invocation.
    """
    request = {
        "model": "gpt-4o-mini-tts",
        "voice": select_voice_name(voice),
        "input": text,
        "response_format": "mp3",
    }
    if instructions:
        request["instructions"] = instructions
    if OpenAI is not None:
        try:
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "").strip())
            with client.audio.speech.with_streaming_response.create(**request) as response:
                return response.read()
        except Exception:
            pass  # fall through to the raw HTTP path
    return http_post_json("/audio/speech", request, expect_json=False, as_bytes=True)
def synthesize_all_parts(slug: str, script: dict, definition: dict, progress_cb=None):
    """Synthesize every turn of *script* to its own MP3, then concatenate.

    A turn is skipped when its part file exists and the cached hash of
    (speaker, voice, instructions, text) is unchanged. The hash cache is
    written even when synthesis fails part-way, so parts that were already
    produced are not synthesized again on the next attempt.

    Args:
        progress_cb: optional callable ``(done, total, message)``.

    Returns:
        ``(path_of_full_mp3, per_turn_ms, total_ms)``; skipped turns are
        reported as 0.0 ms.
    """
    pdir = get_podcast_dir(slug)
    ensure_dirs(pdir)
    turns = script.get("turns", [])
    total = len(turns)

    cache_file = pdir / "part_hashes.json"
    cache_map = {}
    if cache_file.exists():
        try:
            loaded = json.loads(cache_file.read_text())
        except (OSError, ValueError):
            loaded = None
        if isinstance(loaded, dict):
            cache_map = loaded

    def report(done, msg):
        if progress_cb:
            progress_cb(done, total, msg)

    h1 = definition.get("host1") or {}
    h2 = definition.get("host2") or {}
    vibes = (vibe_instructions(h1.get("vibe", "")), vibe_instructions(h2.get("vibe", "")))
    # Host 2 falls back to "Echo", matching the defaults used by the form
    # and the script prompt.
    voices = (h1.get("voice", "Alloy"), h2.get("voice", "Echo"))
    tone = definition.get("tone", "")
    tone_desc = TONE_DESCRIPTIONS.get(tone, "")
    if tone_desc:
        extra_tone = f" | Global Script Tone: {tone} — {tone_desc}"
    else:
        extra_tone = f" | Global Script Tone: {tone}"

    per_turn_ms = []
    started = time.perf_counter()
    try:
        for i, turn in enumerate(turns):
            who = int(turn.get("speaker", 0))
            side = 0 if who == 0 else 1
            tts_text = strip_speaker_label((turn.get("text") or "").strip())
            voice = voices[side]
            instr = vibes[side] + extra_tone
            digest = compute_text_hash(f"{who}:{voice}:{instr}:{tts_text}")
            partfile = part_path(slug, i)
            cache_key = f"{i}"

            if cache_map.get(cache_key) == digest and partfile.exists():
                report(i + 1, f"Skipped (unchanged) part {i+1}/{total}")
                per_turn_ms.append(0.0)
                continue

            report(i + 1, f"Synthesizing part {i+1}/{total} (voice: {voice})…")
            t_start = time.perf_counter()
            audio = tts_synthesize(tts_text, voice, instr)
            elapsed = time.perf_counter() - t_start
            partfile.write_bytes(audio)
            cache_map[cache_key] = digest
            per_turn_ms.append(elapsed * 1000.0)
    finally:
        cache_file.write_text(json.dumps(cache_map, indent=2))

    parts = [part_path(slug, i) for i in range(total)]
    out = full_mp3_path(slug)
    report(total, "Concatenating MP3…")
    safe_concat_mp3(parts, out)
    total_ms = (time.perf_counter() - started) * 1000.0
    return out, per_turn_ms, total_ms
# ---------------- UI helpers ----------------
def on_title_change(slug: str, key: str):
    """Widget callback: rename podcast *slug* to the text stored under *key*."""
    new_title = st.session_state.get(key, "").strip()
    if new_title:
        do_rename(slug, new_title)
        st.toast('Title updated')


def export_zip(slug: str) -> bytes:
    """Return a ZIP archive (bytes) of every file in the podcast directory.

    Entries are stored relative to the data directory, in sorted order and
    with forward slashes, so the archive layout does not depend on the
    platform or on directory listing order.
    """
    pdir = get_podcast_dir(slug)
    mem = io.BytesIO()
    with zipfile.ZipFile(mem, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for root, dirs, files in os.walk(pdir):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for fname in sorted(files):
                path = Path(root) / fname
                z.write(path, arcname=path.relative_to(pdir.parent).as_posix())
    return mem.getvalue()


def host_picker(label_prefix: str, default_name="", key_prefix="h"):
    """Render name / voice / vibe inputs for one host.

    Widget values live in session state under ``<key_prefix>_name``,
    ``_voice`` and ``_vibe``. Returns ``{"name", "voice", "vibe"}``.
    """
    cols = st.columns(3)

    # Seed the state and let the widget read it through its key. Passing
    # value= as well as a key whose state was set from code is the pattern
    # Streamlit warns about (NOTE(review): confirm against the Streamlit
    # version in use).
    name_key = f"{key_prefix}_name"
    if name_key not in st.session_state:
        st.session_state[name_key] = default_name
    name = cols[0].text_input(f"{label_prefix} Name", key=name_key)

    voice_options = [f"{v['voice']} ({v['gender']})" for v in VOICES]
    default_voice = "Alloy" if key_prefix == "h1" else "Echo"
    default_voice_index = next(
        (i for i, opt in enumerate(voice_options) if opt.split(" ")[0].lower() == default_voice.lower()),
        0,
    )
    voice_key = f"{key_prefix}_voice"
    desired_label = st.session_state.get(voice_key, voice_options[default_voice_index])
    ensure_select_value(voice_key, voice_options, desired_label, default_voice_index)
    voice_choice = cols[1].selectbox(f"{label_prefix} Voice", options=voice_options, key=voice_key)
    voice = voice_choice.split(" ")[0]

    vibe_names = [v["vibe"] for v in VIBES]
    default_vibe = "Calm" if key_prefix == "h1" else "Dramatic"
    default_vibe_index = vibe_names.index(default_vibe) if default_vibe in vibe_names else 0
    vibe_key = f"{key_prefix}_vibe"
    if st.session_state.get(vibe_key) not in vibe_names:
        st.session_state[vibe_key] = vibe_names[default_vibe_index]
    vibe = cols[2].selectbox(f"{label_prefix} Vibe", options=vibe_names, key=vibe_key)

    return {"name": name, "voice": voice, "vibe": vibe}


def open_details(slug: str):
    """Switch the app to the details view of *slug*."""
    st.session_state["mode"] = "details"
    st.session_state["selected_slug"] = slug


def open_edit(slug: str):
    """Switch the app to the edit view of *slug*."""
    st.session_state['mode'] = 'edit'
    st.session_state['selected_slug'] = slug


def set_mode(mode: str):
    """Set the current app mode."""
    st.session_state["mode"] = mode


def do_rename(slug: str, new_title: str):
    """Set the index title of *slug*; a blank title leaves it unchanged."""
    index = load_index()
    for p in index["podcasts"]:
        if p["slug"] == slug:
            p["title"] = new_title.strip() or p["title"]
            save_index(index)
            break


def do_delete(slug: str):
    """Remove *slug* from the index and delete its directory.

    Nothing is deleted unless *slug* is listed in the index: the value can
    originate from a URL parameter, and an unchecked value such as ".."
    would otherwise be handed to ``shutil.rmtree``.
    """
    index = load_index()
    if not any(p.get("slug") == slug for p in index["podcasts"]):
        return
    index["podcasts"] = [p for p in index["podcasts"] if p.get("slug") != slug]
    save_index(index)
    pdir = get_podcast_dir(slug)
    if pdir.exists():
        shutil.rmtree(pdir, ignore_errors=True)


def next_episode_label(episode: str) -> str:
    """Return *episode* with its trailing number incremented.

    "Episode 3" -> "Episode 4". A label without a trailing number (or an
    empty / None label) yields "Episode 1".
    """
    ep = (episode or "").strip()
    pattern = r'(\d+)\s*$'
    if not re.search(pattern, ep):
        return "Episode 1"
    return re.sub(pattern, lambda mm: str(int(mm.group(1)) + 1), ep)


def clone_podcast(slug: str):
    """Prefill the create form from podcast *slug* and switch to create mode.

    The title is made unique and the episode number is incremented. The
    previous double-escaped pattern never matched, so every clone was
    labelled "Episode 1".
    """
    definition = load_definition(slug)
    if not definition:
        st.toast("Unable to clone: missing definition", icon="⚠️")
        return
    idx = load_index()
    h1 = definition.get("host1", {})
    h2 = definition.get("host2", {})
    prefill = {
        "new_title": uniquify_title(definition.get("title", "Untitled"), idx),
        "new_episode": next_episode_label(definition.get("episode")),
        "new_topic": definition.get("topic", ""),
        "new_exchanges": int(definition.get("exchanges", DEFAULT_EXCHANGES)),
        "new_tone": definition.get("tone", "Informative"),
        "h1_name": h1.get("name", "Host One"),
        "h2_name": h2.get("name", "Host Two"),
        "h1_voice": voice_label_for(h1.get("voice", "Alloy")),
        "h2_voice": voice_label_for(h2.get("voice", "Echo")),
        "h1_vibe": h1.get("vibe", "Calm"),
        "h2_vibe": h2.get("vibe", "Dramatic"),
    }
    for key, value in prefill.items():
        st.session_state[key] = value
    set_mode("create")
# ---------------- UI components ----------------
def _query_param(name: str) -> str:
    """Return the first value of URL query parameter *name*, or ""."""
    params = getattr(st, "query_params", None)
    if params is not None:
        value = params.get(name, "")
    else:
        # Older Streamlit: only the experimental accessor is available.
        value = st.experimental_get_query_params().get(name, "")
    if isinstance(value, list):
        value = value[0] if value else ""
    return value or ""


def _clear_query_params():
    """Remove all URL query parameters (best effort)."""
    params = getattr(st, "query_params", None)
    try:
        if params is not None:
            params.clear()
        else:
            st.experimental_set_query_params()
    except Exception:
        pass


def _request_zip_export(slug: str):
    """Button callback: mark *slug* as the one podcast whose ZIP is wanted."""
    st.session_state["dl_zip_slug"] = slug


def _finish_zip_export():
    """Download callback: forget the pending ZIP export."""
    st.session_state["dl_zip_slug"] = None


def list_podcasts_ui(index):
    """Render the "Your Podcasts" list with per-row actions.

    Rows are sorted by ``created_at`` (newest first) and paged through
    session state. The export ZIP is built only for the row whose "Export"
    button was clicked, instead of for every row on every render.
    """
    st.header("Your Podcasts")
    # Create button directly under header (no icon)
    st.button("Create New Podcast", key="create_new_top", on_click=lambda: set_mode("create"))

    # Handle delete via query params. Skipped while delete is hidden so a
    # hand-crafted URL cannot trigger a deletion the UI does not offer.
    if not HIDE_DELETE:
        slug_val = _query_param("slug")
        if _query_param("action") == "x_delete_disabled" and slug_val:
            do_delete(slug_val)
            _clear_query_params()
            st.success("Podcast deleted.")
            st.rerun()

    # Pagination state
    pods = sorted(index.get("podcasts", []), key=lambda p: p.get("created_at", ""), reverse=True)
    if "list_page" not in st.session_state:
        st.session_state["list_page"] = 0
    if "list_page_size" not in st.session_state:
        st.session_state["list_page_size"] = PAGE_SIZE_DEFAULT
    if not pods:
        st.info("No podcasts yet. Click **Create New Podcast** to get started.")
        return

    total = len(pods)
    page_size = max(1, int(st.session_state["list_page_size"]))
    max_page = (total - 1) // page_size
    # Clamp: the stored page can point past the end after podcasts are removed.
    page = min(max(int(st.session_state["list_page"]), 0), max_page)
    st.session_state["list_page"] = page
    start = page * page_size
    view = pods[start:start + page_size]

    # Lightweight actions state
    if "dl_zip_slug" not in st.session_state:
        st.session_state["dl_zip_slug"] = None
    if "confirm_delete_slug" not in st.session_state:
        st.session_state["confirm_delete_slug"] = None

    for p in view:
        slug = p["slug"]
        mp3 = full_mp3_path(slug)
        # One read per row; the player and the download button share the bytes.
        audio_bytes = mp3.read_bytes() if mp3.exists() else None
        cols = st.columns([3, 6, 1, 1, 1, 1, 1], gap="small")

        with cols[0]:
            title_key = f"title_{slug}"
            if title_key not in st.session_state:
                st.session_state[title_key] = p["title"]
            new_title = st.text_input("", key=title_key, label_visibility="collapsed").strip()
            # Compare the stripped value so trailing spaces do not cause a
            # save on every rerun.
            if new_title and new_title != p["title"]:
                p["title"] = new_title
                save_index(index)

        with cols[1]:
            if audio_bytes is not None:
                st.audio(audio_bytes, format="audio/mp3")
            else:
                st.caption("No audio yet")

        with cols[2]:
            if audio_bytes is not None:
                st.download_button("MP3", data=audio_bytes, file_name=f"{slug}.mp3",
                                   key=f"dlmp3_{slug}", help="Download MP3")
            else:
                st.caption("No audio yet")

        with cols[3]:
            if st.session_state["dl_zip_slug"] == slug:
                st.download_button("Download ZIP", data=export_zip(slug), file_name=f"{slug}.zip",
                                   key=f"save_zip_{slug}", help="Export Podcast (.zip)",
                                   on_click=_finish_zip_export)
            else:
                st.button("Export", key=f"prep_zip_{slug}", help="Export Podcast (.zip)",
                          on_click=_request_zip_export, args=(slug,))

        with cols[4]:
            st.button("Edit", key=f"edit_{slug}", help="Edit Script", on_click=lambda s=slug: open_edit(s))
        with cols[5]:
            st.button("Clone", key=f"clone_{slug}", help="Clone", on_click=lambda s=slug: clone_podcast(s))
        with cols[6]:
            if not HIDE_DELETE:
                st.markdown('Delete', unsafe_allow_html=True)

    # Delete confirmation UI (global, appears below list)
    slug_c = st.session_state.get("confirm_delete_slug")
    if slug_c:
        st.warning(f"Delete podcast '{slug_c}'? This cannot be undone.")
        c1, c2 = st.columns([1, 1])
        with c1:
            if st.button("Confirm Delete", key="confirm_delete_now"):
                do_delete(slug_c)
                st.session_state["confirm_delete_slug"] = None
                st.success("Podcast deleted.")
                st.rerun()
        with c2:
            if st.button("Cancel", key="cancel_delete"):
                st.session_state["confirm_delete_slug"] = None
                st.rerun()

    # Bottom pagination controls with icons
    if not HIDE_PAGINATION:
        colp1, colp2, colp3 = st.columns([2, 6, 2])
        with colp1:
            if st.button("◀ Prev", disabled=page <= 0):
                st.session_state["list_page"] = max(0, page - 1)
                st.rerun()
        with colp2:
            st.caption(f"Showing {page_size} per page — Page {page+1} of {max_page+1} (Total {total})")
        with colp3:
            if st.button("Next ▶", disabled=page >= max_page):
                st.session_state["list_page"] = min(max_page, page + 1)
                st.rerun()
    st.markdown('', unsafe_allow_html=True)
def _apply_lucky_defaults():
    """Fill the create-form session state with random sample values."""
    sample_titles = ["Hot Takes Live", "Acme Insider", "Two Brains, One Mic", "Curious Minds", "Deep Dive Daily",
                     "Chatter Box", "Morning Buzz", "Brainstorm Café", "Weekend Roundtable", "Idea Exchange",
                     "Think Tank Talks", "Pulse Check", "Off the Record", "The Daily Spark", "Future Shock",
                     "Culture Chat", "Signal & Noise", "The Conversation Lab", "Trending Now", "Mind Meld",
                     "Coffee & Code", "Beyond the Hype", "Lightning Round", "The Big Why", "The Briefing Room",
                     "Micro Chats"]
    sample_topics = ["The future of AI in everyday life", "How remote work changed cities",
                     "Five technologies shaping the next decade", "Lessons from famous product flops",
                     "What makes a great brand story", "AI in healthcare", "Space tourism prospects",
                     "The psychology of habits", "Virtual reality in classrooms", "Cybersecurity for everyone",
                     "Quantum computing explained", "Designing ethical AI", "Smart cities and sensors",
                     "Robotics at home"]
    sample_names = ["Alex", "Sam", "Jordan", "Taylor", "Casey", "Riley", "Morgan", "Avery", "Jamie", "Quinn",
                    "Chris", "Pat", "Devon", "Skyler", "Cameron", "Drew", "Kendall", "Peyton", "Blake", "Hayden",
                    "Logan", "Shawn", "Noel", "Emerson", "Harper", "Reese", "Rowan", "Sage"]

    st.session_state["new_title"] = random.choice(sample_titles)
    st.session_state["new_topic"] = random.choice(sample_topics)

    # The second pick of each pair excludes the first so the hosts differ.
    n1 = random.choice(sample_names)
    n2 = random.choice([n for n in sample_names if n != n1])
    st.session_state["h1_name"] = n1
    st.session_state["h2_name"] = n2

    v1 = random.choice(VOICES)
    v2 = random.choice([v for v in VOICES if v["voice"] != v1["voice"]])
    st.session_state["h1_voice"] = f"{v1['voice']} ({v1['gender']})"
    st.session_state["h2_voice"] = f"{v2['voice']} ({v2['gender']})"

    vibe_names = [v["vibe"] for v in VIBES]
    vb1 = random.choice(vibe_names)
    vb2 = random.choice([v for v in vibe_names if v != vb1])
    st.session_state["h1_vibe"] = vb1
    st.session_state["h2_vibe"] = vb2

    st.session_state["new_tone"] = random.choice(list(TONE_DESCRIPTIONS.keys()))
    st.session_state["new_episode"] = "Episode 1"


def _collect_support_texts(files) -> list:
    """Return the text of each uploaded .txt / .docx file.

    Files above MAX_UPLOAD_BYTES are reported with ``st.error`` and skipped;
    files that yield no text are left out.
    """
    texts = []
    for f in files or []:
        try:
            raw = f.getvalue()
        except Exception:
            raw = b""
        name = getattr(f, "name", "") or ""
        if len(raw) > MAX_UPLOAD_BYTES:
            shown = name or "(unknown)"
            st.error(f'File "{shown}" exceeds 10 MB and was skipped.')
            continue
        # The reported MIME type may be empty or None; never call methods on it directly.
        mime = getattr(f, "type", "") or ""
        name_lower = name.lower()
        if mime.endswith("text/plain") or name_lower.endswith(".txt"):
            text = raw.decode("utf-8", errors="ignore")
        elif name_lower.endswith(".docx"):
            text = read_docx_text(raw)
        else:
            text = ""
        if text:
            texts.append(text)
    return texts


def create_form_ui(index):
    """Render the "Create New Podcast" form and start generation on submit.

    While a generation is pending (``generating`` plus ``pending_definition``
    in session state) the generation flow is rendered instead of the form.
    """
    if st.session_state.get("generating") and st.session_state.get("pending_definition"):
        run_generation_flow(st.session_state["pending_definition"])
        return

    st.header("Create New Podcast")
    # Feeling Lucky button directly beneath header (no icon)
    if st.button("Feeling Lucky", key="lucky_btn"):
        _apply_lucky_defaults()
        st.rerun()

    # Seed / repair widget state before the widgets are created. Cloned
    # values may lie outside what the widgets accept, and widgets that read
    # their state through the key need no separate value=/index= argument.
    if "new_episode" not in st.session_state:
        st.session_state["new_episode"] = "Episode 1"
    try:
        seeded_exchanges = int(st.session_state.get("new_exchanges", DEFAULT_EXCHANGES))
    except (TypeError, ValueError):
        seeded_exchanges = DEFAULT_EXCHANGES
    clamped_exchanges = min(max(seeded_exchanges, 1), 20)
    if st.session_state.get("new_exchanges") != clamped_exchanges:
        st.session_state["new_exchanges"] = clamped_exchanges
    tone_names = list(TONE_DESCRIPTIONS.keys())
    if st.session_state.get("new_tone") not in tone_names:
        st.session_state["new_tone"] = tone_names[0]

    with st.form("create_form"):
        col_title, col_episode = st.columns(2)
        with col_title:
            title = st.text_input("Title", key="new_title")
        with col_episode:
            st.text_input("Episode", key="new_episode")

        col_topic, col_files = st.columns(2)
        with col_topic:
            topic = st.text_area("Topic", key="new_topic", height=98,
                                 placeholder="What should they talk about?")
        with col_files:
            files = st.file_uploader("Supporting file(s)", accept_multiple_files=True,
                                     type=["txt", "docx"], key="new_support")

        col_exch, col_tone = st.columns(2)
        with col_exch:
            exchanges = st.number_input("Exchanges", min_value=1, max_value=20, step=1, key="new_exchanges")
        with col_tone:
            tone = st.selectbox("Tone", options=tone_names, key="new_tone")

        st.markdown("**Hosts**")
        host1 = host_picker("Host 1", default_name=st.session_state.get("h1_name", "Host One"), key_prefix="h1")
        host2 = host_picker("Host 2", default_name=st.session_state.get("h2_name", "Host Two"), key_prefix="h2")
        submitted = st.form_submit_button("Generate Podcast", use_container_width=True)

    if not submitted:
        return
    if not (title or "").strip():
        st.error("Please provide a Title.")
        return

    episode_val = (st.session_state.get("new_episode") or "").strip()
    host1["name"] = st.session_state.get("h1_name", host1.get("name", "Host One"))
    host2["name"] = st.session_state.get("h2_name", host2.get("name", "Host Two"))
    host1["voice"] = (st.session_state.get("h1_voice") or "Alloy").split(" ")[0]
    host2["voice"] = (st.session_state.get("h2_voice") or "Echo").split(" ")[0]
    host1["vibe"] = st.session_state.get("h1_vibe", host1.get("vibe", "Calm"))
    host2["vibe"] = st.session_state.get("h2_vibe", host2.get("vibe", "Dramatic"))

    idx = load_index()
    title_u = uniquify_title(title, idx)
    slug = uniquify_slug(slugify(title_u), idx)

    definition = {
        "title": title_u.strip(),
        "episode": episode_val,
        "slug": slug,
        "topic": (topic or "").strip(),
        "exchanges": int(exchanges),
        "host1": host1,
        "host2": host2,
        "tone": tone,
        "supporting_texts": _collect_support_texts(files),
        "created_at": now_iso(),
    }
    st.session_state["pending_definition"] = definition
    st.session_state["generating"] = True
    st.rerun()
# -------- Generation flow --------
def run_generation_flow(definition: dict):
    """Generate script and audio for *definition*, showing progress.

    Saves the definition, registers the podcast in the index, generates and
    validates the script, synthesizes the audio and finally plays it. The
    ``generating`` / ``pending_definition`` flags are cleared whether or not
    generation succeeds; previously a failure left them set, so every rerun
    of the create page started the same generation again.
    """
    index = load_index()
    slug = definition["slug"]
    ensure_dirs(get_podcast_dir(slug))
    save_definition(slug, definition)
    index["podcasts"] = [p for p in index["podcasts"] if p["slug"] != slug]
    index["podcasts"].append(
        {"title": definition["title"], "slug": slug, "created_at": definition["created_at"]}
    )
    save_index(index)

    st.subheader("Processing")
    prog = st.progress(0.0, text="Starting…")
    try:
        with st.status("Generating script via Chat API…", state="running", expanded=False) as box:
            def progress_cb(done, total, msg):
                frac = min(max(done / max(total, 1), 0.0), 1.0)
                prog.progress(frac, text=msg)
                box.write(msg)

            script = validate_and_fix_script(definition, generate_script(definition))
            save_script(slug, script)
            out_mp3, per_turn_ms, total_ms = synthesize_all_parts(
                slug, script, definition, progress_cb=progress_cb
            )
            box.write("Concatenation complete.")
            # Average over turns that were actually synthesized (skipped ones are 0.0).
            synthesized = sum(1 for x in per_turn_ms if x > 0)
            avg_ms = sum(per_turn_ms) / max(1, synthesized)
            box.write(f"Timing: total {total_ms:.0f} ms • avg per synthesized turn {avg_ms:.0f} ms")
            box.update(label="Done!", state="complete", expanded=False)
    except Exception as err:
        st.error(f"Podcast generation failed: {err}")
        return
    finally:
        st.session_state.pop("pending_definition", None)
        st.session_state["generating"] = False

    left, _right, _right2 = st.columns([8, 2, 2])
    with left:
        st.audio(Path(out_mp3).read_bytes(), format="audio/mp3")


# -------- Details / Edit --------
def details_ui(slug: str):
    """Render the read-only details page of podcast *slug*."""
    definition = load_definition(slug)
    if not definition:
        # Missing or unreadable definition: report it instead of failing on .get().
        st.error("Podcast definition not found.")
        return
    script = load_script(slug) or {"turns": []}
    if "prompt" not in script:
        script["prompt"] = {}

    st.header(definition.get("title", slug))
    st.caption(definition.get("created_at", ""))

    mp3 = full_mp3_path(slug)
    if mp3.exists():
        audio_bytes = mp3.read_bytes()  # one read feeds both player and download
        st.audio(audio_bytes, format="audio/mp3")
        st.download_button("MP3", data=audio_bytes, file_name=f"{slug}.mp3",
                           key=f"dlmp3_details_{slug}", help="Download MP3")
    else:
        st.warning("Audio not found. You can regenerate by editing the script and saving.")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.button("Edit Script", on_click=lambda: open_edit(slug), use_container_width=True)
    with col2:
        if st.button("Re-generate Script", use_container_width=True):
            st.session_state["pending_definition"] = definition
            st.session_state["generating"] = True
            set_mode("create")
            st.rerun()
    with col3:
        st.download_button("Export", data=export_zip(slug), file_name=f"{slug}.zip",
                           key=f"dlzip_details_{slug}", help="Export Podcast (.zip)")

    with st.expander("Definition (read-only)"):
        st.json(definition, expanded=False)
    with st.expander("Script (read-only)"):
        st.json(script, expanded=False)
st.audio(f.read(), format="audio/mp3") with open(mp3, "rb") as f: _bytes = f.read() st.download_button("MP3", data=_bytes, file_name=f"{slug}.mp3", key=f"dlmp3_details_{slug}", help="Download MP3") else: st.warning("Audio not found. You can regenerate by editing the script and saving.") col1, col2, col3 = st.columns(3) with col1: st.button("Edit Script", on_click=lambda: open_edit(slug), use_container_width=True) with col2: if st.button("Re-generate Script", use_container_width=True): st.session_state["pending_definition"] = definition; st.session_state["generating"] = True; set_mode("create"); st.rerun() with col3: zbytes = export_zip(slug) st.download_button("Export", data=zbytes, file_name=f"{slug}.zip", key=f"dlzip_details_{slug}", help="Export Podcast (.zip)") with st.expander("Definition (read-only)"): st.json(definition, expanded=False) with st.expander("Script (read-only)"): st.json(script, expanded=False) def edit_ui(slug: str): definition = load_definition(slug); script = load_script(slug) if not script: st.error("No script to edit."); return st.header(f"Details: {definition.get('title', slug)}") with st.expander("Definition (read-only)"): st.json(definition, expanded=False) with st.expander("Script (read-only)"): script_ro = load_script(slug) or {"turns": []} if script_ro and "prompt" not in script_ro: script_ro["prompt"] = {} st.json(script_ro, expanded=False) if "edit_expanded" not in st.session_state: st.session_state["edit_expanded"] = False if "edit_processing" not in st.session_state: st.session_state["edit_processing"] = False turns = script.get("turns", []); h1n=(definition.get("host1") or {}).get("name","Speaker 0"); h2n=(definition.get("host2") or {}).get("name","Speaker 1") with st.expander("Edit Script", expanded=st.session_state["edit_expanded"]): with st.form("edit_script_form"): for i, turn in enumerate(turns): who = int(turn.get("speaker", 0)); host_name = h1n if who == 0 else h2n st.markdown(f"**Turn {i+1} — {host_name}**") orig = 
turn.get("text", ""); new_txt = st.text_area("", value=orig, key=f"edit_line_{i}", height=100, label_visibility="collapsed") if new_txt != orig: turns[i]["text"] = new_txt saved = st.form_submit_button("Save Changes & Regenerate Changed Audio", type="primary", use_container_width=True) if saved: script = validate_and_fix_script(definition, {"turns": turns}); save_script(slug, script) st.session_state["edit_expanded"] = False; st.session_state["edit_processing"] = True; st.rerun() if st.session_state.get("edit_processing"): st.subheader("Processing"); prog = st.progress(0.0, text="Starting…") with st.status("Regenerating updated audio…", state="running", expanded=False) as box: def progress_cb(done, total, msg): prog.progress(min(max(done/max(total,1),0.0),1.0), text=msg); box.write(msg) out_mp3, per_turn_ms, total_ms = synthesize_all_parts(slug, script, definition, progress_cb=progress_cb) box.write("Concatenation complete."); box.update(label="Done!", state="complete", expanded=False) left, right = st.columns([8,1]) with left: with open(out_mp3, "rb") as f: audio_bytes = f.read(); st.audio(audio_bytes, format="audio/mp3") st.session_state["edit_processing"] = False; st.info("You can reopen **Edit Script** to make further changes.") # -------- App entry -------- def main(): st.set_page_config(page_title="Podcast Creator", layout="wide") st.markdown('', unsafe_allow_html=True) st.title(APP_TITLE) current_mode = st.session_state.get("mode", "list") # Back to List (no icon) if current_mode != "list": st.button("Back to List", on_click=lambda: set_mode("list")) DATA_DIR.mkdir(parents=True, exist_ok=True) if "mode" not in st.session_state: st.session_state["mode"] = "list" index = load_index() mode = st.session_state.get("mode", "list") if mode == "list": list_podcasts_ui(index) elif mode == "create": create_form_ui(index) elif mode == "details": slug = st.session_state.get("selected_slug") if slug: details_ui(slug) else: st.info("No podcast selected.") set_mode("list") 
elif mode == "edit": slug = st.session_state.get("selected_slug") if slug: edit_ui(slug) else: st.info("No podcast selected.") set_mode("list") else: set_mode("list") if __name__ == "__main__": main()