Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import re | |
| import time | |
| import tempfile | |
| import requests | |
| import json | |
| from google import genai | |
| from google.genai import types | |
| import google.generativeai as genai | |
| import io | |
| import base64 | |
| import numpy as np | |
| import cv2 | |
| import logging | |
| import uuid | |
| import subprocess | |
| from pathlib import Path | |
| import urllib.parse | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import matplotlib.pyplot as plt | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| # For PandasAI using a single dataframe | |
| from pandasai import SmartDataframe | |
| from pandasai.responses.response_parser import ResponseParser | |
| from pandasai.exceptions import InvalidOutputValueMismatch | |
| import base64 | |
| import os | |
| import uuid | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| from io import BytesIO | |
| import dataframe_image as dfi | |
| import uuid | |
| from PIL import ImageFont, ImageDraw, Image | |
| import seaborn as sns | |
# Streamlit response parser
class StreamLitResponse(ResponseParser):
    """Response parser that reports PandasAI results as image paths or text.

    DataFrame and plot results are written as PNG files under
    ``./exports/charts`` and returned as ``{'type': 'plot', 'value': path}``.
    Every other result is returned as ``{'type': 'text', 'value': str}``.
    """

    def __init__(self, context):
        super().__init__(context)
        # The format_* methods write into this directory, so create it up front.
        os.makedirs("./exports/charts", exist_ok=True)

    def format_dataframe(self, result):
        """
        Export ``result['value']`` (a DataFrame) to a PNG with dataframe_image.

        Returns a dict of type 'plot' whose value is the PNG path, or the
        string form of the value when the export raises.
        """
        try:
            # Default styler; styling can be chained here if desired.
            styled = result['value'].style
            target = f"./exports/charts/{uuid.uuid4().hex}.png"
            dfi.export(styled, target)
        except Exception as e:
            print("Error in format_dataframe:", e)
            # Fall back to a string representation of the value.
            target = str(result['value'])
        print("response_class_path (dataframe):", target)
        return {'type': 'plot', 'value': target}

    def format_plot(self, result):
        """
        Normalise a plot result to ``{'type': 'plot', 'value': ...}``.

        Handled in order: objects with ``savefig`` (saved as PNG), existing
        file paths (returned as-is), ``io.BytesIO`` buffers (written to a
        PNG file), and base64 / data-URI strings (decoded to a PNG file).
        If a conversion raises, or nothing matches, the value is ``str()``
        of the payload.
        """
        payload = result["value"]

        # Matplotlib-style figure: anything that can save itself.
        if hasattr(payload, "savefig"):
            try:
                out_path = f"./exports/charts/{uuid.uuid4().hex}.png"
                payload.savefig(out_path, format="png")
            except Exception as e:
                print("Error saving matplotlib figure:", e)
                return {'type': 'plot', 'value': str(payload)}
            return {'type': 'plot', 'value': out_path}

        # Path of a file that already exists on disk.
        if isinstance(payload, str) and os.path.isfile(payload):
            return {'type': 'plot', 'value': str(payload)}

        # In-memory image bytes.
        if isinstance(payload, io.BytesIO):
            try:
                out_path = f"./exports/charts/{uuid.uuid4().hex}.png"
                with open(out_path, "wb") as fh:
                    fh.write(payload.getvalue())
            except Exception as e:
                print("Error writing BytesIO to file:", e)
                return {'type': 'plot', 'value': str(payload)}
            return {'type': 'plot', 'value': out_path}

        # Base64 text, either raw ("iVBOR" prefix) or wrapped in a data URI.
        if isinstance(payload, str) and payload.startswith(("iVBOR", "data:image")):
            try:
                if "base64," in payload:
                    # Keep only the encoded part of the data URI.
                    payload = payload.split("base64,")[1]
                out_path = f"./exports/charts/{uuid.uuid4().hex}.png"
                with open(out_path, "wb") as fh:
                    fh.write(base64.b64decode(payload))
            except Exception as e:
                print("Error decoding base64 image:", e)
                return {'type': 'plot', 'value': str(payload)}
            return {'type': 'plot', 'value': out_path}

        # Nothing matched: hand the value back as a string.
        return {'type': 'plot', 'value': str(payload)}

    def format_other(self, result):
        """Return any non-image result as text."""
        return {'type': 'text', 'value': str(result['value'])}
# Per-process base path handed to PandasAI as "save_charts_path".
guid = uuid.uuid4()
new_filename = f"{guid}"
user_defined_path = os.path.join("./exports/charts/", new_filename)

