Spaces:
Sleeping
Sleeping
##############################################################################
# Sozo Business Studio · 10-Jul-2025
# • FIXED: Corrected the "name 'spec' is not defined" error in the safe_chart function.
# • NOTE: This is the only change. The user's prompts, classes, and AI calls are preserved exactly.
##############################################################################
| import os, re, json, hashlib, uuid, base64, io, tempfile, requests, subprocess, inspect, logging | |
| from pathlib import Path | |
| from typing import Tuple, Dict, List | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| from matplotlib.animation import FuncAnimation, FFMpegWriter | |
| from fpdf import FPDF, HTMLMixin | |
| from markdown_it import MarkdownIt | |
| from PIL import Image | |
| import cv2 | |
| from langchain_experimental.agents import create_pandas_dataframe_agent | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from google import genai | |
| from google.genai import types | |
# ─── CONFIG & LOGGING ───────────────────────────────────────────────────────
# Root logger: timestamped records that include the emitting function name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Page chrome.
st.set_page_config(page_title="Sozo Business Studio", layout="wide")
st.title("π Sozo Business Studio")
st.caption("AI transforms business data into compelling narratives.")

# Video frame rate / frame size, and caps on charts per report and scenes per video.
FPS = 24
WIDTH = 1280
HEIGHT = 720
MAX_CHARTS = 5
VIDEO_SCENES = 5

# The Gemini key is mandatory: without it the script halts right here.
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    st.error("β οΈ GEMINI_API_KEY is not set.")
    st.stop()
GEM = genai.Client(api_key=API_KEY)

# The Deepgram key is optional; helpers that need it check it before use.
DG_KEY = os.getenv("DEEPGRAM_API_KEY")


def sha1_bytes(b):
    """Return the hexadecimal SHA-1 digest of the byte string *b*."""
    return hashlib.sha1(b).hexdigest()


# Slot holding the most recently generated report/video bundle across reruns.
st.session_state.setdefault("bundle", None)
# ─── HELPERS (Unchanged) ────────────────────────────────────────────────────
def load_dataframe_safely(buf: bytes, name: str) -> Tuple[pd.DataFrame, str]:
    """Parse an uploaded CSV/Excel payload into a DataFrame without raising.

    The reader is chosen from the extension of *name* (``.xlsx``/``.xls`` use
    ``pd.read_excel``; anything else uses ``pd.read_csv``). Column labels are
    converted to stripped strings and rows that are entirely empty are dropped.

    Returns:
        ``(df, None)`` on success, or ``(None, message)`` when parsing fails
        or no usable data remains.
    """
    try:
        suffix = Path(name).suffix.lower()
        if suffix in (".xlsx", ".xls"):
            reader = pd.read_excel
        else:
            reader = pd.read_csv
        frame = reader(io.BytesIO(buf))
        frame.columns = frame.columns.astype(str).str.strip()
        frame = frame.dropna(how="all")
        if frame.empty or len(frame.columns) == 0:
            raise ValueError("No usable data found")
    except Exception as exc:
        # Any failure is reported to the caller as text instead of propagating.
        return None, str(exc)
    return frame, None
| def arrow_df(df: pd.DataFrame) -> pd.DataFrame: | |
| safe = df.copy() | |
| for c in safe.columns: | |
| if safe[c].dtype.name in ("Int64", "Float64", "Boolean"): | |
| safe[c] = safe[c].astype(safe[c].dtype.name.lower()) | |
| return safe | |
def deepgram_tts(txt: str) -> Tuple[bytes, str]:
    """Synthesize speech for *txt* through the Deepgram ``/v1/speak`` endpoint.

    The text is stripped of characters other than word characters, whitespace
    and basic punctuation, then truncated to 1000 characters before sending.

    Returns:
        ``(audio_bytes, content_type)`` on success. ``(None, None)`` when no
        Deepgram key is configured, when there is nothing to speak, or when
        the request fails (the failure is logged rather than raised, so
        callers can fall back to silent audio).
    """
    if not DG_KEY or not txt:
        return None, None
    txt = re.sub(r"[^\w\s.,!?;:-]", "", txt)[:1000]
    if not txt.strip():
        # Sanitising removed everything; do not spend a request on empty text.
        return None, None
    try:
        r = requests.post(
            "https://api.deepgram.com/v1/speak",
            params={"model": "aura-2-andromeda-en"},
            headers={"Authorization": f"Token {DG_KEY}", "Content-Type": "application/json"},
            json={"text": txt},
            timeout=30,
        )
        r.raise_for_status()
        return r.content, r.headers.get("Content-Type", "audio/mpeg")
    except Exception as exc:
        # Best-effort by design, but leave a trace so failures can be diagnosed.
        logging.warning(f"Deepgram TTS request failed: {exc}")
        return None, None
def generate_silence_mp3(duration: float, out: Path):
    """Write *duration* seconds of mono 44.1 kHz silence to *out* using ffmpeg.

    Raises ``subprocess.CalledProcessError`` if ffmpeg exits with an error.
    """
    ffmpeg_cmd = [
        "ffmpeg", "-y",
        "-f", "lavfi",
        "-i", "anullsrc=r=44100:cl=mono",
        "-t", f"{duration:.3f}",
        "-q:a", "9",
        str(out),
    ]
    subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
