File size: 8,204 Bytes
25951ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
import pandas as pd
import numpy as np
import gradio as gr
import plotly.express as px
import plotly.graph_objects as go
from sklearn.ensemble import IsolationForest
from datetime import datetime
import nltk
from nltk.tokenize import word_tokenize
# Fetch the NLTK resources this module expects, one download call per resource
# and in the same order as before.
for _nltk_resource in ('punkt', 'stopwords', 'averaged_perceptron_tagger'):
    nltk.download(_nltk_resource)
class AugmentedAnalytics:
    """Hold one uploaded CSV as a DataFrame and answer simple sales questions.

    State (reset on every ``load_data`` call):
      * ``df`` -- the loaded DataFrame, or ``None`` before the first load.
      * ``date_column`` -- name of the last text column that parsed as
        datetimes, or ``None`` when no column did.
      * ``numeric_columns`` -- names of the numeric (non-boolean) columns.
    """

    def __init__(self):
        self.df = None
        self.date_column = None
        self.numeric_columns = []

    def load_data(self, file):
        """Load and preprocess the CSV data.

        Args:
            file: Either a filesystem path (``str`` / path-like) or an object
                whose ``name`` attribute holds the path, such as an upload
                handle.

        Returns:
            A ``(summary, figure, status)`` tuple. On failure the summary is a
            fixed error message, the figure is ``None`` and the status carries
            the exception text.
        """
        try:
            # Path-likes are checked first because ``pathlib.Path.name`` is
            # only the final component, not the full path.
            if isinstance(file, str) or hasattr(file, "__fspath__"):
                path = file
            else:
                path = getattr(file, "name", file)
            self.df = pd.read_csv(path)

            # Reset column bookkeeping from any previous load.
            self.numeric_columns = []
            self.date_column = None

            # Identify date and numeric columns.
            for col in self.df.columns:
                series = self.df[col]
                if (pd.api.types.is_numeric_dtype(series)
                        and not pd.api.types.is_bool_dtype(series)):
                    self.numeric_columns.append(col)
                elif (pd.api.types.is_object_dtype(series)
                        or pd.api.types.is_string_dtype(series)):
                    # Parse once; a column that does not parse is simply left
                    # as text.
                    try:
                        parsed = pd.to_datetime(series)
                    except (ValueError, TypeError, OverflowError):
                        continue
                    self.date_column = col
                    self.df[col] = parsed

            # Forward-fill gaps (``fillna(method=...)`` is deprecated).
            self.df = self.df.ffill()

            # Generate summary and visualization.
            sales_summary = self.get_sales_summary()
            sales_viz = self.create_sales_overview()
            status = f"Data loaded successfully! Found {len(self.numeric_columns)} numeric columns and {self.date_column if self.date_column else 'no'} date column."
            return sales_summary, sales_viz, status
        except Exception as e:
            # UI boundary: report the failure instead of raising.
            return (
                "Error in data loading. Please check your CSV file.",
                None,
                f"Error: {str(e)}"
            )

    def get_sales_summary(self):
        """Generate a text summary of sales metrics.

        Returns:
            A multi-line string. The revenue line is included only when a
            ``revenue`` column exists, and the margin line only when ``costs``
            exists as well. Without data or a ``sales`` column, a fixed
            "no sales data" message is returned.
        """
        try:
            if self.df is None or 'sales' not in self.df.columns:
                return "No sales data found in the dataset"

            sales = self.df['sales']
            lines = [
                "Sales Summary:",
                f"- Total Sales: {sales.sum():,.2f}",
                f"- Average Daily Sales: {sales.mean():,.2f}",
                f"- Highest Sales Day: {sales.max():,.2f}",
                f"- Lowest Sales Day: {sales.min():,.2f}",
            ]

            if 'revenue' in self.df.columns:
                revenue = self.df['revenue']
                lines.append(f"- Total Revenue: ${revenue.sum():,.2f}")
                if 'costs' in self.df.columns:
                    # Rows with zero revenue have no defined margin; leaving
                    # them in would turn the average into inf/NaN.
                    has_revenue = revenue != 0
                    if has_revenue.any():
                        profit = revenue - self.df['costs']
                        margin = (profit[has_revenue] / revenue[has_revenue]).mean() * 100
                        lines.append(f"- Average Profit Margin: {margin:.1f}%")

            return "\n".join(lines)
        except Exception as e:
            return f"Error generating summary: {str(e)}"

    def create_sales_overview(self):
        """Create an overview figure of sales trends.

        Returns:
            A plotly ``Figure`` with, in order, a ``sales`` line, a ``revenue``
            line and a 7-row rolling mean of ``sales`` (each only if its column
            exists). Returns ``None`` when there is no data or when building
            the figure fails.
        """
        try:
            if self.df is None or len(self.df) == 0:
                return None

            # Shared x-axis: the detected date column, else the row index.
            x_values = self.df[self.date_column] if self.date_column else self.df.index
            has_sales = 'sales' in self.df.columns

            fig = go.Figure()

            if has_sales:
                fig.add_trace(go.Scatter(
                    x=x_values,
                    y=self.df['sales'],
                    name='Sales',
                    line=dict(color='blue')
                ))

            if 'revenue' in self.df.columns:
                fig.add_trace(go.Scatter(
                    x=x_values,
                    y=self.df['revenue'],
                    name='Revenue',
                    line=dict(color='green')
                ))

            if has_sales:
                fig.add_trace(go.Scatter(
                    x=x_values,
                    y=self.df['sales'].rolling(7).mean(),
                    name='7-day Moving Average',
                    line=dict(color='red', dash='dash')
                ))

            fig.update_layout(
                title='Sales and Revenue Overview',
                xaxis_title='Date',
                yaxis_title='Amount',
                hovermode='x unified'
            )
            return fig
        except Exception:
            # Best-effort: the caller treats None as "no plot available".
            return None

    def answer_sales_query(self, query):
        """Answer a keyword-based natural-language question about the data.

        The query is lower-cased and scanned for a time keyword (``today``,
        ``week``, ``month``, ``year``; first match in that order) and a metric
        keyword (``revenue``, ``profit``, ``cost``; default ``sales``).

        Value reported per time period:
          * today -- last row
          * week  -- mean of the last 7 rows
          * month -- mean of the last 30 rows
          * year  -- mean of all rows
          * none of the above -- sum of all rows

        ``profit`` is derived as ``revenue - costs`` when the frame has no
        ``profit`` column but has both of those.

        Returns:
            The formatted answer, or a message explaining why none is
            available.
        """
        try:
            if self.df is None:
                return "Please load data first."

            query = (query or "").lower()

            # Parse time period from query.
            time_period = 'all'
            for keyword in ('today', 'week', 'month', 'year'):
                if keyword in query:
                    time_period = keyword
                    break

            # Parse metric from query.
            metric = 'sales'
            for keyword, column in (('revenue', 'revenue'),
                                    ('profit', 'profit'),
                                    ('cost', 'costs')):
                if keyword in query:
                    metric = column
                    break

            columns = self.df.columns
            if metric in columns:
                series = self.df[metric]
            elif metric == 'profit' and 'revenue' in columns and 'costs' in columns:
                series = self.df['revenue'] - self.df['costs']
            else:
                return f"No {metric} data found in the dataset"

            # An empty frame has no last row to index.
            if series.empty:
                return f"No {metric} data found in the dataset"

            # Calculate the requested value.
            if time_period == 'today':
                value = series.iloc[-1]
            elif time_period == 'week':
                value = series.tail(7).mean()
            elif time_period == 'month':
                value = series.tail(30).mean()
            elif time_period == 'year':
                value = series.mean()
            else:
                value = series.sum()

            return f"{time_period.capitalize()} {metric}: {value:,.2f}"
        except Exception as e:
            return f"Error processing query: {str(e)}"