# SECURITY: these Imgur credentials were committed in plain text. They can now
# be supplied through the environment; the literals remain only as a
# backward-compatible fallback and should be rotated and removed.
img_ID = os.getenv("IMGUR_CLIENT_ID", "344744a88ad1098")
img_secret = os.getenv("IMGUR_CLIENT_SECRET", "3c542a40c215327045d7155bddfd8b8bc84aebbf")
imgur_url = "https://api.imgur.com/3/image"
imgur_headers = {"Authorization": f"Client-ID {img_ID}"}

# ──────────
# Configuration and Logging
# ──────────
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    st.error("Google API Key is missing. Please set it in environment variables or secrets.toml.")
else:
    genai.configure(api_key=GOOGLE_API_KEY)

# Hugging Face token used for the Authorization header of image requests.
token = os.getenv('HF_API')
if not token:
    # Without a token the header below literally reads "Bearer None".
    logger.warning("HF_API is not set; Hugging Face requests will be sent without a valid token.")
headers = {"Authorization": f"Bearer {token}"}

# Pandasai gemini
llm1 = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-thinking-exp",  # MODEL REVERTED
    temperature=0,
    max_tokens=None,
    timeout=1000,
    max_retries=2
)

# ──────────
# Utility Constants
# ──────────
MAX_CHARACTERS = 200000
def configure_gemini(api_key):
    """
    Configure the module-level ``genai`` SDK with ``api_key`` and return a
    ``GenerativeModel`` for 'gemini-2.0-flash-thinking-exp'.

    Any exception is logged through ``logger`` and then re-raised.
    """
    try:
        genai.configure(api_key=api_key)
        story_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')  # MODEL REVERTED
    except Exception as e:
        logger.error(f"Error configuring Gemini: {str(e)}")
        raise
    return story_model
# Initialize Gemini model for story generation
model = configure_gemini(GOOGLE_API_KEY)
# os.environ only accepts str values: assigning None raises TypeError and
# would abort the script, so export the key only when it is actually set.
if GOOGLE_API_KEY:
    os.environ["GEMINI_API_KEY"] = GOOGLE_API_KEY
# ──────────
# PandasAI Response for DataFrame
# ──────────
def generateResponse(prompt, df):
    """
    Ask PandasAI about ``df`` and return the parsed answer.

    Wraps ``df`` in a ``SmartDataframe`` configured with the ``llm1`` model
    and the ``StreamLitResponse`` parser, then returns ``chat(prompt)``.
    """
    # Extra modules the generated code is allowed to import.
    extra_dependencies = [
        "os",
        "io",
        "sys",
        "chr",
        "glob",
        "b64decoder",
        "collections",
        "geopy",
        "geopandas",
        "wordcloud",
        "builtins"
    ]
    agent_config = {
        "llm": llm1,
        "custom_whitelisted_dependencies": extra_dependencies,
        "response_parser": StreamLitResponse,
        # NOTE(review): "security" is set to "none" — presumably this turns
        # off PandasAI's code safety checks; confirm this is intended.
        "security": "none",
        "enable_cache": False,
        "save_charts": False,
        "save_charts_path": user_defined_path,
    }
    return SmartDataframe(df, config=agent_config).chat(prompt)
# ──────────
# DataFrame-Based Story Generation (for CSV/Excel files)
# ──────────
def generate_story_from_dataframe(df, story_type):
    """
    Generate a five-section, data-based story from a CSV/Excel DataFrame.

    The DataFrame is serialised to JSON and embedded in a prompt selected by
    ``story_type`` (unknown values fall back to ``"free_form"``). The model
    reply is split on ``[break]`` and normalised to exactly five sections:
    missing ones are padded with a placeholder, extra ones are dropped.

    Returns:
        The five sections joined by ``[break]``, or ``None`` when the model
        returns nothing or any step fails (the error is shown via st.error).
    """
    # (persona, instruction) pairs; the dataset JSON is spliced between them.
    prompt_parts = {
        "free_form": (
            "You are a professional storyteller. ",
            ", create an engaging and concise story. ",
        ),
        "children": (
            "You are a professional storyteller writing stories for children. ",
            ", create a fun, factual, and concise story appropriate for children. ",
        ),
        "education": (
            "You are a professional storyteller writing educational content. ",
            ", create an informative, engaging, and concise educational story. Include interesting facts while keeping it engaging. ",
        ),
        "business": (
            "You are a professional storyteller specializing in business narratives. ",
            ", create a professional, concise business story with practical insights. ",
        ),
        "entertainment": (
            "You are a professional storyteller writing creative entertaining stories. ",
            ", create an engaging and concise entertaining story. Include interesting facts while keeping it engaging. ",
        ),
    }
    persona, instruction = prompt_parts.get(story_type, prompt_parts["free_form"])
    narrator_rules = (
        "Write a story for a narrator meaning no labels of pages or sections the story should just flow. Divide your story into exactly 5 short and concise sections separated by [break]. " +
        "For each section, provide a brief narrative analysis and include, within angle brackets <>, a clear and plain-text description of a chart visualization that would represent the data. " +
        "Limit the descriptions by specifying only charts. " +
        "Ensure that your response contains only natural language descriptions examples: 'bar chart of', 'pie chart of' , 'histogram of', 'scatterplot of', 'boxplot of' etc and nothing else."
    )
    try:
        # Serialise inside the try so a bad frame is reported, not fatal.
        # default=str stringifies cell values json cannot encode natively
        # (e.g. datetime-like values); the text is only read by the model.
        df_json = json.dumps(df.to_dict(), default=str)
        # Only the selected prompt is built; the dataset JSON can be large.
        full_prompt = (
            persona +
            "Using the following dataset in JSON format: " + df_json +
            instruction +
            narrator_rules
        )
        response = model.generate_content(full_prompt)
        if not response or not response.text:
            return None
        sections = [s.strip() for s in response.text.split("[break]") if s.strip()]
        # Pad with placeholders, then trim, so exactly five sections remain.
        sections = (sections + ["(Placeholder section)"] * 5)[:5]
        return "[break]".join(sections)
    except Exception as e:
        st.error(f"Error generating story from dataframe: {e}")
        return None