def audio_duration(path: str) -> float:
    """Return the duration of the media file at *path* in seconds via ffprobe.

    Falls back to ``5.0`` whenever ffprobe cannot be run, exits with an
    error, or prints something that is not a number.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=nw=1:nk=1",
        path,
    ]
    try:
        completed = subprocess.run(
            probe_cmd,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        seconds = float(completed.stdout.strip())
    except Exception:
        seconds = 5.0
    return seconds
# Matches chart tags such as  <generate_chart: "bar | sales by region">  or the
# square-bracket form  [generate_chart = 'pie | share'].  The description is
# captured in group "d" without its surrounding quotes.
# The typographic quotes (U+201C / U+201D) are written as escapes so that the
# pattern survives any re-encoding of this file; LLM output frequently uses
# curly quotes instead of straight ones.
TAG_RE = re.compile(
    r'[<\[]\s*generate_?chart\s*[:=]?\s*'
    r'[\"\'\u201c\u201d]?(?P<d>[^>\"\'\u201d\]]+?)[\"\'\u201c\u201d]?'
    r'\s*[>\]]',
    re.I,
)


def extract_chart_tags(t):
    """Return the unique chart descriptions found in *t*, in order of appearance.

    ``None`` or an empty string yields an empty list.
    """
    return list(dict.fromkeys(m.group("d").strip() for m in TAG_RE.finditer(t or "")))


# Leading "Scene N:" style labels at the start of a line.
re_scene = re.compile(r"^\s*scene\s*\d+[:.\- ]*", re.I | re.M)
def clean_narration(txt: str) -> str:
    """Turn one scene of script text into plain narration for text-to-speech.

    Removes chart tags, leading "Scene N" labels, a fixed list of phrases that
    refer to the chart, parenthesised asides, and the Markdown characters
    ``*``, ``#`` and ``_``; runs of whitespace are then collapsed.
    """
    narration = re_scene.sub("", TAG_RE.sub("", txt))
    chart_references = (
        r"as you can see in the chart",
        r"this chart shows",
        r"the chart illustrates",
        r"in this visual",
        r"this graph displays",
    )
    for reference in chart_references:
        narration = re.sub(reference, "", narration, flags=re.IGNORECASE)
    narration = re.sub(r"\s*\([^)]*\)", "", narration)
    narration = re.sub(r"[\*#_]", "", narration)
    narration = re.sub(r"\s{2,}", " ", narration)
    return narration.strip()
def placeholder_img() -> Image.Image:
    """Return a plain light-grey RGB image of WIDTH x HEIGHT pixels."""
    light_grey = (230, 230, 230)
    return Image.new("RGB", (WIDTH, HEIGHT), light_grey)
def generate_image_from_prompt(prompt: str) -> Image.Image:
    """Generate an illustration for *prompt* with Gemini image models.

    The primary model is tried first and the fallback model second. Each
    attempt is isolated, so an exception raised by the primary model (not
    only an image-less response) still lets the fallback run. If neither
    yields an image, the grey placeholder image is returned; this function
    does not raise for model failures.
    """
    full_prompt = "A clean business-presentation illustration: " + prompt
    model_names = (
        "gemini-2.0-flash-exp-image-generation",
        "gemini-2.0-flash-preview-image-generation",
    )
    for model_name in model_names:
        try:
            res = GEM.models.generate_content(
                model=model_name,
                contents=full_prompt,
                config=types.GenerateContentConfig(response_modalities=["IMAGE"]),
            )
            for part in res.candidates[0].content.parts:
                if getattr(part, "inline_data", None):
                    return Image.open(io.BytesIO(part.inline_data.data)).convert("RGB")
        except Exception as exc:
            logging.warning(f"Image generation with {model_name} failed: {exc}")
    return placeholder_img()
class PDF(FPDF, HTMLMixin):
    """FPDF document with the HTML mixin added so ``write_html`` is available."""
def build_pdf(md: str, charts: Dict[str, str]) -> bytes:
    """Render the Markdown report *md* to a PDF and return it as bytes.

    Args:
        md: Report Markdown, possibly containing chart tags.
        charts: Mapping of chart description -> path of the rendered PNG.

    Chart tags whose description maps to an existing file are replaced by an
    inline base64 ``<img>``; all other tags are removed.
    """
    def embed_chart_for_pdf(match):
        desc = match.group("d").strip()
        path = charts.get(desc)
        if path and Path(path).exists():
            encoded = base64.b64encode(Path(path).read_bytes()).decode()
            return f'<img src="data:image/png;base64,{encoded}" width="600">'
        return ""

    html = MarkdownIt("commonmark", {"breaks": True}).enable("table").render(
        TAG_RE.sub(embed_chart_for_pdf, md)
    )
    pdf = PDF()
    pdf.set_auto_page_break(True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", "B", 18)
    pdf.cell(0, 12, "AI-Generated Business Report", ln=True)
    pdf.ln(3)
    pdf.set_font("Arial", "", 11)
    pdf.write_html(html)

    raw = pdf.output(dest="S")
    # Depending on the installed fpdf flavour this is a latin-1 str or a
    # bytearray; normalise so the declared ``bytes`` return type always holds.
    if isinstance(raw, str):
        return raw.encode("latin-1")
    return bytes(raw)
# ─── ENHANCED CHART GENERATION SYSTEM (User's code - unchanged) ─────────────
class ChartSpecification:
    """Plain container describing one chart to draw.

    ``agg_method`` falls back to ``"sum"`` when it is omitted or falsy; every
    other argument is stored as given.
    """

    def __init__(
        self,
        chart_type: str,
        title: str,
        x_col: str,
        y_col: str,
        agg_method: str = None,
        filter_condition: str = None,
        top_n: int = None,
        color_scheme: str = "professional",
    ):
        # What to draw and how it is labelled.
        self.chart_type = chart_type
        self.title = title
        self.x_col = x_col
        self.y_col = y_col
        # How the data is reduced before plotting.
        self.agg_method = agg_method if agg_method else "sum"
        self.filter_condition = filter_condition
        self.top_n = top_n
        self.color_scheme = color_scheme
def enhance_data_context(df: pd.DataFrame, ctx_dict: Dict) -> Dict:
    """Return a copy of *ctx_dict* extended with facts derived from *df*.

    Added keys: ``numeric_columns``, ``categorical_columns``,
    ``data_insights`` (time-series flag, category/numeric flags, row count,
    strongly correlated column pairs) and ``recommended_charts``.
    The input dictionary itself is not modified.
    """
    numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
    categorical_cols = df.select_dtypes(exclude=['number']).columns.tolist()

    # A column counts as time-like only when its lower-cased name is exactly
    # one of these words.
    time_names = ['date', 'time', 'month', 'year']
    looks_like_time_series = any(col.lower() in time_names for col in df.columns)

    if len(numeric_cols) > 1:
        correlation_pairs = get_correlation_pairs(df, numeric_cols)
    else:
        correlation_pairs = []

    insights = {
        "has_time_series": looks_like_time_series,
        "has_categories": len(categorical_cols) > 0,
        "has_numeric": len(numeric_cols) > 0,
        "record_count": len(df),
        "correlation_pairs": correlation_pairs,
    }

    enhanced_ctx = ctx_dict.copy()
    enhanced_ctx.update({
        "numeric_columns": numeric_cols,
        "categorical_columns": categorical_cols,
        "data_insights": insights,
        "recommended_charts": recommend_chart_types(df, numeric_cols, categorical_cols),
    })
    return enhanced_ctx
def get_correlation_pairs(df: pd.DataFrame, numeric_cols: List[str]) -> List[Tuple[str, str, float]]:
    """List column pairs whose absolute correlation exceeds 0.5.

    Each entry is ``(first_column, second_column, correlation)``; pairs are
    visited in the order the columns appear in *numeric_cols*, each pair once.
    Fewer than two columns yields an empty list.
    """
    strong_pairs = []
    if len(numeric_cols) <= 1:
        return strong_pairs

    matrix = df[numeric_cols].corr()
    for position, first in enumerate(numeric_cols):
        for second in numeric_cols[position + 1:]:
            coefficient = matrix.loc[first, second]
            if abs(coefficient) > 0.5:
                strong_pairs.append((first, second, coefficient))
    return strong_pairs
def recommend_chart_types(df: pd.DataFrame, numeric_cols: List[str], categorical_cols: List[str]) -> Dict[str, str]:
    """Suggest chart types that fit the shape of *df*.

    Returns a mapping of chart type -> one-line description:

    * ``bar``     - at least one categorical and one numeric column
    * ``pie``     - as ``bar``, and the first categorical column has <= 6 distinct values
    * ``scatter`` - at least two numeric columns
    * ``line``    - some column name contains date/time/month/year AND a numeric column exists
    * ``hist``    - at least one numeric column

    Every rule that names a numeric column is guarded, so a dataset without
    numeric columns (e.g. only dates and labels) yields an empty mapping
    instead of raising ``IndexError``.
    """
    recommendations = {}
    has_numeric = len(numeric_cols) > 0
    has_categorical = len(categorical_cols) > 0

    if has_categorical and has_numeric:
        recommendations["bar"] = f"Compare {numeric_cols[0]} across {categorical_cols[0]}"
        if len(df[categorical_cols[0]].unique()) <= 6:
            recommendations["pie"] = f"Distribution of {numeric_cols[0]} by {categorical_cols[0]}"

    if len(numeric_cols) > 1:
        recommendations["scatter"] = f"Relationship between {numeric_cols[0]} and {numeric_cols[1]}"

    time_words = ('date', 'time', 'month', 'year')
    has_time_like_column = any(word in str(col).lower() for col in df.columns for word in time_words)
    if has_numeric and has_time_like_column:
        recommendations["line"] = f"Trend of {numeric_cols[0]} over time"

    if has_numeric:
        recommendations["hist"] = f"Distribution of {numeric_cols[0]}"
    return recommendations
def create_chart_generator(llm, df: pd.DataFrame) -> 'ChartGenerator':
    """Factory: build a ``ChartGenerator`` bound to *llm* and *df*."""
    generator = ChartGenerator(llm, df)
    return generator
class ChartGenerator:
    """Turns free-text chart requests into ``ChartSpecification`` objects.

    The language model is asked for a JSON specification; if anything goes
    wrong while calling the model or interpreting its answer, a keyword-based
    fallback specification is built from the dataset's column lists.
    """

    _VALID_CHART_TYPES = ("bar", "pie", "line", "scatter", "hist")
    # Spec fields for which the prompt allows a literal "null".
    _NULLABLE_FIELDS = ("y_col", "agg_method", "filter_condition", "top_n")
    _NULL_STRINGS = ("", "null", "none", "nan")

    def __init__(self, llm, df: pd.DataFrame):
        self.llm = llm
        self.df = df
        self.enhanced_ctx = enhance_data_context(
            df,
            {
                "columns": list(df.columns),
                "shape": df.shape,
                "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            },
        )

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return the content of the first Markdown code fence in *text*, or *text* stripped."""
        text = text.strip()
        fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.S | re.I)
        return fenced.group(1).strip() if fenced else text

    @classmethod
    def _normalize_spec_fields(cls, spec_dict: Dict) -> Dict:
        """Clean raw model output: null-like strings -> None, ``top_n`` -> positive int or None."""
        cleaned = dict(spec_dict)
        for field in cls._NULLABLE_FIELDS:
            value = cleaned.get(field)
            if isinstance(value, str) and value.strip().lower() in cls._NULL_STRINGS:
                cleaned[field] = None
        if cleaned.get("top_n") is not None:
            try:
                top_n = int(float(cleaned["top_n"]))
            except (TypeError, ValueError):
                top_n = None
            cleaned["top_n"] = top_n if top_n and top_n > 0 else None
        if isinstance(cleaned.get("chart_type"), str):
            cleaned["chart_type"] = cleaned["chart_type"].strip().lower()
        return cleaned

    def generate_chart_spec(self, description: str) -> "ChartSpecification":
        """Ask the model for a chart specification matching *description*.

        Never raises for model or parsing problems: those are logged and the
        fallback specification is returned instead.
        """
        logging.info(f"Generating chart spec for: '{description}'")
        try:
            # default=str keeps prompt building alive if the context ever
            # holds a value json cannot encode natively.
            dataset_info = json.dumps(self.enhanced_ctx, indent=2, default=str)
            spec_prompt = f"""
You are a data visualization expert. Based on the dataset and chart description, generate a precise chart specification.
**Dataset Info:** {dataset_info}
**Chart Request:** {description}
**Return a JSON specification with these exact fields:**
{{
"chart_type": "bar|pie|line|scatter|hist", "title": "Professional chart title", "x_col": "column_name_for_x_axis",
"y_col": "column_name_for_y_axis_or_null", "agg_method": "sum|mean|count|max|min|null", "filter_condition": "description_of_filtering_or_null",
"top_n": "number_for_top_n_filtering_or_null", "reasoning": "Why this specification was chosen"
}}
Return only the JSON specification, no additional text.
"""
            response = self.llm.invoke(spec_prompt).content.strip()
            logging.info(f"LLM response for spec: {response}")
            spec_dict = self._normalize_spec_fields(json.loads(self._strip_code_fence(response)))
            if spec_dict.get("chart_type") not in self._VALID_CHART_TYPES:
                raise ValueError(f"Unsupported chart type: {spec_dict.get('chart_type')!r}")
            # Keep only keys the ChartSpecification constructor accepts
            # (drops e.g. the model's "reasoning" field).
            valid_keys = [p.name for p in inspect.signature(ChartSpecification).parameters.values()]
            filtered_dict = {k: v for k, v in spec_dict.items() if k in valid_keys}
            logging.info(f"Successfully parsed spec: {filtered_dict}")
            return ChartSpecification(**filtered_dict)
        except Exception as e:
            logging.error(f"Spec generation failed: {e}. Using fallback.")
            return self._create_fallback_spec(description)

    def _create_fallback_spec(self, description: str) -> "ChartSpecification":
        """Build a specification from keywords in *description* and the column lists."""
        logging.warning("Using fallback spec generation.")
        numeric_cols = self.enhanced_ctx['numeric_columns']
        categorical_cols = self.enhanced_ctx['categorical_columns']
        wanted = description.lower()
        if "bar" in wanted and categorical_cols and numeric_cols:
            return ChartSpecification("bar", description, categorical_cols[0], numeric_cols[0])
        if "pie" in wanted and categorical_cols and numeric_cols:
            return ChartSpecification("pie", description, categorical_cols[0], numeric_cols[0])
        if "line" in wanted and len(numeric_cols) >= 1:
            # The title argument must be passed here as well; without it the
            # constructor call lacks its required y_col and raises TypeError.
            return ChartSpecification("line", description, self.df.columns[0], numeric_cols[0])
        if "scatter" in wanted and len(numeric_cols) >= 2:
            return ChartSpecification("scatter", description, numeric_cols[0], numeric_cols[1])
        if "hist" in wanted and numeric_cols:
            return ChartSpecification("hist", description, numeric_cols[0], None)
        return ChartSpecification(
            "bar",
            description,
            self.df.columns[0],
            self.df.columns[1] if len(self.df.columns) > 1 else None,
        )