def create_gradio_interface():
    """Create the Gradio interface.

    Builds a ``gr.Blocks`` layout with a CSV upload, a status box, a sales
    summary box, a free-text query box with its answer box, and a plot. All
    callbacks share a single ``AugmentedAnalytics`` instance.

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    analytics = AugmentedAnalytics()

    with gr.Blocks() as interface:
        gr.Markdown("# Augmented Analytics Dashboard")

        with gr.Row():
            file_input = gr.File(label="Upload CSV File")
            load_status = gr.Textbox(label="Status", interactive=False)

        with gr.Row():
            sales_summary = gr.Textbox(
                label="Sales Summary",
                lines=8,
                interactive=False
            )

        with gr.Row():
            query_input = gr.Textbox(
                label="Ask about sales (e.g., 'How much sales this week?' or 'Show monthly revenue')",
                placeholder="Type your question here...",
                interactive=True
            )
            query_output = gr.Textbox(label="Answer", interactive=False)

        with gr.Row():
            output_plot = gr.Plot(label="Sales Visualization")

        def process_query(query, file):
            """Answer ``query``, loading ``file`` first if nothing is loaded yet."""
            try:
                if analytics.df is None and file is not None:
                    analytics.load_data(file)
                return analytics.answer_sales_query(query)
            except Exception as e:
                return f"Error: {str(e)}"

        def load_data_callback(file):
            """Load the uploaded file and return (summary, figure, status)."""
            if file is None:
                # Order must match outputs=[sales_summary, output_plot,
                # load_status]: empty summary, no plot, prompt in status.
                return "", None, "Please upload a file."
            return analytics.load_data(file)

        # Set up event handlers
        file_input.change(
            load_data_callback,
            inputs=[file_input],
            outputs=[sales_summary, output_plot, load_status]
        )
        query_input.change(
            process_query,
            inputs=[query_input, file_input],
            outputs=[query_output]
        )

    return interface
# Launch the interface when run as a script. share=True asks Gradio to
# expose the app through a public share link as well as locally.
if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.launch(share=True)