# Hugging Face Space page residue (not code) — commented out so the file parses:
# Adk-Analyst2 / Demo1.py
# rairo's picture
# Create Demo1.py
# 0d73b2a verified
###############################################################################
# Sozo Business Studio Β· AI transforms business data into compelling narratives
###############################################################################
import os, re, json, hashlib, uuid, base64, io, tempfile, wave, requests, subprocess
from pathlib import Path
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from fpdf import FPDF, HTMLMixin
from markdown_it import MarkdownIt
from PIL import Image
from langchain_experimental.agents import create_pandas_dataframe_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from google import genai
import cv2 # Added for video processing
# ─────────────────────────────────────────────────────────────────────────────
# CONFIG & CONSTANTS
# ─────────────────────────────────────────────────────────────────────────────
# Page chrome — st.set_page_config must be the first Streamlit call in the script.
st.set_page_config(page_title="Sozo Business Studio", layout="wide")
st.title("πŸ“Š Sozo Business Studio")
st.caption("AI transforms business data into compelling narratives.")
# --- Feature Caps ---
MAX_CHARTS = 5  # Max number of chart tags honored per report
VIDEO_SCENES = 5 # Number of scenes for the video
# --- API Keys & Clients (Correct Initialization) ---
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    st.error("⚠️ GEMINI_API_KEY is not set."); st.stop()
# Use the Client pattern from the original script
GEM = genai.Client(api_key=API_KEY)
DG_KEY = os.getenv("DEEPGRAM_API_KEY") # Optional but needed for narration
# --- Session State ---
# Simplified state to hold the most recent generated output
# "bundle" is a dict describing the last generated report/video, or None.
st.session_state.setdefault("bundle", None)
# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
sha1_bytes = lambda b: hashlib.sha1(b).hexdigest()
def validate_file_upload(f):
    """Check an uploaded file handle; return a list of error strings (empty == valid)."""
    problems = []
    if f is None:
        problems.append("No file uploaded")
    elif not f.size:
        problems.append("File is empty")
    elif f.size > 50 * 1024 * 1024:
        problems.append("File >50 MB")
    # Extension check runs independently of the size checks.
    if f and Path(f.name).suffix.lower() not in (".csv", ".xlsx", ".xls"):
        problems.append("Unsupported file type")
    return problems
def load_dataframe_safely(buf: bytes, name: str):
    """Parse *buf* as Excel or CSV based on *name*'s extension.

    Returns (DataFrame, None) on success or (None, error_message) on failure.
    Column labels are stripped and all-empty rows dropped.
    """
    try:
        stream = io.BytesIO(buf)
        if Path(name).suffix.lower() in (".xlsx", ".xls"):
            frame = pd.read_excel(stream)
        else:
            frame = pd.read_csv(stream)
        if frame.empty or len(frame.columns) == 0:
            raise ValueError("File contains no data")
        frame.columns = frame.columns.astype(str).str.strip()
        frame = frame.dropna(how="all")
        if frame.empty:
            raise ValueError("Rows all empty")
    except Exception as exc:
        return None, str(exc)
    return frame, None
def fix_bullet(t: str) -> str:
    """Remove C1 control characters (0x80-0x9F) from *t*; non-strings become ''."""
    if not isinstance(t, str):
        return ""
    return re.sub(r"[\x80-\x9f]", "", t)