def execute_chart_spec(spec: ChartSpecification, df: pd.DataFrame, output_path: Path) -> bool:
    """Render the chart described by *spec* as a static image at *output_path*.

    Returns ``True`` when the image was written, ``False`` on any failure
    (the failure is logged, never raised). The figure is closed on every
    path, so failed renders do not accumulate open figures, and an
    unsupported chart type is reported as a failure instead of saving an
    empty chart.
    """
    fig = None
    try:
        plot_data = prepare_plot_data(spec, df)
        plt.style.use('default')
        fig, ax = plt.subplots(figsize=(12, 8))
        if spec.chart_type == "bar":
            ax.bar(plot_data.index.astype(str), plot_data.values, color='#2E86AB', alpha=0.8)
            ax.set_xlabel(spec.x_col)
            # Without a y column the bars are value counts.
            ax.set_ylabel(spec.y_col or "Count")
            ax.tick_params(axis='x', rotation=45)
        elif spec.chart_type == "pie":
            ax.pie(plot_data.values, labels=plot_data.index, autopct='%1.1f%%', startangle=90)
            ax.axis('equal')
        elif spec.chart_type == "line":
            ax.plot(plot_data.index, plot_data.values, marker='o', linewidth=2, color='#A23B72')
            ax.set_xlabel(spec.x_col)
            ax.set_ylabel(spec.y_col)
            ax.grid(True, alpha=0.3)
        elif spec.chart_type == "scatter":
            ax.scatter(plot_data.iloc[:, 0], plot_data.iloc[:, 1], alpha=0.6, color='#F18F01')
            ax.set_xlabel(spec.x_col)
            ax.set_ylabel(spec.y_col)
            ax.grid(True, alpha=0.3)
        elif spec.chart_type == "hist":
            ax.hist(plot_data.values, bins=20, color='#C73E1D', alpha=0.7, edgecolor='black')
            ax.set_xlabel(spec.x_col)
            ax.set_ylabel('Frequency')
            ax.grid(True, alpha=0.3)
        else:
            raise ValueError(f"Unsupported chart type: {spec.chart_type!r}")
        ax.set_title(spec.title, fontsize=14, fontweight='bold', pad=20)
        fig.tight_layout()
        fig.savefig(output_path, dpi=300, bbox_inches='tight', facecolor='white')
        logging.info(f"Successfully created static chart '{spec.title}' at {output_path}")
        return True
    except Exception as e:
        logging.error(f"Static chart generation failed for '{spec.title}': {e}")
        return False
    finally:
        if fig is not None:
            plt.close(fig)
