###############################################################################
# Sozo Business Studio · AI transforms business data into compelling narratives
###############################################################################
| import os, re, json, hashlib, uuid, base64, io, tempfile, wave, requests, subprocess | |
| from pathlib import Path | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| from fpdf import FPDF, HTMLMixin | |
| from markdown_it import MarkdownIt | |
| from PIL import Image | |
| from langchain_experimental.agents import create_pandas_dataframe_agent | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from google import genai | |
| import cv2 # Added for video processing | |
# ─────────────────────────────────────────────────────────────────────────────
# CONFIG & CONSTANTS
# ─────────────────────────────────────────────────────────────────────────────
st.set_page_config(page_title="Sozo Business Studio", layout="wide")
st.title("π Sozo Business Studio")
st.caption("AI transforms business data into compelling narratives.")
# --- Feature Caps ---
MAX_CHARTS = 5  # maximum number of charts embedded in a PDF report
VIDEO_SCENES = 5  # Number of scenes for the video
# --- API Keys & Clients (Correct Initialization) ---
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    # Hard requirement: the whole app depends on Gemini, so stop rendering here.
    st.error("β οΈ GEMINI_API_KEY is not set."); st.stop()
# Use the Client pattern from the original script
GEM = genai.Client(api_key=API_KEY)
DG_KEY = os.getenv("DEEPGRAM_API_KEY")  # Optional but needed for narration
# --- Session State ---
# Simplified state to hold the most recent generated output
st.session_state.setdefault("bundle", None)
# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def sha1_bytes(b):
    """Hex SHA-1 digest of a bytes payload (used as a deterministic bundle key)."""
    return hashlib.sha1(b).hexdigest()
def validate_file_upload(f):
    """Validate an uploaded file object (needs .size and .name).

    Returns a list of human-readable error strings; an empty list means OK.
    """
    errors = []
    if f is None:
        errors.append("No file uploaded")
    elif f.size == 0:
        errors.append("File is empty")
    elif f.size > 50 * 1024 * 1024:
        errors.append("File >50 MB")
    # Extension check is independent of the size checks above.
    if f and Path(f.name).suffix.lower() not in (".csv", ".xlsx", ".xls"):
        errors.append("Unsupported file type")
    return errors
def load_dataframe_safely(buf: bytes, name: str):
    """Parse CSV/Excel bytes into a cleaned DataFrame.

    Returns (df, None) on success or (None, error_message) on any failure.
    Column names are stripped of surrounding whitespace and all-empty rows
    are dropped.
    """
    try:
        suffix = Path(name).suffix.lower()
        reader = pd.read_excel if suffix in (".xlsx", ".xls") else pd.read_csv
        frame = reader(io.BytesIO(buf))
        if frame.empty or len(frame.columns) == 0:
            raise ValueError("File contains no data")
        frame.columns = frame.columns.astype(str).str.strip()
        frame = frame.dropna(how="all")
        if frame.empty:
            raise ValueError("Rows all empty")
        return frame, None
    except Exception as exc:
        return None, str(exc)
def fix_bullet(t: str) -> str:
    """Remove C1 control bytes (U+0080..U+009F) that break PDF rendering.

    Non-string input (e.g. None) is coerced to an empty string.
    """
    if not isinstance(t, str):
        return ""
    return re.sub(r"[\x80-\x9f]", "", t)