# β€”β€”β€” Arrow helpers β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
def arrow_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with pandas nullable numeric/boolean dtypes
    downcast to plain NumPy dtypes so Streamlit's Arrow serialization works.

    Fixes two defects in the original: the nullable boolean dtype's ``.name``
    is ``"boolean"`` (not ``"Boolean"``), so it was never matched; and an
    Int64/Float64 column containing ``<NA>`` raised on ``astype("int64")``.
    """
    safe = df.copy()
    for c in safe.columns:
        dname = safe[c].dtype.name
        if dname in ("Int64", "Float64", "boolean"):
            has_na = safe[c].isna().any()
            if dname == "boolean":
                # NumPy bool cannot hold NA; fall back to object in that case.
                safe[c] = safe[c].astype("object" if has_na else "bool")
            elif has_na:
                # float64 can represent missing values as NaN.
                safe[c] = safe[c].astype("float64")
            else:
                safe[c] = safe[c].astype(dname.lower())
    return safe
# β€”β€”β€” Text-to-Speech (Used by Both Features) β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
@st.cache_data(show_spinner=False)
def deepgram_tts(text: str):
    """Synthesize *text* with Deepgram's Aura TTS.

    Returns (audio_bytes, mime_type), or (None, None) when the API key is
    missing, the text is empty, or the request fails — narration is
    best-effort by design.
    """
    if not DG_KEY or not text:
        return None, None
    # Keep only speakable characters and cap the payload at 1000 chars.
    speakable = re.sub(r"[^\w\s.,!?;:-]", "", text)[:1000]
    try:
        resp = requests.post(
            "https://api.deepgram.com/v1/speak",
            params={"model": "aura-asteria-en"},
            headers={"Authorization": f"Token {DG_KEY}", "Content-Type": "application/json"},
            json={"text": speakable},
            timeout=30,
        )
        resp.raise_for_status()
    except Exception:
        return None, None
    return resp.content, resp.headers.get("Content-Type", "audio/mpeg")
def pcm_to_wav(pcm, sr=24000, ch=1, w=2):
    """Wrap raw PCM bytes in a WAV container.

    Defaults (24 kHz, mono, 16-bit) match the TTS output this app produces.
    Returns the complete WAV file as bytes.
    """
    out = io.BytesIO()
    writer = wave.open(out, 'wb')
    try:
        writer.setnchannels(ch)
        writer.setsampwidth(w)
        writer.setframerate(sr)
        writer.writeframes(pcm)
    finally:
        writer.close()
    return out.getvalue()
# β€”β€”β€” Chart & Tag Helpers β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# Matches chart tags of the form <generate_chart: "description"> (or [..] brackets).
TAG_RE = re.compile(r'[<\[]\s*generate_?chart\s*[:=]?\s*["\']?(?P<d>[^>\]\'"”’]+?)["\']?\s*[>\]]', re.I)

def extract_chart_tags(t):
    """Return the unique chart descriptions found in *t*, in first-seen order."""
    ordered_unique = dict.fromkeys(m.group("d").strip() for m in TAG_RE.finditer(t or ""))
    return list(ordered_unique)

def repl_tags(txt: str, mp: dict, str_fn):
    """Replace every chart tag whose description is a key of *mp* with
    str_fn(mp[description]); tags with unknown descriptions are left intact."""
    def _swap(m):
        desc = m.group("d").strip()
        return str_fn(mp[desc]) if desc in mp else m.group(0)
    return TAG_RE.sub(_swap, txt)
# ─────────────────────────────────────────────────────────────────────────────
# FEATURE 1: REPORT GENERATION
# ─────────────────────────────────────────────────────────────────────────────
class PDF(FPDF,HTMLMixin): pass
def build_pdf(md, charts):
    """Render markdown *md* into PDF bytes, inlining chart images.

    *charts* maps chart descriptions to saved PNG paths; matching chart tags
    become <img> elements before the markdown is converted to HTML.
    """
    cleaned = fix_bullet(md).replace("β€’", "*")
    cleaned = repl_tags(cleaned, charts, lambda p: f'<img src="{p}">')
    renderer = MarkdownIt("commonmark", {"breaks":True}).enable("table")
    html = renderer.render(cleaned)
    doc = PDF()
    doc.set_auto_page_break(True, margin=15)
    doc.add_page()
    doc.set_font("Arial", "B", 18)
    doc.cell(0, 12, "AI-Generated Business Report", ln=True)
    doc.ln(3)
    doc.set_font("Arial", "", 11)
    doc.write_html(html)
    return bytes(doc.output(dest="S"))
def generate_report_assets(key, buf, name, ctx):
    """Build the report bundle: LLM markdown, agent-rendered charts, PDF bytes.

    Returns a session-state bundle dict, or None when the upload cannot be
    parsed. Chart generation is best-effort: a failed chart simply leaves its
    tag unreplaced in the report.
    """
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", google_api_key=API_KEY, temperature=0.1)
    ctx_dict = {"shape": df.shape, "columns": list(df.columns), "user_ctx": ctx or "General business analysis"}
    report_md = llm.invoke(f"""You are a senior business analyst. Write an executive-level Markdown report