def prepare_plot_data(spec: "ChartSpecification", df: pd.DataFrame) -> pd.Series:
    """Select and aggregate the data needed to draw *spec* from *df*.

    Result by chart type:

    * ``bar`` / ``pie`` - top-N values of ``y_col`` aggregated per ``x_col``
      (or value counts of ``x_col`` when there is no ``y_col``)
    * ``line``    - ``y_col`` indexed by ``x_col``, sorted by index
    * ``scatter`` - two-column DataFrame ``[x_col, y_col]`` without missing rows
    * ``hist``    - ``x_col`` without missing values
    * otherwise   - ``x_col`` unchanged

    ``top_n`` may arrive as a string from the language model; it is coerced
    to a positive integer and defaults to 10 when absent or unusable.

    Raises:
        ValueError: if a named column is not in *df*, or if a line/scatter
            chart is requested without a ``y_col``.
    """
    if spec.x_col not in df.columns or (spec.y_col and spec.y_col not in df.columns):
        raise ValueError(f"Invalid columns in chart spec: {spec.x_col}, {spec.y_col}")

    try:
        top_n = int(float(spec.top_n)) if spec.top_n else 10
    except (TypeError, ValueError):
        top_n = 10
    if top_n <= 0:
        top_n = 10

    if spec.chart_type in ("bar", "pie"):
        if not spec.y_col:
            return df[spec.x_col].value_counts().nlargest(top_n)
        grouped = df.groupby(spec.x_col)[spec.y_col].agg(spec.agg_method or 'sum')
        return grouped.nlargest(top_n)

    if spec.chart_type in ("line", "scatter") and not spec.y_col:
        # Fail with a clear message instead of an obscure KeyError on None.
        raise ValueError(f"Chart type '{spec.chart_type}' requires a y column")

    if spec.chart_type == "line":
        return df.set_index(spec.x_col)[spec.y_col].sort_index()
    if spec.chart_type == "scatter":
        return df[[spec.x_col, spec.y_col]].dropna()
    if spec.chart_type == "hist":
        return df[spec.x_col].dropna()
    return df[spec.x_col]