# ──────────
# Extract Image Prompts and Story Sections
# ──────────
def extract_image_prompts_and_story(story_text):
    """
    Split a ``[break]``-separated story into narration pages and image prompts.

    For each non-blank section, the text of the first ``<...>`` tag becomes
    the image prompt and the page is the section with every ``<...>`` tag
    removed. A section without a tag contributes its first 100 characters
    as the page and "A concise illustration of <those characters>" as the
    prompt.

    Returns:
        ``(pages, image_prompts)`` — two lists of equal length.
    """
    tag_pattern = r"<(.*?)>"
    pages, image_prompts = [], []
    for section in story_text.split("[break]"):
        if not section.strip():
            continue
        tag = re.search(tag_pattern, section)
        if tag is None:
            # No chart description: derive both page and prompt from the text.
            excerpt = section.strip()[:100]
            pages.append(excerpt)
            image_prompts.append(f"A concise illustration of {excerpt}")
            continue
        image_prompts.append(tag.group(1).strip())
        pages.append(re.sub(tag_pattern, "", section).strip())
    return pages, image_prompts
def is_valid_png(file_path):
    """
    Return True when ``file_path`` starts with the 8-byte PNG signature and
    passes Pillow's ``verify()``; return False otherwise.

    Any exception (unreadable file, failed verification, ...) is printed
    and reported as False.
    """
    png_signature = b'\x89PNG\r\n\x1a\n'
    try:
        with open(file_path, "rb") as fh:
            if fh.read(8) != png_signature:
                return False
        with Image.open(file_path) as candidate:
            candidate.verify()
    except Exception as e:
        print(f"Invalid PNG file at {file_path}: {e}")
        return False
    return True
def standardize_and_validate_image(file_path):
    """
    Verify the image at ``file_path`` and rewrite it in place as an RGB PNG.

    Returns True on success. Any exception is printed and reported as
    False.
    """
    try:
        # First pass: integrity check only.
        with Image.open(file_path) as probe:
            probe.verify()
        # Second pass on a fresh handle: re-encode into memory first, so the
        # file is only overwritten once the conversion has succeeded.
        png_bytes = io.BytesIO()
        with Image.open(file_path) as source:
            source.convert("RGB").save(png_bytes, format="PNG")
        with open(file_path, "wb") as out:
            out.write(png_bytes.getvalue())
    except Exception as e:
        print(f"Failed to standardize/validate {file_path}: {e}")
        return False
    return True
def _encode_jpeg_base64(image):
    """Return ``image`` JPEG-encoded and base64-encoded as text."""
    # Pillow's JPEG writer rejects modes such as RGBA or P; convert those
    # first so the encode does not raise for images with alpha or a palette.
    if image.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode()


