# NOTE(review): the three lines above this app's imports ("Spaces: Sleeping Sleeping")
# were Hugging Face Spaces page-header residue from scraping, not part of the program;
# replaced with this comment so the file parses.
# Standard library
import base64
import io
import json
import os
import re
import tempfile

# Third-party
import google.generativeai as genai
import markdown2
import pandas as pd
import pandasai as PandasAI
import plotly.express as px
import plotly.graph_objects as go  # render_chat_message checks isinstance(..., go.Figure)
import streamlit as st
from langchain_google_genai import ChatGoogleGenerativeAI
from markdown_pdf import MarkdownPdf, Section
from pandasai import SmartDatalake, SmartDataframe
from pandasai.llm import GoogleGemini
from pandasai.responses.response_parser import ResponseParser
from PIL import Image
#from fpdf import FPDF
# Configure Gemini API
# The key must come from the environment; the app refuses to start without it.
gemini_api_key = os.environ.get('GOOGLE_API_KEY')
if not gemini_api_key:
    st.error("GOOGLE_API_KEY environment variable not set.")
    st.stop()
genai.configure(api_key=gemini_api_key)

# Generation settings for the report model: low temperature for
# more deterministic report text, capped output length.
generation_config = {
    "temperature": 0.2,
    "top_p": 0.95,
    "max_output_tokens": 5000,
}

# Model used directly (genai SDK) by the Reports tab to write the report text.
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash-thinking-exp",
    generation_config=generation_config,
)

# Pandasai gemini
# LangChain wrapper around the same Gemini model; handed to PandasAI's
# SmartDatalake for the Chat tab (temperature 0 for reproducible answers).
llm1 = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-thinking-exp",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2
)
def load_data():
    """Read the three source CSVs and return them keyed by name.

    Returns:
        dict with 'events', 'customers' and 'products' DataFrames, or
        None when a file cannot be read or any frame parsed to empty.
    """
    try:
        frames = {
            'events': pd.read_csv("Delta-Events.csv"),
            'customers': pd.read_csv("delta_customers.csv"),
            'products': pd.read_csv("Customer_Products.csv"),
        }
        # Reject the whole batch if any file produced an empty frame.
        if any(df.empty for df in frames.values()):
            st.error("One or more data files are empty.")
            return None
        return frames
    except Exception as e:
        # Surface the failure in the UI rather than crashing the app.
        st.error(f"Error loading data: {e}")
        return None
| #Dashboard | |
def create_dashboard(data):
    """Render the dashboard tab: suburb orders, event mix, top products."""
    st.header("Business Insights Dashboard")

    # Stack order values from both sources under one 'Suburb' column.
    # NOTE: the two CSVs spell the suburb column differently
    # ('Surbub' in events, 'Surburb' in customers) — normalized here.
    events_part = data['events'][['Surbub', 'Order Value $']].rename(
        columns={'Surbub': 'Suburb'})
    customers_part = data['customers'][['Surburb', 'Order_Value']].rename(
        columns={'Surburb': 'Suburb', 'Order_Value': 'Order Value $'})
    merged_orders = pd.concat([events_part, customers_part])

    with st.container():
        col1, col2 = st.columns(2)
        with col1:
            # Total order value per suburb, both sources combined.
            suburb_orders = (merged_orders
                             .groupby('Suburb')['Order Value $']
                             .sum()
                             .reset_index())
            bar = px.bar(suburb_orders, x='Suburb', y='Order Value $',
                         title='Total Order Value by Suburb')
            st.plotly_chart(bar, use_container_width=True)
        with col2:
            # Share of order value per event type.
            event_counts = (data['events']
                            .groupby('Event')['Order Value $']
                            .sum()
                            .reset_index())
            event_counts.columns = ['Event', 'Order Value $']  # Rename columns explicitly
            pie = px.pie(event_counts, names='Event', values='Order Value $',
                         title='Event Type Distribution By Order Value')
            st.plotly_chart(pie, use_container_width=True)

    with st.container():
        st.subheader("Product Performance")
        # Ten best-selling products by summed quantity.
        product_sales = (data['products']
                         .groupby('Product')['Quantity']
                         .sum()
                         .nlargest(10)
                         .reset_index())
        top_bar = px.bar(product_sales, x='Product', y='Quantity',
                         title='Top 10 Products by Quantity Sold')
        st.plotly_chart(top_bar, use_container_width=True)