# ─── FIXED ANIMATION SYSTEM ─────────────────────────────────────────────────
def animate_chart(spec: ChartSpecification, df: pd.DataFrame, dur: float, out: Path, fps: int = FPS) -> str:
    """Render an animated build-up of the chart in *spec* to the video file *out*.

    The animation has ``max(10, int(dur * fps))`` frames. Pie wedges and
    histogram bars fade in, bars grow from zero, scatter points and line
    segments are revealed progressively. Any chart type other than
    pie/bar/scatter/hist is drawn as a line.

    Returns the output path as a string. Exceptions from data preparation,
    matplotlib or the ffmpeg writer propagate to the caller; the figure is
    closed either way so failed renders do not leak figures.
    """
    logging.info(f"Animating chart '{spec.title}' ({spec.chart_type}) for {dur:.2f}s")
    plot_data = prepare_plot_data(spec, df)
    frames = max(10, int(dur * fps))  # always >= 10, so frames - 1 is never zero
    fig, ax = plt.subplots(figsize=(WIDTH / 100, HEIGHT / 100), dpi=100)
    try:
        plt.tight_layout(pad=3.0)
        ctype = spec.chart_type
        if ctype == "pie":
            wedges, _, _ = ax.pie(plot_data, labels=plot_data.index, startangle=90, autopct='%1.1f%%')
            ax.set_title(spec.title)
            ax.axis('equal')

            def init():
                for w in wedges:
                    w.set_alpha(0)
                return wedges

            def update(i):
                for w in wedges:
                    w.set_alpha(i / (frames - 1))
                return wedges
        elif ctype == "bar":
            bars = ax.bar(plot_data.index.astype(str), np.zeros_like(plot_data.values, dtype=float), color="#1f77b4")
            peak = plot_data.max()
            ax.set_ylim(0, peak * 1.1 if not pd.isna(peak) and peak > 0 else 1)
            ax.set_title(spec.title)
            plt.xticks(rotation=45, ha="right")

            def init():
                return bars

            def update(i):
                for b, h in zip(bars, plot_data.values):
                    b.set_height(h * (i / (frames - 1)))
                return bars
        elif ctype == "scatter":
            scat = ax.scatter([], [], alpha=0.7)
            x_full, y_full = plot_data.iloc[:, 0], plot_data.iloc[:, 1]
            ax.set_xlim(x_full.min(), x_full.max())
            ax.set_ylim(y_full.min(), y_full.max())
            ax.set_title(spec.title)
            ax.grid(alpha=.3)
            ax.set_xlabel(spec.x_col)
            ax.set_ylabel(spec.y_col)

            def init():
                scat.set_offsets(np.empty((0, 2)))
                return [scat]

            def update(i):
                k = max(1, int(len(x_full) * (i / (frames - 1))))
                scat.set_offsets(plot_data.iloc[:k].values)
                return [scat]
        elif ctype == "hist":
            _, _, patches = ax.hist(plot_data, bins=20, alpha=0)
            ax.set_title(spec.title)
            ax.set_xlabel(spec.x_col)
            ax.set_ylabel("Frequency")

            def init():
                for p in patches:
                    p.set_alpha(0)
                return patches

            def update(i):
                for p in patches:
                    p.set_alpha((i / (frames - 1)) * 0.7)
                return patches
        else:  # line
            line, = ax.plot([], [], lw=2)
            plot_data = plot_data.sort_index() if not plot_data.index.is_monotonic_increasing else plot_data
            x_full, y_full = plot_data.index, plot_data.values
            ax.set_xlim(x_full.min(), x_full.max())
            # Pad by a share of the data range. Scaling the bounds themselves
            # (min * 0.9, max * 1.1) cuts into the data when values are negative.
            y_low, y_high = y_full.min(), y_full.max()
            pad = (y_high - y_low) * 0.1 or max(abs(y_high) * 0.1, 1.0)
            ax.set_ylim(y_low - pad, y_high + pad)
            ax.set_title(spec.title)
            ax.grid(alpha=.3)
            ax.set_xlabel(spec.x_col)
            ax.set_ylabel(spec.y_col)

            def init():
                line.set_data([], [])
                return [line]

            def update(i):
                k = max(2, int(len(x_full) * (i / (frames - 1))))
                line.set_data(x_full[:k], y_full[:k])
                return [line]

        anim = FuncAnimation(fig, update, init_func=init, frames=frames, blit=True, interval=1000 / fps)
        anim.save(str(out), writer=FFMpegWriter(fps=fps, metadata={'artist': 'Sozo Studio'}), dpi=144)
    finally:
        plt.close(fig)
    logging.info(f"Successfully animated chart '{spec.title}'")
    return str(out)
def animate_image_fade(img: np.ndarray, dur: float, out: Path, fps: int = 24) -> str:
    """Write a fade-in-from-black video of *img* to *out* and return the path.

    The clip has ``max(1, int(dur * fps))`` frames at WIDTH x HEIGHT, encoded
    with the ``mp4v`` codec. A single-frame clip shows the image at full
    strength.
    """
    codec = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(str(out), codec, fps, (WIDTH, HEIGHT))
    frame_count = max(1, int(dur * fps))
    black = np.zeros_like(img)
    for index in range(frame_count):
        if frame_count > 1:
            alpha = index / (frame_count - 1)
        else:
            alpha = 1.0
        blended = cv2.addWeighted(img, alpha, black, 1 - alpha, 0)
        writer.write(blended)
    writer.release()
    return str(out)
