###############################################################################
# Sozo Business Studio · AI transforms business data into compelling narratives
###############################################################################
import os, re, json, hashlib, uuid, base64, io, tempfile, wave, requests, subprocess
from pathlib import Path

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from fpdf import FPDF, HTMLMixin
from markdown_it import MarkdownIt
from PIL import Image
from langchain_experimental.agents import create_pandas_dataframe_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from google import genai
import cv2  # Added for video processing

# ─────────────────────────────────────────────────────────────────────────────
# CONFIG & CONSTANTS
# ─────────────────────────────────────────────────────────────────────────────
st.set_page_config(page_title="Sozo Business Studio", layout="wide")
st.title("📊 Sozo Business Studio")
st.caption("AI transforms business data into compelling narratives.")

# --- Feature Caps ---
MAX_CHARTS = 5     # maximum charts rendered into a report
VIDEO_SCENES = 5   # number of scenes for the video

# --- API Keys & Clients ---
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    st.error("⚠️ GEMINI_API_KEY is not set.")
    st.stop()
# google-genai Client; model calls go through GEM.models.* (see image helper).
GEM = genai.Client(api_key=API_KEY)
DG_KEY = os.getenv("DEEPGRAM_API_KEY")  # Optional but needed for narration

# --- Session State ---
# Holds the most recent generated output (report or video bundle).
st.session_state.setdefault("bundle", None)

# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
sha1_bytes = lambda b: hashlib.sha1(b).hexdigest()


def validate_file_upload(f):
    """Return a list of human-readable problems with the upload ([] = OK)."""
    errs = []
    if f is None:
        errs.append("No file uploaded")
    elif f.size == 0:
        errs.append("File is empty")
    elif f.size > 50 * 1024 * 1024:
        errs.append("File >50 MB")
    if f and Path(f.name).suffix.lower() not in (".csv", ".xlsx", ".xls"):
        errs.append("Unsupported file type")
    return errs


def load_dataframe_safely(buf: bytes, name: str):
    """Parse CSV/Excel bytes into a cleaned DataFrame.

    Returns (df, None) on success or (None, error_message) on failure.
    """
    try:
        ext = Path(name).suffix.lower()
        df = (pd.read_excel(io.BytesIO(buf)) if ext in (".xlsx", ".xls")
              else pd.read_csv(io.BytesIO(buf)))
        if df.empty or len(df.columns) == 0:
            raise ValueError("File contains no data")
        df.columns = df.columns.astype(str).str.strip()
        df = df.dropna(how="all")
        if df.empty:
            raise ValueError("Rows all empty")
        return df, None
    except Exception as e:
        return None, str(e)


def fix_bullet(t: str) -> str:
    """Strip C1 control characters (0x80-0x9f) that break FPDF latin-1 output."""
    return re.sub(r"[\x80-\x9f]", "", t) if isinstance(t, str) else ""