# ─── Arrow helpers ───────────────────────────────────────────────────
def arrow_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with pandas nullable extension dtypes downcast
    to plain numpy dtypes so Arrow/Streamlit can serialize them.

    Fixes two defects in the previous version:
    * the nullable boolean dtype is named "boolean" (lowercase), so the old
      check against "Boolean" never matched and those columns were skipped;
    * ``Int64`` columns containing ``<NA>`` raise on ``astype("int64")`` —
      they are cast to NaN-capable ``float64`` instead.
    """
    safe = df.copy()
    for c in safe.columns:
        name = safe[c].dtype.name
        if name in ("Int64", "Float64"):
            if safe[c].isna().any():
                # int64 cannot hold <NA>; float64 represents it as NaN.
                safe[c] = safe[c].astype("float64")
            else:
                safe[c] = safe[c].astype(name.lower())
        elif name == "boolean":
            # object keeps True/False/None without raising on missing values.
            safe[c] = safe[c].astype("object")
    return safe
# ─── Text-to-Speech (Used by Both Features) ──────────────────────────
def deepgram_tts(text: str):
    """Synthesize speech for *text* with Deepgram's Aura voice.

    Returns (audio_bytes, mime_type), or (None, None) when the API key is
    missing, the text is empty, or the request fails — narration is
    best-effort by design.
    """
    if not DG_KEY or not text:
        return None, None
    # Strip characters outside a conservative set and cap the payload length.
    clean = re.sub(r"[^\w\s.,!?;:-]", "", text)[:1000]
    try:
        resp = requests.post(
            "https://api.deepgram.com/v1/speak",
            params={"model": "aura-asteria-en"},
            headers={"Authorization": f"Token {DG_KEY}", "Content-Type": "application/json"},
            json={"text": clean},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.content, resp.headers.get("Content-Type", "audio/mpeg")
    except Exception:
        return None, None
def pcm_to_wav(pcm, sr=24000, ch=1, w=2):
    """Wrap raw PCM sample bytes in a WAV container and return the file bytes.

    Defaults match Gemini/Deepgram TTS output: 24 kHz, mono, 16-bit samples.
    """
    out = io.BytesIO()
    writer = wave.open(out, 'wb')
    try:
        writer.setnchannels(ch)
        writer.setsampwidth(w)
        writer.setframerate(sr)
        writer.writeframes(pcm)
    finally:
        writer.close()
    out.seek(0)
    return out.getvalue()
# ─── Chart & Tag Helpers ─────────────────────────────────────────────
# Matches chart placeholders like <generate_chart: "description"> (or the
# [...] bracket form), capturing the description in group "d".
TAG_RE = re.compile(r'[<\[]\s*generate_?chart\s*[:=]?\s*["\']?(?P<d>[^>\]\'"ββ]+?)["\']?\s*[>\]]', re.I)

def extract_chart_tags(t):
    """Return the unique chart descriptions found in *t*, in first-seen order."""
    seen = dict.fromkeys(m.group("d").strip() for m in TAG_RE.finditer(t or ""))
    return list(seen)

def repl_tags(txt: str, mp: dict, str_fn):
    """Replace each chart tag whose description is a key of *mp* with
    str_fn(mp[description]); tags with no mapping are left untouched."""
    def _substitute(m):
        desc = m.group("d").strip()
        return str_fn(mp[desc]) if desc in mp else m.group(0)
    return TAG_RE.sub(_substitute, txt)
# ─────────────────────────────────────────────────────────────────────────────
# FEATURE 1: REPORT GENERATION
# ─────────────────────────────────────────────────────────────────────────────
# FPDF subclass with HTML rendering (write_html) mixed in via fpdf's HTMLMixin.
class PDF(FPDF,HTMLMixin): pass
def build_pdf(md, charts):
    """Render the Markdown report to PDF bytes.

    Chart tags are resolved to <img> elements pointing at the files in
    *charts* (description -> path); the Markdown is converted to HTML and
    written with fpdf's HTML renderer.
    """
    cleaned = fix_bullet(md).replace("β’", "*")
    cleaned = repl_tags(cleaned, charts, lambda p: f'<img src="{p}">')
    renderer = MarkdownIt("commonmark", {"breaks":True}).enable("table")
    html = renderer.render(cleaned)
    doc = PDF()
    doc.set_auto_page_break(True, margin=15)
    doc.add_page()
    # Title header.
    doc.set_font("Arial", "B", 18)
    doc.cell(0, 12, "AI-Generated Business Report", ln=True)
    doc.ln(3)
    # Body text.
    doc.set_font("Arial", "", 11)
    doc.write_html(html)
    return bytes(doc.output(dest="S"))
def generate_report_assets(key, buf, name, ctx):
    """Produce the PDF-report bundle for an uploaded dataset.

    Parameters: *key* is the cache key, *buf*/*name* the raw upload bytes and
    filename, *ctx* optional user-supplied business context.
    Returns a dict with the HTML preview, PDF bytes, raw Markdown and key,
    or None when the file cannot be parsed (error shown via st.error).
    """
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", google_api_key=API_KEY, temperature=0.1)
    ctx_dict = {"shape": df.shape, "columns": list(df.columns), "user_ctx": ctx or "General business analysis"}
    report_md = llm.invoke(f"""You are a senior business analyst. Write an executive-level Markdown report
with insights & recommendations. Use chart tags like <generate_chart: "description"> where helpful.
Data Context: {json.dumps(ctx_dict, indent=2)}""").content
    chart_descs = extract_chart_tags(report_md)[:MAX_CHARTS]
    chart_paths = {}
    if chart_descs:
        ag = create_pandas_dataframe_agent(llm=llm, df=df, verbose=False, allow_dangerous_code=True)
        for d in chart_descs:
            with st.spinner(f"Generating chart: {d}"):
                with plt.ioff():
                    try:
                        ag.run(f"Create a {d} with Matplotlib and save.")
                        fig = plt.gcf()
                        if fig.axes:
                            p = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
                            fig.savefig(p, dpi=300, bbox_inches="tight", facecolor="white")
                            chart_paths[d] = str(p)
                    except Exception:
                        # Chart generation is best-effort: a failed chart just
                        # leaves its tag unresolved in the report. (Was a bare
                        # `except:`, which also swallowed SystemExit and
                        # KeyboardInterrupt.)
                        pass
                    finally:
                        # Always release figures so failures can't leak memory
                        # or bleed stale axes into the next chart.
                        plt.close("all")
    md = fix_bullet(report_md)
    pdf = build_pdf(md, chart_paths)
    # Inline the chart PNGs as data URIs for the in-browser preview.
    preview = repl_tags(
        md,
        chart_paths,
        lambda p: f'<img src="data:image/png;base64,{base64.b64encode(Path(p).read_bytes()).decode()}" style="max-width:100%;">',
    )
    return {"type": "report", "preview": preview, "pdf": pdf, "report_md": md, "key": key}
# ─────────────────────────────────────────────────────────────────────────────
# FEATURE 2: VIDEO GENERATION
# ─────────────────────────────────────────────────────────────────────────────
def generate_image_from_prompt(prompt, style):
    """Generate an illustrative scene image with the Gemini client.

    Falls back to a plain grey 1024x768 placeholder on any failure so video
    assembly never stalls on one bad scene.
    """
    try:
        full_prompt = f"A professional, clean, illustrative image for a business presentation: {prompt}, in the style of {style}."
        # The google-genai Client exposes generation under client.models — the
        # old GEM.generate_content(...) call raised AttributeError and forced
        # the placeholder path on every scene.
        response = GEM.models.generate_content(
            model="models/gemini-1.5-flash-latest",
            contents=full_prompt,
            config={"response_mime_type": "image/png"},
        )
        # Binary payloads come back as inline_data on a candidate's parts.
        for part in response.candidates[0].content.parts:
            blob = getattr(part, "inline_data", None)
            if blob is not None and blob.data:
                return Image.open(io.BytesIO(blob.data)).convert("RGB")
        # NOTE(review): gemini-1.5-flash may not return image bytes at all for
        # this request shape — confirm the model/config against current docs.
        raise ValueError("No image data in response")
    except Exception as e:
        st.warning(f"Illustrative image generation failed: {e}. Using placeholder.")
        return Image.new('RGB', (1024, 768), color = (230, 230, 230))
def create_silent_video(images, durations, output_path):
    """Write a silent 1280x720 MP4 at 24 fps, holding each PIL image on
    screen for its paired duration (seconds).

    Fixes: the writer is now released even if a frame conversion fails, and
    every image contributes at least one frame (previously a duration below
    1/24 s produced zero frames and desynced the scene timeline).
    """
    width, height = 1280, 720
    fps = 24
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        for img, duration in zip(images, durations):
            # Resize to the output frame and convert RGB -> BGR for OpenCV.
            frame = np.array(img.resize((width, height)))
            frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            for _ in range(max(1, int(duration * fps))):
                video.write(frame_bgr)
    finally:
        # Always finalize the container so a partial file isn't left locked.
        video.release()
    return output_path
def combine_video_audio(video_path, audio_paths, output_path):
    """Concatenate the per-scene audio clips and mux them onto the silent video.

    Uses ffmpeg's concat demuxer for the audio, then a second ffmpeg pass to
    copy the video stream and encode the audio as AAC. Raises
    subprocess.CalledProcessError if either ffmpeg invocation fails.
    """
    tmp = Path(tempfile.gettempdir())
    concat_list_path = tmp / f"{uuid.uuid4()}.txt"
    concat_audio_path = tmp / f"{uuid.uuid4()}.mp3"
    try:
        with open(concat_list_path, 'w') as f:
            for af in audio_paths:
                # concat-demuxer input list; single quotes per its syntax.
                # NOTE(review): a literal apostrophe in a path would break this
                # quoting — unlikely with uuid temp names, but unescaped.
                f.write(f"file '{Path(af).resolve()}'\n")
        subprocess.run(['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', str(concat_list_path), '-c', 'copy', str(concat_audio_path)], check=True, capture_output=True)
        subprocess.run(['ffmpeg', '-y', '-i', video_path, '-i', str(concat_audio_path), '-c:v', 'copy', '-c:a', 'aac', '-shortest', output_path], check=True, capture_output=True)
    finally:
        # Previously skipped when ffmpeg raised, leaking both temp files.
        concat_list_path.unlink(missing_ok=True)
        concat_audio_path.unlink(missing_ok=True)
    return output_path
def get_audio_duration(audio_file):
    """Return an audio file's duration in seconds via ffprobe.

    Falls back to 5.0 seconds when ffprobe is missing, the file is
    unreadable, or the output cannot be parsed.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        audio_file,
    ]
    try:
        probe = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
        return float(probe.stdout.strip())
    except Exception:
        return 5.0  # Default duration
def generate_video_assets(key, buf, name, ctx, style):
    """End-to-end video pipeline: script -> per-scene visual + narration ->
    silent MP4 -> muxed final MP4.

    Returns {"type": "video", "video_path": ..., "key": key} on success, or
    None when ffmpeg is unavailable, the file cannot be parsed, or no scene
    could be produced. All intermediate files are deleted on exit.
    """
    # Fail fast if ffmpeg isn't on PATH — everything downstream needs it.
    try:
        subprocess.run(['ffmpeg', '-version'], check=True, capture_output=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        st.error("π΄ FFmpeg is not installed or not in your system's PATH. Video generation is not possible.")
        return None
    df, err = load_dataframe_safely(buf, name)
    if err: st.error(err); return None
    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", google_api_key=API_KEY, temperature=0.2)
    ctx_dict = {"shape": df.shape, "columns": list(df.columns), "user_ctx": ctx or "General business analysis"}
    story_prompt = f"""Create a script for a short business video with exactly {VIDEO_SCENES} scenes.
For each scene:
1. Write a concise narration (1-2 sentences).
2. If the data can be visualized for this scene, add a chart tag like <generate_chart: "bar chart of sales by region">.
3. Separate each scene with the marker `[SCENE_BREAK]`.
Data Context: {json.dumps(ctx_dict, indent=2)}"""
    with st.spinner("Generating video script..."):
        full_script = llm.invoke(story_prompt).content
    scenes = [s.strip() for s in full_script.split("[SCENE_BREAK]")]
    visuals, audio_paths, temp_files = [], [], []
    try:
        ag = create_pandas_dataframe_agent(llm=llm, df=df, verbose=False, allow_dangerous_code=True)
        for i, scene_text in enumerate(scenes[:VIDEO_SCENES]):
            progress = (i + 1) / VIDEO_SCENES
            st.progress(progress, text=f"Processing Scene {i+1}/{VIDEO_SCENES}...")
            chart_descs = extract_chart_tags(scene_text)
            # NOTE(review): repl_tags with an empty map keeps unmatched tags
            # (returns m.group(0)), so chart tags are NOT stripped from the
            # narration text here — confirm whether that is intended.
            narrative = repl_tags(scene_text, {}, lambda _: "").strip()
            if narrative:  # Only process scenes with text
                # 1. Generate Visual: prefer a real chart from the data agent,
                # fall back to an illustrative Gemini image on any failure.
                if chart_descs:
                    with plt.ioff():
                        try:
                            ag.run(f"Create a {chart_descs[0]} with Matplotlib and save.")
                            fig = plt.gcf()
                            if fig.axes:
                                p = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
                                fig.savefig(p, dpi=200, bbox_inches="tight", facecolor="white")
                                visuals.append(Image.open(p).convert("RGB"))
                                temp_files.append(p)
                            else: raise ValueError("No chart produced")
                        except Exception:
                            visuals.append(generate_image_from_prompt(narrative, style))
                        finally: plt.close("all")
                else:
                    visuals.append(generate_image_from_prompt(narrative, style))
                # 2. Generate Audio.
                # NOTE(review): if TTS fails for a scene, visuals gains an
                # entry but audio_paths does not; zip() in create_silent_video
                # then drops trailing visuals and scenes can desync — verify.
                audio_content, _ = deepgram_tts(narrative)
                if audio_content:
                    audio_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp3"
                    audio_path.write_bytes(audio_content)
                    audio_paths.append(str(audio_path))
                    temp_files.append(audio_path)
        if not visuals or not audio_paths:
            st.error("Could not generate any scenes for the video. Please try a different context or file.")
            return None
        st.progress(1.0, text="Assembling video...")
        # Hold each visual for exactly the length of its narration clip.
        durations = [get_audio_duration(ap) for ap in audio_paths]
        silent_video_path = str(Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp4")
        final_video_path = str(Path(tempfile.gettempdir()) / f"{key}.mp4")
        create_silent_video(visuals, durations, silent_video_path)
        temp_files.append(Path(silent_video_path))
        combine_video_audio(silent_video_path, audio_paths, final_video_path)
        return {"type": "video", "video_path": final_video_path, "key": key}
    finally:
        for f in temp_files: f.unlink(missing_ok=True)  # Cleanup all temp files
# ─────────────────────────────────────────────────────────────────────────────
# UI & MAIN WORKFLOW
# ─────────────────────────────────────────────────────────────────────────────
mode = st.radio("Select Output Format:", ["Report (PDF)", "Video Narrative"], horizontal=True)
# --- Conditional UI ---
video_style = "professional illustration"
if mode == "Video Narrative":
    with st.sidebar:
        st.subheader("π¬ Video Options")
        video_style = st.selectbox("Visual Style",
            ["professional illustration", "minimalist infographic", "photorealistic", "cinematic", "data visualization aesthetic"])
        st.info("The AI will generate charts from your data where possible, and illustrative images for other scenes.")
# --- Common UI ---
upl = st.file_uploader("Upload CSV or Excel", type=["csv", "xlsx", "xls"])
if upl:
    df_prev, prev_err = load_dataframe_safely(upl.getvalue(), upl.name)
    with st.expander("π Data Preview"):
        # Guard: load_dataframe_safely returns (None, msg) on a parse failure;
        # calling .head() on None previously crashed the app at this point.
        if df_prev is not None:
            st.dataframe(arrow_df(df_prev.head()))
        else:
            st.warning(f"Could not preview file: {prev_err}")
ctx = st.text_area("Business context or specific instructions (optional)")
if st.button("π Generate", type="primary"):
    if not upl:
        st.warning("Please upload a file first.")
        st.stop()
    # Deterministic bundle key over every input that affects the output.
    bkey = sha1_bytes(b"".join([upl.getvalue(), mode.encode(), ctx.encode(), video_style.encode()]))
    if mode == "Report (PDF)":
        with st.spinner("Generating report and charts..."):
            bundle = generate_report_assets(bkey, upl.getvalue(), upl.name, ctx)
    else:  # Video Narrative
        bundle = generate_video_assets(bkey, upl.getvalue(), upl.name, ctx, video_style)
    st.session_state.bundle = bundle
    st.rerun()
| # --- Display Area (handles state correctly after rerun) --- | |
| if "bundle" in st.session_state and st.session_state.bundle: | |
| bundle = st.session_state.bundle | |
| if bundle.get("type") == "report": | |
| st.subheader("π Generated Report") | |
| with st.expander("View Report", expanded=True): | |
| if bundle["preview"]: | |
| st.markdown(bundle["preview"], unsafe_allow_html=True) | |
| c1, c2 = st.columns(2) | |
| with c1: | |
| st.download_button("Download PDF", bundle["pdf"], "business_report.pdf", "application/pdf", use_container_width=True) | |
| with c2: | |
| if DG_KEY and st.button("π Narrate Summary", use_container_width=True): | |
| report_text = re.sub(r'<[^>]+>', '', bundle["report_md"]) # Basic HTML strip | |
| audio, mime = deepgram_tts(report_text) | |
| if audio: | |
| st.audio(audio, format=mime) | |
| else: | |
| st.error("Narration failed.") | |
| else: | |
| st.warning("No report content was generated.") | |
| elif bundle.get("type") == "video": | |
| st.subheader("π¬ Generated Video Narrative") | |
| video_path = bundle.get("video_path") | |
| if video_path and Path(video_path).exists(): | |
| with open(video_path, "rb") as f: | |
| st.video(f.read()) | |
| with open(video_path, "rb") as f: | |
| st.download_button("Download Video", f, f"sozo_narrative_{bundle['key'][:8]}.mp4", "video/mp4") | |
| else: | |
| st.error("Video file could not be found or generation failed.") |