def safe_chart(desc: str, df: pd.DataFrame, dur: float, out: Path) -> str:
    """Produce a video clip for the chart described by *desc*, degrading gracefully.

    Strategy, in order:

    1. Generate a chart specification and animate it.
    2. If that fails, render the same specification as a static image and
       fade it in (the specification is reused, so the model is not called a
       second time).
    3. If that fails too, fade in a generated/placeholder illustration.

    Returns the path of the written video as a string. The temporary PNG
    used by step 2 is always removed.
    """
    chart_spec = None
    try:
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", google_api_key=API_KEY, temperature=0.1)
        chart_generator = create_chart_generator(llm, df)
        chart_spec = chart_generator.generate_chart_spec(desc)
        return animate_chart(chart_spec, df, dur, out)
    except Exception as e:
        logging.error(f"Chart animation failed for '{desc}': {e}. Falling back to static image.")

    temp_png = out.with_suffix(".png")
    try:
        if chart_spec is not None and execute_chart_spec(chart_spec, df, temp_png):
            img = cv2.imread(str(temp_png))
            # imread signals an unreadable file by returning None, not by raising.
            if img is not None:
                img_resized = cv2.resize(img, (WIDTH, HEIGHT))
                return animate_image_fade(img_resized, dur, out)
    except Exception as e:
        logging.error(f"Static chart fallback failed for '{desc}': {e}")
    finally:
        temp_png.unlink(missing_ok=True)

    logging.error("Ultimate fallback triggered: generating placeholder image.")
    img = generate_image_from_prompt(f"A professional business chart showing {desc}")
    img_cv = cv2.cvtColor(np.array(img.resize((WIDTH, HEIGHT))), cv2.COLOR_RGB2BGR)
    return animate_image_fade(img_cv, dur, out)
def concat_media(file_paths: List[str], output_path: Path, media_type: str):
    """Join the media files in *file_paths* into *output_path* with ffmpeg.

    Only files that exist and are larger than 100 bytes are used.

    * No usable file: a one-second placeholder is written instead (a fade of
      the placeholder image when *media_type* is ``'video'``, otherwise silence).
    * One usable file: it is copied to *output_path*.
    * Several: they are stream-copied together via ffmpeg's concat demuxer;
      if ffmpeg fails, the first usable file is copied as a fallback.
    """
    import shutil

    logging.info(f"Concatenating {len(file_paths)} {media_type} files.")

    usable = []
    for candidate in file_paths:
        candidate_path = Path(candidate)
        if candidate_path.exists() and candidate_path.stat().st_size > 100:
            usable.append(candidate)

    if not usable:
        logging.error(f"Concatenation failed: No valid {media_type} files found.")
        fallback_dur = 1.0
        if media_type == 'video':
            blank = cv2.cvtColor(np.array(placeholder_img()), cv2.COLOR_RGB2BGR)
            animate_image_fade(blank, fallback_dur, output_path)
        else:
            generate_silence_mp3(fallback_dur, output_path)
        return

    if len(usable) == 1:
        logging.info("Only one valid file, copying directly.")
        shutil.copy2(usable[0], str(output_path))
        return

    # The concat demuxer reads its inputs from a text manifest.
    manifest = output_path.with_suffix(".txt")
    with open(manifest, 'w') as handle:
        handle.writelines(f"file '{Path(entry).resolve()}'\n" for entry in usable)

    ffmpeg_cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", str(manifest),
        "-c", "copy",
        str(output_path),
    ]
    try:
        subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True)
        logging.info(f"Successfully concatenated {media_type} files to {output_path}")
    except subprocess.CalledProcessError as e:
        logging.error(f"FFmpeg concatenation failed for {media_type}: {e.stderr}. Copying first valid file as fallback.")
        shutil.copy2(usable[0], str(output_path))
    finally:
        manifest.unlink(missing_ok=True)
# ─── REPORT & VIDEO WORKFLOWS (User's prompts and classes are UNCHANGED) ────
def generate_report_bundle(buf: bytes, name: str, ctx: str, key: str):
    """Build the report bundle (Markdown, chart images, PDF) for an uploaded dataset.

    Args:
        buf: Raw bytes of the uploaded file.
        name: Original file name (its extension selects the parser).
        ctx: Optional business context supplied by the user.
        key: Cache/identity key stored in the returned bundle.

    Returns:
        A dict with keys ``type``, ``key``, ``raw_md``, ``charts`` and ``pdf``,
        or ``None`` (after showing the error in the UI) when the file cannot
        be loaded.

    The dataset context is serialised with ``default=str`` so values json
    cannot encode natively (for example timestamps from Excel date columns)
    are sent as text instead of aborting the report with ``TypeError``.
    """
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", google_api_key=API_KEY, temperature=0.1)
    has_numeric = len(df.select_dtypes(include=["number"]).columns) > 0
    ctx_dict = {
        "shape": df.shape,
        "columns": list(df.columns),
        "user_ctx": ctx or "General business analysis",
        "full_dataframe": df.to_dict("records"),
        "data_types": {c: str(d) for c, d in df.dtypes.to_dict().items()},
        "missing_values": {c: int(v) for c, v in df.isnull().sum().to_dict().items()},
        "numeric_summary": (
            {c: {s: float(v) for s, v in stats.items()} for c, stats in df.describe().to_dict().items()}
            if has_numeric else {}
        ),
    }
    enhanced_ctx = enhance_data_context(df, ctx_dict)
    cols = ", ".join(enhanced_ctx["columns"][:6])
    recommended = enhanced_ctx.get('recommended_charts', {})
    context_json = json.dumps(enhanced_ctx, indent=2, default=str)
    recommended_json = json.dumps(recommended, indent=2, default=str)
    chart_suggestions = chr(10).join([f" - {chart_type}: {description}" for chart_type, description in recommended.items()])
    report_prompt = f"""
You are a senior data analyst and business intelligence expert. Analyze the provided dataset and write a comprehensive executive-level Markdown report.
**Dataset Analysis Context:** {context_json}
**Chart Recommendations Available:** {recommended_json}
**Instructions:**
1. **Identify Data Domain**: First, determine what type of data this represents (e.g., sales/revenue, healthcare/medical, HR/employee, financial, operational, customer, research, etc.) based on column names and sample data.
2. **Executive Summary**: Start with a high-level summary of key findings and business impact.
3. **Data Quality Assessment**: Comment on data completeness, any notable missing values, and data reliability.
4. **Key Insights**: You must provide exactly 5 key insights, each with its own chart tag.
5. **Strategic Recommendations**: Offer concrete, actionable recommendations based on the data.
6. **Visual Support**: When a visualization would enhance understanding, insert chart tags like: `<generate_chart: "chart_type | specific description">`
Valid chart types: bar, pie, line, scatter, hist. Base every chart on actual columns: {cols}
**IMPORTANT CHART SELECTION RULES:**
- bar: Use when comparing categories with numeric values (requires categorical + numeric columns)
- pie: Use for proportional breakdowns with few categories (<7) (requires categorical + numeric columns)
- line: Use for time series, trends, or sequential data (requires numeric columns, preferably with time/sequence)
- scatter: Use for correlation analysis between two numeric variables (requires 2+ numeric columns)
- hist: Use for distribution analysis of a single numeric variable (requires 1 numeric column)
**Data-Driven Chart Suggestions:**
{chart_suggestions}
7. **Format Requirements**:
- Use professional business language, include relevant metrics and percentages, structure with clear headers, and end with ## Next Steps section.
**Domain-Specific Focus Areas:**
- If sales data: focus on revenue trends, customer segments, product performance. If HR data: focus on workforce analytics, retention, performance metrics.
- If financial data: focus on profitability, cost analysis, financial health. If operational data: focus on efficiency, bottlenecks, process optimization.
- If customer data: focus on behavior patterns, satisfaction, churn analysis.
Generate insights that would be valuable to C-level executives and department heads. Ensure all charts use real data columns and appropriate chart types.
"""
    md = llm.invoke(report_prompt).content
    chart_descs = extract_chart_tags(md)[:MAX_CHARTS]
    chart_paths = {}
    chart_generator = create_chart_generator(llm, df)
    for desc in chart_descs:
        with st.spinner(f"Generating chart: {desc}..."):
            img_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4()}.png"
            try:
                chart_spec = chart_generator.generate_chart_spec(desc)
                if execute_chart_spec(chart_spec, df, img_path):
                    chart_paths[desc] = str(img_path)
            except Exception as e:
                # One failed chart must not abort the rest of the report.
                logging.error(f"Failed to generate static chart: {desc}, {e}")
    pdf_bytes = build_pdf(md, chart_paths)
    return {"type": "report", "key": key, "raw_md": md, "charts": chart_paths, "pdf": pdf_bytes}
