# TonyDemo / app.py
# (origin: rairo — commit fef9a70, "Update app.py", verified)
import streamlit as st
import os
import pandas as pd
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser
from pandasai.llm import GoogleGemini
import plotly.graph_objects as go
from PIL import Image
import io
import base64
import requests
import google.generativeai as genai
from fpdf import FPDF
import markdown2
import re
from markdown_pdf import MarkdownPdf, Section
# API Endpoint and payload
# Iris+ stock-card "generic report" endpoint; PAYLOAD pins one fixed report id.
API_URL = "https://irisplus.elixir.co.zw/public/api/profile/reporting/stock-card/genericReports"
PAYLOAD = {
    "stock_card_report_id": "d2f1a0e1-7be1-472c-9610-94287154e544"
}
# Configure Gemini API
# Fail fast at startup if the key is missing: both the chat tab (via PandasAI)
# and the reports tab need Gemini, so the app is useless without it.
gemini_api_key = os.environ.get('GOOGLE_API_KEY')
if not gemini_api_key:
    st.error("GOOGLE_API_KEY environment variable not set.")
    st.stop()
genai.configure(api_key=gemini_api_key)
# Low temperature for factual, report-style output; 5000 tokens caps report length.
generation_config = {
    "temperature": 0.2,
    "top_p": 0.95,
    "max_output_tokens": 5000,
}
# Module-level model instance, shared by every report generation.
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash-thinking-exp",
    generation_config=generation_config,
)
def fetch_data():
    """Fetch the stock-card report from the API and return a cleaned DataFrame.

    Returns:
        pandas.DataFrame with all-None columns dropped, or None on any failure
        (network error, non-200 status, invalid JSON, unexpected payload shape).
        Errors are surfaced to the user via st.error.
    """
    try:
        # Timeout so a dead API can't hang the Streamlit script forever;
        # RequestException covers DNS/connection/timeout errors that would
        # otherwise crash the whole app run.
        response = requests.post(API_URL, data=PAYLOAD, timeout=30)
    except requests.RequestException as e:
        st.error(f"Error fetching data: {e}")
        return None
    if response.status_code != 200:
        st.error(f"Error fetching data: {response.status_code} - {response.text}")
        return None
    try:
        data = response.json()
    except ValueError:
        st.error("Error: Response is not valid JSON.")
        return None
    # Expected shape: {"actual_report": [ {row}, {row}, ... ], ...}
    if isinstance(data, dict) and isinstance(data.get('actual_report'), list):
        df = pd.DataFrame(data['actual_report'])  # list of row dicts -> DataFrame
        # Remove columns where all values are None
        df.dropna(axis=1, how='all', inplace=True)
        return df
    st.error("Unexpected response format from API.")
    return None
def md_to_pdf(md_text, pdf):
    """Render basic Markdown (#/##/### headings, **bold**, plain text) into an FPDF page.

    Bug fix: the original converted the text to HTML with markdown2.markdown()
    and then looked for Markdown markers ('# ', '**') in the HTML output, so
    headings and bold were never detected. We now scan the raw Markdown lines.

    Args:
        md_text: Markdown source text.
        pdf: an FPDF instance with a page already added (mutated in place).
    """
    # Heading prefix -> font size; checked longest-match-safe since '# ' does
    # not prefix '## ' lines (second char differs).
    heading_sizes = (("### ", 14), ("## ", 16), ("# ", 18))
    pdf.set_font("Arial", "", 12)
    for line in md_text.split('\n'):
        line = line.strip()
        for prefix, size in heading_sizes:
            if line.startswith(prefix):
                pdf.set_font("Arial", "B", size)
                pdf.cell(0, 10, line[len(prefix):], ln=True)
                break
        else:
            if "**" in line:
                # split("**") alternates: odd-indexed chunks were inside ** ** pairs.
                # Size each cell to its text (cell(0, ...) would span the full
                # page width per fragment and mangle the layout).
                for i, part in enumerate(line.split("**")):
                    pdf.set_font("Arial", "B" if i % 2 == 1 else "", 12)
                    pdf.cell(pdf.get_string_width(part) + 1, 10, part, ln=False)
                pdf.ln()  # newline after the whole line
            else:
                pdf.set_font("Arial", "", 12)
                pdf.multi_cell(0, 10, line)  # multi_cell wraps long lines
def generate_pdf(report_text):
    """Generate a PDF document from Markdown report text.

    Args:
        report_text: Markdown source of the report.
    Returns:
        bytes: the rendered PDF document.
    """
    pdf = FPDF()
    pdf.add_page()
    try:
        # Register a Unicode-capable font; FPDF's core fonts are Latin-1 only.
        pdf.add_font('Arial', '', 'arial.ttf', uni=True)
    except Exception:  # narrowed from bare `except`, which also swallowed SystemExit
        st.warning("Arial font not found. Unicode might not work.")
    pdf.set_font("Arial", "", 12)
    md_to_pdf(report_text, pdf)
    # Classic fpdf returns str from output(dest="S"); fpdf2 returns bytearray.
    # Handle both so this works regardless of which package is installed.
    out = pdf.output(dest="S")
    return out.encode("latin1") if isinstance(out, str) else bytes(out)