with insights & recommendations. Use chart tags like <generate_chart: "description"> where helpful.
Data Context: {json.dumps(ctx_dict, indent=2)}""").content
    chart_descs = extract_chart_tags(report_md)[:MAX_CHARTS]
    chart_paths = {}
    if chart_descs:
        ag = create_pandas_dataframe_agent(llm=llm, df=df, verbose=False, allow_dangerous_code=True)
        for d in chart_descs:
            with st.spinner(f"Generating chart: {d}"):
                with plt.ioff():
                    try:
                        ag.run(f"Create a {d} with Matplotlib and save.")
                        fig = plt.gcf()
                        if fig.axes:
                            p = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
                            fig.savefig(p, dpi=300, bbox_inches="tight", facecolor="white")
                            chart_paths[d] = str(p)
                    except Exception:
                        # Was a bare `except:`, which also swallowed
                        # KeyboardInterrupt/SystemExit; narrowed to Exception.
                        pass
                    finally:
                        # Close figures on both success and failure paths.
                        plt.close("all")
    md = fix_bullet(report_md)
    pdf = build_pdf(md, chart_paths)
    # Inline charts as base64 data URIs for the HTML preview pane.
    preview = repl_tags(md, chart_paths, lambda p: f'<img src="data:image/png;base64,{base64.b64encode(Path(p).read_bytes()).decode()}" style="max-width:100%;">')
    return {"type": "report", "preview": preview, "pdf": pdf, "report_md": md, "key": key}
# ─────────────────────────────────────────────────────────────────────────────
# FEATURE 2: VIDEO GENERATION
# ─────────────────────────────────────────────────────────────────────────────
def generate_image_from_prompt(prompt, style):
    """Generate an illustrative image using the Gemini Client; grey placeholder on failure.

    NOTE(review): the google-genai ``Client`` exposes generation under
    ``client.models.generate_content`` — the previous ``GEM.generate_content``
    attribute does not exist, and the response image lives in a part's
    ``inline_data``, not ``response.parts[0].blob`` — so every call was
    silently falling through to the placeholder.
    """
    try:
        full_prompt = f"A professional, clean, illustrative image for a business presentation: {prompt}, in the style of {style}."
        response = GEM.models.generate_content(
            model="models/gemini-1.5-flash-latest",
            contents=full_prompt,
            config={"response_mime_type": "image/png"},
        )
        # Scan response parts for the first inline image payload.
        for part in response.candidates[0].content.parts:
            blob = getattr(part, "inline_data", None)
            if blob is not None and blob.data:
                return Image.open(io.BytesIO(blob.data)).convert("RGB")
        raise ValueError("No image data in response")
    except Exception as e:
        st.warning(f"Illustrative image generation failed: {e}. Using placeholder.")
        return Image.new('RGB', (1024, 768), color = (230, 230, 230))
def create_silent_video(images, durations, output_path):
    """Write *images* into a silent 1280x720 mp4 at 24 fps.

    Each image is shown for its paired duration (seconds) from *durations*.
    Returns *output_path*.
    """
    width, height = 1280, 720
    fps = 24
    writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    for img, secs in zip(images, durations):
        # OpenCV expects BGR frames; PIL supplies RGB.
        bgr = cv2.cvtColor(np.array(img.resize((width, height))), cv2.COLOR_RGB2BGR)
        for _ in range(int(secs * fps)):
            writer.write(bgr)
    writer.release()
    return output_path
def combine_video_audio(video_path, audio_paths, output_path):
    """Concatenate *audio_paths* with ffmpeg and mux the result onto *video_path*.

    Returns *output_path*. The intermediate concat-list and audio files are now
    removed in a ``finally`` block — the original leaked them whenever either
    ffmpeg invocation raised CalledProcessError.
    """
    concat_list_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.txt"
    concat_audio_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp3"
    try:
        # ffmpeg concat-demuxer input list, one clip per line.
        with open(concat_list_path, 'w') as f:
            for af in audio_paths:
                f.write(f"file '{Path(af).resolve()}'\n")
        # 1) Stitch the per-scene clips together (stream copy, no re-encode).
        subprocess.run(['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', str(concat_list_path), '-c', 'copy', str(concat_audio_path)], check=True, capture_output=True)
        # 2) Mux audio onto the silent video; -shortest trims to the shorter stream.
        subprocess.run(['ffmpeg', '-y', '-i', video_path, '-i', str(concat_audio_path), '-c:v', 'copy', '-c:a', 'aac', '-shortest', output_path], check=True, capture_output=True)
    finally:
        concat_list_path.unlink(missing_ok=True)
        concat_audio_path.unlink(missing_ok=True)
    return output_path
def get_audio_duration(audio_file):
    """Return *audio_file*'s duration in seconds via ffprobe; 5.0 on any failure."""
    cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
           '-of', 'default=noprint_wrappers=1:nokey=1', audio_file]
    try:
        probe = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               text=True, check=True)
        return float(probe.stdout.strip())
    except Exception:
        return 5.0  # Default duration