def build_story_prompt(ctx_dict):
    """Compose the prompt asking the model for a VIDEO_SCENES-scene narrated script.

    *ctx_dict* must contain ``columns`` and may contain ``full_dataframe``
    (a list of row records) from which numeric/categorical columns,
    correlations and chart recommendations are derived.

    The context is serialised with ``default=str`` so values json cannot
    encode natively (for example timestamps from Excel date columns) are
    sent as text instead of raising ``TypeError``.
    """
    enhanced_ctx = enhance_data_context(pd.DataFrame(ctx_dict.get("full_dataframe", [])), ctx_dict)
    cols = ", ".join(enhanced_ctx["columns"][:6])
    context_json = json.dumps(enhanced_ctx, indent=2, default=str)
    recommended_json = json.dumps(enhanced_ctx.get('recommended_charts', {}), indent=2, default=str)
    return f"""
You are a professional business storyteller and data analyst. You must create a script with exactly {VIDEO_SCENES} scenes, each separated by '[SCENE_BREAK]'.
**Enhanced Dataset Context:** {context_json}
**Available Chart Types and Recommendations:** {recommended_json}
**Task Requirements:**
1. **Identify the Data Story**: Determine what business domain this data represents and what story it tells
2. **Create {VIDEO_SCENES} distinct scenes** that build a logical narrative arc
3. **Each scene must contain:** 1-2 sentences of clear, professional narration and exactly one chart tag: `<generate_chart: "chart_type | specific description">`
**ENHANCED Chart Guidelines:**
- Valid types: bar, pie, line, scatter, hist. Base all charts on actual columns: {cols}.
- **USE RECOMMENDED CHARTS**: {list(enhanced_ctx.get('recommended_charts', {}).keys())}
- Choose chart types that best tell the story and match the data.
**Data-Driven Chart Selection:**
- Numeric columns available: {enhanced_ctx.get('numeric_columns', [])}. Categorical columns available: {enhanced_ctx.get('categorical_columns', [])}.
- Correlation opportunities: {len(enhanced_ctx.get('data_insights', {}).get('correlation_pairs', []))} strong correlations found.
- Time series potential: {enhanced_ctx.get('data_insights', {}).get('has_time_series', False)}.
**Narrative Structure:** Scene 1: Set the context. Middle scenes: Develop insights. Final scene: Conclude with takeaways.
**Content Standards:** Use conversational, executive-level language. Include specific data insights. Avoid chart descriptions in narration. Focus on business impact.
**Output Format:** Separate each scene with exactly [SCENE_BREAK]
Create a compelling, data-driven story that executives would find engaging and actionable, using charts that actually make sense for the data structure.
"""
def generate_video(buf: bytes, name: str, ctx: str, key: str):
    """Create a narrated video for an uploaded dataset and return its path.

    Steps: check ffmpeg, load the data, ask the model for a scene script,
    then for each scene (at most VIDEO_SCENES) synthesize narration (or
    silence) and render a chart animation (or an illustration), concatenate
    the parts, and mux video with audio.

    Returns:
        Path of the final ``.mp4`` as a string, or ``None`` after reporting
        the problem in the UI when ffmpeg is missing, the file cannot be
        loaded, concatenation produced no output, or the final merge fails.

    All intermediate files are removed on every exit path, including when an
    exception propagates from scene rendering.
    """
    logging.info("Starting video generation process.")
    try:
        subprocess.run(["ffmpeg", "-version"], check=True, capture_output=True)
    except Exception:
        logging.error("FFmpeg not found!")
        st.error("π΄ FFmpeg not available β cannot render video.")
        return None
    df, err = load_dataframe_safely(buf, name)
    if err:
        st.error(err)
        return None
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", google_api_key=API_KEY, temperature=0.2)
    ctx_dict = {
        "shape": df.shape,
        "columns": list(df.columns),
        "user_ctx": ctx or "General business analysis",
        "full_dataframe": df.to_dict("records"),
    }
    script = llm.invoke(build_story_prompt(ctx_dict)).content
    scenes = [s.strip() for s in script.split("[SCENE_BREAK]") if s.strip()]

    tmp_dir = Path(tempfile.gettempdir())
    video_parts, audio_parts, temps = [], [], []
    silent_vid = tmp_dir / f"{uuid.uuid4()}_v.mp4"
    audio_mix = tmp_dir / f"{uuid.uuid4()}_a.mp3"
    final_vid = tmp_dir / f"{key}.mp4"
    try:
        for idx, sc in enumerate(scenes[:VIDEO_SCENES]):
            logging.info(f"--- Rendering Scene {idx + 1}/{VIDEO_SCENES} ---")
            descs, narrative = extract_chart_tags(sc), clean_narration(sc)
            logging.info(f"Narrative: '{narrative}'")

            audio_bytes, _ = deepgram_tts(narrative)
            mp3 = tmp_dir / f"{uuid.uuid4()}.mp3"
            # Register before writing so the file is cleaned up even if a later step raises.
            temps.append(mp3)
            if audio_bytes:
                mp3.write_bytes(audio_bytes)
                dur = audio_duration(str(mp3))
                if dur <= 0.1:
                    logging.warning("Audio duration is near zero, defaulting to 5s.")
                    dur = 5.0
                logging.info(f"Generated TTS audio with duration: {dur:.2f}s")
            else:
                dur = 5.0
                generate_silence_mp3(dur, mp3)
                logging.warning(f"TTS failed or was skipped. Generated silent audio: {dur:.2f}s")
            audio_parts.append(str(mp3))

            mp4 = tmp_dir / f"{uuid.uuid4()}.mp4"
            temps.append(mp4)
            if descs:
                logging.info(f"Chart requested: '{descs[0]}'")
                safe_chart(descs[0], df, dur, mp4)
            else:
                logging.info("No chart requested, generating image from prompt.")
                img = generate_image_from_prompt(narrative)
                img_cv = cv2.cvtColor(np.array(img.resize((WIDTH, HEIGHT))), cv2.COLOR_RGB2BGR)
                animate_image_fade(img_cv, dur, mp4)
            video_parts.append(str(mp4))

        concat_media(video_parts, silent_vid, "video")
        concat_media(audio_parts, audio_mix, "audio")
        if not (silent_vid.exists() and audio_mix.exists()):
            st.error("Media concatenation failed. Cannot create final video.")
            return None

        logging.info("Merging final video and audio streams.")
        try:
            subprocess.run(
                ["ffmpeg", "-y", "-i", str(silent_vid), "-i", str(audio_mix),
                 "-c:v", "libx264", "-pix_fmt", "yuv420p", "-c:a", "aac",
                 "-map", "0:v:0", "-map", "1:a:0", "-shortest", str(final_vid)],
                check=True, capture_output=True,
            )
        except subprocess.CalledProcessError as e:
            # Report in the UI instead of crashing the whole app run.
            logging.error(f"Final video merge failed: {e.stderr}")
            st.error("Final video merge failed. Cannot create final video.")
            return None
        logging.info(f"Video generation complete: {final_vid}")
        return str(final_vid)
    finally:
        for p in temps + [silent_vid, audio_mix]:
            p.unlink(missing_ok=True)