# ——— Arrow helpers ————————————————————————————————————————————————
def arrow_df(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast pandas extension dtypes so Streamlit's Arrow serializer accepts them.

    NOTE(review): "Boolean".lower() == "boolean" is itself a nullable dtype,
    so booleans pass through unchanged — preserved from the original; confirm
    that is intended.
    """
    safe = df.copy()
    for c in safe.columns:
        if safe[c].dtype.name in ("Int64", "Float64", "Boolean"):
            safe[c] = safe[c].astype(safe[c].dtype.name.lower())
    return safe


# ——— Text-to-Speech (Used by Both Features) ————————————————————————
@st.cache_data(show_spinner=False)
def deepgram_tts(text: str):
    """Synthesize speech via Deepgram Aura.

    Returns (audio_bytes, mime_type) or (None, None) when the key is missing,
    the text is empty, or the request fails (best-effort by design).
    """
    if not DG_KEY or not text:
        return None, None
    # Keep the payload TTS-safe and within a sane length.
    text = re.sub(r"[^\w\s.,!?;:-]", "", text)[:1000]
    try:
        r = requests.post(
            "https://api.deepgram.com/v1/speak",
            params={"model": "aura-asteria-en"},
            headers={"Authorization": f"Token {DG_KEY}",
                     "Content-Type": "application/json"},
            json={"text": text},
            timeout=30,
        )
        r.raise_for_status()
        return r.content, r.headers.get("Content-Type", "audio/mpeg")
    except Exception:
        return None, None


def pcm_to_wav(pcm, sr=24000, ch=1, w=2):
    """Wrap raw PCM bytes in a WAV container and return the WAV bytes."""
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as wf:
        wf.setnchannels(ch)
        wf.setsampwidth(w)
        wf.setframerate(sr)
        wf.writeframes(pcm)
    buf.seek(0)
    return buf.getvalue()


# ——— Chart & Tag Helpers ———————————————————————————————————————————
# Matches chart tags such as <generate_chart: bar chart of sales by region>,
# tolerating [generate_chart=...] and optional quotes around the description.
# FIX: the checked-in pattern had a garbled named group "(?P[" (invalid regex,
# raises at compile time); restored as (?P<d>...) to match m.group("d") below.
TAG_RE = re.compile(
    r'[<\[]\s*generate_?chart\s*[:=]?\s*["\']?(?P<d>[^>\]\'"”’]+?)["\']?\s*[>\]]',
    re.I,
)

# Ordered, de-duplicated chart descriptions found in a text.
extract_chart_tags = lambda t: list(
    dict.fromkeys(m.group("d").strip() for m in TAG_RE.finditer(t or ""))
)


def repl_tags(txt: str, mp: dict, str_fn):
    """Replace each chart tag whose description is a key of `mp` with
    str_fn(mp[description]); unknown tags are left untouched."""
    return TAG_RE.sub(
        lambda m: str_fn(mp[m.group("d").strip()])
        if m.group("d").strip() in mp else m.group(0),
        txt,
    )


# ─────────────────────────────────────────────────────────────────────────────
# FEATURE 1: REPORT GENERATION
# ─────────────────────────────────────────────────────────────────────────────
class PDF(FPDF, HTMLMixin):
    pass


def build_pdf(md, charts):
    """Render markdown plus chart images (desc -> png path) into PDF bytes."""
    md = fix_bullet(md).replace("•", "*")
    # FIX: the <img> markup was stripped from the checked-in file (empty
    # f-string), so charts never appeared in the PDF; reconstructed here.
    md = repl_tags(md, charts, lambda p: f'<img src="{p}" width="450">')
    html = MarkdownIt("commonmark", {"breaks": True}).enable("table").render(md)
    pdf = PDF()
    pdf.set_auto_page_break(True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", "B", 18)
    pdf.cell(0, 12, "AI-Generated Business Report", ln=True)
    pdf.ln(3)
    pdf.set_font("Arial", "", 11)
    pdf.write_html(html)
    # fpdf2 returns a bytearray; bytes() copies it for st.download_button.
    return bytes(pdf.output(dest="S"))


def generate_report_assets(key, buf, name, ctx):
    """Produce the report bundle: markdown, chart images, preview HTML and PDF.

    Returns the bundle dict, or None (with a Streamlit error shown) on failure.
    """
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash",
                                 google_api_key=API_KEY, temperature=0.1)
    ctx_dict = {"shape": df.shape, "columns": list(df.columns),
                "user_ctx": ctx or "General business analysis"}
    # FIX: the tag example was stripped from the checked-in prompt, leaving the
    # model with no tag syntax to imitate; restored.
    report_md = llm.invoke(
        f"""You are a senior business analyst. Write an executive-level Markdown report with insights & recommendations.
Use chart tags like <generate_chart: description of chart> where helpful.
Data Context: {json.dumps(ctx_dict, indent=2)}"""
    ).content
    chart_descs = extract_chart_tags(report_md)[:MAX_CHARTS]
    chart_paths = {}
    if chart_descs:
        ag = create_pandas_dataframe_agent(llm=llm, df=df, verbose=False,
                                           allow_dangerous_code=True)
        for d in chart_descs:
            with st.spinner(f"Generating chart: {d}"):
                with plt.ioff():
                    try:
                        ag.run(f"Create a {d} with Matplotlib and save.")
                        fig = plt.gcf()
                        if fig.axes:
                            p = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
                            fig.savefig(p, dpi=300, bbox_inches="tight",
                                        facecolor="white")
                            chart_paths[d] = str(p)
                        plt.close("all")
                    except Exception:
                        # Agent failures are non-fatal; skip this chart.
                        plt.close("all")
    md = fix_bullet(report_md)
    pdf = build_pdf(md, chart_paths)
    # FIX: preview <img> markup also reconstructed (was an empty f-string).
    preview = repl_tags(md, chart_paths, lambda p: f'<img src="{p}" width="600"/>')
    return {"type": "report", "preview": preview, "pdf": pdf,
            "report_md": md, "key": key}


# ─────────────────────────────────────────────────────────────────────────────
# FEATURE 2: VIDEO GENERATION
# ─────────────────────────────────────────────────────────────────────────────
def generate_image_from_prompt(prompt, style):
    """Generate an illustrative image via the Gemini client.

    Falls back to a plain grey 1024x768 placeholder on any failure.
    """
    try:
        full_prompt = (f"A professional, clean, illustrative image for a business "
                       f"presentation: {prompt}, in the style of {style}.")
        # FIX: genai.Client has no generate_content(); generation lives under
        # client.models.* — the checked-in call raised AttributeError and
        # always fell through to the placeholder.
        response = GEM.models.generate_content(
            model="models/gemini-1.5-flash-latest",
            contents=full_prompt,
        )
        # Pull the first inline image part out of the response, if any.
        for part in response.candidates[0].content.parts:
            data = getattr(getattr(part, "inline_data", None), "data", None)
            if data:
                return Image.open(io.BytesIO(data)).convert("RGB")
        raise ValueError("Model returned no image data")
    except Exception as e:
        st.warning(f"Illustrative image generation failed: {e}. Using placeholder.")
        return Image.new('RGB', (1024, 768), color=(230, 230, 230))


def create_silent_video(images, durations, output_path):
    """Write a silent 24-fps MP4: one still image per scene, held `duration` s."""
    width, height = 1280, 720
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter(output_path, fourcc, 24, (width, height))
    for img, duration in zip(images, durations):
        # Resize image and convert to BGR for OpenCV.
        frame = np.array(img.resize((width, height)))
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        for _ in range(int(duration * 24)):  # 24 fps
            video.write(frame_bgr)
    video.release()
    return output_path


def combine_video_audio(video_path, audio_paths, output_path):
    """Concatenate the scene audio clips and mux them onto the silent video.

    Requires ffmpeg on PATH; raises CalledProcessError if either step fails.
    """
    concat_list_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.txt"
    with open(concat_list_path, 'w') as f:
        for af in audio_paths:
            f.write(f"file '{Path(af).resolve()}'\n")
    concat_audio_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp3"
    try:
        subprocess.run(
            ['ffmpeg', '-y', '-f', 'concat', '-safe', '0',
             '-i', str(concat_list_path), '-c', 'copy', str(concat_audio_path)],
            check=True, capture_output=True)
        subprocess.run(
            ['ffmpeg', '-y', '-i', video_path, '-i', str(concat_audio_path),
             '-c:v', 'copy', '-c:a', 'aac', '-shortest', output_path],
            check=True, capture_output=True)
    finally:
        # Clean the intermediates even when ffmpeg fails.
        concat_list_path.unlink(missing_ok=True)
        concat_audio_path.unlink(missing_ok=True)
    return output_path


def get_audio_duration(audio_file):
    """Return the clip duration in seconds via ffprobe; 5.0 s on any failure."""
    try:
        result = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
             '-of', 'default=noprint_wrappers=1:nokey=1', audio_file],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True, check=True)
        return float(result.stdout.strip())
    except Exception:
        return 5.0  # Default duration


def generate_video_assets(key, buf, name, ctx, style):
    """Build the video bundle: script -> per-scene visuals + narration -> MP4.

    Returns the bundle dict, or None (with a Streamlit error shown) on failure.
    """
    try:
        subprocess.run(['ffmpeg', '-version'], check=True, capture_output=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        st.error("🔴 FFmpeg is not installed or not in your system's PATH. "
                 "Video generation is not possible.")
        return None
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash",
                                 google_api_key=API_KEY, temperature=0.2)
    ctx_dict = {"shape": df.shape, "columns": list(df.columns),
                "user_ctx": ctx or "General business analysis"}
    # FIX: the tag example was stripped from the checked-in prompt; restored.
    story_prompt = f"""Create a script for a short business video with exactly {VIDEO_SCENES} scenes.
For each scene:
1. Write a concise narration (1-2 sentences).
2. If the data can be visualized for this scene, add a chart tag like <generate_chart: description of chart>.
3. Separate each scene with the marker `[SCENE_BREAK]`.
Data Context: {json.dumps(ctx_dict, indent=2)}"""
    with st.spinner("Generating video script..."):
        full_script = llm.invoke(story_prompt).content
    scenes = [s.strip() for s in full_script.split("[SCENE_BREAK]")]
    visuals, audio_paths, temp_files = [], [], []
    try:
        ag = create_pandas_dataframe_agent(llm=llm, df=df, verbose=False,
                                           allow_dangerous_code=True)
        for i, scene_text in enumerate(scenes[:VIDEO_SCENES]):
            progress = (i + 1) / VIDEO_SCENES
            st.progress(progress, text=f"Processing Scene {i+1}/{VIDEO_SCENES}...")
            chart_descs = extract_chart_tags(scene_text)
            # FIX: repl_tags with an empty map replaces nothing, so the raw
            # chart tags were narrated aloud; strip them outright instead.
            narrative = TAG_RE.sub("", scene_text).strip()
            if narrative:  # Only process scenes with text
                # 1. Generate Visual
                if chart_descs:
                    with plt.ioff():
                        try:
                            ag.run(f"Create a {chart_descs[0]} with Matplotlib and save.")
                            fig = plt.gcf()
                            if fig.axes:
                                p = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
                                fig.savefig(p, dpi=200, bbox_inches="tight",
                                            facecolor="white")
                                visuals.append(Image.open(p).convert("RGB"))
                                temp_files.append(p)
                            else:
                                raise ValueError("No chart produced")
                        except Exception:
                            visuals.append(generate_image_from_prompt(narrative, style))
                        finally:
                            plt.close("all")
                else:
                    visuals.append(generate_image_from_prompt(narrative, style))
                # 2. Generate Audio
                audio_content, _ = deepgram_tts(narrative)
                if audio_content:
                    audio_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp3"
                    audio_path.write_bytes(audio_content)
                    audio_paths.append(str(audio_path))
                    temp_files.append(audio_path)
                else:
                    # FIX: keep visuals and audio aligned one-to-one — drop the
                    # visual for a scene whose narration failed, otherwise the
                    # per-scene durations no longer match the images.
                    visuals.pop()
        if not visuals or not audio_paths:
            st.error("Could not generate any scenes for the video. Please try a different context or file.")
            return None
        st.progress(1.0, text="Assembling video...")
        durations = [get_audio_duration(ap) for ap in audio_paths]
        silent_video_path = str(Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp4")
        final_video_path = str(Path(tempfile.gettempdir()) / f"{key}.mp4")
        create_silent_video(visuals, durations, silent_video_path)
        temp_files.append(Path(silent_video_path))
        combine_video_audio(silent_video_path, audio_paths, final_video_path)
        return {"type": "video", "video_path": final_video_path, "key": key}
    finally:
        for f in temp_files:
            f.unlink(missing_ok=True)  # Cleanup all temp files


# ─────────────────────────────────────────────────────────────────────────────
# UI & MAIN WORKFLOW
# ─────────────────────────────────────────────────────────────────────────────
mode = st.radio("Select Output Format:", ["Report (PDF)", "Video Narrative"],
                horizontal=True)

# --- Conditional UI ---
video_style = "professional illustration"
if mode == "Video Narrative":
    with st.sidebar:
        st.subheader("🎬 Video Options")
        video_style = st.selectbox(
            "Visual Style",
            ["professional illustration", "minimalist infographic",
             "photorealistic", "cinematic", "data visualization aesthetic"])
        st.info("The AI will generate charts from your data where possible, and illustrative images for other scenes.")

# --- Common UI ---
upl = st.file_uploader("Upload CSV or Excel", type=["csv", "xlsx", "xls"])
if upl:
    df_prev, prev_err = load_dataframe_safely(upl.getvalue(), upl.name)
    with st.expander("📊 Data Preview"):
        # FIX: guard the None case — df_prev.head() crashed on unparseable files.
        if df_prev is not None:
            st.dataframe(arrow_df(df_prev.head()))
        else:
            st.error(prev_err)

ctx = st.text_area("Business context or specific instructions (optional)")

if st.button("🚀 Generate", type="primary"):
    if not upl:
        st.warning("Please upload a file first.")
        st.stop()
    bkey = sha1_bytes(b"".join([upl.getvalue(), mode.encode(),
                                ctx.encode(), video_style.encode()]))
    if mode == "Report (PDF)":
        with st.spinner("Generating report and charts..."):
            bundle = generate_report_assets(bkey, upl.getvalue(), upl.name, ctx)
    else:  # Video Narrative
        bundle = generate_video_assets(bkey, upl.getvalue(), upl.name, ctx, video_style)
    st.session_state.bundle = bundle
    st.rerun()

# --- Display Area (handles state correctly after rerun) ---
if "bundle" in st.session_state and st.session_state.bundle:
    bundle = st.session_state.bundle
    if bundle.get("type") == "report":
        st.subheader("📄 Generated Report")
        with st.expander("View Report", expanded=True):
            if bundle["preview"]:
                st.markdown(bundle["preview"], unsafe_allow_html=True)
                c1, c2 = st.columns(2)
                with c1:
                    st.download_button("Download PDF", bundle["pdf"],
                                       "business_report.pdf", "application/pdf",
                                       use_container_width=True)
                with c2:
                    if DG_KEY and st.button("🔊 Narrate Summary",
                                            use_container_width=True):
                        report_text = re.sub(r'<[^>]+>', '', bundle["report_md"])  # Basic HTML strip
                        audio, mime = deepgram_tts(report_text)
                        if audio:
                            st.audio(audio, format=mime)
                        else:
                            st.error("Narration failed.")
            else:
                st.warning("No report content was generated.")
    elif bundle.get("type") == "video":
        st.subheader("🎬 Generated Video Narrative")
        video_path = bundle.get("video_path")
        if video_path and Path(video_path).exists():
            with open(video_path, "rb") as f:
                st.video(f.read())
            with open(video_path, "rb") as f:
                st.download_button("Download Video", f,
                                   f"sozo_narrative_{bundle['key'][:8]}.mp4",
                                   "video/mp4")
        else:
            st.error("Video file could not be found or generation failed.")