def generate_video_assets(key, buf, name, ctx, style):
    """Build the narrated-video bundle: LLM script -> per-scene visuals + TTS -> mp4.

    Requires ffmpeg on PATH (checked up front) and DEEPGRAM_API_KEY for audio.
    Returns {"type": "video", "video_path": ..., "key": ...} or None on failure.
    All intermediate files are deleted in the ``finally`` block.
    """
    try:
        subprocess.run(['ffmpeg', '-version'], check=True, capture_output=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        st.error("πŸ”΄ FFmpeg is not installed or not in your system's PATH. Video generation is not possible.")
        return None
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", google_api_key=API_KEY, temperature=0.2)
    ctx_dict = {"shape": df.shape, "columns": list(df.columns), "user_ctx": ctx or "General business analysis"}
    story_prompt = f"""Create a script for a short business video with exactly {VIDEO_SCENES} scenes.
For each scene:
1. Write a concise narration (1-2 sentences).
2. If the data can be visualized for this scene, add a chart tag like <generate_chart: "bar chart of sales by region">.
3. Separate each scene with the marker `[SCENE_BREAK]`.
Data Context: {json.dumps(ctx_dict, indent=2)}"""
    with st.spinner("Generating video script..."):
        full_script = llm.invoke(story_prompt).content
    scenes = [s.strip() for s in full_script.split("[SCENE_BREAK]")]
    visuals, audio_paths, temp_files = [], [], []
    try:
        ag = create_pandas_dataframe_agent(llm=llm, df=df, verbose=False, allow_dangerous_code=True)
        for i, scene_text in enumerate(scenes[:VIDEO_SCENES]):
            st.progress((i + 1) / VIDEO_SCENES, text=f"Processing Scene {i+1}/{VIDEO_SCENES}...")
            chart_descs = extract_chart_tags(scene_text)
            # BUG FIX: the old code used repl_tags(scene_text, {}, ...), which
            # leaves tags with unknown descriptions untouched — so the raw
            # chart-tag text was being sent to the TTS and read aloud.
            # Strip the tags outright instead.
            narrative = TAG_RE.sub("", scene_text).strip()
            if narrative:  # Only process scenes with text
                # 1. Visual: prefer a real chart from the data, else an AI image.
                if chart_descs:
                    with plt.ioff():
                        try:
                            ag.run(f"Create a {chart_descs[0]} with Matplotlib and save.")
                            fig = plt.gcf()
                            if fig.axes:
                                p = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
                                fig.savefig(p, dpi=200, bbox_inches="tight", facecolor="white")
                                visuals.append(Image.open(p).convert("RGB"))
                                temp_files.append(p)
                            else:
                                raise ValueError("No chart produced")
                        except Exception:
                            visuals.append(generate_image_from_prompt(narrative, style))
                        finally:
                            plt.close("all")
                else:
                    visuals.append(generate_image_from_prompt(narrative, style))
                # 2. Audio narration for the scene (skipped silently if TTS fails).
                audio_content, _ = deepgram_tts(narrative)
                if audio_content:
                    audio_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp3"
                    audio_path.write_bytes(audio_content)
                    audio_paths.append(str(audio_path))
                    temp_files.append(audio_path)
        if not visuals or not audio_paths:
            st.error("Could not generate any scenes for the video. Please try a different context or file.")
            return None
        st.progress(1.0, text="Assembling video...")
        durations = [get_audio_duration(ap) for ap in audio_paths]
        silent_video_path = str(Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.mp4")
        final_video_path = str(Path(tempfile.gettempdir()) / f"{key}.mp4")
        create_silent_video(visuals, durations, silent_video_path)
        temp_files.append(Path(silent_video_path))
        combine_video_audio(silent_video_path, audio_paths, final_video_path)
        return {"type": "video", "video_path": final_video_path, "key": key}
    finally:
        for f in temp_files:
            f.unlink(missing_ok=True)  # Cleanup all temp files
# ─────────────────────────────────────────────────────────────────────────────
# UI & MAIN WORKFLOW
# ─────────────────────────────────────────────────────────────────────────────
# --- Mode selection and conditional video options ---
mode = st.radio("Select Output Format:", ["Report (PDF)", "Video Narrative"], horizontal=True)
# --- Conditional UI ---
video_style = "professional illustration"
if mode == "Video Narrative":
    with st.sidebar:
        st.subheader("🎬 Video Options")
        video_style = st.selectbox("Visual Style",
            ["professional illustration", "minimalist infographic", "photorealistic", "cinematic", "data visualization aesthetic"])
        st.info("The AI will generate charts from your data where possible, and illustrative images for other scenes.")
# --- Common UI ---
upl = st.file_uploader("Upload CSV or Excel", type=["csv", "xlsx", "xls"])
if upl:
    df_prev, prev_err = load_dataframe_safely(upl.getvalue(), upl.name)
    with st.expander("πŸ“Š Data Preview"):
        # BUG FIX: df_prev is None when parsing fails; the old code ignored the
        # error and called df_prev.head(), crashing with AttributeError.
        if df_prev is not None:
            st.dataframe(arrow_df(df_prev.head()))
        else:
            st.warning(prev_err)
ctx = st.text_area("Business context or specific instructions (optional)")
if st.button("πŸš€ Generate", type="primary"):
    if not upl:
        st.warning("Please upload a file first.")
        st.stop()
    # Deterministic key: same file + mode + context + style => same bundle key.
    bkey = sha1_bytes(b"".join([upl.getvalue(), mode.encode(), ctx.encode(), video_style.encode()]))
    if mode == "Report (PDF)":
        with st.spinner("Generating report and charts..."):
            bundle = generate_report_assets(bkey, upl.getvalue(), upl.name, ctx)
    else:  # Video Narrative
        bundle = generate_video_assets(bkey, upl.getvalue(), upl.name, ctx, video_style)
    st.session_state.bundle = bundle
    st.rerun()
# --- Display Area (handles state correctly after rerun) ---
# Rendering is driven entirely by st.session_state.bundle so the generated
# output survives the st.rerun() triggered after the Generate button.
if "bundle" in st.session_state and st.session_state.bundle:
    bundle = st.session_state.bundle
    if bundle.get("type") == "report":
        st.subheader("πŸ“„ Generated Report")
        with st.expander("View Report", expanded=True):
            if bundle["preview"]:
                # Preview contains base64-inlined chart images, hence unsafe_allow_html.
                st.markdown(bundle["preview"], unsafe_allow_html=True)
                c1, c2 = st.columns(2)
                with c1:
                    st.download_button("Download PDF", bundle["pdf"], "business_report.pdf", "application/pdf", use_container_width=True)
                with c2:
                    # Narration button only shown when a Deepgram key is configured.
                    if DG_KEY and st.button("πŸ”Š Narrate Summary", use_container_width=True):
                        report_text = re.sub(r'<[^>]+>', '', bundle["report_md"]) # Basic HTML strip
                        audio, mime = deepgram_tts(report_text)
                        if audio:
                            st.audio(audio, format=mime)
                        else:
                            st.error("Narration failed.")
            else:
                st.warning("No report content was generated.")
    elif bundle.get("type") == "video":
        st.subheader("🎬 Generated Video Narrative")
        video_path = bundle.get("video_path")
        if video_path and Path(video_path).exists():
            with open(video_path, "rb") as f:
                st.video(f.read())
            with open(video_path, "rb") as f:
                st.download_button("Download Video", f, f"sozo_narrative_{bundle['key'][:8]}.mp4", "video/mp4")
        else:
            st.error("Video file could not be found or generation failed.")