def generate_image(prompt_text, style, model="hf"):
    """
    Generate an illustration for ``prompt_text`` with the selected backend.

    Args:
        prompt_text: Description of the image.
        style: Style phrase added to the prompt (not used by the
            "pollinations_turbo" backend).
        model: "pollinations_turbo", "gemini", or anything else for the
            Hugging Face FLUX.1-dev endpoint.

    Returns:
        ``(image, base64_jpeg)`` on success; ``(None, None)`` when the
        backend reports an error or returns no image; a grey 1024x1024
        placeholder and ``None`` when an unexpected exception occurs.
    """
    try:
        if model == "gemini":
            try:
                g_api_key = os.getenv("GEMINI")
                if not g_api_key:
                    st.error("Google Gemini API key is missing.")
                    return None, None
                # The module-level name `genai` is google.generativeai (the
                # later import wins), which has no Client class. Import the
                # google-genai SDK under its own local name instead.
                from google import genai as genai_sdk
                client = genai_sdk.Client(api_key=g_api_key)
                enhanced_prompt = f"image of {prompt_text} in {style} style, high quality, detailed illustration"
                response = client.models.generate_content(
                    model="models/gemini-2.0-flash-exp",  # MODEL REVERTED
                    contents=enhanced_prompt,
                    config=types.GenerateContentConfig(response_modalities=['Text', 'Image'])
                )
                # The first part carrying inline data is taken as the image.
                for part in response.candidates[0].content.parts:
                    if part.inline_data is not None:
                        image = Image.open(BytesIO(part.inline_data.data))
                        return image, _encode_jpeg_base64(image)
                logger.error("No image was found in the Gemini API response")
                return None, None
            except Exception as e:
                logger.error(f"Gemini API error: {str(e)}")
                return None, None

        if model == "pollinations_turbo":
            prompt_encoded = urllib.parse.quote(prompt_text)
            api_url = f"https://image.pollinations.ai/prompt/{prompt_encoded}?model=turbo"
            response = requests.get(api_url)
            if response.status_code != 200:
                logger.error(f"Pollinations API error: {response.status_code}, {response.text}")
                return None, None
        else:
            enhanced_prompt = f"{prompt_text} in {style} style, high quality, detailed illustration"
            model_id = "black-forest-labs/FLUX.1-dev"
            api_url = f"https://api-inference.huggingface.co/models/{model_id}"
            payload = {"inputs": enhanced_prompt}
            response = requests.post(api_url, headers=headers, json=payload)
            if response.status_code != 200:
                logger.error(f"Hugging Face API error: {response.status_code}, {response.text}")
                return None, None

        image = Image.open(io.BytesIO(response.content))
        return image, _encode_jpeg_base64(image)
    except Exception as e:
        logger.error(f"Image generation error: {str(e)}")
        return Image.new('RGB', (1024, 1024), color=(200,200,200)), None