# ─── UI & MAIN WORKFLOW (Unchanged) ─────────────────────────────────────────
# Input widgets: output format, data file, optional free-text context.
mode = st.radio("Select Output Format:", ["Report (PDF)", "Video Narrative"], horizontal=True)
upl = st.file_uploader("Upload CSV or Excel", type=["csv", "xlsx", "xls"])
if upl:
    df_prev, load_err = load_dataframe_safely(upl.getvalue(), upl.name)
    if df_prev is None:
        # The loader reports failures as (None, message); show the message
        # instead of crashing on None.head().
        st.error(f"Could not read the uploaded file: {load_err}")
    else:
        with st.expander("π Data Preview"):
            st.dataframe(arrow_df(df_prev.head()))
ctx = st.text_area("Business context or specific instructions (optional)")

if st.button("π Generate", type="primary", disabled=not upl):
    # The key identifies this exact combination of file, mode and context.
    key = sha1_bytes(b"".join([upl.getvalue(), mode.encode(), ctx.encode()]))
    st.session_state.bundle = None
    if mode == "Report (PDF)":
        with st.spinner("Generating full report and charts... Please wait."):
            bundle = generate_report_bundle(upl.getvalue(), upl.name, ctx, key)
        st.session_state.bundle = bundle
    else:
        bundle_path = generate_video(upl.getvalue(), upl.name, ctx, key)
        if bundle_path:
            st.session_state.bundle = {"type": "video", "video_path": bundle_path, "key": key}
    # Rerun so the result section renders from session state.
    st.rerun()
# Render whatever bundle the last generation left in session state.
if (bundle := st.session_state.get("bundle")):
    bundle_type = bundle.get("type")
    if bundle_type == "report":
        st.subheader("π Generated Report")
        with st.expander("View Report", expanded=True):
            report_md = bundle["raw_md"]
            charts = bundle["charts"]
            # Walk the Markdown tag by tag: text before each tag, then its chart.
            cursor = 0
            for tag in TAG_RE.finditer(report_md):
                st.markdown(report_md[cursor:tag.start()])
                desc = tag.group("d").strip()
                chart_path = charts.get(desc)
                if chart_path and Path(chart_path).exists():
                    st.image(chart_path)
                else:
                    st.warning(f"Could not render chart: '{desc}'")
                cursor = tag.end()
            st.markdown(report_md[cursor:])

        c1, c2 = st.columns(2)
        if bundle.get("pdf"):
            c1.download_button(
                "Download PDF",
                bundle["pdf"],
                f"report_{bundle['key'][:8]}.pdf",
                "application/pdf",
                use_container_width=True,
            )
        # The narration button is only offered when a Deepgram key is configured.
        if DG_KEY and c2.button("π Narrate Summary", key=f"aud_{bundle['key']}"):
            audio, mime = deepgram_tts(re.sub(r"<[^>]+>", "", bundle["raw_md"]))
            if audio:
                st.audio(audio, format=mime)
            else:
                st.error("Narration failed.")
    elif bundle_type == "video":
        st.subheader("π¬ Generated Video Narrative")
        vp = bundle.get("video_path")
        if vp and Path(vp).exists():
            with open(vp, "rb") as f:
                st.video(f.read())
            with open(vp, "rb") as f:
                st.download_button("Download Video", f, f"narrative_{bundle['key'][:8]}.mp4", "video/mp4")
        else:
            st.error("Video file missing β generation may have failed.")