| # --- Chat Tab Functions --- | |
class StreamLitResponse(ResponseParser):
    """PandasAI response parser that tags every result with a 'type'
    ('dataframe' / 'plot' / 'text') so the chat renderer can dispatch."""

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Wrap a DataFrame result with its type identifier."""
        return {'type': 'dataframe', 'value': result['value']}

    def format_plot(self, result):
        """Normalize a plot result (PIL image, raw bytes, or file path)
        into a base64-encoded PNG string for uniform storage."""
        try:
            image = result['value']
            if isinstance(image, Image.Image):
                # In-memory PIL image: serialize to PNG bytes first.
                buffer = io.BytesIO()
                image.save(buffer, format="PNG")
                raw = buffer.getvalue()
            elif isinstance(image, bytes):
                raw = image
            elif isinstance(image, str) and os.path.exists(image):
                # Path on disk: read the image file's bytes.
                with open(image, "rb") as f:
                    raw = f.read()
            else:
                return {'type': 'text', 'value': "Unsupported image format"}
            encoded = base64.b64encode(raw).decode('utf-8')
            return {'type': 'plot', 'value': encoded}
        except Exception as e:
            # Degrade to a text message rather than breaking the chat.
            return {'type': 'text', 'value': f"Error processing plot: {e}"}

    def format_other(self, result):
        """Fall back to the stringified value for any other result type."""
        return {'type': 'text', 'value': str(result['value'])}
def generateResponse(prompt, data):
    """Run a natural-language question over all loaded DataFrames via
    PandasAI's SmartDatalake; returns the parsed response or None."""
    # Guard: data must be a dict whose values are all DataFrames.
    data_ok = isinstance(data, dict) and all(
        isinstance(df, pd.DataFrame) for df in data.values()
    )
    if not data_ok:
        st.error("Invalid data format. Expected a dictionary of DataFrames.")
        return None
    lake = SmartDatalake(
        list(data.values()),  # Pass list of DataFrames
        config={
            "llm": llm1,
            "response_parser": StreamLitResponse,
        },
    )
    return lake.chat(prompt)
def render_chat_message(message):
    """Render one chat-history entry in the current chat bubble.

    A message dict may carry a 'dataframe', a 'plot' (base64 PNG string
    produced by StreamLitResponse, a PIL image, a plotly Figure, or raw
    PNG bytes) and/or markdown 'content' — the text is rendered last.
    """
    if "dataframe" in message:
        st.dataframe(message["dataframe"])
    elif "plot" in message:
        try:
            plot_data = message["plot"]
            if isinstance(plot_data, str):
                # Base64-encoded PNG stored by StreamLitResponse.format_plot.
                st.image(f"data:image/png;base64,{plot_data}")
            elif isinstance(plot_data, Image.Image):
                st.image(plot_data)
            elif isinstance(plot_data, go.Figure):
                # BUGFIX: the original referenced `go` without importing
                # plotly.graph_objects, so any Figure raised NameError;
                # `go` is now imported at module level.
                st.plotly_chart(plot_data)
            elif isinstance(plot_data, bytes):
                st.image(Image.open(io.BytesIO(plot_data)))
            else:
                st.write("Unsupported plot format")
        except Exception as e:
            st.error(f"Error rendering plot: {e}")
    if "content" in message:
        st.markdown(message["content"])
def handle_userinput(question, data):
    """Append the user's question and the AI's answer to the chat history.

    The assistant reply is shaped according to the response parser's
    'type' tag (dataframe / plot / text); errors surface via st.error.
    """
    try:
        # Guard clause: refuse to run against missing or empty data.
        if not data or any(df.empty for df in data.values()):
            st.error("No valid data available for analysis.")
            return

        st.session_state.chat_history.append({
            "role": "user",
            "content": question
        })

        result = generateResponse(question, data)

        # Build the assistant entry based on the response shape.
        if isinstance(result, dict):
            response_type = result.get('type', 'text')
            response_value = result.get('value')
            if response_type == 'dataframe':
                reply = {
                    "role": "assistant",
                    "content": "Here's the table:",
                    "dataframe": response_value
                }
            elif response_type == 'plot':
                reply = {
                    "role": "assistant",
                    "content": "Here's the chart:",
                    "plot": response_value
                }
            else:
                reply = {"role": "assistant", "content": str(response_value)}
        else:
            reply = {"role": "assistant", "content": str(result)}

        st.session_state.chat_history.append(reply)
    except Exception as e:
        st.error(f"Error processing input: {e}")