# --- Chat Tab Functions ---
class StreamLitResponse(ResponseParser):
    """PandasAI response parser that normalizes results into typed dicts
    ({'type': ..., 'value': ...}) that the Streamlit chat UI can render."""

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Tag a DataFrame result so the renderer shows it with st.dataframe."""
        return {'type': 'dataframe', 'value': result['value']}

    def format_plot(self, result):
        """Normalize a plot result (PIL image, raw bytes, or file path) into a
        base64-encoded PNG string for consistent storage in chat history."""
        try:
            image = result['value']
            if isinstance(image, Image.Image):
                buffer = io.BytesIO()
                image.save(buffer, format="PNG")
                raw = buffer.getvalue()
            elif isinstance(image, bytes):
                raw = image
            elif isinstance(image, str) and os.path.exists(image):
                with open(image, "rb") as fh:
                    raw = fh.read()
            else:
                return {'type': 'text', 'value': "Unsupported image format"}
            return {'type': 'plot', 'value': base64.b64encode(raw).decode('utf-8')}
        except Exception as e:
            return {'type': 'text', 'value': f"Error processing plot: {e}"}

    def format_other(self, result):
        """Fall back to a plain-text representation for anything else."""
        return {'type': 'text', 'value': str(result['value'])}
def generateResponse(prompt, df):
    """Ask PandasAI's SmartDataframe the given question about df and return
    the reply, already shaped by StreamLitResponse."""
    agent = SmartDataframe(
        df,
        config={
            "llm": GoogleGemini(api_key=gemini_api_key),
            "response_parser": StreamLitResponse,
        },
    )
    return agent.chat(prompt)
def render_chat_message(message):
    """Render one chat-history entry: an optional dataframe/plot payload
    followed by its markdown text content."""
    if "dataframe" in message:
        st.dataframe(message["dataframe"])
    elif "plot" in message:
        try:
            payload = message["plot"]
            if isinstance(payload, str):
                # base64 PNG string produced by StreamLitResponse.format_plot
                st.image(f"data:image/png;base64,{payload}")
            elif isinstance(payload, Image.Image):
                st.image(payload)
            elif isinstance(payload, go.Figure):
                st.plotly_chart(payload)
            elif isinstance(payload, bytes):
                st.image(Image.open(io.BytesIO(payload)))
            else:
                st.write("Unsupported plot format")
        except Exception as e:
            st.error(f"Error rendering plot: {e}")
    # Text content renders in addition to any dataframe/plot payload.
    if "content" in message:
        st.markdown(message["content"])
def handle_userinput(question, df):
    """Record the user's question, query PandasAI, and append the assistant's
    reply (text, table, or chart) to the session chat history."""
    try:
        # Guard: nothing to ask about without data.
        if df is None or df.empty:
            st.write("No data loaded.")
            return
        history = st.session_state.chat_history
        history.append({"role": "user", "content": question})

        result = generateResponse(question, df)
        if not isinstance(result, dict):
            history.append({"role": "assistant", "content": str(result)})
            return

        kind = result.get('type', 'text')
        value = result.get('value')
        if kind == 'dataframe':
            history.append({
                "role": "assistant",
                "content": "Here's the table:",
                "dataframe": value,
            })
        elif kind == 'plot':
            history.append({
                "role": "assistant",
                "content": "Here's the chart:",
                "plot": value,
            })
        else:
            history.append({"role": "assistant", "content": str(value)})
    except Exception as e:
        st.error(f"Error processing input: {e}")
def main():
    """Streamlit entry point: chat tab, reports tab, and sidebar controls."""
    st.set_page_config(page_title="AI Chat with Your Data", page_icon="📊")

    # Initialize session state variables if not present
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "dfs" not in st.session_state:
        st.session_state.dfs = fetch_data()  # Load DataFrame at startup

    tab_chat, tab_reports = st.tabs(["Chat", "Reports"])

    # --- Chat Tab ---
    with tab_chat:
        st.title("AI Chat with Your Data 📊")

        # Handle input BEFORE rendering so history is drawn exactly once per
        # run. The old code rendered the history, handled input, then drew the
        # history again into the same container — container.empty() does not
        # clear previously drawn children, so every message appeared twice on
        # the run that processed an input.
        user_question = st.chat_input("Ask a question about your data:")
        if user_question:
            handle_userinput(user_question, st.session_state.dfs)

        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                render_chat_message(message)

    # --- Reports Tab ---
    with tab_reports:
        st.title("Reports")
        st.write("Filter by product to generate a report")
        # Reuse the session DataFrame instead of re-hitting the API on every
        # Streamlit rerun (the sidebar "Reload Data" button refreshes it).
        df_report = st.session_state.dfs
        if df_report is not None and not df_report.empty:
            product_names = df_report["product"].unique().tolist() if "product" in df_report.columns else []
            selected_products = st.multiselect("Select Product(s)", product_names, default=product_names)
            if st.button("Apply Filters and Generate Report"):
                filtered_df = df_report.copy()
                if selected_products:
                    filtered_df = filtered_df[filtered_df["product"].isin(selected_products)]
                st.write("Filtered DataFrame Preview:")
                with st.expander("Preview"):
                    st.dataframe(filtered_df.head())
                with st.spinner("Generating Report, Please Wait...."):
                    prompt = f"""
                    You are an expert business analyst. Analyze the following data and generate a comprehensive and insightful business report including key performance indicators and recommendations.\n\nData:\n{filtered_df.to_markdown(index=False)}
                    """  # Use to_markdown for better formatting
                    response = model.generate_content(prompt)
                    report = response.text
                    try:
                        st.markdown(report)
                    except Exception as e:
                        st.error(f"Error generating report {e}")
        else:
            st.error("No data available for reports.")

    # --- Sidebar Options ---
    with st.sidebar:
        st.subheader("Options")
        if st.button("Reload Data"):
            with st.spinner("Fetching latest data..."):
                st.session_state.dfs = fetch_data()
            st.success("Data refreshed!")
        if st.button("Clear Chat"):
            st.session_state.chat_history = []
            # st.experimental_rerun was removed in modern Streamlit; prefer
            # st.rerun when available, falling back for older versions.
            rerun = getattr(st, "rerun", None) or st.experimental_rerun
            rerun()
if __name__ == "__main__":
main()