Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from google import genai | |
| from google.genai import types | |
| import json | |
| import streamlit.components.v1 as components | |
| from datetime import datetime, date | |
| import io | |
| import base64 | |
| from typing import Dict, List, Any, Optional | |
| # ------------------------------ | |
| # Configuration & Constants | |
| # ------------------------------ | |
| APP_TITLE = "Enterprise AI BI Dashboard" | |
| APP_ICON = "π" | |
| # Model Configuration Strategy | |
| # We define the specific requested models here. | |
| # NOTE: Ensure your Google Cloud Project has access to these specific Model IDs. | |
| AI_CONFIG = { | |
| "analyst_model": "gemini-3.0-flash-preview", # The heavy lifter for reasoning | |
| "dashboard_model": "gemini-nano-banana-pro", # The specialist for HTML/Code generation | |
| "fallback_model": "gemini-2.0-flash-exp" # Fallback if specific previews aren't active | |
| } | |
| # ------------------------------ | |
| # Service Layer: Utilities | |
| # ------------------------------ | |
| class CustomJSONEncoder(json.JSONEncoder): | |
| """Robust JSON Encoder for Dataframes and NumPy types.""" | |
| def default(self, obj): | |
| if isinstance(obj, (datetime, date, pd.Timestamp)): | |
| return obj.isoformat() | |
| if isinstance(obj, np.integer): | |
| return int(obj) | |
| if isinstance(obj, np.floating): | |
| return float(obj) | |
| if isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| if pd.isna(obj): | |
| return None | |
| return super().default(obj) | |
| def clean_ai_response(text: str) -> str: | |
| """Cleans Markdown code blocks from AI responses.""" | |
| text = text.strip() | |
| if text.startswith("```"): | |
| # Find the first newline to skip the language tag (e.g. ```json) | |
| newline_index = text.find("\n") | |
| if newline_index != -1: | |
| text = text[newline_index+1:] | |
| # Remove the closing ``` | |
| if text.endswith("```"): | |
| text = text[:-3] | |
| return text.strip() | |
| # ------------------------------ | |
| # Service Layer: AI Handler | |
| # ------------------------------ | |
| class AIService: | |
| def __init__(self, api_key: str): | |
| self.client = genai.Client(api_key=api_key) | |
| def _generate(self, model_id: str, prompt: str) -> str: | |
| """Wrapper to handle generation with fallback logic.""" | |
| try: | |
| response = self.client.models.generate_content( | |
| model=model_id, | |
| contents=[prompt] | |
| ) | |
| return response.text | |
| except Exception as e: | |
| # If the specific preview model fails (404/Permission), try fallback | |
| if "404" in str(e) or "not found" in str(e).lower(): | |
| st.warning(f"β οΈ Model '{model_id}' not found. Falling back to '{AI_CONFIG['fallback_model']}'.") | |
| response = self.client.models.generate_content( | |
| model=AI_CONFIG['fallback_model'], | |
| contents=[prompt] | |
| ) | |
| return response.text | |
| raise e | |
| def analyze_dataset(self, schema: Dict) -> Dict: | |
| """Uses Gemini 3.0 Pro to analyze data structure and suggest charts.""" | |
| prompt = f""" | |
| You are a Principal Data Architect. Analyze this dataset schema: | |
| {json.dumps(schema, indent=2, cls=CustomJSONEncoder)} | |
| Task: | |
| 1. Identify the industry/domain. | |
| 2. Determine if this is company-specific data. | |
| 3. Create a visualization plan with 4-6 specific charts. | |
| 4. Generate 3 C-level executive insights. | |
| Return ONLY raw JSON: | |
| {{ | |
| "domain": "string", | |
| "is_company_data": boolean, | |
| "charts": [ | |
| {{"type": "bar|line|scatter|pie|histogram", "x": "col", "y": "col", "title": "string"}} | |
| ], | |
| "insights": ["string"] | |
| }} | |
| """ | |
| response_text = self._generate(AI_CONFIG['analyst_model'], prompt) | |
| return json.loads(clean_ai_response(response_text)) | |
| def generate_dashboard_html(self, context: Dict) -> str: | |
| """Uses Gemini Nano Banana Pro to generate high-performance HTML.""" | |
| prompt = f""" | |
| You are an Expert Frontend Engineer specialized in BI Dashboards. | |
| CONTEXT: | |
| Title: {context['title']} | |
| Domain: {context['domain']} | |
| Stats: {json.dumps(context['stats'], cls=CustomJSONEncoder)} | |
| Sample: {json.dumps(context['sample'], cls=CustomJSONEncoder)} | |
| REQUIREMENTS: | |
| 1. Create a single-file, responsive HTML dashboard. | |
| 2. Use **Chart.js** via CDN. | |
| 3. Style with a modern, glassmorphism dark theme suitable for {context['domain']}. | |
| 4. Include a 'Key Metrics' row at the top (Cards). | |
| 5. Include a grid of interactive charts. | |
| 6. Handle missing data gracefully in JavaScript. | |
| Return ONLY valid HTML code. | |
| """ | |
| return clean_ai_response(self._generate(AI_CONFIG['dashboard_model'], prompt)) | |
| def generate_presentation(self, context: Dict) -> str: | |
| """Uses Gemini 3.0 Pro to generate a strategic slide deck.""" | |
| prompt = f""" | |
| Create a Reveal.js (HTML) presentation for this dataset. | |
| Title: {context['title']} | |
| Insights: {json.dumps(context['insights'])} | |
| Create 5 slides: Title, Objectives, Data Analysis, Strategic Insights, Conclusion. | |
| Use a professional gradient theme. | |
| Return ONLY valid HTML. | |
| """ | |
| return clean_ai_response(self._generate(AI_CONFIG['analyst_model'], prompt)) | |
| # ------------------------------ | |
| # UI Configuration | |
| # ------------------------------ | |
| st.set_page_config(page_title=APP_TITLE, page_icon=APP_ICON, layout="wide") | |
| st.markdown(""" | |
| <style> | |
| .stApp { background-color: #0e1117; color: #fafafa; } | |
| .stButton>button { border-radius: 8px; font-weight: bold; } | |
| div[data-testid="stMetricValue"] { font-size: 24px; color: #4db8ff; } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # ------------------------------ | |
| # Main Application Logic | |
| # ------------------------------ | |
| def main(): | |
| # --- Sidebar --- | |
| with st.sidebar: | |
| st.header(f"{APP_ICON} Configuration") | |
| api_key = st.text_input("π Google Gemini API Key", type="password") | |
| st.divider() | |
| st.caption("Active Models:") | |
| st.code(f"Analyst: {AI_CONFIG['analyst_model']}\nDashboard: {AI_CONFIG['dashboard_model']}") | |
| st.info("Ensure your API key has access to the Preview models, otherwise fallback will be used.") | |
| if not api_key: | |
| st.warning("β οΈ Please enter your API Key to initialize the AI Engine.") | |
| st.stop() | |
| # Initialize Service | |
| try: | |
| ai_service = AIService(api_key) | |
| except Exception as e: | |
| st.error(f"Failed to initialize AI Client: {e}") | |
| st.stop() | |
| # --- Main Content --- | |
| st.title(f"{APP_TITLE}") | |
| uploaded_file = st.file_uploader("π Upload Data (CSV/Excel)", type=["csv", "xlsx"]) | |
| if uploaded_file: | |
| # Load Data | |
| try: | |
| if uploaded_file.name.endswith('.csv'): | |
| df = pd.read_csv(uploaded_file) | |
| else: | |
| df = pd.read_excel(uploaded_file) | |
| # Basic cleanup | |
| df.columns = [c.strip() for c in df.columns] | |
| # --- Data Overview --- | |
| st.divider() | |
| c1, c2, c3, c4 = st.columns(4) | |
| c1.metric("Rows", df.shape[0]) | |
| c2.metric("Columns", df.shape[1]) | |
| c3.metric("Numeric Fields", len(df.select_dtypes(include=np.number).columns)) | |
| c4.metric("Categorical Fields", len(df.select_dtypes(exclude=np.number).columns)) | |
| with st.expander("π View Raw Data & Quality Checks"): | |
| st.dataframe(df.head()) | |
| st.write(df.describe()) | |
| # --- AI Operations --- | |
| st.divider() | |
| st.subheader("π€ AI Intelligence Operations") | |
| col_ops1, col_ops2, col_ops3 = st.columns(3) | |
| # Prepare Schema for AI (Lightweight) | |
| sample_data = df.head(3).copy() | |
| # Convert timestamps to string for JSON serialization | |
| for col in sample_data.columns: | |
| if pd.api.types.is_datetime64_any_dtype(sample_data[col]): | |
| sample_data[col] = sample_data[col].astype(str) | |
| schema = { | |
| "columns": {col: str(df[col].dtype) for col in df.columns}, | |
| "sample": sample_data.to_dict(orient='records'), | |
| "numeric_columns": df.select_dtypes(include=np.number).columns.tolist() | |
| } | |
| # 1. ANALYZE DATA | |
| if col_ops1.button("π Analyze & Visualize", type="primary", use_container_width=True): | |
| with st.spinner(f"Reasoning with {AI_CONFIG['analyst_model']}..."): | |
| try: | |
| analysis = ai_service.analyze_dataset(schema) | |
| st.session_state['analysis'] = analysis | |
| st.session_state['df_context'] = df # Store for plotting | |
| except Exception as e: | |
| st.error(f"Analysis failed: {e}") | |
| # 2. GENERATE DASHBOARD | |
| if col_ops2.button("π¨ Create HTML Dashboard", use_container_width=True): | |
| if 'analysis' not in st.session_state: | |
| st.warning("Please run 'Analyze' first to determine the domain.") | |
| else: | |
| with st.spinner(f"Coding with {AI_CONFIG['dashboard_model']}..."): | |
| try: | |
| # Prepare context | |
| stats = df.describe().to_dict() | |
| context = { | |
| "title": uploaded_file.name, | |
| "domain": st.session_state['analysis'].get('domain', 'General'), | |
| "stats": stats, | |
| "sample": df.head(15).to_dict(orient='records') # larger sample for dashboard | |
| } | |
| html_code = ai_service.generate_dashboard_html(context) | |
| st.session_state['html_dashboard'] = html_code | |
| except Exception as e: | |
| st.error(f"Dashboard generation failed: {e}") | |
| # 3. GENERATE SLIDES | |
| if col_ops3.button("π€ Generate Presentation", use_container_width=True): | |
| if 'analysis' not in st.session_state: | |
| st.warning("Please run 'Analyze' first.") | |
| else: | |
| with st.spinner("Drafting slides..."): | |
| context = { | |
| "title": uploaded_file.name, | |
| "insights": st.session_state['analysis'].get('insights', []) | |
| } | |
| ppt_html = ai_service.generate_presentation(context) | |
| st.session_state['ppt_html'] = ppt_html | |
| # --- Display Results --- | |
| # Result 1: Static Charts (Collage) | |
| if 'analysis' in st.session_state and 'df_context' in st.session_state: | |
| st.divider() | |
| st.subheader(f"π Strategic Analysis ({st.session_state['analysis']['domain']})") | |
| # Display Insights | |
| for i, insight in enumerate(st.session_state['analysis']['insights']): | |
| st.success(f"**Insight {i+1}:** {insight}") | |
| # Plotting logic | |
| charts = st.session_state['analysis']['charts'] | |
| fig = plt.figure(figsize=(18, 5 * ((len(charts)+2)//3))) | |
| for idx, chart in enumerate(charts, 1): | |
| ax = fig.add_subplot(((len(charts)+2)//3), 3, idx) | |
| try: | |
| c_type = chart['type'] | |
| x_col = chart.get('x') | |
| y_col = chart.get('y') | |
| if c_type == 'bar' and x_col and y_col: | |
| # Aggregate for bar charts to avoid clutter | |
| data_agg = df.groupby(x_col)[y_col].sum().nlargest(10) | |
| sns.barplot(x=data_agg.values, y=data_agg.index, ax=ax, palette="viridis") | |
| ax.set_title(chart['title']) | |
| elif c_type == 'scatter' and x_col and y_col: | |
| sns.scatterplot(data=df, x=x_col, y=y_col, ax=ax, alpha=0.6) | |
| ax.set_title(chart['title']) | |
| elif c_type == 'line' and x_col and y_col: | |
| # Sort for line charts | |
| temp_df = df.sort_values(x_col) | |
| sns.lineplot(data=temp_df, x=x_col, y=y_col, ax=ax) | |
| ax.set_title(chart['title']) | |
| elif c_type == 'histogram' and x_col: | |
| sns.histplot(df[x_col], kde=True, ax=ax) | |
| ax.set_title(chart['title']) | |
| # Cleanup axes | |
| ax.tick_params(axis='x', rotation=45) | |
| except Exception as e: | |
| ax.text(0.5, 0.5, "Could not render chart", ha='center') | |
| plt.tight_layout() | |
| st.pyplot(fig) | |
| # Result 2: HTML Dashboard | |
| if 'html_dashboard' in st.session_state: | |
| st.divider() | |
| st.subheader("π₯οΈ Interactive Dashboard (Banana Pro Generated)") | |
| components.html(st.session_state['html_dashboard'], height=800, scrolling=True) | |
| st.download_button("π₯ Download HTML", st.session_state['html_dashboard'], "dashboard.html", "text/html") | |
| # Result 3: Presentation | |
| if 'ppt_html' in st.session_state: | |
| st.divider() | |
| st.subheader("π½οΈ Executive Presentation") | |
| components.html(st.session_state['ppt_html'], height=600) | |
| st.download_button("π₯ Download Slides", st.session_state['ppt_html'], "presentation.html", "text/html") | |
| except Exception as e: | |
| st.error(f"Error processing file: {e}") | |
| if __name__ == "__main__": | |
| main() |