def generate_image_with_retry(prompt_text, style, model="hf", max_retries=3):
    """
    Call ``generate_image`` up to ``max_retries`` times with backoff.

    ``generate_image`` handles its own errors and signals failure through
    its return value: ``(None, None)`` or ``(placeholder, None)``. A result
    without the encoded string is therefore treated as a failed attempt and
    retried, after sleeping ``2 ** attempt`` seconds.

    Returns:
        The first successful ``(image, base64_jpeg)``; otherwise the result
        of the last attempt, or ``(None, None)`` if no attempt was made.

    Raises:
        Exception: re-raised if the final attempt itself raises.
    """
    result = (None, None)
    for attempt in range(max_retries):
        if attempt > 0:
            time.sleep(2 ** attempt)
        try:
            result = generate_image(prompt_text, style, model=model)
        except Exception as e:
            logger.error(f"Attempt {attempt+1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            continue
        image, encoded = result
        if image is not None and encoded is not None:
            return result
        logger.warning(f"Attempt {attempt+1} produced no image.")
    return result
# ──────────
# Video Creation Functions
# ──────────
def _draw_watermark(frame, font, width, height):
    """Return a copy of RGB ``frame`` with the two-line credit drawn on it."""
    pil_img = Image.fromarray(frame)
    draw = ImageDraw.Draw(pil_img)
    text1 = "Made With"
    text2 = "Sozo Business Studio"
    bbox = draw.textbbox((0, 0), text1, font=font)
    text1_height = bbox[3] - bbox[1]
    text_position1 = (width - 270, height - 120)
    # The second line sits directly under the first, 5 px lower.
    text_position2 = (width - 430, height - 120 + text1_height + 5)
    draw.text(text_position1, text1, font=font, fill=(81, 34, 97, 255))
    draw.text(text_position2, text2, font=font, fill=(81, 34, 97, 255))
    return np.array(pil_img)


def create_silent_video(images, durations, output_path, logo_path="sozo_logo2.png", font_path="lazy_dog.ttf"):
    """
    Write a 1280x720, 24 fps silent MP4 showing each image for its duration.

    Args:
        images: PIL images, one per scene. An image that cannot be converted
            or resized is replaced by the logo, or by a black frame when no
            logo is loaded.
        durations: Seconds per image, paired with ``images`` by position.
        output_path: Destination file.
        logo_path: Image stretched to full frame and appended for 3 seconds.
            A warning is shown and the logo is skipped if it cannot be read.
        font_path: TrueType font for the credit overlay. A warning is shown
            and the overlay is skipped if it cannot be loaded.

    Returns:
        ``output_path`` on success, ``None`` on failure (shown via st.error).
    """
    video = None
    try:
        height, width = 720, 1280
        fps = 24
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not video.isOpened():
            st.error("Failed to create video file.")
            return None

        font = None
        try:
            font_size = 45
            font = ImageFont.truetype(font_path, font_size)
        except IOError:
            st.warning(f"Font file not found at '{font_path}'. The text overlay will be skipped.")

        logo = None
        logo_rgb = None
        if logo_path:
            logo_img = cv2.imread(logo_path)
            if logo_img is not None:
                logo = cv2.resize(logo_img, (width, height))
                # cv2.imread yields BGR, but frames in the loop below are RGB
                # until the final conversion. Keep an RGB copy for use as a
                # fallback frame so its colours are not swapped.
                logo_rgb = cv2.cvtColor(logo, cv2.COLOR_BGR2RGB)
            else:
                st.warning(f"Failed to load logo from {logo_path}.")

        for img, duration in zip(images, durations):
            try:
                frame = np.array(img.convert("RGB").resize((width, height)))
            except Exception as e:
                print(f"Invalid image detected, replacing with logo: {e}")
                frame = logo_rgb if logo_rgb is not None else np.zeros((height, width, 3), dtype=np.uint8)
            # Only add text overlay if font was loaded successfully
            if font:
                frame = _draw_watermark(frame, font, width, height)
            frame_cv = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            for _ in range(int(duration * fps)):
                video.write(frame_cv)

        # Closing card: three seconds of the logo (already BGR).
        if logo is not None:
            for _ in range(int(3 * fps)):
                video.write(logo)
        return output_path
    except Exception as e:
        st.error(f"Error creating silent video: {e}")
        return None
    finally:
        # Release the writer on every path, including errors, so the output
        # file is closed.
        if video is not None:
            video.release()
def combine_video_audio(video_path, audio_files, output_path=None):
    """
    Mux ``audio_files`` onto ``video_path`` with ffmpeg.

    Several audio files are first concatenated (stream copy) through
    ffmpeg's concat demuxer; a single file is used directly. The video is
    re-encoded with libx264/AAC and cut to the shorter stream.

    Args:
        video_path: Path of the silent video.
        audio_files: Audio file paths in playback order.
        output_path: Destination; defaults to ``final_video_<uuid>.mp4``.

    Returns:
        ``output_path`` on success. ``video_path`` unchanged when there is
        no usable audio or when ffmpeg fails (shown via st.error).
    """
    if not audio_files:
        return video_path
    if output_path is None:
        output_path = f"final_video_{uuid.uuid4()}.mp4"

    temp_paths = []  # every temp file created here, removed in `finally`
    try:
        if len(audio_files) > 1:
            with tempfile.NamedTemporaryFile("w", delete=False, suffix=".txt") as concat_list:
                temp_paths.append(concat_list.name)
                for af in audio_files:
                    # Inside a single-quoted concat entry, a quote is
                    # written as '\''.
                    escaped = os.path.abspath(af).replace("'", "'\\''")
                    concat_list.write(f"file '{escaped}'\n")
            merged_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
            merged_audio.close()
            temp_paths.append(merged_audio.name)
            concat_cmd = [
                'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
                '-i', temp_paths[0], '-c', 'copy', merged_audio.name
            ]
            subprocess.run(concat_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            combined_audio = merged_audio.name
        else:
            combined_audio = audio_files[0]

        if not combined_audio:
            return video_path

        combine_cmd = [
            'ffmpeg', '-y', '-i', video_path, '-i', combined_audio,
            '-map', '0:v', '-map', '1:a', '-c:v', 'libx264',
            '-crf', '23', '-c:a', 'aac', '-shortest', output_path
        ]
        subprocess.run(combine_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return output_path
    except Exception as e:
        st.error(f"Error combining video and audio: {e}")
        return video_path
    finally:
        # Best-effort cleanup on success and failure alike.
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                pass
def create_video(images, audio_files, output_path=None):
    """
    Build the narrated video: a silent slideshow muxed with the audio.

    Each image is shown for the duration of the audio file at the same
    position (5 seconds when the entry is falsy or there are fewer audio
    files than images).

    Args:
        images: PIL images, one per scene.
        audio_files: Audio file paths in playback order.
        output_path: Destination; defaults to ``output_video_<uuid>.mp4``.

    Returns:
        Path of the resulting video, or ``None`` when ffmpeg is unavailable
        or the silent video could not be created.
    """
    try:
        subprocess.run(['ffmpeg', '-version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (FileNotFoundError, subprocess.CalledProcessError):
        st.error("ffmpeg not found. It must be installed and in your system's PATH to create videos.")
        return None

    if output_path is None:
        output_path = f"output_video_{uuid.uuid4()}.mp4"
    silent_video_path = f"silent_{uuid.uuid4()}.mp4"

    durations = [get_audio_duration(af) if af else 5.0 for af in audio_files]
    if len(durations) < len(images):
        durations.extend([5.0] * (len(images) - len(durations)))

    silent_video = create_silent_video(images, durations, silent_video_path)
    if not silent_video:
        return None

    final_video = combine_video_audio(silent_video, audio_files, output_path)
    # combine_video_audio hands back the silent video's own path when there
    # is no audio or muxing fails. Deleting it in that case would return a
    # path to a file that no longer exists.
    if final_video != silent_video_path:
        try:
            os.unlink(silent_video_path)
        except OSError:
            pass  # best-effort cleanup of the intermediate file
    return final_video
# ──────────
# Audio Generation Function
# ──────────
def _save_audio_response(content):
    """Write audio bytes to a new temporary .mp3 file and return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
        temp_file.write(content)
    return temp_file.name


def generate_audio(text, voice_model, audio_model="deepgram"):
    """
    Synthesize ``text`` to speech and return the path of a temporary MP3.

    Args:
        text: Narration to speak.
        voice_model: Voice identifier passed to the selected service.
        audio_model: "deepgram" (requires the ``DeepGram`` environment
            variable) or "openai-audio" (Pollinations).

    Returns:
        Path of the saved audio file, or ``None`` on any failure (missing
        key, network error, non-200 response, unsupported ``audio_model``);
        the reason is shown via st.error.
    """
    if audio_model == "deepgram":
        deepgram_api_key = os.getenv("DeepGram")
        if not deepgram_api_key:
            st.error("Deepgram API Key is missing.")
            return None
        headers_tts = {
            "Authorization": f"Token {deepgram_api_key}",
            "Content-Type": "text/plain"
        }
        url = f"https://api.deepgram.com/v1/speak?model={voice_model}"
        try:
            # Send explicit UTF-8 bytes rather than a str body, so text with
            # non-Latin-1 characters does not depend on the HTTP stack's
            # implicit encoding.
            response = requests.post(url, headers=headers_tts, data=text.encode("utf-8"))
        except requests.RequestException as e:
            st.error(f"DeepGram TTS request failed: {e}")
            return None
        if response.status_code == 200:
            return _save_audio_response(response.content)
        st.error(f"DeepGram TTS error: {response.status_code}")
        return None

    if audio_model == "openai-audio":
        encoded_text = urllib.parse.quote(text)
        url = f"https://text.pollinations.ai/{encoded_text}?model=openai-audio&voice={voice_model}"
        try:
            response = requests.get(url)
        except requests.RequestException as e:
            st.error(f"OpenAI Audio TTS request failed: {e}")
            return None
        if response.status_code == 200:
            return _save_audio_response(response.content)
        st.error(f"OpenAI Audio TTS error: {response.status_code}")
        return None

    st.error("Unsupported audio model selected.")
    return None
def get_audio_duration(audio_file):
    """
    Return the duration of ``audio_file`` in seconds, as reported by ffprobe.

    Returns 5.0 when ffprobe is not installed, exits with an error, or
    prints something that is not a number.
    """
    probe_cmd = [
        'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1', audio_file,
    ]
    try:
        probe = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
        seconds = float(probe.stdout.strip())
    except (FileNotFoundError, subprocess.CalledProcessError, ValueError):
        return 5.0
    return seconds
# ──────────
# Unified Process-Story Function
# ──────────
def _chart_or_generated_image(section_number, img_prompt, style, image_model):
    """Return a PandasAI chart for the prompt, else a generated image (may be None)."""
    try:
        chart_response = generateResponse("Generate this visualization: " + img_prompt, st.session_state.dataframe)
        if isinstance(chart_response, dict) and chart_response.get("type") == "plot":
            img_path = chart_response["value"]
            # Accept the chart only if it is a real, valid PNG on disk.
            if (isinstance(img_path, str) and os.path.isfile(img_path)
                    and is_valid_png(img_path) and standardize_and_validate_image(img_path)):
                return Image.open(img_path)
    except Exception as e:
        st.warning(f"Chart generation failed for section {section_number}: {e}. Using default image.")
    # One fallback for every way the chart route can fail.
    img, _ = generate_image_with_retry(img_prompt, style, model=image_model)
    return img


def process_generated_story(style, voice_model, audio_model_param, image_model="hf"):
    """
    Turn ``st.session_state.full_story`` into images, audio and a video.

    Splits the story into pages and chart prompts, produces one image per
    page (a PandasAI chart when possible, otherwise a generated image,
    otherwise a grey placeholder), synthesizes one audio clip per page and
    assembles the video. Results are stored in ``st.session_state``
    (``story_pages``, ``image_descriptions``, ``generated_images``,
    ``story_audio``, ``final_video_path``).

    Args:
        style: Image style for generated fallback images.
        voice_model: Voice identifier passed to ``generate_audio``.
        audio_model_param: Audio backend passed to ``generate_audio``.
        image_model: Backend for generated fallback images, passed to
            ``generate_image_with_retry``. Defaults to "hf", the value that
            was previously always used.
    """
    pages, image_prompts = extract_image_prompts_and_story(st.session_state.full_story)
    st.session_state.story_pages = pages
    st.session_state.image_descriptions = image_prompts
    st.session_state.generated_images = []
    st.session_state.story_audio = []

    progress_bar = st.progress(0)
    total_steps = len(pages) * 2  # 1 for image, 1 for audio
    current_step = 0

    for i, img_prompt in enumerate(image_prompts):
        with st.spinner(f"Generating image {i+1}/{len(pages)}..."):
            img = _chart_or_generated_image(i + 1, img_prompt, style, image_model)
            if img is None:
                img = Image.new('RGB', (1024, 1024), color=(200, 200, 200))
            st.session_state.generated_images.append(img.convert('RGB'))
        current_step += 1
        progress_bar.progress(current_step / total_steps)

    for i, page in enumerate(pages):
        with st.spinner(f"Generating audio {i+1}/{len(pages)}..."):
            audio = generate_audio(page, voice_model, audio_model=audio_model_param)
            st.session_state.story_audio.append(audio)
        current_step += 1
        progress_bar.progress(current_step / total_steps)

    if st.session_state.generated_images:
        with st.spinner("Assembling video..."):
            # Clips that failed to generate are stored as None; skip them.
            audio_paths = [af for af in st.session_state.story_audio if af]
            if audio_paths:
                st.session_state.final_video_path = create_video(st.session_state.generated_images, audio_paths)
            else:
                silent_path = f"silent_video_{uuid.uuid4()}.mp4"
                durations = [5.0] * len(st.session_state.generated_images)
                st.session_state.final_video_path = create_silent_video(st.session_state.generated_images, durations, silent_path)
    progress_bar.empty()
# ──────────
# Display Generated Content
# ──────────
def display_generated_content():
    """
    Render the generated results in three tabs.

    "Video Output" plays the final video with download and WhatsApp share
    links, or shows an error if the file is missing. "Story Pages" shows
    each scene image with its narration and audio clip. "Full Script" shows
    the complete story text. All data is read from ``st.session_state``.
    """
    st.subheader("Generated Narrative Video")
    video_tab, pages_tab, script_tab = st.tabs(["Video Output", "Story Pages", "Full Script"])
    state = st.session_state

    with video_tab:
        video_path = state.final_video_path
        if not (video_path and os.path.exists(video_path)):
            st.error("Video file not found or not readable.")
        else:
            with open(video_path, "rb") as f:
                video_bytes = f.read()
            st.video(video_bytes)
            st.download_button("Download Video", data=video_bytes, file_name="sozo_business_narrative.mp4", mime="video/mp4")
            share_message = "Check out this AI-generated business narrative video!"
            whatsapp_link = f"https://api.whatsapp.com/send?text={urllib.parse.quote(share_message)}"
            st.markdown(f"[Share on WhatsApp]({whatsapp_link})", unsafe_allow_html=True)

    with pages_tab:
        clips = state.story_audio
        for idx, (page, img) in enumerate(zip(state.story_pages, state.generated_images)):
            number = idx + 1
            st.image(img, caption=f"Scene {number}")
            st.markdown(f"**Narration {number}**: {page}")
            # A clip may be absent (shorter list) or None (generation failed).
            clip = clips[idx] if idx < len(clips) else None
            if clip:
                st.audio(clip)

    with script_tab:
        st.text_area("Complete Narrative Script", state.full_story, height=400)
# ──────────
# Streamlit App Configuration and Sidebar
# ──────────
# The page icon was mis-encoded text ("πΌ"); restored to the briefcase emoji
# whose UTF-8 bytes produce exactly that garbling.
st.set_page_config(page_title="Sozo Business Studio", page_icon="💼", layout="wide", initial_sidebar_state="expanded")

# Explicit defaults for every session key. The previous substring test
# ('pages' / 'images' / 'audio' in key) did not match "image_descriptions",
# which therefore started as None although it holds a list like its siblings.
_LIST_STATE_KEYS = ("story_pages", "image_descriptions", "generated_images", "story_audio")
_SCALAR_STATE_KEYS = ("full_story", "final_video_path", "dataframe")
for key in _LIST_STATE_KEYS:
    if key not in st.session_state:
        st.session_state[key] = []  # fresh list per key, never shared
for key in _SCALAR_STATE_KEYS:
    if key not in st.session_state:
        st.session_state[key] = None
# Sidebar: narrative style, image model/style, audio model/voice, tips and an
# ffmpeg availability check. The main page reads selected_story_type,
# selected_style, selected_voice and audio_model_param from here.
with st.sidebar:
    st.subheader("Sozo Business Studio")

    story_types = {
        "business": "Business Narrative",
        "education": "Educational",
        "entertainment": "Entertaining",
        "free_form": "Free Form (AI's choice)",
        "children": "Children's Story",
    }
    selected_story_type = st.selectbox(
        "Narrative Style",
        options=list(story_types.keys()),
        format_func=lambda x: story_types[x],
        key="story_type_select"
    )

    model_options = ["HuggingFace Flux", "Pollinations Turbo", "Google Gemini"]
    selected_model_name = st.selectbox("Select Image Generation Model", model_options, index=0, key="image_model_select")
    style_options = ["photorealistic", "cinematic", "cartoon", "concept art", "oil painting", "fantasy illustration", "whimsical"]
    selected_style = st.selectbox("Image Style", style_options, key="style_select")
    # Identifier understood by generate_image for the chosen display name.
    model_param = {"HuggingFace Flux": "hf", "Pollinations Turbo": "pollinations_turbo", "Google Gemini": "gemini"}[selected_model_name]

    audio_model_options = ["DeepGram", "Pollinations OpenAI-Audio"]
    selected_audio_model = st.selectbox("Select Audio Generation Model", audio_model_options, key="audio_model_select")
    # Each audio backend has its own voice ids, so each gets its own widget key.
    if selected_audio_model == "DeepGram":
        voice_options = {"aura-asteria-en": "Female", "aura-helios-en": "Male"}
        selected_voice = st.selectbox("Voice Model", options=list(voice_options.keys()), format_func=voice_options.get, key="voice_select_deepgram")
        audio_model_param = "deepgram"
    else:
        voice_options = {"sage": "Female", "echo": "Male"}
        selected_voice = st.selectbox("Voice Model", options=list(voice_options.keys()), format_func=voice_options.get, key="voice_select_pollinations")
        audio_model_param = "openai-audio"

    st.markdown("### Tips for Best Results")
    st.markdown("- Ensure your data has clear column headers.\n- Use the 'Business Narrative' style for professional reports.\n- Try different image styles and voices to match your brand.")

    if st.button("Check System Requirements"):
        try:
            # Only the exit status matters, so the result is not kept.
            subprocess.run(['ffmpeg', '-version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Status icons were mis-encoded ("β"); restored.
            st.success("✅ ffmpeg is installed.")
        except (FileNotFoundError, subprocess.CalledProcessError):
            st.error("❌ ffmpeg not found. It must be installed to create videos.")
# ─ MAIN PAGE ─
st.subheader("Sozo Business Studio")
st.markdown("#### Turn business data into compelling narratives.")
st.markdown("---")

st.markdown("### 1. Upload Your Business Data")
uploaded_file = st.file_uploader(
    "Upload a CSV or Excel file to begin.",
    type=['csv', 'xlsx', 'xls'],
    label_visibility="collapsed"
)
if uploaded_file:
    try:
        # Compare the extension case-insensitively: an upload named
        # "REPORT.XLSX" used to fall through to read_csv.
        is_excel = uploaded_file.name.lower().endswith(('.xlsx', '.xls'))
        df = pd.read_excel(uploaded_file) if is_excel else pd.read_csv(uploaded_file)
        st.session_state.dataframe = df
        # The leading icon was mis-encoded ("β"); restored.
        st.success(f"✅ Loaded `{uploaded_file.name}`. Data preview:")
        st.dataframe(df.head())
    except Exception as e:
        st.error(f"Error processing {uploaded_file.name}: {e}")
        st.session_state.dataframe = None

st.markdown("### 2. Generate Your Video")
# The button stays disabled until a file has been loaded successfully.
if st.button("Generate Video Narrative", disabled=st.session_state.dataframe is None):
    with st.spinner("Analyzing data and generating narrative script..."):
        st.session_state.full_story = generate_story_from_dataframe(st.session_state.dataframe, selected_story_type)
    if st.session_state.full_story:
        st.success("Script generated! Now creating video assets...")
        process_generated_story(selected_style, selected_voice, audio_model_param)
    else:
        st.error("Failed to generate narrative script. The data might be formatted incorrectly or the AI model could be temporarily unavailable.")

# Results persist in session state, so they are shown again on later reruns.
if st.session_state.story_pages:
    st.markdown("---")
    display_generated_content()