# File size: 11,855 Bytes
import streamlit as st
import os
import pandas as pd
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser
from pandasai.llm import GoogleGemini
import plotly.graph_objects as go
from PIL import Image
import io
import base64
import requests
import google.generativeai as genai
from fpdf import FPDF
import markdown2
import re
from markdown_pdf import MarkdownPdf, Section
# Endpoint serving the generic stock-card report.
API_URL = "https://irisplus.elixir.co.zw/public/api/profile/reporting/stock-card/genericReports"
# Request body posted to the endpoint; carries the report id to pull.
PAYLOAD = dict(stock_card_report_id="d2f1a0e1-7be1-472c-9610-94287154e544")
# Configure Gemini API
# The key comes from the environment; when it is missing an error is shown
# and the Streamlit script is stopped before any model object is built.
gemini_api_key = os.getenv('GOOGLE_API_KEY')
if not gemini_api_key:
    st.error("GOOGLE_API_KEY environment variable not set.")
    st.stop()
genai.configure(api_key=gemini_api_key)

# Generation settings passed to the model instance created below.
generation_config = dict(
    temperature=0.2,
    top_p=0.95,
    max_output_tokens=5000,
)
model = genai.GenerativeModel(
    generation_config=generation_config,
    model_name="gemini-2.0-flash-thinking-exp",
)
def fetch_data(timeout=30):
    """Fetch the stock card report from the API and return it as a DataFrame.

    Every failure path reports the problem through ``st.error`` and returns
    ``None`` so callers only have to check for a missing DataFrame.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would freeze the
            Streamlit script on a stalled connection.

    Returns:
        A DataFrame built from the ``actual_report`` list of the JSON
        response, with columns that are entirely empty removed, or ``None``
        when the request, the decoding or the payload shape is not usable.
    """
    try:
        response = requests.post(API_URL, data=PAYLOAD, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors, DNS failures and timeouts all land here.
        st.error(f"Error fetching data: {exc}")
        return None

    if response.status_code != 200:
        st.error(f"Error fetching data: {response.status_code} - {response.text}")
        return None

    try:
        data = response.json()
    except ValueError:
        st.error("Error: Response is not valid JSON.")
        return None

    if not (isinstance(data, dict) and isinstance(data.get('actual_report'), list)):
        st.error("Unexpected response format from API.")
        return None

    try:
        df = pd.DataFrame(data['actual_report'])  # Convert list to DataFrame
    except (ValueError, TypeError):
        # Valid JSON whose rows cannot be laid out as a table.
        st.error("Unexpected response format from API.")
        return None

    # Remove columns where all values are None
    return df.dropna(axis=1, how='all')
def md_to_pdf(md_text, pdf):
    """Render a small subset of Markdown onto ``pdf`` using fpdf text calls.

    Supported syntax is ``#`` / ``##`` / ``###`` headings, inline ``**bold**``
    runs and plain paragraphs; any other markup is written out verbatim.

    The Markdown source is scanned directly. Converting it to HTML first
    would replace the ``#`` and ``**`` markers with tags, so none of the
    checks below could ever match and raw tags would end up in the PDF.

    Args:
        md_text: Markdown source text; ``None`` or an empty string draws a
            single blank line.
        pdf: FPDF-style object that is drawn on in place. It must provide
            ``set_font``, ``cell``, ``multi_cell``, ``ln`` and
            ``get_string_width``.
    """
    # Marker prefix (including its trailing space) -> heading font size.
    heading_sizes = (("### ", 14), ("## ", 16), ("# ", 18))

    pdf.set_font("Arial", "", 12)  # Set default font
    for raw_line in (md_text or "").split('\n'):
        line = raw_line.strip()

        if not line:
            # Blank source line: keep the paragraph gap.
            pdf.ln(10)
            continue

        heading = next(
            ((prefix, size) for prefix, size in heading_sizes if line.startswith(prefix)),
            None,
        )
        if heading is not None:
            prefix, size = heading
            pdf.set_font("Arial", "B", size)
            pdf.cell(0, 10, line[len(prefix):], ln=True)
        elif "**" in line:
            # Odd-numbered pieces of the split sit between a pair of ``**``.
            for index, part in enumerate(line.split("**")):
                if not part:
                    continue
                pdf.set_font("Arial", "B" if index % 2 == 1 else "", 12)
                # Size each run to its own text. A width of 0 would stretch
                # the cell to the right margin and push the next run off
                # the line. Runs are not wrapped.
                pdf.cell(pdf.get_string_width(part), 10, part, ln=False)
            pdf.ln(10)  # Newline after the whole line
        else:
            pdf.set_font("Arial", "", 12)
            pdf.multi_cell(0, 10, line)  # multi_cell for wrapping
def generate_pdf(report_text):
    """Generate a PDF document from Markdown report text.

    Args:
        report_text: Markdown text of the report; it is rendered through
            ``md_to_pdf``.

    Returns:
        The PDF document as ``bytes``.
    """
    pdf = FPDF()
    pdf.add_page()
    try:
        pdf.add_font('Arial', '', 'arial.ttf', uni=True)  # Add unicode support
    except Exception:
        # Best effort only: carry on with whatever font the library provides.
        # ``Exception`` rather than a bare ``except`` so that interpreter
        # exits and Streamlit's control-flow signals are never swallowed.
        st.warning("Arial font not found. Unicode might not work.")
    pdf.set_font("Arial", "", 12)
    md_to_pdf(report_text, pdf)

    rendered = pdf.output(dest="S")
    # Depending on the installed fpdf flavour the document comes back either
    # as a latin-1 string or already as bytes/bytearray; normalise to bytes.
    if isinstance(rendered, str):
        return rendered.encode("latin1")
    return bytes(rendered)
# --- Chat Tab Functions ---
class StreamLitResponse(ResponseParser):
    """Response parser that wraps every result in a ``{'type', 'value'}`` dict.

    The ``type`` tag is one of ``'dataframe'``, ``'plot'`` or ``'text'`` so
    the chat code can decide how to display the value.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Tag a tabular result; the value is passed through untouched."""
        return {'type': 'dataframe', 'value': result['value']}

    def format_plot(self, result):
        """Tag a plot result, storing the image as a base64 string.

        Accepts a PIL image (re-encoded as PNG), raw bytes, or a path to an
        existing file. Anything else, and any error while reading the image,
        is reported back as a ``'text'`` result instead of raising.
        """
        try:
            picture = result['value']
            if isinstance(picture, Image.Image):
                sink = io.BytesIO()
                picture.save(sink, format="PNG")
                raw = sink.getvalue()
            elif isinstance(picture, bytes):
                raw = picture
            elif isinstance(picture, str) and os.path.exists(picture):
                with open(picture, "rb") as handle:
                    raw = handle.read()
            else:
                return {'type': 'text', 'value': "Unsupported image format"}

            # One encoding step shared by every accepted input kind.
            encoded = base64.b64encode(raw).decode('utf-8')
            return {'type': 'plot', 'value': encoded}
        except Exception as e:
            return {'type': 'text', 'value': f"Error processing plot: {e}"}

    def format_other(self, result):
        """Tag any remaining result kind as text via ``str()``."""
        return {'type': 'text', 'value': str(result['value'])}
def generateResponse(prompt, df):
    """Ask PandasAI a question about ``df`` and return its answer.

    A ``SmartDataframe`` is built around ``df`` with a Gemini LLM and the
    ``StreamLitResponse`` parser, then queried with ``prompt``.
    """
    agent_config = {
        "llm": GoogleGemini(api_key=gemini_api_key),
        "response_parser": StreamLitResponse,
    }
    return SmartDataframe(df, config=agent_config).chat(prompt)
def render_chat_message(message):
    """Render one chat-history entry with Streamlit.

    Args:
        message: Dict describing the entry. ``"content"`` is shown as
            Markdown; ``"dataframe"`` is shown as a table; ``"plot"`` may be
            a base64 PNG string, a PIL image, a Plotly figure or raw image
            bytes. When both are present the table wins over the plot.
    """
    # Text goes first: assistant captions such as "Here's the table:"
    # introduce the table or chart and must appear above it, not below.
    if "content" in message:
        st.markdown(message["content"])

    if "dataframe" in message:
        st.dataframe(message["dataframe"])
    elif "plot" in message:
        try:
            plot_data = message["plot"]
            if isinstance(plot_data, str):
                # Strings are treated as base64-encoded PNG data.
                st.image(f"data:image/png;base64,{plot_data}")
            elif isinstance(plot_data, Image.Image):
                st.image(plot_data)
            elif isinstance(plot_data, go.Figure):
                st.plotly_chart(plot_data)
            elif isinstance(plot_data, bytes):
                st.image(Image.open(io.BytesIO(plot_data)))
            else:
                st.write("Unsupported plot format")
        except Exception as e:
            # A broken image must not take the rest of the chat down.
            st.error(f"Error rendering plot: {e}")
def handle_userinput(question, df):
    """Record a user question and the assistant's reply in the chat history.

    The question is appended to ``st.session_state.chat_history``, PandasAI
    is queried through ``generateResponse``, and the answer is appended as
    an assistant entry carrying a ``dataframe`` or ``plot`` payload when the
    result is tagged that way. With no usable data only a notice is
    written; any exception is reported through ``st.error``.
    """
    try:
        # Nothing to query: tell the user and leave the history untouched.
        if df is None or df.empty:
            st.write("No data loaded.")
            return

        st.session_state.chat_history.append({"role": "user", "content": question})

        answer = generateResponse(question, df)

        reply = {"role": "assistant"}
        if not isinstance(answer, dict):
            reply["content"] = str(answer)
        else:
            kind = answer.get('type', 'text')
            payload = answer.get('value')
            if kind == 'dataframe':
                reply["content"] = "Here's the table:"
                reply["dataframe"] = payload
            elif kind == 'plot':
                reply["content"] = "Here's the chart:"
                reply["plot"] = payload
            else:
                reply["content"] = str(payload)

        st.session_state.chat_history.append(reply)
    except Exception as e:
        st.error(f"Error processing input: {e}")
def main():
    """Build the Streamlit page: a chat tab, a reports tab and a sidebar.

    Session state holds the chat history (``chat_history``) and the loaded
    DataFrame (``dfs``). Both tabs read the same DataFrame, which is fetched
    once per session and refreshed only through the "Reload Data" button.
    """
    st.set_page_config(page_title="AI Chat with Your Data", page_icon="📊")

    # Initialize session state variables if not present
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "dfs" not in st.session_state:
        st.session_state.dfs = fetch_data()  # Load DataFrame at startup

    # Create two tabs: Chat and Reports
    tab_chat, tab_reports = st.tabs(["Chat", "Reports"])

    # --- Chat Tab ---
    with tab_chat:
        st.title("AI Chat with Your Data 📊")

        # Reserve the message area above the input box, but fill it only
        # after the new question has been handled. The history is then drawn
        # exactly once per run and already includes the latest exchange.
        chat_container = st.container()

        user_question = st.chat_input("Ask a question about your data:")
        if user_question:
            handle_userinput(user_question, st.session_state.dfs)

        with chat_container:
            for message in st.session_state.chat_history:
                with st.chat_message(message["role"]):
                    render_chat_message(message)

    # --- Reports Tab ---
    with tab_reports:
        st.title("Reports")
        st.write("Filter by product to generate a report")

        # Reuse the session copy: the script reruns on every widget
        # interaction, and fetching here would hit the API each time.
        df_report = st.session_state.dfs
        if df_report is not None and not df_report.empty:
            if "product" in df_report.columns:
                product_names = df_report["product"].unique().tolist()
            else:
                product_names = []
            selected_products = st.multiselect(
                "Select Product(s)", product_names, default=product_names
            )

            if st.button("Apply Filters and Generate Report"):
                filtered_df = df_report.copy()
                if selected_products:
                    filtered_df = filtered_df[filtered_df["product"].isin(selected_products)]

                st.write("Filtered DataFrame Preview:")
                with st.expander("Preview"):
                    st.dataframe(filtered_df.head())

                with st.spinner("Generating Report, Please Wait...."):
                    # to_markdown gives the model a readable table.
                    prompt = (
                        "\n"
                        "You are an expert business analyst. Analyze the following data and "
                        "generate a comprehensive and insightful business report including "
                        "key performance indicators and recommendations."
                        f"\n\nData:\n{filtered_df.to_markdown(index=False)}"
                        "\n"
                    )
                    try:
                        # The model call and reading ``.text`` are the steps
                        # that can fail, so they belong inside the handler.
                        response = model.generate_content(prompt)
                        st.markdown(response.text)
                    except Exception as e:
                        st.error(f"Error generating report {e}")
        else:
            st.error("No data available for reports.")

    # --- Sidebar Options ---
    with st.sidebar:
        st.subheader("Options")

        if st.button("Reload Data"):
            with st.spinner("Fetching latest data..."):
                st.session_state.dfs = fetch_data()
            # fetch_data reports its own errors; confirm only real successes.
            if st.session_state.dfs is not None:
                st.success("Data refreshed!")

        if st.button("Clear Chat"):
            st.session_state.chat_history = []
            # Prefer st.rerun where it exists; fall back to the older
            # experimental name on Streamlit versions that lack it.
            rerun = getattr(st, "rerun", None) or st.experimental_rerun
            rerun()


if __name__ == "__main__":
    main()
|