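"""AI-Powered Content Planner & Script Writer (Public Blueprints).

Streamlit app that turns a blog URL or pasted text into a content plan and
per-blueprint scripts via Groq- or OpenAI-hosted models.

Assumed setup (inferred from the imports and environment lookups below):
  - Local modules prompts.py and script_prompts.py provide the prompt templates.
  - GROQ_API_KEY and/or OPENAI_API_KEY must be set in the environment.
  - Run with: streamlit run app.py   (filename assumed)
"""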
import json
import os
import time

import requests
import streamlit as st
from bs4 import BeautifulSoup
from openai import OpenAI

from prompts import SYSTEM_MESSAGE, USER_MESSAGE
from script_prompts import SYSTEM_MESSAGE_TEXT, SYSTEM_MESSAGE_TH_VO, USER_MESSAGE_TEXT_VO

PERSPECTIVE_INSTRUCTIONS = {
    "auto": 'The script should match the perspective (first person, third person, etc) of the reference material.',
    "first": 'The script should be written in first person perspective.',
    "third": 'The script should be written in third person perspective.',
}

DEFAULT_TONE_INSTRUCTION = 'the same as the tone of the reference material'
DEFAULT_TONE = 'balanced'


def fetch_blog_content(url):
    """Fetch a blog page and return the text of its <p> tags.

    Note: only paragraph text is captured; headings, list items, and other
    elements are ignored, and an empty string is returned on any failure.
    """
    try:
        res = requests.get(url, timeout=10)
        res.raise_for_status()  # Raise an error for bad status codes
        soup = BeautifulSoup(res.text, "html.parser")
        # A simple extraction: get all text within <p> tags
        paragraphs = soup.find_all('p')
        content = "\n\n".join([p.get_text() for p in paragraphs])
        return content.strip()
    except Exception as e:
        st.error(f"Error fetching blog content: {e}")
        return ""


# Set Streamlit layout to wide mode
st.set_page_config(layout="wide")
st.title("🎬 AI-Powered Content Planner & Script Writer - Public Blueprints")
st.markdown("Paste source content below to generate a Public Blueprints content plan and scripts.")

# Placeholders at the very top for response times
plan_time_placeholder = st.empty()
script_time_placeholder = st.empty()

# Models to try
OPENAI_MODELS = ["gpt-4o", "gpt-4o-mini", "o3-mini"]
GROQ_MODELS = ["llama-3.3-70b-specdec", "llama-3.3-70b-versatile", "llama-3.1-8b-instant", "deepseek-r1-distill-llama-70b"]
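# GROQ_MODELS are served through Groq's OpenAI-compatible endpoint, while
# OPENAI_MODELS go to the OpenAI API directly. o3-mini is a reasoning model
# and takes different request parameters (handled where each request is built).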

# Sidebar: Model Selection
st.sidebar.subheader("📤 Model for Plan Generation")
blueprint_plan_model = st.sidebar.selectbox(
    "Choose model for blueprints content plan:",
    GROQ_MODELS + OPENAI_MODELS,
    index=0,  # Default selection
)

st.sidebar.subheader("📥 Model for Script Writing")
script_writing_model = st.sidebar.selectbox(
    "Choose model for script writing:",
    GROQ_MODELS + OPENAI_MODELS,
    index=0,  # Default selection
)

# Point each task's client at Groq or OpenAI based on the selected model
if blueprint_plan_model in GROQ_MODELS:
    plan_client = OpenAI(base_url="https://api.groq.com/openai/v1", api_key=os.environ.get("GROQ_API_KEY"))
else:
    plan_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

if script_writing_model in GROQ_MODELS:
    script_client = OpenAI(base_url="https://api.groq.com/openai/v1", api_key=os.environ.get("GROQ_API_KEY"))
else:
    script_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
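# Both providers work with the same OpenAI client class because Groq exposes an
# OpenAI-compatible API; only base_url and api_key differ between the two.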

# Layout: Two columns - left for reference document, right for plans and scripts
col_doctext, col_output = st.columns([1, 1])

# Left Column: Input Source
with col_doctext:
    st.subheader("📝 Input Source")
    blog_url = st.text_input("Enter a blog URL (optional):", key="blog_url")

    # Initialize fetched_content as an empty string.
    fetched_content = ""
    if blog_url:
        fetched_content = fetch_blog_content(blog_url)
        if fetched_content:
            st.success("Blog content fetched successfully!")
        else:
            st.warning("Unable to fetch blog content. Please paste your content manually.")

    # The text area uses fetched_content as its default value.
    doctext = st.text_area("Or paste your content here:", value=fetched_content, height=400, key="doctext")

# Right Column: Plan Generation and Script Writing
with col_output:
    st.subheader("📋 Generated Content Plans")
    time_placeholder = st.empty()  # currently unused; the top-of-page placeholders show timings

    # Button to generate blueprint plans from the source content
    if st.button("Generate Plan"):
        if not doctext.strip():
            st.error("❌ Please enter an input source content.")
        else:
            with st.spinner("⏳ Generating content plan... Please wait."):
                try:
                    # Prepare prompts for content plan generation
                    system_prompt = SYSTEM_MESSAGE
                    user_prompt = USER_MESSAGE.format(source_content=doctext)
                    messages = [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt},
                    ]
                    openai_args = {
                        "model": blueprint_plan_model,
                        "messages": messages,
                        "response_format": {"type": "json_object"},
                    }
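                    # o3-mini is a reasoning model: it accepts reasoning_effort but
                    # not temperature or max_tokens, hence the split below.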
                    if blueprint_plan_model == "o3-mini":
                        openai_args["reasoning_effort"] = "low"
                    else:
                        openai_args["max_tokens"] = 5000
                        openai_args["temperature"] = 0.45
                    start_time = time.time()
                    response = plan_client.chat.completions.create(**openai_args)
                    end_time = time.time()
                    elapsed_time = end_time - start_time
                    plan_time_placeholder.markdown(f"⏱️ **Response Time for Content Planning:** {elapsed_time:.2f} seconds")
                    generated_response = response.choices[0].message.content.strip()
                    content_plan = json.loads(generated_response)
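                    # Expected response shape (inferred from the fields rendered
                    # below; the top-level key name can be anything), e.g.:
                    # {"blueprint_plans": [{"Blueprint": "...", "Content Focus": "...",
                    #                       "Narrative Thread": "...", "Tag": "..."}, ...]}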
                    # Assume the response JSON has a single top-level key containing
                    # the list of blueprint plans
                    plan_key = list(content_plan.keys())[0]
                    blueprint_plans = content_plan.get(plan_key, [])
                    # Save blueprint plans in session state so they persist across reruns
                    st.session_state.blueprint_plans = blueprint_plans
                    # Clear any previous script outputs
                    for i in range(len(blueprint_plans)):
                        st.session_state.pop(f"extracted_clip_{i}", None)
                except json.JSONDecodeError:
                    st.error("⚠️ Failed to parse the model response as JSON. Try again.")
                except Exception as e:
                    st.error(f"❌ Error: {str(e)}")

    # Display blueprint plans if they exist in session state
    if "blueprint_plans" in st.session_state:
        # Work with a reference to the blueprint plans list
        updated_blueprint_plans = st.session_state.blueprint_plans
        for i, blueprint in enumerate(updated_blueprint_plans):
            # Each blueprint is rendered in an expander
            with st.expander(f"🎬 Blueprint Plan {i + 1}", expanded=True):
                st.markdown(f"**Blueprint:** {blueprint.get('Blueprint', 'N/A')}")
                st.markdown(f"**Content Focus:** {blueprint.get('Content Focus', 'N/A')}")
                st.markdown(f"**Narrative Thread:** {blueprint.get('Narrative Thread', 'N/A')}")
                st.markdown(f"**Tag:** {blueprint.get('Tag', 'N/A')}")
                # Button to generate a script for this blueprint
                if st.button("Generate Script", key=f"extract_{i}"):
                    with st.spinner("⏳ Generating script... Please wait."):
                        try:
                            # Send only this blueprint's content focus to the script writer
                            content_focus_script = updated_blueprint_plans[i].get("Content Focus", "N/A")
                            script_user_prompt = USER_MESSAGE_TEXT_VO.format(
                                doctext=doctext,
                                content_focus=content_focus_script,
                            )
                            # Choose the text or voiceover system prompt based on
                            # the blueprint's Narrative Thread
                            if updated_blueprint_plans[i].get("Narrative Thread") == 'text':
                                script_system_prompt = SYSTEM_MESSAGE_TEXT.format(
                                    perspective_instruction=PERSPECTIVE_INSTRUCTIONS.get("auto"),
                                    tone_instruction=DEFAULT_TONE_INSTRUCTION,
                                )
                            else:
                                script_system_prompt = SYSTEM_MESSAGE_TH_VO.format(
                                    perspective_instruction=PERSPECTIVE_INSTRUCTIONS.get("auto"),
                                    tone_instruction=DEFAULT_TONE_INSTRUCTION,
                                )
                            clipper_messages = [
                                {"role": "system", "content": script_system_prompt},
                                {"role": "user", "content": script_user_prompt},
                            ]
                            extraction_args = {
                                "model": script_writing_model,
                                "messages": clipper_messages,
                            }
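                            # Same o3-mini handling as in plan generation: reasoning
                            # models take reasoning_effort instead of temperature/max_tokens.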
                            if script_writing_model == "o3-mini":
                                extraction_args["reasoning_effort"] = "low"
                            else:
                                extraction_args["max_tokens"] = 5000
                                extraction_args["temperature"] = 0.45
                            # Time the model call itself
                            start_time = time.time()
                            clipper_response = script_client.chat.completions.create(**extraction_args)
                            end_time = time.time()
                            elapsed_time = end_time - start_time
                            script_time_placeholder.markdown(f"⏱️ **Response Time for Script Writing:** {elapsed_time:.2f} seconds")
                            extraction_response = clipper_response.choices[0].message.content.strip()
                            extracted_clip = extraction_response
                            # Save the generated script for this blueprint in session state
                            st.session_state[f"extracted_clip_{i}"] = extracted_clip
                        except Exception as e:
                            st.error(f"❌ Extraction error: {str(e)}")
                # Display the generated script if available
                if f"extracted_clip_{i}" in st.session_state:
                    st.markdown("#### 📝 Script")
                    st.write(st.session_state[f"extracted_clip_{i}"])