def main():
    """App entry point: page config, session state, and the three tabs
    (Dashboard, Chat, Reports) plus the data-management sidebar."""
    st.set_page_config(page_title="Business Analytics Suite", page_icon="📊", layout="wide")

    # Initialize session state (survives Streamlit reruns).
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "data" not in st.session_state:
        st.session_state.data = load_data()

    # Create tabs
    tab_dashboard, tab_chat, tab_reports = st.tabs(["📊 Dashboard", "💬 Chat", "📈 Reports"])

    # Dashboard Tab
    with tab_dashboard:
        if st.session_state.data:
            create_dashboard(st.session_state.data)
        else:
            st.error("Failed to load data for dashboard")

    # Chat Tab
    with tab_chat:
        st.title("AI Data Analyst")
        chat_container = st.container()
        # First pass: draw the existing history.
        with chat_container:
            for message in st.session_state.chat_history:
                with st.chat_message(message["role"]):
                    render_chat_message(message)
        user_question = st.chat_input("Ask a question about your data:")
        if user_question:
            handle_userinput(user_question, st.session_state.data)
            # Redraw the container so the new Q/A pair appears in this
            # same run without waiting for the next rerun.
            # NOTE(review): .empty() on a container already populated via
            # `with` may not clear prior widgets — verify no duplicate
            # rendering in the live app.
            chat_container.empty()
            with chat_container:
                for message in st.session_state.chat_history:
                    with st.chat_message(message["role"]):
                        render_chat_message(message)

    # Reports Tab
    with tab_reports:
        st.title("Custom Reports")
        if st.session_state.data:
            # Suburb Filter — union of suburbs from both sources
            # (column is spelled 'Surbub' in events, 'Surburb' in customers).
            suburbs = pd.concat([
                st.session_state.data['events']['Surbub'],
                st.session_state.data['customers']['Surburb']
            ]).unique()
            selected_suburbs = st.multiselect("Select Suburbs", suburbs)
            if st.button("Generate Report"):
                with st.spinner("Analyzing data..."):
                    # Prepare filtered data: restrict events/customers to the
                    # chosen suburbs; with no selection, use everything.
                    filtered_data = {
                        'events': st.session_state.data['events'][
                            st.session_state.data['events']['Surbub'].isin(selected_suburbs)
                        ] if selected_suburbs else st.session_state.data['events'],
                        'customers': st.session_state.data['customers'][
                            st.session_state.data['customers']['Surburb'].isin(selected_suburbs)
                        ] if selected_suburbs else st.session_state.data['customers'],
                        'products': st.session_state.data['products']
                    }
                    # Convert to JSON so the frames can be embedded in the prompt.
                    json_data = {k: v.to_json(orient='records') for k, v in filtered_data.items()}
                    # Generate report via the Gemini model configured above.
                    prompt = f"""
Analyze this business data and generate a comprehensive report in plain text format. Use markdown for headings and structure. Do not include any json.
Data:
{json.dumps(json_data, indent=2)}
No introductory quips or salutations or follow up questions, just write the report.
"""
                    response = model.generate_content(prompt)
                    report = response.text
                    html_text = markdown2.markdown(report)
                    # PDF Generation and display
                    try:
                        # Create a temporary file to store the PDF
                        # (delete=False so it can be reopened after closing).
                        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                            pdf = MarkdownPdf()
                            pdf.meta["title"] = 'Suburb Business Report'
                            pdf.add_section(Section(report, toc=False))
                            pdf.save(tmp_file.name)  # Save the PDF to the temporary file
                        # Read the PDF bytes from the temporary file
                        with open(tmp_file.name, "rb") as f:
                            pdf_bytes = f.read()
                        # Provide the PDF for download
                        st.download_button(
                            label="Download Report as PDF",
                            data=pdf_bytes,
                            file_name="report.pdf",
                            mime="application/pdf"
                        )
                        st.write(html_text, unsafe_allow_html=True)  # Display the report below the download button
                    except Exception as e:
                        # Fall back to showing the HTML report even if the PDF failed.
                        st.error(f"Error generating PDF: {e}")
                        st.write(html_text, unsafe_allow_html=True)
        else:
            st.error("No data available for reports")

    # Sidebar: data management controls.
    with st.sidebar:
        st.header("Data Management")
        if st.button("Reload Data"):
            st.session_state.data = load_data()
        if st.button("Clear Chat"):
            st.session_state.chat_history = []


if __name__ == "__main__":
    main()